Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes #608

Merged
merged 34 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3d2ad2d
remove gloo default hostname
nirandaperera Jul 18, 2022
2c88bec
minor change gloo
nirandaperera Jul 18, 2022
21fc281
adding gloo-mpi test
nirandaperera Jul 18, 2022
256397d
adding ucc cython
nirandaperera Jul 18, 2022
3d86b11
Update setup.py
nirandaperera Jul 19, 2022
d8160eb
Update setup.py
nirandaperera Jul 19, 2022
b6d7814
adding ucc test
nirandaperera Jul 19, 2022
f144e91
adding multi env test
nirandaperera Jul 20, 2022
8caecc9
cosmetic changes
nirandaperera Jul 25, 2022
9afa79b
adding regular sampling cython
nirandaperera Jul 26, 2022
72b83a9
adding UCC barrier
nirandaperera Aug 3, 2022
dc95f40
adding macos11 tag for CI
nirandaperera Aug 3, 2022
30ffb1a
fixing windows error
nirandaperera Aug 3, 2022
ef1af26
trying to fix macos ci
nirandaperera Aug 3, 2022
cda5c2c
trying to fix macos issue
nirandaperera Aug 3, 2022
e524415
Revert "trying to fix macos issue"
nirandaperera Aug 3, 2022
c31c7e5
attempting to fix macos ci
nirandaperera Aug 3, 2022
5e67806
style-check
nirandaperera Aug 4, 2022
c463064
adding gloo timeout
nirandaperera Aug 11, 2022
41a7c4b
Merge remote-tracking branch 'nira/minor-fixes' into minor-fixes
nirandaperera Aug 11, 2022
ffa89ee
adding custom mpiexec cmake var
nirandaperera Aug 16, 2022
d6e0d69
minor change
nirandaperera Aug 22, 2022
c1a6afb
Merge branch 'minor-fixes' of github.com:nirandaperera/cylon into min…
nirandaperera Aug 22, 2022
7b9764a
Merge branch 'main' of https://github.com/cylondata/cylon into minor-…
nirandaperera Aug 22, 2022
832c4d9
adding sort options
nirandaperera Aug 22, 2022
b9698e3
fixing dist sort empty table bug
nirandaperera Aug 23, 2022
ec8c09a
fixing regular sampling bug
nirandaperera Aug 23, 2022
bc4d755
adding licence
nirandaperera Aug 27, 2022
ec28db2
adding bench logging
nirandaperera Aug 28, 2022
9eee032
Merge branch 'main' of https://github.com/cylondata/cylon into minor-…
nirandaperera Aug 31, 2022
79139f9
checking mpi
nirandaperera Sep 12, 2022
cab20bc
Revert "checking mpi"
nirandaperera Sep 12, 2022
682fc7f
adding mpi_comm arg to ucx config
nirandaperera Sep 14, 2022
a1661c7
adding cython bindings
nirandaperera Sep 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@

find_package(Threads REQUIRED)

IF(WIN32)
if (CYLON_BENCH)
message("Benchmark timings enabled")
add_definitions(-D_CYLON_BENCH)
endif ()

if (WIN32)
set_source_files_properties(util/murmur3.cpp util/murmur3.hpp PROPERTIES)
ELSE()
else ()
set_source_files_properties(util/murmur3.cpp util/murmur3.hpp PROPERTIES COMPILE_FLAGS -Wno-implicit-fallthrough)
ENDIF()
endif (WIN32)

