Source code for harvester.postprocess

import copy
import importlib
from typing import Callable, Dict, Iterator, List

from structlog import get_logger

from harvester.model import PostprocessorConfig, PostprocessorType

# Module-level structlog logger (used e.g. by apply_postprocessing to report
# postprocessor failures).
LOGGER = get_logger(__name__)
# Type of a postprocessor callable: invoked as ``f(item, **kwargs)`` and
# returning the (possibly modified) item dict.
PostprocessFunction = Callable[..., Dict]


def import_by_path(path: str) -> PostprocessFunction:
    """Import and return the object referenced by a dotted Python path.

    Args:
        path (str): The dotted Python path, where the last element is the
            name of an object in the referenced module, e.g.
            ``"mypackage.postprocessors.add_source"``.

    Returns:
        PostprocessFunction: The object found in the referenced module.

    Raises:
        ValueError: If ``path`` has no module part (e.g. no dot at all).
        ImportError: If the module part cannot be imported.
        AttributeError: If the module has no attribute with the given name.
    """
    module_path, _, object_name = path.rpartition(".")
    if not module_path:
        # Without this check importlib fails with a bare "Empty module name",
        # which does not tell the user which configured path was wrong.
        raise ValueError(
            f"Invalid import path {path!r}: expected '<module>.<object>'"
        )
    module = importlib.import_module(module_path)
    try:
        return getattr(module, object_name)
    except AttributeError as err:
        raise AttributeError(
            f"Module {module_path!r} has no attribute {object_name!r} "
            f"(from import path {path!r})"
        ) from err
def merge(a: Dict, b: Dict, path=None):
    """Recursively merge ``b`` into ``a`` and return ``a``.

    Nested dicts present under the same key in both are merged key by key;
    any other value from ``b`` overrides the one in ``a``. ``a`` is modified
    in place and ``b`` is left untouched.

    Values taken from ``b`` are deep-copied. Otherwise ``a`` would alias
    mutable objects owned by ``b``. When the same ``b`` is merged into many
    dicts (as with static postprocessor values), a later mutation of one
    result would then leak into ``b`` and every other result.

    Args:
        a (Dict): The dict to merge into; mutated in place.
        b (Dict): The dict whose values take precedence. Not modified.
        path (list, optional): Keys leading to the current nesting level,
            maintained during recursion; not otherwise used.

    Returns:
        Dict: ``a``, after merging.
    """
    if path is None:
        path = []
    for key, value in b.items():
        if key in a and isinstance(a[key], dict) and isinstance(value, dict):
            merge(a[key], value, path + [str(key)])
        elif key not in a or a[key] != value:
            # Copy so that ``a`` never shares mutable objects with ``b``.
            a[key] = copy.deepcopy(value)
        # else: identical leaf value already present; keep a's object.
    return a
def process_static(item: Dict, values: Dict) -> Dict:
    """Overlay static ``values`` onto ``item``.

    This is a thin wrapper around ``merge``: nested dicts are merged
    recursively, ``values`` wins on conflicting keys, and ``item`` itself
    is updated in place.

    Args:
        item (Dict): The item to enrich.
        values (Dict): Static values to merge into the item.

    Returns:
        Dict: The updated ``item``.
    """
    enriched = merge(item, values)
    return enriched
# Registry of built-in postprocessors; get_postprocessor looks up
# ``config.process`` here when ``config.type`` is ``PostprocessorType.builtin``.
BUILTIN_POSTPROCESSORS: Dict[str, PostprocessFunction] = {"static": process_static}
def get_postprocessor(config: PostprocessorConfig) -> PostprocessFunction:
    """Resolve the callable described by a postprocessor configuration.

    Args:
        config (PostprocessorConfig): The postprocessor configuration. For
            ``builtin`` types, ``config.process`` names an entry in
            ``BUILTIN_POSTPROCESSORS``; for ``external`` types it is a dotted
            import path passed to ``import_by_path``.

    Returns:
        PostprocessFunction: The postprocessing callable.

    Raises:
        KeyError: If a builtin postprocessor name is not registered.
        ValueError: If ``config.type`` is neither builtin nor external.
        Errors raised by ``import_by_path`` propagate for external types.
    """
    if config.type == PostprocessorType.builtin:
        try:
            return BUILTIN_POSTPROCESSORS[config.process]
        except KeyError:
            # A bare KeyError('name') gives no hint about valid choices.
            available = ", ".join(sorted(BUILTIN_POSTPROCESSORS))
            raise KeyError(
                f"Unknown builtin postprocessor {config.process!r}; "
                f"available: {available}"
            ) from None
    if config.type == PostprocessorType.external:
        return import_by_path(config.process)
    raise ValueError(f"Couldn't handle {config}")
def apply_postprocessing(
    items: Iterator[dict], postprocessors: List[PostprocessorConfig]
) -> Iterator[dict]:
    """Wrapper to correctly handle errors in postprocessing.

    Postprocessors are resolved once, when iteration starts. They are then
    applied in list order to every item. Each is called as
    ``f(item, **config.kwargs)``, and its return value becomes the input of
    the next one. If a postprocessor raises, the error and traceback are
    logged together with the item as it was before the failing step, and
    that item is skipped. Remaining items are still processed.

    Args:
        items (Iterator[dict]): Items to apply postprocessing to.
        postprocessors (List[PostprocessorConfig]): List of postprocess
            configurations, applied in order.

    Yields:
        dict: Items that passed through every postprocessor successfully.

    Raises:
        Errors from ``get_postprocessor`` (unknown builtin, failed import,
        unsupported type) propagate on the first ``next()`` call. They
        indicate a broken configuration rather than a bad item.
    """
    # Resolve once instead of per item: external postprocessors would
    # otherwise go through import_by_path for every single item.
    resolved = [(config, get_postprocessor(config)) for config in postprocessors]
    for item in items:
        for postprocessor_config, postprocessor_f in resolved:
            try:
                item = postprocessor_f(item, **postprocessor_config.kwargs)
            except Exception:
                # Deliberate best-effort boundary: one bad item must not stop
                # the harvest. A single record keeps context and traceback
                # together.
                LOGGER.exception(
                    "Failed to apply postprocessor, skipping",
                    item=item,
                    postprocessor=postprocessor_config.process,
                )
                break
        else:
            yield item