"""Load balancing backend."""
import fcntl
import os
from psyrun.backend.base import Backend, JobSourceFile
from psyrun.jobs import Job, JobArray, JobChain
from psyrun.mapper import map_pspace
from psyrun.pspace import missing, Param
from psyrun.store import DefaultStore
from psyrun.utils.doc import inherit_docs
[docs]@inherit_docs
class LoadBalancingBackend(Backend):
"""Create subtasks for load balanced parameter evaluation.
This will create *max_jobs* worker jobs that will fetch parameter
assignments from a queue. Thus, all worker jobs should be busy all of the
time.
This backend is useful if individual parameters assignments can vary to a
large degree in their processing time. It is recommended to use a `Store`
that supports efficient fetching of a single row from a file.
Parameters
----------
task : `TaskDef`
Task definition to create subtasks for.
"""
@property
def infile(self):
"""File from which input parameter assignments are fetched."""
return os.path.join(self.workdir, 'in' + self.task.store.ext)
@property
def statusfile(self):
"""File that stores the processing status."""
return os.path.join(self.workdir, 'status')
@property
def partial_resultfile(self):
"""File results are appended to while processing is in progress."""
root, ext = os.path.splitext(self.resultfile)
return root + '.part' + ext
@property
def resultfile(self):
"""Final result file."""
if self.task.resultfile:
return self.task.resultfile
else:
return os.path.join(self.workdir, 'result' + self.task.store.ext)
[docs] def create_job(self, cont=False):
pspace = self.create_pspace_job(cont=cont)
process = self.create_process_job()
finalize = self.create_finalize_job()
return JobChain(self.task.name, [pspace, process, finalize])
[docs] def create_pspace_job(self, cont):
code = '''
import os.path
from psyrun.pspace import missing, Param
from psyrun.backend.load_balancing import LoadBalancingWorker
pspace = task.pspace
if {cont!r}:
if os.path.exists({outfile!r}):
os.rename({outfile!r}, {part_outfile!r})
if os.path.exists({part_outfile!r}):
pspace = missing(pspace, Param(**task.store.load({part_outfile!r})))
task.store.save({infile!r}, pspace.build())
LoadBalancingWorker.create_statusfile({statusfile!r})
'''.format(
cont=cont, infile=self.infile, outfile=self.resultfile,
part_outfile=self.partial_resultfile, statusfile=self.statusfile)
file_dep = [os.path.join(os.path.dirname(self.task.path), f)
for f in self.task.file_dep]
return Job(
'pspace', self.submit_code, {'code': code},
[self.task.path] + file_dep, [self.infile])
[docs] def create_process_job(self):
source_file = JobSourceFile(
os.path.join(self.workdir, self.task.name + ':process.py'),
self.task,
'''
from multiprocessing import Process
import sys
from psyrun.backend.load_balancing import LoadBalancingWorker
if __name__ == '__main__':
workers = [
LoadBalancingWorker(
i, sys.argv[1], sys.argv[2], sys.argv[3],
task.store, task.exclude_from_result)
for i in range(task.pool_size)]
processes = [Process(target=w.start, args=(task.execute, task.setup))
for w in workers]
for p in processes:
p.start()
for p in processes:
p.join()
''')
return JobArray(
self.task.max_jobs, 'process', self.submit_array, self.submit_file,
{'job_source_file': source_file, 'args': [
self.infile, self.partial_resultfile, self.statusfile
]}, [self.infile], [self.partial_resultfile])
[docs] def create_finalize_job(self):
code = '''
import os
os.rename({part!r}, {whole!r})
'''.format(part=self.partial_resultfile, whole=self.resultfile)
return Job(
'finalize', self.submit_code, {'code': code},
[self.partial_resultfile],
[self.resultfile])
[docs] def get_missing(self):
missing_items = self.task.pspace
try:
missing_items = missing(
missing_items, Param(**self.task.store.load(self.resultfile)))
except IOError:
try:
missing_items = missing(
missing_items,
Param(**self.task.store.load(self.partial_resultfile)))
except IOError:
pass
return missing_items
[docs] def get_queued(self):
return None
[docs] def get_failed(self):
return None
[docs]class LoadBalancingWorker(object):
"""Maps a function to the parameter space supporting other
*LoadBalancingWorkers* processing the same input file at the same time.
Parameters
----------
proc_id : int
Worker ID.
infile : str
Filename of the file with the input parameters space.
outfile : str
Filename of the file to write the results to.
statusfile : str
Filename of the file to track the processing progress.
store : `Store`, optional
Input/output backend.
exclude_from_result : sequence, optional
Keys of items to exclude from the result.
"""
def __init__(
self, proc_id, infile, outfile, statusfile, store=DefaultStore(),
exclude_from_result=None):
self.proc_id = proc_id
self.infile = infile
self.outfile = outfile
self.statusfile = statusfile
self.store = store
if exclude_from_result is None:
exclude_from_result = []
self.exclude_from_result = exclude_from_result
[docs] @classmethod
def create_statusfile(cls, statusfile):
"""Creates the status file required by all load balancing workers.
Parameters
----------
statusfile : str
Filename of the status file.
"""
with open(statusfile, 'w') as f:
f.write('0')
f.flush()
[docs] def get_next_ix(self):
"""Get the index of the next parameter assignment to process."""
with open(self.statusfile, 'r+') as f:
fcntl.flock(f, fcntl.LOCK_EX)
try:
ix = int(f.read())
f.seek(0)
f.truncate(0)
f.write(str(ix + 1))
f.flush()
finally:
fcntl.flock(f, fcntl.LOCK_UN)
return ix
[docs] def get_next_param_set(self):
"""Load the next parameter assignment to process."""
return self.store.load(self.infile, row=self.get_next_ix())
[docs] def save_data(self, data):
"""Appends data to the *outfile*.
Uses a lock on the file to support concurrent access.
"""
with open(self.statusfile + '.lock', 'w') as lock:
fcntl.flock(lock, fcntl.LOCK_EX)
try:
self.store.append(self.outfile, data)
finally:
fcntl.flock(lock, fcntl.LOCK_UN)
[docs] def start(self, fn, setup_fn=None):
"""Start processing a parameter space.
A status file needs to be created before invoking this function by
calling `create_statusfile`.
Parameters
----------
fn : function
Function to evaluate on the parameter space.
setup_fn : function, optional
Setup function, called with the worker ID as argument before
processing of parameter sets begins. May return a dictionary of
parameters added to the invocation of *fn*.
"""
add_params = None
if setup_fn is not None:
add_params = setup_fn(self.proc_id)
if add_params is None:
add_params = {}
while True:
try:
pspace = Param(**self.get_next_param_set())
except IndexError:
return
data = map_pspace(
fn, Param(**add_params) * pspace,
exclude=self.exclude_from_result)
self.save_data(data)