This repository has been archived by the owner on Jan 24, 2025. It is now read-only.

add AirbyteConnection, AirbyteSync and run_connection_sync #44

Merged
merged 8 commits into from
Dec 28, 2022
Changes from 7 commits
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -17,6 +17,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Removed

## 0.2.0

Released on December 28, 2022.

### Added
- `AirbyteConnection` block to represent a connection to trigger in Airbyte - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
- `AirbyteSync` block to represent a sync job triggered within Airbyte - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
- `run_connection_sync` flow to trigger a sync job for a given connection - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
- `AirbyteClient.get_job_info` method to retrieve bulk information about a job - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
### Fixed
- Case in `trigger_sync` task where cancelling the job would cause an infinite loop - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
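
The infinite-loop fix listed above concerns polling a job until it finishes. The general idea can be sketched as a loop that treats `cancelled` as a terminal state alongside `succeeded` and `failed`. This is a minimal, self-contained sketch and not the code from this PR: `poll_until_terminal`, `TERMINAL_STATES`, and `scripted_statuses` are hypothetical names, and the status strings are assumed to match Airbyte's job statuses.

```python
import asyncio
from typing import Awaitable, Callable

# Assumed terminal job states; a loop that omits "cancelled" here
# would keep polling forever once a job is cancelled.
TERMINAL_STATES = {"succeeded", "failed", "cancelled"}


async def poll_until_terminal(
    fetch_status: Callable[[], Awaitable[str]],
    poll_interval_s: float = 0.0,
    max_polls: int = 100,
) -> str:
    """Poll `fetch_status` until it reports a terminal state.

    `max_polls` is a safety net so an unexpected status cannot
    cause an unbounded loop.
    """
    for _ in range(max_polls):
        status = await fetch_status()
        if status in TERMINAL_STATES:
            return status
        await asyncio.sleep(poll_interval_s)
    raise TimeoutError(f"job still not terminal after {max_polls} polls")


def scripted_statuses(*statuses: str) -> Callable[[], Awaitable[str]]:
    """Build a fake status source that replays `statuses` in order."""
    remaining = list(statuses)

    async def fetch_status() -> str:
        return remaining.pop(0)

    return fetch_status


final_status = asyncio.run(
    poll_until_terminal(scripted_statuses("pending", "running", "cancelled"))
)
print(final_status)
```

The `max_polls` bound is a design choice of this sketch: even if the server reports a status the loop does not recognize, the caller gets an error instead of a hang.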

## 0.1.3

Released on November 16, 2022.
30 changes: 18 additions & 12 deletions README.md
@@ -18,7 +18,7 @@

## Welcome!

-`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows to trigger Airbyte syncs or export your connector configurations.
+`prefect-airbyte` is a collection of prebuilt Prefect tasks and flows that can be used to quickly construct Prefect flows to interact with [Airbyte](https://airbyte.io/).

## Getting Started

@@ -67,22 +67,28 @@ remote_airbyte_server.save("my-remote-airbyte-server")
#### Trigger a defined connection sync
```python
from prefect import flow
-from prefect_airbyte.connections import trigger_sync
from prefect_airbyte.server import AirbyteServer
+from prefect_airbyte.connections import AirbyteConnection
+from prefect_airbyte.flows import run_connection_sync

-@flow
-def example_trigger_sync_flow():
+server = AirbyteServer(server_host="localhost", server_port=8000)

-    # Run other tasks and subflows here
+connection = AirbyteConnection(
+    airbyte_server=server,
+    connection_id="e1b2078f-882a-4f50-9942-cfe34b2d825b",
+    status_updates=True,
+)

-    trigger_sync(
-        airbyte_server=AirbyteServer.load("my-airbyte-server"),
-        connection_id="your-connection-id-to-sync",
-        poll_interval_s=3,
-        status_updates=True
-    )
+@flow
+def airbyte_syncs():
+    # do some setup
+
+    sync_result = run_connection_sync(
+        airbyte_connection=connection,
+    )

-example_trigger_sync_flow()
+    # do some other things, like trigger DBT based on number of records synced
+    print(f'Number of Records Synced: {sync_result.records_synced}')
```

```console
1 change: 1 addition & 0 deletions docs/exceptions.md
@@ -0,0 +1 @@
::: prefect_airbyte.exceptions
1 change: 1 addition & 0 deletions docs/flows.md
@@ -0,0 +1 @@
::: prefect_airbyte.flows
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -50,4 +50,6 @@ nav:
- Client: client.md
- Configuration: configuration.md
- Connections: connections.md
- Exceptions: exceptions.md
- Flows: flows.md
- Server: server.md
44 changes: 40 additions & 4 deletions prefect_airbyte/client.py
@@ -1,6 +1,6 @@
"""Client for interacting with Airbyte instance"""
import logging
-from typing import Tuple
+from typing import Any, Dict, Tuple
from warnings import warn

import httpx
@@ -44,10 +44,10 @@ def __init__(

async def check_health_status(self, client: httpx.AsyncClient) -> bool:
"""
-Checks the health status of an AirbyteInstance.
+Checks the health status of an Airbyte instance.

Args:
-Session used to interact with the Airbyte API.
+client: `httpx.AsyncClient` used to interact with the Airbyte API.

Returns:
True if the server is healthy. False otherwise.
@@ -74,6 +74,8 @@ async def export_configuration(
"""
Triggers an export of Airbyte configuration.

**Note**: As of Airbyte v0.40.7-alpha, this endpoint no longer exists.

Returns:
Gzipped Airbyte configuration data.
"""
@@ -163,10 +165,12 @@ async def trigger_manual_sync_connection(

raise err.AirbyteServerNotHealthyException() from e

-async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
+async def get_job_status(self, job_id: str) -> Tuple[str, int, int]:
"""
Gets the status of an Airbyte connection sync job.

+**Note**: Deprecated in favor of `AirbyteClient.get_job_info`.

Args:
job_id: ID of the Airbyte job to check.

@@ -175,6 +179,16 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
job_created_at: Datetime string of when the job was created.
job_updated_at: Datetime string of when the job was last updated.
"""
warn(
"`AirbyteClient.get_job_status` is deprecated and will be removed in "
"a future release. If you are using this client method directly, please "
"use the `AirbyteClient.get_job_info` method instead. If you are "
"seeing this warning while using the `trigger_sync` task, please "
"define an `AirbyteConnection` and use `run_connection_sync` instead.",
DeprecationWarning,
stacklevel=2,
)

get_connection_url = self.airbyte_base_url + "/jobs/get/"
try:
response = await self._client.post(get_connection_url, json={"id": job_id})
@@ -190,6 +204,28 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
raise err.JobNotFoundException(f"Job {job_id} not found.") from e
raise err.AirbyteServerNotHealthyException() from e
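
The `warn(..., DeprecationWarning, stacklevel=2)` call in `get_job_status` follows the standard-library pattern for deprecating a function. Below is a minimal sketch of that pattern, and of how a caller or test can observe the warning, using only the `warnings` module. `old_status` and `new_info` are hypothetical stand-ins, not the client's methods.

```python
import warnings


def new_info(job_id: str) -> dict:
    """Hypothetical replacement function returning a full payload."""
    return {"job": {"id": job_id, "status": "succeeded"}}


def old_status(job_id: str) -> str:
    """Hypothetical deprecated function kept for backwards compatibility."""
    warnings.warn(
        # Adjacent string literals are concatenated as-is, so each piece
        # needs its own trailing space.
        "`old_status` is deprecated and will be removed in a future "
        "release. Use `new_info` instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller's line
    )
    return new_info(job_id)["job"]["status"]


# DeprecationWarning is often hidden by default filters, so record explicitly.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    status = old_status("42")

print(status)
print(caught[0].category.__name__, "-", caught[0].message)
```

With `stacklevel=2`, the reported location is the line that called the deprecated function, which is usually the line a user needs to change.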

async def get_job_info(self, job_id: str) -> Dict[str, Any]:
"""
Gets the full API response for a given Airbyte Job ID.

Args:
job_id: The ID of the Airbyte job to retrieve information on.

Returns:
Dict of the full API response for the given job ID.
"""
get_connection_url = self.airbyte_base_url + "/jobs/get/"
try:
response = await self._client.post(get_connection_url, json={"id": job_id})
response.raise_for_status()

return response.json()

except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise err.JobNotFoundException(f"Job {job_id} not found.") from e
raise err.AirbyteServerNotHealthyException() from e
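
Since `get_job_status` is deprecated in favor of `get_job_info`, callers that relied on the `(status, created_at, updated_at)` tuple need to pull those fields out of the returned dictionary themselves. A minimal sketch follows, assuming the response nests them under a `job` key as `status`, `createdAt`, and `updatedAt`. That shape is an assumption about the Airbyte API payload and is not shown in this diff. `FakeClient` and `job_status_from_info` are hypothetical names used so the example runs without an Airbyte server.

```python
import asyncio
from typing import Any, Dict, Tuple


class FakeClient:
    """Hypothetical stand-in for a client exposing `get_job_info`."""

    async def get_job_info(self, job_id: str) -> Dict[str, Any]:
        # Assumed payload shape; a real response may carry more fields.
        return {
            "job": {
                "id": job_id,
                "status": "succeeded",
                "createdAt": 1672185600,
                "updatedAt": 1672185660,
            },
            "attempts": [],
        }


def job_status_from_info(job_info: Dict[str, Any]) -> Tuple[str, int, int]:
    """Extract the tuple previously returned by the deprecated method."""
    job = job_info["job"]
    return job["status"], job["createdAt"], job["updatedAt"]


async def main() -> Tuple[str, int, int]:
    client = FakeClient()
    job_info = await client.get_job_info("45")
    return job_status_from_info(job_info)


result = asyncio.run(main())
print(result)
```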

async def create_client(self) -> httpx.AsyncClient:
"""Convenience method to create a new httpx.AsyncClient.
