diff --git a/TM1py/Services/HierarchyService.py b/TM1py/Services/HierarchyService.py index b4da2656..e3da69a1 100644 --- a/TM1py/Services/HierarchyService.py +++ b/TM1py/Services/HierarchyService.py @@ -9,7 +9,7 @@ import json import math -from typing import Dict, Tuple, List, Optional +from typing import Dict, Tuple, List, Optional, Iterable import networkx as nx from requests import Response @@ -22,7 +22,7 @@ from TM1py.Services.SubsetService import SubsetService from TM1py.Utils.Utils import case_and_space_insensitive_equals, format_url, CaseAndSpaceInsensitiveDict, \ CaseAndSpaceInsensitiveSet, CaseAndSpaceInsensitiveTuplesDict, require_pandas, require_data_admin, \ - lower_and_drop_spaces, require_ops_admin, verify_version + require_ops_admin, verify_version class HierarchyService(ObjectService): @@ -353,8 +353,9 @@ def remove_edges_under_consolidation(self, dimension_name: str, hierarchy_name: hierarchy = self.get(dimension_name, hierarchy_name) from TM1py.Services import ElementService element_service = ElementService(self._rest) - elements_under_consolidations = CaseAndSpaceInsensitiveSet(element_service.get_members_under_consolidation(dimension_name, hierarchy_name, - consolidation_element)) + elements_under_consolidations = CaseAndSpaceInsensitiveSet( + element_service.get_members_under_consolidation(dimension_name, hierarchy_name, + consolidation_element)) elements_under_consolidations.add(consolidation_element) remove_edges = [] for (parent, component) in hierarchy.edges: @@ -427,8 +428,10 @@ def update_or_create_hierarchy_from_dataframe( verify_unique_elements: bool = False, verify_edges: bool = True, element_type_column: str = 'ElementType', - unwind: bool = False, - update_attribute_types: bool = False): + unwind_all: bool = False, + unwind_consolidations: Iterable = None, + update_attribute_types: bool = False, + **kwargs): """ Update or Create a hierarchy based on a dataframe, while never deleting existing elements. :param dimension_name: @@ -455,8 +458,11 @@ def update_or_create_hierarchy_from_dataframe( Abort early if element names are not unique :param verify_edges: Abort early if edges contain a circular reference - :param unwind: bool + :param unwind_all: bool Unwind hierarch before creating new edges + :param unwind_consolidations: list + Unwind a list of specific consolidations in the hierarchy before creating new edges, + if unwind_all is true, this list is ignored :param update_attribute_types: bool If True, function will delete and recreate attributes when a type change is requested. By default, function will not delete attributes. @@ -485,6 +491,17 @@ def update_or_create_hierarchy_from_dataframe( if len(alias_columns) > 0: self._validate_alias_uniqueness(df=df[[element_column, *alias_columns]]) + # backward compatibility for unwind, the value for unwind would be assinged to unwind_all. expected type is bool + if "unwind" in kwargs: + unwind_all = kwargs["unwind"] + + if unwind_consolidations: + if isinstance(unwind_consolidations, str) or not isinstance(unwind_consolidations, Iterable): + raise ValueError( + f"value for 'unwind_consolidations' must be an iterable (e.g., list), " + f"but received: '{unwind_consolidations}' of type {type(unwind_consolidations).__name__}" + ) + # identify and sort level columns level_columns = [] level_weight_columns = [] @@ -634,8 +651,29 @@ def update_or_create_hierarchy_from_dataframe( sum_numeric_duplicates=False, use_blob=True) - if unwind: - self.remove_all_edges(dimension_name, hierarchy_name) + if unwind_all: + self.remove_all_edges(dimension_name=dimension_name, hierarchy_name=hierarchy_name) + else: + if unwind_consolidations: + edges_to_delete = CaseAndSpaceInsensitiveTuplesDict() + for elem in unwind_consolidations: + if not self.elements.exists( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + element_name=elem): + continue + + edges_under_consolidation = self.elements.get_edges_under_consolidation( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + consolidation=elem) + edges_to_delete.join(edges_under_consolidation) + + self.elements.delete_edges( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + edges=edges_to_delete, + use_blob=self.is_admin) edges = CaseAndSpaceInsensitiveTuplesDict() for element_name, *record in df[[element_column, *level_columns, *level_weight_columns]].itertuples( @@ -666,16 +704,16 @@ def update_or_create_hierarchy_from_dataframe( else: raise ex - delete_edges = { + edges_to_delete = { (k, v): w for (k, v), w in edges.items() if w != current_edges.get((k, v), w)} - if delete_edges: + if edges_to_delete: self.elements.delete_edges( dimension_name=dimension_name, hierarchy_name=hierarchy_name, - edges=delete_edges.keys(), + edges=edges_to_delete.keys(), use_blob=self.is_admin) new_edges = { diff --git a/TM1py/Utils/Utils.py b/TM1py/Utils/Utils.py index a8a5a527..8d6aae85 100644 --- a/TM1py/Utils/Utils.py +++ b/TM1py/Utils/Utils.py @@ -4,16 +4,15 @@ import http.client as http_client import json import math -import pytz import re import ssl import urllib.parse as urlparse -from contextlib import suppress from enum import Enum, unique from io import StringIO from typing import Any, Dict, List, Tuple, Iterable, Optional, Generator, Union, Callable from urllib.parse import unquote +import pytz import requests from mdxpy import MdxBuilder, Member from requests.adapters import HTTPAdapter @@ -1034,152 +1033,424 @@ def map_cell_properties_to_compact_json_response(properties: List, compact_cells class CaseAndSpaceInsensitiveDict(collections.abc.MutableMapping): - """A case-and-space-insensitive dict-like object with String keys. - Implements all methods and operations of - ``collections.abc.MutableMapping`` as well as dict's ``copy``. Also - provides ``adjusted_items``, ``adjusted_keys``. - All keys are expected to be strings. The structure remembers the - case of the last key to be set, and ``iter(instance)``, - ``keys()``, ``items()``, ``iterkeys()``, and ``iteritems()`` - will contain case-sensitive keys. - However, querying and contains testing is case insensitive: - elements = TM1pyElementsDictionary() - elements['Travel Expenses'] = 100 - elements['travelexpenses'] == 100 # True - Entries are ordered + """ + A case-and-space-insensitive dict-like object with string keys. + + This class implements all methods and operations of `collections.abc.MutableMapping`, + as well as dict's `copy`. It also provides `adjusted_items` and `adjusted_keys`. + All keys are expected to be strings. + + The structure remembers the case of the last key set, and methods like `__iter__`, + `keys()`, `items()`, etc., will contain case-sensitive keys. + + However, querying and membership tests are case-and-space-insensitive: + data = CaseAndSpaceInsensitiveDict() + data['Travel Expenses'] = 100 + assert data['travelexpenses'] == 100 # True + + Entries are ordered. """ def __init__(self, data=None, **kwargs): + """Initialize the dictionary with optional initial data.""" self._store = collections.OrderedDict() if data is None: data = {} self.update(data, **kwargs) - def __setitem__(self, key: str, value): - # Use the adjusted cased key for lookups, but store the actual - # key alongside the value. - self._store[lower_and_drop_spaces(key)] = (key, value) + def _adjust_key(self, key): + """Adjust the key by lowering case and removing spaces.""" + if not isinstance(key, str): + raise TypeError("Keys must be strings.") + return lower_and_drop_spaces(key) - def __getitem__(self, key: str): - return self._store[lower_and_drop_spaces(key)][1] + def __setitem__(self, key, value): + """Set the value for a key, adjusting the key as needed.""" + adjusted_key = self._adjust_key(key) + self._store[adjusted_key] = (key, value) - def __delitem__(self, key: str): - del self._store[lower_and_drop_spaces(key)] + def __getitem__(self, key): + """Retrieve the value for a key, using the adjusted key.""" + adjusted_key = self._adjust_key(key) + try: + return self._store[adjusted_key][1] + except KeyError: + raise KeyError(f"Key '{key}' not found.") from None + + def __delitem__(self, key): + """Delete the item associated with the key.""" + adjusted_key = self._adjust_key(key) + try: + del self._store[adjusted_key] + except KeyError: + raise KeyError(f"Key '{key}' not found.") from None def __iter__(self): - return (casedkey for casedkey, mappedvalue in self._store.values()) + """Iterate over the keys in their original case.""" + return (key for key, _ in self._store.values()) def __len__(self): return len(self._store) - def adjusted_items(self) -> Generator: - """Like iteritems(), but with all adjusted keys.""" - return ( - (adjusted_key, key_value[1]) - for (adjusted_key, key_value) - in self._store.items() - ) - - def adjusted_keys(self) -> Generator: - """Like keys(), but with all adjusted keys.""" - return ( - adjusted_key - for (adjusted_key, key_value) - in self._store.items() - ) + def __contains__(self, key): + """Check if the key exists in the dictionary.""" + adjusted_key = self._adjust_key(key) + return adjusted_key in self._store + + def keys(self): + """Return a view of the keys in their original case.""" + return [key for key, _ in self._store.values()] + + def values(self): + """Return a view of the values.""" + return [value for _, value in self._store.values()] + + def items(self): + """Return a view of the items in their original case.""" + return [(key, value) for key, value in self._store.values()] + + def adjusted_keys(self): + """Return a generator of the adjusted keys.""" + return (adjusted_key for adjusted_key in self._store.keys()) + + def adjusted_items(self): + """Return a generator of (adjusted_key, value) pairs.""" + return ((adjusted_key, key_value[1]) for adjusted_key, key_value in self._store.items()) def __eq__(self, other): + """Check equality with another dictionary.""" if isinstance(other, collections.abc.Mapping): other = CaseAndSpaceInsensitiveDict(other) else: return NotImplemented - # Compare insensitively return dict(self.adjusted_items()) == dict(other.adjusted_items()) - # Copy is required def copy(self): return CaseAndSpaceInsensitiveDict(self._store.values()) + def update(self, other=(), **kwargs): + """ + Update the dictionary with key/value pairs from other, overwriting existing keys. + + Parameters: + other (Mapping or Iterable): A mapping or iterable of key-value pairs. + **kwargs: Additional key-value pairs. + """ + if isinstance(other, collections.abc.Mapping): + for key, value in other.items(): + self[key] = value + + elif hasattr(other, '__iter__'): + for item in other: + if not isinstance(item, collections.abc.Iterable): + raise TypeError("Items must be key-value pairs.") + key, value = item + self[key] = value + + elif other: + raise TypeError("Other object is not a mapping or iterable of key-value pairs.") + + for key, value in kwargs.items(): + self[key] = value + + def get(self, key, default=None): + """ + Return the value for key if key is in the dictionary, else default. + + Parameters: + key (str): The key to look up. + default: The value to return if the key is not found. + """ + try: + return self[key] + except KeyError: + return default + + def setdefault(self, key, default=None): + """ + If key is in the dictionary, return its value. + If not, insert key with a value of default and return default. + + Parameters: + key (str): The key to look up or insert. + default: The value to set if the key is not found. + """ + if key in self: + return self[key] + else: + self[key] = default + return default + + def pop(self, key, default=None): + """ + Remove the specified key and return the corresponding value. + If key is not found, default is returned if provided, otherwise KeyError is raised. + + Parameters: + key (str): The key to remove. + default: The value to return if the key is not found. + """ + adjusted_key = self._adjust_key(key) + try: + value = self._store.pop(adjusted_key)[1] + return value + except KeyError: + if default is not None: + return default + else: + raise KeyError(f"Key '{key}' not found.") from None + + def popitem(self): + """ + Remove and return a (key, value) pair from the dictionary. + Pairs are returned in LIFO order. + + Raises: + KeyError: If the dictionary is empty. + """ + adjusted_key, (key, value) = self._store.popitem() + return key, value + + def clear(self): + """Remove all items from the dictionary.""" + self._store.clear() + def __repr__(self): - return str(dict(self.items())) + """Return the dictionary's string representation.""" + items = ", ".join(f"{key!r}: {value!r}" for key, value in self.items()) + return f"{self.__class__.__name__}({{{items}}})" + + def __str__(self): + """Return a user-friendly string representation.""" + return repr(self) class CaseAndSpaceInsensitiveTuplesDict(collections.abc.MutableMapping): - """A case-and-space-insensitive dict-like object with String-Tuples Keys. - Implements all methods and operations of - ``collections.abc.MutableMapping`` as well as dict's ``copy``. Also - provides ``adjusted_items``, ``adjusted_keys``. - All keys are expected to be tuples of strings. The structure remembers the - case of the last key to be set, and ``iter(instance)``, - ``keys()``, ``items()``, ``iterkeys()``, and ``iteritems()`` - will contain case-sensitive keys. - However, querying and contains testing is case insensitive: + """ + A case-and-space-insensitive dict-like object with tuple of strings as keys. + + This class implements all methods and operations of `collections.abc.MutableMapping`, + as well as dict's `copy`. It also provides `adjusted_items` and `adjusted_keys`. + All keys are expected to be tuples of strings. + + The structure remembers the case of the last key set, and methods like `__iter__`, + `keys()`, `items()`, etc., will contain case-sensitive keys. + + However, querying and membership tests are case-and-space-insensitive: data = CaseAndSpaceInsensitiveTuplesDict() data[('[Business Unit].[UK]', '[Scenario].[Worst Case]')] = 1000 - data[('[BusinessUnit].[UK]', '[Scenario].[worstcase]')] == 1000 # True - data[('[Business Unit].[UK]', '[Scenario].[Worst Case]')] == 1000 # True - Entries are ordered + assert data[('[BusinessUnit].[UK]', '[Scenario].[worstcase]')] == 1000 + assert data[('[Business Unit].[UK]', '[Scenario].[Worst Case]')] == 1000 + + Entries are ordered. """ def __init__(self, data=None, **kwargs): + """Initialize the dictionary with optional initial data.""" self._store = collections.OrderedDict() if data is None: data = {} self.update(data, **kwargs) + def _adjust_key(self, key): + """Adjust the key by lowering case and removing spaces.""" + if not isinstance(key, tuple): + raise TypeError("Keys must be tuples of strings.") + try: + return tuple(lower_and_drop_spaces(item) for item in key) + except TypeError as e: + raise TypeError("All items in the key tuple must be strings.") from e + def __setitem__(self, key, value): - # Use the adjusted cased key for lookups, but store the actual - # key alongside the value. - self._store[tuple([lower_and_drop_spaces(item) for item in key])] = (key, value) + """Set the value for a key, adjusting the key as needed.""" + adjusted_key = self._adjust_key(key) + self._store[adjusted_key] = (key, value) def __getitem__(self, key): - return self._store[tuple([lower_and_drop_spaces(item) for item in key])][1] + """Retrieve the value for a key, using the adjusted key.""" + adjusted_key = self._adjust_key(key) + try: + return self._store[adjusted_key][1] + except KeyError: + raise KeyError(f"Key {key} not found.") from None def __delitem__(self, key): - del self._store[tuple([lower_and_drop_spaces(item) for item in key])] + """Delete the item associated with the key.""" + adjusted_key = self._adjust_key(key) + try: + del self._store[adjusted_key] + except KeyError: + raise KeyError(f"Key {key} not found.") from None def __iter__(self): - return (casedkey for casedkey, mappedvalue in self._store.values()) + """Iterate over the keys in their original case.""" + return (key for key, _ in self._store.values()) def __len__(self): + """Return the number of items in the dictionary.""" return len(self._store) - def items(self): - return super(CaseAndSpaceInsensitiveTuplesDict, self).items() + def __contains__(self, key): + """Check if the key exists in the dictionary.""" + adjusted_key = self._adjust_key(key) + return adjusted_key in self._store - def adjusted_items(self): - """Like iteritems(), but with all adjusted keys.""" - return ( - (adjusted_key, key_value[1]) - for (adjusted_key, key_value) - in self._store.items() - ) + def keys(self): + """Return a view of the keys in their original case.""" + return [key for key, _ in self._store.values()] + + def values(self): + """Return a view of the values.""" + return [value for _, value in self._store.values()] + + def items(self): + """Return a view of the items (key-value pairs).""" + return [(key, value) for key, value in self._store.values()] def adjusted_keys(self): - """Like keys(), but with all adjusted keys.""" - return ( - adjusted_key - for (adjusted_key, key_value) - in self._store.items() - ) + """Return a generator of the adjusted keys.""" + return (adjusted_key for adjusted_key in self._store.keys()) + + def adjusted_items(self): + """Return a generator of (adjusted_key, value) pairs.""" + return ((adjusted_key, key_value[1]) for adjusted_key, key_value in self._store.items()) def __eq__(self, other): + """Check equality with another dictionary.""" if isinstance(other, collections.abc.Mapping): other = CaseAndSpaceInsensitiveTuplesDict(other) else: return NotImplemented - # Compare insensitively return dict(self.adjusted_items()) == dict(other.adjusted_items()) - # Copy is required def copy(self): - return CaseAndSpaceInsensitiveTuplesDict(self._store.values()) + """Create a shallow copy of the dictionary.""" + new_copy = CaseAndSpaceInsensitiveTuplesDict() + new_copy._store = self._store.copy() + return new_copy + + def update(self, other=(), **kwargs): + """ + Update the dictionary with key/value pairs from other, overwriting existing keys. + + Parameters: + other (Mapping or Iterable): A mapping or iterable of key-value pairs. + **kwargs: Additional key-value pairs. + """ + if isinstance(other, collections.abc.Mapping): + for key, value in other.items(): + self[key] = value + + elif hasattr(other, '__iter__'): + for item in other: + if not isinstance(item, collections.abc.Iterable): + raise TypeError("Items must be key-value pairs.") + key, value = item + self[key] = value + + elif other: + raise TypeError("Other object is not a mapping or iterable of key-value pairs.") + + for key, value in kwargs.items(): + self[key] = value + + def join(self, other): + """ + Merge another mapping or iterable of key-value pairs into this dictionary. + + Parameters: + other (Mapping or Iterable): A mapping or iterable of key-value pairs. + """ + self.update(other) + + def get(self, key, default=None): + """ + Return the value for key if key is in the dictionary, else default. + + Parameters: + key (tuple): The key to look up. + default: The value to return if the key is not found. + """ + try: + return self[key] + except KeyError: + return default + + def setdefault(self, key, default=None): + """ + If key is in the dictionary, return its value. + If not, insert key with a value of default and return default. + + Parameters: + key (tuple): The key to look up or insert. + default: The value to set if the key is not found. + """ + if key in self: + return self[key] + else: + self[key] = default + return default + + def pop(self, key, default=None): + """ + Remove the specified key and return the corresponding value. + If key is not found, default is returned if provided, otherwise KeyError is raised. + + Parameters: + key (tuple): The key to remove. + default: The value to return if the key is not found. + """ + adjusted_key = self._adjust_key(key) + try: + value = self._store.pop(adjusted_key)[1] + return value + except KeyError: + if default is not None: + return default + else: + raise KeyError(f"Key {key} not found.") from None + + def popitem(self): + """ + Remove and return a (key, value) pair from the dictionary. + Pairs are returned in LIFO order. + + Raises: + KeyError: If the dictionary is empty. + """ + adjusted_key, (key, value) = self._store.popitem() + return key, value + + def clear(self): + """Remove all items from the dictionary.""" + self._store.clear() def __repr__(self): - return str(dict(self.items())) + """Return the dictionary's string representation.""" + items = ", ".join(f"{key!r}: {value!r}" for key, value in self.items()) + return f"{self.__class__.__name__}({{{items}}})" + + def __str__(self): + """Return a user-friendly string representation.""" + return repr(self) class CaseAndSpaceInsensitiveSet(collections.abc.MutableSet): + """ + A case-and-space-insensitive set-like object for strings. + + This class implements all methods and operations of `collections.abc.MutableSet`. + All values are expected to be strings. The set remembers the case of the last + value added, and methods like `__iter__` and `__str__` will contain case-sensitive values. + + However, membership tests are case-and-space-insensitive: + data = CaseAndSpaceInsensitiveSet('Apple', 'Banana') + assert 'apple' in data # True + assert ' BANANA ' in data # True + + Entries are ordered based on insertion order. + """ + def __init__(self, *values): self._store = {} for v in values: @@ -1191,45 +1462,132 @@ def __init__(self, *values): else: self.add(v) - def __contains__(self, value): - return value.lower().replace(" ", "") in self._store + def _adjust_value(self, value): + if not isinstance(value, str): + raise TypeError("Value must be string.") + return lower_and_drop_spaces(value) - def __delitem__(self, key): - del self._store[key.lower().replace(" ", "")] + def __contains__(self, value): + adjusted_value = self._adjust_value(value) + return adjusted_value in self._store def __iter__(self): + """Iterate over the values in their original case.""" return iter(self._store.values()) def __len__(self): return len(self._store) def add(self, value): - self._store[value.lower().replace(" ", "")] = value + adjusted_value = self._adjust_value(value) + self._store[adjusted_value] = value def discard(self, value): - with suppress(KeyError): - del self._store[value.lower().replace(" ", "")] + adjusted_value = self._adjust_value(value) + self._store.pop(adjusted_value, None) - def copy(self): - return CaseAndSpaceInsensitiveSet(*self._store.values()) + def clear(self): + self._store.clear() - def __repr__(self): - return str(self._store) + def pop(self): + """ + Remove and return an arbitrary element from the set. + Raises KeyError if the set is empty. + """ + if not self._store: + raise KeyError("pop from an empty set") + adjusted_value, value = self._store.popitem() + return value + + def update(self, *others): + """Update the set, adding elements from all others.""" + for iterable in others: + for value in iterable: + self.add(value) def __eq__(self, other): - if isinstance(other, collections.abc.MutableSet): - other = CaseAndSpaceInsensitiveSet(*other) - else: + """Check equality with another set.""" + if not isinstance(other, collections.abc.Set): return NotImplemented - # Compare insensitively - return set(self._store.keys()) == set(other._store.keys()) + return set(self._adjust_value(v) for v in self) == \ + set(self._adjust_value(v) for v in other) + + def __ne__(self, other): + """Check inequality with another set.""" + return not self == other def __sub__(self, other): + """Return a new set with elements in the set that are not in the others.""" result = self.copy() - for entry in other: - result.discard(entry) + result.difference_update(other) return result + def copy(self): + """Create a shallow copy of the set.""" + return CaseAndSpaceInsensitiveSet(self) + + def __repr__(self): + """Return the set's string representation.""" + items = ", ".join(repr(value) for value in self) + return f"{self.__class__.__name__}([{items}])" + + def __str__(self): + """Return a user-friendly string representation.""" + return f"{{{', '.join(map(str, self))}}}" + + def __le__(self, other): + """Test whether every element in the set is in other.""" + return all(item in other for item in self) + + def __lt__(self, other): + """Test whether the set is a proper subset of other.""" + return self <= other and self != other + + def __ge__(self, other): + """Test whether every element in other is in the set.""" + return all(item in self for item in other) + + def __gt__(self, other): + """Test whether the set is a proper superset of other.""" + return self >= other and self != other + + def __or__(self, other): + """Return the union of the sets as a new set.""" + return self.union(other) + + def __and__(self, other): + """Return the intersection of the sets as a new set.""" + return self.intersection(other) + + def __delitem__(self, key): + del self._store[key.lower().replace(" ", "")] + + def difference_update(self, *others): + """Remove all elements of another set from this set.""" + for iterable in others: + for value in iterable: + self.discard(value) + + def intersection(self, *others): + """Return a new set with elements common to the set and all others.""" + new_set = CaseAndSpaceInsensitiveSet() + for value in self: + if all(value in other for other in others): + new_set.add(value) + return new_set + + def difference(self, *others): + """Return a new set with elements in the set that are not in the others.""" + new_set = self.copy() + new_set.difference_update(*others) + return new_set + + def union(self, *others): + """Return a new set with elements from the set and all others.""" + new_set = self.copy() + new_set.update(*others) + return new_set + def get_dimensions_from_where_clause(mdx: str) -> List[str]: mdx = mdx.replace(" ", "").upper() @@ -1365,11 +1723,13 @@ def read_object_name_from_url(url: str, pattern: str) -> str: return unquote(match.group(1)) + def utc_localize_time(timestamp): timestamp = pytz.utc.localize(timestamp) timestamp_utc = timestamp.astimezone(pytz.utc) return timestamp_utc + class HTTPAdapterWithSocketOptions(HTTPAdapter): def __init__(self, *args, **kwargs): self.socket_options = kwargs.pop("socket_options", None) diff --git a/Tests/CaseAndSpaceInsensitiveDict_test.py b/Tests/CaseAndSpaceInsensitiveDict_test.py new file mode 100644 index 00000000..f08a9702 --- /dev/null +++ b/Tests/CaseAndSpaceInsensitiveDict_test.py @@ -0,0 +1,150 @@ +import unittest +from TM1py.Utils.Utils import CaseAndSpaceInsensitiveDict + + +class TestCaseAndSpaceInsensitiveDict(unittest.TestCase): + + def setUp(self): + self.map = CaseAndSpaceInsensitiveDict() + self.map["key1"] = "value1" + self.map["key2"] = "value2" + self.map["key3"] = "value3" + + def tearDown(self): + del self.map + + def test_set_item(self): + # Test setting new items with case and space variations + self.map["key4"] = "value4" + self.assertEqual(self.map["KEY4"], "value4") + self.assertEqual(self.map["key4"], "value4") + self.assertEqual(self.map["K e Y 4"], "value4") + + def test_get_item(self): + # Test getting items with case and space variations + self.assertEqual(self.map["KEY1"], "value1") + self.assertEqual(self.map["key2"], "value2") + self.assertEqual(self.map["K e Y 3"], "value3") + + def test_get_with_default(self): + # Test getting an item with a default value for non-existing key + self.assertEqual(self.map.get("nonexistent", "default"), "default") + self.assertEqual(self.map.get("KEY1", "default"), "value1") + + def test_delete_item(self): + # Delete items with case and space insensitivity + del self.map["KEY1"] + del self.map["key2"] + del self.map["K e Y 3"] + + # Confirm deletion + self.assertNotIn("key1", self.map) + self.assertNotIn("key2", self.map) + self.assertNotIn("key3", self.map) + + def test_iter(self): + # Ensure iteration maintains insertion order + for key1, key2 in zip(self.map, ("key1", "key2", "key3")): + self.assertEqual(key1, key2) + + def test_len(self): + # Verify length + self.assertEqual(len(self.map), 3) + self.map["key4"] = "value4" + self.assertEqual(len(self.map), 4) + + def test_copy(self): + # Verify copy creates a new instance with the same data + copy_map = self.map.copy() + self.assertIsNot(copy_map, self.map) + self.assertEqual(copy_map, self.map) + + def test_equality(self): + # Exact match + other_map = CaseAndSpaceInsensitiveDict() + other_map["key1"] = "value1" + other_map["key2"] = "value2" + other_map["key3"] = "value3" + self.assertEqual(self.map, other_map) + + def test_equality_case_and_space_insensitive(self): + # Case and space insensitive match + other_map = CaseAndSpaceInsensitiveDict() + other_map["key1"] = "value1" + other_map["KEY2"] = "value2" + other_map["K e Y 3"] = "value3" + self.assertEqual(self.map, other_map) + + def test_inequality(self): + # Mismatched values or additional keys should cause inequality + other_map = CaseAndSpaceInsensitiveDict({"key 1": "wrong", "key 2": "value2", "key3": "value3"}) + self.assertNotEqual(self.map, other_map) + + other_map = CaseAndSpaceInsensitiveDict({"key1": "value1", "key 2": "wrong", "key3": "value3"}) + self.assertNotEqual(self.map, other_map) + + other_map = CaseAndSpaceInsensitiveDict({"key1": "value1", "key2": "value2", "key4": "value4"}) + self.assertNotEqual(self.map, other_map) + + def test_update(self): + # Test updating existing and new keys + update_map = {"KEY1": "new_value1", "new_key": "new_value"} + self.map.update(update_map) + + self.assertEqual(self.map["key1"], "new_value1") + self.assertEqual(self.map["new_key"], "new_value") + self.assertEqual(len(self.map), 4) + + def test_setdefault(self): + # Existing key should return its value + self.assertEqual(self.map.setdefault("key1", "default"), "value1") + + # New key should set and return the default value + self.assertEqual(self.map.setdefault("new_key", "default"), "default") + self.assertEqual(self.map["new_key"], "default") + + def test_keys(self): + # Check keys() method for case and insertion order preservation + expected_keys = ["key1", "key2", "key3"] + self.assertEqual(list(self.map.keys()), expected_keys) + + def test_values(self): + # Check values() method for correct values in insertion order + expected_values = ["value1", "value2", "value3"] + self.assertEqual(list(self.map.values()), expected_values) + + def test_items(self): + # Check items() method for correct key-value pairs in insertion order + expected_items = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] + self.assertEqual(list(self.map.items()), expected_items) + + def test_adjusted_keys(self): + # Test adjusted_keys() to ensure all keys are lowercased and spaceless + expected_adjusted_keys = ["key1", "key2", "key3"] + self.assertEqual(list(self.map.adjusted_keys()), expected_adjusted_keys) + + def test_adjusted_items(self): + # Test adjusted_items() for lowercased, spaceless keys + expected_adjusted_items = { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } + adjusted_items = dict(self.map.adjusted_items()) + self.assertEqual(adjusted_items, expected_adjusted_items) + + def test_contains(self): + # Test in operator with case and space insensitivity + self.assertIn("key1", self.map) + self.assertIn("KEY2", self.map) + self.assertIn("k e y 3", self.map) + self.assertNotIn("nonexistent_key", self.map) + + def test_keyerror_on_nonexistent_key(self): + # Confirm that accessing a nonexistent key raises KeyError + with self.assertRaises(KeyError): + _ = self.map["nonexistent_key"] + + +if __name__ == '__main__': + unittest.main() diff --git a/Tests/CaseAndSpaceInsensitiveSet_test.py b/Tests/CaseAndSpaceInsensitiveSet_test.py new file mode 100644 index 00000000..426838b6 --- /dev/null +++ b/Tests/CaseAndSpaceInsensitiveSet_test.py @@ -0,0 +1,163 @@ +import unittest +from TM1py.Utils.Utils import CaseAndSpaceInsensitiveSet + + +class TestCaseAndSpaceInsensitiveSet(unittest.TestCase): + + def setUp(self): + self.original_values = ("Value1", "Value 2", "V A L U E 3") + self.set = CaseAndSpaceInsensitiveSet() + for value in self.original_values: + self.set.add(value) + + def tearDown(self): + del self.set + + def test_contains(self): + # Case and space insensitivity + self.assertIn("Value1", self.set) + self.assertIn("VALUE1", self.set) + self.assertIn("V ALUE 1", self.set) + self.assertIn("Value 2", self.set) + self.assertIn("V A L UE 2", self.set) + self.assertIn("VALUE3", self.set) + self.assertIn("V A LUE 3", self.set) + + # Non-existent values + self.assertNotIn("Value", self.set) + self.assertNotIn("VALUE4", self.set) + self.assertNotIn("VA LUE 4", self.set) + + def test_del(self): + # Remove elements with del + del self.set["VALUE1"] + del self.set["value 2"] + del self.set["V a L u E 3"] + # Verify the set is empty + self.assertFalse(self.set) + + def test_discard(self): + # Discard elements and check if set is empty + self.set.discard("Value1") + self.set.discard("Value 2") + self.set.discard("Value3") + self.assertFalse(self.set) + + def test_discard_case_and_space_insensitivity(self): + # Discard with case and space insensitivity + self.set.discard("VAL UE 1") + self.set.discard("Value 2") + self.set.discard("VA LUE3") + self.assertFalse(self.set) + + def test_len(self): + # Check length after addition and removal + self.assertEqual(len(self.set), 3) + self.set.add("Value4") + self.assertEqual(len(self.set), 4) + self.set.discard("VALUE4") + self.assertEqual(len(self.set), 3) + + def test_iter(self): + # Ensure set iteration matches the original values + self.assertEqual(len(self.set), len(self.original_values)) + for value in self.set: + self.assertIn(value, self.original_values) + + def test_add(self): + # Test adding a new element with various case/space combinations + self.set.add("Value4") + self.assertIn("Value4", self.set) + self.assertIn("VALUE4", self.set) + self.assertIn(" VALUE4", self.set) + self.assertIn("VALUE4 ", self.set) + self.assertIn("V ALUE 4", self.set) + self.assertIn("Va L UE4", self.set) + self.assertIn(" VAlue4", self.set) + + def test_copy(self): + # Verify copy creates a new instance with identical elements + copy_set = self.set.copy() + self.assertIsNot(copy_set, self.set) + self.assertEqual(copy_set, self.set) + + def test_eq(self): + # Equality with exact and case-insensitive values + new_set = CaseAndSpaceInsensitiveSet(self.original_values) + self.assertEqual(self.set, new_set) + + # Case and space-insensitive match + new_set = CaseAndSpaceInsensitiveSet(value.upper() for value in self.original_values) + self.assertEqual(self.set, new_set) + + def test_eq_against_builtin_set(self): + # Equality check against Python's built-in set + new_set = set(self.original_values) + self.assertEqual(self.set, new_set) + + def test_ne(self): + # Inequality with completely different values + new_set = CaseAndSpaceInsensitiveSet(["wrong1", "wrong2", "wrong3"]) + self.assertNotEqual(self.set, new_set) + + def test_clear(self): + # Clear the set and verify it's empty + self.set.clear() + self.assertEqual(len(self.set), 0) + self.assertFalse(self.set) + + def test_pop(self): + # Pop elements and confirm they exist in the original values + popped_value = self.set.pop() + self.assertIn(popped_value, self.original_values) + self.assertEqual(len(self.set), 2) + + def test_update(self): + # Update set with additional elements + self.set.update(["NewValue", "Value1"]) # Value1 already exists + self.assertIn("NewValue", self.set) + self.assertEqual(len(self.set), 4) + + def test_union(self): + # Union with another set + new_set = CaseAndSpaceInsensitiveSet(["ExtraValue", "VALUE1"]) + result_set = self.set.union(new_set) + self.assertIn("ExtraValue", result_set) + self.assertEqual(len(result_set), 4) + + def test_intersection(self): + # Intersection with a set containing common elements + new_set = CaseAndSpaceInsensitiveSet(["VALUE1", "UnknownValue"]) + result_set = self.set.intersection(new_set) + self.assertIn("Value1", result_set) + self.assertEqual(len(result_set), 1) + + def test_difference(self): + # Difference with another set + new_set = CaseAndSpaceInsensitiveSet(["Value1", "ExtraValue"]) + result_set = self.set.difference(new_set) + self.assertNotIn("Value1", result_set) + self.assertIn("Value 2", result_set) + self.assertEqual(len(result_set), 2) + + def test_subset_and_superset_operations(self): + # Test subset, superset, and proper subset/superset relationships + new_set = CaseAndSpaceInsensitiveSet(["Value1"]) + self.assertTrue(new_set < self.set) # Proper subset + self.assertTrue(new_set <= self.set) # Subset + self.assertTrue(self.set > new_set) # Proper superset + self.assertTrue(self.set >= new_set) # Superset + self.assertFalse(new_set > self.set) # Not a superset + self.assertFalse(new_set == self.set) # Not equal + + def test_disjoint(self): + # Check for disjoint sets + disjoint_set = CaseAndSpaceInsensitiveSet(["OtherValue"]) + self.assertTrue(self.set.isdisjoint(disjoint_set)) + + overlapping_set = CaseAndSpaceInsensitiveSet(["VALUE1", "NewValue"]) + self.assertFalse(self.set.isdisjoint(overlapping_set)) + + +if __name__ == '__main__': + unittest.main() diff --git a/Tests/CaseAndSpaceInsensitiveTuplesDict_test.py b/Tests/CaseAndSpaceInsensitiveTuplesDict_test.py new file mode 100644 index 00000000..4c27456a --- /dev/null +++ b/Tests/CaseAndSpaceInsensitiveTuplesDict_test.py @@ -0,0 +1,172 @@ +import unittest +from TM1py.Utils.Utils import CaseAndSpaceInsensitiveTuplesDict + + +class TestCaseAndSpaceInsensitiveTuplesDict(unittest.TestCase): + + def setUp(self): + self.map = CaseAndSpaceInsensitiveTuplesDict() + self.map[("Elem1", "Elem1")] = "Value1" + self.map[("Elem1", "Elem2")] = 2 + self.map[("Elem1", "Elem3")] = 3 + + def tearDown(self): + del self.map + + def test_delete_item(self): + # Verify items exist + self.assertIn(("Elem1", "Elem1"), self.map) + self.assertIn(("Elem1", "Elem2"), self.map) + self.assertIn(("Elem1", "Elem3"), self.map) + + # Delete items with case and space insensitivity + del self.map[("El em1", "ELEM1")] + del self.map[("El em1", "E L E M 2")] + del self.map[("El em1", " eLEM3")] + + # Confirm deletion + self.assertNotIn(("Elem1", "Elem1"), self.map) + self.assertNotIn(("Elem1", "Elem2"), self.map) + self.assertNotIn(("Elem1", "Elem3"), self.map) + + def test_equality(self): + # Exact match + other_map = CaseAndSpaceInsensitiveTuplesDict({ + ("Elem1", "Elem1"): "Value1", + ("Elem1", "Elem2"): 2, + ("Elem1", "Elem3"): 3 + }) + self.assertEqual(other_map, self.map) + + # Case and space-insensitive match + other_map = CaseAndSpaceInsensitiveTuplesDict({ + ("Elem 1", "Elem1"): "Value1", + ("ELEM 1", "E L E M 2"): 2, + (" Elem1 ", "Elem 3"): 3 + }) + self.assertEqual(other_map, self.map) + + def test_inequality(self): + # Different value + other_map = CaseAndSpaceInsensitiveTuplesDict({ + ("Elem1", "Elem1"): "Value1", + ("Elem1", "Elem2"): 0, + ("Elem1", "Elem3"): 3 + }) + self.assertNotEqual(other_map, self.map) + + # Partially matching keys with incorrect values + other_map = CaseAndSpaceInsensitiveTuplesDict({ + ("Elem 1", "Elem1"): "Value1", + ("ELEM 1", "E L E M 2"): "wrong", + (" Elem1 ", "Elem 3"): 3 + }) + self.assertNotEqual(other_map, self.map) + + # Completely different key + other_map = CaseAndSpaceInsensitiveTuplesDict({ + ("wrong", "Elem1"): "Value1", + ("Elem1", "Elem2"): 2, + ("Elem1", "Elem3"): 3 + }) + self.assertNotEqual(other_map, self.map) + + def test_get_item(self): + # Retrieve with case and space insensitivity + self.assertEqual(self.map[("ELEM1", "ELEM1")], "Value1") + self.assertEqual(self.map[("elem1", "e l e m 2")], 2) + self.assertEqual(self.map[("e l e M 1", "elem3")], 3) + + def test_iterate_keys(self): + # Ensure iteration maintains insertion order + expected_keys = [("Elem1", "Elem1"), ("Elem1", "Elem2"), ("Elem1", "Elem3")] + for actual_key, expected_key in zip(self.map, expected_keys): + self.assertEqual(actual_key, expected_key) + + def test_length(self): + # Check length + self.assertEqual(len(self.map), 3) + + def test_set_item(self): + # Test setting a new item and overriding an existing item + self.map[("E L E M 1", "E L E M 2")] = 3 + self.assertEqual(self.map[("Elem1", "Elem2")], 3) + + # Add a new entry and check + self.map[("Elem4", "Elem5")] = 5 + self.assertEqual(len(self.map), 4) + self.assertEqual(self.map[("Elem4", "Elem5")], 5) + + def test_copy(self): + # Verify that a copy has the same contents but is a different instance + copy_map = self.map.copy() + self.assertIsNot(copy_map, self.map) + self.assertEqual(copy_map, self.map) + + def test_adjusted_keys(self): + # Test that adjusted keys return as expected (all keys lowercased and spaceless) + adjusted_keys = list(self.map.adjusted_keys()) + expected_keys = [("elem1", "elem1"), ("elem1", "elem2"), ("elem1", "elem3")] + self.assertEqual(adjusted_keys, expected_keys) + + def test_adjusted_items(self): + # Test adjusted items + adjusted_items = dict(self.map.adjusted_items()) + expected_items = { + ("elem1", "elem1"): "Value1", + ("elem1", "elem2"): 2, + ("elem1", "elem3"): 3 + } + self.assertEqual(adjusted_items, expected_items) + + def test_update(self): + # Test updating with new values + update_map = { + ("Elem1", "Elem2"): "Updated", + ("Elem1", "NewElem"): 10 + } + self.map.update(update_map) + + # Check that updates are applied + self.assertEqual(self.map[("Elem1", "Elem2")], "Updated") + self.assertEqual(self.map[("Elem1", "NewElem")], 10) + self.assertEqual(len(self.map), 4) + + def test_setdefault(self): + # Existing key with setdefault should return the existing value + self.assertEqual(self.map.setdefault(("Elem1", "Elem2"), "NewValue"), 2) + + # New key should add the value and return the default + self.assertEqual(self.map.setdefault(("Elem1", "NewElem"), 10), 10) + self.assertEqual(self.map[("Elem1", "NewElem")], 10) + + def test_keyerror_on_nonexistent_key(self): + # Test that a KeyError is raised when accessing a non-existent key + with self.assertRaises(KeyError): + _ = self.map[("NonExistent", "Key")] + + def test_contains(self): + # Test that keys are found with case and space insensitivity + self.assertIn(("Elem1", "Elem1"), self.map) + self.assertIn(("elem1", "elem2"), self.map) + self.assertIn((" e l e m 1 ", " elem 3 "), self.map) + self.assertNotIn(("NonExistent", "Key"), self.map) + + def test_keys_method(self): + # Test that keys() returns all keys in original case and insertion order + expected_keys = [("Elem1", "Elem1"), ("Elem1", "Elem2"), ("Elem1", "Elem3")] + self.assertEqual(list(self.map.keys()), expected_keys) + + def test_values_method(self): + # Test that values() returns all values in insertion order + expected_values = ["Value1", 2, 3] + self.assertEqual(list(self.map.values()), expected_values) + + def test_items_method(self): + # Test that items() returns all key-value pairs in original case and insertion order + expected_items = [(("Elem1", "Elem1"), "Value1"), (("Elem1", "Elem2"), 2), (("Elem1", "Elem3"), 3)] + self.assertEqual(list(self.map.items()), expected_items) + + +if __name__ == '__main__': + unittest.main() diff --git a/Tests/HierarchyService_test.py b/Tests/HierarchyService_test.py index b337169d..acf21f34 100644 --- a/Tests/HierarchyService_test.py +++ b/Tests/HierarchyService_test.py @@ -506,7 +506,7 @@ def test_update_or_create_hierarchy_from_dataframe(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -533,7 +533,7 @@ def test_update_or_create_hierarchy_from_dataframe_non_standard_level_order(self df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -555,7 +555,7 @@ def test_update_or_create_hierarchy_from_dataframe_existing_attributes(self): hierarchy_name=self.alternative_region_dimension_name, df=df, element_type_column="ElementType", - unwind=True + unwind_all=True ) columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n", "level001", @@ -571,7 +571,7 @@ def test_update_or_create_hierarchy_from_dataframe_existing_attributes(self): hierarchy_name=self.region_dimension_name, df=df, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( dimension_name=self.alternative_region_dimension_name, @@ -592,7 +592,7 @@ def test_update_or_create_hierarchy_from_dataframe_update_attributes(self): hierarchy_name=self.alternative_region_dimension_name, df=df, element_type_column="ElementType", - unwind=True + unwind_all=True ) columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:s", "level001", @@ -609,7 +609,7 @@ def test_update_or_create_hierarchy_from_dataframe_update_attributes(self): df=df, element_type_column="ElementType", update_attribute_types=True, - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( dimension_name=self.alternative_region_dimension_name, @@ -630,7 +630,7 @@ def test_update_or_create_hierarchy_from_dataframe_existing_attributes_on_hierar hierarchy_name=self.region_dimension_name, df=df, element_type_column="ElementType", - unwind=True + unwind_all=True ) columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n", "level001", @@ -646,7 +646,7 @@ def test_update_or_create_hierarchy_from_dataframe_existing_attributes_on_hierar hierarchy_name=self.alternative_region_dimension_name, df=df, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( dimension_name=self.alternative_region_dimension_name, @@ -669,7 +669,7 @@ def test_update_or_create_hierarchy_from_dataframe_no_attributes(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -693,7 +693,7 @@ def test_update_or_create_hierarchy_from_dataframe_with_attributes_without_suffi hierarchy_name=self.region_dimension_name, df=df, element_column=self.region_dimension_name, - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get(self.region_dimension_name, self.region_dimension_name) @@ -712,7 +712,7 @@ def test_update_or_create_hierarchy_from_dataframe_on_preexisting_hierarchy(self hierarchy_name=self.region_dimension_name, df=df, element_column=self.region_dimension_name, - unwind=True + unwind_all=True ) columns = [self.region_dimension_name, "Currency", "population"] @@ -727,7 +727,7 @@ def test_update_or_create_hierarchy_from_dataframe_on_preexisting_hierarchy(self hierarchy_name=self.region_dimension_name, df=df, element_column=self.region_dimension_name, - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get(self.region_dimension_name, self.region_dimension_name) @@ -749,7 +749,7 @@ def test_update_or_create_hierarchy_from_dataframe_on_preexisting_hierarchy_with df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get(self.region_dimension_name, self.region_dimension_name) # Assert that edges exist @@ -767,7 +767,7 @@ def test_update_or_create_hierarchy_from_dataframe_on_preexisting_hierarchy_with hierarchy_name=self.region_dimension_name, df=df, element_column=self.region_dimension_name, - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get(self.region_dimension_name, self.region_dimension_name) @@ -794,7 +794,7 @@ def test_update_or_create_hierarchy_from_dataframe_with_consolidations(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -817,7 +817,7 @@ def test_update_or_create_hierarchy_from_dataframe_with_no_element_type(self): hierarchy_name=self.region_dimension_name, df=df, element_column="Element Name", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -839,7 +839,7 @@ def test_update_or_create_hierarchy_from_dataframe_without_levels(self): hierarchy_name=self.region_dimension_name, df=df, element_column="Element Name", - unwind=True) + unwind_all=True) hierarchy = self.tm1.hierarchies.get( dimension_name=self.region_dimension_name, @@ -860,7 +860,7 @@ def test_update_or_create_hierarchy_from_dataframe_imbalanced(self): dimension_name=self.region_dimension_name, hierarchy_name=self.region_dimension_name, df=df, - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -892,7 +892,7 @@ def test_update_or_create_hierarchy_from_dataframe_inconsistent_records(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True) + unwind_all=True) def test_update_or_create_hierarchy_from_dataframe_duplicate_records(self): columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n", "level001", @@ -911,7 +911,7 @@ def test_update_or_create_hierarchy_from_dataframe_duplicate_records(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True) + unwind_all=True) hierarchy = self.tm1.hierarchies.get( dimension_name=self.region_dimension_name, @@ -937,7 +937,7 @@ def test_update_or_create_hierarchy_from_dataframe_multi_parents(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True) + unwind_all=True) hierarchy = self.tm1.hierarchies.get( dimension_name=self.region_dimension_name, @@ -964,7 +964,7 @@ def test_update_or_create_hierarchy_from_dataframe_invalid_alias(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True) + unwind_all=True) def test_update_or_create_hierarchy_from_dataframe_circular_reference(self): columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n", "level001", @@ -985,7 +985,7 @@ def test_update_or_create_hierarchy_from_dataframe_circular_reference(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) def test_update_or_create_hierarchy_from_dataframe_circular_references(self): @@ -1007,7 +1007,7 @@ def test_update_or_create_hierarchy_from_dataframe_circular_references(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) def test_update_or_create_hierarchy_from_dataframe_no_weight_columns(self): @@ -1026,7 +1026,7 @@ def test_update_or_create_hierarchy_from_dataframe_no_weight_columns(self): df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -1050,7 +1050,7 @@ def test_update_or_create_hierarchy_from_dataframe_new_dimension_alternate_hiera df=df, element_column=self.region_dimension_name, element_type_column="ElementType", - unwind=True + unwind_all=True ) hierarchy = self.tm1.hierarchies.get( @@ -1058,6 +1058,129 @@ def test_update_or_create_hierarchy_from_dataframe_new_dimension_alternate_hiera hierarchy_name=self.region_dimension_name) self._verify_region_dimension(hierarchy) + def test_update_or_create_hierarchy_from_dataframe_unwind_all_false(self): + columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n", "level001", + "level000", "level001_weight", "level000_weight"] + data = [ + ['France', "Numeric", "Frankreich", "EUR", 60_000_000, "Europe", "World", 1, 1], + ['Switzerland', 'Numeric', "Schweiz", "CHF", 9_000_000, "Europe", "World", 1, 1], + ['Germany', 'Numeric', "Deutschland", "EUR", 84_000_000, "Europe", "World", 1, 1], + ] + df = DataFrame(data=data, columns=columns) + + self.tm1.hierarchies.update_or_create_hierarchy_from_dataframe( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name, + df=df, + element_column=self.region_dimension_name, + element_type_column="ElementType", + unwind_all=False + ) + + hierarchy = self.tm1.hierarchies.get( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name) + self._verify_region_dimension(hierarchy) + + def test_update_or_create_hierarchy_from_dataframe_unwind_consolidations_single(self): + columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n", "level002", + "level001", "level000", "level002_weight", "level001_weight", "level000_weight"] + data = [ + ['France', "Numeric", "Frankreich", "EUR", 60_000_000, "", "Europe", "World", 0, 1, 1], + ['Switzerland', 'Numeric', "Schweiz", "CHF", 9_000_000, "DACH", "Europe", "World", 1, 1, 1], + ['Germany', 'Numeric', "Deutschland", "EUR", 84_000_000, "DACH", "Europe", "World", 1, 1, 1], + ] + df = DataFrame(data=data, columns=columns) + + self.tm1.hierarchies.update_or_create_hierarchy_from_dataframe( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name, + df=df, + unwind_all=True + ) + + + columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n"] + data = [ + ['France', "Numeric", "Frankreich", "EUR", 60_000_000], + ['Switzerland', 'Numeric', "Schweiz", "CHF", 9_000_000], + ['Germany', 'Numeric', "Deutschland", "EUR", 84_000_000], + ] + df = DataFrame(data=data, columns=columns) + + self.tm1.hierarchies.update_or_create_hierarchy_from_dataframe( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name, + df=df, + unwind_consolidations=["Dach"], + unwind_all=False + ) + + hierarchy = self.tm1.hierarchies.get( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name) + self._verify_region_attributes(hierarchy) + + self.assertEqual(1, hierarchy.edges["Europe", "France"]) + self.assertEqual(1, hierarchy.edges["Europe", "DACH"]) + self.assertEqual(1, hierarchy.edges["Europe", "DACH"]) + + self.assertNotIn(("DACH", "Germany"), hierarchy.edges) + self.assertNotIn(("DACH", "Switzerland"), hierarchy.edges) + + + def test_update_or_create_hierarchy_from_dataframe_unwind_consolidations_multi(self): + columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n", "level002", + "level001", "level000", "level002_weight", "level001_weight", "level000_weight"] + data = [ + ['France', "Numeric", "Frankreich", "EUR", 48_000_000, "South Europe", "Europe", "World", 1, 1, 1], + ['Spain', "Numeric", "Spanien", "EUR", 60_000_000, "South Europe", "Europe", "World", 1, 1, 1], + ['Switzerland', 'Numeric', "Schweiz", "CHF", 9_000_000, "North Europe", "Europe", "World", 1, 1, 1], + ['Germany', 'Numeric', "Deutschland", "EUR", 84_000_000, "North Europe", "Europe", "World", 1, 1, 1], + ['USA', 'Numeric', "Vereinigte Staaten", "USD", 346_000_000, "", "North America", "World", 0, 1, 1], + ] + df = DataFrame(data=data, columns=columns) + + self.tm1.hierarchies.update_or_create_hierarchy_from_dataframe( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name, + df=df, + unwind_all=True + ) + + + columns = [self.region_dimension_name, "ElementType", "Alias:a", "Currency:s", "population:n"] + data = [ + ['France', "Numeric", "Frankreich", "EUR", 60_000_000], + ['Switzerland', 'Numeric', "Schweiz", "CHF", 9_000_000], + ['Germany', 'Numeric', "Deutschland", "EUR", 84_000_000], + ] + df = DataFrame(data=data, columns=columns) + + self.tm1.hierarchies.update_or_create_hierarchy_from_dataframe( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name, + df=df, + unwind_consolidations=["South Europe", "North Europe"], + unwind_all=False + ) + + hierarchy = self.tm1.hierarchies.get( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name) + self._verify_region_attributes(hierarchy) + + self.assertEqual(1, hierarchy.edges["World", "Europe"]) + self.assertEqual(1, hierarchy.edges["Europe", "North Europe"]) + self.assertEqual(1, hierarchy.edges["Europe", "North Europe"]) + self.assertEqual(1, hierarchy.edges["World", "North America"]) + self.assertEqual(1, hierarchy.edges["North America", "USA"]) + + self.assertNotIn(("North Europe", "Germany"), hierarchy.edges) + self.assertNotIn(("North Europe", "Switzerland"), hierarchy.edges) + self.assertNotIn(("South Europe", "Spain"), hierarchy.edges) + self.assertNotIn(("South Europe", "France"), hierarchy.edges) + if __name__ == '__main__': unittest.main() diff --git a/Tests/TM1pyDict_test.py b/Tests/TM1pyDict_test.py deleted file mode 100644 index 67ad57d3..00000000 --- a/Tests/TM1pyDict_test.py +++ /dev/null @@ -1,335 +0,0 @@ -import configparser -from pathlib import Path -import unittest - -from TM1py.Services import TM1Service -from TM1py.Utils.Utils import CaseAndSpaceInsensitiveSet, CaseAndSpaceInsensitiveDict, CaseAndSpaceInsensitiveTuplesDict - - -class TestCaseAndSpaceInsensitiveDict(unittest.TestCase): - tm1: TM1Service - map: CaseAndSpaceInsensitiveDict - - @classmethod - def setUpClass(cls): - """ - Establishes a connection to TM1 and creates TM! objects to use across all tests - """ - - # Connection to TM1 - cls.config = configparser.ConfigParser() - cls.config.read(Path(__file__).parent.joinpath('config.ini')) - cls.tm1 = TM1Service(**cls.config['tm1srv01']) - - def setUp(self): - - self.map = CaseAndSpaceInsensitiveDict() - self.map["key1"] = "value1" - self.map["key2"] = "value2" - self.map["key3"] = "value3" - - def tearDown(self): - del self.map - - def test_set(self): - self.map["key4"] = "value4" - self.assertEqual(self.map["KEY4"], "value4") - self.assertEqual(self.map["key4"], "value4") - self.assertEqual(self.map["K e Y 4"], "value4") - - def test_get(self): - self.assertEqual(self.map["KEY1"], "value1") - self.assertEqual(self.map["key2"], "value2") - self.assertEqual(self.map["K e Y 3"], "value3") - - def test_del(self): - del self.map["KEY1"] - del self.map["key2"] - del self.map["K e Y 3"] - - def test_iter(self): - for key1, key2 in zip(self.map, ("key1", "key2", "key3")): - self.assertEqual(key1, key2) - - def test_len(self): - self.assertEqual(3, len(self.map)) - - def test_copy(self): - c = self.map.copy() - self.assertIsNot(c, self.map) - self.assertEqual(c, self.map) - - def test_eq(self): - _map = CaseAndSpaceInsensitiveDict() - _map["key1"] = "value1" - _map["key2"] = "value2" - _map["key3"] = "value3" - self.assertEqual(self.map, _map) - - def test_eq_case_and_space_insensitive(self): - _map = CaseAndSpaceInsensitiveDict() - _map["key1"] = "value1" - _map["KEY2"] = "value2" - _map["K e Y 3"] = "value3" - self.assertEqual(self.map, _map) - - def test_ne(self): - _map = CaseAndSpaceInsensitiveDict() - _map["key 1"] = "wrong" - _map["key 2"] = "value2" - _map["key3"] = "value3" - self.assertNotEqual(self.map, _map) - - _map = CaseAndSpaceInsensitiveDict() - _map["key1"] = "value1" - _map["key 2"] = "wrong" - _map["key3"] = "value3" - self.assertNotEqual(self.map, _map) - - _map = CaseAndSpaceInsensitiveDict() - _map["key1"] = "value1" - _map["key2"] = "value2" - _map["key4"] = "value4" - self.assertNotEqual(self.map, _map) - - -class TestCaseAndSpaceInsensitiveSet(unittest.TestCase): - set: CaseAndSpaceInsensitiveSet - - def setUp(self): - self.original_values = ("Value1", "Value 2", "V A L U E 3") - self.set = CaseAndSpaceInsensitiveSet() - self.set.add(self.original_values[0]) - self.set.add(self.original_values[1]) - self.set.add(self.original_values[2]) - - def tearDown(self): - del self.set - - def test_get(self): - self.assertIn("Value1", self.set) - self.assertIn("VALUE1", self.set) - self.assertIn("V ALUE 1", self.set) - self.assertIn("Value2", self.set) - self.assertIn("Value2", self.set) - self.assertIn("V A L UE2", self.set) - self.assertIn("Value3", self.set) - self.assertIn("VALUE3", self.set) - self.assertIn("V A LUE3", self.set) - - self.assertNotIn("Value", self.set) - self.assertNotIn("VALUE4", self.set) - self.assertNotIn("VA LUE 4", self.set) - - def test_del(self): - del self.set["VALUE1"] - del self.set["value2"] - del self.set["V a L u E 3"] - - def test_discard(self): - self.set.discard("Value1") - self.set.discard("Value2") - self.set.discard("Value3") - # test for empty-ness - self.assertFalse(self.set) - - def test_discard_case_and_space_insensitivity(self): - self.set.discard("VAL UE 1") - self.set.discard("Value2") - self.set.discard("VA LUE3") - # test for empty-ness - self.assertFalse(self.set) - - def test_len(self): - self.set.add("Value4") - self.assertEqual(4, len(self.set)) - self.set.discard("VALUE 4") - self.assertEqual(3, len(self.set)) - - def test_iter(self): - self.assertEqual(len(self.set), len(self.original_values)) - for value in self.set: - self.assertIn(value, self.original_values) - - def test_add(self): - self.set.add("Value4") - self.assertIn("Value4", self.set) - self.assertIn("VALUE4", self.set) - self.assertIn(" VALUE4", self.set) - self.assertIn("VALUE4 ", self.set) - self.assertIn("V ALUE 4", self.set) - self.assertIn("Va L UE4", self.set) - self.assertIn(" VAlue4", self.set) - - def test_copy(self): - c = self.set.copy() - self.assertIsNot(c, self.set) - self.assertEqual(c, self.set) - - def test_eq(self): - new_set = CaseAndSpaceInsensitiveSet() - new_set.add(self.original_values[0]) - new_set.add(self.original_values[1]) - new_set.add(self.original_values[2]) - self.assertEqual(self.set, new_set) - - def test_eq_case_and_space_sensitivity(self): - new_set = CaseAndSpaceInsensitiveSet() - new_set.add(self.original_values[0].upper()) - new_set.add(self.original_values[1].lower()) - new_set.add(self.original_values[2].lower()) - self.assertEqual(self.set, new_set) - - def test_eq_against_set(self): - new_set = set() - new_set.add(self.original_values[0]) - new_set.add(self.original_values[1]) - new_set.add(self.original_values[2]) - self.assertEqual(self.set, new_set) - - def test_eq_against_set_case_and_space_sensitivity(self): - new_set = set() - new_set.add(self.original_values[0].upper()) - new_set.add(self.original_values[1].lower()) - new_set.add(self.original_values[2].upper()) - self.assertEqual(self.set, new_set) - - def test_ne(self): - new_set = CaseAndSpaceInsensitiveSet() - new_set.add("wrong1") - new_set.add("wrong2") - new_set.add("wrong3") - self.assertNotEqual(self.set, new_set) - - def test_ne_against_set(self): - new_set = set() - new_set.add("wrong1") - new_set.add("wrong2") - new_set.add("wrong3") - self.assertNotEqual(self.set, new_set) - - -class TestCaseAndSpaceInsensitiveTuplesDict(unittest.TestCase): - tm1: TM1Service - map: CaseAndSpaceInsensitiveTuplesDict - - @classmethod - def setUpClass(cls): - """ - Establishes a connection to TM1 and creates TM! objects to use across all tests - """ - - # Connection to TM1 - config = configparser.ConfigParser() - config.read(Path(__file__).parent.joinpath('config.ini')) - cls.tm1 = TM1Service(**config['tm1srv01']) - - def setUp(self): - self.map = CaseAndSpaceInsensitiveTuplesDict() - self.map[("Elem1", "Elem1")] = "Value1" - self.map[("Elem1", "Elem2")] = 2 - self.map[("Elem1", "Elem3")] = 3 - - def tearDown(self): - del self.map - - def test_del(self): - self.assertIn(("Elem1", "Elem1"), self.map) - self.assertIn(("Elem1", "Elem2"), self.map) - self.assertIn(("Elem1", "Elem3"), self.map) - del self.map[("El em1", "ELEM1")] - del self.map[("El em1", "E L E M 2")] - del self.map[("El em1", " eLEM3")] - self.assertNotIn(("Elem1", "Elem1"), self.map) - self.assertNotIn(("Elem1", "Elem2"), self.map) - self.assertNotIn(("Elem1", "Elem3"), self.map) - - def test_eq(self): - _map = CaseAndSpaceInsensitiveTuplesDict() - _map[("Elem1", "Elem1")] = "Value1" - _map[("Elem1", "Elem2")] = 2 - _map[("Elem1", "Elem3")] = 3 - self.assertEqual(_map, self.map) - - _map = CaseAndSpaceInsensitiveTuplesDict() - _map[("Elem 1", "Elem1")] = "Value1" - _map[("ELEM 1", "E L E M 2")] = 2 - _map[(" Elem1 ", "Elem 3")] = 3 - self.assertEqual(_map, self.map) - - def test_ne(self): - _map = CaseAndSpaceInsensitiveTuplesDict() - _map[("Elem1", "Elem1")] = "Value1" - _map[("Elem1", "Elem2")] = 0 - _map[("Elem1", "Elem3")] = 3 - self.assertNotEqual(_map, self.map) - - _map = CaseAndSpaceInsensitiveTuplesDict() - _map[("Elem 1", "Elem1")] = "Value1" - _map[("ELEM 1", "E L E M 2")] = "wrong" - _map[(" Elem1 ", "Elem 3")] = 3 - self.assertNotEqual(_map, self.map) - - _map = CaseAndSpaceInsensitiveTuplesDict() - _map[("wrong", "Elem1")] = "Value1" - _map[("Elem1", "Elem2")] = 2 - _map[("Elem1", "Elem3")] = 3 - self.assertNotEqual(_map, self.map) - - def test_get(self): - self.assertEqual(self.map[("ELEM1", "ELEM1")], "Value1") - self.assertEqual(self.map[("elem1", "e l e m 2")], 2) - self.assertEqual(self.map[("e l e M 1", "elem3")], 3) - self.assertNotEqual(self.map[("e l e M 1", "elem3")], 2) - - def test_iter(self): - for tuple1, tuple2 in zip(self.map, [("Elem1", "Elem1"), ("Elem1", "Elem2"), ("Elem1", "Elem3")]): - self.assertEqual(tuple1, tuple2) - - def test_len(self): - self.assertEqual(len(self.map), 3) - - def test_set(self): - self.map[("E L E M 1", "E L E M 2")] = 3 - self.assertEqual(self.map[("Elem1", "Elem2")], 3) - - def test_copy(self): - c = self.map.copy() - self.assertIsNot(c, self.map) - self.assertEqual(c, self.map) - - def test_full(self): - mdx_rows = '[}Clients].Members' - mdx_columns = '[}Groups].Members' - cube_name = '[}ClientGroups]' - mdx = 'SELECT {} ON ROWS, {} ON COLUMNS FROM {}'.format(mdx_rows, mdx_columns, cube_name) - data = self.tm1.cubes.cells.execute_mdx(mdx) - - # Get - if self.tm1.version[0:2] == '10': - coordinates = ('[}Clients].[ad min]', '[}Groups].[ADM IN]') - else: - coordinates = ('[}Clients].[}Clients].[ad min]', '[}Groups].[}Groups].[ADM IN]') - self.assertIsNotNone(data[coordinates]) - - # Delete - if self.tm1.version[0:2] == '10': - coordinates = ('[}clients].[}clients].[admin]', '[}groups].[}groups].[admin]') - else: - coordinates = ('[}clients].[}clients].[admin]', '[}groups].[}groups].[admin]') - self.assertTrue(coordinates in data) - del data[coordinates] - self.assertFalse(coordinates in data) - - # Copy - data_cloned = data.copy() - self.assertTrue(data_cloned == data) - self.assertFalse(data_cloned is data) - - @classmethod - def tearDownClass(cls): - cls.tm1.logout() - - -if __name__ == '__main__': - unittest.main()