Source code for preprocessor.preprocess

import os
import os.path
import importlib
import logging
import structlog
import shutil
from pprint import pformat
from urllib.parse import urlparse
from typing import List

from .transfer import get_downloader, get_uploader
from .archive import unpack_files, filter_filenames
from .metadata import (
    update_config_by_product_types_and_levels,
    extract_metadata_for_stac,
)
from .steps import (
    georeference_step,
    extract_subdataset_step,
    calc_step,
    stack_bands_step,
    output_step,
)
from .steps.browse_report import browse_georeference
from .util import (
    workdir,
    Timer,
    get_size_in_bytes,
    apply_gdal_config_options,
    set_gdal_options,
    flatten,
)
from .stac import create_simple_stac_item

logging.basicConfig()

logger = structlog.getLogger(__name__)

# -----------------------------------------------------------------------------


def copy_files(source, target, move=False):
    for item in os.listdir(source):
        src_path = os.path.join(source, item)
        dst_path = os.path.join(target, item)
        if move:
            shutil.move(src_path, dst_path)
        else:
            if os.path.isdir(src_path):
                shutil.copytree(src_path, dst_path)
            else:
                shutil.copy(src_path, dst_path)

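# Illustrative usage sketch (not part of the original module): copy_files mirrors the
# top-level entries of one directory into another, the way preprocess_internal fills
# the "upload" directory from the last step directory. The directory names below are
# hypothetical.
#
#   copy_files("output", "upload")             # copy files and sub-directories
#   copy_files("output", "upload", move=True)  # move them instead of copying
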
def custom_preprocessor(
    source_dir,
    target_dir,
    preprocess_config,
    path,
    data_file_globs: List[str] = [],
    args=None,
    kwargs=None,
):
    """Preprocessing step for a custom preprocessing."""
    module_name, _, func_name = path.rpartition(".")
    func = getattr(importlib.import_module(module_name), func_name)
    func(source_dir, target_dir, preprocess_config, *(args or []), **(kwargs or {}))

def custom_postprocessor(
    source_dir,
    target_dir,
    preprocess_config,
    path,
    data_file_globs: List[str] = [],
    args=None,
    kwargs=None,
):
    """Postprocessing step for a custom postprocessing."""
    module_name, _, func_name = path.rpartition(".")
    func = getattr(importlib.import_module(module_name), func_name)
    func(source_dir, target_dir, preprocess_config, *(args or []), **(kwargs or {}))

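# Illustrative sketch (not part of the original module) of how the two custom steps
# resolve their "path" argument: the dotted path is split at the last "." into module
# and function name, the module is imported, and the function is called with the step
# directories plus any extra args/kwargs. "my_package.steps.sharpen" and its arguments
# are hypothetical examples.
#
#   custom_preprocessor(
#       "unpack", "custom_preprocessor", preprocess_config,
#       path="my_package.steps.sharpen",
#       args=["some-positional-arg"],
#       kwargs={"radius": 3},
#   )
#   # roughly equivalent to:
#   #   from my_package.steps import sharpen
#   #   sharpen("unpack", "custom_preprocessor", preprocess_config,
#   #           "some-positional-arg", radius=3)
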
STEP_FUNCTIONS = {
    "custom_preprocessor": custom_preprocessor,
    "subdatasets": extract_subdataset_step,
    "georeference": georeference_step,
    "calc": calc_step,
    "stack_bands": stack_bands_step,
    "output": output_step,
    "custom_postprocessor": custom_postprocessor,
}

# -----------------------------------------------------------------------------

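# Illustrative sketch (not part of the original module): preprocess_internal iterates
# over these keys in a fixed order and only runs a step when preprocess_config holds a
# non-empty entry for it, forwarding that entry as keyword arguments to the step
# function. The step argument dicts below are hypothetical placeholders.
#
#   preprocess_config = {
#       "data_file_globs": ["*.tif"],   # used while unpacking, not a step
#       "georeference": {...},          # kwargs passed to georeference_step
#       "output": {...},                # kwargs passed to output_step
#   }
#   # -> runs only the "georeference" and "output" steps, in that order
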
def preprocess_internal(preprocess_config, previous_step="unpack"):
    force_refresh = False
    # apply specific gdal config options
    original_config = apply_gdal_config_options(preprocess_config)

    # make processing steps
    for step in [
        "custom_preprocessor",
        "subdatasets",
        "georeference",
        "calc",
        "stack_bands",
        "output",
        "custom_postprocessor",
    ]:
        step_config = preprocess_config.get(step)
        if not step_config:
            logger.debug("Skipping step %s as it is not configured." % step)
            continue

        # run the step if it was not already run
        if not os.path.isdir(step) or force_refresh:
            if os.path.isdir(step):
                logger.info("Forcing refresh of existing directory %s" % step)
                shutil.rmtree(step)

            logger.info("Running preprocessing step %s" % step)
            os.mkdir(step)
            preprocessor = STEP_FUNCTIONS[step]
            with Timer() as step_timer:
                preprocessor(previous_step, step, preprocess_config, **step_config)

            logger.info(
                "Finished preprocessing step %s after %.3f seconds."
                % (step, step_timer.elapsed)
            )
            force_refresh = True
        else:
            logger.info("%s dir already exists, skipping step..." % step)

        previous_step = step

    # put back original configuration for further steps
    set_gdal_options(original_config)

    if not os.path.isdir("upload") or force_refresh:
        try:
            os.mkdir("upload")
        except FileExistsError:
            logger.debug("Upload folder already exists.")

        # copy or move files from previous step directory to upload directory
        copy_files(
            previous_step, "upload", move=preprocess_config.get("move_files", False)
        )

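# Illustrative sketch (not part of the original module): the step directories form a
# simple pipeline on disk. Each configured step reads from the directory written by
# the previous one and writes into a directory named after itself, e.g. for a config
# with only "georeference" and "output" enabled:
#
#   preprocess_internal(preprocess_config, previous_step="unpack")
#   # unpack/ -> georeference/ -> output/ -> upload/
#
# "upload/" finally receives the files of the last executed step, copied or moved
# depending on preprocess_config["move_files"].
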
def preprocess_file(config: dict, file_path: str, use_dir: str = None):
    """Runs the preprocessing of a single file."""
    with workdir(config, use_dir) as dirname, Timer() as preprocess_timer:
        logger.info("Preprocessing %s in %s" % (file_path, dirname))
        target_config = config["target"]
        already_existed = False
        # check if target.replace is configured and if not, check storage if files there
        if not target_config.get("replace"):
            uploader = get_uploader(
                target_config["type"],
                target_config.get("args"),
                target_config.get("kwargs"),
            )
            product_exists, uploaded_files = uploader.product_exists(
                file_path, target_config.get("expected_files_count", 2)
            )
            if product_exists:
                logger.info("Product already exists: %s" % file_path)
                if config.get("stac_output", False):
                    # filter which of the contents of the archive is a metadata file
                    remote_path_metadata = filter_filenames(
                        filenames=uploaded_files,
                        glob=config["metadata_glob"],
                        case=config.get("glob_case", False),
                    )[0]
                    # download the metadata file
                    downloader_target = get_downloader(
                        target_config["type"],
                        target_config.get("args"),
                        target_config.get("kwargs"),
                    )
                    if not os.path.isdir("extra"):
                        os.mkdir("extra")
                    local_metadata_file = downloader_target.download(
                        remote_path_metadata, "extra"
                    )
                    md_file = {local_metadata_file: ""}
                    metadata = extract_metadata_for_stac(md_file, "", "")
                    identifier = metadata["id"]
                    with structlog.contextvars.bound_contextvars(
                        identifier=identifier
                    ):
                        # update the config dict by type based config
                        (
                            preprocess_config,
                            product_type,
                            product_level,
                        ) = update_config_by_product_types_and_levels(
                            [local_metadata_file], config
                        )
                        uploaded_images = filter_filenames(
                            filenames=uploaded_files,
                            glob=config.get("output_image_glob", "*.tif"),
                            case=config.get("glob_case", False),
                        )
                        # convert to dict remote_path: remote_path
                        # because we are not downloading the images again
                        upload_images = {
                            f"remote_reference_{i}": key
                            for i, key in enumerate(uploaded_images)
                        }
                        uploaded_metadata_files = {
                            local_metadata_file: remote_path_metadata
                        }
                        logger.debug("Uploaded images found: %s" % upload_images)
                        logger.debug(
                            "Uploaded metadata found: %s" % uploaded_metadata_files
                        )
                        # build a stac asset based on contents of the target storage
                        # based on config of the product type
                        stac_item = create_simple_stac_item(
                            preprocess_config,
                            config,
                            upload_images,
                            uploaded_metadata_files,
                            product_type,
                            product_level,
                        )
                        logger.info("Created a STAC item only.")
                        logger.debug("STAC item: %s" % stac_item)
                        logger.info(
                            "Finished preprocessing of %s after %.3f seconds."
                            % (file_path, preprocess_timer.elapsed)
                        )
                        return stac_item, already_existed
                return file_path, already_existed
            else:
                logger.debug("Product does not yet exist on target")

        # check if we can reuse a previous download
        if not os.path.isdir("download"):
            os.mkdir("download")
            logger.info("Downloading %s to %s..." % (file_path, dirname))
            # get the Downloader for the configured source archive to download the
            # given source file
            source_config = config["source"]
            downloader = get_downloader(
                source_config["type"],
                source_config.get("args"),
                source_config.get("kwargs"),
            )
            with Timer() as download_timer:
                source_archive_path = downloader.download(file_path, "download")
            logger.info(
                'Downloaded file %s with size "%.02f" MB in "%.2f" seconds'
                % (
                    file_path,
                    get_size_in_bytes(source_archive_path, "MB"),
                    download_timer.elapsed,
                )
            )
        else:
            source_archive_path = os.path.join("download", os.path.basename(file_path))
            logger.info("Download dir already exists, skipping...")

        # fetch the metadata XML file from the downloaded archive
        metadata_files = unpack_files(
            source_archive_path,
            "extra",
            glob=config["metadata_glob"],
            case=config.get("glob_case", False),
        )
        md_file = {
            metadata_files[0]: "",
        }
        metadata = extract_metadata_for_stac(md_file, "", "")
        identifier = metadata["id"]
        with structlog.contextvars.bound_contextvars(identifier=identifier):
            (
                preprocess_config,
                product_type,
                product_level,
            ) = update_config_by_product_types_and_levels(metadata_files, config)
            # logger.debug("Using preprocessing config %s" % pformat(preprocess_config))
            if not os.path.isdir("unpack"):
                os.mkdir("unpack")
                logger.info("Unpacking original files...")
                # select and unpack files according to configuration
                with Timer() as unpack_timer:
                    data_files = flatten(
                        [
                            unpack_files(
                                source_archive_path,
                                "unpack",
                                glob=glob,
                                case=config.get("glob_case", False),
                                recursive=preprocess_config.get("nested", False),
                            )
                            for glob in preprocess_config["data_file_globs"]
                        ]
                    )
                    metadata_files = flatten(
                        [
                            unpack_files(
                                source_archive_path,
                                "unpack",
                                glob=glob,
                                case=config.get("glob_case", False),
                                recursive=preprocess_config.get("nested", False),
                            )
                            for glob in preprocess_config.get(
                                "additional_file_globs", []
                            )
                        ]
                    )
                logger.info(
                    "Unpacked files: %s in %.3f seconds"
                    % (", ".join(metadata_files + data_files), unpack_timer.elapsed)
                )
                if config.get("delete_download_after_unpack", False):
                    shutil.rmtree("download")
            else:
                logger.info("Unpack dir already exists, skipping...")

            # actually perform the preprocessing from the downloaded file
            preprocess_internal(preprocess_config, "unpack")

            # get an uploader for the finalized images
            uploader = get_uploader(
                target_config["type"],
                target_config.get("args"),
                target_config.get("kwargs"),
            )
            if len(os.listdir("upload")) == 0:
                # end here, so not only metadata file is uploaded
                raise Exception("No data files to upload, aborting.")
            upload_filenames = [
                os.path.join(dirpath, filename)
                for dirpath, _, filenames in os.walk("upload")
                for filename in filenames
            ]
            extra_filenames = [
                os.path.join(dirpath, filename)
                for dirpath, _, filenames in os.walk("extra")
                for filename in filenames
            ]

            # send all files in the upload directory to the target storage
            logger.info(
                "Starting uploading of %d files to %s"
                % (len(upload_filenames + extra_filenames), file_path)
            )
            with Timer() as upload_timer:
                # returns dict local_path:upload_path
                upload_images = uploader.upload(upload_filenames, file_path)
                upload_extra = uploader.upload(extra_filenames, file_path)

            logger.info(
                "Finished uploading after %.3f seconds." % (upload_timer.elapsed)
            )
            logger.info(
                "Finished preprocessing of %s after %.3f seconds."
                % (file_path, preprocess_timer.elapsed)
            )
            if config.get("stac_output", False):
                # build a stac asset
                stac_item = create_simple_stac_item(
                    preprocess_config,
                    config,
                    upload_images,
                    upload_extra,
                    product_type,
                    product_level,
                )
                return stac_item, already_existed
            return file_path, already_existed

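# Illustrative usage sketch (not part of the original module): a trimmed-down config
# as preprocess_file reads it. Only the key names are taken from the code above; the
# concrete values ("swift", the globs, the container kwargs, the product path) are
# hypothetical placeholders.
#
#   config = {
#       "source": {"type": "swift", "kwargs": {"container": "incoming"}},
#       "target": {"type": "swift", "kwargs": {"container": "preprocessed"},
#                  "replace": False, "expected_files_count": 2},
#       "metadata_glob": "*MTD*.xml",
#       "glob_case": False,
#       "stac_output": True,
#       "preprocessing": {"defaults": {}, "types": {}},
#   }
#   item_or_path, already_existed = preprocess_file(config, "path/to/product.zip")
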
def preprocess_browse(
    config: dict,
    browse_type: str,
    browse_report: dict,
    browse: dict,
    use_dir: str = None,
):
    # TODO: TO-BE-DELETED - should not be responsibility
    # TODO of preprocessor but ingestor/harvester
    with workdir(config, use_dir) as dirname, Timer() as preprocess_timer:
        filename = browse["filename"]
        logger.info('Preprocessing browse "%s" in %s' % (filename, dirname))
        already_existed = False
        parsed = urlparse(filename)
        if not parsed.scheme:
            # check if target.replace is configured and if not,
            # check storage if files there
            if not config["target"].get("replace"):
                uploader = get_uploader(
                    config["target"]["type"],
                    config["target"].get("args"),
                    config["target"].get("kwargs"),
                )
                if uploader.product_exists(
                    filename, config["target"].get("expected_files_count", 1)
                ):
                    already_existed = True
                    logger.warning(
                        "Target.replace configuration is not set to true and objects "
                        "already exist in target %s.",
                        filename,
                    )
                    # TODO UNIFY BROWSE AND PREPROCESS CODE PATHS!
                    # if config.get('stac_output', False):
                    #     # build a stac asset
                    #     stac_item = create_simple_stac_item(
                    #         preprocess_config, config, upload_images, upload_extra)
                    return browse["browse_identifier"], already_existed
                else:
                    logger.debug("Browse does not yet exist on target")

            # check if we can reuse a previous download
            if not os.path.isdir("download"):
                os.mkdir("download")
                logger.info("Downloading %s to %s..." % (filename, dirname))
                # get the Downloader for the configured source archive to download the
                # given source file
                source_config = config["source"]
                downloader = get_downloader(
                    source_config["type"],
                    source_config.get("args"),
                    source_config.get("kwargs"),
                )
                with Timer() as download_timer:
                    _ = downloader.download(filename, "download")
                logger.info(
                    "Downloaded file %s in %.3f seconds"
                    % (filename, download_timer.elapsed)
                )
            else:
                _ = os.path.join("download", os.path.basename(filename))
                logger.info("Download dir already exists, skipping...")
        elif parsed.scheme in ("http", "https"):
            # TODO: check if allowed and download from there
            raise NotImplementedError

        if not os.path.isdir("unpack"):
            os.mkdir("unpack")
        if not os.path.isdir("extra"):
            os.mkdir("extra")

        logger.info("Applying browse georeference to browse %s" % filename)
        browse_georeference("download", "unpack", "extra", browse_report, browse)

        # fetch the product type from the browse_type
        product_type = config.get("browse_type_mapping", {}).get(
            browse_type, browse_type
        )
        logger.info("Detected product_type %s" % (product_type))

        # get a concrete configuration for the type, filled with the defaults
        default_config = dict(config["preprocessing"].get("defaults", {}))
        type_based_config = dict(
            config["preprocessing"]["types"].get(product_type, {})
        )
        default_config.update(type_based_config)
        preprocess_config = default_config

        logger.debug("Using preprocessing config %s" % pformat(preprocess_config))

        preprocess_internal(preprocess_config)

        # get an uploader for the finalized images
        target_config = config["target"]
        uploader = get_uploader(
            target_config["type"],
            target_config.get("args"),
            target_config.get("kwargs"),
        )

        upload_filenames = [
            os.path.join(dirpath, filename)
            for dirpath, _, filenames in os.walk("upload")
            for filename in filenames
        ]
        extra_filenames = [
            os.path.join(dirpath, filename)
            for dirpath, _, filenames in os.walk("extra")
            for filename in filenames
        ]

        file_path = browse["browse_identifier"] or upload_filenames[0]

        # send all files in the upload directory to the target storage
        logger.info(
            "Starting uploading of %d files to %s"
            % (len(upload_filenames + extra_filenames), file_path)
        )
        with Timer() as upload_timer:
            # returns dict local_path:upload_path
            upload_images = uploader.upload(upload_filenames, file_path)
            upload_extra = uploader.upload(extra_filenames, file_path)

        logger.info("Finished uploading after %.3f seconds." % (upload_timer.elapsed))
        logger.info(
            'Finished preprocessing of browse "%s" after %.3f seconds.'
            % (filename, preprocess_timer.elapsed)
        )
        if config.get("stac_output", False):
            # build a stac asset
            stac_item = create_simple_stac_item(
                preprocess_config,
                config,
                upload_images,
                upload_extra,
                product_type,
                None,
            )
            return stac_item, already_existed
        return file_path, already_existed
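
# Illustrative usage sketch (not part of the original module): preprocess_browse only
# relies on the "filename" and "browse_identifier" keys of the browse dict in the code
# above; the browse_type value and the browse_report contents consumed by
# browse_georeference are passed through unchanged and are hypothetical here.
#
#   browse = {"filename": "browse_image.tif", "browse_identifier": "browse-001"}
#   item_or_path, already_existed = preprocess_browse(
#       config, "MY_BROWSE_TYPE", browse_report={}, browse=browse
#   )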