perf: nexmark q103 #8165

Closed
Tracked by #7289
lmatz opened this issue Feb 23, 2023 · 2 comments

@lmatz
Contributor

lmatz commented Feb 23, 2023

Query:

CREATE MATERIALIZED VIEW nexmark_q103
AS
SELECT
    a.id AS auction_id,
    a.item_name AS auction_item_name
FROM auction a
WHERE a.id IN (
    SELECT b.auction FROM bid b
    GROUP BY b.auction
    HAVING COUNT(*) >= 20
);
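
For reference, here is a minimal batch sketch in Python of what q103 computes. The in-memory lists and the `q103` function are hypothetical stand-ins for the nexmark `auction` and `bid` streams; this is not RisingWave or Flink code, only the query semantics: count bids per auction, keep ids with at least 20 bids, then semi-join the auctions against that set.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Hypothetical stand-in for an auction row: (id, item_name).
Auction = Tuple[int, str]


def q103(auctions: Iterable[Auction], bid_auction_ids: Iterable[int], threshold: int = 20) -> List[Auction]:
    # Subquery: SELECT b.auction FROM bid b GROUP BY b.auction HAVING COUNT(*) >= threshold
    counts = Counter(bid_auction_ids)
    hot = {auction_id for auction_id, n in counts.items() if n >= threshold}
    # Outer query: `a.id IN (...)` is a semi join, so each auction row is
    # emitted at most once no matter how many bids it has.
    return [(a_id, name) for a_id, name in auctions if a_id in hot]
```

For example, with 20 bids on auction 1 and 19 on auction 2, only auction 1 passes the `HAVING` filter.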

RW:

 StreamMaterialize { columns: [auction_id, auction_item_name, _row_id(hidden)], pk_columns: [_row_id, auction_id], pk_conflict: "no check" }
 └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr3 }
   ├─StreamExchange { dist: HashShard($expr1) }
   | └─StreamProject { exprs: [Field(auction, 0:Int32) as $expr1, Field(auction, 1:Int32) as $expr2, _row_id] }
   |   └─StreamFilter { predicate: (event_type = 1:Int32) }
   |     └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   |       └─StreamShare { id = 547 }
   |         └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   |           └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
   |             └─StreamRowIdGen { row_id_index: 4 }
   |               └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
   └─StreamProject { exprs: [$expr3] }
     └─StreamFilter { predicate: (count >= 20:Int32) }
       └─StreamProject { exprs: [$expr3, count] }
         └─StreamAppendOnlyHashAgg { group_key: [$expr3], aggs: [count, count] }
           └─StreamExchange { dist: HashShard($expr3) }
             └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, _row_id] }
               └─StreamFilter { predicate: (event_type = 2:Int32) }
                 └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
                   └─StreamShare { id = 547 }
                     └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
                       └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
                         └─StreamRowIdGen { row_id_index: 4 }
                           └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
(24 rows)
 Fragment 0
   StreamMaterialize { columns: [auction_id, auction_item_name, _row_id(hidden)], pk_columns: [_row_id, auction_id], pk_conflict: "no check" }
       materialized table: 4294967294
     StreamHashJoin { type: LeftSemi, predicate: $expr296 = $expr298 }
         left table: 0, right table 2, left degree table: 1, right degree table: 3,
       StreamExchange Hash([0]) from 1
       StreamProject { exprs: [$expr298] }
         StreamFilter { predicate: (count >= 20:Int32) }
           StreamProject { exprs: [$expr298, count] }
             StreamAppendOnlyHashAgg { group_key: [$expr298], aggs: [count, count] }
                 result table: 5, state tables: []
               StreamExchange Hash([0]) from 3
 
 Fragment 1
   StreamProject { exprs: [Field(auction, 0:Int32) as $expr296, Field(auction, 1:Int32) as $expr297, _row_id] }
     StreamFilter { predicate: (event_type = 1:Int32) }
       StreamProject { exprs: [event_type, auction, bid, _row_id] }
         StreamExchange Hash([3]) from 2
 
 Fragment 2
   StreamProject { exprs: [event_type, auction, bid, _row_id] }
     StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
       StreamRowIdGen { row_id_index: 4 }
         StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
             source state table: 4
 
 Fragment 3
   StreamProject { exprs: [Field(bid, 0:Int32) as $expr298, _row_id] }
     StreamFilter { predicate: (event_type = 2:Int32) }
       StreamProject { exprs: [event_type, auction, bid, _row_id] }
         StreamExchange Hash([3]) from 2
 
  Table 0 { columns: [$expr296, $expr297, _row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
  Table 1 { columns: [$expr296, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
  Table 2 { columns: [$expr298], primary key: [$0 ASC], value indices: [0], distribution key: [0] }
  Table 3 { columns: [$expr298, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0] }
  Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [] }
  Table 5 { columns: [$expr298, count, count_0], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0] }
  Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0] }
(39 rows)
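
A simplified, hypothetical model of the streaming plan above: an append-only count per auction id (the `StreamAppendOnlyHashAgg`) feeding a left semi join (the `StreamHashJoin`). The class and method names are made up for illustration, and the comments mapping fields to Tables 0, 2 and 5 are only a rough analogy; this is not RisingWave's executor code.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


class IncrementalQ103:
    """Toy incremental model of q103; not RisingWave's actual implementation."""

    def __init__(self, threshold: int = 20) -> None:
        self.threshold = threshold
        # Roughly analogous to Table 5 (hash agg result): bid count per auction id.
        self.bid_counts: Dict[int, int] = defaultdict(int)
        # Roughly analogous to Table 0 (left join state): auction rows by id.
        self.left_rows: Dict[int, List[Tuple[int, str]]] = defaultdict(list)
        # Roughly analogous to Table 2 (right join state): ids that passed HAVING.
        self.hot_ids: Set[int] = set()

    def on_auction(self, auction_id: int, item_name: str) -> List[Tuple[int, str]]:
        row = (auction_id, item_name)
        self.left_rows[auction_id].append(row)
        # Emit immediately if the right side already has a match.
        return [row] if auction_id in self.hot_ids else []

    def on_bid(self, auction_id: int) -> List[Tuple[int, str]]:
        self.bid_counts[auction_id] += 1
        # The bid input is append-only, so the count only grows and
        # COUNT(*) >= threshold flips to true exactly once.
        if self.bid_counts[auction_id] == self.threshold:
            self.hot_ids.add(auction_id)
            # Release every buffered auction row with this id.
            return list(self.left_rows[auction_id])
        return []
```

The degree tables (Tables 1 and 3) are left out: with append-only inputs and no retractions, a set membership check is enough for this toy model.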
@github-actions github-actions bot added this to the release-0.1.18 milestone Feb 23, 2023
@lmatz
Contributor Author

lmatz commented Feb 23, 2023

Flink:

== Optimized Physical Plan ==
Join(joinType=[LeftSemiJoin], where=[=(id, auction)], select=[id, itemName], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[id]])
:  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[=(event_type, 1)])
:     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
:        +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
:           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
+- Exchange(distribution=[hash[auction]])
   +- Calc(select=[auction], where=[>=($f1, 20)])
      +- GroupAggregate(groupBy=[auction], select=[auction, COUNT(*) AS $f1])
         +- Exchange(distribution=[hash[auction]])
            +- Calc(select=[bid.auction AS auction], where=[=(event_type, 2)])
               +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                  +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                     +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Join(joinType=[LeftSemiJoin], where=[(id = auction)], select=[id, itemName], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[id]])
:  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[(event_type = 1)])
:     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])(reuse_id=[1])
:        +- Calc(select=[event_type, person, auction, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
:           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
+- Exchange(distribution=[hash[auction]])
   +- Calc(select=[auction], where=[($f1 >= 20)])
      +- GroupAggregate(groupBy=[auction], select=[auction, COUNT(*) AS $f1])
         +- Exchange(distribution=[hash[auction]])
            +- Calc(select=[bid.auction AS auction], where=[(event_type = 2)])
               +- Reused(reference_id=[1])

@lmatz lmatz removed this from the release-0.18 milestone Mar 22, 2023
@github-actions
Contributor

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

@lmatz lmatz closed this as not planned Jan 9, 2024