Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ucc integration #591

Merged
merged 38 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
a54c345
initial commit
kaiyingshan Jun 20, 2022
5de0358
allreduce
kaiyingshan Jun 22, 2022
d9af4fd
start on allgather
kaiyingshan Jun 23, 2022
797c529
use wait logic in ucc tests
kaiyingshan Jun 23, 2022
c70eda1
add allgather vector example
kaiyingshan Jul 1, 2022
c480440
improve ucc ctx/team initialization
kaiyingshan Jul 1, 2022
bb5d9d0
fixed missing data issue in allgatherv
kaiyingshan Jul 1, 2022
26a526f
bcast
kaiyingshan Jul 3, 2022
570886b
allgather
kaiyingshan Jul 4, 2022
6020b59
add a test for implemented operators
kaiyingshan Jul 5, 2022
6f088cc
make ucc team with one process works
kaiyingshan Jul 5, 2022
af4ac43
addressed blocking/non-blocking in ucc functions
kaiyingshan Jul 5, 2022
7e62805
removing depricated method
nirandaperera Jul 7, 2022
2f37d4d
enable tests for ucx/ucc
kaiyingshan Jul 7, 2022
80ee656
adding Make method to communicator API
nirandaperera Jul 8, 2022
9482238
Merge branch 'ucc-integration' of https://github.com/cylondata/cylon …
nirandaperera Jul 8, 2022
450b114
minor cmake change
nirandaperera Jul 8, 2022
813b184
avoid copy by placing count vector outside loop
kaiyingshan Jul 9, 2022
942a5f4
Merge branch 'ucc-integration' of https://github.com/cylondata/cylon …
kaiyingshan Jul 9, 2022
8b4baea
fix double free issue
kaiyingshan Jul 11, 2022
01562e2
using BUILD_CYLON_UCC def
nirandaperera Jul 11, 2022
d78ad01
Merge remote-tracking branch 'origin/ucc-integration' into ucc-integr…
nirandaperera Jul 11, 2022
1d10796
fixing openmpi at 4.1.2
nirandaperera Jul 11, 2022
cfb47e8
fixing openmpi at 4.1.3 ha1ae619_105 build
nirandaperera Jul 11, 2022
caefb81
fixing openmpi at 4.1.3 ha1ae619_105 build
nirandaperera Jul 11, 2022
eedf09b
fixing double free error
nirandaperera Jul 12, 2022
72da6af
fixing ucx tests
nirandaperera Jul 12, 2022
3b34219
fixing ucx tests
nirandaperera Jul 12, 2022
747735a
fixing gcylon
nirandaperera Jul 13, 2022
71bbe55
fixing java build
nirandaperera Jul 13, 2022
fcdbe14
minor fix
nirandaperera Jul 13, 2022
76229d6
fix warnings
kaiyingshan Jul 13, 2022
65eb44f
Merge branch 'ucc-integration' of https://github.com/cylondata/cylon …
kaiyingshan Jul 13, 2022
d2c2cce
fixing ucx world size 1
nirandaperera Jul 13, 2022
830853c
Merge branch 'ucc-integration' of https://github.com/cylondata/cylon …
nirandaperera Jul 13, 2022
610585b
changing branch
nirandaperera Jul 13, 2022
9a7e9aa
review changes
nirandaperera Jul 13, 2022
eb73788
resolve conflict
kaiyingshan Jul 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/conda-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
# 20.04 supports CUDA 11.0+
- os: ubuntu-20.04
gcc: 9
ucc: "v1.0.0"
ucc: "master"

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -65,3 +65,6 @@ jobs:

- name: Run pytest
run: python build.py -ipath="$HOME/cylon/install" --pytest

- name: Build Java
run: python build.py -ipath="$HOME/cylon/install" --java
83 changes: 53 additions & 30 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@
logger = logging.getLogger("cylon_build")
logger.setLevel(logging.INFO)


def check_status(status, task):
if status != 0:
logger.error(f'{task} failed with a non zero exit code ({status}). '
f'Cylon build is terminating.')
sys.exit(1)
else:
logger.info(f'{task} completed successfully')


def check_conda_prefix():
conda_prefix = os.getenv('CONDA_PREFIX')
if not conda_prefix:
logger.error("The build should be in a conda environment")
sys.exit(1)

return conda_prefix


parser = argparse.ArgumentParser()

