Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Change Data Capture #17535

Closed
wants to merge 1 commit into from
Closed

Conversation

rjnn
Copy link
Contributor

@rjnn rjnn commented Aug 9, 2017

This RFC proposes a mechanism for subscribing to changes to a table or
database. It will support the Apache Kafka publisher API. It will make
use of a Replica level subsystem "ChangeFeed", the design of which is
described in #16838.

@rjnn rjnn added this to the 1.2 milestone Aug 9, 2017
@rjnn rjnn requested a review from a team as a code owner August 9, 2017 02:30
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@rjnn rjnn force-pushed the change_feeds_distsql branch from 6149daf to 8ac9bc9 Compare August 9, 2017 02:31
@bdarnell
Copy link
Contributor

bdarnell commented Aug 9, 2017

This generally looks good to me.

I'd like to see some discussion of latency here. There are a lot of things that can cause delays, especially for in-order delivery. How quickly can changes make it through this pipeline?

If Kafka and/or the ChangeAggregator can't keep up, what are the failure modes? There should be circuit breakers to ensure a clean break instead of endless buffering or spinning on catch-up scans.


Review status: 0 of 1 files reviewed at latest revision, 8 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 43 at r1 (raw file):

There are, broadly, two ways clients want to capture changes: by
transaction or by row. The former streams transactions that touched a
watched-set (i.e. a table or a database), the second streams

Is watching a database equivalent to separate watches on every table in the DB, or are transactions combined across all the tables they touch? Can a watched-set include multiple tables (but not an entire DB)?


docs/RFCS/change_feeds_distsql.md, line 53 at r1 (raw file):

   HLC_timestamp timestamp
   UUID transaction_id
   Database_row old_row

Maybe I'm overthinking this (given the example of rethinkdb), but putting the entire old row here (as opposed to just the PK) seems unnecessarily expensive. We may want to consider making this optional.


docs/RFCS/change_feeds_distsql.md, line 68 at r1 (raw file):

generality: If we assume that we will build a CDC that streams these
row-by-row updates in `HLC_timestamp` order, we can simply buffer
messages and batch them by `transaction_id` order to provide the

When can you be sure you have everything for a transaction id?


docs/RFCS/change_feeds_distsql.md, line 190 at r1 (raw file):

`HLC_timestamp` order. It also receives watermark updates from each
`ChangeFeed`. It only relays a message at time `T_1` out when every
replica has raised the watermarked past `T_1`.

If the client requested out-of-order updates, this part is skipped and a single failed ChangeFeed doesn't block everything else, right?


docs/RFCS/change_feeds_distsql.md, line 209 at r1 (raw file):

## System.jobs table and Job Resumability

We use the System.jobs table to keep track of whether the

s/the/any/ ?


docs/RFCS/change_feeds_distsql.md, line 335 at r1 (raw file):

guarantees (a crash after commit, but before a trigger has run, for
example, could result in the trigger never running), or atomicity
guarantees. Triggers thus typically provide "at most once" semantics.

This is true of triggers with effects outside the database; triggers that write to other tables in the same database are typically exactly-once.


docs/RFCS/change_feeds_distsql.md, line 412 at r1 (raw file):

3. Each of `R_B` and `R_C` does the following: it sends an `ABORT` (if
   it couldn't successfully write the intent key) or `STAGE` (if it
   did) back to `R_A`.

This is not the terminology we normally use. The result of writing an intent goes to the coordinator, not the range holding the transaction record.


docs/RFCS/change_feeds_distsql.md, line 461 at r1 (raw file):

A compromise is for ChangeFeeds to only depend on those ranges that
hold keys that the ChangeFeed depends on. However, a range (say `R_B`)
is responsible for registering dependencies on other ranges when

What exactly does "registering dependencies" mean here? What part of the range is responsible for doing this? It sounds like the ChangeFeed operation must periodically try to scan its whole range to trigger intent resolution for any extant intents.


Comments from Reviewable

@knz
Copy link
Contributor

knz commented Aug 9, 2017

Review status: 0 of 1 files reviewed at latest revision, 10 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 42 at r1 (raw file):

There are, broadly, two ways clients want to capture changes: by
transaction or by row. The former streams transactions that touched a

You're forgetting one here: by geo-partitioning prefix.


