Source code for harvester.filescheme.filematcher

import re
from os.path import join, dirname
from typing import Iterator, Dict

import pystac
from dateutil.parser import isoparse
from structlog import get_logger
from vs_common.filesystem import FilesystemConfig

from ..resource import FileScheme
from ..model import ResourceConfig


LOGGER = get_logger(__name__)


class FileMatcherScheme(FileScheme):
    """File scheme that builds STAC items by regex-matching file paths.

    The tree below ``root_path`` is walked and every file path is tested
    against the patterns in ``asset_regex_map``. Once one path has been
    collected for every asset name, a ``pystac.Item`` is created from them
    and yielded as a dictionary.
    """

    def __init__(
        self,
        filesystem_config: FilesystemConfig,
        root_path: str,
        asset_regex_map: Dict[str, str],
        id_regex: str,
        datetime_regex: str,
    ):
        """Store the matching configuration.

        Args:
            filesystem_config: Forwarded unchanged to ``FileScheme.__init__``.
            root_path: Forwarded unchanged to ``FileScheme.__init__``; also
                the starting point of the walk in :meth:`harvest`.
            asset_regex_map: Maps an asset name to the regular expression
                that identifies the file belonging to that asset.
            id_regex: Regular expression whose full match on a file path is
                used as the item identifier.
            datetime_regex: Regular expression whose full match on a file
                path is parsed as the item datetime.
        """
        super().__init__(filesystem_config, root_path)
        self.asset_regex_map = asset_regex_map
        self.id_regex = id_regex
        self.datetime_regex = datetime_regex

    def harvest(self) -> Iterator[dict]:
        """Walk the filesystem and yield one item dictionary per complete match.

        Yields:
            The ``to_dict()`` form of every item for which all asset names in
            ``asset_regex_map`` were matched.

        Raises:
            ValueError: Propagated from :meth:`_create_item` when the id or
                datetime pattern does not match.
        """
        LOGGER.info("Starting FileMatcher harvesting...")
        collected: Dict[str, str] = {}
        # presumably ``self.filesystem`` is set up by FileScheme.__init__ and
        # its ``walk`` yields os.walk-style (root, dirs, files) triples
        # -- TODO confirm against FileScheme.
        for root, _, files in self.filesystem.walk(self.root_path):
            for filename in files:
                candidate = join(root, filename)

                # A path is assigned to the first asset whose pattern matches.
                for asset_name, pattern in self.asset_regex_map.items():
                    if re.search(pattern, candidate):
                        collected[asset_name] = candidate
                        last_match_dir = dirname(candidate)
                        break

                # NOTE(review): the nesting of the checks below was
                # reconstructed from a flattened listing; confirm that both
                # belong inside the per-file loop.
                if not collected:
                    continue

                if len(collected) == len(self.asset_regex_map):
                    yield self._create_item(collected).to_dict()
                    collected = {}
                elif last_match_dir not in root:
                    # ``in`` is a substring test on the path strings: the
                    # warning fires once the walk has reached a directory
                    # whose path does not contain the directory of the most
                    # recent match.
                    # NOTE(review): the partial set is not cleared here, so
                    # it is carried into later directories and the warning
                    # can repeat -- confirm this is intended.
                    missing = set(self.asset_regex_map) - set(collected)
                    LOGGER.warn("Couldn't match all assets", path=root, missing=missing)

    def _create_item(self, assets: Dict[str, str]) -> pystac.Item:
        """Create a STAC item from a complete set of matched asset paths.

        The identifier and datetime are both extracted from the first path
        in ``assets``.

        Args:
            assets: Maps asset name to the matched file path.

        Returns:
            An item without geometry, bbox or properties, carrying one asset
            per entry of ``assets``.

        Raises:
            ValueError: If ``datetime_regex`` or ``id_regex`` does not match
                the first asset path (the datetime is checked first).
        """
        match_asset = list(assets.values())[0]
        identifier_match = re.search(self.id_regex, match_asset)
        dt_match = re.search(self.datetime_regex, match_asset)

        if dt_match is None:
            raise ValueError(
                f"Couldn't match datetime {self.datetime_regex}, {match_asset}"
            )
        if identifier_match is None:
            raise ValueError(
                f"Could not match identifier {self.id_regex}, {match_asset}"
            )

        # group(0) is the full text matched by each pattern.
        item = pystac.Item(
            id=identifier_match.group(0),
            geometry=None,
            bbox=None,
            datetime=isoparse(dt_match.group(0)),
            properties={},
        )
        for key, href in assets.items():
            item.add_asset(key, pystac.Asset(href))
        return item
def create_filematcherscheme(
    resource_config: ResourceConfig, filesystem_configs: Dict[str, FilesystemConfig]
) -> FileScheme:
    """Build a :class:`FileMatcherScheme` from a resource configuration.

    Args:
        resource_config: Configuration whose ``filematcher`` attribute holds
            the ``filesystem``, ``root_path``, ``asset_regex_map``,
            ``id_regex`` and ``datetime_regex`` settings.
        filesystem_configs: Available filesystem configurations, keyed by the
            name referenced in ``resource_config.filematcher.filesystem``.

    Returns:
        The configured file matcher scheme.

    Raises:
        TypeError: If ``resource_config.filematcher`` is missing (falsy).
        KeyError: If the referenced filesystem name is not a key of
            ``filesystem_configs``.
    """
    filematcher_config = resource_config.filematcher
    if not filematcher_config:
        # The message names the class that is actually passed in
        # (ResourceConfig), so the error points at the right setting.
        raise TypeError("ResourceConfig.filematcher cannot be None")

    filesystem_config = filesystem_configs[filematcher_config.filesystem]

    return FileMatcherScheme(
        filesystem_config=filesystem_config,
        root_path=filematcher_config.root_path,
        asset_regex_map=filematcher_config.asset_regex_map,
        id_regex=filematcher_config.id_regex,
        datetime_regex=filematcher_config.datetime_regex,
    )