Source code for registrar.backend.eoxserver

"""
backend.py
============

Contains implementations for different backends where data may be registered

"""
import os
import sys
from typing import List, TYPE_CHECKING, Optional, TypedDict, cast
import json
from urllib.parse import urlparse
from uuid import uuid4

import structlog
import django
from django.db import transaction

from django.db.models import Q

if TYPE_CHECKING:
    from pystac import Item

from ..exceptions import RegistrationError
from ..source import HTTPSource, Source, LocalSource, S3Source, SwiftSource
from ..abc import Backend


logger = structlog.getLogger(__name__)


class AssetsToCoverageMapping(TypedDict):
    """Groups STAC Item Assets that together form one EOxServer Coverage.

    Attributes:
        assets: names of the assets that shall be registered as a single
            EOxServer Coverage
    """

    assets: List[str]
class ItemToProductTypeMapping(TypedDict):
    """Describes how STAC Items are mapped to an EOxServer ProductType.

    Attributes:
        name: name of the mapping; the backend uses it as the name of the
            EOxServer ProductType to look up or create
        product_type: the name of the EOxServer ProductType
        filter: property name/value pairs an Item's properties must equal
            for this mapping to be applied
        coverages: a mapping of coverage type name to an
            AssetsToCoverageMapping
        collections: identifiers of the collections that products of this
            ProductType shall be inserted into
    """

    # NOTE(review): the backend additionally reads the optional keys
    # "browses", "metadata_assets" and "service_visibility" via ``.get()``;
    # they are not declared here.
    name: str
    product_type: str
    filter: dict
    coverages: dict  # TODO ?
    collections: List[str]