docs/RFCS/change_feeds_distsql.md, line 109 at r1 (raw file):

  sure that they have not missed any message in between the read and
  the first message received on the feed, nor will they receive
  changes that are already accounted for in that `SELECT`.

There's some black magic needed here to ensure that a retried txn (either server-side or client-side) only causes one change feed to be created, with the last one.

Perhaps delay the creation of the change feed to the moment the SQL transaction commits?


Comments from Reviewable

@tbg
Copy link
Member

tbg commented Aug 10, 2017

Looks good! I didn't read the appendix thoroughly.

  • You were planning to allow watches to an arbitrary set of key ranges, right (perhaps restricting to within one database for technical reasons)? I think giving the proto of the request setting up the ChangeAggregator would be helpful. Without watching multiple key ranges, things could get awkward quickly: you don't need to watch the whole replica if all you care about is one key (push notifications example) and you don't have to watch all 200 tables in your DB when you only care about exactly two.

  • You mention that clients can decline in-order delivery, but I'm not really sure what mode that puts us in. Do you just pass everything on right away, or do you still buffer?

@bdarnell afaict latency is really about the slowest close notification. Once a timestamp is closed, the latency shouldn't be much more than what we typically expect of DistSQL. #16838 specifically discusses cases in which you want to cut down your latency (listening for a handful of keys as opposed to full-blown CDC) by making use of the fact that updates are always ordered when restricted to a single key.


Reviewed 1 of 1 files at r1.
Review status: all files reviewed at latest revision, 16 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 53 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Maybe I'm overthinking this (given the example of rethinkdb), but putting the entire old row here (as opposed to just the PK) seems unnecessarily expensive. We may want to consider making this optional.

old_row isn't available from the primitive, at least not as written now, and adding it would come with additional cost. Maybe it's worth it (I can see how apps would want it), but let's quadruple check that and think through the performance implications. I think the canonical way of updating #16838 would avoid extra cost unless the client actually wanted the old row, so perhaps that's acceptable. Either way, making this optional (and disabled by default) sounds reasonable.


docs/RFCS/change_feeds_distsql.md, line 68 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

When can you be sure you have everything for a transaction id?

Streaming by transaction is a good bit less trivial than streaming by row. To decide that a transaction is finished, the consumer of the various primitives must be notified of the successful EndTransaction and then wait until any range that that transaction has written to has closed out that timestamp. Fortunately, the information on where the intents live is in the transaction record and so can be obtained, but what if we're not watching all of these ranges? Besides, having the info centralized in the txn record is something we'd like to be able to change in the future. It's unlikely that even with these changes you'd not be able to get the same information (after all, we're probably still putting the txn near the information on its intents), but it might be trickier after. For example, the intents in the record are already only guaranteed to be a superset of the actual writes, and we're thinking about making it much larger in many cases to accommodate large writing transactions. I don't see immediately how that would break you, but there's a lot to watch out for. We would also need extra plumbing to put the txn ID back into 1PC WriteBatches.

Can you make to without the txn ID and simply group by timestamp? The only downside is that you might be seeing multiple transactions' writes at the same timestamp (in which case they won't overlap), but maybe that's good enough.

Please motivate the need for transactional grouping -- it's a bunch of extra work and if we can avoid it at all, I'm pretty sure we should.


docs/RFCS/change_feeds_distsql.md, line 71 at r1 (raw file):

alternate design.

Finally, we can also stream transactions _out of order_, at the

What do you win here? You don't have to sort when the txn timestamp closes out on all potentially written to ranges? Anything else?


docs/RFCS/change_feeds_distsql.md, line 109 at r1 (raw file):

Previously, knz (kena) wrote…

There's some black magic needed here to ensure that a retried txn (either server-side or client-side) only causes one change feed to be created, with the last one.

Perhaps delay the creation of the change feed to the moment the SQL transaction commits?

The ChangeFeed itself doesn't have to be transactional. You could do

BEGIN TRANSACTION; -- read-only
SELECT bunch_of_stuff();
SELECT transaction_timestamp(); -- or whatever the right method is
COMMIT;

MAKE MAGIC CHANGEFEED AS OF SYSTEM TIME timestamp_from_above();

I think that's roughly what's envisioned above, but @arjunravinarayan should clarify.


docs/RFCS/change_feeds_distsql.md, line 113 at r1 (raw file):

* A change feed should gracefully partition when the rate of changes
  are too large for a single processor to handle into multiple Kafka
  "topics".