# C++ Build
Expand Down Expand Up @@ -54,10 +73,15 @@
python_build.add_argument(
"--pytest", action='store_true', help='Run Python test suite')

# Java build
java_build = parser.add_argument_group("JCylon")
java_build.add_argument("--java", action='store_true',
help='Build JCylon')

# Docker build
java_build = parser.add_argument_group("Docker")
java_build.add_argument("--docker", action='store_true',
help='Build Cylon Docker images locally')
docker_build = parser.add_argument_group("Docker")
docker_build.add_argument("--docker", action='store_true',
help='Build Cylon Docker images locally')

# Paths
parser.add_argument("-bpath", help='Build directory',
Expand All @@ -79,6 +103,7 @@ def on_off(arg):
not args.release and not args.debug)) else "Debug"
CPP_SOURCE_DIR = str(Path(args.root, 'cpp'))
PYTHON_SOURCE_DIR = Path(args.root, 'python', 'pycylon')
JAVA_SOURCE_DIR = Path(args.root, 'java')
RUN_CPP_TESTS = args.test
RUN_PYTHON_TESTS = args.pytest
CMAKE_FLAGS = args.cmake_flags
Expand All @@ -89,11 +114,14 @@ def on_off(arg):
# arrow build expects /s even on windows
BUILD_PYTHON = args.python

# java
BUILD_JAVA = args.java

# docker
BUILD_DOCKER = args.docker

BUILD_DIR = str(Path(args.bpath))
INSTALL_DIR = str(Path(args.ipath)) if args.ipath else ""
INSTALL_DIR = str(Path(args.ipath or check_conda_prefix()))

OS_NAME = platform.system() # Linux, Darwin or Windows

Expand Down Expand Up @@ -159,30 +187,10 @@ def print_line():
os.makedirs(BUILD_DIR)


def check_status(status, task):
if status != 0:
logger.error(f'{task} failed with a non zero exit code ({status}). '
f'Cylon build is terminating.')
sys.exit(1)
else:
logger.info(f'{task} completed successfully')


def build_cpp():
if not BUILD_CPP:
return

CONDA_PREFIX = os.getenv('CONDA_PREFIX')
if args.ipath:
install_prefix = INSTALL_DIR
else:
if CONDA_PREFIX:
install_prefix = CONDA_PREFIX
else:
logger.error(
"install prefix can not be inferred. The build should be in a conda environment")
return

win_cmake_args = "-A x64" if os.name == 'nt' else ""
verb = '-DCMAKE_VERBOSE_MAKEFILE:BOOL=ON' if args.verbose else ''

Expand All @@ -191,7 +199,7 @@ def build_cpp():
f"-DCYLON_WITH_TEST={on_off(RUN_CPP_TESTS)} " \
f"-DARROW_BUILD_TYPE=SYSTEM " \
f"{CPPLINT_COMMAND} " \
f"-DCMAKE_INSTALL_PREFIX={install_prefix} " \
f"-DCMAKE_INSTALL_PREFIX={INSTALL_DIR} " \
f"{verb} {CMAKE_FLAGS} {CPP_SOURCE_DIR}"

logger.info(f"Generate command: {cmake_command}")
Expand All @@ -203,7 +211,7 @@ def build_cpp():
res = subprocess.call(cmake_build_command, cwd=BUILD_DIR, shell=True)
check_status(res, "C++ cmake build")

cmake_install_command = f'cmake --install . --prefix {install_prefix}'
cmake_install_command = f'cmake --install . --prefix {INSTALL_DIR}'
logger.info(f"Install command: {cmake_install_command}")
res = subprocess.call(cmake_install_command, cwd=BUILD_DIR, shell=True)
check_status(res, "C++ cmake install")
Expand All @@ -220,11 +228,16 @@ def build_cpp():
def build_docker():
if not BUILD_DOCKER:
return
logger.error("Docker build not implemented in this script")
sys.exit(1)


def python_test():
if not RUN_PYTHON_TESTS:
return

check_conda_prefix()

env = os.environ
if args.ipath:
if OS_NAME == 'Linux':
Expand Down Expand Up @@ -264,10 +277,7 @@ def build_python():
print_line()
logger.info("Building Python")

conda_prefix = os.getenv('CONDA_PREFIX')
if not conda_prefix:
logger.error("The build should be in a conda environment")
return 1
conda_prefix = check_conda_prefix()

