Source code for psyrun.backend.distribute

"""Backend for distributed parameter evaluation."""

import os
import shutil

from psyrun.backend.base import Backend, JobSourceFile
from psyrun.jobs import Job, JobChain, JobArray
from psyrun.pspace import dict_concat, missing, Param
from psyrun.mapper import map_pspace_hdd_backed
from psyrun.store import DefaultStore
from psyrun.utils.doc import inherit_docs


@inherit_docs
class DistributeBackend(Backend):
    """Create subtasks for distributed parameter evaluation.

    This will create one task that splits the parameter space into a number
    of batches of roughly equal size (at most *max_jobs* batches, but with at
    least *min_items* parameter sets in each batch). After all batches have
    been processed, the results are merged into a single file. This is
    similar to map-reduce processing.

    Parameters
    ----------
    task : `TaskDef`
        Task definition to create subtasks for.
    """

    @property
    def resultfile(self):
        """File in which the results will be stored."""
        if self.task.resultfile:
            return self.task.resultfile
        else:
            return os.path.join(
                self.workdir, 'result' + self.task.store.ext)

    @property
    def pspace_file(self):
        """File that will store the input parameter space."""
        return os.path.join(self.workdir, 'pspace' + self.task.store.ext)

    def _try_mv_to_out(self, filename):
        try:
            os.rename(
                os.path.join(self.workdir, filename),
                os.path.join(
                    self.workdir, 'out', 'pre' + self.task.store.ext))
            return True
        except OSError:
            return False

    def create_job(self, cont=False):
        if cont:
            outdir = os.path.join(self.workdir, 'out')
            if not self._try_mv_to_out('result' + self.task.store.ext):
                Splitter.merge(
                    outdir, os.path.join(outdir, 'pre' + self.task.store.ext))
            for filename in os.listdir(outdir):
                if not filename.startswith('pre'):
                    os.remove(os.path.join(outdir, filename))
            pspace = self.get_missing()
            try:
                indir = os.path.join(self.workdir, 'in')
                shutil.rmtree(indir)
            except OSError:
                pass
        else:
            pspace = self.task.pspace

        self.task.store.save(self.pspace_file, pspace.build())
        splitter = Splitter(
            self.workdir, pspace, self.task.max_jobs, self.task.min_items,
            store=self.task.store)

        split = self.create_split_job(splitter)
        process = self.create_process_job(splitter)
        merge = self.create_merge_job(splitter)
        return JobChain(self.task.name, [split, process, merge])

    def create_split_job(self, splitter):
        code = '''
from psyrun.backend.distribute import Splitter
from psyrun.pspace import Param

pspace = Param(**task.store.load({pspace!r}))
Splitter(
    {workdir!r}, pspace, {max_jobs!r}, {min_items!r},
    store=task.store).split()
        '''.format(
            pspace=self.pspace_file, workdir=splitter.workdir,
            max_jobs=self.task.max_jobs, min_items=self.task.min_items)
        file_dep = [os.path.join(os.path.dirname(self.task.path), f)
                    for f in self.task.file_dep]
        return Job(
            'split', self.submit_code, {'code': code},
            [self.task.path] + file_dep,
            [f for f, _ in splitter.iter_in_out_files()])

    def create_process_job(self, splitter):
        source_file = JobSourceFile(
            os.path.join(self.workdir, self.task.name + ':process.py'),
            self.task,
            '''
import sys
from psyrun.backend.distribute import Worker


def execute(*args, **kwargs):
    return task.execute(*args, **kwargs)


if __name__ == '__main__':
    Worker(
        int(sys.argv[3]), store=task.store,
        exclude_from_result=task.exclude_from_result).start(
            execute, sys.argv[1], sys.argv[2], pool_size={pool_size},
            setup_fn=task.setup)
            '''.format(pool_size=self.task.pool_size))

        infile = os.path.join(splitter.indir, '%a' + splitter.store.ext)
        outfile = os.path.join(splitter.outdir, '%a' + splitter.store.ext)
        return JobArray(
            splitter.n_splits, 'process', self.submit_array, self.submit_file,
            {'job_source_file': source_file, 'args': [infile, outfile, '%a']},
            [infile], [outfile])

    def create_merge_job(self, splitter):
        code = '''
from psyrun.backend.distribute import Splitter

Splitter.merge({outdir!r}, {filename!r}, append=False, store=task.store)
        '''.format(outdir=splitter.outdir, filename=self.resultfile)
        return Job(
            'merge', self.submit_code, {'code': code},
            [f for _, f in splitter.iter_in_out_files()],
            [self.resultfile])

    def get_missing(self):
        pspace = self.task.pspace
        try:
            missing_items = missing(
                pspace, Param(**self.task.store.load(self.resultfile)))
        except (IOError, OSError):
            missing_items = pspace

        try:
            for filename in os.listdir(os.path.join(self.workdir, 'out')):
                if os.path.splitext(filename)[1] != self.task.store.ext:
                    continue
                outfile = os.path.join(self.workdir, 'out', filename)
                try:
                    missing_items = missing(
                        missing_items,
                        Param(**self.task.store.load(outfile)))
                except (IOError, OSError):
                    pass
        except (IOError, OSError):
            pass
        return missing_items

    def get_queued(self):
        scheduler = self.task.scheduler
        status = [scheduler.get_status(j) for j in scheduler.get_jobs()]

        for s in status:
            if s.status != 'D' and self.task.name + ':split' in s.name:
                return Param(**self.task.store.load(self.pspace_file))

        queued = Param()
        for s in status:
            if s.status != 'D' and self.task.name + ':process' in s.name:
                num = s.name.rsplit(':', 1)[-1]
                filename = os.path.join(
                    self.workdir, 'in', num + self.task.store.ext)
                queued += Param(**self.task.store.load(filename))
        return queued

    def get_failed(self):
        scheduler = self.task.scheduler
        status = (scheduler.get_status(j) for j in scheduler.get_jobs())
        queued = [s.name for s in status if s.status != 'D']

        indir = os.path.join(self.workdir, 'in')
        if (not os.path.exists(indir) or
                self.task.name + ':split' in queued):
            return []
        elif not os.path.exists(indir) or len(os.listdir(indir)) == 0:
            return [self.task.name + ':split']

        failed = []
        for filename in os.listdir(indir):
            if not os.path.exists(
                    os.path.join(self.workdir, 'out', filename)):
                jobname = self.task.name + ':process:' + os.path.splitext(
                    filename)[0]
                if jobname not in queued:
                    failed.append(jobname)

        if len(failed) == 0:
            if not os.path.exists(self.resultfile):
                return [self.task.name + ':merge']
        return failed
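
The backend is normally driven by the psyrun task runner rather than
instantiated directly; the *task* passed to it supplies the attributes used
above (pspace, max_jobs, min_items, pool_size, execute, store, ...). As a
hedged sketch of such a task definition (the task-file layout and the
module-level attribute convention are assumptions for illustration; only the
attribute names are taken from this module):

    from psyrun import Param

    pspace = Param(x=list(range(100)))  # parameter space to evaluate
    max_jobs = 10                       # split into at most 10 batches
    min_items = 4                       # at least 4 parameter sets per batch
    pool_size = 2                       # worker processes per batch

    def execute(x):
        # Called once per parameter assignment; returns a dict of results.
        return {'y': x ** 2}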

class Splitter(object):
    """Split a parameter space into multiple input files and merge results
    after processing.

    Parameters
    ----------
    workdir : str
        Working directory to create input files in and read output files
        from.
    pspace : `ParameterSpace`
        Parameter space to split up.
    max_splits : int, optional
        Maximum number of splits to perform.
    min_items : int, optional
        Minimum number of parameter sets in each split.
    store : `Store`, optional
        Input/output backend.

    Attributes
    ----------
    indir : str
        Directory to store input files.
    max_splits : int
        Maximum number of splits to perform.
    min_items : int
        Minimum number of parameter sets in each split.
    outdir : str
        Directory to store output files.
    pspace : `ParameterSpace`
        Parameter space to split up.
    store : `Store`
        Input/output backend.
    workdir : str
        Working directory to create input files in and read output files
        from.
    """

    def __init__(
            self, workdir, pspace, max_splits=64, min_items=4,
            store=DefaultStore()):
        self.workdir = workdir
        self.indir = self._get_indir(workdir)
        self.outdir = self._get_outdir(workdir)

        if not os.path.exists(self.indir):
            os.makedirs(self.indir)
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        self.pspace = pspace
        self.max_splits = max_splits
        self.min_items = min_items

        self.store = store

    @property
    def n_splits(self):
        """Total number of splits that will be generated."""
        n_splits = (len(self.pspace) - 1) // self.min_items + 1
        if self.max_splits is not None:
            n_splits = min(self.max_splits, n_splits)
        return n_splits

    def split(self):
        """Perform the splitting of the parameter space and save the input
        files for processing."""
        items_remaining = len(self.pspace)
        param_iter = self.pspace.iterate()
        for i, filename in enumerate(self._iter_filenames()):
            split_size = self.min_items
            if self.max_splits is not None:
                split_size = max(
                    split_size, items_remaining // (self.max_splits - i))
            items_remaining -= split_size
            block = dict_concat(
                [row for row in self._iter_n(param_iter, split_size)])
            self.store.save(os.path.join(self.indir, filename), block)

    @classmethod
    def merge(cls, outdir, merged_filename, append=True, store=DefaultStore()):
        """Merge processed files together.

        Parameters
        ----------
        outdir : str
            Directory with the output files.
        merged_filename : str
            Filename of the file to save the merged results to.
        append : bool, optional
            If True, the merged data will be appended; otherwise the file
            will be overwritten with the merged data.
        store : `Store`, optional
            Input/output backend.
        """
        if not append:
            store.save(merged_filename, {})
        for filename in os.listdir(outdir):
            if os.path.splitext(filename)[1] != store.ext:
                continue
            infile = os.path.join(outdir, filename)
            store.append(merged_filename, store.load(infile))

    def iter_in_out_files(self):
        """Return a generator yielding tuples of corresponding input and
        output filenames."""
        return ((os.path.join(self.indir, f), os.path.join(self.outdir, f))
                for f in self._iter_filenames())

    def _iter_filenames(self):
        return (str(i) + self.store.ext for i in range(self.n_splits))

    @staticmethod
    def _iter_n(it, n):
        # Yield at most *n* items; the last split may contain fewer items
        # than requested. Catching StopIteration stops the generator
        # gracefully instead of raising RuntimeError under PEP 479
        # (Python 3.7+).
        for _ in range(n):
            try:
                yield next(it)
            except StopIteration:
                return

    @classmethod
    def _get_indir(cls, workdir):
        return os.path.join(workdir, 'in')

    @classmethod
    def _get_outdir(cls, workdir):
        return os.path.join(workdir, 'out')
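
A minimal sketch of driving Splitter directly, outside of a DistributeBackend
job (the working directory and parameter values are hypothetical; file
extensions depend on the store in use):

    from psyrun.backend.distribute import Splitter
    from psyrun.pspace import Param

    splitter = Splitter('workdir', Param(x=list(range(8))),
                        max_splits=4, min_items=2)
    assert splitter.n_splits == 4   # 8 parameter sets, at least 2 per split
    splitter.split()                # writes workdir/in/0<ext> ... 3<ext>

    # ... process each input file into workdir/out, e.g. with a Worker ...

    Splitter.merge(splitter.outdir, 'result' + splitter.store.ext,
                   append=False)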

class Worker(object):
    """Maps a function to the parameter space loaded from a file and writes
    the result to an output file.

    Parameters
    ----------
    proc_id : int
        Worker ID.
    store : `Store`, optional
        Input/output backend.
    exclude_from_result : sequence, optional
        Keys of items to exclude from the result.

    Attributes
    ----------
    proc_id : int
        Worker ID.
    store : `Store`
        Input/output backend.
    exclude_from_result : sequence
        Keys of items to exclude from the result.
    """

    def __init__(
            self, proc_id, store=DefaultStore(), exclude_from_result=None):
        self.proc_id = proc_id
        self.store = store
        if exclude_from_result is None:
            exclude_from_result = []
        self.exclude_from_result = exclude_from_result

    def start(self, fn, infile, outfile, pool_size=1, setup_fn=None):
        """Start processing a parameter space.

        Parameters
        ----------
        fn : function
            Function to evaluate on the parameter space.
        infile : str
            Parameter space input filename.
        outfile : str
            Output filename for the results.
        pool_size : int, optional
            Number of parallel processes.
        setup_fn : function, optional
            Setup function, called with the worker ID as argument before
            processing of parameter sets begins. May return a dictionary of
            parameters added to the invocation of *fn*.
        """
        add_params = None
        if setup_fn is not None:
            add_params = setup_fn(self.proc_id)
        if add_params is None:
            add_params = {}
        pspace = Param(**self.store.load(infile))
        out_root, out_ext = os.path.splitext(outfile)
        map_pspace_hdd_backed(
            fn, Param(**add_params) * pspace, out_root + '.part' + out_ext,
            store=self.store, return_data=False, pool_size=pool_size,
            exclude=self.exclude_from_result)
        os.rename(out_root + '.part' + out_ext, outfile)
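
A corresponding sketch for Worker, which processes a single input file
written by Splitter.split (the function and file names are hypothetical; the
mapped function receives each parameter assignment as keyword arguments and
is expected to return a dict of result values):

    from psyrun.backend.distribute import Worker

    def run_trial(x):
        return {'y': x ** 2}

    # The file extension must match the store in use ('.npz' is an
    # assumption here, not a guarantee about DefaultStore).
    Worker(0).start(run_trial, 'workdir/in/0.npz', 'workdir/out/0.npz',
                    pool_size=2)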