Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Benchmark ingestion #132

Merged
merged 12 commits into from
Feb 11, 2021
170 changes: 168 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ jobs:
ping -c 1 kafka
ping -c 1 zookeeper

- name: Start Kafka, Clickhouse, Zookeeper
- name: Start Kafka, ClickHouse, Zookeeper
run: |
cd posthog/ee
docker-compose -f docker-compose.ch.yml up -d zookeeper kafka clickhouse
Expand Down Expand Up @@ -191,6 +191,97 @@ jobs:
REDIS_URL: 'redis://localhost'
run: cd plugin-server && yarn test:clickhouse

# CI job: runs the plugin-server ingestion benchmarks against the
# ClickHouse + Kafka event pipeline.
benchmarks-clickhouse:
name: Benchmarks / ClickHouse + Kafka
runs-on: ubuntu-20.04

# Postgres and Redis run as GitHub-managed service containers with health
# checks; Kafka, Zookeeper and ClickHouse are started via docker-compose
# in a later step.
services:
postgres:
image: postgres:12
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_posthog
ports: ['5432:5432']
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
redis:
image: redis
ports:
- '6379:6379'
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5

# Job-wide environment consumed by the plugin server and test helpers.
env:
REDIS_URL: 'redis://localhost'
CLICKHOUSE_HOST: 'localhost'
CLICKHOUSE_DATABASE: 'posthog_test'
KAFKA_ENABLED: 'true'
KAFKA_HOSTS: 'kafka:9092'

steps:
# The Django server repo is needed only to create/migrate the test databases.
- name: Check out Django server for database setup
uses: actions/checkout@v2
with:
repository: 'PostHog/posthog'
path: 'posthog/'

- name: Check out plugin server
uses: actions/checkout@v2
with:
path: 'plugin-server'

# Map the docker-compose service hostnames to localhost so the runner can
# reach the kafka/zookeeper containers by name; ping verifies resolution.
- name: Fix Kafka Hostname
run: |
sudo bash -c 'echo "127.0.0.1 kafka zookeeper" >> /etc/hosts'
ping -c 1 kafka
ping -c 1 zookeeper

- name: Start Kafka, ClickHouse, Zookeeper
run: |
cd posthog/ee
docker-compose -f docker-compose.ch.yml up -d zookeeper kafka clickhouse

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.8

- name: Set up Node 14
uses: actions/setup-node@v2
with:
node-version: 14

# Cache the whole Python environment, keyed on requirements.txt, to
# speed up repeat runs.
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
pip install --upgrade pip
pip install --upgrade --upgrade-strategy eager -r posthog/requirements.txt

# PRIMARY_DB selects the ClickHouse code path when creating test databases.
- name: Set up databases
env:
SECRET_KEY: 'abcdef' # unsafe - for testing only
DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/posthog'
PRIMARY_DB: 'clickhouse'
TEST: 'true'
run: python posthog/manage.py setup_test_environment

- name: Install package.json dependencies with Yarn
run: cd plugin-server && yarn

- name: Run benchmarks
env:
# Below DB name has `test_` prepended, as that's how Django (run above) creates the test DB
DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/test_posthog'
REDIS_URL: 'redis://localhost'
run: cd plugin-server && yarn benchmark:clickhouse

benchmarks-postgres:
name: Benchmarks / Postgres + Redis
runs-on: ubuntu-20.04
Expand Down Expand Up @@ -264,4 +355,79 @@ jobs:
# Below DB name has `test_` prepended, as that's how Django (run above) creates the test DB
DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/test_posthog'
REDIS_URL: 'redis://localhost'
run: cd plugin-server && yarn benchmark
run: cd plugin-server && yarn benchmark:postgres

# CI job: runs the plugin VM performance & memory benchmarks. Unlike the
# ClickHouse job above, this one needs only Postgres and Redis services.
benchmarks-vm:
name: Benchmarks / VM Performance & Memory
runs-on: ubuntu-20.04

services:
postgres:
image: postgres:12
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_posthog
ports: ['5432:5432']
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
redis:
image: redis
ports:
- '6379:6379'
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5

