Skip to content
This repository has been archived by the owner on Jan 24, 2025. It is now read-only.

Refactors status code handling in Airbyte client #20

Merged
merged 8 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Status code handling in Airbyte client - [#20](https://github.com/PrefectHQ/prefect-airbyte/pull/20)

## 0.1.1

Released on September 2, 2022.

### Added

- a `timeout` parameter to `trigger_sync` and `export_configuration` passed to `httpx.AsyncClient`
- a `timeout` parameter to `trigger_sync` and `export_configuration` passed to `httpx.AsyncClient` - [#18](https://github.com/PrefectHQ/prefect-airbyte/pull/18)

### Fixed

- Using `asyncio.sleep` instead of `time.sleep` within `trigger_sync` task.
- Using `asyncio.sleep` instead of `time.sleep` within `trigger_sync` task - [#13](https://github.com/PrefectHQ/prefect-airbyte/pull/13)

### Security

Expand Down
74 changes: 36 additions & 38 deletions prefect_airbyte/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ async def _establish_session(self) -> httpx.AsyncClient:
client: `httpx.AsyncClient` used to communicate with the Airbyte API
"""
client = httpx.AsyncClient(timeout=self.timeout)
if await self.check_health_status(client):
return client
else:
raise err.AirbyteServerNotHealthyException
await self.check_health_status(client)
return client

async def check_health_status(self, client: httpx.AsyncClient):
"""
Expand All @@ -73,6 +71,8 @@ async def check_health_status(self, client: httpx.AsyncClient):
get_connection_url = self.airbyte_base_url + "/health/"
try:
response = await client.get(get_connection_url)
response.raise_for_status()

self.logger.debug("Health check response: %s", response.json())
key = "available" if "available" in response.json() else "db"
health_status = response.json()[key]
Expand All @@ -82,7 +82,7 @@ async def check_health_status(self, client: httpx.AsyncClient):
)
return True
except httpx.HTTPStatusError as e:
raise err.AirbyteServerNotHealthyException(e)
raise err.AirbyteServerNotHealthyException() from e

async def create_client(self) -> httpx.AsyncClient:
"""
Expand All @@ -98,7 +98,7 @@ async def create_client(self) -> httpx.AsyncClient:

async def export_configuration(
self,
) -> bytearray:
) -> bytes:
"""
Trigger an export of Airbyte configuration

Expand All @@ -114,12 +114,14 @@ async def export_configuration(

try:
response = await client.post(get_connection_url)
if response.status_code == 200:
self.logger.debug("Export configuration response: %s", response)
export_config = response.content
return export_config
response.raise_for_status()

self.logger.debug("Export configuration response: %s", response)

export_config = response.content
return export_config
except httpx.HTTPStatusError as e:
raise err.AirbyteExportConfigurationFailed(e)
raise err.AirbyteExportConfigurationFailed() from e

async def get_connection_status(self, connection_id: str) -> str:
"""
Expand Down Expand Up @@ -147,9 +149,9 @@ async def get_connection_status(self, connection_id: str) -> str:
return connection_status
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise err.ConnectionNotFoundException
raise err.ConnectionNotFoundException() from e
else:
raise err.AirbyteServerNotHealthyException(e)
raise err.AirbyteServerNotHealthyException() from e

async def trigger_manual_sync_connection(
self, connection_id: str
Expand All @@ -172,25 +174,20 @@ async def trigger_manual_sync_connection(
response = await client.post(
get_connection_url, json={"connectionId": connection_id}
)
if response.status_code == 200:
job_id = response.json()["job"]["id"]
print(response.json())
job_created_at = response.json()["job"]["createdAt"]
return job_id, job_created_at
elif response.status_code == 404:
# connection_id not found
self.logger.warning(
f"Connection {connection_id} not found, please double "
f"check the connection_id ..."
)
response.raise_for_status()
job_id = response.json()["job"]["id"]
job_created_at = response.json()["job"]["createdAt"]
return job_id, job_created_at
desertaxle marked this conversation as resolved.
Show resolved Hide resolved
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise err.ConnectionNotFoundException(
f"Connection {connection_id} not found, please double "
f"check the connection_id ..."
)
except httpx.HTTPStatusError as e:
raise err.AirbyteServerNotHealthyException(e)
f"check the connection_id."
) from e

async def get_job_status(self, job_id: str) -> str:
raise err.AirbyteServerNotHealthyException() from e

async def get_job_status(self, job_id: str) -> Tuple[str, str, str]:
"""
Get the status of an Airbyte connection sync job

Expand All @@ -205,13 +202,14 @@ async def get_job_status(self, job_id: str) -> str:
get_connection_url = self.airbyte_base_url + "/jobs/get/"
try:
response = await client.post(get_connection_url, json={"id": job_id})
if response.status_code == 200:
job_status = response.json()["job"]["status"]
job_created_at = response.json()["job"]["createdAt"]
job_updated_at = response.json()["job"]["updatedAt"]
return job_status, job_created_at, job_updated_at
elif response.status_code == 404:
self.logger.error(f"Job {job_id} not found...")
raise err.JobNotFoundException(f"Job {job_id} not found...")
response.raise_for_status()

job = response.json()["job"]
job_status = job["status"]
job_created_at = job["createdAt"]
job_updated_at = job["updatedAt"]
return job_status, job_created_at, job_updated_at
desertaxle marked this conversation as resolved.
Show resolved Hide resolved
except httpx.HTTPStatusError as e:
raise err.AirbyteServerNotHealthyException(e)
if e.response.status_code == 404:
raise err.JobNotFoundException(f"Job {job_id} not found.") from e
raise err.AirbyteServerNotHealthyException() from e