Skip to content

Commit

Permalink
Add Presence functionality
Browse files Browse the repository at this point in the history
This change adds the ability for clients to broadcast information about
"Presence" - the notion of a client's position or state in a particular
document. This might be represent a cursor in a text document, or a
highlighted field in a more complex JSON document, or any other
transient, current information about a client that shouldn't necessarily
be stored in the document's chain of ops.

The main complication that this feature solves is the issue of keeping
presence correctly associated with the version of a `Doc` it was created
at. For example, in a "naive" implementation of presence, presence
information can arrive ahead of or behind ops, which - in a text-based
example - can cause the cursor to "jitter" around the change. Using the
ShareDB implementation will ensure that the presence is correctly
transformed against any ops, and will ensure that presence information
is always consistent with the version of the document. We also locally
transform existing presence, which should help to keep (static) remote
presence correctly positioned, independent of latency.

In order to facilitate this, the feature must be used with an OT type
that supports presence. The only requirement for supporting presence is
the support of a `transformPresence` method:

```javascript
type.transformPresence(presence, op, isOwnOperation): presence;
```

* `presence` _Object_: the presence data being transformed. The type
  will define this shape to be whatever is appropriate for the type.
* `op` _Op_: the operation against which to transform the presence
* `isOwnOperation`: _boolean_: whether the presence and the op have the
  same "owner". This information can be useful for some types to break
  ties when transforming a presence, for example as used in
  [`rich-text`][1]

This work is based on the [work][2] by @gkubisa and @curran, but with
the following aims:

  - avoid modifying the existing `Doc` class as much as possible, and
    instead use lifecycle hooks
  - keep presence separate as its own conceptual entity
  - break the presence subscriptions apart from `Doc` subscriptions
    (although in practice, the two are obviously tightly coupled)
  - allow multiple presences on a single `Doc` on the same `Connection`

[1]: https://github.com/quilljs/delta#tranformposition
[2]: #288
  • Loading branch information
Alec Gibson committed Nov 20, 2019
1 parent 1a483b7 commit 8863d40
Show file tree
Hide file tree
Showing 20 changed files with 2,518 additions and 18 deletions.
85 changes: 85 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Register a new middleware.
the database.
* `'receive'`: Received a message from a client
* `'reply'`: About to send a non-error reply to a client message
* `'sendPresence'`: About to send presence information to a client
* `fn` _(Function(context, callback))_
Call this function at the time specified by `action`.
* `context` will always have the following properties:
Expand Down Expand Up @@ -303,6 +304,20 @@ Get a read-only snapshot of a document at the requested version.
}
```

`connection.getPresence(collection, id, presenceId): Presence;`
Get a [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence.

* `channel` _(String)_
Presence channel to subscribe to

`connection.getPresence(collection, id, presenceId): Presence;`
Get a special [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. This is tied to a `Doc`, and all presence will be automatically transformed against ops to keep presence current. Note that the `Doc` must be of a type that supports presence.

* `collection` _(String)_
Document collection
* `id` _(String)_
Document ID

### Class: `ShareDB.Doc`

`doc.type` _(String_)
Expand Down Expand Up @@ -349,6 +364,9 @@ The document was deleted. Document contents before deletion are passed in as an
`doc.on('error', function(err) {...})`
There was an error fetching the document or applying an operation.

`doc.on('presence', function(id, presence) {...})`
A remote client has sent presence information. `id` is an ID provided by the remote client, and `presence` is the presence data, whose structure will depend on document's OT type.

`doc.removeListener(eventName, listener)`
Removes any listener you added with `doc.on`. `eventName` should be one of `'load'`, `'create'`, `'before op'`, `'op'`, `'del'`, or `'error'`. `listener` should be the function you passed in as the second argument to `on`. Note that both `on` and `removeListener` are inherited from [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter).

Expand Down Expand Up @@ -379,6 +397,12 @@ Invokes the given callback function after

Note that `whenNothingPending` does NOT wait for pending `model.query()` calls.

`doc.subscribeToPresence([function(err) {...}])`
Subscribes to presence updates sent by other clients, emitting `presence` events (see above).

`doc.unsubscribeFromPresence(function(err) {...})`
Unsubscribe from presence updates sent by other clients, and stop receiving `presence` events (see above).

### Class: `ShareDB.Query`

`query.ready` _(Boolean)_
Expand Down Expand Up @@ -629,6 +653,67 @@ var connectionInfo = getUserPermissions();
var connection = backend.connect(null, connectionInfo);
```

