Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide #15601

Merged
merged 26 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0e3cf15
separate left and right stream classes
wernerdv Mar 25, 2024
e29a702
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf Apr 12, 2024
7699acb
Move init method to abstract class
raminqaf Apr 12, 2024
bdf8d2c
Move class fields to super class
raminqaf Apr 12, 2024
0a9714d
Move more code to super class
raminqaf Apr 12, 2024
7da135b
Move inner join logic to super class
raminqaf Apr 13, 2024
08d38a0
Clean up
raminqaf Apr 13, 2024
c6b734d
Clean up
raminqaf Apr 13, 2024
333ddcb
Clean up
raminqaf Apr 13, 2024
0539964
Clean up
raminqaf Apr 14, 2024
7c61118
Formatting
raminqaf Apr 23, 2024
d32425a
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf Apr 30, 2024
76c1e76
Address reviews and refactor code into methods
raminqaf Apr 30, 2024
0880ec5
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 7, 2024
df620ed
Indent code for smaller diff
raminqaf May 7, 2024
b1f2d7a
Update files
raminqaf May 7, 2024
a082cb3
Update files
raminqaf May 7, 2024
aa6ac4c
fix super
raminqaf May 7, 2024
e434c66
Update files
raminqaf May 7, 2024
9727220
fix indent
raminqaf May 7, 2024
9e309a2
Revert window open condition
raminqaf May 8, 2024
b6a9097
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 14, 2024
46ba0a6
Address reviews
raminqaf May 14, 2024
b8ceac0
address reviews
raminqaf May 21, 2024
0dfee4f
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 21, 2024
e223f33
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier();

final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
true,
final KStreamKStreamLeftJoinSide<K, V1, V2, VOut> joinThis = new KStreamKStreamLeftJoinSide<>(
otherWindowStore.name(),
internalWindows,
joiner,
Expand All @@ -182,8 +181,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
sharedTimeTrackerSupplier
);

final KStreamKStreamJoin<K, V2, V1, VOut> joinOther = new KStreamKStreamJoin<>(
false,
final KStreamKStreamRightJoinSide<K, V1, V2, VOut> joinOther = new KStreamKStreamRightJoinSide<>(
thisWindowStore.name(),
internalWindows,
AbstractStream.reverseJoinerWithKey(joiner),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
Expand All @@ -43,7 +42,7 @@
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;

class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);

private final String otherWindowName;
Expand All @@ -55,28 +54,22 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private final long windowsAfterMs;

private final boolean outer;
private final boolean isLeftSide;
private final Optional<String> outerJoinWindowName;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;
private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner;

private final TimeTrackerSupplier sharedTimeTrackerSupplier;

KStreamKStreamJoin(final boolean isLeftSide,
final String otherWindowName,
KStreamKStreamJoin(final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
final long joinBeforeMs,
final long joinAfterMs,
final TimeTrackerSupplier sharedTimeTrackerSupplier) {
this.isLeftSide = isLeftSide;
this.otherWindowName = otherWindowName;
if (isLeftSide) {
this.joinBeforeMs = windows.beforeMs;
this.joinAfterMs = windows.afterMs;
} else {
this.joinBeforeMs = windows.afterMs;
this.joinAfterMs = windows.beforeMs;
}
this.joinBeforeMs = joinBeforeMs;
this.joinAfterMs = joinAfterMs;
this.windowsAfterMs = windows.afterMs;
this.windowsBeforeMs = windows.beforeMs;
this.joinGraceMs = windows.gracePeriodMs();
Expand All @@ -87,15 +80,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
}

@Override
public Processor<K, V1, K, VOut> get() {
return new KStreamKStreamJoinProcessor();
}

private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
private WindowStore<K, V2> otherWindowStore;
protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> {
private WindowStore<K, VOther> otherWindowStore;
private Sensor droppedRecordsSensor;
private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>>> outerJoinStore = Optional.empty();
private InternalProcessorContext<K, VOut> internalProcessorContext;
private TimeTracker sharedTimeTracker;

Expand All @@ -122,13 +110,10 @@ public void init(final ProcessorContext<K, VOut> context) {
}
}

@SuppressWarnings("unchecked")
@Override
public void process(final Record<K, V1> record) {
public void process(final Record<K, VThis> record) {

final long inputRecordTimestamp = record.timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);

Expand All @@ -144,26 +129,11 @@ public void process(final Record<K, V1> record) {
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
}

boolean needOuterJoin = outer;
try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
final KeyValue<Long, V2> otherRecord = iter.next();
final long otherRecordTimestamp = otherRecord.key;

outerJoinStore.ifPresent(store -> {
// use putIfAbsent to first read and see if there's any values for the key,
// if yes delete the key, otherwise do not issue a put;
// we may delete some values with the same key early but since we are going
// range over all values of the same key even after failure, since the other window-store
// is only cleaned up by stream time, so this is okay for at-least-once.
store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null);
});

context().forward(
record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value))
.withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
}
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
try (final WindowStoreIterator<VOther> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
final boolean needOuterJoin = outer && !iter.hasNext();
iter.forEachRemaining(otherRecord -> emitInnerJoin(record, otherRecord, inputRecordTimestamp));

if (needOuterJoin) {
// The maxStreamTime contains the max time observed in both sides of the join.
Expand All @@ -187,17 +157,25 @@ public void process(final Record<K, V1> record) {
context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
} else {
sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
outerJoinStore.ifPresent(store -> store.put(
TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
LeftOrRightValue.make(isLeftSide, record.value())));
putInOuterJoinStore(record);
}
}
}
}

