Source code for preprocessor.archive

import os.path
import io
from typing import List, Union, BinaryIO
import tarfile
import zipfile
import structlog
from fnmatch import translate
import re

# Module-level structlog logger shared by every helper in this module.
logger = structlog.getLogger(__name__)

# Nesting level of the unpack_files() call currently running. It is bumped
# around each recursive call and, within this module, only read for debug
# log output.
depth = 0


def filter_filenames(filenames: List[str], glob: str, case: bool = False) -> List[str]:
    """Return the entries of ``filenames`` that match the shell-style ``glob``.

    The glob is translated to a regular expression with ``fnmatch.translate``.
    Matching ignores case unless ``case`` is truthy. The relative order of the
    input list is preserved in the result.
    """
    flags = 0 if case else re.IGNORECASE
    matches = re.compile(translate(glob), flags).match
    return list(filter(matches, filenames))
[docs]def is_tarfile(archive_file) -> bool: """Helper to detect whether a path or a file object is referencing a valid TAR file. """ try: return tarfile.is_tarfile(archive_file) except TypeError: pass try: tarfile.open(fileobj=archive_file) return True except (TypeError, tarfile.ReadError): return False
def open_tarfile(archive_file: Union[str, bytes, BinaryIO]) -> tarfile.TarFile:
    """Open a TAR file from either a path or a file object.

    Anything exposing a ``read`` attribute is treated as a file object and
    handed to ``tarfile.open`` via ``fileobj``; everything else is passed as
    the archive path.

    File objects are detected by duck typing rather than ``isinstance``:
    ``typing.BinaryIO`` is an annotation-only class that real streams such as
    ``io.BytesIO`` do not inherit from, so an ``isinstance`` check against it
    misses them.

    Returns:
        The opened ``tarfile.TarFile``; the caller is responsible for
        closing it.
    """
    if hasattr(archive_file, "read"):
        return tarfile.open(fileobj=archive_file)
    return tarfile.open(archive_file)
def unpack_files(
    archive_path,
    target_dir: str,
    glob=None,
    case=None,
    filenames=None,
    recursive=False,
) -> List[str]:
    """Unpack the contents of a ZIP or TAR archive into ``target_dir``.

    Optionally, only a given list of filenames is extracted. When a glob is
    passed, the filenames (either given or taken from the archive) are
    filtered and only the matching files are extracted.

    Args:
        archive_path: The archive to unpack; checked with ``is_tarfile`` first
            and ``zipfile.is_zipfile`` second.
        target_dir: Directory the selected members are extracted into.
        glob: Optional shell-style pattern used to filter the filenames.
        case: When truthy, the glob is matched case-sensitively and the
            case-sensitive list of archive extensions is used for recursion.
        filenames: Optional list of member names to extract; a falsy value
            selects every member of the archive.
        recursive: When truthy, members that look like archives are unpacked
            as well, into ``target_dir/<member name>``.

    Returns:
        The paths (``target_dir`` joined with the member name) of all files
        selected for extraction, including those from nested archives.

    Raises:
        Exception: If ``archive_path`` is neither a TAR nor a ZIP archive.
    """
    global depth
    logger.debug("Reaching unpack files, depth: %s" % depth)

    if case:
        # NOTE(review): unlike the default list this one has no "tgz" entry;
        # confirm whether that is intentional.
        archive_extensions = [
            "ZIP",
            "zip",
            "TAR",
            "tar",
            "TAR.BZ2",
            "tar.bz2",
            "TAR.GZ",
            "tar.gz",
        ]
    else:
        archive_extensions = ["zip", "tar", "tar.bz2", "tar.gz", "tgz"]

    logger.debug("Handling archive path %s", archive_path)

    # open the archive; TAR is probed first, then ZIP
    if is_tarfile(archive_path):
        archive = open_tarfile(archive_path)
        istar = True
    elif zipfile.is_zipfile(archive_path):
        archive = zipfile.ZipFile(archive_path)
        istar = False
    else:
        # Interpolate the path so the message is readable instead of carrying
        # the raw format string and the path as separate exception args.
        raise Exception("Cannot open archive %s" % archive_path)

    # The archive handle is closed on every exit path, including errors.
    try:
        all_filenames = archive.getnames() if istar else archive.namelist()
        filenames = filenames or all_filenames

        # filter the filenames when a glob is passed
        if glob:
            filenames = filter_filenames(filenames, glob, case)

        extracted_filenames = []

        # extract the files to the target directory
        if istar:
            # NOTE(review): tar members are extracted under their stored
            # names; depending on the Python version's default extraction
            # filter an untrusted archive may be able to write outside
            # target_dir. Confirm the inputs are trusted.
            wanted = set(filenames)
            members = [
                member for member in archive.getmembers() if member.name in wanted
            ]
            logger.debug("Extracting tar files: %s" % members)
            archive.extractall(target_dir, members)
            extracted_filenames.extend(
                os.path.join(target_dir, member.name) for member in members
            )
        else:
            logger.debug("Extracting zip files: %s" % filenames)
            archive.extractall(target_dir, filenames)
            extracted_filenames.extend(
                os.path.join(target_dir, filename) for filename in filenames
            )

        # go into the sub-archives to extract files
        if recursive:
            # The extension match below is always case-insensitive, so with
            # the case-sensitive extension list the same member matches both
            # the upper- and lower-case pattern. Track what was handled so
            # each sub-archive is unpacked (and reported) only once.
            handled = set()
            for extension in archive_extensions:
                for sub_archive in filter_filenames(
                    all_filenames, "*.%s" % extension
                ):
                    if sub_archive in handled:
                        continue
                    handled.add(sub_archive)

                    # Nested archives are placed next to the parent archive.
                    sub_archive_filename = os.path.join(
                        os.path.dirname(archive_path),
                        os.path.basename(sub_archive),
                    )
                    if not os.path.exists(sub_archive_filename):
                        logger.debug("Extracting archive %s" % sub_archive_filename)
                        # extract() without a path writes relative to the
                        # current working directory; the file is then moved
                        # next to the parent archive.
                        if istar:
                            archive.extract(archive.getmember(sub_archive))
                        else:
                            archive.extract(sub_archive)
                        os.rename(sub_archive, sub_archive_filename)

                    logger.debug("Unpacking nested archive %s" % sub_archive_filename)
                    logger.debug("Increasing depth +1")
                    depth = depth + 1
                    try:
                        sub_filenames = unpack_files(
                            sub_archive_filename,
                            os.path.join(target_dir, sub_archive),
                            glob,
                            case,
                            filenames,
                            recursive,
                        )
                    finally:
                        # Restore the counter even when the nested call fails
                        # so later calls do not report a wrong depth.
                        logger.debug("Decreasing depth -1")
                        depth = depth - 1
                    extracted_filenames.extend(sub_filenames)
    finally:
        archive.close()

    # return a list of files extracted
    return extracted_filenames