Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix spawn functionality with pmi2 #5849

Merged
merged 6 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/mpid/ch3/channels/sock/src/ch3_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/

#include "mpidi_ch3_impl.h"
#include "pmi.h"
#include "mpidu_sock.h"
#include "utlist.h"

Expand Down
1 change: 1 addition & 0 deletions src/mpid/ch3/include/mpidimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ int MPIDI_CH3I_Port_destroy(int port_name_tag);
--------------------------*/

#define MPIDI_MAX_KVS_VALUE_LEN 4096
#define MPIDI_MAX_JOBID_LEN 1024

/* ------------------------------------------------------------------------- */
/* mpirma.h (in src/mpi/rma?) */
Expand Down
174 changes: 7 additions & 167 deletions src/mpid/ch3/src/ch3u_comm_spawn_multiple.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,66 +25,6 @@
children */
#define PARENT_PORT_KVSKEY "PARENT_ROOT_PORT_NAME"

/* FIXME: We can avoid these two routines if we define PMI as using
   MPI info values */
/*
 * Turn a SINGLE MPI_Info into an array of PMI_keyvals.
 *
 * Parameters:
 *   info_ptr  - info object to convert; NULL or a handle equal to
 *               MPI_INFO_NULL yields an empty result
 *   kv_ptr    - [out] receives the allocated array of key/value pairs,
 *               or NULL when nothing was allocated
 *   nkeys_ptr - [out] receives the number of entries in *kv_ptr
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * Both output arguments are written on every path, including failures.
 * On failure the (possibly partially filled) array is still handed back so
 * that the caller can release it; every slot that was not filled holds NULL
 * key/val pointers, and a failed array allocation reports zero entries.
 */
static int mpi_to_pmi_keyvals( MPIR_Info *info_ptr, PMI_keyval_t **kv_ptr,
                               int *nkeys_ptr )
{
    char key[MPI_MAX_INFO_KEY];
    PMI_keyval_t *kv = 0;
    int i, nkeys = 0, vallen, flag, mpi_errno=MPI_SUCCESS;

    if (!info_ptr || info_ptr->handle == MPI_INFO_NULL) {
        goto fn_exit;
    }

    MPIR_Info_get_nkeys_impl( info_ptr, &nkeys );
    if (nkeys == 0) {
        goto fn_exit;
    }
    kv = (PMI_keyval_t *)MPL_malloc( nkeys * sizeof(PMI_keyval_t), MPL_MEM_DYNAMIC );
    if (!kv) {
        /* Never publish a non-zero count together with a NULL array, and
           report the failure with the same error used for the per-entry
           allocations below. */
        nkeys = 0;
        MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" );
    }

    /* Clear every slot first: if a later step fails, the caller's cleanup
       must only ever see NULL or valid pointers, never uninitialized ones. */
    for (i=0; i<nkeys; i++) {
        kv[i].key = NULL;
        kv[i].val = NULL;
    }

    for (i=0; i<nkeys; i++) {
        mpi_errno = MPIR_Info_get_nthkey_impl( info_ptr, i, key );
        if (mpi_errno) { MPIR_ERR_POP(mpi_errno); }
        MPIR_Info_get_valuelen_impl( info_ptr, key, &vallen, &flag );
        MPIR_ERR_CHKANDJUMP1(!flag, mpi_errno, MPI_ERR_OTHER,"**infonokey", "**infonokey %s", key);

        kv[i].key = MPL_strdup(key);
        /* one extra byte for the terminating NUL of the value string */
        kv[i].val = MPL_malloc( vallen + 1, MPL_MEM_DYNAMIC );
        if (!kv[i].key || !kv[i].val) {
            MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" );
        }
        MPIR_Info_get_impl( info_ptr, key, vallen+1, kv[i].val, &flag );
        MPIR_ERR_CHKANDJUMP1(!flag, mpi_errno, MPI_ERR_OTHER,"**infonokey", "**infonokey %s", key);
        MPL_DBG_MSG_FMT(MPIDI_CH3_DBG_OTHER,TERSE,(MPL_DBG_FDEST,"key: <%s>, value: <%s>\n", kv[i].key, kv[i].val));
    }

 fn_fail:
 fn_exit:
    *kv_ptr = kv;
    *nkeys_ptr = nkeys;
    return mpi_errno;
}
/*
 * Free the entire array of PMI keyvals.
 *
 * Parameters:
 *   kv     - array of 'size' pointers, each referring to an array of
 *            key/value pairs (an entry may be NULL)
 *   size   - number of entries in 'kv' and in 'counts'
 *   counts - counts[i] is the number of key/value pairs stored in kv[i]
 *
 * Releases every key string, every value buffer and each per-entry array.
 * The outer 'kv' and 'counts' arrays themselves are NOT freed here; that is
 * left to the caller.
 */