Can you elaborate? I thought Kafka topics shard and you could have 10 Cockroach nodes each pushing into one shard. As written, it sounds like we need one topic per producer (= CockroachDB processor?)


docs/RFCS/change_feeds_distsql.md, line 117 at r1 (raw file):

* Finally, a design for change feeds should be forward compatible with
  feeding changes _into_ incrementally updated materialized views
  using the timely+differential dataflow design.

[citation needed]


docs/RFCS/change_feeds_distsql.md, line 153 at r1 (raw file):

* row-by-row changes, along with the HLC timestamp and transaction
  UUID of the change:
    ValueChange<RangeID, key, newValue, timestamp, UUID>

You're talking to a replica, so you've looked up its RangeID, and there's no need to put that into every message.


docs/RFCS/change_feeds_distsql.md, line 158 at r1 (raw file):

  timestamp `t`, which are a promise that all updates for this watched
  keyspace `<=t` have been sent (i.e. the min-safe timestamp watermark
  has passed this HLC timestamp):

This isn't sufficient, see the comment in #16593 (comment)

It'll be more difficult to guarantee that a closed-out timestamp has no intents under it that are committed but not resolved. I think the onus here is on #16838 (RFC: Storage-Level Primitive for Change Feeds) to make that happen, but it doesn't hurt to point that out (or just remove the parens). Also affects next subsection.


docs/RFCS/change_feeds_distsql.md, line 249 at r1 (raw file):

so it's rather unhelpful as a reference.

## Fault tolerant DistSQL processors.

nit: no dot


docs/RFCS/change_feeds_distsql.md, line 461 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

What exactly does "registering dependencies" mean here? What part of the range is responsible for doing this? It sounds like the ChangeFeed operation must periodically try to scan its whole range to trigger intent resolution for any extant intents.

That or the Replica must maintain an (unreplicated, i.e. in aux storage, and consistent enough) index of timestamp -> intent. We could make sure we only write that index when the range is being watched (doing a single scan when the feed is set up, which we'll have to do at least once anyway). I think making that happen is more a requirement we'll want to impose on #16593 (comment) than this PR. Sure looks a bit unpleasant, though it seems necessary.


Comments from Reviewable

@petermattis
Copy link
Collaborator

Review status: all files reviewed at latest revision, 23 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 20 at r1 (raw file):

single-range `ChangeFeed` primitive described in #16838. ChangeFeeds
are a replica-level system, watching a single contiguous keyspan from
a single key range, and are not fault tolerant. They provide at most

s/single key range/single range/g? I think you're referring to a Range here, but maybe I'm misunderstanding the text.


docs/RFCS/change_feeds_distsql.md, line 29 at r1 (raw file):

3) aggregating individual `ChangeFeed` streams from many ranges,
surviving various chaos events such as leadership changes, range

I wouldn't describe these as chaos events. Perhaps "maintenance events".


docs/RFCS/change_feeds_distsql.md, line 32 at r1 (raw file):

rebalancing, and lagging replicas.

4) Supporting resumability and cancellability of CDC feeds

s/feeds/feeds./g


docs/RFCS/change_feeds_distsql.md, line 36 at r1 (raw file):

# Motivation

See the motivations section in #16838 for a general motivation for

Better to include a link to the RFC directly (when it is available).


docs/RFCS/change_feeds_distsql.md, line 43 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Is watching a database equivalent to separate watches on every table in the DB, or are transactions combined across all the tables they touch? Can a watched-set include multiple tables (but not an entire DB)?

If I watch for transactions on a single table, do I see all of the mutations in the transaction even if they occur on other tables or only the mutations that touch the table I'm watching?


docs/RFCS/change_feeds_distsql.md, line 54 at r1 (raw file):

   UUID transaction_id
   Database_row old_row
   Database_row new_row

Does the ChangeFeed primitive give us the full new row in the presence of column families?


docs/RFCS/change_feeds_distsql.md, line 84 at r1 (raw file):

* A CDC feed should be registerable against all writes to a single
  database or a single table. The change feed should persist across
  chaos, rolling cluster upgrades, and large tables/databases split

s/chaos/node restarts/g


docs/RFCS/change_feeds_distsql.md, line 189 at r1 (raw file):

