Skip to content

Commit

Permalink
feat(java-binding): support setting vnodes to read in java-binding re…
Browse files Browse the repository at this point in the history
…ad plan (#8307)
  • Loading branch information
wenym1 authored Mar 3, 2023
1 parent b237d7d commit d85c631
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 20 deletions.
9 changes: 9 additions & 0 deletions dashboard/proto/gen/java_binding.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/java_binding.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ message ReadPlan {

hummock.HummockVersion version = 6;
catalog.Table table_catalog = 7;

repeated uint32 vnode_ids = 8;
}
8 changes: 8 additions & 0 deletions src/java_binding/java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.risingwave.java;

import com.risingwave.java.binding.Binding;
import com.risingwave.java.binding.Iterator;
import com.risingwave.java.binding.KeyedRow;
import com.risingwave.java.binding.rpc.MetaClient;
Expand All @@ -9,7 +10,9 @@
import com.risingwave.proto.JavaBinding.KeyRange.Bound;
import com.risingwave.proto.JavaBinding.ReadPlan;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;

Expand All @@ -34,6 +37,14 @@ public static void main(String[] args) {
metaClient.startHeartbeatLoop(Duration.ofMillis(1000));
HummockVersion version = metaClient.pinVersion();
Table tableCatalog = metaClient.getTable(dbName, tableName);

int vnodeCount = Binding.vnodeCount();

List<Integer> vnodeList = new ArrayList<>();
for (int i = 0; i < vnodeCount; i++) {
vnodeList.add(i);
}

ReadPlan readPlan =
ReadPlan.newBuilder()
.setDataDir(dataDir)
Expand All @@ -43,6 +54,7 @@ public static void main(String[] args) {
.setEpoch(version.getMaxCommittedEpoch())
.setVersion(version)
.setTableCatalog(tableCatalog)
.addAllVnodeIds(vnodeList)
.build();

try (Iterator iter = new Iterator(readPlan)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public class Binding {
System.loadLibrary("risingwave_java_binding");
}

public static native int vnodeCount();

// iterator method
// Return a pointer to the iterator
static native long iteratorNew(byte[] readPlan);
Expand Down
55 changes: 35 additions & 20 deletions src/java_binding/src/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;

use bytes::Bytes;
use futures::TryStreamExt;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, RowDeserializer};
use risingwave_common::types::ScalarImpl;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_common::util::select_all;
use risingwave_hummock_sdk::key::{map_table_key_range, prefixed_range, TableKeyRange};
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::java_binding::key_range::Bound;
use risingwave_pb::java_binding::{KeyRange, ReadPlan};
use risingwave_storage::error::{StorageError, StorageResult};
use risingwave_storage::error::StorageResult;
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use risingwave_storage::hummock::store::state_store::HummockStorageIterator;
use risingwave_storage::hummock::store::version::HummockVersionReader;
use risingwave_storage::hummock::{SstableStore, TieredCache};
use risingwave_storage::monitor::HummockStateStoreMetrics;
use risingwave_storage::store::{ReadOptions, StreamTypeOfIter};
use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter};
use tokio::sync::mpsc::unbounded_channel;

type SelectAllIterStream = impl StateStoreReadIterStream + Unpin;

fn select_all_vnode_stream(
streams: Vec<StreamTypeOfIter<HummockStorageIterator>>,
) -> SelectAllIterStream {
select_all(streams.into_iter().map(Box::pin))
}

pub struct Iterator {
row_serializer: RowDeserializer,
stream: Pin<Box<StreamTypeOfIter<HummockStorageIterator>>>,
stream: SelectAllIterStream,
}

pub struct KeyedRow {
Expand Down Expand Up @@ -122,10 +131,17 @@ impl Iterator {
let reader =
HummockVersionReader::new(sstable_store, Arc::new(HummockStateStoreMetrics::unused()));

let stream = {
let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
let key_range = read_plan.key_range.unwrap();
let pin_version = PinnedVersion::new(read_plan.version.unwrap(), unbounded_channel().0);

for vnode in read_plan.vnode_ids {
let stream = reader
.iter(
table_key_range_from_prost(read_plan.key_range.unwrap()),
table_key_range_from_prost(
VirtualNode::from_index(vnode as usize),
key_range.clone(),
),
read_plan.epoch,
ReadOptions {
prefix_hint: None,
Expand All @@ -134,17 +150,13 @@ impl Iterator {
table_id: read_plan.table_id.into(),
read_version_from_backup: false,
},
(
vec![],
vec![],
PinnedVersion::new(read_plan.version.unwrap(), unbounded_channel().0),
),
(vec![], vec![], pin_version.clone()),
)
.await?;
Ok::<std::pin::Pin<Box<StreamTypeOfIter<HummockStorageIterator>>>, StorageError>(
Box::pin(stream),
)
}?;
streams.push(stream);
}

let stream = select_all_vnode_stream(streams);

Ok(Self {
row_serializer: RowDeserializer::new(
Expand Down Expand Up @@ -172,16 +184,19 @@ impl Iterator {
}
}

fn table_key_range_from_prost(r: KeyRange) -> TableKeyRange {
fn table_key_range_from_prost(vnode: VirtualNode, r: KeyRange) -> TableKeyRange {
let map_bound = |b, v| match b {
Bound::Unbounded => std::ops::Bound::Unbounded,
Bound::Included => std::ops::Bound::Included(TableKey(v)),
Bound::Excluded => std::ops::Bound::Excluded(TableKey(v)),
Bound::Included => std::ops::Bound::Included(v),
Bound::Excluded => std::ops::Bound::Excluded(v),
_ => unreachable!(),
};
let left_bound = r.left_bound();
let right_bound = r.right_bound();
let left = map_bound(left_bound, r.left);
let right = map_bound(right_bound, r.right);
(left, right)

let vnode_slice = vnode.to_be_bytes();

map_table_key_range(prefixed_range((left, right), &vnode_slice[..]))
}
9 changes: 9 additions & 0 deletions src/java_binding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![feature(error_generic_member_access)]
#![feature(provide_any)]
#![feature(once_cell)]
#![feature(type_alias_impl_trait)]

mod iterator;

Expand All @@ -30,6 +31,7 @@ use jni::objects::{AutoArray, JClass, JObject, JString, ReleaseMode};
use jni::sys::{jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jshort};
use jni::JNIEnv;
use prost::{DecodeError, Message};
use risingwave_common::hash::VirtualNode;
use risingwave_storage::error::StorageError;
use thiserror::Error;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -207,6 +209,13 @@ where
}
}

#[no_mangle]
pub extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(
_env: EnvParam<'_>,
) -> jint {
VirtualNode::COUNT as jint
}

#[no_mangle]
pub extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNew<'a>(
env: EnvParam<'a>,
Expand Down

0 comments on commit d85c631

Please sign in to comment.