Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Band Join #32

Merged
merged 3 commits into from
Aug 14, 2023
Merged

RFC: Band Join #32

merged 3 commits into from
Aug 14, 2023

Conversation

chenzl25
Copy link
Contributor

No description provided.

@yuhao-su
Copy link

The implementation proposed in this RFC may enlighten a way to Temporal join.

Also, it is possible to implement with shared arrangement. (Maybe can avoid broadcast)

@chenzl25
Copy link
Contributor Author

The implementation proposed in this RFC may enlighten a way to Temporal join.

Also, it is possible to implement with shared arrangement. (Maybe can avoid broadcast)

I think actually it is more related to Interval Joins.

select * from A join B on A.p between B.start and B.end.
```

The band join has a nice property. The range condition of band join only involves 2 columns: one from the LHS the other from RHS, so we can always reverse the condition. For example, `A.p between B.d - 10 and B.d + 20` can be converted into `B.d between A.p - 20 and A.p + 10`. This is a crucial property for streaming queries, as we need to treat both sides of the join logically equivalent (both sides need to be built and probed).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a crucial property for streaming queries, as we need to treat both sides of the join logically equivalent (both sides need to be built and probed).

Why do we need to probe both sides? Why is this different in streaming vs. batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For streaming, updates from either left side or right side need to be reflected on the join output. Think about StreamHashJoin, we need to build a hash table for both sides and if an update comes from one side, we need to use it to probe the other side to find the matched rows. This is also applicable to the stream-band-join. As you can see, we need to build some index structures for both sides in streaming, while in batch we just need to build an index structure for one side (the build side), because the input size is bounded in batch.


- If we have an equal condition `A.a = B.b` and its selectivity is low, using `HashJoin` is enough.
- If we have an equal condition `A.a = B.b` and its selectivity is high, we can use this condition to distribute the data to acquire parallelism. For `A.p between B.d - 10 and B.d + 20`, we can construct an internal table with order key = `A.a, A.p, A.rid` for A side and order key `B.b, B.d, B.rid` for B side. For `A.q between B.e - 10 and B.e + 20`, we can construct an internal table with order key = `A.a, A.q, A.rid` for A side and order key `B.b, B.e, B.rid` for B side. When a row came from B with (B.b, B.d, B.e) = (100, 200, 300). We can lookup A's internal table row ids with range queries: A between (A.a = 100, A.p = 200 - 10 = 190) and (A.a = 100, A.p = 200 + 20 = 220). Merge the other A's internal table row ids with range queries: A between (A.a = 100, A.q = 300 - 5 = 295) and (A.a = 100, A.p = 200 + 15 = 315). Finally we can intersect the row ids to get the corresponding A matched rows. When a row comes from A, we first need to reverse the range condition as we mentioned before and then do the same logic as for row came from B. Row deleted is basically equivalent to the insertion, but with opposed operators. Update can be handled as delete followed by insert.
- If we don't have equal conditions `A.a = B.b`, we can only use singleton for both input sides or broadcast one side to the other side. The other logic is basically equivalent with the above example without `A.a` and `B.b` as their prefix keys.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be careful when doing broadcast since it duplicates the broadcasted state table. It would be better to do it with user hint.

```

- If we have an equal condition `A.a = B.b` and its selectivity is low, using `HashJoin` is enough.
- If we have an equal condition `A.a = B.b` and its selectivity is high, we can use this condition to distribute the data to acquire parallelism. For `A.p between B.d - 10 and B.d + 20`, we can construct an internal table with order key = `A.a, A.p, A.rid` for A side and order key `B.b, B.d, B.rid` for B side. For `A.q between B.e - 10 and B.e + 20`, we can construct an internal table with order key = `A.a, A.q, A.rid` for A side and order key `B.b, B.e, B.rid` for B side. When a row came from B with (B.b, B.d, B.e) = (100, 200, 300). We can lookup A's internal table row ids with range queries: A between (A.a = 100, A.p = 200 - 10 = 190) and (A.a = 100, A.p = 200 + 20 = 220). Merge the other A's internal table row ids with range queries: A between (A.a = 100, A.q = 300 - 5 = 295) and (A.a = 100, A.p = 200 + 15 = 315). Finally we can intersect the row ids to get the corresponding A matched rows. When a row comes from A, we first need to reverse the range condition as we mentioned before and then do the same logic as for row came from B. Row deleted is basically equivalent to the insertion, but with opposed operators. Update can be handled as delete followed by insert.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In streaming I guess most band join conditions are on timestamp columns, so it seems fine to only support one band condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if there are more than one band condition, we can always choose one of them to construct the internal state and other band conditions can be just treated as other conditions.


## Future possibilities

If you are familiar with Flink, we can find that they have interval join which is just a special case of the `BandJoin`. Interval join requires an equal condition and the range condition looks like `b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]`. Interval join also requires the input stream to be append-only and can cooperate with the watermark to prune the old states. We can use `BandJoin` to implement interval join in the future.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using the time-bound condition to do state cleaning, there is a unresolve problem: should the time column be placed before join key or after join key?

  1. time column before join key: state cleanning is easy (one range delete), but random acesss by join key become impossible.
  2. join key before time column: Join key access is fast as usual, but state cleanning's cost is O(N) where N = number of join keys.
  3. Alternatively, use two seperated state tables, but this would increase the IO cost multiple times.

Neither sounds a good solution. We need to take a look on Flink's implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had taken a look at Flink's TimeIntervalJoin and found that they use alternative two. First they utilize the join equal keys to partition the join state. Second, they maintain a leftCache and rightCache with type MapState<Long, List<Tuple2<RowData, Boolean>>> for each join equal key. The Long type is actually the timestamp. BTW, they suffer from a lack of ordered key MapState. The state cleaning mechanism of Flink TimeIntervalJoin works like that, it registers a cleanup timer to its time service. The cleanup timer triggered timestamp is calculated based on the row timestamp, input watermark and time interval provided by the user. As soon as the cleanup timer triggers, it iterates the LeftCache and RightCache, deletes expired rows, and registers another cleanup timer based on the earliest valid row timestamp left. As we can see,
Flink uses a bunch of point deletes triggered by the cleanup timer, rather than a range delete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using the time-bound condition to do state cleaning, there is a unresolve problem: should the time column be placed before join key or after join key?

  1. time column before join key: state cleanning is easy (one range delete), but random acesss by join key become impossible.
  2. join key before time column: Join key access is fast as usual, but state cleanning's cost is O(N) where N = number of join keys.
  3. Alternatively, use two seperated state tables, but this would increase the IO cost multiple times.

Neither sounds a good solution. We need to take a look on Flink's implementation.

I think the first solution is ridiculous. We do not need a fast state cleanning.

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we merge this?

@chenzl25
Copy link
Contributor Author

Shall we merge this?

Yes, because we have support Interval Join which could clean the state at least.

@chenzl25 chenzl25 merged commit b8cf1f8 into main Aug 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants