Module moog_demos.example_configs.multi_tracking_with_feature
Multi-object tracking with features.
In this task, the subject controls the position of a small cross. Typically, this would be controlled by an eye-tracker to follow the subject's gaze. There are a number of circles with oriented bars (each initially randomly vertical or horizontal). Those circles with bars drift around an arena, bouncing off the walls. At a random time, one of the bars rotates 90 degrees. The subject's goal is to identify and fixate on that target.
After a brief initial period in which they are all visible, an occluder appears occluding peripheral vision. This forces the subject to mentally keep track of the targets' locations and orientations.
Expand source code
"""Multi-object tracking with features.
In this task, the subject controls the position of a small cross. Typically,
this would be controlled by an eye-tracker to follow the subject's gaze. There
are a number of circles with oriented bars (each initially randomly vertical or
horizontal). Those circles with bars drift around an arena, bouncing off the
walls. At a random time, one of the bars rotates 90 degrees. The subject's goal
is to identify and fixate on that target.
After a brief initial period in which they are all visible, an occluder
appears occluding peripheral vision. This forces the subject to mentally keep
track of the targets' locations and orientations.
"""
import collections
import numpy as np
from moog import action_spaces
from moog import game_rules as gr
from moog import observers
from moog import physics as physics_lib
from moog import shapes
from moog import sprite
from moog import tasks
from moog.state_initialization import distributions as distribs
_FIXATION_THRESHOLD = 0.1
class RadialVelocity(distribs.AbstractDistribution):
    """Distribution over 2D velocities with fixed speed and random heading.

    Sampled specs are dictionaries with keys 'x_vel' and 'y_vel'. The heading
    angle is drawn uniformly from [0, 2 * pi) and the velocity magnitude
    always equals the configured speed.
    """

    # Absolute tolerance used by contains() when comparing a spec's speed to
    # the configured speed. The original code referenced a module-level
    # `_EPSILON` that this file never defines, so contains() raised NameError.
    # NOTE(review): 1e-4 is a chosen value -- confirm it matches the tolerance
    # used elsewhere in the project.
    _EPSILON = 1e-4

    def __init__(self, speed):
        """Constructor.

        Args:
            speed: Float. Speed of the sampled velocities.
        """
        self._speed = speed

    def sample(self, rng=None):
        """Sample a velocity spec.

        Args:
            rng: Random number generator, passed through self._get_rng().
                Defaults to None so the distribution can be sampled without
                an explicit generator.

        Returns:
            Dictionary with keys 'x_vel' and 'y_vel'.
        """
        rng = self._get_rng(rng)
        theta = rng.uniform(0., 2 * np.pi)
        x_vel = self._speed * np.cos(theta)
        y_vel = self._speed * np.sin(theta)
        return {'x_vel': x_vel, 'y_vel': y_vel}

    def contains(self, spec):
        """Return whether spec holds a velocity with this distribution's speed.

        Args:
            spec: Dictionary. Must contain 'x_vel' and 'y_vel' to be
                considered; other keys are ignored.

        Returns:
            Bool. True iff both velocity keys are present and the velocity
            norm is within _EPSILON of the configured speed.
        """
        if 'x_vel' not in spec or 'y_vel' not in spec:
            return False
        vel_norm = np.linalg.norm([spec['x_vel'], spec['y_vel']])
        return bool(np.abs(vel_norm - self._speed) < self._EPSILON)

    def to_str(self, indent):
        """Return a description of this distribution, indented by `indent`."""
        s = 'RadialVelocity({})'.format(self._speed)
        return indent * ' ' + s

    @property
    def keys(self):
        """The set of keys in specs sampled from this distribution."""
        return set(['x_vel', 'y_vel'])
class ChangeTargetFeature(gr.AbstractRule):
    """Game rule to change the first target's oriented bar."""

    def step(self, state, meta_state):
        """Rotate the first target's bar by 90 degrees.

        Args:
            state: Environment state. Its 'bars' entry must be an indexable
                collection whose first element has an `angle` attribute.
            meta_state: Unused.
        """
        # The docstring must be the first statement of the function; placing
        # `del meta_state` before it turned it into a discarded expression.
        del meta_state  # Unused.
        first_bar = state['bars'][0]
        first_bar.angle = first_bar.angle + 0.5 * np.pi
