fix(cubesql): Fix SELECT DISTINCT on pushdown (#9144)
* add test_select_distinct_dimensions

* started doing transforming_rewrite → "select-distinct-dimensions"

* fix tests

* wip: implement self.select_distinct_dimensions()

* fix tests

* some improvements

* implement select_distinct_dimensions

* adding tests

* refactor a bit

* improve select_distinct_dimensions()

* more tests

* more cases covered in select_distinct_dimensions

* improve

* some improvements
KSDaemon authored Feb 12, 2025
1 parent 60cff30 commit 6483f66
Showing 2 changed files with 282 additions and 5 deletions.
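
At a high level, the fix adds a `select-distinct-dimensions` rewrite: `DISTINCT` over a bare CubeScan is replaced by the same scan marked as grouped, so Cube itself deduplicates by grouping on the selected dimensions instead of returning ungrouped rows. A minimal sketch of the expected outcome for the simplest case, based on the new test below (the `expected` binding is illustrative only and not part of the diff):

// `SELECT DISTINCT customer_gender FROM KibanaSampleDataEcommerce` should now
// compile to a grouped load request on the dimension, with no `ungrouped` flag.
let expected = V1LoadRequestQuery {
    measures: Some(vec![]),
    dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
    segments: Some(vec![]),
    order: Some(vec![]),
    ..Default::default()
};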
184 changes: 182 additions & 2 deletions rust/cubesql/cubesql/src/compile/mod.rs
@@ -8205,6 +8205,184 @@ ORDER BY "source"."str0" ASC
)
}

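// Covers SELECT DISTINCT pushdown: DISTINCT over plain dimension selects (with or
// without an outer LIMIT) becomes a grouped CubeScan; DISTINCT over a limited
// subquery stays ungrouped; DISTINCT over aggregates compiles to an ordinary
// measure request; and `SELECT DISTINCT *` on a cube that defines measures keeps
// an ungrouped scan of all members.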
#[tokio::test]
async fn test_select_distinct_dimensions() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_testing_logger();

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT customer_gender FROM KibanaSampleDataEcommerce".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT customer_gender FROM KibanaSampleDataEcommerce LIMIT 100".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(100),
..Default::default()
}
);

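// `SELECT DISTINCT *` over a limited subquery: the inner CubeScan carries the
// LIMIT, so the distinct rewrite does not apply and the request stays ungrouped.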
let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT * FROM (SELECT customer_gender FROM KibanaSampleDataEcommerce LIMIT 100) q_0".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(100),
ungrouped: Some(true),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT customer_gender, order_date FROM KibanaSampleDataEcommerce"
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
"KibanaSampleDataEcommerce.order_date".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

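// DISTINCT over an aggregate compiles to an ordinary measure request.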
let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT MAX(maxPrice) FROM KibanaSampleDataEcommerce".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec!["KibanaSampleDataEcommerce.maxPrice".to_string(),]),
dimensions: Some(vec![]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT * FROM (SELECT customer_gender, MAX(maxPrice) FROM KibanaSampleDataEcommerce GROUP BY 1) q_0".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec!["KibanaSampleDataEcommerce.maxPrice".to_string(),]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

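// `SELECT DISTINCT *` on a cube that defines measures: all measures and
// dimensions are requested and the scan stays ungrouped.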
let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT * FROM KibanaSampleDataEcommerce".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![
"KibanaSampleDataEcommerce.count".to_string(),
"KibanaSampleDataEcommerce.maxPrice".to_string(),
"KibanaSampleDataEcommerce.sumPrice".to_string(),
"KibanaSampleDataEcommerce.minPrice".to_string(),
"KibanaSampleDataEcommerce.avgPrice".to_string(),
"KibanaSampleDataEcommerce.countDistinct".to_string(),
]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.order_date".to_string(),
"KibanaSampleDataEcommerce.last_mod".to_string(),
"KibanaSampleDataEcommerce.customer_gender".to_string(),
"KibanaSampleDataEcommerce.notes".to_string(),
"KibanaSampleDataEcommerce.taxful_total_price".to_string(),
"KibanaSampleDataEcommerce.has_subscription".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
ungrouped: Some(true),
..Default::default()
}
)
}

#[tokio::test]
async fn test_sort_relations() -> Result<(), CubeError> {
init_testing_logger();
@@ -15665,8 +15843,10 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
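// Updated expectation in an existing test: the request is no longer ungrouped and
// now carries an explicit order on customer_gender asc.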
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
- order: Some(vec![]),
- ungrouped: Some(true),
+ order: Some(vec![vec![
+     "KibanaSampleDataEcommerce.customer_gender".to_string(),
+     "asc".to_string()
+ ],]),
..Default::default()
}
)
103 changes: 100 additions & 3 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs
@@ -1,11 +1,11 @@
use crate::{
compile::rewrite::{
agg_fun_expr, aggregate, alias_expr, all_members,
- analysis::{ConstantFolding, LogicalPlanData, MemberNamesToExpr, OriginalExpr},
+ analysis::{ConstantFolding, LogicalPlanData, Member, MemberNamesToExpr, OriginalExpr},
binary_expr, cast_expr, change_user_expr, column_expr, cross_join, cube_scan,
cube_scan_filters_empty_tail, cube_scan_members, cube_scan_members_empty_tail,
- cube_scan_order_empty_tail, dimension_expr, expr_column_name, fun_expr, join, like_expr,
- limit, list_concat_pushdown_replacer, list_concat_pushup_replacer, literal_expr,
+ cube_scan_order_empty_tail, dimension_expr, distinct, expr_column_name, fun_expr, join,
+ like_expr, limit, list_concat_pushdown_replacer, list_concat_pushup_replacer, literal_expr,
literal_member, measure_expr, member_pushdown_replacer, member_replacer,
merged_members_replacer, original_expr_name, projection, referenced_columns, rewrite,
rewriter::{CubeEGraph, CubeRewrite, RewriteRules},
@@ -262,6 +262,39 @@ impl RewriteRules for MemberRules {
),
self.push_down_limit("?skip", "?fetch", "?new_skip", "?new_fetch"),
),
transforming_rewrite(
"select-distinct-dimensions",
distinct(cube_scan(
"?alias_to_cube",
"?members",
"?filters",
"?orders",
"CubeScanLimit:None",
"CubeScanOffset:None",
"?split",
"?can_pushdown_join",
"CubeScanWrapped:false",
"?left_ungrouped",
)),
cube_scan(
"?alias_to_cube",
"?members",
"?filters",
"?orders",
"CubeScanLimit:None",
"CubeScanOffset:None",
"?split",
"?can_pushdown_join",
"CubeScanWrapped:false",
"CubeScanUngrouped:false",
),
self.select_distinct_dimensions(
"?alias_to_cube",
"?members",
"?filters",
"?left_ungrouped",
),
),
// MOD function to binary expr
transforming_rewrite_with_root(
"mod-fun-to-binary-expr",
@@ -1478,6 +1511,70 @@ impl MemberRules {
)
}

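/// Guard for the `select-distinct-dimensions` rewrite. Bails out when the scan is
/// ungrouped and already carries filters; otherwise fires only if every member is
/// a dimension, virtual field, or literal member. With no explicit member list
/// (e.g. `SELECT DISTINCT *`) it requires every referenced cube to define no
/// measures and no segments.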
fn select_distinct_dimensions(
&self,
alias_to_cube_var: &'static str,
members_var: &'static str,
filters_var: &'static str,
left_ungrouped_var: &'static str,
) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool {
let alias_to_cube_var = var!(alias_to_cube_var);
let members_var = var!(members_var);
let filters_var = var!(filters_var);
let left_ungrouped_var = var!(left_ungrouped_var);
let meta_context = self.meta_context.clone();

move |egraph, subst| {
let empty_filters = &egraph[subst[filters_var]]
.data
.is_empty_list
.unwrap_or(true);
let ungrouped =
var_iter!(egraph[subst[left_ungrouped_var]], CubeScanUngrouped).any(|v| *v);

if !empty_filters && ungrouped {
return false;
}

let res = match egraph
.index(subst[members_var])
.data
.member_name_to_expr
.as_ref()
{
Some(names_to_expr) => {
names_to_expr.list.iter().all(|(_, member, _)| {
// allow the transform only for queries that select dimensions,
// since it doesn't make sense for measures
match member {
Member::Dimension { .. } => true,
Member::VirtualField { .. } => true,
Member::LiteralMember { .. } => true,
_ => false,
}
})
}
None => {
// this might be the case of `SELECT DISTINCT *`
// we need to check that there are only dimensions defined in the referenced cube(s)
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube)
.cloned()
.all(|alias_to_cube| {
alias_to_cube.iter().all(|(_, cube_name)| {
if let Some(cube) = meta_context.find_cube_with_name(&cube_name) {
cube.measures.len() == 0 && cube.segments.len() == 0
} else {
false
}
})
})
}
};

res
}
}

fn push_down_non_empty_aggregate(
&self,
alias_to_cube_var: &'static str,
