From da90df70370b4424afbb5310d209c5710b17c1e4 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Wed, 1 Sep 2021 15:36:19 -0400 Subject: [PATCH] A better MPI_IN_PLACE alltoall algorithm. Provide optimized variant for the homogeneous case. Signed-off-by: George Bosilca --- ompi/mca/coll/basic/coll_basic_alltoallw.c | 161 +++++++++++---------- 1 file changed, 87 insertions(+), 74 deletions(-) diff --git a/ompi/mca/coll/basic/coll_basic_alltoallw.c b/ompi/mca/coll/basic/coll_basic_alltoallw.c index fe753b34e74..7131475b9ff 100644 --- a/ompi/mca/coll/basic/coll_basic_alltoallw.c +++ b/ompi/mca/coll/basic/coll_basic_alltoallw.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2016 The University of Tennessee and The University + * Copyright (c) 2004-2021 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -36,97 +36,110 @@ #include "ompi/mca/coll/base/coll_tags.h" #include "ompi/mca/pml/pml.h" - +/* + * We want to minimize the amount of temporary memory needed while allowing as many ranks + * to exchange data simultaneously. We use a variation of the ring algorithm, where in a + * single step a process echange the data with both neighbors at distance k (on the left + * and the right on a logical ring topology). With this approach we need to pack the data + * for a single of the two neighbors, as we can then use the original buffer (and datatype + * and count) to send the data to the other. + */ static int mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, const int *rdisps, struct ompi_datatype_t * const *rdtypes, struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { - int i, j, size, rank, err = MPI_SUCCESS; - ompi_request_t *req; - char *save_buffer = NULL; - size_t max_size = 0, packed_size; + int i, size, rank, left, right, err = MPI_SUCCESS; + ompi_request_t *req = MPI_REQUEST_NULL; + char *tmp_buffer = NULL; + size_t max_size = 0, packed_size, msg_size_left, msg_size_right; opal_convertor_t convertor; size = ompi_comm_size(comm); - rank = ompi_comm_rank(comm); - - /* If only one process, we're done. */ - if (1 == size) { + if (1 == size) { /* If only one process, we're done. */ return MPI_SUCCESS; } + rank = ompi_comm_rank(comm); - /* Find the largest amount of packed send/recv data */ - for (i = 0, max_size = 0 ; i < size ; ++i) { - ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); - - packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super, - ompi_proc->super.proc_convertor->master->remote_sizes); - packed_size *= rcounts[i]; + /* Find the largest amount of packed send/recv data amoing all peers where + * we need to pack before the send. + */ + for (i = 1 ; i <= (size >> 1) ; ++i) { + right = (rank + i) % size; +#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right); + + if( OPAL_LIKELY(opal_local_arch == ompi_proc->super.proc_convertor->master->remote_arch)) { + opal_datatype_type_size(&rdtypes[right]->super, &packed_size); + } else { + packed_size = opal_datatype_compute_remote_size(&rdtypes[right]->super, + ompi_proc->super.proc_convertor->master->remote_sizes); + } +#else + opal_datatype_type_size(&rdtypes[right]->super, &packed_size); +#endif /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */ + packed_size *= rcounts[right]; max_size = packed_size > max_size ? packed_size : max_size; } /* Allocate a temporary buffer */ - save_buffer = calloc (max_size, 1); - if (NULL == save_buffer) { + tmp_buffer = calloc (max_size, 1); + if (NULL == tmp_buffer) { return OMPI_ERR_OUT_OF_RESOURCE; } - /* in-place alltoallw slow algorithm (but works) */ - for (i = 0 ; i < size ; ++i) { - size_t msg_size_i; - ompi_datatype_type_size(rdtypes[i], &msg_size_i); - msg_size_i *= rcounts[i]; - for (j = i+1 ; j < size ; ++j) { - size_t msg_size_j; - struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size}; - uint32_t iov_count = 1; - ompi_datatype_type_size(rdtypes[j], &msg_size_j); - msg_size_j *= rcounts[j]; - - /* Initiate all send/recv to/from others. */ - if (i == rank && msg_size_j != 0) { - ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j); - opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); - opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j], - (char *) rbuf + rdisps[j]); - packed_size = max_size; - err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); - if (1 != err) { goto error_hndl; } - - /* Exchange data with the peer */ - err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j], - j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); - if (MPI_SUCCESS != err) { goto error_hndl; } - - err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, - j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, - comm)); - if (MPI_SUCCESS != err) { goto error_hndl; } - } else if (j == rank && msg_size_i != 0) { - ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); - opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); - opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i], - (char *) rbuf + rdisps[i]); - packed_size = max_size; - err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); - if (1 != err) { goto error_hndl; } - - /* Exchange data with the peer */ - err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i], - i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); - if (MPI_SUCCESS != err) { goto error_hndl; } - - err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, - i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, - comm)); - if (MPI_SUCCESS != err) { goto error_hndl; } - } else { - continue; - } - - /* Wait for the requests to complete */ + for (i = 1 ; i <= (size >> 1) ; ++i) { + struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size}; + uint32_t iov_count = 1; + + right = (rank + i) % size; + left = (rank + size - i) % size; + + ompi_datatype_type_size(rdtypes[right], &msg_size_right); + msg_size_right *= rcounts[right]; + + ompi_datatype_type_size(rdtypes[left], &msg_size_left); + msg_size_left *= rcounts[left]; + + if( 0 != msg_size_right ) { /* nothing to exchange with the peer on the right */ + ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right); + opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0); + opal_convertor_prepare_for_send(&convertor, &rdtypes[right]->super, rcounts[right], + (char *) rbuf + rdisps[right]); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 != err) { goto error_hndl; } + + /* Receive data from the right */ + err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtypes[right], + right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } + } + + if( (left != right) && (0 != msg_size_left) ) { + /* Send data to the left */ + err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left], + left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); + if (MPI_SUCCESS != err) { goto error_hndl; } + + /* Receive data from the left */ + err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left], + left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } + } + + if( 0 != msg_size_right ) { /* nothing to exchange with the peer on the right */ + /* Send data to the right */ + err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED, + right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); if (MPI_SUCCESS != err) { goto error_hndl; } } @@ -134,7 +147,7 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con error_hndl: /* Free the temporary buffer */ - free (save_buffer); + free (tmp_buffer); /* All done */