Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing 554 #558

Merged
merged 10 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,16 @@ def python_test():
if OS_NAME == 'Linux':
env['LD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \
+ env['LD_LIBRARY_PATH']
logger.info(f"LD_LIBRARY_PATH: {env['LD_LIBRARY_PATH']}")
elif OS_NAME == 'Darwin':
env['DYLD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \
+ env['DYLD_LIBRARY_PATH']
logger.info(f"DYLD_LIBRARY_PATH: {env['DYLD_LIBRARY_PATH']}")
else: # Windows
env['PATH'] = str(Path(INSTALL_DIR, "Library")) + os.pathsep + env['PATH']
logger.info(f"PATH: {env['PATH']}")

test_command = f"{PYTHON_EXEC} -m pytest python/pycylon/test/test_all.py"
test_command = f"{PYTHON_EXEC} -m pytest -v python/pycylon/test/test_all.py"
res = subprocess.run(test_command, env=env, shell=True)
check_status(res.returncode, "Python test suite")

Expand All @@ -210,7 +213,7 @@ def build_python():
logger.error("The build should be in a conda environment")
return

python_build_command = f'{PYTHON_EXEC} setup.py install'
python_build_command = f'{PYTHON_EXEC} -m pip install -U .'
env = os.environ
env["CYLON_PREFIX"] = str(BUILD_DIR)
if os.name == 'posix':
Expand Down
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,11 @@ python_test(){
ARROW_LIB=$(python3 -c 'import pyarrow as pa; import os; print(os.path.dirname(pa.__file__))') || exit 1
LD_LIBRARY_PATH="${ARROW_LIB}:${LD_LIBRARY_PATH}" || exit 1
export_library_path ${LD_LIBRARY_PATH}
python3 -m pytest python/pycylon/test/test_all.py || exit 1
python3 -m pytest -v python/pycylon/test/test_all.py || exit 1
}

pygcylon_test(){
python3 -m pytest python/pygcylon/test/test_all.py || exit 1
python3 -m pytest -v python/pygcylon/test/test_all.py || exit 1
}

build_java(){
Expand Down
4 changes: 2 additions & 2 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
# For more information about CMake, see http://www.cmake.org #
##########################################################################
cmake_minimum_required(VERSION 3.17 FATAL_ERROR)
project(CYLON VERSION 0.4.0)
project(CYLON VERSION 0.5.0)

set(CYLON_VERSION 0.4.0)
set(CYLON_VERSION 0.5.0)

## defaults to release build
if (NOT CMAKE_BUILD_TYPE)
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/cylon/arrow/arrow_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ arrow::Status SortIndicesMultiColumns(arrow::MemoryPool *memory_pool,
return arrow::Status::Invalid("No of sort columns and no of sort direction indicators mismatch");
}

if (util::CheckArrowTableContainsChunks(table, columns)){
return arrow::Status::Invalid("SortIndicesMultiColumns can not handle chunked columns");
}

std::vector<std::shared_ptr<ArrayIndexComparator>> comparators;
comparators.reserve(columns.size());
for (size_t i = 0; i < columns.size(); i++) {
Expand Down
41 changes: 26 additions & 15 deletions cpp/src/cylon/util/arrow_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
#include <memory>
#include <random>
#include <vector>
#include <math.h>
#include <cmath>
#include <arrow/util/cpu_info.h>

#include <cylon/util/arrow_utils.hpp>
#include <cylon/arrow/arrow_kernels.hpp>
#include <cylon/util/macros.hpp>
#include <iostream>

namespace cylon {
namespace util {

arrow::Status SortTable(const std::shared_ptr<arrow::Table> &table, int64_t sort_column_index,
arrow::Status SortTable(const std::shared_ptr<arrow::Table> &table, int32_t sort_column_index,
arrow::MemoryPool *memory_pool, std::shared_ptr<arrow::Table> &sorted_table,
bool ascending) {
std::shared_ptr<arrow::Table> tab_to_process; // table referenced
Expand Down Expand Up @@ -56,7 +57,7 @@ arrow::Status SortTable(const std::shared_ptr<arrow::Table> &table, int64_t sort
// no bounds check is needed as indices are guaranteed to be within range
const arrow::compute::TakeOptions &take_options = arrow::compute::TakeOptions::NoBoundsCheck();

for (int64_t col_index = 0; col_index < tab_to_process->num_columns(); ++col_index) {
for (int32_t col_index = 0; col_index < tab_to_process->num_columns(); ++col_index) {
const arrow::Result<arrow::Datum> &res = arrow::compute::Take(
cylon::util::GetChunkOrEmptyArray(tab_to_process->column(col_index), 0),
sorted_column_index, take_options, &exec_context);
Expand All @@ -73,38 +74,37 @@ arrow::Status SortTableMultiColumns(const std::shared_ptr<arrow::Table> &table,
arrow::MemoryPool *memory_pool,
std::shared_ptr<arrow::Table> &sorted_table,
const std::vector<bool> &sort_column_directions) {
std::shared_ptr<arrow::Table> tab_to_process; // table referenced
std::shared_ptr<arrow::Table> combined_tab; // table referenced
// combine chunks if multiple chunks are available
if (table->column(sort_column_indices.at(0))->num_chunks() > 1) {
const auto &res = table->CombineChunks(memory_pool);
RETURN_ARROW_STATUS_IF_FAILED(res.status());
tab_to_process = res.ValueOrDie();
if (util::CheckArrowTableContainsChunks(table, sort_column_indices)) {
ARROW_ASSIGN_OR_RAISE(combined_tab, table->CombineChunks(memory_pool));
} else {
tab_to_process = table;
combined_tab = table;
}

// sort to indices
std::shared_ptr<arrow::UInt64Array> sorted_column_index;
RETURN_ARROW_STATUS_IF_FAILED(cylon::SortIndicesMultiColumns(
memory_pool, table, sort_column_indices, sorted_column_index, sort_column_directions));
RETURN_ARROW_STATUS_IF_FAILED(
SortIndicesMultiColumns(memory_pool, combined_tab, sort_column_indices, sorted_column_index,
sort_column_directions));

// now sort everything based on sorted index
arrow::ArrayVector sorted_columns;
sorted_columns.reserve(table->num_columns());
sorted_columns.reserve(combined_tab->num_columns());

arrow::compute::ExecContext exec_context(memory_pool);
// no bounds check is needed as indices are guaranteed to be within range
const arrow::compute::TakeOptions &take_options = arrow::compute::TakeOptions::NoBoundsCheck();

for (int col_index = 0; col_index < tab_to_process->num_columns(); ++col_index) {
for (int col_index = 0; col_index < combined_tab->num_columns(); ++col_index) {
const arrow::Result<arrow::Datum> &res = arrow::compute::Take(
cylon::util::GetChunkOrEmptyArray(tab_to_process->column(col_index), 0),
cylon::util::GetChunkOrEmptyArray(combined_tab->column(col_index), 0),
sorted_column_index, take_options, &exec_context);
RETURN_ARROW_STATUS_IF_FAILED(res.status());
sorted_columns.emplace_back(res.ValueOrDie().make_array());
}

sorted_table = arrow::Table::Make(table->schema(), sorted_columns);
sorted_table = arrow::Table::Make(combined_tab->schema(), sorted_columns);
return arrow::Status::OK();
}

Expand Down Expand Up @@ -376,5 +376,16 @@ arrow::Status MakeEmptyArrowTable(const std::shared_ptr<arrow::Schema> &schema,
return arrow::Status::OK();
}

bool CheckArrowTableContainsChunks(const std::shared_ptr<arrow::Table> &table,
const std::vector<int> &columns) {
if (columns.empty()) {
return std::any_of(table->columns().begin(), table->columns().end(),
[](const auto &col) { return col->num_chunks() > 1; });
} else {
return std::any_of(columns.begin(), columns.end(),
[&](const auto &i) { return table->column(i)->num_chunks() > 1; });
}
}

} // namespace util
} // namespace cylon
8 changes: 6 additions & 2 deletions cpp/src/cylon/util/arrow_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static inline constexpr int64_t CheckBit(const int64_t v) {
return (v >> (sizeof(int64_t) * CHAR_BIT - 1)) & int64_t(1);
}

arrow::Status SortTable(const std::shared_ptr<arrow::Table> &table, int64_t sort_column_index,
arrow::Status SortTable(const std::shared_ptr<arrow::Table> &table, int32_t sort_column_index,
arrow::MemoryPool *memory_pool, std::shared_ptr<arrow::Table> &sorted_table,
bool ascending = true);

Expand Down Expand Up @@ -150,9 +150,13 @@ arrow::Status CreateEmptyTable(const std::shared_ptr<arrow::Schema> &schema,
std::shared_ptr<arrow::Table> *output,
arrow::MemoryPool *pool = arrow::default_memory_pool());

arrow::Status MakeEmptyArrowTable(const std::shared_ptr<arrow::Schema>& schema, std::shared_ptr<arrow::Table>* table,
arrow::Status MakeEmptyArrowTable(const std::shared_ptr<arrow::Schema> &schema,
std::shared_ptr<arrow::Table> *table,
arrow::MemoryPool *pool = arrow::default_memory_pool());

bool CheckArrowTableContainsChunks(const std::shared_ptr<arrow::Table> &table,
const std::vector<int> &columns = {});

} // namespace util
} // namespace cylon
#endif // CYLON_SRC_UTIL_ARROW_UTILS_HPP_
33 changes: 23 additions & 10 deletions cpp/test/join_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ TEST_CASE("Join testing", "[join]") {
"../data/output/join_inner_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv";

SECTION("testing inner joins - sort") {
const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::SORT);
const auto
&join_config = join::config::JoinConfig::InnerJoin(0, 0, join::config::JoinAlgorithm::SORT);
test::TestJoinOperation(join_config, ctx, path1, path2, out_path);
}

SECTION("testing inner joins - hash") {
const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::HASH);
const auto &join_config =
join::config::JoinConfig::InnerJoin(0, 0, join::config::JoinAlgorithm::HASH);
test::TestJoinOperation(join_config, ctx, path1, path2, out_path);
}
}
Expand All @@ -40,33 +42,42 @@ TEST_CASE("Join testing with null values in value columns", "[join]") {
std::string path1 = "../data/input/csv_with_null1_" + std::to_string(RANK) + ".csv";
std::string path2 = "../data/input/csv_with_null2_" + std::to_string(RANK) + ".csv";
std::string out_path =
"../data/output/join_inner_null_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv";
"../data/output/join_inner_null_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK)
+ ".csv";

SECTION("testing inner joins - sort") {
const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::SORT);
const auto &join_config = join::config::JoinConfig::InnerJoin(0,
0,
join::config::JoinAlgorithm::SORT,
"l_",
"r_");
test::TestJoinOperation(join_config, ctx, path1, path2, out_path);
}

SECTION("testing inner joins - hash") {
const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::HASH);
const auto &join_config =
join::config::JoinConfig::InnerJoin(0, 0, join::config::JoinAlgorithm::HASH, "l_", "r_");
test::TestJoinOperation(join_config, ctx, path1, path2, out_path);
}
}

TEST_CASE("Multi Index Join testing", "[multi_join]") {
std::string path1 = "../data/input/multi_join1.csv";
std::string path2 = "../data/input/multi_join2.csv";
std::string out_path = "../data/output/multi_join_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv";
std::string out_path =
"../data/output/multi_join_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv";

SECTION("testing inner joins - sort") {
const auto &jc =
join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, cylon::join::config::JoinAlgorithm::SORT, "l_", "r_");
join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::SORT,
"l_", "r_");
test::TestJoinOperation(jc, ctx, path1, path2, out_path);
}

SECTION("testing inner joins - hash") {
const auto &jc =
join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, cylon::join::config::JoinAlgorithm::HASH, "l_", "r_");
join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::HASH,
"l_", "r_");
test::TestJoinOperation(jc, ctx, path1, path2, out_path);
}
}
Expand Down Expand Up @@ -98,12 +109,14 @@ TEST_CASE("Join testing chunks", "[join]") {
}

SECTION("testing inner joins - sort") {
const auto &jc = join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::SORT);
const auto &jc =
join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::SORT);
CHECK_CYLON_STATUS(DistributedJoin(t1, t2, jc, out)); // just check if runs without a problem
}

SECTION("testing inner joins - hash") {
const auto &jc = join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::HASH);
const auto &jc =
join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::HASH);
CHECK_CYLON_STATUS(DistributedJoin(t1, t2, jc, out)); // just check if runs without a problem
}
}
Expand Down
9 changes: 3 additions & 6 deletions cpp/test/parquet_join_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ TEST_CASE("Parquet join testing", "[join]") {
"../data/output/join_inner_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".parquet";

SECTION("testing inner joins") {
auto join_config = cylon::join::config::JoinConfig(cylon::join::config::JoinType::INNER,
0,
0,
cylon::join::config::JoinAlgorithm::SORT,
"l_",
"r_");
auto join_config = join::config::JoinConfig(join::config::JoinType::INNER, 0, 0,
join::config::JoinAlgorithm::SORT,
"lt-", "rt-");
TestParquetJoinOperation(join_config, ctx, path1, path2, out_path);
}
}
Expand Down
15 changes: 3 additions & 12 deletions cpp/test/test_macros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,9 @@
INFO("row count: " << _expected->Rows() << " vs " << _result->Rows()); \
REQUIRE(_expected->Rows() == _result->Rows()); \
\
const auto& exp_schema = _expected->get_table()->schema(); \
const auto& res_schema = _result->get_table()->schema(); \
INFO("Schema: " << exp_schema->ToString() << "\nvs " << res_schema->ToString()); \
CHECK_CYLON_STATUS(cylon::VerifyTableSchema(_expected->get_table(), _result->get_table()));\
\
CHECK_CYLON_STATUS(cylon::Subtract(_expected, _result, temp)); \
INFO("subtract(expected, result) row count: " << temp->Rows()); \
REQUIRE(temp->Rows() == 0); \
\
CHECK_CYLON_STATUS(cylon::Subtract(_result, _expected, temp)); \
INFO("subtract(result, expected) row count: " << temp->Rows()); \
REQUIRE(temp->Rows() == 0); \
bool _eq_res = false; \
CHECK_CYLON_STATUS(Equals(_expected, _result, _eq_res, /*ordered=*/false)); \
REQUIRE(_eq_res); \
} while (0)

