from .metadata import extract_metadata_for_stac
from math import floor
from pystac import Item, Asset
import json
from enum import IntEnum
from typing import Any, Dict, Optional
from .util import gdal
from .archive import filter_filenames
from .transfer import get_uploader
import structlog
logger = structlog.getLogger(__name__)
class STATS_APPROX(IntEnum):
    """Strategy used when computing band statistics and histograms.

    The numeric values are part of the configuration contract
    (``stats_approx`` is converted with ``STATS_APPROX(value)``), so they
    must not change.
    """

    # statistics and histogram from raw data and no subset
    NO_APROX = 0
    # statistics and histogram from topmost overview and subset
    APPROX_OK = 1
    # statistics and histogram from topmost overview, no subset
    APPROX_OVERVIEW = 2

    # Correctly spelled alias of ``NO_APROX``. Because it shares the value 0
    # and is declared later, the enum machinery treats it as an alias:
    # ``STATS_APPROX(0)`` still yields ``NO_APROX`` and iteration still
    # produces exactly three members, so existing callers are unaffected.
    NO_APPROX = 0
class HrefSortableAsset(Asset):
    """Asset subclass whose instances can be ordered by their ``href``.

    Only ``__lt__`` is defined, which is what ``sorted()`` and
    ``list.sort()`` rely on.
    """

    def __lt__(self, other):
        """Return whether this asset's href sorts before ``other``'s href.

        Returns ``NotImplemented`` when ``other`` has no ``href`` attribute,
        so Python can try the reflected comparison or raise ``TypeError``
        instead of leaking an ``AttributeError`` from inside the comparison.
        """
        try:
            other_href = other.href
        except AttributeError:
            return NotImplemented
        return self.href < other_href
def _resolve_band_common_names(asset_config: Dict[str, Any], band_count: int) -> list:
    """Return the common names configured for a dataset with ``band_count`` bands.

    Reads ``asset_config["band_mapping"][band_count]``. A single string is
    wrapped into a one-element list; an empty list is returned when no
    mapping is configured for this band count.
    """
    band_mapping = asset_config.get("band_mapping") or {}
    names = band_mapping.get(band_count)
    if not names:
        return []
    if isinstance(names, str):
        return [names]
    return names


