add EventHubBatch E2E pystein tests. (#1052)
* added EventHubBatch E2E pystein tests.

* added copyright license
pdthummar authored Jul 20, 2022
1 parent e4a7744 commit 5f28c27
Showing 2 changed files with 232 additions and 0 deletions.
@@ -0,0 +1,127 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import os
import typing
import azure.functions as func
from azure.eventhub import EventHubProducerClient, EventData

app = func.FunctionApp()


# This is an EventHub trigger that handles EventHub events in batches.
# It serializes the event bodies into JSON and stores the result in a table.
@app.function_name(name="eventhub_multiple")
@app.event_hub_message_trigger(
arg_name="events",
event_hub_name="python-worker-ci-eventhub-batch",
connection="AzureWebJobsEventHubConnectionString",
data_type="string",
cardinality="many")
@app.write_table(arg_name="$return",
connection="AzureWebJobsStorage",
table_name="EventHubBatchTest")
def eventhub_multiple(events):
table_entries = []
for event in events:
json_entry = event.get_body()
table_entry = json.loads(json_entry)
table_entries.append(table_entry)

table_json = json.dumps(table_entries)

return table_json


# An HttpTrigger that generates EventHub events via the output binding
@app.function_name(name="eventhub_output_batch")
@app.write_event_hub_message(arg_name="$return",
connection="AzureWebJobsEventHubConnectionString",
event_hub_name="python-worker-ci-eventhub-batch")
@app.route(route="eventhub_output_batch", binding_arg_name="out")
def eventhub_output_batch(req: func.HttpRequest, out: func.Out[str]) -> str:
events = req.get_body().decode('utf-8')
out.set('hello')
return events


# Retrieve the event data from the table and return it as an HTTP response
@app.function_name(name="get_eventhub_batch_triggered")
@app.route(route="get_eventhub_batch_triggered/{id}")
@app.read_table(arg_name="testEntities",
connection="AzureWebJobsStorage",
table_name="EventHubBatchTest",
partition_key="{id}")
def get_eventhub_batch_triggered(req: func.HttpRequest, testEntities):
return func.HttpResponse(status_code=200, body=testEntities)


# Retrieve the event metadata from the blob and return it as an HTTP response
@app.function_name(name="get_metadata_batch_triggered")
@app.route(route="get_metadata_batch_triggered")
@app.read_blob(arg_name="file",
path="python-worker-tests/test-metadata-batch-triggered.txt",
connection="AzureWebJobsStorage")
def get_metadata_batch_triggered(req: func.HttpRequest,
                                 file: func.InputStream) -> func.HttpResponse:
return func.HttpResponse(body=file.read().decode('utf-8'),
status_code=200,
mimetype='application/json')


# This is an EventHub trigger that handles EventHub events in batches.
# It serializes each event's body and metadata and stores the JSON in a blob.
@app.function_name(name="metadata_multiple")
@app.event_hub_message_trigger(
arg_name="events",
event_hub_name="python-worker-ci-eventhub-batch-metadata",
connection="AzureWebJobsEventHubConnectionString",
data_type="binary",
cardinality="many")
@app.write_blob(arg_name="$return",
path="python-worker-tests/test-metadata-batch-triggered.txt",
connection="AzureWebJobsStorage")
def metadata_multiple(events: typing.List[func.EventHubEvent]) -> str:
event_list = []
for event in events:
event_dict: typing.Mapping[str, typing.Any] = {
'body': event.get_body().decode('utf-8'),
'enqueued_time': event.enqueued_time.isoformat(),
'partition_key': event.partition_key,
'sequence_number': event.sequence_number,
'offset': event.offset,
'metadata': event.metadata
}
event_list.append(event_dict)

return json.dumps(event_list)


# An HttpTrigger that generates EventHub events using the azure-eventhub SDK.
# Events generated with azure-eventhub contain the full metadata.
@app.function_name(name="metadata_output_batch")
@app.route(route="metadata_output_batch")
def main(req: func.HttpRequest):
# Get event count from http request query parameter
count = int(req.params.get('count', '1'))

# Parse event metadata from http request
json_string = req.get_body().decode('utf-8')
event_dict = json.loads(json_string)

# Create an EventHub Client and event batch
client = EventHubProducerClient.from_connection_string(
os.getenv('AzureWebJobsEventHubConnectionString'),
eventhub_name='python-worker-ci-eventhub-batch-metadata')

    # Generate new events based on the HTTP request, with full metadata
event_data_batch = client.create_batch()
random_number = int(event_dict.get('body', '0'))
for i in range(count):
event_data_batch.add(EventData(str(random_number + i)))

    # Send the event batch to the event hub
with client:
client.send_batch(event_data_batch)

return 'OK'
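
For context, here is a minimal, illustrative sketch of how the eventhub_output_batch and get_eventhub_batch_triggered endpoints above could be exercised against a locally running function host, mirroring what the E2E test below does. The localhost base URL, port, and three-document payload are assumptions for illustration and are not part of this commit:

# Illustrative only: assumes the function app above is running locally
# and listening on the default port 7071.
import json
import time

import requests

BASE = "http://localhost:7071/api"  # assumed local host and port
partition_key = str(round(time.time()))  # same key scheme the E2E test uses

docs = [{"PartitionKey": partition_key, "RowKey": i} for i in range(3)]

# Post the documents to the HTTP trigger backed by the EventHub output binding.
requests.post(f"{BASE}/eventhub_output_batch", data=json.dumps(docs))

# Give the batch trigger a few seconds to fire, then read the table rows back.
time.sleep(5)
print(requests.get(f"{BASE}/get_eventhub_batch_triggered/{partition_key}").json())
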
105 changes: 105 additions & 0 deletions tests/endtoend/test_eventhub_batch_functions.py
@@ -156,3 +156,108 @@ def _get_table_function_json_path(self):
script_dir = pathlib.Path(self.get_script_dir())
json_path = pathlib.Path('get_eventhub_batch_triggered/function.json')
return testutils.TESTS_ROOT / script_dir / json_path


class TestEventHubBatchFunctionsStein(testutils.WebHostTestCase):

@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'eventhub_batch_functions' / \
'eventhub_batch_functions_stein'

@testutils.retryable_test(3, 5)
def test_eventhub_multiple(self):
NUM_EVENTS = 3
all_row_keys_seen = dict([(str(i), True) for i in range(NUM_EVENTS)])
partition_key = str(round(time.time()))

docs = []
for i in range(NUM_EVENTS):
doc = {'PartitionKey': partition_key, 'RowKey': i}
docs.append(doc)

r = self.webhost.request('POST', 'eventhub_output_batch',
data=json.dumps(docs))
self.assertEqual(r.status_code, 200)

row_keys = [str(i) for i in range(NUM_EVENTS)]
seen = [False] * NUM_EVENTS
row_keys_seen = dict(zip(row_keys, seen))

# Allow trigger to fire.
time.sleep(5)

r = self.webhost.request(
'GET',
f'get_eventhub_batch_triggered/{partition_key}')
self.assertEqual(r.status_code, 200)
entries = r.json()
for entry in entries:
self.assertEqual(entry['PartitionKey'], partition_key)
row_key = entry['RowKey']
row_keys_seen[row_key] = True

self.assertDictEqual(all_row_keys_seen, row_keys_seen)

@testutils.retryable_test(3, 5)
def test_eventhub_multiple_with_metadata(self):
# Generate a unique event body for EventHub event
# Record the start_time and end_time for checking event enqueue time
start_time = datetime.now(tz=tz.UTC)
count = 10
random_number = str(round(time.time()) % 1000)
req_body = {
'body': random_number
}

        # Invoke the metadata_output_batch HttpTrigger to generate EventHub
        # events via the azure-eventhub SDK
r = self.webhost.request('POST',
f'metadata_output_batch?count={count}',
data=json.dumps(req_body))
self.assertEqual(r.status_code, 200)
self.assertIn('OK', r.text)
end_time = datetime.now(tz=tz.UTC)

        # Once the events are generated, allow the function host to poll
        # EventHub and wait for metadata_multiple to execute, converting
        # the event metadata into a blob.
time.sleep(5)

# Call get_metadata_batch_triggered to retrieve event metadata
r = self.webhost.request('GET', 'get_metadata_batch_triggered')
self.assertEqual(r.status_code, 200)

        # Check metadata and event count; events are processed as a batch
events = r.json()
self.assertIsInstance(events, list)
self.assertGreater(len(events), 1)

        # EventHubEvent property check
for event_index in range(len(events)):
event = events[event_index]

# Check if the event is enqueued between start_time and end_time
enqueued_time = parser.isoparse(event['enqueued_time']).astimezone(
tz=tz.UTC)
self.assertTrue(start_time < enqueued_time < end_time)

# Check if event properties are properly set
self.assertIsNone(event['partition_key']) # only 1 partition
self.assertGreaterEqual(event['sequence_number'], 0)
self.assertIsNotNone(event['offset'])

# Check if event.metadata field is properly set
self.assertIsNotNone(event['metadata'])
metadata = event['metadata']
sys_props_array = metadata['SystemPropertiesArray']
sys_props = sys_props_array[event_index]
enqueued_time = parser.isoparse(sys_props['EnqueuedTimeUtc'])

# Check event trigger time and other system properties
self.assertTrue(
start_time.timestamp() < enqueued_time.timestamp()
< end_time.timestamp()) # NoQA
self.assertIsNone(sys_props['PartitionKey'])
self.assertGreaterEqual(sys_props['SequenceNumber'], 0)
self.assertIsNotNone(sys_props['Offset'])
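
As a usage note, here is a hedged sketch of running only the new Stein test class with pytest from the repository root. The exact invocation and the environment the web-host test harness expects (for example the AzureWebJobsEventHubConnectionString and AzureWebJobsStorage settings referenced above) are assumptions about the project's test setup, not something this commit specifies:

# Illustrative only: runs the new batch test class through pytest's Python API.
# Assumes the EventHub and Storage connection settings referenced above are
# available to the test web host.
import sys

import pytest

sys.exit(pytest.main([
    "tests/endtoend/test_eventhub_batch_functions.py::TestEventHubBatchFunctionsStein",
    "-v",
]))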