def get_config(num_targets):
    """Build the environment config for the tracking-with-feature task.

    Args:
        num_targets: Int. Number of targets, i.e. circles that each carry an
            oriented bar.

    Returns:
        Dictionary with keys 'state_initializer', 'physics', 'task',
        'action_space', 'observers', 'game_rules' and
        'meta_state_initializer'.
    """

    # ------------------------------------------------------------------------
    # Sprite initialization
    # ------------------------------------------------------------------------

    # Circles: position sampled in [0.1, 0.9] on each axis, heading sampled
    # by RadialVelocity at speed 0.01.
    circle_distribution = distribs.Product(
        [
            distribs.Continuous('x', 0.1, 0.9),
            distribs.Continuous('y', 0.1, 0.9),
            RadialVelocity(speed=0.01),
        ],
        scale=0.1, shape='circle', c0=0., c1=0., c2=0.9,
    )

    # Factors shared by every bar; position, velocity and angle are filled
    # in per target inside the state initializer.
    bar_kwargs = {
        'scale': 0.1,
        'shape': 'square',
        'aspect_ratio': 0.3,
        'c0': 0.,
        'c1': 0.,
        'c2': 0.2,
    }

    # Wall polygons, in the order bottom, top, left, right.
    wall_polygons = (
        [[-1, 0], [2, 0], [2, -1], [-1, -1]],
        [[-1, 1], [2, 1], [2, 2], [-1, 2]],
        [[0, -1], [0, 4], [-1, 4], [-1, -1]],
        [[1, -1], [1, 4], [2, 4], [2, -1]],
    )
    walls = [
        sprite.Sprite(shape=np.array(polygon), x=0, y=0, c0=0., c1=0., c2=0.5)
        for polygon in wall_polygons
    ]

    # The occluder starts with opacity 0; the tracking phase makes it opaque.
    occluder_kwargs = {
        'x': 0.5, 'y': 0.5, 'c0': 0.6, 'c1': 0.25, 'c2': 0.5, 'opacity': 0,
    }

    # Vertices of the cross used for both the agent and the fixation sprite.
    cross_shape = 0.1 * np.array([
        [-5, 1], [-1, 1], [-1, 5], [1, 5], [1, 1], [5, 1],
        [5, -1], [1, -1], [1, -5], [-1, -5], [-1, -1], [-5, -1],
    ])

    def state_initializer():
        """Create a fresh environment state, keyed by layer name."""
        fixation_sprite = sprite.Sprite(
            x=0.5, y=0.5, shape=cross_shape, scale=0.1, c0=0., c1=0., c2=0.)
        screen_sprite = sprite.Sprite(
            x=0.5, y=0.5, shape='square', scale=2., c0=0., c1=0., c2=1.)
        agent_sprite = sprite.Sprite(
            x=0.5, y=0.5, scale=0.04, shape=cross_shape,
            c0=0.33, c1=1., c2=1.)
        occluder_sprite = sprite.Sprite(
            shape=shapes.annulus_vertices(0.13, 2.), **occluder_kwargs)

        target_sprites = [
            sprite.Sprite(**circle_distribution.sample())
            for _ in range(num_targets)
        ]

        # Each bar starts at angle 0 or pi / 2 with equal probability.
        initial_angles = (
            0.5 * np.pi * np.random.binomial(1, 0.5, (num_targets)))
        bar_sprites = [
            sprite.Sprite(
                x=target.x, y=target.y,
                x_vel=target.x_vel, y_vel=target.y_vel,
                angle=bar_angle, **bar_kwargs)
            for target, bar_angle in zip(target_sprites, initial_angles)
        ]

        # Insertion order of the layers is kept exactly as in the original.
        layers = collections.OrderedDict()
        layers['walls'] = walls
        layers['targets'] = target_sprites
        layers['bars'] = bar_sprites
        layers['occluder'] = [occluder_sprite]
        layers['screen'] = [screen_sprite]
        layers['fixation'] = [fixation_sprite]
        layers['agent'] = [agent_sprite]
        return layers

    # ------------------------------------------------------------------------
    # Physics
    # ------------------------------------------------------------------------

    wall_bounce = physics_lib.Collision(
        elasticity=1., symmetric=False, update_angle_vel=False)
    bar_tether = physics_lib.TetherZippedLayers(
        layer_names=('targets', 'bars'), update_angle_vel=False)
    physics = physics_lib.Physics(
        (wall_bounce, 'targets', 'walls'),
        updates_per_env_step=10,
        corrective_physics=[bar_tether],
    )

    # ------------------------------------------------------------------------
    # Task
    # ------------------------------------------------------------------------

    def _in_reward_phase(_, meta_state):
        return meta_state['phase'] == 'reward'

    task = tasks.Reset(
        condition=_in_reward_phase,
        reward_fn=lambda _: 1,
        steps_after_condition=10,
    )

    # ------------------------------------------------------------------------
    # Action space
    # ------------------------------------------------------------------------

    action_space = action_spaces.SetPosition(
        action_layers=('agent', 'occluder'))

    # ------------------------------------------------------------------------
    # Observer
    # ------------------------------------------------------------------------

    image_observer = observers.PILRenderer(
        image_size=(64, 64),
        anti_aliasing=1,
        color_to_rgb=observers.color_maps.hsv_to_rgb,
    )

    # ------------------------------------------------------------------------
    # Game rules
    # ------------------------------------------------------------------------

    # Fixation phase: ends once meta_state['fixation_duration'] reaches 15.
    def _fixation_done(_, meta_state):
        return meta_state['fixation_duration'] >= 15

    fixation_phase = gr.Phase(
        continual_rules=gr.Fixation(
            'agent', 'fixation', _FIXATION_THRESHOLD, 'fixation_duration'),
        end_condition=_fixation_done,
        name='fixation',
    )

    # Visible phase: the fixation cross and the screen are removed.
    remove_fixation = gr.VanishByFilter('fixation', lambda _: True)
    remove_screen = gr.VanishByFilter('screen', lambda _: True)
    visible_phase = gr.Phase(
        one_time_rules=[remove_fixation, remove_screen],
        duration=5,
        name='visible',
    )

    # Tracking phase: the occluder becomes opaque for a random duration.
    def _set_opaque(s):
        s.opacity = 255

    tracking_phase = gr.Phase(
        one_time_rules=gr.ModifySprites('occluder', _set_opaque),
        duration=lambda: np.random.randint(40, 80),
        name='tracking',
    )

    # Change phase: the first bar is rotated; the phase ends once
    # meta_state['response_duration'] reaches 30.
    def _response_done(_, meta_state):
        return meta_state['response_duration'] >= 30

    change_phase = gr.Phase(
        one_time_rules=ChangeTargetFeature(),
        continual_rules=gr.Fixation(
            'agent', 'targets', _FIXATION_THRESHOLD, 'response_duration'),
        name='change',
        end_condition=_response_done,
    )

    # Reward phase: the occluder is hidden and targets and bars are stopped.
    def _set_transparent(s):
        s.opacity = 0

    def _stop(s):
        s.velocity = np.zeros(2)

    hide_occluder = gr.ModifySprites('occluder', _set_transparent)
    freeze_targets = gr.ModifySprites(('targets', 'bars'), _stop)
    reward_phase = gr.Phase(
        one_time_rules=(hide_occluder, freeze_targets),
        name='reward',
    )

    phase_sequence = gr.PhaseSequence(
        fixation_phase,
        visible_phase,
        tracking_phase,
        change_phase,
        reward_phase,
        meta_state_phase_name_key='phase',
    )

    # ------------------------------------------------------------------------
    # Final config
    # ------------------------------------------------------------------------

    return {
        'state_initializer': state_initializer,
        'physics': physics,
        'task': task,
        'action_space': action_space,
        'observers': {'image': image_observer, 'state': observers.RawState()},
        'game_rules': (phase_sequence,),
        'meta_state_initializer': lambda: {'phase': ''},
    }
