Skip to content

Commit

Permalink
Split PaRSEC backend ttg.h to make it maintainable.
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph Schuchart <[email protected]>
  • Loading branch information
devreal committed Oct 13, 2022
1 parent 93043ed commit d9a0bef
Show file tree
Hide file tree
Showing 12 changed files with 2,823 additions and 2,707 deletions.
8 changes: 8 additions & 0 deletions ttg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,18 @@ endif(TARGET MADworld)
########################
if (TARGET PaRSEC::parsec)
set(ttg-parsec-headers
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/comm_helper.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/copy_handler.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/funcs.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/fwd.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/hooks.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/import.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/task.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/tt.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/ttg.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/ttg_data_copy.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/vars.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/world.h
)
find_package(MPI)
set(ttg-parsec-deps "ttg;MPI::MPI_CXX;PaRSEC::parsec")
Expand Down
137 changes: 137 additions & 0 deletions ttg/ttg/parsec/comm_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#ifndef TTG_PARSEC_COMM_HELPER_H
#define TTG_PARSEC_COMM_HELPER_H

#include <parsec.h>

#include <map>
#include <mutex>
#include <functional>

#include "ttg/impl_selector.h"

#include "ttg/util/trace.h"
#include "ttg/base/tt.h"
#include "ttg/world.h"

#include "ttg/parsec/world.h"
#include "ttg/parsec/ttg_data_copy.h"

namespace ttg_parsec {
namespace detail {

typedef void (*static_set_arg_fct_type)(void *, size_t, ttg::TTBase *);
typedef std::pair<static_set_arg_fct_type, ttg::TTBase *> static_set_arg_fct_call_t;
inline std::map<uint64_t, static_set_arg_fct_call_t> static_id_to_op_map;
inline std::mutex static_map_mutex;
typedef std::tuple<int, void *, size_t> static_set_arg_fct_arg_t;
inline std::multimap<uint64_t, static_set_arg_fct_arg_t> delayed_unpack_actions;

struct msg_header_t {
typedef enum {
MSG_SET_ARG = 0,
MSG_SET_ARGSTREAM_SIZE = 1,
MSG_FINALIZE_ARGSTREAM_SIZE = 2,
MSG_GET_FROM_PULL =3 } fn_id_t;
uint32_t taskpool_id;
uint64_t op_id;
fn_id_t fn_id;
int32_t param_id;
int num_keys;
};

struct msg_t {
msg_header_t tt_id;
unsigned char bytes[WorldImpl::PARSEC_TTG_MAX_AM_SIZE - sizeof(msg_header_t)];

msg_t() = default;
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int num_keys = 1)
: tt_id{taskpool_id, tt_id, fn_id, param_id, num_keys} {}
};

inline int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
int src_rank, void *obj) {
static_set_arg_fct_type static_set_arg_fct;
parsec_taskpool_t *tp = NULL;
msg_header_t *msg = static_cast<msg_header_t *>(data);
uint64_t op_id = msg->op_id;
tp = parsec_taskpool_lookup(msg->taskpool_id);
assert(NULL != tp);
static_map_mutex.lock();
try {
auto op_pair = static_id_to_op_map.at(op_id);
static_map_mutex.unlock();
tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
static_set_arg_fct = op_pair.first;
static_set_arg_fct(data, size, op_pair.second);
tp->tdm.module->incoming_message_end(tp, NULL);
return 0;
} catch (const std::out_of_range &e) {
void *data_cpy = malloc(size);
assert(data_cpy != 0);
memcpy(data_cpy, data, size);
ttg::trace("ttg_parsec(", ttg_default_execution_context().rank(), ") Delaying delivery of message (", src_rank,
", ", op_id, ", ", data_cpy, ", ", size, ")");
delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, data_cpy, size)));
static_map_mutex.unlock();
return 1;
}
}

template <typename ActivationT>
inline int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl, size_t size, int remote,
void *cb_data) {
parsec_ce.mem_unregister(&lreg);
ActivationT *activation = static_cast<ActivationT *>(cb_data);
if (activation->complete_transfer()) {
delete activation;
}
return PARSEC_SUCCESS;
}

