Skip to content

Commit

Permalink
Fix cluster listening support #211 (#334)
Browse files Browse the repository at this point in the history
* Adding most listen cluster support

* Moving distributed-state-registry to cluster package

* Adding unit tests for cluster functionality

* Adding more cluster based listen tests

* Removing test listener leak

* comments and minor tweaks

* Code review

* Changing default timeout values

* Reverting package updates

* Code review suggestions
  • Loading branch information
yasserf authored Aug 26, 2016
1 parent 73ff9ab commit ba2d0a2
Show file tree
Hide file tree
Showing 44 changed files with 2,265 additions and 1,235 deletions.
8 changes: 7 additions & 1 deletion conf/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,10 @@ clusterKeepAliveInterval: 5000
# The interval at which all nodes within a cluster registry are checked for timeouts
clusterActiveCheckInterval: 1000
# The longest duration since the last status message was received until the remote node is declared inactive
clusterNodeInactiveTimeout: 6000
clusterNodeInactiveTimeout: 6000
# The amount of time to wait for a provider to acknowledge or reject a listen request
listenResponseTimeout: 500
# The amount of time a lock can be reserved for before it is force released
lockTimeout: 1000
# The amount of time a lock request waits for before it defaults to false
lockRequestTimeout: 1000
15 changes: 7 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"test": "test"
},
"scripts": {
"coverage": "istanbul cover node_modules/jasmine/bin/jasmine.js JASMINE_CONFIG_PATH=jasmine.json",
"coverage": "istanbul cover node_modules/jasmine/bin/jasmine.js JASMINE_CONFIG_PATH=jasmine.json -x **/pid-helper.js",
"watch": "node node_modules/watch/cli.js \"npm test\" ./src ./test",
"reporter": "node jasmine-runner",
"test": "jasmine JASMINE_CONFIG_PATH=jasmine.json",
Expand All @@ -26,27 +26,26 @@
},
"dependencies": {
"adm-zip": "^0.4.7",
"colors": "1.0.3",
"colors": "^1.0.3",
"commander": "^2.9.0",
"engine.io": "1.6.11",
"glob": "^7.0.5",
"js-yaml": "^3.6.1",
"mkdirp": "^0.5.1",
"needle": "^1.0.0"
"needle": "^1.1.0"
},
"devDependencies": {
"async": "^0.2.9",
"coveralls": "^2.11.9",
"coveralls": "^2.11.12",
"engine.io-client": "^1.6.11",
"grunt": "^1.0.1 ",
"grunt-release": "^0.14.0",
"istanbul": "^0.4.3",
"grunt": "^1.0.1",
"istanbul": "^0.4.4",
"jasmine": "^2.4.1",
"jasmine-spec-reporter": "^2.5.0",
"n0p3": "^1.0.2",
"nexe": "^1.1.2",
"proxyquire": "1.7.10",
"watch": "^0.19.1"
"watch": "^0.19.2"
},
"author": "deepstreamHub GmbH",
"license": "AGPL-3.0",
Expand Down
20 changes: 18 additions & 2 deletions src/cluster/cluster-registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const EventEmitter = require( 'events' ).EventEmitter;
* @emits add <serverName>
* @emits remove <serverName>
*/
module.exports = class ClusterRegistry extends EventEmitter{
module.exports = class ClusterRegistry extends EventEmitter {

/**
* Creates the class, initialises all intervals and publishes the
Expand Down Expand Up @@ -61,7 +61,11 @@ module.exports = class ClusterRegistry extends EventEmitter{
action: C.ACTIONS.REMOVE,
data:[ this._options.serverName ]
});
this._options.messageConnector.unsubscribe( C.TOPIC.CLUSTER, this._onMessageFn );

// TODO: If a message connector doesn't close this is required to avoid an error
// being thrown during shutdown
//this._options.messageConnector.unsubscribe( C.TOPIC.CLUSTER, this._onMessageFn );

process.removeListener( 'beforeExit', this._leaveClusterFn );
process.removeListener( 'exit', this._leaveClusterFn );
clearInterval( this._publishInterval );
Expand All @@ -80,6 +84,18 @@ module.exports = class ClusterRegistry extends EventEmitter{
return Object.keys( this._nodes );
}

/**
* Returns true if this node is the cluster leader
* @return {Boolean} [description]
*/
isLeader() {
return this._options.serverName === this.getCurrentLeader();
}

/**
* Returns the name of the current leader
* @return {String}
*/
getCurrentLeader() {
var maxScore = 0,
serverName,
Expand Down
Loading

0 comments on commit ba2d0a2

Please sign in to comment.