From 58810462174f20acb09eb1050694c9adce12ce33 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 15:29:05 +0100 Subject: [PATCH 01/18] pool of 1 for redis to help with async --- package.json | 2 ++ src/db.ts | 43 ++++++++++++++++++++++++++++-------- src/ingestion/kafka-queue.ts | 2 +- yarn.lock | 12 ++++++++++ 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index b7b6dfa4..8c4f22d8 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,7 @@ "@sentry/tracing": "^5.29.0", "adm-zip": "^0.4.16", "fastify": "^3.8.0", + "generic-pool": "^3.7.1", "hot-shots": "^8.2.1", "ioredis": "^4.19.2", "kafkajs": "^1.15.0", @@ -76,6 +77,7 @@ "@posthog/plugin-scaffold": "0.2.8", "@types/adm-zip": "^0.4.33", "@types/babel__standalone": "^7.1.3", + "@types/generic-pool": "^3.1.9", "@types/ioredis": "^4.17.7", "@types/jest": "^26.0.15", "@types/luxon": "^1.25.0", diff --git a/src/db.ts b/src/db.ts index 7044fa95..1fca4af9 100644 --- a/src/db.ts +++ b/src/db.ts @@ -1,5 +1,6 @@ import ClickHouse from '@posthog/clickhouse' import { Properties } from '@posthog/plugin-scaffold' +import { createPool,Pool as GenericPool } from 'generic-pool' import Redis from 'ioredis' import { Producer, ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' @@ -7,6 +8,7 @@ import { Pool, PoolClient, QueryConfig, QueryResult, QueryResultRow } from 'pg' import { KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID } from './ingestion/topics' import { chainToElements, hashElements, timeoutGuard, unparsePersonPartial } from './ingestion/utils' +import { status } from './status' import { ClickHouseEvent, ClickHousePerson, @@ -36,7 +38,8 @@ export class DB { /** Postgres connection pool for primary database access. */ postgres: Pool /** Redis used for various caches. */ - redis: Redis.Redis + redisPool: GenericPool + /** Kafka producer used for syncing Postgres and ClickHouse person data. */ kafkaProducer?: Producer /** ClickHouse used for syncing Postgres and ClickHouse person data. */ @@ -44,9 +47,19 @@ export class DB { constructor(postgres: Pool, redis: Redis.Redis, kafkaProducer?: Producer, clickhouse?: ClickHouse) { this.postgres = postgres - this.redis = redis this.kafkaProducer = kafkaProducer this.clickhouse = clickhouse + this.redisPool = createPool( + { + create: () => Promise.resolve(redis), + destroy: () => Promise.resolve(undefined), + }, + { + min: 1, + max: 1, + autostart: true, + } + ) } // Postgres @@ -118,10 +131,11 @@ export class DB { // Redis public async redisGet(key: string, defaultValue: unknown, parseJSON = true): Promise { + const client = await this.redisPool.acquire() const timeout = timeoutGuard(`Getting redis key delayed. Waiting over 30 sec to get key: ${key}`) try { const value = await tryTwice( - async () => await this.redis.get(key), + async () => await client.get(key), `Waited 5 sec to get redis key: ${key}, retrying once!` ) if (typeof value === 'undefined') { @@ -137,57 +151,68 @@ export class DB { } } finally { clearTimeout(timeout) + await this.redisPool.release(client) } } public async redisSet(key: string, value: unknown, ttlSeconds?: number, stringify = true): Promise { + const client = await this.redisPool.acquire() const timeout = timeoutGuard(`Setting redis key delayed. Waiting over 30 sec to set key: ${key}`) try { const serializedValue = stringify ? JSON.stringify(value) : (value as string) if (ttlSeconds) { - await this.redis.set(key, serializedValue, 'EX', ttlSeconds) + await client.set(key, serializedValue, 'EX', ttlSeconds) } else { - await this.redis.set(key, serializedValue) + await client.set(key, serializedValue) } } finally { clearTimeout(timeout) + await this.redisPool.release(client) } } public async redisIncr(key: string): Promise { + const client = await this.redisPool.acquire() const timeout = timeoutGuard(`Incrementing redis key delayed. Waiting over 30 sec to incr key: ${key}`) try { - return await this.redis.incr(key) + return await client.incr(key) } finally { clearTimeout(timeout) + await this.redisPool.release(client) } } public async redisExpire(key: string, ttlSeconds: number): Promise { + const client = await this.redisPool.acquire() const timeout = timeoutGuard(`Expiring redis key delayed. Waiting over 30 sec to expire key: ${key}`) try { - return (await this.redis.expire(key, ttlSeconds)) === 1 + return (await client.expire(key, ttlSeconds)) === 1 } finally { clearTimeout(timeout) + await this.redisPool.release(client) } } public async redisLPush(key: string, value: unknown, stringify = true): Promise { + const client = await this.redisPool.acquire() const timeout = timeoutGuard(`LPushing redis key delayed. Waiting over 30 sec to lpush key: ${key}`) try { const serializedValue = stringify ? JSON.stringify(value) : (value as string) - return await this.redis.lpush(key, serializedValue) + return await client.lpush(key, serializedValue) } finally { clearTimeout(timeout) + await this.redisPool.release(client) } } public async redisBRPop(key1: string, key2: string): Promise<[string, string]> { + const client = await this.redisPool.acquire() const timeout = timeoutGuard(`BRPoping redis key delayed. Waiting over 30 sec to brpop keys: ${key1}, ${key2}`) try { - return await this.redis.brpop(key1, key2) + return await client.brpop(key1, key2) } finally { clearTimeout(timeout) + await this.redisPool.release(client) } } diff --git a/src/ingestion/kafka-queue.ts b/src/ingestion/kafka-queue.ts index afb3a04b..d95b03e8 100644 --- a/src/ingestion/kafka-queue.ts +++ b/src/ingestion/kafka-queue.ts @@ -129,7 +129,7 @@ export class KafkaQueue implements Queue { status.info( '🧩', - `Kafka Batch of ${batch.messages.length} events completed in ${ + `Kafka Batch of ${pluginEvents.length} events completed in ${ new Date().valueOf() - batchStartTimer.valueOf() }ms (plugins: ${batchIngestionTimer.valueOf() - batchStartTimer.valueOf()}ms, ingestion: ${ new Date().valueOf() - batchIngestionTimer.valueOf() diff --git a/yarn.lock b/yarn.lock index ff1fe5cd..a45afd35 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1391,6 +1391,13 @@ resolved "https://registry.yarnpkg.com/@types/cookiejar/-/cookiejar-2.1.2.tgz#66ad9331f63fe8a3d3d9d8c6e3906dd10f6446e8" integrity sha512-t73xJJrvdTjXrn4jLS9VSGRbz0nUY3cl2DMGDU48lKl+HR9dbbjW2A9r3g40VA++mQpy6uuHg33gy7du2BKpog== +"@types/generic-pool@^3.1.9": + version "3.1.9" + resolved "https://registry.yarnpkg.com/@types/generic-pool/-/generic-pool-3.1.9.tgz#cc82ee0d92561fce713f8f9a7b2380eda8a89dcb" + integrity sha512-IkXMs8fhV6+E4J8EWv8iL7mLvApcLLQUH4m1Rex3KCPRqT+Xya0DDHIeGAokk/6VXe9zg8oTWyr+FGyeuimEYQ== + dependencies: + "@types/node" "*" + "@types/graceful-fs@^4.1.2": version "4.1.4" resolved "https://registry.yarnpkg.com/@types/graceful-fs/-/graceful-fs-4.1.4.tgz#4ff9f641a7c6d1a3508ff88bc3141b152772e753" @@ -3508,6 +3515,11 @@ gcp-metadata@^4.2.0: gaxios "^4.0.0" json-bigint "^1.0.0" +generic-pool@^3.7.1: + version "3.7.1" + resolved "https://registry.yarnpkg.com/generic-pool/-/generic-pool-3.7.1.tgz#36fe5bb83e7e0e032e5d32cd05dc00f5ff119aa8" + integrity sha512-ug6DAZoNgWm6q5KhPFA+hzXfBLFQu5sTXxPpv44DmE0A2g+CiHoq9LTVdkXpZMkYVMoGw83F6W+WT0h0MFMK/w== + gensync@^1.0.0-beta.1: version "1.0.0-beta.2" resolved "https://registry.yarnpkg.com/gensync/-/gensync-1.0.0-beta.2.tgz#32a6ee76c3d7f52d46b2b1ae5d93fea8580a25e0" From b4ea9b3f500bfbcba73837f254e22ad3a68c1648 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 15:35:04 +0100 Subject: [PATCH 02/18] space --- src/db.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.ts b/src/db.ts index 1fca4af9..4faefc3a 100644 --- a/src/db.ts +++ b/src/db.ts @@ -1,6 +1,6 @@ import ClickHouse from '@posthog/clickhouse' import { Properties } from '@posthog/plugin-scaffold' -import { createPool,Pool as GenericPool } from 'generic-pool' +import { createPool, Pool as GenericPool } from 'generic-pool' import Redis from 'ioredis' import { Producer, ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' From 3c0c6b593ed59eb59aeec9de6a6c9b2cbc711825 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 15:44:30 +0100 Subject: [PATCH 03/18] increase delay now that redis is sequential --- benchmarks/postgres/e2e.celery.benchmark.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/postgres/e2e.celery.benchmark.ts b/benchmarks/postgres/e2e.celery.benchmark.ts index 0b933fec..fe477b9b 100644 --- a/benchmarks/postgres/e2e.celery.benchmark.ts +++ b/benchmarks/postgres/e2e.celery.benchmark.ts @@ -66,7 +66,7 @@ describe('e2e celery & postgres benchmark', () => { for (let i = 0; i < count; i++) { createEvent() } - await delay(1000) + await delay(2000) expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count) queue.resume() From c16c038a321aa46aae6675b0b5ba097e654d68ac Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 16:10:30 +0100 Subject: [PATCH 04/18] actually use a pool --- src/db.ts | 23 +++++++++-------------- src/server.ts | 38 +++++++++++++++++++++----------------- src/services/schedule.ts | 5 ++++- src/types.ts | 3 ++- src/utils.ts | 19 ++++++++++++++++++- 5 files changed, 54 insertions(+), 34 deletions(-) diff --git a/src/db.ts b/src/db.ts index 4faefc3a..5213a37e 100644 --- a/src/db.ts +++ b/src/db.ts @@ -1,6 +1,6 @@ import ClickHouse from '@posthog/clickhouse' import { Properties } from '@posthog/plugin-scaffold' -import { createPool, Pool as GenericPool } from 'generic-pool' +import { Pool as GenericPool } from 'generic-pool' import Redis from 'ioredis' import { Producer, ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' @@ -8,7 +8,6 @@ import { Pool, PoolClient, QueryConfig, QueryResult, QueryResultRow } from 'pg' import { KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID } from './ingestion/topics' import { chainToElements, hashElements, timeoutGuard, unparsePersonPartial } from './ingestion/utils' -import { status } from './status' import { ClickHouseEvent, ClickHousePerson, @@ -28,6 +27,7 @@ import { import { castTimestampOrNow, clickHouseTimestampToISO, + createRedis, escapeClickHouseString, sanitizeSqlIdentifier, tryTwice, @@ -45,21 +45,16 @@ export class DB { /** ClickHouse used for syncing Postgres and ClickHouse person data. */ clickhouse?: ClickHouse - constructor(postgres: Pool, redis: Redis.Redis, kafkaProducer?: Producer, clickhouse?: ClickHouse) { + constructor( + postgres: Pool, + redisPool: GenericPool, + kafkaProducer?: Producer, + clickhouse?: ClickHouse + ) { this.postgres = postgres this.kafkaProducer = kafkaProducer this.clickhouse = clickhouse - this.redisPool = createPool( - { - create: () => Promise.resolve(redis), - destroy: () => Promise.resolve(undefined), - }, - { - min: 1, - max: 1, - autostart: true, - } - ) + this.redisPool = redisPool } // Postgres diff --git a/src/server.ts b/src/server.ts index ca00d674..10cabe5e 100644 --- a/src/server.ts +++ b/src/server.ts @@ -3,6 +3,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { FastifyInstance } from 'fastify' import * as fs from 'fs' +import { createPool } from 'generic-pool' import { StatsD } from 'hot-shots' import Redis from 'ioredis' import { Kafka, logLevel, Producer } from 'kafkajs' @@ -20,7 +21,7 @@ import { KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_EVENTS_WAL } from './ingestion/top import { startSchedule } from './services/schedule' import { status } from './status' import { PluginsServer, PluginsServerConfig, Queue } from './types' -import { delay, UUIDT } from './utils' +import { createRedis, delay, UUIDT } from './utils' import { version } from './version' import { startFastifyInstance, stopFastifyInstance } from './web/server' import { startQueue } from './worker/queue' @@ -34,19 +35,6 @@ export async function createServer( ...config, } - const redis = new Redis(serverConfig.REDIS_URL, { maxRetriesPerRequest: -1 }) - redis - .on('error', (error) => { - Sentry.captureException(error) - status.error('🔴', 'Redis error encountered! Trying to reconnect...\n', error) - }) - .on('ready', () => { - if (process.env.NODE_ENV !== 'test') { - status.info('✅', 'Connected to Redis!') - } - }) - await redis.info() - let kafkaSsl: ConnectionOptions | undefined if ( serverConfig.KAFKA_CLIENT_CERT_B64 && @@ -129,7 +117,22 @@ export async function createServer( } : undefined, }) - const db = new DB(postgres, redis, kafkaProducer, clickhouse) + + const redisPool = createPool( + { + create: () => createRedis(serverConfig), + destroy: async (client) => { + await client.quit() + }, + }, + { + min: 1, + max: 3, + autostart: true, + } + ) + + const db = new DB(postgres, redisPool, kafkaProducer, clickhouse) let statsd: StatsD | undefined if (serverConfig.STATSD_HOST) { @@ -151,7 +154,7 @@ export async function createServer( ...serverConfig, db, postgres, - redis, + redisPool, clickhouse, kafka, kafkaProducer, @@ -169,7 +172,8 @@ export async function createServer( const closeServer = async () => { await kafkaProducer?.disconnect() - await server.redis.quit() + await redisPool.drain() + await redisPool.clear() await server.postgres.end() } diff --git a/src/services/schedule.ts b/src/services/schedule.ts index 20cdcafd..d52f52ab 100644 --- a/src/services/schedule.ts +++ b/src/services/schedule.ts @@ -25,7 +25,9 @@ export async function startSchedule( const retryDelay = lockTTL / 10 // 6 sec const extendDelay = lockTTL / 2 // 30 sec - const redlock = new Redlock([server.redis], { + const redis = await server.redisPool.acquire() + + const redlock = new Redlock([redis], { // we handle retires ourselves to have a way to cancel the promises on quit // without this, the `await redlock.lock()` code will remain inflight and cause issues retryCount: 0, @@ -100,6 +102,7 @@ export async function startSchedule( runEveryMinuteJob && schedule.cancelJob(runEveryMinuteJob) await lock?.unlock().catch(Sentry.captureException) + await server.redisPool.release(redis) await waitForTasksToFinish(server!) } diff --git a/src/types.ts b/src/types.ts index 2dc443ea..d1e40724 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,5 +1,6 @@ import ClickHouse from '@posthog/clickhouse' import { PluginAttachment, PluginConfigSchema, PluginEvent, Properties } from '@posthog/plugin-scaffold' +import { Pool as GenericPool } from 'generic-pool' import { StatsD } from 'hot-shots' import { EventsProcessor } from 'ingestion/process-event' import { Redis } from 'ioredis' @@ -57,7 +58,7 @@ export interface PluginsServer extends PluginsServerConfig { // active connections to Postgres, Redis, ClickHouse, Kafka, StatsD db: DB postgres: Pool - redis: Redis + redisPool: GenericPool clickhouse?: ClickHouse kafka?: Kafka kafkaProducer?: Producer diff --git a/src/utils.ts b/src/utils.ts index 2a07be31..bc51adf5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,13 +1,14 @@ import * as Sentry from '@sentry/node' import AdmZip from 'adm-zip' import { randomBytes } from 'crypto' +import Redis from 'ioredis' import { DateTime } from 'luxon' import { Readable } from 'stream' import * as tar from 'tar-stream' import * as zlib from 'zlib' import { status } from './status' -import { LogLevel, TimestampFormat } from './types' +import { LogLevel, PluginsServerConfig, TimestampFormat } from './types' /** Time until autoexit (due to error) gives up on graceful exit and kills the process right away. */ const GRACEFUL_EXIT_PERIOD_SECONDS = 5 @@ -369,3 +370,19 @@ export async function tryTwice( return await callback() } } + +export async function createRedis(serverConfig: PluginsServerConfig) { + const redis = new Redis(serverConfig.REDIS_URL, { maxRetriesPerRequest: -1 }) + redis + .on('error', (error) => { + Sentry.captureException(error) + status.error('🔴', 'Redis error encountered! Trying to reconnect...\n', error) + }) + .on('ready', () => { + if (process.env.NODE_ENV !== 'test') { + status.info('✅', 'Connected to Redis!') + } + }) + await redis.info() + return redis +} From 3db06024853e82922d20db493c4f02bd706ca44e Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 16:12:35 +0100 Subject: [PATCH 05/18] fix some tests --- tests/postgres/queue.test.ts | 128 +++++++++++++++++++--------------- tests/postgres/worker.test.ts | 30 ++++---- 2 files changed, 89 insertions(+), 69 deletions(-) diff --git a/tests/postgres/queue.test.ts b/tests/postgres/queue.test.ts index de89ed38..62bd8e8d 100644 --- a/tests/postgres/queue.test.ts +++ b/tests/postgres/queue.test.ts @@ -19,15 +19,18 @@ async function getServer(): Promise<[PluginsServer, () => Promise]> { LOG_LEVEL: LogLevel.Log, }) - await server.redis.del(server.PLUGINS_CELERY_QUEUE) - await server.redis.del(server.CELERY_DEFAULT_QUEUE) + const redis = await server.redisPool.acquire() + await redis.del(server.PLUGINS_CELERY_QUEUE) + await redis.del(server.CELERY_DEFAULT_QUEUE) + await server.redisPool.release(redis) return [server, stopServer] } test('worker and task passing via redis', async () => { const [server, stopServer] = await getServer() + const redis = await server.redisPool.acquire() // Nothing in the redis queue - const queue1 = await server.redis.llen(server.PLUGINS_CELERY_QUEUE) + const queue1 = await redis.llen(server.PLUGINS_CELERY_QUEUE) expect(queue1).toBe(0) const kwargs = { @@ -50,11 +53,12 @@ test('worker and task passing via redis', async () => { client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) // It's there - const queue2 = await server.redis.llen(server.PLUGINS_CELERY_QUEUE) + await advanceOneTick() + const queue2 = await redis.llen(server.PLUGINS_CELERY_QUEUE) expect(queue2).toBe(1) - const item2 = await server.redis.lpop(server.PLUGINS_CELERY_QUEUE) - await server.redis.lpush(server.PLUGINS_CELERY_QUEUE, item2) + const item2 = await redis.lpop(server.PLUGINS_CELERY_QUEUE) + await redis.lpush(server.PLUGINS_CELERY_QUEUE, item2) const item = JSON.parse(item2) expect(item['content-type']).toBe('application/json') @@ -77,10 +81,10 @@ test('worker and task passing via redis', async () => { await advanceOneTick() // get the new processed task from CELERY_DEFAULT_QUEUE - const queue3 = await server.redis.llen(server.CELERY_DEFAULT_QUEUE) + const queue3 = await redis.llen(server.CELERY_DEFAULT_QUEUE) expect(queue3).toBe(1) - const item3 = await server.redis.lpop(server.CELERY_DEFAULT_QUEUE) - await server.redis.lpush(server.CELERY_DEFAULT_QUEUE, item3) + const item3 = await redis.lpop(server.CELERY_DEFAULT_QUEUE) + await redis.lpush(server.CELERY_DEFAULT_QUEUE, item3) const processedItem = JSON.parse(item3) expect(processedItem['content-type']).toBe('application/json') @@ -94,18 +98,20 @@ test('worker and task passing via redis', async () => { expect(args3).toEqual([]) expect(kwargs3).toEqual(kwargs) - const queue4 = await server.redis.llen(server.PLUGINS_CELERY_QUEUE) - const queue5 = await server.redis.llen(server.CELERY_DEFAULT_QUEUE) + const queue4 = await redis.llen(server.PLUGINS_CELERY_QUEUE) + const queue5 = await redis.llen(server.CELERY_DEFAULT_QUEUE) await advanceOneTick() await queue.stop() + await server.redisPool.release(redis) await stopServer() }) test('process multiple tasks', async () => { const [server, stopServer] = await getServer() + const redis = await server.redisPool.acquire() // Nothing in the redis queue - const queue1 = await server.redis.llen(server.PLUGINS_CELERY_QUEUE) + const queue1 = await redis.llen(server.PLUGINS_CELERY_QUEUE) expect(queue1).toBe(0) const kwargs = { @@ -128,10 +134,11 @@ test('process multiple tasks', async () => { client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) + await advanceOneTick() // There'll be a "tick lag" with the events moving from one queue to the next. :this_is_fine: - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) const queue = await startQueue(server, undefined, { processEvent: (event) => runPlugins(server, event), @@ -140,36 +147,38 @@ test('process multiple tasks', async () => { }) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(1) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(1) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) - const defaultQueue = ((await server.redis.get(server.CELERY_DEFAULT_QUEUE)) as any) as string[] + const defaultQueue = ((await redis.get(server.CELERY_DEFAULT_QUEUE)) as any) as string[] expect(defaultQueue.map((q) => JSON.parse(q)['headers']['lang']).join('-o-')).toBe('js-o-js-o-js') await queue.stop() + await server.redisPool.release(redis) await stopServer() }) test('pause and resume queue', async () => { const [server, stopServer] = await getServer() + const redis = await server.redisPool.acquire() // Nothing in the redis queue - const queue1 = await server.redis.llen(server.PLUGINS_CELERY_QUEUE) + const queue1 = await redis.llen(server.PLUGINS_CELERY_QUEUE) expect(queue1).toBe(0) const kwargs = { @@ -195,10 +204,11 @@ test('pause and resume queue', async () => { client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) + await advanceOneTick() // There'll be a "tick lag" with the events moving from one queue to the next. :this_is_fine: - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(6) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(6) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) const queue = await startQueue(server, undefined, { processEvent: (event) => runPlugins(server, event), @@ -207,85 +217,89 @@ test('pause and resume queue', async () => { }) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) await queue.pause() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) queue.resume() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) await advanceOneTick() + await Promise.resolve(true) - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) await queue.pause() await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) await queue.pause() await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) queue.resume() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) await advanceOneTick() + await Promise.resolve(true) - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(1) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(4) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(1) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(4) await advanceOneTick() + await Promise.resolve(true) - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(5) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(5) await advanceOneTick() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) - expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(6) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(6) - const defaultQueue = ((await server.redis.get(server.CELERY_DEFAULT_QUEUE)) as any) as string[] + const defaultQueue = ((await redis.get(server.CELERY_DEFAULT_QUEUE)) as any) as string[] expect(defaultQueue.map((q) => JSON.parse(q)['headers']['lang']).join('-o-')).toBe('js-o-js-o-js-o-js-o-js-o-js') await queue.stop() + await server.redisPool.release(redis) await stopServer() }) diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index 3e9c126f..9dabb869 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -116,8 +116,12 @@ test('pause the queue if too many tasks', async () => { }, makePiscina ) - await pluginsServer.server.redis.del(pluginsServer.server.PLUGINS_CELERY_QUEUE) - await pluginsServer.server.redis.del(pluginsServer.server.CELERY_DEFAULT_QUEUE) + + const redis = await pluginsServer.server.redisPool.acquire() + + await redis.del(pluginsServer.server.PLUGINS_CELERY_QUEUE) + await redis.del(pluginsServer.server.CELERY_DEFAULT_QUEUE) + const kwargs = { distinct_id: 'my-id', ip: '127.0.0.1', @@ -160,26 +164,28 @@ test('pause the queue if too many tasks', async () => { } for (let i = 0; i < 50; i++) { - if ((await pluginsServer.server.redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)) === 50) { + if ((await redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)) > 40) { await delay(100) } } - expect(await pluginsServer.server.redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(40) - expect(await pluginsServer.server.redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(2) + expect(await redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(40) + expect(await redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(2) + + await delay(100) expect(pluginsServer.queue.isPaused()).toBe(true) expect(pluginsServer.piscina.queueSize).toBe(6) expect(pluginsServer.piscina.completed).toBe(baseCompleted + 2) for (let i = 0; i < 50; i++) { - if ((await pluginsServer.server.redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)) > 32) { + if ((await redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)) > 32) { await delay(100) } } - expect(await pluginsServer.server.redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(32) - expect(await pluginsServer.server.redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(10) + expect(await redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(32) + expect(await redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(10) expect(pluginsServer.queue.isPaused()).toBe(true) expect(pluginsServer.piscina.queueSize).toBe(6) @@ -187,8 +193,8 @@ test('pause the queue if too many tasks', async () => { await delay(1000) - expect(await pluginsServer.server.redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(32) - expect(await pluginsServer.server.redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(14) + expect(await redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(32) + expect(await redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(14) expect(pluginsServer.queue.isPaused()).toBe(true) expect(pluginsServer.piscina.queueSize).toBe(2) @@ -200,8 +206,8 @@ test('pause the queue if too many tasks', async () => { expect(pluginsServer.piscina.queueSize).toBe(0) expect(pluginsServer.piscina.completed).toBe(baseCompleted + 52) - expect(await pluginsServer.server.redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(0) - expect(await pluginsServer.server.redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(52) + expect(await redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(0) + expect(await redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(52) await pluginsServer.stop() }) From 2895520f09b0b7a5fe7c8bbd040a3f3b74b2c0dd Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 17:34:15 +0100 Subject: [PATCH 06/18] add REDIS_POOL_MIN_SIZE and REDIS_POOL_MAX_SIZE, and fix a few tests --- benchmarks/postgres/e2e.celery.benchmark.ts | 16 +++++++++++----- src/config.ts | 4 ++++ src/server.ts | 4 ++-- src/types.ts | 2 ++ tests/postgres/e2e.test.ts | 9 +++++++-- tests/postgres/vm.test.ts | 6 +++--- tests/shared/process-event.ts | 10 +++++++--- 7 files changed, 36 insertions(+), 15 deletions(-) diff --git a/benchmarks/postgres/e2e.celery.benchmark.ts b/benchmarks/postgres/e2e.celery.benchmark.ts index fe477b9b..04f0da7a 100644 --- a/benchmarks/postgres/e2e.celery.benchmark.ts +++ b/benchmarks/postgres/e2e.celery.benchmark.ts @@ -1,3 +1,4 @@ +import * as IORedis from 'ioredis' import { performance } from 'perf_hooks' import { startPluginsServer } from '../../src/server' @@ -14,6 +15,8 @@ jest.setTimeout(600000) // 10min timeout const extraServerConfig: Partial = { WORKER_CONCURRENCY: 4, + REDIS_POOL_MIN_SIZE: 3, + REDIS_POOL_MAX_SIZE: 3, PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue', CELERY_DEFAULT_QUEUE: 'test-celery-default-queue', PLUGIN_SERVER_INGESTION: true, @@ -26,6 +29,7 @@ describe('e2e celery & postgres benchmark', () => { let server: PluginsServer let stopServer: () => Promise let posthog: DummyPostHog + let redis: IORedis.Redis beforeEach(async () => { await resetTestDatabase(` @@ -40,14 +44,16 @@ describe('e2e celery & postgres benchmark', () => { server = startResponse.server stopServer = startResponse.stop queue = startResponse.queue + redis = await server.redisPool.acquire() - await server.redis.del(server.PLUGINS_CELERY_QUEUE) - await server.redis.del(server.CELERY_DEFAULT_QUEUE) + await redis.del(server.PLUGINS_CELERY_QUEUE) + await redis.del(server.CELERY_DEFAULT_QUEUE) posthog = createPosthog(server, pluginConfig39) }) afterEach(async () => { + await server.redisPool.release(redis) await stopServer() }) @@ -62,12 +68,12 @@ describe('e2e celery & postgres benchmark', () => { posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) } await queue.pause() - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(0) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(0) for (let i = 0; i < count; i++) { createEvent() } - await delay(2000) - expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count) + await delay(1000) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count) queue.resume() console.log('Starting timer') diff --git a/src/config.ts b/src/config.ts index 87c28aa4..2d248336 100644 --- a/src/config.ts +++ b/src/config.ts @@ -41,6 +41,8 @@ export function getDefaultConfig(): PluginsServerConfig { STATSD_PORT: 8125, STATSD_PREFIX: 'plugin-server.', SCHEDULE_LOCK_TTL: 60, + REDIS_POOL_MIN_SIZE: 1, + REDIS_POOL_MAX_SIZE: 3, } } @@ -77,6 +79,8 @@ export function getConfigHelp(): Record { STATSD_PORT: 'StatsD port', STATSD_PREFIX: 'StatsD prefix', SCHEDULE_LOCK_TTL: 'How many seconds to hold the lock for the schedule', + REDIS_POOL_MIN_SIZE: 'Minimum number of Redis connections to use per thread', + REDIS_POOL_MAX_SIZE: 'Maximum number of Redis connections to use per thread', } } diff --git a/src/server.ts b/src/server.ts index 10cabe5e..99f5a632 100644 --- a/src/server.ts +++ b/src/server.ts @@ -126,8 +126,8 @@ export async function createServer( }, }, { - min: 1, - max: 3, + min: serverConfig.REDIS_POOL_MIN_SIZE, + max: serverConfig.REDIS_POOL_MAX_SIZE, autostart: true, } ) diff --git a/src/types.ts b/src/types.ts index d1e40724..078df96b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -52,6 +52,8 @@ export interface PluginsServerConfig extends Record { STATSD_PORT: number STATSD_PREFIX: string SCHEDULE_LOCK_TTL: number + REDIS_POOL_MIN_SIZE: number + REDIS_POOL_MAX_SIZE: number } export interface PluginsServer extends PluginsServerConfig { diff --git a/tests/postgres/e2e.test.ts b/tests/postgres/e2e.test.ts index 1a2a4733..548ea30e 100644 --- a/tests/postgres/e2e.test.ts +++ b/tests/postgres/e2e.test.ts @@ -1,3 +1,5 @@ +import * as IORedis from 'ioredis' + import { startPluginsServer } from '../../src/server' import { LogLevel } from '../../src/types' import { PluginsServer } from '../../src/types' @@ -14,6 +16,7 @@ describe('e2e postgres ingestion', () => { let server: PluginsServer let stopServer: () => Promise let posthog: DummyPostHog + let redis: IORedis.Redis beforeEach(async () => { await resetTestDatabase(` @@ -36,14 +39,16 @@ describe('e2e postgres ingestion', () => { ) server = startResponse.server stopServer = startResponse.stop + redis = await server.redisPool.acquire() - await server.redis.del(server.PLUGINS_CELERY_QUEUE) - await server.redis.del(server.CELERY_DEFAULT_QUEUE) + await redis.del(server.PLUGINS_CELERY_QUEUE) + await redis.del(server.CELERY_DEFAULT_QUEUE) posthog = createPosthog(server, pluginConfig39) }) afterEach(async () => { + await server.redisPool.release(redis) await stopServer() }) diff --git a/tests/postgres/vm.test.ts b/tests/postgres/vm.test.ts index ffff6d1a..91f00d41 100644 --- a/tests/postgres/vm.test.ts +++ b/tests/postgres/vm.test.ts @@ -21,15 +21,15 @@ const defaultEvent = { } let mockServer: PluginsServer +let stopServer: () => Promise beforeEach(async () => { ;(Client as any).mockClear() - mockServer = (await createServer())[0] + ;[mockServer, stopServer] = await createServer() }) afterEach(async () => { - mockServer.redis.disconnect() - await mockServer.postgres.end() + await stopServer() jest.clearAllMocks() }) diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index 37d449cb..c46c5b79 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -1,4 +1,5 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' +import * as IORedis from 'ioredis' import { DateTime } from 'luxon' import { performance } from 'perf_hooks' @@ -66,6 +67,7 @@ export const createProcessEventTests = ( let team: Team let server: PluginsServer let stopServer: () => Promise + let redis: IORedis.Redis let eventsProcessor: EventsProcessor let now = DateTime.utc() const returned: ReturnWithServer = {} @@ -78,8 +80,8 @@ export const createProcessEventTests = ( ...(extraServerConfig ?? {}), }) - await server.redis.del(server.PLUGINS_CELERY_QUEUE) - await server.redis.del(server.CELERY_DEFAULT_QUEUE) + await redis.del(server.PLUGINS_CELERY_QUEUE) + await redis.del(server.CELERY_DEFAULT_QUEUE) onQuery(server, () => queryCounter++) @@ -128,13 +130,15 @@ export const createProcessEventTests = ( processEventCounter = 0 team = await getFirstTeam(server) now = DateTime.utc() + redis = await server.redisPool.acquire() // clear the webhook redis cache const hooksCacheKey = `@posthog/plugin-server/hooks/${team.id}` - await server.redis.del(hooksCacheKey) + await redis.del(hooksCacheKey) }) afterEach(async () => { + await server.redisPool.release(redis) await stopServer?.() }) From 24e5b1b6991ea0013c5e281d6e0012deb2115011 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 17:35:27 +0100 Subject: [PATCH 07/18] redlock uses another redis connection outside the pool --- src/services/schedule.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/services/schedule.ts b/src/services/schedule.ts index d52f52ab..5cd7d1fe 100644 --- a/src/services/schedule.ts +++ b/src/services/schedule.ts @@ -6,6 +6,7 @@ import Redlock from 'redlock' import { processError } from '../error' import { status } from '../status' import { PluginConfigId, PluginsServer } from '../types' +import { createRedis } from '../utils' const LOCKED_RESOURCE = 'plugin-server:locks:schedule' @@ -25,7 +26,8 @@ export async function startSchedule( const retryDelay = lockTTL / 10 // 6 sec const extendDelay = lockTTL / 2 // 30 sec - const redis = await server.redisPool.acquire() + // use another redis connection for redlock + const redis = await createRedis(server) const redlock = new Redlock([redis], { // we handle retires ourselves to have a way to cancel the promises on quit @@ -102,7 +104,7 @@ export async function startSchedule( runEveryMinuteJob && schedule.cancelJob(runEveryMinuteJob) await lock?.unlock().catch(Sentry.captureException) - await server.redisPool.release(redis) + await redis.quit() await waitForTasksToFinish(server!) } From 2a1f1c05b0360c2952b514c5f40b3822887619b4 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 17:45:21 +0100 Subject: [PATCH 08/18] increase delay --- benchmarks/postgres/e2e.celery.benchmark.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/postgres/e2e.celery.benchmark.ts b/benchmarks/postgres/e2e.celery.benchmark.ts index 04f0da7a..f5b8b72d 100644 --- a/benchmarks/postgres/e2e.celery.benchmark.ts +++ b/benchmarks/postgres/e2e.celery.benchmark.ts @@ -72,7 +72,7 @@ describe('e2e celery & postgres benchmark', () => { for (let i = 0; i < count; i++) { createEvent() } - await delay(1000) + await delay(3000) expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count) queue.resume() From d9a5552bcfdf07ecb9dc8b8f8497af0799e9d188 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 17:45:34 +0100 Subject: [PATCH 09/18] fix type --- src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index bc51adf5..0f3a2dd6 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -371,7 +371,7 @@ export async function tryTwice( } } -export async function createRedis(serverConfig: PluginsServerConfig) { +export async function createRedis(serverConfig: PluginsServerConfig): Promise { const redis = new Redis(serverConfig.REDIS_URL, { maxRetriesPerRequest: -1 }) redis .on('error', (error) => { From 6580a7d05abb77a87b9d8b4939e42a11ac345099 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 17:48:16 +0100 Subject: [PATCH 10/18] more fixes --- tests/postgres/worker.test.ts | 2 ++ tests/shared/process-event.ts | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index 9dabb869..687d0124 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -110,6 +110,8 @@ test('pause the queue if too many tasks', async () => { { WORKER_CONCURRENCY: 2, TASKS_PER_WORKER: 2, + REDIS_POOL_MIN_SIZE: 1, + REDIS_POOL_MAX_SIZE: 1, PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue', CELERY_DEFAULT_QUEUE: 'test-celery-default-queue', LOG_LEVEL: LogLevel.Debug, diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index c46c5b79..5cb7e350 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -80,6 +80,8 @@ export const createProcessEventTests = ( ...(extraServerConfig ?? {}), }) + redis = await server.redisPool.acquire() + await redis.del(server.PLUGINS_CELERY_QUEUE) await redis.del(server.CELERY_DEFAULT_QUEUE) @@ -130,7 +132,6 @@ export const createProcessEventTests = ( processEventCounter = 0 team = await getFirstTeam(server) now = DateTime.utc() - redis = await server.redisPool.acquire() // clear the webhook redis cache const hooksCacheKey = `@posthog/plugin-server/hooks/${team.id}` From bee009b8591531e370010950e42caecfa05dc33b Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 18:00:03 +0100 Subject: [PATCH 11/18] more fixes --- tests/postgres/e2e.timeout.test.ts | 7 ++++--- tests/postgres/worker.test.ts | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/postgres/e2e.timeout.test.ts b/tests/postgres/e2e.timeout.test.ts index 13ad4649..111c6b2d 100644 --- a/tests/postgres/e2e.timeout.test.ts +++ b/tests/postgres/e2e.timeout.test.ts @@ -40,9 +40,10 @@ describe('e2e postgres ingestion timeout', () => { ) server = startResponse.server stopServer = startResponse.stop - - await server.redis.del(server.PLUGINS_CELERY_QUEUE) - await server.redis.del(server.CELERY_DEFAULT_QUEUE) + const redis = await server.redisPool.acquire() + await redis.del(server.PLUGINS_CELERY_QUEUE) + await redis.del(server.CELERY_DEFAULT_QUEUE) + await server.redisPool.release(redis) posthog = createPosthog(server, pluginConfig39) }) diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index 687d0124..2d9ca5e3 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -153,7 +153,7 @@ test('pause the queue if too many tasks', async () => { expect(pluginsServer.piscina.completed).toBe(baseCompleted) expect(pluginsServer.queue.isPaused()).toBe(false) - await delay(3000) + await delay(5000) expect(pluginsServer.piscina.queueSize).toBe(0) expect(pluginsServer.piscina.completed).toBe(baseCompleted + 2) @@ -211,5 +211,6 @@ test('pause the queue if too many tasks', async () => { expect(await redis.llen(pluginsServer.server.PLUGINS_CELERY_QUEUE)).toBe(0) expect(await redis.llen(pluginsServer.server.CELERY_DEFAULT_QUEUE)).toBe(52) + await pluginsServer.server.redisPool.release(redis) await pluginsServer.stop() }) From db8dbca8dad6c37cec3bbd7c99a983ccae34ca1d Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 20:23:07 +0100 Subject: [PATCH 12/18] queue test without mocked redis --- package.json | 1 - tests/helpers/redis.ts | 21 ----- tests/postgres/queue.test.ts | 135 ++++++++----------------------- yarn.lock | 150 ----------------------------------- 4 files changed, 31 insertions(+), 276 deletions(-) delete mode 100644 tests/helpers/redis.ts diff --git a/package.json b/package.json index 8c4f22d8..c9768366 100644 --- a/package.json +++ b/package.json @@ -101,7 +101,6 @@ "eslint-plugin-react": "^7.20.0", "eslint-plugin-simple-import-sort": "^7.0.0", "husky": ">=4", - "ioredis-mock": "^5.2.0", "jest": "^26.6.3", "lint-staged": ">=10.5.1", "prettier": "^2.2.1", diff --git a/tests/helpers/redis.ts b/tests/helpers/redis.ts deleted file mode 100644 index 9cf78046..00000000 --- a/tests/helpers/redis.ts +++ /dev/null @@ -1,21 +0,0 @@ -// Adapted from https://github.com/stipsan/ioredis-mock/issues/568#issuecomment-492558489 -export const redisFactory = (): any => { - const Redis = require('ioredis-mock') - if (typeof Redis === 'object') { - // the first mock is an ioredis shim because ioredis-mock depends on it - // https://github.com/stipsan/ioredis-mock/blob/2ba837f07c0723cde993fb8f791a5fcfdabce719/src/index.js#L100-L109 - return { - Command: { _transformer: { argument: {}, reply: {} } }, - } - } - // second mock for our code - return function (...args: any[]) { - const redis = new Redis(args) - // adapted from copy/paste - our own brpop function! - redis.brpop = async (...args: any[]) => { - args.pop() - return [args[0], await redis.rpop(...args)] - } - return redis - } -} diff --git a/tests/postgres/queue.test.ts b/tests/postgres/queue.test.ts index 62bd8e8d..f30751d5 100644 --- a/tests/postgres/queue.test.ts +++ b/tests/postgres/queue.test.ts @@ -1,12 +1,11 @@ -// eslint-disable-next-line -import { redisFactory } from '../helpers/redis' import Client from '../../src/celery/client' import { runPlugins } from '../../src/plugins' import { createServer } from '../../src/server' import { LogLevel, PluginsServer } from '../../src/types' +import { delay } from '../../src/utils' import { startQueue } from '../../src/worker/queue' -jest.mock('ioredis', () => redisFactory()) +jest.setTimeout(60000) // 60 sec timeout function advanceOneTick() { return new Promise((resolve) => process.nextTick(resolve)) @@ -14,8 +13,11 @@ function advanceOneTick() { async function getServer(): Promise<[PluginsServer, () => Promise]> { const [server, stopServer] = await createServer({ - PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue', - CELERY_DEFAULT_QUEUE: 'test-celery-default-queue', + REDIS_POOL_MIN_SIZE: 3, + REDIS_POOL_MAX_SIZE: 3, + PLUGINS_CELERY_QUEUE: 'ttt-test-plugins-celery-queue', + CELERY_DEFAULT_QUEUE: 'ttt-test-celery-default-queue', + PLUGIN_SERVER_INGESTION: false, LOG_LEVEL: LogLevel.Log, }) @@ -52,8 +54,8 @@ test('worker and task passing via redis', async () => { const client = new Client(server.db, server.PLUGINS_CELERY_QUEUE) client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - // It's there - await advanceOneTick() + await delay(1000) + const queue2 = await redis.llen(server.PLUGINS_CELERY_QUEUE) expect(queue2).toBe(1) @@ -77,8 +79,8 @@ test('worker and task passing via redis', async () => { processEventBatch: (events) => Promise.all(events.map((event) => runPlugins(server, event))), ingestEvent: () => Promise.resolve({ success: true }), }) - await advanceOneTick() - await advanceOneTick() + + await delay(100) // get the new processed task from CELERY_DEFAULT_QUEUE const queue3 = await redis.llen(server.CELERY_DEFAULT_QUEUE) @@ -98,10 +100,6 @@ test('worker and task passing via redis', async () => { expect(args3).toEqual([]) expect(kwargs3).toEqual(kwargs) - const queue4 = await redis.llen(server.PLUGINS_CELERY_QUEUE) - const queue5 = await redis.llen(server.CELERY_DEFAULT_QUEUE) - await advanceOneTick() - await queue.stop() await server.redisPool.release(redis) await stopServer() @@ -134,40 +132,25 @@ test('process multiple tasks', async () => { client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - await advanceOneTick() - // There'll be a "tick lag" with the events moving from one queue to the next. :this_is_fine: + await delay(1000) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) const queue = await startQueue(server, undefined, { - processEvent: (event) => runPlugins(server, event), + processEvent: async (event) => runPlugins(server, event), processEventBatch: (events) => Promise.all(events.map((event) => runPlugins(server, event))), ingestEvent: () => Promise.resolve({ success: true }), }) - await advanceOneTick() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) - - await advanceOneTick() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) - await advanceOneTick() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(1) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) - - await advanceOneTick() + await delay(1000) expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) - const defaultQueue = ((await redis.get(server.CELERY_DEFAULT_QUEUE)) as any) as string[] - - expect(defaultQueue.map((q) => JSON.parse(q)['headers']['lang']).join('-o-')).toBe('js-o-js-o-js') + const oneTask = await redis.lpop(server.CELERY_DEFAULT_QUEUE) + expect(JSON.parse(oneTask)['headers']['lang']).toBe('js') await queue.stop() await server.redisPool.release(redis) @@ -198,13 +181,11 @@ test('pause and resume queue', async () => { // Tricky: make a client to the PLUGINS_CELERY_QUEUE queue (not CELERY_DEFAULT_QUEUE as normally) // This is so that the worker can directly read from it. Basically we will simulate a event sent from posthog. const client = new Client(server.db, server.PLUGINS_CELERY_QUEUE) - client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) - await advanceOneTick() + for (let i = 0; i < 6; i++) { + client.sendTask('posthog.tasks.process_event.process_event_with_plugins', args, {}) + } + + await delay(1000) // There'll be a "tick lag" with the events moving from one queue to the next. :this_is_fine: expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(6) @@ -222,82 +203,28 @@ test('pause and resume queue', async () => { await queue.pause() - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) - - await advanceOneTick() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) - - await advanceOneTick() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) - - queue.resume() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) - - await advanceOneTick() - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(1) - - await advanceOneTick() - await Promise.resolve(true) - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) - await queue.pause() - await advanceOneTick() + await delay(100) - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) - - await queue.pause() - await advanceOneTick() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) - await advanceOneTick() + await delay(100) - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) queue.resume() - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) - - await advanceOneTick() - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(3) - - await advanceOneTick() - await Promise.resolve(true) - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(1) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(4) - - await advanceOneTick() - await Promise.resolve(true) - - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(5) - - await advanceOneTick() + await delay(500) expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(0) expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(6) - const defaultQueue = ((await redis.get(server.CELERY_DEFAULT_QUEUE)) as any) as string[] - - expect(defaultQueue.map((q) => JSON.parse(q)['headers']['lang']).join('-o-')).toBe('js-o-js-o-js-o-js-o-js-o-js') + const oneTask = await redis.lpop(server.CELERY_DEFAULT_QUEUE) + expect(JSON.parse(oneTask)['headers']['lang']).toBe('js') await queue.stop() await server.redisPool.release(redis) diff --git a/yarn.lock b/yarn.lock index a45afd35..ea1a8c17 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1808,11 +1808,6 @@ array-find-index@^1.0.1: resolved "https://registry.yarnpkg.com/array-find-index/-/array-find-index-1.0.2.tgz#df010aa1287e164bbda6f9723b0a96a1ec4187a1" integrity sha1-3wEKoSh+Fku9pvlyOwqWoexBh6E= -array-from@^2.1.1: - version "2.1.1" - resolved "https://registry.yarnpkg.com/array-from/-/array-from-2.1.1.tgz#cfe9d8c26628b9dc5aecc62a9f5d8f1f352c1195" - integrity sha1-z+nYwmYoudxa7MYqn12PHzUsEZU= - array-includes@^3.1.1: version "3.1.2" resolved "https://registry.yarnpkg.com/array-includes/-/array-includes-3.1.2.tgz#a8db03e0b88c8c6aeddc49cb132f9bcab4ebf9c8" @@ -2510,14 +2505,6 @@ currently-unhandled@^0.4.1: dependencies: array-find-index "^1.0.1" -d@1, d@^1.0.1: - version "1.0.1" - resolved "https://registry.yarnpkg.com/d/-/d-1.0.1.tgz#8698095372d58dbee346ffd0c7093f99f8f9eb5a" - integrity sha512-m62ShEObQ39CfralilEQRjH6oAMtNCV1xJyEx5LpRYUVN+EviphDgUc/F3hnYbADmkiNs67Y+3ylmlG7Lnu+FA== - dependencies: - es5-ext "^0.10.50" - type "^1.0.1" - dashdash@^1.12.0: version "1.14.1" resolved "https://registry.yarnpkg.com/dashdash/-/dashdash-1.14.1.tgz#853cfa0f7cbe2fed5de20326b8dd581035f6e2f0" @@ -2793,63 +2780,6 @@ es-to-primitive@^1.2.1: is-date-object "^1.0.1" is-symbol "^1.0.2" -es5-ext@^0.10.35, es5-ext@^0.10.50, es5-ext@~0.10.14: - version "0.10.53" - resolved "https://registry.yarnpkg.com/es5-ext/-/es5-ext-0.10.53.tgz#93c5a3acfdbef275220ad72644ad02ee18368de1" - integrity sha512-Xs2Stw6NiNHWypzRTY1MtaG/uJlwCk8kH81920ma8mvN8Xq1gsfhZvpkImLQArw8AHnv8MT2I45J3c0R8slE+Q== - dependencies: - es6-iterator "~2.0.3" - es6-symbol "~3.1.3" - next-tick "~1.0.0" - -es6-iterator@~2.0.1, es6-iterator@~2.0.3: - version "2.0.3" - resolved "https://registry.yarnpkg.com/es6-iterator/-/es6-iterator-2.0.3.tgz#a7de889141a05a94b0854403b2d0a0fbfa98f3b7" - integrity sha1-p96IkUGgWpSwhUQDstCg+/qY87c= - dependencies: - d "1" - es5-ext "^0.10.35" - es6-symbol "^3.1.1" - -es6-map@^0.1.5: - version "0.1.5" - resolved "https://registry.yarnpkg.com/es6-map/-/es6-map-0.1.5.tgz#9136e0503dcc06a301690f0bb14ff4e364e949f0" - integrity sha1-kTbgUD3MBqMBaQ8LsU/042TpSfA= - dependencies: - d "1" - es5-ext "~0.10.14" - es6-iterator "~2.0.1" - es6-set "~0.1.5" - es6-symbol "~3.1.1" - event-emitter "~0.3.5" - -es6-set@^0.1.5, es6-set@~0.1.5: - version "0.1.5" - resolved "https://registry.yarnpkg.com/es6-set/-/es6-set-0.1.5.tgz#d2b3ec5d4d800ced818db538d28974db0a73ccb1" - integrity sha1-0rPsXU2ADO2BjbU40ol02wpzzLE= - dependencies: - d "1" - es5-ext "~0.10.14" - es6-iterator "~2.0.1" - es6-symbol "3.1.1" - event-emitter "~0.3.5" - -es6-symbol@3.1.1: - version "3.1.1" - resolved "https://registry.yarnpkg.com/es6-symbol/-/es6-symbol-3.1.1.tgz#bf00ef4fdab6ba1b46ecb7b629b4c7ed5715cc77" - integrity sha1-vwDvT9q2uhtG7Le2KbTH7VcVzHc= - dependencies: - d "1" - es5-ext "~0.10.14" - -es6-symbol@^3.1.1, es6-symbol@~3.1.1, es6-symbol@~3.1.3: - version "3.1.3" - resolved "https://registry.yarnpkg.com/es6-symbol/-/es6-symbol-3.1.3.tgz#bad5d3c1bcdac28269f4cb331e431c78ac705d18" - integrity sha512-NJ6Yn3FuDinBaBRWl/q5X/s4koRHBrgKAu+yGI6JCBeiu3qrcbJhwT2GeR/EXVfylRk8dpQVJoLEFhK+Mu31NA== - dependencies: - d "^1.0.1" - ext "^1.1.2" - escalade@^3.1.1: version "3.1.1" resolved "https://registry.yarnpkg.com/escalade/-/escalade-3.1.1.tgz#d8cfdc7000965c5a0174b4a82eaa5c0552742e40" @@ -3075,14 +3005,6 @@ esutils@^2.0.2: resolved "https://registry.yarnpkg.com/esutils/-/esutils-2.0.3.tgz#74d2eb4de0b8da1293711910d50775b9b710ef64" integrity sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g== -event-emitter@~0.3.5: - version "0.3.5" - resolved "https://registry.yarnpkg.com/event-emitter/-/event-emitter-0.3.5.tgz#df8c69eef1647923c7157b9ce83840610b02cc39" - integrity sha1-34xp7vFkeSPHFXuc6DhAYQsCzDk= - dependencies: - d "1" - es5-ext "~0.10.14" - event-target-shim@^5.0.0: version "5.0.1" resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789" @@ -3156,13 +3078,6 @@ expect@^26.6.2: jest-message-util "^26.6.2" jest-regex-util "^26.0.0" -ext@^1.1.2: - version "1.4.0" - resolved "https://registry.yarnpkg.com/ext/-/ext-1.4.0.tgz#89ae7a07158f79d35517882904324077e4379244" - integrity sha512-Key5NIsUxdqKg3vIsdw9dSuXpPCQ297y6wBjL30edxwPgt2E44WcWBZey/ZvUc6sERLTxKdyCu4gZFmUbk1Q7A== - dependencies: - type "^2.0.0" - extend-shallow@^2.0.1: version "2.0.1" resolved "https://registry.yarnpkg.com/extend-shallow/-/extend-shallow-2.0.1.tgz#51af7d614ad9a9f610ea1bafbb989d6b1c56890f" @@ -3309,20 +3224,6 @@ fb-watchman@^2.0.0: dependencies: bser "2.1.1" -fengari-interop@^0.1.2: - version "0.1.2" - resolved "https://registry.yarnpkg.com/fengari-interop/-/fengari-interop-0.1.2.tgz#f7731dcdd2ff4449073fb7ac3c451a8841ce1e87" - integrity sha512-8iTvaByZVoi+lQJhHH9vC+c/Yaok9CwOqNQZN6JrVpjmWwW4dDkeblBXhnHC+BoI6eF4Cy5NKW3z6ICEjvgywQ== - -fengari@^0.1.4: - version "0.1.4" - resolved "https://registry.yarnpkg.com/fengari/-/fengari-0.1.4.tgz#72416693cd9e43bd7d809d7829ddc0578b78b0bb" - integrity sha512-6ujqUuiIYmcgkGz8MGAdERU57EIluGGPSUgGPTsco657EHa+srq0S3/YUl/r9kx1+D+d4rGfYObd+m8K22gB1g== - dependencies: - readline-sync "^1.4.9" - sprintf-js "^1.1.1" - tmp "^0.0.33" - figures@^3.2.0: version "3.2.0" resolved "https://registry.yarnpkg.com/figures/-/figures-3.2.0.tgz#625c18bd293c604dc4a8ddb2febf0c88341746af" @@ -3901,20 +3802,6 @@ internal-slot@^1.0.2: has "^1.0.3" side-channel "^1.0.2" -ioredis-mock@^5.2.0: - version "5.2.0" - resolved "https://registry.yarnpkg.com/ioredis-mock/-/ioredis-mock-5.2.0.tgz#f2c26817ab89855774d5e57f9ec09b02398ae485" - integrity sha512-BGB0ANqW/a+W89mToXUTSL/wqr+WXGeGElLme7Do2X9hM3tQuZYswNw1VdfstdbbFiK4t/qzou+OXZWPi8CieA== - dependencies: - array-from "^2.1.1" - es6-map "^0.1.5" - es6-set "^0.1.5" - fengari "^0.1.4" - fengari-interop "^0.1.2" - lodash "^4.17.20" - minimatch "^3.0.4" - standard-as-callback "^2.0.1" - ioredis@^4.19.2: version "4.19.2" resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-4.19.2.tgz#e3eab394c653cea5aea07c0c784d8c772dce8801" @@ -5204,11 +5091,6 @@ natural-compare@^1.4.0: resolved "https://registry.yarnpkg.com/natural-compare/-/natural-compare-1.4.0.tgz#4abebfeed7541f2c27acfb29bdbbd15c8d5ba4f7" integrity sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc= -next-tick@~1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/next-tick/-/next-tick-1.0.0.tgz#ca86d1fe8828169b0120208e3dc8424b9db8342c" - integrity sha1-yobR/ogoFpsBICCOPchCS524NCw= - nice-napi@^1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/nice-napi/-/nice-napi-1.0.2.tgz#dc0ab5a1eac20ce548802fc5686eaa6bc654927b" @@ -5445,11 +5327,6 @@ optionator@^0.9.1: type-check "^0.4.0" word-wrap "^1.2.3" -os-tmpdir@~1.0.2: - version "1.0.2" - resolved "https://registry.yarnpkg.com/os-tmpdir/-/os-tmpdir-1.0.2.tgz#bbe67406c79aa85c5cfec766fe5734555dfa1274" - integrity sha1-u+Z0BseaqFxc/sdm/lc0VV36EnQ= - p-each-series@^2.1.0: version "2.2.0" resolved "https://registry.yarnpkg.com/p-each-series/-/p-each-series-2.2.0.tgz#105ab0357ce72b202a8a8b94933672657b5e2a9a" @@ -6020,11 +5897,6 @@ readdirp@~3.5.0: dependencies: picomatch "^2.2.1" -readline-sync@^1.4.9: - version "1.4.10" - resolved "https://registry.yarnpkg.com/readline-sync/-/readline-sync-1.4.10.tgz#41df7fbb4b6312d673011594145705bf56d8873b" - integrity sha512-gNva8/6UAe8QYepIQH/jQ2qn91Qj0B9sYjMBBs3QOB8F2CXcKgLxQaJRP76sWVRQt+QU+8fAkCbCvjjMFu7Ycw== - redent@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/redent/-/redent-1.0.0.tgz#cf916ab1fd5f1f16dfb20822dd6ec7f730c2afde" @@ -6611,11 +6483,6 @@ split2@^3.1.1: dependencies: readable-stream "^3.0.0" -sprintf-js@^1.1.1: - version "1.1.2" - resolved "https://registry.yarnpkg.com/sprintf-js/-/sprintf-js-1.1.2.tgz#da1765262bf8c0f571749f2ad6c26300207ae673" - integrity sha512-VE0SOVEHCk7Qc8ulkWw3ntAzXuqf7S2lvwQaDLRnUeIEaKNQJzV6BwmLKhOqT61aGhfUMrXeaBk+oDGCzvhcug== - sprintf-js@~1.0.2: version "1.0.3" resolved "https://registry.yarnpkg.com/sprintf-js/-/sprintf-js-1.0.3.tgz#04e6926f662895354f3dd015203633b857297e2c" @@ -6929,13 +6796,6 @@ tiny-lru@^7.0.0: resolved "https://registry.yarnpkg.com/tiny-lru/-/tiny-lru-7.0.6.tgz#b0c3cdede1e5882aa2d1ae21cb2ceccf2a331f24" integrity sha512-zNYO0Kvgn5rXzWpL0y3RS09sMK67eGaQj9805jlK9G6pSadfriTczzLHFXa/xcW4mIRfmlB9HyQ/+SgL0V1uow== -tmp@^0.0.33: - version "0.0.33" - resolved "https://registry.yarnpkg.com/tmp/-/tmp-0.0.33.tgz#6d34335889768d21b2bcda0aa277ced3b1bfadf9" - integrity sha512-jRCJlojKnZ3addtTOjdIqoRuPEKBvNXcGYqzO6zWZX8KfKEpnGY5jfggJQ3EjKuu8D4bJRr0y+cYJFmYbImXGw== - dependencies: - os-tmpdir "~1.0.2" - tmpl@1.0.x: version "1.0.4" resolved "https://registry.yarnpkg.com/tmpl/-/tmpl-1.0.4.tgz#23640dd7b42d00433911140820e5cf440e521dd1" @@ -7136,16 +6996,6 @@ type-fest@^0.8.1: resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d" integrity sha512-4dbzIzqvjtgiM5rw1k5rEHtBANKmdudhGyBEajN01fEyhaAIhsoKNy6y7+IN93IfpFtwY9iqi7kD+xwKhQsNJA== -type@^1.0.1: - version "1.2.0" - resolved "https://registry.yarnpkg.com/type/-/type-1.2.0.tgz#848dd7698dafa3e54a6c479e759c4bc3f18847a0" - integrity sha512-+5nt5AAniqsCnu2cEQQdpzCAh33kVx8n0VoFidKpB1dVVLAN/F+bgVOqOJqOnEnrhp222clB5p3vUlD+1QAnfg== - -type@^2.0.0: - version "2.1.0" - resolved "https://registry.yarnpkg.com/type/-/type-2.1.0.tgz#9bdc22c648cf8cf86dd23d32336a41cfb6475e3f" - integrity sha512-G9absDWvhAWCV2gmF1zKud3OyC61nZDwWvBL2DApaVFogI07CprggiQAOOjvp2NRjYWFzPyu7vwtDrQFq8jeSA== - typedarray-to-buffer@^3.1.5: version "3.1.5" resolved "https://registry.yarnpkg.com/typedarray-to-buffer/-/typedarray-to-buffer-3.1.5.tgz#a97ee7a9ff42691b9f783ff1bc5112fe3fca9080" From eee12b09fb42a55394b05ad2924b5c450e97df5f Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 20:24:27 +0100 Subject: [PATCH 13/18] increase redis pool for worker test --- tests/postgres/worker.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index 2d9ca5e3..fab3f269 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -110,8 +110,8 @@ test('pause the queue if too many tasks', async () => { { WORKER_CONCURRENCY: 2, TASKS_PER_WORKER: 2, - REDIS_POOL_MIN_SIZE: 1, - REDIS_POOL_MAX_SIZE: 1, + REDIS_POOL_MIN_SIZE: 2, + REDIS_POOL_MAX_SIZE: 2, PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue', CELERY_DEFAULT_QUEUE: 'test-celery-default-queue', LOG_LEVEL: LogLevel.Debug, From 3d83718b826dd3ca20a3c6a8cdfbb61478daf9ae Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 20:26:53 +0100 Subject: [PATCH 14/18] fix worker test --- tests/postgres/worker.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index fab3f269..61feebf3 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -110,8 +110,8 @@ test('pause the queue if too many tasks', async () => { { WORKER_CONCURRENCY: 2, TASKS_PER_WORKER: 2, - REDIS_POOL_MIN_SIZE: 2, - REDIS_POOL_MAX_SIZE: 2, + REDIS_POOL_MIN_SIZE: 3, + REDIS_POOL_MAX_SIZE: 3, PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue', CELERY_DEFAULT_QUEUE: 'test-celery-default-queue', LOG_LEVEL: LogLevel.Debug, From 930ebfcd6208eae0d9bacf3a0fbb6c669fd95229 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 20:35:22 +0100 Subject: [PATCH 15/18] shorter delay --- benchmarks/postgres/e2e.celery.benchmark.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/postgres/e2e.celery.benchmark.ts b/benchmarks/postgres/e2e.celery.benchmark.ts index f5b8b72d..04f0da7a 100644 --- a/benchmarks/postgres/e2e.celery.benchmark.ts +++ b/benchmarks/postgres/e2e.celery.benchmark.ts @@ -72,7 +72,7 @@ describe('e2e celery & postgres benchmark', () => { for (let i = 0; i < count; i++) { createEvent() } - await delay(3000) + await delay(1000) expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count) queue.resume() From 5bc9ccb5d46554b02ef619a070f413a0fa860cd1 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 20:38:16 +0100 Subject: [PATCH 16/18] create pubsub via createRedis --- src/server.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server.ts b/src/server.ts index 99f5a632..7e72564a 100644 --- a/src/server.ts +++ b/src/server.ts @@ -259,7 +259,8 @@ export async function startPluginsServer( queue?.resume() }) - pubSub = new Redis(server.REDIS_URL, { enableAutoPipelining: true }) + // use one extra connection for redis pubsub + pubSub = await createRedis(server) await pubSub.subscribe(server.PLUGINS_RELOAD_PUBSUB_CHANNEL) pubSub.on('message', async (channel: string, message) => { if (channel === server!.PLUGINS_RELOAD_PUBSUB_CHANNEL) { From 3b23e55ad8f26a58113bf6433b76f94d73859a50 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 20:48:10 +0100 Subject: [PATCH 17/18] improve flakiness --- tests/postgres/queue.test.ts | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/postgres/queue.test.ts b/tests/postgres/queue.test.ts index f30751d5..12d9fc4d 100644 --- a/tests/postgres/queue.test.ts +++ b/tests/postgres/queue.test.ts @@ -203,18 +203,23 @@ test('pause and resume queue', async () => { await queue.pause() - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) + const pluginQueue = await redis.llen(server.PLUGINS_CELERY_QUEUE) + const defaultQueue = await redis.llen(server.CELERY_DEFAULT_QUEUE) + + expect(pluginQueue + defaultQueue).toBe(6) + + expect(pluginQueue).not.toBe(0) + expect(defaultQueue).not.toBe(0) await delay(100) - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(pluginQueue) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(defaultQueue) await delay(100) - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(4) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(2) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(pluginQueue) + expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(defaultQueue) queue.resume() From 8e342e37711e207f624eed59f3c283aebf462dd9 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 23 Feb 2021 20:57:32 +0100 Subject: [PATCH 18/18] improve flakiness... again --- tests/postgres/queue.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/postgres/queue.test.ts b/tests/postgres/queue.test.ts index 12d9fc4d..b6a3381e 100644 --- a/tests/postgres/queue.test.ts +++ b/tests/postgres/queue.test.ts @@ -198,8 +198,7 @@ test('pause and resume queue', async () => { }) await advanceOneTick() - expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) - expect(await redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) + expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).not.toBe(6) await queue.pause()