Skip to content

Commit

Permalink
[Feature][Connector-V2] Starrocks implements multi table sink (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
jw-itq authored Jan 9, 2025
1 parent f08e142 commit 55eebfa
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.Catalog;
Expand All @@ -40,7 +41,7 @@
import java.util.Optional;

public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode, SupportSchemaEvolutionSink {
implements SupportSaveMode, SupportSchemaEvolutionSink, SupportMultiTableSink {

private final TableSchema tableSchema;
private final SinkConfig sinkConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;

@AutoService(Factory.class)
Expand Down Expand Up @@ -64,6 +65,7 @@ public OptionRule optionRule() {
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SCHEMA_SAVE_MODE,
StarRocksSinkOptions.DATA_SAVE_MODE,
MULTI_TABLE_SINK_REPLICA,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.conditional(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
Expand Down Expand Up @@ -46,7 +47,7 @@

@Slf4j
public class StarRocksSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportSchemaEvolutionSinkWriter {
implements SupportMultiTableSinkWriter<Void>, SupportSchemaEvolutionSinkWriter {
private StarRocksISerializer serializer;
private StarRocksSinkManager manager;
private TableSchema tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
(108,"jacket","water resistent black wind breaker",0.1),
(109,"spare tire","24 inch spare tire",22.2);


drop table if exists products_on_hand;
CREATE TABLE products_on_hand (
product_id INTEGER NOT NULL PRIMARY KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void initializeStarRocksServer() {

@TestTemplate
public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container)
throws InterruptedException, IOException {
throws InterruptedException, IOException, SQLException {
String jobId = String.valueOf(JobIdGenerator.newJobId());
String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf";
CompletableFuture.runAsync(
Expand All @@ -187,16 +187,26 @@ public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container)
}
});
TimeUnit.SECONDS.sleep(20);

// verify multi table sink
verifyDataConsistency("orders");
verifyDataConsistency("customers");

// waiting for case1 completed
assertSchemaEvolutionForAddColumns(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);

assertSchemaEvolutionForDropColumns(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);

insertNewDataIntoMySQL();
insertNewDataIntoMySQL();
// verify incremental
verifyDataConsistency("orders");

// savepoint 1
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

insertNewDataIntoMySQL();
// case2 drop columns with cdc data at same time
shopDatabase.setTemplateName("drop_columns").createAndInitialize();

Expand Down Expand Up @@ -240,6 +250,30 @@ public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container)
// waiting for case3/case4 completed
assertTableStructureAndData(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);
insertNewDataIntoMySQL();
// verify restore
verifyDataConsistency("orders");
}

/**
 * Inserts one additional {@code pending} order row into the MySQL {@code orders} table.
 *
 * <p>The {@code id} value is passed as {@code null}, presumably so that MySQL assigns the
 * next key itself (verify against the {@code orders} DDL used by this test).
 *
 * @throws SQLException if the statement cannot be created or the insert fails
 */
private void insertNewDataIntoMySQL() throws SQLException {
    // Release the statement as soon as the insert finishes: this helper is invoked several
    // times per test run, and unclosed statements would pile up on the reused connection.
    try (java.sql.Statement statement = mysqlConnection.createStatement()) {
        statement.execute(
                "INSERT INTO orders (id, customer_id, order_date, total_amount, status) "
                        + "VALUES (null, 1, '2025-01-04 13:00:00', 498.99, 'pending')");
    }
}

/**
 * Asserts that the given table yields the same rows from MySQL and from StarRocks.
 *
 * <p>The comparison is retried through {@code await().untilAsserted(...)} with an upper
 * limit of 10000 ms, so the sink gets time to catch up with the source before the check
 * is reported as failed.
 *
 * @param tableName name of the table to compare; substituted into {@code QUERY} together
 *     with {@code DATABASE}
 */
private void verifyDataConsistency(String tableName) {
    await().atMost(10000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () -> {
                        // Same statement text is run against both databases.
                        final String sql = String.format(QUERY, DATABASE, tableName);
                        Assertions.assertIterableEquals(
                                query(sql, mysqlConnection), query(sql, starRocksConnection));
                    });
}

private void assertSchemaEvolutionForAddColumns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,30 @@ CREATE TABLE products (
weight FLOAT
);

drop table if exists orders;

-- Order fixture table: one row per order, keyed by an auto-incrementing id.
CREATE TABLE orders (
-- Surrogate key; generated by MySQL when no explicit value is supplied.
id BIGINT AUTO_INCREMENT PRIMARY KEY,
-- Plain BIGINT with no FOREIGN KEY constraint declared here.
customer_id BIGINT NOT NULL,
order_date DATETIME NOT NULL,
-- Fixed-point amount: 10 digits in total, 2 after the decimal point.
total_amount DECIMAL ( 10, 2 ) NOT NULL,
-- Free-text state; falls back to 'pending' when omitted.
STATUS VARCHAR ( 50 ) DEFAULT 'pending',
-- Set to the insert time by default.
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
-- Set on insert and refreshed automatically on every update of the row.
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

drop table if exists customers;

-- Customer fixture table: one row per customer.
CREATE TABLE customers (
-- Primary key without AUTO_INCREMENT, so inserts must provide the id explicitly.
id BIGINT PRIMARY KEY,
NAME VARCHAR ( 255 ) NOT NULL,
email VARCHAR ( 255 ) NOT NULL,
-- Optional contact fields (nullable).
phone VARCHAR ( 50 ),
address TEXT,
-- Set to the insert time by default.
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
-- Set on insert and refreshed automatically on every update of the row.
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
Expand All @@ -41,4 +65,16 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
(106,"hammer","16oz carpenter's hammer",1.0),
(107,"rocks","box of assorted rocks",5.3),
(108,"jacket","water resistent black wind breaker",0.1),
(109,"spare tire","24 inch spare tire",22.2);
(109,"spare tire","24 inch spare tire",22.2);

-- Seed rows for orders: explicit ids 1-3, one order for each of customer ids 1-3.
-- created_at and updated_at are not listed, so their column defaults apply.
INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS )
VALUES
( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ),
( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ),
( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' );

-- Seed rows for customers: explicit ids 1-3.
-- created_at and updated_at are not listed, so their column defaults apply.
-- NOTE(review): all three email values are the identical literal '[email protected]', which
-- looks like an email-obfuscation placeholder rather than real fixture data; confirm the
-- intended addresses against the repository copy of this script.
INSERT INTO customers ( id, NAME, email, phone, address )
VALUES
( 1, 'John Doe', '[email protected]', '123-456-7890', '123 Main St' ),
( 2, 'Jane Smith', '[email protected]', '234-567-8901', '456 Oak Ave' ),
( 3, 'Bob Johnson', '[email protected]', '345-678-9012', '789 Pine Rd' );
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
env {
# You can set engine configuration here
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
checkpoint.interval = 2000
}

source {
MySQL-CDC {
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
table-names = ["shop.products", "shop.orders", "shop.customers"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"

schema-changes.enabled = true
Expand Down

0 comments on commit 55eebfa

Please sign in to comment.