import os.path
import io
from typing import List, Union, BinaryIO
import tarfile
import zipfile
import structlog
from fnmatch import translate
import re
# Module-level structlog logger, named after this module.
logger = structlog.getLogger(__name__)
# Current nesting level of unpack_files() calls. unpack_files() bumps it
# around each recursive call into a nested archive and includes it in its
# debug logging.
depth = 0
def filter_filenames(filenames: List[str], glob: str, case: bool = False) -> List[str]:
    """Return the entries of ``filenames`` that match the shell-style ``glob``.

    The glob is converted to a regular expression with ``fnmatch.translate``.
    Matching ignores letter case unless ``case`` is truthy. The input order
    is preserved.
    """
    flags = 0 if case else re.IGNORECASE
    matches = re.compile(translate(glob), flags).match
    return [name for name in filenames if matches(name)]
[docs]def is_tarfile(archive_file) -> bool:
"""Helper to detect whether a path or a file object is
referencing a valid TAR file.
"""
try:
return tarfile.is_tarfile(archive_file)
except TypeError:
pass
try:
tarfile.open(fileobj=archive_file)
return True
except (TypeError, tarfile.ReadError):
return False
def open_tarfile(archive_file: Union[str, bytes, BinaryIO]) -> tarfile.TarFile:
    """Open a TAR file from either a path or a file object.

    :param archive_file: a filesystem path, or any binary file object
        (anything exposing ``read``), e.g. an open file or ``io.BytesIO``
    :returns: the opened :class:`tarfile.TarFile`; the caller closes it
    :raises tarfile.ReadError: if the input is not a readable TAR archive
    """
    # Duck-type on ``read`` instead of isinstance(): real file objects are
    # not instances of typing.BinaryIO, so an isinstance() check against it
    # misses everything except the explicitly listed io classes.
    if hasattr(archive_file, "read"):
        return tarfile.open(fileobj=archive_file)
    return tarfile.open(archive_file)
# Extensions that mark an archive member as a nested archive. They are
# matched case-insensitively, so a single lowercase list covers every
# spelling.
_NESTED_ARCHIVE_EXTENSIONS = ("zip", "tar", "tar.bz2", "tar.gz", "tgz")


def _find_sub_archives(member_names):
    """Return the member names that look like archives, each listed once."""
    found = []
    seen = set()
    for extension in _NESTED_ARCHIVE_EXTENSIONS:
        for name in filter_filenames(member_names, "*.%s" % extension):
            if name not in seen:
                seen.add(name)
                found.append(name)
    return found


def _stage_sub_archive(archive, member_name, destination) -> bool:
    """Copy one member of an open ZIP/TAR archive to the file ``destination``.

    Returns ``False`` when the member has no file content to copy (e.g. a
    directory entry in a TAR archive).
    """
    if isinstance(archive, zipfile.ZipFile):
        source = archive.open(member_name)
    else:
        # extractfile() returns None for members that are not regular files.
        source = archive.extractfile(member_name)
    if source is None:
        return False
    with source, open(destination, "wb") as sink:
        while True:
            chunk = source.read(1024 * 1024)
            if not chunk:
                break
            sink.write(chunk)
    return True


def unpack_files(
    archive_path,
    target_dir: str,
    glob=None,
    case=None,
    filenames=None,
    recursive=False,
) -> List[str]:
    """Unpacks the contents of the specified ZIP or TAR archive to the
    given target directory. Optionally, only a given list of filenames
    will be extracted.

    When a glob is passed, all filenames (either given or from the archive)
    will be filtered and only the matching files will be extracted.
    Requested names that are not present in the archive are skipped, for
    both archive types.

    With ``recursive`` set, every member that looks like an archive (by
    extension, case-insensitively) is copied next to ``archive_path``
    (unless a file of that name already exists there) and unpacked with the
    same ``glob``, ``case`` and ``filenames`` into
    ``os.path.join(target_dir, <member name>)``.

    NOTE(review): that nested target path is the same path the nested
    archive itself gets when it is part of the extracted files, so
    recursion appears to be meant for calls whose glob/filenames exclude
    the nested archives themselves -- confirm with callers.

    :param archive_path: path of the archive (recursion needs a path, since
        nested archives are staged in its directory)
    :param target_dir: directory the files are extracted into
    :param glob: optional shell-style pattern the filenames must match
    :param case: truthy to make the glob match case-sensitive
    :param filenames: optional list of member names to extract
    :param recursive: also unpack archives contained in the archive
    :returns: paths (below ``target_dir``) of all extracted members
    :raises Exception: if the input is neither a TAR nor a ZIP archive
    """
    global depth
    logger.debug("Reaching unpack files, depth: %s" % depth)
    logger.debug("Handling archive path %s", archive_path)

    # open the archive
    if is_tarfile(archive_path):
        archive = open_tarfile(archive_path)
        is_zip = False
    elif zipfile.is_zipfile(archive_path):
        archive = zipfile.ZipFile(archive_path)
        is_zip = True
    else:
        raise Exception("Cannot open archive %s" % archive_path)

    extracted_filenames = []
    try:
        all_filenames = archive.namelist() if is_zip else archive.getnames()

        # Keep the caller's ``filenames`` untouched: it is forwarded as-is
        # to nested archives, whose member names differ from this one's.
        selected = filenames or all_filenames

        # filter the filenames when a glob is passed
        if glob:
            selected = filter_filenames(selected, glob, case)

        # extract the files to the target directory
        # NOTE(review): extractall() trusts the member paths stored in the
        # archive; for untrusted archives consider tarfile's ``filter``
        # argument on Python versions that provide it.
        if is_zip:
            present = set(all_filenames)
            members = [name for name in selected if name in present]
            logger.debug("Extracting zip files: %s" % members)
            archive.extractall(target_dir, members)
            extracted_filenames.extend(
                os.path.join(target_dir, name) for name in members
            )
        else:
            wanted = set(selected)
            members = [
                member for member in archive.getmembers() if member.name in wanted
            ]
            logger.debug("Extracting tar files: %s" % members)
            archive.extractall(target_dir, members)
            extracted_filenames.extend(
                os.path.join(target_dir, member.name) for member in members
            )

        # go into the sub-archives to extract files
        if recursive:
            for sub_archive in _find_sub_archives(all_filenames):
                sub_archive_filename = os.path.join(
                    os.path.dirname(archive_path),
                    os.path.basename(sub_archive),
                )
                if not os.path.exists(sub_archive_filename):
                    logger.debug("Extracting archive %s" % sub_archive_filename)
                    # Stream the member straight to its staging location
                    # instead of extracting into the current working
                    # directory and renaming it afterwards.
                    if not _stage_sub_archive(
                        archive, sub_archive, sub_archive_filename
                    ):
                        continue
                logger.debug("Unpacking nested archive %s" % sub_archive_filename)
                logger.debug("Increasing depth +1")
                depth = depth + 1
                try:
                    sub_filenames = unpack_files(
                        sub_archive_filename,
                        os.path.join(target_dir, sub_archive),
                        glob,
                        case,
                        filenames,
                        recursive,
                    )
                finally:
                    # Restore the counter even when the nested call fails.
                    logger.debug("Decreasing depth -1")
                    depth = depth - 1
                extracted_filenames.extend(sub_filenames)
    finally:
        archive.close()

    # return a list of files extracted
    return extracted_filenames