Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Storage-Level Primitive for Change Feeds #16838

Merged
merged 1 commit into from
Dec 6, 2017

Conversation

tbg
Copy link
Member

@tbg tbg commented Jul 1, 2017

This RFC proposes a mechanism for subscribing to changes to a set of key ranges
starting at an initial timestamp.

It is for use by Cockroach-internal higher-level systems such as distributed
SQL, change data capture, or a Kafka producer endpoint. A "sister RFC"
detailing these higher-level systems will be prepared by @arjunravinarayan.

NB: This isn't on the roadmap, but we thought it would be good to get the ball rolling.

cc @arjunravinarayan

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@bdarnell
Copy link
Contributor

bdarnell commented Jul 4, 2017

This looks pretty good, although it's leaning heavily on the follower-read RFC which still has a lot of unresolved questions, and I'm not sure the answers there are necessarily compatible with the needs of this RFC. For transactional writes, the change feed event is emitted when the intent is resolved, instead of when the original write happens. For follower reads, we've discussed implementing close timestamps via the timestamp cache or something like it, with the semantics that "no more writes with a timestamp less than T will be accepted". However, there could still be unresolved intents with timestamps less than T. For change feeds, we rewrite that "no new events with timestamp less than T will be emitted", so we must resolve all intents before we can advance the close timestamp.


Review status: 0 of 1 files reviewed at latest revision, 7 unresolved discussions, some commit checks failed.


docs/RFCS/change_feeds_primitive.md, line 55 at r1 (raw file):

  (CDC). This is something companies tend to ask about.
- implement efficient incrementally updated materialized views. This is
  something we think everyone wants, but does not dare to ask.

This is one of the coolest features of CouchDB :)


docs/RFCS/change_feeds_primitive.md, line 64 at r1 (raw file):

  RETURNING CURRENT_TIMESTAMP()`) to the stream of updates.
- chose to operate on a set of key ranges since that captures both collections
  of individual keys, sets of tables, or whole databases (CDT).

s/CDT/CDC/g


docs/RFCS/change_feeds_primitive.md, line 82 at r1 (raw file):

- aim to serve change feeds from follower replicas (hence the connection to
  [16593][followerreads]).
- make the protocol efficient enough to poll whole databases in practice.

What does "poll" mean here?


docs/RFCS/change_feeds_primitive.md, line 96 at r1 (raw file):

- `key` was set to `value` at `timestamp`, and
- `key` was deleted at `timestamp`.

There's also the "closed timestamp" event described above.

Can consumers assume that for a given key, timestamps are monotonic? Can they make any assumptions about timestamps across keys (other than what is given by closed timestamps)?

Is DeleteRange represented as a range operation or does it return the specific keys that are deleted? (I'm assuming the latter based on the use of WriteBatch)


docs/RFCS/change_feeds_primitive.md, line 105 at r1 (raw file):

## Replica-level

Replicas (whether they're lease holder or not) accept a new `ChangeFeed` command

This will be a new top-level RPC (peer to Batch), and not a member of RequestUnion, right? (Otherwise I don't see how you stream responses)


docs/RFCS/change_feeds_primitive.md, line 223 at r1 (raw file):

## Performance

Performance concerns exist mostly in two areas: keeping the follower reads

A third performance concern is that when single-row watches exist, there will likely be a lot of them active at once. This will have a memory cost, and could have a CPU cost as every write might need to be matched against many watchers.


docs/RFCS/change_feeds_primitive.md, line 233 at r1 (raw file):

close out timestamps.

The second concern is the case in which there is a high rate of `ChangeFeed`

A single ChangeFeed operation with a base timestamp far in the past could also be a problem. It needs to process all events since its base timestamp before it can switch into streaming mode and get its first closed timestamp update. This could happen in a loop if the clients can't keep up. We may want to place limits on how far back ChangeFeeds can go (on the other hand, if a ChangeFeed is rejected, the client will probably fall back to doing a SELECT which may be even more expensive).


Comments from Reviewable

@rjnn
Copy link
Contributor

rjnn commented Jul 4, 2017

Reviewed 1 of 1 files at r1.
Review status: all files reviewed at latest revision, 17 unresolved discussions, some commit checks failed.


docs/RFCS/change_feeds_primitive.md, line 25 at r1 (raw file):

   key_range)` where receiving `(ts1, [a,b))` guarantees that no future update
   affecting `[a,b)` will be sent for a timestamp less than `ts1`.

It should also return a failure condition, if the timestamp is too far back in time and it cannot stream all changes from that timestamp because the GC queue has kicked in.


