Source code for registrar.registrar

"""Contains all functions relevant for registration
"""
from typing import Callable, List

import structlog
import structlog.contextvars

from .abc import Route
from .config import HandlerConfig, HandlersConfig, RouteConfig, SourceConfig
from .backend import get_backends
from .utils import import_by_path
from .exceptions import RegistrationError

logger = structlog.getLogger(__name__)


[docs]def register( route_cfg: RouteConfig, source_cfgs: List[SourceConfig], value: str ): """Handles the registration of a specific item Arguments: route_cfg (RouteConfig): the used route configuration source_cfgs (List[SourceConfig]): the source configs value (str): the raw value to be parsed """ route_cls = import_by_path(route_cfg.path) route: Route = route_cls(route_cfg, *route_cfg.args, **route_cfg.kwargs) # parse the actual item to be processed item = route.parse(value) # determine the source to be used with that item source = route.get_source(source_cfgs, item) replaced = False with structlog.contextvars.bound_contextvars(item=item): for pre_handler in get_pre_handlers(route_cfg.handlers): pre_handler(route_cfg, item) try: for backend in get_backends(route_cfg.backends): if backend.exists(source, item): if route.replace: logger.info("Replacing item") replaced = True backend.register(source, item, replace=True) else: raise RegistrationError( f"Item {item!r} is already registered" ) else: logger.info("Registering item") backend.register(source, item, replace=False) except Exception as exc: for error_handler in get_error_handlers(route_cfg.handlers): error_handler(route_cfg, item, exc) raise else: for post_handler in get_post_handlers(route_cfg.handlers): post_handler(route_cfg, item) logger.info( f"Successfully {'replaced' if replaced else 'registered'}" )
[docs]def deregister( route_cfg: RouteConfig, source_cfgs: List[SourceConfig], value: str, use_id: bool = False, ): """Handles the deregistration of a specific item. Arguments: route_cfg (RouteConfig): the used route configuration source_cfgs (List[SourceConfig]): the source configs value (str): the raw value to be parsed or the identifier if ``use_id`` is used. use_id (bool): to deregister using the identifier, or a parsed item. """ route_cls = import_by_path(route_cfg.path) route: Route = route_cls(route_cfg) # parse the actual item to be processed if use_id: item = value logger.info(f"Handling deregistration of identifier '{item}'.") else: item = route.parse(value) logger.info(f"Handling deregistration for '{item!r}'.") # determine the source to be used with that item source = route.get_source(source_cfgs, item) with structlog.contextvars.bound_contextvars(item=item): for pre_handler in get_pre_handlers(route_cfg.handlers): pre_handler(route_cfg, item) try: for backend in get_backends(route_cfg.backends): if use_id: backend.deregister_identifier(item) else: backend.deregister(source, item) except Exception as exc: for error_handler in get_error_handlers(route_cfg.handlers): error_handler(route_cfg, item, exc) raise else: for post_handler in get_post_handlers(route_cfg.handlers): post_handler(route_cfg, item) logger.info("Successfully deregistered")
def _instantiate_handlers(handler_configs: List[HandlerConfig]): """Helper to get an arbitrary handler""" return [ import_by_path(handler_config.path)( *handler_config.args, **handler_config.kwargs, ) for handler_config in handler_configs ]
[docs]def get_pre_handlers(config: HandlersConfig) -> List[Callable]: """Instantiates pre error handlers.""" return _instantiate_handlers(config.pre)
[docs]def get_post_handlers(config: HandlersConfig) -> List[Callable]: """Instantiates post error handlers.""" return _instantiate_handlers(config.post)
[docs]def get_error_handlers(config: HandlersConfig) -> List[Callable]: """Instantiates error error handlers.""" return _instantiate_handlers(config.error)