Skip to content

Commit

Permalink
🐛 Source Marketo: handle null responses (#33623)
Browse files Browse the repository at this point in the history
Co-authored-by: richa-rochna <[email protected]>
Co-authored-by: Richa Rochna <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2023
1 parent 52c5f58 commit 2003bc2
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerImageTag: 1.2.2
dockerImageTag: 1.2.3
dockerRepository: airbyte/source-marketo
documentationUrl: https://docs.airbyte.com/integrations/sources/marketo
githubIssueLabel: source-marketo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
schema = self.get_json_schema()["properties"]
response.encoding = "utf-8"

reader = csv.DictReader(response.iter_lines(chunk_size=1024, decode_unicode=True))
response_lines = response.iter_lines(chunk_size=1024, decode_unicode=True)
filtered_response_lines = self.filter_null_bytes(response_lines)
reader = self.csv_rows(filtered_response_lines)

for record in reader:
new_record = {**record}
attributes = json.loads(new_record.pop("attributes", "{}"))
Expand All @@ -257,6 +260,23 @@ def read_records(
self.sleep_till_export_completed(stream_slice)
return super().read_records(sync_mode, cursor_field, stream_slice, stream_state)

def filter_null_bytes(self, response_lines: Iterable[str]) -> Iterable[str]:
for line in response_lines:
res = line.replace("\x00", "")
if len(res) < len(line):
self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(line), len(res))
yield res

@staticmethod
def csv_rows(lines: Iterable[str]) -> Iterable[Mapping]:
reader = csv.reader(lines)
headers = None
for row in reader:
if headers is None:
headers = row
else:
yield dict(zip(headers, row))


class MarketoExportCreate(MarketoStream):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import pendulum
import pytest
from airbyte_cdk.models.airbyte_protocol import SyncMode
from source_marketo.source import Activities, Campaigns, Leads, MarketoStream, Programs, SourceMarketo
from source_marketo.source import Activities, Campaigns, IncrementalMarketoStream, Leads, MarketoStream, Programs, SourceMarketo


def test_create_export_job(mocker, send_email_stream, caplog):
Expand Down Expand Up @@ -314,3 +314,56 @@ def test_get_updated_state(config, latest_record, current_state, expected_state)
if expected_state == "start_date":
expected_state = {"updatedAt": config["start_date"]}
assert stream.get_updated_state(latest_record, current_state) == expected_state


def test_filter_null_bytes(config):
stream = Leads(config)

test_lines = [
"Hello\x00World\n",
"Name,Email\n",
"John\x00Doe,[email protected]\n"
]
expected_lines = [
"HelloWorld\n",
"Name,Email\n",
"JohnDoe,[email protected]\n"
]
filtered_lines = stream.filter_null_bytes(test_lines)
for expected_line, filtered_line in zip(expected_lines, filtered_lines):
assert expected_line == filtered_line


def test_csv_rows(config):
stream = Leads(config)

test_lines = [
"Name,Email\n",
"John Doe,[email protected]\n",
"Jane Doe,[email protected]\n"
]
expected_records = [
{"Name": "John Doe", "Email": "[email protected]"},
{"Name": "Jane Doe", "Email": "[email protected]"}
]
records = stream.csv_rows(test_lines)
for expected_record, record in zip(expected_records, records):
assert expected_record == record

def test_availablity_strategy(config):
stream = Leads(config)
assert stream.availability_strategy == None

def test_path(config):
stream = MarketoStream(config)
assert stream.path() == "rest/v1/marketo_stream.json"

def test_get_state(config):
stream = IncrementalMarketoStream(config)
assert stream.state == {}

def test_set_tate(config):
stream = IncrementalMarketoStream(config)
expected_state = {"id": 1}
stream.state = expected_state
assert stream._state == expected_state
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
#


from datetime import datetime

import pytest
from source_marketo.utils import clean_string, format_value
from source_marketo.utils import clean_string, format_value, to_datetime_str

test_data = [
(1, {"type": "integer"}, int),
Expand All @@ -15,11 +17,12 @@
("1.5", {"type": "integer"}, int),
("15", {"type": "integer"}, int),
("true", {"type": "boolean"}, bool),
("test_custom", {"type": "custom_type"}, str),
]


@pytest.mark.parametrize("value,schema,expected_output_type", test_data)
def test_fromat_value(value, schema, expected_output_type):
def test_format_value(value, schema, expected_output_type):
test = format_value(value, schema)

assert isinstance(test, expected_output_type)
Expand Down Expand Up @@ -55,3 +58,9 @@ def test_clean_string(value, expected):
test = clean_string(value)

assert test == expected

def test_to_datetime_str():
input = datetime(2023, 1, 1)
expected = "2023-01-01T00:00:00Z"

assert to_datetime_str(input) == expected
3 changes: 2 additions & 1 deletion docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------|
| 1.2.2 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
| `1.2.3` | 2023-08-02 | [28999](https://github.com/airbytehq/airbyte/pull/28999) | Fix for ` _csv.Error: line contains NUL` |
| `1.2.2` | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
| `1.2.1` | 2023-09-18 | [30533](https://github.com/airbytehq/airbyte/pull/30533) | Fix `json_schema` for stream `Leads` |
| `1.2.0` | 2023-06-26 | [27726](https://github.com/airbytehq/airbyte/pull/27726) | License Update: Elv2 |
| `1.1.0` | 2023-04-18 | [23956](https://github.com/airbytehq/airbyte/pull/23956) | Add `Segmentations` Stream |
Expand Down

0 comments on commit 2003bc2

Please sign in to comment.