Skip to content

Commit

Permalink
Improve logging of altering or resetting connecotr offsets (strimzi#1…
Browse files Browse the repository at this point in the history
…0840)

Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored and ocorriga committed Dec 6, 2024
1 parent 0a585a0 commit 63f2690
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -847,11 +847,17 @@ private Future<List<Condition>> alterConnectorOffsets(Reconciliation reconciliat
return Future.succeededFuture(conditions);
}

LOGGER.infoCr(reconciliation, "Altering offsets of connector {}", connectorName);

String configMapNamespace = resource.getMetadata().getNamespace();
String configMapName = alterOffsetsConfig.get().getFromConfigMap().getName();
return verifyConnectorStopped(reconciliation, host, apiClient, connectorName)
.compose(v -> getOffsetsForAlterRequest(configMapNamespace, configMapName, getConnectorOffsetsConfigMapEntryKey(connectorName)))
.compose(offsets -> apiClient.alterConnectorOffsets(reconciliation, host, port, connectorName, offsets))
.compose(v -> {
LOGGER.infoCr(reconciliation, "Offsets of connector {} were altered", connectorName);
return Future.succeededFuture();
})
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
.map(v -> conditions)
.otherwise(throwable -> {
Expand Down Expand Up @@ -908,8 +914,14 @@ private Future<String> getOffsetsForAlterRequest(String configMapNamespace, Stri
*/
@SuppressWarnings({ "rawtypes" })
private Future<List<Condition>> resetConnectorOffsets(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, CustomResource resource, List<Condition> conditions) {
LOGGER.infoCr(reconciliation, "Resetting offsets of connector {}", connectorName);

return verifyConnectorStopped(reconciliation, host, apiClient, connectorName)
.compose(v -> apiClient.resetConnectorOffsets(reconciliation, host, port, connectorName))
.compose(v -> {
LOGGER.infoCr(reconciliation, "Offsets of connector {} were reset", connectorName);
return Future.succeededFuture();
})
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
.map(v -> conditions)
.otherwise(throwable -> {
Expand Down

0 comments on commit 63f2690

Please sign in to comment.