diff --git a/client/cli/src/runtime.rs b/client/cli/src/runtime.rs index 157b75f2050d1..eccf240f20ff3 100644 --- a/client/cli/src/runtime.rs +++ b/client/cli/src/runtime.rs @@ -128,12 +128,14 @@ where // we eagerly drop the service so that the internal exit future is fired, // but we need to keep holding a reference to the global telemetry guard + // and drop the runtime first. let _telemetry = service.telemetry(); let f = service.fuse(); pin_mut!(f); runtime.block_on(main(f)).map_err(|e| e.to_string())?; + drop(runtime); Ok(()) } diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs index 687f23e646f66..7d71a677a6b5c 100644 --- a/client/consensus/babe/src/tests.rs +++ b/client/consensus/babe/src/tests.rs @@ -229,7 +229,7 @@ impl Verifier for TestVerifier { ) -> Result<(BlockImportParams, Option)>>), String> { // apply post-sealing mutations (i.e. stripping seal, if desired). (self.mutator)(&mut header, Stage::PostSeal); - Ok(self.inner.verify(origin, header, justification, body).expect("verification failed!")) + self.inner.verify(origin, header, justification, body) } } @@ -423,7 +423,14 @@ fn run_one_test( } runtime.spawn(futures01::future::poll_fn(move || { - net.lock().poll(); + let mut net = net.lock(); + net.poll(); + for p in net.peers() { + for (h, e) in p.failed_verifications() { + panic!("Verification failed for {:?}: {}", h, e); + } + } + Ok::<_, ()>(futures01::Async::NotReady::<()>) })); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 22e340a2a81b3..fc706dfdce0b8 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -191,7 +191,7 @@ pub struct Peer> { client: PeersClient, /// We keep a copy of the verifier so that we can invoke it for locally-generated blocks, /// instead of going through the import queue. - verifier: VerifierAdapter>, + verifier: VerifierAdapter, /// We keep a copy of the block_import so that we can invoke it for locally-generated blocks, /// instead of going through the import queue. block_import: BlockImportAdapter<()>, @@ -368,6 +368,11 @@ impl> Peer { |backend| backend.blocks_count() ).unwrap_or(0) } + + /// Return a collection of block hashes that failed verification + pub fn failed_verifications(&self) -> HashMap<::Hash, String> { + self.verifier.failed_verifications.lock().clone() + } } pub struct EmptyTransactionPool; @@ -493,15 +498,13 @@ impl BlockImport for BlockImportAdapter { } /// Implements `Verifier` on an `Arc>`. Used internally. -struct VerifierAdapter(Arc>>); - -impl Clone for VerifierAdapter { - fn clone(&self) -> Self { - VerifierAdapter(self.0.clone()) - } +#[derive(Clone)] +struct VerifierAdapter { + verifier: Arc>>>, + failed_verifications: Arc>>, } -impl> Verifier for VerifierAdapter { +impl Verifier for VerifierAdapter { fn verify( &mut self, origin: BlockOrigin, @@ -509,7 +512,20 @@ impl> Verifier for VerifierAdapter { justification: Option, body: Option> ) -> Result<(BlockImportParams, Option)>>), String> { - self.0.lock().verify(origin, header, justification, body) + let hash = header.hash(); + self.verifier.lock().verify(origin, header, justification, body).map_err(|e| { + self.failed_verifications.lock().insert(hash, e.clone()); + e + }) + } +} + +impl VerifierAdapter { + fn new(verifier: Arc>>>) -> VerifierAdapter { + VerifierAdapter { + verifier, + failed_verifications: Default::default(), + } } } @@ -600,7 +616,7 @@ pub trait TestNetFactory: Sized { config, &data, ); - let verifier = VerifierAdapter(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); + let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); let import_queue = Box::new(BasicQueue::new( verifier.clone(), @@ -676,7 +692,7 @@ pub trait TestNetFactory: Sized { &config, &data, ); - let verifier = VerifierAdapter(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); + let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); let import_queue = Box::new(BasicQueue::new( verifier.clone(), diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index 63ba16b658c01..bd42ebd6850f7 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -14,9 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::{mem, pin::Pin, time::Duration, marker::PhantomData}; +use std::{mem, pin::Pin, time::Duration, marker::PhantomData, sync::Arc}; use futures::{prelude::*, channel::mpsc, task::Context, task::Poll}; use futures_timer::Delay; +use parking_lot::{Mutex, Condvar}; use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}}; use crate::block_import::BlockOrigin; @@ -40,9 +41,28 @@ pub struct BasicQueue { manual_poll: Option + Send>>>, /// A thread pool where the background worker is being run. pool: Option, + pool_guard: Arc<(Mutex, Condvar)>, _phantom: PhantomData, } +impl Drop for BasicQueue { + fn drop(&mut self) { + self.pool = None; + // Flush the queue and close the receiver to terminate the future. + self.sender.close_channel(); + self.result_port.close(); + + // Make sure all pool threads terminate. + // https://github.com/rust-lang/futures-rs/issues/1470 + // https://github.com/rust-lang/futures-rs/issues/1349 + let (ref mutex, ref condvar) = *self.pool_guard; + let mut lock = mutex.lock(); + while *lock != 0 { + condvar.wait(&mut lock); + } + } +} + impl BasicQueue { /// Instantiate a new basic queue, with given verifier. /// @@ -63,9 +83,22 @@ impl BasicQueue { finality_proof_import, ); + let guard = Arc::new((Mutex::new(0usize), Condvar::new())); + let guard_start = guard.clone(); + let guard_end = guard.clone(); + let mut pool = futures::executor::ThreadPool::builder() .name_prefix("import-queue-worker-") .pool_size(1) + .after_start(move |_| *guard_start.0.lock() += 1) + .before_stop(move |_| { + let (ref mutex, ref condvar) = *guard_end; + let mut lock = mutex.lock(); + *lock -= 1; + if *lock == 0 { + condvar.notify_one(); + } + }) .create() .ok(); @@ -82,6 +115,7 @@ impl BasicQueue { result_port, manual_poll, pool, + pool_guard: guard, _phantom: PhantomData, } } diff --git a/primitives/consensus/common/src/import_queue/buffered_link.rs b/primitives/consensus/common/src/import_queue/buffered_link.rs index 143ab0eef8090..d0f6c87951354 100644 --- a/primitives/consensus/common/src/import_queue/buffered_link.rs +++ b/primitives/consensus/common/src/import_queue/buffered_link.rs @@ -157,6 +157,11 @@ impl BufferedLinkReceiver { } } } + + /// Close the channel. + pub fn close(&mut self) { + self.rx.close() + } } #[cfg(test)]