feat(data-warehouse): Build a new postgres source #28660

Merged · 22 commits · Feb 19, 2025
263 changes: 171 additions & 92 deletions mypy-baseline.txt

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions posthog/temporal/data_imports/pipelines/helpers.py
@@ -1,6 +1,9 @@
 from posthog.warehouse.models import ExternalDataJob
 from django.db.models import F
+from posthog.warehouse.types import IncrementalFieldType
 from posthog.warehouse.util import database_sync_to_async
+from zoneinfo import ZoneInfo
+from datetime import datetime, date


 @database_sync_to_async
@@ -11,3 +14,12 @@ def aget_external_data_job(team_id, job_id):
 @database_sync_to_async
 def aupdate_job_count(job_id: str, team_id: int, count: int):
     ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)
+
+
+def incremental_type_to_initial_value(field_type: IncrementalFieldType) -> int | datetime | date:
+    if field_type == IncrementalFieldType.Integer or field_type == IncrementalFieldType.Numeric:
+        return 0
+    if field_type == IncrementalFieldType.DateTime or field_type == IncrementalFieldType.Timestamp:
+        return datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=ZoneInfo("UTC"))
+    if field_type == IncrementalFieldType.Date:
+        return date(1970, 1, 1)
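
Note: this new helper seeds the first incremental sync with a type-appropriate floor value when no last-synced value exists yet. A minimal usage sketch follows (the table name and query framing are illustrative, not code from this PR):

from datetime import datetime
from zoneinfo import ZoneInfo
from posthog.warehouse.types import IncrementalFieldType
from posthog.temporal.data_imports.pipelines.helpers import incremental_type_to_initial_value

# First incremental run: nothing stored yet, so start from the epoch.
start = incremental_type_to_initial_value(IncrementalFieldType.DateTime)
assert start == datetime(1970, 1, 1, tzinfo=ZoneInfo("UTC"))

# A Postgres source would then fetch only rows past this cursor, e.g.:
# SELECT * FROM some_table WHERE updated_at > %(start)s ORDER BY updated_at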
40 changes: 28 additions & 12 deletions posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
@@ -2,10 +2,12 @@
 import time
 from typing import Any
 import pyarrow as pa
-from dlt.sources import DltSource, DltResource
+from dlt.sources import DltSource
 import deltalake as deltalake
 from posthog.temporal.common.logger import FilteringBoundLogger
+from posthog.temporal.data_imports.pipelines.pipeline.typings import SourceResponse
 from posthog.temporal.data_imports.pipelines.pipeline.utils import (
+    _get_column_hints,
     _handle_null_columns_with_definitions,
     _update_incremental_state,
     _get_primary_keys,
@@ -24,7 +26,7 @@


 class PipelineNonDLT:
-    _resource: DltResource
+    _resource: SourceResponse
     _resource_name: str
     _job: ExternalDataJob
     _schema: ExternalDataSchema
@@ -36,14 +38,29 @@ class PipelineNonDLT:
     _load_id: int

     def __init__(
-        self, source: DltSource, logger: FilteringBoundLogger, job_id: str, is_incremental: bool, reset_pipeline: bool
+        self,
+        source: DltSource | SourceResponse,
+        logger: FilteringBoundLogger,
+        job_id: str,
+        is_incremental: bool,
+        reset_pipeline: bool,
     ) -> None:
-        resources = list(source.resources.items())
-        assert len(resources) == 1
-        resource_name, resource = resources[0]
+        if isinstance(source, DltSource):
+            resources = list(source.resources.items())
+            assert len(resources) == 1
+            resource_name, resource = resources[0]
+
+            self._resource_name = resource_name
+            self._resource = SourceResponse(
+                items=resource,
+                primary_keys=_get_primary_keys(resource),
+                name=resource_name,
+                column_hints=_get_column_hints(resource),
+            )
+        else:
+            self._resource = source
+            self._resource_name = source.name

-        self._resource = resource
-        self._resource_name = resource_name
         self._job = ExternalDataJob.objects.prefetch_related("schema").get(id=job_id)
         self._is_incremental = is_incremental
         self._reset_pipeline = reset_pipeline
@@ -54,7 +71,7 @@ def __init__(
         assert schema is not None
         self._schema = schema

-        self._delta_table_helper = DeltaTableHelper(resource_name, self._job, self._logger)
+        self._delta_table_helper = DeltaTableHelper(self._resource_name, self._job, self._logger)
         self._internal_schema = HogQLSchema()

     def run(self):
@@ -80,7 +97,7 @@ def run(self):
             self._schema.sync_type_config.pop("incremental_field_last_value", None)
             self._schema.save()

-        for item in self._resource:
+        for item in self._resource.items:
             py_table = None

             if isinstance(item, list):
@@ -151,9 +168,8 @@ def _process_pa_table(self, pa_table: pa.Table, index: int):
         pa_table = _evolve_pyarrow_schema(pa_table, delta_table.schema() if delta_table is not None else None)
         pa_table = _handle_null_columns_with_definitions(pa_table, self._resource)

-        table_primary_keys = _get_primary_keys(self._resource)
         delta_table = self._delta_table_helper.write_to_deltalake(
-            pa_table, self._is_incremental, index, table_primary_keys
+            pa_table, self._is_incremental, index, self._resource.primary_keys
        )

         self._internal_schema.add_pyarrow_table(pa_table)
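
Note: with these changes, __init__ normalizes both input kinds to SourceResponse up front, so the rest of the pipeline only has to handle one shape. A minimal sketch of what a non-DLT caller could now pass directly (the table name, generator, and key below are hypothetical, not taken from this PR):

import pyarrow as pa
from posthog.temporal.data_imports.pipelines.pipeline.typings import SourceResponse

def _read_orders_in_batches():
    # Hypothetical reader: yield pyarrow batches straight from Postgres,
    # with no DLT resource wrapper in between.
    yield pa.table({"id": [1, 2], "updated_at": ["2025-01-01", "2025-01-02"]})

source = SourceResponse(
    name="orders",
    items=_read_orders_in_batches(),
    primary_keys=["id"],
)
# PipelineNonDLT(source, logger, job_id, is_incremental=True, reset_pipeline=False)
# would take the else branch and skip the DltSource unwrapping entirely.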
12 changes: 12 additions & 0 deletions posthog/temporal/data_imports/pipelines/pipeline/typings.py
@@ -0,0 +1,12 @@
+import dataclasses
+from typing import Any
+from collections.abc import Iterable
+from dlt.common.data_types.typing import TDataType
+
+
+@dataclasses.dataclass
+class SourceResponse:
+    name: str
+    items: Iterable[Any]
Contributor review comment on the items field:
style: Using Any for the items type parameter reduces type safety. Consider using a more specific type or a generic type parameter if possible.

+    primary_keys: list[str] | None
+    column_hints: dict[str, TDataType | None] | None = None  # Legacy support for DLT sources
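
As an illustration of the reviewer's suggestion above (a sketch, not part of this PR): the dataclass could be parameterized over its item type instead of using Any, at the cost of threading the type parameter through callers.

import dataclasses
from collections.abc import Iterable
from typing import Generic, TypeVar

T = TypeVar("T")

@dataclasses.dataclass
class GenericSourceResponse(Generic[T]):
    # Hypothetical generic variant; a caller would pin the item type,
    # e.g. GenericSourceResponse[pa.Table] for pyarrow batches.
    name: str
    items: Iterable[T]
    primary_keys: list[str] | None = None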