This repository has been archived by the owner on Mar 7, 2024. It is now read-only.

Add retry queue, + refactor #5

Merged
merged 11 commits on May 23, 2021
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,2 +1,2 @@
.idea

node_modules/
97 changes: 0 additions & 97 deletions index.js

This file was deleted.

208 changes: 208 additions & 0 deletions index.ts
@@ -0,0 +1,208 @@
import { createBuffer } from '@posthog/plugin-contrib'
import { Plugin, PluginMeta, PluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { BigQuery, Table } from '@google-cloud/bigquery'

type BigQueryPlugin = Plugin<{
    global: {
        bigQueryClient: BigQuery
        bigQueryTable: Table

        exportEventsBuffer: ReturnType<typeof createBuffer>
        exportEventsToIgnore: Set<string>
        exportEventsWithRetry: (payload: UploadJobPayload, meta: PluginMeta<BigQueryPlugin>) => Promise<void>
    }
    config: {
        datasetId: string
        tableId: string

        exportEventsBufferBytes: string
        exportEventsBufferSeconds: string
        exportEventsToIgnore: string
    }
    jobs: {
        exportEventsWithRetry: UploadJobPayload
    }
}>

interface UploadJobPayload {
    batch: PluginEvent[]
    batchId: number
    retriesPerformedSoFar: number
}

export const setupPlugin: BigQueryPlugin['setupPlugin'] = async (meta) => {
    const { global, attachments, config } = meta
    if (!attachments.googleCloudKeyJson) {
        throw new Error('JSON config not provided!')
    }
    if (!config.datasetId) {
        throw new Error('Dataset ID not provided!')
    }
    if (!config.tableId) {
        throw new Error('Table ID not provided!')
    }

    const credentials = JSON.parse(attachments.googleCloudKeyJson.contents.toString())
    global.bigQueryClient = new BigQuery({
        projectId: credentials['project_id'],
        credentials,
    })
    global.bigQueryTable = global.bigQueryClient.dataset(config.datasetId).table(config.tableId)

    try {
        // check if the table exists
        await global.bigQueryTable.get()
    } catch (error) {
        // some other error? abort!
        if (!error.message.includes('Not found')) {
            throw error
        }
        console.log(`Creating BigQuery Table - ${config.datasetId}:${config.tableId}`)

        const schema = [
            { name: 'uuid', type: 'STRING' },
            { name: 'event', type: 'STRING' },
            { name: 'properties', type: 'STRING' },
            { name: 'elements', type: 'STRING' },
            { name: 'set', type: 'STRING' },
            { name: 'set_once', type: 'STRING' },
            { name: 'distinct_id', type: 'STRING' },
            { name: 'team_id', type: 'INT64' },
            { name: 'ip', type: 'STRING' },
            { name: 'site_url', type: 'STRING' },
            { name: 'timestamp', type: 'TIMESTAMP' },
        ]

        try {
            await global.bigQueryClient.dataset(config.datasetId).createTable(config.tableId, { schema })
        } catch (error) {
            // a different worker already created the table
            if (!error.message.includes('Already Exists')) {
                throw error
            }
        }
    }

    setupBufferExportCode(meta, exportEventsToBigQuery)
}

export async function exportEventsToBigQuery(events: PluginEvent[], { global }: PluginMeta<BigQueryPlugin>) {
    if (!global.bigQueryTable) {
        throw new Error('No BigQuery client initialized!')
    }
    try {
        const rows = events.map((event) => {
            const {
                event: eventName,
                properties,
                $set,
                $set_once,
                distinct_id,
                team_id,
                site_url,
                now,
                sent_at,
                uuid,
                ..._discard
            } = event
            const ip = properties?.['$ip'] || event.ip
            const timestamp = event.timestamp || properties?.timestamp || now || sent_at
            let ingestedProperties = properties
            let elements = []

            // only move $elements out of properties for $autocapture events
            if (eventName === '$autocapture' && properties && '$elements' in properties) {
                const { $elements, ...props } = properties
                ingestedProperties = props
                elements = $elements
            }

            return {
                uuid,
                event: eventName,
                properties: JSON.stringify(ingestedProperties || {}),
                elements: JSON.stringify(elements || {}),
                set: JSON.stringify($set || {}),
                set_once: JSON.stringify($set_once || {}),
                distinct_id,
                team_id,
                ip,
                site_url,
                timestamp: timestamp ? global.bigQueryClient.timestamp(timestamp) : null,
            }
        })
        await global.bigQueryTable.insert(rows)
        console.log(`Inserted ${events.length} ${events.length > 1 ? 'events' : 'event'} to BigQuery`)
    } catch (error) {
        console.error(`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `, error)
        throw new RetryError(`Error inserting into BigQuery! ${JSON.stringify(error.errors)}`)
    }
}

// What follows is code that should be abstracted away into the plugin server itself.

const setupBufferExportCode = (
    meta: PluginMeta<BigQueryPlugin>,
    exportEvents: (events: PluginEvent[], meta: PluginMeta<BigQueryPlugin>) => Promise<void>
) => {
    // clamp the upload size to the 1 B - 10 MB range described in plugin.json
    const uploadBytes = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferBytes) || 1024 * 1024, 1024 * 1024 * 10))
    const uploadSeconds = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferSeconds) || 30, 600))

    meta.global.exportEventsToIgnore = new Set(
        meta.config.exportEventsToIgnore
            ? meta.config.exportEventsToIgnore.split(',').map((event) => event.trim())
            : null
    )
    meta.global.exportEventsBuffer = createBuffer({
        limit: uploadBytes,
        timeoutSeconds: uploadSeconds,
        onFlush: async (batch) => {
            const jobPayload = {
                batch,
                batchId: Math.floor(Math.random() * 1000000),
                retriesPerformedSoFar: 0,
            }
            const firstThroughQueue = false // TODO: might make sense sometimes? e.g. when we are processing too many tasks already?
            if (firstThroughQueue) {
                await meta.jobs.exportEventsWithRetry(jobPayload).runNow()
            } else {
                await meta.global.exportEventsWithRetry(jobPayload, meta)
            }
        },
    })
    meta.global.exportEventsWithRetry = async (payload: UploadJobPayload, meta: PluginMeta<BigQueryPlugin>) => {
        const { jobs } = meta
        try {
            await exportEvents(payload.batch, meta)
        } catch (err) {
            if (err instanceof RetryError) {
                if (payload.retriesPerformedSoFar < 15) {
                    const nextRetrySeconds = 2 ** payload.retriesPerformedSoFar * 3
                    console.log(`Enqueued batch ${payload.batchId} for retry in ${Math.round(nextRetrySeconds)}s`)

                    await jobs
                        .exportEventsWithRetry({ ...payload, retriesPerformedSoFar: payload.retriesPerformedSoFar + 1 })
                        .runIn(nextRetrySeconds, 'seconds')
                } else {
                    console.log(
                        `Dropped batch ${payload.batchId} after retrying ${payload.retriesPerformedSoFar} times`
                    )
                }
            } else {
                throw err
            }
        }
    }
}

export const jobs: BigQueryPlugin['jobs'] = {
    exportEventsWithRetry: async (payload, meta) => {
        await meta.global.exportEventsWithRetry(payload, meta)
    },
}

export const onEvent: BigQueryPlugin['onEvent'] = (event, { global }) => {
    if (!global.exportEventsToIgnore.has(event.event)) {
        global.exportEventsBuffer.add(event)
    }
}
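The backoff above (2 ** retriesPerformedSoFar * 3, capped at 15 retries) retries a failed batch after 3 s, 6 s, 12 s, and so on up to about 13.7 h, for roughly 27 hours of retrying in total before the batch is dropped. A minimal sketch (hypothetical helper, not part of this diff) that reproduces the schedule:

// Hypothetical helper mirroring the backoff used in exportEventsWithRetry (not part of index.ts).
function retryScheduleSeconds(maxRetries = 15): number[] {
    const delays: number[] = []
    for (let retriesPerformedSoFar = 0; retriesPerformedSoFar < maxRetries; retriesPerformedSoFar++) {
        delays.push(2 ** retriesPerformedSoFar * 3)
    }
    return delays
}

// [3, 6, 12, ..., 49152] seconds; the sum is ~98,300 s, i.e. about 27 hours per batch.
console.log(retryScheduleSeconds())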
13 changes: 13 additions & 0 deletions package.json
@@ -0,0 +1,13 @@
{
    "name": "@posthog/bigquery-plugin",
    "private": true,
    "version": "0.0.1",
    "description": "Export PostHog events to BigQuery on ingestion.",
"devDependencies": {
"@google-cloud/bigquery": "^5.6.0",
"@posthog/plugin-contrib": "^0.0.4",
"@posthog/plugin-scaffold": "^0.10.0",
"@types/generic-pool": "^3.1.9",
"generic-pool": "^3.7.8"
}
}
24 changes: 23 additions & 1 deletion plugin.json
@@ -2,7 +2,8 @@
"name": "BigQuery Export",
"url": "https://github.com/PostHog/bigquery-plugin",
"description": "Sends events to a BigQuery database on ingestion.",
"main": "index.js",
"main": "index.ts",
"posthogVersion": ">= 1.25.0",
"config": [
{
"key": "googleCloudKeyJson",
@@ -23,6 +24,27 @@
"name": "Table ID",
"type": "string",
"required": true
},
{
"key": "exportEventsToIgnore",
"name": "Events to ignore",
"type": "string",
"default": "$feature_flag_called",
"hint": "Comma separated list of events to ignore"
},
{
"key": "exportEventsBufferBytes",
"name": "Maximum upload size in bytes",
"type": "string",
"default": "1048576",
"hint": "Default 1MB. Upload events after buffering this many of them. BigQuery has a 250KB limit per row, so events are still sent individually after the buffer is flushed. The value must be between 1 and 10 MB."
},
{
"key": "exportEventsBufferSeconds",
"name": "Export events at least every X seconds",
"type": "string",
"default": "30",
"hint": "Default 30 seconds. If there are events to upload and this many seconds has passed since the last upload, then upload the queued events. The value must be between 1 and 600 seconds."
}
]
}
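To make the two buffer hints above concrete, here is an illustrative sketch (names not from this diff) of the clamping that index.ts applies to these string settings, per the documented ranges:

// Sketch only: parse the plugin.json string values and clamp them to the documented ranges.
const clamp = (value: number, min: number, max: number): number => Math.max(min, Math.min(value, max))

const uploadBytes = clamp(parseInt('1048576') || 1024 * 1024, 1, 10 * 1024 * 1024) // 1 B - 10 MB
const uploadSeconds = clamp(parseInt('30') || 30, 1, 600) // 1 - 600 seconds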
15 changes: 15 additions & 0 deletions tsconfig.json
@@ -0,0 +1,15 @@
{
    "compilerOptions": {
        "target": "ES2018",
        "lib": ["ES2019"],
        "module": "ES2015",
        "moduleResolution": "Node",
        "strict": true,
        "forceConsistentCasingInFileNames": true,
        "noImplicitReturns": true,
        "noUnusedParameters": true,
        "esModuleInterop": true,
        "noEmit": true
    },
    "exclude": ["**.test.ts"]
}