them. It then receives the stream of changes, buffering them in sorted
`HLC_timestamp` order. It also receives watermark updates from each
`ChangeFeed`. It only relays a message at time `T_1` out when every

Is there 1 ChangeAggregator for each "watch"? That seems like a significant bottleneck and a burden to whichever node is processing the ChangeAggregator. Consider a 100-node cluster with a table that is spread across all of the nodes. A single ChangeAggregator is almost certainly not going to be able to handle all of those feeds, if for no other reason than network bandwidth.

Rather than be a complete downer with this comment, I'll suggest (without having put much thought into it) that we can support N ChangeAggregators for a single "watch" by extending the ChangeFeed primitive to allow selecting only changes where txnID % num == 0.


Comments from Reviewable

This RFC proposes a mechanism for subscribing to changes to a table or
database. It will support the Apache Kafka publisher API. It will make
use of a Replica level subsystem "ChangeFeed", the design of which is
described in cockroachdb#16838.
@rjnn rjnn force-pushed the change_feeds_distsql branch from 8ac9bc9 to 97e3e11 Compare August 11, 2017 02:25
@rjnn
Copy link
Contributor Author

rjnn commented Aug 11, 2017

Thanks for the reviews, everyone! There's a lot to digest here, so I've only addressed some of the comments. For the rest, I need some time to fully work through before posting answers, which I will do in a day or two and re-up the RFC.


Review status: 0 of 1 files reviewed at latest revision, 23 unresolved discussions, some commit checks pending.


docs/RFCS/change_feeds_distsql.md, line 20 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

s/single key range/single range/g? I think you're referring to a Range here, but maybe I'm misunderstanding the text.

Correct. Done.


docs/RFCS/change_feeds_distsql.md, line 29 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

I wouldn't describe these as chaos events. Perhaps "maintenance events".

Done.


docs/RFCS/change_feeds_distsql.md, line 32 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

s/feeds/feeds./g

Done.


docs/RFCS/change_feeds_distsql.md, line 36 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Better to include a link to the RFC directly (when it is available).

Made into a link to the PR for now, so that at least its a real link. Will update to the RFC when merged.


docs/RFCS/change_feeds_distsql.md, line 42 at r1 (raw file):

Previously, knz (kena) wrote…

You're forgetting one here: by geo-partitioning prefix.

I do not follow this point. Could you please elaborate?


docs/RFCS/change_feeds_distsql.md, line 43 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

If I watch for transactions on a single table, do I see all of the mutations in the transaction even if they occur on other tables or only the mutations that touch the table I'm watching?

I have not thought seriously about which answer to any of these questions would be the best behavior. On thinking about it, I believe that all the variations are possible, with various tradeoffs. I will add a section that addresses these tradeoffs soon. Thank you.


docs/RFCS/change_feeds_distsql.md, line 53 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

old_row isn't available from the primitive, at least not as written now, and adding it would come with additional cost. Maybe it's worth it (I can see how apps would want it), but let's quadruple check that and think through the performance implications. I think the canonical way of updating #16838 would avoid extra cost unless the client actually wanted the old row, so perhaps that's acceptable. Either way, making this optional (and disabled by default) sounds reasonable.

Just the PK is an elegant default solution. Tracking the complete old row should definitely be optional. I will add this.


docs/RFCS/change_feeds_distsql.md, line 54 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Does the ChangeFeed primitive give us the full new row in the presence of column families?

No it does not, as currently envisioned. Good point. I will document this.


docs/RFCS/change_feeds_distsql.md, line 71 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

What do you win here? You don't have to sort when the txn timestamp closes out on all potentially written to ranges? Anything else?

Added an answer to your question in the test.


docs/RFCS/change_feeds_distsql.md, line 84 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

s/chaos/node restarts/g

Done.


docs/RFCS/change_feeds_distsql.md, line 109 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

The ChangeFeed itself doesn't have to be transactional. You could do

BEGIN TRANSACTION; -- read-only
SELECT bunch_of_stuff();
SELECT transaction_timestamp(); -- or whatever the right method is
COMMIT;

MAKE MAGIC CHANGEFEED AS OF SYSTEM TIME timestamp_from_above();

I think that's roughly what's envisioned above, but @arjunravinarayan should clarify.

@tschottdorf is correct. Amended.


docs/RFCS/change_feeds_distsql.md, line 113 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Can you elaborate? I thought Kafka topics shard and you could have 10 Cockroach nodes each pushing into one shard. As written, it sounds like we need one topic per producer (= CockroachDB processor?)

