Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): watermark derivation for various plan nodes #7655

Merged
merged 4 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl PlanBase {
functional_dependency: FunctionalDependencySet,
) -> Self {
let id = ctx.next_plan_node_id();
let watermark_cols = FixedBitSet::with_capacity(schema.len());
let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
Expand All @@ -87,7 +87,7 @@ impl PlanBase {
// Logical plan node won't touch `append_only` field
append_only: true,
functional_dependency,
watermark_columns: watermark_cols,
watermark_columns,
}
}

Expand Down Expand Up @@ -123,7 +123,7 @@ impl PlanBase {
) -> Self {
let id = ctx.next_plan_node_id();
let functional_dependency = FunctionalDependencySet::new(schema.len());
let watermark_cols = FixedBitSet::with_capacity(schema.len());
let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
Expand All @@ -134,7 +134,7 @@ impl PlanBase {
// Batch plan node won't touch `append_only` field
append_only: true,
functional_dependency,
watermark_columns: watermark_cols,
watermark_columns,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl StreamDml {
input.functional_dependency().clone(),
input.distribution().clone(),
append_only,
FixedBitSet::with_capacity(input.schema().len()),
FixedBitSet::with_capacity(input.schema().len()), // no watermark if dml is allowed
);

Self {
Expand Down
11 changes: 5 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ impl StreamDynamicFilter {
pub fn new(left_index: usize, comparator: ExprType, left: PlanRef, right: PlanRef) -> Self {
assert_eq!(right.schema().len(), 1);

let watermark_cols = {
let mut watermark_cols = FixedBitSet::with_capacity(left.schema().len());
let watermark_columns = {
let mut watermark_columns = FixedBitSet::with_capacity(left.schema().len());
if right.watermark_columns()[0] {
match comparator {
ExprType::GreaterThan | ExprType::GreaterThanOrEqual => {
watermark_cols.set(left_index, true)
watermark_columns.set(left_index, true)
}
_ => {}
}
}
watermark_cols
watermark_columns
};

// TODO: derive from input
Expand All @@ -62,8 +62,7 @@ impl StreamDynamicFilter {
left.distribution().clone(),
false, /* we can have a new abstraction for append only and monotonically increasing
* in the future */
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
watermark_cols,
watermark_columns,
);
let core = generic::DynamicFilter {
comparator,
Expand Down
20 changes: 15 additions & 5 deletions src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,33 @@ pub struct StreamExpand {

impl StreamExpand {
pub fn new(logical: LogicalExpand) -> Self {
let dist = match logical.input().distribution() {
let input = logical.input();
let schema = logical.schema().clone();

let dist = match input.distribution() {
Distribution::Single => Distribution::Single,
Distribution::SomeShard
| Distribution::HashShard(_)
| Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
Distribution::Broadcast => unreachable!(),
};

let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
watermark_columns.extend(
input
.watermark_columns()
.ones()
.map(|idx| idx + input.schema().len()),
);

let base = PlanBase::new_stream(
logical.base.ctx.clone(),
logical.schema().clone(),
schema,
logical.base.logical_pk.to_vec(),
logical.functional_dependency().clone(),
dist,
logical.input().append_only(),
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
input.append_only(),
watermark_columns,
);
StreamExpand { base, logical }
}
Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,27 @@ impl StreamGlobalSimpleAgg {
pub fn new(logical: LogicalAgg) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let input = logical.input();
let input_dist = input.distribution();
let dist = match input_dist {
Distribution::Single => Distribution::Single,
_ => panic!(),
};

// Empty because watermark column(s) must be in group key and global simple agg have no
// group key.
let watermark_columns = FixedBitSet::with_capacity(schema.len());

// Simple agg executor might change the append-only behavior of the stream.
let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
dist,
false,
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamGlobalSimpleAgg { base, logical }
}
Expand Down
19 changes: 16 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,28 @@ impl StreamGroupTopN {
assert!(!logical.group_key().is_empty());
assert!(logical.limit() > 0);
let input = logical.input();
let schema = input.schema().clone();

let watermark_columns = if input.append_only() {
input.watermark_columns().clone()
} else {
let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
for &idx in logical.group_key() {
if input.watermark_columns().contains(idx) {
watermark_columns.insert(idx);
}
}
watermark_columns
};

let base = PlanBase::new_stream(
input.ctx(),
input.schema().clone(),
schema,
input.logical_pk().to_vec(),
input.functional_dependency().clone(),
input.distribution().clone(),
false,
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamGroupTopN {
base,
Expand Down
15 changes: 12 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl StreamHashAgg {
pub fn new(logical: LogicalAgg, vnode_col_idx: Option<usize>) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let input = logical.input();
let input_dist = input.distribution();
let dist = match input_dist {
Expand All @@ -44,16 +45,24 @@ impl StreamHashAgg {
.rewrite_provided_distribution(input_dist),
d => d.clone(),
};

let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
// Watermark column(s) must be in group key.
for (idx, input_idx) in logical.group_key().iter().enumerate() {
if input.watermark_columns().contains(*input_idx) {
watermark_columns.insert(idx);
}
}

// Hash agg executor might change the append-only behavior of the stream.
let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
dist,
false,
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamHashAgg {
base,
Expand Down
14 changes: 10 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::HopWindowNode;

Expand All @@ -33,19 +32,26 @@ impl StreamHopWindow {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let input = logical.input();
let schema = logical.schema().clone();

let i2o = logical.i2o_col_mapping();
let dist = i2o.rewrite_provided_distribution(input.distribution());

let mut watermark_columns = i2o.rewrite_bitset(input.watermark_columns());
if watermark_columns.contains(logical.core.time_col.index) {
// Watermark on `time_col` indicates watermark on both `window_start` and `window_end`.
watermark_columns.insert(schema.len() - 2); // window_start
watermark_columns.insert(schema.len() - 1); // window_end
}

let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
dist,
logical.input().append_only(),
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
Self { base, logical }
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl StreamIndexScan {
logical.functional_dependency().clone(),
distribution,
false, // TODO: determine the `append-only` field of table scan
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
// TODO: https://github.com/risingwavelabs/risingwave/issues/7660
FixedBitSet::with_capacity(logical.schema().len()),
);
Self {
Expand Down
14 changes: 11 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,27 @@ impl StreamLocalSimpleAgg {
pub fn new(logical: LogicalAgg) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let input = logical.input();
let input_dist = input.distribution();
debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));

let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
// Watermark column(s) must be in group key.
for (idx, input_idx) in logical.group_key().iter().enumerate() {
if input.watermark_columns().contains(*input_idx) {
watermark_columns.insert(idx);
}
}

let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
input_dist.clone(),
input.append_only(),
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamLocalSimpleAgg { base, logical }
}
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ impl StreamNow {
sub_fields: vec![],
type_name: String::default(),
}]);
let mut watermark_cols = FixedBitSet::with_capacity(1);
watermark_cols.set(0, true);
let mut watermark_columns = FixedBitSet::with_capacity(1);
watermark_columns.set(0, true);
let base = PlanBase::new_stream(
ctx,
schema,
vec![],
FunctionalDependencySet::default(),
Distribution::Single,
false,
watermark_cols,
watermark_columns,
);
Self { base }
}
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/optimizer/plan_node/stream_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,31 @@ impl StreamProject {
let ctx = logical.base.ctx.clone();
let input = logical.input();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let distribution = logical
.i2o_col_mapping()
.rewrite_provided_distribution(input.distribution());

let mut watermark_derivations = vec![];
let mut watermark_cols = FixedBitSet::with_capacity(logical.schema().len());
let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
for (expr_idx, expr) in logical.exprs().iter().enumerate() {
if let Some(input_idx) = try_derive_watermark(expr) {
if input.watermark_columns().contains(input_idx) {
watermark_derivations.push((input_idx, expr_idx));
watermark_cols.insert(expr_idx);
watermark_columns.insert(expr_idx);
}
}
}
// Project executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
distribution,
logical.input().append_only(),
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
watermark_cols,
watermark_columns,
);
StreamProject {
base,
Expand Down
18 changes: 15 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::ProjectSetNode;

use super::{LogicalProjectSet, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::try_derive_watermark;
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone)]
Expand All @@ -33,20 +34,31 @@ impl StreamProjectSet {
let ctx = logical.base.ctx.clone();
let input = logical.input();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let distribution = logical
.i2o_col_mapping()
.rewrite_provided_distribution(input.distribution());

let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
for (expr_idx, expr) in logical.select_list().iter().enumerate() {
if let Some(input_idx) = try_derive_watermark(expr) {
if input.watermark_columns().contains(input_idx) {
// The first column of ProjectSet is `projected_row_id`.
watermark_columns.insert(expr_idx + 1);
}
}
}

// ProjectSet executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
distribution,
logical.input().append_only(),
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamProjectSet { base, logical }
}
Expand Down
Loading