From 937c7aa1530d1a4db206ed43839eaa9c3b96c4b9 Mon Sep 17 00:00:00 2001 From: dailai Date: Mon, 11 Mar 2024 16:48:50 +0800 Subject: [PATCH] [Improve][Paimon] Replace the test case by fakesource and add doc --- docs/en/connector-v2/sink/Paimon.md | 82 +++-- docs/zh/connector-v2/sink/Paimon.md | 87 +++++ .../paimon/sink/PaimonSinkFactory.java | 9 +- .../e2e/connector/paimon/PaimonSinkCDCIT.java | 337 ++++++------------ ...1.conf => fake_cdc_sink_paimon_case1.conf} | 66 +++- .../resources/fake_cdc_sink_paimon_case2.conf | 142 ++++++++ .../test/resources/mysql/server-gtids/my.cnf | 65 ---- .../src/test/resources/mysql/setup.sql | 24 -- 8 files changed, 457 insertions(+), 355 deletions(-) create mode 100644 docs/zh/connector-v2/sink/Paimon.md rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/{mysql_cdc_sink_paimon_case1.conf => fake_cdc_sink_paimon_case1.conf} (54%) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/server-gtids/my.cnf delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/setup.sql diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md index 6fa721a1e63..5e9d3c431f7 100644 --- a/docs/en/connector-v2/sink/Paimon.md +++ b/docs/en/connector-v2/sink/Paimon.md @@ -4,7 +4,7 @@ ## Description -Write data to Apache Paimon. +Sink connector for Apache Paimon. It supports CDC mode and automatic table creation. ## Key features @@ -12,40 +12,76 @@ Write data to Apache Paimon. 
## Options -| name | type | required | default value | -|----------------|--------|----------|---------------| -| warehouse | String | Yes | - | -| database | String | Yes | - | -| table | String | Yes | - | -| hdfs_site_path | String | No | - | +| name | type | required | default value | Description | +|------------------|--------|----------|------------------------------|---------------------------------| +| warehouse | String | Yes | - | Paimon warehouse path | +| database | String | Yes | - | The database you want to access | +| table | String | Yes | - | The table you want to access | +| hdfs_site_path | String | No | - | | +| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode | +| data_save_mode | Enum | No | APPEND_DATA | The data save mode | -### warehouse [string] - -Paimon warehouse path - -### database [string] +## Examples -The database you want to access +### Single table -### table [String] +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} -The table you want to access +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + table-names = ["seatunnel.role"] + } +} -## Examples +transform { +} -```hocon sink { Paimon { - warehouse = "/tmp/paimon" - database = "default" - table = "st_test" + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + database="seatunnel" + table="role" } } ``` -## Changelog +### Multiple tables + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} -### next version +transform { +} -- Add Paimon Sink Connector +sink { + Paimon { + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + 
database="${database_name}" + table="${table_name}" + } +} +``` diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md new file mode 100644 index 00000000000..82c23aef7ad --- /dev/null +++ b/docs/zh/connector-v2/sink/Paimon.md @@ -0,0 +1,87 @@ +# Paimon + +> Paimon 数据连接器 + +## 描述 + +Apache Paimon数据连接器。支持CDC写入以及自动建表。 + +## 主要特性 + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +## 连接器选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|------------------|--------|------|------------------------------|-------------------| +| warehouse | String | Yes | - | Paimon warehouse路径 | +| database | String | Yes | - | 数据库名称 | +| table | String | Yes | - | 表名 | +| hdfs_site_path | String | No | - | | +| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式 | +| data_save_mode | Enum | No | APPEND_DATA | 数据保存模式 | + +## 示例 + +### 单表 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + table-names = ["seatunnel.role"] + } +} + +transform { +} + +sink { + Paimon { + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + database="seatunnel" + table="role" + } +} +``` + +### 多表 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + Paimon { + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + database="${database_name}" + table="${table_name}" + } +} +``` + diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java index 35f9d319d7a..c0b4d997ead 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java @@ -49,10 +49,11 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(PaimonConfig.WAREHOUSE) - .required(PaimonConfig.DATABASE) - .required(PaimonConfig.TABLE) - .optional(PaimonConfig.HDFS_SITE_PATH) + .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE) + .optional( + PaimonConfig.HDFS_SITE_PATH, + PaimonSinkConfig.SCHEMA_SAVE_MODE, + PaimonSinkConfig.DATA_SAVE_MODE) .build(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index f25c04a9425..ae391e36f07 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -18,15 +18,12 @@ package org.apache.seatunnel.e2e.connector.paimon; import org.apache.seatunnel.common.utils.FileUtils; -import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; -import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import 
org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; -import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; @@ -45,10 +42,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerLoggerFactory; import lombok.AllArgsConstructor; import lombok.Data; @@ -56,16 +49,10 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; -import static java.lang.Thread.sleep; import static org.awaitility.Awaitility.given; @DisabledOnContainer( @@ -75,191 +62,148 @@ "Spark and Flink engine can not auto create paimon table on worker node(e.g flink tm) by org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink.PaimonSink which can lead error") @Slf4j public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { - - private static final String CATALOG_DIR = "/tmp/paimon/"; - private static final String NAMESPACE = "seatunnel_namespace"; - private static final String MYSQL_HOST = "paimon-e2e"; - private static final String MYSQL_USER_NAME = "st_user"; - private static final String MYSQL_USER_PASSWORD = "Abc!@#135_seatunnel"; - private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table"; - private 
static final String SOURCE_DATABASE = "db"; - private static final String DATABASE_SUFFIX = ".db"; + private static final String CATALOG_ROOT_DIR = "/tmp/"; + private static final String NAMESPACE = "paimon"; + private static final String NAMESPACE_TAR = "paimon.tar.gz"; + private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/"; private static final String TARGET_TABLE = "st_test"; private static final String TARGET_DATABASE = "seatunnel_namespace"; - private MySqlContainer mysqlContainer; + private static final String FAKE_TABLE1 = "FakeTable1"; + private static final String FAKE_DATABASE1 = "FakeDatabase1"; + private static final String FAKE_TABLE2 = "FakeTable1"; + private static final String FAKE_DATABASE2 = "FakeDatabase2"; @BeforeAll @Override - public void startUp() throws Exception { - log.info("Mysql container starting"); - this.mysqlContainer = startMySqlContainer(); - log.info("Mysql container started"); - initializeMysqlTable(); - } + public void startUp() throws Exception {} @AfterAll @Override - public void tearDown() throws Exception { - if (mysqlContainer != null) { - mysqlContainer.close(); - } - } + public void tearDown() throws Exception {} @TestTemplate - public void testMysqlCDCSinkPaimon(TestContainer container) throws Exception { - clearTable(SOURCE_DATABASE, SOURCE_TABLE); - CompletableFuture.supplyAsync( - () -> { - try { - container.executeJob("/mysql_cdc_sink_paimon_case1.conf"); - } catch (Exception e) { - log.error("Commit task exception :" + e.getMessage()); - throw new RuntimeException(e); - } - return null; - }); - insertAndCheckData(container); - upsertAndCheckData(container); - } + public void testFakeCDCSinkPaimon(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); - private MySqlContainer startMySqlContainer() { - MySqlContainer container = - new 
MySqlContainer(MySqlVersion.V8_0) - .withConfigurationOverride("mysql/server-gtids/my.cnf") - .withSetupSQL("mysql/setup.sql") - .withNetwork(NETWORK) - .withNetworkAliases(MYSQL_HOST) - .withDatabaseName(SOURCE_DATABASE) - .withUsername(MYSQL_USER_NAME) - .withPassword(MYSQL_USER_PASSWORD) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger("mysql-mysql-image"))); - - Startables.deepStart(Stream.of(container)).join(); - return container; - } - - // Execute SQL - private void executeSql(String sql) { - try (Connection connection = getJdbcConnection()) { - connection.createStatement().execute(sql); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - private Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection( - mysqlContainer.getJdbcUrl(), - mysqlContainer.getUsername(), - mysqlContainer.getPassword()); - } - - private void insertAndCheckData(TestContainer container) - throws InterruptedException, IOException { - // Init table data - initSourceTableData(SOURCE_DATABASE, SOURCE_TABLE); - // Waiting 30s for source capture data - sleep(30000); - - // stream stage given().ignoreExceptions() .await() - .atMost(60000, TimeUnit.MILLISECONDS) + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(30L, TimeUnit.SECONDS) .untilAsserted( () -> { - // copy iceberg to local + // copy paimon to local container.executeExtraCommands(containerExtendedFactory); - Assertions.assertEquals(3, loadPaimonData().size()); + List paimonRecords = + loadPaimonData(TARGET_DATABASE, TARGET_TABLE); + Assertions.assertEquals(2, paimonRecords.size()); + paimonRecords.forEach( + paimonRecord -> { + if (paimonRecord.getPkId() == 1) { + Assertions.assertEquals("A_1", paimonRecord.getName()); + } + if (paimonRecord.getPkId() == 3) { + Assertions.assertEquals("C", paimonRecord.getName()); + } + }); }); + + cleanPaimonTable(container); } - private void upsertAndCheckData(TestContainer container) - throws InterruptedException, 
IOException { - upsertDeleteSourceTable(SOURCE_DATABASE, SOURCE_TABLE); - // Waiting 60s for source capture data - sleep(30000); + @TestTemplate + public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); - // stream stage given().ignoreExceptions() .await() - .atMost(60000, TimeUnit.MILLISECONDS) + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(30L, TimeUnit.SECONDS) .untilAsserted( () -> { - // copy iceberg to local + // copy paimon to local container.executeExtraCommands(containerExtendedFactory); - List internalRows = loadPaimonData(); - Assertions.assertEquals(5, internalRows.size()); - for (PaimonRecord paimonRecord : internalRows) { - if (paimonRecord.getPkId() == 3) { - Assertions.assertEquals(150, paimonRecord.getScore()); - } - } + // Check FakeDatabase1.FakeTable1 + List fake1PaimonRecords = + loadPaimonData(FAKE_DATABASE1, FAKE_TABLE1); + Assertions.assertEquals(2, fake1PaimonRecords.size()); + fake1PaimonRecords.forEach( + paimonRecord -> { + if (paimonRecord.getPkId() == 1) { + Assertions.assertEquals("A_1", paimonRecord.getName()); + } + if (paimonRecord.getPkId() == 3) { + Assertions.assertEquals("C", paimonRecord.getName()); + } + }); + // Check FakeDatabase2.FakeTable1 + List fake2PaimonRecords = + loadPaimonData(FAKE_DATABASE2, FAKE_TABLE2); + Assertions.assertEquals(2, fake2PaimonRecords.size()); + fake2PaimonRecords.forEach( + paimonRecord -> { + if (paimonRecord.getPkId() == 100) { + Assertions.assertEquals( + "A_100", paimonRecord.getName()); + } + if (paimonRecord.getPkId() == 200) { + Assertions.assertEquals("C", paimonRecord.getName()); + } + }); }); + + cleanPaimonTable(container); } - private String driverUrl() { - return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + protected final ContainerExtendedFactory 
cleanContainerExtendedFactory = + genericContainer -> + genericContainer.execInContainer("sh", "-c", "rm -rf " + CATALOG_DIR + "**"); + + private void cleanPaimonTable(TestContainer container) + throws IOException, InterruptedException { + // clean table + container.executeExtraCommands(cleanContainerExtendedFactory); } - @TestContainerExtension - protected final ContainerExtendedFactory extendedFactory = + protected final ContainerExtendedFactory containerExtendedFactory = container -> { - container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR); - container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR); - Container.ExecResult extraCommands = - container.execInContainer( - "sh", - "-c", - "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib && cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget " - + driverUrl()); - Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR); + FileUtils.createNewDir(CATALOG_DIR); + container.execInContainer( + "sh", + "-c", + "cd " + + CATALOG_ROOT_DIR + + " && tar -czvf " + + NAMESPACE_TAR + + " " + + NAMESPACE); + container.copyFileFromContainer( + CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR + NAMESPACE_TAR); + extractFiles(); }; - private final String NAMESPACE_TAR = NAMESPACE + ".tar.gz"; - protected final ContainerExtendedFactory containerExtendedFactory = - new ContainerExtendedFactory() { - @Override - public void extend(GenericContainer container) - throws IOException, InterruptedException { - FileUtils.createNewDir(CATALOG_DIR); - container.execInContainer( - "sh", - "-c", - "cd " - + CATALOG_DIR - + " && tar -czvf " - + NAMESPACE_TAR - + " " - + NAMESPACE - + DATABASE_SUFFIX); - container.copyFileFromContainer( - CATALOG_DIR + NAMESPACE_TAR, CATALOG_DIR + NAMESPACE_TAR); - extractFiles(); - } - - private void extractFiles() { - ProcessBuilder processBuilder = new ProcessBuilder(); - processBuilder.command( - "sh", "-c", 
"cd " + CATALOG_DIR + " && tar -zxvf " + NAMESPACE_TAR); - try { - Process process = processBuilder.start(); - // 等待命令执行完成 - int exitCode = process.waitFor(); - if (exitCode == 0) { - log.info("Extract files successful."); - } else { - log.error("Extract files failed with exit code " + exitCode); - } - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - } - } - }; + private void extractFiles() { + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command( + "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " + NAMESPACE_TAR); + try { + Process process = processBuilder.start(); + // 等待命令执行完成 + int exitCode = process.waitFor(); + if (exitCode == 0) { + log.info("Extract files successful."); + } else { + log.error("Extract files failed with exit code " + exitCode); + } + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } - private List loadPaimonData() throws Exception { - Table table = getTable(); + private List loadPaimonData(String dbName, String tbName) throws Exception { + Table table = getTable(dbName, tbName); ReadBuilder readBuilder = table.newReadBuilder(); TableScan.Plan plan = readBuilder.newScan().plan(); TableRead tableRead = readBuilder.newRead(); @@ -273,18 +217,8 @@ private List loadPaimonData() throws Exception { try (RecordReader reader = tableRead.createReader(plan)) { reader.forEachRemaining( row -> { - result.add( - new PaimonRecord( - row.getLong(0), - row.getString(1).toString(), - row.getInt(2))); - log.info( - "key_id:" - + row.getLong(0) - + ", name:" - + row.getString(1) - + ", score:" - + row.getInt(2)); + result.add(new PaimonRecord(row.getLong(0), row.getString(1).toString())); + log.info("key_id:" + row.getLong(0) + ", name:" + row.getString(1)); }); } log.info( @@ -296,17 +230,17 @@ private List loadPaimonData() throws Exception { return result; } - private Table getTable() { + private Table getTable(String dbName, String tbName) { try { - return 
getCatalog().getTable(getIdentifier()); + return getCatalog().getTable(getIdentifier(dbName, tbName)); } catch (Catalog.TableNotExistException e) { // do something throw new RuntimeException("table not exist"); } } - private Identifier getIdentifier() { - return Identifier.create(TARGET_DATABASE, TARGET_TABLE); + private Identifier getIdentifier(String dbName, String tbName) { + return Identifier.create(dbName, tbName); } private Catalog getCatalog() { @@ -316,54 +250,11 @@ private Catalog getCatalog() { return catalog; } - private void initializeMysqlTable() { - String sql = - String.format( - "create table if not exists %s.%s(\n" - + " `pk_id` bigint primary key,\n" - + " `name` varchar(255),\n" - + " `score` int\n" - + ")", - SOURCE_DATABASE, SOURCE_TABLE); - executeSql(sql); - } - - private void clearTable(String database, String tableName) { - executeSql("truncate table " + database + "." + tableName); - } - - private void initSourceTableData(String database, String tableName) { - executeSql( - "INSERT INTO " - + database - + "." - + tableName - + " ( pk_id, name, score )\n" - + "VALUES ( 1, 'person1', 100 ),\n" - + " ( 2, 'person2', 99 ),\n" - + " ( 3, 'person3', 98 );\n"); - } - - private void upsertDeleteSourceTable(String database, String tableName) { - executeSql( - "INSERT INTO " - + database - + "." - + tableName - + " ( pk_id, name, score )\n" - + "VALUES ( 4, 'person4', 100 ),\n" - + " ( 5, 'person5', 99 ),\n" - + " ( 7, 'person6', 98 );\n"); - executeSql("DELETE FROM " + database + "." + tableName + " where pk_id = 2"); - executeSql("UPDATE " + database + "." 
+ tableName + " SET score = 150 where pk_id = 3"); - } - @Data @NoArgsConstructor @AllArgsConstructor public class PaimonRecord { private Long pkId; private String name; - private Integer score; } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_sink_paimon_case1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf similarity index 54% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_sink_paimon_case1.conf rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf index 2c8a49161a4..59e3a0cf727 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_sink_paimon_case1.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf @@ -20,33 +20,67 @@ env { parallelism = 1 - job.mode = "STREAMING" - checkpoint.interval = 5000 + job.mode = "BATCH" } source { - MySQL-CDC { - result_table_name="customer_result_table" - catalog { - factory = Mysql + FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } } - database-names=["db"] - table-names = ["db.mysql_cdc_e2e_source_table"] - format=DEFAULT - username = "st_user" - password = "Abc!@#135_seatunnel" - base-url = "jdbc:mysql://paimon-e2e:3306/db" + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + 
fields = [2, "B", 100] + } + ] } } -transform { -} - sink { Paimon { warehouse = "file:///tmp/paimon" database = "seatunnel_namespace" table = "st_test" - primary_key = "pk_id" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf new file mode 100644 index 00000000000..ddc92268710 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf @@ -0,0 +1,142 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "FakeDatabase1.FakeTable1" + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] + }, + { + schema = { + table = "FakeDatabase2.FakeTable1" + fields { + pk_id = bigint + name = string + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [100, "A"] + }, + { + kind = INSERT + fields = [200, "B"] + }, + { + kind = INSERT + fields = [300, "C"] + }, + { + kind = INSERT + fields = [300, "C"] + }, + { + kind = INSERT + fields = [300, "C"] + }, + { + kind = INSERT + fields = [300, "C"] + } + { + kind = UPDATE_BEFORE + fields = [100, "A"] + }, + { + kind = UPDATE_AFTER + fields = [100, "A_100"] + }, + { + kind = DELETE + fields = [200, "B"] + } + ] + } + ] + } +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "${database_name}" + table = "${table_name}" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/server-gtids/my.cnf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/server-gtids/my.cnf deleted file mode 100644 index a390897885d..00000000000 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/server-gtids/my.cnf +++ /dev/null @@ -1,65 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# For advice on how to change settings please see -# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html - -[mysqld] -# -# Remove leading # and set to the amount of RAM for the most important data -# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. -# innodb_buffer_pool_size = 128M -# -# Remove leading # to turn on a very important data integrity option: logging -# changes to the binary log between backups. -# log_bin -# -# Remove leading # to set options mainly useful for reporting servers. -# The server defaults are faster for transactions and fast SELECTs. -# Adjust sizes as needed, experiment to find the optimal values. 
-# join_buffer_size = 128M -# sort_buffer_size = 2M -# read_rnd_buffer_size = 2M -skip-host-cache -skip-name-resolve -#datadir=/var/lib/mysql -#socket=/var/lib/mysql/mysql.sock -secure-file-priv=/var/lib/mysql -user=mysql - -# Disabling symbolic-links is recommended to prevent assorted security risks -symbolic-links=0 - -#log-error=/var/log/mysqld.log -#pid-file=/var/run/mysqld/mysqld.pid - -# ---------------------------------------------- -# Enable the binlog for replication & CDC -# ---------------------------------------------- - -# Enable binary replication log and set the prefix, expiration, and log format. -# The prefix is arbitrary, expiration can be short for integration tests but would -# be longer on a production system. Row-level info is required for ingest to work. -# Server ID is required, but this will vary on production systems -server-id = 223344 -log_bin = mysql-bin -expire_logs_days = 1 -binlog_format = row - -# enable gtid mode -gtid_mode = on -enforce_gtid_consistency = on \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/setup.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/setup.sql deleted file mode 100644 index 7ed5e544c68..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql/setup.sql +++ /dev/null @@ -1,24 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. 
You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - --- In production you would almost certainly limit the replication user must be on the follower (slave) machine, --- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. --- However, in this database we'll grant 2 users different privileges: --- --- 1) 'root' - all privileges --- -GRANT ALL PRIVILEGES ON *.* TO 'st_user'@'%';