Skip to content

Commit

Permalink
Avoid disseminating redundant changes by filtering out change that or…
Browse files Browse the repository at this point in the history
…iginated from source
  • Loading branch information
jwolski committed Jun 23, 2015
1 parent 4a9373b commit 05722fd
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 71 deletions.
117 changes: 78 additions & 39 deletions lib/dissemination.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ Dissemination.prototype.adjustMaxPiggybackCount = function adjustMaxPiggybackCou
}
};

Dissemination.prototype.clearChanges = function clearChanges() {
this.changes = [];
};

Dissemination.prototype.fullSync = function fullSync() {
var changes = [];

Expand All @@ -71,14 +75,74 @@ Dissemination.prototype.fullSync = function fullSync() {
return changes;
};

Dissemination.prototype.issueChanges = function issueChanges(checksum, source) {
Dissemination.prototype.issueAsSender = function issueAsSender() {
return issueAs(this, null, mapChanges);

function mapChanges(changes) {
return changes;
}
};

Dissemination.prototype.issueAsReceiver = function issueAsReceiver(senderAddr, senderIncarnationNumber, senderChecksum) {
var self = this;

return issueAs(this, filterChange, mapChanges);

function filterChange(change) {
return !!(senderAddr &&
senderIncarnationNumber &&
change.source &&
change.sourceIncarnationNumber &&
senderAddr === change.source &&
senderIncarnationNumber === change.sourceIncarnationNumber);
}

function mapChanges(changes) {
// If no changes left to disseminate and checksums do not match, perform a full-sync.
if (changes.length > 0) {
return changes;
} else if (self.ringpop.membership.checksum !== senderChecksum) {
self.ringpop.stat('increment', 'full-sync');
self.ringpop.logger.info('full sync', {
local: self.ringpop.whoami(),
localChecksum: self.ringpop.membership.checksum,
dest: senderAddr,
destChecksum: senderChecksum
});

// TODO Somehow send back indication of isFullSync
return self.fullSync();
} else {
return [];
}
}
};

Dissemination.prototype.onRingChanged = function onRingChanged() {
this.adjustMaxPiggybackCount();
};

Dissemination.prototype.recordChange = function recordChange(change) {
this.changes[change.address] = change;
};

Dissemination.prototype.resetMaxPiggybackCount = function resetMaxPiggybackCount() {
this.maxPiggybackCount = Dissemination.Defaults.maxPiggybackCount;
};

Dissemination.Defaults = {
maxPiggybackCount: 1,
piggybackFactor: 15 // A lower piggyback factor leads to more full-syncs
};

function issueAs(dissemination, filterChange, mapChanges) {
var changesToDisseminate = [];

var changedNodes = Object.keys(this.changes);
var changedNodes = Object.keys(dissemination.changes);

for (var i = 0; i < changedNodes.length; i++) {
var address = changedNodes[i];
var change = this.changes[address];
var change = dissemination.changes[address];

// TODO We're bumping the piggyback count even though
// we don't know whether the change successfully made
Expand All @@ -88,57 +152,32 @@ Dissemination.prototype.issueChanges = function issueChanges(checksum, source) {
change.piggybackCount = 0;
}

if (typeof filterChange === 'function' && filterChange(change)) {
dissemination.ringpop.stat('increment', 'filtered-change');
continue;
}

change.piggybackCount += 1;

if (change.piggybackCount > this.maxPiggybackCount) {
delete this.changes[address];
if (change.piggybackCount > dissemination.maxPiggybackCount) {
delete dissemination.changes[address];
continue;
}

// TODO Compute change ID
// TODO Include change timestamp
changesToDisseminate.push({
source: change.source,
sourceIncarnationNumber: change.sourceIncarnationNumber,
address: change.address,
status: change.status,
incarnationNumber: change.incarnationNumber
});
}

this.ringpop.stat('gauge', 'changes.disseminate', changesToDisseminate.length);

if (changesToDisseminate.length) {
return changesToDisseminate;
} else if (checksum && this.ringpop.membership.checksum !== checksum) {
this.ringpop.stat('increment', 'full-sync');
this.ringpop.logger.info('full sync', {
localChecksum: this.ringpop.membership.checksum,
remoteChecksum: checksum,
remoteNode: source
});

// TODO Somehow send back indication of isFullSync
return this.fullSync();
} else {
return [];
}
};

Dissemination.prototype.onRingChanged = function onRingChanged() {
this.adjustMaxPiggybackCount();
};

Dissemination.prototype.recordChange = function recordChange(change) {
this.changes[change.address] = change;
};

Dissemination.prototype.resetMaxPiggybackCount = function resetMaxPiggybackCount() {
this.maxPiggybackCount = Dissemination.Defaults.maxPiggybackCount;
};
dissemination.ringpop.stat('gauge', 'changes.disseminate', changesToDisseminate.length);

Dissemination.Defaults = {
maxPiggybackCount: 1,
piggybackFactor: 15 // A lower piggyback factor leads to more full-syncs
};
return mapChanges(changesToDisseminate);
}

module.exports = Dissemination;
2 changes: 1 addition & 1 deletion lib/membership-update-rollup.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ MembershipUpdateRollup.prototype.flushBuffer = function flushBuffer() {
var now = Date.now();

var numUpdates = this.getNumUpdates();
this.ringpop.logger.info('ringpop flushed membership update buffer', {
this.ringpop.logger.debug('ringpop flushed membership update buffer', {
local: this.ringpop.whoami(),
checksum: this.ringpop.membership.checksum,
sinceFirstUpdate: this.lastUpdateTime && (this.lastUpdateTime - this.firstUpdateTime),
Expand Down
32 changes: 24 additions & 8 deletions lib/membership.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ Membership.prototype.generateChecksumString = function generateChecksumString()
return checksumStrings.sort().join(';');
};

Membership.prototype.getIncarnationNumber = function getIncarnationNumber() {
return this.localMember && this.localMember.incarnationNumber;
};

Membership.prototype.getJoinPosition = function getJoinPosition() {
return Math.floor(Math.random() * (this.members.length - 0)) + 0;
};
Expand Down Expand Up @@ -117,39 +121,51 @@ Membership.prototype.isPingable = function isPingable(member) {
member.status === 'suspect');
};

Membership.prototype.makeAlive = function makeAlive(address, incarnationNumber, source) {
Membership.prototype.makeAlive = function makeAlive(address, incarnationNumber) {
var localMember = this.localMember || {};

return this.update({
source: source || this.ringpop.whoami(),
source: localMember.address,
sourceIncarnationNumber: localMember.incarnationNumber,
address: address,
status: Member.Status.alive,
incarnationNumber: incarnationNumber,
timestamp: Date.now()
});
};

Membership.prototype.makeFaulty = function makeFaulty(address, incarnationNumber, source) {
Membership.prototype.makeFaulty = function makeFaulty(address, incarnationNumber) {
var localMember = this.localMember || {};

return this.update({
source: source || this.ringpop.whoami(),
source: localMember.address,
sourceIncarnationNumber: localMember.incarnationNumber,
address: address,
status: Member.Status.faulty,
incarnationNumber: incarnationNumber,
timestamp: Date.now()
});
};

Membership.prototype.makeLeave = function makeLeave(address, incarnationNumber, source) {
Membership.prototype.makeLeave = function makeLeave(address, incarnationNumber) {
var localMember = this.localMember || {};

return this.update({
source: source || this.ringpop.whoami(),
source: localMember.address,
sourceIncarnationNumber: localMember.incarnationNumber,
address: address,
status: Member.Status.leave,
incarnationNumber: incarnationNumber,
timestamp: Date.now()
});
};

Membership.prototype.makeSuspect = function makeSuspect(address, incarnationNumber, source) {
Membership.prototype.makeSuspect = function makeSuspect(address, incarnationNumber) {
var localMember = this.localMember || {};

return this.update({
source: source || this.ringpop.whoami(),
source: localMember.address,
sourceIncarnationNumber: localMember.incarnationNumber,
address: address,
status: Member.Status.suspect,
incarnationNumber: incarnationNumber,
Expand Down
4 changes: 3 additions & 1 deletion lib/swim/ping-recvr.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
module.exports = function recvPing(opts, callback) {
var ringpop = opts.ringpop;
var source = opts.source;
var sourceIncarnationNumber = opts.sourceIncarnationNumber;
var changes = opts.changes;
var checksum = opts.checksum;

Expand All @@ -33,6 +34,7 @@ module.exports = function recvPing(opts, callback) {
ringpop.membership.update(changes);

callback(null, {
changes: ringpop.dissemination.issueChanges(checksum, source)
changes: ringpop.dissemination.issueAsReceiver(source,
sourceIncarnationNumber, checksum),
});
};
4 changes: 3 additions & 1 deletion lib/swim/ping-req-recvr.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module.exports = function recvPingReq(opts, callback) {
ringpop.stat('increment', 'ping-req.recv');

var source = opts.source;
var sourceIncarnationNumber = opts.sourceIncarnationNumber;
var target = opts.target;
var changes = opts.changes;
var checksum = opts.checksum;
Expand All @@ -50,7 +51,8 @@ module.exports = function recvPingReq(opts, callback) {
}

callback(null, {
changes: ringpop.dissemination.issueChanges(checksum, source),
changes: ringpop.dissemination.issueAsReceiver(source,
sourceIncarnationNumber, checksum),
pingStatus: isOk,
target: target
});
Expand Down
32 changes: 15 additions & 17 deletions lib/swim/ping-req-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var safeParse = require('../util').safeParse;
var TypedError = require('error/typed');

Expand Down Expand Up @@ -53,22 +54,6 @@ var PingReqPingError = TypedError({
errMessage: null
});

function body(sender) {
return JSON.stringify({
checksum: sender.ring.membership.checksum,
changes: sender.ring.dissemination.issueChanges(),
source: sender.ring.whoami(),
target: sender.target.address
});
}

function sendOptions(sender) {
return {
host: sender.member.address,
timeout: sender.ring.pingReqTimeout
};
}

function PingReqSender(ring, member, target, callback) {
this.ring = ring;
this.member = member;
Expand All @@ -79,7 +64,20 @@ function PingReqSender(ring, member, target, callback) {
PingReqSender.prototype.send = function send() {
var self = this;

this.ring.channel.send(sendOptions(this), '/protocol/ping-req', null, body(this), onSend);
var channelOpts = {
host: this.member.address,
timeout: this.ring.pingReqTimeout
};

var body = JSON.stringify({
checksum: this.ring.membership.checksum,
changes: this.ring.dissemination.issueAsSender(),
source: this.ring.whoami(),
sourceIncarnationNumber: this.ring.membership.getIncarnationNumber(),
target: this.target.address
});

this.ring.channel.send(channelOpts, '/protocol/ping-req', null, body, onSend);

function onSend(err, res1, res2) {
self.onPingReq(err, res1, res2);
Expand Down
9 changes: 6 additions & 3 deletions lib/swim/ping-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var safeParse = require('../util').safeParse;

function PingSender(ring, member, callback) {
Expand Down Expand Up @@ -58,17 +59,19 @@ PingSender.prototype.send = function send() {
host: this.address,
timeout: this.ring.pingTimeout
};
var changes = this.ring.dissemination.issueChanges();
var changes = this.ring.dissemination.issueAsSender();

var body = JSON.stringify({
checksum: this.ring.membership.checksum,
changes: changes,
source: this.ring.whoami()
source: this.ring.whoami(),
sourceIncarnationNumber: this.ring.membership.getIncarnationNumber()
});

this.ring.debugLog('ping send member=' + this.address + ' changes=' + JSON.stringify(changes), 'p');

var self = this;
this.ring.channel.send(options, '/protocol/ping', null, body, function(err, res1, res2) {
this.ring.channel.send(options, '/protocol/ping', null, body, function onSend(err, res1, res2) {
self.onPing(err, res1, res2);
});
};
Expand Down
8 changes: 8 additions & 0 deletions lib/tchannel.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ RingPopTChannel.prototype.protocolJoin = function (arg1, arg2, hostInfo, cb) {

RingPopTChannel.prototype.protocolPing = function (arg1, arg2, hostInfo, cb) {
var body = safeParse(arg2);

// NOTE sourceIncarnationNumber is an optional argument. It was not present
// until after the v9.8.12 release.
if (body === null || !body.source || !body.changes || !body.checksum) {
return cb(new Error('need req body with source, changes, and checksum'));
}

recvPing({
ringpop: this.ringpop,
source: body.source,
sourceIncarnationNumber: body.sourceIncarnationNumber,
changes: body.changes,
checksum: body.checksum
}, function(err, res) {
Expand All @@ -179,13 +183,17 @@ RingPopTChannel.prototype.protocolPing = function (arg1, arg2, hostInfo, cb) {

RingPopTChannel.prototype.protocolPingReq = function protocolPingReq(arg1, arg2, hostInfo, cb) {
var body = safeParse(arg2);

// NOTE sourceIncarnationNumber is an optional argument. It was not present
// until after the v9.8.12 release.
if (body === null || !body.source || !body.target || !body.changes || !body.checksum) {
return cb(new Error('need req body with source, target, changes, and checksum'));
}

recvPingReq({
ringpop: this.ringpop,
source: body.source,
sourceIncarnationNumber: body.sourceIncarnationNumber,
target: body.target,
changes: body.changes,
checksum: body.checksum
Expand Down
Loading

0 comments on commit 05722fd

Please sign in to comment.