From 9634005be40ceb40792616431f073c360fcbe922 Mon Sep 17 00:00:00 2001 From: Jonathan Chang Date: Sun, 4 Sep 2022 12:40:33 -0400 Subject: [PATCH] DB storage manager (#175) * initial implementation of DBStorageManager * Update github actions and dependencies (from original db-mode branch) * Basic DB mode functionality implemented * Implement database reconnection * Add support for binary metadata in db * Implement optimized JSON-to-DB path * Delete accidentally added file arcExtractor.py * ran black fmt * corpusHelper -> corpus_helpers * renaming some funcs for clarity * Refactor DB operation calls into corpus_helpers * Add docs for DB mode --- .github/workflows/continuous-integration.yml | 5 + convokit/__init__.py | 1 + convokit/convokitConfig.py | 56 ++++ convokit/model/__init__.py | 10 +- convokit/model/convoKitMeta.py | 6 +- convokit/model/corpus.py | 287 ++++++++++------ convokit/model/corpusComponent.py | 4 +- .../{corpusHelper.py => corpus_helpers.py} | 309 +++++++++++++++++- convokit/model/storageManager.py | 150 ++++++++- convokit/util.py | 5 + docs/source/db_setup.rst | 107 ++++++ docs/source/index.rst | 20 +- docs/source/install.rst | 36 ++ docs/source/storage_options.rst | 79 +++++ docs/source/tutorial.rst | 10 +- setup.py | 3 + 16 files changed, 950 insertions(+), 138 deletions(-) create mode 100644 convokit/convokitConfig.py rename convokit/model/{corpusHelper.py => corpus_helpers.py} (54%) create mode 100644 docs/source/db_setup.rst create mode 100644 docs/source/install.rst create mode 100644 docs/source/storage_options.rst diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml index ba70ce52..862c1fff 100644 --- a/.github/workflows/continuous-integration.yml +++ b/.github/workflows/continuous-integration.yml @@ -8,6 +8,7 @@ jobs: strategy: matrix: python-version: [3.7, 3.8, 3.9] + mongodb-version: [5.0.2] steps: - uses: actions/checkout@v3 @@ -15,6 +16,10 @@ jobs: uses: actions/setup-python@v2 with: python-version: ${{ matrix.python-version }} + - name: Start MongoDB + uses: supercharge/mongodb-github-action@1.7.0 + with: + mongodb-version: ${{ matrix.mongodb-version }} - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/convokit/__init__.py b/convokit/__init__.py index 8fba2078..a37d8c64 100644 --- a/convokit/__init__.py +++ b/convokit/__init__.py @@ -17,5 +17,6 @@ from .bag_of_words import * from .expected_context_framework import * from .surprise import * +from .convokitConfig import * # __path__ = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/convokit/convokitConfig.py b/convokit/convokitConfig.py new file mode 100644 index 00000000..6d11d8dc --- /dev/null +++ b/convokit/convokitConfig.py @@ -0,0 +1,56 @@ +import os +from typing import Optional +from yaml import load, Loader + + +DEFAULT_CONFIG_CONTENTS = ( + "# Default Storage Parameters\n" + "db_host: localhost:27017\n" + "data_directory: ~/.convokit/saved-corpora\n" + "default_storage_mode: mem" +) + +ENV_VARS = {"db_host": "CONVOKIT_DB_HOST", "default_storage_mode": "CONVOKIT_STORAGE_MODE"} + + +class ConvoKitConfig: + """ + Utility class providing read-only access to the ConvoKit config file + """ + + def __init__(self, filename: Optional[str] = None): + if filename is None: + filename = os.path.expanduser("~/.convokit/config.yml") + + if not os.path.isfile(filename): + convo_dir = os.path.dirname(filename) + if not os.path.isdir(convo_dir): + os.makedirs(convo_dir) + with open(filename, "w") as f: + print( + 
f"No configuration file found at {filename}; writing with contents: \n{DEFAULT_CONFIG_CONTENTS}" + ) + f.write(DEFAULT_CONFIG_CONTENTS) + self.config_contents = load(DEFAULT_CONFIG_CONTENTS, Loader=Loader) + else: + with open(filename, "r") as f: + self.config_contents = load(f.read(), Loader=Loader) + + def _get_config_from_env_or_file(self, config_key: str, default_val): + env_val = os.environ.get(ENV_VARS[config_key], None) + if env_val is not None: + # environment variable setting takes priority + return env_val + return self.config_contents.get(config_key, default_val) + + @property + def db_host(self): + return self._get_config_from_env_or_file("db_host", "localhost:27017") + + @property + def data_directory(self): + return self.config_contents.get("data_directory", "~/.convokit/saved-corpora") + + @property + def default_storage_mode(self): + return self._get_config_from_env_or_file("default_storage_mode", "mem") diff --git a/convokit/model/__init__.py b/convokit/model/__init__.py index fd7a41a7..6aa6434b 100644 --- a/convokit/model/__init__.py +++ b/convokit/model/__init__.py @@ -1,10 +1,10 @@ from .conversation import Conversation +from .convoKitIndex import ConvoKitIndex +from .convoKitMatrix import ConvoKitMatrix from .corpus import Corpus +from .corpusComponent import CorpusComponent +from .corpus_helpers import * from .speaker import Speaker +from .user import User from .utterance import Utterance -from .corpusComponent import CorpusComponent -from .corpusHelper import * -from .convoKitIndex import ConvoKitIndex -from .convoKitMatrix import ConvoKitMatrix from .utteranceNode import UtteranceNode -from .user import User diff --git a/convokit/model/convoKitMeta.py b/convokit/model/convoKitMeta.py index 299047ad..dd5412c3 100644 --- a/convokit/model/convoKitMeta.py +++ b/convokit/model/convoKitMeta.py @@ -28,7 +28,8 @@ def storage_key(self) -> str: return f"{self.obj_type}_{self.owner.id}" def __getitem__(self, item): - return self._get_storage().get_data("meta", self.storage_key, item) + item_type = self.index.get_index(self.obj_type).get(item, None) + return self._get_storage().get_data("meta", self.storage_key, item, item_type) def _get_storage(self): # special case for Corpus meta since that's the only time owner is not a CorpusComponent @@ -65,7 +66,8 @@ def __setitem__(self, key, value): if self.index.type_check: ConvoKitMeta._check_type_and_update_index(self.index, self.obj_type, key, value) - self._get_storage().update_data("meta", self.storage_key, key, value) + item_type = self.index.get_index(self.obj_type).get(key, None) + self._get_storage().update_data("meta", self.storage_key, key, value, item_type) def __delitem__(self, key): if self.obj_type == "corpus": diff --git a/convokit/model/corpus.py b/convokit/model/corpus.py index 8f151ff7..56ed296e 100644 --- a/convokit/model/corpus.py +++ b/convokit/model/corpus.py @@ -1,15 +1,17 @@ +import random +import shutil +from typing import List, Collection, Callable, Set, Generator, Tuple, Optional, ValuesView, Union + from pandas import DataFrame from tqdm import tqdm -from typing import List, Collection, Callable, Set, Generator, Tuple, Optional, ValuesView, Union -from .corpusHelper import * -from convokit.util import deprecation, warn -from .corpusUtil import * + +from convokit.convokitConfig import ConvoKitConfig +from convokit.util import deprecation, create_safe_id from .convoKitIndex import ConvoKitIndex -import random -from .convoKitMeta import ConvoKitMeta from .convoKitMatrix import ConvoKitMatrix -from 
.storageManager import StorageManager, MemStorageManager -import shutil +from .corpusUtil import * +from .corpus_helpers import * +from .storageManager import DBStorageManager, StorageManager, MemStorageManager class Corpus: @@ -44,6 +46,8 @@ def __init__( self, filename: Optional[str] = None, utterances: Optional[List[Utterance]] = None, + db_collection_prefix: Optional[str] = None, + db_host: Optional[str] = None, preload_vectors: List[str] = None, utterance_start_index: int = None, utterance_end_index: int = None, @@ -53,9 +57,12 @@ def __init__( exclude_speaker_meta: Optional[List[str]] = None, exclude_overall_meta: Optional[List[str]] = None, disable_type_check=True, + storage_type: Optional[str] = None, storage: Optional[StorageManager] = None, ): + self.config = ConvoKitConfig() + if filename is None: self.corpus_dirpath = None elif os.path.isdir(filename): @@ -63,8 +70,20 @@ def __init__( else: self.corpus_dirpath = os.path.dirname(filename) + # configure corpus ID (optional for mem mode, required for DB mode) + if storage_type is None: + storage_type = self.config.default_storage_mode + if db_collection_prefix is None and filename is None and storage_type == "db": + db_collection_prefix = create_safe_id() + warn( + "You are in DB mode, but no collection prefix was specified and no filename was given from which to infer one." + "Will use a randomly generated unique prefix " + db_collection_prefix + ) self.id = None - if filename is not None: + if db_collection_prefix is not None: + # treat the unique collection prefix as the ID (even if a filename is specified) + self.id = db_collection_prefix + elif filename is not None: # automatically derive an ID from the file path self.id = os.path.basename(os.path.normpath(filename)) @@ -72,7 +91,16 @@ def __init__( if storage is not None: self.storage = storage else: - self.storage = MemStorageManager() + if storage_type == "mem": + self.storage = MemStorageManager() + elif storage_type == "db": + if db_host is None: + db_host = self.config.db_host + self.storage = DBStorageManager(self.id, db_host) + else: + raise ValueError( + f"Unrecognized setting '{storage_type}' for storage type; should be either 'mem' or 'db'." + ) self.meta_index = ConvoKitIndex(self) self.meta = ConvoKitMeta(self, self.meta_index, "corpus") @@ -90,82 +118,35 @@ def __init__( if exclude_overall_meta is None: exclude_overall_meta = [] - # Construct corpus from file or directory - if filename is not None: - if disable_type_check: - self.meta_index.disable_type_check() - if os.path.isdir(filename): - utterances = load_uttinfo_from_dir( - filename, utterance_start_index, utterance_end_index, exclude_utterance_meta - ) - - speakers_data = load_speakers_data_from_dir(filename, exclude_speaker_meta) - convos_data = load_convos_data_from_dir(filename, exclude_conversation_meta) - load_corpus_meta_from_dir(filename, self.meta, exclude_overall_meta) - - with open(os.path.join(filename, "index.json"), "r") as f: - idx_dict = json.load(f) - self.meta_index.update_from_dict(idx_dict) - - # load all processed text information, but don't load actual text. - # also checks if the index file exists. 
- # try: - # with open(os.path.join(filename, "processed_text.index.json"), "r") as f: - # self.processed_text = {k: {} for k in json.load(f)} - # except: - # pass - - # unpack binary data for utterances - unpack_binary_data_for_utts( - utterances, - filename, - self.meta_index.utterances_index, - exclude_utterance_meta, - KeyMeta, - ) - # unpack binary data for speakers - unpack_binary_data( - filename, - speakers_data, - self.meta_index.speakers_index, - "speaker", - exclude_speaker_meta, - ) - - # unpack binary data for conversations - unpack_binary_data( - filename, - convos_data, - self.meta_index.conversations_index, - "convo", - exclude_conversation_meta, - ) - - # unpack binary data for overall corpus - unpack_binary_data( - filename, - self.meta, - self.meta_index.overall_index, - "overall", - exclude_overall_meta, - ) - - else: - speakers_data = defaultdict(dict) - convos_data = defaultdict(dict) - utterances = load_from_utterance_file( - filename, utterance_start_index, utterance_end_index - ) - - self.utterances = dict() - self.speakers = dict() - - initialize_speakers_and_utterances_objects( - self, self.utterances, utterances, self.speakers, speakers_data + if filename is not None and storage_type == "db": + # JSON-to-DB construction mode uses a specialized code branch, which + # optimizes for this use case by using direct batch insertions into the + # DB rather than going through the StorageManager, hence improving + # efficiency. + + with open(os.path.join(filename, "index.json"), "r") as f: + idx_dict = json.load(f) + self.meta_index.update_from_dict(idx_dict) + + # populate the DB with the contents of the source file + ids_in_db = populate_db_from_file( + filename, + self.storage.db, + self.id, + self.meta_index, + utterance_start_index, + utterance_end_index, + exclude_utterance_meta, + exclude_conversation_meta, + exclude_speaker_meta, + exclude_overall_meta, ) - self.meta_index.enable_type_check() + # with the StorageManager's DB now populated, initialize the corresponding + # CorpusComponent instances. 
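+            # (init_corpus_from_storage_manager re-reads component IDs from the
+            # newly populated collections; passing ids_in_db restricts the
+            # resulting Utterance objects to the requested start/end index range.)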
+ init_corpus_from_storage_manager(self, ids_in_db) + self.meta_index.enable_type_check() # load preload_vectors if preload_vectors is not None: for vector_name in preload_vectors: @@ -173,22 +154,130 @@ def __init__( if matrix is not None: self._vector_matrices[vector_name] = matrix - elif utterances is not None: # Construct corpus from utterances list - self.speakers = {u.speaker.id: u.speaker for u in utterances} - self.utterances = {u.id: u for u in utterances} - for _, speaker in self.speakers.items(): - speaker.owner = self - for _, utt in self.utterances.items(): - utt.owner = self - - if merge_lines: - self.utterances = merge_utterance_lines(self.utterances) - - if disable_type_check: - self.meta_index.disable_type_check() - self.conversations = initialize_conversations(self, self.utterances, convos_data) - self.meta_index.enable_type_check() - self.update_speakers_data() + if merge_lines: + self.utterances = merge_utterance_lines(self.utterances) + else: + # Construct corpus from file or directory + if filename is not None: + if disable_type_check: + self.meta_index.disable_type_check() + if os.path.isdir(filename): + utterances = load_utterance_info_from_dir( + filename, utterance_start_index, utterance_end_index, exclude_utterance_meta + ) + + speakers_data = load_speakers_data_from_dir(filename, exclude_speaker_meta) + convos_data = load_convos_data_from_dir(filename, exclude_conversation_meta) + load_corpus_meta_from_dir(filename, self.meta, exclude_overall_meta) + + with open(os.path.join(filename, "index.json"), "r") as f: + idx_dict = json.load(f) + self.meta_index.update_from_dict(idx_dict) + + # load all processed text information, but don't load actual text. + # also checks if the index file exists. + # try: + # with open(os.path.join(filename, "processed_text.index.json"), "r") as f: + # self.processed_text = {k: {} for k in json.load(f)} + # except: + # pass + + # unpack binary data for utterances + unpack_binary_data_for_utts( + utterances, + filename, + self.meta_index.utterances_index, + exclude_utterance_meta, + KeyMeta, + ) + # unpack binary data for speakers + unpack_binary_data( + filename, + speakers_data, + self.meta_index.speakers_index, + "speaker", + exclude_speaker_meta, + ) + + # unpack binary data for conversations + unpack_binary_data( + filename, + convos_data, + self.meta_index.conversations_index, + "convo", + exclude_conversation_meta, + ) + + # unpack binary data for overall corpus + unpack_binary_data( + filename, + self.meta, + self.meta_index.overall_index, + "overall", + exclude_overall_meta, + ) + + else: + speakers_data = defaultdict(dict) + convos_data = defaultdict(dict) + utterances = load_from_utterance_file( + filename, utterance_start_index, utterance_end_index + ) + + self.utterances = dict() + self.speakers = dict() + + initialize_speakers_and_utterances_objects( + self, self.utterances, utterances, self.speakers, speakers_data + ) + + self.meta_index.enable_type_check() + + # load preload_vectors + if preload_vectors is not None: + for vector_name in preload_vectors: + matrix = ConvoKitMatrix.from_dir(self.corpus_dirpath, vector_name) + if matrix is not None: + self._vector_matrices[vector_name] = matrix + + elif utterances is not None: # Construct corpus from utterances list + self.speakers = {u.speaker.id: u.speaker for u in utterances} + self.utterances = {u.id: u for u in utterances} + for _, speaker in self.speakers.items(): + speaker.owner = self + for _, utt in self.utterances.items(): + utt.owner = self + + if merge_lines: + 
self.utterances = merge_utterance_lines(self.utterances) + + if disable_type_check: + self.meta_index.disable_type_check() + # if corpus is nonempty (check for self.utterances), construct the conversation + # data from the utterance list + if hasattr(self, "utterances"): + self.conversations = initialize_conversations(self, self.utterances, convos_data) + self.meta_index.enable_type_check() + self.update_speakers_data() + + @classmethod + def reconnect_to_db(cls, db_collection_prefix: str, db_host: Optional[str] = None): + """ + Factory method for a Corpus instance backed by an already-existing database (e.g., + one that was created in a previous run of a Python script or interactive session). + + This can be used to reconnect to existing Corpus data that you still want to use + without having to reload the data from the source file; this can happen for example + if your script crashed in the middle of working with the Corpus and you want to + resume where you left off. + """ + # create a blank Corpus that will hold the data + result = cls(db_collection_prefix=db_collection_prefix, db_host=db_host, storage_type="db") + # through the constructor, the blank Corpus' StorageManager is now connected + # to the DB. Next use the DB contents to populate the corpus components. + init_corpus_from_storage_manager(result) + + return result @property def vectors(self): diff --git a/convokit/model/corpusComponent.py b/convokit/model/corpusComponent.py index 929fc24f..2ae4fd48 100644 --- a/convokit/model/corpusComponent.py +++ b/convokit/model/corpusComponent.py @@ -72,10 +72,12 @@ def init_meta(self, meta): # ConvoKitMeta instances are not allowed for ownerless (standalone) # components since they must be backed by a StorageManager. In this # case we must forcibly convert the ConvoKitMeta instance to dict - if type(meta) == ConvoKitMeta: + if isinstance(meta, ConvoKitMeta): meta = meta.to_dict() return meta else: + if isinstance(meta, ConvoKitMeta) and meta.owner is self._owner: + return meta ck_meta = ConvoKitMeta(self, self.owner.meta_index, self.obj_type) for key, value in meta.items(): ck_meta[key] = value diff --git a/convokit/model/corpusHelper.py b/convokit/model/corpus_helpers.py similarity index 54% rename from convokit/model/corpusHelper.py rename to convokit/model/corpus_helpers.py index 56b5805a..9adc8edc 100644 --- a/convokit/model/corpusHelper.py +++ b/convokit/model/corpus_helpers.py @@ -2,17 +2,20 @@ Contains functions that help with the construction / dumping of a Corpus """ -import os import json +import os +import pickle from collections import defaultdict from typing import Dict -import pickle -from .speaker import Speaker -from .utterance import Utterance +import bson +from pymongo import UpdateOne + +from convokit.util import warn from .conversation import Conversation from .convoKitMeta import ConvoKitMeta -from convokit.util import warn +from .speaker import Speaker +from .utterance import Utterance BIN_DELIM_L, BIN_DELIM_R = "<##bin{", "}&&@**>" KeyId = "id" @@ -25,8 +28,10 @@ KeyMeta = "meta" KeyVectors = "vectors" +JSONLIST_BUFFER_SIZE = 1000 + -def load_uttinfo_from_dir( +def load_utterance_info_from_dir( dirname, utterance_start_index, utterance_end_index, exclude_utterance_meta ): assert dirname is not None @@ -284,15 +289,22 @@ def merge_utterance_lines(utt_dict): return new_utterances -def initialize_conversations(corpus, utt_dict, convos_data): +def initialize_conversations(corpus, utt_dict, convos_data, convo_to_utts=None): """ - Initialize Conversation objects from 
utterances and conversations data
+    Initialize Conversation objects from utterances and conversations data.
+    If a mapping from Conversation IDs to their constituent Utterance IDs is
+    already known (e.g., as a side effect of a prior computation), it can be
+    provided directly via the convo_to_utts parameter; otherwise, the mapping
+    is computed by iterating over the Utterances in utt_dict.
     """
     # organize utterances by conversation
-    convo_to_utts = defaultdict(list)  # temp container identifying utterances by conversation
-    for u in utt_dict.values():
-        convo_key = u.conversation_id  # each conversation_id is considered a separate conversation
-        convo_to_utts[convo_key].append(u.id)
+    if convo_to_utts is None:
+        convo_to_utts = defaultdict(list)  # temp container identifying utterances by conversation
+        for u in utt_dict.values():
+            convo_key = (
+                u.conversation_id
+            )  # each conversation_id is considered a separate conversation
+            convo_to_utts[convo_key].append(u.id)
     conversations = {}
     for convo_id in convo_to_utts:
         # look up the metadata associated with this conversation, if any
@@ -409,3 +421,276 @@ def dump_jsonlist_from_dict(entries, filename, index_key="id", value_key="value"
 def extract_meta_from_df(df):
     meta_cols = [col.split(".")[1] for col in df if col.startswith("meta")]
     return meta_cols
+
+
+def load_binary_metadata(filename, index, exclude_meta=None):
+    binary_data = {"utterance": {}, "conversation": {}, "speaker": {}, "corpus": {}}
+    for component_type in binary_data:
+        meta_index = index.get_index(component_type)
+        for meta_key, meta_type in meta_index.items():
+            if meta_type == ["bin"] and (
+                exclude_meta is None or meta_key not in exclude_meta[component_type]
+            ):
+                try:
+                    with open(
+                        os.path.join(filename, meta_key + "-{}-bin.p".format(component_type)), "rb"
+                    ) as f:
+                        l_bin = pickle.load(f)
+                    binary_data[component_type][meta_key] = l_bin
+                except FileNotFoundError:
+                    warn(
+                        f"Metadata field {meta_key} is specified to have binary type but no saved binary data was found. This field will be skipped."
+                    )
+    return binary_data
+
+
+def load_jsonlist_to_db(
+    filename,
+    db,
+    collection_prefix,
+    start_line=None,
+    end_line=None,
+    exclude_meta=None,
+    bin_meta=None,
+):
+    """
+    Populate the specified MongoDB database with the utterance data contained in
+    the given filename (which should point to an utterances.jsonl file).
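+
+    start_line and end_line optionally restrict ingestion to a (zero-indexed,
+    inclusive) range of lines of the JSONL file. bin_meta, if given, should map
+    metadata field names to the lists of unpickled binary payloads returned by
+    load_binary_metadata; any ``<##bin{n}&&@**>`` locator found in the JSON is
+    then re-pickled and wrapped in bson.Binary before insertion.
+
+    Returns the set of utterance IDs that were inserted.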
+    """
+    utt_collection = db[f"{collection_prefix}_utterance"]
+    meta_collection = db[f"{collection_prefix}_meta"]
+    inserted_ids = set()
+    speaker_key = None
+    convo_key = None
+    reply_key = None
+    with open(filename) as f:
+        utt_insertion_buffer = []
+        meta_insertion_buffer = []
+        for ln, line in enumerate(f):
+            if start_line is not None and ln < start_line:
+                continue
+            if end_line is not None and ln > end_line:
+                break
+            utt_obj = json.loads(line)
+            if speaker_key is None:
+                # backwards compatibility for corpora made before the user->speaker rename
+                speaker_key = "speaker" if "speaker" in utt_obj else "user"
+            if convo_key is None:
+                # backwards compatibility for corpora made before the root->conversation_id rename
+                convo_key = "conversation_id" if "conversation_id" in utt_obj else "root"
+            if reply_key is None:
+                # fix for misnamed reply_to in subreddit corpora
+                reply_key = "reply-to" if "reply-to" in utt_obj else "reply_to"
+            utt_insertion_buffer.append(
+                UpdateOne(
+                    {"_id": utt_obj["id"]},
+                    {
+                        "$set": {
+                            "speaker_id": utt_obj[speaker_key],
+                            "conversation_id": utt_obj[convo_key],
+                            "reply_to": utt_obj[reply_key],
+                            "timestamp": utt_obj["timestamp"],
+                            "text": utt_obj["text"],
+                        }
+                    },
+                    upsert=True,
+                )
+            )
+            utt_meta = utt_obj["meta"]
+            if exclude_meta is not None:
+                for exclude_key in exclude_meta:
+                    if exclude_key in utt_meta:
+                        del utt_meta[exclude_key]
+            if bin_meta is not None:
+                for key, bin_list in bin_meta.items():
+                    bin_locator = utt_meta[key]
+                    if (
+                        isinstance(bin_locator, str)
+                        and bin_locator.startswith(BIN_DELIM_L)
+                        and bin_locator.endswith(BIN_DELIM_R)
+                    ):
+                        bin_idx = int(bin_locator[len(BIN_DELIM_L) : -len(BIN_DELIM_R)])
+                        utt_meta[key] = bson.Binary(pickle.dumps(bin_list[bin_idx]))
+            meta_insertion_buffer.append(
+                UpdateOne({"_id": "utterance_" + utt_obj["id"]}, {"$set": utt_meta}, upsert=True)
+            )
+            inserted_ids.add(utt_obj["id"])
+            if len(utt_insertion_buffer) >= JSONLIST_BUFFER_SIZE:
+                utt_collection.bulk_write(utt_insertion_buffer)
+                meta_collection.bulk_write(meta_insertion_buffer)
+                utt_insertion_buffer = []
+                meta_insertion_buffer = []
+        # after loop termination, insert any remaining items in the buffer
+        if len(utt_insertion_buffer) > 0:
+            utt_collection.bulk_write(utt_insertion_buffer)
+            meta_collection.bulk_write(meta_insertion_buffer)
+            utt_insertion_buffer = []
+            meta_insertion_buffer = []
+    return inserted_ids
+
+
+def load_json_to_db(
+    filename, db, collection_prefix, component_type, exclude_meta=None, bin_meta=None
+):
+    """
+    Populate the specified MongoDB database with corpus component data from
+    either the speakers.json or conversations.json file located in a directory
+    containing valid ConvoKit Corpus data. The component_type parameter controls
+    which JSON file gets used.
+    """
+    component_collection = db[f"{collection_prefix}_{component_type}"]
+    meta_collection = db[f"{collection_prefix}_meta"]
+    if component_type == "speaker":
+        json_data = load_speakers_data_from_dir(filename, exclude_meta)
+    elif component_type == "conversation":
+        json_data = load_convos_data_from_dir(filename, exclude_meta)
+    component_insertion_buffer = []
+    meta_insertion_buffer = []
+    for component_id, component_data in json_data.items():
+        if KeyMeta in component_data:
+            # contains non-metadata entries
+            payload = {k: v for k, v in component_data.items() if k not in {"meta", "vectors"}}
+            meta = component_data[KeyMeta]
+        else:
+            # contains only metadata, with metadata at the top level
+            payload = {}
+            meta = component_data
+        component_insertion_buffer.append(
+            UpdateOne({"_id": component_id}, {"$set": payload}, upsert=True)
+        )
+        if bin_meta is not None:
+            for key, bin_list in bin_meta.items():
+                bin_locator = meta[key]
+                if (
+                    isinstance(bin_locator, str)
+                    and bin_locator.startswith(BIN_DELIM_L)
+                    and bin_locator.endswith(BIN_DELIM_R)
+                ):
+                    bin_idx = int(bin_locator[len(BIN_DELIM_L) : -len(BIN_DELIM_R)])
+                    meta[key] = bson.Binary(pickle.dumps(bin_list[bin_idx]))
+        meta_insertion_buffer.append(
+            UpdateOne({"_id": f"{component_type}_{component_id}"}, {"$set": meta}, upsert=True)
+        )
+    component_collection.bulk_write(component_insertion_buffer)
+    meta_collection.bulk_write(meta_insertion_buffer)
+
+
+def load_corpus_info_to_db(filename, db, collection_prefix, exclude_meta=None, bin_meta=None):
+    """
+    Populate the specified MongoDB database with Corpus metadata loaded from the
+    corpus.json file of a directory containing valid ConvoKit Corpus data.
+    """
+    if exclude_meta is None:
+        exclude_meta = {}
+    meta_collection = db[f"{collection_prefix}_meta"]
+    with open(os.path.join(filename, "corpus.json")) as f:
+        corpus_meta = {k: v for k, v in json.load(f).items() if k not in exclude_meta}
+        if bin_meta is not None:
+            for key, bin_list in bin_meta.items():
+                bin_locator = corpus_meta[key]
+                if (
+                    isinstance(bin_locator, str)
+                    and bin_locator.startswith(BIN_DELIM_L)
+                    and bin_locator.endswith(BIN_DELIM_R)
+                ):
+                    bin_idx = int(bin_locator[len(BIN_DELIM_L) : -len(BIN_DELIM_R)])
+                    corpus_meta[key] = bson.Binary(pickle.dumps(bin_list[bin_idx]))
+        meta_collection.update_one(
+            {"_id": f"corpus_{collection_prefix}"}, {"$set": corpus_meta}, upsert=True
+        )
+
+
+def populate_db_from_file(
+    filename,
+    db,
+    collection_prefix,
+    meta_index,
+    utterance_start_index,
+    utterance_end_index,
+    exclude_utterance_meta,
+    exclude_conversation_meta,
+    exclude_speaker_meta,
+    exclude_overall_meta,
+):
+    """
+    Populate all necessary collections of a MongoDB database so that it can be
+    used by a DBStorageManager, sourcing data from the valid ConvoKit Corpus
+    data pointed to by the filename parameter.
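+
+    Returns the set of utterance IDs that were inserted.
+
+    A rough usage sketch (the path and prefix are illustrative, and meta_index
+    is assumed to be the Corpus's ConvoKitIndex; a local MongoDB instance must
+    be running):
+
+    >>> from pymongo import MongoClient
+    >>> db = MongoClient("localhost:27017")["convokit"]
+    >>> utt_ids = populate_db_from_file(
+    ...     "/path/to/corpus-dir", db, "my_corpus", meta_index,
+    ...     None, None, [], [], [], []
+    ... )
+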
+ """ + binary_meta = load_binary_metadata( + filename, + meta_index, + { + "utterance": exclude_utterance_meta, + "conversation": exclude_conversation_meta, + "speaker": exclude_speaker_meta, + "corpus": exclude_overall_meta, + }, + ) + + # first load the utterance data + inserted_utt_ids = load_jsonlist_to_db( + os.path.join(filename, "utterances.jsonl"), + db, + collection_prefix, + utterance_start_index, + utterance_end_index, + exclude_utterance_meta, + binary_meta["utterance"], + ) + # next load the speaker and conversation data + for component_type in ["speaker", "conversation"]: + load_json_to_db( + filename, + db, + collection_prefix, + component_type, + (exclude_speaker_meta if component_type == "speaker" else exclude_conversation_meta), + binary_meta[component_type], + ) + # finally, load the corpus metadata + load_corpus_info_to_db( + filename, db, collection_prefix, exclude_overall_meta, binary_meta["corpus"] + ) + + return inserted_utt_ids + + +def init_corpus_from_storage_manager(corpus, utt_ids=None): + """ + Use an already-populated MongoDB database to initialize the components of + the specified Corpus (which should be empty before this function is called) + """ + # we will bypass the initialization step when constructing components since + # we know their necessary data already exists within the db + corpus.storage.bypass_init = True + + # fetch object ids from the DB and initialize corpus components for them + # create speakers first so we can refer to them when initializing utterances + speakers = {} + for speaker_doc in corpus.storage.data["speaker"].find(projection=["_id"]): + speaker_id = speaker_doc["_id"] + speakers[speaker_id] = Speaker(owner=corpus, id=speaker_id) + corpus.speakers = speakers + + # next, create utterances + utterances = {} + convo_to_utts = defaultdict(list) + for utt_doc in corpus.storage.data["utterance"].find( + projection=["_id", "speaker_id", "conversation_id"] + ): + utt_id = utt_doc["_id"] + if utt_ids is None or utt_id in utt_ids: + convo_to_utts[utt_doc["conversation_id"]].append(utt_id) + utterances[utt_id] = Utterance( + owner=corpus, id=utt_id, speaker=speakers[utt_doc["speaker_id"]] + ) + corpus.utterances = utterances + + # run post-construction integrity steps as in regular constructor + corpus.conversations = initialize_conversations(corpus, corpus.utterances, {}, convo_to_utts) + corpus.meta_index.enable_type_check() + corpus.update_speakers_data() + + # restore the StorageManager's init behavior to default + corpus.storage.bypass_init = False diff --git a/convokit/model/storageManager.py b/convokit/model/storageManager.py index 303ad617..8a19030c 100644 --- a/convokit/model/storageManager.py +++ b/convokit/model/storageManager.py @@ -1,5 +1,9 @@ -from typing import Optional +from typing import Optional, List from abc import ABCMeta, abstractmethod +from pymongo import MongoClient +from pymongo.database import Database +import bson +import pickle class StorageManager(metaclass=ABCMeta): @@ -43,19 +47,35 @@ def initialize_data_for_component( return NotImplemented @abstractmethod - def get_data(self, component_type: str, component_id: str, property_name: Optional[str] = None): + def get_data( + self, + component_type: str, + component_id: str, + property_name: Optional[str] = None, + object_type: Optional[List[str]] = None, + ): """ Retrieve the property data for the component of type component_type with id component_id. 
If property_name is specified return only the data for that property, otherwise return the dict containing all properties. + Additionally, the expected type of the property to be fetched may be specified + as a string; this is meant to be used for metadata in conjunction with the index. """ return NotImplemented @abstractmethod - def update_data(self, component_type: str, component_id: str, property_name: str, new_value): + def update_data( + self, + component_type: str, + component_id: str, + property_name: str, + new_value, + object_type: Optional[List[str]] = None, + ): """ Set or update the property data for the component of type component_type - with id component_id + with id component_id. For metadata, the Python object type may also be + specified, to be used in conjunction with the index. """ return NotImplemented @@ -132,7 +152,13 @@ def initialize_data_for_component( if overwrite or not self.has_data_for_component(component_type, component_id): collection[component_id] = initial_value if initial_value is not None else {} - def get_data(self, component_type: str, component_id: str, property_name: Optional[str] = None): + def get_data( + self, + component_type: str, + component_id: str, + property_name: Optional[str] = None, + object_type: Optional[List[str]] = None, + ): collection = self.get_collection(component_type) if component_id not in collection: raise KeyError( @@ -143,7 +169,14 @@ def get_data(self, component_type: str, component_id: str, property_name: Option else: return collection[component_id][property_name] - def update_data(self, component_type: str, component_id: str, property_name: str, new_value): + def update_data( + self, + component_type: str, + component_id: str, + property_name: str, + new_value, + object_type: Optional[List[str]] = None, + ): collection = self.get_collection(component_type) # don't create new collections if the ID is not found; this is supposed to be handled in the # CorpusComponent constructor so if the ID is missing that indicates something is wrong @@ -169,3 +202,108 @@ def delete_data( def clear_all_data(self): for key in self.data: self.data[key] = {} + + +class DBStorageManager(StorageManager): + """ + Concrete StorageManager implementation for database-backed data storage. + Collections are implemented as MongoDB collections. 
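+
+    :param collection_prefix: all MongoDB collections created by this manager are
+        namespaced under this prefix, one collection per component type (e.g.,
+        "<prefix>_utterance").
+    :param db_host: a MongoDB connection string; if None, pymongo falls back to
+        its default of localhost:27017.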
+ """ + + def __init__(self, collection_prefix, db_host: Optional[str] = None): + super().__init__() + + self.collection_prefix = collection_prefix + self.client = MongoClient(db_host) + self.db = self.client["convokit"] + + # this special lock is used for reconnecting to an existing DB, whereupon + # it is known that all the data already exists and so the initialization + # step can be skipped, greatly saving time + self.bypass_init = False + + # initialize component collections as MongoDB collections in the convokit db + for key in self.data: + self.data[key] = self.db[self._get_collection_name(key)] + + def _get_collection_name(self, component_type: str) -> str: + return f"{self.collection_prefix}_{component_type}" + + def get_collection_ids(self, component_type: str): + # from StackOverflow: get all keys in a MongoDB collection + # https://stackoverflow.com/questions/2298870/get-names-of-all-keys-in-the-collection + map = bson.Code("function() { for (var key in this) { emit(key, null); } }") + reduce = bson.Code("function(key, stuff) { return null; }") + result = self.db[self._get_collection_name(component_type)].map_reduce( + map, reduce, "get_collection_ids_result" + ) + return result.distinct("_id") + + def has_data_for_component(self, component_type: str, component_id: str) -> bool: + collection = self.get_collection(component_type) + lookup = collection.find_one({"_id": component_id}) + return lookup is not None + + def initialize_data_for_component( + self, component_type: str, component_id: str, overwrite: bool = False, initial_value=None + ): + if self.bypass_init: + return + collection = self.get_collection(component_type) + if overwrite or not self.has_data_for_component(component_type, component_id): + data = initial_value if initial_value is not None else {} + collection.update_one({"_id": component_id}, {"$set": data}, upsert=True) + + def get_data( + self, + component_type: str, + component_id: str, + property_name: Optional[str] = None, + object_type: Optional[List[str]] = None, + ): + collection = self.get_collection(component_type) + all_fields = collection.find_one({"_id": component_id}) + if all_fields is None: + raise KeyError( + f"This StorageManager does not have an entry for the {component_type} with id {component_id}." 
+ ) + if property_name is None: + return all_fields + else: + result = all_fields[property_name] + if object_type == ["bin"]: + # binary data must be unpacked + result = pickle.loads(result) + return result + + def update_data( + self, + component_type: str, + component_id: str, + property_name: str, + new_value, + object_type: Optional[List[str]] = None, + ): + data = self.get_data(component_type, component_id) + if object_type == ["bin"]: + # non-serializable types must go through pickling then be encoded as bson.Binary + new_value = bson.Binary(pickle.dumps(new_value)) + data[property_name] = new_value + collection = self.get_collection(component_type) + collection.update_one({"_id": component_id}, {"$set": data}) + + def delete_data( + self, component_type: str, component_id: str, property_name: Optional[str] = None + ): + collection = self.get_collection(component_type) + if property_name is None: + # delete the entire document + collection.delete_one({"_id": component_id}) + else: + # delete only the specified property + collection.update_one({"_id": component_id}, {"$unset": {property_name: ""}}) + + def clear_all_data(self): + for key in self.data: + self.data[key].drop() + self.data[key] = self.db[self._get_collection_name(key)] diff --git a/convokit/util.py b/convokit/util.py index 4d5067d5..2942b97d 100644 --- a/convokit/util.py +++ b/convokit/util.py @@ -2,6 +2,7 @@ import os import shutil import urllib.request +import uuid import warnings import zipfile from typing import Dict @@ -374,3 +375,7 @@ def deprecation(prev_name: str, new_name: str, stacklevel: int = 3): category=FutureWarning, stacklevel=stacklevel, ) + + +def create_safe_id(): + return uuid.uuid4().hex diff --git a/docs/source/db_setup.rst b/docs/source/db_setup.rst new file mode 100644 index 00000000..13f6b843 --- /dev/null +++ b/docs/source/db_setup.rst @@ -0,0 +1,107 @@ +Setting Up MongoDB For ConvoKit +=============================== + +`The MongoDB Documentation `_ provides a complete +guide on installing and running a MongoDB server. Here, we provide a simplified +guide to getting MongoDB setup to use with ConvoKit's DB Storage mode, in a handful +of settings. + +Running MongoDB with Conda +-------------------------- + +0. Install conda if needed, following `these instructions `_ for your system. +1. (Optional) Create a new environment where you want to install mongodb: + +:: + + $ conda create --name my_env + +2. Activate your newly created environment, or an existing environment where you want to install mongodb: + +:: + + $ conda activate my_env + + +3. Install the mongodb package. + +:: + + $ conda install mongodb + +Check to see if version is at least 5.0. + +:: + + $ mongod --version + +If not, utilize: + +:: + + $ conda install -c conda-forge mongodb=5.0 + + +4. Start the MongoDB server as a daemon process. + +:: + + $ mongod --fork --logpath --dbpath + +5. Use the MongoDB server for ConvoKit! +6. To stop the MongoDB server, on Linux or MacOS, use the ``htop`` command to find the mongod process ID and run: + +:: + + $ kill + +6. Alternitivly, to stop the MongoDB server on Linux, run + +:: + + $ mongod --shutdown + + +Sometimes, the above process doesn't work for MacOS. However, there is another solution for MacOS users below. + + +Running MongoDB on MacOS with Homebrew +-------------------------------------- + +0. If needed install Homebrew `here `_. +1. Use Homebrew to install MongoDB. + +:: + + $ brew tap mongodb/brew + $ brew install mongodb-community@5.0 + +2. Start MongoDB. 
+ +:: + + $ brew services start mongodb-community@5.0 + +3. Use the MongoDB server for ConvoKit! +4. To stop the MongoDB server, run + +:: + + $ brew services stop mongodb-community@5.0 + +Using MongoDB Atlas: A remote MongoDB server in the cloud +--------------------------------------------------------- + +MongoDB offers a cloud service version of their database, called MongoDB Atlas. +Atlas provides a free tier that is a good option for starting out with ConvoKit +remote DB storage, and several paid tiers that provide production level performance. +Follow these instructions, based on `the instructions for getting started with Atlas +provided by the MongoDB team `_, +to setup a MongoDB server in the cloud for use with ConvoKit. + +0. Register a new MongoDB Atlas account here: https://account.mongodb.com/account/register, and log into the Atlas UI. +1. Create a new MongoDB cluster and a database user within the Atlas UI. +2. Add your IP address to the set of approved IP addresses that can connect to cluster, and setup a DB user, within the Atlas UI (as suggested in the "Setup connection security" tab). +3. In the "Choose a connection method" tab, select "Connect your Application" and choose Python as your driver. Then, copy the outputted URI, which should look something like ``mongodb+srv://:@cluster0.m0srt.mongodb.net/myFirstDatabase?retryWrites=true&w=majority`` +4. Paste the aforementioned URI into ~/.convokit/config.yml in the db_host field. Then, replace and with the credentials you setup in step 1, and replace ``myFirstDatabase`` with ``convokit``. +5. Use the remote MongoDB server for ConvoKit! \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index c397f23f..9a389466 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -14,14 +14,26 @@ Contents -------- .. toctree:: - :maxdepth: 2 + :maxdepth: 1 + :caption: Getting Started + Installing ConvoKit Introductory tutorial Core Concepts + Data Format + Troubleshooting + +.. toctree:: + :maxdepth: 2 + :caption: API Reference + The Corpus model Transformers + Utilities + +.. toctree:: + :maxdepth: 2 + :caption: Datasets and Examples + Datasets Examples - Data Format - Utilities - Troubleshooting diff --git a/docs/source/install.rst b/docs/source/install.rst new file mode 100644 index 00000000..74a05de1 --- /dev/null +++ b/docs/source/install.rst @@ -0,0 +1,36 @@ +Installing ConvoKit +------------------- + +System Requirements +=================== +ConvoKit requires Python 3.7 or above. + +Package Installation +==================== +ConvoKit can be installed via pip: ``pip3 install convokit`` + +Post-install Steps +================== +ConvoKit relies on NLTK and SpaCy to implement certain basic NLP functions. +If you have not already previously used these packages, they require additional first-time setup: + +#. For NLTK, download the punkt tokenizer: ``import nltk; nltk.download('punkt')`` (in a ``python`` interactive session) + +#. For SpaCy, download the default English model: ``python3 -m spacy download en_core_web_sm`` + +Optional: Choose a Backend +========================== +By default, ConvoKit uses a native Python backend which keeps all data in memory during runtime. +This is suitable for most use cases and does not require any additional setup. +However, certain use cases (including low-memory environments and real-time applications) may prefer the alternative MongoDB backend, which requires additional setup. 
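+
+For example, to opt a single Corpus into the MongoDB backend while leaving the default untouched, you can pass ``storage_type="db"`` when constructing it (the corpus name below is illustrative):
+
+>>> from convokit import Corpus, download
+>>> corpus = Corpus(download("subreddit-Cornell"), storage_type="db")
+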
+For more information on choosing between the two options and setting up the MongoDB backend, please consult the following guides:
+
+.. toctree::
+   :maxdepth: 1
+
+   Choosing a Backend: native Python vs MongoDB <storage_options>
+   Setting up MongoDB for ConvoKit <db_setup>
+
+Troubleshooting
+===============
+If you run into any issues during or after installation, check out our `Troubleshooting Guide <https://convokit.cornell.edu/documentation/troubleshooting.html>`_ for a list of solutions to common issues.
\ No newline at end of file
diff --git a/docs/source/storage_options.rst b/docs/source/storage_options.rst
new file mode 100644
index 00000000..0075fc5f
--- /dev/null
+++ b/docs/source/storage_options.rst
@@ -0,0 +1,79 @@
+Choosing a Runtime Storage Backend
+----------------------------------
+
+The runtime storage backend determines how the data for ConvoKit objects is stored at runtime for use in Python code.
+Note that this should not be confused with the data format, which defines how Corpus data gets saved to a file for persistence and redistribution.
+
+ConvoKit supports two backends: native Python (the default) and MongoDB.
+This guide provides a brief explanation of each one and why you might want to use them.
+
+The Native Python Backend
+=========================
+ConvoKit's default backend stores all data in system memory as native Python objects (e.g., lists and dictionaries).
+This means that once a Corpus has been loaded, reading and writing data to the Corpus is relatively fast: since all data is already in native Python representations, no conversion steps are needed.
+However, this also involves keeping all the data in system memory (RAM) at once, which means that for large corpora the memory cost can be quite high.
+Furthermore, since RAM is volatile, any changes you make to the Corpus will persist only as long as your Python script or notebook stays running, unless you explicitly dump the Corpus to a file.
+As a result, changes may be lost if your script crashes or your computer shuts down unexpectedly.
+
+We generally recommend sticking to the default native Python backend due to its speed and simplicity.
+In particular, it is a good choice for interactive sessions (e.g., ipython shell or Jupyter notebooks), where runtime errors won't crash the whole session/notebook and hence the persistence issue is less vital, and where you may be performing a lot of experimental changes to a Corpus that would benefit from the faster read/write speed.
+
+The MongoDB Backend
+===================
+As the name suggests, the MongoDB backend stores all data in a MongoDB database.
+This provides two key advantages.
+First, it allows ConvoKit to take advantage of MongoDB's lazy loading, so not all data needs to be loaded into system memory at once, resulting in a much smaller memory footprint which enables the use of extremely large corpora in environments that might not otherwise have enough memory to handle them (e.g., a personal laptop).
+Second, since all changes are written to the database, which is backed by on-disk files, changes are resilient to unexpected crashes; in the event of such a crash you can simply reconnect to the database to pick up where you left off.
+On the other hand, reading and writing data to the Corpus is much slower in the MongoDB backend, both because doing so involves database reads/writes (disk operations, which are slower than RAM operations) and because data must be converted between MongoDB format and Python objects.
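+
+As a brief sketch of the crash-recovery workflow described above (the collection prefix is illustrative):
+
+>>> from convokit import Corpus, download
+>>> corpus = Corpus(download("subreddit-Cornell"), db_collection_prefix="my_corpus", storage_type="db")
+>>> # ... some time later, e.g., after an unexpected crash ...
+>>> corpus = Corpus.reconnect_to_db("my_corpus")
+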
+Note that using the MongoDB backend requires some additional setup; see the :doc:`MongoDB setup guide <db_setup>` for instructions.
+
+We recommend the MongoDB backend for the following use cases: memory-limited environments where your available RAM is insufficient to use your desired Corpus with the default backend; and live-service environments where you expect to continuously make changes to a Corpus over time and need to be resilient to unexpected crashes (e.g., using a Corpus as a component of a web server).
+
+How to Change Backends
+======================
+Once you have chosen the backend that best suits your purposes, the next step is to tell ConvoKit to use it.
+This can be done in three ways:
+
+#. Corpus-level: ConvoKit supports specifying a backend on a per-Corpus basis. This is done through the ``storage_type`` parameter when constructing a corpus. You can set this parameter to the string ``"mem"`` for the native Python backend or ``"db"`` for the MongoDB backend. It is possible to mix Python-backed and MongoDB-backed corpora in the same script.
+
+#. System-level: If you want to change the *default* backend in all ConvoKit code that runs on your computer (i.e., the backend that gets used when the ``storage_type`` parameter is not given), this is controlled by the ConvoKit system setting ``"default_storage_mode"``. This is set to ``"mem"`` when ConvoKit is first installed, but you can change it to ``"db"`` to tell ConvoKit to use the MongoDB backend by default. Note: ConvoKit system settings are found in the ``config.yml`` file, which is located in the hidden directory ``~/.convokit``.
+
+#. Script-level: As an in-between option, if you want to change the default storage option used in a specific Python script but not at the whole-system level, you can do this by setting the environment variable ``CONVOKIT_STORAGE_MODE`` before running your script. For example, if you normally run your script as ``python3 myscript.py``, running it instead as ``CONVOKIT_STORAGE_MODE=db python3 myscript.py`` will set the default storage mode to MongoDB for that run of the script only.
+
+Differences in Corpus behavior between backends
+===============================================
+For the most part, the two backends are designed to be interchangeable; that is, code written for one backend should work in the other backend out-of-the-box.
+However, some specifics of MongoDB result in two minor differences in Corpus behavior that you should be aware of when writing your code.
+
+First, since the MongoDB backend uses a MongoDB database as its data storage system, it needs a name for the data belonging to each Corpus.
+Thus, there is an additional parameter in the Corpus constructor, ``db_collection_prefix``, which is only used by the MongoDB backend.
+This parameter determines how the MongoDB collections that back the Corpus will be named.
+Note that you still have the option of not specifying a name, but in this case a random name will be used.
+It is best practice to explicitly supply a name yourself, so you know what name to reconnect to in the event that reconnection is needed after a system crash.
+
+Second, because all operations in MongoDB involve *copying* data from the MongoDB database to the Python process (or vice versa), all metadata values must be treated as *immutable*.
+This does not really make a difference for primitive values like ints and strings, since those are immutable in Python to begin with.
+However, code that relies on mutating a more complex type like a dictionary may not work as expected in the MongoDB backend.
+For example, suppose the metadata entry ``"foo"`` is a list type, and you access it by saving it to a Python variable as follows:
+
+>>> saved_foo = my_utt.meta["foo"]
+
+Because lists are considered mutable in Python, you might expect the following code to successfully add a new item in the ``foo`` metadata of ``my_utt``:
+
+>>> saved_foo.append("new value")
+
+This will work in the native Python backend.
+However, it will not work in the MongoDB backend; the code will run, but only the variable ``saved_foo`` will be affected, not the actual metadata of ``my_utt``.
+This is because ``saved_foo`` only contains a copy of the data in the MongoDB database, which has been translated into a Python object.
+Thus, any operations that are done directly on ``saved_foo`` are done only to the Python object, and do not involve any database writes.
+
+It is therefore best to treat *all* metadata objects, regardless of type, as immutable when using the MongoDB backend.
+Thus, the correct way to change metadata in MongoDB mode is the same way you would change an int or string type metadata entry: that is, by completely overwriting it.
+For example, to achieve the desired effect with the ``"foo"`` metadata entry from above, you should do the following:
+
+>>> temp_foo = my_utt.meta["foo"]
+>>> temp_foo.append("new value")
+>>> my_utt.meta["foo"] = temp_foo
+
+The added line overwrites the ``"foo"`` metadata entry, telling ConvoKit to update the value of ``"foo"`` in the database-backed metadata table; the contents of ``temp_foo``, including the newly appended item, are then written to the database as the new value of ``my_utt.meta["foo"]``, hence updating the metadata as desired.
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index f48cb05d..ec7b2efe 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -5,15 +5,7 @@ Introductory tutorial
 Setup
 =====
 
-This toolkit requires Python >=3.7.
-
-If you haven't already,
-
-#. Download the toolkit: ``pip3 install convokit``
-
-#. Download Spacy's English model: ``python3 -m spacy download en_core_web_sm``
-
-#. Download nltk's punkt tokenizer: ``import nltk; nltk.download('punkt')`` (in a ``python`` interactive session)
+Follow the :doc:`Installation Guide <install>` to install and set up ConvoKit.
 
 **If you encounter difficulties with installation**, check out our `Troubleshooting Guide <https://convokit.cornell.edu/documentation/troubleshooting.html>`_ for a list of solutions to common issues.
diff --git a/setup.py b/setup.py
index e398fe42..a8d0fc81 100644
--- a/setup.py
+++ b/setup.py
@@ -53,6 +53,9 @@
         "clean-text>=0.6.0",
         "unidecode>=1.1.1",
         "tqdm>=4.64.0",
+        "pymongo>=4.0",
+        "pyyaml>=5.4.1",
+        "dnspython>=1.16.0",
     ],
     extras_require={
         "craft": ["torch>=0.12"],