diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DebeziumEngineIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DebeziumEngineIT.java index 3fcaf3dfd03..2bdce248eee 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DebeziumEngineIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DebeziumEngineIT.java @@ -95,14 +95,18 @@ public void shouldSerializeToJson() throws Exception { .notifying((records, committer) -> { for (ChangeEvent r : records) { + if (r.destination().equals(TestHelper.getDefaultHeartbeatTopic())) { + continue; + } + assertThat(r.key()).isNotNull(); assertThat(r.value()).isNotNull(); try { final Document key = DocumentReader.defaultReader().read(r.key()); final Document value = DocumentReader.defaultReader().read(r.value()); - assertThat(key.getInteger("id")).isEqualTo(1); - assertThat(value.getDocument("after").getInteger("id")).isEqualTo(1); - assertThat(value.getDocument("after").getString("val")).isEqualTo("value1"); + assertThat(key.getDocument("id").getInteger("value")).isEqualTo(1); + assertThat(value.getDocument("after").getDocument("id").getInteger("value")).isEqualTo(1); + assertThat(value.getDocument("after").getDocument("val").getString("value")).isEqualTo("value1"); } catch (IOException e) { throw new IllegalStateException(e); @@ -117,7 +121,7 @@ public void shouldSerializeToJson() throws Exception { LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine"); engine.run(); }); - allLatch.await(5000, TimeUnit.MILLISECONDS); + allLatch.await(35000, TimeUnit.MILLISECONDS); assertThat(allLatch.getCount()).isEqualTo(0); } } @@ -158,7 +162,7 @@ public void handle(boolean success, String message, Throwable error) { LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine"); engine.run(); }); - allLatch.await(5000, TimeUnit.MILLISECONDS); + allLatch.await(35000, TimeUnit.MILLISECONDS); assertThat(allLatch.getCount()).isEqualTo(0); } } @@ -183,14 +187,18 @@ public void shouldSerializeToCloudEvents() throws Exception { for (ChangeEvent r : records) { try { + if (r.destination().equals(TestHelper.getDefaultHeartbeatTopic())) { + continue; + } + final Document key = DocumentReader.defaultReader().read(r.key()); - assertThat(key.getInteger("id")).isEqualTo(1); + assertThat(key.getDocument("id").getInteger("value")).isEqualTo(1); assertThat(r.value()).isNotNull(); final Document value = DocumentReader.defaultReader().read(r.value()); assertThat(value.getString("id")).contains("txId"); - assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getInteger("id")).isEqualTo(1); - assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getString("val")).isEqualTo("value1"); + assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getDocument("id").getInteger("value")).isEqualTo(1); + assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getDocument("val").getString("value")).isEqualTo("value1"); } catch (IOException e) { throw new IllegalStateException(e); @@ -205,7 +213,7 @@ public void shouldSerializeToCloudEvents() throws Exception { LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine"); engine.run(); }); - allLatch.await(5000, TimeUnit.MILLISECONDS); + allLatch.await(35000, TimeUnit.MILLISECONDS); assertThat(allLatch.getCount()).isEqualTo(0); } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index c9b33b4f14a..d51354134ed 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1424,9 +1424,9 @@ public void shouldTakeExcludeListFiltersIntoAccount() throws Exception { TestHelper.execute("CREATE TABLE s1.b (pk SERIAL, aa integer, bb integer, PRIMARY KEY(pk));"); TestHelper.execute("ALTER TABLE s1.a ADD COLUMN bb integer;"); TestHelper.execute("INSERT INTO s1.a (aa, bb) VALUES (2, 2); " - + "INSERT INTO s1.a (aa, bb) VALUES (3, 3); " - + "INSERT INTO s1.b (aa, bb) VALUES (4, 4); " - + "INSERT INTO s2.a (aa) VALUES (5);"); + + "INSERT INTO s1.a (aa, bb) VALUES (3, 3); " + + "INSERT INTO s1.b (aa, bb) VALUES (4, 4); " + + "INSERT INTO s2.a (aa) VALUES (5);"); Configuration.Builder configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java index b54e0e1fbc2..19c87b4911a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java @@ -28,6 +28,8 @@ import io.debezium.junit.SkipWhenDatabaseVersion; import io.debezium.junit.logging.LogInterceptor; +// TODO Vaibhav: Enabling this test doesn't make sense unless we populate the default value of the +// columns in the schema. public class PostgresDefaultValueConverterIT extends AbstractConnectorTest { @Before @@ -143,17 +145,18 @@ private void createTableAndInsertData() { private void assertDefaultValueChangeRecord(SourceRecord sourceRecord) { final Schema valueSchema = sourceRecord.valueSchema(); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull(); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull(); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL"); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE"); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL"); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying"); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull(); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt1")).isNotNull(); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dt2")).isNotNull(); - assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt3")).isNotNull(); - + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dint").get("value")).isNull(); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc1").get("value")).isNull(); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc2").get("value")).isEqualTo("NULL"); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc3").get("value")).isEqualTo("MYVALUE"); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc4").get("value")).isEqualTo("NULL"); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc5").get("value")).isEqualTo("NULL::character varying"); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc6").get("value")).isNull(); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dt1").get("value")).isNotNull(); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dt2").get("value")).isNotNull(); + assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dt3").get("value")).isNotNull(); + + // YB Note: We do not populate the default value while sending replication messages. assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull(); assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull(); assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL"); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java index 5cc76aa6c4a..2c53bffd97d 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java @@ -61,7 +61,7 @@ public void testLifecycle() throws Exception { // start connector start(YugabyteDBConnector.class, TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build()); @@ -117,7 +117,7 @@ public void testSnapshotAndStreamingMetrics() throws Exception { // start connector start(YugabyteDBConnector.class, TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build()); @@ -133,7 +133,7 @@ public void testSnapshotAndStreamingWithCustomMetrics() throws Exception { // start connector Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata") .build(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java index a444e7245b5..3309431789a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java @@ -16,6 +16,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import io.debezium.config.Configuration; @@ -43,6 +44,7 @@ public void after() { TestHelper.dropPublication(); } + @Ignore("YB Note: Decimal handling mode precise unsupported") @Test @FixFor("DBZ-5991") public void shouldReceiveChangesForInsertsWithPreciseMode() throws Exception { @@ -89,9 +91,9 @@ public void shouldReceiveChangesForInsertsWithStringMode() throws Exception { assertThat(recordsForTopic).hasSize(2); Struct after = ((Struct) recordsForTopic.get(0).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(after.get("m")).isEqualTo("-92233720368547758.08"); + assertThat(after.getStruct("m").get("value")).isEqualTo("-92233720368547758.08"); after = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(after.get("m")).isEqualTo("92233720368547758.07"); + assertThat(after.getStruct("m").get("value")).isEqualTo("92233720368547758.07"); } @Test @@ -115,11 +117,12 @@ public void shouldReceiveChangesForInsertsWithDoubleMode() throws Exception { assertThat(recordsForTopic).hasSize(2); Struct after = ((Struct) recordsForTopic.get(0).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(after.get("m")).isEqualTo(-92233720368547758.00); + assertThat(after.getStruct("m").get("value")).isEqualTo(-92233720368547758.00); after = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(after.get("m")).isEqualTo(92233720368547758.00); + assertThat(after.getStruct("m").get("value")).isEqualTo(92233720368547758.00); } + @Ignore("YB Note: Decimal handling mode precise unsupported") @Test @FixFor("DBZ-6001") public void shouldReceiveChangesForInsertNullAndZeroMoney() throws Exception { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java index cfe2ccbfdb5..9f0b3fa5138 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java @@ -77,9 +77,9 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabled() throw final List recordsForTopic = records.recordsForTopic(topicName("updates_test.debezium_test")); assertThat(recordsForTopic).hasSize(3); Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(secondMessage.get("white")).isEqualTo(2); + assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(2); Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(thirdMessage.get("white")).isEqualTo(3); + assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(3); } @Test @@ -117,9 +117,9 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledWithExcl final List recordsForTopic = records.recordsForTopic(topicName("updates_test.debezium_test")); assertThat(recordsForTopic).hasSize(3); Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(secondMessage.get("white")).isEqualTo(2); + assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(2); Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(thirdMessage.get("white")).isEqualTo(3); + assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(3); } @Test @@ -154,11 +154,11 @@ public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledButTa final List recordsForTopic = records.recordsForTopic(topicName("updates_test.debezium_test")); assertThat(recordsForTopic).hasSize(4); Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(secondMessage.get("white")).isEqualTo(1); + assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(1); Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(thirdMessage.get("white")).isEqualTo(2); + assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(2); Struct forthMessage = ((Struct) recordsForTopic.get(3).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(forthMessage.get("white")).isEqualTo(3); + assertThat(forthMessage.getStruct("white").getInt32("value")).isEqualTo(3); } @Test @@ -196,11 +196,11 @@ public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipDisabled() t assertThat(recordsForTopic).hasSize(4); Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(secondMessage.get("white")).isEqualTo(1); + assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(1); Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(thirdMessage.get("white")).isEqualTo(2); + assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(2); Struct forthMessage = ((Struct) recordsForTopic.get(3).value()).getStruct(Envelope.FieldName.AFTER); - assertThat(forthMessage.get("white")).isEqualTo(3); + assertThat(forthMessage.getStruct("white").getInt32("value")).isEqualTo(3); } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java index 1674f342822..57f4a3d97ac 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java @@ -82,12 +82,13 @@ public void transactionMetadata() throws InterruptedException { .build(); start(YugabyteDBConnector.class, config); assertConnectorIsRunning(); - TestHelper.waitForDefaultReplicationSlotBeActive(); waitForAvailableRecords(100, TimeUnit.MILLISECONDS); // there shouldn't be any snapshot records assertNoRecordsToConsume(); + TestHelper.waitFor(Duration.ofSeconds(15)); + // insert and verify 2 new records TestHelper.execute(INSERT_STMT);