Skip to content

Commit

Permalink
KAFKA-15417: flip joinSpuriousLookBackTimeMs and emit non-joined items (
Browse files Browse the repository at this point in the history
apache#14426)

Kafka Streams supports asymmetric join windows. Depending on the window configuration,
we need to compute the window close time (among other values) differently.

This PR flips the values assigned to `joinSpuriousLookBackTimeMs`, because they were not correct, and
introduces the `windowsAfterIntervalMs` field, which is used to determine whether emitting records can be skipped.

Reviewers: Hao Li <[email protected]>, Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
VictorvandenHoven authored and clolov committed Apr 5, 2024
1 parent 7d89dfc commit f19d71d
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private final long joinAfterMs;
private final long joinGraceMs;
private final boolean enableSpuriousResultFix;
private final long joinSpuriousLookBackTimeMs;
private final long windowsBeforeMs;
private final long windowsAfterMs;

private final boolean outer;
private final boolean isLeftSide;
Expand All @@ -72,12 +73,12 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
if (isLeftSide) {
this.joinBeforeMs = windows.beforeMs;
this.joinAfterMs = windows.afterMs;
this.joinSpuriousLookBackTimeMs = windows.beforeMs;
} else {
this.joinBeforeMs = windows.afterMs;
this.joinAfterMs = windows.beforeMs;
this.joinSpuriousLookBackTimeMs = windows.afterMs;
}
this.windowsAfterMs = windows.afterMs;
this.windowsBeforeMs = windows.beforeMs;
this.joinGraceMs = windows.gracePeriodMs();
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
this.joiner = joiner;
Expand Down Expand Up @@ -136,11 +137,12 @@ public void process(final Record<K, V1> record) {
return;
}

boolean needOuterJoin = outer;
// Emit all non-joined records which window has closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
}

boolean needOuterJoin = outer;
try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
Expand Down Expand Up @@ -200,7 +202,7 @@ private void emitNonJoinedOuterRecords(
// to reduce runtime cost, we try to avoid paying those cost

// only try to emit left/outer join results if there _might_ be any result records
if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {
if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
return;
}
// throttle the emit frequency to a (configurable) interval;
Expand All @@ -222,6 +224,8 @@ private void emitNonJoinedOuterRecords(
TimestampedKeyAndJoinSide<K> prevKey = null;

while (it.hasNext()) {
boolean outerJoinLeftBreak = false;
boolean outerJoinRightBreak = false;
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
final LeftOrRightValue<V1, V2> value = next.value;
Expand All @@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords(
sharedTimeTracker.minTime = timestamp;

// Skip next records if window has not closed
if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
break;
final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
if (timestampedKeyAndJoinSide.isLeftSide()) {
outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
} else {
outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
}
if (outerJoinLeftBreak && outerJoinRightBreak) {
break; // there are no more candidates to emit on left-outerJoin-side and
// right-outerJoin-side
} else {
continue; // there are possibly candidates left on the other outerJoin-side
}
}

final VOut nullJoinedValue;
Expand Down Expand Up @@ -268,6 +283,15 @@ private void emitNonJoinedOuterRecords(
}
}

/**
 * Picks the window interval that applies to a stored non-joined record, based on its join side.
 *
 * @param timestampedKeyAndJoinSide key of the stored non-joined record, carrying its join side
 * @return {@code windowsAfterMs} for a left-side record, {@code windowsBeforeMs} for a right-side record
 */
private long getOuterJoinLookBackTimeMs(final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
    // left-side entries are bounded by the "after" interval, right-side entries by the "before" interval
    return timestampedKeyAndJoinSide.isLeftSide() ? windowsAfterMs : windowsBeforeMs;
}

@Override
public void close() {
sharedTimeTrackerSupplier.remove(context().taskId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
Expand Down Expand Up @@ -99,6 +100,7 @@ public void before(final TestInfo testInfo) throws IOException {
final String safeTestName = safeUniqueTestName(testInfo);
streamsConfig = getStreamsConfig(safeTestName);
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,184 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
}
}

/**
 * Left join over a window built with
 * {@code JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO)}.
 *
 * Pipes four records (ts 1000..1003) into the primary stream, then successive batches of four
 * records into the secondary stream, and after every batch checks exactly which non-joined
 * ("+null") and joined results the downstream mock processor captured.
 */
@Test
public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
final StreamsBuilder builder = new StreamsBuilder();

final int[] expectedKeys = new int[] {0, 1, 2, 3};

final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);

// left join of stream1 with stream2; the join result is captured by the mock processor
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
StreamJoined.with(Serdes.Integer(),
Serdes.String(),
Serdes.String())
);
joined.process(supplier);

// both input topics must end up in one single co-partition group
final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();

assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

processor.init(null);

// push four items with increasing timestamps to the primary stream; the other window is empty;
// this should emit the first three left-joined items;
// A3 is not triggered yet
// (presumably because no record with a larger timestamp has advanced stream time past 1003 yet)
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = {}
long time = 1000L;
for (int i = 0; i < expectedKeys.length; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 1000L),
new KeyValueTimestamp<>(1, "A1+null", 1001L),
new KeyValueTimestamp<>(2, "A2+null", 1002L)
);

// push four items with a smaller timestamp (ts: 999) to the secondary stream;
// this should produce four joined items
// NOTE(review): this comment used to call these items "out of window", yet all four are expected to join
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = {}
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
time = 1000L - 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+a0", 1000L),
new KeyValueTimestamp<>(1, "A1+a1", 1001L),
new KeyValueTimestamp<>(2, "A2+a2", 1002L),
new KeyValueTimestamp<>(3, "A3+a3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce four joined items
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "b" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+b0", 1000L),
new KeyValueTimestamp<>(1, "A1+b1", 1001L),
new KeyValueTimestamp<>(2, "A2+b2", 1002L),
new KeyValueTimestamp<>(3, "A3+b3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce only three joined items;
// c0 arrives too late to be joined with A0
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "c" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "A1+c1", 1001L),
new KeyValueTimestamp<>(2, "A2+c2", 1002L),
new KeyValueTimestamp<>(3, "A3+c3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce only two joined items;
// d0 and d1 arrive too late to be joined with A0 and A1
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "d" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(2, "A2+d2", 1002L),
new KeyValueTimestamp<>(3, "A3+d3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce one joined item;
// only e3 can be joined with A3;
// e0, e1 and e2 arrive too late to be joined with A0, A1 and A2
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "e" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(3, "A3+e3", 1003L)
);

// push four items with larger timestamps to the secondary stream;
// no (non-)joined items can be produced
//
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
// 0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100) }
time = 1000 + 100L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "f" + expectedKey, time);
}
processor.checkAndClearProcessResult();
}
}

@Test
public void testLeftJoinWithInMemoryCustomSuppliers() {
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));
Expand Down Expand Up @@ -609,8 +787,9 @@ public void testOrdering() {
inputTopic1.pipeInput(1, "A1", 100L);
processor.checkAndClearProcessResult();

// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
// the joined records
// push one item to the other window that has a join;
// this should produce the joined record first;
// then non-joined record with a closed window
// by the time they were produced before
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
Expand Down
Loading

0 comments on commit f19d71d

Please sign in to comment.