Skip to content

Commit

Permalink
move ParallelUnitId into common, move vnode related contants into com…
Browse files Browse the repository at this point in the history
…mon/types
  • Loading branch information
xx01cyx committed Jun 17, 2022
1 parent 3a95caf commit 72c1d62
Show file tree
Hide file tree
Showing 16 changed files with 33 additions and 27 deletions.
8 changes: 2 additions & 6 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::array::{
use crate::error::Result;
use crate::types::{
DataType, Datum, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper,
NaiveTimeWrapper, OrderedF32, OrderedF64, ScalarRef, ToOwnedDatum,
NaiveTimeWrapper, OrderedF32, OrderedF64, ScalarRef, ToOwnedDatum, VirtualNode,
VIRTUAL_NODE_COUNT,
};
use crate::util::hash_util::CRC32FastBuilder;

Expand All @@ -40,11 +41,6 @@ use crate::util::hash_util::CRC32FastBuilder;
/// are encoded from both `t.b, t.c`. If t.b="abc", t.c=1, the hashkey may be
/// encoded in certain format of ("abc", 1).
pub type VirtualNode = u16;
pub const VNODE_BITS: usize = 11;
pub const VIRTUAL_NODE_COUNT: usize = 1 << VNODE_BITS;
pub const VNODE_BITMAP_LEN: usize = 1 << (VNODE_BITS - 3);

/// A wrapper for u64 hash result.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct HashCode(pub u64);
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ use crate::array::{
StructValue,
};

/// Parallel unit is the minimal scheduling unit.
pub type ParallelUnitId = u32;

// VirtualNode (a.k.a. VNode) is a minimal partition that a set of keys belong to. It is used for
// consistent hashing.
pub type VirtualNode = u16;
pub const VNODE_BITS: usize = 11;
pub const VIRTUAL_NODE_COUNT: usize = 1 << VNODE_BITS;
pub const VNODE_BITMAP_LEN: usize = 1 << (VNODE_BITS - 3);

pub type OrderedF32 = ordered_float::OrderedFloat<f32>;
pub type OrderedF64 = ordered_float::OrderedFloat<f64>;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Duration, SystemTime};
use itertools::Itertools;
use risingwave_common::error::{internal_error, ErrorCode, Result};
use risingwave_common::try_match_expand;
use risingwave_common::types::ParallelUnitId;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::{HostAddress, ParallelUnit, ParallelUnitType, WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
Expand All @@ -35,7 +36,6 @@ use crate::model::{MetadataModel, Worker, INVALID_EXPIRE_AT};
use crate::storage::MetaStore;

pub type WorkerId = u32;
pub type ParallelUnitId = u32;
pub type WorkerLocations = HashMap<WorkerId, WorkerNode>;
pub type ClusterManagerRef<S> = Arc<ClusterManager<S>>;

Expand Down
5 changes: 2 additions & 3 deletions src/meta/src/manager/hash_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use parking_lot::Mutex;
use risingwave_common::hash::{VirtualNode, VIRTUAL_NODE_COUNT};
use risingwave_common::types::{ParallelUnitId, VirtualNode, VIRTUAL_NODE_COUNT};
use risingwave_pb::common::ParallelUnit;

use super::TableId;
use crate::cluster::ParallelUnitId;
use crate::model::FragmentId;

pub type HashMappingManagerRef = Arc<HashMappingManager>;
Expand Down Expand Up @@ -222,7 +221,7 @@ impl HashMappingManagerCore {
#[cfg(test)]
mod tests {
use itertools::Itertools;
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_pb::common::{ParallelUnit, ParallelUnitType};

use super::{HashMappingInfo, HashMappingManager};
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::Result;
use risingwave_common::types::ParallelUnitId;
use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus, Fragment};
use risingwave_pb::meta::TableFragments as ProstTableFragments;
use risingwave_pb::stream_plan::source_node::SourceType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{FragmentType, StreamActor, StreamNode};

use super::{ActorId, FragmentId};
use crate::cluster::{ParallelUnitId, WorkerId};
use crate::cluster::WorkerId;
use crate::manager::SourceId;
use crate::model::MetadataModel;

Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/stream/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::try_match_expand;
use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT};
use risingwave_common::util::compress::decompress_data;
use risingwave_pb::meta::table_fragments::ActorState;
use risingwave_pb::stream_plan::{FragmentType, StreamActor};
use tokio::sync::RwLock;

use crate::cluster::{ParallelUnitId, WorkerId};
use crate::cluster::WorkerId;
use crate::hummock::compaction_group::manager::CompactionGroupManagerRef;
use crate::manager::{HashMappingManagerRef, MetaSrvEnv};
use crate::model::{ActorId, MetadataModel, TableFragments, Transactional};
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/stream/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};

