From c687e355ac782883d6a9320edf9bff3f16682fae Mon Sep 17 00:00:00 2001 From: Felix Riemen <92442930+ferie24@users.noreply.github.com> Date: Wed, 14 Sep 2022 22:08:41 +0200 Subject: [PATCH] Run bug fixed (#178) * Custom fields added to run on upload * if statement cleanup * bugfile.py removed --- .../arangopipe_admin_api.py | 180 +++++++++--------- .../arangopipe_storage/arangopipe_api.py | 160 +++++++++------- 2 files changed, 181 insertions(+), 159 deletions(-) diff --git a/arangopipe/arangopipe/arangopipe_storage/arangopipe_admin_api.py b/arangopipe/arangopipe/arangopipe_storage/arangopipe_admin_api.py index aca6cbd..5afcc4d 100644 --- a/arangopipe/arangopipe/arangopipe_storage/arangopipe_admin_api.py +++ b/arangopipe/arangopipe/arangopipe_storage/arangopipe_admin_api.py @@ -15,8 +15,7 @@ from requests.auth import HTTPBasicAuth import time - -#import traceback +# import traceback # create logger with 'spam_application' logger = logging.getLogger('arangopipe_admin_logger') logger.setLevel(logging.DEBUG) @@ -37,8 +36,7 @@ class ArangoPipeAdmin: - def __init__(self, reuse_connection=True, config=None, persist_conn=True,\ - client_url = None): + def __init__(self, reuse_connection=True, config=None, persist_conn=True, client_url=None): self.reuse_connection = reuse_connection self.db = None self.emlg = None @@ -62,8 +60,8 @@ def __init__(self, reuse_connection=True, config=None, persist_conn=True,\ logger.info(info_msg) else: - assert config is not None,\ - "You must provide connection information for new connections" + assert config is not None, \ + "You must provide connection information for new connections" self.config = config self.cfg = config.cfg @@ -74,7 +72,7 @@ def __init__(self, reuse_connection=True, config=None, persist_conn=True,\ db_serv_port = self.cfg['arangodb'][self.mscp.DB_SERVICE_PORT] db_end_point = self.cfg['arangodb'][self.mscp.DB_SERVICE_END_POINT] db_serv_name = self.cfg['arangodb'][self.mscp.DB_SERVICE_NAME] - + except KeyError as k: logger.error("Connection information is missing : " + k.args[0]) @@ -90,7 +88,7 @@ def __init__(self, reuse_connection=True, config=None, persist_conn=True,\ else: db_dbName = '' if 'username' in self.cfg['arangodb']: - logger.info("user name for connection: " +\ + logger.info("user name for connection: " + \ str(self.cfg['arangodb'][self.mscp.DB_USER_NAME])) db_user_name = self.cfg['arangodb'][self.mscp.DB_USER_NAME] else: @@ -119,8 +117,8 @@ def __init__(self, reuse_connection=True, config=None, persist_conn=True,\ logger.info("A root user password was specified, persisting...") try: - self.create_db(db_serv_host, db_serv_port,\ - db_serv_name, db_end_point,\ + self.create_db(db_serv_host, db_serv_port, + db_serv_name, db_end_point, db_dbName, db_user_name, db_password, db_conn_protocol) # If you could create a DB, proceed with provisioning the graph. 
Otherwise you @@ -133,8 +131,8 @@ def __init__(self, reuse_connection=True, config=None, persist_conn=True,\ except: logger.error("Error connecting to DB, trying again...") time.sleep(2) - self.create_db(db_serv_host, db_serv_port,\ - db_serv_name, db_end_point,\ + self.create_db(db_serv_host, db_serv_port, + db_serv_name, db_end_point, db_dbName, db_user_name, db_password, db_conn_protocol) self.create_enterprise_ml_graph(db_replication_factor) if persist_conn: @@ -148,12 +146,12 @@ def check_repeated_creation(self, api_data): repeated_connection = False else: try: - user_name_equal = api_data[self.mscp.DB_USER_NAME] ==\ - self.cfg['arangodb'][self.mscp.DB_USER_NAME] - password_equal = api_data[self.mscp.DB_PASSWORD] ==\ - self.cfg['arangodb'][self.mscp.DB_PASSWORD] - db_name_equal = api_data[self.mscp.DB_NAME] ==\ - self.cfg['arangodb'][self.mscp.DB_NAME] + user_name_equal = api_data[self.mscp.DB_USER_NAME] == \ + self.cfg['arangodb'][self.mscp.DB_USER_NAME] + password_equal = api_data[self.mscp.DB_PASSWORD] == \ + self.cfg['arangodb'][self.mscp.DB_PASSWORD] + db_name_equal = api_data[self.mscp.DB_NAME] == \ + self.cfg['arangodb'][self.mscp.DB_NAME] repeated_connection = user_name_equal or password_equal or db_name_equal if user_name_equal: logger.info( @@ -180,8 +178,8 @@ def create_config(self): def get_config(self): return self.config - def create_db(self, db_srv_host, db_srv_port, db_serv_name,\ - db_end_point, db_dbName, db_user_name, db_password,\ + def create_db(self, db_srv_host, db_srv_port, db_serv_name, \ + db_end_point, db_dbName, db_user_name, db_password, \ db_conn_protocol): host_connection = db_conn_protocol + "://" + db_srv_host + ":" + str( @@ -190,7 +188,7 @@ def create_db(self, db_srv_host, db_srv_port, db_serv_name,\ logger.debug("Connection reuse: " + str(self.reuse_connection)) if not self.reuse_connection: API_ENDPOINT = host_connection + "/_db/_system/" + db_end_point + \ - "/" + db_serv_name + "/" + db_serv_name print("API endpoint: " + API_ENDPOINT) if db_dbName: @@ -202,16 +200,19 @@ def create_db(self, db_srv_host, db_srv_port, db_serv_name,\ "Password preference for managed connection was indicated !" 
) - api_data = {self.mscp.DB_NAME : db_dbName,\ - self.mscp.DB_USER_NAME: db_user_name,\ - self.mscp.DB_PASSWORD: db_password } + api_data = {self.mscp.DB_NAME: db_dbName, \ + self.mscp.DB_USER_NAME: db_user_name, \ + self.mscp.DB_PASSWORD: db_password} logger.info("Requesting a managed service database...") - - if self.mscp.DB_ROOT_USER_PASSWORD in self.cfg['arangodb'] and self.mscp.DB_ROOT_USER in self.cfg['arangodb']: - r = requests.post(url=API_ENDPOINT, auth=HTTPBasicAuth(self.cfg['arangodb'][self.mscp.DB_ROOT_USER], self.cfg['arangodb'][self.mscp.DB_ROOT_USER_PASSWORD]),json=api_data, verify=True) + if self.mscp.DB_ROOT_USER_PASSWORD in self.cfg['arangodb'] and self.mscp.DB_ROOT_USER in self.cfg[ + 'arangodb']: + r = requests.post(url=API_ENDPOINT, auth=HTTPBasicAuth(self.cfg['arangodb'][self.mscp.DB_ROOT_USER], + self.cfg['arangodb'][ + self.mscp.DB_ROOT_USER_PASSWORD]), + json=api_data, verify=True) else: - r = requests.post(url=API_ENDPOINT,json=api_data, verify=True) + r = requests.post(url=API_ENDPOINT, json=api_data, verify=True) if r.status_code == 409 or r.status_code == 400: logger.error( @@ -220,10 +221,9 @@ def create_db(self, db_srv_host, db_srv_port, db_serv_name,\ ) return - assert r.status_code == 200, \ - "Managed DB endpoint is unavailable !, reason: " + r.reason + " err code: " +\ - str(r.status_code) + "Managed DB endpoint is unavailable !, reason: " + r.reason + " err code: " + \ + str(r.status_code) result = json.loads(r.text) logger.info("Managed service database was created !") ms_dbName = result['dbName'] @@ -253,10 +253,10 @@ def create_db(self, db_srv_host, db_srv_port, db_serv_name,\ self.config = disk_cfg self.cfg = disk_cfg.cfg # Connect to arangopipe database as administrative user. - #This returns an API wrapper for "test" database. + # This returns an API wrapper for "test" database. 
print("Host Connection: " + str(host_connection)) - client = ArangoClient(hosts= host_connection) - #This is for the case when it is not a 409 or 400 but due to the OASIS connection + client = ArangoClient(hosts=host_connection) + # This is for the case when it is not a 409 or 400 but due to the OASIS connection # issue db = client.db(ms_dbName, ms_user_name, ms_password, verify=True) @@ -267,7 +267,7 @@ def create_db(self, db_srv_host, db_srv_port, db_serv_name,\ def create_enterprise_ml_graph(self, db_replication_factor): - cl = ['project', 'models', 'datasets', 'featuresets', 'modelparams', 'run',\ + cl = ['project', 'models', 'datasets', 'featuresets', 'modelparams', 'run', \ 'devperf', 'servingperf', 'deployment'] if self.reuse_connection: @@ -284,24 +284,23 @@ def create_enterprise_ml_graph(self, db_replication_factor): self.db.create_collection(col, db_replication_factor) self.emlg.create_vertex_collection(col) - - from_list = ['project', 'models', 'run', 'run', 'run', 'run',\ - 'deployment', 'deployment', 'deployment', 'deployment',\ + from_list = ['project', 'models', 'run', 'run', 'run', 'run', \ + 'deployment', 'deployment', 'deployment', 'deployment', \ 'featuresets'] - to_list = ['models', 'run', 'modelparams', 'datasets', 'devperf',\ - 'featuresets', 'servingperf', 'models', 'modelparams',\ + to_list = ['models', 'run', 'modelparams', 'datasets', 'devperf', \ + 'featuresets', 'servingperf', 'models', 'modelparams', \ 'featuresets', 'datasets'] - edge_names = ['project_models', 'run_models', 'run_modelparams', 'run_datasets',\ + edge_names = ['project_models', 'run_models', 'run_modelparams', 'run_datasets', \ 'run_devperf', 'run_featuresets', 'deployment_servingperf', \ - 'deployment_model', 'deployment_modelparams', 'deployment_featureset',\ - 'featureset_dataset'] + 'deployment_model', 'deployment_modelparams', 'deployment_featureset', \ + 'featureset_dataset'] for edge, fromv, tov in zip(edge_names, from_list, to_list): if not self.emlg.has_edge_definition(edge): - self.db.create_collection(edge, edge = True,\ - replication_factor = db_replication_factor) - self.emlg.create_edge_definition(edge_collection = edge,\ - from_vertex_collections = [fromv],\ - to_vertex_collections = [tov] ) + self.db.create_collection(edge, edge=True, \ + replication_factor=db_replication_factor) + self.emlg.create_edge_definition(edge_collection=edge, \ + from_vertex_collections=[fromv], \ + to_vertex_collections=[tov]) self.cfg['arangodb'][ self.mscp.DB_REPLICATION_FACTOR] = db_replication_factor @@ -349,14 +348,14 @@ def register_deployment(self, dep_tag): deployment = self.emlg.vertex_collection("deployment") deploy_info = {"tag": dep_tag} dep_reg = deployment.insert(deploy_info) - #Link the deployment to the model parameters + # Link the deployment to the model parameters dep_model_params_edge = self.emlg.edge_collection( "deployment_modelparams") dep_model_params_key = dep_reg["_key"] + "-" + tagged_model_params[ "_key"] - the_dep_model_param_edge = { "_key": dep_model_params_key,\ - "_from": dep_reg["_id"],\ - "_to": tagged_model_params["_id"]} + the_dep_model_param_edge = {"_key": dep_model_params_key, \ + "_from": dep_reg["_id"], \ + "_to": tagged_model_params["_id"]} dep_mp_reg = dep_model_params_edge.insert(the_dep_model_param_edge) @@ -364,17 +363,17 @@ def register_deployment(self, dep_tag): dep_featureset_edge = self.emlg.edge_collection( "deployment_featureset") dep_featureset_key = dep_reg["_key"] + "-" + tagged_featureset["_key"] - the_dep_featureset_edge = { "_key": 
dep_featureset_key,\ - "_from": dep_reg["_id"],\ - "_to": tagged_featureset["_id"]} + the_dep_featureset_edge = {"_key": dep_featureset_key, \ + "_from": dep_reg["_id"], \ + "_to": tagged_featureset["_id"]} dep_fs_reg = dep_featureset_edge.insert(the_dep_featureset_edge) # Link the deployment to the model dep_model_edge = self.emlg.edge_collection("deployment_model") dep_featureset_key = dep_reg["_key"] + "-" + tagged_model["_key"] - the_dep_model_edge = { "_key": dep_featureset_key,\ - "_from": dep_reg["_id"],\ - "_to": tagged_model["_id"]} + the_dep_model_edge = {"_key": dep_featureset_key, \ + "_from": dep_reg["_id"], \ + "_to": tagged_model["_id"]} dep_model_reg = dep_model_edge.insert(the_dep_model_edge) return dep_model_reg @@ -387,7 +386,7 @@ def add_vertex_to_arangopipe(self, vertex_to_create): else: self.emlg = self.db.graph(self.cfg['mlgraph']['graphname']) - #Check if vertex exists in the graph, if not create it + # Check if vertex exists in the graph, if not create it if not self.emlg.has_vertex_collection(vertex_to_create): self.db.create_collection(vertex_to_create, rf) self.emlg.create_vertex_collection(vertex_to_create) @@ -403,7 +402,7 @@ def remove_vertex_from_arangopipe(self, vertex_to_remove, purge=True): else: self.emlg = self.db.graph(self.cfg['mlgraph']['graphname']) - #Check if vertex exists in the graph, if not create it + # Check if vertex exists in the graph, if not create it if self.emlg.has_vertex_collection(vertex_to_remove): self.emlg.delete_vertex_collection(vertex_to_remove, purge) @@ -423,26 +422,26 @@ def add_edge_definition_to_arangopipe(self, edge_col_name, edge_name, else: self.emlg = self.db.graph(self.cfg['mlgraph']['graphname']) - #Check if all data needed to create an edge exists, if so, create it + # Check if all data needed to create an edge exists, if so, create it if not self.emlg.has_vertex_collection(from_vertex_name): - logger.error("Source vertex, " + from_vertex_name +\ + logger.error("Source vertex, " + from_vertex_name + \ " does not exist, aborting edge creation!") return elif not self.emlg.has_vertex_collection(to_vertex_name): - logger.error("Destination vertex, " + to_vertex_name +\ + logger.error("Destination vertex, " + to_vertex_name + \ " does not exist, aborting edge creation!") return else: if not self.emlg.has_edge_definition(edge_name): if not self.emlg.has_edge_collection(edge_col_name): - self.db.create_collection(edge_col_name, edge = True,\ - replication_factor = rf) + self.db.create_collection(edge_col_name, edge=True, \ + replication_factor=rf) - self.emlg.create_edge_definition(edge_collection = edge_col_name,\ - from_vertex_collections=[from_vertex_name],\ - to_vertex_collections=[to_vertex_name] ) + self.emlg.create_edge_definition(edge_collection=edge_col_name, \ + from_vertex_collections=[from_vertex_name], \ + to_vertex_collections=[to_vertex_name]) else: logger.error("Edge, " + edge_name + " already exists!") @@ -457,18 +456,18 @@ def add_edges_to_arangopipe(self, edge_col_name, from_vertex_list, else: self.emlg = self.db.graph(self.cfg['mlgraph']['graphname']) - #Check if all data needed to create an edge exists, if so, create it + # Check if all data needed to create an edge exists, if so, create it if not self.emlg.has_edge_collection(edge_col_name): msg = "Edge collection %s did not exist, creating it!" 
% ( edge_col_name) logger.info(msg) - self.db.create_collection(edge_col_name, edge = True,\ - replication_factor = rf) + self.db.create_collection(edge_col_name, edge=True, \ + replication_factor=rf) - ed = self.emlg.create_edge_definition(edge_collection = edge_col_name,\ - from_vertex_collections= from_vertex_list,\ - to_vertex_collections= to_vertex_list ) + ed = self.emlg.create_edge_definition(edge_collection=edge_col_name, \ + from_vertex_collections=from_vertex_list, \ + to_vertex_collections=to_vertex_list) return @@ -508,9 +507,7 @@ def has_edge(self, edge_name): return result - def delete_all_databases(self,\ - preserve = ['arangopipe', 'facebook_db', \ - 'fb_node2vec_db', 'node2vecdb', '_system']): + def delete_all_databases(self, preserve=['arangopipe', 'facebook_db', 'fb_node2vec_db', 'node2vecdb', '_system']): db_srv_host = self.cfg['arangodb'][self.mscp.DB_SERVICE_HOST] db_srv_port = self.cfg['arangodb'][self.mscp.DB_SERVICE_PORT] @@ -520,7 +517,7 @@ def delete_all_databases(self,\ self.mscp.DB_ROOT_USER_PASSWORD] except KeyError as k: msg = "Root credentials are unvailable, try again " + \ - "with a new connection and credentials for root provided" + "with a new connection and credentials for root provided" logger.error(msg) logger.error("Credential information that is missing : " + k.args[0]) @@ -529,21 +526,20 @@ def delete_all_databases(self,\ db_conn_protocol = self.cfg['arangodb'][self.mscp.DB_CONN_PROTOCOL] host_connection = db_conn_protocol + "://" + \ - db_srv_host + ":" + str(db_srv_port) + db_srv_host + ":" + str(db_srv_port) if not root_user and not root_user_password: msg = "You will need to provide root credentials while connecting to perform" + \ - " deletes of databases ! Please try again after doing so." + " deletes of databases ! Please try again after doing so." logger.info(msg) return - client = ArangoClient(hosts= host_connection) + client = ArangoClient(hosts=host_connection) if not '_system' in preserve: preserve.append('_system') - - sys_db = client.db('_system',\ - username = root_user,\ - password = root_user_password, verify=True) + sys_db = client.db('_system', \ + username=root_user, \ + password=root_user_password, verify=True) try: @@ -570,7 +566,7 @@ def delete_database(self, db_to_delete): self.mscp.DB_ROOT_USER_PASSWORD] except KeyError as k: msg = "Root credentials are unvailable, try again " + \ - "with a new connection and credentials for root provided" + "with a new connection and credentials for root provided" logger.error(msg) logger.error("Credential information that is missing : " + k.args[0]) @@ -579,18 +575,18 @@ def delete_database(self, db_to_delete): db_conn_protocol = self.cfg['arangodb'][self.mscp.DB_CONN_PROTOCOL] host_connection = db_conn_protocol + "://" + \ - db_srv_host + ":" + str(db_srv_port) + db_srv_host + ":" + str(db_srv_port) if not root_user and not root_user_password: msg = "You will need to provide root credentials while connecting to perform" + \ - " deletes of databases ! Please try again after doing so." + " deletes of databases ! Please try again after doing so." 
logger.info(msg) return - client = ArangoClient(hosts= host_connection) + client = ArangoClient(hosts=host_connection) - sys_db = client.db('_system',\ - username = root_user,\ - password = root_user_password, verify=True) + sys_db = client.db('_system', \ + username=root_user, \ + password=root_user_password, verify=True) try: if sys_db.has_database(db_to_delete): sys_db.delete_database(db_to_delete) diff --git a/arangopipe/arangopipe/arangopipe_storage/arangopipe_api.py b/arangopipe/arangopipe/arangopipe_storage/arangopipe_api.py index c9dc4b9..62089ef 100644 --- a/arangopipe/arangopipe/arangopipe_storage/arangopipe_api.py +++ b/arangopipe/arangopipe/arangopipe_storage/arangopipe_api.py @@ -37,6 +37,7 @@ class ArangoPipe: (3) Register your featureset with ArangoPipe (4) Register you model with ArangoPipe """ + def __init__(self, config): self.cfg = config.get_cfg() self.emlg = None @@ -52,7 +53,7 @@ def heart_beat(self): print("WARNING : " + str(e)) logger.error( "Your database was perhaps deleted, try a new connection") - #logger.error("Error: " + str(e)) + # logger.error("Error: " + str(e)) raise Exception("Your connection is stale, try a new connection!") return @@ -114,8 +115,8 @@ def lookup_entity_by_id(self, entity_id): asset_info = None if len(asset_keys) == 0: - logger.info("The asset by name: " + asset_name +\ - " was not found in Arangopipe!") + logger.info("The asset by name: " + asset_name + \ + " was not found in Arangopipe!") else: asset_info = asset_keys[0] @@ -130,8 +131,8 @@ def lookup_entity(self, asset_name, asset_type): asset_info = None if len(asset_keys) == 0: - logger.info("The asset by name: " + asset_name +\ - " was not found in Arangopipe!") + logger.info("The asset by name: " + asset_name + \ + " was not found in Arangopipe!") else: asset_info = asset_keys[0] @@ -188,8 +189,8 @@ def lookup_modelparams(self, tag_value): mp_info = None mp_keys = [doc for doc in cursor] if len(mp_keys) == 0: - logger.info("The model params for tag: " + tag_value +\ - " was not found in Arangopipe!") + logger.info("The model params for tag: " + tag_value + \ + " was not found in Arangopipe!") else: mp_info = mp_keys[0] return mp_info @@ -207,8 +208,8 @@ def lookup_modelperf(self, tag_value): mperf_info = None mperf_keys = [doc for doc in cursor] if len(mperf_keys) == 0: - logger.info("The model performance for tag: " + tag_value +\ - " was not found in Arangopipe!") + logger.info("The model performance for tag: " + tag_value + \ + " was not found in Arangopipe!") else: mperf_info = mperf_keys[0] @@ -223,24 +224,21 @@ def init_graph(self): db_passwd = self.cfg['arangodb']['password'] db_conn_protocol = self.cfg['arangodb'][self.mscp.DB_CONN_PROTOCOL] - host_conn_str = db_conn_protocol + "://" + \ + host_conn_str = db_conn_protocol + "://" + \ db_serv_host + ":" + str(db_serv_port) - client = ArangoClient(hosts= host_conn_str) + client = ArangoClient(hosts=host_conn_str) - self.db = client.db(name= db_name, \ - username=db_user_name,\ - password=db_passwd, verify=True) + self.db = client.db(name=db_name, \ + username=db_user_name, \ + password=db_passwd, verify=True) self.emlg = self.db.graph(self.cfg['mlgraph']['graphname']) return - - - - def register_model(self, mi, user_id = "authorized_user",\ - project = "Wine-Quality-Regression-Modelling"): - """ Register a model. The operation requires specifying a user id. If the user id is permitted to register a model, then the registration proceeds, otherwise an unauthorized operation is indicated. 
""" + def register_model(self, mi, user_id="authorized_user", project="Wine-Quality-Regression-Modelling"): + """ Register a model. The operation requires specifying a user id. If the user id is permitted to register a + model, then the registration proceeds, otherwise an unauthorized operation is indicated. """ model_name = mi["name"] try: @@ -266,16 +264,17 @@ def register_model(self, mi, user_id = "authorized_user",\ project_model_edge = self.emlg.edge_collection("project_models") project_model_key = the_project_info["_key"] + "-" + model_reg["_key"] - a_project_model_edge = {"_key": project_model_key,\ - "_from": "project/" + the_project_info["_key"],\ - "_to": "models/" + model_reg["_key"]} + a_project_model_edge = {"_key": project_model_key, \ + "_from": "project/" + the_project_info["_key"], \ + "_to": "models/" + model_reg["_key"]} pm_reg = project_model_edge.insert(a_project_model_edge) logger.info("Recording project model link " + str(pm_reg)) return model_reg def register_dataset(self, ds_info, user_id="authorized_user"): - """ Register a dataset. The operation requires specifying a user id. If the user id is permitted to register a dataset, then the registration proceeds, otherwise an unauthorized operation is indicated. """ + """ Register a dataset. The operation requires specifying a user id. If the user id is permitted to register + a dataset, then the registration proceeds, otherwise an unauthorized operation is indicated. """ ds_name = ds_info["name"] try: @@ -294,10 +293,10 @@ def register_dataset(self, ds_info, user_id="authorized_user"): return ds_reg - - def register_featureset(self, fs_info, dataset_id, \ - user_id = "authorized_user"): - """ Register a featureset. ManagedServiceConnParamThe operation requires specifying a user id. If the user id is permitted to register a featureset, then the registration proceeds, otherwise an unauthorized operation is indicated. """ + def register_featureset(self, fs_info, dataset_id, user_id="authorized_user"): + """ Register a featureset. ManagedServiceConnParamThe operation requires specifying a user id. If the user id + is permitted to register a featureset, then the registration proceeds, otherwise an unauthorized operation is + indicated. """ fs_name = fs_info["name"] try: existing_fs = self.lookup_featureset(fs_name) @@ -317,16 +316,18 @@ def register_featureset(self, fs_info, dataset_id, \ "featureset_dataset") featureset_dataset_key = fs_reg["_key"] + "-" + dataset_id - a_featureset_dataset_edge = {"_key": featureset_dataset_key,\ - "_from": "featuresets/" + fs_reg["_key"],\ - "_to": "datasets/" + dataset_id} + a_featureset_dataset_edge = {"_key": featureset_dataset_key, \ + "_from": "featuresets/" + fs_reg["_key"], \ + "_to": "datasets/" + dataset_id} fsds_reg = featureset_dataset_edge.insert(a_featureset_dataset_edge) logger.info("Recording featureset dataset link " + str(fsds_reg)) return fs_reg def log_run(self, ri): - """ Log a run. Logging a run requires specifying a dataset, featureset and a model against which this run is recored. A run records model parameters and model performance. The run object is probably most useful for the analysis of model performance with respect to a featureset, model hyper-parameters and a dataset.""" + """ Log a run. Logging a run requires specifying a dataset, featureset and a model against which this run is + recored. A run records model parameters and model performance. 
The run object is probably most useful for the + analysis of model performance with respect to a featureset, model hyper-parameters and a dataset. """ rrid = ri["run_id"] mp = ri["model-params"] @@ -335,70 +336,95 @@ def log_run(self, ri): mperf["_key"] = mperf["run_id"] model_key = ri["model"] + run_info_adds = {"_key": rrid, "timestamp": mperf["timestamp"]} + default_params = ["dataset", "featureset", "model", "run_id", "model-params", "model-perf", "tag", "project", + "deployment_tag"] + default_run_params = ["run_id", "deployment_tag"] + + for key in ri: + if key in default_params: + if key in default_run_params: + run_info_adds[key] = ri[key] + else: + run_info_adds[key] = ri[key] + run = self.emlg.vertex_collection("run") - run_info = {"_key": rrid, "timestamp": mperf["timestamp"]} - if "deployment_tag" in ri: - run_info["deployment_tag"] = ri["deployment_tag"] - if "tag" in ri: - run_info["tag"] = ri["tag"] - logger.info("Run info " + str(run_info)) - run_reg = run.insert(run_info) + # collection type + logger.info("Run info " + str(run_info_adds)) + # logger + run_reg = run.insert(run_info_adds) + # insert dict into collection type logger.info("Recording run " + str(run_reg)) + # logger run_model_key = run_reg["_key"] + "-" + model_key - a_run_model_edge = {"_key": run_model_key,\ - "_from" :"models/" + model_key, \ - "_to": "run/" + rrid} + # key gen + a_run_model_edge = {"_key": run_model_key, "_from": "models/" + model_key, "_to": "run/" + rrid} + # dict creation run_model_edge = self.emlg.edge_collection("run_models") + # collection type rme_reg = run_model_edge.insert(a_run_model_edge) + # insert dict into collection logger.info("Recording model run link " + str(rme_reg)) + # logger model_param = self.emlg.vertex_collection("modelparams") + # get collection mp_reg = model_param.insert(mp) + # insert dict into collection logger.info("Recording model params " + str(mp_reg)) + # logger run_fs_edge = self.emlg.edge_collection("run_featuresets") + # get colletion run_fs_key = rrid + "-" + ri["featureset"] - - a_edge_run_fs = {"_key": run_fs_key,\ - "_from": "run/" + rrid,\ - "_to": "featuresets/" + ri["featureset"]} - + # gen key + a_edge_run_fs = {"_key": run_fs_key, "_from": "run/" + rrid, "_to": "featuresets/" + ri["featureset"]} + # dict gen rfse_reg = run_fs_edge.insert(a_edge_run_fs) + # insert dict into collection logger.info("Recording run featureset link " + str(rfse_reg)) + # logger run_mp_edge = self.emlg.edge_collection("run_modelparams") + # get collection run_mp_key = rrid + "-" + mp["run_id"] - - a_run_mp_edge = {"_key": run_mp_key,\ - "_from": "run/" + rrid,\ - "_to": "modelparams/" + mp_reg["_key"]} - + # key gen + a_run_mp_edge = {"_key": run_mp_key, "_from": "run/" + rrid, "_to": "modelparams/" + mp_reg["_key"]} + # dict creation rmp_reg = run_mp_edge.insert(a_run_mp_edge) + # insert into collection logger.info("Recording run model params " + str(rmp_reg)) + # logger model_perf = self.emlg.vertex_collection("devperf") + # get collection dp_reg = model_perf.insert(mperf) + # insert into collection logger.info("Recording model dev performance " + str(dp_reg)) + # logger run_devperf_edge = self.emlg.edge_collection("run_devperf") + # get collection run_devperf_key = rrid + "-" + dp_reg["_key"] - - a_run_devperfedge = {"_key": run_devperf_key,\ - "_from": "run/" + rrid,\ - "_to": "devperf/" + dp_reg["_key"]} + # key gen + a_run_devperfedge = {"_key": run_devperf_key, "_from": "run/" + rrid, "_to": "devperf/" + dp_reg["_key"]} + # dict gen rdp_reg = 
run_devperf_edge.insert(a_run_devperfedge) + # insert into collection logger.info("Recording run dev perf link " + str(rdp_reg)) + # logger run_dataset_edge = self.emlg.edge_collection("run_datasets") + # get collection run_dataset_key = rrid + "-" + ri["dataset"] - - a_run_dataset_edge = {"_key": run_dataset_key,\ - "_from": "run/" + rrid,\ - "_to": "datasets/" + ri["dataset"]} + # key gen + a_run_dataset_edge = {"_key": run_dataset_key, "_from": "run/" + rrid, "_to": "datasets/" + ri["dataset"]} + # dict gen rds_reg = run_dataset_edge.insert(a_run_dataset_edge) + # insert into collection logger.info("Recording run dev perf link " + str(rds_reg)) - + # logger return def log_serving_perf(self, sp, dep_tag, userid="authorized user"): @@ -416,9 +442,9 @@ def log_serving_perf(self, sp, dep_tag, userid="authorized user"): dep_servingperf_edge = self.emlg.edge_collection( "deployment_servingperf") dep_servingperf_key = the_dep_doc["_key"] + "-" + sp_reg["_key"] - the_dep_servingperf_edge = { "_key": dep_servingperf_key,\ - "_from": the_dep_doc["_id"],\ - "_to": sp_reg["_id"]} + the_dep_servingperf_edge = {"_key": dep_servingperf_key, \ + "_from": the_dep_doc["_id"], \ + "_to": sp_reg["_id"]} dep_servingperf_reg = dep_servingperf_edge.insert( the_dep_servingperf_edge) @@ -451,9 +477,9 @@ def insert_into_edge_type(self, document["_key"] = edge_key edge_info = ec.insert(document) else: - document = { "_from": from_vdoc['_id'],\ - "_to": to_vdoc['_id'],\ - "_key": edge_key } + document = {"_from": from_vdoc['_id'], \ + "_to": to_vdoc['_id'], \ + "_key": edge_key} edge_info = ec.insert(document) except Exception as e: logger.error(e)
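
Usage sketch (illustrative, not part of the diff): the change to ArangoPipe.log_run copies every run-info key listed in default_params onto the stored "run" vertex (previously only _key, timestamp, tag and deployment_tag were kept). The snippet below is a minimal sketch of exercising that path; the project name, asset names, hyper-parameter, metric and timestamp values are placeholders assumed for illustration and do not come from this patch.

    # Minimal sketch, assuming a connection was already provisioned and persisted
    # by ArangoPipeAdmin (reuse_connection=True picks the stored config back up).
    from arangopipe.arangopipe_storage.arangopipe_admin_api import ArangoPipeAdmin
    from arangopipe.arangopipe_storage.arangopipe_api import ArangoPipe

    admin = ArangoPipeAdmin(reuse_connection=True)
    ap = ArangoPipe(config=admin.get_config())

    # Placeholder assets registered ahead of the run; the names are made up.
    admin.register_project({"name": "Wine-Quality-Regression-Modelling"})
    ds_reg = ap.register_dataset({"name": "wine_dataset", "source": "UCI ML Repository"})
    fs_reg = ap.register_featureset({"name": "wine_featureset"}, ds_reg["_key"])
    model_reg = ap.register_model({"name": "wine_lasso_model"},
                                  project="Wine-Quality-Regression-Modelling")

    run_id = "run-0001"  # placeholder run id
    run_info = {
        "run_id": run_id,
        "dataset": ds_reg["_key"],
        "featureset": fs_reg["_key"],
        "model": model_reg["_key"],
        "model-params": {"run_id": run_id, "alpha": 0.1},
        "model-perf": {"run_id": run_id, "rmse": 0.42,
                       "timestamp": "2022-09-14 22:08:41"},
        "tag": "wine-quality-baseline",
        "project": "Wine-Quality-Regression-Modelling",
        "deployment_tag": "staging",
    }

    # With this patch, the run vertex stores _key, timestamp and every run_info
    # key found in default_params (dataset, featureset, model, run_id,
    # model-params, model-perf, tag, project, deployment_tag).
    ap.log_run(run_info)

The handling of tag and deployment_tag is unchanged; the default_params filter simply extends the same copy-on-upload mechanism to the remaining run metadata.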