diff --git a/packages/web3-core-requestmanager/src/index.js b/packages/web3-core-requestmanager/src/index.js index 4fc48d493c7..d07c0cd9fb0 100644 --- a/packages/web3-core-requestmanager/src/index.js +++ b/packages/web3-core-requestmanager/src/index.js @@ -103,13 +103,16 @@ RequestManager.prototype.setProvider = function (p, net) { _this.subscriptions[result.params.subscription].callback(null, result.params.result); } }); - // TODO add error, end, timeout, connect?? - // this.provider.on('error', function requestManagerNotification(result){ - // Object.keys(_this.subscriptions).forEach(function(id){ - // if(_this.subscriptions[id].callback) - // _this.subscriptions[id].callback(err); - // }); - // } + + // notify all subscriptions about the error condition + this.provider.on('error', function (event) { + Object.keys(_this.subscriptions).forEach(function(id){ + if(_this.subscriptions[id] && _this.subscriptions[id].callback) + _this.subscriptions[id].callback(event.code || new Error('Provider error')); + }); + }); + + // TODO add end, timeout, connect?? } }; @@ -205,17 +208,20 @@ RequestManager.prototype.addSubscription = function (id, name, type, callback) { * @param {Function} callback fired once the subscription is removed */ RequestManager.prototype.removeSubscription = function (id, callback) { - var _this = this; - if(this.subscriptions[id]) { + var type = this.subscriptions[id].type; + + // remove subscription first to avoid reentry + delete this.subscriptions[id]; + // then, try to actually unsubscribe this.send({ - method: this.subscriptions[id].type + '_unsubscribe', + method: type + '_unsubscribe', params: [id] }, callback); - - // remove subscription - delete _this.subscriptions[id]; + } else if (typeof callback === 'function') { + // call the callback if the subscription was already removed + callback(null); } }; diff --git a/packages/web3-core-subscriptions/src/subscription.js b/packages/web3-core-subscriptions/src/subscription.js index 92d24551b45..9ec301f1615 100644 --- a/packages/web3-core-subscriptions/src/subscription.js +++ b/packages/web3-core-subscriptions/src/subscription.js @@ -272,32 +272,18 @@ Subscription.prototype.subscribe = function() { _this.callback(null, output, _this); }); } else { - // unsubscribe, but keep listeners - _this.options.requestManager.removeSubscription(_this.id); - - // re-subscribe, if connection fails - if(_this.options.requestManager.provider.once) { - _this._reconnectIntervalId = setInterval(function () { - // TODO check if that makes sense! - if (_this.options.requestManager.provider.reconnect) { - _this.options.requestManager.provider.reconnect(); - } - }, 500); - - _this.options.requestManager.provider.once('connect', function () { - clearInterval(_this._reconnectIntervalId); - _this.subscribe(_this.callback); - }); - } - _this.emit('error', err); - - // call the callback, last so that unsubscribe there won't affect the emit above - _this.callback(err, null, _this); + _this._resubscribe(err); } }); + + // just in case the provider reconnects silently, resubscribe over the new connection + if (_this.options.requestManager.provider.once) { + _this.options.requestManager.provider.once('connect', function () { + _this._resubscribe(); + }); + } } else { - _this.callback(err, null, _this); - _this.emit('error', err); + _this._resubscribe(err); } }); @@ -305,4 +291,38 @@ Subscription.prototype.subscribe = function() { return this; }; +Subscription.prototype._resubscribe = function (err) { + var _this = this; + + // unsubscribe + this.options.requestManager.removeSubscription(this.id); + + // re-subscribe, if connection fails + if(this.options.requestManager.provider.once && !_this._reconnectIntervalId) { + this._reconnectIntervalId = setInterval(function () { + // TODO check if that makes sense! + if (_this.options.requestManager.provider.reconnect) { + _this.options.requestManager.provider.reconnect(); + } + }, 500); + + this.options.requestManager.provider.once('connect', function () { + clearInterval(_this._reconnectIntervalId); + _this._reconnectIntervalId = null; + + // delete id to keep the listeners on subscribe + _this.id = null; + + _this.subscribe(_this.callback); + }); + } + + if (err) { + this.emit('error', err); + } + + // call the callback, last so that unsubscribe there won't affect the emit above + this.callback(err, null, this); +}; + module.exports = Subscription; diff --git a/packages/web3-providers-ws/README.md b/packages/web3-providers-ws/README.md index 4be274764be..3cf2c7b640a 100644 --- a/packages/web3-providers-ws/README.md +++ b/packages/web3-providers-ws/README.md @@ -32,8 +32,9 @@ var Web3WsProvider = require('web3-providers-ws'); var options = { timeout: 30000, - headers: { authorization: 'Basic username:password' } -}; // set a custom timeout at 30 seconds, and credentials (you can also add the credentials to the URL: ws://username:password@localhost:8546) + headers: { authorization: 'Basic username:password' }, + autoReconnect: true +}; // set a custom timeout at 30 seconds, credentials (you can also add the credentials to the URL: ws://username:password@localhost:8546), and enable WebSocket auto-reconnection var ws = new Web3WsProvider('ws://localhost:8546', options); ``` diff --git a/packages/web3-providers-ws/package.json b/packages/web3-providers-ws/package.json index 50519e7d119..7d916a550ac 100644 --- a/packages/web3-providers-ws/package.json +++ b/packages/web3-providers-ws/package.json @@ -15,10 +15,11 @@ "dependencies": { "underscore": "1.9.1", "web3-core-helpers": "1.2.2", - "websocket": "github:web3-js/WebSocket-Node#polyfill/globalThis" + "websocket": "github:web3-js/WebSocket-Node#polyfill/globalThis", + "websocket-reconnector": "1.1.1" }, "devDependencies": { "definitelytyped-header-parser": "^1.0.1", "dtslint": "0.4.2" } -} +} \ No newline at end of file diff --git a/packages/web3-providers-ws/src/index.js b/packages/web3-providers-ws/src/index.js index b9a271f5cd6..8501a2fd28d 100644 --- a/packages/web3-providers-ws/src/index.js +++ b/packages/web3-providers-ws/src/index.js @@ -25,6 +25,7 @@ var _ = require('underscore'); var errors = require('web3-core-helpers').errors; var Ws = require('websocket').w3cwebsocket; +var WsReconnector = require('websocket-reconnector'); var isNode = Object.prototype.toString.call(typeof process !== 'undefined' ? process : 0) === '[object process]'; @@ -86,6 +87,11 @@ var WebsocketProvider = function WebsocketProvider(url, options) { // https://github.com/theturtle32/WebSocket-Node/blob/master/docs/WebSocketClient.md#connectrequesturl-requestedprotocols-origin-headers-requestoptions var requestOptions = options.requestOptions || undefined; + // Enable automatic reconnection wrapping `Ws` with reconnector + if (options.autoReconnect) { + Ws = WsReconnector(Ws); + } + // When all node core implementations that do not have the // WHATWG compatible URL parser go out of service this line can be removed. if (parsedURL.auth) { @@ -238,7 +244,13 @@ WebsocketProvider.prototype._addResponseCallback = function(payload, callback) { setTimeout(function () { if (_this.responseCallbacks[id]) { _this.responseCallbacks[id](errors.ConnectionTimeout(_this._customTimeout)); + delete _this.responseCallbacks[id]; + + // try to reconnect + if (_this.connection.reconnect) { + _this.connection.reconnect(); + } } }, this._customTimeout); } @@ -305,15 +317,15 @@ WebsocketProvider.prototype.on = function (type, callback) { break; case 'connect': - this.connection.onopen = callback; + this.connection.addEventListener('open', callback); break; case 'end': - this.connection.onclose = callback; + this.connection.addEventListener('close', callback); break; case 'error': - this.connection.onerror = callback; + this.connection.addEventListener('error', callback); break; // default: @@ -322,7 +334,26 @@ WebsocketProvider.prototype.on = function (type, callback) { } }; -// TODO add once +/** + Subscribes to provider only once + + @method once + @param {String} type 'notifcation', 'connect', 'error', 'end' or 'data' + @param {Function} callback the callback to call + */ +WebsocketProvider.prototype.once = function (type, callback) { + var _this = this; + + function onceCallback(event) { + setTimeout(function () { + _this.removeListener(type, onceCallback); + }, 0); + + callback(event); + } + + this.on(type, onceCallback); +}; /** Removes event listener @@ -342,7 +373,17 @@ WebsocketProvider.prototype.removeListener = function (type, callback) { }); break; - // TODO remvoving connect missing + case 'connect': + this.connection.removeEventListener('open', callback); + break; + + case 'end': + this.connection.removeEventListener('close', callback); + break; + + case 'error': + this.connection.removeEventListener('error', callback); + break; // default: // this.connection.removeListener(type, callback);