This repository has been archived by the owner on Jan 24, 2025. It is now read-only.

Merge pull request #44 from PrefectHQ/job-block
add `AirbyteConnection`, `AirbyteSync` and `run_connection_sync`
zzstoatzz authored Dec 28, 2022
2 parents e849555 + b7f1eb7 commit 66b2ded
Showing 11 changed files with 537 additions and 69 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Removed

## 0.2.0

Released on December 28, 2022.

### Added
- `AirbyteConnection` block to represent a connection to trigger in Airbyte - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
- `AirbyteSync` block to represent a sync job triggered within Airbyte - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
- `run_connection_sync` flow to trigger a sync job for a given connection - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
- `AirbyteClient.get_job_info` method to retrieve bulk information about a job - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)
### Fixed
- Case in `trigger_sync` task where cancelling the job would cause an infinite loop - [#44](https://github.com/PrefectHQ/prefect-airbyte/pull/44)

## 0.1.3

Released on November 16, 2022.
30 changes: 18 additions & 12 deletions README.md
@@ -18,7 +18,7 @@

## Welcome!

`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows to trigger Airbyte syncs or export your connector configurations.
`prefect-airbyte` is a collection of prebuilt Prefect tasks and flows that can be used to quickly construct Prefect flows to interact with [Airbyte](https://airbyte.io/).

## Getting Started

@@ -67,22 +67,28 @@ remote_airbyte_server.save("my-remote-airbyte-server")
#### Trigger a defined connection sync
```python
from prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_airbyte.server import AirbyteServer
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync

@flow
def example_trigger_sync_flow():
server = AirbyteServer(server_host="localhost", server_port=8000)

# Run other tasks and subflows here
connection = AirbyteConnection(
airbyte_server=server,
connection_id="e1b2078f-882a-4f50-9942-cfe34b2d825b",
status_updates=True,
)

trigger_sync(
airbyte_server=AirbyteServer.load("my-airbyte-server"),
connection_id="your-connection-id-to-sync",
poll_interval_s=3,
status_updates=True
)
@flow
def airbyte_syncs():
# do some setup

sync_result = run_connection_sync(
airbyte_connection=connection,
)

example_trigger_sync_flow()
# do some other things, like trigger DBT based on number of records synced
print(f'Number of Records Synced: {sync_result.records_synced}')
```

```console
1 change: 1 addition & 0 deletions docs/exceptions.md
@@ -0,0 +1 @@
::: prefect_airbyte.exceptions
1 change: 1 addition & 0 deletions docs/flows.md
@@ -0,0 +1 @@
::: prefect_airbyte.flows
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -50,4 +50,6 @@ nav:
- Client: client.md
- Configuration: configuration.md
- Connections: connections.md
- Exceptions: exceptions.md
- Flows: flows.md
- Server: server.md
44 changes: 40 additions & 4 deletions prefect_airbyte/client.py
@@ -1,6 +1,6 @@
"""Client for interacting with Airbyte instance"""
import logging
from typing import Tuple
from typing import Any, Dict, Tuple
from warnings import warn

import httpx
@@ -44,10 +44,10 @@ def __init__(

async def check_health_status(self, client: httpx.AsyncClient) -> bool:
"""
Checks the health status of an AirbyteInstance.
Checks the health status of an Airbyte instance.
Args:
Session used to interact with the Airbyte API.
client: `httpx.AsyncClient` used to interact with the Airbyte API.
Returns:
True if the server is healthy. False otherwise.
@@ -74,6 +74,8 @@ async def export_configuration(
"""
Triggers an export of Airbyte configuration.
**Note**: As of Airbyte v0.40.7-alpha, this endpoint no longer exists.
Returns:
Gzipped Airbyte configuration data.
"""
@@ -163,10 +165,12 @@ async def trigger_manual_sync_connection(

raise err.AirbyteServerNotHealthyException() from e

async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
async def get_job_status(self, job_id: str) -> Tuple[str, int, int]:
"""
Gets the status of an Airbyte connection sync job.
**Note**: Deprecated in favor of `AirbyteClient.get_job_info`.
Args:
job_id: ID of the Airbyte job to check.
@@ -175,6 +179,16 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
job_created_at: Datetime string of when the job was created.
job_updated_at: Datetime string of when the job was last updated.
"""
warn(
"`AirbyteClient.get_job_status` is deprecated and will be removed in "
"a future release. If you are using this client method directly, please "
"use the `AirbyteClient.get_job_info` method instead. If you are "
"seeing this warning while using the `trigger_sync` task, please "
"define an `AirbyteConnection` and use `run_connection_sync` instead.",
DeprecationWarning,
stacklevel=2,
)

get_connection_url = self.airbyte_base_url + "/jobs/get/"
try:
response = await self._client.post(get_connection_url, json={"id": job_id})
@@ -190,6 +204,28 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
raise err.JobNotFoundException(f"Job {job_id} not found.") from e
raise err.AirbyteServerNotHealthyException() from e
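
The deprecation pattern added to `get_job_status` above can be tried in isolation. The following is a minimal sketch using only the standard library; the function bodies, the `caller` helper, and the return value are hypothetical stand-ins, not the library's code. It shows how `stacklevel=2` attributes the warning to the calling line, and why each adjacent string literal needs a trailing space so that the concatenated message reads correctly.

```python
import warnings


def get_job_status(job_id: str) -> str:
    """Hypothetical stand-in for a deprecated method."""
    warnings.warn(
        # Adjacent literals are concatenated as-is, so each piece that
        # continues a sentence must end with a space.
        "`get_job_status` is deprecated and will be removed in a future "
        "release. Use `get_job_info` instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this function
    )
    return "succeeded"  # placeholder value for illustration


def caller() -> str:
    """Hypothetical code that still uses the deprecated method."""
    return get_job_status("123")


# Record warnings so they can be inspected instead of printed.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    status = caller()

message = str(caught[0].message)
```

Running code with `python -W error::DeprecationWarning` is one way to surface remaining uses of a deprecated call during testing.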

async def get_job_info(self, job_id: str) -> Dict[str, Any]:
"""
Gets the full API response for a given Airbyte Job ID.
Args:
job_id: The ID of the Airbyte job to retrieve information on.
Returns:
Dict of the full API response for the given job ID.
"""
get_connection_url = self.airbyte_base_url + "/jobs/get/"
try:
response = await self._client.post(get_connection_url, json={"id": job_id})
response.raise_for_status()

return response.json()

except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise err.JobNotFoundException(f"Job {job_id} not found.") from e
raise err.AirbyteServerNotHealthyException() from e
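
One way a caller might consume the dict returned by `get_job_info` is a polling loop that stops on any terminal status, including a cancelled job. This is a sketch under stated assumptions, not the implementation in this commit: the `wait_for_job` and `fake_fetch` helpers are hypothetical, the response is assumed to carry the status under `["job"]["status"]`, and the set of terminal statuses is an assumption to check against your Airbyte version.

```python
import time
from typing import Any, Callable, Dict

# Assumed terminal states; verify against the Airbyte API you are running.
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}


def wait_for_job(
    fetch_job_info: Callable[[str], Dict[str, Any]],
    job_id: str,
    poll_interval_s: float = 0.0,
    max_polls: int = 100,
) -> str:
    """Poll until the job reaches a terminal status and return that status."""
    for _ in range(max_polls):
        info = fetch_job_info(job_id)
        status = info["job"]["status"]  # assumed response shape
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_interval_s)
    # Bounding the number of polls guards against looping forever.
    raise TimeoutError(f"Job {job_id} did not finish after {max_polls} polls.")


# Hypothetical stand-in for the API: running twice, then cancelled.
_responses = iter(["running", "running", "cancelled"])


def fake_fetch(job_id: str) -> Dict[str, Any]:
    return {"job": {"id": job_id, "status": next(_responses)}, "attempts": []}


final_status = wait_for_job(fake_fetch, "42")
```

Treating `cancelled` as terminal and capping the number of polls are two independent guards; either alone would prevent the kind of endless loop described in the changelog entry for this release.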

async def create_client(self) -> httpx.AsyncClient:
"""Convenience method to create a new httpx.AsyncClient.