/**
 * Builds the outer-join store key for a record arriving on this processor's own join side.
 * Called with the input record's key and timestamp when that record is put into the outer-join store.
 */
protected abstract TimestampedKeyAndJoinSide<K> makeThisKey(final K key, final long inputRecordTimestamp);

/**
 * Wraps a value of this processor's own join side into the {@code LeftOrRightValue}
 * that is stored in the outer-join store.
 */
protected abstract LeftOrRightValue<VLeft, VRight> makeThisValue(final VThis thisValue);

/**
 * Builds the outer-join store key for a record of the other join side.
 * Called with the input record's key and the timestamp of the matched record from the other window store.
 */
protected abstract TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timestamp);

/**
 * Extracts the value that is passed to the joiner as the "this"-side argument
 * when a non-joined outer record is forwarded.
 */
protected abstract VThis getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);

/**
 * Extracts the value that is passed to the joiner as the "other"-side argument
 * when a non-joined outer record is forwarded.
 */
protected abstract VOther getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);

private void emitNonJoinedOuterRecords(
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store,
final Record<K, V1> record) {
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store,
final Record<K, ?> record) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

private void emitNonJoinedOuterRecords(final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store, 
                                       final Record<K, ?> record) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't that be

final Record<K, VThis> record

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Avoiding the wildcard here makes it easier to read and understand. I have changed it!


