feat(generated_columns): support select generated columns from source #8841

Merged · 14 commits · Mar 31, 2023
34 changes: 33 additions & 1 deletion e2e_test/source/basic/kafka.slt
@@ -244,7 +244,6 @@ create source s18 with (
) row format avro
row schema location 'file:///risingwave/avro-complex-schema.avsc'


# we cannot use confluent schema registry when connector is not kafka
statement error
create table s19
@@ -256,6 +255,29 @@ with (
row format avro
row schema location confluent schema registry 'http://127.0.0.1:8081'

# we cannot create a debezium source with generated columns
statement error
create table s20 (
id integer primary key,
first_name varchar,
last_name varchar,
email varchar,
gen_id integer as id+1
) with (
connector = 'kafka',
topic = 'debezium_log',
properties.bootstrap.server = '127.0.0.1:29092'
) row format debezium_json

# create kafka source with generated columns
statement ok
create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic_generated_columns',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'latest'
) row format json

statement ok
flush;

@@ -422,6 +444,13 @@ select count(*) from s16
----
0

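# note: payload values for generated columns are ignored; v1 and v3 are
# recomputed from v2 (v2 = 20 gives 19 20 21), and rows missing v2 yield NULLs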
query III rowsort
select * from s21;
----
19 20 21
20 21 22
NULL NULL NULL

statement ok
drop materialized view source_mv1

@@ -463,3 +492,6 @@ drop source s17

statement ok
drop source s18

statement ok
drop table s21
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
@@ -127,7 +127,8 @@ message StreamSource {
catalog.Table state_table = 2;
optional uint32 row_id_index = 3;
repeated plan_common.ColumnCatalog columns = 4;
repeated int32 pk_column_ids = 5;
// repeated int32 pk_column_ids = 5;
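// Reserving field 5 keeps the number from being reused by a future field,
// which could misinterpret old encoded plans that still carry pk_column_ids.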
reserved 5;
map<string, string> properties = 6;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
@@ -0,0 +1,3 @@
{"v1": 10, "v2": 20}
{"v2": 21, "v3": "10"}
{"v1": 0}
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
@@ -207,7 +207,7 @@ impl ColumnDesc {
Self::from_field_with_column_id(field, 0)
}

pub fn is_generated_column(&self) -> bool {
pub fn is_generated(&self) -> bool {
self.generated_column.is_some()
}
}
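The shorter name reads better at call sites. A minimal, self-contained sketch of what the predicate expresses, with placeholder types rather than RisingWave's actual `ColumnDesc`:

```rust
struct ColumnDesc {
    name: String,
    // Placeholder for the generation expression (`v2 - 1`, etc.).
    generated_column: Option<String>,
}

impl ColumnDesc {
    fn is_generated(&self) -> bool {
        self.generated_column.is_some()
    }
}

fn main() {
    let cols = vec![
        ColumnDesc { name: "v2".into(), generated_column: None },
        ColumnDesc { name: "v1".into(), generated_column: Some("v2 - 1".into()) },
    ];
    // Only non-generated columns are expected in the upstream payload.
    let physical: Vec<&str> = cols
        .iter()
        .filter(|c| !c.is_generated())
        .map(|c| c.name.as_str())
        .collect();
    assert_eq!(physical, ["v2"]);
}
```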
10 changes: 2 additions & 8 deletions src/compute/tests/integration_tests.rs
@@ -121,15 +121,9 @@ async fn test_table_materialize() -> StreamResult<()> {
"fields.v1.max" => "1000",
"fields.v1.seed" => "12345",
));
let pk_column_ids = vec![0];
let row_id_index: usize = 0;
let source_builder = create_source_desc_builder(
&schema,
pk_column_ids,
Some(row_id_index),
source_info,
properties,
);
let source_builder =
create_source_desc_builder(&schema, Some(row_id_index), source_info, properties);

// Ensure the source exists.
let source_desc = source_builder.build().await.unwrap();
2 changes: 1 addition & 1 deletion src/frontend/src/binder/insert.rs
@@ -120,7 +120,7 @@ impl Binder {

// TODO(yuhao): refine this if row_id is always the last column.
//
// `row_id_index` in bin insert operation should rule out generated column
// `row_id_index` in insert operation should rule out generated column
let row_id_index = {
if let Some(row_id_index) = table_catalog.row_id_index {
let mut cnt = 0;
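The hunk is truncated here, but the surrounding comment suggests the arithmetic. A hedged sketch of that adjustment; `visible_row_id_index` is a hypothetical helper, not code from the PR:

```rust
/// Hypothetical helper: the catalog's `row_id_index` counts all columns, while
/// INSERT binding sees only user-visible, non-generated columns, so generated
/// columns before the row-id position are assumed to be skipped here.
fn visible_row_id_index(is_generated: &[bool], row_id_index: usize) -> usize {
    is_generated[..row_id_index].iter().filter(|g| !**g).count()
}

fn main() {
    // Columns: [gen_id (generated), id, _row_id]; catalog row_id_index = 2,
    // but INSERT binding sees only [id, _row_id], so the index becomes 1.
    assert_eq!(visible_row_id_index(&[true, false, false], 2), 1);
}
```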
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_source.rs
@@ -631,10 +631,10 @@ pub async fn handle_create_source(

bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?;

if columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow generated columns on source
if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non append only source
return Err(RwError::from(ErrorCode::BindError(
"Generated columns on source has not been implemented.".to_string(),
"Generated columns are only allowed in an append only source.".to_string(),
)));
}

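The same check is added to `gen_create_table_plan_with_source` in create_table.rs below. A minimal sketch of the gating rule, assuming (as the TODO implies) that a present `row_id_index` marks an append-only source with a hidden row id; `check_generated_columns` is a made-up name:

```rust
fn check_generated_columns(
    row_id_index: Option<usize>,
    has_generated_column: bool,
) -> Result<(), String> {
    if row_id_index.is_none() && has_generated_column {
        // A user-defined primary key means updates/deletes may arrive, which
        // generated columns cannot be maintained against yet.
        return Err("Generated columns are only allowed in an append only source.".to_string());
    }
    Ok(())
}

fn main() {
    // Hidden row id present => append-only => generated columns accepted.
    assert!(check_generated_columns(Some(0), true).is_ok());
    // User-defined pk (no hidden row id) => rejected.
    assert!(check_generated_columns(None, true).is_err());
}
```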
13 changes: 9 additions & 4 deletions src/frontend/src/handler/create_table.rs
@@ -399,6 +399,14 @@ pub(crate) async fn gen_create_table_plan_with_source(

bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?;

if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non append only source
return Err(ErrorCode::BindError(
"Generated columns are only allowed in an append only source.".to_string(),
)
.into());
}

gen_table_plan_inner(
context.into(),
table_name,
@@ -528,10 +536,7 @@ fn gen_table_plan_inner(
let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
let source_node: PlanRef = LogicalSource::new(
source_catalog,
columns
.iter()
.map(|column| column.column_desc.clone())
.collect_vec(),
columns.clone(),
pk_column_ids,
row_id_index,
false,
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -97,8 +97,10 @@ impl ToBatchPb for BatchSource {
NodeBody::Source(SourceNode {
source_id: source_catalog.id,
info: Some(source_catalog.info.clone()),
columns: source_catalog
.columns
columns: self
.logical
.core
.column_catalog
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
52 changes: 36 additions & 16 deletions src/frontend/src/optimizer/plan_node/generic/project.rs
@@ -49,6 +49,8 @@ pub struct Project<PlanRef> {
pub struct Project<PlanRef> {
pub exprs: Vec<ExprImpl>,
pub input: PlanRef,
pub pk_indices: Option<Vec<usize>>,
pub o2i_col_mapping: ColIndexMapping,
// we need some checks when constructing via `Project::new`
_private: (),
}
@@ -99,12 +101,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let i2o = self.i2o_col_mapping();
self.input
.logical_pk()
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
self.pk_indices.clone()
}

fn ctx(&self) -> OptimizerContextRef {
@@ -119,15 +116,41 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {

impl<PlanRef: GenericPlanRef> Project<PlanRef> {
pub fn new(exprs: Vec<ExprImpl>, input: PlanRef) -> Self {
Self::new_inner(exprs, input, None)
}

fn new_inner(exprs: Vec<ExprImpl>, input: PlanRef, pk_indices: Option<Vec<usize>>) -> Self {
for expr in &exprs {
assert_input_ref!(expr, input.schema().fields().len());
check_expr_type(expr)
.map_err(|expr| format!("{expr} should not be in Project operator"))
.unwrap();
}

let input_len = input.schema().len();
let mut map = vec![None; exprs.len()];
for (i, expr) in exprs.iter().enumerate() {
map[i] = match expr {
ExprImpl::InputRef(input) => Some(input.index()),
_ => None,
}
}
let o2i_col_mapping = ColIndexMapping::with_target_size(map, input_len);

let i2o = o2i_col_mapping.inverse();
let pk_indices = pk_indices.or_else(|| {
input
.logical_pk()
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
});

Project {
exprs,
input,
o2i_col_mapping,
pk_indices,
_private: (),
}
}
@@ -173,6 +196,12 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
Self::new(exprs, input)
}

/// Creates a `Project` with the given pk column indices,
/// overwriting the ones calculated by `generic::Project`.
pub fn with_pk_indices(input: PlanRef, exprs: Vec<ExprImpl>, pk_indices: &[usize]) -> Self {
Self::new_inner(exprs, input, Some(pk_indices.to_vec()))
}

pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
(self.exprs, self.input)
}
@@ -212,16 +241,7 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
let exprs = &self.exprs;
let input_len = self.input.schema().len();
let mut map = vec![None; exprs.len()];
for (i, expr) in exprs.iter().enumerate() {
map[i] = match expr {
ExprImpl::InputRef(input) => Some(input.index()),
_ => None,
}
}
ColIndexMapping::with_target_size(map, input_len)
self.o2i_col_mapping.clone()
}

/// get the Mapping of columnIndex from input column index to output column index, if an input
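Precomputing the mapping in `new_inner` avoids rebuilding it on every `o2i_col_mapping()` call and lets the constructor derive pk indices through its inverse. A simplified, self-contained sketch of what the mapping captures, with a placeholder `Expr` instead of RisingWave's `ExprImpl`:

```rust
/// Placeholder expression type; the real `ExprImpl` has many more variants.
enum Expr {
    InputRef(usize), // a bare reference to an input column
    Other,           // any computed expression, e.g. `v2 + 1`
}

/// Output-to-input mapping: output i maps to input j only when expression i
/// is a bare `InputRef(j)`; computed expressions map to nothing.
fn o2i_mapping(exprs: &[Expr]) -> Vec<Option<usize>> {
    exprs
        .iter()
        .map(|e| match e {
            Expr::InputRef(j) => Some(*j),
            Expr::Other => None,
        })
        .collect()
}

fn main() {
    // Project(v2, v2 + 1) over some input: only the first output maps back.
    assert_eq!(o2i_mapping(&[Expr::InputRef(1), Expr::Other]), vec![Some(1), None]);
}
```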
34 changes: 13 additions & 21 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -15,7 +15,7 @@ use std::collections::HashMap;
use std::rc::Rc;

use derivative::Derivative;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;

@@ -35,7 +35,7 @@ pub struct Source {
pub catalog: Option<Rc<SourceCatalog>>,
/// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan
/// generating, even if there is no external stream source.
pub column_descs: Vec<ColumnDesc>,
pub column_catalog: Vec<ColumnCatalog>,
pub pk_col_ids: Vec<ColumnId>,
pub row_id_index: Option<usize>,
/// Whether the "SourceNode" should generate the row id column for append only source
@@ -49,20 +49,20 @@ pub struct Source {

impl GenericPlanNode for Source {
fn schema(&self) -> Schema {
let fields = self.non_generated_columns().map(Into::into).collect();
// let fields = self.column_descs.iter().map(Into::into).collect();
let fields = self
.column_catalog
.iter()
.map(|c| (&c.column_desc).into())
.collect();
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let mut id_to_idx = HashMap::new();
// self.column_descs.iter().filter(|c| !c.is_generated_column()).enumerate().for_each(|(idx,
// c)| {
self.non_generated_columns()
.enumerate()
.for_each(|(idx, c)| {
id_to_idx.insert(c.column_id, idx);
});

self.column_catalog.iter().enumerate().for_each(|(idx, c)| {
id_to_idx.insert(c.column_id(), idx);
});
self.pk_col_ids
.iter()
.map(|c| id_to_idx.get(c).copied())
@@ -75,12 +75,11 @@ impl GenericPlanNode for Source {

fn functional_dependency(&self) -> FunctionalDependencySet {
let pk_indices = self.logical_pk();
let non_generated_columns_count = self.non_generated_columns().count();
match pk_indices {
Some(pk_indices) => {
FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices)
FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
}
None => FunctionalDependencySet::new(non_generated_columns_count),
None => FunctionalDependencySet::new(self.column_catalog.len()),
}
}
}
@@ -114,11 +113,4 @@ impl Source {

builder.build(vec![], 1)
}

/// Non-generated columns
fn non_generated_columns(&self) -> impl Iterator<Item = &ColumnDesc> {
self.column_descs
.iter()
.filter(|c| !c.is_generated_column())
}
}
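Because generated columns are no longer filtered out of the schema, the pk id-to-index lookup runs over the full column catalog. A sketch of that resolution with column ids simplified to plain integers:

```rust
use std::collections::HashMap;

/// Resolve pk column ids to positions over the full column catalog;
/// returns `None` if any pk id is missing from the catalog.
fn logical_pk(column_ids: &[i32], pk_col_ids: &[i32]) -> Option<Vec<usize>> {
    let id_to_idx: HashMap<i32, usize> = column_ids
        .iter()
        .enumerate()
        .map(|(idx, id)| (*id, idx))
        .collect();
    pk_col_ids.iter().map(|id| id_to_idx.get(id).copied()).collect()
}

fn main() {
    // Columns [v1 (generated), v2, _row_id] with ids [1, 2, 0]; pk = row id.
    assert_eq!(logical_pk(&[1, 2, 0], &[0]), Some(vec![2]));
}
```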
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_project.rs
@@ -71,6 +71,13 @@ impl LogicalProject {
Self::with_core(generic::Project::with_mapping(input, mapping))
}

/// Creates a `LogicalProject` with the given pk column indices,
/// overwriting the ones calculated by `generic::Project` from `exprs`.
pub fn with_pk_indices(input: PlanRef, exprs: Vec<ExprImpl>, pk_indices: &[usize]) -> Self {
let core = generic::Project::with_pk_indices(input, exprs, pk_indices);
Self::with_core(core)
}

/// Creates a `LogicalProject` which select some columns from the input.
pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self {
Self::with_core(generic::Project::with_out_fields(input, out_fields))