Module mworks.task
Python task script for interfacing with MWorks.
The main class in this file is TaskManager, which is a task to be run by main.mwel.
Expand source code
"""Python task script for interfacing with MWorks.
The main class in this file is TaskManager, which is a task to be run by
main.mwel.
"""
################################################################################
#### Update sys.path
################################################################################
# MWorks clones python dependencies into some mworks-specific directory, so we
# need to manually set the sys.path here in order for imports to work. You will
# need to change the pwd and python site-packages here to point to the current
# directory and your computer's python site-packages (or that of a virtual
# environment).
# Machine-specific location of this mworks/ directory; it is appended to
# sys.path below and is also used elsewhere in this file to build the log
# directory.
_PWD = '/Users/nicholaswatters/Desktop/grad_school/research/mehrdad/moog/mworks'
# Machine-specific site-packages directory holding the python dependencies.
_PYTHON_SITE_PACKAGES = (
    '/Users/nicholaswatters/miniconda3/envs/mworks_moog/lib/python3.8/'
    'site-packages'
)

import sys

# The empty entry goes first so that it takes precedence over every other
# search location.
if '' not in sys.path:
    sys.path.insert(0, '')
# The remaining locations are appended, in this order, only when missing.
for _extra_path in (_PYTHON_SITE_PACKAGES, _PWD):
    if _extra_path not in sys.path:
        sys.path.append(_extra_path)
