Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items #14426

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
97050ef
KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items
VictorvandenHoven Sep 22, 2023
20bb46e
KAFKA-15417 Checkstyle issues
VictorvandenHoven Sep 22, 2023
a2ecbcb
KAFKA-15417 layout
VictorvandenHoven Sep 25, 2023
bba82fd
KAFKA-15417 force build
VictorvandenHoven Sep 25, 2023
c7354c5
KAFKA-15417 force build
VictorvandenHoven Sep 25, 2023
134de21
Merge branch 'apache:trunk' into KAFKA-15417-flip-joinSpuriousLookBac…
VictorvandenHoven Sep 27, 2023
4396399
Merge branch 'apache:trunk' into KAFKA-15417-flip-joinSpuriousLookBac…
VictorvandenHoven Oct 16, 2023
a5b68f8
KAFKA-15417 nit: ofMillis(0) -> ZERO?
VictorvandenHoven Oct 16, 2023
54be2e6
KAFKA-15417 Added the outerJoinSide dependent logic to determine
VictorvandenHoven Oct 31, 2023
e12bfd6
KAFKA-15417 checkStyle violations
VictorvandenHoven Oct 31, 2023
513ae8c
KAFKA-15417 more compliant to kafka-streams layout conventions
VictorvandenHoven Nov 20, 2023
d91f2ea
Merge remote-tracking branch 'origin/trunk' into
VictorvandenHoven Nov 20, 2023
6e16a0e
KAFKA-15417 re-formatted the layout to the previous version
VictorvandenHoven Nov 20, 2023
8a20ca7
KAFKA-15417 formatting to kafka-streams conventions
VictorvandenHoven Nov 21, 2023
1a0d887
KAFKA-15417 Checkstyle: removed a white space
VictorvandenHoven Nov 21, 2023
ec48dbb
Merge branch 'apache:trunk' into KAFKA-15417-flip-joinSpuriousLookBac…
VictorvandenHoven Feb 9, 2024
edf535c
KAFKA-15417 merged code from KAFKA-16123: Fix
VictorvandenHoven Feb 9, 2024
2cf235a
KAFKA-15417 merged code from KAFKA-16123
VictorvandenHoven Feb 9, 2024
8497258
Merge branch 'KAFKA-15417-flip-joinSpuriousLookBackTimeMs' of https:/…
VictorvandenHoven Feb 9, 2024
eb1d438
Merge branch 'apache:trunk' into KAFKA-15417-flip-joinSpuriousLookBac…
VictorvandenHoven Feb 12, 2024
f00d0f4
Revert "KAFKA-15417 merged code from KAFKA-16123"
VictorvandenHoven Feb 12, 2024
7e75db7
Revert "Revert "KAFKA-15417 merged code from KAFKA-16123""
VictorvandenHoven Feb 12, 2024
69bbf43
Revert "Revert "Revert "KAFKA-15417 merged code from KAFKA-16123"""
VictorvandenHoven Feb 12, 2024
34d4d24
Merge branch 'apache:trunk' into KAFKA-15417-flip-joinSpuriousLookBac…
VictorvandenHoven Feb 19, 2024
f401d91
KAFKA-15417 changing back the ordering to what is was before
VictorvandenHoven Feb 19, 2024
8f547a8
KAFKA-15417 set
VictorvandenHoven Mar 4, 2024
c22f15b
Merge remote-tracking branch 'origin/trunk' into
VictorvandenHoven Mar 4, 2024
32c5ec2
KAFKA-15417 remove debug lines
VictorvandenHoven Mar 4, 2024
ce7ff3f
KAFKA-15417-flip-joinSpuriousLookBackTimeMs
VictorvandenHoven Mar 5, 2024
9a8f8f5
KAFKA-15417-flip-joinSpuriousLookBackTimeMs
VictorvandenHoven Mar 5, 2024
664e428
long time is not a final
VictorvandenHoven Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private final long joinGraceMs;
private final boolean enableSpuriousResultFix;
private final long joinSpuriousLookBackTimeMs;
private final long windowsAfterIntervalMs;

private final boolean outer;
private final boolean isLeftSide;
Expand All @@ -72,12 +73,13 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
if (isLeftSide) {
this.joinBeforeMs = windows.beforeMs;
this.joinAfterMs = windows.afterMs;
this.joinSpuriousLookBackTimeMs = windows.beforeMs;
this.joinSpuriousLookBackTimeMs = windows.afterMs;
} else {
this.joinBeforeMs = windows.afterMs;
this.joinAfterMs = windows.beforeMs;
this.joinSpuriousLookBackTimeMs = windows.afterMs;
this.joinSpuriousLookBackTimeMs = windows.beforeMs;
}
this.windowsAfterIntervalMs = windows.afterMs;
this.joinGraceMs = windows.gracePeriodMs();
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
this.joiner = joiner;
Expand Down Expand Up @@ -135,18 +137,14 @@ public void process(final Record<K, V1> record) {

sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);

// Emit all non-joined records which window has closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
}
try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
final KeyValue<Long, V2> otherRecord = iter.next();
final long otherRecordTimestamp = otherRecord.key;

