Skip to content

Commit

Permalink
Update EventHub to enable live testing in sovereign clouds for multip…
Browse files Browse the repository at this point in the history
…le services (#21715)

These changes enable Event Hub to run live tests against Public, UsGov and China.

@benbp , @jameszliao-msft , @lmazuel , @lilyjma , @ramya-rao-a, @annatisch for notification.
  • Loading branch information
v-xuto authored Mar 8, 2023
1 parent 1eb9c4f commit ab0b377
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 52 deletions.
2 changes: 2 additions & 0 deletions scripts/devops_tasks/test_run_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@
"sample_publish_events_to_a_topic_using_sas_credential_async.py"
],
"azure-eventhub": [
"client_identity_authentication.py", # TODO: remove after fixing issue #29177
"connection_to_custom_endpoint_address.py",
"proxy.py",
"connection_to_custom_endpoint_address_async.py",
"iot_hub_connection_string_receive_async.py",
"proxy_async.py",
"send_stream.py", # TODO: remove after fixing issue #29177
],
"azure-eventhub-checkpointstoretable": ["receive_events_using_checkpoint_store.py"],
"azure-servicebus": [
Expand Down
15 changes: 11 additions & 4 deletions sdk/eventhub/azure-eventhub/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def resource_group():
except KeyError:
pytest.skip('AZURE_SUBSCRIPTION_ID undefined')
return
resource_client = ResourceManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID)
base_url = os.environ.get("EVENTHUB_RESOURCE_MANAGER_URL", "https://management.azure.com/")
credential_scopes = ["{}.default".format(base_url)]
resource_client = ResourceManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID, base_url=base_url, credential_scopes=credential_scopes)
resource_group_name = RES_GROUP_PREFIX + str(uuid.uuid4())
parameters = {"location": LOCATION}
expiry = datetime.datetime.utcnow() + datetime.timedelta(days=1)
Expand All @@ -122,7 +124,9 @@ def eventhub_namespace(resource_group):
except KeyError:
pytest.skip('AZURE_SUBSCRIPTION_ID defined')
return
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID)
base_url = os.environ.get("EVENTHUB_RESOURCE_MANAGER_URL", "https://management.azure.com/")
credential_scopes = ["{}.default".format(base_url)]
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID, base_url=base_url, credential_scopes=credential_scopes)
namespace_name = NAMESPACE_PREFIX + str(uuid.uuid4())
try:
namespace = resource_client.namespaces.begin_create_or_update(
Expand All @@ -147,16 +151,19 @@ def live_eventhub(resource_group, eventhub_namespace): # pylint: disable=redefi
except KeyError:
pytest.skip('AZURE_SUBSCRIPTION_ID defined')
return
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID)
base_url = os.environ.get("EVENTHUB_RESOURCE_MANAGER_URL", "https://management.azure.com/")
credential_scopes = ["{}.default".format(base_url)]
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID, base_url=base_url, credential_scopes=credential_scopes)
eventhub_name = EVENTHUB_PREFIX + str(uuid.uuid4())
eventhub_ns_name, connection_string, key_name, primary_key = eventhub_namespace
eventhub_endpoint_suffix = os.environ.get("EVENT_HUB_ENDPOINT_SUFFIX", ".servicebus.windows.net")
try:
eventhub = resource_client.event_hubs.create_or_update(
resource_group.name, eventhub_ns_name, eventhub_name, {"partition_count": PARTITION_COUNT}
)
live_eventhub_config = {
'resource_group': resource_group.name,
'hostname': "{}.servicebus.windows.net".format(eventhub_ns_name),
'hostname': "{}{}".format(eventhub_ns_name, eventhub_endpoint_suffix),
'key_name': key_name,
'access_key': primary_key,
'namespace': eventhub_ns_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ async def test_client_secret_credential_async(live_eventhub, uamqp_transport):
eventhub_name=live_eventhub['event_hub'],
credential=credential,
user_agent='customized information',
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport
)
consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
consumer_group='$default',
credential=credential,
user_agent='customized information',
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport
)

Expand All @@ -46,7 +46,7 @@ async def on_event(partition_context, event):
on_event.called = False
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event, partition_id='0', starting_position='-1'))
await asyncio.sleep(13)
await asyncio.sleep(15)
await task
assert on_event.called is True
assert on_event.partition_id == "0"
Expand Down Expand Up @@ -108,7 +108,7 @@ async def test_client_azure_sas_credential_async(live_eventhub, uamqp_transport)
token = (await credential.get_token(auth_uri)).token
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
auth_timeout=3,
auth_timeout=30,
credential=AzureSasCredential(token), uamqp_transport=uamqp_transport)

