Skip to content

Commit

Permalink
feat(JA): Add a fast mode for snapshot diff which compares using time…
Browse files Browse the repository at this point in the history
…stamp and file size. Useful for fast snapshots like on worker (#463)

Signed-off-by: David Leong <[email protected]>
  • Loading branch information
leongdl authored Oct 9, 2024
1 parent 9f36fc7 commit 1aa23b3
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 61 deletions.
50 changes: 49 additions & 1 deletion src/deadline/job_attachments/_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import concurrent.futures

import os
from pathlib import Path
from pathlib import Path, PurePosixPath
from typing import Dict, List, Tuple
from deadline.client.cli._groups.click_logger import ClickLogger
from deadline.client.config import config_file
from deadline.client.exceptions import NonValidInputError
from deadline.job_attachments.asset_manifests.base_manifest import (
Expand Down Expand Up @@ -113,3 +114,50 @@ def compare_manifest(
differences.append((FileStatus.DELETED, manifest_path))

return differences


def _fast_file_list_to_manifest_diff(
    root: str, current_files: List[str], diff_manifest: BaseAssetManifest, logger: ClickLogger
) -> List[str]:
    """
    Perform a fast difference of the current list of files to a previous manifest to diff against using time stamps and file sizes.
    No file contents are read or hashed; only `stat()` results are compared.
    :param root: Root folder of files to diff against.
    :param current_files: List of files to compare with. Each must be located under `root`.
    :param diff_manifest: Manifest containing files to diff against.
    :param logger: Click logger; one line is echoed for every new or modified file found.
    :return List[str]: List of files that are new, or modified.
    """
    # Index the manifest entries by their OS-normalized relative path.
    input_files_map: Dict[str, BaseManifestPath] = {
        os.path.normpath(input_file.path): input_file for input_file in diff_manifest.paths
    }

    changed_paths: List[str] = []

    # Iterate for each file that we found in glob.
    for local_file in current_files:
        # Get the file's time stamp and size. We want to compare both.
        # From enabling CRT, sometimes timestamp update can fail.
        local_file_path = Path(local_file)
        file_stat = local_file_path.stat()

        # Root-relative path with forward slashes; this is the form echoed to the user.
        root_relative_path = str(PurePosixPath(*local_file_path.relative_to(root).parts))
        # The map keys went through os.path.normpath, so the lookup key must as well.
        # Otherwise, on Windows (where normpath emits backslashes) every file in a
        # subdirectory would miss the map and be reported as new.
        lookup_key = os.path.normpath(root_relative_path)

        if lookup_key not in input_files_map:
            # This is a new file
            logger.echo(f"Found difference at: {root_relative_path}, Status: FileStatus.NEW")
            changed_paths.append(local_file)
            continue

        # The file is known to the manifest; decide whether it was modified.
        input_file = input_files_map[lookup_key]
        # Check file size first as it is easier to test. Usually modified files will also have size diff.
        if file_stat.st_size != input_file.size:
            changed_paths.append(local_file)
            logger.echo(
                f"Found size difference at: {root_relative_path}, Status: FileStatus.MODIFIED"
            )
        # st_mtime_ns is in nanoseconds; it is reduced to microseconds before being
        # compared with the manifest's mtime value.
        elif file_stat.st_mtime_ns // 1000 != input_file.mtime:
            changed_paths.append(local_file)
            logger.echo(
                f"Found time difference at: {root_relative_path}, Status: FileStatus.MODIFIED"
            )
    return changed_paths
107 changes: 47 additions & 60 deletions src/deadline/job_attachments/api/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,23 @@
import os
from typing import List, Optional, Tuple


from deadline.client.api._job_attachment import _hash_attachments
from deadline.client.cli._common import _ProgressBarCallbackManager
from deadline.client.cli._groups.click_logger import ClickLogger
from deadline.job_attachments._diff import compare_manifest
from deadline.job_attachments._diff import _fast_file_list_to_manifest_diff, compare_manifest
from deadline.job_attachments._glob import _process_glob_inputs, _glob_paths
from deadline.job_attachments.asset_manifests._create_manifest import (
_create_manifest_for_single_root,
)
from deadline.job_attachments.asset_manifests.base_manifest import (
BaseManifestPath,
)
from deadline.job_attachments.asset_manifests.decode import decode_manifest
from deadline.job_attachments.exceptions import ManifestCreationException
from deadline.job_attachments.asset_manifests.hash_algorithms import hash_data
from deadline.job_attachments.models import (
FileStatus,
GlobConfig,
JobAttachmentS3Settings,
ManifestSnapshot,
default_glob_all,
)
from deadline.job_attachments.upload import S3AssetManager

"""
APIs here should be business logic only. It should perform one thing, and one thing well.
Expand All @@ -39,6 +37,7 @@ def _manifest_snapshot(
exclude: Optional[List[str]] = None,
include_exclude_config: Optional[str] = None,
diff: Optional[str] = None,
force_rehash: bool = False,
logger: ClickLogger = ClickLogger(False),
) -> Optional[ManifestSnapshot]:

Expand All @@ -54,80 +53,68 @@ def _manifest_snapshot(
# Default, include all.
glob_config = GlobConfig()

inputs = _glob_paths(root, include=glob_config.include_glob, exclude=glob_config.exclude_glob)

# Placeholder Asset Manager
asset_manager = S3AssetManager(
farm_id=" ", queue_id=" ", job_attachment_settings=JobAttachmentS3Settings(" ", " ")
current_files = _glob_paths(
root, include=glob_config.include_glob, exclude=glob_config.exclude_glob
)

hash_callback_manager = _ProgressBarCallbackManager(length=100, label="Hashing Attachments")

upload_group = asset_manager.prepare_paths_for_upload(
input_paths=inputs, output_paths=[root], referenced_paths=[]
)
# We only provided 1 root path, so output should only have 1 group.
assert len(upload_group.asset_groups) == 1

if upload_group.asset_groups:
_, manifests = _hash_attachments(
asset_manager=asset_manager,
asset_groups=upload_group.asset_groups,
total_input_files=upload_group.total_input_files,
total_input_bytes=upload_group.total_input_bytes,
print_function_callback=logger.echo,
hashing_progress_callback=hash_callback_manager.callback,
# Compute the output manifest immediately and hash.
if not diff:
output_manifest = _create_manifest_for_single_root(
files=current_files, root=root, logger=logger
)

if not manifests or len(manifests) == 0:
logger.echo("No manifest generated")
return None

# This is a hard failure, we are snapshotting 1 directory.
assert len(manifests) == 1
output_manifest = manifests[0].asset_manifest
if output_manifest is None:
raise ManifestCreationException()
if not output_manifest:
return None

# If this is a diff manifest, load the supplied manifest file.
if diff:
else:
# Parse local manifest
with open(diff) as source_diff:
source_manifest_str = source_diff.read()
source_manifest = decode_manifest(source_manifest_str)

# Get the differences
changed_paths: List[str] = []
differences: List[Tuple[FileStatus, BaseManifestPath]] = compare_manifest(
source_manifest, output_manifest
)
for diff_item in differences:
if diff_item[0] == FileStatus.MODIFIED or diff_item[0] == FileStatus.NEW:
full_diff_path = f"{root}/{diff_item[1].path}"
changed_paths.append(full_diff_path)
logger.echo(f"Found difference at: {full_diff_path}, Status: {diff_item[0]}")

# Fast comparison using time stamps and sizes.
if not force_rehash:
changed_paths = _fast_file_list_to_manifest_diff(
root, current_files, source_manifest, logger
)
else:
# In "slow / thorough" mode, we check by hash, which is definitive.
output_manifest = _create_manifest_for_single_root(
files=current_files, root=root, logger=logger
)
if not output_manifest:
return None
differences: List[Tuple[FileStatus, BaseManifestPath]] = compare_manifest(
source_manifest, output_manifest
)
for diff_item in differences:
if diff_item[0] == FileStatus.MODIFIED or diff_item[0] == FileStatus.NEW:
full_diff_path = f"{root}/{diff_item[1].path}"
changed_paths.append(full_diff_path)
logger.echo(f"Found difference at: {full_diff_path}, Status: {diff_item[0]}")

# If there were no files diffed, return None, there was nothing to snapshot.
if len(changed_paths) == 0:
return None

# Since the files are already hashed, we can easily re-use has_attachments to remake a diff manifest.
diff_group = asset_manager.prepare_paths_for_upload(
input_paths=changed_paths, output_paths=[root], referenced_paths=[]
)
_, diff_manifests = _hash_attachments(
asset_manager=asset_manager,
asset_groups=diff_group.asset_groups,
total_input_files=diff_group.total_input_files,
total_input_bytes=diff_group.total_input_bytes,
print_function_callback=logger.echo,
hashing_progress_callback=hash_callback_manager.callback,
output_manifest = _create_manifest_for_single_root(
files=changed_paths, root=root, logger=logger
)
output_manifest = diff_manifests[0].asset_manifest
if not output_manifest:
return None

# Write created manifest into local file, at the specified location at destination
if output_manifest is not None:

# Encode the root path as a hash so it can be embedded in the manifest file name.
root_hash: str = hash_data(root.encode("utf-8"), output_manifest.get_default_hash_alg())
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
manifest_name = name if name else root.replace("/", "_")
manifest_name = manifest_name[1:] if manifest_name[0] == "_" else manifest_name
manifest_name = f"{manifest_name}-{timestamp}.manifest"
manifest_name = f"{manifest_name}-{root_hash}-{timestamp}.manifest"

local_manifest_file = os.path.join(destination, manifest_name)
os.makedirs(os.path.dirname(local_manifest_file), exist_ok=True)
Expand Down
59 changes: 59 additions & 0 deletions src/deadline/job_attachments/asset_manifests/_create_manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from typing import List, Optional

from deadline.client.api._job_attachment import _hash_attachments
from deadline.client.cli._common import _ProgressBarCallbackManager
from deadline.client.cli._groups.click_logger import ClickLogger
from deadline.job_attachments.asset_manifests.base_manifest import BaseAssetManifest
from deadline.job_attachments.exceptions import ManifestCreationException
from deadline.job_attachments.models import JobAttachmentS3Settings
from deadline.job_attachments.upload import S3AssetManager


def _create_manifest_for_single_root(
    files: List[str], root: str, logger: ClickLogger
) -> Optional[BaseAssetManifest]:
    """
    Shared logic to create a manifest file from a single root.
    :param files: Input files to create a manifest with.
    :param root: Asset root of the files.
    :param logger: Click logger for stdout.
    :return: The asset manifest generated for the root, or None when hashing produced no manifest.
    :raises ManifestCreationException: If a manifest entry was produced but it carries no asset manifest.
    """
    # Placeholder Asset Manager: it is constructed with blank farm, queue and S3 settings.
    asset_manager = S3AssetManager(
        farm_id=" ", queue_id=" ", job_attachment_settings=JobAttachmentS3Settings(" ", " ")
    )

    hash_callback_manager = _ProgressBarCallbackManager(length=100, label="Hashing Attachments")

    upload_group = asset_manager.prepare_paths_for_upload(
        input_paths=files, output_paths=[root], referenced_paths=[]
    )
    # We only provided 1 root path, so output should only have 1 group.
    assert len(upload_group.asset_groups) == 1

    # Bind the name up front: if asserts are stripped (python -O) and there are no
    # asset groups, the check below must report "no manifest" instead of failing
    # on an unbound local.
    manifests = None
    if upload_group.asset_groups:
        _, manifests = _hash_attachments(
            asset_manager=asset_manager,
            asset_groups=upload_group.asset_groups,
            total_input_files=upload_group.total_input_files,
            total_input_bytes=upload_group.total_input_bytes,
            print_function_callback=logger.echo,
            hashing_progress_callback=hash_callback_manager.callback,
        )

    if not manifests:
        logger.echo("No manifest generated")
        return None

    # This is a hard failure, we are snapshotting 1 directory.
    assert len(manifests) == 1

    output_manifest = manifests[0].asset_manifest
    if output_manifest is None:
        raise ManifestCreationException()

    # Return the generated manifest.
    return output_manifest
89 changes: 89 additions & 0 deletions test/unit/deadline_job_attachments/api/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,95 @@ def test_snapshot_diff(self, temp_dir):

assert second_test_file_name in files

def test_snapshot_time_diff(self, temp_dir):
    """
    Snapshot a folder holding a single file, then change only that file's time stamp.
    Diffing against the first snapshot should list the file again.
    """

    # Given: a snapshot root containing one file.
    root_dir = os.path.join(temp_dir, "snapshot")
    test_file_name = "test_file"
    test_file = os.path.join(root_dir, test_file_name)
    os.makedirs(os.path.dirname(test_file), exist_ok=True)
    with open(test_file, "w") as handle:
        handle.write("testing123")

    # When: the baseline snapshot is taken.
    baseline: Optional[ManifestSnapshot] = _manifest_snapshot(
        root=root_dir, destination=temp_dir, name="test"
    )

    # Then: a manifest was written.
    assert baseline is not None
    assert baseline.manifest is not None

    # Given: the access and modification times are rewritten; the contents are untouched.
    os.utime(test_file, (1234567890, 1234567890))

    # When: a second snapshot is diffed against the baseline manifest.
    rediffed: Optional[ManifestSnapshot] = _manifest_snapshot(
        root=root_dir, destination=temp_dir, name="test", diff=baseline.manifest
    )

    # Then: the diff manifest holds exactly the one file.
    assert rediffed is not None
    assert rediffed.manifest is not None
    with open(rediffed.manifest, "r") as manifest_file:
        payload = json.load(manifest_file)
    assert len(payload["paths"]) == 1
    listed_paths = {entry["path"] for entry in payload["paths"]}

    assert test_file_name in listed_paths

def test_snapshot_size_diff(self, temp_dir):
    """
    Snapshot a folder holding a single file, then rewrite the file with longer contents.
    Diffing against the first snapshot should list the file again.
    """

    # Given: a snapshot root containing one file.
    root_dir = os.path.join(temp_dir, "snapshot")
    test_file_name = "test_file"
    test_file = os.path.join(root_dir, test_file_name)
    os.makedirs(os.path.dirname(test_file), exist_ok=True)
    with open(test_file, "w") as handle:
        handle.write("testing123")

    # When: the baseline snapshot is taken.
    baseline: Optional[ManifestSnapshot] = _manifest_snapshot(
        root=root_dir, destination=temp_dir, name="test"
    )

    # Then: a manifest was written.
    assert baseline is not None
    assert baseline.manifest is not None

    # Given: the file is overwritten with contents three times as long.
    with open(test_file, "w") as handle:
        handle.write("testing123testing123testing123")

    # When: a second snapshot is diffed against the baseline manifest.
    rediffed: Optional[ManifestSnapshot] = _manifest_snapshot(
        root=root_dir, destination=temp_dir, name="test", diff=baseline.manifest
    )

    # Then: the diff manifest holds exactly the one file.
    assert rediffed is not None
    assert rediffed.manifest is not None
    with open(rediffed.manifest, "r") as manifest_file:
        payload = json.load(manifest_file)
    assert len(payload["paths"]) == 1
    listed_paths = {entry["path"] for entry in payload["paths"]}

    assert test_file_name in listed_paths

def test_snapshot_diff_no_diff(self, temp_dir):
"""
Create a snapshot with 1 file. Snapshot again and diff. It should have no manifest.
Expand Down

0 comments on commit 1aa23b3

Please sign in to comment.