inline int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
int src, void *cb_data) {
std::intptr_t *fn_ptr = static_cast<std::intptr_t *>(msg);
std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(*fn_ptr);
(*fn)();
delete fn;
return PARSEC_SUCCESS;
}

template <typename FuncT>
inline int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
int src, void *cb_data) {
std::intptr_t *iptr = static_cast<std::intptr_t *>(msg);
FuncT *fn_ptr = reinterpret_cast<FuncT *>(*iptr);
(*fn_ptr)();
delete fn_ptr;
return PARSEC_SUCCESS;
}

template <typename KeyT, typename ActivationCallbackT>
class rma_delayed_activate {
std::vector<KeyT> _keylist;
std::atomic<int> _outstanding_transfers;
ActivationCallbackT _cb;
detail::ttg_data_copy_t *_copy;

public:
rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
: _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}

bool complete_transfer(void) {
int left = --_outstanding_transfers;
if (0 == left) {
_cb(std::move(_keylist), _copy);
return true;
}
return false;
}
};


} // namespace detail

} // namespace ttg_parsec

#endif // TTG_PARSEC_COMM_HELPER_H
95 changes: 95 additions & 0 deletions ttg/ttg/parsec/copy_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#ifndef TTG_PARSEC_COPY_HANDLER_H
#define TTG_PARSEC_COPY_HANDLER_H


#include "ttg/runtimes.h"
#include "ttg/func.h"

#include "ttg/parsec/ttg_data_copy.h"
#include "ttg/parsec/vars.h"


/**
* The PaRSEC backend tracks data copies so we make a copy of the data
* if the data is not being tracked yet or if the data is not const, i.e.,
* the user may mutate the data after it was passed to send/broadcast.
*/
template <>
struct ttg::detail::value_copy_handler<ttg::Runtime::PaRSEC> {
private:
ttg_parsec::detail::ttg_data_copy_t *copy_to_remove = nullptr;

public:
~value_copy_handler() {
if (nullptr != copy_to_remove) {
ttg_parsec::detail::remove_data_copy(copy_to_remove, ttg_parsec::detail::parsec_ttg_caller);
ttg_parsec::detail::release_data_copy(copy_to_remove);
}
}

template <typename Value>
inline Value &&operator()(Value &&value) {
if (nullptr == ttg_parsec::detail::parsec_ttg_caller) {
ttg::print("ERROR: ttg_send or ttg_broadcast called outside of a task!\n");
}
ttg_parsec::detail::ttg_data_copy_t *copy;
copy = ttg_parsec::detail::find_copy_in_task(ttg_parsec::detail::parsec_ttg_caller, &value);
Value *value_ptr = &value;
if (nullptr == copy) {
/**
* the value is not known, create a copy that we can track
* depending on Value, this uses either the copy or move constructor
*/
copy = ttg_parsec::detail::create_new_datacopy(std::forward<Value>(value));
bool inserted = ttg_parsec::detail::add_copy_to_task(copy, ttg_parsec::detail::parsec_ttg_caller);
assert(inserted);
value_ptr = reinterpret_cast<Value *>(copy->device_private);
copy_to_remove = copy;
} else {
/* this copy won't be modified anymore so mark it as read-only */
copy->reset_readers();
}
return std::move(*value_ptr);
}

template <typename Value>
inline const Value &operator()(const Value &value) {
if (nullptr == ttg_parsec::detail::parsec_ttg_caller) {
ttg::print("ERROR: ttg_send or ttg_broadcast called outside of a task!\n");
}
ttg_parsec::detail::ttg_data_copy_t *copy;
copy = ttg_parsec::detail::find_copy_in_task(ttg_parsec::detail::parsec_ttg_caller, &value);
const Value *value_ptr = &value;
if (nullptr == copy) {
/**
* the value is not known, create a copy that we can track
* depending on Value, this uses either the copy or move constructor
*/
copy = ttg_parsec::detail::create_new_datacopy(value);
bool inserted = ttg_parsec::detail::add_copy_to_task(copy, ttg_parsec::detail::parsec_ttg_caller);
assert(inserted);
value_ptr = reinterpret_cast<Value *>(copy->device_private);
copy_to_remove = copy;
}
return *value_ptr;
}

/* we have to make a copy of non-const data as the user may modify it after
* send/broadcast */
template <typename Value, typename Enabler = std::enable_if_t<!std::is_const_v<Value>>>
inline Value &operator()(Value &value) {
if (nullptr == ttg_parsec::detail::parsec_ttg_caller) {
ttg::print("ERROR: ttg_send or ttg_broadcast called outside of a task!\n");
}
/* the value is not known, create a copy that we can track */
ttg_parsec::detail::ttg_data_copy_t *copy;
copy = ttg_parsec::detail::create_new_datacopy(value);
bool inserted = ttg_parsec::detail::add_copy_to_task(copy, ttg_parsec::detail::parsec_ttg_caller);
assert(inserted);
Value *value_ptr = reinterpret_cast<Value *>(copy->device_private);
copy_to_remove = copy;
return *value_ptr;
}
};

#endif // TTG_PARSEC_COPY_HANDLER_H
91 changes: 91 additions & 0 deletions ttg/ttg/parsec/funcs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#ifndef TTG_PARSEC_FUNCS_H
#define TTG_PARSEC_FUNCS_H

#include "ttg/impl_selector.h"

#include <parsec.h>

#include "ttg/base/tt.h"
#include "ttg/util/trace.h"
#include "ttg/util/env.h"
#include "ttg/serialization.h"

#include "ttg/parsec/fwd.h"
#include "ttg/parsec/vars.h"
#include "ttg/parsec/world.h"

namespace ttg_parsec {

template <typename... RestOfArgs>
inline void ttg_initialize(int argc, char **argv, int num_threads, RestOfArgs &&...) {
int provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

if (num_threads < 1) num_threads = ttg::detail::num_threads();
auto world_ptr = new ttg_parsec::WorldImpl{&argc, &argv, num_threads};
std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
ttg::World world{std::move(world_sptr)};
ttg::detail::set_default_world(std::move(world));
}
inline void ttg_finalize() {
ttg::detail::set_default_world(ttg::World{}); // reset the default world
ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
MPI_Finalize();
}
inline ttg::World ttg_default_execution_context() { return ttg::get_default_world(); }
inline void ttg_abort() { MPI_Abort(ttg_default_execution_context().impl().comm(), 1); }
inline void ttg_execute(ttg::World world) { world.impl().execute(); }
inline void ttg_fence(ttg::World world) { world.impl().fence(); }

template <typename T>
inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
world.impl().register_ptr(ptr);
}

template <typename T>
inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
world.impl().register_ptr(std::move(ptr));
}

inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
world.impl().register_status(status_ptr);
}

template <typename Callback>
inline void ttg_register_callback(ttg::World world, Callback &&callback) {
world.impl().register_callback(std::forward<Callback>(callback));
}

inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }

inline void ttg_sum(ttg::World world, double &value) {
double result = 0.0;
MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.impl().comm());
value = result;
}

/// broadcast
/// @tparam T a serializable type
template <typename T>
void ttg_broadcast(::ttg::World world, T &data, int source_rank) {
int64_t BUFLEN;
if (world.rank() == source_rank) {
BUFLEN = ttg::default_data_descriptor<T>::payload_size(&data);
}
MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.impl().comm());

unsigned char *buf = new unsigned char[BUFLEN];
if (world.rank() == source_rank) {
ttg::default_data_descriptor<T>::pack_payload(&data, BUFLEN, 0, buf);
}
MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.impl().comm());
if (world.rank() != source_rank) {
ttg::default_data_descriptor<T>::unpack_payload(&data, BUFLEN, 0, buf);
}
delete[] buf;
}


} // namespace ttg_parsec

#endif // TTG_PARSEC_FUNCS_H
11 changes: 11 additions & 0 deletions ttg/ttg/parsec/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,20 @@
#include "ttg/util/typelist.h"

#include <future>
#include <parsec.h>

namespace ttg_parsec {


namespace detail {
inline int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
int src_rank, void *obj);

inline int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
int src, void *cb_data);
}


template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs = ttg::typelist<>>
class TT;

Expand Down
Loading

0 comments on commit d9a0bef

Please sign in to comment.