diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java index 895aa735365ef..f7ea5f933d2bd 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java @@ -33,11 +33,13 @@ import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; -import io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy; +import io.debezium.engine.spi.OffsetCommitPolicy; import java.util.Properties; import java.util.Queue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -58,6 +60,7 @@ public class DebeziumRecordPublisher implements AutoCloseable { private final AtomicBoolean hasClosed; private final AtomicBoolean isClosing; private final AtomicReference thrownError; + private final CountDownLatch engineLatch; public DebeziumRecordPublisher(JsonNode config, ConfiguredAirbyteCatalog catalog, AirbyteFileOffsetBackingStore offsetManager) { this.config = config; @@ -67,12 +70,13 @@ public DebeziumRecordPublisher(JsonNode config, ConfiguredAirbyteCatalog catalog this.isClosing = new AtomicBoolean(false); this.thrownError = new AtomicReference<>(); this.executor = Executors.newSingleThreadExecutor(); + this.engineLatch = new CountDownLatch(1); } public void start(Queue> queue) { engine = DebeziumEngine.create(Json.class) 
.using(getDebeziumProperties(config, catalog, offsetManager)) - .using(new AlwaysCommitOffsetPolicy()) + .using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy()) .notifying(e -> { // debezium outputs a tombstone event that has a value of null. this is an artifact of how it // interacts with kafka. we want to ignore it. @@ -85,6 +89,7 @@ public void start(Queue> queue) { .using((success, message, error) -> { LOGGER.info("Debezium engine shutdown."); thrownError.set(error); + engineLatch.countDown(); }) .build(); @@ -103,10 +108,15 @@ public void close() throws Exception { engine.close(); } - // announce closure only engine is off. - hasClosed.set(true); + // wait for closure before shutting down executor service + engineLatch.await(5, TimeUnit.MINUTES); + // shut down and await for thread to actually go down executor.shutdown(); + executor.awaitTermination(5, TimeUnit.MINUTES); + + // after the engine is completely off, we can mark this as closed + hasClosed.set(true); if (thrownError.get() != null) { throw new RuntimeException(thrownError.get()); @@ -145,12 +155,16 @@ protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAir } props.setProperty("slot.name", config.get("replication_slot").asText()); + props.setProperty("publication.name", "airbyte_publication"); // todo: allow as configured input // table selection final String tableWhitelist = getTableWhitelist(catalog); props.setProperty("table.include.list", tableWhitelist); props.setProperty("database.include.list", config.get("database").asText()); + // recommended when using pgoutput + props.setProperty("publication.autocreate.mode", "disabled"); + return props; } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 6064cc9a412ab..7ddee39f000bb 100644 --- 
a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -162,6 +162,7 @@ void setup() throws Exception { database = getDatabaseFromConfig(config); database.query(ctx -> { ctx.execute("SELECT pg_create_logical_replication_slot('" + fullReplicationSlot + "', 'pgoutput');"); + ctx.execute("CREATE PUBLICATION airbyte_publication FOR ALL TABLES;"); ctx.execute(String.format("CREATE TABLE %s(%s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", MAKES_STREAM_NAME, COL_ID, COL_MAKE, COL_ID)); ctx.execute(String.format("CREATE TABLE %s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID)); @@ -308,15 +309,40 @@ void testRecordsProducedDuringAndAfterSync() throws Exception { assertExpectedStateMessages(extractStateMessages(actualRecords2)); - final Set recordMessages1 = extractRecordMessages(actualRecords1); - final Set recordMessages2 = extractRecordMessages(actualRecords2); + // sometimes there can be more than one of these at the end of the snapshot and just before the + // first incremental. 
+ final Set recordMessages1 = removeDuplicates(extractRecordMessages(actualRecords1)); + final Set recordMessages2 = removeDuplicates(extractRecordMessages(actualRecords2)); final int recordsCreatedBeforeTestCount = MAKE_RECORDS.size() + MODEL_RECORDS.size(); assertTrue(recordsCreatedBeforeTestCount < recordMessages1.size(), "Expected first sync to include records created while the test was running."); assertTrue(0 < recordMessages2.size(), "Expected records to be replicated in the second sync."); + LOGGER.info("recordsToCreate = " + recordsToCreate); + LOGGER.info("recordsCreatedBeforeTestCount = " + recordsCreatedBeforeTestCount); + LOGGER.info("recordMessages1.size() = " + recordMessages1.size()); + LOGGER.info("recordMessages2.size() = " + recordMessages2.size()); assertEquals(recordsToCreate + recordsCreatedBeforeTestCount, recordMessages1.size() + recordMessages2.size()); } + private static Set removeDuplicates(Set messages) { + final Set existingDataRecordsWithoutUpdated = new HashSet<>(); + final Set output = new HashSet<>(); + + for (AirbyteRecordMessage message : messages) { + ObjectNode node = message.getData().deepCopy(); + node.remove("_ab_cdc_updated_at"); + + if (existingDataRecordsWithoutUpdated.contains(node)) { + LOGGER.info("Removing duplicate node: " + node); + } else { + output.add(message); + existingDataRecordsWithoutUpdated.add(node); + } + } + + return output; + } + @Test @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.") void testCdcAndFullRefreshInSameSync() throws Exception { diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index ba977efb62612..589b381962371 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -126,6 +126,7 @@ Please read the [CDC docs](../../architecture/cdc.md) for an overview of how Air * Using logical replication increases disk space used on 
the database server. The additional data is stored until it is consumed. * We recommend setting frequent syncs for CDC in order to ensure that this data doesn't fill up your disk space. * If you stop syncing a CDC-configured Postgres instance to Airbyte, you should delete the replication slot. Otherwise, it may fill up your disk space. +* Our CDC implementation uses at-least-once delivery for all change records. ### Setting up CDC for Postgres Follow one of these guides to enable logical replication: * [AWS Postgres RDS or Aurora](#setting-up-cdc-on-aws-postgres-rds-or-aurora) * [Azure Database for Postgres](#setting-up-cdc-on-azure-database-for-postgres) -Then, the Airbyte user for your instance needs to be granted `REPLICATION` and `LOGIN` permissions. Since we are using embedded Debezium under the hood for Postgres, we recommend reading the [permissioning section of the Debezium docs](https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-permissions) for more information on what is required. +Then, the Airbyte user for your instance needs to be granted `REPLICATION` and `LOGIN` permissions. You can create a role with `CREATE ROLE <role name> REPLICATION LOGIN;` and grant that role to a user. You still need to make sure the user can connect to the database, use the schema, and `SELECT` from tables (the same permissions are required for non-CDC incremental syncs and all full refreshes). -Finally, you will need to create a replication slot. Here is the query used to create a replication slot called `airbyte_slot`: +Next, you will need to create a replication slot. Here is the query used to create a replication slot called `airbyte_slot`: ``` SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');` ``` This slot **must** use `pgoutput`. -After providing the name of this slot when configuring the source, you should be ready to sync data with CDC! 
+Finally, you will need to run `CREATE PUBLICATION airbyte_publication FOR ALL TABLES;`. This publication name is customizable. If you prefer, you can create a publication only for specific tables using `CREATE PUBLICATION airbyte_publication FOR TABLE <table name>`, but you won't be able to sync from other tables using CDC, even if you select them in the UI. + +When configuring the source, select CDC and provide the replication slot and publication you just created. You should be ready to sync data with CDC! ### Setting up CDC on Bare Metal, VMs (EC2/GCE/etc), Docker, etc. Some settings must be configured in the `postgresql.conf` file for your database. You can find the location of this file using `psql -U postgres -c 'SHOW config_file'` withe the correct `psql` credentials specified. Alternatively, a custom file can be specified when running postgres with the `-c` flag. For example `postgres -c config_file=/etc/postgresql/postgresql.conf` runs Postgres with the config file at `/etc/postgresql/postgresql.conf`.