diff --git a/providers/src/airflow/providers/amazon/aws/links/comprehend.py b/providers/src/airflow/providers/amazon/aws/links/comprehend.py new file mode 100644 index 0000000000000..fc2fc0b1d1cde --- /dev/null +++ b/providers/src/airflow/providers/amazon/aws/links/comprehend.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.providers.amazon.aws.links.base_aws import BASE_AWS_CONSOLE_LINK, BaseAwsLink + + +class ComprehendPiiEntitiesDetectionLink(BaseAwsLink): + """Helper class for constructing Amazon Comprehend PII Detection console link.""" + + name = "PII Detection Job" + key = "comprehend_pii_detection" + format_str = ( + BASE_AWS_CONSOLE_LINK + + "/comprehend/home?region={region_name}#" + + "/analysis-job-details/pii/{job_id}" + ) + + +class ComprehendDocumentClassifierLink(BaseAwsLink): + """Helper class for constructing Amazon Comprehend Document Classifier console link.""" + + name = "Document Classifier" + key = "comprehend_document_classifier" + format_str = ( + BASE_AWS_CONSOLE_LINK + "/comprehend/home?region={region_name}#" + "classifier-version-details/{arn}" + ) diff --git a/providers/src/airflow/providers/amazon/aws/operators/comprehend.py b/providers/src/airflow/providers/amazon/aws/operators/comprehend.py index 01496bfc912ee..acee6878e46fa 100644 --- a/providers/src/airflow/providers/amazon/aws/operators/comprehend.py +++ b/providers/src/airflow/providers/amazon/aws/operators/comprehend.py @@ -23,6 +23,10 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.links.comprehend import ( + ComprehendDocumentClassifierLink, + ComprehendPiiEntitiesDetectionLink, +) from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator from airflow.providers.amazon.aws.triggers.comprehend import ( ComprehendCreateDocumentClassifierCompletedTrigger, @@ -120,6 +124,8 @@ class ComprehendStartPiiEntitiesDetectionJobOperator(ComprehendBaseOperator): https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html """ + operator_extra_links = (ComprehendPiiEntitiesDetectionLink(),) + def __init__( self, input_data_config: dict, @@ -166,6 +172,19 @@ def execute(self, context: Context) -> str: **self.start_pii_entities_kwargs, )["JobId"] + job_url = ComprehendPiiEntitiesDetectionLink.format_str.format( + aws_domain=ComprehendPiiEntitiesDetectionLink.get_aws_domain(self.hook.conn_partition), + region_name=self.hook.conn_region_name, + job_id=job_id, + ) + ComprehendPiiEntitiesDetectionLink.persist( + context=context, + operator=self, + region_name=self.hook.conn_region_name, + aws_partition=self.hook.conn_partition, + job_id=job_id, + ) + self.log.info("You can view the PII entities detection job at %s", job_url) message_description = f"start pii entities detection job {job_id} to complete." if self.deferrable: self.log.info("Deferring %s", message_description) @@ -238,6 +257,7 @@ class ComprehendCreateDocumentClassifierOperator(AwsBaseOperator[ComprehendHook] """ aws_hook_class = ComprehendHook + operator_extra_links = (ComprehendDocumentClassifierLink(),) template_fields: Sequence[str] = aws_template_fields( "document_classifier_name", @@ -300,6 +320,22 @@ def execute(self, context: Context) -> str: **self.document_classifier_kwargs, )["DocumentClassifierArn"] + # create the link to console + job_url = ComprehendDocumentClassifierLink.format_str.format( + aws_domain=ComprehendDocumentClassifierLink.get_aws_domain(self.hook.conn_partition), + region_name=self.hook.conn_region_name, + arn=document_classifier_arn, + ) + + ComprehendDocumentClassifierLink.persist( + context=context, + operator=self, + region_name=self.hook.conn_region_name, + aws_partition=self.hook.conn_partition, + arn=document_classifier_arn, + ) + self.log.info("You can monitor the classifier at %s", job_url) + message_description = f"document classifier {document_classifier_arn} to complete." if self.deferrable: self.log.info("Deferring %s", message_description) diff --git a/providers/src/airflow/providers/amazon/provider.yaml b/providers/src/airflow/providers/amazon/provider.yaml index d89532b848c1b..824c9b08dee66 100644 --- a/providers/src/airflow/providers/amazon/provider.yaml +++ b/providers/src/airflow/providers/amazon/provider.yaml @@ -887,6 +887,9 @@ extra-links: - airflow.providers.amazon.aws.links.sagemaker.SageMakerTransformJobLink - airflow.providers.amazon.aws.links.step_function.StateMachineDetailsLink - airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink + - airflow.providers.amazon.aws.links.comprehend.ComprehendPiiEntitiesDetectionLink + - airflow.providers.amazon.aws.links.comprehend.ComprehendDocumentClassifierLink + connection-types: - hook-class-name: airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook diff --git a/providers/tests/amazon/aws/links/test_comprehend.py b/providers/tests/amazon/aws/links/test_comprehend.py new file mode 100644 index 0000000000000..4e6a4e958474d --- /dev/null +++ b/providers/tests/amazon/aws/links/test_comprehend.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.providers.amazon.aws.links.comprehend import ( + ComprehendDocumentClassifierLink, + ComprehendPiiEntitiesDetectionLink, +) + +from providers.tests.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase + + +class TestComprehendPiiEntitiesDetectionLink(BaseAwsLinksTestCase): + link_class = ComprehendPiiEntitiesDetectionLink + + def test_extra_link(self): + test_job_id = "123-345-678" + self.assert_extra_link_url( + expected_url=( + f"https://console.aws.amazon.com/comprehend/home?region=eu-west-1#/analysis-job-details/pii/{test_job_id}" + ), + region_name="eu-west-1", + aws_partition="aws", + job_id=test_job_id, + ) + + +class TestComprehendDocumentClassifierLink(BaseAwsLinksTestCase): + link_class = ComprehendDocumentClassifierLink + + def test_extra_link(self): + test_job_id = ( + "arn:aws:comprehend:us-east-1:0123456789:document-classifier/test-custom-document-classifier" + ) + self.assert_extra_link_url( + expected_url=( + f"https://console.aws.amazon.com/comprehend/home?region=us-east-1#classifier-version-details/{test_job_id}" + ), + region_name="us-east-1", + aws_partition="aws", + arn=test_job_id, + )