Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Action matching juice #436

Merged
merged 40 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8474769
Reorient `ActionManager` to group by teamId for practicality
Twixes May 26, 2021
dfda86b
Make `getTeamActions()` return type more versatile
Twixes May 26, 2021
8b0b287
Add ActionMatcher base
Twixes May 26, 2021
21c97b1
Add `matchActions` worker task for optimization
Twixes May 26, 2021
7c5ee2d
Add part of action matching checks
Twixes May 27, 2021
11457d0
Fix PubSub's lack of teamId
Twixes May 27, 2021
13ffb0a
Merge branch 'master' into actionmanager-reoriented
Twixes May 27, 2021
0250b7f
Merge branch 'actionmanager-reoriented' into 235-action-matching-plus
Twixes May 27, 2021
6faf363
Remove `eventsProcessor.prepare()` calls
Twixes May 27, 2021
0c12c0b
Merge branch 'master' into 235-action-matching-plus
Twixes May 27, 2021
a957179
Add a legit ActionMatcher test
Twixes May 27, 2021
b072eb2
Adjust test for matchActions task
Twixes May 27, 2021
8b091d8
Improve task counting in test
Twixes May 27, 2021
4f2907f
Merge branch 'master' into 235-action-matching-plus
Twixes May 27, 2021
f5a3c05
Add moar matching capabilities
Twixes May 28, 2021
5fc1d68
Improve tests
Twixes May 28, 2021
2332121
Reorganize class dependencies
Twixes May 29, 2021
c80dee4
Add cohort matching
Twixes May 29, 2021
50dd10a
Add element/selector matching and polish other action matching parts
Twixes Jun 2, 2021
06eae9a
Merge branch 'master' into 235-action-matching-plus
Twixes Jun 3, 2021
b3ef524
Handle selector matching edge cases
Twixes Jun 3, 2021
bd2ee11
Save matched action occurrences to Postgres
Twixes Jun 3, 2021
f89acc7
Fix action-matcher tests
Twixes Jun 3, 2021
c703941
Merge branch 'master' into 235-action-matching-plus
Twixes Jun 3, 2021
cea9f54
Don't expose ActionManager methods in ActionMatcher
Twixes Jun 3, 2021
5bd0a36
Use feedback + RE2
Twixes Jun 4, 2021
ab3c0c5
Remove never satisfied branch
Twixes Jun 4, 2021
d0b4e71
Merge branch 'master' into 235-action-matching-plus
Twixes Jun 9, 2021
5dde78f
Address sum feedback and clean code up
Twixes Jun 9, 2021
90645ad
Use action matching results in a smarter way
Twixes Jun 9, 2021
5ca459c
Fix `createHub`
Twixes Jun 9, 2021
d75533d
Add action matching metric
Twixes Jun 9, 2021
02d258a
Enhance `PLUGIN_SERVER_ACTION_MATCHING`
Twixes Jun 9, 2021
6631888
Update action-matcher.test.ts
Twixes Jun 9, 2021
9aad73d
Merge branch 'master' into 235-action-matching-plus
Twixes Jun 9, 2021
5a2f7e7
Remove `||=`
Twixes Jun 9, 2021
cde9074
Update process-event.ts
Twixes Jun 9, 2021
73add38
Only fetch person if matching actions
Twixes Jun 9, 2021
e523ecf
Fix non-string distinct ID handling
Twixes Jun 9, 2021
6ea2f6d
Update process-event.ts
Twixes Jun 9, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion benchmarks/postgres/ingestion.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ describe('ingestion benchmarks', () => {
LOG_LEVEL: LogLevel.Log,
})
eventsProcessor = new EventsProcessor(hub)
await eventsProcessor.prepare()
team = await getFirstTeam(hub)
now = DateTime.utc()

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"@types/lru-cache": "^5.1.0",
"adm-zip": "^0.4.16",
"aws-sdk": "^2.884.0",
"escape-string-regexp": "^4.0.0",
"fast-deep-equal": "^3.1.3",
"generic-pool": "^3.7.1",
"graphile-worker": "^0.11.1",
Expand All @@ -75,6 +76,7 @@
"posthog-js-lite": "^0.0.5",
"pretty-bytes": "^5.6.0",
"protobufjs": "^6.10.2",
"re2": "^1.16.0",
"redlock": "^4.2.0",
"snowflake-sdk": "^1.6.0",
"tar-stream": "^2.1.4",
Expand Down
33 changes: 18 additions & 15 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { DB } from './utils/db/db'
import { KafkaProducerWrapper } from './utils/db/kafka-producer-wrapper'
import { InternalMetrics } from './utils/internal-metrics'
import { UUID } from './utils/utils'
import { ActionManager } from './worker/ingestion/action-manager'
import { ActionMatcher } from './worker/ingestion/action-matcher'
import { EventsProcessor } from './worker/ingestion/process-event'
import { LazyPluginVM } from './worker/vm/lazy'

