Tpetra: Use a consistent MPI tag for DistributorActor #10675

Merged: 8 commits, Jul 12, 2022
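For context on the change itself: MPI matches a message to a posted receive only when the communicator, the source and destination ranks, and the tag all agree. Below is a minimal sketch of that invariant in plain MPI, not Tpetra's Teuchos wrappers; `kTag` and the ring exchange are illustrative, not taken from this PR. Each rank posts its nonblocking receive first, exactly as `doPosts()` does, and the receive completes only because the matching send uses the same tag.

```cpp
// Minimal tag-matching sketch in plain MPI. Build with an MPI compiler
// wrapper (e.g. mpicxx) and run with mpirun.
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0, size = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int kTag = 1;  // one fixed tag, analogous to mpiTag_ below

  // Ring exchange: receive from the left neighbor, send to the right.
  const int left  = (rank + size - 1) % size;
  const int right = (rank + 1) % size;

  int recvVal = -1;
  MPI_Request req;
  // Post the receive first, as doPosts() does with ireceive().
  MPI_Irecv(&recvVal, 1, MPI_INT, left, kTag, MPI_COMM_WORLD, &req);

  // If this send used any other tag, the receive above would never match.
  int sendVal = rank;
  MPI_Send(&sendVal, 1, MPI_INT, right, kTag, MPI_COMM_WORLD);

  MPI_Wait(&req, MPI_STATUS_IGNORE);
  std::printf("rank %d got %d from rank %d\n", rank, recvVal, left);

  MPI_Finalize();
  return 0;
}
```

Run with, say, `mpirun -np 4`. Before this PR the tag came from `plan.getTag(pathTag)` at every call site; after it, `DistributorActor` carries one `mpiTag_` for its lifetime, so all of its posts match by construction.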
@@ -42,14 +42,17 @@
namespace Tpetra {
namespace Details {

-DistributorActor::DistributorActor() {
+DistributorActor::DistributorActor()
+: mpiTag_(DEFAULT_MPI_TAG)
+{
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
makeTimers();
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
}

DistributorActor::DistributorActor(const DistributorActor& otherActor)
-: requests_(otherActor.requests_)
+: mpiTag_(otherActor.mpiTag_),
+requests_(otherActor.requests_)
{
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
makeTimers();
packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp (13 additions, 27 deletions)
@@ -48,13 +48,16 @@
#include "Teuchos_RCP.hpp"
#include "Teuchos_Time.hpp"

#include "Kokkos_TeuchosCommAdapters.hpp"

namespace Tpetra {
namespace Details {

template <class View1, class View2>
constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;

class DistributorActor {
+static constexpr int DEFAULT_MPI_TAG = 1;

public:
DistributorActor();
@@ -91,6 +94,8 @@ class DistributorActor {
bool isReady() const;

private:
+int mpiTag_;
+
Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
@@ -191,14 +196,6 @@ void DistributorActor::doPosts(const DistributorPlan& plan,

size_t selfReceiveOffset = 0;

-// MPI tag for nonblocking receives and blocking sends in this
-// method. Some processes might take the "fast" path
-// (getIndicesTo().is_null()) and others might take the "slow" path for
-// the same doPosts() call, so the path tag must be the same for
-// both.
-const int pathTag = 0;
-const int tag = plan.getTag(pathTag);

#ifdef HAVE_TPETRA_DEBUG
TEUCHOS_TEST_FOR_EXCEPTION
(requests_.size () != 0,
@@ -255,7 +252,7 @@
imports_view_type recvBuf =
subview_offset (imports, curBufferOffset, curBufLen);
requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
-tag, *plan.getComm()));
+mpiTag_, *plan.getComm()));
}
else { // Receiving from myself
selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
@@ -308,12 +305,12 @@ void DistributorActor::doPosts(const DistributorPlan& plan,
subview_offset (exports, plan.getStartsTo()[p] * numPackets,
plan.getLengthsTo()[p] * numPackets);
requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
-tag, *plan.getComm()));
+mpiTag_, *plan.getComm()));
}
else { // DISTRIBUTOR_SEND
send<int> (tmpSend,
as<int> (tmpSend.size ()),
-plan.getProcsTo()[p], tag, *plan.getComm());
+plan.getProcsTo()[p], mpiTag_, *plan.getComm());
}
}
else { // "Sending" the message to myself
@@ -387,7 +384,7 @@ void DistributorActor::doPosts(const DistributorPlan& plan,

send<int> (tmpSend,
as<int> (tmpSend.size ()),
-plan.getProcsTo()[p], tag, *plan.getComm());
+plan.getProcsTo()[p], mpiTag_, *plan.getComm());
}
else { // "Sending" the message to myself
selfNum = p;
@@ -467,17 +464,6 @@ void DistributorActor::doPosts(const DistributorPlan& plan,
"enough entries to hold the expected number of import packets. "
"imports.extent(0) = " << imports.extent (0) << " < "
"totalNumImportPackets = " << totalNumImportPackets << ".");
-#endif // HAVE_TPETRA_DEBUG
-
-// MPI tag for nonblocking receives and blocking sends in this
-// method. Some processes might take the "fast" path
-// (plan.getIndicesTo().is_null()) and others might take the "slow" path for
-// the same doPosts() call, so the path tag must be the same for
-// both.
-const int pathTag = 1;
-const int tag = plan.getTag(pathTag);
-
-#ifdef HAVE_TPETRA_DEBUG
TEUCHOS_TEST_FOR_EXCEPTION
(requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
"doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
@@ -530,7 +516,7 @@ void DistributorActor::doPosts(const DistributorPlan& plan,
imports_view_type recvBuf =
subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
-tag, *plan.getComm()));
+mpiTag_, *plan.getComm()));
}
else { // Receiving these packet(s) from myself
selfReceiveOffset = curBufferOffset; // Remember the offset
@@ -594,12 +580,12 @@ void DistributorActor::doPosts(const DistributorPlan& plan,
exports_view_type tmpSendBuf =
subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
-tag, *plan.getComm()));
+mpiTag_, *plan.getComm()));
}
else { // DISTRIBUTOR_SEND
send<int> (tmpSend,
as<int> (tmpSend.size ()),
-plan.getProcsTo()[p], tag, *plan.getComm());
+plan.getProcsTo()[p], mpiTag_, *plan.getComm());
}
}
else { // "Sending" the message to myself
@@ -676,7 +662,7 @@ void DistributorActor::doPosts(const DistributorPlan& plan,

send<int> (tmpSend,
as<int> (tmpSend.size ()),
-plan.getProcsTo()[p], tag, *plan.getComm());
+plan.getProcsTo()[p], mpiTag_, *plan.getComm());
}
}
else { // "Sending" the message to myself
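The net effect of the header changes above: the tag is fixed once at construction instead of being derived per call, so the "fast" contiguous path and the "slow" packed path of the same `doPosts()` call can never disagree. A hedged sketch of that pattern in plain MPI follows; `MiniActor` and its methods are invented for illustration and are not Tpetra's actual interface.

```cpp
// Sketch: an actor that owns its tag, so every send path uses the same one.
#include <mpi.h>
#include <vector>

class MiniActor {
  static constexpr int kDefaultTag = 1;  // mirrors DEFAULT_MPI_TAG = 1
  int tag_;
  std::vector<MPI_Request> requests_;

public:
  MiniActor() : tag_(kDefaultTag) {}

  // "Fast" path: the data is already contiguous, send it in place.
  void sendContiguous(const double* buf, int n, int dest, MPI_Comm comm) {
    MPI_Request req;
    MPI_Isend(buf, n, MPI_DOUBLE, dest, tag_, comm, &req);
    requests_.push_back(req);
  }

  // "Slow" path: gather scattered entries into a temporary buffer first.
  // Both paths use tag_, so receivers match either one identically.
  void sendPacked(const std::vector<double>& src,
                  const std::vector<int>& indices, int dest, MPI_Comm comm) {
    std::vector<double> packed;
    packed.reserve(indices.size());
    for (int i : indices) packed.push_back(src[i]);
    MPI_Send(packed.data(), static_cast<int>(packed.size()), MPI_DOUBLE,
             dest, tag_, comm);
  }

  void waitAll() {
    MPI_Waitall(static_cast<int>(requests_.size()), requests_.data(),
                MPI_STATUSES_IGNORE);
    requests_.clear();
  }
};
```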
packages/tpetra/core/src/Tpetra_Details_DistributorPlan.cpp (3 additions, 12 deletions)
@@ -86,7 +86,6 @@ DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
reversePlan_(Teuchos::null),
sendType_(DISTRIBUTOR_SEND),
-useDistinctTags_(useDistinctTags_default),
sendMessageToSelf_(false),
numSendsToOtherProcs_(0),
maxSendLength_(0),
@@ -99,7 +98,6 @@ DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
reversePlan_(otherPlan.reversePlan_),
sendType_(otherPlan.sendType_),
-useDistinctTags_(otherPlan.useDistinctTags_),
sendMessageToSelf_(otherPlan.sendMessageToSelf_),
numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
procIdsToSendTo_(otherPlan.procIdsToSendTo_),
Expand All @@ -115,10 +113,6 @@ DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
indicesFrom_(otherPlan.indicesFrom_)
{ }

-int DistributorPlan::getTag(const int pathTag) const {
-return useDistinctTags_ ? pathTag : comm_->getTag();
-}

size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
using Teuchos::outArg;
using Teuchos::REDUCE_MAX;
@@ -620,7 +614,6 @@ void DistributorPlan::createReversePlan() const
reversePlan_->procsFrom_ = procIdsToSendTo_;
reversePlan_->startsFrom_ = startsTo_;
reversePlan_->indicesFrom_ = indicesTo_;
-reversePlan_->useDistinctTags_ = useDistinctTags_;
}

void DistributorPlan::computeReceives()
@@ -643,9 +636,7 @@ void DistributorPlan::computeReceives()
const int myRank = comm_->getRank();
const int numProcs = comm_->getSize();

-// MPI tag for nonblocking receives and blocking sends in this method.
-const int pathTag = 2;
-const int tag = getTag(pathTag);
+const int mpiTag = DEFAULT_MPI_TAG;

// toProcsFromMe[i] == the number of messages sent by this process
// to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
@@ -787,7 +778,7 @@
lengthsFromBuffers[i].resize (1);
lengthsFromBuffers[i][0] = as<size_t> (0);
requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
-tag, *comm_);
+mpiTag, *comm_);
}

// Post the sends: Tell each process to which we are sending how
Expand All @@ -804,7 +795,7 @@ void DistributorPlan::computeReceives()
// this communication pattern will send that process
// lengthsTo_[i] blocks of packets.
const size_t* const lengthsTo_i = &lengthsTo_[i];
-send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), tag, *comm_);
+send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
}
else {
// We don't need a send in the self-message case. If this
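`computeReceives()` above keeps the same discipline but with one twist: a process does not know in advance who will send to it, so it posts receives from `MPI_ANY_SOURCE` and relies on the fixed tag to scope the matches to this exchange. A sketch of that shape in plain MPI; `exchangeLengths` and its parameters are invented for illustration, and it assumes self-messages have already been filtered out, as in the code above.

```cpp
// Sketch: each rank learns how much data it will receive from each sender.
#include <mpi.h>
#include <cstddef>
#include <vector>

void exchangeLengths(int numReceives,                    // known in advance
                     const std::vector<int>& destRanks,  // who we send to
                     const std::vector<std::size_t>& lengthsTo,
                     std::vector<std::size_t>& lengthsFrom,
                     MPI_Comm comm) {
  const int kTag = 0;  // mirrors DistributorPlan's DEFAULT_MPI_TAG = 0

  lengthsFrom.assign(numReceives, 0);
  std::vector<MPI_Request> requests(numReceives);

  // Senders are unknown, so receive from any source; the fixed tag keeps
  // these matches separate from other traffic on the communicator.
  for (int i = 0; i < numReceives; ++i) {
    MPI_Irecv(&lengthsFrom[i], static_cast<int>(sizeof(std::size_t)),
              MPI_BYTE, MPI_ANY_SOURCE, kTag, comm, &requests[i]);
  }

  // Tell every destination how many entries it will get from us.
  for (std::size_t i = 0; i < destRanks.size(); ++i) {
    MPI_Send(&lengthsTo[i], static_cast<int>(sizeof(std::size_t)),
             MPI_BYTE, destRanks[i], kTag, comm);
  }

  MPI_Waitall(numReceives, requests.data(), MPI_STATUSES_IGNORE);
}
```

Sending raw `size_t` bytes assumes homogeneous hardware; the real code goes through the Teuchos communication layer, which handles the type mapping.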
packages/tpetra/core/src/Tpetra_Details_DistributorPlan.hpp (2 additions, 24 deletions)
@@ -49,10 +49,6 @@
namespace Tpetra {
namespace Details {

-namespace {
-const bool useDistinctTags_default = true;
-}

/// \brief The type of MPI send that Distributor should use.
///
/// This is an implementation detail of Distributor. Please do
@@ -100,16 +96,12 @@ DistributorHowInitializedEnumToString (EDistributorHowInitialized how);
/// sends, this prevents deadlock, even if MPI_Send blocks and
/// does not buffer.)
class DistributorPlan : public Teuchos::ParameterListAcceptorDefaultBase {
+static constexpr int DEFAULT_MPI_TAG = 0;

public:
DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm);
DistributorPlan(const DistributorPlan& otherPlan);

-//! Get the tag to use for receives and sends.
-///
-/// See useDistinctTags_. This is called in doPosts() (both
-/// variants) and computeReceives().
-int getTag(const int pathTag) const;

size_t createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs);
void createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs);
void createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
@@ -121,7 +113,6 @@ class DistributorPlan : public Teuchos::ParameterListAcceptorDefaultBase {

Teuchos::RCP<const Teuchos::Comm<int>> getComm() const { return comm_; }
EDistributorSendType getSendType() const { return sendType_; }
-bool useDistinctTags() const { return useDistinctTags_; }
size_t getNumReceives() const { return numReceives_; }
size_t getNumSends() const { return numSendsToOtherProcs_; }
bool hasSelfMessage() const { return sendMessageToSelf_; }
@@ -159,19 +150,6 @@ class DistributorPlan : public Teuchos::ParameterListAcceptorDefaultBase {
//! @name Parameters read in from the Teuchos::ParameterList
//@{
EDistributorSendType sendType_;

-/// \brief Whether to use different tags for different code paths.
-///
-/// There are currently three code paths in Distributor that post
-/// receives and sends:
-///
-/// 1. Three-argument variant of doPosts()
-/// 2. Four-argument variant of doPosts()
-/// 3. computeReceives()
-///
-/// If this option is true, Distributor will use a distinct
-/// message tag for each of these paths.
-bool useDistinctTags_;
//@}

bool sendMessageToSelf_;
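One detail worth noting from the two headers: the defaults differ. `DistributorActor` posts with `DEFAULT_MPI_TAG = 1` while `DistributorPlan::computeReceives()` uses `DEFAULT_MPI_TAG = 0`, so as far as the diff shows, the plan's size-exchange messages and the actor's data messages cannot match each other even when both are in flight on the same communicator. That preserves the separation the removed `useDistinctTags_` machinery provided, with two fixed constants in place of per-path tags.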