import os
from os.path import basename, join
import structlog
from itertools import groupby
import re
from typing import List
from ..util import replace_ext, gdal, get_all_data_files
logger = structlog.getLogger(__name__)
[docs]def stack_bands_step(
source_dir: os.PathLike,
target_dir: os.PathLike,
preprocessor_config: dict,
group_by: str = None,
sort_by: str = None,
order: List[str] = None,
data_file_globs: List[str] = [],
):
"""Stack bands of the individual images"""
filenames = get_all_data_files(source_dir, preprocessor_config, data_file_globs)
groups = create_groups(group_by, filenames)
for groupname, group in groups.items():
# check if a sort_by is specified. if yes, use the sort_by regex group
# and optionally a ordered list to order the filenames
group = handle_group_sort(sort_by, order, group, groupname)
vrt_filename = replace_ext(join(target_dir, groupname), ".vrt")
# correct rotated geotransforms as those prevent vrt creation
group_new = remove_rotated_geotransform(group, target_dir)
logger.debug("Group contents %s" % group_new)
# build a VRT to stack bands for each group
gdal.BuildVRT(vrt_filename, group_new, separate=True)
[docs]def handle_group_sort(sort_by, order, group, groupname):
if sort_by:
logger.debug("Handling group before sort %s" % groupname)
re_sort_by = re.compile(sort_by)
if order:
group = [
v
for v in group
if re_sort_by.match(v) and re_sort_by.match(v).group(1) in order
]
group = sorted(
group, key=lambda v: order.index(re_sort_by.match(v).group(1))
)
else:
group = sorted(group, key=lambda v: re_sort_by.match(v).group(1))
return group
[docs]def create_groups(group_by, filenames):
"""
Creates groups of files based on group_by configuration
check if we have a group_by regex. If yes, use the first
re-group to group by.
Fallback is basename of file as the only groupname
"""
if group_by:
re_group_by = re.compile(group_by)
filenames.sort(key=lambda v: re_group_by.match(v).group(1))
groups = {
k: list(v)
for k, v in groupby(filenames, key=lambda v: re_group_by.match(v).group(1))
}
else:
groups = {basename(filenames[0]): filenames}
return groups