Skip to content

Commit

Permalink
Make central location for handling events from major modules like rin…
Browse files Browse the repository at this point in the history
…g and membership
  • Loading branch information
jwolski committed Oct 4, 2015
1 parent 5d81b5f commit dc4426d
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 158 deletions.
14 changes: 7 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ var Gossip = require('./lib/gossip');
var Suspicion = require('./lib/gossip/suspicion');

var Config = require('./config.js');
var createEventForwarder = require('./lib/event-forwarder.js');
var createMembershipSetListener = require('./lib/membership-set-listener.js');
var createMembershipUpdateListener = require('./lib/membership-update-listener.js');
var Dissemination = require('./lib/gossip/dissemination.js');
var errors = require('./lib/errors.js');
var getTChannelVersion = require('./lib/util.js').getTChannelVersion;
Expand All @@ -57,6 +54,9 @@ var MembershipUpdateRollup = require('./lib/membership/rollup.js');
var nulls = require('./lib/nulls');
var rawHead = require('./lib/request-proxy/util.js').rawHead;
var RequestProxy = require('./lib/request-proxy/index.js');
var registerMembershipListeners = require('./lib/on_membership_event.js').register;
var registerRingListeners = require('./lib/on_ring_event.js').register;
var registerRingpopListeners = require('./lib/on_ringpop_event.js').register;
var RingpopClient = require('./client.js');
var RingpopServer = require('./server');
var safeParse = require('./lib/util').safeParse;
Expand Down Expand Up @@ -134,8 +134,6 @@ function RingPop(options) {
this.dissemination = new Dissemination(this);

this.membership = initMembership(this);
this.membership.on('set', createMembershipSetListener(this));
this.membership.on('updated', createMembershipUpdateListener(this));
this.memberIterator = new MembershipIterator(this);
this.gossip = new Gossip({
ringpop: this,
Expand All @@ -150,10 +148,12 @@ function RingPop(options) {
flushInterval: this.membershipUpdateFlushInterval
});

createEventForwarder(this);

this.tracers = new TracerStore(this);

registerMembershipListeners(this);
registerRingListeners(this);
registerRingpopListeners(this);

this.clientRate = new metrics.Meter();
this.serverRate = new metrics.Meter();
this.totalRate = new metrics.Meter();
Expand Down
17 changes: 0 additions & 17 deletions lib/gossip/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
// THE SOFTWARE.
'use strict';

var MembershipEvents = require('../membership/events.js');
var metrics = require('metrics');
var sendPing = require('./ping-sender.js');
var sendPingReq = require('./ping-req-sender.js');

function Gossip(options) {
this.ringpop = options.ringpop;
this.ringpop.on('ready', this._onRingpopReady.bind(this));
this.ringpop.membership.on('event', this._onMembershipEvent.bind(this));
this.minProtocolPeriod = options.minProtocolPeriod ||
Gossip.Defaults.minProtocolPeriod;

Expand Down Expand Up @@ -194,20 +191,6 @@ Gossip.prototype.tick = function tick(callback) {
});
};

Gossip.prototype._onRingpopReady = function _onRingpopReady() {
if (this.ringpop.config.get('autoGossip')) {
this.start();
}
};

Gossip.prototype._onMembershipEvent = function _onMembershipEvent(event) {
switch (event.name) {
case MembershipEvents.LocalMemberLeaveEvent.Name:
this.stop();
break;
}
};

Gossip.Defaults = {
minProtocolPeriod: 200
};
Expand Down
11 changes: 0 additions & 11 deletions lib/gossip/suspicion.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
// THE SOFTWARE.
'use strict';

var MembershipEvents = require('../membership/events.js');

function Suspicion(options) {
this.ringpop = options.ringpop;
this.ringpop.membership.on('event', this._onMembershipEvent.bind(this));
this.period = options.suspicionTimeout ||
Suspicion.Defaults.suspicionTimeout;

Expand Down Expand Up @@ -111,14 +108,6 @@ Suspicion.prototype.stopAll = function stopAll() {
});
};

Suspicion.prototype._onMembershipEvent = function _onMembershipEvent(event) {
switch (event.name) {
case MembershipEvents.LocalMemberLeaveEvent.Name:
this.stopAll();
break;
}
};

