From b2a7efa7d6ea1a3e842689c032438dfb06c7711e Mon Sep 17 00:00:00 2001 From: Godot Bian Date: Mon, 23 Sep 2024 10:41:17 -0700 Subject: [PATCH] chore: refactor sync_inputs to attachment_sync_inputs with decomposed helpers (#449) * chore: refactor sync_inputs to attachment_sync_inputs with decomposed helpers Signed-off-by: Bian <13778003+godobyte@users.noreply.github.com> --- src/deadline/job_attachments/asset_sync.py | 341 ++++++++++ .../test_asset_sync.py | 599 ++++++++++++++++++ 2 files changed, 940 insertions(+) diff --git a/src/deadline/job_attachments/asset_sync.py b/src/deadline/job_attachments/asset_sync.py index 0ca94795..15846620 100644 --- a/src/deadline/job_attachments/asset_sync.py +++ b/src/deadline/job_attachments/asset_sync.py @@ -2,6 +2,7 @@ """ Module for File Attachment synching """ from __future__ import annotations +from dataclasses import dataclass, asdict import os import shutil import sys @@ -68,6 +69,18 @@ logger = getLogger("deadline.job_attachments") +@dataclass +class PathMappingRule: + source_path_format: str + """The path format associated with the source path (windows vs posix)""" + + source_path: str + """The path we're looking to change""" + + destination_path: str + """The path to transform the source path to""" + + class AssetSync: """Class for managing AWS Deadline Cloud job-level attachments.""" @@ -105,6 +118,333 @@ def __init__( self.hash_alg: HashAlgorithm = self.manifest_model.AssetManifest.get_default_hash_alg() + @staticmethod + def generate_dynamic_path_mapping( + session_dir: Path, + attachments: Attachments, + ) -> dict[str, PathMappingRule]: + """ + Compute path mapping rules that are relative to the given session directory. + + Args: + session_dir: path to the current session directory + attachments: an object that holds all input assets for the job. + + Returns: a dictionary of local roots for each asset root, used for path mapping. + """ + mapped_path: dict[str, PathMappingRule] = dict() + + for manifest_properties in attachments.manifests: + if not manifest_properties.fileSystemLocationName: + dir_name: str = _get_unique_dest_dir_name(manifest_properties.rootPath) + local_root = str(session_dir.joinpath(dir_name)) + mapped_path[manifest_properties.rootPath] = PathMappingRule( + source_path_format=manifest_properties.rootPathFormat.value, + source_path=manifest_properties.rootPath, + destination_path=local_root, + ) + + return mapped_path + + @staticmethod + def get_local_destination( + manifest_properties: ManifestProperties, + dynamic_mapping_rules: dict[str, PathMappingRule] = {}, + storage_profiles_path_mapping_rules: dict[str, str] = {}, + ) -> str: + """ + Args: + manifest_properties: manifest properties to search local destination for. + dynamic_mapping_rules: manifest root path to worker host destination mapping relative to local session. + storage_profiles_path_mapping_rules: a dict of source path -> destination path mappings. + + Returns: local destination corresponding to the given manifest properties. + Raises: AssetSyncError If no path mapping rule is found for the given root path. 
+ """ + root_path = manifest_properties.rootPath + + if manifest_properties.fileSystemLocationName: + local_destination = storage_profiles_path_mapping_rules.get(root_path) + else: + path_mapping: Optional[PathMappingRule] = dynamic_mapping_rules.get(root_path) + local_destination = path_mapping.destination_path if path_mapping else None + + if local_destination: + return local_destination + else: + raise AssetSyncError( + "Error occurred while attempting to sync input files: " + f"No path mapping rule found for the source path {manifest_properties.rootPath}" + ) + + def aggregate_asset_root_manifests( + self, + session_dir: Path, + s3_settings: JobAttachmentS3Settings, + queue_id: str, + job_id: str, + attachments: Attachments, + step_dependencies: Optional[list[str]] = None, + dynamic_mapping_rules: dict[str, PathMappingRule] = {}, + storage_profiles_path_mapping_rules: dict[str, str] = {}, + ) -> DefaultDict[str, list[BaseAssetManifest]]: + """ + Args: + session_dir: the directory that the session is going to use. + s3_settings: S3-specific Job Attachment settings. + queue_id: the ID of the queue for step-step dependency. + job_id: the ID of the job for step-step dependency. + attachments: an object that holds all input assets for the job. + step_dependencies: the list of Step IDs whose output should be downloaded over the input job attachments. + dynamic_mapping_rules: manifest root path to worker host destination mapping relative to local session. + storage_profiles_path_mapping_rules: manifest root path to worker host destination mapping given storage profile. + Returns: a dictionary of manifest files stored in the session directory. + """ + grouped_manifests_by_root: DefaultDict[str, list[BaseAssetManifest]] = DefaultDict(list) + + for manifest_properties in attachments.manifests: + local_root: str = AssetSync.get_local_destination( + manifest_properties=manifest_properties, + dynamic_mapping_rules=dynamic_mapping_rules, + storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules, + ) + + if manifest_properties.inputManifestPath: + manifest_s3_key = s3_settings.add_root_and_manifest_folder_prefix( + manifest_properties.inputManifestPath + ) + manifest = get_manifest_from_s3( + manifest_key=manifest_s3_key, + s3_bucket=s3_settings.s3BucketName, + session=self.session, + ) + grouped_manifests_by_root[local_root].append(manifest) + + # Handle step-step dependencies. + if step_dependencies: + for step_id in step_dependencies: + manifests_by_root = get_output_manifests_by_asset_root( + s3_settings, + self.farm_id, + queue_id, + job_id, + step_id=step_id, + session=self.session, + ) + for root, manifests in manifests_by_root.items(): + dir_name = _get_unique_dest_dir_name(root) + local_root = str(session_dir.joinpath(dir_name)) + grouped_manifests_by_root[local_root].extend(manifests) + + return grouped_manifests_by_root + + def _launch_vfs( + self, + s3_settings: JobAttachmentS3Settings, + session_dir: Path, + fs_permission_settings: Optional[FileSystemPermissionSettings] = None, + merged_manifests_by_root: dict[str, BaseAssetManifest] = dict(), + os_env_vars: dict[str, str] | None = None, + ) -> None: + """ + Args: + s3_settings: S3-specific Job Attachment settings. + session_dir: the directory that the session is going to use. + fs_permission_settings: An instance defining group ownership and permission modes + to be set on the downloaded (synchronized) input files and directories. 
+            merged_manifests_by_root: Merged manifests produced by
+                aggregate_asset_root_manifests()
+        Returns: None
+        Note: if the VFS executable cannot be found, the resulting VFSExecutableMissingError is
+            caught and an error is logged (falling back to COPIED behavior) instead of being raised.
+        """
+
+        try:
+            VFSProcessManager.find_vfs()
+            mount_vfs_from_manifests(
+                s3_bucket=s3_settings.s3BucketName,
+                manifests_by_root=merged_manifests_by_root,
+                boto3_session=self.session,
+                session_dir=session_dir,
+                fs_permission_settings=fs_permission_settings,  # type: ignore[arg-type]
+                os_env_vars=os_env_vars,  # type: ignore[arg-type]
+                cas_prefix=s3_settings.full_cas_prefix(),
+            )
+
+        except VFSExecutableMissingError:
+            logger.error(
+                f"Virtual File System not found, falling back to {JobAttachmentsFileSystem.COPIED} for JobAttachmentsFileSystem."
+            )
+
+    def copied_download(
+        self,
+        s3_settings: JobAttachmentS3Settings,
+        session_dir: Path,
+        fs_permission_settings: Optional[FileSystemPermissionSettings] = None,
+        merged_manifests_by_root: dict[str, BaseAssetManifest] = dict(),
+        on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None,
+    ) -> SummaryStatistics:
+        """
+        Args:
+            s3_settings: S3-specific Job Attachment settings.
+            session_dir: the directory that the session is going to use.
+            fs_permission_settings: An instance defining group ownership and permission modes
+                to be set on the downloaded (synchronized) input files and directories.
+            merged_manifests_by_root: Merged manifests produced by aggregate_asset_root_manifests()
+            on_downloading_files: a callback invoked while files are being downloaded from S3.
+
+        Returns:
+            The download summary statistics.
+
+        Raises:
+            JobAttachmentsS3ClientError if any issue is encountered while downloading.
+        """
+        try:
+            return download_files_from_manifests(
+                s3_bucket=s3_settings.s3BucketName,
+                manifests_by_root=merged_manifests_by_root,
+                cas_prefix=s3_settings.full_cas_prefix(),
+                fs_permission_settings=fs_permission_settings,
+                session=self.session,
+                on_downloading_files=on_downloading_files,
+                logger=self.logger,
+            ).convert_to_summary_statistics()
+        except JobAttachmentsS3ClientError as exc:
+            if exc.status_code == 404:
+                raise JobAttachmentsS3ClientError(
+                    action=exc.action,
+                    status_code=exc.status_code,
+                    bucket_name=exc.bucket_name,
+                    key_or_prefix=exc.key_or_prefix,
+                    message=(
+                        "This can happen if the S3 check cache on the submitting machine is out of date. "
+                        "Please delete the cache file from the submitting machine, usually located in the "
+                        "home directory (~/.deadline/cache/s3_check_cache.db) and try submitting again."
+                    ),
+                ) from exc
+            else:
+                raise
+
+    def attachment_sync_inputs(
+        self,
+        s3_settings: Optional[JobAttachmentS3Settings],
+        attachments: Optional[Attachments],
+        queue_id: str,
+        job_id: str,
+        session_dir: Path,
+        fs_permission_settings: Optional[FileSystemPermissionSettings] = None,
+        storage_profiles_path_mapping_rules: dict[str, str] = {},
+        step_dependencies: Optional[list[str]] = None,
+        on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None,
+        os_env_vars: Dict[str, str] | None = None,
+    ) -> Tuple[SummaryStatistics, List[Dict[str, str]]]:
+        """
+        Depending on the fileSystem in the Attachments this will perform two
+        different behaviors:
+            COPIED / None: downloads a manifest file and corresponding input files, if found.
+            VIRTUAL: downloads a manifest file and mounts a Virtual File System at the
+                specified asset root corresponding to the manifest contents.
+
+        Args:
+            s3_settings: S3-specific Job Attachment settings.
+            attachments: an object that holds all input assets for the job.
+ queue_id: the ID of the queue. + job_id: the ID of the job. + session_dir: the directory that the session is going to use. + fs_permission_settings: An instance defining group ownership and permission modes + to be set on the downloaded (synchronized) input files and directories. + storage_profiles_path_mapping_rules: A dict of source path -> destination path mappings. + If this dict is not empty, it means that the Storage Profile set in the job is + different from the one configured in the Fleet performing the input-syncing. + step_dependencies: the list of Step IDs whose output should be downloaded over the input + job attachments. + on_downloading_files: a function that will be called with a ProgressReportMetadata object + for each file being downloaded. If the function returns False, the download will be + cancelled. If it returns True, the download will continue. + os_env_vars: environment variables to set for launched subprocesses + + Returns: + COPIED / None : a tuple of (1) final summary statistics for file downloads, + and (2) a list of local roots for each asset root, used for + path mapping. + VIRTUAL: same as COPIED, but the summary statistics will be empty since the + download hasn't started yet. + """ + + if not s3_settings: + self.logger.info( + f"No Job Attachment settings configured for Queue {queue_id}, no inputs to sync." + ) + return (SummaryStatistics(), []) + if not attachments: + self.logger.info(f"No attachments configured for Job {job_id}, no inputs to sync.") + return (SummaryStatistics(), []) + + # Generate absolute Path Mapping to local session (no storage profile) + # returns root path to PathMappingRule mapping + dynamic_mapping_rules: dict[str, PathMappingRule] = AssetSync.generate_dynamic_path_mapping( + session_dir=session_dir, + attachments=attachments, + ) + + # Aggregate manifests (with step step dependency handling) + grouped_manifests_by_root: DefaultDict[str, list[BaseAssetManifest]] = ( + self.aggregate_asset_root_manifests( + session_dir=session_dir, + s3_settings=s3_settings, + queue_id=queue_id, + job_id=job_id, + attachments=attachments, + step_dependencies=step_dependencies, + dynamic_mapping_rules=dynamic_mapping_rules, + storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules, + ) + ) + + # Merge the manifests in each root into a single manifest + merged_manifests_by_root: dict[str, BaseAssetManifest] = dict() + total_input_size: int = 0 + for root, manifests in grouped_manifests_by_root.items(): + merged_manifest = merge_asset_manifests(manifests) + + if merged_manifest: + merged_manifests_by_root[root] = merged_manifest + total_input_size += merged_manifest.totalSize # type: ignore[attr-defined] + + # Download + summary_statistics: SummaryStatistics = SummaryStatistics() + if ( + attachments.fileSystem == JobAttachmentsFileSystem.VIRTUAL.value + and sys.platform != "win32" + and fs_permission_settings is not None + and os_env_vars is not None + and "AWS_PROFILE" in os_env_vars + and isinstance(fs_permission_settings, PosixFileSystemPermissionSettings) + ): + # Virtual Download Flow + self._launch_vfs( + s3_settings=s3_settings, + session_dir=session_dir, + fs_permission_settings=fs_permission_settings, + merged_manifests_by_root=merged_manifests_by_root, + os_env_vars=os_env_vars, + ) + else: + # Copied Download flow + self._ensure_disk_capacity(session_dir, total_input_size) + summary_statistics = self.copied_download( + s3_settings=s3_settings, + session_dir=session_dir, + fs_permission_settings=fs_permission_settings, + 
merged_manifests_by_root=merged_manifests_by_root, + on_downloading_files=on_downloading_files, + ) + + self._record_attachment_mtimes(merged_manifests_by_root) + return ( + summary_statistics, + list(asdict(r) for r in dynamic_mapping_rules.values()), + ) + def _upload_output_files_to_s3( self, s3_settings: JobAttachmentS3Settings, @@ -351,6 +691,7 @@ def _ensure_disk_capacity(self, session_dir: Path, total_input_bytes: int) -> No f"Total file size required for download ({input_size_readable}) is larger than available disk space ({disk_free_readable})" ) + # This is on deprecation path, please use attachment_sync_inputs instead. def sync_inputs( self, s3_settings: Optional[JobAttachmentS3Settings], diff --git a/test/unit/deadline_job_attachments/test_asset_sync.py b/test/unit/deadline_job_attachments/test_asset_sync.py index 3e15b7c0..b2a4a21c 100644 --- a/test/unit/deadline_job_attachments/test_asset_sync.py +++ b/test/unit/deadline_job_attachments/test_asset_sync.py @@ -1038,3 +1038,602 @@ def test_cleanup_session_virtual_witout_os_user_raises(self, tmp_path): session_dir=tmp_path, file_system=JobAttachmentsFileSystem.VIRTUAL, ) + + def test_attachment_sync_inputs_no_inputs_successful( + self, + tmp_path: Path, + default_queue: Queue, + default_job: Job, + attachments_no_inputs: Attachments, + ): + """Asserts that sync_inputs is successful when no required assets exist for the Job""" + # GIVEN + default_job.attachments = attachments_no_inputs + session_dir = str(tmp_path) + dest_dir = "assetroot-27bggh78dd2b568ab123" + local_root = str(Path(session_dir) / dest_dir) + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=[DownloadSummaryStatistics()], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir], + ): + mock_on_downloading_files = MagicMock(return_value=True) + + (summary_statistics, result_pathmap_rules) = ( + self.default_asset_sync.attachment_sync_inputs( + default_queue.jobAttachmentSettings, + attachments_no_inputs, + default_queue.queueId, + default_job.jobId, + tmp_path, + on_downloading_files=mock_on_downloading_files, + ) + ) + + # THEN + expected_source_path_format = ( + "windows" + if default_job.attachments.manifests[0].rootPathFormat == PathFormat.WINDOWS + else "posix" + ) + assert result_pathmap_rules == [ + { + "source_path_format": expected_source_path_format, + "source_path": default_job.attachments.manifests[0].rootPath, + "destination_path": local_root, + } + ] + expected_summary_statistics = SummaryStatistics( + total_time=summary_statistics.total_time, + total_files=0, + total_bytes=0, + processed_files=0, + processed_bytes=0, + skipped_files=0, + skipped_bytes=0, + transfer_rate=0.0, + ) + assert summary_statistics == expected_summary_statistics + + @pytest.mark.parametrize( + ("job_fixture_name"), + [ + ("default_job"), + ("vfs_job"), + ], + ) + @pytest.mark.parametrize( + ("s3_settings_fixture_name"), + [ + ("default_job_attachment_s3_settings"), + ], + ) + def test_attachment_sync_inputs_successful( + self, + tmp_path: Path, + default_queue: Queue, + job_fixture_name: str, + s3_settings_fixture_name: str, + test_manifest_one: dict, + request: pytest.FixtureRequest, + ): + """Asserts that a valid manifest can be processed to download attachments from S3""" + # GIVEN + job: Job = request.getfixturevalue(job_fixture_name) + s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) + 
default_queue.jobAttachmentSettings = s3_settings + session_dir = str(tmp_path) + dest_dir = "assetroot-27bggh78dd2b568ab123" + local_root = str(Path(session_dir) / dest_dir) + test_manifest = decode_manifest(json.dumps(test_manifest_one)) + test_fs_permission_settings: PosixFileSystemPermissionSettings = ( + PosixFileSystemPermissionSettings( + os_user="test-user", + os_group="test-group", + dir_mode=0o20, + file_mode=0o20, + ) + ) + os_env_vars: Dict[str, str] = {"AWS_PROFILE": "test-profile"} + assert job.attachments + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + return_value=test_manifest, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=[DownloadSummaryStatistics()], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.mount_vfs_from_manifests" + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.VFSProcessManager.find_vfs" + ), patch.object( + Path, "stat", MagicMock(st_mtime_ns=1234512345123451) + ): + mock_on_downloading_files = MagicMock(return_value=True) + + (_, result_pathmap_rules) = self.default_asset_sync.attachment_sync_inputs( + s3_settings, + job.attachments, + default_queue.queueId, + job.jobId, + tmp_path, + on_downloading_files=mock_on_downloading_files, + fs_permission_settings=test_fs_permission_settings, + os_env_vars=os_env_vars, + ) + # THEN + expected_source_path_format = ( + "windows" + if job.attachments.manifests[0].rootPathFormat == PathFormat.WINDOWS + else "posix" + ) + assert result_pathmap_rules == [ + { + "source_path_format": expected_source_path_format, + "source_path": job.attachments.manifests[0].rootPath, + "destination_path": local_root, + } + ] + + @pytest.mark.parametrize( + ("job_fixture_name"), + [ + ("default_job"), + ], + ) + @pytest.mark.parametrize( + ("s3_settings_fixture_name"), + [ + ("default_job_attachment_s3_settings"), + ], + ) + def test_attachment_sync_inputs_404_error( + self, + tmp_path: Path, + default_queue: Queue, + job_fixture_name: str, + s3_settings_fixture_name: str, + test_manifest_one: dict, + request: pytest.FixtureRequest, + ): + """Asserts that a specific error message is raised when getting 404 errors synching inputs""" + # GIVEN + download_exception = JobAttachmentsS3ClientError( + action="get-object", + status_code=404, + bucket_name="test bucket", + key_or_prefix="test-key.xxh128", + message="File not found", + ) + job: Job = request.getfixturevalue(job_fixture_name) + test_manifest = decode_manifest(json.dumps(test_manifest_one)) + s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) + default_queue.jobAttachmentSettings = s3_settings + dest_dir = "assetroot-27bggh78dd2b568ab123" + assert job.attachments + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + return_value=test_manifest, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=download_exception, + ): + with pytest.raises(JobAttachmentsS3ClientError) as excinfo: + self.default_asset_sync.attachment_sync_inputs( + s3_settings, + job.attachments, + default_queue.queueId, + job.jobId, + tmp_path, + ) + # THEN + assert "usually located in 
the home directory (~/.deadline/cache/s3_check_cache.db)" in str( + excinfo + ) + + @pytest.mark.parametrize( + ("s3_settings_fixture_name"), + [ + ("default_job_attachment_s3_settings"), + ], + ) + def test_sync_attachment_inputs_with_step_dependencies( + self, + tmp_path: Path, + default_queue: Queue, + default_job: Job, + s3_settings_fixture_name: str, + test_manifest_one: dict, + request: pytest.FixtureRequest, + ): + """Asserts that input syncing is done correctly when step dependencies are provided.""" + # GIVEN + s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) + default_queue.jobAttachmentSettings = s3_settings + session_dir = str(tmp_path) + dest_dir = "assetroot-27bggh78dd2b568ab123" + local_root = str(Path(session_dir) / dest_dir) + test_manifest = decode_manifest(json.dumps(test_manifest_one)) + assert default_job.attachments + step_output_root = "/home/outputs_roots" + step_dest_dir = "assetroot-8a7d189e9c17186fb88b" + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + return_value=test_manifest, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=[DownloadSummaryStatistics()], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir, step_dest_dir], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_output_manifests_by_asset_root", + side_effect=[{step_output_root: {}}], + ), patch.object( + Path, "stat", MagicMock(st_mtime_ns=1234512345123451) + ): + mock_on_downloading_files = MagicMock(return_value=True) + + (_, result_pathmap_rules) = self.default_asset_sync.attachment_sync_inputs( + s3_settings, + default_job.attachments, + default_queue.queueId, + default_job.jobId, + tmp_path, + step_dependencies=["step-1"], + on_downloading_files=mock_on_downloading_files, + ) + # THEN + expected_source_path_format = ( + "windows" + if default_job.attachments.manifests[0].rootPathFormat == PathFormat.WINDOWS + else "posix" + ) + assert result_pathmap_rules == [ + { + "source_path_format": expected_source_path_format, + "source_path": default_job.attachments.manifests[0].rootPath, + "destination_path": local_root, + }, + ] + + @pytest.mark.parametrize( + ("s3_settings_fixture_name"), + [ + ("default_job_attachment_s3_settings"), + ], + ) + def test_attachment_sync_inputs_with_step_dependencies_same_root_vfs_on_posix( + self, + tmp_path: Path, + default_queue: Queue, + vfs_job: Job, + s3_settings_fixture_name: str, + test_manifest_one: dict, + test_manifest_two: dict, + request: pytest.FixtureRequest, + ): + """Asserts that input syncing is done correctly when step dependencies are provided.""" + # GIVEN + job = vfs_job + s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) + default_queue.jobAttachmentSettings = s3_settings + session_dir = str(tmp_path) + dest_dir = "assetroot-27bggh78dd2b568ab123" + local_root = str(Path(session_dir) / dest_dir) + test_fs_permission_settings: PosixFileSystemPermissionSettings = ( + PosixFileSystemPermissionSettings( + os_user="test-user", + os_group="test-group", + dir_mode=0o20, + file_mode=0o20, + ) + ) + os_env_vars: Dict[str, str] = {"AWS_PROFILE": "test-profile"} + assert job.attachments + test_manifest = decode_manifest(json.dumps(test_manifest_two)) + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + 
return_value=json.dumps(test_manifest_one), + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=[DownloadSummaryStatistics()], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + return_value=dest_dir, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_output_manifests_by_asset_root", + return_value={"tmp/": [(test_manifest, "hello")]}, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.merge_asset_manifests", + ) as merge_manifests_mock, patch( + f"{deadline.__package__}.job_attachments.asset_sync.AssetSync._ensure_disk_capacity", + ) as disk_capacity_mock, patch( + f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file", + return_value="tmp_manifest", + ), patch( + "sys.platform", "linux" + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.mount_vfs_from_manifests" + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.VFSProcessManager.find_vfs" + ): + mock_on_downloading_files = MagicMock(return_value=True) + + (_, result_pathmap_rules) = self.default_asset_sync.attachment_sync_inputs( + s3_settings, + job.attachments, + default_queue.queueId, + job.jobId, + tmp_path, + step_dependencies=["step-1"], + on_downloading_files=mock_on_downloading_files, + fs_permission_settings=test_fs_permission_settings, + os_env_vars=os_env_vars, + ) + # THEN + merge_manifests_mock.assert_called() + disk_capacity_mock.assert_not_called() + expected_source_path_format = ( + "windows" + if job.attachments.manifests[0].rootPathFormat == PathFormat.WINDOWS + else "posix" + ) + assert result_pathmap_rules == [ + { + "source_path_format": expected_source_path_format, + "source_path": job.attachments.manifests[0].rootPath, + "destination_path": local_root, + }, + ] + + @pytest.mark.parametrize( + ("job_fixture_name"), + [ + ("default_job"), + ], + ) + @pytest.mark.parametrize( + ("s3_settings_fixture_name"), + [ + ("default_job_attachment_s3_settings"), + ], + ) + def test_attachment_sync_inputs_no_space_left( + self, + tmp_path: Path, + default_queue: Queue, + job_fixture_name: str, + s3_settings_fixture_name: str, + really_big_manifest: dict, + request: pytest.FixtureRequest, + ): + """Asserts that an AssetSyncError is thrown if there is not enough space left on the device to download all inputs.""" + # GIVEN + job: Job = request.getfixturevalue(job_fixture_name) + s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) + default_queue.jobAttachmentSettings = s3_settings + dest_dir = "assetroot-27bggh78dd2b568ab123" + test_manifest = decode_manifest(json.dumps(really_big_manifest)) + test_fs_permission_settings: PosixFileSystemPermissionSettings = ( + PosixFileSystemPermissionSettings( + os_user="test-user", + os_group="test-group", + dir_mode=0o20, + file_mode=0o20, + ) + ) + os_env_vars: Dict[str, str] = {"AWS_PROFILE": "test-profile"} + assert job.attachments + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + return_value=test_manifest, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=[DownloadSummaryStatistics()], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir], + ), patch.object( + Path, "stat", MagicMock(st_mtime_ns=1234512345123451) + ): + mock_on_downloading_files = MagicMock(return_value=True) + + with 
pytest.raises(AssetSyncError) as ase: + self.default_asset_sync.attachment_sync_inputs( + s3_settings, + job.attachments, + default_queue.queueId, + job.jobId, + tmp_path, + on_downloading_files=mock_on_downloading_files, + fs_permission_settings=test_fs_permission_settings, + os_env_vars=os_env_vars, + ) + + # THEN + assert ( + "Total file size required for download (300.0 PB) is larger than available disk space" + in str(ase) + ) + + def test_attachment_sync_inputs_with_storage_profiles_path_mapping_rules( + self, + default_queue: Queue, + default_job: Job, + test_manifest_one: dict, + tmp_path: Path, + ): + """Tests when a non-empty `storage_profiles_path_mapping_rules` is passed to `sync_inputs`. + Check that, for input manifests with an `fileSystemLocationName`, if the root path + corresponding to it exists in the `storage_profiles_path_mapping_rules`, the download + is attempted to the correct destination path.""" + # GIVEN + default_job.attachments = Attachments( + manifests=[ + ManifestProperties( + rootPath="/tmp", + rootPathFormat=PathFormat.POSIX, + inputManifestPath="manifest_input", + inputManifestHash="manifesthash", + outputRelativeDirectories=["test/outputs"], + ), + ManifestProperties( + fileSystemLocationName="Movie 1", + rootPath="/home/user/movie1", + rootPathFormat=PathFormat.POSIX, + inputManifestPath="manifest-movie1_input", + inputManifestHash="manifestmovie1hash", + outputRelativeDirectories=["test/outputs"], + ), + ], + ) + test_manifest = decode_manifest(json.dumps(test_manifest_one)) + dest_dir = "assetroot-27bggh78dd2b568ab123" + local_root = str(tmp_path.joinpath(dest_dir)) + + storage_profiles_path_mapping_rules = { + "/home/user/movie1": "/tmp/movie1", + } + + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + return_value=test_manifest, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + return_value=DownloadSummaryStatistics(), + ) as mock_download_files_from_manifests, patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir], + ), patch.object( + Path, "stat", MagicMock(st_mtime_ns=1234512345123451) + ): + mock_on_downloading_files = MagicMock(return_value=True) + + (summary_statistics, result_pathmap_rules) = ( + self.default_asset_sync.attachment_sync_inputs( + s3_settings=default_queue.jobAttachmentSettings, + attachments=default_job.attachments, + queue_id=default_queue.queueId, + job_id=default_job.jobId, + session_dir=tmp_path, + storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules, + on_downloading_files=mock_on_downloading_files, + ) + ) + + # THEN + assert result_pathmap_rules == [ + { + "source_path_format": "posix", + "source_path": default_job.attachments.manifests[0].rootPath, + "destination_path": local_root, + } + ] + + mock_download_files_from_manifests.assert_called_once_with( + s3_bucket="test-bucket", + manifests_by_root={ + f"{local_root}": test_manifest, + "/tmp/movie1": test_manifest, + }, + cas_prefix="assetRoot/Data", + fs_permission_settings=None, + session=ANY, + on_downloading_files=mock_on_downloading_files, + logger=getLogger("deadline.job_attachments"), + ) + + @pytest.mark.parametrize( + ("job_fixture_name"), + [ + ("default_job"), + ("vfs_job"), + ], + ) + @pytest.mark.parametrize( + ("s3_settings_fixture_name"), + [ + ("default_job_attachment_s3_settings"), + ], + ) + def test_attachment_sync_inputs_successful_using_vfs_fallback( + self, + tmp_path: 
Path, + default_queue: Queue, + job_fixture_name: str, + s3_settings_fixture_name: str, + test_manifest_one: dict, + request: pytest.FixtureRequest, + ): + """Asserts that a valid manifest can be processed to download attachments from S3""" + # GIVEN + job: Job = request.getfixturevalue(job_fixture_name) + s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) + default_queue.jobAttachmentSettings = s3_settings + session_dir = str(tmp_path) + dest_dir = "assetroot-27bggh78dd2b568ab123" + local_root = str(Path(session_dir) / dest_dir) + test_manifest = decode_manifest(json.dumps(test_manifest_one)) + assert job.attachments + + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + return_value=test_manifest, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=[DownloadSummaryStatistics()], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.VFSProcessManager.find_vfs", + side_effect=VFSExecutableMissingError, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.mount_vfs_from_manifests" + ) as mock_mount_vfs, patch( + "sys.platform", "linux" + ), patch.object( + Path, "stat", MagicMock(st_mtime_ns=1234512345123451) + ): + mock_on_downloading_files = MagicMock(return_value=True) + + (_, result_pathmap_rules) = self.default_asset_sync.attachment_sync_inputs( + s3_settings, + job.attachments, + default_queue.queueId, + job.jobId, + tmp_path, + on_downloading_files=mock_on_downloading_files, + ) + + # THEN + expected_source_path_format = ( + "windows" + if job.attachments.manifests[0].rootPathFormat == PathFormat.WINDOWS + else "posix" + ) + assert result_pathmap_rules == [ + { + "source_path_format": expected_source_path_format, + "source_path": job.attachments.manifests[0].rootPath, + "destination_path": local_root, + } + ] + mock_mount_vfs.assert_not_called()
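
For reviewers, a minimal usage sketch of the new entry point. The attachment_sync_inputs signature, its return shape, and the Attachments/ManifestProperties constructors mirror this patch; the import paths, the AssetSync construction argument, the JobAttachmentS3Settings values, and all IDs and paths below are illustrative assumptions rather than part of the change:

from pathlib import Path

from deadline.job_attachments.asset_sync import AssetSync
from deadline.job_attachments.models import (
    Attachments,
    JobAttachmentS3Settings,
    ManifestProperties,
    PathFormat,
)

# Hypothetical worker-side setup; in practice the worker agent constructs AssetSync.
asset_sync = AssetSync(farm_id="farm-1234")

# Placeholder settings and attachments; field names follow the models used in the tests above.
s3_settings = JobAttachmentS3Settings(s3BucketName="my-job-attachments-bucket", rootPrefix="DeadlineCloud")
attachments = Attachments(
    manifests=[
        ManifestProperties(
            rootPath="/home/user/project",
            rootPathFormat=PathFormat.POSIX,
            inputManifestPath="manifest_input",
            inputManifestHash="manifesthash",
            outputRelativeDirectories=["outputs"],
        )
    ]
)

# COPIED (default) flow: downloads inputs into the session directory and returns
# (1) download summary statistics and (2) path-mapping rules, i.e. PathMappingRule
# dataclasses serialized to dicts via asdict().
summary_statistics, path_mapping_rules = asset_sync.attachment_sync_inputs(
    s3_settings=s3_settings,
    attachments=attachments,
    queue_id="queue-1234",
    job_id="job-1234",
    session_dir=Path("/sessions/session-1234"),
)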