del _extra_path
################################################################################
#### Imports
################################################################################
from datetime import datetime
import importlib
import numpy as np
import os
import threading
from moog import action_spaces
from moog import environment
from moog import env_wrappers
from moog import observers
class TaskManager():
    """MOOG task manager.

    This loads a MOOG environment and handles the interface for MWorks. It
    should only be used by main.mwel.

    NOTE(review): getvar, setvar, register_event_callback and
    unregister_event_callbacks are not imported anywhere in this file; they
    are presumably injected into the module namespace by MWorks -- confirm.
    """

    # Action index returned by the grid controller when no key is active
    # (also used on the first step of a trial, where no movement is wanted).
    _NO_KEY_ACTION = 4

    def __init__(self, config_name='configs.pong', level=''):
        """Constructor.

        Args:
            config_name: String. Name of config file in configs/ to run.
            level: Argument for config_name.get_config().

        Raises:
            ValueError: If the environment's action space is neither
                action_spaces.Joystick nor action_spaces.Grid.
        """
        self.lock = threading.Lock()

        config = importlib.import_module(config_name)
        # Force MWorks server to reload the config, so changes to the config
        # will be propagated to MWorks without restarting the server.
        importlib.reload(config)
        config = config.get_config(level)

        # Replace the config's image observer by a renderer at the image size
        # given by the MWorks variables, keeping the config's color mapping.
        image_size = (getvar('image_pixel_width'), getvar('image_pixel_height'))
        renderer = observers.PILRenderer(
            image_size=image_size,
            anti_aliasing=1,
            color_to_rgb=config['observers']['image'].color_to_rgb,
        )
        config['observers'] = {'image': renderer}

        # Logs go to <_PWD>/logs/<date>/<config path>/level_<level>.
        log_dir = os.path.join(
            _PWD, 'logs/' + datetime.now().strftime('%Y_%m_%d'),
            '/'.join(config_name.split('.')), 'level_' + str(level))
        self.env = env_wrappers.LoggingEnvironment(
            environment=environment.Environment(**config),
            log_dir=log_dir,
        )

        if isinstance(self.env.action_space, action_spaces.Joystick):
            self._using_joystick = True
        elif isinstance(self.env.action_space, action_spaces.Grid):
            self._using_joystick = False
        else:
            raise ValueError(
                'Unrecognized action space {}'.format(config['action_space']))

        # Key resolved on the previous grid step (order: left, right, down,
        # up). Initialised here so _get_action_grid can always read it.
        self._keys_pressed = np.zeros(4, dtype=bool)

    def reset(self):
        """Reset environment.

        This should be called at the beginning of every trial. It resets the
        environment, discards all previously registered event callbacks and
        registers a fresh one per controller variable.
        """
        self.env.reset()
        unregister_event_callbacks()
        self.events = {}
        if self._using_joystick:
            controller_varnames = ('x_force', 'y_force')
        else:
            controller_varnames = (
                'up_pressed', 'down_pressed', 'left_pressed', 'right_pressed')
        for varname in controller_varnames:
            self._register_event_callback(varname)
        # Do not let the key state of the previous trial leak into this one.
        self._keys_pressed = np.zeros(4, dtype=bool)
        self.complete = False

    def _register_event_callback(self, varname):
        """Collect the data of every event on `varname` in self.events."""
        self.events[varname] = []

        def cb(evt):
            # Appends are guarded by the lock; presumably MWorks may invoke
            # callbacks from another thread -- TODO confirm.
            with self.lock:
                self.events[varname].append(evt.data)

        register_event_callback(varname, cb)

    def _get_action_joystick(self):
        """Get joystick action.

        Returns:
            Numpy array [x_force, y_force]; zeros on the first step.
        """
        if self.env.step_count == 0:
            # Don't move on the first step
            # We set x_force and y_force to zero because some joysticks
            # initially give a non-zero action, which persists unless we
            # explicitly zero it out.
            setvar('x_force', 0.)
            setvar('y_force', 0.)
            return np.zeros(2)
        return np.array([getvar('x_force'), getvar('y_force')])

    @staticmethod
    def _resolve_pressed_keys(keys_pressed, previous_keys):
        """Reduce raw key states to at most one active key.

        When several keys are down at once, keys that were already active on
        the previous step are ignored so that a newly pressed key wins. If
        more than one key is still left, one of them is picked at random.

        Args:
            keys_pressed: Length-4 sequence of truthy/falsy key states.
            previous_keys: Length-4 sequence of truthy/falsy values marking
                the key that was active on the previous step.

        Returns:
            Boolean numpy array of length 4 with at most one True entry.
        """
        keys = np.array(keys_pressed, dtype=bool)
        if keys.sum() > 1:
            # A boolean mask is required here: indexing with a 0/1 integer
            # array would clear positions 0 and 1 instead of the old keys.
            keys[np.asarray(previous_keys, dtype=bool)] = False
        if keys.sum() > 1:
            chosen = np.random.choice(np.flatnonzero(keys))
            keys = np.zeros(keys.size, dtype=bool)
            keys[chosen] = True
        return keys

    def _get_action_grid(self):
        """Get grid action.

        Returns:
            Index of the active key in the order (left, right, down, up), or
            4 when no key is active or on the first step.
        """
        if self.env.step_count == 0:
            # Don't move on the first step. The key variables are cleared so
            # that a stale key state does not persist into the trial.
            setvar('left_pressed', 0)
            setvar('right_pressed', 0)
            setvar('down_pressed', 0)
            setvar('up_pressed', 0)
            self._keys_pressed = np.zeros(4, dtype=bool)
            return self._NO_KEY_ACTION

        raw_keys = [
            getvar('left_pressed'),
            getvar('right_pressed'),
            getvar('down_pressed'),
            getvar('up_pressed'),
        ]
        resolved = self._resolve_pressed_keys(raw_keys, self._keys_pressed)
        self._keys_pressed = resolved
        if resolved.any():
            return np.flatnonzero(resolved)[0]
        return self._NO_KEY_ACTION

    def step(self):
        """Step environment.

        Returns:
            None if the trial is already complete, otherwise the rendered
            image of the new timestep as a contiguous numpy array.
        """
        if self.complete:
            # Don't step if the task is already complete. Returning None tells
            # MWorks that the image texture doesn't need to be updated.
            return None

        # The action space was validated in the constructor, so the flag set
        # there is enough to pick the controller.
        if self._using_joystick:
            action = self._get_action_joystick()
        else:
            action = self._get_action_grid()

        timestep = self.env.step(action)
        reward = timestep.reward
        img = timestep.observation['image']

        if reward:
            setvar('reward_duration', reward * 1000)  # ms to us

        if timestep.last():
            setvar('end_trial', True)
            self.complete = True

        # MWorks' Python image stimulus requires a contiguous buffer, so we use
        # ascontiguousarray to provide one.
        return np.ascontiguousarray(img)
