Module moog.physics.maze_walk

Maze walk classes.

The classes in this file can be used to walk sprites in mazes, either randomly or with a deterministic policy. They should be used as forces in a .physics.Physics instance.

Expand source code
"""Maze walk classes.

The classes in this file can be used to walk sprites in mazes, either randomly
or with a deterministic policy. They should be used as forces in a
.physics.Physics instance.

import abc
import numpy as np
from moog import physics
from moog import maze_lib

# Small float to snap positions to grid
_EPSILON = 1e-5

class AbstractMazeWalk(physics.AbstractForce, metaclass=abc.ABCMeta):
    """Abstract maze walk class.
    All maze walk classes should inherit from this.

    def __init__(self, speed, maze_layer='walls'):

            speed: Constant speed at which sprites walk.
            maze_layer: String. Layer in the environment state containing the
                maze wall sprites.
        self._speed = speed
        self._maze_layer = maze_layer

    def reset(self, state):
        """Resetting re-infers the maze from the state."""
        self._maze = maze_lib.Maze.from_state(
            state, maze_layer=self._maze_layer)

    def step(self, *sprites, updates_per_env_step=1):
        """Step the sprites, updating their velocities."""
        for sprite in sprites:
            self._step_sprite(sprite, updates_per_env_step=updates_per_env_step)

    def _step_sprite(self, sprite, updates_per_env_step=1):
        """Step a sprite, updating its velocity."""

    def _get_pos_vel(self, sprite, updates_per_env_step=1):
        """Get position, velocity, and whether entering intersection.
            sprite: Sprite instance.
            updates_per_env_step: Int. Number of physics steps per environment

            position: Position of the sprite.
            velocity: Velocity of the sprite, adjusted to have self._speed
                speed if nonzero.
            entering_intersection: Bool. Whether or not the sprite is entering
                an intersection in the next step.
        position = sprite.position
        velocity = self._speed * np.sign(sprite.velocity)
        next_position = position + velocity / updates_per_env_step

        intersection = (
            self._maze.grid_side * self._get_nearest_point(position) +

        dist_next_current = sum(np.abs(next_position - position))
        dist_intersection_next = sum(np.abs(next_position - intersection))
        dist_intersection_current = sum(np.abs(next_position - intersection))

        entering_intersection = (
            dist_next_current > dist_intersection_current and 
            dist_next_current > dist_intersection_next)

        return position, velocity, entering_intersection

    def _get_nearest_point(self, position):
        """Get nearest point of the maze to a position.
            position: Float array of size (2,).

            nearest_inds: Int array of size (2,). Indices of the nearest maze
                point to position.
        nearest_inds = np.round(position / self._maze.grid_side - 0.5)
        return nearest_inds.astype(int)