Suspicion.Defaults = {
suspicionTimeout: 5000
};
Expand Down
76 changes: 0 additions & 76 deletions lib/membership-update-listener.js

This file was deleted.

158 changes: 158 additions & 0 deletions lib/on_membership_event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var Member = require('./membership/member.js');
var MembershipEvents = require('./membership/events.js');

function createChecksumComputedHandler(ringpop) {
return function onMembershipChecksumComputed() {
ringpop.stat('increment', 'membership.checksum-computed');
ringpop.emit('membershipChecksumComputed');
};
}

function createEventHandler(ringpop) {
return function onEvent(event) {
switch (event.name) {
case MembershipEvents.LocalMemberLeaveEvent.Name:
ringpop.gossip.stop();
ringpop.suspicion.stopAll();
break;
}
};
}
function createSetHandler(ringpop) {
return function onMembershipSet(updates) {
var serversToAdd = [];

for (var i = 0; i < updates.length; i++) {
var update = updates[i];

ringpop.stat('increment', 'membership-set.' + (update.status || 'unknown'));

if (update.status === Member.Status.alive) {
serversToAdd.push(update.address);
} else if (update.status === Member.Status.suspect) {
serversToAdd.push(update.address);
ringpop.suspicion.start(update);
}

ringpop.dissemination.recordChange(update);
}

// Must add/remove servers from ring in batch. There are
// efficiency gains when only having to compute the ring
// checksum once.
if (serversToAdd.length > 0) {
ringpop.ring.addRemoveServers(serversToAdd);
}
};
}

function createUpdatedHandler(ringpop) {
return function onMembershipUpdated(updates) {
for (var i = 0; i < updates.length; i++) {
var update = updates[i];
ringpop.stat('increment', 'membership-update.' +
(update.status || 'unknown'));
}

ringpop.membershipUpdateRollup.trackUpdates(updates);
ringpop.stat('gauge', 'num-members', ringpop.membership.members.length);
ringpop.stat('timing', 'updates', updates.length);
ringpop.emit('membershipChanged');
ringpop.emit('changed'); // Deprecated
};
}

function createUpdatedHandlerForGossip(ringpop) {
return function onUpdated(updates) {
for (var i = 0; i < updates.length; i++) {
var update = updates[i];
switch (update.status) {
case Member.Status.alive:
case Member.Status.faulty:
case Member.Status.leave:
ringpop.suspicion.stop(update);
break;
case Member.Status.suspect:
ringpop.suspicion.start(update);
break;
}

ringpop.dissemination.recordChange(update);
}
};
}

function createUpdatedHandlerForRing(ringpop) {
return function onUpdated(updates) {
var serversToAdd = [], serversToRemove = [];
for (var i = 0; i < updates.length; i++) {
var update = updates[i];
switch (update.status) {
case Member.Status.alive:
serversToAdd.push(update.address);
break;
case Member.Status.faulty:
case Member.Status.leave:
serversToRemove.push(update.address);
break;
}
}

// Must add/remove servers from ring in batch. There are
// efficiency gains when only having to compute the ring
// checksum once.
if (serversToAdd.length > 0 || serversToRemove.length > 0) {
var ringChanged = ringpop.ring.addRemoveServers(serversToAdd,
serversToRemove);

if (ringChanged) {
ringpop.emit('ringChanged');
}
}
};
}

function register(ringpop) {
if (!ringpop.membership) {
return;
}

var membership = ringpop.membership;
membership.on('checksumComputed', createChecksumComputedHandler(ringpop));
membership.on('event', createEventHandler(ringpop));
membership.on('set', createSetHandler(ringpop));
membership.on('updated', createUpdatedHandler(ringpop));
membership.on('updated', createUpdatedHandlerForGossip(ringpop));
membership.on('updated', createUpdatedHandlerForRing(ringpop));
}

module.exports = {
createChecksumComputedHandler: createChecksumComputedHandler,
createEventHandler: createEventHandler,
createSetHandler: createSetHandler,
createUpdatedHandler: createUpdatedHandler,
createUpdatedHandlerForGossip: createUpdatedHandlerForGossip,
createUpdatedHandlerForRing: createUpdatedHandlerForRing,
register: register
};
Loading

0 comments on commit dc4426d

Please sign in to comment.