# Source code for harvester.output

from typing import Iterator, Dict, Protocol
from redis import Redis
from json import dumps
import click

from .model import HarvesterAppConfig, OutputType


class OutputHandler(Protocol):
    """Structural interface shared by every output handler in this module.

    Any object exposing a compatible ``handle`` method satisfies it; no
    explicit inheritance is required.
    """

    def handle(self, data: Iterator[Dict]) -> None:
        """Consume the dictionaries yielded by *data*."""
        ...
class QueueHandler:
    """Output handler that pushes each item onto a Redis list."""

    def __init__(self, queue: str, client: Redis) -> None:
        """Remember the list key (*queue*) and the Redis *client* to push with."""
        self.queue = queue
        self.client = client

    def handle(self, data: Iterator[Dict]) -> None:
        """Serialize every item of *data* to JSON and ``lpush`` it to the queue.

        Items are pushed one call at a time, in iteration order. Values the
        JSON encoder cannot handle natively are converted with ``str``.
        """
        for record in data:
            payload = dumps(record, default=str)
            self.client.lpush(self.queue, payload)
def create_queue_handler(
    config: HarvesterAppConfig, harvester_name: str
) -> OutputHandler:
    """Build a :class:`QueueHandler` for the harvester named *harvester_name*.

    Args:
        config: Application configuration; ``config.harvesters[harvester_name]``
            supplies the queue name and ``config.redis`` the host and port.
        harvester_name: Key of the harvester entry inside ``config.harvesters``.

    Returns:
        A ``QueueHandler`` bound to the configured queue and a new Redis client.

    Raises:
        ValueError: If the harvester's ``queue`` setting is falsy.
    """
    queue = config.harvesters[harvester_name].queue

    # Validate before touching the Redis settings so a misconfigured
    # harvester fails fast instead of building a client that is never used.
    if not queue:
        raise ValueError("HarvestConfig.queue cannot be None when output type is queue")

    client = Redis(config.redis.host, config.redis.port)
    return QueueHandler(queue=queue, client=client)
class ConsoleHandler:
    """Output handler that echoes each item to the console as JSON."""

    def handle(self, data: Iterator[Dict]) -> None:
        """Serialize every item of *data* to JSON and pass it to ``click.echo``.

        Values the JSON encoder cannot handle natively are converted with
        ``str``.
        """
        for record in data:
            line = dumps(record, default=str)
            click.echo(line)
def create_console_handler(
    config: HarvesterAppConfig, harvester_name: str
) -> OutputHandler:
    """Return a new :class:`ConsoleHandler`.

    Both arguments are accepted only so this factory has the same signature
    as the other output-handler factories; neither is read.
    """
    handler = ConsoleHandler()
    return handler
# Dispatch table: output type -> factory taking (config, harvester_name)
# and returning the handler for that output type.
OUTPUTS = {
    OutputType.console: create_console_handler,
    OutputType.queue: create_queue_handler,
}
def get_output_handler(
    config: HarvesterAppConfig, harvester_name: str
) -> OutputHandler:
    """Create the output handler configured for *harvester_name*.

    The harvester's ``output`` setting is converted to an ``OutputType``,
    the matching factory is looked up in ``OUTPUTS``, and that factory is
    called with the same ``(config, harvester_name)`` arguments.
    """
    harvester = config.harvesters[harvester_name]
    kind = OutputType(harvester.output)
    factory = OUTPUTS[kind]
    return factory(config, harvester_name)