Module mworks.task
Python task script for interfacing with MWorks.
The main class in this file is TaskManager, which is a task to be run by main.mwel.
Classes
class TaskManager(config_name='configs.pong', level='')
-
Expand source code
class TaskManager():
    """MOOG task manager.

    This loads a MOOG environment and handles the interface for MWorks. It
    should only be used by main.mwel.
    """

    def __init__(self, config_name='configs.pong', level=''):
        """Constructor.

        Args:
            config_name: String. Name of config file in configs/ to run.
            level: Argument for config_name.get_config().
        """
        self.lock = threading.Lock()
        config = importlib.import_module(config_name)
        # Force MWorks server to reload the config, so changes to the config
        # will be propagated to MWorks without restarting the server.
        importlib.reload(config)
        config = config.get_config(level)

        # Render at the pixel size MWorks reports for its image stimulus.
        image_size = (getvar('image_pixel_width'),
                      getvar('image_pixel_height'))
        renderer = observers.PILRenderer(
            image_size=image_size,
            anti_aliasing=1,
            color_to_rgb=config['observers']['image'].color_to_rgb,
        )
        config['observers'] = {'image': renderer}

        # Log trials under logs/<date>/<config path>/level_<level>.
        log_dir = os.path.join(
            _PWD,
            'logs/' + datetime.now().strftime('%Y_%m_%d'),
            '/'.join(config_name.split('.')),
            'level_' + str(level))
        self.env = env_wrappers.LoggingEnvironment(
            environment=environment.Environment(**config),
            log_dir=log_dir,
        )

        if isinstance(self.env.action_space, action_spaces.Joystick):
            self._using_joystick = True
        elif isinstance(self.env.action_space, action_spaces.Grid):
            self._using_joystick = False
        else:
            raise ValueError(
                'Unrecognized action space {}'.format(self.env.action_space))

    def reset(self):
        """Reset environment. This should be called at the beginning of every
        trial.
        """
        self.env.reset()

        # Re-register the controller callbacks so each trial records a fresh
        # event history in self.events.
        unregister_event_callbacks()
        self.events = {}
        if self._using_joystick:
            controller_varnames = ('x_force', 'y_force')
        else:
            controller_varnames = (
                'up_pressed', 'down_pressed', 'left_pressed', 'right_pressed')
        for varname in controller_varnames:
            self._register_event_callback(varname)

        self.complete = False

    def _register_event_callback(self, varname):
        """Append every value MWorks reports for varname to self.events."""
        self.events[varname] = []
        def cb(evt):
            # The lock guards self.events against concurrent callback
            # invocations from the MWorks event stream.
            with self.lock:
                self.events[varname].append(evt.data)
        register_event_callback(varname, cb)

    def _get_action_joystick(self):
        """Get joystick action as an array [x_force, y_force]."""
        if self.env.step_count == 0:
            # Don't move on the first step. We set x_force and y_force to
            # zero because some joysticks initially give a non-zero action,
            # which persists unless we explicitly zero it out.
            setvar('x_force', 0.)
            setvar('y_force', 0.)
            return np.zeros(2)
        else:
            return np.array([getvar('x_force'), getvar('y_force')])

    def _get_action_grid(self):
        """Get grid action.

        Returns:
            Int in [0, 4]: index of the pressed key in
            (left, right, down, up), or 4 if no key is pressed.
        """
        if self.env.step_count == 0:
            # Don't move on the first step. We zero out the key variables
            # because some controllers initially give non-zero values, which
            # persist unless we explicitly zero them out.
            setvar('left_pressed', 0)
            setvar('right_pressed', 0)
            setvar('down_pressed', 0)
            setvar('up_pressed', 0)
            # Bug fix: initialize the previous-keys memory here. The original
            # only assigned self._keys_pressed after a non-initial step, so
            # pressing two keys on the first real step raised AttributeError.
            self._keys_pressed = np.zeros(4, dtype=int)
            return 4
        else:
            keys_pressed = np.array([
                getvar('left_pressed'),
                getvar('right_pressed'),
                getvar('down_pressed'),
                getvar('up_pressed'),
            ])
            if sum(keys_pressed) > 1:
                # Multiple keys pressed: ignore keys that were already pressed
                # last step. Bug fix: use a boolean mask — the original
                # indexed with the raw 0/1 int array, which numpy treats as
                # integer (fancy) indexing of positions 0 and 1.
                keys_pressed[self._keys_pressed.astype(bool)] = 0
                if sum(keys_pressed) > 1:
                    # Still ambiguous: pick one pressed key at random.
                    random_ind = np.random.choice(
                        np.argwhere(keys_pressed)[:, 0])
                    keys_pressed = np.zeros(4, dtype=int)
                    keys_pressed[random_ind] = 1
            self._keys_pressed = keys_pressed
            if sum(keys_pressed):
                key_ind = np.argwhere(keys_pressed)[0, 0]
            else:
                key_ind = 4  # No key pressed.
            return key_ind

    def step(self):
        """Step environment.

        Returns:
            Contiguous image array for MWorks' image stimulus, or None if
            the trial is already complete.
        """
        if self.complete:
            # Don't step if the task is already complete. Returning None tells
            # MWorks that the image texture doesn't need to be updated.
            return

        if isinstance(self.env.action_space, action_spaces.Joystick):
            action = self._get_action_joystick()
        elif isinstance(self.env.action_space, action_spaces.Grid):
            action = self._get_action_grid()
        else:
            # Bug fix: the original formatted this message with `config`,
            # which is undefined in this scope and would raise NameError.
            raise ValueError(
                'Unrecognized action space {}'.format(self.env.action_space))

        timestep = self.env.step(action)
        reward = timestep.reward
        img = timestep.observation['image']

        if reward:
            setvar('reward_duration', reward * 1000)  # ms to us

        if timestep.last():
            setvar('end_trial', True)
            self.complete = True

        # MWorks' Python image stimulus requires a contiguous buffer, so we
        # use ascontiguousarray to provide one.
        return np.ascontiguousarray(img)
