This repository has been archived by the owner on Mar 7, 2024. It is now read-only.

Add bq_ingested_timestamp #9

Merged · 9 commits · Jun 15, 2021
8 changes: 8 additions & 0 deletions README.md
@@ -11,3 +11,11 @@ Sends events to a BigQuery database on ingestion.
1. Enter your Dataset ID
1. Enter your Table ID
1. Watch events roll into BigQuery

## Troubleshooting

### Duplicate Events

There's a rare case where duplicate events appear in BigQuery. This happens when a network error makes an export look like it failed even though it actually reached BigQuery, so the insert is retried.

While this shouldn't happen often, if you do find duplicate events in BigQuery, follow the [official BigQuery docs](https://cloud.google.com/bigquery/streaming-data-into-bigquery#manually_removing_duplicates) to manually remove them; one possible approach is sketched below.
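A minimal sketch of such a cleanup using the Node.js client, assuming the `uuid` column uniquely identifies an event; `my_dataset.posthog_events` stands in for your own Dataset ID and Table ID:

```ts
import { BigQuery } from '@google-cloud/bigquery'

// Sketch only: rewrites the table, keeping one row per uuid.
// Assumes `uuid` uniquely identifies an event; replace
// `my_dataset.posthog_events` with your own dataset and table.
async function removeDuplicates(): Promise<void> {
    const bigquery = new BigQuery()
    await bigquery.query({
        query: `
            CREATE OR REPLACE TABLE \`my_dataset.posthog_events\` AS
            SELECT * EXCEPT (row_num)
            FROM (
                SELECT *, ROW_NUMBER() OVER (PARTITION BY uuid) AS row_num
                FROM \`my_dataset.posthog_events\`
            )
            WHERE row_num = 1
        `,
    })
}
```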
136 changes: 96 additions & 40 deletions index.ts
@@ -1,13 +1,12 @@
import { createBuffer } from '@posthog/plugin-contrib'
import { Plugin, PluginMeta, PluginEvent } from '@posthog/plugin-scaffold'
import { BigQuery, Table } from '@google-cloud/bigquery'

class RetryError extends Error {}
import { Plugin, PluginMeta, PluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { BigQuery, Table, TableField, TableMetadata } from '@google-cloud/bigquery'

type BigQueryPlugin = Plugin<{
global: {
bigQueryClient: BigQuery
bigQueryTable: Table
bigQueryTableFields: TableField[]

exportEventsBuffer: ReturnType<typeof createBuffer>
exportEventsToIgnore: Set<string>
@@ -48,39 +47,77 @@ export const setupPlugin: BigQueryPlugin['setupPlugin'] = async (meta) => {
global.bigQueryClient = new BigQuery({
projectId: credentials['project_id'],
credentials,
autoRetry: false,
})
global.bigQueryTable = global.bigQueryClient.dataset(config.datasetId).table(config.tableId)

global.bigQueryTableFields = [
{ name: 'uuid', type: 'STRING' },
{ name: 'event', type: 'STRING' },
{ name: 'properties', type: 'STRING' },
{ name: 'elements', type: 'STRING' },
{ name: 'set', type: 'STRING' },
{ name: 'set_once', type: 'STRING' },
{ name: 'distinct_id', type: 'STRING' },
{ name: 'team_id', type: 'INT64' },
{ name: 'ip', type: 'STRING' },
{ name: 'site_url', type: 'STRING' },
{ name: 'timestamp', type: 'TIMESTAMP' },
{ name: 'bq_ingested_timestamp', type: 'TIMESTAMP' },
]

try {
// check if the table exists
await global.bigQueryTable.get()
const [metadata]: TableMetadata[] = await global.bigQueryTable.getMetadata()

if (!metadata.schema || !metadata.schema.fields) {
throw new Error("Can not get metadata for table. Please check if the table schema is defined.")
}

const existingFields = metadata.schema.fields
const fieldsToAdd = global.bigQueryTableFields.filter(
({ name }) => !existingFields.find((f: any) => f.name === name)
)

if (fieldsToAdd.length > 0) {
console.info(
`Incomplete schema on BigQuery table! Adding the following fields to reach parity: ${JSON.stringify(
fieldsToAdd
)}`
)

let result: TableMetadata | undefined
try {
metadata.schema.fields = metadata.schema.fields.concat(fieldsToAdd)
;[result] = await global.bigQueryTable.setMetadata(metadata)
} catch (error) {
const fieldsToStillAdd = global.bigQueryTableFields.filter(
({ name }) => !result?.schema?.fields?.find((f: any) => f.name === name)
)

if (fieldsToStillAdd.length > 0) {
throw new Error(
`Tried adding fields ${JSON.stringify(fieldsToAdd)}, but ${JSON.stringify(
fieldsToStillAdd
)} could not be added. Cannot start plugin.`
)
}
}
}
} catch (error) {
// some other error? abort!
if (!error.message.includes('Not found')) {
throw error
}
console.log(`Creating BigQuery Table - ${config.datasetId}:${config.tableId}`)

const schema = [
{ name: 'uuid', type: 'STRING' },
{ name: 'event', type: 'STRING' },
{ name: 'properties', type: 'STRING' },
{ name: 'elements', type: 'STRING' },
{ name: 'set', type: 'STRING' },
{ name: 'set_once', type: 'STRING' },
{ name: 'distinct_id', type: 'STRING' },
{ name: 'team_id', type: 'INT64' },
{ name: 'ip', type: 'STRING' },
{ name: 'site_url', type: 'STRING' },
{ name: 'timestamp', type: 'TIMESTAMP' },
]

try {
await global.bigQueryClient.dataset(config.datasetId).createTable(config.tableId, { schema })
await global.bigQueryClient
.dataset(config.datasetId)
.createTable(config.tableId, { schema: global.bigQueryTableFields })
} catch (error) {
// a different worker already created the table
if (!error.message.includes('Already Exists')) {
throw new Error()
throw error
}
}
}
@@ -89,6 +126,12 @@ export const setupPlugin: BigQueryPlugin['setupPlugin'] = async (meta) => {
}

export async function exportEventsToBigQuery(events: PluginEvent[], { global }: PluginMeta<BigQueryPlugin>) {
const insertOptions = {
createInsertId: false,
partialRetries: 0,
raw: true,
}

if (!global.bigQueryTable) {
throw new Error('No BigQuery client initialized!')
}
@@ -119,24 +162,36 @@ export async function exportEventsToBigQuery(events: PluginEvent[], { global }:
elements = $elements
}

return {
uuid,
event: eventName,
properties: JSON.stringify(ingestedProperties || {}),
elements: JSON.stringify(elements || {}),
set: JSON.stringify($set || {}),
set_once: JSON.stringify($set_once || {}),
distinct_id,
team_id,
ip,
site_url,
timestamp: timestamp ? global.bigQueryClient.timestamp(timestamp) : null,
const object: {json: Record<string, any>, insertId?: string} = {
> Collaborator (Author): Can we simplify this object if not using insertId?

> Contributor: I could remove the insertId from the type, but other than that, nope, can't do, because the SDK by default inserts a random insertId unless I'm using the raw version, which requires the object to look like this^

json: {
uuid,
event: eventName,
properties: JSON.stringify(ingestedProperties || {}),
elements: JSON.stringify(elements || {}),
set: JSON.stringify($set || {}),
set_once: JSON.stringify($set_once || {}),
distinct_id,
team_id,
ip,
site_url,
timestamp: timestamp,
bq_ingested_timestamp: new Date().toISOString(),
}
}
return object
})
await global.bigQueryTable.insert(rows)
console.log(`Inserted ${events.length} ${events.length > 1 ? 'events' : 'event'} to BigQuery`)

const start = Date.now()
await global.bigQueryTable.insert(rows, insertOptions)
const durationMs = Date.now() - start

console.log(`Inserted ${events.length} ${events.length > 1 ? 'events' : 'event'} to BigQuery. Took ${durationMs / 1000} seconds.`)

} catch (error) {
console.error(`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `, error)
console.error(
`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `,
error
)
throw new RetryError(`Error inserting into BigQuery! ${JSON.stringify(error.errors)}`)
}
}
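
A note on the `raw: true` / `createInsertId: false` options used above (see the review thread on the row object): by default the Node.js BigQuery client wraps each row and attaches a randomly generated `insertId`, which BigQuery uses for best-effort deduplication of retried inserts. A minimal sketch of the two row shapes, with hypothetical dataset and table names:

```ts
import { BigQuery } from '@google-cloud/bigquery'

// Sketch only: 'my_dataset' and 'events' are hypothetical names.
async function demo(): Promise<void> {
    const table = new BigQuery().dataset('my_dataset').table('events')

    // Default mode: plain row objects. The SDK wraps each row and
    // attaches a randomly generated insertId, which BigQuery uses
    // for best-effort deduplication of retried inserts.
    await table.insert([{ event: 'pageview', distinct_id: 'user-1' }])

    // Raw mode: rows must already be in wire format, i.e.
    // { json: {...}, insertId?: string } — the shape built by
    // exportEventsToBigQuery above. No insertId is generated here.
    await table.insert(
        [{ json: { event: 'pageview', distinct_id: 'user-1' } }],
        { raw: true, createInsertId: false }
    )
}
```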
@@ -147,7 +202,8 @@ const setupBufferExportCode = (
meta: PluginMeta<BigQueryPlugin>,
exportEvents: (events: PluginEvent[], meta: PluginMeta<BigQueryPlugin>) => Promise<void>
) => {
const uploadBytes = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferBytes) || 1024 * 1024, 100))

const uploadBytes = Math.max(1024 * 1024, Math.min(parseInt(meta.config.exportEventsBufferBytes) || 1024 * 1024, 1024 * 1024 * 10))
const uploadSeconds = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferSeconds) || 30, 600))

meta.global.exportEventsToIgnore = new Set(
@@ -199,12 +255,12 @@

export const jobs: BigQueryPlugin['jobs'] = {
exportEventsWithRetry: async (payload, meta) => {
meta.global.exportEventsWithRetry(payload, meta)
await meta.global.exportEventsWithRetry(payload, meta)
},
}

export const onEvent: BigQueryPlugin['onEvent'] = (event, { global }) => {
if (!global.exportEventsToIgnore.has(event.event)) {
global.exportEventsBuffer.add(event)
global.exportEventsBuffer.add(event, JSON.stringify(event).length)
> Collaborator (Author): 💯

}
}
2 changes: 1 addition & 1 deletion plugin.json
@@ -37,7 +37,7 @@
"name": "Maximum upload size in bytes",
"type": "string",
"default": "1048576",
"hint": "Default 1MB. Upload events after buffering this many of them. BigQuery has a 250KB limit per row, so events are still sent individually after the buffer is flushed. The value must be between 1 and 10 MB."
"hint": "Default 1MB. Upload events after buffering this many of them. The value must be between 1 MB and 10 MB."
},
{
"key": "exportEventsBufferSeconds",
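
As a worked example of the bounds in the hint above, here is the clamping applied in `setupBufferExportCode` in index.ts, isolated into a sketch (`clampUploadBytes` is a hypothetical helper name for illustration):

```ts
const MB = 1024 * 1024

// Isolates the uploadBytes clamp from setupBufferExportCode.
// Unparsable input falls back to the 1 MB default; anything
// outside [1 MB, 10 MB] is pulled back into range.
function clampUploadBytes(configured?: string): number {
    return Math.max(MB, Math.min(parseInt(configured ?? '') || MB, 10 * MB))
}

clampUploadBytes('500')       // => 1048576  (below the floor, clamped to 1 MB)
clampUploadBytes('2097152')   // => 2097152  (2 MB, within range)
clampUploadBytes('999999999') // => 10485760 (clamped to the 10 MB ceiling)
```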