diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py index c9a3d1ecb..075047f97 100644 --- a/samples/snippets/noxfile.py +++ b/samples/snippets/noxfile.py @@ -124,7 +124,7 @@ def get_pytest_env_vars() -> Dict[str, str]: "--builtin=gettext", "--max-complexity=20", "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", - "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", + "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202,C901", "--max-line-length=88", ] diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 7cb7ca223..270451511 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -185,6 +185,147 @@ def create_topic_with_cloud_storage_ingestion( # [END pubsub_create_topic_with_cloud_storage_ingestion] +def create_topic_with_aws_msk_ingestion( + project_id: str, + topic_id: str, + cluster_arn: str, + msk_topic: str, + aws_role_arn: str, + gcp_service_account: str, +) -> None: + """Create a new Pub/Sub topic with AWS MSK Ingestion Settings.""" + # [START pubsub_create_topic_with_aws_msk_ingestion] + from google.cloud import pubsub_v1 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # cluster_arn = "your-cluster-arn" + # msk_topic = "your-msk-topic" + # aws_role_arn = "your-aws-role-arn" + # gcp_service_account = "your-gcp-service-account" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + request = Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + aws_msk=IngestionDataSourceSettings.AwsMsk( + cluster_arn=cluster_arn, + topic=msk_topic, + aws_role_arn=aws_role_arn, + gcp_service_account=gcp_service_account, + ) + ), + ) + + topic = publisher.create_topic(request=request) + + print(f"Created topic: {topic.name} with AWS MSK Ingestion Settings") + # [END pubsub_create_topic_with_aws_msk_ingestion] + + +def create_topic_with_azure_event_hubs_ingestion( + project_id: str, + topic_id: str, + resource_group: str, + namespace: str, + event_hub: str, + client_id: str, + tenant_id: str, + subscription_id: str, + gcp_service_account: str, +) -> None: + """Create a new Pub/Sub topic with Azure Event Hubs Ingestion Settings.""" + # [START pubsub_create_topic_with_azure_event_hubs_ingestion] + from google.cloud import pubsub_v1 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # resource_group = "your-resource-group" + # namespace = "your-namespace" + # event_hub = "your-event-hub" + # client_id = "your-client-id" + # tenant_id = "your-tenant-id" + # subscription_id = "your-subscription-id" + # gcp_service_account = "your-gcp-service-account" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + request = Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + azure_event_hubs=IngestionDataSourceSettings.AzureEventHubs( + resource_group=resource_group, + namespace=namespace, + event_hub=event_hub, + client_id=client_id, + tenant_id=tenant_id, + subscription_id=subscription_id, + gcp_service_account=gcp_service_account, + ) + ), + ) + + topic = publisher.create_topic(request=request) + + print(f"Created topic: {topic.name} with Azure Event Hubs Ingestion Settings") + # [END pubsub_create_topic_with_azure_event_hubs_ingestion] + + +def create_topic_with_confluent_cloud_ingestion( + project_id: str, + topic_id: str, + bootstrap_server: str, + cluster_id: str, + confluent_topic: str, + identity_pool_id: str, + gcp_service_account: str, +) -> None: + """Create a new Pub/Sub topic with Confluent Cloud Ingestion Settings.""" + # [START pubsub_create_topic_with_confluent_cloud_ingestion] + from google.cloud import pubsub_v1 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # bootstrap_server = "your-bootstrap-server" + # cluster_id = "your-cluster-id" + # confluent_topic = "your-confluent-topic" + # identity_pool_id = "your-identity-pool-id" + # gcp_service_account = "your-gcp-service-account" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + request = Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + confluent_cloud=IngestionDataSourceSettings.ConfluentCloud( + bootstrap_server=bootstrap_server, + cluster_id=cluster_id, + topic=confluent_topic, + identity_pool_id=identity_pool_id, + gcp_service_account=gcp_service_account, + ) + ), + ) + + topic = publisher.create_topic(request=request) + + print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings") + # [END pubsub_create_topic_with_confluent_cloud_ingestion] + + def update_topic_type( project_id: str, topic_id: str, @@ -710,6 +851,43 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: "minimum_object_create_time" ) + create_topic_with_aws_msk_ingestion_parser = subparsers.add_parser( + "create_aws_msk_ingestion", help=create_topic_with_aws_msk_ingestion.__doc__ + ) + create_topic_with_aws_msk_ingestion_parser.add_argument("topic_id") + create_topic_with_aws_msk_ingestion_parser.add_argument("cluster_arn") + create_topic_with_aws_msk_ingestion_parser.add_argument("msk_topic") + create_topic_with_aws_msk_ingestion_parser.add_argument("aws_role_arn") + create_topic_with_aws_msk_ingestion_parser.add_argument("gcp_service_account") + + create_topic_with_azure_event_hubs_ingestion_parser = subparsers.add_parser( + "create_azure_event_hubs_ingestion", + help=create_topic_with_azure_event_hubs_ingestion.__doc__, + ) + create_topic_with_azure_event_hubs_ingestion_parser.add_argument("topic_id") + create_topic_with_azure_event_hubs_ingestion_parser.add_argument("resource_group") + create_topic_with_azure_event_hubs_ingestion_parser.add_argument("namespace") + create_topic_with_azure_event_hubs_ingestion_parser.add_argument("event_hub") + create_topic_with_azure_event_hubs_ingestion_parser.add_argument("client_id") + create_topic_with_azure_event_hubs_ingestion_parser.add_argument("tenant_id") + create_topic_with_azure_event_hubs_ingestion_parser.add_argument("subscription_id") + create_topic_with_azure_event_hubs_ingestion_parser.add_argument( + "gcp_service_account" + ) + + create_topic_with_confluent_cloud_ingestion_parser = subparsers.add_parser( + "create_confluent_cloud_ingestion", + help=create_topic_with_confluent_cloud_ingestion.__doc__, + ) + create_topic_with_confluent_cloud_ingestion_parser.add_argument("topic_id") + create_topic_with_confluent_cloud_ingestion_parser.add_argument("bootstrap_server") + create_topic_with_confluent_cloud_ingestion_parser.add_argument("cluster_id") + create_topic_with_confluent_cloud_ingestion_parser.add_argument("confluent_topic") + create_topic_with_confluent_cloud_ingestion_parser.add_argument("identity_pool_id") + create_topic_with_confluent_cloud_ingestion_parser.add_argument( + "gcp_service_account" + ) + update_topic_type_parser = subparsers.add_parser( "update_kinesis_ingestion", help=update_topic_type.__doc__ ) @@ -798,6 +976,37 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: args.match_glob, args.minimum_object_create_time, ) + elif args.command == "create_aws_msk_ingestion": + create_topic_with_aws_msk_ingestion( + args.project_id, + args.topic_id, + args.cluster_arn, + args.msk_topic, + args.aws_role_arn, + args.gcp_service_account, + ) + elif args.command == "create_azure_event_hubs_ingestion": + create_topic_with_azure_event_hubs_ingestion( + args.project_id, + args.topic_id, + args.resource_group, + args.namespace, + args.event_hub, + args.client_id, + args.tenant_id, + args.subscription_id, + args.gcp_service_account, + ) + elif args.command == "create_confluent_cloud_ingestion": + create_topic_with_confluent_cloud_ingestion( + args.project_id, + args.topic_id, + args.bootstrap_server, + args.cluster_id, + args.confluent_topic, + args.identity_pool_id, + args.gcp_service_account, + ) elif args.command == "update_kinesis_ingestion": update_topic_type( args.project_id, diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index 6f17305cb..dc7b94027 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -196,6 +196,123 @@ def test_create_topic_with_cloud_storage_ingestion( publisher_client.delete_topic(request={"topic": topic_path}) +def test_create_topic_with_aws_msk_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + # Outside of automated CI tests, these values must be of actual AWS resources for the test to pass. + cluster_arn = ( + "arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1" + ) + msk_topic = "fake-msk-topic-name" + aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name" + gcp_service_account = ( + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + ) + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic_with_aws_msk_ingestion( + PROJECT_ID, + TOPIC_ID, + cluster_arn, + msk_topic, + aws_role_arn, + gcp_service_account, + ) + + out, _ = capsys.readouterr() + assert f"Created topic: {topic_path} with AWS MSK Ingestion Settings" in out + + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + + +def test_create_topic_with_azure_event_hubs_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + # Outside of automated CI tests, these values must be of actual Azure resources for the test to pass. + resource_group = "fake-resource-group" + namespace = "fake-namespace" + event_hub = "fake-event-hub" + client_id = "fake-client-id" + tenant_id = "fake-tenant-id" + subcription_id = "fake-subscription-id" + gcp_service_account = ( + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + ) + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic_with_azure_event_hubs_ingestion( + PROJECT_ID, + TOPIC_ID, + resource_group, + namespace, + event_hub, + client_id, + tenant_id, + subcription_id, + gcp_service_account, + ) + + out, _ = capsys.readouterr() + assert ( + f"Created topic: {topic_path} with Azure Event Hubs Ingestion Settings" in out + ) + + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + + +def test_create_topic_with_confluent_cloud_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + # Outside of automated CI tests, these values must be of actual Confluent resources for the test to pass. + bootstrap_server = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092" + cluster_id = "fake-cluster-id" + confluent_topic = "fake-confluent-topic-name" + identity_pool_id = "fake-identity-pool-id" + gcp_service_account = ( + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + ) + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic_with_confluent_cloud_ingestion( + PROJECT_ID, + TOPIC_ID, + bootstrap_server, + cluster_id, + confluent_topic, + identity_pool_id, + gcp_service_account, + ) + + out, _ = capsys.readouterr() + assert f"Created topic: {topic_path} with Confluent Cloud Ingestion Settings" in out + + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + + def test_update_topic_type( publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] ) -> None: diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index b348927be..b6ae767c6 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,4 +1,4 @@ -google-cloud-pubsub==2.27.1 +google-cloud-pubsub==2.28.0 avro==1.12.0 protobuf===4.24.4; python_version == '3.7' protobuf==5.29.2; python_version >= '3.8'