Skip to content

Commit

Permalink
Correctly handle non-blocking collectives tags
Browse files Browse the repository at this point in the history
As it is possible to have multiple outstanding non-blocking collectives
provided by different collective modules, we need a consistent
mechanism to allow them to select unique tags for each instance of a
collective.

Signed-off-by: George Bosilca <[email protected]>
  • Loading branch information
bosilca authored and jsquyres committed Aug 24, 2020
1 parent 8582e10 commit c2970a3
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 192 deletions.
6 changes: 3 additions & 3 deletions ompi/communicator/comm_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/topo/base/base.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
Expand Down Expand Up @@ -382,9 +383,8 @@ static void ompi_comm_construct(ompi_communicator_t* comm)
comm->c_pml_comm = NULL;
comm->c_topo = NULL;
comm->c_coll = NULL;
comm->c_ibcast_tag = 0;
comm->c_ireduce_tag = 0;

comm->c_nbc_tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;

/* A keyhash will be created if/when an attribute is cached on
this communicator */
comm->c_keyhash = NULL;
Expand Down
11 changes: 5 additions & 6 deletions ompi/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,12 @@ struct ompi_communicator_t {
/* Collectives module interface and data */
mca_coll_base_comm_coll_t *c_coll;

/* Non-blocking collective tag. These are added here as they should be
* shared between all non-blocking collective modules (to avoid message
* collisions between them in the case where multiple outstanding
* non-blocking collective coexists using multiple backends).
/* Non-blocking collective tag. These tags might be shared between
* all non-blocking collective modules (to avoid message collision
* between them in the case where multiple outstanding non-blocking
* collectives coexist using multiple backends).
*/
opal_atomic_int32_t c_ibcast_tag;
opal_atomic_int32_t c_ireduce_tag;
opal_atomic_int32_t c_nbc_tag;
};
typedef struct ompi_communicator_t ompi_communicator_t;

Expand Down
46 changes: 16 additions & 30 deletions ompi/mca/coll/adapt/coll_adapt_algorithms.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,26 @@ int ompi_coll_adapt_ibcast_fini(void);
int ompi_coll_adapt_bcast(BCAST_ARGS);
int ompi_coll_adapt_ibcast(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS,
ompi_coll_tree_t * tree, size_t seg_size, int ibcast_tag);
int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS,
int ibcast_tag);
ompi_coll_tree_t * tree, size_t seg_size);
int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS);

/* Reduce */
int ompi_coll_adapt_ireduce_register(void);
int ompi_coll_adapt_ireduce_fini(void);
int ompi_coll_adapt_reduce(REDUCE_ARGS);
int ompi_coll_adapt_ireduce(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS,
ompi_coll_tree_t * tree, size_t seg_size, int ireduce_tag);
int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS,
int ireduce_tag);
ompi_coll_tree_t * tree, size_t seg_size);
int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS);
95 changes: 42 additions & 53 deletions ompi/mca/coll/adapt/coll_adapt_ibcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "coll_adapt.h"
#include "coll_adapt_algorithms.h"
#include "coll_adapt_context.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "opal/util/bit_ops.h"
#include "opal/sys/atomic.h"
Expand All @@ -27,8 +27,7 @@ typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff,
int root,
struct ompi_communicator_t * comm,
ompi_request_t ** request,
mca_coll_base_module_t * module,
int ibcast_tag);
mca_coll_base_module_t * module);

static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ibcast_algorithm_index[] = {
{0, (uintptr_t) ompi_coll_adapt_ibcast_tuned},
Expand Down Expand Up @@ -158,11 +157,11 @@ static int send_cb(ompi_request_t * req)
"[%d]: Send(start in send cb): segment %d to %d at buff %p send_count %d tag %d\n",
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
send_context->peer, (void *) send_context->buff, send_count,
(send_context->con->ibcast_tag << 16) + new_id));
send_context->con->ibcast_tag - new_id));
err =
MCA_PML_CALL(isend
(send_buff, send_count, send_context->con->datatype, send_context->peer,
(send_context->con->ibcast_tag << 16) + new_id,
send_context->con->ibcast_tag - new_id,
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
if (MPI_SUCCESS != err) {
OPAL_THREAD_UNLOCK(context->con->mutex);
Expand Down Expand Up @@ -245,10 +244,10 @@ static int recv_cb(ompi_request_t * req)
"[%d]: Recv(start in recv cb): segment %d from %d at buff %p recv_count %d tag %d\n",
ompi_comm_rank(context->con->comm), context->frag_id, context->peer,
(void *) recv_buff, recv_count,
(recv_context->con->ibcast_tag << 16) + recv_context->frag_id));
recv_context->con->ibcast_tag - recv_context->frag_id));
MCA_PML_CALL(irecv
(recv_buff, recv_count, recv_context->con->datatype, recv_context->peer,
(recv_context->con->ibcast_tag << 16) + recv_context->frag_id,
recv_context->con->ibcast_tag - recv_context->frag_id,
recv_context->con->comm, &recv_req));

/* Invoke receive callback */
Expand Down Expand Up @@ -282,12 +281,12 @@ static int recv_cb(ompi_request_t * req)
"[%d]: Send(start in recv cb): segment %d to %d at buff %p send_count %d tag %d\n",
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
send_context->peer, (void *) send_context->buff, send_count,
(send_context->con->ibcast_tag << 16) + send_context->frag_id));
send_context->con->ibcast_tag - send_context->frag_id));
err =
MCA_PML_CALL(isend
(send_buff, send_count, send_context->con->datatype,
send_context->peer,
(send_context->con->ibcast_tag << 16) + send_context->frag_id,
send_context->con->ibcast_tag - send_context->frag_id,
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
if (MPI_SUCCESS != err) {
OPAL_THREAD_UNLOCK(context->con->mutex);
Expand Down Expand Up @@ -344,12 +343,10 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty
*request = temp_request;
return MPI_SUCCESS;
}
int ibcast_tag = opal_atomic_add_fetch_32(&(comm->c_ibcast_tag), 1);
ibcast_tag = ibcast_tag % 4096;

OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
"ibcast tag %d root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n",
ibcast_tag, root, mca_coll_adapt_component.adapt_ibcast_algorithm,
"ibcast root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n",
root, mca_coll_adapt_component.adapt_ibcast_algorithm,
mca_coll_adapt_component.adapt_ibcast_segment_size,
mca_coll_adapt_component.adapt_ibcast_max_send_requests,
mca_coll_adapt_component.adapt_ibcast_max_recv_requests));
Expand All @@ -358,89 +355,82 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty
(ompi_coll_adapt_ibcast_fn_t)
ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm].
algorithm_fn_ptr;
return bcast_func(buff, count, datatype, root, comm, request, module, ibcast_tag);
return bcast_func(buff, count, datatype, root, comm, request, module);
}

/*
* Ibcast functions with different algorithms
*/
int ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t *module, int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t *module)
{
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
return OMPI_ERR_NOT_IMPLEMENTED;
}

int ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module,
int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}

int ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}


int ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}

int ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module,
int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}


int ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}

int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
int fanout = ompi_comm_size(comm) - 1;
ompi_coll_tree_t *tree;
Expand All @@ -453,16 +443,15 @@ int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t
}
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}


int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
size_t seg_size, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
size_t seg_size)
{
int i, j, rank, err;
/* The min of num_segs and SEND_NUM or RECV_NUM, in case the num_segs is less than SEND_NUM or RECV_NUM */
Expand Down Expand Up @@ -555,11 +544,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
con->mutex = mutex;
con->request = temp_request;
con->tree = tree;
con->ibcast_tag = ibcast_tag;
con->ibcast_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs);

OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: Ibcast, root %d, tag %d\n", rank, root,
ibcast_tag));
con->ibcast_tag));
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: con->mutex = %p, num_children = %d, num_segs = %d, real_seg_size = %d, seg_count = %d, tree_adreess = %p\n",
rank, (void *) con->mutex, tree->tree_nextsize, num_segs,
Expand Down Expand Up @@ -610,11 +599,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: Send(start in main): segment %d to %d at buff %p send_count %d tag %d\n",
rank, context->frag_id, context->peer,
(void *) send_buff, send_count, (ibcast_tag << 16) + i));
(void *) send_buff, send_count, con->ibcast_tag - i));
err =
MCA_PML_CALL(isend
(send_buff, send_count, datatype, context->peer,
(ibcast_tag << 16) + i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
con->ibcast_tag - i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
&send_req));
if (MPI_SUCCESS != err) {
return err;
Expand Down Expand Up @@ -668,11 +657,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
"[%d]: Recv(start in main): segment %d from %d at buff %p recv_count %d tag %d\n",
ompi_comm_rank(context->con->comm), context->frag_id,
context->peer, (void *) recv_buff, recv_count,
(ibcast_tag << 16) + i));
con->ibcast_tag - i));
err =
MCA_PML_CALL(irecv
(recv_buff, recv_count, datatype, context->peer,
(ibcast_tag << 16) + i, comm, &recv_req));
con->ibcast_tag - i, comm, &recv_req));
if (MPI_SUCCESS != err) {
return err;
}
Expand Down
Loading

0 comments on commit c2970a3

Please sign in to comment.