From ba78425c8ea7e0843865b50067c4deed8d3467eb Mon Sep 17 00:00:00 2001 From: AlexanderMann Date: Mon, 11 Mar 2019 16:40:49 -0500 Subject: [PATCH 1/3] Fail: reproduce unknown column error --- tests/test_sandbox.py | 121 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 106 insertions(+), 15 deletions(-) diff --git a/tests/test_sandbox.py b/tests/test_sandbox.py index f7332ea1..5fccdc49 100644 --- a/tests/test_sandbox.py +++ b/tests/test_sandbox.py @@ -28,8 +28,31 @@ def assert_columns_equal(cursor, table_name, expected_column_tuples): or set(columns) == expected_column_tuples -class BigCommerceStream: +def assert_count_equal(cursor, table_name, n): + cursor.execute('SELECT count(*) FROM "public"."{}"'.format(table_name)) + assert cursor.fetchone()[0] == n + + +class SandboxStream: idx = None + stream = NotImplementedError() + + def __init__(self): + self.idx = -1 + + def __iter__(self): + return self + + def __next__(self): + self.idx += 1 + + if self.idx < len(self.stream): + return json.dumps(self.stream[self.idx]) + + raise StopIteration + + +class BigCommerceStream(SandboxStream): stream = [ {"type": "SCHEMA", "stream": "products", @@ -314,20 +337,6 @@ class BigCommerceStream: "value": {"bookmarks": {"products": "2018-11-17T21:26:50+00:00", "customers": "2018-11-17T21:25:01+00:00"}}}] - def __init__(self): - self.idx = -1 - - def __iter__(self): - return self - - def __next__(self): - self.idx += 1 - - if self.idx < len(self.stream): - return json.dumps(self.stream[self.idx]) - - raise StopIteration - def test_bigcommerce__sandbox(db_cleanup): main(CONFIG, input_stream=BigCommerceStream()) @@ -366,3 +375,85 @@ def test_bigcommerce__sandbox(db_cleanup): ('phone', 'text', 'YES'), ('last_name', 'text', 'YES') }) + + +class HubspotStream(SandboxStream): + stream = [ + {"type": "SCHEMA", + "stream": "deals", + "schema": { + "type": "object", + "properties": { + "properties": { + "type": "object", + "properties": { + "num_contacted_notes": { + "type": "object", + "properties": { + "value": { + "type": ["null", "number", "string"] + }}}}}}}, + "key_properties": []}, + {"type": "RECORD", + "stream": "deals", + "record": {}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": {}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": {}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": None}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": "helloworld"}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": 12345}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": 12345.6789}}}}, ] + + +def test_hubspot__sandbox(db_cleanup): + config = CONFIG.copy() + config['persist_empty_tables'] = True + main(config, input_stream=HubspotStream()) + + with psycopg2.connect(**TEST_DB) as conn: + with conn.cursor() as cur: + assert_tables_equal(cur, + {'deals'}) + + assert_columns_equal(cur, + 'deals', + { + ('_sdc_table_version', 'bigint', 'YES'), + ('_sdc_received_at', 'timestamp with time zone', 'YES'), + ('_sdc_sequence', 'bigint', 'YES'), + ('_sdc_primary_key', 'text', 'NO'), + ('_sdc_batched_at', 'timestamp with time zone', 'YES'), + ('properties__num_contacted_notes__value__f', 'double precision', 'YES'), + ('properties__num_contacted_notes__value__s', 'text', 'YES') + }) + + 
assert_count_equal(cur, + 'deals', + 7) From 534b6780609582eef0fe605fd8cfc3bda11c7843 Mon Sep 17 00:00:00 2001 From: AlexanderMann Date: Mon, 11 Mar 2019 17:38:03 -0500 Subject: [PATCH 2/3] Fix: Unknown column path --- target_postgres/sql_base.py | 32 +++++++++++++++++++++++--------- tests/test_sandbox.py | 2 +- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/target_postgres/sql_base.py b/target_postgres/sql_base.py index db77e1d0..6f3ec579 100644 --- a/target_postgres/sql_base.py +++ b/target_postgres/sql_base.py @@ -588,7 +588,7 @@ def log_message(msg): def _serialize_table_record_field_name(self, remote_schema, streamed_schema, path, value_json_schema): """ - Returns the appropriate remote field (column) name for `field`. + Returns the appropriate remote field (column) name for `path`. :param remote_schema: TABLE_SCHEMA(remote) :param streamed_schema: TABLE_SCHEMA(local) @@ -616,9 +616,12 @@ def _serialize_table_record_field_name(self, remote_schema, streamed_schema, pat if not mapping is None: return mapping - raise Exception('Unknown column path: {} for table: {}'.format( + raise Exception("Column path: {} for table: {} for the value's JSONSchema {} cannot be found." \ + + " This means that the `remote_schema` has no compatible column for this path" \ + + " and value type.".format( path, - remote_schema['path'] + remote_schema['path'], + simple_json_schema )) def serialize_table_record_null_value( @@ -692,6 +695,9 @@ def _serialize_table_records( value = default_paths[path] json_schema_string_type = json_schema.python_type(value) + if not json_schema_string_type: + continue + ## Serialize datetime to compatible format if path in datetime_paths \ and json_schema_string_type == json_schema.STRING \ @@ -700,10 +706,8 @@ def _serialize_table_records( value) value_json_schema = {'type': json_schema.STRING, 'format': json_schema.DATE_TIME_FORMAT} - elif json_schema_string_type: - value_json_schema = {'type': json_schema_string_type} else: - value_json_schema = json_schema.simple_type(streamed_schema['schema']['properties'][path]) + value_json_schema = {'type': json_schema_string_type} ## Serialize NULL default value value = self.serialize_table_record_null_value(remote_schema, streamed_schema, path, value) @@ -713,9 +717,19 @@ def _serialize_table_records( path, value_json_schema) - if field_name in remote_fields \ - and (not field_name in row - or row[field_name] == NULL_DEFAULT): + if not field_name in row: + raise Exception( + "Unexpected field_name serialization! Field {} doesn't exist. Please create an issue at: {}." 
\ + + " Arguments are: {}".format( + field_name, + {'remote_schema': remote_schema, + 'streamed_schema': streamed_schema, + 'path': path, + 'value_json_schema': value_json_schema}, + "https://github.com/datamill-co/target-postgres/issues")) + + ## `field_name` is unset + if row[field_name] == NULL_DEFAULT: row[field_name] = value serialized_rows.append(row) diff --git a/tests/test_sandbox.py b/tests/test_sandbox.py index 5fccdc49..434a5aad 100644 --- a/tests/test_sandbox.py +++ b/tests/test_sandbox.py @@ -429,7 +429,7 @@ class HubspotStream(SandboxStream): "record": { "properties": { "num_contacted_notes": { - "value": 12345.6789}}}}, ] + "value": 12345.6789}}}}] def test_hubspot__sandbox(db_cleanup): From 7e0de667da8ae9311b3b680dcd9477a1f80c93f6 Mon Sep 17 00:00:00 2001 From: AlexanderMann Date: Thu, 14 Mar 2019 12:40:59 -0500 Subject: [PATCH 3/3] Housekeeping: Refine error messages in sql-base --- target_postgres/sql_base.py | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/target_postgres/sql_base.py b/target_postgres/sql_base.py index 6f3ec579..a45c4297 100644 --- a/target_postgres/sql_base.py +++ b/target_postgres/sql_base.py @@ -616,12 +616,10 @@ def _serialize_table_record_field_name(self, remote_schema, streamed_schema, pat if not mapping is None: return mapping - raise Exception("Column path: {} for table: {} for the value's JSONSchema {} cannot be found." \ - + " This means that the `remote_schema` has no compatible column for this path" \ - + " and value type.".format( + raise Exception("A compatible column for path {} and JSONSchema {} in table {} cannot be found.".format( path, - remote_schema['path'], - simple_json_schema + simple_json_schema, + remote_schema['path'] )) def serialize_table_record_null_value( @@ -717,17 +715,6 @@ def _serialize_table_records( path, value_json_schema) - if not field_name in row: - raise Exception( - "Unexpected field_name serialization! Field {} doesn't exist. Please create an issue at: {}." \ - + " Arguments are: {}".format( - field_name, - {'remote_schema': remote_schema, - 'streamed_schema': streamed_schema, - 'path': path, - 'value_json_schema': value_json_schema}, - "https://github.com/datamill-co/target-postgres/issues")) - ## `field_name` is unset if row[field_name] == NULL_DEFAULT: row[field_name] = value
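
Taken together, the three patches change `_serialize_table_records` so that a value is routed to a remote column by its runtime Python type, and values that carry no usable type (a missing key, `None`, or an empty object) are skipped instead of raising `Unknown column path`. The sketch below is a simplified, hypothetical illustration of that routing for the `properties.num_contacted_notes.value` field exercised by the Hubspot sandbox test; the helper and column names are stand-ins, not the target-postgres API.

    # Simplified sketch of type-based column routing (hypothetical helpers,
    # not the target-postgres implementation). The JSONSchema type
    # ["null", "number", "string"] splits into one column per concrete type:
    #   properties__num_contacted_notes__value__f  (double precision)
    #   properties__num_contacted_notes__value__s  (text)

    def route_value(path, value):
        """Return (column_name, value) for a typed value, or None to leave
        the row's type-split columns NULL (the post-patch behaviour)."""
        base = '__'.join(path)
        if isinstance(value, bool):
            return None                          # booleans not exercised by this test
        if isinstance(value, (int, float)):      # JSONSchema "number" -> double precision
            return base + '__f', float(value)
        if isinstance(value, str):               # JSONSchema "string" -> text
            return base + '__s', value
        return None                              # None / missing / {}: nothing to serialize


    records = [
        {},                                                                # -> NULLs
        {'properties': {}},                                                # -> NULLs
        {'properties': {'num_contacted_notes': {}}},                       # -> NULLs
        {'properties': {'num_contacted_notes': {'value': None}}},          # -> NULLs
        {'properties': {'num_contacted_notes': {'value': 'helloworld'}}},  # -> __s
        {'properties': {'num_contacted_notes': {'value': 12345}}},         # -> __f
        {'properties': {'num_contacted_notes': {'value': 12345.6789}}},    # -> __f
    ]

    path = ('properties', 'num_contacted_notes', 'value')
    for record in records:
        value = record.get('properties', {}).get('num_contacted_notes', {}).get('value')
        print(route_value(path, value))

All seven records still produce rows (hence the `assert_count_equal(cur, 'deals', 7)` assertion in the test); the first four simply leave both type-split columns NULL.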