This repository has been archived by the owner on Mar 7, 2024. It is now read-only.

Commit

Merge pull request #5 from PostHog/retry-queue
Add retry queue, + refactor
mariusandra authored May 23, 2021
2 parents 1b0c7ec + 7a33677 commit eb67063
Showing 7 changed files with 665 additions and 99 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,2 +1,2 @@
.idea

node_modules/
97 changes: 0 additions & 97 deletions index.js

This file was deleted.

208 changes: 208 additions & 0 deletions index.ts
@@ -0,0 +1,208 @@
import { createBuffer } from '@posthog/plugin-contrib'
import { Plugin, PluginMeta, PluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { BigQuery, Table } from '@google-cloud/bigquery'

type BigQueryPlugin = Plugin<{
    global: {
        bigQueryClient: BigQuery
        bigQueryTable: Table

        exportEventsBuffer: ReturnType<typeof createBuffer>
        exportEventsToIgnore: Set<string>
        exportEventsWithRetry: (payload: UploadJobPayload, meta: PluginMeta<BigQueryPlugin>) => Promise<void>
    }
    config: {
        datasetId: string
        tableId: string

        exportEventsBufferBytes: string
        exportEventsBufferSeconds: string
        exportEventsToIgnore: string
    }
    jobs: {
        exportEventsWithRetry: UploadJobPayload
    }
}>

interface UploadJobPayload {
    batch: PluginEvent[]
    batchId: number
    retriesPerformedSoFar: number
}

export const setupPlugin: BigQueryPlugin['setupPlugin'] = async (meta) => {
    const { global, attachments, config } = meta
    if (!attachments.googleCloudKeyJson) {
        throw new Error('JSON config not provided!')
    }
    if (!config.datasetId) {
        throw new Error('Dataset ID not provided!')
    }
    if (!config.tableId) {
        throw new Error('Table ID not provided!')
    }

    const credentials = JSON.parse(attachments.googleCloudKeyJson.contents.toString())
    global.bigQueryClient = new BigQuery({
        projectId: credentials['project_id'],
        credentials,
    })
    global.bigQueryTable = global.bigQueryClient.dataset(config.datasetId).table(config.tableId)

    try {
        // check if the table exists
        await global.bigQueryTable.get()
    } catch (error) {
        // some other error? abort!
        if (!error.message.includes('Not found')) {
            throw error
        }
        console.log(`Creating BigQuery Table - ${config.datasetId}:${config.tableId}`)

        const schema = [
            { name: 'uuid', type: 'STRING' },
            { name: 'event', type: 'STRING' },
            { name: 'properties', type: 'STRING' },
            { name: 'elements', type: 'STRING' },
            { name: 'set', type: 'STRING' },
            { name: 'set_once', type: 'STRING' },
            { name: 'distinct_id', type: 'STRING' },
            { name: 'team_id', type: 'INT64' },
            { name: 'ip', type: 'STRING' },
            { name: 'site_url', type: 'STRING' },
            { name: 'timestamp', type: 'TIMESTAMP' },
        ]

        try {
            await global.bigQueryClient.dataset(config.datasetId).createTable(config.tableId, { schema })
        } catch (error) {
            // a different worker already created the table
            if (!error.message.includes('Already Exists')) {
                throw error
            }
        }
    }

    setupBufferExportCode(meta, exportEventsToBigQuery)
}

export async function exportEventsToBigQuery(events: PluginEvent[], { global }: PluginMeta<BigQueryPlugin>) {
    if (!global.bigQueryTable) {
        throw new Error('No BigQuery client initialized!')
    }
    try {
        const rows = events.map((event) => {
            const {
                event: eventName,
                properties,
                $set,
                $set_once,
                distinct_id,
                team_id,
                site_url,
                now,
                sent_at,
                uuid,
                ..._discard
            } = event
            const ip = properties?.['$ip'] || event.ip
            const timestamp = event.timestamp || properties?.timestamp || now || sent_at
            let ingestedProperties = properties
            let elements = []

            // only move prop to elements for the $autocapture action
            if (eventName === '$autocapture' && properties && '$elements' in properties) {
                const { $elements, ...props } = properties
                ingestedProperties = props
                elements = $elements
            }

            return {
                uuid,
                event: eventName,
                properties: JSON.stringify(ingestedProperties || {}),
                elements: JSON.stringify(elements || {}),
                set: JSON.stringify($set || {}),
                set_once: JSON.stringify($set_once || {}),
                distinct_id,
                team_id,
                ip,
                site_url,
                timestamp: timestamp ? global.bigQueryClient.timestamp(timestamp) : null,
            }
        })
        await global.bigQueryTable.insert(rows)
        console.log(`Inserted ${events.length} ${events.length > 1 ? 'events' : 'event'} to BigQuery`)
    } catch (error) {
        console.error(`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `, error)
        throw new RetryError(`Error inserting into BigQuery! ${JSON.stringify(error.errors)}`)
    }
}

// What follows is code that should be abstracted away into the plugin server itself.

const setupBufferExportCode = (
    meta: PluginMeta<BigQueryPlugin>,
    exportEvents: (events: PluginEvent[], meta: PluginMeta<BigQueryPlugin>) => Promise<void>
) => {
    // clamp the configured buffer size; plugin.json documents a 1-10 MB range
    const uploadBytes = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferBytes) || 1024 * 1024, 1024 * 1024 * 10))
    const uploadSeconds = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferSeconds) || 30, 600))

    meta.global.exportEventsToIgnore = new Set(
        meta.config.exportEventsToIgnore
            ? meta.config.exportEventsToIgnore.split(',').map((event) => event.trim())
            : null
    )
    meta.global.exportEventsBuffer = createBuffer({
        limit: uploadBytes,
        timeoutSeconds: uploadSeconds,
        onFlush: async (batch) => {
            const jobPayload = {
                batch,
                batchId: Math.floor(Math.random() * 1000000),
                retriesPerformedSoFar: 0,
            }
            const firstThroughQueue = false // TODO: might make sense sometimes? e.g. when we are processing too many tasks already?
            if (firstThroughQueue) {
                await meta.jobs.exportEventsWithRetry(jobPayload).runNow()
            } else {
                await meta.global.exportEventsWithRetry(jobPayload, meta)
            }
        },
    })
    meta.global.exportEventsWithRetry = async (payload: UploadJobPayload, meta: PluginMeta<BigQueryPlugin>) => {
        const { jobs } = meta
        try {
            await exportEvents(payload.batch, meta)
        } catch (err) {
            if (err instanceof RetryError) {
                if (payload.retriesPerformedSoFar < 15) {
                    const nextRetrySeconds = 2 ** payload.retriesPerformedSoFar * 3
                    console.log(`Enqueued batch ${payload.batchId} for retry in ${Math.round(nextRetrySeconds)}s`)

                    await jobs
                        .exportEventsWithRetry({ ...payload, retriesPerformedSoFar: payload.retriesPerformedSoFar + 1 })
                        .runIn(nextRetrySeconds, 'seconds')
                } else {
                    console.log(
                        `Dropped batch ${payload.batchId} after retrying ${payload.retriesPerformedSoFar} times`
                    )
                }
            } else {
                throw err
            }
        }
    }
}

export const jobs: BigQueryPlugin['jobs'] = {
    exportEventsWithRetry: async (payload, meta) => {
        await meta.global.exportEventsWithRetry(payload, meta)
    },
}

export const onEvent: BigQueryPlugin['onEvent'] = (event, { global }) => {
    if (!global.exportEventsToIgnore.has(event.event)) {
        global.exportEventsBuffer.add(event)
    }
}
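
For reference, the backoff schedule implied by exportEventsWithRetry above (nextRetrySeconds = 2 ** retriesPerformedSoFar * 3, with at most 15 retries) works out as below. This short sketch is not part of the diff; the MAX_RETRIES constant simply mirrors the hard-coded "< 15" check.

// Sketch only - not part of this commit. Computes the retry delays used above.
const MAX_RETRIES = 15
const delays = Array.from({ length: MAX_RETRIES }, (_, retriesPerformedSoFar) => 2 ** retriesPerformedSoFar * 3)
console.log(delays)
// [3, 6, 12, 24, 48, 96, 192, 384, 768, 1536, 3072, 6144, 12288, 24576, 49152]
// First retry fires after 3s, the 15th after ~13.7 hours (~27 hours of retrying in total);
// a batch that still fails after all 15 retries is dropped.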
13 changes: 13 additions & 0 deletions package.json
@@ -0,0 +1,13 @@
{
    "name": "@posthog/bigquery-plugin",
    "private": true,
    "version": "0.0.1",
    "description": "Export PostHog events to BigQuery on ingestion.",
    "devDependencies": {
        "@google-cloud/bigquery": "^5.6.0",
        "@posthog/plugin-contrib": "^0.0.4",
        "@posthog/plugin-scaffold": "^0.10.0",
        "@types/generic-pool": "^3.1.9",
        "generic-pool": "^3.7.8"
    }
}
24 changes: 23 additions & 1 deletion plugin.json
@@ -2,7 +2,8 @@
"name": "BigQuery Export",
"url": "https://github.com/PostHog/bigquery-plugin",
"description": "Sends events to a BigQuery database on ingestion.",
"main": "index.js",
"main": "index.ts",
"posthogVersion": ">= 1.25.0",
"config": [
{
"key": "googleCloudKeyJson",
@@ -23,6 +24,27 @@
"name": "Table ID",
"type": "string",
"required": true
},
{
"key": "exportEventsToIgnore",
"name": "Events to ignore",
"type": "string",
"default": "$feature_flag_called",
"hint": "Comma separated list of events to ignore"
},
{
"key": "exportEventsBufferBytes",
"name": "Maximum upload size in bytes",
"type": "string",
"default": "1048576",
"hint": "Default 1MB. Upload events after buffering this many of them. BigQuery has a 250KB limit per row, so events are still sent individually after the buffer is flushed. The value must be between 1 and 10 MB."
},
{
"key": "exportEventsBufferSeconds",
"name": "Export events at least every X seconds",
"type": "string",
"default": "30",
"hint": "Default 30 seconds. If there are events to upload and this many seconds has passed since the last upload, then upload the queued events. The value must be between 1 and 600 seconds."
}
]
}
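
The two buffer settings above work together: events accumulate in memory and are flushed to the job queue either once their combined size reaches exportEventsBufferBytes or once exportEventsBufferSeconds have elapsed, whichever happens first. The actual buffering lives in createBuffer from @posthog/plugin-contrib; the sketch below only illustrates that interplay under the assumptions just stated (all names here are hypothetical), and is not the library's implementation.

// Illustrative sketch only - not the @posthog/plugin-contrib implementation.
type FlushFn<T> = (batch: T[]) => Promise<void>

function makeNaiveBuffer<T>(limitBytes: number, timeoutSeconds: number, onFlush: FlushFn<T>) {
    let bufferedBytes = 0
    let batch: T[] = []
    let timer: ReturnType<typeof setTimeout> | null = null

    const flush = async () => {
        if (timer) {
            clearTimeout(timer)
            timer = null
        }
        const toSend = batch
        batch = []
        bufferedBytes = 0
        if (toSend.length > 0) {
            await onFlush(toSend)
        }
    }

    return {
        add(item: T, size = JSON.stringify(item).length) {
            batch.push(item)
            bufferedBytes += size
            if (bufferedBytes >= limitBytes) {
                // byte budget exceeded: flush immediately
                void flush()
            } else if (!timer) {
                // otherwise flush at the latest after timeoutSeconds
                timer = setTimeout(() => void flush(), timeoutSeconds * 1000)
            }
        },
        flush,
    }
}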
15 changes: 15 additions & 0 deletions tsconfig.json
@@ -0,0 +1,15 @@
{
    "compilerOptions": {
        "target": "ES2018",
        "lib": ["ES2019"],
        "module": "ES2015",
        "moduleResolution": "Node",
        "strict": true,
        "forceConsistentCasingInFileNames": true,
        "noImplicitReturns": true,
        "noUnusedParameters": true,
        "esModuleInterop": true,
        "noEmit": true
    },
    "exclude": ["**.test.ts"]
}