Skip to content

Commit

Permalink
Minor fixes (#596)
Browse files Browse the repository at this point in the history
* remove gloo default hostname

* minor change gloo

* adding gloo-mpi test

* adding ucc cython

* Update setup.py

* Update setup.py

* adding ucc test

* adding multi env test

* cosmetic changes

* adding regular sampling cython

* adding UCC barrier

* adding macos11 tag for CI

* fixing windows error

* trying to fix macos ci

* trying to fix macos issue

* Revert "trying to fix macos issue"

This reverts commit cda5c2c.

* attempting to fix macos ci

* style-check

* adding gloo timeout

* adding custom mpiexec cmake var
  • Loading branch information
nirandaperera authored Aug 16, 2022
1 parent 2e6ac80 commit e6b7306
Show file tree
Hide file tree
Showing 28 changed files with 416 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
fail-fast: false
matrix:
include:
- os: macos-10.15
- os: macos-11

steps:
- uses: actions/checkout@v2
Expand Down
29 changes: 24 additions & 5 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def check_conda_prefix():
parser.add_argument("-ipath", help='Install directory')

parser.add_argument("--verbose", help='Set verbosity', default=False, action="store_true")
parser.add_argument("-j", help='Parallel build threads', default=os.cpu_count(),
dest='parallel', type=int)

args = parser.parse_args()

Expand All @@ -107,9 +109,7 @@ def on_off(arg):
RUN_CPP_TESTS = args.test
RUN_PYTHON_TESTS = args.pytest
CMAKE_FLAGS = args.cmake_flags
CPPLINT_COMMAND = "\"-DCMAKE_CXX_CPPLINT=cpplint;--linelength=100;--headers=h," \
"hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" " if \
args.style_check else " "
PARALLEL = args.parallel

# arrow build expects /s even on windows
BUILD_PYTHON = args.python
Expand All @@ -127,6 +127,16 @@ def on_off(arg):

PYTHON_EXEC = sys.executable

if args.style_check:
    # Install cpplint with the interpreter that is running this script. The command
    # is passed as an argument list (no shell), so an interpreter path containing
    # spaces is not split into separate words.
    res = subprocess.run([PYTHON_EXEC, '-m', 'pip', 'install', 'cpplint'],
                         cwd=PYTHON_SOURCE_DIR)
    check_status(res.returncode, "cpplint install")

    # CMake definition that turns on cpplint with the project's line length,
    # header extensions and filter list.
    CPPLINT_COMMAND = "-DCMAKE_CXX_CPPLINT=\"cpplint;--linelength=100;--headers=h," \
                      "hpp;--filter=-legal/copyright,-build/c++11,-runtime/references\" "
else:
    # Style check not requested: keep a blank placeholder.
    CPPLINT_COMMAND = " "

CMAKE_BOOL_FLAGS = {'CYLON_GLOO', 'CYLON_UCX', 'CYLON_UCC'}
CMAKE_FALSE_OPTIONS = {'0', 'FALSE', 'OFF', 'N', 'NO', 'IGNORE', 'NOTFOUND'}

Expand Down Expand Up @@ -169,6 +179,7 @@ def print_line():
logger.info(f"Python exec : {PYTHON_EXEC}")
logger.info(f"Build mode : {CPP_BUILD_MODE}")
logger.info(f"Build path : {BUILD_DIR}")
logger.info(f"Build threads : {PARALLEL}")
logger.info(f"Install path : {INSTALL_DIR}")
logger.info(f"CMake flags : {CMAKE_FLAGS}")
logger.info(f" -CYLON_GLOO : {CYLON_GLOO}")
Expand Down Expand Up @@ -206,7 +217,7 @@ def build_cpp():
res = subprocess.call(cmake_command, cwd=BUILD_DIR, shell=True)
check_status(res, "C++ cmake generate")

cmake_build_command = f'cmake --build . --parallel {os.cpu_count()} --config {CPP_BUILD_MODE}'
cmake_build_command = f'cmake --build . --parallel {PARALLEL} --config {CPP_BUILD_MODE}'
logger.info(f"Build command: {cmake_build_command}")
res = subprocess.call(cmake_build_command, cwd=BUILD_DIR, shell=True)
check_status(res, "C++ cmake build")
Expand Down Expand Up @@ -254,6 +265,14 @@ def python_test():

env['LD_LIBRARY_PATH'] = os.path.join(GLOO_PREFIX, "lib") + os.pathsep + \
env['LD_LIBRARY_PATH']

if CYLON_UCC:
env['CYLON_UCC'] = str(CYLON_UCC)
env['UCC_PREFIX'] = UCC_PREFIX
env['LD_LIBRARY_PATH'] = os.path.join(UCC_PREFIX, "lib") + os.pathsep + \
os.path.join(UCC_PREFIX, "lib", "ucc") + os.pathsep + \
env['LD_LIBRARY_PATH']

elif OS_NAME == 'Darwin':
if 'DYLD_LIBRARY_PATH' in env:
env['DYLD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \
Expand All @@ -279,7 +298,7 @@ def build_python():

conda_prefix = check_conda_prefix()

python_build_command = f'{PYTHON_EXEC} setup.py install --force'
python_build_command = f'{PYTHON_EXEC} -m pip install -v --upgrade .'
env = os.environ
env["CYLON_PREFIX"] = str(BUILD_DIR)
if os.name == 'posix':
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/cylon_MacOS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies:
- numpy>=1.16
- pandas>=1.0
- fsspec
- setuptools>=40.0,<60.0
- setuptools>=40.0
# they are not needed for using pygcylon or compiling it
- pytest
- pytest-mpi
Expand Down
4 changes: 4 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ message("Finding MPI")
find_package(MPI REQUIRED COMPONENTS CXX)
message(STATUS "MPI include dir: ${MPI_CXX_INCLUDE_PATH}")
message(STATUS "MPI libs: ${MPI_CXX_LIBRARIES}")

# Optional override: when CYLON_CUSTOM_MPIRUN is set, its value replaces
# MPIEXEC_EXECUTABLE (the MPI launcher reported in the status message below).
if (CYLON_CUSTOM_MPIRUN)
set(MPIEXEC_EXECUTABLE ${CYLON_CUSTOM_MPIRUN})
endif (CYLON_CUSTOM_MPIRUN)
message(STATUS "MPI executable: ${MPIEXEC_EXECUTABLE}")

include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})
Expand Down
14 changes: 13 additions & 1 deletion cpp/src/cylon/net/gloo/gloo_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ Status GlooCommunicator::Make(const std::shared_ptr<CommConfig> &config,
} else { // MPI is not initialized. Ask gloo to initialize MPI
gloo_ctx = gloo::mpi::Context::createManaged();
}
if (gloo_config->timeout_ != kTimoutNotSet) {
gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_));
}
((gloo::mpi::Context &) *gloo_ctx).connectFullMesh(dev);
#else
return {Code::Invalid, "Gloo does not contain mpi headers!"};
Expand All @@ -69,8 +72,10 @@ Status GlooCommunicator::Make(const std::shared_ptr<CommConfig> &config,

gloo_ctx = std::make_shared<gloo::rendezvous::Context>(gloo_config->rank_,
gloo_config->world_size_);
if (gloo_config->timeout_ != kTimoutNotSet) {
gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_));
}
((gloo::rendezvous::Context &) *gloo_ctx).connectFullMesh(*prefix_store, dev);

}
*out = std::make_shared<GlooCommunicator>(pool, std::move(gloo_ctx));
return Status::OK();
Expand Down Expand Up @@ -193,5 +198,12 @@ int GlooConfig::world_size() const {
return world_size_;
}

