docs(connectors): adds procedures to manage connector offsets (#10661)
Signed-off-by: prmellor <[email protected]>
PaulRMellor authored Oct 14, 2024
1 parent 72f251c commit 2b8c900
Showing 10 changed files with 427 additions and 6 deletions.
6 changes: 6 additions & 0 deletions documentation/assemblies/configuring/assembly-config.adoc
@@ -135,6 +135,12 @@ include::../../modules/configuring/proc-manual-stop-pause-connector.adoc[levelof
include::../../modules/configuring/proc-manual-restart-connector.adoc[leveloffset=+2]
//Procedure to manually restart a Kafka connector task
include::../../modules/configuring/proc-manual-restart-connector-task.adoc[leveloffset=+2]
//procedure to list offsets
include::../../modules/configuring/proc-listing-connector-offsets.adoc[leveloffset=+2]
//procedure to alter offsets
include::../../modules/configuring/proc-altering-connector-offsets.adoc[leveloffset=+2]
//procedure to reset offsets
include::../../modules/configuring/proc-resetting-connector-offsets.adoc[leveloffset=+2]

//`KafkaMirrorMaker2` resource config
include::../../modules/configuring/con-config-mirrormaker2.adoc[leveloffset=+1]
@@ -242,4 +242,96 @@ Specifically, ensure that the following properties have the same value across al
* `topic.filter.class`

For example, the value for `replication.policy.class` must be the same for the source, checkpoint, and heartbeat connectors.
Mismatched or missing settings cause issues with data replication or offset syncing, so it's essential to keep all relevant connectors configured with the same settings.

== Listing the offsets of MirrorMaker 2 connectors

To list the offset positions of the internal MirrorMaker 2 connectors, use the same configuration that's used to manage Kafka Connect connectors.
For more information on setting up the configuration and listing offsets, see xref:proc-listing-connector-offsets-{context}[].

In this example, the `sourceConnector` configuration is updated to return the connector offset position.
The offset information is written to a specified config map.

.Example configuration for MirrorMaker 2 connector
[source,yaml,subs="+quotes,attributes"]
----
apiVersion: {KafkaMirrorMaker2ApiVersion}
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: {DefaultKafkaVersion}
  # ...
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      listOffsets:
        toConfigMap:
          name: my-connector-offsets
      # ...
----

To manage connector offsets, you must apply the following annotations to the `KafkaMirrorMaker2` resource:

* `strimzi.io/connector-offsets`
* `strimzi.io/mirrormaker-connector`

The `strimzi.io/mirrormaker-connector` annotation must be set to the name of the connector.
These annotations remain until the operation succeeds or they are manually removed from the resource.

MirrorMaker 2 connectors are named using the aliases of the source and target clusters, followed by the connector type: `<source_alias>&#45;&#62;<target_alias>.<connector_type>`.

In the following example, the annotations are applied for a connector named `my-cluster-source&#45;&#62;my-cluster-target.MirrorSourceConnector`.

.Example application of annotations for connector
[source,shell]
----
kubectl annotate kafkamirrormaker2 my-mirror-maker2 strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="my-cluster-source->my-cluster-target.MirrorSourceConnector" -n kafka
----

The offsets are listed in the specified config map.
Strimzi puts the offset information into a `.json` property named after the connector.
This does not overwrite any other properties when updating an existing config map.

.Example source connector offset list
[source,yaml,subs="+attributes"]
----
apiVersion: v1
kind: ConfigMap
metadata:
  # ...
  ownerReferences: # <1>
  - apiVersion: {KafkaMirrorMaker2ApiVersion}
    blockOwnerDeletion: false
    controller: false
    kind: KafkaMirrorMaker2
    name: my-mirror-maker2
    uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
data:
  my-cluster-source--my-cluster-target.MirrorSourceConnector.json: |- # <2>
    {
      "offsets": [
        {
          "partition": {
            "cluster": "east-kafka",
            "partition": 0,
            "topic": "mirrormaker2-cluster-configs"
          },
          "offset": {
            "offset": 0
          }
        }
      ]
    }
----
<1> The owner reference pointing to the `KafkaMirrorMaker2` resource.
To provide a custom owner reference, create the config map in advance and set the owner reference.
<2> The `.json` property uses the connector name. Since `&#45;&#62;` characters are not allowed in config map keys, `&#45;&#62;` is changed to `--` in the connector name.
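
The naming rule for the connector and the config map property can be sketched in shell. This is an illustration only: `mm2_connector_name` and `mm2_offsets_key` are hypothetical helper names, and the aliases are the ones used in the examples above.

```shell
#!/bin/sh
# Hypothetical helpers illustrating the naming rule described above.

# Build the connector name: <source_alias>-><target_alias>.<connector_type>
mm2_connector_name() {
  # $1 = source alias, $2 = target alias, $3 = connector type
  printf '%s->%s.%s\n' "$1" "$2" "$3"
}

# Derive the config map property: "->" becomes "--" and ".json" is appended.
mm2_offsets_key() {
  mm2_connector_name "$1" "$2" "$3" | sed 's/->/--/; s/$/.json/'
}

name=$(mm2_connector_name my-cluster-source my-cluster-target MirrorSourceConnector)
key=$(mm2_offsets_key my-cluster-source my-cluster-target MirrorSourceConnector)

echo "$name"   # value for the strimzi.io/mirrormaker-connector annotation
echo "$key"    # property to look for in the config map
```

The first value is what the `strimzi.io/mirrormaker-connector` annotation expects. The second is the property under `data` that holds the offsets.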

NOTE: It is possible to use configuration to xref:proc-altering-connector-offsets-{context}[alter] or xref:proc-resetting-connector-offsets-{context}[reset] connector offsets, though this is rarely necessary.
@@ -0,0 +1,98 @@
// Module included in the following assemblies:
//
// assembly-config.adoc

[id='proc-altering-connector-offsets-{context}']
= Altering connector offsets

[role="_abstract"]
To alter connector offsets using `KafkaConnector` resources, configure the resource to stop the connector and add `alterOffsets` configuration to specify the offset changes in a config map.
You can reuse the same config map used to xref:proc-listing-connector-offsets-{context}[list offsets].

After the connector is stopped and the configuration is in place, annotate the `KafkaConnector` resource to apply the offset alteration, then restart the connector.

Altering connector offsets can be useful, for example, to skip a _poison_ record or replay a record.

In this procedure, we alter the offset position for a source connector named `my-source-connector`.

.Prerequisites

* The Cluster Operator is running.

.Procedure

. Edit the `KafkaConnector` resource to stop the connector and include the `alterOffsets` configuration.
+
.Example configuration to stop a connector and alter offsets
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectorApiVersion}
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  state: stopped # <1>
  alterOffsets:
    fromConfigMap: # <2>
      name: my-connector-offsets # <3>
  # ...
----
<1> Changes the state of the connector to `stopped`. The default state for the connector when this property is not set is `running`.
<2> The reference to the config map that provides the update.
<3> The name of the config map, which is named `my-connector-offsets` in this example.

. Edit the config map to make the alteration.
+
In this example, we're changing the offset position for a source connector to 15000.
+
.Example source connector offset list configuration
[source,yaml,subs="+attributes"]
----
apiVersion: v1
kind: ConfigMap
metadata:
  # ...
data:
  offsets.json: |- # <1>
    {
      "offsets" : [ {
        "partition" : {
          "filename" : "/data/myfile.txt"
        },
        "offset" : {
          "position" : 15000 # <2>
        }
      } ]
    }
----
<1> Edits must be made within the `offsets.json` property.
<2> The updated offset position in the source partition.

. Run the command to update the offset position by annotating the `KafkaConnector` resource:
+
[source,shell]
----
kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n <namespace>
----
+
The annotation remains until either the update operation succeeds or it is manually removed from the resource.

. Check the changes by using the procedure to xref:proc-listing-connector-offsets-{context}[list connector offsets].

. Restart the connector by changing the state to `running`.
+
.Example configuration to start a connector
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectorApiVersion}
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  state: running
  # ...
----
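
The sequence in this procedure can also be scripted. The following is a minimal sketch, not a supported tool: `set_connector_state`, `wait_for_offsets_operation`, and `alter_connector_offsets` are hypothetical helper names, and the polling interval and retry count are arbitrary. It assumes that the `alterOffsets` configuration is already in place and that the config map already contains the edited `offsets.json`. It relies on the annotation being removed when the operation succeeds, and it skips the verification step.

```shell
#!/bin/sh
# Hypothetical helpers; names, retry count, and interval are assumptions.

set_connector_state() {
  # $1 = connector, $2 = namespace, $3 = stopped | running
  kubectl patch kafkaconnector "$1" -n "$2" --type merge \
    -p "{\"spec\":{\"state\":\"$3\"}}"
}

wait_for_offsets_operation() {
  # $1 = connector, $2 = namespace
  # Polls until the strimzi.io/connector-offsets annotation is gone.
  i=0
  while [ "$i" -lt 30 ]; do
    pending=$(kubectl get kafkaconnector "$1" -n "$2" \
      -o jsonpath='{.metadata.annotations.strimzi\.io/connector-offsets}') || return 1
    [ -z "$pending" ] && return 0
    i=$((i + 1))
    sleep 2
  done
  return 1
}

alter_connector_offsets() {
  connector=$1
  namespace=$2

  # Stop the connector before altering its offsets.
  set_connector_state "$connector" "$namespace" stopped || return 1

  # Request the alter operation.
  kubectl annotate kafkaconnector "$connector" \
    strimzi.io/connector-offsets=alter -n "$namespace" || return 1

  # Only restart once the annotation has been removed.
  wait_for_offsets_operation "$connector" "$namespace" || return 1
  set_connector_state "$connector" "$namespace" running
}
```

For example, `alter_connector_offsets my-source-connector <namespace>` runs the whole sequence. A production script might also confirm that the connector has actually reached the `stopped` state before annotating, and list the offsets before restarting.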
145 changes: 145 additions & 0 deletions documentation/modules/configuring/proc-listing-connector-offsets.adoc
@@ -0,0 +1,145 @@
// Module included in the following assemblies:
//
// assembly-config.adoc

[id='proc-listing-connector-offsets-{context}']
= Listing connector offsets

[role="_abstract"]
To track connector offsets using `KafkaConnector` resources, add the `listOffsets` configuration.
The offsets, which keep track of the flow of data, are written to a config map specified in the configuration.
If the config map does not exist, Strimzi creates it.

After the configuration is in place, annotate the `KafkaConnector` resource to write the list to the config map.

Sink connectors use Kafka's standard consumer offset mechanism, while source connectors store offsets in a custom format within a Kafka topic.

* For sink connectors, the list shows Kafka topic partitions and the last committed offset for each partition.
* For source connectors, the list shows the source system’s partition and the last offset processed.

.Prerequisites

* The Cluster Operator is running.

.Procedure

. Edit the `KafkaConnector` resource for the connector to include the `listOffsets` configuration.
+
.Example configuration to list offsets
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectorApiVersion}
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  listOffsets:
    toConfigMap: # <1>
      name: my-connector-offsets # <2>
  # ...
----
<1> The reference to the config map to which the list of offsets is written.
<2> The name of the config map, which is named `my-connector-offsets` in this example.

. Run the command to write the list to the config map by annotating the `KafkaConnector` resource:
+
[source,shell]
----
kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n <namespace>
----
+
The annotation remains until either the list operation succeeds or it is manually removed from the resource.

. After the `KafkaConnector` resource is updated, use the following command to check if the config map with the offsets was created:
+
[source,shell]
----
kubectl get configmap my-connector-offsets -n <namespace>
----

. Inspect the contents of the config map to verify the offsets are being listed:
+
[source,shell]
----
kubectl describe configmap my-connector-offsets -n <namespace>
----
+
Strimzi puts the offset information into the `offsets.json` property.
This does not overwrite any other properties when updating an existing config map.
+
--
.Example source connector offset list
[source,yaml,subs="+attributes"]
----
apiVersion: v1
kind: ConfigMap
metadata:
  # ...
  ownerReferences: # <1>
  - apiVersion: {KafkaConnectorApiVersion}
    blockOwnerDeletion: false
    controller: false
    kind: KafkaConnector
    name: my-source-connector
    uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
  resourceVersion: "66951"
  uid: 641d60a9-36eb-4f29-9895-8f2c1eb9638e
data:
  offsets.json: |-
    {
      "offsets" : [ {
        "partition" : {
          "filename" : "/data/myfile.txt" # <2>
        },
        "offset" : {
          "position" : 15295 # <3>
        }
      } ]
    }
----
<1> The owner reference pointing to the `KafkaConnector` resource for the source connector.
To provide a custom owner reference, create the config map in advance and set the owner reference.
<2> The source partition, represented by the filename `/data/myfile.txt` in this example for a file-based connector.
<3> The last processed offset position in the source partition.
--
+
--
.Example sink connector offset list
[source,yaml,subs="+attributes"]
----
apiVersion: v1
kind: ConfigMap
metadata:
  # ...
  ownerReferences: # <1>
  - apiVersion: {KafkaConnectorApiVersion}
    blockOwnerDeletion: false
    controller: false
    kind: KafkaConnector
    name: my-sink-connector
    uid: 84a29d7f-77e6-43ac-bfbb-719f9b9a4b3b
  resourceVersion: "79241"
  uid: 721e30bc-23df-41a2-9b48-fb2b7d9b042c
data:
  offsets.json: |-
    {
      "offsets": [
        {
          "partition": {
            "kafka_topic": "my-topic", # <2>
            "kafka_partition": 2 # <3>
          },
          "offset": {
            "kafka_offset": 4 # <4>
          }
        }
      ]
    }
----
<1> The owner reference pointing to the `KafkaConnector` resource for the sink connector.
<2> The Kafka topic that the sink connector is consuming from.
<3> The partition of the Kafka topic.
<4> The last committed Kafka offset for this topic and partition.
--
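
If only the offset JSON is of interest, it can be extracted from the config map rather than reading the full `kubectl describe` output. The following is a minimal sketch using the JSONPath output of `kubectl`. The helper name `print_offsets_json` is hypothetical, and the property defaults to `offsets.json` as described in this procedure.

```shell
#!/bin/sh
# Hypothetical helper: prints one property of the offsets config map.
print_offsets_json() {
  # $1 = config map name, $2 = namespace, $3 = property (default: offsets.json)
  key=${3:-offsets.json}

  # Escape dots so that JSONPath treats the property name as a single key.
  escaped=$(printf '%s' "$key" | sed 's/\./\\./g')

  kubectl get configmap "$1" -n "$2" -o "jsonpath={.data.$escaped}"
}
```

For example, `print_offsets_json my-connector-offsets <namespace>` prints the JSON stored under `offsets.json`, which can then be saved to a file or passed to a JSON tool.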
@@ -1,6 +1,6 @@
// Module included in the following assemblies:
//
// assembly-deploy-kafka-connect-with-plugins.adoc
// assembly-config.adoc

[id='proc-manual-restart-connector-task-{context}']
= Manually restarting Kafka Connect connector tasks
@@ -1,6 +1,6 @@
// Module included in the following assemblies:
//
// assembly-deploy-kafka-connect-with-plugins.adoc
// assembly-config.adoc

[id='proc-manual-restart-connector-{context}']
= Manually restarting Kafka Connect connectors
@@ -1,6 +1,6 @@
// Module included in the following assemblies:
//
// assembly-deploy-kafka-connect-with-plugins.adoc
// assembly-config.adoc

[id='proc-manual-stop-pause-connector-{context}']
= Manually stopping or pausing Kafka Connect connectors