From 8484c5327182ccc46e807fa77cb6f8f9c2e0a810 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Mon, 1 Mar 2021 23:09:34 +0100 Subject: [PATCH] [wip] reload test --- src/plugins.ts | 7 +++++ src/utils.ts | 6 ++-- src/worker/worker.ts | 3 +- tests/postgres/worker.test.ts | 55 +++++++++++++++++++++++++++++++++-- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/src/plugins.ts b/src/plugins.ts index cd24b74db..1b859f467 100644 --- a/src/plugins.ts +++ b/src/plugins.ts @@ -90,6 +90,13 @@ export async function setupPlugins(server: PluginsServer): Promise { } } +export async function checkReloadPlugins(server: PluginsServer): Promise { + for (const [, pluginConfig] of server.pluginConfigs) { + pluginConfig.vm = null + await loadPlugin(server, pluginConfig) + } +} + async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Promise { const { plugin } = pluginConfig diff --git a/src/utils.ts b/src/utils.ts index 2a3a04b36..f234c91e1 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -8,7 +8,7 @@ import * as tar from 'tar-stream' import * as zlib from 'zlib' import { status } from './status' -import { LogLevel, PluginsServer, PluginsServerConfig, TimestampFormat } from './types' +import { LogLevel, PluginsServerConfig, TimestampFormat } from './types' /** Time until autoexit (due to error) gives up on graceful exit and kills the process right away. */ const GRACEFUL_EXIT_PERIOD_SECONDS = 5 @@ -388,12 +388,12 @@ export async function createRedis(serverConfig: PluginsServerConfig): Promise Promise | void ): Promise { // use one extra connection for redis pubsub - const pubSub = await createRedis(server) + const pubSub = await createRedis(serverConfig) await pubSub.subscribe(channel) pubSub.on('message', (c: string, message) => { if (c === channel) { diff --git a/src/worker/worker.ts b/src/worker/worker.ts index 04122d85f..1e76ab3ae 100644 --- a/src/worker/worker.ts +++ b/src/worker/worker.ts @@ -1,6 +1,6 @@ import { ingestEvent } from '../ingestion/ingest-event' import { initApp } from '../init' -import { runPlugins, runPluginsOnBatch, runPluginTask, setupPlugins } from '../plugins' +import { checkReloadPlugins, runPlugins, runPluginsOnBatch, runPluginTask, setupPlugins } from '../plugins' import { createServer } from '../server' import { status } from '../status' import { PluginsServerConfig } from '../types' @@ -18,6 +18,7 @@ export async function createWorker(config: PluginsServerConfig, threadId: number const pubSub = await createPubSub(server, server.PLUGINS_RELOAD_PUBSUB_CHANNEL, () => { status.info('⚡', 'Reloading plugins inside a worker!') + void checkReloadPlugins(server) }) function closeWorker() { diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index 229e5eaf1..abe4dad22 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -1,14 +1,15 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' +import { Pool } from 'pg' import Client from '../../src/celery/client' +import { getDefaultConfig } from '../../src/config' import { startPluginsServer } from '../../src/server' import { LogLevel } from '../../src/types' -import { delay, UUIDT } from '../../src/utils' +import { createRedis, delay, UUIDT } from '../../src/utils' import { makePiscina } from '../../src/worker/piscina' import { resetTestDatabase } from '../helpers/sql' import { setupPiscina } from '../helpers/worker' -jest.mock('../../src/sql') jest.setTimeout(600000) // 600 sec timeout function createEvent(index = 0): PluginEvent { @@ -23,6 +24,8 @@ function createEvent(index = 0): PluginEvent { } } +const config = getDefaultConfig() + test('piscina worker test', async () => { const workerThreads = 2 const testCode = ` @@ -211,3 +214,51 @@ test('pause the queue if too many tasks', async () => { await pluginsServer.server.redisPool.release(redis) await pluginsServer.stop() }) + +test('test reloads', async () => { + const workerThreads = 2 + const testCode = ` + async function processEvent (event, meta) { + await new Promise(resolve => __jestSetTimeout(resolve, 500)) + event.properties["somewhere"] = "over the rainbow"; + return event + } + async function runEveryDay (meta) { + return 4 + } + ` + const testCode2 = ` + async function processEvent (event, meta) { + await new Promise(resolve => __jestSetTimeout(resolve, 500)) + event.properties["somewhere"] = "under the double rainbow"; + return event + } + async function runEveryDay (meta) { + return 4 + } + ` + await resetTestDatabase(testCode) + const piscina = setupPiscina(workerThreads, 2) + + const processEvent = (event: PluginEvent) => piscina.runTask({ task: 'processEvent', args: { event } }) + const event = await processEvent(createEvent()) + expect(event.properties['somewhere']).toBe('over the rainbow') + + // change the code + const db = new Pool({ connectionString: config.DATABASE_URL }) + await db.query('UPDATE posthog_plugin SET plugin_type=$1, archive=NULL, source=$2', ['source', testCode2]) + await db.end() + + // publish the reload event + const redis = await createRedis(config) + await redis.publish(config.PLUGINS_RELOAD_PUBSUB_CHANNEL, 'hurray') + + // wait for it to kick in + await delay(5000) + + // try processing the event again + const event2 = await processEvent(createEvent()) + expect(event2.properties['somewhere']).toBe('under the double rainbow') + + await piscina.destroy() +})