Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Band Join #32

Merged
merged 3 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions rfcs/0032-band-join.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
feature: band_join
authors:
- "Dylan Chen"
start_date: "2022/12/26"
---

# Band Join

A band join is a special type of non equi join in which key values in one data set must fall within the specified range (“band”) of the second data set. There is another concept, range join which is similar to the band join but more flexible. Unluckily, range join is too general for streaming queries. We can only implement range join for batch queries.

```sql
-- This is a band-join.
select * from A join B on A.p between B.d - 10 and B.d + 20.

-- This is not a band-join, but a range join.
select * from A join B on A.p between B.start and B.end.
```

The band join has a nice property. The range condition of band join only involves 2 columns: one from the LHS the other from RHS, so we can always reverse the condition. For example, `A.p between B.d - 10 and B.d + 20` can be converted into `B.d between A.p - 20 and A.p + 10`. This is a crucial property for streaming queries, as we need to treat both sides of the join logically equivalent (both sides need to be built and probed).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a crucial property for streaming queries, as we need to treat both sides of the join logically equivalent (both sides need to be built and probed).

Why do we need to probe both sides? Why is this different in streaming vs. batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For streaming, updates from either left side or right side need to be reflected on the join output. Think about StreamHashJoin, we need to build a hash table for both sides and if an update comes from one side, we need to use it to probe the other side to find the matched rows. This is also applicable to the stream-band-join. As you can see, we need to build some index structures for both sides in streaming, while in batch we just need to build an index structure for one side (the build side), because the input size is bounded in batch.



## Motivation

A general nested-loop join in streaming query is known as inefficient join implementation, so we are seeking opportunities to recognize the special case of non equi join and try to implement a efficient join algorithm for it, e.g. dynamic filter which requires the RHS of join to be a scalar row and contains comparison condition. Band join is also one of these cases that we can implement an efficient join for it. I also want to mention that so many database systems have already supported band join in some way e.g. Oracle, Vertica, Duckdb, Umbra, DataBricks, Flink (Interval Join).

## Design

### Streaming

![](./images/0032-band-join/band-join.png)

For simplicity, we consider the following query first.
```sql
select * from A join B on A.p between B.d - 10 and B.d + 20.
```

- We can use singleton for both input sides or broadcast one side to the other side. We will construct an internal table for A side with order key = `A.p, A.rid` and for B side with order key = `B.d, B.rid`.
- We can reverse the condition `A.p between B.d - 10 and B.d + 20` into `B.d between A.p - 20 and Ap.p + 10`.
- Suppose we already have 3 rows for A side (10, 10001), (20, 10002), (30, 10003) and 3 rows for B side (5, 20001), (15, 20002), (45, 20003).
- When A side row changed comes with +(15, 10004), we need to search B side with condition `B.d between 15 - 20 = -5 and 15 + 10 = 25`, so B side row (5 , 20001) and (15, 20002) matched and return.
- When A side row change comes with -(20, 10002), we need to search B side with condition `B.d between 20 - 20 = 0 and 20 + 10 = 30`, so B side row (5 , 20001) and (15, 20002) matched and return.


Band join can contain more than one range condition, generally if it contains k range conditions we call it k dimensions band join. By the way, band join can also contain equal conditions. If the equal conditions of the band join with poor selectivity we can still use `BandJoin` to optimize it, otherwise a `HashJoin` is enough.

Without loss of generality, we will consider the following query.

```sql
select * from A join B on A.a = B.b and A.p between B.d - 10 and B.d + 20 and A.q between B.e - 5 and B.e + 15.
```

- If we have an equal condition `A.a = B.b` and its selectivity is low, using `HashJoin` is enough.
- If we have an equal condition `A.a = B.b` and its selectivity is high, we can use this condition to distribute the data to acquire parallelism. For `A.p between B.d - 10 and B.d + 20`, we can construct an internal table with order key = `A.a, A.p, A.rid` for A side and order key `B.b, B.d, B.rid` for B side. For `A.q between B.e - 10 and B.e + 20`, we can construct an internal table with order key = `A.a, A.q, A.rid` for A side and order key `B.b, B.e, B.rid` for B side. When a row came from B with (B.b, B.d, B.e) = (100, 200, 300). We can lookup A's internal table row ids with range queries: A between (A.a = 100, A.p = 200 - 10 = 190) and (A.a = 100, A.p = 200 + 20 = 220). Merge the other A's internal table row ids with range queries: A between (A.a = 100, A.q = 300 - 5 = 295) and (A.a = 100, A.p = 200 + 15 = 315). Finally we can intersect the row ids to get the corresponding A matched rows. When a row comes from A, we first need to reverse the range condition as we mentioned before and then do the same logic as for row came from B. Row deleted is basically equivalent to the insertion, but with opposed operators. Update can be handled as delete followed by insert.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In streaming I guess most band join conditions are on timestamp columns, so it seems fine to only support one band condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if there are more than one band condition, we can always choose one of them to construct the internal state and other band conditions can be just treated as other conditions.