static void free_pmi_keyvals(PMI_keyval_t **kv, int size, int *counts)
{
    int i,j;

    /* Without both arrays there is nothing that can be walked safely. */
    if (!kv || !counts) {
        return;
    }

    for (i=0; i<size; i++)
    {
        /* An entry may have no array (no info keys, or its allocation
           failed) even when the recorded count is non-zero; skip it
           instead of dereferencing NULL. */
        if (!kv[i]) {
            continue;
        }
        for (j=0; j<counts[i]; j++)
        {
            /* key is cast to a non-const pointer only so it can be freed */
            MPL_free((char *)kv[i][j].key);
            MPL_free(kv[i][j].val);
        }
        MPL_free(kv[i]);
    }
}

/*
* MPIDI_CH3_Comm_spawn_multiple()
*/
Expand All @@ -95,14 +35,12 @@ int MPIDI_Comm_spawn_multiple(int count, char **commands,
**intercomm, int *errcodes)
{
char port_name[MPI_MAX_PORT_NAME];
int *info_keyval_sizes=0, i, mpi_errno=MPI_SUCCESS;
PMI_keyval_t **info_keyval_vectors=0, preput_keyval_vector;
int *pmi_errcodes = 0, pmi_errno;
int i, mpi_errno=MPI_SUCCESS;
int *pmi_errcodes = 0;
int total_num_processes, should_accept = 1;

MPIR_FUNC_ENTER;


if (comm_ptr->rank == root) {
/* create an array for the pmi error codes */
total_num_processes = 0;
Expand All @@ -126,105 +64,12 @@ int MPIDI_Comm_spawn_multiple(int count, char **commands,
/* --END ERROR HANDLING-- */

/* Spawn the processes */
#ifdef USE_PMI2_API
MPIR_Assert(count > 0);
{
int *argcs = MPL_malloc(count*sizeof(int), MPL_MEM_DYNAMIC);
struct MPIR_Info preput;
struct MPIR_Info *preput_p[1] = { &preput };

MPIR_Assert(argcs);
/*
info_keyval_sizes = MPL_malloc(count * sizeof(int), MPL_MEM_DYNAMIC);
*/

/* FIXME cheating on constness */
preput.key = (char *)PARENT_PORT_KVSKEY;
preput.value = port_name;
preput.next = NULL;

/* compute argcs array */
for (i = 0; i < count; ++i) {
argcs[i] = 0;
if (argvs != NULL && argvs[i] != NULL) {
while (argvs[i][argcs[i]]) {
++argcs[i];
}
}

/* a fib for now */
/*
info_keyval_sizes[i] = 0;
*/
}
/* XXX DJG don't need this, PMI API is thread-safe? */
/*MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_PMI_MUTEX);*/
/* release the global CS for spawn PMI calls */
MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
pmi_errno = PMI2_Job_Spawn(count, (const char **)commands,
argcs, (const char ***)argvs,
maxprocs,
info_keyval_sizes, (const MPIR_Info **)info_ptrs,
1, (const struct MPIR_Info **)preput_p,
NULL, 0,
/*jobId, jobIdSize,*/ /* XXX DJG job stuff? */
pmi_errcodes);
MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
/*MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_PMI_MUTEX);*/
MPL_free(argcs);
if (pmi_errno != PMI2_SUCCESS) {
MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER,
"**pmi_spawn_multiple", "**pmi_spawn_multiple %d", pmi_errno);
}
}
#else
/* FIXME: This is *really* awkward. We should either
Fix on MPI-style info data structures for PMI (avoid unnecessary
duplication) or add an MPIU_Info_getall(...) that creates
the necessary arrays of key/value pairs */

/* convert the infos into PMI keyvals */
info_keyval_sizes = (int *) MPL_malloc(count * sizeof(int), MPL_MEM_DYNAMIC);
info_keyval_vectors =
(PMI_keyval_t**) MPL_malloc(count * sizeof(PMI_keyval_t*), MPL_MEM_DYNAMIC);
if (!info_keyval_sizes || !info_keyval_vectors) {
MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");
}
MPIR_PMI_KEYVAL_t preput;
preput.key = PARENT_PORT_KVSKEY;
preput.val = port_name;

if (!info_ptrs) {
for (i=0; i<count; i++) {
info_keyval_vectors[i] = 0;
info_keyval_sizes[i] = 0;
}
}
else {
for (i=0; i<count; i++) {
mpi_errno = mpi_to_pmi_keyvals( info_ptrs[i],
&info_keyval_vectors[i],
&info_keyval_sizes[i] );
if (mpi_errno) { MPIR_ERR_POP(mpi_errno); }
}
}

preput_keyval_vector.key = PARENT_PORT_KVSKEY;
preput_keyval_vector.val = port_name;


MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_PMI_MUTEX);
pmi_errno = PMI_Spawn_multiple(count, (const char **)
commands,
(const char ***) argvs,
maxprocs, info_keyval_sizes,
(const PMI_keyval_t **)
info_keyval_vectors, 1,
&preput_keyval_vector,
pmi_errcodes);
MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_PMI_MUTEX);
if (pmi_errno != PMI_SUCCESS) {
MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER,
"**pmi_spawn_multiple", "**pmi_spawn_multiple %d", pmi_errno);
}
#endif
mpi_errno = MPIR_pmi_spawn_multiple(count, commands, argvs, maxprocs, info_ptrs, 1, &preput, pmi_errcodes);
MPIR_ERR_CHECK(mpi_errno);

