
Commit

Merge pull request #100 from AlexanderMann/fix/hubspot-postgres-error--unknown-column-path

Fix/hubspot postgres error: unknown column path
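
This merge fixes a crash in which target-postgres raised "Unknown column path" while loading HubSpot data: when a record held a value whose JSON Schema type could not be determined (for example, an empty object for a field typed ["null", "number", "string"]), the serializer fell back to the streamed schema and then failed to resolve a remote column. A minimal sketch of the fixed control flow, assuming json_schema.python_type() returns a falsy value for unclassifiable inputs (the function shape here is illustrative, not the library's exact signature):

    def pick_value_json_schema(value, path, datetime_paths, json_schema):
        """Sketch of the serializer's type-selection logic after this fix."""
        json_schema_string_type = json_schema.python_type(value)

        # New in this commit: values whose type cannot be determined are
        # skipped outright, instead of falling back to the streamed schema
        # and later failing to resolve a remote column.
        if not json_schema_string_type:
            return None

        # Datetimes keep their dedicated string/date-time schema.
        if path in datetime_paths and json_schema_string_type == json_schema.STRING:
            return {'type': json_schema.STRING, 'format': json_schema.DATE_TIME_FORMAT}

        return {'type': json_schema_string_type}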
AlexanderMann authored Mar 14, 2019
2 parents 16c49b4 + 7e0de66 commit 4007f8e
Showing 2 changed files with 115 additions and 23 deletions.
17 changes: 9 additions & 8 deletions target_postgres/sql_base.py
@@ -588,7 +588,7 @@ def log_message(msg):

     def _serialize_table_record_field_name(self, remote_schema, streamed_schema, path, value_json_schema):
         """
-        Returns the appropriate remote field (column) name for `field`.
+        Returns the appropriate remote field (column) name for `path`.

         :param remote_schema: TABLE_SCHEMA(remote)
         :param streamed_schema: TABLE_SCHEMA(local)
@@ -616,8 +616,9 @@ def _serialize_table_record_field_name(self, remote_schema, streamed_schema, pat
         if not mapping is None:
             return mapping

-        raise Exception('Unknown column path: {} for table: {}'.format(
+        raise Exception("A compatible column for path {} and JSONSchema {} in table {} cannot be found.".format(
             path,
+            simple_json_schema,
             remote_schema['path']
         ))

@@ -692,6 +693,9 @@ def _serialize_table_records(
                 value = default_paths[path]
                 json_schema_string_type = json_schema.python_type(value)

+                if not json_schema_string_type:
+                    continue
+
                 ## Serialize datetime to compatible format
                 if path in datetime_paths \
                         and json_schema_string_type == json_schema.STRING \
@@ -700,10 +704,8 @@
                         value)
                     value_json_schema = {'type': json_schema.STRING,
                                          'format': json_schema.DATE_TIME_FORMAT}
-                elif json_schema_string_type:
-                    value_json_schema = {'type': json_schema_string_type}
                 else:
-                    value_json_schema = json_schema.simple_type(streamed_schema['schema']['properties'][path])
+                    value_json_schema = {'type': json_schema_string_type}

                 ## Serialize NULL default value
                 value = self.serialize_table_record_null_value(remote_schema, streamed_schema, path, value)
@@ -713,9 +715,8 @@
                     path,
                     value_json_schema)

-                if field_name in remote_fields \
-                        and (not field_name in row
-                             or row[field_name] == NULL_DEFAULT):
+                ## `field_name` is unset
+                if row[field_name] == NULL_DEFAULT:
                     row[field_name] = value

             serialized_rows.append(row)
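The last hunk above also simplifies the guard that merges a serialized value into the output row: the old three-clause check becomes a single sentinel comparison, which is only safe if every remote field is pre-seeded in the row with NULL_DEFAULT. A sketch of that invariant, with names taken from the diff; the seeding step and the sentinel's definition are assumed from context, not shown in this commit:

    NULL_DEFAULT = object()          # assumed sentinel; the real one lives in target_postgres
    remote_fields = ['id', 'name']   # example field names

    # Assumed invariant: every remote field is pre-seeded with the sentinel.
    row = {field_name: NULL_DEFAULT for field_name in remote_fields}

    # With every field guaranteed present, "field is unset" reduces to one
    # equality check, and the first serialized value for a field wins.
    for field_name, value in [('id', 1), ('id', 2), ('name', 'deal')]:
        if row[field_name] == NULL_DEFAULT:
            row[field_name] = value

    assert row == {'id': 1, 'name': 'deal'}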
121 changes: 106 additions & 15 deletions tests/test_sandbox.py
@@ -28,8 +28,31 @@ def assert_columns_equal(cursor, table_name, expected_column_tuples):
         or set(columns) == expected_column_tuples


-class BigCommerceStream:
+def assert_count_equal(cursor, table_name, n):
+    cursor.execute('SELECT count(*) FROM "public"."{}"'.format(table_name))
+    assert cursor.fetchone()[0] == n
+
+
+class SandboxStream:
+    idx = None
+    stream = NotImplementedError()
+
+    def __init__(self):
+        self.idx = -1
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        self.idx += 1
+
+        if self.idx < len(self.stream):
+            return json.dumps(self.stream[self.idx])
+
+        raise StopIteration
+
+
+class BigCommerceStream(SandboxStream):
     stream = [
         {"type": "SCHEMA",
          "stream": "products",
@@ -314,20 +337,6 @@ class BigCommerceStream:
"value": {"bookmarks": {"products": "2018-11-17T21:26:50+00:00",
"customers": "2018-11-17T21:25:01+00:00"}}}]

def __init__(self):
self.idx = -1

def __iter__(self):
return self

def __next__(self):
self.idx += 1

if self.idx < len(self.stream):
return json.dumps(self.stream[self.idx])

raise StopIteration


def test_bigcommerce__sandbox(db_cleanup):
main(CONFIG, input_stream=BigCommerceStream())
@@ -366,3 +375,85 @@ def test_bigcommerce__sandbox(db_cleanup):
                                     ('phone', 'text', 'YES'),
                                     ('last_name', 'text', 'YES')
                                 })
+
+
+class HubspotStream(SandboxStream):
+    stream = [
+        {"type": "SCHEMA",
+         "stream": "deals",
+         "schema": {
+             "type": "object",
+             "properties": {
+                 "properties": {
+                     "type": "object",
+                     "properties": {
+                         "num_contacted_notes": {
+                             "type": "object",
+                             "properties": {
+                                 "value": {
+                                     "type": ["null", "number", "string"]
+                                 }}}}}}},
+         "key_properties": []},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": None}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": "helloworld"}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": 12345}}}},
+        {"type": "RECORD",
+         "stream": "deals",
+         "record": {
+             "properties": {
+                 "num_contacted_notes": {
+                     "value": 12345.6789}}}}]
+
+
+def test_hubspot__sandbox(db_cleanup):
+    config = CONFIG.copy()
+    config['persist_empty_tables'] = True
+    main(config, input_stream=HubspotStream())
+
+    with psycopg2.connect(**TEST_DB) as conn:
+        with conn.cursor() as cur:
+            assert_tables_equal(cur,
+                                {'deals'})
+
+            assert_columns_equal(cur,
+                                 'deals',
+                                 {
+                                     ('_sdc_table_version', 'bigint', 'YES'),
+                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
+                                     ('_sdc_sequence', 'bigint', 'YES'),
+                                     ('_sdc_primary_key', 'text', 'NO'),
+                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
+                                     ('properties__num_contacted_notes__value__f', 'double precision', 'YES'),
+                                     ('properties__num_contacted_notes__value__s', 'text', 'YES')
+                                 })
+
+            assert_count_equal(cur,
+                               'deals',
+                               7)
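
The column assertions in this new test also document how multi-typed values land in Postgres: the value field, declared as ["null", "number", "string"], is split into a double precision column and a text column, distinguished by what appear to be per-type suffixes (__f and __s). There is no separate integer column, so the record carrying 12345 presumably serializes under the declared "number" type, and the asserted count of 7 matches the seven RECORD messages in HubspotStream, including the empty and null ones that previously crashed the load. A toy illustration of that suffix mapping, inferred from the assertions rather than from the library's code:

    # Hypothetical mapping, inferred from the asserted column names above;
    # not taken from target-postgres internals.
    TYPE_SUFFIXES = {'number': '__f', 'string': '__s'}

    def split_column_names(path, json_schema_types):
        # 'null' adds no column of its own; it only makes the typed columns nullable.
        return [path + TYPE_SUFFIXES[t] for t in json_schema_types if t != 'null']

    assert split_column_names('properties__num_contacted_notes__value',
                              ['null', 'number', 'string']) == [
        'properties__num_contacted_notes__value__f',
        'properties__num_contacted_notes__value__s']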
