Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Slice, Head and Tail Operation in both centralize and distr… #592

Merged
merged 30 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3ec4801
Implement Slice, Head and Tail Operation in both centralize and distr…
arupcsedu Jun 25, 2022
1b451cd
Add Distributed Slice with arrow api
arupcsedu Jun 25, 2022
68cfe5d
[Cylon] Implementation of Distributive Slice, Head and Tail
arupcsedu Jul 10, 2022
bde8bab
[Cylon] Revert the changes based on the upstream
arupcsedu Jul 14, 2022
f406753
Merge branch 'cylondata:main' into main
arupcsedu Jul 14, 2022
2a4a1dd
[Cylon] Implement Slice with new Logic
arupcsedu Jul 19, 2022
3e39974
[Cylon] Implement new logic for slice and add a basic test case
arupcsedu Jul 21, 2022
db5595c
Update cpp/src/cylon/table.cpp
arupcsedu Jul 21, 2022
bf286da
Update cpp/src/cylon/table.cpp
arupcsedu Jul 21, 2022
adca09e
Update cpp/src/cylon/table.cpp
arupcsedu Jul 21, 2022
62fc159
Update cpp/src/examples/CMakeLists.txt
arupcsedu Jul 21, 2022
bd47628
[Cylon] Removed unneccessary data copy and logs
arupcsedu Jul 21, 2022
a194f53
[Cylon] Fix merge conflict
arupcsedu Jul 21, 2022
60c50df
[Cylon] Refactoring Slice logic into seperate file
arupcsedu Jul 26, 2022
06d380d
[Cylon] Update new test cases
arupcsedu Jul 27, 2022
0f4acce
[Cylon] Add multiple test cases for slice operation
arupcsedu Jul 29, 2022
170d4c7
[Cylon] Refactoring slice operation
arupcsedu Aug 3, 2022
8c28aa8
[Cylon] Fix error message and un-necessary example files
arupcsedu Aug 3, 2022
9ee3914
[Cylon] Implement tail operation with new logic
arupcsedu Aug 6, 2022
1d54430
[Cylon] Implement Tail operation by fixing the logic
arupcsedu Aug 10, 2022
ffc70d3
Add sliceImple methods and clear unnecessary logs
arupcsedu Aug 11, 2022
e02f598
[Cylon] Change the output table parameter from address to pointer
arupcsedu Aug 12, 2022
f13b4e1
Squashing following commits
nirandaperera Aug 14, 2022
e6b7306
Minor fixes (#596)
nirandaperera Aug 16, 2022
6a03ab3
[Cylon] Handle corner case for slice test
arupcsedu Aug 17, 2022
4bc27f9
Create README-summit.md (#602)
laszewsk Aug 18, 2022
f20c119
Update README-summit.md (#603)
laszewsk Aug 18, 2022
d99a6f2
adding custom mpirun params cmake var (#604)
nirandaperera Aug 19, 2022
b4018cf
Merge branch 'main' of https://github.com/arupcsedu/cylon into arupcs…
nirandaperera Aug 19, 2022
b50efb2
changing merge to support empty tables
nirandaperera Aug 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/src/cylon/compute/aggregate_kernels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include <cmath>
#include <vector>
#include <unordered_set>
#include <stdexcept>
#include <limits>


#include "cylon/util/macros.hpp"

Expand Down
1 change: 1 addition & 0 deletions cpp/src/cylon/net/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* limitations under the License.
*/

#include <cstddef>
#include "utils.hpp"


Expand Down
159 changes: 159 additions & 0 deletions cpp/src/cylon/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <unordered_map>
#include <iostream>
#include <algorithm>

#include <cylon/table.hpp>
#include <cylon/join/join_utils.hpp>
Expand Down Expand Up @@ -277,7 +278,11 @@ static inline Status shuffle_two_tables_by_hashing(const std::shared_ptr<cylon::
Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::string &path,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these additional logs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will reflect the changes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove these logs

std::shared_ptr<Table> &tableOut, const cylon::io::config::CSVReadOptions &options) {
arrow::Result<std::shared_ptr<arrow::Table>> result = cylon::io::read_csv(ctx, path, options);

LOG(INFO) << "Reading Inside FromCSV";

if (result.ok()) {
LOG(INFO) << "CSV file reading is OK";
std::shared_ptr<arrow::Table> &table = result.ValueOrDie();
if (table->column(0)->chunks().size() > 1) {
const auto &combine_res = table->CombineChunks(ToArrowPool(ctx));
Expand All @@ -289,6 +294,7 @@ Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::string &path
}
// slice the table if required
if (options.IsSlice() && ctx->GetWorldSize() > 1) {
LOG(INFO) << "Slice the table if required";
int32_t rows_per_worker = table->num_rows() / ctx->GetWorldSize();
int32_t remainder = table->num_rows() % ctx->GetWorldSize();

Expand Down Expand Up @@ -1467,8 +1473,11 @@ Status Equals(const std::shared_ptr<cylon::Table> &a, const std::shared_ptr<cylo
static Status RepartitionToMatchOtherTable(const std::shared_ptr<cylon::Table> &a,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these logs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will reflect the changes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove these logs

const std::shared_ptr<cylon::Table> &b,
std::shared_ptr<cylon::Table> *b_out) {
LOG(INFO) << "I am at the RepartitionToMatchOtherTable";
int64_t num_row = a->Rows();

LOG(INFO) << "Table-A size: " << num_row;
LOG(INFO) << "Table-B size: " << b->Rows();
if (num_row == 0) {
*b_out = a;
return Status::OK();
Expand All @@ -1478,8 +1487,11 @@ static Status RepartitionToMatchOtherTable(const std::shared_ptr<cylon::Table> &

std::vector<int64_t> rows_per_partition;
std::shared_ptr<cylon::Column> output;

LOG(INFO) << "Execute before allgather api in RepartitionToMatchOtherTable";
RETURN_CYLON_STATUS_IF_FAILED(
a->GetContext()->GetCommunicator()->Allgather(num_row_scalar, &output));
LOG(INFO) << "Execute after allgather api in RepartitionToMatchOtherTable";
auto *data_ptr =
std::static_pointer_cast<arrow::Int64Array>(output->data())->raw_values();

Expand All @@ -1496,6 +1508,8 @@ Status DistributedEquals(const std::shared_ptr<cylon::Table> &a,
bool subResult;
RETURN_CYLON_STATUS_IF_FAILED(VerifyTableSchema(a->get_table(), b->get_table()));

LOG(INFO) << "I am at the DistributedEquals";

if (!ordered) {
int col = a->Columns();
std::vector<int32_t> indices(col);
Expand Down Expand Up @@ -1551,6 +1565,7 @@ Status Repartition(const std::shared_ptr<cylon::Table> &table,

auto num_row_scalar = std::make_shared<Scalar>(arrow::MakeScalar(num_row));


RETURN_CYLON_STATUS_IF_FAILED(
table->GetContext()->GetCommunicator()->Allgather(num_row_scalar,
&sizes_cols));
Expand Down Expand Up @@ -1756,5 +1771,149 @@ Status WriteParquet(const std::shared_ptr<cylon::CylonContext> &ctx_,

return Status(Code::OK);
}

/**
* Local_Slice the part of table to create a single table
* @param table, offset and length
* @return new sliced table
*/


Status Local_Slice(const std::shared_ptr<Table> &in, int64_t offset, int64_t length,
std::shared_ptr<cylon::Table> &out) {
const auto &ctx = in->GetContext();
std::shared_ptr<arrow::Table> out_table, in_table = in->get_table();

if (!in->Empty()) {
out_table = in_table->Slice(offset, length);
} else {
out_table = in_table;
}
return Table::FromArrowTable(ctx, std::move(out_table), out);
}


/**
* DistributedSlice the part of table to create a single table
* @param table, offset and length
* @return new sliced table
*/


Status DistributedSlice(const std::shared_ptr<cylon::Table> &in, int64_t offset, int64_t length,
std::shared_ptr<cylon::Table> &out) {

const auto &ctx = in->GetContext();
std::shared_ptr<arrow::Table> out_table, in_table = in->get_table();
auto num_row = in->Rows();

std::vector<int64_t> sizes;
std::shared_ptr<cylon::Column> sizes_cols;
RETURN_CYLON_STATUS_IF_FAILED(Column::FromVector(sizes, sizes_cols));

auto num_row_scalar = std::make_shared<Scalar>(arrow::MakeScalar(num_row));


RETURN_CYLON_STATUS_IF_FAILED(ctx->GetCommunicator()->Allgather(num_row_scalar, &sizes_cols));

auto *data_ptr =
std::static_pointer_cast<arrow::Int64Array>(sizes_cols->data())
->raw_values();

int64_t L = length;
LOG(INFO) << "Total Length: " << L;
sizes.resize(sizes_cols->length());
std::copy(data_ptr, data_ptr + sizes_cols->length(), sizes.data());

//$x = max(zero_0, min(K-L_i, sl_i))
//$x+y = min(sl_i, max(K+L-L_i, zero_0))

int64_t K = offset;
int64_t zero_0 = 0;
int64_t rank = ctx->GetRank();
int64_t L_i = std::accumulate(data_ptr, data_ptr + rank, zero_0);

LOG(INFO) << "Current Partion: " << rank << " and Size: " << sizes[rank] << "L_i: " << L_i;

int64_t sl_i = *(data_ptr + rank);


int64_t x = std::max(zero_0, std::min(K - L_i, sl_i));
int64_t y = std::min(sl_i, std::max(K + L - L_i, zero_0));


out_table = in_table->Slice(x, y);

return Table::FromArrowTable(ctx, std::move(out_table), out);
}


/**
* Head the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/

Status Head(const std::shared_ptr<Table> &table, int64_t num_rows, std::shared_ptr<cylon::Table> &output) {

std::shared_ptr<arrow::Table> in_table = table->get_table();
const int64_t table_size = in_table->num_rows();

if(num_rows > 0 && table_size > 0) {
return Local_Slice(table, 0, num_rows, output);
}
else
LOG_AND_RETURN_ERROR(Code::ValueError, "Number of row should be greater than 0");

}

Status Distributed_Head(const std::shared_ptr<Table> &table, int64_t num_rows, std::shared_ptr<cylon::Table> &output) {

std::shared_ptr<arrow::Table> in_table = table->get_table();
const int64_t table_size = in_table->num_rows();

if(num_rows > 0 && table_size > 0) {
return DistributedSlice(table, 0, num_rows, output);
}
else
LOG_AND_RETURN_ERROR(Code::ValueError, "Number of row should be greater than 0");

}

/**
* Tail the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/

Status Tail(const std::shared_ptr<Table> &table, int64_t num_rows, std::shared_ptr<cylon::Table> &output) {

std::shared_ptr<arrow::Table> in_table = table->get_table();
const int64_t table_size = in_table->num_rows();

LOG(INFO) << "Input Table size " << table_size;

if(num_rows > 0 && table_size > 0) {
return Local_Slice(table, table_size-num_rows, num_rows, output);
}
else
LOG_AND_RETURN_ERROR(Code::ValueError, "Number of row should be greater than 0");

}

Status Distributed_Tail(const std::shared_ptr<Table> &table, int64_t num_rows, std::shared_ptr<cylon::Table> &output) {

std::shared_ptr<arrow::Table> in_table = table->get_table();
const int64_t table_size = in_table->num_rows();
LOG(INFO) << "Input Table size " << table_size;

if(num_rows > 0 && table_size > 0) {
return DistributedSlice(table, table_size-num_rows, num_rows, output);
}
else
LOG_AND_RETURN_ERROR(Code::ValueError, "Number of row should be greater than 0");

}

#endif
} // namespace cylon
42 changes: 42 additions & 0 deletions cpp/src/cylon/table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,48 @@ Status WriteParquet(const std::shared_ptr<cylon::CylonContext> &ctx,
std::shared_ptr<cylon::Table> &table,
const std::string &path,
const io::config::ParquetOptions &options = cylon::io::config::ParquetOptions());
/**
* Local_Slice the part of table to create a single table
* @param tables, offset, length
* @return new sliced table
*/
Status Local_Slice(const std::shared_ptr<Table> &in, int64_t offset, int64_t length,
std::shared_ptr<cylon::Table> &out);

/**
* DistributedSlice the part of table to create a single table
* @param table, offset and length
* @return new sliced table
*/


Status DistributedSlice(const std::shared_ptr<Table> &in, int64_t offset, int64_t length,
std::shared_ptr<cylon::Table> &out);


/**
* Head the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/

Status Head(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> &output);
Status Distributed_Head(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> &output);

/**
* Tail the part of table to create a single table with specific number of rows
* @param tables, number of rows
* @return new table
*/

Status Tail(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> &output);

Status Distributed_Tail(const std::shared_ptr<Table> &table, int64_t num_rows,
std::shared_ptr<cylon::Table> &output);

#endif // BUILD_CYLON_PARQUET

} // namespace cylon
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/cylon/thridparty/flat_hash_map/bytell_hash_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <type_traits>
#include <vector>
#include <array>
#include <stdexcept>
#include <limits>

#include <cylon/thridparty/flat_hash_map/flat_hash_map.hpp>

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ cylon_add_exe(multi_idx_join_example)
cylon_add_exe(parquet_union_example)
cylon_add_exe(parquet_join_example)
cylon_add_exe(dist_sort_example)
cylon_add_exe(slice_example)
cylon_add_exe(head_example)
cylon_add_exe(tail_example)

if (CYLON_UCX)
cylon_add_exe(ucx_join_example)
Expand Down
Loading