Chunk session recording events
Closes #3632 and replaces https://github.com/PostHog/posthog/pull/3566/files

This should make it possible to ingest large full snapshot events.

Snapshot data is gzip-compressed, then base64-encoded so the compressed bytes can be serialized as JSON.

pytest-mock is used for cleanly patching methods in tests.
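
Illustration only, not part of the commit: a round trip through the new helpers, using the event shape from the tests below.

from posthog.helpers.session_recording import (
    decompress_chunked_snapshot_data,
    preprocess_session_recording_events,
)

events = [{
    "event": "$snapshot",
    "properties": {
        "$session_id": "1234",
        "$snapshot_data": {"type": 2, "foo": "bar"},  # type 2 is a full snapshot
        "distinct_id": "abc123",
    },
}]

# Capture side: the snapshot is gzipped, base64-encoded and split into chunk events.
chunked = preprocess_session_recording_events(events)
assert "chunk_id" in chunked[0]["properties"]["$snapshot_data"]

# Read side: reassembling the chunks yields the original snapshot data.
restored = list(
    decompress_chunked_snapshot_data(1, "1234", [e["properties"]["$snapshot_data"] for e in chunked])
)
assert restored == [{"type": 2, "foo": "bar"}]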
macobo committed Mar 19, 2021
1 parent 9b652fe commit f01a876
Showing 10 changed files with 257 additions and 8 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci-backend.yml
@@ -107,7 +107,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
-python -m pip install freezegun fakeredis pytest pytest-django
+python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
if: steps.cache.outputs.cache-hit != 'true'

- name: Check migrations
@@ -196,7 +196,7 @@ jobs:
cd deploy
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
-python -m pip install freezegun fakeredis pytest pytest-django
+python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
if: steps.cache.outputs.cache-hit != 'true'

# The 2-step migration process (first master, then current branch) verifies that it'll always
@@ -300,7 +300,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
-python -m pip install freezegun fakeredis pytest pytest-django
+python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
if: steps.cache.outputs.cache-hit != 'true'

- name: Remove ee
2 changes: 1 addition & 1 deletion ee/clickhouse/queries/clickhouse_session_recording.py
@@ -35,7 +35,7 @@
distinct_id,
MIN(timestamp) AS start_time,
MAX(timestamp) AS end_time,
-COUNT(JSONExtractInt(snapshot_data, 'type') = 2 ? 1 : NULL) as full_snapshots
+COUNT((JSONExtractInt(snapshot_data, 'type') = 2 OR JSONExtractBool(snapshot_data, 'has_full_snapshot')) ? 1 : NULL) as full_snapshots
FROM session_recording_events
WHERE
team_id = %(team_id)s
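Aside, not part of the diff: in ClickHouse, COUNT(cond ? 1 : NULL) counts only the rows where cond is true, since COUNT skips NULLs. The reworked condition therefore counts chunked events flagged has_full_snapshot alongside raw type-2 snapshots; the Postgres query further down gets the same treatment via COUNT(*) FILTER (WHERE ...). A plain-Python sketch of the same logic:

# Rows standing in for parsed snapshot_data values of session recording events.
rows = [
    {"type": 2},                                    # raw full snapshot
    {"has_full_snapshot": True, "chunk_id": "id"},  # chunked full snapshot
    {"type": 3},                                    # incremental snapshot
]
full_snapshots = sum(1 for r in rows if r.get("type") == 2 or r.get("has_full_snapshot"))
assert full_snapshots == 2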
3 changes: 3 additions & 0 deletions posthog/api/capture.py
@@ -12,6 +12,7 @@

from posthog.celery import app as celery_app
from posthog.ee import is_ee_enabled
+from posthog.helpers.session_recording import preprocess_session_recording_events
from posthog.models import Team, User
from posthog.models.feature_flag import get_active_feature_flags
from posthog.models.utils import UUIDT
@@ -174,6 +175,8 @@ def get_event(request):
else:
events = [data]

+events = preprocess_session_recording_events(events)

for event in events:
try:
distinct_id = _get_distinct_id(event)
97 changes: 97 additions & 0 deletions posthog/helpers/session_recording.py
@@ -0,0 +1,97 @@
import base64
import gzip
import json
from collections import defaultdict
from typing import Dict, Generator, List

from sentry_sdk.api import capture_message

from posthog.models import utils

Event = Dict
SnapshotData = Dict

FULL_SNAPSHOT = 2


def preprocess_session_recording_events(events: List[Event]) -> List[Event]:
result, snapshots = [], []
for event in events:
if is_snapshot(event):
snapshots.append(event)
else:
result.append(event)

if len(snapshots) > 0:
result.extend(list(compress_and_chunk_snapshots(snapshots)))

return result


def compress_and_chunk_snapshots(events: List[Event], chunk_size=512 * 1024) -> Generator[Event, None, None]:
data_list = [event["properties"]["$snapshot_data"] for event in events]
session_id = events[0]["properties"]["$session_id"] # assumption: all events within a request have same session_id
has_full_snapshot = any(snapshot_data["type"] == FULL_SNAPSHOT for snapshot_data in data_list)

compressed_data = compress_to_string(json.dumps(data_list))

id = str(utils.UUIDT())
chunks = chunk_string(compressed_data, chunk_size)
for index, chunk in enumerate(chunks):
yield {
**events[0],
"properties": {
**events[0]["properties"],
"$session_id": session_id,
"$snapshot_data": {
"chunk_id": id,
"chunk_index": index,
"chunk_count": len(chunks),
"data": chunk,
"compression": "gzip-base64",
"has_full_snapshot": has_full_snapshot,
},
},
}


def decompress_chunked_snapshot_data(
team_id: int, session_recording_id: str, snapshot_list: List[SnapshotData]
) -> Generator[SnapshotData, None, None]:
chunks_collector = defaultdict(list)
for snapshot_data in snapshot_list:
if "chunk_id" not in snapshot_data:
yield snapshot_data
else:
chunks_collector[snapshot_data["chunk_id"]].append(snapshot_data)

for chunks in chunks_collector.values():
if len(chunks) != chunks[0]["chunk_count"]:
capture_message(
"Did not find all session recording chunks! Team: {}, Session: {}".format(team_id, session_recording_id)
)
continue

b64_compressed_data = "".join(chunk["data"] for chunk in sorted(chunks, key=lambda c: c["chunk_index"]))
decompressed_data = json.loads(decompress(b64_compressed_data))

yield from decompressed_data


def chunk_string(string: str, chunk_length: int) -> List[str]:
"""Split a string into chunk_length-sized elements. Reversal operation: `''.join()`."""
return [string[0 + offset : chunk_length + offset] for offset in range(0, len(string), chunk_length)]


def is_snapshot(event: Dict) -> bool:
return event["event"] == "$snapshot"


def compress_to_string(json_string: str) -> str:
compressed_data = gzip.compress(json_string.encode("utf-16", "surrogatepass"), mtime=0)
return base64.b64encode(compressed_data).decode("utf-8")


def decompress(base64data: str) -> str:
compressed_bytes = base64.b64decode(base64data)
return gzip.decompress(compressed_bytes).decode("utf-16", "surrogatepass")
Empty file.
126 changes: 126 additions & 0 deletions posthog/helpers/tests/test_session_recording_helpers.py
@@ -0,0 +1,126 @@
import pytest
from pytest_mock import MockerFixture

from posthog.helpers.session_recording import (
compress_and_chunk_snapshots,
decompress_chunked_snapshot_data,
preprocess_session_recording_events,
)


def test_preprocess_with_no_recordings():
events = [{"event": "$pageview"}, {"event": "$pageleave"}]
assert preprocess_session_recording_events(events) == events


def test_preprocess_recording_event_creates_chunks():
events = [
{
"event": "$snapshot",
"properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"},
}
]

preprocessed = preprocess_session_recording_events(events)
assert preprocessed != events
assert len(preprocessed) == 1
assert preprocessed[0]["event"] == "$snapshot"
assert preprocessed[0]["properties"]["$session_id"] == "1234"
assert preprocessed[0]["properties"]["distinct_id"] == "abc123"
assert "chunk_id" in preprocessed[0]["properties"]["$snapshot_data"]


def test_compression_and_chunking(snapshot_events, mocker: MockerFixture):
mocker.patch("posthog.models.utils.UUIDT", return_value="0178495e-8521-0000-8e1c-2652fa57099b")

assert list(compress_and_chunk_snapshots(snapshot_events)) == [
{
"event": "$snapshot",
"properties": {
"$session_id": "1234",
"$snapshot_data": {
"chunk_count": 1,
"chunk_id": "0178495e-8521-0000-8e1c-2652fa57099b",
"chunk_index": 0,
"compression": "gzip-base64",
"data": "H4sIAAAAAAAC//v/L5qhmkGJoYShkqGAIRXIsmJQYDBi0AGSSgxpDPlACBFTYkhiSGQoAtK1YFlMXcZYdVUB5UuAOkH6YhkAxKw6nnAAAAA=",
"has_full_snapshot": True,
},
"distinct_id": "abc123",
},
}
]


def test_decompression_results_in_same_data(snapshot_events):
assert len(list(compress_and_chunk_snapshots(snapshot_events, 1000))) == 1
assert compress_and_decompress(snapshot_events, 1000) == [
snapshot_events[0]["properties"]["$snapshot_data"],
snapshot_events[1]["properties"]["$snapshot_data"],
]
assert len(list(compress_and_chunk_snapshots(snapshot_events, 100))) == 2
assert compress_and_decompress(snapshot_events, 100) == [
snapshot_events[0]["properties"]["$snapshot_data"],
snapshot_events[1]["properties"]["$snapshot_data"],
]


def test_has_full_snapshot_property(snapshot_events):
compressed = list(compress_and_chunk_snapshots(snapshot_events))
assert len(compressed) == 1
assert compressed[0]["properties"]["$snapshot_data"]["has_full_snapshot"]

snapshot_events[0]["properties"]["$snapshot_data"]["type"] = 0
compressed = list(compress_and_chunk_snapshots(snapshot_events))
assert len(compressed) == 1
assert not compressed[0]["properties"]["$snapshot_data"]["has_full_snapshot"]


def test_decompress_returns_unmodified_events(snapshot_events):
snapshot_data = [event["properties"]["$snapshot_data"] for event in snapshot_events]
assert list(decompress_chunked_snapshot_data(1, "someid", snapshot_data)) == snapshot_data


def test_decompress_ignores_if_not_enough_chunks(snapshot_events):
    snapshot_data = [event["properties"]["$snapshot_data"] for event in snapshot_events]
    complete_snapshots = list(snapshot_data)  # copy before appending, so the incomplete chunk is excluded
    snapshot_data.append(
        {
            "chunk_id": "unique_id",
            "chunk_index": 1,
            "chunk_count": 2,
            "data": {},
            "compression": "gzip-base64",
            "has_full_snapshot": False,
        }
    )

    assert list(decompress_chunked_snapshot_data(1, "someid", snapshot_data)) == complete_snapshots


@pytest.fixture
def snapshot_events():
return [
{
"event": "$snapshot",
"properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"},
},
{
"event": "$snapshot",
"properties": {
"$session_id": "1234",
"$snapshot_data": {"type": 3, "foo": "zeta"},
"distinct_id": "abc123",
},
},
]


def compress_and_decompress(events, chunk_size):
snapshot_data = [
event["properties"]["$snapshot_data"] for event in compress_and_chunk_snapshots(events, chunk_size)
]
return list(decompress_chunked_snapshot_data(1, "someid", snapshot_data))
5 changes: 4 additions & 1 deletion posthog/queries/sessions/session_recording.py
@@ -12,6 +12,7 @@

from django.db import connection

+from posthog.helpers.session_recording import decompress_chunked_snapshot_data
from posthog.models import Person, SessionRecordingEvent, Team
from posthog.models.filters.sessions_filter import SessionsFilter
from posthog.models.session_recording_event import SessionRecordingViewed
@@ -36,7 +37,7 @@
MIN(timestamp) as start_time,
MAX(timestamp) as end_time,
MAX(timestamp) - MIN(timestamp) as duration,
-COUNT(*) FILTER(where snapshot_data->>'type' = '2') as full_snapshots
+COUNT(*) FILTER(where snapshot_data->>'type' = '2' OR (snapshot_data->>'has_full_snapshot')::boolean) as full_snapshots
FROM posthog_sessionrecordingevent
WHERE
team_id = %(team_id)s
@@ -63,6 +64,8 @@ def run(self, team: Team, session_recording_id: str, *args, **kwargs) -> Dict[st
from posthog.api.person import PersonSerializer

distinct_id, start_time, snapshots = self.query_recording_snapshots(team, session_recording_id)
+snapshots = list(decompress_chunked_snapshot_data(team.pk, session_recording_id, snapshots))

person = (
PersonSerializer(Person.objects.get(team=team, persondistinctid__distinct_id=distinct_id)).data
if distinct_id
22 changes: 19 additions & 3 deletions posthog/queries/sessions/test/test_session_recording.py
@@ -47,11 +47,18 @@ def _test_filter_sessions(self, filter, expected):
self.create_snapshot("user", "3", now() + relativedelta(seconds=15))
self.create_snapshot("user", "3", now() + relativedelta(seconds=20))
self.create_snapshot("user", "3", now() + relativedelta(seconds=60))
self.create_snapshot("user", "4", now() + relativedelta(seconds=999))
self.create_snapshot("user", "4", now() + relativedelta(seconds=1020))
self.create_chunked_snapshot(
"user", "4", now() + relativedelta(seconds=999), {"chunk_id": "afb", "has_full_snapshot": True}
)
self.create_snapshot("user", "4", now() + relativedelta(seconds=1020), type=1)

self.create_snapshot("broken-user", "5", now() + relativedelta(seconds=10), type=3)
self.create_snapshot("broken-user", "5", now() + relativedelta(seconds=20), type=3)
self.create_chunked_snapshot(
"broken-user",
"5",
now() + relativedelta(seconds=20),
{"chunk_id": "afb", "has_full_snapshot": False},
)

sessions = [
{"distinct_id": "user", "start_time": now(), "end_time": now() + relativedelta(seconds=100)},
@@ -116,6 +123,15 @@ def create_snapshot(self, distinct_id, session_id, timestamp, type=2):
snapshot_data={"timestamp": timestamp.timestamp(), "type": type},
)

+def create_chunked_snapshot(self, distinct_id, session_id, timestamp, snapshot_data):
+    event_factory(
+        team_id=self.team.pk,
+        distinct_id=distinct_id,
+        timestamp=timestamp,
+        session_id=session_id,
+        snapshot_data=snapshot_data,
+    )

return TestSessionRecording


1 change: 1 addition & 0 deletions requirements-dev.in
@@ -21,3 +21,4 @@ black
isort
pytest
pytest-django
+pytest-mock
3 changes: 3 additions & 0 deletions requirements-dev.txt
@@ -97,10 +97,13 @@ pyparsing==2.4.7
# via packaging
pytest-django==4.1.0
# via -r requirements-dev.in
+pytest-mock==3.5.1
+    # via -r requirements-dev.in
pytest==6.2.2
# via
# -r requirements-dev.in
# pytest-django
+    # pytest-mock
python-dateutil==2.8.1
# via freezegun
pytz==2021.1
