Skip to content

Commit bd4c874

Browse files
author
Alec Gibson
committed
Add Presence functionality
This change adds the ability for clients to broadcast information about "Presence" - the notion of a client's position or state in a particular document. This might be represent a cursor in a text document, or a highlighted field in a more complex JSON document, or any other transient, current information about a client that shouldn't necessarily be stored in the document's chain of ops. The main complication that this feature solves is the issue of keeping presence correctly associated with the version of a `Doc` it was created at. For example, in a "naive" implementation of presence, presence information can arrive ahead of or behind ops, which - in a text-based example - can cause the cursor to "jitter" around the change. Using the ShareDB implementation will ensure that the presence is correctly transformed against any ops, and will ensure that presence information is always consistent with the version of the document. We also locally transform existing presence, which should help to keep (static) remote presence correctly positioned, independent of latency. In order to facilitate this, the feature must be used with an OT type that supports presence. The only requirement for supporting presence is the support of a `transformPresence` method: ```javascript type.transformPresence(presence, op, isOwnOperation): presence; ``` * `presence` _Object_: the presence data being transformed. The type will define this shape to be whatever is appropriate for the type. * `op` _Op_: the operation against which to transform the presence * `isOwnOperation`: _boolean_: whether the presence and the op have the same "owner". This information can be useful for some types to break ties when transforming a presence, for example as used in [`rich-text`][1] This work is based on the [work][2] by @gkubisa and @curran, but with the following aims: - avoid modifying the existing `Doc` class as much as possible, and instead use lifecycle hooks - keep presence separate as its own conceptual entity - break the presence subscriptions apart from `Doc` subscriptions (although in practice, the two are obviously tightly coupled) - allow multiple presences on a single `Doc` on the same `Connection` [1]: https://github.com/quilljs/delta#tranformposition [2]: #288
1 parent bbdfbe1 commit bd4c874

19 files changed

+2507
-18
lines changed

README.md

+85
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ Register a new middleware.
156156
the database.
157157
* `'receive'`: Received a message from a client
158158
* `'reply'`: About to send a non-error reply to a client message
159+
* `'sendPresence'`: About to send presence information to a client
159160
* `fn` _(Function(context, callback))_
160161
Call this function at the time specified by `action`.
161162
* `context` will always have the following properties:
@@ -303,6 +304,20 @@ Get a read-only snapshot of a document at the requested version.
303304
}
304305
```
305306

307+
`connection.getPresence(collection, id, presenceId): Presence;`
308+
Get a [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence.
309+
310+
* `channel` _(String)_
311+
Presence channel to subscribe to
312+
313+
`connection.getPresence(collection, id, presenceId): Presence;`
314+
Get a special [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. This is tied to a `Doc`, and all presence will be automatically transformed against ops to keep presence current. Note that the `Doc` must be of a type that supports presence.
315+
316+
* `collection` _(String)_
317+
Document collection
318+
* `id` _(String)_
319+
Document ID
320+
306321
### Class: `ShareDB.Doc`
307322

308323
`doc.type` _(String_)
@@ -349,6 +364,9 @@ The document was deleted. Document contents before deletion are passed in as an
349364
`doc.on('error', function(err) {...})`
350365
There was an error fetching the document or applying an operation.
351366

367+
`doc.on('presence', function(id, presence) {...})`
368+
A remote client has sent presence information. `id` is an ID provided by the remote client, and `presence` is the presence data, whose structure will depend on document's OT type.
369+
352370
`doc.removeListener(eventName, listener)`
353371
Removes any listener you added with `doc.on`. `eventName` should be one of `'load'`, `'create'`, `'before op'`, `'op'`, `'del'`, or `'error'`. `listener` should be the function you passed in as the second argument to `on`. Note that both `on` and `removeListener` are inherited from [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter).
354372

@@ -379,6 +397,12 @@ Invokes the given callback function after
379397

380398
Note that `whenNothingPending` does NOT wait for pending `model.query()` calls.
381399

400+
`doc.subscribeToPresence([function(err) {...}])`
401+
Subscribes to presence updates sent by other clients, emitting `presence` events (see above).
402+
403+
`doc.unsubscribeFromPresence(function(err) {...})`
404+
Unsubscribe from presence updates sent by other clients, and stop receiving `presence` events (see above).
405+
382406
### Class: `ShareDB.Query`
383407

384408
`query.ready` _(Boolean)_
@@ -629,6 +653,67 @@ var connectionInfo = getUserPermissions();
629653
var connection = backend.connect(null, connectionInfo);
630654
```
631655

656+
### Class: `ShareDB.Presence`
657+
658+
Representation of the presence data associated with a given channel, or - in the case of `getDocPresence` - presence data associated with a `Doc` instance.
659+
660+
#### `subscribe`
661+
662+
```javascript
663+
presence.subscribe(callback);
664+
```
665+
666+
Subscribe to presence updates from other clients. Note that presence can be submitted without subscribing, but remote clients will not be able to re-request presence if not subscribed.
667+
668+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
669+
670+
#### `unsubscribe`
671+
672+
```javascript
673+
presence.unsubscribe(callback);
674+
```
675+
676+
Unsubscribe from presence updates from remote clients.
677+
678+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
679+
680+
#### `create`
681+
682+
```javascript
683+
presence.create(presenceId): LocalPresence;
684+
```
685+
686+
Create an instance of [`LocalPresence`](#class-sharedblocalpresence), which can be used to represent local presence. Many or none such local presences may exist on a `Presence` instance.
687+
688+
* `presenceId` _string_: a unique ID representing the local presence
689+
690+
#### `destroy`
691+
692+
```javascript
693+
presence.destroy(callback);
694+
```
695+
696+
Updates all remote clients with a `null` presence. Then unsubscribes and destroys the local instance of the presence by de-registering all the `Doc` hooks it listens to, and removes it from the `Connection` cache, so that it can be garbage-collected. This should be called when you are done with a presence, and no longer need to use it to fire updates.
697+
698+
This method is automatically called when calling `doc.destroy`.
699+
700+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
701+
702+
### Class: `ShareDB.LocalPresence`
703+
704+
`LocalPresence` represents the presence of the local client in a given `Doc`. For example, this might be the position of a caret in a text document; which field has been highlighted in a complex JSON object; etc. Multiple presences may exist per `Doc` even on the same client.
705+
706+
#### `submit`
707+
708+
```javascript
709+
localPresence.submit(presence, callback);
710+
```
711+
712+
Update the local representation of presence, and broadcast that presence to any other document presence subscribers.
713+
714+
* `presence` _Object_: the presence object to broadcast. The structure of this will depend on the OT type
715+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
716+
632717
### Logging
633718

634719
By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service.

lib/agent.js

+122-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ function Agent(backend, stream) {
2626
// Map from queryId -> emitter
2727
this.subscribedQueries = {};
2828

29+
// Track which documents are subscribed to presence by the client. This is a
30+
// map of channel -> stream
31+
this.subscribedPresences = {};
32+
// Highest seq received for a subscription request. Any seq lower than this
33+
// value is stale, and should be ignored. Used for keeping the subscription
34+
// state in sync with the client's desired state
35+
this.presenceSubscriptionSeq = 0;
36+
2937
// We need to track this manually to make sure we don't reply to messages
3038
// after the stream was closed.
3139
this.closed = false;
@@ -74,6 +82,11 @@ Agent.prototype._cleanup = function() {
7482
}
7583
this.subscribedDocs = {};
7684

85+
for (var channel in this.subscribedPresences) {
86+
this.subscribedPresences[channel].destroy();
87+
}
88+
this.subscribedPresences = {};
89+
7790
// Clean up query subscription streams
7891
for (var id in this.subscribedQueries) {
7992
var emitter = this.subscribedQueries[id];
@@ -117,6 +130,22 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
117130
});
118131
};
119132

133+
Agent.prototype._subscribeToPresenceStream = function(channel, stream) {
134+
if (this.closed) return stream.destroy();
135+
136+
stream.on('data', function(data) {
137+
if (data.error) {
138+
logger.error('Presence subscription stream error', channel, data.error);
139+
}
140+
this._handlePresenceData(data);
141+
}.bind(this));
142+
143+
stream.on('end', function() {
144+
if (this.subscribedPresences[channel] !== stream) return;
145+
delete this.subscribedPresences[channel];
146+
}.bind(this));
147+
};
148+
120149
Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
121150
var previous = this.subscribedQueries[queryId];
122151
if (previous) previous.destroy();
@@ -307,14 +336,18 @@ Agent.prototype._checkRequest = function(request) {
307336
if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') {
308337
// Query messages need an ID property.
309338
if (typeof request.id !== 'number') return 'Missing query ID';
310-
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u') {
339+
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') {
311340
// Doc-based request.
312341
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
313342
if (request.d != null && typeof request.d !== 'string') return 'Invalid id';
314343

315-
if (request.a === 'op') {
344+
if (request.a === 'op' || request.a === 'p') {
316345
if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version';
317346
}
347+
348+
if (request.a === 'p') {
349+
if (typeof request.id !== 'string') return 'Missing presence ID';
350+
}
318351
} else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') {
319352
// Bulk request
320353
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
@@ -356,6 +389,16 @@ Agent.prototype._handleMessage = function(request, callback) {
356389
return this._fetchSnapshot(request.c, request.d, request.v, callback);
357390
case 'nt':
358391
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
392+
case 'p':
393+
var presence = this._createPresence(request);
394+
if (presence.t && !util.supportsPresence(types.map[presence.t])) {
395+
return callback({code: 9999, message: 'Type does not support presence: ' + presence.t});
396+
}
397+
return this._broadcastPresence(presence, callback);
398+
case 'ps':
399+
return this._subscribePresence(request.ch, request.seq, callback);
400+
case 'pu':
401+
return this._unsubscribePresence(request.ch, request.seq, callback);
359402
default:
360403
callback({code: 4000, message: 'Invalid or unknown message'});
361404
}
@@ -624,6 +667,83 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
624667
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
625668
};
626669