def _compute_band_statistics(
    band,
    approx,
    force_histogram_min_value: Optional[float],
    force_histogram_max_value: Optional[float],
) -> tuple:
    """Compute the ``statistics`` and ``histogram`` objects for one raster band.

    Returns a ``(statistics, histogram)`` tuple of dictionaries.
    Raises ``ValueError`` when ``approx`` is not a known ``STATS_APPROX`` value.
    """
    if approx == STATS_APPROX.APPROX_OK or approx == STATS_APPROX.NO_APROX:
        # Work on the band itself and let GDAL decide based on the approx flag.
        source = band
        approx_ok = int(approx)
    elif approx == STATS_APPROX.APPROX_OVERVIEW:
        # Work on an overview level (if any exist) with approximation disabled.
        overview_count = band.GetOverviewCount()
        if overview_count > 0:
            source = band.GetOverview((overview_count - 1) // 3)
        else:
            source = band
        approx_ok = False
    else:
        raise ValueError("Value STATS_APPROX not allowed %s " % approx)

    stats = source.GetStatistics(approx_ok, True)

    # allowing to override min/max of created histogram
    histogram_min = force_histogram_min_value
    histogram_max = force_histogram_max_value
    if histogram_min is None:
        histogram_min = stats[0]
    if histogram_max is None:
        histogram_max = stats[1]

    if approx == STATS_APPROX.APPROX_OVERVIEW:
        # heuristics to ensure meaningful buckets: scale with the value
        # range, but stay within [255, 1500]
        buckets = max(255, (histogram_max - histogram_min) // 20)  # type: ignore
        buckets = floor(min(buckets, 1500))
    else:
        buckets = 256

    histogram = source.GetHistogram(
        min=histogram_min,
        max=histogram_max,
        buckets=buckets,
        approx_ok=approx_ok,
    )

    statistics = {
        "minimum": stats[0],
        "maximum": stats[1],
        "mean": stats[2],
        "stddev": stats[3],
    }
    histogram_info = {
        "count": buckets,
        "min": histogram_min,
        "max": histogram_max,
        "buckets": histogram,
    }
    return statistics, histogram_info


def _describe_bands(
    ds,
    asset_config: Dict[str, Any],
    compute_statistics: bool,
    approx,
    force_histogram_min_value: Optional[float],
    force_histogram_max_value: Optional[float],
) -> tuple:
    """Build the ``eo:bands`` and ``raster:bands`` lists for an open dataset.

    Returns a ``(eo_bands, raster_bands)`` tuple of lists, one entry per band.
    """
    band_count = ds.RasterCount
    # The mapping depends only on the band count, so resolve it once.
    common_names = _resolve_band_common_names(asset_config, band_count)

    eo_bands = []
    raster_bands = []
    for index in range(band_count):
        band = ds.GetRasterBand(index + 1)
        default_name = "band%s" % (index + 1)
        # Fall back to the generated name when the configured mapping has no
        # entry for this band instead of failing with an IndexError.
        if index < len(common_names):
            common_name = common_names[index]
        else:
            common_name = default_name
        eo_bands.append({"name": default_name, "common_name": common_name})

        raster_band: Dict[str, Any] = {}
        if compute_statistics:
            statistics, histogram_info = _compute_band_statistics(
                band,
                approx,
                force_histogram_min_value,
                force_histogram_max_value,
            )
            raster_band["statistics"] = statistics
            raster_band["histogram"] = histogram_info
        raster_band["nodata"] = band.GetNoDataValue()
        raster_band["data_type"] = gdal.GetDataTypeName(band.DataType).lower()
        raster_bands.append(raster_band)
    return eo_bands, raster_bands


def create_stac_asset(
    local_path: str,
    remote_path: str,
    root_config: Dict[str, Any],
    asset_config: Dict[str, Any],
    name: str = "",
    aggregator: Optional[Dict] = None,
    is_image: bool = False,
    compute_statistics: bool = False,
    approx: STATS_APPROX = STATS_APPROX.APPROX_OK,
    force_histogram_min_value: Optional[float] = None,
    force_histogram_max_value: Optional[float] = None,
):
    """Create a STAC asset and register it in ``aggregator`` under ``name``.

    For images the file is opened with GDAL and per-band metadata is stored
    in the asset's extra fields as ``eo:bands`` and ``raster:bands``.

    Args:
        local_path: Local path of the file. If it contains
            ``"remote_reference_"`` the file is opened from its remote
            location instead.
        remote_path: Remote path of the file; used as the asset ``href``.
        root_config: Root configuration; ``root_config["target"]`` is read
            (``type``, ``args``, ``kwargs``) when ``is_image`` is true.
        asset_config: Asset configuration providing ``title``,
            ``description``, ``media_type``, ``roles``, ``extra_fields`` and
            optionally ``band_mapping`` (keyed by band count).
        name: Key under which the asset is collected in ``aggregator``.
        aggregator: Mapping ``name -> {"assets": [...], "count": int}`` that
            is updated in place. A fresh dict is created when omitted.
        is_image: Whether to open the file with GDAL and extract band info.
        compute_statistics: Whether to compute statistics and a histogram
            for every band.
        approx: Statistics strategy, see ``STATS_APPROX``.
        force_histogram_min_value: Overrides the histogram minimum.
        force_histogram_max_value: Overrides the histogram maximum.

    Returns:
        The aggregator mapping that was updated (the one passed in, or the
        newly created one).

    Raises:
        ValueError: If ``approx`` is not a supported ``STATS_APPROX`` value
            while statistics are being computed.
    """
    # A mutable default would be shared between unrelated calls.
    if aggregator is None:
        aggregator = {}

    # Copy: the band metadata added below must not leak into the shared
    # asset configuration (and thereby into every other asset built from it).
    extra_fields = dict(asset_config.get("extra_fields") or {})

    if is_image:
        target_config = root_config["target"]
        ds = None
        # Set only after swift credentials were exported to the environment,
        # so that they are reliably reset afterwards.
        env_uploader = None
        try:
            if "remote_reference_" not in local_path:
                ds = gdal.Open(local_path)
            elif target_config.get("type") == "swift":
                uploader = get_uploader(
                    target_config["type"],
                    target_config.get("args"),
                    target_config.get("kwargs"),
                )
                container, path = uploader.validate_container(remote_path)  # type: ignore
                image_path = f"/vsiswift/{container}/{path}"
                uploader.export_env_vars()  # type: ignore
                env_uploader = uploader
                try:
                    ds = gdal.Open(image_path)
                except Exception as e:
                    # Best effort: an unreadable remote file only means the
                    # asset is created without band metadata.
                    logger.debug("Can not open remote file %s" % image_path)
                    logger.debug(e)
            else:
                ds = gdal.Open(remote_path)

            if ds is not None:
                bands, raster_bands = _describe_bands(
                    ds,
                    asset_config,
                    compute_statistics,
                    approx,
                    force_histogram_min_value,
                    force_histogram_max_value,
                )
                extra_fields["raster:bands"] = raster_bands
                extra_fields["eo:bands"] = bands
        finally:
            # Drop the dataset reference and undo the environment changes
            # even when band processing failed.
            ds = None
            if env_uploader is not None:
                env_uploader.reset_env_vars()  # type: ignore

    asset = HrefSortableAsset(
        href=remote_path,
        title=asset_config.get("title", None),
        description=asset_config.get("description", None),
        media_type=asset_config.get("media_type", None),
        roles=asset_config.get("roles", []),
        extra_fields=extra_fields,
    )

    # add to intermediate asset aggregator with object counts
    entry = aggregator.get(name)
    if entry is not None:
        entry["assets"].append(asset)
        entry["count"] += 1
    else:
        aggregator[name] = {"assets": [asset], "count": 1}
    return aggregator
def create_simple_stac_item(
    preprocessor_config: dict,
    root_config: dict,
    upload_files: Dict[str, str],
    extra_files: Dict[str, str],
    product_type: str,
    product_level: Optional[str],
):
    """Build a minimal STAC item for an uploaded product and return it as JSON.

    Temporary method. Item metadata is obtained from
    ``extract_metadata_for_stac(extra_files, product_type, product_level)``
    (originally documented as reading the first metadata file yielded by
    iteration) and merged with the ``properties`` / ``extra_fields``
    configured under ``preprocessor_config["stac_item_structure"]``; the
    configured values win on key conflicts.

    Args:
        preprocessor_config: Preprocessor configuration; its
            ``stac_item_structure`` entry drives the item layout.
        root_config: Root configuration handed on to the asset helpers.
        upload_files: Images, mapping local path to remote path.
        extra_files: Sidecar or metadata files, mapping local path to
            remote path.
        product_type: Product type passed to the metadata extraction.
        product_level: Product level passed to the metadata extraction.

    Returns:
        The STAC item serialized as a JSON string.
    """
    # get relevant metadata from GSC
    metadata = extract_metadata_for_stac(extra_files, product_type, product_level)
    item_structure = preprocessor_config.get("stac_item_structure", {})

    item = Item(
        id=metadata["id"],
        geometry=metadata["geometry"],
        bbox=metadata["bbox"],
        datetime=metadata["datetime"],
        properties={
            **metadata["properties"],
            **item_structure.get("properties", {}),
        },
        extra_fields={
            **metadata["extra_fields"],
            **item_structure.get("extra_fields", {}),
        },
    )

    collected: Dict[str, Any] = {}

    # Sidecar / metadata files: plain assets without band inspection.
    for local, remote in extra_files.items():
        # the asset config is selected by the configured glob on the remote path
        config, asset_name = extract_asset_config_by_glob(
            remote, item_structure, root_config
        )
        create_stac_asset(
            local,
            remote,
            root_config,
            config,
            asset_name,
            collected,
            is_image=False,
        )

    # Images: band metadata and, if configured, statistics are added.
    for local, remote in upload_files.items():
        config, asset_name = extract_asset_config_by_glob(
            remote, item_structure, root_config
        )
        statistics_settings = item_structure.get("statistics", {})
        create_stac_asset(
            local,
            remote,
            root_config,
            config,
            asset_name,
            collected,
            is_image=True,
            compute_statistics=statistics_settings.get("compute_statistics", False),
            approx=STATS_APPROX(statistics_settings.get("stats_approx", 0)),
            force_histogram_min_value=statistics_settings.get(
                "force_histogram_min_value", None
            ),
            force_histogram_max_value=statistics_settings.get(
                "force_histogram_max_value", None
            ),
        )

    # add assets to item
    for asset_name, entry in collected.items():
        assets = entry["assets"]
        if entry["count"] == 1:
            item.add_asset(asset_name, assets[0])
            continue
        # asset keys must be unique -> append the index in href order
        for position, asset in enumerate(sorted(assets)):
            item.add_asset("%s_%s" % (asset_name, position), asset)

    return json.dumps(item.to_dict(False))