python_build_command = f'{PYTHON_EXEC} setup.py install --force'
env = os.environ
Expand All @@ -290,6 +300,19 @@ def build_python():
check_status(res.returncode, "PyCylon build")


def build_java():
if not BUILD_JAVA:
return

conda_prefix = check_conda_prefix()

mvn_cmd = f"mvn clean install -Dcylon.core.libs={INSTALL_DIR}/lib " \
f"-Dcylon.arrow.dir={conda_prefix}"
res = subprocess.run(mvn_cmd, shell=True, cwd=JAVA_SOURCE_DIR)
check_status(res.returncode, "JCylon build")


build_cpp()
build_python()
python_test()
build_java()
2 changes: 1 addition & 1 deletion conda/environments/cylon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies:
- cmake>=3.17
- pyarrow=5.0.0
- glog=0.5.0
- openmpi>=4.1.3
- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
- cython>=0.29,<0.30
- numpy>=1.20
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/gcylon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies:
- cudf=21.10.01
- cudatoolkit=11.2
- glog=0.5.0
- openmpi>=4.1.3
- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
- numpy>=1.20
- pandas>=1.0
Expand Down
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ if (CYLON_UCX)
message(STATUS "UCC libs: ${UCC_LIBRARIES}")

include_directories(SYSTEM ${UCC_INCLUDE_DIRS})
link_directories(${UCC_LIBRARIES})
link_directories(${UCC_LIBRARIES} ${UCC_LIBRARIES}/ucc)
endif (CYLON_UCC)
endif (CYLON_UCX)

Expand Down Expand Up @@ -372,5 +372,6 @@ if (CYLON_WITH_TEST)
set(CYLON_CATCH2_HEADER_HASH 27da57c7a06d09be8dd81fab7246b79e7892b6ae7e4e49ba8631f1d5a955e3fc)

enable_testing()
set(CMAKE_CTEST_ARGUMENTS "--output-on-failure")
add_subdirectory(test)
endif ()
10 changes: 10 additions & 0 deletions cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ if (CYLON_UCX)
net/ucx/ucx_operations.hpp
net/ucx/ucx_operations.cpp
)
if (CYLON_UCC)
set(UCC_CYLON_FILES
net/ucc/ucc_operations.hpp
net/ucc/ucc_operations.cpp)
else ()
set(UCC_CYLON_FILES)
endif (CYLON_UCC)
else (CYLON_UCX)
set(UCX_CYLON_FILES)
set(UCC_CYLON_FILES)
endif (CYLON_UCX)

if (CYLON_GLOO)
Expand All @@ -47,6 +55,7 @@ endif (CYLON_GLOO)

