Source code for psyrun.exceptions
"""Psyrun specific exceptions and warnings."""
import warnings
[docs]class TaskWarning(UserWarning):
"""General warning related to task processing.
*TaskWarnings* will always be shown by default.
"""
def __init__(self, task_name, *args, **kwargs):
super(TaskWarning, self).__init__(*args, **kwargs)
self.task_name = task_name
[docs]class JobsRunningWarning(TaskWarning):
"""Warning issued when jobs for a task are still running."""
def __init__(self, task_name):
super(JobsRunningWarning, self).__init__(
task_name,
"Task '{}' has unfinished jobs queued. Not starting new jobs "
"until these are finished or have been killed.".format(task_name))
[docs]class TaskWorkdirDirtyWarning(TaskWarning):
"""Warning issued when the workdir is dirty and would be overwritten."""
def __init__(self, task_name):
# TODO explain how to solve this
super(TaskWorkdirDirtyWarning, self).__init__(
task_name,
"Work directory of task '{}' is dirty.".format(task_name))
[docs]class IneffectiveExcludeWarning(UserWarning):
"""Warning issued when a key in *exclude_from_result* was not found in the
result.
"""
def __init__(self, key):
super(IneffectiveExcludeWarning, self).__init__(
"The key '{}' to remove from the result was not found in the "
"result dictionary.".format(key))
self.key = key
warnings.simplefilter('always', category=TaskWarning)