MINOR: Fix streams smoke test flush records (#18830)
In the streams smoke test, flush records are appended to the input topics to advance stream time, so that all suppressed windows are flushed at the end of the test. These flush records were created with a record time of current time + 2 days. caf0b67 changed the broker defaults so that records more than one hour in the future are rejected by the broker, which breaks the flush records. By moving all record timestamps 2 days into the past, the existing logic works correctly with the new default broker configuration.
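
For illustration, here is a minimal sketch of the timestamp-shifting idea (the class name and the simplified sendInput/sendFlush helpers are hypothetical; the actual logic lives in SmokeTestDriver, shown in the diff below). Input records are stamped two days in the past, so a final flush record stamped with the current time still advances stream time by two days without the broker ever seeing a future timestamp:

    import java.time.Duration;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical sketch, not the test's actual code.
    public class TimestampShiftSketch {
        private static final long CREATE_TIME_SHIFT_MS = Duration.ofDays(2).toMillis();

        // Regular input record: stamped two days in the past.
        static void sendInput(final KafkaProducer<byte[], byte[]> producer,
                              final byte[] key, final byte[] value) {
            producer.send(new ProducerRecord<>(
                "data", null, System.currentTimeMillis() - CREATE_TIME_SHIFT_MS, key, value));
        }

        // Flush record: stamped "now", i.e. two days after the input records,
        // which advances stream time far enough to close all suppressed windows.
        static void sendFlush(final KafkaProducer<byte[], byte[]> producer,
                              final byte[] key, final byte[] value) {
            producer.send(new ProducerRecord<>(
                "data", null, System.currentTimeMillis(), key, value));
        }
    }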

A similar issue exists in the relational smoke test, where data is emitted 4 days into the future. To avoid running into retention / compaction, the window retention time is increased for both tests.
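
As a sketch of the retention bump (the helper method is hypothetical; the config key is the real StreamsConfig constant used in the diff below), the windowed-store changelog retention is extended past the data's 4-day time span, so the broker does not delete or compact windows before verification reads them back:

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    // Hypothetical helper; the tests set this directly in their config methods.
    public class RetentionSketch {
        static Properties withExtendedWindowRetention(final Properties props) {
            // Keep windowed-store changelog data around longer than the 4-day
            // data span, so retention/compaction cannot drop windows before
            // the verification step reads them back.
            props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
                      Duration.ofDays(5).toMillis());
            return props;
        }
    }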

Reviewers: Bruno Cadonna <[email protected]>, Bill Bejeck <[email protected]>
lucasbru authored and mjsax committed Feb 7, 2025
1 parent 557c17b commit 8d294cf
Showing 4 changed files with 21 additions and 10 deletions.
SmokeTestDriverIntegrationTest.java

@@ -35,23 +35,18 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
-        3,
-        mkProperties(
-            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
 
     @BeforeAll
     public static void startCluster() throws IOException {

RelationalSmokeTest.java

@@ -306,10 +306,10 @@ public String toString() {
     }
 
     public static DataSet generate(final int numArticles, final int numComments) {
-        // generate four days' worth of data, starting right now (to avoid broker retention/compaction)
+        // generate four days' worth of data, starting 4 days in the past (avoiding future records)
         final int timeSpan = 1000 * 60 * 60 * 24 * 4;
-        final long dataStartTime = System.currentTimeMillis();
-        final long dataEndTime = dataStartTime + timeSpan;
+        final long dataStartTime = System.currentTimeMillis() - timeSpan;
+        final long dataEndTime = System.currentTimeMillis();
 
         // Explicitly create a seed so we can log it.
         // If we are debugging a failed run, we can deterministically produce the same dataset
@@ -648,6 +648,7 @@ public static Properties getConfig(final String broker,
             )
         );
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        properties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Duration.ofDays(5).toMillis());
         return properties;
     }
 

SmokeTestClient.java

@@ -133,6 +133,7 @@ private Properties getStreamsConfig(final Properties props) {
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
         fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         fullProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+        fullProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Duration.ofDays(3).toMillis());
         fullProps.putAll(props);
         return fullProps;
     }

SmokeTestDriver.java

@@ -61,6 +61,12 @@
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
+    /**
+     * We are creating all records two days in the past, so that we can flush all windows by sending a final record
+     * using the current timestamp, without using timestamps in the future.
+     */
+    private static final long CREATE_TIME_SHIFT_MS = Duration.ofDays(2).toMillis();
+
     private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
@@ -137,6 +143,8 @@ static void generatePerpetually(final String kafka,
             final ProducerRecord<byte[], byte[]> record =
                 new ProducerRecord<>(
                     "data",
+                    null,
+                    System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                     stringSerde.serializer().serialize("", key),
                     intSerde.serializer().serialize("", value)
                 );
@@ -146,6 +154,8 @@
             final ProducerRecord<byte[], byte[]> fkRecord =
                 new ProducerRecord<>(
                     "fk",
+                    null,
+                    System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                     intSerde.serializer().serialize("", value),
                     stringSerde.serializer().serialize("", key)
                 );
@@ -198,6 +208,8 @@ public static Map<String, Set<Integer>> generate(final String kafka,
             final ProducerRecord<byte[], byte[]> record =
                 new ProducerRecord<>(
                     "data",
+                    null,
+                    System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                     stringSerde.serializer().serialize("", key),
                     intSerde.serializer().serialize("", value)
                 );
@@ -207,6 +219,8 @@
             final ProducerRecord<byte[], byte[]> fkRecord =
                 new ProducerRecord<>(
                     "fk",
+                    null,
+                    System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                     intSerde.serializer().serialize("", value),
                     stringSerde.serializer().serialize("", key)
                 );
@@ -272,7 +286,7 @@ private static void flush(final KafkaProducer<byte[], byte[]> producer,
         producer.send(new ProducerRecord<>(
             partition.topic(),
             partition.partition(),
-            System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+            System.currentTimeMillis(),
             keyBytes,
             valBytes
         ));
