import functools
import logging
import os
import shutil
import tarfile
import tempfile
from typing import Any, Dict, Iterable, Optional, Sequence
import mgzip
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def class_repr(cls, filter_types: Sequence[Any] = (), **kwargs) -> str:
    """Build a ``ClassName(key=value,...)`` string for an object.

    Args:
        cls: The object to describe. Its class name is read from
            ``cls.__class__`` and its attributes are collected with
            ``class_attributes``.
        filter_types: Types forwarded to ``class_attributes``. The empty
            default is falsy, which disables type filtering there.
        **kwargs: Extra key/value pairs; they are rendered before the
            collected attributes.

    Returns:
        The class name followed by the comma-separated ``key=value`` pairs
        in parentheses.
    """
    # The default is an immutable tuple rather than a list so that no mutable
    # object is shared between calls; both are empty and therefore equivalent.
    attributes = class_attributes(cls, filter_types=filter_types)
    kwargstring = kv_string((kwargs, attributes))
    return f"{cls.__class__.__name__}({kwargstring})"
def kv_string(dicts: Iterable[Dict[Any, Any]]) -> str:
    """Render the items of several dicts as one ``k=v,k=v`` string.

    Args:
        dicts: Dictionaries whose items are rendered in iteration order.

    Returns:
        All ``key=value`` pairs joined with commas (no spaces); an empty
        string when there are no items.
    """
    pairs = []
    for mapping in dicts:
        for key, value in mapping.items():
            pairs.append(f"{key}={value}")
    return ",".join(pairs)
def class_attributes(cls, filter_types: Sequence[Any] = (int, float, bool, str)) -> Dict[str, Any]:
    """Collect the attributes of an object that are short enough to print.

    Args:
        cls: Any object supported by ``vars()``.
        filter_types: Only values whose exact type is listed are kept
            (subclasses are not accepted). An empty sequence disables the
            type filter.

    Returns:
        A dict of attribute name to value, limited to values whose ``repr``
        is shorter than 80 characters.
    """
    return {
        name: value
        for name, value in vars(cls).items()
        # ``repr(value)`` is used instead of ``value.__repr__()`` because the
        # latter is an unbound call, and raises TypeError, when the value is
        # itself a class.
        if (not filter_types or type(value) in filter_types) and len(repr(value)) < 80
    }
def basic_attribute_repr(cls):
    """Class decorator that adds a ``__repr__`` built by ``class_repr``.

    Args:
        cls: The class to decorate.

    Returns:
        A subclass of ``cls`` whose ``__repr__`` delegates to ``class_repr``.
        The subclass takes over the wrapped class's metadata (name, module,
        docstring, ...) through ``functools.update_wrapper``.
    """

    class _AttributeRepr(cls):
        def __repr__(self) -> str:
            return class_repr(self)

    # ``updated=()`` keeps update_wrapper from merging ``cls.__dict__`` into
    # the subclass; only the metadata attributes are copied over.
    return functools.update_wrapper(_AttributeRepr, cls, updated=())
def human_bytes(bytes: float) -> str:
    """Human readable string representation of bytes.

    Args:
        bytes: Size in bytes. The parameter name shadows the builtin but is
            kept so that existing keyword callers keep working.

    Returns:
        The size with one decimal place and a unit of ``bytes``, ``KiB``,
        ``MiB`` or ``GiB``. ``GiB`` is the largest unit used.
    """
    size = bytes
    units = "bytes"
    for larger_unit in ("KiB", "MiB", "GiB"):
        # Promote at exactly 1024 as well, so 1024 bytes reads "1.0 KiB"
        # rather than "1024.0 bytes".
        if size < 1024:
            break
        size = size / 1024
        units = larger_unit
    return f"{size:.1f} {units}"
def filesize(filename: str) -> str:
    """Human readable string representation of filesize.

    Args:
        filename: Path of the file to measure.

    Returns:
        The file size formatted by ``human_bytes``. If the file cannot be
        stat'ed, a warning is logged and the formatted size of 0 bytes is
        returned.
    """
    # Ask for the size directly instead of checking existence first, so the
    # file cannot vanish between the check and the size lookup. The exceptions
    # caught here are the ones ``os.path.exists`` turns into ``False``.
    try:
        size = os.path.getsize(filename)
    except (OSError, ValueError):
        # ``Logger.warn`` is deprecated; ``warning`` is the supported spelling.
        logger.warning("File %s does not exist", filename)
        return human_bytes(0)
    return human_bytes(size)
