Source code for preprocessor.steps.georeference
from os.path import join, basename, splitext
import structlog
from glob import glob
import shutil
from typing import Callable, Dict, List, Tuple, Union
import numpy as np
from ..metadata import extract_footprint
from ..util import gdal, osr, replace_ext, get_all_data_files, ogr
logger = structlog.getLogger(__name__)
[docs]def georeference_step(
source_dir: str,
target_dir: str,
preprocessor_config: dict,
geotransforms: List[dict],
data_file_globs: List[str] = [],
):
success = False
for options in geotransforms:
type_name = options["type"].lower()
opts_dict = {i: options[i] for i in options if i != "type"}
georef_func: Callable = FUNCTION_MAP[type_name]
try:
filenames = get_all_data_files(
source_dir, preprocessor_config, data_file_globs
)
for filename in filenames:
target_filename = join(target_dir, basename(filename))
georef_func(filename, target_filename, **opts_dict)
except Exception as e:
# if any file fails, try another method
logger.warn(e)
continue
else:
# all files georeferenced without exception, no need to try another methods
success = True
logger.debug("Used geotransform method success: %s" % type_name)
break
if not success and len(geotransforms) > 0:
raise Exception(
"No configured georeference method successfull: %s"
% ", ".join([item["type"] for item in geotransforms])
)
[docs]def gcp_georef(
input_filename: str,
target_filename: str,
order: int = 1,
projection: str = "EPSG:4326",
tps: bool = False,
warp_options: dict = None,
):
succeded = False
# simple case: get the geotransform from some GCPs
read_only = False
try:
ds = gdal.Open(input_filename, gdal.GA_Update)
except RuntimeError:
logger.warn("Can not open file by GDAL for update mode %s" % (input_filename))
try:
ds = gdal.Open(input_filename)
read_only = True
except RuntimeError:
logger.warn(
"Can not open file by GDAL for read mode too %s" % (input_filename)
)
return
if ds.GetGCPCount() <= 4:
if read_only:
# need to circumvent read_only via a temporary tif, which should be picked up
# by data_file_globs
ds = gdal.GetDriverByName("GTiff").CreateCopy(
replace_ext(target_filename, ".tif"), ds, strict=0
)
try:
gcps = ds.GetGCPs()
gt = gdal.GCPsToGeoTransform(gcps)
ds.SetGeoTransform(gt)
except Exception:
del ds
logger.warning("Failed to get geotransform via GCPsToGeoTransform")
else:
del ds
shutil.move(input_filename, target_filename)
succeded = True
logger.debug("GCPsToGeoTransform successful")
# otherwise warp
if not succeded:
logger.info("Applying GCP transform by warping")
options: Dict[str, Union[int, bool]] = {}
if tps:
options["tps"] = tps
else:
options["polynomialOrder"] = order
gdal.Warp(
target_filename,
input_filename,
dstSRS=projection,
**(warp_options or {}),
**options,
)
[docs]def rpc_georef(
input_filename: str,
target_filename: str,
rpc_file_template: str = "{fileroot}.RPC",
warp_options: dict = None,
):
fileroot, extension = splitext(input_filename)
rpc_file_glob = rpc_file_template.format(
filename=input_filename,
fileroot=fileroot,
extension=extension,
)
rpc_filename = None
try:
rpc_filename = glob(rpc_file_glob, recursive=True)[0]
except IndexError:
logger.warn("No RPC filename found with glob %s" % rpc_file_glob)
# rename RPC filename to be compatible with GDAL
if rpc_filename:
shutil.copy(rpc_filename, replace_ext(input_filename, ".rpc"))
gdal.Warp(target_filename, input_filename, rpc=True, **(warp_options or {}))
[docs]def corner_georef(
input_filename: str,
target_filename: str,
corner_names: List[str] = None,
gcp_srid: int = 4326,
warp: bool = False,
):
corner_names = corner_names or [
"bottom_left",
"bottom_right",
"top_left",
"top_right",
]
ds = gdal.Open(input_filename, gdal.GA_Update)
metadata = ds.GetRasterBand(1).GetMetadata()
# string match on metadata keys instead of direct lookup to give more flexibility
corners = []
for corner_name in corner_names:
# find keys which contain corner_name
corner_metadata = [
val for key, val in metadata.items() if corner_name.lower() in key.lower()
][0]
# convert to float
corners.append([float(num) for num in corner_metadata.split()])
bl, br, tl, tr = corners
gcps = gcps_from_borders(
(ds.RasterXSize, ds.RasterYSize),
(bl, br, tl, tr),
)
sr = osr.SpatialReference()
sr.ImportFromEPSG(gcp_srid)
ds.SetGCPs(gcps, sr.ExportToWkt())
if warp:
gdal.Warp(
target_filename,
ds,
)
del ds
else:
ds.SetGeoTransform(gdal.GCPsToGeoTransform(ds.GetGCPs()))
driver = ds.GetDriver()
del ds
driver.Rename(target_filename, input_filename)
[docs]def no_op(input_filename: str, target_filename: str):
# does not perform any operation
# configure as last step if you want other geotransform method to either pass or
# fail and then perform no other
shutil.move(input_filename, target_filename)
[docs]def fix_geotrans(input_filename: str, target_filename: str, warp_options: dict = None):
# assumes already existing geotransform and if xres=0,yres=0, fixes it
try:
ds = gdal.Open(input_filename, gdal.GA_Update)
except RuntimeError:
logger.warn("Can not open file by GDAL %s" % (input_filename))
return
ds = correct_geo_transform(ds)
if ds.GetDriver().ShortName == "JP2OpenJPEG":
# workaround for rewriting jp2 files and exception thrown by
# "USE_SRC_CODESTREAM=YES specified, but no codestream found"
# then needs .vrt configured in data_file_globs too
target_filename = replace_ext(target_filename, ".vrt")
gdal.Warp(
target_filename,
ds,
**(warp_options or {}),
)
[docs]def correct_geo_transform(src_ds):
# input - gdal dataset
# sets new geotransform if necessary by creating control points of a
# raster with switched height and width
# returns - gdal dataset
ulx, xres, xskew, uly, yskew, yres = src_ds.GetGeoTransform()
logger.debug("Testing for malformed geotransform")
# test geotransform if necessary to shift
if xres == 0.0 and yres == 0.0:
logger.info("Malformed geotransform xres,yres=0 detected, correcting.")
# malformed image, compute xres and yres switched in geotransform
lrx = ulx + (src_ds.RasterXSize * xskew)
lry = uly + (src_ds.RasterYSize * yskew)
# [ulx, lrx, lry, uly] - bounds = lon_min, lon_max, lat_min, lat_max
fp = [
[0, src_ds.RasterXSize, src_ds.RasterXSize, 0],
[0, 0, src_ds.RasterYSize, src_ds.RasterYSize],
]
tp = [[ulx, lrx, lrx, ulx], [lry, lry, uly, uly]]
pix = list(zip(fp[0], fp[1]))
coor = list(zip(tp[0], tp[1]))
# compute the gdal.GCP parameters
gcps = []
for index, txt in enumerate(pix):
gcps.append(gdal.GCP())
gcps[index].GCPPixel = pix[index][0]
gcps[index].GCPLine = src_ds.RasterYSize - int(pix[index][1])
gcps[index].GCPX = coor[index][0]
gcps[index].GCPY = coor[index][1]
# get correct geotransform from gcps
geotransform_new = gdal.GCPsToGeoTransform(gcps)
# overwrite geotransform with new
src_ds.SetGeoTransform(geotransform_new)
return src_ds
[docs]def gcps_from_borders(
size: Tuple[float, float],
coords: Tuple[List[float], List[float], List[float], List[float]],
):
x_size, y_size = size
# expects coordinates in dict(.*border_left.*:[lat,lon],...)
gcps = []
bl, br, tl, tr = coords
x_left = 0.5
x_right = x_size - 0.5
y_bottom = y_size - 0.5
y_top = 0.5
gcps.extend(
[
gdal.GCP(bl[1], bl[0], 0, x_left, y_bottom),
gdal.GCP(br[1], br[0], 0, x_right, y_bottom),
gdal.GCP(tl[1], tl[0], 0, x_left, y_top),
gdal.GCP(tr[1], tr[0], 0, x_right, y_top),
]
)
return gcps
[docs]def gsc_footprint_georef(
input_filename: str, target_filename: str, no_data: Union[int, float] = None
):
read_only = False
try:
ds = gdal.Open(input_filename, gdal.GA_Update)
except RuntimeError:
logger.warn("Can not open file by GDAL for update mode %s" % (input_filename))
try:
ds = gdal.Open(input_filename)
read_only = True
except RuntimeError:
logger.warn(
"Can not open file by GDAL for read mode too %s" % (input_filename)
)
return
if read_only or ds.GetDriver().ShortName == "JP2OpenJPEG":
# need to circumvent read_only via a temporary tif, which
# should be picked up by data_file_globs
ds = gdal.GetDriverByName("GTiff").CreateCopy(
replace_ext(target_filename, ".tif"), ds, strict=0
)
set_gcps_from_gsc_footprint(ds, {}, no_data)
if not read_only:
shutil.move(input_filename, target_filename)
[docs]def GetOgrEnvelope(geom):
(minX, maxX, minY, maxY) = geom.GetEnvelope()
# Create ring
ring = ogr.Geometry(ogr.wkbLinearRing)
# tl, bl, br, tr
ring.AddPoint(minX, maxY)
ring.AddPoint(minX, minY)
ring.AddPoint(maxX, minY)
ring.AddPoint(maxX, maxY)
# Create polygon
poly_envelope = ogr.Geometry(ogr.wkbPolygon)
poly_envelope.AddGeometry(ring)
return poly_envelope
[docs]def set_gcps_from_gsc_footprint(ds, config, no_data=0):
logger.info("Trying to set GCPs from GSC footprint.")
gsc_files = get_all_data_files("extra", config, ["*GSC*.xml"])
wkt_footprint = extract_footprint(gsc_files)
if wkt_footprint is not None:
sr = osr.SpatialReference()
sr.ImportFromEPSG(4326)
geom = ogr.CreateGeometryFromWkt(wkt_footprint)
# if multipolygon or invalid geom extract convex hull
if not geom.IsValid() or geom.GetGeometryType() == 6:
geom = geom.ConvexHull()
polygon = geom
# need polygon with 4 points to match with 4 border gcps
if polygon.GetPointCount() != 5:
polygon = GetOgrEnvelope(polygon)
linear_ring = polygon.GetGeometryRef(0)
gcps = []
tl, bl, br, tr = [
(linear_ring.GetPoint(i)[0], linear_ring.GetPoint(i)[1])
for i in range(0, 4)
]
np_array = np.array(ds.GetRasterBand(1).ReadAsArray())
x_size = ds.RasterXSize
y_size = ds.RasterYSize
found = False
# find topleft
if no_data is not None:
# find first non-nodatavalue pixel in cols/rows
for i, row in enumerate(np_array):
if not found:
for j, col in enumerate(row):
if col != no_data:
gcps.append(gdal.GCP(tl[0], tl[1], 0, j + 0.5, i + 0.5))
found = True
break
else:
break
# find bottomright
found = False
for i, row in enumerate(np_array[::-1]):
if not found:
for j, col in enumerate(row):
if col != no_data:
gcps.append(
gdal.GCP(br[0], br[1], 0, j + 0.5, y_size - i - 0.5)
)
found = True
break
else:
break
# find bottomleft
transpose = np_array.transpose()
found = False
for i, col in enumerate(transpose):
if not found:
for j, row in enumerate(col):
if row != no_data:
gcps.append(gdal.GCP(bl[0], bl[1], 0, i + 0.5, j + 0.5))
found = True
break
else:
break
found = False
# find bottomright
for i, col in enumerate(transpose[::-1]):
if not found:
for j, row in enumerate(col):
if row != no_data:
gcps.append(
gdal.GCP(tr[0], tr[1], 0, x_size - i - 0.5, j + 0.5)
)
found = True
break
else:
break
else:
# do not consider nodata areas and just take corners
gcps.append(gdal.GCP(tl[0], tl[1], 0, 0.5, 0.5))
gcps.append(gdal.GCP(br[0], br[1], 0, x_size - 0.5, y_size - 0.5))
gcps.append(gdal.GCP(bl[0], bl[1], 0, 0.5, y_size - 0.5))
gcps.append(gdal.GCP(tr[0], tr[1], 0, x_size - 0.5, 0.5))
ds.SetGCPs(gcps, sr.ExportToWkt())
geotransform = gdal.GCPsToGeoTransform(ds.GetGCPs())
ds.SetGeoTransform(geotransform)
FUNCTION_MAP: Dict[str, Callable] = {
"gcp": gcp_georef,
"rpc": rpc_georef,
"world": world_georef,
"corners": corner_georef,
"gsc_footprint": gsc_footprint_georef,
"no_op": no_op,
"fix_geotrans": fix_geotrans,
}