harvester package

Subpackages

Submodules

harvester.app module

app.py

Contains functionality for running the application as a daemon that waits for messages on Redis.

class harvester.app.App(config: HarvesterAppConfig, client: Redis, listen_queue: str)[source]

Bases: object

exit_gracefully(signum, frame)[source]
run() None[source]

Run the harvester daemon, listening on a Redis queue for harvest jobs.

harvester.cli module

cli.py

Contains the command-line interface.

harvester.cli.setup_logging(debug=False)[source]

harvester.exceptions module

exception harvester.exceptions.HarvestError[source]

Bases: Exception

exception harvester.exceptions.QueryError[source]

Bases: Exception

harvester.filter module

harvester.filter.cql_filter(_filter: Dict, data: Iterator[Dict], context: Dict | None = None) Iterator[Dict][source]

harvester.harvester module

harvester.harvester.init_resource(harvest_config: HarvesterConfig, filesystem_config: Dict[str, FilesystemConfig]) Resource[source]
harvester.harvester.main(config: HarvesterAppConfig, harvester_name: str)[source]

harvester.model module

class harvester.model.ATOMXMLConfig[source]

Bases: object

class harvester.model.FileMatcherConfig(root_path: str, filesystem: str, asset_regex_map: Dict[str, str], id_regex: str, datetime_regex: str)[source]

Bases: object

asset_regex_map: Dict[str, str]
datetime_regex: str
filesystem: str
id_regex: str
root_path: str
class harvester.model.FilterConfig(context: Dict = <factory>, expression: Dict = <factory>)[source]

Bases: object

context: Dict
expression: Dict
class harvester.model.FormatConfig(type: str, json: harvester.model.JSONConfig | NoneType = None, atom_xml: harvester.model.ATOMXMLConfig | NoneType = None)[source]

Bases: object

atom_xml: ATOMXMLConfig | None = None
json: JSONConfig | None = None
type: str
class harvester.model.HarvesterAppConfig(harvesters: Dict[str, harvester.model.HarvesterConfig] = <factory>, redis: harvester.model.RedisConfig = RedisConfig(host='vs-redis-master', port=6379), filesystems: Dict[str, vs_common.model.FilesystemConfig] = <factory>)[source]

Bases: object

filesystems: Dict[str, FilesystemConfig]
harvesters: Dict[str, HarvesterConfig]
redis: RedisConfig = RedisConfig(host='vs-redis-master', port=6379)
class harvester.model.HarvesterConfig(resource: harvester.model.ResourceConfig, filter: Union[harvester.model.FilterConfig, NoneType] = None, output: harvester.model.OutputType = <OutputType.queue: 'queue'>, queue: Union[str, NoneType] = None, postprocessors: Union[List[harvester.model.PostprocessorConfig], NoneType] = None)[source]

Bases: object

filter: FilterConfig | None = None
output: OutputType = 'queue'
postprocessors: List[PostprocessorConfig] | None = None
queue: str | None = None
resource: ResourceConfig
class harvester.model.JSONConfig(property_mapping: Dict[str, str])[source]

Bases: object

property_mapping: Dict[str, str]
class harvester.model.OADSConfig(url: str, use_oads_ext: bool = False)[source]

Bases: object

url: str
use_oads_ext: bool = False
class harvester.model.OpenSearchConfig(url: str, query: harvester.model.QueryConfig, format: harvester.model.FormatConfig)[source]

Bases: object

format: FormatConfig
query: QueryConfig
url: str
class harvester.model.OutputType(value)[source]

Bases: str, Enum

An enumeration.

console = 'console'
queue = 'queue'
class harvester.model.PostprocessorConfig(type: harvester.model.PostprocessorType, process: str, kwargs: Dict[str, Any] = <factory>)[source]

Bases: object

kwargs: Dict[str, Any]
process: str
type: PostprocessorType
class harvester.model.PostprocessorType(value)[source]

Bases: str, Enum

An enumeration.

builtin = 'builtin'
external = 'external'
class harvester.model.QueryConfig(time: harvester.model.TimeConfig, bbox: str, collection: Union[str, NoneType] = None, extra_params: Dict[str, str] = <factory>)[source]

Bases: object