if (CYLON_UCX)
set(UCX_CYLON_FILES
Expand Down
91 changes: 59 additions & 32 deletions cpp/src/cylon/arrow/arrow_partition_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ class RangePartitionKernel : public PartitionKernel {
idx_col,
[&](uint64_t global_idx, ValueT val) {
uint32_t p = ascending ?
bin_to_partition[get_bin_pos(val)] : num_partitions - 1 - bin_to_partition[get_bin_pos(val)];
bin_to_partition[get_bin_pos(val)] : num_partitions - 1
- bin_to_partition[get_bin_pos(val)];
target_partitions[global_idx] = p;
partition_histogram[p]++;
},
Expand All @@ -492,69 +493,95 @@ class RangePartitionKernel : public PartitionKernel {
}

private:
inline Status build_bin_to_partition(const std::shared_ptr<arrow::ChunkedArray> &idx_col, uint32_t num_partitions) {
inline Status build_bin_to_partition(const std::shared_ptr<arrow::ChunkedArray> &idx_col,
uint32_t num_partitions) {
const std::shared_ptr<DataType> &data_type = tarrow::ToCylonType(idx_col->type());
std::shared_ptr<arrow::ChunkedArray> sampled_array;

if ((uint64_t) idx_col->length()
== num_samples) { // if len == num_samples, dont sample, just use idx col as it is!
// if len == num_samples, dont sample, just use idx col as it is!
if ((uint64_t) idx_col->length() == num_samples) {
sampled_array = std::make_shared<arrow::ChunkedArray>(idx_col->chunks());
} else { // else, sample idx_col for num_samples
std::shared_ptr<arrow::Array> samples;
RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::SampleArray(idx_col, num_samples, samples, ToArrowPool(ctx)));
sampled_array = std::make_shared<arrow::ChunkedArray>(samples);
RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::SampleArray(idx_col,
num_samples,
samples,
ToArrowPool(ctx)));
sampled_array = std::make_shared<arrow::ChunkedArray>(std::move(samples));
}

const auto &comm = ctx->GetCommunicator();

// calculate minmax of the sample
std::shared_ptr<compute::Result> minmax;
RETURN_CYLON_STATUS_IF_FAILED(compute::MinMax(ctx, sampled_array, data_type, minmax));
// todo: Allreduce MIN {min, -max} array as an optimization. Bool could be a problem.
// might need some template tricks

std::shared_ptr<Scalar> min_scalar, max_scalar;
if (sampled_array->length() == 0) {
// if no samples present, set min and max to opposite ends so that Allreduce will determine the
// correct value
min_scalar = Scalar::Make(arrow::MakeScalar(std::numeric_limits<ValueT>::max()));
max_scalar = Scalar::Make(arrow::MakeScalar(std::numeric_limits<ValueT>::min()));;
} else {
arrow::compute::ScalarAggregateOptions opts(true, 0);
CYLON_ASSIGN_OR_RAISE(const auto &min_max, arrow::compute::MinMax(sampled_array, opts))
const auto &min_max_v = min_max.scalar_as<arrow::StructScalar>().value;
min_scalar = Scalar::Make(min_max_v[0]);
max_scalar = Scalar::Make(min_max_v[1]);
}

const auto &struct_scalar = minmax->GetResult().scalar_as<arrow::StructScalar>();
min = std::static_pointer_cast<ScalarT>(struct_scalar.value[0])->value;
max = std::static_pointer_cast<ScalarT>(struct_scalar.value[1])->value;
std::shared_ptr<Scalar> global_min_scalar, global_max_scalar;
RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(min_scalar, net::MIN, &global_min_scalar));
RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(max_scalar, net::MAX, &global_max_scalar));

min = std::static_pointer_cast<ScalarT>(global_min_scalar->data())->value;
max = std::static_pointer_cast<ScalarT>(global_max_scalar->data())->value;
range = max - min;

// create sample histogram
std::vector<uint64_t> local_counts(num_bins + 2, 0);
for (const auto &arr: sampled_array->chunks()) {
const std::shared_ptr<ArrayT> &casted_arr = std::static_pointer_cast<ArrayT>(arr);
for (int64_t i = 0; i < casted_arr->length(); i++) {
ValueT val = casted_arr->Value(i);
local_counts[get_bin_pos(val)]++;
}
arrow::VisitArrayDataInline<ARROW_T>(*arr->data(),
[&](ValueT val) {
local_counts[get_bin_pos(val)]++;
},
[]() {
// idx_col is guaranteed to be non-null
});
}

// all reduce local sample histograms
std::vector<uint64_t> global_counts, *global_counts_ptr;
const uint64_t *global_counts_ptr;
std::shared_ptr<Column> global_counts;
if (ctx->GetWorldSize() > 1) { // if distributed, all-reduce all local bin counts
global_counts.resize(num_bins + 2, 0);
RETURN_CYLON_STATUS_IF_FAILED(cylon::mpi::AllReduce(ctx,
local_counts.data(),
global_counts.data(),
num_bins + 2,
cylon::UInt64(),
cylon::net::SUM));
global_counts_ptr = &global_counts;
local_counts.clear();
arrow::BufferVector buf{nullptr, arrow::Buffer::Wrap(local_counts)};
const auto
&array_data = arrow::ArrayData::Make(arrow::uint64(), num_bins + 2, std::move(buf), 0);
auto local_counts_col = Column::Make(arrow::MakeArray(array_data));

RETURN_CYLON_STATUS_IF_FAILED(comm->AllReduce(local_counts_col, net::SUM, &global_counts));
global_counts_ptr =
std::static_pointer_cast<arrow::UInt64Array>(global_counts->data())->raw_values();
} else { // else, just use local bin counts
global_counts_ptr = &local_counts;
global_counts_ptr = local_counts.data();
}

float_t quantile = float(1.0 / num_partitions), prefix_sum = 0;

LOG(INFO) << "len=" << idx_col->length() << " min=" << min << " max=" << max << " range=" <<
range << " num bins=" << num_bins << " quantile=" << quantile;
// LOG(INFO) << "len=" << idx_col->length() << " min=" << min << " max=" << max << " range=" <<
// range << " num bins=" << num_bins << " quantile=" << quantile;

// divide global histogram into quantiles
const uint64_t total_samples = ctx->GetWorldSize() * num_samples;
uint32_t curr_partition = 0;
float_t target_quantile = quantile;
for (const auto &c: *global_counts_ptr) {
for (uint32_t i = 0; i < num_bins + 2; i++) {
bin_to_partition.push_back(curr_partition);
float_t freq = (float_t) c / total_samples;
float_t freq = (float_t) global_counts_ptr[i] / total_samples;
prefix_sum += freq;
if (prefix_sum > target_quantile) {
curr_partition += (curr_partition < num_partitions - 1); // if curr_partition < numpartition: curr_partition++
curr_partition += (curr_partition
< num_partitions - 1); // if curr_partition < numpartition: curr_partition++
target_quantile += quantile;
}
}
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/cylon/mapreduce/mapreduce.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
//
// Created by niranda on 11/22/21.
//
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cylon/thridparty/flat_hash_map/bytell_hash_map.hpp>
#include "cylon/mapreduce/mapreduce.hpp"
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/cylon/mapreduce/mapreduce.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
//
// Created by niranda on 11/22/21.
//
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef CYLON_CPP_SRC_CYLON_MAPREDUCE_MAPREDUCE_HPP_
#define CYLON_CPP_SRC_CYLON_MAPREDUCE_MAPREDUCE_HPP_
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/cylon/net/gloo/gloo_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Status GlooCommunicator::Make(const std::shared_ptr<CommConfig> &config,
gloo_ctx = gloo::mpi::Context::createManaged();
}
if (gloo_config->timeout_ != kTimoutNotSet) {
gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_));
gloo_ctx->setTimeout(gloo_config->timeout_);
}
((gloo::mpi::Context &) *gloo_ctx).connectFullMesh(dev);
#else
Expand All @@ -73,7 +73,7 @@ Status GlooCommunicator::Make(const std::shared_ptr<CommConfig> &config,
gloo_ctx = std::make_shared<gloo::rendezvous::Context>(gloo_config->rank_,
gloo_config->world_size_);
if (gloo_config->timeout_ != kTimoutNotSet) {
gloo_ctx->setTimeout(std::chrono::seconds(gloo_config->timeout_));
gloo_ctx->setTimeout(gloo_config->timeout_);
}
((gloo::rendezvous::Context &) *gloo_ctx).connectFullMesh(*prefix_store, dev);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/cylon/net/gloo/gloo_communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace net {

class GlooCommunicator;

static constexpr std::chrono::seconds kTimoutNotSet(0);
static constexpr std::chrono::seconds kTimoutNotSet = std::chrono::seconds::zero();

class GlooConfig : public CommConfig {
public:
Expand Down
84 changes: 79 additions & 5 deletions cpp/src/cylon/net/gloo/gloo_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,75 @@ Status GlooTableBcastImpl::WaitAll(int32_t num_buffers) {
return Status::OK();
}

// Element-wise logical AND reduction for gloo Allreduce:
// c[i] = (a[i] && b[i]), stored back as T (0 or 1).
template<typename T>
void land(void *c_, const void *a_, const void *b_, size_t n) {
  auto *out = static_cast<T *>(c_);
  const auto *lhs = static_cast<const T *>(a_);
  const auto *rhs = static_cast<const T *>(b_);
  for (size_t idx = 0; idx != n; ++idx) {
    out[idx] = static_cast<T>(lhs[idx] && rhs[idx]);
  }
}

// Element-wise logical OR reduction for gloo Allreduce:
// c[i] = (a[i] || b[i]), stored back as T (0 or 1).
template<typename T>
void lor(void *c_, const void *a_, const void *b_, size_t n) {
  auto *out = static_cast<T *>(c_);
  const auto *lhs = static_cast<const T *>(a_);
  const auto *rhs = static_cast<const T *>(b_);
  for (size_t idx = 0; idx != n; ++idx) {
    out[idx] = static_cast<T>(lhs[idx] || rhs[idx]);
  }
}

// Trait providing the bitwise-AND reduction function for gloo Allreduce.
// Bitwise ops are only meaningful for integral element types; the
// floating-point specialization reports "unsupported" by returning nullptr
// (the caller is expected to treat a null function as an invalid op).
template<typename T, typename Enable = void>
struct band_impl {};

template<typename T>
struct band_impl<T, typename std::enable_if<std::is_integral<T>::value>::type> {
  // c[i] = a[i] & b[i], element-wise over n values.
  static void impl(void *c_, const void *a_, const void *b_, size_t n) {
    auto *out = static_cast<T *>(c_);
    const auto *lhs = static_cast<const T *>(a_);
    const auto *rhs = static_cast<const T *>(b_);
    for (size_t idx = 0; idx != n; ++idx) {
      out[idx] = static_cast<T>(lhs[idx] & rhs[idx]);
    }
  }

  static gloo::AllreduceOptions::Func band() { return &impl; }
};

template<typename T>
struct band_impl<T, typename std::enable_if<std::is_floating_point<T>::value>::type> {
  // No bitwise AND for floating point — signal unsupported.
  static gloo::AllreduceOptions::Func band() { return nullptr; }
};


// Trait providing the bitwise-OR reduction function for gloo Allreduce.
// Only integral element types support bitwise ops; the floating-point
// specialization returns nullptr so the caller can reject the operation.
template<typename T, typename Enable = void>
struct bor_impl {};

template<typename T>
struct bor_impl<T, typename std::enable_if<std::is_integral<T>::value>::type> {
  // c[i] = a[i] | b[i], element-wise over n values.
  static void impl(void *c_, const void *a_, const void *b_, size_t n) {
    auto *out = static_cast<T *>(c_);
    const auto *lhs = static_cast<const T *>(a_);
    const auto *rhs = static_cast<const T *>(b_);
    for (size_t idx = 0; idx != n; ++idx) {
      out[idx] = static_cast<T>(lhs[idx] | rhs[idx]);
    }
  }

  static gloo::AllreduceOptions::Func bor() { return &impl; }
};

template<typename T>
struct bor_impl<T, typename std::enable_if<std::is_floating_point<T>::value>::type> {
  // No bitwise OR for floating point — signal unsupported.
  static gloo::AllreduceOptions::Func bor() { return nullptr; }
};

template<typename T>
gloo::AllreduceOptions::Func get_reduce_func(ReduceOp op) {
void (*func)(void *, const void *, const void *, size_t);
Expand All @@ -180,10 +249,10 @@ gloo::AllreduceOptions::Func get_reduce_func(ReduceOp op) {
return func;
case PROD:func = &gloo::product<T>;
return func;
case LAND:
case LOR:
case BAND:
case BOR:return nullptr;
case LAND:return &land<T>;
case LOR:return &lor<T>;
case BAND:return band_impl<T>::band();
case BOR:return bor_impl<T>::bor();
}
return nullptr;
}
Expand All @@ -195,7 +264,12 @@ Status all_reduce_buffer(const std::shared_ptr<gloo::Context> &ctx,
int count,
ReduceOp reduce_op) {
gloo::AllreduceOptions opts(ctx);
opts.setReduceFunction(get_reduce_func<T>(reduce_op));

auto func = get_reduce_func<T>(reduce_op);
if (func == nullptr) {
return {Code::Invalid, "Unsupported reduction operator " + std::to_string(reduce_op)};
}
opts.setReduceFunction(func);

opts.template setInput<T>(const_cast<T *>((const T *) send_buf), count);
opts.template setOutput<T>((T *) rcv_buf, count);
Expand Down
Loading