env:
REDIS_URL: 'redis://localhost'

steps:
# The Django server repo is needed only to create/migrate the test databases.
- name: Check out Django server for database setup
uses: actions/checkout@v2
with:
repository: 'PostHog/posthog'
path: 'posthog/'

- name: Check out plugin server
uses: actions/checkout@v2
with:
path: 'plugin-server'

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.8

- name: Set up Node 14
uses: actions/setup-node@v2
with:
node-version: 14

# Cache the whole Python environment, keyed on requirements.txt, to
# speed up repeat runs.
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
pip install --upgrade pip
pip install --upgrade --upgrade-strategy eager -r posthog/requirements.txt

- name: Set up databases
env:
SECRET_KEY: 'abcdef' # unsafe - for testing only
DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/posthog'
TEST: 'true'
run: python posthog/manage.py setup_test_environment

- name: Install package.json dependencies with Yarn
run: cd plugin-server && yarn

- name: Run benchmarks
env:
# Below DB name has `test_` prepended, as that's how Django (run above) creates the test DB
DATABASE_URL: 'postgres://postgres:postgres@localhost:5432/test_posthog'
REDIS_URL: 'redis://localhost'
run: cd plugin-server && yarn benchmark:vm
95 changes: 95 additions & 0 deletions benchmarks/clickhouse/e2e.kafka.benchmark.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// E2E ingestion benchmark: queues events via Kafka, lets the plugin server
// ingest them into ClickHouse, and reports events/sec and ms/event.
import { performance } from 'perf_hooks'

import { createPosthog, DummyPostHog } from '../../src/extensions/posthog'
import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics'
import { startPluginsServer } from '../../src/server'
import { LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { makePiscina } from '../../src/worker/piscina'
import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse'
import { resetKafka } from '../../tests/helpers/kafka'
import { pluginConfig39 } from '../../tests/helpers/plugins'
import { resetTestDatabase } from '../../tests/helpers/sql'
import { delayUntilEventIngested } from '../../tests/shared/process-event'

jest.setTimeout(600000) // 10min timeout

// Plugin-server configuration selecting the Kafka + ClickHouse ingestion path.
const extraServerConfig: Partial<PluginsServerConfig> = {
KAFKA_ENABLED: true,
KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092',
WORKER_CONCURRENCY: 4,
PLUGIN_SERVER_INGESTION: true,
KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION,
// NOTE(review): "PARALELL" is misspelled, but the key must match the name
// declared in PluginsServerConfig — confirm upstream before renaming.
KAFKA_BATCH_PARALELL_PROCESSING: true,
LOG_LEVEL: LogLevel.Log,
}

describe('e2e kafka & clickhouse benchmark', () => {
let queue: Queue
let server: PluginsServer
let stopServer: () => Promise<void>
let posthog: DummyPostHog

beforeEach(async () => {
// Install a test plugin that stamps `processed` and `upperUuid` onto
// every event in each batch; the final assertion checks for this.
await resetTestDatabase(`
async function processEventBatch (batch) {
// console.log(\`Received batch of \${batch.length} events\`)
return batch.map(event => {
event.properties.processed = 'hell yes'
event.properties.upperUuid = event.properties.uuid?.toUpperCase()
return event
})
}
`)
// Start from clean Kafka topics and an empty ClickHouse test database.
await resetKafka(extraServerConfig)
await resetTestDatabaseClickhouse(extraServerConfig)

const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
server = startResponse.server
stopServer = startResponse.stop
queue = startResponse.queue

posthog = createPosthog(server, pluginConfig39)
})

afterEach(async () => {
await stopServer()
})

test('measure performance', async () => {
// Silence debug output so it doesn't distort the benchmark log.
console.debug = () => null

const count = 3000

// fill in the queue
function createEvent() {
const uuid = new UUIDT().toString()
posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' })
}
// Pause consumption so all events accumulate in Kafka before timing starts.
await queue.pause()
for (let i = 0; i < count; i++) {
createEvent()
}

// hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited)
await delay(5000)
await queue.resume()

console.log('Starting timer')
const startTime = performance.now()
// Blocks until all `count` events are visible in ClickHouse
// (presumably polling every 500ms — see delayUntilEventIngested).
await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count)
const timeMs = performance.now() - startTime
console.log('Finished!')

// Formatter: round to two decimal places for the report line.
const n = (n: number) => `${Math.round(n * 100) / 100}`
console.log(
`[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
1000 / (timeMs / count)
)} events/sec, ${n(timeMs / count)}ms per event)`
)

// Sanity check: the plugin's transformation actually ran on the last event.
const events = await server.db.fetchEvents()
expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase())
})
})
89 changes: 89 additions & 0 deletions benchmarks/postgres/e2e.celery.benchmark.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// E2E ingestion benchmark: queues events on the Celery queue in Redis, lets
// the plugin server ingest them into Postgres, and reports events/sec.
import { performance } from 'perf_hooks'