#endif //CYLON_CPP_TEST_TEST_MACROS_HPP_
2 changes: 1 addition & 1 deletion data/output/indexing_loc_hl_1.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
a
b
2
8
11
Expand Down
2 changes: 1 addition & 1 deletion data/output/indexing_loc_r_1.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
a
b
5
2
8
Expand Down
2 changes: 1 addition & 1 deletion data/output/multi_join_1_0.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
l_a,l_b,l_c,r_a,r_b,r_d
l_a,l_b,l_c,r_a,r_b,r_c
1,3,2,1,3,2
8,11,8,8,11,7
1,2,3,1,2,5
Expand Down
2 changes: 1 addition & 1 deletion data/output/multi_join_2_0.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
l_a,l_b,l_c,r_a,r_b,r_d
l_a,l_b,l_c,r_a,r_b,r_c
1,3,2,1,3,2
1,3,2,1,3,2
5,9,7,5,9,9
Expand Down
2 changes: 1 addition & 1 deletion data/output/multi_join_2_1.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
l_a,l_b,l_c,r_a,r_b,r_d
l_a,l_b,l_c,r_a,r_b,r_c
8,11,8,8,11,7
8,11,8,8,11,7
1,2,3,1,2,5
Expand Down
2 changes: 1 addition & 1 deletion data/output/multi_join_4_0.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
l_a,l_b,l_c,r_a,r_b,r_d
l_a,l_b,l_c,r_a,r_b,r_c
5,9,7,5,9,9
5,9,7,5,9,9
5,9,7,5,9,9
Expand Down
2 changes: 1 addition & 1 deletion data/output/multi_join_4_1.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
l_a,l_b,l_c,r_a,r_b,r_d
l_a,l_b,l_c,r_a,r_b,r_c
1,2,3,1,2,5
1,2,3,1,2,5
1,2,3,1,2,5
Expand Down
2 changes: 1 addition & 1 deletion data/output/multi_join_4_2.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
l_a,l_b,l_c,r_a,r_b,r_d
l_a,l_b,l_c,r_a,r_b,r_c
1,3,2,1,3,2
1,3,2,1,3,2
1,3,2,1,3,2
Expand Down
2 changes: 1 addition & 1 deletion data/output/multi_join_4_3.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
l_a,l_b,l_c,r_a,r_b,r_d
l_a,l_b,l_c,r_a,r_b,r_c
8,11,8,8,11,7
8,11,8,8,11,7
8,11,8,8,11,7
Expand Down
2 changes: 1 addition & 1 deletion data/output/subtract_1_0.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pda0,1
0,1
7,0.232000
45,0.734000
25,0.479000
Expand Down
Loading