Create operators for working with Topics for GCP Apache Kafka #46865

Merged 1 commit on Feb 20, 2025
48 changes: 48 additions & 0 deletions providers/google/docs/operators/cloud/managed_kafka.rst
@@ -69,6 +69,54 @@ To update a cluster you can use
:start-after: [START how_to_cloud_managed_kafka_update_cluster_operator]
:end-before: [END how_to_cloud_managed_kafka_update_cluster_operator]

Interacting with Apache Kafka Topics
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To create an Apache Kafka topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaCreateTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_create_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_create_topic_operator]

To delete a topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaDeleteTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_delete_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_delete_topic_operator]

To get a topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaGetTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_get_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_get_topic_operator]

To get a list of topics you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaListTopicsOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_list_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_list_topic_operator]

To update a topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaUpdateTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_update_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_update_topic_operator]

Reference
^^^^^^^^^

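The five operators documented above cover the full topic lifecycle: create, get, list, update, delete. A minimal stand-in (not the Airflow operators themselves) that models that lifecycle against an in-memory "cluster", including the FieldMask-style update where only the masked fields are overwritten:

```python
# Stand-in for the topic lifecycle the five operators above cover.
# Names and semantics here are illustrative, not the provider's API.
class FakeCluster:
    def __init__(self):
        self.topics: dict[str, dict] = {}

    def create_topic(self, topic_id: str, config: dict) -> dict:
        self.topics[topic_id] = dict(config)
        return self.topics[topic_id]

    def get_topic(self, topic_id: str) -> dict:
        return self.topics[topic_id]

    def list_topics(self) -> list[str]:
        return sorted(self.topics)

    def update_topic(self, topic_id: str, config: dict, update_mask: list[str]) -> dict:
        # Only fields named in the mask are overwritten, mirroring FieldMask semantics.
        for field in update_mask:
            self.topics[topic_id][field] = config[field]
        return self.topics[topic_id]

    def delete_topic(self, topic_id: str) -> None:
        del self.topics[topic_id]


cluster = FakeCluster()
cluster.create_topic("orders", {"partition_count": 3, "replication_factor": 2})
cluster.update_topic(
    "orders",
    {"partition_count": 6, "replication_factor": 5},
    update_mask=["partition_count"],
)
```

Note that the masked update leaves `replication_factor` at its original value even though the incoming config carries a new one; that is the behavior to expect from the real `update_mask` parameter as well.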
1 change: 1 addition & 0 deletions providers/google/provider.yaml
@@ -1229,6 +1229,7 @@ extra-links:
- airflow.providers.google.cloud.links.translate.TranslationGlossariesListLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaClusterLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaClusterListLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaTopicLink


secrets-backends:
@@ -27,12 +27,12 @@
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.managedkafka_v1 import Cluster, ManagedKafkaClient, types
from google.cloud.managedkafka_v1 import Cluster, ManagedKafkaClient, Topic, types

if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.cloud.managedkafka_v1.services.managed_kafka.pagers import ListClustersPager
from google.cloud.managedkafka_v1.services.managed_kafka.pagers import ListClustersPager, ListTopicsPager
from google.protobuf.field_mask_pb2 import FieldMask


@@ -286,3 +286,197 @@ def delete_cluster(
metadata=metadata,
)
return operation

@GoogleBaseHook.fallback_to_default_project_id
def create_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
topic: types.Topic | dict,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> types.Topic:
"""
Create a new topic in a given project and location.

:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster in which to create the topic.
:param topic_id: Required. The ID to use for the topic, which will become the final component of the
topic's name.
:param topic: Required. Configuration of the topic to create.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
parent = client.cluster_path(project_id, location, cluster_id)

result = client.create_topic(
request={
"parent": parent,
"topic_id": topic_id,
"topic": topic,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result

@GoogleBaseHook.fallback_to_default_project_id
def list_topics(
self,
project_id: str,
location: str,
cluster_id: str,
page_size: int | None = None,
page_token: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> ListTopicsPager:
"""
List the topics in a given cluster.

:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topics are to be listed.
:param page_size: Optional. The maximum number of topics to return. The service may return fewer than
this value. If unset or zero, all topics for the parent are returned.
:param page_token: Optional. A page token, received from a previous ``ListTopics`` call. Provide this
to retrieve the subsequent page. When paginating, all other parameters provided to ``ListTopics``
must match the call that provided the page token.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
parent = client.cluster_path(project_id, location, cluster_id)

result = client.list_topics(
request={
"parent": parent,
"page_size": page_size,
"page_token": page_token,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result
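# ``list_topics`` returns a ``ListTopicsPager`` rather than a plain list; callers iterate
# it and the pager fetches subsequent pages transparently. A plain-Python stand-in for
# that paging behavior (not the real pager class):
def paginate(items: list, page_size: int):
    # Yields fixed-size pages, the way the service splits results by page_size.
    for i in range(0, len(items), page_size):
        yield items[i : i + page_size]

pages = list(paginate(["topic-a", "topic-b", "topic-c"], page_size=2))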

@GoogleBaseHook.fallback_to_default_project_id
def get_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> types.Topic:
"""
Return the properties of a single topic.

:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topic is to be returned.
:param topic_id: Required. The ID of the topic whose configuration to return.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
name = client.topic_path(project_id, location, cluster_id, topic_id)

result = client.get_topic(
request={
"name": name,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result

@GoogleBaseHook.fallback_to_default_project_id
def update_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
topic: types.Topic | dict,
update_mask: FieldMask | dict,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> types.Topic:
"""
Update the properties of a single topic.

:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topic is to be updated.
:param topic_id: Required. The ID of the topic whose configuration to update.
:param topic: Required. The topic to update. Its ``name`` field must be populated.
:param update_mask: Required. Field mask is used to specify the fields to be overwritten in the Topic
resource by the update. The fields specified in the update_mask are relative to the resource, not
the full request. A field will be overwritten if it is in the mask.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
_topic = deepcopy(topic) if isinstance(topic, dict) else Topic.to_dict(topic)
_topic["name"] = client.topic_path(project_id, location, cluster_id, topic_id)

result = client.update_topic(
request={
"update_mask": update_mask,
"topic": _topic,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result
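# ``update_topic`` above copies the incoming topic and injects the fully qualified
# resource name before sending the request, so the caller's object is never mutated.
# A plain-dict sketch of that step (``with_topic_name`` is a hypothetical helper name):
from copy import deepcopy

def with_topic_name(topic: dict, name: str) -> dict:
    # Copy first so the caller's dict is untouched, then set the required ``name`` field.
    _topic = deepcopy(topic)
    _topic["name"] = name
    return _topic

original = {"partition_count": 3}
prepared = with_topic_name(original, "projects/p/locations/l/clusters/c/topics/t")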

@GoogleBaseHook.fallback_to_default_project_id
def delete_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> None:
"""
Delete a single topic.

:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topic is to be deleted.
:param topic_id: Required. The ID of the topic to delete.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
name = client.topic_path(project_id, location, cluster_id, topic_id)

client.delete_topic(
request={
"name": name,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
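Every hook method above resolves its target through `client.cluster_path` or `client.topic_path`. A sketch of the resource names those helpers produce, assuming the standard Google Cloud naming scheme (the exact strings come from the generated client, so treat this as illustrative):

```python
# Assumed resource-name formats, following the usual Google Cloud convention.
def cluster_path(project_id: str, location: str, cluster_id: str) -> str:
    return f"projects/{project_id}/locations/{location}/clusters/{cluster_id}"

def topic_path(project_id: str, location: str, cluster_id: str, topic_id: str) -> str:
    return f"{cluster_path(project_id, location, cluster_id)}/topics/{topic_id}"

parent = cluster_path("my-project", "us-central1", "my-cluster")
name = topic_path("my-project", "us-central1", "my-cluster", "my-topic")
```

`create_topic` and `list_topics` address the cluster (`parent`), while `get_topic`, `update_topic`, and `delete_topic` address the topic itself (`name`).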
@@ -28,6 +28,9 @@
MANAGED_KAFKA_BASE_LINK + "/{location}/clusters/{cluster_id}?project={project_id}"
)
MANAGED_KAFKA_CLUSTER_LIST_LINK = MANAGED_KAFKA_BASE_LINK + "/clusters?project={project_id}"
MANAGED_KAFKA_TOPIC_LINK = (
MANAGED_KAFKA_BASE_LINK + "/{location}/clusters/{cluster_id}/topics/{topic_id}?project={project_id}"
)


class ApacheKafkaClusterLink(BaseGoogleLink):
@@ -73,3 +76,29 @@ def persist(
"project_id": task_instance.project_id,
},
)


class ApacheKafkaTopicLink(BaseGoogleLink):
"""Helper class for constructing Apache Kafka Topic link."""

name = "Apache Kafka Topic"
key = "topic_conf"
format_str = MANAGED_KAFKA_TOPIC_LINK

@staticmethod
def persist(
context: Context,
task_instance,
cluster_id: str,
topic_id: str,
):
task_instance.xcom_push(
context=context,
key=ApacheKafkaTopicLink.key,
value={
"location": task_instance.location,
"cluster_id": cluster_id,
"topic_id": topic_id,
"project_id": task_instance.project_id,
},
)
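The link classes resolve their `format_str` against the values `persist` pushed to XCom. A sketch of that expansion for `ApacheKafkaTopicLink` (the value of `MANAGED_KAFKA_BASE_LINK` is not shown in this diff, so the one below is an assumption for illustration):

```python
# MANAGED_KAFKA_BASE_LINK value is assumed; only the substitution logic is the point here.
MANAGED_KAFKA_BASE_LINK = "/managedkafka"
MANAGED_KAFKA_TOPIC_LINK = (
    MANAGED_KAFKA_BASE_LINK
    + "/{location}/clusters/{cluster_id}/topics/{topic_id}?project={project_id}"
)

# The keys match exactly what ApacheKafkaTopicLink.persist stores under "topic_conf".
url = MANAGED_KAFKA_TOPIC_LINK.format(
    location="us-central1",
    cluster_id="my-cluster",
    topic_id="my-topic",
    project_id="my-project",
)
```

Keeping the dict keys in `persist` aligned with the placeholders in `format_str` is what makes the button render; a missing key would raise `KeyError` at format time.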