From 12425e93b3c72c126160255cbc5f06293958365a Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 13:11:11 +0100 Subject: [PATCH 01/51] transpile code via babel-standalone --- package.json | 3 ++- src/extensions/posthog.ts | 2 +- src/vm.ts | 16 ++++++++++++++-- tests/plugins.test.ts | 18 ++++++++++-------- yarn.lock | 17 ++++++++++++----- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 981a41d9..0d010bbd 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "repository": "https://github.com/PostHog/posthog-plugin-server", "license": "MIT", "dependencies": { + "@babel/standalone": "^7.12.16", "@google-cloud/bigquery": "^5.5.0", "@posthog/clickhouse": "^1.7.0", "@sentry/node": "^5.29.0", @@ -72,6 +73,7 @@ "@babel/preset-typescript": "^7.8.3", "@posthog/plugin-scaffold": "0.2.8", "@types/adm-zip": "^0.4.33", + "@types/babel__standalone": "^7.1.3", "@types/ioredis": "^4.17.7", "@types/jest": "^26.0.15", "@types/luxon": "^1.25.0", @@ -86,7 +88,6 @@ "@types/yargs": "^15.0.9", "@typescript-eslint/eslint-plugin": "^4.14.0", "@typescript-eslint/parser": "^4.14.0", - "babel-core": "^7.0.0-bridge.0", "babel-eslint": "^10.1.0", "eslint": "^7.18.0", "eslint-config-prettier": "^7.2.0", diff --git a/src/extensions/posthog.ts b/src/extensions/posthog.ts index dd60f978..34cf2a3e 100644 --- a/src/extensions/posthog.ts +++ b/src/extensions/posthog.ts @@ -30,7 +30,7 @@ export function createPosthog(server: PluginsServer, pluginConfig: PluginConfig) if (!server.kafkaProducer) { throw new Error('kafkaProducer not configured!') } - server.kafkaProducer.send({ + await server.kafkaProducer.send({ topic: server.KAFKA_CONSUMPTION_TOPIC!, messages: [ { diff --git a/src/vm.ts b/src/vm.ts index c12f0fe9..f8a33ccf 100644 --- a/src/vm.ts +++ b/src/vm.ts @@ -1,3 +1,4 @@ +import { transform } from '@babel/standalone' import fetch from 'node-fetch' import { VM } from 'vm2' @@ -18,6 +19,18 @@ export async function createPluginConfigVM( sandbox: {}, }) + const { code } = transform(`${libJs};${indexJs}`, { + envName: 'production', + filename: undefined, + cwd: undefined, + code: true, + ast: false, + sourceMaps: false, + babelrc: false, + configFile: false, + presets: [['env', { targets: { node: process.versions.node } }]], + }) + // our own stuff vm.freeze(createConsole(), 'console') vm.freeze(createPosthog(server, pluginConfig), 'posthog') @@ -53,8 +66,7 @@ export async function createPluginConfigVM( const __getExported = (key) => __getExportDestinations().find(a => a[key])?.[key]; // the plugin JS code - ${libJs}; - ${indexJs}; + ${code}; // inject the meta object + shareable 'global' to the end of each exported function const __pluginMeta = { diff --git a/tests/plugins.test.ts b/tests/plugins.test.ts index d31563bc..6b3caf7e 100644 --- a/tests/plugins.test.ts +++ b/tests/plugins.test.ts @@ -141,10 +141,11 @@ test('archive plugin with broken index.js does not do much', async () => { expect(setError).toHaveBeenCalled() expect(setError.mock.calls[0][0]).toEqual(mockServer) - expect(setError.mock.calls[0][1]!.message).toEqual("Unexpected token ';'") - expect(setError.mock.calls[0][1]!.name).toEqual('SyntaxError') - expect(setError.mock.calls[0][1]!.stack).toContain('vm.js:') - expect(setError.mock.calls[0][1]!.time).toBeDefined() + const error = setError.mock.calls[0][1]! + expect(error.message).toContain('unknown: Unexpected token, expected "," (3:8)') + expect(error.name).toEqual('SyntaxError') + expect(error.stack).toContain('SyntaxError: unknown') + expect(error.time).toBeDefined() expect(setError.mock.calls[0][2]).toEqual(pluginConfigs.get(39)) expect(pluginConfigs.get(39)!.vm).toEqual(null) }) @@ -168,10 +169,11 @@ test('local plugin with broken index.js does not do much', async () => { expect(setError).toHaveBeenCalled() expect(setError.mock.calls[0][0]).toEqual(mockServer) - expect(setError.mock.calls[0][1]!.message).toEqual("Unexpected token ';'") - expect(setError.mock.calls[0][1]!.name).toEqual('SyntaxError') - expect(setError.mock.calls[0][1]!.stack).toContain('vm.js:') - expect(setError.mock.calls[0][1]!.time).toBeDefined() + const error = setError.mock.calls[0][1]! + expect(error.message).toContain('unknown: Unexpected token, expected "," (1:26)') + expect(error.name).toEqual('SyntaxError') + expect(error.stack).toContain('SyntaxError: unknown') + expect(error.time).toBeDefined() expect(setError.mock.calls[0][2]).toEqual(pluginConfigs.get(39)) expect(pluginConfigs.get(39)!.vm).toEqual(null) diff --git a/yarn.lock b/yarn.lock index 5a1868fd..ff1fe5cd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -842,6 +842,11 @@ dependencies: regenerator-runtime "^0.13.4" +"@babel/standalone@^7.12.16": + version "7.12.16" + resolved "https://registry.yarnpkg.com/@babel/standalone/-/standalone-7.12.16.tgz#2a6dbb54ffc36f5873136d8f8ff9d056b6d01078" + integrity sha512-QAJjWYSZYICtMtuJQCAlnhyhwPXMgnl2Bqp9MyxKfiE6u5/GTuEdw3Ay47re2DLxSiYVaST5uRLjyhE0igkXeQ== + "@babel/template@^7.10.4", "@babel/template@^7.12.7", "@babel/template@^7.3.3": version "7.12.7" resolved "https://registry.yarnpkg.com/@babel/template/-/template-7.12.7.tgz#c817233696018e39fbb6c491d2fb684e05ed43bc" @@ -1354,6 +1359,13 @@ dependencies: "@babel/types" "^7.0.0" +"@types/babel__standalone@^7.1.3": + version "7.1.3" + resolved "https://registry.yarnpkg.com/@types/babel__standalone/-/babel__standalone-7.1.3.tgz#e275639469c056f7cecc4963c88a803f028236dd" + integrity sha512-gj2Uvh8NT7+/0liAKMBtAqWspdg1LGZ471AD9Rdvrg1mB0q45qnl21dQPGz/ZtMYRgem0Aj/CUVQYHP3QDb5Gw== + dependencies: + "@babel/core" "^7.1.0" + "@types/babel__template@*": version "7.4.0" resolved "https://registry.yarnpkg.com/@types/babel__template/-/babel__template-7.4.0.tgz#0c888dd70b3ee9eebb6e4f200e809da0076262be" @@ -1901,11 +1913,6 @@ aws4@^1.8.0: resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.11.0.tgz#d61f46d83b2519250e2784daf5b09479a8b41c59" integrity sha512-xh1Rl34h6Fi1DC2WWKfxUTVqRsNnr6LsKz2+hfwDxQJWmrx8+c7ylaqBMcHfl1U1r2dsifOvKX3LQuLNZ+XSvA== -babel-core@^7.0.0-bridge.0: - version "7.0.0-bridge.0" - resolved "https://registry.yarnpkg.com/babel-core/-/babel-core-7.0.0-bridge.0.tgz#95a492ddd90f9b4e9a4a1da14eb335b87b634ece" - integrity sha512-poPX9mZH/5CSanm50Q+1toVci6pv5KSRv/5TWCwtzQS5XEwn40BcCrgIeMFWP9CKKIniKXNxoIOnOq4VVlGXhg== - babel-eslint@^10.1.0: version "10.1.0" resolved "https://registry.yarnpkg.com/babel-eslint/-/babel-eslint-10.1.0.tgz#6968e568a910b78fb3779cdd8b6ac2f479943232" From 68376d1b58f704ea1c97a34208b062b6d71fbee5 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 13:19:33 +0100 Subject: [PATCH 02/51] move vm.ts into vm/ --- benchmarks/vm/memory.benchmark.ts | 2 +- src/{ => vm}/extensions/cache.ts | 2 +- src/{ => vm}/extensions/console.ts | 0 src/{ => vm}/extensions/google.ts | 0 src/{ => vm}/extensions/posthog.ts | 6 +++--- src/{ => vm}/extensions/storage.ts | 2 +- src/{ => vm}/vm.ts | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) rename src/{ => vm}/extensions/cache.ts (96%) rename src/{ => vm}/extensions/console.ts (100%) rename src/{ => vm}/extensions/google.ts (100%) rename src/{ => vm}/extensions/posthog.ts (95%) rename src/{ => vm}/extensions/storage.ts (95%) rename src/{ => vm}/vm.ts (99%) diff --git a/benchmarks/vm/memory.benchmark.ts b/benchmarks/vm/memory.benchmark.ts index 55739591..b8279b53 100644 --- a/benchmarks/vm/memory.benchmark.ts +++ b/benchmarks/vm/memory.benchmark.ts @@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { createServer } from '../../src/server' import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types' -import { createPluginConfigVM } from '../../src/vm' +import { createPluginConfigVM } from '../../src/vm/vm' import { commonOrganizationId } from '../../tests/helpers/plugins' jest.mock('../../src/sql') diff --git a/src/extensions/cache.ts b/src/vm/extensions/cache.ts similarity index 96% rename from src/extensions/cache.ts rename to src/vm/extensions/cache.ts index a56a0780..0a115be7 100644 --- a/src/extensions/cache.ts +++ b/src/vm/extensions/cache.ts @@ -1,6 +1,6 @@ import { CacheExtension } from '@posthog/plugin-scaffold' -import { PluginsServer } from '../types' +import { PluginsServer } from '../../types' export function createCache(server: PluginsServer, pluginName: string, teamId: number): CacheExtension { const getKey = (key: string) => `@plugin/${pluginName}/${typeof teamId === 'undefined' ? '@all' : teamId}/${key}` diff --git a/src/extensions/console.ts b/src/vm/extensions/console.ts similarity index 100% rename from src/extensions/console.ts rename to src/vm/extensions/console.ts diff --git a/src/extensions/google.ts b/src/vm/extensions/google.ts similarity index 100% rename from src/extensions/google.ts rename to src/vm/extensions/google.ts diff --git a/src/extensions/posthog.ts b/src/vm/extensions/posthog.ts similarity index 95% rename from src/extensions/posthog.ts rename to src/vm/extensions/posthog.ts index 34cf2a3e..85394903 100644 --- a/src/extensions/posthog.ts +++ b/src/vm/extensions/posthog.ts @@ -2,9 +2,9 @@ import { Properties } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' import { PluginConfig, PluginsServer, RawEventMessage } from 'types' -import Client from '../celery/client' -import { UUIDT } from '../utils' -import { version } from '../version' +import Client from '../../celery/client' +import { UUIDT } from '../../utils' +import { version } from '../../version' interface InternalData { distinct_id: string diff --git a/src/extensions/storage.ts b/src/vm/extensions/storage.ts similarity index 95% rename from src/extensions/storage.ts rename to src/vm/extensions/storage.ts index e54b54da..80f862ac 100644 --- a/src/extensions/storage.ts +++ b/src/vm/extensions/storage.ts @@ -1,6 +1,6 @@ import { StorageExtension } from '@posthog/plugin-scaffold' -import { PluginConfig, PluginsServer } from '../types' +import { PluginConfig, PluginsServer } from '../../types' export function createStorage(server: PluginsServer, pluginConfig: PluginConfig): StorageExtension { const get = async function (key: string, defaultValue: unknown): Promise { diff --git a/src/vm.ts b/src/vm/vm.ts similarity index 99% rename from src/vm.ts rename to src/vm/vm.ts index f8a33ccf..35f90609 100644 --- a/src/vm.ts +++ b/src/vm/vm.ts @@ -2,12 +2,12 @@ import { transform } from '@babel/standalone' import fetch from 'node-fetch' import { VM } from 'vm2' +import { PluginConfig, PluginConfigVMReponse, PluginsServer } from '../types' import { createCache } from './extensions/cache' import { createConsole } from './extensions/console' import { createGoogle } from './extensions/google' import { createPosthog } from './extensions/posthog' import { createStorage } from './extensions/storage' -import { PluginConfig, PluginConfigVMReponse, PluginsServer } from './types' export async function createPluginConfigVM( server: PluginsServer, From 6b832121dab4bbdcb47cbcff54129fa284f63d66 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 13:25:53 +0100 Subject: [PATCH 03/51] add TASK_TIMEOUT config --- src/config.ts | 2 ++ src/types.ts | 1 + src/vm/vm.ts | 1 + 3 files changed, 4 insertions(+) diff --git a/src/config.ts b/src/config.ts index f440a052..48fc1728 100644 --- a/src/config.ts +++ b/src/config.ts @@ -34,6 +34,7 @@ export function getDefaultConfig(): PluginsServerConfig { WEB_PORT: 3008, WEB_HOSTNAME: '0.0.0.0', WORKER_CONCURRENCY: coreCount, + TASK_TIMEOUT: 30, TASKS_PER_WORKER: 10, LOG_LEVEL: LogLevel.Info, SENTRY_DSN: null, @@ -63,6 +64,7 @@ export function getConfigHelp(): Record { WEB_PORT: 'port for web server to listen on', WEB_HOSTNAME: 'hostname for web server to listen on', WORKER_CONCURRENCY: 'number of concurrent worker threads', + TASK_TIMEOUT: 'How many seconds until tasks are timed out', TASKS_PER_WORKER: 'number of parallel tasks per worker thread', LOG_LEVEL: 'minimum log level', KAFKA_ENABLED: 'use Kafka instead of Celery to ingest events', diff --git a/src/types.ts b/src/types.ts index c29c46f7..8f57110d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -22,6 +22,7 @@ export enum LogLevel { export interface PluginsServerConfig extends Record { WORKER_CONCURRENCY: number TASKS_PER_WORKER: number + TASK_TIMEOUT: number CELERY_DEFAULT_QUEUE: string DATABASE_URL: string CLICKHOUSE_HOST: string diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 35f90609..425bc21f 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -16,6 +16,7 @@ export async function createPluginConfigVM( libJs = '' ): Promise { const vm = new VM({ + timeout: server.TASK_TIMEOUT * 1000, sandbox: {}, }) From 1e862a485514663cc040b6645bdf53c992aa2280 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 13:26:10 +0100 Subject: [PATCH 04/51] add babel transform to timeout while loops --- src/vm/transforms/while.ts | 35 +++++++++++++++++++++++++++++++++++ src/vm/vm.ts | 2 ++ 2 files changed, 37 insertions(+) create mode 100644 src/vm/transforms/while.ts diff --git a/src/vm/transforms/while.ts b/src/vm/transforms/while.ts new file mode 100644 index 00000000..4f7f7268 --- /dev/null +++ b/src/vm/transforms/while.ts @@ -0,0 +1,35 @@ +// https://medium.com/@bvjebin/js-infinite-loops-killing-em-e1c2f5f2db7f + +export const whileLoopTimeout = (timeoutSeconds: number) => (babel: any) => { + const t = babel.types + return { + visitor: { + WhileStatement: function transformWhile(path: any) { + const variableName = path.scope.generateUidIdentifier('timer') + const declaration = t.declareVariable(variableName) + path.scope.parent.push(declaration) + const definition = t.assignmentExpression( + '=', + variableName, + t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), []) + ) + path.insertBefore(t.expressionStatement(definition)) + const lhs = t.parenthesizedExpression( + t.binaryExpression('+', variableName, t.NumericLiteral(Math.round(timeoutSeconds * 1000))) + ) + path.get('body').pushContainer( + 'body', + t.ifStatement( + t.binaryExpression( + '>', + t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), []), + lhs + ), + t.throwStatement(t.stringLiteral('While Loop Timeout')), + null + ) + ) + }, + }, + } +} diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 425bc21f..d5f910c3 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -8,6 +8,7 @@ import { createConsole } from './extensions/console' import { createGoogle } from './extensions/google' import { createPosthog } from './extensions/posthog' import { createStorage } from './extensions/storage' +import { whileLoopTimeout } from './transforms/while' export async function createPluginConfigVM( server: PluginsServer, @@ -30,6 +31,7 @@ export async function createPluginConfigVM( babelrc: false, configFile: false, presets: [['env', { targets: { node: process.versions.node } }]], + plugins: [whileLoopTimeout(server.TASK_TIMEOUT)], }) // our own stuff From 7e7d3dcd75c09f3ac1b5162ef495e1ace282c592 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 13:26:32 +0100 Subject: [PATCH 05/51] update refactored rename --- tests/postgres/vm.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/postgres/vm.test.ts b/tests/postgres/vm.test.ts index 29654234..ffff6d1a 100644 --- a/tests/postgres/vm.test.ts +++ b/tests/postgres/vm.test.ts @@ -5,7 +5,7 @@ import Client from '../../src/celery/client' import { createServer } from '../../src/server' import { PluginsServer } from '../../src/types' import { delay } from '../../src/utils' -import { createPluginConfigVM } from '../../src/vm' +import { createPluginConfigVM } from '../../src/vm/vm' import { pluginConfig39 } from '../helpers/plugins' import { resetTestDatabase } from '../helpers/sql' From b390a4ef420dad2ce3055907ad231e5553d602d8 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 13:38:13 +0100 Subject: [PATCH 06/51] vm while timeout test --- src/vm/transforms/while.ts | 4 ++- tests/postgres/vm.timeout.test.ts | 51 +++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 tests/postgres/vm.timeout.test.ts diff --git a/src/vm/transforms/while.ts b/src/vm/transforms/while.ts index 4f7f7268..affb72a1 100644 --- a/src/vm/transforms/while.ts +++ b/src/vm/transforms/while.ts @@ -25,7 +25,9 @@ export const whileLoopTimeout = (timeoutSeconds: number) => (babel: any) => { t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), []), lhs ), - t.throwStatement(t.stringLiteral('While Loop Timeout')), + t.throwStatement( + t.NewExpression(t.identifier('Error'), [t.stringLiteral('While Loop Timeout')]) + ), null ) ) diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts new file mode 100644 index 00000000..6b68a0e5 --- /dev/null +++ b/tests/postgres/vm.timeout.test.ts @@ -0,0 +1,51 @@ +import { createServer } from '../../src/server' +import { PluginsServer } from '../../src/types' +import { createPluginConfigVM } from '../../src/vm/vm' +import { pluginConfig39 } from '../helpers/plugins' +import { resetTestDatabase } from '../helpers/sql' + +const defaultEvent = { + distinct_id: 'my_id', + ip: '127.0.0.1', + site_url: 'http://localhost', + team_id: 3, + now: new Date().toISOString(), + event: 'default event', + properties: {}, +} + +describe('vm timeout tests', () => { + let server: PluginsServer + let stopServer: () => Promise + + beforeEach(async () => { + ;[server, stopServer] = await createServer({ + TASK_TIMEOUT: 1, + }) + }) + + afterEach(async () => { + await stopServer() + }) + + test('while loop', async () => { + const indexJs = ` + async function processEvent (event, meta) { + while(1){} + // Promise.resolve().then(() => { while(1); console.log("foo", Date.now()); }) + // await new Promise(resolve => __jestSetTimeout(() => resolve(), 40000)) + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('While Loop Timeout') + }) +}) From 9b1fa9ea27fe12c1f4ef178fa8b4f310f033a393 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 13:45:33 +0100 Subject: [PATCH 07/51] add more unimplemented timeout tests --- tests/postgres/vm.timeout.test.ts | 116 +++++++++++++++++++++++++++++- 1 file changed, 114 insertions(+), 2 deletions(-) diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index 6b68a0e5..716e4aa4 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -32,8 +32,120 @@ describe('vm timeout tests', () => { const indexJs = ` async function processEvent (event, meta) { while(1){} - // Promise.resolve().then(() => { while(1); console.log("foo", Date.now()); }) - // await new Promise(resolve => __jestSetTimeout(() => resolve(), 40000)) + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('While Loop Timeout') + }) + + test('while loop in promise', async () => { + const indexJs = ` + async function processEvent (event, meta) { + await Promise.resolve().then(() => { while(1) {}; }) + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('While Loop Timeout') + }) + + test.skip('do..while loop', async () => { + const indexJs = ` + async function processEvent (event, meta) { + do {} while (true); + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('While Loop Timeout') + }) + + test.skip('do..while loop in promise', async () => { + const indexJs = ` + async function processEvent (event, meta) { + await Promise.resolve().then(() => { do {} while (true); }) + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('While Loop Timeout') + }) + + test.skip('for loop', async () => { + const indexJs = ` + async function processEvent (event, meta) { + for(let i = 0; i > 1; i--){} + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('While Loop Timeout') + }) + + test.skip('for loop in promise', async () => { + const indexJs = ` + async function processEvent (event, meta) { + await Promise.resolve().then(() => { for(let i = 0; i > 1; i--){}; }) + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('While Loop Timeout') + }) + + test.skip('long promise', async () => { + const indexJs = ` + async function processEvent (event, meta) { + await new Promise(resolve => __jestSetTimeout(() => resolve(), 40000)) event.properties.processed = 'yup' return event } From c7cfd4596813041839330e6109163bc914d8441f Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 14:17:09 +0100 Subject: [PATCH 08/51] add transforms for while/for/do-while loops --- src/vm/transforms/loop-timeout.ts | 65 +++++++++++++++++++++++ src/vm/transforms/while.ts | 37 ------------- src/vm/vm.ts | 4 +- tests/postgres/vm.timeout.test.ts | 88 ++++++++++++++++++++++++++----- 4 files changed, 141 insertions(+), 53 deletions(-) create mode 100644 src/vm/transforms/loop-timeout.ts delete mode 100644 src/vm/transforms/while.ts diff --git a/src/vm/transforms/loop-timeout.ts b/src/vm/transforms/loop-timeout.ts new file mode 100644 index 00000000..c66716d2 --- /dev/null +++ b/src/vm/transforms/loop-timeout.ts @@ -0,0 +1,65 @@ +// Inspiration: +// https://medium.com/@bvjebin/js-infinite-loops-killing-em-e1c2f5f2db7f +// https://github.com/jsbin/loop-protect/blob/master/lib/index.js + +import { PluginsServer } from '../../types' + +const generateBefore = (t: any, id: any) => + t.variableDeclaration('var', [ + t.variableDeclarator(id, t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), [])), + ]) + +const generateInside = ({ t, id, line, ch, timeout }: any = {}) => { + return t.ifStatement( + t.binaryExpression( + '>', + t.binaryExpression( + '-', + t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), []), + id + ), + t.numericLiteral(timeout) + ), + t.throwStatement( + t.NewExpression(t.identifier('Error'), [t.stringLiteral(`${timeout} second loop timeout on line ${line}`)]) + ) + ) +} + +const protect = (t: any, timeout: number) => (path: any) => { + if (!path.node.loc) { + // I don't really know _how_ we get into this state + // but https://jsbin.com/mipesawapi/1/ triggers it + // and the node, I'm guessing after translation, + // doesn't have a line in the code, so this blows up. + return + } + const id = path.scope.generateUidIdentifier('LP') + const before = generateBefore(t, id) + const inside = generateInside({ + t, + id, + line: path.node.loc.start.line, + ch: path.node.loc.start.column, + timeout, + }) + const body = path.get('body') + + // if we have an expression statement, convert it to a block + if (!t.isBlockStatement(body)) { + body.replaceWith(t.blockStatement([body.node])) + } + path.insertBefore(before) + body.unshiftContainer('body', inside) +} + +export const loopTimeout = (server: PluginsServer) => (babel: any) => { + const t = babel.types + return { + visitor: { + WhileStatement: protect(t, server.TASK_TIMEOUT), + ForStatement: protect(t, server.TASK_TIMEOUT), + DoWhileStatement: protect(t, server.TASK_TIMEOUT), + }, + } +} diff --git a/src/vm/transforms/while.ts b/src/vm/transforms/while.ts deleted file mode 100644 index affb72a1..00000000 --- a/src/vm/transforms/while.ts +++ /dev/null @@ -1,37 +0,0 @@ -// https://medium.com/@bvjebin/js-infinite-loops-killing-em-e1c2f5f2db7f - -export const whileLoopTimeout = (timeoutSeconds: number) => (babel: any) => { - const t = babel.types - return { - visitor: { - WhileStatement: function transformWhile(path: any) { - const variableName = path.scope.generateUidIdentifier('timer') - const declaration = t.declareVariable(variableName) - path.scope.parent.push(declaration) - const definition = t.assignmentExpression( - '=', - variableName, - t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), []) - ) - path.insertBefore(t.expressionStatement(definition)) - const lhs = t.parenthesizedExpression( - t.binaryExpression('+', variableName, t.NumericLiteral(Math.round(timeoutSeconds * 1000))) - ) - path.get('body').pushContainer( - 'body', - t.ifStatement( - t.binaryExpression( - '>', - t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), []), - lhs - ), - t.throwStatement( - t.NewExpression(t.identifier('Error'), [t.stringLiteral('While Loop Timeout')]) - ), - null - ) - ) - }, - }, - } -} diff --git a/src/vm/vm.ts b/src/vm/vm.ts index d5f910c3..17c797c4 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -8,7 +8,7 @@ import { createConsole } from './extensions/console' import { createGoogle } from './extensions/google' import { createPosthog } from './extensions/posthog' import { createStorage } from './extensions/storage' -import { whileLoopTimeout } from './transforms/while' +import { loopTimeout } from './transforms/loop-timeout' export async function createPluginConfigVM( server: PluginsServer, @@ -31,7 +31,7 @@ export async function createPluginConfigVM( babelrc: false, configFile: false, presets: [['env', { targets: { node: process.versions.node } }]], - plugins: [whileLoopTimeout(server.TASK_TIMEOUT)], + plugins: [loopTimeout(server)], }) // our own stuff diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index 716e4aa4..9ecd0950 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -31,7 +31,7 @@ describe('vm timeout tests', () => { test('while loop', async () => { const indexJs = ` async function processEvent (event, meta) { - while(1){} + while(1) {} event.properties.processed = 'yup' return event } @@ -44,7 +44,27 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('While Loop Timeout') + expect(errorMessage!).toEqual('1 second loop timeout on line 3') + }) + + test('while loop no body', async () => { + const indexJs = ` + async function processEvent (event, meta) { + let i = 0 + while(1) i++; + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('1 second loop timeout on line 4') }) test('while loop in promise', async () => { @@ -63,10 +83,10 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('While Loop Timeout') + expect(errorMessage!).toEqual('1 second loop timeout on line 3') }) - test.skip('do..while loop', async () => { + test('do..while loop', async () => { const indexJs = ` async function processEvent (event, meta) { do {} while (true); @@ -82,10 +102,30 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('While Loop Timeout') + expect(errorMessage!).toEqual('1 second loop timeout on line 3') + }) + + test('do..while loop no body', async () => { + const indexJs = ` + async function processEvent (event, meta) { + let i = 0; + do i++; while (true); + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('1 second loop timeout on line 4') }) - test.skip('do..while loop in promise', async () => { + test('do..while loop in promise', async () => { const indexJs = ` async function processEvent (event, meta) { await Promise.resolve().then(() => { do {} while (true); }) @@ -101,13 +141,33 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('While Loop Timeout') + expect(errorMessage!).toEqual('1 second loop timeout on line 3') + }) + + test('for loop', async () => { + const indexJs = ` + async function processEvent (event, meta) { + for(let i = 0; i < 1; i--) {} + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('1 second loop timeout on line 3') }) - test.skip('for loop', async () => { + test('for loop no body', async () => { const indexJs = ` async function processEvent (event, meta) { - for(let i = 0; i > 1; i--){} + let a = 0 + for(let i = 0; i < 1; i--) a++ event.properties.processed = 'yup' return event } @@ -120,13 +180,13 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('While Loop Timeout') + expect(errorMessage!).toEqual('1 second loop timeout on line 4') }) - test.skip('for loop in promise', async () => { + test('for loop in promise', async () => { const indexJs = ` async function processEvent (event, meta) { - await Promise.resolve().then(() => { for(let i = 0; i > 1; i--){}; }) + await Promise.resolve().then(() => { for(let i = 0; i < 1; i--) {}; }) event.properties.processed = 'yup' return event } @@ -139,7 +199,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('While Loop Timeout') + expect(errorMessage!).toEqual('1 second loop timeout on line 3') }) test.skip('long promise', async () => { @@ -158,6 +218,6 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('While Loop Timeout') + expect(errorMessage!).toEqual('Long Promise Timeout') }) }) From 4e45dc0b2abd620289c0f49ae28ee5405e58b836 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 14:20:51 +0100 Subject: [PATCH 09/51] fix loop timeout ms --- src/vm/transforms/loop-timeout.ts | 2 +- tests/postgres/vm.timeout.test.ts | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/vm/transforms/loop-timeout.ts b/src/vm/transforms/loop-timeout.ts index c66716d2..82f34da3 100644 --- a/src/vm/transforms/loop-timeout.ts +++ b/src/vm/transforms/loop-timeout.ts @@ -18,7 +18,7 @@ const generateInside = ({ t, id, line, ch, timeout }: any = {}) => { t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), []), id ), - t.numericLiteral(timeout) + t.numericLiteral(timeout * 1000) ), t.throwStatement( t.NewExpression(t.identifier('Error'), [t.stringLiteral(`${timeout} second loop timeout on line ${line}`)]) diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index 9ecd0950..ae264b6c 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -38,12 +38,14 @@ describe('vm timeout tests', () => { ` await resetTestDatabase(indexJs) const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + const date = new Date() let errorMessage = undefined try { await vm.methods.processEvent({ ...defaultEvent }) } catch (e) { errorMessage = e.message } + expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) expect(errorMessage!).toEqual('1 second loop timeout on line 3') }) From 1f3e0a018da0cbebfea6f7d5953359c3593c3a92 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 14:34:56 +0100 Subject: [PATCH 10/51] remove extra ; --- src/vm/vm.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 17c797c4..067bebba 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -21,7 +21,8 @@ export async function createPluginConfigVM( sandbox: {}, }) - const { code } = transform(`${libJs};${indexJs}`, { + const source = libJs ? `${libJs};${indexJs}` : indexJs + const { code } = transform(source, { envName: 'production', filename: undefined, cwd: undefined, From e664b20f12ba1dee2598cc12a45d011eb6068bc8 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 15:02:47 +0100 Subject: [PATCH 11/51] fix import --- src/plugins.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins.ts b/src/plugins.ts index 9697dcb1..3e9e2f21 100644 --- a/src/plugins.ts +++ b/src/plugins.ts @@ -7,7 +7,7 @@ import { getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from './s import { status } from './status' import { PluginConfig, PluginJsonConfig, PluginsServer, PluginTask, TeamId } from './types' import { getFileFromArchive } from './utils' -import { createPluginConfigVM } from './vm' +import { createPluginConfigVM } from './vm/vm' export async function setupPlugins(server: PluginsServer): Promise { const pluginRows = await getPluginRows(server) From 604ec7a938968df490d0d36c62732eee849fdef2 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 15:14:33 +0100 Subject: [PATCH 12/51] fix more imports --- tests/clickhouse/e2e.test.ts | 2 +- tests/clickhouse/postgres-parity.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/clickhouse/e2e.test.ts b/tests/clickhouse/e2e.test.ts index 881f0429..9689f5c6 100644 --- a/tests/clickhouse/e2e.test.ts +++ b/tests/clickhouse/e2e.test.ts @@ -1,9 +1,9 @@ -import { createPosthog, DummyPostHog } from '../../src/extensions/posthog' import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' import { startPluginsServer } from '../../src/server' import { LogLevel, PluginsServerConfig } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { resetTestDatabaseClickhouse } from '../helpers/clickhouse' import { resetKafka } from '../helpers/kafka' diff --git a/tests/clickhouse/postgres-parity.test.ts b/tests/clickhouse/postgres-parity.test.ts index 71bbd5ac..a0180228 100644 --- a/tests/clickhouse/postgres-parity.test.ts +++ b/tests/clickhouse/postgres-parity.test.ts @@ -1,9 +1,9 @@ import { DateTime } from 'luxon' -import { createPosthog, DummyPostHog } from '../../src/extensions/posthog' import { startPluginsServer } from '../../src/server' import { Database, LogLevel, PluginsServer, PluginsServerConfig, Team, TimestampFormat } from '../../src/types' import { castTimestampOrNow, UUIDT } from '../../src/utils' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { resetTestDatabaseClickhouse } from '../helpers/clickhouse' import { resetKafka } from '../helpers/kafka' From 8d9aa4bb8a524110eaaf91dead9116c210d59f64 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 15:58:15 +0100 Subject: [PATCH 13/51] fix even more imports --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 2 +- benchmarks/postgres/e2e.celery.benchmark.ts | 2 +- tests/helpers/clickhouse.ts | 2 +- tests/postgres/e2e.test.ts | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index e81f3961..5b4ac98d 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -1,11 +1,11 @@ import { performance } from 'perf_hooks' -import { createPosthog, DummyPostHog } from '../../src/extensions/posthog' import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' import { startPluginsServer } from '../../src/server' import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse' import { resetKafka } from '../../tests/helpers/kafka' diff --git a/benchmarks/postgres/e2e.celery.benchmark.ts b/benchmarks/postgres/e2e.celery.benchmark.ts index 4acdd1ed..5f6f4fe7 100644 --- a/benchmarks/postgres/e2e.celery.benchmark.ts +++ b/benchmarks/postgres/e2e.celery.benchmark.ts @@ -1,10 +1,10 @@ import { performance } from 'perf_hooks' -import { createPosthog, DummyPostHog } from '../../src/extensions/posthog' import { startPluginsServer } from '../../src/server' import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../../tests/helpers/plugins' import { resetTestDatabase } from '../../tests/helpers/sql' diff --git a/tests/helpers/clickhouse.ts b/tests/helpers/clickhouse.ts index 33af10a8..abc6d552 100644 --- a/tests/helpers/clickhouse.ts +++ b/tests/helpers/clickhouse.ts @@ -1,4 +1,4 @@ -import ClickHouse from '@posthog/clickhouse' +import * as ClickHouse from '@posthog/clickhouse' import { defaultConfig } from '../../src/config' import { PluginsServerConfig } from '../../src/types' diff --git a/tests/postgres/e2e.test.ts b/tests/postgres/e2e.test.ts index bc6ee8f9..0e567b92 100644 --- a/tests/postgres/e2e.test.ts +++ b/tests/postgres/e2e.test.ts @@ -1,8 +1,8 @@ -import { createPosthog, DummyPostHog } from '../../src/extensions/posthog' import { startPluginsServer } from '../../src/server' import { LogLevel } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' import { resetTestDatabase } from '../helpers/sql' From 77c0818f7d5e94983f7a51c027573c3c56e6ab93 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 16:00:43 +0100 Subject: [PATCH 14/51] fix error messages --- src/vm/transforms/loop-timeout.ts | 8 +++++- tests/postgres/vm.timeout.test.ts | 43 ++++++++++++++++++++++++------- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/src/vm/transforms/loop-timeout.ts b/src/vm/transforms/loop-timeout.ts index 82f34da3..0caa8da0 100644 --- a/src/vm/transforms/loop-timeout.ts +++ b/src/vm/transforms/loop-timeout.ts @@ -21,7 +21,13 @@ const generateInside = ({ t, id, line, ch, timeout }: any = {}) => { t.numericLiteral(timeout * 1000) ), t.throwStatement( - t.NewExpression(t.identifier('Error'), [t.stringLiteral(`${timeout} second loop timeout on line ${line}`)]) + t.NewExpression(t.identifier('Error'), [ + t.stringLiteral( + `Script execution timed out after looping for ${timeout} second${ + timeout === 1 ? '' : 's' + } on line ${line}` + ), + ]) ) ) } diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index ae264b6c..7c992b0b 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -46,7 +46,7 @@ describe('vm timeout tests', () => { errorMessage = e.message } expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) - expect(errorMessage!).toEqual('1 second loop timeout on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') }) test('while loop no body', async () => { @@ -66,7 +66,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 4') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4') }) test('while loop in promise', async () => { @@ -85,7 +85,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') }) test('do..while loop', async () => { @@ -104,7 +104,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') }) test('do..while loop no body', async () => { @@ -124,7 +124,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 4') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4') }) test('do..while loop in promise', async () => { @@ -143,7 +143,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') }) test('for loop', async () => { @@ -162,7 +162,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') }) test('for loop no body', async () => { @@ -182,7 +182,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 4') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4') }) test('for loop in promise', async () => { @@ -201,13 +201,36 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('1 second loop timeout on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') + }) + + test.skip('small promises', async () => { + const indexJs = ` + async function processEvent (event, meta) { + await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) + + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(errorMessage!).toEqual('Long Promise Timeout') }) test.skip('long promise', async () => { const indexJs = ` async function processEvent (event, meta) { - await new Promise(resolve => __jestSetTimeout(() => resolve(), 40000)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 4000)) event.properties.processed = 'yup' return event } From 723363dc0e41f5544c8a7adbbe4ef3f58f73cb4b Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 16:08:45 +0100 Subject: [PATCH 15/51] simplify errors --- tests/plugins.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/plugins.test.ts b/tests/plugins.test.ts index 6b3caf7e..28108dfe 100644 --- a/tests/plugins.test.ts +++ b/tests/plugins.test.ts @@ -142,7 +142,7 @@ test('archive plugin with broken index.js does not do much', async () => { expect(setError).toHaveBeenCalled() expect(setError.mock.calls[0][0]).toEqual(mockServer) const error = setError.mock.calls[0][1]! - expect(error.message).toContain('unknown: Unexpected token, expected "," (3:8)') + expect(error.message).toContain('unknown: Unexpected token, expected ","') expect(error.name).toEqual('SyntaxError') expect(error.stack).toContain('SyntaxError: unknown') expect(error.time).toBeDefined() @@ -170,7 +170,7 @@ test('local plugin with broken index.js does not do much', async () => { expect(setError).toHaveBeenCalled() expect(setError.mock.calls[0][0]).toEqual(mockServer) const error = setError.mock.calls[0][1]! - expect(error.message).toContain('unknown: Unexpected token, expected "," (1:26)') + expect(error.message).toContain('unknown: Unexpected token, expected ","') expect(error.name).toEqual('SyntaxError') expect(error.stack).toContain('SyntaxError: unknown') expect(error.time).toBeDefined() From ff1ab9d0cad7e368ac32770830f2f8cc3865d844 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 16:09:13 +0100 Subject: [PATCH 16/51] small refactor --- src/vm/vm.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 067bebba..1abc142d 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -16,11 +16,6 @@ export async function createPluginConfigVM( indexJs: string, libJs = '' ): Promise { - const vm = new VM({ - timeout: server.TASK_TIMEOUT * 1000, - sandbox: {}, - }) - const source = libJs ? `${libJs};${indexJs}` : indexJs const { code } = transform(source, { envName: 'production', @@ -35,6 +30,12 @@ export async function createPluginConfigVM( plugins: [loopTimeout(server)], }) + // create virtual machine + const vm = new VM({ + timeout: server.TASK_TIMEOUT * 1000 + 1, + sandbox: {}, + }) + // our own stuff vm.freeze(createConsole(), 'console') vm.freeze(createPosthog(server, pluginConfig), 'posthog') From bae7ccee7359dd2dde1755579a3e2fba7ae88a4d Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 16:23:34 +0100 Subject: [PATCH 17/51] fix import --- tests/helpers/clickhouse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/helpers/clickhouse.ts b/tests/helpers/clickhouse.ts index abc6d552..33af10a8 100644 --- a/tests/helpers/clickhouse.ts +++ b/tests/helpers/clickhouse.ts @@ -1,4 +1,4 @@ -import * as ClickHouse from '@posthog/clickhouse' +import ClickHouse from '@posthog/clickhouse' import { defaultConfig } from '../../src/config' import { PluginsServerConfig } from '../../src/types' From 5ee944c0faa1e5e75dffb397e2bc2044a8846615 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 15 Feb 2021 18:06:56 +0100 Subject: [PATCH 18/51] add async guard and protect against long promises --- src/vm/transforms/promise-timeout.ts | 45 ++++++++++++++++++++++++++++ src/vm/vm.ts | 22 ++++++++++++-- tests/postgres/vm.timeout.test.ts | 23 +++++++++----- 3 files changed, 79 insertions(+), 11 deletions(-) create mode 100644 src/vm/transforms/promise-timeout.ts diff --git a/src/vm/transforms/promise-timeout.ts b/src/vm/transforms/promise-timeout.ts new file mode 100644 index 00000000..586fe7f9 --- /dev/null +++ b/src/vm/transforms/promise-timeout.ts @@ -0,0 +1,45 @@ +// inspired by: https://github.com/treywood/babel-plugin-bluebird-async-functions/ +import { PluginsServer } from '../../types' + +const REPLACED = Symbol() + +export const promiseTimeout = (server: PluginsServer) => (babel: any) => { + const t = babel.types + return { + visitor: { + // changes: bla.then --> __asyncGuard(bla).then + MemberExpression: { + exit(path: any) { + const { node } = path + if ( + node.property && + t.isIdentifier(node.property) && + node.property.name === 'then' && + !node[REPLACED] + ) { + const newCall = t.memberExpression( + t.callExpression(t.identifier('__asyncGuard'), [node.object]), + t.identifier('then') + ) + newCall[REPLACED] = true + path.replaceWith(newCall) + } + }, + }, + + // changes: await bla --> await __asyncGuard(bla) + AwaitExpression: { + exit(path: any) { + const { node } = path + if (node && !node[REPLACED]) { + const newAwait = t.awaitExpression( + t.callExpression(t.identifier('__asyncGuard'), [node.argument]) + ) + newAwait[REPLACED] = true + path.replaceWith(newAwait) + } + }, + }, + }, + } +} diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 1abc142d..62d6cc08 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -9,6 +9,7 @@ import { createGoogle } from './extensions/google' import { createPosthog } from './extensions/posthog' import { createStorage } from './extensions/storage' import { loopTimeout } from './transforms/loop-timeout' +import { promiseTimeout } from './transforms/promise-timeout' export async function createPluginConfigVM( server: PluginsServer, @@ -27,7 +28,7 @@ export async function createPluginConfigVM( babelrc: false, configFile: false, presets: [['env', { targets: { node: process.versions.node } }]], - plugins: [loopTimeout(server)], + plugins: [loopTimeout(server), promiseTimeout(server)], }) // create virtual machine @@ -47,6 +48,19 @@ export async function createPluginConfigVM( if (process.env.NODE_ENV === 'test') { vm.freeze(setTimeout, '__jestSetTimeout') } + + vm.freeze(async (promise: () => Promise) => { + const timeout = server.TASK_TIMEOUT + const message = `Script execution timed out after promise waited for ${timeout} second${ + timeout === 1 ? '' : 's' + }` + const response = await Promise.race([ + promise, + new Promise((resolve, reject) => setTimeout(() => reject(new Error(message)), timeout * 1000)), + ]) + return response + }, '__asyncGuard') + vm.freeze( { cache: createCache( @@ -60,6 +74,7 @@ export async function createPluginConfigVM( }, '__pluginHostMeta' ) + vm.run( ` // two ways packages could export themselves (plus "global") @@ -69,6 +84,7 @@ export async function createPluginConfigVM( // helpers to get globals const __getExportDestinations = () => [exports, module.exports, global] const __getExported = (key) => __getExportDestinations().find(a => a[key])?.[key]; + const __asyncFunctionGuard = (func) => (...args) => __asyncGuard(func(...args)) // the plugin JS code ${code}; @@ -111,8 +127,8 @@ export async function createPluginConfigVM( // export various functions const __methods = { - processEvent: __bindMeta('processEvent'), - processEventBatch: __bindMeta('processEventBatch') + processEvent: __asyncFunctionGuard(__bindMeta('processEvent')), + processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch')) }; // gather the runEveryX commands and export in __tasks diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index 7c992b0b..68e7bc8f 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -204,13 +204,17 @@ describe('vm timeout tests', () => { expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') }) - test.skip('small promises', async () => { + test('small promises', async () => { const indexJs = ` async function processEvent (event, meta) { - await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) - await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) - await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) - await new Promise(resolve => __jestSetTimeout(() => resolve(), 1000)) + const data = await fetch('https://www.example.com').then(response => response.json()).then(data => { + return data + }) + + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) event.properties.processed = 'yup' return event @@ -218,16 +222,19 @@ describe('vm timeout tests', () => { ` await resetTestDatabase(indexJs) const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + const date = new Date() let errorMessage = undefined try { await vm.methods.processEvent({ ...defaultEvent }) } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Long Promise Timeout') + expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) + expect(new Date().valueOf() - date.valueOf()).toBeLessThan(4000) + expect(errorMessage!).toEqual('Script execution timed out after promise waited for 1 second') }) - test.skip('long promise', async () => { + test('long promise', async () => { const indexJs = ` async function processEvent (event, meta) { await new Promise(resolve => __jestSetTimeout(() => resolve(), 4000)) @@ -243,6 +250,6 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Long Promise Timeout') + expect(errorMessage!).toEqual('Script execution timed out after promise waited for 1 second') }) }) From 3d0585c1657ff4807fea09053f7b67139c15c829 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 15:41:46 +0100 Subject: [PATCH 19/51] async guard around setupPlugin() --- src/vm/vm.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 62d6cc08..b9d6f2cf 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -81,13 +81,13 @@ export async function createPluginConfigVM( const module = { exports: {} }; let exports = {}; + // the plugin JS code + ${code}; + // helpers to get globals const __getExportDestinations = () => [exports, module.exports, global] const __getExported = (key) => __getExportDestinations().find(a => a[key])?.[key]; - const __asyncFunctionGuard = (func) => (...args) => __asyncGuard(func(...args)) - - // the plugin JS code - ${code}; + const __asyncFunctionGuard = (func) => func ? (...args) => __asyncGuard(func(...args)) : func // inject the meta object + shareable 'global' to the end of each exported function const __pluginMeta = { @@ -146,11 +146,11 @@ export async function createPluginConfigVM( } // run the plugin setup script, if present - const __setupPlugin = async () => __callWithMeta('setupPlugin'); + const __setupPlugin = __asyncFunctionGuard(async () => __callWithMeta('setupPlugin')); ` ) - await vm.run('__setupPlugin')() + await vm.run('__setupPlugin()') return { vm, From af1426c0d4f2b62b03d005b458548b27382f42f3 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 16:06:47 +0100 Subject: [PATCH 20/51] add column --- src/vm/transforms/loop-timeout.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vm/transforms/loop-timeout.ts b/src/vm/transforms/loop-timeout.ts index 0caa8da0..5b17e572 100644 --- a/src/vm/transforms/loop-timeout.ts +++ b/src/vm/transforms/loop-timeout.ts @@ -25,7 +25,7 @@ const generateInside = ({ t, id, line, ch, timeout }: any = {}) => { t.stringLiteral( `Script execution timed out after looping for ${timeout} second${ timeout === 1 ? '' : 's' - } on line ${line}` + } on line ${line}:${ch}` ), ]) ) From e6a18498344d444a55552bb58c2c74e1ed3934f0 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 16:07:09 +0100 Subject: [PATCH 21/51] slightly safer variables --- src/vm/vm.ts | 153 ++++++++++++++++++++++++++++----------------------- 1 file changed, 84 insertions(+), 69 deletions(-) diff --git a/src/vm/vm.ts b/src/vm/vm.ts index b9d6f2cf..527d782e 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -1,4 +1,5 @@ import { transform } from '@babel/standalone' +import { randomBytes } from 'crypto' import fetch from 'node-fetch' import { VM } from 'vm2' @@ -51,12 +52,16 @@ export async function createPluginConfigVM( vm.freeze(async (promise: () => Promise) => { const timeout = server.TASK_TIMEOUT - const message = `Script execution timed out after promise waited for ${timeout} second${ - timeout === 1 ? '' : 's' - }` const response = await Promise.race([ promise, - new Promise((resolve, reject) => setTimeout(() => reject(new Error(message)), timeout * 1000)), + new Promise((resolve, reject) => + setTimeout(() => { + const message = `Script execution timed out after promise waited for ${timeout} second${ + timeout === 1 ? '' : 's' + }` + reject(new Error(message)) + }, timeout * 1000) + ), ]) return response }, '__asyncGuard') @@ -75,86 +80,96 @@ export async function createPluginConfigVM( '__pluginHostMeta' ) - vm.run( - ` + vm.run(` // two ways packages could export themselves (plus "global") const module = { exports: {} }; let exports = {}; // the plugin JS code ${code}; - - // helpers to get globals - const __getExportDestinations = () => [exports, module.exports, global] - const __getExported = (key) => __getExportDestinations().find(a => a[key])?.[key]; - const __asyncFunctionGuard = (func) => func ? (...args) => __asyncGuard(func(...args)) : func - - // inject the meta object + shareable 'global' to the end of each exported function - const __pluginMeta = { - ...__pluginHostMeta, - global: {} - }; - function __bindMeta (keyOrFunc) { - const func = typeof keyOrFunc === 'function' ? keyOrFunc : __getExported(keyOrFunc); - if (func) return (...args) => func(...args, __pluginMeta); - } - function __callWithMeta (keyOrFunc, ...args) { - const func = __bindMeta(keyOrFunc); - if (func) return func(...args); - } - - // we have processEvent, but not processEventBatch - if (!__getExported('processEventBatch') && __getExported('processEvent')) { - exports.processEventBatch = async function processEventBatch (batch, meta) { - const processEvent = __getExported('processEvent'); - let waitFor = false - const processedEvents = batch.map(event => { - const e = processEvent(event, meta) - if (e && typeof e.then !== 'undefined') { - waitFor = true - } - return e - }) - const response = waitFor ? (await Promise.all(processedEvents)) : processedEvents; - return response.filter(r => r) + `) + + const responseVar = `__pluginDetails${randomBytes(64).toString('hex')}` + + vm.run(` + const ${responseVar} = (() => { + // helpers to get globals + const __getExportDestinations = () => [exports, module.exports, global] + const __getExported = (key) => __getExportDestinations().find(a => a[key])?.[key]; + const __asyncFunctionGuard = (func) => func ? (...args) => __asyncGuard(func(...args)) : func + + // inject the meta object + shareable 'global' to the end of each exported function + const __pluginMeta = { + ...__pluginHostMeta, + global: {} + }; + function __bindMeta (keyOrFunc) { + const func = typeof keyOrFunc === 'function' ? keyOrFunc : __getExported(keyOrFunc); + if (func) return (...args) => func(...args, __pluginMeta); } - // we have processEventBatch, but not processEvent - } else if (!__getExported('processEvent') && __getExported('processEventBatch')) { - exports.processEvent = async function processEvent (event, meta) { - return (await (__getExported('processEventBatch'))([event], meta))?.[0] + function __callWithMeta (keyOrFunc, ...args) { + const func = __bindMeta(keyOrFunc); + if (func) return func(...args); } - } - - // export various functions - const __methods = { - processEvent: __asyncFunctionGuard(__bindMeta('processEvent')), - processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch')) - }; - - // gather the runEveryX commands and export in __tasks - const __tasks = {}; - for (const exportDestination of __getExportDestinations().reverse()) { - for (const [name, value] of Object.entries(exportDestination)) { - if (name.startsWith("runEvery") && typeof value === 'function') { - __tasks[name] = { - name: name, - type: 'runEvery', - exec: __bindMeta(value) + + // we have processEvent, but not processEventBatch + if (!__getExported('processEventBatch') && __getExported('processEvent')) { + exports.processEventBatch = async function processEventBatch (batch, meta) { + const processEvent = __getExported('processEvent'); + let waitFor = false + const processedEvents = batch.map(event => { + const e = processEvent(event, meta) + if (e && typeof e.then !== 'undefined') { + waitFor = true + } + return e + }) + const response = waitFor ? (await Promise.all(processedEvents)) : processedEvents; + return response.filter(r => r) + } + // we have processEventBatch, but not processEvent + } else if (!__getExported('processEvent') && __getExported('processEventBatch')) { + exports.processEvent = async function processEvent (event, meta) { + return (await (__getExported('processEventBatch'))([event], meta))?.[0] + } + } + + // export various functions + const __methods = { + processEvent: __asyncFunctionGuard(__bindMeta('processEvent')), + processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch')) + }; + + // gather the runEveryX commands and export in __tasks + const __tasks = {}; + for (const exportDestination of __getExportDestinations().reverse()) { + for (const [name, value] of Object.entries(exportDestination)) { + if (name.startsWith("runEvery") && typeof value === 'function') { + __tasks[name] = { + name: name, + type: 'runEvery', + exec: __bindMeta(value) + } } } } - } - // run the plugin setup script, if present - const __setupPlugin = __asyncFunctionGuard(async () => __callWithMeta('setupPlugin')); - ` - ) + // run the plugin setup script, if present + const __setupPlugin = __asyncFunctionGuard(async () => __callWithMeta('setupPlugin')); + + return { + __methods, + __tasks, + __setupPlugin + } + })(); + `) - await vm.run('__setupPlugin()') + await vm.run(`${responseVar}.__setupPlugin()`) return { vm, - methods: vm.run('__methods'), - tasks: vm.run('__tasks'), + methods: vm.run(`${responseVar}.__methods`), + tasks: vm.run(`${responseVar}.__tasks`), } } From 4dee94d5272b17580d9d43304c05065e3c6a811b Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 16:56:35 +0100 Subject: [PATCH 22/51] less noise in vm bench --- benchmarks/vm/memory.benchmark.ts | 35 ++++++++++++++++++------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/benchmarks/vm/memory.benchmark.ts b/benchmarks/vm/memory.benchmark.ts index b8279b53..40e7263e 100644 --- a/benchmarks/vm/memory.benchmark.ts +++ b/benchmarks/vm/memory.benchmark.ts @@ -47,6 +47,7 @@ const mockConfig: PluginConfig = { } test('test vm memory usage', async () => { + const debug = false const numVMs = 1000 const numEventsPerVM = 100 @@ -76,27 +77,31 @@ test('test vm memory usage', async () => { const vm = await createPluginConfigVM(server, mockConfig, indexJs) vms.push(vm) - const nowUsed = getUsed() - console.log( - `Used: ${nowUsed} MB, diff ${nowUsed - used} (${(nowUsed - usedAtStart) / (i + 1)} * ${ - i + 1 - } used since the start)` - ) - used = nowUsed + if (debug || i === numVMs - 1) { + const nowUsed = getUsed() + console.log( + `Used: ${nowUsed} MB, diff ${nowUsed - used} (${(nowUsed - usedAtStart) / (i + 1)} * ${ + i + 1 + } used since the start)` + ) + used = nowUsed + } } for (let i = 0; i < numEventsPerVM; i++) { for (let j = 0; j < numVMs; j++) { await vms[j].methods.processEvent(createEvent(i + j)) } - global.gc() - const nowUsed = getUsed() - console.log( - `Run ${i}. Used: ${nowUsed} MB, diff ${nowUsed - used} (${nowUsed - usedAtStart} used since the start, ${ - (nowUsed - usedAtStart) / numVMs - } per vm)` - ) - used = nowUsed + if (debug || i === numEventsPerVM - 1) { + global?.gc?.() + const nowUsed = getUsed() + console.log( + `Run ${i}. Used: ${nowUsed} MB, diff ${nowUsed - used} (${ + nowUsed - usedAtStart + } used since the start, ${(nowUsed - usedAtStart) / numVMs} per vm)` + ) + used = nowUsed + } } await closeServer() From abd172b591010af024abac49887312fe2aed6ddb Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 16:57:24 +0100 Subject: [PATCH 23/51] can not override async guard for the main functions (e.g. processEvent, etc) --- src/vm/vm.ts | 21 ++++++++++++-------- tests/postgres/vm.timeout.test.ts | 32 +++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 527d782e..9d1b03d2 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -50,9 +50,9 @@ export async function createPluginConfigVM( vm.freeze(setTimeout, '__jestSetTimeout') } - vm.freeze(async (promise: () => Promise) => { + const asyncGuard = async (promise: () => Promise) => { const timeout = server.TASK_TIMEOUT - const response = await Promise.race([ + return await Promise.race([ promise, new Promise((resolve, reject) => setTimeout(() => { @@ -63,8 +63,9 @@ export async function createPluginConfigVM( }, timeout * 1000) ), ]) - return response - }, '__asyncGuard') + } + + vm.freeze(asyncGuard, '__asyncGuard') vm.freeze( { @@ -92,7 +93,11 @@ export async function createPluginConfigVM( const responseVar = `__pluginDetails${randomBytes(64).toString('hex')}` vm.run(` - const ${responseVar} = (() => { + if (typeof global.${responseVar} !== 'undefined') { + throw new Error("Plugin created variable ${responseVar} that is reserved for the VM.") + } + let ${responseVar} = undefined; + ((__asyncGuard) => { // helpers to get globals const __getExportDestinations = () => [exports, module.exports, global] const __getExported = (key) => __getExportDestinations().find(a => a[key])?.[key]; @@ -157,13 +162,13 @@ export async function createPluginConfigVM( // run the plugin setup script, if present const __setupPlugin = __asyncFunctionGuard(async () => __callWithMeta('setupPlugin')); - return { + ${responseVar} = { __methods, __tasks, __setupPlugin } - })(); - `) + }) + `)(asyncGuard) await vm.run(`${responseVar}.__setupPlugin()`) diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index 68e7bc8f..902a9a39 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -234,6 +234,38 @@ describe('vm timeout tests', () => { expect(errorMessage!).toEqual('Script execution timed out after promise waited for 1 second') }) + test('small promises and overriding async guard', async () => { + const indexJs = ` + // const __asyncGuard = false + async function processEvent (event, meta) { + const __asyncGuard = (a) => a + const data = await fetch('https://www.example.com').then(response => response.json()).then(data => { + return data + }) + + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + + event.properties.processed = 'yup' + return event + } + ` + await resetTestDatabase(indexJs) + const vm = await createPluginConfigVM(server, pluginConfig39, indexJs) + const date = new Date() + let errorMessage = undefined + try { + await vm.methods.processEvent({ ...defaultEvent }) + } catch (e) { + errorMessage = e.message + } + expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) + expect(new Date().valueOf() - date.valueOf()).toBeLessThan(4000) + expect(errorMessage!).toEqual('Script execution timed out after promise waited for 1 second') + }) + test('long promise', async () => { const indexJs = ` async function processEvent (event, meta) { From 0d35843c34d9261f96feb8ed4b6fb4954ebc4a98 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 17:08:20 +0100 Subject: [PATCH 24/51] reduce how much we do in a benchmark --- benchmarks/vm/worker.benchmark.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/benchmarks/vm/worker.benchmark.ts b/benchmarks/vm/worker.benchmark.ts index 3b37e0e8..4c75e504 100644 --- a/benchmarks/vm/worker.benchmark.ts +++ b/benchmarks/vm/worker.benchmark.ts @@ -94,11 +94,14 @@ test('piscina worker benchmark', async () => { `, }, { - testName: 'for200k', + // used to be 'for200k', but since we inject Date.now() code into + // the for/while/do loops, to throw if they are too long, running + // those comparisons 200k * 10k * runs * threads times is bit too much + testName: 'for2k', events: 10000, testCode: ` function processEvent (event, meta) { - let j = 0; for(let i = 0; i < 200000; i++) { j = i }; + let j = 0; for(let i = 0; i < 2000; i++) { j = i }; event.properties = { "somewhere": "over the rainbow" }; return event } From aa016aa2eb0c6f28b2dc38fae4a2d42950bd0a38 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 17:16:11 +0100 Subject: [PATCH 25/51] add types to loop timeout --- src/vm/transforms/loop-timeout.ts | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/vm/transforms/loop-timeout.ts b/src/vm/transforms/loop-timeout.ts index 5b17e572..8ae6e011 100644 --- a/src/vm/transforms/loop-timeout.ts +++ b/src/vm/transforms/loop-timeout.ts @@ -2,14 +2,28 @@ // https://medium.com/@bvjebin/js-infinite-loops-killing-em-e1c2f5f2db7f // https://github.com/jsbin/loop-protect/blob/master/lib/index.js +import * as types from '@babel/types' + import { PluginsServer } from '../../types' -const generateBefore = (t: any, id: any) => +const generateBefore = (t: typeof types, id: any) => t.variableDeclaration('var', [ t.variableDeclarator(id, t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), [])), ]) -const generateInside = ({ t, id, line, ch, timeout }: any = {}) => { +const generateInside = ({ + t, + id, + line, + ch, + timeout, +}: { + t: typeof types + id: types.Identifier + line: number + ch: number + timeout: number +}) => { return t.ifStatement( t.binaryExpression( '>', @@ -21,7 +35,7 @@ const generateInside = ({ t, id, line, ch, timeout }: any = {}) => { t.numericLiteral(timeout * 1000) ), t.throwStatement( - t.NewExpression(t.identifier('Error'), [ + t.newExpression(t.identifier('Error'), [ t.stringLiteral( `Script execution timed out after looping for ${timeout} second${ timeout === 1 ? '' : 's' @@ -32,7 +46,7 @@ const generateInside = ({ t, id, line, ch, timeout }: any = {}) => { ) } -const protect = (t: any, timeout: number) => (path: any) => { +const protect = (t: typeof types, timeout: number) => (path: any): void => { if (!path.node.loc) { // I don't really know _how_ we get into this state // but https://jsbin.com/mipesawapi/1/ triggers it @@ -59,8 +73,7 @@ const protect = (t: any, timeout: number) => (path: any) => { body.unshiftContainer('body', inside) } -export const loopTimeout = (server: PluginsServer) => (babel: any) => { - const t = babel.types +export const loopTimeout = (server: PluginsServer) => ({ types: t }: { types: typeof types }) => { return { visitor: { WhileStatement: protect(t, server.TASK_TIMEOUT), From f0cd191bcf362d6d55d86a2b5028031d50f59935 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 17:17:17 +0100 Subject: [PATCH 26/51] types for promise timeout --- src/vm/transforms/promise-timeout.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/vm/transforms/promise-timeout.ts b/src/vm/transforms/promise-timeout.ts index 586fe7f9..4ba9dda4 100644 --- a/src/vm/transforms/promise-timeout.ts +++ b/src/vm/transforms/promise-timeout.ts @@ -1,10 +1,11 @@ // inspired by: https://github.com/treywood/babel-plugin-bluebird-async-functions/ +import * as types from '@babel/types' + import { PluginsServer } from '../../types' const REPLACED = Symbol() -export const promiseTimeout = (server: PluginsServer) => (babel: any) => { - const t = babel.types +export const promiseTimeout = (server: PluginsServer) => ({ types: t }: { types: typeof types }) => { return { visitor: { // changes: bla.then --> __asyncGuard(bla).then @@ -21,7 +22,7 @@ export const promiseTimeout = (server: PluginsServer) => (babel: any) => { t.callExpression(t.identifier('__asyncGuard'), [node.object]), t.identifier('then') ) - newCall[REPLACED] = true + ;(newCall as any)[REPLACED] = true path.replaceWith(newCall) } }, @@ -35,7 +36,7 @@ export const promiseTimeout = (server: PluginsServer) => (babel: any) => { const newAwait = t.awaitExpression( t.callExpression(t.identifier('__asyncGuard'), [node.argument]) ) - newAwait[REPLACED] = true + ;(newAwait as any)[REPLACED] = true path.replaceWith(newAwait) } }, From c3c9d80d0760f4e1cdb4f4929e56f89897e57382 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 17:19:42 +0100 Subject: [PATCH 27/51] fix line/column numbers --- tests/postgres/vm.timeout.test.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index 902a9a39..cdc460a9 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -46,7 +46,7 @@ describe('vm timeout tests', () => { errorMessage = e.message } expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3:16') }) test('while loop no body', async () => { @@ -66,7 +66,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4:16') }) test('while loop in promise', async () => { @@ -85,7 +85,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3:53') }) test('do..while loop', async () => { @@ -104,7 +104,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3:16') }) test('do..while loop no body', async () => { @@ -124,7 +124,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4:16') }) test('do..while loop in promise', async () => { @@ -143,7 +143,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3:53') }) test('for loop', async () => { @@ -162,7 +162,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3:16') }) test('for loop no body', async () => { @@ -182,7 +182,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 4:16') }) test('for loop in promise', async () => { @@ -201,7 +201,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3') + expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3:53') }) test('small promises', async () => { From 1a65f7f4b8c1f060561243216fe58c20e7411e99 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 17:28:56 +0100 Subject: [PATCH 28/51] explain some decisions --- src/vm/vm.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 9d1b03d2..4b507e27 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -50,6 +50,9 @@ export async function createPluginConfigVM( vm.freeze(setTimeout, '__jestSetTimeout') } + // Creating this outside the vm (so not in a babel plugin for example) + // because `setTimeout` is not available inside the vm... and we don't want to + // make it available for now, as it makes it easier to create malicious code const asyncGuard = async (promise: () => Promise) => { const timeout = server.TASK_TIMEOUT return await Promise.race([ @@ -92,6 +95,10 @@ export async function createPluginConfigVM( const responseVar = `__pluginDetails${randomBytes(64).toString('hex')}` + // Explicitly passing __asyncGuard to the returned function from `vm.run` in order + // to make it harder to override the global `__asyncGuard = noop` inside plugins. + // This way even if promises inside plugins are unbounded, the `processEvent` function + // itself will still terminate after TASK_TIMEOUT seconds, not clogging the entire ingestion. vm.run(` if (typeof global.${responseVar} !== 'undefined') { throw new Error("Plugin created variable ${responseVar} that is reserved for the VM.") From 00be5955131904f1c19f05625bf59c349d9734de Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 23:51:24 +0100 Subject: [PATCH 29/51] verify that the process event timeout applies e2e. --- tests/postgres/e2e.test.ts | 2 +- tests/postgres/e2e.timeout.test.ts | 64 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 tests/postgres/e2e.timeout.test.ts diff --git a/tests/postgres/e2e.test.ts b/tests/postgres/e2e.test.ts index 0e567b92..1a2a4733 100644 --- a/tests/postgres/e2e.test.ts +++ b/tests/postgres/e2e.test.ts @@ -1,7 +1,7 @@ import { startPluginsServer } from '../../src/server' import { LogLevel } from '../../src/types' import { PluginsServer } from '../../src/types' -import { delay, UUIDT } from '../../src/utils' +import { UUIDT } from '../../src/utils' import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' diff --git a/tests/postgres/e2e.timeout.test.ts b/tests/postgres/e2e.timeout.test.ts new file mode 100644 index 00000000..13ad4649 --- /dev/null +++ b/tests/postgres/e2e.timeout.test.ts @@ -0,0 +1,64 @@ +import { startPluginsServer } from '../../src/server' +import { LogLevel, PluginsServer } from '../../src/types' +import { UUIDT } from '../../src/utils' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' +import { makePiscina } from '../../src/worker/piscina' +import { pluginConfig39 } from '../helpers/plugins' +import { resetTestDatabase } from '../helpers/sql' +import { delayUntilEventIngested } from '../shared/process-event' + +jest.setTimeout(60000) // 60 sec timeout + +describe('e2e postgres ingestion timeout', () => { + let server: PluginsServer + let stopServer: () => Promise + let posthog: DummyPostHog + + beforeEach(async () => { + await resetTestDatabase(` + async function processEvent (event) { + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) + event.properties = { passed: true } + return event + } + `) + const startResponse = await startPluginsServer( + { + WORKER_CONCURRENCY: 2, + TASK_TIMEOUT: 2, + PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue', + CELERY_DEFAULT_QUEUE: 'test-celery-default-queue', + PLUGIN_SERVER_INGESTION: true, + LOG_LEVEL: LogLevel.Log, + KAFKA_ENABLED: false, + }, + makePiscina + ) + server = startResponse.server + stopServer = startResponse.stop + + await server.redis.del(server.PLUGINS_CELERY_QUEUE) + await server.redis.del(server.CELERY_DEFAULT_QUEUE) + + posthog = createPosthog(server, pluginConfig39) + }) + + afterEach(async () => { + await stopServer() + }) + + test('event captured, processed, ingested', async () => { + expect((await server.db.fetchEvents()).length).toBe(0) + const uuid = new UUIDT().toString() + posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) + await delayUntilEventIngested(() => server.db.fetchEvents()) + const events = await server.db.fetchEvents() + expect(events.length).toBe(1) + expect(events[0].properties.name).toEqual('haha') + expect(events[0].properties.passed).not.toEqual(true) + }) +}) From dbba38c429227da66e1aaad89796ec950b275227 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Feb 2021 23:52:49 +0100 Subject: [PATCH 30/51] managed to get equal, so changing --- tests/postgres/vm.timeout.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/postgres/vm.timeout.test.ts b/tests/postgres/vm.timeout.test.ts index cdc460a9..996e250d 100644 --- a/tests/postgres/vm.timeout.test.ts +++ b/tests/postgres/vm.timeout.test.ts @@ -45,7 +45,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) + expect(new Date().valueOf() - date.valueOf()).toBeGreaterThanOrEqual(1000) expect(errorMessage!).toEqual('Script execution timed out after looping for 1 second on line 3:16') }) @@ -229,7 +229,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) + expect(new Date().valueOf() - date.valueOf()).toBeGreaterThanOrEqual(1000) expect(new Date().valueOf() - date.valueOf()).toBeLessThan(4000) expect(errorMessage!).toEqual('Script execution timed out after promise waited for 1 second') }) @@ -261,7 +261,7 @@ describe('vm timeout tests', () => { } catch (e) { errorMessage = e.message } - expect(new Date().valueOf() - date.valueOf()).toBeGreaterThan(1000) + expect(new Date().valueOf() - date.valueOf()).toBeGreaterThanOrEqual(1000) expect(new Date().valueOf() - date.valueOf()).toBeLessThan(4000) expect(errorMessage!).toEqual('Script execution timed out after promise waited for 1 second') }) From 4eaf4fd36d8c45d3114b1109f87573422c14567a Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 00:31:27 +0100 Subject: [PATCH 31/51] update message that it's just a warning --- src/ingestion/kafka-queue.ts | 4 ++-- src/ingestion/utils.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ingestion/kafka-queue.ts b/src/ingestion/kafka-queue.ts index 11193f5f..f71dff12 100644 --- a/src/ingestion/kafka-queue.ts +++ b/src/ingestion/kafka-queue.ts @@ -63,7 +63,7 @@ export class KafkaQueue implements Queue { ) const processingTimeout = timeoutGuard( - `Took too long to run plugins on ${pluginEvents.length} events! Timeout after 30 sec!` + `Still running plugins on ${pluginEvents.length} events. Timeout warning after 30 sec!` ) const batches = groupIntoBatches(pluginEvents, maxBatchSize) const processedEvents = (await Promise.all(batches.map(this.processEventBatch))).flat() @@ -77,7 +77,7 @@ export class KafkaQueue implements Queue { ) const ingestionTimeout = timeoutGuard( - `Took too long to ingest ${processedEvents.length} events! Timeout after 30 sec!` + `Still ingesting ${processedEvents.length} events. Timeout warning after 30 sec!` ) // TODO: add chunking into groups of 500 or so. Might start too many promises at once now diff --git a/src/ingestion/utils.ts b/src/ingestion/utils.ts index 49450888..5ef67915 100644 --- a/src/ingestion/utils.ts +++ b/src/ingestion/utils.ts @@ -156,9 +156,9 @@ export function chainToElements(chain: string): Element[] { return elements } -export function timeoutGuard(message: string): NodeJS.Timeout { +export function timeoutGuard(message: string, timeout = 30000): NodeJS.Timeout { return setTimeout(() => { console.log(`⌛⌛⌛ ${message}`) Sentry.captureMessage(message) - }, 30000) + }, timeout) } From 6f55ec825d43e4c2389e5ac33a0ae538757881ee Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 00:32:08 +0100 Subject: [PATCH 32/51] add e2e kafka test for bad delay --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 70 ++++++++++++-------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index 5b4ac98d..6e5d769e 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -2,7 +2,7 @@ import { performance } from 'perf_hooks' import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' import { startPluginsServer } from '../../src/server' -import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' +import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' @@ -19,6 +19,7 @@ const extraServerConfig: Partial = { KAFKA_ENABLED: true, KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092', WORKER_CONCURRENCY: 4, + TASK_TIMEOUT: 10, PLUGIN_SERVER_INGESTION: true, KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_BATCH_PARALELL_PROCESSING: true, @@ -26,38 +27,15 @@ const extraServerConfig: Partial = { } describe('e2e kafka & clickhouse benchmark', () => { - let queue: Queue - let server: PluginsServer - let stopServer: () => Promise - let posthog: DummyPostHog - - beforeEach(async () => { - await resetTestDatabase(` - async function processEventBatch (batch) { - // console.log(\`Received batch of \${batch.length} events\`) - return batch.map(event => { - event.properties.processed = 'hell yes' - event.properties.upperUuid = event.properties.uuid?.toUpperCase() - return event - }) - } - `) + async function measurePerformance(code: string): Promise<[PluginsServer, () => Promise]> { + await resetTestDatabase(code) await resetKafka(extraServerConfig) await resetTestDatabaseClickhouse(extraServerConfig) - const startResponse = await startPluginsServer(extraServerConfig, makePiscina) - server = startResponse.server - stopServer = startResponse.stop - queue = startResponse.queue - - posthog = createPosthog(server, pluginConfig39) - }) + const { server, stop: stopServer, queue } = await startPluginsServer(extraServerConfig, makePiscina) - afterEach(async () => { - await stopServer() - }) + const posthog = createPosthog(server, pluginConfig39) - test('measure performance', async () => { console.debug = () => null const count = 3000 @@ -89,7 +67,41 @@ describe('e2e kafka & clickhouse benchmark', () => { )} events/sec, ${n(timeMs / count)}ms per event)` ) + return [server, stopServer] + } + + test('sync batch', async () => { + const [server, stopServer] = await measurePerformance(` + async function processEventBatch (batch) { + console.log(\`Received batch of \${batch.length} events\`) + return batch.map(event => { + event.properties.processed = 'hell yes' + event.properties.upperUuid = event.properties.uuid?.toUpperCase() + return event + }) + } + `) + const events = await server.db.fetchEvents() - expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase()) + expect(events[2999].properties.upperUuid).toEqual(events[2999].properties.uuid.toUpperCase()) + + await stopServer() + }) + + test('bad delay', async () => { + // Delay up to 30sec in processEvent, while TASK_TIMEOUT=10 + // Effectively two thirds of the events should time out + const [server, stopServer] = await measurePerformance(` + async function processEvent (event) { + await new Promise(resolve => __jestSetTimeout(() => resolve(), 30000 * Math.random())) + event.properties.timeout = 'no timeout' + return event + } + `) + const events = (await server.db.fetchEvents()) as ClickHouseEvent[] + const props = events.map((e) => e.properties.timeout) + console.log(`${props.filter((p) => p).length} events out of 3000 took under 10sec, the rest were timed out`) + + await stopServer() }) }) From d3db5581de43bd7c11c65bb6243b7f0f5b71fcf7 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 01:03:03 +0100 Subject: [PATCH 33/51] shorter test in github action --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index 6e5d769e..d0097fb5 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -19,7 +19,7 @@ const extraServerConfig: Partial = { KAFKA_ENABLED: true, KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092', WORKER_CONCURRENCY: 4, - TASK_TIMEOUT: 10, + TASK_TIMEOUT: 5, PLUGIN_SERVER_INGESTION: true, KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_BATCH_PARALELL_PROCESSING: true, @@ -89,18 +89,22 @@ describe('e2e kafka & clickhouse benchmark', () => { }) test('bad delay', async () => { - // Delay up to 30sec in processEvent, while TASK_TIMEOUT=10 + // Delay up to 15sec in processEvent, while TASK_TIMEOUT=5 // Effectively two thirds of the events should time out const [server, stopServer] = await measurePerformance(` async function processEvent (event) { - await new Promise(resolve => __jestSetTimeout(() => resolve(), 30000 * Math.random())) + await new Promise(resolve => __jestSetTimeout(() => resolve(), 15000 * Math.random())) event.properties.timeout = 'no timeout' return event } `) const events = (await server.db.fetchEvents()) as ClickHouseEvent[] - const props = events.map((e) => e.properties.timeout) - console.log(`${props.filter((p) => p).length} events out of 3000 took under 10sec, the rest were timed out`) + const passedEvents = events.filter((e) => e.properties.timeout).length + console.log( + `Out of 3000 events: ${passedEvents} took under 5sec, ${ + 3000 - passedEvents + } timed out. This should be a 1:2 ratio.` + ) await stopServer() }) From 733644f7025dc064beb435b7bd8975ffdb63850b Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 08:32:31 +0100 Subject: [PATCH 34/51] increase test timeout to see if that makes a difference (locally it takes 3min for both tests) --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index d0097fb5..9a345414 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -13,7 +13,7 @@ import { pluginConfig39 } from '../../tests/helpers/plugins' import { resetTestDatabase } from '../../tests/helpers/sql' import { delayUntilEventIngested } from '../../tests/shared/process-event' -jest.setTimeout(600000) // 10min timeout +jest.setTimeout(1200000) // 20min timeout const extraServerConfig: Partial = { KAFKA_ENABLED: true, From deef4a88889cf546fcd31ebfe7875df400d8b386 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 09:12:36 +0100 Subject: [PATCH 35/51] skip the "slow on GH action" test for now --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index 9a345414..c417bd91 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -13,7 +13,7 @@ import { pluginConfig39 } from '../../tests/helpers/plugins' import { resetTestDatabase } from '../../tests/helpers/sql' import { delayUntilEventIngested } from '../../tests/shared/process-event' -jest.setTimeout(1200000) // 20min timeout +jest.setTimeout(600000) // 10min timeout const extraServerConfig: Partial = { KAFKA_ENABLED: true, @@ -88,7 +88,7 @@ describe('e2e kafka & clickhouse benchmark', () => { await stopServer() }) - test('bad delay', async () => { + test.skip('bad delay', async () => { // Delay up to 15sec in processEvent, while TASK_TIMEOUT=5 // Effectively two thirds of the events should time out const [server, stopServer] = await measurePerformance(` From 8463e404a8d3b5df6278a0f7dd507ae887911a41 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 20:24:50 +0100 Subject: [PATCH 36/51] process kafka events in parallel by default --- src/config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.ts b/src/config.ts index 48fc1728..ef03ba7d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -24,7 +24,7 @@ export function getDefaultConfig(): PluginsServerConfig { KAFKA_CLIENT_CERT_KEY_B64: null, KAFKA_TRUSTED_CERT_B64: null, KAFKA_CONSUMPTION_TOPIC: null, - KAFKA_BATCH_PARALELL_PROCESSING: false, + KAFKA_BATCH_PARALELL_PROCESSING: true, PLUGIN_SERVER_INGESTION: false, PLUGINS_CELERY_QUEUE: 'posthog-plugins', REDIS_URL: 'redis://127.0.0.1', From 7a1ff20684e07fb3d25e0e5b811bce3e712eacf6 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 20:25:14 +0100 Subject: [PATCH 37/51] add more metrics to kafka queue --- src/ingestion/kafka-queue.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ingestion/kafka-queue.ts b/src/ingestion/kafka-queue.ts index f71dff12..78bb7ee0 100644 --- a/src/ingestion/kafka-queue.ts +++ b/src/ingestion/kafka-queue.ts @@ -36,7 +36,7 @@ export class KafkaQueue implements Queue { isRunning, isStale, }: EachBatchPayload): Promise { - const batchProcessingTimer = new Date() + const batchStartTimer = new Date() const uuidOrder = new Map() const uuidOffset = new Map() @@ -70,6 +70,9 @@ export class KafkaQueue implements Queue { clearTimeout(processingTimeout) + this.pluginsServer.statsd?.timing('kafka_queue.each_batch.process_events', batchStartTimer) + const batchIngestionTimer = new Date() + // Sort in the original order that the events came in, putting any randomly added events to the end. // This is so we would resolve the correct kafka offsets in order. processedEvents.sort( @@ -126,7 +129,8 @@ export class KafkaQueue implements Queue { clearTimeout(ingestionTimeout) - this.pluginsServer.statsd?.timing('kafka_queue.each_batch', batchProcessingTimer) + this.pluginsServer.statsd?.timing('kafka_queue.each_batch.ingest_events', batchIngestionTimer) + this.pluginsServer.statsd?.timing('kafka_queue.each_batch', batchStartTimer) resolveOffset(batch.lastOffset()) await heartbeat() await commitOffsetsIfNecessary() From 75ef5a37f9b5c5994364f79007fbb99ee7a96478 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 20:29:42 +0100 Subject: [PATCH 38/51] some debug to help fix the test --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index c417bd91..7c3ae145 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -5,7 +5,7 @@ import { startPluginsServer } from '../../src/server' import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' -import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' +import { createPosthog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse' import { resetKafka } from '../../tests/helpers/kafka' @@ -71,6 +71,7 @@ describe('e2e kafka & clickhouse benchmark', () => { } test('sync batch', async () => { + console.log('Starting "sync batch" test') const [server, stopServer] = await measurePerformance(` async function processEventBatch (batch) { console.log(\`Received batch of \${batch.length} events\`) @@ -86,9 +87,11 @@ describe('e2e kafka & clickhouse benchmark', () => { expect(events[2999].properties.upperUuid).toEqual(events[2999].properties.uuid.toUpperCase()) await stopServer() + console.log('Stopping "sync batch" test') }) test.skip('bad delay', async () => { + console.log('Starting "bad delay" test') // Delay up to 15sec in processEvent, while TASK_TIMEOUT=5 // Effectively two thirds of the events should time out const [server, stopServer] = await measurePerformance(` @@ -107,5 +110,6 @@ describe('e2e kafka & clickhouse benchmark', () => { ) await stopServer() + console.log('Stopping "bad delay" test') }) }) From f792c51e653b3c94327043cd936833c259349604 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 20:47:43 +0100 Subject: [PATCH 39/51] add debug log in the delayUntilEventIngested function --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 2 +- tests/shared/process-event.ts | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index 7c3ae145..1db43fa2 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -56,7 +56,7 @@ describe('e2e kafka & clickhouse benchmark', () => { console.log('Starting timer') const startTime = performance.now() - await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count) + await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count, true) const timeMs = performance.now() - startTime console.log('Finished!') diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index 59f2d68b..7aa9968a 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -1,5 +1,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { DateTime } from 'luxon' +import { performance } from 'perf_hooks' import { IEvent } from '../../src/idl/protos' import { EventsProcessor } from '../../src/ingestion/process-event' @@ -24,10 +25,19 @@ export async function delayUntilEventIngested( fetchEvents: () => Promise, minCount = 1, delayMs = 500, - maxDelayCount = 30 + maxDelayCount = 30, + debug = false ): Promise { + const timer = performance.now() for (let i = 0; i < maxDelayCount; i++) { const events = await fetchEvents() + if (debug) { + console.log( + `Waiting. ${Math.round((performance.now() - timer) / 100) * 10}s since the start. ${ + typeof events === 'number' ? events : events.length + } events.` + ) + } if ((typeof events === 'number' ? events : events.length) >= minCount) { return } From 28d3f944b726a16513926f29ec0dd70a1fa68531 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 20:48:36 +0100 Subject: [PATCH 40/51] add "single_event_batch" timing to the event processing steps --- src/ingestion/kafka-queue.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/ingestion/kafka-queue.ts b/src/ingestion/kafka-queue.ts index 78bb7ee0..7bbc156a 100644 --- a/src/ingestion/kafka-queue.ts +++ b/src/ingestion/kafka-queue.ts @@ -66,7 +66,16 @@ export class KafkaQueue implements Queue { `Still running plugins on ${pluginEvents.length} events. Timeout warning after 30 sec!` ) const batches = groupIntoBatches(pluginEvents, maxBatchSize) - const processedEvents = (await Promise.all(batches.map(this.processEventBatch))).flat() + const processedEvents = ( + await Promise.all( + batches.map(async (batch) => { + const timer = new Date() + const processedBatch = this.processEventBatch(batch) + this.pluginsServer.statsd?.timing('kafka_queue.single_event_batch', timer) + return processedBatch + }) + ) + ).flat() clearTimeout(processingTimeout) From 8c2d66ee1d63cb3e6ecdb6eeca57461afad404e9 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 20:59:30 +0100 Subject: [PATCH 41/51] fix timer --- tests/shared/process-event.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index 7aa9968a..6b3806f1 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -33,7 +33,7 @@ export async function delayUntilEventIngested( const events = await fetchEvents() if (debug) { console.log( - `Waiting. ${Math.round((performance.now() - timer) / 100) * 10}s since the start. ${ + `Waiting. ${Math.round((performance.now() - timer) / 100) / 10}s since the start. ${ typeof events === 'number' ? events : events.length } events.` ) From f2aeff86f684768e406e4cff53bbd7f7271ea46b Mon Sep 17 00:00:00 2001 From: Michael Matloka Date: Wed, 17 Feb 2021 21:39:56 +0100 Subject: [PATCH 42/51] Improve timeoutGuard default timeout --- src/ingestion/utils.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ingestion/utils.ts b/src/ingestion/utils.ts index 5ef67915..a8bfcb64 100644 --- a/src/ingestion/utils.ts +++ b/src/ingestion/utils.ts @@ -1,6 +1,7 @@ import * as Sentry from '@sentry/node' import crypto from 'crypto' +import { defaultConfig } from '../config' import { BasePerson, Element, Person, RawPerson } from '../types' export function unparsePersonPartial(person: Partial): Partial { @@ -156,7 +157,7 @@ export function chainToElements(chain: string): Element[] { return elements } -export function timeoutGuard(message: string, timeout = 30000): NodeJS.Timeout { +export function timeoutGuard(message: string, timeout = defaultConfig.TASK_TIMEOUT * 1000): NodeJS.Timeout { return setTimeout(() => { console.log(`⌛⌛⌛ ${message}`) Sentry.captureMessage(message) From 440766abf7d7328dcc4b920d4f67f20604d9e088 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 22:26:08 +0100 Subject: [PATCH 43/51] revert benchmark to the last one that worked --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 80 ++++++---------- .../clickhouse/e2e.kafka.timeout.benchmark.ts | 95 +++++++++++++++++++ 2 files changed, 125 insertions(+), 50 deletions(-) create mode 100644 benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index 1db43fa2..0abf7b2d 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -2,10 +2,10 @@ import { performance } from 'perf_hooks' import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' import { startPluginsServer } from '../../src/server' -import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types' +import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' -import { createPosthog } from '../../src/vm/extensions/posthog' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' import { makePiscina } from '../../src/worker/piscina' import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse' import { resetKafka } from '../../tests/helpers/kafka' @@ -19,7 +19,6 @@ const extraServerConfig: Partial = { KAFKA_ENABLED: true, KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092', WORKER_CONCURRENCY: 4, - TASK_TIMEOUT: 5, PLUGIN_SERVER_INGESTION: true, KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_BATCH_PARALELL_PROCESSING: true, @@ -27,15 +26,38 @@ const extraServerConfig: Partial = { } describe('e2e kafka & clickhouse benchmark', () => { - async function measurePerformance(code: string): Promise<[PluginsServer, () => Promise]> { - await resetTestDatabase(code) + let queue: Queue + let server: PluginsServer + let stopServer: () => Promise + let posthog: DummyPostHog + + beforeEach(async () => { + await resetTestDatabase(` + async function processEventBatch (batch) { + // console.log(\`Received batch of \${batch.length} events\`) + return batch.map(event => { + event.properties.processed = 'hell yes' + event.properties.upperUuid = event.properties.uuid?.toUpperCase() + return event + }) + } + `) await resetKafka(extraServerConfig) await resetTestDatabaseClickhouse(extraServerConfig) - const { server, stop: stopServer, queue } = await startPluginsServer(extraServerConfig, makePiscina) + const startResponse = await startPluginsServer(extraServerConfig, makePiscina) + server = startResponse.server + stopServer = startResponse.stop + queue = startResponse.queue - const posthog = createPosthog(server, pluginConfig39) + posthog = createPosthog(server, pluginConfig39) + }) + + afterEach(async () => { + await stopServer() + }) + test('measure performance', async () => { console.debug = () => null const count = 3000 @@ -67,49 +89,7 @@ describe('e2e kafka & clickhouse benchmark', () => { )} events/sec, ${n(timeMs / count)}ms per event)` ) - return [server, stopServer] - } - - test('sync batch', async () => { - console.log('Starting "sync batch" test') - const [server, stopServer] = await measurePerformance(` - async function processEventBatch (batch) { - console.log(\`Received batch of \${batch.length} events\`) - return batch.map(event => { - event.properties.processed = 'hell yes' - event.properties.upperUuid = event.properties.uuid?.toUpperCase() - return event - }) - } - `) - const events = await server.db.fetchEvents() - expect(events[2999].properties.upperUuid).toEqual(events[2999].properties.uuid.toUpperCase()) - - await stopServer() - console.log('Stopping "sync batch" test') - }) - - test.skip('bad delay', async () => { - console.log('Starting "bad delay" test') - // Delay up to 15sec in processEvent, while TASK_TIMEOUT=5 - // Effectively two thirds of the events should time out - const [server, stopServer] = await measurePerformance(` - async function processEvent (event) { - await new Promise(resolve => __jestSetTimeout(() => resolve(), 15000 * Math.random())) - event.properties.timeout = 'no timeout' - return event - } - `) - const events = (await server.db.fetchEvents()) as ClickHouseEvent[] - const passedEvents = events.filter((e) => e.properties.timeout).length - console.log( - `Out of 3000 events: ${passedEvents} took under 5sec, ${ - 3000 - passedEvents - } timed out. This should be a 1:2 ratio.` - ) - - await stopServer() - console.log('Stopping "bad delay" test') + expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase()) }) }) diff --git a/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts new file mode 100644 index 00000000..d289628e --- /dev/null +++ b/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts @@ -0,0 +1,95 @@ +import { performance } from 'perf_hooks' + +import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' +import { startPluginsServer } from '../../src/server' +import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types' +import { PluginsServer } from '../../src/types' +import { delay, UUIDT } from '../../src/utils' +import { createPosthog } from '../../src/vm/extensions/posthog' +import { makePiscina } from '../../src/worker/piscina' +import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse' +import { resetKafka } from '../../tests/helpers/kafka' +import { pluginConfig39 } from '../../tests/helpers/plugins' +import { resetTestDatabase } from '../../tests/helpers/sql' +import { delayUntilEventIngested } from '../../tests/shared/process-event' + +jest.setTimeout(600000) // 10min timeout + +const extraServerConfig: Partial = { + KAFKA_ENABLED: true, + KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092', + WORKER_CONCURRENCY: 4, + TASK_TIMEOUT: 5, + PLUGIN_SERVER_INGESTION: true, + KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, + KAFKA_BATCH_PARALELL_PROCESSING: true, + LOG_LEVEL: LogLevel.Log, +} + +describe('e2e kafka & clickhouse timeout benchmark', () => { + async function measurePerformance(code: string): Promise<[PluginsServer, () => Promise]> { + await resetTestDatabase(code) + await resetKafka(extraServerConfig) + await resetTestDatabaseClickhouse(extraServerConfig) + + const { server, stop: stopServer, queue } = await startPluginsServer(extraServerConfig, makePiscina) + + const posthog = createPosthog(server, pluginConfig39) + + console.debug = () => null + + const count = 3000 + + // fill in the queue + function createEvent() { + const uuid = new UUIDT().toString() + posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) + } + await queue.pause() + for (let i = 0; i < count; i++) { + createEvent() + } + + // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited) + await delay(5000) + await queue.resume() + + console.log('Starting timer') + const startTime = performance.now() + await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count, true) + const timeMs = performance.now() - startTime + console.log('Finished!') + + const n = (n: number) => `${Math.round(n * 100) / 100}` + console.log( + `[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( + 1000 / (timeMs / count) + )} events/sec, ${n(timeMs / count)}ms per event)` + ) + + return [server, stopServer] + } + + test('bad delay', async () => { + console.log('Starting "bad delay" test') + // Delay up to 15sec in processEvent, while TASK_TIMEOUT=5 + // Effectively two thirds of the events should time out + const [server, stopServer] = await measurePerformance(` + async function processEvent (event) { + await new Promise(resolve => __jestSetTimeout(() => resolve(), 15000 * Math.random())) + event.properties.timeout = 'no timeout' + return event + } + `) + const events = (await server.db.fetchEvents()) as ClickHouseEvent[] + const passedEvents = events.filter((e) => e.properties.timeout).length + console.log( + `Out of 3000 events: ${passedEvents} took under 5sec, ${ + 3000 - passedEvents + } timed out. This should be a 1:2 ratio.` + ) + + await stopServer() + console.log('Stopping "bad delay" test') + }) +}) From 0b05999629777023bfbab9a084efb153d303bc42 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 22:53:22 +0100 Subject: [PATCH 44/51] skip bad delay --- benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts index d289628e..25106a77 100644 --- a/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts @@ -70,7 +70,7 @@ describe('e2e kafka & clickhouse timeout benchmark', () => { return [server, stopServer] } - test('bad delay', async () => { + test.skip('bad delay', async () => { console.log('Starting "bad delay" test') // Delay up to 15sec in processEvent, while TASK_TIMEOUT=5 // Effectively two thirds of the events should time out From be1af7b046d18bc51b7ead02524d237250c9690d Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 23:22:28 +0100 Subject: [PATCH 45/51] add back a 1:1 copy of the e2e.kafka test, but with the timeout code. see if GH actions run --- .../clickhouse/e2e.timeout.benchmark.ts | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 benchmarks/clickhouse/e2e.timeout.benchmark.ts diff --git a/benchmarks/clickhouse/e2e.timeout.benchmark.ts b/benchmarks/clickhouse/e2e.timeout.benchmark.ts new file mode 100644 index 00000000..97e5d770 --- /dev/null +++ b/benchmarks/clickhouse/e2e.timeout.benchmark.ts @@ -0,0 +1,93 @@ +import { performance } from 'perf_hooks' + +import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' +import { startPluginsServer } from '../../src/server' +import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' +import { PluginsServer } from '../../src/types' +import { delay, UUIDT } from '../../src/utils' +import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' +import { makePiscina } from '../../src/worker/piscina' +import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse' +import { resetKafka } from '../../tests/helpers/kafka' +import { pluginConfig39 } from '../../tests/helpers/plugins' +import { resetTestDatabase } from '../../tests/helpers/sql' +import { delayUntilEventIngested } from '../../tests/shared/process-event' + +jest.setTimeout(600000) // 10min timeout + +const extraServerConfig: Partial = { + KAFKA_ENABLED: true, + KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092', + WORKER_CONCURRENCY: 4, + TASK_TIMEOUT: 5, + PLUGIN_SERVER_INGESTION: true, + KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, + KAFKA_BATCH_PARALELL_PROCESSING: true, + LOG_LEVEL: LogLevel.Log, +} + +describe('e2e kafka processing timeout benchmark', () => { + let queue: Queue + let server: PluginsServer + let stopServer: () => Promise + let posthog: DummyPostHog + + beforeEach(async () => { + await resetTestDatabase(` + async function processEvent (event) { + await new Promise(resolve => __jestSetTimeout(() => resolve(), 15000 * Math.random())) + event.properties.timeout = 'no timeout' + return event + } + `) + await resetKafka(extraServerConfig) + await resetTestDatabaseClickhouse(extraServerConfig) + + const startResponse = await startPluginsServer(extraServerConfig, makePiscina) + server = startResponse.server + stopServer = startResponse.stop + queue = startResponse.queue + + posthog = createPosthog(server, pluginConfig39) + }) + + afterEach(async () => { + await stopServer() + }) + + test('measure performance', async () => { + console.debug = () => null + + const count = 3000 + + // fill in the queue + function createEvent() { + const uuid = new UUIDT().toString() + posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) + } + await queue.pause() + for (let i = 0; i < count; i++) { + createEvent() + } + + // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited) + await delay(5000) + await queue.resume() + + console.log('Starting timer') + const startTime = performance.now() + await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count, true) + const timeMs = performance.now() - startTime + console.log('Finished!') + + const n = (n: number) => `${Math.round(n * 100) / 100}` + console.log( + `[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( + 1000 / (timeMs / count) + )} events/sec, ${n(timeMs / count)}ms per event)` + ) + + const events = await server.db.fetchEvents() + expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase()) + }) +}) From cf004ed2972ec264587eb072aedac4848c57d96d Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 17 Feb 2021 23:48:10 +0100 Subject: [PATCH 46/51] remove broken tests, improve logging of working tests --- benchmarks/clickhouse/e2e.kafka.benchmark.ts | 4 +- .../clickhouse/e2e.kafka.timeout.benchmark.ts | 95 ------------------- .../clickhouse/e2e.timeout.benchmark.ts | 16 ++-- 3 files changed, 12 insertions(+), 103 deletions(-) delete mode 100644 benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts diff --git a/benchmarks/clickhouse/e2e.kafka.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.benchmark.ts index 0abf7b2d..d7c9eba7 100644 --- a/benchmarks/clickhouse/e2e.kafka.benchmark.ts +++ b/benchmarks/clickhouse/e2e.kafka.benchmark.ts @@ -78,13 +78,13 @@ describe('e2e kafka & clickhouse benchmark', () => { console.log('Starting timer') const startTime = performance.now() - await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count, true) + await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count) const timeMs = performance.now() - startTime console.log('Finished!') const n = (n: number) => `${Math.round(n * 100) / 100}` console.log( - `[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( + `ℹ️️ [Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( 1000 / (timeMs / count) )} events/sec, ${n(timeMs / count)}ms per event)` ) diff --git a/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts b/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts deleted file mode 100644 index 25106a77..00000000 --- a/benchmarks/clickhouse/e2e.kafka.timeout.benchmark.ts +++ /dev/null @@ -1,95 +0,0 @@ -import { performance } from 'perf_hooks' - -import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' -import { startPluginsServer } from '../../src/server' -import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types' -import { PluginsServer } from '../../src/types' -import { delay, UUIDT } from '../../src/utils' -import { createPosthog } from '../../src/vm/extensions/posthog' -import { makePiscina } from '../../src/worker/piscina' -import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse' -import { resetKafka } from '../../tests/helpers/kafka' -import { pluginConfig39 } from '../../tests/helpers/plugins' -import { resetTestDatabase } from '../../tests/helpers/sql' -import { delayUntilEventIngested } from '../../tests/shared/process-event' - -jest.setTimeout(600000) // 10min timeout - -const extraServerConfig: Partial = { - KAFKA_ENABLED: true, - KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092', - WORKER_CONCURRENCY: 4, - TASK_TIMEOUT: 5, - PLUGIN_SERVER_INGESTION: true, - KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, - KAFKA_BATCH_PARALELL_PROCESSING: true, - LOG_LEVEL: LogLevel.Log, -} - -describe('e2e kafka & clickhouse timeout benchmark', () => { - async function measurePerformance(code: string): Promise<[PluginsServer, () => Promise]> { - await resetTestDatabase(code) - await resetKafka(extraServerConfig) - await resetTestDatabaseClickhouse(extraServerConfig) - - const { server, stop: stopServer, queue } = await startPluginsServer(extraServerConfig, makePiscina) - - const posthog = createPosthog(server, pluginConfig39) - - console.debug = () => null - - const count = 3000 - - // fill in the queue - function createEvent() { - const uuid = new UUIDT().toString() - posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) - } - await queue.pause() - for (let i = 0; i < count; i++) { - createEvent() - } - - // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited) - await delay(5000) - await queue.resume() - - console.log('Starting timer') - const startTime = performance.now() - await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count, true) - const timeMs = performance.now() - startTime - console.log('Finished!') - - const n = (n: number) => `${Math.round(n * 100) / 100}` - console.log( - `[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( - 1000 / (timeMs / count) - )} events/sec, ${n(timeMs / count)}ms per event)` - ) - - return [server, stopServer] - } - - test.skip('bad delay', async () => { - console.log('Starting "bad delay" test') - // Delay up to 15sec in processEvent, while TASK_TIMEOUT=5 - // Effectively two thirds of the events should time out - const [server, stopServer] = await measurePerformance(` - async function processEvent (event) { - await new Promise(resolve => __jestSetTimeout(() => resolve(), 15000 * Math.random())) - event.properties.timeout = 'no timeout' - return event - } - `) - const events = (await server.db.fetchEvents()) as ClickHouseEvent[] - const passedEvents = events.filter((e) => e.properties.timeout).length - console.log( - `Out of 3000 events: ${passedEvents} took under 5sec, ${ - 3000 - passedEvents - } timed out. This should be a 1:2 ratio.` - ) - - await stopServer() - console.log('Stopping "bad delay" test') - }) -}) diff --git a/benchmarks/clickhouse/e2e.timeout.benchmark.ts b/benchmarks/clickhouse/e2e.timeout.benchmark.ts index 97e5d770..1a574712 100644 --- a/benchmarks/clickhouse/e2e.timeout.benchmark.ts +++ b/benchmarks/clickhouse/e2e.timeout.benchmark.ts @@ -2,7 +2,7 @@ import { performance } from 'perf_hooks' import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics' import { startPluginsServer } from '../../src/server' -import { LogLevel, PluginsServerConfig, Queue } from '../../src/types' +import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types' import { PluginsServer } from '../../src/types' import { delay, UUIDT } from '../../src/utils' import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog' @@ -76,18 +76,22 @@ describe('e2e kafka processing timeout benchmark', () => { console.log('Starting timer') const startTime = performance.now() - await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count, true) + await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count) const timeMs = performance.now() - startTime console.log('Finished!') const n = (n: number) => `${Math.round(n * 100) / 100}` console.log( - `[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( + `ℹ️️ [Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n( 1000 / (timeMs / count) )} events/sec, ${n(timeMs / count)}ms per event)` ) - - const events = await server.db.fetchEvents() - expect(events[count - 1].properties.upperUuid).toEqual(events[count - 1].properties.uuid.toUpperCase()) + const events = (await server.db.fetchEvents()) as ClickHouseEvent[] + const passedEvents = events.filter((e) => e.properties.timeout).length + console.log( + `ℹ️ Out of 3000 events: ${passedEvents} took under 5sec, ${ + 3000 - passedEvents + } timed out. This should be a 1:2 ratio.` + ) }) }) From 772716d106b2df96de6d98459a9ba2304570bfe0 Mon Sep 17 00:00:00 2001 From: Michael Matloka Date: Thu, 18 Feb 2021 01:20:50 +0100 Subject: [PATCH 47/51] Clan up vm.ts --- src/vm/vm.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/vm/vm.ts b/src/vm/vm.ts index 4b507e27..fe5bbc57 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -18,31 +18,28 @@ export async function createPluginConfigVM( indexJs: string, libJs = '' ): Promise { - const source = libJs ? `${libJs};${indexJs}` : indexJs - const { code } = transform(source, { + const rawCode = libJs ? `${libJs};${indexJs}` : indexJs + + const { code } = transform(rawCode, { envName: 'production', - filename: undefined, - cwd: undefined, code: true, - ast: false, - sourceMaps: false, babelrc: false, configFile: false, presets: [['env', { targets: { node: process.versions.node } }]], plugins: [loopTimeout(server), promiseTimeout(server)], }) - // create virtual machine + // Create virtual machine const vm = new VM({ timeout: server.TASK_TIMEOUT * 1000 + 1, sandbox: {}, }) - // our own stuff + // Add PostHog utilities to virtual machine vm.freeze(createConsole(), 'console') vm.freeze(createPosthog(server, pluginConfig), 'posthog') - // exported node packages + // Add non-PostHog utilities to virtual machine vm.freeze(fetch, 'fetch') vm.freeze(createGoogle(), 'google') From a69e15b4c932cbe3fb27cefdd2a6a86f17f4eff4 Mon Sep 17 00:00:00 2001 From: Michael Matloka Date: Thu, 18 Feb 2021 02:20:01 +0100 Subject: [PATCH 48/51] Improve clarity of loopTimeout --- src/vm/transforms/loop-timeout.ts | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/vm/transforms/loop-timeout.ts b/src/vm/transforms/loop-timeout.ts index 8ae6e011..345ffdf9 100644 --- a/src/vm/transforms/loop-timeout.ts +++ b/src/vm/transforms/loop-timeout.ts @@ -2,6 +2,7 @@ // https://medium.com/@bvjebin/js-infinite-loops-killing-em-e1c2f5f2db7f // https://github.com/jsbin/loop-protect/blob/master/lib/index.js +import { PluginObj } from '@babel/core' import * as types from '@babel/types' import { PluginsServer } from '../../types' @@ -73,12 +74,16 @@ const protect = (t: typeof types, timeout: number) => (path: any): void => { body.unshiftContainer('body', inside) } -export const loopTimeout = (server: PluginsServer) => ({ types: t }: { types: typeof types }) => { - return { - visitor: { - WhileStatement: protect(t, server.TASK_TIMEOUT), - ForStatement: protect(t, server.TASK_TIMEOUT), - DoWhileStatement: protect(t, server.TASK_TIMEOUT), - }, +export type BabelPlugin = ({ types: t }: { types: typeof types }) => PluginObj + +export function loopTimeout(server: PluginsServer): BabelPlugin { + return ({ types: t }) => { + return { + visitor: { + WhileStatement: protect(t, server.TASK_TIMEOUT), + ForStatement: protect(t, server.TASK_TIMEOUT), + DoWhileStatement: protect(t, server.TASK_TIMEOUT), + }, + } } } From 26cef384c36deef8551a8f28ebce87522c70d3e0 Mon Sep 17 00:00:00 2001 From: Michael Matloka Date: Thu, 18 Feb 2021 04:12:59 +0100 Subject: [PATCH 49/51] Refactor transforms slightly --- src/vm/transforms/common.ts | 6 +++ src/vm/transforms/loop-timeout.ts | 31 +++++-------- src/vm/transforms/promise-timeout.ts | 69 +++++++++++++--------------- 3 files changed, 49 insertions(+), 57 deletions(-) create mode 100644 src/vm/transforms/common.ts diff --git a/src/vm/transforms/common.ts b/src/vm/transforms/common.ts new file mode 100644 index 00000000..6c60ed2a --- /dev/null +++ b/src/vm/transforms/common.ts @@ -0,0 +1,6 @@ +import { PluginObj } from '@babel/core' +import * as types from '@babel/types' + +import { PluginsServer } from '../../types' + +export type PluginGen = (server: PluginsServer) => (param: { types: typeof types }) => PluginObj diff --git a/src/vm/transforms/loop-timeout.ts b/src/vm/transforms/loop-timeout.ts index 345ffdf9..dd720062 100644 --- a/src/vm/transforms/loop-timeout.ts +++ b/src/vm/transforms/loop-timeout.ts @@ -2,13 +2,12 @@ // https://medium.com/@bvjebin/js-infinite-loops-killing-em-e1c2f5f2db7f // https://github.com/jsbin/loop-protect/blob/master/lib/index.js -import { PluginObj } from '@babel/core' import * as types from '@babel/types' -import { PluginsServer } from '../../types' +import { PluginGen } from './common' const generateBefore = (t: typeof types, id: any) => - t.variableDeclaration('var', [ + t.variableDeclaration('const', [ t.variableDeclarator(id, t.callExpression(t.memberExpression(t.identifier('Date'), t.identifier('now')), [])), ]) @@ -49,10 +48,8 @@ const generateInside = ({ const protect = (t: typeof types, timeout: number) => (path: any): void => { if (!path.node.loc) { - // I don't really know _how_ we get into this state - // but https://jsbin.com/mipesawapi/1/ triggers it - // and the node, I'm guessing after translation, - // doesn't have a line in the code, so this blows up. + // I don't really know _how_ we get into this state, but https://jsbin.com/mipesawapi/1/ triggers it, + // and the node, I'm guessing after translation, doesn't have a line in the code, so this blows up. return } const id = path.scope.generateUidIdentifier('LP') @@ -74,16 +71,10 @@ const protect = (t: typeof types, timeout: number) => (path: any): void => { body.unshiftContainer('body', inside) } -export type BabelPlugin = ({ types: t }: { types: typeof types }) => PluginObj - -export function loopTimeout(server: PluginsServer): BabelPlugin { - return ({ types: t }) => { - return { - visitor: { - WhileStatement: protect(t, server.TASK_TIMEOUT), - ForStatement: protect(t, server.TASK_TIMEOUT), - DoWhileStatement: protect(t, server.TASK_TIMEOUT), - }, - } - } -} +export const loopTimeout: PluginGen = (server) => ({ types: t }) => ({ + visitor: { + WhileStatement: protect(t, server.TASK_TIMEOUT), + ForStatement: protect(t, server.TASK_TIMEOUT), + DoWhileStatement: protect(t, server.TASK_TIMEOUT), + }, +}) diff --git a/src/vm/transforms/promise-timeout.ts b/src/vm/transforms/promise-timeout.ts index 4ba9dda4..3eac2005 100644 --- a/src/vm/transforms/promise-timeout.ts +++ b/src/vm/transforms/promise-timeout.ts @@ -1,46 +1,41 @@ // inspired by: https://github.com/treywood/babel-plugin-bluebird-async-functions/ -import * as types from '@babel/types' -import { PluginsServer } from '../../types' +import { PluginGen } from './common' const REPLACED = Symbol() -export const promiseTimeout = (server: PluginsServer) => ({ types: t }: { types: typeof types }) => { - return { - visitor: { - // changes: bla.then --> __asyncGuard(bla).then - MemberExpression: { - exit(path: any) { - const { node } = path - if ( - node.property && - t.isIdentifier(node.property) && - node.property.name === 'then' && - !node[REPLACED] - ) { - const newCall = t.memberExpression( - t.callExpression(t.identifier('__asyncGuard'), [node.object]), - t.identifier('then') - ) - ;(newCall as any)[REPLACED] = true - path.replaceWith(newCall) - } - }, +export const promiseTimeout: PluginGen = () => ({ types: t }) => ({ + visitor: { + // Turn `bla.then` into `asyncGuard(bla).then` + MemberExpression: { + exit(path: any) { + const { node } = path + if ( + node?.property && + t.isIdentifier(node.property) && + node.property.name === 'then' && + !node[REPLACED] + ) { + const newCall = t.memberExpression( + t.callExpression(t.identifier('__asyncGuard'), [node.object]), + t.identifier('then') + ) + ;(newCall as any)[REPLACED] = true + path.replaceWith(newCall) + } }, + }, - // changes: await bla --> await __asyncGuard(bla) - AwaitExpression: { - exit(path: any) { - const { node } = path - if (node && !node[REPLACED]) { - const newAwait = t.awaitExpression( - t.callExpression(t.identifier('__asyncGuard'), [node.argument]) - ) - ;(newAwait as any)[REPLACED] = true - path.replaceWith(newAwait) - } - }, + // Turn `await bla` into `await __asyncGuard(bla)` + AwaitExpression: { + exit(path: any) { + const { node } = path + if (node && !node[REPLACED]) { + const newAwait = t.awaitExpression(t.callExpression(t.identifier('__asyncGuard'), [node.argument])) + ;(newAwait as any)[REPLACED] = true + path.replaceWith(newAwait) + } }, }, - } -} + }, +}) From 953bd629e1f41ed36ab972eb28fbbeb0cd9161f5 Mon Sep 17 00:00:00 2001 From: Michael Matloka Date: Thu, 18 Feb 2021 04:18:35 +0100 Subject: [PATCH 50/51] Refactor transform call to secureCode func --- src/vm/transforms/index.ts | 20 ++++++++++++++++++++ src/vm/vm.ts | 16 +++------------- 2 files changed, 23 insertions(+), 13 deletions(-) create mode 100644 src/vm/transforms/index.ts diff --git a/src/vm/transforms/index.ts b/src/vm/transforms/index.ts new file mode 100644 index 00000000..25057d09 --- /dev/null +++ b/src/vm/transforms/index.ts @@ -0,0 +1,20 @@ +import { transform } from '@babel/standalone' + +import { PluginsServer } from '../../types' +import { loopTimeout } from './transforms/loop-timeout' +import { promiseTimeout } from './transforms/promise-timeout' + +export function secureCode(rawCode: string, server: PluginsServer): string { + const { code } = transform(rawCode, { + envName: 'production', + code: true, + babelrc: false, + configFile: false, + presets: [['env', { targets: { node: process.versions.node } }]], + plugins: [loopTimeout(server), promiseTimeout(server)], + }) + if (!code) { + throw new Error(`Babel transform gone wrong! Could not secure the following code:\n${rawCode}`) + } + return code +} diff --git a/src/vm/vm.ts b/src/vm/vm.ts index fe5bbc57..de4edd2d 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -1,4 +1,3 @@ -import { transform } from '@babel/standalone' import { randomBytes } from 'crypto' import fetch from 'node-fetch' import { VM } from 'vm2' @@ -9,8 +8,7 @@ import { createConsole } from './extensions/console' import { createGoogle } from './extensions/google' import { createPosthog } from './extensions/posthog' import { createStorage } from './extensions/storage' -import { loopTimeout } from './transforms/loop-timeout' -import { promiseTimeout } from './transforms/promise-timeout' +import { secureCode } from './transforms' export async function createPluginConfigVM( server: PluginsServer, @@ -19,15 +17,7 @@ export async function createPluginConfigVM( libJs = '' ): Promise { const rawCode = libJs ? `${libJs};${indexJs}` : indexJs - - const { code } = transform(rawCode, { - envName: 'production', - code: true, - babelrc: false, - configFile: false, - presets: [['env', { targets: { node: process.versions.node } }]], - plugins: [loopTimeout(server), promiseTimeout(server)], - }) + const securedCode = secureCode(rawCode, server) // Create virtual machine const vm = new VM({ @@ -87,7 +77,7 @@ export async function createPluginConfigVM( let exports = {}; // the plugin JS code - ${code}; + ${securedCode}; `) const responseVar = `__pluginDetails${randomBytes(64).toString('hex')}` From 190930bd21eb94c3d8c0331ebfbc2d29bb062639 Mon Sep 17 00:00:00 2001 From: Michael Matloka Date: Thu, 18 Feb 2021 05:22:17 +0100 Subject: [PATCH 51/51] Add secureCode tests --- src/utils.ts | 8 +++ src/vm/transforms/index.ts | 4 +- tests/transforms.test.ts | 117 +++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 tests/transforms.test.ts diff --git a/src/utils.ts b/src/utils.ts index b8f081bb..73f7ce30 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -343,3 +343,11 @@ export function groupIntoBatches(array: T[], batchSize: number): T[][] { } return batches } + +/** Template literal function, to standardize JS code used internally to form without extraneous indentation. */ +export function code(strings: TemplateStringsArray): string { + const stringsConcat = strings.join('…') + const indentation = stringsConcat.match(/^\n([ ]*)/)?.[1].length ?? 0 + const dedentedCode = stringsConcat.replace(new RegExp(`^[ ]{${indentation}}`, 'gm'), '') + return dedentedCode.trim() +} diff --git a/src/vm/transforms/index.ts b/src/vm/transforms/index.ts index 25057d09..2a924218 100644 --- a/src/vm/transforms/index.ts +++ b/src/vm/transforms/index.ts @@ -1,8 +1,8 @@ import { transform } from '@babel/standalone' import { PluginsServer } from '../../types' -import { loopTimeout } from './transforms/loop-timeout' -import { promiseTimeout } from './transforms/promise-timeout' +import { loopTimeout } from './loop-timeout' +import { promiseTimeout } from './promise-timeout' export function secureCode(rawCode: string, server: PluginsServer): string { const { code } = transform(rawCode, { diff --git a/tests/transforms.test.ts b/tests/transforms.test.ts new file mode 100644 index 00000000..16b23c0c --- /dev/null +++ b/tests/transforms.test.ts @@ -0,0 +1,117 @@ +import { createServer } from '../src/server' +import { PluginsServer } from '../src/types' +import { code } from '../src/utils' +import { secureCode } from '../src/vm/transforms' +import { resetTestDatabase } from './helpers/sql' + +let server: PluginsServer +let closeServer: () => Promise +beforeEach(async () => { + ;[server, closeServer] = await createServer() + await resetTestDatabase(`const processEvent = event => event`) +}) +afterEach(() => { + closeServer() +}) + +describe('secureCode', () => { + it('secures awaits by wrapping promises in __asyncGuard', () => { + const rawCode = code` + async function x() { + await console.log() + } + ` + + const securedCode = secureCode(rawCode, server) + + expect(securedCode).toStrictEqual(code` + "use strict"; + + async function x() { + await __asyncGuard(console.log()); + } + `) + }) + + it('secures then calls by wrapping promises in __asyncGuard', () => { + const rawCode = code` + async function x() {} + x.then(() => null) + ` + + const securedCode = secureCode(rawCode, server) + + expect(securedCode).toStrictEqual(code` + "use strict"; + + async function x() {} + + __asyncGuard(x).then(() => null); + `) + }) + + it('secures block for loops with timeouts', () => { + const rawCode = code` + for (let i = 0; i < i + 1; i++) { + console.log(i) + } + ` + + const securedCode = secureCode(rawCode, server) + + expect(securedCode).toStrictEqual(code` + "use strict"; + + const _LP = Date.now(); + + for (let i = 0; i < i + 1; i++) { + if (Date.now() - _LP > 30000) throw new Error("Script execution timed out after looping for 30 seconds on line 1:0"); + console.log(i); + } + `) + }) + + it('secures inline for loops with timeouts', () => { + const rawCode = code` + for (let i = 0; i < i + 1; i++) console.log(i) + ` + + const securedCode = secureCode(rawCode, server) + + expect(securedCode).toStrictEqual(code` + "use strict"; + + const _LP = Date.now(); + + for (let i = 0; i < i + 1; i++) { + if (Date.now() - _LP > 30000) throw new Error("Script execution timed out after looping for 30 seconds on line 1:0"); + console.log(i); + } + `) + }) + + it('secures block for loops with timeouts avoiding _LP collision', () => { + const rawCode = code` + const _LP = 0 + + for (let i = 0; i < i + 1; i++) { + console.log(i) + } + ` + + const securedCode = secureCode(rawCode, server) + + expect(securedCode).toStrictEqual(code` + "use strict"; + + const _LP = 0; + + const _LP2 = Date.now(); + + for (let i = 0; i < i + 1; i++) { + if (Date.now() - _LP2 > 30000) throw new Error("Script execution timed out after looping for 30 seconds on line 3:0"); + console.log(i); + } + `) + }) +})