import copy
import importlib
from typing import Callable, Dict, Iterator, List

from structlog import get_logger

from harvester.model import PostprocessorConfig, PostprocessorType
# Type of every postprocessor: called with an item (plus the config's kwargs)
# and returning a dict.
PostprocessFunction = Callable[..., Dict]
# Module-level structured logger.
LOGGER = get_logger(__name__)
def import_by_path(path: str) -> PostprocessFunction:
    """Imports the object from the referenced module.

    Args:
        path (str): the dotted Python path, where the last element is the
            object in the referenced module (e.g. ``"package.module.func"``).

    Returns:
        PostprocessFunction: the attribute named by the last path element,
        looked up on the imported module.

    Raises:
        ValueError: if ``path`` has no module part or no object name.
        ImportError: if the module cannot be imported.
        AttributeError: if the module has no attribute with that name.
    """
    module_path, _, object_name = path.rpartition(".")
    # rpartition yields an empty module part for "func" / ".func" and an empty
    # object name for "module."; reject both with a message that names the
    # offending path instead of importlib's bare "Empty module name".
    if not module_path or not object_name:
        raise ValueError(
            f"Invalid import path {path!r}: expected '<module>.<object>'"
        )
    return getattr(importlib.import_module(module_path), object_name)
def merge(a: Dict, b: Dict, path=None):
    """Recursively merge ``b`` into ``a`` in place and return ``a``.

    Keys whose values are dicts on both sides are merged key by key. For any
    other key present in both, ``b``'s value replaces ``a``'s unless the two
    compare equal, in which case ``a``'s value is kept. Values taken from
    ``b`` are deep-copied, so ``a`` never shares mutable objects with ``b``
    and ``b`` is never modified.

    Args:
        a (Dict): destination mapping; modified in place.
        b (Dict): source mapping; left unmodified.
        path (list, optional): keys leading to the current nesting level,
            extended on each recursive call. Defaults to the top level.

    Returns:
        Dict: ``a``, after the merge.
    """
    if path is None:
        path = []
    for key, value in b.items():
        if key not in a:
            # Copy so that mutating ``a`` later (e.g. by another postprocessor
            # on the same item) cannot leak back into ``b``, which may be
            # config values reused for every item.
            a[key] = copy.deepcopy(value)
        elif isinstance(a[key], dict) and isinstance(value, dict):
            merge(a[key], value, path + [str(key)])
        elif a[key] != value:
            # Override the existing leaf value; equal leaves keep a's object.
            a[key] = copy.deepcopy(value)
    return a
def process_static(item: Dict, values: Dict) -> Dict:
    """Builtin ``static`` postprocessor: merge fixed ``values`` into ``item``.

    Delegates to ``merge``, so ``item`` itself is updated and returned.

    Args:
        item (Dict): the item being postprocessed.
        values (Dict): the values to merge into the item.

    Returns:
        Dict: ``item`` with ``values`` merged in.
    """
    merged_item = merge(item, values)
    return merged_item
# Postprocessors selectable by name when a config's type is builtin.
BUILTIN_POSTPROCESSORS: Dict[str, PostprocessFunction] = {
    "static": process_static,
}
def get_postprocessor(config: PostprocessorConfig) -> PostprocessFunction:
    """Resolve a postprocessor configuration to the callable it names.

    Builtin postprocessors are looked up by name in ``BUILTIN_POSTPROCESSORS``;
    external ones are imported from the dotted path in ``config.process``.

    Args:
        config (PostprocessorConfig): the postprocessor configuration.

    Returns:
        PostprocessFunction: the resolved postprocessing function.

    Raises:
        KeyError: if a builtin name is not registered.
        ValueError: if ``config.type`` is neither builtin nor external.
        Any error raised by ``import_by_path`` for an external path.
    """
    kind = config.type
    if kind == PostprocessorType.builtin:
        return BUILTIN_POSTPROCESSORS[config.process]
    elif kind == PostprocessorType.external:
        return import_by_path(config.process)
    else:
        raise ValueError(f"Couldn't handle {config}")
def apply_postprocessing(
    items: Iterator[dict], postprocessors: List[PostprocessorConfig]
) -> Iterator[dict]:
    """Wrapper to correctly handle errors in postprocessing.

    Each item is passed through every configured postprocessor in order. If
    any postprocessor raises, the error is logged and that item is dropped;
    the remaining items are still processed. Postprocessor configurations are
    resolved once, when iteration starts (even if ``items`` is empty).

    Args:
        items (Iterator[dict]): Items to apply postprocessing to
        postprocessors (List[PostprocessorConfig]): List of postprocess
            configurations, applied in order; each one's ``kwargs`` are passed
            to its function as keyword arguments alongside the item.

    Yields:
        dict: Items with all postprocessing applied to them

    Raises:
        Exceptions from resolving a configuration via ``get_postprocessor``
        are not caught and propagate to the caller.
    """
    # Resolve once instead of once per item: avoids repeating the lookup /
    # import for every item, and a one-shot iterable of configs is no longer
    # exhausted by the first item (which left later items unprocessed).
    resolved = [
        (config, get_postprocessor(config)) for config in postprocessors
    ]
    for item in items:
        for config, postprocessor_f in resolved:
            try:
                item = postprocessor_f(item, **config.kwargs)
            except Exception:
                # Best effort: drop this item but keep the pipeline running.
                # One event carries the item, the postprocessor and traceback.
                LOGGER.exception(
                    "Failed to apply postprocessor, skipping",
                    item=item,
                    postprocessor=config.process,
                )
                break
        else:
            # Only reached when no postprocessor failed (no ``break``).
            yield item