from lxml import etree
from datetime import datetime, timezone
from .util import pairwise
import logging
import structlog
from typing import List, Dict, Optional
# Apply the stdlib logging default configuration when this module is imported.
logging.basicConfig()
# Module-level structlog logger, created with this module's name.
logger = structlog.getLogger(__name__)
def evaluate_xpath(root, xpath):
    """
    Evaluates an XPath expression on ``root`` and returns a single result.

    The namespace prefixes declared on ``root`` (``root.nsmap``) are made
    available to the expression. A default namespace (registered under the
    ``None`` prefix) is left out of the mapping, because XPath has no
    notion of a default namespace and an empty prefix cannot be passed to
    ``xpath()``.

    :param root: the element to evaluate on; must provide ``xpath()``
        and ``nsmap``
    :param xpath: the XPath expression to evaluate
    :returns: the first item if the expression yields a non-empty list,
        the value itself if it yields a truthy non-list value, ``None``
        for any falsy result (empty list, empty string, ``0``, ``False``)
    """
    # only named prefixes are usable in XPath; drop the default namespace
    namespaces = {
        prefix: uri for prefix, uri in root.nsmap.items() if prefix
    }
    result = root.xpath(xpath, namespaces=namespaces)
    if not result:
        return None
    if isinstance(result, list):
        return result[0]
    return result
def extract_product_types_and_levels(metadata_files: List, config: dict):
    """
    Extracts product_types and product_levels found in the metadata
    based on configured XML xpath extractors.

    Every metadata file is parsed and each configured XPath expression is
    evaluated on its root element; all non-``None`` results are collected
    in the order they are found.

    :param metadata_files: paths of the XML metadata files to inspect
    :param config: configuration providing
        ``config["type_extractor"]["xpath"]`` and
        ``config["level_extractor"]["xpath"]``; each is either a single
        expression or a list of expressions. The level expression is
        optional: a falsy value disables level extraction.
    :returns: a tuple ``(product_types, product_levels)`` of two lists
    """
    product_types = []
    product_levels = []
    for metadata_file in metadata_files:
        # read raw bytes so that the XML parser detects the encoding from
        # the document itself rather than receiving locale-decoded text
        with open(metadata_file, "rb") as f:
            root = etree.parse(f).getroot()

        type_xpaths = config["type_extractor"]["xpath"]
        if isinstance(type_xpaths, str):
            type_xpaths = [type_xpaths]
        for xpath in type_xpaths:
            product_type = evaluate_xpath(root, xpath)
            if product_type is not None:
                product_types.append(product_type)

        level_xpaths = config["level_extractor"]["xpath"]
        # the level extractor may be left unset
        if level_xpaths:
            if isinstance(level_xpaths, str):
                level_xpaths = [level_xpaths]
            for xpath in level_xpaths:
                product_level = evaluate_xpath(root, xpath)
                if product_level is not None:
                    product_levels.append(product_level)

    return product_types, product_levels
def update_config_by_product_types_and_levels(metadata_files: List, config: dict):
    """
    Extracts product_type and product_level based on config and builds
    the preprocessing configuration for the detected product type.

    The result starts as a shallow copy of
    ``config["preprocessing"]["defaults"]`` (if present) and is updated
    with the first non-empty entry of ``config["preprocessing"]["types"]``
    that matches one of the detected product types. The passed ``config``
    itself is not modified.

    :param metadata_files: paths of the XML metadata files to inspect
    :param config: the full configuration
    :returns: a tuple ``(type_config, product_type, product_level)``.
        ``product_type`` is the detected type whose configuration was
        applied, the last detected type if none of them is configured,
        or ``None`` if no type was detected at all. ``product_level`` is
        the first detected level or ``None``.
    """
    # open the XML to retrieve the product type and level
    product_types, product_levels = extract_product_types_and_levels(
        metadata_files, config
    )
    logger.info(
        "Detected product_types: %s and level_types: %s"
        % (product_types, product_levels)
    )

    # get a concrete configuration for the type, filled with the defaults
    default_config = dict(config["preprocessing"].get("defaults", {}))

    # bound up front: with no detected types the loop body never runs and
    # the name would otherwise be undefined at the return statement
    product_type = None
    for product_type in product_types:
        # search metadata product type in configuration until it finds it
        type_based_config = config["preprocessing"]["types"].get(product_type)
        if type_based_config:
            default_config.update(type_based_config)
            break

    product_level = product_levels[0] if product_levels else None
    return default_config, product_type, product_level
def parse_ring(string):
    """
    Parses a whitespace separated coordinate string into a list of
    coordinate tuples with the two values of each pair swapped.

    NOTE(review): relies on ``pairwise`` from ``.util``; presumably it
    yields the tokens two at a time as ``(lat, lon)`` - confirm there.

    :param string: the whitespace separated coordinate values
    :returns: a list of ``(lon, lat)`` tuples of the (string) tokens
    """
    ring = []
    for lat, lon in pairwise(string.split()):
        ring.append((lon, lat))
    return ring
def serialize_coord_list(coords):
    """
    Serializes coordinate pairs as ``"x1 y1,x2 y2,..."``.

    :param coords: an iterable of two-item coordinates
    :returns: the pairs joined by commas, the two values of each pair
        separated by a single space; an empty string for no coordinates
    """
    serialized = []
    for x, y in coords:
        serialized.append(f"{x} {y}")
    return ",".join(serialized)
def parse_polygons_gsc(elem):
    """
    Converts a polygon element into a WKT ``POLYGON`` string.

    The outer ring is read from the first
    ``gml:exterior/gml:LinearRing/gml:posList`` below ``elem``, the holes
    from every ``gml:interior/gml:LinearRing/gml:posList``. Each position
    list is passed through ``parse_ring`` and ``serialize_coord_list``.

    :param elem: the polygon element; its ``nsmap`` must declare the
        ``gml`` prefix
    :returns: ``"POLYGON((outer),(hole1),...)"``
    :raises IndexError: if ``elem`` has no exterior position list
    """
    namespaces = elem.nsmap

    def as_wkt_ring(poslist_elem):
        # one parenthesized ring: "(x1 y1,x2 y2,...)"
        coords = parse_ring(poslist_elem.text.strip())
        return f"({serialize_coord_list(coords)})"

    # the outer ring always comes first in the WKT
    exterior_poslist = elem.xpath(
        "gml:exterior/gml:LinearRing/gml:posList", namespaces=namespaces
    )[0]
    rings = [as_wkt_ring(exterior_poslist)]

    # followed by the holes, if any
    for interior_poslist in elem.xpath(
        "gml:interior/gml:LinearRing/gml:posList", namespaces=namespaces
    ):
        rings.append(as_wkt_ring(interior_poslist))

    return "POLYGON(" + ",".join(rings) + ")"