- If we don't have equal conditions `A.a = B.b`, we can only use singleton for both input sides or broadcast one side to the other side. The other logic is basically equivalent with the above example without `A.a` and `B.b` as their prefix keys.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be careful when doing broadcast since it duplicates the broadcasted state table. It would be better to do it with user hint.


As we can see, we need to estimate the selectivity of the equal condition to decide whether to convert the join into a `BandJoin`, for the reason that we have no statistics now, we can provide a join hint or other ways for users to use it or not. If we have no equal condition, we can always turn the join into `BandJoin`.

### Batch

For batch query, it is more flexible to implement the `BandJoin`. We can even support `RangeJoin` rather than `BandJoin`. One of the most elegant ways to implement it is to use kd-tree as an index, for more details you can read the paper of `A scalable and generic approach to range joins`. At least we have nested-loop-join to run the `BandJoin` or `RangeJoin`, despite their inefficiency.


## Unresolved questions

* Parallelize the streaming `BandJoin` if there is no equal condition. Some papers have discussed how to optimize the distributed `BandJoin` by partitioning the input datasets, but it might need a special shuffle algorithm.
* For simplicity, we can only support `InnerJoin` for now.

## Future possibilities

If you are familiar with Flink, we can find that they have interval join which is just a special case of the `BandJoin`. Interval join requires an equal condition and the range condition looks like `b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]`. Interval join also requires the input stream to be append-only and can cooperate with the watermark to prune the old states. We can use `BandJoin` to implement interval join in the future.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using the time-bound condition to do state cleaning, there is a unresolve problem: should the time column be placed before join key or after join key?

  1. time column before join key: state cleanning is easy (one range delete), but random acesss by join key become impossible.
  2. join key before time column: Join key access is fast as usual, but state cleanning's cost is O(N) where N = number of join keys.
  3. Alternatively, use two seperated state tables, but this would increase the IO cost multiple times.

Neither sounds a good solution. We need to take a look on Flink's implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had taken a look at Flink's TimeIntervalJoin and found that they use alternative two. First they utilize the join equal keys to partition the join state. Second, they maintain a leftCache and rightCache with type MapState<Long, List<Tuple2<RowData, Boolean>>> for each join equal key. The Long type is actually the timestamp. BTW, they suffer from a lack of ordered key MapState. The state cleaning mechanism of Flink TimeIntervalJoin works like that, it registers a cleanup timer to its time service. The cleanup timer triggered timestamp is calculated based on the row timestamp, input watermark and time interval provided by the user. As soon as the cleanup timer triggers, it iterates the LeftCache and RightCache, deletes expired rows, and registers another cleanup timer based on the earliest valid row timestamp left. As we can see,
Flink uses a bunch of point deletes triggered by the cleanup timer, rather than a range delete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using the time-bound condition to do state cleaning, there is a unresolve problem: should the time column be placed before join key or after join key?

  1. time column before join key: state cleanning is easy (one range delete), but random acesss by join key become impossible.
  2. join key before time column: Join key access is fast as usual, but state cleanning's cost is O(N) where N = number of join keys.
  3. Alternatively, use two seperated state tables, but this would increase the IO cost multiple times.

Neither sounds a good solution. We need to take a look on Flink's implementation.

I think the first solution is ridiculous. We do not need a fast state cleanning.


## Reference

- Li R, Gatterbauer W, Riedewald M. Near-Optimal Distributed Band-Joins through Recursive Partitioning[C]//Proceedings of the 2020 ACM SIGMOD International Conference on Management of Data. 2020: 2375-2390.
- Reif M, Neumann T. A scalable and generic approach to range joins[J]. Proceedings of the VLDB Endowment, 2022, 15(11): 3018-3030.
- https://www.vertica.com/blog/what-is-a-range-join-and-why-is-it-so-fastba-p223413/
- https://docs.oracle.com/en/database/oracle/oracle-database/19/tgsql/joins.html#GUID-24F34188-110F-4245-9DE7-43954092AFE0
- https://docs.databricks.com/optimizations/range-join.html
- https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/joining/#interval-join
Binary file added rfcs/images/0032-band-join/band-join.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.