Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Temporal Join #49

Merged
merged 4 commits into from
Mar 31, 2023
Merged

RFC: Temporal Join #49

merged 4 commits into from
Mar 31, 2023

Conversation

chenzl25
Copy link
Contributor

@chenzl25 chenzl25 commented Feb 8, 2023

No description provided.

);
```

As we finish the syntax part, let's dive deep to the implementation. We want to support process time temporal join as a stateless operator. In order to achieve this, first we need to materialize the dimension table as a table with a connector which is already supported by our system and users can meet this requirement easily. Second, we need to schedule the process time temporal join operator together with the dimension table which is similar to the way we schedule the batch lookup join, the chain operator and delta join. More specifically, the process time temporal join operator requires consistent hash shuffle for its outer side input stream and no shuffle for the inner side input stream. Finally, each outer side row can lookup the inner side table locally. No state exists! One more thing need to be discussed is which snapshot of the inner side needs to be accessed? We can lookup the current epoch version which is the same as the regular join but it needs to buffer data. Or we can just look up the previous epoch which is fresh enough and do not need to buffer anymore.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether there'll be a case where the inner side and outer side have perfect locality, that is, the key in the dimension table often comes simultaneously with the fact table. By looking up the current epoch, we can get more stable results. I guess this is also what Flink's "semantic problem" is taking about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some strategies we can apply:

  1. Do not block the outer side and read the previous epoch.
  2. Do not block the outer side and read the current epoch data ASAP.
  3. Block the outer side util barrier comes and read the current epoch data.

I prefer 1 or 2 without waiting any barriers.


## Event time temporal join

The event time temporal join is far more complicated than the process time temporal join. **It requires watermark columns and maintaining states for both input sides**. The syntax looks like that `SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.even_time ON A.col = B.id`, where `event_time` is the watermark column of the table `A`. This SQL means for each row from table A, it will lookup the snapshot of table B based on its `even_time`. In order to find the exact snapshot of table B, table B (the inner side) also needs to have a watermark column. Only in this way we can make sure table A will not join a stale snapshot of table B. The reason why both sides need state is that both sides need to store rows beyond the current watermark. The reason why the outer side needs a watermark is that it can use it to keep its state small.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we specify the event time column of the inner side? 👀

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It comes from the watermark column defined in the inner side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, we can have multiple watermark columns in our design 🥵.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to ensure only exact one column for this case, otherwise, reject it.


## Event time temporal join

The event time temporal join is far more complicated than the process time temporal join. **It requires watermark columns and maintaining states for both input sides**. The syntax looks like that `SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.even_time ON A.col = B.id`, where `event_time` is the watermark column of the table `A`. This SQL means for each row from table A, it will lookup the snapshot of table B based on its `even_time`. In order to find the exact snapshot of table B, table B (the inner side) also needs to have a watermark column. Only in this way we can make sure table A will not join a stale snapshot of table B. The reason why both sides need state is that both sides need to store rows beyond the current watermark. The reason why the outer side needs a watermark is that it can use it to keep its state small.
Copy link
Contributor

@TennyZhuang TennyZhuang Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For watermark, we have an assumption that a watermark can be delayed at any time or even discarded, so we can't depend watermark to achieve a snapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Mv on Mv can delay watermark in a long time if the upstream mv is large.


- Temporal join requires the outer side to be append-only and the inner side's primary key contained in the equivalence condition of the temporal join condition
- Process time temporal join can be stateless. It relies on the scheduler to place the join operator together with the inner side table.
- Event time temporal join requires watermark columns and maintaining states for both input sides. The outer side state's primary key should be in the order of the watermark key first and then the join key, while the inner side state's primary key should be in the order of join key first and then watermark key

This comment was marked as resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Watermark-filtered input will delete rows as watermark increases, but event time temporal join needs to keep at least one row for each primary key of the inner side table.

@fuyufjh
Copy link
Member

fuyufjh commented Mar 30, 2023

We have completed the processing-time temporal join part and may delay the event-time temporal join to the future. Shall we note down this and merge the PR?

@chenzl25
Copy link
Contributor Author

We have completed the processing-time temporal join part and may delay the event-time temporal join to the future. Shall we note down this and merge the PR?

+1 to merge it first.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants