Skip to content

Commit

Permalink
Aggregate join responses and update membership once after join completes
Browse files Browse the repository at this point in the history
  • Loading branch information
jwolski committed Jul 1, 2015
1 parent 8335c2e commit a852383
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
53 changes: 53 additions & 0 deletions lib/swim/join-response-merge.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

function mergeJoinResponses(joinResponses) {
if (!Array.isArray(joinResponses) || joinResponses.length === 0) {
return [];
}

var mergeIndex = {};

for (var i = 0; i < joinResponses.length; i++) {
var members = joinResponses[i].members;

for (var j = 0; j < members.length; j++) {
var member = members[j];

var indexVal = mergeIndex[member.address];

if (!indexVal || indexVal.incarnationNumber < member.incarnationNumber) {
mergeIndex[member.address] = member;
}
}
}

var indexKeys = Object.keys(mergeIndex);
var updates = new Array(indexKeys.length);

for (i = 0; i < indexKeys.length; i++) {
updates[i] = mergeIndex[indexKeys[i]];
}

return updates;
}

module.exports = mergeJoinResponses;
24 changes: 22 additions & 2 deletions lib/swim/join-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
var captureHost = require('../util.js').captureHost;
var errors = require('../errors.js');
var isEmptyArray = require('../util.js').isEmptyArray;
var mergeJoinResponses = require('./join-response-merge.js');
var numOrDefault = require('../util.js').numOrDefault;
var safeParse = require('../util.js').safeParse;
var TypedError = require('error/typed');
Expand Down Expand Up @@ -144,6 +145,10 @@ function JoinCluster(opts) {
// exceeded.
this.roundPotentialNodes = null;
this.roundPreferredNodes = null;

// Changes received by other nodes will be aggregated and
// applied once the join process is complete.
this.joinResponses = [];
}

// Potential nodes are those that are not this instance of ringpop.
Expand Down Expand Up @@ -249,6 +254,14 @@ JoinCluster.prototype.join = function join(callback) {
if (numJoined >= self.joinSize) {
var joinTime = Date.now() - startTime;

var updates = mergeJoinResponses(self.joinResponses);

// Update membership only once, when join is complete and successful.
self.ringpop.membership.update(updates);

// No need to keep this data hanging around.
self.joinResponses = null;

self.ringpop.stat('timing', 'join', joinTime);
self.ringpop.stat('increment', 'join.complete');
self.ringpop.logger.debug('ringpop join complete', {
Expand Down Expand Up @@ -416,8 +429,15 @@ JoinCluster.prototype.joinNode = function joinNode(node, callback) {

var bodyObj = safeParse(arg3.toString());

if (bodyObj) {
self.ringpop.membership.update(bodyObj.membership);
// Verify that `joinResponses` is not null. It is set
// to null upon completion of the join process. There may,
// however, be in-flight /protocol/join requests that have
// yet to complete.
if (bodyObj && self.joinResponses !== null) {
self.joinResponses.push({
checksum: bodyObj.membershipChecksum,
members: bodyObj.membership
});
}

callback(null, node);
Expand Down

0 comments on commit a852383

Please sign in to comment.