Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): fix connector node sink json payload serialization #8461

Merged
merged 4 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,18 @@ else
fi

ARTIFACT="risingwave-connector-1.0.0.tar.gz"
TARGET_PATH="${JAVA_DIR}/connector-node/assembly/target/${ARTIFACT}"

cd "${JAVA_DIR}"
"${MAVEN_PATH}" --batch-mode --update-snapshots clean package -Dmaven.test.skip
if [[ ! -f ${TARGET_PATH} ]] || [[ ! -z ${REBUILD_CONNECTOR_NODE} ]]; then
echo "Rebuild connector node"
cd "${JAVA_DIR}"
"${MAVEN_PATH}" --batch-mode --update-snapshots clean package -Dmaven.test.skip
else
echo "Connector node was built already. Skipped. Set REBUILD_CONNECTOR_NODE=1 to enable rebuild"
fi
rm -rf ${PREFIX_BIN}/connector-node
mkdir -p "${PREFIX_BIN}/connector-node"
tar xf "${JAVA_DIR}/connector-node/assembly/target/${ARTIFACT}" -C "${PREFIX_BIN}/connector-node"
tar xf ${TARGET_PATH} -C "${PREFIX_BIN}/connector-node"
'''


Expand Down
16 changes: 8 additions & 8 deletions ci/scripts/e2e-iceberg-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
--S --e "CREATE TABLE demo.demo_db.demo_table(v1 int, v2 int) TBLPROPERTIES ('format-version'='2');"
--S --e "CREATE TABLE demo.demo_db.demo_table(v1 int, v2 bigint, v3 string) TBLPROPERTIES ('format-version'='2');"

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/iceberg_sink.slt'
Expand All @@ -80,13 +80,13 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \

# check sink destination using shell
if cat ./spark-output/*.csv | sort | awk -F "," '{
if ($1 == 1 && $2 == 2) c1++;
if ($1 == 13 && $2 == 2) c2++;
if ($1 == 21 && $2 == 2) c3++;
if ($1 == 2 && $2 == 2) c4++;
if ($1 == 3 && $2 == 2) c5++;
if ($1 == 5 && $2 == 2) c6++;
if ($1 == 8 && $2 == 2) c7++; }
if ($1 == 1 && $2 == 2 && $3 == "1-2") c1++;
if ($1 == 13 && $2 == 2 && $3 == "13-2") c2++;
if ($1 == 21 && $2 == 2 && $3 == "21-2") c3++;
if ($1 == 2 && $2 == 2 && $3 == "2-2") c4++;
if ($1 == 3 && $2 == 2 && $3 == "3-2") c5++;
if ($1 == 5 && $2 == 2 && $3 == "5-2") c6++;
if ($1 == 8 && $2 == 2 && $3 == "8-2") c7++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
echo "Iceberg sink check passed"
else
Expand Down
12 changes: 6 additions & 6 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 int);
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2 from mv6 WITH (
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
sink.mode='append-only',
location.type='minio',
Expand All @@ -15,7 +15,10 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2 from mv6 WITH (
);

statement ok
INSERT INTO t6 VALUES (1, 2), (2, 2), (3, 2), (5, 2), (8, 2), (13, 2), (21, 2);
INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2');

statement ok
FLUSH;

statement ok
DROP SINK s6;
Expand All @@ -25,6 +28,3 @@ DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;

statement ok
FLUSH;
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,18 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem
public List<String> getPrimaryKeys() {
return primaryKeys;
}

@Override
public String toString() {
    // Human-readable dump of the schema, used for logging/debugging.
    StringBuilder sb = new StringBuilder("TableSchema{");
    sb.append("columnNames=").append(columnNames);
    sb.append(", columns=").append(columns);
    sb.append(", columnIndices=").append(columnIndices);
    sb.append(", primaryKeys=").append(primaryKeys);
    sb.append('}');
    return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,63 +49,82 @@ public Iterator<SinkRow> deserialize(Object payload) {
.iterator();
}

private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
if (value instanceof Double
&& (Double) value % 1 == 0
&& typeName != Data.DataType.TypeName.DOUBLE
&& typeName != Data.DataType.TypeName.FLOAT) {
return (int) (double) value;
private static Long castLong(Object value) {
if (value instanceof Integer) {
return ((Integer) value).longValue();
} else if (value instanceof Double) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the places calling castLong are INT16, INT32, and INT64. Why should we cover the case of Double and Float?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The json library we are using will convert a json value such 1 into a double. To be compatible with it, we may also check the case of double and float. We have added a check on whether the floating number represents an integer. For example, if we get a 1.1 in castLong, we will get an error.

double d = (Double) value;
if (d % 1.0 != 0.0) {

throw io.grpc.Status.INVALID_ARGUMENT
.withDescription(
"unable to cast into long from non-integer double value: " + d)
.asRuntimeException();
}
return ((Double) value).longValue();
} else if (value instanceof Long) {
return (Long) value;
} else if (value instanceof Short) {
return ((Short) value).longValue();
} else if (value instanceof Float) {
double f = (Float) value;
if (f % 1.0 != 0.0) {

throw io.grpc.Status.INVALID_ARGUMENT
.withDescription(
"unable to cast into long from non-integer float value: " + f)
.asRuntimeException();
}
return ((Float) value).longValue();
} else {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unable to cast into long from " + value.getClass())
.asRuntimeException();
}
}

/**
 * Coerces a numeric JSON value into a {@code Double}; only {@code Double} and {@code Float}
 * inputs are accepted.
 *
 * @param value the deserialized JSON value
 * @return the value as a {@code Double}
 * @throws io.grpc.StatusRuntimeException INVALID_ARGUMENT for any other runtime type
 */
private static Double castDouble(Object value) {
    if (value instanceof Float) {
        return ((Float) value).doubleValue();
    }
    if (value instanceof Double) {
        return (Double) value;
    }
    throw io.grpc.Status.INVALID_ARGUMENT
            .withDescription("unable to cast into double from " + value.getClass())
            .asRuntimeException();
}

/**
 * Validates a deserialized JSON value against the expected column type and coerces it to the
 * canonical Java representation for that type.
 *
 * <p>NOTE(review): this block was reconstructed from a garbled diff in which old and new
 * lines were interleaved. The DECIMAL branch keeps the previous validation semantics
 * (accept Float or Double, pass through unchanged) — confirm against the upstream change.
 *
 * @param typeName the expected column type
 * @param value the deserialized JSON value
 * @return the coerced value
 * @throws io.grpc.StatusRuntimeException INVALID_ARGUMENT if the value cannot represent the
 *     expected type, or if the type is unsupported
 */
private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
    switch (typeName) {
        case INT16:
            return castLong(value).shortValue();
        case INT32:
            return castLong(value).intValue();
        case INT64:
            return castLong(value);
        case VARCHAR:
            if (!(value instanceof String)) {
                throw io.grpc.Status.INVALID_ARGUMENT
                        .withDescription("Expected string, got " + value.getClass())
                        .asRuntimeException();
            }
            return value;
        case DOUBLE:
            return castDouble(value);
        case FLOAT:
            // JSON numbers may arrive as Double; narrow after validating.
            return castDouble(value).floatValue();
        case DECIMAL:
            if (!(value instanceof Float || value instanceof Double)) {
                throw io.grpc.Status.INVALID_ARGUMENT
                        .withDescription("Expected float, got " + value.getClass())
                        .asRuntimeException();
            }
            return value;
        case BOOLEAN:
            if (!(value instanceof Boolean)) {
                throw io.grpc.Status.INVALID_ARGUMENT
                        .withDescription("Expected boolean, got " + value.getClass())
                        .asRuntimeException();
            }
            return value;
        default:
            throw io.grpc.Status.INVALID_ARGUMENT
                    .withDescription("unsupported type " + typeName)
                    .asRuntimeException();
    }
}
}
4 changes: 2 additions & 2 deletions src/risedevtool/connector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ description = "Download Maven"
script = '''
#!/usr/bin/env bash

if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ "11" ]]); then
echo "JDK 11 is not installed. Please install JDK 11 first."
if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17) ]]); then
echo "JDK 11+ is not installed. Please install JDK 11+ first."
exit 1
fi

Expand Down