# Source code for harvester.filescheme.stac_catalog

from os.path import join
from typing import Dict, Iterator, Optional

from structlog import get_logger
from vs_common.filesystem import (
    FilesystemConfig,
    authentication_environment,
)
from vs_common.stac import Catalog


from ..resource import FileScheme
from ..model import ResourceConfig


# Module-level structlog logger, named after this module.
LOGGER = get_logger(__name__)


class STACCatalogScheme(FileScheme):
    """FileScheme for STAC catalogs.

    Recurses into sub-catalogs and harvests all items it finds along the way.

    Args:
        filesystem_config (FilesystemConfig): filesystem configuration used to
            set up the authentication environment while harvesting
        root_path (str): path that is joined with ``catalog.json`` to locate
            the root catalog
        collection_id (str, optional): if set, only the items of the
            collection with this id are harvested. Defaults to None, which
            harvests every item in the catalog.
        deduplicate (bool, optional): Whether to deduplicate. Defaults to
            False. Only stored on the instance here; ``harvest`` does not
            consult it.
    """

    def __init__(
        self,
        filesystem_config: FilesystemConfig,
        root_path: str,
        collection_id: Optional[str] = None,
        deduplicate: bool = False,
    ):
        super().__init__(filesystem_config, root_path)
        self.collection_id = collection_id
        self.deduplicate = deduplicate

    def harvest(self) -> Iterator[dict]:
        """Yield every harvested STAC item as a dictionary.

        The items are produced lazily, and the authentication environment
        stays active for as long as the returned iterator is being consumed.

        Yields:
            dict: the ``to_dict()`` representation of each item

        Raises:
            ValueError: if ``collection_id`` is set but the catalog contains
                no collection with that id
        """
        with authentication_environment(self.filesystem_config):
            LOGGER.info("Starting STACCatalog harvesting...")
            catalog_path = join(self.root_path, "catalog.json")
            catalog = Catalog.from_file(catalog_path)

            if self.collection_id:
                # A bare ``next()`` must not be used here: inside a generator
                # an escaping StopIteration is converted to an opaque
                # ``RuntimeError`` (PEP 479), hiding the real problem.
                source = next(
                    (
                        collection
                        for collection in catalog.get_all_collections()
                        if collection.id == self.collection_id
                    ),
                    None,
                )
                if source is None:
                    raise ValueError(
                        f"Collection {self.collection_id!r} not found in "
                        f"catalog {catalog_path!r}"
                    )
            else:
                source = catalog

            yield from (item.to_dict() for item in source.get_all_items())
def create_staccatalogscheme(
    resource_config: ResourceConfig,
    filesystem_configs: Dict[str, FilesystemConfig],
) -> FileScheme:
    """Build a STACCatalogScheme from a resource configuration.

    Args:
        resource_config (ResourceConfig): configuration whose ``staccatalog``
            section supplies ``filesystem``, ``root_path``, ``collection_id``
            and ``deduplicate``
        filesystem_configs (Dict[str, FilesystemConfig]): available filesystem
            configurations, keyed by the name referenced in
            ``staccatalog.filesystem``

    Returns:
        FileScheme: the configured STACCatalogScheme

    Raises:
        TypeError: if ``resource_config.staccatalog`` is not set
        KeyError: if the referenced filesystem is not in
            ``filesystem_configs``
    """
    staccatalog_config = resource_config.staccatalog
    if not staccatalog_config:
        # The checked object is a ResourceConfig, so name it as such.
        raise TypeError("ResourceConfig.staccatalog cannot be None")

    filesystem_config = filesystem_configs[staccatalog_config.filesystem]
    return STACCatalogScheme(
        filesystem_config=filesystem_config,
        root_path=staccatalog_config.root_path,
        collection_id=staccatalog_config.collection_id,
        deduplicate=staccatalog_config.deduplicate,
    )