### Class: `ShareDB.Presence`

Representation of the presence data associated with a given channel, or - in the case of `getDocPresence` - presence data associated with a `Doc` instance.

#### `subscribe`

```javascript
presence.subscribe(callback);
```

Subscribe to presence updates from other clients. Note that presence can be submitted without subscribing, but remote clients will not be able to re-request presence if not subscribed.

* `callback` _Function_: a callback with the signature `function (error: Error): void;`

#### `unsubscribe`

```javascript
presence.unsubscribe(callback);
```

Unsubscribe from presence updates from remote clients.

* `callback` _Function_: a callback with the signature `function (error: Error): void;`

#### `create`

```javascript
presence.create(presenceId): LocalPresence;
```

Create an instance of [`LocalPresence`](#class-sharedblocalpresence), which can be used to represent local presence. Many or none such local presences may exist on a `Presence` instance.

* `presenceId` _string_: a unique ID representing the local presence

#### `destroy`

```javascript
presence.destroy(callback);
```

Updates all remote clients with a `null` presence. Then unsubscribes and destroys the local instance of the presence by de-registering all the `Doc` hooks it listens to, and removes it from the `Connection` cache, so that it can be garbage-collected. This should be called when you are done with a presence, and no longer need to use it to fire updates.

This method is automatically called when calling `doc.destroy`.

* `callback` _Function_: a callback with the signature `function (error: Error): void;`

### Class: `ShareDB.LocalPresence`

`LocalPresence` represents the presence of the local client in a given `Doc`. For example, this might be the position of a caret in a text document; which field has been highlighted in a complex JSON object; etc. Multiple presences may exist per `Doc` even on the same client.

#### `submit`

```javascript
localPresence.submit(presence, callback);
```

Update the local representation of presence, and broadcast that presence to any other document presence subscribers.

* `presence` _Object_: the presence object to broadcast. The structure of this will depend on the OT type
* `callback` _Function_: a callback with the signature `function (error: Error): void;`

### Logging

By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service.
Expand Down
127 changes: 125 additions & 2 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ function Agent(backend, stream) {
// Map from queryId -> emitter
this.subscribedQueries = {};

// Track which documents are subscribed to presence by the client. This is a
// map of channel -> stream
this.subscribedPresences = {};
// Highest seq received for a subscription request. Any seq lower than this
// value is stale, and should be ignored. Used for keeping the subscription
// state in sync with the client's desired state
this.presenceSubscriptionSeq = 0;

// We need to track this manually to make sure we don't reply to messages
// after the stream was closed.
this.closed = false;
Expand Down Expand Up @@ -77,6 +85,11 @@ Agent.prototype._cleanup = function() {
}
this.subscribedDocs = {};

for (var channel in this.subscribedPresences) {
this.subscribedPresences[channel].destroy();
}
this.subscribedPresences = {};

// Clean up query subscription streams
for (var id in this.subscribedQueries) {
var emitter = this.subscribedQueries[id];
Expand Down Expand Up @@ -120,6 +133,22 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
});
};

Agent.prototype._subscribeToPresenceStream = function(channel, stream) {
if (this.closed) return stream.destroy();

stream.on('data', function(data) {
if (data.error) {
logger.error('Presence subscription stream error', channel, data.error);
}
this._handlePresenceData(data);
}.bind(this));

stream.on('end', function() {
if (this.subscribedPresences[channel] !== stream) return;
delete this.subscribedPresences[channel];
}.bind(this));
};

Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
var previous = this.subscribedQueries[queryId];
if (previous) previous.destroy();
Expand Down Expand Up @@ -310,14 +339,18 @@ Agent.prototype._checkRequest = function(request) {
if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') {
// Query messages need an ID property.
if (typeof request.id !== 'number') return 'Missing query ID';
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u') {
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') {
// Doc-based request.
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
if (request.d != null && typeof request.d !== 'string') return 'Invalid id';

if (request.a === 'op') {
if (request.a === 'op' || request.a === 'p') {
if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version';
}

if (request.a === 'p') {
if (typeof request.id !== 'string') return 'Missing presence ID';
}
} else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') {
// Bulk request
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
Expand Down Expand Up @@ -359,6 +392,19 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case 'nt':
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
case 'p':
var presence = this._createPresence(request);
if (presence.t && !util.supportsPresence(types.map[presence.t])) {
return callback({
code: ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE,
message: 'Type does not support presence: ' + presence.t
});
}
return this._broadcastPresence(presence, callback);
case 'ps':
return this._subscribePresence(request.ch, request.seq, callback);
case 'pu':
return this._unsubscribePresence(request.ch, request.seq, callback);
default:
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
}
Expand Down Expand Up @@ -627,6 +673,83 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};

