Skip to content

Commit

Permalink
fix(sink): sink exec get params from fe (#6922)
Browse files Browse the repository at this point in the history
In the past, we assumed the SinkExecutor's input must be a Materialize executor. The SinkExecutor in the Compute Node depended on that assumption and used its upstream input's schema and pk_indices to build the SinkImpl. But now that we support creating a sink with a query, we cannot simply use the upstream executor's schema and pk_indices as the sink's properties.
This is just a workaround fix for this release; the final solution will be along the lines of #6899.


Approved-By: BugenZhao
Approved-By: yuhao-su

Co-Authored-By: st1page <[email protected]>
  • Loading branch information
st1page and st1page authored Dec 16, 2022
1 parent 1ae051b commit bf87df6
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 48 deletions.
30 changes: 21 additions & 9 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ message SourceNode {

// Stream plan node describing a sink. After this commit the frontend sends the
// sink's schema and primary key explicitly, so the compute node no longer
// derives them from its upstream (Materialize) executor.
// NOTE(review): this is a merged diff view — `column_ids` is the field removed
// by the commit, while `fields` and `sink_pk` are the fields it adds.
message SinkNode {
// Catalog id of the sink.
uint32 table_id = 1;
// Removed by this commit: was previously left empty on the frontend
// (see the "fix empty Vector" TODO deleted elsewhere in this diff).
repeated int32 column_ids = 2;
// Connector properties (e.g. connector name, table, user) used on the
// compute node to build the SinkConfig.
map<string, string> properties = 3;
// Schema of the sink's columns, populated from the sink catalog's
// column descriptors on the frontend.
repeated plan_common.Field fields = 4;
// Indices (into `fields`) of the sink's primary-key columns.
repeated uint32 sink_pk = 5;
}

message ProjectNode {
Expand Down
20 changes: 12 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(Project, core
/// Stream `Sink` plan node (plan-node v2 representation): writes its input
/// stream into an external sink described by a [`TableCatalog`].
/// NOTE(review): this diff view shows both the old field name `table` and its
/// rename to `sink_desc`; only `sink_desc` exists after this commit.
#[derive(Debug, Clone)]
pub struct Sink {
// Input plan that produces the rows to be written to the sink.
pub input: PlanRef,
// Old field name, removed by this commit.
pub table: TableCatalog,
// Catalog describing the sink: its id, columns (schema), primary key,
// and connector properties (all read when lowering to the prost SinkNode).
pub sink_desc: TableCatalog,
}
impl_plan_tree_node_v2_for_stream_unary_node!(Sink, input);
/// [`Source`] represents a table/connector source at the very beginning of the graph.
Expand Down Expand Up @@ -681,13 +681,17 @@ pub fn to_stream_prost_body(
select_list: me.exprs.iter().map(Expr::to_expr_proto).collect(),
})
}
Node::Sink(me) => {
ProstNode::Sink(SinkNode {
table_id: me.table.id().into(),
column_ids: vec![], // TODO(nanderstabel): fix empty Vector
properties: me.table.properties.inner().clone(),
})
}
Node::Sink(me) => ProstNode::Sink(SinkNode {
table_id: me.sink_desc.id().into(),
properties: me.sink_desc.properties.inner().clone(),
fields: me
.sink_desc
.columns()
.iter()
.map(|c| Field::from(c.column_desc.clone()).to_prost())
.collect(),
sink_pk: me.sink_desc.pk().iter().map(|c| c.index as u32).collect(),
}),
Node::Source(me) => {
let me = &me.core.catalog;
ProstNode::Source(SourceNode {
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::fmt;

use risingwave_common::catalog::Field;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{PlanBase, PlanRef, StreamNode};
Expand Down Expand Up @@ -76,7 +77,18 @@ impl StreamNode for StreamSink {

ProstStreamNode::Sink(SinkNode {
table_id: self.sink_catalog.id().into(),
column_ids: vec![], // TODO(nanderstabel): fix empty Vector
fields: self
.sink_catalog
.columns()
.iter()
.map(|c| Field::from(c.column_desc.clone()).to_prost())
.collect(),
sink_pk: self
.sink_catalog
.pk()
.iter()
.map(|c| c.index as u32)
.collect(),
properties: self.sink_catalog.properties.inner().clone(),
})
}
Expand Down
38 changes: 19 additions & 19 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,20 @@ use futures_async_stream::try_stream;
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::{Sink, SinkConfig, SinkImpl};
use risingwave_connector::ConnectorParams;
use risingwave_storage::StateStore;

use super::error::{StreamExecutorError, StreamExecutorResult};
use super::{BoxedExecutor, Executor, Message};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::PkIndices;

pub struct SinkExecutor<S: StateStore> {
pub struct SinkExecutor {
input: BoxedExecutor,
_store: S,
metrics: Arc<StreamingMetrics>,
properties: HashMap<String, String>,
identity: String,
connector_params: ConnectorParams,
pk_indices: PkIndices,
schema: Schema,
pk_indices: Vec<usize>,
}

async fn build_sink(
Expand All @@ -49,26 +48,26 @@ async fn build_sink(
))
}

impl<S: StateStore> SinkExecutor<S> {
impl SinkExecutor {
pub fn new(
materialize_executor: BoxedExecutor,
_store: S,
metrics: Arc<StreamingMetrics>,
mut properties: HashMap<String, String>,
executor_id: u64,
connector_params: ConnectorParams,
schema: Schema,
pk_indices: Vec<usize>,
) -> Self {
// This field can be used to distinguish a specific actor in parallelism to prevent
// transaction execution errors
properties.insert("identifier".to_string(), format!("sink-{:?}", executor_id));
let pk_indices = materialize_executor.pk_indices().to_vec();
Self {
input: materialize_executor,
_store,
metrics,
properties,
identity: format!("SinkExecutor_{:?}", executor_id),
pk_indices,
schema,
connector_params,
}
}
Expand All @@ -81,11 +80,10 @@ impl<S: StateStore> SinkExecutor<S> {
let mut in_transaction = false;
let mut epoch = 0;

let schema = self.schema().clone();
let sink_config = SinkConfig::from_hashmap(self.properties.clone())?;
let mut sink = build_sink(
sink_config.clone(),
schema,
self.schema,
self.pk_indices,
self.connector_params,
)
Expand Down Expand Up @@ -149,7 +147,7 @@ impl<S: StateStore> SinkExecutor<S> {
}
}

impl<S: StateStore> Executor for SinkExecutor<S> {
impl Executor for SinkExecutor {
fn execute(self: Box<Self>) -> super::BoxedMessageStream {
self.execute_inner().boxed()
}
Expand Down Expand Up @@ -179,7 +177,6 @@ mod test {
use risingwave_common::array::StreamChunkTestExt;
use risingwave_common::catalog::Field;
use risingwave_common::types::DataType;
use risingwave_storage::memory::MemoryStateStore;

use crate::executor::Barrier;

Expand All @@ -190,15 +187,17 @@ mod test {
"table".into() => "t".into(),
"user".into() => "root".into()
};
let schema = Schema::new(vec![
Field::with_name(DataType::Int32, "v1"),
Field::with_name(DataType::Int32, "v2"),
Field::with_name(DataType::Int32, "v3"),
]);
let pk = vec![];

// Mock `child`
let mock = MockSource::with_messages(
Schema::new(vec![
Field::with_name(DataType::Int32, "v1"),
Field::with_name(DataType::Int32, "v2"),
Field::with_name(DataType::Int32, "v3"),
]),
PkIndices::new(),
schema.clone(),
pk.clone(),
vec![
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
Expand All @@ -214,11 +213,12 @@ mod test {

let sink_executor = SinkExecutor::new(
Box::new(mock),
MemoryStateStore::new(),
Arc::new(StreamingMetrics::unused()),
properties,
0,
Default::default(),
schema.clone(),
pk.clone(),
);

let mut executor = SinkExecutor::execute(Box::new(sink_executor));
Expand Down
20 changes: 10 additions & 10 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::catalog::Field;
use risingwave_pb::stream_plan::SinkNode;

use super::*;
Expand All @@ -27,27 +27,27 @@ impl ExecutorBuilder for SinkExecutorBuilder {
async fn new_boxed_executor(
params: ExecutorParams,
node: &Self::Node,
store: impl StateStore,
_store: impl StateStore,
stream: &mut LocalStreamManagerCore,
) -> StreamResult<BoxedExecutor> {
let [materialize_executor]: [_; 1] = params.input.try_into().unwrap();

let _sink_id = TableId::from(node.table_id);
let _column_ids = node
.get_column_ids()
.iter()
.map(|i| ColumnId::from(*i))
.collect::<Vec<ColumnId>>();

let properties = node.get_properties();
let pk_indices = node
.sink_pk
.iter()
.map(|idx| *idx as usize)
.collect::<Vec<_>>();
let schema = node.fields.iter().map(Field::from).collect();

Ok(Box::new(SinkExecutor::new(
materialize_executor,
store,
stream.streaming_metrics.clone(),
properties.clone(),
params.executor_id,
params.env.connector_params(),
schema,
pk_indices,
)))
}
}

0 comments on commit bf87df6

Please sign in to comment.