diff --git a/src/deadline/client/cli/_groups/manifest_group.py b/src/deadline/client/cli/_groups/manifest_group.py index cf15d998..a8b5b86a 100644 --- a/src/deadline/client/cli/_groups/manifest_group.py +++ b/src/deadline/client/cli/_groups/manifest_group.py @@ -36,6 +36,7 @@ S3_MANIFEST_FOLDER_NAME, JobAttachmentS3Settings, ManifestDiff, + AssetType, ) from ...exceptions import NonValidInputError @@ -247,6 +248,18 @@ def manifest_diff( @click.option("--step-id", help="The AWS Deadline Cloud Step to get. ") @click.option("--farm-id", help="The AWS Deadline Cloud Farm to use. ") @click.option("--queue-id", help="The AWS Deadline Cloud Queue to use. ") +@click.option( + "--asset-type", + default=AssetType.ALL.value, + help="Which asset type to download:\n" + "INPUT means download only input asset files for given job/step.\n" + "OUTPUT means download only output asset files for given job/step.\n" + "ALL (default) means download all input & output asset files for given job/step.\n", + type=click.Choice( + [e.value for e in AssetType], + case_sensitive=False, + ), +) @click.option( "--json", default=None, is_flag=True, help="Output is printed as JSON for scripting. " ) @@ -255,11 +268,12 @@ def manifest_download( download_dir: str, job_id: str, step_id: str, + asset_type: str, json: bool, **args, ): """ - Downloads input manifest of previously submitted job. + Downloads input/output manifests of a submitted job as per provided asset_type """ logger: ClickLogger = ClickLogger(is_json=json) if not os.path.isdir(download_dir): @@ -280,6 +294,7 @@ def manifest_download( queue_id=queue_id, job_id=job_id, step_id=step_id, + asset_type=AssetType(asset_type), boto3_session=boto3_session, logger=logger, ) diff --git a/src/deadline/job_attachments/api/manifest.py b/src/deadline/job_attachments/api/manifest.py index 65b8394f..9df2d498 100644 --- a/src/deadline/job_attachments/api/manifest.py +++ b/src/deadline/job_attachments/api/manifest.py @@ -40,6 +40,7 @@ ManifestSnapshot, ManifestMerge, default_glob_all, + AssetType, ) from deadline.job_attachments._utils import _get_long_path_compatible_path from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader @@ -340,6 +341,7 @@ def _manifest_download( job_id: str, boto3_session: boto3.Session, step_id: Optional[str] = None, + asset_type: AssetType = AssetType.ALL, logger: ClickLogger = ClickLogger(False), ) -> ManifestDownloadResponse: """ @@ -351,6 +353,7 @@ def _manifest_download( job_id: Job Id to download. boto_session: Boto3 session. step_id: Optional[str]: Optional, download manifest for a step + asset_type: Which asset manifests should be downloaded for given job (& optionally step), options are Input, Output, All. Default behaviour is All. logger: Click Logger instance to print to CLI as text or JSON. return ManifestDownloadResponse Downloaded Manifest data. Contains source S3 key and local download path. """ @@ -384,6 +387,14 @@ def _manifest_download( # Utility function to build up manifests by root. manifests_by_root: Dict[str, List[BaseAssetManifest]] = dict() + # Set the values of download input & output as per selected asset types in the api request + download_input: bool = ( + True if asset_type is None or asset_type in (AssetType.INPUT, AssetType.ALL) else False + ) + download_output: bool = ( + True if asset_type is None or asset_type in (AssetType.OUTPUT, AssetType.ALL) else False + ) + def add_manifest_by_root( manifests_by_root: Dict[str, list], root: str, manifest: BaseAssetManifest ): @@ -391,62 +402,105 @@ def add_manifest_by_root( manifests_by_root[root] = [] manifests_by_root[root].append(manifest) - # Get input_manifest_paths from Deadline GetJob API + # Get the job from deadline api job: dict = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id) - attachments: dict = job["attachments"] if "attachments" in job else {} - input_manifest_paths: List[Tuple[str, str]] = [ - (manifest.get("inputManifestPath", ""), manifest["rootPath"]) - for manifest in attachments["manifests"] - ] - - # Download each input_manifest_path - for input_manifest_path, root_path in input_manifest_paths: - asset_manifest: BaseAssetManifest = get_manifest_from_s3( - manifest_key=(s3_prefix / input_manifest_path).as_posix(), - s3_bucket=queue_s3_settings.s3BucketName, - session=queue_role_session, - ) - if asset_manifest is not None: - logger.echo(f"Downloaded input manifest for root: {root_path}") - add_manifest_by_root( - manifests_by_root=manifests_by_root, root=root_path, manifest=asset_manifest - ) - # Now handle step-step dependencies - if step_id is not None: - # Get Step-Step dependencies. - nextToken = "" - step_dep_response = deadline.list_step_dependencies( - farmId=farm_id, - queueId=queue_id, - jobId=job_id, - stepId=step_id, - nextToken=nextToken, - ) - for dependent_step in step_dep_response["dependencies"]: - logger.echo(f"Found Step-Step dependency. {dependent_step['stepId']}") - - # Get manifests for the step-step dependency - step_manifests_by_root: Dict[str, List[BaseAssetManifest]] = ( - get_output_manifests_by_asset_root( - s3_settings=queue_s3_settings, - farm_id=farm_id, - queue_id=queue_id, - job_id=job_id, - step_id=dependent_step["stepId"], - session=queue_role_session, + # If input manifests need to be downloaded + if download_input: + logger.echo(f"Downloading input manifests for job: {job_id}") + + # Get input_manifest_paths from Deadline GetJob API + attachments: dict = job["attachments"] if "attachments" in job else {} + input_manifest_paths: List[Tuple[str, str]] = [ + (manifest.get("inputManifestPath", ""), manifest["rootPath"]) + for manifest in attachments["manifests"] + ] + + # Download each input_manifest_path + for input_manifest_path, root_path in input_manifest_paths: + asset_manifest: BaseAssetManifest = get_manifest_from_s3( + manifest_key=(s3_prefix / input_manifest_path).as_posix(), + s3_bucket=queue_s3_settings.s3BucketName, + session=queue_role_session, + ) + if asset_manifest is not None: + logger.echo(f"Found input manifest for root: {root_path}") + add_manifest_by_root( + manifests_by_root=manifests_by_root, root=root_path, manifest=asset_manifest ) + + # Now handle step-step dependencies + if step_id is not None: + logger.echo(f"Finding step-step dependency manifests for step: {step_id}") + + # Get Step-Step dependencies. + nextToken = "" + step_dep_response = deadline.list_step_dependencies( + farmId=farm_id, + queueId=queue_id, + jobId=job_id, + stepId=step_id, + nextToken=nextToken, ) - # Merge all manifests by root. - for root in step_manifests_by_root.keys(): - for manifest in step_manifests_by_root[root]: - logger.echo(f"Found step-step output manifest for root: {root}") - add_manifest_by_root( - manifests_by_root=manifests_by_root, root=root, manifest=manifest + for dependent_step in step_dep_response["dependencies"]: + logger.echo(f"Found Step-Step dependency. {dependent_step['stepId']}") + + # Get manifests for the step-step dependency + step_manifests_by_root: Dict[str, List[BaseAssetManifest]] = ( + get_output_manifests_by_asset_root( + s3_settings=queue_s3_settings, + farm_id=farm_id, + queue_id=queue_id, + job_id=job_id, + step_id=dependent_step["stepId"], + session=queue_role_session, ) + ) + # Merge all manifests by root. + for root in step_manifests_by_root.keys(): + for manifest in step_manifests_by_root[root]: + logger.echo(f"Found step-step output manifest for root: {root}") + add_manifest_by_root( + manifests_by_root=manifests_by_root, root=root, manifest=manifest + ) + + # If output manifests need to be downloaded + if download_output: + output_manifests_by_root: Dict[str, List[BaseAssetManifest]] + if step_id is not None: + logger.echo(f"Downloading output manifests step: {step_id} of job: {job_id}") + # Only get the output manifests for selected step + output_manifests_by_root = get_output_manifests_by_asset_root( + s3_settings=queue_s3_settings, + farm_id=farm_id, + queue_id=queue_id, + job_id=job_id, + step_id=step_id, + session=queue_role_session, + ) + + else: + logger.echo(f"Downloading output manifests for job: {job_id}") + # Get output manifests for all steps of the job + output_manifests_by_root = get_output_manifests_by_asset_root( + s3_settings=queue_s3_settings, + farm_id=farm_id, + queue_id=queue_id, + job_id=job_id, + session=queue_role_session, + ) + + # Merge all output manifests by root. + for root in output_manifests_by_root.keys(): + for manifest in output_manifests_by_root[root]: + logger.echo(f"Found output manifest for root: {root}") + add_manifest_by_root( + manifests_by_root=manifests_by_root, root=root, manifest=manifest + ) # Finally, merge all manifest paths to create unified manifests. # TODO: Filter outputs by path + merged_manifests: Dict[str, BaseAssetManifest] = {} for root in manifests_by_root.keys(): merged_manifest = merge_asset_manifests(manifests_by_root[root]) diff --git a/src/deadline/job_attachments/models.py b/src/deadline/job_attachments/models.py index fb6c5627..dc8ae14b 100644 --- a/src/deadline/job_attachments/models.py +++ b/src/deadline/job_attachments/models.py @@ -134,6 +134,12 @@ def _missing_(cls, value): return None +class AssetType(str, Enum): + INPUT = "input" + OUTPUT = "output" + ALL = "all" + + class PathFormat(str, Enum): WINDOWS = "windows" POSIX = "posix" diff --git a/test/integ/cli/test_cli_manifest_download.py b/test/integ/cli/test_cli_manifest_download.py index 64601ac8..f561a5a5 100644 --- a/test/integ/cli/test_cli_manifest_download.py +++ b/test/integ/cli/test_cli_manifest_download.py @@ -22,6 +22,8 @@ ) from deadline.job_attachments.asset_sync import AssetSync from .test_utils import JobAttachmentTest, UploadInputFilesOneAssetInCasOutputs +from typing import Optional +from deadline.job_attachments.models import AssetType @pytest.mark.integ @@ -31,6 +33,61 @@ def temp_dir(self): with tempfile.TemporaryDirectory() as tmpdir_path: yield tmpdir_path + def _assert_input_mainfests_exist(self, files): + assert "inputs/textures", "brick.png" in files + assert "inputs/textures", "cloth.png" in files + assert "inputs/scene.ma" in files + + def _assert_output_manifests_exist(self, files): + assert "output_file" in files + assert "output/nested_output_file" in files + + def _assert_dependent_step_output_exist(self, files): + assert "dependent_step_output_file" in files + assert "dependent_step_output/nested_output_file" in files + + def check_json_mode_contains_files(self, result): + # If JSON mode was specified, make sure the output is JSON and contains the downloaded manifest file. + download = json.loads(result.output) + assert download is not None + assert len(download["downloaded"]) == 1 + return download + + def validate_result_exit_code_0(self, job_attachment_test, result): + # Then + assert result.exit_code == 0, ( + f"{result.output}, {job_attachment_test.farm_id}, {job_attachment_test.queue_id}" + ) + + def run_cli_with_params( + self, asset_type, job_attachment_test, job_id, json_output, step_id, temp_dir + ): + runner = CliRunner() + # Download for farm, queue, job to temp dir. + args = [ + "manifest", + "download", + "--farm-id", + job_attachment_test.farm_id, + "--queue-id", + job_attachment_test.queue_id, + "--job-id", + job_id, + "--asset-type", + asset_type, + temp_dir, + ] + + if json_output: + args.append("--json") + + if step_id: + args.append("--step-id") + args.append(step_id) + + result = runner.invoke(main, args) + return result + def _setup_create_job( self, upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs, @@ -71,20 +128,27 @@ def _setup_create_job( # Return the created Job ID. return job_id + def validate_manifest_is_not_None(self, manifest_file): + manifest: BaseAssetManifest = decode_manifest(manifest_file.read()) + assert manifest is not None + return manifest + def _sync_mock_output_file( self, job_attachment_test: JobAttachmentTest, job_id: str, first_step_name: str, - second_step_name: str, + second_step_name: Optional[str], asset_root_path: str, + output_file_path: str, + nested_output_file_path: str, ) -> str: """ - Create a fake manifest file, uplaod it as a step output and return the step ID that is dependent. + Create a fake manifest file, upload it as a step output and return the step ID that is dependent. job_attachment_test: JobAttachmentTest test harness - job_id: str, self explainatory. - first_step_name: str, self explainatory. - second_step_name: str, self explainatory. + job_id: str, self-explanatory. + first_step_name: str, independent step. + second_step_name: Optional[str], dependent on first step. asset_root_path: Asset root to upload an output file. """ list_steps_response = job_attachment_test.deadline_client.list_steps( @@ -96,7 +160,7 @@ def _sync_mock_output_file( # Find the IDs of the steps: step_ids = {step["name"]: step["stepId"] for step in list_steps_response["steps"]} first_step_id = step_ids[first_step_name] - second_step_id = step_ids[second_step_name] + second_step_id = step_ids[second_step_name] if second_step_name is not None else None # Get the task of the first step so we can upload a fake manifest. first_step_first_task_id = job_attachment_test.deadline_client.list_tasks( @@ -115,9 +179,9 @@ def _sync_mock_output_file( hash_alg=HashAlgorithm("xxh128"), total_size=10, paths=[ - ManifestPath(path="output_file", hash="a", size=1, mtime=167907934333848), + ManifestPath(path=output_file_path, hash="a", size=1, mtime=167907934333848), ManifestPath( - path="output/nested_output_file", hash="b", size=1, mtime=1479079344833848 + path=nested_output_file_path, hash="b", size=1, mtime=1479079344833848 ), ], ) @@ -139,7 +203,7 @@ def _sync_mock_output_file( full_output_prefix=full_output_prefix, root_path=asset_root_path, ) - return second_step_id + return second_step_id if second_step_id is not None else first_step_id @pytest.mark.parametrize( "json_output", @@ -148,10 +212,14 @@ def _sync_mock_output_file( pytest.param(False), ], ) - def test_manifest_download_job( + @pytest.mark.parametrize( + "asset_type", [AssetType.INPUT.value, AssetType.OUTPUT.value, AssetType.ALL.value] + ) + def test_manifest_download_job_asset_type_with_no_step_dependency( self, temp_dir: str, json_output: bool, + asset_type: str, upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs, default_job_template: str, job_attachment_test: JobAttachmentTest, @@ -162,44 +230,48 @@ def test_manifest_download_job( upload_input_files_one_asset_in_cas, default_job_template, job_attachment_test ) - # When - runner = CliRunner() - # Download for farm, queue, job to temp dir. - args = [ - "manifest", - "download", - "--farm-id", - job_attachment_test.farm_id, - "--queue-id", - job_attachment_test.queue_id, - "--job-id", + # Upload the task output manifest for the step + asset_root_path: str = upload_input_files_one_asset_in_cas.attachments.manifests[0].rootPath + step_id: str = self._sync_mock_output_file( + job_attachment_test, job_id, - temp_dir, - ] - if json_output: - args.append("--json") - result = runner.invoke(main, args) + "custom-step-2", + None, + asset_root_path, + "output_file", + "output/nested_output_file", + ) - # Then - assert result.exit_code == 0, ( - f"{result.output}, {job_attachment_test.farm_id}, {job_attachment_test.queue_id}" + # When + result = self.run_cli_with_params( + asset_type=asset_type, + job_attachment_test=job_attachment_test, + job_id=job_id, + json_output=json_output, + temp_dir=temp_dir, + step_id=step_id, ) + + self.validate_result_exit_code_0(job_attachment_test, result) if json_output: - # If JSON mode was specified, make sure the output is JSON and contains the downloaded manifest file. - download = json.loads(result.output) - assert download is not None - assert len(download["downloaded"]) == 1 + download = self.check_json_mode_contains_files(result) # With JSON mode, we can also check the manifest file itself. with open(download["downloaded"][0]["local_manifest_path"]) as manifest_file: - manifest: BaseAssetManifest = decode_manifest(manifest_file.read()) - assert manifest is not None + manifest = self.validate_manifest_is_not_None(manifest_file) # Create a list of files we know should be in the input paths. files: List[str] = [path.path for path in manifest.paths] - assert "inputs/textures/brick.png" in files - assert "inputs/textures/cloth.png" in files - assert "inputs/scene.ma" in files + if asset_type == AssetType.INPUT: + self._assert_input_mainfests_exist(files) + + if asset_type == AssetType.OUTPUT: + assert "inputs/textures", "brick.png" not in files + self._assert_output_manifests_exist(files) + + if asset_type == AssetType.ALL: + self._assert_input_mainfests_exist(files) + self._assert_output_manifests_exist(files) @pytest.mark.parametrize( "json_output", @@ -208,9 +280,13 @@ def test_manifest_download_job( pytest.param(False), ], ) - def test_manifest_download_job_step_dependency( + @pytest.mark.parametrize( + "asset_type", [AssetType.INPUT.value, AssetType.OUTPUT.value, AssetType.ALL.value, None] + ) + def test_manifest_download_asset_type_with_job_step_dependency( self, temp_dir: str, + asset_type: str, json_output: bool, upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs, default_job_template_step_step_dependency: str, @@ -223,51 +299,197 @@ def test_manifest_download_job_step_dependency( job_attachment_test, ) + # Upload a dependent task output manifest. + asset_root_path: str = upload_input_files_one_asset_in_cas.attachments.manifests[0].rootPath + self._sync_mock_output_file( + job_attachment_test, + job_id, + "custom-step", + "custom-step-2", + asset_root_path, + "dependent_step_output_file", + "dependent_step_output/nested_output_file", + ) + + # Upload the task output manifest + self._sync_mock_output_file( + job_attachment_test, + job_id, + "custom-step-2", + None, + asset_root_path, + "output_file", + "output/nested_output_file", + ) + + result = self.run_cli_with_params( + asset_type=asset_type, + job_attachment_test=job_attachment_test, + job_id=job_id, + json_output=json_output, + temp_dir=temp_dir, + step_id=None, + ) + + self.validate_result_exit_code_0(job_attachment_test, result) + if json_output: + download = self.check_json_mode_contains_files(result) + + # With JSON mode, we can also check the manifest file itself. + with open(download["downloaded"][0]["local_manifest_path"]) as manifest_file: + manifest = self.validate_manifest_is_not_None(manifest_file) + + # Create a list of files we know should be in the input paths. + files: List[str] = [path.path for path in manifest.paths] + if asset_type == AssetType.INPUT: + self._assert_input_mainfests_exist(files) + # No step id in request hence no step dependencies outputs + assert "dependent_step_output_file" not in files + + if asset_type == AssetType.OUTPUT: + assert "inputs/textures", "brick.png" not in files + self._assert_output_manifests_exist(files) + self._assert_dependent_step_output_exist(files) + + if asset_type == AssetType.ALL or asset_type is None: + self._assert_input_mainfests_exist(files) + self._assert_dependent_step_output_exist(files) + self._assert_output_manifests_exist(files) + + @pytest.mark.parametrize( + "json_output", + [ + pytest.param(True), + pytest.param(False), + ], + ) + def test_manifest_download_output_only_for_one_step( + self, + temp_dir: str, + json_output: bool, + upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs, + default_job_template_step_step_dependency: str, + job_attachment_test: JobAttachmentTest, + ): + # Create a job, with step-step dependency. + job_id: str = self._setup_create_job( + upload_input_files_one_asset_in_cas, + default_job_template_step_step_dependency, + job_attachment_test, + ) + # Upload a dependent task output manifest. asset_root_path: str = upload_input_files_one_asset_in_cas.attachments.manifests[0].rootPath second_step_id: str = self._sync_mock_output_file( - job_attachment_test, job_id, "custom-step", "custom-step-2", asset_root_path + job_attachment_test, + job_id, + "custom-step", + "custom-step-2", + asset_root_path, + "dependent_step_output_file", + "dependent_step_output/nested_output_file", ) - # When - runner = CliRunner() - # Download for farm, queue, job to temp dir. - args = [ - "manifest", - "download", - "--farm-id", - job_attachment_test.farm_id, - "--queue-id", - job_attachment_test.queue_id, - "--job-id", + # Upload the task output manifest for the step + self._sync_mock_output_file( + job_attachment_test, job_id, - "--step-id", - second_step_id, - temp_dir, - ] + "custom-step-2", + None, + asset_root_path, + "output_file", + "output/nested_output_file", + ) + + # When + result = self.run_cli_with_params( + asset_type="output", + job_attachment_test=job_attachment_test, + job_id=job_id, + json_output=json_output, + temp_dir=temp_dir, + step_id=second_step_id, + ) + + self.validate_result_exit_code_0(job_attachment_test, result) if json_output: - args.append("--json") - result = runner.invoke(main, args) + download = self.check_json_mode_contains_files(result) - # Then - assert result.exit_code == 0, ( - f"{result.output}, {job_attachment_test.farm_id}, {job_attachment_test.queue_id}" + # With JSON mode, we can also check the manifest file itself. + with open(download["downloaded"][0]["local_manifest_path"]) as manifest_file: + manifest = self.validate_manifest_is_not_None(manifest_file) + + # Create a list of files we know should be in the input paths. + files: List[str] = [path.path for path in manifest.paths] + assert "inputs/textures", "brick.png" not in files + assert "dependent_step_output_file" not in files + self._assert_output_manifests_exist(files) + + @pytest.mark.parametrize( + "json_output", + [ + pytest.param(True), + pytest.param(False), + ], + ) + def test_manifest_download_job_input_with_given_step_and_step_dependency( + self, + temp_dir: str, + json_output: bool, + upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs, + default_job_template_step_step_dependency: str, + job_attachment_test: JobAttachmentTest, + ): + # Given: + # Create a job, with step step dependency. + job_id: str = self._setup_create_job( + upload_input_files_one_asset_in_cas, + default_job_template_step_step_dependency, + job_attachment_test, + ) + + # Upload the task output manifest for the step + asset_root_path: str = upload_input_files_one_asset_in_cas.attachments.manifests[0].rootPath + self._sync_mock_output_file( + job_attachment_test, + job_id, + "custom-step", + "custom-step-2", + asset_root_path, + "dependent_step_output_file", + "dependent_step_output/nested_output_file", ) + + # Upload the task output manifest for the step + second_step_id: str = self._sync_mock_output_file( + job_attachment_test, + job_id, + "custom-step-2", + None, + asset_root_path, + "output_file", + "output/nested_output_file", + ) + + # When + result = self.run_cli_with_params( + asset_type="input", + job_attachment_test=job_attachment_test, + job_id=job_id, + json_output=json_output, + temp_dir=temp_dir, + step_id=second_step_id, + ) + + self.validate_result_exit_code_0(job_attachment_test, result) if json_output: - # If JSON mode was specified, make sure the output is JSON and contains the downloaded manifest file. - download = json.loads(result.output) - assert download is not None - assert len(download["downloaded"]) == 1 + download = self.check_json_mode_contains_files(result) # With JSON mode, we can also check the manifest file itself. with open(download["downloaded"][0]["local_manifest_path"]) as manifest_file: - manifest: BaseAssetManifest = decode_manifest(manifest_file.read()) - assert manifest is not None + manifest = self.validate_manifest_is_not_None(manifest_file) # Create a list of files we know should be in the input paths. files: List[str] = [path.path for path in manifest.paths] - assert "inputs/textures", "brick.png" in files - assert "inputs/textures", "cloth.png" in files - assert "inputs/scene.ma" in files - assert "output_file" in files - assert "output/nested_output_file" in files + self._assert_input_mainfests_exist(files) + self._assert_dependent_step_output_exist(files)