From 539a1afe2a22cc2bdcc6f59ce254d24fa05ebe61 Mon Sep 17 00:00:00 2001
From: tqchen
Date: Wed, 23 Sep 2015 12:23:45 -0700
Subject: [PATCH] [ENGINE] Remove additional state, more invariant checks.
 [BUGFIX] Conditional variable modification needs to be mutex locked

---
 src/common/object_pool.h      |  4 ++
 src/engine/threaded_engine.cc | 94 ++++++++++++++++++++++++-----------
 src/engine/threaded_engine.h  | 40 +++++++++++----
 3 files changed, 98 insertions(+), 40 deletions(-)

diff --git a/src/common/object_pool.h b/src/common/object_pool.h
index 2e38654f7a4c..43d309e24bf5 100644
--- a/src/common/object_pool.h
+++ b/src/common/object_pool.h
@@ -57,6 +57,7 @@ class ObjectPool {
    * Currently defined to be 4KB.
    */
   constexpr static std::size_t kPageSize = 1 << 12;
+  /*! \brief internal mutex */
   std::mutex m_;
   /*!
    * \brief Head of free list.
@@ -147,6 +148,9 @@ ObjectPool<T>::ObjectPool() {
 template <typename T>
 void ObjectPool<T>::AllocateChunk() {
   static_assert(sizeof(LinkedList) <= kPageSize, "Object too big.");
+  static_assert(sizeof(LinkedList) % alignof(LinkedList) == 0, "ObjectPool Invariant");
+  static_assert(alignof(LinkedList) % alignof(T) == 0, "ObjectPool Invariant");
+  static_assert(kPageSize % alignof(LinkedList) == 0, "ObjectPool Invariant");
   void* new_chunk_ptr;
   int ret = posix_memalign(&new_chunk_ptr, kPageSize, kPageSize);
   CHECK_EQ(ret, 0) << "Allocation failed";
diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc
index 7e40fa888556..35ec65a423c6 100644
--- a/src/engine/threaded_engine.cc
+++ b/src/engine/threaded_engine.cc
@@ -31,15 +31,19 @@ ThreadedVar::ThreadedVar(VersionedVarBlock* head) : head_{head} {
 
 void ThreadedVar::AppendReadDependency(OprBlock* opr_block) {
   std::lock_guard<std::mutex> lock{m_};
-  if (ready_to_read_) {
-    assert(pending_write_ == nullptr);
+  if (pending_write_ == nullptr) {
+    // invariant: is_ready_to_read()
+    CHECK_GE(num_pending_reads_, 0);
+    // STATE CHANGE
     ++num_pending_reads_;
-    --opr_block->wait;
+    // decrease wait counter
+    opr_block->decr_wait();
   } else {
     auto&& new_var_block = VersionedVarBlock::New();
     assert(head_->next == nullptr);
     assert(head_->trigger == nullptr);
     assert(head_->write == false);
+    // append things to next.
     head_->next = new_var_block;
     head_->trigger = opr_block;
     head_ = new_var_block;
   }
 }
 
 void ThreadedVar::AppendWriteDependency(OprBlock* opr_block) {
-  std::lock_guard<std::mutex> lock{m_};
   auto&& new_var_block = VersionedVarBlock::New();
+  std::lock_guard<std::mutex> lock{m_};
+  // invariant.
+  assert(head_->next == nullptr);
+  assert(head_->trigger == nullptr);
+  assert(head_->write == false);
+  // attach to head.
   head_->next = new_var_block;
   head_->trigger = opr_block;
   head_->write = true;
-  if (ready_to_read_) {
-    // Raise `num_pending_reads_` temporarily to avoid premature triggering.
-    ++num_pending_reads_;
+
+  // check if it is ready to write
+  if (pending_write_ == nullptr) {
+    // invariant: is_ready_to_read()
     pending_write_ = head_;
-    if (--num_pending_reads_ == 0) {
-      --opr_block->wait;
+    CHECK_GE(num_pending_reads_, 0);
+    if (num_pending_reads_ == 0) {
+      // STATE CHANGE
+      opr_block->decr_wait();
+      num_pending_reads_ = kWriteTriggered;
     }
-    ready_to_read_ = false;
+  } else {
+    CHECK_NE(num_pending_reads_, 0);
   }
   head_ = new_var_block;
 }
 
@@ -70,13 +84,17 @@ void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) {
   {
     // this is lock scope
     std::lock_guard<std::mutex> lock{m_};
+    CHECK_GT(num_pending_reads_, 0);
+
     if (--num_pending_reads_ == 0) {
-      if (pending_write_ != nullptr && --pending_write_->trigger->wait == 0) {
+      if (pending_write_ != nullptr) {
+        // STATE CHANGE
         trigger = pending_write_->trigger;
+        num_pending_reads_ = kWriteTriggered;
       }
     }
   }
-  if (trigger != nullptr) {
+  if (trigger != nullptr && trigger->decr_wait() == 0) {
     dispatcher(trigger);
   }
 }
 
@@ -88,12 +106,16 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
   OprBlock* trigger_write = nullptr;
   {
     std::lock_guard<std::mutex> lock{m_};
-    assert(ready_to_read_ == false);
+    // invariants
+    assert(head_->next == nullptr);
+    assert(pending_write_ != nullptr);
+    CHECK_EQ(num_pending_reads_, kWriteTriggered);
+    // really delete
     if (to_delete_) {
       VersionedVarBlock *head = pending_write_->next;
       VersionedVarBlock::Delete(pending_write_);
-      assert(head->next == nullptr);
+      assert(head_ == head);
       VersionedVarBlock::Delete(head);
       return true;
     }
@@ -101,20 +123,22 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
     old_pending_write = pending_write_;
     // search for chains to trigger
     end_of_read_chain = old_pending_write->next;
-    assert(num_pending_reads_ == 0);
-    while (end_of_read_chain->next != nullptr &&
+    // reset to 0 pending reads
+    num_pending_reads_ = 0;
+    while (end_of_read_chain != head_ &&
            end_of_read_chain->write == false) {
       ++num_pending_reads_;
       end_of_read_chain = end_of_read_chain->next;
     }
-    // check the states
-    if (end_of_read_chain->next == nullptr) {
-      ready_to_read_ = true;
+    if (end_of_read_chain == head_) {
       pending_write_ = nullptr;
     } else {
+      // check if there are pending reads; if not, trigger the write
      assert(end_of_read_chain->write == true);
      pending_write_ = end_of_read_chain;
-      if (num_pending_reads_ == 0 && --end_of_read_chain->trigger->wait == 0) {
+      if (num_pending_reads_ == 0) {
+        // mark the write as already activated in this var
+        num_pending_reads_ = kWriteTriggered;
         trigger_write = end_of_read_chain->trigger;
       }
     }
@@ -129,7 +153,7 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
   VersionedVarBlock::Delete(old_pending_write);
   // dispatch all the events
   while (cur_head != end_of_read_chain) {
-    if (--cur_head->trigger->wait == 0) {
+    if (cur_head->trigger->decr_wait() == 0) {
       dispatcher(cur_head->trigger);
     }
     auto prev = cur_head;
@@ -137,7 +161,7 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
     assert(cur_head != nullptr);
     VersionedVarBlock::Delete(prev);
   }
-  if (trigger_write != nullptr) {
+  if (trigger_write != nullptr && trigger_write->decr_wait() == 0) {
     dispatcher(trigger_write);
   }
   return false;
@@ -150,7 +174,7 @@ void ThreadedVar::SetToDelete() {
 
 bool ThreadedVar::ready_to_read() {
   std::lock_guard<std::mutex> lock{m_};
-  return ready_to_read_;
+  return this->is_ready_to_read();
 }
 
 // implementation of threaded engine
@@ -232,8 +256,10 @@ void ThreadedEngine::Push(OprHandle op, Context exec_ctx) {
   ThreadedOpr* threaded_opr = ThreadedOpr::CastFromBase(op);
   OprBlock* opr_block = OprBlock::New();
   opr_block->opr = threaded_opr;
-  opr_block->wait.store(threaded_opr->const_vars.size() +
-                        threaded_opr->mutable_vars.size() + 1);
+
+  opr_block->wait.store(static_cast<int>(
+      threaded_opr->const_vars.size() +
+      threaded_opr->mutable_vars.size() + 1));
   opr_block->ctx = exec_ctx;
   ++pending_;
   // Add read dependencies.
@@ -244,7 +270,7 @@ void ThreadedEngine::Push(OprHandle op, Context exec_ctx) {
   for (auto&& i : threaded_opr->mutable_vars) {
     i->AppendWriteDependency(opr_block);
   }
-  if (--opr_block->wait == 0) {
+  if (opr_block->decr_wait() == 0) {
     this->PushToExecute(opr_block, true);
   }
 }
@@ -275,7 +301,10 @@ void ThreadedEngine::WaitForVar(VarHandle var) {
   if (threaded_var->ready_to_read()) return;
   std::atomic<bool> done{false};
   this->PushSync([this, &done](RunContext) {
-      done.store(true);
+      {
+        std::unique_lock<std::mutex> lock{finished_m_};
+        done.store(true);
+      }
       finished_cv_.notify_all();
     }, Context::CPU(), {var}, {}, FnProperty::kNormal);
   {
@@ -310,12 +339,17 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) {
       ThreadedVar::Delete(i);
     }
   }
+  int npending;
   {
     std::unique_lock<std::mutex> lock{finished_m_};
-    if (--pending_ == 0) {
-      finished_cv_.notify_all();
-    }
+    npending = --pending_;
+  }
+  CHECK_GE(npending, 0);
+  if (npending == 0) {
+    // no need to grab the lock when notifying.
+    finished_cv_.notify_all();
   }
+
   // delete operator if it is temporary
   if (threaded_opr->temporary) {
     ThreadedOpr::Delete(threaded_opr);
diff --git a/src/engine/threaded_engine.h b/src/engine/threaded_engine.h
index ae793fe62f3e..5ca2a503563a 100644
--- a/src/engine/threaded_engine.h
+++ b/src/engine/threaded_engine.h
@@ -43,13 +43,23 @@ struct OprBlock : public common::ObjectPoolAllocatable<OprBlock> {
   /*!
    * \brief wait number of pending tasks this OprBlock is waiting for.
    */
-  std::atomic<std::size_t> wait{0};
+  std::atomic<int> wait{0};
   /*! \brief Pointer to information on performing real operation */
   ThreadedOpr* opr{nullptr};
   /*! \brief The context this operator */
   Context ctx;
   // define possible debug information
   DEFINE_ENGINE_DEBUG_INFO(OprBlock);
+  /*!
+   * \brief call this function to decrease the wait counter.
+   * \return the wait counter after the decrement.
+   */
+  inline int decr_wait() {
+    // check invariant, avoid over-trigger
+    int ret = --wait;
+    CHECK_GE(ret, 0);
+    return ret;
+  }
 };  // struct OprBlock
 
 /*!
@@ -141,8 +151,11 @@ class ThreadedVar final : public Var,
   // TODO(hotpxl) consider rename head
   /*! \brief internal mutex of the ThreadedVar */
   std::mutex m_;
-  /*! \brief number of pending reads operation in the variable. */
-  std::size_t num_pending_reads_{0};
+  /*!
+   * \brief number of pending read operations on the variable.
+   * will be marked as -1 when there is an already triggered pending write.
+   */
+  int num_pending_reads_{0};
   /*!
    * \brief Points to the last VersionedVarBlock in the queue.
    * head_ always points to an empty VersionedVarBlock.
@@ -158,14 +171,19 @@ class ThreadedVar final : public Var,
    * This is actually the head (oldest operation) in the queue.
    */
   VersionedVarBlock* pending_write_{nullptr};
-  /*!
-   * \brief If true, then there are no running or pending write on this variable.
-   */
-  bool ready_to_read_{true};
   /*!
    * \brief If true, delete after operation completes.
   */
   bool to_delete_{false};
+  /*! \brief special value of num_pending_reads_ to mark that the write has been triggered */
+  static constexpr int kWriteTriggered = -1;
+  /*!
+   * \brief derived invariant of ready to read, without lock.
+   * \return whether the current variable is ready to read.
+   */
+  inline bool is_ready_to_read() const {
+    return pending_write_ == nullptr;
+  }
 };  // struct ThreadedVar
 
 /*!
@@ -230,8 +248,10 @@ class ThreadedEngine : public Engine {
   ThreadedEngine() {}
   ~ThreadedEngine() {
-    // notify all pending waiters
-    kill_.store(true);
+    {
+      std::unique_lock<std::mutex> lock{finished_m_};
+      kill_.store(true);
+    }
     finished_cv_.notify_all();
   }
 
@@ -291,7 +311,7 @@ class ThreadedEngine : public Engine {
   /*!
    * \brief Number of pending operations.
   */
-  std::atomic<std::size_t> pending_{0};
+  std::atomic<int> pending_{0};
   /*! \brief whether we want to kill the waiters */
   std::atomic<bool> kill_{false};
   /*! \brief whether it is during shutdown phase*/
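
Note on the [BUGFIX] line in the subject: the waiters on finished_cv_ check a predicate
(done, pending_, kill_), so those values have to be written while finished_m_ is held;
otherwise notify_all() can fire between the waiter's predicate check and its wait(),
and the wakeup is lost. The following is a minimal, self-contained sketch of that rule
only, not code from this patch, and every name in it is illustrative:

// Minimal sketch (illustrative names): the flag the waiter's predicate reads
// must be written under the waiter's mutex, or the notification can be lost.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

int main() {
  std::mutex m;
  std::condition_variable cv;
  bool done = false;

  std::thread waiter([&] {
    std::unique_lock<std::mutex> lock{m};
    // wait() re-checks the predicate under the lock on every wakeup.
    cv.wait(lock, [&] { return done; });
    std::cout << "woken up" << std::endl;
  });

  {
    // Write the flag while holding the mutex, as the patch does for
    // `done`, `pending_`, and `kill_`.
    std::lock_guard<std::mutex> lock{m};
    done = true;
  }
  // notify_all() itself does not need to hold the lock.
  cv.notify_all();

  waiter.join();
  return 0;
}

notify_all() itself may be issued after the lock is released, which is why OnComplete
and the destructor in this patch notify outside the scoped lock block.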