From a80476179eb0f4c47837ca5630235bf5e995fbfb Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 6 May 2020 23:36:10 +0200 Subject: [PATCH 1/3] src: split out callback queue implementation from Environment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This isn’t conceptually tied to anything Node.js-specific at all. --- node.gyp | 2 + src/callback_queue-inl.h | 96 ++++++++++++++++++++++++++++++++++++++++ src/callback_queue.h | 64 +++++++++++++++++++++++++++ src/env-inl.h | 80 +++------------------------------ src/env.cc | 5 +-- src/env.h | 45 +------------------ 6 files changed, 172 insertions(+), 120 deletions(-) create mode 100644 src/callback_queue-inl.h create mode 100644 src/callback_queue.h diff --git a/node.gyp b/node.gyp index 3dadad15c9e193..961164ac320f07 100644 --- a/node.gyp +++ b/node.gyp @@ -642,6 +642,8 @@ 'src/base_object.h', 'src/base_object-inl.h', 'src/base64.h', + 'src/callback_queue.h', + 'src/callback_queue-inl.h', 'src/connect_wrap.h', 'src/connection_wrap.h', 'src/debug_utils.h', diff --git a/src/callback_queue-inl.h b/src/callback_queue-inl.h new file mode 100644 index 00000000000000..e135e584b34eab --- /dev/null +++ b/src/callback_queue-inl.h @@ -0,0 +1,96 @@ +#ifndef SRC_CALLBACK_QUEUE_INL_H_ +#define SRC_CALLBACK_QUEUE_INL_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "callback_queue.h" + +namespace node { + +template +template +std::unique_ptr::Callback> +CallbackQueue::CreateCallback(Fn&& fn, bool refed) { + return std::make_unique>(std::move(fn), refed); +} + +template +std::unique_ptr::Callback> +CallbackQueue::Shift() { + std::unique_ptr ret = std::move(head_); + if (ret) { + head_ = ret->get_next(); + if (!head_) + tail_ = nullptr; // The queue is now empty. + } + size_--; + return ret; +} + +template +void CallbackQueue::Push(std::unique_ptr cb) { + Callback* prev_tail = tail_; + + size_++; + tail_ = cb.get(); + if (prev_tail != nullptr) + prev_tail->set_next(std::move(cb)); + else + head_ = std::move(cb); +} + +template +void CallbackQueue::ConcatMove(CallbackQueue&& other) { + size_ += other.size_; + if (tail_ != nullptr) + tail_->set_next(std::move(other.head_)); + else + head_ = std::move(other.head_); + tail_ = other.tail_; + other.tail_ = nullptr; + other.size_ = 0; +} + +template +size_t CallbackQueue::size() const { + return size_.load(); +} + +template +CallbackQueue::Callback::Callback(bool refed) + : refed_(refed) {} + +template +bool CallbackQueue::Callback::is_refed() const { + return refed_; +} + +template +std::unique_ptr::Callback> +CallbackQueue::Callback::get_next() { + return std::move(next_); +} + +template +void CallbackQueue::Callback::set_next(std::unique_ptr next) { + next_ = std::move(next); +} + +template +template +CallbackQueue::CallbackImpl::CallbackImpl( + Fn&& callback, bool refed) + : Callback(refed), + callback_(std::move(callback)) {} + +template +template +R CallbackQueue::CallbackImpl::Call(Args&&... args) { + return callback_(std::forward(args)...); +} + +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#endif // SRC_CALLBACK_QUEUE_INL_H_ diff --git a/src/callback_queue.h b/src/callback_queue.h new file mode 100644 index 00000000000000..dd626442aabff0 --- /dev/null +++ b/src/callback_queue.h @@ -0,0 +1,64 @@ +#ifndef SRC_CALLBACK_QUEUE_H_ +#define SRC_CALLBACK_QUEUE_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include + +namespace node { + +template +class CallbackQueue { + public: + class Callback { + public: + explicit inline Callback(bool refed); + + virtual ~Callback() = default; + virtual R Call(Args&&... args) = 0; + + inline bool is_refed() const; + + private: + inline std::unique_ptr get_next(); + inline void set_next(std::unique_ptr next); + + bool refed_; + std::unique_ptr next_; + + friend class CallbackQueue; + }; + + template + inline std::unique_ptr CreateCallback(Fn&& fn, bool refed); + + inline std::unique_ptr Shift(); + inline void Push(std::unique_ptr cb); + // ConcatMove adds elements from 'other' to the end of this list, and clears + // 'other' afterwards. + inline void ConcatMove(CallbackQueue&& other); + + // size() is atomic and may be called from any thread. + inline size_t size() const; + + private: + template + class CallbackImpl final : public Callback { + public: + CallbackImpl(Fn&& callback, bool refed); + R Call(Args&&... args) override; + + private: + Fn callback_; + }; + + std::atomic size_ {0}; + std::unique_ptr head_; + Callback* tail_ = nullptr; +}; + +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#endif // SRC_CALLBACK_QUEUE_H_ diff --git a/src/env-inl.h b/src/env-inl.h index 853ba78de6d548..a2db24ed172dc9 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -25,6 +25,7 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "aliased_buffer.h" +#include "callback_queue-inl.h" #include "env.h" #include "node.h" #include "util-inl.h" @@ -705,50 +706,9 @@ inline void IsolateData::set_options( options_ = std::move(options); } -std::unique_ptr -Environment::NativeImmediateQueue::Shift() { - std::unique_ptr ret = std::move(head_); - if (ret) { - head_ = ret->get_next(); - if (!head_) - tail_ = nullptr; // The queue is now empty. - } - size_--; - return ret; -} - -void Environment::NativeImmediateQueue::Push( - std::unique_ptr cb) { - NativeImmediateCallback* prev_tail = tail_; - - size_++; - tail_ = cb.get(); - if (prev_tail != nullptr) - prev_tail->set_next(std::move(cb)); - else - head_ = std::move(cb); -} - -void Environment::NativeImmediateQueue::ConcatMove( - NativeImmediateQueue&& other) { - size_ += other.size_; - if (tail_ != nullptr) - tail_->set_next(std::move(other.head_)); - else - head_ = std::move(other.head_); - tail_ = other.tail_; - other.tail_ = nullptr; - other.size_ = 0; -} - -size_t Environment::NativeImmediateQueue::size() const { - return size_.load(); -} - template void Environment::CreateImmediate(Fn&& cb, bool ref) { - auto callback = std::make_unique>( - std::move(cb), ref); + auto callback = native_immediates_.CreateCallback(std::move(cb), ref); native_immediates_.Push(std::move(callback)); } @@ -768,8 +728,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) { template void Environment::SetImmediateThreadsafe(Fn&& cb) { - auto callback = std::make_unique>( - std::move(cb), false); + auto callback = + native_immediates_threadsafe_.CreateCallback(std::move(cb), false); { Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); native_immediates_threadsafe_.Push(std::move(callback)); @@ -780,8 +740,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) { template void Environment::RequestInterrupt(Fn&& cb) { - auto callback = std::make_unique>( - std::move(cb), false); + auto callback = + native_immediates_interrupts_.CreateCallback(std::move(cb), false); { Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); native_immediates_interrupts_.Push(std::move(callback)); @@ -791,34 +751,6 @@ void Environment::RequestInterrupt(Fn&& cb) { RequestInterruptFromV8(); } -Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) - : refed_(refed) {} - -bool Environment::NativeImmediateCallback::is_refed() const { - return refed_; -} - -std::unique_ptr -Environment::NativeImmediateCallback::get_next() { - return std::move(next_); -} - -void Environment::NativeImmediateCallback::set_next( - std::unique_ptr next) { - next_ = std::move(next); -} - -template -Environment::NativeImmediateCallbackImpl::NativeImmediateCallbackImpl( - Fn&& callback, bool refed) - : NativeImmediateCallback(refed), - callback_(std::move(callback)) {} - -template -void Environment::NativeImmediateCallbackImpl::Call(Environment* env) { - callback_(env); -} - inline bool Environment::can_call_into_js() const { return can_call_into_js_ && !is_stopping(); } diff --git a/src/env.cc b/src/env.cc index 3efa5c3b9c98ab..06e6fe6f793536 100644 --- a/src/env.cc +++ b/src/env.cc @@ -729,7 +729,7 @@ void Environment::RunAndClearInterrupts() { } DebugSealHandleScope seal_handle_scope(isolate()); - while (std::unique_ptr head = queue.Shift()) + while (auto head = queue.Shift()) head->Call(this); } } @@ -755,8 +755,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { auto drain_list = [&]() { TryCatchScope try_catch(this); DebugSealHandleScope seal_handle_scope(isolate()); - while (std::unique_ptr head = - native_immediates_.Shift()) { + while (auto head = native_immediates_.Shift()) { if (head->is_refed()) ref_count++; diff --git a/src/env.h b/src/env.h index 146754d03ab728..182ea4e4077b21 100644 --- a/src/env.h +++ b/src/env.h @@ -29,6 +29,7 @@ #include "inspector_agent.h" #include "inspector_profiler.h" #endif +#include "callback_queue.h" #include "debug_utils.h" #include "handle_wrap.h" #include "node.h" @@ -1368,49 +1369,7 @@ class Environment : public MemoryRetainer { std::list at_exit_functions_; - class NativeImmediateCallback { - public: - explicit inline NativeImmediateCallback(bool refed); - - virtual ~NativeImmediateCallback() = default; - virtual void Call(Environment* env) = 0; - - inline bool is_refed() const; - inline std::unique_ptr get_next(); - inline void set_next(std::unique_ptr next); - - private: - bool refed_; - std::unique_ptr next_; - }; - - template - class NativeImmediateCallbackImpl final : public NativeImmediateCallback { - public: - NativeImmediateCallbackImpl(Fn&& callback, bool refed); - void Call(Environment* env) override; - - private: - Fn callback_; - }; - - class NativeImmediateQueue { - public: - inline std::unique_ptr Shift(); - inline void Push(std::unique_ptr cb); - // ConcatMove adds elements from 'other' to the end of this list, and clears - // 'other' afterwards. - inline void ConcatMove(NativeImmediateQueue&& other); - - // size() is atomic and may be called from any thread. - inline size_t size() const; - - private: - std::atomic size_ {0}; - std::unique_ptr head_; - NativeImmediateCallback* tail_ = nullptr; - }; - + typedef CallbackQueue NativeImmediateQueue; NativeImmediateQueue native_immediates_; Mutex native_immediates_threadsafe_mutex_; NativeImmediateQueue native_immediates_threadsafe_; From f2c91810c68fc33877e64a7ed674cb2984c185db Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 7 May 2020 01:25:38 +0200 Subject: [PATCH 2/3] fixup! src: split out callback queue implementation from Environment --- src/callback_queue-inl.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/callback_queue-inl.h b/src/callback_queue-inl.h index e135e584b34eab..f887595a077b58 100644 --- a/src/callback_queue-inl.h +++ b/src/callback_queue-inl.h @@ -72,7 +72,8 @@ CallbackQueue::Callback::get_next() { } template -void CallbackQueue::Callback::set_next(std::unique_ptr next) { +void CallbackQueue::Callback::set_next( + std::unique_ptr next) { next_ = std::move(next); } From a9470f0976a3e7d094ab96553140fe36a8327542 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 7 May 2020 01:47:06 +0200 Subject: [PATCH 3/3] fixup! src: split out callback queue implementation from Environment --- src/callback_queue-inl.h | 2 +- src/callback_queue.h | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/callback_queue-inl.h b/src/callback_queue-inl.h index f887595a077b58..e83c81cd0dd802 100644 --- a/src/callback_queue-inl.h +++ b/src/callback_queue-inl.h @@ -86,7 +86,7 @@ CallbackQueue::CallbackImpl::CallbackImpl( template template -R CallbackQueue::CallbackImpl::Call(Args&&... args) { +R CallbackQueue::CallbackImpl::Call(Args... args) { return callback_(std::forward(args)...); } diff --git a/src/callback_queue.h b/src/callback_queue.h index dd626442aabff0..ebf975e6391d13 100644 --- a/src/callback_queue.h +++ b/src/callback_queue.h @@ -7,6 +7,12 @@ namespace node { +// A queue of C++ functions that take Args... as arguments and return R +// (this is similar to the signature of std::function). +// New entries are added using `CreateCallback()`/`Push()`, and removed using +// `Shift()`. +// The `refed` flag is left for easier use in situations in which some of these +// should be run even if nothing else is keeping the event loop alive. template class CallbackQueue { public: @@ -15,7 +21,7 @@ class CallbackQueue { explicit inline Callback(bool refed); virtual ~Callback() = default; - virtual R Call(Args&&... args) = 0; + virtual R Call(Args... args) = 0; inline bool is_refed() const; @@ -46,7 +52,7 @@ class CallbackQueue { class CallbackImpl final : public Callback { public: CallbackImpl(Fn&& callback, bool refed); - R Call(Args&&... args) override; + R Call(Args... args) override; private: Fn callback_;