An error. Rewritten. The intention is for there to be multiple changefeeds writing to the same topic.


docs/RFCS/change_feeds_distsql.md, line 117 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

[citation needed]

Done.


docs/RFCS/change_feeds_distsql.md, line 153 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

You're talking to a replica, so you've looked up its RangeID, and there's no need to put that into every message.

Done.


docs/RFCS/change_feeds_distsql.md, line 158 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

This isn't sufficient, see the comment in #16593 (comment)

It'll be more difficult to guarantee that a closed-out timestamp has no intents under it that are committed but not resolved. I think the onus here is on #16838 (RFC: Storage-Level Primitive for Change Feeds) to make that happen, but it doesn't hurt to point that out (or just remove the parens). Also affects next subsection.

Removed the parens. Added some text to the next subsection; PTAL.


docs/RFCS/change_feeds_distsql.md, line 189 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Is there 1 ChangeAggregator for each "watch"? That seems like a significant bottleneck and a burden to whichever node is processing the ChangeAggregator. Consider a 100-node cluster with a table that is spread across all of the nodes. A single ChangeAggregator is almost certainly not going to be able to handle all of those feeds, if for no other reason than network bandwidth.

Rather than be a complete downer with this comment, I'll suggest (without having put much thought into it) that we can support N ChangeAggregators for a single "watch" by extending the ChangeFeed primitive to allow selecting only changes where txnID % num == 0.

The intention is to have a ChangeAggregator deal with some max number of (based on empirical testing, for now lets say something like 10) ChangeFeeds. We spin up more ChangeAggregators as necessary to cover all the tables in the watched set. Multiple ChangeAggregators have to watch disjoint parts of the keyspace.


docs/RFCS/change_feeds_distsql.md, line 190 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

If the client requested out-of-order updates, this part is skipped and a single failed ChangeFeed doesn't block everything else, right?

Correct. Added.


docs/RFCS/change_feeds_distsql.md, line 209 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

s/the/any/ ?

Done.


docs/RFCS/change_feeds_distsql.md, line 249 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

nit: no dot

Done.


docs/RFCS/change_feeds_distsql.md, line 335 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

This is true of triggers with effects outside the database; triggers that write to other tables in the same database are typically exactly-once.

Done.


docs/RFCS/change_feeds_distsql.md, line 412 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

This is not the terminology we normally use. The result of writing an intent goes to the coordinator, not the range holding the transaction record.

Great! This was the intention of writing the appendix - to make sure I had the story straight. I will carefully go over this with someone and update this.


docs/RFCS/change_feeds_distsql.md, line 461 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

