Skip to content

Commit

Permalink
feat: add multiple servers scaling, reconnect gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
bugwheels94 committed Apr 13, 2023
1 parent a04911b commit edecc1b
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 152 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"@types/ws": "^8.5.3",
"babel-plugin-const-enum": "^1.2.0",
"cross-env": "^7.0.3",
"cz-conventional-changelog": "^3.3.0",
"fast-glob": "^3.2.11",
"husky": "^7.0.4",
"lint-staged": "^12.3.7",
Expand Down
35 changes: 0 additions & 35 deletions src/StorageAdapter.ts

This file was deleted.

17 changes: 10 additions & 7 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import WebSocket from 'isomorphic-ws';
import HttpStatusCode from './statusCodes';
import { MessageStore } from './storageAdapter';
// import { MessageStore } from './messageStore';
import crypto from 'crypto';
export type ClientResponse = {
_id: number;
Expand Down Expand Up @@ -30,7 +30,7 @@ export type ClientPromiseStore = Record<
export class Socket {
id: string;
socket: WebSocket;
store?: MessageStore | undefined;
// store?: MessageStore | undefined;
groups: Set<string> = new Set();
lastMessageId = 0;
send(data: Uint8Array) {
Expand All @@ -43,9 +43,9 @@ export class Socket {
data[0] = id & 255;

socket = this.socket;
if (this.store) {
this.store.insert(this.id, [[this.lastMessageId + '-0', data]]);
}
// if (this.store) {
// this.store.insert(this.id, [[this.lastMessageId + '-0', data]]);
// }

// inject code for insert into reconnect queue here
// 1. Save locally for 5 seconds(configurable)
Expand All @@ -57,9 +57,12 @@ export class Socket {
setId(id: string) {
this.id = id;
}
constructor(socket: WebSocket, store?: MessageStore) {
constructor(
socket: WebSocket
// , store?: MessageStore
) {
this.socket = socket;
this.store = store;
// this.store = store;
this.id = crypto.randomUUID();
}
}
11 changes: 5 additions & 6 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ export class Client {
let socket: WebSocket, message: Uint8Array;
const { forget, ...remaining } = options;
let id: number | undefined;
if (!forget) {
this.id += 1;
id = this.id;
}
this.id += 1;
id = this.id;
message = createMessageForServer(url, method, id, remaining);
socket = this.socket;
console.log('Sent', { url, method, id, ...remaining });

if (socket.CONNECTING === socket.readyState) {
this.pendingMessageStore.push(message);
} else {
socket.send(message);
}
if (forget) return null;
return new Promise<ParsedServerMessage>((resolve, reject) => {
this.promiseStore[this.id] = { resolve, reject };
});
Expand Down Expand Up @@ -71,8 +69,9 @@ export class Client {
async listener(message: ParsedServerMessage) {
// Message is coming from client to router and execution should be skipped
if (message.respondingMessageId === undefined) return;
// @ts-ignore
window.ankit = this.promiseStore;
if (message.status < 300) {
console.log(Object.keys(this.promiseStore), message.respondingMessageId);
this.promiseStore[message.respondingMessageId].resolve(message);
} else if (message.status >= 300) {
this.promiseStore[message.respondingMessageId].reject(message);
Expand Down
19 changes: 10 additions & 9 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const onSocketCreated = (socket: WebSocket, restifySocket: RestifyWebSocket<X>)
socket.addEventListener('message', async ({ data }) => {
try {
const message = await parseServerMessage(data);
console.log('Received', message);
restifySocket.lastMessageId = message.messageId;
restifySocket.receiver.listener(message);
restifySocket.client.listener(message);
Expand Down Expand Up @@ -66,13 +65,18 @@ class RestifyWebSocket<T extends X> {
}
onWebsocketOpen(options: WebSocketPlusOptions) {
this.currentReconnectDelay = options.firstReconnectDelay;
console.log('ankit2', this.connectionId);
if (this.connectionId) {
this.client.meta('/connection', {
body: this.connectionId,
});
this.client
.meta('/connection', {
body: this.connectionId,
})
.then((res) => {
this.connectionId = res.data;
});
} else {
this.client.meta('/connection');
this.client.meta('/connection').then((res) => {
this.connectionId = res.data;
});
}
if (this.lastMessageId) this.socket;
}
Expand Down Expand Up @@ -117,9 +121,6 @@ class RestifyWebSocket<T extends X> {
constructor(urlOrSocket: T, options?: T extends string ? WebSocketPlusOptions : Options) {
this.client = new Client();
this.receiver = new Receiver();
this.receiver.meta('/connection', (_req, res) => {
this.connectionId = res.data;
});
let socket: WebSocket;
if (typeof urlOrSocket === 'string') {
this.url = urlOrSocket;
Expand Down
2 changes: 1 addition & 1 deletion src/client/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export type ReceiverResponse = {
export type ReceiverCallback<P extends object = object> = (
request: ReceiverRequest<P>,
response: ReceiverResponse
) => void;
) => Promise<void> | void;
export type ReceiverRoute = {
literalRoute: string;
match: MatchFunction<any>;
Expand Down
7 changes: 1 addition & 6 deletions src/client/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@ export async function parseServerMessage(data: WebSocket.Data) {
if (!(data instanceof Blob)) return null;
const ui8 = new Uint8Array(await data.arrayBuffer());
let index = 3;
console.log(decoder.decode(ui8), 'ok', ui8);
const messageId = ui8[0] * 255 * 255 + ui8[1] * 255 + ui8[2];
const method = ui8[index++];
const isRespondingToPreviousMessage = ui8[index++];
const status = ui8[index++] * 255 + ui8[index++];
const isHeaderPresent = ui8[index++];
console.log('Read header at', index);
let respondingMessageId: number | undefined;
if (isRespondingToPreviousMessage) {
respondingMessageId = ui8[index++] * 255 + ui8[index++];
Expand All @@ -53,12 +51,9 @@ export async function parseServerMessage(data: WebSocket.Data) {
index += urlLength;

let header: Record<string, string>;
console.log({
isHeaderPresent,
});

if (isHeaderPresent) {
const headerLength = ui8[index++] * 255 + ui8[index++];
console.log('parsing json', decoder.decode(ui8.subarray(index, index + headerLength)));

header = JSON.parse(decoder.decode(ui8.subarray(index, index + headerLength)));
index += headerLength;
Expand Down
46 changes: 46 additions & 0 deletions src/distributor/inMemory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import EventEmitter from 'events';
import { MessageDistributor } from '.';
const decoder = new TextDecoder();

export class InMemoryMessageDistributor implements MessageDistributor {
initialized?: boolean;
list: Map<string, Set<string>> = new Map();
keyStore: Map<string, string> = new Map();
eventEmitter = new EventEmitter();
constructor() {}
async initialize() {
this.initialized = true;
}

async addListItem(listId: string, item: string) {
if (this.list.has(listId)) this.list.get(listId).add(item);
else this.list.set(listId, new Set([item]));
}
async getListItems(listId: string) {
return this.list.get(listId) || [];
}
async removeListItem(listId: string, item: string) {
return this.list.get(listId)?.delete(item);
}

async set(key: string, value: string) {
return this.keyStore.set(key, value);
}
async get(key: string) {
return this.keyStore.get(key);
}
async enqueue(queueId: string, message: Uint8Array) {
this.eventEmitter.emit(queueId, message);
}
async listen(channel: string, callback: (_: string, _s: Uint8Array) => void) {
this.eventEmitter.on(channel, (message) => {
const finalMessage = new Uint8Array(message);
const groupLength = finalMessage[0];
const id = decoder.decode(finalMessage.subarray(1, 1 + groupLength));

const remaining = finalMessage.subarray(1 + groupLength, finalMessage.length);

callback(id, remaining);
});
}
}
12 changes: 12 additions & 0 deletions src/distributor/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export interface MessageDistributor {
initialize: (serverId: string) => Promise<void>;
listen: (queueId: string, callback: (receiverId: string, message: Uint8Array) => void) => void;
enqueue: (queueId: string, message: Uint8Array) => void;
addListItem: (listId: string, item: string) => Promise<any>;
getListItems: (listId: string) => Promise<Iterable<string>>;
set: (key: string, value: string) => Promise<any>;
get: (key: string) => Promise<string>;
}

export { InMemoryMessageDistributor } from './inMemory';
export { RedisMessageDistributor } from './redis';
50 changes: 8 additions & 42 deletions src/distributedStore.ts → src/distributor/redis.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,14 @@
import { commandOptions, RedisClientType, createClient } from 'redis';
import crypto from 'crypto';

export abstract class StorageClass<Client> {
client: Client;
serverId: string;
constructor(options: { client: Client }) {
this.client = options.client;
this.serverId = crypto.randomUUID();
}
addToMap(_mapName: string, _key: string, _value: string) {}
deleteFromMap(_mapName: string, _key: string) {}
addToGroup(_groupName: string) {
// Insert this server id to thes passed groupName in storage
// store websocket instance corresponsing to groupName[] in local memory
}
removeFromGroup(_groupName: string) {
// remove websocket instance corresponsing to groupName[] in local memory
// if groupName[] is empty them remove this server id from the passed groupName in storage
}
}

export interface DistributedStore {
initialize: (serverId: string) => Promise<void>;
listen: (queueId: string, callback: (receiverId: string, message: Uint8Array) => void) => void;
enqueue: (queueId: string, message: Uint8Array) => void;
addListItem: (listId: string, item: string) => Promise<any>;
getListItems: (listId: string) => Promise<string[]>;
set: (key: string, value: string) => Promise<any>;
get: (key: string) => Promise<string>;
}
import { MessageDistributor } from '.';

const decoder = new TextDecoder();
export class RedisStore implements DistributedStore {

export class RedisMessageDistributor implements MessageDistributor {
redisClient: RedisClientType;
private serverId: string;
initialized?: boolean;

constructor(private url: string) {}
async initialize(serverId: string) {
this.serverId = serverId;
async initialize() {
const client = createClient({
url: this.url,
});
Expand Down Expand Up @@ -69,10 +40,8 @@ export class RedisStore implements DistributedStore {
async enqueue(queueId: string, message: Uint8Array) {
const buffer = message.buffer;
const length = buffer.byteLength;
this.redisClient.lPush(commandOptions({ returnBuffers: true }), queueId, Buffer.from(buffer, 0, length));
}
async addIndividualToServer(connectionId: string) {
return this.redisClient.set(`i:${connectionId}`, this.serverId);

this.redisClient.rPush(commandOptions({ returnBuffers: true }), queueId, Buffer.from(buffer, 0, length));
}

async listen(channel: string, callback: (_: string, _s: Uint8Array) => void) {
Expand All @@ -81,21 +50,18 @@ export class RedisStore implements DistributedStore {
});
await redisClient.connect();
while (true) {
console.log('BLOCKING LIST', channel);
try {
const pp = redisClient.blPop(commandOptions({ returnBuffers: true }), channel, 0);
const { element: message } = await pp;
const finalMessage = new Uint8Array(message);
console.log('MNESSAGE FOUND', message, finalMessage);
const groupLength = finalMessage[0];
const id = decoder.decode(finalMessage.subarray(1, 1 + groupLength));

const remaining = finalMessage.subarray(1 + groupLength, finalMessage.length);
console.log('ITEM FOUND', { id, remaining });

callback(id, remaining);
} catch (e) {
console.log('OKOK', e);
console.log(e);
}
}
}
Expand Down
Loading

0 comments on commit edecc1b

Please sign in to comment.