Source code for vs_common.stac

"""
stac.py
==============
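This module contains helpers for loading, building and JSON-encoding STAC
items, an fsspec-backed ``StacIO`` implementation, and get-or-create helpers
for STAC catalogs and collections.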
"""

import json
from typing import Dict, Optional, Any
from datetime import datetime

import fsspec
from pystac import (
    Item,
    Asset,
    Catalog,
    StacIO,
    CatalogType,
    Collection,
    Extent,
    SpatialExtent,
    TemporalExtent,
)
from pystac.link import HREF


from .model import CatalogConfig


def get_item(json_item_or_path: str) -> Optional[Item]:
    """Resolve a STAC item from a JSON document or from a file path.

    The argument is parsed as JSON first. Only when it is not valid JSON is
    it treated as a path and handed to ``Item.from_file``.

    Args:
        json_item_or_path (str): JSON encoded STAC item, or path to a file
            containing one

    Returns:
        Optional[Item]: The resolved item, or ``None`` when the argument is
            not valid JSON and reading it as a file raises ``OSError``
    """
    try:
        return Item.from_dict(json.loads(json_item_or_path))
    except json.decoder.JSONDecodeError:
        # Not a JSON document: fall back to interpreting it as a path.
        try:
            return Item.from_file(json_item_or_path)
        except (FileNotFoundError, OSError):
            return None
def create_item(
    old_item: Item,
    metadata: Dict,
    assets: Dict[str, Asset],
    overwrite_metadata: bool = True,
) -> Item:
    """Create a new item from an existing item

    Args:
        old_item (Item): Existing item whose id, collection, extensions and
            properties are carried over
        metadata (Dict): Metadata to add to properties. It is also the
            fallback source for ``geometry``, ``bbox``, ``datetime`` and
            ``collection`` when the old item has no truthy value for them
        assets (Dict[str, Asset]): Assets to plug in, keyed by asset key
        overwrite_metadata (bool): When True, geometry, bbox and datetime are
            taken from the old item, falling back to ``metadata``. When
            False, all three are set to None. Defaults to True.

    Returns:
        Item: A new item ready for upload

    Raises:
        KeyError: If a value is missing on the old item and ``metadata`` has
            no corresponding key
    """
    if overwrite_metadata:
        geometry = old_item.geometry or metadata["geometry"]
        bbox = old_item.bbox or metadata["bbox"]
        item_datetime = old_item.datetime or metadata["datetime"]
    else:
        # NOTE(review): pystac may reject datetime=None unless the properties
        # carry start_datetime/end_datetime - confirm callers provide them.
        geometry = None
        bbox = None
        item_datetime = None

    new_item = Item(
        id=old_item.id,
        geometry=geometry,
        bbox=bbox,
        datetime=item_datetime,
        collection=old_item.collection_id or metadata["collection"],
        # Copy the list so the new item does not share (and later mutate)
        # the old item's extension list.
        stac_extensions=list(old_item.stac_extensions or []),
        # Keys already present on the old item win over ``metadata``.
        properties={**metadata, **old_item.properties},
    )

    for key, asset in assets.items():
        new_item.add_asset(key, asset)

    return new_item
def encode_item(output: Item) -> str:
    """Serialize a STAC item to a JSON string.

    Args:
        output (Item): Item to encode

    Returns:
        str: JSON document built from ``output.to_dict()``
    """
    item_dict = output.to_dict()
    return json.dumps(item_dict)
class FSSpecStacIO(StacIO):
    """
    Extension of StacIO to allow working with different filesystems using
    fsspec.

    Both methods accept an optional ``filesystem`` keyword argument. When it
    is given (and truthy) the file is opened through that filesystem object,
    otherwise through ``fsspec.open``. All other positional and keyword
    arguments are forwarded to the chosen ``open`` call.
    """

    @staticmethod
    def _open(href: HREF, mode: str, *args: Any, **kwargs: Any) -> Any:
        """Return a context manager for ``href`` opened in text ``mode``."""
        # Pop rather than get: ``filesystem`` selects the opener and must not
        # be forwarded to ``open`` as a stray keyword / storage option.
        fs = kwargs.pop("filesystem", None)
        if fs:
            return fs.open(href, mode, *args, **kwargs)
        return fsspec.open(href, mode, *args, **kwargs)

    def write_text(self, dest: HREF, txt: str, *args: Any, **kwargs: Any) -> None:
        """Write ``txt`` to ``dest``.

        Args:
            dest (HREF): Destination to write to
            txt (str): Text to write
            *args: Forwarded to the underlying ``open`` call
            **kwargs: Forwarded to the underlying ``open`` call, except for
                the optional ``filesystem`` entry described on the class
        """
        with self._open(dest, "w", *args, **kwargs) as f:
            f.write(txt)

    def read_text(self, source: HREF, *args: Any, **kwargs: Any) -> str:
        """Read and return the full text content of ``source``.

        Args:
            source (HREF): Location to read from
            *args: Forwarded to the underlying ``open`` call
            **kwargs: Forwarded to the underlying ``open`` call, except for
                the optional ``filesystem`` entry described on the class

        Returns:
            str: Content of ``source``
        """
        with self._open(source, "r", *args, **kwargs) as f:
            return f.read()
def get_or_create_catalog(
    catalog_path: str,
    catalog_config: CatalogConfig = CatalogConfig(),
) -> Catalog:
    """Load the STAC catalog at ``catalog_path``, or build a new one.

    Args:
        catalog_path (str): Path to STAC catalog.
            Example: s3://bucket/catalog.json
        catalog_config (CatalogConfig, optional): Supplies the ``id``,
            ``description`` and ``title`` of a newly built catalog.
            Defaults to CatalogConfig().

    Returns:
        Catalog: The catalog read from ``catalog_path``. If reading raises
            ``FileNotFoundError``, a new ``ABSOLUTE_PUBLISHED`` catalog
            built from ``catalog_config`` is returned instead; this function
            does not write it anywhere.
    """
    try:
        return Catalog.from_file(catalog_path)
    except FileNotFoundError:
        # Nothing stored at that path yet: start a fresh catalog.
        return Catalog(
            catalog_config.id,
            catalog_config.description,
            catalog_config.title,
            catalog_type=CatalogType.ABSOLUTE_PUBLISHED,
        )
def get_or_create_collection(collection_id: str, catalog: Catalog) -> Collection:
    """Retrieves from catalog or creates a new STAC collection

    Args:
        collection_id (str): Id of the collection
        catalog (Catalog): Catalog whose collections are searched

    Returns:
        Collection: The first collection in ``catalog`` whose id equals
            ``collection_id``. If there is none, a new collection with a
            whole-globe spatial extent and an open-ended temporal extent
            starting now (UTC) is returned; this function does not add it
            to ``catalog``.
    """
    # Function-scope import: the file only imports ``datetime`` itself.
    from datetime import timezone

    existing = next(
        (col for col in catalog.get_collections() if col.id == collection_id),
        None,
    )
    if existing is not None:
        return existing

    spatial_extent = SpatialExtent([[-180.0, -90.0, 180.0, 90.0]])
    # An interval is a [start, end] pair; ``None`` as end marks it open-ended.
    # A one-element interval cannot be serialized. The timestamp is made
    # timezone-aware so it is not a local time later read as UTC.
    temporal_extent = TemporalExtent([[datetime.now(timezone.utc), None]])
    extent = Extent(spatial=spatial_extent, temporal=temporal_extent)

    return Collection(id=collection_id, description="Sample", extent=extent)
# Register FSSpecStacIO as the default StacIO implementation. This is a
# module-level statement, so it takes effect as soon as the module is imported.
StacIO.set_default(FSSpecStacIO)