🎉 Source S3: support of Parquet format #5305

Merged: 23 commits, Sep 4, 2021

Commits (23)
9f407ad
add parquet parser
gl-pix Aug 10, 2021
7ce4638
add integration tests for parquet formats
gl-pix Aug 11, 2021
479bfd3
add unit tests for parquet
gl-pix Aug 12, 2021
b454cc2
update docs and secrets
gl-pix Aug 13, 2021
f4888e7
Merge remote-tracking branch 'origin/master' into antixar/5102-source…
gl-pix Aug 13, 2021
96bff17
fix incorrect import for tests
gl-pix Aug 13, 2021
86bdb23
add lib pandas for unit tests
gl-pix Aug 13, 2021
3d0830f
revert changes of foreign connectors
gl-pix Aug 23, 2021
4cdc9fe
Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup…
gl-pix Aug 23, 2021
d311bab
update secret settings
gl-pix Aug 23, 2021
93bc077
Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup…
gl-pix Aug 23, 2021
344afd6
fix config values
gl-pix Aug 23, 2021
b874bb6
Update airbyte-integrations/connectors/source-s3/source_s3/source_fil…
antixar Aug 30, 2021
11711f6
Update airbyte-integrations/connectors/source-s3/source_s3/source_fil…
antixar Aug 30, 2021
c21a222
remove some unused default options
gl-pix Aug 30, 2021
ad07e27
Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup…
gl-pix Aug 30, 2021
dcdc0eb
update tests
gl-pix Aug 30, 2021
7febc6d
update docs
gl-pix Aug 31, 2021
8670d2c
bump its version
gl-pix Aug 31, 2021
7095f56
Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup…
gl-pix Sep 3, 2021
d535170
Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup…
gl-pix Sep 3, 2021
6c62c3e
Merge remote-tracking branch 'origin' into antixar/5102-source-s3-sup…
gl-pix Sep 4, 2021
af286f4
fix expected test
gl-pix Sep 4, 2021
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -143,6 +143,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -143,6 +143,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
@@ -2,6 +2,6 @@
"sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
"name": "S3",
"dockerRepository": "airbyte/source-s3",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/s3"
}
@@ -78,7 +78,7 @@
- sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
name: S3
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
- sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87
name: Sendgrid
22 changes: 15 additions & 7 deletions airbyte-integrations/connectors/source-s3/Dockerfile
@@ -1,16 +1,24 @@
FROM python:3.7-slim
FROM python:3.7-slim as base
FROM base as builder

# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
RUN apt-get update
WORKDIR /airbyte/integration_code
COPY setup.py ./
RUN pip install --prefix=/install .

FROM base
WORKDIR /airbyte/integration_code
COPY source_s3 ./source_s3
COPY --from=builder /install /usr/local

COPY main.py ./
COPY setup.py ./
RUN pip install .
COPY source_s3 ./source_s3


ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-s3



@@ -5,23 +5,48 @@ tests:
spec:
- spec_path: "integration_tests/spec.json"
connection:
# for CSV format
- config_path: "secrets/config.json"
status: "succeed"
# for Parquet format
- config_path: "secrets/parquet_config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
# for CSV format
- config_path: "secrets/config.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
basic_read:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
- expect_records:
path: "integration_tests/expected_records.txt"
expect_records:
path: "integration_tests/expected_records.txt"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
expect_records:
path: "integration_tests/parquet_expected_records.txt"
incremental:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"

full_refresh:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
@@ -1,4 +1,13 @@
#!/usr/bin/env sh

# Build latest connector image
source_image=$(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)
echo "try to build the source image: ${source_image}"
docker build . -t ${source_image}

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest

docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
@@ -4,7 +4,7 @@
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
@@ -5,4 +5,4 @@
{"stream": "test", "data": {"id": 5, "name": "77h4aiMP", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 6, "name": "Le35Wyic", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 7, "name": "xZhh1Kyl", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
@@ -32,7 +32,7 @@
import pytest
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from source_s3.source_files_abstract.fileformatparser import CsvParser
from source_s3.source_files_abstract.formats.csv_parser import CsvParser
from source_s3.source_files_abstract.stream import FileStream

HERE = Path(__file__).resolve().parent
@@ -136,15 +136,17 @@ def _stream_records_test_logic(
fs = self.stream_class("dataset", provider, format, path_pattern, str_user_schema)
LOGGER.info(f"Testing stream_records() in SyncMode:{sync_mode.value}")

assert fs._get_schema_map() == full_expected_schema # check we return correct schema from get_json_schema()
# check we return correct schema from get_json_schema()
assert fs._get_schema_map() == full_expected_schema

records = []
for stream_slice in fs.stream_slices(sync_mode=sync_mode, stream_state=current_state):
if stream_slice is not None:
# we need to do this in order to work out which extra columns (if any) we expect in this stream_slice
expected_columns = []
for file_dict in stream_slice:
file_reader = CsvParser(format) # TODO: if we ever test other filetypes in these tests this will need fixing
# TODO: if we ever test other filetypes in these tests this will need fixing
file_reader = CsvParser(format)
with file_dict["storagefile"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
expected_columns = set(expected_columns) # de-dupe
@@ -550,7 +552,8 @@ def test_stream_records(
state=latest_state,
)
LOGGER.info(f"incremental state: {latest_state}")
time.sleep(1) # small delay to ensure next file gets later last_modified timestamp
# small delay to ensure next file gets later last_modified timestamp
time.sleep(1)
self.teardown_infra(cloud_bucket_name, self.credentials)

except Exception as e:
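The unit tests above still hard-code CsvParser for inferring expected columns (hence the TODO in the hunk). For context, a minimal sketch of how Parquet schema inference could be done with pyarrow, the library this PR pins in setup.py; the function name and type mapping below are illustrative assumptions, not the connector's actual ParquetParser code.

# Illustrative sketch only: infer a JSON-schema-style type map from a Parquet file
# using pyarrow (pinned to 4.0.1 in setup.py). Not the connector's actual parser.
import pyarrow.parquet as pq

def infer_parquet_schema(file_obj) -> dict:
    """Return {column_name: json_type} for a Parquet file or binary file-like object."""
    arrow_schema = pq.ParquetFile(file_obj).schema_arrow
    type_map = {"int": "integer", "double": "number", "float": "number", "bool": "boolean"}
    schema = {}
    for field in arrow_schema:
        arrow_type = str(field.type)
        json_type = next((v for prefix, v in type_map.items() if arrow_type.startswith(prefix)), "string")
        schema[field.name] = json_type
    return schema

# Usage: with open("simple_test.parquet", "rb") as f: print(infer_parquet_schema(f))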
@@ -0,0 +1,15 @@
{
"streams": [
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
@@ -0,0 +1,4 @@
{"stream": "test", "data": {"number": 1.0, "name": "foo", "flag": true, "delta": -1.0, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
{"stream": "test", "data": {"number": 2.0, "name": null, "flag": false, "delta": 2.5, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
{"stream": "test", "data": {"number": 3.0, "name": "bar", "flag": null, "delta": 0.1, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
{"stream": "test", "data": {"number": null, "name": "baz", "flag": true, "delta": null, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
@@ -32,15 +32,16 @@
"format": {
"title": "Format",
"default": "csv",
"type": "object",
"oneOf": [
{
"title": "csv",
"description": "This connector utilises <a href=\"https: // arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html\" target=\"_blank\">PyArrow (Apache Arrow)</a> for CSV parsing.",
"type": "object",
"properties": {
"filetype": {
"title": "CsvFiletype",
"description": "This connector utilises <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html\" target=\"_blank\">PyArrow (Apache Arrow)</a> for CSV parsing.",
"enum": ["csv"],
"title": "Filetype",
"const": "csv",
"type": "string"
},
"delimiter": {
@@ -93,24 +94,41 @@
],
"type": "string"
}
},
"required": ["filetype"]
}
},
{
"title": "Coming Soon...",
"title": "parquet",
"description": "This connector utilises <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html\" target=\"_blank\">PyArrow (Apache Arrow)</a> for Parquet parsing.",
"type": "object",
"properties": {
"filetype": {
"title": "ParquetFiletype",
"description": "An enumeration.",
"enum": ["parquet"],
"title": "Filetype",
"const": "parquet",
"type": "string"
},
"buffer_size": {
"title": "Buffer Size",
"description": "Perform read buffering when deserializing individual column chunks. By default every group column will be loaded fully to memory. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
"default": 0,
"type": "integer"
},
"columns": {
"title": "Columns",
"description": "If you only want to sync a subset of the columns from the file(s), add the columns you want here. Leave it empty to sync all columns.",
"type": "array",
"items": {
"type": "string"
}
},
"batch_size": {
"title": "Batch Size",
"description": "Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
"default": 65536,
"type": "integer"
}
},
"required": ["filetype"]
}
}
],
"type": "object"
]
},
"provider": {
"title": "S3: Amazon Web Services",
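The three Parquet options added to the spec (buffer_size, columns, batch_size) line up with parameters of PyArrow's ParquetFile API. A minimal sketch of how they might be wired through, assuming the connector reads record batches with iter_batches; the surrounding function is an assumption for illustration, not the connector's real parser code.

# Illustrative only: how the spec's buffer_size / columns / batch_size options could
# map onto pyarrow.parquet.ParquetFile. Option names mirror the spec above.
import pyarrow.parquet as pq

def read_parquet_records(file_obj, options: dict):
    reader = pq.ParquetFile(file_obj, buffer_size=options.get("buffer_size", 0))
    for batch in reader.iter_batches(
        batch_size=options.get("batch_size", 65536),
        columns=options.get("columns") or None,  # empty list -> read all columns
    ):
        # to_pydict() gives {column: [values]}; yield one dict per row
        columns = batch.to_pydict()
        for i in range(batch.num_rows):
            yield {name: values[i] for name, values in columns.items()}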
9 changes: 8 additions & 1 deletion airbyte-integrations/connectors/source-s3/setup.py
@@ -25,11 +25,18 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", "dill==0.3.4"]
MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.7",
"pyarrow==4.0.1",
"smart-open[s3]==5.1.0",
"wcmatch==8.2",
"dill==0.3.4",
]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
"pandas==1.3.1",
]

setup(
@@ -75,7 +75,8 @@ def last_modified(self) -> datetime:
return obj.last_modified
# For some reason, this standard method above doesn't work for public files with no credentials so fall back on below
except NoCredentialsError as nce:
if self.use_aws_account(self._provider): # we don't expect this error if using credentials so throw it
# we don't expect this error if using credentials so throw it
if self.use_aws_account(self._provider):
raise nce
else:
return boto3.client("s3", config=ClientConfig(signature_version=UNSIGNED)).head_object(Bucket=bucket, Key=self.url)[
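The hunk above falls back to an unsigned S3 client when NoCredentialsError is raised for a public bucket. A self-contained sketch of that pattern, with placeholder bucket and key names (not the connector's actual values):

# Sketch of the anonymous-access fallback shown above, assuming a public bucket.
import boto3
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import NoCredentialsError

def get_last_modified(bucket: str, key: str):
    try:
        return boto3.resource("s3").Object(bucket, key).last_modified
    except NoCredentialsError:
        # Public objects can still be read with an unsigned (anonymous) client.
        client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
        return client.head_object(Bucket=bucket, Key=key)["LastModified"]

# Usage: get_last_modified("my-public-bucket", "simple_test.parquet")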