class RandomMazeWalk(AbstractMazeWalk):
    """Random maze walk."""

    def __init__(self, speed, maze_layer='walls', prevent_backtracking=True,
                 allow_wall_backtracking=False, only_turn_at_wall=False):
        Applying this physics to sprites makes them walk with constant speed in
        a maze, taking random turns at corners and intersections.

            speed: Float. Speed for the sprite to move at.
            maze_layer: String. Layer in the environment state containing the
                maze sprites.
            prevent_backtracking: Bool. Whether to prevent backtracking
                (changing direction to go the opposite way).
            allow_wall_backtracking: Bool. Whether to allow backtracking if the
                sprite cannot go forward. If False, the sprite will turn when it
                hits a wall but never go back the way it came.
            only_turn_at_wall: Bool. Whether to only turn when cannot continue
        super(RandomMazeWalk, self).__init__(speed, maze_layer=maze_layer)
        self._prevent_backtracking = prevent_backtracking
        self._allow_wall_backtracking = allow_wall_backtracking
        self._only_turn_at_wall = only_turn_at_wall

    def _update_valid_directions(self, valid_directions, velocity):
        """Update valid directions based on what kinds of backtracking to allow.

        This updating occurs in place.

            valid_directions: Binary array of shape (2, 2) indicating which
                cardinal directions are available to move in.
            velocity: Current sprite velocity.
        # If not preventing backtracking, all open directions are valid
        if not self._prevent_backtracking:
        axis = np.argmax(np.abs(velocity))
        direction = np.sign(velocity[axis])

        # If velocity is zero, all open directions are valid
        if direction == 0:
        # If hit a wall and allow wall backtracking, all open directions are
        # valid
        can_continue = valid_directions[axis, int(0.5 * (1 + direction))]
        if not can_continue and self._allow_wall_backtracking:
        # If not hit a wall and only turn at wall, then continue
        if can_continue and self._only_turn_at_wall:
            valid_directions[axis, int(0.5 * (1 + direction))] = 1

        # If none of the above conditions are true, prevent backtracking
        valid_directions[axis, int(0.5 * (1 - direction))] = False
    def _step_sprite(self, sprite, updates_per_env_step=1):
        """Update a sprite's velocity.
            sprite: Sprite instance.
            updates_per_env_step: Int. Number of physics steps per environment
        if np.isinf(sprite.mass):

        position, velocity, entering_intersection = self._get_pos_vel(
            sprite, updates_per_env_step=updates_per_env_step)
        nearest_inds = self._get_nearest_point(position)
        # If sprite is entering an intersection or stationary, find the valid
        # directions to move in.
        if entering_intersection:
            valid_directions = self._maze.valid_directions(
                nearest_inds[0], nearest_inds[1])
            self._update_valid_directions(valid_directions, velocity)
        elif np.all(velocity == 0.):
            rounded_position = (
                self._maze.half_grid_side + nearest_inds * self._maze.grid_side)
            on_grid = np.abs(rounded_position - position) < _EPSILON
            if np.all(on_grid):
                valid_directions = self._maze.valid_directions(
                    nearest_inds[0], nearest_inds[1])
                valid_directions = np.zeros((2, 2))
                valid_directions[1 - np.argmax(on_grid)] = 1
            sprite.velocity = velocity
        # Sample new direction to move in
        sample = valid_directions * np.random.rand(2, 2)
        sample_ind = np.argmax(np.ravel(sample))

        # Update velocity to move in new direction, but don't eliminate current
        # velocity as that might be needed to get us to the intersection
        velocity[sample_ind // 2] = (
            (1 + _EPSILON) * self._speed * (2 * (sample_ind % 2) - 1))
        sprite.velocity = velocity

class DeterministicMazeWalk(AbstractMazeWalk):
    """Deterministic maze walk."""

    def __init__(self, speed, step_velocities, maze_layer='walls'):

            speed: Float. Speed for the sprite to move at.
            step_velocities: Iterable of 2-iterables. Each element is the
                perscribed velocity of a sprite at a timestep. The elements are
                read out front-to back each time a sprite enters an
                intersection. Note that to handle multiple sprites you have to
                interleave their velocities, since this class has no way of
                knowing which sprite it is stepping.
            maze_layer: String. Layer in the environment state containing the
                maze sprites.
        super(DeterministicMazeWalk, self).__init__(
            speed, maze_layer=maze_layer)
        self._step_velocities = [np.array(v) for v in step_velocities]

    def _step_sprite(self, sprite, updates_per_env_step=1):
        """Update a sprite's velocity.
            sprite: Sprite instance.
            updates_per_env_step: Int. Number of physics steps per environment
        _, velocity, entering_intersection = self._get_pos_vel(
            sprite, updates_per_env_step=updates_per_env_step)

        if entering_intersection or np.all(velocity == 0.):
            # If self._step_velocities is not empty use the next one. Otherwise,
            # do nothing.
            if len(self._step_velocities) > 0:
                new_velocity = self._step_velocities.pop(0)
                if np.any(np.sign(new_velocity) != np.sign(velocity)):
                    sprite.velocity = np.clip(
                        (1 - _EPSILON) * velocity + new_velocity, -self._speed,


class AbstractMazeWalk (speed, maze_layer='walls')

Abstract maze walk class.

All maze walk classes should inherit from this.



Constant speed at which sprites walk.
String. Layer in the environment state containing the maze wall sprites.
Expand source code
class AbstractMazeWalk(physics.AbstractForce, metaclass=abc.ABCMeta):
    """Abstract maze walk class.
    All maze walk classes should inherit from this.

    def __init__(self, speed, maze_layer='walls'):

            speed: Constant speed at which sprites walk.
            maze_layer: String. Layer in the environment state containing the
                maze wall sprites.
        self._speed = speed
        self._maze_layer = maze_layer

    def reset(self, state):
        """Resetting re-infers the maze from the state."""
        self._maze = maze_lib.Maze.from_state(
            state, maze_layer=self._maze_layer)

    def step(self, *sprites, updates_per_env_step=1):
        """Step the sprites, updating their velocities."""
        for sprite in sprites:
            self._step_sprite(sprite, updates_per_env_step=updates_per_env_step)

    def _step_sprite(self, sprite, updates_per_env_step=1):
        """Step a sprite, updating its velocity."""

    def _get_pos_vel(self, sprite, updates_per_env_step=1):
        """Get position, velocity, and whether entering intersection.
            sprite: Sprite instance.
            updates_per_env_step: Int. Number of physics steps per environment

            position: Position of the sprite.
            velocity: Velocity of the sprite, adjusted to have self._speed
                speed if nonzero.
            entering_intersection: Bool. Whether or not the sprite is entering
                an intersection in the next step.
        position = sprite.position
        velocity = self._speed * np.sign(sprite.velocity)
        next_position = position + velocity / updates_per_env_step

        intersection = (
            self._maze.grid_side * self._get_nearest_point(position) +

        dist_next_current = sum(np.abs(next_position - position))
        dist_intersection_next = sum(np.abs(next_position - intersection))
        dist_intersection_current = sum(np.abs(next_position - intersection))

        entering_intersection = (
            dist_next_current > dist_intersection_current and 
            dist_next_current > dist_intersection_next)

        return position, velocity, entering_intersection

    def _get_nearest_point(self, position):
        """Get nearest point of the maze to a position.
            position: Float array of size (2,).

            nearest_inds: Int array of size (2,). Indices of the nearest maze
                point to position.
        nearest_inds = np.round(position / self._maze.grid_side - 0.5)
        return nearest_inds.astype(int)


  • moog.physics.abstract_force.AbstractForce
  • abc.ABC



