require cdc users to create publications & update docs #2818

Merged · 6 commits · Apr 9, 2021
@@ -33,11 +33,13 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -58,6 +60,7 @@ public class DebeziumRecordPublisher implements AutoCloseable {
private final AtomicBoolean hasClosed;
private final AtomicBoolean isClosing;
private final AtomicReference<Throwable> thrownError;
private final CountDownLatch engineLatch;

public DebeziumRecordPublisher(JsonNode config, ConfiguredAirbyteCatalog catalog, AirbyteFileOffsetBackingStore offsetManager) {
this.config = config;
@@ -67,12 +70,13 @@ public DebeziumRecordPublisher(JsonNode config, ConfiguredAirbyteCatalog catalog
this.isClosing = new AtomicBoolean(false);
this.thrownError = new AtomicReference<>();
this.executor = Executors.newSingleThreadExecutor();
this.engineLatch = new CountDownLatch(1);
}

public void start(Queue<ChangeEvent<String, String>> queue) {
engine = DebeziumEngine.create(Json.class)
.using(getDebeziumProperties(config, catalog, offsetManager))
.using(new AlwaysCommitOffsetPolicy())
.using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
.notifying(e -> {
// debezium outputs a tombstone event that has a value of null. this is an artifact of how it
// interacts with kafka. we want to ignore it.
@@ -85,6 +89,7 @@ public void start(Queue<ChangeEvent<String, String>> queue) {
.using((success, message, error) -> {
Contributor:
this callback only gets invoked when the engine is totally off i'm assuming?

Contributor (Author):
yep, it's the last thing

LOGGER.info("Debezium engine shutdown.");
thrownError.set(error);
engineLatch.countDown();
})
.build();

@@ -103,10 +108,15 @@ public void close() throws Exception {
engine.close();
}

// announce closure only engine is off.
hasClosed.set(true);
// wait for closure before shutting down executor service
engineLatch.await(5, TimeUnit.MINUTES);

// shut down and wait for the thread to actually terminate
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);

// after the engine is completely off, we can mark this as closed
hasClosed.set(true);
Contributor:

i think this can go before the executor shutdown. it just had to be after the latch. not that it's a big difference either way.

Contributor (Author):

Yeah I didn't notice any difference. I just wanted to be as conservative as possible.


if (thrownError.get() != null) {
throw new RuntimeException(thrownError.get());
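The latch-based shutdown discussed in the comments above can be sketched in isolation. This is a minimal, self-contained sketch, not the PR's actual classes — the worker task and timing values are hypothetical stand-ins for the Debezium engine. The point it demonstrates: the completion callback is the last thing the engine thread does, so `close()` can safely await the latch before tearing down the executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchShutdownSketch {

  public static void main(String[] args) throws Exception {
    final CountDownLatch engineLatch = new CountDownLatch(1);
    final AtomicReference<Throwable> thrownError = new AtomicReference<>();
    final ExecutorService executor = Executors.newSingleThreadExecutor();

    // stand-in for the debezium engine: does some work, then fires its
    // completion callback exactly once, as the last thing on the thread.
    executor.execute(() -> {
      try {
        Thread.sleep(100); // pretend to stream change events
      } catch (InterruptedException e) {
        thrownError.set(e);
      } finally {
        // completion callback: count down so close() knows the engine is off
        engineLatch.countDown();
      }
    });

    // close(): wait for the engine to report completion before shutting down
    final boolean finished = engineLatch.await(5, TimeUnit.MINUTES);

    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.MINUTES);

    System.out.println("engine finished = " + finished);
    if (thrownError.get() != null) {
      throw new RuntimeException(thrownError.get());
    }
  }
}
```

Whether `hasClosed.set(true)` lands before or after the executor shutdown is immaterial, as the reviewers note, so long as it follows the latch.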
@@ -145,12 +155,16 @@ protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAir
}

props.setProperty("slot.name", config.get("replication_slot").asText());
props.setProperty("publication.name", "airbyte_publication"); // todo: allow as configured input

// table selection
final String tableWhitelist = getTableWhitelist(catalog);
props.setProperty("table.include.list", tableWhitelist);
props.setProperty("database.include.list", config.get("database").asText());

// recommended when using pgoutput
props.setProperty("publication.autocreate.mode", "disabled");

return props;
}
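The pgoutput-related properties this hunk adds can be isolated into a small sketch. The class and helper below are hypothetical (not Airbyte code); the property keys are the Debezium connector properties used above. With `publication.autocreate.mode` set to `disabled`, Debezium fails fast when the named publication does not exist rather than trying to create one itself — which is why the PR requires users to create the publication up front.

```java
import java.util.Properties;

public class PgoutputPropsSketch {

  // hypothetical helper mirroring the shape of getDebeziumProperties above
  static Properties cdcProperties(String slotName, String database, String tableIncludeList) {
    final Properties props = new Properties();
    props.setProperty("plugin.name", "pgoutput");
    props.setProperty("slot.name", slotName);
    // the PR hard-codes the publication name (todo: allow as configured input)
    props.setProperty("publication.name", "airbyte_publication");
    props.setProperty("table.include.list", tableIncludeList);
    props.setProperty("database.include.list", database);
    // never let debezium create the publication itself
    props.setProperty("publication.autocreate.mode", "disabled");
    return props;
  }

  public static void main(String[] args) {
    final Properties p = cdcProperties("airbyte_slot", "mydb", "public.makes,public.models");
    System.out.println(p.getProperty("publication.autocreate.mode"));
  }
}
```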

@@ -162,6 +162,7 @@ void setup() throws Exception {
database = getDatabaseFromConfig(config);
database.query(ctx -> {
ctx.execute("SELECT pg_create_logical_replication_slot('" + fullReplicationSlot + "', 'pgoutput');");
ctx.execute("CREATE PUBLICATION airbyte_publication FOR ALL TABLES;");
ctx.execute(String.format("CREATE TABLE %s(%s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", MAKES_STREAM_NAME, COL_ID, COL_MAKE, COL_ID));
ctx.execute(String.format("CREATE TABLE %s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));",
MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
@@ -308,15 +309,40 @@ void testRecordsProducedDuringAndAfterSync() throws Exception {

assertExpectedStateMessages(extractStateMessages(actualRecords2));

final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
// sometimes there can be more than one of these at the end of the snapshot and just before the
// first incremental.
final Set<AirbyteRecordMessage> recordMessages1 = removeDuplicates(extractRecordMessages(actualRecords1));
final Set<AirbyteRecordMessage> recordMessages2 = removeDuplicates(extractRecordMessages(actualRecords2));

final int recordsCreatedBeforeTestCount = MAKE_RECORDS.size() + MODEL_RECORDS.size();
assertTrue(recordsCreatedBeforeTestCount < recordMessages1.size(), "Expected first sync to include records created while the test was running.");
assertTrue(0 < recordMessages2.size(), "Expected records to be replicated in the second sync.");
LOGGER.info("recordsToCreate = " + recordsToCreate);
LOGGER.info("recordsCreatedBeforeTestCount = " + recordsCreatedBeforeTestCount);
LOGGER.info("recordMessages1.size() = " + recordMessages1.size());
LOGGER.info("recordMessages2.size() = " + recordMessages2.size());
assertEquals(recordsToCreate + recordsCreatedBeforeTestCount, recordMessages1.size() + recordMessages2.size());
}

private static Set<AirbyteRecordMessage> removeDuplicates(Set<AirbyteRecordMessage> messages) {
final Set<JsonNode> existingDataRecordsWithoutUpdated = new HashSet<>();
final Set<AirbyteRecordMessage> output = new HashSet<>();

for (AirbyteRecordMessage message : messages) {
ObjectNode node = message.getData().deepCopy();
node.remove("_ab_cdc_updated_at");

if (existingDataRecordsWithoutUpdated.contains(node)) {
LOGGER.info("Removing duplicate node: " + node);
} else {
output.add(message);
existingDataRecordsWithoutUpdated.add(node);
}
}

return output;
}

@Test
@DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.")
void testCdcAndFullRefreshInSameSync() throws Exception {
9 changes: 6 additions & 3 deletions docs/integrations/sources/postgres.md
@@ -126,6 +126,7 @@ Please read the [CDC docs](../../architecture/cdc.md) for an overview of how Air
* Using logical replication increases disk space used on the database server. The additional data is stored until it is consumed.
* We recommend setting frequent syncs for CDC in order to ensure that this data doesn't fill up your disk space.
* If you stop syncing a CDC-configured Postgres instance to Airbyte, you should delete the replication slot. Otherwise, it may fill up your disk space.
* Our CDC implementation uses at-least-once delivery for all change records.

### Setting up CDC for Postgres

@@ -134,16 +135,18 @@ Follow one of these guides to enable logical replication:
* [AWS Postgres RDS or Aurora](#setting-up-cdc-on-aws-postgres-rds-or-aurora)
* [Azure Database for Postgres](#setting-up-cdc-on-azure-database-for-postgres)

Then, the Airbyte user for your instance needs to be granted `REPLICATION` and `LOGIN` permissions. Since we are using embedded Debezium under the hood for Postgres, we recommend reading the [permissioning section of the Debezium docs](https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-permissions) for more information on what is required.
Then, the Airbyte user for your instance needs to be granted `REPLICATION` and `LOGIN` permissions. You can create a role with `CREATE ROLE <name> REPLICATION LOGIN;` and grant that role to a user. You still need to make sure the user can connect to the database, use the schema, and `SELECT` from tables (the same permissions required for non-CDC incremental syncs and all full refreshes).

Finally, you will need to create a replication slot. Here is the query used to create a replication slot called `airbyte_slot`:
Next, you will need to create a replication slot. Here is the query used to create a replication slot called `airbyte_slot`:
```
SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');
```

This slot **must** use `pgoutput`.

After providing the name of this slot when configuring the source, you should be ready to sync data with CDC!
Finally, you will need to run `CREATE PUBLICATION airbyte_publication FOR ALL TABLES;`. This publication name is customizable. If you prefer, you can create a publication only for specific tables using `CREATE PUBLICATION airbyte_publication FOR TABLE <tbl1>, <tbl2>, <tbl3>;`, but you won't be able to sync other tables with CDC, even if you select them in the UI.

When configuring the source, select CDC and provide the replication slot and publication you just created. You should be ready to sync data with CDC!
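The two publication forms described above can be sketched as plain SQL strings. The class and helper below are hypothetical illustrations (not part of Airbyte or Debezium); an empty table list here stands for the `FOR ALL TABLES` case.

```java
import java.util.List;

public class PublicationSqlSketch {

  // hypothetical helper: builds the publication DDL;
  // an empty table list means FOR ALL TABLES
  static String createPublicationSql(String name, List<String> tables) {
    if (tables.isEmpty()) {
      return "CREATE PUBLICATION " + name + " FOR ALL TABLES;";
    }
    return "CREATE PUBLICATION " + name + " FOR TABLE " + String.join(", ", tables) + ";";
  }

  public static void main(String[] args) {
    System.out.println(createPublicationSql("airbyte_publication", List.of()));
    System.out.println(createPublicationSql("airbyte_publication", List.of("public.makes", "public.models")));
  }
}
```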

### Setting up CDC on Bare Metal, VMs (EC2/GCE/etc), Docker, etc.
Some settings must be configured in the `postgresql.conf` file for your database. You can find the location of this file using `psql -U postgres -c 'SHOW config_file'` with the correct `psql` credentials specified. Alternatively, a custom file can be specified when running Postgres with the `-c` flag. For example `postgres -c config_file=/etc/postgresql/postgresql.conf` runs Postgres with the config file at `/etc/postgresql/postgresql.conf`.