Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: Update functions #14

Merged
merged 5 commits into from
Dec 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 170 additions & 12 deletions client/ayon_third_party/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@
import datetime
import subprocess
import copy
import hashlib
import zipfile
import tarfile

import ayon_api

from ayon_common import (
get_ayon_appdirs,
validate_file_checksum,
extract_archive_file,
)
try:
from ayon_core.lib import get_launcher_storage_dir
except ImportError:
from ayon_core.lib import get_ayon_appdirs as get_launcher_storage_dir

from .version import __version__
from .constants import ADDON_NAME

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
DOWNLOAD_DIR = os.path.join(CURRENT_DIR, "downloads")
NOT_SET = type("NOT_SET", (), {"__bool__": lambda: False})()

IMPLEMENTED_ARCHIVE_FORMATS = {
".zip", ".tar", ".tgz", ".tar.gz", ".tar.xz", ".tar.bz2"
}

class _OIIOArgs:
download_needed = None
Expand Down Expand Up @@ -48,6 +52,159 @@ class _ThirdPartyCache:
addon_settings = NOT_SET


class ZipFileLongPaths(zipfile.ZipFile):
"""Allows longer paths in zip files.
Regular DOS paths are limited to MAX_PATH (260) characters, including
the string's terminating NUL character.
That limit can be exceeded by using an extended-length path that
starts with the '\\?\' prefix.
"""
_is_windows = platform.system().lower() == "windows"

def _extract_member(self, member, tpath, pwd):
if self._is_windows:
tpath = os.path.abspath(tpath)
if tpath.startswith("\\\\"):
tpath = "\\\\?\\UNC\\" + tpath[2:]
else:
tpath = "\\\\?\\" + tpath

return super()._extract_member(member, tpath, pwd)


def calculate_file_checksum(filepath, checksum_algorithm, chunk_size=10000):
"""Calculate file checksum for given algorithm.
Args:
filepath (str): Path to a file.
checksum_algorithm (str): Algorithm to use. ('md5', 'sha1', 'sha256')
chunk_size (Optional[int]): Chunk size to read file.
Defaults to 10000.
Returns:
str: Calculated checksum.
Raises:
ValueError: File not found or unknown checksum algorithm.
"""

if not filepath:
raise ValueError("Filepath is empty.")

if not os.path.exists(filepath):
raise ValueError("{} doesn't exist.".format(filepath))

if not os.path.isfile(filepath):
raise ValueError("{} is not a file.".format(filepath))

func = getattr(hashlib, checksum_algorithm, None)
if func is None:
raise ValueError(
"Unknown checksum algorithm '{}'".format(checksum_algorithm)
)

hash_obj = func()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()


def validate_file_checksum(filepath, checksum, checksum_algorithm):
"""Validate file checksum.
Args:
filepath (str): Path to file.
checksum (str): Hash of file.
checksum_algorithm (str): Type of checksum.
Returns:
bool: Hash is valid/invalid.
Raises:
ValueError: File not found or unknown checksum algorithm.
"""
return checksum == calculate_file_checksum(filepath, checksum_algorithm)


def get_archive_ext_and_type(archive_file):
"""Get archive extension and type.
Args:
archive_file (str): Path to archive file.
Returns:
Tuple[str, str]: Archive extension and type.
"""

tmp_name = archive_file.lower()
if tmp_name.endswith(".zip"):
return ".zip", "zip"

for ext in (
".tar",
".tgz",
".tar.gz",
".tar.xz",
".tar.bz2",
):
if tmp_name.endswith(ext):
return ext, "tar"

return None, None


def extract_archive_file(archive_file, dst_folder=None):
"""Extract archived file to a directory.
Args:
archive_file (str): Path to a archive file.
dst_folder (Optional[str]): Directory where content will be extracted.
By default, same folder where archive file is.
"""

if not dst_folder:
dst_folder = os.path.dirname(archive_file)

archive_ext, archive_type = get_archive_ext_and_type(archive_file)

print("Extracting {} -> {}".format(archive_file, dst_folder))
if archive_type is None:
_, ext = os.path.splitext(archive_file)
raise ValueError((
f"Invalid file extension \"{ext}\"."
f" Expected {', '.join(IMPLEMENTED_ARCHIVE_FORMATS)}"
))

if archive_type == "zip":
zip_file = ZipFileLongPaths(archive_file)
zip_file.extractall(dst_folder)
zip_file.close()

elif archive_type == "tar":
if archive_ext == ".tar":
tar_type = "r:"
elif archive_ext.endswith(".xz"):
tar_type = "r:xz"
elif archive_ext.endswith(".gz"):
tar_type = "r:gz"
elif archive_ext.endswith(".bz2"):
tar_type = "r:bz2"
else:
tar_type = "r:*"

try:
tar_file = tarfile.open(archive_file, tar_type)
except tarfile.ReadError:
raise ValueError("corrupted archive")

tar_file.extractall(dst_folder)
tar_file.close()


def get_addon_settings():
if _ThirdPartyCache.addon_settings is NOT_SET:
_ThirdPartyCache.addon_settings = ayon_api.get_addon_settings(
Expand Down Expand Up @@ -125,12 +282,13 @@ def validate_oiio_args(args):


def _get_addon_endpoint():
return "addons/{}/{}".format(ADDON_NAME, __version__)
return f"addons/{ADDON_NAME}/{__version__}"


def _get_info_path(name):
return get_ayon_appdirs(
"addons", "{}-{}.json".format(ADDON_NAME, name))
return get_launcher_storage_dir(
"addons", f"{ADDON_NAME}-{name}.json"
)


def filter_file_info(name):
Expand Down Expand Up @@ -265,7 +423,7 @@ def _fill_ffmpeg_tool_args(tool_name, addon_settings=None):
path_parts = [get_downloaded_ffmpeg_root()]
if platform_name == "windows":
path_parts.append("bin")
tool_name = "{}.exe".format(tool_name)
tool_name = f"{tool_name}.exe"
path_parts.append(tool_name)

args = [
Expand Down Expand Up @@ -330,7 +488,7 @@ def _fill_oiio_tool_args(tool_name, addon_settings=None):
if platform_name == "linux":
path_parts.append("bin")
elif platform_name == "windows":
tool_name = "{}.exe".format(tool_name)
tool_name = f"{tool_name}.exe"
path_parts.append(tool_name)

args = [
Expand Down Expand Up @@ -359,7 +517,7 @@ def _fill_oiio_tool_args(tool_name, addon_settings=None):
try:
root = root.format(**format_data)
except (ValueError, KeyError):
print("Failed to format root '{}'".format(root))
print(f"Failed to format root '{root}'")

if os.path.exists(root):
filtered_roots.append(root)
Expand Down
Loading