diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt
index 5563e01dd15b9..9e9c02a0365ba 100644
--- a/e2e_test/source/basic/kafka.slt
+++ b/e2e_test/source/basic/kafka.slt
@@ -244,7 +244,6 @@ create source s18 with (
 ) row format avro
 row schema location 'file:///risingwave/avro-complex-schema.avsc'
-

 # we cannot use confluent schema registry when connector is not kafka
 statement error
 create table s19
@@ -256,6 +255,38 @@ with (
 row format avro
 row schema location confluent schema registry 'http://127.0.0.1:8081'

+# we cannot create debezium source with generated column
+statement error
+create table s20 (
+  id integer primary key,
+  first_name varchar,
+  last_name varchar,
+  email varchar,
+  gen_id integer as id+1
+) with (
+  connector = 'kafka',
+  topic = 'debezium_log',
+  properties.bootstrap.server = '127.0.0.1:29092'
+) row format debezium_json
+
+# create kafka source table with generated column
+statement ok
+create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
+  connector = 'kafka',
+  topic = 'kafka_4_partition_topic_generated_columns',
+  properties.bootstrap.server = '127.0.0.1:29092',
+  scan.startup.mode = 'earliest'
+) row format json
+
+# create kafka source with generated column
+statement ok
+create source s22 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
+  connector = 'kafka',
+  topic = 'kafka_4_partition_topic_generated_columns',
+  properties.bootstrap.server = '127.0.0.1:29092',
+  scan.startup.mode = 'earliest'
+) row format json
+
 statement ok
 flush;
@@ -422,6 +453,20 @@ select count(*) from s16
 ----
 0

+query III rowsort
+select * from s21;
+----
+19 20 21
+20 21 22
+NULL NULL NULL
+
+query III rowsort
+select * from s22;
+----
+19 20 21
+20 21 22
+NULL NULL NULL
+
 statement ok
 drop materialized view source_mv1
@@ -463,3 +508,9 @@ drop source s17

 statement ok
 drop source s18
+
+statement ok
+drop table s21
+
+statement ok
+drop source s22
\ No newline at end of file
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 51ab2c42dbe86..46c58c46de347 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -127,7 +127,8 @@ message StreamSource {
   catalog.Table state_table = 2;
   optional uint32 row_id_index = 3;
   repeated plan_common.ColumnCatalog columns = 4;
-  repeated int32 pk_column_ids = 5;
+  reserved "pk_column_ids";
+  reserved 5;
   map<string, string> properties = 6;
   catalog.StreamSourceInfo info = 7;
   string source_name = 8;
diff --git a/scripts/source/test_data/kafka_4_partition_topic_generated_columns.1 b/scripts/source/test_data/kafka_4_partition_topic_generated_columns.1
new file mode 100644
index 0000000000000..3d8a8ee5a2ff1
--- /dev/null
+++ b/scripts/source/test_data/kafka_4_partition_topic_generated_columns.1
@@ -0,0 +1,3 @@
+{"v1": 10, "v2": 20}
+{"v2": 21, "v3": "10"}
+{"v1": 0}
\ No newline at end of file
diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index 29e0d7db7cb13..ed59d93447818 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -207,7 +207,7 @@ impl ColumnDesc {
         Self::from_field_with_column_id(field, 0)
     }

-    pub fn is_generated_column(&self) -> bool {
+    pub fn is_generated(&self) -> bool {
         self.generated_column.is_some()
     }
 }
diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs
index 350ee7f38a975..08f7836dafb05 100644
--- a/src/compute/tests/integration_tests.rs
+++ b/src/compute/tests/integration_tests.rs
@@ -121,15 +121,9 @@ async fn test_table_materialize() -> StreamResult<()> {
"fields.v1.max" => "1000", "fields.v1.seed" => "12345", )); - let pk_column_ids = vec![0]; let row_id_index: usize = 0; - let source_builder = create_source_desc_builder( - &schema, - pk_column_ids, - Some(row_id_index), - source_info, - properties, - ); + let source_builder = + create_source_desc_builder(&schema, Some(row_id_index), source_info, properties); // Ensure the source exists. let source_desc = source_builder.build().await.unwrap(); diff --git a/src/frontend/planner_test/tests/testdata/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/generated_columns.yaml index 32d37c68a407a..a04fb9ff2bda3 100644 --- a/src/frontend/planner_test/tests/testdata/generated_columns.yaml +++ b/src/frontend/planner_test/tests/testdata/generated_columns.yaml @@ -9,3 +9,11 @@ └─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] } └─StreamDml { columns: [v2, _row_id] } └─StreamSource +- name: source with generated columns + sql: | + create source s1 (v1 int as v2-1, v2 int, v3 int as v2+1) with (connector = 'kinesis') ROW FORMAT JSON;; + select v3 from s1 + batch_plan: | + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] } + └─BatchSource { source: "s1", columns: ["v2", "_row_id"], filter: (None, None) } diff --git a/src/frontend/src/binder/insert.rs b/src/frontend/src/binder/insert.rs index b832cd2c97cf1..300d1bd905b20 100644 --- a/src/frontend/src/binder/insert.rs +++ b/src/frontend/src/binder/insert.rs @@ -120,7 +120,7 @@ impl Binder { // TODO(yuhao): refine this if row_id is always the last column. // - // `row_id_index` in bin insert operation should rule out generated column + // `row_id_index` in insert operation should rule out generated column let row_id_index = { if let Some(row_id_index) = table_catalog.row_id_index { let mut cnt = 0; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 3aae62a5cec40..bf39b1dfe4cca 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -693,10 +693,10 @@ pub async fn handle_create_source( bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?; - if columns.iter().any(|c| c.is_generated()) { - // TODO(yuhao): allow generated columns on source + if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) { + // TODO(yuhao): allow delete from a non append only source return Err(RwError::from(ErrorCode::BindError( - "Generated columns on source has not been implemented.".to_string(), + "Generated columns are only allowed in an append only source.".to_string(), ))); } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index c8df286c2d59d..7010b3ef77035 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -400,6 +400,14 @@ pub(crate) async fn gen_create_table_plan_with_source( bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?; + if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) { + // TODO(yuhao): allow delete from a non append only source + return Err(ErrorCode::BindError( + "Generated columns are only allowed in an append only source.".to_string(), + ) + .into()); + } + gen_table_plan_inner( context.into(), table_name, @@ -532,11 +540,7 @@ fn gen_table_plan_inner( let source_catalog = source.as_ref().map(|source| Rc::new((source).into())); let source_node: PlanRef = 
         source_catalog,
-        columns
-            .iter()
-            .map(|column| column.column_desc.clone())
-            .collect_vec(),
-        pk_column_ids,
+        columns.clone(),
         row_id_index,
         false,
         true,
@@ -544,7 +548,7 @@ fn gen_table_plan_inner(
     )
     .into();

-    let required_cols = FixedBitSet::with_capacity(source_node.schema().len());
+    let required_cols = FixedBitSet::with_capacity(columns.len());
     let mut plan_root = PlanRoot::new(
         source_node,
         RequiredDist::Any,
@@ -572,6 +576,7 @@ fn gen_table_plan_inner(
         name,
         columns,
         definition,
+        pk_column_ids,
         row_id_index,
         append_only,
         watermark_descs,
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index bc79cff0137f7..fb75bd964bd1d 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 // Copyright 2023 RisingWave Labs
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
@@ -35,7 +36,7 @@ pub use logical_optimization::*;
 pub use optimizer_context::*;
 use plan_expr_rewriter::ConstEvalRewriter;
 use property::Order;
-use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, Field, Schema};
+use risingwave_common::catalog::{ColumnCatalog, ColumnId, ConflictBehavior, Field, Schema};
 use risingwave_common::error::{ErrorCode, Result};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::iter_util::ZipEqDebug;
@@ -392,6 +393,7 @@ impl PlanRoot {
         table_name: String,
         columns: Vec<ColumnCatalog>,
         definition: String,
+        pk_column_ids: Vec<ColumnId>,
         row_id_index: Option<usize>,
         append_only: bool,
         watermark_descs: Vec<WatermarkDesc>,
@@ -433,14 +435,36 @@ impl PlanRoot {
             true => ConflictBehavior::NoCheck,
             false => ConflictBehavior::Overwrite,
         };
+
+        let pk_column_indices = {
+            let mut id_to_idx = HashMap::new();
+
+            columns.iter().enumerate().for_each(|(idx, c)| {
+                id_to_idx.insert(c.column_id(), idx);
+            });
+            pk_column_ids
+                .iter()
+                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
+                .collect_vec()
+        };
+
+        let table_required_dist = {
+            let mut bitset = FixedBitSet::with_capacity(columns.len());
+            for idx in &pk_column_indices {
+                bitset.insert(*idx);
+            }
+            RequiredDist::ShardByKey(bitset)
+        };
+
         StreamMaterialize::create_for_table(
             stream_plan,
             table_name,
-            self.required_dist.clone(),
-            self.required_order.clone(),
+            table_required_dist,
+            Order::any(),
             columns,
             definition,
             conflict_behavior,
+            pk_column_indices,
             row_id_index,
             version,
         )
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index 49eb14ada6e59..e32590e3670b6 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -97,8 +97,10 @@ impl ToBatchPb for BatchSource {
         NodeBody::Source(SourceNode {
             source_id: source_catalog.id,
             info: Some(source_catalog.info.clone()),
-            columns: source_catalog
-                .columns
+            columns: self
+                .logical
+                .core
+                .column_catalog
                 .iter()
                 .map(|c| c.to_protobuf())
                 .collect_vec(),
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index 8a650007f3458..1df2e93e971c1 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -15,14 +15,13 @@ use std::collections::HashMap;
 use std::rc::Rc;

 use derivative::Derivative;
-use risingwave_common::catalog::{ColumnDesc, Field, Schema};
+use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::OrderType;

 use super::super::utils::TableCatalogBuilder;
 use super::GenericPlanNode;
 use crate::catalog::source_catalog::SourceCatalog;
-use crate::catalog::ColumnId;
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::property::FunctionalDependencySet;
 use crate::{TableCatalog, WithOptions};
@@ -35,8 +34,7 @@ pub struct Source {
     pub catalog: Option<Rc<SourceCatalog>>,
     /// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan
     /// generating, even if there is no external stream source.
-    pub column_descs: Vec<ColumnDesc>,
-    pub pk_col_ids: Vec<ColumnId>,
+    pub column_catalog: Vec<ColumnCatalog>,
     pub row_id_index: Option<usize>,
     /// Whether the "SourceNode" should generate the row id column for append only source
     pub gen_row_id: bool,
@@ -49,24 +47,16 @@ impl GenericPlanNode for Source {
     fn schema(&self) -> Schema {
-        let fields = self.non_generated_columns().map(Into::into).collect();
-        // let fields = self.column_descs.iter().map(Into::into).collect();
+        let fields = self
+            .column_catalog
+            .iter()
+            .map(|c| (&c.column_desc).into())
+            .collect();
         Schema { fields }
     }

     fn logical_pk(&self) -> Option<Vec<usize>> {
-        let mut id_to_idx = HashMap::new();
-        // self.column_descs.iter().filter(|c| !c.is_generated_column()).enumerate().for_each(|(idx,
-        // c)| {
-        self.non_generated_columns()
-            .enumerate()
-            .for_each(|(idx, c)| {
-                id_to_idx.insert(c.column_id, idx);
-            });
-        self.pk_col_ids
-            .iter()
-            .map(|c| id_to_idx.get(c).copied())
-            .collect::<Option<Vec<_>>>()
+        self.row_id_index.map(|idx| vec![idx])
     }

     fn ctx(&self) -> OptimizerContextRef {
@@ -75,12 +65,11 @@ impl GenericPlanNode for Source {

     fn functional_dependency(&self) -> FunctionalDependencySet {
         let pk_indices = self.logical_pk();
-        let non_generated_columns_count = self.non_generated_columns().count();
         match pk_indices {
             Some(pk_indices) => {
-                FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices)
+                FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
             }
-            None => FunctionalDependencySet::new(non_generated_columns_count),
+            None => FunctionalDependencySet::new(self.column_catalog.len()),
         }
     }
 }
@@ -114,11 +103,4 @@ impl Source {

         builder.build(vec![], 1)
     }
-
-    /// Non-generated columns
-    fn non_generated_columns(&self) -> impl Iterator<Item = &ColumnDesc> {
-        self.column_descs
-            .iter()
-            .filter(|c| !c.is_generated_column())
-    }
 }
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index ae09243085921..5762a23d73786 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -18,18 +18,19 @@ use std::ops::Bound;
 use std::ops::Bound::{Excluded, Included, Unbounded};
 use std::rc::Rc;

-use risingwave_common::catalog::{ColumnDesc, Schema};
+use itertools::Itertools;
+use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
 use risingwave_common::error::Result;
 use risingwave_connector::source::DataType;
 use risingwave_pb::plan_common::GeneratedColumnDesc;

 use super::stream_watermark_filter::StreamWatermarkFilter;
 use super::{
-    generic, BatchSource, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject, PlanBase,
-    PlanRef, PredicatePushdown, StreamRowIdGen, StreamSource, ToBatch, ToStream,
+    generic, BatchProject, BatchSource, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject,
+    PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen, StreamSource, ToBatch,
+    ToStream,
 };
 use crate::catalog::source_catalog::SourceCatalog;
-use crate::catalog::ColumnId;
 use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, InputRef};
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::plan_node::{
@@ -57,8 +58,7 @@ pub struct LogicalSource {
 impl LogicalSource {
     pub fn new(
         source_catalog: Option<Rc<SourceCatalog>>,
-        column_descs: Vec<ColumnDesc>,
-        pk_col_ids: Vec<ColumnId>,
+        column_catalog: Vec<ColumnCatalog>,
         row_id_index: Option<usize>,
         gen_row_id: bool,
         for_table: bool,
     ) -> Self {
         let core = generic::Source {
             catalog: source_catalog,
-            column_descs,
-            pk_col_ids,
+            column_catalog,
             row_id_index,
             gen_row_id,
             for_table,
             ctx,
@@ -84,6 +83,56 @@ impl LogicalSource {
         }
     }

+    pub fn with_catalog(
+        source_catalog: Rc<SourceCatalog>,
+        for_table: bool,
+        ctx: OptimizerContextRef,
+    ) -> Self {
+        let column_catalogs = source_catalog.columns.clone();
+        let row_id_index = source_catalog.row_id_index;
+        let gen_row_id = source_catalog.append_only;
+
+        Self::new(
+            Some(source_catalog),
+            column_catalogs,
+            row_id_index,
+            gen_row_id,
+            for_table,
+            ctx,
+        )
+    }
+
+    pub fn create(
+        source_catalog: Rc<SourceCatalog>,
+        for_table: bool,
+        ctx: OptimizerContextRef,
+    ) -> Result<PlanRef> {
+        let column_catalogs = source_catalog.columns.clone();
+        let column_descs = column_catalogs
+            .iter()
+            .map(|c| &c.column_desc)
+            .cloned()
+            .collect();
+        let row_id_index = source_catalog.row_id_index;
+        let gen_row_id = source_catalog.append_only;
+
+        let source = Self::new(
+            Some(source_catalog),
+            column_catalogs,
+            row_id_index,
+            gen_row_id,
+            for_table,
+            ctx,
+        );
+
+        let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?;
+        if let Some(exprs) = exprs {
+            Ok(LogicalProject::new(source.into(), exprs).into())
+        } else {
+            Ok(source.into())
+        }
+    }
+
     pub fn gen_optional_generated_column_project_exprs(
         column_descs: Vec<ColumnDesc>,
     ) -> Result<Option<Vec<ExprImpl>>> {
@@ -129,30 +178,18 @@ impl LogicalSource {
         Ok(Some(exprs))
     }

-    pub fn create(
-        source_catalog: Option<Rc<SourceCatalog>>,
-        column_descs: Vec<ColumnDesc>,
-        pk_col_ids: Vec<ColumnId>,
-        row_id_index: Option<usize>,
-        gen_row_id: bool,
-        for_table: bool,
-        ctx: OptimizerContextRef,
-    ) -> Result<PlanRef> {
-        let source = Self::new(
-            source_catalog,
-            column_descs.clone(),
-            pk_col_ids,
-            row_id_index,
-            gen_row_id,
-            for_table,
-            ctx,
-        );
-        let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?;
-        if let Some(exprs) = exprs {
-            Ok(LogicalProject::new(source.into(), exprs).into())
-        } else {
-            Ok(source.into())
-        }
+    /// `row_id_index` in source node should rule out generated column
+    #[must_use]
+    fn rewrite_row_id_idx(columns: &[ColumnCatalog], row_id_index: Option<usize>) -> Option<usize> {
+        row_id_index.map(|idx| {
+            let mut cnt = 0;
+            for col in columns.iter().take(idx + 1) {
+                if col.is_generated() {
+                    cnt += 1;
+                }
+            }
+            idx - cnt
+        })
     }

     pub(super) fn column_names(&self) -> Vec<String> {
@@ -197,6 +234,72 @@ impl LogicalSource {
             kafka_timestamp_range: range,
         }
     }
+
+    /// The columns in stream/batch source node indicate the actual columns it will produce,
+    /// instead of the columns defined in source catalog. The difference is generated columns.
+    #[must_use]
+    fn rewrite_to_stream_batch_source(&self) -> Self {
+        let column_catalog = self.core.column_catalog.clone();
+        // Filter out the generated columns.
+        let row_id_index = Self::rewrite_row_id_idx(&column_catalog, self.core.row_id_index);
+        let source_column_catalogs = column_catalog
+            .into_iter()
+            .filter(|c| !c.is_generated())
+            .collect_vec();
+        let core = generic::Source {
+            catalog: self.core.catalog.clone(),
+            column_catalog: source_column_catalogs,
+            row_id_index,
+            gen_row_id: self.core.gen_row_id,
+            for_table: self.core.for_table,
+            ctx: self.core.ctx.clone(),
+        };
+        let base = PlanBase::new_logical_with_core(&core);
+
+        Self {
+            base,
+            core,
+            kafka_timestamp_range: self.kafka_timestamp_range,
+        }
+    }
+
+    fn wrap_with_optional_generated_columns_stream_proj(&self) -> Result<PlanRef> {
+        let column_catalogs = self.core.column_catalog.clone();
+        let exprs = Self::gen_optional_generated_column_project_exprs(
+            column_catalogs
+                .iter()
+                .map(|c| &c.column_desc)
+                .cloned()
+                .collect_vec(),
+        )?;
+        if let Some(exprs) = exprs {
+            let source = StreamSource::new(self.rewrite_to_stream_batch_source());
+            let logical_project = LogicalProject::new(source.into(), exprs);
+            Ok(StreamProject::new(logical_project).into())
+        } else {
+            let source = StreamSource::new(self.clone());
+            Ok(source.into())
+        }
+    }
+
+    fn wrap_with_optional_generated_columns_batch_proj(&self) -> Result<PlanRef> {
+        let column_catalogs = self.core.column_catalog.clone();
+        let exprs = Self::gen_optional_generated_column_project_exprs(
+            column_catalogs
+                .iter()
+                .map(|c| &c.column_desc)
+                .cloned()
+                .collect_vec(),
+        )?;
+        if let Some(exprs) = exprs {
+            let source = BatchSource::new(self.rewrite_to_stream_batch_source());
+            let logical_project = LogicalProject::new(source.into(), exprs);
+            Ok(BatchProject::new(logical_project).into())
+        } else {
+            let source = BatchSource::new(self.clone());
+            Ok(source.into())
+        }
+    }
 }

 impl_plan_tree_node_for_leaf! {LogicalSource}
@@ -411,13 +514,20 @@ impl PredicatePushdown for LogicalSource {

 impl ToBatch for LogicalSource {
     fn to_batch(&self) -> Result<PlanRef> {
-        Ok(BatchSource::new(self.clone()).into())
+        let source = self.wrap_with_optional_generated_columns_batch_proj()?;
+        Ok(source)
     }
 }

 impl ToStream for LogicalSource {
     fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
-        let mut plan: PlanRef = StreamSource::new(self.clone()).into();
+        let mut plan = if self.core.for_table {
+            StreamSource::new(self.rewrite_to_stream_batch_source()).into()
+        } else {
+            // Create MV on source.
+            self.wrap_with_optional_generated_columns_stream_proj()?
+        };
+
         if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty() && !self.core.for_table{
             plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
         }
diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs
index 72d9ccb21095c..502f7ff0cc151 100644
--- a/src/frontend/src/optimizer/plan_node/stream.rs
+++ b/src/frontend/src/optimizer/plan_node/stream.rs
@@ -718,7 +718,6 @@ pub fn to_stream_prost_body(
                 info: Some(me.info.clone()),
                 row_id_index: me.row_id_index.map(|index| index as _),
                 columns: me.columns.iter().map(|c| c.to_protobuf()).collect(),
-                pk_column_ids: me.pk_col_ids.iter().map(Into::into).collect(),
                 properties: me.properties.clone().into_iter().collect(),
             });
             PbNodeBody::Source(SourceNode { source_inner })
diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index 78ecc49ed1a41..ca7589e7699ed 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -20,7 +20,7 @@ use itertools::Itertools;
 use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId};
 use risingwave_common::error::Result;
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_common::util::sort_util::ColumnOrder;
+use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;

 use super::derive::derive_columns;
@@ -76,6 +76,7 @@ impl StreamMaterialize {
             definition,
             ConflictBehavior::NoCheck,
             None,
+            None,
             table_type,
             None,
         )?;
@@ -97,6 +98,7 @@ impl StreamMaterialize {
         columns: Vec<ColumnCatalog>,
         definition: String,
         conflict_behavior: ConflictBehavior,
+        pk_column_indices: Vec<usize>,
         row_id_index: Option<usize>,
         version: Option<TableVersion>,
     ) -> Result<Self> {
@@ -109,6 +111,7 @@ impl StreamMaterialize {
             columns,
             definition,
             conflict_behavior,
+            Some(pk_column_indices),
             row_id_index,
             TableType::Table,
             version,
@@ -126,7 +129,11 @@ impl StreamMaterialize {
         let required_dist = match input.distribution() {
             Distribution::Single => RequiredDist::single(),
             _ => match table_type {
-                TableType::Table | TableType::MaterializedView => {
+                TableType::Table => {
+                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
+                    user_distributed_by
+                }
+                TableType::MaterializedView => {
                     assert_matches!(user_distributed_by, RequiredDist::Any);
                     // ensure the same pk will not shuffle to different node
                     RequiredDist::shard_by_key(input.schema().len(), input.logical_pk())
@@ -157,6 +164,7 @@ impl StreamMaterialize {
         columns: Vec<ColumnCatalog>,
         definition: String,
         conflict_behavior: ConflictBehavior,
+        pk_column_indices: Option<Vec<usize>>, // Is some when create table
         row_id_index: Option<usize>,
         table_type: TableType,
         version: Option<TableVersion>,
@@ -169,7 +177,17 @@ impl StreamMaterialize {
         let append_only = input.append_only();
         let watermark_columns = input.watermark_columns().clone();

-        let (pk, stream_key) = derive_pk(input, user_order_by, &columns);
+        let (pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
+            let pk = pk_column_indices
+                .iter()
+                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
+                .collect();
+            // No order by for create table, so stream key is identical to pk.
+            (pk, pk_column_indices)
+        } else {
+            derive_pk(input, user_order_by, &columns)
+        };
+
         let read_prefix_len_hint = stream_key.len();

         Ok(TableCatalog {
             id: TableId::placeholder(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs
index fd7640a828bd6..9c473c9eab650 100644
--- a/src/frontend/src/optimizer/plan_node/stream_source.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -96,17 +96,14 @@ impl StreamNode for StreamSource {
                     .to_internal_table_prost(),
             ),
             info: Some(source_catalog.info.clone()),
-            row_id_index: source_catalog.row_id_index.map(|index| index as _),
-            columns: source_catalog
-                .columns
+            row_id_index: self.logical.core.row_id_index.map(|index| index as _),
+            columns: self
+                .logical
+                .core
+                .column_catalog
                 .iter()
                 .map(|c| c.to_protobuf())
                 .collect_vec(),
-            pk_column_ids: source_catalog
-                .pk_col_ids
-                .iter()
-                .map(Into::into)
-                .collect_vec(),
             properties: source_catalog.properties.clone().into_iter().collect(),
         });
         PbNodeBody::Source(SourceNode { source_inner })
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index df442536b4a8c..6475ba3c178be 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -77,24 +77,7 @@ impl Planner {
     }

     pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
-        let column_descs = source
-            .catalog
-            .columns
-            .iter()
-            .map(|column| column.column_desc.clone())
-            .collect_vec();
-        let pk_col_ids = source.catalog.pk_col_ids.clone();
-        let row_id_index = source.catalog.row_id_index;
-        let gen_row_id = source.catalog.append_only;
-        LogicalSource::create(
-            Some(Rc::new(source.catalog)),
-            column_descs,
-            pk_col_ids,
-            row_id_index,
-            gen_row_id,
-            false,
-            self.ctx(),
-        )
+        Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx()).into())
     }

     pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs
index 2bcc5bfec1ab8..1ee4e126b3da6 100644
--- a/src/source/src/source_desc.rs
+++ b/src/source/src/source_desc.rs
@@ -37,7 +37,6 @@ pub struct SourceDesc {
     pub format: SourceFormat,
     pub columns: Vec<SourceColumnDesc>,
     pub metrics: Arc<SourceMetrics>,
-    pub pk_column_ids: Vec<i32>,
 }

 /// `FsSourceDesc` describes a stream source.
@@ -47,14 +46,12 @@ pub struct FsSourceDesc {
     pub format: SourceFormat,
     pub columns: Vec<SourceColumnDesc>,
     pub metrics: Arc<SourceMetrics>,
-    pub pk_column_ids: Vec<i32>,
 }

 #[derive(Clone)]
 pub struct SourceDescBuilder {
     columns: Vec<PbColumnCatalog>,
     metrics: Arc<SourceMetrics>,
-    pk_column_ids: Vec<i32>,
     row_id_index: Option<usize>,
     properties: HashMap<String, String>,
     source_info: PbStreamSourceInfo,
@@ -67,7 +64,6 @@ impl SourceDescBuilder {
     pub fn new(
         columns: Vec<PbColumnCatalog>,
         metrics: Arc<SourceMetrics>,
-        pk_column_ids: Vec<i32>,
         row_id_index: Option<usize>,
         properties: HashMap<String, String>,
         source_info: PbStreamSourceInfo,
@@ -77,7 +73,6 @@ impl SourceDescBuilder {
         Self {
             columns,
             metrics,
-            pk_column_ids,
             row_id_index,
             properties,
             source_info,
@@ -86,6 +81,18 @@ impl SourceDescBuilder {
         }
     }

+    fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
+        let mut columns: Vec<_> = self
+            .columns
+            .iter()
+            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
+            .collect();
+        if let Some(row_id_index) = self.row_id_index {
+            columns[row_id_index].is_row_id = true;
+        }
+        columns
+    }
+
     pub async fn build(self) -> Result<SourceDesc> {
         let format = match self.source_info.get_row_format()? {
             PbRowFormatType::Json => SourceFormat::Json,
@@ -105,18 +112,7 @@ impl SourceDescBuilder {
             return Err(ProtocolError("protobuf file location not provided".to_string()).into());
         }

-        let mut columns: Vec<_> = self
-            .columns
-            .iter()
-            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
-            .collect();
-        if let Some(row_id_index) = self.row_id_index {
-            columns[row_id_index].is_row_id = true;
-        }
-        assert!(
-            !self.pk_column_ids.is_empty(),
-            "source should have at least one pk column"
-        );
+        let columns = self.column_catalogs_to_source_column_descs();

         let psrser_config =
             SpecificParserConfig::new(format, &self.source_info, &self.properties).await?;
@@ -134,7 +130,6 @@ impl SourceDescBuilder {
             format,
             columns,
             metrics: self.metrics,
-            pk_column_ids: self.pk_column_ids,
         })
     }

@@ -149,20 +144,7 @@ impl SourceDescBuilder {
             _ => unreachable!(),
         };

-        let mut columns: Vec<_> = self
-            .columns
-            .iter()
-            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
-            .collect();
-
-        if let Some(row_id_index) = self.row_id_index {
-            columns[row_id_index].is_row_id = true;
-        }
-
-        assert!(
-            !self.pk_column_ids.is_empty(),
-            "source should have at least one pk column"
-        );
+        let columns = self.column_catalogs_to_source_column_descs();

         let parser_config =
             SpecificParserConfig::new(format, &self.source_info, &self.properties).await?;
@@ -179,7 +161,6 @@ impl SourceDescBuilder {
             format,
             columns,
             metrics: self.metrics.clone(),
-            pk_column_ids: self.pk_column_ids.clone(),
         })
     }
 }
@@ -195,7 +176,6 @@ pub mod test_utils {

     pub fn create_source_desc_builder(
         schema: &Schema,
-        pk_column_ids: Vec<i32>,
         row_id_index: Option<usize>,
         source_info: StreamSourceInfo,
         properties: HashMap<String, String>,
@@ -222,7 +202,6 @@ pub mod test_utils {
         SourceDescBuilder {
             columns,
             metrics: Default::default(),
-            pk_column_ids,
             row_id_index,
             properties,
             source_info,
diff --git a/src/source/src/table.rs b/src/source/src/table.rs
index 778b33846d525..b1039ecee5838 100644
--- a/src/source/src/table.rs
+++ b/src/source/src/table.rs
@@ -102,7 +102,7 @@ impl TableDmlHandle {
             risingwave_common::util::schema_check::schema_check(
                 self.column_descs
                     .iter()
-                    .filter_map(|c| (!c.is_generated_column()).then_some(&c.data_type)),
+                    .filter_map(|c| (!c.is_generated()).then_some(&c.data_type)),
                 chunk.columns(),
             )
             .expect("table source write chunk schema check failed");
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 67d33cc059a77..0af18e670acef 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -516,7 +516,6 @@ mod tests {
             fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
         };
         let row_id_index = None;
-        let pk_column_ids = vec![0];
         let pk_indices = vec![0];
         let source_info = StreamSourceInfo {
             row_format: PbRowFormatType::Native as i32,
@@ -533,13 +532,8 @@ mod tests {
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
         ));
-        let source_desc_builder = create_source_desc_builder(
-            &schema,
-            pk_column_ids,
-            row_id_index,
-            source_info,
-            properties,
-        );
+        let source_desc_builder =
+            create_source_desc_builder(&schema, row_id_index, source_info, properties);
         let split_state_store = SourceStateTableHandler::from_table_catalog(
             &default_source_internal_table(0x2333),
             MemoryStateStore::new(),
@@ -609,7 +603,6 @@ mod tests {
             fields: vec![Field::with_name(DataType::Int32, "v1")],
         };
         let row_id_index = None;
-        let pk_column_ids = vec![0];
         let pk_indices = vec![0_usize];
         let source_info = StreamSourceInfo {
             row_format: PbRowFormatType::Native as i32,
@@ -622,13 +615,8 @@ mod tests {
            "fields.v1.end" => "11111",
         ));
-        let source_desc_builder = create_source_desc_builder(
-            &schema,
-            pk_column_ids,
-            row_id_index,
-            source_info,
-            properties,
-        );
+        let source_desc_builder =
+            create_source_desc_builder(&schema, row_id_index, source_info, properties);

         let mem_state_store = MemoryStateStore::new();

         let column_ids = vec![ColumnId::from(0)];
diff --git a/src/stream/src/from_proto/source.rs b/src/stream/src/from_proto/source.rs
index 2fed46c13e671..e13853a7292c8 100644
--- a/src/stream/src/from_proto/source.rs
+++ b/src/stream/src/from_proto/source.rs
@@ -57,7 +57,6 @@ impl ExecutorBuilder for SourceExecutorBuilder {
         let source_desc_builder = SourceDescBuilder::new(
             source.columns.clone(),
             params.env.source_metrics(),
-            source.pk_column_ids.clone(),
             source.row_id_index.map(|x| x as _),
             source.properties.clone(),
             source.get_info()?.clone(),
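
Reviewer note: the key invariant in this change is that the physical Stream/BatchSource scans only the non-generated columns, with a Project node on top recomputing the generated expressions, so positional metadata such as `row_id_index` has to shift left past any generated columns that precede it. The following standalone Rust sketch (illustration only, not code from this patch; column flags are reduced to plain booleans) mirrors what `LogicalSource::rewrite_row_id_idx` computes.

// Sketch of the row-id index rewrite: generated columns are dropped from the
// physical source, so the row-id position shifts left by the number of
// generated columns that come before it.
fn rewrite_row_id_idx(is_generated: &[bool], row_id_index: Option<usize>) -> Option<usize> {
    row_id_index.map(|idx| {
        let generated_before = is_generated[..idx].iter().filter(|g| **g).count();
        idx - generated_before
    })
}

fn main() {
    // `create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1)` yields catalog
    // columns [v1 (generated), v2, v3 (generated), _row_id], with _row_id at index 3.
    // The physical source produces only [v2, _row_id], so the rewritten index is 1.
    let is_generated = [true, false, true, false];
    assert_eq!(rewrite_row_id_idx(&is_generated, Some(3)), Some(1));
    assert_eq!(rewrite_row_id_idx(&is_generated, None), None);
}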
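
Similarly, `PlanRoot::gen_table_plan` now receives the declared `pk_column_ids` and maps them to positions in the column list; those positions both order the materialized table's primary key and seed the `RequiredDist::ShardByKey` bitset. A minimal sketch of that mapping, assuming plain `i32` column ids in place of `ColumnId` for illustration:

use std::collections::HashMap;

// Sketch of the id-to-index mapping: build a lookup from column id to position,
// then resolve every pk column id through it.
fn pk_ids_to_indices(column_ids: &[i32], pk_column_ids: &[i32]) -> Vec<usize> {
    let id_to_idx: HashMap<i32, usize> = column_ids
        .iter()
        .enumerate()
        .map(|(idx, id)| (*id, idx))
        .collect();
    pk_column_ids
        .iter()
        .map(|id| id_to_idx[id]) // a pk column id must exist among the table columns
        .collect()
}

fn main() {
    // Columns carry ids [1, 2, 3, 0] (0 being the appended _row_id); a primary key
    // declared on column id 1 resolves to index 0.
    assert_eq!(pk_ids_to_indices(&[1, 2, 3, 0], &[1]), vec![0]);
}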