diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index e0299bcb..67e4783e 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -16,6 +16,7 @@ from tap_shopify.exceptions import ShopifyError from tap_shopify.streams.base import shopify_error_handling, get_request_timeout import tap_shopify.streams # Load stream objects into Context +from tap_shopify.rule_map import RuleMap REQUIRED_CONFIG_KEYS = ["shop", "api_key"] LOGGER = singer.get_logger() @@ -53,7 +54,7 @@ def load_schemas(): return schemas -def get_discovery_metadata(stream, schema): +def get_discovery_metadata(stream, schema, rule_map, stream_name): mdata = metadata.new() mdata = metadata.write(mdata, (), 'table-key-properties', stream.key_properties) mdata = metadata.write(mdata, (), 'forced-replication-method', stream.replication_method) @@ -61,14 +62,52 @@ def get_discovery_metadata(stream, schema): if stream.replication_key: mdata = metadata.write(mdata, (), 'valid-replication-keys', [stream.replication_key]) + if 'stream_name' in rule_map: + # Write original-name of stream name in top level metadata + mdata = metadata.write(mdata, (), 'original-name', stream_name) + for field_name in schema['properties'].keys(): if field_name in stream.key_properties or field_name == stream.replication_key: mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'automatic') else: mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'available') + # Add metadata for nested(child) fields also if it's name is changed from original name. 
+ add_child_into_metadata(schema['properties'][field_name], metadata, mdata, + rule_map, ('properties', field_name), ) + if ('properties', field_name) in rule_map: + mdata.get(('properties', field_name)).update( + {'original-name': rule_map[('properties', field_name)]}) + return metadata.to_list(mdata) +def add_child_into_metadata(schema, m_data, mdata, rule_map, parent=()): + """ + Add metadata for nested(child) fields also if it's name is changed from original name. + """ + if schema and isinstance(schema, dict) and schema.get('properties'): + for key in schema['properties'].keys(): + # prepare key to find original-name of field in rule_map object + # Key is tuple of items found in breadcrumb. + breadcrumb = parent + ('properties', key) + + # Iterate in recursive manner to go through each field of schema. + add_child_into_metadata(schema['properties'][key], m_data, mdata, rule_map, breadcrumb) + + mdata = m_data.write(mdata, breadcrumb, 'inclusion', 'available') + + if breadcrumb in rule_map: + # Add `original-name` field in metadata which contain actual name of field. 
+ mdata.get(breadcrumb).update({'original-name': rule_map[breadcrumb]}) + + if schema.get('anyOf'): + for schema_fields in schema.get('anyOf'): + add_child_into_metadata(schema_fields, m_data, mdata, rule_map, parent) + + if schema and isinstance(schema, dict) and schema.get('items'): + breadcrumb = parent + ('items',) + add_child_into_metadata(schema['items'], m_data, mdata, rule_map, breadcrumb) + def load_schema_references(): shared_schema_file = "definitions.json" shared_schema_path = get_abs_path('schemas/') @@ -84,7 +123,7 @@ def add_synthetic_key_to_schema(schema): schema['properties']['_sdc_shop_' + k] = {'type': ["null", SDC_KEYS[k]]} return schema -def discover(): +def discover(rule_map): initialize_shopify_client() # Checking token in discover mode raw_schemas = load_schemas() @@ -105,12 +144,32 @@ def discover(): catalog_schema = add_synthetic_key_to_schema( singer.resolve_schema_references(schema, refs_copy)) + # Define stream_name in GetStdFieldsFromApiFields + rule_map.GetStdFieldsFromApiFields[schema_name] = {} + + # We face issue regarding ref. In some of the schema same ref is being used. + # When we change fields of one of the ref, changes reflect in all the places + # where the same ref is being used. + # Due to this, the `original-name` field name was missing in the metadata of the catalog. + # So, to prevent change in the actual schema, here we are creating a deep copy of schema + # and updating deep copy. 
+ # We do not update the actual schema + schema_copy = copy.deepcopy(schema) + + # Get updated schema by applying rule map + standard_catalog_schema = rule_map.apply_ruleset_on_schema(catalog_schema, + schema_copy, schema_name) + + # Get standard name of schema + standard_schema_name = rule_map.apply_rule_set_on_stream_name(schema_name) + # create and add catalog entry catalog_entry = { - 'stream': schema_name, - 'tap_stream_id': schema_name, - 'schema': catalog_schema, - 'metadata': get_discovery_metadata(stream, schema), + 'stream': standard_schema_name, + 'tap_stream_id': standard_schema_name, + 'schema': standard_catalog_schema, + 'metadata': get_discovery_metadata(stream, standard_catalog_schema, + rule_map.GetStdFieldsFromApiFields[schema_name], schema_name), 'key_properties': stream.key_properties, 'replication_key': stream.replication_key, 'replication_method': stream.replication_method @@ -133,7 +192,7 @@ def shuffle_streams(stream_name): Context.catalog["streams"] = top_half + bottom_half # pylint: disable=too-many-locals -def sync(): +def sync(rule_map): shop_attributes = initialize_shopify_client() sdc_fields = {"_sdc_shop_" + x: shop_attributes[x] for x in SDC_KEYS} @@ -157,6 +216,10 @@ def sync(): stream_id = catalog_entry['tap_stream_id'] stream = Context.stream_objects[stream_id]() + # Fill rule_map object by original-name available in metadata + rule_map.fill_rule_map_object_by_catalog(stream_id, + metadata.to_map(catalog_entry['metadata'])) + if not Context.is_selected(stream_id): LOGGER.info('Skipping stream: %s', stream_id) continue @@ -172,6 +235,10 @@ def sync(): extraction_time = singer.utils.now() record_schema = catalog_entry['schema'] record_metadata = metadata.to_map(catalog_entry['metadata']) + + # Apply rule map on record + rec = rule_map.apply_ruleset_on_api_response(rec, stream_id) + rec = transformer.transform({**rec, **sdc_fields}, record_schema, record_metadata) @@ -197,9 +264,11 @@ def main(): Context.config = args.config 
Context.state = args.state + rule_map = RuleMap() + # If discover flag was passed, run discovery mode and dump output to stdout if args.discover: - catalog = discover() + catalog = discover(rule_map) print(json.dumps(catalog, indent=2)) # Otherwise run in sync mode else: @@ -207,9 +276,9 @@ def main(): if args.catalog: Context.catalog = args.catalog.to_dict() else: - Context.catalog = discover() + Context.catalog = discover(rule_map) - sync() + sync(rule_map) except pyactiveresource.connection.ResourceNotFound as exc: raise ShopifyError(exc, 'Ensure shop is entered correctly') from exc except pyactiveresource.connection.UnauthorizedAccess as exc: diff --git a/tap_shopify/rule_map.py b/tap_shopify/rule_map.py new file mode 100644 index 00000000..8041cbc4 --- /dev/null +++ b/tap_shopify/rule_map.py @@ -0,0 +1,239 @@ +import re +import singer + +# These are standard keys defined in the JSON Schema spec. +# We will not apply rules on these STANDARD_KEYS. +STANDARD_KEYS = [ + 'selected', + 'inclusion', + 'description', + 'minimum', + 'maximum', + 'exclusiveMinimum', + 'exclusiveMaximum', + 'multipleOf', + 'maxLength', + 'minLength', + 'format', + 'type', + 'additionalProperties', + 'anyOf', + 'patternProperties', +] + + +LOGGER = singer.get_logger() + +class RuleMap: + GetStdFieldsFromApiFields = {} + + + def fill_rule_map_object_by_catalog(self, stream_name, stream_metadata): + """ + Read original-name of fields available in metadata of catalog and add + it in `GetStdFieldsFromApiFields` dict object. 
+ param1: stream_name: users + param2: stream_metadata + { + "breadcrumb": [ + "properties", + "user_name" + ], + "metadata": { + "original-name": "UserName" + } + } + + After iterating all metadata, + GetStdFieldsFromApiFields['users'] = {('properties', 'UserName'): 'user_name'} + """ + self.GetStdFieldsFromApiFields[stream_name] = {} + + for key, value in stream_metadata.items(): + api_name = value.get('original-name') + if api_name and key: + self.GetStdFieldsFromApiFields[stream_name][key[:-1] + (api_name,)] = key[-1:][0] + + def apply_ruleset_on_schema(self, schema, schema_copy, stream_name, parent = ()): + """ + Apply defined rule set on schema and return it. + """ + temp_dict = {} + roll_back_dict = {} + + if schema and isinstance(schema, dict) and schema.get('properties'): + # Iterate through each item of schema. + for key in schema['properties'].keys(): + breadcrumb = parent + ('properties', key) + + self.apply_ruleset_on_schema(schema.get('properties', {}).get(key, {}), + schema_copy.get('properties', {}).get(key, {}), + stream_name, breadcrumb) + + # Skip keys available in STANDARD_KEYS + if key not in STANDARD_KEYS: + + # apply rules on field + standard_key = self.apply_rules_to_original_field(key) + + # Field name is changed after applying rules + # Check if same standard name of field is already available or + # not at same level + + if key != standard_key and standard_key not in schema['properties'].keys(): + if standard_key not in temp_dict: + + # Add standard name of field in GetStdFieldsFromApiFields with + # key as tuple of breadcrumb keys + # Example: + # GetStdFieldsFromApiFields['users'][ + # ('properties', 'user_name')] = 'UserName' + self.GetStdFieldsFromApiFields[stream_name][parent + + ('properties', standard_key)] = key + + # Add key in temp_dict with value as standard_key to update + # in schema after + # iterating whole schema + # Because we can not update schema while iterating it. 
+ temp_dict[key] = standard_key + else: + # Print Warning message for field name conflict found same level and + # add it's standard name to + # roll_back_dict because we need to roll back standard field name + # to original field name. + LOGGER.warning('Conflict found for field : %s', breadcrumb) + roll_back_dict[standard_key] = True + + elif schema.get('anyOf'): + # Iterate through each possible datatype of field + # Example: + # 'sources': { + # 'anyOf': [ + # { + # 'type': ['null', 'array'], + # 'items': { + # 'type': ['null', 'object'], + # 'properties': {} + # } + # }, + # { + # 'type': ['null', 'object'], + # 'properties': {} + # } + # ] + # + # } + for index, schema_field in enumerate(schema.get('anyOf')): + self.apply_ruleset_on_schema(schema_field, schema_copy.get('anyOf')[index], + stream_name, parent) + elif schema and isinstance(schema, dict) and schema.get('items'): + breadcrumb = parent + ('items',) + self.apply_ruleset_on_schema(schema['items'], schema_copy['items'], + stream_name, breadcrumb) + + for key, new_key in temp_dict.items(): + if roll_back_dict.get(new_key): + breadcrumb = parent + ('properties', new_key) + # Remove key with standard name from GetStdFieldsFromApiFields for which conflict + # was found. + del self.GetStdFieldsFromApiFields[stream_name][breadcrumb] + LOGGER.warning('Conflict found for field : %s', parent + ("properties", key)) + else: + # Replace original name of field with standard name in schema + try: + schema_copy['properties'][new_key] = schema_copy['properties'].pop(key) + except KeyError: + pass + + return schema_copy + + def apply_rule_set_on_stream_name(self, stream_name): + """ + Apply defined rule set on stream name and return it. + """ + standard_stream_name = self.apply_rules_to_original_field(stream_name) + + if stream_name != standard_stream_name: + self.GetStdFieldsFromApiFields[stream_name]['stream_name'] = stream_name + return standard_stream_name + + # return original name of stream if it is not changed. 
+ return stream_name + + @classmethod + def apply_rules_to_original_field(cls, key): + """ + Apply defined rules on field. + - Divide alphanumeric strings containing capital letters followed by small letters into + multiple words and join with underscores. + - However, two or more adjacent capital letters are considered a part of one word. + - Example: + anotherName -> another_name + ANOTHERName -> anothername + - Divide alphanumeric strings containing letters, number and special character into + multiple words and join with underscores. + - Example: + MyName123 -> my_name_123 + - Convert any character that is not a letter, digit, or underscore to underscore. + A space is considered a character. + - Example: + A0a_*A -> a_0_a_a + - Convert multiple underscores to a single underscore + - Example: + add____*LPlO -> add_lpl_o + - Convert all upper-case letters to lower-case. + """ + # Divide alphanumeric strings containing capital letters followed by small letters into + # multiple words and joined with underscores. This include empty string at last + standard_key = re.findall('[A-Z]*[^A-Z]*', key) + standard_key = '_'.join(standard_key) + + # Remove empty string from last position + standard_key = standard_key[:-1] + + # Divide alphanumeric strings containing letters, number and special character into + # multiple words and join with underscores. + standard_key = re.findall(r'[A-Za-z_]+|\d+|\W+', standard_key) + standard_key = '_'.join(standard_key) + + # Replace all special character with underscore + standard_key = re.sub(r'[\W]', '_', standard_key) + + # Prepend underscore if 1st character is digit + if standard_key[0].isdigit(): + standard_key = f'_{standard_key}' + + # Convert repetitive multiple underscores to a single underscore + standard_key = re.sub(r'[_]+', '_', standard_key) + + # Convert all upper-case letters to lower-case. 
+ return standard_key.lower() + + def apply_ruleset_on_api_response(self, response, stream_name, parent = ()): + """ + Apply defined rule set on api response and return it. + """ + temp_dict = {} + if isinstance(response, dict): + for key, value in response.items(): + if isinstance(value, list) and value: + breadcrumb = parent + ('properties', key, 'items') + # Iterate through each item of list + for val in value: + self.apply_ruleset_on_api_response(val, stream_name, breadcrumb) + elif isinstance(value, dict): + breadcrumb = parent + ('properties', key) + self.apply_ruleset_on_api_response(value, stream_name, breadcrumb) + else: + breadcrumb = parent + ('properties', key) + + if breadcrumb in self.GetStdFieldsFromApiFields[stream_name]: + # Field found in the rule_map that need to be changed to standard name + temp_dict[key] = self.GetStdFieldsFromApiFields[stream_name][breadcrumb] + + # Updated key in record + for key, new_key in temp_dict.items(): + # Replace original name of field with standard name in response + response[new_key] = response.pop(key) + + return response diff --git a/tests/unittests/test_rule_set.py b/tests/unittests/test_rule_set.py new file mode 100644 index 00000000..01abb598 --- /dev/null +++ b/tests/unittests/test_rule_set.py @@ -0,0 +1,45 @@ +import unittest +from tap_shopify.rule_map import RuleMap + +FIELDS_SET = { + '%MyName123': '_my_name_123', + 'ANOTHERName': 'anothername', + 'anotherName': 'another_name', + 'add____*LPlO': 'add_lpl_o', + '123Abc%%_opR': '_123_abc_op_r', + 'UserName': 'user_name', + 'A0a_A': 'a_0_a_a', + 'aE0': 'a_e_0', + 'a.a b': 'a_a_b', + '1MyName': '_1_my_name', + '!MyName': '_my_name', + 'My_Name_': 'my_name_', + '_My_Name': '_my_name', + '___999Myy': '_999_myy', + 'My Name': 'my_name', + '["_"]': '_', + 'test-abc': 'test_abc', + '53234': '_53234', + 'My_Name!': 'my_name_', + 'My_Name____c': 'my_name_c', + '1MyName': '_1_my_name', + 'blurry-green-dodo-important': 'blurry_green_dodo_important', + 
'__new__--test__': '_new_test_', + '-5.490030': '_5_490030', + '\'aa\'': '_aa_', + "Audience Report": 'audience_report', + '******': '_', + '``~Qo': '_qo', + '99j_J': '_99_j_j' + + +} + +class TestRuleMap(unittest.TestCase): + + def test_apply_rules_to_original_field(self): + rule_map = RuleMap() + + for field, value in FIELDS_SET.items(): + standard_field = rule_map.apply_rules_to_original_field(field) + self.assertEqual(standard_field, value)