Fix import queue thread pool shutdown #4929
// Flush the queue and close the receiver to terminate the future.
let _ = self.sender.unbounded_send(ToWorkerMsg::Shutdown);
let (_, closed) = buffered_link::buffered_link();
drop(std::mem::replace(&mut self.result_port, closed));
Why do we need to do this?
Replaced with close
fn drop(&mut self) {
drop(self.pool.take());
// Flush the queue and close the receiver to terminate the future.
let _ = self.sender.unbounded_send(ToWorkerMsg::Shutdown);
I think we should just close it: https://docs.rs/futures/0.3.4/futures/channel/mpsc/struct.UnboundedSender.html#method.close_channel
- let _ = self.sender.unbounded_send(ToWorkerMsg::Shutdown);
+ self.sender.close_channel();
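The idea behind this suggestion can be sketched as follows: closing the channel is itself the shutdown signal, so no dedicated `Shutdown` message is needed. This is a minimal sketch, not the PR's code. It assumes the standard library's `std::sync::mpsc` channel as a dependency-free stand-in for the futures unbounded channel (with std, the channel is closed by dropping the sender rather than by calling `close_channel`), and `Msg`, `run_worker`, and `demo` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type. Note that it has no `Shutdown` variant.
enum Msg {
    Import(u32),
}

// Hypothetical worker: drains the channel until it is closed and
// returns the payloads it handled, in order.
fn run_worker(rx: mpsc::Receiver<Msg>) -> Vec<u32> {
    let mut handled = Vec::new();
    // `recv` returns `Err` once every sender is gone and the queue is empty,
    // so already-queued messages are still processed before the loop ends.
    while let Ok(Msg::Import(n)) = rx.recv() {
        handled.push(n);
    }
    handled
}

// Sends three messages, closes the channel, and waits for the worker.
fn demo() -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_worker(rx));
    for n in 0..3 {
        tx.send(Msg::Import(n)).expect("worker is still running");
    }
    drop(tx); // std equivalent of closing the channel (assumption, see above)
    worker.join().expect("worker panicked")
}
```

Calling `demo()` returns the three queued payloads, which illustrates that messages sent before the close are flushed rather than lost.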
let mut pool = futures::executor::ThreadPool::builder()
.name_prefix("import-queue-worker-")
- .pool_size(1)
+ .pool_size(2)
Why do we increase the size?
Isn't there just one worker anyway?
Was testing if thread tracking code is working properly. Eventually there should be more workers, but this is out of scope of this PR.
Also fixed an issue with slog global guard being disposed too early.
I meant to use close_channel for the sender ;) Otherwise it looks good.
@@ -144,6 +178,7 @@ enum ToWorkerMsg<B: BlockT> {
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
Shutdown,
Shutdown,
@@ -239,6 +274,7 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
ToWorkerMsg::ImportJustification(who, hash, number, justification) => {
worker.import_justification(who, hash, number, justification);
}
ToWorkerMsg::Shutdown => return Poll::Ready(()),
ToWorkerMsg::Shutdown => return Poll::Ready(()),
fn drop(&mut self) {
drop(self.pool.take());
// Flush the queue and close the receiver to terminate the future.
let _ = self.sender.unbounded_send(ToWorkerMsg::Shutdown);
- let _ = self.sender.unbounded_send(ToWorkerMsg::Shutdown);
+ self.sender.close_channel();
Could we maybe add a test that uses a fake block import which just waits 30 seconds, and then drops the BasicQueue directly?
@@ -40,9 +41,28 @@ pub struct BasicQueue<B: BlockT, Transaction> {
manual_poll: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// A thread pool where the background worker is being run.
pool: Option<futures::executor::ThreadPool>,
pool_guard: Arc<(Mutex<usize>, Condvar)>,
Any reason not to use a https://docs.rs/crossbeam/0.7.3/crossbeam/sync/struct.WaitGroup.html?
- There's no control over the lifetimes of objects passed to ThreadPoolBuilder. It uses some reference counting internally and relying on implementation details seems wrong.
- Don't want to introduce another dependency over a trivial thing.
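The `Arc<(Mutex<usize>, Condvar)>` guard discussed above can be illustrated with a std-only sketch: a counter of live worker threads plus a condition variable that the owner waits on while dropping. This is an illustration under assumptions, not the PR's implementation. `Queue`, `PoolGuard`, and `Queue::spawn` are hypothetical names, plain `std::thread` workers stand in for the futures `ThreadPool`, and the sleep stands in for a slow block import.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical guard: number of live worker threads, plus a condition
// variable that is notified whenever a worker exits.
type PoolGuard = Arc<(Mutex<usize>, Condvar)>;

// Hypothetical stand-in for the import queue.
struct Queue {
    guard: PoolGuard,
}

impl Queue {
    // Starts `workers` threads that each "import" for `work` and then
    // bump `done` so callers can observe that the work finished.
    fn spawn(workers: usize, work: Duration, done: Arc<AtomicUsize>) -> Queue {
        let guard: PoolGuard = Arc::new((Mutex::new(0), Condvar::new()));
        for _ in 0..workers {
            // Count the thread before it starts so `drop` cannot miss it.
            *guard.0.lock().unwrap() += 1;
            let guard = Arc::clone(&guard);
            let done = Arc::clone(&done);
            thread::spawn(move || {
                thread::sleep(work); // pretend to import a block
                done.fetch_add(1, Ordering::SeqCst);
                let (count, cvar) = &*guard;
                *count.lock().unwrap() -= 1;
                cvar.notify_all();
            });
        }
        Queue { guard }
    }
}

impl Drop for Queue {
    // Blocks until every worker has reported that it exited.
    fn drop(&mut self) {
        let (count, cvar) = &*self.guard;
        let mut live = count.lock().unwrap();
        // Re-check in a loop to tolerate spurious wakeups.
        while *live != 0 {
            live = cvar.wait(live).unwrap();
        }
    }
}
```

Because `drop` returns only once the counter reaches zero, a check placed right after dropping the queue can assume that no worker is still running. The same shape, with a much longer sleep, is one way the slow-import test proposed earlier in the review could be written.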
* Fix import queue thread pool shutdown
* Make sure runtime is disposed before telemetry
* Close channel instead of sending a message
* Fixed test
Due to the way the rs-futures ThreadPool is implemented, importing threads may outlive the queue instance and even the main thread. This leads to a race between closing the rocksdb database and cleaning up the C++ runtime in the main thread, causing segfaults and preventing the database from being closed properly.

This PR adds explicit synchronization between importing threads and the queue shutdown.
Fixes #4913