"""
backend.py
============
Contains implementations for different backends where data may be registered
"""
import os
import sys
from typing import List, TYPE_CHECKING, Optional, TypedDict, cast
import json
from urllib.parse import urlparse
from uuid import uuid4
import structlog
import django
from django.db import transaction
from django.db.models import Q
# pystac is only needed for type annotations ("Item" is referenced as a
# string), so it is imported for static type checkers only.
if TYPE_CHECKING:
    from pystac import Item
from ..exceptions import RegistrationError
from ..source import HTTPSource, Source, LocalSource, S3Source, SwiftSource
from ..abc import Backend
logger = structlog.getLogger(__name__)
class AssetsToCoverageMapping(TypedDict):
    """Mapping structure to map specific STAC Item Assets to coverages

    Attributes:
        assets: list of asset names that shall be grouped to the same
            EOxServer Coverage
    """

    assets: List[str]
class ItemToProductTypeMapping(TypedDict):
    """Mapping structure to map STAC Items to EOxServer ProductTypes

    Attributes:
        name: the name under which the backend looks up (or creates) the
            EOxServer ProductType for this mapping
        product_type: the name of the EOxServer ProductType
        filter: specific filters to limit on which STAC Items this mapping is
            applied; every key/value pair must equal the corresponding
            STAC Item property for the mapping to match
        coverages: a mapping of coverage type name to an
            AssetsToCoverageMapping
        collections: a list of collection names that products of this
            ProductType shall be inserted into
    """

    name: str
    product_type: str
    filter: dict
    coverages: dict  # TODO ?
    collections: List[str]
    # NOTE: the backend additionally reads the optional keys "browses",
    # "metadata_assets" and "service_visibility" via ``.get()``; they are not
    # declared here, so static type checkers will not know about them.
