Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Port Python get_event endpoint over to plugin server #25

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5792d4d
Add CI and set up Jest
Twixes Nov 26, 2020
6913735
Add integrated ingestion server
Twixes Nov 26, 2020
c024511
Port over part of Python get_event endpoint to TS
Twixes Nov 26, 2020
4c24336
Update yarn.lock
Twixes Nov 26, 2020
7a8322a
Fix CI
Twixes Nov 26, 2020
e13e337
Update yarn.lock
Twixes Nov 26, 2020
e6ffc74
Fix CI
Twixes Nov 26, 2020
5d272c7
Fix test script
Twixes Nov 26, 2020
aacc34e
Fix test script
Twixes Nov 26, 2020
aea622c
Create .prettierignore
Twixes Nov 26, 2020
38a83d9
Merge branch 'ci-jest' into ingestion-base
Twixes Nov 26, 2020
c4e60a7
Don't import dynamically
Twixes Nov 26, 2020
fdeffbc
Merge branch 'ingestion-base' into ingestion-capture
Twixes Nov 26, 2020
23de6fb
Improve error handling
Twixes Nov 26, 2020
9ad4540
Say "web" instead of "ingestion", use fastify instead of httpf
Twixes Nov 26, 2020
f949a7d
Merge branch 'master' into ingestion-base
Twixes Nov 26, 2020
59b33f7
Fix curlies
Twixes Nov 26, 2020
1cc83bc
Add lint:fix script
Twixes Nov 26, 2020
f68be04
Merge branch 'ingestion-base' into ingestion-capture
Twixes Nov 26, 2020
54453d9
Move things to Fastify
Twixes Nov 27, 2020
50808ed
Merge branch 'master' into ingestion-capture
Twixes Nov 27, 2020
bf55347
Update yarn.lock
Twixes Nov 27, 2020
576e495
Rearrange things
Twixes Dec 1, 2020
8e7cfa4
Update the way the server is organized
Twixes Dec 1, 2020
89134cd
Fix tests
Twixes Dec 2, 2020
600b187
Merge branch 'master' into ingestion-capture
Twixes Dec 2, 2020
8b41f07
Sync with master
Twixes Dec 2, 2020
c107432
Remove unused imports
Twixes Dec 2, 2020
de4c0a2
Fix style
Twixes Dec 2, 2020
535d22e
Merge branch 'master' into ingestion-capture
Twixes Dec 2, 2020
5e48b80
Update yarn.lock
Twixes Dec 2, 2020
fb48d04
Bring back esModuleInterop for compat with supertest
Twixes Dec 2, 2020
bba15b3
Fix compat with supertest without esModuleInterop
Twixes Dec 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/__tests__/vm.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ test('attachments', async () => {
attachedFile: {
content_type: 'application/json',
file_name: 'plugin.json',
contents: new Buffer('{"name": "plugin"}'),
contents: Buffer.from('{"name": "plugin"}'),
},
}
const vm = createPluginConfigVM(
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/web/capture.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { buildFastifyInstance } from '../../web/server'
import * as request from 'supertest'

const fastifyInstance = buildFastifyInstance()
const { server } = fastifyInstance

beforeAll(fastifyInstance.ready)
afterAll(fastifyInstance.close)

test('Rejects capture request with no data at all', async () => {
const response = await request(server).get('/')
expect(response.body).toEqual({
statusCode: 400,
error: 'Bad Request',
message: 'No data found. Make sure to use a POST request when sending the payload in the body of the request.',
})
expect(response.status).toBe(400)
})

test('Handles server errors', async () => {
const response = await request(server).post('/').set('Content-Type', 'text/plain').send('1337')
expect(response.body).toEqual({
statusCode: 500,
error: 'Internal Server Error',
message: 'Unexpected leet detected!',
})
expect(response.status).toBe(500)
})

test('Disallows PATCH method', async () => {
const response = await request(server).patch('/')
expect(response.body).toEqual({
statusCode: 405,
error: 'Method Not Allowed',
message: `Method PATCH not allowed! Try GET or POST.`,
})
expect(response.status).toBe(405)
})

test('Disallows DELETE method', async () => {
const response = await request(server).delete('/')
expect(response.body).toEqual({
statusCode: 405,
error: 'Method Not Allowed',
message: `Method DELETE not allowed! Try GET or POST.`,
})
expect(response.status).toBe(405)
})
12 changes: 7 additions & 5 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Pool } from 'pg'
import * as schedule from 'node-schedule'
import * as Redis from 'ioredis'
import { FastifyInstance } from 'fastify'
import { PluginsServer, PluginsServerConfig } from './types'
import { version } from '../package.json'
import { setupPlugins } from './plugins'
import { startWorker } from './worker'
import * as schedule from 'node-schedule'
import * as Redis from 'ioredis'
import { startWebServer, stopWebServer } from './web/server'
import { startFastifyInstance, stopFastifyInstance } from './web/server'

export const defaultConfig: PluginsServerConfig = {
CELERY_DEFAULT_QUEUE: 'celery',
Expand Down Expand Up @@ -41,8 +42,9 @@ export async function startPluginsServer(config: PluginsServerConfig): Promise<v

await setupPlugins(server)

let fastifyInstance: FastifyInstance | null = null
if (!serverConfig.DISABLE_WEB) {
await startWebServer(serverConfig.WEB_PORT, serverConfig.WEB_HOSTNAME)
fastifyInstance = await startFastifyInstance(serverConfig.WEB_PORT, serverConfig.WEB_HOSTNAME)
}

let stopWorker = startWorker(server)
Expand All @@ -67,7 +69,7 @@ export async function startPluginsServer(config: PluginsServerConfig): Promise<v

const closeJobs = async () => {
if (!serverConfig.DISABLE_WEB) {
await stopWebServer()
await stopFastifyInstance(fastifyInstance!)
}
await stopWorker()
pubSub.disconnect()
Expand Down
133 changes: 133 additions & 0 deletions src/web/capture.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { FastifyRequest, FastifyReply } from 'fastify'
import { isLooselyFalsy, loadDataFromRequest } from './utils'

const ALLOWED_METHODS = ['GET', 'POST']

export async function getEvent(request: FastifyRequest, reply: FastifyReply): Promise<Record<string, any>> {
if (!ALLOWED_METHODS.includes(request.method)) {
const badRequest = new Error(`Method ${request.method} not allowed! Try ${ALLOWED_METHODS.join(' or ')}.`)
;((badRequest as unknown) as Record<string, number>).statusCode = 405
throw badRequest
}

// Edge case for testing error handling
if (request.body == '1337') {
throw new Error('Unexpected leet detected!')
}

// TODO: statsd timer

const now = new Date()

let dataFromRequest
let data
try {
dataFromRequest = loadDataFromRequest(request)
data = dataFromRequest['data']
} catch {
const badRequest = new Error("Malformed request data. Make sure you're sending valid JSON.")
;((badRequest as unknown) as Record<string, number>).statusCode = 400
throw badRequest
}

if (isLooselyFalsy(data)) {
const badRequest = new Error(
'No data found. Make sure to use a POST request when sending the payload in the body of the request.'
)
;((badRequest as unknown) as Record<string, number>).statusCode = 400
throw badRequest
}

/*
sent_at = _get_sent_at(data, request)

token = _get_token(data, request)
is_personal_api_key = False
if not token:
token = PersonalAPIKeyAuthentication.find_key(
request, data_from_request["body"], data if isinstance(data, dict) else None
)
is_personal_api_key = True
if not token:
return cors_response(
request,
JsonResponse(
{
"message": "Neither api_key nor personal_api_key set. You can find your project API key in PostHog project settings.",
},
status=400,
),
)

team = Team.objects.get_team_from_token(token, is_personal_api_key)
if team is None:
return cors_response(
request,
JsonResponse(
{
"message": "Project or personal API key invalid. You can find your project API key in PostHog project settings.",
},
status=400,
),
)

if isinstance(data, dict):
if data.get("batch"): # posthog-python and posthog-ruby
data = data["batch"]
assert data is not None
elif "engage" in request.path_info: # JS identify call
data["event"] = "$identify" # make sure it has an event name

if isinstance(data, list):
events = data
else:
events = [data]

for event in events:
try:
distinct_id = _get_distinct_id(event)
except KeyError:
return cors_response(
request,
JsonResponse(
{
"message": "You need to set user distinct ID field `distinct_id`.",
"item": event,
},
status=400,
),
)
if "event" not in event:
return cors_response(
request,
JsonResponse(
{"message": "You need to set event name field `event`.", "item": event},
status=400,
),
)

process_event_ee(
distinct_id=distinct_id,
ip=get_ip_address(request),
site_url=request.build_absolute_uri("/")[:-1],
data=event,
team_id=team.id,
now=now,
sent_at=sent_at,
)

if settings.LOG_TO_WAL:
# log the event to kafka write ahead log for processing
log_event(
distinct_id=distinct_id,
ip=get_ip_address(request),
site_url=request.build_absolute_uri("/")[:-1],
data=event,
team_id=team.id,
now=now,
sent_at=sent_at,
)
*/

return { status: 1 }
}
49 changes: 36 additions & 13 deletions src/web/server.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,48 @@
import { fastify, FastifyRequest, FastifyReply } from 'fastify'
import { fastify, FastifyInstance } from 'fastify'
import { parse as querystringParse, ParsedUrlQuery } from 'querystring'
import { parse as urlParse } from 'url'
import { getEvent } from './capture'

export const webServer = fastify()
declare module 'fastify' {
export interface FastifyRequest {
GET: ParsedUrlQuery
POST: ParsedUrlQuery
}
}

async function getEvent(request: FastifyRequest, reply: FastifyReply): Promise<Record<string, unknown>> {
return {}
export function buildFastifyInstance(): FastifyInstance {
const fastifyInstance = fastify()
fastifyInstance.addHook('preHandler', async (request) => {
// Mimic Django HttpRequest with GET and POST properties
request.GET = urlParse(request.url, true).query
try {
request.POST = querystringParse(String(request.body))
} catch {
request.POST = {}
}
})
fastifyInstance.all('*', getEvent)
return fastifyInstance
}

webServer.get('*', getEvent)
webServer.post('*', getEvent)
export async function stopFastifyInstance(fastifyInstance: FastifyInstance): Promise<void> {
await fastifyInstance.close()
console.info(`\n🛑 Web server cleaned up!`)
}

export async function startWebServer(port: string | number, hostname?: string): Promise<void> {
export async function startFastifyInstance(
port: string | number = 3008,
hostname?: string,
withSignalHandling = true
): Promise<FastifyInstance> {
console.info(`👾 Starting web server…`)
const fastifyInstance = buildFastifyInstance()
try {
const address = await webServer.listen(port, hostname)
const address = await fastifyInstance.listen(port, hostname)
console.info(`✅ Web server listening on ${address}!`)
} catch (e) {
console.error(`🛑 Web server could not start! ${e}`)
return fastifyInstance
}
}

export async function stopWebServer(): Promise<void> {
await webServer.close()
console.info(`\n🛑 Web server cleaned up!`)
return fastifyInstance
}
78 changes: 78 additions & 0 deletions src/web/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { FastifyRequest } from 'fastify'
import { ParsedUrlQuery } from 'querystring'
import { gunzipSync } from 'zlib'

declare module 'fastify' {
export interface FastifyRequest {
GET: ParsedUrlQuery
POST: ParsedUrlQuery
}
}

export function loadDataFromRequest(request: FastifyRequest): any {
const dataRes: Record<string, any> = { data: {}, body: null }
let data: any
if (request.method === 'POST') {
if (request.headers['content-type'] === 'application/json') {
data = request.body
try {
dataRes['body'] = { ...JSON.parse(request.body as string) }
} catch {}
} else {
try {
data = request.POST['data']
} catch {}
}
} else {
data = request.GET['data']
}

if (isLooselyFalsy(data)) {
return {}
}

// TODO: convert below from Python
// # add the data in sentry's scope in case there's an exception
// with push_scope() as scope:
// scope.set_context("data", data)

let compression = request.GET['compression'] || request.POST['compression'] || request.headers['content-encoding']
if (Array.isArray(compression)) {
compression = compression[0]
}
compression = compression && compression.toLowerCase()

switch (compression) {
case 'gzip':
data = gunzipSync(data)
break
case 'lz64':
// TODO: convert below from Python
// data = lzstring.LZString().decompressFromBase64(typeof data === 'string' ? data.replace(" ", "+") : data.decode().replace(" ", "+")).encode("utf-16", "surrogatepass").decode("utf-16")
break
default:
break
}

// Is it plain json?
try {
data = JSON.parse(data)
} catch {
// if not, it's probably base64 encoded from other libraries
data = base64ToJson(data)
}
dataRes['data'] = data
// FIXME: data can also be an array, function assumes it's either None or a dictionary.
return dataRes
}

export function base64ToJson(data: string): any {
return JSON.parse(
btoa(data.replace(' ', '+') + '===')
// TODO: investigate UTF-8/UTF-16 surrogate passes here
)
}

export function isLooselyFalsy(value: any): boolean {
return Array.isArray(value) ? !value.length : !value || !Object.keys(value).length
}
Loading