That or the Replica must maintain an (unreplicated, i.e. in aux storage, and consistent enough) index of timestamp -> intent. We could make sure we only write that index when the range is being watched (doing a single scan when the feed is set up, which we'll have to do at least once anyway). I think making that happen is more a requirement we'll want to impose on #16593 (comment) than this PR. Sure looks a bit unpleasant, though it seems necessary.

Yes, I'm not sure there's a way to avoid that.


Comments from Reviewable

@knz
Copy link
Contributor

knz commented Aug 11, 2017

Review status: 0 of 1 files reviewed at latest revision, 23 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 42 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I do not follow this point. Could you please elaborate?

If the user does geo-partitioning they will typically want to route change events that touch rows from a region to a separate collector than that of rows for another region.


docs/RFCS/change_feeds_distsql.md, line 109 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

@tschottdorf is correct. Amended.

This means that clients cannot expect to start seeing events on the change feed until the txn has committed. Perhaps this can be highlighted too.


Comments from Reviewable

@knz
Copy link
Contributor

knz commented Aug 11, 2017

Review status: 0 of 1 files reviewed at latest revision, 23 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 42 at r1 (raw file):

Previously, knz (kena) wrote…

If the user does geo-partitioning they will typically want to route change events that touch rows from a region to a separate collector than that of rows for another region.

(Data sovereignty concerns etc)


Comments from Reviewable

@petermattis
Copy link
Collaborator

Review status: 0 of 1 files reviewed at latest revision, 18 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 189 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

The intention is to have a ChangeAggregator deal with some max number of (based on empirical testing, for now lets say something like 10) ChangeFeeds. We spin up more ChangeAggregators as necessary to cover all the tables in the watched set. Multiple ChangeAggregators have to watch disjoint parts of the keyspace.

Ok, I'm missing something fundamental about how this would work. If you have 2 ChangeAggregators covering a table a given ChangeAggregator won't necessarily see all of the mutations for a single transaction. Simplest case is a table with 2 rows and 2 ChangeAggregators, one covering row 1 and the other covering row 2. If a transaction modifies both rows, one or the other ChangeAggregator will miss one of the mutations.

Also, with multiple ChangeAggregators how do we provide a stream of transactions in transaction timestamp order. Does each ChangeAggregator provide a separate stream and the consumer is required to merge them?


Comments from Reviewable

@bdarnell
Copy link
Contributor

afaict latency is really about the slowest close notification. Once a timestamp is closed, the latency shouldn't be much more than what we typically expect of DistSQL

Yes, my question was trying to get a little more specific about the "slowest close notification". How frequently do we intend to send close notifications, and how are our actual close notifications distributed around that target? Full table-level CDC is less latency-sensitive than other potential uses of change feeds, but we should try to get a handle on what that latency would be, and also the catch-up effects (it's one thing if a single slow close notification delays the close notification of a downstream consumer but the data is otherwise flowing, but if a slow close notification blocks the data flow as well (as I think it does in this proposal), it's more severe since the CDC process will consume more resources while it catches up. This will need to be rate limited to avoid interfering with OLTP traffic, but we also need to consider what happens when the CDC feed is unable to catch up)


Review status: 0 of 1 files reviewed at latest revision, 17 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 43 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I have not thought seriously about which answer to any of these questions would be the best behavior. On thinking about it, I believe that all the variations are possible, with various tradeoffs. I will add a section that addresses these tradeoffs soon. Thank you.

This is where more concrete examples in the motivation section would be useful (and not just deferring to #16838 - some of those examples would definitely not use CDC; we should be clear about which woulds are intended to and let that guide which tradeoffs we make).


docs/RFCS/change_feeds_distsql.md, line 68 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Streaming by transaction is a good bit less trivial than streaming by row. To decide that a transaction is finished, the consumer of the various primitives must be notified of the successful EndTransaction and then wait until any range that that transaction has written to has closed out that timestamp. Fortunately, the information on where the intents live is in the transaction record and so can be obtained, but what if we're not watching all of these ranges? Besides, having the info centralized in the txn record is something we'd like to be able to change in the future. It's unlikely that even with these changes you'd not be able to get the same information (after all, we're probably still putting the txn near the information on its intents), but it might be trickier after. For example, the intents in the record are already only guaranteed to be a superset of the actual writes, and we're thinking about making it much larger in many cases to accommodate large writing transactions. I don't see immediately how that would break you, but there's a lot to watch out for. We would also need extra plumbing to put the txn ID back into 1PC WriteBatches.

Can you make to without the txn ID and simply group by timestamp? The only downside is that you might be seeing multiple transactions' writes at the same timestamp (in which case they won't overlap), but maybe that's good enough.

Please motivate the need for transactional grouping -- it's a bunch of extra work and if we can avoid it at all, I'm pretty sure we should.

On second thought, I don't think you'd need to do anything tricky to figure out when you have everything for a transaction. If you're already producing a feed in timestamp order (which is itself expensive! I think that to minimize catch-up effects we should encourage consumers of CDC to use out-of-order feeds and understand close notifications), you just need to buffer one timestamp's worth of data to split it into transactions. The only costly change is that you'd need to do something about 1PC transactions.


docs/RFCS/change_feeds_distsql.md, line 77 at r2 (raw file):

immediately), although it would not reduce the used buffer space,
since the aggregator needs to keep track of all messages after the
lower watermark anyway, to ensure it does not duplicate deliver

The summary proposes to offer both at-least-once and at-most-once options; this buffering would only be required in at-most-once modes. (Which use cases would use each mode?)


docs/RFCS/change_feeds_distsql.md, line 122 at r2 (raw file):

  SELECT transaction_timestamp();
  COMMIT;
  CREATE CHANGEEED (...) AS OF SYSTEM TIME timestamp_from_above();

s/EEE/FEE/


Comments from Reviewable

@a-robinson
Copy link
Contributor

Review status: 0 of 1 files reviewed at latest revision, 24 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 95 at r2 (raw file):

  registerable as an Apache Kafka publisher.

* An important consideration in implementing change feeds is

Is this required by Kafka or other major consumers? Why is it worth the effort?


docs/RFCS/change_feeds_distsql.md, line 136 at r2 (raw file):

  design.

Supporting filters and more complex transformations on the result of

#16838 includes waiting on updates for individual rows or small spans as the first motivating use case. That may just be chance, but did you check up on what the feature requests we've gotten were looking for? I'd think that allowing for filtering on a contiguous range of primary keys may be worthwhile, but would like to see what folks are asking for either way.


docs/RFCS/change_feeds_distsql.md, line 163 at r2 (raw file):

which works as follows: we "open" a long-lived ChangeFeed to a
leaseholder replica by providing it a keyspan that is a subset of its
keyspace and a starting HLC timestamp that is newer than the current

If the starting timestamp has to be newer than the current timestamp, then how will the ChangeAggregator collect all the writes between the user's requested starting timestamp and the timestamp used here?

What if the timestamp chosen here isn't newer than the leaseholder's current timestamp? I think this will need a little more thought.


docs/RFCS/change_feeds_distsql.md, line 207 at r2 (raw file):

`ChangeAggregator`, which calls the `DistSQL` span resolver, finds the
ranges that it is watching, and sets up `ChangeFeed`s on each of
them. It then receives the stream of changes, buffering them in sorted

A back-of-the-envelope calculation on how much buffering this will involve would be helpful. buffering 10+ seconds of all the writes on a large table or database seems as though it could get quite large. We'll probably need memory accounting here (although perhaps that's already implied).


