Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBZ-PGYB] Enable more test classes in the PG connector test suite #141

Merged
merged 48 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7ca385f
initial commit to for PG connector parity
vaibhav-yb Mar 11, 2024
9b0aab6
addressed review comments
vaibhav-yb Mar 18, 2024
9f9854e
added file
vaibhav-yb Mar 18, 2024
ed8c457
addressed review comments
vaibhav-yb Mar 18, 2024
c6cb244
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Mar 19, 2024
b9c0adc
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Mar 20, 2024
f825f15
initial commit to modify record for replica identity CHANGE
vaibhav-yb Mar 20, 2024
61aff29
commit with working set of changes for snapshot and streaming
vaibhav-yb Mar 26, 2024
221277e
custom replica identity storage class
vaibhav-yb Mar 26, 2024
2a6808c
added transformer for custom structure of messages when in CHANGE rep…
vaibhav-yb Mar 26, 2024
e5aec77
cleanup of the transformer
vaibhav-yb Mar 26, 2024
a6b6418
cleanup of the transformer
vaibhav-yb Mar 26, 2024
8c58146
fixed bug for pgschemabuilder
vaibhav-yb Mar 26, 2024
9e8b868
test changes
vaibhav-yb Mar 26, 2024
afa9972
addressed review comments
vaibhav-yb Mar 28, 2024
681b118
addressed review comments
vaibhav-yb Mar 28, 2024
245dd46
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Mar 28, 2024
76b4796
do not fetch replica identity while processing NOOP
vaibhav-yb Apr 2, 2024
be24a73
record structure will be transformed only with yboutput plugin now
vaibhav-yb Apr 4, 2024
6034614
resolved merge conflicts
vaibhav-yb May 21, 2024
351839d
added code to store replica identity to help process before images
vaibhav-yb May 30, 2024
87def88
added missing import
vaibhav-yb May 30, 2024
feeaa6a
enabled before image tests
vaibhav-yb May 31, 2024
2f8eb02
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Jun 11, 2024
5fdb03b
Merge branch 'ybdb-debezium-252-vaibhav' into enable-tests-before-image
vaibhav-yb Jun 11, 2024
f16b888
added tests
vaibhav-yb Jun 25, 2024
ac4fda8
changed default plugin to yboutput
vaibhav-yb Jul 9, 2024
203a3c8
changed some tests
vaibhav-yb Jul 10, 2024
3669402
added log interceptor
vaibhav-yb Jul 10, 2024
a6706a3
added heartbeat package
vaibhav-yb Jul 10, 2024
be2c92a
resolved merge conflicts
vaibhav-yb Jul 10, 2024
0659d1e
Merge branch 'stream-producer-tests' into merge-tests-final
vaibhav-yb Jul 10, 2024
d9cdf48
fixed tests
vaibhav-yb Jul 11, 2024
cd71121
removed changes in non-test code
vaibhav-yb Jul 11, 2024
a8b79af
Merge branch 'ybdb-debezium-2.5.2' into merge-tests-final
vaibhav-yb Jul 11, 2024
150255d
added more tests to be run
vaibhav-yb Jul 11, 2024
ad40d18
enabled more tests
vaibhav-yb Jul 11, 2024
17e5376
revert core code changes
vaibhav-yb Jul 11, 2024
12b256c
revert core code changes
vaibhav-yb Jul 11, 2024
37f2c2d
addressed self-review comments
vaibhav-yb Jul 11, 2024
8b8ca94
resolved merge conflicts
vaibhav-yb Jul 12, 2024
b727f5c
Merge branch 'ybdb-debezium-2.5.2' into merge-tests-final
vaibhav-yb Jul 12, 2024
b1f8ea7
added records stream producer tests from #141
vaibhav-yb Jul 12, 2024
8b5ee43
addressed review comments
vaibhav-yb Jul 12, 2024
b314f93
Merge branch 'ybdb-debezium-2.5.2' into records-stream-producer-IT
vaibhav-yb Jul 12, 2024
bb34cf4
resolved merge conflicts
vaibhav-yb Jul 12, 2024
dd56869
resolved merge conflicts
vaibhav-yb Jul 12, 2024
070dd3c
fixed formatting issue
vaibhav-yb Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,18 @@ public void shouldSerializeToJson() throws Exception {
.notifying((records, committer) -> {

for (ChangeEvent<String, String> r : records) {
if (r.destination().equals(TestHelper.getDefaultHeartbeatTopic())) {
continue;
}

assertThat(r.key()).isNotNull();
assertThat(r.value()).isNotNull();
try {
final Document key = DocumentReader.defaultReader().read(r.key());
final Document value = DocumentReader.defaultReader().read(r.value());
assertThat(key.getInteger("id")).isEqualTo(1);
assertThat(value.getDocument("after").getInteger("id")).isEqualTo(1);
assertThat(value.getDocument("after").getString("val")).isEqualTo("value1");
assertThat(key.getDocument("id").getInteger("value")).isEqualTo(1);
assertThat(value.getDocument("after").getDocument("id").getInteger("value")).isEqualTo(1);
assertThat(value.getDocument("after").getDocument("val").getString("value")).isEqualTo("value1");
}
catch (IOException e) {
throw new IllegalStateException(e);
Expand All @@ -117,7 +121,7 @@ public void shouldSerializeToJson() throws Exception {
LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine");
engine.run();
});
allLatch.await(5000, TimeUnit.MILLISECONDS);
allLatch.await(35000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0);
}
}
Expand Down Expand Up @@ -158,7 +162,7 @@ public void handle(boolean success, String message, Throwable error) {
LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine");
engine.run();
});
allLatch.await(5000, TimeUnit.MILLISECONDS);
allLatch.await(35000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0);
}
}
Expand All @@ -183,14 +187,18 @@ public void shouldSerializeToCloudEvents() throws Exception {

for (ChangeEvent<String, String> r : records) {
try {
if (r.destination().equals(TestHelper.getDefaultHeartbeatTopic())) {
continue;
}

final Document key = DocumentReader.defaultReader().read(r.key());
assertThat(key.getInteger("id")).isEqualTo(1);
assertThat(key.getDocument("id").getInteger("value")).isEqualTo(1);
assertThat(r.value()).isNotNull();

final Document value = DocumentReader.defaultReader().read(r.value());
assertThat(value.getString("id")).contains("txId");
assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getInteger("id")).isEqualTo(1);
assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getString("val")).isEqualTo("value1");
assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getDocument("id").getInteger("value")).isEqualTo(1);
assertThat(value.getDocument("data").getDocument("payload").getDocument("after").getDocument("val").getString("value")).isEqualTo("value1");
}
catch (IOException e) {
throw new IllegalStateException(e);
Expand All @@ -205,7 +213,7 @@ public void shouldSerializeToCloudEvents() throws Exception {
LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine");
engine.run();
});
allLatch.await(5000, TimeUnit.MILLISECONDS);
allLatch.await(35000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,9 +1424,9 @@ public void shouldTakeExcludeListFiltersIntoAccount() throws Exception {
TestHelper.execute("CREATE TABLE s1.b (pk SERIAL, aa integer, bb integer, PRIMARY KEY(pk));");
TestHelper.execute("ALTER TABLE s1.a ADD COLUMN bb integer;");
TestHelper.execute("INSERT INTO s1.a (aa, bb) VALUES (2, 2); "
+ "INSERT INTO s1.a (aa, bb) VALUES (3, 3); "
+ "INSERT INTO s1.b (aa, bb) VALUES (4, 4); "
+ "INSERT INTO s2.a (aa) VALUES (5);");
+ "INSERT INTO s1.a (aa, bb) VALUES (3, 3); "
+ "INSERT INTO s1.b (aa, bb) VALUES (4, 4); "
+ "INSERT INTO s2.a (aa) VALUES (5);");
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;

// TODO Vaibhav: Enabling this test doesn't make sense unless we populate the default value of the
// columns in the schema.
public class PostgresDefaultValueConverterIT extends AbstractConnectorTest {

@Before
Expand Down Expand Up @@ -143,17 +145,18 @@ private void createTableAndInsertData() {
private void assertDefaultValueChangeRecord(SourceRecord sourceRecord) {
final Schema valueSchema = sourceRecord.valueSchema();

assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt1")).isNotNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dt2")).isNotNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt3")).isNotNull();

assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dint").get("value")).isNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc1").get("value")).isNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc2").get("value")).isEqualTo("NULL");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc3").get("value")).isEqualTo("MYVALUE");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc4").get("value")).isEqualTo("NULL");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc5").get("value")).isEqualTo("NULL::character varying");
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dvc6").get("value")).isNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dt1").get("value")).isNotNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dt2").get("value")).isNotNull();
assertThat(((Struct) sourceRecord.value()).getStruct("after").getStruct("dt3").get("value")).isNotNull();

