API Documentation

Core interface

These modules constitute the core API of Psyrun and are the ones most likely to be used in other programs.

backend module

backend.base

Base backend interface.

class psyrun.backend.base.Backend(task)[source]

Abstract base class for processing backends.

Processing backends determine how work is split across jobs.

Deriving classes are supposed to implement create_job and get_missing.

Parameters:
task : TaskDef

The task to create processing jobs for.

Attributes:
task : TaskDef

The task to create processing jobs for.

workdir : str

Directory in which supporting files for processing the task are stored.
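
A minimal sketch of a deriving class (hedged: the NullBackend name and its trivial behavior are illustrative assumptions, and it presumes the task definition exposes its parameter space as task.pspace):

from psyrun.backend.base import Backend

class NullBackend(Backend):
    """Hypothetical backend that creates no jobs (illustration only)."""

    def create_job(self, cont=False):
        # A real backend would build and return a job tree here.
        return None

    def get_missing(self):
        # Treat everything requested by the task as unevaluated.
        return self.task.pspace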

create_job(cont=False)[source]

Create the job tree to process the given task.

Parameters:
cont : bool, optional

By default old results will be discarded, but when this option is set to True, old results will be kept and merged with the new results.

get_failed()[source]

Returns a list of failed jobs.

May return None if this is not supported by the backend.

get_missing()[source]

Returns a ParameterSpace with missing parameter assignments.

Missing parameter assignments are parameter assignments that are requested by the task definition, but have not been evaluated yet.

get_queued()[source]

Returns parameter sets that are still in queue to be processed.

May return None if this is not supported by the backend.

submit_array(n, job_source_file, name, depends_on=None, args=None)[source]

Submits a source file to the task scheduler for execution as a job array.

Parameters:
n : int

Number of jobs in the array to submit.

job_source_file : JobSourceFile

Source file to execute in job.

name : str

Job name.

depends_on : sequence

Job IDs that have to finish before the submitted code can be executed.

args : sequence

Additional arguments to pass to the job.

Returns:
dict

Contains the id of the submitted job under the key 'id'.

submit_code(code, name, depends_on=None, args=None)[source]

Submits code to the task scheduler for execution.

Parameters:
code : str

Code to execute in job.

name : str

Job name.

depends_on : sequence

Job IDs that have to finish before the submitted code can be executed.

args : sequence

Additional arguments to pass to the job.

Returns:
dict

Contains the id of the submitted job under the key 'id'.

submit_file(job_source_file, name, depends_on=None, args=None)[source]

Submits a source file to the task scheduler for execution.

Parameters:
job_source_file : JobSourceFile

Source file to execute in job.

name : str

Job name.

depends_on : sequence

Job IDs that have to finish before the submitted code can be executed.

args : sequence

Additional arguments to pass to the job.

Returns:
dict

Contains the id of the submitted job under the key 'id'.

class psyrun.backend.base.JobSourceFile(path, task, job_code)[source]

Describes a source code file for a job.

Parameters:
path : str

Path to write the source file to.

task : Task

Task the source file corresponds to.

job_code : str

Job specific code to execute.

Attributes:
written : bool

Whether the source file has been written.

full_code : str

The full job code including non job-specific parts that are used for every job.

full_code
write()[source]

Write the job code to the file self.path.
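
A hedged usage sketch (the path and job code are placeholders, and task stands for a previously loaded task object):

from psyrun.backend.base import JobSourceFile

# `task` is assumed to be a previously loaded Task instance.
src = JobSourceFile('psy-work/mytask/job.py', task, "print('processing')")
src.write()  # writes src.full_code to psy-work/mytask/job.py
assert src.written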

backend.distribute

Backend for distributed parameter evaluation.

class psyrun.backend.distribute.DistributeBackend(task)[source]

Create subtasks for distributed parameter evaluation.

This will create one task that splits the parameter space into a number of equal-sized batches (at most max_jobs batches, each with at least min_items parameter assignments). After processing all batches, the results will be merged into a single file.

This is similar to map-reduce processing.

Parameters:
task : TaskDef

Task definition to create subtasks for.

create_job(cont=False)[source]

Create the job tree to process the given task.

Parameters:
cont : bool, optional

By default old results will be discarded, but when this option is set to True, old results will be kept and merged with the new results.

create_merge_job(splitter)[source]
create_process_job(splitter)[source]
create_split_job(splitter)[source]
get_failed()[source]

Returns a list of failed jobs.

May return None if this is not supported by the backend.

get_missing()[source]

Returns a ParameterSpace with missing parameter assignments.

Missing parameter assignments are parameter assignments that are requested by the task definition, but have not been evaluated yet.

get_queued()[source]

Returns parameter sets that are still in queue to be processed.

May return None if this is not supported by the backend.

pspace_file

File that will store the input parameter space.

resultfile

File in which the results will be stored.

class psyrun.backend.distribute.Splitter(workdir, pspace, max_splits=64, min_items=4, store=<psyrun.store.pickle.PickleStore object>)[source]

Split a parameter space into multiple input files and merge results after processing.

Parameters:
workdir : str

Working directory to create input files in and read output files from.

pspace : ParameterSpace

Parameter space to split up.

max_splits : int, optional

Maximum number of splits to perform.

min_items : int, optional

Minimum number of parameter sets in each split.

store : Store, optional

Input/output backend.

Attributes:
indir : str

Directory to store input files.

max_splits : int

Maximum number of splits to perform.

min_items : int

Minimum number of parameter sets in each split.

outdir : str

Directory to store output files.

pspace : ParameterSpace

Parameter space to split up.

store : Store

Input/output backend.

workdir : str

Working directory to create input files in and read output files from.
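
A sketch of the typical split/process/merge cycle (hedged: the working directory, parameter space, and result filename are illustrative assumptions):

from psyrun import Param
from psyrun.backend.distribute import Splitter

splitter = Splitter('psy-work/mytask', Param(x=list(range(32))))
splitter.split()  # writes input files to splitter.indir

# ... each input file in splitter.indir is processed into a
# correspondingly named output file in splitter.outdir ...

Splitter.merge(splitter.outdir, 'result.pkl')  # combine all outputs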

iter_in_out_files()[source]

Return a generator yielding tuples of corresponding input and output filenames.

classmethod merge(outdir, merged_filename, append=True, store=<psyrun.store.pickle.PickleStore object>)[source]

Merge processed files together.

Parameters:
outdir : str

Directory with the output files.

merged_filename : str

Filename of file to save with the merged results.

append : bool, optional

If True the merged data will be appended, otherwise the file will be overwritten with the merged data.

store : Store, optional

Input/output backend.

n_splits

Number of total splits that will be generated.

split()[source]

Split the parameter space and save the input files for processing.

class psyrun.backend.distribute.Worker(proc_id, store=<psyrun.store.pickle.PickleStore object>, exclude_from_result=None)[source]

Maps a function to the parameter space loaded from a file and writes the result to an output file.

Parameters:
proc_id : int

Worker ID.

store : Store, optional

Input/output backend.

exclude_from_result : sequence, optional

Keys of items to exclude from the result.

Attributes:
proc_id : int

Worker ID.

store : Store

Input/output backend.

exclude_from_result : sequence, optional

Keys of items to exclude from the result.

start(fn, infile, outfile, pool_size=1, setup_fn=None)[source]

Start processing a parameter space.

Parameters:
fn : function

Function to evaluate on the parameter space.

infile : str

Parameter space input filename.

outfile : str

Output filename for the results.

pool_size : int, optional

Number of parallel processes.

setup_fn : function, optional

Setup function, called with the worker ID as argument before processing of parameter sets begins. May return a dictionary of parameters added to the invocation of fn.
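
A hedged sketch of processing a single split (the filenames are placeholders; the input file is assumed to have been written by a Splitter and to provide the parameter x):

from psyrun.backend.distribute import Worker

def fn(x):
    return {'y': x * x}

worker = Worker(proc_id=0)
worker.start(fn, 'psy-work/mytask/in/0.pkl', 'psy-work/mytask/out/0.pkl')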

backend.load_balancing

Load balancing backend.

class psyrun.backend.load_balancing.LoadBalancingBackend(task)[source]

Create subtasks for load balanced parameter evaluation.

This will create max_jobs worker jobs that will fetch parameter assignments from a queue. Thus, all worker jobs should be busy all of the time.

This backend is useful if individual parameter assignments can vary to a large degree in their processing time. It is recommended to use a Store that supports efficient fetching of a single row from a file.

Parameters:
task : TaskDef

Task definition to create subtasks for.

create_finalize_job()[source]
create_job(cont=False)[source]

Create the job tree to process the given task.

Parameters:
cont : bool, optional

By default old results will be discarded, but when this option is set to True, old results will be kept and merged with the new results.

create_process_job()[source]
create_pspace_job(cont)[source]
get_failed()[source]

Returns a list of failed jobs.

May return None if this is not supported by the backend.

get_missing()[source]

Returns a ParameterSpace with missing parameter assignments.

Missing parameter assignments are parameter assignments that are requested by the task definition, but have not been evaluated yet.

get_queued()[source]

Returns parameter sets that are still in queue to be processed.

May return None if this is not supported by the backend.

infile

File from which input parameter assignments are fetched.

partial_resultfile

File results are appended to while processing is in progress.

resultfile

Final result file.

statusfile

File that stores the processing status.

class psyrun.backend.load_balancing.LoadBalancingWorker(proc_id, infile, outfile, statusfile, store=<psyrun.store.pickle.PickleStore object>, exclude_from_result=None)[source]

Maps a function to the parameter space, while supporting other LoadBalancingWorkers that process the same input file at the same time.

Parameters:
proc_id : int

Worker ID.

infile : str

Filename of the file with the input parameter space.

outfile : str

Filename of the file to write the results to.

statusfile : str

Filename of the file to track the processing progress.

store : Store, optional

Input/output backend.

exclude_from_result : sequence, optional

Keys of items to exclude from the result.

classmethod create_statusfile(statusfile)[source]

Creates the status file required by all load balancing workers.

Parameters:
statusfile : str

Filename of the status file.

get_next_ix()[source]

Get the index of the next parameter assignment to process.

get_next_param_set()[source]

Load the next parameter assignment to process.

save_data(data)[source]

Appends data to the outfile.

Uses a lock on the file to support concurrent access.

start(fn, setup_fn=None)[source]

Start processing a parameter space.

Before invoking this function, a status file needs to be created by calling create_statusfile.

Parameters:
fn : function

Function to evaluate on the parameter space.

setup_fn : function, optional

Setup function, called with the worker ID as argument before processing of parameter sets begins. May return a dictionary of parameters added to the invocation of fn.
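
A hedged sketch of running a single worker (all filenames are placeholders; in a real setup several workers, each started as its own job, would share the same input, output, and status files):

from psyrun.backend.load_balancing import LoadBalancingWorker

def fn(x):
    return {'y': x * x}

# The status file has to exist before any worker starts.
LoadBalancingWorker.create_statusfile('status.pkl')
worker = LoadBalancingWorker(0, 'in.pkl', 'out.pkl', 'status.pkl')
worker.start(fn)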

exceptions

Psyrun specific exceptions and warnings.

exception psyrun.exceptions.IneffectiveExcludeWarning(key)[source]

Warning issued when a key in exclude_from_result was not found in the result.

exception psyrun.exceptions.JobsRunningWarning(task_name)[source]

Warning issued when jobs for a task are still running.

exception psyrun.exceptions.TaskWarning(task_name, *args, **kwargs)[source]

General warning related to task processing.

TaskWarnings will always be shown by default.

exception psyrun.exceptions.TaskWorkdirDirtyWarning(task_name)[source]

Warning issued when the workdir is dirty and would be overwritten.

mapper module

Map functions onto parameter spaces.

psyrun.mapper.get_result(fn, params, exclude=None)[source]

Evaluates a function with given parameters.

Evaluates fn with the parameters params and returns a dictionary with the input parameters and returned output values.

Parameters:
fn : function

Function to evaluate. Has to return a dictionary.

params : dict

Parameters passed to fn as keyword arguments.

exclude : sequence, optional

Keys of elements to exclude from the results dictionary.

Returns:
dict

Returns params updated with the return value of fn.

Examples

>>> def fn(x, is_result):
...     return {'y': x * x, 'is_result': 1}
>>>
>>> from pprint import pprint
>>> pprint(get_result(fn, {'x': 4, 'is_result': 0}))
{'is_result': 1, 'x': 4, 'y': 16}
psyrun.mapper.map_pspace(fn, pspace, exclude=None)[source]

Maps a function to parameter space values.

Parameters:
fn : function

Function to evaluate on parameter space. Has to return a dictionary.

pspace : ParameterSpace

Parameter space providing parameter values to evaluate function on.

exclude : sequence, optional

Keys of elements to exclude from the results dictionary.

Returns:
dict

Dictionary with the input parameter values and the function return values.

Examples

>>> def fn(x):
...     return {'y': x * x}
>>>
>>> from pprint import pprint
>>> from psyrun import Param
>>> pprint(map_pspace(fn, Param(x=[1, 2])))
{'x': [1, 2], 'y': [1, 4]}
psyrun.mapper.map_pspace_hdd_backed(fn, pspace, filename, store, return_data=True, pool_size=1, exclude=None)[source]

Maps a function to parameter space values while storing produced data.

Data is stored progressively. Thus, if the program crashes, not all data will be lost.

Parameters:
fn : function

Function to evaluate on parameter space. Has to return a dictionary.

pspace : ParameterSpace

Parameter space providing parameter values to evaluate function on.

filename : str

Filename of file to store data to.

store : Store

Store to save data with.

return_data : bool, optional

Whether to return the resulting data after mapping the function. This will read all produced data from the disk.

exclude : sequence, optional

Keys of elements to exclude from the results dictionary.

Returns:
None or dict

Dictionary with the input parameter values and the function return values if requested.
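
A hedged usage sketch (the result filename is arbitrary; PickleStore is used because it supports the required append operation):

from psyrun import Param
from psyrun.mapper import map_pspace_hdd_backed
from psyrun.store.pickle import PickleStore

def fn(x):
    return {'y': x * x}

# Results are appended to result.pkl as they are computed, so a crash
# loses at most the parameter set currently being evaluated.
data = map_pspace_hdd_backed(fn, Param(x=[1, 2]), 'result.pkl', PickleStore())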

psyrun.mapper.map_pspace_parallel(fn, pspace, n_jobs=-1, backend='multiprocessing', exclude=None)[source]

Maps a function to parameter space values in parallel.

Requires joblib.

Parameters:
fn : function

Function to evaluate on parameter space. Has to return a dictionary.

pspace : ParameterSpace

Parameter space providing parameter values to evaluate function on.

n_jobs : int, optional

Number of parallel jobs. Set to -1 to automatically determine.

backend : str, optional

Backend to use. See joblib documentation for details.

exclude : sequence, optional

Keys of elements to exclude from the results dictionary.

Returns:
dict

Dictionary with the input parameter values and the function return values.

Examples

>>> from pprint import pprint
>>> from psyrun import Param
>>> from psyrun.utils.example import square
>>>
>>> pprint(map_pspace_parallel(square, Param(x=[1, 2])))
{'x': [1, 2], 'y': [1, 4]}

pspace module

Construction of parameter spaces.

exception psyrun.pspace.AmbiguousOperationError[source]

Attempt to combine two parameter spaces in an ambiguous way.

class psyrun.pspace.Difference(minuend, subtrahend)[source]

Implements the difference of two parameter spaces.

Parameters:
minuend : ParameterSpace

Minuend (left operand).

subtrahend : ParameterSpace

Subtrahend (right operand).

Examples

>>> from pprint import pprint
>>> pprint(Difference(Param(a=[1, 2], b=[1, 2]), Param(a=[1])).build())
{'a': [2], 'b': [2]}
Attributes:
minuend : ParameterSpace

Minuend (left operand).

subtrahend : ParameterSpace

Subtrahend (right operand).

iterate()[source]

Iterates over the parameter assignments in the parameter space.

class psyrun.pspace.Param(**params)[source]

Constructs a simple parameter space from constructor arguments.

Supports addition, subtraction, and multiplication operators to construct more complicated parameter spaces.

Parameters:
params :

Each keyword argument defines a parameter with a sequence of parameter values for it. The lengths of all sequences have to be equal. If a scalar instead of a sequence is passed in, it will be replicated to match the length of the other parameters. At least one keyword argument has to be a sequence.
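
Examples

A scalar value is replicated to match the other parameters, and the multiplication operator combines parameter spaces like a Product (a hedged illustration of the documented behavior):

>>> from pprint import pprint
>>> pprint(Param(a=[1, 2], b=3).build())
{'a': [1, 2], 'b': [3, 3]}
>>> pprint((Param(a=[1, 2]) * Param(b=[1, 2])).build())
{'a': [1, 1, 2, 2], 'b': [1, 2, 1, 2]}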

get_param(key, i)[source]

Return the i-th parameter assignment.

Parameters:
key : str

Parameter name of parameter to retrieve.

i : int

Index of assigned value to return.

iterate()[source]

Iterates over the parameter assignments in the parameter space.

class psyrun.pspace.ParameterSpace(keys)[source]

Abstract base class for objects representing a parameter space.

Supports addition, subtraction, and multiplication operators to construct more complicated parameter spaces.

Deriving classes are supposed to implement the iterate and __len__ methods.

Parameters:
keys : sequence of strings

Parameter names.

build()[source]

Builds the parameter space into a dictionary of parameter lists.

Returns:
dict

A dictionary with the parameter names as keys and lists with the parameter values.

Examples

>>> from pprint import pprint
>>> pprint(Param(a=[1, 2], b=[1, 2]).build())
{'a': [1, 2], 'b': [1, 2]}
iterate()[source]

Iterates over the parameter assignments in the parameter space.

keys()[source]

Returns the parameter names.

class psyrun.pspace.Product(left, right)[source]

Implements the Cartesian product of two parameter spaces.

Parameters:
left : ParameterSpace

Left operand.

right : ParameterSpace

Right operand.

Examples

>>> from pprint import pprint
>>> pprint(Product(Param(a=[1, 2]), Param(b=[1, 2])).build())
{'a': [1, 1, 2, 2], 'b': [1, 2, 1, 2]}
Attributes:
left : ParameterSpace

Left operand.

right : ParameterSpace

Right operand.

iterate()[source]

Iterates over the parameter assignments in the parameter space.

class psyrun.pspace.Sum(left, right)[source]

Implements the concatenation of two parameter spaces.

Parameters:
left : ParameterSpace

Left operand.

right : ParameterSpace

Right operand.

Examples

>>> from pprint import pprint
>>> pprint(Sum(Param(a=[1]), Param(a=[2])).build())
{'a': [1, 2]}
Attributes:
left : ParameterSpace

Left operand.

right : ParameterSpace

Right operand.

iterate()[source]

Iterates over the parameter assignments in the parameter space.

psyrun.pspace.dict_concat(args)[source]

Concatenates elements with the same key in the passed dictionaries.

Parameters:
args : sequence of dict

Dictionaries with sequences to concatenate.

Returns:
dict

The dictionary with the union of all the keys of the dictionaries passed in and elements with the same key concatenated. Missing elements will be None.

Examples

>>> from pprint import pprint
>>> pprint(dict_concat(({'a': 0, 'b': 0}, {'a': 1})))
{'a': [0, 1], 'b': [0, None]}
psyrun.pspace.missing(minuend, subtrahend)[source]

Return set of parameter assignments missing from another set.

This differs from a simple subtraction by allowing additional keys in the subtrahend, but no additional keys in the minuend.

Parameters:
minuend : ParameterSpace

Parameter space with all assignments.

subtrahend : Param

Parameter space with assignments to remove from the parameter space.

Returns:
`ParameterSpace`

The reduced parameter space.

Examples

>>> from pprint import pprint
>>> pprint(missing(Param(a=[1, 2, 3]), Param(a=[2])).build())
{'a': [1, 3]}

store module

store.base

Base store interface.

class psyrun.store.base.AutodetectStore[source]

Automatically selects the store based on the file extension.
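
A hedged usage sketch (per the registry below, a .pkl filename dispatches to PickleStore):

from psyrun.store.base import AutodetectStore

store = AutodetectStore()
store.save('data.pkl', {'x': [1, 2]})  # dispatched to PickleStore
loaded = store.load('data.pkl')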

append(filename, data)[source]

Append data to file.

When trying to append data to a non-existing file, a new file will be created. The backend may require that the file was created with this function in order to append to it.

Parameters:
filename : str

Filename of file to append the data to.

data : dict

Dictionary with data to append.

classmethod get_concrete_store(filename)[source]
load(filename, row=None)[source]

Load data from a file.

Parameters:
filename : str

Filename of file to load data from.

row : int, optional

If given, only the row with this index will be loaded.

Returns:
dict

Loaded data.

registry = {'.pkl': <class 'psyrun.store.pickle.PickleStore'>}
save(filename, data)[source]

Save data to a file.

Parameters:
filename : str

Filename of file to save data to.

data : dict

Dictionary with data to store.

class psyrun.store.base.Store[source]

Defines the interface of stores.

Register implemented stores as entry points in the psyrun.stores group. For example, add the following to the setup call in your store’s setup.py for a store providing the .ext format:

entry_points={
    'psyrun.stores': ['.ext = pkg.name:ClassName'],
}
Attributes:
ext : str

Filename extension used by the store.
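
A hedged sketch of a minimal deriving store (the JsonStore name and its JSON-dictionary-of-lists layout are illustrative assumptions, not part of Psyrun):

import json

from psyrun.store.base import Store

class JsonStore(Store):
    """Hypothetical store keeping data as a JSON dictionary of lists."""

    ext = '.json'

    def save(self, filename, data):
        with open(filename, 'w') as f:
            json.dump(data, f)

    def load(self, filename, row=None):
        with open(filename) as f:
            data = json.load(f)
        if row is None:
            return data
        return {k: [v[row]] for k, v in data.items()}

    def append(self, filename, data):
        try:
            merged = self.load(filename)
        except IOError:  # appending to a non-existing file creates it
            merged = {}
        for k, v in data.items():
            merged.setdefault(k, []).extend(v)
        self.save(filename, merged)

Unlike the pickle store, such a JSON store could not save arbitrary Python objects, which is the usual trade-off when choosing a store format.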

append(filename, data)[source]

Append data to file.

When trying to append data to a non-existing file, a new file will be created. The backend may require that the file was created with this function in order to append to it.

Parameters:
filename : str

Filename of file to append the data to.

data : dict

Dictionary with data to append.

ext = ''
load(filename, row=None)[source]

Load data from a file.

Parameters:
filename : str

Filename of file to load data from.

row : int, optional

If given, only the row with this index will be loaded.

Returns:
dict

Loaded data.

save(filename, data)[source]

Save data to a file.

Parameters:
filename : str

Filename of file to save data to.

data : dict

Dictionary with data to store.

store.h5

store.npz

store.pickle

Store using pickle files.

class psyrun.store.pickle.PickleStore(protocol=4)[source]

Store using Python pickle .pkl files.

It supports all pickle-able data types and has no additional dependencies, but is not the most efficient store. Also, it has to load the complete file to append to it.

append(filename, data)[source]

Append data to file.

When trying to append data to a non-existing file, a new file will be created. The backend may require that the file was created with this function in order to append to it.

Parameters:
filename : str

Filename of file to append the data to.

data : dict

Dictionary with data to append.

ext = '.pkl'
load(filename, row=None)[source]

Load data from a file.

Parameters:
filename : str

Filename of file to load data from.

row : int, optional

If given, only the row with this index will be loaded.

Returns:
dict

Loaded data.

save(filename, data)[source]

Save data to a file.

Parameters:
filename : str

Filename of file to save data to.

data : dict

Dictionary with data to store.

scheduler module

Job scheduler.

class psyrun.scheduler.ExternalScheduler[source]
KNOWN_ARGS = {}
class LongOption(name, conversion=<class 'str'>)[source]
build(value)[source]
class Option(name, conversion=<class 'str'>)[source]
build(value)[source]
class ShortOption(name, conversion=<class 'str'>)[source]
build(value)[source]
build_args(**kwargs)[source]
class psyrun.scheduler.ImmediateRun[source]

Runs jobs immediately on the local machine.

get_jobs()[source]

Returns an empty list.

get_status(jobid)[source]

Has no effect.

kill(jobid)[source]

Has no effect.

submit(args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job.

Parameters:
args : sequence

The command and arguments to execute.

output_filename : str

File to write process output to.

name

Unused.

depends_on

Unused.

scheduler_args

Unused.

Returns:
int

Job ID
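
A minimal usage sketch (the command and output filename are placeholders):

from psyrun.scheduler import ImmediateRun

sched = ImmediateRun()
jobid = sched.submit(['echo', 'hello'], 'job.log')  # runs right away, output goes to job.log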

submit_array(n, args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job array.

If the scheduler does not support job arrays, this method should raise NotImplementedError.

Parameters:
n : int

Number of tasks to submit.

args : sequence

The command and arguments to execute. The string '%a' will be replaced with the task number in each argument.

output_filename : str

File to write process output to. The string '%a' will be replaced by the task number.

name : str, optional

Name of job.

depends_on : sequence of int, optional

IDs of jobs that need to finish first before the submitted job can be started.

scheduler_args : dict, optional

Additional arguments for the scheduler.

Returns:
Job ID
class psyrun.scheduler.JobStatus(id, status, name)
id

Alias for field number 0

name

Alias for field number 2

status

Alias for field number 1

class psyrun.scheduler.Scheduler[source]

Scheduler interface.

USER_DEFAULT_ARGS = {}
get_jobs()[source]

Get all queued, running, and recently finished jobs.

Returns:
sequence

Job IDs

get_status(jobid)[source]

Get the status of a job.

Parameters:
jobid

Job to request status of.

Returns:
`JobStatus`

Returns a tuple with (id, status, name) wherein status can be

  • 'Q' for a queued job
  • '*Q' for a queued job waiting on another job to finish
  • 'Z' for a sleeping job
  • 'D' for a completed job

If no status data is available for the job ID, None will be returned.

kill(jobid)[source]

Kill a job.

Parameters:
jobid

Job to kill.

submit(args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job.

Parameters:
args : sequence

The command and arguments to execute.

output_filename : str

File to write process output to.

name : str, optional

Name of job.

depends_on : sequence of int, optional

IDs of jobs that need to finish first before the submitted job can be started.

scheduler_args : dict, optional

Additional arguments for the scheduler.

Returns:
Job ID
submit_array(n, args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job array.

If the scheduler does not support job arrays, this method should raise NotImplementedError.

Parameters:
n : int

Number of tasks to submit.

args : sequence

The command and arguments to execute. The string '%a' will be replaced with the task number in each argument.

output_filename : str

File to write process output to. The string '%a' will be replaced by the task number.

name : str, optional

Name of job.

depends_on : sequence of int, optional

IDs of jobs that need to finish first before the submitted job can be started.

scheduler_args : dict, optional

Additional arguments for the scheduler.

Returns:
Job ID
class psyrun.scheduler.Slurm(workdir=None)[source]

Slurm (sbatch) scheduler.

KNOWN_ARGS = {'array': <LongOption '--array'>, 'cores-per-socket': <LongOption '--cores-per-socket'>, 'depends_on': <ShortOption '-d'>, 'memory': <LongOption '--mem'>, 'memory_per_cpu': <LongOption '--mem-per-cpu'>, 'n_cpus': <ShortOption '-c'>, 'n_nodes': <ShortOption '-N'>, 'name': <ShortOption '-J'>, 'output_file': <ShortOption '-o'>, 'sockets-per-node': <LongOption '--sockets-per-node'>, 'timelimit': <ShortOption '-t'>}
STATUS_MAP = {'CA': 'D', 'CD': 'D', 'CF': 'R', 'CG': 'R', 'F': 'D', 'NF': 'D', 'PD': 'Q', 'R': 'R', 'RV': 'D', 'SE': 'D', 'TO': 'D'}
USER_DEFAULT_ARGS = {'memory': '1G', 'timelimit': '1h'}
get_jobs()[source]

Get all queued and running jobs.

Returns:
list of str

Job IDs

get_status(jobid=None)[source]

Get the status of a job.

Parameters:
jobid : str

Job to request status of.

Returns:
namedtuple or None

Returns a tuple with (id, status, name) wherein status can be

  • 'R' for a running job
  • 'Q' for a queued job
  • '*Q' for a queued job waiting on another job to finish
  • 'Z' for a sleeping job
  • 'D' for a completed job

If no status data is available for the job ID, None will be returned.

kill(jobid)[source]

Kill a job.

Parameters:
jobid : str

Job to kill.

refresh_job_info()[source]
submit(args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job.

Parameters:
args : list

The command and arguments to execute.

output_filename : str

File to write process output to.

name : str, optional

Name of job.

depends_on : list of int, optional

IDs of jobs that need to finish first before the submitted job can be started.

scheduler_args : dict, optional

Additional arguments for the scheduler.

Returns:
str

Job ID

submit_array(n, args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job array.

If the scheduler does not support job arrays, this method should raise NotImplementedError.

Parameters:
n : int

Number of tasks to submit.

args : sequence

The command and arguments to execute. The string '%a' will be replaced with the task number in each argument.

output_filename : str

File to write process output to. The string '%a' will be replaced by the task number.

name : str, optional

Name of job.

depends_on : sequence of int, optional

IDs of jobs that need to finish first before the submitted job can be started.

scheduler_args : dict, optional

Additional arguments for the scheduler.

Returns:
Job ID
class psyrun.scheduler.Sqsub(workdir=None)[source]

sqsub (SHARCNET) scheduler.

KNOWN_ARGS = {'depends_on': <ShortOption '-w'>, 'idfile': <LongOption '--idfile'>, 'memory': <LongOption '--mpp'>, 'n_cpus': <ShortOption '-n'>, 'n_nodes': <ShortOption '-N'>, 'name': <ShortOption '-j'>, 'output_file': <ShortOption '-o'>, 'timelimit': <ShortOption '-r'>}
USER_DEFAULT_ARGS = {'memory': '1G', 'n_cpus': 1, 'n_nodes': 1, 'timelimit': '1h'}
get_jobs()[source]

Get all queued and running jobs.

Returns:
list of int

Job IDs

get_status(jobid=None)[source]

Get the status of a job.

Parameters:
jobid : int

Job to request status of.

Returns:
namedtuple or None

Returns a tuple with (id, status, name) wherein status can be

  • 'R' for a running job
  • 'Q' for a queued job
  • '*Q' for a queued job waiting on another job to finish
  • 'Z' for a sleeping job
  • 'D' for a completed job

If no status data is available for the job ID, None will be returned.

kill(jobid)[source]

Kill a job.

Parameters:
jobid : int

Job to kill.

refresh_job_info()[source]
submit(args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job.

Parameters:
args : list

The command and arguments to execute.

output_filename : str

File to write process output to.

name : str, optional

Name of job.

depends_on : list of int, optional

IDs of jobs that need to finish first before the submitted job can be started.

scheduler_args : dict, optional

Additional arguments for the scheduler.

Returns:
int

Job ID

submit_array(n, args, output_filename, name=None, depends_on=None, scheduler_args=None)[source]

Submit a job array.

If the scheduler does not support job arrays, this method should raise NotImplementedError.

Parameters:
n : int

Number of tasks to submit.

args : sequence

The command and arguments to execute. The string '%a' will be replaced with the task number in each argument.

output_filename : str

File to write process output to. The string '%a' will be replaced by the task number.

name : str, optional

Name of job.

depends_on : sequence of int, optional

IDs of jobs that need to finish first before the submitted job can be started.

scheduler_args : dict, optional

Additional arguments for the scheduler.

Returns:
Job ID

tasks module

Psyrun’s task loading API.

class psyrun.tasks.Config[source]

Task configuration.

Attributes:
backend : Backend, default: DistributeBackend

The processing backend which determines how work is distributed across jobs.

exclude_from_result : sequence of str, default: []

Keys of items to exclude from result. This can be useful if parameters or parts of the result cannot be saved to disk.

file_dep : sequence of str, default: []

Additional files the task depends on.

max_jobs : int, default: 100

Maximum number of jobs to start. With fewer jobs, each job has to process more parameter assignments. The degree to which these run in parallel depends on the scheduler and backend used.

min_items : int, default: 1

Minimum number of parameter assignments to evaluate per job. If a single assignment is fast to evaluate, increasing this number can improve performance because Psyrun will not have to start a new job for every assignment, which saves some overhead.

overwrite_dirty : bool, default: True

Whether to overwrite dirty workdirs without a warning.

pool_size : int, default: 1

Number of parallel threads or processes each job will run. This allows for parallelization without a proper scheduler (e.g. when using psyrun.scheduler.ImmediateRun).

pspace : ParameterSpace, required

Parameter space to evaluate.

python : str, default: sys.executable

Path to Python interpreter to use.

resultfile : str or None, default: None

Path to save the results of the finished task at. If None, this defaults to 'result.<ext>' in the workdir.

scheduler : Scheduler, default: ImmediateRun

Scheduler to use to submit individual jobs.

scheduler_args : dict, default: {}

Additional scheduler arguments. See the documentation of the scheduler for details.

setup : function, default: None

Function to call after starting a worker process before any parameter sets are processed. The function gets the ID of the worker process (usually starting at 0 and incremented by one for each process) as sole argument. It may return a dictionary of additional arguments to pass to the processing function. The setup function can be used to initialize process wide resources.

store : Store, default: PickleStore

Input/output backend.

workdir : str, default: 'psy-work'

Working directory to store results and supporting data to process the task.
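
For illustration, a task file might set some of these variables at module level (a hedged sketch; the execute function name follows Psyrun's task file template, and only pspace is strictly required):

# task_square.py
from psyrun import Param

pspace = Param(x=[1, 2, 3])  # required
max_jobs = 2                 # optional overrides of the defaults above
min_items = 1

def execute(x):
    return {'y': x * x}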

apply_as_default(task)[source]

Copies the attributes to a different object, provided they are not already set in that object.

Parameters:
task : obj

Object to copy the attributes to.

backend
exclude_from_result
file_dep
classmethod load_from_file(filename)[source]

Load the configuration values from a Python file.

Parameters:
filename : str

Python file to load.

max_jobs
min_items
overwrite_dirty
pool_size
pspace
python
resultfile
scheduler
scheduler_args
setup
store
workdir
class psyrun.tasks.PackageLoader(taskdir)[source]

Loads tasks from Python files.

Filenames have to match the regular expression defined in TaskDef.TASK_PATTERN. See Config for supported module level variables in the task definition.

It is possible to set these variables for all tasks by setting them in the file psy-conf.py in the taskdir.

Parameters:
taskdir : str

Directory to load task files from.

Attributes:
taskdir : str

Directory to load task files from.

conf : Config

Default values for module level task variables.

load_task_defs()[source]

Load task definitions.

Returns:
list of `TaskDef`

Task definitions.

class psyrun.tasks.TaskDef(path, conf=None)[source]

Task defined by a Python file.

Parameters:
path : str

Python file to load as task.

conf : Config

Default values for task parameters.

Attributes:
TASK_PATTERN : re.RegexObject

Regular expression to match task filenames.

TASK_PATTERN = re.compile('^task_(.*)$')

Supporting interfaces

These modules provide functionality that is still central to Psyrun, but are usually not required to be accessed in other programs.

main

Defines the psy command line interface commands.

class psyrun.main.CleanCmd(cmd, argv)[source]
default_to_all = False
long_desc = 'Removes all processing and result files associated with a task.'
run_task(task)[source]
short_desc = 'clean task data'
class psyrun.main.Command(cmd, argv)[source]

Base class for commands.

Deriving classes are supposed to implement run and may override add_args to add additional arguments to self.parser.

Parameters:
cmd : str

Command name.

argv : sequence

Arguments to the command.

Attributes:
parser : argparse.ArgumentParser

Parser for arguments.

args : argparse.Namespace

Parsed arguments.

add_args()[source]

Add command arguments to self.parser.

long_desc = ''
run()[source]

Run the command.

short_desc = ''
class psyrun.main.KillCmd(cmd, argv)[source]
default_to_all = False
long_desc = 'Kill all running and queued task jobs.'
run_task(task)[source]
short_desc = 'kill task jobs'
class psyrun.main.ListCmd(cmd, argv)[source]
long_desc = 'Lists all available tasks.'
run()[source]

Run the command.

short_desc = 'list tasks'
class psyrun.main.MergeCmd(cmd, argv)[source]
add_args()[source]

Add command arguments to self.parser.

long_desc = 'Merges all data files in a directory into a single data file.'
run()[source]

Run the command.

short_desc = 'merge data files into single file'
class psyrun.main.NewTaskCmd(cmd, argv)[source]
add_args()[source]

Add command arguments to self.parser.

long_desc = 'Copy task file template to a new task file.'
run()[source]

Run the command.

short_desc = 'create new task'
class psyrun.main.RunCmd(cmd, argv)[source]
add_args()[source]

Add command arguments to self.parser.

long_desc = 'Submits the jobs to run one or more tasks. If no task name is provided, all tasks will be run.'
run_task(task)[source]
short_desc = 'run one or more tasks'
class psyrun.main.StatusCmd(cmd, argv)[source]
add_args()[source]

Add command arguments to self.parser.

long_desc = 'Prints the status (number of completed parameter assignments of the total). Can also print parameter assignments that have not been evaluated yet.'
run_task(task)[source]
short_desc = 'print status of tasks'
class psyrun.main.TaskdirCmd(cmd, argv)[source]

Base class for commands that accept a --taskdir argument.

add_args()[source]

Add command arguments to self.parser.

run()[source]

Run the command.

class psyrun.main.TaskselCmd(cmd, argv)[source]

Base class for commands that accept a selection of tasks as arguments.

Attributes:
default_to_all : bool

Indicates whether the command defaults to all (or no) tasks when no task names are specified as arguments.

add_args()[source]

Add command arguments to self.parser.

default_to_all = True
run()[source]

Run the command.

run_task(task)[source]
class psyrun.main.TestCmd(cmd, argv)[source]
long_desc = 'Tests the task execution by running a single parameter assignment. The job will not be submitted to the scheduler, but run immediately.'
run_task(task)[source]
short_desc = 'test execution of single parameter assignment'
psyrun.main.psy_main(argv=None, init_venv=True)[source]

Runs psyrun tasks.

Parameters:
argv : sequence of str, optional

psy command line arguments.

init_venv : bool, optional

Use the virtualenv active in the shell environment if set to True.

Returns:
int

Return code.
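
A hedged invocation sketch, mirroring the psy command line (the task name is a placeholder, and argv is assumed to exclude the program name):

import sys
from psyrun.main import psy_main

if __name__ == '__main__':
    sys.exit(psy_main(['run', 'mytask']))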

jobs

Handling and processing of job trees.

class psyrun.jobs.Clean(job, task, names, uptodate=None)[source]

Clean all target files and supporting files of jobs that are outdated.

The constructor will call visit.

Parameters:
job : job tree

Tree of jobs to clean.

task : TaskDef

Task that generated the job tree.

names : dict

Maps jobs to their names. (Can be obtained with Fullname.)

uptodate : dict, optional

Maps jobs to their up-to-date status. (Can be obtained with Uptodate.) If not provided, all jobs are treated as outdated.

Attributes:
task : TaskDef

Task that generated the job tree.

names : dict

Maps jobs to their names.

uptodate : dict

Maps jobs to their up-to-date status.

visit_chain(chain)[source]
visit_group(group)[source]
visit_job(job)[source]
class psyrun.jobs.Fullname(jobtree)[source]

Construct names of the jobs.

The constructor will call visit.

Parameters:
jobtree : job tree

Tree of jobs to construct names for.

Attributes:
names : dict

Maps jobs to their names.

visit_chain(chain)[source]
visit_group(group)[source]
visit_job(job)[source]
class psyrun.jobs.Job(name, submit_fn, submit_kwargs, dependencies, targets)[source]

Describes a single processing job.

Parameters:
name : str

Name of the job.

submit_fn : function

Function to use to submit the job for processing.

submit_kwargs : dict

Additional keyword arguments to pass to the submit function (in addition to name and depends_on).

dependencies : sequence

Identifiers of other jobs that need to finish first before this job can be run.

targets : sequence of str

Files created by this job.

Attributes:
name : str

Name of the job.

submit_fn : function

Function to use to submit the job for processing.

code : str

Python code to execute.

dependencies : sequence

Identifiers of other jobs that need to finish first before this job can be run.

targets : sequence of str

Files created by this job.

class psyrun.jobs.JobArray(n, name, submit_fn, single_submit_fn, submit_kwargs, dependency_patterns, target_patterns)[source]
dependencies
targets
class psyrun.jobs.JobChain(name, jobs)[source]

Chain of jobs to run in succession.

Parameters:
name : str

Name of the job chain.

jobs : sequence of Job

Jobs to run in succession.

Attributes:
name : str

Name of the job chain.

jobs : sequence of Job

Jobs to run in succession.

dependencies : sequence

Jobs that need to run first before the job chain can be run (equivalent to the dependencies of the first job in the chain).

targets : sequence of str

Files created or updated by the job chain (equivalent to the targets of the last job in the chain).

dependencies
targets
class psyrun.jobs.JobGroup(name, jobs)[source]

Group of jobs that can run in parallel.

Parameters:
name : str

Name of the job group.

jobs : sequence of Job

Jobs to run in the job group.

Attributes:
name : str

Name of the job group.

jobs : sequence of Job

Jobs to run in the job group.

dependencies : sequence

Jobs that need to run first before the job group can be run (equivalent to the union of all the group’s jobs’ dependencies).

targets : sequence of str

Files that will be created or updated by the group’s jobs (equivalent to the union of all the group’s jobs’ targets).

dependencies
targets
class psyrun.jobs.JobTreeVisitor[source]

Abstract base class to implement visitors on trees of jobs.

Base class to implement visitors following the Visitor pattern to traverse the tree constructed out of Job, JobChain, and JobGroup instances.

A deriving class should override visit_job, visit_chain, and visit_group. Use the visit method to start visiting a tree of jobs.
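
A hedged sketch of a deriving visitor that counts leaf jobs (traversing chains and groups through their documented jobs attribute is an assumption for illustration):

from psyrun.jobs import JobTreeVisitor

class CountJobs(JobTreeVisitor):
    """Hypothetical visitor counting the leaf jobs in a job tree."""

    def __init__(self):
        super(CountJobs, self).__init__()
        self.n = 0

    def visit_job(self, job):
        self.n += 1

    def visit_chain(self, chain):
        for job in chain.jobs:
            self.visit(job)

    def visit_group(self, group):
        for job in group.jobs:
            self.visit(job)

counter = CountJobs()
counter.visit(job_tree)  # job_tree as built by a backend's create_job
print(counter.n)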

visit(job)[source]

Visit all jobs in the tree job.

visit_array(job_array)[source]
visit_chain(chain)[source]
visit_group(group)[source]
visit_job(job)[source]
class psyrun.jobs.Submit(job, names, uptodate)[source]

Submit all jobs that are not up-to-date.

The constructor will call visit.

Parameters:
job : job tree

Tree of jobs to submit.

names : dict

Maps jobs to their names. (Can be obtained with Fullname.)

uptodate : dict

Maps jobs to their up-to-date status. (Can be obtained with Uptodate.)

Attributes:
names : dict

Maps jobs to their names.

uptodate : dict

Maps jobs to their up-to-date status.

visit_array(job)[source]
visit_chain(chain)[source]
visit_group(group)[source]
visit_job(job)[source]
class psyrun.jobs.Uptodate(jobtree, names, task)[source]

Determines the up-to-date status of jobs.

The constructor will call visit.

Parameters:
jobtree : job tree

Tree of jobs to determine the up-to-date status for.

names : dict

Maps jobs to their names. (Can be obtained with Fullname.)

task : TaskDef

Task that generated the job tree.

Attributes:
names : dict

Maps jobs to their names.

task : TaskDef

Task that generated the job tree.

status : dict

Maps jobs to their up-to-date status.

files_uptodate(tref, targets)[source]

Checks that all targets are newer than tref.

is_job_queued(job)[source]

Checks whether the job is queued.

post_visit()[source]

Called after visit.

Checks whether jobs are still running and marks these as up-to-date while issuing a warning.

visit_chain(chain)[source]
visit_group(group)[source]
visit_job(job)[source]

Utilities

The utility modules provide various things that are not related to Psyrun’s core functionality. The API of utilities is not guaranteed to be stable across versions.

utils.doc module

Documentation utilities.

psyrun.utils.doc.inherit_docs(cls)[source]

Class decorator that makes a class inherit the docstrings for its functions from its base classes.
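
A hedged usage sketch (assumes the decorator fills in missing method docstrings from the base class):

from psyrun.store.base import Store
from psyrun.utils.doc import inherit_docs

@inherit_docs
class MyStore(Store):
    ext = '.my'

    def save(self, filename, data):  # inherits Store.save's docstring
        pass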

utils.example module

Things used in examples.

psyrun.utils.example.square(x)[source]

Returns {'y': x * x}.

utils.testing module

utils.venv module

Management of virtual environments.

psyrun.utils.venv.init_virtualenv(venv=None)[source]

Activates a virtual environment.

This function won’t initialize a virtual environment when run on Travis-CI, which is detected via the CI environment variable.

Parameters:
venv : str, optional

Name of virtualenv to activate. If None, the virtualenv given in the environment variable VIRTUAL_ENV will be used. If that environment variable is not set either, no virtualenv will be activated.