diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts
index 39a78486fc584..14cbd2fb74b32 100644
--- a/dashboard/proto/gen/stream_plan.ts
+++ b/dashboard/proto/gen/stream_plan.ts
@@ -345,8 +345,9 @@ export interface SourceNode_PropertiesEntry {
 
 export interface SinkNode {
   tableId: number;
-  columnIds: number[];
   properties: { [key: string]: string };
+  fields: Field[];
+  sinkPk: number[];
 }
 
 export interface SinkNode_PropertiesEntry {
@@ -1747,44 +1748,53 @@ export const SourceNode_PropertiesEntry = {
 };
 
 function createBaseSinkNode(): SinkNode {
-  return { tableId: 0, columnIds: [], properties: {} };
+  return { tableId: 0, properties: {}, fields: [], sinkPk: [] };
 }
 
 export const SinkNode = {
   fromJSON(object: any): SinkNode {
     return {
       tableId: isSet(object.tableId) ? Number(object.tableId) : 0,
-      columnIds: Array.isArray(object?.columnIds) ? object.columnIds.map((e: any) => Number(e)) : [],
       properties: isObject(object.properties)
         ? Object.entries(object.properties).reduce<{ [key: string]: string }>((acc, [key, value]) => {
           acc[key] = String(value);
           return acc;
         }, {})
         : {},
+      fields: Array.isArray(object?.fields)
+        ? object.fields.map((e: any) => Field.fromJSON(e))
+        : [],
+      sinkPk: Array.isArray(object?.sinkPk)
+        ? object.sinkPk.map((e: any) => Number(e))
+        : [],
     };
   },
 
   toJSON(message: SinkNode): unknown {
     const obj: any = {};
     message.tableId !== undefined && (obj.tableId = Math.round(message.tableId));
-    if (message.columnIds) {
-      obj.columnIds = message.columnIds.map((e) => Math.round(e));
-    } else {
-      obj.columnIds = [];
-    }
     obj.properties = {};
     if (message.properties) {
      Object.entries(message.properties).forEach(([k, v]) => {
        obj.properties[k] = v;
      });
    }
+    if (message.fields) {
+      obj.fields = message.fields.map((e) => e ? Field.toJSON(e) : undefined);
+    } else {
+      obj.fields = [];
+    }
+    if (message.sinkPk) {
+      obj.sinkPk = message.sinkPk.map((e) => Math.round(e));
+    } else {
+      obj.sinkPk = [];
+    }
     return obj;
   },
 
   fromPartial<I extends Exact<DeepPartial<SinkNode>, I>>(object: I): SinkNode {
     const message = createBaseSinkNode();
     message.tableId = object.tableId ?? 0;
-    message.columnIds = object.columnIds?.map((e) => e) || [];
     message.properties = Object.entries(object.properties ?? {}).reduce<{ [key: string]: string }>(
       (acc, [key, value]) => {
         if (value !== undefined) {
@@ -1794,6 +1804,8 @@ export const SinkNode = {
       },
       {},
     );
+    message.fields = object.fields?.map((e) => Field.fromPartial(e)) || [];
+    message.sinkPk = object.sinkPk?.map((e) => e) || [];
     return message;
   },
 };
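Note: the regenerated dashboard code above reflects the wire-format change: `columnIds` is dropped, and a sink node now carries its full schema (`fields`) plus the pk column indices (`sinkPk`). For orientation, here is a rough Rust-side sketch of the same message, assuming the prost-generated types for `proto/stream_plan.proto` (the `risingwave_pb` paths are real, but the literal values and the `ProstField` layout shown are illustrative):

```rust
use risingwave_pb::plan_common::Field as ProstField;
use risingwave_pb::stream_plan::SinkNode;

fn example_sink_node() -> SinkNode {
    SinkNode {
        table_id: 1,
        properties: Default::default(),
        // The full schema travels with the node now, not just column ids.
        fields: vec![ProstField {
            data_type: None, // the frontend fills in a concrete DataType proto
            name: "v1".to_string(),
        }],
        // Primary-key columns, expressed as indices into `fields`.
        sink_pk: vec![0],
    }
}
```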
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 3742ff2c3a5ea..3d55218854be8 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -130,8 +130,9 @@ message SourceNode {
 
 message SinkNode {
   uint32 table_id = 1;
-  repeated int32 column_ids = 2;
   map<string, string> properties = 3;
+  repeated plan_common.Field fields = 4;
+  repeated uint32 sink_pk = 5;
 }
 
 message ProjectNode {
diff --git a/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml b/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml
index 093cc56ee9c20..302a9ff387a35 100644
--- a/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml
+++ b/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml
@@ -59,21 +59,22 @@
     LogicalFilter { predicate: (window_start >= '1997-07-02':Varchar::Date) AND (window_end >= '1997-07-03':Varchar::Date) AND (window_start >= (t.ts + '1 day':Interval)) AND (window_end > (t.ts + '4 days':Interval)) }
     └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all }
       └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts], predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Varchar::Date) }
-- name: filter hop transpose, window_start column
+- name: filter hop transpose with non-trivial output-indices
   sql: |
     create table t(v1 int, v2 int, v3 int, v4 int, ts date);
-    with cte as (select window_end from hop(t, ts, interval '1' day, interval '3' day))
-    select * from cte where window_end > date '2022-01-01'
+    with cte as (select window_end, v4, v2 from hop(t, ts, interval '1' day, interval '3' day))
+    select * from cte where window_end > date '2022-01-01' AND v4=10 AND v2 > 20
   logical_plan: |
-    LogicalProject { exprs: [window_end] }
-    └─LogicalFilter { predicate: (window_end > '2022-01-01':Varchar::Date) }
-      └─LogicalProject { exprs: [window_end] }
+    LogicalProject { exprs: [window_end, t.v4, t.v2] }
+    └─LogicalFilter { predicate: (window_end > '2022-01-01':Varchar::Date) AND (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) }
+      └─LogicalProject { exprs: [window_end, t.v4, t.v2] }
         └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all }
           └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts, t._row_id] }
   optimized_logical_plan: |
-    LogicalFilter { predicate: (window_end > '2022-01-01':Varchar::Date) }
-    └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: [window_end] }
-      └─LogicalScan { table: t, columns: [t.ts] }
+    LogicalProject { exprs: [window_end, t.v4, t.v2] }
+    └─LogicalFilter { predicate: (window_end > '2022-01-01':Varchar::Date) }
+      └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: [t.v2, t.v4, window_end] }
+        └─LogicalScan { table: t, columns: [t.v2, t.v4, t.ts], predicate: (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) }
 - name: filter union transpose
   sql: |
     create table t1 (v1 int, v2 int, v3 int);
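Note: the updated planner test exercises pushdown through a hop window with non-trivial `output` indices: the filter above the window references output positions, so pushdown must translate those positions back to input columns and keep any conjunct touching the synthesized `window_start`/`window_end` columns above the window. The following is a self-contained sketch of that index translation; it is a hypothetical helper for illustration, not the planner's actual API, and it collapses the two window-bound columns into a single out-of-range index:

```rust
/// Map predicate column references written against a HopWindow's output back
/// to its input columns. `output_indices[i]` gives the origin of output
/// column `i`; origins at or beyond `input_arity` are the synthesized window
/// bounds, which cannot be pushed below the window (hence `None`).
fn remap_to_input(pred_cols: &[usize], output_indices: &[usize], input_arity: usize) -> Vec<Option<usize>> {
    pred_cols
        .iter()
        .map(|&col| {
            let origin = output_indices[col];
            (origin < input_arity).then_some(origin)
        })
        .collect()
}

fn main() {
    // Mirrors the test: output = [t.v2, t.v4, window_end] over a 5-column scan,
    // with index 5 standing in for the generated window_end column.
    let output_indices = [1, 3, 5];
    // v4 = 10 (output col 1) is pushable; window_end > ... (output col 2) is not.
    assert_eq!(remap_to_input(&[1, 2], &output_indices, 5), vec![Some(3), None]);
    println!("pushdown remap ok");
}
```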
diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs
index 8431ae4b5ee9f..63f4b1537a99d 100644
--- a/src/frontend/src/optimizer/plan_node/stream.rs
+++ b/src/frontend/src/optimizer/plan_node/stream.rs
@@ -335,7 +335,7 @@ impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(Project, core
 #[derive(Debug, Clone)]
 pub struct Sink {
     pub input: PlanRef,
-    pub table: TableCatalog,
+    pub sink_desc: TableCatalog,
 }
 impl_plan_tree_node_v2_for_stream_unary_node!(Sink, input);
 /// [`Source`] represents a table/connector source at the very beginning of the graph.
@@ -681,13 +681,17 @@ pub fn to_stream_prost_body(
                 select_list: me.exprs.iter().map(Expr::to_expr_proto).collect(),
             })
         }
-        Node::Sink(me) => {
-            ProstNode::Sink(SinkNode {
-                table_id: me.table.id().into(),
-                column_ids: vec![], // TODO(nanderstabel): fix empty Vector
-                properties: me.table.properties.inner().clone(),
-            })
-        }
+        Node::Sink(me) => ProstNode::Sink(SinkNode {
+            table_id: me.sink_desc.id().into(),
+            properties: me.sink_desc.properties.inner().clone(),
+            fields: me
+                .sink_desc
+                .columns()
+                .iter()
+                .map(|c| Field::from(c.column_desc.clone()).to_prost())
+                .collect(),
+            sink_pk: me.sink_desc.pk().iter().map(|c| c.index as u32).collect(),
+        }),
         Node::Source(me) => {
             let me = &me.core.catalog;
             ProstNode::Source(SourceNode {
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index 2aea3bb6b1cac..0c04368981478 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -14,6 +14,7 @@
 use std::fmt;
 
+use risingwave_common::catalog::Field;
 use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
 
 use super::{PlanBase, PlanRef, StreamNode};
@@ -76,7 +77,18 @@ impl StreamNode for StreamSink {
         ProstStreamNode::Sink(SinkNode {
             table_id: self.sink_catalog.id().into(),
-            column_ids: vec![], // TODO(nanderstabel): fix empty Vector
+            fields: self
+                .sink_catalog
+                .columns()
+                .iter()
+                .map(|c| Field::from(c.column_desc.clone()).to_prost())
+                .collect(),
+            sink_pk: self
+                .sink_catalog
+                .pk()
+                .iter()
+                .map(|c| c.index as u32)
+                .collect(),
             properties: self.sink_catalog.properties.inner().clone(),
         })
     }
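Note: both frontend lowering paths above (the plan-node-v2 body in `stream.rs` and `StreamSink`) now perform the same catalog-to-proto projection. A hypothetical helper that captures the shared derivation is sketched below; it is not part of this patch, and the `crate::catalog::TableCatalog` import path is an assumption:

```rust
use risingwave_common::catalog::Field;
use risingwave_pb::plan_common::Field as ProstField;

use crate::catalog::TableCatalog;

/// Derive the `fields` / `sink_pk` pair that SinkNode now carries from a
/// sink's catalog entry, exactly as the two call sites above do: project
/// each column's descriptor into a proto Field, and each pk entry into its
/// column index.
fn sink_fields_and_pk(catalog: &TableCatalog) -> (Vec<ProstField>, Vec<u32>) {
    let fields = catalog
        .columns()
        .iter()
        .map(|c| Field::from(c.column_desc.clone()).to_prost())
        .collect();
    let sink_pk = catalog.pk().iter().map(|c| c.index as u32).collect();
    (fields, sink_pk)
}
```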
format!("SinkExecutor_{:?}", executor_id), pk_indices, + schema, connector_params, } } @@ -81,11 +80,10 @@ impl SinkExecutor { let mut in_transaction = false; let mut epoch = 0; - let schema = self.schema().clone(); let sink_config = SinkConfig::from_hashmap(self.properties.clone())?; let mut sink = build_sink( sink_config.clone(), - schema, + self.schema, self.pk_indices, self.connector_params, ) @@ -149,7 +147,7 @@ impl SinkExecutor { } } -impl Executor for SinkExecutor { +impl Executor for SinkExecutor { fn execute(self: Box) -> super::BoxedMessageStream { self.execute_inner().boxed() } @@ -179,7 +177,6 @@ mod test { use risingwave_common::array::StreamChunkTestExt; use risingwave_common::catalog::Field; use risingwave_common::types::DataType; - use risingwave_storage::memory::MemoryStateStore; use crate::executor::Barrier; @@ -190,15 +187,17 @@ mod test { "table".into() => "t".into(), "user".into() => "root".into() }; + let schema = Schema::new(vec![ + Field::with_name(DataType::Int32, "v1"), + Field::with_name(DataType::Int32, "v2"), + Field::with_name(DataType::Int32, "v3"), + ]); + let pk = vec![]; // Mock `child` let mock = MockSource::with_messages( - Schema::new(vec![ - Field::with_name(DataType::Int32, "v1"), - Field::with_name(DataType::Int32, "v2"), - Field::with_name(DataType::Int32, "v3"), - ]), - PkIndices::new(), + schema.clone(), + pk.clone(), vec![ Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( " I I I @@ -214,11 +213,12 @@ mod test { let sink_executor = SinkExecutor::new( Box::new(mock), - MemoryStateStore::new(), Arc::new(StreamingMetrics::unused()), properties, 0, Default::default(), + schema.clone(), + pk.clone(), ); let mut executor = SinkExecutor::execute(Box::new(sink_executor)); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 902fca21d851d..ad39df912cdd4 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::catalog::{ColumnId, TableId}; +use risingwave_common::catalog::Field; use risingwave_pb::stream_plan::SinkNode; use super::*; @@ -27,27 +27,27 @@ impl ExecutorBuilder for SinkExecutorBuilder { async fn new_boxed_executor( params: ExecutorParams, node: &Self::Node, - store: impl StateStore, + _store: impl StateStore, stream: &mut LocalStreamManagerCore, ) -> StreamResult { let [materialize_executor]: [_; 1] = params.input.try_into().unwrap(); - let _sink_id = TableId::from(node.table_id); - let _column_ids = node - .get_column_ids() - .iter() - .map(|i| ColumnId::from(*i)) - .collect::>(); - let properties = node.get_properties(); + let pk_indices = node + .sink_pk + .iter() + .map(|idx| *idx as usize) + .collect::>(); + let schema = node.fields.iter().map(Field::from).collect(); Ok(Box::new(SinkExecutor::new( materialize_executor, - store, stream.streaming_metrics.clone(), properties.clone(), params.executor_id, params.env.connector_params(), + schema, + pk_indices, ))) } }