Source code for openpaperwork_core.config.backend.configparser

"""
Manages a configuration file using configparser.
"""

import collections
import configparser
import datetime
import logging

from ... import (_, PluginBase)


LOGGER = logging.getLogger(__name__)


[docs]class ConfigBool(object): def __init__(self, value=False): if isinstance(value, str): self.value = (value.lower() == 'true') else: self.value = value def __eq__(self, o): return (self.value == o) def __bool__(self): return self.value def __str__(self): return str(self.value)
[docs] def get_value(self): return self.value
[docs]class ConfigDate(object): DATE_FORMAT = "%Y-%m-%d" def __init__(self, value=datetime.datetime(year=1971, month=1, day=1)): if isinstance(value, str): self.value = ( datetime.datetime .strptime(value, self.DATE_FORMAT) .date() ) else: self.value = value def __eq__(self, o): return (self.value == o) def __str__(self): return self.value.strftime(self.DATE_FORMAT)
[docs] def get_value(self): return self.value
[docs]class ConfigList(object): SEPARATOR = ";" def __init__(self, value=None, elements=None): if elements is None: elements = [] self.elements = elements if value is not None: if isinstance(value, str): if value != '': elements = value.split(self.SEPARATOR) for i in elements: (t, v) = i.split(_TYPE_SEPARATOR, 1) self.elements.append(_STR_TO_TYPE[t](v)) elif hasattr(value, 'elements'): self.elements = value.elements[:] else: self.elements = list(value) def __iter__(self): return iter(self.elements) def __contains__(self, o): return o in self.elements def __getitem__(self, *args, **kwargs): return self.elements.__getitem__(*args, **kwargs) def __setitem__(self, *args, **kwargs): return self.elements.__setitem__(*args, **kwargs) def __len__(self): return len(self.elements)
[docs] def append(self, value): self.elements.append(value)
[docs] def remove(self, value): self.elements.remove(value)
def __str__(self): out = [] for e in self.elements: out.append("{}{}{}".format( _TYPE_TO_STR[type(e)], _TYPE_SEPARATOR, str(e) )) return self.SEPARATOR.join(out)
[docs] def get_value(self): return [ e.get_value() if hasattr(e, 'get_value') else e for e in self.elements ]
[docs]class ConfigDict(object): SEPARATOR_ITEMS = ";" SEPARATOR_KEYVALS = "=" def __init__(self, value=None, elements={}): self.elements = elements if value is not None: if isinstance(value, str): elements = value.split(self.SEPARATOR_ITEMS) for i in elements: (k, v) = i.split(self.SEPARATOR_KEYVALS, 1) (t, v) = v.split(_TYPE_SEPARATOR, 1) self.elements[k] = _STR_TO_TYPE[t](v) elif hasattr(value, 'elements'): self.elements = value.elements[:] else: self.elements = dict(value) def __iter__(self): return iter(self.elements) def __contains__(self, o): return o in self.elements def __getitem__(self, *args, **kwargs): return self.elements.__getitem__(*args, **kwargs) def __setitem__(self, *args, **kwargs): return self.elements.__setitem__(*args, **kwargs) def __len__(self): return len(self.elements) def __str__(self): out = [] for (k, v) in self.elements.items(): out.append("{}{}{}{}{}".format( k, self.SEPARATOR_KEYVALS, _TYPE_TO_STR[type(v)], _TYPE_SEPARATOR, str(v) )) return self.SEPARATOR_ITEMS.join(out)
[docs] def get_value(self): return { k: v.get_value() if hasattr(v, 'get_value') else v for (k, v) in self.elements.items() }
_TYPE_TO_STR = { bool: "bool", ConfigBool: "bool", ConfigDate: "date", ConfigDict: "dict", ConfigList: "list", datetime.date: "date", dict: "dict", float: "float", int: "int", list: "list", str: "str", tuple: "list", } _STR_TO_TYPE = { "bool": ConfigBool, "date": ConfigDate, "dict": ConfigDict, "list": ConfigList, "float": float, "int": int, "str": str, } _TYPE_SEPARATOR = ":"
[docs]class Plugin(PluginBase): TEST_FILE_URL = None # unit tests only def __init__(self): self.config = configparser.RawConfigParser() self.base_path = None self.application_name = None self.config_path = None self.observers = collections.defaultdict(set) self.core = None self.default_plugins = []
[docs] def get_interfaces(self): return ['config_backend']
[docs] def get_deps(self): return [ { 'interface': 'fs', 'defaults': ['openpaperwork_core.fs.python'], }, { 'interface': 'paths', 'defaults': ['openpaperwork_core.paths.xdg'], }, ]
[docs] def init(self, core): self.core = core if self.base_path is None: self.base_path = self.core.call_success("paths_get_config_dir")
def _get_filepath(self): if self.TEST_FILE_URL is not None: return self.TEST_FILE_URL return self.core.call_success( "fs_join", self.base_path, self.application_name + ".conf" )
[docs] def config_backend_load(self, application_name): self.application_name = application_name self.config_path = self._get_filepath() self.config = configparser.RawConfigParser() LOGGER.info("Loading configuration '%s' ...", self.config_path) if self.core.call_success("fs_exists", self.config_path) is not None: fd = self.core.call_success("fs_open", self.config_path, 'r') with fd: self.config.read_file(fd) else: LOGGER.warning( "Cannot load configuration '%s'. File does not exist", self.config_path ) for observers in self.observers.values(): for observer in observers: observer()
[docs] def config_backend_save(self, application_name=None): if application_name is not None: self.application_name = application_name config_path = self._get_filepath() LOGGER.info("Writing configuration '%s' ...", config_path) with self.core.call_success("fs_open", config_path, 'w') as fd: self.config.write(fd)
[docs] def config_backend_load_plugins(self, opt_name, default=[]): """ Load and init the plugin list from the configuration. """ self.default_plugins = default modules = self.config_backend_get( "plugins", opt_name, ConfigList(None, self.default_plugins) ) LOGGER.info( "Loading and initializing plugins from configuration: %s", str(modules) ) for module in modules: self.core.load(module) self.core.init()
[docs] def config_backend_list_active_plugins(self, opt_name): return self.config_backend_get( "plugins", opt_name, ConfigList(None, self.default_plugins) )
[docs] def config_backend_reset_plugins(self, opt_name): self.config_backend_del("plugins", opt_name)
[docs] def config_backend_add_plugin(self, opt_name, module_name): LOGGER.info("Adding plugin '%s' to configuration", module_name) modules = self.config_backend_list_active_plugins(opt_name) modules.append(module_name) self.config_backend_put("plugins", opt_name, modules)
[docs] def config_backend_remove_plugin(self, opt_name, module_name): LOGGER.info("Removing plugin '%s' from configuration", module_name) modules = self.config_backend_list_active_plugins(opt_name) try: modules.remove(module_name) except ValueError: LOGGER.warning("Plugin '%s' not found", module_name) self.config_backend_put("plugins", opt_name, modules)
[docs] def config_backend_put(self, section, key, value): """ Section must be a string. Key must be a string. """ LOGGER.debug("Configuration: %s:%s <-- %s", section, key, str(value)) if value is None: if section not in self.config: return if key not in self.config[section]: return self.config[section].pop(key) return t_str = _TYPE_TO_STR[type(value)] t = _STR_TO_TYPE[t_str] value = t(value) value = "{}{}{}".format(t_str, _TYPE_SEPARATOR, str(value)) if section not in self.config: self.config[section] = {key: value} else: self.config[section][key] = value for observer in self.observers[section]: observer()
[docs] def config_backend_del(self, section, key): if section not in self.config: return if key not in self.config[section]: return self.config.remove_option(section, key)
[docs] def config_backend_get(self, section, key, default=None): try: value = self.config[section][key] if value.strip() == "": return None (t, value) = value.split(_TYPE_SEPARATOR, 1) r = _STR_TO_TYPE[t](value) if hasattr(r, 'get_value'): r = r.get_value() LOGGER.debug("Configuration: %s:%s --> %s", section, key, str(r)) return r except KeyError: LOGGER.debug( "Configuration: %s:%s --> %s (default value)", section, key, str(default) ) return default
[docs] def config_backend_add_observer(self, section, callback): self.observers[section].add(callback)
[docs] def config_backend_remove_observer(self, section, callback): self.observers[section].remove(callback)
[docs] def bug_report_get_attachments(self, out: dict): if self.config_path is None: return if self.core.call_success("fs_exists", self.config_path) is None: return out['config'] = { 'include_by_default': True, 'date': None, 'file_type': _("App. config."), 'file_url': self.config_path, 'file_size': self.core.call_success("fs_getsize", self.config_path) }