# Source code for preprocessor.util

import os
from os.path import splitext, join
from contextlib import contextmanager
from tempfile import TemporaryDirectory, mkdtemp
from time import time
from glob import glob
from .archive import filter_filenames


try:
    from osgeo import gdal
except ImportError:
    import gdal

gdal.UseExceptions()

try:
    from osgeo import osr
except ImportError:
    import osr

osr.UseExceptions()

try:
    from osgeo import ogr
except ImportError:
    import ogr

ogr.UseExceptions()


def replace_ext(filename: str, new_ext: str, force_dot: bool = True) -> str:
    """Return ``filename`` with its extension swapped for ``new_ext``.

    The current extension (as split off by ``os.path.splitext``) is removed
    and ``new_ext`` is appended. A ``"."`` separator is inserted only when
    ``force_dot`` is true and ``new_ext`` does not already start with a dot.

    Args:
        filename: Path or file name whose extension is replaced.
        new_ext: New extension, with or without a leading dot.
        force_dot: When false, ``new_ext`` is appended verbatim, which allows
            suffixes such as ``"_proc.tif"``.

    Returns:
        The new file name.
    """
    stem, _ = splitext(filename)
    needs_dot = not new_ext.startswith(".") and force_dot
    separator = "." if needs_dot else ""
    return stem + separator + new_ext
def flatten(llist):
    """Flatten one level of nesting.

    Every element of ``llist`` is iterated and its items are collected, in
    order, into a single new list. Deeper nesting is left untouched.
    """
    flat = []
    for sublist in llist:
        flat.extend(sublist)
    return flat
@contextmanager
def workdir(config: dict, use_dir: str = None):
    """Switch into a working directory for the duration of a ``with`` block.

    Which directory is used depends on the arguments:

    * ``use_dir`` given: that directory is used as-is and left in place.
    * ``config["keep_temp"]`` truthy: a fresh directory is created with
      ``mkdtemp`` and is NOT removed afterwards.
    * otherwise: a ``TemporaryDirectory`` is created and removed on exit.

    Temporary directories are named with ``config["prefix"]`` (default
    ``"preprocess_"``) and created below ``config["workdir"]`` (or the
    system default when unset).

    The working directory that was active on entry is restored on exit,
    including when the body raises, so the process is never left inside a
    directory that has just been deleted.

    Args:
        config: Mapping providing the optional keys ``prefix``, ``workdir``
            and ``keep_temp``.
        use_dir: Existing directory to use instead of a temporary one.

    Yields:
        The path of the directory that was made current.
    """
    prefix = config.get("prefix", "preprocess_")
    base_dir = config.get("workdir")

    try:
        previous_cwd = os.getcwd()
    except OSError:
        # The current directory may already be gone (e.g. deleted by
        # someone else); in that case there is nothing to return to.
        previous_cwd = None

    def restore_cwd():
        if previous_cwd is not None:
            os.chdir(previous_cwd)

    try:
        if use_dir:
            os.chdir(use_dir)
            yield use_dir
        elif config.get("keep_temp", False):
            dirname = mkdtemp(prefix=prefix, dir=base_dir)
            os.chdir(dirname)
            yield dirname
        else:
            with TemporaryDirectory(prefix=prefix, dir=base_dir) as dirname:
                os.chdir(dirname)
                try:
                    yield dirname
                finally:
                    # Leave the directory BEFORE TemporaryDirectory removes
                    # it; deleting the cwd fails on some platforms.
                    restore_cwd()
    finally:
        restore_cwd()
def pairwise(col):
    """Yield consecutive, non-overlapping 2-tuples taken from ``col``.

    ``[a, b, c, d]`` produces ``(a, b)`` and then ``(c, d)``. When ``col``
    holds an odd number of items, the final unpaired item is consumed and
    silently dropped.
    """
    iterator = iter(col)
    # Zipping one iterator with itself pulls two successive items per tuple.
    yield from zip(iterator, iterator)
class Timer:
    """Context manager that records timestamps so durations can be logged.

    ``start`` is set when the ``with`` block is entered and ``end`` when it
    is left; both are ``None`` before that.
    """

    def __init__(self):
        self.start = self.end = None

    def __enter__(self):
        self.start = time()
        return self

    def __exit__(self, *args, **kwargs):
        # Returns None, so exceptions raised in the block propagate.
        self.end = time()

    @property
    def elapsed(self):
        """Seconds since ``start``, up to ``end`` if set, otherwise up to now."""
        stop = time() if self.end is None else self.end
        return stop - self.start