// calling `store.all()` creates an iterator, which is an expensive operation on RocksDB;
// to reduce runtime cost, we try to avoid paying that cost
Expand All @@ -221,26 +199,24 @@ private void emitNonJoinedOuterRecords(
// reset to MAX_VALUE in case the store is empty
sharedTimeTracker.minTime = Long.MAX_VALUE;

try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> it = store.all()) {
TimestampedKeyAndJoinSide<K> prevKey = null;

boolean outerJoinLeftWindowOpen = false;
boolean outerJoinRightWindowOpen = false;
while (it.hasNext()) {
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> nextKeyValue = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key;
sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp();
if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
// if windows are open for both joinSides we can break since there are no more candidates to emit
break;
}
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
sharedTimeTracker.minTime = timestamp;

// Continue with the next outer record if window for this joinSide has not closed yet
// There might be an outer record for the other joinSide whose window has not closed yet
// We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
if (isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) {
if (timestampedKeyAndJoinSide.isLeftSide()) {
outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
} else {
Expand All @@ -249,13 +225,8 @@ private void emitNonJoinedOuterRecords(
// We continue with the next outer record
continue;
}

final K key = timestampedKeyAndJoinSide.getKey();
final LeftOrRightValue<V1, V2> leftOrRightValue = next.value;
final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue);
context().forward(
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
);

forwardNonJoinedOuterRecords(record, nextKeyValue);

if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
// blind-delete the previous key from the outer window store now it is emitted;
Expand All @@ -275,20 +246,22 @@ private void emitNonJoinedOuterRecords(
}
}

@SuppressWarnings("unchecked")
private VOut getNullJoinedValue(
final K key,
final LeftOrRightValue<V1, V2> leftOrRightValue) {
// depending on the JoinSide fill in the joiner key and joiner values
if (isLeftSide) {
return joiner.apply(key,
leftOrRightValue.getLeftValue(),
leftOrRightValue.getRightValue());
} else {
return joiner.apply(key,
(V1) leftOrRightValue.getRightValue(),
(V2) leftOrRightValue.getLeftValue());
}
private void forwardNonJoinedOuterRecords(final Record<K, ?> record, final KeyValue<? extends TimestampedKeyAndJoinSide<K>, ? extends LeftOrRightValue<VLeft, VRight>> nextKeyValue) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be simpler to define this method as:

private void forwardNonJoinedOuterRecords(final Record<K, VThis> record,
                                          final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
                                          final LeftOrRightValue<VLeft, VRight> leftOrRightValue) {

It makes the code a bit simpler and shorter.
Also here, why not Record<K, VThis> record? That is exactly the type used at the call site.

final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key;
final K key = timestampedKeyAndJoinSide.getKey();
final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
final LeftOrRightValue<VLeft, VRight> leftOrRightValue = nextKeyValue.value;
final VThis thisValue = getThisValue(leftOrRightValue);
final VOther otherValue = getOtherValue(leftOrRightValue);
final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue);
context().forward(
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
);
}

/**
 * Reports whether the outer-join window for the given store entry is still open.
 * The window counts as open as long as the tracked minimum time, extended by the
 * side-specific look-back and the grace period, has not fallen behind the stream time.
 */
private boolean isOuterJoinWindowOpen(final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
    final long lookBackMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
    final long windowCloseTime = sharedTimeTracker.minTime + lookBackMs + joinGraceMs;
    return windowCloseTime >= sharedTimeTracker.streamTime;
}

private long getOuterJoinLookBackTimeMs(
Expand All @@ -301,6 +274,31 @@ private long getOuterJoinLookBackTimeMs(
}
}

/**
 * Forwards the joined result of the input record and one matching record from the other
 * window store. The result carries the later of the two timestamps.
 *
 * @param thisRecord           the record currently being processed
 * @param otherRecord          a matching entry from the other window store (timestamp to value)
 * @param inputRecordTimestamp the timestamp of {@code thisRecord}
 */
private void emitInnerJoin(final Record<K, VThis> thisRecord, final KeyValue<Long, VOther> otherRecord,
                           final long inputRecordTimestamp) {
    // putIfAbsent first reads whether a value exists for the key: if one does, the key is
    // deleted, otherwise no put is issued. Some values with the same key may be deleted
    // early, but all values of that key are ranged over again after a failure, because the
    // other window store is only cleaned up by stream time. That keeps this safe under
    // at-least-once processing.
    outerJoinStore.ifPresent(
        store -> store.putIfAbsent(makeOtherKey(thisRecord.key(), otherRecord.key), null));

    final VOut joinedValue = joiner.apply(thisRecord.key(), thisRecord.value(), otherRecord.value);
    final long joinedTimestamp = Math.max(inputRecordTimestamp, otherRecord.key);
    context().forward(thisRecord.withValue(joinedValue).withTimestamp(joinedTimestamp));
}

/**
 * Stores the given record in the outer-join store, if one is configured, keyed by the
 * record's key and timestamp on this join side. Does nothing when no store is present.
 */
private void putInOuterJoinStore(final Record<K, VThis> thisRecord) {
    outerJoinStore.ifPresent(store -> store.put(
        makeThisKey(thisRecord.key(), thisRecord.timestamp()),
        makeThisValue(thisRecord.value())));
}

@Override
public void close() {
sharedTimeTrackerSupplier.remove(context().taskId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;

import java.util.Optional;

class KStreamKStreamLeftJoinSide<K, VLeft, VRight, VOut> extends KStreamKStreamJoin<K, VLeft, VRight, VOut, VLeft, VRight> {

/**
 * Creates the left-side join processor supplier.
 * Passes {@code windows.beforeMs} as the join "before" bound and {@code windows.afterMs}
 * as the join "after" bound to the superclass constructor; all other arguments are
 * handed through unchanged.
 */
KStreamKStreamLeftJoinSide(final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super VLeft, ? super VRight, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
final TimeTrackerSupplier sharedTimeTrackerSupplier) {
super(otherWindowName, windows, joiner, outer, outerJoinWindowName, windows.beforeMs, windows.afterMs,
sharedTimeTrackerSupplier);
}

/**
 * Returns a new {@code KStreamKStreamJoinLeftProcessor} on every call.
 */
@Override
public Processor<K, VLeft, K, VOut> get() {
return new KStreamKStreamJoinLeftProcessor();
}

private class KStreamKStreamJoinLeftProcessor extends KStreamKStreamJoinProcessor {


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

/**
 * This processor handles the left side, so its own key is created via
 * {@code TimestampedKeyAndJoinSide.makeLeft}.
 */
@Override
public TimestampedKeyAndJoinSide<K> makeThisKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
}

/**
 * Wraps the left-side value via {@code LeftOrRightValue.makeLeftValue}.
 */
@Override
public LeftOrRightValue<VLeft, VRight> makeThisValue(final VLeft thisValue) {
return LeftOrRightValue.makeLeftValue(thisValue);
}

/**
 * The other side of a left-side processor is the right side, so the key is created via
 * {@code TimestampedKeyAndJoinSide.makeRight}.
 */
@Override
public TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeRight(key, timestamp);
}

/**
 * Returns the left value of the given pair as this side's value.
 */
@Override
public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getLeftValue();
}

/**
 * Returns the right value of the given pair as the other side's value.
 */
@Override
public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getRightValue();
}
}
}
Loading