Skip to content

Commit

Permalink
Merge branch 'main' into zp/fix-conn-for-user
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Feb 6, 2023
2 parents e6a09e7 + 261c914 commit 08bbc18
Show file tree
Hide file tree
Showing 37 changed files with 254 additions and 118 deletions.
3 changes: 3 additions & 0 deletions e2e_test/batch/catalog/pg_enum.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
query IIIT
SELECT * FROM pg_catalog.pg_enum;
----
1 change: 1 addition & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ profile:
- use: meta-node
- use: compute-node
enable-tiered-cache: true
total-memory-bytes: 17179869184
- use: frontend
- use: compactor

Expand Down
10 changes: 10 additions & 0 deletions src/frontend/planner_test/tests/testdata/batch_index_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,13 @@
└─BatchLookupJoin { type: Inner, predicate: t.a = t2.c, output: [t2.c, t2.d, t.a] }
└─BatchExchange { order: [], dist: UpstreamHashShard(t.a) }
└─BatchScan { table: t, columns: [t.a], distribution: SomeShard }
- sql: |
create table t1 (a int, b int);
create table t2 (c int, d int);
create index idx on t2(c) include (d);
select * from t1 join idx on t1.a = idx.c and t1.b = idx.d;
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: t1.a = idx.c AND t1.b = idx.d, output: all }
└─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) }
└─BatchScan { table: t1, columns: [t1.a, t1.b], distribution: SomeShard }
1 change: 0 additions & 1 deletion src/frontend/planner_test/tests/testdata/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,6 @@
logical_plan: |
LogicalProject { exprs: [Array(1:Int32, null:Int32)] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
- name: regression (#7641) -fuzzing test failed at Bind error,types Boolean and Varchar cannot be matched
sql: |
select false >= 'LN1O0QP1yi' NOT IN (md5('4SeUPZhUbH'))
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/planner_test/tests/testdata/sysinfo_funcs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@
binder_error: |-
Feature is not yet implemented: Only boolean literals are supported in `current_schemas`.
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
- sql: |
select current_timestamp;
batch_plan: |-
BatchProject { exprs: [Now] }
└─BatchValues { rows: [[]] }
24 changes: 14 additions & 10 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ impl Binder {
fn raw_literal(literal: ExprImpl) -> Handle {
Box::new(move |_binder, _inputs| Ok(literal.clone()))
}
fn now() -> Handle {
Box::new(move |binder, mut inputs| {
binder.ensure_now_function_allowed()?;
if !binder.in_create_mv {
inputs.push(ExprImpl::from(Literal::new(
Some(ScalarImpl::Int64((binder.bind_timestamp_ms * 1000) as i64)),
DataType::Timestamptz,
)));
}
raw_call(ExprType::Now)(binder, inputs)
})
}

static HANDLES: LazyLock<HashMap<&'static str, Handle>> = LazyLock::new(|| {
[
Expand Down Expand Up @@ -498,16 +510,8 @@ impl Binder {
GIT_SHA
)))),
// non-deterministic
("now", raw(|binder, mut inputs|{
binder.ensure_now_function_allowed()?;
if !binder.in_create_mv {
inputs.push(ExprImpl::from(Literal::new(
Some(ScalarImpl::Int64((binder.bind_timestamp_ms * 1000) as i64)),
DataType::Timestamptz,
)));
}
raw_call(ExprType::Now)(binder, inputs)
}))
("now", now()),
("current_timestamp", now())
]
.into_iter()
.collect()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Binder {
Expr::Row(exprs) => self.bind_row(exprs),
// input ref
Expr::Identifier(ident) => {
if ["session_user", "current_schema"]
if ["session_user", "current_schema", "current_timestamp"]
.iter()
.any(|e| ident.real_value().as_str() == *e)
{
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ prepare_sys_catalog! {
{ PG_CATALOG, PG_SHDESCRIPTION, vec![0], read_shdescription_info },
{ PG_CATALOG, PG_TABLESPACE, vec![0], read_tablespace_info },
{ PG_CATALOG, PG_STAT_ACTIVITY, vec![0], read_stat_activity },
{ PG_CATALOG, PG_ENUM, vec![0], read_enum_info },
{ INFORMATION_SCHEMA, COLUMNS, vec![], read_columns_info },
{ INFORMATION_SCHEMA, TABLES, vec![], read_tables_info },
{ RW_CATALOG, RW_META_SNAPSHOT, vec![], read_meta_snapshot await },
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod pg_class;
pub mod pg_collation;
pub mod pg_database;
pub mod pg_description;
pub mod pg_enum;
pub mod pg_index;
pub mod pg_keywords;
pub mod pg_matviews;
Expand All @@ -46,6 +47,7 @@ pub use pg_class::*;
pub use pg_collation::*;
pub use pg_database::*;
pub use pg_description::*;
pub use pg_enum::*;
pub use pg_index::*;
pub use pg_keywords::*;
pub use pg_matviews::*;
Expand Down Expand Up @@ -250,6 +252,10 @@ impl SysCatalogReaderImpl {
Ok(vec![])
}

pub(crate) fn read_enum_info(&self) -> Result<Vec<OwnedRow>> {
Ok(vec![])
}

pub(super) fn read_roles_info(&self) -> Result<Vec<OwnedRow>> {
let reader = self.user_info_reader.read_guard();
let users = reader.get_all_users();
Expand Down
28 changes: 28 additions & 0 deletions src/frontend/src/catalog/system_catalog/pg_catalog/pg_enum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::DataType;

use crate::catalog::system_catalog::SystemCatalogColumnsDef;

/// The `pg_enum` catalog contains entries showing the values and labels for each enum type.
/// The internal representation of a given enum value is actually the OID of its associated row in
/// `pg_enum`. Reference: [`https://www.postgresql.org/docs/current/catalog-pg-enum.html`]
pub const PG_ENUM_TABLE_NAME: &str = "pg_enum";
pub const PG_ENUM_COLUMNS: &[SystemCatalogColumnsDef<'_>] = &[
(DataType::Int32, "oid"),
(DataType::Int32, "enumtypid"),
(DataType::Float32, "enumsortorder"),
(DataType::Varchar, "enumlabel"),
];
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,11 @@ impl ToDistributedBatch for BatchLookupJoin {
let input = self.input().to_distributed_with_required(
&Order::any(),
&RequiredDist::PhysicalDist(Distribution::UpstreamHashShard(
self.eq_join_predicate.left_eq_indexes(),
self.eq_join_predicate
.left_eq_indexes()
.into_iter()
.take(self.lookup_prefix_len)
.collect(),
self.right_table_desc.table_id,
)),
)?;
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl PlanBase {
functional_dependency: FunctionalDependencySet,
) -> Self {
let id = ctx.next_plan_node_id();
let watermark_cols = FixedBitSet::with_capacity(schema.len());
let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
Expand All @@ -87,7 +87,7 @@ impl PlanBase {
// Logical plan node won't touch `append_only` field
append_only: true,
functional_dependency,
watermark_columns: watermark_cols,
watermark_columns,
}
}

Expand Down Expand Up @@ -123,7 +123,7 @@ impl PlanBase {
) -> Self {
let id = ctx.next_plan_node_id();
let functional_dependency = FunctionalDependencySet::new(schema.len());
let watermark_cols = FixedBitSet::with_capacity(schema.len());
let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
Expand All @@ -134,7 +134,7 @@ impl PlanBase {
// Batch plan node won't touch `append_only` field
append_only: true,
functional_dependency,
watermark_columns: watermark_cols,
watermark_columns,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl StreamDml {
input.functional_dependency().clone(),
input.distribution().clone(),
append_only,
FixedBitSet::with_capacity(input.schema().len()),
FixedBitSet::with_capacity(input.schema().len()), // no watermark if dml is allowed
);

Self {
Expand Down
11 changes: 5 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ impl StreamDynamicFilter {
pub fn new(left_index: usize, comparator: ExprType, left: PlanRef, right: PlanRef) -> Self {
assert_eq!(right.schema().len(), 1);

let watermark_cols = {
let mut watermark_cols = FixedBitSet::with_capacity(left.schema().len());
let watermark_columns = {
let mut watermark_columns = FixedBitSet::with_capacity(left.schema().len());
if right.watermark_columns()[0] {
match comparator {
ExprType::GreaterThan | ExprType::GreaterThanOrEqual => {
watermark_cols.set(left_index, true)
watermark_columns.set(left_index, true)
}
_ => {}
}
}
watermark_cols
watermark_columns
};

// TODO: derive from input
Expand All @@ -62,8 +62,7 @@ impl StreamDynamicFilter {
left.distribution().clone(),
false, /* we can have a new abstraction for append only and monotonically increasing
* in the future */
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
watermark_cols,
watermark_columns,
);
let core = generic::DynamicFilter {
comparator,
Expand Down
20 changes: 15 additions & 5 deletions src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,33 @@ pub struct StreamExpand {

impl StreamExpand {
pub fn new(logical: LogicalExpand) -> Self {
let dist = match logical.input().distribution() {
let input = logical.input();
let schema = logical.schema().clone();

let dist = match input.distribution() {
Distribution::Single => Distribution::Single,
Distribution::SomeShard
| Distribution::HashShard(_)
| Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
Distribution::Broadcast => unreachable!(),
};

let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
watermark_columns.extend(
input
.watermark_columns()
.ones()
.map(|idx| idx + input.schema().len()),
);

let base = PlanBase::new_stream(
logical.base.ctx.clone(),
logical.schema().clone(),
schema,
logical.base.logical_pk.to_vec(),
logical.functional_dependency().clone(),
dist,
logical.input().append_only(),
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
input.append_only(),
watermark_columns,
);
StreamExpand { base, logical }
}
Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,27 @@ impl StreamGlobalSimpleAgg {
pub fn new(logical: LogicalAgg) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let input = logical.input();
let input_dist = input.distribution();
let dist = match input_dist {
Distribution::Single => Distribution::Single,
_ => panic!(),
};

// Empty because watermark column(s) must be in group key and global simple agg have no
// group key.
let watermark_columns = FixedBitSet::with_capacity(schema.len());

// Simple agg executor might change the append-only behavior of the stream.
let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
dist,
false,
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamGlobalSimpleAgg { base, logical }
}
Expand Down
19 changes: 16 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,28 @@ impl StreamGroupTopN {
assert!(!logical.group_key().is_empty());
assert!(logical.limit() > 0);
let input = logical.input();
let schema = input.schema().clone();

let watermark_columns = if input.append_only() {
input.watermark_columns().clone()
} else {
let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
for &idx in logical.group_key() {
if input.watermark_columns().contains(idx) {
watermark_columns.insert(idx);
}
}
watermark_columns
};

let base = PlanBase::new_stream(
input.ctx(),
input.schema().clone(),
schema,
input.logical_pk().to_vec(),
input.functional_dependency().clone(),
input.distribution().clone(),
false,
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamGroupTopN {
base,
Expand Down
15 changes: 12 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl StreamHashAgg {
pub fn new(logical: LogicalAgg, vnode_col_idx: Option<usize>) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let input = logical.input();
let input_dist = input.distribution();
let dist = match input_dist {
Expand All @@ -44,16 +45,24 @@ impl StreamHashAgg {
.rewrite_provided_distribution(input_dist),
d => d.clone(),
};

let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
// Watermark column(s) must be in group key.
for (idx, input_idx) in logical.group_key().iter().enumerate() {
if input.watermark_columns().contains(*input_idx) {
watermark_columns.insert(idx);
}
}

// Hash agg executor might change the append-only behavior of the stream.
let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
schema,
pk_indices,
logical.functional_dependency().clone(),
dist,
false,
// TODO: https://github.com/risingwavelabs/risingwave/issues/7205
FixedBitSet::with_capacity(logical.schema().len()),
watermark_columns,
);
StreamHashAgg {
base,
Expand Down
Loading

0 comments on commit 08bbc18

Please sign in to comment.