Source code for psyrun.mapper

"""Map functions onto parameter spaces."""

from multiprocessing import Pool
import os.path
import warnings

from psyrun.exceptions import IneffectiveExcludeWarning
from psyrun.pspace import dict_concat, missing, Param


[docs]def get_result(fn, params, exclude=None): """Evaluates a function with given parameters. Evaluates *fn* with the parameters *param* and returns a dictionary with the input parameters and returned output values. Parameters ---------- fn : function Function to evaluate. Has to return a dictionary. params : dict Parameters passed to *fn* as keyword arguments. exclude : sequence, optional Keys of elements to exclude from the results dictionary. Returns ------- dict Returns *params* updated with the return value of *fn*. Examples -------- >>> def fn(x, is_result): ... return {'y': x * x, 'is_result': 1} >>> >>> from pprint import pprint >>> pprint(get_result(fn, {'x': 4, 'is_result': 0})) {'is_result': 1, 'x': 4, 'y': 16} """ result = dict(params) result.update(fn(**params)) if exclude is not None: for k in exclude: if k in result: del result[k] else: warnings.warn(IneffectiveExcludeWarning(k)) return result
def _get_result_single_arg(args): return get_result(*args)
[docs]def map_pspace(fn, pspace, exclude=None): """Maps a function to parameter space values. Parameters ---------- fn : function Function to evaluate on parameter space. Has to return a dictionary. pspace : `ParameterSpace` Parameter space providing parameter values to evaluate function on. exclude : sequence, optional Keys of elements to exclude from the results dictionary. Returns ------- dict Dictionary with the input parameter values and the function return values. Examples -------- >>> def fn(x): ... return {'y': x * x} >>> >>> from pprint import pprint >>> from psyrun import Param >>> pprint(map_pspace(fn, Param(x=[1, 2]))) {'x': [1, 2], 'y': [1, 4]} """ return dict_concat(list(get_result( fn, p, exclude) for p in pspace.iterate()))
[docs]def map_pspace_hdd_backed( fn, pspace, filename, store, return_data=True, pool_size=1, exclude=None): """Maps a function to parameter space values while storing produced data. Data is stored progressively. Thus, if the program crashes, not all data will be lost. Parameters ---------- fn : function Function to evaluate on parameter space. Has to return a dictionary. pspace : `ParameterSpace` Parameter space providing parameter values to evaluate function on. filename : str Filename of file to store data to. store : `Store` Store to save data with. return_data : bool, optional Whether to return the resulting data after mapping the function. This will read all produced data from the disk. exclude : sequence, optional Keys of elements to exclude from the results dictionary. Returns ------- None or dict Dictionary with the input parameter values and the function return values if requested. """ if os.path.exists(filename): pspace = missing(pspace, Param(**store.load(filename))) chunksize = max(1, len(pspace) // pool_size) for r in Pool(pool_size).imap_unordered( _get_result_single_arg, ((fn, p, exclude) for p in pspace.iterate()), chunksize): store.append(filename, dict_concat((r,))) if not os.path.exists(filename): store.save(filename, {}) if return_data: return store.load(filename)
[docs]def map_pspace_parallel( fn, pspace, n_jobs=-1, backend='multiprocessing', exclude=None): """Maps a function to parameter space values in parallel. Requires `joblib <https://pythonhosted.org/joblib/>`_. Parameters ---------- fn : function Function to evaluate on parameter space. Has to return a dictionary. pspace : ParameterSpace Parameter space providing parameter values to evaluate function on. n_jobs : int, optional Number of parallel jobs. Set to -1 to automatically determine. backend : str, optional Backend to use. See `joblib documentation <https://pythonhosted.org/joblib/parallel.html#using-the-threading-backend>`_ for details. exclude : sequence, optional Keys of elements to exclude from the results dictionary. Returns ------- dict Dictionary with the input parameter values and the function return values. Examples -------- >>> from pprint import pprint >>> from psyrun import Param >>> from psyrun.utils.example import square >>> >>> pprint(map_pspace_parallel(square, Param(x=[1, 2]))) {'x': [1, 2], 'y': [1, 4]} """ import joblib parallel = joblib.Parallel(n_jobs=n_jobs, backend=backend) return dict_concat(parallel( joblib.delayed(get_result)(fn, p, exclude) for p in pspace.iterate()))