"""
daemon.py
===========
Contains the daemon functions for the registrar
"""
from typing import Dict
import structlog
import redis
from .registrar import register, deregister
from .config import RegistrarConfig, RouteConfig, RouteMode
logger = structlog.getLogger(__name__)
def run_daemon(config: RegistrarConfig):
    """Run the registration daemon.

    Connects to redis and then loops forever, popping items from the queues
    of all configured routes. Each item is passed to ``register`` or
    ``deregister`` depending on the mode of the route its queue belongs to.
    Afterwards the item is pushed to the route's success queue, or to its
    error queue when handling raised an exception, provided the respective
    queue is configured.

    This function does not return on its own; it only exits when an
    exception escapes the loop (e.g. one raised while waiting for items).

    Arguments:
        config (RegistrarConfig): the root configuration.
    """
    # ``encoding`` is the supported spelling of the deprecated ``charset``
    # keyword. Together with ``decode_responses=True`` it makes redis return
    # ``str`` queue names and values, which the lookup by queue name below
    # relies on.
    client = redis.Redis(
        host=config.redis_host or "redis",
        port=config.redis_port or 6379,
        encoding="utf-8",
        decode_responses=True,
    )

    # map each queue name to the route that listens on it
    queue_to_route: Dict[str, RouteConfig] = {
        route_cfg.queue: route_cfg
        for route_cfg in config.routes.values()
    }
    queue_names = list(queue_to_route)

    plural = "s" if len(queue_names) > 1 else ""
    logger.debug(
        f"waiting on items on redis queue{plural}",
        queues=queue_names,
    )

    while True:
        # fetch an item from the queue to be handled
        queue, value = client.brpop(queue_names)
        logger.debug(f"{queue}: {value}")

        # Resolve the route *before* entering the ``try`` block, so that the
        # error handler below can never run with an unbound ``route_cfg`` or
        # with the route of a previously handled item.
        route_cfg = queue_to_route.get(queue)
        if route_cfg is None:
            logger.error(
                "received item from unknown queue",
                queue=queue,
                value=value,
            )
            continue

        try:
            if route_cfg.mode == RouteMode.REGISTER:
                register(route_cfg, config.sources, value)
            elif route_cfg.mode == RouteMode.DEREGISTER:
                deregister(route_cfg, config.sources, value)
            elif route_cfg.mode == RouteMode.DEREGISTER_IDENTIFIER:
                deregister(route_cfg, config.sources, value, use_id=True)

            if route_cfg.success_queue is not None:
                client.lpush(route_cfg.success_queue, value)
        except Exception as exc:  # pylint: disable=W0703
            # Deliberately broad: a single failing item must not bring the
            # daemon down. Log it and report the item on the error queue.
            logger.exception(exc)
            if route_cfg.error_queue is not None:
                client.lpush(route_cfg.error_queue, value)