def convert_unit(size_in_bytes, unit="B"):
    """Convert a size in bytes to ``"KB"``, ``"MB"``, ``"GB"`` or ``"TB"``.

    Units are powers of 1024. Any other ``unit`` value, including the
    default ``"B"``, returns ``size_in_bytes`` unchanged.
    """
    divisors = (
        ("KB", 1024),
        ("MB", 1024 * 1024),
        ("GB", 1024 * 1024 * 1024),
        ("TB", 1024 * 1024 * 1024 * 1024),
    )
    for name, divisor in divisors:
        if unit == name:
            return size_in_bytes / divisor
    return size_in_bytes
def get_size_in_bytes(file_path, unit):
    """Return the size of the file at ``file_path``, expressed in ``unit``.

    The size in bytes is read with ``os.path.getsize`` and then passed,
    together with ``unit``, through ``convert_unit``.
    """
    return convert_unit(os.path.getsize(file_path), unit)
def get_all_data_files(source_dir, preprocessor_config, data_file_globs=None):
    """Collect the unique data files below ``source_dir`` matching the globs.

    All non-directory paths below ``source_dir`` are listed recursively and
    then filtered, glob by glob, with ``filter_filenames``.

    Args:
        source_dir: Directory that is searched recursively.
        preprocessor_config: Mapping read for ``data_file_globs`` (default
            ``["*"]``) and ``glob_case`` (default ``False``), the latter
            being forwarded to ``filter_filenames``.
        data_file_globs: Optional globs that override the configured ones.
            ``None`` or an empty sequence falls back to the configuration.

    Returns:
        The matching file paths, each listed once, in first-match order.
    """
    # "**" with recursive=True walks the whole tree; directories are dropped.
    file_paths = [
        path
        for path in glob(join(source_dir, "**"), recursive=True)
        if not os.path.isdir(path)
    ]

    # A non-empty explicit argument takes precedence over the configuration.
    used_globs = data_file_globs or preprocessor_config.get(
        "data_file_globs", ["*"]
    )
    glob_case = preprocessor_config.get("glob_case", False)

    matched = []
    for dataglob in used_globs:
        matched += filter_filenames(file_paths, dataglob, glob_case)

    # Overlapping globs can match the same file twice; dict.fromkeys removes
    # the duplicates while keeping the first-seen order.
    return list(dict.fromkeys(matched))
def apply_gdal_config_options(preprocessor_config):
    """Apply the GDAL config options configured for a preprocessing step.

    Each entry of ``preprocessor_config["gdal_config_options"]`` is a
    ``"KEY=VALUE"`` string, split at the first ``"="``. The value is set with
    ``gdal.SetConfigOption``.

    Args:
        preprocessor_config: Mapping read for ``gdal_config_options``
            (default: no options).

    Returns:
        Dict mapping every touched key to the value ``gdal.GetConfigOption``
        reported before this function changed it, so the previous state can
        be put back afterwards (e.g. with ``set_gdal_options``).
    """
    original_gdal_config_options = {}
    for config_option in preprocessor_config.get("gdal_config_options", []):
        key, _, val = config_option.partition("=")
        # Record the previous value only the first time a key is seen; if the
        # key is listed twice, a later lookup would return the value set
        # here and the true original would be lost.
        if key not in original_gdal_config_options:
            original_gdal_config_options[key] = gdal.GetConfigOption(key)
        gdal.SetConfigOption(key, val)
    return original_gdal_config_options
def set_gdal_options(config_options):
    """Set every ``key: value`` pair of ``config_options`` as a GDAL config option.

    Each pair is passed to ``gdal.SetConfigOption`` in the mapping's
    iteration order.
    """
    for option_name, option_value in config_options.items():
        gdal.SetConfigOption(option_name, option_value)
# GDAL data type names mapped to the default no-data value to set for them.
# NOTE(review): the UInt32 entry is 4294967293, not the type maximum
# 4294967295 - this appears to mirror the defaults used by GDAL's own
# tooling; confirm before changing.
GDT_NODATA_LOOKUP = {
    # integer types
    "Byte": 255,
    "UInt16": 65_535,
    "Int16": -32_767,
    "CInt16": complex(-32_768, -32_768),
    "UInt32": 4_294_967_293,
    "Int32": -2_147_483_647,
    "CInt32": complex(-2_147_483_648, -2_147_483_648),
    # floating point types
    "Float32": 3.402823466e38,
    "CFloat32": complex(-3.40282e38, -3.40282e38),
    "Float64": 1.7976931348623158e308,
    "CFloat64": complex(-1.79769e308, -1.79769e308),
}