Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(generated_columns): support select generated columns from source #8841

Merged
merged 14 commits into from
Mar 31, 2023
53 changes: 52 additions & 1 deletion e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ create source s18 with (
) row format avro
row schema location 'file:///risingwave/avro-complex-schema.avsc'


# we cannot use confluent schema registry when connector is not kafka
statement error
create table s19
Expand All @@ -256,6 +255,38 @@ with (
row format avro
row schema location confluent schema registry 'http://127.0.0.1:8081'

# we cannot create debezium source with generated column
statement error
create table s20 (
id integer primary key,
first_name varchar,
last_name varchar,
email varchar,
gen_id integer as id+1
) with (
connector = 'kafka',
topic = 'debezium_log',
properties.bootstrap.server = '127.0.0.1:29092'
) row format debezium_json

# create kafka source table with generated column
statement ok
create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic_generated_columns',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) row format json

# create kafka source with generated column
statement ok
create source s22 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic_generated_columns',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) row format json

statement ok
flush;

Expand Down Expand Up @@ -422,6 +453,20 @@ select count(*) from s16
----
0

query III rowsort
select * from s21;
----
19 20 21
20 21 22
NULL NULL NULL

query III rowsort
select * from s22;
----
19 20 21
20 21 22
NULL NULL NULL

statement ok
drop materialized view source_mv1

Expand Down Expand Up @@ -463,3 +508,9 @@ drop source s17

statement ok
drop source s18

statement ok
drop table s21

statement ok
drop source s22
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ message StreamSource {
catalog.Table state_table = 2;
optional uint32 row_id_index = 3;
repeated plan_common.ColumnCatalog columns = 4;
repeated int32 pk_column_ids = 5;
reserved "pk_column_ids";
reserved 5;
map<string, string> properties = 6;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"v1": 10, "v2": 20}
{"v2": 21, "v3": "10"}
{"v1": 0}
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl ColumnDesc {
Self::from_field_with_column_id(field, 0)
}

pub fn is_generated_column(&self) -> bool {
pub fn is_generated(&self) -> bool {
self.generated_column.is_some()
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,9 @@ async fn test_table_materialize() -> StreamResult<()> {
"fields.v1.max" => "1000",
"fields.v1.seed" => "12345",
));
let pk_column_ids = vec![0];
let row_id_index: usize = 0;
let source_builder = create_source_desc_builder(
&schema,
pk_column_ids,
Some(row_id_index),
source_info,
properties,
);
let source_builder =
create_source_desc_builder(&schema, Some(row_id_index), source_info, properties);

// Ensure the source exists.
let source_desc = source_builder.build().await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@
└─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] }
└─StreamDml { columns: [v2, _row_id] }
└─StreamSource
- name: source with generated columns
sql: |
create source s1 (v1 int as v2-1, v2 int, v3 int as v2+1) with (connector = 'kinesis') ROW FORMAT JSON;;
select v3 from s1
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] }
└─BatchSource { source: "s1", columns: ["v2", "_row_id"], filter: (None, None) }
2 changes: 1 addition & 1 deletion src/frontend/src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl Binder {

// TODO(yuhao): refine this if row_id is always the last column.
//
// `row_id_index` in bin insert operation should rule out generated column
// `row_id_index` in insert operation should rule out generated column
let row_id_index = {
if let Some(row_id_index) = table_catalog.row_id_index {
let mut cnt = 0;
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,10 +631,10 @@ pub async fn handle_create_source(

bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?;

if columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow generated columns on source
if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non append only source
return Err(RwError::from(ErrorCode::BindError(
"Generated columns on source has not been implemented.".to_string(),
"Generated columns are only allowed in an append only source.".to_string(),
)));
}

Expand Down
17 changes: 11 additions & 6 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,14 @@ pub(crate) async fn gen_create_table_plan_with_source(

bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?;

if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non append only source
return Err(ErrorCode::BindError(
"Generated columns are only allowed in an append only source.".to_string(),
)
.into());
}

gen_table_plan_inner(
context.into(),
table_name,
Expand Down Expand Up @@ -528,19 +536,15 @@ fn gen_table_plan_inner(
let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
let source_node: PlanRef = LogicalSource::new(
source_catalog,
columns
.iter()
.map(|column| column.column_desc.clone())
.collect_vec(),
pk_column_ids,
columns.clone(),
row_id_index,
false,
true,
context.clone(),
)
.into();

let required_cols = FixedBitSet::with_capacity(source_node.schema().len());
let required_cols = FixedBitSet::with_capacity(columns.len());
let mut plan_root = PlanRoot::new(
source_node,
RequiredDist::Any,
Expand Down Expand Up @@ -568,6 +572,7 @@ fn gen_table_plan_inner(
name,
columns,
definition,
pk_column_ids,
row_id_index,
append_only,
watermark_descs,
Expand Down
30 changes: 27 additions & 3 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -33,7 +34,7 @@ pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, Field, Schema};
use risingwave_common::catalog::{ColumnCatalog, ColumnId, ConflictBehavior, Field, Schema};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -390,6 +391,7 @@ impl PlanRoot {
table_name: String,
columns: Vec<ColumnCatalog>,
definition: String,
pk_column_ids: Vec<ColumnId>,
row_id_index: Option<usize>,
append_only: bool,
watermark_descs: Vec<WatermarkDesc>,
Expand Down Expand Up @@ -431,14 +433,36 @@ impl PlanRoot {
true => ConflictBehavior::NoCheck,
false => ConflictBehavior::Overwrite,
};

let pk_column_indices = {
let mut id_to_idx = HashMap::new();

columns.iter().enumerate().for_each(|(idx, c)| {
id_to_idx.insert(c.column_id(), idx);
});
pk_column_ids
.iter()
.map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
.collect_vec()
};

let table_required_dist = {
let mut bitset = FixedBitSet::with_capacity(columns.len());
for idx in &pk_column_indices {
bitset.insert(*idx);
}
RequiredDist::ShardByKey(bitset)
};

StreamMaterialize::create_for_table(
stream_plan,
table_name,
self.required_dist.clone(),
self.required_order.clone(),
table_required_dist,
Order::any(),
columns,
definition,
conflict_behavior,
pk_column_indices,
row_id_index,
version,
)
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ impl ToBatchPb for BatchSource {
NodeBody::Source(SourceNode {
source_id: source_catalog.id,
info: Some(source_catalog.info.clone()),
columns: source_catalog
.columns
columns: self
.logical
.core
.column_catalog
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
Expand Down
38 changes: 10 additions & 28 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ use std::collections::HashMap;
use std::rc::Rc;

use derivative::Derivative;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;

use super::super::utils::TableCatalogBuilder;
use super::GenericPlanNode;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::ColumnId;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::{TableCatalog, WithOptions};
Expand All @@ -35,8 +34,7 @@ pub struct Source {
pub catalog: Option<Rc<SourceCatalog>>,
/// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan
/// generating, even if there is no external stream source.
pub column_descs: Vec<ColumnDesc>,
pub pk_col_ids: Vec<ColumnId>,
pub column_catalog: Vec<ColumnCatalog>,
pub row_id_index: Option<usize>,
/// Whether the "SourceNode" should generate the row id column for append only source
pub gen_row_id: bool,
Expand All @@ -49,24 +47,16 @@ pub struct Source {

impl GenericPlanNode for Source {
fn schema(&self) -> Schema {
let fields = self.non_generated_columns().map(Into::into).collect();
// let fields = self.column_descs.iter().map(Into::into).collect();
let fields = self
.column_catalog
.iter()
.map(|c| (&c.column_desc).into())
.collect();
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let mut id_to_idx = HashMap::new();
// self.column_descs.iter().filter(|c| !c.is_generated_column()).enumerate().for_each(|(idx,
// c)| {
self.non_generated_columns()
.enumerate()
.for_each(|(idx, c)| {
id_to_idx.insert(c.column_id, idx);
});
self.pk_col_ids
.iter()
.map(|c| id_to_idx.get(c).copied())
.collect::<Option<Vec<_>>>()
self.row_id_index.map(|idx| vec![idx])
}

fn ctx(&self) -> OptimizerContextRef {
Expand All @@ -75,12 +65,11 @@ impl GenericPlanNode for Source {

fn functional_dependency(&self) -> FunctionalDependencySet {
let pk_indices = self.logical_pk();
let non_generated_columns_count = self.non_generated_columns().count();
match pk_indices {
Some(pk_indices) => {
FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices)
FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
}
None => FunctionalDependencySet::new(non_generated_columns_count),
None => FunctionalDependencySet::new(self.column_catalog.len()),
}
}
}
Expand Down Expand Up @@ -114,11 +103,4 @@ impl Source {

builder.build(vec![], 1)
}

/// Non-generated columns
fn non_generated_columns(&self) -> impl Iterator<Item = &ColumnDesc> {
self.column_descs
.iter()
.filter(|c| !c.is_generated_column())
}
}
Loading