Source code for harvester.harvester

from typing import Dict

from structlog import get_logger
from vs_common.model import FilesystemConfig

from .model import HarvesterAppConfig, HarvesterConfig
from .resource import Resource
from .endpoint import get_endpoint
from .filescheme import get_filescheme
from .exceptions import HarvestError
from .filter import cql_filter
from .postprocess import apply_postprocessing
from .output import get_output_handler


# Shared structlog logger for this module; it is handed the module's __name__.
LOGGER = get_logger(__name__)


def init_resource(
    harvest_config: HarvesterConfig,
    filesystem_config: Dict[str, FilesystemConfig],
) -> Resource:
    """Build the resource object for a harvester configuration.

    The resource section of ``harvest_config`` is first offered to
    ``get_endpoint``. Only when that yields nothing truthy is it offered to
    ``get_filescheme``, together with ``filesystem_config``.

    Args:
        harvest_config: Harvester configuration whose ``resource`` attribute
            describes what to harvest from.
        filesystem_config: Mapping of names to filesystem configurations,
            passed through to ``get_filescheme``.

    Returns:
        The first truthy object produced by ``get_endpoint`` or
        ``get_filescheme``.

    Raises:
        HarvestError: If neither lookup returns a truthy value.
    """
    spec = harvest_config.resource

    resource = get_endpoint(spec)
    if not resource:
        # Fall back to the file scheme lookup only after the endpoint
        # lookup came back empty.
        resource = get_filescheme(spec, filesystem_config)
        if not resource:
            raise HarvestError(f"Resource type {spec.type} not found")
    return resource
def main(config: HarvesterAppConfig, harvester_name: str):
    """Run a single configured harvester from resource to output.

    Steps, in order: look up the harvester configuration by name, build its
    resource via ``init_resource``, call ``harvest()`` on it, apply the
    configured postprocessors (if any), apply the configured filter (if any),
    and hand the result to the output handler.

    Args:
        config: Application configuration; its ``harvesters`` and
            ``filesystems`` attributes are read here, and the whole object is
            passed to ``get_output_handler``.
        harvester_name: Key of the harvester to run in ``config.harvesters``.

    Returns:
        None. When ``harvester_name`` is not present in ``config.harvesters``
        an error is logged and the function returns without harvesting.
    """
    LOGGER.info("Running harvesting...", name=harvester_name)

    try:
        job = config.harvesters[harvester_name]
    except KeyError:
        # An unknown harvester name is reported through the log, not raised.
        LOGGER.error("Harvester not configured", name=harvester_name)
        return

    # Build the resource and harvest from it.
    harvested = init_resource(job, config.filesystems).harvest()

    # Optional postprocessing step.
    postprocessors = job.postprocessors
    if postprocessors:
        harvested = apply_postprocessing(harvested, postprocessors)

    # Optional filter step; the filter config supplies both the expression
    # and the context it is evaluated with.
    filter_config = job.filter
    if filter_config:
        context = filter_config.context
        expression = filter_config.expression
        LOGGER.debug("Applying filter", filter=expression, context=context)
        harvested = cql_filter(expression, harvested, context)

    # Push whatever remains to the configured output.
    get_output_handler(config, harvester_name).handle(harvested)
    LOGGER.info("Finished harvesting")