Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output actual AirbyteMessages for cdc #2631

Merged
merged 2 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.concurrent.LinkedBlockingQueue;

public class CloseableLinkedBlockingQueue extends LinkedBlockingQueue<JsonNode> implements CloseableQueue<JsonNode> {
public class CloseableLinkedBlockingQueue extends LinkedBlockingQueue<AirbyteMessage> implements CloseableQueue<AirbyteMessage> {

private final Runnable onClose;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static java.lang.Thread.sleep;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedConsumer;
Expand Down Expand Up @@ -136,17 +137,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(JsonN
.using(getDebeziumProperties(config))
.notifying(record -> {
try {
LOGGER.info("record = " + record);
JsonNode node = Jsons.jsonNode(
ImmutableMap.of("key", record.key() != null ? record.key() : "null", "value", record.value(), "destination", record.destination())); // todo:
// better
// transformation
// function
// here
LOGGER.info("node = " + node);
queue.add(node);
final AirbyteMessage message = convertChangeEvent(record, emittedAt);
queue.add(message);
} catch (Exception e) {
LOGGER.info("error");
LOGGER.info("error: " + e);
thrownError.set(e);
}
})
Expand All @@ -162,13 +156,13 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(JsonN

// todo - this obviously needs to actually do something.
final Predicate<Object> hasReachedLsnPredicate = (r) -> false;
final Iterator<JsonNode> queueIterator = Queues.toStream(queue).iterator();
final AbstractIterator<JsonNode> iterator = new AbstractIterator<>() {
final Iterator<AirbyteMessage> queueIterator = Queues.toStream(queue).iterator();
final AbstractIterator<AirbyteMessage> iterator = new AbstractIterator<>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this in-line class definition really threw me off while reading this code. Why don't we refactor it out of this method? The method is pretty large right now which makes it hard to follow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's still a lot we're trying to lock down here right now. i agree, we will want to refactor this to make the responsibility of each piece more clear and testable. can we get a pass on this for now while we are still making pretty big code changes in this area?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼


private boolean hasReachedLsn = false;

@Override
protected JsonNode computeNext() {
protected AirbyteMessage computeNext() {
// if we have reached the lsn we stop, otherwise we have the potential to wait indefinitely for the
// next value.
if (!hasReachedLsn) {
Expand All @@ -181,7 +175,7 @@ protected JsonNode computeNext() {
}
}

final JsonNode next = queueIterator.next();
final AirbyteMessage next = queueIterator.next();
LOGGER.info("next {}", next);
// todo fix this cast. the record passed to this iterator has to include the lsn somewhere. it can
// either be the full change event or some smaller object that just includes the lsn.
Expand All @@ -198,7 +192,7 @@ protected JsonNode computeNext() {

};

final AutoCloseableIterator<JsonNode> jsonIterator = AutoCloseableIterators.fromIterator(iterator, () -> {
final AutoCloseableIterator<AirbyteMessage> messageIterator = AutoCloseableIterators.fromIterator(iterator, () -> {
engine.close();
executor.shutdown();

Expand All @@ -207,13 +201,6 @@ protected JsonNode computeNext() {
}
});

final AutoCloseableIterator<AirbyteMessage> messageIterator = AutoCloseableIterators.transform(jsonIterator, r -> new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream("single_stream") // todo: refactor this
.withEmittedAt(emittedAt.toEpochMilli())
.withData(r)));

return Collections.singletonList(messageIterator);
} else {
return super.getIncrementalIterators(config, database, catalog, tableNameToTable, stateManager, emittedAt);
Expand Down Expand Up @@ -270,6 +257,44 @@ private static boolean isCdc(JsonNode config) {
return !(config.get("replication_slot") == null);
}

public static AirbyteMessage convertChangeEvent(ChangeEvent<String, String> event, Instant emittedAt) {
final JsonNode debeziumRecord = Jsons.deserialize(event.value());
final JsonNode before = debeziumRecord.get("before");
final JsonNode after = debeziumRecord.get("after");
final JsonNode source = debeziumRecord.get("source");

final JsonNode data = formatDebeziumData(before, after, source);

final String streamName = source.get("table").asText();

final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
.withStream(streamName)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(data);

return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(airbyteRecordMessage);
}

public static JsonNode formatDebeziumData(JsonNode before, JsonNode after, JsonNode source) {
final ObjectNode base = (ObjectNode) (after.isNull() ? before : after);

long transactionMillis = source.get("ts_ms").asLong();
long lsn = source.get("lsn").asLong();

base.put("_ab_cdc_updated_at", transactionMillis);
base.put("_ab_cdc_lsn", lsn);

if (after.isNull()) {
base.put("_ab_cdc_deleted_at", transactionMillis);
} else {
base.put("_ab_cdc_deleted_at", (Long) null);
}

return base;
}

public static void main(String[] args) throws Exception {
final Source source = new PostgresSource();
LOGGER.info("starting source: {}", PostgresSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@

package io.airbyte.integrations.source.postgres;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
Expand All @@ -34,12 +40,16 @@
import io.airbyte.db.Databases;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import io.debezium.engine.ChangeEvent;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -163,6 +173,18 @@ public void testIt() throws Exception {

final AutoCloseableIterator<AirbyteMessage> read = source.read(getConfig(PSQL_DB, dbName), configuredCatalog, null);

Thread.sleep(5000);
final JsonNode config = getConfig(PSQL_DB, dbName);
final Database database = getDatabaseFromConfig(config);
database.query(ctx -> {
ctx.fetch(
"UPDATE names SET power = 10000.2 WHERE first_name = 'san';");
ctx.fetch(
"DELETE FROM names WHERE first_name = 'san';");
return null;
});
database.close();

long startMillis = System.currentTimeMillis();
while (read.hasNext() || startMillis - System.currentTimeMillis() < 10000) {
if (read.hasNext()) {
Expand All @@ -172,4 +194,51 @@ public void testIt() throws Exception {
}
}

@Test
public void testConvertChangeEvent() throws IOException {
final String stream = "names";
final Instant emittedAt = Instant.now();
ChangeEvent<String, String> insertChangeEvent = mockChangeEvent("insert_change_event.json");
ChangeEvent<String, String> updateChangeEvent = mockChangeEvent("update_change_event.json");
ChangeEvent<String, String> deleteChangeEvent = mockChangeEvent("delete_change_event.json");

final AirbyteMessage actualInsert = PostgresSource.convertChangeEvent(insertChangeEvent, emittedAt);
final AirbyteMessage actualUpdate = PostgresSource.convertChangeEvent(updateChangeEvent, emittedAt);
final AirbyteMessage actualDelete = PostgresSource.convertChangeEvent(deleteChangeEvent, emittedAt);

final AirbyteMessage expectedInsert = createAirbyteMessage(stream, emittedAt, "insert_message.json");
final AirbyteMessage expectedUpdate = createAirbyteMessage(stream, emittedAt, "update_message.json");
final AirbyteMessage expectedDelete = createAirbyteMessage(stream, emittedAt, "delete_message.json");

deepCompare(expectedInsert, actualInsert);
deepCompare(expectedUpdate, actualUpdate);
deepCompare(expectedDelete, actualDelete);
}

private static ChangeEvent<String, String> mockChangeEvent(String resourceName) throws IOException {
final ChangeEvent<String, String> mocked = mock(ChangeEvent.class);
final String resource = MoreResources.readResource(resourceName);
when(mocked.value()).thenReturn(resource);

return mocked;
}

private static AirbyteMessage createAirbyteMessage(String stream, Instant emittedAt, String resourceName) throws IOException {
final String data = MoreResources.readResource(resourceName);

final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage()
.withStream(stream)
.withData(Jsons.deserialize(data))
.withEmittedAt(emittedAt.toEpochMilli());

return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(recordMessage);
}

private static void deepCompare(Object expected, Object actual) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
assertEquals(objectMapper.readTree(Jsons.serialize(expected)), objectMapper.readTree(Jsons.serialize(actual)));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"before": {
"first_name": "san",
"last_name": "goku",
"power": null
},
"after": null,
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "orders",
"ts_ms": 1616775646886,
"snapshot": false,
"db": "db_lwfoyffqvx",
"schema": "public",
"table": "names",
"txId": 498,
"lsn": 23012360,
"xmin": null
},
"op": "d",
"ts_ms": 1616775646931,
"transaction": null,
"destination": "orders.public.names"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"first_name": "san",
"last_name": "goku",
"power": null,
"_ab_cdc_updated_at": 1616775646886,
"_ab_cdc_lsn": 23012360,
"_ab_cdc_deleted_at": 1616775646886
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"before": null,
"after": {
"first_name": "san",
"last_name": "goku",
"power": "Infinity"
},
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "orders",
"ts_ms": 1616775642623,
"snapshot": true,
"db": "db_lwfoyffqvx",
"schema": "public",
"table": "names",
"txId": 495,
"lsn": 23011544,
"xmin": null
},
"op": "r",
"ts_ms": 1616775642624,
"transaction": null,
"destination": "orders.public.names"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"first_name": "san",
"last_name": "goku",
"power": "Infinity",
"_ab_cdc_updated_at": 1616775642623,
"_ab_cdc_lsn": 23011544,
"_ab_cdc_deleted_at": null
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"before": null,
"after": {
"first_name": "san",
"last_name": "goku",
"power": 10000.2
},
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "orders",
"ts_ms": 1616775646881,
"snapshot": false,
"db": "db_lwfoyffqvx",
"schema": "public",
"table": "names",
"txId": 497,
"lsn": 23012216,
"xmin": null
},
"op": "u",
"ts_ms": 1616775646929,
"transaction": null,
"destination": "orders.public.names"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"first_name": "san",
"last_name": "goku",
"power": 10000.2,
"_ab_cdc_updated_at": 1616775646881,
"_ab_cdc_lsn": 23012216,
"_ab_cdc_deleted_at": null
}