API

This section documents the API relevant for developers. Once the platform is hosted, the interactive documentation is available at the /redoc endpoint (for example, http://localhost:8000/redoc for a local deployment).

Stacture

stacture.api.base

BaseAPI()

Bases: ABC

Source code in stacture/stacture/api/base.py
@abstractmethod
def __init__(self):
    pass
get_api_router() abstractmethod
Source code in stacture/stacture/api/base.py
@abstractmethod
def get_api_router(self) -> APIRouter: ...
get_collection_links(request, collection_id, collection_conf) abstractmethod
Source code in stacture/stacture/api/base.py
@abstractmethod
def get_collection_links(
    self, request: Request, collection_id: str, collection_conf: Collection
) -> list[LinkType]:
    # NOTE: this should probably return LinkType
    ...
get_landing_page_links(request) abstractmethod
Source code in stacture/stacture/api/base.py
@abstractmethod
def get_landing_page_links(self, request: Request) -> list[LinkType]:
    # NOTE: this should probably return LinkType
    ...
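
For illustration, a minimal subclass might look like the sketch below. HelloAPI, its router, and the chosen rel value are hypothetical and not part of stacture:

from fastapi import APIRouter, Request

from stacture.api.base import BaseAPI, LinkType

router = APIRouter()


class HelloAPI(BaseAPI):
    """Hypothetical example API, not part of stacture."""

    def __init__(self):
        pass

    def get_api_router(self) -> APIRouter:
        # the router carrying this API's own endpoints
        return router

    def get_collection_links(self, request, collection_id, collection_conf) -> list[LinkType]:
        # this example contributes no collection-level links
        return []

    def get_landing_page_links(self, request: Request) -> list[LinkType]:
        return [LinkType(href=str(request.url_for("landing_page")), rel="related")]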

LinkType

Bases: BaseModel

href: str instance-attribute
hreflang: str | None = None class-attribute instance-attribute
length: int | None = None class-attribute instance-attribute
rel: str instance-attribute
title: str | None = None class-attribute instance-attribute
type: str | None = None class-attribute instance-attribute
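
A short construction sketch (the href is illustrative); model_dump(exclude_unset=True) omits the optional fields that were never set, as the core API does when serializing links:

link = LinkType(
    href="https://example.com/collections/demo",
    rel="self",
    type="application/json",
)
link.model_dump(exclude_unset=True)
# {'href': 'https://example.com/collections/demo', 'rel': 'self', 'type': 'application/json'}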

stacture.api.core

ConfigAnnotated = Annotated[Config, Depends(get_config)] module-attribute

router = APIRouter() module-attribute

CoreAPI()

Bases: BaseAPI

Source code in stacture/stacture/api/core.py
def __init__(self):
    pass
get_api_router()
Source code in stacture/stacture/api/core.py
def get_api_router(self) -> APIRouter:
    return router
get_collection_links(request, collection_id, collection_conf)
Source code in stacture/stacture/api/core.py
def get_collection_links(
    self, request: Request, collection_id: str, collection_conf: Collection
) -> list[LinkType]:
    return [
        LinkType(
            href=str(request.url_for("landing_page")),
            rel="root",
            type="application/json",
        ),
        LinkType(
            href=str(request.url_for("collection", collection_id=collection_id)),
            rel="self",
            type="application/json",
        ),
    ]
get_landing_page_links(request)
Source code in stacture/stacture/api/core.py
def get_landing_page_links(self, request: Request) -> list[LinkType]:
    return [
        LinkType(
            href=str(request.url_for("landing_page")),
            rel="self",
            title="Landing Page",
            type="application/json",
        ),
        # {
        #     "rel": "service-desc",
        #     "type": "application/vnd.oai.openapi+json;version=3.0",
        #     "title": "The OpenAPI definition as JSON",
        #     "href": "https://demo.pygeoapi.io/stable/openapi"
        # },
        # {
        #     "rel": "service-doc",
        #     "type": "text/html",
        #     "title": "The OpenAPI definition as HTML",
        #     "href": "https://demo.pygeoapi.io/stable/openapi?f=html",
        #     "hreflang": "en-US"
        # },
        # {
        #     "rel": "conformance",
        #     "type": "application/json",
        #     "title": "Conformance",
        #     "href": "https://demo.pygeoapi.io/stable/conformance"
        # },
        LinkType(
            href=str(request.url_for("collections")),
            rel="data",
            title="Collections",
            type="application/json",
        ),
    ]

collection(request, collection_id, config) async

Source code in stacture/stacture/api/core.py
@router.get("/collections/{collection_id}")
async def collection(request: Request, collection_id: str, config: ConfigAnnotated):
    registry = get_registry()
    return await get_collection_description(request, registry, config, collection_id)

collections(request, config) async

Source code in stacture/stacture/api/core.py
@router.get("/collections")
async def collections(request: Request, config: ConfigAnnotated):
    registry = get_registry()
    return {
        "collections": [
            await get_collection_description(request, registry, config, collection_id)
            for collection_id in config.collections
        ],
        "links": [
            LinkType(
                href=str(request.url_for("collections")),
                rel="self",
                title="Collections",
                type="application/json",
            ).model_dump(exclude_unset=True)
        ],
    }

get_collection_description(request, registry, config, collection_id) async

Source code in stacture/stacture/api/core.py
async def get_collection_description(
    request: Request, registry: APIRegistry, config: ConfigAnnotated, collection_id: str
) -> dict:
    if collection_id in config.collections:
        collection_conf = config.collections[collection_id]
    else:
        for collection_conf in config.collections.values():
            if any(
                re.match(pattern, collection_id)
                for pattern in collection_conf.id_patterns
            ):
                break
        else:
            raise HTTPNotFound(detail="Item not found")

    links = []
    for api in registry.apis:
        links.extend(api.get_collection_links(request, collection_id, collection_conf))

    source = get_source(collection_conf.source)
    collection = await source.get_collection()

    return {
        "id": collection_id,
        "title": collection.title,
        "description": collection.description,
        "keywords": collection.keywords,
        "extent": collection.extent,
        "links": links,
    }
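
When the requested id is not configured directly, the lookup above falls back to the regex id_patterns of each configured collection. A standalone sketch of that fallback, with a made-up pattern and id:

import re

id_patterns = [r"^sentinel-2-l\d[a-z]$"]  # hypothetical id_patterns of one collection

requested_id = "sentinel-2-l2a"
assert any(re.match(pattern, requested_id) for pattern in id_patterns)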

landing_page(request, config) async

Source code in stacture/stacture/api/core.py
@router.get("/", response_model_exclude_unset=True)
async def landing_page(request: Request, config: ConfigAnnotated):
    registry = get_registry()
    links = []
    for api in registry.apis:
        links.extend(api.get_landing_page_links(request))

    return {
        "title": config.title,
        "description": config.description,
        "links": links,
    }

stacture.api.coverages

__all__ = ['CoveragesAPI'] module-attribute

CoveragesAPI()

Bases: BaseAPI

Source code in stacture/stacture/api/coverages/api.py
def __init__(self):
    pass
get_api_router()
Source code in stacture/stacture/api/coverages/api.py
def get_api_router(self) -> APIRouter:
    return router
get_collection_links(request, collection_id, collection_conf)
Source code in stacture/stacture/api/coverages/api.py
def get_collection_links(
    self, request: Request, collection_id: str, collection_conf: Collection
) -> list[LinkType]:
    coverage_conf = collection_conf.coverage
    if coverage_conf is None:
        return []

    links = [
        LinkType(
            href=str(
                request.url_for("collection_schema", collection_id=collection_id)
            ),
            rel="http://www.opengis.net/def/rel/ogc/1.0/schema",
        ),
        LinkType(
            href=str(
                request.url_for("collection_coverage", collection_id=collection_id)
            ),
            rel="http://www.opengis.net/def/rel/ogc/1.0/coverage",
        ),
        LinkType(
            href=str(
                request.url_for("collection_scenes", collection_id=collection_id)
            ),
            rel="http://www.opengis.net/def/rel/ogc/1.0/coverage-scenes",
        ),
        # TODO tilesets
        # http://www.opengis.net/def/rel/ogc/1.0/tilesets-coverage
    ]
    # links.extend(
    #     [
    #         LinkType(
    #             href=str(
    #                 request.url_for(
    #                     "collection_map_styled",
    #                     collection_id=collection_id,
    #                     style_id=style_id,
    #                 )
    #             ),
    #             rel="http://www.opengis.net/def/rel/ogc/1.0/map",
    #         )
    #         for tile_matrix_set_id in tile_matrix_set_id
    #     ]
    # )
    return links
get_landing_page_links(request)
Source code in stacture/stacture/api/coverages/api.py
def get_landing_page_links(self, request: Request) -> list[LinkType]:
    return []

stacture.api.maps

FLOAT_LIST_PATTERN = '^(-?\\d+(\\.\\d+)?,){3}-?\\d+(\\.\\d+)?$' module-attribute

GetMapCommon = Annotated[GetMapParameters, Depends(get_map_common)] module-attribute

HEX_COLOR_PATTERN = '^0x(?:[0-9a-fA-F]{3}){1,2}$' module-attribute

LOGGER = structlog.get_logger() module-attribute

MAP_REL = 'http://www.opengis.net/def/rel/ogc/1.0/map' module-attribute

maps_api = MapsAPI() module-attribute

router = APIRouter() module-attribute

GetMapParameters

Bases: BaseModel

accept: str instance-attribute
bbox: BBox instance-attribute
bgcolor: str instance-attribute
height: int instance-attribute
transparent: bool instance-attribute
width: int instance-attribute

MapsAPI()

Bases: BaseAPI

Source code in stacture/stacture/api/maps.py
def __init__(self):
    pass
get_api_router()
Source code in stacture/stacture/api/maps.py
def get_api_router(self) -> APIRouter:
    return router
get_collection_links(request, collection_id, collection_conf)
Source code in stacture/stacture/api/maps.py
def get_collection_links(
    self, request: Request, collection_id: str, collection_conf: Collection
) -> list[LinkType]:
    map_conf = collection_conf.map
    if map_conf is None:
        return []

    links = [
        LinkType(
            href=str(request.url_for("collection_map", collection_id=collection_id)),
            rel="http://www.opengis.net/def/rel/ogc/1.0/map",
        )
    ]
    links.extend(
        [
            LinkType(
                href=str(
                    request.url_for(
                        "collection_map_styled",
                        collection_id=collection_id,
                        style_id=style_id,
                    )
                ),
                rel="http://www.opengis.net/def/rel/ogc/1.0/map",
            )
            for style_id in map_conf.styles
        ]
    )
    return links
get_landing_page_links(request)
Source code in stacture/stacture/api/maps.py
def get_landing_page_links(self, request: Request) -> list[LinkType]:
    return [LinkType(href=str(request.url_for("root_map")), rel="abc")]

collection_map(collection_id, common, config) async

Source code in stacture/stacture/api/maps.py
@router.get("/collections/{collection_id}/map")
async def collection_map(
    collection_id: str,
    common: GetMapCommon,
    config: Annotated[Config, Depends(get_config)],
):
    if not (collection := config.collections.get(collection_id)):
        raise HTTPException(status_code=HTTPStatus.NOT_FOUND)
    else:
        source = get_source(collection.source)
        items = source.search_items(SourceQuery(bbox=common.bbox, limit=10))
        # TODO; allow style selection
        workflow = await create_render_image_workflow(
            items=items,
            map_config=collection.map,
            style=(collection.map.default_style),
            width=common.width,
            height=common.height,
            bbox=common.bbox.as_tuple() if common.bbox else None,
            crs=common.bbox.crs,
            mediatype="image/png",
            background_color=common.bgcolor,
            preferred_alternate_asset_name=source.preferred_alternate_asset_name,
            transparent=common.transparent,
        )

        LOGGER.debug(f"Final created image workflow: {workflow}")
        response_content, response_headers = await send_terravis_image_request(
            workflow,
            config.terravis_url,
        )
        return Response(
            response_content,
            headers={
                "Content-Type": response_headers["Content-Type"],
            },
        )

collection_map_styled(collection_id, style_id, common)

Source code in stacture/stacture/api/maps.py
@router.get("/collections/{collection_id}/styles/{style_id}/map")
def collection_map_styled(collection_id: str, style_id: str, common: GetMapCommon):
    return {"collection": collection_id, "query": common}

get_map_common(width, height, bbox, bbox_crs='EPSG:4326', accept='*/*', transparent=False, bgcolor='0xFFFFFF')

Source code in stacture/stacture/api/maps.py
def get_map_common(
    width: int,
    height: int,
    bbox: Annotated[tuple[float, float, float, float], Depends(parse_bbox)],
    bbox_crs: Annotated[str, Query(alias="bbox-crs")] = "EPSG:4326",
    accept: Annotated[str, Header()] = "*/*",
    transparent: bool = False,
    bgcolor: Annotated[str, Depends(parse_bgcolor)] = "0xFFFFFF",
) -> GetMapParameters:
    _bbox = BBox(minx=bbox[0], miny=bbox[1], maxx=bbox[2], maxy=bbox[3], crs=bbox_crs)
    params = GetMapParameters(
        width=width,
        height=height,
        bbox=_bbox,
        accept=accept,
        transparent=transparent,
        bgcolor=bgcolor,
    )
    LOGGER.debug("Maps request", params=params)

    return params
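
Put together, a map request passing these parameters might look like the following sketch; host, collection id, and parameter values are placeholders:

import httpx

response = httpx.get(
    "http://localhost:8000/collections/demo/map",
    params={
        "bbox": "12.0,45.0,13.5,46.5",  # minx,miny,maxx,maxy
        "bbox-crs": "EPSG:4326",
        "width": 512,
        "height": 256,
        "transparent": "true",
        "bgcolor": "0x000000",
    },
)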

parse_bbox(bbox=None)

Source code in stacture/stacture/api/maps.py
def parse_bbox(
    bbox: Annotated[str | None, Query(pattern=FLOAT_LIST_PATTERN)] = None,
) -> tuple[float, float, float, float] | None:
    if not bbox:
        return None
    bbox_parsed = tuple([float(part.strip()) for part in bbox.split(",")])
    if len(bbox_parsed) != 4:
        raise RequestValidationError(f"Invalid bounding box size: {len(bbox_parsed)}")
    return bbox_parsed

parse_bgcolor(bgcolor='0xFFFFFF')

Source code in stacture/stacture/api/maps.py
def parse_bgcolor(bgcolor: str = "0xFFFFFF") -> str:
    if not (match(HEX_COLOR_PATTERN, bgcolor) or bgcolor in colors):
        raise RequestValidationError(
            f"Invalid bgcolor {bgcolor}, use hex color value 0xRRGGBB or W3C color string"
        )

    return bgcolor
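
Accepted bgcolor forms, as a quick sketch (the last call raises):

parse_bgcolor("0x00FF00")   # hex value -> "0x00FF00"
parse_bgcolor("steelblue")  # W3C color name -> "steelblue"
parse_bgcolor("#00FF00")    # raises RequestValidationError

Likewise, parse_bbox("12.0,45.0,13.5,46.5") yields (12.0, 45.0, 13.5, 46.5) and parse_bbox(None) yields None.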

root_map(common)

Source code in stacture/stacture/api/maps.py
@router.get("/map")
def root_map(common: GetMapCommon):
    return {"query": common}

stacture.api.registry

APIRegistry()

Source code in stacture/stacture/api/registry.py
def __init__(self) -> None:
    self.apis: list[BaseAPI] = []
apis: list[BaseAPI] = [] instance-attribute
register(api)
Source code in stacture/stacture/api/registry.py
def register(self, api: BaseAPI):
    self.apis.append(api)

get_registry() cached

Source code in stacture/stacture/api/registry.py
@lru_cache
def get_registry() -> APIRegistry:
    config = get_config()
    registry = APIRegistry()

    from .core import CoreAPI
    from .coverages import CoveragesAPI
    from .maps import MapsAPI

    include_maps = config.apis.maps
    include_coverages = config.apis.coverages

    if include_maps or include_coverages:
        registry.register(CoreAPI())

    if include_maps:
        registry.register(MapsAPI())

    if include_coverages:
        registry.register(CoveragesAPI())

    return registry

stacture.interactions.utils

ASSET_HREF_RE = re.compile('#/assets/([^/]+)/?(?:bands/([0-9]*))?') module-attribute

LOGGER = structlog.get_logger() module-attribute

WarpOptions

Bases: TypedDict

bbox: tuple[float, float, float, float] | None instance-attribute
bbox_crs: str | None instance-attribute
crs: str | None instance-attribute
resolution_x: float | None instance-attribute
resolution_y: float | None instance-attribute
scale_x: float | None instance-attribute
scale_y: float | None instance-attribute
size_x: PositiveInt | None instance-attribute
size_y: PositiveInt | None instance-attribute
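
As a TypedDict, WarpOptions is a plain dict at runtime. The sketch below mirrors how create_render_image_workflow fills it further down; the values are placeholders:

warp_options: WarpOptions = {
    "size_x": 512,
    "size_y": 512,
    "bbox": (12.0, 45.0, 13.5, 46.5),
    "crs": "EPSG:4326",
}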

add_outlines(node, geometry, outline_color, width, height, bbox, crs)

Adds outline rendering by wrapping the node in a geometry rasterization node. The rasterization step is only added when both a geometry and an outline color are passed.

Parameters:

    node (dag.DatacubeNodes): the node to wrap with (required)
    geometry (dict | None): the geometry to rasterize as the outline (required)
    outline_color (str | None): the name of the color of the outlines (required)
    width (PositiveInt): rasterized image width (required)
    height (PositiveInt): rasterized image height (required)
    bbox (tuple[float, float, float, float]): bbox of the rasterized image (required)
    crs (str): crs of the rasterized image (required)

Returns:

    dag.DatacubeNodes: the potentially wrapped node

Source code in stacture/stacture/interactions/utils.py
def add_outlines(
    node: dag.DatacubeNodes,
    geometry: dict | None,
    outline_color: str | None,
    width: PositiveInt,
    height: PositiveInt,
    bbox: tuple[float, float, float, float],
    crs: str,
) -> dag.DatacubeNodes:
    """Adds outline rendering, by wrapping the node in a geometry rasterization node
    Only adds that rasterization step when, geometry and outline color are passed.

    Args:
        item (pystac.Item): the Item to get the outline geometry
        node (dag.Node): the node to wrap with
        outline_color (str | None): the name of the color of the outlines
        width (PositiveInt): rasterized image width
        height (PositiveInt): rasterized image height
        bbox (tuple[float, float, float, float]): bbox of the rasterized image
        crs (str): crs of the rasterized image

    Returns:
        dag.Node: the potentially wrapped node
    """
    outlines = None
    if outline_color and geometry:
        outlines = geojson_pydantic.LineString(
            type="LineString", coordinates=geometry["coordinates"][0]
        )

    if outlines:
        return dag.Merge(
            args=[
                dag.Rasterize(
                    arg=outlines,
                    size=(width, height),
                    bbox=bbox,
                    crs=crs,
                    burn_values=outline_color,
                ),
                node,
            ]
        )

    return node

create_coverage_render_workflow(items, coverage_config, bbox, bbox_crs, output_crs, size_x, size_y, properties, mediatype) async

Source code in stacture/stacture/interactions/utils.py
async def create_coverage_render_workflow(
    items: AsyncIterator[pystac.Item],
    coverage_config: CoverageProperties,
    bbox: tuple[float, float, float, float] | None,
    bbox_crs: str,
    output_crs: str | None,
    size_x: PositiveInt | None,
    size_y: PositiveInt | None,
    properties: list[str] | None,
    mediatype: str,
) -> CoverageWorkflow:
    # TODO create bbox from subsets and raise an error for now when we don't
    # have a full bbox available

    assets = coverage_config.default if properties is None else properties

    item_dags = []
    item_functions: dict[str, dag.DatacubeNodes] = {}
    async for item in items:
        asset_dags = []

        for asset_name in assets:
            try:
                asset = item.assets[asset_name]
            except KeyError as e:
                raise BadRequest(
                    f"Requested asset {asset_name} not present in stac item {item.id}, "
                    f"only: {', '.join(item.assets.keys())}"
                ) from e

            asset_dag, functions = create_dag_from_asset(
                item,
                asset,
                warp_options={
                    "size_x": size_x,
                    "size_y": size_y,
                    "bbox": bbox,
                    "bbox_crs": bbox_crs,
                },
            )
            asset_dags.append(asset_dag)
            item_functions.update(functions)

        if len(asset_dags) == 0:
            # TODO:
            pass
        elif len(asset_dags) == 1:
            item_dags.append(asset_dags[0])
        else:
            item_dags.append(dag.Stack(args=asset_dags))

    if not item_dags:
        # maybe this is actually 404?
        raise BadRequest("No matching data found.")

    try:
        # no merge if only 1 item
        (root,) = item_dags
    except ValueError:
        # TODO: for now merge the result, otherwise keep them separate
        root = dag.Merge(args=item_dags)

    # TODO: currently Warp only allows very specific arguments
    # if output_crs is not None:
    #     root = dag.Warp(arg=root, )

    return CoverageWorkflow(
        root=root,
        functions=item_functions,
        format=mediatype,
    )

create_dag_from_asset(item, asset, warp_options, band_index=None, resample_alg='nearest', preferred_alternate_asset_name=None)

Create a DAG for a given asset. Returns the DAG itself, as well as any functions the DAG uses.

Parameters:

    item (pystac.Item): The item to create the DAG for (required)
    asset (pystac.Asset): The Asset to create the DAG for (required)

Returns:

    tuple[dag.DatacubeNodes, dict[str, dag.DatacubeNodes]]: The created DAG, and a dictionary of all functions it uses.

Source code in stacture/stacture/interactions/utils.py
def create_dag_from_asset(
    item: pystac.Item,
    asset: pystac.Asset,
    warp_options: WarpOptions,
    band_index: int | list[int] | None = None,
    resample_alg: str = "nearest",
    preferred_alternate_asset_name: str | None = None,
) -> tuple[dag.DatacubeNodes, dict[str, dag.DatacubeNodes]]:
    """Create a DAG for a given asset. Returns the DAG itself, as well as any
    functions the DAG uses.

    Args:
        item (pystac.Item): The item to create the DAG for
        asset (pystac.Asset): The Asset to create the DAG for

    Returns:
        tuple[dag.Node, dict[str, dag.Node]]: The created DAG, and a dictionary
            of all functions it uses.
    """
    # create DAG for virtual assets
    if asset.roles and "virtual" in asset.roles:
        vrt_hrefs = asset.extra_fields["vrt:hrefs"]

        arguments = {}
        functions = {}
        for href in vrt_hrefs:
            ref_asset, ref_band_index = resolve_asset_href_and_band(item, href["href"])
            arg, arg_funcs = create_dag_from_asset(
                item,
                ref_asset,
                warp_options,
                ref_band_index,
                preferred_alternate_asset_name=preferred_alternate_asset_name,
            )
            arguments[href["key"]] = arg
            functions.update(arg_funcs)

        if asset.extra_fields.get("vrt:algorithm") == "band_arithmetic":
            # we have a band arithmetic expression, we create a function for
            # expression and return a call to it with the argument mapping
            algorithm_opts = asset.extra_fields["vrt:algorithm_opts"]
            return dag.Call(
                name=algorithm_opts["expression"],
                arguments=cast(dict[str, dag.DatacubeNodes | dag.VectorNodes], arguments),
            ), {
                algorithm_opts["expression"]: convert_expression(
                    algorithm_opts["expression"],
                    algorithm_opts.get("dtype"),
                ),
                **functions,
            }
        else:
            # we have a band composition, so we stack them.
            return dag.Stack(args=list(arguments.values())), functions
    else:
        # allow the usage of alternate assets
        # (https://github.com/stac-extensions/alternate-assets)
        location: str
        if (alternate := asset.extra_fields.get("alternate")) and (
            alt_asset := alternate.get(preferred_alternate_asset_name)
        ):
            location = alt_asset.get("href", asset.href)
        else:
            location = asset.href

        # TODO: add some metadata?
        node: dag.DatacubeNodes = dag.OpenDatacube(location=location)

        # optionally select a band
        if isinstance(band_index, int):
            node = dag.Select(arg=node, bands=[band_index])
        elif isinstance(band_index, list):
            node = dag.Select(arg=node, bands=band_index)

        # Wrap it in a Warp for output CRS/size/bounds
        node = dag.Warp(
            arg=node,
            **warp_options,
            resample_alg=dag.ResampleAlg(resample_alg),
        )
        return node, {}
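
The vrt:* fields checked above come from the virtual-assets STAC extension. A hypothetical virtual asset that this function would expand into a Call to a band-arithmetic function (all values illustrative):

virtual_asset = {
    "roles": ["virtual", "data"],
    "vrt:hrefs": [
        {"key": "red", "href": "#/assets/B04"},
        {"key": "nir", "href": "#/assets/B08"},
    ],
    "vrt:algorithm": "band_arithmetic",
    "vrt:algorithm_opts": {
        "expression": "(nir - red) / (nir + red)",
        "dtype": "float32",
    },
}

Without vrt:algorithm, the referenced bands would instead be stacked into a band composition.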

create_dag_from_render(item, render, warp_options, preferred_alternate_asset_name=None)

Create a DAG from a render extension

Parameters:

    item (pystac.Item): The Item to create the DAG for (required)
    render (dict): the render object (required)
    warp_options (WarpOptions): output size, bbox, and CRS options (required)
    preferred_alternate_asset_name (str | None): preferred alternate asset key, defaults to None

Returns:

    tuple[dag.DatacubeNodes, dict[str, dag.DatacubeNodes]]: the resulting DAG, and a dictionary of the functions it uses

Source code in stacture/stacture/interactions/utils.py
def create_dag_from_render(
    item: pystac.Item,
    render: dict,
    warp_options: WarpOptions,
    preferred_alternate_asset_name: str | None = None,
) -> tuple[dag.DatacubeNodes, dict[str, dag.DatacubeNodes]]:
    """Create a DAG from a render extension

    Args:
        item (pystac.Item): The Item to create the DAG for
        render (dict): the render object
        warp_options (WarpOptions): output size, bbox, and CRS options
        preferred_alternate_asset_name (str | None): preferred alternate asset key

    Returns:
        dag.Node: the resulting DAG
    """
    asset_names = render["assets"]
    assets = [item.assets[name] for name in asset_names]
    rescale = render.get("rescale")

    # get parent asset rescale if no rescale is provided
    if rescale is None:
        rescale = []
        for asset in assets:
            if asset_rescale := asset.extra_fields.get("vrt:algorithm_opts", {}).get(
                "rescale"
            ):
                rescale.extend(asset_rescale)

    if rescale and len(rescale) == 1:
        rescale *= len(assets)

    resampling: str | None = render.get("resampling")
    colormap_name: str | None = render.get("colormap_name")
    colormap: dict[str, str] | None = render.get("colormap")

    # TODO implement
    # nodata = render.get("nodata")
    # color_formula = render.get("color_formula")
    # minmax_zoom = render.get("minmax_zoom")

    dags: list[dag.DatacubeNodes] = []
    functions: dict[str, dag.DatacubeNodes] = {}
    node: dag.DatacubeNodes
    for asset in assets:
        root, funcs = create_dag_from_asset(
            item,
            asset,
            warp_options,
            resample_alg=resampling or "nearest",
            preferred_alternate_asset_name=preferred_alternate_asset_name,
        )
        dags.append(root)
        functions.update(funcs)

    classes = get_classes(assets[0], item)

    if expression := render.get("expression"):
        functions[expression] = convert_expression(expression)
        node = dag.Call(
            name=expression,
            arguments=cast(
                dict[str, dag.DatacubeNodes | dag.VectorNodes],
                dict(zip(asset_names, dags, strict=True)),
            ),
        )
    else:
        if rescale and not (colormap_name or colormap):
            if len(dags) == 1:
                node = dag.Rescale(
                    arg=dags[0], rescale=[(r[0], r[1], 0, 255) for r in rescale]
                )
            else:
                dags = [
                    dag.Rescale(arg=root, rescale=[resc])
                    for root, resc in zip(dags, rescale, strict=True)
                ]
                node = dag.Stack(args=dags) if len(dags) > 1 else dags[0]

        elif rescale:
            node = dag.Rescale(
                arg=dags[0], rescale=[(r[0], r[1], 0, 1.0) for r in rescale]
            )

        else:
            node = dag.Stack(args=dags) if len(dags) > 1 else dags[0]

    node = _create_dag_from_render_color_output(node, colormap, colormap_name, classes)

    return node, functions

create_render_image_workflow(items, map_config, style, width, height, bbox, crs, mediatype, background_color, transparent, preferred_alternate_asset_name) async

Builds an ImageWorkflow based on the rendering request inputs.

Source code in stacture/stacture/interactions/utils.py
async def create_render_image_workflow(
    items: AsyncIterator[pystac.Item],
    map_config: MapProperties,
    style: str,
    width: PositiveInt,
    height: PositiveInt,
    bbox: tuple[float, float, float, float],
    crs: str,
    mediatype: str,
    background_color: str,
    transparent: bool,
    preferred_alternate_asset_name: str | None,
) -> ImageWorkflow:
    """
    Builds an ImageWorkflow based on the rendering request inputs.
    """
    # decide which config is used
    # TODO: add exceptions and handle multiple assets from config

    style_config = map_config.styles.get(style)
    if not style_config:
        raise StyleNotDefined(style)
    asset_name = style_config.asset
    render_name = style_config.render

    item_dags: list = []

    if not transparent and background_color is not None:
        color = pydantic_extra_types.color.Color(background_color)
        background_colors = list(
            cast(
                tuple[float, float, float, float],
                (
                    *(c for c in color.as_rgb_tuple(alpha=False)),
                    255,
                ),
            )
        )
        item_dags.append(
            dag.CreateDatacube(
            init_values=background_colors,  # background color
                datatype=dag.DataType.Byte,
                bbox=bbox,
                size=(width, height),
                crs=crs,
            )
        )
    item_functions: dict[str, dag.DatacubeNodes] = {}
    warp_options: WarpOptions = {
        "size_x": width,
        "size_y": height,
        "bbox": bbox,
        "crs": crs,
    }
    async for item in items:
        item_dag: dag.DatacubeNodes | None = None
        item_funcs: dict[str, dag.DatacubeNodes] | None = None
        if render_name and (
            render := item.properties.get("renders", {}).get(render_name)
        ):
            item_dag, item_funcs = create_dag_from_render(
                item,
                render,
                warp_options,
                preferred_alternate_asset_name=preferred_alternate_asset_name,
            )

        elif asset_name and (asset := item.assets.get(asset_name)):
            item_dag, item_funcs = create_dag_from_asset(
                item,
                asset,
                warp_options,
                preferred_alternate_asset_name=preferred_alternate_asset_name,
            )
        else:
            ...
            # TODO default rendering/asset?

        if item_dag and style_config.outline_color:
            item_dag = add_outlines(
                item_dag,
                item.geometry,
                style_config.outline_color,
                width,
                height,
                bbox,
                crs,
            )

        if item_dag:
            item_dags.insert(0, item_dag)
        if item_funcs:
            item_functions.update(item_funcs)

    # produce a final merge of the item renderings
    root: dag.DatacubeNodes
    if item_dags:
        root = dag.Merge(args=item_dags)
    else:
        root = dag.CreateDatacube(
            init_values=[0, 0, 0, 0],
            datatype=dag.DataType.Byte,
            bbox=bbox,
            size=(width, height),
            crs=crs,
        )

    return ImageWorkflow(root=root, format=mediatype, functions=item_functions)

get_classes(asset, item)

Source code in stacture/stacture/interactions/utils.py
def get_classes(asset: pystac.Asset, item: pystac.Item) -> dict | None:
    raster_bands = asset.extra_fields.get("raster:bands") or item.extra_fields.get(
        "raster:bands"
    )
    if raster_bands and len(raster_bands) == 1:
        return raster_bands[0].get("classification:classes")
    return None

resolve_asset_href_and_band(item, asset_href)

Resolve an asset href as defined in the virtual-asset extension (https://github.com/stac-extensions/virtual-assets#object-referencing-using-vrthrefs).

Parameters:

    item (pystac.Item): The Item to resolve the href on (required)
    asset_href (str): The href (required)

Raises:

    ValueError: When the href is not local to the item.

Returns:

    tuple[pystac.Asset, int | None]: the resolved asset and an optional band index

Source code in stacture/stacture/interactions/utils.py
def resolve_asset_href_and_band(
    item: pystac.Item, asset_href: str
) -> tuple[pystac.Asset, int | None]:
    """Resolve an asset href as defined in the
    [virtual-asset extension](https://github.com/stac-extensions/virtual-assets#object-referencing-using-vrthrefs)

    Args:
        item (pystac.Item): The Item to resolve the href on.
        asset_href (str): The href

    Raises:
        ValueError: When the href is not local to the item.

    Returns:
        tuple[pystac.Asset, int | None]: the resolved asset and an optional band index
    """
    # returns resolved asset and band index
    if self_href := item.get_self_href():
        asset_href = asset_href.removeprefix(self_href)

    if (match := ASSET_HREF_RE.match(asset_href)) is None:
        raise ValueError(f"Invalid asset href {asset_href}")

    asset_name, _band = match.groups()
    band = int(_band) if _band is not None else _band

    return (item.assets[asset_name], band)
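
The href forms ASSET_HREF_RE accepts, checked directly against the pattern (asset names illustrative):

assert ASSET_HREF_RE.match("#/assets/B04").groups() == ("B04", None)
assert ASSET_HREF_RE.match("#/assets/B04/bands/1").groups() == ("B04", "1")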

send_terravis_coverage_request(workflow, url) async

Source code in stacture/stacture/interactions/utils.py
async def send_terravis_coverage_request(workflow, url):
    return await _terravis_request(terravis.client.coverage, workflow, url)

send_terravis_image_request(workflow, url) async

Source code in stacture/stacture/interactions/utils.py
async def send_terravis_image_request(workflow, url):
    return await _terravis_request(terravis.client.image, workflow, url)

stacture.source.fs.s3

open_from_credentials(fs_config)

Source code in stacture/stacture/source/fs/s3.py
def open_from_credentials(fs_config: config.S3Credentials) -> S3FileSystem:
    return S3FileSystem(
        key=fs_config.access_key,
        secret=fs_config.secret_key,
        endpoint_url=fs_config.endpoint_url,
        asynchronous=True,
    )

open_from_profile(fs_config)

Source code in stacture/stacture/source/fs/s3.py
def open_from_profile(fs_config: config.S3Profile) -> S3FileSystem:
    return S3FileSystem(
        profile=fs_config.profile,
        asynchronous=True,
    )

open_fs(fs_config) async

Source code in stacture/stacture/source/fs/s3.py
@contextlib.asynccontextmanager
async def open_fs(
    fs_config: config.S3Profile | config.S3Credentials,
) -> AsyncIterator[fsspec.asyn.AsyncFileSystem]:
    if isinstance(fs_config, config.S3Profile):
        fs = open_from_profile(fs_config)
    else:
        fs = open_from_credentials(fs_config)

    session = await fs.set_session()
    yield fs
    await session.close()
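
A usage sketch with placeholder profile and bucket names; note that fsspec's asynchronous API uses the underscore-prefixed coroutine methods:

async def list_bucket() -> list:
    async with open_fs(config.S3Profile(profile="default")) as fs:
        return await fs._ls("my-bucket")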

stacture.source.base

BBox(minx, maxx, miny, maxy, crs) dataclass

crs: str instance-attribute
maxx: float instance-attribute
maxy: float instance-attribute
minx: float instance-attribute
miny: float instance-attribute
as_tuple()
Source code in stacture/stacture/source/base.py
def as_tuple(self):
    return (self.minx, self.miny, self.maxx, self.maxy)

BaseSource(extra=None, preferred_alternate_asset_name=None)

Bases: ABC

Source code in stacture/stacture/source/base.py
def __init__(
    self, extra: dict | None = None, preferred_alternate_asset_name: str | None = None
):
    self.extra = extra
    self.preferred_alternate_asset_name = preferred_alternate_asset_name
extra = extra instance-attribute
preferred_alternate_asset_name = preferred_alternate_asset_name instance-attribute
extend_item(raw_item)
Source code in stacture/stacture/source/base.py
def extend_item(self, raw_item: dict) -> dict:
    if self.extra:
        return deepmerge(raw_item, self.extra)
    return raw_item
get_collection() abstractmethod async
Source code in stacture/stacture/source/base.py
@abstractmethod
async def get_collection(self) -> pystac.Collection: ...
get_item(identifier) abstractmethod async
Source code in stacture/stacture/source/base.py
@abstractmethod
async def get_item(self, identifier: str) -> pystac.Item | None: ...
search_items(query) abstractmethod
Source code in stacture/stacture/source/base.py
@abstractmethod
def search_items(self, query: Query) -> AsyncIterator[pystac.Item]: ...

Query(bbox=None, time=None, filter=None, limit=None, offset=None) dataclass

bbox: BBox | None = None class-attribute instance-attribute
filter: Node | None = None class-attribute instance-attribute
limit: int | None = None class-attribute instance-attribute
offset: int | None = None class-attribute instance-attribute
time: TimeInterval | None = None class-attribute instance-attribute

TimeInterval(start, end) dataclass

end: datetime | None instance-attribute
start: datetime | None instance-attribute

deepmerge(destination, source)

>>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } }
>>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } }
>>> deepmerge(a, b) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
True

Source code in stacture/stacture/source/base.py
def deepmerge(destination: dict, source: dict) -> dict:
    """
    >>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } }
    >>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } }
    >>> deepmerge(a, b) == { 'first' : { 'all_rows' :
    ...     { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
    True
    """
    for key, value in source.items():
        if isinstance(value, dict):
            # get node or create one
            node = destination.setdefault(key, {})
            deepmerge(node, value)
        else:
            destination[key] = value

    return destination

stacture.source.stac_api

LOGGER = structlog.get_logger() module-attribute

HTTPMethod

Bases: Enum

CONNECT = 'CONNECT' class-attribute instance-attribute
DELETE = 'DELETE' class-attribute instance-attribute
GET = 'GET' class-attribute instance-attribute
HEAD = 'HEAD' class-attribute instance-attribute
OPTIONS = 'OPTIONS' class-attribute instance-attribute
PATCH = 'PATCH' class-attribute instance-attribute
POST = 'POST' class-attribute instance-attribute
PUT = 'PUT' class-attribute instance-attribute
TRACE = 'TRACE' class-attribute instance-attribute

STACAPISource(href, collection, filter_=None, method=HTTPMethod.GET, extra=None, preferred_alternate_asset_name=None)

Bases: BaseSource

Source code in stacture/stacture/source/stac_api.py
def __init__(
    self,
    href: str,
    collection: str,
    filter_: dict | None = None,
    method: HTTPMethod = HTTPMethod.GET,
    extra: dict | None = None,
    preferred_alternate_asset_name: str | None = None,
):
    super().__init__(extra, preferred_alternate_asset_name)
    self.api_url = href
    self.collection = collection
    self.filter = filter_
    self.method = method

    self._collection: pystac.Collection | None = None
api_url = href instance-attribute
collection = collection instance-attribute
filter = filter_ instance-attribute
method = method instance-attribute
build_stac_api_params(query)

builds stac parameters from a query

Parameters:

    query (Query): Query object (required)

Returns:

    dict: stac parameters

Source code in stacture/stacture/source/stac_api.py
def build_stac_api_params(self, query: Query) -> dict:
    """builds stac parameters from a query

    Args:
        query (Query): Query object

    Returns:
        dict: stac parameters
    """
    params = {}

    if query.limit is not None:
        params["limit"] = str(query.limit)

    if query.bbox is not None:
        bbox = query.bbox.crs or "EPSG:4326"
        transformer = Transformer.from_crs(bbox, "EPSG:4326", always_xy=True)
        x_min_target, y_min_target = transformer.transform(
            query.bbox.minx,
            query.bbox.miny,
        )
        x_max_target, y_max_target = transformer.transform(
            query.bbox.maxx,
            query.bbox.maxy,
        )
        params["bbox"] = ",".join(
            [
                str(x_min_target),
                str(y_min_target),
                str(x_max_target),
                str(y_max_target),
            ]
        )

    if query.time:
        start = query.time.start
        end = query.time.end

        if start == end and start is not None:
            params["datetime"] = start.isoformat()
        else:
            start_str = start.isoformat() if start else ".."
            end_str = end.isoformat() if end else ".."
            params["datetime"] = f"{start_str}/{end_str}"

    return params
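
For example, a bbox-and-time query translates roughly as follows (sketch; source is an instantiated STACAPISource, and Query, BBox, and TimeInterval are the types from stacture.source.base):

from datetime import UTC, datetime

query = Query(
    bbox=BBox(minx=12.0, miny=45.0, maxx=13.5, maxy=46.5, crs="EPSG:4326"),
    time=TimeInterval(start=datetime(2024, 1, 1, tzinfo=UTC), end=None),
    limit=10,
)
source.build_stac_api_params(query)
# {'limit': '10', 'bbox': '12.0,45.0,13.5,46.5', 'datetime': '2024-01-01T00:00:00+00:00/..'}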
get_collection() async
Source code in stacture/stacture/source/stac_api.py
async def get_collection(self) -> pystac.Collection:
    if not self._collection:
        async with httpx.AsyncClient() as client:
            url = urljoin(self.api_url, f"collections/{self.collection}")
            response = await self.request(client, "GET", url)
            response.raise_for_status()
            self._collection = pystac.Collection.from_dict(response.json(), url)

    return self._collection
get_item(identifier) async
Source code in stacture/stacture/source/stac_api.py
async def get_item(self, identifier: str) -> pystac.Item | None:
    async with httpx.AsyncClient() as client:
        if self.collection:
            url = urljoin(
                self.api_url,
                f"collections/{self.collection}/items/{identifier}",
            )

            try:
                response = await self.request(
                    client,
                    self.method.value,
                    url,
                )
            except httpx.HTTPStatusError as e:
                if e.response.status_code == HTTPStatus.NOT_FOUND:
                    return None
                else:
                    raise
            else:
                return pystac.Item.from_dict(self.extend_item(response.json()))

        else:
            url = urljoin(self.api_url, "search")
            params = {"ids": identifier}

            response = await self.request(
                client,
                self.method.value,
                url,
                params=params,
            )
            feature_collection = response.json()
            if feature_collection["features"]:
                return pystac.Item.from_dict(
                    self.extend_item(feature_collection["features"][0])
                )
            else:
                return None
request(client, method, url, params=None) async
Source code in stacture/stacture/source/stac_api.py
async def request(
    self, client: httpx.AsyncClient, method: str, url: str, params: dict | None = None
):
    LOGGER.debug("Sending STAC API request", method=method, url=url, params=params)
    response = await client.request(method, url, params=params)
    response.raise_for_status()
    return response
search_items(query) async
Source code in stacture/stacture/source/stac_api.py
async def search_items(self, query: Query) -> AsyncIterator[pystac.Item]:
    async with httpx.AsyncClient() as client:
        url = urljoin(self.api_url, "search")
        if self.collection:
            url = urljoin(self.api_url, f"collections/{self.collection}/items")

        params = self.build_stac_api_params(query)

        response = await self.request(client, self.method.value, url, params=params)

        feature_collection = response.json()

        total_count = 0
        for feature in feature_collection["features"]:
            if query.limit is not None and total_count >= query.limit:
                break
            yield pystac.Item.from_dict(self.extend_item(feature))
            total_count += 1

        while query.limit is not None and total_count < query.limit:
            if not (next_link := get_next_link(feature_collection["links"])):
                break

            response = await self.request(
                client,
                next_link.get("method", "GET"),
                next_link["href"],
            )
            feature_collection = response.json()

            for feature in feature_collection["features"]:
                if query.limit is not None and total_count >= query.limit:
                    break
                yield pystac.Item.from_dict(self.extend_item(feature))
                total_count += 1

get_next_link(links)

Source code in stacture/stacture/source/stac_api.py
def get_next_link(links: list[dict]) -> dict | None:
    for link in links:
        if link.get("rel") == "next":
            return link
    return None
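
A pagination links list the helper above would match (URLs illustrative):

links = [
    {"rel": "self", "href": "https://stac.example.com/search"},
    {"rel": "next", "href": "https://stac.example.com/search?token=abc", "method": "GET"},
]
get_next_link(links)
# -> {'rel': 'next', 'href': 'https://stac.example.com/search?token=abc', 'method': 'GET'}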

stacture.source.stac_item

STACItemSource(href, fs, preferred_alternate_asset_name=None)

Bases: BaseSource

Source code in stacture/stacture/source/stac_item.py
def __init__(
    self,
    href: str,
    fs: fsspec.asyn.AsyncFileSystem,
    preferred_alternate_asset_name: str | None = None,
):
    super().__init__(preferred_alternate_asset_name=preferred_alternate_asset_name)
    self.href = href
    self.fs = fs
fs = fs instance-attribute
href = href instance-attribute
get_collection() async
Source code in stacture/stacture/source/stac_item.py
async def get_collection(self) -> pystac.Collection:
    raise NotImplementedError
get_item(identifier) async
Source code in stacture/stacture/source/stac_item.py
async def get_item(self, identifier: str) -> pystac.Item | None:
    item = await self._get_item()
    return item if item.id == identifier else None
search_items(query) async
Source code in stacture/stacture/source/stac_item.py
async def search_items(self, query: Query) -> AsyncIterator[pystac.Item]:
    # TODO: check if item matches query
    item = await self._get_item()
    yield item

stacture.ows.base

BaseOWS

request(name, default_version=None)

Source code in stacture/stacture/ows/base.py
def request(name: str, default_version: Version | None = None):
    @wraps
    def inner(func: Any) -> Callable:
        func._OWS_SERVICE_REQUEST = name
        func._OWS_DEFAULT_VERSION = default_version
        return func

    return inner

service(name, versions)

Source code in stacture/stacture/ows/base.py
def service(name: str, versions: set[Version]):
    @wraps
    def inner(cls: type[BaseOWS]) -> type[BaseOWS]:
        cls._OWS_SERVICE_NAME = name
        cls._OWS_SERVICE_VERSIONS = versions
        return cls

    return inner
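
Intended use of the two decorators, as a sketch inferred from the attributes they set; the service name, versions, and handler are hypothetical:

@service("WMS", versions={Version(1, 1, 1), Version(1, 3, 0)})
class ExampleWMS(BaseOWS):
    @request("GetCapabilities", default_version=Version(1, 3, 0))
    async def get_capabilities(self, config, url):
        ...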

stacture.ows.registry

OWSRegistry()

Source code in stacture/stacture/ows/registry.py
def __init__(self):
    self.ows = {}
ows = {} instance-attribute
get_handler(service, version, request)
Source code in stacture/stacture/ows/registry.py
def get_handler(self, service: str, version: Version | None, request: str):
    versions = self.ows.get(service)
    # TODO: print only added to fix linting
    print(versions)
register(ows)
Source code in stacture/stacture/ows/registry.py
def register(self, ows: BaseOWS):
    service_reg = self.ows.setdefault(ows._OWS_SERVICE_NAME.lower(), {})
    for version in ows._OWS_SERVICE_VERSIONS:
        service_reg[version] = ows

stacture.ows.router

router = APIRouter() module-attribute

wms = WebMapService() module-attribute

ows_get(request, request_params, config) async

Source code in stacture/stacture/ows/router.py
@router.get("/ows")
async def ows_get(
    request: Request,
    request_params: Annotated[tuple[OWSRequest, dict], Depends(ows_parameters)],
    config: Annotated[Config, Depends(get_config)],
):
    ows_request, params = request_params

    match ows_request:
        case OWSRequest.GETCAPABILITIES:
            url = str(request.url.replace(query=None))
            return await wms.get_capabilities(
                config, url, params["version"], params.get("acceptversions")
            )
        case OWSRequest.GETMAP:
            return await wms.get_map(
                config,
                layers=params["layers"],
                styles=params["styles"],
                bbox=params["bbox"],
                width=params["width"],
                height=params["height"],
                _format=params["format"],
                transparent=params["transparent"],
                bgcolor=params["bgcolor"],
                exceptions=params["exceptions"],
                time=params["time"],
            )
        case OWSRequest.GETFEATUREINFO:
            raise NotImplementedError
        case OWSRequest.DESCRIBELAYER:
            raise NotImplementedError
        case OWSRequest.GETLEGENDGRAPHIC:
            raise NotImplementedError
        case _:
            raise HTTPException(status_code=400, detail="Unsupported request")

stacture.ows.wms

LOGGER = structlog.get_logger() module-attribute

WebMapService

VERSIONS = (Version(1, 1, 0), Version(1, 1, 1), Version(1, 3, 0)) class-attribute instance-attribute
get_capabilities(config, url, version, acceptversions=None) async
Source code in stacture/stacture/ows/wms.py
async def get_capabilities(
    self,
    config: Config,
    url: str,
    version: str,
    acceptversions: str | None = None,
) -> Response:
    version_param = version
    if version_param is None or version_param not in self.VERSIONS:
        version = self.version_negotiation(acceptversions, self.VERSIONS)
    else:
        version = Version.from_str(version_param)

    available_projections = [
        f"EPSG:{crs}" for crs in config.apis.wms_config.projections
    ]
    if "EPSG:4326" in available_projections:
        available_projections.insert(0, "CRS:84")

    layers = []
    for collection_id, collection_conf in config.collections.items():
        map_conf = collection_conf.map
        if map_conf is None:
            continue

        source = get_source(collection_conf.source)
        # TODO: parallelize this here
        collection = await source.get_collection()

        styles = [Style(style_name, style_name, "") for style_name in map_conf.styles]
        # prepare time dimension from collection temporal extent
        temporal_extent = collection.extent.temporal.intervals
        start_datetime = (
            temporal_extent[0][0] or datetime(1970, 1, 1, 0, 0, 0, tzinfo=UTC)
        ).strftime("%Y-%m-%dT%H:%M:%SZ")
        end_datetime = (temporal_extent[0][1] or datetime.now(UTC)).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )

        dimensions = [
            Dimension(
                name="time",
                units="ISO8601",
                default=end_datetime,
                values=[f"{start_datetime}/{end_datetime}/PT1S"],
            )
        ]

        layers.append(
            Layer(
                title=collection.title,
                name=collection_id,
                abstract=collection.description,
                keywords=collection.keywords,
                wgs84_bounding_box=WGS84BoundingBox(
                    collection.extent.spatial.bboxes[0]
                ),
                dimensions=dimensions,
                styles=styles,
            )
        )

    capabilities = ServiceCapabilities.with_defaults(
        url,
        ["image/png", "image/jpeg"],
        layer=Layer(
            title="stacture root",
            name="root",
            abstract="",
            keywords=[],
            crss=available_projections,
            wgs84_bounding_box=[],
            bounding_boxes=[],
            dimensions=[],
            attribution="",
            authority_urls={},
            identifiers={},
            metadata_urls=[],
            data_urls=[],
            feature_list_urls=[],
            styles=[],
            min_scale_denominator=None,
            max_scale_denominator=None,
            layers=layers,
            queryable=True,
            cascaded=None,
            opaque=False,
        ),
        service_type_versions=[str(version)],
    )
    result = encoders_v13.xml_encode_capabilities(capabilities)
    return Response(result.value, media_type=result.content_type)
get_map(config, layers, styles, bbox, width, height, _format, transparent=False, bgcolor='0xFFFFFF', time=None, exceptions='XML') async
Source code in stacture/stacture/ows/wms.py
async def get_map(
    self,
    config: Config,
    layers: list[str],
    styles: list[str],
    bbox: BBox,
    width: int,
    height: int,
    _format: str,
    transparent: bool = False,
    bgcolor: str = "0xFFFFFF",
    time: TimeInterval | None = None,
    exceptions: str = "XML",
) -> Response:
    def serialize_error(error_message: str, code: str) -> Response:
        if exceptions in ("inimage", "blank"):
            mediatype = _format
            image_encoder = WMS13ExceptionImageEncoder(
                width,
                height,
                _format,
                bgcolor,
                exceptions == "blank",
            )

            le_image = image_encoder.encode_exception(error_message)
            content = image_encoder.serialize(le_image)
        else:
            xml_encoder = WMS13ExceptionXMLEncoder()
            le_xml = xml_encoder.encode_exception(error_message, code)
            content = xml_encoder.serialize(le_xml)

            mediatype = xml_encoder.content_type

        return Response(
            content,
            headers={"Content-Type": mediatype},
        )

    try:
        allowed_projections = config.apis.wms_config.projections
        srid = int(bbox.crs.replace("EPSG:", ""))
        if srid == -1 or (allowed_projections and srid not in allowed_projections):
            raise InvalidCRS(srid, "CRS")
        collection_id = layers[0]

        if not (collection_conf := config.collections.get(collection_id)):
            for collection_conf in config.collections.values():
                if any(
                    re.match(pattern, collection_id)
                    for pattern in collection_conf.id_patterns
                ):
                    break
            else:
                raise LayerNotDefined(collection_id)
        source = get_source(collection_conf.source)

        query = Query(
            bbox=bbox,
            time=time,
            limit=10,
        )
        items = source.search_items(query)
        workflow = await create_render_image_workflow(
            items=items,
            map_config=collection_conf.map,
            style=(styles[0] if styles else collection_conf.map.default_style),
            width=width,
            height=height,
            bbox=bbox.as_tuple(),
            crs=bbox.crs,
            mediatype=_format,
            background_color=bgcolor,
            preferred_alternate_asset_name=source.preferred_alternate_asset_name,
            transparent=transparent,
        )
        LOGGER.debug(f"Final created image workflow: {workflow}")

    except WMSException as e:
        LOGGER.info(e)
        return serialize_error(error_message=str(e), code=e.code)

    try:
        response_content, response_headers = await send_terravis_image_request(
            workflow,
            config.terravis_url,
        )
        return Response(
            response_content,
            headers={"Content-Type": response_headers["Content-Type"]},
        )
    except StactureHTTPException as e:
        LOGGER.exception(e)
        return serialize_error(error_message=str(e), code=e.error_type)
    except HTTPError as e:
        LOGGER.exception(e)
        return serialize_error(error_message=str(e), code=type(e).__name__)
version_negotiation(versions, accepted_versions) staticmethod
Source code in stacture/stacture/ows/wms.py
@staticmethod
def version_negotiation(versions: str | None, accepted_versions: Iterable[Version]):
    server_versions = sorted(accepted_versions, reverse=True)
    if versions is None:
        return next(iter(server_versions))
    versions_list = [Version.from_str(version) for version in versions.split(",")]
    client_versions = sorted(versions_list, reverse=True)  # highest to lowest
    highest_compatible_server_version = None
    for client_version in client_versions:
        for server_version in server_versions:
            if client_version == server_version:
                return client_version
            if client_version > server_version:
                highest_compatible_server_version = server_version
            if (
                client_version < server_version
                and client_version != client_versions[0]
            ):
                return highest_compatible_server_version
    raise VersionNegotiationException()
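
Negotiation examples against the WMS versions above (sketch):

WebMapService.version_negotiation(None, WebMapService.VERSIONS)
# -> Version(1, 3, 0), the highest server version

WebMapService.version_negotiation("1.1.1,1.0.0", WebMapService.VERSIONS)
# -> Version(1, 1, 1), the highest version both sides support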

Terravis

terravis.client

LOGGER = structlog.get_logger() module-attribute

coverage(endpoint, workflow) async

Source code in stacture/terravis/client.py
async def coverage(
    endpoint: str, workflow: CoverageWorkflow
) -> tuple[bytes, httpx.Headers]:
    headers = _prepare_headers()
    async with httpx.AsyncClient(headers=headers) as client:
        url = urljoin(endpoint, "/coverage/")
        response = await client.post(url, json=workflow.model_dump(), timeout=None)
        response.raise_for_status()
        # TODO: maybe return complete response if more data is needed?
        return await response.aread(), response.headers

image(endpoint, workflow) async

Source code in stacture/terravis/client.py
async def image(endpoint: str, workflow: ImageWorkflow) -> tuple[bytes, httpx.Headers]:
    headers = _prepare_headers()
    async with httpx.AsyncClient(headers=headers) as client:
        url = urljoin(endpoint, "/image/")
        with log_time(LOGGER, "Sending request to /image", headers=headers):
            response = await client.post(url, json=workflow.model_dump(), timeout=None)
        response.raise_for_status()
        # TODO: maybe return complete response if more data is needed?
        return await response.aread(), response.headers

statistics(endpoint, workflow) async

Source code in stacture/terravis/client.py
async def statistics(endpoint: str, workflow: StatisticsWorkflow) -> dict:
    headers = _prepare_headers()
    async with httpx.AsyncClient(headers=headers) as client:
        url = urljoin(endpoint, "/statistics/")
        response = await client.post(url, json=workflow.model_dump(), timeout=None)
        response.raise_for_status()
        return json.loads(await response.aread())
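
All three helpers share the same POST-and-read pattern; a minimal usage sketch (endpoint URL and workflow contents are hypothetical, import paths assumed):

import asyncio

from terravis.client import image
from terravis.workflow import ImageWorkflow

workflow = ImageWorkflow.model_validate(
    {
        "format": "image/png",
        "root": {"op": "OpenDatacube", "location": "s3://bucket/item.tif"},
    }
)

async def main() -> None:
    content, headers = await image("http://terravis:8080", workflow)
    print(headers.get("Content-Type"), len(content))

asyncio.run(main())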

terravis.utils

BadRequest(*args, **kwargs)

Bases: StactureHTTPException

Convenience Stacture HTTPException for 400 Bad Request errors

Source code in stacture/terravis/utils.py
def __init__(self, *args, **kwargs):
    super().__init__(HTTPStatus.BAD_REQUEST, *args, **kwargs)

HTTPNotFound(*args, **kwargs)

Bases: StactureHTTPException

Convenience Stacture HTTPException for 404 Not Found errors

Source code in stacture/terravis/utils.py
def __init__(self, *args, **kwargs):
    super().__init__(HTTPStatus.NOT_FOUND, *args, **kwargs)

HttpNotImplemented(*args, **kwargs)

Bases: StactureHTTPException

Convenience Stacture HTTPException for 501 Not Implemented errors

Source code in stacture/terravis/utils.py
def __init__(self, *args, **kwargs):
    super().__init__(HTTPStatus.NOT_IMPLEMENTED, *args, **kwargs)

StactureHTTPException(*args, error_type=None, **kwargs)

Bases: HTTPException

Extension of FastAPI's HTTPException with an error_type

This type is serialized to the caller to provide more context, which can be used to decide whether an error is the fault of the caller or of the caller's caller.

Source code in stacture/terravis/utils.py
def __init__(self, *args, error_type=None, **kwargs):
    super().__init__(*args, **kwargs)
    self.error_type = error_type
error_type = error_type instance-attribute
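
A sketch of raising one with an explicit error_type (the value here is illustrative) so the serialized error tells the caller whom to blame:

from http import HTTPStatus

raise StactureHTTPException(
    HTTPStatus.BAD_GATEWAY, "upstream request failed", error_type="UpstreamError"
)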

gather_dict(values) async

Source code in stacture/terravis/utils.py
async def gather_dict(values: dict[str, Awaitable]) -> dict:
    keys = values.keys()
    result_values = await asyncio.gather(*values.values())
    return dict(zip(keys, result_values, strict=False))
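
Usage sketch: awaiting a dict of awaitables while keeping keys aligned with their results:

import asyncio

async def main() -> None:
    results = await gather_dict(
        {
            "a": asyncio.sleep(0, result=1),
            "b": asyncio.sleep(0, result=2),
        }
    )
    assert results == {"a": 1, "b": 2}

asyncio.run(main())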

terravis.workflow

BaseWorkflow

Bases: BaseModel

credentials_profile: Annotated[str | None, StringConstraints(pattern='^[^\\./:]+$')] = None class-attribute instance-attribute
functions: dict[str, DatacubeNodes] = Field(default_factory=dict) class-attribute instance-attribute
root: DatacubeNodes instance-attribute
check_circular_references()
Source code in stacture/terravis/workflow.py
@model_validator(mode="after")
def check_circular_references(self) -> "BaseWorkflow":
    sorter: TopologicalSorter = TopologicalSorter()
    sorter.add(None, *find_call_names(self.root))
    for name, node in self.functions.items():
        sorter.add(name, *find_call_names(node))
    # will raise a CycleError if circular references detected
    list(sorter.static_order())
    return self
check_function_and_args()
Source code in stacture/terravis/workflow.py
@model_validator(mode="after")
def check_function_and_args(self) -> "BaseWorkflow":
    function_argnames = {
        name: set(find_args(node)) for name, node in self.functions.items()
    }

    # TODO also validate argument types

    for node in chain([self.root], self.functions.values()):
        for name, args in find_calls_and_args(node):
            if name not in self.functions:
                raise ValueError(f"No such function {name}")

            args = set(args)
            function_args = function_argnames[name]
            unknown_args = args - function_args
            missing_args = function_args - args
            if unknown_args:
                raise ValueError(
                    f"Unknown arguments {', '.join(unknown_args)} for "
                    f"function {name}"
                )
            if missing_args:
                raise ValueError(
                    f"Missing arguments {', '.join(missing_args)} for "
                    f"function {name}"
                )
    return self
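
A minimal payload sketch that passes both validators (node shapes as defined in terravis.dag below; the location is hypothetical). The root Call references function "stretch", whose single DatacubeArg "data" is supplied in arguments:

workflow = ImageWorkflow.model_validate(
    {
        "format": "image/png",
        "functions": {
            "stretch": {
                "op": "Rescale",
                "arg": {"op": "DatacubeArg", "name": "data"},
                "rescale": [[0.0, 255.0, 0.0, 1.0]],
            },
        },
        "root": {
            "op": "Call",
            "name": "stretch",
            "arguments": {
                "data": {"op": "OpenDatacube", "location": "s3://bucket/item.tif"}
            },
        },
    }
)

Omitting the "data" key in arguments raises "Missing arguments data for function stretch"; a Call naming an undefined function raises "No such function ...".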

CoverageWorkflow

Bases: BaseWorkflow

encoding_options: dict[str, str] = Field(default_factory=dict) class-attribute instance-attribute
format: str instance-attribute

ImageWorkflow

Bases: BaseWorkflow

encoding_options: dict[str, str] = Field(default_factory=dict) class-attribute instance-attribute
format: str instance-attribute

StatisticsWorkflow

Bases: BaseWorkflow

WorkflowType

Bases: str, Enum

COVERAGE = 'coverage' class-attribute instance-attribute
IMAGE = 'image' class-attribute instance-attribute
STATISTICS = 'statistics' class-attribute instance-attribute

find_args(node)

Source code in stacture/terravis/workflow.py
def find_args(node: Node):
    yield from descend_tree(
        node, (DatacubeArg, VectorArg, ScalarArg), lambda node: node.name
    )

find_call_names(node)

Source code in stacture/terravis/workflow.py
def find_call_names(node: Node):
    yield from descend_tree(node, Call, lambda node: node.name)

find_calls_and_args(node)

Source code in stacture/terravis/workflow.py
def find_calls_and_args(node: Node):
    yield from descend_tree(node, Call, lambda node: (node.name, node.arguments.keys()))

terravis.runner

LOGGER = structlog.get_logger() module-attribute

AsyncThreadedWorkflowRunner(engine, loop=None)

Bases: WorkflowRunner

Source code in stacture/terravis/runner.py
def __init__(self, engine: Engine, loop: asyncio.AbstractEventLoop | None = None):
    super().__init__(engine)
    self.loop = loop or asyncio.new_event_loop()
loop = loop or asyncio.new_event_loop() instance-attribute
execute_workflow(workflow)
Source code in stacture/terravis/runner.py
def execute_workflow(self, workflow: BaseWorkflow):
    # return self.run_node(workflow.root, RunContext(workflow.functions))
    with log_time(LOGGER, "Running workflow", level="info"):
        return self.loop.run_until_complete(
            self._run_node(workflow.root, RunContext(workflow.functions))
        )

RunContext(functions, arguments=dict(), parent=None, call_parent=None, path='$.workflow') dataclass

arguments: dict[str, Any] = field(default_factory=dict) class-attribute instance-attribute
call_parent: Union[RunContext, None] = None class-attribute instance-attribute
full_path: str property
full_paths: list[str] property
functions: Mapping[str, Node] instance-attribute
parent: Union[RunContext, None] = None class-attribute instance-attribute
path: str = '$.workflow' class-attribute instance-attribute
make_call_child(function_name, arguments)
Source code in stacture/terravis/runner.py
def make_call_child(self, function_name: str, arguments: dict[str, Any]):
    return dataclasses.replace(
        self,
        path=f'$.functions["{function_name}"]',
        arguments=arguments,
        parent=None,
        call_parent=self,
    )
make_child(path)
Source code in stacture/terravis/runner.py
def make_child(self, path):
    return dataclasses.replace(self, path=path, parent=self, call_parent=None)

WorkflowRunner(engine)

Source code in stacture/terravis/runner.py
def __init__(self, engine: Engine):
    self.engine = engine
ADOPT_TYPES = (Node, Scalar, *get_args(Geometry)) class-attribute instance-attribute
engine = engine instance-attribute
execute_workflow(workflow)
Source code in stacture/terravis/runner.py
def execute_workflow(self, workflow: BaseWorkflow):
    with log_time(LOGGER, "Running workflow", level="info"):
        return self.run_node(workflow.root, RunContext(workflow.functions))
run_node(node, context)
Source code in stacture/terravis/runner.py
def run_node(self, node: Node, context: RunContext):
    sub_values = {
        name: self._adopt_node(sub_node, context.make_child(f".{name}"))
        for name, sub_node in vars(node).items()
        if isinstance(sub_node, self.ADOPT_TYPES)
    }

    # handle list[Node] as well
    sub_values.update(
        {
            name: [
                self._adopt_node(item, context.make_child(f".{name}[{i}]"))
                for i, item in enumerate(items)
            ]
            for name, items in vars(node).items()
            if isinstance(items, list)
            and all(isinstance(item, self.ADOPT_TYPES) for item in items)
        }
    )
    # handle dict[str, Node]
    sub_values.update(
        {
            name: {
                key: self._adopt_node(item, context.make_child(f".{name}.{key}"))
                for key, item in items.items()
            }
            for name, items in vars(node).items()
            if isinstance(items, dict)
            and all(isinstance(item, self.ADOPT_TYPES) for item in items.values())
        }
    )

    if isinstance(node, Call):  # call into function
        return self.run_node(
            context.functions[node.name],
            context.make_call_child(node.name, sub_values["arguments"]),
        )

    with log_time(LOGGER, f"Handling node {type(node)}", level="debug"):
        return self.engine.process_node(node, **sub_values)
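
Wiring sketch: a concrete engine plugged into the synchronous runner (config and workflow construction are assumed; GDALEngine is documented below):

engine = GDALEngine(config)  # config: a terravis Config instance (assumed)
runner = WorkflowRunner(engine)
result = runner.execute_workflow(workflow)  # workflow: any BaseWorkflow (assumed)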

terravis.engine

DatacubeResult = TypeVar('DatacubeResult') module-attribute

SET_ENV_THREAD_LOCK = threading.Lock() module-attribute

VectorResult = TypeVar('VectorResult') module-attribute

Engine(config, credentials_profile=None)

Bases: ABC, Generic[DatacubeResult, VectorResult]

Source code in stacture/terravis/engine.py
def __init__(self, config: Config, credentials_profile: str | None = None) -> None:
    # good idea to give engine access to full config?
    self.config = config
    # good idea to have this here?
    self.credentials_profile = credentials_profile
config = config instance-attribute
credentials_profile = credentials_profile instance-attribute
handler_map: dict[type, Callable] instance-attribute
encode_image(result, format_, encode_options) abstractmethod
Source code in stacture/terravis/engine.py
@abstractmethod
def encode_image(
    self, result: DatacubeResult, format_: str, encode_options: dict[str, str]
) -> Any:
    pass
process_node(node, **kwargs)
Source code in stacture/terravis/engine.py
def process_node(self, node: Node, **kwargs) -> DatacubeResult | VectorResult:
    node_type = type(node)
    handler = self.handler_map.get(node_type)
    if not handler:
        raise TypeError(f"Type {node_type} is not supported")

    return handler(self, node, **kwargs)
select_format(format_, default) abstractmethod

Returns a mime type acceptable to the engine given a requested specification

Source code in stacture/terravis/engine.py
@abstractmethod
def select_format(self, format_: str, default: str) -> str:
    """Returns a mime type acceptable to the engine given a requested specification"""
    pass
set_access_credentials()
Source code in stacture/terravis/engine.py
@contextlib.contextmanager
def set_access_credentials(self):
    credentials_env = self._read_credentials_env()

    # this lock should ensure that 2 threads don't modify os.environ at the same time.
    # note that different gunicorn worker processes are allowed to modify their env,
    # this is only relevant if we have threads.
    with SET_ENV_THREAD_LOCK:
        os.environ.update(credentials_env)

        yield

        # modifying env vars is hacky already, but try not to leak too much info
        for key in credentials_env:
            del os.environ[key]
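
Usage sketch, mirroring how open_datacube below scopes credentials around a GDAL open (path hypothetical):

with engine.set_access_credentials():
    dataset = gdal.Open("/vsis3/bucket/item.tif", gdal.GA_ReadOnly)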
statistics(result) abstractmethod
Source code in stacture/terravis/engine.py
@abstractmethod
def statistics(self, result: DatacubeResult) -> dict:
    pass

EngineMeta(name, bases, dct)

Bases: ABCMeta

Metaclass for the Engine class that builds a static map from each handled node type to its handler method.

Source code in stacture/terravis/engine.py
def __init__(cls, name, bases, dct: dict):
    cls.handler_map: dict[type, Callable] = {}
    for base in bases:
        cls.handler_map.update(getattr(base, "handler_map", {}))

    for value in dct.values():
        if hasattr(value, "handles_classes"):
            for handled_class in value.handles_classes:
                cls.handler_map[handled_class] = value

handle(*node_classes)

Function-decorator to mark a class function as a handler for a given node type.

Source code in stacture/terravis/engine.py
def handle(*node_classes: type) -> Callable:
    """Function-decorator to mark a class function as a handler for a
    given node type.
    """
    if not node_classes:
        raise ValueError("Must provide at least one node class")

    @wraps(handle)
    def inner(func):
        func.handles_classes = node_classes
        return func

    return inner
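
A minimal sketch of handle and EngineMeta working together (hypothetical subclass; the remaining abstract methods are elided):

class SketchEngine(Engine):
    @handle(dag.Select)
    def select(self, node: dag.Select, arg):
        ...

# EngineMeta registered the handler at class-creation time:
assert SketchEngine.handler_map[dag.Select] is SketchEngine.select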

terravis.dag

DatacubeNodes = Annotated[OpenDatacube | CreateDatacube | Slice | Select | Stack | Warp | Arithmetic | Rescale | Cast | Function | Hillshade | Slope | Aspect | TRI | TPI | Roughness | Pansharpen | Rasterize | Colorize | Merge | Call | DatacubeArg, pydantic.Field(discriminator='op')] module-attribute

Geometry = GeometryCollection | Polygon | MultiPolygon | LineString | MultiLineString | Point | MultiPoint module-attribute

ResultType = TypeVar('ResultType', bound=ValueType) module-attribute

VectorNodes = Annotated[OpenVector | Contours | VectorArg, pydantic.Field(discriminator='op')] module-attribute

Argument

Bases: BaseModel

name: str instance-attribute

Arithmetic

Bases: Node[Datacube]

lhs: Union[DatacubeNodes, Scalar] instance-attribute
op: Literal['Arithmetic'] = 'Arithmetic' class-attribute instance-attribute
operation: ArithmeticOp instance-attribute
rhs: Union[DatacubeNodes, Scalar] instance-attribute

ArithmeticOp

Bases: str, Enum

ADD = '+' class-attribute instance-attribute
DIV = '/' class-attribute instance-attribute
MUL = '*' class-attribute instance-attribute
SUB = '-' class-attribute instance-attribute

Aspect

Bases: Node[Datacube]

algorithm: DEMAlgorithm = DEMAlgorithm.HORN class-attribute instance-attribute
arg: DatacubeNodes instance-attribute
op: Literal['Aspect'] = 'Aspect' class-attribute instance-attribute
scale: float instance-attribute
trignonometric: bool instance-attribute
zero_for_flat: bool instance-attribute

Call

Bases: Node[Datacube]

arguments: dict[str, Union[DatacubeNodes, VectorNodes]] instance-attribute
name: str instance-attribute
op: Literal['Call'] = 'Call' class-attribute instance-attribute

Cast

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
datatype: DataType instance-attribute
op: Literal['Cast'] = 'Cast' class-attribute instance-attribute

ColorMap

Bases: BaseModel

entries: list[ColorMapEntry] instance-attribute
type: ColorMapType = ColorMapType.ramp class-attribute instance-attribute

ColorMapEntry

Bases: BaseModel

color: str instance-attribute
label: str | None = None class-attribute instance-attribute
opacity: float = 1.0 class-attribute instance-attribute
quantity: float instance-attribute

ColorMapType

Bases: str, Enum

intervals = 'intervals' class-attribute instance-attribute
ramp = 'ramp' class-attribute instance-attribute
values = 'values' class-attribute instance-attribute

Colorize

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
colormap: ColorMap | str instance-attribute
normalize: tuple[float, float] | None = None class-attribute instance-attribute
op: Literal['Colorize'] = 'Colorize' class-attribute instance-attribute

Contours

Bases: Node[Vector]

arg: DatacubeNodes instance-attribute
base: float instance-attribute
id_field: str = 'id' class-attribute instance-attribute
interval: float instance-attribute
layer_name: str = 'contours' class-attribute instance-attribute
op: Literal['Contours'] = 'Contours' class-attribute instance-attribute
value_field: str = 'data' class-attribute instance-attribute

CreateDatacube

Bases: Node[Datacube]

bbox: tuple[float, float, float, float] instance-attribute
crs: str | None = None class-attribute instance-attribute
datatype: DataType = DataType.Byte class-attribute instance-attribute
init_values: list[float] instance-attribute
op: Literal['CreateDatacube'] = 'CreateDatacube' class-attribute instance-attribute
resolution: tuple[float, float] | None = None class-attribute instance-attribute
size: tuple[pydantic.PositiveInt, pydantic.PositiveInt] instance-attribute

DEMAlgorithm

Bases: str, Enum

HORN = 'Horn' class-attribute instance-attribute
ZEVENBERGEN_THORNE = 'ZevenbergenThorne' class-attribute instance-attribute

DataType

Bases: str, Enum

Byte = 'Byte' class-attribute instance-attribute
Float32 = 'Float32' class-attribute instance-attribute
Float64 = 'Float64' class-attribute instance-attribute
Int16 = 'Int16' class-attribute instance-attribute
Int32 = 'Int32' class-attribute instance-attribute
Int64 = 'Int64' class-attribute instance-attribute
Int8 = 'Int8' class-attribute instance-attribute
UInt16 = 'UInt16' class-attribute instance-attribute
UInt32 = 'UInt32' class-attribute instance-attribute
UInt64 = 'UInt64' class-attribute instance-attribute

Datacube

Bases: ValueType

DatacubeArg

Bases: Argument, Node[Datacube]

op: Literal['DatacubeArg'] = 'DatacubeArg' class-attribute instance-attribute

Function

Bases: Node[Datacube]

args: list[Union[DatacubeNodes, Scalar]] instance-attribute
name: str instance-attribute
op: Literal['Function'] = 'Function' class-attribute instance-attribute

Hillshade

Bases: Node[Datacube]

algorithm: DEMAlgorithm = DEMAlgorithm.HORN class-attribute instance-attribute
altitude: float instance-attribute
arg: DatacubeNodes instance-attribute
azimuth: float = 315.0 class-attribute instance-attribute
mode: HillshadeMode = HillshadeMode.NORMAL class-attribute instance-attribute
op: Literal['Hillshade'] = 'Hillshade' class-attribute instance-attribute
scale: float instance-attribute
z_factor: float instance-attribute

HillshadeMode

Bases: str, Enum

COMBINED = 'combined' class-attribute instance-attribute
IGOR = 'igor' class-attribute instance-attribute
MULTI_DIRECTIONAL = 'multiDirectional' class-attribute instance-attribute
NORMAL = 'normal' class-attribute instance-attribute

Mask

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
invert: bool = False class-attribute instance-attribute
mask: DatacubeNodes instance-attribute
op: Literal['Mask'] = 'Mask' class-attribute instance-attribute

Merge

Bases: Node[Datacube]

args: list[DatacubeNodes] instance-attribute
op: Literal['Merge'] = 'Merge' class-attribute instance-attribute

Metadata

Bases: ValueType

Node

Bases: BaseModel, Generic[ResultType]

OpenDatacube

Bases: Node[Datacube]

location: str instance-attribute
op: Literal['OpenDatacube'] = 'OpenDatacube' class-attribute instance-attribute

OpenVector

Bases: Node[Vector]

location: str instance-attribute
op: Literal['OpenVector'] = 'OpenVector' class-attribute instance-attribute

Pansharpen

Bases: Node[Datacube]

op: Literal['Pansharpen'] = 'Pansharpen' class-attribute instance-attribute
pan: DatacubeNodes instance-attribute
spectral: list[DatacubeNodes] instance-attribute

Rasterize

Bases: Node[Datacube]

arg: Union[VectorNodes, Geometry] instance-attribute
bbox: tuple[float, float, float, float] instance-attribute
burn_values: list[int] | str | None = None class-attribute instance-attribute
crs: str instance-attribute
nodata_value: float | None = None class-attribute instance-attribute
op: Literal['Rasterize'] = 'Rasterize' class-attribute instance-attribute
size: tuple[pydantic.PositiveInt, pydantic.PositiveInt] instance-attribute
value_field: str = 'data' class-attribute instance-attribute

ResampleAlg

Bases: str, Enum

AVERAGE = 'average' class-attribute instance-attribute
BILINEAR = 'bilinear' class-attribute instance-attribute
CUBIC = 'cubic' class-attribute instance-attribute
CUBIC_SPLINE = 'cubic_spline' class-attribute instance-attribute
GAUSS = 'gauss' class-attribute instance-attribute
LANCZOS = 'lanczos' class-attribute instance-attribute
MODE = 'mode' class-attribute instance-attribute
NEAREST = 'nearest' class-attribute instance-attribute
RMS = 'rms' class-attribute instance-attribute

Rescale

Bases: Node[Datacube]

arg: Union[DatacubeNodes, Scalar] instance-attribute
clip: bool = True class-attribute instance-attribute
op: Literal['Rescale'] = 'Rescale' class-attribute instance-attribute
rescale: list[tuple[float, float, float, float]] instance-attribute

Roughness

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
op: Literal['Roughness'] = 'Roughness' class-attribute instance-attribute
scale: float instance-attribute

Scalar

Bases: ValueType

value: int | float | str | datetime instance-attribute

ScalarArg

Bases: Argument, Node[Scalar]

op: Literal['ScalarArg'] = 'ScalarArg' class-attribute instance-attribute

Select

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
bands: list[int] instance-attribute
op: Literal['Select'] = 'Select' class-attribute instance-attribute

Slice

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
dimension: str instance-attribute
op: Literal['Slice'] = 'Slice' class-attribute instance-attribute
value: str | int instance-attribute

Slope

Bases: Node[Datacube]

algorithm: DEMAlgorithm = DEMAlgorithm.HORN class-attribute instance-attribute
arg: DatacubeNodes instance-attribute
format: SlopeFormat = SlopeFormat.DEGREE class-attribute instance-attribute
op: Literal['Slope'] = 'Slope' class-attribute instance-attribute
scale: float instance-attribute

SlopeFormat

Bases: str, Enum

DEGREE = 'degree' class-attribute instance-attribute
PERCENT = 'percent' class-attribute instance-attribute

Stack

Bases: Node[Datacube]

args: list[DatacubeNodes] instance-attribute
op: Literal['Stack'] = 'Stack' class-attribute instance-attribute

TPI

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
op: Literal['TPI'] = 'TPI' class-attribute instance-attribute
scale: float instance-attribute

TRI

Bases: Node[Datacube]

algorithm: TRIAlgorithm = TRIAlgorithm.WILSON class-attribute instance-attribute
arg: DatacubeNodes instance-attribute
op: Literal['TRI'] = 'TRI' class-attribute instance-attribute
scale: float instance-attribute

TRIAlgorithm

Bases: str, Enum

RILEY = 'Riley' class-attribute instance-attribute
WILSON = 'Wilson' class-attribute instance-attribute

ValueType

Bases: BaseModel

Vector

Bases: ValueType

VectorArg

Bases: Argument, Node[Vector]

op: Literal['VectorArg'] = 'VectorArg' class-attribute instance-attribute

Warp

Bases: Node[Datacube]

arg: DatacubeNodes instance-attribute
bbox: tuple[float, float, float, float] | None = None class-attribute instance-attribute
bbox_crs: str | None = None class-attribute instance-attribute
crs: str | None = None class-attribute instance-attribute
op: Literal['Warp'] = 'Warp' class-attribute instance-attribute
resample_alg: ResampleAlg = ResampleAlg.NEAREST class-attribute instance-attribute
resolution_x: float | None = None class-attribute instance-attribute
resolution_y: float | None = None class-attribute instance-attribute
scale_x: float | None = None class-attribute instance-attribute
scale_y: float | None = None class-attribute instance-attribute
size_x: pydantic.PositiveInt | None = None class-attribute instance-attribute
size_y: pydantic.PositiveInt | None = None class-attribute instance-attribute

descend_tree(node, check_types, extractor)

Helper function to extract collected information about specific nodes in the tree.

Source code in stacture/terravis/dag.py
def descend_tree(node: Node, check_types: tuple | type, extractor: Callable):
    """Helper function to extract collected information about specific nodes
    in the tree.
    """
    if isinstance(node, check_types):
        yield extractor(node)

    for child in vars(node).values():
        if isinstance(child, Node):
            yield from descend_tree(child, check_types, extractor)
        elif isinstance(child, list):
            for item in child:
                if isinstance(item, Node):
                    yield from descend_tree(item, check_types, extractor)
        elif isinstance(child, dict):
            for item in child.values():
                if isinstance(item, Node):
                    yield from descend_tree(item, check_types, extractor)
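
Usage sketch, equivalent to find_call_names in terravis.workflow:

node = dag.Call(
    name="stretch",
    arguments={"data": dag.DatacubeArg(name="data")},
)
assert list(descend_tree(node, dag.Call, lambda n: n.name)) == ["stretch"]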

terravis.gdal.engine

ARITHMETIC_OP_TO_OPERATOR = {dag.ArithmeticOp.ADD: operator.add, dag.ArithmeticOp.SUB: operator.sub, dag.ArithmeticOp.MUL: operator.mul, dag.ArithmeticOp.DIV: operator.truediv} module-attribute

DATATYPE_MAP = {dag.DataType.Byte: gdal.GDT_Byte, dag.DataType.Int8: gdal.GDT_Byte, dag.DataType.UInt16: gdal.GDT_UInt16, dag.DataType.Int16: gdal.GDT_Int16, dag.DataType.UInt32: gdal.GDT_UInt32, dag.DataType.Int32: gdal.GDT_Int32, dag.DataType.Float32: gdal.GDT_Float32, dag.DataType.Float64: gdal.GDT_Float64} module-attribute

FUNCTION_MAP: dict[str, Callable] = {'sin': np.sin, 'cos': np.cos, 'tan': np.tan, 'arcsin': np.arcsin, 'arccos': np.arccos, 'arctan': np.arctan, 'hypot': np.hypot, 'arctan2': np.arctan2, 'degrees': np.degrees, 'radians': np.radians, 'unwrap': np.unwrap, 'deg2rad': np.deg2rad, 'rad2deg': np.rad2deg, 'sinh': np.sinh, 'cosh': np.cosh, 'tanh': np.tanh, 'arcsinh': np.arcsinh, 'arccosh': np.arccosh, 'arctanh': np.arctanh, 'exp': np.exp, 'expm1': np.expm1, 'exp2': np.exp2, 'log': np.log, 'log10': np.log10, 'log2': np.log2, 'log1p': np.log1p} module-attribute

LOGGER = structlog.get_logger() module-attribute

MEM_DRIVER: gdal.Driver = gdal.GetDriverByName('MEM') module-attribute

MEM_DRIVER_VEC: ogr.Driver = ogr.GetDriverByName('Memory') module-attribute

MIMETYPE_TO_DRIVER_NAME = {'image/png': 'PNG', 'image/jpeg': 'JPEG', 'image/tiff': 'GTiff'} module-attribute

NUMPY_DTYPE_MAP = {np.dtype(np.float16): np.float32} module-attribute

RESAMPLEALG_MAP = {dag.ResampleAlg.NEAREST: gdal.GRA_NearestNeighbour, dag.ResampleAlg.AVERAGE: gdal.GRA_Average, dag.ResampleAlg.BILINEAR: gdal.GRA_Bilinear, dag.ResampleAlg.CUBIC: gdal.GRA_Cubic, dag.ResampleAlg.CUBIC_SPLINE: gdal.GRA_CubicSpline, dag.ResampleAlg.LANCZOS: gdal.GRA_Lanczos, dag.ResampleAlg.MODE: gdal.GRA_Mode, dag.ResampleAlg.RMS: gdal.GRA_RMS} module-attribute

GDALDatacube(dataset) dataclass

dataset: gdal.Dataset instance-attribute
from_ndarry(data, gt, crs) classmethod
Source code in stacture/terravis/gdal/engine.py
@classmethod
def from_ndarry(cls, data: np.ndarray, gt: tuple[float, ...], crs: str):
    ds: gdal.Dataset = gdal_array.OpenNumPyArray(data, data.ndim == 2)
    ds.SetGeoTransform(gt)
    ds.SetProjection(crs)
    if isinstance(data, np.ma.MaskedArray):
        # TODO: also write masked band
        pass
    return cls(ds)

GDALEngine(config, credentials_profile=None)

Bases: Engine[GDALDatacube, GDALVector]

Source code in stacture/terravis/engine.py
def __init__(self, config: Config, credentials_profile: str | None = None) -> None:
    # good idea to give engine access to full config?
    self.config = config
    # good idea to have this here?
    self.credentials_profile = credentials_profile
arithmetic(node, lhs, rhs)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Arithmetic)
def arithmetic(
    self,
    node: dag.Arithmetic,
    lhs: GDALDatacube | dag.Scalar,
    rhs: GDALDatacube | dag.Scalar,
) -> GDALDatacube | dag.Scalar:
    gt = None
    proj = None

    if isinstance(lhs, dag.Scalar):
        lhs_value = lhs.value
    else:
        band: gdal.Band = lhs.dataset.GetRasterBand(1)
        lhs_value = band.ReadAsArray()
        gt = lhs.dataset.GetGeoTransform()
        proj = lhs.dataset.GetProjection()

    if isinstance(rhs, dag.Scalar):
        rhs_value = rhs.value
    else:
        band = rhs.dataset.GetRasterBand(1)
        rhs_value = band.ReadAsArray()
        gt = rhs.dataset.GetGeoTransform()
        proj = rhs.dataset.GetProjection()

    op = ARITHMETIC_OP_TO_OPERATOR[node.operation]

    # TODO: make this blockwise
    with np.errstate(divide="ignore", invalid="ignore"):
        out_data = op(lhs_value, rhs_value)

    if np.isscalar(out_data):
        return dag.Scalar(value=cast(float, out_data))

    out_ds: gdal.Dataset = gdal_array.OpenNumPyArray(out_data, False)
    if gt:
        out_ds.SetGeoTransform(gt)
    if proj:
        out_ds.SetProjection(proj)

    return GDALDatacube(out_ds)
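
When both operands are scalars the result collapses to a Scalar; a sketch invoking the handler directly (the runner normally supplies lhs and rhs; engine is an assumed GDALEngine instance):

node = dag.Arithmetic(
    operation=dag.ArithmeticOp.MUL,
    lhs=dag.Scalar(value=6),
    rhs=dag.Scalar(value=7),
)
result = engine.arithmetic(node, lhs=node.lhs, rhs=node.rhs)
assert isinstance(result, dag.Scalar) and result.value == 42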
aspect(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Aspect)
def aspect(self, node: dag.Aspect, arg: GDALDatacube) -> GDALDatacube:
    ds = arg.dataset
    out_ds = gdal.DEMProcessing(
        "",
        ds,
        "aspect",
        format="MEM",
        alg=node.algorithm.value,
        scale=node.scale,
        trigonometric=node.trignonometric,
        zeroForFlat=node.zero_for_flat,
    )
    return GDALDatacube(out_ds)
cast(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Cast)
def cast(self, node: dag.Cast, arg: GDALDatacube) -> GDALDatacube:
    datatype = DATATYPE_MAP[node.datatype]
    out_ds = gdal.Translate(
        "",
        arg.dataset,
        options=gdal.TranslateOptions(outputType=datatype, format="VRT"),
    )
    return GDALDatacube(out_ds)
colorize(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Colorize)
def colorize(self, node: dag.Colorize, arg: GDALDatacube):
    colormap = node.colormap
    if node.normalize:
        norm = matplotlib.colors.Normalize(*node.normalize)
    else:
        norm = matplotlib.colors.NoNorm()

    def _entry_to_color(
        entry: dag.ColorMapEntry,
    ) -> tuple[float, tuple[float, float, float, float]]:
        color = pydantic_extra_types.color.Color(entry.color)
        return (
            entry.quantity,
            cast(
                tuple[float, float, float, float],
                (
                    *(c / 255 for c in color.as_rgb_tuple(alpha=False)),
                    color._alpha_float(),
                ),
            ),
        )

    if isinstance(colormap, str):
        cmap = matplotlib.colormaps.get_cmap(colormap)

    elif colormap.type == dag.ColorMapType.ramp:
        cmap = matplotlib.colors.LinearSegmentedColormap.from_list(
            "",
            # TODO: according to mypy this is not supported, no idea if it would work
            [_entry_to_color(entry) for entry in colormap.entries],  # type: ignore
        )
        cmap.set_extremes(bad=(0, 0, 0, 0), under=(0, 0, 0, 0), over=(0, 0, 0, 0))
    elif colormap.type in (dag.ColorMapType.intervals, dag.ColorMapType.values):
        v_min, v_max = node.normalize or (0.0, 1.0)
        boundaries = [
            (
                (1 - entry.quantity) * v_min + entry.quantity * v_max
                if node.normalize
                else entry.quantity
            )
            for entry in colormap.entries
        ]

        cmap = matplotlib.colors.ListedColormap(
            [_entry_to_color(entry)[1] for entry in colormap.entries]
        )
        norm = matplotlib.colors.BoundaryNorm(boundaries, cmap.N)

    elif colormap.type == dag.ColorMapType.values:
        # TODO: handle
        cmap = matplotlib.colors.ListedColormap(
            [_entry_to_color(entry)[1] for entry in colormap.entries]
        )

    ds = arg.dataset
    band: gdal.Band = ds.GetRasterBand(1)
    data = band.ReadAsArray()

    # handling masks
    if get_ds_mask_flags(ds) != gdal.GMF_ALL_VALID:
        mask_band: gdal.Band = band.GetMaskBand()
        data = np.ma.masked_where(
            ~mask_band.ReadAsArray(),  # need inverted mask here
            data,
        )

    sm = matplotlib.cm.ScalarMappable(norm, cmap)
    out_data = sm.to_rgba(data, bytes=True)

    out_ds: gdal.Dataset = gdal_array.OpenNumPyArray(out_data, False)
    out_ds.GetRasterBand(1).SetColorInterpretation(gdal.GCI_RedBand)
    out_ds.GetRasterBand(2).SetColorInterpretation(gdal.GCI_GreenBand)
    out_ds.GetRasterBand(3).SetColorInterpretation(gdal.GCI_BlueBand)
    out_ds.GetRasterBand(4).SetColorInterpretation(gdal.GCI_AlphaBand)

    out_ds.SetGeoTransform(arg.dataset.GetGeoTransform())
    out_ds.SetProjection(arg.dataset.GetProjection())

    return GDALDatacube(out_ds)
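
A sketch of a Colorize node with a two-stop ramp stretched over raw values 0–1000 (location hypothetical); ColorMap.type defaults to ramp, so the LinearSegmentedColormap branch applies:

node = dag.Colorize(
    arg=dag.OpenDatacube(location="s3://bucket/dem.tif"),
    colormap=dag.ColorMap(
        entries=[
            dag.ColorMapEntry(color="#0000ff", quantity=0.0),
            dag.ColorMapEntry(color="#ff0000", quantity=1.0),
        ],
    ),
    normalize=(0.0, 1000.0),
)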
contours(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Contours)
def contours(self, node: dag.Contours, arg: GDALDatacube) -> GDALVector:
    ds = arg.dataset
    band = ds.GetRasterBand(1)

    datasource: ogr.DataSource = MEM_DRIVER_VEC.CreateDataSource("")
    layer: ogr.Layer = datasource.CreateLayer("contour")

    # set up fields for id/value
    field_defn = ogr.FieldDefn(node.id_field, ogr.OFTInteger)
    layer.CreateField(field_defn)

    field_defn = ogr.FieldDefn(node.value_field, ogr.OFTReal)
    layer.CreateField(field_defn)

    gdal.ContourGenerate(band, node.interval, node.base, [], True, 0.0, layer, 0, 1)
    return GDALVector(datasource)
create_datacube(node)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.CreateDatacube)
def create_datacube(self, node: dag.CreateDatacube) -> GDALDatacube:
    bbox = node.bbox
    if node.size:
        width, height = node.size
    elif node.resolution:
        resx, resy = node.resolution
        width = ceil((bbox[2] - bbox[0]) / resx)
        height = ceil((bbox[3] - bbox[1]) / resy)
    else:
        raise ValueError("Neither size nor resolution passed")

    out_ds: gdal.Dataset = MEM_DRIVER.Create(
        "",
        width,
        height,
        len(node.init_values),
        DATATYPE_MAP[node.datatype],
    )

    for i, init_value in enumerate(node.init_values, start=1):
        band: gdal.Band = out_ds.GetRasterBand(i)
        band.Fill(init_value)

    out_ds.SetGeoTransform(
        [
            bbox[0],
            (bbox[2] - bbox[0]) / width,
            0,
            bbox[3],
            0,
            -(bbox[3] - bbox[1]) / height,
        ]
    )
    if node.crs:
        out_ds.SetProjection(node.crs)

    return GDALDatacube(out_ds)
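
Sketch: a 256×256 single-band Byte cube filled with zeros over a unit bbox (engine is an assumed GDALEngine instance):

node = dag.CreateDatacube(
    bbox=(0.0, 0.0, 1.0, 1.0),
    size=(256, 256),
    init_values=[0.0],
    crs="EPSG:4326",
)
cube = engine.create_datacube(node)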
encode_coverage(result, format_, encode_options)

Encodes a coverage

Parameters:

result (GDALDatacube): Result to encode as an image (required)
format_ (str): Format to encode the image as (required)
encode_options (dict[str, str]): additional encoding options (required)

Returns:

io.IOBase: image bytes

Source code in stacture/terravis/gdal/engine.py
def encode_coverage(
    self, result: GDALDatacube, format_: str, encode_options: dict[str, str]
) -> io.IOBase:
    """Encodes a coverage

    Args:
        result (GDALDatacube): Result to encode as an image
        format_ (str): Format to encode the image as
        encode_options (dict[str, str]): additional encoding options

    Returns:
        io.IOBase: image bytes
    """
    driver: gdal.Driver = gdal.GetDriverByName(MIMETYPE_TO_DRIVER_NAME[format_])
    with vsi.TemporaryVSIFile.from_buffer(b"") as f:
        driver.CreateCopy(f.name, result.dataset)
        f.seek(0)
        return f.read()
encode_image(result, format_, encode_options)

Encodes an image

Parameters:

result (GDALDatacube): Result to encode as an image (required)
format_ (str): Format to encode the image as (required)
encode_options (dict[str, str]): additional encoding options (required)

Returns:

io.IOBase: image bytes

Source code in stacture/terravis/gdal/engine.py
def encode_image(
    self, result: GDALDatacube, format_: str, encode_options: dict[str, str]
) -> io.IOBase:
    """Encodes an image

    Args:
        result (GDALDatacube): Result to encode as an image
        format_ (str): Format to encode the image as
        encode_options (dict[str, str]): additional encoding options

    Returns:
        io.IOBase: image bytes
    """
    driver: gdal.Driver = gdal.GetDriverByName(MIMETYPE_TO_DRIVER_NAME[format_])
    with vsi.TemporaryVSIFile.from_buffer(b"") as f:
        driver.CreateCopy(f.name, result.dataset)
        f.seek(0)
        return f.read()
function(node, args)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Function)
def function(
    self,
    node: dag.Function,
    args: list[GDALDatacube | dag.Scalar],
) -> GDALDatacube | dag.Scalar:
    func = FUNCTION_MAP[node.name]
    gt = None
    proj = None
    prepped_args: list[np.ndarray | Any] = []
    for arg in args:
        if isinstance(arg, dag.Scalar):
            prepped_args.append(arg.value)
        else:
            band: gdal.Band = arg.dataset.GetRasterBand(1)
            prepped_args.append(band.ReadAsArray())
            gt = arg.dataset.GetGeoTransform()
            proj = arg.dataset.GetProjection()

    # TODO: currently all functions convert to float64
    result = func(*prepped_args, dtype=np.float64)

    if np.isscalar(result):
        return dag.Scalar(value=cast(float, result))

    # if result.dtype in NUMPY_DTYPE_MAP:
    #     result = result.astype(NUMPY_DTYPE_MAP[result.dtype])

    out_ds: gdal.Dataset = gdal_array.OpenNumPyArray(result, False)
    if gt:
        out_ds.SetGeoTransform(gt)
    if proj:
        out_ds.SetProjection(proj)

    return GDALDatacube(out_ds)
hillshade(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Hillshade)
def hillshade(self, node: dag.Hillshade, arg: GDALDatacube) -> GDALDatacube:
    ds = arg.dataset
    mode = node.mode
    out_ds = gdal.DEMProcessing(
        "",
        ds,
        "hillshade",
        format="MEM",
        alg=node.algorithm.value,
        zFactor=node.z_factor,
        scale=node.scale,
        azimuth=node.azimuth,
        altitude=node.altitude,
        combined=mode == dag.HillshadeMode.COMBINED,
        multiDirectional=mode == dag.HillshadeMode.MULTI_DIRECTIONAL,
        igor=mode == dag.HillshadeMode.IGOR,
    )
    return GDALDatacube(out_ds)
mask(node, arg, mask)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Mask)
def mask(self, node: dag.Mask, arg: GDALDatacube, mask: GDALDatacube):
    mask_ds = mask.dataset
    out_ds: gdal.Dataset = MEM_DRIVER.CreateCopy("", arg.dataset)

    band0: gdal.Band = out_ds.GetRasterBand(1)
    band0.CreateMaskBand()
    mask_band: gdal.Band
    if mask_ds.RasterCount == 1:
        in_mask_band: gdal.Band = mask_ds.GetRasterBand(1)
        mask_data = in_mask_band.ReadAsArray()
        for band in iter_bands(arg.dataset):
            mask_band = band.GetMaskBand()
            mask_band.WriteArray(mask_data)

    elif mask_ds.RasterCount == out_ds.RasterCount:
        zipped = zip(iter_bands(out_ds), iter_bands(mask_ds), strict=True)
        for band, in_mask_band in zipped:
            mask_band = band.GetMaskBand()
            mask_band.WriteArray(in_mask_band.ReadAsArray())

    else:
        raise ValueError("Mask Datacube has invalid amount of bands")

    return GDALDatacube(out_ds)
merge(node, args)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Merge)
def merge(self, node: dag.Merge, args: list[GDALDatacube]):
    dss = [arg.dataset for arg in args]
    ds0 = dss[0]
    band0: gdal.Band = ds0.GetRasterBand(1)

    if len(dss) == 1:
        return GDALDatacube(ds0)

    color_interpretations = [get_color_interpretations(ds) for ds in dss]

    if all(ci in CIS_TREAT_AS_RGBX for ci in color_interpretations) and any(
        ci in (CI_UNDEFINED_4, CI_RGBA) for ci in color_interpretations
    ):
        # we allow RGB/RGBA merging
        result = self._merge_alpha(*dss)
    else:
        # do some sanity checks: band count
        for ds in dss[1:]:
            if ds.RasterCount != ds0.RasterCount:
                raise ValueError(
                    "Unable to merge datasets due to different band counts"
                )
            # TODO: also datatype?

        if band0.GetMaskFlags() == gdal.GMF_PER_DATASET:
            result = self._merge_mask_band(*dss)
        elif band0.GetNoDataValue() is not None:
            result = self._merge_nodata_value(*dss)
        else:
            result = ds0

    return GDALDatacube(result)
open_datacube(node)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.OpenDatacube)
def open_datacube(self, node: dag.OpenDatacube) -> GDALDatacube:
    with self.set_access_credentials():
        vsi_path = vsi.to_vsi_path(node.location)
        try:
            return GDALDatacube(gdal.Open(vsi_path, gdal.GA_ReadOnly))
        except RuntimeError as e:
            if e.args and e.args[0].startswith("invalid-location"):
                raise BadRequest(f"Invalid location: {node.location}") from e
            else:
                raise
open_vector(node)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.OpenVector)
def open_vector(self, node: dag.OpenVector) -> GDALDatacube:
    with self.set_access_credentials():
        vsi_path = vsi.to_vsi_path(node.location)
        return GDALDatacube(gdal.Open(vsi_path, gdal.GA_ReadOnly))
pansharpen(node, pan, spectral)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Pansharpen)
def pansharpen(
    self, node: dag.Pansharpen, pan: GDALDatacube, spectral: list[GDALDatacube]
) -> GDALDatacube:
    spectral_band_xml = "".join(
        f'<SpectralBand dstBand="{i + 1}"></SpectralBand>'
        for i in range(len(spectral))
    )
    ds: gdal.Dataset = gdal.CreatePansharpenedVRT(
        f"""f
        <VRTDataset subClass="VRTPansharpenedDataset">
            <PansharpeningOptions>
                {spectral_band_xml}
            </PansharpeningOptions>
        </VRTDataset>
        """,
        pan.dataset.GetRasterBand(1),
        [dc.dataset.GetRasterBand(1) for dc in spectral],
    )

    out_ds: gdal.Dataset = gdal_array.OpenNumPyArray(ds.ReadAsArray(), True)
    # restore original nodata from pan band to output ds
    nodata_value = pan.dataset.GetRasterBand(1).GetNoDataValue()
    if nodata_value is not None:
        for i in range(out_ds.RasterCount):
            out_ds.GetRasterBand(i + 1).SetNoDataValue(nodata_value)

    return GDALDatacube(out_ds)
rasterize(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Rasterize)
def rasterize(
    self, node: dag.Rasterize, arg: GDALVector | dag.Geometry
) -> GDALDatacube:
    layer: ogr.Layer
    if isinstance(arg, GDALVector):
        layer = arg.datasource.GetLayer()
    else:
        ds: ogr.DataSource = MEM_DRIVER_VEC.CreateDataSource("")
        sr = osr.SpatialReference()
        sr.ImportFromEPSG(4326)
        layer = ds.CreateLayer("", sr)
        feature = ogr.Feature(layer.GetLayerDefn())
        feature.SetGeometry(ogr.CreateGeometryFromWkt(arg.wkt))
        layer.CreateFeature(feature)

    width, height = node.size
    minx, miny, maxx, maxy = node.bbox

    if node.burn_values:
        if isinstance(node.burn_values, str):
            color = matplotlib.colors.cnames[node.burn_values]
            burn_values = [
                int(component * 255) for component in matplotlib.colors.to_rgba(color)
            ]
        else:
            burn_values = node.burn_values

        # if we have explicit burn values, we need to determine the datatype
        datatype = gdal.GDT_Byte
        for burn_value in burn_values:
            datatype = gdal_datatype_union_and_value(datatype, burn_value)

    else:
        burn_values = [0]
        datatype = gdal.GDT_Float64

    out_ds: gdal.Dataset = MEM_DRIVER.Create(
        "",
        width,
        height,
        len(burn_values),
        datatype,
    )
    out_ds.SetProjection(node.crs)
    out_ds.SetGeoTransform(
        [
            minx,
            (maxx - minx) / width,
            0,
            maxy,
            0,
            -(maxy - miny) / height,
        ]
    )
    if node.nodata_value is not None:
        band: gdal.Band = out_ds.GetRasterBand(1)
        band.SetNoDataValue(node.nodata_value)
        band.Fill(node.nodata_value)

    if node.burn_values is not None:
        out_bands = list(range(1, len(burn_values) + 1))
        options = []
    else:
        out_bands = [1]
        options = [f"ATTRIBUTE={node.value_field}"]

    gdal.RasterizeLayer(
        out_ds, out_bands, layer, burn_values=burn_values, options=options
    )

    # TODO: layer.GetDataset not yet implemented
    # out_ds: gdal.Dataset = gdal.Rasterize(
    #     "",
    #     arg.datasource.GetLayerByIndex(0).GetDataset(),
    #     options=gdal.RasterizeOptions(
    #         format="MEM",
    #         noData=node.nodata_value, initValues=node.nodata_value,
    #         outputSRS=node.crs,
    #         outputBounds=node.bbox,
    #         width=node.size[0], height=node.size[1],
    #         burnValues=node.burn_value,
    #         attribute=node.value_field if node.burn_value is not None else None,

    #     )
    # )

    return GDALDatacube(out_ds)
rescale(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Rescale)
def rescale(self, node: dag.Rescale, arg: GDALDatacube) -> GDALDatacube:
    ds = arg.dataset
    gt = ds.GetGeoTransform()
    proj = ds.GetProjection()

    rescales = node.rescale
    if len(rescales) == 1:
        rescales *= ds.RasterCount

    out_data = []
    for band, rescale in zip(iter_bands(ds), rescales, strict=True):
        x1, x2, y1, y2 = rescale
        data = band.ReadAsArray()

        # NOTE: the interpolate formula uses large numbers which lead to
        # overflows on uint16
        if data.dtype != convert_dtype(data.dtype):
            data = data.astype(convert_dtype(data.dtype))
        band_data = ((y2 - y1) * data + x2 * y1 - x1 * y2) / (x2 - x1)
        if node.clip:
            # clamp values below min to min and above max to max
            np.clip(band_data, y1, y2, out=band_data)

        out_data.append(band_data)

    out_ds: gdal.Dataset = MEM_DRIVER.CreateCopy(
        "", gdal_array.OpenNumPyArray(np.stack(out_data), True)
    )
    if gt:
        out_ds.SetGeoTransform(gt)
    if proj:
        out_ds.SetProjection(proj)

    if get_ds_mask_flags(ds) != gdal.GMF_ALL_VALID:
        out_ds.CreateMaskBand(gdal.GMF_PER_DATASET)
        out_band: gdal.Band = out_ds.GetRasterBand(1)
        mask_band: gdal.Band = out_band.GetMaskBand()
        mask_band.WriteArray(ds.GetRasterBand(1).GetMaskBand().ReadAsArray())

    return GDALDatacube(out_ds)
roughness(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Roughness)
def roughness(self, node: dag.Roughness, arg: GDALDatacube) -> GDALDatacube:
    ds = arg.dataset
    out_ds = gdal.DEMProcessing(
        "",
        ds,
        "Roughness",
        format="MEM",
        scale=node.scale,
    )
    return GDALDatacube(out_ds)
select(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Select)
def select(self, node: dag.Select, arg: GDALDatacube) -> GDALDatacube:
    out_ds = gdal.Translate(
        "",
        arg.dataset,
        options=gdal.TranslateOptions(bandList=node.bands, format="VRT"),
    )
    return GDALDatacube(out_ds)
select_format(format_, default)
Source code in stacture/terravis/gdal/engine.py
def select_format(self, format_: str, default: str) -> str:
    supported = sorted(
        MIMETYPE_TO_DRIVER_NAME,
        key=lambda elem: 0 if elem == default else 1,
    )
    try:
        return content_negotiation.decide_content_type(
            accept_headers=[format_],
            supported_content_types=supported,
        )
    except content_negotiation.NoAgreeableContentTypeError as e:
        raise StactureHTTPException(HTTPStatus.NOT_ACCEPTABLE) from e
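
Sketch (engine is an assumed GDALEngine instance): an exact match resolves to itself, while an Accept value matching none of MIMETYPE_TO_DRIVER_NAME raises NOT_ACCEPTABLE:

assert engine.select_format("image/png", default="image/png") == "image/png"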
slice(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Slice)
def slice(self, node: dag.Slice, arg: GDALDatacube) -> GDALDatacube:
    # TODO: implement
    # how to deal with higher, lower dimensionality?
    raise NotImplementedError
slope(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Slope)
def slope(self, node: dag.Slope, arg: GDALDatacube) -> GDALDatacube:
    ds = arg.dataset
    out_ds = gdal.DEMProcessing(
        "",
        ds,
        "slope",
        format="MEM",
        alg=node.algorithm.value,
        scale=node.scale,
        slopeFormat=node.format.value,
    )
    return GDALDatacube(out_ds)
stack(node, args)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Stack)
def stack(self, node: dag.Stack, args: list[GDALDatacube]) -> GDALDatacube:
    dss = [arg.dataset for arg in args]
    ds0 = dss[0]
    band0: gdal.Band = ds0.GetRasterBand(1)

    band_count = sum(ds.RasterCount for ds in dss)
    out_ds: gdal.Dataset = MEM_DRIVER.Create(
        "",
        ds0.RasterXSize,
        ds0.RasterYSize,
        band_count,
        band0.DataType,
    )
    out_ds.CreateMaskBand(gdal.GMF_PER_DATASET)
    out_mask_band: gdal.Band = out_ds.GetRasterBand(1).GetMaskBand()

    mask = np.zeros((ds0.RasterYSize, ds0.RasterXSize))

    out_bands = iter(range(1, band_count + 1))
    for ds in dss:
        mask = np.logical_or(mask, ds.GetRasterBand(1).GetMaskBand().ReadAsArray())
        for band in iter_bands(ds):
            out_band: gdal.Band = out_ds.GetRasterBand(next(out_bands))
            out_band.WriteArray(band.ReadAsArray())

    out_mask_band.WriteArray(mask)
    return GDALDatacube(out_ds)
statistics(result)
Source code in stacture/terravis/gdal/engine.py
def statistics(self, result: GDALDatacube) -> dict:
    def _stats_for_band(band: gdal.Band) -> dict:
        stats = band.ComputeStatistics(approx_ok=False)
        histogram = band.GetHistogram(approx_ok=False, include_out_of_range=True)
        min_, max_, mean, stddev = stats
        return {
            "min": min_,
            "max": max_,
            "mean": mean,
            "stddev": stddev,
            "histogram": histogram,
        }

    return {"bands": [_stats_for_band(band) for band in iter_bands(result.dataset)]}
tpi(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.TPI)
def tpi(self, node: dag.TPI, arg: GDALDatacube) -> GDALDatacube:
    ds = arg.dataset
    out_ds = gdal.DEMProcessing(
        "",
        ds,
        "TPI",
        format="MEM",
        scale=node.scale,
    )
    return GDALDatacube(out_ds)
tri(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.TRI)
def tri(self, node: dag.TRI, arg: GDALDatacube) -> GDALDatacube:
    ds = arg.dataset
    out_ds = gdal.DEMProcessing(
        "",
        ds,
        "TRI",
        format="MEM",
        alg=node.algorithm.value,
        scale=node.scale,
    )
    return GDALDatacube(out_ds)
warp(node, arg)
Source code in stacture/terravis/gdal/engine.py
@handle(dag.Warp)
def warp(self, node: dag.Warp, arg: GDALDatacube) -> GDALDatacube:
    bbox = node.bbox
    ds = arg.dataset

    # in many cases, we don't need the complex behavior and can just do a translate
    if (
        not any(
            (
                node.size_x,
                node.size_y,
                node.scale_x,
                node.scale_y,
                node.resolution_x,
                node.resolution_y,
            )
        )
        and node.bbox
    ):
        # OGC coverage bounding boxes are llx, lly, urx, ury
        llx, lly, urx, ury = node.bbox
        # proj win is in format: ulx, uly, lrx, lry
        proj_win = llx, ury, urx, lly
        tiff_path = f"/vsimem/{uuid4().hex}.tif"
        result_ds = gdal.Translate(
            tiff_path,
            ds,
            options=gdal.TranslateOptions(
                projWin=proj_win,
                projWinSRS=node.bbox_crs,
            ),
        )

        driver = gdal.Open(tiff_path).GetDriver()
        driver.Delete(tiff_path)

        return GDALDatacube(result_ds)

    width: int | None = None
    height: int | None = None

    suggested_ds: gdal.Dataset | None = None
    pre_width: int | None = None
    pre_height: int | None = None
    if (
        node.scale_x is not None or node.scale_y is not None or bbox is None
    ) and node.crs:
        tiff_path = f"/vsimem/{uuid4().hex}.tif"
        gdal.Translate(tiff_path, ds, maskBand="auto")

        vrt_path = f"/vsimem/{uuid4().hex}.vrt"
        gdal.Warp(vrt_path, tiff_path, dstSRS=node.crs)
        suggested_ds = gdal.Open(vrt_path, gdal.GA_ReadOnly)
        pre_width = suggested_ds.RasterXSize
        pre_height = suggested_ds.RasterYSize

        # use the bbox of the warped images as target bbox
        if bbox is None:
            bbox = get_ds_bbox(suggested_ds)

        driver = suggested_ds.GetDriver()
        del suggested_ds
        driver.Delete(vrt_path)

        driver = gdal.Open(tiff_path).GetDriver()
        driver.Delete(tiff_path)
    elif node.scale_x is not None or node.scale_y is not None:
        # scale is meant relative to the original image
        pre_width = ds.RasterXSize
        pre_height = ds.RasterYSize

    # use the whole original bbox if none was passed
    if bbox is None:
        bbox = get_ds_bbox(ds)

    width, height = consolidate_warp_size(node, bbox, pre_width, pre_height)
    band: gdal.Band = ds.GetRasterBand(1)
    out_ds = warp(
        ds,
        width,
        height,
        ds.RasterCount,
        band.DataType,
        bbox,
        crs=node.crs or ds.GetProjection(),
        masks=True,
        multithread=True,
        resampleAlg=RESAMPLEALG_MAP[node.resample_alg],
    )

    return GDALDatacube(out_ds)
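
A node sketch (location and extents hypothetical): with only bbox set, the fast Translate path above applies; adding size_x/size_y (or scales/resolutions) takes the full warp path:

node = dag.Warp(
    arg=dag.OpenDatacube(location="s3://bucket/item.tif"),
    bbox=(1558472.0, 5848499.0, 1669792.0, 5959819.0),
    bbox_crs="EPSG:3857",
    crs="EPSG:3857",
    size_x=512,
    size_y=512,
)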

GDALVector(datasource) dataclass

datasource: ogr.DataSource instance-attribute

terravis.gdal.utils

CIS_RGBX = {CI_RGB, CI_RGBA} module-attribute

CIS_TREAT_AS_RGBX = CIS_RGBX | {CI_UNDEFINED_3, CI_UNDEFINED_4} module-attribute

CI_RGB = (gdal.GCI_RedBand, gdal.GCI_GreenBand, gdal.GCI_BlueBand) module-attribute

CI_RGBA = (gdal.GCI_RedBand, gdal.GCI_GreenBand, gdal.GCI_BlueBand, gdal.GCI_AlphaBand) module-attribute

CI_UNDEFINED_3 = (gdal.GCI_Undefined, gdal.GCI_Undefined, gdal.GCI_Undefined) module-attribute

CI_UNDEFINED_4 = (gdal.GCI_Undefined, gdal.GCI_Undefined, gdal.GCI_Undefined, gdal.GCI_Undefined) module-attribute

DATA_TYPE_INFO = {gdal.GDT_Byte: (False, False, 8), gdal.GDT_UInt16: (False, False, 16), gdal.GDT_UInt32: (False, False, 32), gdal.GDT_Int16: (False, True, 16), gdal.GDT_Int32: (False, True, 32), gdal.GDT_Float32: (True, True, 32), gdal.GDT_Float64: (True, True, 64)} module-attribute

convert_dtype(dtype)

Maps a numpy dtype to one with a larger itemsize to avoid value overflow during mathematical operations.

Parameters:

Name   Type   Description  Default
dtype  dtype  input dtype  required

Returns:

Name   Type   Description
dtype  dtype  either a one-size-larger dtype or the original dtype

Source code in stacture/terravis/gdal/utils.py
def convert_dtype(dtype: np.dtype):
    """Maps numpy dtype to a larger itemsize
    to avoid value overflow during mathematical operations

    Args:
        dtype (np.dtype): input dtype

    Returns:
        dtype (np.dtype): either one size larger dtype or original dtype
    """
    mapping = {
        np.dtype(np.int8): np.dtype(np.int16),
        np.dtype(np.int16): np.dtype(np.int32),
        np.dtype(np.int32): np.dtype(np.int64),
        np.dtype(np.uint8): np.dtype(np.int16),
        np.dtype(np.uint16): np.dtype(np.int32),
        np.dtype(np.uint32): np.dtype(np.int64),
        np.dtype(np.float16): np.dtype(np.float32),
        np.dtype(np.float32): np.dtype(np.float64),
    }
    output_dtype = mapping.get(dtype, dtype)
    return output_dtype
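
For illustration, the widening behavior in practice; unsigned integers widen to the next signed type so that differences cannot overflow:

import numpy as np

assert convert_dtype(np.dtype(np.uint8)) == np.dtype(np.int16)
assert convert_dtype(np.dtype(np.int32)) == np.dtype(np.int64)
# dtypes without a mapping (e.g. float64) come back unchanged
assert convert_dtype(np.dtype(np.float64)) == np.dtype(np.float64)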

find_datatype(floating, signed, num_bits)

Source code in stacture/terravis/gdal/utils.py
def find_datatype(floating: bool, signed: bool, num_bits: int) -> int:
    if floating:
        return gdal.GDT_Float32 if num_bits <= 32 else gdal.GDT_Float64

    if num_bits <= 8:
        return gdal.GDT_Byte if not signed else gdal.GDT_Int16
    if num_bits <= 16:
        return gdal.GDT_UInt16 if not signed else gdal.GDT_Int16
    if num_bits <= 32:
        return gdal.GDT_UInt32 if not signed else gdal.GDT_Int32

    return gdal.GDT_Float64
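
A few sample lookups; note that signed 8-bit requests fall through to Int16, since this mapping has no signed 8-bit target:

from osgeo import gdal

assert find_datatype(False, False, 8) == gdal.GDT_Byte
assert find_datatype(False, True, 8) == gdal.GDT_Int16   # no signed 8-bit target
assert find_datatype(False, False, 16) == gdal.GDT_UInt16
assert find_datatype(True, True, 64) == gdal.GDT_Float64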

gdal_datatype_parameters(value)

Source code in stacture/terravis/gdal/utils.py
def gdal_datatype_parameters(value: int | float) -> tuple[bool, bool, int]:
    if isinstance(value, int):
        if 0 <= value <= 0xFF:
            return False, False, 8
        elif 0 <= value <= 0xFFFF:
            return False, False, 16
        elif 0 <= value <= 0xFFFFFFFF:
            return False, False, 32
        # elif 0 <= value <= 0xFFFFFFFFFFFFFFFF:
        #     return False, False, 64
        elif -32768 <= value <= 32767:
            return False, True, 16
        elif -2147483648 <= value <= 2147483647:
            return False, True, 32
        # elif -2^63 <= value <= 2^63 - 1:
        #     return False, True, 64
    # else:
    # TODO: determine if 32 bit possible
    return True, True, 64
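
Sample classifications; any float, and any integer outside the handled ranges, falls back to 64-bit floating point:

assert gdal_datatype_parameters(255) == (False, False, 8)
assert gdal_datatype_parameters(70000) == (False, False, 32)
assert gdal_datatype_parameters(-1) == (False, True, 16)
assert gdal_datatype_parameters(0.5) == (True, True, 64)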

gdal_datatype_union(dt1, dt2)

Source code in stacture/terravis/gdal/utils.py
def gdal_datatype_union(dt1: int, dt2: int) -> int:
    info1 = DATA_TYPE_INFO[dt1]
    info2 = DATA_TYPE_INFO[dt2]

    return find_datatype(
        info1[0] or info2[0], info1[1] or info2[1], max(info1[2], info2[2])
    )
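
The union takes floating over integer, signed over unsigned, and the larger bit width, then maps back through find_datatype; for example:

from osgeo import gdal

# unsigned 8-bit combined with signed 16-bit -> signed 16-bit
assert gdal_datatype_union(gdal.GDT_Byte, gdal.GDT_Int16) == gdal.GDT_Int16
# Byte combined with 32-bit float -> 32-bit float
assert gdal_datatype_union(gdal.GDT_Byte, gdal.GDT_Float32) == gdal.GDT_Float32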

gdal_datatype_union_and_value(dt1, value)

Source code in stacture/terravis/gdal/utils.py
def gdal_datatype_union_and_value(dt1: int, value: int | float) -> int:
    info1 = DATA_TYPE_INFO[dt1]
    info2 = gdal_datatype_parameters(value)

    return find_datatype(
        info1[0] or info2[0], info1[1] or info2[1], max(info1[2], info2[2])
    )
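
This variant is handy when a constant enters an expression, e.g. subtracting 1 from Byte data forces a signed type:

from osgeo import gdal

# Byte (unsigned 8-bit) combined with the value -1 (signed, 16-bit) -> Int16
assert gdal_datatype_union_and_value(gdal.GDT_Byte, -1) == gdal.GDT_Int16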

get_color_interpretations(ds)

Source code in stacture/terravis/gdal/utils.py
def get_color_interpretations(ds: gdal.Dataset) -> tuple[int, ...]:
    return tuple(band.GetColorInterpretation() for band in iter_bands(ds))

get_ds_bbox(ds)

Source code in stacture/terravis/gdal/utils.py
def get_ds_bbox(ds: gdal.Dataset) -> tuple[float, float, float, float]:
    size_x = ds.RasterXSize
    size_y = ds.RasterYSize
    gt = ds.GetGeoTransform()

    xa = gt[0]
    xb = gt[0] + gt[1] * size_x
    ya = gt[3]
    yb = gt[3] + gt[5] * size_y

    return (min(xa, xb), min(ya, yb), max(xa, xb), max(ya, yb))
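
A quick check with an in-memory, north-up dataset; the min/max calls also handle south-up or mirrored geotransforms:

from osgeo import gdal

ds = gdal.GetDriverByName("MEM").Create("", 10, 5, 1, gdal.GDT_Byte)
# origin (100, 50), pixel size 1 x -1 (north-up)
ds.SetGeoTransform((100.0, 1.0, 0.0, 50.0, 0.0, -1.0))
assert get_ds_bbox(ds) == (100.0, 45.0, 110.0, 50.0)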

get_ds_mask_flags(ds)

Convenience function to get the mask flags of the dataset.

Source code in stacture/terravis/gdal/utils.py
def get_ds_mask_flags(ds: gdal.Dataset) -> int:
    """Convenience function to get the mask flags of the dataset."""
    band: gdal.Band = ds.GetRasterBand(1)
    return band.GetMaskFlags()

iter_bands(ds)

Source code in stacture/terravis/gdal/utils.py
def iter_bands(ds: gdal.Dataset) -> Iterator[gdal.Band]:
    for i in range(1, ds.RasterCount + 1):
        yield ds.GetRasterBand(i)

read_as_array(ds)

Source code in stacture/terravis/gdal/utils.py
def read_as_array(ds: gdal.Dataset) -> np.ndarray:
    data: np.ndarray = ds.ReadAsArray()
    if data.ndim == 2:
        data = data.reshape((1, *data.shape))
    return data
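
Single-band datasets come back two-dimensional from GDAL, so the reshape normalizes every result to (bands, rows, cols); for instance:

from osgeo import gdal

ds = gdal.GetDriverByName("MEM").Create("", 10, 5, 1, gdal.GDT_Byte)
assert read_as_array(ds).shape == (1, 5, 10)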

read_rgb_and_alpha(ds)

Reads the RGB-A values of a dataset. When the dataset is already RGBA (or has four bands with undefined color interpretation), the RGB and alpha bands are returned directly. Otherwise, the dataset mask is used as the alpha values.

RGB stays in the range of the data type; alpha is converted to float in the range 0..1.

Parameters:

Name  Type     Description                              Default
ds    Dataset  the dataset to read the RGBA data from   required

Returns:

Type                     Description
tuple[ndarray, ndarray]  the RGB and alpha values

Source code in stacture/terravis/gdal/utils.py
def read_rgb_and_alpha(ds: gdal.Dataset) -> tuple[np.ndarray, np.ndarray]:
    """Reads the RGB-A values of a dataset. When the dataset already is in
    RGBA (or unknown with 4 bands), RGB and alpha is returned. Otherwise, the
    dataset mask will be used as alpha values.

    RGB is in the range of the datatype, alpha is transformed to float from 0..1

    Args:
        ds (gdal.Dataset): the dataset to read the RGBA data from

    Returns:
        tuple[np.ndarray, np.ndarray]: the RGB and alpha values
    """
    data = ds.ReadAsArray()
    band: gdal.Band = ds.GetRasterBand(1)
    if get_color_interpretations(ds) in (CI_RGBA, CI_UNDEFINED_4):
        return data[:-1], data[-1] / 255

    msk_band: gdal.Band = band.GetMaskBand()
    mask: np.ndarray = msk_band.ReadAsArray()
    return data, mask.astype(float)
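
A sketch with a synthetic RGBA dataset; here the alpha band is returned directly (scaled to 0..1) rather than derived from the dataset mask:

from osgeo import gdal

ds = gdal.GetDriverByName("MEM").Create("", 4, 4, 4, gdal.GDT_Byte)
interps = (gdal.GCI_RedBand, gdal.GCI_GreenBand, gdal.GCI_BlueBand, gdal.GCI_AlphaBand)
for index, interp in enumerate(interps, start=1):
    ds.GetRasterBand(index).SetColorInterpretation(interp)

rgb, alpha = read_rgb_and_alpha(ds)
assert rgb.shape == (3, 4, 4) and alpha.shape == (4, 4)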

terravis.gdal.vsi

This module provides Python file-object-like access to VSI files.

PROTOCOL_MAP = {'s3': ('vsis3', False), 'http': ('vsicurl', True), 'https': ('vsicurl', True), 'ftp': ('vsicurl', True)} module-attribute

UINT32_MAX = numpy.iinfo('uint32').max module-attribute

remove = gdal.Unlink module-attribute

rename = gdal.Rename module-attribute

TemporaryVSIFile(filename, mode='r')

Bases: VSIFile

Subclass of VSIFile that automatically deletes the physical file when it is closed.

Source code in stacture/terravis/gdal/vsi.py
def __init__(self, filename: os.PathLike, mode="r"):
    fspath = os.fspath(filename)
    self._handle = gdal.VSIFOpenL(fspath, mode)
    self._filename = fspath
    self._mode = mode

    if self._handle is None:
        raise OSError(f"Failed to open file '{self._filename}'.")
close()

Close the file. This also deletes it.

Source code in stacture/terravis/gdal/vsi.py
def close(self):
    """Close the file. This also deletes it."""
    if not self.closed:
        super().close()
        remove(self.name)
from_buffer(buf, mode='wb', filename=None) classmethod

Creates a :class:`TemporaryVSIFile` from a buffer of bytes.

Source code in stacture/terravis/gdal/vsi.py
@classmethod
def from_buffer(cls, buf, mode="wb", filename: os.PathLike | None = None):
    """Creates a :class:`TemporaryVSIFile` from a string."""
    effective_filename: os.PathLike = (
        pathlib.Path(f"/vsimem/{uuid4().hex}") if filename is None else filename
    )
    f = cls(effective_filename, mode)
    f.write(buf)
    f.seek(0)
    return f
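
Typical round-trip, assuming the VSI handler supports update mode ("w+b"; the default "wb" is write-only):

f = TemporaryVSIFile.from_buffer(b"hello world", mode="w+b")
assert f.read() == b"hello world"
f.close()  # closing also removes the backing /vsimem file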

VSIFile(filename, mode='r')

File-like object interface for the VSI file API.

:param filename: the path to the file; this might also be any VSI special path like "/vsicurl/..." or "/vsizip/...". See the GDAL documentation <http://trac.osgeo.org/gdal/wiki/UserDocs/ReadInZip> and manuals <http://www.gdal.org/gdal_virtual_file_systems.html> for reference.
:param mode: the file opening mode

Source code in stacture/terravis/gdal/vsi.py
def __init__(self, filename: os.PathLike, mode="r"):
    fspath = os.fspath(filename)
    self._handle = gdal.VSIFOpenL(fspath, mode)
    self._filename = fspath
    self._mode = mode

    if self._handle is None:
        raise OSError(f"Failed to open file '{self._filename}'.")
closed: bool property

Return whether the file is already closed.

name: str property

Returns the filename referenced by this file

read1 = read class-attribute instance-attribute
readinto1 = readinto class-attribute instance-attribute
size: int property

Return the size of the file in bytes

__del__()
Source code in stacture/terravis/gdal/vsi.py
def __del__(self):
    self.close()
__enter__()
Source code in stacture/terravis/gdal/vsi.py
def __enter__(self):
    return self
__exit__(etype=None, evalue=None, tb=None)
Source code in stacture/terravis/gdal/vsi.py
def __exit__(self, etype=None, evalue=None, tb=None):
    self.close()
__iter__()

Iterate over the lines within the file.

Source code in stacture/terravis/gdal/vsi.py
def __iter__(self):
    """Iterate over the lines within the file."""
    return self
close()

Close the file.

Source code in stacture/terravis/gdal/vsi.py
def close(self):
    """Close the file."""
    if self._handle is not None:
        gdal.VSIFCloseL(self._handle)
    self._handle = None
fileno()
Source code in stacture/terravis/gdal/vsi.py
def fileno(self) -> int:
    # TODO: return the raw file descriptor somehow?
    raise OSError()
flush()
Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def flush(self):
    pass
isatty()

Never a TTY

Source code in stacture/terravis/gdal/vsi.py
def isatty(self):
    """Never a TTY"""
    return False
next()

Satisfaction of the iterator protocol. Return the next line in the file or raise StopIteration.

Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def next(self):
    """Satisfaction of the iterator protocol. Return the next line in the
    file or raise `StopIteration`.
    """
    line = self.readline()
    if not line:
        raise StopIteration
    return line
read(size=UINT32_MAX)

Read from the file. If no size is specified, read until the end of the file.

Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def read(self, size=UINT32_MAX):
    """Read from the file. If no ``size`` is specified, read until the end
    of the file.
    """

    value = gdal.VSIFReadL(1, size, self._handle)
    if isinstance(value, bytes):
        return value
    elif isinstance(value, bytearray):
        return bytes(value)
    elif value is None:
        return b""
    else:
        raise ValueError(value)
readable()
Source code in stacture/terravis/gdal/vsi.py
def readable(self):
    return True
readinto(into)
Source code in stacture/terravis/gdal/vsi.py
def readinto(self, into: bytearray) -> int:
    data = self.read(len(into))
    into[: len(data)] = data
    return len(data)
readline(length=None, windowsize=1024)

Read a single line from the file and return it.

:param length: the maximum number of bytes to read to look for a whole line.
:param windowsize: the window size used to search for a newline character.

Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def readline(self, length=None, windowsize=1024):
    """Read a single line from the file and return it.

    :param length: the maximum number of bytes to read to look for a whole
                   line.
    :param windowsize: the windowsize to search for a newline character.
    """
    line = ""
    while True:
        # read amount and detect for EOF
        string = self.read(length or windowsize)
        if not string:
            break

        try:
            position = string.index("\n")
            line += string[:position]

            # retun the cursor for the remainder of the string
            self.seek(-(len(string) - (position + 1)), os.SEEK_CUR)
            break
        except ValueError:
            line += string

        # also break when a specific size was requested but no newline was
        # found
        if length:
            break

    return line
readlines(sizehint=0)

Read the remainder of the file (or up to sizehint bytes) and return the lines.

:param sizehint: the number of bytes to scan for lines.
:return: the lines
:rtype: list of strings

Source code in stacture/terravis/gdal/vsi.py
def readlines(self, sizehint=0):
    """Read the remainder of the file (or up to `sizehint` bytes) and
    return the lines.

    :param sizehint: the number of bytes to scan for lines.
    :return: the lines
    :rtype: list of strings
    """
    # TODO: take sizehint into account
    lines = list(self)
    return lines
seek(offset, whence=os.SEEK_SET)

Set the new read/write offset in the file.

Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def seek(self, offset, whence=os.SEEK_SET) -> int:
    """Set the new read/write offset in the file."""
    return gdal.VSIFSeekL(self._handle, offset, whence)
seekable()
Source code in stacture/terravis/gdal/vsi.py
def seekable(self) -> bool:
    return True
tell()

Return the current read/write offset of the file.

Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def tell(self) -> int:
    """Return the current read/write offset of the file."""
    return gdal.VSIFTellL(self._handle)
truncate(size=None)

Truncates the file to the given size, or to the current position if no size is given.

:param size: the new size of the file.

Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def truncate(self, size=None):
    """Truncates the file to the given size or to the size until the current
        position.

    :param size: the new size of the file.
    """
    size = self.tell() if size is None else size
    gdal.VSIFTruncateL(self._handle, size)
write(data)

Write the buffer data to the file.

Source code in stacture/terravis/gdal/vsi.py
@_ensure_open
def write(self, data: bytes) -> int:
    """Write the buffer ``data`` to the file."""
    return gdal.VSIFWriteL(data, 1, len(data), self._handle)
writeable()
Source code in stacture/terravis/gdal/vsi.py
def writeable(self) -> bool:
    return "w" in self._mode
writelines(lines)
Source code in stacture/terravis/gdal/vsi.py
def writelines(self, lines: Iterable[bytes]):
    for line in lines:
        self.write(line)
        self.write(b"\n")

join(first, *paths)

Joins the given VSI path specifiers. Similar to :func:`os.path.join` but takes care of the VSI-specific handles such as vsicurl, vsizip, etc.

Source code in stacture/terravis/gdal/vsi.py
def join(first: str, *paths: str) -> str:
    """Joins the given VSI path specifiers. Similar to :func:`os.path.join`
    but takes care of the VSI-specific handles such as `vsicurl`, `vsizip`,
    etc.
    """
    parts = first.split("/")
    for path in paths:
        new = path.split("/")
        if path.startswith("/vsi"):
            parts = new[0:2] + (parts if parts[0] else parts[1:]) + new[2:]
        else:
            parts.extend(new)
    return "/".join(parts)

open_(filename, mode='r')

A function mimicking the builtin function :func:`open <__builtins__.open>` but returning a :class:`VSIFile` instead.

:param filename: the path to the file; this might also be any VSI special path like "/vsicurl/..." or "/vsizip/...". See the GDAL documentation <http://trac.osgeo.org/gdal/wiki/UserDocs/ReadInZip>_ for reference.
:param mode: the file opening mode
:returns: a :class:`VSIFile`

Source code in stacture/terravis/gdal/vsi.py
def open_(filename: os.PathLike, mode="r") -> "VSIFile":
    """A function mimicking the builtin function
    :func:`open <__builtins__.open>` but returning a :class:`VSIFile` instead.

    :param filename: the path to the file; this might also be any VSI special
                     path like "/vsicurl/..." or "/vsizip/...". See the `GDAL
                     documentation
                     <http://trac.osgeo.org/gdal/wiki/UserDocs/ReadInZip>`_
                     for reference.
    :param mode: the file opening mode
    :returns: a :class:`VSIFile`
    """
    return VSIFile(filename, mode)

to_vsi_path(href, streaming=False)

Converts URLs to VSI compatible paths.

e.g. s3://bucket/path/to/file becomes /vsis3/bucket/path/to/file

Parameters:

Name       Type  Description                             Default
href       str   the URL to transform                    required
streaming  bool  whether to use the streaming interface  False

Returns:

Name  Type  Description
str   str   the VSI path

Source code in stacture/terravis/gdal/vsi.py
def to_vsi_path(href: str, streaming: bool = False) -> str:
    """Converts URLs to VSI compatible paths.

    e.g. s3://bucket/path/to/file becomes /vsis3/bucket/path/to/file

    Args:
        href (str): the URL to transform
        streaming (bool): Whether to use the streaming interface

    Returns:
        str: the VSI path
    """
    parsed = urlparse(href)
    if parsed.scheme in PROTOCOL_MAP:
        vsi_prefix, full_url = PROTOCOL_MAP[parsed.scheme]
        if streaming:
            vsi_prefix = f"{vsi_prefix}_streaming"

        if full_url:
            return f"/{vsi_prefix}/{href}"
        return f"/{vsi_prefix}/{parsed.netloc}{parsed.path}"

    return href
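
Some sample conversions; URLs whose scheme is not in PROTOCOL_MAP pass through unchanged:

assert to_vsi_path("s3://bucket/key.tif") == "/vsis3/bucket/key.tif"
assert to_vsi_path("https://example.com/a.tif") == "/vsicurl/https://example.com/a.tif"
assert to_vsi_path("s3://bucket/key.tif", streaming=True) == "/vsis3_streaming/bucket/key.tif"
assert to_vsi_path("/local/path.tif") == "/local/path.tif"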