From 9af4c84045cd15b5fcfdf5a29da67ca4b26e8edd Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Thu, 23 Mar 2023 16:32:34 +0800 Subject: [PATCH 01/10] feat: more java bindings --- .../deserializer/StreamChunkDeserializer.java | 18 +++++++++ .../com/risingwave/java/binding/Utils.java | 8 ++-- .../com/risingwave/java/binding/BaseRow.java | 8 ++++ .../com/risingwave/java/binding/Binding.java | 4 ++ src/common/src/row/owned_row.rs | 14 +++++++ src/java_binding/gen-demo-insert-data.py | 3 +- src/java_binding/run_demo.sh | 2 +- .../src/bin/data-chunk-payload-generator.rs | 8 +++- src/java_binding/src/lib.rs | 37 +++++++++++++++++++ 9 files changed, 95 insertions(+), 7 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java index b6be0af1d9394..dac584549a3c4 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java @@ -107,6 +107,24 @@ static ValueGetter[] buildValueGetter(TableSchema tableSchema) { return row.getString(index); }; break; + case TIMESTAMP: + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + return row.getDateTime(index); + }; + break; + case DECIMAL: + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + return row.getDecimal(index); + }; + break; default: throw io.grpc.Status.INVALID_ARGUMENT .withDescription("unsupported type " + typeName) diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java index 193ba4811bdc1..0e2438e657f0c 100644 --- a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java @@ -27,9 +27,9 @@ public static void validateRow(BaseRow row) { throw new RuntimeException( String.format("invalid long value: %s %s", row.getLong(2), rowIndex)); } - if (row.getFloat(3) != (float) rowIndex) { + if (row.getDecimal(3).equals(rowIndex)) { throw new RuntimeException( - String.format("invalid float value: %s %s", row.getFloat(3), rowIndex)); + String.format("invalid decimal value: %s %s", row.getDecimal(3), rowIndex)); } if (row.getDouble(4) != (double) rowIndex) { throw new RuntimeException( @@ -47,7 +47,9 @@ public static void validateRow(BaseRow row) { row.getString(6), ((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))); } - if (row.isNull(7) != (rowIndex % 5 == 0)) { + System.err.format("var: %s\n", row.getDateTime(7)); + + if (row.isNull(8) != (rowIndex % 5 == 0)) { throw new RuntimeException( String.format( "invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0))); diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java index 22d55a145deaa..70f2dc4693da7 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java @@ -55,6 +55,14 @@ public String getString(int index) { return Binding.rowGetStringValue(pointer, index); } + public java.sql.Date getDateTime(int index) { + return Binding.rowGetDateTimeValue(pointer, index); + } + + public java.math.BigDecimal getDecimal(int index) { + return Binding.rowGetDecimalValue(pointer, index); + } + @Override public void close() { if (!isClosed) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index f4dec3eecb426..f0c2cb04b3025 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -53,6 +53,10 @@ public class Binding { static native String rowGetStringValue(long pointer, int index); + static native java.sql.Date rowGetDateTimeValue(long pointer, int index); + + static native java.math.BigDecimal rowGetDecimalValue(long pointer, int index); + // Since the underlying rust does not have garbage collection, we will have to manually call // close on the row to release the row instance pointed by the pointer. static native void rowClose(long pointer); diff --git a/src/common/src/row/owned_row.rs b/src/common/src/row/owned_row.rs index 97373e630d8b1..9d57aff1da8ee 100644 --- a/src/common/src/row/owned_row.rs +++ b/src/common/src/row/owned_row.rs @@ -144,6 +144,20 @@ impl OwnedRow { _ => unreachable!("type is not utf8 at index: {}", idx), } } + + pub fn get_datetime(&self, idx: usize) -> &NaiveDateTimeWrapper { + match self[idx].as_ref().unwrap() { + ScalarImpl::NaiveDateTime(dt) => dt, + _ => unreachable!("type is not NaiveDateTime at index: {}", idx), + } + } + + pub fn get_decimal(&self, idx: usize) -> &Decimal { + match self[idx].as_ref().unwrap() { + ScalarImpl::Decimal(d) => d, + _ => unreachable!("type is not NaiveDateTime at index: {}", idx), + } + } } impl EstimateSize for OwnedRow { diff --git a/src/java_binding/gen-demo-insert-data.py b/src/java_binding/gen-demo-insert-data.py index 56be589763ab2..e804262aeca53 100644 --- a/src/java_binding/gen-demo-insert-data.py +++ b/src/java_binding/gen-demo-insert-data.py @@ -8,8 +8,9 @@ def gen_row(index): v5 = float(index) v6 = index % 3 == 0 v7 = str(index) * ((index % 10) + 1) + v8 = '2023-03-23 03:21:54.141+00:00' may_null = None if index % 5 == 0 else int(index) - row_data = [v1, v2, v3, v4, v5, v6, v7, may_null] + row_data = [v1, v2, v3, v4, v5, v6, v7, v8, may_null] repr = [o.__repr__() if o is not None else 'null' for o in row_data] return '(' + ', '.join(repr) + ')' diff --git a/src/java_binding/run_demo.sh b/src/java_binding/run_demo.sh index fc49d96fd5678..d8caabd447b9b 100644 --- a/src/java_binding/run_demo.sh +++ b/src/java_binding/run_demo.sh @@ -10,7 +10,7 @@ INSERT_DATA=$(python3 ${RISINGWAVE_ROOT}/src/java_binding/gen-demo-insert-data.p psql -d ${DB_NAME} -h localhost -p 4566 -U root << EOF DROP TABLE IF EXISTS ${TABLE_NAME}; -CREATE TABLE ${TABLE_NAME} (v1 smallint, v2 int, v3 bigint, v4 float4, v5 float8, v6 bool, v7 varchar, may_null bigint); +CREATE TABLE ${TABLE_NAME} (v1 smallint, v2 int, v3 bigint, v4 float4, v5 float8, v6 bool, v7 varchar, v8 timestamp, may_null bigint); INSERT INTO ${TABLE_NAME} values ${INSERT_DATA}; FLUSH; EOF diff --git a/src/java_binding/src/bin/data-chunk-payload-generator.rs b/src/java_binding/src/bin/data-chunk-payload-generator.rs index 179d1baab4e2d..a53803fd9120e 100644 --- a/src/java_binding/src/bin/data-chunk-payload-generator.rs +++ b/src/java_binding/src/bin/data-chunk-payload-generator.rs @@ -21,7 +21,7 @@ use risingwave_common::types::{DataType, ScalarImpl, F32, F64}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; fn build_row(index: usize) -> OwnedRow { - let mut row_value = Vec::with_capacity(8); + let mut row_value = Vec::with_capacity(9); row_value.push(Some(ScalarImpl::Int16(index as i16))); row_value.push(Some(ScalarImpl::Int32(index as i32))); row_value.push(Some(ScalarImpl::Int64(index as i64))); @@ -31,6 +31,9 @@ fn build_row(index: usize) -> OwnedRow { row_value.push(Some(ScalarImpl::Utf8( format!("{}", index).repeat((index % 10) + 1).into(), ))); + row_value.push(Some(ScalarImpl::NaiveDateTime( + NaiveDateTimeWrapper::default(), + ))); row_value.push(if index % 5 == 0 { None } else { @@ -46,10 +49,11 @@ fn main() { DataType::Int16, DataType::Int32, DataType::Int64, - DataType::Float32, + DataType::Decimal, DataType::Float64, DataType::Boolean, DataType::Varchar, + DataType::Timestamp, DataType::Int64, ]; let mut ops = Vec::with_capacity(row_count); diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index 77c752fdb231e..b4ecd6c7aed51 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -425,6 +425,43 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetStringValu }) } +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDateTimeValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, +) -> JObject<'a> { + execute_and_catch(env, move || { + let millis = pointer + .as_ref() + .get_datetime(idx as usize) + .0 + .timestamp_millis(); + let date_class = env.find_class("java/sql/Date")?; + let constructor = env.get_method_id(date_class, "", "(J)V")?; + let date_obj = env.new_object_unchecked(date_class, constructor, &[millis.into()])?; + + Ok(date_obj) + }) +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, +) -> JObject<'a> { + execute_and_catch(env, move || { + let value = pointer.as_ref().get_decimal(idx as usize).to_string(); + let string_value = env.new_string(value)?; + let decimal_class = env.find_class("java/math/BigDecimal")?; + let constructor = env.get_method_id(decimal_class, "", "(Ljava/lang/String;)V")?; + let date_obj = env.new_object_unchecked(decimal_class, constructor, &[string_value.into()])?; + + Ok(date_obj) + }) +} + #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowClose<'a>( _env: EnvParam<'a>, From db314ead8b37a7480191590ef83a6223ecc55e96 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Thu, 23 Mar 2023 23:51:35 +0800 Subject: [PATCH 02/10] fix test --- .../deserializer/StreamChunkDeserializer.java | 2 +- .../java/com/risingwave/java/binding/Utils.java | 17 +++++++++++++---- .../com/risingwave/java/binding/BaseRow.java | 4 ++-- .../com/risingwave/java/binding/Binding.java | 2 +- src/java_binding/gen-demo-insert-data.py | 5 +++-- .../src/bin/data-chunk-payload-generator.rs | 8 +++++--- src/java_binding/src/lib.rs | 7 ++++--- 7 files changed, 29 insertions(+), 16 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java index dac584549a3c4..c2d755a9ccc0f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java @@ -113,7 +113,7 @@ static ValueGetter[] buildValueGetter(TableSchema tableSchema) { if (row.isNull(index)) { return null; } - return row.getDateTime(index); + return row.getTimestamp(index); }; break; case DECIMAL: diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java index 0e2438e657f0c..ac1dfbf210ba4 100644 --- a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Utils.java @@ -27,9 +27,9 @@ public static void validateRow(BaseRow row) { throw new RuntimeException( String.format("invalid long value: %s %s", row.getLong(2), rowIndex)); } - if (row.getDecimal(3).equals(rowIndex)) { + if (row.getFloat(3) != (float) rowIndex) { throw new RuntimeException( - String.format("invalid decimal value: %s %s", row.getDecimal(3), rowIndex)); + String.format("invalid float value: %s %s", row.getFloat(3), rowIndex)); } if (row.getDouble(4) != (double) rowIndex) { throw new RuntimeException( @@ -47,9 +47,18 @@ public static void validateRow(BaseRow row) { row.getString(6), ((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))); } - System.err.format("var: %s\n", row.getDateTime(7)); - if (row.isNull(8) != (rowIndex % 5 == 0)) { + if (row.getTimestamp(7).getTime() != rowIndex * 1000) { + throw new RuntimeException( + String.format("invalid Timestamp value: %s %s", row.getTimestamp(7), rowIndex)); + } + + if (row.getDecimal(8).intValue() != rowIndex) { + throw new RuntimeException( + String.format("invalid decimal value: %s %s", row.getDecimal(8), rowIndex)); + } + + if (row.isNull(9) != (rowIndex % 5 == 0)) { throw new RuntimeException( String.format( "invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0))); diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java index 70f2dc4693da7..2e493691ef801 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java @@ -55,8 +55,8 @@ public String getString(int index) { return Binding.rowGetStringValue(pointer, index); } - public java.sql.Date getDateTime(int index) { - return Binding.rowGetDateTimeValue(pointer, index); + public java.sql.Timestamp getTimestamp(int index) { + return Binding.rowGetTimestampValue(pointer, index); } public java.math.BigDecimal getDecimal(int index) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index f0c2cb04b3025..1fcaa659db577 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -53,7 +53,7 @@ public class Binding { static native String rowGetStringValue(long pointer, int index); - static native java.sql.Date rowGetDateTimeValue(long pointer, int index); + static native java.sql.Timestamp rowGetTimestampValue(long pointer, int index); static native java.math.BigDecimal rowGetDecimalValue(long pointer, int index); diff --git a/src/java_binding/gen-demo-insert-data.py b/src/java_binding/gen-demo-insert-data.py index e804262aeca53..47761ee7cc6ed 100644 --- a/src/java_binding/gen-demo-insert-data.py +++ b/src/java_binding/gen-demo-insert-data.py @@ -8,9 +8,10 @@ def gen_row(index): v5 = float(index) v6 = index % 3 == 0 v7 = str(index) * ((index % 10) + 1) - v8 = '2023-03-23 03:21:54.141+00:00' + v8 = index + v9 = str(index) may_null = None if index % 5 == 0 else int(index) - row_data = [v1, v2, v3, v4, v5, v6, v7, v8, may_null] + row_data = [v1, v2, v3, v4, v5, v6, v7, v8, v9, may_null] repr = [o.__repr__() if o is not None else 'null' for o in row_data] return '(' + ', '.join(repr) + ')' diff --git a/src/java_binding/src/bin/data-chunk-payload-generator.rs b/src/java_binding/src/bin/data-chunk-payload-generator.rs index a53803fd9120e..61899643de246 100644 --- a/src/java_binding/src/bin/data-chunk-payload-generator.rs +++ b/src/java_binding/src/bin/data-chunk-payload-generator.rs @@ -21,7 +21,7 @@ use risingwave_common::types::{DataType, ScalarImpl, F32, F64}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; fn build_row(index: usize) -> OwnedRow { - let mut row_value = Vec::with_capacity(9); + let mut row_value = Vec::with_capacity(10); row_value.push(Some(ScalarImpl::Int16(index as i16))); row_value.push(Some(ScalarImpl::Int32(index as i32))); row_value.push(Some(ScalarImpl::Int64(index as i64))); @@ -32,8 +32,9 @@ fn build_row(index: usize) -> OwnedRow { format!("{}", index).repeat((index % 10) + 1).into(), ))); row_value.push(Some(ScalarImpl::NaiveDateTime( - NaiveDateTimeWrapper::default(), + NaiveDateTimeWrapper::from_timestamp_uncheck(index as _, 0), ))); + row_value.push(Some(ScalarImpl::Decimal(index.into()))); row_value.push(if index % 5 == 0 { None } else { @@ -49,11 +50,12 @@ fn main() { DataType::Int16, DataType::Int32, DataType::Int64, - DataType::Decimal, + DataType::Float32, DataType::Float64, DataType::Boolean, DataType::Varchar, DataType::Timestamp, + DataType::Decimal, DataType::Int64, ]; let mut ops = Vec::with_capacity(row_count); diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index b4ecd6c7aed51..287535691763c 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -426,7 +426,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetStringValu } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDateTimeValue<'a>( +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampValue<'a>( env: EnvParam<'a>, pointer: Pointer<'a, JavaBindingRow>, idx: jint, @@ -437,7 +437,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDateTimeVa .get_datetime(idx as usize) .0 .timestamp_millis(); - let date_class = env.find_class("java/sql/Date")?; + let date_class = env.find_class("java/sql/Timestamp")?; let constructor = env.get_method_id(date_class, "", "(J)V")?; let date_obj = env.new_object_unchecked(date_class, constructor, &[millis.into()])?; @@ -456,7 +456,8 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalVal let string_value = env.new_string(value)?; let decimal_class = env.find_class("java/math/BigDecimal")?; let constructor = env.get_method_id(decimal_class, "", "(Ljava/lang/String;)V")?; - let date_obj = env.new_object_unchecked(decimal_class, constructor, &[string_value.into()])?; + let date_obj = + env.new_object_unchecked(decimal_class, constructor, &[string_value.into()])?; Ok(date_obj) }) From ba561a9e28e1c0335122e0d5841245cea99dbd49 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 24 Mar 2023 09:54:52 +0800 Subject: [PATCH 03/10] fix test --- src/java_binding/gen-demo-insert-data.py | 4 ++-- src/java_binding/run_demo.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/java_binding/gen-demo-insert-data.py b/src/java_binding/gen-demo-insert-data.py index 47761ee7cc6ed..fe715c3a48579 100644 --- a/src/java_binding/gen-demo-insert-data.py +++ b/src/java_binding/gen-demo-insert-data.py @@ -8,8 +8,8 @@ def gen_row(index): v5 = float(index) v6 = index % 3 == 0 v7 = str(index) * ((index % 10) + 1) - v8 = index - v9 = str(index) + v8 = "to_timestamp(" + str(index) + ")" + v9 = index may_null = None if index % 5 == 0 else int(index) row_data = [v1, v2, v3, v4, v5, v6, v7, v8, v9, may_null] repr = [o.__repr__() if o is not None else 'null' for o in row_data] diff --git a/src/java_binding/run_demo.sh b/src/java_binding/run_demo.sh index d8caabd447b9b..9c7fa0fd8158f 100644 --- a/src/java_binding/run_demo.sh +++ b/src/java_binding/run_demo.sh @@ -10,7 +10,7 @@ INSERT_DATA=$(python3 ${RISINGWAVE_ROOT}/src/java_binding/gen-demo-insert-data.p psql -d ${DB_NAME} -h localhost -p 4566 -U root << EOF DROP TABLE IF EXISTS ${TABLE_NAME}; -CREATE TABLE ${TABLE_NAME} (v1 smallint, v2 int, v3 bigint, v4 float4, v5 float8, v6 bool, v7 varchar, v8 timestamp, may_null bigint); +CREATE TABLE ${TABLE_NAME} (v1 smallint, v2 int, v3 bigint, v4 float4, v5 float8, v6 bool, v7 varchar, v8 timestamp, v9 decimal, may_null bigint); INSERT INTO ${TABLE_NAME} values ${INSERT_DATA}; FLUSH; EOF From 640595806bab365a1a7f30c75a138af21ffc25a4 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 24 Mar 2023 10:23:27 +0800 Subject: [PATCH 04/10] fix test --- src/java_binding/gen-demo-insert-data.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/java_binding/gen-demo-insert-data.py b/src/java_binding/gen-demo-insert-data.py index fe715c3a48579..0eee30121f583 100644 --- a/src/java_binding/gen-demo-insert-data.py +++ b/src/java_binding/gen-demo-insert-data.py @@ -7,12 +7,12 @@ def gen_row(index): v4 = float(index) v5 = float(index) v6 = index % 3 == 0 - v7 = str(index) * ((index % 10) + 1) + v7 = '\'' + str(index) * ((index % 10) + 1) '\'' v8 = "to_timestamp(" + str(index) + ")" v9 = index may_null = None if index % 5 == 0 else int(index) row_data = [v1, v2, v3, v4, v5, v6, v7, v8, v9, may_null] - repr = [o.__repr__() if o is not None else 'null' for o in row_data] + repr = [str(o) if o is not None else 'null' for o in row_data] return '(' + ', '.join(repr) + ')' From 8b72ff972fad811d482151aabe70499c2d271bea Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 24 Mar 2023 10:43:00 +0800 Subject: [PATCH 05/10] fix test --- src/java_binding/gen-demo-insert-data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java_binding/gen-demo-insert-data.py b/src/java_binding/gen-demo-insert-data.py index 0eee30121f583..6ffc79077eb82 100644 --- a/src/java_binding/gen-demo-insert-data.py +++ b/src/java_binding/gen-demo-insert-data.py @@ -7,7 +7,7 @@ def gen_row(index): v4 = float(index) v5 = float(index) v6 = index % 3 == 0 - v7 = '\'' + str(index) * ((index % 10) + 1) '\'' + v7 = '\'' + str(index) * ((index % 10) + 1) + '\'' v8 = "to_timestamp(" + str(index) + ")" v9 = index may_null = None if index % 5 == 0 else int(index) From 54ab2370eb5830b75ca55e380bf0df7a644ba063 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Mon, 27 Mar 2023 17:01:02 +0800 Subject: [PATCH 06/10] cache class --- Cargo.lock | 1 + src/java_binding/Cargo.toml | 1 + src/java_binding/src/hummock_iterator.rs | 7 +- src/java_binding/src/lib.rs | 94 +++++++++++++++---- src/java_binding/src/stream_chunk_iterator.rs | 4 + 5 files changed, 86 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17247bffd21ec..b2ed0e967afed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6298,6 +6298,7 @@ dependencies = [ "itertools", "jni", "madsim-tokio", + "once_cell", "prost 0.11.8", "risingwave_common", "risingwave_hummock_sdk", diff --git a/src/java_binding/Cargo.toml b/src/java_binding/Cargo.toml index 43f1aae283a8b..4e67e678e79a6 100644 --- a/src/java_binding/Cargo.toml +++ b/src/java_binding/Cargo.toml @@ -15,6 +15,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.10" jni = "0.20.0" prost = "0.11" +once_cell = "1" risingwave_common = { path = "../common" } risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } risingwave_object_store = { path = "../object_store" } diff --git a/src/java_binding/src/hummock_iterator.rs b/src/java_binding/src/hummock_iterator.rs index 5515d50a48c9b..f1c27d468636a 100644 --- a/src/java_binding/src/hummock_iterator.rs +++ b/src/java_binding/src/hummock_iterator.rs @@ -50,6 +50,7 @@ fn select_all_vnode_stream( pub struct HummockJavaBindingIterator { row_serde: EitherSerde, stream: SelectAllIterStream, + pub class_cache: Arc, } pub struct KeyedRow { @@ -137,7 +138,11 @@ impl HummockJavaBindingIterator { BasicSerde::new(&column_ids, schema.into()).into() }; - Ok(Self { row_serde, stream }) + Ok(Self { + row_serde, + stream, + class_cache: Default::default(), + }) } pub async fn next(&mut self) -> StorageResult> { diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index 287535691763c..c4c8e6268bdc0 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -25,12 +25,13 @@ use std::marker::PhantomData; use std::ops::Deref; use std::panic::catch_unwind; use std::slice::from_raw_parts; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; use hummock_iterator::{HummockJavaBindingIterator, KeyedRow}; -use jni::objects::{AutoArray, JClass, JObject, JString, ReleaseMode}; +use jni::objects::{AutoArray, GlobalRef, JClass, JMethodID, JObject, JString, ReleaseMode}; use jni::sys::{jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jshort}; use jni::JNIEnv; +use once_cell::sync::OnceCell; use prost::{DecodeError, Message}; use risingwave_common::array::{ArrayError, StreamChunk}; use risingwave_common::hash::VirtualNode; @@ -221,22 +222,46 @@ where } } -pub enum JavaBindingRow { +pub enum JavaBindingRowKind { Keyed(KeyedRow), StreamChunk(StreamChunkRow), } +#[derive(Default)] +pub struct JavaBindingRowCache { + big_decimal_class: OnceCell, + timestamp_class: OnceCell, +} + +pub struct JavaBindingRow { + underlying: JavaBindingRowKind, + class_cache: Arc, +} impl JavaBindingRow { + fn new_stream_chunk(underlying: StreamChunkRow, class_cache: Arc) -> Self { + Self { + underlying: JavaBindingRowKind::StreamChunk(underlying), + class_cache, + } + } + + fn new_keyed(underlying: KeyedRow, class_cache: Arc) -> Self { + Self { + underlying: JavaBindingRowKind::Keyed(underlying), + class_cache, + } + } + fn as_keyed(&self) -> &KeyedRow { - match &self { - JavaBindingRow::Keyed(r) => r, + match &self.underlying { + JavaBindingRowKind::Keyed(r) => r, _ => unreachable!("can only call as_keyed for KeyedRow"), } } fn as_stream_chunk(&self) -> &StreamChunkRow { - match &self { - JavaBindingRow::StreamChunk(r) => r, + match &self.underlying { + JavaBindingRowKind::StreamChunk(r) => r, _ => unreachable!("can only call as_stream_chunk for StreamChunkRow"), } } @@ -246,9 +271,9 @@ impl Deref for JavaBindingRow { type Target = OwnedRow; fn deref(&self) -> &Self::Target { - match &self { - JavaBindingRow::Keyed(r) => r.row(), - JavaBindingRow::StreamChunk(r) => r.row(), + match &self.underlying { + JavaBindingRowKind::Keyed(r) => r.row(), + JavaBindingRowKind::StreamChunk(r) => r.row(), } } } @@ -278,9 +303,10 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorN mut pointer: Pointer<'a, HummockJavaBindingIterator>, ) -> Pointer<'static, JavaBindingRow> { execute_and_catch(env, move || { - match RUNTIME.block_on(pointer.as_mut().next())? { + let iter = pointer.as_mut(); + match RUNTIME.block_on(iter.next())? { None => Ok(Pointer::null()), - Some(row) => Ok(JavaBindingRow::Keyed(row).into()), + Some(row) => Ok(JavaBindingRow::new_keyed(row, iter.class_cache.clone()).into()), } }) } @@ -311,9 +337,12 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkItera env: EnvParam<'a>, mut pointer: Pointer<'a, StreamChunkIterator>, ) -> Pointer<'static, JavaBindingRow> { - execute_and_catch(env, move || match pointer.as_mut().next() { - None => Ok(Pointer::null()), - Some(row) => Ok(JavaBindingRow::StreamChunk(row).into()), + execute_and_catch(env, move || { + let iter = pointer.as_mut(); + match iter.next() { + None => Ok(Pointer::null()), + Some(row) => Ok(JavaBindingRow::new_stream_chunk(row, iter.class_cache.clone()).into()), + } }) } @@ -431,15 +460,27 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampV pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> JObject<'a> { + // since JMethodID is always validate until the belonging class is unload. + static INIT_METHOD: OnceCell = OnceCell::new(); execute_and_catch(env, move || { let millis = pointer .as_ref() .get_datetime(idx as usize) .0 .timestamp_millis(); - let date_class = env.find_class("java/sql/Timestamp")?; - let constructor = env.get_method_id(date_class, "", "(J)V")?; - let date_obj = env.new_object_unchecked(date_class, constructor, &[millis.into()])?; + let ts_class_ref = pointer + .as_ref() + .class_cache + .timestamp_class + .get_or_try_init(|| { + let cls = env.find_class("java/sql/Timestamp")?; + Ok::<_, jni::errors::Error>(env.new_global_ref(cls)?) + })?; + let ts_class = JClass::from(ts_class_ref.as_obj()); + let constructor = INIT_METHOD + .get_or_try_init(|| env.get_method_id(ts_class, "", "(J)V"))? + .clone(); + let date_obj = env.new_object_unchecked(ts_class, constructor, &[millis.into()])?; Ok(date_obj) }) @@ -451,11 +492,24 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalVal pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> JObject<'a> { + static INIT_METHOD: OnceCell = OnceCell::new(); execute_and_catch(env, move || { let value = pointer.as_ref().get_decimal(idx as usize).to_string(); let string_value = env.new_string(value)?; - let decimal_class = env.find_class("java/math/BigDecimal")?; - let constructor = env.get_method_id(decimal_class, "", "(Ljava/lang/String;)V")?; + let ts_class_ref = pointer + .as_ref() + .class_cache + .big_decimal_class + .get_or_try_init(|| { + let cls = env.find_class("java/math/BigDecimal")?; + Ok::<_, jni::errors::Error>(env.new_global_ref(cls)?) + })?; + let decimal_class = JClass::from(ts_class_ref.as_obj()); + let constructor = INIT_METHOD + .get_or_try_init(|| { + env.get_method_id(decimal_class, "", "(Ljava/lang/String;)V") + })? + .clone(); let date_obj = env.new_object_unchecked(decimal_class, constructor, &[string_value.into()])?; diff --git a/src/java_binding/src/stream_chunk_iterator.rs b/src/java_binding/src/stream_chunk_iterator.rs index bf6b3e8acc710..7277a4c1697ef 100644 --- a/src/java_binding/src/stream_chunk_iterator.rs +++ b/src/java_binding/src/stream_chunk_iterator.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::row::{OwnedRow, Row}; @@ -36,6 +38,7 @@ type StreamChunkRowIterator = impl Iterator + 'static; pub struct StreamChunkIterator { iter: StreamChunkRowIterator, + pub class_cache: Arc, } impl StreamChunkIterator { @@ -49,6 +52,7 @@ impl StreamChunkIterator { }) .collect_vec() .into_iter(), + class_cache: Default::default(), } } From 1b0f6ed0a1779b9b722c9256b7ced85af901a795 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Tue, 28 Mar 2023 09:45:20 +0800 Subject: [PATCH 07/10] fix test --- src/java_binding/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java_binding/Cargo.toml b/src/java_binding/Cargo.toml index 4e67e678e79a6..4e47a0121d93c 100644 --- a/src/java_binding/Cargo.toml +++ b/src/java_binding/Cargo.toml @@ -14,8 +14,8 @@ bytes = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.10" jni = "0.20.0" -prost = "0.11" once_cell = "1" +prost = "0.11" risingwave_common = { path = "../common" } risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } risingwave_object_store = { path = "../object_store" } From e27e0e6fbf6f759318e4901e27463e60e26b0293 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Tue, 28 Mar 2023 10:04:12 +0800 Subject: [PATCH 08/10] fix test --- src/common/src/row/owned_row.rs | 4 ++-- .../src/bin/data-chunk-payload-generator.rs | 6 +++--- src/java_binding/src/lib.rs | 21 ++++++++----------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/common/src/row/owned_row.rs b/src/common/src/row/owned_row.rs index 9d57aff1da8ee..a52ec5617c394 100644 --- a/src/common/src/row/owned_row.rs +++ b/src/common/src/row/owned_row.rs @@ -145,9 +145,9 @@ impl OwnedRow { } } - pub fn get_datetime(&self, idx: usize) -> &NaiveDateTimeWrapper { + pub fn get_datetime(&self, idx: usize) -> &Timestamp { match self[idx].as_ref().unwrap() { - ScalarImpl::NaiveDateTime(dt) => dt, + ScalarImpl::Timestamp(dt) => dt, _ => unreachable!("type is not NaiveDateTime at index: {}", idx), } } diff --git a/src/java_binding/src/bin/data-chunk-payload-generator.rs b/src/java_binding/src/bin/data-chunk-payload-generator.rs index 61899643de246..20ab2b65148e3 100644 --- a/src/java_binding/src/bin/data-chunk-payload-generator.rs +++ b/src/java_binding/src/bin/data-chunk-payload-generator.rs @@ -17,7 +17,7 @@ use std::io::Write; use prost::Message; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::row::OwnedRow; -use risingwave_common::types::{DataType, ScalarImpl, F32, F64}; +use risingwave_common::types::{DataType, ScalarImpl, Timestamp, F32, F64}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; fn build_row(index: usize) -> OwnedRow { @@ -31,8 +31,8 @@ fn build_row(index: usize) -> OwnedRow { row_value.push(Some(ScalarImpl::Utf8( format!("{}", index).repeat((index % 10) + 1).into(), ))); - row_value.push(Some(ScalarImpl::NaiveDateTime( - NaiveDateTimeWrapper::from_timestamp_uncheck(index as _, 0), + row_value.push(Some(ScalarImpl::Timestamp( + Timestamp::from_timestamp_uncheck(index as _, 0), ))); row_value.push(Some(ScalarImpl::Decimal(index.into()))); row_value.push(if index % 5 == 0 { diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index c4c8e6268bdc0..d78fe3d2dcaa6 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -474,13 +474,12 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampV .timestamp_class .get_or_try_init(|| { let cls = env.find_class("java/sql/Timestamp")?; - Ok::<_, jni::errors::Error>(env.new_global_ref(cls)?) + env.new_global_ref(cls) })?; let ts_class = JClass::from(ts_class_ref.as_obj()); - let constructor = INIT_METHOD - .get_or_try_init(|| env.get_method_id(ts_class, "", "(J)V"))? - .clone(); - let date_obj = env.new_object_unchecked(ts_class, constructor, &[millis.into()])?; + let constructor = + INIT_METHOD.get_or_try_init(|| env.get_method_id(ts_class, "", "(J)V"))?; + let date_obj = env.new_object_unchecked(ts_class, *constructor, &[millis.into()])?; Ok(date_obj) }) @@ -502,16 +501,14 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalVal .big_decimal_class .get_or_try_init(|| { let cls = env.find_class("java/math/BigDecimal")?; - Ok::<_, jni::errors::Error>(env.new_global_ref(cls)?) + env.new_global_ref(cls) })?; let decimal_class = JClass::from(ts_class_ref.as_obj()); - let constructor = INIT_METHOD - .get_or_try_init(|| { - env.get_method_id(decimal_class, "", "(Ljava/lang/String;)V") - })? - .clone(); + let constructor = INIT_METHOD.get_or_try_init(|| { + env.get_method_id(decimal_class, "", "(Ljava/lang/String;)V") + })?; let date_obj = - env.new_object_unchecked(decimal_class, constructor, &[string_value.into()])?; + env.new_object_unchecked(decimal_class, *constructor, &[string_value.into()])?; Ok(date_obj) }) From dbe284436e3d156201bdb77ee59011607f62228e Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Tue, 28 Mar 2023 15:41:57 +0800 Subject: [PATCH 09/10] rename --- src/java_binding/src/hummock_iterator.rs | 2 +- src/java_binding/src/lib.rs | 44 +++++++++++-------- src/java_binding/src/stream_chunk_iterator.rs | 2 +- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/java_binding/src/hummock_iterator.rs b/src/java_binding/src/hummock_iterator.rs index f1c27d468636a..c2c0b8a7cfc8c 100644 --- a/src/java_binding/src/hummock_iterator.rs +++ b/src/java_binding/src/hummock_iterator.rs @@ -50,7 +50,7 @@ fn select_all_vnode_stream( pub struct HummockJavaBindingIterator { row_serde: EitherSerde, stream: SelectAllIterStream, - pub class_cache: Arc, + pub class_cache: Arc, } pub struct KeyedRow { diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index d78fe3d2dcaa6..e46f59039474a 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -222,46 +222,49 @@ where } } -pub enum JavaBindingRowKind { +pub enum JavaBindingRowInner { Keyed(KeyedRow), StreamChunk(StreamChunkRow), } #[derive(Default)] -pub struct JavaBindingRowCache { +pub struct JavaClassMethodCache { big_decimal_class: OnceCell, timestamp_class: OnceCell, } pub struct JavaBindingRow { - underlying: JavaBindingRowKind, - class_cache: Arc, + inner: JavaBindingRowInner, + class_cache: Arc, } impl JavaBindingRow { - fn new_stream_chunk(underlying: StreamChunkRow, class_cache: Arc) -> Self { + fn with_stream_chunk( + underlying: StreamChunkRow, + class_cache: Arc, + ) -> Self { Self { - underlying: JavaBindingRowKind::StreamChunk(underlying), + inner: JavaBindingRowInner::StreamChunk(underlying), class_cache, } } - fn new_keyed(underlying: KeyedRow, class_cache: Arc) -> Self { + fn with_keyed(underlying: KeyedRow, class_cache: Arc) -> Self { Self { - underlying: JavaBindingRowKind::Keyed(underlying), + inner: JavaBindingRowInner::Keyed(underlying), class_cache, } } fn as_keyed(&self) -> &KeyedRow { - match &self.underlying { - JavaBindingRowKind::Keyed(r) => r, + match &self.inner { + JavaBindingRowInner::Keyed(r) => r, _ => unreachable!("can only call as_keyed for KeyedRow"), } } fn as_stream_chunk(&self) -> &StreamChunkRow { - match &self.underlying { - JavaBindingRowKind::StreamChunk(r) => r, + match &self.inner { + JavaBindingRowInner::StreamChunk(r) => r, _ => unreachable!("can only call as_stream_chunk for StreamChunkRow"), } } @@ -271,9 +274,9 @@ impl Deref for JavaBindingRow { type Target = OwnedRow; fn deref(&self) -> &Self::Target { - match &self.underlying { - JavaBindingRowKind::Keyed(r) => r.row(), - JavaBindingRowKind::StreamChunk(r) => r.row(), + match &self.inner { + JavaBindingRowInner::Keyed(r) => r.row(), + JavaBindingRowInner::StreamChunk(r) => r.row(), } } } @@ -306,7 +309,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorN let iter = pointer.as_mut(); match RUNTIME.block_on(iter.next())? { None => Ok(Pointer::null()), - Some(row) => Ok(JavaBindingRow::new_keyed(row, iter.class_cache.clone()).into()), + Some(row) => Ok(JavaBindingRow::with_keyed(row, iter.class_cache.clone()).into()), } }) } @@ -341,7 +344,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkItera let iter = pointer.as_mut(); match iter.next() { None => Ok(Pointer::null()), - Some(row) => Ok(JavaBindingRow::new_stream_chunk(row, iter.class_cache.clone()).into()), + Some(row) => { + Ok(JavaBindingRow::with_stream_chunk(row, iter.class_cache.clone()).into()) + } } }) } @@ -460,7 +465,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampV pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> JObject<'a> { - // since JMethodID is always validate until the belonging class is unload. + // The JMethodID is globally available until the referenced class is unloaded. + // However, if the referenced class is unloaded, calling find_class will result in an error and + // prevent access to the INIT_METHOD. So it's safe here to cache it with static lifetime. static INIT_METHOD: OnceCell = OnceCell::new(); execute_and_catch(env, move || { let millis = pointer @@ -491,6 +498,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalVal pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> JObject<'a> { + // Same as Java_com_risingwave_java_binding_Binding_rowGetTimestampValue. static INIT_METHOD: OnceCell = OnceCell::new(); execute_and_catch(env, move || { let value = pointer.as_ref().get_decimal(idx as usize).to_string(); diff --git a/src/java_binding/src/stream_chunk_iterator.rs b/src/java_binding/src/stream_chunk_iterator.rs index 7277a4c1697ef..d62117a0aa108 100644 --- a/src/java_binding/src/stream_chunk_iterator.rs +++ b/src/java_binding/src/stream_chunk_iterator.rs @@ -38,7 +38,7 @@ type StreamChunkRowIterator = impl Iterator + 'static; pub struct StreamChunkIterator { iter: StreamChunkRowIterator, - pub class_cache: Arc, + pub class_cache: Arc, } impl StreamChunkIterator { From d5965905e3cf162ba09191dec58362a699f26b71 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Thu, 30 Mar 2023 16:25:42 +0800 Subject: [PATCH 10/10] cache method together with class ref --- src/java_binding/src/lib.rs | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index e46f59039474a..955f2f6e17494 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -228,8 +228,8 @@ pub enum JavaBindingRowInner { } #[derive(Default)] pub struct JavaClassMethodCache { - big_decimal_class: OnceCell, - timestamp_class: OnceCell, + big_decimal_ctor: OnceCell<(GlobalRef, JMethodID)>, + timestamp_ctor: OnceCell<(GlobalRef, JMethodID)>, } pub struct JavaBindingRow { @@ -465,27 +465,22 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampV pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> JObject<'a> { - // The JMethodID is globally available until the referenced class is unloaded. - // However, if the referenced class is unloaded, calling find_class will result in an error and - // prevent access to the INIT_METHOD. So it's safe here to cache it with static lifetime. - static INIT_METHOD: OnceCell = OnceCell::new(); execute_and_catch(env, move || { let millis = pointer .as_ref() .get_datetime(idx as usize) .0 .timestamp_millis(); - let ts_class_ref = pointer + let (ts_class_ref, constructor) = pointer .as_ref() .class_cache - .timestamp_class + .timestamp_ctor .get_or_try_init(|| { let cls = env.find_class("java/sql/Timestamp")?; - env.new_global_ref(cls) + let init_method = env.get_method_id(cls, "", "(J)V")?; + Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method)) })?; let ts_class = JClass::from(ts_class_ref.as_obj()); - let constructor = - INIT_METHOD.get_or_try_init(|| env.get_method_id(ts_class, "", "(J)V"))?; let date_obj = env.new_object_unchecked(ts_class, *constructor, &[millis.into()])?; Ok(date_obj) @@ -498,23 +493,19 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalVal pointer: Pointer<'a, JavaBindingRow>, idx: jint, ) -> JObject<'a> { - // Same as Java_com_risingwave_java_binding_Binding_rowGetTimestampValue. - static INIT_METHOD: OnceCell = OnceCell::new(); execute_and_catch(env, move || { let value = pointer.as_ref().get_decimal(idx as usize).to_string(); let string_value = env.new_string(value)?; - let ts_class_ref = pointer + let (decimal_class_ref, constructor) = pointer .as_ref() .class_cache - .big_decimal_class + .big_decimal_ctor .get_or_try_init(|| { let cls = env.find_class("java/math/BigDecimal")?; - env.new_global_ref(cls) + let init_method = env.get_method_id(cls, "", "(Ljava/lang/String;)V")?; + Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method)) })?; - let decimal_class = JClass::from(ts_class_ref.as_obj()); - let constructor = INIT_METHOD.get_or_try_init(|| { - env.get_method_id(decimal_class, "", "(Ljava/lang/String;)V") - })?; + let decimal_class = JClass::from(decimal_class_ref.as_obj()); let date_obj = env.new_object_unchecked(decimal_class, *constructor, &[string_value.into()])?;