docs/RFCS/change_feeds_primitive.md, line 64 at r1 (raw file):

  RETURNING CURRENT_TIMESTAMP()`) to the stream of updates.
- chose to operate on a set of key ranges since that captures both collections
  of individual keys, sets of tables, or whole databases (CDT).

CDC is the preferred acronym (Change Data Capture)


docs/RFCS/change_feeds_primitive.md, line 101 at r1 (raw file):

we assume that we are passing along `WriteBatch`es, though that is not a hard
requirement. Typical consumers will live in the SQL subsystem, so they may
profit from a simplified format.

Definitely think the format will be much simpler, although doesn't need to be specced out in this document.


docs/RFCS/change_feeds_primitive.md, line 118 at r1 (raw file):

is for. We invoke a new MVCC operation (specified below) that, given a base
timestamp and a set of key ranges, synthesizes ChangeFeed notifications from the
snapshot (this is possible if the base timestamp does not violate the GC

I don't understand the sentence inside the parenthesis, it needs elaboration.


docs/RFCS/change_feeds_primitive.md, line 136 at r1 (raw file):

close notification.

Initially, streams may also disconnect if they find themselves unable to keep up

Instead of backpressure, we should consider triggering an early replica split if we're under heavy load as well. It's an easier solution.


docs/RFCS/change_feeds_primitive.md, line 146 at r1 (raw file):

key-value pairs (whether insertions or deletions) with a timestamp larger than
or equal to the base timestamp were removed from the snapshot, applying the
emitted `WriteBatch`es (in any order) would restore the initial snapshot.

How about: We need a command C defined as follows: given a snapshot S1, a key span [K], and and a base timestamp T1 < current time T2, C(S1, T1, K) -> [B] produces a set of WriteBatches [B], such that:
If all MVCC KVs with timestamp t such that T1 < t <= T2 and keys in [K] are removed from S2,
Applying all the set of WriteBatches [B] in order on S1 recovers S2.


docs/RFCS/change_feeds_primitive.md, line 150 at r1 (raw file):

Note that if a key is deleted at `ts1` and then a new value written at `ts2 >
ts1`, we don't have to emit the delete before the write; we give no ordering
guarantees (doing so would require a scan in increasing timestamp order, which

I don't get this argument, perhaps it needs more elaboration, with a little bit more formality.

Would applying the WriteBatches (Put K1t1, Delete K1t2, Put K1t3, Delete K1t4) in any order give the right answer? What happens if we apply them in the order (Delete K1t4, Delete K1t2, Put K1t3, Put K1t1)?


docs/RFCS/change_feeds_primitive.md, line 172 at r1 (raw file):

fact. That means that they need to be able to (mostly correctly) guess which, if
any, follower Replica is able to serve reads at a given timestamp. Reads can be
served from a Replica if it has been notified of that fact, i.e. if a higher

So far we have not covered how close notifications are propagated to followers and become part of the regular Raft commands. That would be required to make this happen.


docs/RFCS/change_feeds_primitive.md, line 181 at r1 (raw file):

each range. When individual streams disconnect (for instance due to a split) new
streams are initiated (using the last known closed timestamp for the lost
stream) and any higher-level consumers notified that they need to drop their

Streams need to be heartbeating their close notifications anyway, so this may not even be required, although it's a nice-to-have.


docs/RFCS/change_feeds_primitive.md, line 217 at r1 (raw file):

expose that through SQL in a full-fledged API. This should allow watching simple
`SELECT` statements (anything that boils down to simple table-readers with not
too much aggregation going on), and then, of course, materialized views.

of course. ;)


Comments from Reviewable

@petermattis
Copy link
Collaborator

I'm very happy to see thought being put into this already. I believe CDC is going to be the first user and am mildly anxious about the proposed mechanism because we don't have the companion document describing CDC that is needed to understand the complete system.


Review status: all files reviewed at latest revision, 25 unresolved discussions, some commit checks failed.


docs/RFCS/change_feeds_primitive.md, line 17 at r1 (raw file):

detailing these higher-level systems will be prepared by @arjunravinarayan.

Add a basic building block for change feeds. Namely, add a command `ChangeFeed`

The first sentence sounds incomplete to me. Or perhaps just abrupt.


docs/RFCS/change_feeds_primitive.md, line 28 at r1 (raw file):

The design overlaps with incremental Backup/Restore and in particular [follower
reads][followerreads], but also has relevance for [streaming SQL
results][streamsql]

Sentence needs a period.


docs/RFCS/change_feeds_primitive.md, line 53 at r1 (raw file):

- stream updates to a table or database into an external system, for example
  Kafka, with at-least-once semantics. This is also known as Change Data Capture
  (CDC). This is something companies tend to ask about.

Should add why companies tend to ask for it (e.g. to move data into an analytics platform).


docs/RFCS/change_feeds_primitive.md, line 78 at r1 (raw file):

- make the close notification threshold configurable via `ZoneConfig`s (though
  this is really a requirement we impose on [16593][followerreads]). Fast close
  notifications (which allow consumers to operate more efficiently) correspond

Is the term "close notifications" taken from somewhere? I'm finding it confusing when reading this doc as I think of closing a stream which isn't analogous.


docs/RFCS/change_feeds_primitive.md, line 112 at r1 (raw file):

future `WriteBatch`es which apply on the Replica, and sends them on the stream
to the caller (after suitably sanitizing to account only for the updates which
are relevant to the span the caller has supplied).

How is buffering going to work here? What happens if the ChangeFeed stream is "slow" and cannot consume the changes being fed to it fast enough? Either we slow down the rate of accepting changes to the replica, or we drop the ChangeFeed which could cause more disruption.


docs/RFCS/change_feeds_primitive.md, line 115 at r1 (raw file):

The remaining difficulty is that additionally, we must retrieve all updates made
at or after the given base HLC timestamp, and this is what the engine snapshot

This sentence was confusing to me as written. The snapshot is used to retrieve the updates that were made between a base timestamp and the time when the snapshot was taken (which is also when the feed was registered).

My question about buffering becomes more serious here. I assume we're going to send these old changes to the feed first, but doing so might take 10s of seconds (or longer), during which time we'd have to buffer the "live" changes. Buffering those changes in memory feels problematic. Buffering them to disk feels complicated.


docs/RFCS/change_feeds_primitive.md, line 134 at r1 (raw file):

received before the stream disconnected. If the Replica gets removed, it will
simply reconnect to another Replica, again using its most recently received
close notification.

The close notification is expected to be O(10s), right? This protocol will result in the changes for the past 10s getting resent. Am I missing something here? I think this needs to be called out explicitly in the doc.


docs/RFCS/change_feeds_primitive.md, line 254 at r1 (raw file):

ranged event downstream of Raft.

# Unresolved Questions

Does the proposed mechanism provide the correct building block for Change Data Capture? In particular, what interface does CDC want to present? A stream of key updates? A stream of row updates? A stream of committed transactions? A stream of key or row updates seems feasible with this mechanism. I don't see how a stream of committed transactions would be accomplished. I realize that the design of CDC is outside of the scope of this RFC, but it would be a shame to implement this mechanism and then discover that certain CDC requirements make it unusable.


Comments from Reviewable

@a-robinson
Copy link
Contributor

Review status: all files reviewed at latest revision, 30 unresolved discussions, some commit checks failed.


docs/RFCS/change_feeds_primitive.md, line 41 at r1 (raw file):

Anecdotally, change feeds are one of the more frequently
requested features for CockroachDB.

I'm sure you're aware, but there's a very real discussion to be had about prioritization here. Change data feeds are only useful to a user if the rest of the system is good enough for them to use it as their primary data store :)


docs/RFCS/change_feeds_primitive.md, line 106 at r1 (raw file):

a key range

So if, at a higher level, a client wants to watch multiple key spans from the same range, it'll have to issue multiple ChangeFeed commands to (and listen on multiple streams from) the same replica? Either that, or we'll have to make a BatchStream that could include multiple ChangeFeeds. It seems like this would be cleaner with multiple spans allowed in the command.


docs/RFCS/change_feeds_primitive.md, line 136 at r1 (raw file):

close notification.

Initially, streams may also disconnect if they find themselves unable to keep up

When you say "if they find themselves unable to keep up", do you mean if the client isn't processing items off the stream quickly enough? Is that easy to detect in grpc?


docs/RFCS/change_feeds_primitive.md, line 150 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I don't get this argument, perhaps it needs more elaboration, with a little bit more formality.

Would applying the WriteBatches (Put K1t1, Delete K1t2, Put K1t3, Delete K1t4) in any order give the right answer? What happens if we apply them in the order (Delete K1t4, Delete K1t2, Put K1t3, Put K1t1)?

I don't see why the question of whether the WriteBatches could be reapplied to RocksDB is important here. Shouldn't we only care about what clients see, which I was curious about at first but seems to be addressed by the option described in the next paragraph?


docs/RFCS/change_feeds_primitive.md, line 157 at r1 (raw file):

together in descending timestamp order, so once we've created `N` `WriteBatch`es
for this key, we simply emit these `N` items in reverse order once we're done
with the key. This mode of operation would be requested by `ChangeFeed`s which

This sentence makes it sound like clients could request ascending order to avoid waiting, which seems backwards.

However, I'm not seeing why ascending order on a per-key basis would require waiting during steady-state operation. A replica shouldn't be applying an update to a key until all after all prior updates to that key have already been applied, so it seems as though there wouldn't be waiting required to just emit updates as they're applied.


docs/RFCS/change_feeds_primitive.md, line 176 at r1 (raw file):

Assuming such a mechanism in place, whether in DistSender or distributed SQL, it
should be straightforward to entity which abstracts away the Range level. For

"straightforward to entity"?


Comments from Reviewable

@rjnn
Copy link
Contributor

rjnn commented Jul 5, 2017

Reviewed 1 of 1 files at r2.
Review status: all files reviewed at latest revision, 32 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 41 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

Anecdotally, change feeds are one of the more frequently
requested features for CockroachDB.

I'm sure you're aware, but there's a very real discussion to be had about prioritization here. Change data feeds are only useful to a user if the rest of the system is good enough for them to use it as their primary data store :)

To make it completely clear, I (and I believe Toby as well) have been working on change feeds on Fridays. I'm working on it now because I'm on vacation :)


docs/RFCS/change_feeds_primitive.md, line 53 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Should add why companies tend to ask for it (e.g. to move data into an analytics platform).

But also replication to cold storage/legacy caches.


docs/RFCS/change_feeds_primitive.md, line 78 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Is the term "close notifications" taken from somewhere? I'm finding it confusing when reading this doc as I think of closing a stream which isn't analogous.

It was cribbed from the skunkworks materialized views RFC (which cribbed it from Timely Dataflow). It needs definition in this doc. Here's something to steal:

  1. Each tuple is passed along with a timestamp.
  2. Timestamps can optionally be "closed".
  3. When a sender outputs that timestamp t is closed, it guarantees that it will never ever send another timestamp with t' where t' <= t. (Condition 1)
  4. A receiver of a timestamp t may only output timestamps t' where t' > t. This guarantees that tuples do not go "backwards in time". (Condition 2)
  5. Close t_2 where t_2 >= t_1 implies close t_1.
  6. A sender should eventually close every timestamp it sends.

docs/RFCS/change_feeds_primitive.md, line 96 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

There's also the "closed timestamp" event described above.

Can consumers assume that for a given key, timestamps are monotonic? Can they make any assumptions about timestamps across keys (other than what is given by closed timestamps)?

Is DeleteRange represented as a range operation or does it return the specific keys that are deleted? (I'm assuming the latter based on the use of WriteBatch)

It is my understanding that Cockroach timestamps can be treated as monotonic across all keys.
DeleteRange returns a bag of keys, like WriteBatch.


docs/RFCS/change_feeds_primitive.md, line 106 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

a key range

So if, at a higher level, a client wants to watch multiple key spans from the same range, it'll have to issue multiple ChangeFeed commands to (and listen on multiple streams from) the same replica? Either that, or we'll have to make a BatchStream that could include multiple ChangeFeeds. It seems like this would be cleaner with multiple spans allowed in the command.

I've been thinking about watching entire ranges. Any filtering that involves spans-with-holes can happen downstream, to keep things simple.


docs/RFCS/change_feeds_primitive.md, line 112 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

How is buffering going to work here? What happens if the ChangeFeed stream is "slow" and cannot consume the changes being fed to it fast enough? Either we slow down the rate of accepting changes to the replica, or we drop the ChangeFeed which could cause more disruption.

I think we should drop the ChangeFeed. A guiding principle in this design is to impose as minimal as possible an overhead on the OLTP side of Cockroach. You are correct that dropping the ChangeFeed would cause disruption if it just results in the consumer doing a full table scan, so we should note that follower-reads are an important component of imposing minimum overhead on the OLTP side.


docs/RFCS/change_feeds_primitive.md, line 115 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

This sentence was confusing to me as written. The snapshot is used to retrieve the updates that were made between a base timestamp and the time when the snapshot was taken (which is also when the feed was registered).

My question about buffering becomes more serious here. I assume we're going to send these old changes to the feed first, but doing so might take 10s of seconds (or longer), during which time we'd have to buffer the "live" changes. Buffering those changes in memory feels problematic. Buffering them to disk feels complicated.

We can send changes out of order (making buffering the receiver's problem), we just can't send close notifications until we are sure of the ordering. If the receiver requires a total ordering, it is the receivers job to buffer, sort, and wait for close notifications before doing any work with the stream. The first consumer of these change feeds will be a DistSQL processor, and DistSQL processors have access to sorted External Storage, and they can do exactly this.

You should not think of the change feed sender as doing anything memory intensive, ever. If you are, then this document is misleading and needs fixing.


docs/RFCS/change_feeds_primitive.md, line 134 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

The close notification is expected to be O(10s), right? This protocol will result in the changes for the past 10s getting resent. Am I missing something here? I think this needs to be called out explicitly in the doc.

Close notifications for a running stream are expected to trail by ~10 seconds. This doesn't mean the tuples are resent. Here's how it would work:
Replica receives a write for k1=v1 at t=1. It eagerly sends out this update.
Replica receives a write for k2=v2 at t=4. It eagerly sends this update.
It is now t=11, and the 10 second trailing watermark has passed. it sends along a close timestamp 1 message. The client can now be sure that nothing will ever commit henceforth with t<=1.
It is now t=13.
Replica receives a write for k3=v3 at t=7 (long running transaction that took a while to commit). It sends out this update. This is legal, because 7 is not yet closed.
It is now t=14. The replica sends close timestamp 4. The client can now be sure that k2=v2 won't be written under at an earlier timestamp and can process it.


docs/RFCS/change_feeds_primitive.md, line 136 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

When you say "if they find themselves unable to keep up", do you mean if the client isn't processing items off the stream quickly enough? Is that easy to detect in grpc?

Yes. A client that is unable to process the stream quickly needs to do something, most likely binary split itself into two clients, which process one half of the stream each. This will be covered in the sister RFC.


docs/RFCS/change_feeds_primitive.md, line 157 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

This sentence makes it sound like clients could request ascending order to avoid waiting, which seems backwards.

However, I'm not seeing why ascending order on a per-key basis would require waiting during steady-state operation. A replica shouldn't be applying an update to a key until all after all prior updates to that key have already been applied, so it seems as though there wouldn't be waiting required to just emit updates as they're applied.

I don't think clients should ever be able to request ascending order at this level of abstraction. If we want to service clients who have this need, we just need a simple DistSQL processor that buffers rows, sorts them, and waits for close notifications before sending them on in-order. The DistSQL processor sits in between this change feed and the end-client.


docs/RFCS/change_feeds_primitive.md, line 223 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

A third performance concern is that when single-row watches exist, there will likely be a lot of them active at once. This will have a memory cost, and could have a CPU cost as every write might need to be matched against many watchers.

Yes. It's an open question how many CDCs will come out of a replica. I think of there being a single CDC out of every range, for the entire range, and that getting mirrored, filtered, etc to multiple "real" consumers.
Eventually we will have zone configs that split a cluster into Cockroach OLTP nodes and Cockroach OLAP nodes.
From the OLTP nodes, there will be CDC streams from the ranges to the OLAP nodes. Each range should have a single stream. From the OLAP nodes we can service as many downstream client-CDCs, or materialized views, or whatever.


docs/RFCS/change_feeds_primitive.md, line 233 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

A single ChangeFeed operation with a base timestamp far in the past could also be a problem. It needs to process all events since its base timestamp before it can switch into streaming mode and get its first closed timestamp update. This could happen in a loop if the clients can't keep up. We may want to place limits on how far back ChangeFeeds can go (on the other hand, if a ChangeFeed is rejected, the client will probably fall back to doing a SELECT which may be even more expensive).

You can only truly solve this load issue with follower reads.


docs/RFCS/change_feeds_primitive.md, line 254 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Does the proposed mechanism provide the correct building block for Change Data Capture? In particular, what interface does CDC want to present? A stream of key updates? A stream of row updates? A stream of committed transactions? A stream of key or row updates seems feasible with this mechanism. I don't see how a stream of committed transactions would be accomplished. I realize that the design of CDC is outside of the scope of this RFC, but it would be a shame to implement this mechanism and then discover that certain CDC requirements make it unusable.

A stream of row updates is what people really want, at least from the perspective of a) what RethinkDB provided, and b) what materialized views needs.


docs/RFCS/change_feeds_primitive.md, line 104 at r2 (raw file):

1. `key=x` gets deleted at `ts=120`     // deletion reported out of order
1. `[a, z)` closes at `ts=110`
1. `[a, z)` closes at `ts=100`          // close notification out of order

But do note that this second close notification is completely moot, as close(t1) implies close(t2) for all t2 <= t1.


docs/RFCS/change_feeds_primitive.md, line 111 at r2 (raw file):

1. `key=x` gets deleted at `ts=90`

In practice, these "anomalies" would happen only infrequently, and usually

No these should never happen.


docs/RFCS/change_feeds_primitive.md, line 155 at r2 (raw file):

Initially, streams may also disconnect if they find themselves unable to keep up
with write traffic on the Range. Later, we can consider triggering an early
split adding backpressure, but this is out of scope for now.

sentence needs fixing, missing a comma or something.


Comments from Reviewable

@rjnn
Copy link
Contributor

rjnn commented Jul 5, 2017

@petermattis I will work on finishing up the companion RFC that more thoroughly describes CDCs and send it out soon.


Review status: all files reviewed at latest revision, 32 unresolved discussions, all commit checks successful.


Comments from Reviewable

@tbg tbg force-pushed the rfc_change_feeds branch from f6d72c6 to fb45822 Compare July 5, 2017 15:13
@tbg
Copy link
Member Author

tbg commented Jul 5, 2017

Yep, the design of follower reads would be influenced by this (this is partially the motivation in putting this RFC out so early). An alternative is serving change feeds from the lease holders only, but it would be good to avoid it.


Review status: 0 of 1 files reviewed at latest revision, 34 unresolved discussions.


docs/RFCS/change_feeds_primitive.md, line 17 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

The first sentence sounds incomplete to me. Or perhaps just abrupt.

Done.


docs/RFCS/change_feeds_primitive.md, line 25 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

It should also return a failure condition, if the timestamp is too far back in time and it cannot stream all changes from that timestamp because the GC queue has kicked in.

I thought that was clear, but added a few words.


docs/RFCS/change_feeds_primitive.md, line 28 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Sentence needs a period.

Done.


docs/RFCS/change_feeds_primitive.md, line 41 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

Anecdotally, change feeds are one of the more frequently
requested features for CockroachDB.

I'm sure you're aware, but there's a very real discussion to be had about prioritization here. Change data feeds are only useful to a user if the rest of the system is good enough for them to use it as their primary data store :)

Oh, yeah, this feature isn't on any current roadmap afaict. May be worth pointing that out explicitly.


docs/RFCS/change_feeds_primitive.md, line 53 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Should add why companies tend to ask for it (e.g. to move data into an analytics platform).

Done.


docs/RFCS/change_feeds_primitive.md, line 64 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

CDC is the preferred acronym (Change Data Capture)

Thanks, not sure what CDT stands for.


docs/RFCS/change_feeds_primitive.md, line 78 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Is the term "close notifications" taken from somewhere? I'm finding it confusing when reading this doc as I think of closing a stream which isn't analogous.

I suppose the Naiad papers (for example https://cs.stanford.edu/~matei/courses/2015/6.S897/readings/naiad.pdf) but it's not really named explicitly there. I'm open to suggestions.


docs/RFCS/change_feeds_primitive.md, line 82 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

What does "poll" mean here?

Just "operate on". Changed.


docs/RFCS/change_feeds_primitive.md, line 96 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

There's also the "closed timestamp" event described above.

Can consumers assume that for a given key, timestamps are monotonic? Can they make any assumptions about timestamps across keys (other than what is given by closed timestamps)?

Is DeleteRange represented as a range operation or does it return the specific keys that are deleted? (I'm assuming the latter based on the use of WriteBatch)

DeleteRange would return the individual keys. If we want more, that can be arranged by attaching some meta information to the Raft proposals (i.e. the proposal corresponding to a DeleteRange would have to remember, more or less, the original request).

I expanded a little bit on the guarantees (TL;DR none given)


docs/RFCS/change_feeds_primitive.md, line 105 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

This will be a new top-level RPC (peer to Batch), and not a member of RequestUnion, right? (Otherwise I don't see how you stream responses)

Yep, made that clear.


docs/RFCS/change_feeds_primitive.md, line 106 at r1 (raw file):
Maybe the concreteness here suggests that it's not malleable, but I would expect the API to somewhat change if we actually went and implemented this. I added a bit here that indicates that the API is mostly presentational. Once we actually build this, we'll have to also route requests, which I think will influence the batching question (if we're routing through something like DistSender, adding multiple key ranges makes things more complicated, so if it isn't really needed, why do it; on the other hand, the consumers may be more like DistSQL processors from day one). Haven't thought this through because it seemed a bit early.

I've been thinking about watching entire ranges.

That also makes things hard, because now you're always moving a lot of data, even if you don't care about anything but a single key. Hence my initial reduction to one key range, which covers both that case and CDC.


docs/RFCS/change_feeds_primitive.md, line 112 at r1 (raw file):
See below, near

Initially, streams may also...


docs/RFCS/change_feeds_primitive.md, line 115 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

This sentence was confusing to me as written. The snapshot is used to retrieve the updates that were made between a base timestamp and the time when the snapshot was taken (which is also when the feed was registered).

My question about buffering becomes more serious here. I assume we're going to send these old changes to the feed first, but doing so might take 10s of seconds (or longer), during which time we'd have to buffer the "live" changes. Buffering those changes in memory feels problematic. Buffering them to disk feels complicated.

No, you don't guarantee any order, so you only have to hold back close notifications while you're not "caught up". In effect, this pushes this problem to the caller, but I think of those like DistSQL processors with temporary storage, etc, so they're equipped to do this sort of thing well already.


docs/RFCS/change_feeds_primitive.md, line 118 at r1 (raw file):
How's this?

(which is possible assuming that the base timestamp is a valid read
timestamp, i.e. does not violate the GCThreshold or the like).


docs/RFCS/change_feeds_primitive.md, line 134 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

The close notification is expected to be O(10s), right? This protocol will result in the changes for the past 10s getting resent. Am I missing something here? I think this needs to be called out explicitly in the doc.

Yes, a stream is only exactly-once until it is disrupted. The purpose of close notifications is to checkpoint work efficiently (well, as efficiently as it gets, anyway). Note that the caller isn't some user; it's some subsystem that knows that it has to keep enough state to filter duplicates that could result from duplications. I added a "semantics" section above.


docs/RFCS/change_feeds_primitive.md, line 136 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

Instead of backpressure, we should consider triggering an early replica split if we're under heavy load as well. It's an easier solution.

Done.


docs/RFCS/change_feeds_primitive.md, line 136 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

When you say "if they find themselves unable to keep up", do you mean if the client isn't processing items off the stream quickly enough? Is that easy to detect in grpc?

I don't know if it's easy enough to detect in gRPC, but you could for example use a buffered channel on which you put updates, and if that channel fills up, the stream cancels (with some more lenient mechanism during the initial catchup).


docs/RFCS/change_feeds_primitive.md, line 146 at r1 (raw file):

We add an MVCC command that, given a snapshot, a key span and a base timestamp,
synthesizes events for all newer-than-base-timestamp versions of keys contained
in the span.


docs/RFCS/change_feeds_primitive.md, line 150 at r1 (raw file):
Yeah, I think this description was trying to be overly technical and that didn't add anything relevant (more the opposite...). Replaced this with just:

We add an MVCC command that, given a snapshot, a key span and a base timestamp,
synthesizes events for all newer-than-base-timestamp versions of keys contained
in the span.

I also reworked the paragraph under this thread to be clearer about ordering.


docs/RFCS/change_feeds_primitive.md, line 157 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I don't think clients should ever be able to request ascending order at this level of abstraction. If we want to service clients who have this need, we just need a simple DistSQL processor that buffers rows, sorts them, and waits for close notifications before sending them on in-order. The DistSQL processor sits in between this change feed and the end-client.

No, we can't wait for close notifications for the Github example, it needs to be snappier than that. I reworked this by adding a SteadyState event which, once sent, acts as kind of a close notification by letting the caller know that now things happen in a more predictable order. That avoids the clutter and makes sure we don't have to wait (any more than we have to).


docs/RFCS/change_feeds_primitive.md, line 172 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

So far we have not covered how close notifications are propagated to followers and become part of the regular Raft commands. That would be required to make this happen.

That's because I think it should be discussed in the context of follower reads (though as Ben pointed out, the close notifications are more complicated than follower read notifications).


docs/RFCS/change_feeds_primitive.md, line 176 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

"straightforward to entity"?

Done.


docs/RFCS/change_feeds_primitive.md, line 181 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

Streams need to be heartbeating their close notifications anyway, so this may not even be required, although it's a nice-to-have.

What do you mean by "heartbeating a close notification"?


docs/RFCS/change_feeds_primitive.md, line 223 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

A third performance concern is that when single-row watches exist, there will likely be a lot of them active at once. This will have a memory cost, and could have a CPU cost as every write might need to be matched against many watchers.

Added.


docs/RFCS/change_feeds_primitive.md, line 233 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

A single ChangeFeed operation with a base timestamp far in the past could also be a problem. It needs to process all events since its base timestamp before it can switch into streaming mode and get its first closed timestamp update. This could happen in a loop if the clients can't keep up. We may want to place limits on how far back ChangeFeeds can go (on the other hand, if a ChangeFeed is rejected, the client will probably fall back to doing a SELECT which may be even more expensive).

Done.


docs/RFCS/change_feeds_primitive.md, line 254 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Does the proposed mechanism provide the correct building block for Change Data Capture? In particular, what interface does CDC want to present? A stream of key updates? A stream of row updates? A stream of committed transactions? A stream of key or row updates seems feasible with this mechanism. I don't see how a stream of committed transactions would be accomplished. I realize that the design of CDC is outside of the scope of this RFC, but it would be a shame to implement this mechanism and then discover that certain CDC requirements make it unusable.

That's the paragraph below in "Alternative designs". I agree that getting a rough idea of CDC's requirements would be good.


Comments from Reviewable

@tbg
Copy link
Member Author

tbg commented Jul 5, 2017

BTW, I'm not sure CDC will be the "first user" here. I think the Github example is also something that's really interesting for developers. We have someone every other week asking about polling stuff like this efficiently, and there just isn't a good way to do it so far. We can make single-key watchers work much more easily than full-blown CDC.


Review status: 0 of 1 files reviewed at latest revision, 27 unresolved discussions, some commit checks pending.


docs/RFCS/change_feeds_primitive.md, line 64 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

s/CDT/CDC/g

Done.


docs/RFCS/change_feeds_primitive.md, line 104 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

But do note that this second close notification is completely moot, as close(t1) implies close(t2) for all t2 <= t1.

Reworked these examples, and added reason for why clients should be expected to handle close notifications out of order (TL;DR don't expect them in practice, but really don't want to commit to guaranteeing it seeing that it's unclear where they're even gonna come from).


docs/RFCS/change_feeds_primitive.md, line 111 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

No these should never happen.

Yeah, that was bad wording. "anomalies" referred to the stuff above the.. well, real anomaly. Since removed.


docs/RFCS/change_feeds_primitive.md, line 155 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

sentence needs fixing, missing a comma or something.

Done.


Comments from Reviewable

@tbg tbg force-pushed the rfc_change_feeds branch from fb45822 to 856a127 Compare July 5, 2017 15:23
@rjnn
Copy link
Contributor

rjnn commented Jul 5, 2017

A CDC can be the first user, but with a DistSQL layer in between that does reorderings. You should note more explicitly that a "real" user-facing CDC will have that layer in-between (since some readers assumed the opposite).


Review status: 0 of 1 files reviewed at latest revision, 23 unresolved discussions, some commit checks pending.


docs/RFCS/change_feeds_primitive.md, line 78 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

I suppose the Naiad papers (for example https://cs.stanford.edu/~matei/courses/2015/6.S897/readings/naiad.pdf) but it's not really named explicitly there. I'm open to suggestions.

The Naiad paper is not great at defining things in easily digestible words. I recommend you just copy the definition I pasted above, with a citation.


docs/RFCS/change_feeds_primitive.md, line 118 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

How's this?

(which is possible assuming that the base timestamp is a valid read
timestamp, i.e. does not violate the GCThreshold or the like).

Great!


docs/RFCS/change_feeds_primitive.md, line 134 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Yes, a stream is only exactly-once until it is disrupted. The purpose of close notifications is to checkpoint work efficiently (well, as efficiently as it gets, anyway). Note that the caller isn't some user; it's some subsystem that knows that it has to keep enough state to filter duplicates that could result from duplications. I added a "semantics" section above.

I would suggest not ever using the words 'exactly-once', if only to avoid triggering some drive-by reader. It's like saying we are a 'CA' system in CAP. I know what you mean, but it's just asking for trouble. :)
Formally it's at-least once.


docs/RFCS/change_feeds_primitive.md, line 157 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

No, we can't wait for close notifications for the Github example, it needs to be snappier than that. I reworked this by adding a SteadyState event which, once sent, acts as kind of a close notification by letting the caller know that now things happen in a more predictable order. That avoids the clutter and makes sure we don't have to wait (any more than we have to).

But a client is free to do something with out-of-order events that aren't yet closed! Like a chat application can just deliver messages eagerly, out of order. I guess I see the point of the SteadyState event now, since after the SteadyState event we have single key ordering even if we don't have multi-key orderings. But if you think you can do some things in advance of a close notification, you are free to do so.

As a tangent, I'd like to note that this is why Naiad is so fast: in practice, it turns out that most relational operators can do a lot of things with out-of-order updates before they get close notifications, such that when the close notification finally arrives, they have O(1) or O(log n) work remaining. For a long dataflow pipeline, this dramatically reduces tail latencies at each layer of the dataflow graph, vastly improving end-to-end query processing time. This is why Naiad is faster than vanilla dataflow (i.e. DistSQL) even if you throw out all the incremental computation stateful-differential computation stuff.


docs/RFCS/change_feeds_primitive.md, line 181 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

What do you mean by "heartbeating a close notification"?

You need close notifications for timestamps even if there is no activity on the range. If I am a client who needs to interleave changes in-order from two different ranges, I need to know if timestamp t is closed from range 1, even if range 1 has no activity, because I can't ship an update at time t1 from range 2 in-order until I know for sure that range 1 has closed t1. So a range needs to send close notifications regularly, akin to heartbeats.


docs/RFCS/change_feeds_primitive.md, line 104 at r2 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Reworked these examples, and added reason for why clients should be expected to handle close notifications out of order (TL;DR don't expect them in practice, but really don't want to commit to guaranteeing it seeing that it's unclear where they're even gonna come from).

I think SteadyState is fine, although we can bikeshed the name. What it really guarantees is single-key orderings after that point, so maybe a name that reflects that property?


Comments from Reviewable

@petermattis
Copy link
Collaborator

BTW, I'm not sure CDC will be the "first user" here. I think the Github example is also something that's really interesting for developers.

Heh, I likely overstated my belief, though I was basing that on reading the roadmap tea leaves. Let me restate: CDC appears to be a frequently requested requirement for enterprise integration. It is likely to be coming in the 1.2/1.3 time frame and we should, as pessimistic engineers, assume 1.2 until we learn otherwise. Yes, there are a whole lot of questions to be answered about CDC. I've been imagining it was a kafka feed of committed transactions. Hopefully that isn't too far off.


Review status: 0 of 1 files reviewed at latest revision, 21 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 78 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

The Naiad paper is not great at defining things in easily digestible words. I recommend you just copy the definition I pasted above, with a citation.

Perhaps we could use the term "checkpoint notification".


docs/RFCS/change_feeds_primitive.md, line 145 at r3 (raw file):

1. `key=x` gets deleted at `ts=90`

For simplicity, the stream does not commit to strictly ascending close

Curious what simplification this provides. Performing the check on the stream doesn't seem onerous.


Comments from Reviewable

@tbg tbg force-pushed the rfc_change_feeds branch from 856a127 to 0b43466 Compare July 5, 2017 17:35
@tbg
Copy link
Member Author

tbg commented Jul 5, 2017

Review status: 0 of 1 files reviewed at latest revision, 17 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 78 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Perhaps we could use the term "checkpoint notification".

@arjunravinarayan didn't copy the whole definition (after all, this RFC doesn't care about the receiver of its events - that's your RFC, and 2. doesn't really fit here), but I specialized it to Value() notifications (which are now the only kind) so that 1., 2., 3., 5., 6. are explicitly true.


docs/RFCS/change_feeds_primitive.md, line 134 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I would suggest not ever using the words 'exactly-once', if only to avoid triggering some drive-by reader. It's like saying we are a 'CA' system in CAP. I know what you mean, but it's just asking for trouble. :)
Formally it's at-least once.

It's better than at-least-once until you reconnect, but yes, best to avoid that phrase. Updated.


docs/RFCS/change_feeds_primitive.md, line 181 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

You need close notifications for timestamps even if there is no activity on the range. If I am a client who needs to interleave changes in-order from two different ranges, I need to know if timestamp t is closed from range 1, even if range 1 has no activity, because I can't ship an update at time t1 from range 2 in-order until I know for sure that range 1 has closed t1. So a range needs to send close notifications regularly, akin to heartbeats.

Ah, I somehow misread this as heartbeats the other way around - from client to Replica. I removed this sentence - this sort of thing really fits into your RFC much better.


docs/RFCS/change_feeds_primitive.md, line 104 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I think SteadyState is fine, although we can bikeshed the name. What it really guarantees is single-key orderings after that point, so maybe a name that reflects that property?

Happy to bikeshed, though I'm not interested in taking the lead 😅


docs/RFCS/change_feeds_primitive.md, line 145 at r3 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Curious what simplification this provides. Performing the check on the stream doesn't seem onerous.

Between reconnects, you may be talking to completely different replicas.


Comments from Reviewable

@bdarnell
Copy link
Contributor

bdarnell commented Jul 5, 2017

Review status: 0 of 1 files reviewed at latest revision, 18 unresolved discussions, some commit checks pending.


docs/RFCS/change_feeds_primitive.md, line 112 at r1 (raw file):

that follower-reads are an important component of imposing minimum overhead on the OLTP side.

Followers for some ranges are leaders for others. I think serving change feeds from followers is going to be of limited use in minimizing the production impact.

Ultimately, I think we're going to need to allow operators to classify consumers of these feeds as critical or non-critical, and for critical feeds we apply backpressure to writes. (For non-critical feeds, maybe we skip the backfill step or allow it to discard some data)


docs/RFCS/change_feeds_primitive.md, line 56 at r3 (raw file):

  Individual developers often ask for this, and it's one of the RethinkDB/etcd
  features folks really like.
- stream updates to a table or database into an external system, for example

Another variation of this use case is migrating a table from one cockroach cluster to another (for example, between a hosted service and a self-managed cluster). Our regular replication can't handle replication between clusters; something like this could be used to move tables between clusters with minimal (but non-zero) downtime.


Comments from Reviewable

@rjnn
Copy link
Contributor

rjnn commented Jul 5, 2017

Review status: 0 of 1 files reviewed at latest revision, 17 unresolved discussions, some commit checks pending.


docs/RFCS/change_feeds_primitive.md, line 78 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

@arjunravinarayan didn't copy the whole definition (after all, this RFC doesn't care about the receiver of its events - that's your RFC, and 2. doesn't really fit here), but I specialized it to Value() notifications (which are now the only kind) so that 1., 2., 3., 5., 6. are explicitly true.

Looks good to me!


docs/RFCS/change_feeds_primitive.md, line 112 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

that follower-reads are an important component of imposing minimum overhead on the OLTP side.

Followers for some ranges are leaders for others. I think serving change feeds from followers is going to be of limited use in minimizing the production impact.

Ultimately, I think we're going to need to allow operators to classify consumers of these feeds as critical or non-critical, and for critical feeds we apply backpressure to writes. (For non-critical feeds, maybe we skip the backfill step or allow it to discard some data)

Followers are leaders for other ranges, but almost every workload is skewed in some fashion, such that some specific hot ranges are the bottlenecks at any given point in time.


docs/RFCS/change_feeds_primitive.md, line 104 at r2 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Happy to bikeshed, though I'm not interested in taking the lead 😅

How about StreamState?


Comments from Reviewable

@tbg
Copy link
Member Author

tbg commented Jul 5, 2017

Review status: 0 of 1 files reviewed at latest revision, 17 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 112 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

Followers are leaders for other ranges, but almost every workload is skewed in some fashion, such that some specific hot ranges are the bottlenecks at any given point in time.

Maybe this should've been written down more explicitly (I think @arjunravinarayan will probably dwell on it since I think I heard it from him) I was picturing that in a serious CDC setup we would have, say, ZoneConfig rules that designate some set of nodes to primarily push out changes (and/or deal with updating the views), so they would prefer to remain followers and avoid data they're not CDC'ing as much as they can.

For the single-key watcher case, there's still some benefit to having this on the followers. Better watchers/3 on each node than all on the lease holder; perhaps only a few ranges are watcher-hot. But it's hard to argue that that would be a general case, so if that was the only use case, probably easier to just serve change feeds from the lease holder only.


docs/RFCS/change_feeds_primitive.md, line 104 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

How about StreamState?

Done.


docs/RFCS/change_feeds_primitive.md, line 56 at r3 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Another variation of this use case is migrating a table from one cockroach cluster to another (for example, between a hosted service and a self-managed cluster). Our regular replication can't handle replication between clusters; something like this could be used to move tables between clusters with minimal (but non-zero) downtime.

Added.


Comments from Reviewable

@tbg tbg force-pushed the rfc_change_feeds branch from 0b43466 to c5004a6 Compare July 5, 2017 22:09
@petermattis
Copy link
Collaborator

Review status: 0 of 1 files reviewed at latest revision, 18 unresolved discussions, some commit checks failed.


docs/RFCS/change_feeds_primitive.md, line 324 at r5 (raw file):

materialized views, change data capture. It's worth discussing whether there are
specialized solutions for, say, CDC, that are strictly superior and worth having
two separate subsystems, or that may even replace this proposed one.

Could we use the Raft log index to reduce duplicate entries when a feed reconnects? This would only help with rebalancing-based reconnects, not splits, but rebalances happen much more frequently. We'd have to include the Raft log index (and perhaps range ID) along with the timestamp. There might be other downsides to this approach that I'm not thinking of.

I was pondering this because, if I squint, the ChangeFeed at steady-state for a replica looks a lot like the contents of the Raft log.


Comments from Reviewable

@bdarnell
Copy link
Contributor

bdarnell commented Jul 6, 2017

Review status: 0 of 1 files reviewed at latest revision, 18 unresolved discussions, some commit checks failed.


docs/RFCS/change_feeds_primitive.md, line 324 at r5 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Could we use the Raft log index to reduce duplicate entries when a feed reconnects? This would only help with rebalancing-based reconnects, not splits, but rebalances happen much more frequently. We'd have to include the Raft log index (and perhaps range ID) along with the timestamp. There might be other downsides to this approach that I'm not thinking of.

I was pondering this because, if I squint, the ChangeFeed at steady-state for a replica looks a lot like the contents of the Raft log.

The main downside is that raft logs are truncated frequently, so we can't provide much of a guarantee with them (unless we want the truncation queue to consider this and apply backpressure for active change feeds).

I think it's a good idea for close notifications to include both an application-readable timestamp and an opaque blob to be passed back on reconnections.


Comments from Reviewable

@rjnn
Copy link
Contributor

rjnn commented Jul 6, 2017

A question: how feasible would it be to stream the pair <oldval, newval> for writes that are updates (as opposed to just newval)? How much overhead would this impose on the replica?


Reviewed 1 of 1 files at r4, 1 of 1 files at r5.
Review status: all files reviewed at latest revision, 17 unresolved discussions, some commit checks failed.


Comments from Reviewable

@bdarnell
Copy link
Contributor

bdarnell commented Jul 6, 2017

On a follower, retrieving the old value is always going to be fairly expensive since the old value probably won't be in cache and will have to go all the way to disk. On the leader, because most UPDATEs read the old value first, it would probably be a cache hit. Even with a cache hit, there's a nontrivial cost to processing this. It's (probably) cheaper than if the consumer of the feed would just do queries for the old values anyway, but it would be even better if consumers that need the old value maintained their own cache.


Review status: all files reviewed at latest revision, 17 unresolved discussions, some commit checks failed.


Comments from Reviewable

@tbg tbg force-pushed the rfc_change_feeds branch from c5004a6 to 118d036 Compare July 6, 2017 20:31
@tbg
Copy link
Member Author

tbg commented Jul 6, 2017

Review status: 0 of 1 files reviewed at latest revision, 17 unresolved discussions.


docs/RFCS/change_feeds_primitive.md, line 324 at r5 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

The main downside is that raft logs are truncated frequently, so we can't provide much of a guarantee with them (unless we want the truncation queue to consider this and apply backpressure for active change feeds).

I think it's a good idea for close notifications to include both an application-readable timestamp and an opaque blob to be passed back on reconnections.

Passing the log index back sounds good to me, though it's more effective to do it on every emitted event (which is the version I wrote down as an optimization). That makes catch-up a lot less scary, I think. In some distant future in which log truncation is less replicated, we could even keep the log around for longer if we are serving feeds.


Comments from Reviewable

@petermattis
Copy link
Collaborator

Review status: 0 of 1 files reviewed at latest revision, 18 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 266 at r6 (raw file):

1. Since log-based catch-up is more effective, `resume_token` should be
   specified whenever it is available, even if the caller already keeps state
   for deduplication.

👍


Comments from Reviewable

@bdarnell
Copy link
Contributor

Review status: 0 of 1 files reviewed at latest revision, 19 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 252 at r6 (raw file):

   that has triggered the event.
1. when a stream drops, the caller reconnects with both the highest known
   checkpointed timestamp *and* the largest seen `resume_token` (if any).

Does "largest" here imply that the client is doing a lexicographic comparison, or is it just using the most recent? If so, the token should be a version number followed by the raft log index, encoded with our ordered key encoding. (if there's no lexicographic comparison it should probably be a protobuf)


Comments from Reviewable

@tbg
Copy link
Member Author

tbg commented Jul 11, 2017

Review status: 0 of 1 files reviewed at latest revision, 19 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 252 at r6 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Does "largest" here imply that the client is doing a lexicographic comparison, or is it just using the most recent? If so, the token should be a version number followed by the raft log index, encoded with our ordered key encoding. (if there's no lexicographic comparison it should probably be a protobuf)

Largest as defined by some total order (basically highest log index). I don't see how these would arrive out of order anyway, but I like cementing the fact that we don't have to think about it. From the clients perspective, I think an opaque blob (int or lexicographical bytes) will do.


Comments from Reviewable

@rjnn
Copy link
Contributor

rjnn commented Jul 22, 2017

docs/RFCS/change_feeds_primitive.md, line 105 at r6 (raw file):

The events emitted are simple:
Value(key, value, timestamp): key was set to value at timestamp, and in particular

I think we also need the transaction UUID as well. It is my understanding from an offline conversation with @bdarnell that this is possible without much additional effort, but I'm making this desire explicit here in case there are complications with delivering it.


Comments from Reviewable

@tbg
Copy link
Member Author

tbg commented Jul 22, 2017

Review status: 0 of 1 files reviewed at latest revision, 14 unresolved discussions, all commit checks successful.


docs/RFCS/change_feeds_primitive.md, line 105 at r6 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

The events emitted are simple:
Value(key, value, timestamp): key was set to value at timestamp, and in particular

I think we also need the transaction UUID as well. It is my understanding from an offline conversation with @bdarnell that this is possible without much additional effort, but I'm making this desire explicit here in case there are complications with delivering it.

The UUID naively can't be reconstructed from the WriteBatch because it isn't always sent (1PC transactions), so additional effort will be necessary. We have the ID in most WriteBatches when intents are written, so it could mostly be recovered, but we'll definitely have to add it in the 1PC path. What's the use case that requires the txn id?


Comments from Reviewable

@rjnn
Copy link
Contributor

rjnn commented Jul 22, 2017

docs/RFCS/change_feeds_primitive.md, line 105 at r6 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

The UUID naively can't be reconstructed from the WriteBatch because it isn't always sent (1PC transactions), so additional effort will be necessary. We have the ID in most WriteBatches when intents are written, so it could mostly be recovered, but we'll definitely have to add it in the 1PC path. What's the use case that requires the txn id?

If we want to group changes at the client-facing CDC by transaction rather than by row, we need the transaction UUID.


Comments from Reviewable

@bdarnell
Copy link
Contributor

1PC transactions are by definition single-range operations, so we never need to correlate them with anything else. It may not matter that the original transaction id has been lost; we can just synthesize a new UUID for any write batch that applies without a transaction id. (although it's possible to expose the transaction id through sql, so it's probably better to plumb it through the evaluation stack if we can)

@rjnn rjnn mentioned this pull request Aug 9, 2017
rjnn pushed a commit to rjnn/cockroach that referenced this pull request Aug 9, 2017
This RFC proposes a mechanism for subscribing to changes to a table or
database. It will support the Apache Kafka publisher API. It will make
use of a Replica level subsystem "ChangeFeed", the design of which is
described in cockroachdb#16838.
Copy link
Contributor

@knz knz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am missing something in this RFC: operational concerns.

An administrator must be able to monitor which change trackers have been configured on each node, be able to measure how fast they appear to be, how much they are "lagging behind", etc. this means all these trackers must be aptly named, and registered in some concurrently inspectable data structure.

Also be able to disconnect them from "outside" of the app/client that defined them, or at least present a textual hint about how to disconnect them by other means.

Also report these things in cockroach zip and debug endpoints.

duplicates and miss updates.

This is a toy implementation that already makes for a good demo and allows a
similar toy implementation that uses [streaming SQL][streamsql] once it's
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to clarify somewhere what this "streaming SQL" feature is that you're referring to multiple times.

rjnn pushed a commit to rjnn/cockroach that referenced this pull request Aug 11, 2017
This RFC proposes a mechanism for subscribing to changes to a table or
database. It will support the Apache Kafka publisher API. It will make
use of a Replica level subsystem "ChangeFeed", the design of which is
described in cockroachdb#16838.
@knz
Copy link
Contributor

knz commented Aug 25, 2017

reminder: don't forget to rebase if you plan to continue work on this, and pick up the new RFC sections + naming convention.

@coder0718
Copy link

coder0718 commented Oct 31, 2017

Does reovery ChangeFeeds's history data from the rocksdb's snapshot needs to scanning the entire range?

if there are multiple ChangeFeeds that need to be restored at the same time, it may take a long time to scan.


MVCC-level
We add an MVCC command that, given a snapshot, a key span and a base timestamp, synthesizes events for all newer-than-base-timestamp versions of keys contained in the span.
Note that data is laid out in timestamp-descending per-key order, which is the opposite of the naively expected order. However, during the catch-up phase, the stream may emit events in any order, and so there is no problem. A reverse scan could be used instead and would result in forward order, albeit perhaps at a slight performance cost and little added benefit.
Note: There is significant overlap with NewMVCCIncrementalIterator, which is used in incremental Backup/Restore.

@tbg
Copy link
Member Author

tbg commented Nov 3, 2017 via email

@tbg
Copy link
Member Author

tbg commented Dec 6, 2017

I was thinking that I should rebase this (to pick up the new naming convention) and merge it as a draft. Or is it preferable to just leave it open?

@a-robinson
Copy link
Contributor

I'd be fine with merging it as a draft given that we don't expect to touch it for a few months.

@tbg tbg force-pushed the rfc_change_feeds branch from 118d036 to 7a7911d Compare December 6, 2017 15:16
@tbg tbg requested a review from a team as a code owner December 6, 2017 15:16
@knz
Copy link
Contributor

knz commented Dec 6, 2017

LGTM with nit


Reviewed 2 of 2 files at r7.
Review status: all files reviewed at latest revision, 15 unresolved discussions, some commit checks pending.


docs/RFCS/20170613_change_feeds_storage_primitive.md, line 5 at r7 (raw file):

- Start Date: 2017-06-13
- Authors: Arjun Narayan and Tobias Schottdorf
- RFC PR: TBD

update this field


Comments from Reviewable

@tbg tbg force-pushed the rfc_change_feeds branch from 7a7911d to 4860851 Compare December 6, 2017 15:31
@tbg
Copy link
Member Author

tbg commented Dec 6, 2017

TFTR!


Review status: 0 of 1 files reviewed at latest revision, 15 unresolved discussions, some commit checks pending.


docs/RFCS/20170613_change_feeds_storage_primitive.md, line 105 at r6 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

If we want to group changes at the client-facing CDC by transaction rather than by row, we need the transaction UUID.

I added this to "Unresolved Questions". It seems impossible to support, though? We don't store TxnIDs with writes and it would be prohibitive to do so, so at the very least, you have hard to overcome problems when setting up the stream.


docs/RFCS/20170613_change_feeds_storage_primitive.md, line 5 at r7 (raw file):

Previously, knz (kena) wrote…

update this field

Done.


Comments from Reviewable

This RFC is incomplete. Is is merged in `draft` status.

This RFC proposes a mechanism for subscribing to changes to a set of key ranges
starting at an initial timestamp.

It is for use by Cockroach-internal higher-level systems such as distributed
SQL, change data capture, or a Kafka producer endpoint. A "sister RFC" detailing
these higher-level systems is in cockroachdb#17535. See also cockroachdb#19222 for a related
follower-reads RFC.
@tbg tbg force-pushed the rfc_change_feeds branch from 4860851 to 34c65ba Compare December 6, 2017 15:38
@tbg tbg merged commit d0472f0 into cockroachdb:master Dec 6, 2017
@tbg tbg deleted the rfc_change_feeds branch December 6, 2017 16:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants