-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items #14426
Changes from 13 commits
97050ef
20bb46e
a2ecbcb
bba82fd
c7354c5
134de21
4396399
a5b68f8
54be2e6
e12bfd6
513ae8c
d91f2ea
6e16a0e
8a20ca7
1a0d887
ec48dbb
edf535c
2cf235a
8497258
eb1d438
f00d0f4
7e75db7
69bbf43
34d4d24
f401d91
8f547a8
c22f15b
32c5ec2
ce7ff3f
9a8f8f5
664e428
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { | |
} | ||
} | ||
|
||
@Test | ||
public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this test case? Seems it's fully contained in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, removed this test. |
||
final StreamsBuilder builder = new StreamsBuilder(); | ||
|
||
final int[] expectedKeys = new int[] {0, 1, 2, 3}; | ||
|
||
final KStream<Integer, String> stream1; | ||
final KStream<Integer, String> stream2; | ||
final KStream<Integer, String> joined; | ||
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); | ||
stream1 = builder.stream(topic1, consumed); | ||
stream2 = builder.stream(topic2, consumed); | ||
|
||
joined = stream1.leftJoin( | ||
stream2, | ||
MockValueJoiner.TOSTRING_JOINER, | ||
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), | ||
StreamJoined.with(Serdes.Integer(), | ||
Serdes.String(), | ||
Serdes.String()) | ||
); | ||
joined.process(supplier); | ||
|
||
final Collection<Set<String>> copartitionGroups = | ||
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); | ||
|
||
assertEquals(1, copartitionGroups.size()); | ||
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); | ||
|
||
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { | ||
final TestInputTopic<Integer, String> inputTopic1 = | ||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); | ||
final TestInputTopic<Integer, String> inputTopic2 = | ||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); | ||
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); | ||
|
||
processor.init(null); | ||
// push four items with increasing timestamps to the primary stream; this should emit null-joined items | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. modified comment |
||
// w1 = {} | ||
// w2 = {} | ||
// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed B into A |
||
// w2 = {} | ||
final long time = 1000L; | ||
for (int i = 0; i < expectedKeys.length; i++) { | ||
inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], time + i); | ||
} | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(0, "B0+null", 1000L), | ||
new KeyValueTimestamp<>(1, "B1+null", 1001L), | ||
new KeyValueTimestamp<>(2, "B2+null", 1002L) | ||
); | ||
} | ||
VictorvandenHoven marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Test | ||
public void testLeftJoinedRecordsWithZeroAfterAreEmitted() { | ||
final StreamsBuilder builder = new StreamsBuilder(); | ||
|
||
final int[] expectedKeys = new int[] {0, 1, 2, 3}; | ||
final int[] expectedKeysNotJoined = new int[] {10, 11, 12, 13}; | ||
|
||
final KStream<Integer, String> stream1; | ||
final KStream<Integer, String> stream2; | ||
final KStream<Integer, String> joined; | ||
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); | ||
stream1 = builder.stream(topic1, consumed); | ||
stream2 = builder.stream(topic2, consumed); | ||
|
||
joined = stream1.leftJoin( | ||
stream2, | ||
MockValueJoiner.TOSTRING_JOINER, | ||
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(Duration.ZERO), | ||
StreamJoined.with(Serdes.Integer(), | ||
Serdes.String(), | ||
Serdes.String()) | ||
); | ||
joined.process(supplier); | ||
|
||
final Collection<Set<String>> copartitionGroups = | ||
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); | ||
|
||
assertEquals(1, copartitionGroups.size()); | ||
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); | ||
|
||
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { | ||
final TestInputTopic<Integer, String> inputTopic1 = | ||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); | ||
final TestInputTopic<Integer, String> inputTopic2 = | ||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); | ||
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); | ||
|
||
processor.init(null); | ||
|
||
// push four items with increasing timestamps to the primary stream; the other window is empty; | ||
// this should emit the first three left-joined items; | ||
// A3 is not triggered yet | ||
// w1 = {} | ||
// w2 = {} | ||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = {} | ||
long time = 1000L; | ||
for (int i = 0; i < expectedKeys.length; i++) { | ||
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i); | ||
} | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(0, "A0+null", 1000L), | ||
new KeyValueTimestamp<>(1, "A1+null", 1001L), | ||
new KeyValueTimestamp<>(2, "A2+null", 1002L) | ||
); | ||
|
||
// push four items smaller timestamps (out of window) to the secondary stream; | ||
// this should produce four joined items | ||
VictorvandenHoven marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = {} | ||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) } | ||
time = 1000L - 1L; | ||
for (final int expectedKey : expectedKeys) { | ||
inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time); | ||
} | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(0, "A0+a0", 1000L), | ||
new KeyValueTimestamp<>(1, "A1+a1", 1001L), | ||
new KeyValueTimestamp<>(2, "A2+a2", 1002L), | ||
new KeyValueTimestamp<>(3, "A3+a3", 1003L) | ||
); | ||
|
||
// push four items with increased timestamps to the secondary stream; | ||
// this should produce four joined item | ||
VictorvandenHoven marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) } | ||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) } | ||
time += 1L; | ||
for (final int expectedKey : expectedKeys) { | ||
inputTopic2.pipeInput(expectedKey, "b" + expectedKey, time); | ||
} | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(0, "A0+b0", 1000L), | ||
new KeyValueTimestamp<>(1, "A1+b1", 1001L), | ||
new KeyValueTimestamp<>(2, "A2+b2", 1002L), | ||
new KeyValueTimestamp<>(3, "A3+b3", 1003L) | ||
); | ||
|
||
// push four items with increased timestamps to the secondary stream; | ||
// this should produce only three joined items; | ||
// c0 arrives too late to be joined with A0 | ||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) } | ||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), | ||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) } | ||
time += 1L; | ||
for (final int expectedKey : expectedKeys) { | ||
inputTopic2.pipeInput(expectedKey, "c" + expectedKey, time); | ||
} | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(1, "A1+c1", 1001L), | ||
new KeyValueTimestamp<>(2, "A2+c2", 1002L), | ||
new KeyValueTimestamp<>(3, "A3+c3", 1003L) | ||
); | ||
|
||
// push four items with increased timestamps to the secondary stream; | ||
// this should produce only two joined items; | ||
// d0 and d1 arrive too late to be joined with A0 and A1 | ||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), | ||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) } | ||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), | ||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), | ||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) } | ||
time += 1L; | ||
for (final int expectedKey : expectedKeys) { | ||
inputTopic2.pipeInput(expectedKey, "d" + expectedKey, time); | ||
} | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(2, "A2+d2", 1002L), | ||
new KeyValueTimestamp<>(3, "A3+d3", 1003L) | ||
); | ||
|
||
// push four items with increased timestamps to the secondary stream; | ||
// this should produce one joined item; | ||
// only e3 can be joined with A3; | ||
// e0, e1 and e2 arrive too late to be joined with A0, A1 and A2 | ||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), | ||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), | ||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) } | ||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), | ||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), | ||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002), | ||
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) } | ||
time += 1L; | ||
for (final int expectedKey : expectedKeys) { | ||
inputTopic2.pipeInput(expectedKey, "e" + expectedKey, time); | ||
} | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(3, "A3+e3", 1003L) | ||
); | ||
|
||
// push four items with larger timestamps to the secondary stream; | ||
// no (non-)joined items can be produced | ||
// | ||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), | ||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), | ||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002), | ||
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) } | ||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } | ||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), | ||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), | ||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), | ||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002), | ||
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003), | ||
// 0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100) } | ||
time = 1000 + 100L; | ||
for (final int expectedKey : expectedKeys) { | ||
inputTopic2.pipeInput(expectedKey, "f" + expectedKey, time); | ||
} | ||
processor.checkAndClearProcessResult(); | ||
} | ||
} | ||
|
||
@Test | ||
public void testLeftJoinWithInMemoryCustomSuppliers() { | ||
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)); | ||
|
@@ -609,17 +842,18 @@ public void testOrdering() { | |
inputTopic1.pipeInput(1, "A1", 100L); | ||
processor.checkAndClearProcessResult(); | ||
|
||
// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then | ||
// the joined records | ||
// push one item to the other window that has a join; | ||
// this should produce the joined record first; | ||
// then non-joined record with a closed window | ||
// by the time they were produced before | ||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } | ||
// w2 = { } | ||
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } | ||
// --> w2 = { 1:a1 (ts: 110) } | ||
inputTopic2.pipeInput(1, "a1", 110L); | ||
processor.checkAndClearProcessResult( | ||
new KeyValueTimestamp<>(0, "A0+null", 0L), | ||
new KeyValueTimestamp<>(1, "A1+a1", 110L) | ||
new KeyValueTimestamp<>(1, "A1+a1", 110L), | ||
new KeyValueTimestamp<>(0, "A0+null", 0L) | ||
VictorvandenHoven marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi,
Seems like
outerJoinLeftBreak && outerJoinRightBreak
is always false.Doesn't this break the behavior described in the comment on top of this block?
// Skip next records if window has not closed
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably these two lines need to be outside the while loop:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See: #15510