async with producer_client:
Expand All @@ -128,7 +128,7 @@ async def test_client_azure_named_key_credential_async(live_eventhub, uamqp_tran
eventhub_name=live_eventhub['event_hub'],
consumer_group='$default',
credential=credential,
auth_timeout=3,
auth_timeout=30,
user_agent='customized information', uamqp_transport=uamqp_transport)

assert (await consumer_client.get_eventhub_properties()) is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ async def on_error(events, pid, err):
await asyncio.sleep(5)
assert not sent_events
await asyncio.sleep(20)

assert sum([len(sent_events[pid]) for pid in partitions]) == 1

assert not on_error.err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def on_event_batch(partition_context, event_batch):


@pytest.mark.parametrize("max_wait_time, sleep_time, expected_result",
[(3, 10, []),
[(3, 15, []),
(3, 2, None),
])
@pytest.mark.liveTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,29 +310,31 @@ async def on_error(partition_context, error):
token = (await credential.get_token(auth_uri)).token
producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token[:-1], time.time() + 5),
credential=EventHubSASTokenCredential(token, time.time() + 5),
uamqp_transport=uamqp_transport)
await asyncio.sleep(10)
await asyncio.sleep(15)
# expired credential
async with producer_client:
with pytest.raises(AuthenticationError):
await producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
on_error.err = None
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task
# TODO: expired credential AuthenticationError not raised for east-asia/China regions
if 'servicebus.windows.net' in live_eventhub['hostname']:
consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
on_error.err = None
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task

# expired credential
assert isinstance(on_error.err, AuthenticationError)
# expired credential
assert isinstance(on_error.err, AuthenticationError)

credential = EventHubSharedKeyCredential('fakekey', live_eventhub['access_key'])
producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def on_error(partition_context, error):
for i in range(5):
ed = EventData("Event Number {}".format(i))
senders[0].send(ed)
await asyncio.sleep(10)
await asyncio.sleep(20)
await task1
await task2
assert isinstance(on_error.error, EventHubError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def on_event(partition_context, event):
"starting_position": '-1'
})
worker.start()
time.sleep(13)
time.sleep(15)

worker.join()
assert on_event.called is True
Expand Down Expand Up @@ -112,7 +112,7 @@ def test_client_azure_sas_credential(live_eventhub, uamqp_transport):
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
credential=AzureSasCredential(token),
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport)

with producer_client:
Expand All @@ -131,7 +131,7 @@ def test_client_azure_named_key_credential(live_eventhub, uamqp_transport):
consumer_group='$default',
credential=credential,
user_agent='customized information',
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport)

assert consumer_client.get_eventhub_properties() is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def on_event_batch(partition_context, event_batch):


