Skip to content

Commit

Permalink
Merge pull request #233 from uber/ws-defensive-piggyback-count-bumping
Browse files Browse the repository at this point in the history
only raise piggyback count if ping is succesfull
  • Loading branch information
jwolski2 committed Dec 16, 2015
2 parents 118fe07 + 72092d0 commit b8b6e40
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 53 deletions.
4 changes: 0 additions & 4 deletions lib/gossip/damper.js
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,6 @@ Damper.prototype._fanoutDampReqs = function _fanoutDampReqs(flapperAddrs, dampRe
if (err) {
errors.push(err);
} else {
if (Array.isArray(res.changes)) {
self.ringpop.membership.update(res.changes);
}

// Enrich the result with the addr of the damp
// req member for reporting purposes.
res.dampReqAddr = addr;
Expand Down
44 changes: 29 additions & 15 deletions lib/gossip/dissemination.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,33 @@ Dissemination.prototype.isEmpty = function isEmpty() {
return Object.keys(this.changes).length === 0;
};

Dissemination.prototype.issueAsSender = function issueAsSender() {
return this._issueAs(null, mapChanges);

function mapChanges(changes) {
Dissemination.prototype.issueAsSender = function issueAsSender(issue) {
var self = this;
var membershipChanges = this._issueAs(null, function map(changes) {
return changes;
}
});

issue(membershipChanges, function onIssue(err) {
// Bump the piggyback count only when we get confirmation that
// dissemination was successful.
if (err) {
self.ringpop.stat('increment', 'dissemination.bump-bypass');
self.logger.info('ringpop dissemination not bumping piggyback count', {
local: self.ringpop.whoami(),
err: err,
numMembershipChanges: membershipChanges.length
});
return;
}

for (var i = 0; i < membershipChanges.length; i++) {
var issuedChange = membershipChanges[i];
var localChange = self.changes[issuedChange.address];
if (localChange && localChange.id === issuedChange.id) {
localChange.piggybackCount++;
}
}
});
};

Dissemination.prototype.issueAsReceiver = function issueAsReceiver(senderAddr, senderIncarnationNumber, senderChecksum) {
Expand Down Expand Up @@ -129,6 +150,9 @@ Dissemination.prototype.onRingChanged = function onRingChanged() {
};

Dissemination.prototype.recordChange = function recordChange(change) {
// TODO This should not mutate the input. But it does and will
// until we can refactor.
change.piggybackCount = 0;
this.changes[change.address] = change;
};

Expand All @@ -153,21 +177,11 @@ Dissemination.prototype._issueAs = function _issueAs(filterChange, mapChanges) {
var address = changedNodes[i];
var change = this.changes[address];

// TODO We're bumping the piggyback count even though
// we don't know whether the change successfully made
// it over to the other side. This can result in undesired
// full-syncs.
if (typeof change.piggybackCount === 'undefined') {
change.piggybackCount = 0;
}

if (typeof filterChange === 'function' && filterChange(change)) {
this.ringpop.stat('increment', 'filtered-change');
continue;
}

change.piggybackCount += 1;

if (change.piggybackCount > this.maxPiggybackCount) {
delete this.changes[address];
if (Object.keys(this.changes).length === 0) {
Expand Down
27 changes: 15 additions & 12 deletions lib/gossip/ping-req-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,21 @@ function PingReqSender(ring, member, target, callback) {
PingReqSender.prototype.send = function send() {
var self = this;

this.ring.client.protocolPingReq({
host: this.member.address,
retryLimit: this.ring.config.get('tchannelRetryLimit'),
timeout: this.ring.pingReqTimeout
}, {
checksum: this.ring.membership.checksum,
changes: this.ring.dissemination.issueAsSender(),
source: this.ring.whoami(),
sourceIncarnationNumber: this.ring.membership.getIncarnationNumber(),
target: this.target.address
}, function onJoin(err, res) {
self.onPingReq(err, res);
self.ring.dissemination.issueAsSender(function issue(changes, onIssue) {
self.ring.client.protocolPingReq({
host: self.member.address,
retryLimit: self.ring.config.get('tchannelRetryLimit'),
timeout: self.ring.pingReqTimeout
}, {
checksum: self.ring.membership.checksum,
changes: changes,
source: self.ring.whoami(),
sourceIncarnationNumber: self.ring.membership.getIncarnationNumber(),
target: self.target.address
}, function onPingReq(err, res) {
self.onPingReq(err, res);
onIssue(err);
});
});
};

Expand Down
39 changes: 21 additions & 18 deletions lib/gossip/ping-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,28 @@ PingSender.prototype.onPing = function onPing(err, res) {
};

PingSender.prototype.send = function send() {
var changes = this.ring.dissemination.issueAsSender();
this.gossipLogger.info('ringpop ping send', {
local: this.ring.whoami(),
member: this.address,
changes: changes
});

var self = this;
this.ring.client.protocolPing({
host: this.address,
retryLimit: this.ring.config.get('tchannelRetryLimit'),
timeout: this.ring.pingTimeout
}, {
checksum: this.ring.membership.checksum,
changes: changes,
source: this.ring.whoami(),
sourceIncarnationNumber: this.ring.membership.getIncarnationNumber()
}, function onPing(err, res) {
self.onPing(err, res);

self.ring.dissemination.issueAsSender(function issue(changes, onIssue) {
self.gossipLogger.info('ringpop ping send', {
local: self.ring.whoami(),
member: self.address,
changes: changes
});

self.ring.client.protocolPing({
host: self.address,
retryLimit: self.ring.config.get('tchannelRetryLimit'),
timeout: self.ring.pingTimeout
}, {
checksum: self.ring.membership.checksum,
changes: changes,
source: self.ring.whoami(),
sourceIncarnationNumber: self.ring.membership.getIncarnationNumber()
}, function onPing(err, res) {
onIssue(err);
self.onPing(err, res);
});
});
};

Expand Down
1 change: 1 addition & 0 deletions lib/membership/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ function Membership(opts) {
this.checksum = null;
this.stashedUpdates = [];
this.decayTimer = null;
this.localMember = null;
}

util.inherits(Membership, EventEmitter);
Expand Down
5 changes: 1 addition & 4 deletions request_response.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ function ProtocolRequest(ringpop) {
this.sourceAddr = localMember.address;
this.sourceIncarnationNumber = localMember.incarnationNumber;
this.sourceChecksum = ringpop.membership.checksum;
this.changes = ringpop.dissemination.issueAsSender();
}

function ProtocolResponse(ringpop, req) {
this.changes = ringpop.dissemination.issueAsReceiver(req.sourceAddr,
req.sourceIncarnationNumber, req.sourceChecksum);
function ProtocolResponse(/*ringpop*/) {
}

function DampReqRequest(ringpop, flappers) {
Expand Down
2 changes: 2 additions & 0 deletions test/integration/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ require('./proxy_test.js');
require('./ring-test.js');
require('./gossip_test.js');
require('./tchannel-proxy-test.js');
require('./not-ready-test.js');
require('./piggyback-test.js');
66 changes: 66 additions & 0 deletions test/integration/piggyback-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var testRingpopCluster = require('../lib/test-ringpop-cluster.js');

testRingpopCluster({
size: 2,
waitForConvergence: false,
autoGossip: false
}, 'raise piggyback counter', function t(bootRes, cluster, assert) {
assert.plan(1);

var pingSender = cluster[0];
pingSender.gossip.tick(function() {
var changes = pingSender.dissemination.changes;
var joinChange = changes[pingSender.whoami()];
var piggybackCount = joinChange.piggybackCount;
assert.equals(piggybackCount, 1, 'piggyback counter raised by one');
assert.end();
});
});

function mkBadPingResponder(ringpop) {
ringpop.channel.register('/protocol/ping', function protocolPing(req, res) {
res.headers.as = 'raw';
res.sendNotOk(null, JSON.stringify('ping failed on purpose'));
});
}

testRingpopCluster({
size: 2,
waitForConvergence: false,
autoGossip: false,
tap: function tap(cluster) {
mkBadPingResponder(cluster[1]);
},
}, 'don\'t raise piggyback counter when ping fails', function t(bootRes, cluster, assert) {
assert.plan(1);

var pingSender = cluster[0];
pingSender.gossip.tick(function() {
var changes = pingSender.dissemination.changes;
var joinChange = changes[pingSender.whoami()];
var piggybackCount = joinChange.piggybackCount;
assert.equals(piggybackCount, 0, 'piggyback counter not raised');
assert.end();
});
});

0 comments on commit b8b6e40

Please sign in to comment.