Skip to content

Commit

Permalink
Implement Slice, Head and Tail Operation in both centralize and distr… (
Browse files Browse the repository at this point in the history
#592)

* Implement Slice, Head and Tail Operation in both centralize and distributed environment

Signed-off-by: Arup Sarker <[email protected]>

* Add Distributed Slice with arrow api

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Implementation of Distributive Slice, Head and Tail

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Revert the changes based on the upstream

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Implement Slice with new Logic

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Implement new logic for slice and add a basic test case

Signed-off-by: Arup Sarker <[email protected]>

* Update cpp/src/cylon/table.cpp

Co-authored-by: niranda perera <[email protected]>

* Update cpp/src/cylon/table.cpp

Co-authored-by: niranda perera <[email protected]>

* Update cpp/src/cylon/table.cpp

Co-authored-by: niranda perera <[email protected]>

* Update cpp/src/examples/CMakeLists.txt

Co-authored-by: niranda perera <[email protected]>

* [Cylon] Removed unneccessary data copy and logs

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Fix merge conflict

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Refactoring Slice logic into seperate file

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Update new test cases

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Add multiple test cases for slice operation

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Refactoring slice operation

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Fix error message and un-necessary example files

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Implement tail operation with new logic

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Implement Tail operation by fixing the logic

Signed-off-by: Arup Sarker <[email protected]>

* Add sliceImple methods and clear unnecessary logs

Signed-off-by: Arup Sarker <[email protected]>

* [Cylon] Change the output table parameter from address to pointer

Signed-off-by: Arup Sarker <[email protected]>

* Squashing following commits

removing std out
fixing errors
more logs
more logs
adding logs
attempting to fix macos error
cosmetic  changes
cosmetic  changes

* Minor fixes  (#596)

* remove gloo default hostname

* minor change gloo

* adding gloo-mpi test

* adding ucc cyton

* Update setup.py

* Update setup.py

* adding ucc test

* adding multi env test

* cosmetic changes

* adding regular sampling cython

* adding UCC barrier

* adding macos11 tag for CI

* fixing windows error

* trying to fix macos ci

* trying to fix macos issue

* Revert "trying to fix macos issue"

This reverts commit cda5c2c.

* attempting to fix macos ci

* style-check

* adding gloo timeout

* adding custom mpiexec cmake var

* [Cylon] Handle corner case for slice test

Signed-off-by: Arup Sarker <[email protected]>

* Create README-summit.md (#602)

* Create README-summit.md

Detailed description of how to compile cylon on summit

* Update README-summit.md

* Update README-summit.md

* Update README-summit.md (#603)

add cmake path

* adding custom mpirun params cmake var (#604)

* adding custom mpirun params cmake var

* minor change

* changing merge to support empty tables

Signed-off-by: Arup Sarker <[email protected]>
Co-authored-by: niranda perera <[email protected]>
Co-authored-by: Gregor von Laszewski <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2022
1 parent d99a6f2 commit 035fd70
Show file tree
Hide file tree
Showing 12 changed files with 607 additions and 4 deletions.
1 change: 1 addition & 0 deletions cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ add_library(cylon SHARED
indexing/index_utils.hpp
indexing/indexer.cpp
indexing/indexer.hpp
indexing/slice.cpp
io/arrow_io.cpp
io/arrow_io.hpp
io/csv_read_config.cpp
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/cylon/compute/aggregate_kernels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include <cmath>
#include <vector>
#include <unordered_set>
#include <stdexcept>
#include <limits>


#include "cylon/util/macros.hpp"

Expand Down
170 changes: 170 additions & 0 deletions cpp/src/cylon/indexing/slice.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <memory>
#include <algorithm>

#include <cylon/table.hpp>
#include <cylon/ctx/arrow_memory_pool_utils.hpp>
#include <cylon/util/macros.hpp>
#include <cylon/util/arrow_utils.hpp>
#include <cylon/scalar.hpp>

namespace cylon {

static constexpr int64_t kZero = 0;

/**
* Slice the part of table to create a single table
* @param table, offset and length
* @return new sliced table
*/
Status Slice(const std::shared_ptr<Table> &in, int64_t offset, int64_t length,
std::shared_ptr<Table> *out) {
const auto &ctx = in->GetContext();
const auto &in_table = in->get_table();

std::shared_ptr<arrow::Table> out_table;
if (!in->Empty()) {
out_table = in_table->Slice(offset, length);
} else {
out_table = in_table;
}
return Table::FromArrowTable(ctx, std::move(out_table), *out);
}

/**
* DistributedSlice the part of table to create a single table
* @param table, global_offset and global_length
* @return new sliced table
*/
Status distributed_slice_impl(const std::shared_ptr<Table> &in,
int64_t global_offset,
int64_t global_length,
int64_t *partition_lengths,
std::shared_ptr<Table> *out) {
const auto &ctx = in->GetContext();
std::shared_ptr<cylon::Column> partition_len_col;

if (partition_lengths == nullptr) {
const auto &num_row_scalar = Scalar::Make(arrow::MakeScalar(in->Rows()));
RETURN_CYLON_STATUS_IF_FAILED(ctx->GetCommunicator()
->Allgather(num_row_scalar, &partition_len_col));

partition_lengths =
const_cast<int64_t *>(std::static_pointer_cast<arrow::Int64Array>(partition_len_col->data())
->raw_values());
}

int64_t rank = ctx->GetRank();
int64_t prefix_sum = std::accumulate(partition_lengths, partition_lengths + rank, kZero);
int64_t total_length = std::accumulate(partition_lengths + rank,
partition_lengths + ctx->GetWorldSize(),
prefix_sum);
if (global_offset > total_length) {
return {Code::Invalid, "global offset exceeds total length of the dist table"};
}
// adjust global length if it exceeds total_length
if (global_offset + global_length > total_length) {
global_length = total_length - global_offset;
}

int64_t this_length = *(partition_lengths + rank);
assert(this_length == in->Rows());

int64_t local_offset = std::max(kZero, std::min(global_offset - prefix_sum, this_length));
int64_t local_length =
std::min(this_length, std::max(global_offset + global_length - prefix_sum, kZero))
- local_offset;

return Slice(in, local_offset, local_length, out);
}

Status DistributedSlice(const std::shared_ptr<Table> &in,
int64_t offset,
int64_t length,
std::shared_ptr<Table> *out) {
return distributed_slice_impl(in, offset, length, nullptr, out);
}

/**
* Head the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/
Status Head(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<Table> *output) {
if (num_rows >= 0) {
return Slice(table, 0, num_rows, output);
} else
return {Code::Invalid, "Number of head rows should be >=0"};
}

Status DistributedHead(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<Table> *output) {

std::shared_ptr<arrow::Table> in_table = table->get_table();

if (num_rows >= 0) {
return distributed_slice_impl(table, 0, num_rows, nullptr, output);
} else {
return {Code::Invalid, "Number of head rows should be >=0"};
}
}

/**
* Tail the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/
Status Tail(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<Table> *output) {

std::shared_ptr<arrow::Table> in_table = table->get_table();
const int64_t table_size = in_table->num_rows();

if (num_rows >= 0) {
return Slice(table, table_size - num_rows, num_rows, output);
} else {
return {Code::Invalid, "Number of tailed rows should be >=0"};
}
}

Status DistributedTail(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<Table> *output) {
if (num_rows >= 0) {
const auto &ctx = table->GetContext();
std::shared_ptr<cylon::Column> partition_len_col;
const auto &num_row_scalar = Scalar::Make(arrow::MakeScalar(table->Rows()));
RETURN_CYLON_STATUS_IF_FAILED(ctx->GetCommunicator()
->Allgather(num_row_scalar, &partition_len_col));
assert(ctx->GetWorldSize() == partition_len_col->length());
auto *partition_lengths =
std::static_pointer_cast<arrow::Int64Array>(partition_len_col->data())
->raw_values();

int64_t dist_length =
std::accumulate(partition_lengths, partition_lengths + ctx->GetWorldSize(), kZero);

return distributed_slice_impl(table,
dist_length - num_rows,
num_rows,
const_cast <int64_t *> (partition_lengths),
output);
} else {
return {Code::Invalid, "Number of tailed rows should be >=0"};
}
}

}
1 change: 1 addition & 0 deletions cpp/src/cylon/net/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* limitations under the License.
*/

#include <cstddef>
#include "utils.hpp"


Expand Down
20 changes: 16 additions & 4 deletions cpp/src/cylon/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <unordered_map>
#include <iostream>
#include <algorithm>

#include <cylon/table.hpp>
#include <cylon/join/join_utils.hpp>
Expand Down Expand Up @@ -277,7 +278,11 @@ static inline Status shuffle_two_tables_by_hashing(const std::shared_ptr<cylon::
Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::string &path,
std::shared_ptr<Table> &tableOut, const cylon::io::config::CSVReadOptions &options) {
arrow::Result<std::shared_ptr<arrow::Table>> result = cylon::io::read_csv(ctx, path, options);

LOG(INFO) << "Reading Inside FromCSV";

if (result.ok()) {
LOG(INFO) << "CSV file reading is OK";
std::shared_ptr<arrow::Table> &table = result.ValueOrDie();
if (table->column(0)->chunks().size() > 1) {
const auto &combine_res = table->CombineChunks(ToArrowPool(ctx));
Expand All @@ -289,6 +294,7 @@ Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::string &path
}
// slice the table if required
if (options.IsSlice() && ctx->GetWorldSize() > 1) {
LOG(INFO) << "Slice the table if required";
int32_t rows_per_worker = table->num_rows() / ctx->GetWorldSize();
int32_t remainder = table->num_rows() % ctx->GetWorldSize();

Expand Down Expand Up @@ -379,12 +385,14 @@ Status Merge(const std::vector<std::shared_ptr<cylon::Table>> &ctables,
std::vector<std::shared_ptr<arrow::Table>> tables;
tables.reserve(ctables.size());
for (const auto &t: ctables) {
if (t->Rows()) {
std::shared_ptr<arrow::Table> arrow;
t->ToArrowTable(arrow);
tables.push_back(std::move(arrow));
if (!t->Empty()) {
tables.push_back(t->get_table());
}
}
if (tables.empty()) { // means all tables are empty. return a cylon table from 0th table
tableOut = ctables[0];
return Status::OK();
}

const auto &ctx = ctables[0]->GetContext();
const auto &concat_res = arrow::ConcatenateTables(tables);
Expand Down Expand Up @@ -1472,6 +1480,7 @@ static Status RepartitionToMatchOtherTable(const std::shared_ptr<cylon::Table> &

std::vector<int64_t> rows_per_partition;
std::shared_ptr<cylon::Column> output;

RETURN_CYLON_STATUS_IF_FAILED(
a->GetContext()->GetCommunicator()->Allgather(num_row_scalar, &output));
auto *data_ptr =
Expand Down Expand Up @@ -1545,6 +1554,7 @@ Status Repartition(const std::shared_ptr<cylon::Table> &table,

auto num_row_scalar = std::make_shared<Scalar>(arrow::MakeScalar(num_row));


RETURN_CYLON_STATUS_IF_FAILED(
table->GetContext()->GetCommunicator()->Allgather(num_row_scalar,
&sizes_cols));
Expand Down Expand Up @@ -1750,5 +1760,7 @@ Status WriteParquet(const std::shared_ptr<cylon::CylonContext> &ctx_,

return Status(Code::OK);
}


#endif
} // namespace cylon
42 changes: 42 additions & 0 deletions cpp/src/cylon/table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,48 @@ Status WriteParquet(const std::shared_ptr<cylon::CylonContext> &ctx,
std::shared_ptr<cylon::Table> &table,
const std::string &path,
const io::config::ParquetOptions &options = cylon::io::config::ParquetOptions());
/**
* Slice the part of table to create a single table
* @param tables, offset, length
* @return new sliced table
*/
Status Slice(const std::shared_ptr<Table> &in, int64_t offset, int64_t length,
std::shared_ptr<cylon::Table> *out);

/**
* DistributedSlice the part of table to create a single table
* @param table, offset and length
* @return new sliced table
*/


Status DistributedSlice(const std::shared_ptr<Table> &in, int64_t offset, int64_t length,
std::shared_ptr<cylon::Table> *out);


/**
* Head the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/

Status Head(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> *output);
Status DistributedHead(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> *output);

/**
* Tail the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/

Status Tail(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> *output);

Status DistributedTail(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> *output);

#endif // BUILD_CYLON_PARQUET

} // namespace cylon
Expand Down
1 change: 1 addition & 0 deletions cpp/src/cylon/thridparty/flat_hash_map/bytell_hash_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <type_traits>
#include <vector>
#include <array>
#include <limits>

#include <cylon/thridparty/flat_hash_map/flat_hash_map.hpp>

Expand Down
1 change: 1 addition & 0 deletions cpp/src/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ cylon_add_exe(multi_idx_join_example)
cylon_add_exe(parquet_union_example)
cylon_add_exe(parquet_join_example)
cylon_add_exe(dist_sort_example)
cylon_add_exe(slice_example)

if (CYLON_UCX)
cylon_add_exe(ucx_join_example)
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/examples/join_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ int main(int argc, char *argv[]) {
} else if (mem == "f") {
cylon::FromCSV(ctx, std::string(argv[3]) + std::to_string(ctx->GetRank()) + ".csv", first_table);
cylon::FromCSV(ctx, std::string(argv[4]) + std::to_string(ctx->GetRank()) + ".csv", second_table);

if (argc == 6) {
if (!strcmp(argv[5], "hash")) {
LOG(INFO) << "Hash join algorithm";
Expand All @@ -85,6 +86,7 @@ int main(int argc, char *argv[]) {
LOG(INFO) << "Read tables in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
read_end_time - start_start).count() << "[ms]";

auto join_config = cylon::join::config::JoinConfig(cylon::join::config::JoinType::INNER,
0,
0,
Expand All @@ -109,6 +111,7 @@ int main(int argc, char *argv[]) {
<< std::chrono::duration_cast<std::chrono::milliseconds>(
join_end_time - read_end_time).count() << "[ms]";
std::vector<std::string> column_names = joined->ColumnNames();

ctx->Finalize();
return 0;
}
Loading

0 comments on commit 035fd70

Please sign in to comment.