From 3d2ad2da036745dc8cf63aa82a54c24d556c22de Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 18 Jul 2022 14:46:17 -0400 Subject: [PATCH 01/30] remove gloo default hostname --- cpp/src/cylon/net/gloo/gloo_communicator.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index 1f5b9a825..ccfd85253 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -74,7 +74,7 @@ class GlooConfig : public CommConfig { #endif //GLOO_USE_MPI // tcp attr - std::string tcp_hostname_ = "localhost"; + std::string tcp_hostname_; std::string tcp_iface_; int tcp_ai_family_ = AF_UNSPEC; From 2c88bec77501bf93c8af3206bbc87235e59c5662 Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 18 Jul 2022 14:55:08 -0400 Subject: [PATCH 02/30] minor change gloo --- python/pycylon/pycylon/net/gloo_config.pyx | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/pycylon/pycylon/net/gloo_config.pyx b/python/pycylon/pycylon/net/gloo_config.pyx index 361fca365..755a55586 100644 --- a/python/pycylon/pycylon/net/gloo_config.pyx +++ b/python/pycylon/pycylon/net/gloo_config.pyx @@ -38,6 +38,15 @@ IF CYTHON_GLOO: def comm_type(self): return self.gloo_config_shd_ptr.get().Type() + def set_tcp_hostname(self, hostname: str): + self.gloo_config_shd_ptr.get().SetTcpHostname(hostname.encode()) + + def set_tcp_iface(self, iface: str): + self.gloo_config_shd_ptr.get().SetTcpIface(iface.encode()) + + def set_tcp_ai_family(self, ai_family: int): + self.gloo_config_shd_ptr.get().SetTcpAiFamily(ai_family) + cdef class GlooStandaloneConfig(CommConfig): """ GlooConfig Type mapping from libCylon to PyCylon From 21fc28119066be8b2b7578849fa2cee0ae09794e Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 18 Jul 2022 18:12:15 -0400 Subject: [PATCH 03/30] adding gloo-mpi test --- python/pycylon/test/test_all.py | 10 ++++++ python/pycylon/test/test_gloo.py | 2 +- python/pycylon/test/test_gloo_mpi.py | 46 ++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 python/pycylon/test/test_gloo_mpi.py diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index b8fb158ed..e54f83e86 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -14,6 +14,7 @@ import os + import numpy as np print("-------------------------------------------------") @@ -265,12 +266,14 @@ def test_dist_aggregate(): "python/pycylon/test/test_dist_aggregate.py")) assert responses[-1] == 0 + def test_dist_io(): print("34. Dist IO") responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " "python/pycylon/test/test_io.py")) assert responses[-1] == 0 + if os.environ.get('CYLON_GLOO'): def test_gloo(): print("35. Gloo") @@ -278,6 +281,13 @@ def test_gloo(): assert responses[-1] == 0 + def test_gloo_mpi(): + print("36. Gloo") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_gloo_mpi.py")) + assert responses[-1] == 0 + + def test_all(): ar = np.array(responses) total = len(responses) diff --git a/python/pycylon/test/test_gloo.py b/python/pycylon/test/test_gloo.py index 2d1e422eb..47ffe1192 100644 --- a/python/pycylon/test/test_gloo.py +++ b/python/pycylon/test/test_gloo.py @@ -29,7 +29,7 @@ from pycylon.net.gloo_config import GlooStandaloneConfig FILE_STORE_PATH = os.path.join(tempfile.gettempdir(), 'gloo') -WORLD_SIZE = 1 +WORLD_SIZE = 4 ROWS = 5 diff --git a/python/pycylon/test/test_gloo_mpi.py b/python/pycylon/test/test_gloo_mpi.py new file mode 100644 index 000000000..0c62e283d --- /dev/null +++ b/python/pycylon/test/test_gloo_mpi.py @@ -0,0 +1,46 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test: +>> mpirun -n 4 python -m pytest --with-mpi -q python/pycylon/test/test_gloo.py +""" + +import pandas as pd +import pytest +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net.gloo_config import GlooMPIConfig + +ROWS = 5 + + +@pytest.mark.mpi +def test_gloo_mpi(): # confirms that the code is under main function + conf = GlooMPIConfig() + env = CylonEnv(config=conf) + + rng = default_rng(seed=ROWS) + data1 = rng.integers(0, ROWS, size=(ROWS, 2)) + data2 = rng.integers(0, ROWS, size=(ROWS, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print("Distributed Merge") + df3 = df1.merge(right=df2, on=[0], env=env) + print(f'res len {len(df3)}') + + env.finalize() From 256397dc0847e6816e2cc37b793b43481bb357a4 Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 18 Jul 2022 19:27:23 -0400 Subject: [PATCH 04/30] adding ucc cyton --- cpp/src/cylon/net/mpi/mpi_communicator.cpp | 3 +- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 26 ++++++------- cpp/src/cylon/net/ucx/ucx_communicator.hpp | 5 ++- python/pycylon/pycylon/api/lib.pxd | 5 +++ python/pycylon/pycylon/api/lib.pyx | 10 +++++ python/pycylon/pycylon/ctx/context.pyx | 7 ++++ python/pycylon/pycylon/net/ucx_config.pxd | 31 +++++++++++++++ python/pycylon/pycylon/net/ucx_config.pyx | 28 ++++++++++++++ python/pycylon/setup.py | 1 + python/pycylon/test/test_ucx_mpi.py | 44 ++++++++++++++++++++++ 10 files changed, 144 insertions(+), 16 deletions(-) create mode 100644 python/pycylon/pycylon/net/ucx_config.pxd create mode 100644 python/pycylon/pycylon/net/ucx_config.pyx create mode 100644 python/pycylon/test/test_ucx_mpi.py diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.cpp b/cpp/src/cylon/net/mpi/mpi_communicator.cpp index ba2b5768e..5a723e346 100644 --- a/cpp/src/cylon/net/mpi/mpi_communicator.cpp +++ b/cpp/src/cylon/net/mpi/mpi_communicator.cpp @@ -91,12 +91,13 @@ Status MPICommunicator::Make(const std::shared_ptr &config, void MPICommunicator::Finalize() { // finalize only if we initialized MPI - if (!externally_init) { + if (!externally_init && !IsFinalized()) { int finalized; MPI_Finalized(&finalized); if (!finalized) { MPI_Finalize(); } + finalized = true; } } void MPICommunicator::Barrier() { diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 964d76efb..677736dec 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -89,7 +89,8 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &column, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {} +UCXCommunicator::UCXCommunicator(MemoryPool *pool, bool externally_init) + : Communicator(pool, -1, -1), externally_init(externally_init) {} Status UCXCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -117,10 +118,16 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { CYLON_UNUSED(config); - *out = std::make_shared(pool); - auto &comm = static_cast(**out); - // Check init functions + // MPI init int initialized; + MPI_Initialized(&initialized); + if (!initialized) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); + } + + *out = std::make_shared(pool, initialized); + auto &comm = static_cast(**out); + // Int variable used when iterating int sIndx; // Address of the UCP Worker for receiving @@ -133,12 +140,6 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // Variable to hold the current ucp address ucp_address_t *address; - // MPI init - MPI_Initialized(&initialized); - if (!initialized) { - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); - } - // Get the rank for checking send to self, and initializations MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); @@ -205,7 +206,7 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo } void UCXCommunicator::Finalize() { - if (!this->IsFinalized()) { + if (!externally_init && !IsFinalized()) { ucp_cleanup(ucpContext); mpi_check_and_finalize(); finalized = true; @@ -329,8 +330,7 @@ void UCXUCCCommunicator::Finalize() { } } ucc_context_destroy(uccContext); - mpi_check_and_finalize(); - ucx_comm_->Finalize(); + ucx_comm_->Finalize(); // this will handle MPI_Finalize finalized = true; } } diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 57ba07578..74abb5200 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -29,15 +29,15 @@ namespace cylon { namespace net { class UCXConfig : public CommConfig { + public: CommType Type() override; - public: static std::shared_ptr Make(); }; class UCXCommunicator : public Communicator { public: - explicit UCXCommunicator(MemoryPool *pool); + explicit UCXCommunicator(MemoryPool *pool, bool externally_init); ~UCXCommunicator() override = default; std::unique_ptr CreateChannel() const override; @@ -78,6 +78,7 @@ class UCXCommunicator : public Communicator { std::unordered_map endPointMap; // UCP Context - Holds a UCP communication instance's global information. ucp_context_h ucpContext{}; + bool externally_init = false; }; #ifdef BUILD_CYLON_UCC diff --git a/python/pycylon/pycylon/api/lib.pxd b/python/pycylon/pycylon/api/lib.pxd index ad6368bea..97b4a3857 100644 --- a/python/pycylon/pycylon/api/lib.pxd +++ b/python/pycylon/pycylon/api/lib.pxd @@ -25,6 +25,8 @@ from pycylon.net.comm_config cimport CCommConfig from pycylon.net.mpi_config cimport CMPIConfig IF CYTHON_GLOO: from pycylon.net.gloo_config cimport CGlooConfig +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.ucx_config cimport CUCXConfig from pycylon.io.csv_read_config cimport CCSVReadOptions from pycylon.io.csv_read_config import CSVReadOptions from pycylon.io.csv_read_config cimport CSVReadOptions @@ -59,6 +61,9 @@ cdef api shared_ptr[CMPIConfig] pycylon_unwrap_mpi_config(object config) IF CYTHON_GLOO: cdef api shared_ptr[CGlooConfig] pycylon_unwrap_gloo_config(object config) +IF CYTHON_UCX & CYTHON_UCC: + cdef api shared_ptr[CUCXConfig] pycylon_unwrap_ucx_config(object config) + cdef api shared_ptr[CTable] pycylon_unwrap_table(object table) cdef api shared_ptr[CDataType] pycylon_unwrap_data_type(object data_type) diff --git a/python/pycylon/pycylon/api/lib.pyx b/python/pycylon/pycylon/api/lib.pyx index a1130f1d3..8b69f0c59 100644 --- a/python/pycylon/pycylon/api/lib.pyx +++ b/python/pycylon/pycylon/api/lib.pyx @@ -29,6 +29,9 @@ from pycylon.net.mpi_config cimport MPIConfig IF CYTHON_GLOO: from pycylon.net.gloo_config import GlooMPIConfig, GlooStandaloneConfig from pycylon.net.gloo_config cimport CGlooConfig, GlooMPIConfig, GlooStandaloneConfig +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.ucx_config import UCXConfig + from pycylon.net.ucx_config cimport CUCXConfig, UCXConfig from pycylon.io.csv_read_config cimport CCSVReadOptions from pycylon.io.csv_read_config import CSVReadOptions from pycylon.io.csv_read_config cimport CSVReadOptions @@ -121,6 +124,13 @@ IF CYTHON_GLOO: else: raise ValueError('Passed object is not an instance of GlooConfig') +IF CYTHON_UCX & CYTHON_UCC: + cdef api shared_ptr[CUCXConfig] pycylon_unwrap_ucx_config(object config): + if isinstance(config, UCXConfig): + return ( config).ucx_config_shd_ptr + else: + raise ValueError('Passed object is not an instance of UcxConfig') + cdef api CCSVReadOptions pycylon_unwrap_csv_read_options(object csv_read_options): cdef CSVReadOptions csvrdopt if pyclon_is_csv_read_options(csv_read_options): diff --git a/python/pycylon/pycylon/ctx/context.pyx b/python/pycylon/pycylon/ctx/context.pyx index f3e9b71c6..c993947ee 100644 --- a/python/pycylon/pycylon/ctx/context.pyx +++ b/python/pycylon/pycylon/ctx/context.pyx @@ -20,6 +20,9 @@ from pycylon.ctx.context cimport CCylonContext from pycylon.api.lib cimport pycylon_unwrap_mpi_config IF CYTHON_GLOO: from pycylon.api.lib cimport pycylon_unwrap_gloo_config + +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.api.lib cimport pycylon_unwrap_ucx_config from pycylon.net import CommType from pycylon.net.mpi_config cimport CMPIConfig from pycylon.net.mpi_config import MPIConfig @@ -82,6 +85,10 @@ cdef class CylonContext: if config.comm_type == CommType.GLOO: return pycylon_unwrap_gloo_config(config) + IF CYTHON_UCX & CYTHON_UCC: + if config.comm_type == CommType.UCX: + return pycylon_unwrap_ucx_config(config) + raise ValueError(f"Unsupported distributed comm config {config}") def get_rank(self) -> int: diff --git a/python/pycylon/pycylon/net/ucx_config.pxd b/python/pycylon/pycylon/net/ucx_config.pxd new file mode 100644 index 000000000..a6f1b36c8 --- /dev/null +++ b/python/pycylon/pycylon/net/ucx_config.pxd @@ -0,0 +1,31 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_UCX & CYTHON_UCC: + from libcpp.memory cimport shared_ptr + + from pycylon.net.comm_type cimport CCommType + from pycylon.net.comm_config cimport CommConfig + + cdef extern from "../../../../cpp/src/cylon/net/ucx/ucx_communicator.hpp" namespace "cylon::net": + cdef cppclass CUCXConfig "cylon::net::UCXConfig": + CCommType Type() + + @ staticmethod + shared_ptr[CUCXConfig] Make(); + + + cdef class UCXConfig(CommConfig): + cdef: + shared_ptr[CUCXConfig] ucx_config_shd_ptr diff --git a/python/pycylon/pycylon/net/ucx_config.pyx b/python/pycylon/pycylon/net/ucx_config.pyx new file mode 100644 index 000000000..ed04440c3 --- /dev/null +++ b/python/pycylon/pycylon/net/ucx_config.pyx @@ -0,0 +1,28 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_UCX & CYTHON_UCC: + from pycylon.net.comm_config cimport CommConfig + from pycylon.net.ucx_config cimport CUCXConfig + + cdef class UCXConfig(CommConfig): + """ + GlooConfig Type mapping from libCylon to PyCylon + """ + def __cinit__(self): + self.ucx_config_shd_ptr = CUCXConfig.Make() + + @property + def comm_type(self): + return self.ucx_config_shd_ptr.get().Type() diff --git a/python/pycylon/setup.py b/python/pycylon/setup.py index 1cb302144..c5923a388 100644 --- a/python/pycylon/setup.py +++ b/python/pycylon/setup.py @@ -147,6 +147,7 @@ _include_dirs.append(os.path.join(UCC_PREFIX, 'include')) macros.append(('BUILD_CYLON_UCX', '1')) macros.append(('BUILD_CYLON_UCC', '1')) + compile_time_env['CYTHON_UCX'] = True compile_time_env['CYTHON_UCC'] = True print('Libraries :', libraries) diff --git a/python/pycylon/test/test_ucx_mpi.py b/python/pycylon/test/test_ucx_mpi.py new file mode 100644 index 000000000..dc1528f5c --- /dev/null +++ b/python/pycylon/test/test_ucx_mpi.py @@ -0,0 +1,44 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test: +>> mpirun -n 4 python -m pytest -q python/pycylon/test/test_ucx_mpi.py +""" + +import pandas as pd +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net.ucx_config import UCXConfig + +ROWS = 5 + + +def test_ucx_mpi(): # confirms that the code is under main function + conf = UCXConfig() + env = CylonEnv(config=conf) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, ROWS, size=(ROWS, 2)) + data2 = rng.integers(0, ROWS, size=(ROWS, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print("Distributed Merge") + df3 = df1.merge(right=df2, on=[0], env=env) + print(f'res len {len(df3)}') + + env.finalize() \ No newline at end of file From 3d86b11350e8ee85f1e75141a9957a8304ddd521 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Mon, 18 Jul 2022 22:00:15 -0400 Subject: [PATCH 05/30] Update setup.py --- python/pycylon/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pycylon/setup.py b/python/pycylon/setup.py index c5923a388..7e4149466 100644 --- a/python/pycylon/setup.py +++ b/python/pycylon/setup.py @@ -132,7 +132,7 @@ macros = [] # compile_time_env serves as preprocessor macros. ref: https://github.com/cython/cython/issues/2488 -compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False} +compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False, 'CYTHON_UCX': False} if CYLON_GLOO: libraries.append('gloo') library_directories.append(os.path.join(GLOO_PREFIX, 'lib')) From d8160eb39a60d3388696d732eba397fbaefd8ce6 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Mon, 18 Jul 2022 22:52:13 -0400 Subject: [PATCH 06/30] Update setup.py --- python/pygcylon/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pygcylon/setup.py b/python/pygcylon/setup.py index be66a2f4c..662d58b25 100644 --- a/python/pygcylon/setup.py +++ b/python/pygcylon/setup.py @@ -163,7 +163,7 @@ def build_extensions(self): packages = find_packages(include=["pygcylon", "pygcylon.*"]) -compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False} +compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False, 'CYTHON_UCX': False} setup( name="pygcylon", packages=packages, From b6d78143b9b574ac94debdf30d1a6a35de151525 Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 19 Jul 2022 15:28:04 -0400 Subject: [PATCH 07/30] adding ucc test --- build.py | 8 ++++++++ python/pycylon/test/test_all.py | 7 +++++++ 2 files changed, 15 insertions(+) diff --git a/build.py b/build.py index cf2eecfa6..91a9dde32 100644 --- a/build.py +++ b/build.py @@ -254,6 +254,14 @@ def python_test(): env['LD_LIBRARY_PATH'] = os.path.join(GLOO_PREFIX, "lib") + os.pathsep + \ env['LD_LIBRARY_PATH'] + + if CYLON_UCC: + env['CYLON_UCC'] = str(CYLON_UCC) + env['UCC_PREFIX'] = UCC_PREFIX + env['LD_LIBRARY_PATH'] = os.path.join(UCC_PREFIX, "lib") + os.pathsep + \ + os.path.join(UCC_PREFIX, "lib", "ucc") + os.pathsep + \ + env['LD_LIBRARY_PATH'] + elif OS_NAME == 'Darwin': if 'DYLD_LIBRARY_PATH' in env: env['DYLD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \ diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index e54f83e86..ca22bd508 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -287,6 +287,13 @@ def test_gloo_mpi(): f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_gloo_mpi.py")) assert responses[-1] == 0 +if os.environ.get('CYLON_UCC') and os.environ.get('CYLON_UCX'): + def test_ucx_mpi(): + print("36. UCX MPI") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_ucx_mpi.py")) + assert responses[-1] == 0 + def test_all(): ar = np.array(responses) From f144e91a5b6a0392378c3877d47a14402fdcbd96 Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 19 Jul 2022 21:41:20 -0400 Subject: [PATCH 08/30] adding multi env test --- python/pycylon/test/test_all.py | 12 +++- .../test/test_mpi_multiple_env_init.py | 55 +++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 python/pycylon/test/test_mpi_multiple_env_init.py diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index ca22bd508..61cba082f 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -287,13 +287,21 @@ def test_gloo_mpi(): f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_gloo_mpi.py")) assert responses[-1] == 0 -if os.environ.get('CYLON_UCC') and os.environ.get('CYLON_UCX'): +if os.environ.get('CYLON_UCC'): def test_ucx_mpi(): - print("36. UCX MPI") + print("37. UCX MPI") responses.append(os.system( f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_ucx_mpi.py")) assert responses[-1] == 0 +if os.environ.get('CYLON_GLOO') and os.environ.get('CYLON_UCC'): + def test_mpi_multiple_env_init(): + print("38. Create and destroy multiple environments in MPI") + responses.append(os.system( + f"{get_mpi_command()} -n 4 python -m pytest " + f"python/pycylon/test/test_mpi_multiple_env_init.py")) + assert responses[-1] == 0 + def test_all(): ar = np.array(responses) diff --git a/python/pycylon/test/test_mpi_multiple_env_init.py b/python/pycylon/test/test_mpi_multiple_env_init.py new file mode 100644 index 000000000..76aff0e6a --- /dev/null +++ b/python/pycylon/test/test_mpi_multiple_env_init.py @@ -0,0 +1,55 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Run test +>> mpirun -n 2 python -m pytest --with-mpi -q python/pycylon/test/test_mpi_multiple_env_init.py +""" +import pandas as pd +import pytest +from mpi4py import MPI +from numpy.random import default_rng + +from pycylon import CylonEnv, DataFrame +from pycylon.net import MPIConfig +from pycylon.net.gloo_config import GlooMPIConfig +from pycylon.net.ucx_config import UCXConfig + + +def create_and_destroy_env(conf): + env = CylonEnv(config=conf) + print(f"{conf.comm_type} rank: {env.rank} world size: {env.world_size}") + r = 10 + + rng = default_rng() + data1 = rng.integers(0, r, size=(r, 2)) + data2 = rng.integers(0, r, size=(r, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + print(len(df1.merge(right=df2, on=[0], env=env))) + env.finalize() + + +@pytest.mark.mpi +def test_mpi_multiple_env_init(): + comm = MPI.COMM_WORLD + rank = comm.rank + world_sz = comm.size + print(f"mpi rank: {rank} world size: {world_sz}") + + create_and_destroy_env(MPIConfig()) + create_and_destroy_env(GlooMPIConfig()) + create_and_destroy_env(UCXConfig()) From 8caecc917c9e784ce42dc1a854887b8edec2ffe4 Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 25 Jul 2022 16:44:52 -0400 Subject: [PATCH 09/30] cosmetic changes --- cpp/src/cylon/table.cpp | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index 07964dfc8..d018cf186 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -504,9 +504,8 @@ Status MergeSortedTable(const std::vector> &tables, const std::vector &sort_orders, std::shared_ptr &out) { std::shared_ptr
concatenated; - std::vector table_indices(tables.size()), - table_end_indices(tables.size()); - int acc = 0; + std::vector table_indices(tables.size()), table_end_indices(tables.size()); + int64_t acc = 0; for (size_t i = 0; i < table_indices.size(); i++) { table_indices[i] = acc; acc += tables[i]->Rows(); @@ -515,7 +514,7 @@ Status MergeSortedTable(const std::vector> &tables, RETURN_CYLON_STATUS_IF_FAILED(Merge(tables, concatenated)); - if(concatenated->GetContext()->GetWorldSize() > 4) { + if (concatenated->GetContext()->GetWorldSize() > 4) { return Sort(concatenated, sort_columns, out, sort_orders); } @@ -530,7 +529,7 @@ Status MergeSortedTable(const std::vector> &tables, std::priority_queue, decltype(comp)> pq(comp); - for (size_t i = 0; i < tables.size(); i++) { + for (int i = 0; i < (int) tables.size(); i++) { if (table_indices[i] < table_end_indices[i]) { pq.push(i); } @@ -541,13 +540,9 @@ Status MergeSortedTable(const std::vector> &tables, arrow::Int64Builder filter(pool); RETURN_CYLON_STATUS_IF_ARROW_FAILED(filter.Reserve(concatenated->Rows())); - std::vector temp_v; - while (!pq.empty()) { int t = pq.top(); pq.pop(); - // std::cout<> &tables, } CYLON_ASSIGN_OR_RAISE(auto take_arr, filter.Finish()); - CYLON_ASSIGN_OR_RAISE( - auto take_res, - (arrow::compute::Take(concatenated->get_table(), take_arr))); + CYLON_ASSIGN_OR_RAISE(auto take_res, + (arrow::compute::Take(concatenated->get_table(), take_arr))); out = std::make_shared
(ctx, take_res.table()); From 9afa79ba842b4dab4e3dff8ad38d15dabaf3b2e4 Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 25 Jul 2022 22:28:01 -0400 Subject: [PATCH 10/30] adding regular sampling cython --- python/pycylon/pycylon/api/lib.pxd | 4 ++-- python/pycylon/pycylon/api/lib.pyx | 6 +++--- python/pycylon/pycylon/data/table.pxd | 10 ++++++++-- python/pycylon/pycylon/data/table.pyx | 22 +++++++++++++++------- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/python/pycylon/pycylon/api/lib.pxd b/python/pycylon/pycylon/api/lib.pxd index 97b4a3857..96cad6ca3 100644 --- a/python/pycylon/pycylon/api/lib.pxd +++ b/python/pycylon/pycylon/api/lib.pxd @@ -72,7 +72,7 @@ cdef api CCSVReadOptions pycylon_unwrap_csv_read_options(object csv_read_options cdef api CCSVWriteOptions pycylon_unwrap_csv_write_options(object csv_write_options) -cdef api CSortOptions * pycylon_unwrap_sort_options(object sort_options) +cdef api shared_ptr[CSortOptions] pycylon_unwrap_sort_options(object sort_options) cdef api shared_ptr[CBaseArrowIndex] pycylon_unwrap_base_arrow_index(object base_arrow_index) @@ -92,7 +92,7 @@ cdef api object pycylon_wrap_layout(const CLayout & layout) cdef api object pycylon_wrap_data_type(const shared_ptr[CDataType] & data_type) -cdef api object pycylon_wrap_sort_options(CSortOptions *sort_options) +cdef api object pycylon_wrap_sort_options(const shared_ptr[CSortOptions] &sort_options) cdef api object pycylon_wrap_base_arrow_index(const shared_ptr[CBaseArrowIndex] & base_arrow_index) diff --git a/python/pycylon/pycylon/api/lib.pyx b/python/pycylon/pycylon/api/lib.pyx index 8b69f0c59..67bec2700 100644 --- a/python/pycylon/pycylon/api/lib.pyx +++ b/python/pycylon/pycylon/api/lib.pyx @@ -155,13 +155,13 @@ cdef api shared_ptr[CDataType] pycylon_unwrap_data_type(object data_type): else: raise ValueError('Passed object is not an instance of DataType') -cdef api CSortOptions * pycylon_unwrap_sort_options(object sort_options): +cdef api shared_ptr[CSortOptions] pycylon_unwrap_sort_options(object sort_options): cdef SortOptions so if pyclon_is_sort_options(sort_options): so = sort_options return so.thisPtr else: - raise ValueError('Passed object is not an instance of DataType') + raise ValueError('Passed object is not an instance of SortOptions') cdef api shared_ptr[CBaseArrowIndex] pycylon_unwrap_base_arrow_index(object base_arrow_index): cdef BaseArrowIndex bi @@ -206,7 +206,7 @@ cdef api object pycylon_wrap_data_type(const shared_ptr[CDataType] & cdata_type) data_type.init(cdata_type) return data_type -cdef api object pycylon_wrap_sort_options(CSortOptions *csort_options): +cdef api object pycylon_wrap_sort_options(const shared_ptr[CSortOptions] &csort_options): cdef SortOptions sort_options = SortOptions.__new__(SortOptions) sort_options.init(csort_options) return sort_options diff --git a/python/pycylon/pycylon/data/table.pxd b/python/pycylon/pycylon/data/table.pxd index 228fae890..084454edf 100644 --- a/python/pycylon/pycylon/data/table.pxd +++ b/python/pycylon/pycylon/data/table.pxd @@ -127,16 +127,22 @@ cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": cdef cppclass CSortOptions "cylon::SortOptions": + enum CSortMethod: + CREGULAR_SAMPLE 'cylon::SortOptions::SortMethod::REGULAR_SAMPLE', + CINITIAL_SAMPLE 'cylon::SortOptions::SortMethod::INITIAL_SAMPLE' + int num_bins long num_samples + CSortMethod sort_method + @ staticmethod CSortOptions Defaults() cdef class SortOptions: cdef: - CSortOptions *thisPtr - void init(self, CSortOptions *csort_options) + shared_ptr[CSortOptions] thisPtr + void init(self, const shared_ptr[CSortOptions] &csort_options) cdef class Table: cdef: diff --git a/python/pycylon/pycylon/data/table.pyx b/python/pycylon/pycylon/data/table.pyx index 356baf83e..9144ebbcc 100644 --- a/python/pycylon/pycylon/data/table.pyx +++ b/python/pycylon/pycylon/data/table.pyx @@ -471,7 +471,7 @@ cdef class Table: """ cdef shared_ptr[CTable] output - cdef CSortOptions *csort_options + cdef shared_ptr[CSortOptions] csort_options cdef vector[int] sort_index cdef vector[cpp_bool] order_directions @@ -507,7 +507,7 @@ cdef class Table: else: csort_options = pycylon_unwrap_sort_options(SortOptions(0, 0)) cdef CStatus status = DistributedSort(self.table_shd_ptr, sort_index, output, - order_directions, csort_options[0]) + order_directions, csort_options.get()[0]) if status.is_ok(): return pycylon_wrap_table(output) else: @@ -2728,7 +2728,7 @@ cdef class SortOptions: """ Sort Operations for Distributed Sort """ - def __cinit__(self, num_bins: int = 0, num_samples: int = 0): + def __cinit__(self, num_bins: int = 0, num_samples: int = 0, sampling: str = 'regular'): ''' Initializes the CSortOptions struct Args: @@ -2738,9 +2738,17 @@ cdef class SortOptions: Returns: None ''' - self.thisPtr = new CSortOptions() - self.thisPtr.num_bins = num_bins - self.thisPtr.num_samples = num_samples + self.thisPtr = make_shared[CSortOptions]() + self.thisPtr.get().num_bins = num_bins + self.thisPtr.get().num_samples = num_samples - cdef void init(self, CSortOptions *csort_options): + sampling = sampling.lower() + if sampling == 'regular': + self.thisPtr.get().sort_method = CSortOptions.CSortMethod.CREGULAR_SAMPLE + elif sampling == 'initial': + self.thisPtr.get().sort_method = CSortOptions.CSortMethod.CINITIAL_SAMPLE + else: + raise ValueError(f'unknown sampling method for sorting: {sampling}') + + cdef void init(self, const shared_ptr[CSortOptions] &csort_options): self.thisPtr = csort_options From 72b83a9084374aabaae48dc6342dd690f269ddfd Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 2 Aug 2022 21:26:06 -0400 Subject: [PATCH 11/30] adding UCC barrier --- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 27 +++++++++++++++++++++- cpp/test/CMakeLists.txt | 3 +++ cpp/test/sync_comms_test.cpp | 13 +++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 677736dec..0e1771a83 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -27,6 +27,8 @@ namespace cylon { namespace net { +static constexpr int kBarrierFlag = UINT32_MAX; + void mpi_check_and_finalize() { int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -336,7 +338,30 @@ void UCXUCCCommunicator::Finalize() { } void UCXUCCCommunicator::Barrier() { - return ucx_comm_->Barrier(); + ucc_coll_args_t args_; + ucc_coll_req_h req; + + args_.mask = 0; + args_.coll_type = UCC_COLL_TYPE_BARRIER; + + ucc_status_t status; + status = ucc_collective_init(&args_, &req, uccTeam); + assert(status == UCC_OK); + + status = ucc_collective_post(req); + assert(status == UCC_OK); + + status = ucc_collective_test(req); + while (status > UCC_OK) { // loop until status == 0 + status = ucc_context_progress(uccContext); + assert(status >= UCC_OK); // should be 0, 1 or 2 + + status = ucc_collective_test(req); + } + assert(status == UCC_OK); + + status = ucc_collective_finalize(req); + assert(status == UCC_OK); } Status UCXUCCCommunicator::AllGather(const std::shared_ptr
&table, diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index b709ec1c8..2db81f853 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -238,5 +238,8 @@ if (CYLON_UCX) cylon_run_test(aggregate_test 1 ucx) cylon_run_test(aggregate_test 2 ucx) cylon_run_test(aggregate_test 4 ucx) + + cylon_run_test(sync_comms_test 1 ucx) + cylon_run_test(sync_comms_test 4 ucx) endif (CYLON_UCC) endif (CYLON_UCX) \ No newline at end of file diff --git a/cpp/test/sync_comms_test.cpp b/cpp/test/sync_comms_test.cpp index 76911e1ca..33f1286e6 100644 --- a/cpp/test/sync_comms_test.cpp +++ b/cpp/test/sync_comms_test.cpp @@ -18,6 +18,14 @@ namespace cylon { namespace test { +TEST_CASE("barrier", "[sync comms]") { + srand((unsigned) time(nullptr)); + int i = (rand() % 3) + 1; + + sleep(i); + ctx->Barrier(); +} + enum GenType { Empty, Null, NonEmpty }; void generate_table(std::shared_ptr *schema, @@ -113,6 +121,11 @@ TEST_CASE("all gather table", "[sync comms]") { } TEST_CASE("gather table", "[sync comms]") { + // todo: UCC doesnt support gatherv for the moment + if (ctx->GetCommType() == net::UCX){ + return; + } + std::shared_ptr schema; std::shared_ptr in_table; generate_table(&schema, &in_table); From dc95f400e9dcb1fb62135320f97ca9f6306b323b Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 2 Aug 2022 21:34:56 -0400 Subject: [PATCH 12/30] adding macos11 tag for CI --- .github/workflows/macos.yml | 2 +- cpp/test/sync_comms_test.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 24c6a49ad..775a10979 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -20,7 +20,7 @@ jobs: fail-fast: false matrix: include: - - os: macos-10.15 + - os: macos-11 steps: - uses: actions/checkout@v2 diff --git a/cpp/test/sync_comms_test.cpp b/cpp/test/sync_comms_test.cpp index 33f1286e6..a6d69caea 100644 --- a/cpp/test/sync_comms_test.cpp +++ b/cpp/test/sync_comms_test.cpp @@ -121,7 +121,7 @@ TEST_CASE("all gather table", "[sync comms]") { } TEST_CASE("gather table", "[sync comms]") { - // todo: UCC doesnt support gatherv for the moment + // todo: UCC doesnt support gatherv for the moment #599 if (ctx->GetCommType() == net::UCX){ return; } From 30ffb1a637cefbb4cbec36bd3a4cd1d3da556190 Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 2 Aug 2022 21:56:26 -0400 Subject: [PATCH 13/30] fixing windows error --- cpp/test/sync_comms_test.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/test/sync_comms_test.cpp b/cpp/test/sync_comms_test.cpp index a6d69caea..62d810cb6 100644 --- a/cpp/test/sync_comms_test.cpp +++ b/cpp/test/sync_comms_test.cpp @@ -12,6 +12,8 @@ * limitations under the License. */ #include +#include +#include #include "common/test_header.hpp" @@ -20,9 +22,9 @@ namespace test { TEST_CASE("barrier", "[sync comms]") { srand((unsigned) time(nullptr)); - int i = (rand() % 3) + 1; + int i = (rand() % 2000) + 1; - sleep(i); + std::this_thread::sleep_for(std::chrono::duration(i)); ctx->Barrier(); } From ef1af26f8b40b66b4e8aed12d7343585b31d1967 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Tue, 2 Aug 2022 23:32:10 -0400 Subject: [PATCH 14/30] trying to fix macos ci --- conda/environments/cylon_MacOS.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conda/environments/cylon_MacOS.yml b/conda/environments/cylon_MacOS.yml index 8af49b18a..e1c2a797d 100644 --- a/conda/environments/cylon_MacOS.yml +++ b/conda/environments/cylon_MacOS.yml @@ -12,7 +12,7 @@ dependencies: - numpy>=1.16 - pandas>=1.0 - fsspec - - setuptools>=40.0,<60.0 + - setuptools>=40.0 # they are not needed for using pygcylon or compiling it - pytest - pytest-mpi From cda5c2cd3354a6ddf05d6adfcf327c3e9f7fffc5 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 3 Aug 2022 12:41:04 -0400 Subject: [PATCH 15/30] trying to fix macos issue --- .github/workflows/macos.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 775a10979..b4e43726a 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -20,7 +20,7 @@ jobs: fail-fast: false matrix: include: - - os: macos-11 + - os: macos-12 steps: - uses: actions/checkout@v2 @@ -41,4 +41,4 @@ jobs: run: python build.py --cpp --python --test - name: Run pytest - run: python build.py --pytest \ No newline at end of file + run: python build.py --pytest From e524415c64035f9172604a370db1b46f8f12080c Mon Sep 17 00:00:00 2001 From: niranda Date: Wed, 3 Aug 2022 15:09:02 -0400 Subject: [PATCH 16/30] Revert "trying to fix macos issue" This reverts commit cda5c2cd3354a6ddf05d6adfcf327c3e9f7fffc5. --- .github/workflows/macos.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index b4e43726a..775a10979 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -20,7 +20,7 @@ jobs: fail-fast: false matrix: include: - - os: macos-12 + - os: macos-11 steps: - uses: actions/checkout@v2 @@ -41,4 +41,4 @@ jobs: run: python build.py --cpp --python --test - name: Run pytest - run: python build.py --pytest + run: python build.py --pytest \ No newline at end of file From c31c7e5fb0735626621f6e7dbc34525b5e3ddd89 Mon Sep 17 00:00:00 2001 From: niranda Date: Wed, 3 Aug 2022 15:28:40 -0400 Subject: [PATCH 17/30] attempting to fix macos ci --- build.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.py b/build.py index 91a9dde32..71cd4c857 100644 --- a/build.py +++ b/build.py @@ -287,7 +287,7 @@ def build_python(): conda_prefix = check_conda_prefix() - python_build_command = f'{PYTHON_EXEC} setup.py install --force' + python_build_command = f'{PYTHON_EXEC} -m pip install -v --upgrade .' env = os.environ env["CYLON_PREFIX"] = str(BUILD_DIR) if os.name == 'posix': From 5e6780634c70c447f4e283bbc9d4eb68ba3e4466 Mon Sep 17 00:00:00 2001 From: niranda Date: Thu, 4 Aug 2022 11:40:37 -0400 Subject: [PATCH 18/30] style-check --- build.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/build.py b/build.py index 71cd4c857..8e88415bc 100644 --- a/build.py +++ b/build.py @@ -107,9 +107,6 @@ def on_off(arg): RUN_CPP_TESTS = args.test RUN_PYTHON_TESTS = args.pytest CMAKE_FLAGS = args.cmake_flags -CPPLINT_COMMAND = "\"-DCMAKE_CXX_CPPLINT=cpplint;--linelength=100;--headers=h," \ - "hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" " if \ - args.style_check else " " # arrow build expects /s even on windows BUILD_PYTHON = args.python @@ -127,6 +124,16 @@ def on_off(arg): PYTHON_EXEC = sys.executable +if args.style_check: + cmd = f'{PYTHON_EXEC} -m pip install cpplint' + res = subprocess.run(cmd, shell=True, cwd=PYTHON_SOURCE_DIR) + check_status(res.returncode, "cpplint install") + + CPPLINT_COMMAND = "-DCMAKE_CXX_CPPLINT=\"cpplint;--linelength=100;--headers=h," \ + "hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" " +else: + CPPLINT_COMMAND = " " + CMAKE_BOOL_FLAGS = {'CYLON_GLOO', 'CYLON_UCX', 'CYLON_UCC'} CMAKE_FALSE_OPTIONS = {'0', 'FALSE', 'OFF', 'N', 'NO', 'IGNORE', 'NOTFOUND'} From c46306436c697e6bb612a904a96ebf8fd9a0ddf1 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Thu, 11 Aug 2022 10:55:17 -0400 Subject: [PATCH 19/30] adding gloo timeout --- cpp/src/cylon/net/gloo/gloo_communicator.cpp | 14 +++++++++++++- cpp/src/cylon/net/gloo/gloo_communicator.hpp | 5 +++++ python/pycylon/pycylon/net/gloo_config.pxd | 1 + python/pycylon/pycylon/net/gloo_config.pyx | 6 ++++++ 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.cpp b/cpp/src/cylon/net/gloo/gloo_communicator.cpp index 7af55b28f..12d880398 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.cpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.cpp @@ -57,6 +57,9 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, } else { // MPI is not initialized. Ask gloo to initialize MPI gloo_ctx = gloo::mpi::Context::createManaged(); } + if (gloo_config->timeout_ != kTimoutNotSet) { + gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + } ((gloo::mpi::Context &) *gloo_ctx).connectFullMesh(dev); #else return {Code::Invalid, "Gloo does not contain mpi headers!"}; @@ -69,8 +72,10 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, gloo_ctx = std::make_shared(gloo_config->rank_, gloo_config->world_size_); + if (gloo_config->timeout_ != kTimoutNotSet) { + gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + } ((gloo::rendezvous::Context &) *gloo_ctx).connectFullMesh(*prefix_store, dev); - } *out = std::make_shared(pool, std::move(gloo_ctx)); return Status::OK(); @@ -193,5 +198,12 @@ int GlooConfig::world_size() const { return world_size_; } +void GlooConfig::SetTimeout(int timeout) { + GlooConfig::timeout_ = std::chrono::seconds(timeout); +} +const std::chrono::seconds &GlooConfig::timeout() const { + return GlooConfig::timeout_; +} + } } diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index ccfd85253..c7523e00a 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -33,6 +33,8 @@ namespace net { class GlooCommunicator; +static constexpr std::chrono::seconds kTimoutNotSet(0); + class GlooConfig : public CommConfig { public: explicit GlooConfig(int rank = 0, int world_size = 1, bool use_mpi = false); @@ -43,7 +45,9 @@ class GlooConfig : public CommConfig { int rank() const; int world_size() const; + const std::chrono::seconds &timeout() const; + void SetTimeout(int timeout); void SetTcpHostname(const std::string &tcp_hostname); void SetTcpIface(const std::string &tcp_iface); void SetTcpAiFamily(int tcp_ai_family); @@ -63,6 +67,7 @@ class GlooConfig : public CommConfig { int rank_; int world_size_; bool use_mpi_; + std::chrono::seconds timeout_ = kTimoutNotSet; #ifdef GLOO_USE_MPI /* diff --git a/python/pycylon/pycylon/net/gloo_config.pxd b/python/pycylon/pycylon/net/gloo_config.pxd index dbec281cf..f24b68ae8 100644 --- a/python/pycylon/pycylon/net/gloo_config.pxd +++ b/python/pycylon/pycylon/net/gloo_config.pxd @@ -31,6 +31,7 @@ IF CYTHON_GLOO: void SetTcpAiFamily(int tcp_ai_family) void SetFileStorePath(const string & file_store_path) void SetStorePrefix(const string & store_prefix) + void SetTimeout(int timeout) @staticmethod shared_ptr[CGlooConfig] MakeWithMpi(MPI_Comm comm); diff --git a/python/pycylon/pycylon/net/gloo_config.pyx b/python/pycylon/pycylon/net/gloo_config.pyx index 755a55586..8a7534fe4 100644 --- a/python/pycylon/pycylon/net/gloo_config.pyx +++ b/python/pycylon/pycylon/net/gloo_config.pyx @@ -47,6 +47,9 @@ IF CYTHON_GLOO: def set_tcp_ai_family(self, ai_family: int): self.gloo_config_shd_ptr.get().SetTcpAiFamily(ai_family) + def set_timeout(self, timeout: int): + self.gloo_config_shd_ptr.get().SetTimeout(timeout) + cdef class GlooStandaloneConfig(CommConfig): """ GlooConfig Type mapping from libCylon to PyCylon @@ -82,3 +85,6 @@ IF CYTHON_GLOO: def set_store_prefix(self, prefix: str): self.gloo_config_shd_ptr.get().SetStorePrefix(prefix.encode()) + + def set_timeout(self, timeout: int): + self.gloo_config_shd_ptr.get().SetTimeout(timeout) \ No newline at end of file From ffa89eefdc208f62f39ee004439753374a091cc3 Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 15 Aug 2022 21:46:54 -0400 Subject: [PATCH 20/30] adding custom mpiexec cmake var --- build.py | 6 +++++- cpp/CMakeLists.txt | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/build.py b/build.py index 8e88415bc..8506cf733 100644 --- a/build.py +++ b/build.py @@ -89,6 +89,8 @@ def check_conda_prefix(): parser.add_argument("-ipath", help='Install directory') parser.add_argument("--verbose", help='Set verbosity', default=False, action="store_true") +parser.add_argument("-j", help='Parallel build threads', default=os.cpu_count(), + dest='parallel', type=int) args = parser.parse_args() @@ -107,6 +109,7 @@ def on_off(arg): RUN_CPP_TESTS = args.test RUN_PYTHON_TESTS = args.pytest CMAKE_FLAGS = args.cmake_flags +PARALLEL = args.parallel # arrow build expects /s even on windows BUILD_PYTHON = args.python @@ -176,6 +179,7 @@ def print_line(): logger.info(f"Python exec : {PYTHON_EXEC}") logger.info(f"Build mode : {CPP_BUILD_MODE}") logger.info(f"Build path : {BUILD_DIR}") +logger.info(f"Build threads : {PARALLEL}") logger.info(f"Install path : {INSTALL_DIR}") logger.info(f"CMake flags : {CMAKE_FLAGS}") logger.info(f" -CYLON_GLOO : {CYLON_GLOO}") @@ -213,7 +217,7 @@ def build_cpp(): res = subprocess.call(cmake_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake generate") - cmake_build_command = f'cmake --build . --parallel {os.cpu_count()} --config {CPP_BUILD_MODE}' + cmake_build_command = f'cmake --build . --parallel {PARALLEL} --config {CPP_BUILD_MODE}' logger.info(f"Build command: {cmake_build_command}") res = subprocess.call(cmake_build_command, cwd=BUILD_DIR, shell=True) check_status(res, "C++ cmake build") diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d7337f0b4..9b6e7c2ec 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -155,6 +155,10 @@ message("Finding MPI") find_package(MPI REQUIRED COMPONENTS CXX) message(STATUS "MPI include dir: ${MPI_CXX_INCLUDE_PATH}") message(STATUS "MPI libs: ${MPI_CXX_LIBRARIES}") + +if (CYLON_CUSTOM_MPIRUN) + set(MPIEXEC_EXECUTABLE ${CYLON_CUSTOM_MPIRUN}) +endif (CYLON_CUSTOM_MPIRUN) message(STATUS "MPI executable: ${MPIEXEC_EXECUTABLE}") include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH}) From d6e0d696d531ef3656702f188d15cdd3ebee133c Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 22 Aug 2022 12:52:19 -0400 Subject: [PATCH 21/30] minor change --- cpp/src/cylon/net/gloo/gloo_communicator.cpp | 4 ++-- cpp/src/cylon/net/gloo/gloo_communicator.hpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.cpp b/cpp/src/cylon/net/gloo/gloo_communicator.cpp index 12d880398..a33ebe578 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.cpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.cpp @@ -58,7 +58,7 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, gloo_ctx = gloo::mpi::Context::createManaged(); } if (gloo_config->timeout_ != kTimoutNotSet) { - gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + gloo_ctx->setTimeout(gloo_config->timeout_); } ((gloo::mpi::Context &) *gloo_ctx).connectFullMesh(dev); #else @@ -73,7 +73,7 @@ Status GlooCommunicator::Make(const std::shared_ptr &config, gloo_ctx = std::make_shared(gloo_config->rank_, gloo_config->world_size_); if (gloo_config->timeout_ != kTimoutNotSet) { - gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_)); + gloo_ctx->setTimeout(gloo_config->timeout_); } ((gloo::rendezvous::Context &) *gloo_ctx).connectFullMesh(*prefix_store, dev); } diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index c7523e00a..d17fe95ed 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -33,7 +33,7 @@ namespace net { class GlooCommunicator; -static constexpr std::chrono::seconds kTimoutNotSet(0); +static constexpr std::chrono::seconds kTimoutNotSet = std::chrono::seconds::zero(); class GlooConfig : public CommConfig { public: From 832c4d98434a413217e60d012f3eaefee7f03dbd Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 22 Aug 2022 16:48:42 -0400 Subject: [PATCH 22/30] adding sort options --- python/pycylon/pycylon/frame.py | 136 ++++++++++++++------------------ 1 file changed, 59 insertions(+), 77 deletions(-) diff --git a/python/pycylon/pycylon/frame.py b/python/pycylon/pycylon/frame.py index bc275a76d..26c1e0ca8 100644 --- a/python/pycylon/pycylon/frame.py +++ b/python/pycylon/pycylon/frame.py @@ -86,6 +86,7 @@ def read_csv(filepath: str, use_threads=True, names=None, sep=",", block_size: i return DataFrame(table) + class CylonEnv(object): def __init__(self, config=None, distributed=True) -> None: @@ -117,6 +118,7 @@ def finalize(self): def barrier(self): self._context.barrier() + class GroupByDataFrame(object): def __init__(self, df: DataFrame, by=None, groupby_type: str = 'hash') -> None: super().__init__() @@ -126,7 +128,8 @@ def __init__(self, df: DataFrame, by=None, groupby_type: str = 'hash') -> None: self.groupby_type = groupby_type def __do_groupby(self, op_dict) -> DataFrame: - return DataFrame(self.df.to_table().groupby(self.by, op_dict, groupby_type=self.groupby_type)) + return DataFrame( + self.df.to_table().groupby(self.by, op_dict, groupby_type=self.groupby_type)) def __apply_on_remaining_columns(self, op: str) -> DataFrame: op_dict = {} @@ -299,7 +302,7 @@ def _initialize_dataframe(self, data=None, index=None, columns=None, copy=False, data = data.rename(columns=columns) return data elif not data: - return cn.Table.from_pydict(context, {}) + return cn.Table.from_pydict(context, {}) else: raise ValueError(f"Invalid data structure, {type(data)}") @@ -397,13 +400,13 @@ def equals(self, df: DataFrame, ordered=True, env: CylonEnv = None): df._change_context(env) return self._table.distributed_equals(df._table, ordered) - def repartition(self, rows_per_partition, receive_build_rank_order=None, env: CylonEnv=None): + def repartition(self, rows_per_partition, receive_build_rank_order=None, env: CylonEnv = None): if env != None: self._change_context(env) df = DataFrame(self._table.repartition(rows_per_partition, receive_build_rank_order)) return df - def evenly_partition(self, env: CylonEnv=None): + def evenly_partition(self, env: CylonEnv = None): if env != None: self._change_context(env) return self._table.evenly_partition() @@ -441,7 +444,7 @@ def iloc(self) -> PyLocIndexer: 1 3 6 """ return self._table.iloc - + def __len__(self): """ This operator returns number of rows in the df. @@ -459,7 +462,7 @@ def __len__(self): """ return self._table.row_count - + def set_index(self, key, indexing_type: IndexingType = IndexingType.LINEAR, drop: bool = False): """ Set index of the pycylon DataFrame @@ -484,8 +487,8 @@ def set_index(self, key, indexing_type: IndexingType = IndexingType.LINEAR, drop """ return self._table.set_index(key, indexing_type, drop) - - @property + + @property def index(self): """ Returns the index of the df. @@ -503,7 +506,7 @@ def index(self): """ return self._table.get_index() - + def get_hash_object(self, index=True, encoding="utf8", hash_key=None, categorize=True): """ Returns a data hash of the df. @@ -525,13 +528,14 @@ def get_hash_object(self, index=True, encoding="utf8", hash_key=None, categorize """ pdf = self.to_pandas() - hashed_series = pd.util.hash_pandas_object(pdf, index=index, encoding=encoding, hash_key=hash_key, categorize=categorize) - hashed_pdf = hashed_series.to_frame() - context = CylonContext(config=None, distributed=False) + hashed_series = pd.util.hash_pandas_object(pdf, index=index, encoding=encoding, + hash_key=hash_key, categorize=categorize) + hashed_pdf = hashed_series.to_frame() + context = CylonContext(config=None, distributed=False) return DataFrame(cn.Table.from_pandas(context, hashed_pdf)) - + @property - def values(self) -> np.ndarray: #pandas series.values and here should be same + def values(self) -> np.ndarray: # pandas series.values and here should be same """ Returns Numpy ndarray object representation of the df. @@ -553,13 +557,13 @@ def values(self) -> np.ndarray: #pandas series.values and here should be same if len(self.columns) >= 2: return self.to_numpy() else: - if len(self.columns)==0: - return np.empty(self.shape, dtype=float) + if len(self.columns) == 0: + return np.empty(self.shape, dtype=float) dict = self.to_dict() col = self.columns[0] return np.array(list(dict[col])) - + @property def dtypes(self): """ @@ -582,11 +586,11 @@ def dtypes(self): types = schema.types i = 0 for value in schema.names: - dict[value] = pa.DataType.to_pandas_dtype(types[i]) + dict[value] = pa.DataType.to_pandas_dtype(types[i]) i += 1 return dict - + def select_dtypes(self, include=None, exclude=None) -> DataFrame: """ Return a subset of the df's columns based on the column dtypes. @@ -648,9 +652,8 @@ def select_dtypes(self, include=None, exclude=None) -> DataFrame: include = (include,) if include is not None else () if not isinstance(exclude, (list, tuple)): exclude = (exclude,) if exclude is not None else () - + selection = (frozenset(include), frozenset(exclude)) - if not any(selection): raise ValueError("at least one of include or exclude must be nonempty") @@ -663,7 +666,8 @@ def select_dtypes(self, include=None, exclude=None) -> DataFrame: f"include and exclude overlap on {(include & exclude)}" ) - def extract_unique_dtypes_from_dtypes_set(dtypes_set: FrozenSet[np.generic], unique_dtypes: np.ndarray) -> List[ + def extract_unique_dtypes_from_dtypes_set(dtypes_set: FrozenSet[np.generic], + unique_dtypes: np.ndarray) -> List[ np.generic]: extracted_dtypes = [ unique_dtype @@ -679,15 +683,14 @@ def extract_unique_dtypes_from_dtypes_set(dtypes_set: FrozenSet[np.generic], uni ] return extracted_dtypes - #get unique dtypes from cylon df - set operation + # get unique dtypes from cylon df - set operation unique_set = set(self.dtypes.values()) unique_dtypes = np.array(list(unique_set), dtype=object) - if include: included_dtypes = extract_unique_dtypes_from_dtypes_set(include, unique_dtypes) - - extracted_columns=[ + + extracted_columns = [ column for column, dtype in self.dtypes.items() if dtype in included_dtypes @@ -699,7 +702,7 @@ def extract_unique_dtypes_from_dtypes_set(dtypes_set: FrozenSet[np.generic], uni pass else: excluded_dtypes = extract_unique_dtypes_from_dtypes_set(exclude, unique_dtypes) - + extracted_columns = [ column for column, dtype in self.dtypes.items() @@ -1624,66 +1627,43 @@ def reset_index( # type: ignore[misc] def sort_values( self, by, - axis=0, ascending=True, - inplace=False, - kind="quicksort", - na_position="last", - ignore_index=False, - key=None, - num_samples: int = 0, - num_bins: int = 0, - env: CylonEnv = None - ) -> DataFrame: + env: CylonEnv = None, + **sort_options) -> DataFrame: """ - Sort by the values along either axis. + Sort by the values along columns. Parameters ---------- - - axis : %(axes_single_arg)s, default 0 - Axis to be sorted. + by : str or list of str + Name or list of names to sort by. + if axis is 0 or ‘index’ then by may contain index levels and/or column labels. + if axis is 1 or ‘columns’ then by may contain column levels and/or index labels. ascending : bool or list of bool, default True Sort ascending vs. descending. Specify list for multiple sort orders. If this is a list of bools, must match the length of the by. - inplace(Unsupported) : bool, default False - If True, perform operation in-place. - kind(Unsupported) : {'quicksort', 'mergesort', 'heapsort', 'stable'}, default 'quicksort' - Choice of sorting algorithm. See also :func:`numpy.sort` for more - information. `mergesort` and `stable` are the only stable algorithms. For - DataFrames, this option is only applied when sorting on a single - column or label. - na_position(Unsupported) : {'first', 'last'}, default 'last' - Puts NaNs at the beginning if `first`; `last` puts NaNs at the - end. - ignore_index(Unsupported) : bool, default False - If True, the resulting axis will be labeled 0, 1, …, n - 1. - .. versionadded:: 1.0.0 - key(Unsupported) : callable, optional - Apply the key function to the values - before sorting. This is similar to the `key` argument in the - builtin :meth:`sorted` function, with the notable difference that - this `key` function should be *vectorized*. It should expect a - ``Series`` and return a Series with the same shape as the input. - It will be applied to each column in `by` independently. - .. versionadded:: 1.1.0 - num_samples: int, default 0 - Number of samples to determine key distribution. Only used in a distributed env. Need to pass a - deterministic value common to every process. If num_samples == 0, the value would be handled internally. - num_bins: int, default 0 - Number of bins in the histogram of the key distribution. Only used in a distributed env. Need to pass a - deterministic value common to every process. If num_bins == 0, the value would be handled internally. env: CylonEnv, default (None) - Execution environment used to distinguish between distributed and local operations. default None (local env) + Execution environment used to distinguish between distributed and local operations. + default None (local env) + sort_options: + Options to be passed to SortOptions class + ex: + sampling: str + Sampling algorithm. Choices {'initial', 'regular'} default 'initial' + num_samples: int, default 0 + Number of samples to determine key distribution. Only used in a distributed env. + Need to pass a deterministic value common to every process. If num_samples == 0, + the value would be handled internally. + num_bins: int, default 0 + Number of bins in the histogram of the key distribution. Only used in a distributed + env. Need to pass a deterministic value common to every process. If num_bins == 0, + the value would be handled internally. Returns ------- - DataFrame or None - DataFrame with sorted values or None if ``inplace=True``. - See Also - -------- - DataFrame.sort_index : Sort a DataFrame by the index. - Series.sort_values : Similar method for a Series. + DataFrame + DataFrame with sorted values + Examples -------- >>> df = DataFrame({ @@ -1731,7 +1711,7 @@ def sort_values( if env is None: return DataFrame(self._table.sort(order_by=by, ascending=ascending)) else: - sort_opts = SortOptions(num_bins=num_bins, num_samples=num_samples) + sort_opts = SortOptions(**sort_options) return DataFrame( self._change_context(env)._table.distributed_sort(order_by=by, ascending=ascending, sort_options=sort_opts)) @@ -2162,7 +2142,8 @@ def drop_duplicates(self, subset: Optional[Union[Hashable, Sequence[Hashable]]] return DataFrame(self._change_context(env)._table.distributed_unique(columns=subset, inplace=inplace)) - def groupby(self, by: Union[int, str, List], groupby_type="hash", env: CylonEnv = None) -> GroupByDataFrame: + def groupby(self, by: Union[int, str, List], groupby_type="hash", + env: CylonEnv = None) -> GroupByDataFrame: """ A groupby operation involves some combination of splitting the object, applying a function, and combining the results. This can be used to group large amounts of data and compute operations on these groups. @@ -2304,6 +2285,7 @@ def applymap(self, func, na_action=None) -> DataFrame: """ return DataFrame(self._table.applymap(func)) + # -------------------- staticmethods --------------------------- def concat(objs: Union[Iterable["DataFrame"]], axis=0, join="outer", From b9698e3ef0b91d80717e51b2bc75c8ff79170b59 Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 23 Aug 2022 00:03:15 -0400 Subject: [PATCH 23/30] fixing dist sort empty table bug --- .../cylon/arrow/arrow_partition_kernels.cpp | 91 ++++++++++++------- cpp/src/cylon/net/gloo/gloo_operations.cpp | 84 ++++++++++++++++- cpp/test/CMakeLists.txt | 18 ++-- cpp/test/dist_sort_test.cpp | 40 +++++--- 4 files changed, 179 insertions(+), 54 deletions(-) diff --git a/cpp/src/cylon/arrow/arrow_partition_kernels.cpp b/cpp/src/cylon/arrow/arrow_partition_kernels.cpp index 11b4a353f..4e734b026 100644 --- a/cpp/src/cylon/arrow/arrow_partition_kernels.cpp +++ b/cpp/src/cylon/arrow/arrow_partition_kernels.cpp @@ -482,7 +482,8 @@ class RangePartitionKernel : public PartitionKernel { idx_col, [&](uint64_t global_idx, ValueT val) { uint32_t p = ascending ? - bin_to_partition[get_bin_pos(val)] : num_partitions - 1 - bin_to_partition[get_bin_pos(val)]; + bin_to_partition[get_bin_pos(val)] : num_partitions - 1 + - bin_to_partition[get_bin_pos(val)]; target_partitions[global_idx] = p; partition_histogram[p]++; }, @@ -492,69 +493,95 @@ class RangePartitionKernel : public PartitionKernel { } private: - inline Status build_bin_to_partition(const std::shared_ptr &idx_col, uint32_t num_partitions) { + inline Status build_bin_to_partition(const std::shared_ptr &idx_col, + uint32_t num_partitions) { const std::shared_ptr &data_type = tarrow::ToCylonType(idx_col->type()); std::shared_ptr sampled_array; - if ((uint64_t) idx_col->length() - == num_samples) { // if len == num_samples, dont sample, just use idx col as it is! + // if len == num_samples, dont sample, just use idx col as it is! + if ((uint64_t) idx_col->length() == num_samples) { sampled_array = std::make_shared(idx_col->chunks()); } else { // else, sample idx_col for num_samples std::shared_ptr samples; - RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::SampleArray(idx_col, num_samples, samples, ToArrowPool(ctx))); - sampled_array = std::make_shared(samples); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::SampleArray(idx_col, + num_samples, + samples, + ToArrowPool(ctx))); + sampled_array = std::make_shared(std::move(samples)); } + const auto &comm = ctx->GetCommunicator(); + // calculate minmax of the sample - std::shared_ptr minmax; - RETURN_CYLON_STATUS_IF_FAILED(compute::MinMax(ctx, sampled_array, data_type, minmax)); + // todo: Allreduce MIN {min, -max} array as an optimization. Bool could be a problem. + // might need some template tricks + + std::shared_ptr min_scalar, max_scalar; + if (sampled_array->length() == 0) { + // if no samples present, set min and max to opposite ends so that Allreduce will determine the + // correct value + min_scalar = Scalar::Make(arrow::MakeScalar(std::numeric_limits::max())); + max_scalar = Scalar::Make(arrow::MakeScalar(std::numeric_limits::min()));; + } else { + arrow::compute::ScalarAggregateOptions opts(true, 0); + CYLON_ASSIGN_OR_RAISE(const auto &min_max, arrow::compute::MinMax(sampled_array, opts)) + const auto &min_max_v = min_max.scalar_as().value; + min_scalar = Scalar::Make(min_max_v[0]); + max_scalar = Scalar::Make(min_max_v[1]); + } - const auto &struct_scalar = minmax->GetResult().scalar_as(); - min = std::static_pointer_cast(struct_scalar.value[0])->value; - max = std::static_pointer_cast(struct_scalar.value[1])->value; + std::shared_ptr global_min_scalar, global_max_scalar; + RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(min_scalar, net::MIN, &global_min_scalar)); + RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(max_scalar, net::MAX, &global_max_scalar)); + + min = std::static_pointer_cast(global_min_scalar->data())->value; + max = std::static_pointer_cast(global_max_scalar->data())->value; range = max - min; // create sample histogram std::vector local_counts(num_bins + 2, 0); for (const auto &arr: sampled_array->chunks()) { - const std::shared_ptr &casted_arr = std::static_pointer_cast(arr); - for (int64_t i = 0; i < casted_arr->length(); i++) { - ValueT val = casted_arr->Value(i); - local_counts[get_bin_pos(val)]++; - } + arrow::VisitArrayDataInline(*arr->data(), + [&](ValueT val) { + local_counts[get_bin_pos(val)]++; + }, + []() { + // idx_col is guaranteed to be non-null + }); } // all reduce local sample histograms - std::vector global_counts, *global_counts_ptr; + const uint64_t *global_counts_ptr; + std::shared_ptr global_counts; if (ctx->GetWorldSize() > 1) { // if distributed, all-reduce all local bin counts - global_counts.resize(num_bins + 2, 0); - RETURN_CYLON_STATUS_IF_FAILED(cylon::mpi::AllReduce(ctx, - local_counts.data(), - global_counts.data(), - num_bins + 2, - cylon::UInt64(), - cylon::net::SUM)); - global_counts_ptr = &global_counts; - local_counts.clear(); + arrow::BufferVector buf{nullptr, arrow::Buffer::Wrap(local_counts)}; + const auto + &array_data = arrow::ArrayData::Make(arrow::uint64(), num_bins + 2, std::move(buf), 0); + auto local_counts_col = Column::Make(arrow::MakeArray(array_data)); + + RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(local_counts_col, net::SUM, &global_counts)); + global_counts_ptr = + std::static_pointer_cast(global_counts->data())->raw_values(); } else { // else, just use local bin counts - global_counts_ptr = &local_counts; + global_counts_ptr = local_counts.data(); } float_t quantile = float(1.0 / num_partitions), prefix_sum = 0; - LOG(INFO) << "len=" << idx_col->length() << " min=" << min << " max=" << max << " range=" << - range << " num bins=" << num_bins << " quantile=" << quantile; +// LOG(INFO) << "len=" << idx_col->length() << " min=" << min << " max=" << max << " range=" << +// range << " num bins=" << num_bins << " quantile=" << quantile; // divide global histogram into quantiles const uint64_t total_samples = ctx->GetWorldSize() * num_samples; uint32_t curr_partition = 0; float_t target_quantile = quantile; - for (const auto &c: *global_counts_ptr) { + for (uint32_t i = 0; i < num_bins + 2; i++) { bin_to_partition.push_back(curr_partition); - float_t freq = (float_t) c / total_samples; + float_t freq = (float_t) global_counts_ptr[i] / total_samples; prefix_sum += freq; if (prefix_sum > target_quantile) { - curr_partition += (curr_partition < num_partitions - 1); // if curr_partition < numpartition: curr_partition++ + curr_partition += (curr_partition + < num_partitions - 1); // if curr_partition < numpartition: curr_partition++ target_quantile += quantile; } } diff --git a/cpp/src/cylon/net/gloo/gloo_operations.cpp b/cpp/src/cylon/net/gloo/gloo_operations.cpp index c6dd6226c..30c1755c3 100644 --- a/cpp/src/cylon/net/gloo/gloo_operations.cpp +++ b/cpp/src/cylon/net/gloo/gloo_operations.cpp @@ -168,6 +168,75 @@ Status GlooTableBcastImpl::WaitAll(int32_t num_buffers) { return Status::OK(); } +template +void land(void *c_, const void *a_, const void *b_, size_t n) { + T *c = static_cast(c_); + const T *a = static_cast(a_); + const T *b = static_cast(b_); + for (size_t i = 0; i < n; i++) { + c[i] = a[i] && b[i]; + } +} + +template +void lor(void *c_, const void *a_, const void *b_, size_t n) { + T *c = static_cast(c_); + const T *a = static_cast(a_); + const T *b = static_cast(b_); + for (size_t i = 0; i < n; i++) { + c[i] = a[i] || b[i]; + } +} + +template +struct band_impl {}; + +template +struct band_impl::value>::type> { + static void impl(void *c_, const void *a_, const void *b_, size_t n) { + T *c = static_cast(c_); + const T *a = static_cast(a_); + const T *b = static_cast(b_); + for (size_t i = 0; i < n; i++) { + c[i] = a[i] & b[i]; + } + } + + static gloo::AllreduceOptions::Func band() { + return &impl; + } +}; + +template +struct band_impl::value>::type> { + static gloo::AllreduceOptions::Func band() { return nullptr; } +}; + + +template +struct bor_impl {}; + +template +struct bor_impl::value>::type> { + static void impl(void *c_, const void *a_, const void *b_, size_t n) { + T *c = static_cast(c_); + const T *a = static_cast(a_); + const T *b = static_cast(b_); + for (size_t i = 0; i < n; i++) { + c[i] = a[i] | b[i]; + } + } + + static gloo::AllreduceOptions::Func bor() { + return &impl; + } +}; + +template +struct bor_impl::value>::type> { + static gloo::AllreduceOptions::Func bor() { return nullptr; } +}; + template gloo::AllreduceOptions::Func get_reduce_func(ReduceOp op) { void (*func)(void *, const void *, const void *, size_t); @@ -180,10 +249,10 @@ gloo::AllreduceOptions::Func get_reduce_func(ReduceOp op) { return func; case PROD:func = &gloo::product; return func; - case LAND: - case LOR: - case BAND: - case BOR:return nullptr; + case LAND:return &land; + case LOR:return &lor; + case BAND:return band_impl::band(); + case BOR:return bor_impl::bor(); } return nullptr; } @@ -195,7 +264,12 @@ Status all_reduce_buffer(const std::shared_ptr &ctx, int count, ReduceOp reduce_op) { gloo::AllreduceOptions opts(ctx); - opts.setReduceFunction(get_reduce_func(reduce_op)); + + auto func = get_reduce_func(reduce_op); + if (func == nullptr) { + return {Code::Invalid, "Unsupported reduction operator " + std::to_string(reduce_op)}; + } + opts.setReduceFunction(func); opts.template setInput(const_cast((const T *) send_buf), count); opts.template setOutput((T *) rcv_buf, count); diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index b80d89c17..34a35d7f4 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -220,12 +220,12 @@ if (CYLON_GLOO) cylon_run_test(table_op_test 1 gloo-mpi) cylon_run_test(table_op_test 2 gloo-mpi) cylon_run_test(table_op_test 4 gloo-mpi) - # - # cylon_run_test(equal_test 1 gloo-mpi) - # cylon_run_test(equal_test 2 gloo-mpi) - # - # cylon_run_test(repartition_test 1 gloo-mpi) - # cylon_run_test(repartition_test 4 gloo-mpi) + + cylon_run_test(equal_test 1 gloo-mpi) + cylon_run_test(equal_test 2 gloo-mpi) + + cylon_run_test(repartition_test 1 gloo-mpi) + cylon_run_test(repartition_test 4 gloo-mpi) cylon_run_test(aggregate_test 1 gloo-mpi) cylon_run_test(aggregate_test 2 gloo-mpi) @@ -233,6 +233,9 @@ if (CYLON_GLOO) cylon_run_test(sync_comms_test 1 gloo-mpi) cylon_run_test(sync_comms_test 4 gloo-mpi) + + cylon_run_test(dist_sort_test 1 gloo-mpi) + cylon_run_test(dist_sort_test 4 gloo-mpi) endif (CYLON_GLOO) #### ucc tests @@ -254,5 +257,8 @@ if (CYLON_UCX) cylon_run_test(sync_comms_test 1 ucx) cylon_run_test(sync_comms_test 4 ucx) + +# cylon_run_test(dist_sort_test 1 ucx) +# cylon_run_test(dist_sort_test 4 ucx) endif (CYLON_UCC) endif (CYLON_UCX) \ No newline at end of file diff --git a/cpp/test/dist_sort_test.cpp b/cpp/test/dist_sort_test.cpp index d59534350..78307b0be 100644 --- a/cpp/test/dist_sort_test.cpp +++ b/cpp/test/dist_sort_test.cpp @@ -30,8 +30,7 @@ void testDistSort(const std::vector& sort_cols, {0, 0, SortOptions::INITIAL_SAMPLE})); std::vector> gathered; - CHECK_CYLON_STATUS(ctx->GetCommunicator()->Gather(out, /*root*/0, /*gather_from_root*/true, - &gathered)); + CHECK_CYLON_STATUS(ctx->GetCommunicator()->AllGather(out, &gathered)); if (RANK == 0) { std::shared_ptr
exp, result; @@ -96,7 +95,6 @@ TEMPLATE_LIST_TEST_CASE("Dist sort testing", "[dist sort]", ArrowNumericTypes) { ctx, global_arrow_table->Slice(RANK * rows_per_tab, rows_per_tab), table1)); std::shared_ptr
global_table; - LOG(INFO) << "HERE!!!"; CHECK_CYLON_STATUS( Table::FromArrowTable(ctx, global_arrow_table, global_table)); @@ -118,8 +116,8 @@ TEMPLATE_LIST_TEST_CASE("Dist sort testing", "[dist sort]", ArrowNumericTypes) { auto pool = cylon::ToArrowPool(ctx); std::shared_ptr arrow_empty_table; - auto arrow_status = util::CreateEmptyTable(table1->get_table()->schema(), - &arrow_empty_table, pool); + REQUIRE(util::CreateEmptyTable(table1->get_table()->schema(), + &arrow_empty_table, pool).ok()); auto empty_table = std::make_shared
(ctx, arrow_empty_table); table1 = empty_table; } @@ -127,17 +125,37 @@ TEMPLATE_LIST_TEST_CASE("Dist sort testing", "[dist sort]", ArrowNumericTypes) { std::shared_ptr
out, out2; auto ctx = table1->GetContext(); std::shared_ptr arrow_output; - auto status = DistributedSort(table1, {1, 0}, out, {0, 0}); - REQUIRE(status.is_ok()); - status = DistributedSort(table1, {1, 0}, out2, {0, 0}, - {0, 0, SortOptions::INITIAL_SAMPLE}); - REQUIRE(status.is_ok()); + CHECK_CYLON_STATUS(DistributedSort(table1, {1, 0}, out, {0, 0})); + CHECK_CYLON_STATUS(DistributedSort(table1, {1, 0}, out2, {0, 0}, + {0, 0, SortOptions::INITIAL_SAMPLE})); bool eq; - status = DistributedEquals(out, out2, eq); + CHECK_CYLON_STATUS(DistributedEquals(out, out2, eq)); REQUIRE(eq); } } +TEMPLATE_LIST_TEST_CASE("Dist sort empty table", "[dist sort]", ArrowNumericTypes) { + auto type = default_type_instance(); + auto schema = arrow::schema({{arrow::field("a", type)}, + {arrow::field("b", arrow::float32())}}); + auto empty_table = TableFromJSON(schema, {R"([])"}); + std::shared_ptr
table1; + + CHECK_CYLON_STATUS(Table::FromArrowTable(ctx, empty_table, table1)); + bool eq; + + std::shared_ptr
out; + // todo: check this for regular sampling! https://github.com/cylondata/cylon/issues/611 +// CHECK_CYLON_STATUS(DistributedSort(table1, {1, 0}, out, {0, 0})); +// CHECK_CYLON_STATUS(DistributedEquals(table1, out, eq)); +// REQUIRE(eq); + + CHECK_CYLON_STATUS(DistributedSort(table1, {1, 0}, out, {0, 0}, + {0, 0, SortOptions::INITIAL_SAMPLE})); + CHECK_CYLON_STATUS(Equals(table1, out, eq)); + REQUIRE(eq); +} + TEST_CASE("Binary search testing", "[binary search]") { auto schema = arrow::schema({{arrow::field("a", arrow::uint32())}, {arrow::field("b", arrow::float32())}}); From ec8c09ad660ad73b525259321adceb8d3adff595 Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 23 Aug 2022 16:21:19 -0400 Subject: [PATCH 24/30] fixing regular sampling bug --- cpp/src/cylon/table.cpp | 6 +++--- cpp/src/cylon/util/macros.hpp | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index 507f72f45..ae6d29cc6 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -472,13 +472,13 @@ Status SampleTableUniform(const std::shared_ptr
&local_sorted, return Status::OK(); } - float step = local_sorted->Rows() / (num_samples + 1.0); - float acc = step; + double step = local_sorted->Rows() / (num_samples + 1.0); + double acc = step; arrow::Int64Builder filter(pool); RETURN_CYLON_STATUS_IF_ARROW_FAILED(filter.Reserve(num_samples)); for (int i = 0; i < num_samples; i++) { - filter.UnsafeAppend(acc); + filter.UnsafeAppend((int64_t) acc); acc += step; } diff --git a/cpp/src/cylon/util/macros.hpp b/cpp/src/cylon/util/macros.hpp index ffa348629..4b9c2ed1c 100644 --- a/cpp/src/cylon/util/macros.hpp +++ b/cpp/src/cylon/util/macros.hpp @@ -31,7 +31,8 @@ do{ \ const auto& _st = (expr); \ if (!_st.ok()) { \ - return cylon::Status(static_cast(_st.code()), _st.message()); \ + return cylon::Status(static_cast(_st.code()), _st.message() \ + + "\n" + __FILE__ + ":" + std::to_string(__LINE__) ); \ }; \ } while (0) @@ -43,7 +44,8 @@ int _err_str_len = 0; \ MPI_Error_string(_st, _err_str, &_err_str_len); \ return cylon::Status(cylon::Code::ExecutionError, \ - std::string(_err_str, _err_str_len)); \ + std::string(_err_str, _err_str_len)\ + + "\n" + __FILE__ + ":" + std::to_string(__LINE__)); \ }; \ } while (0) @@ -53,7 +55,8 @@ if (_st < 0) { \ return cylon::Status( \ cylon::Code::ExecutionError, \ - "UCC Failed: " + std::string(ucc_status_string(_st))); \ + "UCC Failed: " + std::string(ucc_status_string(_st))\ + + "\n" + __FILE__ + ":" + std::to_string(__LINE__)); \ }; \ } while (0) From bc4d7556fe33de5b628386f33d6dd99020a028ed Mon Sep 17 00:00:00 2001 From: niranda Date: Sat, 27 Aug 2022 17:48:55 -0400 Subject: [PATCH 25/30] adding licence --- cpp/src/cylon/mapreduce/mapreduce.cpp | 16 +++++++++++++--- cpp/src/cylon/mapreduce/mapreduce.hpp | 16 +++++++++++++--- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/cpp/src/cylon/mapreduce/mapreduce.cpp b/cpp/src/cylon/mapreduce/mapreduce.cpp index ad2c94633..74bf7a206 100644 --- a/cpp/src/cylon/mapreduce/mapreduce.cpp +++ b/cpp/src/cylon/mapreduce/mapreduce.cpp @@ -1,6 +1,16 @@ -// -// Created by niranda on 11/22/21. -// +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #include #include "cylon/mapreduce/mapreduce.hpp" diff --git a/cpp/src/cylon/mapreduce/mapreduce.hpp b/cpp/src/cylon/mapreduce/mapreduce.hpp index ea342c682..9064a0806 100644 --- a/cpp/src/cylon/mapreduce/mapreduce.hpp +++ b/cpp/src/cylon/mapreduce/mapreduce.hpp @@ -1,6 +1,16 @@ -// -// Created by niranda on 11/22/21. -// +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef CYLON_CPP_SRC_CYLON_MAPREDUCE_MAPREDUCE_HPP_ #define CYLON_CPP_SRC_CYLON_MAPREDUCE_MAPREDUCE_HPP_ From ec28db27b91b64ddf4c16fbfab2fb8cefefc53c4 Mon Sep 17 00:00:00 2001 From: niranda Date: Sat, 27 Aug 2022 22:55:17 -0400 Subject: [PATCH 26/30] adding bench logging --- cpp/src/cylon/CMakeLists.txt | 11 ++-- cpp/src/cylon/table.cpp | 94 +++++++++++------------------------ cpp/src/cylon/util/macros.hpp | 17 +++++++ 3 files changed, 54 insertions(+), 68 deletions(-) diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index 332ea6f7d..637fb69b2 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -14,11 +14,16 @@ find_package(Threads REQUIRED) -IF(WIN32) +if (CYLON_BENCH) + message("Benchmark timings enabled") + add_definitions(-D_CYLON_BENCH) +endif () + +if (WIN32) set_source_files_properties(util/murmur3.cpp util/murmur3.hpp PROPERTIES) -ELSE() +else () set_source_files_properties(util/murmur3.cpp util/murmur3.hpp PROPERTIES COMPILE_FLAGS -Wno-implicit-fallthrough) -ENDIF() +endif (WIN32) if (CYLON_UCX) set(UCX_CYLON_FILES diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index ae6d29cc6..23f2cb058 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -37,41 +37,11 @@ #include #include #include -#include #include #include -#include namespace cylon { -/** - * creates an Arrow array based on col_idx, filtered by row_indices - * @param ctx - * @param table - * @param col_idx - * @param row_indices - * @param array_vector - * @return - */ -Status PrepareArray(std::shared_ptr &ctx, - const std::shared_ptr &table, const int32_t col_idx, - const std::vector &row_indices, arrow::ArrayVector &array_vector) { - std::shared_ptr destination_col_array; - arrow::Status ar_status = - cylon::util::copy_array_by_indices(row_indices, - cylon::util::GetChunkOrEmptyArray(table->column(col_idx), - 0), - &destination_col_array, - cylon::ToArrowPool(ctx)); - if (ar_status != arrow::Status::OK()) { - LOG(FATAL) << "Failed while copying a column to the final table from tables." - << ar_status.ToString(); - return Status(static_cast(ar_status.code()), ar_status.message()); - } - array_vector.push_back(destination_col_array); - return Status::OK(); -} - static inline Status all_to_all_arrow_tables(const std::shared_ptr &ctx, const std::shared_ptr &schema, const std::vector> &partitioned_tables, @@ -126,10 +96,7 @@ static inline Status all_to_all_arrow_tables(const std::shared_ptr partitioned_tables.clear();*/ // now we have the final set of tables -// LOG(INFO) << "Concatenating tables, Num of tables : " << received_tables.size(); CYLON_ASSIGN_OR_RAISE(auto concat, arrow::ConcatenateTables(received_tables)) -// LOG(INFO) << "Done concatenating tables, rows : " << concat->num_rows(); - CYLON_ASSIGN_OR_RAISE(table_out, concat->CombineChunks(cylon::ToArrowPool(ctx))) return Status::OK(); } @@ -194,9 +161,9 @@ static inline Status all_to_all_arrow_tables_separated_cylon_table(const std::sh all_to_all_arrow_tables_separated_arrow_table(ctx, schema, partitioned_tables, received_tables); table_out.reserve(received_tables.size() - 1); - for(size_t i = 0; i < received_tables.size(); i++) { - if(received_tables[i]->num_rows() > 0) { - CYLON_ASSIGN_OR_RAISE(auto arrow_tb, received_tables[i]->CombineChunks(cylon::ToArrowPool(ctx))); + for (auto &received_table: received_tables) { + if (received_table->num_rows() > 0) { + CYLON_ASSIGN_OR_RAISE(auto arrow_tb, received_table->CombineChunks(cylon::ToArrowPool(ctx))); auto temp = std::make_shared
(ctx, std::move(arrow_tb)); table_out.push_back(temp); } @@ -214,10 +181,7 @@ static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_pt std::shared_ptr &table_out) { std::vector> tables; RETURN_CYLON_STATUS_IF_FAILED(all_to_all_arrow_tables_separated_arrow_table(ctx, schema, partitioned_tables, tables)); - LOG(INFO) << "Concatenating tables, Num of tables : " << tables.size(); CYLON_ASSIGN_OR_RAISE(table_out, arrow::ConcatenateTables(tables)); - LOG(INFO) << "Done concatenating tables, rows : " << table_out->num_rows(); - return Status::OK(); } @@ -255,23 +219,16 @@ static inline Status shuffle_two_tables_by_hashing(const std::shared_ptr &left_table_out, std::shared_ptr &right_table_out) { - LOG(INFO) << "Shuffling two tables with total rows : " - << left_table->Rows() + right_table->Rows(); - auto t1 = std::chrono::high_resolution_clock::now(); - RETURN_CYLON_STATUS_IF_FAILED( - shuffle_table_by_hashing(ctx, left_table, left_hash_column, left_table_out)); - - auto t2 = std::chrono::high_resolution_clock::now(); - LOG(INFO) << "Left shuffle time : " - << std::chrono::duration_cast(t2 - t1).count(); - - RETURN_CYLON_STATUS_IF_FAILED( - shuffle_table_by_hashing(ctx, right_table, right_hash_column, right_table_out)); - - auto t3 = std::chrono::high_resolution_clock::now(); - LOG(INFO) << "Right shuffle time : " - << std::chrono::duration_cast(t3 - t2).count(); - + CYLON_BENCH_TIMER(ctx, "shuffle_l", + RETURN_CYLON_STATUS_IF_FAILED(shuffle_table_by_hashing(ctx, + left_table, + left_hash_column, + left_table_out))); + CYLON_BENCH_TIMER(ctx, "shuffle_r", + RETURN_CYLON_STATUS_IF_FAILED(shuffle_table_by_hashing(ctx, + right_table, + right_hash_column, + right_table_out))); return Status::OK(); } @@ -930,17 +887,24 @@ Status DistributedJoin(const std::shared_ptr
&left, const std::shared_ptr } std::shared_ptr left_final_table, right_final_table; - RETURN_CYLON_STATUS_IF_FAILED(shuffle_two_tables_by_hashing(ctx, - left, - join_config.GetLeftColumnIdx(), - right, - join_config.GetRightColumnIdx(), - left_final_table, - right_final_table)); + CYLON_BENCH_TIMER(ctx, "shuffle_t", + RETURN_CYLON_STATUS_IF_FAILED(shuffle_two_tables_by_hashing(ctx, + left, + join_config + .GetLeftColumnIdx(), + right, + join_config + .GetRightColumnIdx(), + left_final_table, + right_final_table))); std::shared_ptr table; - RETURN_CYLON_STATUS_IF_FAILED(join::JoinTables(left_final_table, right_final_table, - join_config, &table, cylon::ToArrowPool(ctx))); + CYLON_BENCH_TIMER(ctx, "join", + RETURN_CYLON_STATUS_IF_FAILED(join::JoinTables(left_final_table, + right_final_table, + join_config, + &table, + cylon::ToArrowPool(ctx)))); return Table::FromArrowTable(ctx, std::move(table), out); } diff --git a/cpp/src/cylon/util/macros.hpp b/cpp/src/cylon/util/macros.hpp index 4b9c2ed1c..8d748fa89 100644 --- a/cpp/src/cylon/util/macros.hpp +++ b/cpp/src/cylon/util/macros.hpp @@ -99,4 +99,21 @@ #define CYLON_UNUSED(expr) do { (void)(expr); } while (0) +#ifdef _CYLON_BENCH +#define CYLON_BENCH_TIMER(ctx, tag, ...) \ + do{ \ + auto t1 = std::chrono::high_resolution_clock::now(); \ + __VA_ARGS__; \ + auto t2 = std::chrono::high_resolution_clock::now(); \ + if ((ctx)->GetRank() == 0) { \ + std::cout << "[BENCH] " << (tag) << " " \ + << std::chrono::duration_cast(t2 - t1).count() \ + << " ms\n"; \ + } \ +}while(0) +#else +#define CYLON_BENCH_TIMER(ctx, tag, ...) \ + __VA_ARGS__; +#endif + #endif //CYLON_CPP_SRC_CYLON_UTIL_MACROS_HPP_ From 79139f97fd2e268fea4ec5c5a06d5cb09986e541 Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 12 Sep 2022 11:46:25 -0400 Subject: [PATCH 27/30] checking mpi --- cpp/src/cylon/net/comm_operations.hpp | 1 + cpp/src/cylon/net/gloo/gloo_communicator.hpp | 1 + cpp/src/cylon/net/mpi/mpi_channel.cpp | 1 + cpp/src/cylon/net/mpi/mpi_channel.hpp | 1 + cpp/src/cylon/net/mpi/mpi_communicator.hpp | 1 + cpp/src/cylon/net/mpi/mpi_operations.hpp | 1 + cpp/src/cylon/net/mpi/mpi_type_traits.hpp | 1 + cpp/src/cylon/net/ops/gather.cpp | 1 + cpp/src/cylon/net/ucx/ucx_operations.hpp | 1 + cpp/src/examples/ucc_allgather_vector_example.cpp | 1 + cpp/src/examples/ucc_example.cpp | 1 + cpp/test/common/test_header.hpp | 1 + cpp/test/custom_mpi_comm_test.cpp | 1 + 13 files changed, 13 insertions(+) diff --git a/cpp/src/cylon/net/comm_operations.hpp b/cpp/src/cylon/net/comm_operations.hpp index 581f1bc4b..b26dec56b 100644 --- a/cpp/src/cylon/net/comm_operations.hpp +++ b/cpp/src/cylon/net/comm_operations.hpp @@ -16,6 +16,7 @@ #define CYLON_CPP_SRC_CYLON_NET_COMM_OPERATIONS_HPP_ #include +#define MPI_Isend MPI_Issend #include namespace cylon { diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index d17fe95ed..a0042bc20 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -17,6 +17,7 @@ #ifdef GLOO_USE_MPI #include +#define MPI_Isend MPI_Issend #include #endif //GLOO_USE_MPI diff --git a/cpp/src/cylon/net/mpi/mpi_channel.cpp b/cpp/src/cylon/net/mpi/mpi_channel.cpp index 359d4294d..ab2f2e7aa 100644 --- a/cpp/src/cylon/net/mpi/mpi_channel.cpp +++ b/cpp/src/cylon/net/mpi/mpi_channel.cpp @@ -14,6 +14,7 @@ #include #include +#define MPI_Isend MPI_Issend #include #include #include diff --git a/cpp/src/cylon/net/mpi/mpi_channel.hpp b/cpp/src/cylon/net/mpi/mpi_channel.hpp index d50c6bf0a..1aea281b1 100644 --- a/cpp/src/cylon/net/mpi/mpi_channel.hpp +++ b/cpp/src/cylon/net/mpi/mpi_channel.hpp @@ -19,6 +19,7 @@ #include #include #include +#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.hpp b/cpp/src/cylon/net/mpi/mpi_communicator.hpp index aea07fa9e..b02a24f29 100644 --- a/cpp/src/cylon/net/mpi/mpi_communicator.hpp +++ b/cpp/src/cylon/net/mpi/mpi_communicator.hpp @@ -16,6 +16,7 @@ #define CYLON_SRC_CYLON_COMM_MPICOMMUNICATOR_H_ #include +#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/cylon/net/mpi/mpi_operations.hpp b/cpp/src/cylon/net/mpi/mpi_operations.hpp index d1bb77c2e..98e2219c1 100644 --- a/cpp/src/cylon/net/mpi/mpi_operations.hpp +++ b/cpp/src/cylon/net/mpi/mpi_operations.hpp @@ -24,6 +24,7 @@ #include #include #include +#define MPI_Isend MPI_Issend namespace cylon { namespace mpi { diff --git a/cpp/src/cylon/net/mpi/mpi_type_traits.hpp b/cpp/src/cylon/net/mpi/mpi_type_traits.hpp index 9793a7661..241991e23 100644 --- a/cpp/src/cylon/net/mpi/mpi_type_traits.hpp +++ b/cpp/src/cylon/net/mpi/mpi_type_traits.hpp @@ -17,6 +17,7 @@ #define CYLON_CPP_SRC_CYLON_NET_MPI_MPI_TYPE_TRAITS_HPP_ #include +#define MPI_Isend MPI_Issend namespace cylon { diff --git a/cpp/src/cylon/net/ops/gather.cpp b/cpp/src/cylon/net/ops/gather.cpp index eecf050ef..b0756bdfb 100644 --- a/cpp/src/cylon/net/ops/gather.cpp +++ b/cpp/src/cylon/net/ops/gather.cpp @@ -13,6 +13,7 @@ */ #include +#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/cylon/net/ucx/ucx_operations.hpp b/cpp/src/cylon/net/ucx/ucx_operations.hpp index 23d7ab2db..457eb99ee 100644 --- a/cpp/src/cylon/net/ucx/ucx_operations.hpp +++ b/cpp/src/cylon/net/ucx/ucx_operations.hpp @@ -16,6 +16,7 @@ #define CYLON_CPP_SRC_CYLON_NET_UCX_UCX_OPERATIONS_HPP_ #include +#define MPI_Isend MPI_Issend #include #include "cylon/status.hpp" diff --git a/cpp/src/examples/ucc_allgather_vector_example.cpp b/cpp/src/examples/ucc_allgather_vector_example.cpp index fbedeff8e..e3f64c441 100644 --- a/cpp/src/examples/ucc_allgather_vector_example.cpp +++ b/cpp/src/examples/ucc_allgather_vector_example.cpp @@ -19,6 +19,7 @@ */ #include +#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/examples/ucc_example.cpp b/cpp/src/examples/ucc_example.cpp index 986471d49..b93285124 100644 --- a/cpp/src/examples/ucc_example.cpp +++ b/cpp/src/examples/ucc_example.cpp @@ -18,6 +18,7 @@ */ #include +#define MPI_Isend MPI_Issend #include #include #include diff --git a/cpp/test/common/test_header.hpp b/cpp/test/common/test_header.hpp index d6e23fa4b..8289eb4ad 100644 --- a/cpp/test/common/test_header.hpp +++ b/cpp/test/common/test_header.hpp @@ -18,6 +18,7 @@ #define CATCH_CONFIG_RUNNER #include #include +#define MPI_Isend MPI_Issend #include #include #include diff --git a/cpp/test/custom_mpi_comm_test.cpp b/cpp/test/custom_mpi_comm_test.cpp index 02a0a1bdd..318c23501 100644 --- a/cpp/test/custom_mpi_comm_test.cpp +++ b/cpp/test/custom_mpi_comm_test.cpp @@ -16,6 +16,7 @@ #include "catch.hpp" #include +#define MPI_Isend MPI_Issend #include #include "cylon/net/mpi/mpi_communicator.hpp" From cab20bcc3652e7bba41214c71348dfd026be90b5 Mon Sep 17 00:00:00 2001 From: niranda Date: Mon, 12 Sep 2022 15:52:29 -0400 Subject: [PATCH 28/30] Revert "checking mpi" This reverts commit 79139f97fd2e268fea4ec5c5a06d5cb09986e541. --- cpp/src/cylon/net/comm_operations.hpp | 1 - cpp/src/cylon/net/gloo/gloo_communicator.hpp | 1 - cpp/src/cylon/net/mpi/mpi_channel.cpp | 1 - cpp/src/cylon/net/mpi/mpi_channel.hpp | 1 - cpp/src/cylon/net/mpi/mpi_communicator.hpp | 1 - cpp/src/cylon/net/mpi/mpi_operations.hpp | 1 - cpp/src/cylon/net/mpi/mpi_type_traits.hpp | 1 - cpp/src/cylon/net/ops/gather.cpp | 1 - cpp/src/cylon/net/ucx/ucx_operations.hpp | 1 - cpp/src/examples/ucc_allgather_vector_example.cpp | 1 - cpp/src/examples/ucc_example.cpp | 1 - cpp/test/common/test_header.hpp | 1 - cpp/test/custom_mpi_comm_test.cpp | 1 - 13 files changed, 13 deletions(-) diff --git a/cpp/src/cylon/net/comm_operations.hpp b/cpp/src/cylon/net/comm_operations.hpp index b26dec56b..581f1bc4b 100644 --- a/cpp/src/cylon/net/comm_operations.hpp +++ b/cpp/src/cylon/net/comm_operations.hpp @@ -16,7 +16,6 @@ #define CYLON_CPP_SRC_CYLON_NET_COMM_OPERATIONS_HPP_ #include -#define MPI_Isend MPI_Issend #include namespace cylon { diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index a0042bc20..d17fe95ed 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -17,7 +17,6 @@ #ifdef GLOO_USE_MPI #include -#define MPI_Isend MPI_Issend #include #endif //GLOO_USE_MPI diff --git a/cpp/src/cylon/net/mpi/mpi_channel.cpp b/cpp/src/cylon/net/mpi/mpi_channel.cpp index ab2f2e7aa..359d4294d 100644 --- a/cpp/src/cylon/net/mpi/mpi_channel.cpp +++ b/cpp/src/cylon/net/mpi/mpi_channel.cpp @@ -14,7 +14,6 @@ #include #include -#define MPI_Isend MPI_Issend #include #include #include diff --git a/cpp/src/cylon/net/mpi/mpi_channel.hpp b/cpp/src/cylon/net/mpi/mpi_channel.hpp index 1aea281b1..d50c6bf0a 100644 --- a/cpp/src/cylon/net/mpi/mpi_channel.hpp +++ b/cpp/src/cylon/net/mpi/mpi_channel.hpp @@ -19,7 +19,6 @@ #include #include #include -#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.hpp b/cpp/src/cylon/net/mpi/mpi_communicator.hpp index b02a24f29..aea07fa9e 100644 --- a/cpp/src/cylon/net/mpi/mpi_communicator.hpp +++ b/cpp/src/cylon/net/mpi/mpi_communicator.hpp @@ -16,7 +16,6 @@ #define CYLON_SRC_CYLON_COMM_MPICOMMUNICATOR_H_ #include -#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/cylon/net/mpi/mpi_operations.hpp b/cpp/src/cylon/net/mpi/mpi_operations.hpp index 98e2219c1..d1bb77c2e 100644 --- a/cpp/src/cylon/net/mpi/mpi_operations.hpp +++ b/cpp/src/cylon/net/mpi/mpi_operations.hpp @@ -24,7 +24,6 @@ #include #include #include -#define MPI_Isend MPI_Issend namespace cylon { namespace mpi { diff --git a/cpp/src/cylon/net/mpi/mpi_type_traits.hpp b/cpp/src/cylon/net/mpi/mpi_type_traits.hpp index 241991e23..9793a7661 100644 --- a/cpp/src/cylon/net/mpi/mpi_type_traits.hpp +++ b/cpp/src/cylon/net/mpi/mpi_type_traits.hpp @@ -17,7 +17,6 @@ #define CYLON_CPP_SRC_CYLON_NET_MPI_MPI_TYPE_TRAITS_HPP_ #include -#define MPI_Isend MPI_Issend namespace cylon { diff --git a/cpp/src/cylon/net/ops/gather.cpp b/cpp/src/cylon/net/ops/gather.cpp index b0756bdfb..eecf050ef 100644 --- a/cpp/src/cylon/net/ops/gather.cpp +++ b/cpp/src/cylon/net/ops/gather.cpp @@ -13,7 +13,6 @@ */ #include -#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/cylon/net/ucx/ucx_operations.hpp b/cpp/src/cylon/net/ucx/ucx_operations.hpp index 457eb99ee..23d7ab2db 100644 --- a/cpp/src/cylon/net/ucx/ucx_operations.hpp +++ b/cpp/src/cylon/net/ucx/ucx_operations.hpp @@ -16,7 +16,6 @@ #define CYLON_CPP_SRC_CYLON_NET_UCX_UCX_OPERATIONS_HPP_ #include -#define MPI_Isend MPI_Issend #include #include "cylon/status.hpp" diff --git a/cpp/src/examples/ucc_allgather_vector_example.cpp b/cpp/src/examples/ucc_allgather_vector_example.cpp index e3f64c441..fbedeff8e 100644 --- a/cpp/src/examples/ucc_allgather_vector_example.cpp +++ b/cpp/src/examples/ucc_allgather_vector_example.cpp @@ -19,7 +19,6 @@ */ #include -#define MPI_Isend MPI_Issend #include #include diff --git a/cpp/src/examples/ucc_example.cpp b/cpp/src/examples/ucc_example.cpp index b93285124..986471d49 100644 --- a/cpp/src/examples/ucc_example.cpp +++ b/cpp/src/examples/ucc_example.cpp @@ -18,7 +18,6 @@ */ #include -#define MPI_Isend MPI_Issend #include #include #include diff --git a/cpp/test/common/test_header.hpp b/cpp/test/common/test_header.hpp index 8289eb4ad..d6e23fa4b 100644 --- a/cpp/test/common/test_header.hpp +++ b/cpp/test/common/test_header.hpp @@ -18,7 +18,6 @@ #define CATCH_CONFIG_RUNNER #include #include -#define MPI_Isend MPI_Issend #include #include #include diff --git a/cpp/test/custom_mpi_comm_test.cpp b/cpp/test/custom_mpi_comm_test.cpp index 318c23501..02a0a1bdd 100644 --- a/cpp/test/custom_mpi_comm_test.cpp +++ b/cpp/test/custom_mpi_comm_test.cpp @@ -16,7 +16,6 @@ #include "catch.hpp" #include -#define MPI_Isend MPI_Issend #include #include "cylon/net/mpi/mpi_communicator.hpp" From 682fc7fc76655c886efa1a0b126603eb0f1776b1 Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 13 Sep 2022 20:25:32 -0400 Subject: [PATCH 29/30] adding mpi_comm arg to ucx config --- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 51 ++++++++++++++-------- cpp/src/cylon/net/ucx/ucx_communicator.hpp | 19 +++++--- cpp/test/CMakeLists.txt | 9 ++++ cpp/test/custom_mpi_comm_test.cpp | 47 +++++++++++++++++--- 4 files changed, 97 insertions(+), 29 deletions(-) diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 0e1771a83..0a8e6e339 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -41,10 +41,14 @@ CommType UCXConfig::Type() { return CommType::UCX; } -std::shared_ptr UCXConfig::Make() { - return std::make_shared(); +std::shared_ptr UCXConfig::Make(MPI_Comm comm) { + return std::make_shared(comm); } +UCXConfig::UCXConfig(MPI_Comm comm) : comm_(comm) {} + +MPI_Comm UCXConfig::GetMPIComm() const { return comm_; } + std::unique_ptr UCXCommunicator::CreateChannel() const { return std::make_unique(this); } @@ -91,8 +95,9 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &column, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -UCXCommunicator::UCXCommunicator(MemoryPool *pool, bool externally_init) - : Communicator(pool, -1, -1), externally_init(externally_init) {} +UCXCommunicator::UCXCommunicator(MemoryPool *pool, bool externally_init, MPI_Comm comm) + : Communicator(pool, -1, -1), + externally_init(externally_init), mpi_comm(comm) {} Status UCXCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -119,7 +124,9 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { - CYLON_UNUSED(config); + const auto &mpi_config = std::static_pointer_cast(config); + auto mpi_comm = mpi_config->GetMPIComm(); + // MPI init int initialized; MPI_Initialized(&initialized); @@ -127,8 +134,12 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); } - *out = std::make_shared(pool, initialized); - auto &comm = static_cast(**out); + if (mpi_comm == MPI_COMM_NULL) { + mpi_comm = MPI_COMM_WORLD; + } + + *out = std::make_shared(pool, initialized, mpi_comm); + auto &comm = dynamic_cast(**out); // Int variable used when iterating int sIndx; @@ -143,8 +154,8 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo ucp_address_t *address; // Get the rank for checking send to self, and initializations - MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); - MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); + MPI_Comm_rank(mpi_comm, &comm.rank); + MPI_Comm_size(mpi_comm, &comm.world_size); int rank = comm.rank, world_size = comm.world_size; @@ -165,7 +176,7 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo allAddresses.get(), (int) ucpRecvWorkerAddr->addrSize, MPI_BYTE, - MPI_COMM_WORLD)); + mpi_comm)); // Iterate and set the sends comm.endPointMap.reserve(world_size); @@ -216,7 +227,7 @@ void UCXCommunicator::Finalize() { } void UCXCommunicator::Barrier() { - MPI_Barrier(MPI_COMM_WORLD); + MPI_Barrier(this->mpi_comm); } CommType UCXCommunicator::GetCommType() const { @@ -249,9 +260,9 @@ static ucc_status_t oob_allgather_free(void *req) { return UCC_OK; } -UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr ucx_comm) +UCXUCCCommunicator::UCXUCCCommunicator(const std::shared_ptr &ucx_comm) : Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), ucx_comm->GetWorldSize()), - ucx_comm_(std::move(ucx_comm)) {} + ucx_comm_(std::static_pointer_cast(ucx_comm)) {} Status UCXUCCCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, @@ -262,6 +273,8 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, *out = std::make_shared(std::move(ucx_comm)); auto &comm = *std::static_pointer_cast(*out); + auto mpi_comm = comm.ucx_comm_->mpi_comm; + // initialize UCC team and context ucc_context_params_t ctx_params; ucc_team_params_t team_params; @@ -286,7 +299,7 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, ctx_params.oob.allgather = oob_allgather; ctx_params.oob.req_test = oob_allgather_test; ctx_params.oob.req_free = oob_allgather_free; - ctx_params.oob.coll_info = (void *) MPI_COMM_WORLD; + ctx_params.oob.coll_info = (void *) mpi_comm; ctx_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); ctx_params.oob.oob_ep = static_cast(comm.GetRank()); @@ -301,7 +314,7 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, team_params.oob.allgather = oob_allgather; team_params.oob.req_test = oob_allgather_test; team_params.oob.req_free = oob_allgather_free; - team_params.oob.coll_info = (void *) MPI_COMM_WORLD; + team_params.oob.coll_info = (void *) mpi_comm; team_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); team_params.oob.oob_ep = static_cast(comm.GetRank()); RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params, @@ -323,7 +336,7 @@ std::unique_ptr UCXUCCCommunicator::CreateChannel() const { } void UCXUCCCommunicator::Finalize() { - if (!this->IsFinalized()) { + if (!IsFinalized()) { ucc_status_t status; while (uccTeam && (UCC_INPROGRESS == (status = ucc_team_destroy(uccTeam)))) { if (UCC_OK != status) { @@ -331,7 +344,11 @@ void UCXUCCCommunicator::Finalize() { break; } } - ucc_context_destroy(uccContext); + + if (!ucx_comm_->externally_init){ + ucc_context_destroy(uccContext); + } + ucx_comm_->Finalize(); // this will handle MPI_Finalize finalized = true; } diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 74abb5200..3e9eae897 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -30,14 +30,22 @@ namespace net { class UCXConfig : public CommConfig { public: + explicit UCXConfig(MPI_Comm comm = MPI_COMM_NULL); + ~UCXConfig() override = default; + CommType Type() override; - static std::shared_ptr Make(); + MPI_Comm GetMPIComm() const; + + static std::shared_ptr Make(MPI_Comm comm = MPI_COMM_NULL); + + private: + MPI_Comm comm_; }; class UCXCommunicator : public Communicator { public: - explicit UCXCommunicator(MemoryPool *pool, bool externally_init); + UCXCommunicator(MemoryPool *pool, bool externally_init, MPI_Comm comm); ~UCXCommunicator() override = default; std::unique_ptr CreateChannel() const override; @@ -79,12 +87,13 @@ class UCXCommunicator : public Communicator { // UCP Context - Holds a UCP communication instance's global information. ucp_context_h ucpContext{}; bool externally_init = false; + MPI_Comm mpi_comm; }; #ifdef BUILD_CYLON_UCC -class UCXUCCCommunicator: public Communicator{ +class UCXUCCCommunicator : public Communicator { public: - explicit UCXUCCCommunicator(std::shared_ptr ucx_comm); + explicit UCXUCCCommunicator(const std::shared_ptr& ucx_comm); static Status Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out); @@ -115,7 +124,7 @@ class UCXUCCCommunicator: public Communicator{ ucc_team_h uccTeam{}; ucc_context_h uccContext{}; - std::shared_ptr ucx_comm_; + std::shared_ptr ucx_comm_; }; #endif } diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 34a35d7f4..dec2e4c5d 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -236,6 +236,10 @@ if (CYLON_GLOO) cylon_run_test(dist_sort_test 1 gloo-mpi) cylon_run_test(dist_sort_test 4 gloo-mpi) + + # custom mpi comm test + cylon_run_test(custom_mpi_comm_test 1 gloo-mpi) + cylon_run_test(custom_mpi_comm_test 4 gloo-mpi) endif (CYLON_GLOO) #### ucc tests @@ -249,6 +253,10 @@ if (CYLON_UCX) cylon_run_test(parquet_join_test 2 ucx) cylon_run_test(parquet_join_test 4 ucx) + # custom mpi comm test + cylon_run_test(custom_mpi_comm_test 1 ucx) + cylon_run_test(custom_mpi_comm_test 4 ucx) + if (CYLON_UCC) # ucx + ucc tests cylon_run_test(aggregate_test 1 ucx) @@ -260,5 +268,6 @@ if (CYLON_UCX) # cylon_run_test(dist_sort_test 1 ucx) # cylon_run_test(dist_sort_test 4 ucx) + endif (CYLON_UCC) endif (CYLON_UCX) \ No newline at end of file diff --git a/cpp/test/custom_mpi_comm_test.cpp b/cpp/test/custom_mpi_comm_test.cpp index 02a0a1bdd..d02d85ca0 100644 --- a/cpp/test/custom_mpi_comm_test.cpp +++ b/cpp/test/custom_mpi_comm_test.cpp @@ -22,9 +22,41 @@ #include "cylon/ctx/cylon_context.hpp" #include "test_utils.hpp" +#ifdef BUILD_CYLON_GLOO +#include "cylon/net/gloo/gloo_communicator.hpp" +#endif + +#ifdef BUILD_CYLON_UCX +#include "cylon/net/ucx/ucx_communicator.hpp" +#endif + +std::string COMM_ARG; + namespace cylon { namespace test { +std::shared_ptr MakeConfig(MPI_Comm comm){ + if (COMM_ARG == "mpi") { + LOG(INFO) << "Using MPI"; + return net::MPIConfig::Make(comm); + } + +#ifdef BUILD_CYLON_GLOO + if (COMM_ARG == "gloo-mpi") { + LOG(INFO) << "Using Gloo with MPI"; + return net::GlooConfig::MakeWithMpi(comm); + } +#endif + +#ifdef BUILD_CYLON_UCX + if (COMM_ARG == "ucx") { + LOG(INFO) << "Using UCX with MPI"; + return net::UCXConfig::Make(comm); + } +#endif + return nullptr; +} + TEST_CASE("custom mpi communicator") { MPI_Init(nullptr, nullptr); MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); @@ -48,9 +80,10 @@ TEST_CASE("custom mpi communicator") { MPI_Comm new_comm; REQUIRE(MPI_Comm_split(MPI_COMM_WORLD, color, l_rank, &new_comm) == MPI_SUCCESS); - auto mpi_config = cylon::net::MPIConfig::Make(new_comm); + auto config = MakeConfig(new_comm); + REQUIRE(config); std::shared_ptr ctx; - REQUIRE(cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()); + REQUIRE(cylon::CylonContext::InitDistributed(config, &ctx).is_ok()); REQUIRE(l_rank == ctx->GetRank()); if (color == 0) { @@ -67,8 +100,9 @@ TEST_CASE("custom mpi communicator") { color = rank / 2; // [0, 0, 1, 1] REQUIRE(MPI_Comm_split(MPI_COMM_WORLD, color, l_rank, &new_comm) == MPI_SUCCESS); - mpi_config = cylon::net::MPIConfig::Make(new_comm); - REQUIRE(cylon::CylonContext::InitDistributed(mpi_config, &ctx).is_ok()); + config = MakeConfig(new_comm); + REQUIRE(config); + REQUIRE(cylon::CylonContext::InitDistributed(config, &ctx).is_ok()); REQUIRE(l_rank == ctx->GetRank()); REQUIRE(ctx->GetWorldSize() == 2); @@ -95,9 +129,8 @@ TEST_CASE("custom mpi communicator") { int main(int argc, char *argv[]) { Catch::Session session; - std::string comm_args = "mpi"; - auto cli = session.cli() | Catch::clara::Opt(comm_args, "mpi|gloo-mpi|ucc")["--comm"]("comm args"); + auto cli = session.cli() | Catch::clara::Opt(COMM_ARG, "mpi|gloo-mpi|ucx")["--comm"]("comm args"); // Now pass the new composite back to Catch2 so it uses that session.cli(cli); @@ -107,6 +140,6 @@ int main(int argc, char *argv[]) { if (returnCode != 0) // Indicates a command line error return returnCode; - LOG(INFO) << "comm args: " << comm_args; + LOG(INFO) << "comm args: " << COMM_ARG; return session.run(); } From a1661c7dcc6f5ff9e557adfc668da7efce2bbaf2 Mon Sep 17 00:00:00 2001 From: niranda Date: Tue, 13 Sep 2022 22:00:26 -0400 Subject: [PATCH 30/30] adding cython bindings --- python/pycylon/pycylon/net/ucx_config.pxd | 4 +- python/pycylon/pycylon/net/ucx_config.pyx | 7 +- python/pycylon/test/conftest.py | 24 ++ python/pycylon/test/test_all.py | 252 ++++++++++---------- python/pycylon/test/test_custom_mpi_comm.py | 46 ++-- python/pycylon/test/test_ucx_mpi.py | 2 +- 6 files changed, 190 insertions(+), 145 deletions(-) create mode 100644 python/pycylon/test/conftest.py diff --git a/python/pycylon/pycylon/net/ucx_config.pxd b/python/pycylon/pycylon/net/ucx_config.pxd index a6f1b36c8..ab3b0a8d9 100644 --- a/python/pycylon/pycylon/net/ucx_config.pxd +++ b/python/pycylon/pycylon/net/ucx_config.pxd @@ -15,6 +15,8 @@ IF CYTHON_UCX & CYTHON_UCC: from libcpp.memory cimport shared_ptr + from mpi4py.libmpi cimport MPI_Comm + from pycylon.net.comm_type cimport CCommType from pycylon.net.comm_config cimport CommConfig @@ -23,7 +25,7 @@ IF CYTHON_UCX & CYTHON_UCC: CCommType Type() @ staticmethod - shared_ptr[CUCXConfig] Make(); + shared_ptr[CUCXConfig] Make(MPI_Comm comm); cdef class UCXConfig(CommConfig): diff --git a/python/pycylon/pycylon/net/ucx_config.pyx b/python/pycylon/pycylon/net/ucx_config.pyx index ed04440c3..248c6b328 100644 --- a/python/pycylon/pycylon/net/ucx_config.pyx +++ b/python/pycylon/pycylon/net/ucx_config.pyx @@ -16,12 +16,15 @@ IF CYTHON_UCX & CYTHON_UCC: from pycylon.net.comm_config cimport CommConfig from pycylon.net.ucx_config cimport CUCXConfig + cimport mpi4py.MPI as MPI + from mpi4py.MPI import COMM_NULL + cdef class UCXConfig(CommConfig): """ GlooConfig Type mapping from libCylon to PyCylon """ - def __cinit__(self): - self.ucx_config_shd_ptr = CUCXConfig.Make() + def __cinit__(self, comm = COMM_NULL): + self.ucx_config_shd_ptr = CUCXConfig.Make(( comm).ob_mpi) @property def comm_type(self): diff --git a/python/pycylon/test/conftest.py b/python/pycylon/test/conftest.py new file mode 100644 index 000000000..44d75547b --- /dev/null +++ b/python/pycylon/test/conftest.py @@ -0,0 +1,24 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +import pytest + + +def pytest_addoption(parser): + parser.addoption("--comm", action="store", default="mpi") + + +@pytest.fixture +def comm(request): + return request.config.getoption("--comm") diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index 61cba082f..70235a5bf 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -15,14 +15,10 @@ import os -import numpy as np - print("-------------------------------------------------") print("|\t\tPyCylon Test Framework\t\t|") print("-------------------------------------------------") -responses = [] - def get_mpi_command(): if os.name == 'posix': @@ -33,283 +29,287 @@ def get_mpi_command(): return "" +def assert_success(fn): + def wrapper(): + assert fn() == 0 + + return wrapper + + +@assert_success def test_pycylon_installation_test(): print("1. PyCylon Installation Test") - responses.append(os.system("pytest -q python/pycylon/test/test_pycylon.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_pycylon.py") +@assert_success def test_pyarrow_installation_test(): print("2. PyArrow Installation Test") - responses.append(os.system("pytest -q python/pycylon/test/test_build_arrow.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_build_arrow.py") -# def test_fake(): -# # NOTE: To Test the Test Framework -# print("Fake Test") -# responses.append(os.system("pytest -q python/pycylon/test/test_fake.py")) -# assert responses[-1] == 0 - +@assert_success def test_cylon_context(): print("3. CylonContext Test") - responses.append( - os.system( - get_mpi_command() + " -n 2 python -m pytest --with-mpi " - "-q python/pycylon/test/test_cylon_context.py")) - assert responses[-1] == 0 + return os.system( + get_mpi_command() + " -n 2 python -m pytest --with-mpi " + "-q python/pycylon/test/test_cylon_context.py") +@assert_success def test_channel(): print("4. Channel Test") - responses.append(os.system("pytest -q python/pycylon/test/test_channel.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_channel.py") +@assert_success def test_commtype(): print("5. CommType Test") - responses.append(os.system("pytest -q python/pycylon/test/test_comm_type.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_comm_type.py") +@assert_success def test_csv_read_options(): print("6. CSV Read Options Test") - responses.append(os.system("pytest -q python/pycylon/test/test_csv_read_options.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_csv_read_options.py") +@assert_success def test_datatype(): print("7. Data Types Test") - responses.append(os.system("pytest -q python/pycylon/test/test_data_types.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_data_types.py") +@assert_success def test_data_utils(): print("8. Data Utils Test") - responses.append(os.system("pytest -q python/pycylon/test/test_data_utils.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_data_utils.py") +@assert_success def test_status(): print("9. Cylon Status Test") - responses.append(os.system("pytest -q python/pycylon/test/test_status.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_status.py") +@assert_success def test_request(): print("10. Request Test") - responses.append(os.system("pytest -q python/pycylon/test/test_txrequest.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_txrequest.py") +@assert_success def test_pycylon_pyarrow(): print("11. PyArrow/PyCylon Test") - responses.append(os.system("pytest -q python/pycylon/test/test_pyarrow.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_pyarrow.py") +@assert_success def test_table_conversion(): print("12. Table Conversion Test") - responses.append(os.system( + return os.system( get_mpi_command() + " -n 2 python -m pytest --with-mpi " - "-q python/pycylon/test/test_cylon_table_conversion.py")) - assert responses[-1] == 0 + "-q python/pycylon/test/test_cylon_table_conversion.py") +@assert_success def test_table_operation(): print("13. Table Operation Test") - responses.append(os.system("pytest -q python/pycylon/test/test_table.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_table.py") +@assert_success def test_table_properties(): print("14. Table Properties Test") - responses.append(os.system("pytest -q python/pycylon/test/test_table_properties.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_table_properties.py") +@assert_success def test_aggregate(): print("15. Aggregate Test") - responses.append(os.system("pytest -q python/pycylon/test/test_aggregate.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_aggregate.py") +@assert_success def test_join_config(): print("16. Join Config Test") - responses.append(os.system("pytest -q python/pycylon/test/test_join_config.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_join_config.py") +@assert_success def test_simple_table_join(): print("17. Simple Table Join Test") - responses.append(os.system( + return os.system( get_mpi_command() + " -n 4 python -m pytest --with-mpi " - "-q python/pycylon/test/test_cylon_simple_table_join.py")) - assert responses[-1] == 0 + "-q python/pycylon/test/test_cylon_simple_table_join.py") +@assert_success def test_dist_rl(): print("18. Distributed Relational Algebra Operator Test") - responses.append( - os.system( - get_mpi_command() + " -n 4 python -m pytest --with-mpi " - "-q python/pycylon/test/test_dist_rl.py")) - assert responses[-1] == 0 + return os.system( + get_mpi_command() + " -n 4 python -m pytest --with-mpi " + "-q python/pycylon/test/test_dist_rl.py") +@assert_success def test_rl(): print("19. Sequential Relational Algebra Operator Test") - responses.append(os.system("pytest -q python/pycylon/test/test_rl.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_rl.py") +@assert_success def test_rl_col(): print("20. Sequential Relational Algebra with Column Names Test") - responses.append(os.system("pytest -q python/pycylon/test/test_ra_by_column_names.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_ra_by_column_names.py") +@assert_success def test_dist_rl_col(): print("21. Distributed Relational Algebra with Column Names Test") - responses.append( - os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi -q " - "python/pycylon/test/test_dist_ra_by_column_names.py")) - assert responses[-1] == 0 + return os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi -q " + "python/pycylon/test/test_dist_ra_by_column_names.py") +@assert_success def test_index(): print("22. Index Test") - responses.append( - os.system("pytest -q python/pycylon/test/test_index.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_index.py") +@assert_success def test_compute(): print("23. Compute Test") - responses.append( - os.system("pytest -q python/pycylon/test/test_compute.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_compute.py") +@assert_success def test_series(): print("24. Series Test") - responses.append( - os.system("pytest -q python/pycylon/test/test_series.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_series.py") +@assert_success def test_frame(): print("25. DataFrame Test") - responses.append( - os.system("pytest -q python/pycylon/test/test_frame.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_frame.py") +@assert_success def test_duplicate(): print("26. Duplicate Handling") - responses.append( - os.system( - get_mpi_command() + " -n 2 python -m pytest --with-mpi " - "-q python/pycylon/test/test_duplicate_handle.py")) - assert responses[-1] == 0 + return os.system( + get_mpi_command() + " -n 2 python -m pytest --with-mpi " + "-q python/pycylon/test/test_duplicate_handle.py") +@assert_success def test_sorting(): print("27. Sorting") - responses.append(os.system("pytest -q python/pycylon/test/test_sorting.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_sorting.py") +@assert_success def test_df_dist_sorting(): print("28. Sorting") - responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest " - "-q python/pycylon/test/test_df_dist_sorting.py")) - assert responses[-1] == 0 + return os.system(get_mpi_command() + " -n 4 python -m pytest " + "-q python/pycylon/test/test_df_dist_sorting.py") +@assert_success def test_pd_read_csv(): print("29. pandas read_csv") - responses.append(os.system("pytest -q python/pycylon/test/test_pd_read_csv.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_pd_read_csv.py") +@assert_success def test_data_split(): print("30. Data Split") - responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " - "python/pycylon/test/test_data_split.py")) - assert responses[-1] == 0 + return os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " + "python/pycylon/test/test_data_split.py") +@assert_success def test_repartition(): print("31. Repartition") - responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " - "python/pycylon/test/test_repartition.py")) - assert responses[-1] == 0 + return os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " + "python/pycylon/test/test_repartition.py") +@assert_success def test_equals(): print("32. Equals") - responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " - "python/pycylon/test/test_equal.py")) - assert responses[-1] == 0 + return os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " + "python/pycylon/test/test_equal.py") +@assert_success def test_parquet(): print("33. DataFrame Test") - responses.append(os.system("pytest -q python/pycylon/test/test_parquet.py")) - assert responses[-1] == 0 + return os.system("pytest -q python/pycylon/test/test_parquet.py") +@assert_success def test_dist_aggregate(): print("34. Dist Aggregates") - responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " - "python/pycylon/test/test_dist_aggregate.py")) - assert responses[-1] == 0 + return os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " + "python/pycylon/test/test_dist_aggregate.py") +@assert_success def test_dist_io(): print("34. Dist IO") - responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " - "python/pycylon/test/test_io.py")) - assert responses[-1] == 0 + return os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " + "python/pycylon/test/test_io.py") + + +@assert_success +def test_custom_mpi_comm(): + print("36. Custom mpi comm") + return os.system( + f"{get_mpi_command()} -n 4 pytest python/pycylon/test/test_custom_mpi_comm.py ") if os.environ.get('CYLON_GLOO'): + @assert_success def test_gloo(): print("35. Gloo") - responses.append(os.system("python -m pytest python/pycylon/test/test_gloo.py")) - assert responses[-1] == 0 + return os.system("python -m pytest python/pycylon/test/test_gloo.py") + @assert_success def test_gloo_mpi(): print("36. Gloo") - responses.append(os.system( - f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_gloo_mpi.py")) - assert responses[-1] == 0 + return os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_gloo_mpi.py") + + + @assert_success + def test_custom_mpi_comm_gloo(): + print("36. Gloo custom mpi comm") + return os.system( + f"{get_mpi_command()} -n 4 pytest python/pycylon/test/test_custom_mpi_comm.py " + f"--comm gloo-mpi") if os.environ.get('CYLON_UCC'): + @assert_success def test_ucx_mpi(): print("37. UCX MPI") - responses.append(os.system( - f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_ucx_mpi.py")) - assert responses[-1] == 0 + return os.system( + f"{get_mpi_command()} -n 4 python -m pytest python/pycylon/test/test_ucx_mpi.py") + + + @assert_success + def test_custom_mpi_comm_ucx(): + print("36. UCX custom mpi comm") + return os.system( + f"{get_mpi_command()} -n 4 pytest python/pycylon/test/test_custom_mpi_comm.py " + f"--comm ucx") if os.environ.get('CYLON_GLOO') and os.environ.get('CYLON_UCC'): + @assert_success def test_mpi_multiple_env_init(): print("38. Create and destroy multiple environments in MPI") - responses.append(os.system( + return os.system( f"{get_mpi_command()} -n 4 python -m pytest " - f"python/pycylon/test/test_mpi_multiple_env_init.py")) - assert responses[-1] == 0 - - -def test_all(): - ar = np.array(responses) - total = len(responses) - failed_count = sum(ar > 0) - - if failed_count > 0: - print(f"{failed_count} of {total} Tests Failed !!!") - assert False - else: - print("All Tests Passed!") + f"python/pycylon/test/test_mpi_multiple_env_init.py") diff --git a/python/pycylon/test/test_custom_mpi_comm.py b/python/pycylon/test/test_custom_mpi_comm.py index a2eec0b21..db03edd84 100644 --- a/python/pycylon/test/test_custom_mpi_comm.py +++ b/python/pycylon/test/test_custom_mpi_comm.py @@ -14,8 +14,9 @@ """ Run test: ->> mpirun -n 4 python -m pytest -q python/pycylon/test/test_custom_mpi_comm.py +>> mpirun -n 4 pytest -q python/pycylon/test/test_custom_mpi_comm.py --comm [mpi/gloo-mpi/ucx] """ +import os from mpi4py import MPI from pycylon.frame import CylonEnv, read_csv, DataFrame @@ -23,14 +24,29 @@ from pyarrow.csv import ReadOptions, read_csv as pa_read_csv -def join_test(): +def get_comm_config(comm_str, new_comm): + if comm_str == 'mpi': + return MPIConfig(comm=new_comm) + + if os.environ.get('CYLON_GLOO') and comm_str == 'gloo-mpi': + from pycylon.net.gloo_config import GlooMPIConfig + return GlooMPIConfig(comm=new_comm) + + if os.environ.get('CYLON_UCC') and comm_str == 'ucx': + from pycylon.net.ucx_config import UCXConfig + return UCXConfig(comm=new_comm) + + raise ValueError(f'unknown comm string {comm_str}') + + +def join_test(comm_str): comm = MPI.COMM_WORLD rank = comm.rank l_rank = rank % 2 color = int(rank / 2) new_comm = comm.Split(color, l_rank) - config = MPIConfig(new_comm) + config = get_comm_config(comm_str, new_comm) env = CylonEnv(config=config, distributed=True) df1 = read_csv(f"data/input/csv1_{l_rank}.csv", env=env) @@ -44,15 +60,15 @@ def join_test(): assert out.equals(res, ordered=False) -def custom_comm_test(): - comm = MPI.COMM_WORLD - rank = comm.rank +def custom_comm_test(comm_str): + comm_ = MPI.COMM_WORLD + rank = comm_.rank l_rank = rank if rank < 3 else rank - 3 color = 0 if rank < 3 else 1 - new_comm = comm.Split(color, l_rank) + new_comm = comm_.Split(color, l_rank) - config = MPIConfig(new_comm) + config = get_comm_config(comm_str, new_comm) env = CylonEnv(config=config, distributed=True) print(f"local rank {env.rank} sz {env.world_size}") @@ -64,16 +80,16 @@ def custom_comm_test(): env.finalize() -def test_custom_mpi_comm(): - comm = MPI.COMM_WORLD - rank = comm.rank - sz = comm.size - print(f"rank {rank} sz {sz}") +def test_custom_mpi_comm(comm): + comm_ = MPI.COMM_WORLD + rank = comm_.rank + sz = comm_.size + print(f"comm {comm} rank {rank} sz {sz}") if sz != 4: return - custom_comm_test() - join_test() + custom_comm_test(comm) + join_test(comm) MPI.Finalize() diff --git a/python/pycylon/test/test_ucx_mpi.py b/python/pycylon/test/test_ucx_mpi.py index dc1528f5c..b3cf6c35d 100644 --- a/python/pycylon/test/test_ucx_mpi.py +++ b/python/pycylon/test/test_ucx_mpi.py @@ -41,4 +41,4 @@ def test_ucx_mpi(): # confirms that the code is under main function df3 = df1.merge(right=df2, on=[0], env=env) print(f'res len {len(df3)}') - env.finalize() \ No newline at end of file + env.finalize()