Chunk session recordings #3566

Closed
wants to merge 13 commits
42 changes: 39 additions & 3 deletions posthog/api/capture.py
@@ -1,7 +1,8 @@
import json
import re
from copy import deepcopy
from datetime import datetime
from random import random
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

import statsd
from dateutil import parser
@@ -15,7 +16,7 @@
from posthog.models import Team, User
from posthog.models.feature_flag import get_active_feature_flags
from posthog.models.utils import UUIDT
from posthog.utils import cors_response, get_ip_address, load_data_from_request
from posthog.utils import chunk_string, cors_response, get_ip_address, load_data_from_request

if settings.EE_AVAILABLE:
from ee.clickhouse.process_event import log_event, process_event_ee
@@ -93,6 +94,39 @@ def _ensure_web_feature_flags_in_properties(event: Dict[str, Any], team: Team, d
event["properties"]["$active_feature_flags"] = get_active_feature_flags(team, distinct_id)


def _split_large_snapshot_events(events: List[Dict[str, Any]]):
split_events: List[Dict[str, Any]] = []
for event in events:
if event["event"] == "$snapshot":
snapshot_data = event["properties"]["$snapshot_data"]
snapshot_json = json.dumps(snapshot_data)

if len(snapshot_json) > settings.SESSION_RECORDING_CHUNK_SIZE:
chunks = chunk_string(snapshot_json, settings.SESSION_RECORDING_CHUNK_SIZE)
snapshot_id = str(UUIDT())

event_template = deepcopy(event)
event_template["properties"]["$snapshot_data"] = {}

for index, chunk in enumerate(chunks):
new_event = deepcopy(event_template)
new_event["properties"]["$snapshot_data"] = {
"posthog_chunked": True,
"snapshot_id": snapshot_id,
"snapshot_length": len(snapshot_json),
"chunk_data": chunk,
"chunk_length": len(chunk),
"chunk_index": index,
"chunk_count": len(chunks),
}
split_events.append(new_event)
else:
split_events.append(event)
else:
split_events.append(event)
return split_events
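
For reference, each chunked $snapshot event produced above carries an envelope like the sketch below in $snapshot_data (field values are illustrative, matching the test further down rather than a real capture):

# Illustrative shape of $snapshot_data on one chunk (values are examples only)
{
    "posthog_chunked": True,        # marks the event as one chunk of a larger snapshot
    "snapshot_id": "<uuid shared by all chunks of this snapshot>",
    "snapshot_length": 12159,       # length of the full JSON-serialized snapshot
    "chunk_data": "<this chunk's slice of the JSON string>",
    "chunk_length": 100,
    "chunk_index": 0,               # position used to reassemble the chunks in order
    "chunk_count": 122,
}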


@csrf_exempt
def get_event(request):
timer = statsd.Timer("%s_posthog_cloud" % (settings.STATSD_PREFIX,))
@@ -174,6 +208,8 @@ def get_event(request):
else:
events = [data]

events = _split_large_snapshot_events(events)

for event in events:
try:
distinct_id = _get_distinct_id(event)
72 changes: 72 additions & 0 deletions posthog/api/test/test_capture.py
@@ -560,3 +560,75 @@ def test_add_feature_flags_if_missing(self, patch_process_event_with_plugins) ->
)
arguments = self._to_arguments(patch_process_event_with_plugins)
self.assertEqual(arguments["data"]["properties"]["$active_feature_flags"], ["test-ff"])

@patch("posthog.models.team.TEAM_CACHE", {})
@patch("posthog.api.capture.celery_app.send_task")
def test_split_session_recordings(self, patch_process_event_with_plugins):
with self.settings(SESSION_RECORDING_CHUNK_SIZE=100):
now = timezone.now()

# typical $snapshot_data property in a $snapshot event
snapshot_data = {
"data": {
"adds": [
{
"node": {"id": 2040, "type": 2, "tagName": "div", "attributes": {}, "childNodes": []},
"nextId": None,
"parentId": 39,
}
]
* 100
},
"type": 3,
"timestamp": 1611611017482,
}
data = {
"event": "$snapshot",
"timestamp": now.isoformat(),
"properties": {
"$session_id": "session123",
"$snapshot_data": snapshot_data,
"distinct_id": "userid123",
},
"api_key": self.team.api_token,
}
self.client.get(
"/e/?_=%s&data=%s" % (int(now.timestamp()), quote(self._dict_to_json(data))),
content_type="application/json",
HTTP_ORIGIN="https://localhost",
)

events = [call_args[1]["args"][3] for call_args in patch_process_event_with_plugins.call_args_list]
snapshot_datas = [event["properties"]["$snapshot_data"] for event in events]

self.assertEqual(
json.loads(json.dumps(events[0])),
{
"event": "$snapshot",
"timestamp": now.isoformat(),
"properties": {
"$session_id": "session123",
"distinct_id": "userid123",
"$snapshot_data": {
"posthog_chunked": True,
"snapshot_id": events[0]["properties"]["$snapshot_data"]["snapshot_id"],
"snapshot_length": 12159,
"chunk_data": '{"data": {"adds": [{"node": {"id": 2040, "type": 2, "tagName": "div", "attributes": {}, "childNodes"',
"chunk_length": 100,
"chunk_index": 0,
"chunk_count": 122,
},
},
"api_key": "token123",
},
)

self.assertEqual(set([e["event"] for e in events]), {"$snapshot"})
self.assertEqual(set([s["snapshot_id"] for s in snapshot_datas]), {snapshot_datas[0]["snapshot_id"]})
self.assertEqual(set([s["posthog_chunked"] for s in snapshot_datas]), {True})
self.assertEqual(set([s["chunk_index"] for s in snapshot_datas]), set(range(122)))
self.assertEqual(set([s["chunk_count"] for s in snapshot_datas]), {122})
self.assertEqual(sum([s["chunk_length"] for s in snapshot_datas]), snapshot_datas[0]["snapshot_length"])

# join all the chunks, assume they're in order
self.assertEqual("".join([s["chunk_data"] for s in snapshot_datas]), json.dumps(snapshot_data))
48 changes: 47 additions & 1 deletion posthog/queries/sessions/session_recording.py
@@ -1,4 +1,5 @@
import datetime
import json
from typing import (
Any,
Callable,
@@ -11,6 +12,7 @@
)

from django.db import connection
from sentry_sdk.api import capture_message

from posthog.models import Person, SessionRecordingEvent, Team
from posthog.models.filters.sessions_filter import SessionsFilter
@@ -59,10 +61,54 @@ def query_recording_snapshots(

return events[0].distinct_id, events[0].timestamp, [e.snapshot_data for e in events]

def merge_snapshot_chunks(self, team: Team, session_id: str, unmerged_snapshots: List[Dict[str, Any]]):
snapshot_collectors: Dict[str, Dict[str, Any]] = {} # gather the chunks of a snapshot
snapshots_and_collectors: List[Dict[str, Any]] = [] # list the snapshots in order

for chunk_or_snapshot in unmerged_snapshots:
if chunk_or_snapshot.get("posthog_chunked"): # it's a chunk
collector = snapshot_collectors.get(chunk_or_snapshot["snapshot_id"])
if not collector:
collector = {
"collector": True,
"event": chunk_or_snapshot,
"count": chunk_or_snapshot["chunk_count"],
"chunks": {},
}
snapshot_collectors[chunk_or_snapshot["snapshot_id"]] = collector
snapshots_and_collectors.append(collector)

collector["chunks"][chunk_or_snapshot["chunk_index"]] = chunk_or_snapshot["chunk_data"]
else: # full snapshot
snapshots_and_collectors.append({"snapshot": True, "data": chunk_or_snapshot})

snapshots = []
for snapshot_or_collector in snapshots_and_collectors:
if snapshot_or_collector.get("collector"):
has_all_chunks = True
data = ""
for i in range(snapshot_or_collector["count"]):
if not snapshot_or_collector["chunks"].get(i):
has_all_chunks = False
break
data = data + snapshot_or_collector["chunks"][i]

if has_all_chunks:
snapshots.append(json.loads(data))
else:
capture_message(
"Did not find all session recording chunks! Team: {}, Session: {}".format(team.pk, session_id)
)
elif snapshot_or_collector.get("snapshot"):
snapshots.append(snapshot_or_collector["data"])

return snapshots

def run(self, team: Team, session_recording_id: str, *args, **kwargs) -> Dict[str, Any]:
from posthog.api.person import PersonSerializer

distinct_id, start_time, snapshots = self.query_recording_snapshots(team, session_recording_id)
distinct_id, start_time, unmerged_snapshots = self.query_recording_snapshots(team, session_recording_id)
Review comment (Contributor):

Will SESSIONS_IN_RANGE_QUERY above still continue working with this setup? From a cursory glance at the code it seems not.

Basically there will always be sessions with incomplete data - e.g. the person leaves the page before posthog.js finishes sending the full payload, or the larger payload gets blocked. We should not show sessions where there's no full snapshot event, which the line below accomplishes:

COUNT(*) FILTER(where snapshot_data->>'type' = '2') as full_snapshots
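
A possible follow-up (sketched here, not part of this diff): once snapshot_data is chunked, the JSONB type filter above can no longer see the original event type, so the "has a full snapshot" check could instead run in Python after the chunks are merged. The helper below is hypothetical and only illustrates the idea; rrweb full snapshots use type == 2, matching the SQL filter quoted above.

from typing import Any, Dict, List

# Sketch: keep a session only if its merged snapshots contain at least one full snapshot.
def has_full_snapshot(snapshots: List[Dict[str, Any]]) -> bool:
    return any(snapshot.get("type") == 2 for snapshot in snapshots)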

snapshots = self.merge_snapshot_chunks(team, session_recording_id, unmerged_snapshots)
person = (
PersonSerializer(Person.objects.get(team=team, persondistinctid__distinct_id=distinct_id)).data
if distinct_id
61 changes: 57 additions & 4 deletions posthog/queries/sessions/test/test_session_recording.py
@@ -24,9 +24,9 @@ def test_query_run(self):
self.assertEqual(
session["snapshots"],
[
{"timestamp": 1_600_000_000, "type": 2},
{"timestamp": 1_600_000_010, "type": 2},
{"timestamp": 1_600_000_030, "type": 2},
{"timestamp": 1_600_000_000_000, "type": 2},
{"timestamp": 1_600_000_010_000, "type": 2},
{"timestamp": 1_600_000_030_000, "type": 2},
],
)
self.assertEqual(session["person"]["properties"], {"$some_prop": "something"})
@@ -36,6 +36,28 @@ def test_query_run_with_no_such_session(self):
session = session_recording().run(team=self.team, session_recording_id="xxx")
self.assertEqual(session, {"snapshots": [], "person": None, "start_time": None})

def test_query_chunk_merge(self):
with freeze_time("2020-09-13T12:26:40.000Z"):
Person.objects.create(team=self.team, distinct_ids=["user"], properties={"$some_prop": "something"})

self.create_snapshot_chunks("user", "1", now())
self.create_snapshot("user", "1", now() + relativedelta(seconds=10))
self.create_snapshot_chunks("user2", "2", now() + relativedelta(seconds=20))
self.create_snapshot_chunks("user", "1", now() + relativedelta(seconds=30))

session = session_recording().run(team=self.team, session_recording_id="1")
self.maxDiff = None
self.assertEqual(
session["snapshots"],
[
{"data": {"adds": [{"node": {"id": 2040}}] * 100}, "timestamp": 1_600_000_000_000, "type": 3},
{"timestamp": 1_600_000_010_000, "type": 2},
{"data": {"adds": [{"node": {"id": 2040}}] * 100}, "timestamp": 1_600_000_030_000, "type": 3},
],
)
self.assertEqual(session["person"]["properties"], {"$some_prop": "something"})
self.assertEqual(session["start_time"], now())

def _test_filter_sessions(self, filter, expected):
with freeze_time("2020-09-13T12:26:40.000Z"):
self.create_snapshot("user", "1", now() + relativedelta(seconds=5))
@@ -113,9 +135,40 @@ def create_snapshot(self, distinct_id, session_id, timestamp, type=2):
distinct_id=distinct_id,
timestamp=timestamp,
session_id=session_id,
snapshot_data={"timestamp": timestamp.timestamp(), "type": type},
snapshot_data={"timestamp": int(timestamp.timestamp() * 1000), "type": type},
)

def create_snapshot_chunks(self, distinct_id, session_id, timestamp):
from posthog.api.capture import _split_large_snapshot_events

snapshot_data = {
"data": {"adds": [{"node": {"id": 2040}}] * 100},
"type": 3,
"timestamp": int(timestamp.timestamp() * 1000),
}
data = {
"event": "$snapshot",
"timestamp": timestamp,
"properties": {
"$session_id": "session123",
"$snapshot_data": snapshot_data,
"distinct_id": "userid123",
},
"api_key": self.team.api_token,
}

with self.settings(SESSION_RECORDING_CHUNK_SIZE=100):
events = _split_large_snapshot_events([data])
self.assertGreater(len(events), 1) # make sure we did actually split it
for event in events:
event_factory(
team_id=self.team.pk,
distinct_id=distinct_id,
timestamp=timestamp,
session_id=session_id,
snapshot_data=event["properties"]["$snapshot_data"],
)

return TestSessionRecording


2 changes: 2 additions & 0 deletions posthog/settings.py
@@ -79,6 +79,8 @@ def print_warning(warning_lines: Sequence[str]):
PLUGINS_CELERY_QUEUE = os.getenv("PLUGINS_CELERY_QUEUE", "posthog-plugins")
PLUGINS_RELOAD_PUBSUB_CHANNEL = os.getenv("PLUGINS_RELOAD_PUBSUB_CHANNEL", "reload-plugins")

SESSION_RECORDING_CHUNK_SIZE = int(os.getenv("SESSION_RECORDING_CHUNK_SIZE", 512 * 1024)) # 512 KB

# Tokens used when installing plugins, for example to get the latest commit SHA or to download private repositories.
# Used mainly to get around API limits and only if no ?private_token=TOKEN found in the plugin URL.
GITLAB_TOKEN = os.getenv("GITLAB_TOKEN", None)
5 changes: 5 additions & 0 deletions posthog/utils.py
@@ -612,3 +612,8 @@ def is_valid_regex(value: Any) -> bool:
return True
except re.error:
return False


def chunk_string(string: str, chunk_length: int) -> List[str]:
"""Split a string into chunk_length-sized elements. Reversal operation: `''.join()`."""
return [string[0 + offset : chunk_length + offset] for offset in range(0, len(string), chunk_length)]
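
A quick usage sketch (not part of the diff) showing that chunking round-trips exactly:

# Split into 4-character chunks, then reassemble.
parts = chunk_string("abcdefghij", 4)
assert parts == ["abcd", "efgh", "ij"]
assert "".join(parts) == "abcdefghij"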