Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Fix import queue thread pool shutdown #4929

Merged
merged 4 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/cli/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,14 @@ where

// we eagerly drop the service so that the internal exit future is fired,
// but we need to keep holding a reference to the global telemetry guard
// and drop the runtime first.
let _telemetry = service.telemetry();

let f = service.fuse();
pin_mut!(f);

runtime.block_on(main(f)).map_err(|e| e.to_string())?;
drop(runtime);

Ok(())
}
36 changes: 35 additions & 1 deletion primitives/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use std::{mem, pin::Pin, time::Duration, marker::PhantomData};
use std::{mem, pin::Pin, time::Duration, marker::PhantomData, sync::Arc};
use futures::{prelude::*, channel::mpsc, task::Context, task::Poll};
use futures_timer::Delay;
use parking_lot::{Mutex, Condvar};
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};

use crate::block_import::BlockOrigin;
Expand All @@ -40,9 +41,28 @@ pub struct BasicQueue<B: BlockT, Transaction> {
manual_poll: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// A thread pool where the background worker is being run.
pool: Option<futures::executor::ThreadPool>,
pool_guard: Arc<(Mutex<usize>, Condvar)>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. There's no control over the lifetimes of objects passed to ThreadPoolBuilder. It uses some reference counting internally and relying on implementation details seems wrong.
  2. Don't want to introduce another dependency over a trivial thing.

_phantom: PhantomData<Transaction>,
}

impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
fn drop(&mut self) {
self.pool = None;
// Flush the queue and close the receiver to terminate the future.
self.sender.close_channel();
self.result_port.close();

// Make sure all pool threads terminate.
// https://github.com/rust-lang/futures-rs/issues/1470
// https://github.com/rust-lang/futures-rs/issues/1349
let (ref mutex, ref condvar) = *self.pool_guard;
let mut lock = mutex.lock();
while *lock != 0 {
condvar.wait(&mut lock);
}
}
}

impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
/// Instantiate a new basic queue, with given verifier.
///
Expand All @@ -63,9 +83,22 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
finality_proof_import,
);

let guard = Arc::new((Mutex::new(0usize), Condvar::new()));
let guard_start = guard.clone();
let guard_end = guard.clone();

let mut pool = futures::executor::ThreadPool::builder()
.name_prefix("import-queue-worker-")
.pool_size(1)
.after_start(move |_| *guard_start.0.lock() += 1)
.before_stop(move |_| {
let (ref mutex, ref condvar) = *guard_end;
let mut lock = mutex.lock();
*lock -= 1;
if *lock == 0 {
condvar.notify_one();
}
})
.create()
.ok();

Expand All @@ -82,6 +115,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
result_port,
manual_poll,
pool,
pool_guard: guard,
_phantom: PhantomData,
}
}
Expand Down
5 changes: 5 additions & 0 deletions primitives/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
}
}
}

/// Close the channel.
pub fn close(&mut self) {
self.rx.close()
}
}

#[cfg(test)]
Expand Down