outerJoinStore.ifPresent(store -> {
// use putIfAbsent to first read and see if there's any values for the key,
// Use putIfAbsent to first read and see if there's any values for the key,
// if yes delete the key, otherwise do not issue a put;
// we may delete some values with the same key early but since we are going
// range over all values of the same key even after failure, since the other window-store
Expand Down Expand Up @@ -187,6 +185,11 @@ public void process(final Record<K, V1> record) {
}
}
}

// Emit all non-joined records which window has closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
}
}

@SuppressWarnings("unchecked")
Expand All @@ -198,7 +201,7 @@ private void emitNonJoinedOuterRecords(
// to reduce runtime cost, we try to avoid paying those cost

// only try to emit left/outer join results if there _might_ be any result records
if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {
if (sharedTimeTracker.minTime + joinSpuriousLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
return;
}
// throttle the emit frequency to a (configurable) interval;
Expand Down Expand Up @@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords(
sharedTimeTracker.minTime = timestamp;

// Skip next records if window has not closed
if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
if (sharedTimeTracker.minTime + windowsAfterIntervalMs + joinGraceMs >= sharedTimeTracker.streamTime) {
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
}
}

@Test
public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this test case? Seems it's fully contained in testLeftJoinedRecordsWithZeroAfterAreEmitted below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, removed this test.

final StreamsBuilder builder = new StreamsBuilder();

final int[] expectedKeys = new int[] {0, 1, 2, 3};

final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);

joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
StreamJoined.with(Serdes.Integer(),
Serdes.String(),
Serdes.String())
);
joined.process(supplier);

final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();

assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

processor.init(null);
// push four items with increasing timestamps to the primary stream; this should emit null-joined items
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

his should emit [three] null-joined items; the last one does not emit yet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified comment

// w1 = {}
// w2 = {}
// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using B not A for left input?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed B into A

// w2 = {}
final long time = 1000L;
for (int i = 0; i < expectedKeys.length; i++) {
inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], time + i);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "B0+null", 1000L),
new KeyValueTimestamp<>(1, "B1+null", 1001L),
new KeyValueTimestamp<>(2, "B2+null", 1002L)
);
}
}

@Test
public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
final StreamsBuilder builder = new StreamsBuilder();

final int[] expectedKeys = new int[] {0, 1, 2, 3};
final int[] expectedKeysNotJoined = new int[] {10, 11, 12, 13};

final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);

joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(Duration.ZERO),
StreamJoined.with(Serdes.Integer(),
Serdes.String(),
Serdes.String())
);
joined.process(supplier);

final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();

assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

processor.init(null);

// push four items with increasing timestamps to the primary stream; the other window is empty;
// this should emit the first three left-joined items;
// A3 is not triggered yet
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = {}
long time = 1000L;
for (int i = 0; i < expectedKeys.length; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 1000L),
new KeyValueTimestamp<>(1, "A1+null", 1001L),
new KeyValueTimestamp<>(2, "A2+null", 1002L)
);

// push four items smaller timestamps (out of window) to the secondary stream;
// this should produce four joined items
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = {}
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
time = 1000L - 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+a0", 1000L),
new KeyValueTimestamp<>(1, "A1+a1", 1001L),
new KeyValueTimestamp<>(2, "A2+a2", 1002L),
new KeyValueTimestamp<>(3, "A3+a3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce four joined item
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "b" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+b0", 1000L),
new KeyValueTimestamp<>(1, "A1+b1", 1001L),
new KeyValueTimestamp<>(2, "A2+b2", 1002L),
new KeyValueTimestamp<>(3, "A3+b3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce only three joined items;
// c0 arrives too late to be joined with A0
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "c" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "A1+c1", 1001L),
new KeyValueTimestamp<>(2, "A2+c2", 1002L),
new KeyValueTimestamp<>(3, "A3+c3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce only two joined items;
// d0 and d1 arrive too late to be joined with A0 and A1
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "d" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(2, "A2+d2", 1002L),
new KeyValueTimestamp<>(3, "A3+d3", 1003L)
);

// push four items with increased timestamps to the secondary stream;
// this should produce one joined item;
// only e3 can be joined with A3;
// e0, e1 and e2 arrive too late to be joined with A0, A1 and A2
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
time += 1L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "e" + expectedKey, time);
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(3, "A3+e3", 1003L)
);

// push four items with larger timestamps to the secondary stream;
// no (non-)joined items can be produced
//
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
// 0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100) }
time = 1000 + 100L;
for (final int expectedKey : expectedKeys) {
inputTopic2.pipeInput(expectedKey, "f" + expectedKey, time);
}
processor.checkAndClearProcessResult();
}
}

@Test
public void testLeftJoinWithInMemoryCustomSuppliers() {
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));
Expand Down Expand Up @@ -609,17 +842,18 @@ public void testOrdering() {
inputTopic1.pipeInput(1, "A1", 100L);
processor.checkAndClearProcessResult();

// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
// the joined records
// push one item to the other window that has a join;
// this should produce the joined record first;
// then non-joined record with a closed window
// by the time they were produced before
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// --> w2 = { 1:a1 (ts: 110) }
inputTopic2.pipeInput(1, "a1", 110L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 0L),
new KeyValueTimestamp<>(1, "A1+a1", 110L)
new KeyValueTimestamp<>(1, "A1+a1", 110L),
new KeyValueTimestamp<>(0, "A0+null", 0L)
);
}
}
Expand Down
Loading