feat: Backend for BigQuery destination tests

tomasfarias committed Feb 14, 2025
1 parent 0652361 commit 2dab705

Showing 3 changed files with 510 additions and 67 deletions.
125 changes: 58 additions & 67 deletions posthog/batch_exports/http.py
@@ -1,5 +1,4 @@
 import datetime as dt
-from dataclasses import dataclass
 from typing import Any, TypedDict, cast
 
 import posthoganalytics
@@ -526,56 +525,51 @@ def perform_destroy(self, instance: BatchExport):
"""
disable_and_delete_export(instance)

@action(methods=["POST"], detail=True)
def test(self, request: request.Request, *args, **kwargs) -> response.Response:
from posthog.temporal.batch_exports.destination_tests import get_destination_test

class BatchExportOrganizationViewSet(BatchExportViewSet):
filter_rewrite_rules = {"organization_id": "team__organization_id"}
destination = request.data.pop("destination", None)

if not destination:
pass

destination_test = get_destination_test(destination=destination)

return response.Response(destination_test.as_dict())

@action(methods=["POST"], detail=True)
def run_test_step(self, request: request.Request, *args, **kwargs) -> response.Response:
from posthog.temporal.batch_exports.destination_tests import get_destination_test

test_step = request.data.pop("step", 0)

@dataclass
class BatchExportBackfillProgress:
"""Progress information for a batch export backfill."""
serializer = self.get_serializer(data=request.data)
_ = serializer.is_valid(raise_exception=True)

total_runs: int | None
finished_runs: int | None
progress: float | None
destination_test = get_destination_test(
destination=serializer.validated_data["destination"]["type"],
**serializer.validated_data["destination"]["config"],
)

result = destination_test.run_step(test_step)
return response.Response(result.as_dict())


class BatchExportOrganizationViewSet(BatchExportViewSet):
filter_rewrite_rules = {"organization_id": "team__organization_id"}


class BatchExportBackfillSerializer(serializers.ModelSerializer):
progress = serializers.SerializerMethodField(read_only=True)
total_runs = serializers.SerializerMethodField(read_only=True)

class Meta:
model = BatchExportBackfill
fields = "__all__"

def get_progress(self, obj: BatchExportBackfill) -> BatchExportBackfillProgress | None:
"""Return progress information containing total runs, finished runs, and progress percentage.
To reduce the number of database calls we make (which could be expensive when fetching a list of backfills) we
only get the list of completed runs from the DB if the backfill is still running.
"""
if obj.status == obj.Status.COMPLETED:
return BatchExportBackfillProgress(
total_runs=obj.total_expected_runs, finished_runs=obj.total_expected_runs, progress=1.0
)
elif obj.status not in (obj.Status.RUNNING, obj.Status.STARTING):
# if backfill finished in some other state then progress info may not be meaningful
return None

total_runs = obj.total_expected_runs
if not total_runs:
return None

if obj.start_at is None:
# if it's just a single run, backfilling from the beginning of time, we can't calculate progress based on
# the number of completed runs so better to return None
return None

finished_runs = obj.get_finished_runs()
# just make sure we never return a progress > 1
total_runs = max(total_runs, finished_runs)
return BatchExportBackfillProgress(
total_runs=total_runs, finished_runs=finished_runs, progress=round(finished_runs / total_runs, ndigits=1)
)
def get_total_runs(self, obj: BatchExportBackfill) -> int | None:
"""Return the total number of runs for this backfill."""
return obj.total_runs


class BackfillsCursorPagination(CursorPagination):
@@ -612,35 +606,32 @@ def create_backfill(
         end_at = None
 
     if (start_at is not None or end_at is not None) and batch_export.model is not None:
-        try:
-            earliest_backfill_start_at = fetch_earliest_backfill_start_at(
-                team_id=team.pk,
-                model=batch_export.model,
-                interval_time_delta=batch_export.interval_time_delta,
-                exclude_events=batch_export.destination.config.get("exclude_events", []),
-                include_events=batch_export.destination.config.get("include_events", []),
-            )
+        earliest_backfill_start_at = fetch_earliest_backfill_start_at(
+            team_id=team.pk,
+            model=batch_export.model,
+            interval_time_delta=batch_export.interval_time_delta,
+            exclude_events=batch_export.destination.config.get("exclude_events", []),
+            include_events=batch_export.destination.config.get("include_events", []),
+        )
+        if earliest_backfill_start_at is None:
+            raise ValidationError("There is no data to backfill for this model.")
+
+        earliest_backfill_start_at = earliest_backfill_start_at.astimezone(team.timezone_info)
+
+        if end_at is not None and end_at < earliest_backfill_start_at:
+            raise ValidationError(
+                "The provided backfill date range contains no data. The earliest possible backfill start date is "
+                f"{earliest_backfill_start_at.strftime('%Y-%m-%d %H:%M:%S')}",
+            )
+
+        if start_at is not None and start_at < earliest_backfill_start_at:
+            logger.info(
+                "Backfill start_at '%s' is before the earliest possible backfill start_at '%s', setting start_at "
+                "to earliest_backfill_start_at",
+                start_at,
+                earliest_backfill_start_at,
+            )
-            if earliest_backfill_start_at is None:
-                raise ValidationError("There is no data to backfill for this model.")
-
-            earliest_backfill_start_at = earliest_backfill_start_at.astimezone(team.timezone_info)
-
-            if end_at is not None and end_at < earliest_backfill_start_at:
-                raise ValidationError(
-                    "The provided backfill date range contains no data. The earliest possible backfill start date is "
-                    f"{earliest_backfill_start_at.strftime('%Y-%m-%d %H:%M:%S')}",
-                )
-
-            if start_at is not None and start_at < earliest_backfill_start_at:
-                logger.info(
-                    "Backfill start_at '%s' is before the earliest possible backfill start_at '%s', setting start_at "
-                    "to earliest_backfill_start_at",
-                    start_at,
-                    earliest_backfill_start_at,
-                )
-                start_at = earliest_backfill_start_at
-        except NotImplementedError:
-            logger.warning("No backfill check implemented for model: '%s'; skipping", batch_export.model)
+            start_at = earliest_backfill_start_at
 
     if start_at is None or end_at is None:
         return backfill_export(temporal, str(batch_export.pk), team.pk, start_at, end_at)
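
The two new viewset actions above depend on only three touchpoints of the new posthog/temporal/batch_exports/destination_tests module (added in the other files of this commit and not shown in this diff): a get_destination_test() factory, a serializable test description (as_dict()), and a per-step runner (run_step()) whose result is also serializable. The sketch below is not the real module, only an illustration of the shape those call sites imply; the class and field names (DestinationTest, TestStepResult, steps, status, message) are assumptions.

# Hypothetical sketch only: the real module is
# posthog/temporal/batch_exports/destination_tests.py, added elsewhere in
# this commit. All names below are illustrative.
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TestStepResult:
    status: str  # assumed values, e.g. "Passed" or "Failed"
    message: str | None = None

    def as_dict(self) -> dict[str, Any]:
        return {"status": self.status, "message": self.message}


@dataclass
class DestinationTest:
    steps: list[str] = field(default_factory=list)
    config: dict[str, Any] = field(default_factory=dict)

    def as_dict(self) -> dict[str, Any]:
        # Serialized by the `test` action so a client can list available steps.
        return {"steps": self.steps}

    def run_step(self, step: int) -> TestStepResult:
        # Serialized by the `run_test_step` action; a real implementation would
        # verify credentials, dataset access, etc. for the given step index.
        if not 0 <= step < len(self.steps):
            return TestStepResult(status="Failed", message=f"Unknown step: {step}")
        return TestStepResult(status="Passed")


def get_destination_test(destination: str | None = None, **config: Any) -> DestinationTest:
    # Dispatch on destination type; only a BigQuery placeholder is sketched here.
    steps_by_destination = {"BigQuery": ["Verify credentials", "Verify dataset"]}
    return DestinationTest(steps=steps_by_destination.get(destination or "", []), config=config)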
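
For context, a client could exercise the two new detail actions roughly as follows. This is a hedged sketch, not documented API usage: the URL prefix, the authorization header, the example IDs, and the payload fields (name, interval, the BigQuery config keys) are assumptions inferred from the @action decorators and the serializer access in the diff.

# Rough usage sketch; routes, header, IDs, and payload fields are assumptions.
import requests

API_ROOT = "https://app.posthog.com/api/projects/123"  # hypothetical project
BATCH_EXPORT_ID = "hypothetical-batch-export-id"
HEADERS = {"Authorization": "Bearer <personal-api-key>"}

# 1. Ask the backend which test steps exist for a destination type.
steps = requests.post(
    f"{API_ROOT}/batch_exports/{BATCH_EXPORT_ID}/test/",
    headers=HEADERS,
    json={"destination": "BigQuery"},
).json()

# 2. Run one step, sending the destination payload the serializer validates.
result = requests.post(
    f"{API_ROOT}/batch_exports/{BATCH_EXPORT_ID}/run_test_step/",
    headers=HEADERS,
    json={
        "step": 0,
        "name": "BigQuery export",
        "interval": "hour",
        "destination": {
            "type": "BigQuery",
            "config": {"project_id": "my-gcp-project", "dataset_id": "analytics"},
        },
    },
).json()

print(steps, result)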
