Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Babel Loop Timeouts #155

Merged
merged 53 commits into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
12425e9
transpile code via babel-standalone
mariusandra Feb 15, 2021
68376d1
move vm.ts into vm/
mariusandra Feb 15, 2021
6b83212
add TASK_TIMEOUT config
mariusandra Feb 15, 2021
1e862a4
add babel transform to timeout while loops
mariusandra Feb 15, 2021
7e7d3dc
update refactored rename
mariusandra Feb 15, 2021
b390a4e
vm while timeout test
mariusandra Feb 15, 2021
9b1fa9e
add more unimplemented timeout tests
mariusandra Feb 15, 2021
c7cfd45
add transforms for while/for/do-while loops
mariusandra Feb 15, 2021
4e45dc0
fix loop timeout ms
mariusandra Feb 15, 2021
1f3e0a0
remove extra ;
mariusandra Feb 15, 2021
e664b20
fix import
mariusandra Feb 15, 2021
604ec7a
fix more imports
mariusandra Feb 15, 2021
8d9aa4b
fix even more imports
mariusandra Feb 15, 2021
77c0818
fix error messages
mariusandra Feb 15, 2021
723363d
simplify errors
mariusandra Feb 15, 2021
ff1ab9d
small refactor
mariusandra Feb 15, 2021
bae7cce
fix import
mariusandra Feb 15, 2021
5ee944c
add async guard and protect against long promises
mariusandra Feb 15, 2021
3d0585c
async guard around setupPlugin()
mariusandra Feb 16, 2021
af1426c
add column
mariusandra Feb 16, 2021
e6a1849
slightly safer variables
mariusandra Feb 16, 2021
4dee94d
less noise in vm bench
mariusandra Feb 16, 2021
abd172b
can not override async guard for the main functions (e.g. processEven…
mariusandra Feb 16, 2021
0d35843
reduce how much we do in a benchmark
mariusandra Feb 16, 2021
aa016aa
add types to loop timeout
mariusandra Feb 16, 2021
f0cd191
types for promise timeout
mariusandra Feb 16, 2021
c3c9d80
fix line/column numbers
mariusandra Feb 16, 2021
1a65f7f
explain some decisions
mariusandra Feb 16, 2021
00be595
verify that the process event timeout applies e2e.
mariusandra Feb 16, 2021
dbba38c
managed to get equal, so changing
mariusandra Feb 16, 2021
4eaf4fd
update message that it's just a warning
mariusandra Feb 16, 2021
6f55ec8
add e2e kafka test for bad delay
mariusandra Feb 16, 2021
d3db558
shorter test in github action
mariusandra Feb 17, 2021
733644f
increase test timeout to see if that makes a difference (locally it t…
mariusandra Feb 17, 2021
deef4a8
skip the "slow on GH action" test for now
mariusandra Feb 17, 2021
3c5f4a4
Merge branch 'master' into babel-timeouts
Twixes Feb 17, 2021
8463e40
process kafka events in parallel by default
mariusandra Feb 17, 2021
7a1ff20
add more metrics to kafka queue
mariusandra Feb 17, 2021
75ef5a3
some debug to help fix the test
mariusandra Feb 17, 2021
f792c51
add debug log in the delayUntilEventIngested function
mariusandra Feb 17, 2021
28d3f94
add "single_event_batch" timing to the event processing steps
mariusandra Feb 17, 2021
8c2d66e
fix timer
mariusandra Feb 17, 2021
f2aeff8
Improve timeoutGuard default timeout
Twixes Feb 17, 2021
440766a
revert benchmark to the last one that worked
mariusandra Feb 17, 2021
0b05999
skip bad delay
mariusandra Feb 17, 2021
be1af7b
add back a 1:1 copy of the e2e.kafka test, but with the timeout code.…
mariusandra Feb 17, 2021
cf004ed
remove broken tests, improve logging of working tests
mariusandra Feb 17, 2021
1c98e44
Merge branch 'babel-timeouts' of https://github.com/PostHog/plugin-se…
Twixes Feb 18, 2021
772716d
Clan up vm.ts
Twixes Feb 18, 2021
a69e15b
Improve clarity of loopTimeout
Twixes Feb 18, 2021
26cef38
Refactor transforms slightly
Twixes Feb 18, 2021
953bd62
Refactor transform call to secureCode func
Twixes Feb 18, 2021
190930b
Add secureCode tests
Twixes Feb 18, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/clickhouse/e2e.kafka.benchmark.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { performance } from 'perf_hooks'

import { createPosthog, DummyPostHog } from '../../src/extensions/posthog'
import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics'
import { startPluginsServer } from '../../src/server'
import { LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog'
import { makePiscina } from '../../src/worker/piscina'
import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse'
import { resetKafka } from '../../tests/helpers/kafka'
Expand Down Expand Up @@ -84,7 +84,7 @@ describe('e2e kafka & clickhouse benchmark', () => {

const n = (n: number) => `${Math.round(n * 100) / 100}`
console.log(
`[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
`ℹ️️ [Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
1000 / (timeMs / count)
)} events/sec, ${n(timeMs / count)}ms per event)`
)
Expand Down
97 changes: 97 additions & 0 deletions benchmarks/clickhouse/e2e.timeout.benchmark.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { performance } from 'perf_hooks'

import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics'
import { startPluginsServer } from '../../src/server'
import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog'
import { makePiscina } from '../../src/worker/piscina'
import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse'
import { resetKafka } from '../../tests/helpers/kafka'
import { pluginConfig39 } from '../../tests/helpers/plugins'
import { resetTestDatabase } from '../../tests/helpers/sql'
import { delayUntilEventIngested } from '../../tests/shared/process-event'

jest.setTimeout(600000) // 10min timeout

// Server config overrides for this benchmark. TASK_TIMEOUT is deliberately
// short (5 s) so that, with plugin delays drawn uniformly from 0-15 s below,
// roughly two thirds of events hit the timeout path.
const extraServerConfig: Partial<PluginsServerConfig> = {
    KAFKA_ENABLED: true,
    KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092',
    WORKER_CONCURRENCY: 4,
    TASK_TIMEOUT: 5, // seconds — intentionally low to force timeouts
    PLUGIN_SERVER_INGESTION: true,
    KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION,
    // NOTE(review): the 'PARALELL' misspelling is in the config key itself, not a typo here
    KAFKA_BATCH_PARALELL_PROCESSING: true,
    LOG_LEVEL: LogLevel.Log,
}

// End-to-end benchmark: produce events to Kafka, process them through a plugin
// that sleeps a random 0-15 s, and measure throughput plus how many events
// complete before the 5 s task timeout configured above.
describe('e2e kafka processing timeout benchmark', () => {
    let queue: Queue
    let server: PluginsServer
    let stopServer: () => Promise<void>
    let posthog: DummyPostHog

    beforeEach(async () => {
        // Install a test plugin whose processEvent awaits a random 0-15 s delay
        // before tagging the event. Events whose processing is cut short never
        // get properties.timeout set, which is what the test counts at the end.
        // NOTE(review): __jestSetTimeout presumably exposes the real setTimeout
        // inside the plugin VM (bypassing jest fake timers) — confirm in vm code.
        await resetTestDatabase(`
            async function processEvent (event) {
                await new Promise(resolve => __jestSetTimeout(() => resolve(), 15000 * Math.random()))
                event.properties.timeout = 'no timeout'
                return event
            }
        `)
        await resetKafka(extraServerConfig)
        await resetTestDatabaseClickhouse(extraServerConfig)

        // Boot the full plugin server (with Piscina workers) against the test config.
        const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
        server = startResponse.server
        stopServer = startResponse.stop
        queue = startResponse.queue

        posthog = createPosthog(server, pluginConfig39)
    })

    afterEach(async () => {
        await stopServer()
    })

    test('measure performance', async () => {
        console.debug = () => null // silence debug output for cleaner benchmark logs

        const count = 3000

        // fill in the queue
        function createEvent() {
            const uuid = new UUIDT().toString()
            posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' })
        }
        // Pause consumption while producing, so processing starts from a full queue.
        await queue.pause()
        for (let i = 0; i < count; i++) {
            createEvent()
        }

        // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited)
        await delay(5000)
        await queue.resume()

        console.log('Starting timer')
        const startTime = performance.now()
        // Poll fetchEvents() every 500 ms until all `count` events are ingested.
        await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count)
        const timeMs = performance.now() - startTime
        console.log('Finished!')

        // Format a number to at most two decimal places for display.
        const n = (n: number) => `${Math.round(n * 100) / 100}`
        console.log(
            `ℹ️️ [Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
                1000 / (timeMs / count)
            )} events/sec, ${n(timeMs / count)}ms per event)`
        )
        // Events that finished processing have properties.timeout set by the
        // plugin above; with delays uniform over 0-15 s and a 5 s task timeout,
        // about one third should pass — hence the expected 1:2 ratio.
        const events = (await server.db.fetchEvents()) as ClickHouseEvent[]
        const passedEvents = events.filter((e) => e.properties.timeout).length
        console.log(
            `ℹ️ Out of 3000 events: ${passedEvents} took under 5sec, ${
                3000 - passedEvents
            } timed out. This should be a 1:2 ratio.`
        )
    })
})
2 changes: 1 addition & 1 deletion benchmarks/postgres/e2e.celery.benchmark.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { performance } from 'perf_hooks'

import { createPosthog, DummyPostHog } from '../../src/extensions/posthog'
import { startPluginsServer } from '../../src/server'
import { LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog'
import { makePiscina } from '../../src/worker/piscina'
import { pluginConfig39 } from '../../tests/helpers/plugins'
import { resetTestDatabase } from '../../tests/helpers/sql'
Expand Down
37 changes: 21 additions & 16 deletions benchmarks/vm/memory.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { createServer } from '../../src/server'
import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types'
import { createPluginConfigVM } from '../../src/vm'
import { createPluginConfigVM } from '../../src/vm/vm'
import { commonOrganizationId } from '../../tests/helpers/plugins'

jest.mock('../../src/sql')
Expand Down Expand Up @@ -47,6 +47,7 @@ const mockConfig: PluginConfig = {
}

test('test vm memory usage', async () => {
const debug = false
const numVMs = 1000
const numEventsPerVM = 100

Expand Down Expand Up @@ -76,27 +77,31 @@ test('test vm memory usage', async () => {
const vm = await createPluginConfigVM(server, mockConfig, indexJs)
vms.push(vm)

const nowUsed = getUsed()
console.log(
`Used: ${nowUsed} MB, diff ${nowUsed - used} (${(nowUsed - usedAtStart) / (i + 1)} * ${
i + 1
} used since the start)`
)
used = nowUsed
if (debug || i === numVMs - 1) {
const nowUsed = getUsed()
console.log(
`Used: ${nowUsed} MB, diff ${nowUsed - used} (${(nowUsed - usedAtStart) / (i + 1)} * ${
i + 1
} used since the start)`
)
used = nowUsed
}
}

for (let i = 0; i < numEventsPerVM; i++) {
for (let j = 0; j < numVMs; j++) {
await vms[j].methods.processEvent(createEvent(i + j))
}
global.gc()
const nowUsed = getUsed()
console.log(
`Run ${i}. Used: ${nowUsed} MB, diff ${nowUsed - used} (${nowUsed - usedAtStart} used since the start, ${
(nowUsed - usedAtStart) / numVMs
} per vm)`
)
used = nowUsed
if (debug || i === numEventsPerVM - 1) {
global?.gc?.()
const nowUsed = getUsed()
console.log(
`Run ${i}. Used: ${nowUsed} MB, diff ${nowUsed - used} (${
nowUsed - usedAtStart
} used since the start, ${(nowUsed - usedAtStart) / numVMs} per vm)`
)
used = nowUsed
}
}

await closeServer()
Expand Down
7 changes: 5 additions & 2 deletions benchmarks/vm/worker.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ test('piscina worker benchmark', async () => {
`,
},
{
testName: 'for200k',
// used to be 'for200k', but since we inject Date.now() code into
// the for/while/do loops, to throw if they are too long, running
// those comparisons 200k * 10k * runs * threads times is bit too much
testName: 'for2k',
events: 10000,
testCode: `
function processEvent (event, meta) {
let j = 0; for(let i = 0; i < 200000; i++) { j = i };
let j = 0; for(let i = 0; i < 2000; i++) { j = i };
event.properties = { "somewhere": "over the rainbow" };
return event
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"repository": "https://github.com/PostHog/posthog-plugin-server",
"license": "MIT",
"dependencies": {
"@babel/standalone": "^7.12.16",
"@google-cloud/bigquery": "^5.5.0",
"@posthog/clickhouse": "^1.7.0",
"@sentry/node": "^5.29.0",
Expand Down Expand Up @@ -72,6 +73,7 @@
"@babel/preset-typescript": "^7.8.3",
"@posthog/plugin-scaffold": "0.2.8",
"@types/adm-zip": "^0.4.33",
"@types/babel__standalone": "^7.1.3",
"@types/ioredis": "^4.17.7",
"@types/jest": "^26.0.15",
"@types/luxon": "^1.25.0",
Expand All @@ -86,7 +88,6 @@
"@types/yargs": "^15.0.9",
"@typescript-eslint/eslint-plugin": "^4.14.0",
"@typescript-eslint/parser": "^4.14.0",
"babel-core": "^7.0.0-bridge.0",
"babel-eslint": "^10.1.0",
"eslint": "^7.18.0",
"eslint-config-prettier": "^7.2.0",
Expand Down
4 changes: 3 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_CLIENT_CERT_KEY_B64: null,
KAFKA_TRUSTED_CERT_B64: null,
KAFKA_CONSUMPTION_TOPIC: null,
KAFKA_BATCH_PARALELL_PROCESSING: false,
KAFKA_BATCH_PARALELL_PROCESSING: true,
PLUGIN_SERVER_INGESTION: false,
PLUGINS_CELERY_QUEUE: 'posthog-plugins',
REDIS_URL: 'redis://127.0.0.1',
Expand All @@ -34,6 +34,7 @@ export function getDefaultConfig(): PluginsServerConfig {
WEB_PORT: 3008,
WEB_HOSTNAME: '0.0.0.0',
WORKER_CONCURRENCY: coreCount,
TASK_TIMEOUT: 30,
TASKS_PER_WORKER: 10,
LOG_LEVEL: LogLevel.Info,
SENTRY_DSN: null,
Expand Down Expand Up @@ -63,6 +64,7 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
WEB_PORT: 'port for web server to listen on',
WEB_HOSTNAME: 'hostname for web server to listen on',
WORKER_CONCURRENCY: 'number of concurrent worker threads',
TASK_TIMEOUT: 'How many seconds until tasks are timed out',
TASKS_PER_WORKER: 'number of parallel tasks per worker thread',
LOG_LEVEL: 'minimum log level',
KAFKA_ENABLED: 'use Kafka instead of Celery to ingest events',
Expand Down
23 changes: 18 additions & 5 deletions src/ingestion/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class KafkaQueue implements Queue {
isRunning,
isStale,
}: EachBatchPayload): Promise<void> {
const batchProcessingTimer = new Date()
const batchStartTimer = new Date()

const uuidOrder = new Map<string, number>()
const uuidOffset = new Map<string, string>()
Expand All @@ -63,21 +63,33 @@ export class KafkaQueue implements Queue {
)

const processingTimeout = timeoutGuard(
`Took too long to run plugins on ${pluginEvents.length} events! Timeout after 30 sec!`
`Still running plugins on ${pluginEvents.length} events. Timeout warning after 30 sec!`
)
const batches = groupIntoBatches(pluginEvents, maxBatchSize)
const processedEvents = (await Promise.all(batches.map(this.processEventBatch))).flat()
const processedEvents = (
await Promise.all(
batches.map(async (batch) => {
const timer = new Date()
const processedBatch = this.processEventBatch(batch)
this.pluginsServer.statsd?.timing('kafka_queue.single_event_batch', timer)
return processedBatch
})
)
).flat()

clearTimeout(processingTimeout)

this.pluginsServer.statsd?.timing('kafka_queue.each_batch.process_events', batchStartTimer)
const batchIngestionTimer = new Date()

// Sort in the original order that the events came in, putting any randomly added events to the end.
// This is so we would resolve the correct kafka offsets in order.
processedEvents.sort(
(a, b) => (uuidOrder.get(a.uuid!) || pluginEvents.length) - (uuidOrder.get(b.uuid!) || pluginEvents.length)
)

const ingestionTimeout = timeoutGuard(
`Took too long to ingest ${processedEvents.length} events! Timeout after 30 sec!`
`Still ingesting ${processedEvents.length} events. Timeout warning after 30 sec!`
)

// TODO: add chunking into groups of 500 or so. Might start too many promises at once now
Expand Down Expand Up @@ -126,7 +138,8 @@ export class KafkaQueue implements Queue {

clearTimeout(ingestionTimeout)

this.pluginsServer.statsd?.timing('kafka_queue.each_batch', batchProcessingTimer)
this.pluginsServer.statsd?.timing('kafka_queue.each_batch.ingest_events', batchIngestionTimer)
this.pluginsServer.statsd?.timing('kafka_queue.each_batch', batchStartTimer)
resolveOffset(batch.lastOffset())
await heartbeat()
await commitOffsetsIfNecessary()
Expand Down
5 changes: 3 additions & 2 deletions src/ingestion/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Sentry from '@sentry/node'
import crypto from 'crypto'

import { defaultConfig } from '../config'
import { BasePerson, Element, Person, RawPerson } from '../types'

export function unparsePersonPartial(person: Partial<Person>): Partial<RawPerson> {
Expand Down Expand Up @@ -156,9 +157,9 @@ export function chainToElements(chain: string): Element[] {
return elements
}

export function timeoutGuard(message: string): NodeJS.Timeout {
export function timeoutGuard(message: string, timeout = defaultConfig.TASK_TIMEOUT * 1000): NodeJS.Timeout {
return setTimeout(() => {
console.log(`⌛⌛⌛ ${message}`)
Sentry.captureMessage(message)
}, 30000)
}, timeout)
}
2 changes: 1 addition & 1 deletion src/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from './s
import { status } from './status'
import { PluginConfig, PluginJsonConfig, PluginsServer, PluginTask, TeamId } from './types'
import { getFileFromArchive } from './utils'
import { createPluginConfigVM } from './vm'
import { createPluginConfigVM } from './vm/vm'

export async function setupPlugins(server: PluginsServer): Promise<void> {
const pluginRows = await getPluginRows(server)
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export enum LogLevel {
export interface PluginsServerConfig extends Record<string, any> {
WORKER_CONCURRENCY: number
TASKS_PER_WORKER: number
TASK_TIMEOUT: number
CELERY_DEFAULT_QUEUE: string
DATABASE_URL: string
CLICKHOUSE_HOST: string
Expand Down
8 changes: 8 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,11 @@ export function groupIntoBatches<T>(array: T[], batchSize: number): T[][] {
}
return batches
}

/**
 * Template literal tag that standardizes JS code snippets used internally,
 * so they can be written inline without extraneous indentation.
 *
 * Interpolated values are stringified and spliced into the snippet. The
 * common leading indentation (measured from the first line after the opening
 * newline) is stripped from every line, and the result is trimmed.
 *
 * @example
 *   code`
 *       function f () {
 *           return ${value}
 *       }
 *   `
 */
export function code(strings: TemplateStringsArray, ...values: unknown[]): string {
    // Interleave literal chunks with their interpolated values. (Previously
    // the tag accepted no values and join('…') would silently replace any
    // interpolation with a literal '…', corrupting the generated code.)
    const stringsConcat = strings.reduce((acc, chunk, i) => acc + String(values[i - 1]) + chunk)
    // Indentation of the first line after the leading newline, in spaces.
    const indentation = stringsConcat.match(/^\n([ ]*)/)?.[1].length ?? 0
    // Remove exactly that many leading spaces from every line.
    const dedentedCode = stringsConcat.replace(new RegExp(`^[ ]{${indentation}}`, 'gm'), '')
    return dedentedCode.trim()
}
Loading