diff --git a/Cargo.lock b/Cargo.lock index dc213e1fec007..5d8578a62fd43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6229,6 +6229,7 @@ version = "0.1.0" dependencies = [ "bytes", "futures", + "itertools", "jni", "madsim-tokio", "prost 0.11.8", diff --git a/ci/scripts/java-binding-test.sh b/ci/scripts/java-binding-test.sh index 9dca6068a6915..8818bd7d8a412 100755 --- a/ci/scripts/java-binding-test.sh +++ b/ci/scripts/java-binding-test.sh @@ -53,3 +53,6 @@ cargo make ingest-data-and-run-java-binding echo "--- Kill cluster" cargo make ci-kill + +echo "--- run stream chunk java binding" +cargo make run-java-binding-stream-chunk-demo diff --git a/java/com_risingwave_java_binding_Binding.h b/java/com_risingwave_java_binding_Binding.h index 0f4181f2cefb7..bd03892223a6d 100644 --- a/java/com_risingwave_java_binding_Binding.h +++ b/java/com_risingwave_java_binding_Binding.h @@ -17,26 +17,26 @@ JNIEXPORT jint JNICALL Java_com_risingwave_java_binding_Binding_vnodeCount /* * Class: com_risingwave_java_binding_Binding - * Method: iteratorNew + * Method: hummockIteratorNew * Signature: ([B)J */ -JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_iteratorNew +JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_hummockIteratorNew (JNIEnv *, jclass, jbyteArray); /* * Class: com_risingwave_java_binding_Binding - * Method: iteratorNext + * Method: hummockIteratorNext * Signature: (J)J */ -JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_iteratorNext +JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_hummockIteratorNext (JNIEnv *, jclass, jlong); /* * Class: com_risingwave_java_binding_Binding - * Method: iteratorClose + * Method: hummockIteratorClose * Signature: (J)V */ -JNIEXPORT void JNICALL Java_com_risingwave_java_binding_Binding_iteratorClose +JNIEXPORT void JNICALL Java_com_risingwave_java_binding_Binding_hummockIteratorClose (JNIEnv *, jclass, jlong); /* @@ -47,6 +47,14 @@ JNIEXPORT void JNICALL Java_com_risingwave_java_binding_Binding_iteratorClose JNIEXPORT jbyteArray JNICALL Java_com_risingwave_java_binding_Binding_rowGetKey (JNIEnv *, jclass, jlong); +/* + * Class: com_risingwave_java_binding_Binding + * Method: rowGetOp + * Signature: (J)I + */ +JNIEXPORT jint JNICALL Java_com_risingwave_java_binding_Binding_rowGetOp + (JNIEnv *, jclass, jlong); + /* * Class: com_risingwave_java_binding_Binding * Method: rowIsNull @@ -119,6 +127,30 @@ JNIEXPORT jstring JNICALL Java_com_risingwave_java_binding_Binding_rowGetStringV JNIEXPORT void JNICALL Java_com_risingwave_java_binding_Binding_rowClose (JNIEnv *, jclass, jlong); +/* + * Class: com_risingwave_java_binding_Binding + * Method: streamChunkIteratorNew + * Signature: ([B)J + */ +JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_streamChunkIteratorNew + (JNIEnv *, jclass, jbyteArray); + +/* + * Class: com_risingwave_java_binding_Binding + * Method: streamChunkIteratorNext + * Signature: (J)J + */ +JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_streamChunkIteratorNext + (JNIEnv *, jclass, jlong); + +/* + * Class: com_risingwave_java_binding_Binding + * Method: streamChunkIteratorClose + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_com_risingwave_java_binding_Binding_streamChunkIteratorClose + (JNIEnv *, jclass, jlong); + #ifdef __cplusplus } #endif diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Demo.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java similarity index 65% rename from java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Demo.java rename to java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java index bb62e5acd4846..9f4038cf3f9a3 100644 --- a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Demo.java +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java @@ -14,6 +14,8 @@ package com.risingwave.java.binding; +import static com.risingwave.java.binding.Utils.validateRow; + import com.risingwave.java.utils.MetaClient; import com.risingwave.proto.Catalog.Table; import com.risingwave.proto.Hummock.HummockVersion; @@ -27,7 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; /** Hello world! */ -public class Demo { +public class HummockReadDemo { public static void main(String[] args) { String objectStore = System.getenv("OBJECT_STORE"); String dbName = System.getenv("DB_NAME"); @@ -67,7 +69,7 @@ public static void main(String[] args) { .addAllVnodeIds(vnodeList) .build(); - try (Iterator iter = new Iterator(readPlan)) { + try (HummockIterator iter = new HummockIterator(readPlan)) { int count = 0; while (true) { try (KeyedRow row = iter.next()) { @@ -92,43 +94,4 @@ public static void main(String[] args) { scheduledThreadPool.shutdown(); } - - static void validateRow(KeyedRow row) { - // The validation of row data are according to the data generation rule - // defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py - short rowIndex = row.getShort(0); - if (row.getInt(1) != rowIndex) { - throw new RuntimeException( - String.format("invalid int value: %s %s", row.getInt(1), rowIndex)); - } - if (row.getLong(2) != rowIndex) { - throw new RuntimeException( - String.format("invalid long value: %s %s", row.getLong(2), rowIndex)); - } - if (row.getFloat(3) != (float) rowIndex) { - throw new RuntimeException( - String.format("invalid float value: %s %s", row.getFloat(3), rowIndex)); - } - if (row.getDouble(4) != (double) rowIndex) { - throw new RuntimeException( - String.format("invalid double value: %s %s", row.getDouble(4), rowIndex)); - } - if (row.getBoolean(5) != (rowIndex % 3 == 0)) { - throw new RuntimeException( - String.format( - "invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0))); - } - if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) { - throw new RuntimeException( - String.format( - "invalid string value: %s %s", - row.getString(6), - ((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))); - } - if (row.isNull(7) != (rowIndex % 5 == 0)) { - throw new RuntimeException( - String.format( - "invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0))); - } - } } diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/StreamChunkDemo.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/StreamChunkDemo.java new file mode 100644 index 0000000000000..0cc6977de2f0c --- /dev/null +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/StreamChunkDemo.java @@ -0,0 +1,43 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.java.binding; + +import static com.risingwave.java.binding.Utils.validateRow; + +import java.io.IOException; + +public class StreamChunkDemo { + + public static void main(String[] args) throws IOException { + byte[] payload = System.in.readAllBytes(); + try (StreamChunkIterator iter = new StreamChunkIterator(payload)) { + int count = 0; + while (true) { + try (StreamChunkRow row = iter.next()) { + if (row == null) { + break; + } + count += 1; + validateRow(row); + } + } + int expectedCount = 30000; + if (count != expectedCount) { + throw new RuntimeException( + String.format("row count is %s, should be %s", count, expectedCount)); + } + } + } +} diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java new file mode 100644 index 0000000000000..193ba4811bdc1 --- /dev/null +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java @@ -0,0 +1,56 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.java.binding; + +public class Utils { + public static void validateRow(BaseRow row) { + // The validation of row data are according to the data generation rule + // defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py + short rowIndex = row.getShort(0); + if (row.getInt(1) != rowIndex) { + throw new RuntimeException( + String.format("invalid int value: %s %s", row.getInt(1), rowIndex)); + } + if (row.getLong(2) != rowIndex) { + throw new RuntimeException( + String.format("invalid long value: %s %s", row.getLong(2), rowIndex)); + } + if (row.getFloat(3) != (float) rowIndex) { + throw new RuntimeException( + String.format("invalid float value: %s %s", row.getFloat(3), rowIndex)); + } + if (row.getDouble(4) != (double) rowIndex) { + throw new RuntimeException( + String.format("invalid double value: %s %s", row.getDouble(4), rowIndex)); + } + if (row.getBoolean(5) != (rowIndex % 3 == 0)) { + throw new RuntimeException( + String.format( + "invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0))); + } + if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) { + throw new RuntimeException( + String.format( + "invalid string value: %s %s", + row.getString(6), + ((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))); + } + if (row.isNull(7) != (rowIndex % 5 == 0)) { + throw new RuntimeException( + String.format( + "invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0))); + } + } +} diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java new file mode 100644 index 0000000000000..22d55a145deaa --- /dev/null +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java @@ -0,0 +1,65 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.java.binding; + +public class BaseRow implements AutoCloseable { + protected final long pointer; + private boolean isClosed; + + protected BaseRow(long pointer) { + this.pointer = pointer; + this.isClosed = false; + } + + public boolean isNull(int index) { + return Binding.rowIsNull(pointer, index); + } + + public short getShort(int index) { + return Binding.rowGetInt16Value(pointer, index); + } + + public int getInt(int index) { + return Binding.rowGetInt32Value(pointer, index); + } + + public long getLong(int index) { + return Binding.rowGetInt64Value(pointer, index); + } + + public float getFloat(int index) { + return Binding.rowGetFloatValue(pointer, index); + } + + public double getDouble(int index) { + return Binding.rowGetDoubleValue(pointer, index); + } + + public boolean getBoolean(int index) { + return Binding.rowGetBooleanValue(pointer, index); + } + + public String getString(int index) { + return Binding.rowGetStringValue(pointer, index); + } + + @Override + public void close() { + if (!isClosed) { + isClosed = true; + Binding.rowClose(pointer); + } + } +} diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index fead26d164dbf..f4dec3eecb426 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -21,20 +21,22 @@ public class Binding { public static native int vnodeCount(); - // iterator method + // hummock iterator method // Return a pointer to the iterator - static native long iteratorNew(byte[] readPlan); + static native long hummockIteratorNew(byte[] readPlan); // return a pointer to the next row - static native long iteratorNext(long pointer); + static native long hummockIteratorNext(long pointer); // Since the underlying rust does not have garbage collection, we will have to manually call // close on the iterator to release the iterator instance pointed by the pointer. - static native void iteratorClose(long pointer); + static native void hummockIteratorClose(long pointer); // row method static native byte[] rowGetKey(long pointer); + static native int rowGetOp(long pointer); + static native boolean rowIsNull(long pointer, int index); static native short rowGetInt16Value(long pointer, int index); @@ -54,4 +56,11 @@ public class Binding { // Since the underlying rust does not have garbage collection, we will have to manually call // close on the row to release the row instance pointed by the pointer. static native void rowClose(long pointer); + + // stream chunk iterator method + static native long streamChunkIteratorNew(byte[] streamChunkPayload); + + static native long streamChunkIteratorNext(long pointer); + + static native void streamChunkIteratorClose(long pointer); } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Iterator.java b/java/java-binding/src/main/java/com/risingwave/java/binding/HummockIterator.java similarity index 77% rename from java/java-binding/src/main/java/com/risingwave/java/binding/Iterator.java rename to java/java-binding/src/main/java/com/risingwave/java/binding/HummockIterator.java index 0242e62fa1871..ced034fd649d9 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Iterator.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/HummockIterator.java @@ -16,17 +16,17 @@ import com.risingwave.proto.JavaBinding.ReadPlan; -public class Iterator implements AutoCloseable { +public class HummockIterator implements AutoCloseable { private final long pointer; private boolean isClosed; - public Iterator(ReadPlan readPlan) { - this.pointer = Binding.iteratorNew(readPlan.toByteArray()); + public HummockIterator(ReadPlan readPlan) { + this.pointer = Binding.hummockIteratorNew(readPlan.toByteArray()); this.isClosed = false; } public KeyedRow next() { - long pointer = Binding.iteratorNext(this.pointer); + long pointer = Binding.hummockIteratorNext(this.pointer); if (pointer == 0) { return null; } @@ -37,7 +37,7 @@ public KeyedRow next() { public void close() { if (!isClosed) { isClosed = true; - Binding.iteratorClose(pointer); + Binding.hummockIteratorClose(pointer); } } } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java index 60de3535d370e..6bbfdaafebabc 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java @@ -14,56 +14,12 @@ package com.risingwave.java.binding; -public class KeyedRow implements AutoCloseable { - private final long pointer; - private boolean isClosed; - - KeyedRow(long pointer) { - this.pointer = pointer; - this.isClosed = false; +public class KeyedRow extends BaseRow { + public KeyedRow(long pointer) { + super(pointer); } public byte[] getKey() { return Binding.rowGetKey(pointer); } - - public boolean isNull(int index) { - return Binding.rowIsNull(pointer, index); - } - - public short getShort(int index) { - return Binding.rowGetInt16Value(pointer, index); - } - - public int getInt(int index) { - return Binding.rowGetInt32Value(pointer, index); - } - - public long getLong(int index) { - return Binding.rowGetInt64Value(pointer, index); - } - - public float getFloat(int index) { - return Binding.rowGetFloatValue(pointer, index); - } - - public double getDouble(int index) { - return Binding.rowGetDoubleValue(pointer, index); - } - - public boolean getBoolean(int index) { - return Binding.rowGetBooleanValue(pointer, index); - } - - public String getString(int index) { - return Binding.rowGetStringValue(pointer, index); - } - - @Override - public void close() { - if (!isClosed) { - isClosed = true; - Binding.rowClose(pointer); - } - } } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkIterator.java b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkIterator.java new file mode 100644 index 0000000000000..9d4d71650a82a --- /dev/null +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkIterator.java @@ -0,0 +1,41 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.java.binding; + +public class StreamChunkIterator implements AutoCloseable { + private final long pointer; + private boolean isClosed; + + public StreamChunkIterator(byte[] streamChunkPayload) { + this.pointer = Binding.streamChunkIteratorNew(streamChunkPayload); + this.isClosed = false; + } + + public StreamChunkRow next() { + long pointer = Binding.streamChunkIteratorNext(this.pointer); + if (pointer == 0) { + return null; + } + return new StreamChunkRow(pointer); + } + + @Override + public void close() { + if (!isClosed) { + isClosed = true; + Binding.streamChunkIteratorClose(pointer); + } + } +} diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkRow.java new file mode 100644 index 0000000000000..401d3d98f766d --- /dev/null +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkRow.java @@ -0,0 +1,27 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.java.binding; + +import com.risingwave.proto.Data; + +public class StreamChunkRow extends BaseRow { + public StreamChunkRow(long pointer) { + super(pointer); + } + + public Data.Op getOp() { + return Data.Op.forNumber(Binding.rowGetOp(pointer)); + } +} diff --git a/src/common/src/row/owned_row.rs b/src/common/src/row/owned_row.rs index e929176dc8727..afa18c07ae52f 100644 --- a/src/common/src/row/owned_row.rs +++ b/src/common/src/row/owned_row.rs @@ -92,6 +92,61 @@ impl OwnedRow { } } +impl OwnedRow { + pub fn is_null(&self, idx: usize) -> bool { + self[idx].is_none() + } + + pub fn get_int16(&self, idx: usize) -> i16 { + match self[idx].as_ref().unwrap() { + ScalarImpl::Int16(num) => *num, + _ => unreachable!("type is not int16 at index: {}", idx), + } + } + + pub fn get_int32(&self, idx: usize) -> i32 { + match self[idx].as_ref().unwrap() { + ScalarImpl::Int32(num) => *num, + _ => unreachable!("type is not int32 at index: {}", idx), + } + } + + pub fn get_int64(&self, idx: usize) -> i64 { + match self[idx].as_ref().unwrap() { + ScalarImpl::Int64(num) => *num, + _ => unreachable!("type is not int64 at index: {}", idx), + } + } + + pub fn get_f32(&self, idx: usize) -> f32 { + match self[idx].as_ref().unwrap() { + ScalarImpl::Float32(num) => num.into_inner(), + _ => unreachable!("type is not float32 at index: {}", idx), + } + } + + pub fn get_f64(&self, idx: usize) -> f64 { + match self[idx].as_ref().unwrap() { + ScalarImpl::Float64(num) => num.into_inner(), + _ => unreachable!("type is not float64 at index: {}", idx), + } + } + + pub fn get_bool(&self, idx: usize) -> bool { + match self[idx].as_ref().unwrap() { + ScalarImpl::Bool(num) => *num, + _ => unreachable!("type is not boolean at index: {}", idx), + } + } + + pub fn get_utf8(&self, idx: usize) -> &str { + match self[idx].as_ref().unwrap() { + ScalarImpl::Utf8(s) => s.as_ref(), + _ => unreachable!("type is not utf8 at index: {}", idx), + } + } +} + impl EstimateSize for OwnedRow { fn estimated_heap_size(&self) -> usize { // FIXME(bugen): this is not accurate now as the heap size of some `Scalar` is not counted. diff --git a/src/java_binding/Cargo.toml b/src/java_binding/Cargo.toml index 61f4cf3b757d0..43f1aae283a8b 100644 --- a/src/java_binding/Cargo.toml +++ b/src/java_binding/Cargo.toml @@ -12,6 +12,7 @@ normal = ["workspace-hack"] [dependencies] bytes = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } +itertools = "0.10" jni = "0.20.0" prost = "0.11" risingwave_common = { path = "../common" } @@ -33,3 +34,8 @@ tracing = "0.1" [lib] crate_type = ["cdylib"] + +[[bin]] +name = "data-chunk-payload-generator" +test = false +bench = false diff --git a/src/java_binding/make-java-binding.toml b/src/java_binding/make-java-binding.toml index 18722cbe1bc2e..56d69dd40d31d 100644 --- a/src/java_binding/make-java-binding.toml +++ b/src/java_binding/make-java-binding.toml @@ -22,6 +22,7 @@ script = ''' set -ex cd java mvn clean install --pl java-binding-integration-test --am -DskipTests=true +mvn dependency:copy-dependencies --pl java-binding-integration-test ''' [tasks.start-java-binding-demo-cluster] @@ -76,3 +77,24 @@ dependencies = [ "ingest-data-and-run-java-binding", "kill-java-binding-demo-cluster" ] + +[tasks.run-java-binding-stream-chunk-demo] +description = "Run the java binding stream chunk demo" +dependencies = [ + "build-java-binding-rust", + "build-java-binding-java", +] +script = ''' +#!/usr/bin/env bash +set -ex + +RISINGWAVE_ROOT=$(git rev-parse --show-toplevel) + +cd ${RISINGWAVE_ROOT}/java + +(cd ${RISINGWAVE_ROOT} && cargo run --bin data-chunk-payload-generator) | \ + java -cp "./java-binding-integration-test/target/dependency/*:./java-binding-integration-test/target/classes" \ + -Djava.library.path=${RISINGWAVE_ROOT}/target/debug com.risingwave.java.binding.StreamChunkDemo + + +''' diff --git a/src/java_binding/run_demo.sh b/src/java_binding/run_demo.sh index 72bcc9642b3c0..fc49d96fd5678 100644 --- a/src/java_binding/run_demo.sh +++ b/src/java_binding/run_demo.sh @@ -19,13 +19,8 @@ set -x cd ${RISINGWAVE_ROOT}/java -mvn exec:exec \ - -pl java-binding-integration-test \ - -Dexec.executable=java \ - -Dexec.args=" \ - -cp %classpath:java-binding/target*.jar:proto/target/*.jar \ - -Djava.library.path=${RISINGWAVE_ROOT}/target/debug \ - com.risingwave.java.binding.Demo" +java -cp "./java-binding-integration-test/target/dependency/*:./java-binding-integration-test/target/classes" \ + -Djava.library.path=${RISINGWAVE_ROOT}/target/debug com.risingwave.java.binding.HummockReadDemo psql -d dev -h localhost -p 4566 -U root << EOF DROP TABLE ${TABLE_NAME}; diff --git a/src/java_binding/src/bin/data-chunk-payload-generator.rs b/src/java_binding/src/bin/data-chunk-payload-generator.rs new file mode 100644 index 0000000000000..4d67b4a6fa6e1 --- /dev/null +++ b/src/java_binding/src/bin/data-chunk-payload-generator.rs @@ -0,0 +1,78 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Write; + +use prost::Message; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, OrderedF32, OrderedF64, ScalarImpl}; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; + +fn build_row(index: usize) -> OwnedRow { + let mut row_value = Vec::with_capacity(8); + row_value.push(Some(ScalarImpl::Int16(index as i16))); + row_value.push(Some(ScalarImpl::Int32(index as i32))); + row_value.push(Some(ScalarImpl::Int64(index as i64))); + row_value.push(Some(ScalarImpl::Float32(OrderedF32::from(index as f32)))); + row_value.push(Some(ScalarImpl::Float64(OrderedF64::from(index as f64)))); + row_value.push(Some(ScalarImpl::Bool(index % 3 == 0))); + row_value.push(Some(ScalarImpl::Utf8( + format!("{}", index).repeat((index % 10) + 1).into(), + ))); + row_value.push(if index % 5 == 0 { + None + } else { + Some(ScalarImpl::Int64(index as i64)) + }); + + OwnedRow::new(row_value) +} + +fn main() { + let row_count = 30000; + let data_types = vec![ + DataType::Int16, + DataType::Int32, + DataType::Int64, + DataType::Float32, + DataType::Float64, + DataType::Boolean, + DataType::Varchar, + DataType::Int64, + ]; + let mut ops = Vec::with_capacity(row_count); + let mut builder = DataChunkBuilder::new(data_types, row_count * 1024); + for i in 0..row_count { + assert!( + builder.append_one_row(build_row(i)).is_none(), + "should not finish" + ); + if i % 2 == 0 { + ops.push(Op::Insert); + } else { + ops.push(Op::Delete); + } + } + + let data_chunk = builder.consume_all().expect("should not be empty"); + let stream_chunk = StreamChunk::from_parts(ops, data_chunk); + let prost_stream_chunk = stream_chunk.to_protobuf(); + + let payload = Message::encode_to_vec(&prost_stream_chunk); + + std::io::stdout() + .write_all(&payload) + .expect("should success"); +} diff --git a/src/java_binding/src/iterator.rs b/src/java_binding/src/hummock_iterator.rs similarity index 77% rename from src/java_binding/src/iterator.rs rename to src/java_binding/src/hummock_iterator.rs index bf75cdfc8e8b2..5515d50a48c9b 100644 --- a/src/java_binding/src/iterator.rs +++ b/src/java_binding/src/hummock_iterator.rs @@ -19,7 +19,7 @@ use futures::TryStreamExt; use risingwave_common::catalog::ColumnId; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; -use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common::types::DataType; use risingwave_common::util::select_all; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; use risingwave_common::util::value_encoding::{ @@ -47,7 +47,7 @@ fn select_all_vnode_stream( select_all(streams.into_iter().map(Box::pin)) } -pub struct Iterator { +pub struct HummockJavaBindingIterator { row_serde: EitherSerde, stream: SelectAllIterStream, } @@ -62,61 +62,12 @@ impl KeyedRow { self.key.as_ref() } - pub fn is_null(&self, idx: usize) -> bool { - self.row[idx].is_none() - } - - pub fn get_int16(&self, idx: usize) -> i16 { - match self.row[idx].as_ref().unwrap() { - ScalarImpl::Int16(num) => *num, - _ => unreachable!("type is not int16 at index: {}", idx), - } - } - - pub fn get_int32(&self, idx: usize) -> i32 { - match self.row[idx].as_ref().unwrap() { - ScalarImpl::Int32(num) => *num, - _ => unreachable!("type is not int32 at index: {}", idx), - } - } - - pub fn get_int64(&self, idx: usize) -> i64 { - match self.row[idx].as_ref().unwrap() { - ScalarImpl::Int64(num) => *num, - _ => unreachable!("type is not int64 at index: {}", idx), - } - } - - pub fn get_f32(&self, idx: usize) -> f32 { - match self.row[idx].as_ref().unwrap() { - ScalarImpl::Float32(num) => num.into_inner(), - _ => unreachable!("type is not float32 at index: {}", idx), - } - } - - pub fn get_f64(&self, idx: usize) -> f64 { - match self.row[idx].as_ref().unwrap() { - ScalarImpl::Float64(num) => num.into_inner(), - _ => unreachable!("type is not float64 at index: {}", idx), - } - } - - pub fn get_bool(&self, idx: usize) -> bool { - match self.row[idx].as_ref().unwrap() { - ScalarImpl::Bool(num) => *num, - _ => unreachable!("type is not boolean at index: {}", idx), - } - } - - pub fn get_utf8(&self, idx: usize) -> &str { - match self.row[idx].as_ref().unwrap() { - ScalarImpl::Utf8(s) => s.as_ref(), - _ => unreachable!("type is not utf8 at index: {}", idx), - } + pub fn row(&self) -> &OwnedRow { + &self.row } } -impl Iterator { +impl HummockJavaBindingIterator { pub async fn new(read_plan: ReadPlan) -> StorageResult { // Note(bugen): should we forward the implementation to the `StorageTable`? let object_store = Arc::new( diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index 5ef98b3e12d07..77c752fdb231e 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -17,7 +17,8 @@ #![feature(once_cell)] #![feature(type_alias_impl_trait)] -mod iterator; +mod hummock_iterator; +mod stream_chunk_iterator; use std::backtrace::Backtrace; use std::marker::PhantomData; @@ -26,16 +27,20 @@ use std::panic::catch_unwind; use std::slice::from_raw_parts; use std::sync::LazyLock; -use iterator::{Iterator, KeyedRow}; +use hummock_iterator::{HummockJavaBindingIterator, KeyedRow}; use jni::objects::{AutoArray, JClass, JObject, JString, ReleaseMode}; use jni::sys::{jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jshort}; use jni::JNIEnv; use prost::{DecodeError, Message}; +use risingwave_common::array::{ArrayError, StreamChunk}; use risingwave_common::hash::VirtualNode; +use risingwave_common::row::OwnedRow; use risingwave_storage::error::StorageError; use thiserror::Error; use tokio::runtime::Runtime; +use crate::stream_chunk_iterator::{StreamChunkIterator, StreamChunkRow}; + static RUNTIME: LazyLock = LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); #[derive(Error, Debug)] @@ -60,6 +65,13 @@ enum BindingError { error: DecodeError, backtrace: Backtrace, }, + + #[error("StreamChunkArrayError {error}")] + StreamChunkArray { + #[from] + error: ArrayError, + backtrace: Backtrace, + }, } type Result = std::result::Result; @@ -209,6 +221,38 @@ where } } +pub enum JavaBindingRow { + Keyed(KeyedRow), + StreamChunk(StreamChunkRow), +} + +impl JavaBindingRow { + fn as_keyed(&self) -> &KeyedRow { + match &self { + JavaBindingRow::Keyed(r) => r, + _ => unreachable!("can only call as_keyed for KeyedRow"), + } + } + + fn as_stream_chunk(&self) -> &StreamChunkRow { + match &self { + JavaBindingRow::StreamChunk(r) => r, + _ => unreachable!("can only call as_stream_chunk for StreamChunkRow"), + } + } +} + +impl Deref for JavaBindingRow { + type Target = OwnedRow; + + fn deref(&self) -> &Self::Target { + match &self { + JavaBindingRow::Keyed(r) => r.row(), + JavaBindingRow::StreamChunk(r) => r.row(), + } + } +} + #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount( _env: EnvParam<'_>, @@ -217,34 +261,66 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount( } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNew<'a>( +pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNew<'a>( env: EnvParam<'a>, read_plan: JByteArray<'a>, -) -> Pointer<'static, Iterator> { +) -> Pointer<'static, HummockJavaBindingIterator> { execute_and_catch(env, move || { let read_plan = Message::decode(read_plan.to_guarded_slice(*env)?.deref())?; - let iter = RUNTIME.block_on(Iterator::new(read_plan))?; + let iter = RUNTIME.block_on(HummockJavaBindingIterator::new(read_plan))?; Ok(iter.into()) }) } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNext<'a>( +pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNext<'a>( env: EnvParam<'a>, - mut pointer: Pointer<'a, Iterator>, -) -> Pointer<'static, KeyedRow> { + mut pointer: Pointer<'a, HummockJavaBindingIterator>, +) -> Pointer<'static, JavaBindingRow> { execute_and_catch(env, move || { match RUNTIME.block_on(pointer.as_mut().next())? { None => Ok(Pointer::null()), - Some(row) => Ok(row.into()), + Some(row) => Ok(JavaBindingRow::Keyed(row).into()), } }) } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorClose( +pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorClose( _env: EnvParam<'_>, - pointer: Pointer<'_, Iterator>, + pointer: Pointer<'_, HummockJavaBindingIterator>, +) { + pointer.drop(); +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkIteratorNew<'a>( + env: EnvParam<'a>, + stream_chunk_payload: JByteArray<'a>, +) -> Pointer<'static, StreamChunkIterator> { + execute_and_catch(env, move || { + let prost_stream_chumk = + Message::decode(stream_chunk_payload.to_guarded_slice(*env)?.deref())?; + let iter = StreamChunkIterator::new(StreamChunk::from_protobuf(&prost_stream_chumk)?); + Ok(iter.into()) + }) +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkIteratorNext<'a>( + env: EnvParam<'a>, + mut pointer: Pointer<'a, StreamChunkIterator>, +) -> Pointer<'static, JavaBindingRow> { + execute_and_catch(env, move || match pointer.as_mut().next() { + None => Ok(Pointer::null()), + Some(row) => Ok(JavaBindingRow::StreamChunk(row).into()), + }) +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkIteratorClose( + _env: EnvParam<'_>, + pointer: Pointer<'_, StreamChunkIterator>, ) { pointer.drop(); } @@ -252,19 +328,29 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorClose( #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetKey<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, ) -> JByteArray<'a> { execute_and_catch(env, move || { - Ok(JByteArray::from( - env.byte_array_from_slice(pointer.as_ref().key())?, - )) + Ok(JByteArray::from(env.byte_array_from_slice( + pointer.as_ref().as_keyed().key(), + )?)) + }) +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetOp<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, +) -> jint { + execute_and_catch(env, move || { + Ok(pointer.as_ref().as_stream_chunk().op() as jint) }) } #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowIsNull<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> jboolean { execute_and_catch(env, move || { @@ -275,7 +361,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowIsNull<'a>( #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt16Value<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> jshort { execute_and_catch(env, move || Ok(pointer.as_ref().get_int16(idx as usize))) @@ -284,7 +370,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt16Value #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt32Value<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> jint { execute_and_catch(env, move || Ok(pointer.as_ref().get_int32(idx as usize))) @@ -293,7 +379,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt32Value #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt64Value<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> jlong { execute_and_catch(env, move || Ok(pointer.as_ref().get_int64(idx as usize))) @@ -302,7 +388,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt64Value #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetFloatValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> jfloat { execute_and_catch(env, move || Ok(pointer.as_ref().get_f32(idx as usize))) @@ -311,7 +397,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetFloatValue #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDoubleValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> jdouble { execute_and_catch(env, move || Ok(pointer.as_ref().get_f64(idx as usize))) @@ -320,7 +406,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDoubleValu #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetBooleanValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> jboolean { execute_and_catch(env, move || { @@ -331,7 +417,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetBooleanVal #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetStringValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> JString<'a> { execute_and_catch(env, move || { @@ -342,7 +428,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetStringValu #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowClose<'a>( _env: EnvParam<'a>, - pointer: Pointer<'a, KeyedRow>, + pointer: Pointer<'a, JavaBindingRow>, ) { pointer.drop() } diff --git a/src/java_binding/src/stream_chunk_iterator.rs b/src/java_binding/src/stream_chunk_iterator.rs new file mode 100644 index 0000000000000..bf6b3e8acc710 --- /dev/null +++ b/src/java_binding/src/stream_chunk_iterator.rs @@ -0,0 +1,58 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::array::StreamChunk; +use risingwave_common::row::{OwnedRow, Row}; +use risingwave_pb::data::Op; + +pub struct StreamChunkRow { + op: Op, + row: OwnedRow, +} + +impl StreamChunkRow { + pub fn op(&self) -> Op { + self.op + } + + pub fn row(&self) -> &OwnedRow { + &self.row + } +} + +type StreamChunkRowIterator = impl Iterator + 'static; + +pub struct StreamChunkIterator { + iter: StreamChunkRowIterator, +} + +impl StreamChunkIterator { + pub(crate) fn new(stream_chunk: StreamChunk) -> Self { + Self { + iter: stream_chunk + .rows() + .map(|(op, row_ref)| StreamChunkRow { + op: op.to_protobuf(), + row: row_ref.to_owned_row(), + }) + .collect_vec() + .into_iter(), + } + } + + pub(crate) fn next(&mut self) -> Option { + self.iter.next() + } +}