Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1571 Issue: adopt Best practice for Google Sheets Source #1668

Merged
merged 2 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7",
"name": "Google Sheets",
"dockerRepository": "airbyte/source-google-sheets",
"dockerImageTag": "0.1.5",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets"
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
- sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
name: Google Sheets
dockerRepository: airbyte/source-google-sheets
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-google-sheets
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install .

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/source-google-sheets
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
if err.resp.status == status_codes.NOT_FOUND:
reason = "Requested spreadsheet was not found."
logger.error(f"Formatted error: {reason}")
return AirbyteConnectionStatus(status=Status.FAILED, message=str(reason))
return AirbyteConnectionStatus(
status=Status.FAILED, message=f"Unable to connect with the provided credentials to spreadsheet. Error: {reason}"
)

return AirbyteConnectionStatus(status=Status.SUCCEEDED)

Expand All @@ -72,9 +74,12 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
sheet_names = [sheet.properties.title for sheet in spreadsheet_metadata.sheets]
streams = []
for sheet_name in sheet_names:
header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name)
stream = Helpers.headers_to_airbyte_stream(sheet_name, header_row_data)
streams.append(stream)
try:
header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name)
stream = Helpers.headers_to_airbyte_stream(sheet_name, header_row_data)
streams.append(stream)
except Exception as err:
logger.error(str(err))
return AirbyteCatalog(streams=streams)

except errors.HttpError as err:
Expand All @@ -99,8 +104,7 @@ def read(
logger.info(f"Syncing sheet {sheet}")
column_index_to_name = sheet_to_column_index_to_name[sheet]
row_cursor = 2 # we start syncing past the header row
encountered_blank_row = False
while not encountered_blank_row:
while True:
range = f"{sheet}!{row_cursor}:{row_cursor + ROW_BATCH_SIZE}"
logger.info(f"Fetching range {range}")
row_batch = SpreadsheetValues.parse_obj(
Expand All @@ -119,8 +123,7 @@ def read(

for row in row_values:
if Helpers.is_row_empty(row):
encountered_blank_row = True
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the correctness of these changes, but the previous code stopped reading data after it found one empty line, for example, if after the header line there is an empty line and then data, then we do not read this data. I made it so that all data in a specific ROW_BACTH_SIZE was read, and the cycle was interrupted when there were no records in this values_range.
Let me know please, if this is not the correct logic, I will revert the code to the previous state.

break
continue
elif Helpers.row_contains_relevant_data(row, column_index_to_name.keys()):
yield AirbyteMessage(type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row, column_index_to_name))
logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_first_row(client, spreadsheet_id: str, sheet_name: str) -> List[str]:
raise Exception(f"Expected data for exactly one range for sheet {sheet_name}")

all_row_data = range_data[0].rowData
if len(all_row_data) != 1:
if not all_row_data or len(all_row_data) != 1:
raise Exception(f"Expected data for exactly one row for sheet {sheet_name}")

first_row_data = all_row_data[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
from pathlib import Path
from typing import Dict

from airbyte_protocol import ConfiguredAirbyteCatalog, ConnectorSpecification
from base_python_test import StandardSourceTestIface
from base_python_test import DefaultStandardSourceTest
from google_sheets_source.client import GoogleSheetsClient
from google_sheets_source.helpers import Helpers
from google_sheets_source.models.spreadsheet import Spreadsheet
Expand All @@ -42,22 +41,11 @@
]


class GoogleSheetsSourceStandardTest(StandardSourceTestIface):
def __init__(self):
pass

def get_spec(self) -> ConnectorSpecification:
raw_spec = pkgutil.get_data(self.__class__.__module__.split(".")[0], "spec.json")
return ConnectorSpecification.parse_obj(json.loads(raw_spec))

class GoogleSheetsSourceStandardTest(DefaultStandardSourceTest):
def get_config(self) -> object:
config = {"credentials_json": json.dumps(self._get_creds()), "spreadsheet_id": self._get_spreadsheet_id()}
return config

def get_catalog(self) -> ConfiguredAirbyteCatalog:
raw_catalog = pkgutil.get_data(self.__class__.__module__.split(".")[0], "configured_catalog.json")
return ConfiguredAirbyteCatalog.parse_obj(json.loads(raw_catalog))

def setup(self) -> None:
Path(self._get_tmp_dir()).mkdir(parents=True, exist_ok=True)

Expand Down