Skip to content

Commit

Permalink
Significant refactor of membership change paths in prep for smaller
Browse files Browse the repository at this point in the history
damping pull request
  • Loading branch information
jwolski committed Mar 31, 2015
1 parent 0c5672f commit 02d3311
Show file tree
Hide file tree
Showing 20 changed files with 791 additions and 780 deletions.
100 changes: 34 additions & 66 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ var sendPingReq = require('./lib/swim/ping-req-sender.js');
var Suspicion = require('./lib/swim/suspicion');

var createRingPopTChannel = require('./lib/tchannel.js').createRingPopTChannel;
var Dissemination = require('./lib/members').Dissemination;
var Dissemination = require('./lib/dissemination.js');
var errors = require('./lib/errors.js');
var HashRing = require('./lib/ring');
var joinCluster = require('./lib/join_cluster.js').joinCluster;
var MemberIterator = require('./lib/members').MemberIterator;
var Membership = require('./lib/members').Membership;
var MembershipUpdateRollup = require('./lib/membership_update_rollup.js');
var Membership = require('./lib/membership.js');
var MembershipIterator = require('./lib/membership-iterator.js');
var MembershipUpdateRollup = require('./lib/membership-update-rollup.js');
var nulls = require('./lib/nulls');
var rawHead = require('./lib/request-proxy/util.js').rawHead;
var RequestProxy = require('./lib/request-proxy/index.js');
Expand Down Expand Up @@ -128,7 +128,7 @@ function RingPop(options) {
this.dissemination = new Dissemination(this);
this.membership = new Membership(this);
this.membership.on('updated', this.onMembershipUpdated.bind(this));
this.memberIterator = new MemberIterator(this);
this.memberIterator = new MembershipIterator(this);
this.gossip = new Gossip({
ringpop: this,
minProtocolPeriod: options.minProtocolPeriod
Expand Down Expand Up @@ -188,14 +188,6 @@ RingPop.prototype.setupChannel = function setupChannel() {
createRingPopTChannel(this, this.channel);
};

RingPop.prototype.addLocalMember = function addLocalMember(info) {
this.membership.addMember({
address: this.hostPort,
incarnationNumber: info && info.incarnationNumber,
status: 'alive'
});
};

RingPop.prototype.adminJoin = function adminJoin(callback) {
if (!this.membership.localMember) {
process.nextTick(function() {
Expand Down Expand Up @@ -239,7 +231,9 @@ RingPop.prototype.adminLeave = function adminLeave(callback) {
}

// TODO Explicitly infect other members (like admin join)?
this.membership.makeLeave();
this.membership.makeLeave(this.whoami(),
this.membership.localMember.incarnationNumber);

this.gossip.stop();
this.suspicion.stopAll();

Expand Down Expand Up @@ -282,7 +276,8 @@ RingPop.prototype.bootstrap = function bootstrap(opts, callback) {
this.checkForMissingBootstrapHost();
this.checkForHostnameIpMismatch();

this.addLocalMember();
// Add local member to membership.
this.membership.makeAlive(this.whoami(), Date.now());

this.adminJoin(function(err, nodesJoined) {
if (err) {
Expand Down Expand Up @@ -441,12 +436,12 @@ RingPop.prototype.protocolJoin = function protocolJoin(options, callback) {
this.serverRate.mark();
this.totalRate.mark();

this.membership.makeJoin(joinerAddress, options.incarnationNumber);
this.membership.makeAlive(joinerAddress, options.incarnationNumber);

callback(null, {
app: this.app,
coordinator: this.whoami(),
membership: this.membership.getState()
membership: this.dissemination.fullSync()
});
};

Expand All @@ -467,7 +462,7 @@ RingPop.prototype.protocolPing = function protocolPing(options, callback) {
this.membership.update(changes);

callback(null, {
changes: this.issueMembershipChanges(checksum, source)
changes: this.dissemination.issueChanges(checksum, source)
});
};

Expand Down Expand Up @@ -495,79 +490,51 @@ RingPop.prototype.whoami = function whoami() {
return this.hostPort;
};

RingPop.prototype.issueMembershipChanges = function issueMembershipChanges(checksum, source) {
return this.dissemination.getChanges(checksum, source);
};

RingPop.prototype.onMemberAlive = function onMemberAlive(member) {
RingPop.prototype.onMemberAlive = function onMemberAlive(change) {
this.stat('increment', 'membership-update.alive');
this.logger.debug('member is alive', {
local: this.membership.localMember.address,
alive: member.address
});

this.dissemination.addChange({
address: member.address,
status: member.status,
incarnationNumber: member.incarnationNumber,
piggybackCount: 0
alive: change.address
});

this.ring.addServer(member.address);
this.suspicion.stop(member);
this.dissemination.recordChange(change);
this.ring.addServer(change.address);
this.suspicion.stop(change);
};

RingPop.prototype.onMemberFaulty = function onMemberFaulty(member) {
RingPop.prototype.onMemberFaulty = function onMemberFaulty(change) {
this.stat('increment', 'membership-update.faulty');
this.logger.debug('member is faulty', {
local: this.membership.localMember.address,
faulty: member.address
});

this.dissemination.addChange({
address: member.address,
status: member.status,
incarnationNumber: member.incarnationNumber,
piggybackCount: 0
faulty: change.address,
});

this.ring.removeServer(member.address);
this.suspicion.stop(member);
this.dissemination.recordChange(change);
this.ring.removeServer(change.address);
this.suspicion.stop(change);
};

RingPop.prototype.onMemberLeave = function onMemberLeave(member) {
RingPop.prototype.onMemberLeave = function onMemberLeave(change) {
this.stat('increment', 'membership-update.leave');
this.logger.debug('member has left', {
local: this.membership.localMember.address,
left: member.address
left: change.address
});

this.dissemination.addChange({
address: member.address,
status: member.status,
incarnationNumber: member.incarnationNumber,
piggybackCount: 0
});

this.ring.removeServer(member.address);
this.suspicion.stop(member);
this.dissemination.recordChange(change);
this.ring.removeServer(change.address);
this.suspicion.stop(change);
};

RingPop.prototype.onMemberSuspect = function onMemberSuspect(member) {
RingPop.prototype.onMemberSuspect = function onMemberSuspect(change) {
this.stat('increment', 'membership-update.suspect');
this.logger.debug('member is suspect', {
local: this.membership.localMember.address,
suspect: member.address
suspect: change.address
});

this.suspicion.start(member);

this.dissemination.addChange({
address: member.address,
status: member.status,
incarnationNumber: member.incarnationNumber,
piggybackCount: 0
});
this.suspicion.start(change);
this.dissemination.recordChange(change);
};

RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) {
Expand Down Expand Up @@ -683,7 +650,8 @@ RingPop.prototype.readHostsFile = function readHostsFile(file) {
};

RingPop.prototype.rejoin = function rejoin(callback) {
this.membership.affirmAliveness();
// Assert local member is alive.
this.membership.makeAlive(this.whoami(), Date.now());
this.gossip.start();
this.suspicion.reenable();

Expand Down
131 changes: 131 additions & 0 deletions lib/dissemination.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

'use strict';

var MembershipUpdateRules = require('./membership-update-rules.js');

var LOG_10 = Math.log(10);

function Dissemination(ringpop) {
this.ringpop = ringpop;
this.ringpop.on('changed', this.onRingChanged.bind(this));

this.changes = {};
this.maxPiggybackCount = 1;
this.piggybackFactor = 15; // A lower piggyback factor leads to more full-syncs
}

Dissemination.prototype.adjustMaxPiggybackCount = function adjustMaxPiggybackCount() {
var serverCount = this.ringpop.ring.getServerCount();
var prevPiggybackCount = this.maxPiggybackCount;
var newPiggybackCount = this.piggybackFactor * Math.ceil(Math.log(serverCount + 1) / LOG_10);

if (this.maxPiggybackCount !== newPiggybackCount) {
this.maxPiggybackCount = newPiggybackCount;
this.ringpop.stat('gauge', 'max-piggyback', this.maxPiggybackCount);
this.ringpop.logger.debug('adjusted max piggyback count', {
newPiggybackCount: this.maxPiggybackCount,
oldPiggybackCount: prevPiggybackCount,
piggybackFactor: this.piggybackFactor,
serverCount: serverCount
});
}
};

Dissemination.prototype.fullSync = function fullSync() {
var changes = [];

for (var i = 0; i < this.ringpop.membership.members; i++) {
var member = this.ringpop.membership.members[i];

changes.push({
source: this.ringpop.whoami(),
address: member.address,
status: member.status,
incarnationNumber: member.incarnationNumber
});
}

return changes;
};

Dissemination.prototype.issueChanges = function issueChanges(checksum, source) {
var changesToDisseminate = [];

var changedNodes = Object.keys(this.changes);

for (var i = 0; i < changedNodes.length; i++) {
var address = changedNodes[i];
var change = this.changes[address];

// TODO We're bumping the piggyback count even though
// we don't know whether the change successfully made
// it over to the other side. This can result in undesired
// full-syncs.
if (typeof change.piggybackCount === 'undefined') {
change.piggybackCount = 0;
}

change.piggybackCount += 1;

if (change.piggybackCount > this.maxPiggybackCount) {
delete this.changes[address];
continue;
}

// TODO Compute change ID
// TODO Include change timestamp
changesToDisseminate.push({
source: change.source,
address: change.address,
status: change.status,
incarnationNumber: change.incarnationNumber
});
}

this.ringpop.stat('gauge', 'changes.disseminate', changesToDisseminate.length);

if (changesToDisseminate.length) {
return changesToDisseminate;
} else if (checksum && this.ringpop.membership.checksum !== checksum) {
this.ringpop.stat('increment', 'full-sync');
this.ringpop.logger.info('full sync', {
localChecksum: this.ringpop.membership.checksum,
remoteChecksum: checksum,
remoteNode: source
});

// TODO Somehow send back indication of isFullSync
return this.fullSync();
} else {
return [];
}
};

Dissemination.prototype.onRingChanged = function onRingChanged() {
this.adjustMaxPiggybackCount();
};

Dissemination.prototype.recordChange = function recordChange(change) {
this.changes[change.address] = change;
};

module.exports = Dissemination;
8 changes: 4 additions & 4 deletions lib/member.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ function Member() {
}

Member.Status = {
Alive: 'alive',
Faulty: 'faulty',
Leave: 'leave',
Suspect: 'suspect'
alive: 'alive',
faulty: 'faulty',
leave: 'leave',
suspect: 'suspect'
};

module.exports = Member;
Loading

0 comments on commit 02d3311

Please sign in to comment.