Skip to content

Commit

Permalink
feat: apply column-aware row encoding & enable schema change (risingw…
Browse files Browse the repository at this point in the history
…avelabs#8394)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Mar 9, 2023
1 parent 3bc6b46 commit 630685f
Show file tree
Hide file tree
Showing 26 changed files with 463 additions and 174 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions dashboard/proto/gen/plan_common.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 100 additions & 28 deletions e2e_test/ddl/alter_table_column.slt
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v int);

statement ok
create materialized view mv as select * from t;

statement ok
insert into t values (1);

# Errors
statement error column .* already exists
alter table t add column v int;
Expand All @@ -30,57 +36,100 @@ alter table t add column r real;
statement ok
create materialized view mv2 as select * from t;

query IR
query IR rowsort
select v, r from t;
----
1 NULL

query TT
show create table t;
----
public.t CREATE TABLE t (v INT, r REAL)

statement ok
insert into t values (2, 2.2);

query IR rowsort
select v, r from t;
----
1 NULL
2 2.2

statement ok
alter table t add column s varchar;

statement ok
create materialized view mv3 as select * from t;

query IRT
query IRT rowsort
select v, r, s from t;
----
1 NULL NULL
2 2.2 NULL

query TT
show create table t;
----
public.t CREATE TABLE t (v INT, r REAL, s CHARACTER VARYING)

# Insert data
# TODO(#7906): alter after insert.
statement ok
insert into t values (1, 1.1, 'a');
insert into t values (3, 3.3, '3-3');

statement ok
flush;
query IRT rowsort
select v, r, s from t;
----
1 NULL NULL
2 2.2 NULL
3 3.3 3-3

# All materialized views should keep the schema when it's created.
query I
query I rowsort
select * from mv;
----
1
2
3

query IR
query IR rowsort
select * from mv2;
----
1 1.1
1 NULL
2 2.2
3 3.3

query IRT
query IRT rowsort
select * from mv3;
----
1 1.1 a
1 NULL NULL
2 2.2 NULL
3 3.3 3-3

# Clean up
statement ok
drop materialized view mv;
update t set r = 1.1, s = '1-1' where v = 1;

query IRT rowsort
select v, r, s from t where v = 1;
----
1 1.1 1-1

query IR rowsort
select * from mv2;
----
1 1.1
2 2.2
3 3.3

query IRT rowsort
select * from mv3;
----
1 1.1 1-1
2 2.2 NULL
3 3.3 3-3

# Drop column
# TODO(#4529): create mview on partial columns and test whether dropping the unreferenced column works.
statement error being referenced
alter table t drop column s;

statement ok
drop materialized view mv2;
Expand All @@ -89,34 +138,57 @@ statement ok
drop materialized view mv3;

statement ok
drop table t;
alter table t drop column r;

# Drop column
statement ok
create table t (v int, r real);
query TT
show create table t;
----
public.t CREATE TABLE t (v INT, s CHARACTER VARYING)

query IR rowsort
select v, s from t;
----
1 1-1
2 NULL
3 3-3

# Add column after dropping column, to test that the column ID is not reused.
statement ok
alter table t add column s varchar;
alter table t add column r real;

query TT
show create table t;
----
public.t CREATE TABLE t (v INT, r REAL, s CHARACTER VARYING)
public.t CREATE TABLE t (v INT, s CHARACTER VARYING, r REAL)

query ITR rowsort
select v, s, r from t;
----
1 1-1 NULL
2 NULL NULL
3 3-3 NULL

statement ok
alter table t drop column r;
insert into t values (4, '4-4', 4.4);

query TT
show create table t;
query ITR rowsort
select v, s, r from t;
----
public.t CREATE TABLE t (v INT, s CHARACTER VARYING)
1 1-1 NULL
2 NULL NULL
3 3-3 NULL
4 4-4 4.4

# TODO(#4529): create mview on partial columns and test whether dropping the unreferenced column works.
statement ok
create materialized view mv as select * from t;
update t set r = 2.2 where v = 2;

statement error being referenced
alter table t drop column s;
query ITR rowsort
select v, s, r from t;
----
1 1-1 NULL
2 NULL 2.2
3 3-3 NULL
4 4-4 4.4

