Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Commit

Permalink
Action matching juice (#436)
Browse files Browse the repository at this point in the history
* Reorient `ActionManager` to group by teamId for practicality

* Make `getTeamActions()` return type more versatile

* Add ActionMatcher base

* Add `matchActions` worker task for optimization

* Add part of action matching checks

* Fix PubSub's lack of teamId

* Remove `eventsProcessor.prepare()` calls

* Add a legit ActionMatcher test

* Adjust test for matchActions task

* Improve task counting in test

* Add more matching capabilities

* Improve tests

* Reorganize class dependencies

* Add cohort matching

* Add element/selector matching and polish other action matching parts

* Handle selector matching edge cases

* Save matched action occurrences to Postgres

* Fix action-matcher tests

* Don't expose ActionManager methods in ActionMatcher

* Use feedback + RE2

* Remove never satisfied branch

* Address some feedback and clean up code

* Use action matching results in a smarter way

* Fix `createHub`

* Add action matching metric

* Enhance `PLUGIN_SERVER_ACTION_MATCHING`

* Update action-matcher.test.ts

* Remove `||=`

* Update process-event.ts

* Only fetch person if matching actions

* Fix non-string distinct ID handling

* Update process-event.ts
  • Loading branch information
Twixes authored Jun 9, 2021
1 parent d4d8c02 commit 84e471c
Show file tree
Hide file tree
Showing 18 changed files with 1,425 additions and 125 deletions.
1 change: 0 additions & 1 deletion benchmarks/postgres/ingestion.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ describe('ingestion benchmarks', () => {
LOG_LEVEL: LogLevel.Log,
})
eventsProcessor = new EventsProcessor(hub)
await eventsProcessor.prepare()
team = await getFirstTeam(hub)
now = DateTime.utc()

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"@types/lru-cache": "^5.1.0",
"adm-zip": "^0.4.16",
"aws-sdk": "^2.884.0",
"escape-string-regexp": "^4.0.0",
"fast-deep-equal": "^3.1.3",
"generic-pool": "^3.7.1",
"graphile-worker": "^0.11.1",
Expand All @@ -75,6 +76,7 @@
"posthog-js-lite": "^0.0.5",
"pretty-bytes": "^5.6.0",
"protobufjs": "^6.10.2",
"re2": "^1.16.0",
"redlock": "^4.2.0",
"snowflake-sdk": "^1.6.0",
"tar-stream": "^2.1.4",
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CRASH_IF_NO_PERSISTENT_JOB_QUEUE: false,
STALENESS_RESTART_SECONDS: 0,
CAPTURE_INTERNAL_METRICS: false,
PLUGIN_SERVER_ACTION_MATCHING: 0,
}
}

Expand Down Expand Up @@ -124,6 +125,8 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
'refuse to start unless there is a properly configured persistent job queue (e.g. graphile)',
STALENESS_RESTART_SECONDS: 'trigger a restart if no event ingested for this duration',
CAPTURE_INTERNAL_METRICS: 'capture internal metrics for posthog in posthog',
PLUGIN_SERVER_ACTION_MATCHING:
'whether plugin server action matching results should be used (transition period setting)',
}
}

Expand Down
34 changes: 19 additions & 15 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { DB } from './utils/db/db'
import { KafkaProducerWrapper } from './utils/db/kafka-producer-wrapper'
import { InternalMetrics } from './utils/internal-metrics'
import { UUID } from './utils/utils'
import { ActionManager } from './worker/ingestion/action-manager'
import { ActionMatcher } from './worker/ingestion/action-matcher'
import { EventsProcessor } from './worker/ingestion/process-event'
import { LazyPluginVM } from './worker/vm/lazy'

Expand Down Expand Up @@ -91,6 +93,7 @@ export interface PluginsServerConfig extends Record<string, any> {
CRASH_IF_NO_PERSISTENT_JOB_QUEUE: boolean
STALENESS_RESTART_SECONDS: number
CAPTURE_INTERNAL_METRICS: boolean
PLUGIN_SERVER_ACTION_MATCHING: 0 | 1 | 2
}

