From 98eb2c37d0cc723aa2b286c510fca5e6ab9ad2be Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Tue, 6 Dec 2022 18:22:27 -0600 Subject: [PATCH 1/8] add init `AirbyteSync` block in style of `JobBlock` --- prefect_airbyte/connections.py | 271 +++++++++++++++------------------ 1 file changed, 124 insertions(+), 147 deletions(-) diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index d660551..4c7864b 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -1,10 +1,13 @@ """Tasks for connecting to Airbyte and triggering connection syncs""" import uuid from asyncio import sleep -from typing import Optional -from warnings import warn -from prefect import get_run_logger, task +from prefect import flow, get_run_logger +from prefect.blocks.core import Block +from prefect.exceptions import MissingContextError +from prefect.logging import get_logger +from pydantic import BaseModel, Field +from typing_extensions import Literal from prefect_airbyte import exceptions as err from prefect_airbyte.server import AirbyteServer @@ -20,169 +23,143 @@ JOB_STATUS_PENDING = "pending" -@task -async def trigger_sync( - connection_id: str, - airbyte_server: Optional[AirbyteServer] = None, - airbyte_server_host: Optional[str] = None, - airbyte_server_port: Optional[int] = None, - airbyte_api_version: Optional[str] = None, - poll_interval_s: int = 15, - status_updates: bool = False, - timeout: int = 5, -) -> dict: - """Prefect Task for triggering an Airbyte connection sync. - - *It is assumed that the user will have previously configured - a Source & Destination into a Connection.* - e.g. MySql -> CSV - - An invocation of `trigger_sync` will attempt to start a sync job for - the specified `connection_id` representing the Connection in - Airbyte. - - `trigger_sync` will poll Airbyte Server for the Connection status and - will only complete when the sync has completed or - when it receives an error status code from an API call. - - As of `prefect-airbyte==0.1.3`, the kwargs `airbyte_server_host` and - `airbyte_server_port` can be replaced by passing an `airbyte_server` block - instance to generate the `AirbyteClient`. Using the `airbyte_server` block is - preferred, but the individual kwargs remain for backwards compatibility. - - Args: - connection_id: Airbyte connection ID to trigger a sync for. - airbyte_server: An `AirbyteServer` block to create an `AirbyteClient`. - airbyte_server_host: Airbyte server host to connect to. - airbyte_server_port: Airbyte server port to connect to. - airbyte_api_version: Airbyte API version to use. - poll_interval_s: How often to poll Airbyte for sync status. - status_updates: Whether to log sync job status while polling. - timeout: The POST request `timeout` for the `httpx.AsyncClient`. - - Raises: - ValueError: If `connection_id` is not a valid UUID. - err.AirbyteSyncJobFailed: If airbyte returns `JOB_STATUS_FAILED`. - err.AirbyteConnectionInactiveException: If a given connection is inactive. - err.AirbyeConnectionDeprecatedException: If a given connection is deprecated. - - Returns: - Job metadata, including the connection ID and final status of the sync. 
- - Examples: - - Flow that triggers an Airybte connection sync: - - ```python - from prefect import flow - from prefect_airbyte.connections import trigger_sync - from prefect_airbyte.server import AirbyteServer - - @flow - def example_trigger_sync_flow(): - - # Run other tasks and subflows here - - trigger_sync( - airbyte_server=AirbyteServer.load("oss-airbyte"), - connection_id="your-connection-id-to-sync" - ) +class AirbyteSyncJobResult(BaseModel): + """Model representing a result from an Airbyte sync job.""" - example_trigger_sync_flow() - ``` - """ - logger = get_run_logger() - - if not airbyte_server: - warn( - "The use of `airbyte_server_host`, `airbyte_server_port`, and " - "`airbyte_api_version` is deprecated and will be removed in a " - "future release. Please pass an `airbyte_server` block to this " - "task instead.", - DeprecationWarning, - stacklevel=2, - ) - if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): - airbyte_server = AirbyteServer( - server_host=airbyte_server_host or "localhost", - server_port=airbyte_server_port or 8000, - api_version=airbyte_api_version or "v1", - ) - else: - airbyte_server = AirbyteServer() - else: - if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): - logger.info( - "Ignoring `airbyte_server_host`, `airbyte_api_version`, " - "and `airbyte_server_port` because `airbyte_server` block " - " was passed. Using API URL from `airbyte_server` block: " - f"{airbyte_server.base_url!r}." - ) + created_at: int + job_status: Literal["succeeded", "failed", "pending"] + job_id: uuid.UUID + updated_at: int - try: - uuid.UUID(connection_id) - except (TypeError, ValueError): - raise ValueError( - "Parameter `connection_id` *must* be a valid UUID \ - i.e. 32 hex characters, including hyphens." - ) - async with airbyte_server.get_client( - logger=logger, timeout=timeout - ) as airbyte_client: +class AirbyteSync(Block): + @property + def logger(self): + try: + return get_run_logger() + except MissingContextError: + return get_logger() - logger.info( - f"Getting Airbyte Connection {connection_id}, poll interval " - f"{poll_interval_s} seconds, airbyte_base_url {airbyte_server.base_url}" - ) + airbyte_server: AirbyteServer = Field( + default=..., + ) - connection_status = await airbyte_client.get_connection_status(connection_id) + timeout: int = Field( + default=5, + description="Timeout in seconds for requests made by `httpx.AsyncClient`.", + ) - if connection_status == CONNECTION_STATUS_ACTIVE: - # Trigger manual sync on the Connection ... - ( - job_id, - job_created_at, - ) = await airbyte_client.trigger_manual_sync_connection(connection_id) + async def trigger( + self, + connection_id: str, + ): + + try: + uuid.UUID(connection_id) + except (TypeError, ValueError): + raise ValueError( + "Parameter `connection_id` *must* be a valid UUID \ + i.e. 32 hex characters, including hyphens." + ) + + async with self.airbyte_server.get_client( + logger=self.logger, timeout=self.timeout + ) as airbyte_client: + + self.logger.info( + f"Triggering Airbyte Connection {connection_id}, " + f"in workspace at `airbyte_base_url` {self.airbyte_server.base_url}" + ) - job_status = JOB_STATUS_PENDING + connection_status = await airbyte_client.get_connection_status( + connection_id + ) + + if connection_status == CONNECTION_STATUS_ACTIVE: + # Trigger manual sync on the Connection ... 
+ ( + job_id, + _, + ) = await airbyte_client.trigger_manual_sync_connection(connection_id) + + return job_id + + elif connection_status == CONNECTION_STATUS_INACTIVE: + self.logger.error( + f"Connection: {connection_id} is inactive" + " - you'll need to enable it in your Airbyte instance" + ) + raise err.AirbyteConnectionInactiveException( + f"Please enable the Connection {connection_id} in Airbyte instance." + ) + elif connection_status == CONNECTION_STATUS_DEPRECATED: + self.logger.error(f"Connection {connection_id} is deprecated.") + raise err.AirbyeConnectionDeprecatedException( + f"Connection {connection_id} is deprecated." + ) + + async def wait_for_completion( + self, + job_id: str, + poll_interval_s: int = 15, + status_updates: bool = False, + ): + with self.airbyte_server.get_client( + logger=self.logger, timeout=self.timeout + ) as airbyte_client: + + job_status = await airbyte_client.get_job_status(self.job_id) while job_status not in [JOB_STATUS_FAILED, JOB_STATUS_SUCCEEDED]: ( job_status, - job_created_at, - job_updated_at, + _, + _, ) = await airbyte_client.get_job_status(job_id) # pending┃running┃incomplete┃failed┃succeeded┃cancelled if job_status == JOB_STATUS_SUCCEEDED: - logger.info(f"Job {job_id} succeeded.") + self.logger.info(f"Job {job_id} succeeded.") elif job_status == JOB_STATUS_FAILED: - logger.error(f"Job {job_id} failed.") + self.logger.error(f"Job {job_id} failed.") raise err.AirbyteSyncJobFailed(f"Job {job_id} failed.") else: if status_updates: - logger.info(job_status) + self.logger.info(job_status) # wait for next poll interval await sleep(poll_interval_s) - return { - "connection_id": connection_id, - "status": connection_status, - "job_status": job_status, - "job_created_at": job_created_at, - "job_updated_at": job_updated_at, - } - elif connection_status == CONNECTION_STATUS_INACTIVE: - logger.error( - f"Connection: {connection_id} is inactive" - " - you'll need to enable it in your Airbyte instance" - ) - raise err.AirbyteConnectionInactiveException( - f"Please enable the Connection {connection_id} in Airbyte instance." - ) - elif connection_status == CONNECTION_STATUS_DEPRECATED: - logger.error(f"Connection {connection_id} is deprecated.") - raise err.AirbyeConnectionDeprecatedException( - f"Connection {connection_id} is deprecated." 
- ) + async def fetch_results(self, job_id: uuid.UUID) -> AirbyteSyncJobResult: + with self.airbyte_server.get_client( + logger=self.logger, timeout=self.timeout + ) as airbyte_client: + ( + job_status, + job_created_at, + job_updated_at, + ) = await airbyte_client.get_job_status(job_id) + + return AirbyteSyncJobResult( + created_at=job_created_at, + job_id=job_id, + job_status=job_status, + updated_at=job_updated_at, + ) + + +@flow +def trigger_sync_and_wait_for_completion( + connection_id: str, + airbyte_sync_job_block: AirbyteSync, + poll_interval_s: int = 15, + status_updates: bool = False, +) -> AirbyteSyncJobResult: + job_id = airbyte_sync_job_block.trigger(connection_id=connection_id) + + airbyte_sync_job_block.wait_for_completion( + job_id=job_id, + poll_interval_s=poll_interval_s, + status_updates=status_updates, + ) + + return airbyte_sync_job_block.fetch_results(job_id=job_id) From 297390922c54b0ee030fa14c2b1379b824ea4663 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 28 Dec 2022 09:15:44 -0600 Subject: [PATCH 2/8] add new client method supporting `run_airbyte_connection_sync` flow / JobBlock --- docs/exceptions.md | 1 + docs/flows.md | 1 + mkdocs.yml | 2 + prefect_airbyte/client.py | 26 ++- prefect_airbyte/connections.py | 414 ++++++++++++++++++++++++--------- prefect_airbyte/flows.py | 55 +++++ tests/conftest.py | 80 +++++-- tests/test_connections.py | 76 ++++-- tests/test_flows.py | 72 ++++++ 9 files changed, 582 insertions(+), 145 deletions(-) create mode 100644 docs/exceptions.md create mode 100644 docs/flows.md create mode 100644 prefect_airbyte/flows.py create mode 100644 tests/test_flows.py diff --git a/docs/exceptions.md b/docs/exceptions.md new file mode 100644 index 0000000..1138c7d --- /dev/null +++ b/docs/exceptions.md @@ -0,0 +1 @@ +::: prefect_airbyte.exceptions \ No newline at end of file diff --git a/docs/flows.md b/docs/flows.md new file mode 100644 index 0000000..7c447ef --- /dev/null +++ b/docs/flows.md @@ -0,0 +1 @@ +::: prefect_airbyte.flows \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 79a6b43..be81bb3 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -50,4 +50,6 @@ nav: - Client: client.md - Configuration: configuration.md - Connections: connections.md + - Exceptions: exceptions.md + - Flows: flows.md - Server: server.md \ No newline at end of file diff --git a/prefect_airbyte/client.py b/prefect_airbyte/client.py index 46a836f..683f400 100644 --- a/prefect_airbyte/client.py +++ b/prefect_airbyte/client.py @@ -1,6 +1,6 @@ """Client for interacting with Airbyte instance""" import logging -from typing import Tuple +from typing import Any, Dict, Tuple from warnings import warn import httpx @@ -163,7 +163,7 @@ async def trigger_manual_sync_connection( raise err.AirbyteServerNotHealthyException() from e - async def get_job_status(self, job_id: str) -> Tuple[str, str, str]: + async def get_job_status(self, job_id: str) -> Tuple[str, int, int]: """ Gets the status of an Airbyte connection sync job. @@ -190,6 +190,28 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]: raise err.JobNotFoundException(f"Job {job_id} not found.") from e raise err.AirbyteServerNotHealthyException() from e + async def get_job_info(self, job_id: str) -> Dict[str, Any]: + """ + Gets the full API response for a given Airbyte Job ID. + + Args: + job_id: The ID of the Airbyte job to retrieve information on. + + Returns: + Dict of the full API response for the given job ID. 
+ """ + get_connection_url = self.airbyte_base_url + "/jobs/get/" + try: + response = await self._client.post(get_connection_url, json={"id": job_id}) + response.raise_for_status() + + return response.json() + + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + raise err.JobNotFoundException(f"Job {job_id} not found.") from e + raise err.AirbyteServerNotHealthyException() from e + async def create_client(self) -> httpx.AsyncClient: """Convencience method to create a new httpx.AsyncClient. diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 4c7864b..3a35428 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -1,11 +1,13 @@ """Tasks for connecting to Airbyte and triggering connection syncs""" import uuid from asyncio import sleep +from datetime import datetime +from typing import Any, Dict, Optional +from warnings import warn -from prefect import flow, get_run_logger -from prefect.blocks.core import Block -from prefect.exceptions import MissingContextError -from prefect.logging import get_logger +from prefect import get_run_logger, task +from prefect.blocks.abstract import JobBlock, JobRun +from prefect.utilities.asyncutils import sync_compatible from pydantic import BaseModel, Field from typing_extensions import Literal @@ -18,30 +20,288 @@ CONNECTION_STATUS_DEPRECATED = "deprecated" # Job statuses -JOB_STATUS_SUCCEEDED = "succeeded" +JOB_STATUS_CANCELLED = "cancelled" JOB_STATUS_FAILED = "failed" JOB_STATUS_PENDING = "pending" +JOB_STATUS_SUCCEEDED = "succeeded" + +terminal_job_statuses = { + JOB_STATUS_CANCELLED, + JOB_STATUS_FAILED, + JOB_STATUS_SUCCEEDED, +} + + +@task +async def trigger_sync( + connection_id: str, + airbyte_server: Optional[AirbyteServer] = None, + airbyte_server_host: Optional[str] = None, + airbyte_server_port: Optional[int] = None, + airbyte_api_version: Optional[str] = None, + poll_interval_s: int = 15, + status_updates: bool = False, + timeout: int = 5, +) -> Dict[str, Any]: + """Prefect Task for triggering an Airbyte connection sync. + *It is assumed that the user will have previously configured + a Source & Destination into a Connection.* + e.g. MySql -> CSV + An invocation of `trigger_sync` will attempt to start a sync job for + the specified `connection_id` representing the Connection in + Airbyte. + `trigger_sync` will poll Airbyte Server for the Connection status and + will only complete when the sync has completed or + when it receives an error status code from an API call. + + As of `prefect-airbyte==0.1.3`, the kwargs `airbyte_server_host` and + `airbyte_server_port` can be replaced by passing an `airbyte_server` block + instance to generate the `AirbyteClient`. Using the `airbyte_server` block is + preferred, but the individual kwargs remain for backwards compatibility. + + Args: + connection_id: Airbyte connection ID to trigger a sync for. + airbyte_server: An `AirbyteServer` block to create an `AirbyteClient`. + airbyte_server_host: Airbyte server host to connect to. + airbyte_server_port: Airbyte server port to connect to. + airbyte_api_version: Airbyte API version to use. + poll_interval_s: How often to poll Airbyte for sync status. + status_updates: Whether to log sync job status while polling. + timeout: The POST request `timeout` for the `httpx.AsyncClient`. + Raises: + ValueError: If `connection_id` is not a valid UUID. + AirbyteSyncJobFailed: If airbyte returns `JOB_STATUS_FAILED`. + AirbyteConnectionInactiveException: If a given connection is inactive. 
+ AirbyeConnectionDeprecatedException: If a given connection is deprecated. + Returns: + Job metadata, including the connection ID and final status of the sync. + Examples: + Flow that triggers an Airybte connection sync: + ```python + from prefect import flow + from prefect_airbyte.connections import trigger_sync + from prefect_airbyte.server import AirbyteServer + @flow + def example_trigger_sync_flow(): + # Run other tasks and subflows here + trigger_sync( + airbyte_server=AirbyteServer.load("oss-airbyte"), + connection_id="your-connection-id-to-sync" + ) + example_trigger_sync_flow() + ``` + """ + logger = get_run_logger() + + if not airbyte_server: + warn( + "The use of `airbyte_server_host`, `airbyte_server_port`, and " + "`airbyte_api_version` is deprecated and will be removed in a " + "future release. Please pass an `airbyte_server` block to this " + "task instead.", + DeprecationWarning, + stacklevel=2, + ) + if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): + airbyte_server = AirbyteServer( + server_host=airbyte_server_host or "localhost", + server_port=airbyte_server_port or 8000, + api_version=airbyte_api_version or "v1", + ) + else: + airbyte_server = AirbyteServer() + else: + if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): + logger.info( + "Ignoring `airbyte_server_host`, `airbyte_api_version`, " + "and `airbyte_server_port` because `airbyte_server` block " + " was passed. Using API URL from `airbyte_server` block: " + f"{airbyte_server.base_url!r}." + ) + + try: + uuid.UUID(connection_id) + except (TypeError, ValueError): + raise ValueError( + "Parameter `connection_id` *must* be a valid UUID \ + i.e. 32 hex characters, including hyphens." + ) + + async with airbyte_server.get_client( + logger=logger, timeout=timeout + ) as airbyte_client: + + logger.info( + f"Getting Airbyte Connection {connection_id}, poll interval " + f"{poll_interval_s} seconds, airbyte_base_url {airbyte_server.base_url}" + ) + + connection_status = await airbyte_client.get_connection_status(connection_id) + + if connection_status == CONNECTION_STATUS_ACTIVE: + # Trigger manual sync on the Connection ... + ( + job_id, + job_created_at, + ) = await airbyte_client.trigger_manual_sync_connection(connection_id) + + job_status = JOB_STATUS_PENDING + + while job_status not in terminal_job_statuses: + ( + job_status, + job_created_at, + job_updated_at, + ) = await airbyte_client.get_job_status(job_id) + + # pending┃running┃incomplete┃failed┃succeeded┃cancelled + if job_status == JOB_STATUS_SUCCEEDED: + logger.info(f"Job {job_id} succeeded.") + elif ( + job_status == JOB_STATUS_FAILED + or job_status == JOB_STATUS_CANCELLED + ): + logger.error(f"Job {job_id} failed.") + raise err.AirbyteSyncJobFailed(f"Job {job_id} failed.") + else: + if status_updates: + logger.info(job_status) + # wait for next poll interval + await sleep(poll_interval_s) + + return { + "connection_id": connection_id, + "status": connection_status, + "job_status": job_status, + "job_created_at": job_created_at, + "job_updated_at": job_updated_at, + } + elif connection_status == CONNECTION_STATUS_INACTIVE: + logger.error( + f"Connection: {connection_id} is inactive" + " - you'll need to enable it in your Airbyte instance" + ) + raise err.AirbyteConnectionInactiveException( + f"Please enable the Connection {connection_id} in Airbyte instance." 
+ ) + elif connection_status == CONNECTION_STATUS_DEPRECATED: + logger.error(f"Connection {connection_id} is deprecated.") + raise err.AirbyeConnectionDeprecatedException( + f"Connection {connection_id} is deprecated." + ) + + +# The below implements the `JobBlock` version of the above `trigger_sync` task. + + +class AirbyteSyncResult(BaseModel): + """Model representing a result from an `AirbyteSync` job run.""" + + created_at: datetime + job_status: Literal["succeeded", "failed", "pending", "cancelled"] + job_id: int + records_synced: int + updated_at: datetime + + +class AirbyteSync(JobRun): + """A `JobRun` representing an Airbyte sync job.""" + + def __init__(self, airbyte_connection: "AirbyteConnection", job_id: str): + self.airbyte_connection: "AirbyteConnection" = airbyte_connection + self.job_id: int = job_id + self.records_synced: int = 0 + + @sync_compatible + async def wait_for_completion(self): + """Wait for the `AirbyteConnection` sync to reach a terminal state. + + Raises: + AirbyteSyncJobFailed: If the sync job fails. + """ + async with self.airbyte_connection.airbyte_server.get_client( + logger=self.airbyte_connection.logger, + timeout=self.airbyte_connection.timeout, + ) as airbyte_client: + + job_status = JOB_STATUS_PENDING + + while job_status not in terminal_job_statuses: + job_info = await airbyte_client.get_job_info(self.job_id) + job_status = job_info["job"]["status"] -class AirbyteSyncJobResult(BaseModel): - """Model representing a result from an Airbyte sync job.""" + # "recordsSynced" key is only present if the job has synced records + if "recordsSynced" in job_info["attempts"][-1]["attempt"]: + self.records_synced = job_info["attempts"][-1]["attempt"][ + "recordsSynced" + ] - created_at: int - job_status: Literal["succeeded", "failed", "pending"] - job_id: uuid.UUID - updated_at: int + # pending┃running┃failed┃succeeded┃cancelled + if job_status == JOB_STATUS_SUCCEEDED: + self.logger.info(f"Job {self.job_id} succeeded.") + elif ( + job_status == JOB_STATUS_FAILED + or job_status == JOB_STATUS_CANCELLED + ): + self.logger.error(f"Job {self.job_id} {job_status}.") + raise err.AirbyteSyncJobFailed(f"Job {self.job_id} {job_status}.") + else: + if self.airbyte_connection.status_updates: + self.logger.info(job_status) + # wait for next poll interval + await sleep(self.airbyte_connection.poll_interval_s) + + @sync_compatible + async def fetch_result(self) -> AirbyteSyncResult: + """Fetch the result of the `AirbyteSync`. + + Returns: + `AirbyteSyncResult`: object containing metadata for the `AirbyteSync`. + """ + async with self.airbyte_connection.airbyte_server.get_client( + logger=self.airbyte_connection.logger, + timeout=self.airbyte_connection.timeout, + ) as airbyte_client: + job_info = await airbyte_client.get_job_info(self.job_id) + job_status = job_info["job"]["status"] + job_created_at = job_info["job"]["createdAt"] + job_updated_at = job_info["job"]["updatedAt"] + + return AirbyteSyncResult( + created_at=job_created_at, + job_id=self.job_id, + job_status=job_status, + records_synced=self.records_synced, + updated_at=job_updated_at, + ) -class AirbyteSync(Block): - @property - def logger(self): - try: - return get_run_logger() - except MissingContextError: - return get_logger() + +class AirbyteConnection(JobBlock): + """`JobBlock` representing an existing Airbyte connection.""" airbyte_server: AirbyteServer = Field( default=..., + description=( + "`AirbyteServer` block representing the Airbyte instance " + "where the Airbyte connection is defined." 
+ ), + ) + + connection_id: uuid.UUID = Field( + default=..., + description="UUID of the Airbyte Connection to trigger.", + ) + + poll_interval_s: int = Field( + default=15, + description="Time in seconds between status checks of the Airbyte sync job.", + ) + + status_updates: bool = Field( + default=False, + description="Whether to log job status on each poll of the Airbyte sync job.", ) timeout: int = Field( @@ -49,117 +309,53 @@ def logger(self): description="Timeout in seconds for requests made by `httpx.AsyncClient`.", ) - async def trigger( - self, - connection_id: str, - ): - - try: - uuid.UUID(connection_id) - except (TypeError, ValueError): - raise ValueError( - "Parameter `connection_id` *must* be a valid UUID \ - i.e. 32 hex characters, including hyphens." - ) + @sync_compatible + async def trigger(self): + """Trigger a sync of the defined Airbyte connection. + + Returns: + An `AirbyteSync` `JobRun` object representing the active sync job. + + Raises: + AirbyteConnectionInactiveException: If the connection is inactive. + AirbyteConnectionDeprecatedException: If the connection is deprecated. + """ + str_connection_id = str(self.connection_id) async with self.airbyte_server.get_client( logger=self.logger, timeout=self.timeout ) as airbyte_client: self.logger.info( - f"Triggering Airbyte Connection {connection_id}, " - f"in workspace at `airbyte_base_url` {self.airbyte_server.base_url}" + f"Triggering Airbyte Connection {self.connection_id}, " + f"in workspace at {self.airbyte_server.base_url!r}" ) connection_status = await airbyte_client.get_connection_status( - connection_id + str_connection_id ) if connection_status == CONNECTION_STATUS_ACTIVE: - # Trigger manual sync on the Connection ... - ( - job_id, - _, - ) = await airbyte_client.trigger_manual_sync_connection(connection_id) + (job_id, _,) = await airbyte_client.trigger_manual_sync_connection( + str_connection_id + ) - return job_id + return AirbyteSync( + airbyte_connection=self, + job_id=job_id, + ) elif connection_status == CONNECTION_STATUS_INACTIVE: self.logger.error( - f"Connection: {connection_id} is inactive" + f"Connection: {self.connection_id!r} is inactive" " - you'll need to enable it in your Airbyte instance" ) raise err.AirbyteConnectionInactiveException( - f"Please enable the Connection {connection_id} in Airbyte instance." + f"Please enable the connection {self.connection_id!r} " + "in your Airbyte instance." ) elif connection_status == CONNECTION_STATUS_DEPRECATED: - self.logger.error(f"Connection {connection_id} is deprecated.") + self.logger.error(f"Connection {self.connection_id!r} is deprecated.") raise err.AirbyeConnectionDeprecatedException( - f"Connection {connection_id} is deprecated." + f"Connection {self.connection_id!r} is deprecated." 
) - - async def wait_for_completion( - self, - job_id: str, - poll_interval_s: int = 15, - status_updates: bool = False, - ): - with self.airbyte_server.get_client( - logger=self.logger, timeout=self.timeout - ) as airbyte_client: - - job_status = await airbyte_client.get_job_status(self.job_id) - - while job_status not in [JOB_STATUS_FAILED, JOB_STATUS_SUCCEEDED]: - ( - job_status, - _, - _, - ) = await airbyte_client.get_job_status(job_id) - - # pending┃running┃incomplete┃failed┃succeeded┃cancelled - if job_status == JOB_STATUS_SUCCEEDED: - self.logger.info(f"Job {job_id} succeeded.") - elif job_status == JOB_STATUS_FAILED: - self.logger.error(f"Job {job_id} failed.") - raise err.AirbyteSyncJobFailed(f"Job {job_id} failed.") - else: - if status_updates: - self.logger.info(job_status) - # wait for next poll interval - await sleep(poll_interval_s) - - async def fetch_results(self, job_id: uuid.UUID) -> AirbyteSyncJobResult: - with self.airbyte_server.get_client( - logger=self.logger, timeout=self.timeout - ) as airbyte_client: - ( - job_status, - job_created_at, - job_updated_at, - ) = await airbyte_client.get_job_status(job_id) - - return AirbyteSyncJobResult( - created_at=job_created_at, - job_id=job_id, - job_status=job_status, - updated_at=job_updated_at, - ) - - -@flow -def trigger_sync_and_wait_for_completion( - connection_id: str, - airbyte_sync_job_block: AirbyteSync, - poll_interval_s: int = 15, - status_updates: bool = False, -) -> AirbyteSyncJobResult: - job_id = airbyte_sync_job_block.trigger(connection_id=connection_id) - - airbyte_sync_job_block.wait_for_completion( - job_id=job_id, - poll_interval_s=poll_interval_s, - status_updates=status_updates, - ) - - return airbyte_sync_job_block.fetch_results(job_id=job_id) diff --git a/prefect_airbyte/flows.py b/prefect_airbyte/flows.py new file mode 100644 index 0000000..249276c --- /dev/null +++ b/prefect_airbyte/flows.py @@ -0,0 +1,55 @@ +"""Flows for interacting with Airbyte.""" + +from prefect import flow, task + +from prefect_airbyte.connections import AirbyteConnection, AirbyteSyncResult + + +@flow +async def run_airbyte_connection_sync( + airbyte_connection: AirbyteConnection, +) -> AirbyteSyncResult: + """A flow that triggers a sync of an Airbyte connection and waits for it to complete. + + Args: + airbyte_connection: `AirbyteConnection` representing the Airbyte connection to + trigger and wait for completion of. + + Returns: + `AirbyteSyncResult`: Model containing metadata for the `AirbyteSync`. 
+ + Example: + Define a flow that runs an Airbyte connection sync: + ```python + from prefect import flow + from prefect_airbyte.server import AirbyteServer + from prefect_airbyte.connections import AirbyteConnection + from prefect_airbyte.flows import run_airbyte_connection_sync + + airbyte_server = AirbyteServer( + server_host="localhost", + server_port=8000 + ) + + connection = AirbyteConnection( + airbyte_server=airbyte_server, + connection_id="" + ) + + @flow + def airbyte_sync_flow(): + # do some things + + airbyte_sync_result = run_airbyte_connection_sync( + airbyte_connection=connection + ) + print(airbyte_sync_result.records_synced) + + # do some other things, like trigger DBT based on number of new raw records + ``` + """ + airbyte_sync = await task(airbyte_connection.trigger.aio)(airbyte_connection) + + await task(airbyte_sync.wait_for_completion.aio)(airbyte_sync) + + return await task(airbyte_sync.fetch_result.aio)(airbyte_sync) diff --git a/tests/conftest.py b/tests/conftest.py index e2cbe11..a3d870f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,19 @@ import pytest import respx from httpx import Response +from prefect.testing.utilities import prefect_test_harness +from prefect_airbyte.connections import AirbyteConnection from prefect_airbyte.server import AirbyteServer -# connection fixtures / mocks -CONNECTION_ID = "e1b2078f-882a-4f50-9942-cfe34b2d825b" +@pytest.fixture(scope="session", autouse=True) +def prefect_db(): + """ + Sets up test harness for temporary DB during test runs. + """ + with prefect_test_harness(): + yield @pytest.fixture @@ -15,9 +22,19 @@ def airbyte_server(): @pytest.fixture -def airbyte_trigger_sync_response() -> dict: +def connection_id(): + return "e1b2078f-882a-4f50-9942-cfe34b2d825b" + + +@pytest.fixture +def airbyte_connection(airbyte_server, connection_id): + return AirbyteConnection(airbyte_server=airbyte_server, connection_id=connection_id) + + +@pytest.fixture +def airbyte_trigger_sync_response(connection_id) -> dict: return { - "connectionId": CONNECTION_ID, + "connectionId": connection_id, "status": "active", "job": {"id": 45, "createdAt": 1650311569, "updatedAt": 1650311585}, } @@ -34,9 +51,9 @@ def airbyte_bad_health_check_response() -> dict: @pytest.fixture -def airbyte_get_connection_response_json() -> dict: +def airbyte_get_connection_response_json(connection_id) -> dict: return { - "connectionId": CONNECTION_ID, + "connectionId": connection_id, "name": "File <> Snowflake Demo", "namespaceDefinition": "destination", "namespaceFormat": "${SOURCE_NAMESPACE}", @@ -109,12 +126,12 @@ def airbyte_get_connection_not_found(): @pytest.fixture -def airbyte_base_job_status_response() -> dict: +def airbyte_base_job_status_response(connection_id) -> dict: return { "job": { "id": 45, "configType": "sync", - "configId": CONNECTION_ID, + "configId": connection_id, "createdAt": 1650644844, "updatedAt": 1650644844, "status": None, @@ -126,18 +143,57 @@ def airbyte_base_job_status_response() -> dict: @pytest.fixture def airbyte_get_good_job_status_response(airbyte_base_job_status_response) -> dict: airbyte_base_job_status_response["job"]["status"] = "succeeded" + airbyte_base_job_status_response["attempts"].append( + { + "attempt": { + "id": 0, + "status": "succeeded", + "createdAt": 0, + "updatedAt": 0, + "endedAt": 0, + "bytesSynced": 0, + "recordsSynced": 0, + } + } + ) return airbyte_base_job_status_response @pytest.fixture def airbyte_get_pending_job_status_response(airbyte_base_job_status_response) -> dict: 
airbyte_base_job_status_response["job"]["status"] = "pending" + airbyte_base_job_status_response["attempts"].append( + { + "attempt": { + "id": 0, + "status": "pending", + "createdAt": 0, + "updatedAt": 0, + "endedAt": 0, + "bytesSynced": 0, + "recordsSynced": 0, + } + } + ) return airbyte_base_job_status_response @pytest.fixture def airbyte_get_failed_job_status_response(airbyte_base_job_status_response) -> dict: airbyte_base_job_status_response["job"]["status"] = "failed" + airbyte_base_job_status_response["attempts"].append( + { + "attempt": { + "id": 0, + "status": "failed", + "createdAt": 0, + "updatedAt": 0, + "endedAt": 0, + "bytesSynced": 0, + "recordsSynced": 0, + } + } + ) return airbyte_base_job_status_response @@ -288,7 +344,6 @@ def mock_cancelled_connection_sync_calls( respx_mock, base_airbyte_url, airbyte_good_health_check_response, - airbyte_get_good_job_status_response, airbyte_trigger_sync_response, airbyte_get_connection_response_json, airbyte_get_pending_job_status_response, @@ -310,12 +365,12 @@ def mock_cancelled_connection_sync_calls( respx_mock.post( url=f"{base_airbyte_url}/jobs/get/", - json={"id": airbyte_get_good_job_status_response["job"]["id"]}, + json={"id": airbyte_get_pending_job_status_response["job"]["id"]}, ).mock(return_value=Response(200, json=airbyte_get_pending_job_status_response)) respx_mock.post( url=f"{base_airbyte_url}/jobs/get/", - json={"id": airbyte_get_good_job_status_response["job"]["id"]}, + json={"id": airbyte_get_failed_job_status_response["job"]["id"]}, ).mock(return_value=Response(200, json=airbyte_get_failed_job_status_response)) @@ -339,9 +394,6 @@ def mock_inactive_sync_calls( ).mock(return_value=Response(200, json=airbyte_get_connection_response_json)) -# configuration fixtures / mocks - - @pytest.fixture def airbyte_good_export_configuration_response() -> bytes: return b"" diff --git a/tests/test_connections.py b/tests/test_connections.py index e4e3295..6070ffb 100644 --- a/tests/test_connections.py +++ b/tests/test_connections.py @@ -2,22 +2,22 @@ from prefect.logging import disable_run_logger from prefect_airbyte import exceptions as err -from prefect_airbyte.connections import trigger_sync +from prefect_airbyte.connections import AirbyteConnection, trigger_sync -CONNECTION_ID = "e1b2078f-882a-4f50-9942-cfe34b2d825b" - -async def example_trigger_sync_flow(airbyte_server=None, **kwargs): +async def example_trigger_sync_flow(connection_id, airbyte_server=None, **kwargs): with disable_run_logger(): return await trigger_sync.fn( - airbyte_server=airbyte_server, connection_id=CONNECTION_ID, **kwargs + airbyte_server=airbyte_server, connection_id=connection_id, **kwargs ) async def test_successful_trigger_sync( - mock_successful_connection_sync_calls, airbyte_server + mock_successful_connection_sync_calls, airbyte_server, connection_id ): - trigger_sync_result = await example_trigger_sync_flow(airbyte_server) + trigger_sync_result = await example_trigger_sync_flow( + airbyte_server=airbyte_server, connection_id=connection_id + ) assert type(trigger_sync_result) is dict @@ -31,41 +31,66 @@ async def test_successful_trigger_sync( async def test_cancelled_trigger_manual_sync( - mock_cancelled_connection_sync_calls, airbyte_server + mock_cancelled_connection_sync_calls, airbyte_server, connection_id ): with pytest.raises(err.AirbyteSyncJobFailed): - await example_trigger_sync_flow(airbyte_server) + await example_trigger_sync_flow( + airbyte_server=airbyte_server, connection_id=connection_id + ) -async def 
test_connection_sync_inactive(mock_inactive_sync_calls, airbyte_server): +async def test_connection_sync_inactive( + mock_inactive_sync_calls, airbyte_server, connection_id +): with pytest.raises(err.AirbyteConnectionInactiveException): - await example_trigger_sync_flow(airbyte_server) + await example_trigger_sync_flow( + airbyte_server=airbyte_server, connection_id=connection_id + ) -async def test_failed_trigger_sync(mock_failed_connection_sync_calls, airbyte_server): +async def test_failed_trigger_sync( + mock_failed_connection_sync_calls, airbyte_server, connection_id +): with pytest.raises(err.AirbyteSyncJobFailed): - await example_trigger_sync_flow(airbyte_server) + await example_trigger_sync_flow( + airbyte_server=airbyte_server, connection_id=connection_id + ) -async def test_bad_connection_id(mock_bad_connection_id_calls, airbyte_server): +async def test_bad_connection_id( + mock_bad_connection_id_calls, airbyte_server, connection_id +): with pytest.raises(err.ConnectionNotFoundException): - await example_trigger_sync_flow(airbyte_server) + await example_trigger_sync_flow( + airbyte_server=airbyte_server, connection_id=connection_id + ) -async def test_failed_health_check(mock_failed_health_check_calls, airbyte_server): +async def test_failed_health_check( + mock_failed_health_check_calls, airbyte_server, connection_id +): with pytest.raises(err.AirbyteServerNotHealthyException): - await example_trigger_sync_flow(airbyte_server) + await example_trigger_sync_flow( + airbyte_server=airbyte_server, connection_id=connection_id + ) -async def test_get_job_status_not_found(mock_invalid_job_status_calls, airbyte_server): +async def test_get_job_status_not_found( + mock_invalid_job_status_calls, airbyte_server, connection_id +): with pytest.raises(err.JobNotFoundException): - await example_trigger_sync_flow(airbyte_server) + await example_trigger_sync_flow( + airbyte_server=airbyte_server, connection_id=connection_id + ) -async def test_trigger_sync_with_kwargs(mock_successful_connection_sync_calls): +async def test_trigger_sync_with_kwargs( + mock_successful_connection_sync_calls, connection_id +): trigger_sync_result = await example_trigger_sync_flow( airbyte_server_host="localhost", airbyte_server_port=8000, + connection_id=connection_id, ) assert type(trigger_sync_result) is dict @@ -77,3 +102,14 @@ async def test_trigger_sync_with_kwargs(mock_successful_connection_sync_calls): "job_created_at": 1650644844, "job_updated_at": 1650644844, } + + +async def test_airbyte_connection_instantiation(airbyte_server, connection_id): + connection = AirbyteConnection( + airbyte_server=airbyte_server, + connection_id=connection_id, + ) + + assert isinstance(connection, AirbyteConnection) + assert connection.airbyte_server == airbyte_server + assert str(connection.connection_id) == connection_id diff --git a/tests/test_flows.py b/tests/test_flows.py new file mode 100644 index 0000000..c0acdf4 --- /dev/null +++ b/tests/test_flows.py @@ -0,0 +1,72 @@ +import pytest +from prefect import flow + +from prefect_airbyte.connections import AirbyteSyncResult +from prefect_airbyte.exceptions import AirbyteSyncJobFailed +from prefect_airbyte.flows import run_airbyte_connection_sync + +expected_airbyte_sync_result = AirbyteSyncResult( + created_at=1650644844, + job_status="succeeded", + job_id=45, + records_synced=0, + updated_at=1650644844, +) + + +async def test_run_airbyte_connection_sync_standalone_success( + airbyte_server, airbyte_connection, mock_successful_connection_sync_calls +): + + result = await 
run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + + assert result == expected_airbyte_sync_result + + +async def test_run_airbyte_connection_sync_standalone_fail( + airbyte_server, airbyte_connection, mock_failed_connection_sync_calls, caplog +): + + with pytest.raises(AirbyteSyncJobFailed): + await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + + +async def test_run_airbyte_connection_sync_standalone_cancel( + airbyte_server, airbyte_connection, mock_cancelled_connection_sync_calls +): + + with pytest.raises(AirbyteSyncJobFailed): + await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + + +async def test_run_airbyte_connection_sync_standalone_status_updates( + airbyte_server, airbyte_connection, mock_successful_connection_sync_calls, caplog +): + airbyte_connection.status_updates = True + await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + + assert "Job 45 succeeded" in caplog.text + + +async def test_run_airbyte_connection_sync_subflow_synchronously( + airbyte_server, airbyte_connection, mock_successful_connection_sync_calls +): + @flow + def airbyte_sync_sync_flow(): + return run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + + result = airbyte_sync_sync_flow() + + assert result == expected_airbyte_sync_result + + +async def test_run_airbyte_connection_sync_subflow_asynchronously( + airbyte_server, airbyte_connection, mock_successful_connection_sync_calls +): + @flow + async def airbyte_sync_sync_flow(): + return await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + + result = await airbyte_sync_sync_flow() + + assert result == expected_airbyte_sync_result From 4aab00e3484c5aba0bc9cabbd82666d60c4faf16 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 28 Dec 2022 09:42:51 -0600 Subject: [PATCH 3/8] clean up docstrings and examples --- README.md | 30 ++++++++++++++++++------------ prefect_airbyte/client.py | 8 ++++++-- prefect_airbyte/connections.py | 9 ++++++--- prefect_airbyte/flows.py | 6 +++--- tests/test_flows.py | 26 +++++++++++++------------- 5 files changed, 46 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index edbcb63..3f91a70 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ ## Welcome! -`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows to trigger Airbyte syncs or export your connector configurations. +`prefect-airbyte` is a collection of prebuilt Prefect tasks and flows that can be used to quickly construct Prefect flows to interact with [Airbyte](https://airbyte.io/). 
## Getting Started @@ -67,22 +67,28 @@ remote_airbyte_server.save("my-remote-airbyte-server") #### Trigger a defined connection sync ```python from prefect import flow -from prefect_airbyte.connections import trigger_sync from prefect_airbyte.server import AirbyteServer +from prefect_airbyte.connections import AirbyteConnection +from prefect_airbyte.flows import run_connection_sync -@flow -def example_trigger_sync_flow(): +server = AirbyteServer(server_host="localhost", server_port=8000) - # Run other tasks and subflows here +connection = AirbyteConnection( + airbyte_server=server, + connection_id="e1b2078f-882a-4f50-9942-cfe34b2d825b", + status_updates=True, +) - trigger_sync( - airbyte_server=AirbyteServer.load("my-airbyte-server"), - connection_id="your-connection-id-to-sync", - poll_interval_s=3, - status_updates=True - ) +@flow +def airbyte_syncs(): + # do some setup + + sync_result = run_connection_sync( + airbyte_connection=connection, + ) -example_trigger_sync_flow() + # do some other things, like trigger DBT based on number of records synced + print(f'{sync_result.records_synced=}') ``` ```console diff --git a/prefect_airbyte/client.py b/prefect_airbyte/client.py index 683f400..63664f3 100644 --- a/prefect_airbyte/client.py +++ b/prefect_airbyte/client.py @@ -44,10 +44,10 @@ def __init__( async def check_health_status(self, client: httpx.AsyncClient) -> bool: """ - Checks the health status of an AirbyteInstance. + Checks the health status of an Airbyte instance. Args: - Session used to interact with the Airbyte API. + client: `httpx.AsyncClient` used to interact with the Airbyte API. Returns: True if the server is healthy. False otherwise. @@ -74,6 +74,8 @@ async def export_configuration( """ Triggers an export of Airbyte configuration. + **Note**: As of Airbyte v0.40.7-alpha, this endpoint no longer exists. + Returns: Gzipped Airbyte configuration data. """ @@ -167,6 +169,8 @@ async def get_job_status(self, job_id: str) -> Tuple[str, int, int]: """ Gets the status of an Airbyte connection sync job. + **Note**: To be deprecated in favor of `AirbyteClient.get_job_info`. + Args: job_id: ID of the Airbyte job to check. diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 3a35428..329d92c 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -44,12 +44,15 @@ async def trigger_sync( timeout: int = 5, ) -> Dict[str, Any]: """Prefect Task for triggering an Airbyte connection sync. + *It is assumed that the user will have previously configured a Source & Destination into a Connection.* e.g. MySql -> CSV + An invocation of `trigger_sync` will attempt to start a sync job for the specified `connection_id` representing the Connection in Airbyte. + `trigger_sync` will poll Airbyte Server for the Connection status and will only complete when the sync has completed or when it receives an error status code from an API call. @@ -161,8 +164,8 @@ def example_trigger_sync_flow(): job_status == JOB_STATUS_FAILED or job_status == JOB_STATUS_CANCELLED ): - logger.error(f"Job {job_id} failed.") - raise err.AirbyteSyncJobFailed(f"Job {job_id} failed.") + logger.error(f"Job {job_id} {job_status}.") + raise err.AirbyteSyncJobFailed(f"Job {job_id} {job_status}.") else: if status_updates: logger.info(job_status) @@ -310,7 +313,7 @@ class AirbyteConnection(JobBlock): ) @sync_compatible - async def trigger(self): + async def trigger(self) -> AirbyteSync: """Trigger a sync of the defined Airbyte connection. 
Returns: diff --git a/prefect_airbyte/flows.py b/prefect_airbyte/flows.py index 249276c..729e6bf 100644 --- a/prefect_airbyte/flows.py +++ b/prefect_airbyte/flows.py @@ -6,7 +6,7 @@ @flow -async def run_airbyte_connection_sync( +async def run_connection_sync( airbyte_connection: AirbyteConnection, ) -> AirbyteSyncResult: """A flow that triggers a sync of an Airbyte connection and waits for it to complete. @@ -24,7 +24,7 @@ async def run_airbyte_connection_sync( from prefect import flow from prefect_airbyte.server import AirbyteServer from prefect_airbyte.connections import AirbyteConnection - from prefect_airbyte.flows import run_airbyte_connection_sync + from prefect_airbyte.flows import run_connection_sync airbyte_server = AirbyteServer( server_host="localhost", @@ -40,7 +40,7 @@ async def run_airbyte_connection_sync( def airbyte_sync_flow(): # do some things - airbyte_sync_result = run_airbyte_connection_sync( + airbyte_sync_result = run_connection_sync( airbyte_connection=connection ) print(airbyte_sync_result.records_synced) diff --git a/tests/test_flows.py b/tests/test_flows.py index c0acdf4..7f63021 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -3,7 +3,7 @@ from prefect_airbyte.connections import AirbyteSyncResult from prefect_airbyte.exceptions import AirbyteSyncJobFailed -from prefect_airbyte.flows import run_airbyte_connection_sync +from prefect_airbyte.flows import run_connection_sync expected_airbyte_sync_result = AirbyteSyncResult( created_at=1650644844, @@ -14,58 +14,58 @@ ) -async def test_run_airbyte_connection_sync_standalone_success( +async def test_run_connection_sync_standalone_success( airbyte_server, airbyte_connection, mock_successful_connection_sync_calls ): - result = await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + result = await run_connection_sync(airbyte_connection=airbyte_connection) assert result == expected_airbyte_sync_result -async def test_run_airbyte_connection_sync_standalone_fail( +async def test_run_connection_sync_standalone_fail( airbyte_server, airbyte_connection, mock_failed_connection_sync_calls, caplog ): with pytest.raises(AirbyteSyncJobFailed): - await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + await run_connection_sync(airbyte_connection=airbyte_connection) -async def test_run_airbyte_connection_sync_standalone_cancel( +async def test_run_connection_sync_standalone_cancel( airbyte_server, airbyte_connection, mock_cancelled_connection_sync_calls ): with pytest.raises(AirbyteSyncJobFailed): - await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + await run_connection_sync(airbyte_connection=airbyte_connection) -async def test_run_airbyte_connection_sync_standalone_status_updates( +async def test_run_connection_sync_standalone_status_updates( airbyte_server, airbyte_connection, mock_successful_connection_sync_calls, caplog ): airbyte_connection.status_updates = True - await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + await run_connection_sync(airbyte_connection=airbyte_connection) assert "Job 45 succeeded" in caplog.text -async def test_run_airbyte_connection_sync_subflow_synchronously( +async def test_run_connection_sync_subflow_synchronously( airbyte_server, airbyte_connection, mock_successful_connection_sync_calls ): @flow def airbyte_sync_sync_flow(): - return run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + return run_connection_sync(airbyte_connection=airbyte_connection) result = airbyte_sync_sync_flow() 
assert result == expected_airbyte_sync_result -async def test_run_airbyte_connection_sync_subflow_asynchronously( +async def test_run_connection_sync_subflow_asynchronously( airbyte_server, airbyte_connection, mock_successful_connection_sync_calls ): @flow async def airbyte_sync_sync_flow(): - return await run_airbyte_connection_sync(airbyte_connection=airbyte_connection) + return await run_connection_sync(airbyte_connection=airbyte_connection) result = await airbyte_sync_sync_flow() From fddb4f224c8837ec829d867b7e66ae6f338c403e Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 28 Dec 2022 09:49:39 -0600 Subject: [PATCH 4/8] update changelog --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce160d0..e5829e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed +## 0.2.0 + +Released on December 28, 2022. + +### Added +- `AirbyteConnection` block to represent a connection to trigger in Airbyte - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44) +- `AirbyteSync` block to represent a sync job triggered within Airbyte - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44) +- `run_connection_sync` flow to trigger a sync job for a given connection - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44) +- `AirbyteClient.get_job_info` method to retrieve bulk information about a job - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44) +### Fixed +- Case in `trigger_sync` task where the cancelling the job would cause an infinite loop - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44) + ## 0.1.3 Released on November 16, 2022. From a0aa62a0ee91a1d442e81343e0bc129eb71c50d1 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 28 Dec 2022 10:17:16 -0600 Subject: [PATCH 5/8] add attrs for `AirbyteConnection` docstring --- prefect_airbyte/connections.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 329d92c..5f14995 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -282,7 +282,23 @@ async def fetch_result(self) -> AirbyteSyncResult: class AirbyteConnection(JobBlock): - """`JobBlock` representing an existing Airbyte connection.""" + """`JobBlock` representing an existing Airbyte connection. + + Attributes: + airbyte_server: `AirbyteServer` block representing the Airbyte instance + where the Airbyte connection is defined. + connection_id: UUID of the Airbyte Connection to trigger. + poll_interval_s: Time in seconds between status checks of the Airbyte sync job. + status_updates: Whether to log job status on each poll of the Airbyte sync job. + timeout: Timeout in seconds for requests made by `httpx.AsyncClient`. 
+ + Example: + ```python + from prefect_airbyte import AirbyteConnection + + airbyte_connection = AirbyteConnection.load("BLOCK_NAME") + ``` + """ airbyte_server: AirbyteServer = Field( default=..., From 968b1cf145b16382cc31e2590f2522b609e89475 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 28 Dec 2022 12:15:30 -0600 Subject: [PATCH 6/8] address review comments --- README.md | 2 +- prefect_airbyte/connections.py | 12 +++--------- prefect_airbyte/flows.py | 6 ++++++ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 3f91a70..18aea4e 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ def airbyte_syncs(): ) # do some other things, like trigger DBT based on number of records synced - print(f'{sync_result.records_synced=}') + print(f'Number of Records Synced: {sync_result.records_synced}') ``` ```console diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 5f14995..7898de3 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -160,10 +160,7 @@ def example_trigger_sync_flow(): # pending┃running┃incomplete┃failed┃succeeded┃cancelled if job_status == JOB_STATUS_SUCCEEDED: logger.info(f"Job {job_id} succeeded.") - elif ( - job_status == JOB_STATUS_FAILED - or job_status == JOB_STATUS_CANCELLED - ): + elif job_status in [JOB_STATUS_FAILED, JOB_STATUS_CANCELLED]: logger.error(f"Job {job_id} {job_status}.") raise err.AirbyteSyncJobFailed(f"Job {job_id} {job_status}.") else: @@ -210,7 +207,7 @@ class AirbyteSyncResult(BaseModel): class AirbyteSync(JobRun): """A `JobRun` representing an Airbyte sync job.""" - def __init__(self, airbyte_connection: "AirbyteConnection", job_id: str): + def __init__(self, airbyte_connection: "AirbyteConnection", job_id: int): self.airbyte_connection: "AirbyteConnection" = airbyte_connection self.job_id: int = job_id self.records_synced: int = 0 @@ -243,10 +240,7 @@ async def wait_for_completion(self): # pending┃running┃failed┃succeeded┃cancelled if job_status == JOB_STATUS_SUCCEEDED: self.logger.info(f"Job {self.job_id} succeeded.") - elif ( - job_status == JOB_STATUS_FAILED - or job_status == JOB_STATUS_CANCELLED - ): + elif job_status in [JOB_STATUS_FAILED, JOB_STATUS_CANCELLED]: self.logger.error(f"Job {self.job_id} {job_status}.") raise err.AirbyteSyncJobFailed(f"Job {self.job_id} {job_status}.") else: diff --git a/prefect_airbyte/flows.py b/prefect_airbyte/flows.py index 729e6bf..47d8605 100644 --- a/prefect_airbyte/flows.py +++ b/prefect_airbyte/flows.py @@ -48,6 +48,12 @@ def airbyte_sync_flow(): # do some other things, like trigger DBT based on number of new raw records ``` """ + + # TODO: refactor block method calls to avoid using .aio + # we currently need to do this because of the deadlock caused by calling + # a sync task within an async flow + # see [this issue](https://github.com/PrefectHQ/prefect/issues/7551) + airbyte_sync = await task(airbyte_connection.trigger.aio)(airbyte_connection) await task(airbyte_sync.wait_for_completion.aio)(airbyte_sync) From 021f1df74d79d36000c4e0f67a347bbbe2379b4e Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 28 Dec 2022 13:51:41 -0600 Subject: [PATCH 7/8] apply more review suggestions --- prefect_airbyte/client.py | 12 +++++++++++- prefect_airbyte/connections.py | 27 +++++++++++++++------------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/prefect_airbyte/client.py b/prefect_airbyte/client.py index 63664f3..bf2eb4e 100644 --- a/prefect_airbyte/client.py +++ b/prefect_airbyte/client.py 
@@ -169,7 +169,7 @@ async def get_job_status(self, job_id: str) -> Tuple[str, int, int]: """ Gets the status of an Airbyte connection sync job. - **Note**: To be deprecated in favor of `AirbyteClient.get_job_info`. + **Note**: Deprecated in favor of `AirbyteClient.get_job_info`. Args: job_id: ID of the Airbyte job to check. @@ -179,6 +179,16 @@ async def get_job_status(self, job_id: str) -> Tuple[str, int, int]: job_created_at: Datetime string of when the job was created. job_updated_at: Datetime string of the when the job was last updated. """ + warn( + "`AirbyteClient.get_job_status` is deprecated and will be removed in " + "a future release. If you are using this client method directly, please " + "use the `AirbyteClient.get_job_info` method instead. If you are" + "seeing this warning while using the `trigger_sync` task, please " + "define an `AirbyteConnection` and use `run_connection_sync` instead.", + DeprecationWarning, + stacklevel=2, + ) + get_connection_url = self.airbyte_base_url + "/jobs/get/" try: response = await self._client.post(get_connection_url, json={"id": job_id}) diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 7898de3..0ee8df9 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -210,7 +210,7 @@ class AirbyteSync(JobRun): def __init__(self, airbyte_connection: "AirbyteConnection", job_id: int): self.airbyte_connection: "AirbyteConnection" = airbyte_connection self.job_id: int = job_id - self.records_synced: int = 0 + self._records_synced: int = 0 @sync_compatible async def wait_for_completion(self): @@ -231,11 +231,9 @@ async def wait_for_completion(self): job_status = job_info["job"]["status"] - # "recordsSynced" key is only present if the job has synced records - if "recordsSynced" in job_info["attempts"][-1]["attempt"]: - self.records_synced = job_info["attempts"][-1]["attempt"][ - "recordsSynced" - ] + self._records_synced = job_info["attempts"][-1]["attempt"].get( + "recordsSynced", 0 + ) # pending┃running┃failed┃succeeded┃cancelled if job_status == JOB_STATUS_SUCCEEDED: @@ -270,7 +268,7 @@ async def fetch_result(self) -> AirbyteSyncResult: created_at=job_created_at, job_id=self.job_id, job_status=job_status, - records_synced=self.records_synced, + records_synced=self._records_synced, updated_at=job_updated_at, ) @@ -291,6 +289,15 @@ class AirbyteConnection(JobBlock): from prefect_airbyte import AirbyteConnection airbyte_connection = AirbyteConnection.load("BLOCK_NAME") + + # trigger the Airbyte connection sync + airbyte_sync = await airbyte_connection.trigger() + + # wait for the Airbyte sync to complete + await airbyte_sync.wait_for_completion() + + # fetch the result of the Airbyte sync + airbyte_sync_result = await airbyte_sync.fetch_result() ``` """ @@ -359,16 +366,12 @@ async def trigger(self) -> AirbyteSync: ) elif connection_status == CONNECTION_STATUS_INACTIVE: - self.logger.error( - f"Connection: {self.connection_id!r} is inactive" - " - you'll need to enable it in your Airbyte instance" - ) raise err.AirbyteConnectionInactiveException( + f"Connection: {self.connection_id!r} is inactive" f"Please enable the connection {self.connection_id!r} " "in your Airbyte instance." ) elif connection_status == CONNECTION_STATUS_DEPRECATED: - self.logger.error(f"Connection {self.connection_id!r} is deprecated.") raise err.AirbyeConnectionDeprecatedException( f"Connection {self.connection_id!r} is deprecated." 
) From b7f1eb7064647bd11ff875b98878c436ef61d7b2 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 28 Dec 2022 15:41:34 -0600 Subject: [PATCH 8/8] remove awaits in examples bc sync_compatible --- prefect_airbyte/connections.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 0ee8df9..87023ec 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -291,13 +291,13 @@ class AirbyteConnection(JobBlock): airbyte_connection = AirbyteConnection.load("BLOCK_NAME") # trigger the Airbyte connection sync - airbyte_sync = await airbyte_connection.trigger() + airbyte_sync = airbyte_connection.trigger() # wait for the Airbyte sync to complete - await airbyte_sync.wait_for_completion() + airbyte_sync.wait_for_completion() # fetch the result of the Airbyte sync - airbyte_sync_result = await airbyte_sync.fetch_result() + airbyte_sync_result = airbyte_sync.fetch_result() ``` """
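
Taken together, the pieces added over this series (the existing `AirbyteServer` block, the new `AirbyteConnection` `JobBlock`, the `AirbyteSync` `JobRun`, and the `run_connection_sync` flow) compose as in the sketch below. The sketch is illustrative only and is not part of any patch: it assumes an Airbyte OSS instance reachable at `localhost:8000` and reuses the placeholder connection ID from the test fixtures; the helper function name is invented for the example.

```python
# Illustrative sketch, not part of the patch series: composing the API
# introduced above. Assumes an Airbyte OSS instance at localhost:8000 and
# uses the placeholder connection ID from the test fixtures.
from prefect import flow

from prefect_airbyte.connections import AirbyteConnection, AirbyteSyncResult
from prefect_airbyte.flows import run_connection_sync
from prefect_airbyte.server import AirbyteServer

server = AirbyteServer(server_host="localhost", server_port=8000)

connection = AirbyteConnection(
    airbyte_server=server,
    connection_id="e1b2078f-882a-4f50-9942-cfe34b2d825b",  # placeholder UUID
    poll_interval_s=15,
    status_updates=True,
)


@flow
def airbyte_orchestration_flow() -> int:
    # `run_connection_sync` triggers the sync, polls until the job reaches a
    # terminal status, and returns an `AirbyteSyncResult`.
    result = run_connection_sync(airbyte_connection=connection)
    return result.records_synced


def run_sync_via_block_methods() -> AirbyteSyncResult:
    # Equivalent lower-level usage through the `JobBlock` interface; the
    # methods are decorated with `sync_compatible`, so no `await` is needed
    # from synchronous code (hypothetical helper, shown for illustration).
    airbyte_sync = connection.trigger()
    airbyte_sync.wait_for_completion()
    return airbyte_sync.fetch_result()
```

The `sync_compatible` decoration on the block methods is what lets the same `trigger`, `wait_for_completion`, and `fetch_result` calls work from both synchronous and asynchronous callers, which is also why the final commit in the series drops the `await`s from the `AirbyteConnection` docstring example.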