Classes
class TaskManager (config_name='configs.pong', level='')
-
MOOG task manager.
This loads a MOOG environment and handles the interface for MWorks. It should only be used by main.mwel.
Constructor.
Args
config_name
- String. Name of config file in configs/ to run.
level
- Argument for config_name.get_config().
Expand source code
class TaskManager():
    """MOOG task manager.

    This loads a MOOG environment and handles the interface for MWorks. It
    should only be used by main.mwel.

    NOTE(review): getvar, setvar, register_event_callback and
    unregister_event_callbacks are not imported anywhere in this file; they
    are presumably injected into the module namespace by MWorks -- confirm.
    """

    # Action index returned by the grid controller when no key is active
    # (also used on the first step of a trial, where no movement is wanted).
    _NO_KEY_ACTION = 4

    def __init__(self, config_name='configs.pong', level=''):
        """Constructor.

        Args:
            config_name: String. Name of config file in configs/ to run.
            level: Argument for config_name.get_config().

        Raises:
            ValueError: If the environment's action space is neither
                action_spaces.Joystick nor action_spaces.Grid.
        """
        self.lock = threading.Lock()

        config = importlib.import_module(config_name)
        # Force MWorks server to reload the config, so changes to the config
        # will be propagated to MWorks without restarting the server.
        importlib.reload(config)
        config = config.get_config(level)

        # Replace the config's image observer by a renderer at the image size
        # given by the MWorks variables, keeping the config's color mapping.
        image_size = (getvar('image_pixel_width'), getvar('image_pixel_height'))
        renderer = observers.PILRenderer(
            image_size=image_size,
            anti_aliasing=1,
            color_to_rgb=config['observers']['image'].color_to_rgb,
        )
        config['observers'] = {'image': renderer}

        # Logs go to <_PWD>/logs/<date>/<config path>/level_<level>.
        log_dir = os.path.join(
            _PWD, 'logs/' + datetime.now().strftime('%Y_%m_%d'),
            '/'.join(config_name.split('.')), 'level_' + str(level))
        self.env = env_wrappers.LoggingEnvironment(
            environment=environment.Environment(**config),
            log_dir=log_dir,
        )

        if isinstance(self.env.action_space, action_spaces.Joystick):
            self._using_joystick = True
        elif isinstance(self.env.action_space, action_spaces.Grid):
            self._using_joystick = False
        else:
            raise ValueError(
                'Unrecognized action space {}'.format(config['action_space']))

        # Key resolved on the previous grid step (order: left, right, down,
        # up). Initialised here so _get_action_grid can always read it.
        self._keys_pressed = np.zeros(4, dtype=bool)

    def reset(self):
        """Reset environment.

        This should be called at the beginning of every trial. It resets the
        environment, discards all previously registered event callbacks and
        registers a fresh one per controller variable.
        """
        self.env.reset()
        unregister_event_callbacks()
        self.events = {}
        if self._using_joystick:
            controller_varnames = ('x_force', 'y_force')
        else:
            controller_varnames = (
                'up_pressed', 'down_pressed', 'left_pressed', 'right_pressed')
        for varname in controller_varnames:
            self._register_event_callback(varname)
        # Do not let the key state of the previous trial leak into this one.
        self._keys_pressed = np.zeros(4, dtype=bool)
        self.complete = False

    def _register_event_callback(self, varname):
        """Collect the data of every event on `varname` in self.events."""
        self.events[varname] = []

        def cb(evt):
            # Appends are guarded by the lock; presumably MWorks may invoke
            # callbacks from another thread -- TODO confirm.
            with self.lock:
                self.events[varname].append(evt.data)

        register_event_callback(varname, cb)

    def _get_action_joystick(self):
        """Get joystick action.

        Returns:
            Numpy array [x_force, y_force]; zeros on the first step.
        """
        if self.env.step_count == 0:
            # Don't move on the first step
            # We set x_force and y_force to zero because some joysticks
            # initially give a non-zero action, which persists unless we
            # explicitly zero it out.
            setvar('x_force', 0.)
            setvar('y_force', 0.)
            return np.zeros(2)
        return np.array([getvar('x_force'), getvar('y_force')])

    @staticmethod
    def _resolve_pressed_keys(keys_pressed, previous_keys):
        """Reduce raw key states to at most one active key.

        When several keys are down at once, keys that were already active on
        the previous step are ignored so that a newly pressed key wins. If
        more than one key is still left, one of them is picked at random.

        Args:
            keys_pressed: Length-4 sequence of truthy/falsy key states.
            previous_keys: Length-4 sequence of truthy/falsy values marking
                the key that was active on the previous step.

        Returns:
            Boolean numpy array of length 4 with at most one True entry.
        """
        keys = np.array(keys_pressed, dtype=bool)
        if keys.sum() > 1:
            # A boolean mask is required here: indexing with a 0/1 integer
            # array would clear positions 0 and 1 instead of the old keys.
            keys[np.asarray(previous_keys, dtype=bool)] = False
        if keys.sum() > 1:
            chosen = np.random.choice(np.flatnonzero(keys))
            keys = np.zeros(keys.size, dtype=bool)
            keys[chosen] = True
        return keys

    def _get_action_grid(self):
        """Get grid action.

        Returns:
            Index of the active key in the order (left, right, down, up), or
            4 when no key is active or on the first step.
        """
        if self.env.step_count == 0:
            # Don't move on the first step. The key variables are cleared so
            # that a stale key state does not persist into the trial.
            setvar('left_pressed', 0)
            setvar('right_pressed', 0)
            setvar('down_pressed', 0)
            setvar('up_pressed', 0)
            self._keys_pressed = np.zeros(4, dtype=bool)
            return self._NO_KEY_ACTION

        raw_keys = [
            getvar('left_pressed'),
            getvar('right_pressed'),
            getvar('down_pressed'),
            getvar('up_pressed'),
        ]
        resolved = self._resolve_pressed_keys(raw_keys, self._keys_pressed)
        self._keys_pressed = resolved
        if resolved.any():
            return np.flatnonzero(resolved)[0]
        return self._NO_KEY_ACTION

    def step(self):
        """Step environment.

        Returns:
            None if the trial is already complete, otherwise the rendered
            image of the new timestep as a contiguous numpy array.
        """
        if self.complete:
            # Don't step if the task is already complete. Returning None tells
            # MWorks that the image texture doesn't need to be updated.
            return None

        # The action space was validated in the constructor, so the flag set
        # there is enough to pick the controller.
        if self._using_joystick:
            action = self._get_action_joystick()
        else:
            action = self._get_action_grid()

        timestep = self.env.step(action)
        reward = timestep.reward
        img = timestep.observation['image']

        if reward:
            setvar('reward_duration', reward * 1000)  # ms to us

        if timestep.last():
            setvar('end_trial', True)
            self.complete = True

        # MWorks' Python image stimulus requires a contiguous buffer, so we use
        # ascontiguousarray to provide one.
        return np.ascontiguousarray(img)
