Source code for preprocessor.steps.georeference

from os.path import join, basename, splitext
import structlog
from glob import glob
import shutil
from typing import Callable, Dict, List, Tuple, Union
import numpy as np

from ..metadata import extract_footprint
from ..util import gdal, osr, replace_ext, get_all_data_files, ogr

logger = structlog.getLogger(__name__)


[docs]def georeference_step( source_dir: str, target_dir: str, preprocessor_config: dict, geotransforms: List[dict], data_file_globs: List[str] = [], ): success = False for options in geotransforms: type_name = options["type"].lower() opts_dict = {i: options[i] for i in options if i != "type"} georef_func: Callable = FUNCTION_MAP[type_name] try: filenames = get_all_data_files( source_dir, preprocessor_config, data_file_globs ) for filename in filenames: target_filename = join(target_dir, basename(filename)) georef_func(filename, target_filename, **opts_dict) except Exception as e: # if any file fails, try another method logger.warn(e) continue else: # all files georeferenced without exception, no need to try another methods success = True logger.debug("Used geotransform method success: %s" % type_name) break if not success and len(geotransforms) > 0: raise Exception( "No configured georeference method successfull: %s" % ", ".join([item["type"] for item in geotransforms]) )
[docs]def gcp_georef( input_filename: str, target_filename: str, order: int = 1, projection: str = "EPSG:4326", tps: bool = False, warp_options: dict = None, ): succeded = False # simple case: get the geotransform from some GCPs read_only = False try: ds = gdal.Open(input_filename, gdal.GA_Update) except RuntimeError: logger.warn("Can not open file by GDAL for update mode %s" % (input_filename)) try: ds = gdal.Open(input_filename) read_only = True except RuntimeError: logger.warn( "Can not open file by GDAL for read mode too %s" % (input_filename) ) return if ds.GetGCPCount() <= 4: if read_only: # need to circumvent read_only via a temporary tif, which should be picked up # by data_file_globs ds = gdal.GetDriverByName("GTiff").CreateCopy( replace_ext(target_filename, ".tif"), ds, strict=0 ) try: gcps = ds.GetGCPs() gt = gdal.GCPsToGeoTransform(gcps) ds.SetGeoTransform(gt) except Exception: del ds logger.warning("Failed to get geotransform via GCPsToGeoTransform") else: del ds shutil.move(input_filename, target_filename) succeded = True logger.debug("GCPsToGeoTransform successful") # otherwise warp if not succeded: logger.info("Applying GCP transform by warping") options: Dict[str, Union[int, bool]] = {} if tps: options["tps"] = tps else: options["polynomialOrder"] = order gdal.Warp( target_filename, input_filename, dstSRS=projection, **(warp_options or {}), **options, )
[docs]def rpc_georef( input_filename: str, target_filename: str, rpc_file_template: str = "{fileroot}.RPC", warp_options: dict = None, ): fileroot, extension = splitext(input_filename) rpc_file_glob = rpc_file_template.format( filename=input_filename, fileroot=fileroot, extension=extension, ) rpc_filename = None try: rpc_filename = glob(rpc_file_glob, recursive=True)[0] except IndexError: logger.warn("No RPC filename found with glob %s" % rpc_file_glob) # rename RPC filename to be compatible with GDAL if rpc_filename: shutil.copy(rpc_filename, replace_ext(input_filename, ".rpc")) gdal.Warp(target_filename, input_filename, rpc=True, **(warp_options or {}))
[docs]def corner_georef( input_filename: str, target_filename: str, corner_names: List[str] = None, gcp_srid: int = 4326, warp: bool = False, ): corner_names = corner_names or [ "bottom_left", "bottom_right", "top_left", "top_right", ] ds = gdal.Open(input_filename, gdal.GA_Update) metadata = ds.GetRasterBand(1).GetMetadata() # string match on metadata keys instead of direct lookup to give more flexibility corners = [] for corner_name in corner_names: # find keys which contain corner_name corner_metadata = [ val for key, val in metadata.items() if corner_name.lower() in key.lower() ][0] # convert to float corners.append([float(num) for num in corner_metadata.split()]) bl, br, tl, tr = corners gcps = gcps_from_borders( (ds.RasterXSize, ds.RasterYSize), (bl, br, tl, tr), ) sr = osr.SpatialReference() sr.ImportFromEPSG(gcp_srid) ds.SetGCPs(gcps, sr.ExportToWkt()) if warp: gdal.Warp( target_filename, ds, ) del ds else: ds.SetGeoTransform(gdal.GCPsToGeoTransform(ds.GetGCPs())) driver = ds.GetDriver() del ds driver.Rename(target_filename, input_filename)
[docs]def world_georef(): # TODO: implement pass
[docs]def no_op(input_filename: str, target_filename: str): # does not perform any operation # configure as last step if you want other geotransform method to either pass or # fail and then perform no other shutil.move(input_filename, target_filename)
[docs]def fix_geotrans(input_filename: str, target_filename: str, warp_options: dict = None): # assumes already existing geotransform and if xres=0,yres=0, fixes it try: ds = gdal.Open(input_filename, gdal.GA_Update) except RuntimeError: logger.warn("Can not open file by GDAL %s" % (input_filename)) return ds = correct_geo_transform(ds) if ds.GetDriver().ShortName == "JP2OpenJPEG": # workaround for rewriting jp2 files and exception thrown by # "USE_SRC_CODESTREAM=YES specified, but no codestream found" # then needs .vrt configured in data_file_globs too target_filename = replace_ext(target_filename, ".vrt") gdal.Warp( target_filename, ds, **(warp_options or {}), )
[docs]def correct_geo_transform(src_ds): # input - gdal dataset # sets new geotransform if necessary by creating control points of a # raster with switched height and width # returns - gdal dataset ulx, xres, xskew, uly, yskew, yres = src_ds.GetGeoTransform() logger.debug("Testing for malformed geotransform") # test geotransform if necessary to shift if xres == 0.0 and yres == 0.0: logger.info("Malformed geotransform xres,yres=0 detected, correcting.") # malformed image, compute xres and yres switched in geotransform lrx = ulx + (src_ds.RasterXSize * xskew) lry = uly + (src_ds.RasterYSize * yskew) # [ulx, lrx, lry, uly] - bounds = lon_min, lon_max, lat_min, lat_max fp = [ [0, src_ds.RasterXSize, src_ds.RasterXSize, 0], [0, 0, src_ds.RasterYSize, src_ds.RasterYSize], ] tp = [[ulx, lrx, lrx, ulx], [lry, lry, uly, uly]] pix = list(zip(fp[0], fp[1])) coor = list(zip(tp[0], tp[1])) # compute the gdal.GCP parameters gcps = [] for index, txt in enumerate(pix): gcps.append(gdal.GCP()) gcps[index].GCPPixel = pix[index][0] gcps[index].GCPLine = src_ds.RasterYSize - int(pix[index][1]) gcps[index].GCPX = coor[index][0] gcps[index].GCPY = coor[index][1] # get correct geotransform from gcps geotransform_new = gdal.GCPsToGeoTransform(gcps) # overwrite geotransform with new src_ds.SetGeoTransform(geotransform_new) return src_ds
[docs]def gcps_from_borders( size: Tuple[float, float], coords: Tuple[List[float], List[float], List[float], List[float]], ): x_size, y_size = size # expects coordinates in dict(.*border_left.*:[lat,lon],...) gcps = [] bl, br, tl, tr = coords x_left = 0.5 x_right = x_size - 0.5 y_bottom = y_size - 0.5 y_top = 0.5 gcps.extend( [ gdal.GCP(bl[1], bl[0], 0, x_left, y_bottom), gdal.GCP(br[1], br[0], 0, x_right, y_bottom), gdal.GCP(tl[1], tl[0], 0, x_left, y_top), gdal.GCP(tr[1], tr[0], 0, x_right, y_top), ] ) return gcps
[docs]def gsc_footprint_georef( input_filename: str, target_filename: str, no_data: Union[int, float] = None ): read_only = False try: ds = gdal.Open(input_filename, gdal.GA_Update) except RuntimeError: logger.warn("Can not open file by GDAL for update mode %s" % (input_filename)) try: ds = gdal.Open(input_filename) read_only = True except RuntimeError: logger.warn( "Can not open file by GDAL for read mode too %s" % (input_filename) ) return if read_only or ds.GetDriver().ShortName == "JP2OpenJPEG": # need to circumvent read_only via a temporary tif, which # should be picked up by data_file_globs ds = gdal.GetDriverByName("GTiff").CreateCopy( replace_ext(target_filename, ".tif"), ds, strict=0 ) set_gcps_from_gsc_footprint(ds, {}, no_data) if not read_only: shutil.move(input_filename, target_filename)
[docs]def GetOgrEnvelope(geom): (minX, maxX, minY, maxY) = geom.GetEnvelope() # Create ring ring = ogr.Geometry(ogr.wkbLinearRing) # tl, bl, br, tr ring.AddPoint(minX, maxY) ring.AddPoint(minX, minY) ring.AddPoint(maxX, minY) ring.AddPoint(maxX, maxY) # Create polygon poly_envelope = ogr.Geometry(ogr.wkbPolygon) poly_envelope.AddGeometry(ring) return poly_envelope
[docs]def set_gcps_from_gsc_footprint(ds, config, no_data=0): logger.info("Trying to set GCPs from GSC footprint.") gsc_files = get_all_data_files("extra", config, ["*GSC*.xml"]) wkt_footprint = extract_footprint(gsc_files) if wkt_footprint is not None: sr = osr.SpatialReference() sr.ImportFromEPSG(4326) geom = ogr.CreateGeometryFromWkt(wkt_footprint) # if multipolygon or invalid geom extract convex hull if not geom.IsValid() or geom.GetGeometryType() == 6: geom = geom.ConvexHull() polygon = geom # need polygon with 4 points to match with 4 border gcps if polygon.GetPointCount() != 5: polygon = GetOgrEnvelope(polygon) linear_ring = polygon.GetGeometryRef(0) gcps = [] tl, bl, br, tr = [ (linear_ring.GetPoint(i)[0], linear_ring.GetPoint(i)[1]) for i in range(0, 4) ] np_array = np.array(ds.GetRasterBand(1).ReadAsArray()) x_size = ds.RasterXSize y_size = ds.RasterYSize found = False # find topleft if no_data is not None: # find first non-nodatavalue pixel in cols/rows for i, row in enumerate(np_array): if not found: for j, col in enumerate(row): if col != no_data: gcps.append(gdal.GCP(tl[0], tl[1], 0, j + 0.5, i + 0.5)) found = True break else: break # find bottomright found = False for i, row in enumerate(np_array[::-1]): if not found: for j, col in enumerate(row): if col != no_data: gcps.append( gdal.GCP(br[0], br[1], 0, j + 0.5, y_size - i - 0.5) ) found = True break else: break # find bottomleft transpose = np_array.transpose() found = False for i, col in enumerate(transpose): if not found: for j, row in enumerate(col): if row != no_data: gcps.append(gdal.GCP(bl[0], bl[1], 0, i + 0.5, j + 0.5)) found = True break else: break found = False # find bottomright for i, col in enumerate(transpose[::-1]): if not found: for j, row in enumerate(col): if row != no_data: gcps.append( gdal.GCP(tr[0], tr[1], 0, x_size - i - 0.5, j + 0.5) ) found = True break else: break else: # do not consider nodata areas and just take corners gcps.append(gdal.GCP(tl[0], tl[1], 0, 0.5, 0.5)) gcps.append(gdal.GCP(br[0], br[1], 0, x_size - 0.5, y_size - 0.5)) gcps.append(gdal.GCP(bl[0], bl[1], 0, 0.5, y_size - 0.5)) gcps.append(gdal.GCP(tr[0], tr[1], 0, x_size - 0.5, 0.5)) ds.SetGCPs(gcps, sr.ExportToWkt()) geotransform = gdal.GCPsToGeoTransform(ds.GetGCPs()) ds.SetGeoTransform(geotransform)
FUNCTION_MAP: Dict[str, Callable] = { "gcp": gcp_georef, "rpc": rpc_georef, "world": world_georef, "corners": corner_georef, "gsc_footprint": gsc_footprint_georef, "no_op": no_op, "fix_geotrans": fix_geotrans, }