# Source code for xmen.experiment

"""A module containing the Experiment class definition."""
#  Copyright (C) 2019  Robert J Weston, Oxford Robotics Institute
#
#  xmen
#  email:   robw@robots.ox.ac.uk
#  github: https://github.com/robw4/xmen/
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#  You should have received a copy of the GNU General Public License
#   along with this program. If not, see <http://www.gnu.org/licenses/>.

import sys
import datetime
import argparse
from typing import Any
import signal

from xmen.utils import get_meta, get_version, commented_to_py, DATE_FORMAT, recursive_print_lines, TypedMeta, MultiOut
from xmen.server import *
import os

# Help strings for the command line flags registered on ``experiment_parser``.
# Adjacent string literals are concatenated by Python, so every continuation
# line must carry its own separating space.
helps = {
    'execute': 'Execute the experiment from a given params.yml file linked to folder. If no folder is '
               'passed the experiment will be run in a detached state. This is useful for debugging but is '
               'not recommended for deployment.',
    'update': 'Update the parameters given by a yaml string. Note this will be called before '
              'other flags and can be used in combination with --to_root, --to_defaults, and --link',
    'root': 'Generate a run script and defaults.yml file for interfacing with xgent',
    'debug': 'Run experiment in debug mode. The experiments debug will be called before registering.',
    'txt': 'Also log stdout and stderr to an out.txt file. Enabled by default (default taken from xmen config)',
    'restart': 'Restart the experiment',
    'purpose': 'A string giving the purpose of the current experiment'}

from xmen.config import Config

# Module-wide xmen configuration, instantiated once at import time. It is read
# below for the ``--txt`` default and by ``Experiment`` (user, host, prompting).
CONFIG = Config()

import textwrap
# Re-flow every help message to lines of at most 50 characters. The parser uses
# RawTextHelpFormatter, which prints these newlines verbatim.
helps.update({flag: '\n'.join(textwrap.wrap(text, 50)) for flag, text in helps.items()})


class NullRoot(str):
    """A string that always equals ``os.devnull`` and absorbs concatenation.

    Adding anything to a ``NullRoot`` returns the very same object, so a path
    built from it by ``+`` concatenation still refers to the null device.
    """

    def __new__(cls):
        # The value is fixed: the constructor takes no arguments.
        return super().__new__(cls, os.devnull)

    def __add__(self, other):
        # Discard whatever is appended and stay the null device.
        return self


# Command line interface shared by all experiments (consumed by Experiment.parse_args).
experiment_parser = argparse.ArgumentParser(
    formatter_class=argparse.RawTextHelpFormatter)
experiment_parser.add_argument(
    '--update', '-u',
    type=str, nargs='+', default=None, help=helps['update'])
experiment_parser.add_argument(
    '--purpose', '-p',
    type=str, nargs='+', default=None, help=helps['purpose'])
# ``--execute`` given without a value yields a NullRoot, i.e. a detached run.
experiment_parser.add_argument(
    '--execute', '-x',
    type=str, nargs='?', const=NullRoot(), default=None, help=helps['execute'])
experiment_parser.add_argument(
    '--to_root', '-r',
    type=str, default=None, help=helps['root'])
# optional extras
experiment_parser.add_argument(
    '--debug', '-d',
    action='store_true', default=None, help=helps['debug'])
# NOTE(review): the default is the negation of CONFIG.redirect_stdout while the
# help text says "enabled by default" - confirm the negation is intended.
experiment_parser.add_argument(
    '--txt', '-t',
    action='store_true', default=not CONFIG.redirect_stdout, help=helps['txt'])
experiment_parser.add_argument(
    '--restart', '-f',
    action='store_true', default=None, help=helps['restart'])

# Names of the private attributes that describe experiment state rather than
# user parameters.
_SPECIALS = [
    '_root', '_status', '_purpose', '_messages', '_version', '_meta',
    '_user', '_host', '_notes', '_timestamps']
# Keys found in old yaml files; Experiment.from_yml converts them on load.
_DEPRECIATED = ['_name', '_created']


class TimeoutException(Exception):
    """Raised from the ``SIGUSR1`` handler that ``Experiment.__enter__`` installs."""


