Made an application for supporting sustainable local businesses in San Pancho.
Never really got completed, but it has some useful Svelte components for maps that we can reuse.
http://greenspots.dctrl.space
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
173 lines
6.4 KiB
173 lines
6.4 KiB
"""Utilities for extracting common archive formats""" |
|
|
|
import zipfile |
|
import tarfile |
|
import os |
|
import shutil |
|
import posixpath |
|
import contextlib |
|
from distutils.errors import DistutilsError |
|
|
|
from pkg_resources import ensure_directory |
|
|
|
__all__ = [ |
|
"unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter", |
|
"UnrecognizedFormat", "extraction_drivers", "unpack_directory", |
|
] |
|
|
|
|
|
class UnrecognizedFormat(DistutilsError): |
|
"""Couldn't recognize the archive type""" |
|
|
|
|
|
def default_filter(src, dst): |
|
"""The default progress/filter callback; returns True for all files""" |
|
return dst |
|
|
|
|
|
def unpack_archive(filename, extract_dir, progress_filter=default_filter, |
|
drivers=None): |
|
"""Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat`` |
|
|
|
`progress_filter` is a function taking two arguments: a source path |
|
internal to the archive ('/'-separated), and a filesystem path where it |
|
will be extracted. The callback must return the desired extract path |
|
(which may be the same as the one passed in), or else ``None`` to skip |
|
that file or directory. The callback can thus be used to report on the |
|
progress of the extraction, as well as to filter the items extracted or |
|
alter their extraction paths. |
|
|
|
`drivers`, if supplied, must be a non-empty sequence of functions with the |
|
same signature as this function (minus the `drivers` argument), that raise |
|
``UnrecognizedFormat`` if they do not support extracting the designated |
|
archive type. The `drivers` are tried in sequence until one is found that |
|
does not raise an error, or until all are exhausted (in which case |
|
``UnrecognizedFormat`` is raised). If you do not supply a sequence of |
|
drivers, the module's ``extraction_drivers`` constant will be used, which |
|
means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that |
|
order. |
|
""" |
|
for driver in drivers or extraction_drivers: |
|
try: |
|
driver(filename, extract_dir, progress_filter) |
|
except UnrecognizedFormat: |
|
continue |
|
else: |
|
return |
|
else: |
|
raise UnrecognizedFormat( |
|
"Not a recognized archive type: %s" % filename |
|
) |
|
|
|
|
|
def unpack_directory(filename, extract_dir, progress_filter=default_filter): |
|
""""Unpack" a directory, using the same interface as for archives |
|
|
|
Raises ``UnrecognizedFormat`` if `filename` is not a directory |
|
""" |
|
if not os.path.isdir(filename): |
|
raise UnrecognizedFormat("%s is not a directory" % filename) |
|
|
|
paths = { |
|
filename: ('', extract_dir), |
|
} |
|
for base, dirs, files in os.walk(filename): |
|
src, dst = paths[base] |
|
for d in dirs: |
|
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d) |
|
for f in files: |
|
target = os.path.join(dst, f) |
|
target = progress_filter(src + f, target) |
|
if not target: |
|
# skip non-files |
|
continue |
|
ensure_directory(target) |
|
f = os.path.join(base, f) |
|
shutil.copyfile(f, target) |
|
shutil.copystat(f, target) |
|
|
|
|
|
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter): |
|
"""Unpack zip `filename` to `extract_dir` |
|
|
|
Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined |
|
by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation |
|
of the `progress_filter` argument. |
|
""" |
|
|
|
if not zipfile.is_zipfile(filename): |
|
raise UnrecognizedFormat("%s is not a zip file" % (filename,)) |
|
|
|
with zipfile.ZipFile(filename) as z: |
|
for info in z.infolist(): |
|
name = info.filename |
|
|
|
# don't extract absolute paths or ones with .. in them |
|
if name.startswith('/') or '..' in name.split('/'): |
|
continue |
|
|
|
target = os.path.join(extract_dir, *name.split('/')) |
|
target = progress_filter(name, target) |
|
if not target: |
|
continue |
|
if name.endswith('/'): |
|
# directory |
|
ensure_directory(target) |
|
else: |
|
# file |
|
ensure_directory(target) |
|
data = z.read(info.filename) |
|
with open(target, 'wb') as f: |
|
f.write(data) |
|
unix_attributes = info.external_attr >> 16 |
|
if unix_attributes: |
|
os.chmod(target, unix_attributes) |
|
|
|
|
|
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter): |
|
"""Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir` |
|
|
|
Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined |
|
by ``tarfile.open()``). See ``unpack_archive()`` for an explanation |
|
of the `progress_filter` argument. |
|
""" |
|
try: |
|
tarobj = tarfile.open(filename) |
|
except tarfile.TarError: |
|
raise UnrecognizedFormat( |
|
"%s is not a compressed or uncompressed tar file" % (filename,) |
|
) |
|
with contextlib.closing(tarobj): |
|
# don't do any chowning! |
|
tarobj.chown = lambda *args: None |
|
for member in tarobj: |
|
name = member.name |
|
# don't extract absolute paths or ones with .. in them |
|
if not name.startswith('/') and '..' not in name.split('/'): |
|
prelim_dst = os.path.join(extract_dir, *name.split('/')) |
|
|
|
# resolve any links and to extract the link targets as normal |
|
# files |
|
while member is not None and (member.islnk() or member.issym()): |
|
linkpath = member.linkname |
|
if member.issym(): |
|
base = posixpath.dirname(member.name) |
|
linkpath = posixpath.join(base, linkpath) |
|
linkpath = posixpath.normpath(linkpath) |
|
member = tarobj._getmember(linkpath) |
|
|
|
if member is not None and (member.isfile() or member.isdir()): |
|
final_dst = progress_filter(name, prelim_dst) |
|
if final_dst: |
|
if final_dst.endswith(os.sep): |
|
final_dst = final_dst[:-1] |
|
try: |
|
# XXX Ugh |
|
tarobj._extract_member(member, final_dst) |
|
except tarfile.ExtractError: |
|
# chown/chmod/mkfifo/mknode/makedev failed |
|
pass |
|
return True |
|
|
|
|
|
extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile
|
|
|