instantiate a declarative connector and allow for reads to be invoked from the connector builder server (#19333)

* instantiate a declarative connector and allow for reads to be invoked from the connector builder server

* various pr feedback and cleaning up the code a bit

* refactor grouping logic into a separate function to illustrate how groups are being emitted

* fix the webapp to also pass config to the stream list endpoint

* fix dereference field

* replace error message handling with default FastAPI HTTPException

* pr feedback: more error messaging and some code reuse

* formatting

* regenerate open api spec
brianjlai authored Nov 15, 2022
1 parent 15c3d62 commit 23679f5
Showing 11 changed files with 876 additions and 9 deletions.
4 changes: 2 additions & 2 deletions airbyte-connector-builder-server/build.gradle
@@ -13,9 +13,9 @@ airbytePython {
task generateOpenApiPythonServer(type: GenerateTask){
outputs.upToDateWhen { false }

def generatedCodeDir = "$buildDir/server"
def generatedCodeDir = "$buildDir/airbyte_connector_builder_server"
inputSpec = "$rootDir.absolutePath/airbyte-connector-builder-server/src/main/openapi/openapi.yaml"
outputDir = "$buildDir/airbyte_connector_builder_server"
outputDir = generatedCodeDir

generatorName = "python-fastapi"
configFile = "$projectDir/openapi/generator_config.yaml"
@@ -0,0 +1,30 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

# coding: utf-8

from __future__ import annotations
from datetime import date, datetime # noqa: F401

import re # noqa: F401
from typing import Any, Dict, List, Optional # noqa: F401

from pydantic import AnyUrl, BaseModel, EmailStr, validator # noqa: F401


class StreamReadSliceDescriptor(BaseModel):
"""NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
Do not edit the class manually.
StreamReadSliceDescriptor - a model defined in OpenAPI
start_datetime: The start_datetime of this StreamReadSliceDescriptor [Optional].
list_item: The list_item of this StreamReadSliceDescriptor [Optional].
"""

start_datetime: Optional[datetime] = None
list_item: Optional[str] = None

StreamReadSliceDescriptor.update_forward_refs()
@@ -25,6 +25,6 @@ class StreamsListReadStreams(BaseModel):
"""

name: str
url: AnyUrl
url: str

StreamsListReadStreams.update_forward_refs()
@@ -21,8 +21,10 @@ class StreamsListRequestBody(BaseModel):
StreamsListRequestBody - a model defined in OpenAPI
manifest: The manifest of this StreamsListRequestBody.
config: The config of this StreamsListRequestBody.
"""

manifest: Dict[str, Any]
config: Dict[str, Any]

StreamsListRequestBody.update_forward_refs()
139 changes: 136 additions & 3 deletions airbyte-connector-builder-server/connector_builder/impl/default_api.py
@@ -2,16 +2,31 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
from json import JSONDecodeError
from typing import Any, Dict, Iterable, Optional, Union
from urllib.parse import parse_qs, urljoin, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
from connector_builder.generated.apis.default_api_interface import DefaultApi
from connector_builder.generated.models.http_request import HttpRequest
from connector_builder.generated.models.http_response import HttpResponse
from connector_builder.generated.models.stream_read import StreamRead
from connector_builder.generated.models.stream_read_pages import StreamReadPages
from connector_builder.generated.models.stream_read_request_body import StreamReadRequestBody
from connector_builder.generated.models.stream_read_slices import StreamReadSlices
from connector_builder.generated.models.streams_list_read import StreamsListRead
from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
from fastapi import Body
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
from fastapi import Body, HTTPException
from jsonschema import ValidationError


class DefaultApiImpl(DefaultApi):
logger = logging.getLogger("airbyte.connector-builder")

async def get_manifest_template(self) -> str:
return """version: "0.1.0"
@@ -65,7 +80,125 @@ async def get_manifest_template(self) -> str:
"""

async def list_streams(self, streams_list_request_body: StreamsListRequestBody = Body(None, description="")) -> StreamsListRead:
raise Exception("not yet implemented")
"""
Takes in a low code manifest and a config to resolve the list of streams that are available for testing
:param streams_list_request_body: Input parameters to retrieve the list of available streams
:return: Stream objects made up of a stream name and the HTTP URL it will send requests to
"""
adapter = self._create_low_code_adapter(manifest=streams_list_request_body.manifest)

stream_list_read = []
try:
for http_stream in adapter.get_http_streams(streams_list_request_body.config):
stream_list_read.append(
StreamsListReadStreams(
name=http_stream.name,
url=urljoin(http_stream.url_base, http_stream.path()),
)
)
except Exception as error:
raise HTTPException(status_code=400, detail=f"Could not list streams with with error: {error.args[0]}")
return StreamsListRead(streams=stream_list_read)

async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Body(None, description="")) -> StreamRead:
raise Exception("not yet implemented")
"""
Using the provided manifest and config, invokes a sync for the specified stream and returns groups of Airbyte messages
that are produced during the read operation
:param stream_read_request_body: Input parameters to trigger the read operation for a stream
:return: Airbyte record messages produced by the sync grouped by slice and page
"""
adapter = self._create_low_code_adapter(manifest=stream_read_request_body.manifest)

single_slice = StreamReadSlices(pages=[])
log_messages = []
try:
for message_group in self._get_message_groups(
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config)
):
if isinstance(message_group, AirbyteLogMessage):
log_messages.append({"message": message_group.message})
else:
single_slice.pages.append(message_group)
except Exception as error:
# TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
raise HTTPException(status_code=400, detail=f"Could not perform read with with error: {error.args[0]}")

return StreamRead(logs=log_messages, slices=[single_slice])

def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message; when we encounter another request, the completed
message group is emitted.
Messages received from the CDK read operation will always arrive in the following order:
{type: LOG, log: {message: "request: ..."}}
{type: LOG, log: {message: "response: ..."}}
... 0 or more record messages
{type: RECORD, record: {data: ...}}
{type: RECORD, record: {data: ...}}
Repeats for each request/response made
Note: The exception is that normal log messages can be received at any time and are not incorporated into the grouping
"""
first_page = True
current_records = []
current_page_request: Optional[HttpRequest] = None
current_page_response: Optional[HttpResponse] = None
for message in messages:
if first_page and message.type == Type.LOG and message.log.message.startswith("request:"):
first_page = False
request = self._create_request_from_log_message(message.log)
current_page_request = request
elif message.type == Type.LOG and message.log.message.startswith("request:"):
if not current_page_request or not current_page_response:
raise ValueError("Every message grouping should have at least one request and response")
yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)
current_page_request = self._create_request_from_log_message(message.log)
current_records = []
elif message.type == Type.LOG and message.log.message.startswith("response:"):
current_page_response = self._create_response_from_log_message(message.log)
elif message.type == Type.LOG:
yield message.log
elif message.type == Type.RECORD:
current_records.append(message.record.data)
else:
if not current_page_request or not current_page_response:
raise ValueError("Every message grouping should have at least one request and response")
yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)

def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
# TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the interim while the
# protocol change is worked on.
raw_request = log_message.message.partition("request:")[2]
try:
request = json.loads(raw_request)
url = urlparse(request.get("url", ""))
full_path = f"{url.scheme}://{url.hostname}{url.path}" if url else ""
parameters = parse_qs(url.query) or None
return HttpRequest(url=full_path, headers=request.get("headers"), parameters=parameters, body=request.get("body"))
except JSONDecodeError as error:
self.logger.warning(f"Failed to parse log message into request object with error: {error}")
return None

def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpResponse]:
# TODO: As a temporary stopgap, the CDK emits response data as a log message string. Ideally this should come in the
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the interim while the
# protocol change is worked on.
raw_response = log_message.message.partition("response:")[2]
try:
response = json.loads(raw_response)
body = json.loads(response.get("body", "{}"))
return HttpResponse(status=response.get("status_code"), body=body, headers=response.get("headers"))
except JSONDecodeError as error:
self.logger.warning(f"Failed to parse log message into response object with error: {error}")
return None

@staticmethod
def _create_low_code_adapter(manifest: Dict[str, Any]) -> LowCodeSourceAdapter:
try:
return LowCodeSourceAdapter(manifest=manifest)
except ValidationError as error:
# TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
raise HTTPException(status_code=400, detail=f"Invalid connector manifest with error: {error.message}")
@@ -0,0 +1,49 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Dict, Iterable, List

from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.yaml_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.streams.http import HttpStream


class LowCodeSourceAdapter:
def __init__(self, manifest: Dict[str, Any]):
# Request and response messages are only emitted for sources that have debug turned on
self._source = ManifestDeclarativeSource(manifest, debug=True)

def get_http_streams(self, config: Dict[str, Any]) -> List[HttpStream]:
http_streams = []
for stream in self._source.streams(config=config):
if isinstance(stream, DeclarativeStream):
if isinstance(stream.retriever, HttpStream):
http_streams.append(stream.retriever)
else:
raise TypeError(
f"A declarative stream should only have a retriever of type HttpStream, but received: {stream.retriever.__class__}")
else:
raise TypeError(f"A declarative source should only contain streams of type DeclarativeStream, but received: {stream.__class__}")
return http_streams

def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterable[AirbyteMessage]:
configured_catalog = ConfiguredAirbyteCatalog.parse_obj(
{
"streams": [
{
"stream": {
"name": stream,
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
}
)
generator = self._source.read(logger=self._source.logger, config=config, catalog=configured_catalog)
for message in generator:
yield message
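
For context, a hedged usage sketch of the new adapter (not part of this diff): it loads a low-code manifest from a YAML file and calls the two methods added here. The file name, stream name, and config values are placeholders.

```python
import yaml

from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter

# Placeholder: any valid low-code connector manifest produced by the connector builder.
with open("manifest.yaml") as f:
    manifest = yaml.safe_load(f)

adapter = LowCodeSourceAdapter(manifest=manifest)
config = {"api_key": "..."}  # placeholder user inputs for testing

# List the HTTP streams declared by the manifest and the URLs they will hit.
for http_stream in adapter.get_http_streams(config):
    print(http_stream.name, http_stream.url_base, http_stream.path())

# Stream back raw Airbyte messages (records plus request/response debug logs).
for message in adapter.read_stream("my_stream", config):
    print(message.type)
```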
2 changes: 1 addition & 1 deletion airbyte-connector-builder-server/setup.py
@@ -41,7 +41,7 @@
},
packages=find_packages(exclude=("unit_tests", "integration_tests", "docs")),
package_data={},
install_requires=["fastapi", "uvicorn"],
install_requires=["airbyte-cdk~=0.8", "fastapi", "uvicorn"],
python_requires=">=3.9.11",
extras_require={
"tests": ["MyPy~=0.812", "pytest~=6.2.5", "pytest-cov", "pytest-mock", "pytest-recording", "requests-mock", "pre-commit"],
@@ -189,11 +189,15 @@ components:
type: object
required:
- manifest
- config
properties:
manifest:
type: object
description: The config-based connector manifest contents
# $ref: "#/components/schemas/ConnectorManifest"
config:
type: object
description: The config blob containing the user inputs for testing
StreamsListRead:
type: object
required:
@@ -213,7 +217,6 @@
description: The name of the stream
url:
type: string
format: uri
description: The URL to which read requests will be made for this stream
# --- Potential addition for a later phase ---
# slices:
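
With the schema change above, the stream-list request body now requires both `manifest` and `config`. A hedged sketch of such a request follows; the host, port, and endpoint path are assumptions not shown in this diff, and only the body shape is taken from the spec.

```python
import requests

body = {
    "manifest": {"version": "0.1.0"},  # placeholder: the full low-code connector manifest goes here
    "config": {"api_key": "..."},      # placeholder: user inputs used for testing
}
# The endpoint URL is an assumption; adjust to wherever the connector builder server is running.
response = requests.post("http://localhost:8080/v1/streams/list", json=body)
print(response.json())
```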