"""
config.py
============
Contains configuration functions for the registrar
"""
from dataclasses import dataclass, field
from enum import Enum
import os
from typing import Any, Dict, List, Optional, TextIO
import re
from os.path import join, dirname
import yaml
import jsonschema
ENV_PATTERN = re.compile(r".*?\${(\w+)}.*?")
[docs]def constructor_env_variables(loader: yaml.Loader, node):
"""Extracts the environment variable from the node's value
Args:
loader (yaml.Loader): the yaml loader
node (str): the current node in the yaml
Returns:
str: the parsed string that contains the value of the environment
variable
"""
value = loader.construct_scalar(node)
if isinstance(value, str):
match = ENV_PATTERN.findall(value) # to find all env variables in line
if match:
full_value: str = value
for group in match:
env_variable = os.environ.get(group)
if env_variable is not None:
full_value = full_value.replace(
f"${{{group}}}", env_variable
) # type: ignore # noqa: E501
else:
return None
return full_value
return value
[docs]def load_config(input_file: TextIO) -> dict:
"""Attempts to load config from yaml file
Args:
input_file (TextIO): the file to be read
Returns:
dict: dictionary containing configuration values
"""
tag = "!env"
loader = yaml.SafeLoader
# the tag will be used to mark where to start searching for the pattern
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
loader.add_implicit_resolver(tag, ENV_PATTERN, None)
loader.add_constructor(tag, constructor_env_variables)
return yaml.load(input_file, Loader=loader)
[docs]def validate_config(config):
"""Validate against the config-schema.yaml"""
with open(join(dirname(__file__), "config-schema.yaml")) as f:
schema = yaml.load(f)
jsonschema.validate(config, schema)
[docs]@dataclass(eq=True)
class BackendConfig:
"""Configuration for a specific backend.
Attributes:
path (str): the dotpath to the implementing class
args (dict): any arguments to be passed to the class when
instantiating it
kwargs (dict): any keyword arguments to be passed to the class when
instantiating it
"""
path: str
args: List[Any] = field(default_factory=list)
kwargs: Dict[str, Any] = field(default_factory=dict)
[docs]@dataclass(eq=True)
class HandlerConfig:
"""A handler configuration
Attributes:
path (str): the dotpath of the handler class
args (List[Any]): any handler arguments
kwargs (dict): any handler keyword-arguments
"""
path: str
args: List[Any] = field(default_factory=list)
kwargs: Dict[str, Any] = field(default_factory=dict)
[docs]@dataclass(eq=True)
class HandlersConfig:
"""Pre-/success-/error-handlers for a given route.
Attributes:
pre (List[HandlerConfig]): the pre-handlers configuration
post (List[HandlerConfig]): the post-handlers configuration
error (List[HandlerConfig]): the error-handlers configuration
"""
pre: List[HandlerConfig] = field(default_factory=list)
post: List[HandlerConfig] = field(default_factory=list)
error: List[HandlerConfig] = field(default_factory=list)
[docs] @classmethod
def from_dict(cls, values: dict) -> "HandlersConfig":
"""Constructs a handler config from a dict"""
return cls(
pre=[
HandlerConfig(**handler_values)
for handler_values in values.pop("pre", [])
],
post=[
HandlerConfig(**handler_values)
for handler_values in values.pop("post", [])
],
error=[
HandlerConfig(**handler_values)
for handler_values in values.pop("error", [])
],
)
[docs]class RouteMode(Enum):
"""The route mode."""
REGISTER = "REGISTER"
DEREGISTER = "DEREGISTER"
DEREGISTER_IDENTIFIER = "DEREGISTER_IDENTIFIER"
[docs]@dataclass(eq=True)
class RouteConfig:
"""A registration route configuration
Attributes:
path (str): the dotpath to the implementing class
queue (str): the queue for this route to listen on
mode: (RouteMode, optional): the default mode to use for this route
success_queue (str, optional): the queue to put successfully registered
items on
error_queue (str, optional): the queue to put the items on upon eror
replace (bool, optional): whether replacement of items is allowed
backends (List[BackendConfig], optional): all associated backends for
that route
handlers (HandlerConfig): any handlers associated
args (list, optional): any arguments to supply to the implementing
class
kwargs (list, optional): any keyword arguments to supply to the
implementing class
"""
path: str
queue: str
mode: RouteMode = RouteMode.REGISTER
success_queue: Optional[str] = None
error_queue: Optional[str] = None
replace: bool = False
simplify_footprint_tolerance: Optional[float] = None
backends: List[BackendConfig] = field(default_factory=list)
handlers: HandlersConfig = field(default_factory=HandlersConfig)
args: List[Any] = field(default_factory=list)
kwargs: Dict[str, Any] = field(default_factory=dict)
[docs] @classmethod
def from_dict(cls, values: dict) -> "RouteConfig":
"""Parses a RouteConfig from a dictionary"""
return cls(
handlers=HandlersConfig.from_dict(values.pop("handlers", {})),
backends=[
BackendConfig(**backend_cfg)
for backend_cfg in values.pop("backends")
],
mode=RouteMode(
values.pop("default_mode", "REGISTER").upper()
),
**values,
)
[docs]@dataclass(eq=True)
class SourceConfig:
"""The configuration of a specific remote source.
Attributes:
type (str): the type name of the source
name (str): the given name of the source
filter (str, optional): a filter to see if a path is relative to this
source
args (list): additional options for the implementing source class
constructor
kwargs (dict): additional options for the implementing source class
constructor
"""
type: str
name: str
filter: Optional[str] = None
args: List[Any] = field(default_factory=list)
kwargs: Dict[str, str] = field(default_factory=dict)
[docs]@dataclass(eq=True)
class RegistrarConfig:
"""The root registration configuration object.
Attributes:
routes (Dict[str, RouteConfig]): all routes
sources (List[SourceConfig]): the sources
"""
routes: Dict[str, RouteConfig]
sources: List[SourceConfig]
redis_host: Optional[str] = "redis"
redis_port: Optional[int] = 6379
[docs] def get_route(self, queue: str) -> RouteConfig:
"""Returns the RouteConfig for the given queue name"""
for route in self.routes.values():
if route.queue == queue:
return route
raise KeyError(queue)
[docs] @classmethod
def from_file(
cls, input_file: TextIO, validate: bool = False
) -> "RegistrarConfig":
"""Parses a RegistrarConfig from a file"""
return cls.from_dict(load_config(input_file), validate)
[docs] @classmethod
def from_dict(
cls, values: dict, validate: bool = False
) -> "RegistrarConfig":
"""Parses a RegistrarConfig from a dictionary"""
if validate:
validate_config(values)
return cls(
{
name: RouteConfig.from_dict(route_cfg)
for name, route_cfg in values.pop("routes").items()
},
[
SourceConfig(**source_cfg)
for source_cfg in values.pop("sources", [])
],
**values,
)