Module moog_demos.example_configs.multi_tracking_with_feature

Multi-object tracking with features.

In this task, the subject controls the position of a small cross. Typically, this would be controlled by an eye-tracker to follow the subject's gaze. There are a number of circles with oriented bars (each initially randomly vertical or horizontal). Those circles with bars drift around an arena, bouncing off the walls. At a random time, one of the bars rotates 90 degrees. The subject's goal is to identify and fixate on that target.

After a brief initial period in which they are all visible, an occluder appears occluding peripheral vision. This forces the subject to mentally keep track of the targets' locations and orientations.

Functions

def get_config(num_targets)
Expand source code
def get_config(num_targets):
    """Get environment config.
    
    Args:
        num_targets: Int. Number of targets.
    """

    ############################################################################
    # Sprite initialization
    ############################################################################

    # Target circles
    target_factors = distribs.Product(
        [distribs.Continuous('x', 0.1, 0.9),
         distribs.Continuous('y', 0.1, 0.9),
         RadialVelocity(speed=0.01),],
        scale=0.1, shape='circle', c0=0., c1=0., c2=0.9,
    )

    # Target bars
    bar_factors = dict(
        scale=0.1, shape='square', aspect_ratio=0.3, c0=0., c1=0., c2=0.2)

    # Walls
    bottom_wall = [[-1, 0], [2, 0], [2, -1], [-1, -1]]
    top_wall = [[-1, 1], [2, 1], [2, 2], [-1, 2]]
    left_wall = [[0, -1], [0, 4], [-1, 4], [-1, -1]]
    right_wall = [[1, -1], [1, 4], [2, 4], [2, -1]]
    walls = [
        sprite.Sprite(shape=np.array(v), x=0, y=0, c0=0., c1=0., c2=0.5)
        for v in [bottom_wall, top_wall, left_wall, right_wall]
    ]

    # Occluder
    occluder_factors = dict(x=0.5, y=0.5, c0=0.6, c1=0.25, c2=0.5, opacity=0)

    # Cross shape for agent and fixation cross
    cross_shape = 0.1 * np.array([
        [-5, 1], [-1, 1], [-1, 5], [1, 5], [1, 1], [5, 1], [5, -1], [1, -1],
        [1, -5], [-1, -5], [-1, -1], [-5, -1]
    ])
    
    def state_initializer():

        fixation = sprite.Sprite(
            x=0.5, y=0.5, shape=cross_shape, scale=0.1, c0=0., c1=0., c2=0.)
        screen = sprite.Sprite(
            x=0.5, y=0.5, shape='square', scale=2., c0=0., c1=0., c2=1.)
            
        agent = sprite.Sprite(
            x=0.5, y=0.5, scale=0.04, shape=cross_shape, c0=0.33, c1=1., c2=1.)
        occluder_shape = shapes.annulus_vertices(0.13, 2.)
        occluder = sprite.Sprite(shape=occluder_shape, **occluder_factors)

        targets = [
            sprite.Sprite(**target_factors.sample())
            for _ in range(num_targets)
        ]

        bar_angles = 0.5 * np.pi * np.random.binomial(1, 0.5, (num_targets))
        bars = [
            sprite.Sprite(
                x=s.x, y=s.y, x_vel=s.x_vel, y_vel=s.y_vel, angle=angle,
                **bar_factors)
            for s, angle in zip(targets, bar_angles)
        ]

        state = collections.OrderedDict([
            ('walls', walls),
            ('targets', targets),
            ('bars', bars),
            ('occluder', [occluder]),
            ('screen', [screen]),
            ('fixation', [fixation]),
            ('agent', [agent]),
        ])
        return state

    ############################################################################
    # Physics
    ############################################################################

    elastic_collision = physics_lib.Collision(
        elasticity=1., symmetric=False, update_angle_vel=False)
    tether = physics_lib.TetherZippedLayers(
        layer_names=('targets', 'bars'), update_angle_vel=False)
    physics = physics_lib.Physics(
        (elastic_collision, 'targets', 'walls'),
        updates_per_env_step=10,
        corrective_physics=[tether],
    )

    ############################################################################
    # Task
    ############################################################################

    def _reward_condition(_, meta_state):
        return meta_state['phase'] == 'reward'
    task = tasks.Reset(
        condition=_reward_condition, reward_fn=lambda _: 1,
        steps_after_condition=10,
    )

    ############################################################################
    # Action space
    ############################################################################

    action_space = action_spaces.SetPosition(
        action_layers=('agent', 'occluder'))

    ############################################################################
    # Observer
    ############################################################################

    observer = observers.PILRenderer(
        image_size=(64, 64),
        anti_aliasing=1,
        color_to_rgb=observers.color_maps.hsv_to_rgb,
    )

    ############################################################################
    # Game rules
    ############################################################################

    # Fixation phase

    fixation_rule = gr.Fixation(
        'agent', 'fixation', _FIXATION_THRESHOLD, 'fixation_duration')
    def _should_end_fixation(_, meta_state):
        return meta_state['fixation_duration'] >= 15

    fixation_phase = gr.Phase(
        continual_rules=fixation_rule,
        end_condition=_should_end_fixation,
        name='fixation',
    )

    # Visible Phase
    
    vanish_fixation = gr.VanishByFilter('fixation', lambda _: True)
    vanish_screen = gr.VanishByFilter('screen', lambda _: True)

    visible_phase = gr.Phase(
        one_time_rules=[vanish_fixation, vanish_screen],
        duration=5,
        name='visible',
    )

    # Tracking Phase

    def _make_opaque(s):
        s.opacity = 255
    appear_occluder = gr.ModifySprites('occluder', _make_opaque)

    tracking_phase = gr.Phase(
        one_time_rules=appear_occluder,
        duration=lambda: np.random.randint(40, 80),
        name='tracking',
    )

    # Change Phase
    
    fixation_response_rule = gr.Fixation(
        'agent', 'targets', _FIXATION_THRESHOLD, 'response_duration')
    def _should_end_change(_, meta_state):
        return meta_state['response_duration'] >= 30

    change_phase = gr.Phase(
        one_time_rules=ChangeTargetFeature(),
        continual_rules=fixation_response_rule,
        name='change',
        end_condition=_should_end_change,
    )

    # Reward Phase

    def _make_transparent(s):
        s.opacity = 0
    disappear_occluder = gr.ModifySprites('occluder', _make_transparent)
    def _glue(s):
        s.velocity = np.zeros(2)
    glue_targets = gr.ModifySprites(('targets', 'bars'), _glue)
    
    reward_phase = gr.Phase(
        one_time_rules=(disappear_occluder, glue_targets),
        name='reward',
    )

    phase_sequence = gr.PhaseSequence(
        fixation_phase, visible_phase, tracking_phase, change_phase,
        reward_phase, meta_state_phase_name_key='phase',
    )

    ############################################################################
    # Final config
    ############################################################################

    config = {
        'state_initializer': state_initializer,
        'physics': physics,
        'task': task,
        'action_space': action_space,
        'observers': {'image': observer, 'state': observers.RawState()},
        'game_rules': (phase_sequence,),
        'meta_state_initializer': lambda: {'phase': ''},
    }
    return config