@pytest.mark.parametrize("max_wait_time, sleep_time, expected_result",
[(3, 10, []),
[(3, 15, []),
(3, 2, None)])
def test_receive_batch_empty_with_max_wait_time(uamqp_transport, connection_str, max_wait_time, sleep_time, expected_result):
'''Test whether event handler is called when max_wait_time > 0 and no event is received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ def on_error(partition_context, error):
producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
on_error.err = None
with consumer_client:
thread = threading.Thread(target=consumer_client.receive, args=(on_event,),
kwargs={"starting_position": "-1", "on_error": on_error})
kwargs={"starting_position": "-1", "on_error": on_error})
thread.daemon = True
thread.start()
time.sleep(15)
Expand Down Expand Up @@ -330,7 +330,11 @@ def on_error(partition_context, error):
uamqp_transport=uamqp_transport)

with producer_client:
with pytest.raises(AuthenticationError):
errors = (AuthenticationError)
# TODO: flaky TimeoutError during connect for China region
if 'servicebus.windows.net' not in live_eventhub['hostname']:
errors = (AuthenticationError, ConnectError)
with pytest.raises(errors):
producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def on_event(partition_context, event):
"track_last_enqueued_event_properties": True})
thread.daemon = True
thread.start()
time.sleep(10)
time.sleep(30)
assert on_event.event_position is not None
thread.join()
senders[0].send(EventData(expected_result))
Expand All @@ -105,7 +105,7 @@ def on_event(partition_context, event):
"track_last_enqueued_event_properties": True})
thread.daemon = True
thread.start()
time.sleep(15)
time.sleep(30)
assert on_event.event.body_as_str() == expected_result

thread.join()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ def test_send_and_receive_large_body_size(connstr_receivers, uamqp_transport, ti
if sys.platform.startswith('darwin'):
pytest.skip("Skipping on OSX - open issue regarding message size")
connection_str, receivers = connstr_receivers

# TODO: sending large batch to China cloud results in write timeout for pyamqp
# https://github.com/Azure/azure-sdk-for-python/issues/29177
if not uamqp_transport and 'servicebus.windows.net' not in connection_str:
pytest.skip("Skipping for pyamqp - open issue regarding write timeout")

client = EventHubProducerClient.from_connection_string(connection_str, uamqp_transport=uamqp_transport)
with client:
payload = 250 * 1024
Expand All @@ -104,7 +110,7 @@ def test_send_and_receive_large_body_size(connstr_receivers, uamqp_transport, ti
client.send_event(EventData("A" * payload))

received = []
timeout = 10 * timeout_factor
timeout = 20 * timeout_factor
for r in receivers:
received.extend([EventData._from_message(x) for x in r.receive_message_batch(timeout=timeout)])

Expand Down
17 changes: 16 additions & 1 deletion sdk/eventhub/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
"description": "The subscription ID to which the application and resources belong."
}
},
"serviceBusEndpointSuffix": {
"type": "string",
"defaultValue": ".servicebus.windows.net",
"metadata": {
"description": "The url suffix to use when creating eventhubs connection strings."
}
},
"tenantId": {
"type": "string",
"defaultValue": "[subscription().tenantId]",
Expand Down Expand Up @@ -180,7 +187,7 @@
},
"EVENT_HUB_HOSTNAME": {
"type": "string",
"value": "[concat(variables('eventHubsNamespace'), '.servicebus.windows.net')]"
"value": "[concat(variables('eventHubsNamespace'), parameters('serviceBusEndpointSuffix'))]"
},
"EVENT_HUB_CONN_STR": {
"type": "string",
Expand Down Expand Up @@ -213,6 +220,14 @@
"AZURE_TABLES_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"
},
"RESOURCE_REGION": {
"type": "string",
"value": "[parameters('location')]"
},
"EVENT_HUB_ENDPOINT_SUFFIX": {
"type": "string",
"value": "[parameters('serviceBusEndpointSuffix')]"
}
}
}
23 changes: 13 additions & 10 deletions sdk/eventhub/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ stages:
ServiceDirectory: eventhub
TestTimeoutInMinutes: 240
BuildTargetingString: azure-eventhub*
SupportedClouds: 'Public,UsGov,China'
CloudConfig:
Public:
SubscriptionConfiguration: $(sub-config-azure-cloud-test-resources)
UsGov:
SubscriptionConfiguration: $(sub-config-gov-test-resources)
China:
SubscriptionConfiguration: $(sub-config-cn-test-resources)
Location: 'chinanorth2'
MatrixReplace:
- TestSamples=.*/true
MatrixFilters:
Expand All @@ -14,15 +23,9 @@ stages:
AZURE_STORAGE_DATA_LAKE_ENABLED_CONN_STR: $(python-eh-livetest-event-hub-storage-data-lake-enabled-conn-str)
IOTHUB_CONNECTION_STR: $(python-eh-livetest-event-hub-iothub-connection-str)
IOTHUB_DEVICE: $(python-eh-livetest-event-hub-iothub-device)
AZURE_CLIENT_ID: $(python-eh-livetest-event-hub-aad-client-id)
AZURE_TENANT_ID: $(python-eh-livetest-event-hub-aad-tenant-id)
AZURE_CLIENT_SECRET: $(python-eh-livetest-event-hub-aad-secret)
AZURE_SUBSCRIPTION_ID: $(python-eh-livetest-event-hub-subscription-id)
AZURE_CLIENT_ID: $(EVENTHUB_CLIENT_ID)
AZURE_TENANT_ID: $(EVENTHUB_TENANT_ID)
AZURE_CLIENT_SECRET: $(EVENTHUB_CLIENT_SECRET)
AZURE_SUBSCRIPTION_ID: $(EVENTHUB_SUBSCRIPTION_ID)
AZURE_COSMOS_CONN_STR: $(python-eventhub-livetest-cosmos-conn-str)
Clouds: 'Public,Canary'
CloudConfig:
Public:
SubscriptionConfiguration: $(sub-config-azure-cloud-test-resources)
Canary:
SubscriptionConfiguration: $(sub-config-azure-cloud-test-resources)
Location: 'eastus2euap'

0 comments on commit ab0b377

Please sign in to comment.