import { createPosthog, DummyPostHog } from '../../src/extensions/posthog'
import { startPluginsServer } from '../../src/server'
import { LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { makePiscina } from '../../src/worker/piscina'
import { pluginConfig39 } from '../../tests/helpers/plugins'
import { resetTestDatabase } from '../../tests/helpers/sql'
import { delayUntilEventIngested } from '../../tests/shared/process-event'

jest.setTimeout(600000) // 10min timeout

// Plugin-server configuration for the Celery + Postgres path (Kafka off).
// Dedicated test queue names avoid clobbering any real queues in Redis.
const extraServerConfig: Partial<PluginsServerConfig> = {
WORKER_CONCURRENCY: 4,
PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue',
CELERY_DEFAULT_QUEUE: 'test-celery-default-queue',
PLUGIN_SERVER_INGESTION: true,
LOG_LEVEL: LogLevel.Log,
KAFKA_ENABLED: false,
}

describe('e2e celery & postgres benchmark', () => {
let queue: Queue
let server: PluginsServer
let stopServer: () => Promise<void>
let posthog: DummyPostHog

beforeEach(async () => {
// Install a test plugin that stamps `processed` and `upperUuid` onto
// every event; the final assertion checks for this.
await resetTestDatabase(`
async function processEvent (event) {
event.properties.processed = 'hell yes'
event.properties.upperUuid = event.properties.uuid?.toUpperCase()
return event
}
`)

const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
server = startResponse.server
stopServer = startResponse.stop
queue = startResponse.queue

// Drop any leftover entries so the llen assertions below start from zero.
await server.redis.del(server.PLUGINS_CELERY_QUEUE)
await server.redis.del(server.CELERY_DEFAULT_QUEUE)

posthog = createPosthog(server, pluginConfig39)
})

afterEach(async () => {
await stopServer()
})

test('measure performance', async () => {
// Silence debug output so it doesn't distort the benchmark log.
console.debug = () => null

const count = 3000

// fill in the queue
function createEvent() {
const uuid = new UUIDT().toString()
posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' })
}
// Pause consumption so all events accumulate in Redis before timing starts.
await queue.pause()
expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(0)
for (let i = 0; i < count; i++) {
// NOTE(review): createEvent returns void, so this await only yields to the
// event loop each iteration (the Kafka benchmark does not await). That yield
// may let the Redis pushes complete before the llen check — confirm before removing.
await createEvent()
}
await delay(1000)
// All events must be sitting in the Celery queue before we resume.
expect(await server.redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count)
await queue.resume()

console.log('Starting timer')
const startTime = performance.now()
// Blocks until all `count` events are visible in Postgres
// (presumably polling every 500ms — see delayUntilEventIngested).
await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count)
const timeMs = performance.now() - startTime
console.log('Finished!')

// Formatter: round to two decimal places for the report line.
const n = (n: number) => `${Math.round(n * 100) / 100}`
console.log(
`[Celery & Postgres] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
1000 / (timeMs / count)
)} events/sec, ${n(timeMs / count)}ms per event)`
)

// Sanity check: the plugin's transformation actually ran on the last event.
const events = await server.db.fetchEvents()
expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase())
})
})
Loading