Fix/hubspot postgres error unknown column path #100

Merged
target_postgres/sql_base.py (17 changes: 9 additions & 8 deletions)

@@ -588,7 +588,7 @@ def log_message(msg):
 
     def _serialize_table_record_field_name(self, remote_schema, streamed_schema, path, value_json_schema):
         """
-        Returns the appropriate remote field (column) name for `field`.
+        Returns the appropriate remote field (column) name for `path`.
 
         :param remote_schema: TABLE_SCHEMA(remote)
         :param streamed_schema: TABLE_SCHEMA(local)
@@ -616,8 +616,9 @@ def _serialize_table_record_field_name(self, remote_schema, streamed_schema, pat
         if not mapping is None:
             return mapping
 
-        raise Exception('Unknown column path: {} for table: {}'.format(
+        raise Exception("A compatible column for path {} and JSONSchema {} in table {} cannot be found.".format(
             path,
+            simple_json_schema,
             remote_schema['path']
         ))
 
@@ -692,6 +693,9 @@ def _serialize_table_records(
             value = default_paths[path]
             json_schema_string_type = json_schema.python_type(value)
 
+            if not json_schema_string_type:
+                continue
+
             ## Serialize datetime to compatible format
             if path in datetime_paths \
                     and json_schema_string_type == json_schema.STRING \
@@ -700,10 +704,8 @@
                     value)
                 value_json_schema = {'type': json_schema.STRING,
                                      'format': json_schema.DATE_TIME_FORMAT}
-            elif json_schema_string_type:
-                value_json_schema = {'type': json_schema_string_type}
             else:
-                value_json_schema = json_schema.simple_type(streamed_schema['schema']['properties'][path])
+                value_json_schema = {'type': json_schema_string_type}
 
             ## Serialize NULL default value
             value = self.serialize_table_record_null_value(remote_schema, streamed_schema, path, value)
@@ -713,9 +715,8 @@
                 path,
                 value_json_schema)
 
-            if field_name in remote_fields \
-                    and (not field_name in row
-                         or row[field_name] == NULL_DEFAULT):
+            ## `field_name` is unset
+            if row[field_name] == NULL_DEFAULT:
                 row[field_name] = value
 
         serialized_rows.append(row)
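For context on the hunks above: when a defaulted value has no resolvable JSON Schema type, the old code still built a `value_json_schema` (via the removed `else` branch's schema lookup) and asked `_serialize_table_record_field_name` for a matching column, which is where the old "Unknown column path" exception fired; the added guard now skips such paths outright. Below is a minimal sketch of that control flow. `python_type` here is a simplified stand-in for `target_postgres.json_schema.python_type`, and its behavior for `None` is an assumption for illustration, not the library's exact contract.

# Sketch only: `python_type` is a simplified stand-in, not library code.
STRING, NUMBER = 'string', 'number'

def python_type(value):
    # Assumption: falsy result when `value` carries no usable type.
    if isinstance(value, bool):
        return None  # booleans are out of scope for this sketch
    if isinstance(value, str):
        return STRING
    if isinstance(value, (int, float)):
        return NUMBER
    return None

def value_schema_for_default(value):
    json_schema_string_type = python_type(value)
    if not json_schema_string_type:
        return None  # the added guard: skip the path instead of guessing a schema
    return {'type': json_schema_string_type}

assert value_schema_for_default('helloworld') == {'type': 'string'}
assert value_schema_for_default(12345.6789) == {'type': 'number'}
assert value_schema_for_default(None) is None  # previously reached the failing lookup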
tests/test_sandbox.py (121 changes: 106 additions & 15 deletions)

@@ -28,8 +28,31 @@ def assert_columns_equal(cursor, table_name, expected_column_tuples):
            or set(columns) == expected_column_tuples
 
 
-class BigCommerceStream:
+def assert_count_equal(cursor, table_name, n):
+    cursor.execute('SELECT count(*) FROM "public"."{}"'.format(table_name))
+    assert cursor.fetchone()[0] == n
+
+
+class SandboxStream:
+    idx = None
+    stream = NotImplementedError()
+
+    def __init__(self):
+        self.idx = -1
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        self.idx += 1
+
+        if self.idx < len(self.stream):
+            return json.dumps(self.stream[self.idx])
+
+        raise StopIteration
+
+
+class BigCommerceStream(SandboxStream):
     stream = [
         {"type": "SCHEMA",
          "stream": "products",
@@ -314,20 +337,6 @@ class BigCommerceStream:
"value": {"bookmarks": {"products": "2018-11-17T21:26:50+00:00",
"customers": "2018-11-17T21:25:01+00:00"}}}]

def __init__(self):
self.idx = -1

def __iter__(self):
return self

def __next__(self):
self.idx += 1

if self.idx < len(self.stream):
return json.dumps(self.stream[self.idx])

raise StopIteration


def test_bigcommerce__sandbox(db_cleanup):
main(CONFIG, input_stream=BigCommerceStream())
@@ -366,3 +375,85 @@ def test_bigcommerce__sandbox(db_cleanup):
                                     ('phone', 'text', 'YES'),
                                     ('last_name', 'text', 'YES')
                                 })
+
+
+class HubspotStream(SandboxStream):
+    stream = [
+        {"type": "SCHEMA",
+         "stream": "deals",
+         "schema": {
+             "type": "object",
+             "properties": {
+                 "properties": {
+                     "type": "object",
+                     "properties": {
+                         "num_contacted_notes": {
+                             "type": "object",
+                             "properties": {
+                                 "value": {
+                                     "type": ["null", "number", "string"]
+                                 }}}}}}},
+         "key_properties": []},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": None}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": "helloworld"}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": 12345}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": 12345.6789}}}}]
+
+
+def test_hubspot__sandbox(db_cleanup):
+    config = CONFIG.copy()
+    config['persist_empty_tables'] = True
+    main(config, input_stream=HubspotStream())
+
+    with psycopg2.connect(**TEST_DB) as conn:
+        with conn.cursor() as cur:
+            assert_tables_equal(cur,
+                                {'deals'})
+
+            assert_columns_equal(cur,
+                                 'deals',
+                                 {
+                                     ('_sdc_table_version', 'bigint', 'YES'),
+                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
+                                     ('_sdc_sequence', 'bigint', 'YES'),
+                                     ('_sdc_primary_key', 'text', 'NO'),
+                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
+                                     ('properties__num_contacted_notes__value__f', 'double precision', 'YES'),
+                                     ('properties__num_contacted_notes__value__s', 'text', 'YES')
+                                 })
+
+            assert_count_equal(cur,
+                               'deals',
+                               7)
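The expected columns in this test encode two behaviors worth noting: nested paths are flattened with a `__` separator, and the multi-typed schema `["null", "number", "string"]` for `value` fans out into one column per concrete type (`null` only makes the columns nullable). A short illustrative sketch of that naming scheme, derived from the assertions above rather than from the library's internals; `column_names` and `TYPE_SUFFIXES` are hypothetical helpers, not the target_postgres API:

# Illustrative sketch of the column naming seen in the assertions above;
# `column_names` is a hypothetical helper, not the target_postgres API.
TYPE_SUFFIXES = {'number': '__f',   # lands in a 'double precision' column
                 'string': '__s'}   # lands in a 'text' column

def column_names(path, json_types):
    # Flatten the path, then fan out one column per non-null type.
    base = '__'.join(path)
    types = [t for t in json_types if t != 'null']  # 'null' only affects nullability
    if len(types) == 1:
        return [base]  # single-typed fields need no suffix
    return [base + TYPE_SUFFIXES[t] for t in types]

assert column_names(('properties', 'num_contacted_notes', 'value'),
                    ['null', 'number', 'string']) == [
    'properties__num_contacted_notes__value__f',
    'properties__num_contacted_notes__value__s']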