-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
48de901
commit 07b2023
Showing
703 changed files
with
76,773 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
./node_modules/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
test: *.js | ||
rm -rf /tmp/*.ipc | ||
node index.js "A" & node index.js "B" & node index.js "C" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
|
||
module.exports = { | ||
"A": { roles: [1], types: "send send recv 4 recv 5"}, | ||
"B": { roles: [2], types: "recv 1 recv 1 recv 4 recv 5"}, | ||
"C": { roles: [3,4,5], types: "recv 1 recv 1 send send"} | ||
// "a": { roles: [3], types: "recv 1 recv 1 recv 4 recv 5"}, | ||
// "b": { roles: [4], types: "recv 1 recv 1 send recv 5"}, | ||
// "c": { roles: [1,2,5], types: "send send recv 4"} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
"use strict" | ||
|
||
let Msg = require("./msg.js") | ||
let Socket = require("./socket.js") | ||
|
||
|
||
/** | ||
* Simple leader election. | ||
*/ | ||
class Consensus { | ||
|
||
/** | ||
* @param {Socket} sock a pre-connected bus socket | ||
* @param {uuid} id the uuid of myself | ||
* @param {object} metadata anything that's attached to this id | ||
* @param {Function} callback invoked when it becomes leader, or when leader sends first command | ||
*/ | ||
constructor(sock, id, metadata, callback) { | ||
this.sock = sock | ||
|
||
this.id = id | ||
this.meta = metadata // may contain roles and stype | ||
|
||
this.state = "follower" // follower | candidate | leader | ||
this.members = {} | ||
|
||
this.members[this.id] = this.meta | ||
|
||
this.timer = this.reset_timer() | ||
this.pinger = setInterval(() => this.ping(), 150) | ||
this.callback = callback | ||
|
||
// this will remove all other handlers | ||
// and use ours exclusively | ||
this.sock.onmessage = msg => this.on_message(msg) | ||
} | ||
|
||
/** | ||
* Send ping messages to gossip what metadata I have | ||
*/ | ||
ping() { | ||
this.sock.send(new Msg("ping", this.id, {meta: this.meta})) | ||
} | ||
|
||
/** | ||
* Reset the election timer | ||
*/ | ||
reset_timer() { | ||
clearTimeout(this.timer) | ||
|
||
if (this.state != "leader") { | ||
// [150, 300) ms recommended timeouts from RAFT | ||
this.timer = setTimeout(() => this.on_timeout(), Math.random() * 150 + 150) | ||
} else { | ||
// 150 ms for leader heartbeat | ||
this.timer = setTimeout(() => this.on_timeout(), 150) | ||
} | ||
} | ||
|
||
/** | ||
* As a leader, send out a sync command to sync metadata about all the members. | ||
* @param {Object} meta metadata to by synced | ||
* @param {Array of uuid} receivers intended receivers | ||
*/ | ||
syncmeta(meta, receivers) { | ||
this.meta = meta | ||
this.sock.send(new Msg("sync", this.id, {meta: this.meta, receivers: receivers})) | ||
} | ||
|
||
/** | ||
* Election timeouts | ||
*/ | ||
on_timeout() { | ||
switch (this.state) { | ||
// if a follower times out, it becomes candidate | ||
// if there is a leader, the heartbeat should keep it within follower | ||
case "follower": | ||
this.state = "candidate" | ||
this.sock.send(new Msg("candidate", this.id, {meta: this.meta})) | ||
this.reset_timer() | ||
break | ||
|
||
// if a candidate times out, it becomes leader | ||
// if there is a leader, the hearbeat should force it to become a follower | ||
case "candidate": | ||
this.sock.send(new Msg("leader", this.id, {meta: this.meta})) | ||
this.state = "leader" | ||
this.reset_timer() | ||
break | ||
|
||
// if a leader times out, it heartbeats | ||
case "leader": | ||
this.sock.send(new Msg("leader", this.id, {meta: this.meta})) | ||
this.reset_timer() | ||
this.callback(this) | ||
break | ||
} | ||
} | ||
|
||
/** | ||
* On consensus messages or leader commands | ||
* @param {Msg} msg | ||
*/ | ||
on_message(msg) { | ||
|
||
switch (msg.label) { | ||
// ping message to update members list | ||
// reset timer if it comes from a smaller id | ||
case "ping": | ||
this.members[msg.sender] = msg.payload.meta | ||
if (msg.sender < this.id) { | ||
this.state = "follower" | ||
this.reset_timer() | ||
} | ||
break | ||
|
||
// if the sender is smaller, help it become leader | ||
// otherwise, promote myself to be the leader | ||
case "candidate": | ||
// if the candidate is smaller than me | ||
if (msg.sender < this.id) { | ||
this.state = "follower" | ||
} else { | ||
this.state = "candidate" | ||
this.sock.send(new Msg("candidate", this.id, {meta: this.meta})) | ||
} | ||
this.reset_timer() | ||
break | ||
|
||
// as long as I'm not a leader, I will be follower. | ||
// If I'm also a leader, we need to compare our id | ||
case "leader": | ||
if (this.state == "leader") { | ||
if (msg.sender < this.id) { | ||
// I should not be the leader | ||
this.state = "follower" | ||
} else { | ||
// I should, but let's re-elect | ||
this.state = "candidate" | ||
this.sock.send(new Msg("candidate", this.id, {meta: this.meta})) | ||
} | ||
} else { | ||
// just follow the leader | ||
this.state = "follower" | ||
} | ||
this.reset_timer() | ||
break | ||
|
||
// the leader sends out this command | ||
case "sync": | ||
if (this.state != "leader" && msg.payload.receivers.includes(this.id)) { | ||
this.meta = msg.payload.meta | ||
clearTimeout(this.timer) | ||
clearInterval(this.pinger) | ||
this.callback(this) | ||
} | ||
} | ||
} | ||
} | ||
|
||
module.exports = Consensus |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
"use strict" | ||
|
||
let nanomsg = require("nanomsg") | ||
let uuidv4 = require("uuid/v4") | ||
let Mailbox = require("./mailbox.js") | ||
let Socket = require("./socket.js") | ||
let Msg = require("./msg.js") | ||
let Consensus = require("./consensus.js") | ||
|
||
class Endpoint { | ||
/** | ||
* local states: | ||
* - id: endpoint id | ||
* - roles: self roles within the session | ||
* - peers: peer roles and peer endpoint id's | ||
* - mbox: message box for received msgs | ||
* - sock: pre-connected socket in "bus/broadcast" mode | ||
*/ | ||
|
||
|
||
/** | ||
* Construct an endpoint that wraps around a pre-connected | ||
* broadcasting socket. | ||
* @param {Socket} sock | ||
* @param {[int]} roles | ||
*/ | ||
constructor(sock, roles) { | ||
this.id = uuidv4(); | ||
this.roles = roles.sort(); | ||
this.peers = {} // a map where peers[role] = peer_uuid | ||
this.mbox = new Mailbox() | ||
this.sock = sock | ||
this.state = "init" // init | ready | active | ||
|
||
this.handler = | ||
msg => | ||
msg.sender in this.peers ? this.mbox.put(msg) : "" | ||
} | ||
|
||
|
||
async init(fullroles) { | ||
|
||
fullroles.sort() | ||
|
||
let req = new Promise((resolve, reject) => { | ||
let consensus = new Consensus(this.sock, this.id, this.roles, obj => { | ||
|
||
switch (obj.state) { | ||
|
||
// becomes a leader | ||
case "leader": | ||
// reset this.peers | ||
this.peers = {} | ||
|
||
// add peers, from the smallest id to the largest id | ||
let keys = Object.keys(obj.members) | ||
keys.sort() | ||
keys.forEach(id => { | ||
// non-overlapping roles | ||
if (obj.members[id].every(role => !(role in this.peers))) { | ||
obj.members[id].forEach(role => this.peers[role] = id) | ||
} | ||
}) | ||
|
||
// check for all the roles | ||
keys = Object.keys(this.peers) | ||
keys.sort() | ||
if (keys.length == fullroles.length && | ||
keys.reduce((acc, cur, idx) => acc && (cur == fullroles[idx]), true)) { | ||
|
||
// leader elected, stop the timer, replace message handler, and sync peers | ||
clearTimeout(obj.timer) | ||
clearInterval(obj.pinger) | ||
this.sock.onmessage = this.handler | ||
obj.syncmeta(this.peers, Object.values(this.peers)) | ||
resolve(obj) | ||
} else { | ||
obj.state = "candidate" | ||
} | ||
break | ||
|
||
// some leader is elected, and received "sync" message | ||
default: | ||
clearTimeout(obj.timer) | ||
clearInterval(obj.pinger) | ||
this.peers = obj.meta | ||
this.sock.onmessage = this.handler | ||
resolve(obj) | ||
} | ||
}) | ||
}) | ||
|
||
|
||
let consensus = await req | ||
} | ||
|
||
/** | ||
* Broadcast a payload to all | ||
* @param {anything} payload | ||
* @return {void} | ||
*/ | ||
broadcast(payload) { | ||
let msg = new Msg("send", this.id, payload) | ||
this.sock.send(msg) | ||
} | ||
|
||
/** | ||
* Receive a message from a particular sender. | ||
* @param {role} sender | ||
* @return {anything} | ||
*/ | ||
async receive(sender) { | ||
let promises = [this.mbox.match("send", this.peers[sender.toString()]), | ||
this.mbox.match("link", null)] | ||
let msg = await promises.race() | ||
|
||
switch (msg.label) { | ||
case "send": | ||
return msg | ||
case "link": | ||
this.peers = msg.payload | ||
return this.receive(sender) | ||
} | ||
} | ||
|
||
close() { | ||
this.sock.close() | ||
this.mbox = null | ||
} | ||
|
||
static async partialcut(ep1, ep2) { | ||
let intersection_roles = ep1.roles.filter(role => ep2.includes(role)) | ||
let direction = await ep1.mbox.isempty() // true: ep1 <- ep2, false: ep1 -> ep2 | ||
|
||
let ep = new Endpoint(this.sock, intersection_roles) | ||
ep.sock.onmessage = ep.handler | ||
|
||
let peers = {} | ||
intersection_roles.forEach(role => peers[role] = ep.id) | ||
|
||
Object.keys(ep1.peers).forEach(role => (!ep1.roles.includes(role)) ? peers[role] = ep1.peers[role] : "") | ||
Object.keys(ep2.peers).forEach(role => (!ep2.roles.includes(role)) ? peers[role] = ep2.peers[role] : "") | ||
|
||
ep1.sock.send(new Msg("link", ep1.id, peers)) | ||
ep2.sock.send(new Msg("link", ep2.id, peers)) | ||
|
||
if (direction) { | ||
while (!ep2.mbox.isempty()) { | ||
let msg = await ep2.mbox.get() | ||
ep1.sock.send(msg) | ||
} | ||
} else { | ||
while (!ep1.mbox.isempty()) { | ||
let msg = await ep1.mbox.get() | ||
ep2.sock.send(msg) | ||
} | ||
} | ||
|
||
return ep | ||
} | ||
|
||
} | ||
|
||
module.exports = Endpoint |
Oops, something went wrong.