This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Redis connection pool #204

Merged: 19 commits, Feb 23, 2021
Showing changes from 2 commits.
package.json: 2 additions & 0 deletions
@@ -52,6 +52,7 @@
     "@sentry/tracing": "^5.29.0",
     "adm-zip": "^0.4.16",
     "fastify": "^3.8.0",
+    "generic-pool": "^3.7.1",
     "hot-shots": "^8.2.1",
     "ioredis": "^4.19.2",
     "kafkajs": "^1.15.0",
@@ -76,6 +77,7 @@
     "@posthog/plugin-scaffold": "0.2.8",
     "@types/adm-zip": "^0.4.33",
     "@types/babel__standalone": "^7.1.3",
+    "@types/generic-pool": "^3.1.9",
     "@types/ioredis": "^4.17.7",
     "@types/jest": "^26.0.15",
     "@types/luxon": "^1.25.0",
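For context on the new dependency: generic-pool builds a pool from a factory object with create and destroy functions plus sizing options, and resources are checked out with acquire and handed back with release. A minimal sketch against ioredis (the localhost URL and sizing numbers are placeholder assumptions, not taken from this PR):

```ts
import { createPool } from 'generic-pool'
import Redis from 'ioredis'

// Factory tells the pool how to make and dispose of a resource;
// options control pool size and whether it spins up eagerly.
const redisPool = createPool<Redis.Redis>(
    {
        create: async () => new Redis('redis://localhost:6379'),
        destroy: async (client) => {
            await client.quit()
        },
    },
    { min: 1, max: 10, autostart: true }
)

async function demo(): Promise<void> {
    // Check a client out, use it, and always hand it back.
    const client = await redisPool.acquire()
    try {
        await client.set('greeting', 'hello')
    } finally {
        await redisPool.release(client)
    }
}
```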
src/db.ts: 34 additions & 9 deletions
@@ -1,12 +1,14 @@
 import ClickHouse from '@posthog/clickhouse'
 import { Properties } from '@posthog/plugin-scaffold'
+import { createPool, Pool as GenericPool } from 'generic-pool'
 import Redis from 'ioredis'
 import { Producer, ProducerRecord } from 'kafkajs'
 import { DateTime } from 'luxon'
 import { Pool, PoolClient, QueryConfig, QueryResult, QueryResultRow } from 'pg'

 import { KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID } from './ingestion/topics'
 import { chainToElements, hashElements, timeoutGuard, unparsePersonPartial } from './ingestion/utils'
+import { status } from './status'
 import {
     ClickHouseEvent,
     ClickHousePerson,
@@ -36,17 +38,28 @@ export class DB {
     /** Postgres connection pool for primary database access. */
     postgres: Pool
     /** Redis used for various caches. */
-    redis: Redis.Redis
+    redisPool: GenericPool<Redis.Redis>

     /** Kafka producer used for syncing Postgres and ClickHouse person data. */
     kafkaProducer?: Producer
     /** ClickHouse used for syncing Postgres and ClickHouse person data. */
     clickhouse?: ClickHouse

     constructor(postgres: Pool, redis: Redis.Redis, kafkaProducer?: Producer, clickhouse?: ClickHouse) {
         this.postgres = postgres
-        this.redis = redis
         this.kafkaProducer = kafkaProducer
         this.clickhouse = clickhouse
+        this.redisPool = createPool<Redis.Redis>(
+            {
+                create: () => Promise.resolve(redis),
+                destroy: () => Promise.resolve(undefined),
+            },
+            {
+                min: 1,
+                max: 1,
+                autostart: true,
+            }
+        )
     }

     // Postgres
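At this stage of the PR the pool is a deliberate pass-through: create always resolves to the single client passed into the constructor, destroy is a no-op, and min: 1, max: 1 keeps the old one-connection behaviour behind the new pooled interface. For contrast, a sketch of a per-connection factory (the createRedisPool name and redisUrl parameter are hypothetical, not part of this diff):

```ts
import { createPool, Pool as GenericPool } from 'generic-pool'
import Redis from 'ioredis'

// Hypothetical per-connection factory: every pooled resource is its own
// Redis connection, so max > 1 yields real concurrent connections.
function createRedisPool(redisUrl: string): GenericPool<Redis.Redis> {
    return createPool<Redis.Redis>(
        {
            create: async () => new Redis(redisUrl),
            destroy: async (client) => {
                // Close the underlying connection when the pool evicts it.
                await client.quit()
            },
        },
        { min: 1, max: 10, autostart: true }
    )
}
```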
@@ -118,10 +131,11 @@
     // Redis

     public async redisGet(key: string, defaultValue: unknown, parseJSON = true): Promise<unknown> {
+        const client = await this.redisPool.acquire()
         const timeout = timeoutGuard(`Getting redis key delayed. Waiting over 30 sec to get key: ${key}`)
         try {
             const value = await tryTwice(
-                async () => await this.redis.get(key),
+                async () => await client.get(key),
                 `Waited 5 sec to get redis key: ${key}, retrying once!`
             )
             if (typeof value === 'undefined') {
@@ -137,57 +151,68 @@
             }
         } finally {
             clearTimeout(timeout)
+            await this.redisPool.release(client)
         }
     }

     public async redisSet(key: string, value: unknown, ttlSeconds?: number, stringify = true): Promise<void> {
+        const client = await this.redisPool.acquire()
         const timeout = timeoutGuard(`Setting redis key delayed. Waiting over 30 sec to set key: ${key}`)
         try {
             const serializedValue = stringify ? JSON.stringify(value) : (value as string)
             if (ttlSeconds) {
-                await this.redis.set(key, serializedValue, 'EX', ttlSeconds)
+                await client.set(key, serializedValue, 'EX', ttlSeconds)
             } else {
-                await this.redis.set(key, serializedValue)
+                await client.set(key, serializedValue)
             }
         } finally {
             clearTimeout(timeout)
+            await this.redisPool.release(client)
         }
     }

     public async redisIncr(key: string): Promise<number> {
+        const client = await this.redisPool.acquire()
         const timeout = timeoutGuard(`Incrementing redis key delayed. Waiting over 30 sec to incr key: ${key}`)
         try {
-            return await this.redis.incr(key)
+            return await client.incr(key)
         } finally {
             clearTimeout(timeout)
+            await this.redisPool.release(client)
         }
     }

     public async redisExpire(key: string, ttlSeconds: number): Promise<boolean> {
+        const client = await this.redisPool.acquire()
         const timeout = timeoutGuard(`Expiring redis key delayed. Waiting over 30 sec to expire key: ${key}`)
         try {
-            return (await this.redis.expire(key, ttlSeconds)) === 1
+            return (await client.expire(key, ttlSeconds)) === 1
         } finally {
             clearTimeout(timeout)
+            await this.redisPool.release(client)
         }
     }

     public async redisLPush(key: string, value: unknown, stringify = true): Promise<number> {
+        const client = await this.redisPool.acquire()
         const timeout = timeoutGuard(`LPushing redis key delayed. Waiting over 30 sec to lpush key: ${key}`)
         try {
             const serializedValue = stringify ? JSON.stringify(value) : (value as string)
-            return await this.redis.lpush(key, serializedValue)
+            return await client.lpush(key, serializedValue)
         } finally {
             clearTimeout(timeout)
+            await this.redisPool.release(client)
         }
     }

     public async redisBRPop(key1: string, key2: string): Promise<[string, string]> {
+        const client = await this.redisPool.acquire()
         const timeout = timeoutGuard(`BRPoping redis key delayed. Waiting over 30 sec to brpop keys: ${key1}, ${key2}`)
         try {
-            return await this.redis.brpop(key1, key2)
+            return await client.brpop(key1, key2)
         } finally {
             clearTimeout(timeout)
+            await this.redisPool.release(client)
         }
     }

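Each Redis method above now repeats the same acquire / try / finally / release sequence. A sketch of how that boilerplate could be factored out into a helper that owns the checkout lifecycle (hypothetical; this PR keeps the explicit calls):

```ts
import { Pool as GenericPool } from 'generic-pool'
import Redis from 'ioredis'

// Hypothetical helper, not part of this PR: runs a callback against a
// pooled client and guarantees the client is returned to the pool,
// even if the callback throws.
async function withRedis<T>(
    pool: GenericPool<Redis.Redis>,
    fn: (client: Redis.Redis) => Promise<T>
): Promise<T> {
    const client = await pool.acquire()
    try {
        return await fn(client)
    } finally {
        await pool.release(client)
    }
}
```

With such a helper, redisIncr for example would reduce to a single `withRedis(this.redisPool, (client) => client.incr(key))` call inside its timeout guard.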
src/ingestion/kafka-queue.ts: 1 addition & 1 deletion
@@ -129,7 +129,7 @@ export class KafkaQueue implements Queue {

         status.info(
             '🧩',
-            `Kafka Batch of ${batch.messages.length} events completed in ${
+            `Kafka Batch of ${pluginEvents.length} events completed in ${
                 new Date().valueOf() - batchStartTimer.valueOf()
             }ms (plugins: ${batchIngestionTimer.valueOf() - batchStartTimer.valueOf()}ms, ingestion: ${
                 new Date().valueOf() - batchIngestionTimer.valueOf()
yarn.lock: 12 additions & 0 deletions
@@ -1391,6 +1391,13 @@
resolved "https://registry.yarnpkg.com/@types/cookiejar/-/cookiejar-2.1.2.tgz#66ad9331f63fe8a3d3d9d8c6e3906dd10f6446e8"
integrity sha512-t73xJJrvdTjXrn4jLS9VSGRbz0nUY3cl2DMGDU48lKl+HR9dbbjW2A9r3g40VA++mQpy6uuHg33gy7du2BKpog==

"@types/generic-pool@^3.1.9":
version "3.1.9"
resolved "https://registry.yarnpkg.com/@types/generic-pool/-/generic-pool-3.1.9.tgz#cc82ee0d92561fce713f8f9a7b2380eda8a89dcb"
integrity sha512-IkXMs8fhV6+E4J8EWv8iL7mLvApcLLQUH4m1Rex3KCPRqT+Xya0DDHIeGAokk/6VXe9zg8oTWyr+FGyeuimEYQ==
dependencies:
"@types/node" "*"

"@types/graceful-fs@^4.1.2":
version "4.1.4"
resolved "https://registry.yarnpkg.com/@types/graceful-fs/-/graceful-fs-4.1.4.tgz#4ff9f641a7c6d1a3508ff88bc3141b152772e753"
@@ -3508,6 +3515,11 @@ gcp-metadata@^4.2.0:
     gaxios "^4.0.0"
     json-bigint "^1.0.0"

+generic-pool@^3.7.1:
+  version "3.7.1"
+  resolved "https://registry.yarnpkg.com/generic-pool/-/generic-pool-3.7.1.tgz#36fe5bb83e7e0e032e5d32cd05dc00f5ff119aa8"
+  integrity sha512-ug6DAZoNgWm6q5KhPFA+hzXfBLFQu5sTXxPpv44DmE0A2g+CiHoq9LTVdkXpZMkYVMoGw83F6W+WT0h0MFMK/w==
+
 gensync@^1.0.0-beta.1:
   version "1.0.0-beta.2"
   resolved "https://registry.yarnpkg.com/gensync/-/gensync-1.0.0-beta.2.tgz#32a6ee76c3d7f52d46b2b1ae5d93fea8580a25e0"