Module moog_demos.example_configs.match_to_sample
Match-to-sample task.
In this task there are some colored circles (targets) on a ring, and an agent avatar in the center of the ring. After an initial stimulus period where the colored targets are visible, they all turn grey and begin rotating together. After some time they stop and a colored cue appears on the agent avatar. The subject must identify the target that had the same color as the cue and respond by navigating towards that target.
This task requires workimg memory of multiple objects with features (colors).
Expand source code
"""Match-to-sample task.
In this task there are some colored circles (targets) on a ring, and an agent
avatar in the center of the ring. After an initial stimulus period where the
colored targets are visible, they all turn grey and begin rotating together.
After some time they stop and a colored cue appears on the agent avatar. The
subject must identify the target that had the same color as the cue and respond
by navigating towards that target.
This task requires workimg memory of multiple objects with features (colors).
"""
import collections
import copy
import functools
import numpy as np
from moog import action_spaces
from moog import game_rules as gr
from moog import observers
from moog import physics as physics_lib
from moog import shapes
from moog import sprite
from moog import tasks
from moog.state_initialization import distributions as distribs
def _get_polygon(num_sides, min_angle):
"""Get polygon vertices centered around 0 with radius 1."""
# Get vertex angles
angles = [0.]
def _get_random_angle():
return np.random.uniform(min_angle, 2 * np.pi - min_angle)
for _ in range(num_sides - 1):
valid_angle = False
while not valid_angle:
angle = _get_random_angle()
if all([np.abs(angle - a) > min_angle for a in angles]):
valid_angle = True
angles.append(angle)
# Sort and randomly rotate the angles
angles = np.sort(angles)
angles += np.random.uniform(0., 2 * np.pi)
# Generate the vertices
vertices = np.stack((np.sin(angles), np.cos(angles)), axis=1)
return vertices
class BeginMotion(gr.AbstractRule):
"""Custom rule for motion initiation."""
def __init__(self, angle_vel_range):
"""Constructor.
Args:
angle_vel_range: Tuple of non-negative floats. Range of angular
velocity magnitudes, in radians per step.
"""
self._angle_vel_range = angle_vel_range
def step(self, state, meta_state):
del meta_state
targets = state['targets']
covers = state['covers']
angle_vel = np.random.uniform(*self._angle_vel_range)
angle_vel *= (2 * np.random.randint(2) - 1)
for t, c in zip(targets, covers):
relative_pos = t.position - 0.5
perpendicular = np.matmul(np.array([[0, -1], [1, 0]]), relative_pos)
radius = np.linalg.norm(relative_pos)
vel = perpendicular * radius * angle_vel
t.velocity = vel
c.velocity = vel
def get_config(num_targets):
"""Get environment config.
Args:
num_targets: Int. Number of targets.
"""
if num_targets == 0 or not isinstance(num_targets, int):
raise ValueError(
f'num_targets is {num_targets}, but must be a positive integer')
############################################################################
# State initialization
############################################################################
screen = sprite.Sprite(
x=0.5, y=0.5, shape='square', scale=2., c0=0.6, c1=0.7, c2=0.7)
target_factor_distrib = distribs.Product(
[distribs.Continuous('c0', 0., 1.)],
shape='circle', scale=0.085, c1=1., c2=1.,
)
cover_factors = dict(
mass=0., shape='circle', scale=0.1, c0=0., c1=0., c2=0.5, opacity=0)
def state_initializer():
"""State initializer method to be fed into environment."""
# Get targets and covers
sprite_positions = 0.5 + 0.35 * _get_polygon(num_targets, 0.7)
target_factors = [
target_factor_distrib.sample() for _ in range(num_targets)]
targets = [
sprite.Sprite(x=pos[0], y=pos[1], **factors)
for pos, factors in zip(sprite_positions, target_factors)
]
covers = [
sprite.Sprite(x=pos[0], y=pos[1], **cover_factors)
for pos in sprite_positions
]
# Tag the cover metadata based on whether they are prey or not
for i, s in enumerate(covers):
if i == 0:
s.metadata = {'prey': True}
else:
s.metadata = {'prey': False}
# Make cue have the same factors as the first target, except slightly
# smaller
cue_factors = copy.deepcopy(target_factors[0])
cue_factors['scale'] = 0.7 * target_factors[0]['scale']
cue = sprite.Sprite(
x=0.5, y=0.501, opacity=0, mass=np.inf, **cue_factors)
agent = sprite.Sprite(
x=0.5, y=0.5, shape='circle', scale=0.1, c0=0.4, c1=0., c2=1.,
mass=np.inf)
annulus_verts = shapes.annulus_vertices(0.34, 0.36)
annulus = sprite.Sprite(
x=0.5, y=0.5, shape=annulus_verts, scale=1., c0=0., c1=0., c2=0.3)
state = collections.OrderedDict([
('annulus', [annulus]),
('targets', targets),
('covers', covers),
('agent', [agent]),
('cue', [cue]),
('screen', [screen]),
])
return state
################################################################################
# Physics
################################################################################
drag = (physics_lib.Drag(coeff_friction=0.25), ['agent', 'cue'])
tether_covers = physics_lib.TetherZippedLayers(
('targets', 'covers'), anchor=np.array([0.5, 0.5]))
physics = physics_lib.Physics(
drag,
updates_per_env_step=1,
corrective_physics=[tether_covers],
)
################################################################################
# Task
################################################################################
contact_task = tasks.ContactReward(
reward_fn=lambda _, s: 1 if s.metadata['prey'] else -1,
layers_0='agent',
layers_1='covers',
)
def _should_reset(state, meta_state):
should_reset = (
state['covers'][0].opacity == 0 and
meta_state['phase'] == 'response'
)
return should_reset
reset_task = tasks.Reset(
condition=_should_reset,
steps_after_condition=15,
)
task = tasks.CompositeTask(contact_task, reset_task, timeout_steps=800)
################################################################################
# Action Space
################################################################################
action_space = action_spaces.Joystick(
scaling_factor=0.01, action_layers=['agent', 'cue'])
################################################################################
# Observer
################################################################################
_polygon_modifier = observers.polygon_modifiers.FirstPersonAgent(
agent_layer='agent')
observer = observers.PILRenderer(
image_size=(64, 64),
anti_aliasing=1,
color_to_rgb='hsv_to_rgb',
polygon_modifier=_polygon_modifier,
)
############################################################################
# Game rules
############################################################################
def _make_opaque(s):
s.opacity = 255
def _make_transparent(s):
s.opacity = 0
# Screen Phase
screen_phase = gr.Phase(duration=1, name='screen')
# Visible Phase
disappear_screen = gr.ModifySprites('screen', _make_transparent)
visible_phase = gr.Phase(
one_time_rules=disappear_screen, duration=2, name='visible')
# Motion Phase
def _move(s):
s.velocity = np.random.uniform(-0.25, 0.25, size=(2,))
cover_targets = gr.ModifySprites('covers', _make_opaque)
begin_motion = BeginMotion(angle_vel_range=(0.1, 0.3))
motion_phase = gr.Phase(
one_time_rules=[cover_targets, begin_motion],
duration=100,
name='motion',
)
# Response Phase
def _stop(s):
s.angle_vel = 0.
s.velocity = np.zeros(2)
def _unglue(s):
s.mass = 1.
appear_cue = gr.ModifySprites('cue', _make_opaque)
stop_targets = gr.ModifySprites(('targets', 'covers'), _stop)
unglue_agent = gr.ModifySprites(('agent', 'cue'), _unglue)
make_targets_discoverable = gr.ModifyOnContact(
layers_0='agent', layers_1='covers', modifier_1=_make_transparent)
response_phase = gr.Phase(
one_time_rules=[appear_cue, stop_targets, unglue_agent],
continual_rules=make_targets_discoverable,
name='response',
)
phase_sequence = gr.PhaseSequence(
screen_phase, visible_phase, motion_phase, response_phase,
meta_state_phase_name_key='phase',
)
############################################################################
# Final config
############################################################################
config = {
'state_initializer': state_initializer,
'physics': physics,
'task': task,
'action_space': action_space,
'observers': {'image': observer},
'game_rules': (phase_sequence,),
'meta_state_initializer': lambda: {'phase': ''}
}
return config
Functions
def get_config(num_targets)
-
Get environment config.
Args
num_targets
- Int. Number of targets.
Expand source code
def get_config(num_targets): """Get environment config. Args: num_targets: Int. Number of targets. """ if num_targets == 0 or not isinstance(num_targets, int): raise ValueError( f'num_targets is {num_targets}, but must be a positive integer') ############################################################################ # State initialization ############################################################################ screen = sprite.Sprite( x=0.5, y=0.5, shape='square', scale=2., c0=0.6, c1=0.7, c2=0.7) target_factor_distrib = distribs.Product( [distribs.Continuous('c0', 0., 1.)], shape='circle', scale=0.085, c1=1., c2=1., ) cover_factors = dict( mass=0., shape='circle', scale=0.1, c0=0., c1=0., c2=0.5, opacity=0) def state_initializer(): """State initializer method to be fed into environment.""" # Get targets and covers sprite_positions = 0.5 + 0.35 * _get_polygon(num_targets, 0.7) target_factors = [ target_factor_distrib.sample() for _ in range(num_targets)] targets = [ sprite.Sprite(x=pos[0], y=pos[1], **factors) for pos, factors in zip(sprite_positions, target_factors) ] covers = [ sprite.Sprite(x=pos[0], y=pos[1], **cover_factors) for pos in sprite_positions ] # Tag the cover metadata based on whether they are prey or not for i, s in enumerate(covers): if i == 0: s.metadata = {'prey': True} else: s.metadata = {'prey': False} # Make cue have the same factors as the first target, except slightly # smaller cue_factors = copy.deepcopy(target_factors[0]) cue_factors['scale'] = 0.7 * target_factors[0]['scale'] cue = sprite.Sprite( x=0.5, y=0.501, opacity=0, mass=np.inf, **cue_factors) agent = sprite.Sprite( x=0.5, y=0.5, shape='circle', scale=0.1, c0=0.4, c1=0., c2=1., mass=np.inf) annulus_verts = shapes.annulus_vertices(0.34, 0.36) annulus = sprite.Sprite( x=0.5, y=0.5, shape=annulus_verts, scale=1., c0=0., c1=0., c2=0.3) state = collections.OrderedDict([ ('annulus', [annulus]), ('targets', targets), ('covers', covers), ('agent', [agent]), ('cue', [cue]), ('screen', [screen]), ]) return state ################################################################################ # Physics ################################################################################ drag = (physics_lib.Drag(coeff_friction=0.25), ['agent', 'cue']) tether_covers = physics_lib.TetherZippedLayers( ('targets', 'covers'), anchor=np.array([0.5, 0.5])) physics = physics_lib.Physics( drag, updates_per_env_step=1, corrective_physics=[tether_covers], ) ################################################################################ # Task ################################################################################ contact_task = tasks.ContactReward( reward_fn=lambda _, s: 1 if s.metadata['prey'] else -1, layers_0='agent', layers_1='covers', ) def _should_reset(state, meta_state): should_reset = ( state['covers'][0].opacity == 0 and meta_state['phase'] == 'response' ) return should_reset reset_task = tasks.Reset( condition=_should_reset, steps_after_condition=15, ) task = tasks.CompositeTask(contact_task, reset_task, timeout_steps=800) ################################################################################ # Action Space ################################################################################ action_space = action_spaces.Joystick( scaling_factor=0.01, action_layers=['agent', 'cue']) ################################################################################ # Observer ################################################################################ _polygon_modifier = observers.polygon_modifiers.FirstPersonAgent( agent_layer='agent') observer = observers.PILRenderer( image_size=(64, 64), anti_aliasing=1, color_to_rgb='hsv_to_rgb', polygon_modifier=_polygon_modifier, ) ############################################################################ # Game rules ############################################################################ def _make_opaque(s): s.opacity = 255 def _make_transparent(s): s.opacity = 0 # Screen Phase screen_phase = gr.Phase(duration=1, name='screen') # Visible Phase disappear_screen = gr.ModifySprites('screen', _make_transparent) visible_phase = gr.Phase( one_time_rules=disappear_screen, duration=2, name='visible') # Motion Phase def _move(s): s.velocity = np.random.uniform(-0.25, 0.25, size=(2,)) cover_targets = gr.ModifySprites('covers', _make_opaque) begin_motion = BeginMotion(angle_vel_range=(0.1, 0.3)) motion_phase = gr.Phase( one_time_rules=[cover_targets, begin_motion], duration=100, name='motion', ) # Response Phase def _stop(s): s.angle_vel = 0. s.velocity = np.zeros(2) def _unglue(s): s.mass = 1. appear_cue = gr.ModifySprites('cue', _make_opaque) stop_targets = gr.ModifySprites(('targets', 'covers'), _stop) unglue_agent = gr.ModifySprites(('agent', 'cue'), _unglue) make_targets_discoverable = gr.ModifyOnContact( layers_0='agent', layers_1='covers', modifier_1=_make_transparent) response_phase = gr.Phase( one_time_rules=[appear_cue, stop_targets, unglue_agent], continual_rules=make_targets_discoverable, name='response', ) phase_sequence = gr.PhaseSequence( screen_phase, visible_phase, motion_phase, response_phase, meta_state_phase_name_key='phase', ) ############################################################################ # Final config ############################################################################ config = { 'state_initializer': state_initializer, 'physics': physics, 'task': task, 'action_space': action_space, 'observers': {'image': observer}, 'game_rules': (phase_sequence,), 'meta_state_initializer': lambda: {'phase': ''} } return config
Classes
class BeginMotion (angle_vel_range)
-
Custom rule for motion initiation.
Constructor.
Args
angle_vel_range
- Tuple of non-negative floats. Range of angular velocity magnitudes, in radians per step.
Expand source code
class BeginMotion(gr.AbstractRule): """Custom rule for motion initiation.""" def __init__(self, angle_vel_range): """Constructor. Args: angle_vel_range: Tuple of non-negative floats. Range of angular velocity magnitudes, in radians per step. """ self._angle_vel_range = angle_vel_range def step(self, state, meta_state): del meta_state targets = state['targets'] covers = state['covers'] angle_vel = np.random.uniform(*self._angle_vel_range) angle_vel *= (2 * np.random.randint(2) - 1) for t, c in zip(targets, covers): relative_pos = t.position - 0.5 perpendicular = np.matmul(np.array([[0, -1], [1, 0]]), relative_pos) radius = np.linalg.norm(relative_pos) vel = perpendicular * radius * angle_vel t.velocity = vel c.velocity = vel
Ancestors
- moog.game_rules.abstract_rule.AbstractRule
- abc.ABC
Methods
def step(self, state, meta_state)
-
Apply rule to the environment state.
This method can in-place modify the state however it likes.
Args
state
- OrderedDict of iterables of sprites. Environment state.
meta_state
- meta_state of environment.
Expand source code
def step(self, state, meta_state): del meta_state targets = state['targets'] covers = state['covers'] angle_vel = np.random.uniform(*self._angle_vel_range) angle_vel *= (2 * np.random.randint(2) - 1) for t, c in zip(targets, covers): relative_pos = t.position - 0.5 perpendicular = np.matmul(np.array([[0, -1], [1, 0]]), relative_pos) radius = np.linalg.norm(relative_pos) vel = perpendicular * radius * angle_vel t.velocity = vel c.velocity = vel