from typing import Iterator, Dict, Protocol
from redis import Redis
from json import dumps
import click
from .model import HarvesterAppConfig, OutputType
class OutputHandler(Protocol):
    """Structural interface for objects that consume harvested records.

    Any object exposing a compatible ``handle`` method satisfies this
    protocol; no explicit inheritance is required.
    """

    def handle(self, data: Iterator[Dict]) -> None:
        """Consume every record yielded by *data*.

        :param data: iterator of dictionaries to be emitted.
        """
        ...
class QueueHandler:
    """Output handler that pushes each record onto a Redis list as JSON."""

    def __init__(self, queue: str, client: Redis) -> None:
        """Remember the target list name and the client used to reach it.

        :param queue: name of the Redis list that receives the records.
        :param client: client object whose ``lpush`` method is called.
        """
        self.client = client
        self.queue = queue

    def handle(self, data: Iterator[Dict]) -> None:
        """Serialize every record in *data* and ``lpush`` it to the queue.

        Records are sent one at a time, in iteration order. Values the
        JSON encoder cannot serialize natively are converted with ``str``.

        :param data: iterator of dictionaries to enqueue.
        """
        for record in data:
            payload = dumps(record, default=str)
            self.client.lpush(self.queue, payload)
def create_queue_handler(
    config: HarvesterAppConfig, harvester_name: str
) -> OutputHandler:
    """Build a :class:`QueueHandler` for the named harvester.

    The queue name is read from ``config.harvesters[harvester_name].queue``
    and the Redis client is created from ``config.redis.host`` and
    ``config.redis.port``.

    :param config: application configuration holding the per-harvester
        settings and the Redis connection settings.
    :param harvester_name: key into ``config.harvesters``.
    :returns: a handler that pushes records onto the configured queue.
    :raises ValueError: if the harvester has no queue configured
        (``None`` or any other falsy value).
    """
    harvest_config = config.harvesters[harvester_name]
    queue = harvest_config.queue
    # Validate before building the client so a misconfigured harvester
    # fails immediately instead of constructing a client that is then
    # thrown away.
    if not queue:
        raise ValueError("HarvestConfig.queue cannot be None when output type is queue")
    client = Redis(config.redis.host, config.redis.port)
    return QueueHandler(queue=queue, client=client)
class ConsoleHandler:
    """Output handler that prints each record as one JSON document."""

    def handle(self, data: Iterator[Dict]) -> None:
        """Serialize every record in *data* and write it with ``click.echo``.

        Values the JSON encoder cannot serialize natively are converted
        with ``str``.

        :param data: iterator of dictionaries to print.
        """
        for record in data:
            line = dumps(record, default=str)
            click.echo(line)
def create_console_handler(
    config: HarvesterAppConfig, harvester_name: str
) -> OutputHandler:
    """Build a :class:`ConsoleHandler`.

    Both parameters are accepted only so this factory has the same
    signature as the other factories; neither is read.

    :param config: application configuration (unused).
    :param harvester_name: harvester key (unused).
    :returns: a new console handler.
    """
    handler = ConsoleHandler()
    return handler
# Registry mapping each supported OutputType to the factory that builds its
# handler. Every factory is called as ``factory(config, harvester_name)``;
# get_output_handler() performs the lookup.
OUTPUTS = {
    OutputType.console: create_console_handler,
    OutputType.queue: create_queue_handler,
}
def get_output_handler(
    config: HarvesterAppConfig, harvester_name: str
) -> OutputHandler:
    """Return the output handler configured for the named harvester.

    The harvester's ``output`` setting is converted to an ``OutputType``
    and used to pick a factory from ``OUTPUTS``; that factory is then
    called with the same arguments this function received.

    :param config: application configuration holding per-harvester settings.
    :param harvester_name: key into ``config.harvesters``.
    :returns: the handler built by the selected factory.
    """
    harvester = config.harvesters[harvester_name]
    factory = OUTPUTS[OutputType(harvester.output)]
    return factory(config, harvester_name)