From 6c5c83d3c95eae9e64ec6e354322467a8c456646 Mon Sep 17 00:00:00 2001 From: Jonathan Hu Date: Wed, 20 Nov 2024 09:26:56 -0800 Subject: [PATCH] Tpetra: revert #13491 Until we can diagnose segfaults reported wrt PR #13598. Signed-off-by: Jonathan Hu --- .../tpetra/core/src/Tpetra_CrsMatrix_def.hpp | 90 ++- .../src/Tpetra_Details_DistributorActor.hpp | 652 +----------------- .../tpetra/core/src/Tpetra_Distributor.hpp | 89 +-- 3 files changed, 74 insertions(+), 757 deletions(-) diff --git a/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp b/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp index a88b5ca649ba..f0eef6b3b32e 100644 --- a/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp +++ b/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp @@ -47,7 +47,6 @@ #include "KokkosBlas1_scal.hpp" #include "KokkosSparse_getDiagCopy.hpp" #include "KokkosSparse_spmv.hpp" -#include "Kokkos_StdAlgorithms.hpp" #include #include @@ -8302,16 +8301,24 @@ CrsMatrix:: << std::endl; std::cerr << os.str (); } - destMat->numExportPacketsPerLID_.sync_device(); - auto numExportPacketsPerLID = destMat->numExportPacketsPerLID_.view_device(); - auto numImportPacketsPerLID = destMat->numImportPacketsPerLID_.view_device(); + // Make sure that host has the latest version, since we're + // using the version on host. If host has the latest + // version, syncing to host does nothing. + destMat->numExportPacketsPerLID_.sync_host (); + Teuchos::ArrayView numExportPacketsPerLID = + getArrayViewFromDualView (destMat->numExportPacketsPerLID_); + destMat->numImportPacketsPerLID_.sync_host (); + Teuchos::ArrayView numImportPacketsPerLID = + getArrayViewFromDualView (destMat->numImportPacketsPerLID_); + if (verbose) { std::ostringstream os; os << *verbosePrefix << "Calling 3-arg doReversePostsAndWaits" << std::endl; std::cerr << os.str (); } - Distor.doReversePostsAndWaits(numExportPacketsPerLID, 1, numImportPacketsPerLID); + Distor.doReversePostsAndWaits(destMat->numExportPacketsPerLID_.view_host(), 1, + destMat->numImportPacketsPerLID_.view_host()); if (verbose) { std::ostringstream os; os << *verbosePrefix << "Finished 3-arg doReversePostsAndWaits" @@ -8319,26 +8326,34 @@ CrsMatrix:: std::cerr << os.str (); } - size_t totalImportPackets = Kokkos::Experimental::reduce(typename Node::execution_space(), numImportPacketsPerLID); + size_t totalImportPackets = 0; + for (Array_size_type i = 0; i < numImportPacketsPerLID.size (); ++i) { + totalImportPackets += numImportPacketsPerLID[i]; + } // Reallocation MUST go before setting the modified flag, // because it may clear out the flags. destMat->reallocImportsIfNeeded (totalImportPackets, verbose, verbosePrefix.get ()); destMat->imports_.modify_host (); - auto deviceImports = destMat->imports_.view_device(); - auto deviceExports = destMat->exports_.view_device(); + auto hostImports = destMat->imports_.view_host(); + // This is a legacy host pack/unpack path, so use the host + // version of exports_. + destMat->exports_.sync_host (); + auto hostExports = destMat->exports_.view_host(); if (verbose) { std::ostringstream os; - os << *verbosePrefix << "Calling 4-arg doReversePostsAndWaitsKokkos" + os << *verbosePrefix << "Calling 4-arg doReversePostsAndWaits" << std::endl; std::cerr << os.str (); } - destMat->imports_.sync_device(); - Distor.doReversePostsAndWaitsKokkos (deviceExports, numExportPacketsPerLID, deviceImports, numImportPacketsPerLID); + Distor.doReversePostsAndWaits (hostExports, + numExportPacketsPerLID, + hostImports, + numImportPacketsPerLID); if (verbose) { std::ostringstream os; - os << *verbosePrefix << "Finished 4-arg doReversePostsAndWaitsKokkos" + os << *verbosePrefix << "Finished 4-arg doReversePostsAndWaits" << std::endl; std::cerr << os.str (); } @@ -8381,16 +8396,23 @@ CrsMatrix:: << std::endl; std::cerr << os.str (); } - destMat->numExportPacketsPerLID_.sync_device (); - auto numExportPacketsPerLID = destMat->numExportPacketsPerLID_.view_device(); - auto numImportPacketsPerLID = destMat->numImportPacketsPerLID_.view_device(); + // Make sure that host has the latest version, since we're + // using the version on host. If host has the latest + // version, syncing to host does nothing. + destMat->numExportPacketsPerLID_.sync_host (); + Teuchos::ArrayView numExportPacketsPerLID = + getArrayViewFromDualView (destMat->numExportPacketsPerLID_); + destMat->numImportPacketsPerLID_.sync_host (); + Teuchos::ArrayView numImportPacketsPerLID = + getArrayViewFromDualView (destMat->numImportPacketsPerLID_); if (verbose) { std::ostringstream os; os << *verbosePrefix << "Calling 3-arg doPostsAndWaits" << std::endl; std::cerr << os.str (); } - Distor.doPostsAndWaits(numExportPacketsPerLID, 1, numImportPacketsPerLID); + Distor.doPostsAndWaits(destMat->numExportPacketsPerLID_.view_host(), 1, + destMat->numImportPacketsPerLID_.view_host()); if (verbose) { std::ostringstream os; os << *verbosePrefix << "Finished 3-arg doPostsAndWaits" @@ -8398,26 +8420,34 @@ CrsMatrix:: std::cerr << os.str (); } - size_t totalImportPackets = Kokkos::Experimental::reduce(typename Node::execution_space(), numImportPacketsPerLID); + size_t totalImportPackets = 0; + for (Array_size_type i = 0; i < numImportPacketsPerLID.size (); ++i) { + totalImportPackets += numImportPacketsPerLID[i]; + } // Reallocation MUST go before setting the modified flag, // because it may clear out the flags. destMat->reallocImportsIfNeeded (totalImportPackets, verbose, verbosePrefix.get ()); destMat->imports_.modify_host (); - auto deviceImports = destMat->imports_.view_device(); - auto deviceExports = destMat->exports_.view_device(); + auto hostImports = destMat->imports_.view_host(); + // This is a legacy host pack/unpack path, so use the host + // version of exports_. + destMat->exports_.sync_host (); + auto hostExports = destMat->exports_.view_host(); if (verbose) { std::ostringstream os; - os << *verbosePrefix << "Calling 4-arg doPostsAndWaitsKokkos" + os << *verbosePrefix << "Calling 4-arg doPostsAndWaits" << std::endl; std::cerr << os.str (); } - destMat->imports_.sync_device (); - Distor.doPostsAndWaitsKokkos (deviceExports, numExportPacketsPerLID, deviceImports, numImportPacketsPerLID); + Distor.doPostsAndWaits (hostExports, + numExportPacketsPerLID, + hostImports, + numImportPacketsPerLID); if (verbose) { std::ostringstream os; - os << *verbosePrefix << "Finished 4-arg doPostsAndWaitsKokkos" + os << *verbosePrefix << "Finished 4-arg doPostsAndWaits" << std::endl; std::cerr << os.str (); } @@ -8464,6 +8494,12 @@ CrsMatrix:: Teuchos::Array RemotePids; if (runOnHost) { Teuchos::Array TargetPids; + // Backwards compatibility measure. We'll use this again below. + + // TODO JHU Need to track down why numImportPacketsPerLID_ has not been corrently marked as modified on host (which it has been) + // TODO JHU somewhere above, e.g., call to Distor.doPostsAndWaits(). + // TODO JHU This only becomes apparent as we begin to convert TAFC to run on device. + destMat->numImportPacketsPerLID_.modify_host(); //FIXME # ifdef HAVE_TPETRA_MMM_TIMINGS RCP tmCopySPRdata = rcp(new TimeMonitor(*TimeMonitor::getNewTimer(prefix + std::string("TAFC unpack-count-resize + copy same-perm-remote data")))); @@ -8655,6 +8691,14 @@ CrsMatrix:: } else { // run on device + + // Backwards compatibility measure. We'll use this again below. + + // TODO JHU Need to track down why numImportPacketsPerLID_ has not been corrently marked as modified on host (which it has been) + // TODO JHU somewhere above, e.g., call to Distor.doPostsAndWaits(). + // TODO JHU This only becomes apparent as we begin to convert TAFC to run on device. + destMat->numImportPacketsPerLID_.modify_host(); //FIXME + # ifdef HAVE_TPETRA_MMM_TIMINGS RCP tmCopySPRdata = rcp(new TimeMonitor(*TimeMonitor::getNewTimer(prefix + std::string("TAFC unpack-count-resize + copy same-perm-remote data")))); # endif diff --git a/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp b/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp index 24e8351a6133..9b021ac53e9b 100644 --- a/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp +++ b/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp @@ -22,7 +22,6 @@ #include "Teuchos_Time.hpp" #include "Kokkos_TeuchosCommAdapters.hpp" -#include "Kokkos_StdAlgorithms.hpp" #ifdef HAVE_TPETRA_MPI #include "mpi.h" @@ -54,13 +53,6 @@ class DistributorActor { const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - template - void doPostsAndWaitsKokkos(const DistributorPlan& plan, - const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); - template void doPosts(const DistributorPlan& plan, const ExpView& exports, @@ -74,27 +66,6 @@ class DistributorActor { const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - template - void doPostsKokkos(const DistributorPlan& plan, - const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); - - template - void doPostsAllToAllKokkos( - const DistributorPlan &plan, const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); - - template - void doPostsNbrAllToAllVKokkos( - const DistributorPlan &plan, const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); - void doWaits(const DistributorPlan& plan); bool isReady() const; @@ -176,22 +147,6 @@ void DistributorActor::doPostsAndWaits(const DistributorPlan& plan, doWaits(plan); } - -template -void DistributorActor::doPostsAndWaitsKokkos(const DistributorPlan& plan, - const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID) -{ - static_assert(areKokkosViews, - "Data arrays for DistributorActor::doPostsAndWaitsKokkos must be Kokkos::Views"); - static_assert(areKokkosViews, - "Num packets arrays for DistributorActor::doPostsAndWaitsKokkos must be Kokkos::Views"); - doPostsKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); - doWaits(plan); -} - template using HostAccessibility = Kokkos::SpaceAccessibility; @@ -805,140 +760,6 @@ void DistributorActor::doPostsAllToAll( << "\"."); } -template -void DistributorActor::doPostsAllToAllKokkos( - const DistributorPlan &plan, const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID) { - TEUCHOS_TEST_FOR_EXCEPTION( - !plan.getIndicesTo().is_null(), std::runtime_error, - "Send Type=\"Alltoall\" only works for fast-path communication."); - - using size_type = Teuchos::Array::size_type; - using ExpExecSpace = typename ExpPacketsView::execution_space; - using ImpExecSpace = typename ImpPacketsView::execution_space; - - auto comm = plan.getComm(); - Kokkos::View sendcounts("sendcounts", comm->getSize()); - Kokkos::View sdispls("sdispls", comm->getSize()); - Kokkos::View recvcounts("recvcounts", comm->getSize()); - Kokkos::View rdispls("rdispls", comm->getSize()); - - auto sendcounts_d = Kokkos::create_mirror_view(ExpExecSpace(), sendcounts); - auto sdispls_d = Kokkos::create_mirror_view(ExpExecSpace(), sdispls); - auto recvcounts_d = Kokkos::create_mirror_view(ImpExecSpace(), recvcounts); - auto rdispls_d = Kokkos::create_mirror_view(ImpExecSpace(), rdispls); - - auto getStartsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getStartsTo()); - auto getLengthsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsTo()); - auto getProcsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getProcsTo()); - - size_t curPKToffset = 0; - Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& offset, bool is_final) { - sdispls_d(getProcsTo(pp)) = offset; - size_t numPackets = 0; - for (size_t j = getStartsTo(pp); j < getStartsTo(pp) + getLengthsTo(pp); ++j) { - numPackets += numExportPacketsPerLID(j); - } - sendcounts_d(getProcsTo(pp)) = static_cast(numPackets); - offset += numPackets; - }, curPKToffset); - - int overflow; - Kokkos::parallel_reduce(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, int& index) { - if(sendcounts_d(getProcsTo(pp)) < 0) { - index = pp+1; - } - }, overflow); - - // numPackets is converted down to int, so make sure it can be represented - TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error, - "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " - "Send count for send " - << overflow-1 << " is too large " - "to be represented as int."); - - const size_type actualNumReceives = - Teuchos::as(plan.getNumReceives()) + - Teuchos::as(plan.hasSelfMessage() ? 1 : 0); - - auto getLengthsFrom = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsFrom()); - auto getProcsFrom = Kokkos::Compat::getKokkosViewDeepCopy(plan.getProcsFrom()); - - Kokkos::View curLIDoffset("curLIDoffset", actualNumReceives); - Kokkos::parallel_scan(Kokkos::RangePolicy(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, size_t& offset, bool is_final) { - if(is_final) curLIDoffset(i) = offset; - offset += getLengthsFrom(i); - }); - - Kokkos::parallel_scan(Kokkos::RangePolicy(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, size_t& curBufferOffset, bool is_final) { - size_t totalPacketsFrom_i = 0; - for(size_t j = 0; j < getLengthsFrom(i); j++) { - totalPacketsFrom_i += numImportPacketsPerLID(curLIDoffset(i) + j); - } - - if(is_final) rdispls_d(getProcsFrom(i)) = curBufferOffset; - if(is_final) recvcounts_d(getProcsFrom(i)) = static_cast(totalPacketsFrom_i); - curBufferOffset += totalPacketsFrom_i; - }); - - Kokkos::parallel_reduce(Kokkos::RangePolicy(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, int& index) { - if(recvcounts_d(getProcsFrom(i)) < 0) { - index = i+1; - } - }, overflow); - - // totalPacketsFrom_i is converted down to int, so make sure it can be - // represented - TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error, - "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " - "Recv count for receive " - << overflow-1 << " is too large " - "to be represented as int."); - - Kokkos::deep_copy(sendcounts, sendcounts_d); - Kokkos::deep_copy(sdispls, sdispls_d); - Kokkos::deep_copy(recvcounts, recvcounts_d); - Kokkos::deep_copy(rdispls, rdispls_d); - - Teuchos::RCP> mpiComm = - Teuchos::rcp_dynamic_cast>(comm); - Teuchos::RCP> rawComm = - mpiComm->getRawMpiComm(); - using T = typename ExpView::non_const_value_type; - MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits::getType(T()); - -#if defined(HAVE_TPETRACORE_MPI_ADVANCE) - if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) { - MPIX_Comm *mpixComm = *plan.getMPIXComm(); - TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error, - "MPIX_Comm is null in doPostsAllToAll \"" - << __FILE__ << ":" << __LINE__); - - const int err = MPIX_Alltoallv( - exports.data(), sendcounts.data(), sdispls.data(), rawType, - imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm); - - TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error, - "MPIX_Alltoallv failed with error \"" - << Teuchos::mpiErrorCodeToString(err) - << "\"."); - - return; - } -#endif // HAVE_TPETRACORE_MPI_ADVANCE - - const int err = MPI_Alltoallv( - exports.data(), sendcounts.data(), sdispls.data(), rawType, - imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)()); - - TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error, - "MPI_Alltoallv failed with error \"" - << Teuchos::mpiErrorCodeToString(err) - << "\"."); -} - #if defined(HAVE_TPETRACORE_MPI_ADVANCE) template void DistributorActor::doPostsNbrAllToAllV( @@ -1019,117 +840,6 @@ void DistributorActor::doPostsNbrAllToAllV( << Teuchos::mpiErrorCodeToString(err) << "\"."); } - -template -void DistributorActor::doPostsNbrAllToAllVKokkos( - const DistributorPlan &plan, const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID) { - TEUCHOS_TEST_FOR_EXCEPTION( - !plan.getIndicesTo().is_null(), std::runtime_error, - "Send Type=\"Alltoall\" only works for fast-path communication."); - - const Teuchos_Ordinal numSends = plan.getProcsTo().size(); - const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size(); - - auto comm = plan.getComm(); - Kokkos::View sendcounts("sendcounts", comm->getSize()); - Kokkos::View sdispls("sdispls", comm->getSize()); - Kokkos::View recvcounts("recvcounts", comm->getSize()); - Kokkos::View rdispls("rdispls", comm->getSize()); - - auto sendcounts_d = Kokkos::create_mirror_view(ExpExecSpace(), sendcounts); - auto sdispls_d = Kokkos::create_mirror_view(ExpExecSpace(), sdispls); - auto recvcounts_d = Kokkos::create_mirror_view(ImpExecSpace(), recvcounts); - auto rdispls_d = Kokkos::create_mirror_view(ImpExecSpace(), rdispls); - - auto getStartsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getStartsTo()); - auto getLengthsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsTo()); - - Teuchos::RCP> mpiComm = - Teuchos::rcp_dynamic_cast>(comm); - Teuchos::RCP> rawComm = - mpiComm->getRawMpiComm(); - using T = typename ExpView::non_const_value_type; - using ExpExecSpace = typename ExpPacketsView::execution_space; - using ImpExecSpace = typename ImpPacketsView::execution_space; - MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits::getType(T()); - - // unlike standard alltoall, entry `i` in sdispls and sendcounts - // refer to the ith participating rank, rather than rank i - Kokkos::parallel_scan(Kokkos::RangePolicy(0, numSends), KOKKOS_LAMBDA(const Teuchos_Ordinal pp, size_t& curPKToffset, bool is_final) { - sdispls_d(pp) = curPKToffset; - size_t numPackets = 0; - for (size_t j = getStartsTo(pp); j < getStartsTo(pp) + getLengthsTo(pp); ++j) { - numPackets += numExportPacketsPerLID(j); - } - sendcounts_d(pp) = static_cast(numPackets); - curPKToffset += numPackets; - }); - - int overflow; - Kokkos::parallel_reduce(Kokkos::RangePolicy(0, numSends), KOKKOS_LAMBDA(const Teuchos_Ordinal pp, int& index) { - if(sendcounts_d(pp) < 0) { - index = i+1; - } - }, overflow); - - // numPackets is converted down to int, so make sure it can be represented - TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error, - "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " - "Send count for send " - << overflow-1 << " is too large " - "to be represented as int."); - - auto getLengthsFrom = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsFrom()); - - Kokkos::View curLIDoffset("curLIDoffset", numRecvs); - Kokkos::parallel_scan(Kokkos::RangePolicy(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, size_t& offset, bool is_final) { - if(is_final) curLIDoffset(i) = offset; - offset += getLengthsFrom(i); - }); - - Kokkos::parallel_scan(Kokkos::RangePolicy(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, size_t& curBufferOffset, bool is_final) { - rdispls_d(i) = curBufferOffset; - size_t totalPacketsFrom_i = 0; - for(size_t j = 0; j < getLengthsFrom(i); j++) { - totalPacketsFrom_i += numImportPacketsPerLID(curLIDoffset(i) + j); - } - - recvcounts_d(i) = static_cast(totalPacketsFrom_i); - curBufferOffset += totalPacketsFrom_i; - }); - - Kokkos::parallel_reduce(Kokkos::RangePolicy(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, int& index) { - if(recvcounts_d(pp) < 0) { - index = i+1; - } - }, overflow); - - // totalPacketsFrom_i is converted down to int, so make sure it can be - // represented - TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error, - "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " - "Recv count for receive " - << overflow-1 << ") is too large " - "to be represented as int."); - - Kokkos::deep_copy(sendcounts, sendcounts_d); - Kokkos::deep_copy(sdispls, sdispls_d); - Kokkos::deep_copy(recvcounts, recvcounts_d); - Kokkos::deep_copy(rdispls, rdispls_d); - - MPIX_Comm *mpixComm = *plan.getMPIXComm(); - const int err = MPIX_Neighbor_alltoallv( - exports.data(), sendcounts.data(), sdispls.data(), rawType, - imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm); - - TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error, - "MPIX_Neighbor_alltoallv failed with error \"" - << Teuchos::mpiErrorCodeToString(err) - << "\"."); -} #endif // HAVE_TPETRACORE_MPI_ADVANCE #endif // HAVE_TPETRA_MPI // clang-format off @@ -1397,16 +1107,16 @@ void DistributorActor::doPosts(const DistributorPlan& plan, // This buffer is long enough for only one message at a time. // Thus, we use DISTRIBUTOR_SEND always in this case, regardless - // of sendType requested by user. + // of sendType requested by user. // This code path formerly errored out with message: - // Tpetra::Distributor::doPosts(4-arg, Kokkos): + // Tpetra::Distributor::doPosts(4-arg, Kokkos): // The "send buffer" code path // doesn't currently work with nonblocking sends. // Now, we opt to just do the communication in a way that works. #ifdef HAVE_TPETRA_DEBUG if (sendType != Details::DISTRIBUTOR_SEND) { if (plan.getComm()->getRank() == 0) - std::cout << "The requested Tpetra send type " + std::cout << "The requested Tpetra send type " << DistributorSendTypeEnumToString(sendType) << " requires Distributor data to be ordered by" << " the receiving processor rank. Since these" @@ -1415,7 +1125,7 @@ void DistributorActor::doPosts(const DistributorPlan& plan, } #endif - Kokkos::View sendArray ("sendArray", + Kokkos::View sendArray ("sendArray", maxNumPackets); Array indicesOffsets (numExportPacketsPerLID.size(), 0); @@ -1470,360 +1180,6 @@ void DistributorActor::doPosts(const DistributorPlan& plan, } } -template -void DistributorActor::doPostsKokkos(const DistributorPlan& plan, - const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID) -{ - static_assert(areKokkosViews, - "Data arrays for DistributorActor::doPostsKokkos must be Kokkos::Views"); - static_assert(areKokkosViews, - "Num packets arrays for DistributorActor::doPostsKokkos must be Kokkos::Views"); - using Teuchos::Array; - using Teuchos::as; - using Teuchos::ireceive; - using Teuchos::isend; - using Teuchos::send; - using Teuchos::TypeNameTraits; - using std::endl; - using Kokkos::Compat::create_const_view; - using Kokkos::Compat::create_view; - using Kokkos::Compat::subview_offset; - using Kokkos::Compat::deep_copy_offset; - using ExpExecSpace = typename ExpPacketsView::execution_space; - using ImpExecSpace = typename ImpPacketsView::execution_space; - typedef Array::size_type size_type; - typedef ExpView exports_view_type; - typedef ImpView imports_view_type; - -#ifdef KOKKOS_ENABLE_CUDA - static_assert (! std::is_same::value && - ! std::is_same::value, - "Please do not use Tpetra::Distributor with UVM " - "allocations. See GitHub issue #1088."); -#endif // KOKKOS_ENABLE_CUDA - -#ifdef KOKKOS_ENABLE_SYCL - static_assert (! std::is_same::value && - ! std::is_same::value, - "Please do not use Tpetra::Distributor with SharedUSM " - "allocations. See GitHub issue #1088 (corresponding to CUDA)."); -#endif // KOKKOS_ENABLE_SYCL - -#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS - Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_); -#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS - - // Run-time configurable parameters that come from the input - // ParameterList set by setParameterList(). - const Details::EDistributorSendType sendType = plan.getSendType(); - -#ifdef HAVE_TPETRA_MPI - // All-to-all communication layout is quite different from - // point-to-point, so we handle it separately. - if (sendType == Details::DISTRIBUTOR_ALLTOALL) { - doPostsAllToAllKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); - return; - } -#ifdef HAVE_TPETRACORE_MPI_ADVANCE - else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) - { - doPostsAllToAllKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); - return; - } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) { - doPostsNbrAllToAllVKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); - return; - } -#endif - -#else // HAVE_TPETRA_MPI - if (plan.hasSelfMessage()) { - size_t packetsPerSend; - Kokkos::parallel_reduce(Kokkos::RangePolicy(plan.getStartsTo()[0], plan.getStartsTo()[0]+plan.getLengthsTo()[0]), KOKKOS_LAMBDA(const size_t j, size_t& packets) { - packets += numExportPacketsPerLID(j); - }, packetsPerSend); - - deep_copy_offset(imports, exports, (size_t)0, (size_t)0, packetsPerSend); - } -#endif // HAVE_TPETRA_MPI - - const int myProcID = plan.getComm()->getRank (); - size_t selfReceiveOffset = 0; - -#ifdef HAVE_TPETRA_DEBUG - // Different messages may have different numbers of packets. - size_t totalNumImportPackets = Kokkos::Experimental::reduce(ImpExecSpace(), numImportPacketsPerLID); - TEUCHOS_TEST_FOR_EXCEPTION( - imports.extent (0) < totalNumImportPackets, std::runtime_error, - "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): The 'imports' array must have " - "enough entries to hold the expected number of import packets. " - "imports.extent(0) = " << imports.extent (0) << " < " - "totalNumImportPackets = " << totalNumImportPackets << "."); - TEUCHOS_TEST_FOR_EXCEPTION - (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::" - "doPostsKokkos(4 args, Kokkos): Process " << myProcID << ": requests_.size () = " - << requests_.size () << " != 0."); -#endif // HAVE_TPETRA_DEBUG - // Distributor uses requests_.size() as the number of outstanding - // nonblocking message requests, so we resize to zero to maintain - // this invariant. - // - // getNumReceives() does _not_ include the self message, if there is - // one. Here, we do actually send a message to ourselves, so we - // include any self message in the "actual" number of receives to - // post. - // - // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts() - // doesn't (re)allocate its array of requests. That happens in - // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on - // demand), or Resize_(). - const size_type actualNumReceives = as (plan.getNumReceives()) + - as (plan.hasSelfMessage() ? 1 : 0); - requests_.resize (0); - - // Post the nonblocking receives. It's common MPI wisdom to post - // receives before sends. In MPI terms, this means favoring - // adding to the "posted queue" (of receive requests) over adding - // to the "unexpected queue" (of arrived messages not yet matched - // with a receive). - { -#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS - Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_); -#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS - - size_t curBufferOffset = 0; - size_t curLIDoffset = 0; - for (size_type i = 0; i < actualNumReceives; ++i) { - size_t totalPacketsFrom_i = 0; - Kokkos::parallel_reduce(Kokkos::RangePolicy(0, plan.getLengthsFrom()[i]), KOKKOS_LAMBDA(const size_t j, size_t& total) { - total += numImportPacketsPerLID(curLIDoffset+j); - }, totalPacketsFrom_i); - // totalPacketsFrom_i is converted down to int, so make sure it can be represented - TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX), - std::logic_error, "Tpetra::Distributor::doPostsKokkos(3 args, Kokkos): " - "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large " - "to be represented as int."); - curLIDoffset += plan.getLengthsFrom()[i]; - if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) { - // If my process is receiving these packet(s) from another - // process (not a self-receive), and if there is at least - // one packet to receive: - // - // 1. Set up the persisting view (recvBuf) into the imports - // array, given the offset and size (total number of - // packets from process getProcsFrom()[i]). - // 2. Start the Irecv and save the resulting request. - imports_view_type recvBuf = - subview_offset (imports, curBufferOffset, totalPacketsFrom_i); - requests_.push_back (ireceive (recvBuf, plan.getProcsFrom()[i], - mpiTag_, *plan.getComm())); - } - else { // Receiving these packet(s) from myself - selfReceiveOffset = curBufferOffset; // Remember the offset - } - curBufferOffset += totalPacketsFrom_i; - } - } - -#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS - Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_); -#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS - - // setup views containing starting-offsets into exports for each send, - // and num-packets-to-send for each send. - Kokkos::View sendPacketOffsets("sendPacketOffsets", plan.getNumSends()); - Kokkos::View packetsPerSend("packetsPerSend", plan.getNumSends()); - auto sendPacketOffsets_d = Kokkos::create_mirror_view(ExpExecSpace(), sendPacketOffsets); - auto packetsPerSend_d = Kokkos::create_mirror_view(ExpExecSpace(), packetsPerSend); - - auto starts = Kokkos::Compat::getKokkosViewDeepCopy(plan.getStartsTo()); - auto lengths = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsTo()); - - Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& curPKToffset, bool final_pass) { - if(final_pass) sendPacketOffsets_d(pp) = curPKToffset; - size_t numPackets = 0; - for(size_t j = starts(pp); j < starts(pp) + lengths(pp); j++) { - numPackets += numExportPacketsPerLID(j); - } - if(final_pass) packetsPerSend_d(pp) = numPackets; - curPKToffset += numPackets; - }); - - size_t maxNumPackets; - Kokkos::parallel_reduce(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& max) { - if(packetsPerSend_d(pp) > max) { - max = packetsPerSend_d(pp); - } - }, Kokkos::Max(maxNumPackets)); - - // numPackets will be used as a message length, so make sure it can be represented as int - TEUCHOS_TEST_FOR_EXCEPTION(maxNumPackets > size_t(INT_MAX), - std::logic_error, "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " - "numPackets = " << maxNumPackets << " is too large " - "to be represented as int."); - - Kokkos::deep_copy(sendPacketOffsets, sendPacketOffsets_d); - Kokkos::deep_copy(packetsPerSend, packetsPerSend_d); - - // setup scan through getProcsTo() list starting with higher numbered procs - // (should help balance message traffic) - size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage(); - size_t procIndex = 0; - while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) { - ++procIndex; - } - if (procIndex == numBlocks) { - procIndex = 0; - } - - size_t selfNum = 0; - size_t selfIndex = 0; - if (plan.getIndicesTo().is_null()) { - -#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS - Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_); -#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS - - // Data are already blocked (laid out) by process, so we don't - // need a separate send buffer (besides the exports array). - for (size_t i = 0; i < numBlocks; ++i) { - size_t p = i + procIndex; - if (p > (numBlocks - 1)) { - p -= numBlocks; - } - - if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) { - exports_view_type tmpSend = - subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]); - - if (sendType == Details::DISTRIBUTOR_ISEND) { - exports_view_type tmpSendBuf = - subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]); - requests_.push_back (isend (tmpSendBuf, plan.getProcsTo()[p], - mpiTag_, *plan.getComm())); - } - else { // DISTRIBUTOR_SEND - send (tmpSend, - as (tmpSend.size ()), - plan.getProcsTo()[p], mpiTag_, *plan.getComm()); - } - } - else { // "Sending" the message to myself - selfNum = p; - } - } - - if (plan.hasSelfMessage()) { - deep_copy_offset(imports, exports, selfReceiveOffset, - sendPacketOffsets[selfNum], packetsPerSend[selfNum]); - } - } - else { // data are not blocked by proc, use send buffer - -#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS - Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_); -#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS - - // FIXME (mfh 05 Mar 2013) This may be broken for Isend. - typedef typename ExpView::non_const_value_type Packet; - typedef typename ExpView::array_layout Layout; - typedef typename ExpView::device_type Device; - typedef typename ExpView::memory_traits Mem; - - // This buffer is long enough for only one message at a time. - // Thus, we use DISTRIBUTOR_SEND always in this case, regardless - // of sendType requested by user. - // This code path formerly errored out with message: - // Tpetra::Distributor::doPostsKokkos(4-arg, Kokkos): - // The "send buffer" code path - // doesn't currently work with nonblocking sends. - // Now, we opt to just do the communication in a way that works. -#ifdef HAVE_TPETRA_DEBUG - if (sendType != Details::DISTRIBUTOR_SEND) { - if (plan.getComm()->getRank() == 0) - std::cout << "The requested Tpetra send type " - << DistributorSendTypeEnumToString(sendType) - << " requires Distributor data to be ordered by" - << " the receiving processor rank. Since these" - << " data are not ordered, Tpetra will use Send" - << " instead." << std::endl; - } -#endif - - Kokkos::View sendArray ("sendArray", - maxNumPackets); - - Kokkos::View indicesOffsets ("indicesOffsets", numExportPacketsPerLID.extent(0)); - size_t ioffset = 0; - Kokkos::parallel_scan(Kokkos::RangePolicy(0, numExportPacketsPerLID.extent(0)), KOKKOS_LAMBDA(const size_t j, size_t& offset, bool is_final) { - if(is_final) indicesOffsets(j) = offset; - offset += numExportPacketsPerLID(j); - }, ioffset); - - for (size_t i = 0; i < numBlocks; ++i) { - size_t p = i + procIndex; - if (p > (numBlocks - 1)) { - p -= numBlocks; - } - - if (plan.getProcsTo()[p] != myProcID) { - size_t j = plan.getStartsTo()[p]; - size_t numPacketsTo_p = 0; - //mirror in case execspaces are different - auto sendArrayMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), sendArray); - auto exportsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), exports); - Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getLengthsTo()[p]), KOKKOS_LAMBDA(const size_t k, size_t& offset, bool is_final) { - if(is_final) { - const size_t dst_end = offset + numExportPacketsPerLID(j + k); - const size_t src_end = indicesOffsets(j + k) + numExportPacketsPerLID(j + k); - auto dst_sub = Kokkos::subview(sendArrayMirror, Kokkos::make_pair(offset, dst_end)); - auto src_sub = Kokkos::subview(exportsMirror, Kokkos::make_pair(indicesOffsets(j + k), src_end)); - Kokkos::Experimental::local_deep_copy(dst_sub, src_sub); - } - offset += numExportPacketsPerLID(j + k); - }, numPacketsTo_p); - Kokkos::deep_copy(sendArray, sendArrayMirror); - typename ExpView::execution_space().fence(); - - if (numPacketsTo_p > 0) { - ImpView tmpSend = - subview_offset(sendArray, size_t(0), numPacketsTo_p); - - send (tmpSend, - as (tmpSend.size ()), - plan.getProcsTo()[p], mpiTag_, *plan.getComm()); - } - } - else { // "Sending" the message to myself - selfNum = p; - selfIndex = plan.getStartsTo()[p]; - } - } - - if (plan.hasSelfMessage()) { - //mirror in case execspaces are different - auto importsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), imports); - auto exportsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), exports); - size_t temp; - Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getLengthsTo()[selfNum]), KOKKOS_LAMBDA(const size_t k, size_t& offset, bool is_final) { - if(is_final) { - const size_t dst_end = selfReceiveOffset + offset + numExportPacketsPerLID(selfIndex + k); - const size_t src_end = indicesOffsets(selfIndex + k) + numExportPacketsPerLID(selfIndex + k); - auto dst_sub = Kokkos::subview(importsMirror, Kokkos::make_pair(selfReceiveOffset + offset, dst_end)); - auto src_sub = Kokkos::subview(exportsMirror, Kokkos::make_pair(indicesOffsets(selfIndex + k), src_end)); - Kokkos::Experimental::local_deep_copy(dst_sub, src_sub); - } - offset += numExportPacketsPerLID(selfIndex + k); - }, temp); - Kokkos::deep_copy(imports, importsMirror); - selfIndex += plan.getLengthsTo()[selfNum]; - selfReceiveOffset += temp; - } - } -} - } } diff --git a/packages/tpetra/core/src/Tpetra_Distributor.hpp b/packages/tpetra/core/src/Tpetra_Distributor.hpp index a8beece8ee9d..c0c31a0f8b54 100644 --- a/packages/tpetra/core/src/Tpetra_Distributor.hpp +++ b/packages/tpetra/core/src/Tpetra_Distributor.hpp @@ -23,7 +23,6 @@ #include "KokkosCompat_View.hpp" #include "Kokkos_Core.hpp" #include "Kokkos_TeuchosCommAdapters.hpp" -#include "Kokkos_StdAlgorithms.hpp" #include #include #include @@ -427,13 +426,6 @@ namespace Tpetra { const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - doPostsAndWaitsKokkos (const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); - /// \brief Post the data for a forward plan, but do not execute the waits yet. /// /// Call this overload when you have the same number of Packets @@ -488,13 +480,6 @@ namespace Tpetra { const Teuchos::ArrayView& numExportPacketsPerLID, const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - doPostsKokkos (const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); /// \brief Execute the reverse communication plan. /// @@ -516,14 +501,7 @@ namespace Tpetra { const Teuchos::ArrayView& numExportPacketsPerLID, const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - doReversePostsAndWaitsKokkos (const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); - + /// \brief Post the data for a reverse plan, but do not execute the waits yet. /// /// This method takes the same arguments as the three-argument @@ -544,14 +522,7 @@ namespace Tpetra { const Teuchos::ArrayView& numExportPacketsPerLID, const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - doReversePostsKokkos (const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID); - + //@} //! @name Implementation of Teuchos::Describable //@{ @@ -669,16 +640,6 @@ namespace Tpetra { actor_.doPostsAndWaits(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); } - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - Distributor:: - doPostsAndWaitsKokkos (const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID) - { - actor_.doPostsAndWaitsKokkos(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); - } template typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type @@ -700,17 +661,6 @@ namespace Tpetra { { actor_.doPosts(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); } - - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - Distributor:: - doPostsKokkos (const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID) - { - actor_.doPostsKokkos(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); - } template typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type @@ -735,19 +685,6 @@ namespace Tpetra { numImportPacketsPerLID); doReverseWaits (); } - - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - Distributor:: - doReversePostsAndWaitsKokkos (const ExpView& exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView& imports, - const ImpPacketsView &numImportPacketsPerLID) - { - doReversePostsKokkos (exports, numExportPacketsPerLID, imports, - numImportPacketsPerLID); - doReverseWaits (); - } template typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type @@ -786,27 +723,7 @@ namespace Tpetra { reverseDistributor_->doPosts (exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); } - - template - typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type - Distributor:: - doReversePostsKokkos (const ExpView &exports, - const ExpPacketsView &numExportPacketsPerLID, - const ImpView &imports, - const ImpPacketsView &numImportPacketsPerLID) - { - // FIXME (mfh 29 Mar 2012) WHY? - TEUCHOS_TEST_FOR_EXCEPTION( - ! plan_.getIndicesTo().is_null(), std::runtime_error, - "Tpetra::Distributor::doReversePosts(3 args): Can only do " - "reverse communication when original data are blocked by process."); - if (reverseDistributor_.is_null ()) { - createReverseDistributor (); - } - reverseDistributor_->doPostsKokkos (exports, numExportPacketsPerLID, - imports, numImportPacketsPerLID); - } - + template void Distributor:: computeSends(const Teuchos::ArrayView& importGIDs,