Expand Down Expand Up @@ -115,6 +117,8 @@ export interface Hub extends PluginsServerConfig {
pluginConfigSecrets: Map<PluginConfigId, string>
pluginConfigSecretLookup: Map<string, PluginConfigId>
// tools
actionManager: ActionManager
actionMatcher: ActionMatcher
eventsProcessor: EventsProcessor
jobQueueManager: JobQueueManager
// diagnostics
Expand Down Expand Up @@ -487,44 +491,43 @@ export enum PropertyOperator {
}

/** Sync with posthog/frontend/src/types.ts */
interface BasePropertyFilter {
interface PropertyFilterBase {
key: string
value: string | number | Array<string | number> | null
label?: string
}

/** Sync with posthog/frontend/src/types.ts */
export interface EventPropertyFilter extends BasePropertyFilter {
type: 'event'
export interface PropertyFilterWithOperator extends PropertyFilterBase {
operator: PropertyOperator
}

/** Sync with posthog/frontend/src/types.ts */
export interface PersonPropertyFilter extends BasePropertyFilter {
export interface EventPropertyFilter extends PropertyFilterWithOperator {
type: 'event'
}

/** Sync with posthog/frontend/src/types.ts */
export interface PersonPropertyFilter extends PropertyFilterWithOperator {
type: 'person'
operator: PropertyOperator
}

/** Sync with posthog/frontend/src/types.ts */
export interface ElementPropertyFilter extends BasePropertyFilter {
export interface ElementPropertyFilter extends PropertyFilterWithOperator {
type: 'element'
key: 'tag_name' | 'text' | 'href' | 'selector'
operator: PropertyOperator
value: string
}

/** Sync with posthog/frontend/src/types.ts */
export interface CohortPropertyFilter extends BasePropertyFilter {
export interface CohortPropertyFilter extends PropertyFilterBase {
type: 'cohort'
key: 'id'
value: number
value: number | string
}

/** Sync with posthog/frontend/src/types.ts */
export type ActionStepProperties =
| EventPropertyFilter
| PersonPropertyFilter
| ElementPropertyFilter
| CohortPropertyFilter
export type PropertyFilter = EventPropertyFilter | PersonPropertyFilter | ElementPropertyFilter | CohortPropertyFilter

/** Sync with posthog/frontend/src/types.ts */
export enum ActionStepUrlMatching {
Expand All @@ -544,7 +547,7 @@ export interface ActionStep {
url_matching: ActionStepUrlMatching | null
name: string | null
event: string | null
properties: ActionStepProperties[] | null
properties: PropertyFilter[] | null
}

/** Raw Action row from database. */
Expand Down
21 changes: 20 additions & 1 deletion src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,17 @@ export class DB {
}
}

// Cohort & CohortPeople

/**
 * Checks whether a person is listed as a member of a cohort.
 *
 * Runs a `SELECT EXISTS (...)` over `posthog_cohortpeople`, filtered by the given cohort and person IDs.
 *
 * @param cohortId - ID of the cohort to look in.
 * @param personId - ID of the person to look for.
 * @returns `true` if a matching `posthog_cohortpeople` row exists, `false` otherwise.
 */
public async doesPersonBelongToCohort(cohortId: number, personId: Person['id']): Promise<boolean> {
    const selectResult = await this.postgresQuery(
        `SELECT EXISTS (SELECT 1 FROM posthog_cohortpeople WHERE cohort_id = $1 AND person_id = $2);`,
        [cohortId, personId],
        'doesPersonBelongToCohort'
    )
    // The query yields a single row with one `exists` column. Returning the row object itself
    // (as opposed to that column) would always be truthy, regardless of actual membership.
    return Boolean(selectResult.rows[0]?.exists)
}

// Organization

public async fetchOrganization(organizationId: string): Promise<RawOrganization | undefined> {
Expand Down Expand Up @@ -724,7 +735,7 @@ export class DB {
).rows as PropertyDefinitionType[]
}

// Action & ActionStep
// Action & ActionStep & Action<>Event

public async fetchAllActionsGroupedByTeam(): Promise<Record<Team['id'], Record<Action['id'], Action>>> {
const rawActions: RawAction[] = (
Expand Down Expand Up @@ -772,6 +783,14 @@ export class DB {
return action
}

/**
 * Records that an event matched an action by inserting an (action_id, event_id) row
 * into `posthog_action_events_2`.
 *
 * @param actionId - ID of the matched action.
 * @param eventId - ID of the event that matched it.
 */
public async registerActionOccurrence(actionId: Action['id'], eventId: Event['id']): Promise<void> {
    const insertStatement = `INSERT INTO posthog_action_events_2 (action_id, event_id) VALUES ($1, $2)` // TODO: remove _2
    const queryTag = 'registerActionOccurrence'
    await this.postgresQuery(insertStatement, [actionId, eventId], queryTag)
}

// Team Internal Metrics

public async fetchInternalMetricsTeam(): Promise<Team['id'] | null> {
Expand Down
6 changes: 5 additions & 1 deletion src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { ConnectionOptions } from 'tls'
import { defaultConfig } from '../../config/config'
import { JobQueueManager } from '../../main/job-queues/job-queue-manager'
import { Hub, PluginsServerConfig } from '../../types'
import { ActionManager } from '../../worker/ingestion/action-manager'
import { ActionMatcher } from '../../worker/ingestion/action-matcher'
import { EventsProcessor } from '../../worker/ingestion/process-event'
import { InternalMetrics } from '../internal-metrics'
import { killProcess } from '../kill'
Expand Down Expand Up @@ -175,8 +177,10 @@ export async function createHub(
}

// :TODO: This is only used on worker threads, not main
hub.actionManager = new ActionManager(db)
await hub.actionManager.prepare()
hub.actionMatcher = new ActionMatcher(db, hub.actionManager)
hub.eventsProcessor = new EventsProcessor(hub as Hub)
await hub.eventsProcessor.prepare()
hub.jobQueueManager = new JobQueueManager(hub as Hub)

if (serverConfig.CAPTURE_INTERNAL_METRICS) {
Expand Down
15 changes: 14 additions & 1 deletion src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Readable } from 'stream'
import * as tar from 'tar-stream'
import * as zlib from 'zlib'

import { LogLevel, Plugin, PluginConfigId, PluginsServerConfig, TimestampFormat } from '../types'
import { Element, LogLevel, Plugin, PluginConfigId, PluginsServerConfig, TimestampFormat } from '../types'
import { status } from './status'

/** Time until autoexit (due to error) gives up on graceful exit and kills the process right away. */
Expand Down Expand Up @@ -606,3 +606,16 @@ export function stringClamp(value: string, def: number, min: number, max: number
const nanToNull = (nr: number): null | number => (isNaN(nr) ? null : nr)
return clamp(nanToNull(parseInt(value)) ?? def, min, max)
}

/**
 * Maps raw element records onto `Element` objects.
 *
 * For each input record:
 * - `$el_text` becomes `text`, truncated to 400 characters;
 * - `attr__href` becomes `href`, truncated to 2048 characters;
 * - `attr__class` is split on single spaces into `attr_class`;
 * - `attr__id`, `tag_name`, `nth_child` and `nth_of_type` are copied over as-is;
 * - every key starting with `attr__` is additionally collected, unchanged, into `attributes`.
 *
 * Missing (null/undefined) `$el_text`, `attr__href` and `attr__class` values result in `undefined`.
 *
 * @param elements - Raw element records to convert.
 * @returns One converted element per input record, in the same order.
 */
export function extractElements(elements: Record<string, any>[]): Element[] {
    return elements.map((rawElement) => {
        const rawText = rawElement['$el_text']
        const rawHref = rawElement['attr__href']
        const rawClass = rawElement['attr__class']

        // Gather all `attr__`-prefixed entries, keeping their original keys and values
        const attributes: Record<string, any> = {}
        for (const [key, value] of Object.entries(rawElement)) {
            if (key.startsWith('attr__')) {
                attributes[key] = value
            }
        }

        return {
            text: rawText == null ? undefined : rawText.slice(0, 400),
            tag_name: rawElement['tag_name'],
            href: rawHref == null ? undefined : rawHref.slice(0, 2048),
            attr_class: rawClass == null ? undefined : rawClass.split(' '),
            attr_id: rawElement['attr__id'],
            nth_child: rawElement['nth_child'],
            nth_of_type: rawElement['nth_of_type'],
            attributes,
        }
    })
}
17 changes: 13 additions & 4 deletions src/worker/ingestion/action-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ export class ActionManager {
this.ready = true
}

public getTeamActions(teamId: Team['id']): ActionMap | null {
public getTeamActions(teamId: Team['id']): ActionMap {
if (!this.ready) {
throw new Error('ActionManager is not ready! Run actionManager.prepare() before this')
}
return this.actionCache[teamId] || null
return this.actionCache[teamId] || {}
}

public async reloadAllActions(): Promise<void> {
Expand All @@ -35,7 +35,15 @@ export class ActionManager {

public async reloadAction(teamId: Team['id'], actionId: Action['id']): Promise<void> {
const refetchedAction = await this.db.fetchAction(actionId)
const wasCachedAlready = teamId in this.actionCache && actionId in this.actionCache[teamId]

let wasCachedAlready = true
if (!this.actionCache[teamId]) {
wasCachedAlready = false
this.actionCache[teamId] = {}
} else if (!this.actionCache[teamId][actionId]) {
wasCachedAlready = false
}

if (refetchedAction) {
status.info(
'🍿',
Expand All @@ -59,7 +67,8 @@ export class ActionManager {
}

public dropAction(teamId: Team['id'], actionId: Action['id']): void {
const wasCachedAlready = teamId in this.actionCache && actionId in this.actionCache[teamId]
const wasCachedAlready = !!this.actionCache?.[teamId]?.[actionId]

if (wasCachedAlready) {
status.info('🍿', `Deleted action ID ${actionId} (team ID ${teamId}) from cache`)
delete this.actionCache[teamId][actionId]
Expand Down
Loading