From 777e8367dcb344c36b4422f179e18494706afbed Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Thu, 16 Mar 2023 14:09:19 +0800 Subject: [PATCH] perf(streaming): add the missed read prefix hint on state table (#8545) --- dashboard/proto/gen/catalog.ts | 4 + proto/catalog.proto | 2 + .../planner_test/tests/testdata/nexmark.yaml | 10 +-- .../tests/testdata/nexmark_source.yaml | 78 +++++++++--------- .../tests/testdata/temporal_filter.yaml | 4 +- .../planner_test/tests/testdata/tpch.yaml | 4 +- .../tests/testdata/watermark.yaml | 2 +- .../src/optimizer/plan_node/generic/agg.rs | 80 ++++++++++++------- .../plan_node/generic/dynamic_filter.rs | 6 +- .../src/optimizer/plan_node/generic/source.rs | 2 +- .../src/optimizer/plan_node/generic/top_n.rs | 8 +- .../src/optimizer/plan_node/stream.rs | 7 +- .../src/optimizer/plan_node/stream_now.rs | 2 +- .../plan_node/stream_watermark_filter.rs | 2 +- src/frontend/src/optimizer/plan_node/utils.rs | 13 +-- 15 files changed, 128 insertions(+), 96 deletions(-) diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index 1f6feadf45796..3b37fa9cc1f32 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -264,6 +264,10 @@ export interface Table { valueIndices: number[]; definition: string; handlePkConflictBehavior: HandleConflictBehavior; + /** + * Anticipated read prefix pattern (number of fields) for the table, which can be utilized + * for implementing the table's bloom filter or other storage optimization techniques. + */ readPrefixLenHint: number; watermarkIndices: number[]; distKeyInPk: number[]; diff --git a/proto/catalog.proto b/proto/catalog.proto index 0da76e7a4a2e1..0501abfbb892d 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -169,6 +169,8 @@ message Table { repeated int32 value_indices = 20; string definition = 21; HandleConflictBehavior handle_pk_conflict_behavior = 22; + // Anticipated read prefix pattern (number of fields) for the table, which can be utilized + // for implementing the table's bloom filter or other storage optimization techniques. uint32 read_prefix_len_hint = 23; repeated int32 watermark_indices = 24; repeated int32 dist_key_in_pk = 25; diff --git a/src/frontend/planner_test/tests/testdata/nexmark.yaml b/src/frontend/planner_test/tests/testdata/nexmark.yaml index aa363c7e569a6..6c3ba568f537a 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark.yaml @@ -784,8 +784,8 @@ BatchPlanNode Table 0 { columns: [$expr1, count, count filter((bid_price < 10000:Int32)), count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count filter((bid_price >= 1000000:Int32)), count(distinct bid_bidder), count(distinct bid_bidder) filter((bid_price < 10000:Int32)), count(distinct bid_bidder) filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count(distinct bid_bidder) filter((bid_price >= 1000000:Int32)), count(distinct bid_auction), count(distinct bid_auction) filter((bid_price < 10000:Int32)), count(distinct bid_auction) filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count(distinct bid_auction) filter((bid_price >= 1000000:Int32))], primary key: [$0 ASC], value indices: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0], read pk prefix len hint: 1 } - Table 1 { columns: [$expr1, bid_bidder, count_for_agg_call_4, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 0 } - Table 2 { columns: [$expr1, bid_auction, count_for_agg_call_8, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 0 } + Table 1 { columns: [$expr1, bid_bidder, count_for_agg_call_4, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 2 } + Table 2 { columns: [$expr1, bid_auction, count_for_agg_call_8, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 2 } Table 4294967294 { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0], read pk prefix len hint: 1 } - id: nexmark_q16 before: @@ -840,8 +840,8 @@ BatchPlanNode Table 0 { columns: [bid_channel, $expr1, max($expr2), count, count filter((bid_price < 10000:Int32)), count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count filter((bid_price >= 1000000:Int32)), count(distinct bid_bidder), count(distinct bid_bidder) filter((bid_price < 10000:Int32)), count(distinct bid_bidder) filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count(distinct bid_bidder) filter((bid_price >= 1000000:Int32)), count(distinct bid_auction), count(distinct bid_auction) filter((bid_price < 10000:Int32)), count(distinct bid_auction) filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count(distinct bid_auction) filter((bid_price >= 1000000:Int32))], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0, 1], read pk prefix len hint: 2 } - Table 1 { columns: [bid_channel, $expr1, bid_bidder, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 0 } - Table 2 { columns: [bid_channel, $expr1, bid_auction, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 0 } + Table 1 { columns: [bid_channel, $expr1, bid_bidder, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 3 } + Table 2 { columns: [bid_channel, $expr1, bid_auction, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 3 } Table 4294967294 { columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0, 1], read pk prefix len hint: 2 } - id: nexmark_q17 before: @@ -1206,7 +1206,7 @@ Upstream BatchPlanNode - Table 0 { columns: [auction_id, auction_item_name, count(bid_auction)], primary key: [$2 ASC, $0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 0 } + Table 0 { columns: [auction_id, auction_item_name, count(bid_auction)], primary key: [$2 ASC, $0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 1 { columns: [$expr1], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } Table 2 { columns: [auction_id, auction_item_name, count(bid_auction), count], primary key: [$0 ASC, $1 ASC], value indices: [2, 3], distribution key: [0], read pk prefix len hint: 2 } Table 3 { columns: [auction_id, auction_item_name], primary key: [$0 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } diff --git a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml index e06d0bf0db533..6d35aa31b6d01 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml @@ -68,7 +68,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, date_time, _row_id], primary key: [$4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [4], read pk prefix len hint: 1 } - id: nexmark_q1 before: @@ -102,7 +102,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, date_time, _row_id], primary key: [$4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [4], read pk prefix len hint: 1 } - id: nexmark_q2 before: @@ -133,7 +133,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, price, _row_id], primary key: [$2 ASC], value indices: [0, 1, 2], distribution key: [2], read pk prefix len hint: 1 } - id: nexmark_q3 before: @@ -196,8 +196,8 @@ Table 1 { columns: [seller, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [id, name, city, state, _row_id], primary key: [$0 ASC, $4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [name, city, state, id, _row_id, seller, _row_id#1], primary key: [$4 ASC, $6 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [5], read pk prefix len hint: 3 } - id: nexmark_q4 before: @@ -282,8 +282,8 @@ Table 3 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 4 { columns: [auction, price, date_time, _row_id], primary key: [$0 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [0], read pk prefix len hint: 1 } Table 5 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [category, avg], primary key: [$0 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } - id: nexmark_q5 before: @@ -404,7 +404,7 @@ Table 2 { columns: [max(count), window_start], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 1 } Table 3 { columns: [window_start, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } Table 4 { columns: [auction, window_start, count], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0, 1], read pk prefix len hint: 2 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 6 { columns: [window_start, count, auction], primary key: [$0 ASC, $1 DESC, $2 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 7 { columns: [window_start, max(count), count], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, num, window_start, window_start#1], primary key: [$0 ASC, $2 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [2], read pk prefix len hint: 3 } @@ -516,7 +516,7 @@ Table 1 { columns: [price, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [max(price), $expr1, $expr2], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [max(price), $expr1, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [$expr1, max(price), count], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, price, bidder, date_time, _row_id, $expr1], primary key: [$4 ASC, $5 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [1], read pk prefix len hint: 3 } - id: nexmark_q8 @@ -618,9 +618,9 @@ Table 2 { columns: [seller, $expr3, $expr4], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0, 1, 2], read pk prefix len hint: 3 } Table 3 { columns: [seller, $expr3, $expr4, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2], read pk prefix len hint: 3 } Table 4 { columns: [id, name, $expr1, $expr2, count], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC], value indices: [4], distribution key: [0, 1, 2, 3], read pk prefix len hint: 4 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 6 { columns: [seller, $expr3, $expr4, count], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2], read pk prefix len hint: 3 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [id, name, starttime, $expr2, seller, $expr3, $expr4], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC, $5 ASC, $6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0, 2, 3], read pk prefix len hint: 7 } - id: nexmark_q9 before: @@ -700,8 +700,8 @@ Table 2 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], primary key: [$0 ASC, $7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [0], read pk prefix len hint: 1 } Table 4 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id, _row_id#1], primary key: [$13 ASC, $14 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0], read pk prefix len hint: 3 } - id: nexmark_q10 before: @@ -730,7 +730,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, date_time, date, time, _row_id], primary key: [$6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6], read pk prefix len hint: 1 } - id: nexmark_q11 before: @@ -826,7 +826,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, bidtimetype, date_time, extra, _row_id], primary key: [$6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6], read pk prefix len hint: 1 } - id: nexmark_q15 before: @@ -880,9 +880,9 @@ source state table: 3 Table 0 { columns: [$expr1, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count(distinct bidder), count(distinct bidder) filter((price < 10000:Int32)), count(distinct bidder) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct bidder) filter((price >= 1000000:Int32)), count(distinct auction), count(distinct auction) filter((price < 10000:Int32)), count(distinct auction) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct auction) filter((price >= 1000000:Int32))], primary key: [$0 ASC], value indices: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0], read pk prefix len hint: 1 } - Table 1 { columns: [$expr1, bidder, count_for_agg_call_4, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 0 } - Table 2 { columns: [$expr1, auction, count_for_agg_call_8, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 0 } - Table 3 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 1 { columns: [$expr1, bidder, count_for_agg_call_4, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 2 } + Table 2 { columns: [$expr1, auction, count_for_agg_call_8, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 2 } + Table 3 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0], read pk prefix len hint: 1 } - id: nexmark_q16 before: @@ -938,9 +938,9 @@ source state table: 3 Table 0 { columns: [channel, $expr1, max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count(distinct bidder), count(distinct bidder) filter((price < 10000:Int32)), count(distinct bidder) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct bidder) filter((price >= 1000000:Int32)), count(distinct auction), count(distinct auction) filter((price < 10000:Int32)), count(distinct auction) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct auction) filter((price >= 1000000:Int32))], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0, 1], read pk prefix len hint: 2 } - Table 1 { columns: [channel, $expr1, bidder, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 0 } - Table 2 { columns: [channel, $expr1, auction, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 0 } - Table 3 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 1 { columns: [channel, $expr1, bidder, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 3 } + Table 2 { columns: [channel, $expr1, auction, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 3 } + Table 3 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0, 1], read pk prefix len hint: 2 } - id: nexmark_q17 before: @@ -990,7 +990,7 @@ source state table: 1 Table 0 { columns: [auction, $expr1, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), min(price), max(price), sum(price), count(price)], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5, 6, 7, 8, 9], distribution key: [0, 1], read pk prefix len hint: 2 } - Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, day, total_bids, rank1_bids, rank2_bids, rank3_bids, min_price, max_price, avg_price, sum_price], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], distribution key: [0, 1], read pk prefix len hint: 2 } - id: nexmark_q18 before: @@ -1038,7 +1038,7 @@ source state table: 1 Table 0 { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], primary key: [$1 ASC, $0 ASC, $5 DESC, $7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [1, 0], read pk prefix len hint: 2 } - Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], primary key: [$7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [7], read pk prefix len hint: 1 } - id: nexmark_q19 before: @@ -1117,8 +1117,8 @@ Table 1 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, _row_id], primary key: [$0 ASC, $9 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, channel, url, date_timeb, item_name, description, initial_bid, reserve, date_timea, expires, seller, category, _row_id, _row_id#1], primary key: [$14 ASC, $15 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], distribution key: [0], read pk prefix len hint: 3 } - id: nexmark_q21 before: @@ -1170,7 +1170,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, channel, dir1, dir2, dir3, _row_id], primary key: [$7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [7], read pk prefix len hint: 1 } - id: nexmark_q101 before: @@ -1240,9 +1240,9 @@ Table 1 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [auction, max(price)], primary key: [$0 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [auction, max(price), count], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, current_highest_bid, _row_id, auction], primary key: [$3 ASC, $4 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [0], read pk prefix len hint: 3 } - id: nexmark_q102 before: @@ -1352,15 +1352,15 @@ StreamProject { exprs: [auction, _row_id] } StreamExchange Hash([1]) from 3 - Table 0 { columns: [id, item_name, count(auction)], primary key: [$2 ASC, $0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 0 } + Table 0 { columns: [id, item_name, count(auction)], primary key: [$2 ASC, $0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 1 { columns: [$expr1], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } Table 2 { columns: [id, item_name, count(auction), count], primary key: [$0 ASC, $1 ASC], value indices: [2, 3], distribution key: [0], read pk prefix len hint: 2 } Table 3 { columns: [id, item_name, _row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 4 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 5 { columns: [auction, _row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } Table 6 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } - Table 8 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 8 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 9 { columns: [sum0(count), count(auction), count], primary key: [], value indices: [0, 1, 2], distribution key: [], read pk prefix len hint: 0 } Table 10 { columns: [auction, count], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, bid_count], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 2 } @@ -1432,9 +1432,9 @@ Table 1 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [auction], primary key: [$0 ASC], value indices: [0], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [auction, count], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 2 } - id: nexmark_q104 before: @@ -1504,9 +1504,9 @@ Table 1 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [auction], primary key: [$0 ASC], value indices: [0], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [auction, count], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 2 } - id: nexmark_q105 before: @@ -1594,8 +1594,8 @@ Table 4 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 5 { columns: [auction, _row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } Table 6 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } - Table 8 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 8 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, bid_count], primary key: [$2 DESC, $0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [], read pk prefix len hint: 2 } - id: nexmark_q106 before: @@ -1696,6 +1696,6 @@ Table 6 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 7 { columns: [auction, price, date_time, _row_id], primary key: [$0 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [0], read pk prefix len hint: 1 } Table 8 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 9 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } - Table 10 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 9 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 10 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [min_final], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } diff --git a/src/frontend/planner_test/tests/testdata/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/temporal_filter.yaml index d9827629eff4d..b0929c9ddb8ab 100644 --- a/src/frontend/planner_test/tests/testdata/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/temporal_filter.yaml @@ -80,9 +80,9 @@ StreamNow { output: [now] } state table: 5 - Table 0 { columns: [t1_ts, t1__row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 0 } + Table 0 { columns: [t1_ts, t1__row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 1 } Table 1 { columns: [$expr2], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } - Table 2 { columns: [t1_ts, t1__row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 0 } + Table 2 { columns: [t1_ts, t1__row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 1 } Table 3 { columns: [$expr1], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } Table 4 { columns: [now], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } Table 5 { columns: [now], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } diff --git a/src/frontend/planner_test/tests/testdata/tpch.yaml b/src/frontend/planner_test/tests/testdata/tpch.yaml index 296f3253b3986..fca70439fe903 100644 --- a/src/frontend/planner_test/tests/testdata/tpch.yaml +++ b/src/frontend/planner_test/tests/testdata/tpch.yaml @@ -2126,7 +2126,7 @@ StreamProject { exprs: [(partsupp.ps_supplycost * partsupp.ps_availqty) as $expr2, partsupp.ps_partkey, partsupp.ps_suppkey, supplier.s_suppkey, nation.n_nationkey, supplier.s_nationkey] } StreamExchange Hash([5]) from 2 - Table 0 { columns: [partsupp_ps_partkey, sum($expr1)], primary key: [$1 ASC, $0 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 0 } + Table 0 { columns: [partsupp_ps_partkey, sum($expr1)], primary key: [$1 ASC, $0 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } Table 1 { columns: [$expr3], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 } Table 2 { columns: [partsupp_ps_partkey, sum($expr1), count], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [partsupp_ps_partkey, partsupp_ps_availqty, partsupp_ps_supplycost, supplier_s_nationkey, partsupp_ps_suppkey, supplier_s_suppkey], primary key: [$3 ASC, $0 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3], read pk prefix len hint: 1 } @@ -2714,7 +2714,7 @@ BatchPlanNode Table 0 { columns: [part_p_brand, part_p_type, part_p_size, count(distinct partsupp_ps_suppkey), count], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4], distribution key: [0, 1, 2], read pk prefix len hint: 3 } - Table 1 { columns: [part_p_brand, part_p_type, part_p_size, partsupp_ps_suppkey, count_for_agg_call_0], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC], value indices: [4], distribution key: [0, 1, 2], read pk prefix len hint: 0 } + Table 1 { columns: [part_p_brand, part_p_type, part_p_size, partsupp_ps_suppkey, count_for_agg_call_0], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC], value indices: [4], distribution key: [0, 1, 2], read pk prefix len hint: 4 } Table 2 { columns: [partsupp_ps_suppkey, part_p_brand, part_p_type, part_p_size, partsupp_ps_partkey, part_p_partkey], primary key: [$0 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [partsupp_ps_suppkey, partsupp_ps_partkey, part_p_partkey, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0], read pk prefix len hint: 1 } Table 4 { columns: [supplier_s_suppkey], primary key: [$0 ASC], value indices: [0], distribution key: [0], read pk prefix len hint: 1 } diff --git a/src/frontend/planner_test/tests/testdata/watermark.yaml b/src/frontend/planner_test/tests/testdata/watermark.yaml index 308c848e7f80a..ea49a48c88910 100644 --- a/src/frontend/planner_test/tests/testdata/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/watermark.yaml @@ -26,7 +26,7 @@ StreamSource { source: "t", columns: ["v1", "_row_id"] } source state table: 1 - Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 } + Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [v1, _row_id], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 1 } - name: watermark on append only table with source sql: | diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index a0322817b4aea..931bfa2371160 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -196,33 +196,56 @@ impl Agg { let mut included_upstream_indices = vec![]; // all upstream indices that are included in the state table let mut column_mapping = BTreeMap::new(); // key: upstream col idx, value: table col idx let mut table_value_indices = BTreeSet::new(); // table column indices of value columns - let mut add_column = |upstream_idx, order_type, is_value| { - column_mapping.entry(upstream_idx).or_insert_with(|| { - let table_col_idx = - internal_table_catalog_builder.add_column(&in_fields[upstream_idx]); - if let Some(order_type) = order_type { - internal_table_catalog_builder.add_order_column(table_col_idx, order_type); + let mut add_column = + |upstream_idx, + order_type, + is_value, + internal_table_catalog_builder: &mut TableCatalogBuilder| { + column_mapping.entry(upstream_idx).or_insert_with(|| { + let table_col_idx = + internal_table_catalog_builder.add_column(&in_fields[upstream_idx]); + if let Some(order_type) = order_type { + internal_table_catalog_builder + .add_order_column(table_col_idx, order_type); + } + included_upstream_indices.push(upstream_idx); + table_col_idx + }); + if is_value { + // note that some indices may be added before as group keys which are not + // value + table_value_indices.insert(column_mapping[&upstream_idx]); } - included_upstream_indices.push(upstream_idx); - table_col_idx - }); - if is_value { - // note that some indices may be added before as group keys which are not value - table_value_indices.insert(column_mapping[&upstream_idx]); - } - }; + }; for &idx in &self.group_key { - add_column(idx, Some(OrderType::ascending()), false); + add_column( + idx, + Some(OrderType::ascending()), + false, + &mut internal_table_catalog_builder, + ); } + let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len(); + for (order_type, idx) in sort_keys { - add_column(idx, Some(order_type), true); + add_column( + idx, + Some(order_type), + true, + &mut internal_table_catalog_builder, + ); } for &idx in &in_pks { - add_column(idx, Some(OrderType::ascending()), true); + add_column( + idx, + Some(OrderType::ascending()), + true, + &mut internal_table_catalog_builder, + ); } for idx in include_keys { - add_column(idx, None, true); + add_column(idx, None, true, &mut internal_table_catalog_builder); } let mapping = @@ -232,16 +255,13 @@ impl Agg { internal_table_catalog_builder.set_vnode_col_idx(tb_vnode_idx); } - // prefix_len_hint should be the length of deduplicated group key because pk is - // deduplicated. - let prefix_len = self.group_key.iter().unique().count(); - internal_table_catalog_builder.set_read_prefix_len_hint(prefix_len); // set value indices to reduce ser/de overhead let table_value_indices = table_value_indices.into_iter().collect_vec(); internal_table_catalog_builder.set_value_indices(table_value_indices.clone()); MaterializedInputState { - table: internal_table_catalog_builder.build(tb_dist.unwrap_or_default()), + table: internal_table_catalog_builder + .build(tb_dist.unwrap_or_default(), read_prefix_len_hint), included_upstream_indices, table_value_indices, } @@ -258,8 +278,7 @@ impl Agg { .add_order_column(tb_column_idx, OrderType::ascending()); included_upstream_indices.push(idx); } - - internal_table_catalog_builder.set_read_prefix_len_hint(self.group_key.len()); + let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len(); match agg_kind { AggKind::ApproxCountDistinct => { @@ -288,7 +307,8 @@ impl Agg { internal_table_catalog_builder.set_vnode_col_idx(tb_vnode_idx); } TableState { - table: internal_table_catalog_builder.build(tb_dist.unwrap_or_default()), + table: internal_table_catalog_builder + .build(tb_dist.unwrap_or_default(), read_prefix_len_hint), } }; @@ -369,7 +389,8 @@ impl Agg { .add_order_column(tb_column_idx, OrderType::ascending()); } } - internal_table_catalog_builder.set_read_prefix_len_hint(self.group_key.len()); + let read_prefix_len_hint = self.group_key.len(); + let mapping = self.i2o_col_mapping(); let tb_dist = mapping.rewrite_dist_key(&in_dist_key).unwrap_or_default(); if let Some(tb_vnode_idx) = vnode_col_idx.and_then(|idx| mapping.try_map(idx)) { @@ -380,7 +401,7 @@ impl Agg { // of this table should skip group_key.len(). internal_table_catalog_builder .set_value_indices((self.group_key.len()..out_fields.len()).collect()); - internal_table_catalog_builder.build(tb_dist) + internal_table_catalog_builder.build(tb_dist, read_prefix_len_hint) } /// Infer dedup tables for distinct agg calls, partitioned by distinct columns. @@ -417,6 +438,7 @@ impl Agg { let table_col_idx = table_builder.add_column(&in_fields[idx]); table_builder.add_order_column(table_col_idx, OrderType::ascending()); } + let read_prefix_len_hint = table_builder.get_current_pk_len(); // Agg calls with same distinct column share the same dedup table, but they may have // different filter conditions, so the count of occurrence of one distinct key may @@ -437,7 +459,7 @@ impl Agg { table_builder.set_vnode_col_idx(idx); } let dist_key = mapping.rewrite_dist_key(&in_dist_key).unwrap_or_default(); - let table = table_builder.build(dist_key); + let table = table_builder.build(dist_key, read_prefix_len_hint); (distinct_col, table) }) .collect() diff --git a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs index 4ba5b9f885230..3201147335aee 100644 --- a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs @@ -66,6 +66,8 @@ pub fn infer_left_internal_table_catalog( // The pk of dynamic filter internal table should be left_key + input_pk. let mut pk_indices = vec![left_key_index]; + let read_prefix_len_hint = pk_indices.len(); + // TODO(yuhao): dedup the dist key and pk. pk_indices.extend(me.logical_pk()); @@ -80,7 +82,7 @@ pub fn infer_left_internal_table_catalog( internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) }); - internal_table_catalog_builder.build(dist_keys) + internal_table_catalog_builder.build(dist_keys, read_prefix_len_hint) } pub fn infer_right_internal_table_catalog(input: &impl stream::StreamPlanRef) -> TableCatalog { @@ -100,5 +102,5 @@ pub fn infer_right_internal_table_catalog(input: &impl stream::StreamPlanRef) -> }); // No distribution keys - internal_table_catalog_builder.build(vec![]) + internal_table_catalog_builder.build(vec![], 0) } diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 56d80b3ae644c..9cf55c4de13c7 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -106,6 +106,6 @@ impl Source { builder.add_column(&value); builder.add_order_column(ordered_col_idx, OrderType::ascending()); - builder.build(vec![]) + builder.build(vec![], 1) } } diff --git a/src/frontend/src/optimizer/plan_node/generic/top_n.rs b/src/frontend/src/optimizer/plan_node/generic/top_n.rs index a356ef9375280..d546bcd93debe 100644 --- a/src/frontend/src/optimizer/plan_node/generic/top_n.rs +++ b/src/frontend/src/optimizer/plan_node/generic/top_n.rs @@ -62,6 +62,7 @@ impl TopN { order_cols.insert(idx); }); + let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len(); column_orders.iter().for_each(|order| { if !order_cols.contains(&order.column_index) { internal_table_catalog_builder @@ -80,9 +81,10 @@ impl TopN { internal_table_catalog_builder.set_vnode_col_idx(vnode_col_idx); } - internal_table_catalog_builder.set_read_prefix_len_hint(self.group_key.len()); - internal_table_catalog_builder - .build(self.input.distribution().dist_column_indices().to_vec()) + internal_table_catalog_builder.build( + self.input.distribution().dist_column_indices().to_vec(), + read_prefix_len_hint, + ) } } diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 0222cd9e82d09..c63795d5fb44c 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -287,15 +287,12 @@ impl HashJoin { degree_table_catalog_builder .set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]); - internal_table_catalog_builder.set_read_prefix_len_hint(join_key_len); - degree_table_catalog_builder.set_read_prefix_len_hint(join_key_len); - internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone()); degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk); ( - internal_table_catalog_builder.build(internal_table_dist_keys), - degree_table_catalog_builder.build(degree_table_dist_keys), + internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len), + degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len), deduped_input_pk_indices, ) } diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index 2a3695b259358..113c825c7fbf1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -90,7 +90,7 @@ impl StreamNode for StreamNow { }); let table_catalog = internal_table_catalog_builder - .build(dist_keys) + .build(dist_keys, 0) .with_id(state.gen_table_id_wrapped()); NodeBody::Now(NowNode { state_table: Some(table_catalog.to_internal_table_prost()), diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index a0d5a5468a7b1..45ef145bfd4fb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -137,7 +137,7 @@ pub fn infer_internal_table_catalog(watermark_type: DataType) -> TableCatalog { builder.set_vnode_col_idx(0); builder.set_value_indices(vec![1]); - builder.build(vec![0]) + builder.build(vec![0], 1) } impl StreamNode for StreamWatermarkFilter { diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index ad46f2f387f6f..240e0e3d39564 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -75,8 +75,9 @@ impl TableCatalogBuilder { self.pk.push(ColumnOrder::new(column_index, order_type)); } - pub fn set_read_prefix_len_hint(&mut self, read_prefix_len_hint: usize) { - self.read_prefix_len_hint = read_prefix_len_hint; + /// get the current exist field number of the primary key. + pub fn get_current_pk_len(&self) -> usize { + self.pk.len() } pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) { @@ -114,8 +115,10 @@ impl TableCatalogBuilder { self.column_names.insert(column_desc.name.clone(), 0); } - /// Consume builder and create `TableCatalog` (for proto). - pub fn build(self, distribution_key: Vec) -> TableCatalog { + /// Consume builder and create `TableCatalog` (for proto). The `read_prefix_len_hint` is the + /// anticipated read prefix pattern (number of fields) for the table, which can be utilized for + /// implementing the table's bloom filter or other storage optimization techniques. + pub fn build(self, distribution_key: Vec, read_prefix_len_hint: usize) -> TableCatalog { assert!(self.read_prefix_len_hint <= self.pk.len()); let watermark_columns = match self.watermark_columns { Some(w) => w, @@ -144,7 +147,7 @@ impl TableCatalogBuilder { .unwrap_or_else(|| (0..self.columns.len()).collect_vec()), definition: "".into(), conflict_behavior_type: 0, - read_prefix_len_hint: self.read_prefix_len_hint, + read_prefix_len_hint, version: None, // the internal table is not versioned and can't be schema changed watermark_columns, dist_key_in_pk: self.dist_key_in_pk.unwrap_or(vec![]),