docs/RFCS/change_feeds_distsql.md, line 231 at r2 (raw file):

We use the System.jobs table to keep track of whether any
`ChangeAggregator`s are alive. This part is Work in progress, as it is
not clear to me how much work is required for the `ChangeAggregator`

cc @benesch


docs/RFCS/change_feeds_distsql.md, line 280 at r2 (raw file):

the CDC to resume without throwing everything away and starting from
scratch, as we do with regular DistSQL queries, only retrying at the
executor level. This is not something that is in scope for 1.2.

So what will the behavior be for 1.2 then? Everything will have to restart when a processor dies?


docs/RFCS/change_feeds_distsql.md, line 440 at r2 (raw file):

   the final arbiter of whether the write happens or not.

5. `R_A` waits until it receives unanimous consent. If it receives a

As with Ben's comment above, R_A isn't responsible for this (or for step 6 below) -- the transaction coordinator is.


Comments from Reviewable

@tbg
Copy link
Member

tbg commented Aug 14, 2017

A close notification for time T can roughly be sent at T+max_configured_txn_duration, so there's a fairly variable duration for which the processors may have to buffer changes (... to aux storage?). We should be able to advance the watermark fairly closely, to that optimal window, though if we have a lot of open txn intents below our desired watermark we'll have to nuke those txns first, which could cause us to fall behind in the presence of an adversarial workload (lots of single-write txns that don't respect their max durations).

For a CDC that simply outputs everything to some other MVCC-aware storage without any aggregation at the txn level, I think close notifications don't block anything -- each processor receiving change feed updates simply relays everything back up the chain; close notifications only serve to limit the amount of work that has to be done should the feed disconnect. It's trickier if you're trying to group emitted updates into complete per-timestamp write sets (i.e. emit all writes at ts T at the same time, or emit all writes of a txn at the same time) because you do have to wait for all possible sources of such values to close out for any given timestamp you wish to report.


Reviewed 1 of 1 files at r2.
Review status: all files reviewed at latest revision, 21 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 109 at r1 (raw file):

Previously, knz (kena) wrote…

This means that clients cannot expect to start seeing events on the change feed until the txn has committed. Perhaps this can be highlighted too.

What do you mean, @knz?


docs/RFCS/change_feeds_distsql.md, line 207 at r2 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

A back-of-the-envelope calculation on how much buffering this will involve would be helpful. buffering 10+ seconds of all the writes on a large table or database seems as though it could get quite large. We'll probably need memory accounting here (although perhaps that's already implied).

I think auxiliary storage will be at work here, though it'd be good to point that out.


Comments from Reviewable

@dianasaur323
Copy link
Contributor

docs/RFCS/change_feeds_distsql.md, line 20 at r2 (raw file):

