diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index 64dc9faf39c..639372548d0 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -40,6 +40,7 @@ #include "ompi/constants.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/coll/base/base.h" +#include "ompi/mca/coll/base/coll_tags.h" #include "ompi/mca/topo/base/base.h" #include "ompi/runtime/params.h" #include "ompi/communicator/communicator.h" @@ -382,9 +383,8 @@ static void ompi_comm_construct(ompi_communicator_t* comm) comm->c_pml_comm = NULL; comm->c_topo = NULL; comm->c_coll = NULL; - comm->c_ibcast_tag = 0; - comm->c_ireduce_tag = 0; - + comm->c_nbc_tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; + /* A keyhash will be created if/when an attribute is cached on this communicator */ comm->c_keyhash = NULL; diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index c642ab4bfb8..8936b7f1df9 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -188,13 +188,12 @@ struct ompi_communicator_t { /* Collectives module interface and data */ mca_coll_base_comm_coll_t *c_coll; - /* Non-blocking collective tag. These are added here as they should be - * shared between all non-blocking collective modules (to avoid message - * collisions between them in the case where multiple outstanding - * non-blocking collective coexists using multiple backends). + /* Non-blocking collective tag. These tags might be shared between + * all non-blocking collective modules (to avoid message collision + * between them in the case where multiple outstanding non-blocking + * collectives coexist using multiple backends).
*/ - opal_atomic_int32_t c_ibcast_tag; - opal_atomic_int32_t c_ireduce_tag; + opal_atomic_int32_t c_nbc_tag; }; typedef struct ompi_communicator_t ompi_communicator_t; diff --git a/ompi/mca/coll/adapt/coll_adapt_algorithms.h b/ompi/mca/coll/adapt/coll_adapt_algorithms.h index a25d8afb622..700adabea15 100644 --- a/ompi/mca/coll/adapt/coll_adapt_algorithms.h +++ b/ompi/mca/coll/adapt/coll_adapt_algorithms.h @@ -25,21 +25,14 @@ int ompi_coll_adapt_ibcast_fini(void); int ompi_coll_adapt_bcast(BCAST_ARGS); int ompi_coll_adapt_ibcast(IBCAST_ARGS); int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS, - ompi_coll_tree_t * tree, size_t seg_size, int ibcast_tag); -int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS, - int ibcast_tag); -int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS, - int ibcast_tag); -int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS, - int ibcast_tag); -int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS, - int ibcast_tag); -int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS, - int ibcast_tag); -int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS, - int ibcast_tag); -int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS, - int ibcast_tag); + ompi_coll_tree_t * tree, size_t seg_size); +int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS); +int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS); +int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS); +int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS); +int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS); +int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS); +int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS); /* Reduce */ int ompi_coll_adapt_ireduce_register(void); @@ -47,18 +40,11 @@ int ompi_coll_adapt_ireduce_fini(void); int ompi_coll_adapt_reduce(REDUCE_ARGS); int ompi_coll_adapt_ireduce(IREDUCE_ARGS); int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS, - ompi_coll_tree_t * tree, size_t seg_size, int ireduce_tag); -int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS, - int ireduce_tag); -int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS, - int ireduce_tag); -int 
ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS, - int ireduce_tag); -int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS, - int ireduce_tag); -int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS, - int ireduce_tag); -int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS, - int ireduce_tag); -int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS, - int ireduce_tag); + ompi_coll_tree_t * tree, size_t seg_size); +int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS); +int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS); +int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS); +int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS); +int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS); +int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS); +int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS); diff --git a/ompi/mca/coll/adapt/coll_adapt_ibcast.c b/ompi/mca/coll/adapt/coll_adapt_ibcast.c index 1b4e8de364f..3a8555e7fd2 100644 --- a/ompi/mca/coll/adapt/coll_adapt_ibcast.c +++ b/ompi/mca/coll/adapt/coll_adapt_ibcast.c @@ -14,7 +14,7 @@ #include "coll_adapt.h" #include "coll_adapt_algorithms.h" #include "coll_adapt_context.h" -#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "opal/util/bit_ops.h" #include "opal/sys/atomic.h" @@ -27,8 +27,7 @@ typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff, int root, struct ompi_communicator_t * comm, ompi_request_t ** request, - mca_coll_base_module_t * module, - int ibcast_tag); + mca_coll_base_module_t * module); static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ibcast_algorithm_index[] = { {0, (uintptr_t) ompi_coll_adapt_ibcast_tuned}, @@ -158,11 +157,11 @@ static int send_cb(ompi_request_t * req) "[%d]: Send(start in send cb): segment %d to %d at buff %p send_count %d tag %d\n", ompi_comm_rank(send_context->con->comm), send_context->frag_id, send_context->peer, (void *) send_context->buff, send_count, - (send_context->con->ibcast_tag << 16) + 
new_id)); + send_context->con->ibcast_tag - new_id)); err = MCA_PML_CALL(isend (send_buff, send_count, send_context->con->datatype, send_context->peer, - (send_context->con->ibcast_tag << 16) + new_id, + send_context->con->ibcast_tag - new_id, MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { OPAL_THREAD_UNLOCK(context->con->mutex); @@ -245,10 +244,10 @@ static int recv_cb(ompi_request_t * req) "[%d]: Recv(start in recv cb): segment %d from %d at buff %p recv_count %d tag %d\n", ompi_comm_rank(context->con->comm), context->frag_id, context->peer, (void *) recv_buff, recv_count, - (recv_context->con->ibcast_tag << 16) + recv_context->frag_id)); + recv_context->con->ibcast_tag - recv_context->frag_id)); MCA_PML_CALL(irecv (recv_buff, recv_count, recv_context->con->datatype, recv_context->peer, - (recv_context->con->ibcast_tag << 16) + recv_context->frag_id, + recv_context->con->ibcast_tag - recv_context->frag_id, recv_context->con->comm, &recv_req)); /* Invoke recvive call back */ @@ -282,12 +281,12 @@ static int recv_cb(ompi_request_t * req) "[%d]: Send(start in recv cb): segment %d to %d at buff %p send_count %d tag %d\n", ompi_comm_rank(send_context->con->comm), send_context->frag_id, send_context->peer, (void *) send_context->buff, send_count, - (send_context->con->ibcast_tag << 16) + send_context->frag_id)); + send_context->con->ibcast_tag - send_context->frag_id)); err = MCA_PML_CALL(isend (send_buff, send_count, send_context->con->datatype, send_context->peer, - (send_context->con->ibcast_tag << 16) + send_context->frag_id, + send_context->con->ibcast_tag - send_context->frag_id, MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { OPAL_THREAD_UNLOCK(context->con->mutex); @@ -344,12 +343,10 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty *request = temp_request; return MPI_SUCCESS; } - int ibcast_tag = 
opal_atomic_add_fetch_32(&(comm->c_ibcast_tag), 1); - ibcast_tag = ibcast_tag % 4096; OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, - "ibcast tag %d root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n", - ibcast_tag, root, mca_coll_adapt_component.adapt_ibcast_algorithm, + "ibcast root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n", + root, mca_coll_adapt_component.adapt_ibcast_algorithm, mca_coll_adapt_component.adapt_ibcast_segment_size, mca_coll_adapt_component.adapt_ibcast_max_send_requests, mca_coll_adapt_component.adapt_ibcast_max_recv_requests)); @@ -358,89 +355,82 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty (ompi_coll_adapt_ibcast_fn_t) ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm]. algorithm_fn_ptr; - return bcast_func(buff, count, datatype, root, comm, request, module, ibcast_tag); + return bcast_func(buff, count, datatype, root, comm, request, module); } /* * Ibcast functions with different algorithms */ int ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t *module, int ibcast_tag) + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t *module) { OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); return OMPI_ERR_NOT_IMPLEMENTED; } int ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, mca_coll_base_module_t * module, - int ibcast_tag) + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, mca_coll_base_module_t * module) { 
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root); int err = ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size, - ibcast_tag); + mca_coll_adapt_component.adapt_ibcast_segment_size); return err; } int ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t * module, int ibcast_tag) + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root); int err = ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size, - ibcast_tag); + mca_coll_adapt_component.adapt_ibcast_segment_size); return err; } int ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ibcast_tag) + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root); int err = ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size, - ibcast_tag); + mca_coll_adapt_component.adapt_ibcast_segment_size); return err; } int ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, mca_coll_base_module_t * module, - int ibcast_tag) + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root); int err 
= ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size, - ibcast_tag); + mca_coll_adapt_component.adapt_ibcast_segment_size); return err; } int ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ibcast_tag) + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root); int err = ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size, - ibcast_tag); + mca_coll_adapt_component.adapt_ibcast_segment_size); return err; } int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ibcast_tag) + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { int fanout = ompi_comm_size(comm) - 1; ompi_coll_tree_t *tree; @@ -453,16 +443,15 @@ int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t } int err = ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size, - ibcast_tag); + mca_coll_adapt_component.adapt_ibcast_segment_size); return err; } int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, ompi_coll_tree_t * tree, - size_t seg_size, int ibcast_tag) + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module, ompi_coll_tree_t * tree, + size_t seg_size) { int i, j, rank, err; /* 
The min of num_segs and SEND_NUM or RECV_NUM, in case the num_segs is less than SEND_NUM or RECV_NUM */ @@ -555,11 +544,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t con->mutex = mutex; con->request = temp_request; con->tree = tree; - con->ibcast_tag = ibcast_tag; + con->ibcast_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs); OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Ibcast, root %d, tag %d\n", rank, root, - ibcast_tag)); + con->ibcast_tag)); OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: con->mutex = %p, num_children = %d, num_segs = %d, real_seg_size = %d, seg_count = %d, tree_adreess = %p\n", rank, (void *) con->mutex, tree->tree_nextsize, num_segs, @@ -610,11 +599,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Send(start in main): segment %d to %d at buff %p send_count %d tag %d\n", rank, context->frag_id, context->peer, - (void *) send_buff, send_count, (ibcast_tag << 16) + i)); + (void *) send_buff, send_count, con->ibcast_tag - i)); err = MCA_PML_CALL(isend (send_buff, send_count, datatype, context->peer, - (ibcast_tag << 16) + i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm, + con->ibcast_tag - i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &send_req)); if (MPI_SUCCESS != err) { return err; @@ -668,11 +657,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t "[%d]: Recv(start in main): segment %d from %d at buff %p recv_count %d tag %d\n", ompi_comm_rank(context->con->comm), context->frag_id, context->peer, (void *) recv_buff, recv_count, - (ibcast_tag << 16) + i)); + con->ibcast_tag - i)); err = MCA_PML_CALL(irecv (recv_buff, recv_count, datatype, context->peer, - (ibcast_tag << 16) + i, comm, &recv_req)); + con->ibcast_tag - i, comm, &recv_req)); if (MPI_SUCCESS != err) { return err; } diff --git 
a/ompi/mca/coll/adapt/coll_adapt_ireduce.c b/ompi/mca/coll/adapt/coll_adapt_ireduce.c index 9fc7cb63ea0..63de926ef53 100644 --- a/ompi/mca/coll/adapt/coll_adapt_ireduce.c +++ b/ompi/mca/coll/adapt/coll_adapt_ireduce.c @@ -17,7 +17,7 @@ #include "coll_adapt_item.h" #include "ompi/constants.h" #include "ompi/mca/coll/coll.h" -#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/coll/base/coll_base_topo.h" @@ -32,7 +32,7 @@ typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf, int root, struct ompi_communicator_t * comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ireduce_tag); + mca_coll_base_module_t * module); static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ireduce_algorithm_index[] = { {0, (uintptr_t)ompi_coll_adapt_ireduce_tuned}, @@ -263,14 +263,14 @@ static int send_cb(ompi_request_t * req) OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In send_cb, create isend to seg %d, peer %d, tag %d\n", send_context->con->rank, send_context->frag_id, send_context->peer, - (send_context->con->ireduce_tag << 16) + send_context->frag_id)); + send_context->con->ireduce_tag - send_context->frag_id)); ompi_request_t *send_req; err = MCA_PML_CALL(isend (send_context->buff, send_count, send_context->con->datatype, send_context->peer, - (context->con->ireduce_tag << 16) + send_context->frag_id, + context->con->ireduce_tag - send_context->frag_id, MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { return err; @@ -355,13 +355,13 @@ static int recv_cb(ompi_request_t * req) "[%d]: In recv_cb, create irecv for seg %d, peer %d, inbuf %p, tag %d\n", context->con->rank, recv_context->frag_id, recv_context->peer, (void *) inbuf, - (recv_context->con->ireduce_tag << 16) + recv_context->frag_id)); + recv_context->con->ireduce_tag - 
recv_context->frag_id)); ompi_request_t *recv_req; err = MCA_PML_CALL(irecv (temp_recv_buf, recv_count, recv_context->con->datatype, recv_context->peer, - (recv_context->con->ireduce_tag << 16) + recv_context->frag_id, + recv_context->con->ireduce_tag - recv_context->frag_id, recv_context->con->comm, &recv_req)); if (MPI_SUCCESS != err) { return err; @@ -460,14 +460,14 @@ static int recv_cb(ompi_request_t * req) OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In recv_cb, create isend to seg %d, peer %d, tag %d\n", send_context->con->rank, send_context->frag_id, send_context->peer, - (send_context->con->ireduce_tag << 16) + send_context->frag_id)); + send_context->con->ireduce_tag - send_context->frag_id)); ompi_request_t *send_req; err = MCA_PML_CALL(isend (send_context->buff, send_count, send_context->con->datatype, send_context->peer, - (send_context->con->ireduce_tag << 16) + send_context->frag_id, + send_context->con->ireduce_tag - send_context->frag_id, MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { return err; @@ -524,12 +524,10 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi if (count == 0) { return MPI_SUCCESS; } - int ireduce_tag = opal_atomic_add_fetch_32(&(comm->c_ireduce_tag), 1); - ireduce_tag = (ireduce_tag % 4096) + 4096; OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, - "ireduce tag %d root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n", - ireduce_tag, root, mca_coll_adapt_component.adapt_ireduce_algorithm, + "ireduce root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n", + root, mca_coll_adapt_component.adapt_ireduce_algorithm, mca_coll_adapt_component.adapt_ireduce_segment_size, mca_coll_adapt_component.adapt_ireduce_max_send_requests, 
mca_coll_adapt_component.adapt_ireduce_max_recv_requests)); @@ -538,93 +536,78 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi (ompi_coll_adapt_ireduce_fn_t) ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component. adapt_ireduce_algorithm].algorithm_fn_ptr; - return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module, ireduce_tag); + return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module); } /* * Ireduce functions with different algorithms */ int ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t *module, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t *module) { OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); return OMPI_ERR_NOT_IMPLEMENTED; } int ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root); - int err = - ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, - tree, mca_coll_adapt_component.adapt_ireduce_segment_size, - ireduce_tag); - return err; + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, + request, module, ompi_coll_base_topo_build_bmtree(comm, root), + mca_coll_adapt_component.adapt_ireduce_segment_size); } int 
ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t * module, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t * module) { - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root); - int err = - ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, - tree, mca_coll_adapt_component.adapt_ireduce_segment_size, - ireduce_tag); - return err; + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, + request, module, ompi_coll_base_topo_build_in_order_bmtree(comm, root), + mca_coll_adapt_component.adapt_ireduce_segment_size); } int ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root); - int err = - ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, - tree, mca_coll_adapt_component.adapt_ireduce_segment_size, - ireduce_tag); - return err; + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, + request, module, ompi_coll_base_topo_build_tree(2, comm, root), + mca_coll_adapt_component.adapt_ireduce_segment_size); } int ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t 
*comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root); - int err = - ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, - tree, mca_coll_adapt_component.adapt_ireduce_segment_size, - ireduce_tag); - return err; + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, + request, module, ompi_coll_base_topo_build_chain(1, comm, root), + mca_coll_adapt_component.adapt_ireduce_segment_size); } int ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root); - int err = - ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, - tree, mca_coll_adapt_component.adapt_ireduce_segment_size, - ireduce_tag); - return err; + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, + request, module, ompi_coll_base_topo_build_chain(4, comm, root), + mca_coll_adapt_component.adapt_ireduce_segment_size); } int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, 
ompi_request_t ** request, + mca_coll_base_module_t * module) { int fanout = ompi_comm_size(comm) - 1; ompi_coll_tree_t *tree; @@ -635,19 +618,17 @@ int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count, } else { tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root); } - int err = - ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, - tree, mca_coll_adapt_component.adapt_ireduce_segment_size, - ireduce_tag); - return err; + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, + request, module, tree, + mca_coll_adapt_component.adapt_ireduce_segment_size); } int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module, ompi_coll_tree_t * tree, - size_t seg_size, int ireduce_tag) + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module, ompi_coll_tree_t * tree, + size_t seg_size) { ptrdiff_t extent, lower_bound, segment_increment; @@ -777,12 +758,12 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, con->rbuf = (char *) rbuf; con->root = root; con->distance = distance; - con->ireduce_tag = ireduce_tag; + con->ireduce_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs); con->real_seg_size = real_seg_size; OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: start ireduce root %d tag %d\n", rank, tree->tree_root, - ireduce_tag)); + con->ireduce_tag)); /* If the current process is not leaf node */ if (tree->tree_nextsize > 0) { @@ -849,14 +830,14 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, "[%d]: In ireduce, create irecv for seg %d, peer %d, recv_count %d, inbuf %p tag %d\n", context->con->rank, 
context->frag_id, context->peer, recv_count, (void *) inbuf, - (ireduce_tag << 16) + seg_index)); + con->ireduce_tag - seg_index)); /* Create a recv request */ ompi_request_t *recv_req; err = MCA_PML_CALL(irecv (temp_recv_buf, recv_count, dtype, tree->tree_next[i], - (ireduce_tag << 16) + seg_index, comm, &recv_req)); + con->ireduce_tag - seg_index, comm, &recv_req)); if (MPI_SUCCESS != err) { return err; } @@ -908,14 +889,14 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In ireduce, create isend to seg %d, peer %d, send_count %d tag %d\n", context->con->rank, context->frag_id, context->peer, - send_count, (ireduce_tag << 16) + context->frag_id)); + send_count, con->ireduce_tag - context->frag_id)); /* Create send request */ ompi_request_t *send_req; err = MCA_PML_CALL(isend (context->buff, send_count, dtype, tree->tree_prev, - (ireduce_tag << 16) + context->frag_id, + con->ireduce_tag - context->frag_id, MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &send_req)); if (MPI_SUCCESS != err) { return err; diff --git a/ompi/mca/coll/base/coll_base_util.h b/ompi/mca/coll/base/coll_base_util.h index c83e46c2ddb..b54fc70664b 100644 --- a/ompi/mca/coll/base/coll_base_util.h +++ b/ompi/mca/coll/base/coll_base_util.h @@ -27,6 +27,8 @@ #include "ompi/mca/mca.h" #include "ompi/datatype/ompi_datatype.h" #include "ompi/request/request.h" +#include "ompi/communicator/communicator.h" +#include "ompi/mca/coll/base/coll_tags.h" #include "ompi/op/op.h" #include "ompi/mca/pml/pml.h" @@ -60,6 +62,22 @@ struct ompi_coll_base_nbc_request_t { OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_coll_base_nbc_request_t); +static inline int32_t +ompi_coll_base_nbc_reserve_tags(ompi_communicator_t* comm, int32_t reserve) +{ + int32_t tag, old_tag; + assert( reserve > 0 ); + reread_tag: /* In case we fail to atomically update the tag */ + tag = old_tag = comm->c_nbc_tag; + if ((tag - reserve) < 
MCA_COLL_BASE_TAG_NONBLOCKING_END) { + tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; + } + if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_32(&comm->c_nbc_tag, &old_tag, tag - reserve) ) { + goto reread_tag; + } + return tag; +} + typedef struct ompi_coll_base_nbc_request_t ompi_coll_base_nbc_request_t; /** diff --git a/ompi/mca/coll/libnbc/coll_libnbc.h b/ompi/mca/coll/libnbc/coll_libnbc.h index 682010b6910..3bdeb9419fa 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc.h +++ b/ompi/mca/coll/libnbc/coll_libnbc.h @@ -94,7 +94,6 @@ struct ompi_coll_libnbc_module_t { mca_coll_base_module_t super; opal_mutex_t mutex; bool comm_registered; - int tag; #ifdef NBC_CACHE_SCHEDULE void *NBC_Dict[NBC_NUM_COLL]; /* this should point to a struct hb_tree, but since this is a diff --git a/ompi/mca/coll/libnbc/nbc.c b/ompi/mca/coll/libnbc/nbc.c index 171f5a37e9c..35e02fe87bf 100644 --- a/ompi/mca/coll/libnbc/nbc.c +++ b/ompi/mca/coll/libnbc/nbc.c @@ -25,7 +25,7 @@ * Additional copyrights may follow */ #include "nbc_internal.h" -#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" #include "ompi/op/op.h" #include "ompi/mca/pml/pml.h" @@ -595,7 +595,6 @@ void NBC_Return_handle(ompi_coll_libnbc_request_t *request) { } int NBC_Init_comm(MPI_Comm comm, NBC_Comminfo *comminfo) { - comminfo->tag= MCA_COLL_BASE_TAG_NONBLOCKING_BASE; #ifdef NBC_CACHE_SCHEDULE /* initialize the NBC_ALLTOALL SchedCache tree */ @@ -672,7 +671,7 @@ int NBC_Start(NBC_Handle *handle) { int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, ompi_coll_libnbc_module_t *module, bool persistent, ompi_request_t **request, void *tmpbuf) { - int ret, tmp_tag; + int ret; bool need_register = false; ompi_coll_libnbc_request_t *handle; @@ -685,13 +684,7 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, - /* update the module->tag here because other processes may have operations - * and they may update the module->tag */ + /* advance the communicator's non-blocking tag (comm->c_nbc_tag) here because + * other processes may have operations and they will advance theirs */ - OPAL_THREAD_LOCK(&module->mutex);
- tmp_tag = module->tag--; - if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) { - tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; - NBC_DEBUG(2,"resetting tags ...\n"); - } - OPAL_THREAD_UNLOCK(&module->mutex); + (void)ompi_coll_base_nbc_reserve_tags(comm, 1); OBJ_RELEASE(schedule); free(tmpbuf); @@ -712,20 +705,15 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, /******************** Do the tag and shadow comm administration ... ***************/ - OPAL_THREAD_LOCK(&module->mutex); - tmp_tag = module->tag--; - if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) { - tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; - NBC_DEBUG(2,"resetting tags ...\n"); - } + handle->tag = ompi_coll_base_nbc_reserve_tags(comm, 1); + OPAL_THREAD_LOCK(&module->mutex); if (true != module->comm_registered) { module->comm_registered = true; need_register = true; } OPAL_THREAD_UNLOCK(&module->mutex); - handle->tag = tmp_tag; /* register progress */ if (need_register) { @@ -737,7 +725,6 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, } handle->comm=comm; - /*printf("got module: %lu tag: %i\n", module, module->tag);*/ /******************** end of tag and shadow comm administration ... ***************/ handle->comminfo = module;