Skip to content

Commit

Permalink
Merge Pull Request #9605 from trilinos/Trilinos/tasmit/distobject-loc…
Browse files Browse the repository at this point in the history
…al-actor

Automatically Merged using Trilinos Pull Request AutoTester
PR Title: Tpetra:  Make DistObject use a local DistributorActor
PR Author: tasmith4
  • Loading branch information
trilinos-autotester authored Aug 27, 2021
2 parents c76c189 + e0f107c commit 1b009a4
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 103 deletions.
13 changes: 5 additions & 8 deletions packages/tpetra/core/src/Tpetra_DistObject_decl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
/// \file Tpetra_DistObject_decl.hpp
/// \brief Declaration of the Tpetra::DistObject class

#include "Tpetra_Details_DistributorActor.hpp"
#include "Tpetra_Map.hpp"
#include "Tpetra_Import.hpp"
#include "Tpetra_Export.hpp"
Expand Down Expand Up @@ -783,25 +784,19 @@ namespace Tpetra {
const CombineMode CM,
const bool restrictedMode);

void doPosts(Distributor& distor,
void doPosts(const Details::DistributorPlan& distributorPlan,
size_t constantNumPackets,
bool commOnHost,
ReverseOption revOp,
std::shared_ptr<std::string> prefix,
const bool canTryAliasing,
const CombineMode CM);

void doWaits(Distributor& distor,
ReverseOption revOp);

void doPackAndPrepare(const SrcDistObject& src,
const Kokkos::DualView<const local_ordinal_type*, buffer_device_type>& exportLIDs,
size_t& constantNumPackets,
Distributor& distor);
size_t& constantNumPackets);

void doUnpackAndCombine(const Kokkos::DualView<const local_ordinal_type*, buffer_device_type>& remoteLIDs,
size_t constantNumPackets,
Distributor& distor,
CombineMode CM);

/// \name Methods implemented by subclasses and used by doTransfer().
Expand Down Expand Up @@ -1038,6 +1033,8 @@ namespace Tpetra {
private:
using this_type = DistObject<Packet, LocalOrdinal, GlobalOrdinal, Node>;

Details::DistributorActor distributorActor_;

#ifdef HAVE_TPETRA_TRANSFER_TIMERS
Teuchos::RCP<Teuchos::Time> doXferTimer_;
Teuchos::RCP<Teuchos::Time> copyAndPermuteTimer_;
Expand Down
131 changes: 36 additions & 95 deletions packages/tpetra/core/src/Tpetra_DistObject_def.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ namespace Tpetra {

const size_t numSameIDs = transfer.getNumSameIDs ();
Distributor& distor = transfer.getDistributor ();
const Details::DistributorPlan& distributorPlan = (revOp == DoForward) ? distor.getPlan() : *distor.getPlan().getReversePlan();

TEUCHOS_TEST_FOR_EXCEPTION
(debug && restrictedMode &&
Expand Down Expand Up @@ -1049,7 +1050,7 @@ namespace Tpetra {
std::cerr << os.str ();
}

doPackAndPrepare(src, exportLIDs, constantNumPackets, distor);
doPackAndPrepare(src, exportLIDs, constantNumPackets);
if (commOnHost) {
this->exports_.sync_host();
}
Expand Down Expand Up @@ -1122,7 +1123,7 @@ namespace Tpetra {
std::cerr << os.str ();
}

doPosts(distor, constantNumPackets, commOnHost, revOp, prefix, canTryAliasing, CM);
doPosts(distributorPlan, constantNumPackets, commOnHost, prefix, canTryAliasing, CM);
} // if (needCommunication)
} // if (CM != ZERO)
}
Expand Down Expand Up @@ -1241,6 +1242,7 @@ namespace Tpetra {
}

Distributor& distor = transfer.getDistributor ();
const Details::DistributorPlan& distributorPlan = (revOp == DoForward) ? distor.getPlan() : *distor.getPlan().getReversePlan();