# Clean up
statement ok
Expand Down
3 changes: 3 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ message StorageTableDesc {
uint32 retention_seconds = 5;
repeated uint32 value_indices = 6;
uint32 read_prefix_len_hint = 7;
// Whether the table is versioned. If `true`, column-aware row encoding will be used
// to be compatible with schema changes.
bool versioned = 8;
}

enum JoinType {
Expand Down
2 changes: 2 additions & 0 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.map(|&k| k as usize)
.collect_vec();
let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
let versioned = table_desc.versioned;
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(
state_store,
Expand All @@ -227,6 +228,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
table_option,
value_indices,
prefix_hint_len,
versioned,
);

let inner_side_builder = InnerSideExecutorBuilder::new(
Expand Down
2 changes: 2 additions & 0 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.map(|&k| k as usize)
.collect_vec();
let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
let versioned = table_desc.versioned;
let scan_ranges = {
let scan_ranges = &seq_scan_node.scan_ranges;
if scan_ranges.is_empty() {
Expand Down Expand Up @@ -265,6 +266,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
table_option,
value_indices,
prefix_hint_len,
versioned,
);
Ok(Box::new(RowSeqScanExecutor::new(
table,
Expand Down
2 changes: 2 additions & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ comfy-table = "6"
crc32fast = "1"
derivative = "2"
easy-ext = "1"
either = "1"
enum-as-inner = "0.5"
fixedbitset = { version = "0.4", features = ["std"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = "0.2"
Expand Down
34 changes: 27 additions & 7 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::types::to_text::ToText;
use crate::types::{DataType, Datum, NaiveDateTimeWrapper, ToOwnedDatum};
use crate::util::hash_util::finalize_hashers;
use crate::util::iter_util::{ZipEqDebug, ZipEqFast};
use crate::util::value_encoding::serialize_datum_into;
use crate::util::value_encoding::{serialize_datum_into, ValueRowSerializer};

/// `DataChunk` is a collection of arrays with visibility mask.
#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -377,12 +377,14 @@ impl DataChunk {
DataChunk::new(columns, indexes.len())
}

/// Serialize each rows into value encoding bytes.
/// Serialize each row into value encoding bytes.
///
/// the returned vector's size is self.capacity() and for the invisible row will give a empty
/// vec<u8>
The returned vector's size is `self.capacity()`; each invisible row is serialized as an
empty byte buffer.
// Note(bugen): should we exclude the invisible rows in the output so that the caller won't need
// to handle visibility again?
pub fn serialize(&self) -> Vec<Bytes> {
match &self.vis2 {
let buffers = match &self.vis2 {
Vis::Bitmap(vis) => {
let rows_num = vis.len();
let mut buffers = vec![BytesMut::new(); rows_num];
Expand All @@ -398,7 +400,7 @@ impl DataChunk {
}
}
}
buffers.into_iter().map(BytesMut::freeze).collect_vec()
buffers
}
Vis::Compact(rows_num) => {
let mut buffers = vec![BytesMut::new(); *rows_num];
Expand All @@ -412,9 +414,27 @@ impl DataChunk {
}
}
}
buffers.into_iter().map(BytesMut::freeze).collect_vec()
buffers
}
};

buffers.into_iter().map(BytesMut::freeze).collect_vec()
}

/// Serialize each row into bytes with given serializer.
///
/// This is similar to `serialize` but it uses a custom serializer. Prefer `serialize` if
/// possible since it might be more efficient due to columnar operations.
pub fn serialize_with(&self, serializer: &impl ValueRowSerializer) -> Vec<Bytes> {
let mut results = Vec::with_capacity(self.capacity());
for row in self.rows_with_holes() {
results.push(if let Some(row) = row {
serializer.serialize(row).into()
} else {
Bytes::new()
});
}
results
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pub struct TableDesc {

/// the column indices which could receive watermarks.
pub watermark_columns: FixedBitSet,

/// Whether the table is versioned. If `true`, column-aware row encoding will be used
/// to be compatible with schema changes.
///
/// See `version` field in `TableCatalog` for more details.
pub versioned: bool,
}

impl TableDesc {
Expand Down Expand Up @@ -79,6 +85,7 @@ impl TableDesc {
retention_seconds: self.retention_seconds,
value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
read_prefix_len_hint: self.read_prefix_len_hint as u32,
versioned: self.versioned,
}
}

Expand Down
Loading

0 comments on commit 630685f

Please sign in to comment.