Skip to content

Commit

Permalink
Bug fix for flappy test having to do with talking to TChannel when de…
Browse files Browse the repository at this point in the history
…stroyed
  • Loading branch information
jwolski committed Nov 13, 2015
1 parent c140c01 commit 7a2bea9
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 154 deletions.
64 changes: 54 additions & 10 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@

var safeParse = require('./lib/util.js').safeParse;
var TChannel = require('tchannel');
var TypedError = require('error/typed');

var ChannelDestroyedError = TypedError({
type: 'ringpop.client.channel-destroyed',
message: 'Channel is already destroyed',
endpoint: null
});

function RingpopClient(subChannel) {
this.subChannel = subChannel;
Expand All @@ -35,23 +42,45 @@ function RingpopClient(subChannel) {
}

RingpopClient.prototype.adminConfigGet = function adminConfigGet(host, body, callback) {
this._request(host, '/admin/config/get', null, body, callback);
this._request({
host: host
}, '/admin/config/get', null, body, callback);
};

RingpopClient.prototype.adminConfigSet = function adminConfigSet(host, body, callback) {
this._request(host, '/admin/config/set', null, body, callback);
this._request({
host: host
}, '/admin/config/set', null, body, callback);
};

RingpopClient.prototype.adminGossipStart = function adminGossipStart(host, callback) {
this._request(host, '/admin/gossip/start', null, null, callback);
this._request({
host: host
}, '/admin/gossip/start', null, null, callback);
};

RingpopClient.prototype.adminGossipStop = function adminGossipStop(host, callback) {
this._request(host, '/admin/gossip/stop', null, null, callback);
this._request({
host: host
}, '/admin/gossip/stop', null, null, callback);
};

RingpopClient.prototype.adminGossipTick = function adminGossipTick(host, callback) {
this._request(host, '/admin/gossip/tick', null, null, callback);
this._request({
host: host
}, '/admin/gossip/tick', null, null, callback);
};

RingpopClient.prototype.protocolJoin = function protocolJoin(opts, body, callback) {
this._request(opts, '/protocol/join', null, body, callback);
};

RingpopClient.prototype.protocolPing = function protocolPing(opts, body, callback) {
this._request(opts, '/protocol/ping', null, body, callback);
};

RingpopClient.prototype.protocolPingReq = function protocolPingReq(opts, body, callback) {
this._request(opts, '/protocol/ping-req', null, body, callback);
};

RingpopClient.prototype.destroy = function destroy(callback) {
Expand All @@ -61,30 +90,45 @@ RingpopClient.prototype.destroy = function destroy(callback) {
};

/* jshint maxparams: 5 */
RingpopClient.prototype._request = function _request(host, endpoint, head, body, callback) {
RingpopClient.prototype._request = function _request(opts, endpoint, head, body, callback) {
var self = this;

if (this.subChannel.destroyed) {
process.nextTick(function onTick() {
callback(ChannelDestroyedError({
endpoint: endpoint
}));
});
return;
}

this.subChannel.waitForIdentified({
host: host
host: opts.host
}, function onIdentified(err) {
if (err) {
callback(err);
return;
}

self.subChannel.request({
host: host,
host: opts.host,
serviceName: 'ringpop',
hasNoParent: true,
retryLimit: 1,
retryLimit: opts.retryLimit || 0,
trace: false,
headers: {
as: 'raw',
cn: 'ringpop'
}
},
timeout: opts.timeout || 30000
}).send(endpoint, JSON.stringify(head), JSON.stringify(body), onSend);
});

function onSend(err, res, arg2, arg3) {
if (!err && !res.ok) {
err = safeParse(arg3) || new Error('Server Error');
}

if (err) {
callback(err);
return;
Expand Down
69 changes: 20 additions & 49 deletions lib/gossip/joiner.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ var globalTimers = require('timers');
var isEmptyArray = require('../util.js').isEmptyArray;
var mergeJoinResponses = require('./join-response-merge.js');
var numOrDefault = require('../util.js').numOrDefault;
var safeParse = require('../util.js').safeParse;
var TypedError = require('error/typed');

var JoinAbortedError = TypedError({
Expand Down Expand Up @@ -380,61 +379,33 @@ Joiner.prototype.joinGroup = function joinGroup(totalNodesJoined, callback) {

Joiner.prototype.joinNode = function joinNode(node, callback) {
var self = this;
var joinOpts = {
self.ringpop.client.protocolJoin({
host: node,
timeout: this.joinTimeout,
serviceName: 'ringpop',
hasNoParent: true,
retryLimit: 1,
trace: false,
headers: {
'as': 'raw',
'cn': 'ringpop'
}
};
var joinBody = JSON.stringify({
timeout: this.joinTimeout
}, {
app: this.ringpop.app,
source: this.ringpop.whoami(),
incarnationNumber: this.ringpop.membership.localMember.incarnationNumber
});

self.ringpop.channel
.waitForIdentified({
host: joinOpts.host
}, onIdentified);

function onIdentified(err) {
}, function onJoin(err, res) {
if (err) {
callback(err);
} else {
self.ringpop.channel
.request(joinOpts)
.send('/protocol/join', null, joinBody, function onSend(err, res, arg2, arg3) {
if (!err && !res.ok) {
err = new Error(String(arg3));
}

if (err) {
return callback(err, node);
}

var bodyObj = safeParse(arg3.toString());

// Verify that `joinResponses` is not null. It is set
// to null upon completion of the join process. There may,
// however, be in-flight /protocol/join requests that have
// yet to complete.
if (bodyObj && self.joinResponses !== null) {
self.joinResponses.push({
checksum: bodyObj.membershipChecksum,
members: bodyObj.membership
});
}

callback(null, node);
});
callback(err, node);
return;
}
}

// Verify that `joinResponses` is not null. It is set
// to null upon completion of the join process. There may,
// however, be in-flight /protocol/join requests that have
// yet to complete.
if (res && self.joinResponses !== null) {
self.joinResponses.push({
checksum: res.membershipChecksum,
members: res.membership
});
}

callback(null, node);
});
};

Joiner.prototype.selectGroup = function selectGroup(nodesJoined) {
Expand Down
57 changes: 12 additions & 45 deletions lib/gossip/ping-req-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
// THE SOFTWARE.
'use strict';

var safeParse = require('../util').safeParse;
var TypedError = require('error/typed');

var BadPingReqPingStatusError = TypedError({
Expand Down Expand Up @@ -60,52 +59,22 @@ function PingReqSender(ring, member, target, callback) {
PingReqSender.prototype.send = function send() {
var self = this;

var channelOpts = {
this.ring.client.protocolPingReq({
host: this.member.address,
timeout: this.ring.pingReqTimeout,
serviceName: 'ringpop',
hasNoParent: true,
trace: false,
retryLimit: 1,
headers: {
'as': 'raw',
'cn': 'ringpop'
}
};

var body = JSON.stringify({
timeout: this.ring.pingReqTimeout
}, {
checksum: this.ring.membership.checksum,
changes: this.ring.dissemination.issueAsSender(),
source: this.ring.whoami(),
sourceIncarnationNumber: this.ring.membership.getIncarnationNumber(),
target: this.target.address
}, function onJoin(err, res) {
self.onPingReq(err, res);
});

this.ring.channel
.waitForIdentified({
host: channelOpts.host
}, onIdentified);


function onIdentified(err) {
if (err) {
return self.onPingReq(err);
}

self.ring.channel
.request(channelOpts)
.send('/protocol/ping-req', null, body, onSend);
}

function onSend(err, res, arg2, arg3) {
if (!err && !res.ok) {
err = new Error(String(arg3));
}
self.onPingReq(err, arg2, arg3);
}
};

PingReqSender.prototype.onPingReq = function (err, res1, res2) {
PingReqSender.prototype.onPingReq = function (err, res) {
if (err) {
this.ring.logger.warn('bad response to ping-req', {
address: this.member.address,
Expand All @@ -117,33 +86,31 @@ PingReqSender.prototype.onPingReq = function (err, res1, res2) {
return;
}

var res2Str = res2.toString();
var bodyObj = safeParse(res2Str);
if (! bodyObj || !bodyObj.changes || bodyObj.pingStatus === 'undefined') {
if (! res || !res.changes || res.pingStatus === 'undefined') {
this.ring.logger.warn('bad response body in ping-req', {
address: this.member.address
});
this.callback(BadPingReqRespBodyError({
selected: this.member.address,
target: this.target.address,
body: res2Str
body: res
}));
return;
}

this.ring.membership.update(bodyObj.changes);
this.ring.membership.update(res.changes);
this.gossipLogger.info('ringpop ping-req response', {
local: this.ring.whoami(),
source: this.member.address,
target: this.target.address,
isOk: bodyObj.pingStatus
isOk: res.pingStatus
});

if (!bodyObj.pingStatus) {
if (!res.pingStatus) {
this.callback(BadPingReqPingStatusError({
selected: this.member.address,
target: this.target.address,
pingStatus: bodyObj.pingStatus
pingStatus: res.pingStatus
}));
return;
}
Expand Down
Loading

0 comments on commit 7a2bea9

Please sign in to comment.