"""Base backend interface."""
import os
import stat
import sys
[docs]class JobSourceFile(object):
"""Describes a source code file for a job.
Parameters
----------
path : str
Path to write the source file to.
task : Task
Task the source file corresponds to.
job_code : str
Job specific code to execute.
Attributes
----------
written : bool
Whether the source file has been written.
full_code : str
The full job code including non job-specific parts that are used for
every job.
"""
def __init__(self, path, task, job_code):
self.path = path
self.task = task
self.job_code = job_code
self.written = False
[docs] def write(self):
"""Write the job code to the file *self.path*."""
with open(self.path, 'w') as f:
f.write(self.full_code)
fd = f.fileno()
os.fchmod(fd, os.fstat(fd).st_mode | stat.S_IXUSR)
self.written = True
@property
def full_code(self):
return '''#!{python}
try:
import faulthandler
faulthandler.enable()
except:
pass
import time
print("")
print("----------------------------------------------------------------------")
print("Job started ({{}})".format(time.strftime('%a, %d %b %Y %H:%M:%S, %Z')))
print("----------------------------------------------------------------------")
import os
os.chdir({taskdir!r})
from psyrun.tasks import TaskDef
task = TaskDef({taskpath!r})
{code}
'''.format(
python=self.task.python,
path=sys.path,
taskdir=os.path.abspath(os.path.dirname(self.task.path)),
taskpath=os.path.abspath(self.task.path), code=self.job_code)
[docs]class Backend(object):
"""Abstract base class for processing backends.
Processing backends determine how work is split across jobs.
Deriving classes are supposed to implement `create_job` and `get_missing`.
Parameters
----------
task : `TaskDef`
The task to create processing jobs for.
Attributes
----------
task : `TaskDef`
The task to create processing jobs for.
workdir : str
Directory in which supporting files for processing the task are stored.
"""
def __init__(self, task):
super(Backend, self).__init__()
self.task = task
self.workdir = os.path.join(task.workdir, task.name)
if not os.path.exists(self.workdir):
os.makedirs(self.workdir)
[docs] def submit_code(self, code, name, depends_on=None, args=None):
"""Submits some code to execute to the task scheduler.
Parameters
----------
code : str
Code to execute in job.
name : str
Job name.
depends_on : sequence
Job IDs that have to finish before the submitted code can be
executed.
args : sequence
Additional arguments to pass to the job.
Returns
-------
dict
Contains the id of the submitted job under the key ``'id'``.
"""
codefile = os.path.join(self.workdir, name + '.py')
return self.submit_file(JobSourceFile(codefile, self.task, code),
name, depends_on=depends_on, args=args)
[docs] def submit_file(self, job_source_file, name, depends_on=None, args=None):
"""Submits a source file to execute to the task scheduler.
Parameters
----------
job_source_file : `JobSourceFile`
Source file to execute in job.
name: str
Job name.
depends_on: sequence
Job IDs that have to finish before the submitted code can be
executed.
args : sequence
Additional arguments to pass to the job.
Returns
-------
dict
Contains the id of the submitted job under the key ``'id'``.
"""
if args is None:
args = []
self._prepare_job_submission(job_source_file, name)
output_filename = os.path.join(self.workdir, name + '.log')
return self.task.scheduler.submit(
[job_source_file.path] + args, output_filename,
name, depends_on, self.task.scheduler_args)
[docs] def submit_array(
self, n, job_source_file, name, depends_on=None, args=None):
"""Submits a source file to execute to the task scheduler.
Parameters
----------
job_source_file : `JobSourceFile`
Source file to execute in job.
name: str
Job name.
depends_on: sequence
Job IDs that have to finish before the submitted code can be
executed.
args : sequence
Additional arguments to pass to the job.
Returns
-------
dict
Contains the id of the submitted job under the key ``'id'``.
"""
if args is None:
args = []
self._prepare_job_submission(job_source_file, name)
output_filename = os.path.join(self.workdir, name + ':%a.log')
return self.task.scheduler.submit_array(
n, [job_source_file.path] + args, output_filename,
name, depends_on, self.task.scheduler_args)
def _prepare_job_submission(self, job_source_file, name):
if not job_source_file.written:
job_source_file.write()
for job in self.task.scheduler.get_jobs():
status = self.task.scheduler.get_status(job)
if status is not None and name == status.name:
self.task.scheduler.kill(job)
[docs] def create_job(self, cont=False):
"""Create the job tree to process given task.
Parameters
----------
cont : bool, optional
By default old results will be discarded, but when this option is
set to True, old results will be kept and merged with the new
results.
"""
raise NotImplementedError()
[docs] def get_missing(self):
"""Returns a `ParameterSpace` with missing parameter assignments.
Missing paramaters assignments are parameter assignments requested by
the task definition, but that have not been evaluated yet.
"""
raise NotImplementedError
[docs] def get_queued(self):
"""Returns parameter sets that are still in queue to be processed.
May return ``None`` if this is not supported by the backend.
"""
raise NotImplementedError()
[docs] def get_failed(self):
"""Returns a list of failed jobs.
May return ``None`` if this is not supported by the backend.
"""
raise NotImplementedError()