single-range `ChangeFeed` primitive described in #16838. ChangeFeeds
are a replica-level system, watching a single contiguous keyspan from
a single range, and are not fault tolerant. They provide at most

Clarifying question - how would you support at-least-once at the database level if you only support at most once at the single-range primitive level?


Comments from Reviewable

@dianasaur323
Copy link
Contributor

Review status: all files reviewed at latest revision, 25 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_distsql.md, line 77 at r2 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

The summary proposes to offer both at-least-once and at-most-once options; this buffering would only be required in at-most-once modes. (Which use cases would use each mode?)

One use case I'm aware of is stream processing of IoT data, where you don't need every single ping in order to get interesting insights.


docs/RFCS/change_feeds_distsql.md, line 95 at r2 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

Is this required by Kafka or other major consumers? Why is it worth the effort?

Is this basically saying that we want to preserve commit ordering? If we want to be able to replay transaction logs, it's an important feature, but based on my understanding of Kafka, it's not a requirement.


docs/RFCS/change_feeds_distsql.md, line 300 at r2 (raw file):

possible for a recovered CDC to deduplicate, and recover exactly-once
delivery as long as recovery happens before the sliding window grows
too large and messages are dropped.

On this note, for 1.2, I think we should try to punt complexity to Kafka as much as possible. Also, PostgreSQL has this functionality for progress tracking that is actually user-facing / configurable. https://www.postgresql.org/docs/9.5/static/replication-origins.html


docs/RFCS/change_feeds_distsql.md, line 320 at r2 (raw file):

2. A stub `ChangeAggregator` that strips the internal information out
of the incoming `ChangeFeed` stream, stores everything in-memory, and
pushes as a Kafka producer to a Kafka cluster.

Do we also want to support becoming a data sink - ingesting data from Kafka?


docs/RFCS/change_feeds_distsql.md, line 343 at r2 (raw file):

5 is an unknown (comments welcome!), as it's unclear how much of the
work done by the Bulk I/O team on resumability and jobs tables can be
reused. 7 is not scoped out in this RFC.

Could you clarify 7 then with the expected behavior with the current implementation?


Comments from Reviewable

@knz
Copy link
Contributor

knz commented Aug 25, 2017

reminder: don't forget to rebase if you plan to continue work on this, and pick up the new RFC sections + naming convention.

@dianasaur323
Copy link
Contributor

docs/RFCS/change_feeds_distsql.md, line 117 at r1 (raw file):

* Finally, a design for change feeds should be forward compatible with
  feeding changes _into_ incrementally updated materialized views
  using the timely+differential dataflow design.

Another consideration here from Spencer is we eventually might want to support CRDB<>CRDB streams. Just something to keep in mind.


Comments from Reviewable

@dianasaur323
Copy link
Contributor

docs/RFCS/change_feeds_distsql.md, line 113 at r1 (raw file):

* A change feed should gracefully partition when the rate of changes
  are too large for a single processor to handle into multiple Kafka
  "topics".

Correct me if I'm wrong, but won't this make ordering incredibly difficult?


Comments from Reviewable

tbg added a commit to tbg/cockroach that referenced this pull request Dec 6, 2017
This RFC is incomplete. Is is merged in `draft` status.

This RFC proposes a mechanism for subscribing to changes to a set of key ranges
starting at an initial timestamp.

It is for use by Cockroach-internal higher-level systems such as distributed
SQL, change data capture, or a Kafka producer endpoint. A "sister RFC" detailing
these higher-level systems is in cockroachdb#17535. See also cockroachdb#19222 for a related
follower-reads RFC.
tbg added a commit to tbg/cockroach that referenced this pull request Dec 6, 2017
This RFC is incomplete. Is is merged in `draft` status.

This RFC proposes a mechanism for subscribing to changes to a set of key ranges
starting at an initial timestamp.

It is for use by Cockroach-internal higher-level systems such as distributed
SQL, change data capture, or a Kafka producer endpoint. A "sister RFC" detailing
these higher-level systems is in cockroachdb#17535. See also cockroachdb#19222 for a related
follower-reads RFC.
@jordanlewis jordanlewis removed this from the 2.0 milestone Mar 20, 2018
@rjnn rjnn closed this Jun 6, 2018
@rjnn rjnn deleted the change_feeds_distsql branch June 6, 2018 17:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants