Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Commit

Permalink
[wip] reload test
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra committed Mar 1, 2021
1 parent 09db4ac commit 8484c53
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 6 deletions.
7 changes: 7 additions & 0 deletions src/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ export async function setupPlugins(server: PluginsServer): Promise<void> {
}
}

export async function checkReloadPlugins(server: PluginsServer): Promise<void> {
for (const [, pluginConfig] of server.pluginConfigs) {
pluginConfig.vm = null
await loadPlugin(server, pluginConfig)
}
}

async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Promise<boolean> {
const { plugin } = pluginConfig

Expand Down
6 changes: 3 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import * as tar from 'tar-stream'
import * as zlib from 'zlib'

import { status } from './status'
import { LogLevel, PluginsServer, PluginsServerConfig, TimestampFormat } from './types'
import { LogLevel, PluginsServerConfig, TimestampFormat } from './types'

/** Time until autoexit (due to error) gives up on graceful exit and kills the process right away. */
const GRACEFUL_EXIT_PERIOD_SECONDS = 5
Expand Down Expand Up @@ -388,12 +388,12 @@ export async function createRedis(serverConfig: PluginsServerConfig): Promise<Re
}

export async function createPubSub(
server: PluginsServer,
serverConfig: PluginsServerConfig,
channel: string,
callback: (message: any) => Promise<void> | void
): Promise<Redis.Redis> {
// use one extra connection for redis pubsub
const pubSub = await createRedis(server)
const pubSub = await createRedis(serverConfig)
await pubSub.subscribe(channel)
pubSub.on('message', (c: string, message) => {
if (c === channel) {
Expand Down
3 changes: 2 additions & 1 deletion src/worker/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ingestEvent } from '../ingestion/ingest-event'
import { initApp } from '../init'
import { runPlugins, runPluginsOnBatch, runPluginTask, setupPlugins } from '../plugins'
import { checkReloadPlugins, runPlugins, runPluginsOnBatch, runPluginTask, setupPlugins } from '../plugins'
import { createServer } from '../server'
import { status } from '../status'
import { PluginsServerConfig } from '../types'
Expand All @@ -18,6 +18,7 @@ export async function createWorker(config: PluginsServerConfig, threadId: number

const pubSub = await createPubSub(server, server.PLUGINS_RELOAD_PUBSUB_CHANNEL, () => {
status.info('⚡', 'Reloading plugins inside a worker!')
void checkReloadPlugins(server)
})

function closeWorker() {
Expand Down
55 changes: 53 additions & 2 deletions tests/postgres/worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'
import { Pool } from 'pg'

import Client from '../../src/celery/client'
import { getDefaultConfig } from '../../src/config'
import { startPluginsServer } from '../../src/server'
import { LogLevel } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { createRedis, delay, UUIDT } from '../../src/utils'
import { makePiscina } from '../../src/worker/piscina'
import { resetTestDatabase } from '../helpers/sql'
import { setupPiscina } from '../helpers/worker'

jest.mock('../../src/sql')
jest.setTimeout(600000) // 600 sec timeout

function createEvent(index = 0): PluginEvent {
Expand All @@ -23,6 +24,8 @@ function createEvent(index = 0): PluginEvent {
}
}

const config = getDefaultConfig()

test('piscina worker test', async () => {
const workerThreads = 2
const testCode = `
Expand Down Expand Up @@ -211,3 +214,51 @@ test('pause the queue if too many tasks', async () => {
await pluginsServer.server.redisPool.release(redis)
await pluginsServer.stop()
})

test('test reloads', async () => {
const workerThreads = 2
const testCode = `
async function processEvent (event, meta) {
await new Promise(resolve => __jestSetTimeout(resolve, 500))
event.properties["somewhere"] = "over the rainbow";
return event
}
async function runEveryDay (meta) {
return 4
}
`
const testCode2 = `
async function processEvent (event, meta) {
await new Promise(resolve => __jestSetTimeout(resolve, 500))
event.properties["somewhere"] = "under the double rainbow";
return event
}
async function runEveryDay (meta) {
return 4
}
`
await resetTestDatabase(testCode)
const piscina = setupPiscina(workerThreads, 2)

const processEvent = (event: PluginEvent) => piscina.runTask({ task: 'processEvent', args: { event } })
const event = await processEvent(createEvent())
expect(event.properties['somewhere']).toBe('over the rainbow')

// change the code
const db = new Pool({ connectionString: config.DATABASE_URL })
await db.query('UPDATE posthog_plugin SET plugin_type=$1, archive=NULL, source=$2', ['source', testCode2])
await db.end()

// publish the reload event
const redis = await createRedis(config)
await redis.publish(config.PLUGINS_RELOAD_PUBSUB_CHANNEL, 'hurray')

// wait for it to kick in
await delay(5000)

// try processing the event again
const event2 = await processEvent(createEvent())
expect(event2.properties['somewhere']).toBe('under the double rainbow')

await piscina.destroy()
})

0 comments on commit 8484c53

Please sign in to comment.