Module moog_demos.example_configs.functional_maze
Maze task with predators, prey, and boosters.
The predators (red circles) chase the agent. The agent receives reward for catching prey (yellow circles), which disappear upon capture. The boosters (blue triangles) temporarily increase the agent's speed. The portals (white squares) teleport the agent from one place to another.
Expand source code
"""Maze task with predators, prey, and boosters.
The predators (red circles) chase the agent. The agent receives reward for
catching prey (yellow circles), which disappear upon capture. The boosters (blue
triangles) temporarily increase the agent's speed. The portals (white squares)
teleport the agent from one place to another.
"""
import collections
import numpy as np
from moog import action_spaces
from moog import game_rules
from moog import observers
from moog import physics as physics_lib
from moog import shapes
from moog import sprite
from moog import tasks
from moog.state_initialization import distributions as distribs
from moog.state_initialization import sprite_generators
class Booster(game_rules.AbstractRule):
"""Game rule for boosters.
This rule makes the agent temporarily brigher in color and lighter in mass
(to move faster) upon contact with a booster sprite.
"""
def __init__(self,
mass_multiplier=0.4,
c2_multiplier=0.1,
boost_duration=60,
agent_layer='agent',
booster_layer='boosters'):
"""Constructor.
Args:
mass_multiplier: Float. Mass of agent after contacting a booster.
c2_multiplier: Float. Brightness increase factor upon contacting a
booster.
boost_duration: Int. Number of steps before boost ends.
agent_layer: String. Name of the agent layer in the state.
booster_layer: String. Name of the booster layer in the state.
"""
self._mass_multiplier = mass_multiplier
self._c2_multiplier = c2_multiplier
self.boost_duration = boost_duration
self._agent_layer = agent_layer
self._booster_layer = booster_layer
def reset(self, state, meta_state):
del state
del meta_state
self._steps_until_expire = np.inf
def _apply_change(self, agent):
agent.mass *= self._mass_multiplier
agent.c2 = 1. - (1. - agent.c2) * self._c2_multiplier
def _revert_change(self, agent):
agent.mass /= self._mass_multiplier
agent.c2 = 1. - (1. - agent.c2) / self._c2_multiplier
def step(self, state, meta_state):
del meta_state
self._steps_until_expire -= 1
agent = state[self._agent_layer][0]
if self._steps_until_expire == np.inf:
boosters = state[self._booster_layer]
if any([agent.overlaps_sprite(s) for s in boosters]):
self._apply_change(agent)
self._steps_until_expire = self.boost_duration
elif self._steps_until_expire <= 0:
self._revert_change(agent)
self._steps_until_expire = np.inf
def get_config(_):
"""Get environment config."""
############################################################################
# Sprite initialization
############################################################################
# Agent
agent_factors = distribs.Product(
[distribs.Continuous('x', 0.1, 0.9),
distribs.Continuous('y', 0.1, 0.9)],
shape='circle', scale=0.1, c0=0.33, c1=1., c2=0.7,
)
# Predators
predator_factors = distribs.Product(
[distribs.Continuous('x', 0.1, 0.9),
distribs.Continuous('y', 0.1, 0.9)],
shape='circle', scale=0.1, c0=0., c1=1., c2=0.8,
)
# Prey
prey_factors = distribs.Product(
[distribs.Continuous('x', 0.1, 0.9),
distribs.Continuous('y', 0.1, 0.9)],
shape='circle', scale=0.1, c0=0.2, c1=1., c2=1.,
)
# Boosters
booster_factors = distribs.Product(
[distribs.Continuous('x', 0.1, 0.9),
distribs.Continuous('y', 0.1, 0.9)],
shape='triangle', scale=0.1, c0=0.6, c1=1., c2=1.,
)
# Portals
portal_factors = dict(shape='square', scale=0.1, c0=0., c1=0., c2=0.95)
portal_sprites = [
sprite.Sprite(x=0.125, y=0.125, **portal_factors),
sprite.Sprite(x=0.875, y=0.875, **portal_factors),
]
# Walls
wall_color = dict(c0=0., c1=0., c2=0.5)
island_wall_shape_0 = np.array(
[[0.2, 0.2], [0.4, 0.2], [0.4, 0.4], [0.2, 0.4]])
island_wall_shapes = [
island_wall_shape_0,
island_wall_shape_0 + np.array([[0., 0.4]]),
island_wall_shape_0 + np.array([[0.4, 0.4]]),
island_wall_shape_0 + np.array([[0.4, 0.]]),
]
island_walls = [
sprite.Sprite(shape=shape, x=0., y=0., **wall_color)
for shape in island_wall_shapes
]
boundary_walls = shapes.border_walls(visible_thickness=0.05, **wall_color)
walls = boundary_walls + island_walls
# Callable sprite generators
agent_generator = sprite_generators.generate_sprites(
agent_factors, num_sprites=1)
predator_generator = sprite_generators.generate_sprites(
predator_factors, num_sprites=1)
prey_generator = sprite_generators.generate_sprites(
prey_factors, num_sprites=lambda: np.random.randint(2, 5))
booster_generator = sprite_generators.generate_sprites(
booster_factors, num_sprites=2)
# Create callable initializer returning entire state
def state_initializer():
portals = portal_sprites
agent = agent_generator(without_overlapping=walls)
predators = predator_generator(without_overlapping=walls + agent)
boosters = booster_generator(without_overlapping=walls + agent)
prey = prey_generator(without_overlapping=walls)
state = collections.OrderedDict([
('walls', walls),
('portals', portals),
('boosters', boosters),
('prey', prey),
('predators', predators),
('agent', agent),
])
return state
############################################################################
# Physics
############################################################################
agent_friction_force = physics_lib.Drag(coeff_friction=0.25)
predator_friction_force = physics_lib.Drag(coeff_friction=0.05)
predator_random_force = physics_lib.RandomForce(max_force_magnitude=0.02)
prey_friction_force = physics_lib.Drag(coeff_friction=0.02)
prey_random_force = physics_lib.RandomForce(max_force_magnitude=0.02)
predator_attraction = physics_lib.DistanceForce(
force_fn=physics_lib.linear_force_fn(zero_intercept=-0.002, slope=0.001)
)
asymmetric_collision = physics_lib.Collision(
elasticity=0.25, symmetric=False, update_angle_vel=False)
forces = (
(agent_friction_force, 'agent'),
(predator_friction_force, 'predators'),
(predator_random_force, 'predators'),
(prey_friction_force, 'prey'),
(prey_random_force, 'prey'),
(predator_attraction, 'agent', 'predators'),
(asymmetric_collision, ['agent', 'predators', 'prey'], 'walls'),
)
physics = physics_lib.Physics(*forces, updates_per_env_step=5)
############################################################################
# Task
############################################################################
predator_task = tasks.ContactReward(
-5, layers_0='agent', layers_1='predators', reset_steps_after_contact=0)
prey_task = tasks.ContactReward(1, layers_0='agent', layers_1='prey')
reset_task = tasks.Reset(
condition=lambda state: len(state['prey']) == 0,
steps_after_condition=5,
)
task = tasks.CompositeTask(
predator_task, prey_task, reset_task, timeout_steps=400)
############################################################################
# Action space
############################################################################
action_space = action_spaces.Joystick(
scaling_factor=0.01,
action_layers='agent',
)
############################################################################
# Observer
############################################################################
observer = observers.PILRenderer(
image_size=(64, 64),
anti_aliasing=1,
color_to_rgb='hsv_to_rgb',
)
############################################################################
# Game rules
############################################################################
disappear_rule = game_rules.VanishOnContact(
vanishing_layer='prey', contacting_layer='agent')
portal_rule = game_rules.Portal(
teleporting_layer='agent', portal_layer='portals')
rules = (disappear_rule, portal_rule, Booster())
############################################################################
# Final config
############################################################################
config = {
'state_initializer': state_initializer,
'physics': physics,
'task': task,
'action_space': action_space,
'observers': {'image': observer},
'game_rules': rules,
}
return config
Functions
def get_config(_)
-
Get environment config.
Expand source code
def get_config(_): """Get environment config.""" ############################################################################ # Sprite initialization ############################################################################ # Agent agent_factors = distribs.Product( [distribs.Continuous('x', 0.1, 0.9), distribs.Continuous('y', 0.1, 0.9)], shape='circle', scale=0.1, c0=0.33, c1=1., c2=0.7, ) # Predators predator_factors = distribs.Product( [distribs.Continuous('x', 0.1, 0.9), distribs.Continuous('y', 0.1, 0.9)], shape='circle', scale=0.1, c0=0., c1=1., c2=0.8, ) # Prey prey_factors = distribs.Product( [distribs.Continuous('x', 0.1, 0.9), distribs.Continuous('y', 0.1, 0.9)], shape='circle', scale=0.1, c0=0.2, c1=1., c2=1., ) # Boosters booster_factors = distribs.Product( [distribs.Continuous('x', 0.1, 0.9), distribs.Continuous('y', 0.1, 0.9)], shape='triangle', scale=0.1, c0=0.6, c1=1., c2=1., ) # Portals portal_factors = dict(shape='square', scale=0.1, c0=0., c1=0., c2=0.95) portal_sprites = [ sprite.Sprite(x=0.125, y=0.125, **portal_factors), sprite.Sprite(x=0.875, y=0.875, **portal_factors), ] # Walls wall_color = dict(c0=0., c1=0., c2=0.5) island_wall_shape_0 = np.array( [[0.2, 0.2], [0.4, 0.2], [0.4, 0.4], [0.2, 0.4]]) island_wall_shapes = [ island_wall_shape_0, island_wall_shape_0 + np.array([[0., 0.4]]), island_wall_shape_0 + np.array([[0.4, 0.4]]), island_wall_shape_0 + np.array([[0.4, 0.]]), ] island_walls = [ sprite.Sprite(shape=shape, x=0., y=0., **wall_color) for shape in island_wall_shapes ] boundary_walls = shapes.border_walls(visible_thickness=0.05, **wall_color) walls = boundary_walls + island_walls # Callable sprite generators agent_generator = sprite_generators.generate_sprites( agent_factors, num_sprites=1) predator_generator = sprite_generators.generate_sprites( predator_factors, num_sprites=1) prey_generator = sprite_generators.generate_sprites( prey_factors, num_sprites=lambda: np.random.randint(2, 5)) booster_generator = sprite_generators.generate_sprites( booster_factors, num_sprites=2) # Create callable initializer returning entire state def state_initializer(): portals = portal_sprites agent = agent_generator(without_overlapping=walls) predators = predator_generator(without_overlapping=walls + agent) boosters = booster_generator(without_overlapping=walls + agent) prey = prey_generator(without_overlapping=walls) state = collections.OrderedDict([ ('walls', walls), ('portals', portals), ('boosters', boosters), ('prey', prey), ('predators', predators), ('agent', agent), ]) return state ############################################################################ # Physics ############################################################################ agent_friction_force = physics_lib.Drag(coeff_friction=0.25) predator_friction_force = physics_lib.Drag(coeff_friction=0.05) predator_random_force = physics_lib.RandomForce(max_force_magnitude=0.02) prey_friction_force = physics_lib.Drag(coeff_friction=0.02) prey_random_force = physics_lib.RandomForce(max_force_magnitude=0.02) predator_attraction = physics_lib.DistanceForce( force_fn=physics_lib.linear_force_fn(zero_intercept=-0.002, slope=0.001) ) asymmetric_collision = physics_lib.Collision( elasticity=0.25, symmetric=False, update_angle_vel=False) forces = ( (agent_friction_force, 'agent'), (predator_friction_force, 'predators'), (predator_random_force, 'predators'), (prey_friction_force, 'prey'), (prey_random_force, 'prey'), (predator_attraction, 'agent', 'predators'), (asymmetric_collision, ['agent', 'predators', 'prey'], 'walls'), ) physics = physics_lib.Physics(*forces, updates_per_env_step=5) ############################################################################ # Task ############################################################################ predator_task = tasks.ContactReward( -5, layers_0='agent', layers_1='predators', reset_steps_after_contact=0) prey_task = tasks.ContactReward(1, layers_0='agent', layers_1='prey') reset_task = tasks.Reset( condition=lambda state: len(state['prey']) == 0, steps_after_condition=5, ) task = tasks.CompositeTask( predator_task, prey_task, reset_task, timeout_steps=400) ############################################################################ # Action space ############################################################################ action_space = action_spaces.Joystick( scaling_factor=0.01, action_layers='agent', ) ############################################################################ # Observer ############################################################################ observer = observers.PILRenderer( image_size=(64, 64), anti_aliasing=1, color_to_rgb='hsv_to_rgb', ) ############################################################################ # Game rules ############################################################################ disappear_rule = game_rules.VanishOnContact( vanishing_layer='prey', contacting_layer='agent') portal_rule = game_rules.Portal( teleporting_layer='agent', portal_layer='portals') rules = (disappear_rule, portal_rule, Booster()) ############################################################################ # Final config ############################################################################ config = { 'state_initializer': state_initializer, 'physics': physics, 'task': task, 'action_space': action_space, 'observers': {'image': observer}, 'game_rules': rules, } return config
Classes
class Booster (mass_multiplier=0.4, c2_multiplier=0.1, boost_duration=60, agent_layer='agent', booster_layer='boosters')
-
Game rule for boosters.
This rule makes the agent temporarily brigher in color and lighter in mass (to move faster) upon contact with a booster sprite.
Constructor.
Args
mass_multiplier
- Float. Mass of agent after contacting a booster.
c2_multiplier
- Float. Brightness increase factor upon contacting a booster.
boost_duration
- Int. Number of steps before boost ends.
agent_layer
- String. Name of the agent layer in the state.
booster_layer
- String. Name of the booster layer in the state.
Expand source code
class Booster(game_rules.AbstractRule): """Game rule for boosters. This rule makes the agent temporarily brigher in color and lighter in mass (to move faster) upon contact with a booster sprite. """ def __init__(self, mass_multiplier=0.4, c2_multiplier=0.1, boost_duration=60, agent_layer='agent', booster_layer='boosters'): """Constructor. Args: mass_multiplier: Float. Mass of agent after contacting a booster. c2_multiplier: Float. Brightness increase factor upon contacting a booster. boost_duration: Int. Number of steps before boost ends. agent_layer: String. Name of the agent layer in the state. booster_layer: String. Name of the booster layer in the state. """ self._mass_multiplier = mass_multiplier self._c2_multiplier = c2_multiplier self.boost_duration = boost_duration self._agent_layer = agent_layer self._booster_layer = booster_layer def reset(self, state, meta_state): del state del meta_state self._steps_until_expire = np.inf def _apply_change(self, agent): agent.mass *= self._mass_multiplier agent.c2 = 1. - (1. - agent.c2) * self._c2_multiplier def _revert_change(self, agent): agent.mass /= self._mass_multiplier agent.c2 = 1. - (1. - agent.c2) / self._c2_multiplier def step(self, state, meta_state): del meta_state self._steps_until_expire -= 1 agent = state[self._agent_layer][0] if self._steps_until_expire == np.inf: boosters = state[self._booster_layer] if any([agent.overlaps_sprite(s) for s in boosters]): self._apply_change(agent) self._steps_until_expire = self.boost_duration elif self._steps_until_expire <= 0: self._revert_change(agent) self._steps_until_expire = np.inf
Ancestors
- moog.game_rules.abstract_rule.AbstractRule
- abc.ABC
Methods
def reset(self, state, meta_state)
-
Reset rule at beginning of every trial.
This method can be used to reset any attributes of the rule that serve as memory within trials.
Args
state
- OrderedDict of iterables of sprites. Environment state.
meta_state
- meta_state of environment.
Expand source code
def reset(self, state, meta_state): del state del meta_state self._steps_until_expire = np.inf
def step(self, state, meta_state)
-
Apply rule to the environment state.
This method can in-place modify the state however it likes.
Args
state
- OrderedDict of iterables of sprites. Environment state.
meta_state
- meta_state of environment.
Expand source code
def step(self, state, meta_state): del meta_state self._steps_until_expire -= 1 agent = state[self._agent_layer][0] if self._steps_until_expire == np.inf: boosters = state[self._booster_layer] if any([agent.overlaps_sprite(s) for s in boosters]): self._apply_change(agent) self._steps_until_expire = self.boost_duration elif self._steps_until_expire <= 0: self._revert_change(agent) self._steps_until_expire = np.inf