# Source code for preprocessor.metadata

from lxml import etree
from datetime import datetime, timezone
from .util import pairwise
import logging
import structlog
from typing import List, Dict, Optional

logging.basicConfig()
logger = structlog.getLogger(__name__)


def evaluate_xpath(root, xpath):
    """Evaluate an XPath expression against an XML element.

    The element's own namespace map is used to resolve prefixes in the
    expression.

    Args:
        root: an lxml element exposing ``.xpath()`` and ``.nsmap``.
        xpath (str): the XPath expression to evaluate.

    Returns:
        The first item when the evaluation yields a non-empty list, the
        raw result for truthy non-list results (e.g. a string/number),
        or ``None`` when the evaluation produced an empty or falsy value.
    """
    result = root.xpath(xpath, namespaces=root.nsmap)
    if result:
        # XPath node selections come back as lists; take the first hit.
        if isinstance(result, list):
            return result[0]
        return result
    return None
def extract_product_types_and_levels(metadata_files: List, config: dict):
    """
    Extracts product_types and product_levels found in the metadata based on
    configured XML xpath extractors.

    Args:
        metadata_files: paths of XML metadata files to inspect.
        config: configuration dict providing ``type_extractor.xpath`` and
            ``level_extractor.xpath`` (each a single XPath string or a list
            of XPath strings).

    Returns:
        tuple: ``(product_types, product_levels)`` — lists of every value
        matched across all files (may be empty).
    """
    product_types = []
    product_levels = []
    for metadata_file in metadata_files:
        # Pass the filename straight to lxml: parsing a text-mode file
        # object fails with a ValueError for documents that carry an
        # explicit XML encoding declaration.
        tree = etree.parse(metadata_file)
        root = tree.getroot()

        type_xpaths = config["type_extractor"]["xpath"]
        # normalize a single XPath string to a one-element list
        type_xpaths = [type_xpaths] if isinstance(type_xpaths, str) else type_xpaths
        for xpath in type_xpaths:
            product_type = evaluate_xpath(root, xpath)
            if product_type is not None:
                product_types.append(product_type)

        level_xpaths = config["level_extractor"]["xpath"]
        if level_xpaths:
            level_xpaths = (
                [level_xpaths] if isinstance(level_xpaths, str) else level_xpaths
            )
            for xpath in level_xpaths:
                product_level = evaluate_xpath(root, xpath)
                if product_level is not None:
                    product_levels.append(product_level)

    return product_types, product_levels
def update_config_by_product_types_and_levels(metadata_files: List, config: dict):
    """
    Extracts product_type and product_level based on config, updates the
    config dict by type based config.

    Args:
        metadata_files: paths of XML metadata files to inspect.
        config: configuration dict with ``type_extractor``,
            ``level_extractor`` and ``preprocessing`` sections.

    Returns:
        tuple: ``(merged_config, product_type, product_level)`` where
        ``merged_config`` is the preprocessing defaults overlaid with the
        first matching type-specific config, and ``product_type`` /
        ``product_level`` may be ``None`` when nothing was detected.
    """
    # open the XML to retrieve the product type and level
    product_types, product_levels = extract_product_types_and_levels(
        metadata_files, config
    )
    logger.info(
        "Detected product_types: %s and level_types: %s"
        % (product_types, product_levels)
    )

    # get a concrete configuration for the type, filled with the defaults
    default_config = dict(config["preprocessing"].get("defaults", {}))
    type_based_config = {}
    # Initialize explicitly: previously this name was unbound (NameError on
    # the final return) when no product type was detected at all.
    product_type = None
    for product_type in product_types:
        # search metadata product type in configuration until it finds it
        configured_preprocessor_config = dict(
            config["preprocessing"]["types"].get(product_type, {})
        )
        if configured_preprocessor_config != {}:
            type_based_config = configured_preprocessor_config
            break
    default_config.update(type_based_config)

    product_level = product_levels[0] if len(product_levels) > 0 else None
    return default_config, product_type, product_level