Functions
def get_config(num_targets)
-
Get environment config.
Args
num_targets
- Int. Number of targets.
Expand source code
def get_config(num_targets):
    """Build the environment config for the tracking-with-feature task.

    Args:
        num_targets: Int. Number of targets, i.e. circles that each carry an
            oriented bar.

    Returns:
        Dictionary with keys 'state_initializer', 'physics', 'task',
        'action_space', 'observers', 'game_rules' and
        'meta_state_initializer'.
    """

    # ------------------------------------------------------------------------
    # Sprite initialization
    # ------------------------------------------------------------------------

    # Circles: position sampled in [0.1, 0.9] on each axis, heading sampled
    # by RadialVelocity at speed 0.01.
    circle_distribution = distribs.Product(
        [
            distribs.Continuous('x', 0.1, 0.9),
            distribs.Continuous('y', 0.1, 0.9),
            RadialVelocity(speed=0.01),
        ],
        scale=0.1, shape='circle', c0=0., c1=0., c2=0.9,
    )

    # Factors shared by every bar; position, velocity and angle are filled
    # in per target inside the state initializer.
    bar_kwargs = {
        'scale': 0.1,
        'shape': 'square',
        'aspect_ratio': 0.3,
        'c0': 0.,
        'c1': 0.,
        'c2': 0.2,
    }

    # Wall polygons, in the order bottom, top, left, right.
    wall_polygons = (
        [[-1, 0], [2, 0], [2, -1], [-1, -1]],
        [[-1, 1], [2, 1], [2, 2], [-1, 2]],
        [[0, -1], [0, 4], [-1, 4], [-1, -1]],
        [[1, -1], [1, 4], [2, 4], [2, -1]],
    )
    walls = [
        sprite.Sprite(shape=np.array(polygon), x=0, y=0, c0=0., c1=0., c2=0.5)
        for polygon in wall_polygons
    ]

    # The occluder starts with opacity 0; the tracking phase makes it opaque.
    occluder_kwargs = {
        'x': 0.5, 'y': 0.5, 'c0': 0.6, 'c1': 0.25, 'c2': 0.5, 'opacity': 0,
    }

    # Vertices of the cross used for both the agent and the fixation sprite.
    cross_shape = 0.1 * np.array([
        [-5, 1], [-1, 1], [-1, 5], [1, 5], [1, 1], [5, 1],
        [5, -1], [1, -1], [1, -5], [-1, -5], [-1, -1], [-5, -1],
    ])

    def state_initializer():
        """Create a fresh environment state, keyed by layer name."""
        fixation_sprite = sprite.Sprite(
            x=0.5, y=0.5, shape=cross_shape, scale=0.1, c0=0., c1=0., c2=0.)
        screen_sprite = sprite.Sprite(
            x=0.5, y=0.5, shape='square', scale=2., c0=0., c1=0., c2=1.)
        agent_sprite = sprite.Sprite(
            x=0.5, y=0.5, scale=0.04, shape=cross_shape,
            c0=0.33, c1=1., c2=1.)
        occluder_sprite = sprite.Sprite(
            shape=shapes.annulus_vertices(0.13, 2.), **occluder_kwargs)

        target_sprites = [
            sprite.Sprite(**circle_distribution.sample())
            for _ in range(num_targets)
        ]

        # Each bar starts at angle 0 or pi / 2 with equal probability.
        initial_angles = (
            0.5 * np.pi * np.random.binomial(1, 0.5, (num_targets)))
        bar_sprites = [
            sprite.Sprite(
                x=target.x, y=target.y,
                x_vel=target.x_vel, y_vel=target.y_vel,
                angle=bar_angle, **bar_kwargs)
            for target, bar_angle in zip(target_sprites, initial_angles)
        ]

        # Insertion order of the layers is kept exactly as in the original.
        layers = collections.OrderedDict()
        layers['walls'] = walls
        layers['targets'] = target_sprites
        layers['bars'] = bar_sprites
        layers['occluder'] = [occluder_sprite]
        layers['screen'] = [screen_sprite]
        layers['fixation'] = [fixation_sprite]
        layers['agent'] = [agent_sprite]
        return layers

    # ------------------------------------------------------------------------
    # Physics
    # ------------------------------------------------------------------------

    wall_bounce = physics_lib.Collision(
        elasticity=1., symmetric=False, update_angle_vel=False)
    bar_tether = physics_lib.TetherZippedLayers(
        layer_names=('targets', 'bars'), update_angle_vel=False)
    physics = physics_lib.Physics(
        (wall_bounce, 'targets', 'walls'),
        updates_per_env_step=10,
        corrective_physics=[bar_tether],
    )

    # ------------------------------------------------------------------------
    # Task
    # ------------------------------------------------------------------------

    def _in_reward_phase(_, meta_state):
        return meta_state['phase'] == 'reward'

    task = tasks.Reset(
        condition=_in_reward_phase,
        reward_fn=lambda _: 1,
        steps_after_condition=10,
    )

    # ------------------------------------------------------------------------
    # Action space
    # ------------------------------------------------------------------------

    action_space = action_spaces.SetPosition(
        action_layers=('agent', 'occluder'))

    # ------------------------------------------------------------------------
    # Observer
    # ------------------------------------------------------------------------

    image_observer = observers.PILRenderer(
        image_size=(64, 64),
        anti_aliasing=1,
        color_to_rgb=observers.color_maps.hsv_to_rgb,
    )

    # ------------------------------------------------------------------------
    # Game rules
    # ------------------------------------------------------------------------

    # Fixation phase: ends once meta_state['fixation_duration'] reaches 15.
    def _fixation_done(_, meta_state):
        return meta_state['fixation_duration'] >= 15

    fixation_phase = gr.Phase(
        continual_rules=gr.Fixation(
            'agent', 'fixation', _FIXATION_THRESHOLD, 'fixation_duration'),
        end_condition=_fixation_done,
        name='fixation',
    )

    # Visible phase: the fixation cross and the screen are removed.
    remove_fixation = gr.VanishByFilter('fixation', lambda _: True)
    remove_screen = gr.VanishByFilter('screen', lambda _: True)
    visible_phase = gr.Phase(
        one_time_rules=[remove_fixation, remove_screen],
        duration=5,
        name='visible',
    )

    # Tracking phase: the occluder becomes opaque for a random duration.
    def _set_opaque(s):
        s.opacity = 255

    tracking_phase = gr.Phase(
        one_time_rules=gr.ModifySprites('occluder', _set_opaque),
        duration=lambda: np.random.randint(40, 80),
        name='tracking',
    )

    # Change phase: the first bar is rotated; the phase ends once
    # meta_state['response_duration'] reaches 30.
    def _response_done(_, meta_state):
        return meta_state['response_duration'] >= 30

    change_phase = gr.Phase(
        one_time_rules=ChangeTargetFeature(),
        continual_rules=gr.Fixation(
            'agent', 'targets', _FIXATION_THRESHOLD, 'response_duration'),
        name='change',
        end_condition=_response_done,
    )

    # Reward phase: the occluder is hidden and targets and bars are stopped.
    def _set_transparent(s):
        s.opacity = 0

    def _stop(s):
        s.velocity = np.zeros(2)

    hide_occluder = gr.ModifySprites('occluder', _set_transparent)
    freeze_targets = gr.ModifySprites(('targets', 'bars'), _stop)
    reward_phase = gr.Phase(
        one_time_rules=(hide_occluder, freeze_targets),
        name='reward',
    )

    phase_sequence = gr.PhaseSequence(
        fixation_phase,
        visible_phase,
        tracking_phase,
        change_phase,
        reward_phase,
        meta_state_phase_name_key='phase',
    )

    # ------------------------------------------------------------------------
    # Final config
    # ------------------------------------------------------------------------

    return {
        'state_initializer': state_initializer,
        'physics': physics,
        'task': task,
        'action_space': action_space,
        'observers': {'image': image_observer, 'state': observers.RawState()},
        'game_rules': (phase_sequence,),
        'meta_state_initializer': lambda: {'phase': ''},
    }