MOOG task manager.
This loads a MOOG environment and handles the interface for MWorks. It should only be used by main.mwel.
Constructor.
Args
config_name
- String. Name of config file in configs/ to run.
level
- Argument for config_name.get_config().
Methods
def reset(self)
-
Expand source code
def reset(self):
    """Reset environment.

    This should be called at the beginning of every trial.
    """
    self.env.reset()
    # Drop any callbacks from the previous trial and start a fresh event log.
    unregister_event_callbacks()
    self.events = {}
    controller_varnames = (
        ('x_force', 'y_force')
        if self._using_joystick
        else ('up_pressed', 'down_pressed', 'left_pressed', 'right_pressed'))
    for varname in controller_varnames:
        self._register_event_callback(varname)
    self.complete = False
Reset environment.
This should be called at the beginning of every trial.
def step(self)
-
Expand source code
def step(self):
    """Step environment.

    Returns:
        Contiguous image array for MWorks' image stimulus, or None if the
        trial is already complete (None tells MWorks that the image texture
        doesn't need to be updated).
    """
    if self.complete:
        # Don't step if the task is already complete.
        return

    if isinstance(self.env.action_space, action_spaces.Joystick):
        action = self._get_action_joystick()
    elif isinstance(self.env.action_space, action_spaces.Grid):
        action = self._get_action_grid()
    else:
        # Bug fix: the original formatted this message with `config`, a name
        # local to __init__, so this branch raised NameError instead of the
        # intended ValueError.
        raise ValueError(
            'Unrecognized action space {}'.format(self.env.action_space))

    timestep = self.env.step(action)
    reward = timestep.reward
    img = timestep.observation['image']

    if reward:
        setvar('reward_duration', reward * 1000)  # ms to us

    if timestep.last():
        setvar('end_trial', True)
        self.complete = True

    # MWorks' Python image stimulus requires a contiguous buffer, so we use
    # ascontiguousarray to provide one.
    return np.ascontiguousarray(img)
Step environment.