Skip to content
This repository has been archived by the owner on Jan 24, 2025. It is now read-only.

Add AirbyteServer block #40

Merged
merged 16 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

### Fixed

### Deprecated

### Removed

## 0.1.3

Released on November 16, 2022.

This release does not introduce breaking changes, but changes the default interface for tasks in this collection. Previously, tasks only accepted individual kwargs like `airbyte_server_host` and `airbyte_server_port` that were needed to construct the base url and make API calls. Now that Airbyte supports basic user/password authentication, it made sense to create an `AirbyteServer` block that stores this user auth data and uses it to configure clients.

We will create an `AirbyteServer` on the fly for users who continue to pass the old kwargs, but print a message that they will eventually be removed from the interface.

### Added
- `AirbyteServer` block to handle client generation and support for NGINX authentication on OSS instances - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40)
- Deprecation warning on `AirbyteClient.export_configuration`, as OSS airbyte v0.40.7 has removed the corresponding endpoint - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40)
- The `httpx.AsyncClient` as a private member of the class, so we could use the whole `AirbyteClient` as a context manager when it is retrieved by `AirbyteServer.get_client` - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40)

### Changed
- Task inputs for `trigger_sync` and `export_configuration` from accepting Airbyte network configurations as separate kwargs to accepting an `AirbyteServer` block instance - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40)