// YB Note: We do not populate the default value while sending replication messages.
assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull();
assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull();
assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testLifecycle() throws Exception {
// start connector
start(YugabyteDBConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build());

Expand Down Expand Up @@ -117,7 +117,7 @@ public void testSnapshotAndStreamingMetrics() throws Exception {
// start connector
start(YugabyteDBConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build());

Expand All @@ -133,7 +133,7 @@ public void testSnapshotAndStreamingWithCustomMetrics() throws Exception {

// start connector
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import io.debezium.config.Configuration;
Expand Down Expand Up @@ -43,6 +44,7 @@ public void after() {
TestHelper.dropPublication();
}

@Ignore("YB Note: Decimal handling mode precise unsupported")
@Test
@FixFor("DBZ-5991")
public void shouldReceiveChangesForInsertsWithPreciseMode() throws Exception {
Expand Down Expand Up @@ -89,9 +91,9 @@ public void shouldReceiveChangesForInsertsWithStringMode() throws Exception {
assertThat(recordsForTopic).hasSize(2);

Struct after = ((Struct) recordsForTopic.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("m")).isEqualTo("-92233720368547758.08");
assertThat(after.getStruct("m").get("value")).isEqualTo("-92233720368547758.08");
after = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("m")).isEqualTo("92233720368547758.07");
assertThat(after.getStruct("m").get("value")).isEqualTo("92233720368547758.07");
}

@Test
Expand All @@ -115,11 +117,12 @@ public void shouldReceiveChangesForInsertsWithDoubleMode() throws Exception {
assertThat(recordsForTopic).hasSize(2);

Struct after = ((Struct) recordsForTopic.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("m")).isEqualTo(-92233720368547758.00);
assertThat(after.getStruct("m").get("value")).isEqualTo(-92233720368547758.00);
after = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("m")).isEqualTo(92233720368547758.00);
assertThat(after.getStruct("m").get("value")).isEqualTo(92233720368547758.00);
}

@Ignore("YB Note: Decimal handling mode precise unsupported")
@Test
@FixFor("DBZ-6001")
public void shouldReceiveChangesForInsertNullAndZeroMoney() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabled() throw
final List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("updates_test.debezium_test"));
assertThat(recordsForTopic).hasSize(3);
Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(secondMessage.get("white")).isEqualTo(2);
assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(2);
Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(thirdMessage.get("white")).isEqualTo(3);
assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(3);
}

