Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request-reply operator to Microsoft Azure provider #44675

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

perry2of5
Copy link
Contributor

This PR adds a request-reply operator to implement the design pattern from Enterprise Integration Patterns, Hohpe, Woolf,
Addison-Wesley, 2003

In particular, this means one could:
a) Create a service bus queue and topic for a batch process
b) set up an auto-scaling Azure Container Job listening to an Azure Service Bus queue for messages
c) create a DAG using the request-reply operator to start the Azure Container Job and capture the reply when it finishes.

Potential improvements:

  • have the operator background itself while waiting for a reply. No need to tie up a worker thread while a remote process runs
  • Provide more parameters to control the subscription to the reply queue. Right now it deregisters itself if not used for 6 hours and drops messages after 1 hour. This seems reasonable to me since this subscription should only exist for the life of the operator, but more configuration might help some use case I haven't thought of.

A working Azure Container App Job can be built using the scripts/event-job-aca.zsh in the repo https://github.com/perry2of5/http-file-rtrvr

A working DAG is provided below:

from datetime import datetime
from airflow import DAG
from airflow.utils.context import Context
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.operators.asb import AzureServiceBusRequestReplyOperator
from azure.servicebus import ServiceBusMessage
import json

dag = DAG('test-http-req-reply-dag', description='Test sending message to HTTP download service',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)


def print_hello():
    return 'Hello world from first Airflow DAG!'


def body_generator(context: Context):
    # Define the request body here
    return '''
        {
            "method": "GET",
            "url": "http://example.com/index.html",
            "save_to": "example/dag/1",
            "timeout_seconds": 5
        }
        '''


def process_reply(message: ServiceBusMessage, context: Context):
    # Process the reply message here
    print(f"Received reply: {message}")
    body = json.loads(str(message))
    context['ti'].xcom_push(key='URL', value=body['saved_to_fqn'])
    context['ti'].xcom_push(key='STATUS_CODE', value=body['status'])


def print_url(**context):
    url = context['ti'].xcom_pull(task_ids='send_request', key='URL')
    print('url:', url)


def print_status_code(**context):
    status_code = context['ti'].xcom_pull(task_ids='send_request', key='STATUS_CODE')
    print("status_code", status_code)


hello_operator = PythonOperator(task_id='hello_task', dag=dag, python_callable=print_hello)

send_request = AzureServiceBusRequestReplyOperator(
        task_id='send_request',
        dag=dag,
        request_queue_name="file-rtrvr-request",
        request_body_generator=body_generator,
        reply_topic_name="file-rtrvr-complete",
        max_wait_time=360, # 6 minutes, poll for messages is 5 minutes in Azure Container App Job
        reply_message_callback=process_reply,
        azure_service_bus_conn_id="azure_service_bus_default",
)

status_operator = PythonOperator(
    task_id='print_status_task',
    dag=dag,
    python_callable=print_status_code,
    provide_context=True,
)

url_operator = PythonOperator(
    task_id='done',
    dag=dag,
    python_callable=print_url,
    provide_context=True,
)


hello_operator >> send_request >> url_operator >> status_operator

@perry2of5 perry2of5 force-pushed the add-req-reply-op-to-azure branch 7 times, most recently from 4ca3d16 to e890eb6 Compare December 9, 2024 17:49
admin_hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
self._create_reply_subscription_for_correlation_id(admin_hook, context)

message_hook = MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
Copy link
Contributor

@dabla dabla Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to refactor this one into a message_hook() cached property method:

@cached_property
def message_hook() -> MessageHook:
    return MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

# Remove the subscription on the reply topic
self._remove_reply_subscription(admin_hook)

def _send_request_message(self, message_hook: MessageHook, context: Context) -> None:
Copy link
Contributor

@dabla dabla Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All those protected methods which have the MessageHook or AdminClientHook as parameter should become a public method of the corresponding hook. Most of the logic should always be located in the hooks, not in the operator, see the operator as some kind of facilitator within DAG's which delegates the heavy lifting to the hooks, so that the hooks on their turn can also be easily (re)used within PythonOperators without the need to rewrite the same logic as in the operators.

So in this case _send_request_message method should be part of MessageHook.

def execute(self, context: Context) -> None:
"""Implement the request-reply pattern using existing hooks."""
self._validate_params()
admin_hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
Copy link
Contributor

@dabla dabla Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this also in a admin_hook() cached property method:

@cached_property
def admin_hook() -> AdminClientHook:
    return AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

self._remove_reply_subscription(admin_hook)

def _send_request_message(self, message_hook: MessageHook, context: Context) -> None:
with message_hook.get_conn() as service_bus_client:
Copy link
Contributor

@dabla dabla Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After above remarks, this could then become:

with self.message_hook.get_conn() as service_bus_client:

"""Implement the request-reply pattern using existing hooks."""
self._validate_params()
admin_hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
self._create_reply_subscription_for_correlation_id(admin_hook, context)
Copy link
Contributor