Methods
def reset(self)
-
Reset environment.
This should be called at the beginning of every trial.
Expand source code
def reset(self):
    """Start a new trial.

    Resets the environment, removes all previously registered event
    callbacks and registers a new callback, each with an empty event list,
    for every controller variable. This should be called at the beginning of
    every trial.
    """
    self.env.reset()
    unregister_event_callbacks()
    self.events = {}

    joystick_varnames = ('x_force', 'y_force')
    key_varnames = (
        'up_pressed', 'down_pressed', 'left_pressed', 'right_pressed')
    active_varnames = (
        joystick_varnames if self._using_joystick else key_varnames)
    for name in active_varnames:
        self._register_event_callback(name)

    self.complete = False
def step(self)
-
Step environment.
Expand source code
def step(self): """Step environment.""" if self.complete: # Don't step if the task is already complete. Returning None tells # MWorks that the image texture doesn't need to be updated. return if isinstance(self.env.action_space, action_spaces.Joystick): action = self._get_action_joystick() elif isinstance(self.env.action_space, action_spaces.Grid): action = self._get_action_grid() else: raise ValueError( 'Unrecognized action space {}'.format(config['action_space'])) timestep = self.env.step(action) reward = timestep.reward img = timestep.observation['image'] if reward: setvar('reward_duration', reward * 1000) # ms to us if timestep.last(): setvar('end_trial', True) self.complete = True # MWorks' Python image stimulus requires a contiguous buffer, so we use # ascontiguousarray to provide one. to_return = np.ascontiguousarray(img) return to_return