Skip to content

Commit

Permalink
Add Topic Operator refactoring: handlers (#10412)
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri authored Sep 5, 2024
1 parent b422f3e commit f430670
Show file tree
Hide file tree
Showing 25 changed files with 3,210 additions and 1,875 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,55 +30,64 @@
* the reconciliation of those events using a {@link BatchingTopicController}.
* Any given {@link KafkaTopic} is only being reconciled by a single thread at any one time.
*/
class BatchingLoop {

public class BatchingLoop {
static final ReconciliationLogger LOGGER = ReconciliationLogger.create(BatchingLoop.class);

private final BatchingTopicController controller;
private final BlockingDeque<TopicEvent> queue;

private final int maxQueueSize;
private final int maxBatchSize;
private final long maxBatchLingerMs;
private final String namespace;

private final ItemStore<KafkaTopic> itemStore;
private final Runnable stopRunnable;
private final TopicOperatorMetricsHolder metricsHolder;

/**
* The set of topics currently being reconciled by a controller.
* Guarded by the monitor of the BatchingLoop.
* This functions as a mechanism for preventing concurrent reconciliation of the same topic.
*/
private final Set<KubeRef> inFlight = new HashSet<>(); // guarded by this
private final LoopRunnable[] threads;
private final int maxBatchSize;
private final long maxBatchLingerMs;
private final ItemStore<KafkaTopic> itemStore;
private final Runnable stop;
private final int maxQueueSize;
private final TopicOperatorMetricsHolder metrics;
private final String namespace;

public BatchingLoop(
int maxQueueSize,
BatchingTopicController controller,
int maxThreads,
int maxBatchSize,
long maxBatchLingerMs,
ItemStore<KafkaTopic> itemStore,
Runnable stop,
TopicOperatorMetricsHolder metrics,
String namespace) {
this.maxQueueSize = maxQueueSize;
this.queue = new LinkedBlockingDeque<>(maxQueueSize);
/**
* Create a new instance.
*
* @param config Topic Operator configuration.
* @param controller KafkaTopic controller.
* @param maxThreads Max number of LoopRunnable threads.
* @param itemStore Item store.
* @param stopRunnable Stop runnable.
* @param metricsHolder Metrics holder.
*/
public BatchingLoop(TopicOperatorConfig config,
BatchingTopicController controller,
int maxThreads,
ItemStore<KafkaTopic> itemStore,
Runnable stopRunnable,
TopicOperatorMetricsHolder metricsHolder) {
this.maxQueueSize = config.maxQueueSize();
this.maxBatchSize = config.maxBatchSize();
this.maxBatchLingerMs = config.maxBatchLingerMs();
this.namespace = config.namespace();

this.controller = controller;
this.itemStore = itemStore;
this.stopRunnable = stopRunnable;
this.metricsHolder = metricsHolder;

this.queue = new LinkedBlockingDeque<>(maxQueueSize);
this.threads = new LoopRunnable[maxThreads];
for (int i = 0; i < maxThreads; i++) {
threads[i] = new LoopRunnable("LoopRunnable-" + i);
}
this.maxBatchSize = maxBatchSize;
this.maxBatchLingerMs = maxBatchLingerMs;
this.itemStore = itemStore;
this.stop = stop;
this.metrics = metrics;
this.namespace = namespace;
}

/**
* Starts the threads
* Starts the threads.
*/
public void start() {
for (var thread : threads) {
Expand All @@ -87,7 +96,7 @@ public void start() {
}

/**
* Stops the threads
* Stops the threads.
* @throws InterruptedException If interrupted while waiting for the threads to stop.
*/
public void stop() throws InterruptedException {
Expand All @@ -101,24 +110,24 @@ public void stop() throws InterruptedException {

/**
* Add an event to be reconciled to the head of the {@link #queue}.
* @param event The event
* @param event The event.
*/
public void offer(TopicEvent event) {
if (queue.offerFirst(event)) {
LOGGER.debugOp("Item {} added to front of queue", event);
metrics.reconciliationsMaxQueueSize(namespace).getAndUpdate(size -> Math.max(size, queue.size()));
metricsHolder.reconciliationsMaxQueueSize(namespace).getAndUpdate(size -> Math.max(size, queue.size()));
} else {
LOGGER.errorOp("Queue length {} exceeded, stopping operator. Please increase {} environment variable.",
maxQueueSize,
TopicOperatorConfig.MAX_QUEUE_SIZE.key());
this.stop.run();
maxQueueSize,
TopicOperatorConfig.MAX_QUEUE_SIZE.key());
this.stopRunnable.run();
}
}

/**
* The loop is alive if none of the threads have been blocked for more than 2 minutes.
* "Blocked" means they have not returned to their outermost loop.
* @return True if the loop is alive..
* @return True if the loop is alive.
*/
boolean isAlive() {
for (var thread : threads) {
Expand Down Expand Up @@ -154,7 +163,6 @@ boolean isReady() {
* A thread that services the head of the {@link #queue}.
*/
class LoopRunnable extends Thread {

private volatile boolean stopRequested = false;

LoopRunnable(String name) {
Expand All @@ -163,21 +171,21 @@ class LoopRunnable extends Thread {
}

static final ReconciliationLogger LOGGER = ReconciliationLogger.create(LoopRunnable.class);
private volatile long lastLoop = System.nanoTime();
private volatile long lastLoopNs = System.nanoTime();

long msSinceLastLoop() {
return (System.nanoTime() - lastLoop) / 1_000_000;
return (System.nanoTime() - lastLoopNs) / 1_000_000;
}

@Override
public void run() {
LOGGER.debugOp("Entering run()");
Batch batch = new Batch(maxBatchSize);
int batchId = 0;
lastLoop = System.nanoTime();
var batch = new Batch(maxBatchSize);
var batchId = 0;
lastLoopNs = System.nanoTime();
while (!runOnce(batchId, batch)) {
batchId++;
lastLoop = System.nanoTime();
lastLoopNs = System.nanoTime();
}
LOGGER.debugOp("Exiting run()");
}
Expand Down Expand Up @@ -232,15 +240,15 @@ private ReconcilableTopic lookup(int batchId, TopicUpsert topicUpsert) {
var kt = itemStore.get(key);
if (kt != null) {
LOGGER.traceOp("[Batch #{}] Lookup from item store for {} yielded KafkaTopic with resourceVersion {}",
batchId, topicUpsert, BatchingTopicController.resourceVersion(kt));
batchId, topicUpsert, TopicOperatorUtil.resourceVersion(kt));
var r = new Reconciliation("upsert", "KafkaTopic", topicUpsert.namespace(), topicUpsert.name());
LOGGER.debugOp("[Batch #{}] Contains {}", batchId, r);
return new ReconcilableTopic(r, kt, TopicOperatorUtil.topicName(kt));
} else {
// Null can happen if the KafkaTopic has been deleted from Kube and we've not yet processed
// the corresponding delete event
LOGGER.traceOp("[Batch #{}] Lookup from item store for {} yielded nothing",
batchId, topicUpsert);
batchId, topicUpsert);
return null;
}
}
Expand All @@ -249,20 +257,20 @@ private void fillBatch(int batchId, Batch batch) throws InterruptedException {
LOGGER.traceOp("[Batch #{}] Filling", batchId);
List<TopicEvent> rejected = new ArrayList<>();

final long deadlineNanoTime = System.nanoTime() + maxBatchLingerMs * 1_000_000;
var deadlineNs = System.nanoTime() + maxBatchLingerMs * 1_000_000;
while (true) {
if (batch.size() >= maxBatchSize) {
LOGGER.traceOp("[Batch #{}] Reached maxBatchSize, batch complete", batchId, maxBatchSize);
break;
}

long timeoutNs = deadlineNanoTime - System.nanoTime();
var timeoutNs = deadlineNs - System.nanoTime();
if (timeoutNs <= 0) {
LOGGER.traceOp("[Batch #{}] {}ms linger expired", batchId, maxBatchLingerMs);
break;
}
LOGGER.traceOp("[Batch #{}] Taking next item from deque head with timeout {}ns", batchId, timeoutNs);
TopicEvent topicEvent = queue.pollFirst(timeoutNs, TimeUnit.NANOSECONDS);
var topicEvent = queue.pollFirst(timeoutNs, TimeUnit.NANOSECONDS);

if (topicEvent == null) {
LOGGER.traceOp("[Batch #{}] Linger expired, batch complete", batchId);
Expand All @@ -271,7 +279,7 @@ private void fillBatch(int batchId, Batch batch) throws InterruptedException {
addToBatch(batchId, batch, rejected, topicEvent);
}
LOGGER.traceOp("[Batch #{}] Filled with {} topics", batchId, batch.size());
metrics.reconciliationsMaxBatchSize(namespace).getAndUpdate(size -> Math.max(size, batch.size()));
metricsHolder.reconciliationsMaxBatchSize(namespace).getAndUpdate(size -> Math.max(size, batch.size()));

// here we need a deque and can push `rejected` back on the front of the queue
// where they can be taken by the next thread.
Expand All @@ -287,7 +295,7 @@ private void addToBatch(int batchId, Batch batch, List<TopicEvent> rejected, Top
// E.g. upsert then delete is equivalent to just a delete
// It's actually a bit tricky since you have to process the events in reverse order to correctly
// simplify them, so `Batch` would have to be something like a `Map<Ref, List<TopicEvent>>`.
KubeRef ref = topicEvent.toRef();
var ref = topicEvent.toRef();
if (inFlight.add(ref)) {
// wasn't already inflight
LOGGER.debugOp("[Batch #{}] Adding {}", batchId, topicEvent);
Expand All @@ -299,7 +307,7 @@ private void addToBatch(int batchId, Batch batch, List<TopicEvent> rejected, Top
} else {
LOGGER.debugOp("[Batch #{}] Rejecting item {}, already inflight", batchId, topicEvent);
rejected.add(topicEvent);
metrics.lockedReconciliationsCounter(namespace).increment();
metricsHolder.lockedReconciliationsCounter(namespace).increment();
}
}
}
Expand Down
Loading

0 comments on commit f430670

Please sign in to comment.