import os
import os.path
import importlib
import logging
import structlog
import shutil
from pprint import pformat
from urllib.parse import urlparse
from typing import List, Optional
from .transfer import get_downloader, get_uploader
from .archive import unpack_files, filter_filenames
from .metadata import (
update_config_by_product_types_and_levels,
extract_metadata_for_stac,
)
from .steps import (
georeference_step,
extract_subdataset_step,
calc_step,
stack_bands_step,
output_step,
)
from .steps.browse_report import browse_georeference
from .util import (
workdir,
Timer,
get_size_in_bytes,
apply_gdal_config_options,
set_gdal_options,
flatten,
)
from .stac import create_simple_stac_item
logging.basicConfig()
logger = structlog.getLogger(__name__)
# -----------------------------------------------------------------------------
def copy_files(source, target, move=False):
    """Copies (or moves, if ``move`` is set) all entries of ``source`` into ``target``."""
for item in os.listdir(source):
src_path = os.path.join(source, item)
dst_path = os.path.join(target, item)
if move:
shutil.move(src_path, dst_path)
else:
if os.path.isdir(src_path):
shutil.copytree(src_path, dst_path)
else:
shutil.copy(src_path, dst_path)
def custom_preprocessor(
source_dir,
target_dir,
preprocess_config,
path,
    data_file_globs: Optional[List[str]] = None,
args=None,
kwargs=None,
):
"""Preprocessing step for a custom preprocessing."""
module_name, _, func_name = path.rpartition(".")
func = getattr(importlib.import_module(module_name), func_name)
func(source_dir, target_dir, preprocess_config, *(args or []), **(kwargs or {}))
def custom_postprocessor(
source_dir,
target_dir,
preprocess_config,
path,
    data_file_globs: Optional[List[str]] = None,
args=None,
kwargs=None,
):
"""Preprocessing step for a custom preprocessing."""
module_name, _, func_name = path.rpartition(".")
func = getattr(importlib.import_module(module_name), func_name)
func(source_dir, target_dir, preprocess_config, *(args or []), **(kwargs or {}))
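# maps the configuration keys to their step implementations, in execution order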
STEP_FUNCTIONS = {
"custom_preprocessor": custom_preprocessor,
"subdatasets": extract_subdataset_step,
"georeference": georeference_step,
"calc": calc_step,
"stack_bands": stack_bands_step,
"output": output_step,
"custom_postprocessor": custom_postprocessor,
}
# -----------------------------------------------------------------------------
def preprocess_internal(preprocess_config, previous_step="unpack"):
    """Runs the configured preprocessing steps in order, each step reading from
    the previous step's directory and writing to a directory of its own name."""
force_refresh = False
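    # once any step is (re)run, all subsequent steps have to be refreshed as well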
# apply specific gdal config options
original_config = apply_gdal_config_options(preprocess_config)
# make processing steps
for step in [
"custom_preprocessor",
"subdatasets",
"georeference",
"calc",
"stack_bands",
"output",
"custom_postprocessor",
]:
step_config = preprocess_config.get(step)
if not step_config:
logger.debug("Skipping step %s as it is not configured." % step)
continue
# run the step if it was not already run
if not os.path.isdir(step) or force_refresh:
if os.path.isdir(step):
logger.info("Forcing refresh of existing directory %s" % step)
shutil.rmtree(step)
logger.info("Running preprocessing step %s" % step)
os.mkdir(step)
preprocessor = STEP_FUNCTIONS[step]
with Timer() as step_timer:
preprocessor(previous_step, step, preprocess_config, **step_config)
logger.info(
"Finished preprocessing step %s after %.3f seconds."
% (step, step_timer.elapsed)
)
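            # this step produced fresh output, so all following steps must be re-run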
force_refresh = True
else:
logger.info("%s dir already exists, skipping step..." % step)
previous_step = step
# put back original configuration for further steps
set_gdal_options(original_config)
if not os.path.isdir("upload") or force_refresh:
try:
os.mkdir("upload")
except FileExistsError:
logger.debug("Upload folder already exists.")
# copy or move files from previous step directory to upload directory
copy_files(
previous_step, "upload", move=preprocess_config.get("move_files", False)
)
def preprocess_file(config: dict, file_path: str, use_dir: str = None):
"""Runs the preprocessing of a single file."""
with workdir(config, use_dir) as dirname, Timer() as preprocess_timer:
logger.info("Preprocessing %s in %s" % (file_path, dirname))
target_config = config["target"]
already_existed = False
        # check whether target.replace is configured and if not, check
        # whether the files already exist on the target storage
if not target_config.get("replace"):
uploader = get_uploader(
target_config["type"],
target_config.get("args"),
target_config.get("kwargs"),
)
product_exists, uploaded_files = uploader.product_exists(
file_path, target_config.get("expected_files_count", 2)
)
            if product_exists:
                already_existed = True
                logger.info("Product already exists: %s" % file_path)
if config.get("stac_output", False):
                # determine which of the uploaded files is the metadata file
remote_path_metadata = filter_filenames(
filenames=uploaded_files,
glob=config["metadata_glob"],
case=config.get("glob_case", False),
)[0]
# download the metadata file
downloader_target = get_downloader(
target_config["type"],
target_config.get("args"),
target_config.get("kwargs"),
)
if not os.path.isdir("extra"):
os.mkdir("extra")
local_metadata_file = downloader_target.download(
remote_path_metadata, "extra"
)
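                    # map the local metadata file to an empty remote path for metadata extraction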
md_file = {local_metadata_file: ""}
metadata = extract_metadata_for_stac(md_file, "", "")
identifier = metadata["id"]
with structlog.contextvars.bound_contextvars(identifier=identifier):
# update the config dict by type based config
(
preprocess_config,
product_type,
product_level,
) = update_config_by_product_types_and_levels(
[local_metadata_file], config
)
uploaded_images = filter_filenames(
filenames=uploaded_files,
glob=config.get("output_image_glob", "*.tif"),
case=config.get("glob_case", False),
)
                        # build a dict of synthetic keys to remote paths,
                        # since the images are not downloaded again
upload_images = {
f"remote_reference_{i}": key
for i, key in enumerate(uploaded_images)
}
uploaded_metadata_files = {
local_metadata_file: remote_path_metadata
}
logger.debug("Uploaded images found: %s" % upload_images)
logger.debug(
"Uploaded metadata found: %s" % uploaded_metadata_files
)
                        # build a STAC item from the contents of the target
                        # storage, using the product type based config
stac_item = create_simple_stac_item(
preprocess_config,
config,
upload_images,
uploaded_metadata_files,
product_type,
product_level,
)
logger.info("Created a STAC item only.")
logger.debug("STAC item: %s" % stac_item)
logger.info(
"Finished preprocessing of %s after %.3f seconds."
% (file_path, preprocess_timer.elapsed)
)
return stac_item, already_existed
return file_path, already_existed
else:
logger.debug("Product does not yet exist on target")
# check if we can reuse a previous download
if not os.path.isdir("download"):
os.mkdir("download")
logger.info("Downloading %s to %s..." % (file_path, dirname))
# get the Downloader for the configured source archive to download the
# given source file
source_config = config["source"]
downloader = get_downloader(
source_config["type"],
source_config.get("args"),
source_config.get("kwargs"),
)
with Timer() as download_timer:
source_archive_path = downloader.download(file_path, "download")
logger.info(
                "Downloaded file %s with size %.2f MB in %.2f seconds"
% (
file_path,
get_size_in_bytes(source_archive_path, "MB"),
download_timer.elapsed,
)
)
else:
source_archive_path = os.path.join("download", os.path.basename(file_path))
logger.info("Download dir already exists, skipping...")
# fetch the metadata XML file from the downloaded archive
metadata_files = unpack_files(
source_archive_path,
"extra",
glob=config["metadata_glob"],
case=config.get("glob_case", False),
)
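        # map the first extracted metadata file to an empty remote path for extraction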
md_file = {
metadata_files[0]: "",
}
metadata = extract_metadata_for_stac(md_file, "", "")
identifier = metadata["id"]
with structlog.contextvars.bound_contextvars(identifier=identifier):
(
preprocess_config,
product_type,
product_level,
) = update_config_by_product_types_and_levels(metadata_files, config)
# logger.debug("Using preprocessing config %s" % pformat(preprocess_config))
if not os.path.isdir("unpack"):
os.mkdir("unpack")
logger.info("Unpacking original files...")
# select and unpack files according to configuration
with Timer() as unpack_timer:
data_files = flatten(
[
unpack_files(
source_archive_path,
"unpack",
glob=glob,
case=config.get("glob_case", False),
recursive=preprocess_config.get("nested", False),
)
for glob in preprocess_config["data_file_globs"]
]
)
metadata_files = flatten(
[
unpack_files(
source_archive_path,
"unpack",
glob=glob,
case=config.get("glob_case", False),
recursive=preprocess_config.get("nested", False),
)
for glob in preprocess_config.get(
"additional_file_globs", []
)
]
)
logger.info(
"Unpacked files: %s in %.3f seconds"
% (", ".join(metadata_files + data_files), unpack_timer.elapsed)
)
if config.get("delete_download_after_unpack", False):
shutil.rmtree("download")
else:
logger.info("Unpack dir already exists, skipping...")
        # actually perform the preprocessing on the unpacked files
preprocess_internal(preprocess_config, "unpack")
# get an uploader for the finalized images
uploader = get_uploader(
target_config["type"],
target_config.get("args"),
target_config.get("kwargs"),
)
if len(os.listdir("upload")) == 0:
            # abort here, so that not only the metadata file gets uploaded
            raise RuntimeError("No data files to upload, aborting.")
upload_filenames = [
os.path.join(dirpath, filename)
for dirpath, _, filenames in os.walk("upload")
for filename in filenames
]
extra_filenames = [
os.path.join(dirpath, filename)
for dirpath, _, filenames in os.walk("extra")
for filename in filenames
]
# send all files in the upload directory to the target storage
logger.info(
"Starting uploading of %d files to %s"
% (len(upload_filenames + extra_filenames), file_path)
)
with Timer() as upload_timer:
# returns dict local_path:upload_path
upload_images = uploader.upload(upload_filenames, file_path)
upload_extra = uploader.upload(extra_filenames, file_path)
logger.info(
"Finished uploading after %.3f seconds." % (upload_timer.elapsed)
)
logger.info(
"Finished preprocessing of %s after %.3f seconds."
% (file_path, preprocess_timer.elapsed)
)
if config.get("stac_output", False):
            # build a STAC item
stac_item = create_simple_stac_item(
preprocess_config,
config,
upload_images,
upload_extra,
product_type,
product_level,
)
return stac_item, already_existed
return file_path, already_existed
def preprocess_browse(
config: dict,
browse_type: str,
browse_report: dict,
browse: dict,
use_dir: str = None,
):
    """Runs the preprocessing of a single browse image."""
    # TODO: TO-BE-DELETED - this should be the responsibility of the
    # TODO: ingestor/harvester, not of the preprocessor
with workdir(config, use_dir) as dirname, Timer() as preprocess_timer:
filename = browse["filename"]
logger.info('Preprocessing browse "%s" in %s' % (filename, dirname))
already_existed = False
parsed = urlparse(filename)
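        # filenames without a URL scheme are fetched via the configured source downloader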
if not parsed.scheme:
            # check whether target.replace is configured and if not, check
            # whether the files already exist on the target storage
if not config["target"].get("replace"):
uploader = get_uploader(
config["target"]["type"],
config["target"].get("args"),
config["target"].get("kwargs"),
)
                product_exists, _ = uploader.product_exists(
                    filename, config["target"].get("expected_files_count", 1)
                )
                if product_exists:
already_existed = True
logger.warning(
"Target.replace configuration is not set to true and objects"
"alrzeady exist in target %s.",
filename,
)
# TODO UNIFY BROWSE AND PREPROCESS CODE PATHS!
# if config.get('stac_output', False):
# # build a stac asset
# stac_item = create_simple_stac_item(
# preprocess_config, config, upload_images, upload_extra)
return browse["browse_identifier"], already_existed
else:
logger.debug("Browse does not yet exist on target")
# check if we can reuse a previous download
if not os.path.isdir("download"):
os.mkdir("download")
logger.info("Downloading %s from %s..." % (filename, dirname))
# get the Downloader for the configured source archive to download the
# given source file
source_config = config["source"]
downloader = get_downloader(
source_config["type"],
source_config.get("args"),
source_config.get("kwargs"),
)
with Timer() as download_timer:
_ = downloader.download(filename, "download")
logger.info(
"Downloaded file %s in %.3f seconds"
% (filename, download_timer.elapsed)
)
else:
_ = os.path.join("download", os.path.basename(filename))
logger.info("Download dir already exists, skipping...")
elif parsed.scheme in ("http", "https"):
# TODO: check if allowed and download from there
raise NotImplementedError
if not os.path.isdir("unpack"):
os.mkdir("unpack")
if not os.path.isdir("extra"):
os.mkdir("extra")
logger.info("Applying browse georeference to browse %s" % filename)
browse_georeference("download", "unpack", "extra", browse_report, browse)
# fetch the product type from the browse_type
product_type = config.get("browse_type_mapping", {}).get(
browse_type, browse_type
)
logger.info("Detected product_type %s" % (product_type))
# get a concrete configuration for the type, filled with the defaults
default_config = dict(config["preprocessing"].get("defaults", {}))
type_based_config = dict(config["preprocessing"]["types"].get(product_type, {}))
default_config.update(type_based_config)
preprocess_config = default_config
logger.debug("Using preprocessing config %s" % pformat(preprocess_config))
preprocess_internal(preprocess_config)
# get an uploader for the finalized images
target_config = config["target"]
uploader = get_uploader(
target_config["type"],
target_config.get("args"),
target_config.get("kwargs"),
)
upload_filenames = [
os.path.join(dirpath, filename)
for dirpath, _, filenames in os.walk("upload")
for filename in filenames
]
extra_filenames = [
os.path.join(dirpath, filename)
for dirpath, _, filenames in os.walk("extra")
for filename in filenames
]
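        # fall back to the first uploaded file if no browse identifier is given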
file_path = browse["browse_identifier"] or upload_filenames[0]
# send all files in the upload directory to the target storage
logger.info(
"Starting uploading of %d files to %s"
% (len(upload_filenames + extra_filenames), file_path)
)
with Timer() as upload_timer:
# returns dict local_path:upload_path
upload_images = uploader.upload(upload_filenames, file_path)
upload_extra = uploader.upload(extra_filenames, file_path)
logger.info("Finished uploading after %.3f seconds." % (upload_timer.elapsed))
logger.info(
'Finished preprocessing of browse "%s" after %.3f seconds.'
% (filename, preprocess_timer.elapsed)
)
if config.get("stac_output", False):
            # build a STAC item
stac_item = create_simple_stac_item(
preprocess_config,
config,
upload_images,
upload_extra,
product_type,
None,
)
return stac_item, already_existed
return file_path, already_existed