Skip to content

Commit

Permalink
Ucc integration (#591)
Browse files Browse the repository at this point in the history
* initial commit

* allreduce

* start on allgather

* use wait logic in ucc tests

* add allgather vector example

* improve ucc ctx/team initialization

* fixed missing data issue in allgatherv

* bcast

* allgather

* add a test for implemented operators

* make ucc team with one process works

* addressed blocking/non-blocking in ucc functions

* removing depricated method

* enable tests for ucx/ucc

* adding Make method to communicator API

* minor cmake change

* avoid copy by placing count vector outside loop

* fix double free issue

* using BUILD_CYLON_UCC def

* fixing openmpi at 4.1.2

* fixing openmpi at 4.1.3 ha1ae619_105 build

* fixing openmpi at 4.1.3 ha1ae619_105 build

* fixing double free error

* fixing ucx tests

* fixing ucx tests

* fixing gcylon

* fixing java build

* minor fix

* fix warnings

* fixing ucx world size 1

* changing branch

* review changes

Co-authored-by: niranda perera <[email protected]>
  • Loading branch information
kaiyingshan and nirandaperera authored Jul 14, 2022
1 parent 61b4a82 commit 4dd359f
Show file tree
Hide file tree
Showing 57 changed files with 1,895 additions and 289 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/conda-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
# 20.04 supports CUDA 11.0+
- os: ubuntu-20.04
gcc: 9
ucc: "v1.0.0"
ucc: "master"

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -65,3 +65,6 @@ jobs:

- name: Run pytest
run: python build.py -ipath="$HOME/cylon/install" --pytest

- name: Build Java
run: python build.py -ipath="$HOME/cylon/install" --java
83 changes: 53 additions & 30 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@
logger = logging.getLogger("cylon_build")
logger.setLevel(logging.INFO)


def check_status(status, task):
if status != 0:
logger.error(f'{task} failed with a non zero exit code ({status}). '
f'Cylon build is terminating.')
sys.exit(1)
else:
logger.info(f'{task} completed successfully')


def check_conda_prefix():
conda_prefix = os.getenv('CONDA_PREFIX')
if not conda_prefix:
logger.error("The build should be in a conda environment")
sys.exit(1)

return conda_prefix


parser = argparse.ArgumentParser()

# C++ Build
Expand Down Expand Up @@ -54,10 +73,15 @@
python_build.add_argument(
"--pytest", action='store_true', help='Run Python test suite')

# Java build
java_build = parser.add_argument_group("JCylon")
java_build.add_argument("--java", action='store_true',
help='Build JCylon')

# Docker build
java_build = parser.add_argument_group("Docker")
java_build.add_argument("--docker", action='store_true',
help='Build Cylon Docker images locally')
docker_build = parser.add_argument_group("Docker")
docker_build.add_argument("--docker", action='store_true',
help='Build Cylon Docker images locally')

# Paths
parser.add_argument("-bpath", help='Build directory',
Expand All @@ -79,6 +103,7 @@ def on_off(arg):
not args.release and not args.debug)) else "Debug"
CPP_SOURCE_DIR = str(Path(args.root, 'cpp'))
PYTHON_SOURCE_DIR = Path(args.root, 'python', 'pycylon')
JAVA_SOURCE_DIR = Path(args.root, 'java')
RUN_CPP_TESTS = args.test
RUN_PYTHON_TESTS = args.pytest
CMAKE_FLAGS = args.cmake_flags
Expand All @@ -89,11 +114,14 @@ def on_off(arg):
# arrow build expects /s even on windows
BUILD_PYTHON = args.python

# java
BUILD_JAVA = args.java

# docker
BUILD_DOCKER = args.docker

BUILD_DIR = str(Path(args.bpath))
INSTALL_DIR = str(Path(args.ipath)) if args.ipath else ""
INSTALL_DIR = str(Path(args.ipath or check_conda_prefix()))

