diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 317bd8d680d3..0fd53f4ad86b 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -34,6 +34,7 @@ jobs: aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }} + ec2-image-id: ami-0d648081937c75a73 publish-image: needs: start-publish-image-runner runs-on: ${{ needs.start-publish-image-runner.outputs.label }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 3a5b1359227c..6c6f98940e34 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -33,6 +33,7 @@ jobs: aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }} + ec2-image-id: ami-0d648081937c75a73 integration-test: timeout-minutes: 240 needs: start-test-runner diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json index f8417079b2c0..c126e4b0a1a8 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2", "name": "S3", "dockerRepository": "airbyte/source-s3", - "dockerImageTag": "0.1.5", + "dockerImageTag": "0.1.6", "documentationUrl": "https://docs.airbyte.io/integrations/sources/s3" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index ee726139dff8..774a99480e4c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -85,7 +85,7 @@ - sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 name: S3 dockerRepository: airbyte/source-s3 - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/sources/s3 sourceType: file - sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87 diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 0dc3dde2ff6e..21ffd014d63c 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,7 +17,7 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index b826eb56aa83..9c8cb99621ad 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -93,6 +93,15 @@ "{\"timestamp_parsers\": [\"%m/%d/%Y %H:%M\", \"%Y/%m/%d %H:%M\"], 
\"strings_can_be_null\": true, \"null_values\": [\"NA\", \"NULL\"]}" ], "type": "string" + }, + "advanced_options": { + "title": "Advanced Options", + "description": "Optionally add a valid JSON string here to provide additional Pyarrow ReadOptions. Specify 'column_names' here if your CSV doesn't have a header, or if you want to use custom column names. 'block_size' and 'encoding' are already used above; specifying them again here will override the values above.", + "default": "{}", + "examples": [ + "{\"column_names\": [\"column1\", \"column2\"]}" + ], + "type": "string" + } } }, diff --git a/airbyte-integrations/connectors/source-s3/setup.py b/airbyte-integrations/connectors/source-s3/setup.py index ae62d1616611..b2d43ccb3600 100644 --- a/airbyte-integrations/connectors/source-s3/setup.py +++ b/airbyte-integrations/connectors/source-s3/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1.7", + "airbyte-cdk~=0.1.28", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py index c16794c493c0..08e8a13a0919 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py @@ -28,7 +28,10 @@ def _read_options(self): https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html build ReadOptions object like: pa.csv.ReadOptions(**self._read_options()) """ - return {"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")} + return { + **{"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")}, + **json.loads(self._format.get("advanced_options", "{}")), + } def _parse_options(self): """ diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py index 41cb701e8601..a05c22e71dd7 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py @@ -50,3 +50,8 @@ class Config: '{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]}' ], ) + advanced_options: str = Field( + default="{}", + description="Optionally add a valid JSON string here to provide additional Pyarrow ReadOptions. Specify 'column_names' here if your CSV doesn't have a header, or if you want to use custom column names. 
'block_size' and 'encoding' are already used above; specifying them again here will override the values above.", + examples=["{\"column_names\": [\"column1\", \"column2\"]}"], + ) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 06706255a934..c4bffaff8c74 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from copy import deepcopy from datetime import datetime +from functools import lru_cache from operator import itemgetter from traceback import format_exc from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -19,10 +20,11 @@ from .formats.csv_parser import CsvParser from .formats.parquet_parser import ParquetParser -from .storagefile import StorageFile JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"] +LOGGER = AirbyteLogger() + class ConfigurationError(Exception): """Client mis-configured""" @@ -60,8 +62,7 @@ def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str if schema: self._schema = self._parse_user_input_schema(schema) self.master_schema = None - self.storagefile_cache: Optional[List[Tuple[datetime, StorageFile]]] = None - self.logger.info(f"initialised stream with format: {format}") + LOGGER.info(f"initialised stream with format: {format}") @staticmethod def _parse_user_input_schema(schema: str) -> Mapping[str, str]: @@ -142,7 +143,8 @@ def pattern_matched_filepath_iterator(self, filepaths: Iterable[str]) -> Iterato if globmatch(filepath, self._path_pattern, flags=GLOBSTAR | SPLIT): yield filepath - def time_ordered_storagefile_iterator(self) -> Iterable[Tuple[datetime, StorageFile]]: + @lru_cache(maxsize=None) + def get_time_ordered_filepaths(self) -> Iterable[Tuple[datetime, str]]: """ Iterates through pattern_matched_filepath_iterator(), acquiring last_modified property of each file to return in time ascending order.
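As a side note on the `@lru_cache(maxsize=None)` decorator added above: the stream instance itself is the only cache key, so the expensive listing runs once per stream and is reused afterwards, which is what allows the manual `storagefile_cache` attribute to be deleted. A minimal standalone sketch of that pattern follows; the class name and values are illustrative, not part of the connector.

```python
from functools import lru_cache


class FileLister:
    """Illustrative stand-in for the stream: an expensive listing cached per instance."""

    def __init__(self, paths):
        self._paths = paths
        self.list_calls = 0  # counts how often the "expensive" work actually runs

    @lru_cache(maxsize=None)
    def time_ordered(self):
        self.list_calls += 1  # would be network I/O in the real stream
        return tuple(sorted(self._paths))  # return something immutable so caching is safe


lister = FileLister(["b.csv", "a.csv"])
assert lister.time_ordered() == lister.time_ordered() == ("a.csv", "b.csv")
assert lister.list_calls == 1  # the second call is served from the cache
```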
Uses concurrent.futures to thread this asynchronously in order to improve performance when there are many files (network I/O) @@ -150,29 +152,25 @@ def time_ordered_storagefile_iterator(self) -> Iterable[Tuple[datetime, StorageF :return: list in time-ascending order """ - - def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]: + def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, str]: fc = self.storagefile_class(filepath, self._provider) - return (fc.last_modified, fc) - - if self.storagefile_cache is None: - storagefiles = [] - # use concurrent future threads to parallelise grabbing last_modified from all the files - # TODO: don't hardcode max_workers like this - with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: + return (fc.last_modified, filepath) - filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator()) + storagefiles = [] + # use concurrent future threads to parallelise grabbing last_modified from all the files + # TODO: don't hardcode max_workers like this + with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: - futures = [executor.submit(get_storagefile_with_lastmod, fp) for fp in filepath_gen] + filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator()) - for future in concurrent.futures.as_completed(futures): - # this will failfast on any errors - storagefiles.append(future.result()) + futures = [executor.submit(get_storagefile_with_lastmod, fp) for fp in filepath_gen] - # The array storagefiles contain tuples of (last_modified, StorageFile), so sort by last_modified - self.storagefile_cache = sorted(storagefiles, key=itemgetter(0)) + for future in concurrent.futures.as_completed(futures): + # this will failfast on any errors + storagefiles.append(future.result()) - return self.storagefile_cache + # The array storagefiles contain tuples of (last_modified, filepath), so sort by last_modified + return sorted(storagefiles, key=itemgetter(0)) def _get_schema_map(self) -> Mapping[str, Any]: if self._schema != {}: @@ -197,26 +195,33 @@ def get_json_schema(self) -> Mapping[str, Any]: properties[self.ab_last_mod_col]["format"] = "date-time" return {"type": "object", "properties": properties} - def _get_master_schema(self) -> Mapping[str, Any]: + def _get_master_schema(self, min_datetime: datetime = None) -> Mapping[str, Any]: """ In order to auto-infer a schema across many files and/or allow for additional properties (columns), we need to determine the superset of schemas across all relevant files. - This method iterates through time_ordered_storagefile_iterator() obtaining the inferred schema (process implemented per file format), + This method iterates through get_time_ordered_filepaths() obtaining the inferred schema (process implemented per file format), to build up this superset schema (master_schema). This runs datatype checks to Warn or Error if we find incompatible schemas (e.g. same column is 'date' in one file but 'float' in another). This caches the master_schema after first run in order to avoid repeated compute and network calls to infer schema on all files. + :param min_datetime: if passed, will only use files with last_modified >= this to determine master schema + :raises RuntimeError: if we find datatype mismatches between files or between a file and schema state (provided or from previous inc. batch) :return: A dict of the JSON schema representing this stream. 
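A compact standalone sketch of the pattern implemented in `get_time_ordered_filepaths()`: fan the per-file `last_modified` lookups out on a thread pool, then sort the resulting `(last_modified, filepath)` tuples ascending. The lookup table below stands in for the `StorageFile` network call and is not part of the connector.

```python
import concurrent.futures
from datetime import datetime
from operator import itemgetter

# Stand-in for StorageFile.last_modified, which is a network call in the real connector.
FAKE_LAST_MODIFIED = {"a.csv": datetime(2021, 10, 2), "b.csv": datetime(2021, 9, 1)}


def with_last_modified(filepath: str):
    return (FAKE_LAST_MODIFIED[filepath], filepath)


def time_ordered(filepaths):
    with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
        futures = [executor.submit(with_last_modified, fp) for fp in filepaths]
        # result() re-raises any exception from a worker, so errors surface immediately
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
    return sorted(results, key=itemgetter(0))  # ascending by last_modified


print(time_ordered(["a.csv", "b.csv"]))  # b.csv (older) comes first, then a.csv
```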
""" # TODO: could implement a (user-beware) 'lazy' mode that skips schema checking to improve performance + # TODO: could utilise min_datetime to add a start_date parameter in spec for user if self.master_schema is None: master_schema = deepcopy(self._schema) file_reader = self.fileformatparser_class(self._format) - # time order isn't necessary here but we might as well use this method so we cache the list for later use - for _, storagefile in self.time_ordered_storagefile_iterator(): + for last_mod, filepath in self.get_time_ordered_filepaths(): + # skip this file if it's earlier than min_datetime + if (min_datetime is not None) and (last_mod < min_datetime): + continue + + storagefile = self.storagefile_class(filepath, self._provider) with storagefile.open(file_reader.is_binary) as f: this_schema = file_reader.get_inferred_schema(f) @@ -232,7 +237,7 @@ def _get_master_schema(self) -> Mapping[str, Any]: # this is to allow more leniency as we may be able to coerce this datatype mismatch on read according to provided schema state # if not, then the read will error anyway if col in self._schema.keys(): - self.logger.warn( + LOGGER.warn( f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. " + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. " + f"Airbyte will attempt to coerce this to {master_schema[col]} on read." @@ -251,7 +256,7 @@ def _get_master_schema(self) -> Mapping[str, Any]: if col not in master_schema.keys(): master_schema[col] = datatype - self.logger.info(f"determined master schema: {master_schema}") + LOGGER.info(f"determined master schema: {master_schema}") self.master_schema = master_schema return self.master_schema @@ -261,17 +266,18 @@ def stream_slices( ) -> Iterable[Optional[Mapping[str, Any]]]: """ This builds full-refresh stream_slices regardless of sync_mode param. - 1 file == 1 stream_slice. + For full refresh, 1 file == 1 stream_slice. + The structure of a stream slice is [ {file}, ... ]. + In incremental mode, a stream slice may have more than one file so we mirror that format here. Incremental stream_slices are implemented in the IncrementalFileStream child class. """ # TODO: this could be optimised via concurrent reads, however we'd lose chronology and need to deal with knock-ons of that # we could do this concurrently both full and incremental by running batches in parallel # and then incrementing the cursor per each complete batch - for last_mod, storagefile in self.time_ordered_storagefile_iterator(): + for last_mod, filepath in self.get_time_ordered_filepaths(): + storagefile = self.storagefile_class(filepath, self._provider) yield [{"unique_url": storagefile.url, "last_modified": last_mod, "storagefile": storagefile}] - # in case we have no files - yield from [None] def _match_target_schema(self, record: Mapping[str, Any], target_columns: List) -> Mapping[str, Any]: """ @@ -312,21 +318,17 @@ def _add_extra_fields_from_map(self, record: Mapping[str, Any], extra_map: Mappi record[key] = value return record - def read_records( + def _read_from_slice( self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, + file_reader, + stream_slice: Mapping[str, Any], stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: """ - Uses provider-relevant StorageFile to open file and then iterates through stream_records() using format-relevant FileFormatParser. 
+ Uses provider-relevant StorageFile to open file and then iterates through stream_records() using format-relevant AbstractFileParser. Records are mutated on the fly using _match_target_schema() and _add_extra_fields_from_map() to achieve desired final schema. - Since this is called per stream_slice, this method works for both full_refresh and incremental so sync_mode is ignored. + Since this is called per stream_slice, this method works for both full_refresh and incremental. """ - stream_slice = stream_slice if stream_slice is not None else [] - file_reader = self.fileformatparser_class(self._format, self._get_master_schema()) - # TODO: read all files in a stream_slice concurrently for file_info in stream_slice: with file_info["storagefile"].open(file_reader.is_binary) as f: @@ -341,20 +343,31 @@ def read_records( }, ) yield complete_record - self.logger.info("finished reading a stream slice") - + LOGGER.info("finished reading a stream slice") # Always return an empty generator just in case no records were ever yielded yield from [] + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """ + The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic + """ + stream_slice = stream_slice if stream_slice is not None else [] + file_reader = self.fileformatparser_class(self._format, self._get_master_schema()) + + yield from self._read_from_slice(file_reader, stream_slice) + class IncrementalFileStream(FileStream, ABC): # TODO: ideally want to checkpoint after every file or stream slice rather than N records state_checkpoint_interval = None - # TODO: would be great if we could override time_ordered_storagefile_iterator() here with state awareness - # this would allow filtering down to only files we need early and avoid unnecessary work - @property def cursor_field(self) -> str: """ @@ -362,6 +375,13 @@ def cursor_field(self) -> str: """ return self.ab_last_mod_col + def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None) -> datetime: + """ if no state, we default to 1970-01-01 in order to pick up all files present. """ + if stream_state is not None and self.cursor_field in stream_state.keys(): + return datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string) + else: + return datetime.strptime("1970-01-01T00:00:00+0000", self.datetime_format_string) + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Inspects the latest record extracted from the data source and the current state object and return an updated state object. 
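To make the new state handling concrete, below is a small sketch of the epoch-default lookup in `_get_datetime_from_stream_state()` and the forward-only cursor update it feeds. The format string and cursor field name are assumptions for illustration rather than values taken from this diff.

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S%z"  # assumed to match the connector's datetime_format_string
CURSOR = "_ab_source_file_last_modified"  # illustrative cursor field name


def state_datetime(stream_state) -> datetime:
    """No state yet means epoch, so every file in the bucket gets picked up."""
    if stream_state and CURSOR in stream_state:
        return datetime.strptime(stream_state[CURSOR], FMT)
    return datetime.strptime("1970-01-01T00:00:00+0000", FMT)


def updated_state(stream_state, latest_record):
    latest = datetime.strptime(latest_record.get(CURSOR, "1970-01-01T00:00:00+0000"), FMT)
    # the cursor only ever moves forward, regardless of record order within a sync
    return {CURSOR: datetime.strftime(max(state_datetime(stream_state), latest), FMT)}


print(updated_state(None, {CURSOR: "2021-10-15T12:00:00+0000"}))
# {'_ab_source_file_last_modified': '2021-10-15T12:00:00+0000'}
```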
@@ -373,16 +393,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late :return: An updated state object """ state_dict = {} - if current_stream_state is not None and self.cursor_field in current_stream_state.keys(): - current_parsed_datetime = datetime.strptime(current_stream_state[self.cursor_field], self.datetime_format_string) - latest_record_datetime = datetime.strptime( - latest_record.get(self.cursor_field, "1970-01-01T00:00:00+0000"), self.datetime_format_string - ) - state_dict[self.cursor_field] = datetime.strftime( - max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string - ) - else: - state_dict[self.cursor_field] = "1970-01-01T00:00:00+0000" + current_parsed_datetime = self._get_datetime_from_stream_state(current_stream_state) + latest_record_datetime = datetime.strptime( + latest_record.get(self.cursor_field, "1970-01-01T00:00:00+0000"), self.datetime_format_string + ) + state_dict[self.cursor_field] = datetime.strftime(max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string) state_dict["schema"] = self._get_schema_map() return state_dict @@ -395,7 +410,7 @@ def stream_slices( An incremental stream_slice is a group of all files with the exact same last_modified timestamp. This ensures we only update the cursor state to a given timestamp after ALL files with that timestamp have been successfully read. - Slight nuance: as we iterate through time_ordered_storagefile_iterator(), + Slight nuance: as we iterate through get_time_ordered_filepaths(), we yield the stream_slice containing file(s) up to and EXcluding the file on the current iteration. The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice """ @@ -412,15 +427,16 @@ def stream_slices( prev_file_last_mod = None # init variable to hold previous iterations last modified stream_slice = [] - for last_mod, storagefile in self.time_ordered_storagefile_iterator(): + for last_mod, filepath in self.get_time_ordered_filepaths(): # skip this file if last_mod is earlier than our cursor value from state if ( stream_state is not None and self.cursor_field in stream_state.keys() - and last_mod <= datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string) + and last_mod <= self._get_datetime_from_stream_state(stream_state) ): continue + storagefile = self.storagefile_class(filepath, self._provider) # check if this storagefile belongs in the next slice, if so yield the current slice before this file if (prev_file_last_mod is not None) and (last_mod != prev_file_last_mod): yield stream_slice @@ -436,3 +452,25 @@ def stream_slices( # in case we have no files yield from [None] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """ + The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic. + We override this for incremental so we can pass our minimum datetime from state into _get_master_schema(). + This means we only parse the schema of new files on incremental runs rather than all files in the bucket. 
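The incremental `stream_slices()` above batches every file that shares an identical `last_modified` timestamp into one slice, so the cursor only advances once all files for that timestamp have been read. The connector builds those groups by hand while iterating; here is a shorter sketch of the same grouping using `itertools.groupby`, with made-up timestamps.

```python
from datetime import datetime
from itertools import groupby
from operator import itemgetter

# (last_modified, filepath) tuples, already sorted ascending as get_time_ordered_filepaths() returns them
files = [
    (datetime(2021, 10, 1, 12), "a.csv"),
    (datetime(2021, 10, 1, 12), "b.csv"),  # same timestamp as a.csv, so same slice
    (datetime(2021, 10, 2, 9), "c.csv"),
]


def incremental_slices(time_ordered_files, state_datetime):
    for last_mod, group in groupby(time_ordered_files, key=itemgetter(0)):
        if last_mod <= state_datetime:
            continue  # already covered by the cursor value in state
        yield [{"unique_url": fp, "last_modified": last_mod} for _, fp in group]


for file_slice in incremental_slices(files, datetime(1970, 1, 1)):
    print(file_slice)  # first slice holds a.csv and b.csv together, the second only c.csv
```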
+ """ + if sync_mode == SyncMode.full_refresh: + yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state) + + else: + stream_slice = stream_slice if stream_slice is not None else [] + file_reader = self.fileformatparser_class( + self._format, self._get_master_schema(self._get_datetime_from_stream_state(stream_state)) + ) + yield from self._read_from_slice(file_reader, stream_slice) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/stream.py index 992a00208818..80782a1b7ee1 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/stream.py @@ -36,7 +36,7 @@ def _list_bucket(self, accept_key=lambda k: True) -> Iterator[str]: else: session = boto3session.Session() client_config = Config(signature_version=UNSIGNED) - client = make_s3_client(self._provider, config=client_config, session=session) + client = make_s3_client(provider, config=client_config, session=session) ctoken = None while True: diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/csv/test_file_8_no_header.csv b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/csv/test_file_8_no_header.csv new file mode 100644 index 000000000000..e93df63f0460 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/csv/test_file_8_no_header.csv @@ -0,0 +1,8 @@ +1,PVdhmjb1,False,12,-31.3,2021-07-14,2021-07-14 15:30:09.224125 +2,j4DyXTS7,True,-8,41.6,2021-07-14,2021-07-14 15:30:09.224383 +3,v0w8fTME,False,7,-27.5,2021-07-14,2021-07-14 15:30:09.224527 +4,1q6jD8Np,False,-8,-6.7,2021-07-14,2021-07-14 15:30:09.224741 +5,77h4aiMP,True,-15,-13.7,2021-07-14,2021-07-14 15:30:09.224907 +6,Le35Wyic,True,3,35.3,2021-07-14,2021-07-14 15:30:09.225033 +7,xZhh1Kyl,False,10,-9.2,2021-07-14,2021-07-14 15:30:09.225145 +8,M2t286iJ,False,4,-3.5,2021-07-14,2021-07-14 15:30:09.225320 diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py index 623981af9de6..7d63667ae2d7 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
# +import json import os from pathlib import Path from typing import Any, List, Mapping @@ -249,4 +250,30 @@ def test_files(self) -> List[Mapping[str, Any]]: "line_checks": {}, "fails": ["test_get_inferred_schema", "test_stream_records"], }, + { + # no header test + "test_alias": "no header csv file", + "AbstractFileParser": CsvParser( + format={ + "filetype": "csv", + "advanced_options": json.dumps({ + "column_names": ["id", "name", "valid", "code", "degrees", "birthday", "last_seen"] + }) + }, + master_schema={} + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "csv/test_file_8_no_header.csv"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, ] diff --git a/airbyte-webapp/src/App.tsx b/airbyte-webapp/src/App.tsx index 99a18ebbe2a7..ed0dffbf14f0 100644 --- a/airbyte-webapp/src/App.tsx +++ b/airbyte-webapp/src/App.tsx @@ -44,6 +44,9 @@ const Features: Feature[] = [ { id: FeatureItem.AllowUpdateConnectors, }, + { + id: FeatureItem.AllowOAuthConnector, + }, ]; const StyleProvider: React.FC = ({ children }) => ( diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 055167ee70e8..e1a71dea0cab 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -181,14 +181,16 @@ Since CSV files are effectively plain text, providing specific reader options is * `double_quote` : Whether two quotes in a quoted CSV value denote a single quote in the data. * `newlines_in_values` : Sometimes referred to as `multiline`. In most cases, newline characters signal the end of a row in a CSV, however text data may contain newline characters within it. Setting this to True allows correct parsing in this case. * `block_size` : This is the number of bytes to process in memory at a time while reading files. The default value here is usually fine but if your table is particularly wide \(lots of columns / data in fields is large\) then raising this might solve failures on detecting schema. Since this defines how much data to read into memory, raising this too high could cause Out Of Memory issues so use with caution. +* `additional_reader_options` : This allows for editing the less commonly required CSV [ConvertOptions](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions). The value must be a valid JSON string, e.g.: -The final setting in the UI is `additional_reader_options`. This is a catch-all to allow for editing the less commonly required CSV parsing options. The value must be a valid JSON string, e.g.: + ```text + {"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]} + ``` +* `advanced_options` : This allows for editing the less commonly required CSV [ReadOptions](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html#pyarrow.csv.ReadOptions). The value must be a valid JSON string. For example, if your CSV has no header or you want to use custom column names, you can specify `column_names` using this option. -```text -{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]} -``` - -You can find details on [available options here](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions).
+ ```text + {"column_names": ["column1", "column2", "column3"]} + ``` #### Parquet @@ -204,6 +206,7 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. | | 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | | 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | | 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference |
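As an end-to-end illustration of the `advanced_options` flow documented above, the sketch below merges the user-supplied JSON over the connector defaults and hands the result to `pyarrow.csv.ReadOptions` to read a headerless CSV. The in-memory bytes and option values are made up for the example; only the pyarrow calls mirror what the parser does.

```python
import io
import json

import pyarrow.csv as pa_csv

# Headerless rows, in the spirit of test_file_8_no_header.csv (values invented here)
data = io.BytesIO(b"1,PVdhmjb1,False\n2,j4DyXTS7,True\n")

# What a user would paste into the advanced_options field of the connector spec
advanced_options = json.loads('{"column_names": ["id", "name", "valid"]}')

# Defaults first, user-supplied keys second, mirroring CsvParser._read_options()
options = {"block_size": 10000, "encoding": "utf8", **advanced_options}

table = pa_csv.read_csv(data, read_options=pa_csv.ReadOptions(**options))
print(table.column_names)  # ['id', 'name', 'valid'], even though the file has no header row
```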