Classes
class ChangeTargetFeature
-
Game rule to change the first target's oriented bar.
Expand source code
class ChangeTargetFeature(gr.AbstractRule):
    """Game rule to change the first target's oriented bar."""

    def step(self, state, meta_state):
        """Rotate the first target's bar by 90 degrees.

        Args:
            state: Environment state. Its 'bars' entry must be an indexable
                collection whose first element has an `angle` attribute.
            meta_state: Unused.
        """
        # The docstring must be the first statement of the function; placing
        # `del meta_state` before it turned it into a discarded expression.
        del meta_state  # Unused.
        first_bar = state['bars'][0]
        first_bar.angle = first_bar.angle + 0.5 * np.pi
Ancestors
- moog.game_rules.abstract_rule.AbstractRule
- abc.ABC
Methods
def step(self, state, meta_state)
-
Apply rule to the environment state.
This method can in-place modify the state however it likes.
Args
state
- OrderedDict of iterables of sprites. Environment state.
meta_state
- meta_state of environment.
Expand source code
def step(self, state, meta_state):
    """Rotate the first target's bar by 90 degrees.

    Args:
        state: Environment state. Its 'bars' entry must be an indexable
            collection whose first element has an `angle` attribute.
        meta_state: Unused.
    """
    # The docstring must be the first statement of the function; placing
    # `del meta_state` before it turned it into a discarded expression.
    del meta_state  # Unused.
    first_bar = state['bars'][0]
    first_bar.angle = first_bar.angle + 0.5 * np.pi