use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::VNODE_BITMAP_LEN;
use risingwave_common::types::VNODE_BITMAP_LEN;
use risingwave_common::util::compress::compress_data;
use risingwave_pb::common::{ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
Expand Down Expand Up @@ -288,7 +288,7 @@ mod test {
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::plan_common::TableRefId;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use itertools::Itertools;
use log::{debug, info};
use risingwave_common::catalog::TableId;
use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT};
use risingwave_pb::catalog::Source;
use risingwave_pb::common::{ActorInfo, ParallelUnitMapping, WorkerType};
use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus};
Expand All @@ -34,7 +34,7 @@ use uuid::Uuid;

use super::ScheduledLocations;
use crate::barrier::{BarrierManagerRef, Command};
use crate::cluster::{ClusterManagerRef, ParallelUnitId, WorkerId};
use crate::cluster::{ClusterManagerRef, WorkerId};
use crate::manager::{HashMappingManagerRef, MetaSrvEnv};
use crate::model::{ActorId, DispatcherId, TableFragments};
use crate::storage::MetaStore;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::BTreeMap;

use bytes::{BufMut, Bytes, BytesMut};
use risingwave_common::config::StorageConfig;
use risingwave_common::hash::{VNODE_BITMAP_LEN, VNODE_BITS};
use risingwave_common::types::{VNODE_BITMAP_LEN, VNODE_BITS};
use risingwave_hummock_sdk::key::{get_table_id, user_key};
use risingwave_pb::common::VNodeBitmap;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/sstable/group_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ mod tests {
use std::sync::atomic::Ordering::SeqCst;

use bytes::Buf;
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::VirtualNode;

use super::*;
use crate::hummock::iterator::test_utils::mock_sstable_store;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use bytes::{Buf, BufMut, Bytes};
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::VirtualNode;

use super::{HummockError, HummockResult};
use crate::storage_value::{StorageValue, ValueMeta, VALUE_META_SIZE};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/storage_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use bytes::{Buf, BufMut, Bytes};
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::VirtualNode;

/// Size of value meta in bytes. Since there might exist paddings between fields in `ValueMeta`, we
/// can't simply use `size_of` to retrieve its size.
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/batch_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures_async_stream::try_stream;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{OrderedColumnDesc, Schema};
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_common::util::hash_util::CRC32FastBuilder;
use risingwave_storage::table::cell_based_table::CellBasedTable;
use risingwave_storage::table::TableIter;
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use itertools::Itertools;
use madsim::collections::{HashMap, HashSet};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_common::util::hash_util::CRC32FastBuilder;
use tracing::event;
Expand Down Expand Up @@ -779,7 +779,7 @@ mod tests {
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder, Op};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_pb::common::{ActorInfo, HostAddress};

use super::*;
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/managed_state/aggregation/extreme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use madsim::collections::BTreeMap;
use risingwave_common::array::stream_chunk::{Op, Ops};
use risingwave_common::array::{Array, ArrayImpl};
use risingwave_common::buffer::Bitmap;
use risingwave_common::hash::{HashCode, VirtualNode};
use risingwave_common::types::*;
use risingwave_common::hash::HashCode;
use risingwave_common::types::{VirtualNode, *};
use risingwave_common::util::value_encoding::{deserialize_cell, serialize_cell};
use risingwave_expr::expr::AggKind;
use risingwave_storage::storage_value::{StorageValue, ValueMeta};
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/batch_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_storage::table::cell_based_table::CellBasedTable;
use risingwave_storage::{Keyspace, StateStore};

Expand Down

0 comments on commit 72c1d62

Please sign in to comment.