670+
Agent.prototype._broadcastPresence = function(presence, callback) {
671+
this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) {
672+
if (error) return callback(error);
673+
var channel = this._getPresenceChannel(presence.ch);
674+
this.backend.pubsub.publish([channel], presence, function(error) {
675+
if (error) return callback(error);
676+
callback(null, presence);
677+
});
678+
}.bind(this));
679+
};
680+
681+
Agent.prototype._createPresence = function(request) {
682+
return {
683+
a: 'p',
684+
ch: request.ch,
685+
src: this.clientId,
686+
seq: request.seq,
687+
id: request.id,
688+
p: request.p,
689+
c: request.c,
690+
d: request.d,
691+
v: request.v,
692+
t: request.t
693+
};
694+
};
695+
696+
Agent.prototype._subscribePresence = function(channel, seq, callback) {
697+
var presenceChannel = this._getPresenceChannel(channel);
698+
this.backend.pubsub.subscribe(presenceChannel, function(error, stream) {
699+
if (error) return callback(error);
700+
if (seq < this.presenceSubscriptionSeq) return callback(null, {ch: channel, seq: seq});
701+
this.presenceSubscriptionSeq = seq;
702+
this.subscribedPresences[channel] = stream;
703+
this._subscribeToPresenceStream(channel, stream);
704+
this._requestPresence(channel, function(error) {
705+
callback(error, {ch: channel, seq: seq});
706+
});
707+
}.bind(this));
708+
};
709+
710+
Agent.prototype._unsubscribePresence = function(channel, seq, callback) {
711+
if (seq < this.presenceSubscriptionSeq) return;
712+
this.presenceSubscriptionSeq = seq;
713+
var stream = this.subscribedPresences[channel];
714+
if (stream) stream.destroy();
715+
callback(null, {ch: channel, seq: seq});
716+
};
717+
718+
Agent.prototype._getPresenceChannel = function(channel) {
719+
// TODO: May need to namespace this further if we want to have automatic Doc channels that don't
720+
// clash with arbitrary user input (eg if a user decides to name their channel the same as a doc collection)
721+
// TODO: What if a user creates a collection called _presence?
722+
return '_presence.' + channel;
723+
};
724+
725+
Agent.prototype._requestPresence = function(channel, callback) {
726+
var presenceChannel = this._getPresenceChannel(channel);
727+
this.backend.pubsub.publish([presenceChannel], {ch: channel, r: true, src: this.clientId}, callback);
728+
};
729+
730+
Agent.prototype._handlePresenceData = function(presence) {
731+
if (presence.src === this.clientId) return;
732+
733+
if (presence.r) return this.send({a: 'pr', ch: presence.ch});
734+
735+
var backend = this.backend;
736+
var context = {
737+
collection: presence.c,
738+
presence: presence
739+
};
740+
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
741+
if (error) {
742+
return this.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
743+
}
744+
this.send(presence);
745+
}.bind(this));
746+
};
627747

628748
function createClientOp(request, clientId) {
629749
// src can be provided if it is not the same as the current agent,

lib/backend.js

+18
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ Backend.prototype.MIDDLEWARE_ACTIONS = {
8787
// by design, changing existing reply properties can cause weird bugs, since
8888
// the rest of ShareDB would be unaware of those changes.
8989
reply: 'reply',
90+
// About to send presence information to a client
91+
sendPresence: 'sendPresence',
9092
// An operation is about to be submitted to the database
9193
submit: 'submit'
9294
};
@@ -800,6 +802,22 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca
800802
callback(error, snapshot);
801803
};
802804

805+
Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) {
806+
if (!presence.c || !presence.d) return callback(null, presence);
807+
this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) {
808+
if (error) return callback(error);
809+
for (var i = 0; i < ops.length; i++) {
810+
var op = ops[i];
811+
var isOwnOp = op.src === presence.src;
812+
var transformError = ot.transformPresence(presence, op, isOwnOp);
813+
if (transformError) {
814+
return callback(transformError);
815+
}
816+
}
817+
callback(null, presence);
818+
});
819+
};
820+
803821
function pluckIds(snapshots) {
804822
var ids = [];
805823
for (var i = 0; i < snapshots.length; i++) {

0 commit comments

Comments
 (0)