export interface Hub extends PluginsServerConfig {
Expand All @@ -115,6 +118,8 @@ export interface Hub extends PluginsServerConfig {
pluginConfigSecrets: Map<PluginConfigId, string>
pluginConfigSecretLookup: Map<string, PluginConfigId>
// tools
actionManager: ActionManager
actionMatcher: ActionMatcher
eventsProcessor: EventsProcessor
jobQueueManager: JobQueueManager
// diagnostics
Expand Down Expand Up @@ -487,44 +492,43 @@ export enum PropertyOperator {
}

/** Sync with posthog/frontend/src/types.ts */
interface BasePropertyFilter {
interface PropertyFilterBase {
key: string
value: string | number | Array<string | number> | null
label?: string
}

/** Sync with posthog/frontend/src/types.ts */
export interface EventPropertyFilter extends BasePropertyFilter {
type: 'event'
export interface PropertyFilterWithOperator extends PropertyFilterBase {
operator: PropertyOperator
}

/** Sync with posthog/frontend/src/types.ts */
export interface PersonPropertyFilter extends BasePropertyFilter {
export interface EventPropertyFilter extends PropertyFilterWithOperator {
type: 'event'
}

/** Sync with posthog/frontend/src/types.ts */
export interface PersonPropertyFilter extends PropertyFilterWithOperator {
type: 'person'
operator: PropertyOperator
}

/** Sync with posthog/frontend/src/types.ts */
export interface ElementPropertyFilter extends BasePropertyFilter {
export interface ElementPropertyFilter extends PropertyFilterWithOperator {
type: 'element'
key: 'tag_name' | 'text' | 'href' | 'selector'
operator: PropertyOperator
value: string
}

/** Sync with posthog/frontend/src/types.ts */
export interface CohortPropertyFilter extends BasePropertyFilter {
export interface CohortPropertyFilter extends PropertyFilterBase {
type: 'cohort'
key: 'id'
value: number
value: number | string
}

/** Sync with posthog/frontend/src/types.ts */
export type ActionStepProperties =
| EventPropertyFilter
| PersonPropertyFilter
| ElementPropertyFilter
| CohortPropertyFilter
export type PropertyFilter = EventPropertyFilter | PersonPropertyFilter | ElementPropertyFilter | CohortPropertyFilter

/** Sync with posthog/frontend/src/types.ts */
export enum ActionStepUrlMatching {
Expand All @@ -544,7 +548,7 @@ export interface ActionStep {
url_matching: ActionStepUrlMatching | null
name: string | null
event: string | null
properties: ActionStepProperties[] | null
properties: PropertyFilter[] | null
}

/** Raw Action row from database. */
Expand Down
22 changes: 21 additions & 1 deletion src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,17 @@ export class DB {
}
}

// Cohort & CohortPeople

/**
 * Checks whether a `posthog_cohortpeople` row links the given cohort and person.
 *
 * @param cohortId - value compared against `cohort_id`
 * @param personId - value compared against `person_id`
 * @returns `true` only when such a row exists
 */
public async doesPersonBelongToCohort(cohortId: number, personId: Person['id']): Promise<boolean> {
    const selectResult = await this.postgresQuery(
        `SELECT EXISTS (SELECT 1 FROM posthog_cohortpeople WHERE cohort_id = $1 AND person_id = $2);`,
        [cohortId, personId],
        'doesPersonBelongToCohort'
    )
    // `SELECT EXISTS (...)` yields a single row whose only column is named `exists`.
    // Returning the row object itself would always be truthy, so read the column explicitly.
    return selectResult.rows[0]?.exists === true
}

// Organization

public async fetchOrganization(organizationId: string): Promise<RawOrganization | undefined> {
Expand Down Expand Up @@ -724,7 +735,7 @@ export class DB {
).rows as PropertyDefinitionType[]
}

// Action & ActionStep
// Action & ActionStep & Action<>Event

public async fetchAllActionsGroupedByTeam(): Promise<Record<Team['id'], Record<Action['id'], Action>>> {
const rawActions: RawAction[] = (
Expand Down Expand Up @@ -772,6 +783,15 @@ export class DB {
return action
}

/**
 * Inserts one `posthog_action_events` row per given action, all pointing at the same event.
 *
 * @param eventId - value written to `event_id` on every inserted row
 * @param actions - actions whose `id` is written to `action_id`; an empty list is a no-op
 */
public async registerEventActionOccurrences(eventId: Event['id'], actions: Action[]): Promise<void> {
    // With no actions the VALUES list would be empty, which is not valid SQL - skip the query entirely.
    if (actions.length === 0) {
        return
    }
    // $1 is the shared event ID; action IDs take the placeholders $2, $3, ...
    const valuesClause = actions.map((_, index) => `($1, $${index + 2})`).join(', ')
    await this.postgresQuery(
        `INSERT INTO posthog_action_events (event_id, action_id) VALUES ${valuesClause}`,
        [eventId, ...actions.map((action) => action.id)],
        'registerEventActionOccurrences'
    )
}

// Team Internal Metrics

public async fetchInternalMetricsTeam(): Promise<Team['id'] | null> {
Expand Down
6 changes: 5 additions & 1 deletion src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { ConnectionOptions } from 'tls'
import { defaultConfig } from '../../config/config'
import { JobQueueManager } from '../../main/job-queues/job-queue-manager'
import { Hub, PluginsServerConfig } from '../../types'
import { ActionManager } from '../../worker/ingestion/action-manager'
import { ActionMatcher } from '../../worker/ingestion/action-matcher'
import { EventsProcessor } from '../../worker/ingestion/process-event'
import { InternalMetrics } from '../internal-metrics'
import { killProcess } from '../kill'
Expand Down Expand Up @@ -175,8 +177,10 @@ export async function createHub(
}

// :TODO: This is only used on worker threads, not main
hub.actionManager = new ActionManager(db)
await hub.actionManager.prepare()
hub.actionMatcher = new ActionMatcher(db, hub.actionManager, statsd)
hub.eventsProcessor = new EventsProcessor(hub as Hub)
await hub.eventsProcessor.prepare()
hub.jobQueueManager = new JobQueueManager(hub as Hub)

if (serverConfig.CAPTURE_INTERNAL_METRICS) {
Expand Down
13 changes: 13 additions & 0 deletions src/utils/db/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@ export function chainToElements(chain: string): Element[] {
return elements
}

/**
 * Maps raw element records onto `Element` objects.
 *
 * `$el_text` is cut to 400 characters and `attr__href` to 2048; `attr__class` is split on single
 * spaces; every key starting with `attr__` is additionally copied verbatim into `attributes`.
 * Fields missing from a raw record come out as `undefined`.
 */
export function extractElements(elements: Record<string, any>[]): Element[] {
    const isAttributeEntry = ([name]: [string, any]): boolean => name.startsWith('attr__')

    return elements.map((rawElement) => {
        const text = rawElement['$el_text']?.slice(0, 400)
        const href = rawElement['attr__href']?.slice(0, 2048)
        const classes = rawElement['attr__class']?.split(' ')
        const attributes = Object.fromEntries(Object.entries(rawElement).filter(isAttributeEntry))

        return {
            text,
            tag_name: rawElement['tag_name'],
            href,
            attr_class: classes,
            attr_id: rawElement['attr__id'],
            nth_child: rawElement['nth_child'],
            nth_of_type: rawElement['nth_of_type'],
            attributes,
        }
    })
}

export function timeoutGuard(
message: string,
context?: Record<string, any>,
Expand Down
2 changes: 1 addition & 1 deletion src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Readable } from 'stream'
import * as tar from 'tar-stream'
import * as zlib from 'zlib'

import { LogLevel, Plugin, PluginConfigId, PluginsServerConfig, TimestampFormat } from '../types'
import { Element, LogLevel, Plugin, PluginConfigId, PluginsServerConfig, TimestampFormat } from '../types'
import { status } from './status'

/** Time until autoexit (due to error) gives up on graceful exit and kills the process right away. */
Expand Down
17 changes: 13 additions & 4 deletions src/worker/ingestion/action-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ export class ActionManager {
this.ready = true
}

public getTeamActions(teamId: Team['id']): ActionMap | null {
public getTeamActions(teamId: Team['id']): ActionMap {
if (!this.ready) {
throw new Error('ActionManager is not ready! Run actionManager.prepare() before this')
}
return this.actionCache[teamId] || null
return this.actionCache[teamId] || {}
}

public async reloadAllActions(): Promise<void> {
Expand All @@ -35,7 +35,15 @@ export class ActionManager {

public async reloadAction(teamId: Team['id'], actionId: Action['id']): Promise<void> {
const refetchedAction = await this.db.fetchAction(actionId)
const wasCachedAlready = teamId in this.actionCache && actionId in this.actionCache[teamId]

let wasCachedAlready = true
if (!this.actionCache[teamId]) {
wasCachedAlready = false
this.actionCache[teamId] = {}
} else if (!this.actionCache[teamId][actionId]) {
wasCachedAlready = false
}

if (refetchedAction) {
status.debug(
'🍿',
Expand All @@ -59,7 +67,8 @@ export class ActionManager {
}

public dropAction(teamId: Team['id'], actionId: Action['id']): void {
const wasCachedAlready = teamId in this.actionCache && actionId in this.actionCache[teamId]
const wasCachedAlready = !!this.actionCache?.[teamId]?.[actionId]

if (wasCachedAlready) {
status.info('🍿', `Deleted action ID ${actionId} (team ID ${teamId}) from cache`)
delete this.actionCache[teamId][actionId]
Expand Down
Loading

0 comments on commit 84e471c

Please sign in to comment.