Module moog_demos.example_configs.predators_arena

Task with predators chasing agent in open arena.

The predators (red circles) chase the agent. The predators bounce off the arena boundaries, while the agent cannot exit but does not bounce (i.e., it has inelastic collisions with the boundaries). Trials terminate only when the agent is caught by a predator. The subject controls the agent with a joystick.

This task also contains an auto-curriculum: When the subject does well (evades the predators for a long time before being caught), the predators' masses are decreased, thereby increasing the predators' speeds. Conversely, when the subject does poorly (gets caught quickly), the predators' masses are increased, thereby decreasing the predators' speeds.
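
For concreteness, the update is multiplicative: after each trial the shared predator mass is scaled by (1 - step_scaling_factor) if the trial exceeded threshold_trial_len steps, and by (1 + step_scaling_factor) otherwise. A minimal sketch of the rule, mirroring StateInitialization.state_initializer below (the trial lengths are arbitrary):

mass = 1.
step_scaling_factor = 0.1
threshold_trial_len = 200

def update_mass(mass, trial_len):
    if trial_len > threshold_trial_len:
        return mass * (1. - step_scaling_factor)  # long trial: lighter, faster predators
    return mass * (1. + step_scaling_factor)      # short trial: heavier, slower predators

# Three long trials in a row shrink the mass geometrically: 1.0 -> 0.9 -> 0.81 -> 0.729.
for trial_len in (300, 250, 400):
    mass = update_mass(mass, trial_len)

Because the update is multiplicative, the mass stays positive and adapts geometrically toward the subject's skill level.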

Source code
"""Task with predators chasing agent in open arena.

The predators (red circles) chase the agent. The predators bounce off the arena
boundaries, while the agent cannot exit but does not bounce (i.e. it has
inelastic collisions with the boundaries). Trials terminate only when the agent
is caught by a predator. The subject controls the agent with a joystick.

This task also contains an auto-curriculum: When the subject does well (evades
the predators for a long time before being caught), the predators' masses are
decreased, thereby increasing the predators' speeds. Conversely, when the
subject does poorly (gets caught quickly), the predators' masses are increased,
thereby decreasing the predators' speeds.
"""

import collections

from moog import action_spaces
from moog import game_rules
from moog import observers
from moog import physics as physics_lib
from moog import shapes
from moog import tasks
from moog.state_initialization import distributions as distribs
from moog.state_initialization import sprite_generators


class StateInitialization:
    """State initialization class to dynamically adapt predator mass.
    
    This is essentially an auto-curriculum: When the subject does well (evades
    the predators for a long time before being caught), the predators' masses
    are decreased, thereby increasing the predators' speeds. Conversely, when
    the subject does poorly (gets caught quickly), the predators' masses are
    increased, thereby decreasing the predators' speeds.
    """

    def __init__(self, num_predators, step_scaling_factor, threshold_trial_len):
        """Constructor.

        This class uses the meta-state to keep track of the number of steps
        before the agent is caught. See the game rules section near the bottom
        of this file for the counter incrementer.

        Args:
            num_predators: Int. Number of predators.
            step_scaling_factor: Float. Fractional decrease of predator mass
                after a trial longer than threshold_trial_len. Also used as the
                fractional increase of predator mass after a trial shorter than
                threshold_trial_len. Should be small and positive.
            threshold_trial_len: Int. Length of a trial above which the
                predator mass is decreased and below which the predator mass is
                increased.
        """
        self._mass = 1.
        self._step_scaling_factor = step_scaling_factor
        self._threshold_trial_len = threshold_trial_len

        # Agent
        agent_factors = distribs.Product(
            [distribs.Continuous('x', 0., 1.),
             distribs.Continuous('y', 0., 1.)],
            shape='circle', scale=0.1, c0=0.33, c1=1., c2=0.66,
        )
        self._agent_generator = sprite_generators.generate_sprites(
            agent_factors, num_sprites=1)

        # Predators
        predator_factors = distribs.Product(
            [distribs.Continuous('x', 0., 1.),
             distribs.Continuous('y', 0., 1.)],
            shape='circle', scale=0.1, c0=0., c1=1., c2=0.8,
        )
        self._predator_generator = sprite_generators.generate_sprites(
            predator_factors, num_sprites=num_predators)

        # Walls
        self._walls = shapes.border_walls(
            visible_thickness=0., c0=0., c1=0., c2=0.5)

        self._meta_state = None

    def state_initializer(self):
        """State initializer method to be fed to environment."""
        agent = self._agent_generator(without_overlapping=self._walls)
        predators = self._predator_generator(
            without_overlapping=self._walls + agent)

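        # Auto-curriculum: scale the shared predator mass according to how long
        # the previous trial lasted. self._meta_state is None only before the
        # first trial's meta-state has been initialized.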
        if self._meta_state is not None:
            if self._meta_state['step_count'] > self._threshold_trial_len:
                self._mass -= self._mass * self._step_scaling_factor
            else:
                self._mass += self._mass * self._step_scaling_factor
        for s in predators:
            s.mass = self._mass
        
        state = collections.OrderedDict([
            ('walls', self._walls),
            ('agent', agent),
            ('predators', predators),
        ])
        return state

    def meta_state_initializer(self):
        """Meta-state initializer method to be fed to environment."""
        self._meta_state = {'step_count': 0}
        return self._meta_state


def get_config(num_predators):
    """Get config dictionary of kwargs for environment constructor.
    
    Args:
        num_predators: Int. Number of predators.
    """

    ############################################################################
    # Sprite initialization
    ############################################################################

    state_initialization = StateInitialization(
        num_predators=num_predators,
        step_scaling_factor=0.1,
        threshold_trial_len=200,
    )

    ############################################################################
    # Physics
    ############################################################################

    agent_friction_force = physics_lib.Drag(coeff_friction=0.25)
    predator_friction_force = physics_lib.Drag(coeff_friction=0.04)
    predator_random_force = physics_lib.RandomForce(max_force_magnitude=0.03)
    predator_attraction = physics_lib.DistanceForce(
        physics_lib.linear_force_fn(zero_intercept=-0.0025, slope=0.0001))
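    # symmetric=False applies the collision response only to the first layer
    # named in the force spec below, so the walls themselves never move.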
    elastic_asymmetric_collision = physics_lib.Collision(
        elasticity=1., symmetric=False)
    inelastic_asymmetric_collision = physics_lib.Collision(
        elasticity=0., symmetric=False)
    
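    # Each entry is (force, layer) for a single-layer force, or
    # (force, layer_0, layer_1) for a force between two layers.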
    forces = (
        (agent_friction_force, 'agent'),
        (predator_friction_force, 'predators'),
        (predator_random_force, 'predators'),
        (predator_attraction, 'agent', 'predators'),
        (elastic_asymmetric_collision, 'predators', 'walls'),
        (inelastic_asymmetric_collision, 'agent', 'walls'),
    )
    
    physics = physics_lib.Physics(*forces, updates_per_env_step=10)

    ############################################################################
    # Task
    ############################################################################

    task = tasks.ContactReward(
        -1, layers_0='agent', layers_1='predators', reset_steps_after_contact=0)

    ############################################################################
    # Action space
    ############################################################################

    action_space = action_spaces.Joystick(
        scaling_factor=0.01, action_layers='agent')

    ############################################################################
    # Observer
    ############################################################################

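    # Sprite color factors (c0, c1, c2) are interpreted as HSV and converted to
    # RGB at render time, so the agent (c0=0.33) renders green and the
    # predators (c0=0.) render red.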
    observer = observers.PILRenderer(
        image_size=(64, 64), anti_aliasing=1, color_to_rgb='hsv_to_rgb')

    ############################################################################
    # Game rules
    ############################################################################

    def _increment_count(meta_state):
        meta_state['step_count'] += 1
    rules = game_rules.ModifyMetaState(_increment_count)

    ############################################################################
    # Final config
    ############################################################################

    config = {
        'state_initializer': state_initialization.state_initializer,
        'physics': physics,
        'task': task,
        'action_space': action_space,
        'observers': {'image': observer},
        'game_rules': (rules,),
        'meta_state_initializer': state_initialization.meta_state_initializer,
    }
    return config

Functions

def get_config(num_predators)

Get config dictionary of kwargs for environment constructor.

Args

num_predators
Int. Number of predators.
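
A minimal usage sketch, assuming the standard MOOG pattern of unpacking this config into the environment constructor; moog.environment.Environment, the dm_env-style timestep, and the two-element joystick action are assumptions here, not part of this module:

import numpy as np

from moog import environment
from moog_demos.example_configs import predators_arena

config = predators_arena.get_config(num_predators=3)
env = environment.Environment(**config)

timestep = env.reset()
for _ in range(500):
    # Constant rightward joystick push; real use would read a joystick device.
    timestep = env.step(np.array([0.5, 0.]))
    image = timestep.observation['image']  # 64x64 RGB frame from the observer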

Classes

class StateInitialization (num_predators, step_scaling_factor, threshold_trial_len)

State initialization class to dynamically adapt predator mass.

This is essentially an auto-curriculum: When the subject does well (evades the predators for a long time before being caught), the predators' masses are decreased, thereby increasing the predators' speeds. Conversely, when the subject does poorly (gets caught quickly), the predators' masses are increased, thereby decreasing the predators' speeds.

Constructor.

This class uses the meta-state to keep track of the number of steps before the agent is caught. See the game rules section near the bottom of this file for the counter incrementer.

Args

num_predators
Int. Number of predators.
step_scaling_factor
Float. Fractional decrease of predator mass after a trial longer than threshold_trial_len. Also used as the fractional increase of predator mass after a trial shorter than threshold_trial_len. Should be small and positive.
threshold_trial_len
Int. Length of a trial above which the predator mass is decreased and below which the predator mass is increased.
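
To see the curriculum in action, the initializer can be driven by hand. A sketch (the call order mimics one environment trial; the step count is arbitrary):

from moog_demos.example_configs import predators_arena

state_init = predators_arena.StateInitialization(
    num_predators=3, step_scaling_factor=0.1, threshold_trial_len=200)

meta_state = state_init.meta_state_initializer()  # {'step_count': 0}
meta_state['step_count'] = 350  # pretend the subject evaded capture for 350 steps

state = state_init.state_initializer()  # 350 > 200, so mass drops 1.0 -> 0.9
print(state['predators'][0].mass)       # 0.9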

Methods

def meta_state_initializer(self)

Meta-state initializer method to be fed to environment.

def state_initializer(self)

State initializer method to be fed to environment.
