Skip to content

Commit

Permalink
docs: Add samples and test for ingestion from Kafka sources (#1354)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelpri10 authored Feb 10, 2025
1 parent 316883a commit 820f986
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 2 deletions.
2 changes: 1 addition & 1 deletion samples/snippets/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def get_pytest_env_vars() -> Dict[str, str]:
"--builtin=gettext",
"--max-complexity=20",
"--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py",
"--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202",
"--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202,C901",
"--max-line-length=88",
]

Expand Down
209 changes: 209 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,147 @@ def create_topic_with_cloud_storage_ingestion(
# [END pubsub_create_topic_with_cloud_storage_ingestion]


def create_topic_with_aws_msk_ingestion(
    project_id: str,
    topic_id: str,
    cluster_arn: str,
    msk_topic: str,
    aws_role_arn: str,
    gcp_service_account: str,
) -> None:
    """Create a new Pub/Sub topic with AWS MSK Ingestion Settings."""
    # [START pubsub_create_topic_with_aws_msk_ingestion]
    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import IngestionDataSourceSettings
    from google.pubsub_v1.types import Topic

    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # cluster_arn = "your-cluster-arn"
    # msk_topic = "your-msk-topic"
    # aws_role_arn = "your-aws-role-arn"
    # gcp_service_account = "your-gcp-service-account"

    client = pubsub_v1.PublisherClient()
    topic_name = client.topic_path(project_id, topic_id)

    # Describe the AWS MSK stream that Pub/Sub should ingest messages from.
    msk_source = IngestionDataSourceSettings.AwsMsk(
        cluster_arn=cluster_arn,
        topic=msk_topic,
        aws_role_arn=aws_role_arn,
        gcp_service_account=gcp_service_account,
    )

    topic = client.create_topic(
        request=Topic(
            name=topic_name,
            ingestion_data_source_settings=IngestionDataSourceSettings(
                aws_msk=msk_source
            ),
        )
    )

    print(f"Created topic: {topic.name} with AWS MSK Ingestion Settings")
    # [END pubsub_create_topic_with_aws_msk_ingestion]


def create_topic_with_azure_event_hubs_ingestion(
    project_id: str,
    topic_id: str,
    resource_group: str,
    namespace: str,
    event_hub: str,
    client_id: str,
    tenant_id: str,
    subscription_id: str,
    gcp_service_account: str,
) -> None:
    """Create a new Pub/Sub topic with Azure Event Hubs Ingestion Settings."""
    # [START pubsub_create_topic_with_azure_event_hubs_ingestion]
    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import IngestionDataSourceSettings
    from google.pubsub_v1.types import Topic

    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # resource_group = "your-resource-group"
    # namespace = "your-namespace"
    # event_hub = "your-event-hub"
    # client_id = "your-client-id"
    # tenant_id = "your-tenant-id"
    # subscription_id = "your-subscription-id"
    # gcp_service_account = "your-gcp-service-account"

    client = pubsub_v1.PublisherClient()
    topic_name = client.topic_path(project_id, topic_id)

    # Describe the Azure Event Hub that Pub/Sub should ingest messages from.
    event_hubs_source = IngestionDataSourceSettings.AzureEventHubs(
        resource_group=resource_group,
        namespace=namespace,
        event_hub=event_hub,
        client_id=client_id,
        tenant_id=tenant_id,
        subscription_id=subscription_id,
        gcp_service_account=gcp_service_account,
    )

    topic = client.create_topic(
        request=Topic(
            name=topic_name,
            ingestion_data_source_settings=IngestionDataSourceSettings(
                azure_event_hubs=event_hubs_source
            ),
        )
    )

    print(f"Created topic: {topic.name} with Azure Event Hubs Ingestion Settings")
    # [END pubsub_create_topic_with_azure_event_hubs_ingestion]


def create_topic_with_confluent_cloud_ingestion(
    project_id: str,
    topic_id: str,
    bootstrap_server: str,
    cluster_id: str,
    confluent_topic: str,
    identity_pool_id: str,
    gcp_service_account: str,
) -> None:
    """Create a new Pub/Sub topic with Confluent Cloud Ingestion Settings."""
    # [START pubsub_create_topic_with_confluent_cloud_ingestion]
    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import IngestionDataSourceSettings
    from google.pubsub_v1.types import Topic

    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # bootstrap_server = "your-bootstrap-server"
    # cluster_id = "your-cluster-id"
    # confluent_topic = "your-confluent-topic"
    # identity_pool_id = "your-identity-pool-id"
    # gcp_service_account = "your-gcp-service-account"

    client = pubsub_v1.PublisherClient()
    topic_name = client.topic_path(project_id, topic_id)

    # Describe the Confluent Cloud cluster/topic Pub/Sub should ingest from.
    confluent_source = IngestionDataSourceSettings.ConfluentCloud(
        bootstrap_server=bootstrap_server,
        cluster_id=cluster_id,
        topic=confluent_topic,
        identity_pool_id=identity_pool_id,
        gcp_service_account=gcp_service_account,
    )

    topic = client.create_topic(
        request=Topic(
            name=topic_name,
            ingestion_data_source_settings=IngestionDataSourceSettings(
                confluent_cloud=confluent_source
            ),
        )
    )

    print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")
    # [END pubsub_create_topic_with_confluent_cloud_ingestion]


def update_topic_type(
project_id: str,
topic_id: str,
Expand Down Expand Up @@ -710,6 +851,43 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
"minimum_object_create_time"
)

create_topic_with_aws_msk_ingestion_parser = subparsers.add_parser(
"create_aws_msk_ingestion", help=create_topic_with_aws_msk_ingestion.__doc__
)
create_topic_with_aws_msk_ingestion_parser.add_argument("topic_id")
create_topic_with_aws_msk_ingestion_parser.add_argument("cluster_arn")
create_topic_with_aws_msk_ingestion_parser.add_argument("msk_topic")
create_topic_with_aws_msk_ingestion_parser.add_argument("aws_role_arn")
create_topic_with_aws_msk_ingestion_parser.add_argument("gcp_service_account")

create_topic_with_azure_event_hubs_ingestion_parser = subparsers.add_parser(
"create_azure_event_hubs_ingestion",
help=create_topic_with_azure_event_hubs_ingestion.__doc__,
)
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("topic_id")
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("resource_group")
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("namespace")
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("event_hub")
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("client_id")
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("tenant_id")
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("subscription_id")
create_topic_with_azure_event_hubs_ingestion_parser.add_argument(
"gcp_service_account"
)

create_topic_with_confluent_cloud_ingestion_parser = subparsers.add_parser(
"create_confluent_cloud_ingestion",
help=create_topic_with_confluent_cloud_ingestion.__doc__,
)
create_topic_with_confluent_cloud_ingestion_parser.add_argument("topic_id")
create_topic_with_confluent_cloud_ingestion_parser.add_argument("bootstrap_server")
create_topic_with_confluent_cloud_ingestion_parser.add_argument("cluster_id")
create_topic_with_confluent_cloud_ingestion_parser.add_argument("confluent_topic")
create_topic_with_confluent_cloud_ingestion_parser.add_argument("identity_pool_id")
create_topic_with_confluent_cloud_ingestion_parser.add_argument(
"gcp_service_account"
)

update_topic_type_parser = subparsers.add_parser(
"update_kinesis_ingestion", help=update_topic_type.__doc__
)
Expand Down Expand Up @@ -798,6 +976,37 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
args.match_glob,
args.minimum_object_create_time,
)
elif args.command == "create_aws_msk_ingestion":
create_topic_with_aws_msk_ingestion(
args.project_id,
args.topic_id,
args.cluster_arn,
args.msk_topic,
args.aws_role_arn,
args.gcp_service_account,
)
elif args.command == "create_azure_event_hubs_ingestion":
create_topic_with_azure_event_hubs_ingestion(
args.project_id,
args.topic_id,
args.resource_group,
args.namespace,
args.event_hub,
args.client_id,
args.tenant_id,
args.subscription_id,
args.gcp_service_account,
)
elif args.command == "create_confluent_cloud_ingestion":
create_topic_with_confluent_cloud_ingestion(
args.project_id,
args.topic_id,
args.bootstrap_server,
args.cluster_id,
args.confluent_topic,
args.identity_pool_id,
args.gcp_service_account,
)
elif args.command == "update_kinesis_ingestion":
update_topic_type(
args.project_id,
Expand Down
117 changes: 117 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,123 @@ def test_create_topic_with_cloud_storage_ingestion(
publisher_client.delete_topic(request={"topic": topic_path})


def test_create_topic_with_aws_msk_ingestion(
    publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
    # `topic_path` is scoped to this test only.
    topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)

    # Placeholder AWS identifiers; outside of automated CI tests these must
    # reference actual AWS resources for the test to pass.
    cluster_arn = (
        "arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1"
    )
    msk_topic = "fake-msk-topic-name"
    aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
    gcp_service_account = "[email protected]"

    # Remove any leftover topic from a previous run so creation succeeds.
    try:
        publisher_client.delete_topic(request={"topic": topic_path})
    except NotFound:
        pass

    publisher.create_topic_with_aws_msk_ingestion(
        PROJECT_ID,
        TOPIC_ID,
        cluster_arn,
        msk_topic,
        aws_role_arn,
        gcp_service_account,
    )

    captured, _ = capsys.readouterr()
    expected_message = f"Created topic: {topic_path} with AWS MSK Ingestion Settings"
    assert expected_message in captured

    # Clean up resource created for the test.
    publisher_client.delete_topic(request={"topic": topic_path})


def test_create_topic_with_azure_event_hubs_ingestion(
    publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
    # The scope of `topic_path` is limited to this function.
    topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)

    # Outside of automated CI tests, these values must be of actual Azure resources for the test to pass.
    resource_group = "fake-resource-group"
    namespace = "fake-namespace"
    event_hub = "fake-event-hub"
    client_id = "fake-client-id"
    tenant_id = "fake-tenant-id"
    # Fixed local-name typo: was `subcription_id`.
    subscription_id = "fake-subscription-id"
    gcp_service_account = (
        "[email protected]"
    )

    # Delete any leftover topic from a previous run so create_topic
    # does not fail with AlreadyExists.
    try:
        publisher_client.delete_topic(request={"topic": topic_path})
    except NotFound:
        pass

    publisher.create_topic_with_azure_event_hubs_ingestion(
        PROJECT_ID,
        TOPIC_ID,
        resource_group,
        namespace,
        event_hub,
        client_id,
        tenant_id,
        subscription_id,
        gcp_service_account,
    )

    out, _ = capsys.readouterr()
    assert (
        f"Created topic: {topic_path} with Azure Event Hubs Ingestion Settings" in out
    )

    # Clean up resource created for the test.
    publisher_client.delete_topic(request={"topic": topic_path})


def test_create_topic_with_confluent_cloud_ingestion(
    publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
    # `topic_path` is scoped to this test only.
    topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)

    # Placeholder Confluent Cloud identifiers; outside of automated CI tests
    # these must reference actual Confluent resources for the test to pass.
    bootstrap_server = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092"
    cluster_id = "fake-cluster-id"
    confluent_topic = "fake-confluent-topic-name"
    identity_pool_id = "fake-identity-pool-id"
    gcp_service_account = "[email protected]"

    # Remove any leftover topic from a previous run so creation succeeds.
    try:
        publisher_client.delete_topic(request={"topic": topic_path})
    except NotFound:
        pass

    publisher.create_topic_with_confluent_cloud_ingestion(
        PROJECT_ID,
        TOPIC_ID,
        bootstrap_server,
        cluster_id,
        confluent_topic,
        identity_pool_id,
        gcp_service_account,
    )

    captured, _ = capsys.readouterr()
    expected_message = (
        f"Created topic: {topic_path} with Confluent Cloud Ingestion Settings"
    )
    assert expected_message in captured

    # Clean up resource created for the test.
    publisher_client.delete_topic(request={"topic": topic_path})


def test_update_topic_type(
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
google-cloud-pubsub==2.27.1
google-cloud-pubsub==2.28.0
avro==1.12.0
protobuf===4.24.4; python_version == '3.7'
protobuf==5.29.2; python_version >= '3.8'
Expand Down

0 comments on commit 820f986

Please sign in to comment.