Get environment config.

Args

num_targets
Int. Number of targets.

Classes

class ChangeTargetFeature
Expand source code
class ChangeTargetFeature(gr.AbstractRule):
    """Game rule to change the first target's oriented bar."""

    def step(self, state, meta_state):
        del meta_state
        """Rotate first target's bar np 90 degrees."""
        s = state['bars'][0]
        s.angle = s.angle + 0.5 * np.pi

Game rule to change the first target's oriented bar.

Ancestors

  • moog.game_rules.abstract_rule.AbstractRule
  • abc.ABC

Methods

def step(self, state, meta_state)
Expand source code
def step(self, state, meta_state):
    del meta_state
    """Rotate first target's bar np 90 degrees."""
    s = state['bars'][0]
    s.angle = s.angle + 0.5 * np.pi

Apply rule to the environment state.

This method can in-place modify the state however it likes.

Args

state
OrderedDict of iterables of sprites. Environment state.
meta_state
meta_state of environment.
class RadialVelocity (speed)
Expand source code
class RadialVelocity(distribs.AbstractDistribution):
    """Radial velocity distribution."""

    def __init__(self, speed):
        """Constructor.

        Args:
            speed: Float. Speed of the sampled velocities.
        """
        self._speed = speed

    def sample(self, rng):
        rng = self._get_rng(rng)
        theta = rng.uniform(0., 2 * np.pi)
        x_vel = self._speed * np.cos(theta)
        y_vel = self._speed * np.sin(theta)
        return {'x_vel': x_vel, 'y_vel': y_vel}

    def contains(self, spec):
        if 'x_vel' not in spec or 'y_vel' not in spec:
            return False

        vel_norm = np.linalg.norm([spec['x_vel'], spec['y_vel']])
        if np.abs(vel_norm - self._speed) < _EPSILON:
            return True
        else:
            return False

    def to_str(self, indent):
        s = 'RadialVelocity({})'.format(self._speed)
        return indent * '  ' + s

    @property
    def keys(self):
        return set(['x_vel', 'y_vel'])

Radial velocity distribution.

Constructor.

Args

speed
Float. Speed of the sampled velocities.

Ancestors

  • moog.state_initialization.distributions.AbstractDistribution
  • abc.ABC

Instance variables

prop keys
Expand source code
@property
def keys(self):
    return set(['x_vel', 'y_vel'])

The set of keys in specs sampled from this distribution.

Methods

def contains(self, spec)
Expand source code
def contains(self, spec):
    if 'x_vel' not in spec or 'y_vel' not in spec:
        return False

    vel_norm = np.linalg.norm([spec['x_vel'], spec['y_vel']])
    if np.abs(vel_norm - self._speed) < _EPSILON:
        return True
    else:
        return False

Return whether distribution contains spec dictionary.

def sample(self, rng)
Expand source code
def sample(self, rng):
    rng = self._get_rng(rng)
    theta = rng.uniform(0., 2 * np.pi)
    x_vel = self._speed * np.cos(theta)
    y_vel = self._speed * np.sin(theta)
    return {'x_vel': x_vel, 'y_vel': y_vel}

Sample a spec from this distribution. Returns a dictionary.

Args

rng
Random number generator. Fed into self._get_rng(), if None defaults to np.random.
def to_str(self, indent)
Expand source code
def to_str(self, indent):
    s = 'RadialVelocity({})'.format(self._speed)
    return indent * '  ' + s

Recursive string description of this distribution.