Source code for psyrun.tasks

"""Psyrun's task loading API."""

from __future__ import print_function

import os
import os.path
import re
import sys
import traceback
import warnings

from psyrun.backend import DefaultBackend
from psyrun.pspace import Param
from psyrun.store import DefaultStore
from psyrun.scheduler import ImmediateRun


class TaskDef(object):
    """Task defined by a Python file.

    Every public (not underscore-prefixed) module-level name of the task file
    becomes an attribute of the instance. Settings the task file leaves
    undefined are afterwards filled in from *conf*.

    Parameters
    ----------
    path : str
        Python file to load as task.
    conf : `Config`, optional
        Default values for task parameters. When omitted, ``psy-conf.py``
        next to *path* is loaded if it exists; otherwise a default `Config`
        is used.

    Attributes
    ----------
    TASK_PATTERN : re.RegexObject
        Regular expression to match task filenames. Its first group is used
        as the task name when the task file does not define ``name``.
    path : str
        The *path* the task was loaded from (always overrides a ``path``
        defined in the task file).
    name : str
        Task name, either defined by the task file or derived from the
        filename.
    """

    TASK_PATTERN = re.compile(r'^task_(.*)$')

    def __init__(self, path, conf=None):
        if conf is None:
            conf_path = os.path.join(os.path.dirname(path), 'psy-conf.py')
            conf = (Config.load_from_file(conf_path)
                    if os.path.exists(conf_path) else Config())

        # Publish everything the task file defines (except _private names)
        # as attributes of this task definition.
        namespace = _load_pyfile(path)
        _set_public_attrs_from_dict(self, namespace, only_existing=False)
        self.path = path

        if not hasattr(self, 'name'):
            # 'task_foo.py' -> 'foo'; a stem not matching TASK_PATTERN is
            # used unchanged.
            stem = os.path.splitext(os.path.basename(path))[0]
            match = self.TASK_PATTERN.match(stem)
            self.name = match.group(1) if match else stem

        # Only attributes the task file did not set receive conf defaults.
        conf.apply_as_default(self)