def tar_files(
    output_filename: str,
    files: Iterable[str],
    file_arcnames: Iterable[str],
    delete_added: bool = True,
) -> str:
    """Write files into an uncompressed tar archive.

    Args:
        output_filename: Path of the tarball to create (opened in ``"w"`` mode).
        files: Paths of the files to add.
        file_arcnames: Names to store each file under inside the archive,
            paired with ``files`` position by position. Pairing stops at the
            shorter of the two iterables.
        delete_added: If true, remove the added input files once the archive
            has been written.

    Returns:
        ``output_filename``.

    Raises:
        OSError: If a file cannot be added or removed. When adding fails, no
            input file has been deleted.
    """
    added = []
    with tarfile.open(output_filename, mode="w") as tarf:
        for file, arcname in zip(files, file_arcnames):
            tarf.add(file, arcname=arcname)
            added.append(file)
    # Delete only after the archive is closed, so a failure partway through
    # cannot destroy inputs that exist solely in an unfinished tarball.
    if delete_added:
        # dict.fromkeys de-duplicates while keeping order, so a path that was
        # listed twice is removed only once.
        for file in dict.fromkeys(added):
            os.remove(file)
    return output_filename
def untar(input_tarfile: str, members: Iterable[str], output_dir: str) -> Iterable[str]:
    """Extract selected members of a tarball.

    Args:
        input_tarfile: Path of the tar archive to read.
        members: Names of the archive members to extract.
        output_dir: Directory the members are extracted into.

    Returns:
        A list with one path per requested member, each formed by joining
        ``output_dir`` and the member name, in request order.
    """
    extracted = []
    with tarfile.open(input_tarfile, mode="r") as archive:
        for member in members:
            out_filename = os.path.join(output_dir, member)
            logger.info(f"Extracting {member} to {out_filename}...")
            archive.extract(member, path=output_dir)
            extracted.append(out_filename)
    return extracted
def gzip_file(
    output_filename: str,
    input_filename: str,
    compress_level: int = 9,
    blocksize: int = 5 * 1024 * 1024,
    threads: Optional[int] = None,
) -> str:
    """Compress a file with gzip, using mgzip for multithreading.

    Args:
        output_filename: Path of the compressed file to write.
        input_filename: Path of the file to compress.
        compress_level: Passed to ``mgzip.open`` as ``compresslevel``.
        blocksize: Passed to ``mgzip.open`` as ``blocksize``; half of it is
            used as the copy buffer length.
        threads: Passed to ``mgzip.open`` as ``thread``.

    Returns:
        ``output_filename``.
    """
    copy_length = blocksize // 2
    # The output is opened before the input, in that order.
    with mgzip.open(
        output_filename,
        mode="wb",
        compresslevel=compress_level,
        blocksize=blocksize,
        thread=threads,
    ) as f_out, open(input_filename, "rb") as f_in:
        shutil.copyfileobj(f_in, f_out, length=copy_length)
    return output_filename
def gunzip_file(
    input_filename: str,
    output_filename: str,
    blocksize: int = 5 * 1024 * 1024,
    threads: Optional[int] = None,
) -> str:
    """Decompress a gzip file, using mgzip for multithreading.

    Args:
        input_filename: Path of the gzip-compressed file to read.
        output_filename: Path of the decompressed file to write.
        blocksize: Passed to ``mgzip.open`` as ``blocksize``; half of it is
            used as the copy buffer length.
        threads: Passed to ``mgzip.open`` as ``thread``.

    Returns:
        ``output_filename``.
    """
    copy_length = blocksize // 2
    # The output is opened before the input, in that order.
    with open(output_filename, mode="wb") as f_out, mgzip.open(
        input_filename, mode="rb", blocksize=blocksize, thread=threads
    ) as f_in:
        shutil.copyfileobj(f_in, f_out, length=copy_length)
    return output_filename