feat(connector-node): support stream chunk payload in connector node #8548

Merged
merged 35 commits into from
Mar 17, 2023
Changes from 7 commits
Commits
39978e1
feat(java-binding): support java binding on stream chunk
wenym1 Mar 13, 2023
6486af4
rename the previous general iterator and add demo and ci for data chu…
wenym1 Mar 14, 2023
ddbcfd9
fix license
wenym1 Mar 14, 2023
c4a6e9d
fix license again
wenym1 Mar 14, 2023
fc324f9
add java rpc code
yufansong Mar 14, 2023
423b64f
add sink observer case: streamchunk payload
yufansong Mar 14, 2023
2308d72
move demo, anmotation remote.rs and obeserver.java
yufansong Mar 15, 2023
5cfd57b
rename iterator to HummockJavaBindingIterator
wenym1 Mar 15, 2023
faa566e
Merge branch 'main' into yiming/stream-chunk-java-binding
wenym1 Mar 15, 2023
3c2489d
Merge branch 'yiming/stream-chunk-java-binding' into yufan/stream-chunk
wenym1 Mar 15, 2023
d169f13
feat(connector-node): specify sink payload format in start sink
wenym1 Mar 16, 2023
dd90d72
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
4c78b31
fix ci
wenym1 Mar 16, 2023
a0c2260
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
2d063a8
impl Closeable for iterator and sink row
wenym1 Mar 16, 2023
95e53e3
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
79f829e
update typescript
wenym1 Mar 16, 2023
fc8d6b1
add license
wenym1 Mar 16, 2023
7dc1a84
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
d0e8c97
complete rpc with stream chunk
wenym1 Mar 16, 2023
481335e
add log and initialize stream chunk iter
wenym1 Mar 16, 2023
e0c636c
Merge branch 'main' into yiming/set-sink-payload-format
wenym1 Mar 16, 2023
ffb5313
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
ad9bdc0
implement closure for column value getter
wenym1 Mar 16, 2023
eee1f0a
Merge branch 'main' into yiming/set-sink-payload-format
wenym1 Mar 17, 2023
a8f4915
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 17, 2023
f0db319
Merge branch 'main' into yiming/set-sink-payload-format
wenym1 Mar 17, 2023
51888ea
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 17, 2023
ae6beb1
update proto
wenym1 Mar 17, 2023
19f2ba5
enable stream payload in e2e ci
wenym1 Mar 17, 2023
ae2b845
fix(conector-node): do not store sink row inside upsert iceberg sink
wenym1 Mar 17, 2023
c726631
Merge branch 'yiming/fix-iceberg-sink-row-leak' into yufan/stream-chunk
wenym1 Mar 17, 2023
e6ace50
Merge branch 'main' into yiming/fix-iceberg-sink-row-leak
wenym1 Mar 17, 2023
19bc205
Merge branch 'yiming/fix-iceberg-sink-row-leak' into yufan/stream-chunk
wenym1 Mar 17, 2023
7a6509d
update proto
wenym1 Mar 17, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ci/scripts/java-binding-test.sh
@@ -53,3 +53,6 @@ cargo make ingest-data-and-run-java-binding

echo "--- Kill cluster"
cargo make ci-kill

echo "--- run stream chunk java binding"
cargo make run-java-binding-stream-chunk-demo
44 changes: 38 additions & 6 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

@@ -138,6 +138,27 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) {
.asRuntimeException();
}
break;
// case STREAM_CHUNK_PAYLOAD:
// if (deserializer == null) {
// deserializer = new
// StreamChunkDeserializer(tableSchema);
// }
//
// if (deserializer instanceof
// StreamChunkDeserializer) {
// rows =
// deserializer.deserialize(
//
// sinkTask.getWrite().getStreamChunkPayload());
// } else {
// throw INTERNAL.withDescription(
// "invalid payload type:
// expected StreamChunk, got "
// +
// deserializer.getClass().getName())
// .asRuntimeException();
// }
// break;
default:
throw INVALID_ARGUMENT
.withDescription("invalid payload type")
@@ -14,6 +14,8 @@

 package com.risingwave.java.binding;

+import static com.risingwave.java.binding.Utils.validateRow;
+
 import com.risingwave.java.utils.MetaClient;
 import com.risingwave.proto.Catalog.Table;
 import com.risingwave.proto.Hummock.HummockVersion;
@@ -27,7 +29,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;

 /** Hello world! */
-public class Demo {
+public class HummockReadDemo {
     public static void main(String[] args) {
         String objectStore = System.getenv("OBJECT_STORE");
         String dbName = System.getenv("DB_NAME");
@@ -67,7 +69,7 @@ public static void main(String[] args) {
                         .addAllVnodeIds(vnodeList)
                         .build();

-        try (Iterator iter = new Iterator(readPlan)) {
+        try (HummockIterator iter = new HummockIterator(readPlan)) {
             int count = 0;
             while (true) {
                 try (KeyedRow row = iter.next()) {
@@ -92,43 +94,4 @@ public static void main(String[] args) {

         scheduledThreadPool.shutdown();
     }
-
-    static void validateRow(KeyedRow row) {
-        // The validation of row data are according to the data generation rule
-        // defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
-        short rowIndex = row.getShort(0);
-        if (row.getInt(1) != rowIndex) {
-            throw new RuntimeException(
-                    String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
-        }
-        if (row.getLong(2) != rowIndex) {
-            throw new RuntimeException(
-                    String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
-        }
-        if (row.getFloat(3) != (float) rowIndex) {
-            throw new RuntimeException(
-                    String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
-        }
-        if (row.getDouble(4) != (double) rowIndex) {
-            throw new RuntimeException(
-                    String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
-        }
-        if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
-            throw new RuntimeException(
-                    String.format(
-                            "invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
-        }
-        if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
-            throw new RuntimeException(
-                    String.format(
-                            "invalid string value: %s %s",
-                            row.getString(6),
-                            ((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
-        }
-        if (row.isNull(7) != (rowIndex % 5 == 0)) {
-            throw new RuntimeException(
-                    String.format(
-                            "invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
-        }
-    }
 }
@@ -0,0 +1,43 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;

import static com.risingwave.java.binding.Utils.validateRow;

import java.io.IOException;

public class StreamChunkDemo {

    public static void main(String[] args) throws IOException {
        byte[] payload = System.in.readAllBytes();
        try (StreamChunkIterator iter = new StreamChunkIterator(payload)) {
            int count = 0;
            while (true) {
                try (StreamChunkRow row = iter.next()) {
                    if (row == null) {
                        break;
                    }
                    count += 1;
                    validateRow(row);
                }
            }
            int expectedCount = 30000;
            if (count != expectedCount) {
                throw new RuntimeException(
                        String.format("row count is %s, should be %s", count, expectedCount));
            }
        }
    }
}
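
The demo above relies on two nested try-with-resources blocks: the outer one closes the iterator, the inner one closes each row, and a `null` row marks the end of the stream. The following is a minimal, self-contained sketch of that pattern. `RowIterator` and `Row` are hypothetical in-memory stand-ins, not the real `StreamChunkIterator` and `StreamChunkRow`, and the `openRows` counter exists only to make the cleanup visible.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for StreamChunkIterator and StreamChunkRow; not the real binding classes.
final class NullTerminatedIterationSketch {

    /** Returns rows one at a time, then null once the input is exhausted. */
    static final class RowIterator implements AutoCloseable {
        private final Iterator<Integer> inner;
        int openRows = 0; // rows handed out but not yet closed
        boolean closed = false;

        RowIterator(List<Integer> values) {
            this.inner = values.iterator();
        }

        Row next() {
            return inner.hasNext() ? new Row(this, inner.next()) : null;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** A row that holds a resource until it is closed. */
    static final class Row implements AutoCloseable {
        private final RowIterator owner;
        final int value;

        Row(RowIterator owner, int value) {
            this.owner = owner;
            this.value = value;
            owner.openRows += 1;
        }

        @Override
        public void close() {
            owner.openRows -= 1;
        }
    }

    /** Drains the iterator and returns {rows seen, rows still open afterwards}. */
    static int[] drain(List<Integer> values) {
        int count = 0;
        int stillOpen = -1;
        try (RowIterator iter = new RowIterator(values)) {
            while (true) {
                try (Row row = iter.next()) {
                    if (row == null) {
                        break; // try-with-resources does not call close() on a null resource
                    }
                    count += 1;
                }
            }
            stillOpen = iter.openRows;
        }
        return new int[] {count, stillOpen};
    }

    public static void main(String[] args) {
        int[] result = drain(List.of(1, 2, 3));
        System.out.println(result[0] + " rows, " + result[1] + " left open");
    }
}
```

Because try-with-resources skips `close()` for a resource initialized to `null`, the end-of-stream check can sit inside the inner block without a separate null guard around it.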
@@ -0,0 +1,89 @@
package com.risingwave.java.binding;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.StreamChunkPayload;
import com.risingwave.proto.Data;
import java.io.IOException;
import java.util.Iterator;

public class StreamChunkDeserializerDemo {
    public static void main(String[] args) throws IOException {
        byte[] binaryData = System.in.readAllBytes();
        TableSchema tableSchema = getMockTableSchema();
        StreamChunkDeserializer deserializer = new StreamChunkDeserializer(tableSchema);
        StreamChunkPayload payload =
                StreamChunkPayload.newBuilder()
                        .setBinaryData(ByteString.copyFrom(binaryData))
                        .build();
        Iterator<SinkRow> iter = deserializer.deserialize(payload);
        int count = 0;
        while (true) {
            if (!iter.hasNext()) {
                break;
            }
            SinkRow row = iter.next();
            count += 1;
            validateSinkRow(row);
        }
        int expectedCount = 30000;
        if (count != expectedCount) {
            throw new RuntimeException(
                    String.format("row count is %s, should be %s", count, expectedCount));
        }
    }

    public static TableSchema getMockTableSchema() {
        return new TableSchema(
                Lists.newArrayList("v1", "v2", "v3", "v4", "v5", "v6", "v7", "may_null"),
                Lists.newArrayList(
                        Data.DataType.TypeName.INT16,
                        Data.DataType.TypeName.INT32,
                        Data.DataType.TypeName.INT64,
                        Data.DataType.TypeName.FLOAT,
                        Data.DataType.TypeName.DOUBLE,
                        Data.DataType.TypeName.BOOLEAN,
                        Data.DataType.TypeName.VARCHAR,
                        Data.DataType.TypeName.INT64),
                Lists.newArrayList("v1"));
    }

    public static void validateSinkRow(SinkRow row) {
        // The validation of row data are according to the data generation rule
        // defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
        short rowIndex = (short) row.get(0);
        if (row.get(1) instanceof Integer && !((Integer) row.get(1)).equals((int) rowIndex)) {
            throw new RuntimeException(
                    String.format("invalid int value: %s %s", row.get(1), rowIndex));
        }
        if (row.get(2) instanceof Long && !((Long) row.get(2)).equals((long) rowIndex)) {
            throw new RuntimeException(
                    String.format("invalid long value: %s %s", row.get(2), rowIndex));
        }
        if (row.get(3) instanceof Float && !((Float) row.get(3)).equals((float) rowIndex)) {
            throw new RuntimeException(
                    String.format("invalid float value: %s %s", row.get(3), rowIndex));
        }
        if (row.get(4) instanceof Double && !((Double) row.get(4)).equals((double) rowIndex)) {
            throw new RuntimeException(
                    String.format("invalid double value: %s %s", row.get(4), rowIndex));
        }
        if (row.get(5) instanceof Boolean && !((Boolean) row.get(5)).equals(rowIndex % 3 == 0)) {
            throw new RuntimeException(
                    String.format("invalid bool value: %s %s", row.get(5), (rowIndex % 3 == 0)));
        }
        if (row.get(6) instanceof String
                && !row.get(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
            throw new RuntimeException(
                    String.format(
                            "invalid string value: %s %s",
                            row.get(6), ((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
        }
        if ((row.get(7) == null) != (rowIndex % 5 == 0)) {
            throw new RuntimeException(
                    String.format("invalid isNull value: %s %s", row.get(7), (rowIndex % 5 == 0)));
        }
    }
}
@@ -0,0 +1,56 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;

public class Utils {
    public static void validateRow(BaseRow row) {
        // The validation of row data are according to the data generation rule
        // defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
        short rowIndex = row.getShort(0);
        if (row.getInt(1) != rowIndex) {
            throw new RuntimeException(
                    String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
        }
        if (row.getLong(2) != rowIndex) {
            throw new RuntimeException(
                    String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
        }
        if (row.getFloat(3) != (float) rowIndex) {
            throw new RuntimeException(
                    String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
        }
        if (row.getDouble(4) != (double) rowIndex) {
            throw new RuntimeException(
                    String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
        }
        if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
            throw new RuntimeException(
                    String.format(
                            "invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
        }
        if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
            throw new RuntimeException(
                    String.format(
                            "invalid string value: %s %s",
                            row.getString(6),
                            ((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
        }
        if (row.isNull(7) != (rowIndex % 5 == 0)) {
            throw new RuntimeException(
                    String.format(
                            "invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
        }
    }
}
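
The checks in `validateRow` imply a rule for what each generated row contains. As a rough sketch of that rule, read off the checks alone and not taken from `gen-demo-insert-data.py`, the helper below builds the eight expected column values for a given row index. `ExpectedRowSketch` and `expectedRow` are hypothetical names, the row index is assumed to be non-negative, and the non-null value used for column 7 is a placeholder because the validator only checks whether that column is null.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; derived only from the checks in validateRow.
final class ExpectedRowSketch {

    /** Builds the eight column values the validator expects for a non-negative row index. */
    static List<Object> expectedRow(short rowIndex) {
        // Column 6: the decimal row index repeated (rowIndex % 10) + 1 times.
        String text = Short.toString(rowIndex).repeat((rowIndex % 10) + 1);
        // Column 7: null for every fifth row. The non-null value is an assumption,
        // since the validator only checks nullness.
        Long mayNull = (rowIndex % 5 == 0) ? null : Long.valueOf(rowIndex);
        // Arrays.asList is used because List.of rejects null elements.
        return Arrays.asList(
                rowIndex, // column 0: INT16 row index
                (int) rowIndex, // column 1: INT32
                (long) rowIndex, // column 2: INT64
                (float) rowIndex, // column 3: FLOAT
                (double) rowIndex, // column 4: DOUBLE
                rowIndex % 3 == 0, // column 5: BOOLEAN
                text,
                mayNull);
    }

    public static void main(String[] args) {
        System.out.println(expectedRow((short) 12));
    }
}
```

With 30000 rows expected by the demos, every row index fits in a `short`, which is why column 0 can serve as the key for all the other checks.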
8 changes: 8 additions & 0 deletions java/java-binding/pom.xml
@@ -28,6 +28,14 @@
            <groupId>com.risingwave.java</groupId>
            <artifactId>common-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>com.risingwave.java</groupId>
            <artifactId>connector-api</artifactId>
        </dependency>
        <dependency>
            <groupId>com.risingwave.java</groupId>
            <artifactId>risingwave-connector-service</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>