-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[plugin-server] onEvent & onSnapshot (PostHog/plugin-server#359)
* add onEvent and onSnapshot worker tasks * run onEvent and onSnapshot in the two queues, disable processEvent for snapshots * fix some tests * onEvent and onSnapshot are methods in the vm * e2e test for onEvent and onSnapshot for postgres/celery * 3 worker jobs per event now * on* tests for clickhouse e2e * fix clickhouse ingestion bug * extract "ingestEvent" and share between kafka and celery
- Loading branch information
1 parent
3e2c956
commit 0e6fbf6
Showing
14 changed files
with
347 additions
and
128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import { PluginEvent } from '@posthog/plugin-scaffold' | ||
import * as Sentry from '@sentry/node' | ||
|
||
import { PluginsServer, WorkerMethods } from '../../types' | ||
import { timeoutGuard } from '../../utils/db/utils' | ||
import { status } from '../../utils/status' | ||
|
||
export async function ingestEvent( | ||
server: PluginsServer, | ||
workerMethods: WorkerMethods, | ||
event: PluginEvent, | ||
checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again | ||
): Promise<void> { | ||
const eachEventStartTimer = new Date() | ||
const isSnapshot = event.event === '$snapshot' | ||
|
||
let processedEvent: PluginEvent | null = event | ||
|
||
checkAndPause?.() | ||
|
||
// run processEvent on all events that are not $snapshot | ||
if (!isSnapshot) { | ||
processedEvent = await runInstrumentedFunction({ | ||
server, | ||
event, | ||
func: (event) => workerMethods.processEvent(event), | ||
statsKey: 'kafka_queue.single_event', | ||
timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!', | ||
}) | ||
} | ||
|
||
checkAndPause?.() | ||
|
||
if (processedEvent) { | ||
await Promise.all([ | ||
runInstrumentedFunction({ | ||
server, | ||
event: processedEvent, | ||
func: (event) => workerMethods.ingestEvent(event), | ||
statsKey: 'kafka_queue.single_ingestion', | ||
timeoutMessage: 'After 30 seconds still ingesting event', | ||
}), | ||
runInstrumentedFunction({ | ||
server, | ||
event: processedEvent, | ||
func: (event) => workerMethods[isSnapshot ? 'onSnapshot' : 'onEvent'](event), | ||
statsKey: `kafka_queue.single_${isSnapshot ? 'on_snapshot' : 'on_event'}`, | ||
timeoutMessage: `After 30 seconds still running ${isSnapshot ? 'onSnapshot' : 'onEvent'}`, | ||
}), | ||
]) | ||
} | ||
|
||
server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer) | ||
|
||
countAndLogEvents() | ||
} | ||
|
||
async function runInstrumentedFunction({ | ||
server, | ||
timeoutMessage, | ||
event, | ||
func, | ||
statsKey, | ||
}: { | ||
server: PluginsServer | ||
event: PluginEvent | ||
timeoutMessage: string | ||
statsKey: string | ||
func: (event: PluginEvent) => Promise<any> | ||
}): Promise<any> { | ||
const timeout = timeoutGuard(timeoutMessage, { | ||
event: JSON.stringify(event), | ||
}) | ||
const timer = new Date() | ||
try { | ||
return await func(event) | ||
} catch (error) { | ||
status.info('🔔', error) | ||
Sentry.captureException(error) | ||
throw error | ||
} finally { | ||
server.statsd?.timing(statsKey, timer) | ||
clearTimeout(timeout) | ||
} | ||
} | ||
|
||
let messageCounter = 0 | ||
let messageLogDate = 0 | ||
|
||
function countAndLogEvents(): void { | ||
const now = new Date().valueOf() | ||
messageCounter++ | ||
if (now - messageLogDate > 10000) { | ||
status.info( | ||
'🕒', | ||
`Processed ${messageCounter} events${ | ||
messageLogDate === 0 ? '' : ` in ${Math.round((now - messageLogDate) / 10) / 100}s` | ||
}` | ||
) | ||
messageCounter = 0 | ||
messageLogDate = now | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.