🎉 Source S3: support of Parquet format #5305

Merged on Sep 4, 2021 (23 commits)
Changes shown from 12 of 23 commits.

Commits:
9f407ad - add parquet parser (gl-pix, Aug 10, 2021)
7ce4638 - add integration tests for parquet formats (gl-pix, Aug 11, 2021)
479bfd3 - add unit tests for parquet (gl-pix, Aug 12, 2021)
b454cc2 - update docs and secrets (gl-pix, Aug 13, 2021)
f4888e7 - Merge remote-tracking branch 'origin/master' into antixar/5102-source… (gl-pix, Aug 13, 2021)
96bff17 - fix incorrect import for tests (gl-pix, Aug 13, 2021)
86bdb23 - add lib pandas for unit tests (gl-pix, Aug 13, 2021)
3d0830f - revert changes of foreign connectors (gl-pix, Aug 23, 2021)
4cdc9fe - Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup… (gl-pix, Aug 23, 2021)
d311bab - update secret settings (gl-pix, Aug 23, 2021)
93bc077 - Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup… (gl-pix, Aug 23, 2021)
344afd6 - fix config values (gl-pix, Aug 23, 2021)
b874bb6 - Update airbyte-integrations/connectors/source-s3/source_s3/source_fil… (antixar, Aug 30, 2021)
11711f6 - Update airbyte-integrations/connectors/source-s3/source_s3/source_fil… (antixar, Aug 30, 2021)
c21a222 - remove some unused default options (gl-pix, Aug 30, 2021)
ad07e27 - Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup… (gl-pix, Aug 30, 2021)
dcdc0eb - update tests (gl-pix, Aug 30, 2021)
7febc6d - update docs (gl-pix, Aug 31, 2021)
8670d2c - bump its version (gl-pix, Aug 31, 2021)
7095f56 - Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup… (gl-pix, Sep 3, 2021)
d535170 - Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup… (gl-pix, Sep 3, 2021)
6c62c3e - Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup… (gl-pix, Sep 4, 2021)
af286f4 - fix expected test (gl-pix, Sep 4, 2021)
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -138,6 +138,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -138,6 +138,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
22 changes: 15 additions & 7 deletions airbyte-integrations/connectors/source-s3/Dockerfile
@@ -1,16 +1,24 @@
-FROM python:3.7-slim
+FROM python:3.7-slim as base
+FROM base as builder
 
-# Bash is installed for more convenient debugging.
-RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
+RUN apt-get update
 WORKDIR /airbyte/integration_code
+COPY setup.py ./
+RUN pip install --prefix=/install .
+
+FROM base
+WORKDIR /airbyte/integration_code
-COPY source_s3 ./source_s3
+COPY --from=builder /install /usr/local
+
 COPY main.py ./
-COPY setup.py ./
-RUN pip install .
+COPY source_s3 ./source_s3
 
 
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.3
+LABEL io.airbyte.version=0.1.4
 LABEL io.airbyte.name=airbyte/source-s3



@@ -5,23 +5,48 @@ tests:
spec:
- spec_path: "integration_tests/spec.json"
connection:
# for CSV format
- config_path: "secrets/config.json"
status: "succeed"
# for Parquet format
- config_path: "secrets/parquet_config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
# for CSV format
- config_path: "secrets/config.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
basic_read:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
- expect_records:
path: "integration_tests/expected_records.txt"
expect_records:
path: "integration_tests/expected_records.txt"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
expect_records:
path: "integration_tests/parquet_expected_records.txt"
incremental:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"

full_refresh:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
@@ -1,4 +1,5 @@
#!/usr/bin/env sh

docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
@@ -4,7 +4,7 @@
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
@@ -5,4 +5,4 @@
{"stream": "test", "data": {"id": 5, "name": "77h4aiMP", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 6, "name": "Le35Wyic", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 7, "name": "xZhh1Kyl", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
@@ -32,7 +32,7 @@
import pytest
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
-from source_s3.source_files_abstract.fileformatparser import CsvParser
+from source_s3.source_files_abstract.formats.csv_parser import CsvParser
from source_s3.source_files_abstract.stream import FileStream

HERE = Path(__file__).resolve().parent
@@ -136,15 +136,17 @@ def _stream_records_test_logic(
fs = self.stream_class("dataset", provider, format, path_pattern, str_user_schema)
LOGGER.info(f"Testing stream_records() in SyncMode:{sync_mode.value}")

-assert fs._get_schema_map() == full_expected_schema # check we return correct schema from get_json_schema()
+# check we return correct schema from get_json_schema()
+assert fs._get_schema_map() == full_expected_schema

records = []
for stream_slice in fs.stream_slices(sync_mode=sync_mode, stream_state=current_state):
if stream_slice is not None:
# we need to do this in order to work out which extra columns (if any) we expect in this stream_slice
expected_columns = []
for file_dict in stream_slice:
-file_reader = CsvParser(format) # TODO: if we ever test other filetypes in these tests this will need fixing
+# TODO: if we ever test other filetypes in these tests this will need fixing
+file_reader = CsvParser(format)
with file_dict["storagefile"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
expected_columns = set(expected_columns) # de-dupe
@@ -550,7 +552,8 @@ def test_stream_records(
state=latest_state,
)
LOGGER.info(f"incremental state: {latest_state}")
-time.sleep(1) # small delay to ensure next file gets later last_modified timestamp
+# small delay to ensure next file gets later last_modified timestamp
+time.sleep(1)
self.teardown_infra(cloud_bucket_name, self.credentials)

except Exception as e:
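The TODO in the test logic above hard-codes CsvParser when working out expected columns. Purely as a sketch of how that could be generalised, the helper below dispatches on the format's "filetype" key; the helper name file_reader_for and the ParquetParser import path are assumptions, not taken from this diff.

from source_s3.source_files_abstract.formats.csv_parser import CsvParser


def file_reader_for(format: dict):
    # Pick the parser class from the "filetype" exposed in the connector spec.
    filetype = format.get("filetype", "csv")
    if filetype == "csv":
        return CsvParser(format)
    if filetype == "parquet":
        # Assumed module/class name for the Parquet parser added in this PR.
        from source_s3.source_files_abstract.formats.parquet_parser import ParquetParser

        return ParquetParser(format)
    raise ValueError(f"no test parser wired up for filetype '{filetype}'")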
@@ -0,0 +1,15 @@
{
"streams": [
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
@@ -0,0 +1,4 @@
{"stream": "test", "data": {"number": 1.0, "name": "foo", "flag": true, "delta": -1.0, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-06T16:59:36+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1628688385000}
{"stream": "test", "data": {"number": 2.0, "name": null, "flag": false, "delta": 2.5, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-06T16:59:36+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1628688385000}
{"stream": "test", "data": {"number": 3.0, "name": "bar", "flag": null, "delta": 0.1, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-06T16:59:36+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1628688385000}
{"stream": "test", "data": {"number": null, "name": "baz", "flag": true, "delta": null, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-06T16:59:36+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1628688385000}
@@ -97,14 +97,54 @@
"required": ["filetype"]
},
{
"title": "Coming Soon...",
"title": "parquet",
"type": "object",
"properties": {
"filetype": {
"title": "ParquetFiletype",
"description": "An enumeration.",
"enum": ["parquet"],
"type": "string"
},
"buffer_size": {
"title": "Buffer Size",
"description": "perform read buffering when deserializing individual column chunks. Otherwise IO calls are unbuffered.",
"default": 0,
"type": "integer"
},
"memory_map": {
"title": "Memory Map",
"description": "If the source is a file path, use a memory map to read file, which can improve performance in some environments.",
"default": false,
"type": "boolean"
},
"columns": {
"title": "Columns",
"description": "If not None, only these columns will be read from the file.",
"type": "array",
"items": {
"type": "string"
}
},
"batch_size": {
"title": "Batch Size",
"description": "Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file.",
"default": 65536,
"type": "integer"
},
"row_groups": {
"title": "Row Groups",
"description": "Only these row groups will be read from the file.",
"type": "array",
"items": {
"type": "integer"
}
},
"use_threads": {
"title": "Use Threads",
"description": "Perform multi-threaded column reads.",
"default": true,
"type": "boolean"
}
},
"required": ["filetype"]
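The Parquet options above (buffer_size, memory_map, columns, batch_size, row_groups, use_threads) mirror pyarrow's read parameters. A minimal sketch of how they could be handed to pyarrow.parquet, assuming a more or less direct pass-through; the function name is hypothetical and this is not the connector's exact parser code.

import pyarrow.parquet as pq


def stream_parquet_records(file_obj, options: dict):
    # buffer_size and memory_map control how the file itself is read.
    reader = pq.ParquetFile(
        file_obj,
        buffer_size=options.get("buffer_size", 0),
        memory_map=options.get("memory_map", False),
    )
    # iter_batches keeps memory bounded; columns and row_groups restrict what is read.
    for batch in reader.iter_batches(
        columns=options.get("columns"),
        batch_size=options.get("batch_size", 65536),
        row_groups=options.get("row_groups"),
        use_threads=options.get("use_threads", True),
    ):
        columns = batch.to_pydict()  # column name -> list of values
        for i in range(batch.num_rows):
            yield {name: values[i] for name, values in columns.items()}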
9 changes: 8 additions & 1 deletion airbyte-integrations/connectors/source-s3/setup.py
@@ -25,11 +25,18 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", "dill==0.3.4"]
MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.7",
"pyarrow==4.0.1",
"smart-open[s3]==5.1.0",
"wcmatch==8.2",
"dill==0.3.4",
]

 TEST_REQUIREMENTS = [
     "pytest~=6.1",
     "source-acceptance-test",
+    "pandas==1.3.1",
 ]

setup(
@@ -75,7 +75,8 @@ def last_modified(self) -> datetime:
return obj.last_modified
# For some reason, this standard method above doesn't work for public files with no credentials so fall back on below
except NoCredentialsError as nce:
-if self.use_aws_account(self._provider): # we don't expect this error if using credentials so throw it
+# we don't expect this error if using credentials so throw it
+if self.use_aws_account(self._provider):
raise nce
else:
return boto3.client("s3", config=ClientConfig(signature_version=UNSIGNED)).head_object(Bucket=bucket, Key=self.url)[
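For context on the fallback above: a minimal standalone sketch of reading object metadata from a public bucket without credentials by using an unsigned client. The bucket and key names here are made up.

import boto3
from botocore import UNSIGNED
from botocore.client import Config as ClientConfig

# Unsigned requests only work for publicly readable objects.
s3 = boto3.client("s3", config=ClientConfig(signature_version=UNSIGNED))
metadata = s3.head_object(Bucket="my-public-bucket", Key="simple_test.parquet")
print(metadata["LastModified"])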
@@ -0,0 +1,124 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from abc import ABC, abstractmethod
from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union

import pyarrow as pa
from airbyte_cdk.logger import AirbyteLogger


class AbstractFileParser(ABC):
    def __init__(self, format: dict, master_schema: dict = None):
        """
        :param format: file format specific mapping as described in spec.json
        :param master_schema: superset schema determined from all files, might be unused for some formats, defaults to None
        """
        self._format = format
        self._master_schema = (
            master_schema  # this may need to be used differently by some formats, pyarrow allows extra columns in csv schema
        )
        self.logger = AirbyteLogger()

    @property
    @abstractmethod
    def is_binary(self):
        """
        Override this per format so that file-like objects passed in are opened correctly as binary or not
        """

    @abstractmethod
    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
        """
        Override this with format-specific logic to infer the schema of file
        Note: needs to return inferred schema with JsonSchema datatypes

        :param file: file-like object (opened via StorageFile)
        :return: mapping of {columns:datatypes} where datatypes are JsonSchema types
        """

    @abstractmethod
    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
        """
        Override this with format-specific logic to stream each data row from the file as a mapping of {columns:values}
        Note: avoid loading the whole file into memory to avoid OOM breakages

        :param file: file-like object (opened via StorageFile)
        :yield: data record as a mapping of {columns:values}
        """

    @staticmethod
    def json_type_to_pyarrow_type(typ: str, reverse: bool = False, logger: AirbyteLogger = AirbyteLogger()) -> str:
        """
        Converts a JSON type to a PyArrow type (or the other way around if reverse=True)

        :param typ: Json type if reverse is False, else PyArrow type
        :param reverse: switch to True for PyArrow type -> Json type, defaults to False
        :param logger: defaults to AirbyteLogger()
        :return: PyArrow type if reverse is False, else Json type
        """
        str_typ = str(typ)
        # this is a map of airbyte types to pyarrow types. The first list element of the pyarrow types should be the one to use where required.
        map = {
            "boolean": ("bool_", "bool"),
            "integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
            "number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
            "string": ("large_string", "string"),
            # TODO: support object type rather than coercing to string
            "object": ("large_string",),
            # TODO: support array type rather than coercing to string
            "array": ("large_string",),
            "null": ("large_string",),
        }
        if not reverse:
            for json_type, pyarrow_types in map.items():
                if str_typ.lower() == json_type:
                    return getattr(
                        pa, pyarrow_types[0]
                    ).__call__()  # better way might be necessary when we decide to handle more type complexity
            logger.debug(f"JSON type '{str_typ}' is not mapped, falling back to default conversion to large_string")
            return pa.large_string()
        else:
            for json_type, pyarrow_types in map.items():
                if any([str_typ.startswith(pa_type) for pa_type in pyarrow_types]):
                    return json_type
            logger.debug(f"PyArrow type '{str_typ}' is not mapped, falling back to default conversion to string")
            return "string"  # default type if unspecified in map

    @staticmethod
    def json_schema_to_pyarrow_schema(schema: Mapping[str, Any], reverse: bool = False) -> Mapping[str, Any]:
        """
        Converts a schema with JsonSchema datatypes to one with PyArrow types (or the other way if reverse=True)
        This utilises json_type_to_pyarrow_type() to convert each datatype

        :param schema: json/pyarrow schema to convert
        :param reverse: switch to True for PyArrow schema -> Json schema, defaults to False
        :return: converted schema dict
        """
        new_schema = {}

        for column, json_type in schema.items():
            new_schema[column] = AbstractFileParser.json_type_to_pyarrow_type(json_type, reverse=reverse)

        return new_schema
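An illustrative example of the two static helpers above, not part of the diff; it assumes it runs in the same module as AbstractFileParser (or imports it from wherever this file lives).

json_schema = {"id": "integer", "name": "string", "valid": "boolean"}

# JSON Schema types -> pyarrow DataTypes (int64, large_string, bool)
pyarrow_schema = AbstractFileParser.json_schema_to_pyarrow_schema(json_schema)

# pyarrow DataTypes -> JSON Schema types; round-trips back to the original mapping
roundtripped = AbstractFileParser.json_schema_to_pyarrow_schema(pyarrow_schema, reverse=True)
assert roundtripped == json_schema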