From 94dcf028c12a892b753946563edcc4589f69fe79 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 4 Sep 2024 13:24:49 -0700 Subject: [PATCH] Enable more test cases for serial execution mode in SharedArbitratorTest (#10916) Summary: As title, and with minor refactors. The following 3 cases will still be executed with parallel mode only because: 1. `asyncArbitratonFromNonDriverContext`: Plan doesn't support serial execution 2. `arbitrateMemoryFromOtherOperator`: Plan doesn't support serial execution 3. `concurrentArbitration`: Test run hangs `3` should be fixed in subsequent PR. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10916 Reviewed By: bikramSingh91 Differential Revision: D62169461 Pulled By: xiaoxmeng fbshipit-source-id: c506f9d73d8d663abfb2d79fb0bbbcc0eb5bd9b9 --- .../memory/tests/SharedArbitratorTest.cpp | 194 ++++++++++-------- velox/exec/tests/HashJoinTest.cpp | 18 +- velox/exec/tests/TableWriteTest.cpp | 12 +- velox/exec/tests/utils/ArbitratorTestUtil.cpp | 18 ++ velox/exec/tests/utils/ArbitratorTestUtil.h | 6 + 5 files changed, 160 insertions(+), 88 deletions(-) diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 3dd90d2b437b..512a731e7d0f 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -226,7 +226,19 @@ class FakeMemoryOperatorFactory : public Operator::PlanNodeTranslator { uint32_t maxDrivers_{1}; }; -class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase { +namespace { +std::unique_ptr newParallelExecutor() { + return std::make_unique(32); +} + +struct TestParam { + bool isSerialExecutionMode{false}; +}; +} // namespace + +class SharedArbitrationTest : public testing::WithParamInterface, + public exec::test::HiveConnectorTestBase { + public: protected: static void SetUpTestCase() { exec::test::HiveConnectorTestBase::SetUpTestCase(); @@ -253,6 +265,12 @@ class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase { fuzzerOpts_.allowLazyVector = false; vector_ = makeRowVector(rowType_, fuzzerOpts_); numAddedPools_ = 0; + isSerialExecutionMode_ = GetParam().isSerialExecutionMode; + if (isSerialExecutionMode_) { + executor_ = nullptr; + } else { + executor_ = newParallelExecutor(); + } } void TearDown() override { @@ -295,6 +313,18 @@ class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase { } } + AssertQueryBuilder newQueryBuilder() { + AssertQueryBuilder builder = AssertQueryBuilder(duckDbQueryRunner_); + builder.serialExecution(isSerialExecutionMode_); + return builder; + } + + AssertQueryBuilder newQueryBuilder(const core::PlanNodePtr& plan) { + AssertQueryBuilder builder = AssertQueryBuilder(plan); + builder.serialExecution(isSerialExecutionMode_); + return builder; + } + static inline FakeMemoryOperatorFactory* fakeOperatorFactory_; std::unique_ptr memoryManager_; SharedArbitrator* arbitrator_{nullptr}; @@ -302,51 +332,19 @@ class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase { VectorFuzzer::Options fuzzerOpts_; RowVectorPtr vector_; std::atomic_uint64_t numAddedPools_{0}; + bool isSerialExecutionMode_{false}; }; -namespace { -std::unique_ptr newParallelExecutor() { - return std::make_unique(32); -} - -struct TestParam { - bool isSerialExecutionMode{false}; -}; -} // namespace - /// A test fixture that runs cases within parallel execution mode. -class SharedArbitrationTest : public SharedArbitrationTestBase { - protected: - void SetUp() override { - SharedArbitrationTestBase::SetUp(); - executor_ = newParallelExecutor(); - } -}; +class SharedArbitrationTestWithParallelExecutionModeOnly + : public SharedArbitrationTest {}; /// A test fixture that runs cases within both serial and /// parallel execution modes. -class SharedArbitrationTestWithExecutionModes - : public testing::WithParamInterface, - public SharedArbitrationTestBase { - public: - static std::vector getTestParams() { - return std::vector({{false}, {true}}); - } - - protected: - void SetUp() override { - SharedArbitrationTestBase::SetUp(); - isSerialExecutionMode_ = GetParam().isSerialExecutionMode; - if (isSerialExecutionMode_) { - executor_ = nullptr; - } else { - executor_ = newParallelExecutor(); - } - } - - bool isSerialExecutionMode_{false}; -}; +class SharedArbitrationTestWithThreadingModes : public SharedArbitrationTest {}; -DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithThreadingModes, + queryArbitrationStateCheck) { const std::vector vectors = createVectors(rowType_, 32, 32 << 20); createDuckDbTable(vectors); @@ -364,7 +362,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); TestScopedSpillInjection scopedSpillInjection(100); core::PlanNodeId aggregationNodeId; - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(queryCtx) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") @@ -380,7 +378,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) { ASSERT_FALSE(queryCtx->testingUnderArbitration()); } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenAbortAndArbitrationLeave) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithThreadingModes, + raceBetweenAbortAndArbitrationLeave) { const std::vector vectors = createVectors(rowType_, 32, 32 << 20); setupMemory(kMemoryCapacity, /*memoryPoolInitCapacity=*/0); @@ -415,7 +415,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenAbortAndArbitrationLeave) { .capturePlanNodeId(aggregationNodeId) .planNode(); VELOX_ASSERT_THROW( - AssertQueryBuilder(plan) + newQueryBuilder(plan) .queryCtx(queryCtx) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") @@ -434,7 +434,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenAbortAndArbitrationLeave) { waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithThreadingModes, + skipNonReclaimableTaskTest) { const std::vector vectors = createVectors(rowType_, 32, 32 << 20); std::shared_ptr queryCtx = @@ -485,7 +487,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) { .planNode(); std::thread spillableThread([&]() { const auto spillDirectory = exec::test::TempDirectoryPath::create(); - AssertQueryBuilder(spillPlan) + newQueryBuilder(spillPlan) .queryCtx(queryCtx) .spillDirectory(spillDirectory->getPath()) .copyResults(pool()); @@ -501,7 +503,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) { false) .planNode(); std::thread nonSpillableThread([&]() { - AssertQueryBuilder(nonSpillPlan).queryCtx(queryCtx).copyResults(pool()); + newQueryBuilder(nonSpillPlan).queryCtx(queryCtx).copyResults(pool()); }); while (!blockedPartialAggregation || !blockedAggregation) { @@ -525,7 +527,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) { ASSERT_EQ(taskPausedCount, 1); } -DEBUG_ONLY_TEST_P(SharedArbitrationTestWithExecutionModes, reclaimToOrderBy) { +DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -588,7 +590,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithExecutionModes, reclaimToOrderBy) { std::thread orderByThread([&]() { core::PlanNodeId orderByNodeId; auto task = - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(orderByQueryCtx) .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() @@ -605,7 +607,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithExecutionModes, reclaimToOrderBy) { std::thread memThread([&]() { auto task = - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(fakeMemoryQueryCtx) .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() @@ -628,7 +630,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithExecutionModes, reclaimToOrderBy) { } DEBUG_ONLY_TEST_P( - SharedArbitrationTestWithExecutionModes, + SharedArbitrationTestWithThreadingModes, reclaimToAggregation) { const int numVectors = 32; std::vector vectors; @@ -692,7 +694,7 @@ DEBUG_ONLY_TEST_P( std::thread aggregationThread([&]() { core::PlanNodeId aggregationNodeId; auto task = - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(aggregationQueryCtx) .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() @@ -710,7 +712,7 @@ DEBUG_ONLY_TEST_P( std::thread memThread([&]() { auto task = - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(fakeMemoryQueryCtx) .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() @@ -733,7 +735,7 @@ DEBUG_ONLY_TEST_P( } DEBUG_ONLY_TEST_P( - SharedArbitrationTestWithExecutionModes, + SharedArbitrationTestWithThreadingModes, reclaimToJoinBuilder) { const int numVectors = 32; std::vector vectors; @@ -798,7 +800,7 @@ DEBUG_ONLY_TEST_P( auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId joinNodeId; auto task = - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(joinQueryCtx) .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder(planNodeIdGenerator) @@ -826,7 +828,7 @@ DEBUG_ONLY_TEST_P( std::thread memThread([&]() { auto task = - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(fakeMemoryQueryCtx) .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() @@ -848,7 +850,9 @@ DEBUG_ONLY_TEST_P( } } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithThreadingModes, + driverInitTriggeredArbitration) { const int numVectors = 2; std::vector vectors; const int vectorSize = 100; @@ -872,7 +876,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) { ASSERT_EQ(queryCtx->pool()->maxCapacity(), kMemoryCapacity); auto planNodeIdGenerator = std::make_shared(); - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .config(core::QueryConfig::kSpillEnabled, "false") .queryCtx(queryCtx) .plan(PlanBuilder(planNodeIdGenerator, pool()) @@ -885,8 +889,8 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) { waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F( - SharedArbitrationTest, +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithThreadingModes, DISABLED_raceBetweenTaskTerminateAndReclaim) { setupMemory(kMemoryCapacity, 0); const int numVectors = 10; @@ -949,7 +953,7 @@ DEBUG_ONLY_TEST_F( const auto spillDirectory = exec::test::TempDirectoryPath::create(); std::thread queryThread([&]() { VELOX_ASSERT_THROW( - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(queryCtx) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") @@ -982,7 +986,9 @@ DEBUG_ONLY_TEST_F( waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithParallelExecutionModeOnly, + asyncArbitratonFromNonDriverContext) { setupMemory(kMemoryCapacity, 0); const int numVectors = 10; std::vector vectors; @@ -1020,7 +1026,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); std::shared_ptr task; std::thread queryThread([&]() { - task = AssertQueryBuilder(duckDbQueryRunner_) + task = newQueryBuilder() .queryCtx(queryCtx) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") @@ -1058,7 +1064,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) { waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) { +DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, runtimeStats) { const uint64_t memoryCapacity = 128 * MB; setupMemory(memoryCapacity); fuzzerOpts_.vectorSize = 1000; @@ -1101,7 +1107,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) { .planNode(); { const std::shared_ptr task = - AssertQueryBuilder(duckDbQueryRunner_) + newQueryBuilder() .queryCtx(queryCtx) .maxDrivers(1) .spillDirectory(spillDirectory->getPath()) @@ -1136,7 +1142,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) { waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) { +DEBUG_ONLY_TEST_P( + SharedArbitrationTestWithParallelExecutionModeOnly, + arbitrateMemoryFromOtherOperator) { setupMemory(kMemoryCapacity, 0); const int numVectors = 10; std::vector vectors; @@ -1195,7 +1203,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) { core::PlanNodeId aggregationNodeId; std::thread queryThread([&]() { if (sameDriver) { - task = AssertQueryBuilder(duckDbQueryRunner_) + task = newQueryBuilder() .queryCtx(queryCtx) .plan(PlanBuilder() .values(vectors) @@ -1206,7 +1214,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) { .assertResults( "SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1"); } else { - task = AssertQueryBuilder(duckDbQueryRunner_) + task = newQueryBuilder() .queryCtx(queryCtx) .plan(PlanBuilder() .values(vectors) @@ -1233,7 +1241,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) { } } -TEST_F(SharedArbitrationTest, concurrentArbitration) { +TEST_P( + SharedArbitrationTestWithParallelExecutionModeOnly, + concurrentArbitration) { // Tries to replicate an actual workload by concurrently running multiple // query shapes that support spilling (and hence can be forced to abort or // spill by the arbitrator). Also adds an element of randomness by randomly @@ -1251,18 +1261,31 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } const int numDrivers = 4; - const auto expectedWriteResult = - runWriteTask( - vectors, nullptr, numDrivers, pool(), kHiveConnectorId, false) - .data; + const auto expectedWriteResult = runWriteTask( + vectors, + nullptr, + isSerialExecutionMode_, + numDrivers, + pool(), + kHiveConnectorId, + false) + .data; const auto expectedJoinResult = - runHashJoinTask(vectors, nullptr, numDrivers, pool(), false).data; + runHashJoinTask( + vectors, nullptr, isSerialExecutionMode_, numDrivers, pool(), false) + .data; const auto expectedOrderResult = - runOrderByTask(vectors, nullptr, numDrivers, pool(), false).data; + runOrderByTask( + vectors, nullptr, isSerialExecutionMode_, numDrivers, pool(), false) + .data; const auto expectedRowNumberResult = - runRowNumberTask(vectors, nullptr, numDrivers, pool(), false).data; + runRowNumberTask( + vectors, nullptr, isSerialExecutionMode_, numDrivers, pool(), false) + .data; const auto expectedTopNResult = - runTopNTask(vectors, nullptr, numDrivers, pool(), false).data; + runTopNTask( + vectors, nullptr, isSerialExecutionMode_, numDrivers, pool(), false) + .data; struct { uint64_t totalCapacity; @@ -1305,6 +1328,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { task = runWriteTask( vectors, queryCtx, + isSerialExecutionMode_, numDrivers, pool(), kHiveConnectorId, @@ -1315,6 +1339,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { task = runHashJoinTask( vectors, queryCtx, + isSerialExecutionMode_, numDrivers, pool(), true, @@ -1324,6 +1349,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { task = runOrderByTask( vectors, queryCtx, + isSerialExecutionMode_, numDrivers, pool(), true, @@ -1333,6 +1359,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { task = runRowNumberTask( vectors, queryCtx, + isSerialExecutionMode_, numDrivers, pool(), true, @@ -1342,6 +1369,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { task = runTopNTask( vectors, queryCtx, + isSerialExecutionMode_, numDrivers, pool(), true, @@ -1376,7 +1404,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { } } -TEST_F(SharedArbitrationTest, reserveReleaseCounters) { +TEST_P(SharedArbitrationTestWithThreadingModes, reserveReleaseCounters) { for (int i = 0; i < 37; ++i) { folly::Random::DefaultGenerator rng(i); auto numRootPools = folly::Random::rand32(rng) % 11 + 3; @@ -1407,10 +1435,14 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { } VELOX_INSTANTIATE_TEST_SUITE_P( - SharedArbitrationTestWithExecutionModes, - SharedArbitrationTestWithExecutionModes, - testing::ValuesIn( - SharedArbitrationTestWithExecutionModes::getTestParams())); + SharedArbitrationTest, + SharedArbitrationTestWithParallelExecutionModeOnly, + testing::ValuesIn(std::vector{{false}})); + +VELOX_INSTANTIATE_TEST_SUITE_P( + SharedArbitrationTest, + SharedArbitrationTestWithThreadingModes, + testing::ValuesIn(std::vector{{false}, {true}})); } // namespace facebook::velox::memory int main(int argc, char** argv) { diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index f9c2402b65ef..3d59824fd93d 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -6937,6 +6937,7 @@ TEST_F(HashJoinTest, reclaimFromJoinBuilderWithMultiDrivers) { vectors, newQueryCtx( memoryManagerWithoutArbitrator.get(), executor_.get(), 8L << 30), + false, numDrivers, pool(), false) @@ -6949,6 +6950,7 @@ TEST_F(HashJoinTest, reclaimFromJoinBuilderWithMultiDrivers) { vectors, newQueryCtx( memoryManagerWithArbitrator.get(), executor_.get(), 128 << 20), + false, numDrivers, pool(), true, @@ -6979,7 +6981,7 @@ DEBUG_ONLY_TEST_F( std::shared_ptr queryCtx = newQueryCtx(memory::memoryManager(), executor_.get(), 512 << 20); const auto expectedResult = - runHashJoinTask(vectors, queryCtx, numDrivers, pool(), false).data; + runHashJoinTask(vectors, queryCtx, false, numDrivers, pool(), false).data; std::atomic_bool nonReclaimableSectionWaitFlag{true}; std::atomic_bool reclaimerInitializationWaitFlag{true}; @@ -7028,7 +7030,7 @@ DEBUG_ONLY_TEST_F( std::thread joinThread([&]() { const auto result = runHashJoinTask( - vectors, queryCtx, numDrivers, pool(), true, expectedResult); + vectors, queryCtx, false, numDrivers, pool(), true, expectedResult); auto taskStats = exec::toPlanStats(result.task->taskStats()); auto& planStats = taskStats.at(result.planNodeId); ASSERT_EQ(planStats.spilledBytes, 0); @@ -7438,7 +7440,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, taskWaitTimeout) { createVectors(rowType, queryMemoryCapacity / 2, fuzzerOpts_); const int numDrivers = 4; const auto expectedResult = - runHashJoinTask(vectors, nullptr, numDrivers, pool(), false).data; + runHashJoinTask(vectors, nullptr, false, numDrivers, pool(), false).data; for (uint64_t timeoutMs : {0, 1'000, 30'000}) { SCOPED_TRACE(fmt::format("timeout {}", succinctMillis(timeoutMs))); @@ -7478,12 +7480,18 @@ DEBUG_ONLY_TEST_F(HashJoinTest, taskWaitTimeout) { if (timeoutMs == 1'000) { VELOX_ASSERT_THROW( runHashJoinTask( - vectors, queryCtx, numDrivers, pool(), true, expectedResult), + vectors, + queryCtx, + false, + numDrivers, + pool(), + true, + expectedResult), "Memory reclaim failed to wait"); } else { // We expect succeed on large time out or no timeout. const auto result = runHashJoinTask( - vectors, queryCtx, numDrivers, pool(), true, expectedResult); + vectors, queryCtx, false, numDrivers, pool(), true, expectedResult); auto taskStats = exec::toPlanStats(result.task->taskStats()); auto& planStats = taskStats.at(result.planNodeId); ASSERT_GT(planStats.spilledBytes, 0); diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index c54ebd3e443b..ca1a16f586ec 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -4282,7 +4282,8 @@ DEBUG_ONLY_TEST_F( std::vector vectors = createVectors(rowType_, memoryCapacity / 8, fuzzerOpts_); const auto expectedResult = - runWriteTask(vectors, nullptr, 1, pool(), kHiveConnectorId, false).data; + runWriteTask(vectors, nullptr, false, 1, pool(), kHiveConnectorId, false) + .data; auto queryCtx = newQueryCtx(memory::memoryManager(), executor_.get(), memoryCapacity); @@ -4307,7 +4308,14 @@ DEBUG_ONLY_TEST_F( std::thread queryThread([&]() { const auto result = runWriteTask( - vectors, queryCtx, 1, pool(), kHiveConnectorId, true, expectedResult); + vectors, + queryCtx, + false, + 1, + pool(), + kHiveConnectorId, + true, + expectedResult); }); writerCloseWait.await([&]() { return !writerCloseWaitFlag.load(); }); diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index a10d22661630..f971526db872 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -91,6 +91,7 @@ core::PlanNodePtr hashJoinPlan( QueryTestResult runHashJoinTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -100,6 +101,7 @@ QueryTestResult runHashJoinTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) @@ -109,6 +111,7 @@ QueryTestResult runHashJoinTask( .copyResults(pool, result.task); } else { result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task); @@ -133,6 +136,7 @@ core::PlanNodePtr aggregationPlan( QueryTestResult runAggregateTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, bool enableSpilling, uint32_t numDrivers, memory::MemoryPool* pool, @@ -143,6 +147,7 @@ QueryTestResult runAggregateTask( const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") @@ -151,6 +156,7 @@ QueryTestResult runAggregateTask( .copyResults(pool, result.task); } else { result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task); @@ -176,6 +182,7 @@ core::PlanNodePtr orderByPlan( QueryTestResult runOrderByTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -185,6 +192,7 @@ QueryTestResult runOrderByTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kOrderBySpillEnabled, "true") @@ -193,6 +201,7 @@ QueryTestResult runOrderByTask( .copyResults(pool, result.task); } else { result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task); @@ -218,6 +227,7 @@ core::PlanNodePtr rowNumberPlan( QueryTestResult runRowNumberTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -227,6 +237,7 @@ QueryTestResult runRowNumberTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kRowNumberSpillEnabled, "true") @@ -235,6 +246,7 @@ QueryTestResult runRowNumberTask( .copyResults(pool, result.task); } else { result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task); @@ -260,6 +272,7 @@ core::PlanNodePtr topNPlan( QueryTestResult runTopNTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -270,6 +283,7 @@ QueryTestResult runTopNTask( const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") @@ -278,6 +292,7 @@ QueryTestResult runTopNTask( .copyResults(pool, result.task); } else { result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task); @@ -305,6 +320,7 @@ core::PlanNodePtr writePlan( QueryTestResult runWriteTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, const std::string& kHiveConnectorId, @@ -317,6 +333,7 @@ QueryTestResult runWriteTask( const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "false") @@ -345,6 +362,7 @@ QueryTestResult runWriteTask( .copyResults(pool, result.task); } else { result.data = AssertQueryBuilder(plan) + .serialExecution(serialExecution) .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task); diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index e2987d737cf5..9aa50153b17d 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -114,6 +114,7 @@ core::PlanNodePtr hashJoinPlan( QueryTestResult runHashJoinTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -126,6 +127,7 @@ core::PlanNodePtr aggregationPlan( QueryTestResult runAggregateTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, bool enableSpilling, uint32_t numDrivers, memory::MemoryPool* pool, @@ -138,6 +140,7 @@ core::PlanNodePtr orderByPlan( QueryTestResult runOrderByTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -150,6 +153,7 @@ core::PlanNodePtr rowNumberPlan( QueryTestResult runRowNumberTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -162,6 +166,7 @@ core::PlanNodePtr topNPlan( QueryTestResult runTopNTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, bool enableSpilling, @@ -175,6 +180,7 @@ core::PlanNodePtr writePlan( QueryTestResult runWriteTask( const std::vector& vectors, const std::shared_ptr& queryCtx, + bool serialExecution, uint32_t numDrivers, memory::MemoryPool* pool, const std::string& kHiveConnectorId,