From 2663f7c5851f7eae930d7d031b34d1cc462a0ef0 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 9 Mar 2023 20:49:04 +0800 Subject: [PATCH 1/4] fix(sink): fix connector node sink json payload serialization --- .../risingwave/connector/api/TableSchema.java | 14 ++++ .../connector/JsonDeserializer.java | 73 ++++++++++--------- 2 files changed, 52 insertions(+), 35 deletions(-) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java index d10a913f75a50..ffa77cc8782ef 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java @@ -91,4 +91,18 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem public List getPrimaryKeys() { return primaryKeys; } + + @Override + public String toString() { + return "TableSchema{" + + "columnNames=" + + columnNames + + ", columns=" + + columns + + ", columnIndices=" + + columnIndices + + ", primaryKeys=" + + primaryKeys + + '}'; + } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java index 1b627d48aa8ec..94dbd2d2dca56 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java @@ -49,63 +49,66 @@ public Iterator deserialize(Object payload) { .iterator(); } - private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) { - if (value instanceof Double - && (Double) value % 1 == 0 - && typeName != Data.DataType.TypeName.DOUBLE - && typeName != Data.DataType.TypeName.FLOAT) { - return (int) (double) value; + private static Long castLong(Object value) { + if (value instanceof Integer) { + return ((Integer) value).longValue(); + } else if (value instanceof Double) { + return ((Double) value).longValue(); + } else if (value instanceof Long) { + return (Long) value; + } else if (value instanceof Short) { + return ((Short) value).longValue(); + } else if (value instanceof Float) { + return ((Float) value).longValue(); + } else { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("unable to cast into long from " + value.getClass()) + .asRuntimeException(); + } + } + + private static Double castDouble(Object value) { + if (value instanceof Double) { + return (Double) value; + } else if (value instanceof Float) { + return ((Float) value).doubleValue(); + } else { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("unable to cast into double from " + value.getClass()) + .asRuntimeException(); } + } + + private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) { switch (typeName) { + case INT16: + return castLong(value).shortValue(); case INT32: + return castLong(value).intValue(); case INT64: - case INT16: - if (!(value instanceof Integer)) { - throw io.grpc.Status.INVALID_ARGUMENT - .withDescription("Expected int, got " + value.getClass()) - .asRuntimeException(); - } - break; + return castLong(value); case VARCHAR: if (!(value instanceof String)) { throw io.grpc.Status.INVALID_ARGUMENT .withDescription("Expected string, got " + value.getClass()) .asRuntimeException(); } - break; + return value; case DOUBLE: - if (!(value instanceof Double)) { - throw io.grpc.Status.INVALID_ARGUMENT - .withDescription("Expected double, got " + value.getClass()) - .asRuntimeException(); - } - break; + return castDouble(value); case FLOAT: - if (!(value instanceof Float)) { - throw io.grpc.Status.INVALID_ARGUMENT - .withDescription("Expected float, got " + value.getClass()) - .asRuntimeException(); - } - break; - case DECIMAL: - if (!(value instanceof Float || value instanceof Double)) { - throw io.grpc.Status.INVALID_ARGUMENT - .withDescription("Expected float, got " + value.getClass()) - .asRuntimeException(); - } - break; + return castDouble(value).floatValue(); case BOOLEAN: if (!(value instanceof Boolean)) { throw io.grpc.Status.INVALID_ARGUMENT .withDescription("Expected boolean, got " + value.getClass()) .asRuntimeException(); } - break; + return value; default: throw io.grpc.Status.INVALID_ARGUMENT .withDescription("unsupported type " + typeName) .asRuntimeException(); } - return value; } } From 398833ed56598162037380237758cd376870a4b7 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 9 Mar 2023 21:12:13 +0800 Subject: [PATCH 2/4] add test on ci --- ci/scripts/e2e-iceberg-sink-test.sh | 16 ++++++++-------- e2e_test/sink/iceberg_sink.slt | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ci/scripts/e2e-iceberg-sink-test.sh b/ci/scripts/e2e-iceberg-sink-test.sh index 4fe338dbffe1e..abdbd877574cb 100755 --- a/ci/scripts/e2e-iceberg-sink-test.sh +++ b/ci/scripts/e2e-iceberg-sink-test.sh @@ -62,7 +62,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \ --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \ --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \ --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \ - --S --e "CREATE TABLE demo.demo_db.demo_table(v1 int, v2 int) TBLPROPERTIES ('format-version'='2');" + --S --e "CREATE TABLE demo.demo_db.demo_table(v1 int, v2 bigint, v3 string) TBLPROPERTIES ('format-version'='2');" echo "--- testing sinks" sqllogictest -p 4566 -d dev './e2e_test/sink/iceberg_sink.slt' @@ -80,13 +80,13 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \ # check sink destination using shell if cat ./spark-output/*.csv | sort | awk -F "," '{ -if ($1 == 1 && $2 == 2) c1++; - if ($1 == 13 && $2 == 2) c2++; - if ($1 == 21 && $2 == 2) c3++; - if ($1 == 2 && $2 == 2) c4++; - if ($1 == 3 && $2 == 2) c5++; - if ($1 == 5 && $2 == 2) c6++; - if ($1 == 8 && $2 == 2) c7++; } +if ($1 == 1 && $2 == 2 && $3 == "1-2") c1++; + if ($1 == 13 && $2 == 2 && $3 == "13-2") c2++; + if ($1 == 21 && $2 == 2 && $3 == "21-2") c3++; + if ($1 == 2 && $2 == 2 && $3 == "2-2") c4++; + if ($1 == 3 && $2 == 2 && $3 == "3-2") c5++; + if ($1 == 5 && $2 == 2 && $3 == "5-2") c6++; + if ($1 == 8 && $2 == 2 && $3 == "8-2") c7++; } END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then echo "Iceberg sink check passed" else diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 9b9e51c095cdd..26338f089dcca 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -1,11 +1,11 @@ statement ok -CREATE TABLE t6 (v1 int primary key, v2 int); +CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar); statement ok CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok -CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2 from mv6 WITH ( +CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( connector = 'iceberg', sink.mode='append-only', location.type='minio', @@ -15,7 +15,10 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2 from mv6 WITH ( ); statement ok -INSERT INTO t6 VALUES (1, 2), (2, 2), (3, 2), (5, 2), (8, 2), (13, 2), (21, 2); +INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2'); + +statement ok +FLUSH; statement ok DROP SINK s6; @@ -25,6 +28,3 @@ DROP MATERIALIZED VIEW mv6; statement ok DROP TABLE t6; - -statement ok -FLUSH; From f2e3491fc6a4abe6e400f33a6c8343fece19c880 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 9 Mar 2023 21:17:59 +0800 Subject: [PATCH 3/4] add integer check on double and float --- .../risingwave/connector/JsonDeserializer.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java index 94dbd2d2dca56..64f7b04cef48d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java @@ -53,12 +53,28 @@ private static Long castLong(Object value) { if (value instanceof Integer) { return ((Integer) value).longValue(); } else if (value instanceof Double) { + double d = (Double) value; + if (d % 1.0 != 0.0) { + + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription( + "unable to cast into long from non-integer double value: " + d) + .asRuntimeException(); + } return ((Double) value).longValue(); } else if (value instanceof Long) { return (Long) value; } else if (value instanceof Short) { return ((Short) value).longValue(); } else if (value instanceof Float) { + double f = (Float) value; + if (f % 1.0 != 0.0) { + + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription( + "unable to cast into long from non-integer float value: " + f) + .asRuntimeException(); + } return ((Float) value).longValue(); } else { throw io.grpc.Status.INVALID_ARGUMENT From 7f1873213d5eea831737beacb5045ae4d29d19e2 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 9 Mar 2023 21:22:33 +0800 Subject: [PATCH 4/4] include risedev change --- Makefile.toml | 13 ++++++++++--- src/risedevtool/connector.toml | 4 ++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index 94b199d94439d..274941161846d 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -599,11 +599,18 @@ else fi ARTIFACT="risingwave-connector-1.0.0.tar.gz" +TARGET_PATH="${JAVA_DIR}/connector-node/assembly/target/${ARTIFACT}" -cd "${JAVA_DIR}" -"${MAVEN_PATH}" --batch-mode --update-snapshots clean package -Dmaven.test.skip +if [[ ! -f ${TARGET_PATH} ]] || [[ ! -z ${REBUILD_CONNECTOR_NODE} ]]; then + echo "Rebuild connector node" + cd "${JAVA_DIR}" + "${MAVEN_PATH}" --batch-mode --update-snapshots clean package -Dmaven.test.skip +else + echo "Connector node was built already. Skipped. Set REBUILD_CONNECTOR_NODE=1 to enable rebuild" +fi +rm -rf ${PREFIX_BIN}/connector-node mkdir -p "${PREFIX_BIN}/connector-node" -tar xf "${JAVA_DIR}/connector-node/assembly/target/${ARTIFACT}" -C "${PREFIX_BIN}/connector-node" +tar xf ${TARGET_PATH} -C "${PREFIX_BIN}/connector-node" ''' diff --git a/src/risedevtool/connector.toml b/src/risedevtool/connector.toml index 0d826f4edd948..55b485412dde1 100644 --- a/src/risedevtool/connector.toml +++ b/src/risedevtool/connector.toml @@ -49,8 +49,8 @@ description = "Download Maven" script = ''' #!/usr/bin/env bash -if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ "11" ]]); then - echo "JDK 11 is not installed. Please install JDK 11 first." +if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17) ]]); then + echo "JDK 11+ is not installed. Please install JDK 11+ first." exit 1 fi