diff --git a/orte/mca/state/base/state_base_fns.c b/orte/mca/state/base/state_base_fns.c index 3aa93c5c6cc..2d88cac52f4 100644 --- a/orte/mca/state/base/state_base_fns.c +++ b/orte/mca/state/base/state_base_fns.c @@ -922,8 +922,9 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata) one_still_alive = false; j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr); while (OPAL_SUCCESS == j) { - /* skip the daemon job */ - if (job->jobid == ORTE_PROC_MY_NAME->jobid) { + /* skip the daemon job and all jobs from other families */ + if (job->jobid == ORTE_PROC_MY_NAME->jobid || + ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) { goto next; } /* if this is the job we are checking AND it normally terminated, diff --git a/orte/orted/pmix/pmix_server_dyn.c b/orte/orted/pmix/pmix_server_dyn.c index e84178ee50f..7f7232949f5 100644 --- a/orte/orted/pmix/pmix_server_dyn.c +++ b/orte/orted/pmix/pmix_server_dyn.c @@ -42,6 +42,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rmaps/base/base.h" +#include "orte/mca/rml/base/rml_contact.h" #include "orte/mca/state/state.h" #include "orte/util/name_fns.h" #include "orte/util/show_help.h" @@ -539,7 +540,14 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata) int rc, cnt; opal_pmix_pdata_t *pdat; orte_job_t *jdata; - opal_buffer_t buf; + orte_node_t *node; + orte_proc_t *proc; + opal_buffer_t buf, bucket; + opal_byte_object_t *bo; + orte_process_name_t dmn, pname; + char *uri; + opal_value_t val; + opal_list_t nodes; ORTE_ACQUIRE_OBJECT(cd); @@ -556,6 +564,7 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata) pdat = (opal_pmix_pdata_t*)opal_list_get_first(data); if (OPAL_BYTE_OBJECT != pdat->value.type) { rc = ORTE_ERR_BAD_PARAM; + ORTE_ERROR_LOG(rc); goto release; } /* the data will consist of a packed buffer with the job data in it */ @@ -565,15 +574,107 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata) pdat->value.data.bo.size = 0; cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + goto release; + } + + /* unpack the byte object containing the daemon uri's */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&buf); goto release; } + /* load it into a buffer */ + OBJ_CONSTRUCT(&bucket, opal_buffer_t); + opal_dss.load(&bucket, bo->bytes, bo->size); + bo->bytes = NULL; + free(bo); + /* prep a list to save the nodes */ + OBJ_CONSTRUCT(&nodes, opal_list_t); + /* unpack and store the URI's */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) { + rc = orte_rml_base_parse_uris(uri, &dmn, NULL); + if (ORTE_SUCCESS != rc) { + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&bucket); + goto release; + } + /* save a node object for this daemon */ + node = OBJ_NEW(orte_node_t); + node->daemon = OBJ_NEW(orte_proc_t); + memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t)); + opal_list_append(&nodes, &node->super); + /* register the URI */ + OBJ_CONSTRUCT(&val, opal_value_t); + val.key = OPAL_PMIX_PROC_URI; + val.type = OPAL_STRING; + val.data.string = uri; + if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) { + ORTE_ERROR_LOG(rc); + val.key = NULL; + val.data.string = NULL; + OBJ_DESTRUCT(&val); + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&bucket); + goto release; + } + val.key = NULL; + val.data.string = NULL; + OBJ_DESTRUCT(&val); + cnt = 1; + } + OBJ_DESTRUCT(&bucket); + + /* unpack the proc-to-daemon map */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + goto release; + } + /* load it into a buffer */ + OBJ_CONSTRUCT(&bucket, opal_buffer_t); + opal_dss.load(&bucket, bo->bytes, bo->size); + bo->bytes = NULL; + free(bo); + /* unpack and store the map */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) { + /* get the name of the daemon hosting it */ + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) { + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&bucket); + goto release; + } + /* create the proc object */ + proc = OBJ_NEW(orte_proc_t); + memcpy(&proc->name, &pname, sizeof(orte_process_name_t)); + opal_pointer_array_set_item(jdata->procs, pname.vpid, proc); + /* find the daemon */ + OPAL_LIST_FOREACH(node, &nodes, orte_node_t) { + if (node->daemon->name.vpid == dmn.vpid) { + OBJ_RETAIN(node); + proc->node = node; + break; + } + } + } + OBJ_DESTRUCT(&bucket); + OPAL_LIST_DESTRUCT(&nodes); OBJ_DESTRUCT(&buf); + + /* register the nspace */ if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) { + ORTE_ERROR_LOG(rc); OBJ_RELEASE(jdata); goto release; } - OBJ_RELEASE(jdata); // no reason to keep this around + + /* save the job object so we don't endlessly cycle */ + opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata); /* restart the cnct processor */ ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata); @@ -619,6 +720,7 @@ static void _cnct(int sd, short args, void *cbdata) * out about it, and all we can do is return an error */ if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid && orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED); rc = ORTE_ERR_NOT_SUPPORTED; goto release; } @@ -634,6 +736,7 @@ static void _cnct(int sd, short args, void *cbdata) kv->data.uint32 = geteuid(); opal_list_append(cd->info, &kv->super); if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) { + ORTE_ERROR_LOG(rc); opal_argv_free(keys); goto release; } @@ -647,6 +750,7 @@ static void _cnct(int sd, short args, void *cbdata) if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) { /* it hasn't been registered yet, so register it now */ if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) { + ORTE_ERROR_LOG(rc); goto release; } } diff --git a/orte/orted/pmix/pmix_server_fence.c b/orte/orted/pmix/pmix_server_fence.c index 20c1849bbb0..402ad053ea9 100644 --- a/orte/orted/pmix/pmix_server_fence.c +++ b/orte/orted/pmix/pmix_server_fence.c @@ -227,6 +227,7 @@ static void dmodex_req(int sd, short args, void *cbdata) rc = ORTE_ERR_NOT_FOUND; goto callback; } + /* point the request to the daemon that is hosting the * target process */ req->proxy.vpid = dmn->name.vpid; @@ -240,7 +241,8 @@ static void dmodex_req(int sd, short args, void *cbdata) /* if we are the host daemon, then this is a local request, so * just wait for the data to come in */ - if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) { + if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid && + ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) { return; } diff --git a/orte/orted/pmix/pmix_server_register_fns.c b/orte/orted/pmix/pmix_server_register_fns.c index a5670539dbf..df240f53a09 100644 --- a/orte/orted/pmix/pmix_server_register_fns.c +++ b/orte/orted/pmix/pmix_server_register_fns.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2018 Cisco Systems, Inc. All rights reserved * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2014 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science @@ -71,6 +71,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force) gid_t gid; opal_list_t *cache; hwloc_obj_t machine; + opal_buffer_t buf, bucket; + opal_byte_object_t bo, *boptr; + orte_proc_t *proc; opal_output_verbose(2, orte_pmix_server_globals.output, "%s register nspace for %s", @@ -494,21 +497,52 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force) jdata->num_local_procs, info, NULL, NULL); OPAL_LIST_RELEASE(info); + if (OPAL_SUCCESS != rc) { + return rc; + } - /* if the user has connected us to an external server, then we must - * assume there is going to be some cross-mpirun exchange, and so + /* if I am the HNP and this job is a member of my family, then we must + * assume there could be some cross-mpirun exchange, and so * we protect against that situation by publishing the job info * for this job - this allows any subsequent "connect" to retrieve * the job info */ - if (NULL != orte_data_server_uri) { - opal_buffer_t buf; + if (ORTE_PROC_IS_HNP && ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(jdata->jobid)) { + /* pack the job - note that this doesn't include the procs + * or their locations */ OBJ_CONSTRUCT(&buf, opal_buffer_t); if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) { ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&buf); return rc; } + + /* pack the hostname, daemon vpid and contact URI for each involved node */ + map = jdata->map; + OBJ_CONSTRUCT(&bucket, opal_buffer_t); + for (i=0; i < map->nodes->size; i++) { + if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) { + continue; + } + opal_dss.pack(&bucket, &node->daemon->rml_uri, 1, OPAL_STRING); + } + opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size); + boptr = &bo; + opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT); + + /* pack the proc name and daemon vpid for each proc */ + OBJ_CONSTRUCT(&bucket, opal_buffer_t); + for (i=0; i < jdata->procs->size; i++) { + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) { + continue; + } + opal_dss.pack(&bucket, &proc->name, 1, ORTE_NAME); + opal_dss.pack(&bucket, &proc->node->daemon->name, 1, ORTE_NAME); + } + opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size); + boptr = &bo; + opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT); + info = OBJ_NEW(opal_list_t); /* create a key-value with the key being the string jobid * and the value being the byte object */