OS_NAME = platform.system() # Linux, Darwin or Windows

Expand Down Expand Up @@ -159,30 +187,10 @@ def print_line():
os.makedirs(BUILD_DIR)


def check_status(status, task):
if status != 0:
logger.error(f'{task} failed with a non zero exit code ({status}). '
f'Cylon build is terminating.')
sys.exit(1)
else:
logger.info(f'{task} completed successfully')


def build_cpp():
if not BUILD_CPP:
return

CONDA_PREFIX = os.getenv('CONDA_PREFIX')
if args.ipath:
install_prefix = INSTALL_DIR
else:
if CONDA_PREFIX:
install_prefix = CONDA_PREFIX
else:
logger.error(
"install prefix can not be inferred. The build should be in a conda environment")
return

win_cmake_args = "-A x64" if os.name == 'nt' else ""
verb = '-DCMAKE_VERBOSE_MAKEFILE:BOOL=ON' if args.verbose else ''

Expand All @@ -191,7 +199,7 @@ def build_cpp():
f"-DCYLON_WITH_TEST={on_off(RUN_CPP_TESTS)} " \
f"-DARROW_BUILD_TYPE=SYSTEM " \
f"{CPPLINT_COMMAND} " \
f"-DCMAKE_INSTALL_PREFIX={install_prefix} " \
f"-DCMAKE_INSTALL_PREFIX={INSTALL_DIR} " \
f"{verb} {CMAKE_FLAGS} {CPP_SOURCE_DIR}"

logger.info(f"Generate command: {cmake_command}")
Expand All @@ -203,7 +211,7 @@ def build_cpp():
res = subprocess.call(cmake_build_command, cwd=BUILD_DIR, shell=True)
check_status(res, "C++ cmake build")

cmake_install_command = f'cmake --install . --prefix {install_prefix}'
cmake_install_command = f'cmake --install . --prefix {INSTALL_DIR}'
logger.info(f"Install command: {cmake_install_command}")
res = subprocess.call(cmake_install_command, cwd=BUILD_DIR, shell=True)
check_status(res, "C++ cmake install")
Expand All @@ -220,11 +228,16 @@ def build_cpp():
def build_docker():
if not BUILD_DOCKER:
return
logger.error("Docker build not implemented in this script")
sys.exit(1)


def python_test():
if not RUN_PYTHON_TESTS:
return

check_conda_prefix()

env = os.environ
if args.ipath:
if OS_NAME == 'Linux':
Expand Down Expand Up @@ -264,10 +277,7 @@ def build_python():
print_line()
logger.info("Building Python")

conda_prefix = os.getenv('CONDA_PREFIX')
if not conda_prefix:
logger.error("The build should be in a conda environment")
return 1
conda_prefix = check_conda_prefix()

python_build_command = f'{PYTHON_EXEC} setup.py install --force'
env = os.environ
Expand All @@ -290,6 +300,19 @@ def build_python():
check_status(res.returncode, "PyCylon build")


def build_java():
if not BUILD_JAVA:
return

conda_prefix = check_conda_prefix()

mvn_cmd = f"mvn clean install -Dcylon.core.libs={INSTALL_DIR}/lib " \
f"-Dcylon.arrow.dir={conda_prefix}"
res = subprocess.run(mvn_cmd, shell=True, cwd=JAVA_SOURCE_DIR)
check_status(res.returncode, "JCylon build")


build_cpp()
build_python()
python_test()
build_java()
2 changes: 1 addition & 1 deletion conda/environments/cylon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies:
- cmake>=3.17
- pyarrow=5.0.0
- glog=0.5.0
- openmpi>=4.1.3
- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
- cython>=0.29,<0.30
- numpy>=1.20
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/gcylon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies:
- cudf=21.10.01
- cudatoolkit=11.2
- glog=0.5.0
- openmpi>=4.1.3
- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
- numpy>=1.20
- pandas>=1.0
Expand Down
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ if (CYLON_UCX)
message(STATUS "UCC libs: ${UCC_LIBRARIES}")

