diff --git a/.github/workflows/conda-actions.yml b/.github/workflows/conda-actions.yml index f9b93e8dc..217213788 100644 --- a/.github/workflows/conda-actions.yml +++ b/.github/workflows/conda-actions.yml @@ -35,7 +35,7 @@ jobs: # Specify the correct host compilers - name: Install/Select gcc and g++ run: | - sudo apt-get install -y gcc-${{ matrix.gcc }} g++-${{ matrix.gcc }} + sudo apt-get install -y gcc-${{ matrix.gcc }} g++-${{ matrix.gcc }} libopenmpi-dev echo "CC=/usr/bin/gcc-${{ matrix.gcc }}" >> $GITHUB_ENV echo "CXX=/usr/bin/g++-${{ matrix.gcc }}" >> $GITHUB_ENV echo "CUDAHOSTCXX=/usr/bin/g++-${{ matrix.gcc }}" >> $GITHUB_ENV diff --git a/conda/environments/cylon.yml b/conda/environments/cylon.yml index 7acbe1f60..30ba4f154 100644 --- a/conda/environments/cylon.yml +++ b/conda/environments/cylon.yml @@ -7,7 +7,7 @@ dependencies: - cmake>=3.17 - pyarrow=5.0.0 - glog=0.5.0 - - openmpi>=4.1 + - openmpi>=4.1.3 - ucx>=1.12.1 - cython>=0.29,<0.30 - numpy>=1.16 diff --git a/conda/environments/cylon_MacOS.yml b/conda/environments/cylon_MacOS.yml index 72e691dad..d9d4f0357 100644 --- a/conda/environments/cylon_MacOS.yml +++ b/conda/environments/cylon_MacOS.yml @@ -7,7 +7,7 @@ dependencies: - cmake>=3.17 - pyarrow=5.0.0 - glog=0.5.0 - - openmpi>=4.1 + - openmpi>=4.1.2 - cython>=0.29,<0.30 - numpy>=1.16 - pandas>=1.0 diff --git a/conda/environments/gcylon.yml b/conda/environments/gcylon.yml index 593d8dd62..a140fbb5f 100644 --- a/conda/environments/gcylon.yml +++ b/conda/environments/gcylon.yml @@ -11,8 +11,8 @@ dependencies: - cudf=21.10.01 - cudatoolkit=11.2 - glog=0.5.0 - - openmpi>=4.1 - - ucx>=1.12.0 + - openmpi>=4.1.2 + - ucx>=1.12.1 - numpy>=1.16 - pandas>=1.0 - setuptools>=40.0,<60.0 diff --git a/cpp/src/cylon/arrow/arrow_comparator.cpp b/cpp/src/cylon/arrow/arrow_comparator.cpp index 36ce652da..bbc149dc5 100644 --- a/cpp/src/cylon/arrow/arrow_comparator.cpp +++ b/cpp/src/cylon/arrow/arrow_comparator.cpp @@ -759,10 +759,13 @@ Status CreateDualArrayIndexComparator(const std::shared_ptr &a1, Status TableRowIndexEqualTo::Make(const std::shared_ptr &table, const std::vector &col_ids, - std::unique_ptr *out_equal_to) { + std::unique_ptr *out_equal_to, + const std::vector &sort_order) { auto comps = std::make_shared>>(); comps->reserve(col_ids.size()); - for (int col_id: col_ids) { + bool order_not_given = sort_order.size() == 0; + for (std::size_t i = 0; i < col_ids.size(); i++) { + int col_id = col_ids[i]; if (table->column(col_id)->num_chunks() > 1) { return {Code::Invalid, "TableRowIndexEqualTo does not support multiple chunks"}; } @@ -772,7 +775,7 @@ Status TableRowIndexEqualTo::Make(const std::shared_ptr &table, } else { const auto &array = table->column(col_id)->chunk(0); std::unique_ptr comp; - RETURN_CYLON_STATUS_IF_FAILED(CreateArrayIndexComparator(array, &comp)); + RETURN_CYLON_STATUS_IF_FAILED(CreateArrayIndexComparator(array, &comp, order_not_given || sort_order[i])); comps->emplace_back(std::move(comp)); } } @@ -920,7 +923,8 @@ Status DualTableRowIndexEqualTo::Make(const std::shared_ptr &t1, const std::shared_ptr &t2, const std::vector &t1_indices, const std::vector &t2_indices, - std::unique_ptr *out_equal_to) { + std::unique_ptr *out_equal_to, + const std::vector &sort_order) { int num_cols = (int) t1_indices.size(); if (num_cols != (int) t2_indices.size()) { return {Code::Invalid, "sizes of indices of t1 and t2 are not equal!"}; @@ -929,6 +933,8 @@ Status DualTableRowIndexEqualTo::Make(const std::shared_ptr &t1, auto comps = std::make_shared>>(); comps->reserve(num_cols); + bool order_not_given = sort_order.size() == 0; + for (int i = 0; i < num_cols; i++) { if (t1->column(t1_indices[i])->num_chunks() > 1 || t2->column(t2_indices[i])->num_chunks() > 1) { @@ -939,7 +945,9 @@ Status DualTableRowIndexEqualTo::Make(const std::shared_ptr &t1, const auto &a2 = util::GetChunkOrEmptyArray(t2->column(t2_indices[i]), 0); std::unique_ptr comp; - RETURN_CYLON_STATUS_IF_FAILED(CreateDualArrayIndexComparator(a1, a2, &comp)); + + // asc=true if sort_order is not provided + RETURN_CYLON_STATUS_IF_FAILED(CreateDualArrayIndexComparator(a1, a2, &comp, order_not_given || sort_order[i])); comps->emplace_back(std::move(comp)); } diff --git a/cpp/src/cylon/arrow/arrow_comparator.hpp b/cpp/src/cylon/arrow/arrow_comparator.hpp index 0d5dce9d6..9aa86a998 100644 --- a/cpp/src/cylon/arrow/arrow_comparator.hpp +++ b/cpp/src/cylon/arrow/arrow_comparator.hpp @@ -173,8 +173,11 @@ class TableRowIndexEqualTo { // equality, less than, greater than int compare(const int64_t &record1, const int64_t &record2) const; - static Status Make(const std::shared_ptr &table, const std::vector &col_ids, - std::unique_ptr *out_equal_to); + static Status Make(const std::shared_ptr &table, + const std::vector &col_ids, + std::unique_ptr *out_equal_to, + const std::vector &sort_order = {}); + static Status Make(const std::vector> &arrays, std::unique_ptr *out_equal_to); @@ -278,7 +281,8 @@ class DualTableRowIndexEqualTo { const std::shared_ptr &t2, const std::vector &t1_indices, const std::vector &t2_indices, - std::unique_ptr *out_equal_to); + std::unique_ptr *out_equal_to, + const std::vector &sort_order = {}); bool operator()(const int64_t &record1, const int64_t &record2) const; diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index 6a1d66ed6..3831a5432 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -36,8 +36,10 @@ #include #include #include +#include #include #include +#include namespace cylon { @@ -68,7 +70,7 @@ Status PrepareArray(std::shared_ptr &ctx, array_vector.push_back(destination_col_array); return Status::OK(); } - + static inline Status all_to_all_arrow_tables(const std::shared_ptr &ctx, const std::shared_ptr &schema, const std::vector> &partitioned_tables, @@ -131,28 +133,19 @@ static inline Status all_to_all_arrow_tables(const std::shared_ptr return Status::OK(); } -/** - * output rows order by rank number - */ -static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_ptr &ctx, - const std::shared_ptr &schema, - const std::vector> &partitioned_tables, - std::shared_ptr &table_out) { +// entries from each RANK are separated +static inline Status all_to_all_arrow_tables_separated_arrow_table(const std::shared_ptr &ctx, + const std::shared_ptr &schema, + const std::vector> &partitioned_tables, + std::vector> &received_tables) { const auto &neighbours = ctx->GetNeighbours(true); - - // here we expect that only a few processes will send to one process - // so we use a tree map instead of using a vector with size of number of processes - std::vector> received_tables; - std::map> received_tables_mp; - received_tables.reserve(neighbours.size()); + received_tables.resize(ctx->GetWorldSize()); // define call back to catch the receiving tables ArrowCallback arrow_callback = - [&received_tables_mp](int source, - const std::shared_ptr &table_, - int reference) { + [&received_tables, &ctx](int source, const std::shared_ptr &table_, int reference) { CYLON_UNUSED(reference); - received_tables_mp[source] = table_; + received_tables[source] = table_; return true; }; @@ -161,24 +154,24 @@ static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_pt arrow_callback, schema); // if world size == partitions, simply send paritions based on index - const size_t world_size = (size_t) ctx->GetWorldSize(), - num_partitions = partitioned_tables.size(), + const int world_size = ctx->GetWorldSize(), + num_partitions = (int) partitioned_tables.size(), rank = ctx->GetRank(); if (world_size == num_partitions) { - for (size_t i = 0; i < partitioned_tables.size(); i++) { + for (int i = 0; i < num_partitions; i++) { if (i != rank) { all_to_all.insert(partitioned_tables[i], i); } else { - received_tables_mp[i] = partitioned_tables[i]; + received_tables[i] = partitioned_tables[i]; } } - } else { // divide parititions to world_size potions and send accordingly - for (size_t i = 0; i < partitioned_tables.size(); i++) { - size_t target = i * world_size / num_partitions; + } else { // divide partitions to world_size potions and send accordingly + for (int i = 0; i < num_partitions; i++) { + int target = i * world_size / num_partitions; if (target != rank) { all_to_all.insert(partitioned_tables[i], target); } else { - received_tables_mp[i] = partitioned_tables[i]; + received_tables[i] = partitioned_tables[i]; } } } @@ -189,16 +182,39 @@ static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_pt } all_to_all.close(); - for (auto &p: received_tables_mp) { - received_tables.push_back(p.second); + return Status::OK(); +} + +static inline Status all_to_all_arrow_tables_separated_cylon_table(const std::shared_ptr &ctx, + const std::shared_ptr &schema, + const std::vector> &partitioned_tables, + std::vector> &table_out) { + std::vector> received_tables; + all_to_all_arrow_tables_separated_arrow_table(ctx, schema, partitioned_tables, received_tables); + + table_out.reserve(received_tables.size() - 1); + for(int i = 0; i < received_tables.size(); i++) { + if(received_tables[i]->num_rows() > 0) { + CYLON_ASSIGN_OR_RAISE(auto arrow_tb, received_tables[i]->CombineChunks(cylon::ToArrowPool(ctx))); + auto temp = std::make_shared(ctx, std::move(arrow_tb)); + table_out.push_back(temp); + } } - /* // now clear locally partitioned tables - partitioned_tables.clear();*/ + return Status::OK(); +} - // now we have the final set of tables - LOG(INFO) << "Concatenating tables, Num of tables : " << received_tables.size(); - CYLON_ASSIGN_OR_RAISE(table_out, arrow::ConcatenateTables(received_tables)); +/** + * output rows order by rank number + */ +static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_ptr &ctx, + const std::shared_ptr &schema, + const std::vector> &partitioned_tables, + std::shared_ptr &table_out) { + std::vector> tables; + RETURN_CYLON_STATUS_IF_FAILED(all_to_all_arrow_tables_separated_arrow_table(ctx, schema, partitioned_tables, tables)); + LOG(INFO) << "Concatenating tables, Num of tables : " << tables.size(); + CYLON_ASSIGN_OR_RAISE(table_out, arrow::ConcatenateTables(tables)); LOG(INFO) << "Done concatenating tables, rows : " << table_out->num_rows(); return Status::OK(); @@ -432,16 +448,313 @@ Status Sort(const std::shared_ptr
&table, const std::vector &sor return Table::FromArrowTable(ctx, sorted_table, out); } -Status DistributedSort(const std::shared_ptr
&table, - int sort_column, - std::shared_ptr
&output, - bool ascending, - SortOptions sort_options) { - return DistributedSort(table, std::vector{sort_column}, output, std::vector{ascending}, - sort_options); +Status SampleTableUniform(const std::shared_ptr
&local_sorted, + int num_samples, std::vector sort_columns, + std::shared_ptr
&sample_result, + const std::shared_ptr &ctx) { + auto pool = cylon::ToArrowPool(ctx); + + CYLON_ASSIGN_OR_RAISE(auto local_sorted_selected_cols, local_sorted->get_table()->SelectColumns(sort_columns)); + + if (local_sorted->Rows() == 0 || num_samples == 0) { + std::shared_ptr output; + RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::CreateEmptyTable( + local_sorted_selected_cols->schema(), &output, pool)); + sample_result = std::make_shared
(ctx, std::move(output)); + return Status::OK(); + } + + float step = local_sorted->Rows() / (num_samples + 1.0); + float acc = step; + arrow::Int64Builder filter(pool); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(filter.Reserve(num_samples)); + + for (int i = 0; i < num_samples; i++) { + filter.UnsafeAppend(acc); + acc += step; + } + + CYLON_ASSIGN_OR_RAISE(auto take_arr, filter.Finish()); + CYLON_ASSIGN_OR_RAISE( + auto take_res, + (arrow::compute::Take(local_sorted_selected_cols, take_arr))); + sample_result = std::make_shared
(ctx, take_res.table()); + + return Status::OK(); } -Status DistributedSort(const std::shared_ptr
&table, +template +static int CompareRows(const std::vector> &comparators, + int64_t idx_a, + int64_t idx_b) { + int sz = comparators.size(); + if (std::is_same::value) { + idx_b |= (int64_t)1 << 63; + } + for (int i = 0; i < sz; i++) { + int result = comparators[i]->compare(idx_a, idx_b); + if (result == 0) continue; + return result; + } + return 0; +} + +Status MergeSortedTable(const std::vector> &tables, + const std::vector &sort_columns, + const std::vector &sort_orders, + std::shared_ptr
&out) { + std::shared_ptr
concatenated; + std::vector table_indices(tables.size()), + table_end_indices(tables.size()); + int acc = 0; + for (int i = 0; i < table_indices.size(); i++) { + table_indices[i] = acc; + acc += tables[i]->Rows(); + table_end_indices[i] = acc; + } + + RETURN_CYLON_STATUS_IF_FAILED(Merge(tables, concatenated)); + + if(concatenated->GetContext()->GetWorldSize() > 4) { + return Sort(concatenated, sort_columns, out, sort_orders); + } + + std::unique_ptr equal_to; + RETURN_CYLON_STATUS_IF_FAILED(TableRowIndexEqualTo::Make( + concatenated->get_table(), sort_columns, &equal_to, sort_orders)); + + auto comp = [&](int a, int b) { // a and b are index of table in `tables` + int64_t a_idx = table_indices[a], b_idx = table_indices[b]; + return equal_to->compare(a_idx, b_idx) > 0; + }; + + std::priority_queue, decltype(comp)> pq(comp); + + for (int i = 0; i < tables.size(); i++) { + if (table_indices[i] < table_end_indices[i]) { + pq.push(i); + } + } + + auto ctx = concatenated->GetContext(); + auto pool = cylon::ToArrowPool(ctx); + arrow::Int64Builder filter(pool); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(filter.Reserve(concatenated->Rows())); + + std::vector temp_v; + + while (!pq.empty()) { + int t = pq.top(); + pq.pop(); + // std::cout<get_table(), take_arr))); + + out = std::make_shared
(ctx, take_res.table()); + + return Status::OK(); +} + +Status DetermineSplitPoints( + const std::vector> &gathered_tables_include_root, + const std::vector &sort_orders, + std::shared_ptr
&split_points, + const std::shared_ptr &ctx) { + std::shared_ptr
merged_table; + + std::vector sort_columns(sort_orders.size()); + std::iota(sort_columns.begin(), sort_columns.end(), 0); + + RETURN_CYLON_STATUS_IF_FAILED(MergeSortedTable( + gathered_tables_include_root, sort_columns, sort_orders, merged_table)); + + int num_split_points = + std::min(merged_table->Rows(), (int64_t)ctx->GetWorldSize() - 1); + + return SampleTableUniform(merged_table, num_split_points, sort_columns, split_points, ctx); +} + +Status GetSplitPoints(std::shared_ptr
&sample_result, + const std::vector &sort_orders, + int num_split_points, + std::shared_ptr
&split_points) { + auto ctx = sample_result->GetContext(); + + std::vector> gather_results; + // net::MPICommunicator comm; + RETURN_CYLON_STATUS_IF_FAILED( + ctx->GetCommunicator()->Gather(sample_result, 0, true, &gather_results)); + + if (ctx->GetRank() == 0) { + RETURN_CYLON_STATUS_IF_FAILED( + DetermineSplitPoints(gather_results, sort_orders, + split_points, sample_result->GetContext())); + } + + return ctx->GetCommunicator()->Bcast(&split_points, 0); +} + +// return (index of) first element that is not less than the target element +int64_t tableBinarySearch( + const std::shared_ptr
&split_points, + const std::shared_ptr
&sorted_table, + std::unique_ptr& equal_to, + int64_t split_point_idx, int64_t l) { + int64_t r = sorted_table->Rows() - 1; + int L = l; + + while (r >= l) { + int64_t m = (l + r) / 2; + int compare_result_1 = equal_to->compare(m, util::SetBit(split_point_idx)); + int compare_result_2 = + m == L ? -1 : equal_to->compare(m - 1, util::SetBit(split_point_idx)); + if (compare_result_1 >= 0 && compare_result_2 < 0) + return m; + else if (compare_result_1 < 0) { + l = m + 1; + } else { + r = m - 1; + } + } + + return sorted_table->Rows(); +} + +Status GetSplitPointIndices(const std::shared_ptr
&split_points, + const std::shared_ptr
&sorted_table, + const std::vector &sort_columns, + const std::vector &sort_order, + std::vector &target_partition, + std::vector &partition_hist) { + // binary search + int num_split_points = split_points->Rows(); + + auto arrow_sorted_table = sorted_table->get_table(); + auto arrow_split_points = split_points->get_table(); + + CYLON_ASSIGN_OR_RAISE(auto arrow_sorted_table_comb, + arrow_sorted_table->CombineChunks( + ToArrowPool(sorted_table->GetContext()))); + CYLON_ASSIGN_OR_RAISE(auto arrow_split_points_comb, + arrow_split_points->CombineChunks( + ToArrowPool(sorted_table->GetContext()))); + + std::vector split_points_sort_cols(split_points->Columns()); + std::iota(split_points_sort_cols.begin(), split_points_sort_cols.end(), 0); + + std::unique_ptr equal_to; + RETURN_CYLON_STATUS_IF_FAILED(DualTableRowIndexEqualTo::Make( + arrow_sorted_table_comb, arrow_split_points_comb, sort_columns, + split_points_sort_cols, &equal_to, sort_order)); + + int64_t num_rows = sorted_table->Rows(); + target_partition.resize(num_rows); + partition_hist.resize(num_split_points + 1); + int64_t l_idx = 0; + + for (int64_t i = 0; i < num_split_points; i++) { + int64_t idx = + tableBinarySearch(split_points, sorted_table, equal_to, i, l_idx); + std::fill(target_partition.begin() + l_idx, target_partition.begin() + idx, + i); + partition_hist[i] = idx - l_idx; + l_idx = idx; + } + + std::fill(target_partition.begin() + l_idx, target_partition.end(), + num_split_points); + partition_hist[num_split_points] = num_rows - l_idx; + + return Status::OK(); +} + +/** + * perform distributed sort on provided table + * @param table + * @param sort_columns sort based on these columns + * @param sort_direction Sort direction 'true' indicates ascending ordering and false indicate descending ordering. + * @param sorted_table resulting table + * @return + */ +Status DistributedSortRegularSampling(const std::shared_ptr
&table, + const std::vector &sort_columns, + const std::vector &sort_direction, + std::shared_ptr &output, + SortOptions sort_options) { + if (sort_columns.size() > table->Columns()) { + return Status(Code::ValueError, + "number of values in sort_column_indices can not larger than " + "the number of columns"); + } else if (sort_columns.size() != sort_direction.size()) { + return Status(Code::ValueError, + "sizes of sort_column_indices and column_orders must match"); + } + + const auto &ctx = table->GetContext(); + int world_sz = ctx->GetWorldSize(); + + if (world_sz == 1) { + return Sort(table, sort_columns, output, sort_direction); + } + // locally sort + std::shared_ptr
local_sorted; + RETURN_CYLON_STATUS_IF_FAILED( + Sort(table, sort_columns, local_sorted, sort_direction)); + + const int SAMPLING_RATIO = + sort_options.num_samples == 0 ? 2 : sort_options.num_samples; + + std::vector target_partitions, partition_hist; + std::vector> split_tables; + + // sample the sorted table with sort columns and create a table + int sample_count = ctx->GetWorldSize() * SAMPLING_RATIO; + sample_count = std::min((int64_t)sample_count, table->Rows()); + + // sample_result only contains sorted columns + std::shared_ptr
sample_result; + + RETURN_CYLON_STATUS_IF_FAILED( + SampleTableUniform(local_sorted, sample_count, sort_columns, sample_result, ctx)); + + // determine split point, split_points only contains sorted columns + std::shared_ptr
split_points; + RETURN_CYLON_STATUS_IF_FAILED(GetSplitPoints( + sample_result, sort_direction, world_sz - 1, split_points)); + + // construct target_partition, partition_hist + RETURN_CYLON_STATUS_IF_FAILED( + GetSplitPointIndices(split_points, local_sorted, sort_columns, + sort_direction, target_partitions, partition_hist)); + + // split and all_to_all + RETURN_CYLON_STATUS_IF_FAILED(Split(local_sorted, world_sz, target_partitions, + partition_hist, split_tables)); + + // we are going to free if retain is set to false. therefore, we need to make + // a copy of schema + std::shared_ptr schema = table->get_table()->schema(); + // if (!table->IsRetain()) { + // const_cast &>(table).reset(); + // } + std::vector> all_to_all_result; + RETURN_CYLON_STATUS_IF_FAILED(all_to_all_arrow_tables_separated_cylon_table( + ctx, schema, split_tables, all_to_all_result)); + + return MergeSortedTable(all_to_all_result, sort_columns, sort_direction, output); +} + +Status DistributedSortInitialSampling(const std::shared_ptr
&table, const std::vector &sort_columns, std::shared_ptr
&output, const std::vector &sort_direction, @@ -498,6 +811,27 @@ Status DistributedSort(const std::shared_ptr
&table, return Table::FromArrowTable(ctx, sorted_table, output); } +Status DistributedSort(const std::shared_ptr
&table, + int sort_column, + std::shared_ptr
&output, + bool ascending, + SortOptions sort_options) { + return DistributedSort(table, std::vector{sort_column}, output, std::vector{ascending}, + sort_options); +} + +Status DistributedSort(const std::shared_ptr
&table, + const std::vector &sort_columns, + std::shared_ptr
&output, + const std::vector &sort_direction, + SortOptions sort_options) { + if(sort_options.sort_method == sort_options.INITIAL_SAMPLE) { + return DistributedSortInitialSampling(table, sort_columns, output, sort_direction, sort_options); + } else { + return DistributedSortRegularSampling(table, sort_columns, sort_direction, output, sort_options); + } +} + Status HashPartition(const std::shared_ptr
&table, const std::vector &hash_columns, int no_of_partitions, std::unordered_map> *out) { @@ -1165,14 +1499,14 @@ Status DistributedEquals(const std::shared_ptr &a, std::vector column_orders(col, true); std::iota(indices.begin(), indices.end(), 0); - std::shared_ptr b_repartitioned; - RETURN_CYLON_STATUS_IF_FAILED(RepartitionToMatchOtherTable(a, b, &b_repartitioned)); + std::shared_ptr a_sorted, b_sorted; + RETURN_CYLON_STATUS_IF_FAILED(DistributedSort(a, indices, a_sorted, column_orders)); + RETURN_CYLON_STATUS_IF_FAILED(DistributedSort(b, indices, b_sorted, column_orders)); - std::shared_ptr out_a, out_b; - RETURN_CYLON_STATUS_IF_FAILED(DistributedSort(a, indices, out_a, column_orders)); - RETURN_CYLON_STATUS_IF_FAILED(DistributedSort(b_repartitioned, indices, out_b, column_orders)); + std::shared_ptr b_repartitioned; + RETURN_CYLON_STATUS_IF_FAILED(RepartitionToMatchOtherTable(a_sorted, b_sorted, &b_repartitioned)); - RETURN_CYLON_STATUS_IF_FAILED(Equals(out_a, out_b, subResult)); + RETURN_CYLON_STATUS_IF_FAILED(Equals(a_sorted, b_repartitioned, subResult)); } else { std::shared_ptr b_repartitioned; RETURN_CYLON_STATUS_IF_FAILED(RepartitionToMatchOtherTable(a, b, &b_repartitioned)); @@ -1196,11 +1530,6 @@ Status Repartition(const std::shared_ptr &table, int rank = table->GetContext()->GetRank(); int num_row = (int) table->Rows(); - if (num_row == 0) { - *output = table; - return Status::OK(); - } - if (rows_per_partition.size() != (size_t) world_size) { return Status(cylon::Code::ValueError, "rows_per_partition size does not align with world size. Received " + diff --git a/cpp/src/cylon/table.hpp b/cpp/src/cylon/table.hpp index 39d4ecb88..6b4f76203 100644 --- a/cpp/src/cylon/table.hpp +++ b/cpp/src/cylon/table.hpp @@ -354,11 +354,17 @@ Status Sort(const std::shared_ptr
&table, const std::vector &sor */ struct SortOptions { + enum SortMethod { + REGULAR_SAMPLE = 0, + INITIAL_SAMPLE = 1 + }; uint32_t num_bins; uint64_t num_samples; + SortMethod sort_method; - static SortOptions Defaults() { return {0, 0}; } + static SortOptions Defaults() { return {0, 0, REGULAR_SAMPLE}; } }; + Status DistributedSort(const std::shared_ptr
&table, int sort_column, std::shared_ptr
&output, @@ -371,6 +377,29 @@ Status DistributedSort(const std::shared_ptr
&table, const std::vector &sort_direction, SortOptions sort_options = SortOptions::Defaults()); +/** + * given the index of split point in split_points, return the index in + * sorted_table where all entries before the index is less than the spilt point. + * @param split_points table of split points + * @param sorted_table table to split + * @param equal_to comparator + * @param split_point_idx index of split point + * @param l optional starting index + */ +int64_t tableBinarySearch( + const std::shared_ptr
&split_points, + const std::shared_ptr
&sorted_table, + std::unique_ptr& equal_to, + int64_t split_point_idx, int64_t l = 0); + +/** + * merge tables + */ +Status MergeSortedTable(const std::vector> &tables, + const std::vector &sort_columns, + const std::vector &sort_orders, + std::shared_ptr
&out); + /** * Filters out rows based on the selector function * @param table diff --git a/cpp/src/examples/CMakeLists.txt b/cpp/src/examples/CMakeLists.txt index 1087de2da..bc88311a3 100644 --- a/cpp/src/examples/CMakeLists.txt +++ b/cpp/src/examples/CMakeLists.txt @@ -74,6 +74,7 @@ cylon_add_exe(multicolumn_sorting_example) cylon_add_exe(multi_idx_join_example) cylon_add_exe(parquet_union_example) cylon_add_exe(parquet_join_example) +cylon_add_exe(dist_sort_example) if (CYLON_UCX) cylon_add_exe(ucx_join_example) diff --git a/cpp/src/examples/dist_sort_example.cpp b/cpp/src/examples/dist_sort_example.cpp new file mode 100644 index 000000000..c4b410d52 --- /dev/null +++ b/cpp/src/examples/dist_sort_example.cpp @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +/** + * To run this example: ./dist_sort_example + */ + +#include +#include + +#include +#include +#include + +#include "example_utils.hpp" + +int64_t run_example(std::shared_ptr& table) { + std::shared_ptr output; + auto start_1 = std::chrono::steady_clock::now(); + auto status = DistributedSort(table, 0, output, true); + auto end_1 = std::chrono::steady_clock::now(); + + int64_t time = std::chrono::duration_cast(end_1 - start_1).count(); + + return time; +} + +int main(int argc, char *argv[]) { + if (argc < 1) { + LOG(ERROR) << "./dist_sort_example " << std::endl; + return 1; + } + + auto mpi_config = std::make_shared(); + std::shared_ptr ctx; + auto status = cylon::CylonContext::InitDistributed(mpi_config, &ctx); + + std::shared_ptr table, output_merge, output_sort; + + uint64_t count = std::stoull(argv[1]); + cylon::examples::create_in_memory_tables(count, 0.2, ctx, table); + + int iters = std::stoi(argv[2]); + + ctx->Barrier(); + + int64_t time = 0; + + for(int i = 0; i < iters; i++) { + time += run_example(table); + } + + std::cout<< time << " ms in total." << std::endl; + + return 0; +} diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index aaf2496ed..20b8140c8 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -163,6 +163,11 @@ cylon_add_test(custom_mpi_comm_test) cylon_run_test(custom_mpi_comm_test 1 mpi) cylon_run_test(custom_mpi_comm_test 4 mpi) +# distributed sort test +cylon_add_test(dist_sort_test) +cylon_run_test(dist_sort_test 1 mpi) +cylon_run_test(dist_sort_test 2 mpi) +cylon_run_test(dist_sort_test 4 mpi) #### Gloo tests if (CYLON_GLOO) @@ -197,4 +202,4 @@ if (CYLON_GLOO) cylon_run_test(sync_comms_test 1 gloo-mpi) cylon_run_test(sync_comms_test 4 gloo-mpi) -endif (CYLON_GLOO) \ No newline at end of file +endif (CYLON_GLOO) diff --git a/cpp/test/aggregate_test.cpp b/cpp/test/aggregate_test.cpp index 24583acaf..ca460b722 100644 --- a/cpp/test/aggregate_test.cpp +++ b/cpp/test/aggregate_test.cpp @@ -12,9 +12,9 @@ * limitations under the License. */ -#include "cylon/compute/aggregates.hpp" -#include "cylon/mapreduce/mapreduce.hpp" -#include "cylon/net/mpi/mpi_operations.hpp" +#include +#include +#include #include "common/test_header.hpp" #include "test_utils.hpp" diff --git a/cpp/test/dist_sort_test.cpp b/cpp/test/dist_sort_test.cpp new file mode 100644 index 000000000..fa900b0d8 --- /dev/null +++ b/cpp/test/dist_sort_test.cpp @@ -0,0 +1,288 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "common/test_header.hpp" +#include "test_utils.hpp" + +namespace cylon { +void testDistSort(const std::vector& sort_cols, + const std::vector& sort_order, + std::shared_ptr
& global_table, + std::shared_ptr
& table) { + std::shared_ptr
out; + auto ctx = table->GetContext(); + std::shared_ptr arrow_output; + + auto status = DistributedSort(table, sort_cols, out, sort_order, + {0, 0, SortOptions::INITIAL_SAMPLE}); + REQUIRE(status.is_ok()); + std::shared_ptr
out2; + bool eq; + + if (RANK == 0) { + status = Sort(global_table, sort_cols, out2, sort_order); + } else { + auto pool = cylon::ToArrowPool(ctx); + std::shared_ptr arrow_empty_table; + auto arrow_status = util::CreateEmptyTable( + global_table->get_table()->schema(), &arrow_empty_table, pool); + out2 = std::make_shared
(ctx, arrow_empty_table); + } + std::shared_ptr
out3; + status = Repartition(out2, &out3); + status = DistributedEquals(out3, out, eq); + REQUIRE(eq); +} + +namespace test { +TEMPLATE_LIST_TEST_CASE("Dist sort testing", "[dist sort]", ArrowNumericTypes) { + auto type = default_type_instance(); + auto schema = arrow::schema({{arrow::field("a", type)}, + {arrow::field("b", arrow::float32())}}); + auto global_arrow_table = TableFromJSON(schema, {R"([{"a": 3, "b":0.025}, + {"a": 26, "b":0.394}, + {"a": 51, "b":0.755}, + {"a": 20, "b":0.030}, + {"a": 33, "b":0.318}, + {"a": 12, "b":0.813}, + {"a": 72, "b":0.968}, + {"a": 29, "b":0.291}, + {"a": 41, "b":0.519}, + {"a": 29, "b":0.291}, + {"a": 41, "b":0.519}, + {"a": 43, "b":0.419}, + {"a": 57, "b":0.153}, + {"a": 25, "b":0.479}, + {"a": 26, "b":0.676}, + {"a": 70, "b":0.504}, + {"a": 7, "b":0.232}, + {"a": 45, "b":0.734}, + {"a": 61, "b":0.685}, + {"a": 57, "b":0.314}, + {"a": 59, "b": 0.837}, + {"a": 67, "b": 0.086}, + {"a": 14, "b": 0.193}, + {"a": 21, "b": 0.853}, + {"a": 10, "b": 0.808}, + {"a": 13, "b": 0.085}, + {"a": 31, "b": 0.122}, + {"a": 20, "b": 0.689}, + {"a": 37, "b": 0.491}, + {"a": 62, "b": 0.262}, + {"a": 1 , "b": 0.868}, + {"a": 19, "b": 0.422}, + {"a": 64, "b": 0.528}, + {"a": 37, "b": 0.834}, + {"a": 33, "b": 0.010}, + {"a": 76, "b": 0.927}, + {"a": 4 , "b": 0.529}, + {"a": 13, "b": 0.201}, + {"a": 45, "b": 0.898}, + {"a": 67, "b": 0.407}])"}); + + int64_t rows_per_tab = global_arrow_table->num_rows() / WORLD_SZ; + std::shared_ptr
table1; + CHECK_CYLON_STATUS(Table::FromArrowTable( + ctx, global_arrow_table->Slice(RANK * rows_per_tab, rows_per_tab), + table1)); + std::shared_ptr
global_table; + CHECK_CYLON_STATUS( + Table::FromArrowTable(ctx, global_arrow_table, global_table)); + + SECTION("dist_sort_test_1") { + testDistSort({0, 1}, {1, 1}, global_table, table1); + } + + SECTION("dist_sort_test_2_different_direction") { + testDistSort({0, 1}, {1, 0}, global_table, table1); + } + + SECTION("dist_sort_test_3_different_order") { + testDistSort({1, 0}, {0, 0}, global_table, table1); + } + + SECTION("dist_sort_test_4_one_empty_table") { + if (RANK == 0) { + auto pool = cylon::ToArrowPool(ctx); + + std::shared_ptr arrow_empty_table; + auto arrow_status = util::CreateEmptyTable(table1->get_table()->schema(), + &arrow_empty_table, pool); + auto empty_table = std::make_shared
(ctx, arrow_empty_table); + table1 = empty_table; + } + + std::shared_ptr
out, out2; + auto ctx = table1->GetContext(); + std::shared_ptr arrow_output; + auto status = DistributedSort(table1, {1, 0}, out, {0, 0}); + REQUIRE(status.is_ok()); + status = DistributedSort(table1, {1, 0}, out2, {0, 0}, + {0, 0, SortOptions::INITIAL_SAMPLE}); + REQUIRE(status.is_ok()); + bool eq; + status = DistributedEquals(out, out2, eq); + REQUIRE(eq); + } +} + +TEST_CASE("Binary search testing", "[binary search]") { + auto schema = arrow::schema({{arrow::field("a", arrow::uint32())}, + {arrow::field("b", arrow::float32())}}); + + auto arrow_table = TableFromJSON(schema, {R"([ {"a": 1, "b": 0.868000}, + {"a": 3, "b": 0.025000}, + {"a": 4, "b": 0.529000}, + {"a": 7, "b": 0.232000}, + {"a": 10, "b": 0.808000}, + {"a": 12, "b": 0.813000}, + {"a": 13, "b": 0.085000}, + {"a": 13, "b": 0.201000}, + {"a": 14, "b": 0.193000}, + {"a": 19, "b": 0.422000}, + {"a": 20, "b": 0.030000}, + {"a": 20, "b": 0.689000}, + {"a": 21, "b": 0.853000}, + {"a": 25, "b": 0.479000}, + {"a": 26, "b": 0.394000}, + {"a": 26, "b": 0.676000}, + {"a": 29, "b": 0.291000}, + {"a": 29, "b": 0.291000}, + {"a": 31, "b": 0.122000}, + {"a": 33, "b": 0.010000}, + {"a": 33, "b": 0.318000}, + {"a": 37, "b": 0.491000}, + {"a": 37, "b": 0.834000}, + {"a": 41, "b": 0.519000}, + {"a": 41, "b": 0.519000}, + {"a": 43, "b": 0.419000}, + {"a": 45, "b": 0.734000}, + {"a": 45, "b": 0.898000}, + {"a": 51, "b": 0.755000}, + {"a": 57, "b": 0.153000}, + {"a": 57, "b": 0.314000}, + {"a": 59, "b": 0.837000}, + {"a": 61, "b": 0.685000}, + {"a": 62, "b": 0.262000}, + {"a": 64, "b": 0.528000}, + {"a": 67, "b": 0.086000}, + {"a": 67, "b": 0.407000}, + {"a": 70, "b": 0.504000}, + {"a": 72, "b": 0.968000}, + {"a": 76, "b": 0.927000}])"}); + + std::shared_ptr
table; + CHECK_CYLON_STATUS(Table::FromArrowTable(ctx, arrow_table, table)); + + auto arrow_split_points = + TableFromJSON(schema, {R"([ {"a": 1, "b": 0.068000}, + {"a": 4, "b": 0.529000}, + {"a": 27, "b": 0.025000}, + {"a": 80, "b": 0.232000}])"}); + + std::shared_ptr
split_points; + CHECK_CYLON_STATUS(Table::FromArrowTable(ctx, arrow_split_points, split_points)); + + + std::unique_ptr equal_to; + auto status = DualTableRowIndexEqualTo::Make( + table->get_table(), split_points->get_table(), {0, 1}, + {0, 1}, &equal_to, {1, 1}); + + SECTION("search target less than table's smallest entry") { + REQUIRE(tableBinarySearch(split_points, table, equal_to, 0) == 0); + } + + SECTION("table contains search target") { + REQUIRE(tableBinarySearch(split_points, table, equal_to, 1) == 2); + } + + + SECTION("search target not exsit but within range of table") { + REQUIRE(tableBinarySearch(split_points, table, equal_to, 2) == 16); + } + + + SECTION("search target greater than table's largest entry") { + REQUIRE(tableBinarySearch(split_points, table, equal_to, 3) == table->Rows()); + } +} + +TEST_CASE("Merge testing", "[table merge]") { + auto schema = arrow::schema({{arrow::field("a", arrow::uint32())}, + {arrow::field("b", arrow::float32())}}); + + auto arrow_table_1 = TableFromJSON(schema, {R"([ {"a": 1, "b": 0.868000}, + {"a": 3, "b": 0.025000}, + {"a": 4, "b": 0.529000}, + {"a": 7, "b": 0.232000}, + {"a": 10, "b": 0.808000}, + {"a": 12, "b": 0.813000}, + {"a": 13, "b": 0.085000}, + {"a": 13, "b": 0.201000}, + {"a": 14, "b": 0.193000}, + {"a": 19, "b": 0.422000}, + {"a": 20, "b": 0.030000}, + {"a": 20, "b": 0.689000}, + {"a": 21, "b": 0.853000}, + {"a": 25, "b": 0.479000}, + {"a": 26, "b": 0.394000}, + {"a": 61, "b": 0.685000}, + {"a": 62, "b": 0.262000}, + {"a": 64, "b": 0.528000}, + {"a": 72, "b": 0.968000}, + {"a": 76, "b": 0.927000}])"}); + + std::shared_ptr
table1; + CHECK_CYLON_STATUS(Table::FromArrowTable(ctx, arrow_table_1, table1)); + + auto arrow_table_2 = TableFromJSON(schema, {R"([ {"a": 26, "b": 0.676000}, + {"a": 29, "b": 0.291000}, + {"a": 29, "b": 0.291000}, + {"a": 31, "b": 0.122000}, + {"a": 33, "b": 0.010000}, + {"a": 33, "b": 0.318000}, + {"a": 37, "b": 0.491000}, + {"a": 37, "b": 0.834000}, + {"a": 41, "b": 0.519000}, + {"a": 41, "b": 0.519000}, + {"a": 43, "b": 0.419000}, + {"a": 45, "b": 0.734000}, + {"a": 45, "b": 0.898000}, + {"a": 51, "b": 0.755000}, + {"a": 57, "b": 0.153000}, + {"a": 57, "b": 0.314000}, + {"a": 59, "b": 0.837000}, + {"a": 67, "b": 0.086000}, + {"a": 67, "b": 0.407000}, + {"a": 70, "b": 0.504000}])"}); + + std::shared_ptr
table2; + CHECK_CYLON_STATUS(Table::FromArrowTable(ctx, arrow_table_2, table2)); + + std::shared_ptr
concat, sorted, merged; + Merge({table1, table2}, concat); + Sort(concat, {0, 1}, sorted, {1, 1}); + + MergeSortedTable({table1, table2}, {0, 1}, {1, 1}, merged); + + bool result; + Equals(sorted, merged, result); + REQUIRE(result); +} + +} // namespace test +} // namespace cylon \ No newline at end of file diff --git a/python/pygcylon/setup.py b/python/pygcylon/setup.py index 2044fe105..39b74a76c 100644 --- a/python/pygcylon/setup.py +++ b/python/pygcylon/setup.py @@ -132,6 +132,9 @@ def build_extensions(self): get_python_lib(), os.path.join(os.sys.prefix, "lib")] +mpi_library_dir = os.popen("mpicc --showme:libdirs").read().strip().split(' ') +library_directories.extend(mpi_library_dir) + libraries = ["gcylon", "cylon", "cudf", "cudart", "glog"] cylon_include_dir = "../../cpp/src/" @@ -141,6 +144,9 @@ def build_extensions(self): cuda_include_dir, np.get_include()] +mpi_include_dir = os.popen("mpicc --showme:incdirs").read().strip().split(' ') +_include_dirs.extend(mpi_include_dir) + cython_files = ["pygcylon/**/*.pyx"] extensions = [