Skip to content

Commit

Permalink
feat!: implement retryable calls
Browse files Browse the repository at this point in the history
  • Loading branch information
ValuedMammal committed Sep 7, 2024
1 parent a660346 commit 1f67ece
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ name = "esplora_client"
path = "src/lib.rs"

[dependencies]
async-std = "1.13.0"
serde = { version = "1.0", features = ["derive"] }
bitcoin = { version = "0.32", features = ["serde", "std"], default-features = false }
hex = { version = "0.2", package = "hex-conservative" }
Expand Down
55 changes: 47 additions & 8 deletions src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use async_std::task;
use bitcoin::consensus::{deserialize, serialize, Decodable, Encodable};
use bitcoin::hashes::{sha256, Hash};
use bitcoin::hex::{DisplayHex, FromHex};
Expand All @@ -24,16 +26,21 @@ use bitcoin::{
#[allow(unused_imports)]
use log::{debug, error, info, trace};

use reqwest::{header, Client};
use reqwest::{header, Client, Response};

use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
use crate::{
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
RETRYABLE_ERROR_CODES,
};

#[derive(Debug, Clone)]
pub struct AsyncClient {
/// The URL of the Esplora Server.
url: String,
/// The inner [`reqwest::Client`] to make HTTP requests.
client: Client,
backoff: Duration,
max_retries: u32,
}

impl AsyncClient {
Expand Down Expand Up @@ -63,12 +70,22 @@ impl AsyncClient {
client_builder = client_builder.default_headers(headers);
}

Ok(Self::from_client(builder.base_url, client_builder.build()?))
Ok(AsyncClient {
url: builder.base_url,
client: client_builder.build()?,
backoff: builder.backoff,
max_retries: builder.max_retries,
})
}

/// Build an async client from the base url and [`Client`]
pub fn from_client(url: String, client: Client) -> Self {
AsyncClient { url, client }
AsyncClient {
url,
client,
backoff: crate::DEFAULT_BACKOFF,
max_retries: crate::DEFAULT_MAX_RETRIES,
}
}

/// Make an HTTP GET request to given URL, deserializing to any `T` that
Expand All @@ -84,7 +101,7 @@ impl AsyncClient {
/// [`bitcoin::consensus::Decodable`] deserialization.
async fn get_response<T: Decodable>(&self, path: &str) -> Result<T, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -127,7 +144,7 @@ impl AsyncClient {
path: &str,
) -> Result<T, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -172,7 +189,7 @@ impl AsyncClient {
/// [`bitcoin::consensus::Decodable`] deserialization.
async fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -212,7 +229,7 @@ impl AsyncClient {
/// This function will return an error either from the HTTP client.
async fn get_response_text(&self, path: &str) -> Result<String, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -422,4 +439,26 @@ impl AsyncClient {
pub fn client(&self) -> &Client {
&self.client
}

/// Sends a GET request to the given `url`, retrying failed attempts
/// for retryable error codes until max retries hit.
async fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
let mut attempts = 0;
let mut delay = self.backoff;

loop {
match self.client.get(url).send().await {
Ok(resp)
if attempts < self.max_retries
&& RETRYABLE_ERROR_CODES.contains(&resp.status().as_u16()) =>
{
task::sleep(delay).await;
attempts += 1;
delay *= 2;
}
Ok(resp) => return Ok(resp),
Err(e) => return Err(Error::Reqwest(e)),
}
}
}
}
65 changes: 49 additions & 16 deletions src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::str::FromStr;
use std::thread;
use std::time::Duration;

#[allow(unused_imports)]
use log::{debug, error, info, trace};

use minreq::{Proxy, Request};
use minreq::{Proxy, Request, Response};

use bitcoin::consensus::{deserialize, serialize, Decodable};
use bitcoin::hashes::{sha256, Hash};
Expand All @@ -27,7 +29,10 @@ use bitcoin::{
block::Header as BlockHeader, Block, BlockHash, MerkleBlock, Script, Transaction, Txid,
};

use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
use crate::{
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
RETRYABLE_ERROR_CODES,
};

#[derive(Debug, Clone)]
pub struct BlockingClient {
Expand All @@ -39,6 +44,10 @@ pub struct BlockingClient {
pub timeout: Option<u64>,
/// HTTP headers to set on every request made to Esplora server
pub headers: HashMap<String, String>,
/// Backoff
pub backoff: Duration,
/// Max retries
pub max_retries: u32,
}

impl BlockingClient {
Expand All @@ -49,6 +58,8 @@ impl BlockingClient {
proxy: builder.proxy,
timeout: builder.timeout,
headers: builder.headers,
backoff: builder.backoff,
max_retries: builder.max_retries,
}
}

Expand Down Expand Up @@ -80,20 +91,20 @@ impl BlockingClient {
}

fn get_opt_response<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(Some(deserialize::<T>(resp.as_bytes())?)),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_opt_response_txid(&self, path: &str) -> Result<Option<Txid>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
Expand All @@ -103,12 +114,12 @@ impl BlockingClient {
Ok(resp) => Ok(Some(
Txid::from_str(resp.as_str().map_err(Error::Minreq)?).map_err(Error::HexToArray)?,
)),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_opt_response_hex<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
Expand All @@ -122,12 +133,12 @@ impl BlockingClient {
.map_err(Error::BitcoinEncoding)
.map(|r| Some(r))
}
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Expand All @@ -138,51 +149,51 @@ impl BlockingClient {
let hex_vec = Vec::from_hex(hex_str).unwrap();
deserialize::<T>(&hex_vec).map_err(Error::BitcoinEncoding)
}
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_response_json<'a, T: serde::de::DeserializeOwned>(
&'a self,
path: &'a str,
) -> Result<T, Error> {
let response = self.get_request(path)?.send();
let response = self.get_with_retry(path);
match response {
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(resp.json::<T>().map_err(Error::Minreq)?),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_opt_response_json<T: serde::de::DeserializeOwned>(
&self,
path: &str,
) -> Result<Option<T>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(Some(resp.json::<T>()?)),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_response_str(&self, path: &str) -> Result<String, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(resp.as_str()?.to_string()),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

Expand Down Expand Up @@ -339,6 +350,28 @@ impl BlockingClient {
};
self.get_response_json(&path)
}

/// Sends a GET request to the given `url`, retrying failed attempts
/// for retryable error codes until max retries hit.
pub fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
let mut attempts = 0;
let mut delay = self.backoff;

loop {
match self.get_request(url)?.send() {
Ok(resp)
if attempts < self.max_retries
&& RETRYABLE_ERROR_CODES.contains(&(resp.status_code as u16)) =>
{
thread::sleep(delay);
attempts += 1;
delay *= 2;
}
Ok(resp) => return Ok(resp),
Err(e) => return Err(Error::Minreq(e)),
}
}
}
}

fn is_status_ok(status: i32) -> bool {
Expand Down
31 changes: 31 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
use std::collections::HashMap;
use std::fmt;
use std::num::TryFromIntError;
use std::time::Duration;

pub mod api;

Expand All @@ -83,6 +84,18 @@ pub use blocking::BlockingClient;
#[cfg(feature = "async")]
pub use r#async::AsyncClient;

/// Response status codes for which the request may be retried.
const RETRYABLE_ERROR_CODES: [u16; 2] = [
429, // TOO_MANY_REQUESTS
503, // SERVICE_UNAVAILABLE
];

/// Default backoff.
const DEFAULT_BACKOFF: Duration = Duration::from_millis(512);

/// Default max retries.
const DEFAULT_MAX_RETRIES: u32 = 3;

/// Get a fee value in sats/vbytes from the estimates
/// that matches the confirmation target set as parameter.
///
Expand Down Expand Up @@ -117,6 +130,10 @@ pub struct Builder {
pub timeout: Option<u64>,
/// HTTP headers to set on every request made to Esplora server.
pub headers: HashMap<String, String>,
/// Backoff
pub backoff: Duration,
/// Max retries
pub max_retries: u32,
}

impl Builder {
Expand All @@ -127,6 +144,8 @@ impl Builder {
proxy: None,
timeout: None,
headers: HashMap::new(),
backoff: DEFAULT_BACKOFF,
max_retries: DEFAULT_MAX_RETRIES,
}
}

Expand All @@ -148,6 +167,18 @@ impl Builder {
self
}

/// Set backoff.
pub fn backoff(mut self, duration: Duration) -> Self {
self.backoff = duration;
self
}

/// Set max retries.
pub fn max_retries(mut self, count: u32) -> Self {
self.max_retries = count;
self
}

/// Build a blocking client from builder
#[cfg(feature = "blocking")]
pub fn build_blocking(self) -> BlockingClient {
Expand Down

0 comments on commit 1f67ece

Please sign in to comment.