KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide #15601

Merged
merged 26 commits into from
May 29, 2024
Changes from 20 commits
0e3cf15
separate left and right stream classes
wernerdv Mar 25, 2024
e29a702
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf Apr 12, 2024
7699acb
Move init method to abstract class
raminqaf Apr 12, 2024
bdf8d2c
Move class fields to super class
raminqaf Apr 12, 2024
0a9714d
Move more code to super class
raminqaf Apr 12, 2024
7da135b
Move inner join logic to super class
raminqaf Apr 13, 2024
08d38a0
Clean up
raminqaf Apr 13, 2024
c6b734d
Clean up
raminqaf Apr 13, 2024
333ddcb
Clean up
raminqaf Apr 13, 2024
0539964
Clean up
raminqaf Apr 14, 2024
7c61118
Formatting
raminqaf Apr 23, 2024
d32425a
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf Apr 30, 2024
76c1e76
Address reviews and refactor code into methods
raminqaf Apr 30, 2024
0880ec5
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 7, 2024
df620ed
Indent code for smaller diff
raminqaf May 7, 2024
b1f2d7a
Update files
raminqaf May 7, 2024
a082cb3
Update files
raminqaf May 7, 2024
aa6ac4c
fix super
raminqaf May 7, 2024
e434c66
Update files
raminqaf May 7, 2024
9727220
fix indent
raminqaf May 7, 2024
9e309a2
Revert window open condition
raminqaf May 8, 2024
b6a9097
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 14, 2024
46ba0a6
Address reviews
raminqaf May 14, 2024
b8ceac0
address reviews
raminqaf May 21, 2024
0dfee4f
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 21, 2024
e223f33
Merge branch 'trunk' of https://github.com/apache/kafka into refactor…
raminqaf May 28, 2024
@@ -172,8 +172,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier();

final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
- final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
- true,
+ final KStreamKStreamLeftJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamLeftJoin<>(
otherWindowStore.name(),
internalWindows,
joiner,
@@ -182,8 +181,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
sharedTimeTrackerSupplier
);

- final KStreamKStreamJoin<K, V2, V1, VOut> joinOther = new KStreamKStreamJoin<>(
- false,
+ final KStreamKStreamRightJoin<K, V1, V2, VOut> joinOther = new KStreamKStreamRightJoin<>(
thisWindowStore.name(),
internalWindows,
AbstractStream.reverseJoinerWithKey(joiner),

Large diffs are not rendered by default.

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;

import java.util.Optional;

class KStreamKStreamLeftJoin<K, VL, VR, VOut> extends KStreamKStreamJoin<K, VL, VR, VOut, VL, VR> {
Member

Do you mean that this is the left side of the join, or that this is a left join? That is a bit confusing. A better name might be KStreamKStreamJoinLeftSide.

Contributor Author

Good catch. What do you think about KStreamKStreamLeftJoinSide instead?

Member

I would still prefer KStreamKStreamJoinLeftSide. It is the left side of a stream-stream join. The processor is also named KStreamKStreamJoinLeftProcessor.

KStreamKStreamLeftJoin(final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super VL, ? super VR, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
final TimeTrackerSupplier sharedTimeTrackerSupplier) {
super(otherWindowName, windows, joiner, outer, outerJoinWindowName, windows.beforeMs, windows.afterMs,
sharedTimeTrackerSupplier);
}

@Override
public Processor<K, VL, K, VOut> get() {
return new KStreamKStreamJoinLeftProcessor();
}

private class KStreamKStreamJoinLeftProcessor extends KStreamKStreamJoinProcessor {


@Override
public TimestampedKeyAndJoinSide<K> makeThisKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
}

@Override
public LeftOrRightValue<VL, VR> makeThisValue(final VL thisValue) {
return LeftOrRightValue.makeLeftValue(thisValue);
}

@Override
public TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeRight(key, timestamp);
}

@Override
public VL getThisValue(final LeftOrRightValue<? extends VL, ? extends VR> leftOrRightValue) {
return leftOrRightValue.getLeftValue();
}

@Override
public VR getOtherValue(final LeftOrRightValue<? extends VL, ? extends VR> leftOrRightValue) {
return leftOrRightValue.getRightValue();
}
}
}
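The diff of the shared base class is not rendered on this page, so the following is only a minimal sketch of the pattern the overrides above suggest: shared logic written once against side-specific hooks, with the "this" and "other" value types fixed by each subclass. `Pair`, `SideHooks`, `LeftHooks`, `RightHooks`, and `roundTrip` are hypothetical stand-ins, not Kafka classes.

```java
// Hypothetical stand-in for LeftOrRightValue: exactly one field is non-null.
final class Pair<L, R> {
    final L left;
    final R right;

    private Pair(final L left, final R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> Pair<L, R> ofLeft(final L left) {
        return new Pair<>(left, null);
    }

    static <L, R> Pair<L, R> ofRight(final R right) {
        return new Pair<>(null, right);
    }
}

// Hooks a side-agnostic base class can call; This/Other are bound per subclass.
abstract class SideHooks<L, R, This, Other> {
    abstract Pair<L, R> makeThisValue(This value);
    abstract This getThisValue(Pair<? extends L, ? extends R> pair);
    abstract Other getOtherValue(Pair<? extends L, ? extends R> pair);

    // Shared logic written once, with no isLeftSide branch and no cast.
    final This roundTrip(final This value) {
        return getThisValue(makeThisValue(value));
    }
}

final class LeftHooks<L, R> extends SideHooks<L, R, L, R> {
    @Override Pair<L, R> makeThisValue(final L value) { return Pair.ofLeft(value); }
    @Override L getThisValue(final Pair<? extends L, ? extends R> pair) { return pair.left; }
    @Override R getOtherValue(final Pair<? extends L, ? extends R> pair) { return pair.right; }
}

final class RightHooks<L, R> extends SideHooks<L, R, R, L> {
    @Override Pair<L, R> makeThisValue(final R value) { return Pair.ofRight(value); }
    @Override R getThisValue(final Pair<? extends L, ? extends R> pair) { return pair.right; }
    @Override L getOtherValue(final Pair<? extends L, ? extends R> pair) { return pair.left; }
}
```

Because each subclass binds `This` and `Other` in its `extends` clause, passing a right-side value to a left-side hook is a compile error rather than a runtime surprise.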
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;

import java.util.Optional;

class KStreamKStreamRightJoin<K, VL, VR, VOut> extends KStreamKStreamJoin<K, VL, VR, VOut, VR, VL> {

KStreamKStreamRightJoin(final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super VR, ? super VL, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
final TimeTrackerSupplier sharedTimeTrackerSupplier) {
super(otherWindowName, windows, joiner, outer, outerJoinWindowName, windows.afterMs, windows.beforeMs,
sharedTimeTrackerSupplier);
}

@Override
public Processor<K, VR, K, VOut> get() {
return new KStreamKStreamRightJoinProcessor();
}

private class KStreamKStreamRightJoinProcessor extends KStreamKStreamJoinProcessor {
@Override
public TimestampedKeyAndJoinSide<K> makeThisKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeRight(key, timestamp);
}

@Override
public LeftOrRightValue<VL, VR> makeThisValue(final VR thisValue) {
return LeftOrRightValue.makeRightValue(thisValue);
}

@Override
public TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
}

@Override
public VR getThisValue(final LeftOrRightValue<? extends VL, ? extends VR> leftOrRightValue) {
return leftOrRightValue.getRightValue();
}

@Override
public VL getOtherValue(final LeftOrRightValue<? extends VL, ? extends VR> leftOrRightValue) {
return leftOrRightValue.getLeftValue();
}
}
}
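The two constructors differ in the order in which they pass `windows.beforeMs` and `windows.afterMs` to the base class. Assuming the base class searches the other side's store in the range `[timestamp - before, timestamp + after]` (its diff is not shown here, so this is an assumption), swapping the arguments mirrors the window for records that arrive on the right. `JoinWindowSketch` and its methods are hypothetical helpers for illustration.

```java
final class JoinWindowSketch {
    // Hypothetical helper: range of other-side timestamps that may join
    // with a record observed at `timestamp`, clamped at zero.
    static long[] lookupRange(final long timestamp, final long joinBeforeMs, final long joinAfterMs) {
        final long from = Math.max(0L, timestamp - joinBeforeMs);
        final long to = Math.max(0L, timestamp + joinAfterMs);
        return new long[] {from, to};
    }

    // The left side passes (beforeMs, afterMs) unchanged.
    static long[] leftSideRange(final long timestamp, final long beforeMs, final long afterMs) {
        return lookupRange(timestamp, beforeMs, afterMs);
    }

    // The right side passes them swapped, which mirrors the window.
    static long[] rightSideRange(final long timestamp, final long beforeMs, final long afterMs) {
        return lookupRange(timestamp, afterMs, beforeMs);
    }
}
```

With `beforeMs = 100` and `afterMs = 0`, a left record at 1000 searches right records in [900, 1000], and a right record at 950 searches left records in [950, 1050], so the pair (left at 1000, right at 950) matches from either direction.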
@@ -63,21 +63,6 @@ public static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightVal
return new LeftOrRightValue<>(null, rightValue);
}

- /**
- * Create a new {@link LeftOrRightValue} instance with the V value as {@code leftValue} if
- * {@code isLeftSide} is True; otherwise {@code rightValue} if {@code isLeftSide} is False.
- *
- * @param value the V value (either V1 or V2 type)
- * @param <V> the type of the value
- * @return a new {@link LeftOrRightValue} instance
- */
- public static <V> LeftOrRightValue make(final boolean isLeftSide, final V value) {
- Objects.requireNonNull(value, "value is null");
- return isLeftSide
- ? LeftOrRightValue.makeLeftValue(value)
- : LeftOrRightValue.makeRightValue(value);
- }
-
public V1 getLeftValue() {
return leftValue;
}
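The removed `make(boolean, V)` factory returned a raw `LeftOrRightValue`, so the compiler had no way to tie `isLeftSide` to the type of `value`. The following is a minimal sketch of how a signature of that shape lets a mismatch through; `Either` and `RawFactoryDemo` are hypothetical stand-ins, not Kafka classes.

```java
// Hypothetical stand-in for LeftOrRightValue.
final class Either<L, R> {
    private final L left;
    private final R right;

    private Either(final L left, final R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> Either<L, R> makeLeft(final L left) {
        return new Either<>(left, null);
    }

    static <L, R> Either<L, R> makeRight(final R right) {
        return new Either<>(null, right);
    }

    // Flag-based factory in the style of the removed method:
    // raw return type, value type unrelated to L or R.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static <V> Either make(final boolean isLeftSide, final V value) {
        return isLeftSide ? Either.makeLeft(value) : Either.makeRight(value);
    }

    L getLeft() { return left; }
    R getRight() { return right; }
}

final class RawFactoryDemo {
    // Compiles (the unchecked warning is suppressed), yet stores an Integer in the String slot.
    @SuppressWarnings("unchecked")
    static Either<String, Integer> wrongSide() {
        return Either.make(true, 42);
    }

    // Returns true if reading the left value as a String fails at run time.
    static boolean failsAtRuntime() {
        try {
            final String s = wrongSide().getLeft();
            return s == null; // not reached when the cast fails
        } catch (final ClassCastException e) {
            return true;
        }
    }
}
```

With only the typed factories available, the equivalent mistake, `Either.<String, Integer>makeLeft(42)`, is rejected by the compiler.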
@@ -42,17 +42,25 @@ private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, final lon
}

/**
- * Create a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null}.
+ * Create a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provide {@code key} is not {@code null}.
*
- * @param leftSide True if the key is part of the left join side; False if it is from the right join side
* @param key the key
* @param <K> the type of the key
- * @return a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null}
+ * @return a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provide {@code key} is not {@code null}
*/
- public static <K> TimestampedKeyAndJoinSide<K> make(final boolean leftSide, final K key, final long timestamp) {
- return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp);
+ public static <K> TimestampedKeyAndJoinSide<K> makeLeft(final K key, final long timestamp) {
+ return new TimestampedKeyAndJoinSide<>(true, key, timestamp);
}
+ /**
+ * Create a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provide {@code key} is not {@code null}.
+ *
+ * @param key the key
+ * @param <K> the type of the key
+ * @return a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provide {@code key} is not {@code null}
+ */
+ public static <K> TimestampedKeyAndJoinSide<K> makeRight(final K key, final long timestamp) {
+ return new TimestampedKeyAndJoinSide<>(false, key, timestamp);
+ }

public boolean isLeftSide() {
return leftSide;
}
@@ -89,4 +97,4 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(leftSide, key, timestamp);
}
- }
+ }
@@ -55,11 +55,12 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {

@Override
public TimestampedKeyAndJoinSide<K> deserialize(final String topic, final byte[] data) {
- final boolean bool = data[StateSerdes.TIMESTAMP_SIZE] == 1;
+ final boolean isLeft = data[StateSerdes.TIMESTAMP_SIZE] == 1;
final K key = keyDeserializer.deserialize(topic, rawKey(data));
final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(data));

- return TimestampedKeyAndJoinSide.make(bool, key, timestamp);
+ return isLeft ? TimestampedKeyAndJoinSide.makeLeft(key, timestamp) :
+ TimestampedKeyAndJoinSide.makeRight(key, timestamp);
}

private byte[] rawTimestamp(final byte[] data) {
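The deserializer reads the join-side flag at offset `StateSerdes.TIMESTAMP_SIZE`, which implies a layout of timestamp first, then one flag byte, then the key bytes. The following is a minimal round-trip sketch of that layout, assuming an 8-byte timestamp and a UTF-8 string key; `JoinSideCodec` and its methods are hypothetical and do not reproduce Kafka's serde classes.

```java
final class JoinSideCodec {
    // Assumption: the timestamp prefix is a big-endian long (8 bytes).
    static final int TIMESTAMP_SIZE = Long.BYTES;

    // Layout: [timestamp: 8 bytes][isLeft: 1 byte][key bytes].
    static byte[] serialize(final long timestamp, final boolean isLeft, final String key) {
        final byte[] rawKey = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return java.nio.ByteBuffer.allocate(TIMESTAMP_SIZE + 1 + rawKey.length)
            .putLong(timestamp)
            .put((byte) (isLeft ? 1 : 0))
            .put(rawKey)
            .array();
    }

    static long timestamp(final byte[] data) {
        return java.nio.ByteBuffer.wrap(data).getLong();
    }

    // Same check as in the diff: the flag byte sits right after the timestamp.
    static boolean isLeft(final byte[] data) {
        return data[TIMESTAMP_SIZE] == 1;
    }

    static String key(final byte[] data) {
        final int offset = TIMESTAMP_SIZE + 1;
        return new String(data, offset, data.length - offset, java.nio.charset.StandardCharsets.UTF_8);
    }
}
```

Since the flag only exists as a byte on the wire, the deserializer is the one place where a boolean still has to be mapped back to `makeLeft` or `makeRight`.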
@@ -456,8 +456,7 @@ public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {
* This test is testing something internal to [[KStreamKStreamJoin]], so we had to setup low-level api manually.
*/
final KStreamImplJoin.TimeTrackerSupplier tracker = new KStreamImplJoin.TimeTrackerSupplier();
- final KStreamKStreamJoin<String, String, String, String> join = new KStreamKStreamJoin<>(
- false,
+ final KStreamKStreamRightJoin<String, String, String, String> join = new KStreamKStreamRightJoin<>(
"other",
new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
(key, v1, v2) -> v1 + v2,
@@ -34,7 +34,7 @@ public class TimestampedKeyAndJoinSideSerializerTest {
public void shouldSerializeKeyWithJoinSideAsTrue() {
final String value = "some-string";

- final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.make(true, value, 10);
+ final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.makeLeft(value, 10);

final byte[] serialized =
STRING_SERDE.serializer().serialize(TOPIC, timestampedKeyAndJoinSide);
@@ -51,7 +51,7 @@ public void shouldSerializeKeyWithJoinSideAsTrue() {
public void shouldSerializeKeyWithJoinSideAsFalse() {
final String value = "some-string";

- final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.make(false, value, 20);
+ final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.makeRight(value, 20);

final byte[] serialized =
STRING_SERDE.serializer().serialize(TOPIC, timestampedKeyAndJoinSide);
@@ -67,6 +67,6 @@ public void shouldSerializeKeyWithJoinSideAsFalse() {
@Test
public void shouldThrowIfSerializeNullData() {
assertThrows(NullPointerException.class,
- () -> STRING_SERDE.serializer().serialize(TOPIC, TimestampedKeyAndJoinSide.make(true, null, 0)));
+ () -> STRING_SERDE.serializer().serialize(TOPIC, TimestampedKeyAndJoinSide.makeLeft(null, 0)));
}
}