diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml
index 61fd5fe72434..a78e942b24c2 100644
--- a/.github/workflows/publish-command.yml
+++ b/.github/workflows/publish-command.yml
@@ -143,6 +143,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
+ SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml
index c03e40b6c1ce..890c0a6c6129 100644
--- a/.github/workflows/test-command.yml
+++ b/.github/workflows/test-command.yml
@@ -143,6 +143,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
+ SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json
index d6da4a1c5ca5..a682a2d9ca6f 100644
--- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json
+++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json
@@ -2,6 +2,6 @@
"sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
"name": "S3",
"dockerRepository": "airbyte/source-s3",
- "dockerImageTag": "0.1.3",
+ "dockerImageTag": "0.1.4",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/s3"
}
diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index de53bc1a22a3..f34e881abc4f 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -78,7 +78,7 @@
- sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
name: S3
dockerRepository: airbyte/source-s3
- dockerImageTag: 0.1.3
+ dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
- sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87
name: Sendgrid
diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index e8972a969255..fc6a74b4e12f 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -1,16 +1,24 @@
-FROM python:3.7-slim
+FROM python:3.7-slim as base
+FROM base as builder
-# Bash is installed for more convenient debugging.
-RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
+RUN apt-get update
+WORKDIR /airbyte/integration_code
+COPY setup.py ./
+RUN pip install --prefix=/install .
+FROM base
WORKDIR /airbyte/integration_code
-COPY source_s3 ./source_s3
+COPY --from=builder /install /usr/local
+
COPY main.py ./
-COPY setup.py ./
-RUN pip install .
+COPY source_s3 ./source_s3
+
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
-LABEL io.airbyte.version=0.1.3
+LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-s3
+
+
+
diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
index 556a257f9f31..9c32b9f7015a 100644
--- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
@@ -5,23 +5,48 @@ tests:
spec:
- spec_path: "integration_tests/spec.json"
connection:
+ # for CSV format
- config_path: "secrets/config.json"
status: "succeed"
+ # for Parquet format
+ - config_path: "secrets/parquet_config.json"
+ status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
+ # for CSV format
- config_path: "secrets/config.json"
+ # for Parquet format
+ - config_path: "secrets/parquet_config.json"
basic_read:
+ # for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
- - expect_records:
- path: "integration_tests/expected_records.txt"
+ expect_records:
+ path: "integration_tests/expected_records.txt"
+ # for Parquet format
+ - config_path: "secrets/parquet_config.json"
+ configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
+ expect_records:
+ path: "integration_tests/parquet_expected_records.txt"
incremental:
+ # for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
+ # for Parquet format
+ - config_path: "secrets/parquet_config.json"
+ configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
+ cursor_paths:
+ test: ["_ab_source_file_last_modified"]
+ future_state_path: "integration_tests/abnormal_state.json"
+
full_refresh:
+ # for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
+ # for Parquet format
+ - config_path: "secrets/parquet_config.json"
+ configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-s3/acceptance-test-docker.sh
index 1425ff74f151..4263e580fb6f 100644
--- a/airbyte-integrations/connectors/source-s3/acceptance-test-docker.sh
+++ b/airbyte-integrations/connectors/source-s3/acceptance-test-docker.sh
@@ -1,4 +1,13 @@
#!/usr/bin/env sh
+
+# Build latest connector image
+source_image=$(grep "connector_image" acceptance-test-config.yml | head -n 1 | cut -d: -f2-)
+echo "Building the source image: ${source_image}"
+docker build . -t ${source_image}
+
+# Pull latest acceptance-test image
+docker pull airbyte/source-acceptance-test:latest
+
docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json
index 6accfc44d9dd..631648d6329c 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json
@@ -4,7 +4,7 @@
"stream": {
"name": "test",
"json_schema": {},
- "supported_sync_modes": ["incremental", "full_refresh"],
+ "supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt
index 3d09dbfee3be..aaeffe0694a5 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt
@@ -5,4 +5,4 @@
{"stream": "test", "data": {"id": 5, "name": "77h4aiMP", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 6, "name": "Le35Wyic", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 7, "name": "xZhh1Kyl", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
-{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
\ No newline at end of file
+{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py
index 31dc7f961b26..a861043508f3 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py
@@ -32,7 +32,7 @@
import pytest
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
-from source_s3.source_files_abstract.fileformatparser import CsvParser
+from source_s3.source_files_abstract.formats.csv_parser import CsvParser
from source_s3.source_files_abstract.stream import FileStream
HERE = Path(__file__).resolve().parent
@@ -136,7 +136,8 @@ def _stream_records_test_logic(
fs = self.stream_class("dataset", provider, format, path_pattern, str_user_schema)
LOGGER.info(f"Testing stream_records() in SyncMode:{sync_mode.value}")
- assert fs._get_schema_map() == full_expected_schema # check we return correct schema from get_json_schema()
+ # check we return correct schema from get_json_schema()
+ assert fs._get_schema_map() == full_expected_schema
records = []
for stream_slice in fs.stream_slices(sync_mode=sync_mode, stream_state=current_state):
@@ -144,7 +145,8 @@ def _stream_records_test_logic(
# we need to do this in order to work out which extra columns (if any) we expect in this stream_slice
expected_columns = []
for file_dict in stream_slice:
- file_reader = CsvParser(format) # TODO: if we ever test other filetypes in these tests this will need fixing
+ # TODO: if we ever test other filetypes in these tests this will need fixing
+ file_reader = CsvParser(format)
with file_dict["storagefile"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
expected_columns = set(expected_columns) # de-dupe
@@ -550,7 +552,8 @@ def test_stream_records(
state=latest_state,
)
LOGGER.info(f"incremental state: {latest_state}")
- time.sleep(1) # small delay to ensure next file gets later last_modified timestamp
+ # small delay to ensure next file gets later last_modified timestamp
+ time.sleep(1)
self.teardown_infra(cloud_bucket_name, self.credentials)
except Exception as e:
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/parquet_configured_catalog.json b/airbyte-integrations/connectors/source-s3/integration_tests/parquet_configured_catalog.json
new file mode 100644
index 000000000000..631648d6329c
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/parquet_configured_catalog.json
@@ -0,0 +1,15 @@
+{
+ "streams": [
+ {
+ "stream": {
+ "name": "test",
+ "json_schema": {},
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ "source_defined_cursor": true,
+ "default_cursor_field": ["_ab_source_file_last_modified"]
+ },
+ "sync_mode": "incremental",
+ "destination_sync_mode": "append"
+ }
+ ]
+}
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/parquet_expected_records.txt b/airbyte-integrations/connectors/source-s3/integration_tests/parquet_expected_records.txt
new file mode 100644
index 000000000000..986478dbee87
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/parquet_expected_records.txt
@@ -0,0 +1,4 @@
+{"stream": "test", "data": {"number": 1.0, "name": "foo", "flag": true, "delta": -1.0, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
+{"stream": "test", "data": {"number": 2.0, "name": null, "flag": false, "delta": 2.5, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
+{"stream": "test", "data": {"number": 3.0, "name": "bar", "flag": null, "delta": 0.1, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
+{"stream": "test", "data": {"number": null, "name": "baz", "flag": true, "delta": null, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
index 1466dc93b58d..bc48673b3d6c 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
@@ -32,15 +32,16 @@
"format": {
"title": "Format",
"default": "csv",
+ "type": "object",
"oneOf": [
{
"title": "csv",
+ "description": "This connector utilises PyArrow (Apache Arrow) for CSV parsing.",
"type": "object",
"properties": {
"filetype": {
- "title": "CsvFiletype",
- "description": "This connector utilises PyArrow (Apache Arrow) for CSV parsing.",
- "enum": ["csv"],
+ "title": "Filetype",
+ "const": "csv",
"type": "string"
},
"delimiter": {
@@ -93,24 +94,41 @@
],
"type": "string"
}
- },
- "required": ["filetype"]
+ }
},
{
- "title": "Coming Soon...",
+ "title": "parquet",
+ "description": "This connector utilises PyArrow (Apache Arrow) for Parquet parsing.",
"type": "object",
"properties": {
"filetype": {
- "title": "ParquetFiletype",
- "description": "An enumeration.",
- "enum": ["parquet"],
+ "title": "Filetype",
+ "const": "parquet",
"type": "string"
+ },
+ "buffer_size": {
+ "title": "Buffer Size",
+ "description": "Perform read buffering when deserializing individual column chunks. By default every group column will be loaded fully to memory. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
+ "default": 0,
+ "type": "integer"
+ },
+ "columns": {
+ "title": "Columns",
+ "description": "If you only want to sync a subset of the columns from the file(s), add the columns you want here. Leave it empty to sync all columns.",
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ "batch_size": {
+ "title": "Batch Size",
+ "description": "Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
+ "default": 65536,
+ "type": "integer"
}
- },
- "required": ["filetype"]
+ }
}
- ],
- "type": "object"
+ ]
},
"provider": {
"title": "S3: Amazon Web Services",
diff --git a/airbyte-integrations/connectors/source-s3/setup.py b/airbyte-integrations/connectors/source-s3/setup.py
index d43bb2338099..defb2cc81870 100644
--- a/airbyte-integrations/connectors/source-s3/setup.py
+++ b/airbyte-integrations/connectors/source-s3/setup.py
@@ -25,11 +25,18 @@
from setuptools import find_packages, setup
-MAIN_REQUIREMENTS = ["airbyte-cdk", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", "dill==0.3.4"]
+MAIN_REQUIREMENTS = [
+ "airbyte-cdk~=0.1.7",
+ "pyarrow==4.0.1",
+ "smart-open[s3]==5.1.0",
+ "wcmatch==8.2",
+ "dill==0.3.4",
+]
TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
+ "pandas==1.3.1",
]
setup(
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py
index 3155f3cb63d9..0bcbc287d2d2 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py
@@ -75,7 +75,8 @@ def last_modified(self) -> datetime:
return obj.last_modified
# For some reason, this standard method above doesn't work for public files with no credentials so fall back on below
except NoCredentialsError as nce:
- if self.use_aws_account(self._provider): # we don't expect this error if using credentials so throw it
+ # we don't expect this error if using credentials so throw it
+ if self.use_aws_account(self._provider):
raise nce
else:
return boto3.client("s3", config=ClientConfig(signature_version=UNSIGNED)).head_object(Bucket=bucket, Key=self.url)[
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
new file mode 100644
index 000000000000..fac438563577
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
@@ -0,0 +1,124 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+from abc import ABC, abstractmethod
+from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union
+
+import pyarrow as pa
+from airbyte_cdk.logger import AirbyteLogger
+
+
+class AbstractFileParser(ABC):
+ def __init__(self, format: dict, master_schema: dict = None):
+ """
+ :param format: file format specific mapping as described in spec.json
+ :param master_schema: superset schema determined from all files, might be unused for some formats, defaults to None
+ """
+ self._format = format
+ self._master_schema = (
+ master_schema # this may need to be used differently by some formats, pyarrow allows extra columns in csv schema
+ )
+ self.logger = AirbyteLogger()
+
+ @property
+ @abstractmethod
+ def is_binary(self):
+ """
+        Override this per format so that file-like objects passed in are correctly opened as binary or not
+ """
+
+ @abstractmethod
+ def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
+ """
+        Override this with format-specific logic to infer the schema of the file
+ Note: needs to return inferred schema with JsonSchema datatypes
+
+ :param file: file-like object (opened via StorageFile)
+ :return: mapping of {columns:datatypes} where datatypes are JsonSchema types
+ """
+
+ @abstractmethod
+ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+ """
+        Override this with format-specific logic to stream each data row from the file as a mapping of {columns:values}
+ Note: avoid loading the whole file into memory to avoid OOM breakages
+
+ :param file: file-like object (opened via StorageFile)
+ :yield: data record as a mapping of {columns:values}
+ """
+
+ @staticmethod
+ def json_type_to_pyarrow_type(typ: str, reverse: bool = False, logger: AirbyteLogger = AirbyteLogger()) -> str:
+ """
+        Converts a JSON type to a PyArrow type (or the other way around if reverse=True)
+
+ :param typ: Json type if reverse is False, else PyArrow type
+ :param reverse: switch to True for PyArrow type -> Json type, defaults to False
+ :param logger: defaults to AirbyteLogger()
+ :return: PyArrow type if reverse is False, else Json type
+ """
+ str_typ = str(typ)
+ # this is a map of airbyte types to pyarrow types. The first list element of the pyarrow types should be the one to use where required.
+ map = {
+ "boolean": ("bool_", "bool"),
+ "integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
+ "number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
+ "string": ("large_string", "string"),
+ # TODO: support object type rather than coercing to string
+ "object": ("large_string",),
+ # TODO: support array type rather than coercing to string
+ "array": ("large_string",),
+ "null": ("large_string",),
+ }
+ if not reverse:
+ for json_type, pyarrow_types in map.items():
+ if str_typ.lower() == json_type:
+ return getattr(
+ pa, pyarrow_types[0]
+ ).__call__() # better way might be necessary when we decide to handle more type complexity
+ logger.debug(f"JSON type '{str_typ}' is not mapped, falling back to default conversion to large_string")
+ return pa.large_string()
+ else:
+ for json_type, pyarrow_types in map.items():
+ if any([str_typ.startswith(pa_type) for pa_type in pyarrow_types]):
+ return json_type
+ logger.debug(f"PyArrow type '{str_typ}' is not mapped, falling back to default conversion to string")
+ return "string" # default type if unspecified in map
+
+ @staticmethod
+ def json_schema_to_pyarrow_schema(schema: Mapping[str, Any], reverse: bool = False) -> Mapping[str, Any]:
+ """
+ Converts a schema with JsonSchema datatypes to one with PyArrow types (or the other way if reverse=True)
+ This utilises json_type_to_pyarrow_type() to convert each datatype
+
+ :param schema: json/pyarrow schema to convert
+ :param reverse: switch to True for PyArrow schema -> Json schema, defaults to False
+ :return: converted schema dict
+ """
+ new_schema = {}
+
+ for column, json_type in schema.items():
+ new_schema[column] = AbstractFileParser.json_type_to_pyarrow_type(json_type, reverse=reverse)
+
+ return new_schema
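
The two static helpers above are pure functions, so their behaviour is easy to pin down with a few spot checks. A small sketch, assuming the package is importable as in the connector's unit tests:

```python
import pyarrow as pa
from source_s3.source_files_abstract.formats.abstract_file_parser import AbstractFileParser

# JSON -> PyArrow: unmapped types fall back to large_string
assert AbstractFileParser.json_type_to_pyarrow_type("integer") == pa.int64()
assert AbstractFileParser.json_type_to_pyarrow_type("not a type") == pa.large_string()

# PyArrow -> JSON (reverse=True): any int*/uint* type maps back to "integer"
assert AbstractFileParser.json_type_to_pyarrow_type(pa.uint16(), reverse=True) == "integer"

# Whole-schema conversion
assert AbstractFileParser.json_schema_to_pyarrow_schema({"id": "integer", "name": "string"}) == {
    "id": pa.int64(),
    "name": pa.large_string(),
}
```
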
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/fileformatparser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
similarity index 66%
rename from airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/fileformatparser.py
rename to airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
index 1e97b7c6983b..11a50aba1a5d 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/fileformatparser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
@@ -24,114 +24,21 @@
import json
import multiprocessing as mp
-from abc import ABC, abstractmethod
from typing import Any, BinaryIO, Iterator, Mapping, Optional, TextIO, Tuple, Union
import dill
import pyarrow as pa
-from airbyte_cdk.logger import AirbyteLogger
from pyarrow import csv as pa_csv
+from .abstract_file_parser import AbstractFileParser
+
def multiprocess_queuer(func, queue: mp.Queue, *args, **kwargs):
""" this is our multiprocesser helper function, lives at top-level to be Windows-compatible """
queue.put(dill.loads(func)(*args, **kwargs))
-class FileFormatParser(ABC):
- def __init__(self, format: dict, master_schema: dict = None):
- """
- :param format: file format specific mapping as described in spec.json
- :param master_schema: superset schema determined from all files, might be unused for some formats, defaults to None
- """
- self._format = format
- self._master_schema = (
- master_schema # this may need to be used differently by some formats, pyarrow allows extra columns in csv schema
- )
- self.logger = AirbyteLogger()
-
- @property
- @abstractmethod
- def is_binary(self):
- """
- Override this per format so that file-like objects passed in are currently opened as binary or not
- """
-
- @abstractmethod
- def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
- """
- Override this with format-specifc logic to infer the schema of file
- Note: needs to return inferred schema with JsonSchema datatypes
-
- :param file: file-like object (opened via StorageFile)
- :return: mapping of {columns:datatypes} where datatypes are JsonSchema types
- """
-
- @abstractmethod
- def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
- """
- Override this with format-specifc logic to stream each data row from the file as a mapping of {columns:values}
- Note: avoid loading the whole file into memory to avoid OOM breakages
-
- :param file: file-like object (opened via StorageFile)
- :yield: data record as a mapping of {columns:values}
- """
-
- @staticmethod
- def json_type_to_pyarrow_type(typ: str, reverse: bool = False, logger: AirbyteLogger = AirbyteLogger()) -> str:
- """
- Converts Json Type to PyArrow types to (or the other way around if reverse=True)
-
- :param typ: Json type if reverse is False, else PyArrow type
- :param reverse: switch to True for PyArrow type -> Json type, defaults to False
- :param logger: defaults to AirbyteLogger()
- :return: PyArrow type if reverse is False, else Json type
- """
- str_typ = str(typ)
- # this is a map of airbyte types to pyarrow types. The first list element of the pyarrow types should be the one to use where required.
- map = {
- "boolean": ("bool_", "bool"),
- "integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
- "number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
- "string": ("large_string", "string"),
- "object": ("large_string",), # TODO: support object type rather than coercing to string
- "array": ("large_string",), # TODO: support array type rather than coercing to string
- "null": ("large_string",),
- }
- if not reverse:
- for json_type, pyarrow_types in map.items():
- if str_typ.lower() == json_type:
- return getattr(
- pa, pyarrow_types[0]
- ).__call__() # better way might be necessary when we decide to handle more type complexity
- logger.debug(f"JSON type '{str_typ}' is not mapped, falling back to default conversion to large_string")
- return pa.large_string()
- else:
- for json_type, pyarrow_types in map.items():
- if any([str_typ.startswith(pa_type) for pa_type in pyarrow_types]):
- return json_type
- logger.debug(f"PyArrow type '{str_typ}' is not mapped, falling back to default conversion to string")
- return "string" # default type if unspecified in map
-
- @staticmethod
- def json_schema_to_pyarrow_schema(schema: Mapping[str, Any], reverse: bool = False) -> Mapping[str, Any]:
- """
- Converts a schema with JsonSchema datatypes to one with PyArrow types (or the other way if reverse=True)
- This utilises json_type_to_pyarrow_type() to convert each datatype
-
- :param schema: json/pyarrow schema to convert
- :param reverse: switch to True for PyArrow schema -> Json schema, defaults to False
- :return: converted schema dict
- """
- new_schema = {}
-
- for column, json_type in schema.items():
- new_schema[column] = FileFormatParser.json_type_to_pyarrow_type(json_type, reverse=reverse)
-
- return new_schema
-
-
-class CsvParser(FileFormatParser):
+class CsvParser(AbstractFileParser):
@property
def is_binary(self):
return True
@@ -139,7 +46,6 @@ def is_binary(self):
def _read_options(self):
"""
https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
-
build ReadOptions object like: pa.csv.ReadOptions(**self._read_options())
"""
return {"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")}
@@ -147,7 +53,6 @@ def _read_options(self):
def _parse_options(self):
"""
https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html
-
build ParseOptions object like: pa.csv.ParseOptions(**self._parse_options())
"""
quote_char = self._format.get("quote_char", False) if self._format.get("quote_char", False) != "" else False
@@ -162,9 +67,7 @@ def _parse_options(self):
def _convert_options(self, json_schema: Mapping[str, Any] = None):
"""
https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html
-
build ConvertOptions object like: pa.csv.ConvertOptions(**self._convert_options())
-
:param json_schema: if this is passed in, pyarrow will attempt to enforce this schema on read, defaults to None
"""
check_utf8 = True if self._format.get("encoding", "utf8").lower().replace("-", "") == "utf8" else False
@@ -178,14 +81,14 @@ def _run_in_external_process(self, fn, timeout: int, max_timeout: int, *args) ->
"""
fn passed in must return a tuple of (desired return value, Exception OR None)
This allows propagating any errors from the process up and raising accordingly
-
"""
result = None
while result is None:
q_worker = mp.Queue()
proc = mp.Process(
target=multiprocess_queuer,
- args=(dill.dumps(fn), q_worker, *args), # use dill to pickle the function for Windows-compatibility
+ # use dill to pickle the function for Windows-compatibility
+ args=(dill.dumps(fn), q_worker, *args),
)
proc.start()
try:
@@ -212,7 +115,6 @@ def _run_in_external_process(self, fn, timeout: int, max_timeout: int, *args) ->
def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
-
This now uses multiprocessing in order to timeout the schema inference as it can hang.
Since the hanging code is resistant to signal interrupts, threading/futures doesn't help so needed to multiprocess.
https://issues.apache.org/jira/browse/ARROW-11853?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
@@ -224,7 +126,6 @@ def infer_schema_process(
"""
we need to reimport here to be functional on Windows systems since it doesn't have fork()
https://docs.python.org/3.7/library/multiprocessing.html#contexts-and-start-methods
-
This returns a tuple of (schema_dict, None OR Exception).
If return[1] is not None and holds an exception we then raise this in the main process.
This lets us propagate up any errors (that aren't timeouts) and raise correctly.
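
CsvParser's `_read_options`/`_parse_options`/`_convert_options` dicts are unpacked straight into PyArrow's CSV option classes and then into `pa.csv.open_csv` for streaming reads. A minimal standalone sketch of the same flow, using the connector's defaults; the file path and column schema are hypothetical:

```python
import pyarrow as pa
from pyarrow import csv as pa_csv

read_opts = pa_csv.ReadOptions(block_size=10000, encoding="utf8")
parse_opts = pa_csv.ParseOptions(
    delimiter=",", quote_char='"', double_quote=True, escape_char=False, newlines_in_values=False
)
# column_types would come from json_schema_to_pyarrow_schema(master_schema) in the connector
convert_opts = pa_csv.ConvertOptions(check_utf8=True, column_types={"id": pa.int64(), "name": pa.large_string()})

streaming_reader = pa_csv.open_csv(
    "simple_test.csv", read_options=read_opts, parse_options=parse_opts, convert_options=convert_opts
)
while True:
    try:
        batch = streaming_reader.read_next_batch()
    except StopIteration:
        break
    print(batch.to_pydict())  # e.g. {'id': [1, 2, ...], 'name': ['a', 'b', ...]}
```
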
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
new file mode 100644
index 000000000000..83cbc10ad8d0
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
@@ -0,0 +1,72 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+from typing import Optional
+
+from pydantic import BaseModel, Field
+
+
+class CsvFormat(BaseModel):
+ 'This connector utilises PyArrow (Apache Arrow) for CSV parsing.'
+
+ class Config:
+ title = "csv"
+
+ filetype: str = Field(
+ Config.title,
+ const=True,
+ )
+
+ delimiter: str = Field(
+ default=",",
+ min_length=1,
+ description="The character delimiting individual cells in the CSV data. This may only be a 1-character string.",
+ )
+ quote_char: str = Field(
+ default='"', description="The character used optionally for quoting CSV values. To disallow quoting, make this field blank."
+ )
+ escape_char: Optional[str] = Field(
+ default=None,
+ description="The character used optionally for escaping special characters. To disallow escaping, leave this field blank.",
+ )
+ encoding: Optional[str] = Field(
+ default=None,
+ description='The character encoding of the CSV data. Leave blank to default to UTF-8. See list of python encodings for allowable options.',
+ )
+ double_quote: bool = Field(default=True, description="Whether two quotes in a quoted CSV value denote a single quote in the data.")
+ newlines_in_values: bool = Field(
+ default=False,
+ description="Whether newline characters are allowed in CSV values. Turning this on may affect performance. Leave blank to default to False.",
+ )
+ block_size: int = Field(
+ default=10000,
+ description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
+ )
+ additional_reader_options: str = Field(
+ default="{}",
+ description='Optionally add a valid JSON string here to provide additional options to the csv reader. Mappings must correspond to options detailed here. \'column_types\' is used internally to handle schema so overriding that would likely cause problems.',
+ examples=[
+ '{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]}'
+ ],
+ )
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py
new file mode 100644
index 000000000000..17e743ed740d
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py
@@ -0,0 +1,118 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Union
+
+import pyarrow.parquet as pq
+from pyarrow.parquet import ParquetFile
+
+from .abstract_file_parser import AbstractFileParser
+from .parquet_spec import ParquetFormat
+
+# All possible Parquet data types
+PARQUET_TYPES = {
+ "BOOLEAN": "boolean",
+ "DOUBLE": "number",
+ "FLOAT": "number",
+ "BYTE_ARRAY": "string",
+ "INT32": "integer",
+ "INT64": "integer",
+ "INT96": "integer",
+}
+
+
+class ParquetParser(AbstractFileParser):
+ """Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem.
+
+ Docs: https://parquet.apache.org/documentation/latest/
+ """
+
+ is_binary = True
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ # adds default values if necessary attributes are skipped.
+ for field_name, field in ParquetFormat.__fields__.items():
+ if self._format.get(field_name) is not None:
+ continue
+ self._format[field_name] = field.default
+
+ def _select_options(self, *names: List[str]) -> dict:
+ return {name: self._format[name] for name in names}
+
+ def _init_reader(self, file: BinaryIO) -> ParquetFile:
+ """Generates a new parquet reader
+ Doc: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html
+
+ """
+ options = self._select_options(
+ "buffer_size",
+ )
+        # memory_map can improve performance in some environments when the source is a file path.
+ options["memory_map"] = True
+ return pq.ParquetFile(file, **options)
+
+ def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
+ """
+ https://arrow.apache.org/docs/python/parquet.html#finer-grained-reading-and-writing
+
+        The stored schema is part of the file metadata, so we can extract it without parsing the full file
+ """
+ reader = self._init_reader(file)
+ schema_dict = {field.name: PARQUET_TYPES[field.physical_type] for field in reader.schema}
+ if not schema_dict:
+            # pyarrow can parse empty parquet files, but the connector can't generate a dynamic schema from one
+ raise OSError("empty Parquet file")
+ return schema_dict
+
+ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+ """
+ https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html
+ PyArrow reads streaming batches from a Parquet file
+ """
+
+ reader = self._init_reader(file)
+ self.logger.info(f"found {reader.num_row_groups} row groups")
+
+ if not reader.schema:
+            # pyarrow can parse empty parquet files, but the connector can't generate a dynamic schema from one
+ raise OSError("empty Parquet file")
+
+ args = self._select_options("columns", "batch_size")
+ num_row_groups = list(range(reader.num_row_groups))
+
+        # load batches one row group at a time
+ for num_row_group in num_row_groups:
+ args["row_groups"] = [num_row_group]
+ for batch in reader.iter_batches(**args):
+                # this gives us a dict of lists where each nested list holds the ordered values for a single column
+ # {'number': [1.0, 2.0, 3.0], 'name': ['foo', None, 'bar'], 'flag': [True, False, True], 'delta': [-1.0, 2.5, 0.1]}
+ batch_columns = [col.name for col in batch.schema]
+ batch_dict = batch.to_pydict()
+ columnwise_record_values = [batch_dict[column] for column in batch_columns]
+
+                # zip the column lists together to iterate the records row by row
+ for record_values in zip(*columnwise_record_values):
+ yield {batch_columns[i]: record_values[i] for i in range(len(batch_columns))}
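
`get_inferred_schema` reads the column types straight from the Parquet footer metadata, while `stream_records` walks the row groups and zips each batch's column lists back into row dicts. The same flow with plain PyArrow looks roughly like this (the file path and printed schema are illustrative):

```python
import pyarrow.parquet as pq

# Same mapping as PARQUET_TYPES above
PARQUET_TYPES = {"BOOLEAN": "boolean", "DOUBLE": "number", "FLOAT": "number",
                 "BYTE_ARRAY": "string", "INT32": "integer", "INT64": "integer", "INT96": "integer"}

reader = pq.ParquetFile("simple_test.parquet", buffer_size=0, memory_map=True)

# Schema comes from the footer metadata; no data pages are parsed.
inferred = {field.name: PARQUET_TYPES[field.physical_type] for field in reader.schema}
print(inferred)  # e.g. {"number": "number", "name": "string", "flag": "boolean", "delta": "number"}

# Stream one row group at a time, then zip the column lists into row dicts.
for row_group in range(reader.num_row_groups):
    for batch in reader.iter_batches(batch_size=65536, row_groups=[row_group]):
        batch_dict = batch.to_pydict()
        columns = [col.name for col in batch.schema]
        for values in zip(*(batch_dict[name] for name in columns)):
            print(dict(zip(columns, values)))
```
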
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_spec.py
new file mode 100644
index 000000000000..25bc2f18d30c
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_spec.py
@@ -0,0 +1,51 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+from typing import List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class ParquetFormat(BaseModel):
+ 'This connector utilises PyArrow (Apache Arrow) for Parquet parsing.'
+
+ class Config:
+ title = "parquet"
+
+ filetype: str = Field(Config.title, const=True)
+
+ buffer_size: int = Field(
+ default=0,
+ description="Perform read buffering when deserializing individual column chunks. By default every group column will be loaded fully to memory. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
+ )
+
+ columns: Optional[List[str]] = Field(
+ default=None,
+ description="If you only want to sync a subset of the columns from the file(s), add the columns you want here. Leave it empty to sync all columns.",
+ )
+
+ batch_size: int = Field(
+ default=64 * 1024, # 64K records
+ description="Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
+ )
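
These pydantic defaults are what `ParquetParser.__init__` backfills when the user omits options from the format config. A small sketch of that default-filling loop:

```python
from source_s3.source_files_abstract.formats.parquet_spec import ParquetFormat

user_format = {"filetype": "parquet"}  # buffer_size / columns / batch_size omitted

# Same loop as ParquetParser.__init__: fill in pydantic defaults for any missing key.
for field_name, field in ParquetFormat.__fields__.items():
    if user_format.get(field_name) is None:
        user_format[field_name] = field.default

print(user_format)
# {'filetype': 'parquet', 'buffer_size': 0, 'columns': None, 'batch_size': 65536}
```
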
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
index 47870f145ceb..e65915b6af9d 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
@@ -71,7 +71,8 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
- That the path pattern(s) provided in config are valid to be matched against.
:param logger: an instance of AirbyteLogger to use
- :param config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
+ :param config: The user-provided configuration as specified by the source's spec.
+ This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data
source using the provided configuration.
Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong.
@@ -83,7 +84,8 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
for filepath in self.stream_class.filepath_iterator(logger, config.get("provider")):
found_a_file = True
# TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams
- globmatch(filepath, config.get("path_pattern"), flags=GLOBSTAR | SPLIT) # test that matching on the pattern doesn't error
+ # test that matching on the pattern doesn't error
+ globmatch(filepath, config.get("path_pattern"), flags=GLOBSTAR | SPLIT)
break # just need first file here to test connection and valid patterns
except Exception as e:
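
check_connection only needs to confirm that the configured path_pattern is usable, so it globmatches the first discovered file path and breaks. For reference, wcmatch's behaviour with the GLOBSTAR and SPLIT flags used here (the example paths and patterns are made up):

```python
from wcmatch.glob import GLOBSTAR, SPLIT, globmatch

# GLOBSTAR lets ** span directories; SPLIT allows multiple patterns separated by "|".
assert globmatch("csv_tests/simple_test.csv", "**/*.csv", flags=GLOBSTAR | SPLIT)
assert globmatch("data/2021/08/file.parquet", "**/*.csv|**/*.parquet", flags=GLOBSTAR | SPLIT)
assert not globmatch("readme.txt", "**/*.csv|**/*.parquet", flags=GLOBSTAR | SPLIT)
```
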
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py
index a397ea8623f8..abe4029f408b 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py
@@ -26,12 +26,14 @@
import json
import re
from copy import deepcopy
-from enum import Enum
-from typing import Optional, Union
+from typing import Union
from jsonschema import RefResolver
from pydantic import BaseModel, Field
+from .formats.csv_spec import CsvFormat
+from .formats.parquet_spec import ParquetFormat
+
# To implement your provider specific spec, inherit from SourceFilesAbstractSpec and add provider-specific settings e.g.:
# class SourceS3Spec(SourceFilesAbstractSpec, BaseModel):
@@ -60,64 +62,6 @@
# provider: S3Provider = Field(...) # leave this as Field(...), just change type to relevant class
-class CsvFormat(BaseModel):
- class Config:
- title = "csv"
-
- class CsvFiletype(str, Enum):
- """
- This connector utilises PyArrow (Apache Arrow) for CSV parsing.
- """
-
- csv = "csv"
-
- filetype: CsvFiletype
-
- delimiter: str = Field(
- default=",",
- min_length=1,
- description="The character delimiting individual cells in the CSV data. This may only be a 1-character string.",
- )
- quote_char: str = Field(
- default='"', description="The character used optionally for quoting CSV values. To disallow quoting, make this field blank."
- )
- escape_char: Optional[str] = Field(
- default=None,
- description="The character used optionally for escaping special characters. To disallow escaping, leave this field blank.",
- )
- encoding: Optional[str] = Field(
- default=None,
- description='The character encoding of the CSV data. Leave blank to default to UTF-8. See list of python encodings for allowable options.',
- )
- double_quote: bool = Field(default=True, description="Whether two quotes in a quoted CSV value denote a single quote in the data.")
- newlines_in_values: bool = Field(
- default=False,
- description="Whether newline characters are allowed in CSV values. Turning this on may affect performance. Leave blank to default to False.",
- )
- block_size: int = Field(
- default=10000,
- description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
- )
- additional_reader_options: str = Field(
- default="{}",
- description='Optionally add a valid JSON string here to provide additional options to the csv reader. Mappings must correspond to options detailed here. \'column_types\' is used internally to handle schema so overriding that would likely cause problems.',
- examples=[
- '{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]}'
- ],
- )
-
-
-# We need this in as a dummy for now so that format comes out correctly as a oneOf
-class ParquetFormat(BaseModel):
- class Config:
- title = "Coming Soon..."
-
- class ParquetFiletype(str, Enum):
- parquet = "parquet"
-
- filetype: ParquetFiletype
-
-
class SourceFilesAbstractSpec(BaseModel):
dataset: str = Field(
@@ -137,12 +81,14 @@ class SourceFilesAbstractSpec(BaseModel):
examples=['{"column_1": "number", "column_2": "string", "column_3": "array", "column_4": "object", "column_5": "boolean"}'],
)
- format: Union[CsvFormat, ParquetFormat] = Field(default="csv")
+ format: Union[CsvFormat, ParquetFormat] = Field(default=CsvFormat.Config.title)
@staticmethod
def change_format_to_oneOf(schema: dict) -> dict:
- schema["properties"]["format"]["oneOf"] = deepcopy(schema["properties"]["format"]["anyOf"])
schema["properties"]["format"]["type"] = "object"
+ if "oneOf" in schema["properties"]["format"]:
+ return schema
+ schema["properties"]["format"]["oneOf"] = deepcopy(schema["properties"]["format"]["anyOf"])
del schema["properties"]["format"]["anyOf"]
return schema
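
The reordered helper above is now idempotent: pydantic emits the Union[CsvFormat, ParquetFormat] field as anyOf, the method renames it to oneOf once, and a repeated call returns early instead of failing on the already-removed anyOf key. A condensed sketch of the transformation:

```python
from copy import deepcopy

def change_format_to_oneOf(schema: dict) -> dict:
    schema["properties"]["format"]["type"] = "object"
    if "oneOf" in schema["properties"]["format"]:
        return schema  # already converted; calling again is a no-op
    schema["properties"]["format"]["oneOf"] = deepcopy(schema["properties"]["format"]["anyOf"])
    del schema["properties"]["format"]["anyOf"]
    return schema

# anyOf as produced by pydantic for the Union field (branch contents elided)
schema = {"properties": {"format": {"anyOf": [{"title": "csv"}, {"title": "parquet"}]}}}
schema = change_format_to_oneOf(schema)
schema = change_format_to_oneOf(schema)  # second call no longer raises KeyError
print(list(schema["properties"]["format"].keys()))  # ['type', 'oneOf']
```
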
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
index ba90524caa44..836c93d055b3 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
@@ -37,7 +37,8 @@
from airbyte_cdk.sources.streams import Stream
from wcmatch.glob import GLOBSTAR, SPLIT, globmatch
-from .fileformatparser import CsvParser
+from .formats.csv_parser import CsvParser
+from .formats.parquet_parser import ParquetParser
from .storagefile import StorageFile
JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"]
@@ -48,12 +49,14 @@ class ConfigurationError(Exception):
class FileStream(Stream, ABC):
+ @property
+ def fileformatparser_map(self):
+ """Mapping where every key is equal 'filetype' and values are corresponding parser classes."""
+ return {
+ "csv": CsvParser,
+ "parquet": ParquetParser,
+ }
- fileformatparser_map = {
- "csv": CsvParser,
- # 'parquet': ParquetParser,
- # etc.
- }
# TODO: make these user configurable in spec.json
ab_additional_col = "_ab_additional_properties"
ab_last_mod_col = "_ab_source_file_last_modified"
@@ -122,7 +125,13 @@ def fileformatparser_class(self) -> type:
"""
:return: reference to the relevant fileformatparser class e.g. CsvParser
"""
- return self.fileformatparser_map[self._format.get("filetype")]
+ filetype = self._format.get("filetype")
+ file_reader = self.fileformatparser_map.get(self._format.get("filetype"))
+ if not file_reader:
+ raise RuntimeError(
+ f"Detected mismatched file format '{filetype}'. Available values: '{list( self.fileformatparser_map.keys())}''."
+ )
+ return file_reader
@property
@abstractmethod
@@ -181,7 +190,8 @@ def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]:
futures = [executor.submit(get_storagefile_with_lastmod, fp) for fp in filepath_gen]
for future in concurrent.futures.as_completed(futures):
- storagefiles.append(future.result()) # this will failfast on any errors
+ # this will failfast on any errors
+ storagefiles.append(future.result())
# The array storagefiles contain tuples of (last_modified, StorageFile), so sort by last_modified
self.storagefile_cache = sorted(storagefiles, key=itemgetter(0))
@@ -228,6 +238,7 @@ def _get_master_schema(self) -> Mapping[str, Any]:
master_schema = deepcopy(self._schema)
file_reader = self.fileformatparser_class(self._format)
+
# time order isn't necessary here but we might as well use this method so we cache the list for later use
for _, storagefile in self.time_ordered_storagefile_iterator():
with storagefile.open(file_reader.is_binary) as f:
@@ -412,7 +423,7 @@ def stream_slices(
we yield the stream_slice containing file(s) up to and EXcluding the file on the current iteration.
The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice
"""
- if sync_mode.value == "full_refresh":
+ if sync_mode == SyncMode.full_refresh:
yield from super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)
else:
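
With fileformatparser_map exposed as a property covering both parsers, stream setup reduces to a dictionary lookup keyed on the config's filetype plus an explicit error for anything unknown. A simplified sketch of that dispatch (pick_parser_class is an illustrative stand-in for FileStream.fileformatparser_class):

```python
from source_s3.source_files_abstract.formats.csv_parser import CsvParser
from source_s3.source_files_abstract.formats.parquet_parser import ParquetParser


def pick_parser_class(format_config: dict) -> type:
    """Illustrative stand-in for FileStream.fileformatparser_class."""
    parser_map = {"csv": CsvParser, "parquet": ParquetParser}
    filetype = format_config.get("filetype")
    parser_class = parser_map.get(filetype)
    if parser_class is None:
        raise RuntimeError(f"Detected mismatched file format '{filetype}'. Available values: {list(parser_map.keys())}.")
    return parser_class


format_config = {"filetype": "parquet", "batch_size": 65536}
file_reader = pick_parser_class(format_config)(format_config)  # -> ParquetParser instance
```
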
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/__init__.py b/airbyte-integrations/connectors/source-s3/unit_tests/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/abstract_test_parser.py b/airbyte-integrations/connectors/source-s3/unit_tests/abstract_test_parser.py
new file mode 100644
index 000000000000..b1a7fb659c17
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/abstract_test_parser.py
@@ -0,0 +1,75 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+from abc import ABC, abstractmethod
+from typing import Any, List, Mapping
+
+import pytest
+from airbyte_cdk import AirbyteLogger
+from smart_open import open as smart_open
+
+
+class AbstractTestParser(ABC):
+ """ Prefix this class with Abstract so the tests don't run here but only in the children """
+
+ logger = AirbyteLogger()
+
+ @property
+ @abstractmethod
+ def test_files(self) -> List[Mapping[str, Any]]:
+ """return a list of test_file dicts in structure:
+ [
+ {"AbstractFileParser": CsvParser(format, master_schema), "filepath": "...", "num_records": 5, "inferred_schema": {...}, line_checks:{}, fails: []},
+ {"AbstractFileParser": CsvParser(format, master_schema), "filepath": "...", "num_records": 16, "inferred_schema": {...}, line_checks:{}, fails: []}
+ ]
+ note: line_checks index is 1-based to align with row numbers
+ """
+
+ def _get_readmode(self, test_name, test_file):
+ self.logger.info(f"testing {test_name}() with {test_file.get('test_alias', test_file['filepath'].split('/')[-1])} ...")
+ return "rb" if test_file["AbstractFileParser"].is_binary else "r"
+
+ def test_get_inferred_schema(self):
+ for test_file in self.test_files:
+ with smart_open(test_file["filepath"], self._get_readmode("get_inferred_schema", test_file)) as f:
+ if "test_get_inferred_schema" in test_file["fails"]:
+ with pytest.raises(Exception) as e_info:
+ test_file["AbstractFileParser"].get_inferred_schema(f)
+ self.logger.debug(str(e_info))
+ else:
+ assert test_file["AbstractFileParser"].get_inferred_schema(f) == test_file["inferred_schema"]
+
+ def test_stream_records(self):
+ for test_file in self.test_files:
+ with smart_open(test_file["filepath"], self._get_readmode("stream_records", test_file)) as f:
+ if "test_stream_records" in test_file["fails"]:
+ with pytest.raises(Exception) as e_info:
+ [print(r) for r in test_file["AbstractFileParser"].stream_records(f)]
+ self.logger.debug(str(e_info))
+ else:
+ records = [r for r in test_file["AbstractFileParser"].stream_records(f)]
+
+ assert len(records) == test_file["num_records"]
+ for index, expected_record in test_file["line_checks"].items():
+ assert records[index - 1] == expected_record
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_abstract_file_parser.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_abstract_file_parser.py
new file mode 100644
index 000000000000..b7a5dc4fa7c9
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_abstract_file_parser.py
@@ -0,0 +1,131 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+import pyarrow as pa
+import pytest
+from airbyte_cdk import AirbyteLogger
+from source_s3.source_files_abstract.formats.abstract_file_parser import AbstractFileParser
+
+LOGGER = AirbyteLogger()
+
+
+class TestAbstractFileParserStatics:
+ @pytest.mark.parametrize( # testing all datatypes as laid out here: https://json-schema.org/understanding-json-schema/reference/type.html
+ "input_json_type, output_pyarrow_type",
+ [
+ ("string", pa.large_string()),
+ ("number", pa.float64()),
+ ("integer", pa.int64()),
+ ("object", pa.large_string()),
+ ("array", pa.large_string()),
+ ("boolean", pa.bool_()),
+ ("null", pa.large_string()),
+ ],
+ )
+ def test_json_type_to_pyarrow_type(self, input_json_type, output_pyarrow_type):
+ # Json -> PyArrow direction
+ LOGGER.info(f"asserting that JSON type '{input_json_type}' converts to PyArrow type '{output_pyarrow_type}'...")
+ assert AbstractFileParser.json_type_to_pyarrow_type(input_json_type) == output_pyarrow_type
+
+ @pytest.mark.parametrize( # testing all datatypes as laid out here: https://arrow.apache.org/docs/python/api/datatypes.html
+ "input_pyarrow_types, output_json_type",
+ [
+ ((pa.null(),), "string"), # null type
+ ((pa.bool_(),), "boolean"), # boolean type
+ (
+ (pa.int8(), pa.int16(), pa.int32(), pa.int64(), pa.uint8(), pa.uint16(), pa.uint32(), pa.uint64()),
+ "integer",
+ ), # integer types
+ ((pa.float16(), pa.float32(), pa.float64(), pa.decimal128(5, 10), pa.decimal256(3, 8)), "number"), # number types
+ ((pa.time32("s"), pa.time64("ns"), pa.timestamp("ms"), pa.date32(), pa.date64()), "string"), # temporal types
+ ((pa.binary(), pa.large_binary()), "string"), # binary types
+ ((pa.string(), pa.utf8(), pa.large_string(), pa.large_utf8()), "string"), # string types
+ ((pa.list_(pa.string()), pa.large_list(pa.timestamp("us"))), "string"), # array types
+ ((pa.map_(pa.string(), pa.float32()), pa.dictionary(pa.int16(), pa.list_(pa.string()))), "string"), # object types
+ ],
+ )
+ def test_json_type_to_pyarrow_type_reverse(self, input_pyarrow_types, output_json_type):
+ # PyArrow -> Json direction (reverse=True)
+ for typ in input_pyarrow_types:
+ LOGGER.info(f"asserting that PyArrow type '{typ}' converts to JSON type '{output_json_type}'...")
+ assert AbstractFileParser.json_type_to_pyarrow_type(typ, reverse=True) == output_json_type
+
+ @pytest.mark.parametrize( # if expecting fail, put pyarrow_schema as None
+ "json_schema, pyarrow_schema",
+ [
+ (
+ {"a": "string", "b": "number", "c": "integer", "d": "object", "e": "array", "f": "boolean", "g": "null"},
+ {
+ "a": pa.large_string(),
+ "b": pa.float64(),
+ "c": pa.int64(),
+ "d": pa.large_string(),
+ "e": pa.large_string(),
+ "f": pa.bool_(),
+ "g": pa.large_string(),
+ },
+ ),
+ ({"single_column": "object"}, {"single_column": pa.large_string()}),
+ ({}, {}),
+ ({"a": "NOT A REAL TYPE", "b": "another fake type"}, {"a": pa.large_string(), "b": pa.large_string()}),
+ (["string", "object"], None), # bad input type
+ ],
+ )
+ def test_json_schema_to_pyarrow_schema(self, json_schema, pyarrow_schema):
+ # Json -> PyArrow direction
+ if pyarrow_schema is not None:
+ assert AbstractFileParser.json_schema_to_pyarrow_schema(json_schema) == pyarrow_schema
+ else:
+ with pytest.raises(Exception) as e_info:
+ AbstractFileParser.json_schema_to_pyarrow_schema(json_schema)
+ LOGGER.debug(str(e_info))
+
+ @pytest.mark.parametrize( # if expecting fail, put json_schema as None
+ "pyarrow_schema, json_schema",
+ [
+ (
+ {
+ "a": pa.utf8(),
+ "b": pa.float16(),
+ "c": pa.uint32(),
+ "d": pa.map_(pa.string(), pa.float32()),
+ "e": pa.bool_(),
+ "f": pa.date64(),
+ },
+ {"a": "string", "b": "number", "c": "integer", "d": "string", "e": "boolean", "f": "string"},
+ ),
+ ({"single_column": pa.int32()}, {"single_column": "integer"}),
+ ({}, {}),
+ ({"a": "NOT A REAL TYPE", "b": "another fake type"}, {"a": "string", "b": "string"}),
+ (["string", "object"], None), # bad input type
+ ],
+ )
+ def test_json_schema_to_pyarrow_schema_reverse(self, pyarrow_schema, json_schema):
+ # PyArrow -> Json direction (reverse=True)
+ if json_schema is not None:
+ assert AbstractFileParser.json_schema_to_pyarrow_schema(pyarrow_schema, reverse=True) == json_schema
+ else:
+ with pytest.raises(Exception) as e_info:
+ AbstractFileParser.json_schema_to_pyarrow_schema(pyarrow_schema, reverse=True)
+ LOGGER.debug(str(e_info))
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_fileformatparser.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py
similarity index 56%
rename from airbyte-integrations/connectors/source-s3/unit_tests/test_fileformatparser.py
rename to airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py
index eb3fa314a0e1..035e536c344e 100644
--- a/airbyte-integrations/connectors/source-s3/unit_tests/test_fileformatparser.py
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py
@@ -22,172 +22,24 @@
# SOFTWARE.
#
-
import os
-from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, List, Mapping
-import pyarrow as pa
-import pytest
-from airbyte_cdk import AirbyteLogger
-from smart_open import open as smart_open
-from source_s3.source_files_abstract.fileformatparser import CsvParser, FileFormatParser
-
-LOGGER = AirbyteLogger()
-SAMPLE_DIRECTORY = Path(__file__).resolve().parent.joinpath("sample_files/")
-
-
-class TestFileFormatParserStatics:
- @pytest.mark.parametrize( # testing all datatypes as laid out here: https://json-schema.org/understanding-json-schema/reference/type.html
- "input_json_type, output_pyarrow_type",
- [
- ("string", pa.large_string()),
- ("number", pa.float64()),
- ("integer", pa.int64()),
- ("object", pa.large_string()),
- ("array", pa.large_string()),
- ("boolean", pa.bool_()),
- ("null", pa.large_string()),
- ],
- )
- def test_json_type_to_pyarrow_type(self, input_json_type, output_pyarrow_type):
- # Json -> PyArrow direction
- LOGGER.info(f"asserting that JSON type '{input_json_type}' converts to PyArrow type '{output_pyarrow_type}'...")
- assert FileFormatParser.json_type_to_pyarrow_type(input_json_type) == output_pyarrow_type
-
- @pytest.mark.parametrize( # testing all datatypes as laid out here: https://arrow.apache.org/docs/python/api/datatypes.html
- "input_pyarrow_types, output_json_type",
- [
- ((pa.null(),), "string"), # null type
- ((pa.bool_(),), "boolean"), # boolean type
- (
- (pa.int8(), pa.int16(), pa.int32(), pa.int64(), pa.uint8(), pa.uint16(), pa.uint32(), pa.uint64()),
- "integer",
- ), # integer types
- ((pa.float16(), pa.float32(), pa.float64(), pa.decimal128(5, 10), pa.decimal256(3, 8)), "number"), # number types
- ((pa.time32("s"), pa.time64("ns"), pa.timestamp("ms"), pa.date32(), pa.date64()), "string"), # temporal types
- ((pa.binary(), pa.large_binary()), "string"), # binary types
- ((pa.string(), pa.utf8(), pa.large_string(), pa.large_utf8()), "string"), # string types
- ((pa.list_(pa.string()), pa.large_list(pa.timestamp("us"))), "string"), # array types
- ((pa.map_(pa.string(), pa.float32()), pa.dictionary(pa.int16(), pa.list_(pa.string()))), "string"), # object types
- ],
- )
- def test_json_type_to_pyarrow_type_reverse(self, input_pyarrow_types, output_json_type):
- # PyArrow -> Json direction (reverse=True)
- for typ in input_pyarrow_types:
- LOGGER.info(f"asserting that PyArrow type '{typ}' converts to JSON type '{output_json_type}'...")
- assert FileFormatParser.json_type_to_pyarrow_type(typ, reverse=True) == output_json_type
-
- @pytest.mark.parametrize( # if expecting fail, put pyarrow_schema as None
- "json_schema, pyarrow_schema",
- [
- (
- {"a": "string", "b": "number", "c": "integer", "d": "object", "e": "array", "f": "boolean", "g": "null"},
- {
- "a": pa.large_string(),
- "b": pa.float64(),
- "c": pa.int64(),
- "d": pa.large_string(),
- "e": pa.large_string(),
- "f": pa.bool_(),
- "g": pa.large_string(),
- },
- ),
- ({"single_column": "object"}, {"single_column": pa.large_string()}),
- ({}, {}),
- ({"a": "NOT A REAL TYPE", "b": "another fake type"}, {"a": pa.large_string(), "b": pa.large_string()}),
- (["string", "object"], None), # bad input type
- ],
- )
- def test_json_schema_to_pyarrow_schema(self, json_schema, pyarrow_schema):
- # Json -> PyArrow direction
- if pyarrow_schema is not None:
- assert FileFormatParser.json_schema_to_pyarrow_schema(json_schema) == pyarrow_schema
- else:
- with pytest.raises(Exception) as e_info:
- FileFormatParser.json_schema_to_pyarrow_schema(json_schema)
- LOGGER.debug(str(e_info))
-
- @pytest.mark.parametrize( # if expecting fail, put json_schema as None
- "pyarrow_schema, json_schema",
- [
- (
- {
- "a": pa.utf8(),
- "b": pa.float16(),
- "c": pa.uint32(),
- "d": pa.map_(pa.string(), pa.float32()),
- "e": pa.bool_(),
- "f": pa.date64(),
- },
- {"a": "string", "b": "number", "c": "integer", "d": "string", "e": "boolean", "f": "string"},
- ),
- ({"single_column": pa.int32()}, {"single_column": "integer"}),
- ({}, {}),
- ({"a": "NOT A REAL TYPE", "b": "another fake type"}, {"a": "string", "b": "string"}),
- (["string", "object"], None), # bad input type
- ],
- )
- def test_json_schema_to_pyarrow_schema_reverse(self, pyarrow_schema, json_schema):
- # PyArrow -> Json direction (reverse=True)
- if json_schema is not None:
- assert FileFormatParser.json_schema_to_pyarrow_schema(pyarrow_schema, reverse=True) == json_schema
- else:
- with pytest.raises(Exception) as e_info:
- FileFormatParser.json_schema_to_pyarrow_schema(pyarrow_schema, reverse=True)
- LOGGER.debug(str(e_info))
-
+from source_s3.source_files_abstract.formats.csv_parser import CsvParser
-class AbstractTestFileFormatParser(ABC):
- """ Prefix this class with Abstract so the tests don't run here but only in the children """
+from .abstract_test_parser import AbstractTestParser
- @property
- @abstractmethod
- def test_files(self) -> List[Mapping[str, Any]]:
- """return a list of test_file dicts in structure:
- [
- {"fileformatparser": CsvParser(format, master_schema), "filepath": "...", "num_records": 5, "inferred_schema": {...}, line_checks:{}, fails: []},
- {"fileformatparser": CsvParser(format, master_schema), "filepath": "...", "num_records": 16, "inferred_schema": {...}, line_checks:{}, fails: []}
- ]
- note: line_checks index is 1-based to align with row numbers
- """
-
- def _get_readmode(self, test_name, test_file):
- LOGGER.info(f"testing {test_name}() with {test_file.get('test_alias', test_file['filepath'].split('/')[-1])} ...")
- return "rb" if test_file["fileformatparser"].is_binary else "r"
-
- def test_get_inferred_schema(self):
- for test_file in self.test_files:
- with smart_open(test_file["filepath"], self._get_readmode("get_inferred_schema", test_file)) as f:
- if "test_get_inferred_schema" in test_file["fails"]:
- with pytest.raises(Exception) as e_info:
- test_file["fileformatparser"].get_inferred_schema(f)
- LOGGER.debug(str(e_info))
- else:
- assert test_file["fileformatparser"].get_inferred_schema(f) == test_file["inferred_schema"]
-
- def test_stream_records(self):
- for test_file in self.test_files:
- with smart_open(test_file["filepath"], self._get_readmode("stream_records", test_file)) as f:
- if "test_stream_records" in test_file["fails"]:
- with pytest.raises(Exception) as e_info:
- [print(r) for r in test_file["fileformatparser"].stream_records(f)]
- LOGGER.debug(str(e_info))
- else:
- records = [r for r in test_file["fileformatparser"].stream_records(f)]
- assert len(records) == test_file["num_records"]
- for index, expected_record in test_file["line_checks"].items():
- assert records[index - 1] == expected_record
+SAMPLE_DIRECTORY = Path(__file__).resolve().parent.joinpath("sample_files/")
-class TestCsvParser(AbstractTestFileFormatParser):
+class TestCsvParser(AbstractTestParser):
@property
def test_files(self) -> List[Mapping[str, Any]]:
return [
{
# basic 'normal' test
- "fileformatparser": CsvParser(
+ "AbstractFileParser": CsvParser(
format={"filetype": "csv"},
master_schema={
"id": "integer",
@@ -216,7 +68,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
{
# tests custom CSV parameters (odd delimiter, quote_char, escape_char & newlines in values in the file)
"test_alias": "custom csv parameters",
- "fileformatparser": CsvParser(
+ "AbstractFileParser": CsvParser(
format={"filetype": "csv", "delimiter": "^", "quote_char": "|", "escape_char": "!", "newlines_in_values": True},
master_schema={
"id": "integer",
@@ -245,7 +97,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
{
# tests encoding: Big5
"test_alias": "encoding: Big5",
- "fileformatparser": CsvParser(
+ "AbstractFileParser": CsvParser(
format={"filetype": "csv", "encoding": "big5"}, master_schema={"id": "integer", "name": "string", "valid": "boolean"}
),
"filepath": os.path.join(SAMPLE_DIRECTORY, "csv/test_file_3_enc_Big5.csv"),
@@ -263,7 +115,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
{
# tests encoding: Arabic (Windows 1256)
"test_alias": "encoding: Arabic (Windows 1256)",
- "fileformatparser": CsvParser(
+ "AbstractFileParser": CsvParser(
format={"filetype": "csv", "encoding": "windows-1256"},
master_schema={"id": "integer", "notes": "string", "valid": "boolean"},
),
@@ -282,7 +134,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
{
# tests compression: gzip
"test_alias": "compression: gzip",
- "fileformatparser": CsvParser(
+ "AbstractFileParser": CsvParser(
format={"filetype": "csv"},
master_schema={
"id": "integer",
@@ -321,7 +173,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
{
# tests compression: bz2
"test_alias": "compression: bz2",
- "fileformatparser": CsvParser(
+ "AbstractFileParser": CsvParser(
format={"filetype": "csv"},
master_schema={
"id": "integer",
@@ -360,7 +212,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
{
# tests extra columns in master schema
"test_alias": "extra columns in master schema",
- "fileformatparser": CsvParser(
+ "AbstractFileParser": CsvParser(
format={"filetype": "csv"},
master_schema={
"EXTRA_COLUMN_1": "boolean",
@@ -392,7 +244,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
# tests missing columns in master schema
# TODO: maybe this should fail read_records, but it does pick up all the columns from file despite missing from master schema
"test_alias": "missing columns in master schema",
- "fileformatparser": CsvParser(format={"filetype": "csv"}, master_schema={"id": "integer", "name": "string"}),
+ "AbstractFileParser": CsvParser(format={"filetype": "csv"}, master_schema={"id": "integer", "name": "string"}),
"filepath": os.path.join(SAMPLE_DIRECTORY, "csv/test_file_1.csv"),
"num_records": 8,
"inferred_schema": {
@@ -410,7 +262,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
{
# tests empty file, SHOULD FAIL INFER & STREAM RECORDS
"test_alias": "empty csv file",
- "fileformatparser": CsvParser(format={"filetype": "csv"}, master_schema={}),
+ "AbstractFileParser": CsvParser(format={"filetype": "csv"}, master_schema={}),
"filepath": os.path.join(SAMPLE_DIRECTORY, "csv/test_file_6_empty.csv"),
"num_records": 0,
"inferred_schema": {},
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_parquet_parser.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_parquet_parser.py
new file mode 100644
index 000000000000..b2f34f6c9ece
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_parquet_parser.py
@@ -0,0 +1,317 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+import bz2
+import copy
+import gzip
+import os
+import random
+import shutil
+import sys
+import tempfile
+from pathlib import Path
+from typing import Any, List, Mapping
+
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+from source_s3.source_files_abstract.formats.parquet_parser import PARQUET_TYPES, ParquetParser
+
+from .abstract_test_parser import AbstractTestParser
+
+SAMPLE_DIRECTORY = Path(__file__).resolve().parent.joinpath("sample_files/")
+
+
+class TestParquetParser(AbstractTestParser):
+ filetype = "parquet"
+
+ def _save_parquet_file(self, filename: str, columns: List[str], rows: List[List[Any]]) -> str:
+ data = {}
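+        # transpose the row-wise values into a {column_name: [values...]} mapping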
+ for col_values in zip(columns, *rows):
+ data[col_values[0]] = list(col_values[1:])
+
+ if rows:
+ df = pd.DataFrame(data)
+ table = pa.Table.from_pandas(df)
+ else:
+ table = pa.Table.from_arrays([])
+
+ pq.write_table(table, filename)
+ return filename
+
+ def generate_parquet_file(
+ self, name: str, columns: Mapping[str, str], num_rows: int, custom_rows: Mapping[int, Mapping[str, Any]] = None
+ ) -> str:
+ """Generates a random data and save it to a tmp file"""
+ filename = os.path.join(self.tmp_folder, name + "." + self.filetype)
+ if os.path.exists(filename):
+ return filename
+
+ types = list(columns.values()) if num_rows else []
+ rows = [self._generate_row(types) for _ in range(num_rows)]
+ for n, custom_row in (custom_rows or {}).items():
+ rows[n] = custom_row
+ return self._save_parquet_file(filename, list(columns.keys()) if num_rows else [], rows)
+
+ @classmethod
+ def _generate_row(cls, types: List[str]) -> List[Any]:
+ """Generates random values with request types"""
+ row = []
+ for needed_type in types:
+ for json_type in PARQUET_TYPES.values():
+ if json_type == needed_type:
+ row.append(cls._generate_value(needed_type))
+ break
+ return row
+
+ @classmethod
+ def _generate_value(cls, typ: str) -> Any:
+ if typ not in ["boolean", "integer"] and cls._generate_value("boolean"):
+            # return None for roughly a third of non-boolean/non-integer values
+ return None
+
+ if typ == "number":
+ while True:
+ int_value = cls._generate_value("integer")
+ if int_value:
+ break
+ return float(int_value) + random.random()
+ elif typ == "integer":
+ return random.randint(-sys.maxsize - 1, sys.maxsize)
+ # return random.randint(0, 1000)
+ elif typ == "boolean":
+ return random.choice([True, False, None])
+ elif typ == "string":
+            random_length = random.randint(0, 10 * 1024)  # up to 10 KiB of random bytes
+ return os.urandom(random_length)
+
+ raise Exception(f"not supported type: {typ}")
+
+ @property
+ def tmp_folder(self):
+ return os.path.join(tempfile.gettempdir(), self.__class__.__name__)
+
+ def compress(self, archive_name: str, filename: str) -> str:
+ compress_filename = f"{filename}.{archive_name}"
+ with open(filename, "rb") as f_in:
+ if archive_name == "gz":
+ with gzip.open(compress_filename, "wb") as f_out:
+ shutil.copyfileobj(f_in, f_out)
+ elif archive_name == "bz2":
+ with bz2.open(compress_filename, "wb") as f_out:
+ shutil.copyfileobj(f_in, f_out)
+ return compress_filename
+
+ @pytest.fixture(autouse=True)
+ def prepare_tmp_folder(self):
+        # create the tmp folder and remove it after the tests
+ os.makedirs(self.tmp_folder, exist_ok=True)
+ self.logger.info(f"create the tmp folder: {self.tmp_folder}")
+ yield
+ self.logger.info(f"remove the tmp folder: {self.tmp_folder}")
+ shutil.rmtree(self.tmp_folder, ignore_errors=True)
+
+ @property
+ def test_files(self) -> List[Mapping[str, Any]]:
+ schema = {
+ "id": "integer",
+ "name": "string",
+ "valid": "boolean",
+ "code": "integer",
+ "degrees": "number",
+ "birthday": "string",
+ "last_seen": "string",
+ }
+ suite = []
+ # basic 'normal' test
+ num_records = 10
+ params = {"filetype": self.filetype}
+ suite.append(
+ {
+ "test_alias": "basic 'normal' test",
+ "AbstractFileParser": ParquetParser(format=params, master_schema=schema),
+ "filepath": self.generate_parquet_file("normal_test", schema, num_records),
+ "num_records": num_records,
+ "inferred_schema": schema,
+ "line_checks": {},
+ "fails": [],
+ }
+ )
+ # tests custom Parquet parameters (row_groups, batch_size etc)
+ params = {
+ "filetype": self.filetype,
+ "buffer_size": 1024,
+ "columns": ["id", "name", "last_seen"],
+ "batch_size": 10,
+ }
+ num_records = 100
+ suite.append(
+ {
+ "test_alias": "custom parquet parameters",
+ "filepath": self.generate_parquet_file("normal_params_test", schema, num_records),
+ "num_records": num_records,
+ "AbstractFileParser": ParquetParser(
+ format=params,
+ master_schema=schema,
+ ),
+ "inferred_schema": schema,
+ "line_checks": {},
+ "fails": [],
+ }
+ )
+
+ # tests a big parquet file (100K records)
+ params = {
+ "filetype": self.filetype,
+ "batch_size": 10,
+ }
+ num_records = 100000
+ suite.append(
+ {
+ "test_alias": "big parquet file",
+ "filepath": self.generate_parquet_file("big_parquet_file", schema, num_records),
+ "num_records": num_records,
+ "AbstractFileParser": ParquetParser(
+ format=params,
+ master_schema=schema,
+ ),
+ "inferred_schema": schema,
+ "line_checks": {},
+ "fails": [],
+ }
+ )
+ # check one record
+ params = {"filetype": self.filetype}
+ num_records = 20
+ test_record = {
+ "id": 7,
+ "name": self._generate_value("string"),
+ "valid": False,
+ "code": 10,
+ "degrees": -9.2,
+ "birthday": self._generate_value("string"),
+ "last_seen": self._generate_value("string"),
+ }
+
+ suite.append(
+ {
+ "test_alias": "check one record",
+ "filepath": self.generate_parquet_file(
+ "check_one_record", schema, num_records, custom_rows={7: list(test_record.values())}
+ ),
+ "num_records": num_records,
+ "AbstractFileParser": ParquetParser(
+ format=params,
+ master_schema=schema,
+ ),
+ "inferred_schema": schema,
+ "line_checks": {8: test_record},
+ "fails": [],
+ }
+ )
+
+ # extra columns in master schema
+ params = {"filetype": self.filetype}
+ num_records = 10
+ extra_schema = copy.deepcopy(schema)
+ extra_schema.update(
+ {
+ "extra_id": "integer",
+ "extra_name": "string",
+ }
+ )
+ suite.append(
+ {
+ "test_alias": "extra columns in master schema",
+ "filepath": self.generate_parquet_file("normal_test", schema, num_records),
+ "num_records": num_records,
+ "AbstractFileParser": ParquetParser(
+ format=params,
+ master_schema=extra_schema,
+ ),
+ "inferred_schema": schema,
+ "line_checks": {},
+ "fails": [],
+ }
+ )
+ # tests missing columns in master schema
+ params = {"filetype": self.filetype}
+ num_records = 10
+ simplified_schema = copy.deepcopy(schema)
+ simplified_schema.pop("id")
+ simplified_schema.pop("name")
+
+ suite.append(
+ {
+ "test_alias": "tests missing columns in master schema",
+ "filepath": self.generate_parquet_file("normal_test", schema, num_records),
+ "num_records": num_records,
+ "AbstractFileParser": ParquetParser(
+ format=params,
+ master_schema=simplified_schema,
+ ),
+ "inferred_schema": schema,
+ "line_checks": {},
+ "fails": [],
+ }
+ )
+ # tests empty file, SHOULD FAIL INFER & STREAM RECORDS
+ num_records = 0
+ suite.append(
+ {
+ "test_alias": "empty file",
+ "filepath": self.generate_parquet_file("empty_file", schema, num_records),
+ "num_records": num_records,
+ "AbstractFileParser": ParquetParser(
+ format=params,
+ master_schema={},
+ ),
+ "inferred_schema": schema,
+ "line_checks": {},
+ "fails": ["test_get_inferred_schema", "test_stream_records"],
+ }
+ )
+
+        # tests compression: gzip & bz2
+ num_records = 10
+ for archive_type in ["gz", "bz2"]:
+ suite.append(
+ {
+ "test_alias": f"compression: {archive_type}",
+ "filepath": self.compress(
+ archive_type,
+ self.generate_parquet_file("compression_test", schema, num_records, custom_rows={7: list(test_record.values())}),
+ ),
+ "num_records": num_records,
+ "AbstractFileParser": ParquetParser(
+ format=params,
+ master_schema=schema,
+ ),
+ "inferred_schema": schema,
+ "line_checks": {8: test_record},
+ "fails": [],
+ }
+ )
+ return suite
diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md
index 403f7855340d..6decfc026854 100644
--- a/docs/integrations/sources/s3.md
+++ b/docs/integrations/sources/s3.md
@@ -53,12 +53,12 @@ File Formats are mostly enabled \(and further tested\) thanks to other open-sour
| Format | Supported? |
| :--- | :--- |
| CSV | Yes |
+| Parquet | Yes |
| JSON | No |
| HTML | No |
| XML | No |
| Excel | No |
| Feather | No |
-| Parquet | No |
| Pickle | No |
We're looking to enable these other formats very soon, so watch this space!
@@ -101,6 +101,7 @@ Some example patterns:
- `**/file.*|**/file` : match every file called "file" with any extension (or no extension).
- `x/*/y/*` : match all files that sit in folder x -> any folder -> folder y.
- `**/prefix*.csv` : match all csv files with specific prefix.
+- `**/prefix*.parquet` : match all parquet files with specific prefix.
Let's look at a specific example, matching the following bucket layout:
@@ -157,12 +158,13 @@ For example:
- `path_prefix` : an optional string that limits the files returned by AWS when listing files to only those starting with this prefix. This is different from path_pattern as it gets pushed down to the API call made to S3 rather than filtered in Airbyte, and it does not accept pattern-style symbols (like wildcards `*`). We recommend using this if your bucket has many folders and files that are unrelated to this stream and all the relevant files will always sit under this chosen prefix.
### File Format Settings
+The Reader in charge of loading the file format is currently based on [PyArrow](https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html) (Apache Arrow).
+Note that all files within one stream must adhere to the same read options for the chosen format.
#### CSV
-The Reader in charge of loading the file format is currently based on [PyArrow](https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html) (Apache Arrow). Since CSV files are effectively plain text, providing specific reader options is often required for correct parsing of the files.
-
-Note that all files within one stream must adhere to the same CSV read options provided. These settings are applied when a CSV is created or exported so please ensure that this process happens consistently over time.
+Since CSV files are effectively plain text, providing specific reader options is often required for correct parsing of the files.
+These settings are applied when a CSV is created or exported, so please ensure that this process happens consistently over time.
- `delimiter` : Even though CSV is an acronym for Comma Separated Values, it is used more generally as a term for flat file data that may or may not be comma separated. The delimiter field lets you specify which character acts as the separator.
- `quote_char` : In some cases, data values may contain instances of reserved characters (like a comma, if that's the delimiter). CSVs can allow this behaviour by wrapping a value in defined quote characters so that on read it can parse it correctly.
@@ -178,10 +180,22 @@ The final setting in the UI is `additional_reader_options`. This is a catch-all
You can find details on [available options here](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions).
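+
+For orientation, here is a minimal sketch (not the connector's actual code) of how these CSV settings roughly map onto PyArrow's streaming CSV reader; the file name and option values below are purely illustrative:
+
+```python
+import pyarrow.csv as pa_csv
+
+# Hypothetical values standing in for a stream's configured settings.
+parse_options = pa_csv.ParseOptions(delimiter="|", quote_char='"', escape_char="\\", newlines_in_values=False)
+read_options = pa_csv.ReadOptions(block_size=10000, encoding="utf8")
+# The catch-all `additional_reader_options` corresponds to ConvertOptions (see link above).
+convert_options = pa_csv.ConvertOptions(strings_can_be_null=True, null_values=["", "NA", "NULL"])
+
+with open("example.csv", "rb") as f:
+    reader = pa_csv.open_csv(f, read_options=read_options, parse_options=parse_options, convert_options=convert_options)
+    for batch in reader:  # each batch is a pyarrow RecordBatch
+        print(batch.to_pydict())
+```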
+#### Parquet
+Apache Parquet is a column-oriented data storage format of the Apache Hadoop ecosystem. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk. For now, the connector iterates through individual files at the abstract level, so partitioned Parquet datasets are unsupported.
+The following settings are available:
+
+- `buffer_size` : If positive, perform read buffering when deserializing individual column chunks. Otherwise, IO calls are unbuffered.
+- `columns` : If not None, only these columns will be read from the file.
+- `batch_size` : Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file.
+
+
+You can find more details [here](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html#pyarrow.parquet.ParquetFile.iter_batches).
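+
+As an illustrative sketch only (not the connector's actual code), these settings roughly correspond to PyArrow's `ParquetFile` API; the file name and values below are hypothetical:
+
+```python
+import pyarrow.parquet as pq
+
+# buffer_size applies when opening the file; columns and batch_size when iterating batches.
+reader = pq.ParquetFile("example.parquet", buffer_size=1024)
+for batch in reader.iter_batches(batch_size=10, columns=["id", "name", "last_seen"]):
+    print(batch.to_pydict())  # each batch holds at most `batch_size` records
+```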
+
## Changelog
| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
+| 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Added support for Parquet format |
| 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference |
| 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly |
| 0.1.1 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990/commits/ff5f70662c5f84eabc03526cddfcc9d73c58c0f4) | Fixed documentation url in source definition |
diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh
index 746c970ba9d3..718ad66cd697 100755
--- a/tools/bin/ci_credentials.sh
+++ b/tools/bin/ci_credentials.sh
@@ -102,6 +102,7 @@ write_standard_creds source-recharge "$RECHARGE_INTEGRATION_TEST_CREDS"
write_standard_creds source-recurly "$SOURCE_RECURLY_INTEGRATION_TEST_CREDS"
write_standard_creds source-redshift "$AWS_REDSHIFT_INTEGRATION_TEST_CREDS"
write_standard_creds source-s3 "$SOURCE_S3_TEST_CREDS"
+write_standard_creds source-s3 "$SOURCE_S3_PARQUET_CREDS" "parquet_config.json"
write_standard_creds source-salesforce-singer "$SALESFORCE_INTEGRATION_TESTS_CREDS"
write_standard_creds source-sendgrid "$SENDGRID_INTEGRATION_TEST_CREDS"
write_standard_creds source-shopify "$SHOPIFY_INTEGRATION_TEST_CREDS"