From c0d8c5ab234d0d2bef0d0dec472973cc9662f647 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 12 Nov 2021 06:34:24 +0100 Subject: [PATCH] feat: add an implementation based on uWebSockets.js Usage: ```js const { App } = require("uWebSockets.js"); const { Server } = require("socket.io"); const app = new App(); const server = new Server(); server.attachApp(app); app.listen(3000); ``` The Adapter prototype is updated so we can benefit from the publish functionality of uWebSockets.js, so this will apply to all adapters extending the default adapter. Reference: https://github.com/uNetworking/uWebSockets.js Related: - https://github.com/socketio/socket.io/issues/3601 - https://github.com/socketio/engine.io/issues/578 --- lib/index.ts | 68 +++++++++++++++++ lib/socket.ts | 4 +- lib/uws.ts | 164 +++++++++++++++++++++++++++++++++++++++++ package-lock.json | 34 +++------ package.json | 5 +- test/socket.io.ts | 1 + test/uws.ts | 181 ++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 431 insertions(+), 26 deletions(-) create mode 100644 lib/uws.ts create mode 100644 test/uws.ts diff --git a/lib/index.ts b/lib/index.ts index 3fcd7edfb5..73d70f1cf8 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -9,6 +9,7 @@ import { Server as Engine, ServerOptions as EngineOptions, AttachOptions, + uServer, } from "engine.io"; import { Client } from "./client"; import { EventEmitter } from "events"; @@ -27,6 +28,7 @@ import { StrictEventEmitter, EventNames, } from "./typed-events"; +import { patchAdapter, restoreAdapter, serveFile } from "./uws.js"; const debug = debugModule("socket.io:server"); @@ -344,6 +346,69 @@ export class Server< return this; } + public attachApp(app /*: TemplatedApp */, opts: Partial = {}) { + // merge the options passed to the Socket.IO server + Object.assign(opts, this.opts); + // set engine.io path to `/socket.io` + opts.path = opts.path || this._path; + + // initialize engine + debug("creating uWebSockets.js-based engine with opts %j", opts); + const engine = new uServer(opts); + + engine.attach(app, opts); + + // bind to engine events + this.bind(engine); + + if (this._serveClient) { + // attach static file serving + app.get(`${this._path}/*`, (res, req) => { + if (!this.clientPathRegex.test(req.getUrl())) { + req.setYield(true); + return; + } + + const filename = req + .getUrl() + .replace(this._path, "") + .replace(/\?.*$/, "") + .replace(/^\//, ""); + const isMap = dotMapRegex.test(filename); + const type = isMap ? "map" : "source"; + + // Per the standard, ETags must be quoted: + // https://tools.ietf.org/html/rfc7232#section-2.3 + const expectedEtag = '"' + clientVersion + '"'; + const weakEtag = "W/" + expectedEtag; + + const etag = req.getHeader("if-none-match"); + if (etag) { + if (expectedEtag === etag || weakEtag === etag) { + debug("serve client %s 304", type); + res.writeStatus("304 Not Modified"); + res.end(); + return; + } + } + + debug("serve client %s", type); + + res.writeHeader("cache-control", "public, max-age=0"); + res.writeHeader( + "content-type", + "application/" + (isMap ? "json" : "javascript") + ); + res.writeHeader("etag", expectedEtag); + + const filepath = path.join(__dirname, "../client-dist/", filename); + serveFile(res, filepath); + }); + } + + patchAdapter(app); + } + /** * Initialize engine * @@ -562,6 +627,9 @@ export class Server< this.engine.close(); + // restore the Adapter prototype + restoreAdapter(); + if (this.httpServer) { this.httpServer.close(fn); } else { diff --git a/lib/socket.ts b/lib/socket.ts index 603c8652fb..5bfc0c23da 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -1,5 +1,4 @@ import { Packet, PacketType } from "socket.io-parser"; -import url = require("url"); import debugModule from "debug"; import type { Server } from "./index"; import { @@ -184,7 +183,8 @@ export class Socket< secure: !!this.request.connection.encrypted, issued: +new Date(), url: this.request.url!, - query: url.parse(this.request.url!, true).query, + // @ts-ignore + query: this.request._query, auth, }; } diff --git a/lib/uws.ts b/lib/uws.ts new file mode 100644 index 0000000000..7fdde31565 --- /dev/null +++ b/lib/uws.ts @@ -0,0 +1,164 @@ +import { Adapter, Room } from "socket.io-adapter"; +import type { WebSocket } from "uWebSockets.js"; +import type { Socket } from "./socket.js"; +import { createReadStream, statSync } from "fs"; +import debugModule from "debug"; + +const debug = debugModule("socket.io:adapter-uws"); + +const SEPARATOR = "\x1f"; // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text + +const { addAll, del, broadcast } = Adapter.prototype; + +export function patchAdapter(app /* : TemplatedApp */) { + Adapter.prototype.addAll = function (id, rooms) { + const isNew = !this.sids.has(id); + addAll.call(this, id, rooms); + const socket: Socket = this.nsp.sockets.get(id); + if (!socket) { + return; + } + if (socket.conn.transport.name === "websocket") { + subscribe(this.nsp.name, socket, isNew, rooms); + return; + } + if (isNew) { + socket.conn.on("upgrade", () => { + const rooms = this.sids.get(id); + subscribe(this.nsp.name, socket, isNew, rooms); + }); + } + }; + + Adapter.prototype.del = function (id, room) { + del.call(this, id, room); + const socket: Socket = this.nsp.sockets.get(id); + if (socket && socket.conn.transport.name === "websocket") { + // @ts-ignore + const sessionId = socket.conn.id; + // @ts-ignore + const websocket: WebSocket = socket.conn.transport.socket; + const topic = `${this.nsp.name}${SEPARATOR}${room}`; + debug("unsubscribe connection %s from topic %s", sessionId, topic); + websocket.unsubscribe(topic); + } + }; + + Adapter.prototype.broadcast = function (packet, opts) { + const useFastPublish = opts.rooms.size <= 1 && opts.except!.size === 0; + if (!useFastPublish) { + broadcast.call(this, packet, opts); + return; + } + + const flags = opts.flags || {}; + const basePacketOpts = { + preEncoded: true, + volatile: flags.volatile, + compress: flags.compress, + }; + + packet.nsp = this.nsp.name; + const encodedPackets = this.encoder.encode(packet); + + const topic = + opts.rooms.size === 0 + ? this.nsp.name + : `${this.nsp.name}${SEPARATOR}${opts.rooms.keys().next().value}`; + debug("fast publish to %s", topic); + + // fast publish for clients connected with WebSocket + encodedPackets.forEach((encodedPacket) => { + const isBinary = typeof encodedPacket !== "string"; + // "4" being the message type in the Engine.IO protocol, see https://github.com/socketio/engine.io-protocol + app.publish( + topic, + isBinary ? encodedPacket : "4" + encodedPacket, + isBinary + ); + }); + + this.apply(opts, (socket) => { + if (socket.conn.transport.name !== "websocket") { + // classic publish for clients connected with HTTP long-polling + for (let i = 0; i < encodedPackets.length; i++) { + socket.client.writeToEngine(encodedPackets[i], basePacketOpts); + } + } + }); + }; +} + +function subscribe( + namespaceName: string, + socket: Socket, + isNew: boolean, + rooms: Set +) { + // @ts-ignore + const sessionId = socket.conn.id; + // @ts-ignore + const websocket: WebSocket = socket.conn.transport.socket; + if (isNew) { + debug("subscribe connection %s to topic %s", sessionId, namespaceName); + websocket.subscribe(namespaceName); + } + rooms.forEach((room) => { + const topic = `${namespaceName}${SEPARATOR}${room}`; // '#' can be used as wildcard + debug("subscribe connection %s to topic %s", sessionId, topic); + websocket.subscribe(topic); + }); +} + +export function restoreAdapter() { + Adapter.prototype.addAll = addAll; + Adapter.prototype.del = del; + Adapter.prototype.broadcast = broadcast; +} + +const toArrayBuffer = (buffer: Buffer) => { + const { buffer: arrayBuffer, byteOffset, byteLength } = buffer; + return arrayBuffer.slice(byteOffset, byteOffset + byteLength); +}; + +// imported from https://github.com/kolodziejczak-sz/uwebsocket-serve +export function serveFile(res /* : HttpResponse */, filepath: string) { + const { size } = statSync(filepath); + const readStream = createReadStream(filepath); + const destroyReadStream = () => !readStream.destroyed && readStream.destroy(); + + const onError = (error: Error) => { + destroyReadStream(); + throw error; + }; + + const onDataChunk = (chunk: Buffer) => { + const arrayBufferChunk = toArrayBuffer(chunk); + + const lastOffset = res.getWriteOffset(); + const [ok, done] = res.tryEnd(arrayBufferChunk, size); + + if (!done && !ok) { + readStream.pause(); + + res.onWritable((offset) => { + const [ok, done] = res.tryEnd( + arrayBufferChunk.slice(offset - lastOffset), + size + ); + + if (!done && ok) { + readStream.resume(); + } + + return ok; + }); + } + }; + + res.onAborted(destroyReadStream); + readStream + .on("data", onDataChunk) + .on("error", onError) + .on("end", destroyReadStream); +} diff --git a/package-lock.json b/package-lock.json index 2a1092d91e..0724d4814b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -442,9 +442,9 @@ "dev": true }, "@types/node": { - "version": "16.7.6", - "resolved": "https://registry.npmjs.org/@types/node/-/node-16.7.6.tgz", - "integrity": "sha512-VESVNFoa/ahYA62xnLBjo5ur6gPsgEE5cNRy8SrdnkZ2nwJSW0kJ4ufbFr2zuU9ALtHM8juY53VcRoTA7htXSg==" + "version": "16.11.7", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.7.tgz", + "integrity": "sha512-QB5D2sqfSjCmTuWcBWyJ+/44bcjO7VbjSbOE0ucoVbAsSNQc4Lt6QkgkVXkTDwkL4z/beecZNDvVX15D4P8Jbw==" }, "@types/normalize-package-data": { "version": "2.4.0", @@ -888,9 +888,9 @@ "dev": true }, "engine.io": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.0.0.tgz", - "integrity": "sha512-Ui7yl3JajEIaACg8MOUwWvuuwU7jepZqX3BKs1ho7NQRuP4LhN4XIykXhp8bEy+x/DhA0LBZZXYSCkZDqrwMMg==", + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.1.0.tgz", + "integrity": "sha512-ErhZOVu2xweCjEfYcTdkCnEYUiZgkAcBBAhW4jbIvNG8SLU3orAqoJCiytZjYF7eTpVmmCrLDjLIEaPlUAs1uw==", "requires": { "@types/cookie": "^0.4.1", "@types/cors": "^2.8.12", @@ -904,19 +904,6 @@ "ws": "~8.2.3" }, "dependencies": { - "base64-arraybuffer": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-1.0.1.tgz", - "integrity": "sha512-vFIUq7FdLtjZMhATwDul5RZWv2jpXQ09Pd6jcVEOvIsqCWTRFD/ONHNfyOS8dA/Ippi5dsIgpyKWKZaAKZltbA==" - }, - "engine.io-parser": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.0.0.tgz", - "integrity": "sha512-wn6QavHEqXoM+cg+x8uUG7GhxLBCfKEKNEsCNc7V2ugj3gB3lK91l1MuZiy6xFB2V9D1eew0aWkmpiT/aBb/KA==", - "requires": { - "base64-arraybuffer": "~1.0.1" - } - }, "ws": { "version": "8.2.3", "resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz", @@ -959,7 +946,6 @@ "version": "5.0.1", "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.0.1.tgz", "integrity": "sha512-j4p3WwJrG2k92VISM0op7wiq60vO92MlF3CRGxhKHy9ywG1/Dkc72g0dXeDQ+//hrcDn8gqQzoEkdO9FN0d9AA==", - "dev": true, "requires": { "base64-arraybuffer": "~1.0.1" }, @@ -967,8 +953,7 @@ "base64-arraybuffer": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-1.0.1.tgz", - "integrity": "sha512-vFIUq7FdLtjZMhATwDul5RZWv2jpXQ09Pd6jcVEOvIsqCWTRFD/ONHNfyOS8dA/Ippi5dsIgpyKWKZaAKZltbA==", - "dev": true + "integrity": "sha512-vFIUq7FdLtjZMhATwDul5RZWv2jpXQ09Pd6jcVEOvIsqCWTRFD/ONHNfyOS8dA/Ippi5dsIgpyKWKZaAKZltbA==" } } }, @@ -2849,6 +2834,11 @@ "integrity": "sha512-gzP+t5W4hdy4c+68bfcv0t400HVJMMd2+H9B7gae1nQlBzCqvrXX+6GL/b3GAgyTH966pzrZ70/fRjwAtZksSQ==", "dev": true }, + "uWebSockets.js": { + "version": "github:uNetworking/uWebSockets.js#4558ee00f9f1f686fffe1accbfc2e85b1af9c50f", + "from": "github:uNetworking/uWebSockets.js#v20.0.0", + "dev": true + }, "util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", diff --git a/package.json b/package.json index 2a563be82a..376a111767 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ "accepts": "~1.3.4", "base64id": "~2.0.0", "debug": "~4.3.2", - "engine.io": "~6.0.0", + "engine.io": "~6.1.0", "socket.io-adapter": "~2.3.2", "socket.io-parser": "~4.0.4" }, @@ -65,7 +65,8 @@ "supertest": "^6.1.6", "ts-node": "^10.2.1", "tsd": "^0.17.0", - "typescript": "^4.4.2" + "typescript": "^4.4.2", + "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.0.0" }, "contributors": [ { diff --git a/test/socket.io.ts b/test/socket.io.ts index 63e7f1b830..28fffd789c 100644 --- a/test/socket.io.ts +++ b/test/socket.io.ts @@ -14,6 +14,7 @@ import { io as ioc, Socket as ClientSocket } from "socket.io-client"; import "./support/util"; import "./utility-methods"; +import "./uws"; type callback = (err: Error | null, success: boolean) => void; diff --git a/test/uws.ts b/test/uws.ts new file mode 100644 index 0000000000..7a7438a0aa --- /dev/null +++ b/test/uws.ts @@ -0,0 +1,181 @@ +import { App, us_socket_local_port } from "uWebSockets.js"; +import { Server } from ".."; +import { io as ioc, Socket as ClientSocket } from "socket.io-client"; +import request from "supertest"; +import expect from "expect.js"; + +const createPartialDone = (done: (err?: Error) => void, count: number) => { + let i = 0; + return () => { + if (++i === count) { + done(); + } else if (i > count) { + done(new Error(`partialDone() called too many times: ${i} > ${count}`)); + } + }; +}; + +const shouldNotHappen = (done) => () => done(new Error("should not happen")); + +describe("socket.io with uWebSocket.js-based engine", () => { + let io: Server, + port: number, + client: ClientSocket, + clientWSOnly: ClientSocket, + clientPollingOnly: ClientSocket, + clientCustomNamespace: ClientSocket; + + beforeEach((done) => { + const app = App(); + io = new Server(); + io.attachApp(app); + + io.of("/custom"); + + app.listen(0, (listenSocket) => { + port = us_socket_local_port(listenSocket); + + client = ioc(`http://localhost:${port}`); + clientWSOnly = ioc(`http://localhost:${port}`, { + transports: ["websocket"], + }); + clientPollingOnly = ioc(`http://localhost:${port}`, { + transports: ["polling"], + }); + clientCustomNamespace = ioc(`http://localhost:${port}/custom`); + }); + + const partialDone = createPartialDone(done, 4); + io.on("connection", partialDone); + io.of("/custom").on("connection", partialDone); + }); + + afterEach(() => { + io.close(); + client.disconnect(); + clientWSOnly.disconnect(); + clientPollingOnly.disconnect(); + clientCustomNamespace.disconnect(); + }); + + it("should broadcast", (done) => { + const partialDone = createPartialDone(done, 3); + + client.on("hello", partialDone); + clientWSOnly.on("hello", partialDone); + clientPollingOnly.on("hello", partialDone); + clientCustomNamespace.on("hello", shouldNotHappen(done)); + + io.emit("hello"); + }); + + it("should broadcast in a namespace", (done) => { + client.on("hello", shouldNotHappen(done)); + clientWSOnly.on("hello", shouldNotHappen(done)); + clientPollingOnly.on("hello", shouldNotHappen(done)); + clientCustomNamespace.on("hello", done); + + io.of("/custom").emit("hello"); + }); + + it("should broadcast in a dynamic namespace", (done) => { + const dynamicNamespace = io.of(/\/dynamic-\d+/); + const dynamicClient = clientWSOnly.io.socket("/dynamic-101"); + + dynamicClient.on("connect", () => { + dynamicNamespace.emit("hello"); + }); + + dynamicClient.on("hello", () => { + dynamicClient.disconnect(); + done(); + }); + }); + + it("should broadcast binary content", (done) => { + const partialDone = createPartialDone(done, 3); + + client.on("hello", partialDone); + clientWSOnly.on("hello", partialDone); + clientPollingOnly.on("hello", partialDone); + clientCustomNamespace.on("hello", shouldNotHappen(done)); + + io.emit("hello", Buffer.from([1, 2, 3])); + }); + + it("should broadcast in a room", (done) => { + const partialDone = createPartialDone(done, 2); + + client.on("hello", shouldNotHappen(done)); + clientWSOnly.on("hello", partialDone); + clientPollingOnly.on("hello", partialDone); + clientCustomNamespace.on("hello", shouldNotHappen(done)); + + io.of("/").sockets.get(clientWSOnly.id)!.join("room1"); + io.of("/").sockets.get(clientPollingOnly.id)!.join("room1"); + + io.to("room1").emit("hello"); + }); + + it("should broadcast in multiple rooms", (done) => { + const partialDone = createPartialDone(done, 2); + + client.on("hello", shouldNotHappen(done)); + clientWSOnly.on("hello", partialDone); + clientPollingOnly.on("hello", partialDone); + clientCustomNamespace.on("hello", shouldNotHappen(done)); + + io.of("/").sockets.get(clientWSOnly.id)!.join("room1"); + io.of("/").sockets.get(clientPollingOnly.id)!.join("room2"); + + io.to(["room1", "room2"]).emit("hello"); + }); + + it("should broadcast in all but a given room", (done) => { + const partialDone = createPartialDone(done, 2); + + client.on("hello", partialDone); + clientWSOnly.on("hello", partialDone); + clientPollingOnly.on("hello", shouldNotHappen(done)); + clientCustomNamespace.on("hello", shouldNotHappen(done)); + + io.of("/").sockets.get(clientWSOnly.id)!.join("room1"); + io.of("/").sockets.get(clientPollingOnly.id)!.join("room2"); + + io.except("room2").emit("hello"); + }); + + it("should work even after leaving room", (done) => { + const partialDone = createPartialDone(done, 2); + + client.on("hello", partialDone); + clientWSOnly.on("hello", shouldNotHappen(done)); + clientPollingOnly.on("hello", partialDone); + clientCustomNamespace.on("hello", shouldNotHappen(done)); + + io.of("/").sockets.get(client.id)!.join("room1"); + io.of("/").sockets.get(clientPollingOnly.id)!.join("room1"); + + io.of("/").sockets.get(clientWSOnly.id)!.join("room1"); + io.of("/").sockets.get(clientWSOnly.id)!.leave("room1"); + + io.to("room1").emit("hello"); + }); + + it("should serve static files", (done) => { + const clientVersion = require("socket.io-client/package.json").version; + + request(`http://localhost:${port}`) + .get("/socket.io/socket.io.js") + .buffer(true) + .end((err, res) => { + if (err) return done(err); + expect(res.headers["content-type"]).to.be("application/javascript"); + expect(res.headers.etag).to.be('"' + clientVersion + '"'); + expect(res.headers["x-sourcemap"]).to.be(undefined); + expect(res.text).to.match(/engine\.io/); + expect(res.status).to.be(200); + done(); + }); + }); +});