"""
stac.py
==============
This module contains functions for STAC concepts.
"""
import json
from typing import Dict, Optional, Any
from datetime import datetime
import fsspec
from pystac import (
Item,
Asset,
Catalog,
StacIO,
CatalogType,
Collection,
Extent,
SpatialExtent,
TemporalExtent,
)
from pystac.link import HREF
from .model import CatalogConfig
[docs]
def get_item(json_item_or_path: str) -> Optional[Item]:
"""Get item from json string or from file
Args:
item (str): JSON encoded stac item or path to a file
Returns:
Item: pystac.Item resolved item
"""
resolved: Optional[Item]
try:
data = json.loads(json_item_or_path)
resolved = Item.from_dict(data)
except json.decoder.JSONDecodeError:
try:
resolved = Item.from_file(json_item_or_path)
except (FileNotFoundError, OSError):
resolved = None
return resolved
[docs]
def create_item(
old_item: Item,
metadata: Dict,
assets: Dict[str, Asset],
overwrite_metadata: bool = True,
) -> Item:
"""Create a new item from an existing item
Args:
old_item (Item): Existing old item
metadata (Dict): Metadata to add to properties
assets (Dict[str, Asset]): Assets to plug in
overwrite_metadata: (bool): Attempt to overwrite metadata. If set to False,
sets these as None. Defaults to True.
Returns:
Item: A new item ready for upload
"""
id = old_item.id
if overwrite_metadata:
geometry = old_item.geometry or metadata["geometry"]
bbox = old_item.bbox or metadata["bbox"]
datetime = old_item.datetime or metadata["datetime"]
else:
geometry = None
bbox = None
datetime = None
collection = old_item.collection_id or metadata["collection"]
stac_extensions = old_item.stac_extensions
properties = {**metadata, **old_item.properties}
item = Item(
id=id,
geometry=geometry,
bbox=bbox,
datetime=datetime,
collection=collection,
stac_extensions=stac_extensions,
properties=properties,
)
for key, asset in assets.items():
item.add_asset(key, asset)
return item
[docs]
def encode_item(output: Item) -> str:
"""Encodes item as json
Args:
output (Item): Item to encode
Returns:
str: JSON encoded item
"""
return json.dumps(output.to_dict())
[docs]
class FSSpecStacIO(StacIO):
"""
Extension of StacIO to allow working with different filesystems using fsspec.
"""
[docs]
def write_text(self, dest: HREF, txt: str, *args: Any, **kwargs: Any) -> None:
if fs := kwargs.get("filesystem"):
with fs.open(dest, "w", *args, **kwargs) as f:
f.write(txt)
else:
with fsspec.open(dest, "w", *args, **kwargs) as f:
f.write(txt)
[docs]
def read_text(self, source: HREF, *args: Any, **kwargs: Any) -> str:
if fs := kwargs.get("filesystem"):
with fs.open(source, "r", *args, **kwargs) as f:
return f.read()
else:
with fsspec.open(source, "r", *args, **kwargs) as f:
return f.read()
[docs]
def get_or_create_catalog(
catalog_path: str,
catalog_config: CatalogConfig = CatalogConfig(),
) -> Catalog:
"""Retrieves or creates a new stac catalog from given filesystem config parameters
Args:
catalog_path (str): Path to STAC catalog. By default sits at the root of each \
filesystem with file named catalog.json. Example: s3://bucket/catalog.json
filesystem_config (FilesystemConfig): Filesystem configuration
catalog_config (CatalogConfig, optional): Configuration of the catalog. \
Defaults to CatalogConfig().
Returns:
Catalog: STAC Catalog object
"""
try:
catalog = Catalog.from_file(catalog_path)
except FileNotFoundError:
catalog = Catalog(
catalog_config.id,
catalog_config.description,
catalog_config.title,
catalog_type=CatalogType.ABSOLUTE_PUBLISHED,
)
return catalog
[docs]
def get_or_create_collection(collection_id: str, catalog: Catalog) -> Collection:
"""Retrieves from catalog or creates a new STAC collection
Args:
collection_id (str): Id of the collection
catalog (Catalog): Catalog to crawl
Returns:
Collection: STAC Collection
"""
try:
collection = next(
col for col in catalog.get_collections() if col.id == collection_id
)
except StopIteration:
spatial_extent = SpatialExtent(
[
[-180.0, -90.0, 180.0, 90.0],
]
)
temporal_extent = TemporalExtent([[datetime.now()]])
extent = Extent(spatial=spatial_extent, temporal=temporal_extent)
collection = Collection(id=collection_id, description="Sample", extent=extent)
return collection
StacIO.set_default(FSSpecStacIO)