feat(source-google-sheets): migrate low code (#50843)
Co-authored-by: Octavia Squidington III <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
4 people authored Jan 30, 2025
1 parent 2c3e6a1 commit a4f6530
Showing 65 changed files with 9,575 additions and 6,706 deletions.
@@ -0,0 +1,3 @@
[run]
omit =
source_google_sheets/run.py
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#


@@ -83,7 +83,7 @@
},
{
"stream": {
"name": "Sheet6-4000-rows",
"name": "Sheet6-2000-rows",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",

Large diffs are not rendered by default.

@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from source_google_sheets.run import run
@@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
dockerImageTag: 0.8.5
dockerImageTag: 0.9.0-rc.1
dockerRepository: airbyte/source-google-sheets
documentationUrl: https://docs.airbyte.com/integrations/sources/google-sheets
githubIssueLabel: source-google-sheets
@@ -28,10 +28,13 @@ data:
oss:
enabled: true
releaseStage: generally_available
releases:
rolloutConfiguration:
enableProgressiveRollout: true
supportLevel: certified
tags:
- language:python
- cdk:python
- cdk:low-code
connectorTestSuitesOptions:
- suite: liveTests
testConnections:
@@ -40,17 +43,17 @@ data:
- suite: unitTests
- suite: acceptanceTests
testSecrets:
- name: SECRET_SOURCE-GOOGLE_SHEETS_CREDS
- name: SECRET_SOURCE-GOOGLE-SHEETS__CREDS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_SOURCE-GOOGLE_SHEETS_SERVICE_CREDS
- name: SECRET_SOURCE-GOOGLE-SHEETS_SERVICE__CREDS
fileName: service_config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_SOURCE-GOOGLE_SHEETS_WITH_URL_CREDS
- name: SECRET_SOURCE-GOOGLE-SHEETS_WITH_URL__CREDS
fileName: config_with_url.json
secretStore:
type: GSM
2,031 changes: 1,215 additions & 816 deletions airbyte-integrations/connectors/source-google-sheets/poetry.lock

Large diffs are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.8.5"
version = "0.9.0-rc.1"
name = "source-google-sheets"
description = "Source implementation for Google Sheets."
authors = [ "Airbyte <[email protected]>",]
@@ -16,11 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_google_sheets"

[tool.poetry.dependencies]
python = "^3.10"
airbyte-cdk = "^4"
google-auth-httplib2 = "==0.2.0"
Unidecode = "==1.3.8"
google-api-python-client = "==2.114.0"
python = ">=3.10,<3.13"
airbyte-cdk = "^6"

[tool.poetry.scripts]
source-google-sheets = "source_google_sheets.run:run"
@@ -1,3 +1,7 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from .source import SourceGoogleSheets

__all__ = ["SourceGoogleSheets"]

This file was deleted.

@@ -0,0 +1,8 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from source_google_sheets.components.extractors import DpathSchemaMatchingExtractor, DpathSchemaExtractor
from source_google_sheets.components.partition_routers import RangePartitionRouter

__all__ = ["DpathSchemaMatchingExtractor", "RangePartitionRouter", "DpathSchemaExtractor"]
@@ -0,0 +1,207 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
from source_google_sheets.utils import safe_name_conversion


class RawSchemaParser:
config: Config

def _extract_data(
self,
body: Mapping[str, Any],
extraction_path: Optional[List[Union[InterpolatedString, str]]] = None,
default: Any = None,
) -> Any:
"""
Extracts data from the body based on the provided extraction path.
"""

if not extraction_path:
return body

path = [node.eval(self.config) if not isinstance(node, str) else node for node in extraction_path]

return dpath.get(body, path, default=default) # type: ignore # extracted
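
    # A hypothetical illustration of the helper above (names and data are made up):
    #   body = {"data": {"rowData": ["r1", "r2"]}}
    #   self._extract_data(body, ["data", "rowData"])              -> ["r1", "r2"]
    #   self._extract_data(body, ["data", "missing"], default=[])  -> []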

def _set_data(
self, value: Any, body: MutableMapping[str, Any], extraction_path: Optional[List[Union[InterpolatedString, str]]] = None
) -> Any:
"""
Sets data in the body based on the provided extraction path.
"""
        if not extraction_path:
            # Nothing to set without a path; bail out before iterating it below.
            body = value
            return

path = [node.eval(self.config) if not isinstance(node, str) else node for node in extraction_path]

dpath.set(body, path, value=value)

def parse_raw_schema_values(
self,
raw_schema_data: MutableMapping[Any, Any],
schema_pointer: List[Union[InterpolatedString, str]],
key_pointer: List[Union[InterpolatedString, str]],
names_conversion: bool,
):
"""
        1. Parses sheet headers from the provided raw schema. This method assumes the data is contiguous,
        i.e. every cell contains a value, and the first empty cell marks the end of the headers.
        2. Applies name conversion if required.
        3. Removes duplicated fields from the schema.
        Returns a list of tuples: (property index as found in the array, parsed value, raw schema property).
"""
raw_schema_properties = self._extract_data(raw_schema_data, schema_pointer, default=[])
duplicate_fields = set()
parsed_schema_values = []
seen_values = set()
for property_index, raw_schema_property in enumerate(raw_schema_properties):
raw_schema_property_value = self._extract_data(raw_schema_property, key_pointer)
if not raw_schema_property_value:
break
if names_conversion:
raw_schema_property_value = safe_name_conversion(raw_schema_property_value)

if raw_schema_property_value in seen_values:
duplicate_fields.add(raw_schema_property_value)
seen_values.add(raw_schema_property_value)
parsed_schema_values.append((property_index, raw_schema_property_value, raw_schema_property))

if duplicate_fields:
parsed_schema_values = [
parsed_schema_value for parsed_schema_value in parsed_schema_values if parsed_schema_value[1] not in duplicate_fields
]

return parsed_schema_values
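
    # A hypothetical example of the parsing above, with names_conversion disabled:
    # given a header row such as
    #   {"values": [{"formattedValue": "id"}, {"formattedValue": "Full Name"},
    #               {"formattedValue": "id"}, {"formattedValue": ""}]}
    # and schema_pointer=["values"], key_pointer=["formattedValue"], the empty cell
    # ends the headers and both duplicated "id" entries are dropped, leaving
    #   [(1, "Full Name", {"formattedValue": "Full Name"})]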

def parse(self, schema_type_identifier, records: Iterable[MutableMapping[Any, Any]]):
"""Removes duplicated fields and makes names conversion"""
names_conversion = self.config.get("names_conversion", False)
schema_pointer = schema_type_identifier.get("schema_pointer")
key_pointer = schema_type_identifier["key_pointer"]
parsed_properties = []
for raw_schema_data in records:
for _, parsed_value, raw_schema_property in self.parse_raw_schema_values(
raw_schema_data, schema_pointer, key_pointer, names_conversion
):
self._set_data(parsed_value, raw_schema_property, key_pointer)
parsed_properties.append(raw_schema_property)
self._set_data(parsed_properties, raw_schema_data, schema_pointer)
yield raw_schema_data


@dataclass
class DpathSchemaMatchingExtractor(DpathExtractor, RawSchemaParser):
"""
    The standard DpathExtractor cannot handle response data of this shape:
[
{
"values": [
[
"name1",
"22"
],
[
"name2",
"24"
],
[
"name3",
"25"
]
]
}
]
    This is because the "values" field is a list of lists, rather than a list of objects that "*" could extract.
    To handle it, we need the ordered properties from the schema so that each one can be matched with its position in every list of values.
    Then, given a properties mapping like {0: 'name', 1: 'age'}, we end up with:
{"type":"RECORD","record":{"stream":"a_stream_name","data":{"name":"name1","age":"22"},"emitted_at":1734371904128}}
{"type":"RECORD","record":{"stream":"a_stream_name","data":{"name":"name2","age":"24"},"emitted_at":1734371904134}}
{"type":"RECORD","record":{"stream":"a_stream_name","data":{"name":"name3","age":"25"},"emitted_at":1734371904134}}
"""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
super().__post_init__(parameters)
self.decoder = JsonDecoder(parameters={})
self._values_to_match_key = parameters["values_to_match_key"]
schema_type_identifier = parameters["schema_type_identifier"]
names_conversion = self.config.get("names_conversion", False)
self._indexed_properties_to_match = self.extract_properties_to_match(
parameters["properties_to_match"], schema_type_identifier, names_conversion=names_conversion
)

def extract_properties_to_match(self, properties_to_match, schema_type_identifier, names_conversion):
schema_pointer = schema_type_identifier.get("schema_pointer")
key_pointer = schema_type_identifier["key_pointer"]
indexed_properties = {}
for property_index, property_parsed_value, _ in self.parse_raw_schema_values(
properties_to_match, schema_pointer, key_pointer, names_conversion
):
indexed_properties[property_index] = property_parsed_value
return indexed_properties

@staticmethod
def match_properties_with_values(unmatched_values: List[str], indexed_properties: Dict[int, str]):
data = {}
for relevant_index in sorted(indexed_properties.keys()):
if relevant_index >= len(unmatched_values):
break

            unmatched_value = unmatched_values[relevant_index]
            if unmatched_value.strip() != "":
                data[indexed_properties[relevant_index]] = unmatched_value
yield data
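
    # A hypothetical example: with indexed_properties = {0: "name", 1: "age"} and
    # unmatched_values = ["name1", "22", "extra"], this yields
    # {"name": "name1", "age": "22"}; blank cells and columns without a matching
    # schema property are skipped.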

@staticmethod
def is_row_empty(cell_values: List[str]) -> bool:
for cell in cell_values:
if cell.strip() != "":
return False
return True

@staticmethod
def row_contains_relevant_data(cell_values: List[str], relevant_indices: Iterable[int]) -> bool:
for idx in relevant_indices:
if len(cell_values) > idx and cell_values[idx].strip() != "":
return True
return False
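
    # Hypothetical examples: is_row_empty(["", "  ", ""]) -> True, while
    # row_contains_relevant_data(["", "x"], relevant_indices=[1]) -> True,
    # since at least one schema-mapped column holds a non-blank value.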

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
raw_records_extracted = super().extract_records(response=response)
for raw_record in raw_records_extracted:
unmatched_values_collection = raw_record.get(self._values_to_match_key, [])
for unmatched_values in unmatched_values_collection:
if not DpathSchemaMatchingExtractor.is_row_empty(
unmatched_values
) and DpathSchemaMatchingExtractor.row_contains_relevant_data(unmatched_values, self._indexed_properties_to_match.keys()):
yield from DpathSchemaMatchingExtractor.match_properties_with_values(
unmatched_values, self._indexed_properties_to_match
)


class DpathSchemaExtractor(DpathExtractor, RawSchemaParser):
"""
    Applies name conversion and parses sheet headers from the provided row.
"""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
super().__post_init__(parameters)
self.schema_type_identifier = parameters["schema_type_identifier"]

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
extracted_records = super().extract_records(response=response)
yield from self.parse(schema_type_identifier=self.schema_type_identifier, records=extracted_records)
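
# These classes are wired up in the connector's declarative manifest, which is not
# shown in this excerpt. A hedged sketch of the usual low-code pattern for
# referencing a custom extractor (the exact keys used by this commit may differ):
#
#   record_selector:
#     extractor:
#       type: CustomRecordExtractor
#       class_name: source_google_sheets.components.DpathSchemaMatchingExtractor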
