diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 24c6a49ad..775a10979 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -20,7 +20,7 @@ jobs: fail-fast: false matrix: include: - - os: macos-10.15 + - os: macos-11 steps: - uses: actions/checkout@v2 diff --git a/README-summit.md b/README-summit.md new file mode 100644 index 000000000..977d7b228 --- /dev/null +++ b/README-summit.md @@ -0,0 +1,106 @@ +# SUMMIT Installation guide + +This document describes the instalation on summit. +The instalition is first described in detail and then an abbreviated +instalation with a Makefile is provided. + +Notation: + +* `login$` denotes the login node +* `compute$` donates a compute node + +## Details + +### Activites on the login node + +```bash +login$ module purge +login$ module load gcc/9.3.0 +login$ module load spectrum-mpi/10.4.0.3-20210112 +login$ module load python/3.8-anaconda3 +``` + +Next check the python version + +```bash +login$ which python +> /sw/summit/python/3.8/anaconda3/2020.07-rhel8/bin/python +login$ python -V +> Python 3.8.3 +``` + +Now create a venv called `CYLON` in the home directory and activate it + +```bash +login$ python -m venv ~/CYLON +login$ source ~/CYLON/bin/activate +``` + +```bash +login$ which python +> ~/CYLON/bin/python +login$ python -V +> Python 3.8.3 +``` + +Update pip and install pytest: + +```bash +login$ pip install pip -U +login$ pip install pytest +``` + +Checkout the cylon code + +```bash +cd ~git clone https://github.com/cylondata/cylon.git +``` + +Now we obtain two interactive compute nodes + +```bash +login$ bsub -Is -W 0:30 -nnodes 2 -P gen150_bench $SHELL +``` + +After you get them you will be in a compute node + +### Activities on the compute node + +```bash +compute$ module purge +compute$ module load gcc/9.3.0 +compute$ module load spectrum-mpi/10.4.0.3-20210112 +compute$ module load python/3.8-anaconda3 +compute$ source ~/CYLON/bin/activate +compute$ export CC=`which gcc` +compute$ export CXX=`which g++` +compute$ export ARROW_DEFAULT_MEMORY_POOL=system +compute$ CC=gcc MPICC=mpicc pip install --no-binary mpi4py install mpi4py +compute$ pip install pytest-mpi +compute$ pip install cmake +compute$ pip install numpy +compute$ pip install cmake +compute$ export PATH=/ccs/home/gregorvl/.local/summit/anaconda3/2020.07/3.8/bin:$PATH + +compute$ ./build.sh -pyenv ~/CYLON -bpath $(pwd)/build --cpp --test --cmake-flags "-DMPI_C_COMPILER=$(which mpicc) -DMPI_CXX_COMPILER=$(which mpicxx)" -j 4 +``` + +The compilation will take some time. After it is completed you can conduct a test with + +```bash +compute$ cd ~/cylon/build +compute$ jsrun -n 1 -c 4 -a 1 ./build/bin/join_example m n 1000 0.9 +``` + +If everything is ok, you will see at the end of the test output + +``` +... +================================================ +All tests passed (66 assertions in 4 test cases) +``` + + +### Running batch scripts + +Please note that the module load and the source of the CYLON venv must be at the beginning of each batsch script you want to use cylon in. diff --git a/build.py b/build.py index cf2eecfa6..8506cf733 100644 --- a/build.py +++ b/build.py @@ -89,6 +89,8 @@ def check_conda_prefix(): parser.add_argument("-ipath", help='Install directory') parser.add_argument("--verbose", help='Set verbosity', default=False, action="store_true") +parser.add_argument("-j", help='Parallel build threads', default=os.cpu_count(), + dest='parallel', type=int) args = parser.parse_args() @@ -107,9 +109,7 @@ def on_off(arg): RUN_CPP_TESTS = args.test RUN_PYTHON_TESTS = args.pytest CMAKE_FLAGS = args.cmake_flags -CPPLINT_COMMAND = "\"-DCMAKE_CXX_CPPLINT=cpplint;--linelength=100;--headers=h," \ - "hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" " if \ - args.style_check else " " +PARALLEL = args.parallel # arrow build expects /s even on windows BUILD_PYTHON = args.python @@ -127,6 +127,16 @@ def on_off(arg): PYTHON_EXEC = sys.executable +if args.style_check: + cmd = f'{PYTHON_EXEC} -m pip install cpplint' + res = subprocess.run(cmd, shell=True, cwd=PYTHON_SOURCE_DIR) + check_status(res.returncode, "cpplint install") + + CPPLINT_COMMAND = "-DCMAKE_CXX_CPPLINT=\"cpplint;--linelength=100;--headers=h," \ + "hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" " +else: + CPPLINT_COMMAND = " " + CMAKE_BOOL_FLAGS = {'CYLON_GLOO', 'CYLON_UCX', 'CYLON_UCC'} CMAKE_FALSE_OPTIONS = {'0', 'FALSE', 'OFF', 'N', 'NO', 'IGNORE', 'NOTFOUND'} @@ -169,6 +179,7 @@ def print_line(): logger.info(f"Python exec : {PYTHON_EXEC}") logger.info(f"Build mode : {CPP_BUILD_MODE}") logger.info(f"Build path : {BUILD_DIR}") +logger.info(f"Build threads : {PARALLEL}") logger.info(f"Install path : {INSTALL_DIR}") logger.info(f"CMake flags : {CMAKE_FLAGS}") logger.info(f" -CYLON_GLOO : {CYLON_GLOO}") @@ -206,7 +217,7 @@ def build_cpp(): res = subprocess.call(cmake_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake generate") - cmake_build_command = f'cmake --build . --parallel {os.cpu_count()} --config {CPP_BUILD_MODE}' + cmake_build_command = f'cmake --build . --parallel {PARALLEL} --config {CPP_BUILD_MODE}' logger.info(f"Build command: {cmake_build_command}") res = subprocess.call(cmake_build_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake build") @@ -254,6 +265,14 @@ def python_test(): env['LD_LIBRARY_PATH'] = os.path.join(GLOO_PREFIX, "lib") + os.pathsep + \ env['LD_LIBRARY_PATH'] + + if CYLON_UCC: + env['CYLON_UCC'] = str(CYLON_UCC) + env['UCC_PREFIX'] = UCC_PREFIX + env['LD_LIBRARY_PATH'] = os.path.join(UCC_PREFIX, "lib") + os.pathsep + \ + os.path.join(UCC_PREFIX, "lib", "ucc") + os.pathsep + \ + env['LD_LIBRARY_PATH'] + elif OS_NAME == 'Darwin': if 'DYLD_LIBRARY_PATH' in env: env['DYLD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \ @@ -279,7 +298,7 @@ def build_python(): conda_prefix = check_conda_prefix() - python_build_command = f'{PYTHON_EXEC} setup.py install --force' + python_build_command = f'{PYTHON_EXEC} -m pip install -v --upgrade .' env = os.environ env["CYLON_PREFIX"] = str(BUILD_DIR) if os.name == 'posix': diff --git a/conda/environments/cylon_MacOS.yml b/conda/environments/cylon_MacOS.yml index 8af49b18a..e1c2a797d 100644 --- a/conda/environments/cylon_MacOS.yml +++ b/conda/environments/cylon_MacOS.yml @@ -12,7 +12,7 @@ dependencies: - numpy>=1.16 - pandas>=1.0 - fsspec - - setuptools>=40.0,<60.0 + - setuptools>=40.0 # they are not needed for using pygcylon or compiling it - pytest - pytest-mpi diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d7337f0b4..f4ea961f1 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -155,8 +155,23 @@ message("Finding MPI") find_package(MPI REQUIRED COMPONENTS CXX) message(STATUS "MPI include dir: ${MPI_CXX_INCLUDE_PATH}") message(STATUS "MPI libs: ${MPI_CXX_LIBRARIES}") + +if (CYLON_CUSTOM_MPIRUN) + set(MPIEXEC_EXECUTABLE ${CYLON_CUSTOM_MPIRUN}) +endif (CYLON_CUSTOM_MPIRUN) message(STATUS "MPI executable: ${MPIEXEC_EXECUTABLE}") +if (CYLON_CUSTOM_MPIRUN_PARAMS) + set(MPIEXEC_EXECUTABLE_PARAMS ${CYLON_CUSTOM_MPIRUN_PARAMS}) +else () + if (WIN32) + set(MPIEXEC_EXECUTABLE_PARAMS "") + else () + set(MPIEXEC_EXECUTABLE_PARAMS --allow-run-as-root --oversubscribe) + endif () +endif (CYLON_CUSTOM_MPIRUN_PARAMS) +message(STATUS "MPI executable params: ${MPIEXEC_EXECUTABLE_PARAMS}") + include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH}) # Glog diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index 59602311d..332ea6f7d 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -102,6 +102,7 @@ add_library(cylon SHARED indexing/index_utils.hpp indexing/indexer.cpp indexing/indexer.hpp + indexing/slice.cpp io/arrow_io.cpp io/arrow_io.hpp io/csv_read_config.cpp diff --git a/cpp/src/cylon/compute/aggregate_kernels.hpp b/cpp/src/cylon/compute/aggregate_kernels.hpp index 0e9f4c58d..b676d10ac 100644 --- a/cpp/src/cylon/compute/aggregate_kernels.hpp +++ b/cpp/src/cylon/compute/aggregate_kernels.hpp @@ -19,6 +19,9 @@ #include #include #include +#include +#include + #include "cylon/util/macros.hpp" diff --git a/cpp/src/cylon/indexing/slice.cpp b/cpp/src/cylon/indexing/slice.cpp new file mode 100644 index 000000000..73ac1f08c --- /dev/null +++ b/cpp/src/cylon/indexing/slice.cpp @@ -0,0 +1,170 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include + +namespace cylon { + +static constexpr int64_t kZero = 0; + +/** + * Slice the part of table to create a single table + * @param table, offset and length + * @return new sliced table + */ +Status Slice(const std::shared_ptr &in, int64_t offset, int64_t length, + std::shared_ptr
*out) { + const auto &ctx = in->GetContext(); + const auto &in_table = in->get_table(); + + std::shared_ptr out_table; + if (!in->Empty()) { + out_table = in_table->Slice(offset, length); + } else { + out_table = in_table; + } + return Table::FromArrowTable(ctx, std::move(out_table), *out); +} + +/** + * DistributedSlice the part of table to create a single table + * @param table, global_offset and global_length + * @return new sliced table + */ +Status distributed_slice_impl(const std::shared_ptr
&in, + int64_t global_offset, + int64_t global_length, + int64_t *partition_lengths, + std::shared_ptr
*out) { + const auto &ctx = in->GetContext(); + std::shared_ptr partition_len_col; + + if (partition_lengths == nullptr) { + const auto &num_row_scalar = Scalar::Make(arrow::MakeScalar(in->Rows())); + RETURN_CYLON_STATUS_IF_FAILED(ctx->GetCommunicator() + ->Allgather(num_row_scalar, &partition_len_col)); + + partition_lengths = + const_cast(std::static_pointer_cast(partition_len_col->data()) + ->raw_values()); + } + + int64_t rank = ctx->GetRank(); + int64_t prefix_sum = std::accumulate(partition_lengths, partition_lengths + rank, kZero); + int64_t total_length = std::accumulate(partition_lengths + rank, + partition_lengths + ctx->GetWorldSize(), + prefix_sum); + if (global_offset > total_length) { + return {Code::Invalid, "global offset exceeds total length of the dist table"}; + } + // adjust global length if it exceeds total_length + if (global_offset + global_length > total_length) { + global_length = total_length - global_offset; + } + + int64_t this_length = *(partition_lengths + rank); + assert(this_length == in->Rows()); + + int64_t local_offset = std::max(kZero, std::min(global_offset - prefix_sum, this_length)); + int64_t local_length = + std::min(this_length, std::max(global_offset + global_length - prefix_sum, kZero)) + - local_offset; + + return Slice(in, local_offset, local_length, out); +} + +Status DistributedSlice(const std::shared_ptr
&in, + int64_t offset, + int64_t length, + std::shared_ptr
*out) { + return distributed_slice_impl(in, offset, length, nullptr, out); +} + +/** + * Head the part of table to create a single table with specific number of rows + * @param tables, number of rows + * @return new table + */ +Status Head(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr
*output) { + if (num_rows >= 0) { + return Slice(table, 0, num_rows, output); + } else + return {Code::Invalid, "Number of head rows should be >=0"}; +} + +Status DistributedHead(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr
*output) { + + std::shared_ptr in_table = table->get_table(); + + if (num_rows >= 0) { + return distributed_slice_impl(table, 0, num_rows, nullptr, output); + } else { + return {Code::Invalid, "Number of head rows should be >=0"}; + } +} + +/** + * Tail the part of table to create a single table with specific number of rows + * @param tables, number of rows + * @return new table + */ +Status Tail(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr
*output) { + + std::shared_ptr in_table = table->get_table(); + const int64_t table_size = in_table->num_rows(); + + if (num_rows >= 0) { + return Slice(table, table_size - num_rows, num_rows, output); + } else { + return {Code::Invalid, "Number of tailed rows should be >=0"}; + } +} + +Status DistributedTail(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr
*output) { + if (num_rows >= 0) { + const auto &ctx = table->GetContext(); + std::shared_ptr partition_len_col; + const auto &num_row_scalar = Scalar::Make(arrow::MakeScalar(table->Rows())); + RETURN_CYLON_STATUS_IF_FAILED(ctx->GetCommunicator() + ->Allgather(num_row_scalar, &partition_len_col)); + assert(ctx->GetWorldSize() == partition_len_col->length()); + auto *partition_lengths = + std::static_pointer_cast(partition_len_col->data()) + ->raw_values(); + + int64_t dist_length = + std::accumulate(partition_lengths, partition_lengths + ctx->GetWorldSize(), kZero); + + return distributed_slice_impl(table, + dist_length - num_rows, + num_rows, + const_cast (partition_lengths), + output); + } else { + return {Code::Invalid, "Number of tailed rows should be >=0"}; + } +} + +} \ No newline at end of file diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.cpp b/cpp/src/cylon/net/gloo/gloo_communicator.cpp index 7af55b28f..12d880398 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.cpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.cpp @@ -57,6 +57,9 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, } else { // MPI is not initialized. Ask gloo to initialize MPI gloo_ctx = gloo::mpi::Context::createManaged(); } + if (gloo_config->timeout_ != kTimoutNotSet) { + gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + } ((gloo::mpi::Context &) *gloo_ctx).connectFullMesh(dev); #else return {Code::Invalid, "Gloo does not contain mpi headers!"}; @@ -69,8 +72,10 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, gloo_ctx = std::make_shared(gloo_config->rank_, gloo_config->world_size_); + if (gloo_config->timeout_ != kTimoutNotSet) { + gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + } ((gloo::rendezvous::Context &) *gloo_ctx).connectFullMesh(*prefix_store, dev); - } *out = std::make_shared(pool, std::move(gloo_ctx)); return Status::OK(); @@ -193,5 +198,12 @@ int GlooConfig::world_size() const { return world_size_; } +void GlooConfig::SetTimeout(int timeout) { + GlooConfig::timeout_ = std::chrono::seconds(timeout); +} +const std::chrono::seconds &GlooConfig::timeout() const { + return GlooConfig::timeout_; +} + } } diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index 1f5b9a825..c7523e00a 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -33,6 +33,8 @@ namespace net { class GlooCommunicator; +static constexpr std::chrono::seconds kTimoutNotSet(0); + class GlooConfig : public CommConfig { public: explicit GlooConfig(int rank = 0, int world_size = 1, bool use_mpi = false); @@ -43,7 +45,9 @@ class GlooConfig : public CommConfig { int rank() const; int world_size() const; + const std::chrono::seconds &timeout() const; + void SetTimeout(int timeout); void SetTcpHostname(const std::string &tcp_hostname); void SetTcpIface(const std::string &tcp_iface); void SetTcpAiFamily(int tcp_ai_family); @@ -63,6 +67,7 @@ class GlooConfig : public CommConfig { int rank_; int world_size_; bool use_mpi_; + std::chrono::seconds timeout_ = kTimoutNotSet; #ifdef GLOO_USE_MPI /* @@ -74,7 +79,7 @@ class GlooConfig : public CommConfig { #endif //GLOO_USE_MPI // tcp attr - std::string tcp_hostname_ = "localhost"; + std::string tcp_hostname_; std::string tcp_iface_; int tcp_ai_family_ = AF_UNSPEC; diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.cpp b/cpp/src/cylon/net/mpi/mpi_communicator.cpp index ba2b5768e..5a723e346 100644 --- a/cpp/src/cylon/net/mpi/mpi_communicator.cpp +++ b/cpp/src/cylon/net/mpi/mpi_communicator.cpp @@ -91,12 +91,13 @@ Status MPICommunicator::Make(const std::shared_ptr &config, void MPICommunicator::Finalize() { // finalize only if we initialized MPI - if (!externally_init) { + if (!externally_init && !IsFinalized()) { int finalized; MPI_Finalized(&finalized); if (!finalized) { MPI_Finalize(); } + finalized = true; } } void MPICommunicator::Barrier() { diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 964d76efb..0e1771a83 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -27,6 +27,8 @@ namespace cylon { namespace net { +static constexpr int kBarrierFlag = UINT32_MAX; + void mpi_check_and_finalize() { int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -89,7 +91,8 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &column, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {} +UCXCommunicator::UCXCommunicator(MemoryPool *pool, bool externally_init) + : Communicator(pool, -1, -1), externally_init(externally_init) {} Status UCXCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -117,10 +120,16 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { CYLON_UNUSED(config); - *out = std::make_shared(pool); - auto &comm = static_cast(**out); - // Check init functions + // MPI init int initialized; + MPI_Initialized(&initialized); + if (!initialized) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); + } + + *out = std::make_shared(pool, initialized); + auto &comm = static_cast(**out); + // Int variable used when iterating int sIndx; // Address of the UCP Worker for receiving @@ -133,12 +142,6 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // Variable to hold the current ucp address ucp_address_t *address; - // MPI init - MPI_Initialized(&initialized); - if (!initialized) { - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); - } - // Get the rank for checking send to self, and initializations MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); @@ -205,7 +208,7 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo } void UCXCommunicator::Finalize() { - if (!this->IsFinalized()) { + if (!externally_init && !IsFinalized()) { ucp_cleanup(ucpContext); mpi_check_and_finalize(); finalized = true; @@ -329,14 +332,36 @@ void UCXUCCCommunicator::Finalize() { } } ucc_context_destroy(uccContext); - mpi_check_and_finalize(); - ucx_comm_->Finalize(); + ucx_comm_->Finalize(); // this will handle MPI_Finalize finalized = true; } } void UCXUCCCommunicator::Barrier() { - return ucx_comm_->Barrier(); + ucc_coll_args_t args_; + ucc_coll_req_h req; + + args_.mask = 0; + args_.coll_type = UCC_COLL_TYPE_BARRIER; + + ucc_status_t status; + status = ucc_collective_init(&args_, &req, uccTeam); + assert(status == UCC_OK); + + status = ucc_collective_post(req); + assert(status == UCC_OK); + + status = ucc_collective_test(req); + while (status > UCC_OK) { // loop until status == 0 + status = ucc_context_progress(uccContext); + assert(status >= UCC_OK); // should be 0, 1 or 2 + + status = ucc_collective_test(req); + } + assert(status == UCC_OK); + + status = ucc_collective_finalize(req); + assert(status == UCC_OK); } Status UCXUCCCommunicator::AllGather(const std::shared_ptr
&table, diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 57ba07578..74abb5200 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -29,15 +29,15 @@ namespace cylon { namespace net { class UCXConfig : public CommConfig { + public: CommType Type() override; - public: static std::shared_ptr Make(); }; class UCXCommunicator : public Communicator { public: - explicit UCXCommunicator(MemoryPool *pool); + explicit UCXCommunicator(MemoryPool *pool, bool externally_init); ~UCXCommunicator() override = default; std::unique_ptr CreateChannel() const override; @@ -78,6 +78,7 @@ class UCXCommunicator : public Communicator { std::unordered_map endPointMap; // UCP Context - Holds a UCP communication instance's global information. ucp_context_h ucpContext{}; + bool externally_init = false; }; #ifdef BUILD_CYLON_UCC diff --git a/cpp/src/cylon/net/utils.cpp b/cpp/src/cylon/net/utils.cpp index 91b6134f2..63e34600a 100644 --- a/cpp/src/cylon/net/utils.cpp +++ b/cpp/src/cylon/net/utils.cpp @@ -12,6 +12,7 @@ * limitations under the License. */ +#include #include "utils.hpp" diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index 07964dfc8..507f72f45 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -277,7 +278,11 @@ static inline Status shuffle_two_tables_by_hashing(const std::shared_ptr &ctx, const std::string &path, std::shared_ptr
&tableOut, const cylon::io::config::CSVReadOptions &options) { arrow::Result> result = cylon::io::read_csv(ctx, path, options); + + LOG(INFO) << "Reading Inside FromCSV"; + if (result.ok()) { + LOG(INFO) << "CSV file reading is OK"; std::shared_ptr &table = result.ValueOrDie(); if (table->column(0)->chunks().size() > 1) { const auto &combine_res = table->CombineChunks(ToArrowPool(ctx)); @@ -289,6 +294,7 @@ Status FromCSV(const std::shared_ptr &ctx, const std::string &path } // slice the table if required if (options.IsSlice() && ctx->GetWorldSize() > 1) { + LOG(INFO) << "Slice the table if required"; int32_t rows_per_worker = table->num_rows() / ctx->GetWorldSize(); int32_t remainder = table->num_rows() % ctx->GetWorldSize(); @@ -379,12 +385,14 @@ Status Merge(const std::vector> &ctables, std::vector> tables; tables.reserve(ctables.size()); for (const auto &t: ctables) { - if (t->Rows()) { - std::shared_ptr arrow; - t->ToArrowTable(arrow); - tables.push_back(std::move(arrow)); + if (!t->Empty()) { + tables.push_back(t->get_table()); } } + if (tables.empty()) { // means all tables are empty. return a cylon table from 0th table + tableOut = ctables[0]; + return Status::OK(); + } const auto &ctx = ctables[0]->GetContext(); const auto &concat_res = arrow::ConcatenateTables(tables); @@ -504,9 +512,8 @@ Status MergeSortedTable(const std::vector> &tables, const std::vector &sort_orders, std::shared_ptr
&out) { std::shared_ptr
concatenated; - std::vector table_indices(tables.size()), - table_end_indices(tables.size()); - int acc = 0; + std::vector table_indices(tables.size()), table_end_indices(tables.size()); + int64_t acc = 0; for (size_t i = 0; i < table_indices.size(); i++) { table_indices[i] = acc; acc += tables[i]->Rows(); @@ -515,7 +522,7 @@ Status MergeSortedTable(const std::vector> &tables, RETURN_CYLON_STATUS_IF_FAILED(Merge(tables, concatenated)); - if(concatenated->GetContext()->GetWorldSize() > 4) { + if (concatenated->GetContext()->GetWorldSize() > 4) { return Sort(concatenated, sort_columns, out, sort_orders); } @@ -530,7 +537,7 @@ Status MergeSortedTable(const std::vector> &tables, std::priority_queue, decltype(comp)> pq(comp); - for (size_t i = 0; i < tables.size(); i++) { + for (int i = 0; i < (int) tables.size(); i++) { if (table_indices[i] < table_end_indices[i]) { pq.push(i); } @@ -541,13 +548,9 @@ Status MergeSortedTable(const std::vector> &tables, arrow::Int64Builder filter(pool); RETURN_CYLON_STATUS_IF_ARROW_FAILED(filter.Reserve(concatenated->Rows())); - std::vector temp_v; - while (!pq.empty()) { int t = pq.top(); pq.pop(); - // std::cout<> &tables, } CYLON_ASSIGN_OR_RAISE(auto take_arr, filter.Finish()); - CYLON_ASSIGN_OR_RAISE( - auto take_res, - (arrow::compute::Take(concatenated->get_table(), take_arr))); + CYLON_ASSIGN_OR_RAISE(auto take_res, + (arrow::compute::Take(concatenated->get_table(), take_arr))); out = std::make_shared
(ctx, take_res.table()); @@ -1478,6 +1480,7 @@ static Status RepartitionToMatchOtherTable(const std::shared_ptr & std::vector rows_per_partition; std::shared_ptr output; + RETURN_CYLON_STATUS_IF_FAILED( a->GetContext()->GetCommunicator()->Allgather(num_row_scalar, &output)); auto *data_ptr = @@ -1551,6 +1554,7 @@ Status Repartition(const std::shared_ptr &table, auto num_row_scalar = std::make_shared(arrow::MakeScalar(num_row)); + RETURN_CYLON_STATUS_IF_FAILED( table->GetContext()->GetCommunicator()->Allgather(num_row_scalar, &sizes_cols)); @@ -1756,5 +1760,7 @@ Status WriteParquet(const std::shared_ptr &ctx_, return Status(Code::OK); } + + #endif } // namespace cylon diff --git a/cpp/src/cylon/table.hpp b/cpp/src/cylon/table.hpp index 4463649dc..cfd67113f 100644 --- a/cpp/src/cylon/table.hpp +++ b/cpp/src/cylon/table.hpp @@ -481,6 +481,48 @@ Status WriteParquet(const std::shared_ptr &ctx, std::shared_ptr &table, const std::string &path, const io::config::ParquetOptions &options = cylon::io::config::ParquetOptions()); +/** + * Slice the part of table to create a single table + * @param tables, offset, length + * @return new sliced table + */ +Status Slice(const std::shared_ptr
&in, int64_t offset, int64_t length, + std::shared_ptr *out); + +/** + * DistributedSlice the part of table to create a single table + * @param table, offset and length + * @return new sliced table + */ + + +Status DistributedSlice(const std::shared_ptr
&in, int64_t offset, int64_t length, + std::shared_ptr *out); + + +/** + * Head the part of table to create a single table with specific number of rows + * @param tables, number of rows + * @return new table + */ + +Status Head(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr *output); +Status DistributedHead(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr *output); + +/** + * Tail the part of table to create a single table with specific number of rows + * @param tables, number of rows + * @return new table + */ + +Status Tail(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr *output); + +Status DistributedTail(const std::shared_ptr
&table, int64_t num_rows, + std::shared_ptr *output); + #endif // BUILD_CYLON_PARQUET } // namespace cylon diff --git a/cpp/src/cylon/thridparty/flat_hash_map/bytell_hash_map.hpp b/cpp/src/cylon/thridparty/flat_hash_map/bytell_hash_map.hpp index 3a31c726d..2cd3b1931 100644 --- a/cpp/src/cylon/thridparty/flat_hash_map/bytell_hash_map.hpp +++ b/cpp/src/cylon/thridparty/flat_hash_map/bytell_hash_map.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include diff --git a/cpp/src/examples/CMakeLists.txt b/cpp/src/examples/CMakeLists.txt index cd9042a14..510e3a1c8 100644 --- a/cpp/src/examples/CMakeLists.txt +++ b/cpp/src/examples/CMakeLists.txt @@ -75,6 +75,7 @@ cylon_add_exe(multi_idx_join_example) cylon_add_exe(parquet_union_example) cylon_add_exe(parquet_join_example) cylon_add_exe(dist_sort_example) +cylon_add_exe(slice_example) if (CYLON_UCX) cylon_add_exe(ucx_join_example) diff --git a/cpp/src/examples/join_example.cpp b/cpp/src/examples/join_example.cpp index 91a708fa9..4130af99a 100644 --- a/cpp/src/examples/join_example.cpp +++ b/cpp/src/examples/join_example.cpp @@ -71,6 +71,7 @@ int main(int argc, char *argv[]) { } else if (mem == "f") { cylon::FromCSV(ctx, std::string(argv[3]) + std::to_string(ctx->GetRank()) + ".csv", first_table); cylon::FromCSV(ctx, std::string(argv[4]) + std::to_string(ctx->GetRank()) + ".csv", second_table); + if (argc == 6) { if (!strcmp(argv[5], "hash")) { LOG(INFO) << "Hash join algorithm"; @@ -85,6 +86,7 @@ int main(int argc, char *argv[]) { LOG(INFO) << "Read tables in " << std::chrono::duration_cast( read_end_time - start_start).count() << "[ms]"; + auto join_config = cylon::join::config::JoinConfig(cylon::join::config::JoinType::INNER, 0, 0, @@ -109,6 +111,7 @@ int main(int argc, char *argv[]) { << std::chrono::duration_cast( join_end_time - read_end_time).count() << "[ms]"; std::vector column_names = joined->ColumnNames(); + ctx->Finalize(); return 0; } diff --git a/cpp/src/examples/slice_example.cpp b/cpp/src/examples/slice_example.cpp new file mode 100644 index 000000000..df2a254b3 --- /dev/null +++ b/cpp/src/examples/slice_example.cpp @@ -0,0 +1,162 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include + +#include "example_utils.hpp" + + +int main(int argc, char *argv[]) { + + if ((argc < 6 && std::string(argv[1]) == "f")) { + LOG(ERROR) << "./slice_example f [n | o] csv_file offset length" << std::endl + << "./slice_example f [n | o] csv_file offset length" << std::endl; + return 1; + } + + if ((argc < 7 && std::string(argv[1]) == "m")) { + LOG(ERROR) << "./slice_example m [n | o] num_tuples_per_worker 0.0-1.0 offset length" << std::endl + << "./slice_example m [n | o] num_tuples_per_worker 0.0-1.0 offset length" << std::endl; + return 1; + } + LOG(INFO) << "Starting main() function"; + auto start_start = std::chrono::steady_clock::now(); + auto mpi_config = std::make_shared(); + + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()) { + std::cerr << "ctx init failed! " << std::endl; + return 1; + } + + + std::shared_ptr in_table, joined, sliced, head_table, tail_table; + auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); + cylon::join::config::JoinAlgorithm algorithm = cylon::join::config::JoinAlgorithm::SORT; + + std::string mem = std::string(argv[1]); + std::string ops_param = std::string(argv[2]); + int64_t offset = 0, length = 0; + + bool ops = true; + if (ops_param == "o") { + ops = true; + } else if (ops_param == "n") { + ops = false; + } + + if (mem == "m") { + uint64_t count = std::stoull(argv[3]); + double dup = std::stod(argv[4]); + cylon::examples::create_in_memory_tables(count, dup,ctx,in_table); + offset = std::stoull(argv[5]); + length = std::stoull(argv[6]); + LOG(INFO) << "Load From in-memory size: " << std::string(argv[3]); + } else if (mem == "f") { + LOG(INFO) << "Load From the CSV file: " << std::string(argv[3]); + cylon::FromCSV(ctx, std::string(argv[3]) , in_table); + + //cylon::FromCSV(ctx, std::string(argv[3]) + std::to_string(ctx->GetRank()) + ".csv", first_table); + //cylon::FromCSV(ctx, std::string(argv[4]) + std::to_string(ctx->GetRank()) + ".csv", second_table); + + offset = std::stoull(argv[4]); + length = std::stoull(argv[5]); + } + ctx->Barrier(); + auto read_end_time = std::chrono::steady_clock::now(); + //in_table->Print(); + LOG(INFO) << "Read tables in " + << std::chrono::duration_cast( + read_end_time - start_start).count() << "[ms]"; + + auto join_config = cylon::join::config::JoinConfig(cylon::join::config::JoinType::INNER, + 0, + 0, + algorithm, + "l_", + "r_"); + cylon::Status status; + + // Code block for slice operation + if (ops) { + status = cylon::Slice(in_table, offset, length, &sliced); + } else { + status = cylon::DistributedSlice(in_table, offset, length, &sliced); + } + if (!status.is_ok()) { + LOG(INFO) << "Table Slice is failed "; + ctx->Finalize(); + return 1; + } + auto slice_end_time = std::chrono::steady_clock::now(); + LOG(INFO) << ctx->GetRank() << " Partition with " << "Sliced table has : " << sliced->Rows(); + LOG(INFO) << "Sliced is done in " + << std::chrono::duration_cast( + slice_end_time - read_end_time).count() << "[ms]"; + std::vector sliced_column_names = sliced->ColumnNames(); + + sliced->Print(); + + int64_t num_rows = 30; + //Code block for head operation + + if (ops) { + status = cylon::Head(in_table, num_rows, &head_table); + } else { + status = cylon::DistributedHead(in_table, num_rows, &head_table); + } + if (!status.is_ok()) { + LOG(INFO) << "Table Head is failed "; + ctx->Finalize(); + return 1; + } + slice_end_time = std::chrono::steady_clock::now(); + LOG(INFO) << ctx->GetRank() << " Partition with " << "Head table has : " << head_table->Rows(); + LOG(INFO) << "Head is done in " + << std::chrono::duration_cast( + slice_end_time - read_end_time).count() << "[ms]"; + sliced_column_names = head_table->ColumnNames(); + + head_table->Print(); + + //Code block for tail operation + + if (ops) { + status = cylon::Tail(in_table, num_rows, &tail_table); + } else { + status = cylon::DistributedTail(in_table, num_rows, &tail_table); + } + if (!status.is_ok()) { + LOG(INFO) << "Table Tail is failed "; + ctx->Finalize(); + return 1; + } + slice_end_time = std::chrono::steady_clock::now(); + LOG(INFO) << ctx->GetRank() << " Partition with " << "Tail table has : " << tail_table->Rows(); + LOG(INFO) << "Tail is done in " + << std::chrono::duration_cast( + slice_end_time - read_end_time).count() << "[ms]"; + sliced_column_names = tail_table->ColumnNames(); + + tail_table->Print(); + + ctx->Finalize(); + return 0; +} diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index b709ec1c8..8d0fd819b 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -73,11 +73,11 @@ endfunction(cylon_add_test) function(cylon_run_test TESTNAME parallelism comm_args) set(exec_name "${TESTNAME}_${parallelism}_${comm_args}") if (WIN32) - set(test_params -np ${parallelism} "${CMAKE_BINARY_DIR}/bin/${CMAKE_BUILD_TYPE}/${TESTNAME}" --comm "${comm_args}") + set(test_params -n ${parallelism} "${CMAKE_BINARY_DIR}/bin/${CMAKE_BUILD_TYPE}/${TESTNAME}" --comm "${comm_args}") else () - set(test_params --allow-run-as-root --oversubscribe -np ${parallelism} "${CMAKE_BINARY_DIR}/bin/${TESTNAME}" --comm "${comm_args}") + set(test_params -n ${parallelism} "${CMAKE_BINARY_DIR}/bin/${TESTNAME}" --comm "${comm_args}") endif () - add_test(NAME ${exec_name} COMMAND ${MPIEXEC_EXECUTABLE} ${test_params}) + add_test(NAME ${exec_name} COMMAND ${MPIEXEC_EXECUTABLE} ${MPIEXEC_EXECUTABLE_PARAMS} ${test_params}) endfunction(cylon_run_test) #Add tests as follows ... @@ -154,6 +154,12 @@ cylon_add_test(equal_test) cylon_run_test(equal_test 1 mpi) cylon_run_test(equal_test 2 mpi) +# slice test +cylon_add_test(slice_test) +cylon_run_test(slice_test 1 mpi) +cylon_run_test(slice_test 2 mpi) +cylon_run_test(slice_test 4 mpi) + # flatten array test cylon_add_test(flatten_array_test) cylon_run_test(flatten_array_test 1 mpi) @@ -238,5 +244,8 @@ if (CYLON_UCX) cylon_run_test(aggregate_test 1 ucx) cylon_run_test(aggregate_test 2 ucx) cylon_run_test(aggregate_test 4 ucx) + + cylon_run_test(sync_comms_test 1 ucx) + cylon_run_test(sync_comms_test 4 ucx) endif (CYLON_UCC) endif (CYLON_UCX) \ No newline at end of file diff --git a/cpp/test/slice_test.cpp b/cpp/test/slice_test.cpp new file mode 100644 index 000000000..27c79ba7a --- /dev/null +++ b/cpp/test/slice_test.cpp @@ -0,0 +1,201 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "common/test_header.hpp" +#include "test_utils.hpp" +#include + +namespace cylon { + + +void testDistSlice(std::shared_ptr
& global_table, + std::shared_ptr
& table, + int64_t offset, + int64_t length) { + + std::shared_ptr
out; + auto ctx = table->GetContext(); + std::shared_ptr arrow_output; + + CHECK_CYLON_STATUS(DistributedSlice(table, offset, length, &out)); + INFO(RANK << "slice res:" << out->Rows()); + + std::vector> gathered; + CHECK_CYLON_STATUS(ctx->GetCommunicator()->Gather(out, /*root*/0, /*gather_from_root*/true, + &gathered)); + if (RANK == 0) { + std::shared_ptr
result, global_out; + CHECK_CYLON_STATUS(Slice(global_table, offset, length, &global_out)); + + for (size_t i = 0; i < gathered.size(); i++) { + INFO("gathered " << i << ":" << gathered[i]->Rows()); + } + REQUIRE(WORLD_SZ == (int) gathered.size()); + + CHECK_CYLON_STATUS(Merge(gathered, result)); + CHECK_ARROW_EQUAL(global_out->get_table(), result->get_table()); + } +} + +namespace test { + +TEST_CASE("Dist Slice testing", "[dist slice]") { + auto schema = arrow::schema({{arrow::field("a", arrow::int64())}, + {arrow::field("b", arrow::float32())}}); + auto global_arrow_table = TableFromJSON(schema, {R"([{"a": 3, "b":0.025}, + {"a": 26, "b":0.394}, + {"a": 51, "b":0.755}, + {"a": 20, "b":0.030}, + {"a": 33, "b":0.318}, + {"a": 12, "b":0.813}, + {"a": 72, "b":0.968}, + {"a": 29, "b":0.291}, + {"a": 41, "b":0.519}, + {"a": 29, "b":0.291}, + {"a": 41, "b":0.519}, + {"a": 43, "b":0.419}, + {"a": 57, "b":0.153}, + {"a": 25, "b":0.479}, + {"a": 26, "b":0.676}, + {"a": 70, "b":0.504}, + {"a": 7, "b":0.232}, + {"a": 45, "b":0.734}, + {"a": 61, "b":0.685}, + {"a": 57, "b":0.314}, + {"a": 59, "b": 0.837}, + {"a": 67, "b": 0.086}, + {"a": 14, "b": 0.193}, + {"a": 21, "b": 0.853}, + {"a": 10, "b": 0.808}, + {"a": 13, "b": 0.085}, + {"a": 31, "b": 0.122}, + {"a": 20, "b": 0.689}, + {"a": 37, "b": 0.491}, + {"a": 62, "b": 0.262}, + {"a": 1 , "b": 0.868}, + {"a": 19, "b": 0.422}, + {"a": 64, "b": 0.528}, + {"a": 37, "b": 0.834}, + {"a": 33, "b": 0.010}, + {"a": 76, "b": 0.927}, + {"a": 4 , "b": 0.529}, + {"a": 13, "b": 0.201}, + {"a": 45, "b": 0.898}, + {"a": 67, "b": 0.407}])"}); + + int64_t rows_per_tab = global_arrow_table->num_rows() / WORLD_SZ; + std::shared_ptr
table1; + CHECK_CYLON_STATUS(Table::FromArrowTable( + ctx, global_arrow_table->Slice(RANK * rows_per_tab, rows_per_tab), + table1)); + std::shared_ptr
global_table; + + CHECK_CYLON_STATUS( + Table::FromArrowTable(ctx, global_arrow_table, global_table)); + + SECTION("dist_slice_test_1_single_table") { + testDistSlice(global_table, table1, 2, 5); + } + + SECTION("dist_slice_test_2_multiple_table") { + testDistSlice(global_table, table1, 1, 15); + } + + SECTION("dist_sort_test_3_skipped_two_table") { + testDistSlice(global_table, table1, 15, 8); + } + + SECTION("dist_slice_test_4_multiple_table") { + testDistSlice(global_table, table1, 1, 0); + } + + SECTION("dist_slice_test_5_multiple_table") { + testDistSlice(global_table, table1, 0, 0); + } + + SECTION("dist_slice_test_5_multiple_table") { + testDistSlice(global_table, table1, global_table->Rows(), INT64_MAX); + } + + SECTION("dist_slice_test_5_multiple_table") { + testDistSlice(global_table, table1, global_table->Rows() - 1, 1000); + } + + SECTION("failing case") { + REQUIRE_FALSE(DistributedSlice(table1, 1000, 1000, nullptr).is_ok()); + } + + SECTION("dist_sort_test_5_one_empty_table") { + auto pool = cylon::ToArrowPool(ctx); + + std::shared_ptr arrow_empty_table; + auto arrow_status = util::CreateEmptyTable(table1->get_table()->schema(), + &arrow_empty_table, pool); + REQUIRE(arrow_status.ok()); + auto empty_table = std::make_shared
(ctx, std::move(arrow_empty_table)); + + std::shared_ptr
out1, out2; + std::shared_ptr arrow_output; + CHECK_CYLON_STATUS(DistributedSlice(empty_table, 0, 10, &out1)); + CHECK_CYLON_STATUS(DistributedSlice(empty_table, 0, 5, &out2)); + CHECK_ARROW_EQUAL(out1->get_table(), out2->get_table()); + } + +} + +TEST_CASE("Slice testing", "[equal]") { + std::string path1 = "../data/input/csv1_0.csv"; + std::string path2 = "../data/input/csv1_1.csv"; + std::string path3 = "../data/input/csv1_0_shuffled.csv"; + std::string path4 = "../data/input/csv1_0_col_order_change.csv"; + std::shared_ptr
table1, table2, table3, table4, out; + + auto read_options = io::config::CSVReadOptions().UseThreads(false); + + CHECK_CYLON_STATUS(FromCSV(ctx, std::vector{path1, path2, path3, path4}, + std::vector *>{&table1, &table2, &table3, &table4}, + read_options)); + + SECTION("Testing Local Slice") { + + CHECK_CYLON_STATUS(Slice(table1, 13, 8, &out)); + + CHECK_CYLON_STATUS(Slice(table2, 15, 5, &out)); + + CHECK_CYLON_STATUS(Slice(table3, 0, 10, &out)); + + CHECK_CYLON_STATUS(Slice(table4, 2, 15, &out)); + } +} + +TEST_CASE("Distributed Slice testing", "[distributed slice]") { + std::string path1 = "../data/input/csv1_" + std::to_string(RANK) +".csv"; + std::string path2 = "../data/input/csv2_" + std::to_string(RANK) +".csv"; + std::shared_ptr
table1, table2, out; + + auto read_options = io::config::CSVReadOptions().UseThreads(false); + + CHECK_CYLON_STATUS(FromCSV(ctx, std::vector{path1, path2}, + std::vector *>{&table1, &table2}, + read_options)); + + SECTION("Testing Distributed Slice") { + CHECK_CYLON_STATUS(DistributedSlice(table1, 10, 15, &out)); + + CHECK_CYLON_STATUS(DistributedSlice(table2, 12, 8, &out)); + } +} + +} +} \ No newline at end of file diff --git a/cpp/test/sync_comms_test.cpp b/cpp/test/sync_comms_test.cpp index 76911e1ca..62d810cb6 100644 --- a/cpp/test/sync_comms_test.cpp +++ b/cpp/test/sync_comms_test.cpp @@ -12,12 +12,22 @@ * limitations under the License. */ #include +#include +#include #include "common/test_header.hpp" namespace cylon { namespace test { +TEST_CASE("barrier", "[sync comms]") { + srand((unsigned) time(nullptr)); + int i = (rand() % 2000) + 1; + + std::this_thread::sleep_for(std::chrono::duration(i)); + ctx->Barrier(); +} + enum GenType { Empty, Null, NonEmpty }; void generate_table(std::shared_ptr *schema, @@ -113,6 +123,11 @@ TEST_CASE("all gather table", "[sync comms]") { } TEST_CASE("gather table", "[sync comms]") { + // todo: UCC doesnt support gatherv for the moment #599 + if (ctx->GetCommType() == net::UCX){ + return; + } + std::shared_ptr schema; std::shared_ptr in_table; generate_table(&schema, &in_table); diff --git a/python/pycylon/pycylon/api/lib.pxd b/python/pycylon/pycylon/api/lib.pxd index ad6368bea..96cad6ca3 100644 --- a/python/pycylon/pycylon/api/lib.pxd +++ b/python/pycylon/pycylon/api/lib.pxd @@ -25,6 +25,8 @@ from pycylon.net.comm_config cimport CCommConfig from pycylon.net.mpi_config cimport CMPIConfig IF CYTHON_GLOO: from pycylon.net.gloo_config cimport CGlooConfig +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.ucx_config cimport CUCXConfig from pycylon.io.csv_read_config cimport CCSVReadOptions from pycylon.io.csv_read_config import CSVReadOptions from pycylon.io.csv_read_config cimport CSVReadOptions @@ -59,6 +61,9 @@ cdef api shared_ptr[CMPIConfig] pycylon_unwrap_mpi_config(object config) IF CYTHON_GLOO: cdef api shared_ptr[CGlooConfig] pycylon_unwrap_gloo_config(object config) +IF CYTHON_UCX & CYTHON_UCC: + cdef api shared_ptr[CUCXConfig] pycylon_unwrap_ucx_config(object config) + cdef api shared_ptr[CTable] pycylon_unwrap_table(object table) cdef api shared_ptr[CDataType] pycylon_unwrap_data_type(object data_type) @@ -67,7 +72,7 @@ cdef api CCSVReadOptions pycylon_unwrap_csv_read_options(object csv_read_options cdef api CCSVWriteOptions pycylon_unwrap_csv_write_options(object csv_write_options) -cdef api CSortOptions * pycylon_unwrap_sort_options(object sort_options) +cdef api shared_ptr[CSortOptions] pycylon_unwrap_sort_options(object sort_options) cdef api shared_ptr[CBaseArrowIndex] pycylon_unwrap_base_arrow_index(object base_arrow_index) @@ -87,7 +92,7 @@ cdef api object pycylon_wrap_layout(const CLayout & layout) cdef api object pycylon_wrap_data_type(const shared_ptr[CDataType] & data_type) -cdef api object pycylon_wrap_sort_options(CSortOptions *sort_options) +cdef api object pycylon_wrap_sort_options(const shared_ptr[CSortOptions] &sort_options) cdef api object pycylon_wrap_base_arrow_index(const shared_ptr[CBaseArrowIndex] & base_arrow_index) diff --git a/python/pycylon/pycylon/api/lib.pyx b/python/pycylon/pycylon/api/lib.pyx index a1130f1d3..67bec2700 100644 --- a/python/pycylon/pycylon/api/lib.pyx +++ b/python/pycylon/pycylon/api/lib.pyx @@ -29,6 +29,9 @@ from pycylon.net.mpi_config cimport MPIConfig IF CYTHON_GLOO: from pycylon.net.gloo_config import GlooMPIConfig, GlooStandaloneConfig from pycylon.net.gloo_config cimport CGlooConfig, GlooMPIConfig, GlooStandaloneConfig +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.ucx_config import UCXConfig + from pycylon.net.ucx_config cimport CUCXConfig, UCXConfig from pycylon.io.csv_read_config cimport CCSVReadOptions from pycylon.io.csv_read_config import CSVReadOptions from pycylon.io.csv_read_config cimport CSVReadOptions @@ -121,6 +124,13 @@ IF CYTHON_GLOO: else: raise ValueError('Passed object is not an instance of GlooConfig') +IF CYTHON_UCX & CYTHON_UCC: + cdef api shared_ptr[CUCXConfig] pycylon_unwrap_ucx_config(object config): + if isinstance(config, UCXConfig): + return ( config).ucx_config_shd_ptr + else: + raise ValueError('Passed object is not an instance of UcxConfig') + cdef api CCSVReadOptions pycylon_unwrap_csv_read_options(object csv_read_options): cdef CSVReadOptions csvrdopt if pyclon_is_csv_read_options(csv_read_options): @@ -145,13 +155,13 @@ cdef api shared_ptr[CDataType] pycylon_unwrap_data_type(object data_type): else: raise ValueError('Passed object is not an instance of DataType') -cdef api CSortOptions * pycylon_unwrap_sort_options(object sort_options): +cdef api shared_ptr[CSortOptions] pycylon_unwrap_sort_options(object sort_options): cdef SortOptions so if pyclon_is_sort_options(sort_options): so = sort_options return so.thisPtr else: - raise ValueError('Passed object is not an instance of DataType') + raise ValueError('Passed object is not an instance of SortOptions') cdef api shared_ptr[CBaseArrowIndex] pycylon_unwrap_base_arrow_index(object base_arrow_index): cdef BaseArrowIndex bi @@ -196,7 +206,7 @@ cdef api object pycylon_wrap_data_type(const shared_ptr[CDataType] & cdata_type) data_type.init(cdata_type) return data_type -cdef api object pycylon_wrap_sort_options(CSortOptions *csort_options): +cdef api object pycylon_wrap_sort_options(const shared_ptr[CSortOptions] &csort_options): cdef SortOptions sort_options = SortOptions.__new__(SortOptions) sort_options.init(csort_options) return sort_options diff --git a/python/pycylon/pycylon/ctx/context.pyx b/python/pycylon/pycylon/ctx/context.pyx index f3e9b71c6..c993947ee 100644 --- a/python/pycylon/pycylon/ctx/context.pyx +++ b/python/pycylon/pycylon/ctx/context.pyx @@ -20,6 +20,9 @@ from pycylon.ctx.context cimport CCylonContext from pycylon.api.lib cimport pycylon_unwrap_mpi_config IF CYTHON_GLOO: from pycylon.api.lib cimport pycylon_unwrap_gloo_config + +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.api.lib cimport pycylon_unwrap_ucx_config from pycylon.net import CommType from pycylon.net.mpi_config cimport CMPIConfig from pycylon.net.mpi_config import MPIConfig @@ -82,6 +85,10 @@ cdef class CylonContext: if config.comm_type == CommType.GLOO: return pycylon_unwrap_gloo_config(config) + IF CYTHON_UCX & CYTHON_UCC: + if config.comm_type == CommType.UCX: + return pycylon_unwrap_ucx_config(config) + raise ValueError(f"Unsupported distributed comm config {config}") def get_rank(self) -> int: diff --git a/python/pycylon/pycylon/data/table.pxd b/python/pycylon/pycylon/data/table.pxd index 228fae890..084454edf 100644 --- a/python/pycylon/pycylon/data/table.pxd +++ b/python/pycylon/pycylon/data/table.pxd @@ -127,16 +127,22 @@ cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": cdef cppclass CSortOptions "cylon::SortOptions": + enum CSortMethod: + CREGULAR_SAMPLE 'cylon::SortOptions::SortMethod::REGULAR_SAMPLE', + CINITIAL_SAMPLE 'cylon::SortOptions::SortMethod::INITIAL_SAMPLE' + int num_bins long num_samples + CSortMethod sort_method + @ staticmethod CSortOptions Defaults() cdef class SortOptions: cdef: - CSortOptions *thisPtr - void init(self, CSortOptions *csort_options) + shared_ptr[CSortOptions] thisPtr + void init(self, const shared_ptr[CSortOptions] &csort_options) cdef class Table: cdef: diff --git a/python/pycylon/pycylon/data/table.pyx b/python/pycylon/pycylon/data/table.pyx index 356baf83e..9144ebbcc 100644 --- a/python/pycylon/pycylon/data/table.pyx +++ b/python/pycylon/pycylon/data/table.pyx @@ -471,7 +471,7 @@ cdef class Table: """ cdef shared_ptr[CTable] output - cdef CSortOptions *csort_options + cdef shared_ptr[CSortOptions] csort_options cdef vector[int] sort_index cdef vector[cpp_bool] order_directions @@ -507,7 +507,7 @@ cdef class Table: else: csort_options = pycylon_unwrap_sort_options(SortOptions(0, 0)) cdef CStatus status = DistributedSort(self.table_shd_ptr, sort_index, output, - order_directions, csort_options[0]) + order_directions, csort_options.get()[0]) if status.is_ok(): return pycylon_wrap_table(output) else: @@ -2728,7 +2728,7 @@ cdef class SortOptions: """ Sort Operations for Distributed Sort """ - def __cinit__(self, num_bins: int = 0, num_samples: int = 0): + def __cinit__(self, num_bins: int = 0, num_samples: int = 0, sampling: str = 'regular'): ''' Initializes the CSortOptions struct Args: @@ -2738,9 +2738,17 @@ cdef class SortOptions: Returns: None ''' - self.thisPtr = new CSortOptions() - self.thisPtr.num_bins = num_bins - self.thisPtr.num_samples = num_samples + self.thisPtr = make_shared[CSortOptions]() + self.thisPtr.get().num_bins = num_bins + self.thisPtr.get().num_samples = num_samples - cdef void init(self, CSortOptions *csort_options): + sampling = sampling.lower() + if sampling == 'regular': + self.thisPtr.get().sort_method = CSortOptions.CSortMethod.CREGULAR_SAMPLE + elif sampling == 'initial': + self.thisPtr.get().sort_method = CSortOptions.CSortMethod.CINITIAL_SAMPLE + else: + raise ValueError(f'unknown sampling method for sorting: {sampling}') + + cdef void init(self, const shared_ptr[CSortOptions] &csort_options): self.thisPtr = csort_options diff --git a/python/pycylon/pycylon/net/gloo_config.pxd b/python/pycylon/pycylon/net/gloo_config.pxd index dbec281cf..f24b68ae8 100644 --- a/python/pycylon/pycylon/net/gloo_config.pxd +++ b/python/pycylon/pycylon/net/gloo_config.pxd @@ -31,6 +31,7 @@ IF CYTHON_GLOO: void SetTcpAiFamily(int tcp_ai_family) void SetFileStorePath(const string & file_store_path) void SetStorePrefix(const string & store_prefix) + void SetTimeout(int timeout) @staticmethod shared_ptr[CGlooConfig] MakeWithMpi(MPI_Comm comm); diff --git a/python/pycylon/pycylon/net/gloo_config.pyx b/python/pycylon/pycylon/net/gloo_config.pyx index 361fca365..8a7534fe4 100644 --- a/python/pycylon/pycylon/net/gloo_config.pyx +++ b/python/pycylon/pycylon/net/gloo_config.pyx @@ -38,6 +38,18 @@ IF CYTHON_GLOO: def comm_type(self): return self.gloo_config_shd_ptr.get().Type() + def set_tcp_hostname(self, hostname: str): + self.gloo_config_shd_ptr.get().SetTcpHostname(hostname.encode()) + + def set_tcp_iface(self, iface: str): + self.gloo_config_shd_ptr.get().SetTcpIface(iface.encode()) + + def set_tcp_ai_family(self, ai_family: int): + self.gloo_config_shd_ptr.get().SetTcpAiFamily(ai_family) + + def set_timeout(self, timeout: int): + self.gloo_config_shd_ptr.get().SetTimeout(timeout) + cdef class GlooStandaloneConfig(CommConfig): """ GlooConfig Type mapping from libCylon to PyCylon @@ -73,3 +85,6 @@ IF CYTHON_GLOO: def set_store_prefix(self, prefix: str): self.gloo_config_shd_ptr.get().SetStorePrefix(prefix.encode()) + + def set_timeout(self, timeout: int): + self.gloo_config_shd_ptr.get().SetTimeout(timeout) \ No newline at end of file diff --git a/python/pycylon/pycylon/net/ucx_config.pxd b/python/pycylon/pycylon/net/ucx_config.pxd new file mode 100644 index 000000000..a6f1b36c8 --- /dev/null +++ b/python/pycylon/pycylon/net/ucx_config.pxd @@ -0,0 +1,31 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_UCX & CYTHON_UCC: + from libcpp.memory cimport shared_ptr + + from pycylon.net.comm_type cimport CCommType + from pycylon.net.comm_config cimport CommConfig + + cdef extern from "../../../../cpp/src/cylon/net/ucx/ucx_communicator.hpp" namespace "cylon::net": + cdef cppclass CUCXConfig "cylon::net::UCXConfig": + CCommType Type() + + @ staticmethod + shared_ptr[CUCXConfig] Make(); + + + cdef class UCXConfig(CommConfig): + cdef: + shared_ptr[CUCXConfig] ucx_config_shd_ptr diff --git a/python/pycylon/pycylon/net/ucx_config.pyx b/python/pycylon/pycylon/net/ucx_config.pyx new file mode 100644 index 000000000..ed04440c3 --- /dev/null +++ b/python/pycylon/pycylon/net/ucx_config.pyx @@ -0,0 +1,28 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.comm_config cimport CommConfig + from pycylon.net.ucx_config cimport CUCXConfig + + cdef class UCXConfig(CommConfig): + """ + GlooConfig Type mapping from libCylon to PyCylon + """ + def __cinit__(self): + self.ucx_config_shd_ptr = CUCXConfig.Make() + + @property + def comm_type(self): + return self.ucx_config_shd_ptr.get().Type() diff --git a/python/pycylon/setup.py b/python/pycylon/setup.py index 1cb302144..7e4149466 100644 --- a/python/pycylon/setup.py +++ b/python/pycylon/setup.py @@ -132,7 +132,7 @@ macros = [] # compile_time_env serves as preprocessor macros. ref: https://github.com/cython/cython/issues/2488 -compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False} +compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False, 'CYTHON_UCX': False} if CYLON_GLOO: libraries.append('gloo') library_directories.append(os.path.join(GLOO_PREFIX, 'lib')) @@ -147,6 +147,7 @@ _include_dirs.append(os.path.join(UCC_PREFIX, 'include')) macros.append(('BUILD_CYLON_UCX', '1')) macros.append(('BUILD_CYLON_UCC', '1')) + compile_time_env['CYTHON_UCX'] = True compile_time_env['CYTHON_UCC'] = True print('Libraries :', libraries) diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index b8fb158ed..61cba082f 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -14,6 +14,7 @@ import os + import numpy as np print("-------------------------------------------------") @@ -265,12 +266,14 @@ def test_dist_aggregate(): "python/pycylon/test/test_dist_aggregate.py")) assert responses[-1] == 0 + def test_dist_io(): print("34. Dist IO") responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " "python/pycylon/test/test_io.py")) assert responses[-1] == 0 + if os.environ.get('CYLON_GLOO'): def test_gloo(): print("35. Gloo") @@ -278,6 +281,28 @@ def test_gloo(): assert responses[-1] == 0 + def test_gloo_mpi(): + print("36. Gloo") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_gloo_mpi.py")) + assert responses[-1] == 0 + +if os.environ.get('CYLON_UCC'): + def test_ucx_mpi(): + print("37. UCX MPI") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_ucx_mpi.py")) + assert responses[-1] == 0 + +if os.environ.get('CYLON_GLOO') and os.environ.get('CYLON_UCC'): + def test_mpi_multiple_env_init(): + print("38. Create and destroy multiple environments in MPI") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest " + f"python/pycylon/test/test_mpi_multiple_env_init.py")) + assert responses[-1] == 0 + + def test_all(): ar = np.array(responses) total = len(responses) diff --git a/python/pycylon/test/test_gloo.py b/python/pycylon/test/test_gloo.py index 2d1e422eb..47ffe1192 100644 --- a/python/pycylon/test/test_gloo.py +++ b/python/pycylon/test/test_gloo.py @@ -29,7 +29,7 @@ from pycylon.net.gloo_config import GlooStandaloneConfig FILE_STORE_PATH = os.path.join(tempfile.gettempdir(), 'gloo') -WORLD_SIZE = 1 +WORLD_SIZE = 4 ROWS = 5 diff --git a/python/pycylon/test/test_gloo_mpi.py b/python/pycylon/test/test_gloo_mpi.py new file mode 100644 index 000000000..0c62e283d --- /dev/null +++ b/python/pycylon/test/test_gloo_mpi.py @@ -0,0 +1,46 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test: +>> mpirun -n 4 python -m pytest --with-mpi -q python/pycylon/test/test_gloo.py +""" + +import pandas as pd +import pytest +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net.gloo_config import GlooMPIConfig + +ROWS = 5 + + +@pytest.mark.mpi +def test_gloo_mpi(): # confirms that the code is under main function + conf = GlooMPIConfig() + env = CylonEnv(config=conf) + + rng = default_rng(seed=ROWS) + data1 = rng.integers(0, ROWS, size=(ROWS, 2)) + data2 = rng.integers(0, ROWS, size=(ROWS, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print("Distributed Merge") + df3 = df1.merge(right=df2, on=[0], env=env) + print(f'res len {len(df3)}') + + env.finalize() diff --git a/python/pycylon/test/test_mpi_multiple_env_init.py b/python/pycylon/test/test_mpi_multiple_env_init.py new file mode 100644 index 000000000..76aff0e6a --- /dev/null +++ b/python/pycylon/test/test_mpi_multiple_env_init.py @@ -0,0 +1,55 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test +>> mpirun -n 2 python -m pytest --with-mpi -q python/pycylon/test/test_mpi_multiple_env_init.py +""" +import pandas as pd +import pytest +from mpi4py import MPI +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net import MPIConfig +from pycylon.net.gloo_config import GlooMPIConfig +from pycylon.net.ucx_config import UCXConfig + + +def create_and_destroy_env(conf): + env = CylonEnv(config=conf) + print(f"{conf.comm_type} rank: {env.rank} world size: {env.world_size}") + r = 10 + + rng = default_rng() + data1 = rng.integers(0, r, size=(r, 2)) + data2 = rng.integers(0, r, size=(r, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print(len(df1.merge(right=df2, on=[0], env=env))) + env.finalize() + + +@pytest.mark.mpi +def test_mpi_multiple_env_init(): + comm = MPI.COMM_WORLD + rank = comm.rank + world_sz = comm.size + print(f"mpi rank: {rank} world size: {world_sz}") + + create_and_destroy_env(MPIConfig()) + create_and_destroy_env(GlooMPIConfig()) + create_and_destroy_env(UCXConfig()) diff --git a/python/pycylon/test/test_ucx_mpi.py b/python/pycylon/test/test_ucx_mpi.py new file mode 100644 index 000000000..dc1528f5c --- /dev/null +++ b/python/pycylon/test/test_ucx_mpi.py @@ -0,0 +1,44 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test: +>> mpirun -n 4 python -m pytest -q python/pycylon/test/test_ucx_mpi.py +""" + +import pandas as pd +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net.ucx_config import UCXConfig + +ROWS = 5 + + +def test_ucx_mpi(): # confirms that the code is under main function + conf = UCXConfig() + env = CylonEnv(config=conf) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, ROWS, size=(ROWS, 2)) + data2 = rng.integers(0, ROWS, size=(ROWS, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print("Distributed Merge") + df3 = df1.merge(right=df2, on=[0], env=env) + print(f'res len {len(df3)}') + + env.finalize() \ No newline at end of file diff --git a/python/pygcylon/setup.py b/python/pygcylon/setup.py index be66a2f4c..662d58b25 100644 --- a/python/pygcylon/setup.py +++ b/python/pygcylon/setup.py @@ -163,7 +163,7 @@ def build_extensions(self): packages = find_packages(include=["pygcylon", "pygcylon.*"]) -compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False} +compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False, 'CYTHON_UCX': False} setup( name="pygcylon", packages=packages,