def _load_pyfile(filename): source = '' with open(filename, 'r') as f: source += f.read() code = compile(source, filename, 'exec') loaded = {'__file__': filename} exec(code, loaded) # pylint: disable=exec-used return loaded def _set_public_attrs_from_dict(obj, d, only_existing=True): for k, v in d.items(): if not k.startswith('_') and (not only_existing or hasattr(obj, k)): setattr(obj, k, v)
class Config(object):  # pylint: disable=too-many-instance-attributes
    """Task configuration.

    Attributes
    ----------
    backend : `Backend`, default: `DefaultBackend`
        The processing backend which determines how work is distributed
        across jobs.
    exclude_from_result : sequence of str, default: ``[]``
        Keys of items to exclude from result. This can be useful if
        parameters or parts of the result cannot be saved to disk.
    file_dep : sequence of str, default: ``[]``
        Additional files the task depends on.
    max_jobs : int, default: 100
        Maximum number of jobs to start. With less jobs each job has to
        process more parameter assignments. It depends on the scheduler and
        backend used to which degree these will run in parallel.
    min_items : int, default: 1
        Minimum number of parameter assignment to evaluate per job. If a
        single assignment is fast to evaluate, increasing this number can
        improve performance because Psyrun will not start a new job for each
        parameter assignment which can save some overhead.
    overwrite_dirty : bool, default: True
        Whether to overwrite dirty workdirs without a warning.
    pool_size : int, default: 1
        Number of parallel threads or processes each job will run. This
        allows for parallelization without a proper scheduler (e.g. when
        using `psyrun.scheduler.ImmediateRun`).
    pspace : `ParameterSpace`, required
        Parameter space to evaluate.
    python : str, default: ``sys.executable``
        Path to Python interpreter to use.
    resultfile : str or None, default: None
        Path to save the results of the finished task at. If None, this
        defaults to ``'result.<ext>'`` in the *workdir*.
    scheduler : `Scheduler`, default: ``ImmediateRun()``
        Scheduler to use to submit individual jobs.
    scheduler_args : dict, default: ``{}``
        Additional scheduler arguments. See the documentation of the
        scheduler for details.
    setup : function, default: None
        Function to call after starting a worker process before any
        parameter sets are processed. The function gets the ID of the worker
        process (usually starting at 0 and incremented by one for each
        process) as sole argument. It may return a dictionary of additional
        arguments to pass to the processing function. The setup function can
        be used to initialize process wide resources.
    store : `Store`, default: ``DefaultStore()``
        Input/output backend.
    workdir : str, default: ``os.path.abspath('psy-work')``
        Working directory to store results and supporting data to process
        the task. The default is made absolute against the current working
        directory at the time the `Config` instance is created.
    """

    __slots__ = [
        'backend', 'exclude_from_result', 'file_dep', 'max_jobs',
        'min_items', 'pool_size', 'pspace', 'overwrite_dirty', 'python',
        'resultfile', 'scheduler', 'scheduler_args', 'setup', 'store',
        'workdir']

    def __init__(self):
        self.backend = DefaultBackend
        self.exclude_from_result = []
        self.file_dep = []
        self.max_jobs = 100
        self.min_items = 1
        self.overwrite_dirty = True
        self.pool_size = 1
        self.pspace = Param()
        self.python = sys.executable
        self.resultfile = None
        self.scheduler = ImmediateRun()
        self.scheduler_args = dict()
        self.setup = None
        self.store = DefaultStore()
        # Resolved now, so the default depends on the cwd at construction.
        self.workdir = os.path.abspath('psy-work')

    @classmethod
    def load_from_file(cls, filename):
        """Load the configuration values from a Python file.

        The file is executed and each of its public module-level names that
        matches a configuration attribute overrides the default value. Other
        names (imports, helpers, typos) are ignored without warning.

        Parameters
        ----------
        filename : str
            Python file to load.

        Returns
        -------
        Config
            Configuration with the file's values applied over the defaults.
        """
        conf = cls()
        loaded_conf = _load_pyfile(filename)
        # only_existing defaults to True: unknown names are dropped.
        _set_public_attrs_from_dict(conf, loaded_conf)
        return conf

    def apply_as_default(self, task):
        """Copies the attributes to a different object given they are not
        set in that object.

        Values are assigned by reference, not copied: mutable defaults such
        as ``file_dep`` or ``scheduler_args`` end up shared between every
        object this `Config` is applied to.

        Parameters
        ----------
        task : obj
            Object to copy the attributes to.
        """
        for attr in self.__slots__:
            if not hasattr(task, attr):
                setattr(task, attr, getattr(self, attr))
class PackageLoader(object):
    """Loads tasks from Python files.

    Filenames have to match the regular expression defined in
    `TaskDef.TASK_PATTERN`. See `Config` for supported module level
    variables in the task definition.

    It is possible to set these variables for all tasks by setting them in
    the file ``psy-conf.py`` in the *taskdir*.

    Parameters
    ----------
    taskdir : str
        Directory to load task files from.

    Attributes
    ----------
    taskdir : str
        Directory to load task files from.
    conf : `Config`
        Default values for module level task variables.
    """

    def __init__(self, taskdir):
        super(PackageLoader, self).__init__()
        self.taskdir = taskdir
        conffile = os.path.join(self.taskdir, 'psy-conf.py')
        if os.path.exists(conffile):
            self.conf = Config.load_from_file(conffile)
        else:
            self.conf = Config()

    def load_task_defs(self):
        """Load task definitions.

        Only ``.py`` files whose name (without extension) matches
        `TaskDef.TASK_PATTERN` are considered. They are visited in sorted
        filename order so that the result does not depend on the arbitrary
        order in which the filesystem lists directory entries.

        A task file that fails to load does not abort loading: its traceback
        is printed, a warning is issued and it is left out of the result.

        Returns
        -------
        list of `TaskDef`
            Task definitions.
        """
        task_defs = []
        for filename in sorted(os.listdir(self.taskdir)):
            root, ext = os.path.splitext(filename)
            if ext != '.py' or not TaskDef.TASK_PATTERN.match(root):
                continue
            path = os.path.join(self.taskdir, filename)
            try:
                task_defs.append(TaskDef(path, self.conf))
            except Exception:  # pylint: disable=broad-except
                # Deliberate best effort: one broken task file must not
                # prevent the remaining tasks from being loaded.
                traceback.print_exc()
                warnings.warn("Task {path!r} could not be loaded.".format(
                    path=path))
        return task_defs