include_directories(SYSTEM ${UCC_INCLUDE_DIRS})
link_directories(${UCC_LIBRARIES})
link_directories(${UCC_LIBRARIES} ${UCC_LIBRARIES}/ucc)
endif (CYLON_UCC)
endif (CYLON_UCX)

Expand Down Expand Up @@ -372,5 +372,6 @@ if (CYLON_WITH_TEST)
set(CYLON_CATCH2_HEADER_HASH 27da57c7a06d09be8dd81fab7246b79e7892b6ae7e4e49ba8631f1d5a955e3fc)

enable_testing()
set(CMAKE_CTEST_ARGUMENTS "--output-on-failure")
add_subdirectory(test)
endif ()
10 changes: 10 additions & 0 deletions cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ if (CYLON_UCX)
net/ucx/ucx_operations.hpp
net/ucx/ucx_operations.cpp
)
if (CYLON_UCC)
set(UCC_CYLON_FILES
net/ucc/ucc_operations.hpp
net/ucc/ucc_operations.cpp)
else ()
set(UCC_CYLON_FILES)
endif (CYLON_UCC)
else (CYLON_UCX)
set(UCX_CYLON_FILES)
set(UCC_CYLON_FILES)
endif (CYLON_UCX)

if (CYLON_GLOO)
Expand All @@ -47,6 +55,7 @@ endif (CYLON_GLOO)