def reset(self, state)

Resetting re-infers the maze from the state.

Expand source code
def reset(self, state):
    """Resetting re-infers the maze from the state."""
    self._maze = maze_lib.Maze.from_state(
        state, maze_layer=self._maze_layer)
def step(self, *sprites, updates_per_env_step=1)

Step the sprites, updating their velocities.

Expand source code
def step(self, *sprites, updates_per_env_step=1):
    """Step the sprites, updating their velocities."""
    for sprite in sprites:
        self._step_sprite(sprite, updates_per_env_step=updates_per_env_step)
class DeterministicMazeWalk (speed, step_velocities, maze_layer='walls')

Deterministic maze walk.



Float. Speed for the sprite to move at.
Iterable of 2-iterables. Each element is the perscribed velocity of a sprite at a timestep. The elements are read out front-to back each time a sprite enters an intersection. Note that to handle multiple sprites you have to interleave their velocities, since this class has no way of knowing which sprite it is stepping.
String. Layer in the environment state containing the maze sprites.
Expand source code
class DeterministicMazeWalk(AbstractMazeWalk):
    """Deterministic maze walk."""

    def __init__(self, speed, step_velocities, maze_layer='walls'):

            speed: Float. Speed for the sprite to move at.
            step_velocities: Iterable of 2-iterables. Each element is the
                perscribed velocity of a sprite at a timestep. The elements are
                read out front-to back each time a sprite enters an
                intersection. Note that to handle multiple sprites you have to
                interleave their velocities, since this class has no way of
                knowing which sprite it is stepping.
            maze_layer: String. Layer in the environment state containing the
                maze sprites.
        super(DeterministicMazeWalk, self).__init__(
            speed, maze_layer=maze_layer)
        self._step_velocities = [np.array(v) for v in step_velocities]

    def _step_sprite(self, sprite, updates_per_env_step=1):
        """Update a sprite's velocity.
            sprite: Sprite instance.
            updates_per_env_step: Int. Number of physics steps per environment
        _, velocity, entering_intersection = self._get_pos_vel(
            sprite, updates_per_env_step=updates_per_env_step)

        if entering_intersection or np.all(velocity == 0.):
            # If self._step_velocities is not empty use the next one. Otherwise,
            # do nothing.
            if len(self._step_velocities) > 0:
                new_velocity = self._step_velocities.pop(0)
                if np.any(np.sign(new_velocity) != np.sign(velocity)):
                    sprite.velocity = np.clip(
                        (1 - _EPSILON) * velocity + new_velocity, -self._speed,


Inherited members

class RandomMazeWalk (speed, maze_layer='walls', prevent_backtracking=True, allow_wall_backtracking=False, only_turn_at_wall=False)

Random maze walk.


Applying this physics to sprites makes them walk with constant speed in a maze, taking random turns at corners and intersections.


Float. Speed for the sprite to move at.
String. Layer in the environment state containing the maze sprites.
Bool. Whether to prevent backtracking (changing direction to go the opposite way).
Bool. Whether to allow backtracking if the sprite cannot go forward. If False, the sprite will turn when it hits a wall but never go back the way it came.
Bool. Whether to only turn when cannot continue straight.
Expand source code
class RandomMazeWalk(AbstractMazeWalk):
    """Random maze walk."""

    def __init__(self, speed, maze_layer='walls', prevent_backtracking=True,
                 allow_wall_backtracking=False, only_turn_at_wall=False):
        Applying this physics to sprites makes them walk with constant speed in
        a maze, taking random turns at corners and intersections.

            speed: Float. Speed for the sprite to move at.
            maze_layer: String. Layer in the environment state containing the
                maze sprites.
            prevent_backtracking: Bool. Whether to prevent backtracking
                (changing direction to go the opposite way).
            allow_wall_backtracking: Bool. Whether to allow backtracking if the
                sprite cannot go forward. If False, the sprite will turn when it
                hits a wall but never go back the way it came.
            only_turn_at_wall: Bool. Whether to only turn when cannot continue
        super(RandomMazeWalk, self).__init__(speed, maze_layer=maze_layer)
        self._prevent_backtracking = prevent_backtracking
        self._allow_wall_backtracking = allow_wall_backtracking
        self._only_turn_at_wall = only_turn_at_wall

    def _update_valid_directions(self, valid_directions, velocity):
        """Update valid directions based on what kinds of backtracking to allow.

        This updating occurs in place.

            valid_directions: Binary array of shape (2, 2) indicating which
                cardinal directions are available to move in.
            velocity: Current sprite velocity.
        # If not preventing backtracking, all open directions are valid
        if not self._prevent_backtracking:
        axis = np.argmax(np.abs(velocity))
        direction = np.sign(velocity[axis])

        # If velocity is zero, all open directions are valid
        if direction == 0:
        # If hit a wall and allow wall backtracking, all open directions are
        # valid
        can_continue = valid_directions[axis, int(0.5 * (1 + direction))]
        if not can_continue and self._allow_wall_backtracking:
        # If not hit a wall and only turn at wall, then continue
        if can_continue and self._only_turn_at_wall:
            valid_directions[axis, int(0.5 * (1 + direction))] = 1

        # If none of the above conditions are true, prevent backtracking
        valid_directions[axis, int(0.5 * (1 - direction))] = False
    def _step_sprite(self, sprite, updates_per_env_step=1):
        """Update a sprite's velocity.
            sprite: Sprite instance.
            updates_per_env_step: Int. Number of physics steps per environment
        if np.isinf(sprite.mass):

        position, velocity, entering_intersection = self._get_pos_vel(
            sprite, updates_per_env_step=updates_per_env_step)
        nearest_inds = self._get_nearest_point(position)
        # If sprite is entering an intersection or stationary, find the valid
        # directions to move in.
        if entering_intersection:
            valid_directions = self._maze.valid_directions(
                nearest_inds[0], nearest_inds[1])
            self._update_valid_directions(valid_directions, velocity)
        elif np.all(velocity == 0.):
            rounded_position = (
                self._maze.half_grid_side + nearest_inds * self._maze.grid_side)
            on_grid = np.abs(rounded_position - position) < _EPSILON
            if np.all(on_grid):
                valid_directions = self._maze.valid_directions(
                    nearest_inds[0], nearest_inds[1])
                valid_directions = np.zeros((2, 2))
                valid_directions[1 - np.argmax(on_grid)] = 1
            sprite.velocity = velocity
        # Sample new direction to move in
        sample = valid_directions * np.random.rand(2, 2)
        sample_ind = np.argmax(np.ravel(sample))

        # Update velocity to move in new direction, but don't eliminate current
        # velocity as that might be needed to get us to the intersection
        velocity[sample_ind // 2] = (
            (1 + _EPSILON) * self._speed * (2 * (sample_ind % 2) - 1))
        sprite.velocity = velocity


Inherited members