Source code for preprocessor.stac

from .metadata import extract_metadata_for_stac
from math import floor
from pystac import Item, Asset
import json
from enum import IntEnum
from typing import Any, Dict, Optional
from .util import gdal
from .archive import filter_filenames
from .transfer import get_uploader
import structlog

logger = structlog.getLogger(__name__)


class STATS_APPROX(IntEnum):
    """Strategy used when computing band statistics and histograms.

    The numeric values are part of the configuration contract: they are
    looked up by value from the ``stats_approx`` setting.
    """

    # statistics and histogram from raw data and no subset
    NO_APROX = 0
    # statistics and histogram computed with approximation allowed
    # (overview and subset)
    APPROX_OK = 1
    # statistics and histogram read from an overview band, no subset
    APPROX_OVERVIEW = 2
class HrefSortableAsset(Asset):
    """Asset subclass that can be ordered by its ``href``.

    Only ``__lt__`` is provided, which is what ``sorted()`` relies on.
    """

    def __lt__(self, other):
        own_href, other_href = self.href, other.href
        return own_href < other_href
def _band_common_name(
    asset_config: Dict[str, Any], band_count: int, band_index: int
) -> str:
    """Return the configured common name of a band, or ``band<N>`` as fallback."""
    fallback = "band%s" % (band_index + 1)
    band_mapping = asset_config.get("band_mapping", False)
    if not band_mapping:
        return fallback
    # the mapping is keyed by the total number of bands of the image
    common_names = band_mapping.get(band_count, False)
    if not common_names:
        return fallback
    if isinstance(common_names, str):
        common_names = [common_names]
    return common_names[band_index]


def _band_statistics(
    band: Any,
    approx: STATS_APPROX,
    histogram_min: Optional[float],
    histogram_max: Optional[float],
) -> Dict[str, Any]:
    """Compute the ``statistics`` and ``histogram`` entries of one raster band.

    ``histogram_min`` / ``histogram_max`` override the histogram range when
    they are not None; otherwise the computed minimum / maximum is used.

    Raises:
        ValueError: if ``approx`` is not a known STATS_APPROX value.
    """
    if approx == STATS_APPROX.APPROX_OK or approx == STATS_APPROX.NO_APROX:
        buckets = 256
        stats = band.GetStatistics(approx, True)
        # allowing to override min/max of created histogram
        if histogram_min is None:
            histogram_min = stats[0]
        if histogram_max is None:
            histogram_max = stats[1]
        histogram = band.GetHistogram(
            min=histogram_min,
            max=histogram_max,
            buckets=buckets,
            approx_ok=int(approx),
        )
    elif approx == STATS_APPROX.APPROX_OVERVIEW:
        overview_count = band.GetOverviewCount()
        if overview_count > 0:
            # NOTE(review): index is (count - 1) // 3, not the last overview;
            # confirm this is the intended overview level.
            overview = band.GetOverview((overview_count - 1) // 3)
        else:
            overview = band
        stats = overview.GetStatistics(False, True)
        if histogram_min is None:
            histogram_min = stats[0]
        if histogram_max is None:
            histogram_max = stats[1]
        # heuristics to ensure meaningful buckets
        buckets = max(255, (histogram_max - histogram_min) // 20)  # type: ignore
        buckets = floor(min(buckets, 1500))
        histogram = overview.GetHistogram(
            min=histogram_min,
            max=histogram_max,
            buckets=buckets,
            approx_ok=False,
        )
    else:
        raise ValueError("Value STATS_APPROX not allowed %s " % approx)

    return {
        "statistics": {
            "minimum": stats[0],
            "maximum": stats[1],
            "mean": stats[2],
            "stddev": stats[3],
        },
        "histogram": {
            "count": buckets,
            "min": histogram_min,
            "max": histogram_max,
            "buckets": histogram,
        },
    }


def create_stac_asset(
    local_path: str,
    remote_path: str,
    root_config: Dict[str, Any],
    asset_config: Dict[str, Any],
    name: str = "",
    aggregator: Optional[Dict] = None,
    is_image: bool = False,
    compute_statistics: bool = False,
    approx: STATS_APPROX = STATS_APPROX.APPROX_OK,
    force_histogram_min_value: Optional[float] = None,
    force_histogram_max_value: Optional[float] = None,
):
    """Create a STAC asset and register it in ``aggregator`` under ``name``.

    For images the file is opened with GDAL and per-band information is
    stored in the asset's ``eo:bands`` and ``raster:bands`` extra fields.

    Args:
        local_path: local file path; opened directly unless it contains
            ``remote_reference_``, in which case the remote file is opened.
        remote_path: used as the asset ``href`` and to open remote files.
        root_config: configuration; ``root_config["target"]`` is read for
            images (``type``, ``args``, ``kwargs``).
        asset_config: asset definition; ``extra_fields``, ``band_mapping``,
            ``title``, ``description``, ``media_type`` and ``roles`` are read.
            It is not modified.
        name: key under which the asset is collected in ``aggregator``.
        aggregator: mapping ``name -> {"assets": [...], "count": int}`` that
            is updated in place. When omitted, a fresh dict is used for this
            call only.
        is_image: inspect the file with GDAL when True.
        compute_statistics: add ``statistics`` and ``histogram`` per band.
        approx: statistics strategy, see STATS_APPROX.
        force_histogram_min_value: overrides the histogram lower bound.
        force_histogram_max_value: overrides the histogram upper bound.

    Raises:
        ValueError: if ``approx`` is not a known STATS_APPROX value.
    """
    if aggregator is None:
        # a shared ``{}`` default would accumulate assets across calls
        aggregator = {}
    # copy so that band information is written neither into the config nor
    # into the extra fields of other assets built from the same config
    extra_fields = dict(asset_config.get("extra_fields") or {})

    if is_image:
        target_config = root_config["target"]
        exported_uploader = None
        ds = None
        try:
            if "remote_reference_" not in local_path:
                ds = gdal.Open(local_path)
            elif target_config.get("type") == "swift":
                uploader = get_uploader(
                    target_config["type"],
                    target_config.get("args"),
                    target_config.get("kwargs"),
                )
                container, path = uploader.validate_container(remote_path)  # type: ignore
                image_path = f"/vsiswift/{container}/{path}"
                uploader.export_env_vars()  # type: ignore
                # from here on the environment has to be restored again
                exported_uploader = uploader
                try:
                    ds = gdal.Open(image_path)
                except Exception as e:
                    # best effort: the asset is still created, without bands
                    logger.debug("Can not open remote file %s" % image_path)
                    logger.debug(e)
            else:
                ds = gdal.Open(remote_path)

            if ds:
                band_count = ds.RasterCount
                bands = []
                raster_bands = []
                for j in range(band_count):
                    band = ds.GetRasterBand(j + 1)
                    bands.append(
                        {
                            "name": "band%s" % (j + 1),
                            "common_name": _band_common_name(
                                asset_config, band_count, j
                            ),
                        }
                    )

                    raster_band_object: Dict[str, Any] = {}
                    nodata = band.GetNoDataValue()
                    datatype = gdal.GetDataTypeName(band.DataType)
                    if compute_statistics:
                        raster_band_object.update(
                            _band_statistics(
                                band,
                                approx,
                                force_histogram_min_value,
                                force_histogram_max_value,
                            )
                        )
                    raster_band_object["nodata"] = nodata
                    raster_band_object["data_type"] = datatype.lower()
                    raster_bands.append(raster_band_object)
                extra_fields["raster:bands"] = raster_bands
                extra_fields["eo:bands"] = bands
        finally:
            # drop the dataset reference before restoring the environment,
            # also when inspecting the bands failed
            ds = None
            if exported_uploader is not None:
                exported_uploader.reset_env_vars()  # type: ignore

    asset = HrefSortableAsset(
        href=remote_path,
        title=asset_config.get("title", None),
        description=asset_config.get("description", None),
        media_type=asset_config.get("media_type", None),
        roles=asset_config.get("roles", []),
        extra_fields=extra_fields,
    )

    # add to intermediate asset aggregator with object counts
    entry = aggregator.get(name)
    if entry is not None:
        entry["assets"].append(asset)
        entry["count"] += 1
    else:
        aggregator[name] = {"assets": [asset], "count": 1}
def extract_asset_config_by_glob(
    local_path: str, stac_item_structure: Dict[str, Any], config: Dict[str, Any]
):
    """Find the asset definition whose glob matches ``local_path``.

    The entries of ``stac_item_structure["assets"]`` are checked in order and
    the first one with a matching glob wins. Entries without ``globs`` can
    not match and are skipped.

    Args:
        local_path: path to match against the configured globs.
        stac_item_structure: item structure config holding ``assets``.
        config: configuration; ``glob_case`` is passed on to the matcher.

    Returns:
        Tuple ``(asset_config, asset_name)`` of the first matching asset.

    Raises:
        Exception: if no configured asset matches ``local_path``.
    """
    for asset_name, asset_config in stac_item_structure.get("assets", {}).items():
        # a missing or empty "globs" entry simply never matches
        for glob in asset_config.get("globs") or ():
            filenames = filter_filenames(
                [local_path], glob, config.get("glob_case", False)
            )
            if len(filenames) > 0:
                return asset_config, asset_name
    raise Exception("No asset configuration matched for file: %s" % local_path)
def create_simple_stac_item(
    preprocessor_config: dict,
    root_config: dict,
    upload_files: Dict[str, str],
    extra_files: Dict[str, str],
    product_type: str,
    product_level: Optional[str],
):
    """Build a minimal STAC item for uploaded files and return it as JSON.

    Args:
        preprocessor_config: its ``stac_item_structure`` entry supplies extra
            item properties, extra fields, asset definitions and the
            ``statistics`` settings.
        root_config: passed on to asset matching and asset creation.
        upload_files: images, mapping local path -> remote path.
        extra_files: sidecar / metadata files, mapping local path -> remote
            path; also the input of the metadata extraction.
        product_type: passed on to the metadata extraction.
        product_level: passed on to the metadata extraction.

    Returns:
        The STAC item serialized as a JSON string.
    """
    metadata = extract_metadata_for_stac(extra_files, product_type, product_level)
    item_structure = preprocessor_config.get("stac_item_structure", {})

    # configured values take precedence over extracted metadata
    properties = {**metadata["properties"], **item_structure.get("properties", {})}
    extra_fields = {
        **metadata["extra_fields"],
        **item_structure.get("extra_fields", {}),
    }
    stac_item = Item(
        id=metadata["id"],
        geometry=metadata["geometry"],
        bbox=metadata["bbox"],
        datetime=metadata["datetime"],
        properties=properties,
        extra_fields=extra_fields,
    )

    collected: Dict[str, Any] = {}

    # sidecar / metadata files first; matched by glob on the remote path
    for local_path, remote_path in extra_files.items():
        matched_config, matched_name = extract_asset_config_by_glob(
            remote_path, item_structure, root_config
        )
        create_stac_asset(
            local_path,
            remote_path,
            root_config,
            matched_config,
            matched_name,
            collected,
            is_image=False,
        )

    statistics_config = item_structure.get("statistics", {})
    for local_path, remote_path in upload_files.items():
        matched_config, matched_name = extract_asset_config_by_glob(
            remote_path, item_structure, root_config
        )
        compute = statistics_config.get("compute_statistics", False)
        approx_mode = STATS_APPROX(statistics_config.get("stats_approx", 0))
        forced_min = statistics_config.get("force_histogram_min_value", None)
        forced_max = statistics_config.get("force_histogram_max_value", None)
        create_stac_asset(
            local_path,
            remote_path,
            root_config,
            matched_config,
            matched_name,
            collected,
            is_image=True,
            compute_statistics=compute,
            approx=approx_mode,
            force_histogram_min_value=forced_min,
            force_histogram_max_value=forced_max,
        )

    # add assets to item
    for name, entry in collected.items():
        if entry["count"] == 1:
            stac_item.add_asset(name, entry["assets"][0])
            continue
        # asset keys are unique -> append the index within the sorted assets
        for index, asset in enumerate(sorted(entry["assets"])):
            stac_item.add_asset("%s_%s" % (name, index), asset)

    return json.dumps(stac_item.to_dict(False))