class RadialVelocity (speed)
-
Radial velocity distribution.
Constructor.
Args
speed
- Float. Speed of the sampled velocities.
Expand source code
class RadialVelocity(distribs.AbstractDistribution):
    """Distribution over 2D velocities with fixed speed and random heading.

    Sampled specs are dictionaries with keys 'x_vel' and 'y_vel'. The heading
    angle is drawn uniformly from [0, 2 * pi) and the velocity magnitude
    always equals the configured speed.
    """

    # Absolute tolerance used by contains() when comparing a spec's speed to
    # the configured speed. The original code referenced a module-level
    # `_EPSILON` that this file never defines, so contains() raised NameError.
    # NOTE(review): 1e-4 is a chosen value -- confirm it matches the tolerance
    # used elsewhere in the project.
    _EPSILON = 1e-4

    def __init__(self, speed):
        """Constructor.

        Args:
            speed: Float. Speed of the sampled velocities.
        """
        self._speed = speed

    def sample(self, rng=None):
        """Sample a velocity spec.

        Args:
            rng: Random number generator, passed through self._get_rng().
                Defaults to None so the distribution can be sampled without
                an explicit generator.

        Returns:
            Dictionary with keys 'x_vel' and 'y_vel'.
        """
        rng = self._get_rng(rng)
        theta = rng.uniform(0., 2 * np.pi)
        x_vel = self._speed * np.cos(theta)
        y_vel = self._speed * np.sin(theta)
        return {'x_vel': x_vel, 'y_vel': y_vel}

    def contains(self, spec):
        """Return whether spec holds a velocity with this distribution's speed.

        Args:
            spec: Dictionary. Must contain 'x_vel' and 'y_vel' to be
                considered; other keys are ignored.

        Returns:
            Bool. True iff both velocity keys are present and the velocity
            norm is within _EPSILON of the configured speed.
        """
        if 'x_vel' not in spec or 'y_vel' not in spec:
            return False
        vel_norm = np.linalg.norm([spec['x_vel'], spec['y_vel']])
        return bool(np.abs(vel_norm - self._speed) < self._EPSILON)

    def to_str(self, indent):
        """Return a description of this distribution, indented by `indent`."""
        s = 'RadialVelocity({})'.format(self._speed)
        return indent * ' ' + s

    @property
    def keys(self):
        """The set of keys in specs sampled from this distribution."""
        return set(['x_vel', 'y_vel'])
