Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): support delta join on primary table #7662

Merged
merged 4 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1 ;
create materialized view v as select * from a join b on a.a1 = b.b1;

query IIII rowsort
select * from v order by a1, a2, b1, b2;
Expand All @@ -50,3 +50,6 @@ drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
47 changes: 47 additions & 0 deletions e2e_test/streaming/delta_join/delta_join_snapshot_no_index.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
set rw_implicit_flush = true;

statement ok
set rw_streaming_enable_delta_join = true;

statement ok
set streaming_parallelism = 2;

statement ok
create table a (a1 int primary key, a2 int);

statement ok
set streaming_parallelism = 3;

statement ok
create table b (b1 int primary key, b2 int);

statement ok
insert into A values (1,2), (11, 22);

statement ok
insert into B values (1,4), (11, 44);

statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1;

query IIII rowsort
select * from v order by a1, a2, b1, b2;
----
1 2 1 4
11 22 11 44

statement ok
drop materialized view v;

statement ok
drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1 ;
create materialized view v as select * from a join b on a.a1 = b.b1;

statement ok
insert into A values (1,2), (1,3);
Expand All @@ -50,3 +50,6 @@ drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
47 changes: 47 additions & 0 deletions e2e_test/streaming/delta_join/delta_join_upstream_no_index.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
set rw_implicit_flush = true;

statement ok
set rw_streaming_enable_delta_join = true;

statement ok
set streaming_parallelism = 2;

statement ok
create table a (a1 int primary key, a2 int);

statement ok
set streaming_parallelism = 3;

statement ok
create table b (b1 int primary key, b2 int);

statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1;

statement ok
insert into A values (1,2), (11, 22);

statement ok
insert into B values (1,4), (11, 44);

query IIII rowsort
select * from v order by a1, a2, b1, b2;
----
1 2 1 4
11 22 11 44

statement ok
drop materialized view v;

statement ok
drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
29 changes: 27 additions & 2 deletions src/frontend/planner_test/tests/testdata/delta_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,30 @@
StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1, b1] }
└─StreamExchange { dist: HashShard(i_a1.a1, i_b1.b1, i_a1.a._row_id, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] }
├─StreamIndexScan { index: "i_a1", columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamIndexScan { index: "i_b1", columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
├─StreamIndexScan { index: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
create table b (b1 int, b2 int);
create index i_b1 on b(b1);
/* should generate delta join plan, and stream index scan */
select * from a join b on a.a1 = b.b1 ;
stream_plan: |
StreamMaterialize { columns: [a1, a2, b1, b2, i_b1.b._row_id(hidden)], pk_columns: [a1, i_b1.b._row_id, b1] }
└─StreamExchange { dist: HashShard(a.a1, i_b1.b1, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
create table b (b1 int primary key, b2 int);
/* should generate delta join plan, and stream index scan */
select * from a join b on a.a1 = b.b1 ;
stream_plan: |
StreamMaterialize { columns: [a1, a2, b1, b2], pk_columns: [a1, b1] }
└─StreamExchange { dist: HashShard(a.a1, b.b1) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = b.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamTableScan { table: b, columns: [b.b1, b.b2], pk: [b.b1], dist: UpstreamHashShard(b.b1) }
155 changes: 104 additions & 51 deletions src/frontend/planner_test/tests/testdata/distribution_derive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,40 @@
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamIndexScan { index: "ak1", columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: "bk1", columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamExchange Hash([2, 3, 4, 5]) from 1

Fragment 1
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5

Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode

Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode

Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2

Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3

Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: Ak1_join_B_onk1
before:
- create_tables
Expand All @@ -32,36 +64,43 @@
└─BatchExchange { order: [], dist: UpstreamHashShard(ak1.k1) }
└─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) }
stream_plan: |
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), b._row_id(hidden), b.k1(hidden)], pk_columns: [ak1.a._row_id, b._row_id, ak1.k1, b.k1] }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = b.k1, output: [ak1.v, b.v, ak1.a._row_id, ak1.k1, b._row_id, b.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
| └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamExchange { dist: HashShard(b.k1) }
└─StreamTableScan { table: b, columns: [b.k1, b.v, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), b._row_id(hidden), b.k1(hidden)], pk_columns: [ak1.a._row_id, b._row_id, ak1.k1, b.k1] }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: ak1.k1 = b.k1, output: [ak1.v, b.v, ak1.a._row_id, ak1.k1, b._row_id, b.k1] }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([0]) from 1
StreamExchange Hash([0]) from 2
StreamExchange Hash([2, 3, 4, 5]) from 1

Fragment 1
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5

Fragment 2
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode

Fragment 2
Chain { table: b, columns: [b.k1, b.v, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode

Table 0 { columns: [ak1_k1, ak1_v, ak1_a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 1 { columns: [ak1_k1, ak1_a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [b_k1, b_v, b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [b_k1, b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, b._row_id, b.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] }
Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2

Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3

Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: A_join_Bk1_onk1
before:
- create_tables
Expand All @@ -72,36 +111,43 @@
└─BatchExchange { order: [], dist: UpstreamHashShard(a.k1) }
└─BatchScan { table: a, columns: [a.k1, a.v], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v, bv, a._row_id(hidden), a.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [a._row_id, bk1.b._row_id, a.k1, bk1.k1] }
└─StreamHashJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v, a._row_id, a.k1, bk1.b._row_id, bk1.k1] }
├─StreamExchange { dist: HashShard(a.k1) }
| └─StreamTableScan { table: a, columns: [a.k1, a.v, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(bk1.k1) }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, a._row_id(hidden), a.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [a._row_id, bk1.b._row_id, a.k1, bk1.k1] }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v, a._row_id, a.k1, bk1.b._row_id, bk1.k1] }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([0]) from 1
StreamExchange Hash([0]) from 2
StreamExchange Hash([2, 3, 4, 5]) from 1

Fragment 1
Chain { table: a, columns: [a.k1, a.v, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5

Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode

Fragment 2
Fragment 3
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode

Table 0 { columns: [a_k1, a_v, a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 1 { columns: [a_k1, a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [bk1_k1, bk1_v, bk1_b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [bk1_k1, bk1_b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4294967294 { columns: [v, bv, a._row_id, a.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] }
Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2

Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3

Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: Ak1_join_Bk1_onk1
before:
- create_tables
Expand All @@ -113,35 +159,42 @@
└─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) }
stream_plan: |
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
| └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamExchange { dist: HashShard(bk1.k1) }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([0]) from 1
StreamExchange Hash([0]) from 2
StreamExchange Hash([2, 3, 4, 5]) from 1

Fragment 1
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5

Fragment 2
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode

Fragment 2
Fragment 3
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode

Table 0 { columns: [ak1_k1, ak1_v, ak1_a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 1 { columns: [ak1_k1, ak1_a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [bk1_k1, bk1_v, bk1_b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [bk1_k1, bk1_b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] }
Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2

Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3

Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: aggk1_from_A
before:
- create_tables
Expand Down
Loading