feat(generated_columns): support select generated columns from source #8841

Merged: 14 commits, Mar 31, 2023
34 changes: 33 additions & 1 deletion e2e_test/source/basic/kafka.slt
@@ -244,7 +244,6 @@ create source s18 with (
) row format avro
row schema location 'file:///risingwave/avro-complex-schema.avsc'


# we cannot use confluent schema registry when connector is not kafka
statement error
create table s19
@@ -256,6 +255,29 @@ with (
row format avro
row schema location confluent schema registry 'http://127.0.0.1:8081'

# we cannot create a debezium source with generated columns
statement error
create table s20 (
id integer primary key,
first_name varchar,
last_name varchar,
email varchar,
gen_id integer as id+1
) with (
connector = 'kafka',
topic = 'debezium_log',
properties.bootstrap.server = '127.0.0.1:29092'
) row format debezium_json

# create a kafka source with generated columns
statement ok
create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic_generated_columns',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'latest'
) row format json

statement ok
flush;

@@ -422,6 +444,13 @@ select count(*) from s16
----
0

query III rowsort
select * from s21;
----
19 20 21
20 21 22
NULL NULL NULL

statement ok
drop materialized view source_mv1

@@ -463,3 +492,6 @@ drop source s17

statement ok
drop source s18

statement ok
drop table s21
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
@@ -127,7 +127,8 @@ message StreamSource {
catalog.Table state_table = 2;
optional uint32 row_id_index = 3;
repeated plan_common.ColumnCatalog columns = 4;
repeated int32 pk_column_ids = 5;
// repeated int32 pk_column_ids = 5;
reserved 5;
map<string, string> properties = 6;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
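
Using `reserved 5;` rather than dropping the field outright is deliberate protobuf practice: it stops any future revision of `StreamSource` from reusing field number 5 for a different field, which would silently mis-decode plans serialized by older binaries.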
@@ -0,0 +1,3 @@
{"v1": 10, "v2": 20}
{"v2": 21, "v3": "10"}
{"v1": 0}
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
@@ -207,7 +207,7 @@ impl ColumnDesc {
Self::from_field_with_column_id(field, 0)
}

pub fn is_generated_column(&self) -> bool {
pub fn is_generated(&self) -> bool {
self.generated_column.is_some()
}
}
10 changes: 2 additions & 8 deletions src/compute/tests/integration_tests.rs
@@ -121,15 +121,9 @@ async fn test_table_materialize() -> StreamResult<()> {
"fields.v1.max" => "1000",
"fields.v1.seed" => "12345",
));
let pk_column_ids = vec![0];
let row_id_index: usize = 0;
let source_builder = create_source_desc_builder(
&schema,
pk_column_ids,
Some(row_id_index),
source_info,
properties,
);
let source_builder =
create_source_desc_builder(&schema, Some(row_id_index), source_info, properties);

// Ensure the source exists.
let source_desc = source_builder.build().await.unwrap();
2 changes: 1 addition & 1 deletion src/frontend/src/binder/insert.rs
@@ -120,7 +120,7 @@ impl Binder {

// TODO(yuhao): refine this if row_id is always the last column.
//
// `row_id_index` in bin insert operation should rule out generated column
// `row_id_index` in insert operation should rule out generated columns
let row_id_index = {
if let Some(row_id_index) = table_catalog.row_id_index {
let mut cnt = 0;
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_source.rs
@@ -631,10 +631,10 @@ pub async fn handle_create_source(

bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?;

if columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow generated columns on source
if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non-append-only source
return Err(RwError::from(ErrorCode::BindError(
"Generated columns on source has not been implemented.".to_string(),
"Generated columns are only allowed in an append only source.".to_string(),
)));
}
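
The rewritten guard keys off `row_id_index`: a hidden row id column is added only when no primary key is declared, which is exactly the append-only case. So generated columns are now accepted on append-only sources (like `s21` in the slt test) and still rejected when the source carries updates or deletes (like the debezium table `s20`).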

13 changes: 9 additions & 4 deletions src/frontend/src/handler/create_table.rs
@@ -399,6 +399,14 @@ pub(crate) async fn gen_create_table_plan_with_source(

bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?;

if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non-append-only source
return Err(ErrorCode::BindError(
"Generated columns are only allowed in an append only source.".to_string(),
)
.into());
}

gen_table_plan_inner(
context.into(),
table_name,
@@ -528,10 +536,7 @@ fn gen_table_plan_inner(
let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
let source_node: PlanRef = LogicalSource::new(
source_catalog,
columns
.iter()
.map(|column| column.column_desc.clone())
.collect_vec(),
columns.clone(),
pk_column_ids,
row_id_index,
false,
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -97,8 +97,10 @@ impl ToBatchPb for BatchSource {
NodeBody::Source(SourceNode {
source_id: source_catalog.id,
info: Some(source_catalog.info.clone()),
columns: source_catalog
.columns
columns: self
.logical
.core
.column_catalog
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
32 changes: 13 additions & 19 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -15,7 +15,7 @@ use std::collections::HashMap;
use std::rc::Rc;

use derivative::Derivative;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;

@@ -35,7 +35,7 @@ pub struct Source {
pub catalog: Option<Rc<SourceCatalog>>,
/// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan
/// generating, even if there is no external stream source.
pub column_descs: Vec<ColumnDesc>,
pub column_catalog: Vec<ColumnCatalog>,
pub pk_col_ids: Vec<ColumnId>,
pub row_id_index: Option<usize>,
/// Whether the "SourceNode" should generate the row id column for append only source
@@ -49,20 +49,22 @@

impl GenericPlanNode for Source {
fn schema(&self) -> Schema {
let fields = self.non_generated_columns().map(Into::into).collect();
let fields = self
.column_catalog
.iter()
.map(|c| (&c.column_desc).into())
.collect();
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let mut id_to_idx = HashMap::new();
// c)| {
self.non_generated_columns()
.enumerate()
.for_each(|(idx, c)| {
id_to_idx.insert(c.column_id, idx);
});
self.column_catalog.iter().enumerate().for_each(|(idx, c)| {
id_to_idx.insert(c.column_id(), idx);
});
self.pk_col_ids
.iter()
.map(|c| id_to_idx.get(c).copied())
@@ -75,12 +77,11 @@ impl GenericPlanNode for Source {

fn functional_dependency(&self) -> FunctionalDependencySet {
let pk_indices = self.logical_pk();
let non_generated_columns_count = self.non_generated_columns().count();
match pk_indices {
Some(pk_indices) => {
FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices)
FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
}
None => FunctionalDependencySet::new(non_generated_columns_count),
None => FunctionalDependencySet::new(self.column_catalog.len()),
}
}
}
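
The dependency set can now be sized with `self.column_catalog.len()` directly because generated columns are filtered out in `LogicalSource::new` before this `generic::Source` is constructed; the `non_generated_columns()` helper this replaces is removed below.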
@@ -114,11 +115,4 @@ impl Source {

builder.build(vec![], 1)
}

/// Non-generated columns
fn non_generated_columns(&self) -> impl Iterator<Item = &ColumnDesc> {
self.column_descs
.iter()
.filter(|c| !c.is_generated_column())
}
}
49 changes: 39 additions & 10 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -18,7 +18,8 @@ use std::ops::Bound;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::rc::Rc;

use risingwave_common::catalog::{ColumnDesc, Schema};
use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
use risingwave_common::error::Result;
use risingwave_connector::source::DataType;
use risingwave_pb::plan_common::GeneratedColumnDesc;
@@ -57,16 +58,23 @@ pub struct LogicalSource {
impl LogicalSource {
pub fn new(
source_catalog: Option<Rc<SourceCatalog>>,
column_descs: Vec<ColumnDesc>,
column_catalog: Vec<ColumnCatalog>,
pk_col_ids: Vec<ColumnId>,
row_id_index: Option<usize>,
gen_row_id: bool,
for_table: bool,
ctx: OptimizerContextRef,
) -> Self {
// Filter out the generated columns.
let row_id_index = Self::rewrite_row_id_idx(&column_catalog, row_id_index);
let source_column_catalogs = column_catalog
.into_iter()
.filter(|c| !c.is_generated())
.collect_vec();

let core = generic::Source {
catalog: source_catalog,
column_descs,
column_catalog: source_column_catalogs,
pk_col_ids,
row_id_index,
gen_row_id,
@@ -130,23 +138,30 @@ impl LogicalSource {
}

pub fn create(
source_catalog: Option<Rc<SourceCatalog>>,
column_descs: Vec<ColumnDesc>,
pk_col_ids: Vec<ColumnId>,
row_id_index: Option<usize>,
gen_row_id: bool,
source_catalog: Rc<SourceCatalog>,
for_table: bool,
ctx: OptimizerContextRef,
) -> Result<PlanRef> {
let column_catalogs = source_catalog.columns.clone();
let column_descs = column_catalogs
.iter()
.map(|c| &c.column_desc)
.cloned()
.collect();
let pk_col_ids = source_catalog.pk_col_ids.clone();
let row_id_index = source_catalog.row_id_index;
let gen_row_id = source_catalog.append_only;

let source = Self::new(
source_catalog,
column_descs.clone(),
Some(source_catalog),
column_catalogs,
pk_col_ids,
row_id_index,
gen_row_id,
for_table,
ctx,
);

let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?;
if let Some(exprs) = exprs {
Ok(LogicalProject::new(source.into(), exprs).into())
@@ -155,6 +170,20 @@
}
}
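
Taken together, `new` and `create` produce a two-layer plan: the source node scans only the physical columns, and `gen_optional_generated_column_project_exprs` wraps it in a `LogicalProject` that computes the generated expressions, so downstream operators still see the full schema. For `s21` from the slt test the shape is roughly (a sketch, not actual EXPLAIN output):

LogicalProject [ v2 - 1 as v1, v2, v2 + 1 as v3, _row_id ]
└─ LogicalSource [ v2, _row_id ]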

/// `row_id_index` in the source node should rule out generated columns
#[must_use]
fn rewrite_row_id_idx(columns: &[ColumnCatalog], row_id_index: Option<usize>) -> Option<usize> {
row_id_index.map(|idx| {
let mut cnt = 0;
for col in columns.iter().take(idx + 1) {
if col.is_generated() {
cnt += 1;
}
}
idx - cnt
})
}
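
For intuition, a self-contained sketch of the same adjustment, with a plain bool slice standing in for `ColumnCatalog` (an illustrative simplification, not the real type):

// Sketch: shift the row id index left by the number of generated
// columns that precede it, mirroring `rewrite_row_id_idx` above.
fn rewrite_row_id_idx_sketch(is_generated: &[bool], row_id_index: Option<usize>) -> Option<usize> {
    row_id_index.map(|idx| {
        let generated_before = is_generated[..=idx].iter().filter(|&&g| g).count();
        idx - generated_before
    })
}

fn main() {
    // `create table t (v1 int as v2 - 1, v2 int)` plus a hidden row id:
    // the full catalog is [v1 (generated), v2, _row_id], so the row id
    // moves from index 2 to index 1 once v1 is filtered out.
    assert_eq!(rewrite_row_id_idx_sketch(&[true, false, false], Some(2)), Some(1));
    // With no generated columns the index is unchanged.
    assert_eq!(rewrite_row_id_idx_sketch(&[false, false, false], Some(2)), Some(2));
}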

pub(super) fn column_names(&self) -> Vec<String> {
self.schema()
.fields()
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream.rs
@@ -718,7 +718,6 @@ pub fn to_stream_prost_body(
info: Some(me.info.clone()),
row_id_index: me.row_id_index.map(|index| index as _),
columns: me.columns.iter().map(|c| c.to_protobuf()).collect(),
pk_column_ids: me.pk_col_ids.iter().map(Into::into).collect(),
properties: me.properties.clone().into_iter().collect(),
});
PbNodeBody::Source(SourceNode { source_inner })
13 changes: 5 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -92,17 +92,14 @@ impl StreamNode for StreamSource {
.to_internal_table_prost(),
),
info: Some(source_catalog.info.clone()),
row_id_index: source_catalog.row_id_index.map(|index| index as _),
columns: source_catalog
.columns
row_id_index: self.logical.core.row_id_index.map(|index| index as _),
columns: self
.logical
.core
.column_catalog
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
pk_column_ids: source_catalog
.pk_col_ids
.iter()
.map(Into::into)
.collect_vec(),
properties: source_catalog.properties.clone().into_iter().collect(),
});
PbNodeBody::Source(SourceNode { source_inner })
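
Reading `row_id_index` and `columns` from `self.logical.core` instead of `source_catalog` is the point of this change: the core's copies were already rewritten by `LogicalSource::new` (generated columns filtered out, row id index shifted), while the catalog still describes the full user-facing schema including generated columns.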
19 changes: 1 addition & 18 deletions src/frontend/src/planner/relation.rs
@@ -77,24 +77,7 @@ impl Planner {
}

pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
let column_descs = source
.catalog
.columns
.iter()
.map(|column| column.column_desc.clone())
.collect_vec();
let pk_col_ids = source.catalog.pk_col_ids.clone();
let row_id_index = source.catalog.row_id_index;
let gen_row_id = source.catalog.append_only;
LogicalSource::create(
Some(Rc::new(source.catalog)),
column_descs,
pk_col_ids,
row_id_index,
gen_row_id,
false,
self.ctx(),
)
LogicalSource::create(Rc::new(source.catalog), false, self.ctx())
}

pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {