Skip to content

Commit

Permalink
Require chain::Listen impls in block sync be Send + Sync
Browse files Browse the repository at this point in the history
Users who want to use `lightning-block-sync`'s `init` module would
be reasonable in wanting to use it in a multithreaded environment,
however because it takes a list of listeners as `dyn chain::Listen`
without any `Send` or `Sync` bound they fail in doing so.

Here we add a `Send + Sync` bound, requiring any listeners be both.
This could be less generic for users with their own `chain::Listen`
listener that is *not* `Send` or `Sync`, but given multi-threading
support is important and the LDK-included `chain::Listen`
implementations are `Send + Sync`, this seems like an acceptable
tradeoff.
  • Loading branch information
TheBlueMatt committed Mar 9, 2022
1 parent 6259e7a commit 6cc0cd7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 26 deletions.
30 changes: 15 additions & 15 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ BlockSourceResult<ValidatedBlockHeader> {
/// let mut cache = UnboundedCache::new();
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
/// let listeners = vec![
/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen),
/// (manager_block_hash, &manager as &dyn chain::Listen),
/// (monitor_block_hash, &monitor_listener as &(dyn chain::Listen + Send + Sync)),
/// (manager_block_hash, &manager as &(dyn chain::Listen + Send + Sync)),
/// ];
/// let chain_tip = init::synchronize_listeners(
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
Expand All @@ -125,7 +125,7 @@ pub async fn synchronize_listeners<B: BlockSource, C: Cache>(
block_source: &mut B,
network: Network,
header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &dyn chain::Listen)>,
mut chain_listeners: Vec<(BlockHash, &(dyn chain::Listen + Send + Sync))>,
) -> BlockSourceResult<ValidatedBlockHeader> {
let best_header = validate_best_block_header(block_source).await?;

Expand Down Expand Up @@ -198,7 +198,7 @@ impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
}

/// Wrapper for supporting dynamically sized chain listeners.
struct DynamicChainListener<'a>(&'a dyn chain::Listen);
struct DynamicChainListener<'a>(&'a (dyn chain::Listen + Send + Sync));

impl<'a> chain::Listen for DynamicChainListener<'a> {
fn block_connected(&self, _block: &Block, _height: u32) {
Expand All @@ -211,7 +211,7 @@ impl<'a> chain::Listen for DynamicChainListener<'a> {
}

/// A set of dynamically sized chain listeners, each paired with a starting block height.
struct ChainListenerSet<'a>(Vec<(u32, &'a dyn chain::Listen)>);
struct ChainListenerSet<'a>(Vec<(u32, &'a (dyn chain::Listen + Send + Sync))>);

