Module moog_demos.example_configs.match_to_sample

Match-to-sample task.

In this task there are some colored circles (targets) on a ring, and an agent avatar in the center of the ring. After an initial stimulus period where the colored targets are visible, they all turn grey and begin rotating together. After some time they stop and a colored cue appears on the agent avatar. The subject must identify the target that had the same color as the cue and respond by navigating towards that target.

This task requires workimg memory of multiple objects with features (colors).

Functions

def get_config(num_targets)
Expand source code
def get_config(num_targets):
    """Get environment config.
    
    Args:
        num_targets: Int. Number of targets.
    """
    if num_targets == 0 or not isinstance(num_targets, int):
        raise ValueError(
            f'num_targets is {num_targets}, but must be a positive integer')

    ############################################################################
    # State initialization
    ############################################################################

    screen = sprite.Sprite(
        x=0.5, y=0.5, shape='square', scale=2., c0=0.6, c1=0.7, c2=0.7)
        
    target_factor_distrib = distribs.Product(
        [distribs.Continuous('c0', 0., 1.)],
        shape='circle', scale=0.085, c1=1., c2=1.,
    )
    cover_factors = dict(
        mass=0., shape='circle', scale=0.1, c0=0., c1=0., c2=0.5, opacity=0)
    
    def state_initializer():
        """State initializer method to be fed into environment."""

        # Get targets and covers
        sprite_positions = 0.5 + 0.35 * _get_polygon(num_targets, 0.7)
        target_factors = [
            target_factor_distrib.sample() for _ in range(num_targets)]
        targets = [
            sprite.Sprite(x=pos[0], y=pos[1], **factors)
            for pos, factors in zip(sprite_positions, target_factors)
        ]
        covers = [
            sprite.Sprite(x=pos[0], y=pos[1], **cover_factors)
            for pos in sprite_positions
        ]

        # Tag the cover metadata based on whether they are prey or not
        for i, s in enumerate(covers):
            if i == 0:
                s.metadata = {'prey': True}
            else:
                s.metadata = {'prey': False}
        
        # Make cue have the same factors as the first target, except slightly
        # smaller
        cue_factors = copy.deepcopy(target_factors[0])
        cue_factors['scale'] = 0.7 * target_factors[0]['scale']
        cue = sprite.Sprite(
            x=0.5, y=0.501, opacity=0, mass=np.inf, **cue_factors)

        agent = sprite.Sprite(
            x=0.5, y=0.5, shape='circle', scale=0.1, c0=0.4, c1=0., c2=1.,
            mass=np.inf)
        annulus_verts = shapes.annulus_vertices(0.34, 0.36)
        annulus = sprite.Sprite(
            x=0.5, y=0.5, shape=annulus_verts, scale=1., c0=0., c1=0., c2=0.3)
        
        state = collections.OrderedDict([
            ('annulus', [annulus]),
            ('targets', targets),
            ('covers', covers),
            ('agent', [agent]),
            ('cue', [cue]),
            ('screen', [screen]),
        ])
        return state

    ################################################################################
    # Physics
    ################################################################################

    drag = (physics_lib.Drag(coeff_friction=0.25), ['agent', 'cue'])
    tether_covers = physics_lib.TetherZippedLayers(
        ('targets', 'covers'), anchor=np.array([0.5, 0.5]))
    physics = physics_lib.Physics(
        drag,
        updates_per_env_step=1,
        corrective_physics=[tether_covers],
    )

    ################################################################################
    # Task
    ################################################################################

    contact_task = tasks.ContactReward(
        reward_fn=lambda _, s: 1 if s.metadata['prey'] else -1,
        layers_0='agent',
        layers_1='covers',
    )

    def _should_reset(state, meta_state):
        should_reset = (
            state['covers'][0].opacity == 0 and
            meta_state['phase'] == 'response'
        )
        return should_reset
    reset_task = tasks.Reset(
        condition=_should_reset,
        steps_after_condition=15,
    )

    task = tasks.CompositeTask(contact_task, reset_task, timeout_steps=800)
    
    ################################################################################
    # Action Space
    ################################################################################

    action_space = action_spaces.Joystick(
        scaling_factor=0.01, action_layers=['agent', 'cue'])

    ################################################################################
    # Observer
    ################################################################################

    _polygon_modifier = observers.polygon_modifiers.FirstPersonAgent(
        agent_layer='agent')
    observer = observers.PILRenderer(
        image_size=(64, 64),
        anti_aliasing=1,
        color_to_rgb='hsv_to_rgb',
        polygon_modifier=_polygon_modifier,
    )

    ############################################################################
    # Game rules
    ############################################################################

    def _make_opaque(s):
        s.opacity = 255
    def _make_transparent(s):
        s.opacity = 0

    # Screen Phase

    screen_phase = gr.Phase(duration=1, name='screen')

    # Visible Phase

    disappear_screen = gr.ModifySprites('screen', _make_transparent)
    visible_phase = gr.Phase(
        one_time_rules=disappear_screen, duration=2, name='visible')

    # Motion Phase

    def _move(s):
        s.velocity = np.random.uniform(-0.25, 0.25, size=(2,))

    cover_targets = gr.ModifySprites('covers', _make_opaque)
    begin_motion = BeginMotion(angle_vel_range=(0.1, 0.3))
    motion_phase = gr.Phase(
        one_time_rules=[cover_targets, begin_motion],
        duration=100,
        name='motion',
    )

    # Response Phase

    def _stop(s):
        s.angle_vel = 0.
        s.velocity = np.zeros(2)
    def _unglue(s):
        s.mass = 1.

    appear_cue = gr.ModifySprites('cue', _make_opaque)
    stop_targets = gr.ModifySprites(('targets', 'covers'), _stop)
    unglue_agent = gr.ModifySprites(('agent', 'cue'), _unglue)
    make_targets_discoverable = gr.ModifyOnContact(
        layers_0='agent', layers_1='covers', modifier_1=_make_transparent)

    response_phase = gr.Phase(
        one_time_rules=[appear_cue, stop_targets, unglue_agent],
        continual_rules=make_targets_discoverable,
        name='response',
    )

    phase_sequence = gr.PhaseSequence(
        screen_phase, visible_phase, motion_phase, response_phase,
        meta_state_phase_name_key='phase',
    )

    ############################################################################
    # Final config
    ############################################################################

    config = {
        'state_initializer': state_initializer,
        'physics': physics,
        'task': task,
        'action_space': action_space,
        'observers': {'image': observer},
        'game_rules': (phase_sequence,),
        'meta_state_initializer': lambda: {'phase': ''}
    }
    return config

