Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri committed Sep 2, 2024
1 parent 2a595be commit 0166acb
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ private Collection<AlterConfigOp> buildAlterConfigOps(ReconcilableTopic reconcil
new ConfigEntry(key, null),
AlterConfigOp.OpType.DELETE));
}

return alterConfigOps;
}

Expand Down Expand Up @@ -894,31 +893,4 @@ private void updateStatusForException(ReconcilableTopic reconcilableTopic, Excep
kubernetesHandler.updateStatus(reconcilableTopic);
metricsHolder.failedReconciliationsCounter(config.namespace()).increment();
}

private void updateStatusForSuccess(ReconcilableTopic reconcilableTopic, Results results) {
List<Condition> conditions = new ArrayList<>();
var conditionType = "Ready";
if (!TopicOperatorUtil.isManaged(reconcilableTopic.kt())) {
conditionType = "Unmanaged";
} else if (TopicOperatorUtil.isPaused(reconcilableTopic.kt())) {
conditionType = "ReconciliationPaused";
}

conditions.add(new ConditionBuilder()
.withType(conditionType)
.withStatus("True")
.withLastTransitionTime(StatusUtils.iso8601Now())
.build());

conditions.addAll(results.getConditions(reconcilableTopic));

reconcilableTopic.kt().setStatus(
new KafkaTopicStatusBuilder(reconcilableTopic.kt().getStatus())
.withConditions(conditions)
.withReplicasChange(results.getReplicasChange(reconcilableTopic))
.build());

kubernetesHandler.updateStatus(reconcilableTopic);
metricsHolder.successfulReconciliationsCounter(config.namespace()).increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.metrics.MetricsHolder;
import io.strimzi.operator.topic.model.TopicEvent.TopicDelete;
import io.strimzi.operator.topic.model.TopicEvent.TopicUpsert;

import java.util.Objects;

import static io.strimzi.operator.common.Annotations.isReconciliationPausedWithAnnotation;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.concurrent.TimeUnit;

/**
* Handler for {@link KafkaTopic} events.
Expand All @@ -38,7 +37,7 @@ public TopicEventHandler(TopicOperatorConfig config, BatchingLoop queue, Metrics
public void onAdd(KafkaTopic obj) {
LOGGER.debugOp("Informed about add event for topic {}", TopicOperatorUtil.topicName(obj));
metrics.resourceCounter(config.namespace()).incrementAndGet();
if (isReconciliationPausedWithAnnotation(obj)) {
if (Annotations.isReconciliationPausedWithAnnotation(obj)) {
metrics.pausedResourceCounter(config.namespace()).incrementAndGet();
}
queue.offer(new TopicUpsert(System.nanoTime(), obj.getMetadata().getNamespace(),
Expand All @@ -49,16 +48,16 @@ public void onAdd(KafkaTopic obj) {
@Override
public void onUpdate(KafkaTopic oldObj, KafkaTopic newObj) {
String trigger = Objects.equals(oldObj, newObj) ? "resync" : "update";
if (trigger.equals("resync") && (NANOSECONDS.toMillis(System.nanoTime()) - lastPeriodicTimestampMs) > config.fullReconciliationIntervalMs()) {
if (trigger.equals("resync") && (TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - lastPeriodicTimestampMs) > config.fullReconciliationIntervalMs()) {
LOGGER.infoOp("Triggering periodic reconciliation of {} resources for namespace {}", KafkaTopic.RESOURCE_KIND, config.namespace());
this.lastPeriodicTimestampMs = NANOSECONDS.toMillis(System.nanoTime());
this.lastPeriodicTimestampMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
if (trigger.equals("update")) {
LOGGER.debugOp("Informed about update event for topic {}", TopicOperatorUtil.topicName(newObj));
}
if (isReconciliationPausedWithAnnotation(oldObj) && !isReconciliationPausedWithAnnotation(newObj)) {
if (Annotations.isReconciliationPausedWithAnnotation(oldObj) && !Annotations.isReconciliationPausedWithAnnotation(newObj)) {
metrics.pausedResourceCounter(config.namespace()).decrementAndGet();
} else if (!isReconciliationPausedWithAnnotation(oldObj) && isReconciliationPausedWithAnnotation(newObj)) {
} else if (!Annotations.isReconciliationPausedWithAnnotation(oldObj) && Annotations.isReconciliationPausedWithAnnotation(newObj)) {
metrics.pausedResourceCounter(config.namespace()).incrementAndGet();
}
queue.offer(new TopicUpsert(System.nanoTime(), newObj.getMetadata().getNamespace(),
Expand All @@ -70,7 +69,7 @@ public void onUpdate(KafkaTopic oldObj, KafkaTopic newObj) {
public void onDelete(KafkaTopic obj, boolean deletedFinalStateUnknown) {
LOGGER.debugOp("Informed about delete event for topic {}", TopicOperatorUtil.topicName(obj));
metrics.resourceCounter(config.namespace()).decrementAndGet();
if (isReconciliationPausedWithAnnotation(obj)) {
if (Annotations.isReconciliationPausedWithAnnotation(obj)) {
metrics.pausedResourceCounter(config.namespace()).decrementAndGet();
}
if (config.useFinalizer()) {
Expand Down
Loading

0 comments on commit 0166acb

Please sign in to comment.