Commit

refactor: remove keyspace (#6406)
* remove all functions in keyspace

* remove keyspace

* misc

* revert version.rs

* rename store to local_store in streaming table

* replace ExtractTableKeyIterator with a more general interface

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2 people authored and xxchan committed Nov 19, 2022
1 parent e7fdeaa commit ed3c13c
Showing 14 changed files with 157 additions and 376 deletions.
2 changes: 1 addition & 1 deletion docs/relational_table/relational-table-schema.md
@@ -7,7 +7,7 @@ In this doc, we will take HashAgg with extreme state (`max`, `min`) or value sta
[Code](https://github.com/risingwavelabs/risingwave/blob/7f9ad2240712aa0cfe3edffb4535d43b42f32cc5/src/frontend/src/optimizer/plan_node/logical_agg.rs#L144)

## Table id
For all relational table states, the keyspace must start with `table_id`. This is a globally unique id allocated in meta. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.
`table_id` is a globally unique id allocated in meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.
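To make the layout concrete, a state key can be pictured as the 4-byte `table_id` prefix followed by the serialized primary key, so each relational table owns a disjoint key range. A minimal sketch (hypothetical encoding, not the actual serialization code):

```rust
// Hypothetical sketch: prefixing every state key with the big-endian
// table_id keeps all keys of one relational table contiguous and ordered.
fn encode_state_key(table_id: u32, pk: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(4 + pk.len());
    key.extend_from_slice(&table_id.to_be_bytes()); // big-endian preserves numeric order
    key.extend_from_slice(pk);
    key
}
```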

## Value State (Sum, Count)
Query example:
8 changes: 4 additions & 4 deletions docs/state-store-overview.md
@@ -28,11 +28,11 @@ Hummock consists of a manager service on the meta node, clients on worker nodes

The streaming state store has distinctive workload characteristics.

* Every streaming executor will only ***read and write its own portion of data***, which are multiple consecutive non-overlapping ranges of keys (we call it ***key space***).
* Every streaming executor will only ***read and write its own portion of data***.
* Data (generally) ***won’t be shared across nodes***, so every worker node will only read and write its own data. Therefore, every Hummock API, like `get` or `scan`, only guarantees that writes on one node can be immediately read from the same node. In some cases, if we want to read data written from other nodes, we will need to ***wait for the epoch***.
* Streaming data are ***committed in serial***. Based on the [barrier-based checkpoint algorithm](https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm), the states are persisted epoch by epoch. We can tailor the write path specifically for the epoch-based checkpoint workload.
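The per-node visibility rule above can be sketched with a toy versioned map (hypothetical types, not Hummock's actual API): each write is tagged with an epoch, and a read at `read_epoch` sees the newest version at or below that epoch.

```rust
use std::collections::BTreeMap;

// Toy single-node store (illustrative only): each key maps to versions
// keyed by epoch; `get` returns the newest version with epoch <= read_epoch,
// which is the visibility rule the bullets above describe.
struct ToyStore {
    data: BTreeMap<Vec<u8>, BTreeMap<u64, Vec<u8>>>, // key -> (epoch -> value)
}

impl ToyStore {
    fn new() -> Self {
        Self { data: BTreeMap::new() }
    }

    fn ingest(&mut self, epoch: u64, kv: Vec<(Vec<u8>, Vec<u8>)>) {
        for (k, v) in kv {
            self.data.entry(k).or_default().insert(epoch, v);
        }
    }

    fn get(&self, key: &[u8], read_epoch: u64) -> Option<&Vec<u8>> {
        self.data
            .get(key)?
            .range(..=read_epoch) // only versions visible at read_epoch
            .next_back() // the newest among them
            .map(|(_, v)| v)
    }
}
```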

This leads to the design of Hummock, the cloud-native KV-based streaming state store. We’ll explain concepts like “epoch”, “key space” and “barrier” in the following chapters.
This leads to the design of Hummock, the cloud-native KV-based streaming state store. We’ll explain concepts like “epoch” and “barrier” in the following chapters.

## The Hummock User API

@@ -119,8 +119,8 @@ For `scan`, we simply select by overlapping key range. For point get, we will fi
Hummock implements the following iterators:
- `BlockIterator`: iterates a block of an SSTable.
- `SSTableIterator`: iterates an SSTable.
- `ConcatIterator`: iterates SSTables with non-overlapping keyspaces.
- `MergeIterator`: iterates SSTables with overlapping keyspaces.
- `ConcatIterator`: iterates SSTables with non-overlapping key ranges.
- `MergeIterator`: iterates SSTables with overlapping key ranges.
- `UserIterator`: wraps internal iterators and outputs user key-value with epoch <= read epoch.

[iterators source code](https://github.com/risingwavelabs/risingwave/tree/main/src/storage/src/hummock/iterator)
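As a hedged illustration of what `MergeIterator` does (a simplified two-way merge, not the real implementation): runs sorted by (user key ascending, epoch descending) are combined into one run with the same order, so for a given key the newest epoch surfaces first.

```rust
// Illustrative two-way merge over runs of (user_key, epoch), each run
// sorted by key ascending and, within a key, epoch descending. The output
// keeps that order, which is what merging overlapping SSTables requires.
fn merge2(mut a: Vec<(String, u64)>, mut b: Vec<(String, u64)>) -> Vec<(String, u64)> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    a.reverse(); // popping from the back now yields the smallest entry
    b.reverse();
    while let (Some(x), Some(y)) = (a.last(), b.last()) {
        // smaller key first; on equal keys, larger (newer) epoch first
        let take_a = (x.0.as_str(), std::cmp::Reverse(x.1))
            <= (y.0.as_str(), std::cmp::Reverse(y.1));
        out.push(if take_a { a.pop().unwrap() } else { b.pop().unwrap() });
    }
    out.extend(a.into_iter().rev()); // drain whichever run remains
    out.extend(b.into_iter().rev());
    out
}
```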
4 changes: 0 additions & 4 deletions src/source/src/manager.rs
@@ -365,8 +365,6 @@ mod tests {
use risingwave_pb::catalog::{ColumnIndex, StreamSourceInfo, TableSourceInfo};
use risingwave_pb::plan_common::ColumnCatalog;
use risingwave_pb::stream_plan::source_node::Info;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::Keyspace;

use crate::*;

@@ -441,8 +439,6 @@
let pk_column_ids = vec![1];
let info = TableSourceInfo {};

let _keyspace = Keyspace::table_root(MemoryStateStore::new(), table_id);

let mem_source_manager: TableSourceManagerRef = Arc::new(TableSourceManager::default());
let mut source_builder = SourceDescBuilder::new(
table_id,
5 changes: 0 additions & 5 deletions src/source/src/table.rs
@@ -175,15 +175,10 @@ mod tests {
use risingwave_common::array::{Array, I64Array, Op};
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::Keyspace;

use super::*;

fn new_source() -> TableSource {
let store = MemoryStateStore::new();
let _keyspace = Keyspace::table_root(store, Default::default());

TableSource::new(vec![ColumnDesc::unnamed(
ColumnId::from(0),
DataType::Int64,
39 changes: 9 additions & 30 deletions src/storage/hummock_test/src/compactor_tests.rs
@@ -56,7 +56,6 @@ pub(crate) mod tests {
use risingwave_storage::store::{
ReadOptions, StateStoreReadExt, StateStoreWrite, WriteOptions,
};
use risingwave_storage::Keyspace;

use crate::test_utils::{
get_test_notification_client, HummockV2MixedStateStore,
@@ -431,7 +430,6 @@ pub(crate) mod tests {

async fn prepare_data(
hummock_meta_client: Arc<dyn HummockMetaClient>,
keyspace: &Keyspace<HummockStorage>,
storage: &HummockStorage,
existing_table_id: u32,
keys_per_epoch: usize,
@@ -443,7 +441,7 @@
let val = Bytes::from(b"0"[..].repeat(1 << 10)); // 1024 Byte value
for idx in 0..kv_count {
epoch += 1;
let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.local.start_write_batch(WriteOptions {
epoch,
table_id: existing_table_id.into(),
});
@@ -514,16 +512,8 @@ pub(crate) mod tests {
existing_table_id,
)
.await;
let keyspace = Keyspace::table_root(storage.clone(), TableId::new(existing_table_id));

prepare_data(
hummock_meta_client.clone(),
&keyspace,
&storage,
existing_table_id,
1,
)
.await;
prepare_data(hummock_meta_client.clone(), &storage, existing_table_id, 1).await;

// Mimic dropping table
unregister_table_ids_from_compaction_group(&hummock_manager_ref, &[existing_table_id])
@@ -627,15 +617,14 @@ } else {
} else {
existing_table_ids
};
let keyspace = Keyspace::table_root(storage.clone(), TableId::new(table_id));
register_table_ids_to_compaction_group(
&hummock_manager_ref,
&[table_id],
StaticCompactionGroupId::StateDefault.into(),
)
.await;
epoch += 1;
let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.start_write_batch(WriteOptions {
epoch,
table_id: TableId::from(table_id),
});
@@ -789,7 +778,6 @@ let base_epoch = Epoch::now();
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
let millisec_interval_epoch: u64 = (1 << 16) * 100;
let keyspace = Keyspace::table_root(storage.clone(), TableId::new(existing_table_id));
register_table_ids_to_compaction_group(
&hummock_manager_ref,
&[existing_table_id],
@@ -800,7 +788,7 @@
for _ in 0..kv_count {
epoch += millisec_interval_epoch;
epoch_set.insert(epoch);
let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.start_write_batch(WriteOptions {
epoch,
table_id: TableId::from(existing_table_id),
});
@@ -963,20 +951,19 @@ pub(crate) mod tests {
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
let millisec_interval_epoch: u64 = (1 << 16) * 100;
let keyspace = Keyspace::table_root(storage.clone(), TableId::new(existing_table_id));
register_table_ids_to_compaction_group(
&hummock_manager_ref,
&[keyspace.table_id().table_id],
&[existing_table_id],
StaticCompactionGroupId::StateDefault.into(),
)
.await;
let mut epoch_set = BTreeSet::new();
for _ in 0..kv_count {
epoch += millisec_interval_epoch;
epoch_set.insert(epoch);
let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.start_write_batch(WriteOptions {
epoch,
table_id: keyspace.table_id(),
table_id: TableId::new(existing_table_id),
});

let ramdom_key = [key_prefix, &rand::thread_rng().gen::<[u8; 32]>()].concat();
@@ -1121,16 +1108,8 @@
)
.await;

let keyspace = Keyspace::table_root(storage.clone(), TableId::new(existing_table_id));
prepare_data(
hummock_meta_client.clone(),
&keyspace,
&storage,
existing_table_id,
2,
)
.await;
let mut local = keyspace.start_write_batch(WriteOptions {
prepare_data(hummock_meta_client.clone(), &storage, existing_table_id, 2).await;
let mut local = storage.start_write_batch(WriteOptions {
epoch: 130,
table_id: existing_table_id.into(),
});
12 changes: 5 additions & 7 deletions src/storage/hummock_test/src/sync_point_tests.rs
@@ -38,8 +38,7 @@ use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::compactor::{Compactor, CompactorContext};
use risingwave_storage::hummock::SstableIdManager;
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{ReadOptions, WriteOptions};
use risingwave_storage::Keyspace;
use risingwave_storage::store::{ReadOptions, StateStoreWrite, WriteOptions};
use serial_test::serial;

use super::compactor_tests::tests::{
@@ -265,11 +264,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
let compactor_manager = hummock_manager_ref.compactor_manager_ref_for_test();
compactor_manager.add_compactor(worker_node.id, u64::MAX);

let keyspace = Keyspace::table_root(storage.clone(), TableId::new(existing_table_id));
// 1. add sstables
let val0 = Bytes::from(b"0"[..].repeat(1 << 10)); // 1024 Byte value
let val1 = Bytes::from(b"1"[..].repeat(1 << 10)); // 1024 Byte value
let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.local.start_write_batch(WriteOptions {
epoch: 100,
table_id: existing_table_id.into(),
});
@@ -284,7 +282,7 @@
local.ingest().await.unwrap();
flush_and_commit(&hummock_meta_client, &storage, 100).await;
compact_once(hummock_manager_ref.clone(), compact_ctx.clone()).await;
let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.local.start_write_batch(WriteOptions {
epoch: 101,
table_id: existing_table_id.into(),
});
@@ -294,7 +292,7 @@
local.ingest().await.unwrap();
flush_and_commit(&hummock_meta_client, &storage, 101).await;
compact_once(hummock_manager_ref.clone(), compact_ctx.clone()).await;
let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.local.start_write_batch(WriteOptions {
epoch: 102,
table_id: existing_table_id.into(),
});
@@ -306,7 +304,7 @@
// move this two file to the same level.
compact_once(hummock_manager_ref.clone(), compact_ctx.clone()).await;

let mut local = keyspace.start_write_batch(WriteOptions {
let mut local = storage.local.start_write_batch(WriteOptions {
epoch: 103,
table_id: existing_table_id.into(),
});
5 changes: 0 additions & 5 deletions src/storage/src/hummock/state_store_v1.rs
@@ -500,11 +500,6 @@ impl StateStoreWrite for HummockStorageV1 {
/// * Ordered. KV pairs will be directly written to the table, so it must be ordered.
/// * Locally unique. There should not be two or more operations on the same key in one write
/// batch.
/// * Globally unique. The streaming operators should ensure that different operators won't
/// operate on the same key. The operator operating on one keyspace should always wait for all
/// changes to be committed before reading and writing new keys to the engine. That is because
/// that the table with lower epoch might be committed after a table with higher epoch has
/// been committed. If such case happens, the outcome is non-predictable.
fn ingest_batch(
&self,
kv_pairs: Vec<(Bytes, StorageValue)>,
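The two invariants kept in the doc comment above (ordered, locally unique) can be checked in a single pass, since strictly increasing keys rule out both disorder and duplicate keys. A minimal sketch with a hypothetical helper:

```rust
// Hypothetical validation helper (not part of the real API): a write batch
// satisfies both invariants iff every adjacent pair of keys is strictly
// increasing — sorted order and no duplicates in one check.
fn batch_keys_valid(kv_pairs: &[(Vec<u8>, Option<Vec<u8>>)]) -> bool {
    kv_pairs.windows(2).all(|w| w[0].0 < w[1].0)
}
```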