def get_time():
    """Return the current local time as a string formatted with ``DATE_FORMAT``."""
    now = datetime.datetime.now()
    return now.strftime(DATE_FORMAT)


# Status values stored in ``Experiment._status`` and written to the yaml files.
DEFAULT = 'default'
REGISTERED = 'registered'
RUNNING = 'running'
ERROR = 'error'
STOPPED = 'stopped'
TIMEOUT = 'timeout'
FINISHED = 'finished'
DETACHED = 'detached'
REQUEUE = 'requeue'
DELETED = 'deleted'


def get_timestamps(created=None, start=None, stopped=None, last=None, registered=None):
    """Build a fresh timestamps dictionary for an experiment.

    Each argument becomes the value stored under the key of the same name;
    anything not supplied is recorded as ``None``.
    """
    return dict(
        created=created,
        start=start,
        stopped=stopped,
        last=last,
        registered=registered,
    )


class Experiment(object, metaclass=TypedMeta):
    """Base class from which all other experiments derive.

    Experiments are defined by:

    1. *Parameters*: class attributes declared with the special parameter
       ``# @p`` in a comment after the definition.
    2. *Execution*: defined by overloading the ``run()`` method

    For example::

        class AnExperiment(Experiment):
            ''' Doc strings should be used to document the purpose of the experiment'''
            # experiment parameters are defined as class attributes with an @p tag in their comment
            a: str = 'Hello'   # @p a parameter
            b: str = 'World!'  # @p another parameter

            # experiment execution code is defined in the experiments run method
            def run(self):
                print(f'{self.a} {self.b}')
    """
    _params = {}  # Used to store parameters registered by the MetaClass

    def __init__(self, root=None, purpose='', copy_params=True, **kwargs):
        """Create a new experiment object.

        Args:
            root (str): If not None then the experiment will be registered to
                the folder ``root`` once its attributes have been initialised.
            purpose (str): An optional string giving the purpose of the
                experiment. Only used when ``root`` is given.
            copy_params (bool): If True then parameters are deep copied to the
                object instance from the class definition. Mutable attributes
                will no longer be shared.
            **kwargs: Override parameter defaults.
        """
        import copy
        if copy_params:
            for k in [k for k in dir(self) if k in self._params]:
                setattr(self, k, copy.deepcopy(getattr(self, k)))

        # The state attributes are always initialised. Previously they were only
        # set when ``root`` was None and passing a root raised a ValueError,
        # which made the ``register`` call at the end of this method unreachable.
        self._root: Optional[str] = None  # @p The root directory of the experiment
        self._status: str = DEFAULT  # @p One of ['default' | 'created' | 'running' | 'error' | 'finished']
        # self._created: Optional[float] = None # @p Initial time the experiment was created
        self._notes: Optional[List[str]] = None  # @p Notes attached to the experiment
        self._purpose: Optional[str] = None  # @p A description of the experiment purpose
        # new attributes
        self._user: Optional[str] = CONFIG.local_user  # @p The user of the experiment
        self._host: Optional[str] = CONFIG.local_host  # @p The name of the default host
        self._timestamps: Dict[str, Optional[str]] = get_timestamps()  # @p timestamps attached to the experiment
        # These can all be varied
        self._messages: Dict[Any, Any] = {}  # @p Messages left by the experiment
        self._version: Optional[Dict[Any, Any]] = None  # @p Experiment version information. See `get_version`
        self._meta: Optional[Dict] = None  # @p The global configuration for the experiment manager
        # depreciated
        self._specials: List[str] = _SPECIALS
        self._helps: Optional[Dict] = None
        # queues
        self._queues = []
        self._processes = []
        self._manager = None

        # Update kwargs
        self.update(kwargs)
        if root is not None:
            self.register(root, purpose=purpose)

    @property
    def root(self):
        """The root directory to which the experiment belongs"""
        return self._root

    @property
    def directory(self):
        """Alias of ``root``."""
        return self._root

    @property
    def status(self):
        """The status of the experiment. One of ``'default'``, ``'registered'``, ``'running'``,
        ``'finished'`` or ``'error'``."""
        return self._status

    @property
    def created(self):
        """The date the experiment parameters was first registered."""
        return self._timestamps['created']

    @property
    def start(self):
        """The time the experiment last started running."""
        return self._timestamps['start']

    @property
    def registered(self):
        """The time the experiment was last registered."""
        return self._timestamps['registered']

    @property
    def stopped(self):
        """The time the experiment was last stopped."""
        return self._timestamps['stopped']

    @property
    def last(self):
        """The time the experiment state was last communicated."""
        return self._timestamps['last']

    @property
    def purpose(self):
        """A string giving a purpose message for the experiment"""
        return self._purpose

    @property
    def messages(self):
        """A dictionary of messages logged by the experiment."""
        return self._messages

    @property
    def version(self):
        """A dictionary giving the version information for the experiment"""
        return self._version

    @property
    def notes(self):
        """The list of notes attached to the experiment (``None`` if there are none)"""
        return self._notes

    @property
    def user(self):
        """The current user of the experiment"""
        return self._user

    @property
    def host(self):
        """The host of the experiment"""
        return self._host

    @root.setter
    def root(self, value):
        raise AttributeError('Property root cannot be set.')

    @status.setter
    def status(self, value):
        raise AttributeError('Property status cannot be set.')

    @created.setter
    def created(self, value):
        raise AttributeError('Property created cannot be set.')

    @purpose.setter
    def purpose(self, value):
        raise AttributeError('Property purpose cannot be set.')

    @messages.setter
    def messages(self, value):
        raise AttributeError('Property messages cannot be set.')

    @version.setter
    def version(self, value):
        raise AttributeError('Property version cannot be set.')

    @user.setter
    def user(self, value):
        raise AttributeError('Property user cannot be set.')

    @host.setter
    def host(self, value):
        """The host of the experiment"""
        raise AttributeError('Property host cannot be set.')

    def note(self, string, remove=False):
        """Leave a note with the experiment.

        Args:
            string (str): The note to add or remove.
            remove (bool): If ``True`` the note is removed instead of added.

        Raises:
            ValueError: if ``remove`` is ``True`` and the note is not attached.
        """
        if self._notes is None:
            self._notes = []
        if remove:
            self._notes.remove(string)
        else:
            self._notes.append(string)
        # An empty list of notes is stored as None
        if not self._notes:
            self._notes = None

    def get_param_helps(self):
        """Get help for all attributes in class (including inherited and private)."""
        return {k: v[3].strip() for k, v in self._params.items()}

    def update_version(self):
        """Refresh ``version`` from ``self.fn`` if that attribute exists, else from the class."""
        if hasattr(self, 'fn'):
            self._version = get_version(fn=self.fn)
        else:
            self._version = get_version(cls=self.__class__)

    def update_meta(self, get_platform=False, get_cpu=False, get_memory=False, save=False, **kwargs):
        """Refresh the meta information with ``get_meta`` and save the experiment if ``save``."""
        self._meta = get_meta(get_platform, get_cpu, get_memory, **kwargs)
        if save:
            self._save()

    def to_defaults(self, defaults_dir):
        """Create a ``defaults.yml`` file from experiment object.
        Any base class inheriting from Experiment can create a default file as::

            MyExperiment().to_defaults('/dir/to/defaults/root')

        Raises:
            AssertionError: if the experiment is not in the ``'default'`` status.
        """
        assert self._status == DEFAULT, 'An experiment can only be converted to default if it has not been registered'
        self.update_version()
        self.update_meta()
        self._timestamps['created'] = get_time()
        if not os.path.exists(defaults_dir):
            os.makedirs(defaults_dir)
        self._save(defaults_dir)

    def as_yaml(self, as_string=True):
        """Convert the parameters and special attributes to a commented yaml map.

        Args:
            as_string (bool): If True the map is passed through ``dic_to_yaml``
                before being returned, otherwise the ``CommentedMap`` is returned.
        """
        from xmen.utils import dic_to_yaml
        from ruamel.yaml.comments import CommentedMap
        params = {k: getattr(self, k) for k in dir(self) if k in self.param_keys() or k in self._specials}
        params = {k: v for k, v in params.items() if '_' + k not in self.__dict__}
        param_helps = self.get_param_helps()
        # Keys which are left out of a defaults.yml file
        skipped_in_defaults = ['_root', '_status', '_purpose', '_messages', '_origin', '_timestamps',
                               '_notes', '_type']
        mapping = CommentedMap()
        for i, (k, v) in enumerate(params.items()):
            if self._status == DEFAULT and k in skipped_in_defaults:
                continue
            comment = param_helps[k]
            if comment == '':
                comment = None
            mapping.insert(i, k, v, comment=comment)
        if as_string:
            return dic_to_yaml(mapping)
        return mapping

    def to_update_request(self):
        """Build an ``UpdateExperiment`` request holding the current yaml state and status."""
        from xmen.config import Config
        config = Config()
        data = self.as_yaml()
        root = f'{config.local_user}@{config.local_host}:{self.root}'
        return UpdateExperiment(
            user=config.user, password=config.password, root=root, data=data, status=self.status)

    def _save(self, defaults_dir=None):
        """Save experiment to either a defaults.yml file or a params.yml file depending on its status"""
        # Imported here so that ``queue.Empty`` resolves regardless of what the
        # wildcard import from xmen.server happens to export.
        import queue
        if self._status == DEFAULT:
            path = os.path.join(defaults_dir, 'defaults.yml')
        else:
            path = os.path.join(self.directory, 'params.yml')
        if self.status == RUNNING:
            self._timestamps['last'] = get_time()
        # save parameters (always)
        string = self.as_yaml()
        with open(path, 'w') as file:
            file.write(string)

        if self.status not in [DEFAULT, REGISTERED]:
            request = self.to_update_request()
            for q in self._queues:
                # get one item and put one item so that only the latest request is kept
                try:
                    q.get(block=False)
                except queue.Empty:
                    pass
                q.put(request, block=False)
        elif self.status == REGISTERED:
            # the global configuration will link with the server...
            Config().link(self.root)

    def debug(self):
        """Inherited classes may overload debug. Used to define a set of open_socket for minimum example"""
        return self

    def from_yml(self, path, copy=False):
        """Load state from either a ``params.yml`` or ``defaults.yml`` file.

        Keys from the deprecated format (``_name`` and ``_created``) are converted
        first. Only keys already present in the instance ``__dict__`` are loaded.

        Args:
            path (str): passed to ``dic_from_yml``.
            copy (bool): If ``True`` then only keys which do not start with an
                underscore (the user defined parameters) are copied.
        """
        from xmen.utils import dic_from_yml
        params = dic_from_yml(path=path)
        # backward compatibility
        if '_name' in params:
            params['_root'] = os.path.join(params['_root'], params.pop('_name'))
        if '_created' in params:
            params['_timestamps'] = get_timestamps()
            params['_timestamps']['created'] = params.pop('_created')
        params = {k: commented_to_py(v) for k, v in params.items() if k in self.__dict__}
        if copy:
            # Copy only parameter values themselves (and not specials)
            params = {k: v for k, v in params.items() if not k.startswith('_')}
        self.__dict__.update(params)

    def register(self, root, purpose='', force=True, same_names=100, restart=False, **_):
        """Register an experiment to an experiment directory. Its status will be updated to ``registered``.

        If ``root`` already contains a ``params.yml`` file and ``force==True`` then
        ``root`` will be appended with an int (eg. ``{root}_0``) until a folder
        without a ``params.yml`` file is found. If ``restart`` is passed and
        ``root`` already contains a ``params.yml`` file then the experiment is
        loaded from that file instead.

        Args:
            root (str): the experiment folder.
            purpose (str): the purpose of the experiment.
            force (bool): see above.
            same_names (int): the number of integer suffixes that are tried.
            restart (bool): see above.

        Raises:
            ValueError: if ``root`` already contains a ``params.yml`` file and
                either ``force==False`` or all ``same_names`` suffixes are taken.
        """
        exists = os.path.exists(os.path.join(root, 'params.yml'))
        if restart and exists:
            self.from_yml(os.path.join(root, 'params.yml'))
            if self.status != REGISTERED:
                self._status = REGISTERED
                self._save()
            return

        if exists:
            message = (f'Experiment folder {root} already contains a params.yml file. '
                       f'An Experiment cannot be created in an already existing experiment folder')
            if not force:
                raise ValueError(message)
            for i in range(same_names):
                candidate = root + '_' + str(i)
                if not os.path.exists(os.path.join(candidate, 'params.yml')):
                    root = candidate
                    break
            else:
                # Every suffix is taken. Fail rather than overwrite the
                # experiment already stored in root.
                raise ValueError(message)

        # Make the folder if it does not exist
        if not os.path.isdir(root):
            os.makedirs(root)
        self.update_version()  # Get new version information
        self.update_meta()  # Get the newest meta information
        self._root = root
        self._purpose = purpose
        if self._timestamps['created'] is None:
            self._timestamps['created'] = get_time()
        self._timestamps['registered'] = get_time()
        self._status = REGISTERED
        self._save()

    def to_root(self, root_dir, shell='/bin/bash'):
        """Generate a ``defaults.yml`` file and ``script.sh`` file in ``root_dir``.

        Args:
            root_dir (str): A path to the root directory in which to generate a script.sh and defaults.yml to
                run the experiment.
            shell (str): The interpreter written to the first line of ``script.sh``.
        """
        import stat
        # get_git is deliberately called outside to_defaults as git information is also added to script.sh
        self.update_version()
        self.update_meta(save=False)
        from xmen.utils import get_run_script
        if hasattr(self, 'fn'):
            script = get_run_script(*self.fn)
            path = os.path.join(root_dir, '.'.join(self.fn))
        else:
            path = os.path.join(root_dir, '.'.join([self.__class__.__module__, self.__class__.__name__]))
            script = get_run_script(self.__class__.__module__, self.__class__.__name__)
        if not os.path.exists(root_dir):
            os.makedirs(root_dir)

        # Save to root directory (files are closed before their mode is changed)
        with open(path, 'w') as file:
            file.write(script)
        st = os.stat(path)
        os.chmod(path, st.st_mode | stat.S_IEXEC)
        with open(os.path.join(root_dir, 'script.sh'), 'w') as file:
            file.write(f'#!{shell}\nexec {path} --execute ${{1}}')
        self.to_defaults(root_dir)

    def _update_status(self, status):
        """Update the status of the experiment"""
        self._status = status
        self._save()

    def detach(self):
        """Point the experiment at a ``NullRoot`` and set its status to ``'detached'``."""
        self._root = NullRoot()
        self._status = DETACHED

    def update(self, kwargs):
        """Update the parameters with a given dictionary

        Raises:
            ValueError: if the status is neither ``'default'`` nor ``'detached'``
                or if a key is rejected.
        """
        if self._status not in [DEFAULT, DETACHED]:
            raise ValueError('Parameters of a created experiment cannot be updated.')
        # NOTE(review): this only rejects keys that are specials but not
        # parameters; unknown keys are accepted. ``k not in self._specials`` may
        # have been intended - confirm before changing.
        if any([k not in self.param_keys() and k in self._specials for k in kwargs]):
            raise ValueError('Key not recognised!')
        self.__dict__.update(kwargs)

    def param_keys(self):
        """The names of all parameters registered by the metaclass."""
        return self._params.keys()

    def __setattr__(self, key, value):
        """Attributes can only be changed when the status of the experiment is default

        Values are written straight into the instance ``__dict__``.
        """
        if '_status' in self.__dict__:
            if key in self.param_keys() and self._status not in [DEFAULT, DETACHED] and key not in self._specials:
                raise AttributeError('Parameters can only be changed when status = "default" or "detached"')
        self.__dict__.update({key: value})

    def __enter__(self):
        def _sigusr1_handler(signum, handler):
            raise TimeoutException

        # set up the signal usr1 signal handler
        signal.signal(signal.SIGUSR1, _sigusr1_handler)
        # get all the meta information for the current system
        meta = get_meta(get_platform=True, get_cpu=True, get_memory=True, get_disk=True,
                        get_slurm=True, get_conda=CONFIG.save_conda, get_network=True,
                        get_gpu=True, get_environ=True)
        # save conda environment information
        # saved separately from the params.yml file for clarity
        conda = meta.pop('conda', None)
        from ruamel.yaml import YAML
        yaml = YAML()
        yaml.default_flow_style = False
        if conda is not None:
            with open(os.path.join(self.directory, 'environment.yml'), 'w') as f:
                yaml.dump(conda, f)

        # update internal state
        self._meta = meta
        self._update_status(RUNNING)
        self._timestamps['start'] = get_time()

        # setup queues for interfacing with remote server
        from multiprocessing import Process, Manager
        self._manager = Manager()
        self._queues += [self._manager.Queue(maxsize=2)]
        p = Process(target=send_request_task, args=(self._queues[0], ))
        p.start()
        self._processes += [p]

        # finally save the experiment (and send UpdateExperiment request to server)
        self._save()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._timestamps['stopped'] = get_time()
        if exc_type is None:
            self._update_status(FINISHED)
        elif exc_type is KeyboardInterrupt:
            print('########################')
            print('Stopping experiment')
            print('########################')
            self._update_status(STOPPED)
        elif exc_type is TimeoutException:
            print('########################')
            print('Timeout encountered')
            print('########################')
            self._update_status(TIMEOUT)
            slurm_job = os.environ.get('SLURM_JOBID', None)
            if slurm_job is not None and CONFIG.reque:
                import subprocess
                self._update_status(REQUEUE)
                subprocess.call(['scontrol', 'requeue', f'{slurm_job}'])
        else:
            print('########################')
            print('An error occurred encountered')
            print('########################')
            if self.status not in [TIMEOUT, STOPPED]:
                self._update_status(ERROR)

        # stop running processes: a None item is put on every queue before joining
        for q in self._queues:
            q.put(None)
        for p in self._processes:
            p.join()
        print('Processes terminated')

    def __call__(self, *args, **kwargs):
        """Used to run experiment. Upon entering the experiment status is updated to ``'running'``
        before ``args`` and ``kwargs`` are passed to ``run()``. If ``run()`` is successful the
        experiment ``status`` is updated to ``'finished'`` else it will be given ``status='error'``.

        Both *args and **kwargs are passed to self.run.

        Raises:
            ValueError: if the experiment is still in the ``'default'`` status.
        """
        if self._status == DEFAULT:
            raise ValueError('An experiment in default status must be registered before it can be executed')
        with self:
            self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        """The experiment execution code. Must be overloaded by derived classes."""
        raise NotImplementedError('Derived classes must implement the run method in order to be called')

    def message(self, messages, keep='latest', leader=None):
        """Add a message to the experiment (and an experiments params.yml file). If the experiment is not registered to
        a root then no messages will be logged.

        Args:
            messages (dict): A dictionary of messages. Keys are interpreted as subjects and values interpreted as
                messages. If the ``defaults.yml`` already contains subject then the message for subject will be
                updated.
            keep (str): which message to keep in the case of collision. One of ['latest', 'min', 'max']
            leader (str): If not None then all messages will be saved if the keep condition is met for the leader key.

        Note:
            Only messages of type float, int and string are supported. Any other message will be converted to type float
            (if possible) then string thereafter.
        """
        if self._root is None:
            return
        if leader is not None:
            # The leader decides whether the whole group of messages is kept
            best = self.compare(leader, messages[leader], keep)[0]
            if best:
                self._messages.update({k: self.convert_type(v) for k, v in messages.items()})
        else:
            for k, v in messages.items():
                if self.compare(k, v, keep)[0]:
                    self._messages.update({k: self.convert_type(v)})
        self._save()

    @staticmethod
    def convert_type(v):
        """Return ``v`` unchanged if it is a str, float or int, else as a float if possible, else as a str."""
        if not isinstance(v, (str, float, int)):
            try:
                v = float(v)
            except (ValueError, TypeError):
                v = str(v)
        return v

    def compare(self, k, v, keep='latest'):
        """Compare ``v`` with the message currently stored under ``k``.

        Returns:
            tuple: ``(is_best, value)`` where ``is_best`` is True if ``v`` should
            be kept and ``value`` is the value selected by ``keep``.
        """
        assert keep in ['max', 'min', 'latest']
        cur = self.messages.get(k, None)
        if cur is not None and keep in ['max', 'min']:
            out = {'max': max, 'min': min}[keep](v, cur)
            return out == v, out
        return True, v

    def parse_args(self):
        """Configure the experiment instance from the command line arguments.

        Returns:
            argparse.Namespace: the parsed arguments.
        """
        from xmen.utils import IncompatibleYmlException
        experiment_parser.prog = f'xmen {self.__class__.__name__}'
        # Configure help information from the class
        n = 7
        experiment_parser.usage = '\n'.join(
            [' ' * 0 + experiment_parser.prog + ' ' + '-u YML -r PATH',
             ' ' * n + experiment_parser.prog + ' ' + '-u YML -x DIR ',
             ' ' * n + experiment_parser.prog + ' ' + '-x PARAMS'])
        args = experiment_parser.parse_args()

        # Run the debug method if implemented and
        # debug flag is passed.
        if args.debug is not None:
            # Update debug parameters
            print('Running as debug')
            self.debug()

        # update the parameter values from either
        # a parameter string or from a defaults.yml file.
        if args.update is not None:
            for update in args.update:
                try:
                    # Case (1) update parameters from a yaml file
                    self.from_yml(update, copy=True)
                except IncompatibleYmlException:
                    try:
                        # Case (2) update parameters from a yaml string
                        # NOTE(review): this uses the full (unsafe) ruamel
                        # loader on command line input; consider a safe loader.
                        import ruamel.yaml
                        overrides = ruamel.yaml.load(update, Loader=ruamel.yaml.Loader)
                        print(f'Updating parameters {overrides}')
                        self.update(overrides)
                    except Exception:
                        print(f'ERROR: {update} is either not a valid yaml string or '
                              f'is not a path to a defaults.yml or params.yml file')
                        # Report the failure to the calling shell
                        sys.exit(1)

        # Then execute the experiment
        if args.execute is not None:
            if args.execute != os.devnull:
                if os.path.isfile(args.execute):
                    # (1) link experiment from pre-existing params.yml file
                    try:
                        self.from_yml(args.execute)
                        if not self.status == 'registered':
                            raise IncompatibleYmlException
                    except IncompatibleYmlException:
                        print(f'ERROR: File {args.execute} is not a valid params.yml file')
                else:
                    # (2) link experiment to a repository
                    if args.purpose is not None:
                        # --purpose is parsed with nargs='+' and so is a list of words
                        purpose = ' '.join(args.purpose)
                    elif CONFIG.prompt:
                        purpose = input('Enter the purpose of the experiment: ')
                    else:
                        purpose = ''
                    self.register(args.execute, restart=args.restart, purpose=purpose)
            else:
                self.detach()
        return args

    def main(self, args=None):
        """Take the command line args and execute the experiment (see ``parse_args`` for more
        information). In order to expose the command line interface::

            if __name__ == '__main__':
                exp = AnExperiment().main()

        Note that for backwards compatibility it is also possible to pass ``args`` as an argument to ``main``.
        This allows the experiment to be run from the commandline as::

            if __name__ == '__main__':
                exp = AnExperiment()
                args = exp.parse_args()
                exp.main(args)
        """
        if args is None:
            args = self.parse_args()

        # ``args.txt`` is always a bool and so is not part of this check
        if all(a is None for a in (args.debug, args.execute, args.to_root, args.update)):
            print(self.__doc__)
            print('\nFor more help use --help.')

        # Generate experiment root
        if args.to_root is not None:
            print(f'Generating experiment root at {args.to_root}')
            self.to_root(args.to_root)

        # Run the experiment
        if args.execute is not None:
            assert self.status in [REGISTERED, DETACHED, TIMEOUT, REQUEUE],\
                f'Experiment must be registered before execution but got {self.status}'
            # Configure standard out to print to the registered directory as well as
            # the original standard out
            if args.txt:
                self.stdout_to_txt()
            # Execute experiment
            try:
                self()
            except NotImplementedError:
                print(f'WARNING: The --execute flag was passed but run is not implemented for {self.__class__}')

    def stdout_to_txt(self):
        """Configure stdout to also log to a text file in the experiment directory"""
        sys.stdout = MultiOut(sys.__stdout__, open(os.path.join(self.directory, 'out.txt'), 'a+'))
        sys.stderr = MultiOut(sys.__stderr__, open(os.path.join(self.directory, 'out.txt'), 'a+'))

    def __repr__(self):
        """Pretty print the experiment"""
        param_helps = self.get_param_helps()
        base_params = {k[1:]: v for k, v in self.__dict__.items() if k in self._specials}
        params = {k: getattr(self, k) for k in param_helps if not k.startswith('_')}
        lines = recursive_print_lines(base_params)
        lines += ['parameters:']
        lines += [' ' + f'{k}: {v}' for k, v in params.items()]
        return '\n'.join(lines)