Skip to content

Commit

Permalink
postgres cdc (#2548)
Browse files Browse the repository at this point in the history
* spike

* more

* debezium wip

* use oneof for configuration

* iterator wrapping structure

* push current

* working loop

* move capability into source

* hack it into a sharable state

* debezium test runner (#2617)

* CDC Wait for Values (#2618)

* output actual AirbyteMessages for cdc (#2631)

* message conversion

* fmt

* add lsn extraction and comparison (#2613)

* postgres cdc catalog (#2673)

* update cdc catalog

* A

* table selection for cdc (#2690)

* table selection for cdc

* fix broken merge

* also test double quote in name

* Add state management to CDC (#2718)

* CDC: Fix Producer/Consumer State Machine (#2721)

* CDC Postgres Tests (#2777)

* fix postgres cdc image name and run check before reading data (#2785)

* minor postgres cdc fixes

* add test and fix check behavior

* fix

* improve comment

* remove unused props, remove todos, add some more sanity tests (#2791)

* cdc: add offset store tests (#2793)

* clean (#2798)

* postgres cdc docs (#2784)

* cdc docs

* Update docs/integrations/sources/postgres.md

Co-authored-by: Charles <[email protected]>

* address gcp

* learn too english

* add link

* add more disk space warnings

* add additional cdc use case

* add information on how to find postgresql.conf

* add how to find the file

Co-authored-by: Charles <[email protected]>

* various merge conflict fixes (#2799)

* cdc standard tests (#2813)

* require cdc users to create publications & update docs (#2818)

* postgres cdc race condition

* working? but different process

* add additional logging to help debug in the future

* everything done except working config

* remove unintended change

* Use oneOf in PG CDC spec (#2827)

* add oneOf configuration for postgres cdc  (#2831)

* add oneof configuration for cdc postgres

* fmt

Co-authored-by: Charles <[email protected]>

* fix test (#2834)

* fix test

* bump version

* add docs on creating replica identities (#2838)

* add docs on creating replica identities

* emphasize danger

* grammar

* bump pg version in source catalog

* generate seed files

Co-authored-by: cgardens <[email protected]>
  • Loading branch information
jrhizor and cgardens authored Apr 9, 2021
1 parent 470b7a3 commit 2b19da8
Show file tree
Hide file tree
Showing 48 changed files with 3,506 additions and 46 deletions.
9 changes: 9 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/io/IOs.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ public static Path writeFile(Path path, String fileName, String contents) {
return writeFile(filePath, contents);
}

/**
 * Writes raw bytes to a file.
 *
 * @param filePath file to write to
 * @param contents bytes to store in the file
 * @return the path that was written to
 * @throws RuntimeException wrapping the underlying {@link IOException} when the write fails
 */
public static Path writeFile(Path filePath, byte[] contents) {
  try {
    // Files.write returns the very path it was handed, so it can be returned directly.
    return Files.write(filePath, contents);
  } catch (IOException ioException) {
    throw new RuntimeException(ioException);
  }
}

public static Path writeFile(Path filePath, String contents) {
try {
Files.writeString(filePath, contents, StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public static String readResource(String name) throws IOException {
return Resources.toString(resource, StandardCharsets.UTF_8);
}

/**
 * Loads a classpath resource as raw bytes.
 *
 * @param name name of the resource to look up
 * @return the full contents of the resource
 * @throws IOException if reading the resource fails
 * @throws IllegalArgumentException if the resource lookup cannot find {@code name}
 */
public static byte[] readBytes(String name) throws IOException {
  return Resources.toByteArray(Resources.getResource(name));
}

/**
* This class is a bit of a hack. Might have unexpected behavior.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public static <T> AutoCloseableIterator<T> fromStream(Stream<T> stream) {
return new DefaultAutoCloseableIterator<>(stream.iterator(), stream::close);
}

/**
 * Drains every remaining element of the iterator into a list, then closes the iterator. The
 * iterator is closed even if draining it throws.
 *
 * @param iterator iterator to consume and close
 * @return list holding the elements in iteration order
 * @throws Exception if consuming or closing the iterator fails
 */
public static <T> List<T> toListAndClose(AutoCloseableIterator<T> iterator) throws Exception {
  try (iterator) {
    final List<T> drained = MoreIterators.toList(iterator);
    return drained;
  }
}

/**
* Returns a {@link AutoCloseableIterator} that will call the provided supplier ONE time when
* {@link AutoCloseableIterator#hasNext()} is called the first time. The supplier returns a stream
Expand Down Expand Up @@ -131,6 +140,11 @@ public static <T> AutoCloseableIterator<T> transform(Function<AutoCloseableItera
return new DefaultAutoCloseableIterator<>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close);
}

/**
 * Varargs convenience form: copies the given iterators into an immutable list and delegates to the
 * {@link List}-based overload.
 *
 * @param iterators iterators to combine, in order
 * @return a {@link CompositeIterator} over the given iterators
 */
@SafeVarargs
public static <T> CompositeIterator<T> concatWithEagerClose(AutoCloseableIterator<T>... iterators) {
  final List<AutoCloseableIterator<T>> iteratorList = List.of(iterators);
  return concatWithEagerClose(iteratorList);
}

/**
 * Wraps the given iterators in a single {@link CompositeIterator}.
 *
 * @param iterators iterators to combine, in order
 * @return a {@link CompositeIterator} built from {@code iterators}
 */
public static <T> CompositeIterator<T> concatWithEagerClose(List<AutoCloseableIterator<T>> iterators) {
  final CompositeIterator<T> composite = new CompositeIterator<>(iterators);
  return composite;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

package io.airbyte.commons.util;

import com.google.common.collect.AbstractIterator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class MoreIterators {

Expand Down Expand Up @@ -74,4 +76,22 @@ public static <T> Set<T> toSet(Iterator<T> iterator) {
return set;
}

/**
 * Builds an iterator that yields exactly one element. The element is obtained by calling the
 * supplier, and the call is deferred until the iterator first computes its next element; the
 * supplier is never invoked a second time.
 *
 * @param supplier source of the single element
 * @return an iterator over the one supplied element
 */
public static <T> Iterator<T> singletonIteratorFromSupplier(Supplier<T> supplier) {
  return new AbstractIterator<T>() {

    private boolean alreadySupplied = false;

    @Override
    protected T computeNext() {
      if (alreadySupplied) {
        return endOfData();
      }
      // flip the flag before calling the supplier so that it is only ever asked once.
      alreadySupplied = true;
      return supplier.get();
    }

  };
}

}
2 changes: 1 addition & 1 deletion airbyte-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</Appenders>

<Loggers>
<Root level="DEBUG">
<Root level="INFO">
<AppenderRef ref="Default"/>
<AppenderRef ref="LogSplit"/>
<AppenderRef ref="AppLogSplit"/>
Expand Down
11 changes: 11 additions & 0 deletions airbyte-commons/src/test/java/io/airbyte/commons/io/IOsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
Expand All @@ -55,6 +56,16 @@ void testReadWrite() throws IOException {
assertEquals("abc", IOs.readFile(path.resolve("file")));
}

// Writing a byte array must return the target path and leave the bytes readable as text.
@Test
void testWriteBytes() throws IOException {
  final Path tempDir = Files.createTempDirectory("tmp");
  final Path target = tempDir.resolve("file");
  final byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);

  final Path written = IOs.writeFile(target, payload);

  assertEquals(target, written);
  assertEquals("abc", IOs.readFile(tempDir, "file"));
}

@Test
public void testWriteFileToRandomDir() throws IOException {
final String contents = "something to remember";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
Expand All @@ -43,6 +44,14 @@ void testResourceRead() throws IOException {
assertThrows(IllegalArgumentException.class, () -> MoreResources.readResource("invalid"));
}

// Byte-level reads must return the resource contents and reject unknown resource names.
@Test
void testReadBytes() throws IOException {
  final String topLevelContents = new String(MoreResources.readBytes("resource_test"), StandardCharsets.UTF_8);
  assertEquals("content1\n", topLevelContents);

  final String nestedContents = new String(MoreResources.readBytes("subdir/resource_test_sub"), StandardCharsets.UTF_8);
  assertEquals("content2\n", nestedContents);

  assertThrows(IllegalArgumentException.class, () -> MoreResources.readBytes("invalid"));
}

@Test
void testResourceReadDuplicateName() throws IOException {
assertEquals("content1\n", MoreResources.readResource("resource_test_a"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.2.3",
"dockerImageTag": "0.2.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand Down
104 changes: 104 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/PgLsn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;

/**
 * Immutable value holding a Postgres log sequence number (LSN).
 *
 * <p>
 * Doc on the structure of a Postgres LSN
 * https://www.postgresql.org/docs/current/datatype-pg-lsn.html
 *
 * <p>
 * The LSN is kept as the raw 64 bits in a {@code long}. Those bits are an unsigned quantity, so
 * values whose textual form is above {@code 7FFFFFFF/FFFFFFFF} appear as negative longs. Ordering
 * is therefore done with unsigned comparison.
 */
public class PgLsn implements Comparable<PgLsn> {

  private final long lsn;

  /**
   * @param lsn raw 64-bit LSN value
   * @return LSN wrapping the given bits
   */
  public static PgLsn fromLong(final long lsn) {
    return new PgLsn(lsn);
  }

  /**
   * @param lsn LSN in the {@code <hex>/<hex>} form that Postgres prints
   * @return the parsed LSN
   * @throws IllegalArgumentException if the string has no {@code /} or either half is not a hex
   *         number that fits in 32 bits
   */
  public static PgLsn fromPgString(final String lsn) {
    return new PgLsn(lsnToLong(lsn));
  }

  private PgLsn(final long lsn) {
    this.lsn = lsn;
  }

  /**
   * @return the raw 64 bits of the LSN
   */
  public long asLong() {
    return lsn;
  }

  /**
   * @return the LSN as two upper-case hex numbers separated by a {@code /}
   */
  public String asPgString() {
    return longToLsn(lsn);
  }

  /**
   * Orders LSNs by their position in the log. The comparison is unsigned so that LSNs with the top
   * bit set sort after, not before, smaller LSNs.
   */
  @Override
  public int compareTo(final PgLsn o) {
    return Long.compareUnsigned(lsn, o.asLong());
  }

  /**
   * Two LSNs are equal when they hold the same 64-bit value. This keeps equality consistent with
   * {@link #compareTo(PgLsn)}.
   */
  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    return lsn == ((PgLsn) o).lsn;
  }

  @Override
  public int hashCode() {
    return Long.hashCode(lsn);
  }

  /**
   * The LSN returned by Postgres is a 64-bit integer represented as hex encoded 32-bit integers
   * separated by a /. reference: https://github.com/davecramer/LogicalDecode
   *
   * @param lsn string representation as returned by postgres
   * @return long representation of the lsn string.
   * @throws IllegalArgumentException if the string has no {@code /} or either half is not a hex
   *         number that fits in 32 bits ({@link NumberFormatException} for the latter)
   */
  @VisibleForTesting
  static long lsnToLong(String lsn) {
    int slashIndex = lsn.lastIndexOf('/');
    Preconditions.checkArgument(slashIndex >= 0, "Invalid LSN, expected <hex>/<hex> but got: %s", lsn);

    // parseUnsignedInt retains the full 32 bits of each half (values above Integer.MAX_VALUE come
    // back as negative ints) but, unlike narrowing a parsed long, it rejects input that does not
    // fit in 32 bits instead of silently dropping the high bits.
    int logicalXlog = Integer.parseUnsignedInt(lsn.substring(0, slashIndex), 16);
    int segment = Integer.parseUnsignedInt(lsn.substring(slashIndex + 1), 16);

    // a freshly allocated ByteBuffer is big-endian: the first int lands in the high 32 bits.
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.putInt(logicalXlog);
    buf.putInt(segment);
    buf.position(0);
    return buf.getLong();
  }

  /**
   * Inverse of {@link #lsnToLong(String)}.
   *
   * @param long1 raw 64-bit LSN value
   * @return the high and low 32 bits as upper-case hex without leading zeros, joined by a /
   */
  @VisibleForTesting
  static String longToLsn(long long1) {
    int front = (int) (long1 >> 32);
    int back = (int) long1;
    // toHexString treats the int as unsigned, so halves with the top bit set print correctly.
    return (Integer.toHexString(front) + "/" + Integer.toHexString(back)).toUpperCase();
  }

  @Override
  public String toString() {
    return "PgLsn{" +
        "lsn=" + lsn +
        '}';
  }

}
45 changes: 45 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/PostgresUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import java.sql.SQLException;
import java.util.List;

/**
 * Static helpers for talking to a Postgres database.
 */
public class PostgresUtils {

  /**
   * Asks the database for its current write-ahead-log position by running
   * {@code SELECT pg_current_wal_lsn()}.
   *
   * @param database database to query
   * @return the reported position parsed into a {@link PgLsn}
   * @throws SQLException if the query fails
   * @throws IllegalStateException if the query does not return exactly one row
   */
  public static PgLsn getLsn(JdbcDatabase database) throws SQLException {
    // pg version 10+.
    final List<JsonNode> rows = database.bufferedResultSetQuery(
        connection -> connection.createStatement().executeQuery("SELECT pg_current_wal_lsn()"),
        JdbcUtils::rowToJson);

    Preconditions.checkState(rows.size() == 1);

    final JsonNode onlyRow = rows.get(0);
    final String rawLsn = onlyRow.get("pg_current_wal_lsn").asText();
    return PgLsn.fromPgString(rawLsn);
  }

}
56 changes: 56 additions & 0 deletions airbyte-db/src/test/java/io/airbyte/db/PgLsnTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

/**
 * Checks both directions of the conversion between the Postgres string form of an LSN and its
 * long value.
 */
class PgLsnTest {

  // Postgres string form -> expected long value.
  private static final Map<String, Long> TEST_LSNS = ImmutableMap.<String, Long>builder()
      .put("0/15E7A10", 22968848L)
      .put("0/15E7B08", 22969096L)
      .put("16/15E7B08", 94512249608L)
      .put("16/FFFFFFFF", 98784247807L)
      .put("7FFFFFFF/FFFFFFFF", Long.MAX_VALUE)
      .put("0/0", 0L)
      .build();

  @Test
  void testLsnToLong() {
    for (final Map.Entry<String, Long> testCase : TEST_LSNS.entrySet()) {
      final String pgString = testCase.getKey();
      final Long expectedLong = testCase.getValue();
      assertEquals(expectedLong, PgLsn.lsnToLong(pgString),
          String.format("Conversion failed. lsn: %s long value: %s", pgString, expectedLong));
    }
  }

  @Test
  void testLongToLsn() {
    for (final Map.Entry<String, Long> testCase : TEST_LSNS.entrySet()) {
      final String expectedPgString = testCase.getKey();
      final Long longValue = testCase.getValue();
      assertEquals(expectedPgString, PgLsn.longToLsn(longValue),
          String.format("Conversion failed. lsn: %s long value: %s", expectedPgString, longValue));
    }
  }

}
Loading

0 comments on commit 2b19da8

Please sign in to comment.