From 6907d882c83f67076865fc7a97cce37a169749c3 Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Fri, 1 Oct 2021 16:04:55 +0100 Subject: [PATCH 01/11] memory & performance optimisations --- .../source_s3/source_files_abstract/stream.py | 119 ++++++++++++------ 1 file changed, 79 insertions(+), 40 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 5f431bdc02be..19efdd428349 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -19,7 +19,6 @@ from .formats.csv_parser import CsvParser from .formats.parquet_parser import ParquetParser -from .storagefile import StorageFile JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"] @@ -60,7 +59,7 @@ def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str if schema: self._schema = self._parse_user_input_schema(schema) self.master_schema = None - self.storagefile_cache: Optional[List[Tuple[datetime, StorageFile]]] = None + self.time_ordered_filepath_cache: Optional[List[Tuple[datetime, str]]] = None self.logger = AirbyteLogger() self.logger.info(f"initialised stream with format: {format}") @@ -143,7 +142,7 @@ def pattern_matched_filepath_iterator(self, filepaths: Iterable[str]) -> Iterato if globmatch(filepath, self._path_pattern, flags=GLOBSTAR | SPLIT): yield filepath - def time_ordered_storagefile_iterator(self) -> Iterable[Tuple[datetime, StorageFile]]: + def time_ordered_filepath_iterator(self) -> Iterable[Tuple[datetime, str]]: """ Iterates through pattern_matched_filepath_iterator(), acquiring last_modified property of each file to return in time ascending order. 
Uses concurrent.futures to thread this asynchronously in order to improve performance when there are many files (network I/O) @@ -152,11 +151,11 @@ def time_ordered_storagefile_iterator(self) -> Iterable[Tuple[datetime, StorageF :return: list in time-ascending order """ - def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]: + def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, str]: fc = self.storagefile_class(filepath, self._provider) - return (fc.last_modified, fc) + return (fc.last_modified, filepath) - if self.storagefile_cache is None: + if self.time_ordered_filepath_cache is None: storagefiles = [] # use concurrent future threads to parallelise grabbing last_modified from all the files # TODO: don't hardcode max_workers like this @@ -170,10 +169,10 @@ def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]: # this will failfast on any errors storagefiles.append(future.result()) - # The array storagefiles contain tuples of (last_modified, StorageFile), so sort by last_modified - self.storagefile_cache = sorted(storagefiles, key=itemgetter(0)) + # The array storagefiles contain tuples of (last_modified, filepath), so sort by last_modified + self.time_ordered_filepath_cache = sorted(storagefiles, key=itemgetter(0)) - return self.storagefile_cache + return self.time_ordered_filepath_cache def _get_schema_map(self) -> Mapping[str, Any]: if self._schema != {}: @@ -198,26 +197,33 @@ def get_json_schema(self) -> Mapping[str, Any]: properties[self.ab_last_mod_col]["format"] = "date-time" return {"type": "object", "properties": properties} - def _get_master_schema(self) -> Mapping[str, Any]: + def _get_master_schema(self, min_datetime: datetime = None) -> Mapping[str, Any]: """ In order to auto-infer a schema across many files and/or allow for additional properties (columns), we need to determine the superset of schemas across all relevant files. - This method iterates through time_ordered_storagefile_iterator() obtaining the inferred schema (process implemented per file format), + This method iterates through time_ordered_filepath_iterator() obtaining the inferred schema (process implemented per file format), to build up this superset schema (master_schema). This runs datatype checks to Warn or Error if we find incompatible schemas (e.g. same column is 'date' in one file but 'float' in another). This caches the master_schema after first run in order to avoid repeated compute and network calls to infer schema on all files. + :param min_datetime: if passed, will only use files with last_modified >= this to determine master schema + :raises RuntimeError: if we find datatype mismatches between files or between a file and schema state (provided or from previous inc. batch) :return: A dict of the JSON schema representing this stream. 
""" # TODO: could implement a (user-beware) 'lazy' mode that skips schema checking to improve performance + # TODO: could utilise min_datetime to add a start_date parameter in spec for user if self.master_schema is None: master_schema = deepcopy(self._schema) file_reader = self.fileformatparser_class(self._format) - # time order isn't necessary here but we might as well use this method so we cache the list for later use - for _, storagefile in self.time_ordered_storagefile_iterator(): + for last_mod, filepath in self.time_ordered_filepath_iterator(): + # skip this file if it's earlier than min_datetime + if (min_datetime is not None) and (last_mod < min_datetime): + continue + + storagefile = self.storagefile_class(filepath, self._provider) with storagefile.open(file_reader.is_binary) as f: this_schema = file_reader.get_inferred_schema(f) @@ -269,7 +275,8 @@ def stream_slices( # TODO: this could be optimised via concurrent reads, however we'd lose chronology and need to deal with knock-ons of that # we could do this concurrently both full and incremental by running batches in parallel # and then incrementing the cursor per each complete batch - for last_mod, storagefile in self.time_ordered_storagefile_iterator(): + for last_mod, filepath in self.time_ordered_filepath_iterator(): + storagefile = self.storagefile_class(filepath, self._provider) yield [{"unique_url": storagefile.url, "last_modified": last_mod, "storagefile": storagefile}] # in case we have no files yield from [None] @@ -313,21 +320,17 @@ def _add_extra_fields_from_map(self, record: Mapping[str, Any], extra_map: Mappi record[key] = value return record - def read_records( + def _read_from_slice( self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, + file_reader, + stream_slice: Mapping[str, Any], stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: """ - Uses provider-relevant StorageFile to open file and then iterates through stream_records() using format-relevant FileFormatParser. + Uses provider-relevant StorageFile to open file and then iterates through stream_records() using format-relevant AbstractFileParser. Records are mutated on the fly using _match_target_schema() and _add_extra_fields_from_map() to achieve desired final schema. - Since this is called per stream_slice, this method works for both full_refresh and incremental so sync_mode is ignored. + Since this is called per stream_slice, this method works for both full_refresh and incremental. 
""" - stream_slice = stream_slice if stream_slice is not None else [] - file_reader = self.fileformatparser_class(self._format, self._get_master_schema()) - # TODO: read all files in a stream_slice concurrently for file_info in stream_slice: with file_info["storagefile"].open(file_reader.is_binary) as f: @@ -343,19 +346,30 @@ def read_records( ) yield complete_record self.logger.info("finished reading a stream slice") - # Always return an empty generator just in case no records were ever yielded yield from [] + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """ + The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic + """ + stream_slice = stream_slice if stream_slice is not None else [] + file_reader = self.fileformatparser_class(self._format, self._get_master_schema()) + + yield from self._read_from_slice(file_reader, stream_slice) + class IncrementalFileStream(FileStream, ABC): # TODO: ideally want to checkpoint after every file or stream slice rather than N records state_checkpoint_interval = None - # TODO: would be great if we could override time_ordered_storagefile_iterator() here with state awareness - # this would allow filtering down to only files we need early and avoid unnecessary work - @property def cursor_field(self) -> str: """ @@ -363,6 +377,13 @@ def cursor_field(self) -> str: """ return self.ab_last_mod_col + def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None) -> datetime: + """ if no state, we default to 1970-01-01 in order to pick up all files present. """ + if stream_state is not None and self.cursor_field in stream_state.keys(): + return datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string) + else: + return datetime.strptime("1970-01-01T00:00:00+0000", self.datetime_format_string) + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Inspects the latest record extracted from the data source and the current state object and return an updated state object. @@ -374,16 +395,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late :return: An updated state object """ state_dict = {} - if current_stream_state is not None and self.cursor_field in current_stream_state.keys(): - current_parsed_datetime = datetime.strptime(current_stream_state[self.cursor_field], self.datetime_format_string) - latest_record_datetime = datetime.strptime( - latest_record.get(self.cursor_field, "1970-01-01T00:00:00+0000"), self.datetime_format_string - ) - state_dict[self.cursor_field] = datetime.strftime( - max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string - ) - else: - state_dict[self.cursor_field] = "1970-01-01T00:00:00+0000" + current_parsed_datetime = self._get_datetime_from_stream_state(current_stream_state) + latest_record_datetime = datetime.strptime( + latest_record.get(self.cursor_field, "1970-01-01T00:00:00+0000"), self.datetime_format_string + ) + state_dict[self.cursor_field] = datetime.strftime(max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string) state_dict["schema"] = self._get_schema_map() return state_dict @@ -396,7 +412,7 @@ def stream_slices( An incremental stream_slice is a group of all files with the exact same last_modified timestamp. 
This ensures we only update the cursor state to a given timestamp after ALL files with that timestamp have been successfully read. - Slight nuance: as we iterate through time_ordered_storagefile_iterator(), + Slight nuance: as we iterate through time_ordered_filepath_iterator(), we yield the stream_slice containing file(s) up to and EXcluding the file on the current iteration. The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice """ @@ -413,15 +429,16 @@ def stream_slices( prev_file_last_mod = None # init variable to hold previous iterations last modified stream_slice = [] - for last_mod, storagefile in self.time_ordered_storagefile_iterator(): + for last_mod, filepath in self.time_ordered_filepath_iterator(): # skip this file if last_mod is earlier than our cursor value from state if ( stream_state is not None and self.cursor_field in stream_state.keys() - and last_mod <= datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string) + and last_mod <= self._get_datetime_from_stream_state(stream_state) ): continue + storagefile = self.storagefile_class(filepath, self._provider) # check if this storagefile belongs in the next slice, if so yield the current slice before this file if (prev_file_last_mod is not None) and (last_mod != prev_file_last_mod): yield stream_slice @@ -437,3 +454,25 @@ def stream_slices( # in case we have no files yield from [None] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """ + The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic. + We override this for incremental so we can pass our minimum datetime from state into _get_master_schema(). + This means we only parse the schema of new files on incremental runs rather than all files in the bucket. 
+ """ + if sync_mode == SyncMode.full_refresh: + yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state) + + else: + stream_slice = stream_slice if stream_slice is not None else [] + file_reader = self.fileformatparser_class( + self._format, self._get_master_schema(self._get_datetime_from_stream_state(stream_state)) + ) + yield from self._read_from_slice(file_reader, stream_slice) From 793fec56e361efa5768c50c83b3ccca502f9e699 Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Fri, 15 Oct 2021 14:16:33 +0100 Subject: [PATCH 02/11] address comments --- .../connectors/source-s3/setup.py | 2 +- .../source_s3/source_files_abstract/stream.py | 60 +++++++++---------- .../connectors/source-s3/source_s3/stream.py | 2 +- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/setup.py b/airbyte-integrations/connectors/source-s3/setup.py index ae62d1616611..b2d43ccb3600 100644 --- a/airbyte-integrations/connectors/source-s3/setup.py +++ b/airbyte-integrations/connectors/source-s3/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1.7", + "airbyte-cdk~=0.1.28", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 19efdd428349..52b2026c28cd 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from copy import deepcopy from datetime import datetime +from functools import lru_cache from operator import itemgetter from traceback import format_exc from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -22,6 +23,8 @@ JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"] +LOGGER = AirbyteLogger() + class ConfigurationError(Exception): """Client mis-configured""" @@ -59,9 +62,7 @@ def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str if schema: self._schema = self._parse_user_input_schema(schema) self.master_schema = None - self.time_ordered_filepath_cache: Optional[List[Tuple[datetime, str]]] = None - self.logger = AirbyteLogger() - self.logger.info(f"initialised stream with format: {format}") + LOGGER.info(f"initialised stream with format: {format}") @staticmethod def _parse_user_input_schema(schema: str) -> Mapping[str, str]: @@ -142,7 +143,8 @@ def pattern_matched_filepath_iterator(self, filepaths: Iterable[str]) -> Iterato if globmatch(filepath, self._path_pattern, flags=GLOBSTAR | SPLIT): yield filepath - def time_ordered_filepath_iterator(self) -> Iterable[Tuple[datetime, str]]: + @lru_cache(maxsize=None) + def get_time_ordered_filepaths(self) -> Iterable[Tuple[datetime, str]]: """ Iterates through pattern_matched_filepath_iterator(), acquiring last_modified property of each file to return in time ascending order. 
Uses concurrent.futures to thread this asynchronously in order to improve performance when there are many files (network I/O) @@ -150,29 +152,25 @@ def time_ordered_filepath_iterator(self) -> Iterable[Tuple[datetime, str]]: :return: list in time-ascending order """ - def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, str]: fc = self.storagefile_class(filepath, self._provider) return (fc.last_modified, filepath) - if self.time_ordered_filepath_cache is None: - storagefiles = [] - # use concurrent future threads to parallelise grabbing last_modified from all the files - # TODO: don't hardcode max_workers like this - with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: - - filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator()) + storagefiles = [] + # use concurrent future threads to parallelise grabbing last_modified from all the files + # TODO: don't hardcode max_workers like this + with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: - futures = [executor.submit(get_storagefile_with_lastmod, fp) for fp in filepath_gen] + filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator()) - for future in concurrent.futures.as_completed(futures): - # this will failfast on any errors - storagefiles.append(future.result()) + futures = [executor.submit(get_storagefile_with_lastmod, fp) for fp in filepath_gen] - # The array storagefiles contain tuples of (last_modified, filepath), so sort by last_modified - self.time_ordered_filepath_cache = sorted(storagefiles, key=itemgetter(0)) + for future in concurrent.futures.as_completed(futures): + # this will failfast on any errors + storagefiles.append(future.result()) - return self.time_ordered_filepath_cache + # The array storagefiles contain tuples of (last_modified, filepath), so sort by last_modified + return sorted(storagefiles, key=itemgetter(0)) def _get_schema_map(self) -> Mapping[str, Any]: if self._schema != {}: @@ -201,7 +199,7 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Mapping[str, Any] """ In order to auto-infer a schema across many files and/or allow for additional properties (columns), we need to determine the superset of schemas across all relevant files. - This method iterates through time_ordered_filepath_iterator() obtaining the inferred schema (process implemented per file format), + This method iterates through get_time_ordered_filepaths() obtaining the inferred schema (process implemented per file format), to build up this superset schema (master_schema). This runs datatype checks to Warn or Error if we find incompatible schemas (e.g. same column is 'date' in one file but 'float' in another). This caches the master_schema after first run in order to avoid repeated compute and network calls to infer schema on all files. 
@@ -218,7 +216,7 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Mapping[str, Any] file_reader = self.fileformatparser_class(self._format) - for last_mod, filepath in self.time_ordered_filepath_iterator(): + for last_mod, filepath in self.get_time_ordered_filepaths(): # skip this file if it's earlier than min_datetime if (min_datetime is not None) and (last_mod < min_datetime): continue @@ -239,7 +237,7 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Mapping[str, Any] # this is to allow more leniency as we may be able to coerce this datatype mismatch on read according to provided schema state # if not, then the read will error anyway if col in self._schema.keys(): - self.logger.warn( + LOGGER.warn( f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. " + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. " + f"Airbyte will attempt to coerce this to {master_schema[col]} on read." @@ -258,7 +256,7 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Mapping[str, Any] if col not in master_schema.keys(): master_schema[col] = datatype - self.logger.info(f"determined master schema: {master_schema}") + LOGGER.info(f"determined master schema: {master_schema}") self.master_schema = master_schema return self.master_schema @@ -268,18 +266,20 @@ def stream_slices( ) -> Iterable[Optional[Mapping[str, Any]]]: """ This builds full-refresh stream_slices regardless of sync_mode param. - 1 file == 1 stream_slice. + For full refresh, 1 file == 1 stream_slice. + The structure of a stream slice is [ {file}, ... ]. + In incremental mode, a stream slice may have more than one file so we mirror that format here. Incremental stream_slices are implemented in the IncrementalFileStream child class. """ # TODO: this could be optimised via concurrent reads, however we'd lose chronology and need to deal with knock-ons of that # we could do this concurrently both full and incremental by running batches in parallel # and then incrementing the cursor per each complete batch - for last_mod, filepath in self.time_ordered_filepath_iterator(): + for last_mod, filepath in self.get_time_ordered_filepaths(): storagefile = self.storagefile_class(filepath, self._provider) yield [{"unique_url": storagefile.url, "last_modified": last_mod, "storagefile": storagefile}] - # in case we have no files - yield from [None] + # # in case we have no files + # yield from [None] def _match_target_schema(self, record: Mapping[str, Any], target_columns: List) -> Mapping[str, Any]: """ @@ -345,7 +345,7 @@ def _read_from_slice( }, ) yield complete_record - self.logger.info("finished reading a stream slice") + LOGGER.info("finished reading a stream slice") # Always return an empty generator just in case no records were ever yielded yield from [] @@ -412,7 +412,7 @@ def stream_slices( An incremental stream_slice is a group of all files with the exact same last_modified timestamp. This ensures we only update the cursor state to a given timestamp after ALL files with that timestamp have been successfully read. - Slight nuance: as we iterate through time_ordered_filepath_iterator(), + Slight nuance: as we iterate through get_time_ordered_filepaths(), we yield the stream_slice containing file(s) up to and EXcluding the file on the current iteration. 
The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice """ @@ -429,7 +429,7 @@ def stream_slices( prev_file_last_mod = None # init variable to hold previous iterations last modified stream_slice = [] - for last_mod, filepath in self.time_ordered_filepath_iterator(): + for last_mod, filepath in self.get_time_ordered_filepaths(): # skip this file if last_mod is earlier than our cursor value from state if ( stream_state is not None diff --git a/airbyte-integrations/connectors/source-s3/source_s3/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/stream.py index 992a00208818..80782a1b7ee1 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/stream.py @@ -36,7 +36,7 @@ def _list_bucket(self, accept_key=lambda k: True) -> Iterator[str]: else: session = boto3session.Session() client_config = Config(signature_version=UNSIGNED) - client = make_s3_client(self._provider, config=client_config, session=session) + client = make_s3_client(provider, config=client_config, session=session) ctoken = None while True: From f0cc1229121777fc143556424c60b6a6de8cf01e Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Fri, 15 Oct 2021 14:23:29 +0100 Subject: [PATCH 03/11] version bump --- .../69589781-7828-43c5-9f63-8925b1c1ccc2.json | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-integrations/connectors/source-s3/Dockerfile | 2 +- docs/integrations/sources/s3.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json index f8417079b2c0..c126e4b0a1a8 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2", "name": "S3", "dockerRepository": "airbyte/source-s3", - "dockerImageTag": "0.1.5", + "dockerImageTag": "0.1.6", "documentationUrl": "https://docs.airbyte.io/integrations/sources/s3" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index cac1c3ed7e46..3777f6bb518a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -85,7 +85,7 @@ - sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 name: S3 dockerRepository: airbyte/source-s3 - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/sources/s3 sourceType: file - sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87 diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 0dc3dde2ff6e..21ffd014d63c 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,7 +17,7 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 
LABEL io.airbyte.name=airbyte/source-s3 diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 055167ee70e8..928bc676dc54 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -204,6 +204,7 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) | Memory and performance optimisation | | 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | | 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | | 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | From b05ca560c33a59adb864123ab2219e0065a8630a Mon Sep 17 00:00:00 2001 From: Jingkun Zhuang Date: Fri, 15 Oct 2021 10:46:22 -0400 Subject: [PATCH 04/11] added advanced_options for reading csv without header, and more custom pyarrow ReadOptions --- .../source-s3/integration_tests/spec.json | 9 +++++++ .../formats/csv_parser.py | 5 +++- .../source_files_abstract/formats/csv_spec.py | 5 ++++ .../csv/test_file_8_no_header.csv | 8 ++++++ .../source-s3/unit_tests/test_csv_parser.py | 27 +++++++++++++++++++ 5 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/csv/test_file_8_no_header.csv diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index b826eb56aa83..9c8cb99621ad 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -93,6 +93,15 @@ "{\"timestamp_parsers\": [\"%m/%d/%Y %H:%M\", \"%Y/%m/%d %H:%M\"], \"strings_can_be_null\": true, \"null_values\": [\"NA\", \"NULL\"]}" ], "type": "string" + }, + "advanced_options": { + "title": "Advanced Options", + "description": "Optionally add a valid JSON string here to provide additional Pyarrow ReadOptions. Specify 'column_names' here if your CSV doesn't have header, or if you want to use custom column names. 
'block_size' and 'encoding' are already used above, specify them again here will override the values above.", + "default": "{}", + "examples": [ + "{\"column_names\": [\"column1\", \"column2\"]}" + ], + "type": "string" } } }, diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py index c16794c493c0..08e8a13a0919 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py @@ -28,7 +28,10 @@ def _read_options(self): https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html build ReadOptions object like: pa.csv.ReadOptions(**self._read_options()) """ - return {"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")} + return { + **{"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")}, + **json.loads(self._format.get("advanced_options", "{}")), + } def _parse_options(self): """ diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py index 41cb701e8601..a05c22e71dd7 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py @@ -50,3 +50,8 @@ class Config: '{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]}' ], ) + advanced_options: str = Field( + default="{}", + description="Optionally add a valid JSON string here to provide additional Pyarrow ReadOptions. Specify 'column_names' here if your CSV doesn't have header, or if you want to use custom column names. 
'block_size' and 'encoding' are already used above, specify them again here will override the values above.", + examples=["{\"column_names\": [\"column1\", \"column2\"]}"], + ) diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/csv/test_file_8_no_header.csv b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/csv/test_file_8_no_header.csv new file mode 100644 index 000000000000..e93df63f0460 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/csv/test_file_8_no_header.csv @@ -0,0 +1,8 @@ +1,PVdhmjb1,False,12,-31.3,2021-07-14,2021-07-14 15:30:09.224125 +2,j4DyXTS7,True,-8,41.6,2021-07-14,2021-07-14 15:30:09.224383 +3,v0w8fTME,False,7,-27.5,2021-07-14,2021-07-14 15:30:09.224527 +4,1q6jD8Np,False,-8,-6.7,2021-07-14,2021-07-14 15:30:09.224741 +5,77h4aiMP,True,-15,-13.7,2021-07-14,2021-07-14 15:30:09.224907 +6,Le35Wyic,True,3,35.3,2021-07-14,2021-07-14 15:30:09.225033 +7,xZhh1Kyl,False,10,-9.2,2021-07-14,2021-07-14 15:30:09.225145 +8,M2t286iJ,False,4,-3.5,2021-07-14,2021-07-14 15:30:09.225320 diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py index 623981af9de6..7d63667ae2d7 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # +import json import os from pathlib import Path from typing import Any, List, Mapping @@ -249,4 +250,30 @@ def test_files(self) -> List[Mapping[str, Any]]: "line_checks": {}, "fails": ["test_get_inferred_schema", "test_stream_records"], }, + { + # no header test + "test_alias": "no header csv file", + "AbstractFileParser": CsvParser( + format={ + "filetype": "csv", + "advanced_options": json.dumps({ + "column_names": ["id", "name", "valid", "code", "degrees", "birthday", "last_seen"] + }) + }, + master_schema={} + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "csv/test_file_8_no_header.csv"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, ] From 53ab78e5a85c379b922b36330b90e2d7563901ff Mon Sep 17 00:00:00 2001 From: Jingkun Zhuang Date: Fri, 15 Oct 2021 10:47:02 -0400 Subject: [PATCH 05/11] updated to use the latest airbyte-cdk --- airbyte-integrations/connectors/source-s3/setup.py | 2 +- .../source-s3/source_s3/source_files_abstract/stream.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/setup.py b/airbyte-integrations/connectors/source-s3/setup.py index ae62d1616611..77503623156f 100644 --- a/airbyte-integrations/connectors/source-s3/setup.py +++ b/airbyte-integrations/connectors/source-s3/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1.7", + "airbyte-cdk~=0.1.27", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 5f431bdc02be..5115d399a01c 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ 
b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -12,7 +12,6 @@ from traceback import format_exc from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models.airbyte_protocol import SyncMode from airbyte_cdk.sources.streams import Stream from wcmatch.glob import GLOBSTAR, SPLIT, globmatch @@ -61,7 +60,6 @@ def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str self._schema = self._parse_user_input_schema(schema) self.master_schema = None self.storagefile_cache: Optional[List[Tuple[datetime, StorageFile]]] = None - self.logger = AirbyteLogger() self.logger.info(f"initialised stream with format: {format}") @staticmethod @@ -233,7 +231,7 @@ def _get_master_schema(self) -> Mapping[str, Any]: # this is to allow more leniency as we may be able to coerce this datatype mismatch on read according to provided schema state # if not, then the read will error anyway if col in self._schema.keys(): - self.logger.warn( + self.logger.warning( f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. " + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. " + f"Airbyte will attempt to coerce this to {master_schema[col]} on read." From 0e0c2c0e968da513ca49563677592eebb9d45984 Mon Sep 17 00:00:00 2001 From: Jingkun Zhuang Date: Fri, 15 Oct 2021 10:47:46 -0400 Subject: [PATCH 06/11] updated docs --- docs/integrations/sources/s3.md | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 055167ee70e8..5e723c780ea8 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -181,14 +181,16 @@ Since CSV files are effectively plain text, providing specific reader options is * `double_quote` : Whether two quotes in a quoted CSV value denote a single quote in the data. * `newlines_in_values` : Sometimes referred to as `multiline`. In most cases, newline characters signal the end of a row in a CSV, however text data may contain newline characters within it. Setting this to True allows correct parsing in this case. * `block_size` : This is the number of bytes to process in memory at a time while reading files. The default value here is usually fine but if your table is particularly wide \(lots of columns / data in fields is large\) then raising this might solve failures on detecting schema. Since this defines how much data to read into memory, raising this too high could cause Out Of Memory issues so use with caution. +* `additional_reader_options` : This allows for editing the less commonly required CSV [ConvertOptions](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions). The value must be a valid JSON string, e.g.: -The final setting in the UI is `additional_reader_options`. This is a catch-all to allow for editing the less commonly required CSV parsing options. The value must be a valid JSON string, e.g.: + ```text + {"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]} + ``` +* `advanced_options` : This allows for editing the less commonly required CSV [ReadOptions](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html#pyarrow.csv.ReadOptions). The value must be a valid JSON string. 
One use case for this is when your CSV has no header, or you want to use custom column names, you can specify `column_names` using this option. -```text -{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]} -``` - -You can find details on [available options here](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions). + ```test + {"column_names": ["column1", "column2", "column3"]} + ``` #### Parquet @@ -204,6 +206,7 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.6 | 2021-10-14 | [7058](https://github.com/airbytehq/airbyte/pull/7058) | Advanced options for CSV | | 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | | 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | | 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | From 438f631ce57e3686e38eac1f2916dddc55c5c4ae Mon Sep 17 00:00:00 2001 From: Jingkun Zhuang Date: Fri, 15 Oct 2021 10:48:22 -0400 Subject: [PATCH 07/11] bump source-s3 to 0.1.6 --- airbyte-integrations/connectors/source-s3/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 0dc3dde2ff6e..21ffd014d63c 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,7 +17,7 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-s3 From 53d17223a582fd50b3e3177c03c0a79b7fc4ff05 Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Mon, 18 Oct 2021 12:13:04 +0100 Subject: [PATCH 08/11] remove unneeded lines --- .../source-s3/source_s3/source_files_abstract/stream.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 52b2026c28cd..c4bffaff8c74 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -278,8 +278,6 @@ def stream_slices( for last_mod, filepath in self.get_time_ordered_filepaths(): storagefile = self.storagefile_class(filepath, self._provider) yield [{"unique_url": storagefile.url, "last_modified": last_mod, "storagefile": storagefile}] - # # in case we have no files - # yield from [None] def _match_target_schema(self, record: Mapping[str, Any], target_columns: List) -> Mapping[str, Any]: """ From e9f26303474a5af1b132b9181c298c6b8f77658f Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 19 Oct 2021 20:37:27 +0800 Subject: [PATCH 09/11] Use the all dep ami for python builds. 
--- .github/workflows/publish-command.yml | 1 + .github/workflows/test-command.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 317bd8d680d3..9fa6fc4e1917 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -34,6 +34,7 @@ jobs: aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }} + ec2-instance-id: ami-0d648081937c75a73 publish-image: needs: start-publish-image-runner runs-on: ${{ needs.start-publish-image-runner.outputs.label }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 3a5b1359227c..8a7253ad3b89 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -33,6 +33,7 @@ jobs: aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }} + ec2-instance-id: ami-0d648081937c75a73 integration-test: timeout-minutes: 240 needs: start-test-runner From ee2ba4e8b9aecf1512079582bc786dd0ce6ae942 Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Tue, 19 Oct 2021 14:44:14 +0100 Subject: [PATCH 10/11] ec2-instance-id should be ec2-image-id --- .github/workflows/publish-command.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 9fa6fc4e1917..0fd53f4ad86b 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -34,7 +34,7 @@ jobs: aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }} - ec2-instance-id: ami-0d648081937c75a73 + ec2-image-id: ami-0d648081937c75a73 publish-image: needs: start-publish-image-runner runs-on: ${{ needs.start-publish-image-runner.outputs.label }} From ee1dc9fd914b7b68429c7b3d3ba9131946faf15a Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Tue, 19 Oct 2021 14:44:49 +0100 Subject: [PATCH 11/11] ec2-instance-id should be ec2-image-id --- .github/workflows/test-command.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 8a7253ad3b89..6c6f98940e34 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -33,7 +33,7 @@ jobs: aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }} - ec2-instance-id: ami-0d648081937c75a73 + ec2-image-id: ami-0d648081937c75a73 integration-test: timeout-minutes: 240 needs: start-test-runner
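
For reviewers who want to try the behaviour added in [PATCH 04/11] locally, the snippet below is a minimal sketch of how the new `advanced_options` string feeds pyarrow's `ReadOptions` when reading a header-less CSV. It mirrors the dict merge performed in `CsvParser._read_options()` rather than invoking the connector itself; the standalone script, the relative fixture path (assumed to be run from the source-s3 connector directory) and the hard-coded column names are illustrative assumptions, not part of these patches.

import json

from pyarrow import csv as pa_csv

# format config as the connector would receive it from spec.json / csv_spec.py
format_config = {
    "filetype": "csv",
    "block_size": 10000,
    "encoding": "utf8",
    # column_names supplied because the sample file has no header row
    "advanced_options": json.dumps(
        {"column_names": ["id", "name", "valid", "code", "degrees", "birthday", "last_seen"]}
    ),
}

# merge the dedicated block_size/encoding fields with the user-supplied JSON;
# keys in advanced_options win on conflict, matching CsvParser._read_options()
read_options = pa_csv.ReadOptions(
    **{
        **{
            "block_size": format_config.get("block_size", 10000),
            "encoding": format_config.get("encoding", "utf8"),
        },
        **json.loads(format_config.get("advanced_options", "{}")),
    }
)

# fixture added in PATCH 04; path assumes the working directory is the source-s3 connector
table = pa_csv.read_csv("unit_tests/sample_files/csv/test_file_8_no_header.csv", read_options=read_options)
print(table.num_rows, table.schema)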