split left and right join classes
raminqaf committed Mar 31, 2024
1 parent 522dd26 commit dc60885
Showing 16 changed files with 660 additions and 456 deletions.
streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinSide.java (new file)
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

public enum JoinSide {
LEFT,
RIGHT
}
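The consumers of this enum appear further down in the commit: the TimestampedKeyAndJoinSide.makeLeftSide(...)/makeRightSide(...) factories tag buffered outer-join candidates with the input side they came from. As a hypothetical illustration (not part of the commit) of why a side tag is useful:

// hypothetical sketch, not in the commit: a side tag lets shared join code
// branch symmetrically instead of duplicating left/right logic
public enum SideDemo {
    LEFT, RIGHT;

    // the store a processor probes for join partners belongs to the other side
    public SideDemo other() {
        return this == LEFT ? RIGHT : LEFT;
    }
}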
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -17,6 +17,12 @@

 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -35,14 +41,6 @@
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.kafka.streams.state.internals.JoinSide;
-
 class KStreamImplJoin {
 
 private final InternalStreamsBuilder builder;
@@ -173,8 +171,8 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
 final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier();
 
 final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
-final KStreamKStreamJoin<K, V1, V2, VOut> joinLeft = new KStreamKStreamJoin<>(
-JoinSide.LEFT,
+
+final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamLeftJoin<>(
 otherWindowStore.name(),
 internalWindows,
 joiner,
@@ -183,8 +181,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
 sharedTimeTrackerSupplier
 );
 
-final KStreamKStreamJoin<K, V2, V1, VOut> joinRight = new KStreamKStreamJoin<>(
-JoinSide.RIGHT,
+final KStreamKStreamJoin<K, V2, V1, VOut> joinOther = new KStreamKStreamRightJoin<>(
 thisWindowStore.name(),
 internalWindows,
 AbstractStream.reverseJoinerWithKey(joiner),
@@ -204,8 +201,8 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,

 final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, VOut> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
 
-final ProcessorParameters<K, V1, ?, ?> joinThisProcessorParams = new ProcessorParameters<>(joinLeft, joinThisName);
-final ProcessorParameters<K, V2, ?, ?> joinOtherProcessorParams = new ProcessorParameters<>(joinRight, joinOtherName);
+final ProcessorParameters<K, V1, ?, ?> joinThisProcessorParams = new ProcessorParameters<>(joinThis, joinThisName);
+final ProcessorParameters<K, V2, ?, ?> joinOtherProcessorParams = new ProcessorParameters<>(joinOther, joinOtherName);
 final ProcessorParameters<K, VOut, ?, ?> joinMergeProcessorParams = new ProcessorParameters<>(joinMerge, joinMergeName);
 final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParams = new ProcessorParameters<>(selfJoin, joinMergeName);


Large diffs are not rendered by default.
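The file elided above is presumably the shared KStreamKStreamJoin base class that both new subclasses extend. As a rough sketch only, inferred from the super(...) call and the members referenced in KStreamKStreamLeftJoin below — every name, modifier, and the init() wiring here is an assumption, not the hidden diff content:

import java.util.Optional;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;

// rough sketch, inferred from the subclass below; not the actual diff content
abstract class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
    protected final long joinBeforeMs;   // lower fetch-range offset used by process()
    protected final long joinAfterMs;    // upper fetch-range offset used by process()
    protected final long joinGraceMs;    // presumably derived from windows.gracePeriodMs()
    protected final boolean outer;
    protected final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;

    private final String otherWindowName;
    private final Optional<String> outerJoinWindowName;
    private final TimeTrackerSupplier sharedTimeTrackerSupplier;

    KStreamKStreamJoin(final String otherWindowName,
                       final JoinWindowsInternal windows,
                       final long joinBeforeMs,
                       final long joinAfterMs,
                       final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
                       final Optional<String> outerJoinWindowName,
                       final TimeTrackerSupplier sharedTimeTrackerSupplier,
                       final boolean outer) {
        this.otherWindowName = otherWindowName;
        this.joinBeforeMs = joinBeforeMs;
        this.joinAfterMs = joinAfterMs;
        this.joinGraceMs = windows.gracePeriodMs();
        this.joiner = joiner;
        this.outerJoinWindowName = outerJoinWindowName;
        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
        this.outer = outer;
    }

    // shared processor state presumably opened in init(): the other side's window
    // store, the optional outer-join store, the dropped-records sensor, and the
    // shared time tracker
    protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
        protected WindowStore<K, V2> otherWindowStore;
        protected Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> leftOuterJoinStore;
        protected Sensor droppedRecordsSensor;
        protected KStreamImplJoin.TimeTracker sharedTimeTracker;
        protected InternalProcessorContext<K, VOut> internalProcessorContext;

        // each side reports how far back it must look for expired outer-join candidates
        protected abstract long getOuterJoinLookBackTimeMs(TimestampedKeyAndJoinSide<K> key);
    }
}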

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java (new file)
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

import java.util.Optional;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KStreamKStreamLeftJoin<K, V1, V2, VOut> extends KStreamKStreamJoin<K, V1, V2, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);

KStreamKStreamLeftJoin(final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
final TimeTrackerSupplier sharedTimeTrackerSupplier) {
super(otherWindowName, windows, windows.beforeMs, windows.afterMs, joiner, outerJoinWindowName,
sharedTimeTrackerSupplier, outer);
}

@Override
public Processor<K, V1, K, VOut> get() {
return new KStreamKStreamLeftJoinProcessor();
}

private class KStreamKStreamLeftJoinProcessor extends KStreamKStreamJoinProcessor {
@Override
public void process(final Record<K, V1> leftRecord) {
final long inputRecordTimestamp = leftRecord.timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
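// Worked example with illustrative numbers (not from the commit): for join
// windows with before = 5s and after = 10s, a left record at t = 20s probes
// the other window store over the range [15s, 30s].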

if (outer && leftRecord.key() == null && leftRecord.value() != null) {
final VOut leftJoinValue = joiner.apply(leftRecord.key(), leftRecord.value(), null);
context().forward(leftRecord.withValue(leftJoinValue));
return;
} else if (StreamStreamJoinUtil.skipRecord(leftRecord, LOG, droppedRecordsSensor, context())) {
return;
}

// Emit all non-joined records whose windows have closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
leftOuterJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, leftRecord));
}

boolean needOuterJoin = outer;
try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(leftRecord.key(), timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
final KeyValue<Long, V2> rightRecord = iter.next();
final long rightRecordTimestamp = rightRecord.key;

leftOuterJoinStore.ifPresent(store -> {
// use putIfAbsent to first read and see if there are any values for the key;
// if yes, delete the key; otherwise do not issue a put;
// we may delete some values with the same key early, but since we range
// over all values of the same key even after failure, and the other
// window store is only cleaned up by stream time, this is okay for at-least-once.
final TimestampedKeyAndJoinSide<K> leftKeyWithRightRecordTimestamp =
TimestampedKeyAndJoinSide.makeRightSide(leftRecord.key(), rightRecordTimestamp);
store.putIfAbsent(leftKeyWithRightRecordTimestamp, null);
});

final VOut joinResult = joiner.apply(leftRecord.key(), leftRecord.value(), rightRecord.value);
context().forward(leftRecord.withValue(joinResult)
.withTimestamp(Math.max(inputRecordTimestamp, rightRecordTimestamp)));
}

if (needOuterJoin) {
// The maxStreamTime contains the max time observed in both sides of the join.
// Having access to the time observed in the other join side fixes the following
// problem:
//
// Say we have a window size of 5 seconds
// 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
// The record is not processed yet, and is added to the outer-join store
// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
// The record is not processed yet, and is added to the outer-join store
// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
// It is time to look at the expired records. T10 and T2 should be emitted, but
// because T2 was late, it is not fetched by the window store, so it is not processed
//
// See the KStreamKStreamLeftJoinTest.testLowerWindowBound() test
//
// The condition below allows us to process out-of-order records without the need
// to hold them in the temporary outer store
if (!leftOuterJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) {
final VOut leftJoinValue = joiner.apply(leftRecord.key(), leftRecord.value(), null);
context().forward(leftRecord.withValue(leftJoinValue));
} else {
sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
leftOuterJoinStore.ifPresent(store -> {
final TimestampedKeyAndJoinSide<K> leftKey =
TimestampedKeyAndJoinSide.makeLeftSide(leftRecord.key(), inputRecordTimestamp);
final LeftOrRightValue<V1, V2> leftValue = LeftOrRightValue.makeLeftValue(leftRecord.value());
store.put(leftKey, leftValue);
});
}
}
}
}

private void emitNonJoinedOuterRecords(
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store,
final Record<K, V1> record) {

// calling `store.all()` creates an iterator, which is an expensive operation on RocksDB;
// to reduce runtime cost, we try to avoid paying it when possible

// only try to emit left/outer join results if there _might_ be any result records
if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
return;
}
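// Illustrative numbers (not from the commit): with after = 10s and grace = 2s,
// a buffered candidate with minTime = 20s can still find a join partner until
// stream time exceeds 32s; until then the guard above returns early and we
// skip the expensive store scan.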
// throttle the emit frequency to a (configurable) interval;
// we use processing time to decouple from data properties,
// as throttling is a non-functional performance optimization
if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
return;
}

// Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`; if we don't set it
// every time, they can get out of sync during a clock drift
sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
sharedTimeTracker.advanceNextTimeToEmit();
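// Assuming advanceNextTimeToEmit() bumps nextTimeToEmit by a configured emit
// interval (an assumption based on its name), outer-join results are flushed
// at most once per interval of wall-clock time.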

// reset to MAX_VALUE in case the store is empty
sharedTimeTracker.minTime = Long.MAX_VALUE;

try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
TimestampedKeyAndJoinSide<K> prevKey = null;

while (it.hasNext()) {
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
final LeftOrRightValue<V1, V2> value = next.value;
final K key = timestampedKeyAndJoinSide.getKey();
final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
sharedTimeTracker.minTime = timestamp;

// Skip the remaining records if the window has not closed yet
final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs
>= sharedTimeTracker.streamTime) {
continue; // there are possibly candidates left on the other outerJoin-side
}

final VOut nullJoinedValue = joiner.apply(key, value.getLeftValue(), value.getRightValue());

context().forward(
record.withKey(key)
.withValue(nullJoinedValue)
.withTimestamp(timestamp)
);

if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
// blind-delete the previous key from the outer window store now that it is emitted;
// we do this because the delete would remove the whole list of values of the same key,
// and hence if we deleted eagerly and then failed, we would miss emitting join results
// of the later values in the list.
// we do not use delete() calls since they would incur an extra get()
store.put(prevKey, null);
}

prevKey = timestampedKeyAndJoinSide;
}

// at the end of the iteration, we need to delete the last key
if (prevKey != null) {
store.put(prevKey, null);
}
}
}
}
}
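For orientation, a minimal sketch of the public DSL call whose topology ends up using this processor via KStreamImplJoin; topic names, serdes, and window sizes below are illustrative, not taken from the commit:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public final class LeftJoinExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left =
            builder.stream("left-topic", Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> right =
            builder.stream("right-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // records join when their timestamps differ by at most 5s; the 1m grace
        // period bounds how long non-joined candidates stay buffered before the
        // left/outer result (right side null) is emitted
        left.leftJoin(
                right,
                (key, leftValue, rightValue) -> leftValue + "/" + rightValue,
                JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofMinutes(1)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("output-topic");

        builder.build(); // the topology now contains the new left-join processor
    }
}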