Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Add exportEvents function #408

Merged
merged 28 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51f9921
export events via vm upgrade
mariusandra May 22, 2021
1c5a6bc
cleaner exportEvents upgrade, add RetryError
mariusandra May 22, 2021
a10a0a0
add basic tests
mariusandra May 22, 2021
40b120d
test more things, fix buffer length issue
mariusandra May 22, 2021
cc3ed77
fix type
mariusandra May 22, 2021
fa4e15f
add missing vm method
mariusandra May 22, 2021
cc05f10
add plugin scaffold to imports
mariusandra May 23, 2021
b2919e4
Use JSDoc style for tooltips and fix onEvent typing
Twixes May 25, 2021
a1e3b92
Merge branch 'master' into export-events
Twixes May 25, 2021
593046a
Merge branch 'master' into export-events
mariusandra May 25, 2021
a5b24ee
stringClamp
mariusandra May 25, 2021
e3aa428
remove dead code
mariusandra May 25, 2021
1ce0b53
add consts
mariusandra May 25, 2021
5861232
log locally
mariusandra May 25, 2021
7087f35
better log
mariusandra May 25, 2021
9ae5692
it's a hub now
mariusandra May 25, 2021
f4a128c
less events in benchmark to hopefully deflake a test
mariusandra May 25, 2021
bd23922
Merge branch 'master' into export-events
mariusandra May 25, 2021
5986cc6
fix type bug
mariusandra May 25, 2021
dc24f51
Merge branch 'master' into export-events
mariusandra May 25, 2021
f7da25e
fix awkward bug
mariusandra May 25, 2021
2b2d862
add statsd for export event jobs
mariusandra May 25, 2021
2af9ee2
Merge branch 'master' into export-events
mariusandra May 25, 2021
ac43c74
typefix and rename
mariusandra May 25, 2021
3c4589e
fix ! in test
mariusandra May 25, 2021
c00ab8f
flush on teardown
mariusandra May 25, 2021
839464e
config as a string, as it should
mariusandra May 26, 2021
5ee6d36
Merge branch 'master' into export-events
mariusandra May 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/vm/memory.benchmark.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types'
import { Plugin, PluginConfig, PluginConfigVMResponse } from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
import { createPluginConfigVM } from '../../src/worker/vm/vm'
import { commonOrganizationId } from '../../tests/helpers/plugins'
Expand Down Expand Up @@ -77,7 +77,7 @@ test('test vm memory usage', async () => {
const usedAtStart = getUsed()

let used = usedAtStart
const vms: PluginConfigVMReponse[] = []
const vms: PluginConfigVMResponse[] = []

for (let i = 0; i < numVMs; i++) {
const vm = await createPluginConfigVM(hub, mockConfig, indexJs)
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"@maxmind/geoip2-node": "^2.3.1",
"@posthog/clickhouse": "^1.7.0",
"@posthog/piscina": "^2.2.0-posthog",
"@posthog/plugin-contrib": "^0.0.3",
"@posthog/plugin-contrib": "^0.0.5",
"@sentry/node": "^5.29.0",
"@sentry/tracing": "^5.29.0",
"@types/lru-cache": "^5.1.0",
Expand Down Expand Up @@ -83,7 +83,7 @@
},
"devDependencies": {
"@babel/cli": "^7.13.0",
"@posthog/plugin-scaffold": "0.5.0",
"@posthog/plugin-scaffold": "0.10.0",
"@types/adm-zip": "^0.4.33",
"@types/babel__standalone": "^7.1.3",
"@types/generic-pool": "^3.1.9",
Expand Down
31 changes: 20 additions & 11 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import ClickHouse from '@posthog/clickhouse'
import { PluginAttachment, PluginConfigSchema, PluginEvent, Properties } from '@posthog/plugin-scaffold'
import { Meta, PluginAttachment, PluginConfigSchema, PluginEvent, Properties } from '@posthog/plugin-scaffold'
import { Pool as GenericPool } from 'generic-pool'
import { StatsD } from 'hot-shots'
import { Redis } from 'ioredis'
Expand Down Expand Up @@ -293,18 +293,27 @@ export type WorkerMethods = {
ingestEvent: (event: PluginEvent) => Promise<IngestEventResponse>
}

export interface PluginConfigVMReponse {
export type VMMethods = {
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
onEvent?: (event: PluginEvent) => Promise<void>
onSnapshot?: (event: PluginEvent) => Promise<void>
exportEvents?: (events: PluginEvent[]) => Promise<void>
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
// DEPRECATED
processEventBatch?: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}

export interface PluginConfigVMResponse {
vm: VM
methods: {
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
onEvent?: (event: PluginEvent) => Promise<void>
onSnapshot?: (event: PluginEvent) => Promise<void>
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
// DEPRECATED
processEventBatch?: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}
methods: VMMethods
tasks: Record<PluginTaskType, Record<string, PluginTask>>
}

export interface PluginConfigVMInternalResponse<M extends Meta = Meta> {
methods: VMMethods
tasks: Record<PluginTaskType, Record<string, PluginTask>>
meta: M
}

export interface EventUsage {
Expand Down
9 changes: 9 additions & 0 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,12 @@ export function filterIncrementProperties(incrementProperties: unknown): Record<

return filteredIncrementProperties
}

export function clamp(value: number, min: number, max: number): number {
return value > max ? max : value < min ? min : value
}

export function stringClamp(value: string, def: number, min: number, max: number): number {
const nanToNull = (nr: number): null | number => (isNaN(nr) ? null : nr)
return clamp(nanToNull(parseInt(value)) ?? def, min, max)
}
2 changes: 1 addition & 1 deletion src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function runProcessEvent(server: Hub, event: PluginEvent): Promise<

try {
returnedEvent = (await processEvent(returnedEvent)) || null
if (returnedEvent.team_id !== teamId) {
if (returnedEvent && returnedEvent.team_id !== teamId) {
returnedEvent.team_id = teamId
throw new IllegalOperationError('Plugin tried to change event.team_id')
}
Expand Down
2 changes: 2 additions & 0 deletions src/worker/vm/imports.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BigQuery } from '@google-cloud/bigquery'
import * as contrib from '@posthog/plugin-contrib'
import * as scaffold from '@posthog/plugin-scaffold'
import * as AWS from 'aws-sdk'
import crypto from 'crypto'
import * as genericPool from 'generic-pool'
Expand All @@ -17,6 +18,7 @@ export const imports = {
'node-fetch': fetch,
'snowflake-sdk': snowflake,
'@google-cloud/bigquery': { BigQuery },
'@posthog/plugin-scaffold': scaffold,
'@posthog/plugin-contrib': contrib,
'aws-sdk': AWS,
pg: pg,
Expand Down
20 changes: 12 additions & 8 deletions src/worker/vm/lazy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
Hub,
PluginCapabilities,
PluginConfig,
PluginConfigVMReponse,
PluginConfigVMResponse,
PluginLogEntrySource,
PluginLogEntryType,
PluginTask,
Expand All @@ -18,7 +18,7 @@ import { createPluginConfigVM } from './vm'
export class LazyPluginVM {
initialize?: (hub: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo: string) => Promise<void>
failInitialization?: () => void
resolveInternalVm: Promise<PluginConfigVMReponse | null>
resolveInternalVm: Promise<PluginConfigVMResponse | null>

constructor() {
this.resolveInternalVm = new Promise((resolve) => {
Expand Down Expand Up @@ -56,23 +56,27 @@ export class LazyPluginVM {
})
}

async getOnEvent(): Promise<PluginConfigVMReponse['methods']['onEvent'] | null> {
async getExportEvents(): Promise<PluginConfigVMResponse['methods']['exportEvents'] | null> {
return (await this.resolveInternalVm)?.methods.exportEvents || null
}

async getOnEvent(): Promise<PluginConfigVMResponse['methods']['onEvent'] | null> {
return (await this.resolveInternalVm)?.methods.onEvent || null
}

async getOnSnapshot(): Promise<PluginConfigVMReponse['methods']['onSnapshot'] | null> {
async getOnSnapshot(): Promise<PluginConfigVMResponse['methods']['onSnapshot'] | null> {
return (await this.resolveInternalVm)?.methods.onSnapshot || null
}

async getProcessEvent(): Promise<PluginConfigVMReponse['methods']['processEvent'] | null> {
async getProcessEvent(): Promise<PluginConfigVMResponse['methods']['processEvent'] | null> {
return (await this.resolveInternalVm)?.methods.processEvent || null
}

async getProcessEventBatch(): Promise<PluginConfigVMReponse['methods']['processEventBatch'] | null> {
async getProcessEventBatch(): Promise<PluginConfigVMResponse['methods']['processEventBatch'] | null> {
return (await this.resolveInternalVm)?.methods.processEventBatch || null
}

async getTeardownPlugin(): Promise<PluginConfigVMReponse['methods']['teardownPlugin'] | null> {
async getTeardownPlugin(): Promise<PluginConfigVMResponse['methods']['teardownPlugin'] | null> {
return (await this.resolveInternalVm)?.methods.teardownPlugin || null
}

Expand All @@ -87,7 +91,7 @@ export class LazyPluginVM {
private async inferPluginCapabilities(
hub: Hub,
pluginConfig: PluginConfig,
vm: PluginConfigVMReponse
vm: PluginConfigVMResponse
): Promise<void> {
if (!pluginConfig.plugin) {
throw new Error(`'PluginConfig missing plugin: ${pluginConfig}`)
Expand Down
152 changes: 152 additions & 0 deletions src/worker/vm/upgrades/export-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { createBuffer } from '@posthog/plugin-contrib'
import { Plugin, PluginEvent, PluginMeta, RetryError } from '@posthog/plugin-scaffold'

import { Hub, PluginConfig, PluginConfigVMInternalResponse, PluginTaskType } from '../../../types'
import { status } from '../../../utils/status'
import { stringClamp } from '../../../utils/utils'

const MAXIMUM_RETRIES = 15
const EXPORT_BUFFER_BYTES_MINIMUM = 1
const EXPORT_BUFFER_BYTES_DEFAULT = 1024 * 1024
const EXPORT_BUFFER_BYTES_MAXIMUM = 100 * 1024 * 1024
const EXPORT_BUFFER_SECONDS_MINIMUM = 1
const EXPORT_BUFFER_SECONDS_DEFAULT = 10
const EXPORT_BUFFER_SECONDS_MAXIMUM = 600

type ExportEventsUpgrade = Plugin<{
global: {
exportEventsBuffer: ReturnType<typeof createBuffer>
exportEventsToIgnore: Set<string>
exportEventsWithRetry: (payload: ExportEventsJobPayload, meta: PluginMeta<ExportEventsUpgrade>) => Promise<void>
}
config: {
exportEventsBufferBytes: string
exportEventsBufferSeconds: string
exportEventsToIgnore: string
}
jobs: {
exportEventsWithRetry: ExportEventsJobPayload
}
Comment on lines +27 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit confusing to me: aren't jobs supposed to be callables?

Copy link
Collaborator Author

@mariusandra mariusandra May 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed they are, but the only thing you can actually configure is the payload of a job. So in an effort to reduce boilerplate and not give any wrong ideas, in this jobs: {} object you just specify the payload.

Basically, I think it's just nicer to write

    jobs: {
        exportEventsWithRetry: ExportEventsJobPayload
    }

instead of something with more boilerplate like:

    jobs: {
        exportEventsWithRetry: (payload: ExportEventsJobPayload) => Promise<void>
    }

}>

interface ExportEventsJobPayload extends Record<string, any> {
batch: PluginEvent[]
batchId: number
retriesPerformedSoFar: number
}

/**
* Inject export abstraction code into plugin VM if it has method `exportEvents`:
* - add `global`/`config`/`jobs` stuff specified in the `ExportEventsUpgrade` type above,
* - patch `onEvent` with code to add the event to a buffer.
*/
export function upgradeExportEvents(
hub: Hub,
pluginConfig: PluginConfig,
response: PluginConfigVMInternalResponse<PluginMeta<ExportEventsUpgrade>>
): void {
const { methods, tasks, meta } = response

if (!methods.exportEvents) {
return
}

const uploadBytes = stringClamp(
meta.config.exportEventsBufferBytes,
EXPORT_BUFFER_BYTES_DEFAULT,
EXPORT_BUFFER_BYTES_MINIMUM,
EXPORT_BUFFER_BYTES_MAXIMUM
)
const uploadSeconds = stringClamp(
meta.config.exportEventsBufferSeconds,
EXPORT_BUFFER_SECONDS_DEFAULT,
EXPORT_BUFFER_SECONDS_MINIMUM,
EXPORT_BUFFER_SECONDS_MAXIMUM
)

meta.global.exportEventsToIgnore = new Set(
meta.config.exportEventsToIgnore
? meta.config.exportEventsToIgnore.split(',').map((event: string) => event.trim())
: null
)

meta.global.exportEventsBuffer = createBuffer({
limit: uploadBytes,
timeoutSeconds: uploadSeconds,
onFlush: async (batch) => {
const jobPayload = {
batch,
batchId: Math.floor(Math.random() * 1000000),
retriesPerformedSoFar: 0,
}
// Running the first export code directly, without a job in between
await meta.global.exportEventsWithRetry(jobPayload, meta)
},
})

meta.global.exportEventsWithRetry = async (
payload: ExportEventsJobPayload,
meta: PluginMeta<ExportEventsUpgrade>
) => {
const start = new Date()
try {
await methods.exportEvents?.(payload.batch)
hub.statsd?.timing('plugin.export_events.success', start, {
plugin: pluginConfig.plugin?.name ?? '?',
teamId: pluginConfig.team_id.toString(),
})
} catch (err) {
if (err instanceof RetryError) {
if (payload.retriesPerformedSoFar < MAXIMUM_RETRIES) {
const nextRetrySeconds = 2 ** (payload.retriesPerformedSoFar + 1) * 3
await meta.jobs
.exportEventsWithRetry({ ...payload, retriesPerformedSoFar: payload.retriesPerformedSoFar + 1 })
.runIn(nextRetrySeconds, 'seconds')

status.info(
'🚃',
`Enqueued PluginConfig ${pluginConfig.id} batch ${payload.batchId} for retry #${
payload.retriesPerformedSoFar + 1
} in ${Math.round(nextRetrySeconds)}s`
)
hub.statsd?.increment('plugin.export_events.retry_enqueued', {
retry: `${payload.retriesPerformedSoFar + 1}`,
plugin: pluginConfig.plugin?.name ?? '?',
teamId: pluginConfig.team_id.toString(),
})
} else {
status.info(
'☠️',
`Dropped PluginConfig ${pluginConfig.id} batch ${payload.batchId} after retrying ${payload.retriesPerformedSoFar} times`
)
hub.statsd?.increment('plugin.export_events.retry_dropped', {
retry: `${payload.retriesPerformedSoFar}`,
plugin: pluginConfig.plugin?.name ?? '?',
teamId: pluginConfig.team_id.toString(),
})
}
} else {
throw err
}
}
}

tasks.job['exportEventsWithRetry'] = {
name: 'exportEventsWithRetry',
type: PluginTaskType.Job,
exec: (payload) => meta.global.exportEventsWithRetry(payload as ExportEventsJobPayload, meta),
}

const oldOnEvent = methods.onEvent
methods.onEvent = async (event: PluginEvent) => {
if (!meta.global.exportEventsToIgnore.has(event.event)) {
meta.global.exportEventsBuffer.add(event, JSON.stringify(event).length)
}
await oldOnEvent?.(event)
}

const oldTeardownPlugin = methods.teardownPlugin
methods.teardownPlugin = async () => {
await Promise.all([meta.global.exportEventsBuffer.flush(), oldTeardownPlugin?.()])
}
}
Loading