class ItemBackend(Backend["Item"]):
    """
    EOxServer backend allows registration to be performed on a running
    EOxServer instance

    Args:
        instance_base_path (str): base path of the instance
        instance_name (str): name of django instance
        product_types (list): mappings of STAC Items to EOxServer product
            types (see ``ItemToProductTypeMapping``)
        auto_create_product_types (bool): whether to create product types from
            the STAC Item when no configured product type matches
        automatic_visibilities (list, optional): names of services for which
            products registered with an automatically created product type
            are made visible
        simplify_footprint_tolerance (float, optional): handed through to the
            EOxServer STAC product registration
    """

    def __init__(
        self,
        instance_base_path: str,
        instance_name: str,
        product_types: List[ItemToProductTypeMapping],
        auto_create_product_types: bool = False,
        automatic_visibilities: Optional[List[str]] = None,
        simplify_footprint_tolerance: Optional[float] = None
    ):
        self.product_types = product_types
        self.simplify_footprint_tolerance = simplify_footprint_tolerance
        self.instance_name = instance_name
        self.auto_create_product_types = auto_create_product_types
        self.automatic_visibilities = automatic_visibilities

        # make the instance package importable before bootstrapping Django
        path = os.path.join(instance_base_path, instance_name)
        if path not in sys.path:
            sys.path.append(path)

        # setdefault: an already configured settings module takes precedence
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"{instance_name}.settings")
        django.setup()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} instance_name ={self.instance_name}>"

    def exists(self, source: Optional[Source], item: "Item") -> bool:
        """Checks whether a product with the item's identifier is registered

        Args:
            source (Source): source of the data (not used for the lookup)
            item (Item): item to be checked

        Returns:
            (bool): true if exists, false otherwise
        """
        # EOxServer models can only be imported after ``django.setup()``
        from eoxserver.resources.coverages import models

        return models.Product.objects.filter(identifier=item.id).exists()

    @staticmethod
    def _first_asset_href(item: "Item") -> str:
        """Returns the href of the first asset of the item

        Raises:
            RegistrationError: if the item has no assets
        """
        assets = item.get_assets()
        if not assets:
            raise RegistrationError(
                f"{item} has no assets to derive the storage location from"
            )
        return next(iter(assets.values())).href

    def _get_storage_from_source(
        self, source: Optional[Source], item: "Item"
    ) -> Optional[List[str]]:
        """Gets or creates the EOxServer storage objects for the source

        Args:
            source (Source): source of the data
            item (Item): item to be registered; its first asset is used to
                derive the bucket/container when the source defines none

        Returns:
            ``None`` when no source is given, otherwise a list holding the
            name of the storage, or an empty list for source types that are
            not handled here

        Raises:
            RegistrationError: if the bucket/container has to be derived from
                the item but the item has no assets
        """
        from eoxserver.backends import models as backends

        if source is None:
            return None

        created_storage_auth = False
        created_storage = False
        storage_name = None

        if isinstance(source, LocalSource):
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.root_directory,
                storage_type="directory",
            )
            storage_name = storage.name

        elif isinstance(source, S3Source):
            params = json.dumps(
                {
                    "ACCESS_KEY_ID": source.access_key_id,
                    "SECRET_ACCESS_KEY": source.secret_access_key,
                    "AWS_REGION": source.region_name,
                }
            )

            # the storage auth URL is stored without the scheme
            endpoint_url = source.endpoint_url
            if endpoint_url.startswith("https://"):
                endpoint_url = endpoint_url[len("https://"):]
            elif endpoint_url.startswith("http://"):
                endpoint_url = endpoint_url[len("http://"):]

            bucket = source.bucket_name

            # get default bucket name from "first" asset. The first path
            # component is the bucket.
            if bucket is None:
                parsed = urlparse(self._first_asset_href(item))
                if parsed.scheme.lower() == "s3":
                    bucket = parsed.netloc
                else:
                    # NOTE(review): a path with a leading "/" yields an empty
                    # bucket name here - confirm such hrefs do not occur
                    bucket = parsed.path.partition("/")[0]

            # sources without a fixed bucket get one storage per bucket
            name = source.name if source.bucket_name else f"{source.name}-{bucket}"

            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=name,
                url=endpoint_url,
                storage_auth_type="S3",
            )
            # always refresh the credentials of the storage auth
            storage_auth.auth_parameters = params
            storage_auth.save()

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=name,
                url=bucket,
                storage_type="S3",
                storage_auth=storage_auth,
                streaming=source.streaming,
            )
            storage_name = storage.name

        elif isinstance(source, SwiftSource):
            params = json.dumps(
                {
                    "auth-version": str(source.auth_version),
                    "identity-api-version": str(source.auth_version),
                    "username": source.username,
                    "password": source.password,
                    "project-name": source.project_name,
                    "project-id": source.project_id,
                    "region-name": source.region_name,
                }
            )

            # NOTE(review): unlike the S3 branch, the auth parameters are part
            # of the lookup, so changed credentials do not match an existing
            # StorageAuth - confirm this is intended
            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=source.auth_url,
                url=source.auth_url_short or source.auth_url,
                storage_auth_type="keystone",
                auth_parameters=params,
            )

            container = source.container
            if container is None:
                container, _ = source.get_container_and_path(
                    self._first_asset_href(item)
                )

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=(source.name if source.container else f"{source.name}{container}"),
                url=container,
                storage_type="swift",
                storage_auth=storage_auth,
                streaming=source.streaming,
            )
            storage_name = storage.name

        elif isinstance(source, HTTPSource):
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.endpoint_url,
                storage_type="HTTP",
                streaming=source.streaming,
                # TODO: maybe add storage auth at some point
            )
            storage_name = storage.name

        if created_storage_auth:
            logger.info("Created storage auth", source=source.name)
        if created_storage:
            logger.info("Created storage for", source=source.name)

        return [storage_name] if storage_name else []

    def _register_with_stac(
        self,
        source: Optional[Source],
        item: "Item",
        replace: bool,
        storage,
        product_type: Optional[ItemToProductTypeMapping],
        simplify_footprint_tolerance: Optional[float] = None,
        create_product_type_model: bool = False,
        automatic_visibilities: Optional[List[str]] = None,
    ):
        """Registers the item as an EOxServer product via its STAC registration

        Args:
            source (Source): source of the data
            item (Item): item to be registered
            replace (bool): replace an existing product or not
            storage: list with the storage name as returned by
                ``_get_storage_from_source``, or a falsy value for no storage
            product_type: the matched product type mapping, if any
            simplify_footprint_tolerance (float, optional): handed through to
                the EOxServer registration
            create_product_type_model (bool): create the product type from the
                item instead of looking up an existing one
            automatic_visibilities (list, optional): fallback service names
                the product is made visible for

        Returns:
            the registered product

        Raises:
            RegistrationError: if neither a product type mapping is given nor
                a product type shall be created
        """
        from eoxserver.backends import models as backends
        from eoxserver.resources.coverages import models
        from eoxserver.resources.coverages.registration.stac import (
            register_stac_product,
            create_product_type_from_stac_item,
        )
        from eoxserver.services import models as service_models

        # TODO: flag to re-use product type?
        if create_product_type_model:
            if product_type is not None:
                product_type_obj, created = create_product_type_from_stac_item(
                    item.to_dict(transform_hrefs=False),
                    product_type_name=product_type["name"],
                    ignore_existing=True,
                    coverage_mapping=product_type.get("coverages", {}),
                )
            else:
                # select all assets as coverages which have the "data" role
                coverage_mapping = {
                    asset_name: {"assets": [asset_name]}
                    for asset_name, asset in item.assets.items()
                    if asset.roles is not None and "data" in asset.roles
                }

                # fallback when only one asset provided
                if not coverage_mapping and len(item.assets) == 1:
                    for asset_name in item.assets:
                        coverage_mapping[asset_name] = {"assets": [asset_name]}

                product_type_obj, created = create_product_type_from_stac_item(
                    item.to_dict(transform_hrefs=False),
                    product_type_name=uuid4().hex,
                    ignore_existing=False,
                    # TODO: find better way of dealing with missing mappings
                    # ideally EOxServer figures it out by itself, even when no
                    # eo:bands are set
                    coverage_mapping=coverage_mapping,
                )
        elif product_type is not None:
            created = False
            product_type_obj = models.ProductType.objects.get(name=product_type["name"])
        else:
            # without this guard the code below fails on unbound locals
            raise RegistrationError(
                "No product type given and product type creation is disabled"
            )

        if created:
            logger.info("Created Product Type", product_type_obj=product_type_obj)
        else:
            logger.info("Using existing Product Type", product_type_obj=product_type_obj)

        # resolve storage object
        if storage:
            storage = backends.Storage.objects.get(name=storage[0])

        if product_type is not None:
            product, replaced = register_stac_product(
                item.to_dict(transform_hrefs=False),
                product_type_obj,
                storage,
                replace=replace,
                coverage_mapping=product_type.get("coverages", {}),
                browse_mapping=product_type.get("browses", {}),
                metadata_asset_names=product_type.get("metadata_assets"),
                self_href=item.get_self_href(),
                simplify_footprint_tolerance=simplify_footprint_tolerance
            )
        else:
            product, replaced = register_stac_product(
                item.to_dict(transform_hrefs=False),
                product_type_obj,
                storage,
                replace=replace,
                self_href=item.get_self_href(),
                simplify_footprint_tolerance=simplify_footprint_tolerance
            )

        # a "service_visibility" entry of the mapping takes precedence over
        # the automatic visibilities
        if product_type:
            service_visibilities = cast(
                List[str],
                product_type.get(
                    "service_visibility", automatic_visibilities or []
                )
            )
        else:
            service_visibilities = automatic_visibilities or []

        # make product visible for certain services
        for service in service_visibilities:
            (
                service_visibility,
                _,
            ) = service_models.ServiceVisibility.objects.get_or_create(
                eo_object=product,
                service=service,
            )
            service_visibility.visibility = True
            service_visibility.full_clean()
            service_visibility.save()

        logger.info(
            f"Successfully {'replaced' if replaced else 'registered'} Product",
            product=product.identifier,
        )
        return product

    def _match_product_type(
        self, item: "Item"
    ) -> Optional[ItemToProductTypeMapping]:
        """Returns the first configured product type matching the item

        A product type with a filter matches when every filter entry equals
        the item property of the same name. A product type without a filter
        matches any item, but is only allowed as the sole product type.

        Raises:
            RegistrationError: if a product type without filter is encountered
                while more than one product type is configured
        """
        for product_type in self.product_types:
            filter_ = product_type.get("filter")
            if not filter_:
                if len(self.product_types) > 1:
                    raise RegistrationError(
                        "Multiple product types found without filter, cannot match"
                    )
                return product_type

            if all(
                k in item.properties and item.properties[k] == v
                for k, v in filter_.items()
            ):
                return product_type

        return None

    @transaction.atomic
    def register(self, source: Optional[Source], item: "Item", replace: bool):
        """Registers the item to the endpoint

        Args:
            source (Source): source of the data
            item ('Item'): item to be registered
            replace (bool): replace existing or not

        Raises:
            RegistrationError: if the item matches no product type and
                product types are not created automatically
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models

        storage = self._get_storage_from_source(source, item)
        simplify_footprint_tolerance = self.simplify_footprint_tolerance

        # match self.product_types with item
        product_type = self._match_product_type(item)

        if product_type is not None:
            logger.info("Registering into EOxServer", type=product_type['name'])
            # NOTE(review): self.automatic_visibilities is not handed over
            # here, only for automatically created product types - confirm
            product = self._register_with_stac(
                source,
                item,
                replace,
                storage,
                product_type,
                simplify_footprint_tolerance
            )
            # insert the product in the to be associated collections
            for collection_id in product_type.get("collections", []):
                collection = models.Collection.objects.get(identifier=collection_id)
                models.collection_insert_eo_object(collection, product)

        elif self.auto_create_product_types:
            product = self._register_with_stac(
                source,
                item,
                replace,
                storage,
                product_type=None,
                simplify_footprint_tolerance=simplify_footprint_tolerance,
                create_product_type_model=True,
                automatic_visibilities=self.automatic_visibilities,
            )
        else:
            raise RegistrationError(f"{item} not matched to any product_type")

    def deregister(self, source: Optional[Source], item: "Item"):
        """ Defers to ``deregister_identifier`` with the items identifier
        """
        return self.deregister_identifier(item.id)

    @transaction.atomic
    def deregister_identifier(self, identifier: str):
        """Attempts to deregister item

        Args:
            identifier (str): identifier to be deregistered

        Returns:
            the identifier of the deleted product, or ``None`` if no product
            with that identifier exists
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models

        with structlog.contextvars.bound_contextvars(identifier=identifier):
            try:
                logger.info("Deregistering product")
                product = models.Product.objects.get(identifier=identifier)
                # collect the grids before the delete, while the coverages
                # of the product can still be queried
                grids = list(models.Grid.objects.filter(coverage__parent_product=product))
                product.delete()

                # clean up grids
                for grid in grids:
                    grid_used = models.EOObject.objects.filter(
                        Q(coverage__grid=grid) | Q(mosaic__grid=grid),
                    ).exists()
                    # clean up grid as well, if it is not referenced
                    # anymore but saving named (user defined) grids
                    if grid and not grid.name and not grid_used:
                        grid.delete()

            except models.Product.DoesNotExist:
                logger.info("No product with identifier found")
                # no product found with that id, nothing was deleted
                return None

            logger.info("Deregistered product")

        # return the deleted identifier
        return identifier