diff --git a/.github/workflows/conda-actions.yml b/.github/workflows/conda-actions.yml index 57a7552ac..9f089310a 100644 --- a/.github/workflows/conda-actions.yml +++ b/.github/workflows/conda-actions.yml @@ -1,4 +1,4 @@ -name: Conda C++/Python - gcc,OpenMPI +name: Conda C++/Python - gcc,OpenMPI & GCylon on: push: diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 49b56c630..24c6a49ad 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -12,13 +12,33 @@ on: jobs: build: - runs-on: macos-10.15 + runs-on: ${{ matrix.os }} + defaults: + run: + shell: bash -l {0} + strategy: + fail-fast: false + matrix: + include: + - os: macos-10.15 steps: - uses: actions/checkout@v2 - name: Remove link for preventing an error run: rm /usr/local/bin/2to3 - name: Install dependencies - run: brew install open-mpi re2 && brew update --preinstall && brew install automake boost brotli c-ares ccache cmake flatbuffers glog grpc llvm llvm@8 lz4 minio ninja numpy openssl@1.1 protobuf python rapidjson snappy thrift wget zstd - - name: build python and test - run: python3 -m venv ENV && ./build.sh -pyenv $(pwd)/ENV -bpath $(pwd)/build --python --cmake-flags "-DCYLON_PARQUET=ON" + run: brew update --preinstall && brew install re2 automake boost brotli c-ares ccache flatbuffers grpc llvm llvm@8 lz4 minio ninja openssl@1.1 protobuf rapidjson snappy thrift wget zstd + + - uses: conda-incubator/setup-miniconda@v2 + with: + activate-environment: cylon_dev + environment-file: conda/environments/cylon_MacOS.yml + + - name: activate conda + run: conda activate cylon_dev + + - name: Build cylon, pycylon and run cpp test + run: python build.py --cpp --python --test + + - name: Run pytest + run: python build.py --pytest \ No newline at end of file diff --git a/build.py b/build.py index 3e4fde35c..9d7a802d1 100644 --- a/build.py +++ b/build.py @@ -97,6 +97,38 @@ def on_off(arg): PYTHON_EXEC = sys.executable +CMAKE_BOOL_FLAGS = {'CYLON_GLOO', 'CYLON_UCX', 'CYLON_UCC'} +CMAKE_FALSE_OPTIONS = {'0', 'FALSE', 'OFF', 'N', 'NO', 'IGNORE', 'NOTFOUND'} + + +def parse_cmake_bool(v): + """ + Converts string to 0 or 1. Evaluates to 0 if any of the following is true: + - string is empty, + - string is a case-insensitive equal of 0, FALSE, OFF, N, NO, IGNORE, or NOTFOUND, or + - string ends in the suffix -NOTFOUND (case-sensitive). + Otherwise, evaluates to 1. + """ + return bool(v and v not in CMAKE_FALSE_OPTIONS) + + +def parse_cmake_flags(flag): + for f in CMAKE_FLAGS.strip().replace('-D', '').split(): + k, v = f.split('=') + if k != flag: + continue + else: + return parse_cmake_bool(v) if k in CMAKE_BOOL_FLAGS else v + return None + + +CYLON_GLOO = parse_cmake_flags('CYLON_GLOO') +GLOO_PREFIX = parse_cmake_flags('GLOO_INSTALL_PREFIX') + +CYLON_UCX = parse_cmake_flags('CYLON_UCX') +CYLON_UCC = parse_cmake_flags('CYLON_UCC') +UCC_PREFIX = parse_cmake_flags('UCC_INSTALL_PREFIX') + def print_line(): logger.info("=================================================================") @@ -109,6 +141,11 @@ def print_line(): logger.info(f"Build path : {BUILD_DIR}") logger.info(f"Install path : {INSTALL_DIR}") logger.info(f"CMake flags : {CMAKE_FLAGS}") +logger.info(f" -CYLON_GLOO : {CYLON_GLOO}") +logger.info(f" -GLOO_PREFIX : {GLOO_PREFIX}") +logger.info(f" -CYLON_UCX : {CYLON_UCX}") +logger.info(f" -CYLON_UCC : {CYLON_UCC}") +logger.info(f" -UCC_PREFIX : {UCC_PREFIX}") logger.info(f"Run C++ tests : {RUN_CPP_TESTS}") logger.info(f"Build PyCylon : {BUILD_PYTHON}") logger.info(f"Run Py tests : {RUN_PYTHON_TESTS}") @@ -218,20 +255,28 @@ def build_python(): print_line() logger.info("Building Python") - CONDA_PREFIX = os.getenv('CONDA_PREFIX') - if not CONDA_PREFIX: + conda_prefix = os.getenv('CONDA_PREFIX') + if not conda_prefix: logger.error("The build should be in a conda environment") - return + return 1 python_build_command = f'{PYTHON_EXEC} setup.py install --force' env = os.environ env["CYLON_PREFIX"] = str(BUILD_DIR) if os.name == 'posix': - env["ARROW_PREFIX"] = str(Path(CONDA_PREFIX)) + env["ARROW_PREFIX"] = str(Path(conda_prefix)) elif os.name == 'nt': env["ARROW_PREFIX"] = str(Path(os.environ["CONDA_PREFIX"], "Library")) - logger.info("Arrow prefix: " + str(Path(os.environ["CONDA_PREFIX"]))) + if CYLON_GLOO: + env['CYLON_GLOO'] = str(CYLON_GLOO) + env['GLOO_PREFIX'] = GLOO_PREFIX + if CYLON_UCC and CYLON_UCX: + env['CYLON_UCX'] = str(CYLON_UCX) + env['CYLON_UCC'] = str(CYLON_UCC) + env['UCC_PREFIX'] = UCC_PREFIX + + logger.info("Arrow prefix: " + str(Path(conda_prefix))) res = subprocess.run(python_build_command, shell=True, env=env, cwd=PYTHON_SOURCE_DIR) check_status(res.returncode, "PyCylon build") diff --git a/conda/environments/cylon_MacOS.yml b/conda/environments/cylon_MacOS.yml index d9d4f0357..8af49b18a 100644 --- a/conda/environments/cylon_MacOS.yml +++ b/conda/environments/cylon_MacOS.yml @@ -3,7 +3,7 @@ channels: - conda-forge - defaults dependencies: - - python=3.8 + - python=3.9 - cmake>=3.17 - pyarrow=5.0.0 - glog=0.5.0 @@ -11,6 +11,7 @@ dependencies: - cython>=0.29,<0.30 - numpy>=1.16 - pandas>=1.0 + - fsspec - setuptools>=40.0,<60.0 # they are not needed for using pygcylon or compiling it - pytest diff --git a/cpp/src/cylon/ctx/cylon_context.cpp b/cpp/src/cylon/ctx/cylon_context.cpp index 851f1d300..a76a7e444 100644 --- a/cpp/src/cylon/ctx/cylon_context.cpp +++ b/cpp/src/cylon/ctx/cylon_context.cpp @@ -166,4 +166,8 @@ bool CylonContext::IsDistributed() const { cylon::net::CommType CylonContext::GetCommType() { return is_distributed ? this->communicator->GetCommType() : net::CommType::LOCAL; } + +CylonContext::~CylonContext() { + this->Finalize(); +} } // namespace cylon diff --git a/cpp/src/cylon/ctx/cylon_context.hpp b/cpp/src/cylon/ctx/cylon_context.hpp index 052550722..24a28b21c 100644 --- a/cpp/src/cylon/ctx/cylon_context.hpp +++ b/cpp/src/cylon/ctx/cylon_context.hpp @@ -41,6 +41,7 @@ class CylonContext { * @param distributed */ explicit CylonContext(bool distributed); + ~CylonContext(); /** * Initializes context diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.cpp b/cpp/src/cylon/net/gloo/gloo_communicator.cpp index 038d4022b..38fcad2fc 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.cpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.cpp @@ -34,19 +34,23 @@ Status GlooCommunicator::Init(const std::shared_ptr &config) { const auto &gloo_config = std::static_pointer_cast(config); gloo::transport::tcp::attr attr; - attr.hostname = gloo_config->tcp_hostname; - attr.iface = gloo_config->tcp_iface; - attr.ai_family = gloo_config->tcp_ai_family; + attr.hostname = gloo_config->tcp_hostname_; + attr.iface = gloo_config->tcp_iface_; + attr.ai_family = gloo_config->tcp_ai_family_; // create device dev_ = gloo::transport::tcp::CreateDevice(attr); - if (gloo_config->use_mpi) { + if (gloo_config->use_mpi_) { #ifdef GLOO_USE_MPI int res; RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Initialized(&res)); if (res) { - gloo_ctx_ = std::make_shared(gloo_config->mpi_comm); + if (gloo_config->mpi_comm_ == MPI_COMM_NULL) { + gloo_ctx_ = std::make_shared(MPI_COMM_WORLD); + } else { + gloo_ctx_ = std::make_shared(gloo_config->mpi_comm_); + } } else { // MPI is not initialized. Ask gloo to initialize MPI gloo_ctx_ = gloo::mpi::Context::createManaged(); } @@ -59,16 +63,16 @@ Status GlooCommunicator::Init(const std::shared_ptr &config) { return {Code::Invalid, "Gloo does not contain mpi headers!"}; #endif // GLOO_USE_MPI } else { + rank = gloo_config->rank_; + world_size = gloo_config->world_size_; + // store and prefix store - store_ = std::make_shared(gloo_config->file_store_path); - prefix_store_ = std::make_shared(gloo_config->store_prefix, + store_ = std::make_shared(gloo_config->file_store_path_); + prefix_store_ = std::make_shared(gloo_config->store_prefix_, *store_); gloo_ctx_ = std::make_shared(rank, world_size); ((gloo::rendezvous::Context &) *gloo_ctx_).connectFullMesh(*prefix_store_, dev_); - - rank = gloo_config->rank; - world_size = gloo_config->world_size; } return Status::OK(); } @@ -146,11 +150,45 @@ CommType GlooConfig::Type() { return GLOO; } #ifdef GLOO_USE_MPI std::shared_ptr GlooConfig::MakeWithMpi(MPI_Comm comm) { - auto config = std::make_shared(); - config->use_mpi = true; - config->mpi_comm = comm; - return config; + return std::make_shared(comm); } + +GlooConfig::GlooConfig(MPI_Comm mpi_comm) + : rank_(-1), world_size_(-1), use_mpi_(true), mpi_comm_(mpi_comm) {} #endif //GLOO_USE_MPI + +std::shared_ptr GlooConfig::Make(int rank, int world_size) { + return std::make_shared(rank, world_size); +} + +GlooConfig::GlooConfig(int rank, int world_size, bool use_mpi) + : rank_(rank), world_size_(world_size), use_mpi_(use_mpi) {} + +void GlooConfig::SetTcpHostname(const std::string &tcp_hostname) { + GlooConfig::tcp_hostname_ = tcp_hostname; +} + +void GlooConfig::SetTcpIface(const std::string &tcp_iface) { + GlooConfig::tcp_iface_ = tcp_iface; +} + +void GlooConfig::SetTcpAiFamily(int tcp_ai_family) { + GlooConfig::tcp_ai_family_ = tcp_ai_family; +} + +void GlooConfig::SetFileStorePath(const std::string &file_store_path) { + GlooConfig::file_store_path_ = file_store_path; +} + +void GlooConfig::SetStorePrefix(const std::string &store_prefix) { + GlooConfig::store_prefix_ = store_prefix; +} +int GlooConfig::rank() const { + return rank_; +} +int GlooConfig::world_size() const { + return world_size_; +} + } } diff --git a/cpp/src/cylon/net/gloo/gloo_communicator.hpp b/cpp/src/cylon/net/gloo/gloo_communicator.hpp index 5c73b74a6..a91cfbea7 100644 --- a/cpp/src/cylon/net/gloo/gloo_communicator.hpp +++ b/cpp/src/cylon/net/gloo/gloo_communicator.hpp @@ -31,11 +31,38 @@ namespace cylon { namespace net { +class GlooCommunicator; + class GlooConfig : public CommConfig { public: - int rank = 0; - int world_size = 1; - bool use_mpi = false; + explicit GlooConfig(int rank = 0, int world_size = 1, bool use_mpi = false); + +#ifdef GLOO_USE_MPI + explicit GlooConfig(MPI_Comm mpi_comm = MPI_COMM_WORLD); +#endif //GLOO_USE_MPI + + int rank() const; + int world_size() const; + + void SetTcpHostname(const std::string &tcp_hostname); + void SetTcpIface(const std::string &tcp_iface); + void SetTcpAiFamily(int tcp_ai_family); + void SetFileStorePath(const std::string &file_store_path); + void SetStorePrefix(const std::string &store_prefix); + + CommType Type() override; + +#ifdef GLOO_USE_MPI + static std::shared_ptr MakeWithMpi(MPI_Comm comm = MPI_COMM_NULL); +#endif //GLOO_USE_MPI + + static std::shared_ptr Make(int rank, int world_size); + + private: + friend GlooCommunicator; + int rank_; + int world_size_; + bool use_mpi_; #ifdef GLOO_USE_MPI /* @@ -43,23 +70,17 @@ class GlooConfig : public CommConfig { * Pass an MPI communicator to bootstrap Gloo context using MPI (NOTE: Gloo needs to be built * with -DUSE_MPI=1 flag, to use MPI communicator). */ - MPI_Comm mpi_comm = MPI_COMM_WORLD; + MPI_Comm mpi_comm_ = MPI_COMM_WORLD; #endif //GLOO_USE_MPI // tcp attr - std::string tcp_hostname = "localhost"; - std::string tcp_iface; - int tcp_ai_family = AF_UNSPEC; + std::string tcp_hostname_ = "localhost"; + std::string tcp_iface_; + int tcp_ai_family_ = AF_UNSPEC; // file store configs - std::string file_store_path; - std::string store_prefix; - - CommType Type() override; - -#ifdef GLOO_USE_MPI - static std::shared_ptr MakeWithMpi(MPI_Comm comm = nullptr); -#endif //GLOO_USE_MPI + std::string file_store_path_; + std::string store_prefix_; }; class GlooCommunicator : public Communicator { diff --git a/cpp/src/cylon/status.hpp b/cpp/src/cylon/status.hpp index da97685e8..7a0690844 100644 --- a/cpp/src/cylon/status.hpp +++ b/cpp/src/cylon/status.hpp @@ -20,8 +20,7 @@ namespace cylon { class Status { public: - Status() { - } + Status() = default; Status(int code, const std::string &msg) { this->code = code; @@ -58,8 +57,8 @@ class Status { } private: - int code; - std::string msg; + int code{}; + std::string msg{}; }; } // namespace cylon diff --git a/cpp/src/examples/gloo_example.cpp b/cpp/src/examples/gloo_example.cpp index d433bc26a..ba7a3fa75 100644 --- a/cpp/src/examples/gloo_example.cpp +++ b/cpp/src/examples/gloo_example.cpp @@ -12,73 +12,48 @@ * limitations under the License. */ -#include #include -#include "gloo/mpi/context.h" -#include "gloo/transport/tcp/device.h" -#include "gloo/allreduce_ring.h" - #include "cylon/net/gloo/gloo_communicator.hpp" #include "cylon/ctx/cylon_context.hpp" -int main(int argc, char **argv) { - /*int rv; - - rv = MPI_Init(&argc, &argv); - assert(rv == MPI_SUCCESS); +#include "examples/example_utils.hpp" - // We'll use the TCP transport in this example - auto dev = gloo::transport::tcp::CreateDevice("localhost"); +static constexpr int kCount = 10; +static constexpr double kDup = 0.9; - // Use inner scope to force destruction of context and algorithm - { - // Create Gloo context from MPI communicator - auto context = std::make_shared(MPI_COMM_WORLD); - context->connectFullMesh(dev); +int main(int argc, char **argv) { + auto config = std::make_shared(std::stoi(argv[1]), std::stoi(argv[2])); + config->SetFileStorePath("/tmp/gloo/"); + config->SetStorePrefix("foo"); - // Create and run simple allreduce - int rank = context->rank; - gloo::AllreduceRing allreduce(context, {&rank}, 1); - allreduce.run(); - std::cout << "Result: " << rank << std::endl; + if (config->rank() == 0) { + system("rm -rf /tmp/gloo/*"); } - rv = MPI_Finalize(); - assert(rv == MPI_SUCCESS); - return 0;*/ - /*v; + std::shared_ptr ctx; + if (!cylon::CylonContext::InitDistributed(config, &ctx).is_ok()) { + return 1; + } - rv = MPI_Init(&argc, &argv); - assert(rv == MPI_SUCCESS); + LOG(INFO) << "rank:" << ctx->GetRank() << " size:" << ctx->GetWorldSize(); - // We'll use the TCP transport in this example - auto dev = gloo::transport::tcp::CreateDevice("localhost"); + std::shared_ptr first_table, second_table, out; - // Use inner scope to force destruction of context and algorithm - { - // Create Gloo context from MPI communicator - auto context = std::make_shared(MPI_COMM_WORLD); - context->connectFullMesh(dev); + cylon::examples::create_two_in_memory_tables(kCount, kDup, ctx, first_table, second_table); - // Create and run simple allreduce - int rank = context->rank; - gloo::AllreduceRing allreduce(context, {&rank}, 1); - allreduce.run(); - std::cout << "Result: " << rank << std::endl; - } + cylon::join::config::JoinConfig jc{cylon::join::config::JoinType::INNER, 0, 0, + cylon::join::config::JoinAlgorithm::SORT, "l_", "r_"}; - rv = MPI_Finalize(); - assert(rv == MPI_SUCCESS); - return 0;*/ - auto config = std::make_shared(); - config->use_mpi = true; + auto status = cylon::DistributedJoin(first_table, second_table, jc, out); - std::shared_ptr ctx; - if (!cylon::CylonContext::InitDistributed(config, &ctx).is_ok()){ + if (!status.is_ok()) { + LOG(INFO) << "Table join failed "; return 1; } - std::cout << "rank: " << ctx->GetRank() << " size " << ctx->GetWorldSize() << std::endl; + LOG(INFO) << "First table had : " << first_table->Rows() << " and Second table had : " + << second_table->Rows() << ", Joined has : " << out->Rows(); + return 0; } diff --git a/python/pycylon/pycylon/common/status.pxd b/python/pycylon/pycylon/common/status.pxd index 47f960b34..a5f631446 100644 --- a/python/pycylon/pycylon/common/status.pxd +++ b/python/pycylon/pycylon/common/status.pxd @@ -23,17 +23,14 @@ from pycylon.common.code cimport CCode cdef extern from "../../../../cpp/src/cylon/status.hpp" namespace "cylon": cdef cppclass CStatus "cylon::Status": CStatus() - CStatus(int, string) - CStatus(int) + # CStatus(int, string) + # CStatus(int) CStatus(CCode) CStatus(CCode, string) int get_code() bool is_ok() - string get_msg() + const string & get_msg() cdef class Status: cdef: CStatus *thisptr - CCode _code - string msg - int code diff --git a/python/pycylon/pycylon/common/status.pyx b/python/pycylon/pycylon/common/status.pyx index 183e7b692..c0372935e 100644 --- a/python/pycylon/pycylon/common/status.pyx +++ b/python/pycylon/pycylon/common/status.pyx @@ -13,64 +13,43 @@ ## from libcpp.string cimport string -from libcpp cimport bool from pycylon.common.code cimport CCode from pycylon.common.status cimport CStatus cdef class Status: - - def __cinit__(self, int code, string msg, CCode _code): - ''' + def __cinit__(self, CCode code=CCode._OK, string msg=b''): + """ Initializes the Status to wrap the C++ object in Cython - :param code: passes as an int to represent the status code. :param msg: passes a str to convery the status message - :param _code: Cython correpondence to C++ Code object + :param code: Cython correpondence to C++ Code object :return: None - ''' - if _code != -1 and msg.size() == 0 and code == -1: - #print("Status(_Code)") - self.thisptr = new CStatus(_code) - self._code = _code - - elif msg.size() != 0 and code != -1: - #print("Status(code, msg)") - self.thisptr = new CStatus(code, msg) - self.msg = msg - self.code = code - - elif msg.size() == 0 and _code == -1 and code != -1: - #print("Status(code)") + """ + if msg.empty(): self.thisptr = new CStatus(code) - self.code = code - - elif msg.size() != 0 and _code != -1 and code == -1: - #print("Status(_Code, msg)") - self.thisptr = new CStatus(_code, msg) - self._code = _code - self.msg = msg else: - # non arg constructor used for wrapping CStatus to Status - pass + self.thisptr = new CStatus(code, msg) + def __del__(self): + del self.thisptr def get_code(self): - ''' + """ :return: the code - ''' + """ return self.thisptr.get_code() def is_ok(self): - ''' + """ :return: OK status from Status - ''' + """ return self.thisptr.is_ok() def get_msg(self): - ''' + """ :return: Message from Status - ''' + """ return self.thisptr.get_msg().decode() diff --git a/python/pycylon/pycylon/net/gloo_config.pxd b/python/pycylon/pycylon/net/gloo_config.pxd new file mode 100644 index 000000000..1c1f13113 --- /dev/null +++ b/python/pycylon/pycylon/net/gloo_config.pxd @@ -0,0 +1,46 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_GLOO: + from libcpp.memory cimport shared_ptr + from libcpp.string cimport string + from mpi4py.libmpi cimport MPI_Comm + + from pycylon.net.comm_config cimport CommConfig + + cdef extern from "../../../../cpp/src/cylon/net/gloo/gloo_communicator.hpp" namespace "cylon::net": + cdef cppclass CGlooConfig "cylon::net::GlooConfig": + int rank() + int world_size() + + void SetTcpHostname(const string& tcp_hostname) + void SetTcpIface(const string & tcp_iface) + void SetTcpAiFamily(int tcp_ai_family) + void SetFileStorePath(const string & file_store_path) + void SetStorePrefix(const string & store_prefix) + + @staticmethod + shared_ptr[CGlooConfig] MakeWithMpi(MPI_Comm comm); + + @staticmethod + shared_ptr[CGlooConfig] Make(int rank, int world_size); + + + cdef class GlooMPIConfig(CommConfig): + cdef: + shared_ptr[CGlooConfig] gloo_config_shd_ptr + + cdef class GlooStandaloneConfig(CommConfig): + cdef: + shared_ptr[CGlooConfig] gloo_config_shd_ptr \ No newline at end of file diff --git a/python/pycylon/pycylon/net/gloo_config.pyx b/python/pycylon/pycylon/net/gloo_config.pyx new file mode 100644 index 000000000..8a3b4f1f7 --- /dev/null +++ b/python/pycylon/pycylon/net/gloo_config.pyx @@ -0,0 +1,67 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +IF CYTHON_GLOO: + from pycylon.net.comm_config cimport CommConfig + from pycylon.net.gloo_config cimport CGlooConfig + cimport mpi4py.MPI as MPI + + from mpi4py.MPI import COMM_NULL + + cdef class GlooMPIConfig(CommConfig): + """ + GlooConfig Type mapping from libCylon to PyCylon + """ + def __cinit__(self, MPI.Comm comm = COMM_NULL): + self.gloo_config_shd_ptr = CGlooConfig.MakeWithMpi(comm.ob_mpi) + + @property + def rank(self): + return self.gloo_config_shd_ptr.get().rank() + + @property + def world_size(self): + return self.gloo_config_shd_ptr.get().world_size() + + cdef class GlooStandaloneConfig(CommConfig): + """ + GlooConfig Type mapping from libCylon to PyCylon + """ + def __cinit__(self, int rank = 0, int world_size = 1): + if rank < 0 or world_size < 0: + raise ValueError(f"Invalid rank/ world size provided") + self.gloo_config_shd_ptr = CGlooConfig.Make(rank, world_size) + + @property + def rank(self): + return self.gloo_config_shd_ptr.get().rank() + + @property + def world_size(self): + return self.gloo_config_shd_ptr.get().world_size() + + def set_tcp_hostname(self, hostname: str): + self.gloo_config_shd_ptr.get().SetTcpHostname(hostname.encode()) + + def set_tcp_iface(self, iface: str): + self.gloo_config_shd_ptr.get().SetTcpIface(iface.encode()) + + def set_tcp_ai_family(self, ai_family: int): + self.gloo_config_shd_ptr.get().SetTcpAiFamily(ai_family) + + def set_file_store_path(self, path: str): + self.gloo_config_shd_ptr.get().SetFileStorePath(path.encode()) + + def set_store_prefix(self, prefix: str): + self.gloo_config_shd_ptr.get().SetStorePrefix(prefix.encode()) diff --git a/python/pycylon/setup.py b/python/pycylon/setup.py index 888a0f4ba..1cb302144 100644 --- a/python/pycylon/setup.py +++ b/python/pycylon/setup.py @@ -22,12 +22,12 @@ import platform import sysconfig from distutils.sysconfig import get_python_lib +from distutils.util import strtobool import numpy as np import pyarrow as pa from Cython.Build import cythonize -from setuptools import find_packages, setup -from setuptools.extension import Extension +from setuptools import Extension, find_packages, setup import versioneer @@ -42,6 +42,14 @@ CYLON_PREFIX = os.environ.get('CYLON_PREFIX') ARROW_PREFIX = os.environ.get('ARROW_PREFIX') +CYLON_GLOO = strtobool(os.environ.get('CYLON_GLOO') or '0') +GLOO_PREFIX = os.environ.get('GLOO_PREFIX') +CYLON_UCX = strtobool(os.environ.get('CYLON_UCX') or '0') +CYLON_UCC = strtobool(os.environ.get('CYLON_UCC') or '0') +UCC_PREFIX = os.environ.get('UCC_PREFIX') + +if not CYLON_PREFIX: + raise ValueError("CYLON_PREFIX not set") try: nthreads = int(os.environ.get("PARALLEL_LEVEL", "0") or "0") @@ -50,15 +58,8 @@ compiler_directives = {"language_level": 3, "embedsignature": True} -cython_files = ["pycylon/*/*.pyx"] -print("CYTHON: " + str(cython_files)) - -if not CYLON_PREFIX: - raise ValueError("CYLON_PREFIX not set") - std_version = '-std=c++14' -additional_compile_args = [std_version, - '-DARROW_METADATA_V4 -DNEED_EXCLUSIVE_SCAN'] +additional_compile_args = [std_version, '-DARROW_METADATA_V4 -DNEED_EXCLUSIVE_SCAN'] arrow_lib_include_dir = None arrow_library_directory = None if not ARROW_PREFIX: @@ -106,12 +107,12 @@ mpi_library_dir = os.popen("mpicc --showme:libdirs").read().strip().split(' ') else: import mpi4py + mpi_library_dir = [mpi4py.get_config()['library_dirs']] library_directories.extend(mpi_library_dir) -print("Lib dirs:", library_directories) +libraries = ["arrow", "cylon", "glog"] -libraries = ["arrow", "cylon", "glog"] # todo glogd was added temporarily cylon_include_dir = os.path.abspath(os.path.join(__file__, "../../..", "cpp", "src")) _include_dirs = [cylon_include_dir, @@ -125,10 +126,34 @@ mpi_include_dir = os.popen("mpicc --showme:incdirs").read().strip().split(' ') else: import mpi4py + mpi_include_dir = [mpi4py.get_config()['include_dirs']] _include_dirs.extend(mpi_include_dir) -print("Include dirs:", _include_dirs) +macros = [] +# compile_time_env serves as preprocessor macros. ref: https://github.com/cython/cython/issues/2488 +compile_time_env = {'CYTHON_GLOO': False, 'CYTHON_UCC': False} +if CYLON_GLOO: + libraries.append('gloo') + library_directories.append(os.path.join(GLOO_PREFIX, 'lib')) + _include_dirs.append(os.path.join(GLOO_PREFIX, 'include')) + macros.append(('GLOO_USE_MPI', '1')) + macros.append(('BUILD_CYLON_GLOO', '1')) + compile_time_env['CYTHON_GLOO'] = True + +if CYLON_UCC and CYLON_UCX: + libraries.append('ucc') + library_directories.append(os.path.join(UCC_PREFIX, 'lib')) + _include_dirs.append(os.path.join(UCC_PREFIX, 'include')) + macros.append(('BUILD_CYLON_UCX', '1')) + macros.append(('BUILD_CYLON_UCC', '1')) + compile_time_env['CYTHON_UCC'] = True + +print('Libraries :', libraries) +print("Lib dirs :", library_directories) +print("Include dirs :", _include_dirs) +print("Macros :", macros) +print("Compile time env:", compile_time_env) # Adopted the Cudf Python Build format # https://github.com/rapidsai/cudf @@ -136,15 +161,15 @@ extensions = [ Extension( "*", - sources=cython_files, + sources=["pycylon/*/*.pyx"], include_dirs=_include_dirs, language='c++', extra_compile_args=extra_compile_args, extra_link_args=extra_link_args, libraries=libraries, library_dirs=library_directories, - ) -] + define_macros=macros, + )] compiler_directives = {"language_level": 3, "embedsignature": True} packages = find_packages(include=["pycylon", "pycylon.*"]) @@ -165,10 +190,9 @@ compiler_directives=dict( profile=False, language_level=3, embedsignature=True ), + compile_time_env=compile_time_env, ), - package_data=dict.fromkeys( - find_packages(include=["pycylon*"]), ["*.pxd"], - ), + package_data=dict.fromkeys(find_packages(include=["pycylon*"]), ["*.pxd"], ), python_requires='>=3.7', install_requires=[ 'numpy', diff --git a/python/pycylon/test/test_status.py b/python/pycylon/test/test_status.py index cd7f5fff3..1422c3044 100644 --- a/python/pycylon/test/test_status.py +++ b/python/pycylon/test/test_status.py @@ -1,16 +1,16 @@ ## - # Licensed under the Apache License, Version 2.0 (the "License"); - # you may not use this file except in compliance with the License. - # You may obtain a copy of the License at - # - # http://www.apache.org/licenses/LICENSE-2.0 - # - # Unless required by applicable law or agreed to in writing, software - # distributed under the License is distributed on an "AS IS" BASIS, - # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - # See the License for the specific language governing permissions and - # limitations under the License. - ## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## ''' Run test @@ -20,59 +20,30 @@ from pycylon.commons import Code from pycylon.commons import Status + def test_status(): - # Case I - # Input From Abstract Python API: code = -1, msg = None and Code = None msg = b"a" - code = -1 - _code = Code.IOError - s = Status(code, msg, _code) + code = Code.IOError + s = Status(code, msg) assert (s.get_code() == Code.IOError) assert (s.get_msg() == msg.decode()) - assert (s.is_ok() == False) + assert (s.is_ok() is False) - # Case II - # Input From Abstract Python API: code = 1, msg = None and Code = None msg = b"" - code = 1 - _code = -1 - s = Status(code, msg, _code) - - assert (s.get_code() == 1) - assert (s.get_msg() == msg.decode()) - assert (s.is_ok() == False) - - # Case III - # Input From Abstract Python API: code = -1, msg = "a" and Code = Code.OK + code = Code.IOError + s = Status(code, msg) - msg = b"a" - code = -1 - _code = Code.OK - s = Status(code, msg, _code) + assert (s.get_code() == Code.IOError) + assert (len(s.get_msg()) == 0) + assert (s.is_ok() is False) + s = Status(Code.OK) assert (s.get_code() == Code.OK) - assert (s.get_msg() == msg.decode()) - assert (s.is_ok() == True) + assert (len(s.get_msg()) == 0) + assert (s.is_ok()) - # Case IV - # Input From Abstract Python API: code = 0, msg = "a" and Code = None - msg = b"a" - code = 0 - _code = -1 - s = Status(code, msg, _code) - - assert (s.get_code() == code) - assert (s.get_msg() == msg.decode()) - assert (s.is_ok() == True) - - # Case V - # Input From Abstract Python API: code = 1, msg = None and Code = None - msg = b"" - code = -1 - _code = Code.IOError - s = Status(code, msg, _code) - - assert (s.get_code() == Code.IOError) - assert (s.get_msg() == msg.decode()) - assert (s.is_ok() == False) + s = Status() + assert (s.get_code() == Code.OK) + assert (len(s.get_msg()) == 0) + assert (s.is_ok())