Skip to content

Commit

Permalink
added re-subscribe mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
frozeman committed Jul 5, 2016
1 parent 778713a commit d5b73e9
Show file tree
Hide file tree
Showing 7 changed files with 436 additions and 39 deletions.
7 changes: 5 additions & 2 deletions lib/web3.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var Batch = require('./web3/batch');
var Property = require('./web3/property');
var HttpProvider = require('./web3/httpprovider');
var IpcProvider = require('./web3/ipcprovider');
var WebsocketProvider = require('./web3/websocketprovider');
var BigNumber = require('bignumber.js');


Expand All @@ -59,7 +60,8 @@ function Web3 (provider) {
};
this.providers = {
HttpProvider: HttpProvider,
IpcProvider: IpcProvider
IpcProvider: IpcProvider,
WebsocketProvider: WebsocketProvider
};
this._extend = extend(this);
this._extend({
Expand All @@ -70,7 +72,8 @@ function Web3 (provider) {
// expose providers on the class
Web3.providers = {
HttpProvider: HttpProvider,
IpcProvider: IpcProvider
IpcProvider: IpcProvider,
WebsocketProvider: WebsocketProvider
};

Web3.prototype.setProvider = function (provider) {
Expand Down
21 changes: 12 additions & 9 deletions lib/web3/formatters.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/
/**
/**
* @file formatters.js
* @author Marek Kotewicz <[email protected]>
* @author Fabian Vogelsteller <[email protected]>
Expand Down Expand Up @@ -81,7 +81,7 @@ var inputCallFormatter = function (options){
options[key] = utils.fromDecimal(options[key]);
});

return options;
return options;
};

/**
Expand All @@ -106,12 +106,12 @@ var inputTransactionFormatter = function (options){
options[key] = utils.fromDecimal(options[key]);
});

return options;
return options;
};

/**
* Formats the output of a transaction to its proper values
*
*
* @method outputTransactionFormatter
* @param {Object} tx
* @returns {Object}
Expand All @@ -130,7 +130,7 @@ var outputTransactionFormatter = function (tx){

/**
* Formats the output of a transaction receipt to its proper values
*
*
* @method outputTransactionReceiptFormatter
* @param {Object} receipt
* @returns {Object}
Expand All @@ -156,7 +156,7 @@ var outputTransactionReceiptFormatter = function (receipt){
* Formats the output of a block to its proper values
*
* @method outputBlockFormatter
* @param {Object} block
* @param {Object} block
* @returns {Object}
*/
var outputBlockFormatter = function(block) {
Expand Down Expand Up @@ -184,7 +184,7 @@ var outputBlockFormatter = function(block) {

/**
* Formats the input of a log
*
*
* @method inputLogFormatter
* @param {Object} log object
* @returns {Object} log
Expand Down Expand Up @@ -214,12 +214,15 @@ var inputLogFormatter = function(options) {
if(options.address && !utils.isAddress(options.address))
throw new Error('The given address is not valid!');

// if(options.address)
// options.address = options.address.toLowerCase();

return options;
};

/**
* Formats the output of a log
*
*
* @method outputLogFormatter
* @param {Object} log object
* @returns {Object} log
Expand Down Expand Up @@ -260,7 +263,7 @@ var inputPostFormatter = function(post) {
return (topic.indexOf('0x') === 0) ? topic : utils.fromUtf8(topic);
});

return post;
return post;
};

/**
Expand Down
54 changes: 45 additions & 9 deletions lib/web3/ipcprovider.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var IpcProvider = function (path, net) {
this.responseCallbacks = {};
this.notificationCallbacks = [];
this.path = path;

this.connection = net.connect({path: this.path});

this.addDefaultEvents();
Expand Down Expand Up @@ -79,17 +79,29 @@ Will add the error and end event to timeout existing calls
IpcProvider.prototype.addDefaultEvents = function(){
var _this = this;

this.connection.on('connect', function(){
console.log('->>CONCONCOCN');
});

this.connection.on('error', function(){
_this._timeout();
});

this.connection.on('end', function(){
this.connection.on('end', function(e){
_this._timeout();

console.log('->>ENENENENENED');

// inform notifications
_this.notificationCallbacks.forEach(function (callback) {
if (utils.isFunction(callback))
callback(new Error('IPC socket connection closed'));
});
});

this.connection.on('timeout', function(){
_this._timeout();
});
});
};

/**
Expand All @@ -101,7 +113,7 @@ Will parse the response and make an array out of it.
IpcProvider.prototype._parseResponse = function(data) {
var _this = this,
returnValues = [];

// DE-CHUNKER
var dechunkedData = data
.replace(/\}[\n\r]?\{/g,'}|--|{') // }{
Expand Down Expand Up @@ -175,6 +187,14 @@ IpcProvider.prototype._timeout = function() {
}
};

/**
Try to reconnect
@method reconnect
*/
IpcProvider.prototype.reconnect = function() {
this.connection.connect({path: this.path});
};

/**
Check if the current connection is still valid.
Expand Down Expand Up @@ -205,7 +225,7 @@ IpcProvider.prototype.send = function (payload) {
try {
result = JSON.parse(data);
} catch(e) {
throw errors.InvalidResponse(data);
throw errors.InvalidResponse(data);
}

return result;
Expand All @@ -229,7 +249,7 @@ IpcProvider.prototype.sendAsync = function (payload, callback) {
Subscribes to provider events.provider
@method on
@param {String} type 'notifcation', 'connect', 'error', 'end' or 'data'
@param {String} type 'notification', 'connect', 'error', 'end' or 'data'
@param {Function} callback the callback to call
*/
IpcProvider.prototype.on = function (type, callback) {
Expand All @@ -248,11 +268,26 @@ IpcProvider.prototype.on = function (type, callback) {
}
};

/**
Subscribes to provider events.provider
@method on
@param {String} type 'connect', 'error', 'end' or 'data'
@param {Function} callback the callback to call
*/
IpcProvider.prototype.once = function (type, callback) {

if(typeof callback !== 'function')
throw new Error('The second parameter callback must be a function.');

this.connection.once(type, callback);
};

/**
Removes event listener
@method removeListener
@param {String} type 'notifcation', 'connect', 'error', 'end' or 'data'
@param {String} type 'notification', 'connect', 'error', 'end' or 'data'
@param {Function} callback the callback to call
*/
IpcProvider.prototype.removeListener = function (type, callback) {
Expand All @@ -276,7 +311,7 @@ IpcProvider.prototype.removeListener = function (type, callback) {
Removes all event listeners
@method removeAllListeners
@param {String} type 'notifcation', 'connect', 'error', 'end' or 'data'
@param {String} type 'notification', 'connect', 'error', 'end' or 'data'
*/
IpcProvider.prototype.removeAllListeners = function (type) {
switch(type){
Expand All @@ -291,7 +326,7 @@ IpcProvider.prototype.removeAllListeners = function (type) {
};

/**
Resetes the providers, clears all callbacks
Resets the providers, clears all callbacks
@method reset
*/
Expand All @@ -301,6 +336,7 @@ IpcProvider.prototype.reset = function () {

this.connection.removeAllListeners('error');
this.connection.removeAllListeners('end');
this.connection.removeAllListeners('timeout');

this.addDefaultEvents();
};
Expand Down
32 changes: 18 additions & 14 deletions lib/web3/requestmanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/
/**
/**
* @file requestmanager.js
* @author Jeffrey Wilcke <[email protected]>
* @author Marek Kotewicz <[email protected]>
Expand Down Expand Up @@ -70,11 +70,13 @@ RequestManager.prototype.send = function (data) {
* @param {Function} callback
*/
RequestManager.prototype.sendAsync = function (data, callback) {
callback = callback || function(){};

if (!this.provider) {
return callback(errors.InvalidProvider());
}
var payload = Jsonrpc.getInstance().toPayload(data.method, data.params);
this.provider.sendAsync(payload, function (err, result) {
this.provider.sendAsync(payload, function (err, result) {
if (err) {
return callback(err);
}
Expand Down Expand Up @@ -110,7 +112,7 @@ RequestManager.prototype.sendBatch = function (data, callback) {
}

callback(err, results);
});
});
};


Expand All @@ -119,9 +121,11 @@ RequestManager.prototype.sendBatch = function (data, callback) {
*
* @method addSubscription
* @param {String} id the subscription id
* @param {String} name the subscription name
* @param {String} type the subscription namespace (eth, personal, etc)
* @param {Function} callback the callback to call for incoming notifications
*/
RequestManager.prototype.addSubscription = function (name, type, id, callback) {
RequestManager.prototype.addSubscription = function (id, name, type, callback) {
if(this.provider.on) {
this.subscriptions[id] = {
callback: callback,
Expand Down Expand Up @@ -149,16 +153,10 @@ RequestManager.prototype.removeSubscription = function (id, callback) {
this.sendAsync({
method: this.subscriptions[id].type + '_unsubscribe',
params: [id]
}, function(err, result){

if(!err) {
delete _this.subscriptions[id];
}

if(utils.isFunction(callback))
callback(err, result);
});
}, callback);

// remove subscription
delete _this.subscriptions[id];
}
};

Expand All @@ -179,10 +177,16 @@ RequestManager.prototype.setProvider = function (p) {

// listen to incoming notifications
if(this.provider && this.provider.on) {
this.provider.on('notification', function(err, result){
this.provider.on('notification', function requestManagerNotification(err, result){
if(!err) {
if(_this.subscriptions[result.params.subscription] && _this.subscriptions[result.params.subscription].callback)
_this.subscriptions[result.params.subscription].callback(null, result.params.result);
} else {

Object.keys(_this.subscriptions).forEach(function(id){
if(_this.subscriptions[id].callback)
_this.subscriptions[id].callback(err);
});
}
});
}
Expand Down
Loading

0 comments on commit d5b73e9

Please sign in to comment.