TEUCHOS_TEST_FOR_EXCEPTION
(debug && restrictedMode &&
Expand Down Expand Up @@ -1319,14 +1321,14 @@ namespace Tpetra {
}
}
else {
doWaits(distor, revOp);
distributorActor_.doWaits(distributorPlan);

if (verbose) {
std::ostringstream os;
os << *prefix << "8. unpackAndCombine" << endl;
std::cerr << os.str ();
}
doUnpackAndCombine(remoteLIDs, constantNumPackets, distor, CM);
doUnpackAndCombine(remoteLIDs, constantNumPackets, CM);
} // if (needCommunication)
} // if (CM != ZERO)

Expand All @@ -1346,10 +1348,9 @@ namespace Tpetra {
template <class Packet, class LocalOrdinal, class GlobalOrdinal, class Node>
void
DistObject<Packet, LocalOrdinal, GlobalOrdinal, Node>::
doPosts(Distributor& distor,
doPosts(const Details::DistributorPlan& distributorPlan,
size_t constantNumPackets,
bool commOnHost,
ReverseOption revOp,
std::shared_ptr<std::string> prefix,
const bool canTryAliasing,
const CombineMode CM)
Expand Down Expand Up @@ -1385,17 +1386,11 @@ namespace Tpetra {
// MPI communication happens here.
if (verbose) {
std::ostringstream os;
os << *prefix << "Call do"
<< (revOp == DoReverse ? "Reverse" : "") << "PostsAndWaits"
os << *prefix << "Call doPostsAndWaits"
<< endl;
std::cerr << os.str ();
}
if (revOp == DoReverse) {
distor.doReversePostsAndWaits (numExp_h, 1, numImp_h);
}
else {
distor.doPostsAndWaits (numExp_h, 1, numImp_h);
}
distributorActor_.doPostsAndWaits(distributorPlan, numExp_h, 1, numImp_h);

if (verbose) {
std::ostringstream os;
Expand All @@ -1416,17 +1411,11 @@ namespace Tpetra {
// MPI communication happens here.
if (verbose) {
std::ostringstream os;
os << *prefix << "Call do"
<< (revOp == DoReverse ? "Reverse" : "") << "PostsAndWaits"
os << *prefix << "Call doPostsAndWaits"
<< endl;
std::cerr << os.str ();
}
if (revOp == DoReverse) {
distor.doReversePostsAndWaits (numExp_d, 1, numImp_d);
}
else {
distor.doPostsAndWaits (numExp_d, 1, numImp_d);
}
distributorActor_.doPostsAndWaits(distributorPlan, numExp_d, 1, numImp_d);

if (verbose) {
std::ostringstream os;
Expand Down Expand Up @@ -1479,45 +1468,28 @@ namespace Tpetra {
std::ostringstream os;
os << *prefix << "Comm on "
<< (commOnHost ? "host" : "device")
<< "; call do" << (revOp == DoReverse ? "Reverse" : "")
<< "PostsAndWaits" << endl;
<< "; call doPosts" << endl;
std::cerr << os.str ();
}

if (commOnHost) {
this->imports_.modify_host ();
if (revOp == DoReverse) {
distor.doReversePosts
(create_const_view (this->exports_.view_host ()),
numExportPacketsPerLID_av,
this->imports_.view_host (),
numImportPacketsPerLID_av);
}
else {
distor.doPosts
(create_const_view (this->exports_.view_host ()),
numExportPacketsPerLID_av,
this->imports_.view_host (),
numImportPacketsPerLID_av);
}
distributorActor_.doPosts
(distributorPlan,
create_const_view (this->exports_.view_host ()),
numExportPacketsPerLID_av,
this->imports_.view_host (),
numImportPacketsPerLID_av);
}
else { // pack on device
Kokkos::fence(); // for UVM
this->imports_.modify_device ();
if (revOp == DoReverse) {
distor.doReversePosts
(create_const_view (this->exports_.view_device ()),
numExportPacketsPerLID_av,
this->imports_.view_device (),
numImportPacketsPerLID_av);
}
else {
distor.doPosts
(create_const_view (this->exports_.view_device ()),
numExportPacketsPerLID_av,
this->imports_.view_device (),
numImportPacketsPerLID_av);
}
distributorActor_.doPosts
(distributorPlan,
create_const_view (this->exports_.view_device ()),
numExportPacketsPerLID_av,
this->imports_.view_device (),
numImportPacketsPerLID_av);
}
}
else { // constant number of packets per LID
Expand All @@ -1543,65 +1515,35 @@ namespace Tpetra {
std::ostringstream os;
os << *prefix << "7.2. Comm on "
<< (commOnHost ? "host" : "device")
<< "; call do" << (revOp == DoReverse ? "Reverse" : "")
<< "PostsAndWaits" << endl;
<< "; call doPosts" << endl;
std::cerr << os.str ();
}
if (commOnHost) {
this->imports_.modify_host ();
if (revOp == DoReverse) {
distor.doReversePosts
(create_const_view (this->exports_.view_host ()),
constantNumPackets,
this->imports_.view_host ());
}
else {
distor.doPosts
(create_const_view (this->exports_.view_host ()),
constantNumPackets,
this->imports_.view_host ());
}
distributorActor_.doPosts
(distributorPlan,
create_const_view (this->exports_.view_host ()),
constantNumPackets,
this->imports_.view_host ());
}
else { // pack on device
Kokkos::fence(); // for UVM
this->imports_.modify_device ();
if (revOp == DoReverse) {
distor.doReversePosts
(create_const_view (this->exports_.view_device ()),
constantNumPackets,
this->imports_.view_device ());
}
else {
distor.doPosts
(create_const_view (this->exports_.view_device ()),
constantNumPackets,
this->imports_.view_device ());
}
distributorActor_.doPosts
(distributorPlan,
create_const_view (this->exports_.view_device ()),
constantNumPackets,
this->imports_.view_device ());
} // commOnHost
} // constant or variable num packets per LID
}

template <class Packet, class LocalOrdinal, class GlobalOrdinal, class Node>
void
DistObject<Packet, LocalOrdinal, GlobalOrdinal, Node>::
doWaits(Distributor& distor,
ReverseOption revOp)
{
if (revOp == DoReverse) {
distor.doReverseWaits();
}
else {
distor.doWaits();
}
}

template <class Packet, class LocalOrdinal, class GlobalOrdinal, class Node>
void
DistObject<Packet, LocalOrdinal, GlobalOrdinal, Node>::
doPackAndPrepare(const SrcDistObject& src,
const Kokkos::DualView<const local_ordinal_type*, buffer_device_type>& exportLIDs,
size_t& constantNumPackets,
Distributor& distor)
size_t& constantNumPackets)
{
using Details::ProfilingRegion;
using std::endl;
Expand Down Expand Up @@ -1668,7 +1610,6 @@ namespace Tpetra {
DistObject<Packet, LocalOrdinal, GlobalOrdinal, Node>::
doUnpackAndCombine(const Kokkos::DualView<const local_ordinal_type*, buffer_device_type>& remoteLIDs,
size_t constantNumPackets,
Distributor& distor,
CombineMode CM)
{
using Details::ProfilingRegion;
Expand Down
5 changes: 5 additions & 0 deletions packages/tpetra/core/src/Tpetra_Distributor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,11 @@ namespace Tpetra {
Teuchos::Describable::verbLevel_default) const;
//@}

/// \brief Get this Distributor's DistributorPlan
///
/// FIXME: Delete this method when it's no longer needed for non-blocking
/// communication in the DistObject
const Details::DistributorPlan& getPlan() const { return plan_; }
private:
Details::DistributorPlan plan_;
Details::DistributorActor actor_;
Expand Down

0 comments on commit 1b009a4

Please sign in to comment.