From 049dc1c1b7b62c4bba333c0e994ff12cf20aee71 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 29 Dec 2021 22:49:43 -0500 Subject: [PATCH 01/10] enabling test --- python/pycylon/test/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pycylon/test/test_parquet.py b/python/pycylon/test/test_parquet.py index b7fb5f5dc..9eaa8dcf6 100644 --- a/python/pycylon/test/test_parquet.py +++ b/python/pycylon/test/test_parquet.py @@ -36,5 +36,5 @@ def test_parquet_join(): out = cdf1.merge(cdf2, how='inner', on=[0], algorithm='sort', suffixes=('lt-', 'rt-')) - # assert(expected.equals(out, ordered=False)) # should pass + assert(expected.equals(out, ordered=False)) # should pass assert (len(expected.to_table().subtract(out.to_table())) == 0) From 03b3ea2050ad523e0ec993e17b3d2204f1261978 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Fri, 31 Dec 2021 12:03:01 -0500 Subject: [PATCH 02/10] extending equal testcase to distributed --- python/pycylon/test/test_equal.py | 48 ++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/python/pycylon/test/test_equal.py b/python/pycylon/test/test_equal.py index 0eebfdb9d..fc86e4d2a 100644 --- a/python/pycylon/test/test_equal.py +++ b/python/pycylon/test/test_equal.py @@ -14,34 +14,40 @@ """ Run test: ->> pytest -q python/pycylon/test/test_equal.py +>> mpirun -n 4 python -m pytest --with-mpi -q python/pycylon/test/test_equal.py """ +import pytest + from copy import deepcopy -from pycylon import DataFrame, read_csv +from pycylon import DataFrame, CylonEnv import random +from pycylon.net import MPIConfig + -def test_ordered_eq(): +def test_ordered_eq(env=None): df1 = DataFrame([random.sample(range(10, 100), 50), - random.sample(range(10, 100), 50)]) - assert df1.equals(df1) + random.sample(range(10, 100), 50)]) + assert df1.equals(df1, env=env) -def test_ordered_neq(): + +def test_ordered_neq(env=None): arr1 = [random.sample(range(10, 100), 50), random.sample(range(10, 100), 50)] arr2 = deepcopy(arr1) arr2[0][0] += 1 df1 = DataFrame(arr1) df2 = DataFrame(arr2) - assert not df1.equals(df2) + assert not df1.equals(df2, env=env) arr3 = deepcopy(arr1) arr3[0] = arr3[0][::-1] arr3[1] = arr3[1][::-1] df3 = DataFrame(arr3) - assert not df1.equals(df3) + assert not df1.equals(df3, env=env) + -def test_unordered_eq(): +def test_unordered_eq(env=None): arr1 = [random.sample(range(10, 100), 50), random.sample(range(10, 100), 50)] arr2 = deepcopy(arr1) @@ -49,25 +55,35 @@ def test_unordered_eq(): arr2[1] = arr2[1][::-1] df1 = DataFrame(arr1) df2 = DataFrame(arr2) - assert df1.equals(df2, False) + assert df1.equals(df2, False, env=env) arr3 = deepcopy(arr1) arr3[0] = arr3[0][::-1] arr3[1] = arr3[1][::-1] df3 = DataFrame(arr3) - assert df1.equals(df3, False) - assert df2.equals(df3, False) + assert df1.equals(df3, False, env=env) + assert df2.equals(df3, False, env=env) -def test_unordered_neq(): + +def test_unordered_neq(env=None): arr1 = [random.sample(range(10, 100), 50), - random.sample(range(10, 100), 50)] + random.sample(range(10, 100), 50)] arr2 = deepcopy(arr1) arr2[0] = arr2[0][::-1] df1 = DataFrame(arr1) df2 = DataFrame(arr2) - assert not df1.equals(df2, False) + assert not df1.equals(df2, False, env=env) arr3 = deepcopy(arr1) arr3 = arr3[::-1] df3 = DataFrame(arr3) - assert not df1.equals(df3, False) + assert not df1.equals(df3, False, env=env) + + +@pytest.mark.mpi +def test_distributed(): + env = CylonEnv(config=MPIConfig(), distributed=True) + test_ordered_eq(env) + test_ordered_neq(env) + test_unordered_eq(env) + test_ordered_neq(env) From 17a0e81ce1d366660e3c9a77a30ac50d448c8635 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Fri, 31 Dec 2021 12:11:26 -0500 Subject: [PATCH 03/10] addig equal test to test_all --- python/pycylon/test/test_all.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index 671e04d93..1d3760a3e 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -246,6 +246,12 @@ def test_repartition(): assert responses[-1] == 0 +def test_equals(): + print("32. Equals") + responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " + "python/pycylon/test/test_equal.py")) + + def test_parquet(): print("32. DataFrame Test") responses.append(os.system("pytest -q python/pycylon/test/test_parquet.py")) From 69e8e67e504628a1e68d035c514e0c68a34834c6 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Fri, 31 Dec 2021 12:40:37 -0500 Subject: [PATCH 04/10] minor miss --- python/pycylon/test/test_all.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index 1d3760a3e..e3fe69020 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -250,6 +250,7 @@ def test_equals(): print("32. Equals") responses.append(os.system(get_mpi_command() + " -n 4 python -m pytest --with-mpi " "python/pycylon/test/test_equal.py")) + assert responses[-1] == 0 def test_parquet(): From afacab5f41f135d841783409fa5248bff884f1d8 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Fri, 31 Dec 2021 13:16:39 -0500 Subject: [PATCH 05/10] fix equality bug --- build.py | 2 +- build.sh | 4 +- python/pycylon/pycylon/data/table.pxd | 62 ++++++++++++++------------- python/pycylon/pycylon/data/table.pyx | 18 ++++---- python/pycylon/pycylon/frame.py | 2 + python/pycylon/test/test_all.py | 2 +- 6 files changed, 49 insertions(+), 41 deletions(-) diff --git a/build.py b/build.py index 95c491d27..9bce75b9b 100644 --- a/build.py +++ b/build.py @@ -193,7 +193,7 @@ def python_test(): else: # Windows env['PATH'] = str(Path(INSTALL_DIR, "Library")) + os.pathsep + env['PATH'] - test_command = f"{PYTHON_EXEC} -m pytest python/pycylon/test/test_all.py" + test_command = f"{PYTHON_EXEC} -m pytest -v python/pycylon/test/test_all.py" res = subprocess.run(test_command, env=env, shell=True) check_status(res.returncode, "Python test suite") diff --git a/build.sh b/build.sh index 79fe948b4..db0648e90 100755 --- a/build.sh +++ b/build.sh @@ -483,11 +483,11 @@ python_test(){ ARROW_LIB=$(python3 -c 'import pyarrow as pa; import os; print(os.path.dirname(pa.__file__))') || exit 1 LD_LIBRARY_PATH="${ARROW_LIB}:${LD_LIBRARY_PATH}" || exit 1 export_library_path ${LD_LIBRARY_PATH} - python3 -m pytest python/pycylon/test/test_all.py || exit 1 + python3 -m pytest -v python/pycylon/test/test_all.py || exit 1 } pygcylon_test(){ - python3 -m pytest python/pygcylon/test/test_all.py || exit 1 + python3 -m pytest -v python/pygcylon/test/test_all.py || exit 1 } build_java(){ diff --git a/python/pycylon/pycylon/data/table.pxd b/python/pycylon/pycylon/data/table.pxd index 611f4faa2..644d934bd 100644 --- a/python/pycylon/pycylon/data/table.pxd +++ b/python/pycylon/pycylon/data/table.pxd @@ -13,7 +13,7 @@ ## from libcpp.string cimport string -from libcpp cimport bool +from libcpp cimport bool as cpp_bool from pycylon.common.status cimport CStatus from pycylon.common.status import Status import uuid @@ -55,15 +55,15 @@ cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": vector[string] ColumnNames() - void retainMemory(bool retain) + void retainMemory(cpp_bool retain) - bool IsRetain() const + cpp_bool IsRetain() const - CStatus SetArrowIndex(shared_ptr[CBaseArrowIndex] & index, bool drop) + CStatus SetArrowIndex(shared_ptr[CBaseArrowIndex] & index, cpp_bool drop) shared_ptr[CBaseArrowIndex] GetArrowIndex() - CStatus ResetArrowIndex(bool drop) + CStatus ResetArrowIndex(cpp_bool drop) CStatus AddColumn(int position, string column_name, shared_ptr[CArrowArray] input_column) @@ -73,10 +73,10 @@ cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": const CCSVWriteOptions & options) CStatus Sort(shared_ptr[CTable] & table, const vector[int] sort_columns, - shared_ptr[CTable] & output, const vector[bool] & sort_direction) + shared_ptr[CTable] & output, const vector[cpp_bool] & sort_direction) - CStatus Project(shared_ptr[CTable] & table, const vector[int] & project_columns, shared_ptr[ - CTable] & output) + CStatus Project(shared_ptr[CTable] & table, const vector[int] & project_columns, + shared_ptr[ CTable] & output) CStatus Merge(vector[shared_ptr[CTable]] & tables, shared_ptr[CTable] output) @@ -86,47 +86,51 @@ cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": CStatus DistributedJoin(shared_ptr[CTable] & left, shared_ptr[CTable] & right, const CJoinConfig & join_config, shared_ptr[CTable] & output); - CStatus Union(shared_ptr[CTable] & first, shared_ptr[CTable] & second, shared_ptr[CTable] - & output) + CStatus Union(shared_ptr[CTable] & first, shared_ptr[CTable] & second, + shared_ptr[CTable] & output) - CStatus DistributedUnion(shared_ptr[CTable] & first, shared_ptr[CTable] & second, shared_ptr[ - CTable] - & output) + CStatus DistributedUnion(shared_ptr[CTable] & first, shared_ptr[CTable] & second, + shared_ptr[CTable] & output) - CStatus Subtract(shared_ptr[CTable] & first, shared_ptr[CTable] & second, shared_ptr[CTable] - & output) + CStatus Subtract(shared_ptr[CTable] & first, shared_ptr[CTable] & second, + shared_ptr[CTable] & output) CStatus DistributedSubtract(shared_ptr[CTable] & first, shared_ptr[CTable] & second, shared_ptr[CTable] & output) - CStatus Intersect(shared_ptr[CTable] & first, shared_ptr[CTable] & second, shared_ptr[CTable] - & output) + CStatus Intersect(shared_ptr[CTable] & first, shared_ptr[CTable] & second, + shared_ptr[CTable] & output) CStatus DistributedIntersect(shared_ptr[CTable] & first, shared_ptr[CTable] & second, shared_ptr[CTable] & output) CStatus DistributedSort(shared_ptr[CTable] & table, const vector[int] sort_columns, - shared_ptr[CTable] & output, const vector[bool] & sort_direction, + shared_ptr[CTable] & output, const vector[cpp_bool] & sort_direction, CSortOptions sort_options) - CStatus Shuffle(shared_ptr[CTable] & table, const vector[int] & hash_columns, shared_ptr[CTable] - & output) + CStatus Shuffle(shared_ptr[CTable] & table, const vector[int] & hash_columns, + shared_ptr[CTable] & output) - CStatus Unique(shared_ptr[CTable] & input_table, const vector[int] & columns, shared_ptr[CTable] - & output, bool first) + CStatus Unique(shared_ptr[CTable] & input_table, const vector[int] & columns, + shared_ptr[CTable]& output, cpp_bool first) CStatus DistributedUnique(shared_ptr[CTable] & input_table, const vector[int] & columns, shared_ptr[CTable]& output) - CStatus Equals(shared_ptr[CTable] & a, shared_ptr[CTable] & b, bool& result, bool ordered) - - CStatus DistributedEquals(shared_ptr[CTable] & a, shared_ptr[CTable] & b, bool& result, bool ordered) + CStatus Equals(shared_ptr[CTable] & a, shared_ptr[CTable] & b, cpp_bool& result, + cpp_bool ordered) - CStatus Repartition(const shared_ptr[CTable] & table, const vector[int64_t] & rows_per_partition, const vector[int] & receive_build_rank_order, shared_ptr[CTable]* output) - - CStatus Repartition(const shared_ptr[CTable] & table, const vector[int64_t] & rows_per_partition, shared_ptr[CTable]* output) + CStatus DistributedEquals(shared_ptr[CTable] & a, shared_ptr[CTable] & b, cpp_bool& result, + cpp_bool ordered) - CStatus Repartition(const shared_ptr[CTable] & table, shared_ptr[CTable]* output) + CStatus Repartition(const shared_ptr[CTable] & table, + const vector[int64_t] & rows_per_partition, + const vector[int] & receive_build_rank_order, shared_ptr[CTable] * output) + + CStatus Repartition(const shared_ptr[CTable] & table, + const vector[int64_t] & rows_per_partition, shared_ptr[CTable] * output) + + CStatus Repartition(const shared_ptr[CTable] & table, shared_ptr[CTable] * output) cdef extern from "../../../../cpp/src/cylon/table.hpp" namespace "cylon": cdef cppclass CSortOptions "cylon::SortOptions": diff --git a/python/pycylon/pycylon/data/table.pyx b/python/pycylon/pycylon/data/table.pyx index 742557d46..356baf83e 100644 --- a/python/pycylon/pycylon/data/table.pyx +++ b/python/pycylon/pycylon/data/table.pyx @@ -13,6 +13,7 @@ ## from libcpp.string cimport string +from libcpp cimport bool as cpp_bool from libc.stdint cimport int64_t from libcpp.vector cimport vector from libcpp.pair cimport pair @@ -133,7 +134,7 @@ cdef class Table: cdef shared_ptr[CTable] output cdef vector[int] sort_index - cdef vector[bool] order_directions + cdef vector[cpp_bool] order_directions if isinstance(order_by, str): sort_index.push_back(self._resolve_column_index_from_column_name(order_by)) @@ -472,7 +473,7 @@ cdef class Table: cdef shared_ptr[CTable] output cdef CSortOptions *csort_options cdef vector[int] sort_index - cdef vector[bool] order_directions + cdef vector[cpp_bool] order_directions if isinstance(order_by, str): sort_index.push_back(self._resolve_column_index_from_column_name(order_by)) @@ -707,7 +708,7 @@ cdef class Table: cdef shared_ptr[CArrowTable] aoutput cdef shared_ptr[CTable] output cdef vector[int] c_cols - cdef bool c_first = False + cdef cpp_bool c_first = False if keep == 'first': c_first = True if columns: @@ -807,7 +808,7 @@ cdef class Table: True ''' cdef CStatus status - cdef bool output + cdef cpp_bool output status = Equals(self.table_shd_ptr, table.table_shd_ptr, output, ordered) if status.is_ok(): return output @@ -845,8 +846,9 @@ cdef class Table: True ''' cdef CStatus status - cdef bool output - status = DistributedEquals(self.table_shd_ptr, table.table_shd_ptr, output, ordered) + cdef cpp_bool output + cdef cpp_bool ordered_ = ordered + status = DistributedEquals(self.table_shd_ptr, table.table_shd_ptr, output, ordered_) if status.is_ok(): return output else: @@ -2273,7 +2275,7 @@ cdef class Table: # cdef bool c_drop_index = drop_index # self.table_shd_ptr.get().ResetIndex(c_drop_index) - def reset_index(self, drop_index: bool = False) -> Table: + def reset_index(self, drop_index: bool = False): """ reset_index Here the existing index can be removed and set back to table. @@ -2301,7 +2303,7 @@ cdef class Table: 2 3 7 11 3 4 8 12 """ - cdef bool c_drop_index = drop_index + cdef cpp_bool c_drop_index = drop_index self.table_shd_ptr.get().ResetArrowIndex(c_drop_index) def dropna(self, axis=0, how='any', inplace=False): diff --git a/python/pycylon/pycylon/frame.py b/python/pycylon/pycylon/frame.py index 1430dd818..3abb3eb5b 100644 --- a/python/pycylon/pycylon/frame.py +++ b/python/pycylon/pycylon/frame.py @@ -398,6 +398,8 @@ def equals(self, df: DataFrame, ordered=True, env: CylonEnv = None): if env is None: return self._table.equals(df._table, ordered) else: + self._change_context(env) + df._change_context(env) return self._table.distributed_equals(df._table, ordered) def repartition(self, rows_per_partition, receive_build_rank_order=None, env: CylonEnv=None): diff --git a/python/pycylon/test/test_all.py b/python/pycylon/test/test_all.py index e3fe69020..b0e77e783 100644 --- a/python/pycylon/test/test_all.py +++ b/python/pycylon/test/test_all.py @@ -254,7 +254,7 @@ def test_equals(): def test_parquet(): - print("32. DataFrame Test") + print("33. DataFrame Test") responses.append(os.system("pytest -q python/pycylon/test/test_parquet.py")) assert responses[-1] == 0 From 87a18b2f2e3b2dee4b5e645e59097f1f1c270cb5 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Sun, 2 Jan 2022 16:20:08 -0500 Subject: [PATCH 06/10] using unordered equal operation for VERIFY_TABLE_UNORDERED --- cpp/test/join_test.cpp | 33 +++++++++++++++++++++---------- cpp/test/parquet_join_test.cpp | 9 +++------ cpp/test/test_macros.hpp | 15 +++----------- data/output/indexing_loc_hl_1.csv | 2 +- data/output/indexing_loc_r_1.csv | 2 +- data/output/multi_join_1_0.csv | 2 +- data/output/multi_join_2_0.csv | 2 +- data/output/multi_join_2_1.csv | 2 +- data/output/multi_join_4_0.csv | 2 +- data/output/multi_join_4_1.csv | 2 +- data/output/multi_join_4_2.csv | 2 +- data/output/multi_join_4_3.csv | 2 +- data/output/subtract_1_0.csv | 2 +- 13 files changed, 39 insertions(+), 38 deletions(-) diff --git a/cpp/test/join_test.cpp b/cpp/test/join_test.cpp index 1c2bf0f6f..ebaeeff4c 100644 --- a/cpp/test/join_test.cpp +++ b/cpp/test/join_test.cpp @@ -26,12 +26,14 @@ TEST_CASE("Join testing", "[join]") { "../data/output/join_inner_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv"; SECTION("testing inner joins - sort") { - const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::SORT); + const auto + &join_config = join::config::JoinConfig::InnerJoin(0, 0, join::config::JoinAlgorithm::SORT); test::TestJoinOperation(join_config, ctx, path1, path2, out_path); } SECTION("testing inner joins - hash") { - const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::HASH); + const auto &join_config = + join::config::JoinConfig::InnerJoin(0, 0, join::config::JoinAlgorithm::HASH); test::TestJoinOperation(join_config, ctx, path1, path2, out_path); } } @@ -40,15 +42,21 @@ TEST_CASE("Join testing with null values in value columns", "[join]") { std::string path1 = "../data/input/csv_with_null1_" + std::to_string(RANK) + ".csv"; std::string path2 = "../data/input/csv_with_null2_" + std::to_string(RANK) + ".csv"; std::string out_path = - "../data/output/join_inner_null_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv"; + "../data/output/join_inner_null_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + + ".csv"; SECTION("testing inner joins - sort") { - const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::SORT); + const auto &join_config = join::config::JoinConfig::InnerJoin(0, + 0, + join::config::JoinAlgorithm::SORT, + "l_", + "r_"); test::TestJoinOperation(join_config, ctx, path1, path2, out_path); } SECTION("testing inner joins - hash") { - const auto &join_config = join::config::JoinConfig::InnerJoin(0, 0, cylon::join::config::JoinAlgorithm::HASH); + const auto &join_config = + join::config::JoinConfig::InnerJoin(0, 0, join::config::JoinAlgorithm::HASH, "l_", "r_"); test::TestJoinOperation(join_config, ctx, path1, path2, out_path); } } @@ -56,17 +64,20 @@ TEST_CASE("Join testing with null values in value columns", "[join]") { TEST_CASE("Multi Index Join testing", "[multi_join]") { std::string path1 = "../data/input/multi_join1.csv"; std::string path2 = "../data/input/multi_join2.csv"; - std::string out_path = "../data/output/multi_join_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv"; + std::string out_path = + "../data/output/multi_join_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".csv"; SECTION("testing inner joins - sort") { const auto &jc = - join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, cylon::join::config::JoinAlgorithm::SORT, "l_", "r_"); + join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::SORT, + "l_", "r_"); test::TestJoinOperation(jc, ctx, path1, path2, out_path); } SECTION("testing inner joins - hash") { const auto &jc = - join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, cylon::join::config::JoinAlgorithm::HASH, "l_", "r_"); + join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::HASH, + "l_", "r_"); test::TestJoinOperation(jc, ctx, path1, path2, out_path); } } @@ -98,12 +109,14 @@ TEST_CASE("Join testing chunks", "[join]") { } SECTION("testing inner joins - sort") { - const auto &jc = join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::SORT); + const auto &jc = + join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::SORT); CHECK_CYLON_STATUS(DistributedJoin(t1, t2, jc, out)); // just check if runs without a problem } SECTION("testing inner joins - hash") { - const auto &jc = join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::HASH); + const auto &jc = + join::config::JoinConfig::InnerJoin({0, 1}, {0, 1}, join::config::JoinAlgorithm::HASH); CHECK_CYLON_STATUS(DistributedJoin(t1, t2, jc, out)); // just check if runs without a problem } } diff --git a/cpp/test/parquet_join_test.cpp b/cpp/test/parquet_join_test.cpp index d1ae3a216..490c2284d 100644 --- a/cpp/test/parquet_join_test.cpp +++ b/cpp/test/parquet_join_test.cpp @@ -25,12 +25,9 @@ TEST_CASE("Parquet join testing", "[join]") { "../data/output/join_inner_" + std::to_string(WORLD_SZ) + "_" + std::to_string(RANK) + ".parquet"; SECTION("testing inner joins") { - auto join_config = cylon::join::config::JoinConfig(cylon::join::config::JoinType::INNER, - 0, - 0, - cylon::join::config::JoinAlgorithm::SORT, - "l_", - "r_"); + auto join_config = join::config::JoinConfig(join::config::JoinType::INNER, 0, 0, + join::config::JoinAlgorithm::SORT, + "lt-", "rt-"); TestParquetJoinOperation(join_config, ctx, path1, path2, out_path); } } diff --git a/cpp/test/test_macros.hpp b/cpp/test/test_macros.hpp index 587a1977d..8db260095 100644 --- a/cpp/test/test_macros.hpp +++ b/cpp/test/test_macros.hpp @@ -71,18 +71,9 @@ INFO("row count: " << _expected->Rows() << " vs " << _result->Rows()); \ REQUIRE(_expected->Rows() == _result->Rows()); \ \ - const auto& exp_schema = _expected->get_table()->schema(); \ - const auto& res_schema = _result->get_table()->schema(); \ - INFO("Schema: " << exp_schema->ToString() << "\nvs " << res_schema->ToString()); \ - CHECK_CYLON_STATUS(cylon::VerifyTableSchema(_expected->get_table(), _result->get_table()));\ - \ - CHECK_CYLON_STATUS(cylon::Subtract(_expected, _result, temp)); \ - INFO("subtract(expected, result) row count: " << temp->Rows()); \ - REQUIRE(temp->Rows() == 0); \ - \ - CHECK_CYLON_STATUS(cylon::Subtract(_result, _expected, temp)); \ - INFO("subtract(result, expected) row count: " << temp->Rows()); \ - REQUIRE(temp->Rows() == 0); \ + bool _eq_res = false; \ + CHECK_CYLON_STATUS(Equals(_expected, _result, _eq_res, /*ordered=*/false)); \ + REQUIRE(_eq_res); \ } while (0) #endif //CYLON_CPP_TEST_TEST_MACROS_HPP_ diff --git a/data/output/indexing_loc_hl_1.csv b/data/output/indexing_loc_hl_1.csv index 455dca4a5..b13ac9a86 100644 --- a/data/output/indexing_loc_hl_1.csv +++ b/data/output/indexing_loc_hl_1.csv @@ -1,4 +1,4 @@ -a +b 2 8 11 diff --git a/data/output/indexing_loc_r_1.csv b/data/output/indexing_loc_r_1.csv index c6ca8d9b7..cc1ff8cd3 100644 --- a/data/output/indexing_loc_r_1.csv +++ b/data/output/indexing_loc_r_1.csv @@ -1,4 +1,4 @@ -a +b 5 2 8 diff --git a/data/output/multi_join_1_0.csv b/data/output/multi_join_1_0.csv index 7a9be9ecc..a6fb89aaa 100644 --- a/data/output/multi_join_1_0.csv +++ b/data/output/multi_join_1_0.csv @@ -1,4 +1,4 @@ -l_a,l_b,l_c,r_a,r_b,r_d +l_a,l_b,l_c,r_a,r_b,r_c 1,3,2,1,3,2 8,11,8,8,11,7 1,2,3,1,2,5 diff --git a/data/output/multi_join_2_0.csv b/data/output/multi_join_2_0.csv index da6d5ca7b..00309d705 100644 --- a/data/output/multi_join_2_0.csv +++ b/data/output/multi_join_2_0.csv @@ -1,4 +1,4 @@ -l_a,l_b,l_c,r_a,r_b,r_d +l_a,l_b,l_c,r_a,r_b,r_c 1,3,2,1,3,2 1,3,2,1,3,2 5,9,7,5,9,9 diff --git a/data/output/multi_join_2_1.csv b/data/output/multi_join_2_1.csv index a62454010..d8bc2b601 100644 --- a/data/output/multi_join_2_1.csv +++ b/data/output/multi_join_2_1.csv @@ -1,4 +1,4 @@ -l_a,l_b,l_c,r_a,r_b,r_d +l_a,l_b,l_c,r_a,r_b,r_c 8,11,8,8,11,7 8,11,8,8,11,7 1,2,3,1,2,5 diff --git a/data/output/multi_join_4_0.csv b/data/output/multi_join_4_0.csv index 5ab989543..208ca8945 100644 --- a/data/output/multi_join_4_0.csv +++ b/data/output/multi_join_4_0.csv @@ -1,4 +1,4 @@ -l_a,l_b,l_c,r_a,r_b,r_d +l_a,l_b,l_c,r_a,r_b,r_c 5,9,7,5,9,9 5,9,7,5,9,9 5,9,7,5,9,9 diff --git a/data/output/multi_join_4_1.csv b/data/output/multi_join_4_1.csv index ca866eeb0..0fcaa6ddc 100644 --- a/data/output/multi_join_4_1.csv +++ b/data/output/multi_join_4_1.csv @@ -1,4 +1,4 @@ -l_a,l_b,l_c,r_a,r_b,r_d +l_a,l_b,l_c,r_a,r_b,r_c 1,2,3,1,2,5 1,2,3,1,2,5 1,2,3,1,2,5 diff --git a/data/output/multi_join_4_2.csv b/data/output/multi_join_4_2.csv index ff4b3e9fa..f3c905bc2 100644 --- a/data/output/multi_join_4_2.csv +++ b/data/output/multi_join_4_2.csv @@ -1,4 +1,4 @@ -l_a,l_b,l_c,r_a,r_b,r_d +l_a,l_b,l_c,r_a,r_b,r_c 1,3,2,1,3,2 1,3,2,1,3,2 1,3,2,1,3,2 diff --git a/data/output/multi_join_4_3.csv b/data/output/multi_join_4_3.csv index fa43efbfb..3371d38f0 100644 --- a/data/output/multi_join_4_3.csv +++ b/data/output/multi_join_4_3.csv @@ -1,4 +1,4 @@ -l_a,l_b,l_c,r_a,r_b,r_d +l_a,l_b,l_c,r_a,r_b,r_c 8,11,8,8,11,7 8,11,8,8,11,7 8,11,8,8,11,7 diff --git a/data/output/subtract_1_0.csv b/data/output/subtract_1_0.csv index 6b742dc25..bdb69c764 100644 --- a/data/output/subtract_1_0.csv +++ b/data/output/subtract_1_0.csv @@ -1,4 +1,4 @@ -pda0,1 +0,1 7,0.232000 45,0.734000 25,0.479000 From b07fad32f6e1d8d627762acc88c42aeb03a76d8f Mon Sep 17 00:00:00 2001 From: niranda perera Date: Sun, 2 Jan 2022 17:23:22 -0500 Subject: [PATCH 07/10] minor changes to python build --- build.py | 6 ++++++ cpp/CMakeLists.txt | 4 ++-- python/pycylon/pycylon/net/txrequest.pyx | 6 +++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/build.py b/build.py index 9bce75b9b..d2a800562 100644 --- a/build.py +++ b/build.py @@ -187,11 +187,14 @@ def python_test(): if OS_NAME == 'Linux': env['LD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \ + env['LD_LIBRARY_PATH'] + logger.info(f"LD_LIBRARY_PATH: {env['LD_LIBRARY_PATH']}") elif OS_NAME == 'Darwin': env['DYLD_LIBRARY_PATH'] = str(Path(INSTALL_DIR, "lib")) + os.pathsep \ + env['DYLD_LIBRARY_PATH'] + logger.info(f"DYLD_LIBRARY_PATH: {env['DYLD_LIBRARY_PATH']}") else: # Windows env['PATH'] = str(Path(INSTALL_DIR, "Library")) + os.pathsep + env['PATH'] + logger.info(f"PATH: {env['PATH']}") test_command = f"{PYTHON_EXEC} -m pytest -v python/pycylon/test/test_all.py" res = subprocess.run(test_command, env=env, shell=True) @@ -219,6 +222,9 @@ def build_python(): env["ARROW_PREFIX"] = str(Path(os.environ["CONDA_PREFIX"], "Library")) logger.info("Arrow prefix: " + str(Path(os.environ["CONDA_PREFIX"]))) + res = subprocess.run(f"{PYTHON_EXEC} -m pip uninstall -y pycylon", shell=True, env=env, + cwd=PYTHON_SOURCE_DIR) + check_status(res.returncode, "pip uninstall pycylon") res = subprocess.run(python_build_command, shell=True, env=env, cwd=PYTHON_SOURCE_DIR) check_status(res.returncode, "PyCylon build") diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a291a05f2..2ff39cf7c 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -22,9 +22,9 @@ # For more information about CMake, see http://www.cmake.org # ########################################################################## cmake_minimum_required(VERSION 3.17 FATAL_ERROR) -project(CYLON VERSION 0.4.0) +project(CYLON VERSION 0.5.0) -set(CYLON_VERSION 0.4.0) +set(CYLON_VERSION 0.5.0) ## defaults to release build if (NOT CMAKE_BUILD_TYPE) diff --git a/python/pycylon/pycylon/net/txrequest.pyx b/python/pycylon/pycylon/net/txrequest.pyx index ffba8d81d..65c4e7e74 100644 --- a/python/pycylon/pycylon/net/txrequest.pyx +++ b/python/pycylon/pycylon/net/txrequest.pyx @@ -13,7 +13,7 @@ ## import numpy as np -cimport numpy as np +cimport numpy as c_np from pycylon.net.txrequest cimport CTxRequest @@ -30,8 +30,8 @@ cdef class TxRequest: cdef public np_buf_val cdef public np_head_val - def __cinit__(self, int tgt, np.ndarray buf, int len, - np.ndarray[int, ndim=1, mode="c"] head, int hLength): + def __cinit__(self, int tgt, c_np.ndarray buf, int len, + c_np.ndarray[int, ndim=1, mode="c"] head, int hLength): ''' Initialized the PyCylon TxRequest :param tgt: passed as an int; the target of communication From eeb7374472362d7e03a04daea438cc724cbf802d Mon Sep 17 00:00:00 2001 From: niranda perera Date: Sun, 2 Jan 2022 18:39:31 -0500 Subject: [PATCH 08/10] fixing bug --- cpp/src/cylon/arrow/arrow_kernels.cpp | 4 +++ cpp/src/cylon/util/arrow_utils.cpp | 41 +++++++++++++++++---------- cpp/src/cylon/util/arrow_utils.hpp | 8 ++++-- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/cpp/src/cylon/arrow/arrow_kernels.cpp b/cpp/src/cylon/arrow/arrow_kernels.cpp index b122197f7..2a30a9405 100644 --- a/cpp/src/cylon/arrow/arrow_kernels.cpp +++ b/cpp/src/cylon/arrow/arrow_kernels.cpp @@ -534,6 +534,10 @@ arrow::Status SortIndicesMultiColumns(arrow::MemoryPool *memory_pool, return arrow::Status::Invalid("No of sort columns and no of sort direction indicators mismatch"); } + if (util::CheckArrowTableContainsChunks(table, columns)){ + return arrow::Status::Invalid("SortIndicesMultiColumns can not handle chunked columns"); + } + std::vector> comparators; comparators.reserve(columns.size()); for (size_t i = 0; i < columns.size(); i++) { diff --git a/cpp/src/cylon/util/arrow_utils.cpp b/cpp/src/cylon/util/arrow_utils.cpp index 79ede60d4..d6bc926a7 100644 --- a/cpp/src/cylon/util/arrow_utils.cpp +++ b/cpp/src/cylon/util/arrow_utils.cpp @@ -18,17 +18,18 @@ #include #include #include -#include +#include #include #include #include #include +#include namespace cylon { namespace util { -arrow::Status SortTable(const std::shared_ptr &table, int64_t sort_column_index, +arrow::Status SortTable(const std::shared_ptr &table, int32_t sort_column_index, arrow::MemoryPool *memory_pool, std::shared_ptr &sorted_table, bool ascending) { std::shared_ptr tab_to_process; // table referenced @@ -56,7 +57,7 @@ arrow::Status SortTable(const std::shared_ptr &table, int64_t sort // no bounds check is needed as indices are guaranteed to be within range const arrow::compute::TakeOptions &take_options = arrow::compute::TakeOptions::NoBoundsCheck(); - for (int64_t col_index = 0; col_index < tab_to_process->num_columns(); ++col_index) { + for (int32_t col_index = 0; col_index < tab_to_process->num_columns(); ++col_index) { const arrow::Result &res = arrow::compute::Take( cylon::util::GetChunkOrEmptyArray(tab_to_process->column(col_index), 0), sorted_column_index, take_options, &exec_context); @@ -73,38 +74,37 @@ arrow::Status SortTableMultiColumns(const std::shared_ptr &table, arrow::MemoryPool *memory_pool, std::shared_ptr &sorted_table, const std::vector &sort_column_directions) { - std::shared_ptr tab_to_process; // table referenced + std::shared_ptr combined_tab; // table referenced // combine chunks if multiple chunks are available - if (table->column(sort_column_indices.at(0))->num_chunks() > 1) { - const auto &res = table->CombineChunks(memory_pool); - RETURN_ARROW_STATUS_IF_FAILED(res.status()); - tab_to_process = res.ValueOrDie(); + if (util::CheckArrowTableContainsChunks(table, sort_column_indices)) { + ARROW_ASSIGN_OR_RAISE(combined_tab, table->CombineChunks(memory_pool)); } else { - tab_to_process = table; + combined_tab = table; } // sort to indices std::shared_ptr sorted_column_index; - RETURN_ARROW_STATUS_IF_FAILED(cylon::SortIndicesMultiColumns( - memory_pool, table, sort_column_indices, sorted_column_index, sort_column_directions)); + RETURN_ARROW_STATUS_IF_FAILED( + SortIndicesMultiColumns(memory_pool, combined_tab, sort_column_indices, sorted_column_index, + sort_column_directions)); // now sort everything based on sorted index arrow::ArrayVector sorted_columns; - sorted_columns.reserve(table->num_columns()); + sorted_columns.reserve(combined_tab->num_columns()); arrow::compute::ExecContext exec_context(memory_pool); // no bounds check is needed as indices are guaranteed to be within range const arrow::compute::TakeOptions &take_options = arrow::compute::TakeOptions::NoBoundsCheck(); - for (int col_index = 0; col_index < tab_to_process->num_columns(); ++col_index) { + for (int col_index = 0; col_index < combined_tab->num_columns(); ++col_index) { const arrow::Result &res = arrow::compute::Take( - cylon::util::GetChunkOrEmptyArray(tab_to_process->column(col_index), 0), + cylon::util::GetChunkOrEmptyArray(combined_tab->column(col_index), 0), sorted_column_index, take_options, &exec_context); RETURN_ARROW_STATUS_IF_FAILED(res.status()); sorted_columns.emplace_back(res.ValueOrDie().make_array()); } - sorted_table = arrow::Table::Make(table->schema(), sorted_columns); + sorted_table = arrow::Table::Make(combined_tab->schema(), sorted_columns); return arrow::Status::OK(); } @@ -376,5 +376,16 @@ arrow::Status MakeEmptyArrowTable(const std::shared_ptr &schema, return arrow::Status::OK(); } +bool CheckArrowTableContainsChunks(const std::shared_ptr &table, + const std::vector &columns) { + if (columns.empty()) { + return std::any_of(table->columns().begin(), table->columns().end(), + [](const auto &col) { return col->num_chunks() > 1; }); + } else { + return std::any_of(columns.begin(), columns.end(), + [&](const auto &i) { return table->column(i)->num_chunks() > 1; }); + } +} + } // namespace util } // namespace cylon diff --git a/cpp/src/cylon/util/arrow_utils.hpp b/cpp/src/cylon/util/arrow_utils.hpp index 777ab0b6a..7febfb3c5 100644 --- a/cpp/src/cylon/util/arrow_utils.hpp +++ b/cpp/src/cylon/util/arrow_utils.hpp @@ -60,7 +60,7 @@ static inline constexpr int64_t CheckBit(const int64_t v) { return (v >> (sizeof(int64_t) * CHAR_BIT - 1)) & int64_t(1); } -arrow::Status SortTable(const std::shared_ptr &table, int64_t sort_column_index, +arrow::Status SortTable(const std::shared_ptr &table, int32_t sort_column_index, arrow::MemoryPool *memory_pool, std::shared_ptr &sorted_table, bool ascending = true); @@ -150,9 +150,13 @@ arrow::Status CreateEmptyTable(const std::shared_ptr &schema, std::shared_ptr *output, arrow::MemoryPool *pool = arrow::default_memory_pool()); -arrow::Status MakeEmptyArrowTable(const std::shared_ptr& schema, std::shared_ptr* table, +arrow::Status MakeEmptyArrowTable(const std::shared_ptr &schema, + std::shared_ptr *table, arrow::MemoryPool *pool = arrow::default_memory_pool()); +bool CheckArrowTableContainsChunks(const std::shared_ptr &table, + const std::vector &columns = {}); + } // namespace util } // namespace cylon #endif // CYLON_SRC_UTIL_ARROW_UTILS_HPP_ From 78b89c9f1f7829a6162378f4263465e8e5d1f047 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Sun, 2 Jan 2022 18:58:01 -0500 Subject: [PATCH 09/10] minor --- build.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/build.py b/build.py index d2a800562..9cae4eda8 100644 --- a/build.py +++ b/build.py @@ -213,7 +213,7 @@ def build_python(): logger.error("The build should be in a conda environment") return - python_build_command = f'{PYTHON_EXEC} setup.py install' + python_build_command = f'{PYTHON_EXEC} -m pip install -U .' env = os.environ env["CYLON_PREFIX"] = str(BUILD_DIR) if os.name == 'posix': @@ -222,9 +222,6 @@ def build_python(): env["ARROW_PREFIX"] = str(Path(os.environ["CONDA_PREFIX"], "Library")) logger.info("Arrow prefix: " + str(Path(os.environ["CONDA_PREFIX"]))) - res = subprocess.run(f"{PYTHON_EXEC} -m pip uninstall -y pycylon", shell=True, env=env, - cwd=PYTHON_SOURCE_DIR) - check_status(res.returncode, "pip uninstall pycylon") res = subprocess.run(python_build_command, shell=True, env=env, cwd=PYTHON_SOURCE_DIR) check_status(res.returncode, "PyCylon build") From 8fddbb6caadb95049ad1f132cf9e7efbec7c738a Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 5 Jan 2022 10:07:44 -0500 Subject: [PATCH 10/10] Apply suggestions from code review Co-authored-by: Kaiying Shan --- cpp/src/cylon/util/arrow_utils.cpp | 2 +- python/pycylon/test/test_parquet.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/cylon/util/arrow_utils.cpp b/cpp/src/cylon/util/arrow_utils.cpp index d6bc926a7..eb429d12c 100644 --- a/cpp/src/cylon/util/arrow_utils.cpp +++ b/cpp/src/cylon/util/arrow_utils.cpp @@ -383,7 +383,7 @@ bool CheckArrowTableContainsChunks(const std::shared_ptr &table, [](const auto &col) { return col->num_chunks() > 1; }); } else { return std::any_of(columns.begin(), columns.end(), - [&](const auto &i) { return table->column(i)->num_chunks() > 1; }); + [&](int i) { return table->column(i)->num_chunks() > 1; }); } } diff --git a/python/pycylon/test/test_parquet.py b/python/pycylon/test/test_parquet.py index 9eaa8dcf6..4a2745a0f 100644 --- a/python/pycylon/test/test_parquet.py +++ b/python/pycylon/test/test_parquet.py @@ -36,5 +36,5 @@ def test_parquet_join(): out = cdf1.merge(cdf2, how='inner', on=[0], algorithm='sort', suffixes=('lt-', 'rt-')) - assert(expected.equals(out, ordered=False)) # should pass + assert(expected.equals(out, ordered=False)) assert (len(expected.to_table().subtract(out.to_table())) == 0)