Ancestors
- moog.state_initialization.distributions.AbstractDistribution
- abc.ABC
Instance variables
var keys
-
The set of keys in specs sampled from this distribution.
Expand source code
@property
def keys(self):
    """Set of spec keys produced by this distribution."""
    return {'x_vel', 'y_vel'}
Methods
def contains(self, spec)
-
Return whether distribution contains spec dictionary.
Expand source code
def contains(self, spec):
    """Return whether spec holds a velocity with this distribution's speed.

    Args:
        spec: Dictionary. Must contain 'x_vel' and 'y_vel' to be considered;
            other keys are ignored.

    Returns:
        Bool. True iff both velocity keys are present and the velocity norm
        is within the tolerance of the configured speed.
    """
    # The original compared against a module-level `_EPSILON` that the file
    # never defines, so every call with both keys present raised NameError.
    # NOTE(review): 1e-4 is a chosen value -- confirm it matches the
    # tolerance used elsewhere in the project.
    tolerance = 1e-4
    if 'x_vel' not in spec or 'y_vel' not in spec:
        return False
    vel_norm = np.linalg.norm([spec['x_vel'], spec['y_vel']])
    return bool(np.abs(vel_norm - self._speed) < tolerance)
def sample(self, rng)
-
Sample a spec from this distribution. Returns a dictionary.
Args
rng
- Random number generator. Fed into self._get_rng(), if None defaults to np.random.
Expand source code
def sample(self, rng=None):
    """Sample a velocity spec with fixed speed and uniformly random heading.

    Args:
        rng: Random number generator, passed through self._get_rng().
            Defaults to None so the distribution can be sampled without an
            explicit generator.

    Returns:
        Dictionary with keys 'x_vel' and 'y_vel'.
    """
    rng = self._get_rng(rng)
    # Heading angle in radians, drawn uniformly from [0, 2 * pi).
    theta = rng.uniform(0., 2 * np.pi)
    x_vel = self._speed * np.cos(theta)
    y_vel = self._speed * np.sin(theta)
    return {'x_vel': x_vel, 'y_vel': y_vel}
def to_str(self, indent)
-
Recursive string description of this distribution.
Expand source code
def to_str(self, indent):
    """Describe this distribution as a string indented by `indent` spaces."""
    padding = ' ' * indent
    description = 'RadialVelocity({})'.format(self._speed)
    return padding + description