Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Plugin storage #87

Merged
merged 21 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
env:
DATABASE_URL: 'postgres://postgres:postgres@localhost:${{ job.services.postgres.ports[5432] }}/postgres'
REDIS_URL: 'redis://localhost'
run: yarn task:db-init
run: yarn task:init-test-db

- name: Test with Jest
env:
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
env:
DATABASE_URL: 'postgres://postgres:postgres@localhost:${{ job.services.postgres.ports[5432] }}/postgres'
REDIS_URL: 'redis://localhost'
run: yarn task:db-init
run: yarn task:init-test-db

- name: Run benchmarks
env:
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/worker.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { makePiscina } from '../src/worker/piscina'
import { defaultConfig } from '../src/config'
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'
import { performance } from 'perf_hooks'
import { mockJestWithIndex } from '../tests/helpers/plugins'
import * as os from 'os'
import { LogLevel } from '../src/types'
import { resetTestDatabase } from '../tests/helpers/sql'

jest.mock('../src/sql')
jest.setTimeout(600000) // 600 sec timeout
Expand Down Expand Up @@ -61,13 +61,12 @@ async function processCountEvents(piscina: ReturnType<typeof makePiscina>, count
}
}

function setupPiscina(workers: number, code: string, tasksPerWorker: number) {
function setupPiscina(workers: number, tasksPerWorker: number) {
return makePiscina({
...defaultConfig,
WORKER_CONCURRENCY: workers,
TASKS_PER_WORKER: tasksPerWorker,
LOG_LEVEL: LogLevel.Log,
__jestMock: mockJestWithIndex(code),
})
}

Expand Down Expand Up @@ -119,6 +118,7 @@ test('piscina worker benchmark', async () => {

const results: Array<Record<string, string | number>> = []
for (const { testName, events: _events, testCode } of tests) {
await resetTestDatabase(testCode)
const events = isLightDevRun ? _events / 10 : _events
for (const batchSize of [1, 10, 100].filter((size) => size <= events)) {
const result: Record<string, any> = {
Expand All @@ -128,7 +128,7 @@ test('piscina worker benchmark', async () => {
batchSize,
}
for (const threads of workerThreads) {
const piscina = setupPiscina(threads, testCode, 100)
const piscina = setupPiscina(threads, 100)

// warmup
await processCountEvents(piscina, threads * 4)
Expand Down
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module.exports = {
testEnvironment: 'node',
clearMocks: true,
coverageProvider: 'v8',
setupFilesAfterEnv: ['./jest.setup.pg-mock.js', './jest.setup.kafka-mock.js', './jest.setup.fetch-mock.js'],
setupFilesAfterEnv: ['./jest.setup.kafka-mock.js', './jest.setup.fetch-mock.js'],
testMatch: ['<rootDir>/tests/**/*.test.ts', '<rootDir>/benchmarks/**/*.benchmark.ts'],
}
8 changes: 0 additions & 8 deletions jest.setup.pg-mock.js

This file was deleted.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"name": "posthog-plugin-server",
"version": "0.6.0",
"version": "0.6.1",
"description": "PostHog Plugin Server",
"types": "dist/src/index.d.ts",
"main": "dist/src/index.js",
"scripts": {
"test": "jest --testPathIgnorePatterns='benchmarks/'",
"test": "jest --runInBand tests/*.test.ts",
"benchmark": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/",
"start": "yarn start:dev",
"start:dist": "node dist/src/index.js --base-dir ../posthog",
Expand All @@ -18,7 +18,7 @@
"prettier": "prettier --write .",
"prettier:check": "prettier --check .",
"prepublishOnly": "yarn test && yarn build",
"task:db-init": "ts-node ./tasks/db-init.ts"
"task:init-test-db": "NODE_ENV=test ts-node ./tasks/init-test-db.ts"
},
"bin": {
"posthog-plugin-server": "bin/posthog-plugin-server"
Expand Down Expand Up @@ -52,7 +52,7 @@
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"@babel/preset-typescript": "^7.8.3",
"@posthog/plugin-scaffold": "0.2.5",
"@posthog/plugin-scaffold": "0.2.6",
"@types/adm-zip": "^0.4.33",
"@types/ioredis": "^4.17.7",
"@types/jest": "^26.0.15",
Expand Down
1 change: 0 additions & 1 deletion src/celery/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ export default class RedisBroker implements Pausable {
if (body) {
callback(body)
}
Promise.resolve()
return body
})
.then((body) => {
Expand Down
1 change: 0 additions & 1 deletion src/celery/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ export class Worker extends Base implements Queue {
*
* @todo implement here
*/
// eslint-disable-next-line class-methods-use-this
public async stop(): Promise<void> {
const taskCount = this.activeTasks.size
if (taskCount > 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ export const defaultConfig = overrideWithEnv(getDefaultConfig())
export const configHelp = getConfigHelp()

export function getDefaultConfig(): PluginsServerConfig {
const isTestEnv = process.env.NODE_ENV === 'test'

return {
CELERY_DEFAULT_QUEUE: 'celery',
DATABASE_URL: 'postgres://localhost:5432/posthog',
DATABASE_URL: isTestEnv ? 'postgres://localhost:5432/posthog_test' : 'postgres://localhost:5432/posthog',
KAFKA_ENABLED: false,
KAFKA_HOSTS: null,
KAFKA_CLIENT_CERT_B64: null,
Expand Down
35 changes: 35 additions & 0 deletions src/extensions/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { PluginConfig, PluginsServer } from '../types'
import { StorageExtension } from '@posthog/plugin-scaffold'

export function createStorage(server: PluginsServer, pluginConfig: PluginConfig): StorageExtension {
const get = async function (key: string, defaultValue: unknown): Promise<unknown> {
const result = await server.db.query(
'SELECT * FROM posthog_pluginstorage WHERE "plugin_config_id"=$1 AND "key"=$2 LIMIT 1',
[pluginConfig.id, key]
)
return result?.rows.length === 1 ? JSON.parse(result.rows[0].value) : defaultValue
}
const set = async function (key: string, value: unknown): Promise<void> {
if (typeof value === 'undefined') {
await server.db.query('DELETE FROM posthog_pluginstorage WHERE "plugin_config_id"=$1 AND "key"=$2', [
pluginConfig.id,
key,
])
} else {
await server.db.query(
`
INSERT INTO posthog_pluginstorage ("plugin_config_id", "key", "value")
VALUES ($1, $2, $3)
ON CONFLICT ("plugin_config_id", "key")
DO UPDATE SET value = $3
`,
[pluginConfig.id, key, JSON.stringify(value)]
)
}
}

return {
get,
set,
}
}
4 changes: 2 additions & 2 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { defaultConfig } from './config'
import Piscina from 'piscina'
import * as Sentry from '@sentry/node'
import { areWeTestingWithJest, delay } from './utils'
import { delay } from './utils'
import { StatsD } from 'hot-shots'
import { EventsProcessor } from './ingestion/process-event'
import { status } from './status'
Expand All @@ -34,7 +34,7 @@ export async function createServer(
console.error(error)
})
.on('ready', () => {
if (!areWeTestingWithJest()) {
if (process.env.NODE_ENV !== 'test') {
status.info('✅', 'Connected to Redis!')
}
})
Expand Down
13 changes: 0 additions & 13 deletions src/sql.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,20 @@
import { areWeTestingWithJest } from './utils'
import { Plugin, PluginAttachmentDB, PluginConfig, PluginConfigId, PluginError, PluginsServer } from './types'

// This nice "mocking" system with areWeTestingWithJest is used since we want to mock data in worker threads.
// Jest mocks don't penetrate that far. Improvements welcome.

export async function getPluginRows(server: PluginsServer): Promise<Plugin[]> {
if (areWeTestingWithJest() && server.__jestMock?.getPluginRows) {
return server.__jestMock?.getPluginRows
}
const { rows: pluginRows }: { rows: Plugin[] } = await server.db.query(
"SELECT * FROM posthog_plugin WHERE id in (SELECT plugin_id FROM posthog_pluginconfig WHERE enabled='t' GROUP BY plugin_id)"
)
return pluginRows
}

export async function getPluginAttachmentRows(server: PluginsServer): Promise<PluginAttachmentDB[]> {
if (areWeTestingWithJest() && server.__jestMock?.getPluginAttachmentRows) {
return server.__jestMock?.getPluginAttachmentRows
}
const { rows }: { rows: PluginAttachmentDB[] } = await server.db.query(
"SELECT * FROM posthog_pluginattachment WHERE plugin_config_id in (SELECT id FROM posthog_pluginconfig WHERE enabled='t')"
)
return rows
}

export async function getPluginConfigRows(server: PluginsServer): Promise<PluginConfig[]> {
if (areWeTestingWithJest() && server.__jestMock?.getPluginConfigRows) {
return server.__jestMock?.getPluginConfigRows
}
const { rows }: { rows: PluginConfig[] } = await server.db.query(
"SELECT * FROM posthog_pluginconfig WHERE enabled='t'"
)
Expand Down
8 changes: 2 additions & 6 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ export interface PluginsServerConfig extends Record<string, any> {
STATSD_PORT: number
STATSD_PREFIX: string
SCHEDULE_LOCK_TTL: number

__jestMock?: {
getPluginRows: Plugin[]
getPluginConfigRows: PluginConfig[]
getPluginAttachmentRows: PluginAttachmentDB[]
}
}

export interface PluginsServer extends PluginsServerConfig {
Expand Down Expand Up @@ -90,6 +84,8 @@ export interface Plugin {
archive: Buffer | null
source?: string
error?: PluginError
from_json?: boolean
from_web?: boolean
}

export interface PluginConfig {
Expand Down
4 changes: 0 additions & 4 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ export function cloneObject<T extends any | any[]>(obj: T): T {
return clone as T
}

export function areWeTestingWithJest(): boolean {
return Boolean(process.env.JEST_WORKER_ID)
}

/** LUT of byte value to hexadecimal representation. For UUID stringification. */
const byteToHex: string[] = []
for (let i = 0; i < 256; i++) {
Expand Down
5 changes: 3 additions & 2 deletions src/vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { createCache } from './extensions/cache'
import { createPosthog } from './extensions/posthog'
import { createGoogle } from './extensions/google'
import { PluginsServer, PluginConfig, PluginConfigVMReponse } from './types'
import { areWeTestingWithJest } from './utils'
import { createStorage } from './extensions/storage'

export async function createPluginConfigVM(
server: PluginsServer,
Expand All @@ -25,7 +25,7 @@ export async function createPluginConfigVM(
vm.freeze(fetch, 'fetch')
vm.freeze(createGoogle(), 'google')

if (areWeTestingWithJest()) {
if (process.env.NODE_ENV === 'test') {
vm.freeze(setTimeout, '__jestSetTimeout')
}
vm.freeze(
Expand All @@ -37,6 +37,7 @@ export async function createPluginConfigVM(
),
config: pluginConfig.config,
attachments: pluginConfig.attachments,
storage: createStorage(server, pluginConfig),
},
'__pluginHostMeta'
)
Expand Down
6 changes: 1 addition & 5 deletions src/worker/piscina.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
const Sentry = require('@sentry/node')
const { isMainThread, threadId } = require('worker_threads')

function areWeTestingWithJest() {
return Boolean(process.env.JEST_WORKER_ID)
}

if (isMainThread) {
const Piscina = require('piscina')
const { createConfig } = require('./config')
Expand All @@ -20,7 +16,7 @@ if (isMainThread) {
},
}
} else {
if (areWeTestingWithJest()) {
if (process.env.NODE_ENV === 'test') {
require('ts-node').register()
}

Expand Down
40 changes: 28 additions & 12 deletions tasks/db-init.ts → tasks/init-test-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ async function task() {
await server.db.query(createPlugin)
await server.db.query(createPluginConfig)
await server.db.query(createPluginAttachment)
await server.db.query(createPluginStorage)

await closeServer()
}

const ifNotExists = 'IF NOT EXISTS'

const createTeam = `
CREATE TABLE ${ifNotExists} posthog_team
CREATE TABLE posthog_team
(
id serial NOT NULL
CONSTRAINT posthog_team_pkey
PRIMARY KEY
PRIMARY KEY,
name varchar(200)
);
`

const createPlugin = `
CREATE TABLE ${ifNotExists} posthog_plugin
CREATE TABLE posthog_plugin
(
id serial NOT NULL
CONSTRAINT posthog_plugin_pkey
Expand All @@ -36,12 +36,14 @@ const createPlugin = `
archive bytea,
from_json boolean NOT NULL,
from_web boolean NOT NULL,
error jsonb
error jsonb,
plugin_type varchar(200),
source text
);
`

const createPluginAttachment = `
CREATE TABLE ${ifNotExists} posthog_pluginattachment
CREATE TABLE posthog_pluginattachment
(
id serial NOT NULL
CONSTRAINT posthog_pluginattachment_pkey
Expand All @@ -61,15 +63,15 @@ const createPluginAttachment = `
DEFERRABLE INITIALLY DEFERRED
);

CREATE INDEX ${ifNotExists} posthog_pluginattachment_plugin_config_id_cc94a1b9
CREATE INDEX posthog_pluginattachment_plugin_config_id_cc94a1b9
ON posthog_pluginattachment (plugin_config_id);

CREATE INDEX ${ifNotExists} posthog_pluginattachment_team_id_415eacc7
CREATE INDEX posthog_pluginattachment_team_id_415eacc7
ON posthog_pluginattachment (team_id);
`

const createPluginConfig = `
CREATE TABLE ${ifNotExists} posthog_pluginconfig
CREATE TABLE posthog_pluginconfig
(
id serial NOT NULL
CONSTRAINT posthog_pluginconfig_pkey
Expand All @@ -88,11 +90,25 @@ const createPluginConfig = `
error jsonb
);

CREATE INDEX ${ifNotExists} posthog_pluginconfig_team_id_71185766
CREATE INDEX posthog_pluginconfig_team_id_71185766
ON posthog_pluginconfig (team_id);

CREATE INDEX ${ifNotExists} posthog_pluginconfig_plugin_id_d014ca1c
CREATE INDEX posthog_pluginconfig_plugin_id_d014ca1c
ON posthog_pluginconfig (plugin_id);
`

const createPluginStorage = `
CREATE TABLE posthog_pluginstorage (
id serial NOT NULL CONSTRAINT posthog_pluginstorage_pkey PRIMARY KEY,
key varchar(200) NOT NULL,
value text,
plugin_config_id integer NOT NULL CONSTRAINT posthog_pluginstorag_plugin_config_id_6744363a_fk_posthog_p
REFERENCES posthog_pluginconfig DEFERRABLE INITIALLY DEFERRED
);

CREATE INDEX posthog_pluginstorage_plugin_config_id_6744363a ON posthog_pluginstorage (plugin_config_id);

CREATE UNIQUE INDEX posthog_unique_plugin_storage_key ON posthog_pluginstorage (plugin_config_id int4_ops, KEY text_ops);
`

task()
Loading