- Feature Name: Storage-Level Primitive for Change Feeds
- Status: draft
- Start Date: 2017-06-13
- Authors: Arjun Narayan and Tobias Schottdorf
- RFC PR: TBD
- Cockroach Issue: #9712, #6130
This RFC proposes a mechanism for subscribing to changes to a set of key ranges starting at an initial timestamp.
It is for use by Cockroach-internal higher-level systems such as distributed SQL, change data capture, or a Kafka producer endpoint. A "sister RFC" detailing these higher-level systems will be prepared by @arjunravinarayan.
Add a basic building block for change feeds. Namely, add a command `ChangeFeed` served by `*Replica` which, given a (sufficiently recent) HLC timestamp and a set of key spans contained in the Replica, returns a stream-based connection that
- eagerly delivers updates that affect any of the given key spans, and
- periodically delivers "closed timestamps", i.e. tuples `(timestamp, key_range)` where receiving `(ts1, [a,b))` guarantees that no future update affecting `[a,b)` will be sent for a timestamp less than `ts1`.
The design overlaps with incremental Backup/Restore and in particular follower reads, but also has relevance for streaming SQL results.
The design touches on the higher-level primitives that abstract away the Range level, but only enough to motivate the Replica-level primitive itself.
Many databases have various features for propagating database updates to external sinks eagerly. Currently, however, the only way to get data out of CockroachDB is via SQL `SELECT` queries, `./cockroach dump`, or Backups (enterprise only). Furthermore, `SELECT` queries at the SQL layer do not support incremental reads, so polling through SQL is inefficient for data that is not strictly ordered. Anecdotally, change feeds are one of the more frequently requested features for CockroachDB.
Our motivating use cases are:
- wait for updates on individual rows or small spans in a table. For example, if a branch is pushed while its diff is viewed on GitHub, a notification will pop up in the browser to alert the user that the diff they're viewing has changed. This kind of functionality should be easy to achieve when using CockroachDB. Individual developers often ask for this, and it's one of the RethinkDB/etcd features folks really like.
- stream updates to a table or database into an external system, for example Kafka, with at-least-once semantics. This is also known as Change Data Capture (CDC). This is something companies tend to ask about.
- implement efficient incrementally updated materialized views. This is something we think everyone wants, but does not dare to ask.
The above use cases were key in informing the design of the basic building block presented here: We
- initiate change feeds using an HLC timestamp since that is the right primitive for connecting the "initial state" (think `SELECT * FROM ...` or `INSERT ... RETURNING CURRENT_TIMESTAMP()`) to the stream of updates.
- chose to operate on a set of key ranges since that captures collections of individual keys, sets of tables, and whole databases (CDC).
- require close notifications because often, a higher-level system needs to buffer updates until it knows that older data won't change any more; a simple example is wanting to output updates in timestamp-sorted order. More generally, close notifications enable checkpointing for the case in which a change feed disconnects (which would not be uncommon with large key spans).
- emit close notifications with attached key ranges since that is a natural consequence of the Range-based sharding in CockroachDB, and fine-grained information is always preferable. When not necessary, the key range can be processed away by an intermediate stage that tracks the minimum closed timestamp over all tracked key spans and emits that (with a global key range) whenever it changes.
- make the close notification threshold configurable via `ZoneConfig`s (though this is really a requirement we impose on 16593). Fast close notifications (which allow consumers to operate more efficiently) correspond to disabling long transactions, which we must not impose globally.
- aim to serve change feeds from follower replicas (hence the connection to 16593).
- make the protocol efficient enough to operate on whole databases in practice.
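The intermediate stage mentioned above, which collapses per-span close notifications into a single minimum closed timestamp, could look roughly like the following. This is only a sketch: `Timestamp`, `Span`, and `MinClosedTracker` are hypothetical names, and a plain `int64` stands in for an HLC timestamp.

```go
package main

// Timestamp is a simplified stand-in for an HLC timestamp (assumption:
// a single integer instead of wall time plus logical counter).
type Timestamp int64

// Span is a half-open key range [Start, End).
type Span struct {
	Start, End string
}

// MinClosedTracker remembers the latest closed timestamp per tracked span
// and reports the minimum over all of them.
type MinClosedTracker struct {
	closed  map[Span]Timestamp
	lastMin Timestamp
}

// NewMinClosedTracker tracks the given spans, all initially closed at base.
func NewMinClosedTracker(base Timestamp, spans ...Span) *MinClosedTracker {
	t := &MinClosedTracker{closed: make(map[Span]Timestamp), lastMin: base}
	for _, s := range spans {
		t.closed[s] = base
	}
	return t
}

// Close records a close notification for a tracked span. It returns the
// global minimum and true if that minimum advanced; otherwise it returns
// the unchanged minimum and false. Notifications for untracked spans and
// out-of-order (older) notifications are ignored.
func (t *MinClosedTracker) Close(s Span, ts Timestamp) (Timestamp, bool) {
	cur, ok := t.closed[s]
	if !ok || ts <= cur {
		return t.lastMin, false
	}
	t.closed[s] = ts
	lowest := ts
	for _, c := range t.closed {
		if c < lowest {
			lowest = c
		}
	}
	if lowest > t.lastMin {
		t.lastMin = lowest
		return lowest, true
	}
	return t.lastMin, false
}
```

A downstream consumer that does not care about key ranges would only act when `Close` reports that the minimum advanced, and treat the returned timestamp as closed for the whole tracked key space.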
Note that the consumers of this primitive are always going to be CockroachDB-internal subsystems that consume raw data (similar to a stream of `WriteBatch`). We propose here a design for the core layer primitive, and the higher-level subsystems are out of scope.
The events emitted are simple:
- `key` was set to `value` at `timestamp`,
- `key` was deleted at `timestamp`, and
- `[startKey, endKey)` was closed at `timestamp`, i.e. no more events will be emitted for `[startKey, endKey)` at timestamps below the event's.
Consumers should not make any assumptions about timestamp ordering. Events or close timestamps could be reported out of order. For example, the following would be legal:
- `key=x` changes to `a` at `ts=123.0`
- `key=x` gets deleted at `ts=120` (deletion reported out of order)
- `[a, z)` closes at `ts=110`
- `[a, z)` closes at `ts=100` (close notification out of order)
However, the following would not be legal (because the deletion violates the close notification's promise):
- `[a, z)` closes at `ts=100`
- `key=x` gets deleted at `ts=90`
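To make the legality rule concrete, here is a small sketch of a checker that scans an event sequence and reports the first event that breaks an earlier close notification's promise. The `Event` shape and the names `Kind`, `Set`, `Deleted`, `Closed`, and `FirstViolation` are illustrative assumptions, not a proposed wire format; timestamps are simplified to integers.

```go
package main

// Kind distinguishes the three event types described above.
type Kind int

const (
	Set Kind = iota
	Deleted
	Closed
)

// Event is a hypothetical, simplified representation of a feed event.
// Key is used by Set and Deleted; StartKey and EndKey describe the
// half-open range [StartKey, EndKey) of a Closed event.
type Event struct {
	Kind             Kind
	Key              string
	StartKey, EndKey string
	TS               int64
}

// FirstViolation returns the index of the first Set or Deleted event whose
// timestamp lies below a close notification that was received earlier for a
// range containing its key. It returns -1 if the sequence is legal.
// Out-of-order events and out-of-order close notifications are accepted.
func FirstViolation(events []Event) int {
	var closes []Event
	for i, ev := range events {
		if ev.Kind == Closed {
			closes = append(closes, ev)
			continue
		}
		for _, c := range closes {
			inRange := c.StartKey <= ev.Key && ev.Key < c.EndKey
			if inRange && ev.TS < c.TS {
				return i
			}
		}
	}
	return -1
}
```

Applied to the two sequences above, the first yields `-1` (legal) and the second yields index `1`, the deletion at `ts=90`.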
In practice, these "anomalies" would happen only infrequently, and would usually be accompanied by a reconnection.
The exact format in which events are reported is TBD. Throughout this document, we assume that we are passing along `WriteBatch`es, though that is not a hard requirement. Typical consumers will live in the SQL subsystem, so they may profit from a simplified format.
Replicas (whether they're lease holder or not) accept a top-level `ChangeFeed` RPC which contains a base HLC timestamp and (in the usual header) a key range for which updates are to be delivered. The `ChangeFeed` command first grabs `raftMu`, opens a RocksDB snapshot, registers itself with the raft processing goroutine and releases `raftMu` again. By registering itself, it receives all future `WriteBatch`es which apply on the Replica, and sends them on the stream to the caller (after suitably sanitizing to account only for the updates which are relevant to the span the caller has supplied).
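The reason for taking the snapshot and registering under the same lock is that every update then lands either in the snapshot or on the stream, never in neither. The toy model below illustrates that ordering argument only; `Replica`, `Subscription`, `Register`, and `Apply` are hypothetical stand-ins, a plain mutex plays the role of `raftMu`, and an in-memory slice plays the role of the engine.

```go
package main

import "sync"

// Span is a half-open key range [Start, End).
type Span struct{ Start, End string }

// Contains reports whether key falls inside the span.
func (s Span) Contains(key string) bool { return s.Start <= key && key < s.End }

// Update is a simplified stand-in for one entry of an applied batch.
type Update struct {
	Key   string
	Value string
	TS    int64
}

// Subscription holds what a registered feed has seen.
type Subscription struct {
	Span     Span
	Snapshot []Update // state captured at registration, used for catch-up
	Streamed []Update // updates applied after registration, filtered to Span
}

// Replica is a toy model: mu stands in for raftMu, log for the engine.
type Replica struct {
	mu   sync.Mutex
	log  []Update
	subs []*Subscription
}

// Register captures a snapshot and subscribes while holding the lock, so no
// batch can apply between the two steps.
func (r *Replica) Register(span Span) *Subscription {
	r.mu.Lock()
	defer r.mu.Unlock()
	sub := &Subscription{Span: span, Snapshot: append([]Update(nil), r.log...)}
	r.subs = append(r.subs, sub)
	return sub
}

// Apply writes a batch and relays the relevant updates to each subscriber.
func (r *Replica) Apply(batch []Update) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.log = append(r.log, batch...)
	for _, sub := range r.subs {
		for _, u := range batch {
			if sub.Span.Contains(u.Key) {
				sub.Streamed = append(sub.Streamed, u)
			}
		}
	}
}
```

In this model a batch applied before `Register` is visible only through `Snapshot`, and one applied afterwards only through `Streamed`, restricted to the keys inside the subscriber's span.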
The remaining difficulty is that additionally, we must retrieve all updates made at or after the given base HLC timestamp, and this is what the engine snapshot is for. We invoke a new MVCC operation (specified below) that, given a base timestamp and a set of key ranges, synthesizes ChangeFeed notifications from the snapshot (which is possible assuming that the base timestamp is a valid read timestamp, i.e. does not violate the GCThreshold or the like).
Once these synthesized events have been fed to the client, we begin relaying close notifications. We assume that close notifications are driven by follower reads and can be observed periodically, so that we are really only relaying them to the stream. Close notifications are per-Range, so all the key ranges (which are contained in the range) will be affected equally.
When the range splits or merges, or if the Replica gets removed, the stream terminates in an orderly fashion. The caller (who has knowledge of Replica distribution) will retry accordingly. For example, when the range splits, the caller will open two individual ChangeFeed streams to Replicas on both sides of the post-split Ranges, using the highest close notification timestamp it received before the stream disconnected. If the Replica gets removed, it will simply reconnect to another Replica, again using its most recently received close notification.
Initially, streams may also disconnect if they find themselves unable to keep up with write traffic on the Range. Later, we can consider triggering an early split or adding backpressure, but this is out of scope for now.
We need a command that, given a snapshot, a key span and a base timestamp, synthesizes a set of `WriteBatch`es with the following property: If all MVCC key-value pairs (whether insertions or deletions) with a timestamp larger than or equal to the base timestamp were removed from the snapshot, applying the emitted `WriteBatch`es (in any order) would restore the initial snapshot.
Note that if a key is deleted at `ts1` and then a new value written at `ts2 > ts1`, we don't have to emit the delete before the write; we give no ordering guarantees (doing so would require a scan in increasing timestamp order, which is at odds with how we lay out data on-disk).
However, when so requested, we vow to emit events in ascending timestamp order for each individual key, which is easy: Different versions of a key are grouped together in descending timestamp order, so once we've created `N` `WriteBatch`es for this key, we simply emit these `N` items in reverse order once we're done with the key. This mode of operation would be requested by `ChangeFeed`s which listen on an individual key, and which don't want to wait for the next close notification (which may take ~10s to arrive) until they can emit sorted updates to the client (see the GitHub example).
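The catch-up scan and its optional per-key ordering can be sketched as follows. This is an illustration under simplifying assumptions: `Version` and `CatchUp` are hypothetical names, timestamps are integers, and the input slice models the on-disk layout (keys ascending, versions of each key in descending timestamp order).

```go
package main

// Version is a simplified MVCC version of a key. Deleted marks a tombstone.
type Version struct {
	Key     string
	TS      int64
	Value   string
	Deleted bool
}

// CatchUp returns all versions with a timestamp at or above base. The input
// must be sorted by key, with each key's versions in descending timestamp
// order. If ascendingPerKey is set, each key's versions are buffered and
// emitted in reverse, i.e. in ascending timestamp order for that key.
func CatchUp(versions []Version, base int64, ascendingPerKey bool) []Version {
	var out, run []Version
	// flush emits the buffered versions of the key that was just finished.
	flush := func() {
		if ascendingPerKey {
			for i, j := 0, len(run)-1; i < j; i, j = i+1, j-1 {
				run[i], run[j] = run[j], run[i]
			}
		}
		out = append(out, run...)
		run = run[:0]
	}
	for i, v := range versions {
		if i > 0 && v.Key != versions[i-1].Key {
			flush()
		}
		if v.TS >= base {
			run = append(run, v)
		}
	}
	flush()
	return out
}
```

The buffering cost is bounded by the number of qualifying versions of a single key, which is why the ordered mode is cheap for single-key watchers.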
Note: There is significant overlap with `NewMVCCIncrementalIterator`, which is used in incremental Backup/Restore.
The design is hand-wavy in this section because it overlaps with follower reads significantly. When reads can be served from followers, `DistSender` or the distributed SQL framework need to be able to leverage this fact. That means that they need to be able to (mostly correctly) guess which, if any, follower Replica is able to serve reads at a given timestamp. Reads can be served from a Replica if it has been notified of that fact, i.e. if a higher timestamp has been closed.
Assuming such a mechanism is in place, whether in DistSender or distributed SQL, it should be straightforward to add an entity which abstracts away the Range level. For example, given the key span of a whole table, that entity splits it along range boundaries, and opens individual `ChangeFeed` streams to suitable members of each range. When individual streams disconnect (for instance due to a split) new streams are initiated (using the last known closed timestamp for the lost stream) and any higher-level consumers are notified that they need to drop their buffers for the affected key range.
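A rough sketch of the span-splitting and resubscription logic such an entity would need is given below. `Span`, `Feed`, `SplitAtBoundaries`, and `Resubscribe` are hypothetical names, and range boundaries are assumed to be available as a sorted list of split keys.

```go
package main

// Span is a half-open key range [Start, End).
type Span struct{ Start, End string }

// Feed describes one per-range stream: the span it covers and the timestamp
// from which it should (re)start.
type Feed struct {
	Span     Span
	ResumeTS int64
}

// SplitAtBoundaries cuts s at every boundary that falls strictly inside it.
// boundaries must be sorted in ascending order.
func SplitAtBoundaries(s Span, boundaries []string) []Span {
	var out []Span
	start := s.Start
	for _, b := range boundaries {
		if b <= start {
			continue // boundary lies at or before the part still to be cut
		}
		if b >= s.End {
			break // remaining boundaries lie outside the span
		}
		out = append(out, Span{start, b})
		start = b
	}
	return append(out, Span{start, s.End})
}

// Resubscribe replaces a lost feed by one feed per post-split range. Each
// new feed resumes from the last closed timestamp known for the lost feed.
func Resubscribe(lost Feed, boundaries []string) []Feed {
	var feeds []Feed
	for _, s := range SplitAtBoundaries(lost.Span, boundaries) {
		feeds = append(feeds, Feed{Span: s, ResumeTS: lost.ResumeTS})
	}
	return feeds
}
```

Because the new feeds restart from a closed timestamp rather than from the last event seen, consumers may receive some events twice, which is why they are told to drop their buffers for the affected key range.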
There is a straightforward path to implement small bits and pieces of this functionality without embarking on an overwhelming endeavour all at once:
First, implement the basic `ChangeFeed` command, but without the base timestamp or close notifications. That is, once registered with `Raft`, updates are streamed, but there is no information on which updates were missed and what timestamps to expect. In turn, the MVCC work is not necessary yet, and neither are follower reads.
Next, add an experimental `./cockroach debug changefeed <key>` command which launches a single `ChangeFeed` request through `DistSender`. It immediately prints results to the command line without buffering and simply retries the `ChangeFeed` command until the client disconnects. It may thus both produce duplicates and miss updates.
This is a toy implementation that already makes for a good demo and allows a similar toy implementation that uses streaming SQL once it's available to explore the SQL API associated to this feature.
A logical next step is adding the MVCC work to also emit the events from the snapshot. That with some simple buffering at the consumer gives at-least-once delivery.
Once follower reads are available, it should be straightforward to send close notifications from `ChangeFeed`.
At that point, core-level work should be nearly complete and the focus shifts to implementing distributed SQL processors which provide real changefeeds by implementing the routing layer, buffering based on close notifications, and to expose that through SQL in a full-fledged API. This should allow watching simple `SELECT` statements (anything that boils down to simple table-readers with not too much aggregation going on), and then, of course, materialized views.
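Buffering based on close notifications might take roughly the following shape, assuming the goal is timestamp-ordered output. `Event` and `OrderedBuffer` are hypothetical names and timestamps are simplified to integers; events are held back until a close notification covers them and are then released in sorted order.

```go
package main

import "sort"

// Event is a simplified value event (set or delete) for a single key.
type Event struct {
	Key   string
	TS    int64
	Value string
}

// OrderedBuffer holds events until a close notification guarantees that no
// older event can still arrive for their key range.
type OrderedBuffer struct {
	pending []Event
}

// Add buffers an event that arrived on the stream.
func (b *OrderedBuffer) Add(ev Event) {
	b.pending = append(b.pending, ev)
}

// Close handles a close notification for [start, end) at closedTS. It
// removes and returns, in ascending timestamp order, all buffered events in
// that range with a timestamp below closedTS. Events at or above closedTS
// stay buffered because earlier-or-equal siblings may still arrive.
func (b *OrderedBuffer) Close(start, end string, closedTS int64) []Event {
	var ready, rest []Event
	for _, ev := range b.pending {
		if start <= ev.Key && ev.Key < end && ev.TS < closedTS {
			ready = append(ready, ev)
		} else {
			rest = append(rest, ev)
		}
	}
	b.pending = rest
	sort.SliceStable(ready, func(i, j int) bool { return ready[i].TS < ready[j].TS })
	return ready
}
```

As long as the close notifications for a range are processed in increasing timestamp order, the concatenation of the returned batches is itself sorted by timestamp for that range.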
Performance concerns exist mostly in two areas: keeping the follower reads active (i.e. proactively closing out timestamps) and the MVCC scan to recover the base timestamp. There are also concerns about the memory and CPU costs associated to having many (smaller) watchers.
We defer discussion of the former into its own RFC. The concern here is that when one is receiving events from, say, a giant database which is mostly inactive, we don't want to force continuous Range activity even if there aren't any writes on almost all of the Ranges. Naively, this is necessary to be able to close out timestamps.
The second concern is the case in which there is a high rate of `ChangeFeed` requests with a base timestamp and large key ranges, so that a lot of work is spent on scanning data from snapshots which is not relevant to the feed. This shouldn't be an issue for small (think single-key) watchers, but a large feed in a busy reconnect loop could inflict serious damage to the system, especially if it uses a low base timestamp (we may want to impose limits here) and thus has to transfer significant amounts of data before switching to streaming mode.
Change Data Capture-type change feeds are likely to be singletons and long-lived. Thus, it's possible that all that's needed here are good diagnostic tools and the option to avoid the snapshot catch-up operation (when missing a few updates is OK).
Note that the existing single-key events immediately make the notification for, say, `DeleteRange(a,z)` very expensive since that would have to report each individual deletion. This is similar to the problem of large Raft proposals due to proposer-evaluated KV for these operations.
Pending the Revert RFC, we may be able to track range deletions more efficiently as well, but a straightforward general approach is to attach to such proposals enough information about the original batch to synthesize a ranged event downstream of Raft.
This design aims at killing all birds with one stone: short-lived watchers, materialized views, change data capture. It's worth discussing whether there are specialized solutions for, say, CDC, that are strictly superior and worth having two separate subsystems, or that may even replace this proposed one.
We will have to figure out what's CCL and what's OSS. Intuitively CDC sounds like it could be an enterprise feature and single-column watches should be OSS. However, there's a lot in between, and the primitive presented here is shared between all of them.