def extract_metadata_for_stac(
    metadata_files: Dict[str, str], product_type: str, product_level: Optional[str]
):
    """
    Temporary function extracting necessary metadata to create a minimal
    STAC item. For now the xpaths are hardcoded here.

    Args:
        metadata_files: mapping whose keys are metadata file paths; only
            the first file is read.
        product_type: detected product type, stored in the item properties.
        product_level: optional product level, stored top-level when set.

    Returns:
        dict: minimal STAC-item-shaped metadata (``geometry``/``bbox`` are
        left ``None``; ``datetime`` is the current UTC time).
    """
    # hardcoded GSC extraction xpaths
    GSC_SCHEMA = {
        "identifier": "//gml:metaDataProperty/gsc:EarthObservationMetaData"
        "/eop:identifier/text()",
        "start_datetime": "//gml:validTime/gml:TimePeriod/gml:beginPosition/text()",
        "end_datetime": "//gml:validTime/gml:TimePeriod/gml:endPosition/text()",
    }

    # just considering the first metadata file
    metadata_file = next(iter(metadata_files.keys()))
    # Let lxml open the file itself: text-mode file objects break on
    # documents with an explicit XML encoding declaration.
    tree = etree.parse(metadata_file)
    root = tree.getroot()

    output_metadata = {
        "id": evaluate_xpath(root, GSC_SCHEMA["identifier"]),
        "geometry": None,  # optional
        "bbox": None,  # optional
        "datetime": datetime.now(timezone.utc).replace(microsecond=0),
        "properties": {
            "product_type": product_type,
            "start_datetime": evaluate_xpath(root, GSC_SCHEMA["start_datetime"]),
            "end_datetime": evaluate_xpath(root, GSC_SCHEMA["end_datetime"]),
        },
        "extra_fields": {},
    }
    if product_level is not None:
        output_metadata["product_level"] = product_level
    return output_metadata
def parse_ring(string):
    """Parse a whitespace-separated ``gml:posList`` coordinate string.

    Input coordinates arrive in (lat, lon) order and are emitted as
    (lon, lat) string tuples.

    NOTE(review): the pair grouping of the flat token list is delegated to
    ``util.pairwise`` — confirm its semantics (chunked vs. sliding pairs)
    against that helper.
    """
    tokens = string.split()
    ring = []
    for lat, lon in pairwise(tokens):
        ring.append((lon, lat))
    return ring
def serialize_coord_list(coords):
    """Serialize an iterable of (x, y) pairs into a comma-separated
    coordinate list, e.g. ``"1 2,3 4"``."""
    parts = ["%s %s" % (x, y) for x, y in coords]
    return ",".join(parts)
def parse_polygons_gsc(elem):
    """Build a WKT ``POLYGON`` string from a GSC ``gml:Polygon`` element.

    The ``gml:exterior`` ring becomes the first (outer) WKT ring; any
    ``gml:interior`` rings follow as hole rings.

    NOTE: the original implementation had the local names swapped
    (``interior`` held the exterior ring and vice versa) — the emitted WKT
    was correct, only the names were misleading. Behavior is unchanged.
    """
    exterior = serialize_coord_list(
        parse_ring(
            elem.xpath(
                "gml:exterior/gml:LinearRing/gml:posList", namespaces=elem.nsmap
            )[0].text.strip()
        )
    )
    interiors = [
        "(%s)" % serialize_coord_list(parse_ring(poslist_elem.text.strip()))
        for poslist_elem in elem.xpath(
            "gml:interior/gml:LinearRing/gml:posList", namespaces=elem.nsmap
        )
    ]
    return f"POLYGON(({exterior}){',' if interiors else ''}{','.join(interiors)})"
def extract_footprint(
    metadata_files,
    footprint_extractor: str = """//gml:target/eop:Footprint/gml:multiExtentOf
    /gml:MultiSurface/gml:surfaceMembers/gml:Polygon""",
):
    """Extract the footprint as a WKT polygon from the first metadata file
    that contains one.

    Args:
        metadata_files: paths of XML metadata files to search, in order.
        footprint_extractor: XPath locating the footprint ``gml:Polygon``.

    Returns:
        str or None: the WKT ``POLYGON``, or ``None`` when no file matched
        the extractor (previously this case crashed with an AttributeError
        inside ``parse_polygons_gsc``).
    """
    footprint = None
    for metadata_file in metadata_files:
        # Pass the filename to lxml directly; text-mode file objects break
        # on documents with an explicit XML encoding declaration.
        tree = etree.parse(metadata_file)
        root = tree.getroot()
        footprint = evaluate_xpath(root, footprint_extractor)
        if footprint is not None:
            break

    if footprint is None:
        return None
    return parse_polygons_gsc(footprint)