feat(sink): sink one-dimensional array data type to postgres and mysql #10032

Merged on Jun 7, 2023 (31 commits)

Commits
0b1ffcc
add date, time data types for jdbc sink(pg)
StrikeW May 22, 2023
aa2977b
clean
StrikeW May 23, 2023
393b9fb
add support for interval and jsonb
StrikeW May 23, 2023
01b28fb
check downstream database type
StrikeW May 24, 2023
4c44ab5
add date, time, jsonb for jdbc sink json format
WillyKidd May 25, 2023
bba0327
save work
StrikeW May 26, 2023
66dc210
array type
StrikeW May 28, 2023
46ad92b
minor
StrikeW May 29, 2023
39f3ed2
save work
StrikeW May 29, 2023
d1e92f8
stream_chunk array
StrikeW May 30, 2023
14d981f
call static method
StrikeW May 30, 2023
a39f3a2
minor
StrikeW May 30, 2023
d98e4b4
fix
StrikeW May 30, 2023
334f4ec
fix
StrikeW May 30, 2023
4431952
Merge branch 'siyuan/jdbc-sink-array' of github.com:risingwavelabs/ri…
StrikeW May 30, 2023
1e2294a
minor
StrikeW May 31, 2023
fd90a4f
Merge remote-tracking branch 'origin/main' into siyuan/jdbc-sink-array
StrikeW May 31, 2023
7a58b0a
convert array to text for mysql
StrikeW May 31, 2023
c551568
minor
StrikeW May 31, 2023
611a305
fix
StrikeW May 31, 2023
fce807a
fix
StrikeW May 31, 2023
44271c1
fix
StrikeW May 31, 2023
ae18d16
fix e2e test
StrikeW May 31, 2023
d2a9396
change ulimit
StrikeW Jun 1, 2023
baebd75
fix mysql table ddl
StrikeW Jun 1, 2023
42385aa
fix mysql expect output
StrikeW Jun 1, 2023
65282f9
fix array string
StrikeW Jun 5, 2023
c1dd758
fix
StrikeW Jun 5, 2023
68cebef
Merge remote-tracking branch 'origin/main' into siyuan/jdbc-sink-array
StrikeW Jun 5, 2023
d9f9d49
minor
StrikeW Jun 5, 2023
833a753
refactor test cases
StrikeW Jun 5, 2023
13 changes: 13 additions & 0 deletions ci/scripts/e2e-sink-test.sh
@@ -28,6 +28,10 @@ download_java_binding "$profile"
# TODO: Switch to stream_chunk encoding once it's completed, and then remove json encoding as well as this env var.
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk

# Change process number limit
echo "--- os limits"
ulimit -a

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
mkdir ./connector-node
@@ -110,6 +114,15 @@ else
exit 1
fi

diff -u ./e2e_test/sink/remote/mysql_expected_result_2.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_types ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check 0 passed"
else
echo "The output is not as expected."
exit 1
fi

echo "--- testing kafka sink"
./ci/scripts/e2e-kafka-sink-test.sh
if [ $? -eq 0 ]; then
10 changes: 9 additions & 1 deletion e2e_test/sink/remote/jdbc.check.pg.slt
@@ -12,8 +12,16 @@ select * from t_remote_0 order by id;
query II
select * from t_remote_1 order by id;
----
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 2023-05-22 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value"} \xdeadbeef
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 t 2023-05-24 12:34:56 2023-05-24 12:34:56 2023-05-24 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value3"} \xcafebabe
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 f 2023-05-25 23:45:01 2023-05-25 23:45:01 2023-05-25 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value4"} \xbabec0de
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 2023-05-26 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value5"} \xdeadbabe
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 2023-05-27 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value6"} \xdeadbabe


query III
select * from t_types order by id;
----
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789}
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432}
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789}
52 changes: 52 additions & 0 deletions e2e_test/sink/remote/jdbc.load.slt
@@ -32,6 +32,27 @@ CREATE TABLE t_remote_1 (
v_bytea BYTEA
);

statement ok
CREATE TABLE rw_types (
id BIGINT PRIMARY KEY,
varchar_column VARCHAR,
text_column TEXT,
integer_column INTEGER,
smallint_column SMALLINT,
bigint_column BIGINT,
decimal_column DECIMAL,
real_column REAL,
double_column DOUBLE PRECISION,
boolean_column BOOLEAN,
date_column DATE,
time_column TIME,
timestamp_column TIMESTAMP,
interval_column INTERVAL,
jsonb_column JSONB,
array_column VARCHAR[],
array_column2 FLOAT[]
);

statement ok
create materialized view mv_remote_0 as select * from t_remote_0;

@@ -70,6 +91,22 @@ CREATE SINK s_mysql_1 FROM mv_remote_1 WITH (
type='upsert'
);

statement ok
CREATE SINK s2_postgres FROM rw_types WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_types',
type='upsert'
);

statement ok
CREATE SINK s2_mysql FROM rw_types WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_types',
type='upsert'
);

statement ok
INSERT INTO t_remote_0 VALUES
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
@@ -94,6 +131,12 @@ INSERT INTO t_remote_1 VALUES
(5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '2023-05-26 12:34:56', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value5"}', E'\\xDEADBABE'),
(6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '2023-05-27 23:45:01', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value6"}', E'\\xDEADBABE');

statement ok
INSERT INTO rw_types (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column, array_column, array_column2) VALUES
(1, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['Value 1', 'Value 2'], '{12.345,56.789}'),
(2, 'Varcharvalue2', 'Textvalue2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}', ARRAY['Value 3', 'Value 4'], '{43.21,65.432}'),
(3, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['How''re you?', '"hello\ \world"'], ARRAY[12.345,56.789]);

statement ok
FLUSH;

@@ -118,6 +161,12 @@ DROP SINK s_postgres_0;
statement ok
DROP SINK s_postgres_1;

statement ok
DROP SINK s2_postgres;

statement ok
DROP SINK s2_mysql;

statement ok
DROP SINK s_mysql_0;

@@ -136,5 +185,8 @@ DROP TABLE t_remote_0;
statement ok
DROP TABLE t_remote_1;

statement ok
DROP TABLE rw_types;

statement ok
FLUSH;
20 changes: 20 additions & 0 deletions e2e_test/sink/remote/mysql_create_table.sql
@@ -29,3 +29,23 @@ CREATE TABLE t_remote_1 (
v_jsonb JSON,
v_bytea BLOB
);

CREATE TABLE t_types (
id BIGINT PRIMARY KEY,
varchar_column VARCHAR(100),
text_column TEXT,
integer_column INTEGER,
smallint_column SMALLINT,
bigint_column BIGINT,
decimal_column DECIMAL(10,2),
real_column float,
double_column DOUBLE,
boolean_column TINYINT,
date_column DATE,
time_column TIME,
timestamp_column TIMESTAMP,
interval_column VARCHAR(100),
jsonb_column JSON,
array_column LONGTEXT,
array_column2 LONGTEXT
);
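
MySQL has no native array type, which is why array_column and array_column2 are declared as LONGTEXT above and why the expected MySQL output in the next file shows arrays flattened to comma-separated strings (see the "convert array to text for mysql" commit). The Java snippet below is a minimal, hedged sketch of that kind of conversion; the class and method names are illustrative and not the connector's actual code path.

import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only: flatten a one-dimensional array value into the
// comma-separated text form that a LONGTEXT column can store.
public class MySqlArrayTextSketch {
    static String toText(List<?> values) {
        return values.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Matches the shape of rows in mysql_expected_result_2.tsv,
        // e.g. "Value 1,Value 2" and "12.345,56.789".
        System.out.println(toText(List.of("Value 1", "Value 2")));
        System.out.println(toText(List.of(12.345, 56.789)));
    }
}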
3 changes: 3 additions & 0 deletions e2e_test/sink/remote/mysql_expected_result_2.tsv
@@ -0,0 +1,3 @@
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} Value 1,Value 2 12.345,56.789
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 0 2023-05-23 23:45:01 2023-05-23 23:45:01 P0Y0M2DT0H0M0S {"key": "value2"} Value 3,Value 4 43.21,65.432
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} How're you?,"hello\ \world" 12.345,56.789
21 changes: 21 additions & 0 deletions e2e_test/sink/remote/pg_create_table.sql
@@ -29,3 +29,24 @@ CREATE TABLE t_remote_1 (
v_jsonb JSONB,
v_bytea BYTEA
);


CREATE TABLE t_types (
id BIGINT PRIMARY KEY,
varchar_column VARCHAR(100),
text_column TEXT,
integer_column INTEGER,
smallint_column SMALLINT,
bigint_column BIGINT,
decimal_column DECIMAL,
real_column REAL,
double_column DOUBLE PRECISION,
boolean_column BOOLEAN,
date_column DATE,
time_column TIME,
timestamp_column TIMESTAMP,
interval_column INTERVAL,
jsonb_column JSONB,
array_column VARCHAR[],
array_column2 DECIMAL[]
);
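
Unlike MySQL, PostgreSQL supports arrays natively (VARCHAR[] and DECIMAL[] above), so a JDBC sink can bind array values directly. The following is a hedged sketch of how such a binding can look with standard JDBC (Connection.createArrayOf and PreparedStatement.setArray); the SQL statement and subset of columns are placeholders, and this is not necessarily how the connector node wires it up. The JDBC URL is the one used in jdbc.load.slt.

import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Illustrative sketch only: bind a VARCHAR[] and a DECIMAL[] value through plain JDBC.
public class PgArrayBindSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://db:5432/test?user=test&password=connector";
        try (Connection conn = DriverManager.getConnection(url)) {
            String sql = "INSERT INTO t_types (id, array_column, array_column2) VALUES (?, ?, ?)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                // createArrayOf takes the PostgreSQL element type name and the values.
                Array varcharArr = conn.createArrayOf("varchar", new Object[] {"Value 1", "Value 2"});
                Array decimalArr = conn.createArrayOf(
                        "numeric", new Object[] {new BigDecimal("12.345"), new BigDecimal("56.789")});
                ps.setLong(1, 1L);
                ps.setArray(2, varcharArr);
                ps.setArray(3, decimalArr);
                ps.executeUpdate();
            }
        }
    }
}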
New file: ColumnDesc.java (com.risingwave.connector.api)
@@ -0,0 +1,43 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.api;

import com.risingwave.proto.Data;

public class ColumnDesc {
String name;
Data.DataType dataType;

public ColumnDesc(String name, Data.DataType dataType) {
this.name = name;
this.dataType = dataType;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Data.DataType getDataType() {
return dataType;
}

public void setDataType(Data.DataType dataType) {
this.dataType = dataType;
}
}
Changes to TableSchema.java (com.risingwave.connector.api)
@@ -14,10 +14,11 @@

package com.risingwave.connector.api;

import com.google.common.collect.Lists;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
import com.risingwave.proto.Data.DataType.TypeName;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,18 +28,21 @@ public class TableSchema {
private final List<String> columnNames;
private final Map<String, TypeName> columns;
private final Map<String, Integer> columnIndices;
private List<ColumnDesc> columnDescs;

private final List<String> primaryKeys;

public TableSchema(
List<String> columnNames, List<TypeName> typeNames, List<String> primaryKeys) {
List<String> columnNames, List<Data.DataType> dataTypes, List<String> primaryKeys) {
this.columnNames = columnNames;
this.primaryKeys = primaryKeys;
this.columns = new HashMap<>();
this.columnIndices = new HashMap<>();
this.columnDescs = new ArrayList<>();
for (int i = 0; i < columnNames.size(); i++) {
columns.put(columnNames.get(i), typeNames.get(i));
columns.put(columnNames.get(i), dataTypes.get(i).getTypeName());
columnIndices.put(columnNames.get(i), i);
columnDescs.add(new ColumnDesc(columnNames.get(i), dataTypes.get(i)));
}
}

@@ -54,6 +58,10 @@ public TypeName getColumnType(String columnName) {
return columns.get(columnName);
}

public ColumnDesc getColumnDesc(int index) {
return columnDescs.get(index);
}

public Map<String, TypeName> getColumnTypes() {
return new HashMap<>(columns);
}
@@ -62,27 +70,8 @@ public String[] getColumnNames() {
return columnNames.toArray(new String[0]);
}

public static TableSchema getMockTableSchema() {
return new TableSchema(
Lists.newArrayList("id", "name"),
Lists.newArrayList(TypeName.INT32, TypeName.VARCHAR),
Lists.newArrayList("id"));
}

public static ConnectorServiceProto.TableSchema getMockTableProto() {
return ConnectorServiceProto.TableSchema.newBuilder()
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("id")
.setDataType(TypeName.INT32)
.build())
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("name")
.setDataType(TypeName.VARCHAR)
.build())
.addAllPkIndices(List.of(1))
.build();
public List<ColumnDesc> getColumnDescs() {
return columnDescs;
}

public Object getFromRow(String columnName, SinkRow row) {
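
With ColumnDesc carrying the full Data.DataType rather than only a TypeName, a sink can inspect each column and treat LIST columns specially, for example binding native arrays for Postgres and serializing to text for MySQL. The sketch below is an illustration of that kind of per-column dispatch; everything outside TableSchema and ColumnDesc is assumed and not connector code.

import com.risingwave.connector.api.ColumnDesc;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.proto.Data.DataType.TypeName;

// Illustrative sketch only: decide per-column handling from the richer
// column metadata that TableSchema now exposes via getColumnDescs().
public class ColumnDispatchSketch {
    static String describeHandling(TableSchema schema, boolean targetIsPostgres) {
        StringBuilder sb = new StringBuilder();
        for (ColumnDesc desc : schema.getColumnDescs()) {
            TypeName typeName = desc.getDataType().getTypeName();
            if (typeName == TypeName.LIST) {
                // Postgres: bind as a java.sql.Array; MySQL: flatten to text.
                sb.append(desc.getName())
                        .append(targetIsPostgres ? " -> setArray" : " -> comma-separated text")
                        .append('\n');
            } else {
                sb.append(desc.getName()).append(" -> scalar binding\n");
            }
        }
        return sb.toString();
    }
}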
5 changes: 3 additions & 2 deletions java/connector-node/python-client/integration_tests.py
@@ -19,15 +19,16 @@
import grpc
import connector_service_pb2_grpc
import connector_service_pb2
import data_pb2
import psycopg2


def make_mock_schema():
# todo
schema = connector_service_pb2.TableSchema(
columns=[
connector_service_pb2.TableSchema.Column(name="id", data_type=2),
connector_service_pb2.TableSchema.Column(name="name", data_type=7)
connector_service_pb2.TableSchema.Column(name="id", data_type=data_pb2.DataType(type_name=2)),
connector_service_pb2.TableSchema.Column(name="name", data_type=data_pb2.DataType(type_name=7))
],
pk_indices=[0]
)
@@ -224,6 +224,13 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
}
byte[] bytes = Base64.getDecoder().decode((String) value);
return new ByteArrayInputStream(bytes);
case LIST:
if (!(value instanceof java.util.ArrayList<?>)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected list, got " + value.getClass())
.asRuntimeException();
}
return value;
default:
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unsupported type " + typeName)
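
For the JSON payload format, an array column is expected to arrive as a JSON array, which common JSON deserializers map to java.util.ArrayList; any other shape (for example a pre-joined string) is rejected with INVALID_ARGUMENT by the LIST branch above. A small standalone illustration of the accepted shape, independent of the connector code:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: shows which values satisfy the instanceof check
// used by the LIST branch.
public class ListValueCheckExample {
    public static void main(String[] args) {
        // A JSON array such as ["Value 1", "Value 2"] typically deserializes
        // into an ArrayList, which passes the check.
        Object fromJsonArray = new ArrayList<>(List.of("Value 1", "Value 2"));
        System.out.println(fromJsonArray instanceof ArrayList<?>); // true

        // A flattened string would not, and would be rejected.
        Object flattened = "Value 1,Value 2";
        System.out.println(flattened instanceof ArrayList<?>); // false
    }
}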
@@ -67,7 +67,6 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) {
}
sinkId = sinkTask.getStart().getSinkId();
bindSink(sinkTask.getStart().getSinkConfig(), sinkTask.getStart().getFormat());
LOG.debug("Sink initialized");
responseObserver.onNext(
ConnectorServiceProto.SinkResponse.newBuilder()
.setStart(StartResponse.newBuilder().build())