Source code for psyrun.store.base

"""Base store interface."""

import os.path
from pkg_resources import iter_entry_points


[docs]class Store(object): """Defines the interface of stores. Register implemented stores as `entry points <https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins>`_ in the ``psyrun.stores`` groupn. For example, add the following to the ``setup`` call in your store's ``setup.py`` for a store providing the ``.ext`` format:: entry_points={ 'psyrun.stores': ['.ext = pkg.name:ClassName'], } Attributes ---------- ext : str Filename extension used by the store. """ ext = ''
[docs] def append(self, filename, data): """Append data to file. When trying to append data to a non-existing file, a new file will be created. The backend may require that a file was created with this function to be able to append to it. Parameters ---------- filename : str Filename of file to append the data to. data : dict Dictionary with data to append. """ raise NotImplementedError()
[docs] def save(self, filename, data): """Save data to a file. Parameters ---------- filename : str Filename of file to save data to. data : dict Dictionary with data to store. """ raise NotImplementedError()
[docs] def load(self, filename, row=None): """Load data from a file. Parameters ---------- filename : str Filename of file to load data from. row : int, optional If given, only the row with this index will be loaded. Returns ------- dict Loaded data. """ raise NotImplementedError()
def _safe_ep_load(): for ep in iter_entry_points('psyrun.stores'): try: yield ep.name, ep.load() except ImportError: pass
[docs]class AutodetectStore(Store): """Automatically selects the store based on the file extension.""" registry = dict(_safe_ep_load())
[docs] @classmethod def get_concrete_store(cls, filename): _, ext = os.path.splitext(filename) return cls.registry[ext.lower()]()
[docs] def append(self, filename, data): return self.get_concrete_store(filename).append(filename, data)
[docs] def save(self, filename, data): return self.get_concrete_store(filename).save(filename, data)
[docs] def load(self, filename, row=None): return self.get_concrete_store(filename).load(filename, row=row)