Source code for registrar.config

"""
config.py
============

Contains configuration functions for the registrar

"""

from dataclasses import dataclass, field
from enum import Enum
import os
from typing import Any, Dict, List, Optional, TextIO
import re
from os.path import join, dirname

import yaml
import jsonschema


ENV_PATTERN = re.compile(r".*?\${(\w+)}.*?")


[docs]def constructor_env_variables(loader: yaml.Loader, node): """Extracts the environment variable from the node's value Args: loader (yaml.Loader): the yaml loader node (str): the current node in the yaml Returns: str: the parsed string that contains the value of the environment variable """ value = loader.construct_scalar(node) if isinstance(value, str): match = ENV_PATTERN.findall(value) # to find all env variables in line if match: full_value: str = value for group in match: env_variable = os.environ.get(group) if env_variable is not None: full_value = full_value.replace( f"${{{group}}}", env_variable ) # type: ignore # noqa: E501 else: return None return full_value return value
[docs]def load_config(input_file: TextIO) -> dict: """Attempts to load config from yaml file Args: input_file (TextIO): the file to be read Returns: dict: dictionary containing configuration values """ tag = "!env" loader = yaml.SafeLoader # the tag will be used to mark where to start searching for the pattern # e.g. somekey: !env somestring${MYENVVAR}blah blah blah loader.add_implicit_resolver(tag, ENV_PATTERN, None) loader.add_constructor(tag, constructor_env_variables) return yaml.load(input_file, Loader=loader)
[docs]def validate_config(config): """Validate against the config-schema.yaml""" with open(join(dirname(__file__), "config-schema.yaml")) as f: schema = yaml.load(f) jsonschema.validate(config, schema)
[docs]@dataclass(eq=True) class BackendConfig: """Configuration for a specific backend. Attributes: path (str): the dotpath to the implementing class args (dict): any arguments to be passed to the class when instantiating it kwargs (dict): any keyword arguments to be passed to the class when instantiating it """ path: str args: List[Any] = field(default_factory=list) kwargs: Dict[str, Any] = field(default_factory=dict)
[docs]@dataclass(eq=True) class HandlerConfig: """A handler configuration Attributes: path (str): the dotpath of the handler class args (List[Any]): any handler arguments kwargs (dict): any handler keyword-arguments """ path: str args: List[Any] = field(default_factory=list) kwargs: Dict[str, Any] = field(default_factory=dict)
[docs]@dataclass(eq=True) class HandlersConfig: """Pre-/success-/error-handlers for a given route. Attributes: pre (List[HandlerConfig]): the pre-handlers configuration post (List[HandlerConfig]): the post-handlers configuration error (List[HandlerConfig]): the error-handlers configuration """ pre: List[HandlerConfig] = field(default_factory=list) post: List[HandlerConfig] = field(default_factory=list) error: List[HandlerConfig] = field(default_factory=list)
[docs] @classmethod def from_dict(cls, values: dict) -> "HandlersConfig": """Constructs a handler config from a dict""" return cls( pre=[ HandlerConfig(**handler_values) for handler_values in values.pop("pre", []) ], post=[ HandlerConfig(**handler_values) for handler_values in values.pop("post", []) ], error=[ HandlerConfig(**handler_values) for handler_values in values.pop("error", []) ], )
[docs]class RouteMode(Enum): """The route mode.""" REGISTER = "REGISTER" DEREGISTER = "DEREGISTER" DEREGISTER_IDENTIFIER = "DEREGISTER_IDENTIFIER"
[docs]@dataclass(eq=True) class RouteConfig: """A registration route configuration Attributes: path (str): the dotpath to the implementing class queue (str): the queue for this route to listen on mode: (RouteMode, optional): the default mode to use for this route success_queue (str, optional): the queue to put successfully registered items on error_queue (str, optional): the queue to put the items on upon eror replace (bool, optional): whether replacement of items is allowed backends (List[BackendConfig], optional): all associated backends for that route handlers (HandlerConfig): any handlers associated args (list, optional): any arguments to supply to the implementing class kwargs (list, optional): any keyword arguments to supply to the implementing class """ path: str queue: str mode: RouteMode = RouteMode.REGISTER success_queue: Optional[str] = None error_queue: Optional[str] = None replace: bool = False simplify_footprint_tolerance: Optional[float] = None backends: List[BackendConfig] = field(default_factory=list) handlers: HandlersConfig = field(default_factory=HandlersConfig) args: List[Any] = field(default_factory=list) kwargs: Dict[str, Any] = field(default_factory=dict)
[docs] @classmethod def from_dict(cls, values: dict) -> "RouteConfig": """Parses a RouteConfig from a dictionary""" return cls( handlers=HandlersConfig.from_dict(values.pop("handlers", {})), backends=[ BackendConfig(**backend_cfg) for backend_cfg in values.pop("backends") ], mode=RouteMode( values.pop("default_mode", "REGISTER").upper() ), **values, )
[docs]@dataclass(eq=True) class SourceConfig: """The configuration of a specific remote source. Attributes: type (str): the type name of the source name (str): the given name of the source filter (str, optional): a filter to see if a path is relative to this source args (list): additional options for the implementing source class constructor kwargs (dict): additional options for the implementing source class constructor """ type: str name: str filter: Optional[str] = None args: List[Any] = field(default_factory=list) kwargs: Dict[str, str] = field(default_factory=dict)
[docs]@dataclass(eq=True) class RegistrarConfig: """The root registration configuration object. Attributes: routes (Dict[str, RouteConfig]): all routes sources (List[SourceConfig]): the sources """ routes: Dict[str, RouteConfig] sources: List[SourceConfig] redis_host: Optional[str] = "redis" redis_port: Optional[int] = 6379
[docs] def get_route(self, queue: str) -> RouteConfig: """Returns the RouteConfig for the given queue name""" for route in self.routes.values(): if route.queue == queue: return route raise KeyError(queue)
[docs] @classmethod def from_file( cls, input_file: TextIO, validate: bool = False ) -> "RegistrarConfig": """Parses a RegistrarConfig from a file""" return cls.from_dict(load_config(input_file), validate)
[docs] @classmethod def from_dict( cls, values: dict, validate: bool = False ) -> "RegistrarConfig": """Parses a RegistrarConfig from a dictionary""" if validate: validate_config(values) return cls( { name: RouteConfig.from_dict(route_cfg) for name, route_cfg in values.pop("routes").items() }, [ SourceConfig(**source_cfg) for source_cfg in values.pop("sources", []) ], **values, )