Source code for registrar.daemon

"""
daemon.py
===========

Contains the daemon functions for the registrar

"""

from typing import Dict

import structlog
import redis

from .registrar import register, deregister
from .config import RegistrarConfig, RouteConfig, RouteMode


logger = structlog.getLogger(__name__)


def run_daemon(config: RegistrarConfig):
    """Run the registration daemon.

    Connects to redis, then loops forever: pops one item at a time from
    any of the configured route queues and dispatches it to ``register``
    or ``deregister`` according to the mode of the route the item's queue
    belongs to. After handling, the item is pushed to the route's success
    queue (if configured). When handling raises, the exception is logged
    and the item is pushed to the route's error queue (if configured).

    This function does not return. Exceptions raised by the redis calls
    outside of the per-item handling (the blocking pop and the push to
    the error queue) are not caught here and propagate to the caller.

    Arguments:
        config (RegistrarConfig): the root configuration.
    """
    client = redis.Redis(
        host=config.redis_host or "redis",
        port=config.redis_port or 6379,
        # ``charset`` is a deprecated alias of ``encoding`` in redis-py
        encoding="utf-8",
        decode_responses=True,
    )

    # map each queue name to the route configuration that listens on it
    queue_to_route: Dict[str, RouteConfig] = {
        route_cfg.queue: route_cfg for route_cfg in config.routes.values()
    }
    queue_names = list(queue_to_route.keys())
    logger.debug(
        f"waiting on items on redis "
        f"queue{'s' if len(queue_names) > 1 else ''}",
        queues=queue_names,
    )

    while True:
        # fetch an item from the queue to be handled
        queue, value = client.brpop(queue_names)
        logger.debug(f"{queue}: {value}")

        # Reset per item so that the error handler below can never see the
        # route of a previously handled item (or an unbound name on the
        # very first iteration) when the route lookup itself fails.
        route_cfg = None
        try:
            route_cfg = queue_to_route[queue]
            if route_cfg.mode == RouteMode.REGISTER:
                register(route_cfg, config.sources, value)
            elif route_cfg.mode == RouteMode.DEREGISTER:
                deregister(route_cfg, config.sources, value)
            elif route_cfg.mode == RouteMode.DEREGISTER_IDENTIFIER:
                deregister(route_cfg, config.sources, value, use_id=True)

            if route_cfg.success_queue is not None:
                client.lpush(route_cfg.success_queue, value)
        except Exception as exc:  # pylint: disable=W0703
            # keep the daemon alive: log the failure and move on
            logger.exception(exc)
            if route_cfg is not None and route_cfg.error_queue is not None:
                client.lpush(route_cfg.error_queue, value)