void GlooConfig::SetTimeout(int timeout) {
GlooConfig::timeout_ = std::chrono::seconds(timeout);
}
const std::chrono::seconds &GlooConfig::timeout() const {
return GlooConfig::timeout_;
}

}
}
7 changes: 6 additions & 1 deletion cpp/src/cylon/net/gloo/gloo_communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ namespace net {

class GlooCommunicator;

// Sentinel timeout of zero seconds. GlooConfig::timeout_ is initialized to it, and a
// config that still holds this value is treated as having no timeout configured.
static constexpr std::chrono::seconds kTimoutNotSet(0);

class GlooConfig : public CommConfig {
public:
explicit GlooConfig(int rank = 0, int world_size = 1, bool use_mpi = false);
Expand All @@ -43,7 +45,9 @@ class GlooConfig : public CommConfig {

int rank() const;
int world_size() const;
const std::chrono::seconds &timeout() const;

void SetTimeout(int timeout);
void SetTcpHostname(const std::string &tcp_hostname);
void SetTcpIface(const std::string &tcp_iface);
void SetTcpAiFamily(int tcp_ai_family);
Expand All @@ -63,6 +67,7 @@ class GlooConfig : public CommConfig {
int rank_;
int world_size_;
bool use_mpi_;
std::chrono::seconds timeout_ = kTimoutNotSet;

#ifdef GLOO_USE_MPI
/*
Expand All @@ -74,7 +79,7 @@ class GlooConfig : public CommConfig {
#endif //GLOO_USE_MPI

// tcp attr
std::string tcp_hostname_ = "localhost";
std::string tcp_hostname_;
std::string tcp_iface_;
int tcp_ai_family_ = AF_UNSPEC;

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/cylon/net/mpi/mpi_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ Status MPICommunicator::Make(const std::shared_ptr<CommConfig> &config,

void MPICommunicator::Finalize() {
// finalize only if we initialized MPI
if (!externally_init) {
if (!externally_init && !IsFinalized()) {
int finalized;
MPI_Finalized(&finalized);
if (!finalized) {
MPI_Finalize();
}
finalized = true;
}
}
void MPICommunicator::Barrier() {
Expand Down
53 changes: 39 additions & 14 deletions cpp/src/cylon/net/ucx/ucx_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
namespace cylon {
namespace net {

// NOTE(review): UINT32_MAX is outside the range of int, so the value stored here is the
// result of an unsigned-to-int conversion (typically -1) — confirm that is intended.
static constexpr int kBarrierFlag = UINT32_MAX;

void mpi_check_and_finalize() {
int mpi_finalized;
MPI_Finalized(&mpi_finalized);
Expand Down Expand Up @@ -89,7 +91,8 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr<Column> &column,
return {Code::NotImplemented, "Allreduce not implemented for ucx"};
}

UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {}
// Constructs the communicator over `pool`. Both integer arguments of the base
// constructor are passed as -1, and `externally_init` is stored as given.
UCXCommunicator::UCXCommunicator(MemoryPool *pool, bool externally_init)
    : Communicator(pool, -1, -1),
      externally_init(externally_init) {
}

Status UCXCommunicator::AllReduce(const std::shared_ptr<Scalar> &values,
net::ReduceOp reduce_op,
Expand Down Expand Up @@ -117,10 +120,16 @@ Status UCXCommunicator::Allgather(const std::shared_ptr<Scalar> &value,
Status UCXCommunicator::Make(const std::shared_ptr<CommConfig> &config, MemoryPool *pool,
std::shared_ptr<Communicator> *out) {
CYLON_UNUSED(config);
*out = std::make_shared<UCXCommunicator>(pool);
auto &comm = static_cast<UCXCommunicator &>(**out);
// Check init functions
// MPI init
int initialized;
MPI_Initialized(&initialized);
if (!initialized) {
RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr));
}

*out = std::make_shared<UCXCommunicator>(pool, initialized);
auto &comm = static_cast<UCXCommunicator &>(**out);

// Int variable used when iterating
int sIndx;
// Address of the UCP Worker for receiving
Expand All @@ -133,12 +142,6 @@ Status UCXCommunicator::Make(const std::shared_ptr<CommConfig> &config, MemoryPo
// Variable to hold the current ucp address
ucp_address_t *address;

// MPI init
MPI_Initialized(&initialized);
if (!initialized) {
RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr));
}

// Get the rank for checking send to self, and initializations
MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank);
MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size);
Expand Down Expand Up @@ -205,7 +208,7 @@ Status UCXCommunicator::Make(const std::shared_ptr<CommConfig> &config, MemoryPo
}

void UCXCommunicator::Finalize() {
if (!this->IsFinalized()) {
if (!externally_init && !IsFinalized()) {
ucp_cleanup(ucpContext);
mpi_check_and_finalize();
finalized = true;
Expand Down Expand Up @@ -329,14 +332,36 @@ void UCXUCCCommunicator::Finalize() {
}
}
ucc_context_destroy(uccContext);
mpi_check_and_finalize();
ucx_comm_->Finalize();
ucx_comm_->Finalize(); // this will handle MPI_Finalize
finalized = true;
}
}

// Barrier implemented as a UCC collective on uccTeam: a UCC_COLL_TYPE_BARRIER request
// is initialized and posted, then polled with ucc_collective_test while
// ucc_context_progress is called on uccContext, until the status is no longer
// greater than UCC_OK. The request is finalized at the end.
// NOTE(review): every status check is an assert, so none of them is evaluated when
// NDEBUG is defined.
void UCXUCCCommunicator::Barrier() {
// NOTE(review): the next line looks like the pre-change body (delegation to the UCX
// communicator) retained by the diff view; the UCC implementation follows it.
return ucx_comm_->Barrier();
ucc_coll_args_t args_;
ucc_coll_req_h req;

// NOTE(review): only mask and coll_type are assigned; the remaining fields of args_
// are left uninitialized — confirm UCC ignores them for a barrier with mask == 0.
args_.mask = 0;
args_.coll_type = UCC_COLL_TYPE_BARRIER;

ucc_status_t status;
status = ucc_collective_init(&args_, &req, uccTeam);
assert(status == UCC_OK);

status = ucc_collective_post(req);
assert(status == UCC_OK);

// Poll the request; between polls, progress the UCC context.
status = ucc_collective_test(req);
while (status > UCC_OK) { // loop until status == 0
status = ucc_context_progress(uccContext);
assert(status >= UCC_OK); // should be 0, 1 or 2

status = ucc_collective_test(req);
}
// A negative status also ends the loop above; it is only caught by this assert.
assert(status == UCC_OK);

status = ucc_collective_finalize(req);
assert(status == UCC_OK);
}

Status UCXUCCCommunicator::AllGather(const std::shared_ptr<Table> &table,
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/cylon/net/ucx/ucx_communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ namespace cylon {
namespace net {

class UCXConfig : public CommConfig {
public:
CommType Type() override;

public:
static std::shared_ptr<UCXConfig> Make();
};

class UCXCommunicator : public Communicator {
public:
explicit UCXCommunicator(MemoryPool *pool);
explicit UCXCommunicator(MemoryPool *pool, bool externally_init);
~UCXCommunicator() override = default;

std::unique_ptr<Channel> CreateChannel() const override;
Expand Down Expand Up @@ -78,6 +78,7 @@ class UCXCommunicator : public Communicator {
std::unordered_map<int, ucp_ep_h> endPointMap;
// UCP Context - Holds a UCP communication instance's global information.
ucp_context_h ucpContext{};
bool externally_init = false;
};

#ifdef BUILD_CYLON_UCC
Expand Down
18 changes: 6 additions & 12 deletions cpp/src/cylon/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,8 @@ Status MergeSortedTable(const std::vector<std::shared_ptr<Table>> &tables,
const std::vector<bool> &sort_orders,
std::shared_ptr<Table> &out) {
std::shared_ptr<Table> concatenated;
std::vector<int64_t> table_indices(tables.size()),
table_end_indices(tables.size());
int acc = 0;
std::vector<int64_t> table_indices(tables.size()), table_end_indices(tables.size());
int64_t acc = 0;
for (size_t i = 0; i < table_indices.size(); i++) {
table_indices[i] = acc;
acc += tables[i]->Rows();
Expand All @@ -515,7 +514,7 @@ Status MergeSortedTable(const std::vector<std::shared_ptr<Table>> &tables,

RETURN_CYLON_STATUS_IF_FAILED(Merge(tables, concatenated));

if(concatenated->GetContext()->GetWorldSize() > 4) {
if (concatenated->GetContext()->GetWorldSize() > 4) {
return Sort(concatenated, sort_columns, out, sort_orders);
}

Expand All @@ -530,7 +529,7 @@ Status MergeSortedTable(const std::vector<std::shared_ptr<Table>> &tables,

std::priority_queue<int, std::vector<int>, decltype(comp)> pq(comp);

for (size_t i = 0; i < tables.size(); i++) {
for (int i = 0; i < (int) tables.size(); i++) {
if (table_indices[i] < table_end_indices[i]) {
pq.push(i);
}
Expand All @@ -541,13 +540,9 @@ Status MergeSortedTable(const std::vector<std::shared_ptr<Table>> &tables,
arrow::Int64Builder filter(pool);
RETURN_CYLON_STATUS_IF_ARROW_FAILED(filter.Reserve(concatenated->Rows()));

std::vector<int> temp_v;

while (!pq.empty()) {
int t = pq.top();
pq.pop();
// std::cout<<table_indices[t]<<std::endl;
temp_v.push_back(table_indices[t]);
filter.UnsafeAppend(table_indices[t]);
table_indices[t] += 1;
if (table_indices[t] < table_end_indices[t]) {
Expand All @@ -556,9 +551,8 @@ Status MergeSortedTable(const std::vector<std::shared_ptr<Table>> &tables,
}

CYLON_ASSIGN_OR_RAISE(auto take_arr, filter.Finish());
CYLON_ASSIGN_OR_RAISE(
auto take_res,
(arrow::compute::Take(concatenated->get_table(), take_arr)));
CYLON_ASSIGN_OR_RAISE(auto take_res,
(arrow::compute::Take(concatenated->get_table(), take_arr)));

out = std::make_shared<Table>(ctx, take_res.table());

Expand Down
3 changes: 3 additions & 0 deletions cpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,8 @@ if (CYLON_UCX)
cylon_run_test(aggregate_test 1 ucx)
cylon_run_test(aggregate_test 2 ucx)
cylon_run_test(aggregate_test 4 ucx)

cylon_run_test(sync_comms_test 1 ucx)
cylon_run_test(sync_comms_test 4 ucx)
endif (CYLON_UCC)
endif (CYLON_UCX)
Loading

0 comments on commit e6b7306

Please sign in to comment.