
Commit

chore: refactor sync_inputs to attachment_sync_inputs with decomposed helpers (#449)

* chore: refactor sync_inputs to attachment_sync_inputs with decomposed helpers

Signed-off-by: Bian <[email protected]>
godobyte authored Sep 23, 2024
1 parent 41c6486 commit b2a7efa
Showing 2 changed files with 940 additions and 0 deletions.
341 changes: 341 additions & 0 deletions src/deadline/job_attachments/asset_sync.py
@@ -2,6 +2,7 @@

""" Module for File Attachment synching """
from __future__ import annotations
from dataclasses import dataclass, asdict
import os
import shutil
import sys
@@ -68,6 +69,18 @@
logger = getLogger("deadline.job_attachments")


@dataclass
class PathMappingRule:
source_path_format: str
"""The path format associated with the source path (windows vs posix)"""

source_path: str
"""The path we're looking to change"""

destination_path: str
"""The path to transform the source path to"""


class AssetSync:
"""Class for managing AWS Deadline Cloud job-level attachments."""

@@ -105,6 +118,333 @@ def __init__(

self.hash_alg: HashAlgorithm = self.manifest_model.AssetManifest.get_default_hash_alg()

@staticmethod
def generate_dynamic_path_mapping(
session_dir: Path,
attachments: Attachments,
) -> dict[str, PathMappingRule]:
"""
Compute path mapping rules that are relative to the given session directory.
Args:
session_dir: path to the current session directory
attachments: an object that holds all input assets for the job.
Returns: a dictionary mapping each asset root path to its PathMappingRule, used for path mapping.
"""
mapped_path: dict[str, PathMappingRule] = dict()

for manifest_properties in attachments.manifests:
if not manifest_properties.fileSystemLocationName:
dir_name: str = _get_unique_dest_dir_name(manifest_properties.rootPath)
local_root = str(session_dir.joinpath(dir_name))
mapped_path[manifest_properties.rootPath] = PathMappingRule(
source_path_format=manifest_properties.rootPathFormat.value,
source_path=manifest_properties.rootPath,
destination_path=local_root,
)

return mapped_path
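
As a rough usage sketch (hypothetical job data; assumes the Attachments, ManifestProperties, and PathFormat models from deadline.job_attachments.models):

    from pathlib import Path

    from deadline.job_attachments.models import (
        Attachments,
        ManifestProperties,
        PathFormat,
    )

    attachments = Attachments(
        manifests=[
            ManifestProperties(
                rootPath="/mnt/projects/show",
                rootPathFormat=PathFormat.POSIX,
                outputRelativeDirectories=["renders"],
            )
        ]
    )
    rules = AssetSync.generate_dynamic_path_mapping(
        session_dir=Path("/sessions/session-1234"),
        attachments=attachments,
    )
    # rules["/mnt/projects/show"].destination_path is a unique directory name
    # under /sessions/session-1234; roots with a fileSystemLocationName are skipped.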

@staticmethod
def get_local_destination(
manifest_properties: ManifestProperties,
dynamic_mapping_rules: dict[str, PathMappingRule] = {},
storage_profiles_path_mapping_rules: dict[str, str] = {},
) -> str:
"""
Resolve the local destination directory for the given manifest properties.
Args:
manifest_properties: the manifest properties to resolve a local destination for.
dynamic_mapping_rules: mapping from manifest root path to the worker-host destination under the local session directory.
storage_profiles_path_mapping_rules: a dict of source path -> destination path mappings.
Returns: local destination corresponding to the given manifest properties.
Raises: AssetSyncError if no path mapping rule is found for the given root path.
"""
root_path = manifest_properties.rootPath

if manifest_properties.fileSystemLocationName:
local_destination = storage_profiles_path_mapping_rules.get(root_path)
else:
path_mapping: Optional[PathMappingRule] = dynamic_mapping_rules.get(root_path)
local_destination = path_mapping.destination_path if path_mapping else None

if local_destination:
return local_destination
else:
raise AssetSyncError(
"Error occurred while attempting to sync input files: "
f"No path mapping rule found for the source path {manifest_properties.rootPath}"
)
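
A sketch of the storage-profile branch (hypothetical paths, same assumed models as above): when fileSystemLocationName is set, the destination is looked up in storage_profiles_path_mapping_rules instead of the dynamic session mapping.

    props = ManifestProperties(
        rootPath="Z:\\assets",
        rootPathFormat=PathFormat.WINDOWS,
        fileSystemLocationName="SharedAssets",
    )
    dest = AssetSync.get_local_destination(
        manifest_properties=props,
        storage_profiles_path_mapping_rules={"Z:\\assets": "/mnt/shared/assets"},
    )
    # dest == "/mnt/shared/assets"; AssetSyncError is raised if no rule matches.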

def aggregate_asset_root_manifests(
self,
session_dir: Path,
s3_settings: JobAttachmentS3Settings,
queue_id: str,
job_id: str,
attachments: Attachments,
step_dependencies: Optional[list[str]] = None,
dynamic_mapping_rules: dict[str, PathMappingRule] = {},
storage_profiles_path_mapping_rules: dict[str, str] = {},
) -> DefaultDict[str, list[BaseAssetManifest]]:
"""
Download the job's input manifests from S3 (and any step dependency output manifests) and group them by local asset root.
Args:
session_dir: the directory that the session is going to use.
s3_settings: S3-specific Job Attachment settings.
queue_id: the ID of the queue, used when downloading step-step dependency outputs.
job_id: the ID of the job, used when downloading step-step dependency outputs.
attachments: an object that holds all input assets for the job.
step_dependencies: the list of Step IDs whose output should be downloaded over the input job attachments.
dynamic_mapping_rules: mapping from manifest root path to the worker-host destination under the local session directory.
storage_profiles_path_mapping_rules: a dict of source path -> destination path mappings from the storage profile.
Returns: a dictionary mapping each local asset root to the list of asset manifests downloaded for that root.
"""
grouped_manifests_by_root: DefaultDict[str, list[BaseAssetManifest]] = DefaultDict(list)

for manifest_properties in attachments.manifests:
local_root: str = AssetSync.get_local_destination(
manifest_properties=manifest_properties,
dynamic_mapping_rules=dynamic_mapping_rules,
storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules,
)

if manifest_properties.inputManifestPath:
manifest_s3_key = s3_settings.add_root_and_manifest_folder_prefix(
manifest_properties.inputManifestPath
)
manifest = get_manifest_from_s3(
manifest_key=manifest_s3_key,
s3_bucket=s3_settings.s3BucketName,
session=self.session,
)
grouped_manifests_by_root[local_root].append(manifest)

# Handle step-step dependencies.
if step_dependencies:
for step_id in step_dependencies:
manifests_by_root = get_output_manifests_by_asset_root(
s3_settings,
self.farm_id,
queue_id,
job_id,
step_id=step_id,
session=self.session,
)
for root, manifests in manifests_by_root.items():
dir_name = _get_unique_dest_dir_name(root)
local_root = str(session_dir.joinpath(dir_name))
grouped_manifests_by_root[local_root].extend(manifests)

return grouped_manifests_by_root

def _launch_vfs(
self,
s3_settings: JobAttachmentS3Settings,
session_dir: Path,
fs_permission_settings: Optional[FileSystemPermissionSettings] = None,
merged_manifests_by_root: dict[str, BaseAssetManifest] = dict(),
os_env_vars: dict[str, str] | None = None,
) -> None:
"""
Mount a Virtual File System for the given merged manifests.
Args:
s3_settings: S3-specific Job Attachment settings.
session_dir: the directory that the session is going to use.
fs_permission_settings: An instance defining group ownership and permission modes
to be set on the downloaded (synchronized) input files and directories.
merged_manifests_by_root: Merged manifests produced by
aggregate_asset_root_manifests()
os_env_vars: environment variables to set for the launched VFS subprocess.
Returns: None. If the VFS executable cannot be found, an error is logged and no mount is performed.
"""

try:
VFSProcessManager.find_vfs()
mount_vfs_from_manifests(
s3_bucket=s3_settings.s3BucketName,
manifests_by_root=merged_manifests_by_root,
boto3_session=self.session,
session_dir=session_dir,
fs_permission_settings=fs_permission_settings, # type: ignore[arg-type]
os_env_vars=os_env_vars, # type: ignore[arg-type]
cas_prefix=s3_settings.full_cas_prefix(),
)

except VFSExecutableMissingError:
logger.error(
f"Virtual File System not found, falling back to {JobAttachmentsFileSystem.COPIED} for JobAttachmentsFileSystem."
)

def copied_download(
self,
s3_settings: JobAttachmentS3Settings,
session_dir: Path,
fs_permission_settings: Optional[FileSystemPermissionSettings] = None,
merged_manifests_by_root: dict[str, BaseAssetManifest] = dict(),
on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None,
) -> SummaryStatistics:
"""
Download input files from S3 according to the given merged manifests.
Args:
s3_settings: S3-specific Job Attachment settings.
session_dir: the directory that the session is going to use.
fs_permission_settings: An instance defining group ownership and permission modes
to be set on the downloaded (synchronized) input files and directories.
merged_manifests_by_root: Merged manifests produced by aggregate_asset_root_manifests()
on_downloading_files: callback invoked while downloading files from S3.
Returns:
The download summary statistics.
Raises:
JobAttachmentsS3ClientError if any issue is encountered while downloading.
"""
try:
return download_files_from_manifests(
s3_bucket=s3_settings.s3BucketName,
manifests_by_root=merged_manifests_by_root,
cas_prefix=s3_settings.full_cas_prefix(),
fs_permission_settings=fs_permission_settings,
session=self.session,
on_downloading_files=on_downloading_files,
logger=self.logger,
).convert_to_summary_statistics()
except JobAttachmentsS3ClientError as exc:
if exc.status_code == 404:
raise JobAttachmentsS3ClientError(
action=exc.action,
status_code=exc.status_code,
bucket_name=exc.bucket_name,
key_or_prefix=exc.key_or_prefix,
message=(
"This can happen if the S3 check cache on the submitting machine is out of date. "
"Please delete the cache file from the submitting machine, usually located in the "
"home directory (~/.deadline/cache/s3_check_cache.db) and try submitting again."
),
) from exc
else:
raise

def attachment_sync_inputs(
self,
s3_settings: Optional[JobAttachmentS3Settings],
attachments: Optional[Attachments],
queue_id: str,
job_id: str,
session_dir: Path,
fs_permission_settings: Optional[FileSystemPermissionSettings] = None,
storage_profiles_path_mapping_rules: dict[str, str] = {},
step_dependencies: Optional[list[str]] = None,
on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None,
os_env_vars: Dict[str, str] | None = None,
) -> Tuple[SummaryStatistics, List[Dict[str, str]]]:
"""
Depending on the fileSystem setting in the Attachments, this performs one of two
behaviors:
COPIED / None : downloads a manifest file and corresponding input files, if found.
VIRTUAL: downloads a manifest file and mounts a Virtual File System at the
specified asset root corresponding to the manifest contents.
Args:
s3_settings: S3-specific Job Attachment settings.
attachments: an object that holds all input assets for the job.
queue_id: the ID of the queue.
job_id: the ID of the job.
session_dir: the directory that the session is going to use.
fs_permission_settings: An instance defining group ownership and permission modes
to be set on the downloaded (synchronized) input files and directories.
storage_profiles_path_mapping_rules: A dict of source path -> destination path mappings.
If this dict is not empty, it means that the Storage Profile set in the job is
different from the one configured in the Fleet performing the input-syncing.
step_dependencies: the list of Step IDs whose output should be downloaded over the input
job attachments.
on_downloading_files: a function that will be called with a ProgressReportMetadata object
for each file being downloaded. If the function returns False, the download will be
cancelled. If it returns True, the download will continue.
os_env_vars: environment variables to set for launched subprocesses
Returns:
COPIED / None : a tuple of (1) final summary statistics for file downloads,
and (2) the dynamic path mapping rules (as a list of dicts), used for
path mapping.
VIRTUAL: same as COPIED, but the summary statistics will be empty since the
download hasn't started yet.
"""

if not s3_settings:
self.logger.info(
f"No Job Attachment settings configured for Queue {queue_id}, no inputs to sync."
)
return (SummaryStatistics(), [])
if not attachments:
self.logger.info(f"No attachments configured for Job {job_id}, no inputs to sync.")
return (SummaryStatistics(), [])

# Generate absolute path mappings to the local session directory (no storage profile);
# this yields a mapping from root path to PathMappingRule.
dynamic_mapping_rules: dict[str, PathMappingRule] = AssetSync.generate_dynamic_path_mapping(
session_dir=session_dir,
attachments=attachments,
)

# Aggregate manifests (with step-step dependency handling)
grouped_manifests_by_root: DefaultDict[str, list[BaseAssetManifest]] = (
self.aggregate_asset_root_manifests(
session_dir=session_dir,
s3_settings=s3_settings,
queue_id=queue_id,
job_id=job_id,
attachments=attachments,
step_dependencies=step_dependencies,
dynamic_mapping_rules=dynamic_mapping_rules,
storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules,
)
)

# Merge the manifests in each root into a single manifest
merged_manifests_by_root: dict[str, BaseAssetManifest] = dict()
total_input_size: int = 0
for root, manifests in grouped_manifests_by_root.items():
merged_manifest = merge_asset_manifests(manifests)

if merged_manifest:
merged_manifests_by_root[root] = merged_manifest
total_input_size += merged_manifest.totalSize # type: ignore[attr-defined]

# Download
summary_statistics: SummaryStatistics = SummaryStatistics()
if (
attachments.fileSystem == JobAttachmentsFileSystem.VIRTUAL.value
and sys.platform != "win32"
and fs_permission_settings is not None
and os_env_vars is not None
and "AWS_PROFILE" in os_env_vars
and isinstance(fs_permission_settings, PosixFileSystemPermissionSettings)
):
# Virtual Download Flow
self._launch_vfs(
s3_settings=s3_settings,
session_dir=session_dir,
fs_permission_settings=fs_permission_settings,
merged_manifests_by_root=merged_manifests_by_root,
os_env_vars=os_env_vars,
)
else:
# Copied Download flow
self._ensure_disk_capacity(session_dir, total_input_size)
summary_statistics = self.copied_download(
s3_settings=s3_settings,
session_dir=session_dir,
fs_permission_settings=fs_permission_settings,
merged_manifests_by_root=merged_manifests_by_root,
on_downloading_files=on_downloading_files,
)

self._record_attachment_mtimes(merged_manifests_by_root)
return (
summary_statistics,
list(asdict(r) for r in dynamic_mapping_rules.values()),
)
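
A minimal end-to-end sketch of the new entry point (hypothetical IDs, bucket, and paths; assumes AssetSync is constructed with a farm ID and that JobAttachmentS3Settings comes from deadline.job_attachments.models):

    from pathlib import Path

    from deadline.job_attachments.models import JobAttachmentS3Settings

    asset_sync = AssetSync(farm_id="farm-1234")
    summary, path_mappings = asset_sync.attachment_sync_inputs(
        s3_settings=JobAttachmentS3Settings(
            s3BucketName="my-job-attachments-bucket",
            rootPrefix="DeadlineCloud",
        ),
        attachments=attachments,  # e.g. the Attachments object from the earlier sketch
        queue_id="queue-1234",
        job_id="job-1234",
        session_dir=Path("/sessions/session-1234"),
    )
    # summary holds download statistics (empty for the VIRTUAL flow); path_mappings
    # is a list of PathMappingRule dicts produced via asdict(), one per mapped root.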

def _upload_output_files_to_s3(
self,
s3_settings: JobAttachmentS3Settings,
@@ -351,6 +691,7 @@ def _ensure_disk_capacity(self, session_dir: Path, total_input_bytes: int) -> No
f"Total file size required for download ({input_size_readable}) is larger than available disk space ({disk_free_readable})"
)

# This is on the deprecation path; please use attachment_sync_inputs instead.
def sync_inputs(
self,
s3_settings: Optional[JobAttachmentS3Settings],
