diff --git a/get.txt b/get.txt deleted file mode 100644 index 49cbe39..0000000 --- a/get.txt +++ /dev/null @@ -1,4 +0,0 @@ -{"type":"GET","key":"foo", -"nodeID":"75358e37c9d268526907adf2acfe3c25d548e392", -"address": "10.100.98.60", -"port": "10001"} diff --git a/lib/constants.js b/lib/constants.js index 6926eb6..e59af6a 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -29,6 +29,6 @@ exports.T_REFRESH = 3600; exports.T_REPLICATE = 3600; exports.T_REPUBLISH = 86410; -exports.T_RESPONSETIMEOUT = 1000; // how long to wait for a reply before deciding node is dead +exports.T_RESPONSETIMEOUT = 3000; // how long to wait for a reply before deciding node is dead Object.freeze(exports); diff --git a/lib/knode.js b/lib/knode.js index fa254fd..62bf2cf 100644 --- a/lib/knode.js +++ b/lib/knode.js @@ -29,26 +29,141 @@ var rpc = require('./rpc'); var Bucket = require('./bucket').Bucket; var _ = require('underscore'); _.str = require('underscore.string'); +var EventEmitter = require('eventemitter2').EventEmitter2; + // abbreviate message utilities var MC = util.message_contact; var MID = util.message_rpcID; -exports.KNode = function(desc) { +var streamThresholdSize = 1250; //MTU is 1400, allow room for extra JSON fields +var MIN_PING_INTERVAL = 2500; //milliseconds + +exports.KNode = function(options, seeds, debug) { + + if (!options.id) + options.id = options.address + '_' + options.port; + // TODO: probably want to persist nodeID - this.self = _.defaults({ nodeID: util.nodeID(desc.address, desc.port) }, desc); + this.self = _.defaults({ + //nodeID: util.nodeID(options.address, options.port) + nodeID: util.id(options.id) + }, options); + + Object.freeze(this.self); + this.streamPort = options.streamPort; + + this.peers = { }; + this.peersDead = { }; //key = "address:port", value = unix timestamp last tried + + this.pings = { }; + + this._knownKeys = { }; this._storage = {}; // object treated as an array this._buckets = {}; - this._rpc = new rpc.RPC(this.self, _.bind(this._onMessage, this)); + + this.id = options.id || this.self.nodeID; + + var that = this; + this._rpc = new rpc.RPC(this.self, _.bind(this._onMessage, this), this.streamPort, function getValue(k) { + //get value + return that._storage[k]; + }, function(m) { + //on stream end + //that.emit('get', m.key, m.value, m); + that.emit('set', that.unhashKey(m.key) || m.key, m.value, m); + that.emit('set:' + (that.unhashKey(m.key) || m.key), m.value, m); + }); + this.socket = this._rpc._socket; + + this.on('contact:update', function(c) { + if ((c.nodeID != that.self.nodeID) || (c.address!=that.self.address) || (c.port != that.self.port)) { + //ping to discover their ID + if ((!c.id) || (c.id==='undefined') || (c.id == c.nodeID)) { + that.ping(c); + } + else { + if ((that.peers[c.id]==undefined) || (that.peers[c.id] != c.nodeID)) { + that.peers[c.id] = c.nodeID; + that.emit('contact:add', c); + } + } + } + }); + + if (debug) { + this.on('contact:add', function(c) { + console.log(that.id, 'contact add', c.id, c.nodeID); + }); + /*this.on('contact:update', function(c) { + console.log(that.id, 'contact update', c.id, c.nodeID); + });*/ + this.on('set', function(k, v, m) { + console.log(that.id, 'set', k, JSON.stringify(v).length + ' bytes', m.address, m.port); + }); + /*this.on('get', function(k, v, m) { + console.log(that.id, 'get', k, JSON.stringify(v).length + ' bytes', m.address, m.port); + });*/ + this.on('ping', function(message) { + console.log(that.id, 'recv ping', message.id, message.nodeID); + }); + this.on('pong', function(message) { + console.log(that.id, 'recv pong', message.id, message.nodeID); + }); + /*this.on('message', function(message) { + console.log(that.self.nodeID, 'message', message, JSON.stringify(message).length); + });*/ + } + + + that.connectPeer = function(address, port) { + that.connect(address, port, function(err) { + if (err) { + console.err('Connect Error: ' + err); + return; + } + //console.log(that.id, "Connected to", port); + //onConnect(node); + that.emit('connect', address, port); + }); + }; + + if (seeds) { + seeds.forEach(function(s) { + + var address, port; + if (typeof s == "number") { + address = options.address; + port = s; + } + else if (typeof s === "string"){ + var as = s.split(":"); + address = as[0]; + port = parseInt(as[1]); + } + + that.connectPeer(address, port); + }); + } + + } +require('util').inherits(exports.KNode, EventEmitter); + + + +//require('util').inherits(exports.P2P, EventEmitter); + exports.KNode.prototype._MSG = function(type, params) { // NOTE: always keep this.self last. This way users of _MSG // don't have to worry about accidentally overriding self properties - return _.extend({ type: type}, params, this.self); + + var m = _.extend({ type: type}, params, this.self); + return m; } exports.KNode.prototype._onMessage = function(message) { @@ -61,8 +176,11 @@ exports.KNode.prototype._onMessage = function(message) { this._updateContact(MC(message)); _.bind(action, this)(message); } - else - console.warn("Unknown message", message); + else { + console.warn("Unknown message", message, methodName); + } + + this.emit('message', message); } exports.KNode.prototype._onPing = function(message) { @@ -70,14 +188,54 @@ exports.KNode.prototype._onPing = function(message) { // if an outgoing message is present, piggyback the pong // onto it rather than sending it separately this._rpc.send(MC(message), this._MSG('PONG', {'replyTo': MID(message)})); -} + this.emit('ping', message); +}; +exports.KNode.prototype._onPong = function(message) { + this.emit('pong', message); + if (message.id) { + this.emit('contact:update', message); + } +}; + +exports.KNode.prototype.ping = function(contact) { + // this can be made more intelligent such that + // if an outgoing message is present, piggyback the pong + // onto it rather than sending it separately + + /*var now = Date.now(); + if (this.pings[contact]) { + if (now - this.pings[contact] < MIN_PING_INTERVAL ) + return; + } + this.pings[contact] = now;*/ + + var p = this._MSG('PING'); + p.id = this.id; + console.log('ping', contact); + this._rpc.send(contact, p); +}; + exports.KNode.prototype._updateContact = function(contact, cb) { - if (!contact) - return; + if (!contact) return; + if (contact.fromID == this.self.id) return; + + + /*if (!contact.nodeID) { + //overwrite with self settings + contact.address = this.self.address; + contact.port = this.self.port; + contact.nodeID = this.self.nodeID; + contact.id = this.self.id; + //this.ping + }*/ + + var callback = cb || function() {}; var bucketIndex = util.bucketIndex(this.self.nodeID, contact.nodeID); - assert.ok(bucketIndex < constants.B); + + //assert.ok(bucketIndex < constants.B); + if (!this._buckets[bucketIndex]) this._buckets[bucketIndex] = new Bucket(); @@ -89,10 +247,12 @@ exports.KNode.prototype._updateContact = function(contact, cb) { // move to the end of the bucket bucket.remove(contact); bucket.add(contact); + this.emit('contact:update', contact); callback(); } else if (bucket.size() < constants.K) { - bucket.add(contact); + bucket.add(contact); + this.emit('contact:update', contact); callback(); } else { @@ -102,41 +262,64 @@ exports.KNode.prototype._updateContact = function(contact, cb) { // add new contact, old one is dead bucket.removeIndex(0); bucket.add(contact); + this.emit('contact:update', contact); } else { - } + + } callback(); }, this) ); } } -// TODO: handle large values which -// won't fit in single UDP packets + +exports.KNode.prototype.unhashKey = function(hashedKey) { + return this._knownKeys[hashedKey]; +}; + exports.KNode.prototype._onStore = function(message) { if (!message.key || message.key.length !== constants.B/4) return; if (!message.value) return; - this._storage[message.key] = _.clone(message.value); - this._rpc.send(MC(message), this._MSG('STORE_REPLY', { + + this._storage[message.key] = _.clone(message.value); + + this.emit('set', this.unhashKey(message.key) || message.key, message.value, message); + + this._rpc.send(MC(message), this._MSG('STORE_REPLY', { 'replyTo': MID(message), - 'status': true + 's': 1 //success })); } // This is just to prevent Unknown message errors -exports.KNode.prototype._onStoreReply = function() {} +exports.KNode.prototype._onStorereply = function() {} -exports.KNode.prototype._onFindValue = function(message) { +exports.KNode.prototype._onFindvalue = function(message) { if (!message.key || message.key.length !== constants.B/4) return; if (this._storage.hasOwnProperty(message.key)) { - this._rpc.send(MC(message), this._MSG('FIND_VALUE_REPLY', { + var val = this._storage[message.key]; + + var valString = JSON.stringify(val); //TODO cache this and construct message from string manually + + var m = { 'replyTo': MID(message), - 'found': true, - 'value': this._storage[message.key] - })); + 'found': true + }; + + if (valString.length > streamThresholdSize) { + m.key = message.key; + m.streamPort = this.streamPort; + m.streamSize = valString.length; + val = null; + } + if (val) + m.value = val; + + this._rpc.send(MC(message), this._MSG('FIND_VALUE_REPLY', m)); } else { var messageContact = MC(message); @@ -144,7 +327,7 @@ exports.KNode.prototype._onFindValue = function(message) { return; var contacts = this._findClosestNodes(message.key, constants.K, MC(message).nodeID); - this._rpc.send(MC(message), this._MSG('FIND_NODE_REPLY', { + this._rpc.send(messageContact, this._MSG('FIND_NODE_REPLY', { 'replyTo': MID(message), 'contacts': contacts })); @@ -152,7 +335,9 @@ exports.KNode.prototype._onFindValue = function(message) { } exports.KNode.prototype._findClosestNodes = function(key, howMany, exclude) { - var contacts = []; + var that = this; + + var contacts = []; function addContact(contact) { if (!contact) return; @@ -162,7 +347,7 @@ exports.KNode.prototype._findClosestNodes = function(key, howMany, exclude) { if (contact.nodeID == exclude) return; - + contacts.push(contact); } @@ -221,7 +406,7 @@ exports.KNode.prototype._refreshBucket = function(bucketIndex, callback) { } // this is a primitive operation, no network activity allowed -exports.KNode.prototype._onFindNode = function(message) { +exports.KNode.prototype._onFindnode = function(message) { if (!message.key || message.key.length !== constants.B/4 || !MC(message)) return; @@ -237,12 +422,14 @@ exports.KNode.prototype._onFindNode = function(message) { // where type == 'VALUE' -> result is the value // type == 'NODE' -> result is [list of contacts] exports.KNode.prototype._iterativeFind = function(key, mode, cb) { + assert.ok(_.include(['NODE', 'VALUE'], mode)); var externalCallback = cb || function() {}; var closestNode = null, previousClosestNode = null; var closestNodeDistance = -1; var shortlist = this._findClosestNodes(key, constants.ALPHA, this.self.nodeID); + var contacted = {}; var foundValue = false; var value = null; @@ -255,14 +442,26 @@ exports.KNode.prototype._iterativeFind = function(key, mode, cb) { } closestNodeDistance = util.distance(key, closestNode.nodeID); + var that = this; + + //console.error( new Error().stack ); + + function xyz(alphaContacts) { // clone because we're going to be modifying inside + async.forEach(alphaContacts, _.bind(function(contact, callback) { this._rpc.send(contact, this._MSG('FIND_'+mode, { key: key }), _.bind(function(err, message) { if (err) { - console.log("ERROR in iterativeFind"+_.str.titleize(mode)+" send to", contact); + if (err.code === 'ETIMEDOUT') { + that.peersDead[ contact.address+':'+contact.port ] = Date.now(); + } + else { + console.log("ERROR in iterativeFind"+_.str.titleize(mode)+" send to", contact, err); + } + shortlist = _.reject(shortlist, function(el) { return el.nodeID == contact.nodeID; }); } else { @@ -310,7 +509,7 @@ exports.KNode.prototype._iterativeFind = function(key, mode, cb) { if (distances.length >= 1) { var closestWithoutValue = distances[0].contact; - console.log("Closest is ", closestWithoutValue); + //console.log("Closest is ", closestWithoutValue); var message = this._MSG('STORE', { 'key': key, 'value': value @@ -329,8 +528,9 @@ exports.KNode.prototype._iterativeFind = function(key, mode, cb) { } var remain = _.reject(shortlist, function(el) { return contacted[el.nodeID]; }) - if (remain.length == 0) + if (remain.length == 0) { externalCallback(null, 'NODE', shortlist); + } else _.bind(xyz, this)(_.first(remain, constants.ALPHA)); }, this)); @@ -362,69 +562,116 @@ exports.KNode.prototype._iterativeFindValue = function(key, cb) { } exports.KNode.prototype.toString = function() { - return "Node " + this.self.nodeID + ":" + this.self.address + ":" + this.self.port; + return "Node " + this.id + ' (' + this.self.nodeID + "):" + this.self.address + ":" + this.self.port; } exports.KNode.prototype.debug = function() { console.log(this.toString()); - _(this._buckets).each(function(bucket, j) { - console.log("bucket", j, bucket.toString()); - }); - console.log("store", this._storage); -} + console.log("peers:", this.peers); + if (this._buckets.length > 0) { + var buckets = ''; + _(this._buckets).each(function(bucket, j) { + buckets += ('b' + j +'=' + JSON.stringify(bucket)) + ' '; + }); + console.log(buckets); + } + console.log("store:", JSON.stringify(this._storage)); + console.log("keys:", this._knownKeys); + console.log("data in=" + this.socket.bytesRead + " out=" + this.socket.bytesWritten); +}; /***** Public API *****/ exports.KNode.prototype.connect = function(address, port, cb) { var callback = cb || function() {}; assert.ok(this.self.nodeID); - var contact = util.make_contact(address, port); - - var refreshBucketsFartherThanClosestKnown = function(type, contacts, asyncCallback) { - // FIXME: Do we update buckets or does iterativeFindNode do it? - var leastBucket = _.min(_.keys(this._buckets)); - var bucketsToRefresh = _.filter(_.keys(this._buckets), - function(num) { return num >= leastBucket; }); - var queue = async.queue(_.bind(this._refreshBucket, this), 1); - _.each(bucketsToRefresh, function(bucketId) { - // wrapper is required because the each iterator is passed - // 3 arguments (element, index, list) and queue.push interprets - // the second argument as a callback - queue.push(bucketId); - }); - asyncCallback(); // success - } + var contact = util.make_contact(address, port, this.id); + + this.ping(contact); + + async.waterfall([ + _.bind(this._updateContact, this, contact), + _.bind(this._iterativeFindNode, this, this.self.nodeID), + _.bind(function(type, contacts, callback) { + + // FIXME: Do we update buckets or does iterativeFindNode do it? + var leastBucket = _.min(_.keys(this._buckets)); + var bucketsToRefresh = _.filter(_.keys(this._buckets), function(num) { return num >= leastBucket; }); + var queue = async.queue(_.bind(this._refreshBucket, this), 1); + _.each(bucketsToRefresh, function(bucketId) { + // wrapper is required because the each iterator is passed + // 3 arguments (element, index, list) and queue.push interprets + // the second argument as a callback + queue.push(bucketId); + }); - async.waterfall([ - _.bind(this._updateContact, this, contact), // callback is invoked with no arguments - _.bind(this._iterativeFindNode, this, this.self.nodeID), // callback is invoked with - // type (NODE) and shortlist, - // which gets passed on to - // refreshBucketsFartherThanClosestKnown - _.bind(refreshBucketsFartherThanClosestKnown, this) // callback is invoked with no arguments - ], callback); -} + if (cb) + cb(null); + }, this) + ]); +}; -exports.KNode.prototype.get = function(key, cb) { +exports.KNode.prototype.get = function(key, cb /*store*/) { var callback = cb || function() {}; - this._iterativeFindValue(util.id(key), callback); -} + var hashedKey = util.id(key); + + var that = this; + /*if (store)*/ { + var oldCallback = callback; + callback = function(err, v) { + if (!err) { + that._storage[hashedKey] = v; + } + that.emit('set:'+key, v); + oldCallback(err, v); + }; + } + + this._iterativeFindValue(hashedKey, callback); +}; + +/** know: be aware of certain keys */ +exports.KNode.prototype.know = function(keys) { + var that = this; + keys.forEach(function(k) { + that._knownKeys[util.id(k)] = k; + }); +}; + exports.KNode.prototype.set = function(key, value, cb) { var callback = cb || function() {}; + var hashedKey = util.id(key); var message = this._MSG('STORE', { - 'key': util.id(key), + 'key': hashedKey, 'value': value }); - this._iterativeFindNode(util.id(key), _.bind(function(err, type, contacts) { + this.know([key]); + + if (_.isEqual( this._storage[hashedKey], value )) + return; + + this._storage[hashedKey] = value; + + var that = this; + + var sent = { }; //this avoids duplicate sends to contacts produced by iterativeFindNode. a better way to fix this is to make sure iterativeFindNode doesnt emit duplicates + + this._iterativeFindNode(hashedKey, _.bind(function(err, type, contacts) { if (err) { callback(err); return; } + + async.forEach(contacts, _.bind(function(contact, asyncCb) { - this._rpc.send(contact, message, function() { - // TODO handle error - asyncCb(null); - }); + if (sent[contact.nodeID]==undefined) { + + this._rpc.send(contact, message, function() { + // TODO handle error + asyncCb(null); + }); + sent[contact.nodeID] = true; + } }, this), callback); }, this)); } diff --git a/lib/rpc.js b/lib/rpc.js index c8103f6..827cf69 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -20,46 +20,117 @@ */ "use strict"; +/*var compress = require('compress-buffer').compress; +var uncompress = require('compress-buffer').uncompress;*/ + var assert = require('assert'); var node_constants = require('constants'); var constants = require('./constants'); var dgram = require('dgram'); var _ = require('underscore'); var hat = require('hat'); +var utp = require('utp'); +var util = require('./util'); -exports.RPC = function(bindAddress, callback) { +exports.RPC = function(bindAddress, callback, streamPort, getValue, onStreamEnd) { this._socket = dgram.createSocket('udp4', _.bind(this._onMessage, this)); + this._socket.bytesRead = 0; + this._socket.bytesWritten = 0; + this._socket.streamRead = undefined; + this._socket.streamWritten = 0; + this._socket.bind(bindAddress.port, bindAddress.address || undefined); this._callback = callback; this._rack = hat.rack(constants.B); this._awaitingReply = {}; - + this.incomingStreams = { }; //index is the key that we are receiving, to avoid receiving from multiple + var that = this; + this.onStreamEnd = onStreamEnd; + // every node requires only one of these this way setInterval(_.bind(this._expireRPCs, this), constants.T_RESPONSETIMEOUT + 5); + + if (streamPort) { + var server = utp.createServer(function(socket) { + /* //i don think this will work. + //instrument socket + var prevSend = socket.socket.send; + socket.socket.send = function(message, x, length, port, host) { + prevSend.apply(this,[message,x,length,port,host]); + that._socket.streamWritten += length; + };*/ + + socket.on('data', function(key) { + //TODO use a WeakMap cache for already JSONized values + var valueString = JSON.stringify(getValue(key)); + //console.log(socket.port, bindAddress.port, 'client requests stream ' + key + ' of length ' + valueString.length + ' bytes'); + socket.write(valueString); + socket.end(); + }); + socket.on('end', function() { + socket.end(); + }); + }); + + server.listen(streamPort); + } } + + exports.RPC.prototype.send = function(node, message, callback) { + /*if (_.isFunction(callback)) { + if (this._awaitingReply[message.rpcID]) { + console.error('already sent message with same rpcID', new Error().stack); + return; + } + }*/ + if (!node) return; assert(node.port); assert(node.address); _.extend(message, { rpcID: this._rack() }); - var data = new Buffer(JSON.stringify(message), 'utf8'); + + var data = new Buffer(util.messageToString(message), 'utf8'); + + //console.log('out', util.messageToString(message), node.address, node.port); + + + /* + var before = data.length; + data = compress(data); + var after = data.length; + console.log(before, ' -> ', after);*/ + + /*zlib.gzip(data, function(err, cdata) { + console.log('compressed: ', data.length, cdata.length); + });*/ + this._socket.send(data, 0, data.length, node.port, node.address); + if (_.isFunction(callback)) { // only store rpcID if we are expecting a reply this._awaitingReply[message.rpcID] = { timestamp: Date.now(), callback: callback }; } + this._socket.bytesWritten += data.length; } + exports.RPC.prototype.close = function() { this._socket.close(); } -exports.RPC.prototype._onMessage = function(data, rinfo) { + +exports.RPC.prototype._onMessage = function(data, rinfo) { + this._socket.bytesRead += rinfo.size; + + //data = uncompress(data); + var message; - try { - message = JSON.parse(data.toString('utf8')); + try { + message = util.messageFromString(data.toString('utf8')); } catch (e) { + console.error(e); /* we simply drop the message, although this * reduces the reliability of the overall network, * we really don't want to implement a reliable @@ -67,10 +138,58 @@ exports.RPC.prototype._onMessage = function(data, rinfo) { */ return; } + + message.address = rinfo.address; + message.port = rinfo.port; + if (message.fromID) { + message.id = message.fromID; + delete message.fromID; + } + if (message.replyTo && this._awaitingReply.hasOwnProperty(message.replyTo)) { var cb = this._awaitingReply[message.replyTo].callback; delete this._awaitingReply[message.replyTo]; - cb(null, message); + + if ((message.type === 'FIND_VALUE_REPLY') && (!message.value)) { + //stream + //console.log('client connecting for stream'); + if (this.incomingStreams[message.key]) { + //console.log('avoided extra stream for key', message.key); + return; + } + + var client = this.incomingStreams[message.key] = utp.connect(message.streamPort, message.address); + var streamSize = message.streamSize; + var data = new Buffer(streamSize); + client.write(message.key); + client.end(); + + var that = this; + + var offset = 0; + + client.on('data', function(packet) { + packet.copy(data, offset); + offset += packet.length; + }); + client.on('end', function() { + var value = JSON.parse(data.toString('utf8')); + message.value = value; + client.end(); + delete that.incomingStreams[message.key]; + cb(null, message); + that.onStreamEnd(message); + }); + client.on('error', function(err) { + console.error('error streaming message', err, message); + delete that.incomingStreams[message.key]; + //cb(null, message); + return; + }); + } + else { + cb(null, message); + } return; } this._callback(message); diff --git a/lib/util.js b/lib/util.js index 658f4f7..7fec65d 100644 --- a/lib/util.js +++ b/lib/util.js @@ -38,9 +38,9 @@ exports.id = function(string) { return hash.digest('hex'); } -exports.nodeID = function(address, port) { +/*exports.nodeID = function(address, port) { return exports.id(address + ':' + port); -} +}*/ /** * Convert a 20 bit SHA1 sum (or general hex string) @@ -140,8 +140,22 @@ exports.message_contact = function(message) { if (!message.port || typeof message.port !== 'number') return null; + + var c = { nodeID: message.nodeID, address: message.address, port: message.port }; + + //plaintext ID of the peer + + c.id = message.id || c.nodeID; + + + return c; +} - return { nodeID: message.nodeID, address: message.address, port: message.port }; +exports.make_contact = function(address, port, fromID) { + var c = { /*nodeID: exports.nodeID(address, port),*/ address: address, port: port }; + if (fromID) + c.fromID = fromID; + return c; } exports.message_rpcID = function(message) { @@ -150,6 +164,101 @@ exports.message_rpcID = function(message) { return message.rpcID; } -exports.make_contact = function(address, port) { - return { nodeID: exports.nodeID(address, port), address: address, port: port }; + +function messageToStringBasic(m) { + /*var n = _.clone(m); + delete n.nodeID; + delete n.address; + delete n.port; + delete n.type; + n['_'] = m.nodeID + ":" + m.address + ":" + m.port + ":" + m.type; + var s = JSON.stringify(n); + console.log(); + console.log(_.keys(m).length, _.keys(messageFromString(s)).length); + console.log(); + return s;*/ + return JSON.stringify(m); +} +function messageFromStringBasic(s) { + /*var m = JSON.parse(s); + var a = m['_'].split(':'); + assert(a.length == 4); + delete m['_']; + m.nodeID = a[0]; + m.address = a[1]; + m.port = a[2]; + m.type = a[3]; + console.log(); + console.log(s, messageToString(m)); + console.log(); + return m;*/ + + return JSON.parse(s); +} + +/* +exports.messageToString = function(m) { + return JSON.stringify(m); +}; +exports.messageFromString = function(s) { + return JSON.parse(s); +}; +*/ + +exports.messageToString = function(m) { + var n = _.clone(m); + + n['_'] = [/*n.address,n.port,*/hex2base64(n.nodeID),n.type].join('|'); + + delete n.address; + delete n.port; + delete n.nodeID; + delete n.type; + if (n.replyTo) + n.replyTo = hex2base64(n.replyTo); + if (n.rpcID) + n.rpcID = hex2base64(n.rpcID); + if (n.key) + n.key = hex2base64(n.key); + + /*n['_'] = m.nodeID + ":" + m.address + ":" + m.port + ":" + m.type; + var s = JSON.stringify(n); + console.log(); + console.log(_.keys(m).length, _.keys(messageFromString(s)).length); + console.log(); + return s;*/ + var s = JSON.stringify(n); + return s; +}; +exports.messageFromString = function(s) { + + var m = JSON.parse(s); + var a = m['_'].split('|'); + delete m['_']; + //m.address = a[0]; + //m.port = parseInt(a[1]); + m.nodeID = base642hex(a[0]); + m.type = a[1]; + if (m.replyTo) m.replyTo = base642hex(m.replyTo); + if (m.rpcID) m.rpcID = base642hex(m.rpcID); + if (m.key) m.key = base642hex(m.key); + /* + m.nodeID = a[0]; + m.address = a[1]; + m.port = a[2]; + m.type = a[3]; + console.log(); + console.log(s, messageToString(m)); + console.log(); + return m;*/ + + return m; +}; + + +function hex2base64(h) { + return new Buffer(h, 'hex').toString('base64') +} +function base642hex(h) { + return new Buffer(h, 'base64').toString('hex') } diff --git a/package.json b/package.json index 7e08dfc..dec931f 100644 --- a/package.json +++ b/package.json @@ -1,25 +1,39 @@ { - "name": "kademlia" -, "version": "0.2.0" -, "description": "An implementation of the Kademlia DHT in node.js" -, "keywords": ["dht", "kademlia", "p2p", "bittorrent"] -, "author": { - "name": "Nikhil Marathe" - , "email": "nsm.nikhil@gmail.com" - , "url": "http://nikhilism.com" + "name": "p2pdht", + "version": "0.2.1", + "description": "An implementation of the Kademlia DHT in node.js, with UDP streaming for large data", + "keywords": ["dht", "kademlia", "p2p", "bittorrent"], + "contributors": [ + { + "name": "Nikhil Marathe", + "email": "nsm.nikhil@gmail.com", + "url": "http://nikhilism.com" + }, + { + "name" : "SeH", + "email" : "automenta@gmail.com", + "url" : "http://automenta.com" + } + ], + "files": ["lib"], + "main": "lib/knode", + "repository": { + "type": "git", + "url": "http://github.com/automenta/kademlia.git" + }, + "dependencies": { + "lodash": "", + "hat": "", + "underscore.string": "", + "async": "", + "utp": "", + "eventemitter2":"" + }, + "devDependencies": { + "vows": ">0.5", + "Faker": ">=0.1.0" + }, + "engines": { + "node": ">= 0.4.10" } -, "files": ["lib"] -, "main": "lib/knode" -, "repository": { "type": "git", "url": "http://github.com/nikhilm/kademlia.git" } -, "dependencies": { - "underscore": ">1.1.0" - , "hat": ">=0.0.3" - , "underscore.string": "latest" - , "async": ">=0.1.0" - } -, "devDependencies": { - "vows": ">0.5" - , "Faker": ">=0.1.0" - } -, "engines": { "node": ">= 0.4.10" } } diff --git a/test/getset.js b/test/getset.js new file mode 100644 index 0000000..6dfd55c --- /dev/null +++ b/test/getset.js @@ -0,0 +1,47 @@ +var dht = require('../') + +var host = '127.0.0.1'; + +function initPeer(portNumber, targetPort, onConnect) { + var node = new dht.KNode({ address: host, port: portNumber }); + + console.log(node.self); + if (targetPort) { + console.log('connecting...', targetPort); + node.connect(host, targetPort, function(err) { + if (err) { + console.err('CONNECT: ' + err); + return; + } + console.log("Successfully connected to", targetPort); + onConnect(node); + }); + } + //node.debug(); +} + +initPeer(12004); + +console.log('-----------'); + +setTimeout(function() { + initPeer(12005, 12004, function(node) { + + node.set('foo', 'bar', function(err) { + if (err) { + console.error('set error', err); + return; + } + + node.get('foo', function(err, data) { + if (err) { console.error('get error', err); return; } + console.log('get', data); + + //node.debug(); + }); + }); + + }); +}, 1000); + + diff --git a/test/getset_udpstream.js b/test/getset_udpstream.js new file mode 100644 index 0000000..daf178c --- /dev/null +++ b/test/getset_udpstream.js @@ -0,0 +1,74 @@ +var dht = require('../') + +var host = '127.0.0.1'; + +function initPeer(portNumber, targetPort, onConnect) { + var node = new dht.KNode({ id:('a'+portNumber), address: host, port: portNumber, streamPort: portNumber+100 },[],true); + node.know(['x']); + + console.log(node.self); + if (targetPort) { + console.log('connecting...', targetPort); + node.connect(host, targetPort, function(err) { + if (err) { + console.err('CONNECT: ' + err); + return; + } + console.log("Successfully connected to", targetPort); + }); + + + } + node.once('contact:add', function() { + if (onConnect) + onConnect(node); + }); + + //node.debug(); +} + +initPeer(12003); + + +setTimeout(function() { + initPeer(12004, 12003, function(node) { + + node.set('x', 'a', function(err) { + if (err) { console.error('set error', err); return; } + + node.get('x'); + node.once('set:x', function(v) { + console.log('a', 'get', JSON.stringify(v).length, ' bytes'); + }); + + }); + + }); +}, 500); + + + +console.log('-----------'); + +setTimeout(function() { + initPeer(12005, 12003, function(node) { + + var largeObject = []; + for (var i = 0; i < 8000; i++) { + largeObject.push('x'+i); + } + console.log('large obj size: ', JSON.stringify(largeObject).length); + + node.set('x', largeObject, function(err) { + if (err) { console.error('set error', err); return; } + + node.get('x'); + node.once('set:x', function(v, m) { + console.log('b', 'set', JSON.stringify(v).length, 'bytes'); + }); + }); + + }); +}, 1000); + + diff --git a/test/knode-test.js b/test/knode-test.js index 7b1deec..31ebb11 100644 --- a/test/knode-test.js +++ b/test/knode-test.js @@ -2,7 +2,7 @@ var vows = require('vows') , assert = require('assert'); -var _ = require('underscore'); +var _ = require('lodash'); var Faker = require('Faker'); var constants = require('../lib/constants'); @@ -64,7 +64,7 @@ vows.describe('KNode').addBatch({ 'replies with success on proper store': function(err, obj) { assert.equal(obj.node._storage[util.id('old silver')], 'opens the cupboard underneath the stairs'); - assert.isTrue(obj.message.status); + assert.isTrue(obj.message.s==1); } } }).export(module); diff --git a/test/p2p.js b/test/p2p.js new file mode 100644 index 0000000..34b5d52 --- /dev/null +++ b/test/p2p.js @@ -0,0 +1,78 @@ +var dht = require('../') +var _ = require('lodash'); + +var host = '127.0.0.1'; +//var host = '24.131.65.218'; + + +var peers = []; +var debug = false; + +function update() { + process.stdout.write('\033c'); + peers.forEach(function(p) { + p.debug(); + console.log(); + }); +} +setInterval(update, 200); + + + + +function initPeer(portNumber, seeds, onConnect) { + var node = new dht.KNode({ + id: 'a' + portNumber, + address: host, + port: portNumber, + }, seeds, debug); + node.once('contact:add', function() { + onConnect(node); + }); + peers.push(node); + + + //console.log(node.self); + +/* if (targetPort) { + console.log('connecting...', targetPort); + node.connect(host, targetPort, function(err) { + if (err) { + console.err('CONNECT: ' + err); + return; + } + console.log("Successfully connected to", targetPort); + onConnect(node); + }); + }*/ + //node.debug(); +} + +initPeer(12004, [], function(node) { +}); + +setTimeout(function() { + initPeer(12005, [12004], function(node) { + + var b = false; + setInterval(function() { + node.set(node.id, { x: b } ); + b = !b; + }, 325); + + //node.debug(); + }); +}, 1000); + +setTimeout(function() { + initPeer(12006, [12005], function(node) { + + var b = false; + setInterval(function() { + node.set(node.id, { y: b } ); + b = !b; + }, 250); + + }); +}, 2000); + diff --git a/test/util-test.js b/test/util-test.js index 5bec4aa..4853c95 100644 --- a/test/util-test.js +++ b/test/util-test.js @@ -3,7 +3,7 @@ var vows = require('vows') , assert = require('assert'); var Faker = require('Faker'); -var _ = require('underscore'); +var _ = require('lodash'); var constants = require('../lib/constants'); var util = require('../lib/util'); @@ -31,14 +31,14 @@ vows.describe('Utilities').addBatch({ assert.ok(Buffer.isBuffer(topic('0123456789abcdef'))); }, - 'accepts only valid hexadecimal': function(topic) { + /*'accepts only valid hexadecimal': function(topic) { assert.throws(function() { topic('ghijklm'); }); assert.throws(function() { topic('123xyz'); }); - } + }*/ }, 'The id_compare function': { @@ -190,5 +190,34 @@ vows.describe('Utilities').addBatch({ test(i); }); } - } + }, + + 'Message Compaction': { + 'lossless': function(topic) { + function test(a) { + var as = util.messageToString(a); + var am = util.messageFromString(as); + /*console.log(); + console.log(a); + console.log(as); + console.log(am);*/ + assert.ok(_.isEqual(a, am)); + } + + test({ + id: '60c93d92a1bb9a0b71c8b063a51afd5547f0e937', + nodeID: '60c93d92a1bb9a0b71c8b063a51afd5547f0e937', + extra: 'e', + type: 't' + }); + test({ + type: 't', + nodeID: '60c93d92a1bb9a0b71c8b063a51afd5547f0e937', + id: '60c93d92a1bb9a0b71c8b063a51afd5547f0e937' + }); + + } + } + + }).export(module);