Source code for openpaperwork_core.config.backend.configparser

"""
Manages a configuration file using configparser.
"""

import collections
import configparser
import datetime
import logging
import os
import os.path

from ... import PluginBase


LOGGER = logging.getLogger(__name__)


[docs]class ConfigBool(object): def __init__(self, value=False): if isinstance(value, str): self.value = (value.lower() == 'true') else: self.value = value def __eq__(self, o): return (self.value == o) def __bool__(self): return self.value def __str__(self): return str(self.value)
[docs]class ConfigDate(object): DATE_FORMAT = "%Y-%m-%d" def __init__(self, value=datetime.datetime(year=1970, month=1, day=1)): if isinstance(value, str): self.value = ( datetime.datetime .strptime(value, self.DATE_FORMAT) .date() ) else: self.value = value def __eq__(self, o): return (self.value == o) def __str__(self): return self.value.strftime(self.DATE_FORMAT)
[docs]class ConfigList(object): SEPARATOR = ";" def __init__(self, value=None, elements=None): if elements is None: elements = [] self.elements = elements if value is not None: if isinstance(value, str): if value != '': elements = value.split(self.SEPARATOR) for i in elements: (t, v) = i.split(_TYPE_SEPARATOR, 1) self.elements.append(_STR_TO_TYPE[t](v)) elif hasattr(value, 'elements'): self.elements = value.elements[:] else: self.elements = list(value) def __iter__(self): return iter(self.elements) def __contains__(self, o): return o in self.elements def __getitem__(self, *args, **kwargs): return self.elements.__getitem__(*args, **kwargs) def __setitem__(self, *args, **kwargs): return self.elements.__setitem__(*args, **kwargs) def __len__(self): return len(self.elements)
[docs] def append(self, value): self.elements.append(value)
[docs] def remove(self, value): self.elements.remove(value)
def __str__(self): out = [] for e in self.elements: out.append("{}{}{}".format( _TYPE_TO_STR[type(e)], _TYPE_SEPARATOR, str(e) )) return self.SEPARATOR.join(out)
[docs]class ConfigDict(object): SEPARATOR_ITEMS = ";" SEPARATOR_KEYVALS = "=" def __init__(self, value=None, elements={}): self.elements = elements if value is not None: if isinstance(value, str): elements = value.split(self.SEPARATOR_ITEMS) for i in elements: (k, v) = i.split(self.SEPARATOR_KEYVALS, 1) (t, v) = v.split(_TYPE_SEPARATOR, 1) self.elements[k] = _STR_TO_TYPE[t](v) elif hasattr(value, 'elements'): self.elements = value.elements[:] else: self.elements = dict(value) def __iter__(self): return iter(self.elements) def __contains__(self, o): return o in self.elements def __getitem__(self, *args, **kwargs): return self.elements.__getitem__(*args, **kwargs) def __setitem__(self, *args, **kwargs): return self.elements.__setitem__(*args, **kwargs) def __len__(self): return len(self.elements) def __str__(self): out = [] for (k, v) in self.elements.items(): out.append("{}{}{}{}{}".format( k, self.SEPARATOR_KEYVALS, _TYPE_TO_STR[type(v)], _TYPE_SEPARATOR, str(v) )) return self.SEPARATOR_ITEMS.join(out)
_TYPE_TO_STR = { bool: "bool", ConfigBool: "bool", ConfigDate: "date", ConfigDict: "dict", ConfigList: "list", datetime.date: "date", dict: "dict", float: "float", int: "int", list: "list", str: "str", tuple: "list", } _STR_TO_TYPE = { "bool": ConfigBool, "date": ConfigDate, "dict": ConfigDict, "list": ConfigList, "float": float, "int": int, "str": str, } _TYPE_SEPARATOR = ":"
[docs]class Plugin(PluginBase): def __init__(self): self.config = configparser.RawConfigParser() self.base_path = os.getenv( "XDG_CONFIG_HOME", os.path.expanduser("~/.config") ) self.config_file_path_fmt = os.path.join( "{directory}", "{app_name}.conf" ) self.application_name = None self.observers = collections.defaultdict(set) self.core = None self.default_plugins = []
[docs] def init(self, core): self.core = core
[docs] def get_interfaces(self): return ['config_backend']
[docs] def config_backend_load(self, application_name): self.application_name = application_name config_path = self.config_file_path_fmt.format( directory=self.base_path, app_name=application_name, ) self.config = configparser.RawConfigParser() LOGGER.info("Loading configuration '%s' ...", config_path) if os.path.exists(config_path): with open(config_path, 'r') as fd: self.config.read_file(fd) else: LOGGER.warning( "Cannot load configuration '%s'. File does not exist", config_path ) for observers in self.observers.values(): for observer in observers: observer()
[docs] def config_backend_save(self, application_name=None): if application_name is not None: self.application_name = application_name config_path = self.config_file_path_fmt.format( directory=self.base_path, app_name=self.application_name, ) LOGGER.info("Writing configuration '%s' ...", config_path) os.makedirs(os.path.dirname(config_path), mode=0o700, exist_ok=True) with open(config_path, 'w') as fd: self.config.write(fd)
[docs] def config_backend_load_plugins(self, opt_name, default=[]): """ Load and init the plugin list from the configuration. """ self.default_plugins = default modules = self.config_backend_get( "plugins", opt_name, ConfigList(None, self.default_plugins) ) LOGGER.info( "Loading and initializing plugins from configuration: %s", str(modules) ) for module in modules: self.core.load(module) self.core.init()
[docs] def config_backend_list_active_plugins(self, opt_name): return self.config_backend_get( "plugins", opt_name, ConfigList(None, self.default_plugins) )
[docs] def config_backend_reset_plugins(self, opt_name): self.config_backend_del("plugins", opt_name)
[docs] def config_backend_add_plugin(self, opt_name, module_name): LOGGER.info("Adding plugin '%s' to configuration", module_name) modules = self.config_backend_list_active_plugins(opt_name) modules.elements.append(module_name) self.config_backend_put("plugins", opt_name, modules)
[docs] def config_backend_remove_plugin(self, opt_name, module_name): LOGGER.info("Removing plugin '%s' from configuration", module_name) modules = self.config_backend_list_active_plugins(opt_name) modules.elements.remove(module_name) self.config_backend_put("plugins", opt_name, modules)
[docs] def config_backend_put(self, section, key, value): """ Section must be a string. Key must be a string. """ LOGGER.debug("Configuration: %s:%s <-- %s", section, key, str(value)) if value is None: if section not in self.config: return if key not in self.config[section]: return self.config[section].pop(key) return t_str = _TYPE_TO_STR[type(value)] t = _STR_TO_TYPE[t_str] value = t(value) value = "{}{}{}".format(t_str, _TYPE_SEPARATOR, str(value)) if section not in self.config: self.config[section] = {key: value} else: self.config[section][key] = value for observer in self.observers[section]: observer()
[docs] def config_backend_del(self, section, key): if section not in self.config: return if key not in self.config[section]: return self.config.remove_option(section, key)
[docs] def config_backend_get(self, section, key, default=None): try: value = self.config[section][key] (t, value) = value.split(_TYPE_SEPARATOR, 1) r = _STR_TO_TYPE[t](value) LOGGER.debug("Configuration: %s:%s --> %s", section, key, str(r)) return r except KeyError: LOGGER.debug( "Configuration: %s:%s --> %s (default value)", section, key, str(default) ) return default
[docs] def config_backend_add_observer(self, section, callback): self.observers[section].add(callback)
[docs] def config_backend_remove_observer(self, section, callback): self.observers[section].remove(callback)