This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

onEvent & onSnapshot #359

Merged (10 commits) on May 6, 2021
103 changes: 103 additions & 0 deletions src/main/ingestion-queues/ingest-event.ts
@@ -0,0 +1,103 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'

import { PluginsServer, WorkerMethods } from '../../types'
import { timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'

export async function ingestEvent(
server: PluginsServer,
workerMethods: WorkerMethods,
event: PluginEvent,
checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again
): Promise<void> {
const eachEventStartTimer = new Date()
const isSnapshot = event.event === '$snapshot'

let processedEvent: PluginEvent | null = event

checkAndPause?.()

// run processEvent on all events that are not $snapshot
if (!isSnapshot) {
processedEvent = await runInstrumentedFunction({
server,
event,
func: (event) => workerMethods.processEvent(event),
statsKey: 'kafka_queue.single_event',
timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!',
})
}

checkAndPause?.()

if (processedEvent) {
await Promise.all([
runInstrumentedFunction({
server,
event: processedEvent,
func: (event) => workerMethods.ingestEvent(event),
statsKey: 'kafka_queue.single_ingestion',
timeoutMessage: 'After 30 seconds still ingesting event',
}),
runInstrumentedFunction({
server,
event: processedEvent,
func: (event) => workerMethods[isSnapshot ? 'onSnapshot' : 'onEvent'](event),
statsKey: `kafka_queue.single_${isSnapshot ? 'on_snapshot' : 'on_event'}`,
timeoutMessage: `After 30 seconds still running ${isSnapshot ? 'onSnapshot' : 'onEvent'}`,
}),
])
}

server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer)

countAndLogEvents()
}

async function runInstrumentedFunction({
server,
timeoutMessage,
event,
func,
statsKey,
}: {
server: PluginsServer
event: PluginEvent
timeoutMessage: string
statsKey: string
func: (event: PluginEvent) => Promise<any>
}): Promise<any> {
const timeout = timeoutGuard(timeoutMessage, {
event: JSON.stringify(event),
})
const timer = new Date()
try {
return await func(event)
} catch (error) {
status.info('🔔', error)
Sentry.captureException(error)
throw error
} finally {
server.statsd?.timing(statsKey, timer)
clearTimeout(timeout)
}
}

let messageCounter = 0
let messageLogDate = 0

function countAndLogEvents(): void {
const now = new Date().valueOf()
messageCounter++
if (now - messageLogDate > 10000) {
status.info(
'🕒',
`Processed ${messageCounter} events${
messageLogDate === 0 ? '' : ` in ${Math.round((now - messageLogDate) / 10) / 100}s`
}`
)
messageCounter = 0
messageLogDate = now
}
}
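For orientation, here is a minimal sketch of a call site for the new helper, assuming a `PluginsServer` and a `WorkerMethods` object are already available (as the Kafka and Redis queues below provide); the `handleIncomingEvent` name and the `pauseConsumer` callback are illustrative, not part of this diff.

```ts
import { PluginEvent } from '@posthog/plugin-scaffold'

import { PluginsServer, WorkerMethods } from '../../types'
import { ingestEvent } from './ingest-event'

// Hypothetical consumer-side wiring: every incoming event is handed to
// ingestEvent, and the optional callback lets the caller pause its upstream
// source when the workers fall behind.
async function handleIncomingEvent(
    server: PluginsServer,
    workerMethods: WorkerMethods,
    event: PluginEvent,
    pauseConsumer?: () => void // assumption: a queue-specific pause hook
): Promise<void> {
    await ingestEvent(server, workerMethods, event, pauseConsumer)
}
```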
84 changes: 8 additions & 76 deletions src/main/ingestion-queues/kafka-queue.ts
@@ -1,40 +1,28 @@
import Piscina from '@posthog/piscina'
import { PluginEvent } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { Consumer, EachBatchPayload, EachMessagePayload, Kafka, KafkaMessage } from 'kafkajs'
import { PluginsServer, Queue } from 'types'
import { Consumer, EachBatchPayload, Kafka, KafkaMessage } from 'kafkajs'
import { PluginsServer, Queue, WorkerMethods } from 'types'

import { timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
import { groupIntoBatches, killGracefully, sanitizeEvent } from '../../utils/utils'
import { ingestEvent } from './ingest-event'

export class KafkaQueue implements Queue {
private pluginsServer: PluginsServer
private piscina: Piscina
private kafka: Kafka
private consumer: Consumer
private wasConsumerRan: boolean
private processEvent: (event: PluginEvent) => Promise<PluginEvent>
private ingestEvent: (event: PluginEvent) => Promise<void>

// used for logging aggregate stats to the console
private messageLogDate = 0
private messageCounter = 0

constructor(
pluginsServer: PluginsServer,
piscina: Piscina,
processEvent: (event: PluginEvent) => Promise<any>,
ingestEvent: (event: PluginEvent) => Promise<void>
) {
private workerMethods: WorkerMethods

constructor(pluginsServer: PluginsServer, piscina: Piscina, workerMethods: WorkerMethods) {
this.pluginsServer = pluginsServer
this.piscina = piscina
this.kafka = pluginsServer.kafka!
this.consumer = KafkaQueue.buildConsumer(this.kafka)
this.wasConsumerRan = false
this.processEvent = processEvent
this.ingestEvent = ingestEvent
this.messageLogDate = new Date().valueOf()
this.workerMethods = workerMethods
}

private async eachMessage(message: KafkaMessage): Promise<void> {
@@ -45,50 +33,7 @@ export class KafkaQueue implements Queue {
site_url: combinedEvent.site_url || null,
ip: combinedEvent.ip || null,
})
await this.eachEvent(event)
}

private async eachEvent(event: PluginEvent): Promise<void> {
const eachEventStartTimer = new Date()

const processingTimeout = timeoutGuard('Still running plugins on event. Timeout warning after 30 sec!', {
event: JSON.stringify(event),
})
const timer = new Date()
let processedEvent: PluginEvent
try {
processedEvent = await this.processEvent(event)
} catch (error) {
status.info('🔔', error)
Sentry.captureException(error)
throw error
} finally {
this.pluginsServer.statsd?.timing('kafka_queue.single_event', timer)
clearTimeout(processingTimeout)
}

// ingest event

if (processedEvent) {
const singleIngestionTimeout = timeoutGuard('After 30 seconds still ingesting event', {
event: JSON.stringify(processedEvent),
})
const singleIngestionTimer = new Date()
try {
await this.ingestEvent(processedEvent)
} catch (error) {
status.info('🔔', error)
Sentry.captureException(error)
throw error
} finally {
this.pluginsServer.statsd?.timing('kafka_queue.single_ingestion', singleIngestionTimer)
clearTimeout(singleIngestionTimeout)
}
}

this.pluginsServer.statsd?.timing('kafka_queue.each_event', eachEventStartTimer)

this.countAndLogEvents()
await ingestEvent(this.pluginsServer, this.workerMethods, event)
}

private async eachBatch({
@@ -227,17 +172,4 @@ export class KafkaQueue implements Queue {
})
return consumer
}

private countAndLogEvents() {
const now = new Date().valueOf()
this.messageCounter++
if (now - this.messageLogDate > 10000) {
status.info(
'🕒',
`Processed ${this.messageCounter} events in ${Math.round((now - this.messageLogDate) / 10) / 100}s`
)
this.messageCounter = 0
this.messageLogDate = now
}
}
}
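A note on the refactor: the queue now receives the whole `WorkerMethods` object rather than individual `processEvent`/`ingestEvent` callbacks, so new hooks such as `onEvent` and `onSnapshot` need no constructor changes, and the per-event pipeline lives in the shared `ingestEvent` helper. A minimal construction sketch, mirroring `startQueueKafka` in `queue.ts` below (the `server`, `piscina`, and `workerMethods` values are assumed to come from `startQueue`, and the import path assumes a file alongside `queue.ts`):

```ts
import Piscina from '@posthog/piscina'

import { PluginsServer, Queue, WorkerMethods } from '../../types'
import { KafkaQueue } from './kafka-queue'

// Sketch only: construct the queue with the full set of worker methods and
// start consuming.
async function startKafka(server: PluginsServer, piscina: Piscina, workerMethods: WorkerMethods): Promise<Queue> {
    const queue: Queue = new KafkaQueue(server, piscina, workerMethods)
    await queue.start()
    return queue
}
```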
34 changes: 15 additions & 19 deletions src/main/ingestion-queues/queue.ts
@@ -2,18 +2,13 @@ import Piscina from '@posthog/piscina'
import { PluginEvent } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'

import { IngestEventResponse, PluginsServer, Queue } from '../../types'
import { PluginsServer, Queue, WorkerMethods } from '../../types'
import { status } from '../../utils/status'
import { sanitizeEvent, UUIDT } from '../../utils/utils'
import { CeleryQueue } from './celery-queue'
import { ingestEvent } from './ingest-event'
import { KafkaQueue } from './kafka-queue'

export type WorkerMethods = {
processEvent: (event: PluginEvent) => Promise<PluginEvent | null>
processEventBatch: (event: PluginEvent[]) => Promise<(PluginEvent | null)[]>
ingestEvent: (event: PluginEvent) => Promise<IngestEventResponse>
}

export function pauseQueueIfWorkerFull(
pause: undefined | (() => void | Promise<void>),
server: PluginsServer,
@@ -30,6 +25,16 @@ export async function startQueue(
workerMethods: Partial<WorkerMethods> = {}
): Promise<Queue> {
const mergedWorkerMethods = {
onEvent: (event: PluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'onEvent'
return piscina.runTask({ task: 'onEvent', args: { event } })
},
onSnapshot: (event: PluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'onSnapshot'
return piscina.runTask({ task: 'onSnapshot', args: { event } })
},
processEvent: (event: PluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'processEvent'
@@ -85,12 +90,8 @@ function startQueueRedis(server: PluginsServer, piscina: Piscina | undefined, wo
...data,
} as PluginEvent)
try {
pauseQueueIfWorkerFull(() => celeryQueue.pause(), server, piscina)
const processedEvent = await workerMethods.processEvent(event)
if (processedEvent) {
pauseQueueIfWorkerFull(() => celeryQueue.pause(), server, piscina)
await workerMethods.ingestEvent(processedEvent)
}
const checkAndPause = () => pauseQueueIfWorkerFull(() => celeryQueue.pause(), server, piscina)
await ingestEvent(server, workerMethods, event, checkAndPause)
} catch (e) {
Sentry.captureException(e)
}
@@ -104,12 +105,7 @@
}

async function startQueueKafka(server: PluginsServer, piscina: Piscina, workerMethods: WorkerMethods): Promise<Queue> {
const kafkaQueue: Queue = new KafkaQueue(
server,
piscina,
(event: PluginEvent) => workerMethods.processEvent(event),
async (event) => void (await workerMethods.ingestEvent(event))
)
const kafkaQueue: Queue = new KafkaQueue(server, piscina, workerMethods)
await kafkaQueue.start()

return kafkaQueue
11 changes: 11 additions & 0 deletions src/types.ts
@@ -240,12 +240,23 @@ export interface PluginTask {
exec: (payload?: Record<string, any>) => Promise<any>
}

export type WorkerMethods = {
onEvent: (event: PluginEvent) => Promise<void>
onSnapshot: (event: PluginEvent) => Promise<void>
processEvent: (event: PluginEvent) => Promise<PluginEvent | null>
processEventBatch: (batch: PluginEvent[]) => Promise<(PluginEvent | null)[]>
ingestEvent: (event: PluginEvent) => Promise<IngestEventResponse>
}

export interface PluginConfigVMReponse {
vm: VM
methods: {
setupPlugin: () => Promise<void>
teardownPlugin: () => Promise<void>
onEvent: (event: PluginEvent) => Promise<void>
onSnapshot: (event: PluginEvent) => Promise<void>
processEvent: (event: PluginEvent) => Promise<PluginEvent>
// DEPRECATED
processEventBatch: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}
tasks: Record<PluginTaskType, Record<string, PluginTask>>
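To illustrate the new type, here is a minimal `Partial<WorkerMethods>` stub of the kind `startQueue` (in `queue.ts` above) accepts, e.g. for tests; the stub name and the `./types` import path are assumptions, and only the new hooks are overridden.

```ts
import { PluginEvent } from '@posthog/plugin-scaffold'

import { WorkerMethods } from './types'

// Hypothetical test stub: override only the new hooks and let startQueue fill
// in the rest from its Piscina-backed defaults.
export const stubWorkerMethods: Partial<WorkerMethods> = {
    onEvent: async (event: PluginEvent) => {
        console.info('onEvent called for', event.event)
    },
    onSnapshot: async (event: PluginEvent) => {
        console.info('onSnapshot called for', event.event)
    },
}
```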
44 changes: 42 additions & 2 deletions src/worker/plugins/run.ts
@@ -3,7 +3,47 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { PluginConfig, PluginsServer, PluginTaskType } from '../../types'
import { processError } from '../../utils/db/error'

export async function runPlugins(server: PluginsServer, event: PluginEvent): Promise<PluginEvent | null> {
export async function runOnEvent(server: PluginsServer, event: PluginEvent): Promise<void> {
const pluginsToRun = getPluginsForTeam(server, event.team_id)

await Promise.all(
pluginsToRun.map(async (pluginConfig) => {
const onEvent = await pluginConfig.vm?.getOnEvent()
if (onEvent) {
const timer = new Date()
try {
await onEvent(event)
} catch (error) {
await processError(server, pluginConfig, error, event)
server.statsd?.increment(`plugin.${pluginConfig.plugin?.name}.on_event.ERROR`)
}
server.statsd?.timing(`plugin.${pluginConfig.plugin?.name}.on_event`, timer)
}
})
)
}

export async function runOnSnapshot(server: PluginsServer, event: PluginEvent): Promise<void> {
const pluginsToRun = getPluginsForTeam(server, event.team_id)

await Promise.all(
pluginsToRun.map(async (pluginConfig) => {
const onSnapshot = await pluginConfig.vm?.getOnSnapshot()
if (onSnapshot) {
const timer = new Date()
try {
await onSnapshot(event)
} catch (error) {
await processError(server, pluginConfig, error, event)
server.statsd?.increment(`plugin.${pluginConfig.plugin?.name}.on_snapshot.ERROR`)
}
server.statsd?.timing(`plugin.${pluginConfig.plugin?.name}.on_snapshot`, timer)
}
})
)
}

export async function runProcessEvent(server: PluginsServer, event: PluginEvent): Promise<PluginEvent | null> {
const pluginsToRun = getPluginsForTeam(server, event.team_id)
let returnedEvent: PluginEvent | null = event

@@ -42,7 +82,7 @@ export async function runPlugins(server: PluginsServer, event: PluginEvent): Pro
return returnedEvent
}

export async function runPluginsOnBatch(server: PluginsServer, batch: PluginEvent[]): Promise<PluginEvent[]> {
export async function runProcessEventBatch(server: PluginsServer, batch: PluginEvent[]): Promise<PluginEvent[]> {
const eventsByTeam = new Map<number, PluginEvent[]>()

for (const event of batch) {
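From the plugin author's side, `runOnEvent` and `runOnSnapshot` invoke whatever the plugin VM resolves from `getOnEvent`/`getOnSnapshot`. The VM internals are not part of this diff, so the following is only a sketch of what such plugin exports might look like, assuming the single-argument shape declared in `PluginConfigVMReponse`; the real scaffold signature may also pass a meta argument.

```ts
import { PluginEvent } from '@posthog/plugin-scaffold'

// Hypothetical plugin code: called for every non-$snapshot event after
// processEvent has run.
export async function onEvent(event: PluginEvent): Promise<void> {
    console.log(`exporting event "${event.event}" for team ${event.team_id}`)
}

// Hypothetical plugin code: called for $snapshot events, which skip
// processEvent entirely.
export async function onSnapshot(event: PluginEvent): Promise<void> {
    console.log(`saw a snapshot event for team ${event.team_id}`)
}
```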
12 changes: 9 additions & 3 deletions src/worker/tasks.ts
@@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { EnqueuedJob, PluginsServer, PluginTaskType } from '../types'
import { ingestEvent } from './ingestion/ingest-event'
import { runPlugins, runPluginsOnBatch, runPluginTask } from './plugins/run'
import { runOnEvent, runOnSnapshot, runPluginTask, runProcessEvent, runProcessEventBatch } from './plugins/run'
import { loadSchedule, setupPlugins } from './plugins/setup'
import { teardownPlugins } from './plugins/teardown'

@@ -12,11 +12,17 @@ export const workerTasks: Record<string, TaskRunner> = {
hello: (server, args) => {
return `hello ${args}!`
},
onEvent: (server, args: { event: PluginEvent }) => {
return runOnEvent(server, args.event)
},
onSnapshot: (server, args: { event: PluginEvent }) => {
return runOnSnapshot(server, args.event)
},
processEvent: (server, args: { event: PluginEvent }) => {
return runPlugins(server, args.event)
return runProcessEvent(server, args.event)
},
processEventBatch: (server, args: { batch: PluginEvent[] }) => {
return runPluginsOnBatch(server, args.batch)
return runProcessEventBatch(server, args.batch)
},
runJob: (server, { job }: { job: EnqueuedJob }) => {
return runPluginTask(server, job.type, PluginTaskType.Job, job.pluginConfigId, job.payload)
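For completeness, a sketch of how the Piscina worker side might dispatch a named task to this record; the actual worker entry point is not part of this diff, so the `runWorkerTask` helper and the exact argument shape are assumptions.

```ts
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { PluginsServer } from '../types'
import { workerTasks } from './tasks'

// Hypothetical dispatcher: look up the task by name and run it with the
// server handle and the task-specific arguments.
async function runWorkerTask(
    server: PluginsServer,
    task: 'onEvent' | 'onSnapshot' | 'processEvent',
    args: { event: PluginEvent }
): Promise<any> {
    return await workerTasks[task](server, args)
}

// e.g. await runWorkerTask(server, 'onSnapshot', { event })
```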