diff --git a/dashboard/proto/gen/java_binding.ts b/dashboard/proto/gen/java_binding.ts index ed7c72438c87b..5cc7e19adaf82 100644 --- a/dashboard/proto/gen/java_binding.ts +++ b/dashboard/proto/gen/java_binding.ts @@ -67,6 +67,7 @@ export interface ReadPlan { epoch: number; version: HummockVersion | undefined; tableCatalog: Table | undefined; + vnodeIds: number[]; } function createBaseKeyRange(): KeyRange { @@ -118,6 +119,7 @@ function createBaseReadPlan(): ReadPlan { epoch: 0, version: undefined, tableCatalog: undefined, + vnodeIds: [], }; } @@ -131,6 +133,7 @@ export const ReadPlan = { epoch: isSet(object.epoch) ? Number(object.epoch) : 0, version: isSet(object.version) ? HummockVersion.fromJSON(object.version) : undefined, tableCatalog: isSet(object.tableCatalog) ? Table.fromJSON(object.tableCatalog) : undefined, + vnodeIds: Array.isArray(object?.vnodeIds) ? object.vnodeIds.map((e: any) => Number(e)) : [], }; }, @@ -145,6 +148,11 @@ export const ReadPlan = { (obj.version = message.version ? HummockVersion.toJSON(message.version) : undefined); message.tableCatalog !== undefined && (obj.tableCatalog = message.tableCatalog ? Table.toJSON(message.tableCatalog) : undefined); + if (message.vnodeIds) { + obj.vnodeIds = message.vnodeIds.map((e) => Math.round(e)); + } else { + obj.vnodeIds = []; + } return obj; }, @@ -163,6 +171,7 @@ export const ReadPlan = { message.tableCatalog = (object.tableCatalog !== undefined && object.tableCatalog !== null) ? Table.fromPartial(object.tableCatalog) : undefined; + message.vnodeIds = object.vnodeIds?.map((e) => e) || []; return message; }, }; diff --git a/proto/java_binding.proto b/proto/java_binding.proto index 017655963f256..32ed2f5df1992 100644 --- a/proto/java_binding.proto +++ b/proto/java_binding.proto @@ -32,4 +32,6 @@ message ReadPlan { hummock.HummockVersion version = 6; catalog.Table table_catalog = 7; + + repeated uint32 vnode_ids = 8; } diff --git a/src/java_binding/java/com_risingwave_java_binding_Binding.h b/src/java_binding/java/com_risingwave_java_binding_Binding.h index 91be8c9b22393..0f4181f2cefb7 100644 --- a/src/java_binding/java/com_risingwave_java_binding_Binding.h +++ b/src/java_binding/java/com_risingwave_java_binding_Binding.h @@ -7,6 +7,14 @@ #ifdef __cplusplus extern "C" { #endif +/* + * Class: com_risingwave_java_binding_Binding + * Method: vnodeCount + * Signature: ()I + */ +JNIEXPORT jint JNICALL Java_com_risingwave_java_binding_Binding_vnodeCount + (JNIEnv *, jclass); + /* * Class: com_risingwave_java_binding_Binding * Method: iteratorNew diff --git a/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/Demo.java b/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/Demo.java index 6a8bd798ed29f..c6c2aa7a66027 100644 --- a/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/Demo.java +++ b/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/Demo.java @@ -1,5 +1,6 @@ package com.risingwave.java; +import com.risingwave.java.binding.Binding; import com.risingwave.java.binding.Iterator; import com.risingwave.java.binding.KeyedRow; import com.risingwave.java.binding.rpc.MetaClient; @@ -9,7 +10,9 @@ import com.risingwave.proto.JavaBinding.KeyRange.Bound; import com.risingwave.proto.JavaBinding.ReadPlan; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -34,6 +37,14 @@ public static void main(String[] args) { metaClient.startHeartbeatLoop(Duration.ofMillis(1000)); HummockVersion version = metaClient.pinVersion(); Table tableCatalog = metaClient.getTable(dbName, tableName); + + int vnodeCount = Binding.vnodeCount(); + + List vnodeList = new ArrayList<>(); + for (int i = 0; i < vnodeCount; i++) { + vnodeList.add(i); + } + ReadPlan readPlan = ReadPlan.newBuilder() .setDataDir(dataDir) @@ -43,6 +54,7 @@ public static void main(String[] args) { .setEpoch(version.getMaxCommittedEpoch()) .setVersion(version) .setTableCatalog(tableCatalog) + .addAllVnodeIds(vnodeList) .build(); try (Iterator iter = new Iterator(readPlan)) { diff --git a/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index 5e469d09f8513..e596603514df0 100644 --- a/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/src/java_binding/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -5,6 +5,8 @@ public class Binding { System.loadLibrary("risingwave_java_binding"); } + public static native int vnodeCount(); + // iterator method // Return a pointer to the iterator static native long iteratorNew(byte[] readPlan); diff --git a/src/java_binding/src/iterator.rs b/src/java_binding/src/iterator.rs index e65d0e3e2796f..e77049a9c488d 100644 --- a/src/java_binding/src/iterator.rs +++ b/src/java_binding/src/iterator.rs @@ -12,30 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::Pin; use std::sync::Arc; use bytes::Bytes; use futures::TryStreamExt; +use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, RowDeserializer}; use risingwave_common::types::ScalarImpl; -use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; +use risingwave_common::util::select_all; +use risingwave_hummock_sdk::key::{map_table_key_range, prefixed_range, TableKeyRange}; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::java_binding::key_range::Bound; use risingwave_pb::java_binding::{KeyRange, ReadPlan}; -use risingwave_storage::error::{StorageError, StorageResult}; +use risingwave_storage::error::StorageResult; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::state_store::HummockStorageIterator; use risingwave_storage::hummock::store::version::HummockVersionReader; use risingwave_storage::hummock::{SstableStore, TieredCache}; use risingwave_storage::monitor::HummockStateStoreMetrics; -use risingwave_storage::store::{ReadOptions, StreamTypeOfIter}; +use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter}; use tokio::sync::mpsc::unbounded_channel; +type SelectAllIterStream = impl StateStoreReadIterStream + Unpin; + +fn select_all_vnode_stream( + streams: Vec>, +) -> SelectAllIterStream { + select_all(streams.into_iter().map(Box::pin)) +} + pub struct Iterator { row_serializer: RowDeserializer, - stream: Pin>>, + stream: SelectAllIterStream, } pub struct KeyedRow { @@ -122,10 +131,17 @@ impl Iterator { let reader = HummockVersionReader::new(sstable_store, Arc::new(HummockStateStoreMetrics::unused())); - let stream = { + let mut streams = Vec::with_capacity(read_plan.vnode_ids.len()); + let key_range = read_plan.key_range.unwrap(); + let pin_version = PinnedVersion::new(read_plan.version.unwrap(), unbounded_channel().0); + + for vnode in read_plan.vnode_ids { let stream = reader .iter( - table_key_range_from_prost(read_plan.key_range.unwrap()), + table_key_range_from_prost( + VirtualNode::from_index(vnode as usize), + key_range.clone(), + ), read_plan.epoch, ReadOptions { prefix_hint: None, @@ -134,17 +150,13 @@ impl Iterator { table_id: read_plan.table_id.into(), read_version_from_backup: false, }, - ( - vec![], - vec![], - PinnedVersion::new(read_plan.version.unwrap(), unbounded_channel().0), - ), + (vec![], vec![], pin_version.clone()), ) .await?; - Ok::>>, StorageError>( - Box::pin(stream), - ) - }?; + streams.push(stream); + } + + let stream = select_all_vnode_stream(streams); Ok(Self { row_serializer: RowDeserializer::new( @@ -172,16 +184,19 @@ impl Iterator { } } -fn table_key_range_from_prost(r: KeyRange) -> TableKeyRange { +fn table_key_range_from_prost(vnode: VirtualNode, r: KeyRange) -> TableKeyRange { let map_bound = |b, v| match b { Bound::Unbounded => std::ops::Bound::Unbounded, - Bound::Included => std::ops::Bound::Included(TableKey(v)), - Bound::Excluded => std::ops::Bound::Excluded(TableKey(v)), + Bound::Included => std::ops::Bound::Included(v), + Bound::Excluded => std::ops::Bound::Excluded(v), _ => unreachable!(), }; let left_bound = r.left_bound(); let right_bound = r.right_bound(); let left = map_bound(left_bound, r.left); let right = map_bound(right_bound, r.right); - (left, right) + + let vnode_slice = vnode.to_be_bytes(); + + map_table_key_range(prefixed_range((left, right), &vnode_slice[..])) } diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index 4d81bf5237841..5ef98b3e12d07 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -15,6 +15,7 @@ #![feature(error_generic_member_access)] #![feature(provide_any)] #![feature(once_cell)] +#![feature(type_alias_impl_trait)] mod iterator; @@ -30,6 +31,7 @@ use jni::objects::{AutoArray, JClass, JObject, JString, ReleaseMode}; use jni::sys::{jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jshort}; use jni::JNIEnv; use prost::{DecodeError, Message}; +use risingwave_common::hash::VirtualNode; use risingwave_storage::error::StorageError; use thiserror::Error; use tokio::runtime::Runtime; @@ -207,6 +209,13 @@ where } } +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount( + _env: EnvParam<'_>, +) -> jint { + VirtualNode::COUNT as jint +} + #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNew<'a>( env: EnvParam<'a>,