diff --git a/.github/workflows/conda-cpp.yml b/.github/workflows/conda-cpp.yml index 48cc069a7..e557637f8 100644 --- a/.github/workflows/conda-cpp.yml +++ b/.github/workflows/conda-cpp.yml @@ -24,7 +24,7 @@ jobs: # 20.04 supports CUDA 11.0+ - os: ubuntu-20.04 gcc: 9 - ucc: "v1.0.0" + ucc: "master" steps: - uses: actions/checkout@v2 @@ -65,3 +65,6 @@ jobs: - name: Run pytest run: python build.py -ipath="$HOME/cylon/install" --pytest + + - name: Build Java + run: python build.py -ipath="$HOME/cylon/install" --java diff --git a/build.py b/build.py index ef679c786..cf2eecfa6 100644 --- a/build.py +++ b/build.py @@ -25,6 +25,25 @@ logger = logging.getLogger("cylon_build") logger.setLevel(logging.INFO) + +def check_status(status, task): + if status != 0: + logger.error(f'{task} failed with a non zero exit code ({status}). ' + f'Cylon build is terminating.') + sys.exit(1) + else: + logger.info(f'{task} completed successfully') + + +def check_conda_prefix(): + conda_prefix = os.getenv('CONDA_PREFIX') + if not conda_prefix: + logger.error("The build should be in a conda environment") + sys.exit(1) + + return conda_prefix + + parser = argparse.ArgumentParser() # C++ Build @@ -54,10 +73,15 @@ python_build.add_argument( "--pytest", action='store_true', help='Run Python test suite') +# Java build +java_build = parser.add_argument_group("JCylon") +java_build.add_argument("--java", action='store_true', + help='Build JCylon') + # Docker build -java_build = parser.add_argument_group("Docker") -java_build.add_argument("--docker", action='store_true', - help='Build Cylon Docker images locally') +docker_build = parser.add_argument_group("Docker") +docker_build.add_argument("--docker", action='store_true', + help='Build Cylon Docker images locally') # Paths parser.add_argument("-bpath", help='Build directory', @@ -79,6 +103,7 @@ def on_off(arg): not args.release and not args.debug)) else "Debug" CPP_SOURCE_DIR = str(Path(args.root, 'cpp')) PYTHON_SOURCE_DIR = Path(args.root, 'python', 'pycylon') +JAVA_SOURCE_DIR = Path(args.root, 'java') RUN_CPP_TESTS = args.test RUN_PYTHON_TESTS = args.pytest CMAKE_FLAGS = args.cmake_flags @@ -89,11 +114,14 @@ def on_off(arg): # arrow build expects /s even on windows BUILD_PYTHON = args.python +# java +BUILD_JAVA = args.java + # docker BUILD_DOCKER = args.docker BUILD_DIR = str(Path(args.bpath)) -INSTALL_DIR = str(Path(args.ipath)) if args.ipath else "" +INSTALL_DIR = str(Path(args.ipath or check_conda_prefix())) OS_NAME = platform.system() # Linux, Darwin or Windows @@ -159,30 +187,10 @@ def print_line(): os.makedirs(BUILD_DIR) -def check_status(status, task): - if status != 0: - logger.error(f'{task} failed with a non zero exit code ({status}). ' - f'Cylon build is terminating.') - sys.exit(1) - else: - logger.info(f'{task} completed successfully') - - def build_cpp(): if not BUILD_CPP: return - CONDA_PREFIX = os.getenv('CONDA_PREFIX') - if args.ipath: - install_prefix = INSTALL_DIR - else: - if CONDA_PREFIX: - install_prefix = CONDA_PREFIX - else: - logger.error( - "install prefix can not be inferred. The build should be in a conda environment") - return - win_cmake_args = "-A x64" if os.name == 'nt' else "" verb = '-DCMAKE_VERBOSE_MAKEFILE:BOOL=ON' if args.verbose else '' @@ -191,7 +199,7 @@ def build_cpp(): f"-DCYLON_WITH_TEST={on_off(RUN_CPP_TESTS)} " \ f"-DARROW_BUILD_TYPE=SYSTEM " \ f"{CPPLINT_COMMAND} " \ - f"-DCMAKE_INSTALL_PREFIX={install_prefix} " \ + f"-DCMAKE_INSTALL_PREFIX={INSTALL_DIR} " \ f"{verb} {CMAKE_FLAGS} {CPP_SOURCE_DIR}" logger.info(f"Generate command: {cmake_command}") @@ -203,7 +211,7 @@ def build_cpp(): res = subprocess.call(cmake_build_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake build") - cmake_install_command = f'cmake --install . --prefix {install_prefix}' + cmake_install_command = f'cmake --install . --prefix {INSTALL_DIR}' logger.info(f"Install command: {cmake_install_command}") res = subprocess.call(cmake_install_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake install") @@ -220,11 +228,16 @@ def build_cpp(): def build_docker(): if not BUILD_DOCKER: return + logger.error("Docker build not implemented in this script") + sys.exit(1) def python_test(): if not RUN_PYTHON_TESTS: return + + check_conda_prefix() + env = os.environ if args.ipath: if OS_NAME == 'Linux': @@ -264,10 +277,7 @@ def build_python(): print_line() logger.info("Building Python") - conda_prefix = os.getenv('CONDA_PREFIX') - if not conda_prefix: - logger.error("The build should be in a conda environment") - return 1 + conda_prefix = check_conda_prefix() python_build_command = f'{PYTHON_EXEC} setup.py install --force' env = os.environ @@ -290,6 +300,19 @@ def build_python(): check_status(res.returncode, "PyCylon build") +def build_java(): + if not BUILD_JAVA: + return + + conda_prefix = check_conda_prefix() + + mvn_cmd = f"mvn clean install -Dcylon.core.libs={INSTALL_DIR}/lib " \ + f"-Dcylon.arrow.dir={conda_prefix}" + res = subprocess.run(mvn_cmd, shell=True, cwd=JAVA_SOURCE_DIR) + check_status(res.returncode, "JCylon build") + + build_cpp() build_python() python_test() +build_java() diff --git a/conda/environments/cylon.yml b/conda/environments/cylon.yml index 5ccfe229b..ec02a88dc 100644 --- a/conda/environments/cylon.yml +++ b/conda/environments/cylon.yml @@ -7,7 +7,7 @@ dependencies: - cmake>=3.17 - pyarrow=5.0.0 - glog=0.5.0 - - openmpi>=4.1.3 + - openmpi=4.1.3=ha1ae619_105 - ucx>=1.12.1 - cython>=0.29,<0.30 - numpy>=1.20 diff --git a/conda/environments/gcylon.yml b/conda/environments/gcylon.yml index 1b48f2cd1..084ec0a33 100644 --- a/conda/environments/gcylon.yml +++ b/conda/environments/gcylon.yml @@ -11,7 +11,7 @@ dependencies: - cudf=21.10.01 - cudatoolkit=11.2 - glog=0.5.0 - - openmpi>=4.1.3 + - openmpi=4.1.3=ha1ae619_105 - ucx>=1.12.1 - numpy>=1.20 - pandas>=1.0 diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 747fc3a40..d7337f0b4 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -251,7 +251,7 @@ if (CYLON_UCX) message(STATUS "UCC libs: ${UCC_LIBRARIES}") include_directories(SYSTEM ${UCC_INCLUDE_DIRS}) - link_directories(${UCC_LIBRARIES}) + link_directories(${UCC_LIBRARIES} ${UCC_LIBRARIES}/ucc) endif (CYLON_UCC) endif (CYLON_UCX) @@ -372,5 +372,6 @@ if (CYLON_WITH_TEST) set(CYLON_CATCH2_HEADER_HASH 27da57c7a06d09be8dd81fab7246b79e7892b6ae7e4e49ba8631f1d5a955e3fc) enable_testing() + set(CMAKE_CTEST_ARGUMENTS "--output-on-failure") add_subdirectory(test) endif () diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index 99683e189..59602311d 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -29,8 +29,16 @@ if (CYLON_UCX) net/ucx/ucx_operations.hpp net/ucx/ucx_operations.cpp ) + if (CYLON_UCC) + set(UCC_CYLON_FILES + net/ucc/ucc_operations.hpp + net/ucc/ucc_operations.cpp) + else () + set(UCC_CYLON_FILES) + endif (CYLON_UCC) else (CYLON_UCX) set(UCX_CYLON_FILES) + set(UCC_CYLON_FILES) endif (CYLON_UCX) if (CYLON_GLOO) @@ -47,6 +55,7 @@ endif (CYLON_GLOO) add_library(cylon SHARED ${UCX_CYLON_FILES} + ${UCC_CYLON_FILES} ${CYLON_GLOO_FILES} arrow/arrow_all_to_all.cpp arrow/arrow_all_to_all.hpp @@ -116,6 +125,7 @@ add_library(cylon SHARED net/channel.hpp net/comm_operations.hpp net/comm_type.hpp + net/communicator.cpp net/communicator.hpp net/mpi/mpi_channel.cpp net/mpi/mpi_channel.hpp diff --git a/cpp/src/cylon/compute/scalar_aggregate.cpp b/cpp/src/cylon/compute/scalar_aggregate.cpp index 672447b07..d5ad01608 100644 --- a/cpp/src/cylon/compute/scalar_aggregate.cpp +++ b/cpp/src/cylon/compute/scalar_aggregate.cpp @@ -267,12 +267,14 @@ struct VarianceKernelImpl : public ScalarAggregateKernel { arrow::MemoryPool *pool_ = nullptr; }; -bool is_all_valid(const std::shared_ptr &comm, - const std::shared_ptr &values) { +Status is_all_valid(const std::shared_ptr &comm, + const std::shared_ptr &values, + bool *res) { const auto &null_count = Scalar::Make(std::make_shared(values->null_count())); std::shared_ptr out; - const auto &status = comm->AllReduce(null_count, net::SUM, &out); - return status.is_ok() && std::static_pointer_cast(out->data())->value == 0; + RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(null_count, net::SUM, &out)); + *res = std::static_pointer_cast(out->data())->value == 0; + return Status::OK(); } Status ScalarAggregate(const std::shared_ptr &ctx, @@ -292,7 +294,9 @@ Status ScalarAggregate(const std::shared_ptr &ctx, if (ctx->GetWorldSize() > 1) { const auto &comm = ctx->GetCommunicator(); - if (!is_all_valid(comm, combined_results)) { + bool all_valid; + RETURN_CYLON_STATUS_IF_FAILED(is_all_valid(comm, combined_results, &all_valid)); + if (!all_valid) { // combined_results array has nulls. So, return Null scalar *result = arrow::MakeNullScalar(values->type()); return Status::OK(); @@ -422,7 +426,9 @@ Status ScalarAggregate(const std::shared_ptr &ctx, // all reduce combined_results const auto &comm = ctx->GetCommunicator(); - if (!is_all_valid(comm, combined_results)) { + bool all_valid; + RETURN_CYLON_STATUS_IF_FAILED(is_all_valid(comm, combined_results, &all_valid)); + if (!all_valid) { // combined_results array has nulls. So, return Null scalar CYLON_ASSIGN_OR_RAISE(*result, arrow::MakeArrayOfNull(promoted_type, table_->num_columns(), pool)) diff --git a/cpp/src/cylon/ctx/cylon_context.cpp b/cpp/src/cylon/ctx/cylon_context.cpp index a76a7e444..963a70040 100644 --- a/cpp/src/cylon/ctx/cylon_context.cpp +++ b/cpp/src/cylon/ctx/cylon_context.cpp @@ -26,7 +26,7 @@ #endif #ifdef BUILD_CYLON_GLOO -#include +#include "cylon/net/gloo/gloo_communicator.hpp" #endif namespace cylon { @@ -38,28 +38,6 @@ CylonContext::CylonContext(bool distributed) { this->is_distributed = distributed; } -std::shared_ptr CylonContext::InitDistributed(const std::shared_ptr &config) { - if (config->Type() == net::CommType::MPI) { - auto ctx = std::make_shared(true); - ctx->communicator = std::make_shared(&ctx); - ctx->communicator->Init(config); - return ctx; - } - -#ifdef BUILD_CYLON_UCX - else if (config->Type() == net::CommType::UCX) { - auto ctx = std::make_shared(true); - ctx->communicator = std::make_shared(&ctx); - ctx->communicator->Init(config); - return ctx; - } -#endif - else { - throw "Unsupported communication type"; - } - return nullptr; -} - Status CylonContext::InitDistributed(const std::shared_ptr &config, std::shared_ptr *ctx) { switch (config->Type()) { @@ -67,26 +45,31 @@ Status CylonContext::InitDistributed(const std::shared_ptr(true); - (*ctx)->communicator = std::make_shared(ctx); - return (*ctx)->communicator->Init(config); + auto pool = (*ctx)->GetMemoryPool(); + return net::MPICommunicator::Make(config, pool, &(*ctx)->communicator); } case net::UCX: { #ifdef BUILD_CYLON_UCX *ctx = std::make_shared(true); - (*ctx)->communicator = std::make_shared(ctx); - return (*ctx)->communicator->Init(config); + auto pool = (*ctx)->GetMemoryPool(); +#ifdef BUILD_CYLON_UCC + return net::UCXUCCCommunicator::Make(config, pool, &(*ctx)->communicator); +#else + return net::UCXCommunicator::Make(config, pool, &(*ctx)->communicator); +#endif #else return {Code::NotImplemented, "UCX communication not implemented"}; #endif } case net::TCP:return {Code::NotImplemented, "TCP communication not implemented"}; + case net::GLOO: { #ifdef BUILD_CYLON_GLOO *ctx = std::make_shared(true); - (*ctx)->communicator = std::make_shared(ctx); - return (*ctx)->communicator->Init(config); + auto pool = (*ctx)->GetMemoryPool(); + return net::GlooCommunicator::Make(config, pool, &(*ctx)->communicator); #else return {Code::NotImplemented, "Gloo communication not implemented"}; #endif diff --git a/cpp/src/cylon/ctx/cylon_context.hpp b/cpp/src/cylon/ctx/cylon_context.hpp index 24a28b21c..ccc921274 100644 --- a/cpp/src/cylon/ctx/cylon_context.hpp +++ b/cpp/src/cylon/ctx/cylon_context.hpp @@ -54,7 +54,6 @@ class CylonContext { * @param config Configuration to be passed on to the cylon::net::Communicator * @return */ - static std::shared_ptr InitDistributed(const std::shared_ptr &config); static Status InitDistributed(const std::shared_ptr &config, std::shared_ptr *ctx); diff --git a/cpp/src/cylon/net/CMakeLists.txt b/cpp/src/cylon/net/CMakeLists.txt index ade6f06f1..f6e6c76e5 100644 --- a/cpp/src/cylon/net/CMakeLists.txt +++ b/cpp/src/cylon/net/CMakeLists.txt @@ -17,4 +17,5 @@ cylon_install_all_headers("cylon/net") add_subdirectory(ops) add_subdirectory(mpi) add_subdirectory(ucx) -add_subdirectory(gloo) \ No newline at end of file +add_subdirectory(gloo) +add_subdirectory(ucc) \ No newline at end of file diff --git a/cpp/src/cylon/net/communicator.cpp b/cpp/src/cylon/net/communicator.cpp new file mode 100644 index 000000000..e69068b6a --- /dev/null +++ b/cpp/src/cylon/net/communicator.cpp @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cylon/net/communicator.hpp" + +namespace cylon { +namespace net { + +MemoryPool *Communicator::GetMemoryPool() const { + return this->pool; +} + +int Communicator::GetRank() const { + return this->rank; +} + +int Communicator::GetWorldSize() const { + return this->world_size; +} + +bool Communicator::IsFinalized() const { + return this->finalized; +} + +} +} \ No newline at end of file diff --git a/cpp/src/cylon/net/communicator.hpp b/cpp/src/cylon/net/communicator.hpp index 6e685e375..db1883800 100644 --- a/cpp/src/cylon/net/communicator.hpp +++ b/cpp/src/cylon/net/communicator.hpp @@ -15,6 +15,7 @@ #ifndef CYLON_SRC_CYLON_COMM_COMMUNICATOR_H_ #define CYLON_SRC_CYLON_COMM_COMMUNICATOR_H_ +#include "cylon/ctx/memory_pool.hpp" #include "cylon/net/comm_config.hpp" #include "cylon/net/channel.hpp" #include "cylon/net/comm_operations.hpp" @@ -29,16 +30,23 @@ namespace net { class Communicator { public: - explicit Communicator(const std::shared_ptr *ctx_ptr) : ctx_ptr(ctx_ptr) {} + Communicator(MemoryPool *pool, int32_t rank, int32_t world_size) + : rank(rank), world_size(world_size), pool(pool) {} + virtual ~Communicator() = default; - virtual Status Init(const std::shared_ptr &config) = 0; + virtual int GetRank() const; + virtual int GetWorldSize() const; + virtual MemoryPool* GetMemoryPool() const; + bool IsFinalized() const; + + virtual CommType GetCommType() const = 0; + virtual std::unique_ptr CreateChannel() const = 0; - virtual int GetRank() const = 0; - virtual int GetWorldSize() const = 0; + virtual void Finalize() = 0; + virtual void Barrier() = 0; - virtual CommType GetCommType() const = 0; virtual Status AllGather(const std::shared_ptr &table, std::vector> *out) const = 0; @@ -48,7 +56,17 @@ class Communicator { bool gather_from_root, std::vector> *out) const = 0; - virtual Status Bcast(std::shared_ptr
*table, int bcast_root) const = 0; + /** + * Broadcasts `table` in `bcast_root` rank to every other rank. + * @param table Input could be NULL in non-root ranks. Those ranks would have the + * broadcast result in this shared_ptr + * @param bcast_root + * @param ctx CylonContext is required to instantiate tables in non-root ranks + * @return + */ + virtual Status Bcast(std::shared_ptr
*table, + int bcast_root, + const std::shared_ptr &ctx) const = 0; /* Array communications */ @@ -86,8 +104,8 @@ class Communicator { protected: int rank = -1; int world_size = -1; - // keeping a ptr to the CylonContext shared_ptr - const std::shared_ptr *ctx_ptr; + MemoryPool *pool; + bool finalized = false; }; } } diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.cpp b/cpp/src/cylon/net/gloo/gloo_communicator.cpp index 38fcad2fc..7af55b28f 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.cpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.cpp @@ -30,7 +30,9 @@ namespace cylon { namespace net { -Status GlooCommunicator::Init(const std::shared_ptr &config) { +Status GlooCommunicator::Make(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out) { const auto &gloo_config = std::static_pointer_cast(config); gloo::transport::tcp::attr attr; @@ -39,43 +41,41 @@ Status GlooCommunicator::Init(const std::shared_ptr &config) { attr.ai_family = gloo_config->tcp_ai_family_; // create device - dev_ = gloo::transport::tcp::CreateDevice(attr); + auto dev = gloo::transport::tcp::CreateDevice(attr); + std::shared_ptr gloo_ctx; if (gloo_config->use_mpi_) { #ifdef GLOO_USE_MPI int res; RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Initialized(&res)); if (res) { if (gloo_config->mpi_comm_ == MPI_COMM_NULL) { - gloo_ctx_ = std::make_shared(MPI_COMM_WORLD); + gloo_ctx = std::make_shared(MPI_COMM_WORLD); } else { - gloo_ctx_ = std::make_shared(gloo_config->mpi_comm_); + gloo_ctx = std::make_shared(gloo_config->mpi_comm_); } } else { // MPI is not initialized. Ask gloo to initialize MPI - gloo_ctx_ = gloo::mpi::Context::createManaged(); + gloo_ctx = gloo::mpi::Context::createManaged(); } - ((gloo::mpi::Context &) *gloo_ctx_).connectFullMesh(dev_); - - // update rank and world size - rank = gloo_ctx_->rank; - world_size = gloo_ctx_->size; + ((gloo::mpi::Context &) *gloo_ctx).connectFullMesh(dev); #else return {Code::Invalid, "Gloo does not contain mpi headers!"}; #endif // GLOO_USE_MPI } else { - rank = gloo_config->rank_; - world_size = gloo_config->world_size_; - // store and prefix store - store_ = std::make_shared(gloo_config->file_store_path_); - prefix_store_ = std::make_shared(gloo_config->store_prefix_, - *store_); + auto store = std::make_shared(gloo_config->file_store_path_); + auto prefix_store = std::make_shared(gloo_config->store_prefix_, + *store); + + gloo_ctx = std::make_shared(gloo_config->rank_, + gloo_config->world_size_); + ((gloo::rendezvous::Context &) *gloo_ctx).connectFullMesh(*prefix_store, dev); - gloo_ctx_ = std::make_shared(rank, world_size); - ((gloo::rendezvous::Context &) *gloo_ctx_).connectFullMesh(*prefix_store_, dev_); } + *out = std::make_shared(pool, std::move(gloo_ctx)); return Status::OK(); } + std::unique_ptr GlooCommunicator::CreateChannel() const { return std::make_unique(gloo_ctx_.get()); } @@ -113,37 +113,40 @@ Status GlooCommunicator::Gather(const std::shared_ptr
&table, return impl.Execute(table, gather_root, gather_from_root, out); } -Status GlooCommunicator::Bcast(std::shared_ptr
*table, int bcast_root) const { +Status GlooCommunicator::Bcast(std::shared_ptr
*table, + int bcast_root, + const std::shared_ptr &ctx) const { GlooTableBcastImpl impl(&gloo_ctx_); - return impl.Execute(table, bcast_root, *ctx_ptr); + return impl.Execute(table, bcast_root, ctx); } Status GlooCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, std::shared_ptr *output) const { GlooAllReduceImpl impl(&gloo_ctx_); - return impl.Execute(values, reduce_op, output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(values, reduce_op, output, pool); } Status GlooCommunicator::AllReduce(const std::shared_ptr &value, net::ReduceOp reduce_op, std::shared_ptr *output) const { GlooAllReduceImpl impl(&gloo_ctx_); - return impl.Execute(value, reduce_op, output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(value, reduce_op, output, pool); } -GlooCommunicator::GlooCommunicator(const std::shared_ptr *ctx_ptr) - : Communicator(ctx_ptr) {} +GlooCommunicator::GlooCommunicator(MemoryPool *pool, + std::shared_ptr gloo_ctx) + : Communicator(pool, gloo_ctx->rank, gloo_ctx->size), gloo_ctx_(std::move(gloo_ctx)) {} Status GlooCommunicator::Allgather(const std::shared_ptr &values, std::vector> *output) const { GlooAllgatherImpl impl(&gloo_ctx_); - return impl.Execute(values, gloo_ctx_->size, output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(values, gloo_ctx_->size, output, pool); } Status GlooCommunicator::Allgather(const std::shared_ptr &value, std::shared_ptr *output) const { GlooAllgatherImpl impl(&gloo_ctx_); - return impl.Execute(value, gloo_ctx_->size, output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(value, gloo_ctx_->size, output, pool); } CommType GlooConfig::Type() { return GLOO; } diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index a91cfbea7..1f5b9a825 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -85,8 +85,7 @@ class GlooConfig : public CommConfig { class GlooCommunicator : public Communicator { public: - explicit GlooCommunicator(const std::shared_ptr *ctx_ptr); - Status Init(const std::shared_ptr &config) override; + GlooCommunicator(MemoryPool *pool, std::shared_ptr gloo_ctx); std::unique_ptr CreateChannel() const override; int GetRank() const override; int GetWorldSize() const override; @@ -99,7 +98,9 @@ class GlooCommunicator : public Communicator { int gather_root, bool gather_from_root, std::vector> *out) const override; - Status Bcast(std::shared_ptr
*table, int bcast_root) const override; + Status Bcast(std::shared_ptr
*table, + int bcast_root, + const std::shared_ptr &ctx) const override; Status AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, std::shared_ptr *output) const override; @@ -111,11 +112,11 @@ class GlooCommunicator : public Communicator { Status Allgather(const std::shared_ptr &values, std::vector> *output) const override; + static Status Make(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out); + private: std::shared_ptr gloo_ctx_ = nullptr; - std::shared_ptr dev_ = nullptr; - std::shared_ptr store_ = nullptr; - std::shared_ptr prefix_store_ = nullptr; }; } diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.cpp b/cpp/src/cylon/net/mpi/mpi_communicator.cpp index fdfbad6de..ba2b5768e 100644 --- a/cpp/src/cylon/net/mpi/mpi_communicator.cpp +++ b/cpp/src/cylon/net/mpi/mpi_communicator.cpp @@ -53,40 +53,45 @@ int MPICommunicator::GetRank() const { int MPICommunicator::GetWorldSize() const { return this->world_size; } -Status MPICommunicator::Init(const std::shared_ptr &config) { + +Status MPICommunicator::Make(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out) { + int ext_init, rank, world_size; // check if MPI is initialized - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Initialized(&mpi_initialized_externally)); - mpi_comm_ = std::static_pointer_cast(config)->GetMPIComm(); + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Initialized(&ext_init)); + auto mpi_comm = std::static_pointer_cast(config)->GetMPIComm(); - if (mpi_comm_ != MPI_COMM_NULL && !mpi_initialized_externally) { + if (mpi_comm != MPI_COMM_NULL && !ext_init) { return {Code::Invalid, "non-null MPI_Comm passed without initializing MPI"}; } - if (!mpi_initialized_externally) { // if not initialized, init MPI + if (!ext_init) { // if not initialized, init MPI RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); } - if (mpi_comm_ == MPI_COMM_NULL) { // set comm_ to world - mpi_comm_ = MPI_COMM_WORLD; + if (mpi_comm == MPI_COMM_NULL) { // set comm_ to world + mpi_comm = MPI_COMM_WORLD; } // setting errors to return - MPI_Comm_set_errhandler(mpi_comm_, MPI_ERRORS_RETURN); + MPI_Comm_set_errhandler(mpi_comm, MPI_ERRORS_RETURN); - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Comm_rank(mpi_comm_, &this->rank)); - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Comm_size(mpi_comm_, &this->world_size)); + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Comm_rank(mpi_comm, &rank)); + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Comm_size(mpi_comm, &world_size)); if (rank < 0 || world_size < 0 || rank >= world_size) { return {Code::ExecutionError, "Malformed rank :" + std::to_string(rank) + " or world size:" + std::to_string(world_size)}; } + *out = std::make_shared(pool, rank, world_size, mpi_comm, (bool) ext_init); return Status::OK(); } void MPICommunicator::Finalize() { // finalize only if we initialized MPI - if (!mpi_initialized_externally) { + if (!externally_init) { int finalized; MPI_Finalized(&finalized); if (!finalized) { @@ -116,9 +121,11 @@ Status MPICommunicator::Gather(const std::shared_ptr
&table, return impl.Execute(table, gather_root, gather_from_root, out); } -Status MPICommunicator::Bcast(std::shared_ptr
*table, int bcast_root) const { +Status MPICommunicator::Bcast(std::shared_ptr
*table, + int bcast_root, + const std::shared_ptr &ctx) const { mpi::MpiTableBcastImpl impl(mpi_comm_); - return impl.Execute(table, bcast_root, *ctx_ptr); + return impl.Execute(table, bcast_root, ctx); } MPI_Comm MPICommunicator::mpi_comm() const { @@ -129,28 +136,32 @@ Status MPICommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, std::shared_ptr *output) const { mpi::MpiAllReduceImpl impl(mpi_comm_); - return impl.Execute(values, reduce_op, output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(values, reduce_op, output, pool); } Status MPICommunicator::AllReduce(const std::shared_ptr &value, net::ReduceOp reduce_op, std::shared_ptr *output) const { mpi::MpiAllReduceImpl impl(mpi_comm_); - return impl.Execute(value, reduce_op, output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(value, reduce_op, output, pool); } -MPICommunicator::MPICommunicator(const std::shared_ptr *ctx_ptr) - : Communicator(ctx_ptr) {} +MPICommunicator::MPICommunicator(MemoryPool *pool, + int32_t rank, + int32_t world_size, + MPI_Comm mpi_comm, + bool externally_init) + : Communicator(pool, rank, world_size), mpi_comm_(mpi_comm), externally_init(externally_init) {} Status MPICommunicator::Allgather(const std::shared_ptr &values, std::vector> *output) const { mpi::MpiAllgatherImpl impl(mpi_comm_); - return impl.Execute(values, (*ctx_ptr)->GetWorldSize(), output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(values, world_size, output, pool); } Status MPICommunicator::Allgather(const std::shared_ptr &value, std::shared_ptr *output) const { mpi::MpiAllgatherImpl impl(mpi_comm_); - return impl.Execute(value, (*ctx_ptr)->GetWorldSize(), output, (*ctx_ptr)->GetMemoryPool()); + return impl.Execute(value, world_size, output, pool); } } // namespace net diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.hpp b/cpp/src/cylon/net/mpi/mpi_communicator.hpp index 46e2aa533..aea07fa9e 100644 --- a/cpp/src/cylon/net/mpi/mpi_communicator.hpp +++ b/cpp/src/cylon/net/mpi/mpi_communicator.hpp @@ -41,9 +41,8 @@ class MPIConfig : public CommConfig { class MPICommunicator : public Communicator { public: - explicit MPICommunicator(const std::shared_ptr *ctx_ptr); + MPICommunicator(MemoryPool *pool, int32_t rank, int32_t world_size, MPI_Comm mpi_comm, bool externally_init); ~MPICommunicator() override = default; - Status Init(const std::shared_ptr &config) override; std::unique_ptr CreateChannel() const override; int GetRank() const override; int GetWorldSize() const override; @@ -57,7 +56,9 @@ class MPICommunicator : public Communicator { Status Gather(const std::shared_ptr
&table, int gather_root, bool gather_from_root, std::vector> *out) const override; - Status Bcast(std::shared_ptr
*table, int bcast_root) const override; + Status Bcast(std::shared_ptr
*table, + int bcast_root, + const std::shared_ptr &ctx) const override; Status AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -73,9 +74,12 @@ class MPICommunicator : public Communicator { MPI_Comm mpi_comm() const; + static Status Make(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out); + private: MPI_Comm mpi_comm_ = MPI_COMM_NULL; - int mpi_initialized_externally = 0; + bool externally_init = false; }; } diff --git a/cpp/src/cylon/net/ops/base_ops.cpp b/cpp/src/cylon/net/ops/base_ops.cpp index 5e5539f1d..c70c84743 100644 --- a/cpp/src/cylon/net/ops/base_ops.cpp +++ b/cpp/src/cylon/net/ops/base_ops.cpp @@ -51,10 +51,11 @@ Status TableAllgatherImpl::Execute(const std::shared_ptr &seria displacements->reserve(num_buffers); received_buffers->reserve(num_buffers); + std::vector> all_recv_counts(num_buffers); for (int32_t i = 0; i < num_buffers; ++i) { std::shared_ptr receive_buf; RETURN_CYLON_STATUS_IF_FAILED(allocator->Allocate(total_buffer_sizes[i], &receive_buf)); - const auto &receive_counts = receiveCounts(*all_buffer_sizes, i, num_buffers, + all_recv_counts[i] = receiveCounts(*all_buffer_sizes, i, num_buffers, world_size); auto disp_per_buffer = displacementsPerBuffer(*all_buffer_sizes, i, num_buffers, world_size); @@ -63,7 +64,7 @@ Status TableAllgatherImpl::Execute(const std::shared_ptr &seria send_buffers[i], local_buffer_sizes[i], receive_buf->GetByteBuffer(), - receive_counts, + all_recv_counts[i], disp_per_buffer)); displacements->push_back(std::move(disp_per_buffer)); received_buffers->push_back(std::move(receive_buf)); @@ -146,11 +147,12 @@ Status TableGatherImpl::Execute(const std::shared_ptr &s displacements->reserve(num_buffers); received_buffers->reserve(num_buffers); + std::vector> all_recv_counts(num_buffers); for (int32_t i = 0; i < num_buffers; ++i) { if (is_root) { std::shared_ptr receive_buf; RETURN_CYLON_STATUS_IF_FAILED(allocator->Allocate(total_buffer_sizes[i], &receive_buf)); - const auto &receive_counts = receiveCounts(*all_buffer_sizes, i, + all_recv_counts[i] = receiveCounts(*all_buffer_sizes, i, num_buffers, world_size); auto disp_per_buffer = displacementsPerBuffer(*all_buffer_sizes, i, num_buffers, world_size); @@ -159,7 +161,7 @@ Status TableGatherImpl::Execute(const std::shared_ptr &s send_buffers[i], local_buffer_sizes[i], receive_buf->GetByteBuffer(), - receive_counts, + all_recv_counts[i], disp_per_buffer, gather_root)); displacements->push_back(std::move(disp_per_buffer)); @@ -429,19 +431,17 @@ Status AllGatherImpl::Execute(const std::shared_ptr &values, std::array, 3> displacements{}; std::array, 3> received_bufs{}; + std::vector> all_recv_counts(3); for (int i = 0; i < 3; i++) { RETURN_CYLON_STATUS_IF_FAILED(allocator.Allocate(total_buf_sizes[i], &received_bufs[i])); - const auto &receive_counts = receiveCounts(all_buf_sizes, i, 3, world_size); + all_recv_counts[i] = receiveCounts(all_buf_sizes, i, 3, world_size); displacements[i].resize(world_size); - prefix_sum(receive_counts, &displacements[i]); + prefix_sum(all_recv_counts[i], &displacements[i]); - RETURN_CYLON_STATUS_IF_FAILED(IallgatherBufferData(i, - buffers[i], - buf_sizes[i], - received_bufs[i]->GetByteBuffer(), - receive_counts, - displacements[i])); + RETURN_CYLON_STATUS_IF_FAILED(IallgatherBufferData( + i, buffers[i], buf_sizes[i], received_bufs[i]->GetByteBuffer(), + all_recv_counts[i], displacements[i])); } WaitAll(); diff --git a/cpp/src/cylon/net/ucc/CMakeLists.txt b/cpp/src/cylon/net/ucc/CMakeLists.txt new file mode 100644 index 000000000..63b5da43b --- /dev/null +++ b/cpp/src/cylon/net/ucc/CMakeLists.txt @@ -0,0 +1,15 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +cylon_install_all_headers("cylon/net/ucc") \ No newline at end of file diff --git a/cpp/src/cylon/net/ucc/ucc_operations.cpp b/cpp/src/cylon/net/ucc/ucc_operations.cpp new file mode 100644 index 000000000..d29c8c7dd --- /dev/null +++ b/cpp/src/cylon/net/ucc/ucc_operations.cpp @@ -0,0 +1,501 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cylon/net/ucc/ucc_operations.hpp" +#include "cylon/util/macros.hpp" + +namespace cylon { +namespace ucc { + +Status UccTableAllgatherImpl::AllgatherBufferSizes(const int32_t *send_data, + int num_buffers, + int32_t *rcv_data) const { + ucc_coll_args_t args; + ucc_coll_req_h req; + + args.mask = 0; + args.coll_type = UCC_COLL_TYPE_ALLGATHER; + + args.src.info.buffer = const_cast(send_data); + args.src.info.count = static_cast(num_buffers); + args.src.info.datatype = UCC_DT_INT32; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + args.dst.info.buffer = rcv_data; + args.dst.info.count = num_buffers * world_size; + args.dst.info.datatype = UCC_DT_INT32; + args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_init(&args, &req, ucc_team_)); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(req)); + + ucc_status_t status; + + while (UCC_OK != (status = ucc_collective_test(req))) { + RETURN_CYLON_STATUS_IF_UCC_FAILED(status); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(ucc_context_)); + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + + return Status::OK(); +} + +Status UccTableAllgatherImpl::IallgatherBufferData( + int buf_idx, const uint8_t *send_data, int32_t send_count, + uint8_t *recv_data, const std::vector &recv_count, + const std::vector &displacements) { + ucc_coll_args_t &args = args_[buf_idx]; + + args.mask = 0; + args.coll_type = UCC_COLL_TYPE_ALLGATHERV; + + args.src.info.buffer = const_cast(send_data); + args.src.info.count = static_cast(send_count); + args.src.info.datatype = UCC_DT_UINT8; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + args.dst.info_v.buffer = recv_data; + args.dst.info_v.counts = (ucc_count_t *) recv_count.data(); + args.dst.info_v.displacements = (ucc_aint_t *) displacements.data(); + args.dst.info_v.datatype = UCC_DT_UINT8; + args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&args, &requests_[buf_idx], ucc_team_)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(requests_[buf_idx])); + return Status::OK(); +} + +Status WaitAllHelper(std::vector &reqs, ucc_context_h &ctx) { + bool alldone = false; + ucc_status_t status; + while (!alldone) { + alldone = true; + for (auto &r: reqs) { + // at every iteration progress the ctx + ucc_context_progress(ctx); + + status = ucc_collective_test(r); + // if an error occurs or the operation is not posted yet, return an error + if (status < 0 || UCC_OPERATION_INITIALIZED == status) { + return {Code::ExecutionError, "UCC Failed: " + std::string(ucc_status_string(status))}; + } + + // now status can be OK or IN PROGRESS + alldone &= (UCC_INPROGRESS != status); + } + } + + // all done, finalize requests now + for (auto &r: reqs) { + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(r)); + } + return Status::OK(); +} + +Status UccTableAllgatherImpl::WaitAll(int num_buffers) { + CYLON_UNUSED(num_buffers); + return WaitAllHelper(requests_, ucc_context_); +} + +UccTableAllgatherImpl::UccTableAllgatherImpl(ucc_team_h ucc_team, + ucc_context_h ucc_context, + int ws) + : TableAllgatherImpl(), + ucc_team_(ucc_team), + ucc_context_(ucc_context), + requests_({}), + args_({}), + world_size(ws){} + +void UccTableAllgatherImpl::Init(int num_buffers) { + requests_.resize(num_buffers); + args_.resize(num_buffers); +} + +ucc_datatype_t GetUccDataType(const std::shared_ptr &data_type) { + switch (data_type->getType()) { + case Type::BOOL: + + break; + case Type::UINT8: + return UCC_DT_UINT8; + case Type::INT8: + return UCC_DT_INT8; + case Type::UINT16: + return UCC_DT_UINT16; + case Type::INT16: + return UCC_DT_INT16; + case Type::UINT32: + return UCC_DT_UINT32; + case Type::INT32: + return UCC_DT_INT32; + case Type::UINT64: + return UCC_DT_UINT64; + case Type::INT64: + return UCC_DT_INT64; + case Type::FLOAT: + return UCC_DT_FLOAT32; + case Type::DOUBLE: + return UCC_DT_FLOAT64; + case Type::FIXED_SIZE_BINARY: + case Type::STRING: + case Type::BINARY: + case Type::LARGE_STRING: + case Type::LARGE_BINARY: + return UCC_DT_UINT8; + // todo: MPI does not support 16byte floats. We'll have to use a custom + // datatype for this later. + case Type::HALF_FLOAT: + return UCC_DT_FLOAT16; + case Type::DATE32: + return UCC_DT_UINT32; + case Type::DATE64: + case Type::TIMESTAMP: + return UCC_DT_UINT64; + case Type::TIME32: + return UCC_DT_UINT32; + case Type::TIME64: + return UCC_DT_UINT64; + case Type::DECIMAL: + case Type::DURATION: + case Type::INTERVAL: + case Type::LIST: + case Type::FIXED_SIZE_LIST: + case Type::EXTENSION: + case Type::MAX_ID: + break; + } + return UCC_DT_PREDEFINED_LAST; +} + +ucc_reduction_op_t GetUccOp(cylon::net::ReduceOp reduce_op) { + switch (reduce_op) { + case net::SUM: + return UCC_OP_SUM; + case net::MIN: + return UCC_OP_MIN; + case net::MAX: + return UCC_OP_MAX; + case net::PROD: + return UCC_OP_PROD; + case net::LAND: + return UCC_OP_LAND; + case net::LOR: + return UCC_OP_LOR; + case net::BAND: + return UCC_OP_BAND; + case net::BOR: + return UCC_OP_BOR; + default: + return UCC_OP_LAST; + } +} + +UccAllReduceImpl::UccAllReduceImpl(ucc_team_h ucc_team, ucc_context_h ucc_context) + : ucc_team_(ucc_team), ucc_context_(ucc_context) {} + +Status UccAllReduceImpl::AllReduceBuffer(const void *send_buf, void *rcv_buf, + int count, + const std::shared_ptr &data_type, + cylon::net::ReduceOp reduce_op) const { + auto dt = GetUccDataType(data_type); + auto op = GetUccOp(reduce_op); + + if (dt == UCC_DT_PREDEFINED_LAST || op == UCC_OP_LAST) { + return {Code::NotImplemented, "ucc allreduce not implemented for type or operation"}; + } + + ucc_coll_req_h req; + ucc_coll_args_t args; + + args.mask = 0; + args.coll_type = UCC_COLL_TYPE_ALLREDUCE; + args.src.info.buffer = const_cast(send_buf); + args.src.info.count = count; + args.src.info.datatype = dt; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + args.dst.info.buffer = rcv_buf; + args.dst.info.count = count; + args.dst.info.datatype = dt; + args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; + args.op = op; + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_init(&args, &req, ucc_team_)); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(req)); + + while (UCC_INPROGRESS == ucc_collective_test(req)) { + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(ucc_context_)); + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + return Status::OK(); +} + +UccTableGatherImpl::UccTableGatherImpl(ucc_team_h ucc_team, + ucc_context_h ucc_context, int rk, int ws) + : ucc_team_(ucc_team), ucc_context_(ucc_context), world_size(ws), rank(rk) {} + +void UccTableGatherImpl::Init(int32_t num_buffers) { + this->requests_.resize(num_buffers); + this->args_.resize(num_buffers); +} + +Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t num_buffers, + int32_t *rcv_data, int32_t gather_root) const { + ucc_coll_args_t args; + ucc_coll_req_h req; + + args.mask = 0; + args.coll_type = UCC_COLL_TYPE_GATHER; + args.root = gather_root; + + args.src.info.buffer = const_cast(send_data); + args.src.info.count = static_cast(num_buffers); + args.src.info.datatype = UCC_DT_INT32; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + if(rank == gather_root) { + args.dst.info.buffer = rcv_data; + args.dst.info.count = num_buffers * world_size; + args.dst.info.datatype = UCC_DT_INT32; + args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&args, &req, ucc_team_)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(req)); + + ucc_status_t status; + + while (UCC_OK != (status = ucc_collective_test(req))) { + RETURN_CYLON_STATUS_IF_UCC_FAILED(status); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(ucc_context_)); + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + return Status::OK(); +} + +Status UccTableGatherImpl::IgatherBufferData( + int32_t buf_idx, const uint8_t *send_data, int32_t send_count, + uint8_t *recv_data, const std::vector &recv_count, + const std::vector &displacements, int32_t gather_root) { + ucc_coll_args_t &args = args_[buf_idx]; + + args.mask = 0; + args.coll_type = UCC_COLL_TYPE_GATHERV; + args.root = gather_root; + + args.src.info.buffer = const_cast(send_data); + args.src.info.count = static_cast(send_count); + args.src.info.datatype = UCC_DT_UINT8; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + if(rank == gather_root) { + args.dst.info_v.buffer = recv_data; + + args.dst.info_v.counts = (ucc_count_t *)recv_count.data(); + args.dst.info_v.displacements = + (ucc_aint_t *)displacements.data(); + args.dst.info_v.datatype = UCC_DT_UINT8; + args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&args, &requests_[buf_idx], ucc_team_)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(requests_[buf_idx])); + return Status::OK(); +} + +Status UccTableGatherImpl::WaitAll(int32_t num_buffers) { + CYLON_UNUSED(num_buffers); + return WaitAllHelper(requests_, ucc_context_); +} + +UccTableBcastImpl::UccTableBcastImpl(ucc_team_h ucc_team, ucc_context_h ucc_context) + : ucc_team_(ucc_team), ucc_context_(ucc_context) {} + +void UccTableBcastImpl::Init(int32_t num_buffers) { + reqs.resize(num_buffers); + args.resize(num_buffers); +} + +Status UccTableBcastImpl::BcastBufferSizes(int32_t *buffer, int32_t count, + int32_t bcast_root) const { + ucc_coll_args_t args_; + ucc_coll_req_h req; + + args_.mask = 0; + args_.coll_type = UCC_COLL_TYPE_BCAST; + args_.root = bcast_root; + + args_.src.info.buffer = buffer; + args_.src.info.count = count; + args_.src.info.datatype = UCC_DT_INT32; + args_.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&args_, &req, ucc_team_)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(req)); + + ucc_status_t status; + + while (UCC_OK != (status = ucc_collective_test(req))) { + RETURN_CYLON_STATUS_IF_UCC_FAILED(status); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(ucc_context_)); + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + + return Status::OK(); +} + +Status UccTableBcastImpl::BcastBufferData(uint8_t *buf_data, int32_t send_count, + int32_t bcast_root) const { + ucc_coll_args_t args_; + ucc_coll_req_h req; + + args_.mask = 0; + args_.coll_type = UCC_COLL_TYPE_BCAST; + args_.root = bcast_root; + + args_.src.info.buffer = buf_data; + args_.src.info.count = send_count; + args_.src.info.datatype = UCC_DT_UINT8; + args_.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&args_, &req, ucc_team_)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(req)); + + ucc_status_t status; + + while (UCC_OK != (status = ucc_collective_test(req))) { + RETURN_CYLON_STATUS_IF_UCC_FAILED(status); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(ucc_context_)); + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + + return Status::OK(); +} + +Status UccTableBcastImpl::IbcastBufferData(int32_t buf_idx, uint8_t *buf_data, + int32_t send_count, + int32_t bcast_root) { + ucc_coll_args_t& arg = args[buf_idx]; + ucc_coll_req_h& req = reqs[buf_idx]; + + arg.mask = 0; + arg.coll_type = UCC_COLL_TYPE_BCAST; + arg.root = bcast_root; + + arg.src.info.buffer = buf_data; + arg.src.info.count = send_count; + arg.src.info.datatype = UCC_DT_UINT8; + arg.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&arg, &req, ucc_team_)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(req)); + + return Status::OK(); +} + +Status UccTableBcastImpl::WaitAll(int32_t num_buffers) { + CYLON_UNUSED(num_buffers); + return WaitAllHelper(reqs, ucc_context_); +} + +UccAllGatherImpl::UccAllGatherImpl(ucc_team_h ucc_team, + ucc_context_h ucc_context, int ws) + : ucc_team_(ucc_team), ucc_context_(ucc_context), world_size(ws) {} + +Status UccAllGatherImpl::AllgatherBufferSize(const int32_t *send_data, + int32_t num_buffers, + int32_t *rcv_data) const { + ucc_coll_args_t args; + ucc_coll_req_h req; + + args.mask = 0; + args.coll_type = UCC_COLL_TYPE_ALLGATHER; + + args.src.info.buffer = const_cast(send_data); + args.src.info.count = static_cast(num_buffers); + args.src.info.datatype = UCC_DT_INT32; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + args.dst.info.buffer = rcv_data; + args.dst.info.count = num_buffers * world_size; + args.dst.info.datatype = UCC_DT_INT32; + args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&args, &req, ucc_team_)); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(req)); + + ucc_status_t status; + + while (UCC_OK != (status = ucc_collective_test(req))) { + RETURN_CYLON_STATUS_IF_UCC_FAILED(status); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(ucc_context_)); + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + + return Status::OK(); +} + +Status UccAllGatherImpl::IallgatherBufferData(int32_t buf_idx, const uint8_t *send_data, + int32_t send_count, uint8_t *recv_data, + const std::vector &recv_count, + const std::vector &displacements) { + requests_.resize(3); + args_.resize(3); + + ucc_coll_args_t &args = args_[buf_idx]; + + args.mask = 0; + args.coll_type = UCC_COLL_TYPE_ALLGATHERV; + + args.src.info.buffer = const_cast(send_data); + args.src.info.count = static_cast(send_count); + args.src.info.datatype = UCC_DT_UINT8; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + args.dst.info_v.buffer = recv_data; + args.dst.info_v.counts = (ucc_count_t *)recv_count.data(); + args.dst.info_v.displacements = (ucc_aint_t *)displacements.data(); + args.dst.info_v.datatype = UCC_DT_UINT8; + args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_collective_init(&args, &requests_[buf_idx], ucc_team_)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_post(requests_[buf_idx])); + return Status::OK(); +} + +Status UccAllGatherImpl::WaitAll() { + return WaitAllHelper(requests_, ucc_context_); +} + +} // namespace ucc +} // namespace cylon \ No newline at end of file diff --git a/cpp/src/cylon/net/ucc/ucc_operations.hpp b/cpp/src/cylon/net/ucc/ucc_operations.hpp new file mode 100644 index 000000000..3beeffd6d --- /dev/null +++ b/cpp/src/cylon/net/ucc/ucc_operations.hpp @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "cylon/util/macros.hpp" +#include "cylon/net/comm_operations.hpp" +#include "cylon/net/ops/base_ops.hpp" +#include "cylon/net/comm_operations.hpp" + +namespace cylon { +namespace ucc { + +class UccTableAllgatherImpl : public net::TableAllgatherImpl { + public: + UccTableAllgatherImpl(ucc_team_h ucc_team, ucc_context_h ucc_context, int world_size); + ~UccTableAllgatherImpl() override = default; + + void Init(int num_buffers) override; + + Status AllgatherBufferSizes(const int32_t *send_data, int num_buffers, + int32_t *rcv_data) const override; + + Status IallgatherBufferData( + int buf_idx, const uint8_t *send_data, int32_t send_count, + uint8_t *recv_data, const std::vector &recv_count, + const std::vector &displacements) override; + + Status WaitAll(int num_buffers) override; + + private: + ucc_team_h ucc_team_; + ucc_context_h ucc_context_; + std::vector requests_; + std::vector args_; + int world_size; +}; + +class UccTableGatherImpl : public net::TableGatherImpl { + public: + UccTableGatherImpl(ucc_team_h ucc_team, ucc_context_h ucc_context, int rank, + int world_size); + + ~UccTableGatherImpl() override = default; + + void Init(int32_t num_buffers) override; + + Status GatherBufferSizes(const int32_t *send_data, int32_t num_buffers, + int32_t *rcv_data, + int32_t gather_root) const override; + + Status IgatherBufferData(int32_t buf_idx, const uint8_t *send_data, + int32_t send_count, uint8_t *recv_data, + const std::vector &recv_count, + const std::vector &displacements, + int32_t gather_root) override; + + Status WaitAll(int32_t num_buffers) override; + + private: + std::vector requests_; + std::vector args_; + ucc_team_h ucc_team_; + ucc_context_h ucc_context_; + int world_size; + int rank; +}; + +class UccAllReduceImpl : public net::AllReduceImpl { +public: + UccAllReduceImpl(ucc_team_h ucc_team, ucc_context_h ucc_context); + ~UccAllReduceImpl() override = default; + + Status AllReduceBuffer(const void *send_buf, void *rcv_buf, int count, + const std::shared_ptr &data_type, + net::ReduceOp reduce_op) const override; + +private: + ucc_team_h ucc_team_; + ucc_context_h ucc_context_; +}; + +class UccTableBcastImpl : public net::TableBcastImpl { + public: + UccTableBcastImpl(ucc_team_h ucc_team, ucc_context_h ucc_context); + ~UccTableBcastImpl() override = default; + void Init(int32_t num_buffers) override; + Status BcastBufferSizes(int32_t *buffer, int32_t count, + int32_t bcast_root) const override; + Status BcastBufferData(uint8_t *buf_data, int32_t send_count, + int32_t bcast_root) const override; + Status IbcastBufferData(int32_t buf_idx, uint8_t *buf_data, + int32_t send_count, int32_t bcast_root) override; + Status WaitAll(int32_t num_buffers) override; + private: + ucc_team_h ucc_team_; + ucc_context_h ucc_context_; + std::vector reqs; + std::vector args; +}; + +class UccAllGatherImpl : public net::AllGatherImpl { + public: + UccAllGatherImpl(ucc_team_h ucc_team, ucc_context_h ucc_context, int world_size); + ~UccAllGatherImpl() override = default; + Status AllgatherBufferSize(const int32_t *send_data, int32_t num_buffers, + int32_t *rcv_data) const override; + Status IallgatherBufferData(int32_t buf_idx, const uint8_t *send_data, + int32_t send_count, uint8_t *recv_data, + const std::vector &recv_count, + const std::vector &displacements) override; + Status WaitAll() override; + private: + ucc_team_h ucc_team_; + ucc_context_h ucc_context_; + std::vector requests_; + std::vector args_; + int world_size; +}; + +} // namespace ucc +} // namespace cylon \ No newline at end of file diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 4cf714a69..964d76efb 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -14,18 +14,25 @@ #include #include -#include -#include -#include -#include + +#include "cylon/net/communicator.hpp" +#include "cylon/net/ucx/ucx_communicator.hpp" +#include "cylon/net/ucx/ucx_channel.hpp" +#include "cylon/util/macros.hpp" + +#ifdef BUILD_CYLON_UCC +#include "cylon/net/ucc/ucc_operations.hpp" +#endif namespace cylon { namespace net { -void UCXConfig::DummyConfig(int dummy) { - this->AddConfig("Dummy", &dummy); -} -int UCXConfig::GetDummyConfig() { - return *reinterpret_cast(this->GetConfig("Dummy")); + +void mpi_check_and_finalize() { + int mpi_finalized; + MPI_Finalized(&mpi_finalized); + if (!mpi_finalized) { + MPI_Finalize(); + } } CommType UCXConfig::Type() { @@ -46,8 +53,72 @@ int UCXCommunicator::GetRank() const { int UCXCommunicator::GetWorldSize() const { return this->world_size; } -Status UCXCommunicator::Init(const std::shared_ptr &config) { + +Status UCXCommunicator::AllGather(const std::shared_ptr
&table, + std::vector> *out) const { + CYLON_UNUSED(table); + CYLON_UNUSED(out); + return {Code::NotImplemented, "All gather not implemented for ucx"}; +} + +Status UCXCommunicator::Gather(const std::shared_ptr
&table, + int gather_root, + bool gather_from_root, + std::vector> *out) const { + CYLON_UNUSED(table); + CYLON_UNUSED(gather_root); + CYLON_UNUSED(gather_from_root); + CYLON_UNUSED(out); + return {Code::NotImplemented, "All gather not implemented for ucx"}; +} + +Status UCXCommunicator::Bcast(std::shared_ptr
*table, int bcast_root, + const std::shared_ptr &ctx) const { + CYLON_UNUSED(table); + CYLON_UNUSED(bcast_root); + CYLON_UNUSED(ctx); + return {Code::NotImplemented, "Bcast not implemented for ucx"}; +} + +Status UCXCommunicator::AllReduce(const std::shared_ptr &column, + net::ReduceOp reduce_op, + std::shared_ptr *output) const { + CYLON_UNUSED(column); + CYLON_UNUSED(reduce_op); + CYLON_UNUSED(output); + return {Code::NotImplemented, "Allreduce not implemented for ucx"}; +} + +UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {} + +Status UCXCommunicator::AllReduce(const std::shared_ptr &values, + net::ReduceOp reduce_op, + std::shared_ptr *output) const { + CYLON_UNUSED(values); + CYLON_UNUSED(reduce_op); + CYLON_UNUSED(output); + return {Code::NotImplemented, "Allreduce not implemented for ucx"}; +} + +Status UCXCommunicator::Allgather(const std::shared_ptr &values, + std::vector> *output) const { + CYLON_UNUSED(values); + CYLON_UNUSED(output); + return {Code::NotImplemented, "Allgather not implemented for ucx"}; +} + +Status UCXCommunicator::Allgather(const std::shared_ptr &value, + std::shared_ptr *output) const { + CYLON_UNUSED(value); + CYLON_UNUSED(output); + return {Code::NotImplemented, "Allgather not implemented for ucx"}; +} + +Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, + std::shared_ptr *out) { CYLON_UNUSED(config); + *out = std::make_shared(pool); + auto &comm = static_cast(**out); // Check init functions int initialized; // Int variable used when iterating @@ -69,16 +140,18 @@ Status UCXCommunicator::Init(const std::shared_ptr &config) { } // Get the rank for checking send to self, and initializations - MPI_Comm_rank(MPI_COMM_WORLD, &this->rank); - MPI_Comm_size(MPI_COMM_WORLD, &this->world_size); + MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); + MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); + + int rank = comm.rank, world_size = comm.world_size; // Init context - RETURN_CYLON_STATUS_IF_FAILED(cylon::ucx::initContext(&ucpContext, nullptr)); + RETURN_CYLON_STATUS_IF_FAILED(cylon::ucx::initContext(&comm.ucpContext, nullptr)); // Init recv worker and get address - ucpRecvWorkerAddr = cylon::ucx::initWorker(ucpContext, &ucpRecvWorker); + ucpRecvWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); // Init send worker - ucpSendWorkerAddr = cylon::ucx::initWorker(ucpContext, &ucpSendWorker); + ucpSendWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); // Gather all worker addresses // All addresses buffer for allGather @@ -92,13 +165,14 @@ Status UCXCommunicator::Init(const std::shared_ptr &config) { MPI_COMM_WORLD)); // Iterate and set the sends - for (sIndx = 0; sIndx < this->world_size; sIndx++) { + comm.endPointMap.reserve(world_size); + for (sIndx = 0; sIndx < world_size; sIndx++) { ucp_ep_params_t epParams; ucp_ep_h ep; // If not self, then check if the worker address has been received. // If self,then assign local worker - if (this->rank != sIndx) { + if (rank != sIndx) { address = reinterpret_cast(allAddresses.get() + sIndx * ucpRecvWorkerAddr->addrSize); } else { @@ -112,9 +186,9 @@ Status UCXCommunicator::Init(const std::shared_ptr &config) { epParams.err_mode = UCP_ERR_HANDLING_MODE_NONE; // Create an endpoint - ucxStatus = ucp_ep_create(ucpSendWorker, &epParams, &ep); + ucxStatus = ucp_ep_create(comm.ucpSendWorker, &epParams, &ep); - endPointMap[sIndx] = ep; + comm.endPointMap[sIndx] = ep; // Check if the endpoint was created properly if (ucxStatus != UCS_OK) { LOG(FATAL) << "Error when creating the endpoint."; @@ -129,9 +203,13 @@ Status UCXCommunicator::Init(const std::shared_ptr &config) { return Status::OK(); } + void UCXCommunicator::Finalize() { - ucp_cleanup(ucpContext); - MPI_Finalize(); + if (!this->IsFinalized()) { + ucp_cleanup(ucpContext); + mpi_check_and_finalize(); + finalized = true; + } } void UCXCommunicator::Barrier() { @@ -142,59 +220,173 @@ CommType UCXCommunicator::GetCommType() const { return UCX; } -Status UCXCommunicator::AllGather(const std::shared_ptr
&table, - std::vector> *out) const { - CYLON_UNUSED(table); - CYLON_UNUSED(out); - return {Code::NotImplemented, "All gather not implemented yet for ucx"}; +#ifdef BUILD_CYLON_UCC + +static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req) { + auto comm = (MPI_Comm) coll_info; + MPI_Request request; + + MPI_Iallgather(sbuf, (int) msglen, MPI_BYTE, rbuf, (int) msglen, MPI_BYTE, comm, + &request); + *req = (void *) request; + return UCC_OK; } -Status UCXCommunicator::Gather(const std::shared_ptr
&table, - int gather_root, - bool gather_from_root, - std::vector> *out) const { - CYLON_UNUSED(table); - CYLON_UNUSED(gather_root); - CYLON_UNUSED(gather_from_root); - CYLON_UNUSED(out); - return {Code::NotImplemented, "All gather not implemented yet for ucx"}; +static ucc_status_t oob_allgather_test(void *req) { + auto request = (MPI_Request) req; + int completed; + + MPI_Test(&request, &completed, MPI_STATUS_IGNORE); + return completed ? UCC_OK : UCC_INPROGRESS; } -Status UCXCommunicator::Bcast(std::shared_ptr
*table, int bcast_root) const { - CYLON_UNUSED(table); - CYLON_UNUSED(bcast_root); - return {Code::NotImplemented, "All gather not implemented yet for ucx"}; +static ucc_status_t oob_allgather_free(void *req) { + CYLON_UNUSED(req); + return UCC_OK; } -Status UCXCommunicator::AllReduce(const std::shared_ptr &column, - net::ReduceOp reduce_op, - std::shared_ptr *output) const { - CYLON_UNUSED(column); - CYLON_UNUSED(reduce_op); - CYLON_UNUSED(output); - return {Code::NotImplemented, "Allreduce not implemented yet for ucx"}; + +UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr ucx_comm) + : Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), ucx_comm->GetWorldSize()), + ucx_comm_(std::move(ucx_comm)) {} + +Status UCXUCCCommunicator::Make(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out) { + std::shared_ptr ucx_comm; + RETURN_CYLON_STATUS_IF_FAILED(UCXCommunicator::Make(config, pool, &ucx_comm)); + + *out = std::make_shared(std::move(ucx_comm)); + auto &comm = *std::static_pointer_cast(*out); + + // initialize UCC team and context + ucc_context_params_t ctx_params; + ucc_team_params_t team_params; + ucc_context_config_h ctx_config; + ucc_status_t status; + ucc_lib_h lib; + ucc_lib_config_h lib_config; + + // init ucc lib + ucc_lib_params_t lib_params = {.mask = UCC_LIB_PARAM_FIELD_THREAD_MODE, + .thread_mode = UCC_THREAD_SINGLE, + .coll_types = {}, + .reduction_types = {}, + .sync_type = {}}; + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_lib_config_read(nullptr, nullptr, &lib_config)); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_init(&lib_params, lib_config, &lib)); + ucc_lib_config_release(lib_config); + + // init ucc context + ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; + ctx_params.oob.allgather = oob_allgather; + ctx_params.oob.req_test = oob_allgather_test; + ctx_params.oob.req_free = oob_allgather_free; + ctx_params.oob.coll_info = (void *) MPI_COMM_WORLD; + ctx_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); + ctx_params.oob.oob_ep = static_cast(comm.GetRank()); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_config_read(lib, nullptr, &ctx_config)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_create(lib, &ctx_params, ctx_config, + &comm.uccContext)); + ucc_context_config_release(ctx_config); + + // init ucc team + team_params.mask = UCC_TEAM_PARAM_FIELD_OOB; + team_params.oob.allgather = oob_allgather; + team_params.oob.req_test = oob_allgather_test; + team_params.oob.req_free = oob_allgather_free; + team_params.oob.coll_info = (void *) MPI_COMM_WORLD; + team_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); + team_params.oob.oob_ep = static_cast(comm.GetRank()); + RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params, + &comm.uccTeam)); + while (UCC_INPROGRESS == (status = ucc_team_create_test(comm.uccTeam))) { +// RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(comm.uccContext)); + } + + RETURN_CYLON_STATUS_IF_UCC_FAILED(status); + return Status::OK(); } -UCXCommunicator::UCXCommunicator(const std::shared_ptr *ctx_ptr) - : Communicator(ctx_ptr) {} -Status UCXCommunicator::AllReduce(const std::shared_ptr &values, - net::ReduceOp reduce_op, - std::shared_ptr *output) const { - CYLON_UNUSED(values); - CYLON_UNUSED(reduce_op); - CYLON_UNUSED(output); - return {Code::NotImplemented, "Allreduce not implemented yet for ucx"}; +CommType UCXUCCCommunicator::GetCommType() const { + return UCX; } -Status UCXCommunicator::Allgather(const std::shared_ptr &values, - std::vector> *output) const { - CYLON_UNUSED(values); - CYLON_UNUSED(output); - return {Code::NotImplemented, "Allgather not implemented yet for ucx"}; + +std::unique_ptr UCXUCCCommunicator::CreateChannel() const { + return ucx_comm_->CreateChannel(); } -Status UCXCommunicator::Allgather(const std::shared_ptr &value, - std::shared_ptr *output) const { - CYLON_UNUSED(value); - CYLON_UNUSED(output); - return {Code::NotImplemented, "Allgather not implemented yet for ucx"}; + +void UCXUCCCommunicator::Finalize() { + if (!this->IsFinalized()) { + ucc_status_t status; + while (uccTeam && (UCC_INPROGRESS == (status = ucc_team_destroy(uccTeam)))) { + if (UCC_OK != status) { + LOG(ERROR) << "ucc_team_destroy failed"; + break; + } + } + ucc_context_destroy(uccContext); + mpi_check_and_finalize(); + ucx_comm_->Finalize(); + finalized = true; + } +} + +void UCXUCCCommunicator::Barrier() { + return ucx_comm_->Barrier(); } + +Status UCXUCCCommunicator::AllGather(const std::shared_ptr
&table, + std::vector> *out) const { + ucc::UccTableAllgatherImpl impl(uccTeam, uccContext, world_size); + return impl.Execute(table, out); +} + +Status UCXUCCCommunicator::Gather(const std::shared_ptr
&table, + int gather_root, + bool gather_from_root, + std::vector> *out) const { + ucc::UccTableGatherImpl impl(uccTeam, uccContext, rank, world_size); + return impl.Execute(table, gather_root, gather_from_root, out); +} + +Status UCXUCCCommunicator::Bcast(std::shared_ptr
*table, + int bcast_root, + const std::shared_ptr &ctx) const { + ucc::UccTableBcastImpl impl(uccTeam, uccContext); + // The ctx_ptr and the real context are not the same + return impl.Execute(table, bcast_root, ctx); +} + +Status UCXUCCCommunicator::AllReduce(const std::shared_ptr &column, + net::ReduceOp reduce_op, + std::shared_ptr *output) const { + ucc::UccAllReduceImpl impl(uccTeam, uccContext); + return impl.Execute(column, reduce_op, output); +} + +Status UCXUCCCommunicator::AllReduce(const std::shared_ptr &values, + net::ReduceOp reduce_op, + std::shared_ptr *output) const { + ucc::UccAllReduceImpl impl(uccTeam, uccContext); + return impl.Execute(values, reduce_op, output); +} + +Status UCXUCCCommunicator::Allgather(const std::shared_ptr &values, + std::vector> *output) const { + ucc::UccAllGatherImpl impl(uccTeam, uccContext, world_size); + return impl.Execute(values, world_size, output); +} + +Status UCXUCCCommunicator::Allgather(const std::shared_ptr &value, + std::shared_ptr *output) const { + ucc::UccAllGatherImpl impl(uccTeam, uccContext, world_size); + return impl.Execute(value, world_size, output); +} + +#endif // BUILD_CYLON_UCC } // namespace net } // namespace cylon diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 5e8f1642c..57ba07578 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -21,14 +21,14 @@ #include +#ifdef BUILD_CYLON_UCC +#include +#endif + namespace cylon { namespace net { class UCXConfig : public CommConfig { - void DummyConfig(int dummy); - - int GetDummyConfig(); - CommType Type() override; public: @@ -37,8 +37,9 @@ class UCXConfig : public CommConfig { class UCXCommunicator : public Communicator { public: - explicit UCXCommunicator(const std::shared_ptr *ctx_ptr); - Status Init(const std::shared_ptr &config) override; + explicit UCXCommunicator(MemoryPool *pool); + ~UCXCommunicator() override = default; + std::unique_ptr CreateChannel() const override; int GetRank() const override; int GetWorldSize() const override; @@ -52,7 +53,8 @@ class UCXCommunicator : public Communicator { int gather_root, bool gather_from_root, std::vector> *out) const override; - Status Bcast(std::shared_ptr
*table, int bcast_root) const override; + Status Bcast(std::shared_ptr
*table, int bcast_root, + const std::shared_ptr &ctx) const override; Status AllReduce(const std::shared_ptr &column, net::ReduceOp reduce_op, std::shared_ptr *output) const override; @@ -64,6 +66,9 @@ class UCXCommunicator : public Communicator { Status Allgather(const std::shared_ptr &value, std::shared_ptr *output) const override; + static Status Make(const std::shared_ptr &config, MemoryPool *pool, + std::shared_ptr *out); + // # UCX specific attributes - These need to be passed to the channels created from the communicator // The worker for receiving ucp_worker_h ucpRecvWorker{}; @@ -75,6 +80,43 @@ class UCXCommunicator : public Communicator { ucp_context_h ucpContext{}; }; +#ifdef BUILD_CYLON_UCC +class UCXUCCCommunicator: public Communicator{ + public: + explicit UCXUCCCommunicator(std::shared_ptr ucx_comm); + + static Status Make(const std::shared_ptr &config, MemoryPool *pool, + std::shared_ptr *out); + + CommType GetCommType() const override; + std::unique_ptr CreateChannel() const override; + void Finalize() override; + void Barrier() override; + Status AllGather(const std::shared_ptr
&table, + std::vector> *out) const override; + Status Gather(const std::shared_ptr
&table, + int gather_root, + bool gather_from_root, + std::vector> *out) const override; + Status Bcast(std::shared_ptr
*table, + int bcast_root, + const std::shared_ptr &ctx) const override; + Status AllReduce(const std::shared_ptr &values, + net::ReduceOp reduce_op, + std::shared_ptr *output) const override; + Status Allgather(const std::shared_ptr &values, + std::vector> *output) const override; + Status AllReduce(const std::shared_ptr &value, + net::ReduceOp reduce_op, + std::shared_ptr *output) const override; + Status Allgather(const std::shared_ptr &value, + std::shared_ptr *output) const override; + + ucc_team_h uccTeam{}; + ucc_context_h uccContext{}; + std::shared_ptr ucx_comm_; +}; +#endif } } #endif //CYLON_SRC_CYLON_COMM_UCXCOMMUNICATOR_H_ diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index 65450a8b8..07964dfc8 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -600,7 +600,7 @@ Status GetSplitPoints(std::shared_ptr
&sample_result, split_points, sample_result->GetContext())); } - return ctx->GetCommunicator()->Bcast(&split_points, 0); + return ctx->GetCommunicator()->Bcast(&split_points, 0, ctx); } // return (index of) first element that is not less than the target element diff --git a/cpp/src/cylon/table.hpp b/cpp/src/cylon/table.hpp index 1b433eb01..4463649dc 100644 --- a/cpp/src/cylon/table.hpp +++ b/cpp/src/cylon/table.hpp @@ -36,6 +36,8 @@ #include #include +#include "cylon/util/macros.hpp" + namespace cylon { /** diff --git a/cpp/src/cylon/util/macros.hpp b/cpp/src/cylon/util/macros.hpp index ccf8a46d7..ffa348629 100644 --- a/cpp/src/cylon/util/macros.hpp +++ b/cpp/src/cylon/util/macros.hpp @@ -47,6 +47,16 @@ }; \ } while (0) +#define RETURN_CYLON_STATUS_IF_UCC_FAILED(expr) \ + do { \ + const auto& _st = (expr); \ + if (_st < 0) { \ + return cylon::Status( \ + cylon::Code::ExecutionError, \ + "UCC Failed: " + std::string(ucc_status_string(_st))); \ + }; \ + } while (0) + #define CYLON_ASSIGN_OR_RAISE_IMPL(result_name, lhs, rexpr) \ auto&& result_name = (rexpr); \ RETURN_CYLON_STATUS_IF_ARROW_FAILED((result_name).status()) ; \ diff --git a/cpp/src/examples/CMakeLists.txt b/cpp/src/examples/CMakeLists.txt index bc88311a3..cd9042a14 100644 --- a/cpp/src/examples/CMakeLists.txt +++ b/cpp/src/examples/CMakeLists.txt @@ -80,6 +80,8 @@ if (CYLON_UCX) cylon_add_exe(ucx_join_example) if (CYLON_UCC) cylon_add_exe(ucc_example) + cylon_add_exe(ucc_operators_example) + cylon_add_exe(ucc_allgather_vector_example) endif () endif (CYLON_UCX) diff --git a/cpp/src/examples/compute_example.cpp b/cpp/src/examples/compute_example.cpp index a67461ee4..98bb39671 100644 --- a/cpp/src/examples/compute_example.cpp +++ b/cpp/src/examples/compute_example.cpp @@ -44,7 +44,11 @@ int main() { int32_t agg_index = 1; auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr table; std::shared_ptr result; diff --git a/cpp/src/examples/gcylon/gjoin.cpp b/cpp/src/examples/gcylon/gjoin.cpp index ea4528d64..56a1e5993 100644 --- a/cpp/src/examples/gcylon/gjoin.cpp +++ b/cpp/src/examples/gcylon/gjoin.cpp @@ -29,7 +29,11 @@ int main(int argc, char *argv[]) { const int64_t ROWS = 10; auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } int my_rank = ctx->GetRank(); LOG(INFO) << "my_rank: " << my_rank << ", world size: " << ctx->GetWorldSize(); diff --git a/cpp/src/examples/gcylon/gjoin_csv.cpp b/cpp/src/examples/gcylon/gjoin_csv.cpp index e49c4d722..16cb70a26 100644 --- a/cpp/src/examples/gcylon/gjoin_csv.cpp +++ b/cpp/src/examples/gcylon/gjoin_csv.cpp @@ -34,7 +34,11 @@ int main(int argc, char *argv[]) { std::string input_csv_file2 = argv[2]; auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } int my_rank = ctx->GetRank(); LOG(INFO) << "my_rank: " << my_rank << ", world size: " << ctx->GetWorldSize(); diff --git a/cpp/src/examples/gcylon/gshuffle.cpp b/cpp/src/examples/gcylon/gshuffle.cpp index 002e2255d..edb26a211 100644 --- a/cpp/src/examples/gcylon/gshuffle.cpp +++ b/cpp/src/examples/gcylon/gshuffle.cpp @@ -63,7 +63,11 @@ int main(int argc, char *argv[]) { const bool RESULT_TO_FILE = false; auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } int my_rank = ctx->GetRank(); int number_of_GPUs; diff --git a/cpp/src/examples/gcylon/gsort.cpp b/cpp/src/examples/gcylon/gsort.cpp index ced350e16..ea858a37d 100644 --- a/cpp/src/examples/gcylon/gsort.cpp +++ b/cpp/src/examples/gcylon/gsort.cpp @@ -46,7 +46,11 @@ int main(int argc, char *argv[]) { const bool RESULT_TO_FILE = true; auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } int my_rank = ctx->GetRank(); int number_of_GPUs; diff --git a/cpp/src/examples/groupby_example.cpp b/cpp/src/examples/groupby_example.cpp index 293d2e213..10e527cb0 100644 --- a/cpp/src/examples/groupby_example.cpp +++ b/cpp/src/examples/groupby_example.cpp @@ -40,7 +40,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, output; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/groupby_perf.cpp b/cpp/src/examples/groupby_perf.cpp index d33657c7e..d3df95a1f 100644 --- a/cpp/src/examples/groupby_perf.cpp +++ b/cpp/src/examples/groupby_perf.cpp @@ -35,7 +35,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr table, output; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/groupby_pipeline_example.cpp b/cpp/src/examples/groupby_pipeline_example.cpp index 253d8f9cc..6d3461eee 100644 --- a/cpp/src/examples/groupby_pipeline_example.cpp +++ b/cpp/src/examples/groupby_pipeline_example.cpp @@ -56,7 +56,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = cylon::net::MPIConfig::Make(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } arrow::MemoryPool *pool = arrow::default_memory_pool(); std::shared_ptr left_table; diff --git a/cpp/src/examples/indexing_example.cpp b/cpp/src/examples/indexing_example.cpp index d2f937840..34a59f363 100644 --- a/cpp/src/examples/indexing_example.cpp +++ b/cpp/src/examples/indexing_example.cpp @@ -285,11 +285,15 @@ int arrow_indexer_test_1() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -352,11 +356,15 @@ int arrow_indexer_str_test_1() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -412,11 +420,15 @@ int arrow_indexer_test_2() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -475,11 +487,15 @@ int arrow_indexer_str_test_2() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -538,11 +554,15 @@ int arrow_indexer_test_3() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -600,11 +620,15 @@ int arrow_indexer_str_test_3() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -662,11 +686,15 @@ int arrow_indexer_test_4() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -723,11 +751,15 @@ int arrow_indexer_str_test_4() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -784,11 +816,15 @@ int arrow_indexer_test_5() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -847,11 +883,15 @@ int arrow_indexer_str_test_5() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -910,11 +950,15 @@ int arrow_indexer_test_6() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -972,11 +1016,15 @@ int arrow_indexer_str_test_6() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -1039,11 +1087,15 @@ int arrow_iloc_indexer_test_1() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -1099,11 +1151,15 @@ int arrow_iloc_indexer_test_2() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -1162,11 +1218,15 @@ int arrow_iloc_indexer_test_3() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -1224,11 +1284,15 @@ int arrow_iloc_indexer_test_4() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -1285,11 +1349,15 @@ int arrow_iloc_indexer_test_5() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -1349,11 +1417,15 @@ int arrow_iloc_indexer_test_6() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table @@ -1426,11 +1498,15 @@ int arrow_filter_example() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output; + std::shared_ptr input, output; int64_t capacity = 10; CYLON_UNUSED(capacity); int64_t search_capacity = 4; @@ -1543,11 +1619,15 @@ int arrow_range_indexer_test() { separator(func_title); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } cylon::Status status; - std::shared_ptr input, output, output1; + std::shared_ptr input, output, output1; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); // read first table diff --git a/cpp/src/examples/intersect_example.cpp b/cpp/src/examples/intersect_example.cpp index c55b3e1b5..da1146006 100644 --- a/cpp/src/examples/intersect_example.cpp +++ b/cpp/src/examples/intersect_example.cpp @@ -32,7 +32,11 @@ int main(int argc, char *argv[]) { auto start_time = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, unioned_table; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); @@ -42,7 +46,7 @@ int main(int argc, char *argv[]) { if (mem == "m") { uint64_t count = std::stoull(argv[3]); double dup = std::stod(argv[4]); - cylon::examples::create_two_in_memory_tables(count, dup,ctx,first_table,second_table); + cylon::examples::create_two_in_memory_tables(count, dup, ctx, first_table, second_table); } else if (mem == "f") { cylon::FromCSV(ctx, std::string(argv[3]) + std::to_string(ctx->GetRank()) + ".csv", first_table); cylon::FromCSV(ctx, std::string(argv[4]) + std::to_string(ctx->GetRank()) + ".csv", second_table); diff --git a/cpp/src/examples/join_example.cpp b/cpp/src/examples/join_example.cpp index d59abf698..91a708fa9 100644 --- a/cpp/src/examples/join_example.cpp +++ b/cpp/src/examples/join_example.cpp @@ -34,7 +34,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, joined; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/multi_idx_join_example.cpp b/cpp/src/examples/multi_idx_join_example.cpp index dd869a0c8..69ad18a34 100644 --- a/cpp/src/examples/multi_idx_join_example.cpp +++ b/cpp/src/examples/multi_idx_join_example.cpp @@ -42,7 +42,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, joined; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/multicolumn_sorting_example.cpp b/cpp/src/examples/multicolumn_sorting_example.cpp index 4023d7111..08d03fdbc 100644 --- a/cpp/src/examples/multicolumn_sorting_example.cpp +++ b/cpp/src/examples/multicolumn_sorting_example.cpp @@ -31,7 +31,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); CYLON_UNUSED(start_start); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, output; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/parquet_join_example.cpp b/cpp/src/examples/parquet_join_example.cpp index 918ec65a3..a22d2adf2 100644 --- a/cpp/src/examples/parquet_join_example.cpp +++ b/cpp/src/examples/parquet_join_example.cpp @@ -27,7 +27,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, joined; diff --git a/cpp/src/examples/parquet_union_example.cpp b/cpp/src/examples/parquet_union_example.cpp index fccd641aa..5cd0bc3c1 100644 --- a/cpp/src/examples/parquet_union_example.cpp +++ b/cpp/src/examples/parquet_union_example.cpp @@ -30,7 +30,11 @@ int main(int argc, char *argv[]) { auto start_time = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, unioned_table; diff --git a/cpp/src/examples/project_example.cpp b/cpp/src/examples/project_example.cpp index 02363f354..c94c27e2b 100644 --- a/cpp/src/examples/project_example.cpp +++ b/cpp/src/examples/project_example.cpp @@ -29,7 +29,11 @@ int main(int argc, char *argv[]) { } auto start_time = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr table, project; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/select_example.cpp b/cpp/src/examples/select_example.cpp index 8a8b6038e..f0177965b 100644 --- a/cpp/src/examples/select_example.cpp +++ b/cpp/src/examples/select_example.cpp @@ -30,7 +30,11 @@ int main(int argc, char *argv[]) { } auto start_time = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr table, select; auto read_options = cylon::io::config::CSVReadOptions().UseThreads( diff --git a/cpp/src/examples/sorting_example.cpp b/cpp/src/examples/sorting_example.cpp index d65549c49..cd231d94d 100644 --- a/cpp/src/examples/sorting_example.cpp +++ b/cpp/src/examples/sorting_example.cpp @@ -32,7 +32,11 @@ int main(int argc, char *argv[]) { CYLON_UNUSED(start_start); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, output; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/subtract_example.cpp b/cpp/src/examples/subtract_example.cpp index 90f4ceb08..427f2e64d 100644 --- a/cpp/src/examples/subtract_example.cpp +++ b/cpp/src/examples/subtract_example.cpp @@ -32,7 +32,11 @@ int main(int argc, char *argv[]) { auto start_time = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, unioned_table; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); @@ -42,7 +46,7 @@ int main(int argc, char *argv[]) { if (mem == "m") { uint64_t count = std::stoull(argv[3]); double dup = std::stod(argv[4]); - cylon::examples::create_two_in_memory_tables(count, dup,ctx,first_table,second_table); + cylon::examples::create_two_in_memory_tables(count, dup, ctx, first_table, second_table); } else if (mem == "f") { cylon::FromCSV(ctx, std::string(argv[3]) + std::to_string(ctx->GetRank()) + ".csv", first_table); cylon::FromCSV(ctx, std::string(argv[4]) + std::to_string(ctx->GetRank()) + ".csv", second_table); diff --git a/cpp/src/examples/ucc_allgather_vector_example.cpp b/cpp/src/examples/ucc_allgather_vector_example.cpp new file mode 100644 index 000000000..fbedeff8e --- /dev/null +++ b/cpp/src/examples/ucc_allgather_vector_example.cpp @@ -0,0 +1,187 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * temporary example to test an issue; removed after issue resolved + * run the example as follows: + * mpirun -n 4 bin/ucc_allgather_vector_example + */ + +#include +#include + +#include +#include +#include +#include + +#define STR(x) #x +#define UCC_CHECK(_call) \ + if (UCC_OK != (_call)) { \ + fprintf(stderr, "*** UCC TEST FAIL: %s\n", STR(_call)); \ + MPI_Abort(MPI_COMM_WORLD, -1); \ + } + +static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req) { + auto comm = (MPI_Comm)coll_info; + MPI_Request request; + + MPI_Iallgather(sbuf, (int)msglen, MPI_BYTE, rbuf, (int)msglen, MPI_BYTE, comm, + &request); + *req = (void *)request; + return UCC_OK; +} + +static ucc_status_t oob_allgather_test(void *req) { + auto request = (MPI_Request)req; + int completed; + + MPI_Test(&request, &completed, MPI_STATUS_IGNORE); + return completed ? UCC_OK : UCC_INPROGRESS; +} + +static ucc_status_t oob_allgather_free(void *req) { + (void) req; + return UCC_OK; +} + +/* Creates UCC team for a group of processes represented by MPI + communicator. UCC API provides different ways to create a team, + one of them is to use out-of-band (OOB) allgather provided by + the calling runtime. */ +static ucc_team_h create_ucc_team(MPI_Comm comm, ucc_context_h ctx) { + int rank, size; + ucc_team_h team; + ucc_team_params_t team_params; + ucc_status_t status; + + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + + team_params.mask = UCC_TEAM_PARAM_FIELD_OOB | UCC_TEAM_PARAM_FIELD_EP | + UCC_TEAM_PARAM_FIELD_EP_RANGE; + team_params.oob.allgather = oob_allgather; + team_params.oob.req_test = oob_allgather_test; + team_params.oob.req_free = oob_allgather_free; + team_params.oob.coll_info = (void *)comm; + team_params.oob.n_oob_eps = size; + team_params.oob.oob_ep = rank; + team_params.ep = rank; + team_params.ep_range = UCC_COLLECTIVE_EP_RANGE_CONTIG; + + UCC_CHECK(ucc_team_create_post(&ctx, 1, &team_params, &team)); + while (UCC_INPROGRESS == (status = ucc_team_create_test(team))) { + UCC_CHECK(ucc_context_progress(ctx)); + }; + if (UCC_OK != status) { + fprintf(stderr, "failed to create ucc team\n"); + MPI_Abort(MPI_COMM_WORLD, status); + } + return team; +} + +int main(int argc, char **argv) { + ucc_lib_config_h lib_config; + ucc_context_config_h ctx_config; + int rank, size, i; + ucc_team_h team; + ucc_context_h ctx; + ucc_lib_h lib; + size_t msglen; + ucc_coll_req_h req; + ucc_coll_args_t args; + + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + /* Init ucc library */ + ucc_lib_params_t lib_params = {.mask = UCC_LIB_PARAM_FIELD_THREAD_MODE, + .thread_mode = UCC_THREAD_SINGLE, + .coll_types = {}, + .reduction_types = {}, + .sync_type = {}}; + UCC_CHECK(ucc_lib_config_read(nullptr, nullptr, &lib_config)); + UCC_CHECK(ucc_init(&lib_params, lib_config, &lib)); + ucc_lib_config_release(lib_config); + + /* Init ucc context for a specified UCC_TEST_TLS */ + ucc_context_params_t ctx_params; + ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; + ctx_params.oob.allgather = oob_allgather; + ctx_params.oob.req_test = oob_allgather_test; + ctx_params.oob.req_free = oob_allgather_free; + ctx_params.oob.coll_info = (void *)MPI_COMM_WORLD; + ctx_params.oob.n_oob_eps = static_cast(size); + ctx_params.oob.oob_ep = static_cast(rank); + + UCC_CHECK(ucc_context_config_read(lib, nullptr, &ctx_config)); + UCC_CHECK(ucc_context_create(lib, &ctx_params, ctx_config, &ctx)); + + ucc_context_config_release(ctx_config); + + team = create_ucc_team(MPI_COMM_WORLD, ctx); + + std::vector count(size), disp(size); + std::vector send(10), receive(size * 10); + + for(int i = 0; i < 10; i++) { + send[i] = i + rank * 10 + 1; + } + + for(int i = 0; i < size; i++) { + count[i] = 10; + disp[i] = 10 * i; + } + + args.mask = UCC_COLL_ARGS_FIELD_FLAGS; + args.flags = + UCC_COLL_ARGS_FLAG_DISPLACEMENTS_64BIT | UCC_COLL_ARGS_FLAG_COUNT_64BIT; + args.coll_type = UCC_COLL_TYPE_ALLGATHERV; + args.src.info.buffer = send.data(); + args.src.info.count = 10; + args.src.info.datatype = UCC_DT_INT32; + args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + + args.dst.info_v.buffer = receive.data(); + args.dst.info_v.counts = count.data(); + args.dst.info_v.displacements = disp.data(); + args.dst.info_v.datatype = UCC_DT_INT32; + args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; + + UCC_CHECK(ucc_collective_init(&args, &req, team)); + UCC_CHECK(ucc_collective_post(req)); + while (UCC_INPROGRESS == ucc_collective_test(req)) { + UCC_CHECK(ucc_context_progress(ctx)); + } + ucc_collective_finalize(req); + + /* Check result */ + int sum = ((size + 1) * size) / 2; + for (i = 0; i < 10 * size; i++) { + std::cout< + +#include + +#include +#include +#include +#include +#include + +auto read_options = cylon::io::config::CSVReadOptions() + .UseThreads(false) + .BlockSize(1 << 30) + .WithDelimiter('\t'); + +cylon::Status readInputCsv(int i, + const std::shared_ptr& ctx, + std::shared_ptr& table) { + return cylon::FromCSV(ctx, + "/home/ky/cylon/data/input/csv1_" + std::to_string(i) + ".csv", + table, read_options); +} + +void testTableAllgather(std::shared_ptr& ctx) { + int ws = ctx->GetWorldSize(); + if(ws > 4) { + std::cout<<"table allgather test can only take 4 or less processes."< table; + std::vector> out, original(ws); + + readInputCsv(ctx->GetRank(), ctx, table); + + for(int i = 0; i < ws; i++) { + readInputCsv(i, ctx, original[i]); + } + + ctx->GetCommunicator()->AllGather(table, &out); + + for(int i = 0; i < ws; i++) { + bool result; + cylon::Equals(out[i], original[i], result); + if(!result) { + std::cout<<"table allgather test failed at rank "<< ctx->GetRank() <GetRank() << std::endl; +} + +void testColumnAllgather(std::shared_ptr& ctx) { + std::shared_ptr input; + std::vector> output; + std::vector v(10); + + for (int i = 0; i < 10; i++) { + v[i] = i + ctx->GetRank() * 10 + 1; + } + cylon::Column::FromVector(v, input); + + ctx->GetCommunicator()->Allgather(input, &output); + + for(int i = 0; i < ctx->GetWorldSize(); i++) { + auto c = output[i]; + for (int j = 0; j < 10; j++) { + int32_t result = std::static_pointer_cast( + c->data()->GetScalar(j).ValueOrDie())->value; + + if(result != i * 10 + j + 1) { + std::cout<<"column gather test failed"<GetRank() << std::endl; +} + +void testScalarAllgather(std::shared_ptr& ctx) { + std::shared_ptr column; + std::vector v = {ctx->GetRank()}; + cylon::Column::FromVector(v, column); + + std::shared_ptr arrow_scalar = column->data()->GetScalar(0).ValueOrDie(); + std::shared_ptr scalar = cylon::Scalar::Make(arrow_scalar); + + std::shared_ptr out; + ctx->GetCommunicator()->Allgather(scalar, &out); + + for(int i = 0; i < ctx->GetWorldSize(); i++) { + int32_t result = std::static_pointer_cast(out->data()->GetScalar(i).ValueOrDie())->value; + if(result != i) { + std::cout<<"scalar gather test failed at rank "<GetRank()<GetRank()<& table, + std::shared_ptr& ctx) { + std::vector> out; + auto status = ctx->GetCommunicator()->Gather(table, 0, 1, &out); + std::cout<GetRank() == 0) { + std::cout<<"out size: "<Print(); + // std::cout<get_table()->num_rows()<& ctx) { + std::shared_ptr table, out, original; + if(ctx->GetRank() == 0) { + readInputCsv(0, ctx, table); + } else { + readInputCsv(0, ctx, original); + } + ctx->GetCommunicator()->Bcast(&table, 0, ctx); + + if(ctx->GetRank() != 0) { + bool result; + cylon::Equals(table, original, result); + if(!result) { + std::cout<<"table bcast test failed at rank " << ctx->GetRank() <GetRank() <& ctx) { + std::shared_ptr input, output; + std::vector v(10); + int ws = ctx->GetWorldSize(); + + for(int i = 0; i < 10; i++) { + v[i] = i + ctx->GetRank() * 10; + } + cylon::Column::FromVector(v, input); + + ctx->GetCommunicator()->AllReduce(input, cylon::net::ReduceOp::SUM, &output); + + for (int i = 0; i < 10; i++) { + auto result = std::static_pointer_cast(output->data()->GetScalar(i).ValueOrDie())->value; + if(result != ws * i + (ws - 1) * ws / 2 * 10) { + std::cout << "column allreduce test failed at rank " << ctx->GetRank() + << std::endl; + return; + } + } + std::cout << "column allreduce test passed at rank " << ctx->GetRank() + << std::endl; +} + +void testScalarAllReduce(std::shared_ptr& ctx) { + std::shared_ptr column; + int ws = ctx->GetWorldSize(); + std::vector v = {ctx->GetRank() + 1}; + cylon::Column::FromVector(v, column); + + std::shared_ptr arrow_scalar = + column->data()->GetScalar(0).ValueOrDie(); + std::shared_ptr scalar = cylon::Scalar::Make(arrow_scalar), out; + + ctx->GetCommunicator()->AllReduce(scalar, cylon::net::ReduceOp::SUM, &out); + + int32_t result = std::static_pointer_cast(out->data())->value; + + if (result != (ws + 1) * ws / 2) { + std::cout << "scalar allreduce test failed at rank " << ctx->GetRank() + << std::endl; + return; + } + std::cout << "scalar allreduce test passed at rank " << ctx->GetRank() + << std::endl; +} + +int main(int argc, char **argv) { + auto ucx_config = std::make_shared(); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(ucx_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } + + testTableAllgather(ctx); + testColumnAllgather(ctx); + testScalarAllgather(ctx); + testTableBcast(ctx); + testColumnAllReduce(ctx); + testScalarAllReduce(ctx); +} diff --git a/cpp/src/examples/ucx_join_example.cpp b/cpp/src/examples/ucx_join_example.cpp index 12c2ee8c8..34e02fccd 100644 --- a/cpp/src/examples/ucx_join_example.cpp +++ b/cpp/src/examples/ucx_join_example.cpp @@ -27,7 +27,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, joined; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/union_example.cpp b/cpp/src/examples/union_example.cpp index 28af0a423..b59c42203 100644 --- a/cpp/src/examples/union_example.cpp +++ b/cpp/src/examples/union_example.cpp @@ -35,7 +35,11 @@ int main(int argc, char *argv[]) { auto start_time = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr first_table, second_table, unioned_table; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/src/examples/unique_example.cpp b/cpp/src/examples/unique_example.cpp index a70af7a32..b0880b5e9 100644 --- a/cpp/src/examples/unique_example.cpp +++ b/cpp/src/examples/unique_example.cpp @@ -33,7 +33,11 @@ int main(int argc, char *argv[]) { auto start_start = std::chrono::steady_clock::now(); auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } std::shared_ptr table, output; auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 1b4d1134f..b709ec1c8 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -62,6 +62,11 @@ function(cylon_add_test TESTNAME) target_link_libraries(${exec_name} ${GLOO_LIBRARIES}) endif (CYLON_GLOO) + if (CYLON_UCX) + target_link_libraries(${exec_name} ${UCX_LIBRARIES}) + target_link_directories(${exec_name} PUBLIC ${UCC_LIBRARIES} ${UCC_LIBRARIES}/ucc) + endif (CYLON_UCX) + endfunction(cylon_add_test) # macro to run executable @@ -216,3 +221,22 @@ if (CYLON_GLOO) cylon_run_test(sync_comms_test 1 gloo-mpi) cylon_run_test(sync_comms_test 4 gloo-mpi) endif (CYLON_GLOO) + +#### ucc tests +if (CYLON_UCX) + # ucx only tests + cylon_run_test(join_test 1 ucx) + cylon_run_test(join_test 2 ucx) + cylon_run_test(join_test 4 ucx) + + cylon_run_test(parquet_join_test 1 ucx) + cylon_run_test(parquet_join_test 2 ucx) + cylon_run_test(parquet_join_test 4 ucx) + + if (CYLON_UCC) + # ucx + ucc tests + cylon_run_test(aggregate_test 1 ucx) + cylon_run_test(aggregate_test 2 ucx) + cylon_run_test(aggregate_test 4 ucx) + endif (CYLON_UCC) +endif (CYLON_UCX) \ No newline at end of file diff --git a/cpp/test/common/test_header.hpp b/cpp/test/common/test_header.hpp index ff17b4503..d6e23fa4b 100644 --- a/cpp/test/common/test_header.hpp +++ b/cpp/test/common/test_header.hpp @@ -28,6 +28,9 @@ #ifdef BUILD_CYLON_GLOO #include #endif // BUILD_CYLON_GLOO +#ifdef BUILD_CYLON_UCX +#include +#endif // BUILD_CYLON_UCX #include "test_utils.hpp" #include "test_macros.hpp" @@ -49,7 +52,8 @@ int main(int argc, char *argv[]) { std::string comm_args = "mpi"; - auto cli = session.cli() | Catch::clara::Opt(comm_args, "mpi|gloo-mpi")["--comm"]("comm args"); + auto + cli = session.cli() | Catch::clara::Opt(comm_args, "mpi|gloo-mpi|ucx")["--comm"]("comm args"); // Now pass the new composite back to Catch2 so it uses that session.cli(cli); @@ -73,6 +77,14 @@ int main(int argc, char *argv[]) { #else LOG(ERROR) << "gloo-mpi passed for tests, but tests are not built with gloo"; return 1; +#endif + } else if (comm_args == "ucx") { +#ifdef BUILD_CYLON_UCX + LOG(INFO) << "Using UCX/UCC"; + config = std::make_shared(); +#else + LOG(ERROR) << "ucx passed for tests, but tests are not built with ucx"; + return 1; #endif } else { LOG(ERROR) << "unsupported comm " << argv[1]; diff --git a/cpp/test/custom_mpi_comm_test.cpp b/cpp/test/custom_mpi_comm_test.cpp index a003df19b..02a0a1bdd 100644 --- a/cpp/test/custom_mpi_comm_test.cpp +++ b/cpp/test/custom_mpi_comm_test.cpp @@ -49,7 +49,8 @@ TEST_CASE("custom mpi communicator") { REQUIRE(MPI_Comm_split(MPI_COMM_WORLD, color, l_rank, &new_comm) == MPI_SUCCESS); auto mpi_config = cylon::net::MPIConfig::Make(new_comm); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + REQUIRE(cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()); REQUIRE(l_rank == ctx->GetRank()); if (color == 0) { @@ -67,7 +68,7 @@ TEST_CASE("custom mpi communicator") { REQUIRE(MPI_Comm_split(MPI_COMM_WORLD, color, l_rank, &new_comm) == MPI_SUCCESS); mpi_config = cylon::net::MPIConfig::Make(new_comm); - ctx = cylon::CylonContext::InitDistributed(mpi_config); + REQUIRE(cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()); REQUIRE(l_rank == ctx->GetRank()); REQUIRE(ctx->GetWorldSize() == 2); @@ -96,7 +97,7 @@ int main(int argc, char *argv[]) { Catch::Session session; std::string comm_args = "mpi"; - auto cli = session.cli() | Catch::clara::Opt(comm_args, "mpi|gloo-mpi")["--comm"]("comm args"); + auto cli = session.cli() | Catch::clara::Opt(comm_args, "mpi|gloo-mpi|ucc")["--comm"]("comm args"); // Now pass the new composite back to Catch2 so it uses that session.cli(cli); diff --git a/cpp/test/sync_comms_test.cpp b/cpp/test/sync_comms_test.cpp index fc57495bd..76911e1ca 100644 --- a/cpp/test/sync_comms_test.cpp +++ b/cpp/test/sync_comms_test.cpp @@ -204,7 +204,7 @@ TEST_CASE("bcast table", "[sync comms]") { } const auto &comm = ctx->GetCommunicator(); - CHECK_CYLON_STATUS(comm->Bcast(&table, bcast_root)); + CHECK_CYLON_STATUS(comm->Bcast(&table, bcast_root, ctx)); INFO ("world sz " + std::to_string(WORLD_SZ) + " rank " + std::to_string(RANK)); REQUIRE(table != nullptr); diff --git a/java/src/main/native/CMakeLists.txt b/java/src/main/native/CMakeLists.txt index a979388f2..d297e3c82 100644 --- a/java/src/main/native/CMakeLists.txt +++ b/java/src/main/native/CMakeLists.txt @@ -59,7 +59,7 @@ endif (JNI_FOUND) message(STATUS "Inclduing ${CYLON_ARROW_DIR}/install/include/") include_directories("include" - "${JNI_INCLUDE_DIRS}" ${CYLON_INCLUDE} "${CYLON_ARROW_DIR}/install/include/") + "${JNI_INCLUDE_DIRS}" ${CYLON_INCLUDE} "${CYLON_ARROW_DIR}/install/include/" "${CYLON_ARROW_DIR}/include/") set(SOURCE_FILES src/Table.cpp diff --git a/java/src/main/native/src/TwisterXContext.cpp b/java/src/main/native/src/TwisterXContext.cpp index b1f0f02b0..682871f08 100644 --- a/java/src/main/native/src/TwisterXContext.cpp +++ b/java/src/main/native/src/TwisterXContext.cpp @@ -11,6 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include "org_cylondata_cylon_CylonContext.h" #include @@ -20,7 +21,11 @@ JNIEXPORT void JNICALL Java_org_cylondata_cylon_CylonContext_nativeInit (JNIEnv *env, jclass obj, jint ctx_id) { auto mpi_config = std::make_shared(); - auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + std::shared_ptr ctx; + auto st = cylon::CylonContext::InitDistributed(mpi_config, &ctx); + if (!st.is_ok()){ + throw std::runtime_error("InitDistributed failed " + st.get_msg()); + } contexts.insert(std::pair>(ctx_id, ctx)); }