Skip to content

Commit

Permalink
Immutable BlockSource interface
Browse files Browse the repository at this point in the history
Querying a BlockSource is a logically immutable operation. Use non-mut
references in its interface to reflect this, which allows for users to
hold multiple references if desired.
  • Loading branch information
jkczyz committed Feb 14, 2022
1 parent 963f8d9 commit 444d8fb
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 54 deletions.
1 change: 1 addition & 0 deletions lightning-block-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rpc-client = [ "serde", "serde_json", "chunked_transfer" ]
[dependencies]
bitcoin = "0.27"
lightning = { version = "0.0.104", path = "../lightning" }
futures = { version = "0.3" }
tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
Expand Down
6 changes: 3 additions & 3 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use lightning::chain;
/// start when there are no chain listeners to sync yet.
///
/// [`SpvClient`]: crate::SpvClient
pub async fn validate_best_block_header<B: BlockSource>(block_source: &mut B) ->
pub async fn validate_best_block_header<B: BlockSource>(block_source: &B) ->
BlockSourceResult<ValidatedBlockHeader> {
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
block_source
Expand Down Expand Up @@ -67,7 +67,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// C: chain::Filter,
/// P: chainmonitor::Persist<S>,
/// >(
/// block_source: &mut B,
/// block_source: &B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
/// config: UserConfig,
/// keys_manager: &K,
Expand Down Expand Up @@ -122,7 +122,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
pub async fn synchronize_listeners<B: BlockSource, C: Cache>(
block_source: &mut B,
block_source: &B,
network: Network,
header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &dyn chain::Listen)>,
Expand Down
6 changes: 3 additions & 3 deletions lightning-block-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ pub trait BlockSource : Sync + Send {
///
/// Implementations that cannot find headers based on the hash should return a `Transient` error
/// when `height_hint` is `None`.
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;

/// Returns the block for a given hash. A headers-only block source should return a `Transient`
/// error.
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;

/// Returns the hash of the best block and, optionally, its height.
///
/// When polling a block source, [`Poll`] implementations may pass the height to [`get_header`]
/// to allow for a more efficient lookup.
///
/// [`get_header`]: Self::get_header
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
}

/// Result type for `BlockSource` requests.
Expand Down
44 changes: 22 additions & 22 deletions lightning-block-sync/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;

use std::ops::DerefMut;
use std::ops::Deref;

/// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving
/// related chain data. It serves as an adapter for `BlockSource`.
Expand All @@ -17,15 +17,15 @@ use std::ops::DerefMut;
/// [`ChainPoller`]: ../struct.ChainPoller.html
pub trait Poll {
/// Returns a chain tip in terms of its relationship to the provided chain tip.
fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>;

/// Returns the header that preceded the given header in the chain.
fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>;

/// Returns the block associated with the given header.
fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>;
}

Expand Down Expand Up @@ -170,12 +170,12 @@ mod sealed {
///
/// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way
/// of validating chain data and checking consistency.
pub struct ChainPoller<B: DerefMut<Target=T> + Sized, T: BlockSource> {
pub struct ChainPoller<B: Deref<Target=T> + Sized, T: BlockSource> {
block_source: B,
network: Network,
}

impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
impl<B: Deref<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
/// Creates a new poller for the given block source.
///
/// If the `network` parameter is mainnet, then the difficulty between blocks is checked for
Expand All @@ -185,8 +185,8 @@ impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
}
}

impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
impl<B: Deref<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>
{
Box::pin(async move {
Expand All @@ -206,7 +206,7 @@ impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for Chain
})
}

fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>
{
Box::pin(async move {
Expand All @@ -225,7 +225,7 @@ impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for Chain
})
}

fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>
{
Box::pin(async move {
Expand All @@ -249,7 +249,7 @@ mod tests {
let best_known_chain_tip = chain.tip();
chain.disconnect_tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Transient);
Expand All @@ -261,10 +261,10 @@ mod tests {

#[tokio::test]
async fn poll_chain_without_headers() {
let mut chain = Blockchain::default().with_height(1).without_headers();
let chain = Blockchain::default().with_height(1).without_headers();
let best_known_chain_tip = chain.at_height(0);

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -283,7 +283,7 @@ mod tests {
chain.blocks.last_mut().unwrap().header.bits =
BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0; 32]));

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -295,10 +295,10 @@ mod tests {

#[tokio::test]
async fn poll_chain_with_malformed_headers() {
let mut chain = Blockchain::default().with_height(1).malformed_headers();
let chain = Blockchain::default().with_height(1).malformed_headers();
let best_known_chain_tip = chain.at_height(0);

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -310,10 +310,10 @@ mod tests {

#[tokio::test]
async fn poll_chain_with_common_tip() {
let mut chain = Blockchain::default().with_height(0);
let chain = Blockchain::default().with_height(0);
let best_known_chain_tip = chain.tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Common),
Expand All @@ -330,7 +330,7 @@ mod tests {
let worse_chain_tip = chain.tip();
assert_eq!(best_known_chain_tip.chainwork, worse_chain_tip.chainwork);

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
Expand All @@ -345,7 +345,7 @@ mod tests {
chain.disconnect_tip();
let worse_chain_tip = chain.tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
Expand All @@ -354,12 +354,12 @@ mod tests {

#[tokio::test]
async fn poll_chain_with_better_tip() {
let mut chain = Blockchain::default().with_height(1);
let chain = Blockchain::default().with_height(1);
let best_known_chain_tip = chain.at_height(0);

let better_chain_tip = chain.tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Better(better_chain_tip)),
Expand Down
22 changes: 12 additions & 10 deletions lightning-block-sync/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,51 @@ use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;

use futures::lock::Mutex;

use std::convert::TryFrom;
use std::convert::TryInto;

/// A simple REST client for requesting resources using HTTP `GET`.
pub struct RestClient {
endpoint: HttpEndpoint,
client: HttpClient,
client: Mutex<HttpClient>,
}

impl RestClient {
/// Creates a new REST client connected to the given endpoint.
///
/// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest).
pub fn new(endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = HttpClient::connect(&endpoint)?;
let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self { endpoint, client })
}

/// Requests a resource encoded in `F` format and interpreted as type `T`.
pub async fn request_resource<F, T>(&mut self, resource_path: &str) -> std::io::Result<T>
pub async fn request_resource<F, T>(&self, resource_path: &str) -> std::io::Result<T>
where F: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path);
self.client.get::<F>(&uri, &host).await?.try_into()
self.client.lock().await.get::<F>(&uri, &host).await?.try_into()
}
}

impl BlockSource for RestClient {
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let resource_path = format!("headers/1/{}.json", header_hash.to_hex());
Ok(self.request_resource::<JsonResponse, _>(&resource_path).await?)
})
}

fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let resource_path = format!("block/{}.bin", header_hash.to_hex());
Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
})
}

fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.request_resource::<JsonResponse, _>("chaininfo.json").await?)
})
Expand Down Expand Up @@ -81,7 +83,7 @@ mod tests {
#[tokio::test]
async fn request_unknown_resource() {
let server = HttpServer::responding_with_not_found();
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();

match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
Expand All @@ -92,7 +94,7 @@ mod tests {
#[tokio::test]
async fn request_malformed_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content("foo"));
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();

match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
Expand All @@ -103,7 +105,7 @@ mod tests {
#[tokio::test]
async fn request_valid_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content(42));
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();

match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => panic!("Unexpected error: {:?}", e),
Expand Down
Loading

0 comments on commit 444d8fb

Please sign in to comment.