from os.path import join
from typing import Dict, Iterator, Optional
from structlog import get_logger
from vs_common.filesystem import (
FilesystemConfig,
authentication_environment,
)
from vs_common.stac import Catalog
from ..resource import FileScheme
from ..model import ResourceConfig
LOGGER = get_logger(__name__)
class STACCatalogScheme(FileScheme):
    """FileScheme for STAC catalogs.

    Recurses into sub-catalogs and harvests all items it finds along the way.

    Args:
        filesystem_config (FilesystemConfig): configuration of the filesystem
            to search
        root_path (str): directory that contains the root ``catalog.json``
        collection_id (str, optional): when set, only items of the collection
            with this id are harvested. Defaults to None (harvest every item
            of the catalog).
        deduplicate (bool, optional): Whether to deduplicate. Defaults to
            False. The flag is only stored on the instance here; ``harvest``
            itself does not read it.
    """

    def __init__(
        self,
        filesystem_config: FilesystemConfig,
        root_path: str,
        collection_id: Optional[str] = None,
        deduplicate: bool = False,
    ):
        super().__init__(filesystem_config, root_path)
        self.collection_id = collection_id
        self.deduplicate = deduplicate

    def harvest(self) -> Iterator[dict]:
        """Yield every STAC item of the catalog as a dictionary.

        The catalog is read from ``<root_path>/catalog.json``. If
        ``collection_id`` is set (truthy), only the items of the first
        collection with that id are yielded; otherwise all items of the
        catalog are yielded.

        Yields:
            dict: the ``to_dict()`` representation of each item

        Raises:
            RuntimeError: if ``collection_id`` is set but the catalog has no
                collection with that id
        """
        # The whole iteration happens inside the authentication environment,
        # so it stays active while the consumer pulls items from the generator.
        with authentication_environment(self.filesystem_config):
            LOGGER.info("Starting STACCatalog harvesting...")
            catalog = Catalog.from_file(join(self.root_path, "catalog.json"))

            if not self.collection_id:
                for item in catalog.get_all_items():
                    yield item.to_dict()
                return

            # Pass a default to next(): an exhausted next() inside a generator
            # would otherwise leak StopIteration, which Python turns into an
            # uninformative "generator raised StopIteration" RuntimeError.
            collection = next(
                (
                    candidate
                    for candidate in catalog.get_all_collections()
                    if candidate.id == self.collection_id
                ),
                None,
            )
            if collection is None:
                # RuntimeError is kept on purpose: it is the type the missing
                # collection surfaced as before, so existing handlers still
                # match, but the message now says what went wrong.
                raise RuntimeError(
                    f"Collection {self.collection_id!r} not found in STAC "
                    f"catalog at {self.root_path!r}"
                )

            for item in collection.get_all_items():
                yield item.to_dict()
def create_staccatalogscheme(
    resource_config: ResourceConfig, filesystem_configs: Dict[str, FilesystemConfig]
) -> FileScheme:
    """Build a STACCatalogScheme from a resource configuration.

    Args:
        resource_config (ResourceConfig): configuration whose ``staccatalog``
            section supplies ``filesystem``, ``root_path``, ``collection_id``
            and ``deduplicate``
        filesystem_configs (Dict[str, FilesystemConfig]): available filesystem
            configurations, looked up by the ``filesystem`` value of the
            ``staccatalog`` section

    Returns:
        FileScheme: the configured STACCatalogScheme

    Raises:
        TypeError: if ``resource_config.staccatalog`` is not set
        KeyError: if the referenced filesystem is not in ``filesystem_configs``
    """
    staccatalog_config = resource_config.staccatalog
    if not staccatalog_config:
        # The inspected object is a ResourceConfig, so name it in the message.
        raise TypeError("ResourceConfig.staccatalog cannot be None")

    filesystem_config = filesystem_configs[staccatalog_config.filesystem]
    return STACCatalogScheme(
        filesystem_config=filesystem_config,
        root_path=staccatalog_config.root_path,
        collection_id=staccatalog_config.collection_id,
        deduplicate=staccatalog_config.deduplicate,
    )