@Test
Expand Down Expand Up @@ -117,9 +117,9 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledWithExcl
final List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("updates_test.debezium_test"));
assertThat(recordsForTopic).hasSize(3);
Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(secondMessage.get("white")).isEqualTo(2);
assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(2);
Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(thirdMessage.get("white")).isEqualTo(3);
assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(3);
}

@Test
Expand Down Expand Up @@ -154,11 +154,11 @@ public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledButTa
final List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("updates_test.debezium_test"));
assertThat(recordsForTopic).hasSize(4);
Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(secondMessage.get("white")).isEqualTo(1);
assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(1);
Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(thirdMessage.get("white")).isEqualTo(2);
assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(2);
Struct forthMessage = ((Struct) recordsForTopic.get(3).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(forthMessage.get("white")).isEqualTo(3);
assertThat(forthMessage.getStruct("white").getInt32("value")).isEqualTo(3);
}

@Test
Expand Down Expand Up @@ -196,11 +196,11 @@ public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipDisabled() t

assertThat(recordsForTopic).hasSize(4);
Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(secondMessage.get("white")).isEqualTo(1);
assertThat(secondMessage.getStruct("white").getInt32("value")).isEqualTo(1);
Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(thirdMessage.get("white")).isEqualTo(2);
assertThat(thirdMessage.getStruct("white").getInt32("value")).isEqualTo(2);
Struct forthMessage = ((Struct) recordsForTopic.get(3).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(forthMessage.get("white")).isEqualTo(3);
assertThat(forthMessage.getStruct("white").getInt32("value")).isEqualTo(3);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ public void transactionMetadata() throws InterruptedException {
.build();
start(YugabyteDBConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForDefaultReplicationSlotBeActive();

waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();

TestHelper.waitFor(Duration.ofSeconds(15));

// insert and verify 2 new records
TestHelper.execute(INSERT_STMT);

Expand Down