Skip to content

Commit

Permalink
Cylon Gloo integration (#576)
Browse files Browse the repository at this point in the history
* minimum working example

* minimum working example

* minor change

* gloo allgather done

* gloo gather done

* gloo bcast done

* fixing mem pool

* adding gloo allreduce

* minor refactor

* adding gloo channel

* renaming txrequest

* renaming txrequest

* adding gloochannel

* renaming txrequest.pxd

* renaming txrequest.pxd

* minor improvements to mpi alltoall code

* adding gloo channel and alltoall

* adding gloo channel and alltoall

* fixing set_op_test

* fixing groupby_test.cpp

* fixing table_op_test.cpp

* adding column serializer

* minor refactor

* adding array allgather

* adding scalar allgather

* minor import fix
  • Loading branch information
nirandaperera authored Mar 31, 2022
1 parent b2c0820 commit 2759a30
Show file tree
Hide file tree
Showing 54 changed files with 3,288 additions and 762 deletions.
30 changes: 30 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ if (UPPERCASE_CMAKE_BUILD_TYPE MATCHES "DEBUG")
elseif (UPPERCASE_CMAKE_BUILD_TYPE MATCHES "RELEASE")
message("Running on Release mode...")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
elseif (UPPERCASE_CMAKE_BUILD_TYPE MATCHES "RELWITHDEBINFO")
message("Running on RelWithDebInfo mode...")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O2 -g -fsanitize=address")
else ()
message(FATAL_ERROR "Unknown CMAKE_BUILD_TYPE ${CMAKE_BUILD_TYPE}! Exiting...")
endif ()
Expand Down Expand Up @@ -234,6 +237,33 @@ if (CYLON_UCX)
message(STATUS "UCX libs: ${UCX_LIBRARIES}")
endif (CYLON_UCX)

# gloo
option(CYLON_GLOO "Build Cylon with Gloo" OFF)
if (CYLON_GLOO)
message("Cylon Gloo enabled")
add_definitions(-DBUILD_CYLON_GLOO)

if (NOT GLOO_INSTALL_PREFIX)
message(FATAL_ERROR "CYLON_GLOO is set, GLOO_INSTALL_PREFIX should also be set")
endif ()

find_package(Gloo REQUIRED CONFIG HINTS ${GLOO_INSTALL_PREFIX})
message(STATUS "gloo include dirs: ${GLOO_INCLUDE_DIRS}")
message(STATUS "gloo libs: ${GLOO_LIBRARIES}")

include_directories(SYSTEM ${GLOO_INCLUDE_DIRS})
link_directories(${GLOO_LIBRARIES})

if (EXISTS "${GLOO_INCLUDE_DIRS}/gloo/mpi/context.h")
set(GLOO_USE_MPI ON)
else ()
set(GLOO_USE_MPI OFF)
endif ()
message(STATUS "gloo use mpi: ${GLOO_USE_MPI}")
add_definitions(-DGLOO_USE_MPI)
endif()


# Arrow
if (NOT ARROW_BUILD_TYPE)
set(ARROW_BUILD_TYPE "SOURCE")
Expand Down
29 changes: 26 additions & 3 deletions cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,21 @@ else (CYLON_UCX)
set(UCX_CYLON_FILES)
endif (CYLON_UCX)

if (CYLON_GLOO)
set(CYLON_GLOO_FILES
net/gloo/gloo_channel.cpp
net/gloo/gloo_channel.hpp
net/gloo/gloo_communicator.cpp
net/gloo/gloo_communicator.hpp
net/gloo/gloo_operations.cpp
net/gloo/gloo_operations.hpp)
else (CYLON_GLOO)
set(CYLON_GLOO_FILES)
endif (CYLON_GLOO)

add_library(cylon SHARED
${UCX_CYLON_FILES}
${CYLON_GLOO_FILES}
arrow/arrow_all_to_all.cpp
arrow/arrow_all_to_all.hpp
arrow/arrow_buffer.cpp
Expand Down Expand Up @@ -115,8 +128,8 @@ add_library(cylon SHARED
net/ops/all_to_all.hpp
net/ops/gather.cpp
net/ops/bcast.cpp
net/TxRequest.cpp
net/TxRequest.hpp
net/cylon_request.cpp
net/cylon_request.hpp
ops.cpp
ops.hpp
ops/all_to_all_op.cpp
Expand Down Expand Up @@ -181,7 +194,13 @@ add_library(cylon SHARED
util/sort.hpp
util/to_string.hpp
util/uuid.cpp
util/uuid.hpp scalar.cpp scalar.hpp)
util/uuid.hpp
scalar.cpp
scalar.hpp
net/ops/base_ops.hpp
net/ops/base_ops.cpp
net/utils.cpp
net/utils.hpp)

IF(NOT MSVC)
if(APPLE)
Expand All @@ -202,6 +221,10 @@ if (CYLON_UCX)
target_link_libraries(cylon ${UCX_LIBRARIES})
endif ()

if (CYLON_GLOO)
target_link_libraries(cylon ${GLOO_LIBRARIES})
endif ()

if (PYCYLON_BUILD)
target_link_libraries(cylon ${Python3_LIBRARIES})
target_link_libraries(cylon ${ARROW_PY_LIB})
Expand Down
17 changes: 9 additions & 8 deletions cpp/src/cylon/ctx/arrow_memory_pool_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
#include <cylon/ctx/arrow_memory_pool_utils.hpp>

arrow::MemoryPool *cylon::ToArrowPool(const std::shared_ptr<cylon::CylonContext> &ctx) {
if (ctx->GetMemoryPool() == nullptr) {
return arrow::default_memory_pool();
} else {
return new ProxyMemoryPool(ctx->GetMemoryPool());
}
return ToArrowPool(ctx->GetMemoryPool());
}

arrow::MemoryPool *cylon::ToArrowPool(cylon::CylonContext *ctx) {
return ToArrowPool(ctx->GetMemoryPool());
}

arrow::MemoryPool *cylon::ToArrowPool(cylon::CylonContext* ctx) {
if (ctx->GetMemoryPool() == nullptr) {
arrow::MemoryPool *cylon::ToArrowPool(cylon::MemoryPool *pool) {
if (pool == nullptr) {
return arrow::default_memory_pool();
} else {
return new ProxyMemoryPool(ctx->GetMemoryPool());
// todo this is dangerous! return a smart pointer
return new ProxyMemoryPool(pool);
}
}
1 change: 1 addition & 0 deletions cpp/src/cylon/ctx/arrow_memory_pool_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ProxyMemoryPool : public arrow::MemoryPool {

arrow::MemoryPool *ToArrowPool(const std::shared_ptr<cylon::CylonContext> &ctx);
arrow::MemoryPool *ToArrowPool(cylon::CylonContext* ctx);
arrow::MemoryPool *ToArrowPool(MemoryPool* pool);
}

#endif //CYLON_SRC_CYLON_CTX_ARROW_MEMORY_POOL_UTILS_HPP_
27 changes: 16 additions & 11 deletions cpp/src/cylon/ctx/cylon_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#include "cylon/net/ucx/ucx_communicator.hpp"
#endif

#ifdef BUILD_CYLON_GLOO
#include <cylon/net/gloo/gloo_communicator.hpp>
#endif

namespace cylon {

std::shared_ptr<CylonContext> CylonContext::Init() {
Expand Down Expand Up @@ -64,30 +68,31 @@ Status CylonContext::InitDistributed(const std::shared_ptr<cylon::net::CommConfi
case net::MPI: {
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::MPICommunicator>(ctx);
const auto &status = (*ctx)->communicator->Init(config);
if (!status.is_ok()) {
ctx->reset();
}
return status;
return (*ctx)->communicator->Init(config);
}

case net::UCX: {
#ifdef BUILD_CYLON_UCX
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::UCXCommunicator>(ctx);
const auto &status = (*ctx)->communicator->Init(config);
if (!status.is_ok()) {
ctx->reset();
}
return status;
return (*ctx)->communicator->Init(config);
#else
return {Code::NotImplemented, "UCX communication not implemented"};
#endif
}

case net::TCP:return {Code::NotImplemented, "TCP communication not implemented"};
case net::GLOO: {
#ifdef BUILD_CYLON_GLOO
*ctx = std::make_shared<CylonContext>(true);
(*ctx)->communicator = std::make_shared<net::GlooCommunicator>(ctx);
return (*ctx)->communicator->Init(config);
#else
return {Code::NotImplemented, "Gloo communication not implemented"};
#endif
}
}
return Status::OK();
return {Code::Invalid, "Cannot reach here!"};
}

const std::shared_ptr<net::Communicator> &CylonContext::GetCommunicator() const {
Expand Down
9 changes: 2 additions & 7 deletions cpp/src/cylon/ctx/memory_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ namespace cylon {
*/
class MemoryPool {
public:
virtual ~MemoryPool();

/// \brief EXPERIMENTAL. Create a new instance of the default MemoryPool
static std::unique_ptr<MemoryPool> CreateDefault();
virtual ~MemoryPool() = default;

/// Allocate a new memory region of at least size bytes.
///
Expand Down Expand Up @@ -57,13 +54,11 @@ class MemoryPool {
///
/// \return Maximum bytes allocated. If not known (or not implemented),
/// returns -1
virtual int64_t max_memory() const;
virtual int64_t max_memory() const = 0;

/// The name of the backend used by this MemoryPool (e.g. "system" or "jemalloc");
virtual std::string backend_name() const = 0;

protected:
MemoryPool();
};
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/cylon/net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ cylon_install_all_headers("cylon/net")

add_subdirectory(ops)
add_subdirectory(mpi)
add_subdirectory(ucx)
add_subdirectory(ucx)
add_subdirectory(gloo)
16 changes: 10 additions & 6 deletions cpp/src/cylon/net/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
#include <vector>
#include <memory>
#include <cstring>
#include <cylon/net/TxRequest.hpp>
#include <cylon/net/cylon_request.hpp>
#include <cylon/net/buffer.hpp>

namespace cylon {

#define CYLON_CHANNEL_HEADER_SIZE 8
#define CYLON_MSG_FIN 1
#define CYLON_MSG_NOT_FIN 0

/**
* When a send is complete, this callback is called by the channel, it is the responsibility
* of the operations to register this callback
Expand All @@ -31,9 +35,9 @@ class ChannelSendCallback {
public:
virtual ~ChannelSendCallback() = default;

virtual void sendComplete(std::shared_ptr<TxRequest> request) = 0;
virtual void sendComplete(std::shared_ptr<CylonRequest> request) = 0;

virtual void sendFinishComplete(std::shared_ptr<TxRequest> request) = 0;
virtual void sendFinishComplete(std::shared_ptr<CylonRequest> request) = 0;
};

/**
Expand Down Expand Up @@ -63,20 +67,20 @@ class Channel {
* @param receives these are the workers we are going to receive from
*/
virtual void init(int edge, const std::vector<int> &receives, const std::vector<int> &sendIds,
ChannelReceiveCallback *rcv, ChannelSendCallback *send, Allocator *alloc) = 0;
ChannelReceiveCallback *rcv, ChannelSendCallback *send, Allocator *alloc) = 0;
/**
* Send the request
* @param request the request containing buffer, destination etc
* @return if the request is accepted to be sent
*/
virtual int send(std::shared_ptr<TxRequest> request) = 0;
virtual int send(std::shared_ptr<CylonRequest> request) = 0;

/**
* Inform the finish to the target
* @param request the request
* @return -1 if not accepted, 1 if accepted
*/
virtual int sendFin(std::shared_ptr<TxRequest> request) = 0;
virtual int sendFin(std::shared_ptr<CylonRequest> request) = 0;

/**
* This method needs to be called to progress the send
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/cylon/net/comm_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
namespace cylon {
namespace net {
enum CommType {
LOCAL = 0, MPI, TCP, UCX
LOCAL = 0,
MPI = 1,
TCP = 2,
UCX = 3,
GLOO = 4
};
} // namespace net
} // namespace cylon
Expand Down
14 changes: 13 additions & 1 deletion cpp/src/cylon/net/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Communicator {

virtual Status Bcast(std::shared_ptr<Table> *table, int bcast_root) const = 0;

/* Array communicationss */
/* Array communications */

/**
* Allreduce values at every index on `values`.
Expand All @@ -65,12 +65,24 @@ class Communicator {
net::ReduceOp reduce_op,
std::shared_ptr<Column> *output) const = 0;

/**
* Allgather `values`
* @param values
* @param output
* @return
*/
virtual Status Allgather(const std::shared_ptr<Column> &values,
std::vector<std::shared_ptr<Column>> *output) const = 0;

/* Scalar communications */

virtual Status AllReduce(const std::shared_ptr<Scalar> &value,
net::ReduceOp reduce_op,
std::shared_ptr<Scalar> *output) const = 0;

virtual Status Allgather(const std::shared_ptr<Scalar> &value,
std::shared_ptr<Column> *output) const = 0;

protected:
int rank = -1;
int world_size = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,32 @@
#include <string>
#include <iostream>

#include <cylon/net/TxRequest.hpp>
#include <cylon/net/cylon_request.hpp>
#include <cylon/util/builtins.hpp>

cylon::TxRequest::TxRequest(int tgt) {
cylon::CylonRequest::CylonRequest(int tgt) {
target = tgt;
}

cylon::TxRequest::TxRequest(int tgt, const void *buf, int len) {
cylon::CylonRequest::CylonRequest(int tgt, const void *buf, int len) {
target = tgt;
buffer = buf;
length = len;
}

cylon::TxRequest::TxRequest(int tgt, const void *buf, int len, int *head, int hLength) {
cylon::CylonRequest::CylonRequest(int tgt, const void *buf, int len, int *head, int hLength) {
target = tgt;
buffer = buf;
length = len;
memcpy(&header[0], head, hLength * sizeof(int));
headerLength = hLength;
}

cylon::TxRequest::~TxRequest() {
cylon::CylonRequest::~CylonRequest() {
buffer = nullptr;
}

void cylon::TxRequest::to_string(std::string dataType, int bufDepth) {
void cylon::CylonRequest::to_string(std::string dataType, int bufDepth) {
std::cout << "Target: " << target << std::endl;
std::cout << "Length: " << length << std::endl;
std::cout << "Header Length: " << headerLength << std::endl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <iostream>

namespace cylon {
class TxRequest {
class CylonRequest {

public:
const void *buffer{};
Expand All @@ -27,13 +27,13 @@ class TxRequest {
int header[6] = {};
int headerLength{};

TxRequest(int tgt, const void *buf, int len);
CylonRequest(int tgt, const void *buf, int len);

TxRequest(int tgt, const void *buf, int len, int *head, int hLength);
CylonRequest(int tgt, const void *buf, int len, int *head, int hLength);

explicit TxRequest(int tgt);
explicit CylonRequest(int tgt);

~TxRequest();
~CylonRequest();

void to_string(std::string dataType, int bufDepth);
};
Expand Down
Loading

0 comments on commit 2759a30

Please sign in to comment.