From dc08fcfde62c535063e4596908e6c9935c24c958 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 5 Sep 2024 09:43:59 -0400 Subject: [PATCH 1/8] Towards unified device & host tasks We now template the `ttg::device::Task` on the `ExecutionSpace` so that we can determine whether it's a host or device task based on the space. We can then optimize away the select and kernel-wait suspension points. We could remove the send suspension point but we use coroutines for storing the final sends anyway and we don't have access to the task return type in `ttg::device::send()`. This allows tasks to be written once for both host and device without duplicating much of the code. Host tasks that are not coroutines will continue to be supported. Signed-off-by: Joseph Schuchart --- examples/CMakeLists.txt | 20 +- examples/potrf/potrf.h | 107 +++++---- examples/task-benchmarks/chain-ttg-dev.cc | 25 +- examples/task-benchmarks/chain-ttg.cc | 267 ++++++++++++++++++++++ tests/unit/cuda_kernel.cu | 5 +- tests/unit/cuda_kernel.h | 3 +- tests/unit/device_coro.cc | 143 ++++++------ ttg/ttg/coroutine.h | 1 + ttg/ttg/device/task.h | 186 ++++++--------- ttg/ttg/madness/ttg.h | 10 +- ttg/ttg/make_tt.h | 89 ++++---- ttg/ttg/parsec/ttg.h | 118 +++++----- ttg/ttg/tt.h | 46 ++-- 13 files changed, 646 insertions(+), 374 deletions(-) create mode 100644 examples/task-benchmarks/chain-ttg.cc diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index c34e3c07e..a6c97183a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -15,6 +15,9 @@ if (TARGET tiledarray) COMPILE_DEFINITIONS BLOCK_SPARSE_GEMM=1;BTAS_TARGET_MAX_INDEX_RANK=2) add_ttg_executable(testing_dpotrf potrf/testing_dpotrf.cc LINK_LIBRARIES tiledarray lapackpp) + add_ttg_executable(testing_dpotrf_host potrf/testing_dpotrf.cc + LINK_LIBRARIES tiledarray lapackpp + COMPILE_DEFINITIONS TTG_ENABLE_DEV_HOST=1) add_ttg_executable(testing_dtrtri potrf/testing_dtrtri.cc LINK_LIBRARIES tiledarray lapackpp) add_ttg_executable(testing_dlauum potrf/testing_dlauum.cc LINK_LIBRARIES tiledarray lapackpp) add_ttg_executable(testing_dpoinv potrf/testing_dpoinv.cc LINK_LIBRARIES tiledarray lapackpp) @@ -50,14 +53,27 @@ if (TARGET tiledarray) endif() if (TTG_HAVE_CUDA) - add_ttg_executable(chain-ttg-cuda task-benchmarks/chain-ttg-dev.cc LINK_LIBRARIES tiledarray RUNTIMES "parsec") + add_ttg_executable(chain-ttg-dev-cuda task-benchmarks/chain-ttg-dev.cc + COMPILE_DEFINITIONS CHAIN_CUDA=1 + LINK_LIBRARIES tiledarray + RUNTIMES "parsec") endif(TTG_HAVE_CUDA) if (TTG_HAVE_HIP) - add_ttg_executable(chain-ttg-hip task-benchmarks/chain-ttg-dev.cc LINK_LIBRARIES tiledarray RUNTIMES "parsec") + add_ttg_executable(chain-ttg-dev-hip task-benchmarks/chain-ttg-dev.cc + COMPILE_DEFINITIONS CHAIN_HIP=1 + LINK_LIBRARIES tiledarray + RUNTIMES "parsec") endif(TTG_HAVE_HIP) endif() +add_ttg_executable(chain-ttg-host task-benchmarks/chain-ttg.cc) + +add_ttg_executable(chain-ttg-dev-host task-benchmarks/chain-ttg-dev.cc + COMPILE_DEFINITIONS CHAIN_HOST=1 + LINK_LIBRARIES tiledarray + RUNTIMES "parsec") + if (TARGET MADworld) add_ttg_executable(madness-1d madness/madness-1d/madness-1d.cc RUNTIMES "mad") if (TARGET blaspp) #(CBLAS_FOUND AND MKL_FOUND) diff --git a/examples/potrf/potrf.h b/examples/potrf/potrf.h index d3a92cb2f..894064b54 100644 --- a/examples/potrf/potrf.h +++ b/examples/potrf/potrf.h @@ -7,22 +7,19 @@ #include "util.h" #include "../devblas_helper.h" -#if (defined(TTG_ENABLE_CUDA) || defined(TTG_ENABLE_HIP)) +#if (defined(TTG_ENABLE_CUDA) || defined(TTG_ENABLE_HIP) || defined(TTG_ENABLE_DEV_HOST)) #define ENABLE_DEVICE_KERNEL 1 #endif #if defined(TTG_HAVE_CUDART) #define ES ttg::ExecutionSpace::CUDA -#define TASKRET -> ttg::device::Task #include #elif defined(TTG_ENABLE_HIP) #define ES ttg::ExecutionSpace::HIP -#define TASKRET -> ttg::device::Task #include #include #else #define ES ttg::ExecutionSpace::Host -#define TASKRET -> void #endif namespace potrf { @@ -35,21 +32,21 @@ namespace potrf { #if defined(ENABLE_DEVICE_KERNEL) static int device_potrf_workspace_size(MatrixTile &A) { int Lwork; - #if defined(TTG_ENABLE_CUDA) +#if defined(TTG_ENABLE_CUDA) cusolverDnDpotrf_bufferSize(cusolver_handle(), CUBLAS_FILL_MODE_LOWER, A.cols(), nullptr, A.lda(), &Lwork); return Lwork; - #elif defined(TTG_ENABLE_HIP) +#elif defined(TTG_ENABLE_HIP) hipsolverDnDpotrf_bufferSize(hipsolver_handle(), HIPSOLVER_FILL_MODE_LOWER, A.cols(), nullptr, A.lda(), &Lwork); return Lwork; - #else +#else return 0; - #endif +#endif } static void device_potrf(MatrixTile &A, double *workspace, int Lwork, int *devInfo) { @@ -64,13 +61,16 @@ namespace potrf { A.buffer().current_device_ptr(), A.lda(), workspace, Lwork, devInfo); - #elif defined(TTG_ENABLE_HIP) +#elif defined(TTG_ENABLE_HIP) hipsolverDpotrf(hipsolver_handle(), HIPSOLVER_FILL_MODE_LOWER, A.cols(), A.buffer().current_device_ptr(), A.lda(), workspace, Lwork, devInfo); - #endif +#else + auto info = lapack::potrf(lapack::Uplo::Lower, A.rows(), A.buffer().current_device_ptr(), A.lda()); + assert(info == 0); +#endif } static void device_norm(const MatrixTile &A, double *norm) { @@ -81,9 +81,11 @@ namespace potrf { auto handle = cublas_handle(); //double n = 1.0; cublasDnrm2(handle, size, buffer, 1, norm); - #elif defined(TTG_ENABLE_HIP) +#elif defined(TTG_ENABLE_HIP) hipblasDnrm2(hipblas_handle(), size, buffer, 1, norm); - #endif +#else + *norm = blas::nrm2(size, buffer, 1); +#endif } #endif // ENABLE_DEVICE_KERNEL @@ -99,7 +101,8 @@ namespace potrf { //std::cout << "Creating CUDA POTRF task " << std::endl; auto f_dev = [=, iallocator = std::move(iallocator)] (const Key1& key, MatrixTile&& tile_kk, - std::tuple>, ttg::Out>>& out) TASKRET { + std::tuple>, ttg::Out>>& out) + -> ttg::device::Task { const auto K = key[0]; /* compute successors before submitting the kernel @@ -186,7 +189,7 @@ namespace potrf { ttg::abort(); } }; - return ttg::make_tt(f_dev, ttg::edges(ttg::fuse(input, input_disp)), ttg::edges(output_result, output_trsm), "POTRF", + return ttg::make_tt(f_dev, ttg::edges(ttg::fuse(input, input_disp)), ttg::edges(output_result, output_trsm), "POTRF", {"tile_kk/dispatcher"}, {"output_result", "output_trsm"}); #else /* defined(ENABLE_DEVICE_KERNEL) */ auto f = [=](const Key1& key, MatrixTile&& tile_kk, @@ -234,7 +237,7 @@ namespace potrf { #if defined(ENABLE_DEVICE_KERNEL) auto f = [=](const Key2& key, const MatrixTile& tile_kk, MatrixTile&& tile_mk, std::tuple>, ttg::Out>, ttg::Out>, - ttg::Out>>& out) TASKRET { + ttg::Out>>& out) -> ttg::device::Task { const int M = key[0]; const int K = key[1]; // the column equals the outer most look K (same as PO) @@ -302,6 +305,9 @@ namespace potrf { mb, nb, &alpha, tile_kk.buffer().current_device_ptr(), tile_kk.lda(), tile_mk.buffer().current_device_ptr(), tile_mk.lda()); +#else + blas::trsm(blas::Layout::ColMajor, blas::Side::Right, lapack::Uplo::Lower, blas::Op::Trans, blas::Diag::NonUnit, + mb, nb, 1.0, tile_kk.data(), tile_kk.lda(), tile_mk.data(), tile_mk.lda()); #endif @@ -320,7 +326,7 @@ namespace potrf { co_await ttg::device::forward(ttg::device::broadcast<0, 1, 2, 3>(std::make_tuple(key, Key2(K, M), keylist_row, keylist_col), std::move(tile_mk), out)); }; - return ttg::make_tt(f, ttg::edges(input_kk, ttg::fuse(input_mk, input_disp)), + return ttg::make_tt(f, ttg::edges(input_kk, ttg::fuse(input_mk, input_disp)), ttg::edges(output_result, output_diag, output_row, output_col), "TRSM", {"tile_kk", "tile_mk/dispatcher"}, {"output_result", "tile_mk", "output_row", "output_col"}); #else // defined(ENABLE_DEVICE_KERNEL) @@ -386,8 +392,8 @@ namespace potrf { ttg::Edge>& output_syrk) { using T = typename MatrixT::element_type; #if defined(ENABLE_DEVICE_KERNEL) - auto f = [=](const Key2& key, const MatrixTile& tile_mk, MatrixTile&& tile_kk, - std::tuple>, ttg::Out>>& out) TASKRET { + auto f = [=](const Key2& key, const MatrixTile& tile_mk, MatrixTile&& tile_kk) + -> ttg::device::Task { const int K = key[0]; const int M = key[1]; @@ -432,6 +438,9 @@ namespace potrf { mb, nb, &alpha, tile_mk.buffer().current_device_ptr(), tile_mk.lda(), &beta, tile_kk.buffer().current_device_ptr(), tile_kk.lda()); +#else + blas::syrk(blas::Layout::ColMajor, lapack::Uplo::Lower, blas::Op::NoTrans, mb, nb, -1.0, tile_mk.data(), + tile_mk.lda(), 1.0, tile_kk.data(), tile_kk.lda()); #endif #ifdef DEBUG_TILES_VALUES @@ -449,18 +458,17 @@ namespace potrf { if (M == K + 1) { /* send the tile to potrf */ if (ttg::tracing()) ttg::print("SYRK(", key, "): sending output to POTRF(", Key1{K + 1}, ")"); - co_await ttg::device::send<0>(Key1(K + 1), std::move(tile_kk), out); + co_await ttg::device::send<0>(Key1(K + 1), std::move(tile_kk)); } else { /* send output to next syrk */ if (ttg::tracing()) ttg::print("SYRK(", key, "): sending output to SYRK(", Key2{K + 1, M}, ")"); - co_await ttg::device::send<1>(Key2(K + 1, M), std::move(tile_kk), out); + co_await ttg::device::send<1>(Key2(K + 1, M), std::move(tile_kk)); } }; - return ttg::make_tt(f, ttg::edges(input_mk, ttg::fuse(input_kk, input_disp)), ttg::edges(output_potrf, output_syrk), + return ttg::make_tt(f, ttg::edges(input_mk, ttg::fuse(input_kk, input_disp)), ttg::edges(output_potrf, output_syrk), "SYRK", {"tile_mk", "tile_kk/dispatcher"}, {"output_potrf", "output_syrk"}); #else // defined(ENABLE_DEVICE_KERNEL) - auto f = [=](const Key2& key, const MatrixTile& tile_mk, MatrixTile&& tile_kk, - std::tuple>, ttg::Out>>& out) { + auto f = [=](const Key2& key, const MatrixTile& tile_mk, MatrixTile&& tile_kk) { const int K = key[0]; const int M = key[1]; @@ -487,11 +495,11 @@ namespace potrf { if (M == K + 1) { /* send the tile to potrf */ if (ttg::tracing()) ttg::print("SYRK(", key, "): sending output to POTRF(", Key1{K + 1}, ")"); - ttg::send<0>(Key1(K + 1), std::move(tile_kk), out); + ttg::send<0>(Key1(K + 1), std::move(tile_kk)); } else { /* send output to next syrk */ if (ttg::tracing()) ttg::print("SYRK(", key, "): sending output to SYRK(", Key2{K + 1, M}, ")"); - ttg::send<1>(Key2(K + 1, M), std::move(tile_kk), out); + ttg::send<1>(Key2(K + 1, M), std::move(tile_kk)); } }; return ttg::make_tt(f, ttg::edges(input_mk, ttg::fuse(input_kk, input_disp)), ttg::edges(output_potrf, output_syrk), @@ -509,8 +517,8 @@ namespace potrf { ttg::Edge>& output_gemm) { using T = typename MatrixT::element_type; #if defined(ENABLE_DEVICE_KERNEL) - auto f = [=](const Key3& key, const MatrixTile& tile_mk, const MatrixTile& tile_nk, MatrixTile&& tile_mn, - std::tuple>, ttg::Out>>& out) TASKRET { + auto f = [=](const Key3& key, const MatrixTile& tile_mk, const MatrixTile& tile_nk, MatrixTile&& tile_mn) + -> ttg::device::Task { const int M = key[0]; const int N = key[1]; const int K = key[2]; @@ -559,6 +567,10 @@ namespace potrf { tile_mk.buffer().current_device_ptr(), tile_mk.lda(), tile_nk.buffer().current_device_ptr(), tile_nk.lda(), &beta, tile_mn.buffer().current_device_ptr(), tile_mn.lda()); +#else + blas::gemm(blas::Layout::ColMajor, blas::Op::NoTrans, blas::Op::Trans, tile_mk.rows(), tile_nk.rows(), + tile_nk.cols(), -1.0, tile_mk.data(), tile_mk.lda(), tile_nk.data(), tile_nk.lda(), 1.0, + tile_mn.data(), tile_mn.lda()); #endif @@ -578,19 +590,18 @@ namespace potrf { if (N == K + 1) { /* send the tile to trsm */ if (ttg::tracing()) ttg::print("GEMM(", key, "): sending output to TRSM(", Key2{M, N}, ")"); - co_await ttg::device::send<0>(Key2(M, N), std::move(tile_mn), out); + co_await ttg::device::send<0>(Key2(M, N), std::move(tile_mn)); } else { /* send the tile to the next gemm */ if (ttg::tracing()) ttg::print("GEMM(", key, "): sending output to GEMM(", Key3{M, N, K + 1}, ")"); - co_await ttg::device::send<1>(Key3(M, N, K + 1), std::move(tile_mn), out); + co_await ttg::device::send<1>(Key3(M, N, K + 1), std::move(tile_mn)); } }; - return ttg::make_tt(f, ttg::edges(input_mk, input_nk, ttg::fuse(input_disp, input_mn)), + return ttg::make_tt(f, ttg::edges(input_mk, input_nk, ttg::fuse(input_disp, input_mn)), ttg::edges(output_trsm, output_gemm), "GEMM", {"input_mk", "input_kn", "input_mn/dispatcher"}, {"output_trsm", "outout_gemm"}); #else // defined(ENABLE_DEVICE_KERNEL) - auto f = [=](const Key3& key, const MatrixTile& tile_mk, const MatrixTile& tile_nk, MatrixTile&& tile_mn, - std::tuple>, ttg::Out>>& out) { + auto f = [=](const Key3& key, const MatrixTile& tile_mk, const MatrixTile& tile_nk, MatrixTile&& tile_mn) { const int M = key[0]; const int N = key[1]; const int K = key[2]; @@ -617,11 +628,11 @@ namespace potrf { if (N == K + 1) { /* send the tile to trsm */ if (ttg::tracing()) ttg::print("GEMM(", key, "): sending output to TRSM(", Key2{M, N}, ")"); - ttg::send<0>(Key2(M, N), std::move(tile_mn), out); + ttg::send<0>(Key2(M, N), std::move(tile_mn)); } else { /* send the tile to the next gemm */ if (ttg::tracing()) ttg::print("GEMM(", key, "): sending output to GEMM(", Key3{M, N, K + 1}, ")"); - ttg::send<1>(Key3(M, N, K + 1), std::move(tile_mn), out); + ttg::send<1>(Key3(M, N, K + 1), std::move(tile_mn)); } }; return ttg::make_tt(f, ttg::edges(input_mk, input_nk, ttg::fuse(input_disp, input_mn)), @@ -634,20 +645,18 @@ namespace potrf { auto make_dispatcher(ttg::Edge>& input, ttg::Edge>& to_potrf, ttg::Edge>& to_trsm, ttg::Edge>& to_syrk, ttg::Edge>& to_gemm) { - auto f = [=](const Key2& key, const MatrixTile& tile, - std::tuple>, ttg::Out>, ttg::Out>, - ttg::Out>>& out) { + auto f = [=](const Key2& key, const MatrixTile& tile) { if (ttg::tracing()) ttg::print("POTRF_Dispatch(", key, ")"); if (0 == key[0] && 0 == key[1]) { // First element goes to POTRF if (ttg::tracing()) ttg::print("POTRF_Dispatch(", key, ") sending to POTRF(", Key1{key[0]}, ")"); - ttg::send<0>(Key1{key[0]}, tile, out); + ttg::send<0>(Key1{key[0]}, tile); return; } if (key[0] == key[1]) { // Other diagonal elements go to SYRK if (ttg::tracing()) ttg::print("POTRF_Dispatch(", key, ") sending to SYRK(", Key2{0, key[0]}, ")"); - ttg::send<2>(Key2{0, key[0]}, tile, out); + ttg::send<2>(Key2{0, key[0]}, tile); return; } // We only consider the lower triangular @@ -655,12 +664,12 @@ namespace potrf { if (0 == key[1]) { // First column goes to TRSM if (ttg::tracing()) ttg::print("POTRF_Dispatch(", key, ") sending to TRSM(", key, ")"); - ttg::send<1>(key, tile, out); + ttg::send<1>(key, tile); return; } // Rest goes to GEMM if (ttg::tracing()) ttg::print("POTRF_Dispatch(", key, ") sending to GEMM(", Key3{key[0], key[1], 0}, ")"); - ttg::send<3>(Key3{key[0], key[1], 0}, tile, out); + ttg::send<3>(Key3{key[0], key[1], 0}, tile); }; return ttg::make_tt(f, ttg::edges(input), ttg::edges(to_potrf, to_trsm, to_syrk, to_gemm), "POTRF Dispatch", @@ -705,28 +714,36 @@ namespace potrf { tt_potrf->set_keymap(keymap1); tt_potrf->set_defer_writer(defer_write); #ifdef ENABLE_DEVICE_KERNEL - tt_potrf->set_devicemap(devmap1); + if constexpr (ES != ttg::ExecutionSpace::Host) { + tt_potrf->set_devicemap(devmap1); + } #endif // 0 auto tt_trsm = make_trsm(A, disp_trsm, potrf_trsm, gemm_trsm, trsm_syrk, trsm_gemm_row, trsm_gemm_col, output); tt_trsm->set_keymap(keymap2a); tt_trsm->set_defer_writer(defer_write); #ifdef ENABLE_DEVICE_KERNEL - tt_trsm->set_devicemap(devmap2a); + if constexpr (ES != ttg::ExecutionSpace::Host) { + tt_trsm->set_devicemap(devmap2a); + } #endif // 0 auto tt_syrk = make_syrk(A, disp_syrk, trsm_syrk, syrk_syrk, syrk_potrf, syrk_syrk); tt_syrk->set_keymap(keymap2b); tt_syrk->set_defer_writer(defer_write); #ifdef ENABLE_DEVICE_KERNEL - tt_syrk->set_devicemap(devmap2b); + if constexpr (ES != ttg::ExecutionSpace::Host) { + tt_syrk->set_devicemap(devmap2b); + } #endif // 0 auto tt_gemm = make_gemm(A, disp_gemm, trsm_gemm_row, trsm_gemm_col, gemm_gemm, gemm_trsm, gemm_gemm); tt_gemm->set_keymap(keymap3); tt_gemm->set_defer_writer(defer_write); #ifdef ENABLE_DEVICE_KERNEL - tt_gemm->set_devicemap(devmap3); + if constexpr (ES != ttg::ExecutionSpace::Host) { + tt_gemm->set_devicemap(devmap3); + } #endif // 0 /* Priorities taken from DPLASMA */ diff --git a/examples/task-benchmarks/chain-ttg-dev.cc b/examples/task-benchmarks/chain-ttg-dev.cc index 80f14bff4..559e32870 100644 --- a/examples/task-benchmarks/chain-ttg-dev.cc +++ b/examples/task-benchmarks/chain-ttg-dev.cc @@ -3,13 +3,19 @@ #include "chrono.h" -#if defined(TTG_HAVE_CUDA) +#if defined(CHAIN_CUDA) +#ifndef TTG_HAVE_CUDA +#error Cannot build CUDA chain benchmark against TTG that does not support CUDA! +#endif #define ES ttg::ExecutionSpace::CUDA -#elif defined(TTG_HAVE_HIP) +#elif defined(CHAIN_HIP) #define ES ttg::ExecutionSpace::HIP +#ifndef TTG_HAVE_HIP +#error Cannot build HIP chain benchmark against TTG that does not support HIP! +#endif #else -#error "Either CUDA OR HIP is required to build this test!" -#endif // 0 +#define ES ttg::ExecutionSpace::Host +#endif #define NUM_TASKS 100000 @@ -53,7 +59,7 @@ auto make_ttg<1>(bool do_move) { send<0>(0, A{}); }, edges(), edges(I2N)); - auto next = make_tt([=](const int &key, auto&& value) -> ttg::device::Task { + auto next = make_tt([=](const int &key, auto&& value) -> ttg::device::Task { //++task_counter; co_await ttg::device::select(value.b); if (key < NUM_TASKS) { @@ -62,7 +68,6 @@ auto make_ttg<1>(bool do_move) { } else { co_await ttg::device::forward(ttg::device::send<0>(key+1, value)); } - } else { } } , edges(fuse(I2N, N2N)), edges(N2N)); @@ -80,7 +85,7 @@ auto make_ttg<2>(bool do_move) { send<1>(0, A{}); }, edges(), edges(I2N1, I2N2)); - auto next = make_tt([=](const int &key, A&& v1, A&& v2) -> ttg::device::Task { + auto next = make_tt([=](const int &key, A&& v1, A&& v2) -> ttg::device::Task { co_await ttg::device::select(v1.b, v2.b); if (key < NUM_TASKS) { if (do_move) { @@ -110,7 +115,7 @@ auto make_ttg<4>(bool do_move) { send<3>(0, A{}); }, edges(), edges(I2N1, I2N2, I2N3, I2N4)); - auto next = make_tt([=](const int &key, A&& v1, A&& v2, A&& v3, A&& v4) -> ttg::device::Task { + auto next = make_tt([=](const int &key, A&& v1, A&& v2, A&& v3, A&& v4) -> ttg::device::Task { co_await ttg::device::select(v1.b, v2.b, v3.b, v4.b); if (key < NUM_TASKS) { if (do_move) { @@ -150,7 +155,7 @@ auto make_ttg<8>(bool do_move) { send<7>(0, A{}); }, edges(), edges(I2N1, I2N2, I2N3, I2N4, I2N5, I2N6, I2N7, I2N8)); - auto next = make_tt([=](const int &key, auto&& v1, auto&& v2, auto&& v3, auto&& v4, auto&& v5, auto&& v6, auto&& v7, auto&& v8) -> ttg::device::Task { + auto next = make_tt([=](const int &key, auto&& v1, auto&& v2, auto&& v3, auto&& v4, auto&& v5, auto&& v6, auto&& v7, auto&& v8) -> ttg::device::Task { co_await ttg::device::select(v1.b, v2.b, v3.b, v4.b, v5.b, v6.b, v7.b, v8.b); if (key < NUM_TASKS) { if (do_move) { @@ -187,7 +192,7 @@ auto make_ttg<0>(bool do_move) { auto init = make_tt([](std::tuple> &outs) { sendk<0>(0, outs); }, edges(), edges(I2N)); - auto next = make_tt([](const int& key) -> ttg::device::Task { + auto next = make_tt([](const int& key) -> ttg::device::Task { co_await ttg::device::select(); if (key < NUM_TASKS) { co_await ttg::device::forward(ttg::device::sendk<0>(key+1)); diff --git a/examples/task-benchmarks/chain-ttg.cc b/examples/task-benchmarks/chain-ttg.cc new file mode 100644 index 000000000..285d57aca --- /dev/null +++ b/examples/task-benchmarks/chain-ttg.cc @@ -0,0 +1,267 @@ +//#define TTG_USE_USER_TERMDET 1 +#include "ttg.h" + +#include "chrono.h" + +#define NUM_TASKS 100000 + +using namespace ttg; + +template +auto make_ttg(bool do_move); + +// flows task ids via values +template <> +auto make_ttg<1>(bool do_move) { + Edge I2N, N2N; + Edge N2S; + + auto init = make_tt([]() { send<0>(0, 0); }, edges(), edges(I2N)); + + auto next = make_tt([=](const int &key, auto&& value) { + if (key < NUM_TASKS) { + //std::cout << &value << " -> " << value << std::endl; + //if (key < 10) { + //value++; + if (do_move) { + send<0>(key+1, std::move(value)); + //send<0>(key+1, value); + } else { + send<0>(key+1, value); + } + } + else { + sendv<1>(std::move(value)); + } + } , edges(fuse(I2N, N2N)), edges(N2N, N2S)); + + auto stop = make_tt([](const int& v) { + //std::cout << "last task received v=" << v << std::endl; + ttg::default_execution_context().impl().final_task(); + }, edges(N2S), edges()); + + return std::make_tuple(std::move(init), std::move(next), std::move(stop)); +} + +template <> +auto make_ttg<2>(bool do_move) { + Edge I2N1, I2N2; + Edge N2N1, N2N2; + Edge N2S1, N2S2; + + auto init = make_tt([]() { + send<0>(0, 0); + send<1>(0, 0); + }, edges(), edges(I2N1, I2N2)); + + auto next = make_tt([=](const int &key, int&& v1, int&& v2) { + if (key < NUM_TASKS) { + v1++; v2++; + if (do_move) { + send<0>(key+1, std::move(v1)); + send<1>(key+1, std::move(v2)); + } else { + send<0>(key+1, v1); + send<1>(key+1, v2); + } + } + else { + sendv<2>(std::move(v1)); + sendv<3>(std::move(v2)); + } + } , edges(fuse(I2N1, N2N1), fuse(I2N2, N2N2)), edges(N2N1, N2N2, N2S1, N2S2)); + + auto stop = make_tt([](const int &v1, const int &v2) { + //std::cout << "last task received v=" << v1 << std::endl; + ttg::default_execution_context().impl().final_task(); + }, edges(N2S1, N2S2), edges()); + + return std::make_tuple(std::move(init), std::move(next), std::move(stop)); +} + +template <> +auto make_ttg<4>(bool do_move) { + Edge I2N1, I2N2, I2N3, I2N4; + Edge N2N1, N2N2, N2N3, N2N4; + Edge N2S1, N2S2, N2S3, N2S4; + + auto init = make_tt([]() { + send<0>(0, 0); + send<1>(0, 0); + send<2>(0, 0); + send<3>(0, 0); + }, edges(), edges(I2N1, I2N2, I2N3, I2N4)); + + auto next = make_tt([=](const int &key, int&& v1, int&& v2, int&& v3, int&& v4) { + if (key < NUM_TASKS) { + v1++; v2++; + v3++; v4++; + if (do_move) { + send<0>(key+1, std::move(v1)); + send<1>(key+1, std::move(v2)); + send<2>(key+1, std::move(v3)); + send<3>(key+1, std::move(v4)); + } else { + send<0>(key+1, v1); + send<1>(key+1, v2); + send<2>(key+1, v3); + send<3>(key+1, v4); + } + } + else { + sendv<4>(std::move(v1)); + sendv<5>(std::move(v2)); + sendv<6>(std::move(v3)); + sendv<7>(std::move(v4)); + } + }, edges(fuse(I2N1, N2N1), fuse(I2N2, N2N2), + fuse(I2N3, N2N3), fuse(I2N4, N2N4)), + edges(N2N1, N2N2, N2N3, N2N4, N2S1, N2S2, N2S3, N2S4)); + + auto stop = make_tt([](const int& v1, const int& v2, const int& v3, const int& v4){ + //std::cout << "last task received v=" << v1 << std::endl; + ttg::default_execution_context().impl().final_task(); + }, edges(N2S1, N2S2, N2S3, N2S4), edges()); + + return std::make_tuple(std::move(init), std::move(next), std::move(stop)); +} + +template <> +auto make_ttg<8>(bool do_move) { + Edge I2N1, I2N2, I2N3, I2N4, I2N5, I2N6, I2N7, I2N8; + Edge N2N1, N2N2, N2N3, N2N4, N2N5, N2N6, N2N7, N2N8; + Edge N2S1, N2S2, N2S3, N2S4, N2S5, N2S6, N2S7, N2S8; + + auto init = make_tt([]() { + send<0>(0, 0); + send<1>(0, 0); + send<2>(0, 0); + send<3>(0, 0); + send<4>(0, 0); + send<5>(0, 0); + send<6>(0, 0); + send<7>(0, 0); + }, edges(), edges(I2N1, I2N2, I2N3, I2N4, I2N5, I2N6, I2N7, I2N8)); + + auto next = make_tt([=](const int &key, auto&& v1, auto&& v2, auto&& v3, + auto&& v4, auto&& v5, auto&& v6, auto&& v7, auto&& v8) { + if (key < NUM_TASKS) { + //if (key < 1000) { + v1++; v2++; + v3++; v4++; + v5++; v6++; + v6++; v8++; + if (do_move) { + send<0>(key+1, std::move(v1)); + send<1>(key+1, std::move(v2)); + send<2>(key+1, std::move(v3)); + send<3>(key+1, std::move(v4)); + send<4>(key+1, std::move(v5)); + send<5>(key+1, std::move(v6)); + send<6>(key+1, std::move(v7)); + send<7>(key+1, std::move(v8)); + } else { + send<0>(key+1, v1); + send<1>(key+1, v2); + send<2>(key+1, v3); + send<3>(key+1, v4); + send<4>(key+1, v5); + send<5>(key+1, v6); + send<6>(key+1, v7); + send<7>(key+1, v8); + } + } + else { + sendv<8>(std::move(v1)); + sendv<9>(std::move(v2)); + sendv<10>(std::move(v3)); + sendv<11>(std::move(v4)); + sendv<12>(std::move(v5)); + sendv<13>(std::move(v6)); + sendv<14>(std::move(v7)); + sendv<15>(std::move(v8)); + } + }, edges(fuse(I2N1, N2N1), fuse(I2N2, N2N2), fuse(I2N3, N2N3), fuse(I2N4, N2N4), fuse(I2N5, N2N5), fuse(I2N6, N2N6), fuse(I2N7, N2N7), fuse(I2N8, N2N8)), + edges(N2N1, N2N2, N2N3, N2N4, N2N5, N2N6, N2N7, N2N8, N2S1, N2S2, N2S3, N2S4, N2S5, N2S6, N2S7, N2S8)); + + auto stop = make_tt([](const int &v1, const int &v2, const int &v3, const int &v4, const int &v5, const int &v6, const int &v7, const int &v8) { + //std::cout << "last task received v=" << v1 << std::endl; + ttg::default_execution_context().impl().final_task(); + }, edges(N2S1, N2S2, N2S3, N2S4, N2S5, N2S6, N2S7, N2S8), edges()); + + return std::make_tuple(std::move(init), std::move(next), std::move(stop)); +} + +// flows task ids via keys +template <> +auto make_ttg<0>(bool do_move) { + Edge I2N, N2N; + Edge N2S; + + auto init = make_tt([]() { sendk<0>(0); }, edges(), edges(I2N)); + + auto next = make_tt([](const int& key) { + if (key < NUM_TASKS) { + ::sendk<0>(key+1); + } + else { + ::sendv<1>(key); + } + }, edges(fuse(I2N, N2N)), edges(N2N, N2S)); + + auto stop = make_tt([](const int &v) { + //std::cout << "last task received v=" << v << std::endl; + ttg::default_execution_context().impl().final_task(); + }, edges(N2S), edges()); + + return std::make_tuple(std::move(init), std::move(next), std::move(stop)); +} + +template +void run_bench(bool do_move) +{ + auto [init, next, stop] = make_ttg(do_move); + + auto connected = make_graph_executable(init.get()); + assert(connected); + std::cout << "Graph is connected.\n"; + + auto t0 = now(); + if (ttg::default_execution_context().rank() == 0) init->invoke(); + + ttg_execute(ttg_default_execution_context()); + ttg_fence(ttg_default_execution_context()); + auto t1 = now(); + + std::cout << "# of tasks = " << NUM_TASKS << std::endl; + std::cout << "time elapsed (microseconds) = " << duration_in_mus(t0, t1) + << ", avg " << duration_in_mus(t0, t1) / (double)NUM_TASKS << std::endl; +} + +int main(int argc, char* argv[]) { + + int num_flows = 0; + int do_move = 1; + ttg_initialize(argc, argv, -1); + + if (argc > 1) { + num_flows = std::atoi(argv[1]); + } + + if (argc > 2) { + do_move = std::atoi(argv[2]); + } + + switch(num_flows) { + case 0: run_bench<0>(do_move); break; + case 1: run_bench<1>(do_move); break; + case 2: run_bench<2>(do_move); break; + case 4: run_bench<4>(do_move); break; + case 8: run_bench<8>(do_move); break; + default: std::cout << "Unsupported number of flows: " << num_flows << std::endl; + } + + ttg_finalize(); + return 0; +} + diff --git a/tests/unit/cuda_kernel.cu b/tests/unit/cuda_kernel.cu index f6f00d172..f280f9a97 100644 --- a/tests/unit/cuda_kernel.cu +++ b/tests/unit/cuda_kernel.cu @@ -13,7 +13,10 @@ static __global__ void cu_increment_buffer(double* buffer, double* scratch) { } } -void increment_buffer(double* buffer, std::size_t buffer_size, double* scratch, std::size_t scratch_size) { +void increment_buffer_cuda( + double* buffer, std::size_t buffer_size, + double* scratch, std::size_t scratch_size) +{ cu_increment_buffer<<<1, buffer_size>>>(buffer, scratch); diff --git a/tests/unit/cuda_kernel.h b/tests/unit/cuda_kernel.h index 4fec87a99..a0d3181ce 100644 --- a/tests/unit/cuda_kernel.h +++ b/tests/unit/cuda_kernel.h @@ -1,4 +1,5 @@ #include "ttg/config.h" #include -void increment_buffer(double* buffer, std::size_t buffer_size, double* scratch, std::size_t scratch_size); \ No newline at end of file +void increment_buffer_cuda( + double* buffer, std::size_t buffer_size, double* scratch, std::size_t scratch_size); \ No newline at end of file diff --git a/tests/unit/device_coro.cc b/tests/unit/device_coro.cc index 60581d232..489ad8ffb 100644 --- a/tests/unit/device_coro.cc +++ b/tests/unit/device_coro.cc @@ -6,10 +6,24 @@ #include "cuda_kernel.h" +#if defined(TTG_HAVE_CUDA) +#define SPACE ttg::ExecutionSpace::CUDA +#else +#define SPACE ttg::ExecutionSpace::Host +#endif // 0 + struct value_t { ttg::Buffer db; // TODO: rename int quark; + value_t() + {} + + value_t(std::size_t c) + : db(c) + , quark(c) + { } + template void serialize(Archive& ar, const unsigned int version) { ar& quark; @@ -17,6 +31,22 @@ struct value_t { } }; +static void increment_buffer( + double* buffer, std::size_t buffer_size, + double* scratch, std::size_t scratch_size) +{ +#if defined(TTG_HAVE_CUDA) + increment_buffer(buffer, buffer_size, scratch, scratch_size); +#else // TTG_HAVE_CUDA + for (std::size_t i = 0 ; i < buffer_size; ++i) { + buffer[i] += 1.0; + } + if (scratch != nullptr) { + *scratch += 1.0; + } +#endif // TTG_HAVE_CUDA +} + #ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS /* devicebuf is non-POD so provide serialization * information for members not a devicebuf */ @@ -28,14 +58,14 @@ namespace madness::archive { } // namespace madness::archive #endif // TTG_SERIALIZATION_SUPPORTS_MADNESS -#if defined(TTG_HAVE_DEVICE) && defined(TTG_IMPL_DEVICE_SUPPORT) +#if defined(TTG_IMPL_DEVICE_SUPPORT) TEST_CASE("Device", "coro") { SECTION("devicebuf") { ttg::Edge edge; - auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { + auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { //ttg::print("device_task key ", key); /* wait for the view to be available on the device */ @@ -58,10 +88,10 @@ TEST_CASE("Device", "coro") { //ptr.get_view(device_id); - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{1}); std::cout << "Entering fence" << std::endl; ttg::ttg_fence(ttg::default_execution_context()); } @@ -69,7 +99,7 @@ TEST_CASE("Device", "coro") { SECTION("devicebuf-inc") { ttg::Edge edge; - auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { + auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { //ttg::print("device_task key ", key); /* wait for the view to be available on the device */ @@ -80,19 +110,15 @@ TEST_CASE("Device", "coro") { std::cout << "KEY " << key << " VAL IN DEV " << *val.db.current_device_ptr() << " VAL IN HOST " << *val.db.host_ptr() << std::endl; /* call a kernel */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), nullptr, 0); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel to complete */ co_await ttg::device::wait(val.db); std::cout << "KEY " << key << " VAL OUT DEV " << *val.db.current_device_ptr() << " VAL OUT HOST " << *val.db.host_ptr() << std::endl; -#ifdef TTG_HAVE_CUDA /* buffer is increment once per task, so it should be the same as key */ CHECK(static_cast(*val.db.host_ptr()) == key+1); -#endif // TTG_HAVE_CUDA /* we're back, the kernel executed and we can send */ if (key < 10) { @@ -104,10 +130,10 @@ TEST_CASE("Device", "coro") { //ptr.get_view(device_id); - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - value_t v; + value_t v{1}; *v.db.host_ptr() = 2.0; // start from non-zero value if (ttg::default_execution_context().rank() == 0) tt->invoke(2, std::move(v)); std::cout << "Entering fence" << std::endl; @@ -117,7 +143,7 @@ TEST_CASE("Device", "coro") { SECTION("scratch") { ttg::Edge edge; - auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { + auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { double scratch = 0.0; ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::Allocate); @@ -127,17 +153,13 @@ TEST_CASE("Device", "coro") { CHECK(ds.device_ptr() != nullptr); /* call a kernel */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel to complete */ co_await ttg::device::wait(ds); -#ifdef TTG_HAVE_CUDA /* the scratch is allocated but no data is transferred in; it's incremented once */ CHECK((static_cast(scratch)-1) == 0); -#endif // 0 /* we're back, the kernel executed and we can send */ if (key < 10) { @@ -149,17 +171,17 @@ TEST_CASE("Device", "coro") { } }; - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{1}); ttg::ttg_fence(ttg::default_execution_context()); } SECTION("scratch-syncin") { ttg::Edge edge; - auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { + auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { double scratch = key; ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::SyncIn); @@ -169,17 +191,13 @@ TEST_CASE("Device", "coro") { CHECK(ds.device_ptr() != nullptr); /* call a kernel */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel to complete */ co_await ttg::device::wait(ds); -#ifdef TTG_HAVE_CUDA /* scratch is increment once per task, so it should be the same as key */ CHECK((static_cast(scratch))-1 == key); -#endif // 0 /* we're back, the kernel executed and we can send */ if (key < 10) { @@ -191,17 +209,17 @@ TEST_CASE("Device", "coro") { } }; - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{1}); ttg::ttg_fence(ttg::default_execution_context()); } SECTION("scratch-value-out") { ttg::Edge edge; - auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { + auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { double scratch = 0.0; ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::Allocate); @@ -211,17 +229,13 @@ TEST_CASE("Device", "coro") { CHECK(ds.device_ptr() != nullptr); /* call a kernel */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel to complete */ co_await ttg::device::wait(ds, val.db); -#ifdef TTG_HAVE_CUDA /* buffer is increment once per task, so it should be 1 */ CHECK((static_cast(scratch)-1) == 0); -#endif // 0 /* we're back, the kernel executed and we can send */ if (key < 10) { @@ -233,20 +247,23 @@ TEST_CASE("Device", "coro") { } }; - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{1}); ttg::ttg_fence(ttg::default_execution_context()); } + +#if 0 + /* TODO: Ptr seems broken atm, fix or remove! */ SECTION("ptr") { ttg::Edge edge; ttg::Ptr ptr; int last_key = 0; constexpr const int num_iter = 10; - auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { + auto fn = [&](const int& key, value_t&& val) -> ttg::device::Task { double scratch = key; ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::SyncIn); @@ -256,18 +273,14 @@ TEST_CASE("Device", "coro") { CHECK(ds.device_ptr() != nullptr); /* KERNEL */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel and the out-transfer to complete */ co_await ttg::device::wait(val.db, ds); -#ifdef TTG_HAVE_CUDA /* buffer is increment once per task, so it should be the same as key */ CHECK(static_cast(scratch) == key+1); CHECK(static_cast(*val.db.host_ptr()) == key+1); -#endif // TTG_HAVE_CUDA /* we're back, the kernel executed and we can send */ if (key < num_iter) { @@ -283,10 +296,10 @@ TEST_CASE("Device", "coro") { //ptr.get_view(device_id); - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{1}); ttg::ttg_fence(ttg::default_execution_context()); if (num_iter == last_key) { CHECK(ptr.is_valid()); @@ -300,6 +313,8 @@ TEST_CASE("Device", "coro") { ptr.reset(); } +#endif // 0 + /* TODO: enabel this test once we control the PaRSEC state machine! */ SECTION("device-host-tasks") { @@ -307,15 +322,11 @@ TEST_CASE("Device", "coro") { auto host_fn = [&](const int& key, value_t&& val) { /* check that the data has been synced back */ -#ifdef TTG_HAVE_CUDA CHECK(static_cast(*val.db.host_ptr()) == key); -#endif // TTG_HAVE_CUDA /* modify the data */ *val.db.host_ptr() += 1.0; -#ifdef TTG_HAVE_CUDA CHECK(static_cast(*val.db.host_ptr()) == key+1); -#endif // TTG_HAVE_CUDA /* send back to the device */ ttg::send<0>(key+1, std::move(val)); @@ -323,15 +334,13 @@ TEST_CASE("Device", "coro") { auto htt = ttg::make_tt(host_fn, ttg::edges(d2h), ttg::edges(h2d), "host_task", {"d2h"}, {"h2d"}); - auto device_fn = [&](const int& key, value_t&& val) -> ttg::device::Task { + auto device_fn = [&](const int& key, value_t&& val) -> ttg::device::Task { /* wait for the view to be available on the device */ co_await ttg::device::select(val.db); /* call a kernel */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), nullptr, 0); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel to complete */ //co_await ttg::device::wait(val.db); @@ -345,17 +354,17 @@ TEST_CASE("Device", "coro") { } }; - auto dtt = ttg::make_tt(device_fn, ttg::edges(h2d), ttg::edges(d2h), - "device_task", {"h2d"}, {"d2h"}); + auto dtt = ttg::make_tt(device_fn, ttg::edges(h2d), ttg::edges(d2h), + "device_task", {"h2d"}, {"d2h"}); ttg::make_graph_executable(dtt); - if (ttg::default_execution_context().rank() == 0) htt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) htt->invoke(0, value_t{1}); ttg::ttg_fence(ttg::default_execution_context()); } SECTION("loop") { ttg::Edge edge; - auto fn = [&](int key, value_t&& val) -> ttg::device::Task { + auto fn = [&](int key, value_t&& val) -> ttg::device::Task { double scratch = 1.0; ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::Allocate); @@ -370,33 +379,29 @@ TEST_CASE("Device", "coro") { CHECK(val.db.current_device_ptr() != nullptr); /* KERNEL */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); //increment_buffer(val.db.current_device_ptr(), val.db.size(), 0, 0); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel and the out-transfer to complete */ co_await ttg::device::wait(val.db); -#ifdef TTG_HAVE_CUDA /* buffer is increment once per task, so it should be the same as key */ //CHECK(static_cast(scratch) == i); CHECK(static_cast(*val.db.host_ptr()) == i+1); -#endif // TTG_HAVE_CUDA } }; - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{1}); ttg::ttg_fence(ttg::default_execution_context()); } SECTION("loop-scratchout") { ttg::Edge edge; - auto fn = [&](int key, value_t&& val) -> ttg::device::Task { + auto fn = [&](int key, value_t&& val) -> ttg::device::Task { double scratch = -10.0; ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::SyncIn); @@ -411,26 +416,22 @@ TEST_CASE("Device", "coro") { CHECK(val.db.current_device_ptr() != nullptr); /* KERNEL */ -#ifdef TTG_HAVE_CUDA increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); //increment_buffer(val.db.current_device_ptr(), val.db.size(), 0, 0); -#endif // TTG_HAVE_CUDA /* here we suspend to wait for a kernel and the out-transfer to complete */ co_await ttg::device::wait(val.db, ds); -#ifdef TTG_HAVE_CUDA /* buffer is increment once per task, so it should be the same as key */ CHECK(static_cast(scratch) == (-10+i+1)); CHECK(static_cast(*val.db.host_ptr()) == i+1); -#endif // TTG_HAVE_CUDA } }; - auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), - "device_task", {"edge_in"}, {"edge_out"}); + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); ttg::make_graph_executable(tt); - if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{1}); ttg::ttg_fence(ttg::default_execution_context()); } } diff --git a/ttg/ttg/coroutine.h b/ttg/ttg/coroutine.h index dc1ed8e5b..333490a22 100644 --- a/ttg/ttg/coroutine.h +++ b/ttg/ttg/coroutine.h @@ -214,6 +214,7 @@ namespace ttg { // fwd declare all coro promise types that have not been declared yet namespace device::detail { + template struct device_task_promise_type; } // namespace device::detail diff --git a/ttg/ttg/device/task.h b/ttg/ttg/device/task.h index 8e2d14cfc..a772ad73d 100644 --- a/ttg/ttg/device/task.h +++ b/ttg/ttg/device/task.h @@ -402,26 +402,31 @@ namespace ttg::device { namespace detail { // fwd-decl + template struct device_task_promise_type; // base type for ttg::device::Task - using device_task_handle_type = ttg::coroutine_handle; + template + using device_task_handle_type = ttg::coroutine_handle>; } // namespace detail /// A device::Task is a coroutine (a callable that can be suspended and resumed). - /// Since task execution in TTG is not preempable, tasks should not block. + /// Since task execution in TTG is not preemptable, tasks should not block. /// The purpose of suspending a task is to yield control back to the runtime until some events occur; /// in the meantime its executor (e.g., a user-space thread) can perform other work. /// Once the task function reaches a point where further progress is pending completion of one or more asynchronous /// actions the function needs to be suspended via a coroutine await (`co_await`). /// Resumption will be handled by the runtime. - struct Task : public detail::device_task_handle_type { - using base_type = detail::device_task_handle_type; + template + struct Task : public detail::device_task_handle_type { + using base_type = detail::device_task_handle_type; + + static constexpr const ttg::ExecutionSpace space = ES; /// these are members mandated by the promise_type concept ///@{ - using promise_type = detail::device_task_promise_type; + using promise_type = detail::device_task_promise_type; ///@} @@ -444,8 +449,11 @@ namespace ttg::device { * application task coroutine on the first co_yield. It subsequently * tracks the state of the task when it moves from waiting for transfers * to waiting for the submitted kernel to complete. */ + template struct device_task_promise_type { + static constexpr const ttg::ExecutionSpace space = ES; + /* do not suspend the coroutine on first invocation, we want to run * the coroutine immediately and suspend when we get the device transfers. */ @@ -463,41 +471,58 @@ namespace ttg::device { return {}; } - /* Allow co_await on a tuple */ - template - ttg::suspend_always await_transform(std::tuple &views) { - return yield_value(views); - } - template - ttg::suspend_always await_transform(detail::to_device_t&& a) { - bool need_transfer = !(TTG_IMPL_NS::register_device_memory(a.ties)); - /* TODO: are we allowed to not suspend here and launch the kernel directly? */ - m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER; - return {}; + auto await_transform(detail::to_device_t&& a) { + if constexpr (space != ttg::ExecutionSpace::Host) { + bool need_transfer = !(TTG_IMPL_NS::register_device_memory(a.ties)); + /* TODO: are we allowed to not suspend here and launch the kernel directly? */ + m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER; + return ttg::suspend_always{}; + } else { + return ttg::suspend_never{}; // host never suspends + } } template auto await_transform(detail::wait_kernel_t&& a) { - //std::cout << "yield_value: wait_kernel_t" << std::endl; - if constexpr (sizeof...(Ts) > 0) { - TTG_IMPL_NS::mark_device_out(a.ties); + if constexpr (space != ttg::ExecutionSpace::Host) { + if constexpr (sizeof...(Ts) > 0) { + TTG_IMPL_NS::mark_device_out(a.ties); + } + m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL; + return ttg::suspend_always{}; + } else { + return ttg::suspend_never{}; // host never suspends } - m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL; - return a; } ttg::suspend_always await_transform(std::vector&& v) { - m_sends = std::forward>(v); - m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + if constexpr (space != ttg::ExecutionSpace::Host) { + m_sends = std::move(v); + m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + } else { + /* execute second part of sends immediately and never suspend */ + for (auto& send : v) { + send.coro(); + } + v.clear(); + } return {}; + // unreachable + throw std::runtime_error("Returned after sending!"); } ttg::suspend_always await_transform(device::detail::send_t&& v) { - m_sends.clear(); - m_sends.push_back(std::forward(v)); - m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + if constexpr (space != ttg::ExecutionSpace::Host) { + m_sends.clear(); + m_sends.push_back(std::move(v)); + m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + } else { + v.coro(); + } return {}; + // unreachable + throw std::runtime_error("Returned after sending!"); } void return_void() { @@ -508,7 +533,9 @@ namespace ttg::device { return m_state == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; } - ttg::device::Task get_return_object() { return {detail::device_task_handle_type::from_promise(*this)}; } + ttg::device::Task get_return_object() { + return {detail::device_task_handle_type::from_promise(*this)}; + } void unhandled_exception() { std::cerr << "Task coroutine caught an unhandled exception!" << std::endl; @@ -532,108 +559,25 @@ namespace ttg::device { private: std::vector m_sends; ttg_device_coro_state m_state = ttg::device::detail::TTG_DEVICE_CORO_STATE_NONE; - }; + template + struct is_device_task : std::false_type { }; + template + struct is_device_task> : std::true_type { }; + template + constexpr bool is_device_task_v = is_device_task::value; + } // namespace detail - bool Task::completed() { return base_type::promise().state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; } + template + bool Task::completed() { + return base_type::promise().state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + } struct device_wait_kernel { }; - - /* NOTE: below is preliminary for reductions on the device, which is not available yet */ -#if 0 - /************************** - * Device reduction coros * - **************************/ - - struct device_reducer_promise_type; - - using device_reducer_handle_type = ttg::coroutine_handle; - - /// task that can be resumed after some events occur - struct device_reducer : public device_reducer_handle_type { - using base_type = device_reducer_handle_type; - - /// these are members mandated by the promise_type concept - ///@{ - - using promise_type = device_reducer_promise_type; - - ///@} - - device_reducer(base_type base) : base_type(std::move(base)) {} - - base_type& handle() { return *this; } - - /// @return true if ready to resume - inline bool ready() { - return true; - } - - /// @return true if task completed and can be destroyed - inline bool completed(); - }; - - - /* The promise type that stores the views provided by the - * application task coroutine on the first co_yield. It subsequently - * tracks the state of the task when it moves from waiting for transfers - * to waiting for the submitted kernel to complete. */ - struct device_reducer_promise_type { - - /* do not suspend the coroutine on first invocation, we want to run - * the coroutine immediately and suspend when we get the device transfers. - */ - ttg::suspend_never initial_suspend() { - m_state = ttg::device::detail::TTG_DEVICE_CORO_INIT; - return {}; - } - - /* suspend the coroutine at the end of the execution - * so we can access the promise. - * TODO: necessary? maybe we can save one suspend here - */ - ttg::suspend_always final_suspend() noexcept { - m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; - return {}; - } - - template - ttg::suspend_always await_transform(detail::to_device_t&& a) { - bool need_transfer = !(TTG_IMPL_NS::register_device_memory(a.ties)); - /* TODO: are we allowed to not suspend here and launch the kernel directly? */ - m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER; - return {}; - } - - void return_void() { - m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; - } - - bool complete() const { - return m_state == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; - } - - device_reducer get_return_object() { return device_reducer{device_reducer_handle_type::from_promise(*this)}; } - - void unhandled_exception() { } - - auto state() { - return m_state; - } - - - private: - ttg::device::detail::ttg_device_coro_state m_state = ttg::device::detail::TTG_DEVICE_CORO_STATE_NONE; - - }; - - bool device_reducer::completed() { return base_type::promise().state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; } -#endif // 0 - } // namespace ttg::device #endif // TTG_HAVE_COROUTINE diff --git a/ttg/ttg/madness/ttg.h b/ttg/ttg/madness/ttg.h index 85dee437f..555fb7cf7 100644 --- a/ttg/ttg/madness/ttg.h +++ b/ttg/ttg/madness/ttg.h @@ -344,20 +344,26 @@ namespace ttg_madness { // ttg::print("starting task"); if constexpr (!ttg::meta::is_void_v && !ttg::meta::is_empty_tuple_v) { TTG_PROCESS_TT_OP_RETURN( + ttg::ExecutionSpace::Host, suspended_task_address, coroutine_id, derived->op(key, this->make_input_refs(), derived->output_terminals)); // !!! NOTE converting input values to refs } else if constexpr (!ttg::meta::is_void_v && ttg::meta::is_empty_tuple_v) { - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals)); + TTG_PROCESS_TT_OP_RETURN( + ttg::ExecutionSpace::Host, suspended_task_address, + coroutine_id, derived->op(key, derived->output_terminals)); } else if constexpr (ttg::meta::is_void_v && !ttg::meta::is_empty_tuple_v) { TTG_PROCESS_TT_OP_RETURN( + ttg::ExecutionSpace::Host, suspended_task_address, coroutine_id, derived->op(this->make_input_refs(), derived->output_terminals)); // !!! NOTE converting input values to refs } else if constexpr (ttg::meta::is_void_v && ttg::meta::is_empty_tuple_v) { - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals)); + TTG_PROCESS_TT_OP_RETURN( + ttg::ExecutionSpace::Host, suspended_task_address, + coroutine_id, derived->op(derived->output_terminals)); } else // unreachable ttg::abort(); } else { // resume suspended coroutine diff --git a/ttg/ttg/make_tt.h b/ttg/ttg/make_tt.h index d3912576a..9b9b3d450 100644 --- a/ttg/ttg/make_tt.h +++ b/ttg/ttg/make_tt.h @@ -3,6 +3,39 @@ #ifndef TTG_MAKE_TT_H #define TTG_MAKE_TT_H +namespace detail { + template + struct op_return_type { + using type = void; + }; + +#ifdef TTG_HAVE_COROUTINE + template<> + struct op_return_type { + using type = ttg::coroutine_handle; + }; + + template + struct op_return_type> { + using type = typename ttg::device::Task::base_type; + }; +#endif // TTG_HAVE_COROUTINE + + template + using op_return_type_t = typename op_return_type::type; + + template + struct op_execution_space : std::integral_constant + { }; + + template + struct op_execution_space> : std::integral_constant + { }; + + template + constexpr const ttg::ExecutionSpace op_execution_space_v = op_execution_space::value; + +} // namespace detail // Class to wrap a callable with signature // @@ -11,12 +44,12 @@ // // returnT is void for funcT = synchronous (ordinary) function and the appropriate return type for funcT=coroutine template class CallableWrapTT : public TT< keyT, output_terminalsT, - CallableWrapTT, + CallableWrapTT, ttg::typelist> { using baseT = typename CallableWrapTT::ttT; @@ -27,27 +60,13 @@ class CallableWrapTT using noref_funcT = std::remove_reference_t; std::conditional_t, std::add_pointer_t, noref_funcT> func; - - using op_return_type = -#ifdef TTG_HAVE_COROUTINE - std::conditional_t, - ttg::coroutine_handle, -#ifdef TTG_HAVE_DEVICE - std::conditional_t, - ttg::device::Task::base_type, - void> -#else // TTG_HAVE_DEVICE - void -#endif // TTG_HAVE_DEVICE - >; -#else // TTG_HAVE_COROUTINE - void; -#endif // TTG_HAVE_COROUTINE + static_assert(!ttg::device::detail::is_device_task_v); + using op_return_type = detail::op_return_type_t; public: - static constexpr bool have_cuda_op = (space == ttg::ExecutionSpace::CUDA); - static constexpr bool have_hip_op = (space == ttg::ExecutionSpace::HIP); - static constexpr bool have_level_zero_op = (space == ttg::ExecutionSpace::L0); + static constexpr bool have_cuda_op = (Space == ttg::ExecutionSpace::CUDA); + static constexpr bool have_hip_op = (Space == ttg::ExecutionSpace::HIP); + static constexpr bool have_level_zero_op = (Space == ttg::ExecutionSpace::L0); protected: @@ -66,20 +85,12 @@ class CallableWrapTT coro_handle = ret; } return coro_handle; - } else -#ifdef TTG_HAVE_DEVICE - if constexpr (std::is_same_v) { - ttg::device::Task::base_type coro_handle = ret; + } else if constexpr (ttg::device::detail::is_device_task_v) { + typename returnT::base_type coro_handle = ret; return coro_handle; } -#else // TTG_HAVE_DEVICE - ttg::abort(); // should not happen -#endif // TTG_HAVE_DEVICE if constexpr (!(std::is_same_v -#ifdef TTG_HAVE_DEVICE - || std::is_same_v -#endif // TTG_HAVE_DEVICE - )) + || ttg::device::detail::is_device_task_v)) #endif { static_assert(std::tuple_size_v> == 1, @@ -490,8 +501,7 @@ auto make_tt_tpl(funcT &&func, const std::tuple auto make_tt(funcT &&func, const std::tuple...> &inedges = std::tuple<>{}, const std::tuple &outedges = std::tuple<>{}, const std::string &name = "wrapper", @@ -529,6 +539,8 @@ auto make_tt(funcT &&func, const std::tuple. "signature of func " "is faulty, or inedges does match the expected list of types, or both"); + constexpr const ttg::ExecutionSpace space = detail::op_execution_space_v; + // net argument typelist using func_args_t = ttg::meta::drop_void_t; constexpr auto num_args = std::tuple_size_v; @@ -573,15 +585,6 @@ auto make_tt(funcT &&func, const std::tuple. return std::make_unique(std::forward(func), inedges, outedges, name, innames, outnames); } -template -auto make_tt(funcT &&func, const std::tuple...> &inedges = std::tuple<>{}, - const std::tuple &outedges = std::tuple<>{}, const std::string &name = "wrapper", - const std::vector &innames = std::vector(sizeof...(input_edge_valuesT), "input"), - const std::vector &outnames = std::vector(sizeof...(output_edgesT), "output")) { - return make_tt(std::forward(func), inedges, outedges, name, innames, outnames); -} - template [[deprecated("use make_tt_tpl instead")]] inline auto wrapt( funcT &&func, const std::tuple...> &inedges, diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 2fc62bd91..aec57a8af 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -1398,7 +1398,7 @@ namespace ttg_parsec { task_t *task = (task_t*)gpu_task->ec; // get the device task from the coroutine handle - ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); task->dev_ptr->stream = gpu_stream; @@ -1445,7 +1445,7 @@ namespace ttg_parsec { int rc = PARSEC_HOOK_RETURN_DONE; if (nullptr != task->suspended_task_address) { /* Get a new handle for the promise*/ - dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); dev_data = dev_task.promise(); assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL || @@ -1594,10 +1594,10 @@ namespace ttg_parsec { } // get the device task from the coroutine handle - auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); // get the promise which contains the views - ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise(); + ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise(); /* for now make sure we're waiting for transfers and the coro hasn't skipped this step */ assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER); @@ -1675,14 +1675,14 @@ namespace ttg_parsec { if constexpr (!ttg::meta::is_void_v && !ttg::meta::is_empty_tuple_v) { auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence{}); - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op(task->key, std::move(input), obj->output_terminals)); + TTG_PROCESS_TT_OP_RETURN(Space, suspended_task_address, task->coroutine_id, baseobj->template op(task->key, std::move(input), obj->output_terminals)); } else if constexpr (!ttg::meta::is_void_v && ttg::meta::is_empty_tuple_v) { - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op(task->key, obj->output_terminals)); + TTG_PROCESS_TT_OP_RETURN(Space, suspended_task_address, task->coroutine_id, baseobj->template op(task->key, obj->output_terminals)); } else if constexpr (ttg::meta::is_void_v && !ttg::meta::is_empty_tuple_v) { auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence{}); - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op(std::move(input), obj->output_terminals)); + TTG_PROCESS_TT_OP_RETURN(Space, suspended_task_address, task->coroutine_id, baseobj->template op(std::move(input), obj->output_terminals)); } else if constexpr (ttg::meta::is_void_v && ttg::meta::is_empty_tuple_v) { - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op(obj->output_terminals)); + TTG_PROCESS_TT_OP_RETURN(Space, suspended_task_address, task->coroutine_id, baseobj->template op(obj->output_terminals)); } else { ttg::abort(); } @@ -1693,50 +1693,47 @@ namespace ttg_parsec { #ifdef TTG_HAVE_COROUTINE assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid); -#ifdef TTG_HAVE_DEVICE if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) { - ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address); + auto coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address); assert(detail::parsec_ttg_caller == nullptr); detail::parsec_ttg_caller = static_cast(task); // TODO: unify the outputs tls handling auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor(); task->tt->set_outputs_tls_ptr(); coro.resume(); - if (coro.completed()) { + if (coro.done()) { coro.destroy(); suspended_task_address = nullptr; } task->tt->set_outputs_tls_ptr(old_output_tls_ptr); detail::parsec_ttg_caller = nullptr; - } else -#endif // TTG_HAVE_DEVICE - if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) { - auto ret = static_cast(ttg::coroutine_handle::from_address(suspended_task_address)); - assert(ret.ready()); - auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor(); - task->tt->set_outputs_tls_ptr(); - ret.resume(); - if (ret.completed()) { - ret.destroy(); - suspended_task_address = nullptr; - } - else { // not yet completed - // leave suspended_task_address as is - - // right now can events are not properly implemented, we are only testing the workflow with dummy events - // so mark the events finished manually, parsec will rerun this task again and it should complete the second time - auto events = static_cast(ttg::coroutine_handle::from_address(suspended_task_address)).events(); - for (auto &event_ptr : events) { - event_ptr->finish(); + } else if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) { + auto ret = static_cast(ttg::coroutine_handle::from_address(suspended_task_address)); + assert(ret.ready()); + auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor(); + task->tt->set_outputs_tls_ptr(); + ret.resume(); + if (ret.done()) { + ret.destroy(); + suspended_task_address = nullptr; } - assert(ttg::coroutine_handle::from_address(suspended_task_address).promise().ready()); + else { // not yet completed + // leave suspended_task_address as is + + // right now can events are not properly implemented, we are only testing the workflow with dummy events + // so mark the events finished manually, parsec will rerun this task again and it should complete the second time + auto events = static_cast(ttg::coroutine_handle::from_address(suspended_task_address)).events(); + for (auto &event_ptr : events) { + event_ptr->finish(); + } + assert(ttg::coroutine_handle::from_address(suspended_task_address).promise().ready()); + } + task->tt->set_outputs_tls_ptr(old_output_tls_ptr); + detail::parsec_ttg_caller = nullptr; + task->suspended_task_address = suspended_task_address; } - task->tt->set_outputs_tls_ptr(old_output_tls_ptr); - detail::parsec_ttg_caller = nullptr; - task->suspended_task_address = suspended_task_address; - } - else - ttg::abort(); // unrecognized task id + else + ttg::abort(); // unrecognized task id #else // TTG_HAVE_COROUTINE ttg::abort(); // should not happen #endif // TTG_HAVE_COROUTINE @@ -1774,9 +1771,9 @@ namespace ttg_parsec { assert(detail::parsec_ttg_caller == NULL); detail::parsec_ttg_caller = task; if constexpr (!ttg::meta::is_void_v) { - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op(task->key, obj->output_terminals)); + TTG_PROCESS_TT_OP_RETURN(Space, suspended_task_address, task->coroutine_id, baseobj->template op(task->key, obj->output_terminals)); } else if constexpr (ttg::meta::is_void_v) { - TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op(obj->output_terminals)); + TTG_PROCESS_TT_OP_RETURN(Space, suspended_task_address, task->coroutine_id, baseobj->template op(obj->output_terminals)); } else // unreachable ttg:: abort(); detail::parsec_ttg_caller = NULL; @@ -2039,7 +2036,7 @@ namespace ttg_parsec { detail::parsec_ttg_caller = parsec_ttg_caller_save; /* release the dummy task */ - complete_task_and_release(es, &dummy->parsec_task); + complete_task_and_release(es, &dummy->parsec_task); parsec_thread_mempool_free(mempool, &dummy->parsec_task); } @@ -3335,16 +3332,19 @@ namespace ttg_parsec { } } - void copy_mark_pushout(detail::ttg_data_copy_t *copy) { + void copy_mark_pushout(detail::parsec_ttg_task_base_t *caller, detail::ttg_data_copy_t *copy) { - assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task); - parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task; + assert(caller->dev_ptr && caller->dev_ptr->gpu_task); + parsec_gpu_task_t *gpu_task = caller->dev_ptr->gpu_task; + if (nullptr == gpu_task->flow[0]) { + return; // this task has no flows because we skipped flow creation for host tasks + } auto check_parsec_data = [&](parsec_data_t* data) { if (data->owner_device != 0) { /* find the flow */ int flowidx = 0; while (flowidx < MAX_PARAM_COUNT && - gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) { + gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) { if (detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in->original == data) { /* found the right data, set the corresponding flow as pushout */ break; @@ -3370,22 +3370,25 @@ namespace ttg_parsec { /* check whether a data needs to be pushed out */ template - std::enable_if_t>, - void> + std::enable_if_t>, void> do_prepare_send(const Value &value, RemoteCheckFn&& remote_check) { using valueT = std::tuple_element_t; static constexpr const bool value_is_const = std::is_const_v; + detail::parsec_ttg_task_base_t *caller = detail::parsec_ttg_caller; + + // host tasks have no dev_ptr + if (nullptr == caller->dev_ptr) return; + /* get the copy */ detail::ttg_data_copy_t *copy; - copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value); + copy = detail::find_copy_in_task(caller, &value); /* if there is no copy we don't need to prepare anything */ if (nullptr == copy) { return; } - detail::parsec_ttg_task_base_t *caller = detail::parsec_ttg_caller; bool need_pushout = false; if (caller->data_flags & detail::ttg_parsec_data_flags::MARKED_PUSHOUT) { @@ -3397,7 +3400,7 @@ namespace ttg_parsec { auto &reducer = std::get(input_reducers); if (reducer) { /* reductions are currently done only on the host so push out */ - copy_mark_pushout(copy); + copy_mark_pushout(caller, copy); caller->data_flags |= detail::ttg_parsec_data_flags::MARKED_PUSHOUT; return; } @@ -3406,6 +3409,9 @@ namespace ttg_parsec { if (caller->data_flags & detail::ttg_parsec_data_flags::IS_MODIFIED) { /* The data has been modified previously. PaRSEC requires us to pushout * data if we transition from a writer to one or more readers. */ + /** + * TODO: check if there is only one device and avoid the pushout! + */ need_pushout = true; } @@ -3455,7 +3461,7 @@ namespace ttg_parsec { } if (need_pushout) { - copy_mark_pushout(copy); + copy_mark_pushout(caller, copy); caller->data_flags |= detail::ttg_parsec_data_flags::MARKED_PUSHOUT; } } @@ -3727,6 +3733,7 @@ namespace ttg_parsec { junk[0]++; } + template static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) { //std::cout << "complete_task_and_release: task " << parsec_task << std::endl; @@ -3744,7 +3751,7 @@ namespace ttg_parsec { //increment_data_versions(task, std::make_index_sequence>{}); // get the device task from the coroutine handle - auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); // get the promise which contains the views auto dev_data = dev_task.promise(); @@ -3842,7 +3849,6 @@ namespace ttg_parsec { world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_castnb_task_classes)>(self.task_class_id+1)); // function_id_to_instance[self.task_class_id] = this; //self.incarnations = incarnations_array.data(); -//#if 0 if constexpr (derived_has_cuda_op()) { self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t)); ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA; @@ -3851,6 +3857,7 @@ namespace ttg_parsec { ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE; ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL; ((__parsec_chore_t *)self.incarnations)[1].hook = NULL; + self.complete_execution = complete_task_and_release; } else if constexpr (derived_has_hip_op()) { self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t)); ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_HIP; @@ -3860,6 +3867,7 @@ namespace ttg_parsec { ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE; ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL; ((__parsec_chore_t *)self.incarnations)[1].hook = NULL; + self.complete_execution = complete_task_and_release; #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) } else if constexpr (derived_has_level_zero_op()) { self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t)); @@ -3870,6 +3878,7 @@ namespace ttg_parsec { ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE; ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL; ((__parsec_chore_t *)self.incarnations)[1].hook = NULL; + self.complete_execution = complete_task_and_release; #endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT } else { self.incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t)); @@ -3879,11 +3888,10 @@ namespace ttg_parsec { ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE; ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL; ((__parsec_chore_t *)self.incarnations)[1].hook = NULL; + self.complete_execution = complete_task_and_release; } -//#endif // 0 self.release_task = &parsec_release_task_to_mempool_update_nbtasks; - self.complete_execution = complete_task_and_release; for (i = 0; i < MAX_PARAM_COUNT; i++) { parsec_flow_t *flow = new parsec_flow_t; diff --git a/ttg/ttg/tt.h b/ttg/ttg/tt.h index 1ca33fe9e..dea32a994 100644 --- a/ttg/ttg/tt.h +++ b/ttg/ttg/tt.h @@ -178,31 +178,31 @@ namespace ttg { #ifndef TTG_PROCESS_TT_OP_RETURN #ifdef TTG_HAVE_COROUTINE -#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke) \ - { \ - using return_type = decltype(invoke); \ - if constexpr (std::is_same_v) { \ - invoke; \ - id = ttg::TaskCoroutineID::Invalid; \ - } else { \ - auto coro_return = invoke; \ - static_assert(std::is_same_v || \ - std::is_base_of_v, decltype(coro_return)>|| \ - std::is_base_of_v, \ - decltype(coro_return)>); \ - if constexpr (std::is_base_of_v, decltype(coro_return)>) \ - id = ttg::TaskCoroutineID::ResumableTask; \ - else if constexpr (std::is_base_of_v< \ - ttg::coroutine_handle, \ - decltype(coro_return)>) \ - id = ttg::TaskCoroutineID::DeviceTask; \ - else \ - std::abort(); \ - result = coro_return.address(); \ - } \ +#define TTG_PROCESS_TT_OP_RETURN(Space, result, id, invoke) \ + { \ + using return_type = decltype(invoke); \ + if constexpr (std::is_same_v) { \ + invoke; \ + id = ttg::TaskCoroutineID::Invalid; \ + } else { \ + auto coro_return = invoke; \ + static_assert(std::is_same_v || \ + std::is_base_of_v, decltype(coro_return)>|| \ + std::is_base_of_v>, \ + decltype(coro_return)>); \ + if constexpr (std::is_base_of_v, decltype(coro_return)>) \ + id = ttg::TaskCoroutineID::ResumableTask; \ + else if constexpr (std::is_base_of_v< \ + ttg::coroutine_handle>, \ + decltype(coro_return)>) \ + id = ttg::TaskCoroutineID::DeviceTask; \ + else \ + std::abort(); \ + result = coro_return.address(); \ + } \ } #else -#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke) invoke +#define TTG_PROCESS_TT_OP_RETURN(Space, result, id, invoke) invoke #endif #else #error "TTG_PROCESS_TT_OP_RETURN already defined in ttg/tt.h, check your header guards" From 91e1d3d220e1945d9d6321f2442294cc427440a0 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 7 Nov 2024 21:54:18 -0500 Subject: [PATCH 2/8] POTRF: Fix allocator type when TiledArray is not available Signed-off-by: Joseph Schuchart --- examples/matrixtile.h | 8 +++++--- examples/potrf/potrf.h | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/examples/matrixtile.h b/examples/matrixtile.h index a759b1c3c..2d1e1f30e 100644 --- a/examples/matrixtile.h +++ b/examples/matrixtile.h @@ -12,7 +12,8 @@ #include #if defined(TILEDARRAY_HAS_DEVICE) -#define ALLOCATOR TiledArray::device_pinned_allocator +template +using default_allocator_t = TiledArray::device_pinned_allocator; inline void allocator_init(int argc, char **argv) { // initialize MADNESS so that TA allocators can be created @@ -26,7 +27,8 @@ inline void allocator_fini() { madness::finalize(); } #else // TILEDARRAY_HAS_DEVICE -#define ALLOCATOR std::allocator +template +using default_allocator_t = std::allocator; inline void allocator_init(int argc, char **argv) { } @@ -34,7 +36,7 @@ inline void allocator_fini() { } #endif // TILEDARRAY_HAS_DEVICE -template +template > class MatrixTile : public ttg::TTValue> { public: using metadata_t = typename std::tuple; diff --git a/examples/potrf/potrf.h b/examples/potrf/potrf.h index 894064b54..56d4382d8 100644 --- a/examples/potrf/potrf.h +++ b/examples/potrf/potrf.h @@ -11,7 +11,7 @@ #define ENABLE_DEVICE_KERNEL 1 #endif -#if defined(TTG_HAVE_CUDART) +#if defined(TTG_ENABLE_CUDA) #define ES ttg::ExecutionSpace::CUDA #include #elif defined(TTG_ENABLE_HIP) @@ -97,7 +97,7 @@ namespace potrf { ttg::Edge>& output_result) { using T = typename MatrixT::element_type; #if defined(ENABLE_DEVICE_KERNEL) - auto iallocator = std::make_shared>(); + auto iallocator = std::make_shared>(); //std::cout << "Creating CUDA POTRF task " << std::endl; auto f_dev = [=, iallocator = std::move(iallocator)] (const Key1& key, MatrixTile&& tile_kk, From 9472a4027fcf2a88e099e99b66045b908b0f98ac Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 8 Nov 2024 09:15:22 -0500 Subject: [PATCH 3/8] Add scratch to madness backend Signed-off-by: Joseph Schuchart --- ttg/ttg/madness/devicescratch.h | 85 +++++++++++++++++++++++++++++++++ ttg/ttg/madness/ttg.h | 2 + 2 files changed, 87 insertions(+) create mode 100644 ttg/ttg/madness/devicescratch.h diff --git a/ttg/ttg/madness/devicescratch.h b/ttg/ttg/madness/devicescratch.h new file mode 100644 index 000000000..6965f7d82 --- /dev/null +++ b/ttg/ttg/madness/devicescratch.h @@ -0,0 +1,85 @@ +#ifndef TTG_MADNESS_DEVICESCRATCH_H +#define TTG_MADNESS_DEVICESCRATCH_H + +#include + +namespace ttg_madness { + +/** + * Scratch-space for task-local variables. + * TTG will allocate memory on the device + * and transfer data in and out based on the scope. + */ +template +struct devicescratch { + + using element_type = std::decay_t; + + static_assert(std::is_trivially_copyable_v, + "Only trivially copyable types are supported for devices."); + static_assert(std::is_default_constructible_v, + "Only default constructible types are supported for devices."); + +private: + + element_type* m_data = nullptr; + std::size_t m_count = 0; + ttg::scope m_scope; + +public: + + /* Constructing a devicescratch using application-managed memory. + * The memory pointed to by ptr must be accessible during + * the life-time of the devicescratch. */ + devicescratch(element_type* ptr, ttg::scope scope = ttg::scope::SyncIn, std::size_t count = 1) + : m_data(ptr) + , m_count(count) + , m_scope(scope) + { } + + /* don't allow moving */ + devicescratch(devicescratch&&) = delete; + + /* don't allow copying */ + devicescratch(const devicescratch& db) = delete; + + /* don't allow moving */ + devicescratch& operator=(devicescratch&&) = delete; + + /* don't allow copying */ + devicescratch& operator=(const devicescratch& db) = delete; + + ~devicescratch() { + m_data = nullptr; + m_count = 0; + } + + /* get the current device pointer (only host supported) */ + element_type* device_ptr() { + assert(is_valid()); + return m_data; + } + + /* get the current device pointer */ + const element_type* device_ptr() const { + assert(is_valid()); + return m_data; + } + + bool is_valid() const { + return true; + } + + ttg::scope scope() const { + return m_scope; + } + + std::size_t size() const { + return m_count; + } + +}; + +} // namespace ttg_madness + +#endif // TTG_MADNESS_DEVICESCRATCH_H \ No newline at end of file diff --git a/ttg/ttg/madness/ttg.h b/ttg/ttg/madness/ttg.h index 555fb7cf7..41234aa5f 100644 --- a/ttg/ttg/madness/ttg.h +++ b/ttg/ttg/madness/ttg.h @@ -13,7 +13,9 @@ #include "ttg/base/keymap.h" #include "ttg/base/tt.h" #include "ttg/func.h" +#include "ttg/madness/buffer.h" #include "ttg/madness/device.h" +#include "ttg/madness/devicescratch.h" #include "ttg/runtimes.h" #include "ttg/tt.h" #include "ttg/util/bug.h" From a5f815a2643abdaf9645539cfb2f1b36d13340fe Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 11 Nov 2024 10:40:38 -0500 Subject: [PATCH 4/8] Provide a valid device ID for host devices Device ID alone does not uniquely identify a device. The host always has ID 0. Signed-off-by: Joseph Schuchart --- ttg/ttg/device/device.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/ttg/ttg/device/device.h b/ttg/ttg/device/device.h index 244e9c944..616503bd1 100644 --- a/ttg/ttg/device/device.h +++ b/ttg/ttg/device/device.h @@ -32,9 +32,6 @@ namespace ttg::device { { } int id() const { - if (is_host()) { - throw std::runtime_error("No valid ID for Host execution space!"); - } if (is_invalid()) { throw std::runtime_error("Invalid execution space!"); } From 9d348dfe3926d6fa2eb0e3d68eb408a2e8da25d9 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 11 Nov 2024 10:43:59 -0500 Subject: [PATCH 5/8] Destroy host task coroutine handle when suspending Host tasks only suspend at the very end. We perfom all communication and then destroy the coroutine handle because there is no reason to keep it around. This may enable compiler optimizations and enables backends that otherwise do not handle device tasks to work with host-enabled device tasks. Yes, this needs renaming. Signed-off-by: Joseph Schuchart --- ttg/ttg/device/task.h | 10 ++++++++-- ttg/ttg/madness/ttg.h | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/ttg/ttg/device/task.h b/ttg/ttg/device/task.h index a772ad73d..a9d1be314 100644 --- a/ttg/ttg/device/task.h +++ b/ttg/ttg/device/task.h @@ -52,8 +52,14 @@ namespace ttg::device { constexpr bool await_ready() const noexcept { return false; } /* always suspend */ - template - constexpr void await_suspend(ttg::coroutine_handle) const noexcept {} + template + constexpr void await_suspend(ttg::coroutine_handle> handle) const noexcept { + if constexpr (ES == ttg::ExecutionSpace::Host) { + /* Destroy the coroutine handle. This allows the backends that are not coroutine-aware + * to execute co-routine tasks on the host. */ + handle.destroy(); + } + } void await_resume() noexcept { if constexpr (sizeof...(Ts) > 0) { diff --git a/ttg/ttg/madness/ttg.h b/ttg/ttg/madness/ttg.h index 41234aa5f..d04bb0e7d 100644 --- a/ttg/ttg/madness/ttg.h +++ b/ttg/ttg/madness/ttg.h @@ -392,7 +392,7 @@ namespace ttg_madness { // } #ifdef TTG_HAVE_COROUTINE - if (suspended_task_address) { + if (suspended_task_address && coroutine_id != ttg::TaskCoroutineID::DeviceTask) { // TODO implement handling of suspended coroutines properly // only resumable_task is recognized at the moment From 2ab164b00f5a419532841d7005043e7a735a4f13 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 11 Nov 2024 10:44:39 -0500 Subject: [PATCH 6/8] Remove unused code from POTRF Signed-off-by: Joseph Schuchart --- examples/potrf/potrf.h | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/examples/potrf/potrf.h b/examples/potrf/potrf.h index 56d4382d8..ff75f3501 100644 --- a/examples/potrf/potrf.h +++ b/examples/potrf/potrf.h @@ -50,12 +50,8 @@ namespace potrf { } static void device_potrf(MatrixTile &A, double *workspace, int Lwork, int *devInfo) { - int device = ttg::device::current_device(); - assert(device >= 0); #if defined(TTG_ENABLE_CUDA) - //std::cout << "POTRF A " << A.buffer().device_ptr_on(device) << " device " << device << " cols " << A.cols() << " lda " << A.lda() << " Lwork " << Lwork << " WS " << workspace << " devInfo " << devInfo << std::endl; auto handle = cusolver_handle(); - //std::cout << "POTRF handle " << handle << " device " << device << " stream " << ttg::device::current_stream() << std::endl; cusolverDnDpotrf(handle, CUBLAS_FILL_MODE_LOWER, A.cols(), A.buffer().current_device_ptr(), A.lda(), @@ -68,8 +64,7 @@ namespace potrf { workspace, Lwork, devInfo); #else - auto info = lapack::potrf(lapack::Uplo::Lower, A.rows(), A.buffer().current_device_ptr(), A.lda()); - assert(info == 0); + *devInfo = lapack::potrf(lapack::Uplo::Lower, A.rows(), A.buffer().current_device_ptr(), A.lda()); #endif } @@ -150,13 +145,6 @@ namespace potrf { co_await ttg::device::select(tile_kk.buffer(), devWS, devInfo); #endif // DEBUG_TILES_VALUES - int device = ttg::device::current_device(); - //std::cout << "POTRF [" << K << "] on " << device << std::endl; - - - //std::cout << "devWS host ptr " << hostWS << " device ptr " << devWS.device_ptr() << " size " << devWS.size() - // << " devInfo host ptr " << hostInfo << " device ptr " << devInfo.device_ptr() << "size " << devInfo.size() << std::endl; - /* everything is on the device, call the POTRF */ device_potrf(tile_kk, devWS.device_ptr(), Lwork, devInfo.device_ptr()); @@ -279,7 +267,6 @@ namespace potrf { co_await ttg::device::select(tile_kk.buffer(), tile_mk.buffer()); #endif // DEBUG_TILES_VALUES - int device = ttg::device::current_device(); double alpha = 1.0; #ifdef DEBUG_TILES_VALUES @@ -288,9 +275,6 @@ namespace potrf { device_norm(tile_mk, &norms[1]); #endif // DEBUG_TILES_VALUES - - //std::cout << "TRSM [" << K << ", " << M << "] on " << device << std::endl; - #if defined(TTG_ENABLE_CUDA) cublasDtrsm(cublas_handle(), CUBLAS_SIDE_RIGHT, CUBLAS_FILL_MODE_LOWER, @@ -417,8 +401,6 @@ namespace potrf { co_await ttg::device::select(tile_kk.buffer(), tile_mk.buffer()); #endif // DEBUG_TILES_VALUES - int device = ttg::device::current_device(); - double alpha = -1.0; double beta = 1.0; @@ -547,7 +529,6 @@ namespace potrf { co_await ttg::device::select(tile_mk.buffer(), tile_nk.buffer(), tile_mn.buffer()); #endif // DEBUG_TILES_VALUES - int device = ttg::device::current_device(); double alpha = -1.0; double beta = 1.0; From ffb0673d811bc426307673ce93dd9afb5d1a29c4 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 11 Nov 2024 12:57:17 -0500 Subject: [PATCH 7/8] Add madness/devicescratch.h to cmake install set Signed-off-by: Joseph Schuchart --- ttg/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index 272f005ab..66ccafc92 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -209,6 +209,7 @@ if (TARGET MADworld) set(ttg-mad-headers ${CMAKE_CURRENT_SOURCE_DIR}/ttg/madness/buffer.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/madness/device.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/madness/devicescratch.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/madness/fwd.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/madness/import.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/madness/ttg.h From 310f97c233a0742774d867ab72612db44f3f539f Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 20 Dec 2024 15:21:39 -0500 Subject: [PATCH 8/8] Rename ttg::device::Task to ttg::CoTask This was decided at the December '24 EPEXA meeting. We need to figure out what to do with the existing resumable task. Signed-off-by: Joseph Schuchart --- examples/potrf/potrf.h | 8 +- examples/spmm/spmm_cuda.cc | 2 +- examples/task-benchmarks/chain-ttg-dev.cc | 10 +- ttg/ttg/coroutine.h | 8 +- ttg/ttg/device/task.h | 743 +++++++++++----------- ttg/ttg/make_tt.h | 12 +- ttg/ttg/parsec/ttg.h | 30 +- ttg/ttg/tt.h | 4 +- 8 files changed, 411 insertions(+), 406 deletions(-) diff --git a/examples/potrf/potrf.h b/examples/potrf/potrf.h index ff75f3501..4cb808839 100644 --- a/examples/potrf/potrf.h +++ b/examples/potrf/potrf.h @@ -97,7 +97,7 @@ namespace potrf { auto f_dev = [=, iallocator = std::move(iallocator)] (const Key1& key, MatrixTile&& tile_kk, std::tuple>, ttg::Out>>& out) - -> ttg::device::Task { + -> ttg::CoTask { const auto K = key[0]; /* compute successors before submitting the kernel @@ -225,7 +225,7 @@ namespace potrf { #if defined(ENABLE_DEVICE_KERNEL) auto f = [=](const Key2& key, const MatrixTile& tile_kk, MatrixTile&& tile_mk, std::tuple>, ttg::Out>, ttg::Out>, - ttg::Out>>& out) -> ttg::device::Task { + ttg::Out>>& out) -> ttg::CoTask { const int M = key[0]; const int K = key[1]; // the column equals the outer most look K (same as PO) @@ -377,7 +377,7 @@ namespace potrf { using T = typename MatrixT::element_type; #if defined(ENABLE_DEVICE_KERNEL) auto f = [=](const Key2& key, const MatrixTile& tile_mk, MatrixTile&& tile_kk) - -> ttg::device::Task { + -> ttg::CoTask { const int K = key[0]; const int M = key[1]; @@ -500,7 +500,7 @@ namespace potrf { using T = typename MatrixT::element_type; #if defined(ENABLE_DEVICE_KERNEL) auto f = [=](const Key3& key, const MatrixTile& tile_mk, const MatrixTile& tile_nk, MatrixTile&& tile_mn) - -> ttg::device::Task { + -> ttg::CoTask { const int M = key[0]; const int N = key[1]; const int K = key[2]; diff --git a/examples/spmm/spmm_cuda.cc b/examples/spmm/spmm_cuda.cc index 469aef8f7..cad75b9cd 100644 --- a/examples/spmm/spmm_cuda.cc +++ b/examples/spmm/spmm_cuda.cc @@ -819,7 +819,7 @@ class SpMM25D { } } - ttg::device::Task op(const Key<3> &ijk, typename baseT::input_refs_tuple_type &&_ijk, + ttg::CoTask op(const Key<3> &ijk, typename baseT::input_refs_tuple_type &&_ijk, std::tuple, Blk>, Out, Blk>> &result) { const auto i = ijk[0]; const auto j = ijk[1]; diff --git a/examples/task-benchmarks/chain-ttg-dev.cc b/examples/task-benchmarks/chain-ttg-dev.cc index 559e32870..64bf006d9 100644 --- a/examples/task-benchmarks/chain-ttg-dev.cc +++ b/examples/task-benchmarks/chain-ttg-dev.cc @@ -59,7 +59,7 @@ auto make_ttg<1>(bool do_move) { send<0>(0, A{}); }, edges(), edges(I2N)); - auto next = make_tt([=](const int &key, auto&& value) -> ttg::device::Task { + auto next = make_tt([=](const int &key, auto&& value) -> ttg::CoTask { //++task_counter; co_await ttg::device::select(value.b); if (key < NUM_TASKS) { @@ -85,7 +85,7 @@ auto make_ttg<2>(bool do_move) { send<1>(0, A{}); }, edges(), edges(I2N1, I2N2)); - auto next = make_tt([=](const int &key, A&& v1, A&& v2) -> ttg::device::Task { + auto next = make_tt([=](const int &key, A&& v1, A&& v2) -> ttg::CoTask { co_await ttg::device::select(v1.b, v2.b); if (key < NUM_TASKS) { if (do_move) { @@ -115,7 +115,7 @@ auto make_ttg<4>(bool do_move) { send<3>(0, A{}); }, edges(), edges(I2N1, I2N2, I2N3, I2N4)); - auto next = make_tt([=](const int &key, A&& v1, A&& v2, A&& v3, A&& v4) -> ttg::device::Task { + auto next = make_tt([=](const int &key, A&& v1, A&& v2, A&& v3, A&& v4) -> ttg::CoTask { co_await ttg::device::select(v1.b, v2.b, v3.b, v4.b); if (key < NUM_TASKS) { if (do_move) { @@ -155,7 +155,7 @@ auto make_ttg<8>(bool do_move) { send<7>(0, A{}); }, edges(), edges(I2N1, I2N2, I2N3, I2N4, I2N5, I2N6, I2N7, I2N8)); - auto next = make_tt([=](const int &key, auto&& v1, auto&& v2, auto&& v3, auto&& v4, auto&& v5, auto&& v6, auto&& v7, auto&& v8) -> ttg::device::Task { + auto next = make_tt([=](const int &key, auto&& v1, auto&& v2, auto&& v3, auto&& v4, auto&& v5, auto&& v6, auto&& v7, auto&& v8) -> ttg::CoTask { co_await ttg::device::select(v1.b, v2.b, v3.b, v4.b, v5.b, v6.b, v7.b, v8.b); if (key < NUM_TASKS) { if (do_move) { @@ -192,7 +192,7 @@ auto make_ttg<0>(bool do_move) { auto init = make_tt([](std::tuple> &outs) { sendk<0>(0, outs); }, edges(), edges(I2N)); - auto next = make_tt([](const int& key) -> ttg::device::Task { + auto next = make_tt([](const int& key) -> ttg::CoTask { co_await ttg::device::select(); if (key < NUM_TASKS) { co_await ttg::device::forward(ttg::device::sendk<0>(key+1)); diff --git a/ttg/ttg/coroutine.h b/ttg/ttg/coroutine.h index 333490a22..1ae194a22 100644 --- a/ttg/ttg/coroutine.h +++ b/ttg/ttg/coroutine.h @@ -213,10 +213,10 @@ namespace ttg { ///////////////////////////////////////////////////////////////////////////// // fwd declare all coro promise types that have not been declared yet - namespace device::detail { + namespace detail { template - struct device_task_promise_type; - } // namespace device::detail + struct cotask_promise_type; + } // namespace detail /// describes all types of coroutine tasks known to TTG /// @internal only exists to simplify metaprogramming in the backend code @@ -225,7 +225,7 @@ namespace ttg { Invalid, /// -> ttg::resumable_task ResumableTask, - /// -> ttg::device::Task + /// -> ttg::CoTask DeviceTask }; diff --git a/ttg/ttg/device/task.h b/ttg/ttg/device/task.h index a9d1be314..0bc9e51be 100644 --- a/ttg/ttg/device/task.h +++ b/ttg/ttg/device/task.h @@ -11,409 +11,417 @@ #ifdef TTG_HAVE_COROUTINE -namespace ttg::device { +namespace ttg { + + /******************************************* + * Device task promise and coroutine handle + *******************************************/ namespace detail { - template - struct to_device_t { - std::tuple...> ties; - }; - } // namespace detail - - /** - * Select a device to execute on based on the provided buffer and scratchspace objects. - * Returns an object that should be awaited on using \c co_await. - * Upon resume, the device is selected (i.e., \sa ttg::device::current_device and - * \sa ttg::device::current_stream are available) and the buffers are available on the - * selected device. - */ - template - [[nodiscard]] - inline auto select(Args &&...args) { - return detail::to_device_t...>{std::tie(std::forward(args)...)}; - } + // fwd-decl + template + struct cotask_promise_type; + // base type for ttg::CoTask + template + using cotask_handle_type = ttg::coroutine_handle>; + } // namespace detail namespace detail { - enum ttg_device_coro_state { - TTG_DEVICE_CORO_STATE_NONE, - TTG_DEVICE_CORO_INIT, - TTG_DEVICE_CORO_WAIT_TRANSFER, - TTG_DEVICE_CORO_WAIT_KERNEL, - TTG_DEVICE_CORO_SENDOUT, - TTG_DEVICE_CORO_COMPLETE + enum ttg_coro_state { + TTG_CORO_STATE_NONE, + TTG_CORO_INIT, + TTG_CORO_WAIT_TRANSFER, + TTG_CORO_WAIT_KERNEL, + TTG_CORO_SENDOUT, + TTG_CORO_COMPLETE }; - template - struct wait_kernel_t { - std::tuple ties; - - /* always suspend */ - constexpr bool await_ready() const noexcept { return false; } - - /* always suspend */ - template - constexpr void await_suspend(ttg::coroutine_handle> handle) const noexcept { - if constexpr (ES == ttg::ExecutionSpace::Host) { - /* Destroy the coroutine handle. This allows the backends that are not coroutine-aware - * to execute co-routine tasks on the host. */ - handle.destroy(); - } - } + } // namespace detail - void await_resume() noexcept { - if constexpr (sizeof...(Ts) > 0) { - /* hook to allow the backend to handle the data after pushout */ - TTG_IMPL_NS::post_device_out(ties); - } - } - }; - } // namespace detail - - /** - * Wait for previously submitted kernels to complete and provided - * ttg::Buffer and ttg::devicescratch to be transferred back to host. - * Must only be called after awaiting \sa ttg::device::select has resumed. - */ - template - [[nodiscard]] - inline auto wait(Buffers &&...args) { - static_assert( - ((ttg::meta::is_buffer_v> || ttg::meta::is_devicescratch_v>) && - ...), - "Only ttg::Buffer and ttg::devicescratch can be waited on!"); - return detail::wait_kernel_t...>{std::tie(std::forward(args)...)}; - } + namespace device { + + namespace detail { + template + struct to_device_t { + std::tuple...> ties; + }; + } // namespace detail + + /** + * Select a device to execute on based on the provided buffer and scratchspace objects. + * Returns an object that should be awaited on using \c co_await. + * Upon resume, the device is selected (i.e., \sa ttg::device::current_device and + * \sa ttg::device::current_stream are available) and the buffers are available on the + * selected device. + */ + template + [[nodiscard]] + inline auto select(Args &&...args) { + return detail::to_device_t...>{std::tie(std::forward(args)...)}; + } - /****************************** - * Send/Broadcast handling - * We pass the value returned by the backend's copy handler into a coroutine - * and execute the first part (prepare), before suspending it. - * The second part (send/broadcast) is executed after the task completed. - ******************************/ + namespace detail { - namespace detail { - struct send_coro_promise_type; + template + struct wait_kernel_t { + std::tuple ties; - using send_coro_handle_type = ttg::coroutine_handle; + /* always suspend */ + constexpr bool await_ready() const noexcept { return false; } - /// a coroutine for sending data from the device - struct send_coro_state : public send_coro_handle_type { - using base_type = send_coro_handle_type; + /* always suspend */ + template + constexpr void await_suspend(ttg::coroutine_handle> handle) const noexcept { + if constexpr (ES == ttg::ExecutionSpace::Host) { + /* Destroy the coroutine handle. This allows the backends that are not coroutine-aware + * to execute co-routine tasks on the host. */ + handle.destroy(); + } + } - /// these are members mandated by the promise_type concept - ///@{ + void await_resume() noexcept { + if constexpr (sizeof...(Ts) > 0) { + /* hook to allow the backend to handle the data after pushout */ + TTG_IMPL_NS::post_device_out(ties); + } + } + }; + } // namespace detail + + /** + * Wait for previously submitted kernels to complete and provided + * ttg::Buffer and ttg::devicescratch to be transferred back to host. + * Must only be called after awaiting \sa ttg::device::select has resumed. + */ + template + [[nodiscard]] + inline auto wait(Buffers &&...args) { + static_assert( + ((ttg::meta::is_buffer_v> || ttg::meta::is_devicescratch_v>) && + ...), + "Only ttg::Buffer and ttg::devicescratch can be waited on!"); + return detail::wait_kernel_t...>{std::tie(std::forward(args)...)}; + } - using promise_type = send_coro_promise_type; + /****************************** + * Send/Broadcast handling + * We pass the value returned by the backend's copy handler into a coroutine + * and execute the first part (prepare), before suspending it. + * The second part (send/broadcast) is executed after the task completed. + ******************************/ - ///@} + namespace detail { + struct send_coro_promise_type; - send_coro_state(base_type base) : base_type(std::move(base)) {} + using send_coro_handle_type = ttg::coroutine_handle; - base_type &handle() { return *this; } + /// a coroutine for sending data from the device + struct send_coro_state : public send_coro_handle_type { + using base_type = send_coro_handle_type; - /// @return true if ready to resume - inline bool ready() { return true; } + /// these are members mandated by the promise_type concept + ///@{ - /// @return true if task completed and can be destroyed - inline bool completed(); - }; + using promise_type = send_coro_promise_type; - /// the promise type for the send coroutine - struct send_coro_promise_type { - /* do not suspend the coroutine on first invocation, we want to run - * the coroutine immediately and suspend only once. - */ - ttg::suspend_never initial_suspend() { return {}; } + ///@} - /* we don't suspend the coroutine at the end. - * it can be destroyed once the send/broadcast is done - */ - ttg::suspend_never final_suspend() noexcept { return {}; } + send_coro_state(base_type base) : base_type(std::move(base)) {} - send_coro_state get_return_object() { return send_coro_state{send_coro_handle_type::from_promise(*this)}; } + base_type &handle() { return *this; } - /* the send coros only have an empty co_await */ - ttg::suspend_always await_transform(ttg::Void) { return {}; } + /// @return true if ready to resume + inline bool ready() { return true; } - void unhandled_exception() { - std::cerr << "Send coroutine caught an unhandled exception!" << std::endl; - throw; // fwd - } + /// @return true if task completed and can be destroyed + inline bool completed(); + }; - void return_void() {} - }; + /// the promise type for the send coroutine + struct send_coro_promise_type { + /* do not suspend the coroutine on first invocation, we want to run + * the coroutine immediately and suspend only once. + */ + ttg::suspend_never initial_suspend() { return {}; } - template - inline send_coro_state send_coro(const Key &key, Value &&value, ttg::Out> &t, - ttg::detail::value_copy_handler &ch) { - ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - Key k = key; - t.prepare_send(k, std::forward(value)); - co_await ttg::Void{}; // we'll come back once the task is done - t.send(k, std::forward(value)); - }; + /* we don't suspend the coroutine at the end. + * it can be destroyed once the send/broadcast is done + */ + ttg::suspend_never final_suspend() noexcept { return {}; } - template - inline send_coro_state sendv_coro(Value &&value, ttg::Out> &t, - ttg::detail::value_copy_handler &ch) { - ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - t.prepare_send(std::forward(value)); - co_await ttg::Void{}; // we'll come back once the task is done - t.sendv(std::forward(value)); - }; + send_coro_state get_return_object() { return send_coro_state{send_coro_handle_type::from_promise(*this)}; } - template - inline send_coro_state sendk_coro(const Key &key, ttg::Out &t) { - // no need to prepare the send but we have to suspend once - Key k = key; - co_await ttg::Void{}; // we'll come back once the task is done - t.sendk(k); - }; + /* the send coros only have an empty co_await */ + ttg::suspend_always await_transform(ttg::Void) { return {}; } - template - inline send_coro_state send_coro(ttg::Out &t) { - // no need to prepare the send but we have to suspend once - co_await ttg::Void{}; // we'll come back once the task is done - t.send(); - }; + void unhandled_exception() { + std::cerr << "Send coroutine caught an unhandled exception!" << std::endl; + throw; // fwd + } - struct send_t { - send_coro_state coro; - }; - } // namespace detail - - template - inline detail::send_t send(const keyT &key, valueT &&value, std::tuple...> &t) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{ - detail::send_coro(key, copy_handler(std::forward(value)), std::get(t), copy_handler)}; - } + void return_void() {} + }; - template - inline detail::send_t sendv(valueT &&value, std::tuple...> &t) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), std::get(t), copy_handler)}; - } + template + inline send_coro_state send_coro(const Key &key, Value &&value, ttg::Out> &t, + ttg::detail::value_copy_handler &ch) { + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + Key k = key; + t.prepare_send(k, std::forward(value)); + co_await ttg::Void{}; // we'll come back once the task is done + t.send(k, std::forward(value)); + }; + + template + inline send_coro_state sendv_coro(Value &&value, ttg::Out> &t, + ttg::detail::value_copy_handler &ch) { + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + t.prepare_send(std::forward(value)); + co_await ttg::Void{}; // we'll come back once the task is done + t.sendv(std::forward(value)); + }; + + template + inline send_coro_state sendk_coro(const Key &key, ttg::Out &t) { + // no need to prepare the send but we have to suspend once + Key k = key; + co_await ttg::Void{}; // we'll come back once the task is done + t.sendk(k); + }; + + template + inline send_coro_state send_coro(ttg::Out &t) { + // no need to prepare the send but we have to suspend once + co_await ttg::Void{}; // we'll come back once the task is done + t.send(); + }; + + struct send_t { + send_coro_state coro; + }; + } // namespace detail + + template + inline detail::send_t send(const keyT &key, valueT &&value, std::tuple...> &t) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{ + detail::send_coro(key, copy_handler(std::forward(value)), std::get(t), copy_handler)}; + } - template - inline detail::send_t sendk(const Key &key, std::tuple...> &t) { - return detail::send_t{detail::sendk_coro(key, std::get(t))}; - } + template + inline detail::send_t sendv(valueT &&value, std::tuple...> &t) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), std::get(t), copy_handler)}; + } - // clang-format off - /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task - /// \param[in] i Identifies which output terminal of this template task to select for sending - /// \param[in] key: the id of the task(s) receiving the value - /// \param[in] value: the value to send to the receiving task(s) - // clang-format on - template - inline detail::send_t send(size_t i, const keyT &key, valueT &&value) { - ttg::detail::value_copy_handler copy_handler; - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - return detail::send_t{detail::send_coro(key, copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; - } + template + inline detail::send_t sendk(const Key &key, std::tuple...> &t) { + return detail::send_t{detail::sendk_coro(key, std::get(t))}; + } - // clang-format off - /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task - /// \note this is provided to support `send` with and without explicitly-passed terminal tuple - /// \tparam Identifies which output terminal of this template task to select for sending - /// \param[in] key: the id of the task(s) receiving the value - /// \param[in] value: the value to send to the receiving task(s) - // clang-format on - template - inline auto send(const keyT &key, valueT &&value) { - return ttg::device::send(i, key, std::forward(value)); - } + // clang-format off + /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task + /// \param[in] i Identifies which output terminal of this template task to select for sending + /// \param[in] key: the id of the task(s) receiving the value + /// \param[in] value: the value to send to the receiving task(s) + // clang-format on + template + inline detail::send_t send(size_t i, const keyT &key, valueT &&value) { + ttg::detail::value_copy_handler copy_handler; + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + return detail::send_t{detail::send_coro(key, copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; + } + // clang-format off + /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task + /// \note this is provided to support `send` with and without explicitly-passed terminal tuple + /// \tparam Identifies which output terminal of this template task to select for sending + /// \param[in] key: the id of the task(s) receiving the value + /// \param[in] value: the value to send to the receiving task(s) + // clang-format on + template + inline auto send(const keyT &key, valueT &&value) { + return ttg::device::send(i, key, std::forward(value)); + } - template - inline detail::send_t sendv(std::size_t i, valueT &&value) { - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; - } - template - inline detail::send_t sendk(std::size_t i, const Key& key) { - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - return detail::send_t{detail::sendk_coro(key, *terminal_ptr)}; - } + template + inline detail::send_t sendv(std::size_t i, valueT &&value) { + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; + } - template - inline detail::send_t send(std::size_t i) { - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - return detail::send_t{detail::send_coro(*terminal_ptr)}; - } + template + inline detail::send_t sendk(std::size_t i, const Key& key) { + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + return detail::send_t{detail::sendk_coro(key, *terminal_ptr)}; + } + template + inline detail::send_t send(std::size_t i) { + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + return detail::send_t{detail::send_coro(*terminal_ptr)}; + } - template - inline detail::send_t sendv(valueT &&value) { - return sendv(i, std::forward(value)); - } - template - inline detail::send_t sendk(const Key& key) { - return sendk(i, key); - } + template + inline detail::send_t sendv(valueT &&value) { + return sendv(i, std::forward(value)); + } - template - inline detail::send_t sendk() { - return send(i); - } + template + inline detail::send_t sendk(const Key& key) { + return sendk(i, key); + } - namespace detail { + template + inline detail::send_t sendk() { + return send(i); + } - template - struct broadcast_keylist_trait { - using type = T; - }; + namespace detail { + + template + struct broadcast_keylist_trait { + using type = T; + }; + + /* overload for iterable types that extracts the type of the first element */ + template + struct broadcast_keylist_trait>> { + using key_type = decltype(*std::begin(std::get<0>(std::declval()))); + }; + + template + inline void prepare_broadcast(const std::tuple &keylists, valueT &&value, + std::tuple...> &t) { + std::get(t).prepare_send(std::get(keylists), std::forward(value)); + if constexpr (sizeof...(Is) > 0) { + prepare_broadcast(keylists, std::forward(value), t); + } + } - /* overload for iterable types that extracts the type of the first element */ - template - struct broadcast_keylist_trait>> { - using key_type = decltype(*std::begin(std::get<0>(std::declval()))); - }; + template + inline void prepare_broadcast(const std::tuple &keylists, valueT &&value) { + using key_t = typename broadcast_keylist_trait< + std::tuple_element_t...>> + >::key_type; + auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); + terminal_ptr->prepare_send(std::get(keylists), value); + if constexpr (sizeof...(Is) > 0) { + prepare_broadcast(keylists, std::forward(value)); + } + } - template - inline void prepare_broadcast(const std::tuple &keylists, valueT &&value, - std::tuple...> &t) { - std::get(t).prepare_send(std::get(keylists), std::forward(value)); - if constexpr (sizeof...(Is) > 0) { - prepare_broadcast(keylists, std::forward(value), t); + template + inline void broadcast(const std::tuple &keylists, valueT &&value, + std::tuple...> &t) { + std::get(t).broadcast(std::get(keylists), std::forward(value)); + if constexpr (sizeof...(Is) > 0) { + detail::broadcast(keylists, std::forward(value), t); + } } - } - template - inline void prepare_broadcast(const std::tuple &keylists, valueT &&value) { - using key_t = typename broadcast_keylist_trait< - std::tuple_element_t...>> - >::key_type; - auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); - terminal_ptr->prepare_send(std::get(keylists), value); - if constexpr (sizeof...(Is) > 0) { - prepare_broadcast(keylists, std::forward(value)); + template + inline void broadcast(const std::tuple &keylists, valueT &&value) { + using key_t = typename broadcast_keylist_trait< + std::tuple_element_t...>> + >::key_type; + auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); + terminal_ptr->broadcast(std::get(keylists), value); + if constexpr (sizeof...(Is) > 0) { + ttg::device::detail::broadcast(keylists, std::forward(value)); + } } - } - template - inline void broadcast(const std::tuple &keylists, valueT &&value, - std::tuple...> &t) { - std::get(t).broadcast(std::get(keylists), std::forward(value)); - if constexpr (sizeof...(Is) > 0) { - detail::broadcast(keylists, std::forward(value), t); + /* overload with explicit terminals */ + template + inline send_coro_state + broadcast_coro(RangesT &&keylists, valueT &&value, + std::tuple...> &t, + ttg::detail::value_copy_handler&& ch) { + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + RangesT kl = std::forward(keylists); // capture the keylist(s) + if constexpr (ttg::meta::is_tuple_v) { + // treat as tuple + prepare_broadcast<0, I, Is...>(kl, std::forward>(value), t); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value), t); + } else if constexpr (!ttg::meta::is_tuple_v) { + // create a tie to the captured keylist + prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); + } } - } - template - inline void broadcast(const std::tuple &keylists, valueT &&value) { - using key_t = typename broadcast_keylist_trait< - std::tuple_element_t...>> - >::key_type; - auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); - terminal_ptr->broadcast(std::get(keylists), value); - if constexpr (sizeof...(Is) > 0) { - ttg::device::detail::broadcast(keylists, std::forward(value)); + /* overload with implicit terminals */ + template + inline send_coro_state + broadcast_coro(RangesT &&keylists, valueT &&value, + ttg::detail::value_copy_handler&& ch) { + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + RangesT kl = std::forward(keylists); // capture the keylist(s) + if constexpr (ttg::meta::is_tuple_v) { + // treat as tuple + static_assert(sizeof...(Is)+1 == std::tuple_size_v, + "Size of keylist tuple must match the number of output terminals"); + prepare_broadcast<0, I, Is...>(kl, std::forward>(value)); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value)); + } else if constexpr (!ttg::meta::is_tuple_v) { + // create a tie to the captured keylist + prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); + } } - } + } // namespace detail - /* overload with explicit terminals */ - template - inline send_coro_state - broadcast_coro(RangesT &&keylists, valueT &&value, - std::tuple...> &t, - ttg::detail::value_copy_handler&& ch) { - ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - RangesT kl = std::forward(keylists); // capture the keylist(s) - if constexpr (ttg::meta::is_tuple_v) { - // treat as tuple - prepare_broadcast<0, I, Is...>(kl, std::forward>(value), t); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value), t); - } else if constexpr (!ttg::meta::is_tuple_v) { - // create a tie to the captured keylist - prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); - } + [[nodiscard]] + inline detail::send_t broadcast(rangeT &&keylist, + valueT &&value, + std::tuple...> &t) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{ + detail::broadcast_coro(std::forward(keylist), + copy_handler(std::forward(value)), + t, std::move(copy_handler))}; } - /* overload with implicit terminals */ - template - inline send_coro_state - broadcast_coro(RangesT &&keylists, valueT &&value, - ttg::detail::value_copy_handler&& ch) { - ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - RangesT kl = std::forward(keylists); // capture the keylist(s) - if constexpr (ttg::meta::is_tuple_v) { - // treat as tuple - static_assert(sizeof...(Is)+1 == std::tuple_size_v, - "Size of keylist tuple must match the number of output terminals"); - prepare_broadcast<0, I, Is...>(kl, std::forward>(value)); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value)); - } else if constexpr (!ttg::meta::is_tuple_v) { - // create a tie to the captured keylist - prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); - } + inline detail::send_t broadcast(rangeT &&keylist, valueT &&value) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{broadcast_coro(std::tie(keylist), copy_handler(std::forward(value)), + std::move(copy_handler))}; } - } // namespace detail - - /* overload with explicit terminals and keylist passed by const reference */ - template - [[nodiscard]] - inline detail::send_t broadcast(rangeT &&keylist, - valueT &&value, - std::tuple...> &t) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{ - detail::broadcast_coro(std::forward(keylist), - copy_handler(std::forward(value)), - t, std::move(copy_handler))}; - } - - /* overload with implicit terminals and keylist passed by const reference */ - template - inline detail::send_t broadcast(rangeT &&keylist, valueT &&value) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{broadcast_coro(std::tie(keylist), copy_handler(std::forward(value)), - std::move(copy_handler))}; - } - template - [[nodiscard]] - std::vector forward(Args&&... args) { - // TODO: check the cost of this! - return std::vector{std::forward(args)...}; - } - - /******************************************* - * Device task promise and coroutine handle - *******************************************/ + template + [[nodiscard]] + std::vector forward(Args&&... args) { + // TODO: check the cost of this! + return std::vector{std::forward(args)...}; + } - namespace detail { - // fwd-decl - template - struct device_task_promise_type; - // base type for ttg::device::Task - template - using device_task_handle_type = ttg::coroutine_handle>; - } // namespace detail + } // namespace device /// A device::Task is a coroutine (a callable that can be suspended and resumed). @@ -423,20 +431,20 @@ namespace ttg::device { /// Once the task function reaches a point where further progress is pending completion of one or more asynchronous /// actions the function needs to be suspended via a coroutine await (`co_await`). /// Resumption will be handled by the runtime. - template - struct Task : public detail::device_task_handle_type { - using base_type = detail::device_task_handle_type; + template + struct CoTask : public detail::cotask_handle_type { + using base_type = detail::cotask_handle_type; static constexpr const ttg::ExecutionSpace space = ES; /// these are members mandated by the promise_type concept ///@{ - using promise_type = detail::device_task_promise_type; + using promise_type = ttg::detail::cotask_promise_type; ///@} - Task(base_type base) : base_type(std::move(base)) {} + CoTask(base_type base) : base_type(std::move(base)) {} base_type& handle() { return *this; } @@ -456,7 +464,7 @@ namespace ttg::device { * tracks the state of the task when it moves from waiting for transfers * to waiting for the submitted kernel to complete. */ template - struct device_task_promise_type { + struct cotask_promise_type { static constexpr const ttg::ExecutionSpace space = ES; @@ -464,7 +472,7 @@ namespace ttg::device { * the coroutine immediately and suspend when we get the device transfers. */ ttg::suspend_never initial_suspend() { - m_state = ttg::device::detail::TTG_DEVICE_CORO_INIT; + m_state = ttg::detail::TTG_CORO_INIT; return {}; } @@ -473,16 +481,16 @@ namespace ttg::device { * TODO: necessary? maybe we can save one suspend here */ ttg::suspend_always final_suspend() noexcept { - m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + m_state = ttg::detail::TTG_CORO_COMPLETE; return {}; } template - auto await_transform(detail::to_device_t&& a) { + auto await_transform(device::detail::to_device_t&& a) { if constexpr (space != ttg::ExecutionSpace::Host) { bool need_transfer = !(TTG_IMPL_NS::register_device_memory(a.ties)); /* TODO: are we allowed to not suspend here and launch the kernel directly? */ - m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER; + m_state = ttg::detail::TTG_CORO_WAIT_TRANSFER; return ttg::suspend_always{}; } else { return ttg::suspend_never{}; // host never suspends @@ -490,12 +498,12 @@ namespace ttg::device { } template - auto await_transform(detail::wait_kernel_t&& a) { + auto await_transform(device::detail::wait_kernel_t&& a) { if constexpr (space != ttg::ExecutionSpace::Host) { if constexpr (sizeof...(Ts) > 0) { TTG_IMPL_NS::mark_device_out(a.ties); } - m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL; + m_state = ttg::detail::TTG_CORO_WAIT_KERNEL; return ttg::suspend_always{}; } else { return ttg::suspend_never{}; // host never suspends @@ -505,7 +513,7 @@ namespace ttg::device { ttg::suspend_always await_transform(std::vector&& v) { if constexpr (space != ttg::ExecutionSpace::Host) { m_sends = std::move(v); - m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + m_state = ttg::detail::TTG_CORO_SENDOUT; } else { /* execute second part of sends immediately and never suspend */ for (auto& send : v) { @@ -522,7 +530,7 @@ namespace ttg::device { if constexpr (space != ttg::ExecutionSpace::Host) { m_sends.clear(); m_sends.push_back(std::move(v)); - m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + m_state = ttg::detail::TTG_CORO_SENDOUT; } else { v.coro(); } @@ -532,19 +540,19 @@ namespace ttg::device { } void return_void() { - m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + m_state = ttg::detail::TTG_CORO_COMPLETE; } bool complete() const { - return m_state == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + return m_state == ttg::detail::TTG_CORO_COMPLETE; } - ttg::device::Task get_return_object() { - return {detail::device_task_handle_type::from_promise(*this)}; + ttg::CoTask get_return_object() { + return {detail::cotask_handle_type::from_promise(*this)}; } void unhandled_exception() { - std::cerr << "Task coroutine caught an unhandled exception!" << std::endl; + std::cerr << "CoTask coroutine caught an unhandled exception!" << std::endl; throw; // fwd } @@ -564,27 +572,24 @@ namespace ttg::device { private: std::vector m_sends; - ttg_device_coro_state m_state = ttg::device::detail::TTG_DEVICE_CORO_STATE_NONE; + ttg_coro_state m_state = ttg::detail::TTG_CORO_STATE_NONE; }; template struct is_device_task : std::false_type { }; template - struct is_device_task> : std::true_type { }; + struct is_device_task> : std::true_type { }; template constexpr bool is_device_task_v = is_device_task::value; } // namespace detail template - bool Task::completed() { - return base_type::promise().state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + bool CoTask::completed() { + return base_type::promise().state() == ttg::detail::TTG_CORO_COMPLETE; } - struct device_wait_kernel - { }; - -} // namespace ttg::device +} // namespace ttg #endif // TTG_HAVE_COROUTINE diff --git a/ttg/ttg/make_tt.h b/ttg/ttg/make_tt.h index 9b9b3d450..e6c738068 100644 --- a/ttg/ttg/make_tt.h +++ b/ttg/ttg/make_tt.h @@ -16,8 +16,8 @@ namespace detail { }; template - struct op_return_type> { - using type = typename ttg::device::Task::base_type; + struct op_return_type> { + using type = typename ttg::CoTask::base_type; }; #endif // TTG_HAVE_COROUTINE @@ -29,7 +29,7 @@ namespace detail { { }; template - struct op_execution_space> : std::integral_constant + struct op_execution_space> : std::integral_constant { }; template @@ -60,7 +60,7 @@ class CallableWrapTT using noref_funcT = std::remove_reference_t; std::conditional_t, std::add_pointer_t, noref_funcT> func; - static_assert(!ttg::device::detail::is_device_task_v); + static_assert(!ttg::detail::is_device_task_v); using op_return_type = detail::op_return_type_t; public: @@ -85,12 +85,12 @@ class CallableWrapTT coro_handle = ret; } return coro_handle; - } else if constexpr (ttg::device::detail::is_device_task_v) { + } else if constexpr (ttg::detail::is_device_task_v) { typename returnT::base_type coro_handle = ret; return coro_handle; } if constexpr (!(std::is_same_v - || ttg::device::detail::is_device_task_v)) + || ttg::detail::is_device_task_v)) #endif { static_assert(std::tuple_size_v> == 1, diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index aec57a8af..c24e8d573 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -1398,7 +1398,7 @@ namespace ttg_parsec { task_t *task = (task_t*)gpu_task->ec; // get the device task from the coroutine handle - auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::detail::cotask_handle_type::from_address(task->suspended_task_address); task->dev_ptr->stream = gpu_stream; @@ -1408,8 +1408,8 @@ namespace ttg_parsec { auto dev_data = dev_task.promise(); /* we should still be waiting for the transfer to complete */ - assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER || - dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL); + assert(dev_data.state() == ttg::detail::TTG_CORO_WAIT_TRANSFER || + dev_data.state() == ttg::detail::TTG_CORO_WAIT_KERNEL); #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA) { @@ -1445,15 +1445,15 @@ namespace ttg_parsec { int rc = PARSEC_HOOK_RETURN_DONE; if (nullptr != task->suspended_task_address) { /* Get a new handle for the promise*/ - dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + dev_task = ttg::detail::cotask_handle_type::from_address(task->suspended_task_address); dev_data = dev_task.promise(); - assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL || - dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT || - dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE); + assert(dev_data.state() == ttg::detail::TTG_CORO_WAIT_KERNEL || + dev_data.state() == ttg::detail::TTG_CORO_SENDOUT || + dev_data.state() == ttg::detail::TTG_CORO_COMPLETE); - if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() || - ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) { + if (ttg::detail::TTG_CORO_SENDOUT == dev_data.state() || + ttg::detail::TTG_CORO_COMPLETE == dev_data.state()) { /* the task started sending so we won't come back here */ //std::cout << "device_static_submit task " << task << " complete" << std::endl; } else { @@ -1594,13 +1594,13 @@ namespace ttg_parsec { } // get the device task from the coroutine handle - auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::detail::cotask_handle_type::from_address(task->suspended_task_address); // get the promise which contains the views ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise(); /* for now make sure we're waiting for transfers and the coro hasn't skipped this step */ - assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER); + assert(dev_data.state() == ttg::detail::TTG_CORO_WAIT_TRANSFER); parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device; assert(NULL != device); @@ -1694,7 +1694,7 @@ namespace ttg_parsec { assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid); if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) { - auto coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address); + auto coro = ttg::detail::cotask_handle_type::from_address(suspended_task_address); assert(detail::parsec_ttg_caller == nullptr); detail::parsec_ttg_caller = static_cast(task); // TODO: unify the outputs tls handling @@ -3751,16 +3751,16 @@ namespace ttg_parsec { //increment_data_versions(task, std::make_index_sequence>{}); // get the device task from the coroutine handle - auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::detail::cotask_handle_type::from_address(task->suspended_task_address); // get the promise which contains the views auto dev_data = dev_task.promise(); /* for now make sure we're waiting for the kernel to complete and the coro hasn't skipped this step */ - assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT); + assert(dev_data.state() == ttg::detail::TTG_CORO_SENDOUT); /* execute the sends we stored */ - if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) { + if (dev_data.state() == ttg::detail::TTG_CORO_SENDOUT) { /* set the current task, needed inside the sends */ detail::parsec_ttg_caller = task; dev_data.do_sends(); diff --git a/ttg/ttg/tt.h b/ttg/ttg/tt.h index dea32a994..fa2fbfec7 100644 --- a/ttg/ttg/tt.h +++ b/ttg/ttg/tt.h @@ -188,12 +188,12 @@ namespace ttg { auto coro_return = invoke; \ static_assert(std::is_same_v || \ std::is_base_of_v, decltype(coro_return)>|| \ - std::is_base_of_v>, \ + std::is_base_of_v>, \ decltype(coro_return)>); \ if constexpr (std::is_base_of_v, decltype(coro_return)>) \ id = ttg::TaskCoroutineID::ResumableTask; \ else if constexpr (std::is_base_of_v< \ - ttg::coroutine_handle>, \ + ttg::coroutine_handle>, \ decltype(coro_return)>) \ id = ttg::TaskCoroutineID::DeviceTask; \ else \