if (errcodes != MPI_ERRCODES_IGNORE) {
for (i=0; i<total_num_processes; i++) {
Expand Down Expand Up @@ -274,11 +119,6 @@ int MPIDI_Comm_spawn_multiple(int count, char **commands,
}

fn_exit:
if (info_keyval_vectors) {
free_pmi_keyvals(info_keyval_vectors, count, info_keyval_sizes);
MPL_free(info_keyval_sizes);
MPL_free(info_keyval_vectors);
}
MPL_free(pmi_errcodes);
MPIR_FUNC_EXIT;
return mpi_errno;
Expand Down
2 changes: 0 additions & 2 deletions src/mpid/ch3/src/mpid_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include "mpidimpl.h"

#define MAX_JOBID_LEN 1024

#if defined(HAVE_LIMITS_H)
#include <limits.h>
#endif
Expand Down
6 changes: 2 additions & 4 deletions src/mpid/ch3/src/mpidi_pg.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "pmi.h"
#endif

#define MAX_JOBID_LEN 1024

/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===

Expand Down Expand Up @@ -739,12 +737,12 @@ int MPIDI_PG_InitConnKVS( MPIDI_PG_t *pg )
#ifdef USE_PMI2_API
int mpi_errno = MPI_SUCCESS;

pg->connData = (char *)MPL_malloc(MAX_JOBID_LEN, MPL_MEM_STRINGS);
pg->connData = (char *)MPL_malloc(MPIDI_MAX_JOBID_LEN, MPL_MEM_STRINGS);
if (pg->connData == NULL) {
MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomem");
}

mpi_errno = PMI2_Job_GetId(pg->connData, MAX_JOBID_LEN);
mpi_errno = PMI2_Job_GetId(pg->connData, MPIDI_MAX_JOBID_LEN);
MPIR_ERR_CHECK(mpi_errno);
#else
int pmi_errno, kvs_name_sz;
Expand Down
2 changes: 1 addition & 1 deletion src/mpid/ch3/util/sock/ch3u_connect_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ int MPIDI_CH3I_Connection_alloc(MPIDI_CH3I_Connection_t ** connp)
we might prefer for connections to simply point at the single process
group to which the remote process belong */
#ifdef USE_PMI2_API
id_sz = MPID_MAX_JOBID_LEN;
id_sz = MPIDI_MAX_JOBID_LEN;
#else
pmi_errno = PMI_KVS_Get_name_length_max(&id_sz);
MPIR_ERR_CHKANDJUMP1(pmi_errno, mpi_errno,MPI_ERR_OTHER,
Expand Down
13 changes: 12 additions & 1 deletion src/pm/hydra/pm/pmiserv/pmip_pmi_v2.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,18 @@ static HYD_status fn_fullinit(int fd, char *args[])
break;
}
}
int idx = i;
HYDU_ASSERT(i < HYD_pmcd_pmip.local.proxy_process_count, status);

/* find executable information */
i = 0;
struct HYD_exec *exec;
for (exec = HYD_pmcd_pmip.exec_list; exec; exec = exec->next) {
i += exec->proc_count;
if (idx < i)
break;
}

HYD_STRING_STASH_INIT(stash);
HYD_STRING_STASH(stash,
MPL_strdup("cmd=fullinit-response;pmi-version=2;pmi-subversion=0;rank="),
Expand All @@ -183,7 +193,8 @@ static HYD_status fn_fullinit(int fd, char *args[])
HYD_STRING_STASH(stash, MPL_strdup(";size="), status);
HYD_STRING_STASH(stash, HYDU_int_to_str(HYD_pmcd_pmip.system_global.global_process_count),
status);
HYD_STRING_STASH(stash, MPL_strdup(";appnum=0"), status);
HYD_STRING_STASH(stash, MPL_strdup(";appnum="), status);
HYD_STRING_STASH(stash, HYDU_int_to_str(exec->appnum), status);
if (HYD_pmcd_pmip.local.spawner_kvsname) {
HYD_STRING_STASH(stash, MPL_strdup(";spawner-jobid="), status);
HYD_STRING_STASH(stash, MPL_strdup(HYD_pmcd_pmip.local.spawner_kvsname), status);
Expand Down
2 changes: 1 addition & 1 deletion src/pmi/pmi2/simple/simple2pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ int PMI2_Nameserv_lookup(const char service_name[], const PMI2U_Info * info_ptr,
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_nameservlookup",
"**pmi2_nameservlookup %s", errmsg ? errmsg : "unknown");

found = getval(cmd.pairs, cmd.nPairs, VALUE_KEY, &found_port, &plen);
found = getval(cmd.pairs, cmd.nPairs, PORT_KEY, &found_port, &plen);
PMI2U_ERR_CHKANDJUMP1(!found, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_nameservlookup",
"**pmi2_nameservlookup %s", "not found");
MPL_strncpy(port, found_port, portLen);
Expand Down
51 changes: 49 additions & 2 deletions src/util/mpir_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,55 @@ int MPIR_pmi_spawn_multiple(int count, char *commands[], char **argvs[],
MPIR_ERR_CHKANDJUMP1(pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER,
"**pmi_spawn_multiple", "**pmi_spawn_multiple %d", pmi_errno);
#elif defined(USE_PMI2_API)
/* not supported yet */
MPIR_Assert(0);
struct MPIR_Info preput;
struct MPIR_Info *preput_p[1] = { &preput };
int *argcs = MPL_malloc(count * sizeof(int), MPL_MEM_DYNAMIC);
int *info_keyval_sizes = MPL_malloc(count * sizeof(int), MPL_MEM_DYNAMIC);
MPIR_Assert(argcs);
MPIR_Assert(info_keyval_sizes);

/* compute argcs array */
for (int i = 0; i < count; ++i) {
argcs[i] = 0;
if (argvs != NULL && argvs[i] != NULL) {
while (argvs[i][argcs[i]]) {
++argcs[i];
}
}
}

/* FIXME cheating on constness */
preput.key = (char *) preput_keyvals->key;
preput.value = preput_keyvals->val;
preput.next = NULL;

/* determine info sizes */
if (!info_ptrs) {
for (int i = 0; i < count; i++) {
info_keyval_sizes[i] = 0;
}
} else {
for (int i = 0; i < count; i++) {
if (info_ptrs[i] != NULL) {
MPIR_Info_get_nkeys_impl(info_ptrs[i], &info_keyval_sizes[i]);
info_ptrs[i] = info_ptrs[i]->next; /* skip empty MPIR_Info struct */
} else {
info_keyval_sizes[i] = 0;
}
}
}

pmi_errno = PMI2_Job_Spawn(count, (const char **) commands,
argcs, (const char ***) argvs,
maxprocs,
info_keyval_sizes, (const MPIR_Info **) info_ptrs,
1, (const struct MPIR_Info **) preput_p, NULL, 0, pmi_errcodes);
MPL_free(argcs);
MPL_free(info_keyval_sizes);
if (pmi_errno != PMI2_SUCCESS) {
MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER,
"**pmi_spawn_multiple", "**pmi_spawn_multiple %d", pmi_errno);
}
#elif defined(USE_PMIX_API)
/* not supported yet */
MPIR_Assert(0);
Expand Down