Get environment config.

Args

num_targets
Int. Number of targets.

Classes

class BeginMotion (angle_vel_range)
Expand source code
class BeginMotion(gr.AbstractRule):
    """Custom rule for motion initiation."""
    
    def __init__(self, angle_vel_range):
        """Constructor.

        Args:
            angle_vel_range: Tuple of non-negative floats. Range of angular
                velocity magnitudes, in radians per step.
        """
        self._angle_vel_range = angle_vel_range

    def step(self, state, meta_state):
        del meta_state
        targets = state['targets']
        covers = state['covers']
        angle_vel = np.random.uniform(*self._angle_vel_range)
        angle_vel *= (2 * np.random.randint(2) - 1)
        for t, c in zip(targets, covers):
            relative_pos = t.position - 0.5
            perpendicular = np.matmul(np.array([[0, -1], [1, 0]]), relative_pos)
            radius = np.linalg.norm(relative_pos)
            vel = perpendicular * radius * angle_vel
            t.velocity = vel
            c.velocity = vel

Custom rule for motion initiation.

Constructor.

Args

angle_vel_range
Tuple of non-negative floats. Range of angular velocity magnitudes, in radians per step.

Ancestors

  • moog.game_rules.abstract_rule.AbstractRule
  • abc.ABC

Methods

def step(self, state, meta_state)
Expand source code
def step(self, state, meta_state):
    del meta_state
    targets = state['targets']
    covers = state['covers']
    angle_vel = np.random.uniform(*self._angle_vel_range)
    angle_vel *= (2 * np.random.randint(2) - 1)
    for t, c in zip(targets, covers):
        relative_pos = t.position - 0.5
        perpendicular = np.matmul(np.array([[0, -1], [1, 0]]), relative_pos)
        radius = np.linalg.norm(relative_pos)
        vel = perpendicular * radius * angle_vel
        t.velocity = vel
        c.velocity = vel

Apply rule to the environment state.

This method can in-place modify the state however it likes.

Args

state
OrderedDict of iterables of sprites. Environment state.
meta_state
meta_state of environment.