Agent.prototype._broadcastPresence = function(presence, callback) {
this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) {
if (error) return callback(error);
var channel = this._getPresenceChannel(presence.ch);
this.backend.pubsub.publish([channel], presence, function(error) {
if (error) return callback(error);
callback(null, presence);
});
}.bind(this));
};

Agent.prototype._createPresence = function(request) {
return {
a: 'p',
ch: request.ch,
src: this.clientId,
seq: request.seq,
id: request.id,
p: request.p,
c: request.c,
d: request.d,
v: request.v,
t: request.t
};
};

Agent.prototype._subscribePresence = function(channel, seq, callback) {
var presenceChannel = this._getPresenceChannel(channel);
this.backend.pubsub.subscribe(presenceChannel, function(error, stream) {
if (error) return callback(error);
if (seq < this.presenceSubscriptionSeq) return callback(null, {ch: channel, seq: seq});
this.presenceSubscriptionSeq = seq;
this.subscribedPresences[channel] = stream;
this._subscribeToPresenceStream(channel, stream);
this._requestPresence(channel, function(error) {
callback(error, {ch: channel, seq: seq});
});
}.bind(this));
};

Agent.prototype._unsubscribePresence = function(channel, seq, callback) {
if (seq < this.presenceSubscriptionSeq) return;
this.presenceSubscriptionSeq = seq;
var stream = this.subscribedPresences[channel];
if (stream) stream.destroy();
callback(null, {ch: channel, seq: seq});
};

Agent.prototype._getPresenceChannel = function(channel) {
// TODO: May need to namespace this further if we want to have automatic Doc channels that don't
// clash with arbitrary user input (eg if a user decides to name their channel the same as a doc collection)
// TODO: What if a user creates a collection called _presence?
return '_presence.' + channel;
};

Agent.prototype._requestPresence = function(channel, callback) {
var presenceChannel = this._getPresenceChannel(channel);
this.backend.pubsub.publish([presenceChannel], {ch: channel, r: true, src: this.clientId}, callback);
};

Agent.prototype._handlePresenceData = function(presence) {
if (presence.src === this.clientId) return;

if (presence.r) return this.send({a: 'pr', ch: presence.ch});

var backend = this.backend;
var context = {
collection: presence.c,
presence: presence
};
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
if (error) {
return this.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
}
this.send(presence);
}.bind(this));
};

function createClientOp(request, clientId) {
// src can be provided if it is not the same as the current agent,
Expand Down
18 changes: 18 additions & 0 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ Backend.prototype.MIDDLEWARE_ACTIONS = {
// by design, changing existing reply properties can cause weird bugs, since
// the rest of ShareDB would be unaware of those changes.
reply: 'reply',
// About to send presence information to a client
sendPresence: 'sendPresence',
// An operation is about to be submitted to the database
submit: 'submit'
};
Expand Down Expand Up @@ -812,6 +814,22 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca
callback(error, snapshot);
};

Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) {
if (!presence.c || !presence.d) return callback(null, presence);
this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) {
if (error) return callback(error);
for (var i = 0; i < ops.length; i++) {
var op = ops[i];
var isOwnOp = op.src === presence.src;
var transformError = ot.transformPresence(presence, op, isOwnOp);
if (transformError) {
return callback(transformError);
}
}
callback(null, presence);
});
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
Expand Down
Loading

0 comments on commit 8863d40

Please sign in to comment.