impl<'a> chain::Listen for ChainListenerSet<'a> {
fn block_connected(&self, block: &Block, height: u32) {
Expand Down Expand Up @@ -249,9 +249,9 @@ mod tests {
.expect_block_connected(*chain.at_height(4));

let listeners = vec![
(chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen),
(chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen),
(chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen),
(chain.at_height(1).block_hash, &listener_1 as &(dyn chain::Listen + Send + Sync)),
(chain.at_height(2).block_hash, &listener_2 as &(dyn chain::Listen + Send + Sync)),
(chain.at_height(3).block_hash, &listener_3 as &(dyn chain::Listen + Send + Sync)),
];
let mut cache = chain.header_cache(0..=4);
match synchronize_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await {
Expand Down Expand Up @@ -284,9 +284,9 @@ mod tests {
.expect_block_connected(*main_chain.at_height(4));

let listeners = vec![
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
(fork_chain_1.tip().block_hash, &listener_1 as &(dyn chain::Listen + Send + Sync)),
(fork_chain_2.tip().block_hash, &listener_2 as &(dyn chain::Listen + Send + Sync)),
(fork_chain_3.tip().block_hash, &listener_3 as &(dyn chain::Listen + Send + Sync)),
];
let mut cache = fork_chain_1.header_cache(2..=4);
cache.extend(fork_chain_2.header_cache(3..=4));
Expand Down Expand Up @@ -327,9 +327,9 @@ mod tests {
.expect_block_connected(*main_chain.at_height(4));

let listeners = vec![
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
(fork_chain_1.tip().block_hash, &listener_1 as &(dyn chain::Listen + Send + Sync)),
(fork_chain_2.tip().block_hash, &listener_2 as &(dyn chain::Listen + Send + Sync)),
(fork_chain_3.tip().block_hash, &listener_3 as &(dyn chain::Listen + Send + Sync)),
];
let mut cache = fork_chain_1.header_cache(2..=4);
cache.extend(fork_chain_2.header_cache(3..=4));
Expand All @@ -351,7 +351,7 @@ mod tests {
.expect_block_disconnected(*old_tip)
.expect_block_connected(*new_tip);

let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)];
let listeners = vec![(old_tip.block_hash, &listener as &(dyn chain::Listen + Send + Sync))];
let mut cache = fork_chain.header_cache(2..=2);
match synchronize_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(_) => {
Expand Down
22 changes: 11 additions & 11 deletions lightning-block-sync/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bitcoin::util::uint::Uint256;

use lightning::chain;

use std::cell::RefCell;
use std::sync::Mutex;
use std::collections::VecDeque;

#[derive(Default)]
Expand Down Expand Up @@ -171,32 +171,32 @@ impl chain::Listen for NullChainListener {
}

pub struct MockChainListener {
expected_blocks_connected: RefCell<VecDeque<BlockHeaderData>>,
expected_blocks_disconnected: RefCell<VecDeque<BlockHeaderData>>,
expected_blocks_connected: Mutex<VecDeque<BlockHeaderData>>,
expected_blocks_disconnected: Mutex<VecDeque<BlockHeaderData>>,
}

impl MockChainListener {
pub fn new() -> Self {
Self {
expected_blocks_connected: RefCell::new(VecDeque::new()),
expected_blocks_disconnected: RefCell::new(VecDeque::new()),
expected_blocks_connected: Mutex::new(VecDeque::new()),
expected_blocks_disconnected: Mutex::new(VecDeque::new()),
}
}

pub fn expect_block_connected(self, block: BlockHeaderData) -> Self {
self.expected_blocks_connected.borrow_mut().push_back(block);
self.expected_blocks_connected.lock().unwrap().push_back(block);
self
}

pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self {
self.expected_blocks_disconnected.borrow_mut().push_back(block);
self.expected_blocks_disconnected.lock().unwrap().push_back(block);
self
}
}

impl chain::Listen for MockChainListener {
fn block_connected(&self, block: &Block, height: u32) {
match self.expected_blocks_connected.borrow_mut().pop_front() {
match self.expected_blocks_connected.lock().unwrap().pop_front() {
None => {
panic!("Unexpected block connected: {:?}", block.block_hash());
},
Expand All @@ -208,7 +208,7 @@ impl chain::Listen for MockChainListener {
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
match self.expected_blocks_disconnected.borrow_mut().pop_front() {
match self.expected_blocks_disconnected.lock().unwrap().pop_front() {
None => {
panic!("Unexpected block disconnected: {:?}", header.block_hash());
},
Expand All @@ -226,12 +226,12 @@ impl Drop for MockChainListener {
return;
}

let expected_blocks_connected = self.expected_blocks_connected.borrow();
let expected_blocks_connected = self.expected_blocks_connected.lock().unwrap();
if !expected_blocks_connected.is_empty() {
panic!("Expected blocks connected: {:?}", expected_blocks_connected);
}

let expected_blocks_disconnected = self.expected_blocks_disconnected.borrow();
let expected_blocks_disconnected = self.expected_blocks_disconnected.lock().unwrap();
if !expected_blocks_disconnected.is_empty() {
panic!("Expected blocks disconnected: {:?}", expected_blocks_disconnected);
}
Expand Down

0 comments on commit 6cc0cd7

Please sign in to comment.