@dabla dabla Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After above remark, this could then become a oneliner:

self._create_reply_subscription_for_correlation_id(self.admin_hook, context)

"Created subscription %s on topic %s", self.subscription_name, self.reply_topic_name
)

def _create_subscription(self, admin_asb_conn: ServiceBusAdministrationClient, context: Context):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_create_subscription method should be part of AdminHook

"Sent request with id %s to queue %s", self.reply_correlation_id, self.request_queue_name
)

def _create_reply_subscription_for_correlation_id(
Copy link
Contributor

@dabla dabla Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_create_reply_subscription_for_correlation_id method should be part of AdminClientHook

auto_delete_on_idle="PT6H", # 6 hours
)

def _remove_reply_subscription(self, admin_hook: AdminClientHook) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_remove_reply_subscription method should be part of AdminClientHook

Copy link
Contributor

@dabla dabla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work (y)

@perry2of5
Copy link
Contributor Author

Thank you for the review.

@perry2of5
Copy link
Contributor Author

Just want to check, right now several of the existing operators get a handle to the connection and call the Microsoft Azure library directly. These should really be refactored down into the hook as well, right? For example this: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339

@dabla
Copy link
Contributor

dabla commented Dec 11, 2024

Just want to check, right now several of the existing operators get a handle to the connection and call the Microsoft Azure library directly. These should really be refactored down into the hook as well, right? For example this: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339

Good catch, indeed it would be better that this code resides within the hook, the hook should take care of the connection handling and exposes the logic within a public method which on it's turn is called from the operator, that way the same operation can also be executed from the hook within a PythonOperator.

But don't worry, there are still a lot of operators written that way unfortunately, but if we clean up every time we need to modify an operator, we will get there one day :-)

@perry2of5
Copy link
Contributor Author

When I was working on this new operator I thought about moving some of those down into the hooks so I could reuse. I'll go ahead and put in a PR to address that and then update this PR to use those. It will reduce duplication and be a good thing. I'll try and do it this afternoon...we'll see, I have day-job work to do suddenly.

Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clarify the value of this operator in a wide context. In your prespective what % of the users of azure provider need this operator?

This PR contains alot of code. It somewhat feels out of scope for the operators we serve (but I don't know azure so I can't say for sure). I am not very comfortable with accepting complex logic operators for providers that are owned by us. Unlike GCP and AWS where they share responsibility for maintaining the provider code, Azure are not involved. So I have to ask the question - do we want this code at Airflow or maybe a better place for it is in a 3rd party provider that you can roll on your own public repo?

Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html

Steps are:
1. Generate a unique ID for the message. The subscription needs to exist before the request message is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of what you write here needs to be in the docs not in the class doc string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems fair. I'm not sure where to move it to though. I don't see documentation outside of the docstrings. Sorry if I'm missing something obvious....not really a python dev.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you mean under the top-level docs folder. It never occurred to me that the providers would be documented outside the top-level providers folder. I'm happy to move things over. I'll try to get to that later this week.

@perry2of5
Copy link
Contributor Author

perry2of5 commented Dec 11, 2024

It seems to me if a message is sent from an airflow DAG then the DAG author probably wants a message back at some point to confirm completion, check for errors, et cetera. To the best of my knowledge, the logic in this PR implements the standard design pattern for doing that.

Also, after the refactors that dabla requested this will be much smaller and the hooks will be more useful.

@perry2of5 perry2of5 marked this pull request as draft December 20, 2024 17:03
Copy link

github-actions bot commented Feb 4, 2025

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Feb 4, 2025
@perry2of5 perry2of5 force-pushed the add-req-reply-op-to-azure branch from e890eb6 to ab34641 Compare February 7, 2025 17:53
@perry2of5
Copy link
Contributor Author

Need to rewrite to use a dedicated response queue because there is a race condition between adding the subscription and modifying the filter. The alternatives are to discard any messages before sending the request or to fix the python SDK for ASB but seems simpler to use a queue.

@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Feb 8, 2025
@perry2of5 perry2of5 force-pushed the add-req-reply-op-to-azure branch 2 times, most recently from 63d6301 to 972d753 Compare February 9, 2025 00:18
@perry2of5 perry2of5 force-pushed the add-req-reply-op-to-azure branch from c17d00f to 1176c3f Compare February 19, 2025 19:55
@perry2of5 perry2of5 force-pushed the add-req-reply-op-to-azure branch from 1176c3f to 733992a Compare March 6, 2025 17:28
@perry2of5
Copy link
Contributor Author

I moved the ability to set message headers, reply-to and message-id into another PR #47522

@perry2of5 perry2of5 force-pushed the add-req-reply-op-to-azure branch from 733992a to 351ff51 Compare March 21, 2025 05:59
@perry2of5 perry2of5 force-pushed the add-req-reply-op-to-azure branch from 351ff51 to 574c47b Compare March 28, 2025 21:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants