diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 24c6a49ad..775a10979 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -20,7 +20,7 @@ jobs: fail-fast: false matrix: include: - - os: macos-10.15 + - os: macos-11 steps: - uses: actions/checkout@v2 diff --git a/build.py b/build.py index cf2eecfa6..8506cf733 100644 --- a/build.py +++ b/build.py @@ -89,6 +89,8 @@ def check_conda_prefix(): parser.add_argument("-ipath", help='Install directory') parser.add_argument("--verbose", help='Set verbosity', default=False, action="store_true") +parser.add_argument("-j", help='Parallel build threads', default=os.cpu_count(), + dest='parallel', type=int) args = parser.parse_args() @@ -107,9 +109,7 @@ def on_off(arg): RUN_CPP_TESTS = args.test RUN_PYTHON_TESTS = args.pytest CMAKE_FLAGS = args.cmake_flags -CPPLINT_COMMAND = "\"-DCMAKE_CXX_CPPLINT=cpplint;--linelength=100;--headers=h," \ - "hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" " if \ - args.style_check else " " +PARALLEL = args.parallel # arrow build expects /s even on windows BUILD_PYTHON = args.python @@ -127,6 +127,16 @@ def on_off(arg): PYTHON_EXEC = sys.executable +if args.style_check: + cmd = f'{PYTHON_EXEC} -m pip install cpplint' + res = subprocess.run(cmd, shell=True, cwd=PYTHON_SOURCE_DIR) + check_status(res.returncode, "cpplint install") + + CPPLINT_COMMAND = "-DCMAKE_CXX_CPPLINT=\"cpplint;--linelength=100;--headers=h," \ + "hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" " +else: + CPPLINT_COMMAND = " " + CMAKE_BOOL_FLAGS = {'CYLON_GLOO', 'CYLON_UCX', 'CYLON_UCC'} CMAKE_FALSE_OPTIONS = {'0', 'FALSE', 'OFF', 'N', 'NO', 'IGNORE', 'NOTFOUND'} @@ -169,6 +179,7 @@ def print_line(): logger.info(f"Python exec : {PYTHON_EXEC}") logger.info(f"Build mode : {CPP_BUILD_MODE}") logger.info(f"Build path : {BUILD_DIR}") +logger.info(f"Build threads : {PARALLEL}") logger.info(f"Install path : {INSTALL_DIR}") logger.info(f"CMake flags : {CMAKE_FLAGS}") logger.info(f" -CYLON_GLOO : {CYLON_GLOO}") @@ -206,7 +217,7 @@ def build_cpp(): res = subprocess.call(cmake_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake generate") - cmake_build_command = f'cmake --build . --parallel {os.cpu_count()} --config {CPP_BUILD_MODE}' + cmake_build_command = f'cmake --build . --parallel {PARALLEL} --config {CPP_BUILD_MODE}' logger.info(f"Build command: {cmake_build_command}") res = subprocess.call(cmake_build_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake build") @@ -254,6 +265,14 @@ def python_test(): env['LD_LIBRARY_PATH'] = os.path.join(GLOO_PREFIX, "lib") + os.pathsep + \ env['LD_LIBRARY_PATH'] + + if CYLON_UCC: + env['CYLON_UCC'] = str(CYLON_UCC) + env['UCC_PREFIX'] = UCC_PREFIX + env['LD_LIBRARY_PATH'] = os.path.join(UCC_PREFIX, "lib") + os.pathsep + \ + os.path.join(UCC_PREFIX, "lib", "ucc") + os.pathsep + \ + env['LD_LIBRARY_PATH'] + elif OS_NAME == 'Darwin': if 'DYLD_LIBRARY_PATH' in env: env['DYLD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \ @@ -279,7 +298,7 @@ def build_python(): conda_prefix = check_conda_prefix() - python_build_command = f'{PYTHON_EXEC} setup.py install --force' + python_build_command = f'{PYTHON_EXEC} -m pip install -v --upgrade .' env = os.environ env["CYLON_PREFIX"] = str(BUILD_DIR) if os.name == 'posix': diff --git a/conda/environments/cylon_MacOS.yml b/conda/environments/cylon_MacOS.yml index 8af49b18a..e1c2a797d 100644 --- a/conda/environments/cylon_MacOS.yml +++ b/conda/environments/cylon_MacOS.yml @@ -12,7 +12,7 @@ dependencies: - numpy>=1.16 - pandas>=1.0 - fsspec - - setuptools>=40.0,<60.0 + - setuptools>=40.0 # they are not needed for using pygcylon or compiling it - pytest - pytest-mpi diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d7337f0b4..9b6e7c2ec 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -155,6 +155,10 @@ message("Finding MPI") find_package(MPI REQUIRED COMPONENTS CXX) message(STATUS "MPI include dir: ${MPI_CXX_INCLUDE_PATH}") message(STATUS "MPI libs: ${MPI_CXX_LIBRARIES}") + +if (CYLON_CUSTOM_MPIRUN) + set(MPIEXEC_EXECUTABLE ${CYLON_CUSTOM_MPIRUN}) +endif (CYLON_CUSTOM_MPIRUN) message(STATUS "MPI executable: ${MPIEXEC_EXECUTABLE}") include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH}) diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.cpp b/cpp/src/cylon/net/gloo/gloo_communicator.cpp index 7af55b28f..12d880398 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.cpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.cpp @@ -57,6 +57,9 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, } else { // MPI is not initialized. Ask gloo to initialize MPI gloo_ctx = gloo::mpi::Context::createManaged(); } + if (gloo_config->timeout_ != kTimoutNotSet) { + gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + } ((gloo::mpi::Context &) *gloo_ctx).connectFullMesh(dev); #else return {Code::Invalid, "Gloo does not contain mpi headers!"}; @@ -69,8 +72,10 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, gloo_ctx = std::make_shared(gloo_config->rank_, gloo_config->world_size_); + if (gloo_config->timeout_ != kTimoutNotSet) { + gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + } ((gloo::rendezvous::Context &) *gloo_ctx).connectFullMesh(*prefix_store, dev); - } *out = std::make_shared(pool, std::move(gloo_ctx)); return Status::OK(); @@ -193,5 +198,12 @@ int GlooConfig::world_size() const { return world_size_; } +void GlooConfig::SetTimeout(int timeout) { + GlooConfig::timeout_ = std::chrono::seconds(timeout); +} +const std::chrono::seconds &GlooConfig::timeout() const { + return GlooConfig::timeout_; +} + } } diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index 1f5b9a825..c7523e00a 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -33,6 +33,8 @@ namespace net { class GlooCommunicator; +static constexpr std::chrono::seconds kTimoutNotSet(0); + class GlooConfig : public CommConfig { public: explicit GlooConfig(int rank = 0, int world_size = 1, bool use_mpi = false); @@ -43,7 +45,9 @@ class GlooConfig : public CommConfig { int rank() const; int world_size() const; + const std::chrono::seconds &timeout() const; + void SetTimeout(int timeout); void SetTcpHostname(const std::string &tcp_hostname); void SetTcpIface(const std::string &tcp_iface); void SetTcpAiFamily(int tcp_ai_family); @@ -63,6 +67,7 @@ class GlooConfig : public CommConfig { int rank_; int world_size_; bool use_mpi_; + std::chrono::seconds timeout_ = kTimoutNotSet; #ifdef GLOO_USE_MPI /* @@ -74,7 +79,7 @@ class GlooConfig : public CommConfig { #endif //GLOO_USE_MPI // tcp attr - std::string tcp_hostname_ = "localhost"; + std::string tcp_hostname_; std::string tcp_iface_; int tcp_ai_family_ = AF_UNSPEC; diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.cpp b/cpp/src/cylon/net/mpi/mpi_communicator.cpp index ba2b5768e..5a723e346 100644 --- a/cpp/src/cylon/net/mpi/mpi_communicator.cpp +++ b/cpp/src/cylon/net/mpi/mpi_communicator.cpp @@ -91,12 +91,13 @@ Status MPICommunicator::Make(const std::shared_ptr &config, void MPICommunicator::Finalize() { // finalize only if we initialized MPI - if (!externally_init) { + if (!externally_init && !IsFinalized()) { int finalized; MPI_Finalized(&finalized); if (!finalized) { MPI_Finalize(); } + finalized = true; } } void MPICommunicator::Barrier() { diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 964d76efb..0e1771a83 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -27,6 +27,8 @@ namespace cylon { namespace net { +static constexpr int kBarrierFlag = UINT32_MAX; + void mpi_check_and_finalize() { int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -89,7 +91,8 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &column, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {} +UCXCommunicator::UCXCommunicator(MemoryPool *pool, bool externally_init) + : Communicator(pool, -1, -1), externally_init(externally_init) {} Status UCXCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -117,10 +120,16 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { CYLON_UNUSED(config); - *out = std::make_shared(pool); - auto &comm = static_cast(**out); - // Check init functions + // MPI init int initialized; + MPI_Initialized(&initialized); + if (!initialized) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); + } + + *out = std::make_shared(pool, initialized); + auto &comm = static_cast(**out); + // Int variable used when iterating int sIndx; // Address of the UCP Worker for receiving @@ -133,12 +142,6 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // Variable to hold the current ucp address ucp_address_t *address; - // MPI init - MPI_Initialized(&initialized); - if (!initialized) { - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); - } - // Get the rank for checking send to self, and initializations MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); @@ -205,7 +208,7 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo } void UCXCommunicator::Finalize() { - if (!this->IsFinalized()) { + if (!externally_init && !IsFinalized()) { ucp_cleanup(ucpContext); mpi_check_and_finalize(); finalized = true; @@ -329,14 +332,36 @@ void UCXUCCCommunicator::Finalize() { } } ucc_context_destroy(uccContext); - mpi_check_and_finalize(); - ucx_comm_->Finalize(); + ucx_comm_->Finalize(); // this will handle MPI_Finalize finalized = true; } } void UCXUCCCommunicator::Barrier() { - return ucx_comm_->Barrier(); + ucc_coll_args_t args_; + ucc_coll_req_h req; + + args_.mask = 0; + args_.coll_type = UCC_COLL_TYPE_BARRIER; + + ucc_status_t status; + status = ucc_collective_init(&args_, &req, uccTeam); + assert(status == UCC_OK); + + status = ucc_collective_post(req); + assert(status == UCC_OK); + + status = ucc_collective_test(req); + while (status > UCC_OK) { // loop until status == 0 + status = ucc_context_progress(uccContext); + assert(status >= UCC_OK); // should be 0, 1 or 2 + + status = ucc_collective_test(req); + } + assert(status == UCC_OK); + + status = ucc_collective_finalize(req); + assert(status == UCC_OK); } Status UCXUCCCommunicator::AllGather(const std::shared_ptr &table, diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 57ba07578..74abb5200 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -29,15 +29,15 @@ namespace cylon { namespace net { class UCXConfig : public CommConfig { + public: CommType Type() override; - public: static std::shared_ptr Make(); }; class UCXCommunicator : public Communicator { public: - explicit UCXCommunicator(MemoryPool *pool); + explicit UCXCommunicator(MemoryPool *pool, bool externally_init); ~UCXCommunicator() override = default; std::unique_ptr CreateChannel() const override; @@ -78,6 +78,7 @@ class UCXCommunicator : public Communicator { std::unordered_map endPointMap; // UCP Context - Holds a UCP communication instance's global information. ucp_context_h ucpContext{}; + bool externally_init = false; }; #ifdef BUILD_CYLON_UCC diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index 07964dfc8..d018cf186 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -504,9 +504,8 @@ Status MergeSortedTable(const std::vector> &tables, const std::vector &sort_orders, std::shared_ptr
&out) { std::shared_ptr
concatenated; - std::vector table_indices(tables.size()), - table_end_indices(tables.size()); - int acc = 0; + std::vector table_indices(tables.size()), table_end_indices(tables.size()); + int64_t acc = 0; for (size_t i = 0; i < table_indices.size(); i++) { table_indices[i] = acc; acc += tables[i]->Rows(); @@ -515,7 +514,7 @@ Status MergeSortedTable(const std::vector> &tables, RETURN_CYLON_STATUS_IF_FAILED(Merge(tables, concatenated)); - if(concatenated->GetContext()->GetWorldSize() > 4) { + if (concatenated->GetContext()->GetWorldSize() > 4) { return Sort(concatenated, sort_columns, out, sort_orders); } @@ -530,7 +529,7 @@ Status MergeSortedTable(const std::vector> &tables, std::priority_queue, decltype(comp)> pq(comp); - for (size_t i = 0; i < tables.size(); i++) { + for (int i = 0; i < (int) tables.size(); i++) { if (table_indices[i] < table_end_indices[i]) { pq.push(i); } @@ -541,13 +540,9 @@ Status MergeSortedTable(const std::vector> &tables, arrow::Int64Builder filter(pool); RETURN_CYLON_STATUS_IF_ARROW_FAILED(filter.Reserve(concatenated->Rows())); - std::vector temp_v; - while (!pq.empty()) { int t = pq.top(); pq.pop(); - // std::cout<> &tables, } CYLON_ASSIGN_OR_RAISE(auto take_arr, filter.Finish()); - CYLON_ASSIGN_OR_RAISE( - auto take_res, - (arrow::compute::Take(concatenated->get_table(), take_arr))); + CYLON_ASSIGN_OR_RAISE(auto take_res, + (arrow::compute::Take(concatenated->get_table(), take_arr))); out = std::make_shared
(ctx, take_res.table()); diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index b709ec1c8..2db81f853 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -238,5 +238,8 @@ if (CYLON_UCX) cylon_run_test(aggregate_test 1 ucx) cylon_run_test(aggregate_test 2 ucx) cylon_run_test(aggregate_test 4 ucx) + + cylon_run_test(sync_comms_test 1 ucx) + cylon_run_test(sync_comms_test 4 ucx) endif (CYLON_UCC) endif (CYLON_UCX) \ No newline at end of file diff --git a/cpp/test/sync_comms_test.cpp b/cpp/test/sync_comms_test.cpp index 76911e1ca..62d810cb6 100644 --- a/cpp/test/sync_comms_test.cpp +++ b/cpp/test/sync_comms_test.cpp @@ -12,12 +12,22 @@ * limitations under the License. */ #include +#include +#include #include "common/test_header.hpp" namespace cylon { namespace test { +TEST_CASE("barrier", "[sync comms]") { + srand((unsigned) time(nullptr)); + int i = (rand() % 2000) + 1; + + std::this_thread::sleep_for(std::chrono::duration(i)); + ctx->Barrier(); +} + enum GenType { Empty, Null, NonEmpty }; void generate_table(std::shared_ptr *schema, @@ -113,6 +123,11 @@ TEST_CASE("all gather table", "[sync comms]") { } TEST_CASE("gather table", "[sync comms]") { + // todo: UCC doesnt support gatherv for the moment #599 + if (ctx->GetCommType() == net::UCX){ + return; + } + std::shared_ptr schema; std::shared_ptr in_table; generate_table(&schema, &in_table); diff --git a/python/pycylon/pycylon/api/lib.pxd b/python/pycylon/pycylon/api/lib.pxd index ad6368bea..96cad6ca3 100644 --- a/python/pycylon/pycylon/api/lib.pxd +++ b/python/pycylon/pycylon/api/lib.pxd @@ -25,6 +25,8 @@ from pycylon.net.comm_config cimport CCommConfig from pycylon.net.mpi_config cimport CMPIConfig IF CYTHON_GLOO: from pycylon.net.gloo_config cimport CGlooConfig +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.ucx_config cimport CUCXConfig from pycylon.io.csv_read_config cimport CCSVReadOptions from pycylon.io.csv_read_config import CSVReadOptions from pycylon.io.csv_read_config cimport CSVReadOptions @@ -59,6 +61,9 @@ cdef api shared_ptr[CMPIConfig] pycylon_unwrap_mpi_config(object config) IF CYTHON_GLOO: cdef api shared_ptr[CGlooConfig] pycylon_unwrap_gloo_config(object config) +IF CYTHON_UCX & CYTHON_UCC: + cdef api shared_ptr[CUCXConfig] pycylon_unwrap_ucx_config(object config) + cdef api shared_ptr[CTable] pycylon_unwrap_table(object table) cdef api shared_ptr[CDataType] pycylon_unwrap_data_type(object data_type) @@ -67,7 +72,7 @@ cdef api CCSVReadOptions pycylon_unwrap_csv_read_options(object csv_read_options cdef api CCSVWriteOptions pycylon_unwrap_csv_write_options(object csv_write_options) -cdef api CSortOptions * pycylon_unwrap_sort_options(object sort_options) +cdef api shared_ptr[CSortOptions] pycylon_unwrap_sort_options(object sort_options) cdef api shared_ptr[CBaseArrowIndex] pycylon_unwrap_base_arrow_index(object base_arrow_index) @@ -87,7 +92,7 @@ cdef api object pycylon_wrap_layout(const CLayout & layout) cdef api object pycylon_wrap_data_type(const shared_ptr[CDataType] & data_type) -cdef api object pycylon_wrap_sort_options(CSortOptions *sort_options) +cdef api object pycylon_wrap_sort_options(const shared_ptr[CSortOptions] &sort_options) cdef api object pycylon_wrap_base_arrow_index(const shared_ptr[CBaseArrowIndex] & base_arrow_index) diff --git a/python/pycylon/pycylon/api/lib.pyx b/python/pycylon/pycylon/api/lib.pyx index a1130f1d3..67bec2700 100644 --- a/python/pycylon/pycylon/api/lib.pyx +++ b/python/pycylon/pycylon/api/lib.pyx @@ -29,6 +29,9 @@ from pycylon.net.mpi_config cimport MPIConfig IF CYTHON_GLOO: from pycylon.net.gloo_config import GlooMPIConfig, GlooStandaloneConfig from pycylon.net.gloo_config cimport CGlooConfig, GlooMPIConfig, GlooStandaloneConfig +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.ucx_config import UCXConfig + from pycylon.net.ucx_config cimport CUCXConfig, UCXConfig from pycylon.io.csv_read_config cimport CCSVReadOptions from pycylon.io.csv_read_config import CSVReadOptions from pycylon.io.csv_read_config cimport CSVReadOptions @@ -121,6 +124,13 @@ IF CYTHON_GLOO: else: raise ValueError('Passed object is not an instance of GlooConfig') +IF CYTHON_UCX & CYTHON_UCC: + cdef api shared_ptr[CUCXConfig] pycylon_unwrap_ucx_config(object config): + if isinstance(config, UCXConfig): + return ( config).ucx_config_shd_ptr + else: + raise ValueError('Passed object is not an instance of UcxConfig') + cdef api CCSVReadOptions pycylon_unwrap_csv_read_options(object csv_read_options): cdef CSVReadOptions csvrdopt if pyclon_is_csv_read_options(csv_read_options): @@ -145,13 +155,13 @@ cdef api shared_ptr[CDataType] pycylon_unwrap_data_type(object data_type): else: raise ValueError('Passed object is not an instance of DataType') -cdef api CSortOptions * pycylon_unwrap_sort_options(object sort_options): +cdef api shared_ptr[CSortOptions] pycylon_unwrap_sort_options(object sort_options): cdef SortOptions so if pyclon_is_sort_options(sort_options): so = sort_options return so.thisPtr else: - raise ValueError('Passed object is not an instance of DataType') + raise ValueError('Passed object is not an instance of SortOptions') cdef api shared_ptr[CBaseArrowIndex] pycylon_unwrap_base_arrow_index(object base_arrow_index): cdef BaseArrowIndex bi @@ -196,7 +206,7 @@ cdef api object pycylon_wrap_data_type(const shared_ptr[CDataType] & cdata_type) data_type.init(cdata_type) return data_type -cdef api object pycylon_wrap_sort_options(CSortOptions *csort_options): +cdef api object pycylon_wrap_sort_options(const shared_ptr[CSortOptions] &csort_options): cdef SortOptions sort_options = SortOptions.__new__(SortOptions) sort_options.init(csort_options) return sort_options diff --git a/python/pycylon/pycylon/ctx/context.pyx b/python/pycylon/pycylon/ctx/context.pyx index f3e9b71c6..c993947ee 100644 --- a/python/pycylon/pycylon/ctx/context.pyx +++ b/python/pycylon/pycylon/ctx/context.pyx @@ -20,6 +20,9 @@ from pycylon.ctx.context cimport CCylonContext from pycylon.api.lib cimport pycylon_unwrap_mpi_config IF CYTHON_GLOO: from pycylon.api.lib cimport pycylon_unwrap_gloo_config + +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.api.lib cimport pycylon_unwrap_ucx_config from pycylon.net import CommType from pycylon.net.mpi_config cimport CMPIConfig from pycylon.net.mpi_config import MPIConfig @@ -82,6 +85,10 @@ cdef class CylonContext: if config.comm_type == CommType.GLOO: return pycylon_unwrap_gloo_config(config) + IF CYTHON_UCX & CYTHON_UCC: + if config.comm_type == CommType.UCX: + return pycylon_unwrap_ucx_config(config) + raise ValueError(f"Unsupported distributed comm config {config}") def get_rank(self) -> int: diff --git a/python/pycylon/pycylon/data/table.pxd b/python/pycylon/pycylon/data/table.pxd index 228fae890..084454edf 100644 --- a/python/pycylon/pycylon/data/table.pxd +++ b/python/pycylon/pycylon/data/table.pxd @@ -127,16 +127,22 @@ cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": cdef cppclass CSortOptions "cylon::SortOptions": + enum CSortMethod: + CREGULAR_SAMPLE 'cylon::SortOptions::SortMethod::REGULAR_SAMPLE', + CINITIAL_SAMPLE 'cylon::SortOptions::SortMethod::INITIAL_SAMPLE' + int num_bins long num_samples + CSortMethod sort_method + @ staticmethod CSortOptions Defaults() cdef class SortOptions: cdef: - CSortOptions *thisPtr - void init(self, CSortOptions *csort_options) + shared_ptr[CSortOptions] thisPtr + void init(self, const shared_ptr[CSortOptions] &csort_options) cdef class Table: cdef: diff --git a/python/pycylon/pycylon/data/table.pyx b/python/pycylon/pycylon/data/table.pyx index 356baf83e..9144ebbcc 100644 --- a/python/pycylon/pycylon/data/table.pyx +++ b/python/pycylon/pycylon/data/table.pyx @@ -471,7 +471,7 @@ cdef class Table: """ cdef shared_ptr[CTable] output - cdef CSortOptions *csort_options + cdef shared_ptr[CSortOptions] csort_options cdef vector[int] sort_index cdef vector[cpp_bool] order_directions @@ -507,7 +507,7 @@ cdef class Table: else: csort_options = pycylon_unwrap_sort_options(SortOptions(0, 0)) cdef CStatus status = DistributedSort(self.table_shd_ptr, sort_index, output, - order_directions, csort_options[0]) + order_directions, csort_options.get()[0]) if status.is_ok(): return pycylon_wrap_table(output) else: @@ -2728,7 +2728,7 @@ cdef class SortOptions: """ Sort Operations for Distributed Sort """ - def __cinit__(self, num_bins: int = 0, num_samples: int = 0): + def __cinit__(self, num_bins: int = 0, num_samples: int = 0, sampling: str = 'regular'): ''' Initializes the CSortOptions struct Args: @@ -2738,9 +2738,17 @@ cdef class SortOptions: Returns: None ''' - self.thisPtr = new CSortOptions() - self.thisPtr.num_bins = num_bins - self.thisPtr.num_samples = num_samples + self.thisPtr = make_shared[CSortOptions]() + self.thisPtr.get().num_bins = num_bins + self.thisPtr.get().num_samples = num_samples - cdef void init(self, CSortOptions *csort_options): + sampling = sampling.lower() + if sampling == 'regular': + self.thisPtr.get().sort_method = CSortOptions.CSortMethod.CREGULAR_SAMPLE + elif sampling == 'initial': + self.thisPtr.get().sort_method = CSortOptions.CSortMethod.CINITIAL_SAMPLE + else: + raise ValueError(f'unknown sampling method for sorting: {sampling}') + + cdef void init(self, const shared_ptr[CSortOptions] &csort_options): self.thisPtr = csort_options diff --git a/python/pycylon/pycylon/net/gloo_config.pxd b/python/pycylon/pycylon/net/gloo_config.pxd index dbec281cf..f24b68ae8 100644 --- a/python/pycylon/pycylon/net/gloo_config.pxd +++ b/python/pycylon/pycylon/net/gloo_config.pxd @@ -31,6 +31,7 @@ IF CYTHON_GLOO: void SetTcpAiFamily(int tcp_ai_family) void SetFileStorePath(const string & file_store_path) void SetStorePrefix(const string & store_prefix) + void SetTimeout(int timeout) @staticmethod shared_ptr[CGlooConfig] MakeWithMpi(MPI_Comm comm); diff --git a/python/pycylon/pycylon/net/gloo_config.pyx b/python/pycylon/pycylon/net/gloo_config.pyx index 361fca365..8a7534fe4 100644 --- a/python/pycylon/pycylon/net/gloo_config.pyx +++ b/python/pycylon/pycylon/net/gloo_config.pyx @@ -38,6 +38,18 @@ IF CYTHON_GLOO: def comm_type(self): return self.gloo_config_shd_ptr.get().Type() + def set_tcp_hostname(self, hostname: str): + self.gloo_config_shd_ptr.get().SetTcpHostname(hostname.encode()) + + def set_tcp_iface(self, iface: str): + self.gloo_config_shd_ptr.get().SetTcpIface(iface.encode()) + + def set_tcp_ai_family(self, ai_family: int): + self.gloo_config_shd_ptr.get().SetTcpAiFamily(ai_family) + + def set_timeout(self, timeout: int): + self.gloo_config_shd_ptr.get().SetTimeout(timeout) + cdef class GlooStandaloneConfig(CommConfig): """ GlooConfig Type mapping from libCylon to PyCylon @@ -73,3 +85,6 @@ IF CYTHON_GLOO: def set_store_prefix(self, prefix: str): self.gloo_config_shd_ptr.get().SetStorePrefix(prefix.encode()) + + def set_timeout(self, timeout: int): + self.gloo_config_shd_ptr.get().SetTimeout(timeout) \ No newline at end of file diff --git a/python/pycylon/pycylon/net/ucx_config.pxd b/python/pycylon/pycylon/net/ucx_config.pxd new file mode 100644 index 000000000..a6f1b36c8 --- /dev/null +++ b/python/pycylon/pycylon/net/ucx_config.pxd @@ -0,0 +1,31 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_UCX & CYTHON_UCC: + from libcpp.memory cimport shared_ptr + + from pycylon.net.comm_type cimport CCommType + from pycylon.net.comm_config cimport CommConfig + + cdef extern from "../../../../cpp/src/cylon/net/ucx/ucx_communicator.hpp" namespace "cylon::net": + cdef cppclass CUCXConfig "cylon::net::UCXConfig": + CCommType Type() + + @ staticmethod + shared_ptr[CUCXConfig] Make(); + + + cdef class UCXConfig(CommConfig): + cdef: + shared_ptr[CUCXConfig] ucx_config_shd_ptr diff --git a/python/pycylon/pycylon/net/ucx_config.pyx b/python/pycylon/pycylon/net/ucx_config.pyx new file mode 100644 index 000000000..ed04440c3 --- /dev/null +++ b/python/pycylon/pycylon/net/ucx_config.pyx @@ -0,0 +1,28 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.comm_config cimport CommConfig + from pycylon.net.ucx_config cimport CUCXConfig + + cdef class UCXConfig(CommConfig): + """ + GlooConfig Type mapping from libCylon to PyCylon + """ + def __cinit__(self): + self.ucx_config_shd_ptr = CUCXConfig.Make() + + @property + def comm_type(self): + return self.ucx_config_shd_ptr.get().Type() diff --git a/python/pycylon/setup.py b/python/pycylon/setup.py index 1cb302144..7e4149466 100644 --- a/python/pycylon/setup.py +++ b/python/pycylon/setup.py @@ -132,7 +132,7 @@ macros = [] # compile_time_env serves as preprocessor macros. ref: https://github.com/cython/cython/issues/2488 -compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False} +compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False, 'CYTHON_UCX': False} if CYLON_GLOO: libraries.append('gloo') library_directories.append(os.path.join(GLOO_PREFIX, 'lib')) @@ -147,6 +147,7 @@ _include_dirs.append(os.path.join(UCC_PREFIX, 'include')) macros.append(('BUILD_CYLON_UCX', '1')) macros.append(('BUILD_CYLON_UCC', '1')) + compile_time_env['CYTHON_UCX'] = True compile_time_env['CYTHON_UCC'] = True print('Libraries :', libraries) diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index b8fb158ed..61cba082f 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -14,6 +14,7 @@ import os + import numpy as np print("-------------------------------------------------") @@ -265,12 +266,14 @@ def test_dist_aggregate(): "python/pycylon/test/test_dist_aggregate.py")) assert responses[-1] == 0 + def test_dist_io(): print("34. Dist IO") responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " "python/pycylon/test/test_io.py")) assert responses[-1] == 0 + if os.environ.get('CYLON_GLOO'): def test_gloo(): print("35. Gloo") @@ -278,6 +281,28 @@ def test_gloo(): assert responses[-1] == 0 + def test_gloo_mpi(): + print("36. Gloo") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_gloo_mpi.py")) + assert responses[-1] == 0 + +if os.environ.get('CYLON_UCC'): + def test_ucx_mpi(): + print("37. UCX MPI") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_ucx_mpi.py")) + assert responses[-1] == 0 + +if os.environ.get('CYLON_GLOO') and os.environ.get('CYLON_UCC'): + def test_mpi_multiple_env_init(): + print("38. Create and destroy multiple environments in MPI") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest " + f"python/pycylon/test/test_mpi_multiple_env_init.py")) + assert responses[-1] == 0 + + def test_all(): ar = np.array(responses) total = len(responses) diff --git a/python/pycylon/test/test_gloo.py b/python/pycylon/test/test_gloo.py index 2d1e422eb..47ffe1192 100644 --- a/python/pycylon/test/test_gloo.py +++ b/python/pycylon/test/test_gloo.py @@ -29,7 +29,7 @@ from pycylon.net.gloo_config import GlooStandaloneConfig FILE_STORE_PATH = os.path.join(tempfile.gettempdir(), 'gloo') -WORLD_SIZE = 1 +WORLD_SIZE = 4 ROWS = 5 diff --git a/python/pycylon/test/test_gloo_mpi.py b/python/pycylon/test/test_gloo_mpi.py new file mode 100644 index 000000000..0c62e283d --- /dev/null +++ b/python/pycylon/test/test_gloo_mpi.py @@ -0,0 +1,46 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test: +>> mpirun -n 4 python -m pytest --with-mpi -q python/pycylon/test/test_gloo.py +""" + +import pandas as pd +import pytest +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net.gloo_config import GlooMPIConfig + +ROWS = 5 + + +@pytest.mark.mpi +def test_gloo_mpi(): # confirms that the code is under main function + conf = GlooMPIConfig() + env = CylonEnv(config=conf) + + rng = default_rng(seed=ROWS) + data1 = rng.integers(0, ROWS, size=(ROWS, 2)) + data2 = rng.integers(0, ROWS, size=(ROWS, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print("Distributed Merge") + df3 = df1.merge(right=df2, on=[0], env=env) + print(f'res len {len(df3)}') + + env.finalize() diff --git a/python/pycylon/test/test_mpi_multiple_env_init.py b/python/pycylon/test/test_mpi_multiple_env_init.py new file mode 100644 index 000000000..76aff0e6a --- /dev/null +++ b/python/pycylon/test/test_mpi_multiple_env_init.py @@ -0,0 +1,55 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test +>> mpirun -n 2 python -m pytest --with-mpi -q python/pycylon/test/test_mpi_multiple_env_init.py +""" +import pandas as pd +import pytest +from mpi4py import MPI +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net import MPIConfig +from pycylon.net.gloo_config import GlooMPIConfig +from pycylon.net.ucx_config import UCXConfig + + +def create_and_destroy_env(conf): + env = CylonEnv(config=conf) + print(f"{conf.comm_type} rank: {env.rank} world size: {env.world_size}") + r = 10 + + rng = default_rng() + data1 = rng.integers(0, r, size=(r, 2)) + data2 = rng.integers(0, r, size=(r, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print(len(df1.merge(right=df2, on=[0], env=env))) + env.finalize() + + +@pytest.mark.mpi +def test_mpi_multiple_env_init(): + comm = MPI.COMM_WORLD + rank = comm.rank + world_sz = comm.size + print(f"mpi rank: {rank} world size: {world_sz}") + + create_and_destroy_env(MPIConfig()) + create_and_destroy_env(GlooMPIConfig()) + create_and_destroy_env(UCXConfig()) diff --git a/python/pycylon/test/test_ucx_mpi.py b/python/pycylon/test/test_ucx_mpi.py new file mode 100644 index 000000000..dc1528f5c --- /dev/null +++ b/python/pycylon/test/test_ucx_mpi.py @@ -0,0 +1,44 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test: +>> mpirun -n 4 python -m pytest -q python/pycylon/test/test_ucx_mpi.py +""" + +import pandas as pd +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net.ucx_config import UCXConfig + +ROWS = 5 + + +def test_ucx_mpi(): # confirms that the code is under main function + conf = UCXConfig() + env = CylonEnv(config=conf) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, ROWS, size=(ROWS, 2)) + data2 = rng.integers(0, ROWS, size=(ROWS, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print("Distributed Merge") + df3 = df1.merge(right=df2, on=[0], env=env) + print(f'res len {len(df3)}') + + env.finalize() \ No newline at end of file diff --git a/python/pygcylon/setup.py b/python/pygcylon/setup.py index be66a2f4c..662d58b25 100644 --- a/python/pygcylon/setup.py +++ b/python/pygcylon/setup.py @@ -163,7 +163,7 @@ def build_extensions(self): packages = find_packages(include=["pygcylon", "pygcylon.*"]) -compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False} +compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False, 'CYTHON_UCX': False} setup( name="pygcylon", packages=packages,