[plugin-server] Redis connection pool (PostHog/plugin-server#204)
* pool of 1 for redis to help with async

* space

* increase delay now that redis is sequential

* actually use a pool

* fix some tests

* add REDIS_POOL_MIN_SIZE and REDIS_POOL_MAX_SIZE, and fix a few tests

* redlock uses another redis connection outside the pool

* increase delay

* fix type

* more fixes

* more fixes

* queue test without mocked redis

* increase redis pool for worker test

* fix worker test

* shorter delay

* create pubsub via createRedis

* improve flakiness

* improve flakiness... again

Co-authored-by: Michael Matloka <[email protected]>
mariusandra and Twixes authored Feb 23, 2021
1 parent 680cf5d commit 25126be
Showing 17 changed files with 218 additions and 351 deletions.
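The core pattern in this commit: instead of one shared ioredis client per thread, connections now come from a `generic-pool` pool, and every call site acquires a client and releases it when done. A minimal sketch of that pattern, using a plain URL-based connection as a stand-in for the `createRedis(serverConfig)` helper the diff uses:

```ts
import { createPool } from 'generic-pool'
import Redis from 'ioredis'

// Pool of ioredis clients, sized by the new REDIS_POOL_MIN_SIZE / REDIS_POOL_MAX_SIZE options.
const redisPool = createPool<Redis.Redis>(
    {
        create: async () => new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379'),
        destroy: async (client) => {
            await client.quit()
        },
    },
    { min: 1, max: 3, autostart: true }
)

// Every caller checks a client out and must return it, even when the command throws.
async function redisGet(key: string): Promise<string | null> {
    const client = await redisPool.acquire()
    try {
        return await client.get(key)
    } finally {
        await redisPool.release(client)
    }
}
```

The diffs below apply this pattern to `DB`, the server bootstrap, and the tests; the two exceptions are Redlock and pubsub, which each keep a dedicated long-lived connection outside the pool.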
14 changes: 10 additions & 4 deletions benchmarks/postgres/e2e.celery.benchmark.ts
@@ -1,3 +1,4 @@
import * as IORedis from 'ioredis'
import { performance } from 'perf_hooks'

import { startPluginsServer } from '../../src/server'
@@ -14,6 +15,8 @@ jest.setTimeout(600000) // 10min timeout

const extraServerConfig: Partial<PluginsServerConfig> = {
WORKER_CONCURRENCY: 4,
REDIS_POOL_MIN_SIZE: 3,
REDIS_POOL_MAX_SIZE: 3,
PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue',
CELERY_DEFAULT_QUEUE: 'test-celery-default-queue',
PLUGIN_SERVER_INGESTION: true,
Expand All @@ -26,6 +29,7 @@ describe('e2e celery & postgres benchmark', () => {
let server: PluginsServer
let stopServer: () => Promise<void>
let posthog: DummyPostHog
let redis: IORedis.Redis

beforeEach(async () => {
await resetTestDatabase(`
Expand All @@ -40,14 +44,16 @@ describe('e2e celery & postgres benchmark', () => {
server = startResponse.server
stopServer = startResponse.stop
queue = startResponse.queue
redis = await server.redisPool.acquire()

await server.redis.del(server.PLUGINS_CELERY_QUEUE)
await server.redis.del(server.CELERY_DEFAULT_QUEUE)
await redis.del(server.PLUGINS_CELERY_QUEUE)
await redis.del(server.CELERY_DEFAULT_QUEUE)

posthog = createPosthog(server, pluginConfig39)
})

afterEach(async () => {
await server.redisPool.release(redis)
await stopServer()
})

Expand All @@ -62,12 +68,12 @@ describe('e2e celery & postgres benchmark', () => {
posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' })
}
await queue.pause()
expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(0)
expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(0)
for (let i = 0; i < count; i++) {
createEvent()
}
await delay(1000)
expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count)
expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count)
queue.resume()

console.log('Starting timer')
3 changes: 2 additions & 1 deletion package.json
@@ -52,6 +52,7 @@
"@sentry/tracing": "^5.29.0",
"adm-zip": "^0.4.16",
"fastify": "^3.8.0",
"generic-pool": "^3.7.1",
"hot-shots": "^8.2.1",
"ioredis": "^4.19.2",
"kafkajs": "^1.15.0",
@@ -76,6 +77,7 @@
"@posthog/plugin-scaffold": "0.2.8",
"@types/adm-zip": "^0.4.33",
"@types/babel__standalone": "^7.1.3",
"@types/generic-pool": "^3.1.9",
"@types/ioredis": "^4.17.7",
"@types/jest": "^26.0.15",
"@types/luxon": "^1.25.0",
@@ -99,7 +101,6 @@
"eslint-plugin-react": "^7.20.0",
"eslint-plugin-simple-import-sort": "^7.0.0",
"husky": ">=4",
"ioredis-mock": "^5.2.0",
"jest": "^26.6.3",
"lint-staged": ">=10.5.1",
"prettier": "^2.2.1",
4 changes: 4 additions & 0 deletions src/config.ts
@@ -41,6 +48,8 @@ export function getDefaultConfig(): PluginsServerConfig {
STATSD_PORT: 8125,
STATSD_PREFIX: 'plugin-server.',
SCHEDULE_LOCK_TTL: 60,
REDIS_POOL_MIN_SIZE: 1,
REDIS_POOL_MAX_SIZE: 3,
}
}

@@ -77,6 +79,8 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
STATSD_PORT: 'StatsD port',
STATSD_PREFIX: 'StatsD prefix',
SCHEDULE_LOCK_TTL: 'How many seconds to hold the lock for the schedule',
REDIS_POOL_MIN_SIZE: 'Minimum number of Redis connections to use per thread',
REDIS_POOL_MAX_SIZE: 'Maximum number of Redis connections to use per thread',
}
}

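Both settings can be overridden per process, as the benchmark above does with a pool of exactly 3. A minimal sketch of wiring them through `createServer` — the `[server, closeServer]` return tuple is an assumption based on how the tests use the bootstrap, not something shown in this diff:

```ts
import { createServer } from './src/server'

async function main(): Promise<void> {
    // Hypothetical bootstrap: keep the pool small, but large enough that
    // concurrent redisGet/redisSet calls don't queue behind one connection.
    const [server, closeServer] = await createServer({
        REDIS_POOL_MIN_SIZE: 1,
        REDIS_POOL_MAX_SIZE: 3,
    })
    // ... run work against server.db ...
    await closeServer()
}
```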
40 changes: 30 additions & 10 deletions src/db.ts
@@ -1,5 +1,6 @@
import ClickHouse from '@posthog/clickhouse'
import { Properties } from '@posthog/plugin-scaffold'
import { Pool as GenericPool } from 'generic-pool'
import Redis from 'ioredis'
import { Producer, ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'
@@ -26,6 +27,7 @@ import {
import {
castTimestampOrNow,
clickHouseTimestampToISO,
createRedis,
escapeClickHouseString,
sanitizeSqlIdentifier,
tryTwice,
@@ -36,17 +38,23 @@ export class DB {
/** Postgres connection pool for primary database access. */
postgres: Pool
/** Redis used for various caches. */
redis: Redis.Redis
redisPool: GenericPool<Redis.Redis>

/** Kafka producer used for syncing Postgres and ClickHouse person data. */
kafkaProducer?: Producer
/** ClickHouse used for syncing Postgres and ClickHouse person data. */
clickhouse?: ClickHouse

constructor(postgres: Pool, redis: Redis.Redis, kafkaProducer?: Producer, clickhouse?: ClickHouse) {
constructor(
postgres: Pool,
redisPool: GenericPool<Redis.Redis>,
kafkaProducer?: Producer,
clickhouse?: ClickHouse
) {
this.postgres = postgres
this.redis = redis
this.kafkaProducer = kafkaProducer
this.clickhouse = clickhouse
this.redisPool = redisPool
}

// Postgres
@@ -118,10 +126,11 @@
// Redis

public async redisGet(key: string, defaultValue: unknown, parseJSON = true): Promise<unknown> {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard(`Getting redis key delayed. Waiting over 30 sec to get key: ${key}`)
try {
const value = await tryTwice(
async () => await this.redis.get(key),
async () => await client.get(key),
`Waited 5 sec to get redis key: ${key}, retrying once!`
)
if (typeof value === 'undefined') {
@@ -137,57 +146,68 @@
}
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
}

public async redisSet(key: string, value: unknown, ttlSeconds?: number, stringify = true): Promise<void> {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard(`Setting redis key delayed. Waiting over 30 sec to set key: ${key}`)
try {
const serializedValue = stringify ? JSON.stringify(value) : (value as string)
if (ttlSeconds) {
await this.redis.set(key, serializedValue, 'EX', ttlSeconds)
await client.set(key, serializedValue, 'EX', ttlSeconds)
} else {
await this.redis.set(key, serializedValue)
await client.set(key, serializedValue)
}
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
}

public async redisIncr(key: string): Promise<number> {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard(`Incrementing redis key delayed. Waiting over 30 sec to incr key: ${key}`)
try {
return await this.redis.incr(key)
return await client.incr(key)
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
}

public async redisExpire(key: string, ttlSeconds: number): Promise<boolean> {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard(`Expiring redis key delayed. Waiting over 30 sec to expire key: ${key}`)
try {
return (await this.redis.expire(key, ttlSeconds)) === 1
return (await client.expire(key, ttlSeconds)) === 1
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
}

public async redisLPush(key: string, value: unknown, stringify = true): Promise<number> {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard(`LPushing redis key delayed. Waiting over 30 sec to lpush key: ${key}`)
try {
const serializedValue = stringify ? JSON.stringify(value) : (value as string)
return await this.redis.lpush(key, serializedValue)
return await client.lpush(key, serializedValue)
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
}

public async redisBRPop(key1: string, key2: string): Promise<[string, string]> {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard(`BRPoping redis key delayed. Waiting over 30 sec to brpop keys: ${key1}, ${key2}`)
try {
return await this.redis.brpop(key1, key2)
return await client.brpop(key1, key2)
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
}

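Every `DB` method above repeats the same acquire/`try`/`finally`/release choreography. A hypothetical helper — not part of this PR — shows the shape that could be factored into, which would also make it impossible to forget the release:

```ts
import { Pool as GenericPool } from 'generic-pool'
import Redis from 'ioredis'

// Hypothetical helper, not in this PR: centralize acquire/release so a
// pooled client can never leak, even when the Redis command throws.
async function withRedis<T>(
    pool: GenericPool<Redis.Redis>,
    fn: (client: Redis.Redis) => Promise<T>
): Promise<T> {
    const client = await pool.acquire()
    try {
        return await fn(client)
    } finally {
        await pool.release(client)
    }
}

// redisIncr, for example, would reduce to:
// return await withRedis(this.redisPool, (client) => client.incr(key))
```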
2 changes: 1 addition & 1 deletion src/ingestion/kafka-queue.ts
@@ -129,7 +129,7 @@ export class KafkaQueue implements Queue {

status.info(
'🧩',
`Kafka Batch of ${batch.messages.length} events completed in ${
`Kafka Batch of ${pluginEvents.length} events completed in ${
new Date().valueOf() - batchStartTimer.valueOf()
}ms (plugins: ${batchIngestionTimer.valueOf() - batchStartTimer.valueOf()}ms, ingestion: ${
new Date().valueOf() - batchIngestionTimer.valueOf()
41 changes: 23 additions & 18 deletions src/server.ts
@@ -3,6 +3,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { FastifyInstance } from 'fastify'
import * as fs from 'fs'
import { createPool } from 'generic-pool'
import { StatsD } from 'hot-shots'
import Redis from 'ioredis'
import { Kafka, logLevel, Producer } from 'kafkajs'
@@ -20,7 +21,7 @@ import { KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_EVENTS_WAL } from './ingestion/top
import { startSchedule } from './services/schedule'
import { status } from './status'
import { PluginsServer, PluginsServerConfig, Queue } from './types'
import { delay, UUIDT } from './utils'
import { createRedis, delay, UUIDT } from './utils'
import { version } from './version'
import { startFastifyInstance, stopFastifyInstance } from './web/server'
import { startQueue } from './worker/queue'
@@ -34,19 +35,6 @@ export async function createServer(
...config,
}

const redis = new Redis(serverConfig.REDIS_URL, { maxRetriesPerRequest: -1 })
redis
.on('error', (error) => {
Sentry.captureException(error)
status.error('🔴', 'Redis error encountered! Trying to reconnect...\n', error)
})
.on('ready', () => {
if (process.env.NODE_ENV !== 'test') {
status.info('✅', 'Connected to Redis!')
}
})
await redis.info()

let kafkaSsl: ConnectionOptions | undefined
if (
serverConfig.KAFKA_CLIENT_CERT_B64 &&
@@ -129,7 +117,22 @@
}
: undefined,
})
const db = new DB(postgres, redis, kafkaProducer, clickhouse)

const redisPool = createPool<Redis.Redis>(
{
create: () => createRedis(serverConfig),
destroy: async (client) => {
await client.quit()
},
},
{
min: serverConfig.REDIS_POOL_MIN_SIZE,
max: serverConfig.REDIS_POOL_MAX_SIZE,
autostart: true,
}
)

const db = new DB(postgres, redisPool, kafkaProducer, clickhouse)

let statsd: StatsD | undefined
if (serverConfig.STATSD_HOST) {
@@ -151,7 +154,7 @@
...serverConfig,
db,
postgres,
redis,
redisPool,
clickhouse,
kafka,
kafkaProducer,
@@ -169,7 +172,8 @@

const closeServer = async () => {
await kafkaProducer?.disconnect()
await server.redis.quit()
await redisPool.drain()
await redisPool.clear()
await server.postgres.end()
}

@@ -255,7 +259,8 @@
queue?.resume()
})

pubSub = new Redis(server.REDIS_URL, { enableAutoPipelining: true })
// use one extra connection for redis pubsub
pubSub = await createRedis(server)
await pubSub.subscribe(server.PLUGINS_RELOAD_PUBSUB_CHANNEL)
pubSub.on('message', async (channel: string, message) => {
if (channel === server!.PLUGINS_RELOAD_PUBSUB_CHANNEL) {
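`createRedis` itself lives in `src/utils.ts` — one of the 17 changed files, but not shown on this page. Reconstructed from the inline connection code it replaces above, it presumably looks roughly like this; anything beyond those removed lines is an assumption:

```ts
import * as Sentry from '@sentry/node'
import Redis from 'ioredis'

import { status } from './status'
import { PluginsServerConfig } from './types'

export async function createRedis(serverConfig: PluginsServerConfig): Promise<Redis.Redis> {
    const redis = new Redis(serverConfig.REDIS_URL, { maxRetriesPerRequest: -1 })
    redis
        .on('error', (error) => {
            Sentry.captureException(error)
            status.error('🔴', 'Redis error encountered! Trying to reconnect...\n', error)
        })
        .on('ready', () => {
            if (process.env.NODE_ENV !== 'test') {
                status.info('✅', 'Connected to Redis!')
            }
        })
    await redis.info() // block until the connection is actually usable
    return redis
}
```

This is the same connection setup shared by the pool's `create` factory, the pubsub subscriber, and (in the next file) Redlock.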
7 changes: 6 additions & 1 deletion src/services/schedule.ts
@@ -6,6 +6,7 @@ import Redlock from 'redlock'
import { processError } from '../error'
import { status } from '../status'
import { PluginConfigId, PluginsServer } from '../types'
import { createRedis } from '../utils'

const LOCKED_RESOURCE = 'plugin-server:locks:schedule'

@@ -25,7 +26,10 @@
const retryDelay = lockTTL / 10 // 6 sec
const extendDelay = lockTTL / 2 // 30 sec

const redlock = new Redlock([server.redis], {
// use another redis connection for redlock
const redis = await createRedis(server)

const redlock = new Redlock([redis], {
// we handle retries ourselves to have a way to cancel the promises on quit
// without this, the `await redlock.lock()` code will remain inflight and cause issues
retryCount: 0,
@@ -100,6 +104,7 @@
runEveryMinuteJob && schedule.cancelJob(runEveryMinuteJob)

await lock?.unlock().catch(Sentry.captureException)
await redis.quit()
await waitForTasksToFinish(server!)
}

5 changes: 4 additions & 1 deletion src/types.ts
@@ -1,5 +1,6 @@
import ClickHouse from '@posthog/clickhouse'
import { PluginAttachment, PluginConfigSchema, PluginEvent, Properties } from '@posthog/plugin-scaffold'
import { Pool as GenericPool } from 'generic-pool'
import { StatsD } from 'hot-shots'
import { EventsProcessor } from 'ingestion/process-event'
import { Redis } from 'ioredis'
@@ -51,13 +52,15 @@ export interface PluginsServerConfig extends Record<string, any> {
STATSD_PORT: number
STATSD_PREFIX: string
SCHEDULE_LOCK_TTL: number
REDIS_POOL_MIN_SIZE: number
REDIS_POOL_MAX_SIZE: number
}

export interface PluginsServer extends PluginsServerConfig {
// active connections to Postgres, Redis, ClickHouse, Kafka, StatsD
db: DB
postgres: Pool
redis: Redis
redisPool: GenericPool<Redis>
clickhouse?: ClickHouse
kafka?: Kafka
kafkaProducer?: Producer