diff --git a/target_postgres/sql_base.py b/target_postgres/sql_base.py index db77e1d0..a45c4297 100644 --- a/target_postgres/sql_base.py +++ b/target_postgres/sql_base.py @@ -588,7 +588,7 @@ def log_message(msg): def _serialize_table_record_field_name(self, remote_schema, streamed_schema, path, value_json_schema): """ - Returns the appropriate remote field (column) name for `field`. + Returns the appropriate remote field (column) name for `path`. :param remote_schema: TABLE_SCHEMA(remote) :param streamed_schema: TABLE_SCHEMA(local) @@ -616,8 +616,9 @@ def _serialize_table_record_field_name(self, remote_schema, streamed_schema, pat if not mapping is None: return mapping - raise Exception('Unknown column path: {} for table: {}'.format( + raise Exception("A compatible column for path {} and JSONSchema {} in table {} cannot be found.".format( path, + simple_json_schema, remote_schema['path'] )) @@ -692,6 +693,9 @@ def _serialize_table_records( value = default_paths[path] json_schema_string_type = json_schema.python_type(value) + if not json_schema_string_type: + continue + ## Serialize datetime to compatible format if path in datetime_paths \ and json_schema_string_type == json_schema.STRING \ @@ -700,10 +704,8 @@ def _serialize_table_records( value) value_json_schema = {'type': json_schema.STRING, 'format': json_schema.DATE_TIME_FORMAT} - elif json_schema_string_type: - value_json_schema = {'type': json_schema_string_type} else: - value_json_schema = json_schema.simple_type(streamed_schema['schema']['properties'][path]) + value_json_schema = {'type': json_schema_string_type} ## Serialize NULL default value value = self.serialize_table_record_null_value(remote_schema, streamed_schema, path, value) @@ -713,9 +715,8 @@ def _serialize_table_records( path, value_json_schema) - if field_name in remote_fields \ - and (not field_name in row - or row[field_name] == NULL_DEFAULT): + ## `field_name` is unset + if row[field_name] == NULL_DEFAULT: row[field_name] = value serialized_rows.append(row) diff --git a/tests/test_sandbox.py b/tests/test_sandbox.py index f7332ea1..434a5aad 100644 --- a/tests/test_sandbox.py +++ b/tests/test_sandbox.py @@ -28,8 +28,31 @@ def assert_columns_equal(cursor, table_name, expected_column_tuples): or set(columns) == expected_column_tuples -class BigCommerceStream: +def assert_count_equal(cursor, table_name, n): + cursor.execute('SELECT count(*) FROM "public"."{}"'.format(table_name)) + assert cursor.fetchone()[0] == n + + +class SandboxStream: idx = None + stream = NotImplementedError() + + def __init__(self): + self.idx = -1 + + def __iter__(self): + return self + + def __next__(self): + self.idx += 1 + + if self.idx < len(self.stream): + return json.dumps(self.stream[self.idx]) + + raise StopIteration + + +class BigCommerceStream(SandboxStream): stream = [ {"type": "SCHEMA", "stream": "products", @@ -314,20 +337,6 @@ class BigCommerceStream: "value": {"bookmarks": {"products": "2018-11-17T21:26:50+00:00", "customers": "2018-11-17T21:25:01+00:00"}}}] - def __init__(self): - self.idx = -1 - - def __iter__(self): - return self - - def __next__(self): - self.idx += 1 - - if self.idx < len(self.stream): - return json.dumps(self.stream[self.idx]) - - raise StopIteration - def test_bigcommerce__sandbox(db_cleanup): main(CONFIG, input_stream=BigCommerceStream()) @@ -366,3 +375,85 @@ def test_bigcommerce__sandbox(db_cleanup): ('phone', 'text', 'YES'), ('last_name', 'text', 'YES') }) + + +class HubspotStream(SandboxStream): + stream = [ + {"type": "SCHEMA", + "stream": "deals", + "schema": { + "type": "object", + "properties": { + "properties": { + "type": "object", + "properties": { + "num_contacted_notes": { + "type": "object", + "properties": { + "value": { + "type": ["null", "number", "string"] + }}}}}}}, + "key_properties": []}, + {"type": "RECORD", + "stream": "deals", + "record": {}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": {}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": {}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": None}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": "helloworld"}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": 12345}}}}, + {"type": "RECORD", + "stream": "deals", + "record": { + "properties": { + "num_contacted_notes": { + "value": 12345.6789}}}}] + + +def test_hubspot__sandbox(db_cleanup): + config = CONFIG.copy() + config['persist_empty_tables'] = True + main(config, input_stream=HubspotStream()) + + with psycopg2.connect(**TEST_DB) as conn: + with conn.cursor() as cur: + assert_tables_equal(cur, + {'deals'}) + + assert_columns_equal(cur, + 'deals', + { + ('_sdc_table_version', 'bigint', 'YES'), + ('_sdc_received_at', 'timestamp with time zone', 'YES'), + ('_sdc_sequence', 'bigint', 'YES'), + ('_sdc_primary_key', 'text', 'NO'), + ('_sdc_batched_at', 'timestamp with time zone', 'YES'), + ('properties__num_contacted_notes__value__f', 'double precision', 'YES'), + ('properties__num_contacted_notes__value__s', 'text', 'YES') + }) + + assert_count_equal(cur, + 'deals', + 7)