add_library(cylon SHARED
${UCX_CYLON_FILES}
${UCC_CYLON_FILES}
${CYLON_GLOO_FILES}
arrow/arrow_all_to_all.cpp
arrow/arrow_all_to_all.hpp
Expand Down Expand Up @@ -116,6 +125,7 @@ add_library(cylon SHARED
net/channel.hpp
net/comm_operations.hpp
net/comm_type.hpp
net/communicator.cpp
net/communicator.hpp
net/mpi/mpi_channel.cpp
net/mpi/mpi_channel.hpp
Expand Down
18 changes: 12 additions & 6 deletions cpp/src/cylon/compute/scalar_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,14 @@ struct VarianceKernelImpl : public ScalarAggregateKernel {
arrow::MemoryPool *pool_ = nullptr;
};

bool is_all_valid(const std::shared_ptr<net::Communicator> &comm,
const std::shared_ptr<arrow::Array> &values) {
Status is_all_valid(const std::shared_ptr<net::Communicator> &comm,
const std::shared_ptr<arrow::Array> &values,
bool *res) {
const auto &null_count = Scalar::Make(std::make_shared<arrow::Int64Scalar>(values->null_count()));
std::shared_ptr<Scalar> out;
const auto &status = comm->AllReduce(null_count, net::SUM, &out);
return status.is_ok() && std::static_pointer_cast<arrow::Int64Scalar>(out->data())->value == 0;
RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(null_count, net::SUM, &out));
*res = std::static_pointer_cast<arrow::Int64Scalar>(out->data())->value == 0;
return Status::OK();
}

Status ScalarAggregate(const std::shared_ptr<CylonContext> &ctx,
Expand All @@ -292,7 +294,9 @@ Status ScalarAggregate(const std::shared_ptr<CylonContext> &ctx,
if (ctx->GetWorldSize() > 1) {
const auto &comm = ctx->GetCommunicator();

if (!is_all_valid(comm, combined_results)) {
bool all_valid;
RETURN_CYLON_STATUS_IF_FAILED(is_all_valid(comm, combined_results, &all_valid));
if (!all_valid) {
// combined_results array has nulls. So, return Null scalar
*result = arrow::MakeNullScalar(values->type());
return Status::OK();
Expand Down Expand Up @@ -422,7 +426,9 @@ Status ScalarAggregate(const std::shared_ptr<CylonContext> &ctx,
// all reduce combined_results
const auto &comm = ctx->GetCommunicator();

if (!is_all_valid(comm, combined_results)) {
bool all_valid;
RETURN_CYLON_STATUS_IF_FAILED(is_all_valid(comm, combined_results, &all_valid));
if (!all_valid) {
// combined_results array has nulls. So, return Null scalar
CYLON_ASSIGN_OR_RAISE(*result,
arrow::MakeArrayOfNull(promoted_type, table_->num_columns(), pool))
Expand Down
41 changes: 12 additions & 29 deletions cpp/src/cylon/ctx/cylon_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#endif

#ifdef BUILD_CYLON_GLOO
#include <cylon/net/gloo/gloo_communicator.hpp>
#include "cylon/net/gloo/gloo_communicator.hpp"
#endif

namespace cylon {
Expand All @@ -38,55 +38,38 @@ CylonContext::CylonContext(bool distributed) {
this->is_distributed = distributed;
}

std::shared_ptr<CylonContext> CylonContext::InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config) {
if (config->Type() == net::CommType::MPI) {
auto ctx = std::make_shared<CylonContext>(true);
ctx->communicator = std::make_shared<net::MPICommunicator>(&ctx);
ctx->communicator->Init(config);
return ctx;
}

#ifdef BUILD_CYLON_UCX
else if (config->Type() == net::CommType::UCX) {
auto ctx = std::make_shared<CylonContext>(true);
ctx->communicator = std::make_shared<net::UCXCommunicator>(&ctx);
ctx->communicator->Init(config);
return ctx;
}
#endif
else {
throw "Unsupported communication type";
}
return nullptr;
}

Status CylonContext::InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config,
std::shared_ptr<CylonContext> *ctx) {
switch (config->Type()) {
case net::LOCAL: return {Code::Invalid, "InitDistributed called on Local communication"};

case net::MPI: {
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::MPICommunicator>(ctx);
return (*ctx)->communicator->Init(config);
auto pool = (*ctx)->GetMemoryPool();
return net::MPICommunicator::Make(config, pool, &(*ctx)->communicator);
}

case net::UCX: {
#ifdef BUILD_CYLON_UCX
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::UCXCommunicator>(ctx);
return (*ctx)->communicator->Init(config);
auto pool = (*ctx)->GetMemoryPool();
#ifdef BUILD_CYLON_UCC
return net::UCXUCCCommunicator::Make(config, pool, &(*ctx)->communicator);
#else
return net::UCXCommunicator::Make(config, pool, &(*ctx)->communicator);
#endif
#else
return {Code::NotImplemented, "UCX communication not implemented"};
#endif
}

case net::TCP:return {Code::NotImplemented, "TCP communication not implemented"};

case net::GLOO: {
#ifdef BUILD_CYLON_GLOO
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::GlooCommunicator>(ctx);
return (*ctx)->communicator->Init(config);
auto pool = (*ctx)->GetMemoryPool();
return net::GlooCommunicator::Make(config, pool, &(*ctx)->communicator);
#else
return {Code::NotImplemented, "Gloo communication not implemented"};
#endif
Expand Down
1 change: 0 additions & 1 deletion cpp/src/cylon/ctx/cylon_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class CylonContext {
* @param <cylon::net::CommConfig*> config Configuration to be passed on to the cylon::net::Communicator
* @return <cylon::CylonContext*>
*/
static std::shared_ptr<CylonContext> InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config);
static Status InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config,
std::shared_ptr<CylonContext> *ctx);

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/cylon/net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ cylon_install_all_headers("cylon/net")
add_subdirectory(ops)
add_subdirectory(mpi)
add_subdirectory(ucx)
add_subdirectory(gloo)
add_subdirectory(gloo)
add_subdirectory(ucc)
Loading

0 comments on commit 4dd359f

Please sign in to comment.