diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0030629f..39ccf6730 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -148,7 +148,7 @@ jobs: ping -c 1 kafka ping -c 1 zookeeper - - name: Start Kafka, Clickhouse, Zookeeper + - name: Start Kafka, ClickHouse, Zookeeper run: | cd posthog/ee docker-compose -f docker-compose.ch.yml up -d zookeeper kafka clickhouse @@ -191,6 +191,97 @@ jobs: REDIS_URL: 'redis://localhost' run: cd plugin-server && yarn test:clickhouse + benchmarks-clickhouse: + name: Benchmarks / ClickHouse + Kafka + runs-on: ubuntu-20.04 + + services: + postgres: + image: postgres:12 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: test_posthog + ports: ['5432:5432'] + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + redis: + image: redis + ports: + - '6379:6379' + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + env: + REDIS_URL: 'redis://localhost' + CLICKHOUSE_HOST: 'localhost' + CLICKHOUSE_DATABASE: 'posthog_test' + KAFKA_ENABLED: 'true' + KAFKA_HOSTS: 'kafka:9092' + + steps: + - name: Check out Django server for database setup + uses: actions/checkout@v2 + with: + repository: 'PostHog/posthog' + path: 'posthog/' + + - name: Check out plugin server + uses: actions/checkout@v2 + with: + path: 'plugin-server' + + - name: Fix Kafka Hostname + run: | + sudo bash -c 'echo "127.0.0.1 kafka zookeeper" >> /etc/hosts' + ping -c 1 kafka + ping -c 1 zookeeper + + - name: Start Kafka, ClickHouse, Zookeeper + run: | + cd posthog/ee + docker-compose -f docker-compose.ch.yml up -d zookeeper kafka clickhouse + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.8 + + - name: Set up Node 14 + uses: actions/setup-node@v2 + with: + node-version: 14 + + - uses: actions/cache@v2 + with: + path: ${{ env.pythonLocation }} + key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }} + + - name: Install requirements.txt dependencies with pip + run: | + pip install --upgrade pip + pip install --upgrade --upgrade-strategy eager -r posthog/requirements.txt + + - name: Set up databases + env: + SECRET_KEY: 'abcdef' # unsafe - for testing only + DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/posthog' + PRIMARY_DB: 'clickhouse' + TEST: 'true' + run: python posthog/manage.py setup_test_environment + + - name: Install package.json dependencies with Yarn + run: cd plugin-server && yarn + + - name: Run benchmarks + env: + # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB + DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/test_posthog' + REDIS_URL: 'redis://localhost' + run: cd plugin-server && yarn benchmark:clickhouse + benchmarks-postgres: name: Benchmarks / Postgres + Redis runs-on: ubuntu-20.04 @@ -264,4 +355,79 @@ jobs: # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/test_posthog' REDIS_URL: 'redis://localhost' - run: cd plugin-server && yarn benchmark + run: cd plugin-server && yarn benchmark:postgres + + benchmarks-vm: + name: Benchmarks / VM Performance & Memory + runs-on: ubuntu-20.04 + + services: + postgres: + image: postgres:12 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: test_posthog + ports: ['5432:5432'] + options: --health-cmd pg_isready --health-interval 10s 
--health-timeout 5s --health-retries 5 + redis: + image: redis + ports: + - '6379:6379' + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + env: + REDIS_URL: 'redis://localhost' + + steps: + - name: Check out Django server for database setup + uses: actions/checkout@v2 + with: + repository: 'PostHog/posthog' + path: 'posthog/' + + - name: Check out plugin server + uses: actions/checkout@v2 + with: + path: 'plugin-server' + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.8 + + - name: Set up Node 14 + uses: actions/setup-node@v2 + with: + node-version: 14 + + - uses: actions/cache@v2 + with: + path: ${{ env.pythonLocation }} + key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }} + + - name: Install requirements.txt dependencies with pip + run: | + pip install --upgrade pip + pip install --upgrade --upgrade-strategy eager -r posthog/requirements.txt + + - name: Set up databases + env: + SECRET_KEY: 'abcdef' # unsafe - for testing only + DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/posthog' + TEST: 'true' + run: python posthog/manage.py setup_test_environment + + - name: Install package.json dependencies with Yarn + run: cd plugin-server && yarn + + - name: Run benchmarks + env: + # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB + DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/test_posthog' + REDIS_URL: 'redis://localhost' + run: cd plugin-server && yarn benchmark:vm diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts new file mode 100644 index 000000000..e81f39616 --- /dev/null +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -0,0 +1,95 @@ +import { performance } from 'perf_hooks' + +import { createPosthog, DummyPostHog } from '../../src/extensions/posthog' +import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' +import { startPluginsServer } from '../../src/server' +import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' +import { PluginsServer } from '../../src/types' +import { delay, UUIDT } from '../../src/utils' +import { makePiscina } from '../../src/worker/piscina' +import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse' +import { resetKafka } from '../../tests/helpers/kafka' +import { pluginConfig39 } from '../../tests/helpers/plugins' +import { resetTestDatabase } from '../../tests/helpers/sql' +import { delayUntilEventIngested } from '../../tests/shared/process-event' + +jest.setTimeout(600000) // 10min timeout + +const extraServerConfig: Partial = { + KAFKA_ENABLED: true, + KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092', + WORKER_CONCURRENCY: 4, + PLUGIN_SERVER_INGESTION: true, + KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, + KAFKA_BATCH_PARALELL_PROCESSING: true, + LOG_LEVEL: LogLevel.Log, +} + +describe('e2e kafka & clickhouse benchmark', () => { + let queue: Queue + let server: PluginsServer + let stopServer: () => Promise + let posthog: DummyPostHog + + beforeEach(async () => { + await resetTestDatabase(` + async function processEventBatch (batch) { + // console.log(\`Received batch of \${batch.length} events\`) + return batch.map(event => { + event.properties.processed = 'hell yes' + event.properties.upperUuid = event.properties.uuid?.toUpperCase() + return event + }) + } + `) + await resetKafka(extraServerConfig) + await 
resetTestDatabaseClickhouse(extraServerConfig) + + const startResponse = await startPluginsServer(extraServerConfig, makePiscina) + server = startResponse.server + stopServer = startResponse.stop + queue = startResponse.queue + + posthog = createPosthog(server, pluginConfig39) + }) + + afterEach(async () => { + await stopServer() + }) + + test('measure performance', async () => { + console.debug = () => null + + const count = 3000 + + // fill in the queue + function createEvent() { + const uuid = new UUIDT().toString() + posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) + } + await queue.pause() + for (let i = 0; i < count; i++) { + createEvent() + } + + // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited) + await delay(5000) + await queue.resume() + + console.log('Starting timer') + const startTime = performance.now() + await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count) + const timeMs = performance.now() - startTime + console.log('Finished!') + + const n = (n: number) => `${Math.round(n * 100) / 100}` + console.log( + `[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( + 1000 / (timeMs / count) + )} events/sec, ${n(timeMs / count)}ms per event)` + ) + + const events = await server.db.fetchEvents() + expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase()) + }) +}) diff --git a/benchmarks/postgres/e2e.celery.benchmark.ts b/benchmarks/postgres/e2e.celery.benchmark.ts new file mode 100644 index 000000000..4acdd1ed3 --- /dev/null +++ b/benchmarks/postgres/e2e.celery.benchmark.ts @@ -0,0 +1,89 @@ +import { performance } from 'perf_hooks' + +import { createPosthog, DummyPostHog } from '../../src/extensions/posthog' +import { startPluginsServer } from '../../src/server' +import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' +import { PluginsServer } from '../../src/types' +import { delay, UUIDT } from '../../src/utils' +import { makePiscina } from '../../src/worker/piscina' +import { pluginConfig39 } from '../../tests/helpers/plugins' +import { resetTestDatabase } from '../../tests/helpers/sql' +import { delayUntilEventIngested } from '../../tests/shared/process-event' + +jest.setTimeout(600000) // 10min timeout + +const extraServerConfig: Partial = { + WORKER_CONCURRENCY: 4, + PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue', + CELERY_DEFAULT_QUEUE: 'test-celery-default-queue', + PLUGIN_SERVER_INGESTION: true, + LOG_LEVEL: LogLevel.Log, + KAFKA_ENABLED: false, +} + +describe('e2e celery & postgres benchmark', () => { + let queue: Queue + let server: PluginsServer + let stopServer: () => Promise + let posthog: DummyPostHog + + beforeEach(async () => { + await resetTestDatabase(` + async function processEvent (event) { + event.properties.processed = 'hell yes' + event.properties.upperUuid = event.properties.uuid?.toUpperCase() + return event + } + `) + + const startResponse = await startPluginsServer(extraServerConfig, makePiscina) + server = startResponse.server + stopServer = startResponse.stop + queue = startResponse.queue + + await server.redis.del(server.PLUGINS_CELERY_QUEUE) + await server.redis.del(server.CELERY_DEFAULT_QUEUE) + + posthog = createPosthog(server, pluginConfig39) + }) + + afterEach(async () => { + await stopServer() + }) + + test('measure performance', async () => { + console.debug = () => null + + const count = 3000 + + // fill in the queue + function createEvent() { + const uuid = 
new UUIDT().toString() + posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) + } + await queue.pause() + expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(0) + for (let i = 0; i < count; i++) { + await createEvent() + } + await delay(1000) + expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count) + await queue.resume() + + console.log('Starting timer') + const startTime = performance.now() + await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count) + const timeMs = performance.now() - startTime + console.log('Finished!') + + const n = (n: number) => `${Math.round(n * 100) / 100}` + console.log( + `[Celery & Postgres] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( + 1000 / (timeMs / count) + )} events/sec, ${n(timeMs / count)}ms per event)` + ) + + const events = await server.db.fetchEvents() + expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase()) + }) +}) diff --git a/benchmarks/postgres/helpers/log.ts b/benchmarks/postgres/helpers/log.ts new file mode 100644 index 000000000..f3ee9a980 --- /dev/null +++ b/benchmarks/postgres/helpers/log.ts @@ -0,0 +1,26 @@ +import { performance } from 'perf_hooks' + +const roundToTwo = (n: number) => `${Math.round(n * 100) / 100}` + +let _services: string +let _test: string +let _singular: string +let _plural: string +let _now: number + +export function startLog(services: string, test: string, singular = 'req', plural = 'reqs') { + _services = services + _test = test + _singular = singular + _plural = plural + _now = performance.now() +} + +export function endLog(count: number) { + const timeMs = performance.now() - _now + console.log( + `[${_services}] ${_test}: ${count} ${count === 1 ? _singular : _plural}, ${roundToTwo( + timeMs / 1000 + )} sec, ${roundToTwo(1000 / (timeMs / count))} ${_plural}/sec, ${roundToTwo(timeMs / count)}ms per ${_singular}` + ) +} diff --git a/benchmarks/postgres/helpers/piscina.ts b/benchmarks/postgres/helpers/piscina.ts new file mode 100644 index 000000000..41714729d --- /dev/null +++ b/benchmarks/postgres/helpers/piscina.ts @@ -0,0 +1,47 @@ +import { PluginEvent } from '@posthog/plugin-scaffold/src/types' + +import { defaultConfig } from '../../../src/config' +import { LogLevel } from '../../../src/types' +import { UUIDT } from '../../../src/utils' +import { makePiscina } from '../../../src/worker/piscina' + +export function setupPiscina(workers: number, tasksPerWorker: number) { + return makePiscina({ + ...defaultConfig, + WORKER_CONCURRENCY: workers, + TASKS_PER_WORKER: tasksPerWorker, + LOG_LEVEL: LogLevel.Log, + }) +} + +export function ingestOneEvent( + ingestEvent: (event: PluginEvent) => Promise, + index: number +): Promise { + const defaultEvent = { + distinct_id: 'my_id', + ip: '127.0.0.1', + site_url: 'http://localhost', + team_id: 2, + now: new Date().toISOString(), + event: 'default event', + properties: { key: 'value', index }, + uuid: new UUIDT().toString(), + } + return ingestEvent(defaultEvent) +} + +export async function ingestCountEvents(piscina: ReturnType, count: number) { + const maxPromises = 500 + const promises = Array(maxPromises) + const ingestEvent = (event: PluginEvent) => piscina.runTask({ task: 'ingestEvent', args: { event } }) + + const groups = Math.ceil(count / maxPromises) + for (let j = 0; j < groups; j++) { + const groupCount = groups === 1 ? count : j === groups - 1 ? 
count % maxPromises : maxPromises + for (let i = 0; i < groupCount; i++) { + promises[i] = ingestOneEvent(ingestEvent, i) + } + await Promise.all(promises) + } +} diff --git a/benchmarks/postgres/ingestion.benchmark.ts b/benchmarks/postgres/ingestion.benchmark.ts new file mode 100644 index 000000000..8ead9203a --- /dev/null +++ b/benchmarks/postgres/ingestion.benchmark.ts @@ -0,0 +1,129 @@ +import { PluginEvent } from '@posthog/plugin-scaffold/src/types' +import { DateTime } from 'luxon' +import os from 'os' +import { performance } from 'perf_hooks' + +import { IEvent } from '../../src/idl/protos' +import { EventsProcessor } from '../../src/ingestion/process-event' +import { createServer } from '../../src/server' +import { LogLevel, PluginsServer, SessionRecordingEvent, Team } from '../../src/types' +import { UUIDT } from '../../src/utils' +import { getFirstTeam, resetTestDatabase } from '../../tests/helpers/sql' +import { endLog, startLog } from './helpers/log' +import { ingestCountEvents, setupPiscina } from './helpers/piscina' + +jest.mock('../../src/sql') +jest.setTimeout(600000) // 600 sec timeout + +describe('ingestion benchmarks', () => { + let team: Team + let server: PluginsServer + let stopServer: () => Promise + let eventsProcessor: EventsProcessor + let now = DateTime.utc() + + async function processOneEvent(): Promise { + return await eventsProcessor.processEvent( + 'my_id', + '127.0.0.1', + 'http://localhost', + ({ + event: 'default event', + timestamp: now.toISO(), + properties: { token: team.api_token }, + } as any) as PluginEvent, + team.id, + now, + now, + new UUIDT().toString() + ) + } + + beforeEach(async () => { + await resetTestDatabase(` + function processEvent (event, meta) { + event.properties["somewhere"] = "in a benchmark"; + return event + } + `) + ;[server, stopServer] = await createServer({ + PLUGINS_CELERY_QUEUE: 'benchmark-plugins-celery-queue', + CELERY_DEFAULT_QUEUE: 'benchmark-celery-default-queue', + LOG_LEVEL: LogLevel.Log, + }) + eventsProcessor = new EventsProcessor(server) + team = await getFirstTeam(server) + now = DateTime.utc() + + // warmup + for (let i = 0; i < 5; i++) { + await processOneEvent() + } + }) + + afterEach(async () => { + await stopServer?.() + }) + + test('basic sequential ingestion', async () => { + const count = 3000 + + startLog('Postgres', 'Await Ingested', 'event', 'events') + + for (let i = 0; i < count; i++) { + await processOneEvent() + } + + endLog(count) + }) + + test('basic parallel ingestion', async () => { + const count = 3000 + const promises = [] + + startLog('Postgres', 'Promise.all Ingested', 'event', 'events') + + for (let i = 0; i < count; i++) { + promises.push(processOneEvent()) + } + await Promise.all(promises) + + endLog(count) + }) + + test('piscina ingestion', async () => { + const coreCount = os.cpus().length + const workerThreads = [1, 2, 4, 8, 12, 16].filter((threads) => threads <= coreCount) + const rounds = 1 + + const events = 10000 + + const result: Record = { + coreCount, + events, + } + + const results = [] + for (const threads of workerThreads) { + await resetTestDatabase('const processEvent = e => e') + const piscina = setupPiscina(threads, 10) + + // warmup + await ingestCountEvents(piscina, threads * 4) + + // start + const startTime = performance.now() + for (let i = 0; i < rounds; i++) { + await ingestCountEvents(piscina, events) + } + result[`${threads} thread${threads === 1 ? 
'' : 's'}`] = Math.round( + 1000 / ((performance.now() - startTime) / events / rounds) + ) + + await piscina.destroy() + console.log(JSON.stringify({ result }, null, 2)) + } + results.push(result) + console.table(results) + }) +}) diff --git a/benchmarks/memory.benchmark.ts b/benchmarks/vm/memory.benchmark.ts similarity index 92% rename from benchmarks/memory.benchmark.ts rename to benchmarks/vm/memory.benchmark.ts index 9a3bb3917..55739591f 100644 --- a/benchmarks/memory.benchmark.ts +++ b/benchmarks/vm/memory.benchmark.ts @@ -1,11 +1,11 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' -import { createServer } from '../src/server' -import { Plugin, PluginConfig, PluginConfigVMReponse } from '../src/types' -import { createPluginConfigVM } from '../src/vm' -import { commonOrganizationId } from '../tests/helpers/plugins' +import { createServer } from '../../src/server' +import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types' +import { createPluginConfigVM } from '../../src/vm' +import { commonOrganizationId } from '../../tests/helpers/plugins' -jest.mock('../src/sql') +jest.mock('../../src/sql') jest.setTimeout(600000) // 600 sec timeout function createEvent(index: number): PluginEvent { diff --git a/benchmarks/worker.benchmark.ts b/benchmarks/vm/worker.benchmark.ts similarity index 95% rename from benchmarks/worker.benchmark.ts rename to benchmarks/vm/worker.benchmark.ts index 6f333bc76..3b37e0e8e 100644 --- a/benchmarks/worker.benchmark.ts +++ b/benchmarks/vm/worker.benchmark.ts @@ -2,12 +2,12 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import * as os from 'os' import { performance } from 'perf_hooks' -import { defaultConfig } from '../src/config' -import { LogLevel } from '../src/types' -import { makePiscina } from '../src/worker/piscina' -import { resetTestDatabase } from '../tests/helpers/sql' +import { defaultConfig } from '../../src/config' +import { LogLevel } from '../../src/types' +import { makePiscina } from '../../src/worker/piscina' +import { resetTestDatabase } from '../../tests/helpers/sql' -jest.mock('../src/sql') +jest.mock('../../src/sql') jest.setTimeout(600000) // 600 sec timeout function processOneEvent( @@ -111,7 +111,7 @@ test('piscina worker benchmark', async () => { async function processEvent (event, meta) { await new Promise(resolve => __jestSetTimeout(() => resolve(), 100)) event.properties = { "somewhere": "over the rainbow" }; - return event + return event } `, }, diff --git a/package.json b/package.json index 4e01bb82f..e6d844a39 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,10 @@ "test": "jest --runInBand tests/**/*.test.ts", "test:postgres": "jest --runInBand tests/postgres/*.test.ts tests/*.test.ts", "test:clickhouse": "jest --runInBand tests/clickhouse/*.test.ts", - "benchmark": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/", + "benchmark": "yarn run benchmark:clickhouse && yarn run benchmark:postgres && yarn run benchmark:vm", + "benchmark:clickhouse": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/clickhouse/", + "benchmark:postgres": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/postgres/", + "benchmark:vm": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/vm/", "start": "yarn start:dev", "start:dist": "node dist/src/index.js --base-dir ../posthog", "start:dev": "ts-node-dev --exit-child src/index.ts --base-dir ../posthog", @@ -27,7 +30,11 @@ "setup:dev:clickhouse": "cd ../posthog && export DEBUG=1
PRIMARY_DB=clickhouse && source env/bin/activate && python manage.py migrate_clickhouse", "setup:test:ee": "yarn setup:test:postgres && yarn setup:test:clickhouse", "setup:test:postgres": "cd ../posthog && (dropdb test_posthog || echo 'no db to drop') && createdb test_posthog && source env/bin/activate && DATABASE_URL=postgres://localhost:5432/test_posthog DEBUG=1 python manage.py migrate", - "setup:test:clickhouse": "cd ../posthog && export TEST=1 PRIMARY_DB=clickhouse CLICKHOUSE_DATABASE=posthog_test && source env/bin/activate && python manage.py migrate_clickhouse" + "setup:test:clickhouse": "cd ../posthog && export TEST=1 PRIMARY_DB=clickhouse CLICKHOUSE_DATABASE=posthog_test && source env/bin/activate && python manage.py migrate_clickhouse", + "services:start": "cd ../posthog && docker-compose -f ee/docker-compose.ch.yml up zookeeper kafka clickhouse", + "services:stop": "cd ../posthog && docker-compose -f ee/docker-compose.ch.yml down", + "services:clean": "cd ../posthog && docker-compose -f ee/docker-compose.ch.yml rm -v zookeeper kafka clickhouse", + "services": "yarn services:stop && yarn services:clean && yarn services:start" }, "bin": { "posthog-plugin-server": "bin/posthog-plugin-server" diff --git a/src/config.ts b/src/config.ts index f0888db29..f976443c3 100644 --- a/src/config.ts +++ b/src/config.ts @@ -21,6 +21,7 @@ export function getDefaultConfig(): PluginsServerConfig { KAFKA_CLIENT_CERT_KEY_B64: null, KAFKA_TRUSTED_CERT_B64: null, KAFKA_CONSUMPTION_TOPIC: null, + KAFKA_BATCH_PARALELL_PROCESSING: false, PLUGIN_SERVER_INGESTION: false, PLUGINS_CELERY_QUEUE: 'posthog-plugins', REDIS_URL: 'redis://127.0.0.1', @@ -67,6 +68,7 @@ export function getConfigHelp(): Record { KAFKA_CLIENT_CERT_B64: 'Kafka certificate in Base64', KAFKA_CLIENT_CERT_KEY_B64: 'Kafka certificate key in Base64', KAFKA_TRUSTED_CERT_B64: 'Kafka trusted CA in Base64', + KAFKA_BATCH_PARALELL_PROCESSING: 'Process batches coming from Kafka in parallel', SENTRY_DSN: 'Sentry ingestion URL', STATSD_HOST: 'StatsD host - integration disabled if this is not provided', STATSD_PORT: 'StatsD port', diff --git a/src/ingestion/ingest-event.ts b/src/ingestion/ingest-event.ts new file mode 100644 index 000000000..458e3c56a --- /dev/null +++ b/src/ingestion/ingest-event.ts @@ -0,0 +1,29 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' +import * as Sentry from '@sentry/node' +import { DateTime } from 'luxon' + +import { PluginsServer } from '../types' + +export type IngestEventResponse = { success?: boolean; error?: string } + +export async function ingestEvent(server: PluginsServer, event: PluginEvent): Promise { + try { + const { distinct_id, ip, site_url, team_id, now, sent_at, uuid } = event + await server.eventsProcessor.processEvent( + distinct_id, + ip, + site_url, + event, + team_id, + DateTime.fromISO(now), + sent_at ? DateTime.fromISO(sent_at) : null, + uuid! // it will throw if it's undefined + ) + // We don't want to return the inserted DB entry that `processEvent` returns. + // This response is passed to piscina and would be discarded anyway. 
+ return { success: true } + } catch (e) { + Sentry.captureException(e) + return { error: e.message } + } +} diff --git a/src/ingestion/kafka-queue.ts b/src/ingestion/kafka-queue.ts index 40c73648e..6c353d91d 100644 --- a/src/ingestion/kafka-queue.ts +++ b/src/ingestion/kafka-queue.ts @@ -4,7 +4,7 @@ import { Consumer, EachBatchPayload, Kafka, Message } from 'kafkajs' import { PluginsServer, Queue, RawEventMessage } from 'types' import { status } from '../status' -import { killGracefully } from '../utils' +import { killGracefully, runInParallelBatches } from '../utils' export class KafkaQueue implements Queue { private pluginsServer: PluginsServer @@ -51,7 +51,16 @@ export class KafkaQueue implements Queue { } }) - const processedEvents = await this.processEventBatch(pluginEvents) + const maxBatchSize = Math.max( + 1, + Math.min( + 100, + Math.ceil( + pluginEvents.length / this.pluginsServer.WORKER_CONCURRENCY / this.pluginsServer.TASKS_PER_WORKER + ) + ) + ) + const processedEvents = await runInParallelBatches(pluginEvents, maxBatchSize, this.processEventBatch) // Sort in the original order that the events came in, putting any randomly added events to the end. // This is so we would resolve the correct kafka offsets in order. @@ -59,27 +68,39 @@ export class KafkaQueue implements Queue { (a, b) => (uuidOrder.get(a.uuid!) || pluginEvents.length) - (uuidOrder.get(b.uuid!) || pluginEvents.length) ) - for (const event of processedEvents) { - if (!isRunning()) { - status.info('😮', 'Consumer not running anymore, canceling batch processing!') - return - } - if (isStale()) { - status.info('😮', 'Batch stale, canceling batch processing!') - return + // TODO: add chunking into groups of 500 or so. Might start too many promises at once now + if (this.pluginsServer.KAFKA_BATCH_PARALELL_PROCESSING) { + const ingestOneEvent = async (event: PluginEvent) => { + const singleIngestionTimer = new Date() + await this.saveEvent(event) + this.pluginsServer.statsd?.timing('kafka_queue.single_ingestion', singleIngestionTimer) } - const singleIngestionTimer = new Date() - await this.saveEvent(event) - const offset = uuidOffset.get(event.uuid!) - if (offset) { - resolveOffset(offset) + await Promise.all(processedEvents.map((event) => ingestOneEvent(event))) + } else { + for (const event of processedEvents) { + if (!isRunning()) { + status.info('😮', 'Consumer not running anymore, canceling batch processing!') + return + } + if (isStale()) { + status.info('😮', 'Batch stale, canceling batch processing!') + return + } + + const singleIngestionTimer = new Date() + await this.saveEvent(event) + const offset = uuidOffset.get(event.uuid!) 
+ if (offset) { + resolveOffset(offset) + } + await heartbeat() + await commitOffsetsIfNecessary() + this.pluginsServer.statsd?.timing('kafka_queue.single_ingestion', singleIngestionTimer) } - await heartbeat() - await commitOffsetsIfNecessary() - this.pluginsServer.statsd?.timing('kafka_queue.single_ingestion', singleIngestionTimer) } this.pluginsServer.statsd?.timing('kafka_queue.each_batch', batchProcessingTimer) resolveOffset(batch.lastOffset()) + await heartbeat() await commitOffsetsIfNecessary() } diff --git a/src/server.ts b/src/server.ts index 9cf1bf598..1f77ce83f 100644 --- a/src/server.ts +++ b/src/server.ts @@ -244,25 +244,12 @@ export async function startPluginsServer( ;[server, closeServer] = await createServer(serverConfig, null) piscina = makePiscina(serverConfig) - const processEvent = (event: PluginEvent) => { - if ((piscina?.queueSize || 0) > (server?.WORKER_CONCURRENCY || 4) * (server?.WORKER_CONCURRENCY || 4)) { - queue?.pause() - } - return piscina!.runTask({ task: 'processEvent', args: { event } }) - } - const processEventBatch = (batch: PluginEvent[]) => { - if ((piscina?.queueSize || 0) > (server?.WORKER_CONCURRENCY || 4) * (server?.WORKER_CONCURRENCY || 4)) { - queue?.pause() - } - return piscina!.runTask({ task: 'processEventBatch', args: { batch } }) - } - if (!server.DISABLE_WEB) { fastifyInstance = await startFastifyInstance(server) } stopSchedule = await startSchedule(server, piscina) - queue = await startQueue(server, processEvent, processEventBatch) + queue = await startQueue(server, piscina) piscina.on('drain', () => { queue?.resume() }) @@ -278,7 +265,7 @@ export async function startPluginsServer( await stopPiscina(piscina) } piscina = makePiscina(serverConfig!) - queue = await startQueue(server!, processEvent, processEventBatch) + queue = await startQueue(server!, piscina) stopSchedule = await startSchedule(server!, piscina) } }) diff --git a/src/types.ts b/src/types.ts index b787c09f2..c29c46f75 100644 --- a/src/types.ts +++ b/src/types.ts @@ -36,6 +36,7 @@ export interface PluginsServerConfig extends Record { KAFKA_CLIENT_CERT_KEY_B64: string | null KAFKA_TRUSTED_CERT_B64: string | null KAFKA_CONSUMPTION_TOPIC: string | null + KAFKA_BATCH_PARALELL_PROCESSING: boolean PLUGINS_CELERY_QUEUE: string REDIS_URL: string BASE_DIR: string diff --git a/src/utils.ts b/src/utils.ts index 80892bc59..adb924b8f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -335,3 +335,16 @@ export function escapeClickHouseString(string: string): string { // https://clickhouse.tech/docs/en/sql-reference/syntax/ return string.replace(/\\/g, '\\\\').replace(/'/g, "\\'") } + +export async function runInParallelBatches Promise = (batch: T[]) => Promise>( + array: T[], + batchSize: number, + callback: R +): Promise> { + const arrays = [] + for (let i = 0; i < array.length; i += batchSize) { + arrays.push(array.slice(i, i + batchSize)) + } + const responses = await Promise.all(arrays.map(callback)) + return responses.flat() +} diff --git a/src/worker/queue.ts b/src/worker/queue.ts index d8d404dd6..0e9a52250 100644 --- a/src/worker/queue.ts +++ b/src/worker/queue.ts @@ -1,22 +1,51 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' -import { DateTime } from 'luxon' +import Piscina from 'piscina' import Client from '../celery/client' import Worker from '../celery/worker' +import { IngestEventResponse } from '../ingestion/ingest-event' import { KafkaQueue } from '../ingestion/kafka-queue' import { status } from '../status' import { PluginsServer, 
Queue } from '../types' -import { UUIDT } from '../utils' + +export type WorkerMethods = { + processEvent: (event: PluginEvent) => Promise + processEventBatch: (event: PluginEvent[]) => Promise<(PluginEvent | null)[]> + ingestEvent: (event: PluginEvent) => Promise +} + +function pauseQueueIfWorkerFull(queue: Queue | undefined, server: PluginsServer, piscina?: Piscina) { + if (queue && (piscina?.queueSize || 0) > (server.WORKER_CONCURRENCY || 4) * (server.WORKER_CONCURRENCY || 4)) { + queue.pause() + } +} export async function startQueue( server: PluginsServer, - processEvent: (event: PluginEvent) => Promise, - processEventBatch: (event: PluginEvent[]) => Promise<(PluginEvent | null)[]> + piscina?: Piscina, + workerMethods: Partial = {} ): Promise { const relevantStartQueue = server.KAFKA_ENABLED ? startQueueKafka : startQueueRedis + const mergedWorkerMethods = { + processEvent: (event: PluginEvent) => { + return piscina!.runTask({ task: 'processEvent', args: { event } }) + }, + processEventBatch: (batch: PluginEvent[]) => { + return piscina!.runTask({ task: 'processEventBatch', args: { batch } }) + }, + ingestEvent: (event: PluginEvent) => { + return piscina!.runTask({ task: 'ingestEvent', args: { event } }) + }, + ...workerMethods, + } + try { - return await relevantStartQueue(server, processEvent, processEventBatch) + if (server.KAFKA_ENABLED) { + return await startQueueKafka(server, mergedWorkerMethods) + } else { + return await startQueueRedis(server, piscina, mergedWorkerMethods) + } } catch (error) { status.error('💥', 'Failed to start event queue:\n', error) throw error @@ -25,13 +54,13 @@ export async function startQueue( async function startQueueRedis( server: PluginsServer, - processEvent: (event: PluginEvent) => Promise, - processEventBatch: (event: PluginEvent[]) => Promise<(PluginEvent | null)[]> + piscina: Piscina | undefined, + workerMethods: WorkerMethods ): Promise { - const worker = new Worker(server.redis, server.PLUGINS_CELERY_QUEUE) + const celeryQueue = new Worker(server.redis, server.PLUGINS_CELERY_QUEUE) const client = new Client(server.redis, server.CELERY_DEFAULT_QUEUE) - worker.register( + celeryQueue.register( 'posthog.tasks.process_event.process_event_with_plugins', async ( distinct_id: string, @@ -44,22 +73,14 @@ async function startQueueRedis( ) => { const event = { distinct_id, ip, site_url, team_id, now, sent_at, ...data } as PluginEvent try { - const processedEvent = await processEvent(event) + pauseQueueIfWorkerFull(celeryQueue, server, piscina) + const processedEvent = await workerMethods.processEvent(event) if (processedEvent) { - const { distinct_id, ip, site_url, team_id, now, sent_at, ...data } = processedEvent - if (server.PLUGIN_SERVER_INGESTION) { - await server.eventsProcessor.processEvent( - distinct_id, - ip, - site_url, - processedEvent, - team_id, - DateTime.fromISO(now), - sent_at ? 
DateTime.fromISO(sent_at) : null, - new UUIDT().toString() - ) + pauseQueueIfWorkerFull(celeryQueue, server, piscina) + await workerMethods.ingestEvent(processedEvent) } else { + const { distinct_id, ip, site_url, team_id, now, sent_at, ...data } = processedEvent client.sendTask('posthog.tasks.process_event.process_event', [], { distinct_id, ip, @@ -77,36 +98,23 @@ async function startQueueRedis( } ) - worker.start() + celeryQueue.start() - return worker + return celeryQueue } -async function startQueueKafka( - server: PluginsServer, - processEvent: (event: PluginEvent) => Promise, - processEventBatch: (event: PluginEvent[]) => Promise<(PluginEvent | null)[]> -): Promise { - const kafkaQueue = new KafkaQueue(server, processEventBatch, async (event: PluginEvent) => { - const { distinct_id, ip, site_url, team_id, now, sent_at, uuid } = event - if (!uuid) { - status.error('❓', 'UUID missing in event received from Kafka!') - return - } - if (server.PLUGIN_SERVER_INGESTION) { - await server.eventsProcessor.processEvent( - distinct_id, - ip, - site_url, - event, - team_id, - DateTime.fromISO(now), - sent_at ? DateTime.fromISO(sent_at) : null, - uuid - ) - } - }) - +async function startQueueKafka(server: PluginsServer, workerMethods: WorkerMethods): Promise { + const kafkaQueue: Queue = new KafkaQueue( + server, + (batch: PluginEvent[]) => workerMethods.processEventBatch(batch), + server.PLUGIN_SERVER_INGESTION + ? async (event) => { + await workerMethods.ingestEvent(event) + } + : async () => { + // no op, but defining to avoid undefined issues + } + ) await kafkaQueue.start() return kafkaQueue diff --git a/src/worker/worker.ts b/src/worker/worker.ts index 1c807d48b..99f4b84fd 100644 --- a/src/worker/worker.ts +++ b/src/worker/worker.ts @@ -1,3 +1,4 @@ +import { ingestEvent } from '../ingestion/ingest-event' import { initApp } from '../init' import { runPlugins, runPluginsOnBatch, runPluginTask, setupPlugins } from '../plugins' import { createServer } from '../server' @@ -42,6 +43,9 @@ export async function createWorker(config: PluginsServerConfig, threadId: number if (task === 'getPluginSchedule') { response = cloneObject(server.pluginSchedule) } + if (task === 'ingestEvent') { + response = cloneObject(await ingestEvent(server, args.event)) + } if (task.startsWith('runEvery')) { const { pluginConfigId } = args response = cloneObject(await runPluginTask(server, task, pluginConfigId)) diff --git a/tests/postgres/process-event.test.ts b/tests/postgres/process-event.test.ts index 655c52066..277649a56 100644 --- a/tests/postgres/process-event.test.ts +++ b/tests/postgres/process-event.test.ts @@ -1,5 +1,4 @@ -import { Team } from '../../src/types' -import { createUserTeamAndOrganization } from '../helpers/sql' +import { createUserTeamAndOrganization, getTeams } from '../helpers/sql' import { createProcessEventTests } from '../shared/process-event' jest.setTimeout(600000) // 600 sec timeout. @@ -37,7 +36,7 @@ describe('process event (postgresql)', () => { '0174f81e-36f5-0000-7ef8-cc26c1fbab1c' ) - const teams = (await server!.db.postgresQuery('SELECT * FROM posthog_team ORDER BY id')).rows as Team[] + const teams = await getTeams(server!) 
// # Test no team leakage const team2 = teams[1] diff --git a/tests/postgres/queue.test.ts b/tests/postgres/queue.test.ts index 309256ce5..fcabec675 100644 --- a/tests/postgres/queue.test.ts +++ b/tests/postgres/queue.test.ts @@ -68,11 +68,11 @@ test('worker and task passing via redis', async () => { expect(args2).toEqual(args) expect(kwargs2).toEqual({}) - const queue = await startQueue( - server, - (event) => runPlugins(server, event), - (events) => Promise.all(events.map((event) => runPlugins(server, event))) - ) + const queue = await startQueue(server, undefined, { + processEvent: (event) => runPlugins(server, event), + processEventBatch: (events) => Promise.all(events.map((event) => runPlugins(server, event))), + ingestEvent: () => Promise.resolve({ success: true }), + }) await advanceOneTick() await advanceOneTick() @@ -133,11 +133,11 @@ test('process multiple tasks', async () => { expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(3) expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) - const queue = await startQueue( - server, - (event) => runPlugins(server, event), - (events) => Promise.all(events.map((event) => runPlugins(server, event))) - ) + const queue = await startQueue(server, undefined, { + processEvent: (event) => runPlugins(server, event), + processEventBatch: (events) => Promise.all(events.map((event) => runPlugins(server, event))), + ingestEvent: () => Promise.resolve({ success: true }), + }) await advanceOneTick() expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(2) @@ -200,11 +200,11 @@ test('pause and resume queue', async () => { expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(6) expect(await server.redis.llen(server.CELERY_DEFAULT_QUEUE)).toBe(0) - const queue = await startQueue( - server, - (event) => runPlugins(server, event), - (events) => Promise.all(events.map((event) => runPlugins(server, event))) - ) + const queue = await startQueue(server, undefined, { + processEvent: (event) => runPlugins(server, event), + processEventBatch: (events) => Promise.all(events.map((event) => runPlugins(server, event))), + ingestEvent: () => Promise.resolve({ success: true }), + }) await advanceOneTick() expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(5) diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index 760b74588..ee385f152 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -3,7 +3,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import Client from '../../src/celery/client' import { startPluginsServer } from '../../src/server' import { LogLevel } from '../../src/types' -import { delay } from '../../src/utils' +import { delay, UUIDT } from '../../src/utils' import { makePiscina } from '../../src/worker/piscina' import { resetTestDatabase } from '../helpers/sql' import { setupPiscina } from '../helpers/worker' @@ -41,6 +41,7 @@ test('piscina worker test', async () => { const processEventBatch = (batch: PluginEvent[]) => piscina.runTask({ task: 'processEventBatch', args: { batch } }) const runEveryDay = (pluginConfigId: number) => piscina.runTask({ task: 'runEveryDay', args: { pluginConfigId } }) const getPluginSchedule = () => piscina.runTask({ task: 'getPluginSchedule' }) + const ingestEvent = (event: PluginEvent) => piscina.runTask({ task: 'ingestEvent', args: { event } }) const pluginSchedule = await getPluginSchedule() expect(pluginSchedule).toEqual({ runEveryDay: [39], runEveryHour: [], runEveryMinute: [] }) @@ 
-54,6 +55,12 @@ test('piscina worker test', async () => { const everyDayReturn = await runEveryDay(39) expect(everyDayReturn).toBe(4) + const ingestResponse1 = await ingestEvent(createEvent()) + expect(ingestResponse1).toEqual({ error: 'Not a valid UUID: "undefined"' }) + + const ingestResponse2 = await ingestEvent({ ...createEvent(), uuid: new UUIDT().toString() }) + expect(ingestResponse2).toEqual({ success: true }) + + await piscina.destroy() }) diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index 441524942..59f2d68b6 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -18,14 +18,20 @@ import { import { delay, UUIDT } from '../../src/utils' import { createUserTeamAndOrganization, getFirstTeam, getTeams, onQuery, resetTestDatabase } from '../helpers/sql' -jest.setTimeout(600000) // 600 sec timeout - -export async function delayUntilEventIngested(fetchEvents: () => Promise<any[]>, minCount = 1): Promise<void> { - for (let i = 0; i < 30; i++) { - if ((await fetchEvents()).length >= minCount) { +jest.setTimeout(600000) // 600 sec timeout. + +export async function delayUntilEventIngested( + fetchEvents: () => Promise<any[] | number>, + minCount = 1, + delayMs = 500, + maxDelayCount = 30 +): Promise<void> { + for (let i = 0; i < maxDelayCount; i++) { + const events = await fetchEvents() + if ((typeof events === 'number' ? events : events.length) >= minCount) { return } - await delay(500) + await delay(delayMs) } }
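For reference, the two pieces of logic this PR leans on most can be exercised in isolation: the `runInParallelBatches` helper added to `src/utils.ts` and the batch-size clamp used in `KafkaQueue.eachBatch`. The generic signature of `runInParallelBatches` appears garbled in the diff above, so the sketch below uses a plausible reconstruction of its types rather than the exact upstream declaration; `maxBatchSize` and `demo` are illustrative names that do not exist in the repository.

```ts
// Minimal standalone sketch of the parallel-batching strategy, assuming a
// reconstructed signature for runInParallelBatches (the diff's generics were lost).
async function runInParallelBatches<T, R>(
    array: T[],
    batchSize: number,
    callback: (batch: T[]) => Promise<R[]>
): Promise<R[]> {
    // Split the input into consecutive slices of at most `batchSize` items...
    const batches: T[][] = []
    for (let i = 0; i < array.length; i += batchSize) {
        batches.push(array.slice(i, i + batchSize))
    }
    // ...process every slice concurrently and flatten the results back together.
    const responses = await Promise.all(batches.map(callback))
    return responses.flat()
}

// Batch-size heuristic mirroring KafkaQueue.eachBatch: aim for roughly one slice
// per worker task, clamped to the range [1, 100]. Hypothetical helper name.
function maxBatchSize(eventCount: number, workerConcurrency: number, tasksPerWorker: number): number {
    return Math.max(1, Math.min(100, Math.ceil(eventCount / workerConcurrency / tasksPerWorker)))
}

// Example: 2000 events with WORKER_CONCURRENCY=4 and TASKS_PER_WORKER=10
// gives ceil(2000 / 4 / 10) = 50 events per parallel batch.
async function demo(): Promise<void> {
    const events = Array.from({ length: 2000 }, (_, i) => ({ uuid: `${i}`, processed: false }))
    const size = maxBatchSize(events.length, 4, 10) // 50
    const processed = await runInParallelBatches(events, size, async (batch) =>
        batch.map((event) => ({ ...event, processed: true }))
    )
    console.log(`${processed.length} events in batches of ${size}`) // "2000 events in batches of 50"
}

void demo()
```

The clamp to at most 100 keeps a single Kafka batch from fanning out into an unbounded number of concurrent `processEventBatch` tasks, while the ceiling division tries to hand each Piscina worker slot roughly one slice so the pool stays saturated without being flooded.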