Skip to content

Commit

Permalink
Merge pull request #14 from ynput/enhancement/update-functions
Browse files Browse the repository at this point in the history
Chore: Update functions
  • Loading branch information
iLLiCiTiT authored Dec 11, 2024
2 parents f0316a1 + e00b8c7 commit ecb5fb7
Showing 1 changed file with 170 additions and 12 deletions.
182 changes: 170 additions & 12 deletions client/ayon_third_party/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@
import datetime
import subprocess
import copy
import hashlib
import zipfile
import tarfile

import ayon_api

from ayon_common import (
get_ayon_appdirs,
validate_file_checksum,
extract_archive_file,
)
try:
from ayon_core.lib import get_launcher_storage_dir
except ImportError:
from ayon_core.lib import get_ayon_appdirs as get_launcher_storage_dir

from .version import __version__
from .constants import ADDON_NAME

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
DOWNLOAD_DIR = os.path.join(CURRENT_DIR, "downloads")
NOT_SET = type("NOT_SET", (), {"__bool__": lambda: False})()

IMPLEMENTED_ARCHIVE_FORMATS = {
".zip", ".tar", ".tgz", ".tar.gz", ".tar.xz", ".tar.bz2"
}

class _OIIOArgs:
download_needed = None
Expand Down Expand Up @@ -48,6 +52,159 @@ class _ThirdPartyCache:
addon_settings = NOT_SET


class ZipFileLongPaths(zipfile.ZipFile):
"""Allows longer paths in zip files.
Regular DOS paths are limited to MAX_PATH (260) characters, including
the string's terminating NUL character.
That limit can be exceeded by using an extended-length path that
starts with the '\\?\' prefix.
"""
_is_windows = platform.system().lower() == "windows"

def _extract_member(self, member, tpath, pwd):
if self._is_windows:
tpath = os.path.abspath(tpath)
if tpath.startswith("\\\\"):
tpath = "\\\\?\\UNC\\" + tpath[2:]
else:
tpath = "\\\\?\\" + tpath

return super()._extract_member(member, tpath, pwd)


def calculate_file_checksum(filepath, checksum_algorithm, chunk_size=10000):
"""Calculate file checksum for given algorithm.
Args:
filepath (str): Path to a file.
checksum_algorithm (str): Algorithm to use. ('md5', 'sha1', 'sha256')
chunk_size (Optional[int]): Chunk size to read file.
Defaults to 10000.
Returns:
str: Calculated checksum.
Raises:
ValueError: File not found or unknown checksum algorithm.
"""

if not filepath:
raise ValueError("Filepath is empty.")

if not os.path.exists(filepath):
raise ValueError("{} doesn't exist.".format(filepath))

if not os.path.isfile(filepath):
raise ValueError("{} is not a file.".format(filepath))

func = getattr(hashlib, checksum_algorithm, None)
if func is None:
raise ValueError(
"Unknown checksum algorithm '{}'".format(checksum_algorithm)
)

hash_obj = func()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()


def validate_file_checksum(filepath, checksum, checksum_algorithm):
"""Validate file checksum.
Args:
filepath (str): Path to file.
checksum (str): Hash of file.
checksum_algorithm (str): Type of checksum.
Returns:
bool: Hash is valid/invalid.
Raises:
ValueError: File not found or unknown checksum algorithm.
"""
return checksum == calculate_file_checksum(filepath, checksum_algorithm)


def get_archive_ext_and_type(archive_file):
"""Get archive extension and type.
Args:
archive_file (str): Path to archive file.
Returns:
Tuple[str, str]: Archive extension and type.
"""

tmp_name = archive_file.lower()
if tmp_name.endswith(".zip"):
return ".zip", "zip"

for ext in (
".tar",
".tgz",
".tar.gz",
".tar.xz",
".tar.bz2",
):
if tmp_name.endswith(ext):
return ext, "tar"

return None, None


def extract_archive_file(archive_file, dst_folder=None):
"""Extract archived file to a directory.
Args:
archive_file (str): Path to a archive file.
dst_folder (Optional[str]): Directory where content will be extracted.
By default, same folder where archive file is.
"""

if not dst_folder:
dst_folder = os.path.dirname(archive_file)

archive_ext, archive_type = get_archive_ext_and_type(archive_file)

print("Extracting {} -> {}".format(archive_file, dst_folder))
if archive_type is None:
_, ext = os.path.splitext(archive_file)
raise ValueError((
f"Invalid file extension \"{ext}\"."
f" Expected {', '.join(IMPLEMENTED_ARCHIVE_FORMATS)}"
))

if archive_type == "zip":
zip_file = ZipFileLongPaths(archive_file)
zip_file.extractall(dst_folder)
zip_file.close()

elif archive_type == "tar":
if archive_ext == ".tar":
tar_type = "r:"
elif archive_ext.endswith(".xz"):
tar_type = "r:xz"
elif archive_ext.endswith(".gz"):
tar_type = "r:gz"
elif archive_ext.endswith(".bz2"):
tar_type = "r:bz2"
else:
tar_type = "r:*"

try:
tar_file = tarfile.open(archive_file, tar_type)
except tarfile.ReadError:
raise ValueError("corrupted archive")

tar_file.extractall(dst_folder)
tar_file.close()


def get_addon_settings():
if _ThirdPartyCache.addon_settings is NOT_SET:
_ThirdPartyCache.addon_settings = ayon_api.get_addon_settings(
Expand Down Expand Up @@ -125,12 +282,13 @@ def validate_oiio_args(args):


def _get_addon_endpoint():
return "addons/{}/{}".format(ADDON_NAME, __version__)
return f"addons/{ADDON_NAME}/{__version__}"


def _get_info_path(name):
return get_ayon_appdirs(
"addons", "{}-{}.json".format(ADDON_NAME, name))
return get_launcher_storage_dir(
"addons", f"{ADDON_NAME}-{name}.json"
)


def filter_file_info(name):
Expand Down Expand Up @@ -265,7 +423,7 @@ def _fill_ffmpeg_tool_args(tool_name, addon_settings=None):
path_parts = [get_downloaded_ffmpeg_root()]
if platform_name == "windows":
path_parts.append("bin")
tool_name = "{}.exe".format(tool_name)
tool_name = f"{tool_name}.exe"
path_parts.append(tool_name)

args = [
Expand Down Expand Up @@ -330,7 +488,7 @@ def _fill_oiio_tool_args(tool_name, addon_settings=None):
if platform_name == "linux":
path_parts.append("bin")
elif platform_name == "windows":
tool_name = "{}.exe".format(tool_name)
tool_name = f"{tool_name}.exe"
path_parts.append(tool_name)

args = [
Expand Down Expand Up @@ -359,7 +517,7 @@ def _fill_oiio_tool_args(tool_name, addon_settings=None):
try:
root = root.format(**format_data)
except (ValueError, KeyError):
print("Failed to format root '{}'".format(root))
print(f"Failed to format root '{root}'")

if os.path.exists(root):
filtered_roots.append(root)
Expand Down

0 comments on commit ecb5fb7

Please sign in to comment.