diff --git a/package.json b/package.json index 3a8c3ec..5cf4441 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "@types/ws": "^8.5.3", "babel-plugin-const-enum": "^1.2.0", "cross-env": "^7.0.3", + "cz-conventional-changelog": "^3.3.0", "fast-glob": "^3.2.11", "husky": "^7.0.4", "lint-staged": "^12.3.7", diff --git a/src/StorageAdapter.ts b/src/StorageAdapter.ts deleted file mode 100644 index 9560bf9..0000000 --- a/src/StorageAdapter.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { RedisClientType, createClient } from 'redis'; - -const textDecoder = new TextDecoder(); -export interface MessageStore { - initialize: (serverId: string) => Promise; - insert: (key: string, messages: [string, Uint8Array][]) => void; - getMessagesAfterId: (messageId: string) => Uint8Array[]; -} -export class RedisMessageStore { - redisClient: RedisClientType; - constructor(private url?: string) {} - - async initialize() { - const client = this.url - ? createClient({ - url: this.url, - }) - : createClient(); - - client.on('error', (err) => console.log('Redis Client Error', err)); - - await client.connect(); - // @ts-ignore - this.redisClient = client; - } - insert(key: string, messages: [string, Uint8Array][]) { - messages.map(([messageId, message]) => { - return this.redisClient.xAdd(key, messageId, { '0': textDecoder.decode(message) }); - }); - } - async getMessagesAfterId(key: string, messageId: string) { - const data = await this.redisClient.xRange(key, messageId, '+'); - return data; - } -} diff --git a/src/client.ts b/src/client.ts index bf82108..28d1913 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,6 @@ import WebSocket from 'isomorphic-ws'; import HttpStatusCode from './statusCodes'; -import { MessageStore } from './storageAdapter'; +// import { MessageStore } from './messageStore'; import crypto from 'crypto'; export type ClientResponse = { _id: number; @@ -30,7 +30,7 @@ export type ClientPromiseStore = Record< export class Socket { id: string; socket: WebSocket; - store?: MessageStore | undefined; + // store?: MessageStore | undefined; groups: Set = new Set(); lastMessageId = 0; send(data: Uint8Array) { @@ -43,9 +43,9 @@ export class Socket { data[0] = id & 255; socket = this.socket; - if (this.store) { - this.store.insert(this.id, [[this.lastMessageId + '-0', data]]); - } + // if (this.store) { + // this.store.insert(this.id, [[this.lastMessageId + '-0', data]]); + // } // inject code for insert into reconnect queue here // 1. Save locally for 5 seconds(configurable) @@ -57,9 +57,12 @@ export class Socket { setId(id: string) { this.id = id; } - constructor(socket: WebSocket, store?: MessageStore) { + constructor( + socket: WebSocket + // , store?: MessageStore + ) { this.socket = socket; - this.store = store; + // this.store = store; this.id = crypto.randomUUID(); } } diff --git a/src/client/client.ts b/src/client/client.ts index 079160e..bdabf3d 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -26,19 +26,17 @@ export class Client { let socket: WebSocket, message: Uint8Array; const { forget, ...remaining } = options; let id: number | undefined; - if (!forget) { - this.id += 1; - id = this.id; - } + this.id += 1; + id = this.id; message = createMessageForServer(url, method, id, remaining); socket = this.socket; - console.log('Sent', { url, method, id, ...remaining }); if (socket.CONNECTING === socket.readyState) { this.pendingMessageStore.push(message); } else { socket.send(message); } + if (forget) return null; return new Promise((resolve, reject) => { this.promiseStore[this.id] = { resolve, reject }; }); @@ -71,8 +69,9 @@ export class Client { async listener(message: ParsedServerMessage) { // Message is coming from client to router and execution should be skipped if (message.respondingMessageId === undefined) return; + // @ts-ignore + window.ankit = this.promiseStore; if (message.status < 300) { - console.log(Object.keys(this.promiseStore), message.respondingMessageId); this.promiseStore[message.respondingMessageId].resolve(message); } else if (message.status >= 300) { this.promiseStore[message.respondingMessageId].reject(message); diff --git a/src/client/index.ts b/src/client/index.ts index 791231e..c76e044 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -18,7 +18,6 @@ const onSocketCreated = (socket: WebSocket, restifySocket: RestifyWebSocket) socket.addEventListener('message', async ({ data }) => { try { const message = await parseServerMessage(data); - console.log('Received', message); restifySocket.lastMessageId = message.messageId; restifySocket.receiver.listener(message); restifySocket.client.listener(message); @@ -66,13 +65,18 @@ class RestifyWebSocket { } onWebsocketOpen(options: WebSocketPlusOptions) { this.currentReconnectDelay = options.firstReconnectDelay; - console.log('ankit2', this.connectionId); if (this.connectionId) { - this.client.meta('/connection', { - body: this.connectionId, - }); + this.client + .meta('/connection', { + body: this.connectionId, + }) + .then((res) => { + this.connectionId = res.data; + }); } else { - this.client.meta('/connection'); + this.client.meta('/connection').then((res) => { + this.connectionId = res.data; + }); } if (this.lastMessageId) this.socket; } @@ -117,9 +121,6 @@ class RestifyWebSocket { constructor(urlOrSocket: T, options?: T extends string ? WebSocketPlusOptions : Options) { this.client = new Client(); this.receiver = new Receiver(); - this.receiver.meta('/connection', (_req, res) => { - this.connectionId = res.data; - }); let socket: WebSocket; if (typeof urlOrSocket === 'string') { this.url = urlOrSocket; diff --git a/src/client/receiver.ts b/src/client/receiver.ts index 4d54bb9..31c3793 100644 --- a/src/client/receiver.ts +++ b/src/client/receiver.ts @@ -16,7 +16,7 @@ export type ReceiverResponse = { export type ReceiverCallback

= ( request: ReceiverRequest

, response: ReceiverResponse -) => void; +) => Promise | void; export type ReceiverRoute = { literalRoute: string; match: MatchFunction; diff --git a/src/client/utils.ts b/src/client/utils.ts index 133147d..90b041e 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -37,13 +37,11 @@ export async function parseServerMessage(data: WebSocket.Data) { if (!(data instanceof Blob)) return null; const ui8 = new Uint8Array(await data.arrayBuffer()); let index = 3; - console.log(decoder.decode(ui8), 'ok', ui8); const messageId = ui8[0] * 255 * 255 + ui8[1] * 255 + ui8[2]; const method = ui8[index++]; const isRespondingToPreviousMessage = ui8[index++]; const status = ui8[index++] * 255 + ui8[index++]; const isHeaderPresent = ui8[index++]; - console.log('Read header at', index); let respondingMessageId: number | undefined; if (isRespondingToPreviousMessage) { respondingMessageId = ui8[index++] * 255 + ui8[index++]; @@ -53,12 +51,9 @@ export async function parseServerMessage(data: WebSocket.Data) { index += urlLength; let header: Record; - console.log({ - isHeaderPresent, - }); + if (isHeaderPresent) { const headerLength = ui8[index++] * 255 + ui8[index++]; - console.log('parsing json', decoder.decode(ui8.subarray(index, index + headerLength))); header = JSON.parse(decoder.decode(ui8.subarray(index, index + headerLength))); index += headerLength; diff --git a/src/distributor/inMemory.ts b/src/distributor/inMemory.ts new file mode 100644 index 0000000..c315ba2 --- /dev/null +++ b/src/distributor/inMemory.ts @@ -0,0 +1,46 @@ +import EventEmitter from 'events'; +import { MessageDistributor } from '.'; +const decoder = new TextDecoder(); + +export class InMemoryMessageDistributor implements MessageDistributor { + initialized?: boolean; + list: Map> = new Map(); + keyStore: Map = new Map(); + eventEmitter = new EventEmitter(); + constructor() {} + async initialize() { + this.initialized = true; + } + + async addListItem(listId: string, item: string) { + if (this.list.has(listId)) this.list.get(listId).add(item); + else this.list.set(listId, new Set([item])); + } + async getListItems(listId: string) { + return this.list.get(listId) || []; + } + async removeListItem(listId: string, item: string) { + return this.list.get(listId)?.delete(item); + } + + async set(key: string, value: string) { + return this.keyStore.set(key, value); + } + async get(key: string) { + return this.keyStore.get(key); + } + async enqueue(queueId: string, message: Uint8Array) { + this.eventEmitter.emit(queueId, message); + } + async listen(channel: string, callback: (_: string, _s: Uint8Array) => void) { + this.eventEmitter.on(channel, (message) => { + const finalMessage = new Uint8Array(message); + const groupLength = finalMessage[0]; + const id = decoder.decode(finalMessage.subarray(1, 1 + groupLength)); + + const remaining = finalMessage.subarray(1 + groupLength, finalMessage.length); + + callback(id, remaining); + }); + } +} diff --git a/src/distributor/index.ts b/src/distributor/index.ts new file mode 100644 index 0000000..a145d8f --- /dev/null +++ b/src/distributor/index.ts @@ -0,0 +1,12 @@ +export interface MessageDistributor { + initialize: (serverId: string) => Promise; + listen: (queueId: string, callback: (receiverId: string, message: Uint8Array) => void) => void; + enqueue: (queueId: string, message: Uint8Array) => void; + addListItem: (listId: string, item: string) => Promise; + getListItems: (listId: string) => Promise>; + set: (key: string, value: string) => Promise; + get: (key: string) => Promise; +} + +export { InMemoryMessageDistributor } from './inMemory'; +export { RedisMessageDistributor } from './redis'; diff --git a/src/distributedStore.ts b/src/distributor/redis.ts similarity index 50% rename from src/distributedStore.ts rename to src/distributor/redis.ts index 01511f7..ca17054 100644 --- a/src/distributedStore.ts +++ b/src/distributor/redis.ts @@ -1,43 +1,14 @@ import { commandOptions, RedisClientType, createClient } from 'redis'; -import crypto from 'crypto'; - -export abstract class StorageClass { - client: Client; - serverId: string; - constructor(options: { client: Client }) { - this.client = options.client; - this.serverId = crypto.randomUUID(); - } - addToMap(_mapName: string, _key: string, _value: string) {} - deleteFromMap(_mapName: string, _key: string) {} - addToGroup(_groupName: string) { - // Insert this server id to thes passed groupName in storage - // store websocket instance corresponsing to groupName[] in local memory - } - removeFromGroup(_groupName: string) { - // remove websocket instance corresponsing to groupName[] in local memory - // if groupName[] is empty them remove this server id from the passed groupName in storage - } -} - -export interface DistributedStore { - initialize: (serverId: string) => Promise; - listen: (queueId: string, callback: (receiverId: string, message: Uint8Array) => void) => void; - enqueue: (queueId: string, message: Uint8Array) => void; - addListItem: (listId: string, item: string) => Promise; - getListItems: (listId: string) => Promise; - set: (key: string, value: string) => Promise; - get: (key: string) => Promise; -} +import { MessageDistributor } from '.'; const decoder = new TextDecoder(); -export class RedisStore implements DistributedStore { + +export class RedisMessageDistributor implements MessageDistributor { redisClient: RedisClientType; - private serverId: string; initialized?: boolean; + constructor(private url: string) {} - async initialize(serverId: string) { - this.serverId = serverId; + async initialize() { const client = createClient({ url: this.url, }); @@ -69,10 +40,8 @@ export class RedisStore implements DistributedStore { async enqueue(queueId: string, message: Uint8Array) { const buffer = message.buffer; const length = buffer.byteLength; - this.redisClient.lPush(commandOptions({ returnBuffers: true }), queueId, Buffer.from(buffer, 0, length)); - } - async addIndividualToServer(connectionId: string) { - return this.redisClient.set(`i:${connectionId}`, this.serverId); + + this.redisClient.rPush(commandOptions({ returnBuffers: true }), queueId, Buffer.from(buffer, 0, length)); } async listen(channel: string, callback: (_: string, _s: Uint8Array) => void) { @@ -81,21 +50,18 @@ export class RedisStore implements DistributedStore { }); await redisClient.connect(); while (true) { - console.log('BLOCKING LIST', channel); try { const pp = redisClient.blPop(commandOptions({ returnBuffers: true }), channel, 0); const { element: message } = await pp; const finalMessage = new Uint8Array(message); - console.log('MNESSAGE FOUND', message, finalMessage); const groupLength = finalMessage[0]; const id = decoder.decode(finalMessage.subarray(1, 1 + groupLength)); const remaining = finalMessage.subarray(1 + groupLength, finalMessage.length); - console.log('ITEM FOUND', { id, remaining }); callback(id, remaining); } catch (e) { - console.log('OKOK', e); + console.log(e); } } } diff --git a/src/index.ts b/src/index.ts index 55f140f..dfcc3c6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,8 +3,8 @@ import { Socket } from './client'; import { Router } from './router'; import { ServerOptions } from 'ws'; import crypto from 'crypto'; -import { DistributedStore, RedisStore } from './distributedStore'; -import { MessageStore } from './storageAdapter'; +import { MessageDistributor, RedisMessageDistributor, InMemoryMessageDistributor } from './distributor'; +// import { MessageStore } from './messageStore'; import { MethodEnum, parseBrowserMessage } from './utils'; import EventEmitter from 'events'; type RestifyServerEvents = 'connection' | 'close'; @@ -38,7 +38,6 @@ const onServerSocketCreated = (socket: Socket, router: Router) => { } else { socket.setId(crypto.randomUUID()); onServerSocketInitialized(socket, router); - console.log('wow', parsedData); router.listener(parsedData, socket); } } @@ -70,25 +69,23 @@ export class RestifyWebSocketServer extends EventEmitter { }; constructor( options: ServerOptions & { - distributedStore?: DistributedStore; - messageStore?: MessageStore; + distributor?: MessageDistributor; + // messageStore?: MessageStore; } ) { super(); - const { distributedStore } = options; + const { distributor } = options; this.serverId = crypto.randomUUID(); Promise.all([ - options.distributedStore ? options.distributedStore.initialize(this.serverId) : undefined, - options.messageStore ? options.messageStore.initialize(this.serverId) : undefined, + options.distributor ? options.distributor.initialize(this.serverId) : undefined, + // options.messageStore ? options.messageStore.initialize(this.serverId) : undefined, ]) .then(() => { - console.log('Stores Initialized!'); this.rawWebSocketServer = new WebSocket.Server(options); - this.router = new Router(this.serverId, distributedStore); + this.router = new Router(this.serverId, distributor); this.router.meta('/connection', async (req, res) => { - console.log('hahah'); if (!req.body) { // @ts-ignore return res.send(res.socket.id, { @@ -99,11 +96,9 @@ export class RestifyWebSocketServer extends EventEmitter { // @ts-ignore res.socket.setId(req.body); }); - console.log('emitting ready!'); this.emit('ready'); this.on('connection', (rawSocket) => { - console.log('New Connection!'); - const socket = new Socket(rawSocket, options.messageStore); + const socket = new Socket(rawSocket); onServerSocketCreated(socket, this.router); const connectionEvents = this.eventStore['connection'] || []; connectionEvents.forEach(({ listener }) => { @@ -133,5 +128,4 @@ export class RestifyWebSocketServer extends EventEmitter { router: Router; } export type { RouterRequest, Router, RouterResponse } from './router'; -export { RedisMessageStore } from './storageAdapter'; -export { RedisStore }; +export { RedisMessageDistributor, InMemoryMessageDistributor }; diff --git a/src/router.ts b/src/router.ts index 03012a2..e95701c 100644 --- a/src/router.ts +++ b/src/router.ts @@ -3,7 +3,7 @@ import HttpStatusCode from './statusCodes'; import { ApiError, MethodEnum, parseBrowserMessage, createMessageForBrowser, Method } from './utils'; import { match, MatchFunction, MatchResult } from 'path-to-regexp'; import { Socket } from './client'; -import { DistributedStore } from './distributedStore'; +import { MessageDistributor } from './distributor'; import { SocketGroupStore, IndividualSocketConnectionStore } from './localStores'; export type RouterStore = Record; @@ -14,12 +14,13 @@ type SendMessageFromServerOptions = { status?: HttpStatusCode; url?: string; }; -const temp = { +const temp: Record = { get: MethodEnum.GET, post: MethodEnum.POST, put: MethodEnum.PUT, patch: MethodEnum.PATCH, delete: MethodEnum.DELETE, + meta: MethodEnum.META, }; export function createResponse( type: 'self' | 'group' | 'individual', @@ -68,7 +69,6 @@ export function createResponse( data ); if (type === 'group' && typeof instance === 'string') { - console.log('pushing to redis', message, options); router.sendToGroup(instance, finalMessage); } else if (type === 'individual' && typeof instance === 'string') router.sendToGroup(instance, finalMessage); else if (type === 'self' && instance instanceof Socket) instance.send(finalMessage); @@ -79,7 +79,7 @@ export type RouterResponse = ReturnType; export type RouterCallback

= ( request: RouterRequest

, response: ReturnType -) => Promise; +) => Promise | void; export type Route = { literalRoute: string; match: MatchFunction; @@ -87,7 +87,6 @@ export type Route = { }; type Params = Record; -const decoder = new TextDecoder(); const encoder = new TextEncoder(); export class Router { @@ -99,7 +98,7 @@ export class Router { [MethodEnum.DELETE]: [], [MethodEnum.META]: [], }; - constructor(private serverId: string, private store?: DistributedStore) { + constructor(private serverId: string, private store?: MessageDistributor) { this.individualSocketConnectionStore = new IndividualSocketConnectionStore(); this.socketGroupStore = new SocketGroupStore(); this.listenToIndividualQueue(`i:${this.serverId}`); @@ -118,18 +117,13 @@ export class Router { async listenToGroupQueue(queueName: string) { // `g:${serverId}` if (!this.store) return; - console.log('Listening to Queue', queueName); this.store.listen(queueName, (groupId: string, message: Uint8Array) => { - console.log('New message received on queue', this.serverId); this.socketGroupStore.find(groupId)?.forEach((socket) => { - console.log('Popped Redis', message); socket.send(message); }); }); } async sendToGroup(id: string, message: Uint8Array) { - console.log('a', this.socketGroupStore, id); - // this.socketGroupStore.find(id)?.forEach((socket) => { // socket.send(message); // }); @@ -141,27 +135,24 @@ export class Router { messageWithGroupId[0] = groupArray.length; messageWithGroupId.set(groupArray, 1); messageWithGroupId.set(message, 1 + groupArray.length); - console.log('Found servers for publishing to groups', servers); - console.log('pushing in array format', messageWithGroupId); - for (let i = 0; i < servers.length; i++) { - const server = servers[i]; + for (let server of servers) { this.store.enqueue(`server-messages:${server}`, messageWithGroupId); // send to the server oin group channel } } async joinGroup(id: string, socket: Socket) { this.socketGroupStore.add(socket, id); - socket.groups.add(id); // remove it and use store methods only so in absence of distributedstore use localstore or something + socket.groups.add(id); // remove it and use store methods only so in absence of MessageDistributor use localstore or something if (!this.store) return undefined; return Promise.all([ this.store.addListItem(`my-groups:${socket.id}`, id), this.store.addListItem(`group-servers:${id}`, this.serverId), ]); } - async joinGroups(socket: Socket, groupdIds: string[]) { - groupdIds.forEach((groupId) => { + async joinGroups(socket: Socket, groupdIds: Iterable) { + for (let groupId of groupdIds) { this.socketGroupStore.add(socket, groupId); if (this.store) this.store.addListItem(`group-servers:${groupId}`, this.serverId); - }); + } } async getGroups(connectionId: string) { return this.store.getListItems(`my-groups:${connectionId}`); @@ -183,7 +174,6 @@ export class Router { this.store.enqueue(`i:${server}`, messageWithGroupId); } registerRoute(method: MethodEnum, url: string, ...callbacks: RouterCallback[]) { - console.log('pushing'); this.requestStore[method].push({ literalRoute: url, match: match(url, { decode: decodeURIComponent }), @@ -206,7 +196,6 @@ export class Router { this.registerRoute(MethodEnum.DELETE, url, ...callbacks); } meta

(url: string, ...callbacks: RouterCallback

[]) { - console.log('registering'); this.registerRoute(MethodEnum.META, url, ...callbacks); } async listener(message: ReturnType, mySocket: Socket) { @@ -232,7 +221,6 @@ export class Router { await store[i].callbacks[j]({ ...message, ...matched }, response); } } catch (error) { - console.log(error); if (error instanceof ApiError) { response.status(error.status); response.send(error.message); diff --git a/src/utils.ts b/src/utils.ts index 748fdc0..e6cde1f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -70,14 +70,7 @@ export function parseBrowserMessage(data: WebSocket.Data) { if (dataType === DataType.TEXT) message = decoder.decode(rawData); if (dataType === DataType.BINARY) message = rawData; } catch (e) { - const str = decoder.decode(rawData); - console.log( - 'WOWOW', - `<>${str}<>`, - { method, isHeaderPresent, isIdPresent, url, urlLength }, - { 16: str[16], 17: str[17], 18: str[18], 19: str[19], 20: str[20], 21: str[21], 22: str[22] }, - e - ); + console.log(e); } return { requestId,