From 245d8d149e1618bd6b465452ae794ddcff0f840b Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Mon, 27 Mar 2023 14:25:46 +0800 Subject: [PATCH 01/11] cast --- batch-ddl-debug-junit.xml | 7 +++++++ e2e_test/ddl/table/generated_columns.slt.part | 8 ++++---- src/frontend/src/optimizer/plan_node/logical_source.rs | 7 ++----- 3 files changed, 13 insertions(+), 9 deletions(-) create mode 100644 batch-ddl-debug-junit.xml diff --git a/batch-ddl-debug-junit.xml b/batch-ddl-debug-junit.xml new file mode 100644 index 0000000000000..353391a28802f --- /dev/null +++ b/batch-ddl-debug-junit.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/e2e_test/ddl/table/generated_columns.slt.part b/e2e_test/ddl/table/generated_columns.slt.part index f6c0a18067838..ccc52d8efceec 100644 --- a/e2e_test/ddl/table/generated_columns.slt.part +++ b/e2e_test/ddl/table/generated_columns.slt.part @@ -1,6 +1,6 @@ # Create a table with generated columns. statement ok -create table t1 (v1 int as v2-1, v2 int, v3 int as v2+1); +create table t1 (v1 int as v2-1, v2 int, v3 int as v2+1.02, v4 double as v2 + 1.02); statement ok insert into t1 (v2) values (1), (2); @@ -8,11 +8,11 @@ insert into t1 (v2) values (1), (2); statement ok flush; -query III +query IIIR select * from t1; ---- -0 1 2 -1 2 3 +0 1 2 2.02 +1 2 3 3.02 statement ok drop table t1; diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index f582bc9eb2bfc..ae09243085921 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -18,7 +18,6 @@ use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::rc::Rc; -use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, Schema}; use risingwave_common::error::Result; use risingwave_connector::source::DataType; @@ -115,10 +114,8 @@ impl LogicalSource { let GeneratedColumnDesc { expr } = generated_column; // TODO(yuhao): avoid this `from_expr_proto`. 
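            // The diff below swaps a hard type-equality check for an
            // assignment cast (a sketch of the intent, using names from this
            // hunk): for `v3 int as v2+1.02` the rewritten expression
            // returns `Decimal`, and
            //
            //     let casted_expr = proj_expr.cast_assign(column_desc.data_type)?;
            //
            // coerces it to `Int32` under assignment-cast rules, erroring
            // only when no such cast exists.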
let proj_expr = rewriter.rewrite_expr(ExprImpl::from_expr_proto(&expr.unwrap())?); - if proj_expr.return_type() != column_desc.data_type { - bail!("Expression return type should match the type specified for the column"); - } - exprs.push(proj_expr); + let casted_expr = proj_expr.cast_assign(column_desc.data_type)?; + exprs.push(casted_expr); } else { let input_ref = InputRef { data_type: ret_data_type, From 29f19e56edcd41f279ed4e929a900c3efdd92f3a Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Mon, 27 Mar 2023 14:33:38 +0800 Subject: [PATCH 02/11] delete file --- batch-ddl-debug-junit.xml | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 batch-ddl-debug-junit.xml diff --git a/batch-ddl-debug-junit.xml b/batch-ddl-debug-junit.xml deleted file mode 100644 index 353391a28802f..0000000000000 --- a/batch-ddl-debug-junit.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - - From c726b280c0a45238246beec514e24458bf289687 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 29 Mar 2023 03:52:18 +0800 Subject: [PATCH 03/11] add source support --- proto/stream_plan.proto | 3 +- src/common/src/catalog/column.rs | 2 +- src/compute/tests/integration_tests.rs | 10 +--- src/frontend/src/binder/insert.rs | 2 +- src/frontend/src/handler/create_source.rs | 6 +-- src/frontend/src/handler/create_table.rs | 13 +++-- .../src/optimizer/plan_node/batch_source.rs | 6 ++- .../src/optimizer/plan_node/generic/source.rs | 32 +++++------- .../src/optimizer/plan_node/logical_source.rs | 49 +++++++++++++++---- .../src/optimizer/plan_node/stream.rs | 1 - .../src/optimizer/plan_node/stream_source.rs | 13 ++--- src/frontend/src/planner/relation.rs | 19 +------ src/source/src/source_desc.rs | 49 ++++++------------- src/source/src/table.rs | 2 +- .../src/executor/source/source_executor.rs | 20 ++------ src/stream/src/from_proto/source.rs | 1 - 16 files changed, 99 insertions(+), 129 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index fefdf6ab635db..27628eed9c331 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -125,7 +125,8 @@ message StreamSource { catalog.Table state_table = 2; optional uint32 row_id_index = 3; repeated plan_common.ColumnCatalog columns = 4; - repeated int32 pk_column_ids = 5; + // repeated int32 pk_column_ids = 5; + reserved 5; map properties = 6; catalog.StreamSourceInfo info = 7; string source_name = 8; diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 29e0d7db7cb13..ed59d93447818 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -207,7 +207,7 @@ impl ColumnDesc { Self::from_field_with_column_id(field, 0) } - pub fn is_generated_column(&self) -> bool { + pub fn is_generated(&self) -> bool { self.generated_column.is_some() } } diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 350ee7f38a975..08f7836dafb05 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -121,15 +121,9 @@ async fn test_table_materialize() -> StreamResult<()> { "fields.v1.max" => "1000", "fields.v1.seed" => "12345", )); - let pk_column_ids = vec![0]; let row_id_index: usize = 0; - let source_builder = create_source_desc_builder( - &schema, - pk_column_ids, - Some(row_id_index), - source_info, - properties, - ); + let source_builder = + create_source_desc_builder(&schema, Some(row_id_index), source_info, properties); // Ensure the source exists. 
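    // (Note on the builder change above: `pk_column_ids` was only consumed
    // by a non-empty assertion inside `SourceDescBuilder`, and the stream
    // plan no longer carries it either -- see the `stream_plan.proto` hunk
    // in this patch -- so test setups now pass just the schema, row-id
    // index, source info and properties.)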
let source_desc = source_builder.build().await.unwrap(); diff --git a/src/frontend/src/binder/insert.rs b/src/frontend/src/binder/insert.rs index b832cd2c97cf1..300d1bd905b20 100644 --- a/src/frontend/src/binder/insert.rs +++ b/src/frontend/src/binder/insert.rs @@ -120,7 +120,7 @@ impl Binder { // TODO(yuhao): refine this if row_id is always the last column. // - // `row_id_index` in bin insert operation should rule out generated column + // `row_id_index` in insert operation should rule out generated column let row_id_index = { if let Some(row_id_index) = table_catalog.row_id_index { let mut cnt = 0; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 3c0f316f86b84..26f26d84f369f 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -631,10 +631,10 @@ pub async fn handle_create_source( bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?; - if columns.iter().any(|c| c.is_generated()) { - // TODO(yuhao): allow generated columns on source + if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) { + // TODO(yuhao): allow delete from a non append only source return Err(RwError::from(ErrorCode::BindError( - "Generated columns on source has not been implemented.".to_string(), + "Generated columns are only allowed in an append only source.".to_string(), ))); } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index d6d8b22a817a6..37b34ce32b0b2 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -399,6 +399,14 @@ pub(crate) async fn gen_create_table_plan_with_source( bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?; + if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) { + // TODO(yuhao): allow delete from a non append only source + return Err(ErrorCode::BindError( + "Generated columns are only allowed in an append only source.".to_string(), + ) + .into()); + } + gen_table_plan_inner( context.into(), table_name, @@ -528,10 +536,7 @@ fn gen_table_plan_inner( let source_catalog = source.as_ref().map(|source| Rc::new((source).into())); let source_node: PlanRef = LogicalSource::new( source_catalog, - columns - .iter() - .map(|column| column.column_desc.clone()) - .collect_vec(), + columns.clone(), pk_column_ids, row_id_index, false, diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index 49eb14ada6e59..e32590e3670b6 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -97,8 +97,10 @@ impl ToBatchPb for BatchSource { NodeBody::Source(SourceNode { source_id: source_catalog.id, info: Some(source_catalog.info.clone()), - columns: source_catalog - .columns + columns: self + .logical + .core + .column_catalog .iter() .map(|c| c.to_protobuf()) .collect_vec(), diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 8a650007f3458..53b9531dcd267 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::rc::Rc; use derivative::Derivative; -use risingwave_common::catalog::{ColumnDesc, Field, Schema}; +use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use 
risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; @@ -35,7 +35,7 @@ pub struct Source { pub catalog: Option>, /// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan /// generating, even if there is no external stream source. - pub column_descs: Vec, + pub column_catalog: Vec, pub pk_col_ids: Vec, pub row_id_index: Option, /// Whether the "SourceNode" should generate the row id column for append only source @@ -49,20 +49,22 @@ pub struct Source { impl GenericPlanNode for Source { fn schema(&self) -> Schema { - let fields = self.non_generated_columns().map(Into::into).collect(); + let fields = self + .column_catalog + .iter() + .map(|c| (&c.column_desc).into()) + .collect(); // let fields = self.column_descs.iter().map(Into::into).collect(); Schema { fields } } fn logical_pk(&self) -> Option> { let mut id_to_idx = HashMap::new(); - // self.column_descs.iter().filter(|c| !c.is_generated_column()).enumerate().for_each(|(idx, + // self.column_descs.iter().filter(|c| !c.is_generated()).enumerate().for_each(|(idx, // c)| { - self.non_generated_columns() - .enumerate() - .for_each(|(idx, c)| { - id_to_idx.insert(c.column_id, idx); - }); + self.column_catalog.iter().enumerate().for_each(|(idx, c)| { + id_to_idx.insert(c.column_id(), idx); + }); self.pk_col_ids .iter() .map(|c| id_to_idx.get(c).copied()) @@ -75,12 +77,11 @@ impl GenericPlanNode for Source { fn functional_dependency(&self) -> FunctionalDependencySet { let pk_indices = self.logical_pk(); - let non_generated_columns_count = self.non_generated_columns().count(); match pk_indices { Some(pk_indices) => { - FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices) + FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices) } - None => FunctionalDependencySet::new(non_generated_columns_count), + None => FunctionalDependencySet::new(self.column_catalog.len()), } } } @@ -114,11 +115,4 @@ impl Source { builder.build(vec![], 1) } - - /// Non-generated columns - fn non_generated_columns(&self) -> impl Iterator { - self.column_descs - .iter() - .filter(|c| !c.is_generated_column()) - } } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index ae09243085921..5c568b076f5b4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -18,7 +18,8 @@ use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::rc::Rc; -use risingwave_common::catalog::{ColumnDesc, Schema}; +use itertools::Itertools; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema}; use risingwave_common::error::Result; use risingwave_connector::source::DataType; use risingwave_pb::plan_common::GeneratedColumnDesc; @@ -57,16 +58,23 @@ pub struct LogicalSource { impl LogicalSource { pub fn new( source_catalog: Option>, - column_descs: Vec, + column_catalog: Vec, pk_col_ids: Vec, row_id_index: Option, gen_row_id: bool, for_table: bool, ctx: OptimizerContextRef, ) -> Self { + // Filter out the generated columns. 
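+        // Generated columns never come from the connector itself; they are
+        // computed by a `Project` placed on top in `create` below, so the
+        // source keeps only the non-generated columns and the row-id index
+        // shifts left past every filtered-out column. E.g. for columns
+        // `[v1 (generated), v2, _row_id]` with `row_id_index = Some(2)`,
+        // one generated column precedes the row id, giving `Some(1)`.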
+ let row_id_index = Self::rewrite_row_id_idx(&column_catalog, row_id_index); + let source_column_catalogs = column_catalog + .into_iter() + .filter(|c| !c.is_generated()) + .collect_vec(); + let core = generic::Source { catalog: source_catalog, - column_descs, + column_catalog: source_column_catalogs, pk_col_ids, row_id_index, gen_row_id, @@ -130,23 +138,30 @@ impl LogicalSource { } pub fn create( - source_catalog: Option>, - column_descs: Vec, - pk_col_ids: Vec, - row_id_index: Option, - gen_row_id: bool, + source_catalog: Rc, for_table: bool, ctx: OptimizerContextRef, ) -> Result { + let column_catalogs = source_catalog.columns.clone(); + let column_descs = column_catalogs + .iter() + .map(|c| &c.column_desc) + .cloned() + .collect(); + let pk_col_ids = source_catalog.pk_col_ids.clone(); + let row_id_index = source_catalog.row_id_index; + let gen_row_id = source_catalog.append_only; + let source = Self::new( - source_catalog, - column_descs.clone(), + Some(source_catalog), + column_catalogs, pk_col_ids, row_id_index, gen_row_id, for_table, ctx, ); + let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?; if let Some(exprs) = exprs { Ok(LogicalProject::new(source.into(), exprs).into()) @@ -155,6 +170,20 @@ impl LogicalSource { } } + /// `row_id_index` in source node should rule out generated column + #[must_use] + fn rewrite_row_id_idx(columns: &[ColumnCatalog], row_id_index: Option) -> Option { + row_id_index.map(|idx| { + let mut cnt = 0; + for col in columns.iter().take(idx + 1) { + if col.is_generated() { + cnt += 1; + } + } + idx - cnt + }) + } + pub(super) fn column_names(&self) -> Vec { self.schema() .fields() diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 0ecb43df610ec..081d599b8c9f0 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -719,7 +719,6 @@ pub fn to_stream_prost_body( info: Some(me.info.clone()), row_id_index: me.row_id_index.map(|index| index as _), columns: me.columns.iter().map(|c| c.to_protobuf()).collect(), - pk_column_ids: me.pk_col_ids.iter().map(Into::into).collect(), properties: me.properties.clone().into_iter().collect(), }); PbNodeBody::Source(SourceNode { source_inner }) diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index de298c2a1b3e7..47a046dfa3533 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -92,17 +92,14 @@ impl StreamNode for StreamSource { .to_internal_table_prost(), ), info: Some(source_catalog.info.clone()), - row_id_index: source_catalog.row_id_index.map(|index| index as _), - columns: source_catalog - .columns + row_id_index: self.logical.core.row_id_index.map(|index| index as _), + columns: self + .logical + .core + .column_catalog .iter() .map(|c| c.to_protobuf()) .collect_vec(), - pk_column_ids: source_catalog - .pk_col_ids - .iter() - .map(Into::into) - .collect_vec(), properties: source_catalog.properties.clone().into_iter().collect(), }); PbNodeBody::Source(SourceNode { source_inner }) diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 132fda401ddd8..ade9acf805c52 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -77,24 +77,7 @@ impl Planner { } pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { - let column_descs = source 
- .catalog - .columns - .iter() - .map(|column| column.column_desc.clone()) - .collect_vec(); - let pk_col_ids = source.catalog.pk_col_ids.clone(); - let row_id_index = source.catalog.row_id_index; - let gen_row_id = source.catalog.append_only; - LogicalSource::create( - Some(Rc::new(source.catalog)), - column_descs, - pk_col_ids, - row_id_index, - gen_row_id, - false, - self.ctx(), - ) + LogicalSource::create(Rc::new(source.catalog), false, self.ctx()) } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result { diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index 2bcc5bfec1ab8..1ee4e126b3da6 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -37,7 +37,6 @@ pub struct SourceDesc { pub format: SourceFormat, pub columns: Vec, pub metrics: Arc, - pub pk_column_ids: Vec, } /// `FsSourceDesc` describes a stream source. @@ -47,14 +46,12 @@ pub struct FsSourceDesc { pub format: SourceFormat, pub columns: Vec, pub metrics: Arc, - pub pk_column_ids: Vec, } #[derive(Clone)] pub struct SourceDescBuilder { columns: Vec, metrics: Arc, - pk_column_ids: Vec, row_id_index: Option, properties: HashMap, source_info: PbStreamSourceInfo, @@ -67,7 +64,6 @@ impl SourceDescBuilder { pub fn new( columns: Vec, metrics: Arc, - pk_column_ids: Vec, row_id_index: Option, properties: HashMap, source_info: PbStreamSourceInfo, @@ -77,7 +73,6 @@ impl SourceDescBuilder { Self { columns, metrics, - pk_column_ids, row_id_index, properties, source_info, @@ -86,6 +81,18 @@ impl SourceDescBuilder { } } + fn column_catalogs_to_source_column_descs(&self) -> Vec { + let mut columns: Vec<_> = self + .columns + .iter() + .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap()))) + .collect(); + if let Some(row_id_index) = self.row_id_index { + columns[row_id_index].is_row_id = true; + } + columns + } + pub async fn build(self) -> Result { let format = match self.source_info.get_row_format()? 
{ PbRowFormatType::Json => SourceFormat::Json, @@ -105,18 +112,7 @@ impl SourceDescBuilder { return Err(ProtocolError("protobuf file location not provided".to_string()).into()); } - let mut columns: Vec<_> = self - .columns - .iter() - .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap()))) - .collect(); - if let Some(row_id_index) = self.row_id_index { - columns[row_id_index].is_row_id = true; - } - assert!( - !self.pk_column_ids.is_empty(), - "source should have at least one pk column" - ); + let columns = self.column_catalogs_to_source_column_descs(); let psrser_config = SpecificParserConfig::new(format, &self.source_info, &self.properties).await?; @@ -134,7 +130,6 @@ impl SourceDescBuilder { format, columns, metrics: self.metrics, - pk_column_ids: self.pk_column_ids, }) } @@ -149,20 +144,7 @@ impl SourceDescBuilder { _ => unreachable!(), }; - let mut columns: Vec<_> = self - .columns - .iter() - .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap()))) - .collect(); - - if let Some(row_id_index) = self.row_id_index { - columns[row_id_index].is_row_id = true; - } - - assert!( - !self.pk_column_ids.is_empty(), - "source should have at least one pk column" - ); + let columns = self.column_catalogs_to_source_column_descs(); let parser_config = SpecificParserConfig::new(format, &self.source_info, &self.properties).await?; @@ -179,7 +161,6 @@ impl SourceDescBuilder { format, columns, metrics: self.metrics.clone(), - pk_column_ids: self.pk_column_ids.clone(), }) } } @@ -195,7 +176,6 @@ pub mod test_utils { pub fn create_source_desc_builder( schema: &Schema, - pk_column_ids: Vec, row_id_index: Option, source_info: StreamSourceInfo, properties: HashMap, @@ -222,7 +202,6 @@ pub mod test_utils { SourceDescBuilder { columns, metrics: Default::default(), - pk_column_ids, row_id_index, properties, source_info, diff --git a/src/source/src/table.rs b/src/source/src/table.rs index 778b33846d525..b1039ecee5838 100644 --- a/src/source/src/table.rs +++ b/src/source/src/table.rs @@ -102,7 +102,7 @@ impl TableDmlHandle { risingwave_common::util::schema_check::schema_check( self.column_descs .iter() - .filter_map(|c| (!c.is_generated_column()).then_some(&c.data_type)), + .filter_map(|c| (!c.is_generated()).then_some(&c.data_type)), chunk.columns(), ) .expect("table source write chunk schema check failed"); diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 8d457c85b24bd..925956ba0bcc5 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -516,7 +516,6 @@ mod tests { fields: vec![Field::with_name(DataType::Int32, "sequence_int")], }; let row_id_index = None; - let pk_column_ids = vec![0]; let pk_indices = vec![0]; let source_info = StreamSourceInfo { row_format: PbRowFormatType::Native as i32, @@ -533,13 +532,8 @@ mod tests { "fields.sequence_int.start" => "11", "fields.sequence_int.end" => "11111", )); - let source_desc_builder = create_source_desc_builder( - &schema, - pk_column_ids, - row_id_index, - source_info, - properties, - ); + let source_desc_builder = + create_source_desc_builder(&schema, row_id_index, source_info, properties); let split_state_store = SourceStateTableHandler::from_table_catalog( &default_source_internal_table(0x2333), MemoryStateStore::new(), @@ -608,7 +602,6 @@ mod tests { fields: vec![Field::with_name(DataType::Int32, "v1")], }; let row_id_index = None; - let pk_column_ids = vec![0]; let 
pk_indices = vec![0_usize]; let source_info = StreamSourceInfo { row_format: PbRowFormatType::Native as i32, @@ -621,13 +614,8 @@ mod tests { "fields.v1.end" => "11111", )); - let source_desc_builder = create_source_desc_builder( - &schema, - pk_column_ids, - row_id_index, - source_info, - properties, - ); + let source_desc_builder = + create_source_desc_builder(&schema, row_id_index, source_info, properties); let mem_state_store = MemoryStateStore::new(); let column_ids = vec![ColumnId::from(0)]; diff --git a/src/stream/src/from_proto/source.rs b/src/stream/src/from_proto/source.rs index 2fed46c13e671..e13853a7292c8 100644 --- a/src/stream/src/from_proto/source.rs +++ b/src/stream/src/from_proto/source.rs @@ -57,7 +57,6 @@ impl ExecutorBuilder for SourceExecutorBuilder { let source_desc_builder = SourceDescBuilder::new( source.columns.clone(), params.env.source_metrics(), - source.pk_column_ids.clone(), source.row_id_index.map(|x| x as _), source.properties.clone(), source.get_info()?.clone(), From d4c337b15f7fd1e861696585371345b041e6f857 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 29 Mar 2023 14:09:33 +0800 Subject: [PATCH 04/11] test --- e2e_test/source/basic/kafka.slt | 34 ++++++++++++++++++- ...afka_4_partition_topic_generated_columns.1 | 3 ++ 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 scripts/source/test_data/kafka_4_partition_topic_generated_columns.1 diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 5563e01dd15b9..f3792d0333e37 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -244,7 +244,6 @@ create source s18 with ( ) row format avro row schema location 'file:///risingwave/avro-complex-schema.avsc' - # we cannot use confluent schema registry when connector is not kafka statement error create table s19 @@ -256,6 +255,29 @@ with ( row format avro row schema location confluent schema registry 'http://127.0.0.1:8081' +# we cannot create debezium source with with generated column +statement error +create table s20 ( + id integer primary key, + first_name varchar, + last_name varchar, + email varchar, + gen_id integer as id+1 +) with ( + connector = 'kafka', + topic = 'debezium_log', + properties.bootstrap.server = '127.0.0.1:29092' +) row format debezium_json + +# create kafka source without with generated column +statement ok +create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1) with ( + connector = 'kafka', + topic = 'kafka_4_partition_topic_generated_columns', + properties.bootstrap.server = '127.0.0.1:29092', + scan.startup.mode = 'latest' +) row format json + statement ok flush; @@ -422,6 +444,13 @@ select count(*) from s16 ---- 0 +query III rowsort +select * from s21; +---- +19 20 21 +20 21 22 +NULL NULL NULL + statement ok drop materialized view source_mv1 @@ -463,3 +492,6 @@ drop source s17 statement ok drop source s18 + +statement ok +drop table s21 diff --git a/scripts/source/test_data/kafka_4_partition_topic_generated_columns.1 b/scripts/source/test_data/kafka_4_partition_topic_generated_columns.1 new file mode 100644 index 0000000000000..3d8a8ee5a2ff1 --- /dev/null +++ b/scripts/source/test_data/kafka_4_partition_topic_generated_columns.1 @@ -0,0 +1,3 @@ +{"v1": 10, "v2": 20} +{"v2": 21, "v3": "10"} +{"v1": 0} \ No newline at end of file From bb649fd6d6b728d928c58eec6a0171791fa48d86 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 29 Mar 2023 17:31:54 +0800 Subject: [PATCH 05/11] delete comment --- src/frontend/src/optimizer/plan_node/generic/source.rs | 4 +--- 1 
file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 53b9531dcd267..d069cc9c90940 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -54,14 +54,12 @@ impl GenericPlanNode for Source { .iter() .map(|c| (&c.column_desc).into()) .collect(); - // let fields = self.column_descs.iter().map(Into::into).collect(); Schema { fields } } fn logical_pk(&self) -> Option> { let mut id_to_idx = HashMap::new(); - // self.column_descs.iter().filter(|c| !c.is_generated()).enumerate().for_each(|(idx, - // c)| { + self.column_catalog.iter().enumerate().for_each(|(idx, c)| { id_to_idx.insert(c.column_id(), idx); }); From 49f3e2e2584526dfb201771578192535b9945a52 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 30 Mar 2023 03:22:42 +0800 Subject: [PATCH 06/11] fix --- .../optimizer/plan_node/generic/project.rs | 45 ++++- .../optimizer/plan_node/logical_project.rs | 7 + .../src/optimizer/plan_node/logical_source.rs | 179 +++++++++++++----- src/frontend/src/planner/relation.rs | 2 +- 4 files changed, 177 insertions(+), 56 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index 0955ebe4fffed..47f2cc29e40b9 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -49,6 +49,8 @@ fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> { pub struct Project { pub exprs: Vec, pub input: PlanRef, + pub pk_indices: Option>, + pub o2i_col_mapping: ColIndexMapping, // we need some check when construct the `Project::new` _private: (), } @@ -119,15 +121,41 @@ impl GenericPlanNode for Project { impl Project { pub fn new(exprs: Vec, input: PlanRef) -> Self { + Self::new_inner(exprs, input, None) + } + + fn new_inner(exprs: Vec, input: PlanRef, pk_indices: Option>) -> Self { for expr in &exprs { assert_input_ref!(expr, input.schema().fields().len()); check_expr_type(expr) .map_err(|expr| format!("{expr} should not in Project operator")) .unwrap(); } + + let input_len = input.schema().len(); + let mut map = vec![None; exprs.len()]; + for (i, expr) in exprs.iter().enumerate() { + map[i] = match expr { + ExprImpl::InputRef(input) => Some(input.index()), + _ => None, + } + } + let o2i_col_mapping = ColIndexMapping::with_target_size(map, input_len); + + let i2o = o2i_col_mapping.inverse(); + let pk_indices = pk_indices.or_else(|| { + input + .logical_pk() + .iter() + .map(|pk_col| i2o.try_map(*pk_col)) + .collect::>>() + }); + Project { exprs, input, + o2i_col_mapping, + pk_indices, _private: (), } } @@ -173,6 +201,12 @@ impl Project { Self::new(exprs, input) } + /// Creates a `Project` with given pk column indices, + /// overwrite the one calculated by `generic::Project`. 
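+    /// (Sketch of a call: `Project::with_pk_indices(input, exprs, &[0])`
+    /// pins the output stream key to column 0, whereas `new` would derive
+    /// it by mapping the input's key through `exprs`.)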
+ pub fn with_pk_indices(input: PlanRef, exprs: Vec, pk_indices: &[usize]) -> Self { + Self::new_inner(exprs, input, Some(pk_indices.to_vec())) + } + pub fn decompose(self) -> (Vec, PlanRef) { (self.exprs, self.input) } @@ -212,16 +246,7 @@ impl Project { } pub fn o2i_col_mapping(&self) -> ColIndexMapping { - let exprs = &self.exprs; - let input_len = self.input.schema().len(); - let mut map = vec![None; exprs.len()]; - for (i, expr) in exprs.iter().enumerate() { - map[i] = match expr { - ExprImpl::InputRef(input) => Some(input.index()), - _ => None, - } - } - ColIndexMapping::with_target_size(map, input_len) + self.o2i_col_mapping.clone() } /// get the Mapping of columnIndex from input column index to output column index,if a input diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index a46757accb62c..9810e234f634d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -71,6 +71,13 @@ impl LogicalProject { Self::with_core(generic::Project::with_mapping(input, mapping)) } + /// Creates a `LogicalProject` with given pk column indices, + /// overwrite the one calculated by `generic::Project` from `exprs`. + pub fn with_pk_indices(input: PlanRef, exprs: Vec, pk_indices: &[usize]) -> Self { + let core = generic::Project::with_pk_indices(input, exprs, pk_indices); + Self::with_core(core) + } + /// Creates a `LogicalProject` which select some columns from the input. pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self { Self::with_core(generic::Project::with_out_fields(input, out_fields)) diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 5c568b076f5b4..f2422feeea655 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -26,8 +26,9 @@ use risingwave_pb::plan_common::GeneratedColumnDesc; use super::stream_watermark_filter::StreamWatermarkFilter; use super::{ - generic, BatchSource, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject, PlanBase, - PlanRef, PredicatePushdown, StreamRowIdGen, StreamSource, ToBatch, ToStream, + generic, BatchProject, BatchSource, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject, + PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen, StreamSource, ToBatch, + ToStream, }; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::ColumnId; @@ -65,16 +66,9 @@ impl LogicalSource { for_table: bool, ctx: OptimizerContextRef, ) -> Self { - // Filter out the generated columns. 
- let row_id_index = Self::rewrite_row_id_idx(&column_catalog, row_id_index); - let source_column_catalogs = column_catalog - .into_iter() - .filter(|c| !c.is_generated()) - .collect_vec(); - let core = generic::Source { catalog: source_catalog, - column_catalog: source_column_catalogs, + column_catalog, pk_col_ids, row_id_index, gen_row_id, @@ -92,6 +86,60 @@ impl LogicalSource { } } + pub fn with_catalog( + source_catalog: Rc, + for_table: bool, + ctx: OptimizerContextRef, + ) -> Self { + let column_catalogs = source_catalog.columns.clone(); + let pk_col_ids = source_catalog.pk_col_ids.clone(); + let row_id_index = source_catalog.row_id_index; + let gen_row_id = source_catalog.append_only; + + Self::new( + Some(source_catalog), + column_catalogs, + pk_col_ids, + row_id_index, + gen_row_id, + for_table, + ctx, + ) + } + + pub fn create( + source_catalog: Rc, + for_table: bool, + ctx: OptimizerContextRef, + ) -> Result { + let column_catalogs = source_catalog.columns.clone(); + let column_descs = column_catalogs + .iter() + .map(|c| &c.column_desc) + .cloned() + .collect(); + let pk_col_ids = source_catalog.pk_col_ids.clone(); + let row_id_index = source_catalog.row_id_index; + let gen_row_id = source_catalog.append_only; + + let source = Self::new( + Some(source_catalog), + column_catalogs, + pk_col_ids, + row_id_index, + gen_row_id, + for_table, + ctx, + ); + + let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?; + if let Some(exprs) = exprs { + Ok(LogicalProject::new(source.into(), exprs).into()) + } else { + Ok(source.into()) + } + } + pub fn gen_optional_generated_column_project_exprs( column_descs: Vec, ) -> Result>> { @@ -137,39 +185,6 @@ impl LogicalSource { Ok(Some(exprs)) } - pub fn create( - source_catalog: Rc, - for_table: bool, - ctx: OptimizerContextRef, - ) -> Result { - let column_catalogs = source_catalog.columns.clone(); - let column_descs = column_catalogs - .iter() - .map(|c| &c.column_desc) - .cloned() - .collect(); - let pk_col_ids = source_catalog.pk_col_ids.clone(); - let row_id_index = source_catalog.row_id_index; - let gen_row_id = source_catalog.append_only; - - let source = Self::new( - Some(source_catalog), - column_catalogs, - pk_col_ids, - row_id_index, - gen_row_id, - for_table, - ctx, - ); - - let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?; - if let Some(exprs) = exprs { - Ok(LogicalProject::new(source.into(), exprs).into()) - } else { - Ok(source.into()) - } - } - /// `row_id_index` in source node should rule out generated column #[must_use] fn rewrite_row_id_idx(columns: &[ColumnCatalog], row_id_index: Option) -> Option { @@ -226,6 +241,73 @@ impl LogicalSource { kafka_timestamp_range: range, } } + + #[must_use] + fn rewrite_to_stream_batch_source(&self) -> Self { + let column_catalog = self.core.column_catalog.clone(); + // Filter out the generated columns. 
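+        // The physical stream/batch source emits only non-generated
+        // columns; `wrap_with_optional_generated_columns_stream_proj` /
+        // `..._batch_proj` below re-add a `Project` that computes the
+        // generated ones, so the row-id index is rewritten against the
+        // filtered list here.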
+ let row_id_index = Self::rewrite_row_id_idx(&column_catalog, self.core.row_id_index); + let source_column_catalogs = column_catalog + .into_iter() + .filter(|c| !c.is_generated()) + .collect_vec(); + let core = generic::Source { + catalog: self.core.catalog.clone(), + column_catalog: source_column_catalogs, + pk_col_ids: self.core.pk_col_ids.clone(), + row_id_index, + gen_row_id: self.core.gen_row_id, + for_table: self.core.for_table, + ctx: self.core.ctx.clone(), + }; + let base = PlanBase::new_logical_with_core(&core); + + Self { + base, + core, + kafka_timestamp_range: self.kafka_timestamp_range, + } + } + + fn wrap_with_optional_generated_columns_stream_proj(&self) -> Result { + let column_catalogs = self.core.column_catalog.clone(); + let pk_indices = self.logical_pk(); + let exprs = Self::gen_optional_generated_column_project_exprs( + column_catalogs + .iter() + .map(|c| &c.column_desc) + .cloned() + .collect_vec(), + )?; + if let Some(exprs) = exprs { + let source = StreamSource::new(self.rewrite_to_stream_batch_source()); + let logical_project = LogicalProject::with_pk_indices(source.into(), exprs, pk_indices); + Ok(StreamProject::new(logical_project).into()) + } else { + let source = StreamSource::new(self.clone()); + Ok(source.into()) + } + } + + fn wrap_with_optional_generated_columns_batch_proj(&self) -> Result { + let column_catalogs = self.core.column_catalog.clone(); + let pk_indices = self.logical_pk(); + let exprs = Self::gen_optional_generated_column_project_exprs( + column_catalogs + .iter() + .map(|c| &c.column_desc) + .cloned() + .collect_vec(), + )?; + if let Some(exprs) = exprs { + let source = BatchSource::new(self.rewrite_to_stream_batch_source()); + let logical_project = LogicalProject::with_pk_indices(source.into(), exprs, pk_indices); + Ok(BatchProject::new(logical_project).into()) + } else { + let source = BatchSource::new(self.clone()); + Ok(source.into()) + } + } } impl_plan_tree_node_for_leaf! {LogicalSource} @@ -440,13 +522,20 @@ impl PredicatePushdown for LogicalSource { impl ToBatch for LogicalSource { fn to_batch(&self) -> Result { - Ok(BatchSource::new(self.clone()).into()) + let source = self.wrap_with_optional_generated_columns_batch_proj()?; + Ok(source) } } impl ToStream for LogicalSource { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - let mut plan: PlanRef = StreamSource::new(self.clone()).into(); + let mut plan = if self.core.for_table { + StreamSource::new(self.rewrite_to_stream_batch_source()).into() + } else { + // Create MV on source. + self.wrap_with_optional_generated_columns_stream_proj()? 
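+            // (For `for_table` sources the generated-column `Project` is
+            //  planned above `StreamDml` in the table's plan -- see the
+            //  `generated_columns.yaml` plans -- so the bare source node
+            //  suffices in that branch.)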
+ }; + if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty() && !self.core.for_table{ plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 78efbca63c11b..6475ba3c178be 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -77,7 +77,7 @@ impl Planner { } pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { - LogicalSource::create(Rc::new(source.catalog), false, self.ctx()) + Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx()).into()) } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result { From 15751715ac1c9cfbf211bf66b7bab35ea802d4b7 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 30 Mar 2023 16:51:08 +0800 Subject: [PATCH 07/11] fix --- src/frontend/src/optimizer/plan_node/generic/project.rs | 7 +------ src/frontend/src/optimizer/plan_node/logical_source.rs | 2 ++ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index 47f2cc29e40b9..43fdf3c3395cd 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -101,12 +101,7 @@ impl GenericPlanNode for Project { } fn logical_pk(&self) -> Option> { - let i2o = self.i2o_col_mapping(); - self.input - .logical_pk() - .iter() - .map(|pk_col| i2o.try_map(*pk_col)) - .collect::>>() + self.pk_indices.clone() } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index f2422feeea655..b231a757dac62 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -242,6 +242,8 @@ impl LogicalSource { } } + /// The columns in stream/batch source node indicate the actual columns it will produce, + /// instead of the columns defined in source catalog. The difference is generated columns. 
#[must_use] fn rewrite_to_stream_batch_source(&self) -> Self { let column_catalog = self.core.column_catalog.clone(); From 2da49000b887fcc1b1ae88cfc6d375f15eff3f82 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 30 Mar 2023 18:08:10 +0800 Subject: [PATCH 08/11] fix scan.startup.mode --- e2e_test/source/basic/kafka.slt | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index f3792d0333e37..9e9c02a0365ba 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -255,7 +255,7 @@ with ( row format avro row schema location confluent schema registry 'http://127.0.0.1:8081' -# we cannot create debezium source with with generated column +# we cannot create debezium source with generated column statement error create table s20 ( id integer primary key, @@ -269,13 +269,22 @@ create table s20 ( properties.bootstrap.server = '127.0.0.1:29092' ) row format debezium_json -# create kafka source without with generated column +# create kafka source table with generated column statement ok create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1) with ( connector = 'kafka', topic = 'kafka_4_partition_topic_generated_columns', properties.bootstrap.server = '127.0.0.1:29092', - scan.startup.mode = 'latest' + scan.startup.mode = 'earliest' +) row format json + +# create kafka source with generated column +statement ok +create source s22 (v1 int as v2-1, v2 int, v3 int as v2+1) with ( + connector = 'kafka', + topic = 'kafka_4_partition_topic_generated_columns', + properties.bootstrap.server = '127.0.0.1:29092', + scan.startup.mode = 'earliest' ) row format json statement ok @@ -451,6 +460,13 @@ select * from s21; 20 21 22 NULL NULL NULL +query III rowsort +select * from s22; +---- +19 20 21 +20 21 22 +NULL NULL NULL + statement ok drop materialized view source_mv1 @@ -495,3 +511,6 @@ drop source s18 statement ok drop table s21 + +statement ok +drop source s22 \ No newline at end of file From bbaee2f6b46d34b927cc853c9e206fb412dda9af Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 30 Mar 2023 20:29:10 +0800 Subject: [PATCH 09/11] fix --- proto/stream_plan.proto | 2 +- .../planner_test/tests/testdata/generated_columns.yaml | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 2c284e07a848d..cc81bbf7fb076 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -127,7 +127,7 @@ message StreamSource { catalog.Table state_table = 2; optional uint32 row_id_index = 3; repeated plan_common.ColumnCatalog columns = 4; - // repeated int32 pk_column_ids = 5; + reserved "pk_column_ids"; reserved 5; map properties = 6; catalog.StreamSourceInfo info = 7; diff --git a/src/frontend/planner_test/tests/testdata/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/generated_columns.yaml index 32d37c68a407a..a04fb9ff2bda3 100644 --- a/src/frontend/planner_test/tests/testdata/generated_columns.yaml +++ b/src/frontend/planner_test/tests/testdata/generated_columns.yaml @@ -9,3 +9,11 @@ └─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] } └─StreamDml { columns: [v2, _row_id] } └─StreamSource +- name: source with generated columns + sql: | + create source s1 (v1 int as v2-1, v2 int, v3 int as v2+1) with (connector = 'kinesis') ROW FORMAT JSON;; + select v3 from s1 + batch_plan: | + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [(v2 + 
1:Int32) as $expr1] } + └─BatchSource { source: "s1", columns: ["v2", "_row_id"], filter: (None, None) } From d2ca92aece7286b0f05a8e0fe8472ba459f577aa Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Fri, 31 Mar 2023 17:29:47 +0800 Subject: [PATCH 10/11] refactor --- src/frontend/src/handler/create_table.rs | 4 +- src/frontend/src/optimizer/mod.rs | 30 +++++++++-- .../optimizer/plan_node/generic/project.rs | 52 ++++++------------- .../src/optimizer/plan_node/generic/source.rs | 12 +---- .../optimizer/plan_node/logical_project.rs | 7 --- .../src/optimizer/plan_node/logical_source.rs | 14 +---- .../optimizer/plan_node/stream_exchange.rs | 1 + .../optimizer/plan_node/stream_materialize.rs | 24 +++++++-- 8 files changed, 70 insertions(+), 74 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 37b34ce32b0b2..022d25a516016 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -537,7 +537,6 @@ fn gen_table_plan_inner( let source_node: PlanRef = LogicalSource::new( source_catalog, columns.clone(), - pk_column_ids, row_id_index, false, true, @@ -545,7 +544,7 @@ fn gen_table_plan_inner( ) .into(); - let required_cols = FixedBitSet::with_capacity(source_node.schema().len()); + let required_cols = FixedBitSet::with_capacity(columns.len()); let mut plan_root = PlanRoot::new( source_node, RequiredDist::Any, @@ -573,6 +572,7 @@ fn gen_table_plan_inner( name, columns, definition, + pk_column_ids, row_id_index, append_only, watermark_descs, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 94bd9cc842ee8..6c0c99449c859 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; // Copyright 2023 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -33,7 +34,7 @@ pub use logical_optimization::*; pub use optimizer_context::*; use plan_expr_rewriter::ConstEvalRewriter; use property::Order; -use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, Field, Schema}; +use risingwave_common::catalog::{ColumnCatalog, ColumnId, ConflictBehavior, Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; @@ -390,6 +391,7 @@ impl PlanRoot { table_name: String, columns: Vec, definition: String, + pk_column_ids: Vec, row_id_index: Option, append_only: bool, watermark_descs: Vec, @@ -431,14 +433,36 @@ impl PlanRoot { true => ConflictBehavior::NoCheck, false => ConflictBehavior::Overwrite, }; + + let pk_column_indices = { + let mut id_to_idx = HashMap::new(); + + columns.iter().enumerate().for_each(|(idx, c)| { + id_to_idx.insert(c.column_id(), idx); + }); + pk_column_ids + .iter() + .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns. 
+ .collect_vec() + }; + + let table_required_dist = { + let mut bitset = FixedBitSet::with_capacity(columns.len()); + for idx in &pk_column_indices { + bitset.insert(*idx); + } + RequiredDist::ShardByKey(bitset) + }; + StreamMaterialize::create_for_table( stream_plan, table_name, - self.required_dist.clone(), - self.required_order.clone(), + table_required_dist, + Order::any(), columns, definition, conflict_behavior, + pk_column_indices, row_id_index, version, ) diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index 43fdf3c3395cd..0955ebe4fffed 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -49,8 +49,6 @@ fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> { pub struct Project { pub exprs: Vec, pub input: PlanRef, - pub pk_indices: Option>, - pub o2i_col_mapping: ColIndexMapping, // we need some check when construct the `Project::new` _private: (), } @@ -101,7 +99,12 @@ impl GenericPlanNode for Project { } fn logical_pk(&self) -> Option> { - self.pk_indices.clone() + let i2o = self.i2o_col_mapping(); + self.input + .logical_pk() + .iter() + .map(|pk_col| i2o.try_map(*pk_col)) + .collect::>>() } fn ctx(&self) -> OptimizerContextRef { @@ -116,41 +119,15 @@ impl GenericPlanNode for Project { impl Project { pub fn new(exprs: Vec, input: PlanRef) -> Self { - Self::new_inner(exprs, input, None) - } - - fn new_inner(exprs: Vec, input: PlanRef, pk_indices: Option>) -> Self { for expr in &exprs { assert_input_ref!(expr, input.schema().fields().len()); check_expr_type(expr) .map_err(|expr| format!("{expr} should not in Project operator")) .unwrap(); } - - let input_len = input.schema().len(); - let mut map = vec![None; exprs.len()]; - for (i, expr) in exprs.iter().enumerate() { - map[i] = match expr { - ExprImpl::InputRef(input) => Some(input.index()), - _ => None, - } - } - let o2i_col_mapping = ColIndexMapping::with_target_size(map, input_len); - - let i2o = o2i_col_mapping.inverse(); - let pk_indices = pk_indices.or_else(|| { - input - .logical_pk() - .iter() - .map(|pk_col| i2o.try_map(*pk_col)) - .collect::>>() - }); - Project { exprs, input, - o2i_col_mapping, - pk_indices, _private: (), } } @@ -196,12 +173,6 @@ impl Project { Self::new(exprs, input) } - /// Creates a `Project` with given pk column indices, - /// overwrite the one calculated by `generic::Project`. 
- pub fn with_pk_indices(input: PlanRef, exprs: Vec, pk_indices: &[usize]) -> Self { - Self::new_inner(exprs, input, Some(pk_indices.to_vec())) - } - pub fn decompose(self) -> (Vec, PlanRef) { (self.exprs, self.input) } @@ -241,7 +212,16 @@ impl Project { } pub fn o2i_col_mapping(&self) -> ColIndexMapping { - self.o2i_col_mapping.clone() + let exprs = &self.exprs; + let input_len = self.input.schema().len(); + let mut map = vec![None; exprs.len()]; + for (i, expr) in exprs.iter().enumerate() { + map[i] = match expr { + ExprImpl::InputRef(input) => Some(input.index()), + _ => None, + } + } + ColIndexMapping::with_target_size(map, input_len) } /// get the Mapping of columnIndex from input column index to output column index,if a input diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index d069cc9c90940..1df2e93e971c1 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -22,7 +22,6 @@ use risingwave_common::util::sort_util::OrderType; use super::super::utils::TableCatalogBuilder; use super::GenericPlanNode; use crate::catalog::source_catalog::SourceCatalog; -use crate::catalog::ColumnId; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::FunctionalDependencySet; use crate::{TableCatalog, WithOptions}; @@ -36,7 +35,6 @@ pub struct Source { /// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan /// generating, even if there is no external stream source. pub column_catalog: Vec, - pub pk_col_ids: Vec, pub row_id_index: Option, /// Whether the "SourceNode" should generate the row id column for append only source pub gen_row_id: bool, @@ -58,15 +56,7 @@ impl GenericPlanNode for Source { } fn logical_pk(&self) -> Option> { - let mut id_to_idx = HashMap::new(); - - self.column_catalog.iter().enumerate().for_each(|(idx, c)| { - id_to_idx.insert(c.column_id(), idx); - }); - self.pk_col_ids - .iter() - .map(|c| id_to_idx.get(c).copied()) - .collect::>>() + self.row_id_index.map(|idx| vec![idx]) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index 9810e234f634d..a46757accb62c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -71,13 +71,6 @@ impl LogicalProject { Self::with_core(generic::Project::with_mapping(input, mapping)) } - /// Creates a `LogicalProject` with given pk column indices, - /// overwrite the one calculated by `generic::Project` from `exprs`. - pub fn with_pk_indices(input: PlanRef, exprs: Vec, pk_indices: &[usize]) -> Self { - let core = generic::Project::with_pk_indices(input, exprs, pk_indices); - Self::with_core(core) - } - /// Creates a `LogicalProject` which select some columns from the input. 
pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self { Self::with_core(generic::Project::with_out_fields(input, out_fields)) diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index b231a757dac62..5762a23d73786 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -31,7 +31,6 @@ use super::{ ToStream, }; use crate::catalog::source_catalog::SourceCatalog; -use crate::catalog::ColumnId; use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, InputRef}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::{ @@ -60,7 +59,6 @@ impl LogicalSource { pub fn new( source_catalog: Option>, column_catalog: Vec, - pk_col_ids: Vec, row_id_index: Option, gen_row_id: bool, for_table: bool, @@ -69,7 +67,6 @@ impl LogicalSource { let core = generic::Source { catalog: source_catalog, column_catalog, - pk_col_ids, row_id_index, gen_row_id, for_table, @@ -92,14 +89,12 @@ impl LogicalSource { ctx: OptimizerContextRef, ) -> Self { let column_catalogs = source_catalog.columns.clone(); - let pk_col_ids = source_catalog.pk_col_ids.clone(); let row_id_index = source_catalog.row_id_index; let gen_row_id = source_catalog.append_only; Self::new( Some(source_catalog), column_catalogs, - pk_col_ids, row_id_index, gen_row_id, for_table, @@ -118,14 +113,12 @@ impl LogicalSource { .map(|c| &c.column_desc) .cloned() .collect(); - let pk_col_ids = source_catalog.pk_col_ids.clone(); let row_id_index = source_catalog.row_id_index; let gen_row_id = source_catalog.append_only; let source = Self::new( Some(source_catalog), column_catalogs, - pk_col_ids, row_id_index, gen_row_id, for_table, @@ -256,7 +249,6 @@ impl LogicalSource { let core = generic::Source { catalog: self.core.catalog.clone(), column_catalog: source_column_catalogs, - pk_col_ids: self.core.pk_col_ids.clone(), row_id_index, gen_row_id: self.core.gen_row_id, for_table: self.core.for_table, @@ -273,7 +265,6 @@ impl LogicalSource { fn wrap_with_optional_generated_columns_stream_proj(&self) -> Result { let column_catalogs = self.core.column_catalog.clone(); - let pk_indices = self.logical_pk(); let exprs = Self::gen_optional_generated_column_project_exprs( column_catalogs .iter() @@ -283,7 +274,7 @@ impl LogicalSource { )?; if let Some(exprs) = exprs { let source = StreamSource::new(self.rewrite_to_stream_batch_source()); - let logical_project = LogicalProject::with_pk_indices(source.into(), exprs, pk_indices); + let logical_project = LogicalProject::new(source.into(), exprs); Ok(StreamProject::new(logical_project).into()) } else { let source = StreamSource::new(self.clone()); @@ -293,7 +284,6 @@ impl LogicalSource { fn wrap_with_optional_generated_columns_batch_proj(&self) -> Result { let column_catalogs = self.core.column_catalog.clone(); - let pk_indices = self.logical_pk(); let exprs = Self::gen_optional_generated_column_project_exprs( column_catalogs .iter() @@ -303,7 +293,7 @@ impl LogicalSource { )?; if let Some(exprs) = exprs { let source = BatchSource::new(self.rewrite_to_stream_batch_source()); - let logical_project = LogicalProject::with_pk_indices(source.into(), exprs, pk_indices); + let logical_project = LogicalProject::new(source.into(), exprs); Ok(BatchProject::new(logical_project).into()) } else { let source = BatchSource::new(self.clone()); diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs 
b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 96843f040ca92..9e5854b9a7392 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::panic; use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody; diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 78ecc49ed1a41..ca7589e7699ed 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId}; use risingwave_common::error::Result; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::derive_columns; @@ -76,6 +76,7 @@ impl StreamMaterialize { definition, ConflictBehavior::NoCheck, None, + None, table_type, None, )?; @@ -97,6 +98,7 @@ impl StreamMaterialize { columns: Vec, definition: String, conflict_behavior: ConflictBehavior, + pk_column_indices: Vec, row_id_index: Option, version: Option, ) -> Result { @@ -109,6 +111,7 @@ impl StreamMaterialize { columns, definition, conflict_behavior, + Some(pk_column_indices), row_id_index, TableType::Table, version, @@ -126,7 +129,11 @@ impl StreamMaterialize { let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), _ => match table_type { - TableType::Table | TableType::MaterializedView => { + TableType::Table => { + assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_)); + user_distributed_by + } + TableType::MaterializedView => { assert_matches!(user_distributed_by, RequiredDist::Any); // ensure the same pk will not shuffle to different node RequiredDist::shard_by_key(input.schema().len(), input.logical_pk()) @@ -157,6 +164,7 @@ impl StreamMaterialize { columns: Vec, definition: String, conflict_behavior: ConflictBehavior, + pk_column_indices: Option>, // Is some when create table row_id_index: Option, table_type: TableType, version: Option, @@ -169,7 +177,17 @@ impl StreamMaterialize { let append_only = input.append_only(); let watermark_columns = input.watermark_columns().clone(); - let (pk, stream_key) = derive_pk(input, user_order_by, &columns); + let (pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices { + let pk = pk_column_indices + .iter() + .map(|idx| ColumnOrder::new(*idx, OrderType::ascending())) + .collect(); + // No order by for create table, so stream key is identical to pk. 
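+            // (E.g. `pk_column_indices = [1]` yields
+            //  `pk = [ColumnOrder::new(1, OrderType::ascending())]` with
+            //  stream key `[1]`.)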
+ (pk, pk_column_indices) + } else { + derive_pk(input, user_order_by, &columns) + }; + let read_prefix_len_hint = stream_key.len(); Ok(TableCatalog { id: TableId::placeholder(), From 1033786e46545be1f46d570fdb46037b93412336 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 31 Mar 2023 18:23:29 +0800 Subject: [PATCH 11/11] Update src/frontend/src/optimizer/plan_node/stream_exchange.rs --- src/frontend/src/optimizer/plan_node/stream_exchange.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 9e5854b9a7392..96843f040ca92 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::panic; use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody;