From 55ce82914bca151ba3ccfd7312a1ea3a0444186a Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Sat, 13 May 2023 00:09:13 +0200 Subject: [PATCH] test: run tests against a Redis cluster --- .github/workflows/ci.yml | 10 + docker-compose.yml | 5 + package.json | 7 +- test/custom-parser.ts | 31 ++- test/index.ts | 496 +++++++++++++++++++-------------------- test/specifics.ts | 381 +++++++++++++++--------------- test/test-runner.ts | 201 ++++++++++++++++ test/util.ts | 46 +--- 8 files changed, 687 insertions(+), 490 deletions(-) create mode 100644 test/test-runner.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4470170..39f6809 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,6 +30,16 @@ jobs: ports: - 6379:6379 + redis-cluster: + image: grokzen/redis-cluster:7.0.10 + options: >- + --health-cmd "redis-cli -p 7005 ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - "7000-7005:7000-7005" + steps: - name: Checkout repository uses: actions/checkout@v3 diff --git a/docker-compose.yml b/docker-compose.yml index c5c85d2..f138f0f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,3 +3,8 @@ services: image: redis:7 ports: - "6379:6379" + + redis-cluster: + image: grokzen/redis-cluster:7.0.10 + ports: + - "7000-7005:7000-7005" diff --git a/package.json b/package.json index 58875e4..e2e3249 100644 --- a/package.json +++ b/package.json @@ -13,12 +13,7 @@ "main": "./dist/index.js", "types": "./dist/index.d.ts", "scripts": { - "test": "npm run format:check && tsc && npm run test:default && npm run test:redis-v4-specific-channel && npm run test:redis-v3 && npm run test:ioredis && npm run test:sharded", - "test:default": "nyc mocha --bail --require ts-node/register test/*.ts", - "test:redis-v4-specific-channel": "SPECIFIC_CHANNEL=1 npm run test:default", - "test:redis-v3": "REDIS_CLIENT=redis-v3 npm run test:default", - "test:ioredis": "REDIS_CLIENT=ioredis npm run test:default", - "test:sharded": "SHARDED=1 npm run test:default", + "test": "npm run format:check && tsc && nyc mocha --bail --require ts-node/register test/test-runner.ts", "format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'", "format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'", "prepack": "tsc" diff --git a/test/custom-parser.ts b/test/custom-parser.ts index ad47851..8e022c0 100644 --- a/test/custom-parser.ts +++ b/test/custom-parser.ts @@ -2,6 +2,8 @@ import type { Server } from "socket.io"; import type { Socket as ClientSocket } from "socket.io-client"; import { setup, times } from "./util"; import expect = require("expect.js"); +import { createClient } from "redis"; +import { createAdapter } from "../lib"; describe("custom parser", () => { let servers: Server[]; @@ -9,15 +11,28 @@ describe("custom parser", () => { let cleanup: () => void; beforeEach(async () => { - const testContext = await setup({ - parser: { - decode(msg) { - return JSON.parse(msg); - }, - encode(msg) { - return JSON.stringify(msg); + const testContext = await setup(async () => { + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createAdapter(pubClient, subClient, { + parser: { + decode(msg) { + return JSON.parse(msg); + }, + encode(msg) { + return JSON.stringify(msg); + }, + }, + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); }, - }, + ]; }); servers = testContext.servers; clientSockets = testContext.clientSockets; diff --git a/test/index.ts b/test/index.ts index 33d11d3..fb27bcd 100644 --- a/test/index.ts +++ b/test/index.ts @@ -10,343 +10,343 @@ import { times, sleep, shouldNotHappen, setup } from "./util"; * @see https://github.com/socketio/socket.io-mongo-adapter * @see https://github.com/socketio/socket.io-postgres-adapter */ -describe("@socket.io/redis-adapter", () => { - let servers: Server[]; - let serverSockets: ServerSocket[]; - let clientSockets: ClientSocket[]; - let cleanup: () => void; - - beforeEach(async () => { - const testContext = await setup({ - requestsTimeout: 1000, +export function testSuite(createAdapter: any) { + describe("common", () => { + let servers: Server[]; + let serverSockets: ServerSocket[]; + let clientSockets: ClientSocket[]; + let cleanup: () => void; + + beforeEach(async () => { + const testContext = await setup(createAdapter); + servers = testContext.servers; + serverSockets = testContext.serverSockets; + clientSockets = testContext.clientSockets; + cleanup = testContext.cleanup; }); - servers = testContext.servers; - serverSockets = testContext.serverSockets; - clientSockets = testContext.clientSockets; - cleanup = testContext.cleanup; - }); - afterEach(() => cleanup()); + afterEach(() => cleanup()); - describe("broadcast", function () { - it("broadcasts to all clients", (done) => { - const partialDone = times(3, done); + describe("broadcast", function () { + it("broadcasts to all clients", (done) => { + const partialDone = times(3, done); - clientSockets.forEach((clientSocket) => { - clientSocket.on("test", (arg1, arg2, arg3) => { - expect(arg1).to.eql(1); - expect(arg2).to.eql("2"); - expect(Buffer.isBuffer(arg3)).to.be(true); - partialDone(); + clientSockets.forEach((clientSocket) => { + clientSocket.on("test", (arg1, arg2, arg3) => { + expect(arg1).to.eql(1); + expect(arg2).to.eql("2"); + expect(Buffer.isBuffer(arg3)).to.be(true); + partialDone(); + }); }); - }); - - servers[0].emit("test", 1, "2", Buffer.from([3, 4])); - }); - it("broadcasts to all clients in a namespace", (done) => { - const partialDone = times(3, () => { - servers.forEach((server) => server.of("/custom").adapter.close()); - done(); + servers[0].emit("test", 1, "2", Buffer.from([3, 4])); }); - servers.forEach((server) => server.of("/custom")); + it("broadcasts to all clients in a namespace", (done) => { + const partialDone = times(3, () => { + servers.forEach((server) => server.of("/custom").adapter.close()); + done(); + }); - const onConnect = times(3, async () => { - await sleep(200); + servers.forEach((server) => server.of("/custom")); - servers[0].of("/custom").emit("test"); - }); + const onConnect = times(3, async () => { + await sleep(200); - clientSockets.forEach((clientSocket) => { - const socket = clientSocket.io.socket("/custom"); - socket.on("connect", onConnect); - socket.on("test", () => { - socket.disconnect(); - partialDone(); + servers[0].of("/custom").emit("test"); }); - }); - }); - - it("broadcasts to all clients in a room", (done) => { - serverSockets[1].join("room1"); - clientSockets[0].on("test", shouldNotHappen(done)); - clientSockets[1].on("test", () => done()); - clientSockets[2].on("test", shouldNotHappen(done)); + clientSockets.forEach((clientSocket) => { + const socket = clientSocket.io.socket("/custom"); + socket.on("connect", onConnect); + socket.on("test", () => { + socket.disconnect(); + partialDone(); + }); + }); + }); - // delay is needed for the sharded adapter in dynamic mode - setTimeout(() => servers[0].to("room1").emit("test"), 50); - }); + it("broadcasts to all clients in a room", (done) => { + serverSockets[1].join("room1"); - it("broadcasts to all clients except in room", (done) => { - const partialDone = times(2, done); - serverSockets[1].join("room1"); + clientSockets[0].on("test", shouldNotHappen(done)); + clientSockets[1].on("test", () => done()); + clientSockets[2].on("test", shouldNotHappen(done)); - clientSockets[0].on("test", () => partialDone()); - clientSockets[1].on("test", shouldNotHappen(done)); - clientSockets[2].on("test", () => partialDone()); + // delay is needed for the sharded adapter in dynamic mode + setTimeout(() => servers[0].to("room1").emit("test"), 50); + }); - servers[0].of("/").except("room1").emit("test"); - }); + it("broadcasts to all clients except in room", (done) => { + const partialDone = times(2, done); + serverSockets[1].join("room1"); - it("broadcasts to all clients once", (done) => { - const partialDone = times(2, done); - serverSockets[0].join(["room1", "room2"]); - serverSockets[1].join(["room1", "room2", "room3"]); - serverSockets[2].join("room1"); + clientSockets[0].on("test", () => partialDone()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", () => partialDone()); - clientSockets[0].on("test", () => partialDone()); - clientSockets[1].on("test", shouldNotHappen(done)); - clientSockets[2].on("test", () => partialDone()); + servers[0].of("/").except("room1").emit("test"); + }); - servers[0].of("/").to("room1").to("room2").except("room3").emit("test"); - }); + it("broadcasts to all clients once", (done) => { + const partialDone = times(2, done); + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join(["room1", "room2", "room3"]); + serverSockets[2].join("room1"); - it("broadcasts to local clients only", (done) => { - clientSockets[0].on("test", () => done()); - clientSockets[1].on("test", shouldNotHappen(done)); - clientSockets[2].on("test", shouldNotHappen(done)); + clientSockets[0].on("test", () => partialDone()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", () => partialDone()); - servers[0].local.emit("test"); - }); + servers[0].of("/").to("room1").to("room2").except("room3").emit("test"); + }); - it("broadcasts with multiple acknowledgements", (done) => { - clientSockets[0].on("test", (cb) => cb(1)); - clientSockets[1].on("test", (cb) => cb(2)); - clientSockets[2].on("test", (cb) => cb(3)); + it("broadcasts to local clients only", (done) => { + clientSockets[0].on("test", () => done()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", shouldNotHappen(done)); - servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { - expect(err).to.be(null); - expect(responses).to.contain(1); - expect(responses).to.contain(2); - expect(responses).to.contain(3); + servers[0].local.emit("test"); + }); - setTimeout(() => { - // @ts-ignore - expect(servers[0].of("/").adapter.ackRequests.size).to.eql(0); + it("broadcasts with multiple acknowledgements", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (cb) => cb(3)); - done(); - }, 500); - }); - }); + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + expect(responses).to.contain(3); - it("broadcasts with multiple acknowledgements (binary content)", (done) => { - clientSockets[0].on("test", (cb) => cb(Buffer.from([1]))); - clientSockets[1].on("test", (cb) => cb(Buffer.from([2]))); - clientSockets[2].on("test", (cb) => cb(Buffer.from([3]))); + setTimeout(() => { + // @ts-ignore + expect(servers[0].of("/").adapter.ackRequests.size).to.eql(0); - servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { - expect(err).to.be(null); - responses.forEach((response) => { - expect(Buffer.isBuffer(response)).to.be(true); + done(); + }, 500); }); - - done(); }); - }); - it("broadcasts with multiple acknowledgements (no client)", (done) => { - servers[0] - .to("abc") - .timeout(500) - .emit("test", (err: Error, responses: any[]) => { + it("broadcasts with multiple acknowledgements (binary content)", (done) => { + clientSockets[0].on("test", (cb) => cb(Buffer.from([1]))); + clientSockets[1].on("test", (cb) => cb(Buffer.from([2]))); + clientSockets[2].on("test", (cb) => cb(Buffer.from([3]))); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { expect(err).to.be(null); - expect(responses).to.eql([]); + responses.forEach((response) => { + expect(Buffer.isBuffer(response)).to.be(true); + }); done(); }); - }); + }); + + it("broadcasts with multiple acknowledgements (no client)", (done) => { + servers[0] + .to("abc") + .timeout(500) + .emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.eql([]); - it("broadcasts with multiple acknowledgements (timeout)", (done) => { - clientSockets[0].on("test", (cb) => cb(1)); - clientSockets[1].on("test", (cb) => cb(2)); - clientSockets[2].on("test", (_cb) => { - // do nothing + done(); + }); }); - servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { - expect(err).to.be.an(Error); - expect(responses).to.contain(1); - expect(responses).to.contain(2); + it("broadcasts with multiple acknowledgements (timeout)", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (_cb) => { + // do nothing + }); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be.an(Error); + expect(responses).to.contain(1); + expect(responses).to.contain(2); - done(); + done(); + }); }); }); - }); - describe("socketsJoin", () => { - it("makes all socket instances join the specified room", async () => { - servers[0].socketsJoin("room1"); + describe("socketsJoin", () => { + it("makes all socket instances join the specified room", async () => { + servers[0].socketsJoin("room1"); - await sleep(200); + await sleep(200); - expect(serverSockets[0].rooms.has("room1")).to.be(true); - expect(serverSockets[1].rooms.has("room1")).to.be(true); - expect(serverSockets[2].rooms.has("room1")).to.be(true); - }); + expect(serverSockets[0].rooms.has("room1")).to.be(true); + expect(serverSockets[1].rooms.has("room1")).to.be(true); + expect(serverSockets[2].rooms.has("room1")).to.be(true); + }); - it("makes the matching socket instances join the specified room", async () => { - serverSockets[0].join("room1"); - serverSockets[2].join("room1"); + it("makes the matching socket instances join the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); - servers[0].in("room1").socketsJoin("room2"); + servers[0].in("room1").socketsJoin("room2"); - await sleep(200); + await sleep(200); - expect(serverSockets[0].rooms.has("room2")).to.be(true); - expect(serverSockets[1].rooms.has("room2")).to.be(false); - expect(serverSockets[2].rooms.has("room2")).to.be(true); - }); + expect(serverSockets[0].rooms.has("room2")).to.be(true); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); - it("makes the given socket instance join the specified room", async () => { - servers[0].in(serverSockets[1].id).socketsJoin("room3"); + it("makes the given socket instance join the specified room", async () => { + servers[0].in(serverSockets[1].id).socketsJoin("room3"); - await sleep(200); + await sleep(200); - expect(serverSockets[0].rooms.has("room3")).to.be(false); - expect(serverSockets[1].rooms.has("room3")).to.be(true); - expect(serverSockets[2].rooms.has("room3")).to.be(false); + expect(serverSockets[0].rooms.has("room3")).to.be(false); + expect(serverSockets[1].rooms.has("room3")).to.be(true); + expect(serverSockets[2].rooms.has("room3")).to.be(false); + }); }); - }); - describe("socketsLeave", () => { - it("makes all socket instances leave the specified room", async () => { - serverSockets[0].join("room1"); - serverSockets[2].join("room1"); + describe("socketsLeave", () => { + it("makes all socket instances leave the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); - servers[0].socketsLeave("room1"); + servers[0].socketsLeave("room1"); - await sleep(200); + await sleep(200); - expect(serverSockets[0].rooms.has("room1")).to.be(false); - expect(serverSockets[1].rooms.has("room1")).to.be(false); - expect(serverSockets[2].rooms.has("room1")).to.be(false); - }); + expect(serverSockets[0].rooms.has("room1")).to.be(false); + expect(serverSockets[1].rooms.has("room1")).to.be(false); + expect(serverSockets[2].rooms.has("room1")).to.be(false); + }); - it("makes the matching socket instances leave the specified room", async () => { - serverSockets[0].join(["room1", "room2"]); - serverSockets[1].join(["room1", "room2"]); - serverSockets[2].join(["room2"]); + it("makes the matching socket instances leave the specified room", async () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join(["room1", "room2"]); + serverSockets[2].join(["room2"]); - servers[0].in("room1").socketsLeave("room2"); + servers[0].in("room1").socketsLeave("room2"); - await sleep(200); + await sleep(200); - expect(serverSockets[0].rooms.has("room2")).to.be(false); - expect(serverSockets[1].rooms.has("room2")).to.be(false); - expect(serverSockets[2].rooms.has("room2")).to.be(true); - }); + expect(serverSockets[0].rooms.has("room2")).to.be(false); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); - it("makes the given socket instance leave the specified room", async () => { - serverSockets[0].join("room3"); - serverSockets[1].join("room3"); - serverSockets[2].join("room3"); + it("makes the given socket instance leave the specified room", async () => { + serverSockets[0].join("room3"); + serverSockets[1].join("room3"); + serverSockets[2].join("room3"); - servers[0].in(serverSockets[1].id).socketsLeave("room3"); + servers[0].in(serverSockets[1].id).socketsLeave("room3"); - await sleep(200); + await sleep(200); - expect(serverSockets[0].rooms.has("room3")).to.be(true); - expect(serverSockets[1].rooms.has("room3")).to.be(false); - expect(serverSockets[2].rooms.has("room3")).to.be(true); + expect(serverSockets[0].rooms.has("room3")).to.be(true); + expect(serverSockets[1].rooms.has("room3")).to.be(false); + expect(serverSockets[2].rooms.has("room3")).to.be(true); + }); }); - }); - describe("disconnectSockets", () => { - it("makes all socket instances disconnect", (done) => { - const partialDone = times(3, done); + describe("disconnectSockets", () => { + it("makes all socket instances disconnect", (done) => { + const partialDone = times(3, done); - clientSockets.forEach((clientSocket) => { - clientSocket.on("disconnect", (reason) => { - expect(reason).to.eql("io server disconnect"); - partialDone(); + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", (reason) => { + expect(reason).to.eql("io server disconnect"); + partialDone(); + }); }); - }); - servers[0].disconnectSockets(); + servers[0].disconnectSockets(); + }); }); - }); - describe("fetchSockets", () => { - it("returns all socket instances", async () => { - const sockets = await servers[0].fetchSockets(); + describe("fetchSockets", () => { + it("returns all socket instances", async () => { + const sockets = await servers[0].fetchSockets(); - expect(sockets).to.be.an(Array); - expect(sockets).to.have.length(3); - // @ts-ignore - expect(servers[0].of("/").adapter.requests.size).to.eql(0); // clean up - }); + expect(sockets).to.be.an(Array); + expect(sockets).to.have.length(3); + // @ts-ignore + expect(servers[0].of("/").adapter.requests.size).to.eql(0); // clean up + }); - it("returns a single socket instance", async () => { - serverSockets[1].data = "test" as any; + it("returns a single socket instance", async () => { + serverSockets[1].data = "test" as any; - const [remoteSocket] = await servers[0] - .in(serverSockets[1].id) - .fetchSockets(); + const [remoteSocket] = await servers[0] + .in(serverSockets[1].id) + .fetchSockets(); - expect(remoteSocket.handshake).to.eql(serverSockets[1].handshake); - expect(remoteSocket.data).to.eql("test"); - expect(remoteSocket.rooms.size).to.eql(1); - }); + expect(remoteSocket.handshake).to.eql(serverSockets[1].handshake); + expect(remoteSocket.data).to.eql("test"); + expect(remoteSocket.rooms.size).to.eql(1); + }); - it("returns only local socket instances", async () => { - const sockets = await servers[0].local.fetchSockets(); + it("returns only local socket instances", async () => { + const sockets = await servers[0].local.fetchSockets(); - expect(sockets).to.have.length(1); + expect(sockets).to.have.length(1); + }); }); - }); - describe("serverSideEmit", () => { - it("sends an event to other server instances", (done) => { - const partialDone = times(2, done); + describe("serverSideEmit", () => { + it("sends an event to other server instances", (done) => { + const partialDone = times(2, done); + + servers[0].serverSideEmit("hello", "world", 1, "2"); - servers[0].serverSideEmit("hello", "world", 1, "2"); + servers[0].on("hello", shouldNotHappen(done)); - servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (arg1, arg2, arg3) => { + expect(arg1).to.eql("world"); + expect(arg2).to.eql(1); + expect(arg3).to.eql("2"); + partialDone(); + }); - servers[1].on("hello", (arg1, arg2, arg3) => { - expect(arg1).to.eql("world"); - expect(arg2).to.eql(1); - expect(arg3).to.eql("2"); - partialDone(); + servers[2].of("/").on("hello", () => partialDone()); }); - servers[2].of("/").on("hello", () => partialDone()); - }); + it("sends an event and receives a response from the other server instances", (done) => { + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + expect(response).to.contain("3"); + done(); + }); - it("sends an event and receives a response from the other server instances", (done) => { - servers[0].serverSideEmit("hello", (err: Error, response: any) => { - expect(err).to.be(null); - expect(response).to.be.an(Array); - expect(response).to.contain(2); - expect(response).to.contain("3"); - done(); + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", (cb) => cb("3")); }); - servers[0].on("hello", shouldNotHappen(done)); - servers[1].on("hello", (cb) => cb(2)); - servers[2].on("hello", (cb) => cb("3")); - }); + it("sends an event but timeout if one server does not respond", function (done) { + // TODO the serverSideEmit() method currently ignores the timeout() flag + this.timeout(6000); - it("sends an event but timeout if one server does not respond", function (done) { - // TODO the serverSideEmit() method currently ignores the timeout() flag - this.timeout(6000); - - servers[0].serverSideEmit("hello", (err: Error, response: any) => { - expect(err.message).to.be( - "timeout reached: only 1 responses received out of 2" - ); - expect(response).to.be.an(Array); - expect(response).to.contain(2); - done(); - }); + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err.message).to.be( + "timeout reached: only 1 responses received out of 2" + ); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); - servers[0].on("hello", shouldNotHappen(done)); - servers[1].on("hello", (cb) => cb(2)); - servers[2].on("hello", () => { - // do nothing + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + // do nothing + }); }); }); }); -}); +} diff --git a/test/specifics.ts b/test/specifics.ts index b913705..1d3c09d 100644 --- a/test/specifics.ts +++ b/test/specifics.ts @@ -4,212 +4,217 @@ import expect = require("expect.js"); import { shouldNotHappen, setup } from "./util"; import type { RedisAdapter } from "../lib"; -describe("specifics", () => { - let servers: Server[]; - let serverSockets: ServerSocket[]; - let clientSockets: ClientSocket[]; - let cleanup: () => void; - - beforeEach(async () => { - const testContext = await setup({ - requestsTimeout: 1000, +export function testSuite( + createAdapter: any, + redisPackage: string, + sharded: boolean +) { + describe("specifics", () => { + let servers: Server[]; + let serverSockets: ServerSocket[]; + let clientSockets: ClientSocket[]; + let cleanup: () => void; + + beforeEach(async () => { + const testContext = await setup(createAdapter); + servers = testContext.servers; + serverSockets = testContext.serverSockets; + clientSockets = testContext.clientSockets; + cleanup = testContext.cleanup; }); - servers = testContext.servers; - serverSockets = testContext.serverSockets; - clientSockets = testContext.clientSockets; - cleanup = testContext.cleanup; - }); - - afterEach(() => cleanup()); - - describe("broadcast", function () { - it("broadcasts to a numeric room", function (done) { - if (process.env.SHARDED === "1") { - return this.skip(); - } - // @ts-ignore - serverSockets[0].join(123); - - clientSockets[0].on("test", () => done()); - clientSockets[1].on("test", shouldNotHappen(done)); - clientSockets[2].on("test", shouldNotHappen(done)); - - // @ts-ignore - servers[1].to(123).emit("test"); - }); - }); - it("unsubscribes when close is called", async function () { - if (process.env.SHARDED === "1") { - return this.skip(); - } - const parseInfo = (rawInfo: string) => { - const info = {}; + afterEach(() => cleanup()); - rawInfo.split("\r\n").forEach((line) => { - if (line.length > 0 && !line.startsWith("#")) { - const fieldVal = line.split(":"); - info[fieldVal[0]] = fieldVal[1]; + describe("broadcast", function () { + it("broadcasts to a numeric room", function (done) { + if (sharded) { + return this.skip(); } - }); + // @ts-ignore + serverSockets[0].join(123); - return info; - }; + clientSockets[0].on("test", () => done()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", shouldNotHappen(done)); - const getInfo = async (): Promise => { - if (process.env.REDIS_CLIENT === undefined) { - return parseInfo( - await ( - servers[2].of("/").adapter as RedisAdapter - ).pubClient.sendCommand(["info"]) - ); - } else if (process.env.REDIS_CLIENT === "ioredis") { // @ts-ignore - return parseInfo( - await (servers[2].of("/").adapter as RedisAdapter).pubClient.call( - "info" - ) - ); - } else { - return await new Promise((resolve, reject) => { - (servers[2].of("/").adapter as RedisAdapter).pubClient.sendCommand( - "info", - [], - (err, result) => { - if (err) { - reject(err); - } - resolve(parseInfo(result)); - } - ); - }); - } - }; - - return new Promise(async (resolve, reject) => { - // Give it a moment to subscribe to all the channels - setTimeout(async () => { - try { - const info = await getInfo(); - - // Depending on the version of redis this may be 3 (redis < v5) or 1 (redis > v4) - // Older versions subscribed multiple times on the same pattern. Newer versions only sub once. - expect(info.pubsub_patterns).to.be.greaterThan(0); - expect(info.pubsub_channels).to.eql(5); // 2 shared (request/response) + 3 unique for each namespace - - servers[0].of("/").adapter.close(); - servers[1].of("/").adapter.close(); - servers[2].of("/").adapter.close(); - - // Give it a moment to unsubscribe - setTimeout(async () => { - try { - const info = await getInfo(); - - expect(info.pubsub_patterns).to.eql(0); // All patterns subscriptions should be unsubscribed - expect(info.pubsub_channels).to.eql(0); // All subscriptions should be unsubscribed - resolve(); - } catch (error) { - reject(error); - } - }, 100); - } catch (error) { - reject(error); - } - }, 100); + servers[1].to(123).emit("test"); + }); }); - }); - if (process.env.REDIS_CLIENT === undefined) { - // redis@4 - it("ignores messages from unknown channels", (done) => { - (servers[0].of("/").adapter as RedisAdapter).subClient - .PSUBSCRIBE("f?o", () => { - setTimeout(done, 50); - }) - .then(() => { - (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( - "foo", - "bar" - ); + // TODO handle Redis cluster + it.skip("unsubscribes when close is called", async function () { + if (sharded) { + return this.skip(); + } + const parseInfo = (rawInfo: string) => { + const info = {}; + + rawInfo.split("\r\n").forEach((line) => { + if (line.length > 0 && !line.startsWith("#")) { + const fieldVal = line.split(":"); + info[fieldVal[0]] = fieldVal[1]; + } }); - }); - it("ignores messages from unknown channels (2)", (done) => { - (servers[0].of("/").adapter as RedisAdapter).subClient - .PSUBSCRIBE("woot", () => { - setTimeout(done, 50); - }) - .then(() => { - (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( - "woot", - "toow" + return info; + }; + + const getInfo = async (): Promise => { + if (process.env.REDIS_CLIENT === undefined) { + return parseInfo( + await ( + servers[2].of("/").adapter as RedisAdapter + ).pubClient.sendCommand(["info"]) ); - }); - }); - } else { - // redis@3 and ioredis - it("ignores messages from unknown channels", (done) => { - (servers[0].of("/").adapter as RedisAdapter).subClient.psubscribe( - "f?o", - () => { - (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( - "foo", - "bar" + } else if (process.env.REDIS_CLIENT === "ioredis") { + // @ts-ignore + return parseInfo( + await (servers[2].of("/").adapter as RedisAdapter).pubClient.call( + "info" + ) ); + } else { + return await new Promise((resolve, reject) => { + (servers[2].of("/").adapter as RedisAdapter).pubClient.sendCommand( + "info", + [], + (err, result) => { + if (err) { + reject(err); + } + resolve(parseInfo(result)); + } + ); + }); } - ); - - (servers[0].of("/").adapter as RedisAdapter).subClient.on( - "pmessageBuffer", - () => { - setTimeout(done, 50); - } - ); + }; + + return new Promise(async (resolve, reject) => { + // Give it a moment to subscribe to all the channels + setTimeout(async () => { + try { + const info = await getInfo(); + + // Depending on the version of redis this may be 3 (redis < v5) or 1 (redis > v4) + // Older versions subscribed multiple times on the same pattern. Newer versions only sub once. + expect(info.pubsub_patterns).to.be.greaterThan(0); + expect(info.pubsub_channels).to.eql(5); // 2 shared (request/response) + 3 unique for each namespace + + servers[0].of("/").adapter.close(); + servers[1].of("/").adapter.close(); + servers[2].of("/").adapter.close(); + + // Give it a moment to unsubscribe + setTimeout(async () => { + try { + const info = await getInfo(); + + expect(info.pubsub_patterns).to.eql(0); // All patterns subscriptions should be unsubscribed + expect(info.pubsub_channels).to.eql(0); // All subscriptions should be unsubscribed + resolve(); + } catch (error) { + reject(error); + } + }, 100); + } catch (error) { + reject(error); + } + }, 100); + }); }); - it("ignores messages from unknown channels (2)", (done) => { - (servers[0].of("/").adapter as RedisAdapter).subClient.subscribe( - "woot", - () => { - (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( - "woot", - "toow" - ); - } - ); + if (redisPackage === "redis@4") { + // redis@4 + it("ignores messages from unknown channels", (done) => { + (servers[0].of("/").adapter as RedisAdapter).subClient + .PSUBSCRIBE("f?o", () => { + setTimeout(done, 50); + }) + .then(() => { + (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( + "foo", + "bar" + ); + }); + }); - (servers[0].of("/").adapter as RedisAdapter).subClient.on( - "messageBuffer", - () => { - setTimeout(done, 50); - } - ); - }); - } + it("ignores messages from unknown channels (2)", (done) => { + (servers[0].of("/").adapter as RedisAdapter).subClient + .PSUBSCRIBE("woot", () => { + setTimeout(done, 50); + }) + .then(() => { + (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( + "woot", + "toow" + ); + }); + }); + } else { + // redis@3 and ioredis + it("ignores messages from unknown channels", (done) => { + (servers[0].of("/").adapter as RedisAdapter).subClient.psubscribe( + "f?o", + () => { + (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( + "foo", + "bar" + ); + } + ); - describe("allRooms", () => { - afterEach(() => { - // @ts-ignore - expect(servers[0].of("/").adapter.requests.size).to.eql(0); - }); + (servers[0].of("/").adapter as RedisAdapter).subClient.on( + "pmessageBuffer", + () => { + setTimeout(done, 50); + } + ); + }); - it("returns all rooms across several nodes", async function () { - if (process.env.SHARDED === "1") { - return this.skip(); - } - serverSockets[0].join("woot1"); - - const rooms = await ( - servers[0].of("/").adapter as RedisAdapter - ).allRooms(); - - expect(rooms).to.be.a(Set); - expect(rooms.size).to.eql(4); - expect(rooms.has(serverSockets[0].id)).to.be(true); - expect(rooms.has(serverSockets[1].id)).to.be(true); - expect(rooms.has(serverSockets[2].id)).to.be(true); - expect(rooms.has("woot1")).to.be(true); + it("ignores messages from unknown channels (2)", (done) => { + (servers[0].of("/").adapter as RedisAdapter).subClient.subscribe( + "woot", + () => { + (servers[2].of("/").adapter as RedisAdapter).pubClient.publish( + "woot", + "toow" + ); + } + ); + + (servers[0].of("/").adapter as RedisAdapter).subClient.on( + "messageBuffer", + () => { + setTimeout(done, 50); + } + ); + }); + } + + describe("allRooms", () => { + afterEach(() => { + // @ts-ignore + expect(servers[0].of("/").adapter.requests.size).to.eql(0); + }); + + it("returns all rooms across several nodes", async function () { + if (sharded) { + return this.skip(); + } + serverSockets[0].join("woot1"); + + const rooms = await ( + servers[0].of("/").adapter as RedisAdapter + ).allRooms(); + + expect(rooms).to.be.a(Set); + expect(rooms.size).to.eql(4); + expect(rooms.has(serverSockets[0].id)).to.be(true); + expect(rooms.has(serverSockets[1].id)).to.be(true); + expect(rooms.has(serverSockets[2].id)).to.be(true); + expect(rooms.has("woot1")).to.be(true); + }); }); }); -}); +} diff --git a/test/test-runner.ts b/test/test-runner.ts new file mode 100644 index 0000000..a2b5e99 --- /dev/null +++ b/test/test-runner.ts @@ -0,0 +1,201 @@ +import { testSuite as commonTestSuite } from "./index"; +import { testSuite as specificsTestSuite } from "./specifics"; +import { createAdapter, createShardedAdapter } from "../lib"; +import { createClient, createCluster } from "redis"; +import Redis from "ioredis"; +import { createClient as createClientV3 } from "redis-v3"; + +const clusterNodes = [ + { + url: "redis://localhost:7000", + host: "localhost", + port: 7000, + }, + { + url: "redis://localhost:7001", + host: "localhost", + port: 7001, + }, + { + url: "redis://localhost:7002", + host: "localhost", + port: 7002, + }, + { + url: "redis://localhost:7003", + host: "localhost", + port: 7003, + }, + { + url: "redis://localhost:7004", + host: "localhost", + port: 7004, + }, + { + url: "redis://localhost:7005", + host: "localhost", + port: 7005, + }, +]; + +function testSuite( + createAdapter: any, + redisPackage: string = "redis@4", + sharded = false +) { + commonTestSuite(createAdapter); + specificsTestSuite(createAdapter, redisPackage, sharded); +} + +describe("@socket.io/redis-adapter", () => { + describe("redis@4 standalone", () => + testSuite(async () => { + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createAdapter(pubClient, subClient, { + requestsTimeout: 1000, + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + })); + + describe("redis@4 standalone (specific response channel)", () => + testSuite(async () => { + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createAdapter(pubClient, subClient, { + requestsTimeout: 1000, + publishOnSpecificResponseChannel: true, + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + })); + + describe("redis@4 cluster", () => + testSuite(async () => { + const pubClient = createCluster({ + rootNodes: clusterNodes, + }); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createAdapter(pubClient, subClient, { + requestsTimeout: 1000, + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + })); + + describe("redis@3 standalone", () => + testSuite(async () => { + const pubClient = createClientV3(); + const subClient = pubClient.duplicate(); + + return [ + createAdapter(pubClient, subClient, { + requestsTimeout: 1000, + }), + () => { + pubClient.quit(); + subClient.quit(); + }, + ]; + }, "redis@3")); + + describe("ioredis standalone", () => + testSuite(async () => { + const pubClient = Redis.createClient(); + const subClient = pubClient.duplicate(); + + return [ + createAdapter(pubClient, subClient, { + requestsTimeout: 1000, + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, "ioredis")); + + describe("ioredis cluster", () => + testSuite(async () => { + const pubClient = new Redis.Cluster(clusterNodes); + const subClient = pubClient.duplicate(); + + return [ + createAdapter(pubClient, subClient, { + requestsTimeout: 1000, + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, "ioredis")); + + describe("[sharded] redis@4 standalone (dynamic subscription mode)", () => + testSuite( + async () => { + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createShardedAdapter(pubClient, subClient, { + subscriptionMode: "dynamic", + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, + "redis@4", + true + )); + + describe("[sharded] redis@4 standalone (static subscription mode)", () => + testSuite( + async () => { + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createShardedAdapter(pubClient, subClient, { + subscriptionMode: "static", + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, + "redis@4", + true + )); + + import("./custom-parser"); +}); diff --git a/test/util.ts b/test/util.ts index e17066b..0b1412a 100644 --- a/test/util.ts +++ b/test/util.ts @@ -1,10 +1,5 @@ import { createServer } from "http"; import { AddressInfo } from "net"; -import { - createAdapter, - createShardedAdapter, - RedisAdapterOptions, -} from "../lib"; import { Server, Socket as ServerSocket } from "socket.io"; import { io as ioc, Socket as ClientSocket } from "socket.io-client"; @@ -37,42 +32,19 @@ interface TestContext { cleanup: () => void; } -async function createRedisClient() { - switch (process.env.REDIS_CLIENT) { - case "ioredis": - return require("ioredis").createClient(); - case "redis-v3": - return require("redis-v3").createClient(); - default: - // redis@4 - const client = require("redis").createClient(); - await client.connect(); - return client; - } -} - -export function setup(adapterOptions: Partial = {}) { +export function setup(createAdapter: any) { const servers = []; const serverSockets = []; const clientSockets = []; - const redisClients = []; + const redisCleanupFunctions = []; return new Promise(async (resolve) => { for (let i = 1; i <= NODES_COUNT; i++) { - const [pubClient, subClient] = await Promise.all([ - createRedisClient(), - createRedisClient(), - ]); - - adapterOptions.publishOnSpecificResponseChannel = - process.env.SPECIFIC_CHANNEL !== undefined; + const [adapter, redisCleanup] = await createAdapter(); const httpServer = createServer(); const io = new Server(httpServer, { - adapter: - process.env.SHARDED === "1" - ? createShardedAdapter(pubClient, subClient) - : createAdapter(pubClient, subClient, adapterOptions), + adapter, }); httpServer.listen(() => { const port = (httpServer.address() as AddressInfo).port; @@ -82,7 +54,7 @@ export function setup(adapterOptions: Partial = {}) { clientSockets.push(clientSocket); serverSockets.push(socket); servers.push(io); - redisClients.push(pubClient, subClient); + redisCleanupFunctions.push(redisCleanup); if (servers.length === NODES_COUNT) { await sleep(200); @@ -99,13 +71,7 @@ export function setup(adapterOptions: Partial = {}) { clientSockets.forEach((socket) => { socket.disconnect(); }); - redisClients.forEach((redisClient) => { - if (process.env.REDIS_CLIENT === "redis-v3") { - redisClient.quit(); - } else { - redisClient.disconnect(); - } - }); + redisCleanupFunctions.forEach((fn) => fn()); }, }); }