Source code for harvester.filescheme

from typing import Optional, Callable, Dict

from fsspec import AbstractFileSystem
from vs_common.filesystem import FilesystemConfig

from .filematcher import FileMatcherScheme, create_filematcherscheme
from .stac_catalog import STACCatalogScheme, create_staccatalogscheme
from ..resource import FileScheme
from ..model import ResourceConfig, ResourceType

FileSchemeCreationFunction = Callable[[ResourceConfig, AbstractFileSystem], FileScheme]


SCHEME_MAP: Dict[ResourceType, FileSchemeCreationFunction] = {
    ResourceType.FileMatcher: create_filematcherscheme,
    ResourceType.STACCatalog: create_staccatalogscheme,
}


[docs]def get_filescheme( resource_config: ResourceConfig, filesystem_configs: Dict[str, FilesystemConfig] ) -> Optional[FileScheme]: """Retrieves filescheme from mapping if found Args: resource_config (ResourceConfig): Resource configuration Returns: Optional[FileScheme]: Initialized endpoint from SCHEME_MAP """ scheme_f = SCHEME_MAP.get(resource_config.type) if not scheme_f: return None return scheme_f(resource_config, filesystem_configs)
__all__ = [ "FileMatcherScheme", "create_filematcherscheme", "STACCatalogScheme", "create_staccatalogscheme", "get_filescheme", ]