diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e6bd67..ce160d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +### Fixed + ### Deprecated ### Removed +## 0.1.3 + +Released on November 16, 2022. + +This release does not introduce breaking changes, but changes the default interface for tasks in this collection. Previously, tasks only accepted individual kwargs like `airbyte_server_host` and `airbyte_server_port` that were needed to construct the base url and make API calls. Now that Airbyte supports basic user/password authentication, it made sense to create an `AirbyteServer` block that stores this user auth data and uses it to configure clients. + +We will create an `AirbyteServer` on the fly for users who continue to pass the old kwargs, but print a message that they will eventually be removed from the interface. + +### Added +- `AirbyteServer` block to handle client generation and support for NGINX authentication on OSS instances - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40) +- Deprecation warning on `AirbyteClient.export_configuration`, as OSS airbyte v0.40.7 has removed the corresponding endpoint - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40) +- The `httpx.AsyncClient` as a private member of the class, so we could use the whole `AirbyteClient` as a context manager when it is retrieved by `AirbyteServer.get_client` - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40) + +### Changed +- Task inputs for `trigger_sync` and `export_configuration` from accepting Airbyte network configurations as separate kwargs to accepting an `AirbyteServer` block instance - [#40](https://github.com/PrefectHQ/prefect-airbyte/pull/40) + + ### Fixed - Docstring for `trigger_sync` task and removed inappropriate default value for `connection_id` - [#26](https://github.com/PrefectHQ/prefect-airbyte/pull/26) diff --git a/README.md b/README.md index 2124251..edbcb63 100644 --- a/README.md +++ b/README.md @@ -42,12 +42,33 @@ pip install prefect-airbyte ``` ### Examples +#### Create an `AirbyteServer` block and save it +```python +from prefect_airbyte.server import AirbyteServer + +# running airbyte locally at http://localhost:8000 with default auth +local_airbyte_server = AirbyteServer() + +# running airbyte remotely at http://: as user `Marvin` +remote_airbyte_server = AirbyteServer( + username="Marvin", + password="DontPanic42", + server_host="42.42.42.42", + server_port="4242" +) + +local_airbyte_server.save("my-local-airbyte-server") + +remote_airbyte_server.save("my-remote-airbyte-server") + +``` + #### Trigger a defined connection sync ```python from prefect import flow from prefect_airbyte.connections import trigger_sync - +from prefect_airbyte.server import AirbyteServer @flow def example_trigger_sync_flow(): @@ -55,6 +76,7 @@ def example_trigger_sync_flow(): # Run other tasks and subflows here trigger_sync( + airbyte_server=AirbyteServer.load("my-airbyte-server"), connection_id="your-connection-id-to-sync", poll_interval_s=3, status_updates=True @@ -83,11 +105,15 @@ example_trigger_sync_flow() #### Export an Airbyte instance's configuration + +**NOTE**: The API endpoint corresponding to this task is no longer supported by open-source Airbyte versions as of v0.40.7. Check out the [Octavia CLI docs](https://github.com/airbytehq/airbyte/tree/master/octavia-cli) for more info. + ```python import gzip from prefect import flow, task from prefect_airbyte.configuration import export_configuration +from prefect_airbyte.server import AirbyteServer @task def zip_and_write_somewhere( @@ -103,9 +129,7 @@ def example_export_configuration_flow(filepath: str): # Run other tasks and subflows here airbyte_config = export_configuration( - airbyte_server_host="localhost", - airbyte_server_port="8000", - airbyte_api_version="v1", + airbyte_server=AirbyteServer.load("my-airbyte-server-block") ) zip_and_write_somewhere( diff --git a/docs/server.md b/docs/server.md new file mode 100644 index 0000000..3d81e3e --- /dev/null +++ b/docs/server.md @@ -0,0 +1 @@ +::: prefect_airbyte.server \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index cee1215..a82ca2d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -49,4 +49,5 @@ nav: - Home: index.md - Client: client.md - Configuration: configuration.md - - Connections: connections.md \ No newline at end of file + - Connections: connections.md + - Server: server.md \ No newline at end of file diff --git a/prefect_airbyte/client.py b/prefect_airbyte/client.py index b9a9971..46a836f 100644 --- a/prefect_airbyte/client.py +++ b/prefect_airbyte/client.py @@ -1,7 +1,7 @@ """Client for interacting with Airbyte instance""" - import logging from typing import Tuple +from warnings import warn import httpx @@ -12,15 +12,15 @@ class AirbyteClient: """ Client class used to call API endpoints on an Airbyte server. - This client assumes that you're using an OSS Airbyte server which does not require - an API key to access its a API. + This client currently supports username/password authentication as set in `auth`. For more info, see the [Airbyte docs](https://docs.airbyte.io/api-documentation). Attributes: + airbyte_base_url str: Base API endpoint URL for Airbyte. + auth: Username and password for Airbyte API. logger: A logger instance used by the client to log messages related to API calls. - airbyte_base_url str: Base API endpoint URL for Airbyte. timeout: The number of seconds to wait before an API call times out. """ @@ -28,22 +28,19 @@ def __init__( self, logger: logging.Logger, airbyte_base_url: str = "http://localhost:8000/api/v1", + auth: Tuple[str, str] = ("airbyte", "password"), timeout: int = 5, ): + self._closed = False + self._started = False + self.airbyte_base_url = airbyte_base_url + self.auth = auth self.logger = logger self.timeout = timeout - - async def _establish_session(self) -> httpx.AsyncClient: - """ - Checks health of the Airbyte server and establishes a session. - - Returns: - Session used to communicate with the Airbyte API. - """ - client = httpx.AsyncClient(timeout=self.timeout) - await self.check_health_status(client) - return client + self._client = httpx.AsyncClient( + base_url=self.airbyte_base_url, auth=self.auth, timeout=self.timeout + ) async def check_health_status(self, client: httpx.AsyncClient) -> bool: """ @@ -71,16 +68,6 @@ async def check_health_status(self, client: httpx.AsyncClient) -> bool: except httpx.HTTPStatusError as e: raise err.AirbyteServerNotHealthyException() from e - async def create_client(self) -> httpx.AsyncClient: - """ - Convenience method for establishing a healthy session with the Airbyte server. - - Returns: - Session for interacting with the Airbyte server. - """ - client = await self._establish_session() - return client - async def export_configuration( self, ) -> bytes: @@ -90,12 +77,17 @@ async def export_configuration( Returns: Gzipped Airbyte configuration data. """ - client = await self.create_client() + warn( + "As of Airbyte v0.40.7-alpha, the Airbyte API no longer supports " + "exporting configurations. See the Octavia CLI docs for more info.", + DeprecationWarning, + stacklevel=2, + ) get_connection_url = self.airbyte_base_url + "/deployment/export/" try: - response = await client.post(get_connection_url) + response = await self._client.post(get_connection_url) response.raise_for_status() self.logger.debug("Export configuration response: %s", response) @@ -103,7 +95,12 @@ async def export_configuration( export_config = response.content return export_config except httpx.HTTPStatusError as e: - raise err.AirbyteExportConfigurationFailed() from e + if e.response.status_code == 404: + self.logger.error( + "If you are using Airbyte v0.40.7-alpha, there is no longer " + "an API endpoint for exporting configurations." + ) + raise err.AirbyteExportConfigurationFailed() from e async def get_connection_status(self, connection_id: str) -> str: """ @@ -115,13 +112,10 @@ async def get_connection_status(self, connection_id: str) -> str: Returns: The status of the defined Airbyte connection. """ - client = await self.create_client() - get_connection_url = self.airbyte_base_url + "/connections/get/" - # TODO - Missing auth because Airbyte API currently doesn't yet support auth try: - response = await client.post( + response = await self._client.post( get_connection_url, json={"connectionId": connection_id} ) @@ -149,13 +143,10 @@ async def trigger_manual_sync_connection( created_at: Datetime string of when the job was created. """ - client = await self.create_client() - get_connection_url = self.airbyte_base_url + "/connections/sync/" - # TODO - no current authentication methods from Airbyte try: - response = await client.post( + response = await self._client.post( get_connection_url, json={"connectionId": connection_id} ) response.raise_for_status() @@ -184,11 +175,9 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]: job_created_at: Datetime string of when the job was created. job_updated_at: Datetime string of the when the job was last updated. """ - client = await self.create_client() - get_connection_url = self.airbyte_base_url + "/jobs/get/" try: - response = await client.post(get_connection_url, json={"id": job_id}) + response = await self._client.post(get_connection_url, json={"id": job_id}) response.raise_for_status() job = response.json()["job"] @@ -200,3 +189,38 @@ async def get_job_status(self, job_id: str) -> Tuple[str, str, str]: if e.response.status_code == 404: raise err.JobNotFoundException(f"Job {job_id} not found.") from e raise err.AirbyteServerNotHealthyException() from e + + async def create_client(self) -> httpx.AsyncClient: + """Convencience method to create a new httpx.AsyncClient. + + To be removed in favor of using the entire `AirbyteClient` class + as a context manager. + """ + warn( + "Use of this method will be removed in a future release - " + "please use the `AirbyteClient` class as a context manager.", + DeprecationWarning, + stacklevel=2, + ) + return self._client + + async def __aenter__(self): + """Context manager entry point.""" + if self._closed: + raise RuntimeError( + "The client cannot be started again after it has been closed." + ) + if self._started: + raise RuntimeError("The client cannot be started more than once.") + + self._started = True + + await self.check_health_status(self._client) + + return self + + async def __aexit__(self, *exc): + """Context manager exit point.""" + + self._closed = True + await self._client.__aexit__() diff --git a/prefect_airbyte/configuration.py b/prefect_airbyte/configuration.py index 8e4b102..ea2096e 100644 --- a/prefect_airbyte/configuration.py +++ b/prefect_airbyte/configuration.py @@ -1,15 +1,18 @@ """Tasks for updating and fetching Airbyte configurations""" -from prefect import task -from prefect.logging.loggers import get_logger +from typing import Optional +from warnings import warn -from prefect_airbyte.client import AirbyteClient +from prefect import get_run_logger, task + +from prefect_airbyte.server import AirbyteServer @task async def export_configuration( - airbyte_server_host: str = "localhost", - airbyte_server_port: int = "8000", - airbyte_api_version: str = "v1", + airbyte_server: Optional[AirbyteServer] = None, + airbyte_server_host: Optional[str] = None, + airbyte_server_port: Optional[int] = None, + airbyte_api_version: Optional[str] = None, timeout: int = 5, ) -> bytes: @@ -17,10 +20,16 @@ async def export_configuration( Prefect Task that exports an Airbyte configuration via `{airbyte_server_host}/api/v1/deployment/export`. + As of `prefect-airbyte==0.1.3`, the kwargs `airbyte_server_host` and + `airbyte_server_port` can be replaced by passing an `airbyte_server` block + instance to generate the `AirbyteClient`. Using the `airbyte_server` block is + preferred, but the individual kwargs remain for backwards compatibility. + Args: - airbyte_server_host: Airbyte instance hostname where connection is configured. - airbyte_server_port: Port where Airbyte instance is listening. - airbyte_api_version: Version of Airbyte API to use to export configuration. + airbyte_server: An `AirbyteServer` block for generating an `AirbyteClient`. + airbyte_server_host: Airbyte server host to connect to. + airbyte_server_port: Airbyte server port to connect to. + airbyte_api_version: Airbyte API version to use. timeout: Timeout in seconds on the `httpx.AsyncClient`. Returns: @@ -35,6 +44,7 @@ async def export_configuration( from prefect import flow, task from prefect_airbyte.configuration import export_configuration + from prefect_airbyte.server import AirbyteServer @task def zip_and_write_somewhere( @@ -50,9 +60,7 @@ def example_export_configuration_flow(): # Run other tasks and subflows here airbyte_config = export_configuration( - airbyte_server_host="localhost", - airbyte_server_port="8000", - airbyte_api_version="v1", + airbyte_server=AirbyteServer.load("oss-airbyte") ) zip_and_write_somewhere(airbyte_config=airbyte_config) @@ -60,17 +68,38 @@ def example_export_configuration_flow(): example_trigger_sync_flow() ``` """ + logger = get_run_logger() + + if not airbyte_server: + warn( + "The use of `airbyte_server_host`, `airbyte_server_port`, and " + "`airbyte_api_version` is deprecated and will be removed in a " + "future release. Please pass an `airbyte_server` block to this " + "task instead.", + DeprecationWarning, + stacklevel=2, + ) + if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): + airbyte_server = AirbyteServer( + server_host=airbyte_server_host or "localhost", + server_port=airbyte_server_port or 8000, + api_version=airbyte_api_version or "v1", + ) + else: + airbyte_server = AirbyteServer() + else: + if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): + logger.info( + "Ignoring `airbyte_server_host`, `airbyte_api_version`, " + "and `airbyte_server_port` because `airbyte_server` block " + " was passed. Using API URL from `airbyte_server` block: " + f"{airbyte_server.base_url!r}." + ) - logger = get_logger() - - airbyte_base_url = ( - f"http://{airbyte_server_host}:" - f"{airbyte_server_port}/api/{airbyte_api_version}" - ) - - airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout) + async with airbyte_server.get_client( + logger=logger, timeout=timeout + ) as airbyte_client: - logger.info("Initiating export of Airbyte configuration") - airbyte_config = await airbyte.export_configuration() + logger.info("Initiating export of Airbyte configuration") - return airbyte_config + return await airbyte_client.export_configuration() diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index a9d08b3..d660551 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -1,12 +1,13 @@ """Tasks for connecting to Airbyte and triggering connection syncs""" import uuid from asyncio import sleep +from typing import Optional +from warnings import warn -from prefect import task -from prefect.logging.loggers import get_logger +from prefect import get_run_logger, task from prefect_airbyte import exceptions as err -from prefect_airbyte.client import AirbyteClient +from prefect_airbyte.server import AirbyteServer # Connection statuses CONNECTION_STATUS_ACTIVE = "active" @@ -22,9 +23,10 @@ @task async def trigger_sync( connection_id: str, - airbyte_server_host: str = "localhost", - airbyte_server_port: int = "8000", - airbyte_api_version: str = "v1", + airbyte_server: Optional[AirbyteServer] = None, + airbyte_server_host: Optional[str] = None, + airbyte_server_port: Optional[int] = None, + airbyte_api_version: Optional[str] = None, poll_interval_s: int = 15, status_updates: bool = False, timeout: int = 5, @@ -43,11 +45,17 @@ async def trigger_sync( will only complete when the sync has completed or when it receives an error status code from an API call. + As of `prefect-airbyte==0.1.3`, the kwargs `airbyte_server_host` and + `airbyte_server_port` can be replaced by passing an `airbyte_server` block + instance to generate the `AirbyteClient`. Using the `airbyte_server` block is + preferred, but the individual kwargs remain for backwards compatibility. + Args: connection_id: Airbyte connection ID to trigger a sync for. - airbyte_server_host: Airbyte instance hostname where connection is configured. - airbyte_server_port: Port where Airbyte instance is listening. - airbyte_api_version: Version of Airbyte API to use to trigger connection sync. + airbyte_server: An `AirbyteServer` block to create an `AirbyteClient`. + airbyte_server_host: Airbyte server host to connect to. + airbyte_server_port: Airbyte server port to connect to. + airbyte_api_version: Airbyte API version to use. poll_interval_s: How often to poll Airbyte for sync status. status_updates: Whether to log sync job status while polling. timeout: The POST request `timeout` for the `httpx.AsyncClient`. @@ -68,7 +76,7 @@ async def trigger_sync( ```python from prefect import flow from prefect_airbyte.connections import trigger_sync - + from prefect_airbyte.server import AirbyteServer @flow def example_trigger_sync_flow(): @@ -76,13 +84,40 @@ def example_trigger_sync_flow(): # Run other tasks and subflows here trigger_sync( + airbyte_server=AirbyteServer.load("oss-airbyte"), connection_id="your-connection-id-to-sync" ) example_trigger_sync_flow() ``` """ - logger = get_logger() + logger = get_run_logger() + + if not airbyte_server: + warn( + "The use of `airbyte_server_host`, `airbyte_server_port`, and " + "`airbyte_api_version` is deprecated and will be removed in a " + "future release. Please pass an `airbyte_server` block to this " + "task instead.", + DeprecationWarning, + stacklevel=2, + ) + if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): + airbyte_server = AirbyteServer( + server_host=airbyte_server_host or "localhost", + server_port=airbyte_server_port or 8000, + api_version=airbyte_api_version or "v1", + ) + else: + airbyte_server = AirbyteServer() + else: + if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]): + logger.info( + "Ignoring `airbyte_server_host`, `airbyte_api_version`, " + "and `airbyte_server_port` because `airbyte_server` block " + " was passed. Using API URL from `airbyte_server` block: " + f"{airbyte_server.base_url!r}." + ) try: uuid.UUID(connection_id) @@ -92,64 +127,62 @@ def example_trigger_sync_flow(): i.e. 32 hex characters, including hyphens." ) - # see https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com - # /rapidoc-api-docs.html#overview - airbyte_base_url = ( - f"http://{airbyte_server_host}:" - f"{airbyte_server_port}/api/{airbyte_api_version}" - ) - - airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout) - - logger.info( - f"Getting Airbyte Connection {connection_id}, poll interval " - f"{poll_interval_s} seconds, airbyte_base_url {airbyte_base_url}" - ) - - connection_status = await airbyte.get_connection_status(connection_id) + async with airbyte_server.get_client( + logger=logger, timeout=timeout + ) as airbyte_client: - if connection_status == CONNECTION_STATUS_ACTIVE: - # Trigger manual sync on the Connection ... - job_id, job_created_at = await airbyte.trigger_manual_sync_connection( - connection_id + logger.info( + f"Getting Airbyte Connection {connection_id}, poll interval " + f"{poll_interval_s} seconds, airbyte_base_url {airbyte_server.base_url}" ) - job_status = JOB_STATUS_PENDING - - while job_status not in [JOB_STATUS_FAILED, JOB_STATUS_SUCCEEDED]: - job_status, job_created_at, job_updated_at = await airbyte.get_job_status( - job_id + connection_status = await airbyte_client.get_connection_status(connection_id) + + if connection_status == CONNECTION_STATUS_ACTIVE: + # Trigger manual sync on the Connection ... + ( + job_id, + job_created_at, + ) = await airbyte_client.trigger_manual_sync_connection(connection_id) + + job_status = JOB_STATUS_PENDING + + while job_status not in [JOB_STATUS_FAILED, JOB_STATUS_SUCCEEDED]: + ( + job_status, + job_created_at, + job_updated_at, + ) = await airbyte_client.get_job_status(job_id) + + # pending┃running┃incomplete┃failed┃succeeded┃cancelled + if job_status == JOB_STATUS_SUCCEEDED: + logger.info(f"Job {job_id} succeeded.") + elif job_status == JOB_STATUS_FAILED: + logger.error(f"Job {job_id} failed.") + raise err.AirbyteSyncJobFailed(f"Job {job_id} failed.") + else: + if status_updates: + logger.info(job_status) + # wait for next poll interval + await sleep(poll_interval_s) + + return { + "connection_id": connection_id, + "status": connection_status, + "job_status": job_status, + "job_created_at": job_created_at, + "job_updated_at": job_updated_at, + } + elif connection_status == CONNECTION_STATUS_INACTIVE: + logger.error( + f"Connection: {connection_id} is inactive" + " - you'll need to enable it in your Airbyte instance" + ) + raise err.AirbyteConnectionInactiveException( + f"Please enable the Connection {connection_id} in Airbyte instance." + ) + elif connection_status == CONNECTION_STATUS_DEPRECATED: + logger.error(f"Connection {connection_id} is deprecated.") + raise err.AirbyeConnectionDeprecatedException( + f"Connection {connection_id} is deprecated." ) - - # pending┃running┃incomplete┃failed┃succeeded┃cancelled - if job_status == JOB_STATUS_SUCCEEDED: - logger.info(f"Job {job_id} succeeded.") - elif job_status == JOB_STATUS_FAILED: - logger.error(f"Job {job_id} failed.") - raise err.AirbyteSyncJobFailed(f"Job {job_id} failed.") - else: - if status_updates: - logger.info(job_status) - # wait for next poll interval - await sleep(poll_interval_s) - - return { - "connection_id": connection_id, - "status": connection_status, - "job_status": job_status, - "job_created_at": job_created_at, - "job_updated_at": job_updated_at, - } - elif connection_status == CONNECTION_STATUS_INACTIVE: - logger.error( - f"Connection: {connection_id} is inactive" - " - you'll need to enable it in your Airbyte instance" - ) - raise err.AirbyteConnectionInactiveException( - f"Please enable the Connection {connection_id} in Airbyte instance." - ) - elif connection_status == CONNECTION_STATUS_DEPRECATED: - logger.error(f"Connection {connection_id} is deprecated.") - raise err.AirbyeConnectionDeprecatedException( - f"Connection {connection_id} is deprecated." - ) diff --git a/prefect_airbyte/server.py b/prefect_airbyte/server.py new file mode 100644 index 0000000..cac78bf --- /dev/null +++ b/prefect_airbyte/server.py @@ -0,0 +1,99 @@ +"""A module for defining OSS Airbyte interactions with Prefect.""" + +from logging import Logger + +from prefect.blocks.core import Block +from pydantic import Field, SecretStr + +from prefect_airbyte.client import AirbyteClient + + +class AirbyteServer(Block): + """A block representing an Airbyte server for generating `AirbyteClient` instances. + + Attributes: + username: Username for Airbyte API. + password: Password for Airbyte API. + server_host: Hostname for Airbyte API. + server_port: Port for Airbyte API. + api_version: Version of Airbyte API to use. + use_ssl: Whether to use a secure url for calls to the Airbyte API. + + Example: + Create an `AirbyteServer` block for an Airbyte instance running on localhost: + ```python + from prefect import flow + from prefect_airbyte.connection import trigger_sync + from prefect_airbyte.server import AirbyteServer + + @flow + def airbyte_orchestration_flow(): + airbyte_server = AirbyteServer() + trigger_sync( + airbyte_server=airbyte_server, + connection_id="my_connection_id", + ) + ``` + """ + + _block_type_name = "Airbyte Server" + _block_type_slug = "airbyte-server" + _logo_url = "https://images.ctfassets.net/zscdif0zqppk/6gm7wsC7ANnKYQsm7oiSYz/aac1ad5e054d35d9e24af8d6ed3aed5f/59758427?h=250" # noqa + + username: str = Field( + default="airbyte", + description="Username to authenticate with Airbyte API.", + ) + + password: SecretStr = Field( + default=SecretStr("password"), + description="Password to authenticate with Airbyte API.", + ) + + server_host: str = Field( + default="localhost", + description="Host address of Airbyte server.", + example="127.0.0.1", + ) + + server_port: int = Field( + default=8000, + description="Port number of Airbyte server.", + ) + + api_version: str = Field( + default="v1", + description="Airbyte API version to use.", + title="API Version", + ) + + use_ssl: bool = Field( + default=False, + description="Whether to use SSL when connecting to Airbyte server.", + title="Use SSL", + ) + + @property + def base_url(self) -> str: + """Property containing the base URL for the Airbyte API.""" + protocol = "https" if self.use_ssl else "http" + return ( + f"{protocol}://{self.server_host}:{self.server_port}/api/{self.api_version}" + ) + + def get_client(self, logger: Logger, timeout: int = 10) -> AirbyteClient: + """Returns an `AirbyteClient` instance for interacting with the Airbyte API. + + Args: + logger: Logger instance used to log messages related to API calls. + timeout: The number of seconds to wait before an API call times out. + + Returns: + An `AirbyteClient` instance. + """ + return AirbyteClient( + logger=logger, + airbyte_base_url=self.base_url, + auth=(self.username, self.password.get_secret_value()), + timeout=timeout, + ) diff --git a/tests/conftest.py b/tests/conftest.py index 462d7d3..e2cbe11 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,11 +2,18 @@ import respx from httpx import Response +from prefect_airbyte.server import AirbyteServer + # connection fixtures / mocks CONNECTION_ID = "e1b2078f-882a-4f50-9942-cfe34b2d825b" +@pytest.fixture +def airbyte_server(): + return AirbyteServer() + + @pytest.fixture def airbyte_trigger_sync_response() -> dict: return { @@ -355,3 +362,20 @@ def mock_successful_config_export_calls( respx_mock.post(url=f"{base_airbyte_url}/deployment/export/").mock( return_value=Response(200, content=airbyte_good_export_configuration_response) ) + + +@respx.mock(assert_all_called=True) +@pytest.fixture +def mock_config_endpoint_not_found( + respx_mock, + base_airbyte_url, + airbyte_good_health_check_response, + airbyte_good_export_configuration_response, +): + respx_mock.get(url=f"{base_airbyte_url}/health/").mock( + return_value=Response(200, json=airbyte_good_health_check_response) + ) + + respx_mock.post(url=f"{base_airbyte_url}/deployment/export/").mock( + return_value=Response(404) + ) diff --git a/tests/test_configuration.py b/tests/test_configuration.py index dad536b..f138bd1 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -1,15 +1,43 @@ import pytest +from prefect.logging import disable_run_logger from prefect_airbyte import exceptions as err from prefect_airbyte.configuration import export_configuration +from prefect_airbyte.exceptions import AirbyteExportConfigurationFailed -async def test_export_configuration(mock_successful_config_export_calls): - export = await export_configuration.fn() +async def test_export_configuration( + mock_successful_config_export_calls, airbyte_server +): + with disable_run_logger(): + export = await export_configuration.fn(airbyte_server=airbyte_server) - assert type(export) is bytes + assert type(export) is bytes -async def test_export_configuration_failed_health(mock_failed_health_check_calls): +async def test_export_configuration_using_kwargs( + mock_successful_config_export_calls, airbyte_server +): + with disable_run_logger(): + export = await export_configuration.fn( + airbyte_server_host="localhost", + airbyte_server_port=8000, + ) + + assert type(export) is bytes + + +async def test_export_configuration_failed_health( + mock_failed_health_check_calls, airbyte_server +): with pytest.raises(err.AirbyteServerNotHealthyException): - await export_configuration.fn() + with disable_run_logger(): + await export_configuration.fn(airbyte_server=airbyte_server) + + +async def test_export_configuration_raise_deprecation_warning( + mock_config_endpoint_not_found, airbyte_server +): + with pytest.raises(AirbyteExportConfigurationFailed): + with disable_run_logger(): + await export_configuration.fn(airbyte_server=airbyte_server) diff --git a/tests/test_connections.py b/tests/test_connections.py index c5109a0..e4e3295 100644 --- a/tests/test_connections.py +++ b/tests/test_connections.py @@ -1,4 +1,5 @@ import pytest +from prefect.logging import disable_run_logger from prefect_airbyte import exceptions as err from prefect_airbyte.connections import trigger_sync @@ -6,8 +7,17 @@ CONNECTION_ID = "e1b2078f-882a-4f50-9942-cfe34b2d825b" -async def test_successful_trigger_sync(mock_successful_connection_sync_calls): - trigger_sync_result = await trigger_sync.fn(connection_id=CONNECTION_ID) +async def example_trigger_sync_flow(airbyte_server=None, **kwargs): + with disable_run_logger(): + return await trigger_sync.fn( + airbyte_server=airbyte_server, connection_id=CONNECTION_ID, **kwargs + ) + + +async def test_successful_trigger_sync( + mock_successful_connection_sync_calls, airbyte_server +): + trigger_sync_result = await example_trigger_sync_flow(airbyte_server) assert type(trigger_sync_result) is dict @@ -20,31 +30,50 @@ async def test_successful_trigger_sync(mock_successful_connection_sync_calls): } -async def test_cancelled_trigger_manual_sync(mock_cancelled_connection_sync_calls): +async def test_cancelled_trigger_manual_sync( + mock_cancelled_connection_sync_calls, airbyte_server +): with pytest.raises(err.AirbyteSyncJobFailed): - await trigger_sync.fn(connection_id=CONNECTION_ID) + await example_trigger_sync_flow(airbyte_server) -async def test_connection_sync_inactive(mock_inactive_sync_calls): +async def test_connection_sync_inactive(mock_inactive_sync_calls, airbyte_server): with pytest.raises(err.AirbyteConnectionInactiveException): - await trigger_sync.fn(connection_id=CONNECTION_ID) + await example_trigger_sync_flow(airbyte_server) -async def test_failed_trigger_sync(mock_failed_connection_sync_calls): +async def test_failed_trigger_sync(mock_failed_connection_sync_calls, airbyte_server): with pytest.raises(err.AirbyteSyncJobFailed): - await trigger_sync.fn(connection_id=CONNECTION_ID) + await example_trigger_sync_flow(airbyte_server) -async def test_bad_connection_id(mock_bad_connection_id_calls): +async def test_bad_connection_id(mock_bad_connection_id_calls, airbyte_server): with pytest.raises(err.ConnectionNotFoundException): - await trigger_sync.fn(connection_id=CONNECTION_ID) + await example_trigger_sync_flow(airbyte_server) -async def test_failed_health_check(mock_failed_health_check_calls): +async def test_failed_health_check(mock_failed_health_check_calls, airbyte_server): with pytest.raises(err.AirbyteServerNotHealthyException): - await trigger_sync.fn(connection_id=CONNECTION_ID) + await example_trigger_sync_flow(airbyte_server) -async def test_get_job_status_not_found(mock_invalid_job_status_calls): +async def test_get_job_status_not_found(mock_invalid_job_status_calls, airbyte_server): with pytest.raises(err.JobNotFoundException): - await trigger_sync.fn(connection_id=CONNECTION_ID) + await example_trigger_sync_flow(airbyte_server) + + +async def test_trigger_sync_with_kwargs(mock_successful_connection_sync_calls): + trigger_sync_result = await example_trigger_sync_flow( + airbyte_server_host="localhost", + airbyte_server_port=8000, + ) + + assert type(trigger_sync_result) is dict + + assert trigger_sync_result == { + "connection_id": "e1b2078f-882a-4f50-9942-cfe34b2d825b", + "status": "active", + "job_status": "succeeded", + "job_created_at": 1650644844, + "job_updated_at": 1650644844, + }