### Fixed
- Docstring for `trigger_sync` task and removed inappropriate default value for `connection_id` - [#26](https://github.com/PrefectHQ/prefect-airbyte/pull/26)

Expand Down
32 changes: 28 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,41 @@ pip install prefect-airbyte
```

### Examples
#### Create an `AirbyteServer` block and save it
```python
from prefect_airbyte.server import AirbyteServer

# running airbyte locally at http://localhost:8000 with default auth
local_airbyte_server = AirbyteServer()

# running airbyte remotely at http://<someIP>:<somePort> as user `Marvin`
remote_airbyte_server = AirbyteServer(
username="Marvin",
password="DontPanic42",
server_host="42.42.42.42",
server_port="4242"
)

local_airbyte_server.save("my-local-airbyte-server")

remote_airbyte_server.save("my-remote-airbyte-server")

```
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved


#### Trigger a defined connection sync
```python
from prefect import flow
from prefect_airbyte.connections import trigger_sync

from prefect_airbyte.server import AirbyteServer

@flow
def example_trigger_sync_flow():

# Run other tasks and subflows here

trigger_sync(
airbyte_server=AirbyteServer.load("my-airbyte-server"),
connection_id="your-connection-id-to-sync",
poll_interval_s=3,
status_updates=True
Expand Down Expand Up @@ -83,11 +105,15 @@ example_trigger_sync_flow()


#### Export an Airbyte instance's configuration

**NOTE**: The API endpoint corresponding to this task is no longer supported by open-source Airbyte versions as of v0.40.7. Check out the [Octavia CLI docs](https://github.com/airbytehq/airbyte/tree/master/octavia-cli) for more info.

```python
import gzip

from prefect import flow, task
from prefect_airbyte.configuration import export_configuration
from prefect_airbyte.server import AirbyteServer

@task
def zip_and_write_somewhere(
Expand All @@ -103,9 +129,7 @@ def example_export_configuration_flow(filepath: str):
# Run other tasks and subflows here

airbyte_config = export_configuration(
airbyte_server_host="localhost",
airbyte_server_port="8000",
airbyte_api_version="v1",
airbyte_server=AirbyteServer.load("my-airbyte-server-block")
)

zip_and_write_somewhere(
Expand Down
1 change: 1 addition & 0 deletions docs/server.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: prefect_airbyte.server
3 changes: 2 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ nav:
- Home: index.md
- Client: client.md
- Configuration: configuration.md
- Connections: connections.md
- Connections: connections.md
- Server: server.md
102 changes: 63 additions & 39 deletions prefect_airbyte/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Client for interacting with Airbyte instance"""

import logging
from typing import Tuple
from warnings import warn

import httpx

Expand All @@ -12,38 +12,35 @@ class AirbyteClient:
"""
Client class used to call API endpoints on an Airbyte server.

This client assumes that you're using an OSS Airbyte server which does not require
an API key to access its a API.
This client currently supports username/password authentication as set in `auth`.

For more info, see the [Airbyte docs](https://docs.airbyte.io/api-documentation).

Attributes:
airbyte_base_url str: Base API endpoint URL for Airbyte.
auth: Username and password for Airbyte API.
logger: A logger instance used by the client to log messages related to
API calls.
airbyte_base_url str: Base API endpoint URL for Airbyte.
timeout: The number of seconds to wait before an API call times out.
"""

def __init__(
self,
logger: logging.Logger,
airbyte_base_url: str = "http://localhost:8000/api/v1",
auth: Tuple[str, str] = ("airbyte", "password"),
timeout: int = 5,
):
self._closed = False
self._started = False

self.airbyte_base_url = airbyte_base_url
self.auth = auth
self.logger = logger
self.timeout = timeout

async def _establish_session(self) -> httpx.AsyncClient:
"""
Checks health of the Airbyte server and establishes a session.

Returns:
Session used to communicate with the Airbyte API.
"""
client = httpx.AsyncClient(timeout=self.timeout)
await self.check_health_status(client)
return client
self._client = httpx.AsyncClient(
base_url=self.airbyte_base_url, auth=self.auth, timeout=self.timeout
)

async def check_health_status(self, client: httpx.AsyncClient) -> bool:
"""
Expand Down Expand Up @@ -71,16 +68,6 @@ async def check_health_status(self, client: httpx.AsyncClient) -> bool:
except httpx.HTTPStatusError as e:
raise err.AirbyteServerNotHealthyException() from e

async def create_client(self) -> httpx.AsyncClient:
"""
Convenience method for establishing a healthy session with the Airbyte server.

Returns:
Session for interacting with the Airbyte server.
"""
client = await self._establish_session()
return client

zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved
async def export_configuration(
self,
) -> bytes:
Expand All @@ -90,20 +77,30 @@ async def export_configuration(
Returns:
Gzipped Airbyte configuration data.
"""
client = await self.create_client()
warn(
"As of Airbyte v0.40.7-alpha, the Airbyte API no longer supports "
"exporting configurations. See the Octavia CLI docs for more info.",
DeprecationWarning,
stacklevel=2,
)

get_connection_url = self.airbyte_base_url + "/deployment/export/"

try:
response = await client.post(get_connection_url)
response = await self._client.post(get_connection_url)
response.raise_for_status()

self.logger.debug("Export configuration response: %s", response)

export_config = response.content
return export_config
except httpx.HTTPStatusError as e:
raise err.AirbyteExportConfigurationFailed() from e
if e.response.status_code == 404:
self.logger.error(
"If you are using Airbyte v0.40.7-alpha, there is no longer "
"an API endpoint for exporting configurations."
)
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved
raise err.AirbyteExportConfigurationFailed() from e

async def get_connection_status(self, connection_id: str) -> str:
"""
Expand All @@ -115,13 +112,10 @@ async def get_connection_status(self, connection_id: str) -> str:
Returns:
The status of the defined Airbyte connection.
"""
client = await self.create_client()

get_connection_url = self.airbyte_base_url + "/connections/get/"

# TODO - Missing auth because Airbyte API currently doesn't yet support auth
try:
response = await client.post(
response = await self._client.post(
get_connection_url, json={"connectionId": connection_id}
)

Expand Down Expand Up @@ -149,13 +143,10 @@ async def trigger_manual_sync_connection(
created_at: Datetime string of when the job was created.

"""
client = await self.create_client()

get_connection_url = self.airbyte_base_url + "/connections/sync/"

# TODO - no current authentication methods from Airbyte
try:
response = await client.post(
response = await self._client.post(
get_connection_url, json={"connectionId": connection_id}
)
response.raise_for_status()
Expand Down Expand Up @@ -184,11 +175,9 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
job_created_at: Datetime string of when the job was created.
job_updated_at: Datetime string of the when the job was last updated.
"""
client = await self.create_client()

get_connection_url = self.airbyte_base_url + "/jobs/get/"
try:
response = await client.post(get_connection_url, json={"id": job_id})
response = await self._client.post(get_connection_url, json={"id": job_id})
response.raise_for_status()

job = response.json()["job"]
Expand All @@ -200,3 +189,38 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
if e.response.status_code == 404:
raise err.JobNotFoundException(f"Job {job_id} not found.") from e
raise err.AirbyteServerNotHealthyException() from e

async def create_client(self) -> httpx.AsyncClient:
"""Convencience method to create a new httpx.AsyncClient.

To be removed in favor of using the entire `AirbyteClient` class
as a context manager.
"""
warn(
"Use of this method will be removed in a future release - "
"please use the `AirbyteClient` class as a context manager.",
DeprecationWarning,
stacklevel=2,
)
return self._client

async def __aenter__(self):
"""Context manager entry point."""
if self._closed:
raise RuntimeError(
"The client cannot be started again after it has been closed."
)
if self._started:
raise RuntimeError("The client cannot be started more than once.")

self._started = True

await self.check_health_status(self._client)

return self

async def __aexit__(self, *exc):
"""Context manager exit point."""

self._closed = True
await self._client.__aexit__()
Loading