diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 6f072415803fd..976de472300f1 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -35,12 +35,10 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -48,10 +46,7 @@
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
-        3,
-        mkProperties(
-            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
 
     @BeforeAll
     public static void startCluster() throws IOException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
index 1b24fd6175517..2fac180e5b633 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
@@ -306,10 +306,10 @@ public String toString() {
         }
 
         public static DataSet generate(final int numArticles, final int numComments) {
-            // generate four days' worth of data, starting right now (to avoid broker retention/compaction)
+            // generate four days' worth of data, starting 4 days in the past (avoiding future records)
             final int timeSpan = 1000 * 60 * 60 * 24 * 4;
-            final long dataStartTime = System.currentTimeMillis();
-            final long dataEndTime = dataStartTime + timeSpan;
+            final long dataStartTime = System.currentTimeMillis() - timeSpan;
+            final long dataEndTime = System.currentTimeMillis();
 
             // Explicitly create a seed so we can log it.
             // If we are debugging a failed run, we can deterministically produce the same dataset
@@ -648,6 +648,7 @@ public static Properties getConfig(final String broker,
                 )
             );
             properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+            properties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Duration.ofDays(5).toMillis());
             return properties;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 0b93006adf16d..bf53c76921be1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -133,6 +133,7 @@ private Properties getStreamsConfig(final Properties props) {
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
         fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         fullProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+        fullProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Duration.ofDays(3).toMillis());
         fullProps.putAll(props);
         return fullProps;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 7867734a6c540..26ae46aa6d4a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -61,6 +61,12 @@
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
+    /**
+     * We create all records two days in the past, so that we can flush all windows by sending a final record
+     * with the current timestamp, without putting any timestamps in the future.
+     */
+    private static final long CREATE_TIME_SHIFT_MS = Duration.ofDays(2).toMillis();
+
     private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
@@ -137,6 +143,8 @@ static void generatePerpetually(final String kafka,
 
             final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                 "data",
+                null,
+                System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                 stringSerde.serializer().serialize("", key),
                 intSerde.serializer().serialize("", value)
             );
@@ -146,6 +154,8 @@ static void generatePerpetually(final String kafka,
 
                 final ProducerRecord<byte[], byte[]> fkRecord = new ProducerRecord<>(
                     "fk",
+                    null,
+                    System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                     intSerde.serializer().serialize("", value),
                     stringSerde.serializer().serialize("", key)
                 );
@@ -198,6 +208,8 @@ public static Map<String, Set<Integer>> generate(final String kafka,
 
             final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                 "data",
+                null,
+                System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                 stringSerde.serializer().serialize("", key),
                 intSerde.serializer().serialize("", value)
             );
@@ -207,6 +219,8 @@ public static Map<String, Set<Integer>> generate(final String kafka,
 
                 final ProducerRecord<byte[], byte[]> fkRecord = new ProducerRecord<>(
                     "fk",
+                    null,
+                    System.currentTimeMillis() - CREATE_TIME_SHIFT_MS,
                     intSerde.serializer().serialize("", value),
                     stringSerde.serializer().serialize("", key)
                 );
@@ -272,7 +286,7 @@ private static void flush(final KafkaProducer<byte[], byte[]> producer,
         producer.send(new ProducerRecord<>(
             partition.topic(),
             partition.partition(),
-            System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+            System.currentTimeMillis(),
            keyBytes,
             valBytes
         ));
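
For reviewers, the idea shared by these hunks is the five-argument ProducerRecord constructor (topic, partition, timestamp, key, value): data records get an explicit, backdated create-time, and a single record produced at the current wall-clock time then sits two days ahead of stream time, far enough to close every window, without ever writing a timestamp that the broker's default log.message.timestamp.after.max.ms validation would reject. The WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG bumps in SmokeTestClient and RelationalSmokeTest presumably compensate for the shift, keeping the backdated records clear of the changelog topics' retention cutoff. Below is a minimal standalone sketch of the pattern, not code from this patch: the class name, bootstrap address, and payloads are illustrative assumptions; the "data" topic name and the two-day shift are from the diff.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.time.Duration;
import java.util.Properties;

public class BackdatedSmokeProducerSketch {
    // Mirrors CREATE_TIME_SHIFT_MS above: all data records land two days in the past.
    private static final long CREATE_TIME_SHIFT_MS = Duration.ofDays(2).toMillis();

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Backdated data record: explicit create-time two days ago, so it can
            // never trip the broker's "timestamp too far in the future" check.
            producer.send(new ProducerRecord<>(
                "data",
                null,                                              // partition: let the default partitioner choose
                System.currentTimeMillis() - CREATE_TIME_SHIFT_MS, // backdated create-time
                "key-1".getBytes(),
                "42".getBytes()
            ));

            // Flush record at the current wall-clock time: two days ahead of all
            // data records, it advances stream time enough to close every window.
            producer.send(new ProducerRecord<>(
                "data",
                null,
                System.currentTimeMillis(),
                "flush".getBytes(),
                "flush".getBytes()
            ));
        }
    }
}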