import re
from os.path import join, dirname
from typing import Iterator, Dict
import pystac
from dateutil.parser import isoparse
from structlog import get_logger
from vs_common.filesystem import FilesystemConfig
from ..resource import FileScheme
from ..model import ResourceConfig
# Module-level structlog logger, named after this module.
LOGGER = get_logger(__name__)
class FileMatcherScheme(FileScheme):
    """Harvest STAC items by matching file paths against per-asset regexes.

    The filesystem below ``root_path`` is walked. Every file whose full path
    matches one of the regexes in ``asset_regex_map`` is recorded under that
    asset name. Once every asset name has a file, a ``pystac.Item`` is built
    from the collected paths and yielded as a dict.
    """

    def __init__(
        self,
        filesystem_config: FilesystemConfig,
        root_path: str,
        asset_regex_map: Dict[str, str],
        id_regex: str,
        datetime_regex: str,
    ):
        """
        :param filesystem_config: forwarded to ``FileScheme.__init__``.
        :param root_path: directory the walk starts from (also forwarded).
        :param asset_regex_map: asset name -> regex searched in each file's
            full path.
        :param id_regex: regex whose first match in an asset path becomes the
            item id.
        :param datetime_regex: regex whose first match in an asset path is
            parsed with ``isoparse`` as the item datetime.
        """
        super().__init__(filesystem_config, root_path)
        self.asset_regex_map = asset_regex_map
        self.id_regex = id_regex
        self.datetime_regex = datetime_regex

    def harvest(self) -> Iterator[dict]:
        """Walk ``root_path`` and yield one STAC item dict per complete asset set.

        Each file is tested against the asset regexes in ``asset_regex_map``
        order. The first matching asset name claims the file. As soon as every
        asset name has a file, an item is built and yielded, and collection
        starts over.

        An incomplete set is discarded, with a warning, once the walk enters a
        directory outside the directory of the most recently matched file, or
        when the walk ends. This prevents assets of different products from
        being combined into one item.
        NOTE(review): this assumes ``self.filesystem.walk`` yields directories
        top-down like ``os.walk``, so a directory's subtree is visited
        contiguously. TODO confirm.

        :raises ValueError: if the id/datetime regex does not match, or the
            datetime cannot be parsed (see ``_create_item``).
        """
        LOGGER.info("Starting FileMatcher harvesting...")
        # Compile once per harvest instead of once per file/asset pair.
        patterns = [
            (asset, re.compile(regex))
            for asset, regex in self.asset_regex_map.items()
        ]
        found_assets: Dict[str, str] = {}
        # Directory of the most recently matched file. A partial set is kept
        # only while the walk stays inside this directory's subtree.
        base_path = ""

        for root, _, files in self.filesystem.walk(self.root_path):
            if found_assets and not self._is_within(root, base_path):
                # The walk has left the subtree holding the partial set. It
                # can no longer be completed, so report it and drop it rather
                # than mixing it with another product's assets.
                self._warn_incomplete(found_assets, base_path)
                found_assets = {}

            for file in files:
                full_path = join(root, file)
                for asset, pattern in patterns:
                    if pattern.search(full_path):
                        found_assets[asset] = full_path
                        base_path = dirname(full_path)
                        break

                # The `found_assets` guard prevents an empty asset map from
                # producing empty items.
                if found_assets and len(found_assets) == len(patterns):
                    yield self._create_item(found_assets).to_dict()
                    found_assets = {}

        if found_assets:
            # A partial set left over at the end of the walk would otherwise
            # vanish silently.
            self._warn_incomplete(found_assets, base_path)

    @staticmethod
    def _is_within(path: str, directory: str) -> bool:
        """Return True if *path* is *directory* itself or lies below it.

        Whole path components are compared, so ``/data/ab`` is not within
        ``/data/a`` (a plain substring test would say it is).
        """
        # join(x, "") appends a trailing separator only if one is missing,
        # so both sides are normalised before the prefix test.
        return join(path, "").startswith(join(directory, ""))

    def _warn_incomplete(self, found_assets: Dict[str, str], path: str) -> None:
        """Log which asset names are still missing from an incomplete set."""
        missing = set(self.asset_regex_map).difference(found_assets)
        LOGGER.warning("Couldn't match all assets", path=path, missing=missing)

    def _create_item(self, assets: Dict[str, str]) -> pystac.Item:
        """Build a geometry-less ``pystac.Item`` from collected asset paths.

        The id and datetime are taken from the first entry of *assets*, in
        dict insertion order. Every entry is then attached as a
        ``pystac.Asset`` under its asset name.

        :param assets: asset name -> file path; expected to be non-empty.
        :returns: the new item, with ``geometry``/``bbox`` set to None and
            empty properties.
        :raises ValueError: if ``datetime_regex`` or ``id_regex`` does not
            match the first path, or if ``isoparse`` rejects the matched
            datetime text.
        """
        match_asset = list(assets.values())[0]

        dt_match = re.search(self.datetime_regex, match_asset)
        if not dt_match:
            raise ValueError(
                f"Couldn't match datetime {self.datetime_regex}, {match_asset}"
            )

        identifier_match = re.search(self.id_regex, match_asset)
        if not identifier_match:
            raise ValueError(
                f"Could not match identifier {self.id_regex}, {match_asset}"
            )

        item = pystac.Item(
            id=identifier_match[0],
            geometry=None,
            bbox=None,
            datetime=isoparse(dt_match[0]),
            properties={},
        )
        for name, path in assets.items():
            item.add_asset(name, pystac.Asset(path))
        return item
def create_filematcherscheme(
    resource_config: ResourceConfig, filesystem_configs: Dict[str, FilesystemConfig]
) -> FileScheme:
    """Build a ``FileMatcherScheme`` from a resource's ``filematcher`` section.

    :param resource_config: resource configuration whose ``filematcher``
        attribute supplies ``filesystem``, ``root_path``, ``asset_regex_map``,
        ``id_regex`` and ``datetime_regex``.
    :param filesystem_configs: filesystem configurations keyed by name.
        ``filematcher.filesystem`` selects the one to use.
    :returns: a configured ``FileMatcherScheme``.
    :raises TypeError: if ``resource_config.filematcher`` is falsy (e.g. None).
    :raises KeyError: if ``filematcher.filesystem`` is not a key of
        *filesystem_configs*.
    """
    filematcher_config = resource_config.filematcher
    if not filematcher_config:
        raise TypeError("ResourceConfig.filematcher cannot be None")

    filesystem_config = filesystem_configs[filematcher_config.filesystem]
    return FileMatcherScheme(
        filesystem_config=filesystem_config,
        root_path=filematcher_config.root_path,
        asset_regex_map=filematcher_config.asset_regex_map,
        id_regex=filematcher_config.id_regex,
        datetime_regex=filematcher_config.datetime_regex,
    )