class ItemBackend(Backend["Item"]):
    """Backend that registers STAC Items as Products in an EOxServer instance.

    Constructing the backend appends the instance directory to ``sys.path``,
    sets ``DJANGO_SETTINGS_MODULE`` (unless it is already set) and calls
    ``django.setup()``.

    Args:
        instance_base_path (str): base path of the instance
        instance_name (str): name of django instance
        product_types (List[ItemToProductTypeMapping]): mappings used to
            match Items to product types
        auto_create_product_types (bool): whether to create a product type
            from the STAC Item when no mapping matches
        automatic_visibilities (Optional[List[str]]): services for which
            products are made visible; only forwarded to the registration
            when the product type is created automatically
        simplify_footprint_tolerance (Optional[float]): passed through to
            EOxServer's ``register_stac_product``
    """

    def __init__(
        self,
        instance_base_path: str,
        instance_name: str,
        product_types: List[ItemToProductTypeMapping],
        auto_create_product_types: bool = False,
        automatic_visibilities: Optional[List[str]] = None,
        simplify_footprint_tolerance: Optional[float] = None,
    ):
        self.product_types = product_types
        self.simplify_footprint_tolerance = simplify_footprint_tolerance
        self.instance_name = instance_name
        self.auto_create_product_types = auto_create_product_types
        self.automatic_visibilities = automatic_visibilities

        # make the Django instance importable and initialise Django; the
        # eoxserver models are only imported inside the methods below
        path = os.path.join(instance_base_path, instance_name)
        if path not in sys.path:
            sys.path.append(path)

        os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"{instance_name}.settings")
        django.setup()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} instance_name ={self.instance_name}>"

    def exists(self, source: Optional[Source], item: "Item") -> bool:
        """Checks whether a Product with the item's id is registered

        Args:
            source (Source): source of the data (not used for the lookup)
            item (Item): item to be checked

        Returns:
            (bool): true if exists, false otherwise
        """
        from eoxserver.resources.coverages import models

        return models.Product.objects.filter(identifier=item.id).exists()

    @staticmethod
    def _first_asset(item: "Item"):
        """Returns the item's first asset, used to derive a bucket/container.

        Raises:
            RegistrationError: if the item has no assets
        """
        asset = next(iter(item.get_assets().values()), None)
        if asset is None:
            raise RegistrationError(
                f"{item} has no assets to derive the storage location from"
            )
        return asset

    def _get_storage_from_source(self, source: Optional[Source], item: "Item"):
        """Gets or creates the EOxServer storage matching the given source.

        Args:
            source (Source): source of the data, may be None
            item (Item): item whose first asset is used to derive the bucket
                or container when the source does not define one

        Returns:
            None when no source is given, otherwise a list holding the name
            of the storage (empty for source types that are not handled)

        Raises:
            RegistrationError: if a bucket/container has to be derived from
                the item but the item has no assets
        """
        from eoxserver.backends import models as backends

        if source is None:
            return None

        created_storage_auth = False
        created_storage = False
        storage_name = None

        if isinstance(source, LocalSource):
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.root_directory,
                storage_type="directory",
            )
            storage_name = storage.name

        elif isinstance(source, S3Source):
            params = json.dumps(
                {
                    "ACCESS_KEY_ID": source.access_key_id,
                    "SECRET_ACCESS_KEY": source.secret_access_key,
                    "AWS_REGION": source.region_name,
                }
            )

            # the endpoint is stored without its URL scheme
            endpoint_url = source.endpoint_url
            for scheme_prefix in ("https://", "http://"):
                if endpoint_url.startswith(scheme_prefix):
                    endpoint_url = endpoint_url[len(scheme_prefix):]
                    break

            bucket = source.bucket_name

            # get default bucket name from "first" asset. The first path
            # component is the bucket.
            if bucket is None:
                parsed = urlparse(self._first_asset(item).href)
                if parsed.scheme.lower() == "s3":
                    bucket = parsed.netloc
                else:
                    # strip leading slashes, otherwise "/bucket/key" would
                    # yield an empty bucket name
                    bucket = parsed.path.lstrip("/").partition("/")[0]

            name = source.name if source.bucket_name else f"{source.name}-{bucket}"

            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=name,
                url=endpoint_url,
                storage_auth_type="S3",
            )
            # always refresh the stored credentials
            storage_auth.auth_parameters = params
            storage_auth.save()

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=name,
                url=bucket,
                storage_type="S3",
                storage_auth=storage_auth,
                streaming=source.streaming,
            )
            storage_name = storage.name

        elif isinstance(source, SwiftSource):
            params = json.dumps(
                {
                    "auth-version": str(source.auth_version),
                    "identity-api-version": str(source.auth_version),
                    "username": source.username,
                    "password": source.password,
                    "project-name": source.project_name,
                    "project-id": source.project_id,
                    "region-name": source.region_name,
                }
            )

            # NOTE(review): unlike the S3 branch, the credentials are part of
            # the lookup here, so changed credentials do not match the
            # existing StorageAuth -- confirm whether this is intended
            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=source.auth_url,
                url=source.auth_url_short or source.auth_url,
                storage_auth_type="keystone",
                auth_parameters=params,
            )

            container = source.container
            if container is None:
                container, _ = source.get_container_and_path(
                    self._first_asset(item).href
                )

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=(source.name if source.container else f"{source.name}{container}"),
                url=container,
                storage_type="swift",
                storage_auth=storage_auth,
                streaming=source.streaming,
            )
            storage_name = storage.name

        elif isinstance(source, HTTPSource):
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.endpoint_url,
                storage_type="HTTP",
                streaming=source.streaming,
                # TODO: maybe add storage auth at some point
            )
            storage_name = storage.name

        if created_storage_auth:
            logger.info("Created storage auth", source=source.name)
        if created_storage:
            logger.info("Created storage for", source=source.name)

        return [storage_name] if storage_name else []

    def _register_with_stac(
        self,
        source: Optional[Source],
        item: "Item",
        replace: bool,
        storage,
        product_type: Optional[ItemToProductTypeMapping],
        simplify_footprint_tolerance: Optional[float] = None,
        create_product_type_model: bool = False,
        automatic_visibilities: Optional[List[str]] = None,
    ):
        """Registers the item as a Product using EOxServer's STAC registration.

        Args:
            source (Source): source of the data (not used here)
            item (Item): item to be registered
            replace (bool): replace an existing product or not
            storage: value returned by ``_get_storage_from_source``
            product_type: the matched product type mapping, if any
            simplify_footprint_tolerance: passed to ``register_stac_product``
            create_product_type_model: create the ProductType from the item
                instead of looking up an existing one
            automatic_visibilities: services to make the product visible for
                when the mapping defines no ``service_visibility``

        Returns:
            the registered product

        Raises:
            RegistrationError: if neither a product type mapping is given nor
                the product type shall be created
        """
        from eoxserver.backends import models as backends
        from eoxserver.resources.coverages import models
        from eoxserver.resources.coverages.registration.stac import (
            register_stac_product,
            create_product_type_from_stac_item,
        )
        from eoxserver.services import models as service_models

        # TODO: flag to re-use product type?
        if create_product_type_model:
            if product_type is not None:
                product_type_obj, created = create_product_type_from_stac_item(
                    item.to_dict(transform_hrefs=False),
                    product_type_name=product_type["name"],
                    ignore_existing=True,
                    coverage_mapping=product_type.get("coverages", {}),
                )
            else:
                # select all assets as coverages which have the "data" role
                coverage_mapping = {
                    asset_name: {"assets": [asset_name]}
                    for asset_name, asset in item.assets.items()
                    if asset.roles is not None and "data" in asset.roles
                }

                # fallback when only one asset provided
                if not coverage_mapping and len(item.assets) == 1:
                    coverage_mapping = {
                        asset_name: {"assets": [asset_name]}
                        for asset_name in item.assets
                    }

                product_type_obj, created = create_product_type_from_stac_item(
                    item.to_dict(transform_hrefs=False),
                    product_type_name=uuid4().hex,
                    ignore_existing=False,
                    # TODO: find better way of dealing with missing mappings
                    # ideally EOxServer figures it out by itself, even when no
                    # eo:bands are set
                    coverage_mapping=coverage_mapping,
                )
        elif product_type is not None:
            created = False
            product_type_obj = models.ProductType.objects.get(name=product_type["name"])
        else:
            # without this guard the code below would fail with an
            # UnboundLocalError on ``created``
            raise RegistrationError(
                "No product type mapping given and product type creation is disabled"
            )

        if created:
            logger.info("Created Product Type", product_type_obj=product_type_obj)
        else:
            logger.info("Using existing Product Type", product_type_obj=product_type_obj)

        # resolve storage object
        if storage:
            storage = backends.Storage.objects.get(name=storage[0])

        # mapping specific options are only passed when a mapping is available
        mapping_options = {}
        if product_type is not None:
            mapping_options = {
                "coverage_mapping": product_type.get("coverages", {}),
                "browse_mapping": product_type.get("browses", {}),
                "metadata_asset_names": product_type.get("metadata_assets"),
            }

        product, replaced = register_stac_product(
            item.to_dict(transform_hrefs=False),
            product_type_obj,
            storage,
            replace=replace,
            self_href=item.get_self_href(),
            simplify_footprint_tolerance=simplify_footprint_tolerance,
            **mapping_options,
        )

        default_visibilities = automatic_visibilities or []
        if product_type:
            service_visibilities = cast(
                List[str],
                product_type.get("service_visibility", default_visibilities),
            )
        else:
            service_visibilities = default_visibilities

        # make product visible for certain services
        for service in service_visibilities:
            (
                service_visibility,
                _,
            ) = service_models.ServiceVisibility.objects.get_or_create(
                eo_object=product,
                service=service,
            )
            service_visibility.visibility = True
            service_visibility.full_clean()
            service_visibility.save()

        logger.info(
            f"Successfully {'replaced' if replaced else 'registered'} Product",
            product=product.identifier,
        )
        return product

    @transaction.atomic
    def register(self, source: Optional[Source], item: "Item", replace: bool):
        """Registers the item to the endpoint

        The first product type mapping whose filter matches the item's
        properties is used. A mapping without filter matches any item, but is
        only allowed when it is the only configured mapping.

        Args:
            source (Source): source of the data
            item ('Item'): item to be registered
            replace (bool): replace existing or not

        Raises:
            RegistrationError: if several mappings are configured and one
                without filter is encountered, or if no mapping matches and
                product types are not created automatically
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models

        storage = self._get_storage_from_source(source, item)
        simplify_footprint_tolerance = self.simplify_footprint_tolerance

        # match self.product_types with item
        matched = None
        for candidate in self.product_types:
            filter_ = candidate.get("filter")
            if not filter_:
                if len(self.product_types) > 1:
                    raise RegistrationError(
                        "Multiple product types found without filter, cannot match"
                    )
                matched = candidate
                break

            if all(
                key in item.properties and item.properties[key] == value
                for key, value in filter_.items()
            ):
                matched = candidate
                break

        if matched is not None:
            logger.info("Registering into EOxServer", type=matched['name'])
            # NOTE(review): automatic_visibilities is not forwarded here, so
            # matched product types only get their own "service_visibility"
            # -- confirm whether this is intended
            product = self._register_with_stac(
                source, item, replace, storage, matched, simplify_footprint_tolerance
            )
            # insert the product in the to be associated collections
            for collection_id in matched.get("collections", []):
                collection = models.Collection.objects.get(identifier=collection_id)
                models.collection_insert_eo_object(collection, product)

        elif self.auto_create_product_types:
            self._register_with_stac(
                source,
                item,
                replace,
                storage,
                product_type=None,
                simplify_footprint_tolerance=simplify_footprint_tolerance,
                create_product_type_model=True,
                automatic_visibilities=self.automatic_visibilities,
            )
        else:
            raise RegistrationError(f"{item} not matched to any product_type")

    def deregister(self, source: Optional[Source], item: "Item"):
        """
        Defers to ``deregister_identifier`` with the items identifier
        """
        return self.deregister_identifier(item.id)

    @transaction.atomic
    def deregister_identifier(self, identifier: str) -> Optional[str]:
        """Attempts to deregister item

        Deletes the product and afterwards every unnamed grid of its
        coverages that is not referenced by another coverage or mosaic.

        Args:
            identifier (str): identifier to be deregistered

        Returns:
            the identifier of the deleted product, or None if no product
            with that identifier exists
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models

        with structlog.contextvars.bound_contextvars(identifier=identifier):
            logger.info("Deregistering product")
            try:
                product = models.Product.objects.get(identifier=identifier)
            except models.Product.DoesNotExist:
                # no product found with that id
                logger.info("No product with identifier found")
                return None

            # collect the grids first, they cannot be found via the product
            # once it is deleted
            grids = list(models.Grid.objects.filter(coverage__parent_product=product))
            product.delete()

            # clean up grids
            for grid in grids:
                # named (user defined) grids are always kept
                if grid.name:
                    continue

                grid_used = models.EOObject.objects.filter(
                    Q(coverage__grid=grid) | Q(mosaic__grid=grid),
                ).exists()
                # clean up grid as well, if it is not referenced anymore
                if not grid_used:
                    grid.delete()

            logger.info("Deregistered product")

        # return the deleted identifier
        return identifier