Skip to content

Commit

Permalink
only raise piggyback count if ping is succesfull
Browse files Browse the repository at this point in the history
  • Loading branch information
Wieger Steggerda committed Dec 15, 2015
1 parent 1ba586b commit b95b02f
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 46 deletions.
45 changes: 30 additions & 15 deletions lib/gossip/dissemination.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,36 @@ Dissemination.prototype.isEmpty = function isEmpty() {
return Object.keys(this.changes).length === 0;
};

Dissemination.prototype.issueAsSender = function issueAsSender() {
return this._issueAs(null, mapChanges);

function mapChanges(changes) {
Dissemination.prototype.issueAsSender = function issueAsSender(issue) {
var self = this;
var membershipChanges = this._issueAs(null, function map(changes) {
return changes;
}
});

issue(membershipChanges, function onIssue(err) {
// Raise the piggyback count if the local change has been sent
// successfully. When the local change has been updated we don't
// want to raise the piggyback count. We can deduce if the change
// has been updated by comparing the id's of the changes.
if (err) {
self.ringpop.stat('increment', 'dont-bump-piggyback-count');
self.logger.info('ringpop dissemination not bumping piggyback count', {
local: self.ringpop.whoami(),
err: err,
numMembershipChanges: membershipChanges.length
});
return;
}

for (var i = 0; i < membershipChanges.length; i++) {
var issuedChange = membershipChanges[i];
var localChange = self.changes[issuedChange.address];
if (localChange && localChange.id === issuedChange.id) {
localChange.piggybackCount++;
}
}
});

};

Dissemination.prototype.issueAsReceiver = function issueAsReceiver(senderAddr, senderIncarnationNumber, senderChecksum) {
Expand Down Expand Up @@ -130,6 +154,7 @@ Dissemination.prototype.onRingChanged = function onRingChanged() {

Dissemination.prototype.recordChange = function recordChange(change) {
this.changes[change.address] = change;
this.changes[change.address].piggybackCount = 0;
};

Dissemination.prototype.resetMaxPiggybackCount = function resetMaxPiggybackCount() {
Expand All @@ -153,21 +178,11 @@ Dissemination.prototype._issueAs = function _issueAs(filterChange, mapChanges) {
var address = changedNodes[i];
var change = this.changes[address];

// TODO We're bumping the piggyback count even though
// we don't know whether the change successfully made
// it over to the other side. This can result in undesired
// full-syncs.
if (typeof change.piggybackCount === 'undefined') {
change.piggybackCount = 0;
}

if (typeof filterChange === 'function' && filterChange(change)) {
this.ringpop.stat('increment', 'filtered-change');
continue;
}

change.piggybackCount += 1;

if (change.piggybackCount > this.maxPiggybackCount) {
delete this.changes[address];
if (Object.keys(this.changes).length === 0) {
Expand Down
29 changes: 16 additions & 13 deletions lib/gossip/ping-req-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,22 @@ function PingReqSender(ring, member, target, callback) {

PingReqSender.prototype.send = function send() {
var self = this;

this.ring.client.protocolPingReq({
host: this.member.address,
retryLimit: this.ring.config.get('tchannelRetryLimit'),
timeout: this.ring.pingReqTimeout
}, {
checksum: this.ring.membership.checksum,
changes: this.ring.dissemination.issueAsSender(),
source: this.ring.whoami(),
sourceIncarnationNumber: this.ring.membership.getIncarnationNumber(),
target: this.target.address
}, function onJoin(err, res) {
self.onPingReq(err, res);

self.ring.dissemination.issueAsSender(function issue(changes, onIssue) {
self.ring.client.protocolPingReq({
host: self.member.address,
retryLimit: self.ring.config.get('tchannelRetryLimit'),
timeout: self.ring.pingReqTimeout
}, {
checksum: self.ring.membership.checksum,
changes: changes,
source: self.ring.whoami(),
sourceIncarnationNumber: self.ring.membership.getIncarnationNumber(),
target: self.target.address
}, function onPingReq(err, res) {
self.onPingReq(err, res);
onIssue(err);
});
});
};

Expand Down
39 changes: 21 additions & 18 deletions lib/gossip/ping-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,28 @@ PingSender.prototype.onPing = function onPing(err, res) {
};

PingSender.prototype.send = function send() {
var changes = this.ring.dissemination.issueAsSender();
this.gossipLogger.info('ringpop ping send', {
local: this.ring.whoami(),
member: this.address,
changes: changes
});

var self = this;
this.ring.client.protocolPing({
host: this.address,
retryLimit: this.ring.config.get('tchannelRetryLimit'),
timeout: this.ring.pingTimeout
}, {
checksum: this.ring.membership.checksum,
changes: changes,
source: this.ring.whoami(),
sourceIncarnationNumber: this.ring.membership.getIncarnationNumber()
}, function onPing(err, res) {
self.onPing(err, res);

self.ring.dissemination.issueAsSender(function issue(changes, onIssue) {
self.gossipLogger.info('ringpop ping send', {
local: self.ring.whoami(),
member: self.address,
changes: changes
});

self.ring.client.protocolPing({
host: self.address,
retryLimit: self.ring.config.get('tchannelRetryLimit'),
timeout: self.ring.pingTimeout
}, {
checksum: self.ring.membership.checksum,
changes: changes,
source: self.ring.whoami(),
sourceIncarnationNumber: self.ring.membership.getIncarnationNumber()
}, function onPing(err, res) {
onIssue(err);
self.onPing(err, res);
});
});
};

Expand Down
1 change: 1 addition & 0 deletions lib/membership/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function Membership(opts) {
this.checksum = null;
this.stashedUpdates = [];
this.decayTimer = null;
this.localMember = null;
}

util.inherits(Membership, EventEmitter);
Expand Down
2 changes: 2 additions & 0 deletions test/integration/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ require('./proxy_test.js');
require('./ring-test.js');
require('./gossip_test.js');
require('./tchannel-proxy-test.js');
require('./not-ready-test.js');
require('./piggyback-test.js');
66 changes: 66 additions & 0 deletions test/integration/piggyback-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var testRingpopCluster = require('../lib/test-ringpop-cluster.js');

testRingpopCluster({
size: 2,
waitForConvergence: false,
autoGossip: false
}, 'raise piggyback counter', function t(bootRes, cluster, assert) {
assert.plan(1);

var pingSender = cluster[0];
pingSender.gossip.tick(function() {
var changes = pingSender.dissemination.changes;
var joinChange = changes[pingSender.whoami()];
var piggybackCount = joinChange.piggybackCount;
assert.equals(piggybackCount, 1, 'piggyback counter raised by one');
assert.end();
});
});

function mkBadPingResponder(ringpop) {
ringpop.channel.register('/protocol/ping', function protocolPing(req, res) {
res.headers.as = 'raw';
res.sendNotOk(null, JSON.stringify('ping failed on purpose'));
});
}

testRingpopCluster({
size: 2,
waitForConvergence: false,
autoGossip: false,
tap: function tap(cluster) {
mkBadPingResponder(cluster[1]);
},
}, 'don\'t raise piggyback counter when ping fails', function t(bootRes, cluster, assert) {
assert.plan(1);

var pingSender = cluster[0];
pingSender.gossip.tick(function() {
var changes = pingSender.dissemination.changes;
var joinChange = changes[pingSender.whoami()];
var piggybackCount = joinChange.piggybackCount;
assert.equals(piggybackCount, 0, 'piggyback counter not raised');
assert.end();
});
});

0 comments on commit b95b02f

Please sign in to comment.