add_library(cylon SHARED
${UCX_CYLON_FILES}
${UCC_CYLON_FILES}
${CYLON_GLOO_FILES}
arrow/arrow_all_to_all.cpp
arrow/arrow_all_to_all.hpp
Expand Down Expand Up @@ -116,6 +125,7 @@ add_library(cylon SHARED
net/channel.hpp
net/comm_operations.hpp
net/comm_type.hpp
net/communicator.cpp
net/communicator.hpp
net/mpi/mpi_channel.cpp
net/mpi/mpi_channel.hpp
Expand Down
18 changes: 12 additions & 6 deletions cpp/src/cylon/compute/scalar_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,14 @@ struct VarianceKernelImpl : public ScalarAggregateKernel {
arrow::MemoryPool *pool_ = nullptr;
};

bool is_all_valid(const std::shared_ptr<net::Communicator> &comm,
const std::shared_ptr<arrow::Array> &values) {
Status is_all_valid(const std::shared_ptr<net::Communicator> &comm,
const std::shared_ptr<arrow::Array> &values,
bool *res) {
const auto &null_count = Scalar::Make(std::make_shared<arrow::Int64Scalar>(values->null_count()));
std::shared_ptr<Scalar> out;
const auto &status = comm->AllReduce(null_count, net::SUM, &out);
return status.is_ok() && std::static_pointer_cast<arrow::Int64Scalar>(out->data())->value == 0;
RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(null_count, net::SUM, &out));
*res = std::static_pointer_cast<arrow::Int64Scalar>(out->data())->value == 0;
return Status::OK();
}

Status ScalarAggregate(const std::shared_ptr<CylonContext> &ctx,
Expand All @@ -292,7 +294,9 @@ Status ScalarAggregate(const std::shared_ptr<CylonContext> &ctx,
if (ctx->GetWorldSize() > 1) {
const auto &comm = ctx->GetCommunicator();

if (!is_all_valid(comm, combined_results)) {
bool all_valid;
RETURN_CYLON_STATUS_IF_FAILED(is_all_valid(comm, combined_results, &all_valid));
if (!all_valid) {
// combined_results array has nulls. So, return Null scalar
*result = arrow::MakeNullScalar(values->type());
return Status::OK();
Expand Down Expand Up @@ -422,7 +426,9 @@ Status ScalarAggregate(const std::shared_ptr<CylonContext> &ctx,
// all reduce combined_results
const auto &comm = ctx->GetCommunicator();

if (!is_all_valid(comm, combined_results)) {
bool all_valid;
RETURN_CYLON_STATUS_IF_FAILED(is_all_valid(comm, combined_results, &all_valid));
if (!all_valid) {
// combined_results array has nulls. So, return Null scalar
CYLON_ASSIGN_OR_RAISE(*result,
arrow::MakeArrayOfNull(promoted_type, table_->num_columns(), pool))
Expand Down
41 changes: 12 additions & 29 deletions cpp/src/cylon/ctx/cylon_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#endif

#ifdef BUILD_CYLON_GLOO
#include <cylon/net/gloo/gloo_communicator.hpp>
#include "cylon/net/gloo/gloo_communicator.hpp"
#endif

namespace cylon {
Expand All @@ -38,55 +38,38 @@ CylonContext::CylonContext(bool distributed) {
this->is_distributed = distributed;
}

std::shared_ptr<CylonContext> CylonContext::InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config) {
if (config->Type() == net::CommType::MPI) {
auto ctx = std::make_shared<CylonContext>(true);
ctx->communicator = std::make_shared<net::MPICommunicator>(&ctx);
ctx->communicator->Init(config);
return ctx;
}

#ifdef BUILD_CYLON_UCX
else if (config->Type() == net::CommType::UCX) {
auto ctx = std::make_shared<CylonContext>(true);
ctx->communicator = std::make_shared<net::UCXCommunicator>(&ctx);
ctx->communicator->Init(config);
return ctx;
}
#endif
else {
throw "Unsupported communication type";
}
return nullptr;
}

Status CylonContext::InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config,
std::shared_ptr<CylonContext> *ctx) {
switch (config->Type()) {
case net::LOCAL: return {Code::Invalid, "InitDistributed called on Local communication"};

case net::MPI: {
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::MPICommunicator>(ctx);
return (*ctx)->communicator->Init(config);
auto pool = (*ctx)->GetMemoryPool();
return net::MPICommunicator::Make(config, pool, &(*ctx)->communicator);
}

case net::UCX: {
#ifdef BUILD_CYLON_UCX
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::UCXCommunicator>(ctx);
return (*ctx)->communicator->Init(config);
auto pool = (*ctx)->GetMemoryPool();
#ifdef BUILD_CYLON_UCC
return net::UCXUCCCommunicator::Make(config, pool, &(*ctx)->communicator);
#else
return net::UCXCommunicator::Make(config, pool, &(*ctx)->communicator);
#endif
#else
return {Code::NotImplemented, "UCX communication not implemented"};
#endif
}

case net::TCP:return {Code::NotImplemented, "TCP communication not implemented"};

case net::GLOO: {
#ifdef BUILD_CYLON_GLOO
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::GlooCommunicator>(ctx);
return (*ctx)->communicator->Init(config);
auto pool = (*ctx)->GetMemoryPool();
return net::GlooCommunicator::Make(config, pool, &(*ctx)->communicator);
#else
return {Code::NotImplemented, "Gloo communication not implemented"};
#endif
Expand Down
1 change: 0 additions & 1 deletion cpp/src/cylon/ctx/cylon_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class CylonContext {
* @param <cylon::net::CommConfig*> config Configuration to be passed on to the cylon::net::Communicator
* @return <cylon::CylonContext*>
*/
static std::shared_ptr<CylonContext> InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config);
static Status InitDistributed(const std::shared_ptr<cylon::net::CommConfig> &config,
std::shared_ptr<CylonContext> *ctx);

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/cylon/net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ cylon_install_all_headers("cylon/net")
add_subdirectory(ops)
add_subdirectory(mpi)
add_subdirectory(ucx)
add_subdirectory(gloo)
add_subdirectory(gloo)
add_subdirectory(ucc)
Loading