bbox: str
collection: str | None = None
extra_params: Dict[str, str]
time: TimeConfig
class harvester.model.RedisConfig(host: str = 'vs-redis-master', port: int = 6379)[source]

Bases: object

host: str = 'vs-redis-master'
port: int = 6379
class harvester.model.ResourceConfig(type: harvester.model.ResourceType, stacapi: harvester.model.STACAPIConfig | NoneType = None, staccatalog: harvester.model.STACCatalogConfig | NoneType = None, filematcher: harvester.model.FileMatcherConfig | NoneType = None, oads: harvester.model.OADSConfig | NoneType = None, opensearch: harvester.model.OpenSearchConfig | NoneType = None)[source]

Bases: object

filematcher: FileMatcherConfig | None = None
oads: OADSConfig | None = None
opensearch: OpenSearchConfig | None = None
stacapi: STACAPIConfig | None = None
staccatalog: STACCatalogConfig | None = None
type: ResourceType
class harvester.model.ResourceType(value)[source]

Bases: str, Enum

An enumeration.

FileMatcher = 'filematcher'
OADS = 'oads'
OpenSearch = 'opensearch'
STACAPI = 'stacapi'
STACCatalog = 'staccatalog'
class harvester.model.STACAPIConfig(url: str, query: harvester.model.QueryConfig)[source]

Bases: object

query: QueryConfig
url: str
class harvester.model.STACCatalogConfig(root_path: str, filesystem: str, collection_id: str | NoneType = None, deduplicate: bool = False)[source]

Bases: object

collection_id: str | None = None
deduplicate: bool = False
filesystem: str
root_path: str
class harvester.model.TimeConfig(begin: str = '2023-05-10T11:32:48.827412', end: str = '2023-05-10T11:32:48.827420')[source]

Bases: object

begin: str = '2023-05-10T11:32:48.827412'
end: str = '2023-05-10T11:32:48.827420'
harvester.model.default_filesystem()[source]

harvester.output module

class harvester.output.ConsoleHandler[source]

Bases: object

handle(data: Iterator[Dict]) None[source]
class harvester.output.OutputHandler(*args, **kwargs)[source]

Bases: Protocol

handle(data: Iterator[Dict]) None[source]
class harvester.output.QueueHandler(queue: str, client: Redis)[source]

Bases: object

handle(data: Iterator[Dict]) None[source]
harvester.output.create_console_handler(config: HarvesterAppConfig, harvester_name: str) OutputHandler[source]
harvester.output.create_queue_handler(config: HarvesterAppConfig, harvester_name: str) OutputHandler[source]
harvester.output.get_output_handler(config: HarvesterAppConfig, harvester_name: str) OutputHandler[source]

harvester.postprocess module

harvester.postprocess.apply_postprocessing(items: Iterator[dict], postprocessors: List[PostprocessorConfig]) Iterator[dict][source]

Wrapper to correctly handle errors in postprocessing.

Parameters:
  • items (Iterator[dict]) – Items to apply postprocessing to

  • postprocessors (List[PostprocessorConfig]) – List of postprocess configurations

Yields:

Iterator[dict] – Items with postprocessing applied to them

harvester.postprocess.get_postprocessor(config: PostprocessorConfig) Callable[[...], Dict][source]
harvester.postprocess.import_by_path(path: str) Callable[[...], Dict][source]

Imports the object from the referenced module.

Parameters:
  • path (str) – the dotted Python path, where the last element is the object in the referenced module.

harvester.postprocess.merge(a: Dict, b: Dict, path=None)[source]

Merges ``b`` into ``a``.

harvester.postprocess.process_static(item: Dict, values: Dict) Dict[source]

harvester.resource module

class harvester.resource.Endpoint(url: str)[source]

Bases: Resource

Endpoints are resources that use a search protocol (or something similar) to harvest items. Thus, they are always associated with a specific URL.

class harvester.resource.FileScheme(filesystem_config: FilesystemConfig, root_path: str)[source]

Bases: Resource

FileSchemes are resources that operate on a file basis on a given file source.

class harvester.resource.Resource[source]

Bases: ABC

Represents an online resource, such as an endpoint (API…) or a data source (S3/Swift…), that provides data or metadata.

abstract harvest() Iterator[dict][source]

Starts the harvesting of the resource, returning an iterator of the harvested items.

Module contents