from typing import Optional, Callable, Dict
from fsspec import AbstractFileSystem
from vs_common.filesystem import FilesystemConfig
from .filematcher import FileMatcherScheme, create_filematcherscheme
from .stac_catalog import STACCatalogScheme, create_staccatalogscheme
from ..resource import FileScheme
from ..model import ResourceConfig, ResourceType
# Signature shared by the factory callables stored in SCHEME_MAP.
# NOTE(review): get_filescheme passes a Dict[str, FilesystemConfig] as the
# second argument, while this alias declares AbstractFileSystem -- confirm
# which one matches the real factory signatures.
FileSchemeCreationFunction = Callable[
    [ResourceConfig, AbstractFileSystem],
    FileScheme,
]

# Registry that maps each supported resource type to the factory that
# builds its file scheme.
SCHEME_MAP: Dict[ResourceType, FileSchemeCreationFunction] = {
    ResourceType.FileMatcher: create_filematcherscheme,
    ResourceType.STACCatalog: create_staccatalogscheme,
}
def get_filescheme(
    resource_config: ResourceConfig, filesystem_configs: Dict[str, FilesystemConfig]
) -> Optional[FileScheme]:
    """Create the file scheme registered for a resource's type.

    Looks up ``resource_config.type`` in ``SCHEME_MAP`` and, when a factory
    is registered for it, calls that factory with both arguments.

    Args:
        resource_config (ResourceConfig): Resource configuration; its
            ``type`` attribute selects the factory.
        filesystem_configs (Dict[str, FilesystemConfig]): Filesystem
            configurations, passed through unchanged to the factory.

    Returns:
        Optional[FileScheme]: The file scheme built by the matching factory,
        or ``None`` when no factory is registered for the resource type.
    """
    factory = SCHEME_MAP.get(resource_config.type)
    if factory is None:
        # Unknown resource type: report "no scheme" instead of raising.
        return None
    return factory(resource_config, filesystem_configs)
# Public API of this package: the scheme classes, their factory functions,
# and the type-based lookup helper.
__all__ = [
    "FileMatcherScheme",
    "create_filematcherscheme",
    "STACCatalogScheme",
    "create_staccatalogscheme",
    "get_filescheme",
]