diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ae1ffbba..711116ee 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -9,4 +9,3 @@ repos: rev: 22.3.0 hooks: - id: black - language_version: python3.9 diff --git a/convokit/model/convoKitMeta.py b/convokit/model/convoKitMeta.py index 343782ed..299047ad 100644 --- a/convokit/model/convoKitMeta.py +++ b/convokit/model/convoKitMeta.py @@ -2,9 +2,11 @@ from collections.abc import MutableMapping except: from collections import MutableMapping +from numpy import isin from convokit.util import warn from .convoKitIndex import ConvoKitIndex import json +from typing import Union # See reference: https://stackoverflow.com/questions/7760916/correct-usage-of-a-getter-setter-for-dictionary-values @@ -14,12 +16,30 @@ class ConvoKitMeta(MutableMapping, dict): ConvoKitMeta is a dictlike object that stores the metadata attributes of a corpus component """ - def __init__(self, convokit_index, obj_type): + def __init__(self, owner, convokit_index, obj_type): + self.owner = owner # Corpus or CorpusComponent self.index: ConvoKitIndex = convokit_index self.obj_type = obj_type + self._get_storage().initialize_data_for_component("meta", self.storage_key) + + @property + def storage_key(self) -> str: + return f"{self.obj_type}_{self.owner.id}" + def __getitem__(self, item): - return dict.__getitem__(self, item) + return self._get_storage().get_data("meta", self.storage_key, item) + + def _get_storage(self): + # special case for Corpus meta since that's the only time owner is not a CorpusComponent + # since cannot directly import Corpus to check the type (circular import), as a proxy we + # check for the obj_type attribute which is common to all CorpusComponent but not + # present in Corpus + if not hasattr(self.owner, "obj_type"): + return self.owner.storage + # self.owner -> CorpusComponent + # self.owner.owner -> Corpus that owns the CorpusComponent (only Corpus has direct pointer to storage) + return 
self.owner.owner.storage @staticmethod def _check_type_and_update_index(index, obj_type, key, value): @@ -45,12 +65,12 @@ def __setitem__(self, key, value): if self.index.type_check: ConvoKitMeta._check_type_and_update_index(self.index, self.obj_type, key, value) - dict.__setitem__(self, key, value) + self._get_storage().update_data("meta", self.storage_key, key, value) def __delitem__(self, key): if self.obj_type == "corpus": - dict.__delitem__(self, key) self.index.del_from_index(self.obj_type, key) + self._get_storage().delete_data("meta", self.storage_key, key) else: if self.index.lock_metadata_deletion[self.obj_type]: warn( @@ -62,19 +82,36 @@ def __delitem__(self, key): ) ) else: - dict.__delitem__(self, key) + self._get_storage().delete_data("meta", self.storage_key, key) def __iter__(self): - return dict.__iter__(self) + return self._get_storage().get_data("meta", self.storage_key).__iter__() def __len__(self): - return dict.__len__(self) + return self._get_storage().get_data("meta", self.storage_key).__len__() def __contains__(self, x): - return dict.__contains__(self, x) + return self._get_storage().get_data("meta", self.storage_key).__contains__(x) + + def __repr__(self) -> str: + return "ConvoKitMeta(" + self.to_dict().__repr__() + ")" def to_dict(self): - return self.__dict__ + return dict(self._get_storage().get_data("meta", self.storage_key)) + + def reinitialize_from(self, other: Union["ConvoKitMeta", dict]): + """ + Reinitialize this ConvoKitMeta instance with the data from other + """ + if isinstance(other, ConvoKitMeta): + other = {k: v for k, v in other.to_dict().items()} + elif not isinstance(other, dict): + raise TypeError( + "ConvoKitMeta can only be reinitialized from a dict instance or another ConvoKitMeta" + ) + self._get_storage().initialize_data_for_component( + "meta", self.storage_key, overwrite=True, initial_value=other + ) _basic_types = {type(0), type(1.0), type("str"), type(True)} # cannot include lists or dicts diff --git 
a/convokit/model/corpus.py b/convokit/model/corpus.py index 84ae9fac..8f151ff7 100644 --- a/convokit/model/corpus.py +++ b/convokit/model/corpus.py @@ -8,6 +8,7 @@ import random from .convoKitMeta import ConvoKitMeta from .convoKitMatrix import ConvoKitMatrix +from .storageManager import StorageManager, MemStorageManager import shutil @@ -52,6 +53,7 @@ def __init__( exclude_speaker_meta: Optional[List[str]] = None, exclude_overall_meta: Optional[List[str]] = None, disable_type_check=True, + storage: Optional[StorageManager] = None, ): if filename is None: @@ -61,8 +63,19 @@ def __init__( else: self.corpus_dirpath = os.path.dirname(filename) + self.id = None + if filename is not None: + # automatically derive an ID from the file path + self.id = os.path.basename(os.path.normpath(filename)) + + # Setup storage + if storage is not None: + self.storage = storage + else: + self.storage = MemStorageManager() + self.meta_index = ConvoKitIndex(self) - self.meta = ConvoKitMeta(self.meta_index, "corpus") + self.meta = ConvoKitMeta(self, self.meta_index, "corpus") # private storage self._vector_matrices = dict() @@ -601,38 +614,60 @@ def filter_conversations_by(self, selector: Callable[[Conversation], bool]): } self.update_speakers_data() self.reinitialize_index() + + # clear all storage entries corresponding to filtered-out components + meta_ids = [] + for utt in self.iter_utterances(): + meta_ids.append(utt.meta.storage_key) + for convo in self.iter_conversations(): + meta_ids.append(convo.meta.storage_key) + for speaker in self.iter_speakers(): + meta_ids.append(speaker.meta.storage_key) + self.storage.purge_obsolete_entries( + self.get_utterance_ids(), self.get_conversation_ids(), self.get_speaker_ids(), meta_ids + ) + return self - def filter_utterances_by(self, selector: Callable[[Utterance], bool]): + @staticmethod + def filter_utterances(source_corpus: "Corpus", selector: Callable[[Utterance], bool]): """ - Returns a new corpus that includes only a subset of Utterances 
within this Corpus. This filtering provides no + Returns a new corpus that includes only a subset of Utterances from the source Corpus. This filtering provides no guarantees with regard to maintaining conversational integrity and should be used with care. - Vectors are not preserved. + Vectors are not preserved. The source corpus will be invalidated and will no longer be usable. + :param source_corpus: the Corpus to subset from :param selector: function for selecting which :return: a new Corpus with a subset of the Utterances """ - utts = list(self.iter_utterances(selector)) + utts = list(source_corpus.iter_utterances(selector)) new_corpus = Corpus(utterances=utts) for convo in new_corpus.iter_conversations(): - convo.meta.update(self.get_conversation(convo.id).meta) + convo.meta.update(source_corpus.get_conversation(convo.id).meta) + + # original Corpus is invalidated and no longer usable; clear all data from + # its now-orphaned StorageManager to avoid having duplicates in memory + source_corpus.storage.clear_all_data() + return new_corpus + @staticmethod def reindex_conversations( - self, + source_corpus: "Corpus", new_convo_roots: List[str], preserve_corpus_meta: bool = True, preserve_convo_meta: bool = True, verbose=True, ) -> "Corpus": """ - Generates a new Corpus from current Corpus with specified list of utterance ids to use as conversation ids. + Generates a new Corpus from source Corpus with specified list of utterance ids to use as conversation ids. The subtrees denoted by these utterance ids should be distinct and should not overlap, otherwise there may be unexpected behavior. - Vectors are not preserved. The original Corpus will be mutated. + Vectors are not preserved. The source Corpus will be invalidated and no longer usable. 
+ :param source_corpus: the Corpus containing the original data to select from :param new_convo_roots: List of utterance ids to use as conversation ids :param preserve_corpus_meta: set as True to copy original Corpus metadata to new Corpus :param preserve_convo_meta: set as True to copy original Conversation metadata to new Conversation metadata @@ -641,7 +676,7 @@ def reindex_conversations( :return: new Corpus with reindexed Conversations """ "" new_convo_roots = set(new_convo_roots) - for convo in self.iter_conversations(): + for convo in source_corpus.iter_conversations(): try: convo.initialize_tree_structure() except ValueError as e: @@ -652,7 +687,9 @@ def reindex_conversations( original_utt_to_convo_id = dict() for utt_id in new_convo_roots: - orig_convo = self.get_conversation(self.get_utterance(utt_id).conversation_id) + orig_convo = source_corpus.get_conversation( + source_corpus.get_utterance(utt_id).conversation_id + ) original_utt_to_convo_id[utt_id] = orig_convo.id try: subtree = orig_convo.get_subtree(utt_id) @@ -668,13 +705,13 @@ def reindex_conversations( new_corpus = Corpus(utterances=new_corpus_utts) if preserve_corpus_meta: - new_corpus.meta.update(self.meta) + new_corpus.meta.update(source_corpus.meta) if preserve_convo_meta: for convo in new_corpus.iter_conversations(): - convo.meta["original_convo_meta"] = self.get_conversation( + convo.meta["original_convo_meta"] = source_corpus.get_conversation( original_utt_to_convo_id[convo.id] - ).meta + ).meta.to_dict() convo.meta["original_convo_id"] = original_utt_to_convo_id[convo.id] if verbose: missing_convo_roots = list( @@ -684,6 +721,10 @@ def reindex_conversations( warn("Failed to find some of the specified new convo roots:\n") print(missing_convo_roots) + # original Corpus is invalidated and no longer usable; clear all data from + # its now-orphaned StorageManager to avoid having duplicates in memory + source_corpus.storage.clear_all_data() + return new_corpus def get_meta(self) -> Dict: @@ 
-905,54 +946,57 @@ def reinitialize_index(self): new_index.version = old_index.version self.meta_index = new_index - def merge(self, other_corpus, warnings: bool = True): + @staticmethod + def merge(primary: "Corpus", secondary: "Corpus", warnings: bool = True): """ - Merges this corpus with another corpus. + Merges two corpora (one primary and one secondary), creating a new Corpus with their combined data. - Utterances with the same id must share the same data, otherwise the other corpus utterance data & metadata + Utterances with the same id must share the same data. In case of conflicts, + the primary Corpus will take precedence and the conflicting Utterance from secondary will be ignored. A warning is printed when this happens. - If metadata of this corpus (or its conversations / utterances) shares a key with the metadata of the - other corpus, the other corpus's metadata (or its conversations / utterances) values will be used. A warning + If metadata of the primary Corpus (or its conversations / utterances) shares a key with the metadata of the + secondary Corpus, the secondary's metadata (or its conversations / utterances) values will be used. A warning is printed when this happens. - May mutate original and other corpus in the process. + Will invalidate primary and secondary in the process. - (Updates internal ConvoKit Index to match post-merge state and uses this Corpus's version number.) + The resulting Corpus will inherit the primary Corpus's id and version number. 
- :param other_corpus: Corpus + :param primary: the primary Corpus + :param secondary: the secondary Corpus :param warnings: print warnings when data conflicts are encountered :return: new Corpus constructed from combined lists of utterances """ - utts1 = list(self.iter_utterances()) - utts2 = list(other_corpus.iter_utterances()) - combined_utts = self._merge_utterances(utts1, utts2, warnings=warnings) + utts1 = list(primary.iter_utterances()) + utts2 = list(secondary.iter_utterances()) + combined_utts = primary._merge_utterances(utts1, utts2, warnings=warnings) new_corpus = Corpus(utterances=list(combined_utts)) # Note that we collect Speakers from the utt sets directly instead of the combined utts, otherwise # differences in Speaker meta will not be registered for duplicate Utterances (because utts would be discarded # during merging) - speakers_meta, speakers_meta_conflict = self._collect_speaker_data([utts1, utts2]) + speakers_meta, speakers_meta_conflict = primary._collect_speaker_data([utts1, utts2]) Corpus._update_corpus_speaker_data( new_corpus, speakers_meta, speakers_meta_conflict, warnings=warnings ) # Merge CORPUS metadata - new_corpus.meta = self.meta - for key, val in other_corpus.meta.items(): + new_corpus.meta.reinitialize_from(primary.meta) + for key, val in secondary.meta.items(): if key in new_corpus.meta and new_corpus.meta[key] != val: if warnings: warn( - "Found conflicting values for Corpus metadata key: {}. " - "Overwriting with other Corpus's metadata.".format(repr(key)) + "Found conflicting values for primary Corpus metadata key: {}. 
" + "Overwriting with secondary Corpus's metadata.".format(repr(key)) ) new_corpus.meta[key] = val # Merge CONVERSATION metadata - convos1 = self.iter_conversations() - convos2 = other_corpus.iter_conversations() + convos1 = primary.iter_conversations() + convos2 = secondary.iter_conversations() for convo in convos1: - new_corpus.get_conversation(convo.id).meta = convo.meta + new_corpus.get_conversation(convo.id).meta.reinitialize_from(convo.meta) for convo in convos2: for key, val in convo.meta.items(): @@ -961,7 +1005,7 @@ def merge(self, other_corpus, warnings: bool = True): if warnings: warn( "Found conflicting values for Conversation {} for metadata key: {}. " - "Overwriting with other corpus's Conversation metadata.".format( + "Overwriting with secondary corpus's Conversation metadata.".format( repr(convo.id), repr(key) ) ) @@ -970,6 +1014,12 @@ def merge(self, other_corpus, warnings: bool = True): new_corpus.update_speakers_data() new_corpus.reinitialize_index() + # source corpora are now invalidated and all needed data has been copied + # into the new merged corpus; clear the source corpora's storage to + # prevent having duplicates in memory + primary.storage.clear_all_data() + secondary.storage.clear_all_data() + return new_corpus def add_utterances(self, utterances=List[Utterance], warnings: bool = False, with_checks=True): @@ -988,40 +1038,57 @@ def add_utterances(self, utterances=List[Utterance], warnings: bool = False, wit :return: a Corpus with the utterances from this Corpus and the input utterances combined """ if with_checks: - helper_corpus = Corpus(utterances=utterances) - return self.merge(helper_corpus, warnings=warnings) - else: - new_speakers = {u.speaker.id: u.speaker for u in utterances} - new_utterances = {u.id: u for u in utterances} - for speaker in new_speakers.values(): - speaker.owner = self - for utt in new_utterances.values(): - utt.owner = self - - # update corpus speakers - for new_speaker_id, new_speaker in new_speakers.items(): 
- if new_speaker_id not in self.speakers: - self.speakers[new_speaker_id] = new_speaker - - # update corpus utterances + (link speaker -> utt) - for new_utt_id, new_utt in new_utterances.items(): - if new_utt_id not in self.utterances: - self.utterances[new_utt_id] = new_utt - self.speakers[new_utt.speaker.id]._add_utterance(new_utt) - - # update corpus conversations + (link convo <-> utt) - new_convos = defaultdict(list) - for utt in new_utterances.values(): - if utt.conversation_id in self.conversations: + # leverage the merge method's _merge_utterances method to run the checks + # (but then run a subsequent filtering operation since we aren't actually doing a merge) + added_utt_ids = {utt.id for utt in utterances} + combined_utts = self._merge_utterances( + list(self.iter_utterances()), utterances, warnings + ) + speakers_meta, speakers_meta_conflict = self._collect_speaker_data( + [list(self.iter_utterances()), utterances] + ) + utterances = [utt for utt in combined_utts if utt.id in added_utt_ids] + + new_speakers = {u.speaker.id: u.speaker for u in utterances} + new_utterances = {u.id: u for u in utterances} + for speaker in new_speakers.values(): + speaker.owner = self + for utt in new_utterances.values(): + utt.owner = self + + # update corpus speakers + for new_speaker_id, new_speaker in new_speakers.items(): + if new_speaker_id not in self.speakers: + self.speakers[new_speaker_id] = new_speaker + + # update corpus utterances + (link speaker -> utt) + for new_utt_id, new_utt in new_utterances.items(): + if new_utt_id not in self.utterances: + self.utterances[new_utt_id] = new_utt + self.speakers[new_utt.speaker.id]._add_utterance(new_utt) + + # update corpus conversations + (link convo <-> utt) + new_convos = defaultdict(list) + for utt in new_utterances.values(): + if utt.conversation_id in self.conversations: + if (not with_checks) or ( + utt.id not in self.conversations[utt.conversation_id].get_utterance_ids() + ): 
self.conversations[utt.conversation_id]._add_utterance(utt) - else: - new_convos[utt.conversation_id].append(utt.id) - for convo_id, convo_utts in new_convos.items(): - new_convo = Conversation(owner=self, id=convo_id, utterances=convo_utts, meta=None) - self.conversations[convo_id] = new_convo - # (link speaker -> convo) - new_convo_speaker = self.speakers[new_convo.get_utterance(convo_id).speaker.id] - new_convo_speaker._add_conversation(new_convo) + else: + new_convos[utt.conversation_id].append(utt.id) + for convo_id, convo_utts in new_convos.items(): + new_convo = Conversation(owner=self, id=convo_id, utterances=convo_utts, meta=None) + self.conversations[convo_id] = new_convo + # (link speaker -> convo) + new_convo_speaker = self.speakers[new_convo.get_utterance(convo_id).speaker.id] + new_convo_speaker._add_conversation(new_convo) + + # update speaker metadata (only in cases of conflict) + if with_checks: + Corpus._update_corpus_speaker_data( + self, speakers_meta, speakers_meta_conflict, warnings + ) return self diff --git a/convokit/model/corpusComponent.py b/convokit/model/corpusComponent.py index 78e70240..929fc24f 100644 --- a/convokit/model/corpusComponent.py +++ b/convokit/model/corpusComponent.py @@ -4,30 +4,79 @@ class CorpusComponent: - def __init__(self, obj_type: str, owner=None, id=None, vectors: List[str] = None, meta=None): + def __init__( + self, + obj_type: str, + owner=None, + id=None, + initial_data=None, + vectors: List[str] = None, + meta=None, + ): self.obj_type = obj_type # utterance, speaker, conversation self._owner = owner + self._id = id + self.vectors = vectors if vectors is not None else [] + + # if the CorpusComponent is initialized with an owner set up an entry + # in the owner's storage; if it is not initialized with an owner + # (i.e. 
it is a standalone object) set up a dict-based temp storage + if self.owner is None: + self._temp_storage = initial_data if initial_data is not None else {} + else: + self.owner.storage.initialize_data_for_component( + self.obj_type, + self._id, + initial_value=(initial_data if initial_data is not None else {}), + ) + if meta is None: meta = dict() self.meta = self.init_meta(meta) - self.id = id - self.vectors = vectors if vectors is not None else [] def get_owner(self): return self._owner def set_owner(self, owner): + if owner is self._owner: + # no action needed + return + # stash the metadata first since reassigning self._owner will break its storage connection + meta_vals = {k: v for k, v in self.meta.items()} + previous_owner = self._owner self._owner = owner if owner is not None: - self.meta = self.init_meta(self.meta) + # when a new owner Corpus is assigned, we must take the following steps: + # (1) transfer this component's data to the new owner's StorageManager + # (2) avoid duplicates by removing the data from the old owner (or temp storage if there was no prior owner) + # (3) reinitialize the metadata instance + data_dict = ( + dict(previous_owner.storage.get_data(self.obj_type, self.id)) + if previous_owner is not None + else self._temp_storage + ) + self.owner.storage.initialize_data_for_component( + self.obj_type, self.id, initial_value=data_dict + ) + if previous_owner is not None: + previous_owner.storage.delete_data(self.obj_type, self.id) + previous_owner.storage.delete_data("meta", self.meta.storage_key) + else: + del self._temp_storage + self.meta = self.init_meta(meta_vals) owner = property(get_owner, set_owner) def init_meta(self, meta): if self._owner is None: + # ConvoKitMeta instances are not allowed for ownerless (standalone) + # components since they must be backed by a StorageManager. 
In this + # case we must forcibly convert the ConvoKitMeta instance to dict + if type(meta) == ConvoKitMeta: + meta = meta.to_dict() return meta else: - ck_meta = ConvoKitMeta(self.owner.meta_index, self.obj_type) + ck_meta = ConvoKitMeta(self, self.owner.meta_index, self.obj_type) for key, value in meta.items(): ck_meta[key] = value return ck_meta @@ -48,6 +97,17 @@ def set_id(self, value): id = property(get_id, set_id) + def get_data(self, property_name): + if self._owner is None: + return self._temp_storage[property_name] + return self.owner.storage.get_data(self.obj_type, self.id, property_name) + + def set_data(self, property_name, value): + if self._owner is None: + self._temp_storage[property_name] = value + else: + self.owner.storage.update_data(self.obj_type, self.id, property_name, value) + # def __eq__(self, other): # if type(self) != type(other): return False # # do not compare 'utterances' and 'conversations' in Speaker.__dict__. recursion loop will occur. @@ -135,6 +195,13 @@ def delete_vector(self, vector_name: str): """ self.vectors.remove(vector_name) + def to_dict(self): + return { + "id": self.id, + "vectors": self.vectors, + "meta": self.meta if type(self.meta) == dict else self.meta.to_dict(), + } + def __str__(self): return "{}(id: {}, vectors: {}, meta: {})".format( self.obj_type.capitalize(), self.id, self.vectors, self.meta diff --git a/convokit/model/corpusUtil.py b/convokit/model/corpusUtil.py index e33bab46..99e36a19 100644 --- a/convokit/model/corpusUtil.py +++ b/convokit/model/corpusUtil.py @@ -19,7 +19,7 @@ def get_utterances_dataframe(obj, selector=lambda utt: True, exclude_meta: bool """ ds = dict() for utt in obj.iter_utterances(selector): - d = utt.__dict__.copy() + d = utt.to_dict().copy() if not exclude_meta: for k, v in d["meta"].items(): d["meta." 
+ k] = v @@ -27,9 +27,7 @@ def get_utterances_dataframe(obj, selector=lambda utt: True, exclude_meta: bool ds[utt.id] = d df = pd.DataFrame(ds).T - df["id"] = df["_id"] df = df.set_index("id") - df = df.drop(["_id", "_owner", "obj_type", "user", "_root"], axis=1) df["speaker"] = df["speaker"].map(lambda spkr: spkr.id) meta_columns = [k for k in df.columns if k.startswith("meta.")] return df[ @@ -50,7 +48,7 @@ def get_conversations_dataframe(obj, selector=lambda convo: True, exclude_meta: """ ds = dict() for convo in obj.iter_conversations(selector): - d = convo.__dict__.copy() + d = convo.to_dict().copy() if not exclude_meta: for k, v in d["meta"].items(): d["meta." + k] = v @@ -58,9 +56,7 @@ def get_conversations_dataframe(obj, selector=lambda convo: True, exclude_meta: ds[convo.id] = d df = pd.DataFrame(ds).T - df["id"] = df["_id"] - df = df.set_index("id") - return df.drop(["_owner", "obj_type", "_utterance_ids", "_speaker_ids", "tree", "_id"], axis=1) + return df.set_index("id") def get_speakers_dataframe(obj, selector=lambda utt: True, exclude_meta: bool = False): @@ -75,7 +71,7 @@ def get_speakers_dataframe(obj, selector=lambda utt: True, exclude_meta: bool = """ ds = dict() for spkr in obj.iter_speakers(selector): - d = spkr.__dict__.copy() + d = spkr.to_dict().copy() if not exclude_meta: for k, v in d["meta"].items(): d["meta." 
+ k] = v @@ -83,6 +79,4 @@ def get_speakers_dataframe(obj, selector=lambda utt: True, exclude_meta: bool = ds[spkr.id] = d df = pd.DataFrame(ds).T - df["id"] = df["_id"] - df = df.set_index("id") - return df.drop(["_owner", "obj_type", "utterances", "conversations", "_id"], axis=1) + return df.set_index("id") diff --git a/convokit/model/speaker.py b/convokit/model/speaker.py index 09e27c20..a6392d49 100644 --- a/convokit/model/speaker.py +++ b/convokit/model/speaker.py @@ -176,3 +176,8 @@ def __eq__(self, other): return self.id == other.id except AttributeError: return self.__dict__["_name"] == other.__dict__["_name"] + + def __str__(self): + return "Speaker(id: {}, vectors: {}, meta: {})".format( + repr(self.id), self.vectors, self.meta + ) diff --git a/convokit/model/storageManager.py b/convokit/model/storageManager.py new file mode 100644 index 00000000..303ad617 --- /dev/null +++ b/convokit/model/storageManager.py @@ -0,0 +1,171 @@ +from typing import Optional +from abc import ABCMeta, abstractmethod + + +class StorageManager(metaclass=ABCMeta): + """ + Abstraction layer for the concrete representation of data and metadata + within corpus components (e.g., Utterance text and timestamps). All requests + to access or modify corpusComponent fields (with the exception of ID) are + actually routed through one of StorageManager's concrete subclasses. Each + subclass implements a storage backend that contains the actual data. 
+ """ + + def __init__(self): + # concrete data storage (i.e., collections) for each component type + # this will be assigned in subclasses + self.data = {"utterance": None, "conversation": None, "speaker": None, "meta": None} + + @abstractmethod + def get_collection_ids(self, component_type: str): + """ + Returns a list of all object IDs within the component_type collection + """ + return NotImplemented + + @abstractmethod + def has_data_for_component(self, component_type: str, component_id: str) -> bool: + """ + Check if there is an existing entry for the component of type component_type + with id component_id + """ + return NotImplemented + + @abstractmethod + def initialize_data_for_component( + self, component_type: str, component_id: str, overwrite: bool = False, initial_value=None + ): + """ + Create a blank entry for a component of type component_type with id + component_id. Will avoid overwriting any existing data unless the + overwrite parameter is set to True. + """ + return NotImplemented + + @abstractmethod + def get_data(self, component_type: str, component_id: str, property_name: Optional[str] = None): + """ + Retrieve the property data for the component of type component_type with + id component_id. If property_name is specified return only the data for + that property, otherwise return the dict containing all properties. + """ + return NotImplemented + + @abstractmethod + def update_data(self, component_type: str, component_id: str, property_name: str, new_value): + """ + Set or update the property data for the component of type component_type + with id component_id + """ + return NotImplemented + + @abstractmethod + def delete_data( + self, component_type: str, component_id: str, property_name: Optional[str] = None + ): + """ + Delete a data entry from this StorageManager for the component of type + component_type with id component_id. If property_name is specified + delete only that property, otherwise delete the entire entry. 
+ """ + return NotImplemented + + @abstractmethod + def clear_all_data(self): + """ + Erase all data from this StorageManager (i.e., reset self.data to its + initial empty state; Python will garbage-collect the now-unreferenced + old data entries). This is used for cleanup after destructive Corpus + operations. + """ + return NotImplemented + + def get_collection(self, component_type: str): + if component_type not in self.data: + raise ValueError( + 'component_type must be one of "utterance", "conversation", "speaker", or "meta".' + ) + return self.data[component_type] + + def purge_obsolete_entries(self, utterance_ids, conversation_ids, speaker_ids, meta_ids): + """ + Compare the entries in this StorageManager to the existing component ids + provided as parameters, and delete any entries that are not found in the + parameter ids. + """ + ref_ids = { + "utterance": set(utterance_ids), + "conversation": set(conversation_ids), + "speaker": set(speaker_ids), + "meta": set(meta_ids), + } + for obj_type in self.data: + for obj_id in self.get_collection_ids(obj_type): + if obj_id not in ref_ids[obj_type]: + self.delete_data(obj_type, obj_id) + + +class MemStorageManager(StorageManager): + """ + Concrete StorageManager implementation for in-memory data storage. + Collections are implemented as vanilla Python dicts. 
+ """ + + def __init__(self): + super().__init__() + + # initialize component collections as dicts + for key in self.data: + self.data[key] = {} + + def get_collection_ids(self, component_type: str): + return list(self.get_collection(component_type).keys()) + + def has_data_for_component(self, component_type: str, component_id: str) -> bool: + collection = self.get_collection(component_type) + return component_id in collection + + def initialize_data_for_component( + self, component_type: str, component_id: str, overwrite: bool = False, initial_value=None + ): + collection = self.get_collection(component_type) + if overwrite or not self.has_data_for_component(component_type, component_id): + collection[component_id] = initial_value if initial_value is not None else {} + + def get_data(self, component_type: str, component_id: str, property_name: Optional[str] = None): + collection = self.get_collection(component_type) + if component_id not in collection: + raise KeyError( + f"This StorageManager does not have an entry for the {component_type} with id {component_id}." + ) + if property_name is None: + return collection[component_id] + else: + return collection[component_id][property_name] + + def update_data(self, component_type: str, component_id: str, property_name: str, new_value): + collection = self.get_collection(component_type) + # don't create new collections if the ID is not found; this is supposed to be handled in the + # CorpusComponent constructor so if the ID is missing that indicates something is wrong + if component_id not in collection: + raise KeyError( + f"This StorageManager does not have an entry for the {component_type} with id {component_id}." 
+ ) + collection[component_id][property_name] = new_value + + def delete_data( + self, component_type: str, component_id: str, property_name: Optional[str] = None + ): + collection = self.get_collection(component_type) + if component_id not in collection: + raise KeyError( + f"This StorageManager does not have an entry for the {component_type} with id {component_id}." + ) + if property_name is None: + del collection[component_id] + else: + del collection[component_id][property_name] + + def clear_all_data(self): + for key in self.data: + self.data[key] = {} diff --git a/convokit/model/utterance.py b/convokit/model/utterance.py index c2898a04..a3a4edda 100644 --- a/convokit/model/utterance.py +++ b/convokit/model/utterance.py @@ -38,29 +38,83 @@ def __init__( text: str = "", meta: Optional[Dict] = None, ): - super().__init__(obj_type="utterance", owner=owner, id=id, meta=meta) - speaker_ = speaker if speaker is not None else user - self.speaker = speaker_ - if self.speaker is None: - raise ValueError("No Speaker found: Utterance must be initialized with a Speaker.") - self.user = speaker # for backwards compatbility - self.conversation_id = conversation_id if conversation_id is not None else root - if self.conversation_id is not None and not isinstance(self.conversation_id, str): + # check arguments that have alternate naming due to backwards compatibility + if speaker is None: + if user is not None: + speaker = user + else: + raise ValueError("No Speaker found: Utterance must be initialized with a Speaker.") + if conversation_id is None and root is not None: + conversation_id = root + + if conversation_id is not None and not isinstance(conversation_id, str): warn( "Utterance conversation_id must be a string: conversation_id of utterance with ID: {} " - "has been casted to a string.".format(self.id) + "has been casted to a string.".format(id) ) - self.conversation_id = str(self.conversation_id) - self._root = self.conversation_id - self.reply_to = reply_to - 
self.timestamp = timestamp # int(timestamp) if timestamp is not None else timestamp + conversation_id = str(conversation_id) if not isinstance(text, str): warn( "Utterance text must be a string: text of utterance with ID: {} " - "has been casted to a string.".format(self.id) + "has been casted to a string.".format(id) ) text = "" if text is None else str(text) - self.text = text + + props = { + "speaker_id": speaker.id, + "conversation_id": conversation_id, + "reply_to": reply_to, + "timestamp": timestamp, + "text": text, + } + super().__init__(obj_type="utterance", owner=owner, id=id, initial_data=props, meta=meta) + self.speaker_ = speaker + + ############################################################################ + ## directly-accessible class properties (roughly equivalent to keys in the + ## JSON, plus aliases for compatibility) + ############################################################################ + + def _get_speaker(self): + return self.speaker_ + + def _set_speaker(self, val): + self.speaker_ = val + self.set_data("speaker_id", self.speaker.id) + + speaker = property(_get_speaker, _set_speaker) + + def _get_conversation_id(self): + return self.get_data("conversation_id") + + def _set_conversation_id(self, val): + self.set_data("conversation_id", val) + + conversation_id = property(_get_conversation_id, _set_conversation_id) + + def _get_reply_to(self): + return self.get_data("reply_to") + + def _set_reply_to(self, val): + self.set_data("reply_to", val) + + reply_to = property(_get_reply_to, _set_reply_to) + + def _get_timestamp(self): + return self.get_data("timestamp") + + def _set_timestamp(self, val): + self.set_data("timestamp", val) + + timestamp = property(_get_timestamp, _set_timestamp) + + def _get_text(self): + return self.get_data("text") + + def _set_text(self, val): + self.set_data("text", val) + + text = property(_get_text, _set_text) def _get_root(self): deprecation("utterance.root", "utterance.conversation_id") @@ -73,6 +127,10 
@@ def _set_root(self, value: str): root = property(_get_root, _set_root) + ############################################################################ + ## end properties + ############################################################################ + def get_conversation(self): """ Get the Conversation (identified by Utterance.conversation_id) this Utterance belongs to @@ -90,6 +148,18 @@ def get_speaker(self): return self.speaker + def to_dict(self): + return { + "id": self.id, + "conversation_id": self.conversation_id, + "reply_to": self.reply_to, + "speaker": self.speaker, + "timestamp": self.timestamp, + "text": self.text, + "vectors": self.vectors, + "meta": self.meta if type(self.meta) == dict else self.meta.to_dict(), + } + def __hash__(self): return super().__hash__() diff --git a/convokit/tests/general/test_convo_traversal.py b/convokit/tests/general/test_convo_traversal.py index 29ade7d5..b96685cf 100644 --- a/convokit/tests/general/test_convo_traversal.py +++ b/convokit/tests/general/test_convo_traversal.py @@ -236,24 +236,31 @@ def test_one_utt_convo(self): self.assertEqual([utt.id for utt in convo.traverse("preorder")], ["other"]) def test_reindex_corpus(self): + self.setUp() # reinitialize corpus since reindex is destructive + original_convo_meta = { + k: v for k, v in self.corpus.get_conversation("0").meta.to_dict().items() + } + original_corpus_meta = {k: v for k, v in self.corpus.meta.to_dict().items()} new_convo_conversation_ids = ["1", "2", "3"] - new_corpus = self.corpus.reindex_conversations(new_convo_conversation_ids) + new_corpus = Corpus.reindex_conversations(self.corpus, new_convo_conversation_ids) # checking for correct number of conversations and utterances self.assertEqual(len(list(new_corpus.iter_conversations())), 3) self.assertEqual(len(list(new_corpus.iter_utterances())), 11) # checking that corpus and conversation metadata was preserved for convo in new_corpus.iter_conversations(): - self.assertEqual( - 
convo.meta["original_convo_meta"], self.corpus.get_conversation("0").meta - ) + self.assertEqual(convo.meta["original_convo_meta"], original_convo_meta) - self.assertEqual(self.corpus.meta, new_corpus.meta) + self.assertEqual(original_corpus_meta, new_corpus.meta) def test_reindex_corpus2(self): + self.setUp() # reinitialize corpus since reindex is destructive new_convo_conversation_ids = ["1", "2", "3"] - new_corpus = self.corpus.reindex_conversations( - new_convo_conversation_ids, preserve_convo_meta=False, preserve_corpus_meta=False + new_corpus = Corpus.reindex_conversations( + self.corpus, + new_convo_conversation_ids, + preserve_convo_meta=False, + preserve_corpus_meta=False, ) # checking for correct number of conversations and utterances self.assertEqual(len(list(new_corpus.iter_conversations())), 3) diff --git a/convokit/tests/general/test_merge_corpus.py b/convokit/tests/general/test_merge_corpus.py index 95725501..23b25c42 100644 --- a/convokit/tests/general/test_merge_corpus.py +++ b/convokit/tests/general/test_merge_corpus.py @@ -23,11 +23,22 @@ def test_no_overlap(self): ] ) - merged = corpus1.merge(corpus2) + all_utt_ids = set(corpus1.get_utterance_ids()) | set(corpus2.get_utterance_ids()) + all_speaker_ids = set(corpus1.get_speaker_ids()) | set(corpus2.get_speaker_ids()) + + merged = Corpus.merge(corpus1, corpus2) self.assertEqual(len(list(merged.iter_utterances())), 6) self.assertEqual(len(list(merged.iter_speakers())), 6) - self.assertEqual(len(list(corpus1.iter_utterances())), 3) - self.assertEqual(len(list(corpus2.iter_utterances())), 3) + + for utt_id in all_utt_ids: + self.assertTrue(utt_id in merged.storage.get_collection_ids("utterance")) + for speaker_id in all_speaker_ids: + self.assertTrue(speaker_id in merged.storage.get_collection_ids("speaker")) + + for collection in corpus1.storage.data.values(): + self.assertEqual(len(collection), 0) + for collection in corpus2.storage.data.values(): + self.assertEqual(len(collection), 0) def 
test_with_overlap(self): """ @@ -49,11 +60,22 @@ def test_with_overlap(self): ] ) - merged = corpus1.merge(corpus2) + all_utt_ids = set(corpus1.get_utterance_ids()) | set(corpus2.get_utterance_ids()) + all_speaker_ids = set(corpus1.get_speaker_ids()) | set(corpus2.get_speaker_ids()) + + merged = Corpus.merge(corpus1, corpus2) self.assertEqual(len(list(merged.iter_utterances())), 5) self.assertEqual(len(list(merged.iter_speakers())), 5) - self.assertEqual(len(list(corpus1.iter_utterances())), 3) - self.assertEqual(len(list(corpus2.iter_utterances())), 3) + + for utt_id in all_utt_ids: + self.assertTrue(utt_id in merged.storage.get_collection_ids("utterance")) + for speaker_id in all_speaker_ids: + self.assertTrue(speaker_id in merged.storage.get_collection_ids("speaker")) + + for collection in corpus1.storage.data.values(): + self.assertEqual(len(collection), 0) + for collection in corpus2.storage.data.values(): + self.assertEqual(len(collection), 0) def test_overlap_diff_data(self): """ @@ -77,14 +99,32 @@ def test_overlap_diff_data(self): ] ) - merged = corpus1.merge(corpus2) + all_utt_ids = set(corpus1.get_utterance_ids()) | set(corpus2.get_utterance_ids()) + all_speaker_ids = set(corpus1.get_speaker_ids()) | set(corpus2.get_speaker_ids()) + + merged = Corpus.merge(corpus1, corpus2) self.assertEqual(len(list(merged.iter_utterances())), 5) self.assertEqual(len(list(merged.iter_speakers())), 5) self.assertEqual(len(list(corpus1.iter_utterances())), 3) self.assertEqual(len(list(corpus2.iter_utterances())), 3) self.assertEqual(merged.get_utterance("2").text, "this is a test") - self.assertEqual(merged.get_utterance("2").speaker, Speaker(id="charlie")) + self.assertEqual(merged.get_utterance("2").speaker.id, "charlie") + + for utt_id in all_utt_ids: + self.assertTrue(utt_id in merged.storage.get_collection_ids("utterance")) + for speaker_id in all_speaker_ids: + if ( + speaker_id == "candace" + ): # this speaker shouldn't be present due to overlap prioritization + 
self.assertFalse(speaker_id in merged.storage.get_collection_ids("speaker")) + else: + self.assertTrue(speaker_id in merged.storage.get_collection_ids("speaker")) + + for collection in corpus1.storage.data.values(): + self.assertEqual(len(collection), 0) + for collection in corpus2.storage.data.values(): + self.assertEqual(len(collection), 0) def test_overlap_diff_metadata(self): """ @@ -118,13 +158,28 @@ def test_overlap_diff_metadata(self): ] ) - merged = corpus1.merge(corpus2) + all_utt_ids = set(corpus1.get_utterance_ids()) | set(corpus2.get_utterance_ids()) + all_speaker_ids = set(corpus1.get_speaker_ids()) | set(corpus2.get_speaker_ids()) + + merged = Corpus.merge(corpus1, corpus2) self.assertEqual(len(list(merged.iter_utterances())), 5) self.assertEqual(len(list(merged.iter_speakers())), 5) self.assertEqual(len(merged.get_utterance("2").meta), 3) self.assertEqual(merged.get_utterance("2").meta["the"], "ringo") + for utt_id in all_utt_ids: + self.assertTrue(utt_id in merged.storage.get_collection_ids("utterance")) + self.assertTrue(f"utterance_{utt_id}" in merged.storage.get_collection_ids("meta")) + for speaker_id in all_speaker_ids: + self.assertTrue(speaker_id in merged.storage.get_collection_ids("speaker")) + self.assertTrue(f"speaker_{speaker_id}" in merged.storage.get_collection_ids("meta")) + + for collection in corpus1.storage.data.values(): + self.assertEqual(len(collection), 0) + for collection in corpus2.storage.data.values(): + self.assertEqual(len(collection), 0) + def test_overlap_convo_metadata(self): """ Merge with overlap in conversation with metadata differences. 
@@ -181,10 +236,18 @@ def test_overlap_convo_metadata(self): corpus2.get_conversation("convo1").add_meta("hello", "food") corpus2.get_conversation("convo1").add_meta("what", "a mood") - merged = corpus1.merge(corpus2) + merged = Corpus.merge(corpus1, corpus2) self.assertEqual(len(merged.get_conversation("convo1").meta), 3) self.assertEqual(merged.get_conversation("convo1").meta["hello"], "food") + self.assertTrue("convo1" in merged.storage.get_collection_ids("conversation")) + self.assertTrue("conversation_convo1" in merged.storage.get_collection_ids("meta")) + + self.assertFalse("convo1" in corpus1.storage.get_collection_ids("conversation")) + self.assertFalse("convo1" in corpus2.storage.get_collection_ids("conversation")) + self.assertFalse("conversation_convo1" in corpus1.storage.get_collection_ids("meta")) + self.assertFalse("conversation_convo1" in corpus2.storage.get_collection_ids("meta")) + def test_corpus_metadata(self): """ Merge with overlap in corpus metadata @@ -213,7 +276,7 @@ def test_corpus_metadata(self): corpus2.add_meta("toxicity", 0.9) corpus2.add_meta("paggro", 1.0) - merged = corpus1.merge(corpus2) + merged = Corpus.merge(corpus1, corpus2) self.assertEqual(len(merged.meta), 3) self.assertEqual(merged.meta["toxicity"], 0.9) @@ -243,9 +306,20 @@ def test_add_utterance(self): ] added = corpus1.add_utterances(utts) + self.assertIs(added, corpus1) self.assertEqual(len(list(added.iter_utterances())), 4) + self.assertEqual(set(added.get_utterance_ids()), {"0", "1", "2", "5"}) + self.assertEqual(set(added.get_speaker_ids()), {"alice", "bob", "charlie", "foxtrot"}) self.assertEqual(len(added.get_utterance("2").meta), 3) self.assertEqual(added.get_utterance("2").meta["hello"], "food") + self.assertIn("what", added.get_utterance("2").meta) + self.assertEqual(added.get_utterance("1").text, "my name is bob") + self.assertEqual(added.get_utterance("1").speaker.id, "bob") + self.assertEqual(added.get_utterance("5").text, "goodbye") + 
self.assertEqual(added.get_utterance("5").speaker.id, "foxtrot") + + for utt in added.iter_utterances(): + self.assertFalse(hasattr(utt, "_temp_storage")) if __name__ == "__main__":