Skip to content

Commit

Permalink
Make DaService async (#508)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkolad authored Jul 17, 2023
1 parent 7f98e41 commit 70e563d
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rust-version = "1.66"
jmt = "0.6.0"

# External dependencies
async-trait = "0.1.71"
anyhow = "1.0.68"
borsh = { version = "0.10.3", features = ["rc", "bytes"] }
byteorder = "1.4.3"
Expand Down
1 change: 1 addition & 0 deletions adapters/celestia/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tendermint = "0.32"
tendermint-proto = "0.32"

# Convenience
async-trait = { workspace = true }
anyhow = { workspace = true }
sha2 = { workspace = true }
base64 = "0.13.1"
Expand Down
159 changes: 76 additions & 83 deletions adapters/celestia/src/da_service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;

use async_trait::async_trait;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::params::ArrayParams;
use jsonrpsee::http_client::{HeaderMap, HttpClient};
Expand Down Expand Up @@ -113,18 +112,17 @@ const fn default_request_timeout_seconds() -> u64 {
60
}

#[async_trait]
impl DaService for CelestiaService {
type RuntimeConfig = DaServiceConfig;

type Spec = CelestiaSpec;

type FilteredBlock = FilteredCelestiaBlock;

type Future<T> = Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send>>;

type Error = BoxError;

fn new(config: Self::RuntimeConfig, chain_params: RollupParams) -> Self {
async fn new(config: Self::RuntimeConfig, chain_params: RollupParams) -> Self {
let client = {
let mut headers = HeaderMap::new();
headers.insert(
Expand All @@ -147,74 +145,70 @@ impl DaService for CelestiaService {
Self::with_client(client, chain_params.namespace)
}

fn get_finalized_at(&self, height: u64) -> Self::Future<Self::FilteredBlock> {
async fn get_finalized_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
let client = self.client.clone();
let rollup_namespace = self.rollup_namespace;
Box::pin(async move {
let _span = span!(Level::TRACE, "fetching finalized block", height = height);
// Fetch the header and relevant shares via RPC
info!("Fetching header at height={}...", height);
let header = client
.request::<serde_json::Value, _>("header.GetByHeight", vec![height])
.await?;
debug!(header_result = ?header);
debug!("Fetching shares...");
let (rollup_shares, tx_data) =
fetch_needed_shares_by_header(rollup_namespace, &client, &header).await?;

debug!("Fetching EDS...");
// Fetch entire extended data square
let data_square = client
.request::<ExtendedDataSquare, _>(
"share.GetEDS",
vec![header
.get("dah")
.ok_or(BoxError::msg("missing 'dah' in block header"))?],
)
.await?;

let unmarshalled_header: CelestiaHeaderResponse = serde_json::from_value(header)?;
let dah: DataAvailabilityHeader = unmarshalled_header.dah.try_into()?;
debug!("Parsing namespaces...");
// Parse out all of the rows containing etxs
let etx_rows =
get_rows_containing_namespace(PFB_NAMESPACE, &dah, data_square.rows()?.into_iter())
.await?;
// Parse out all of the rows containing rollup data
let rollup_rows = get_rows_containing_namespace(
rollup_namespace,
&dah,
data_square.rows()?.into_iter(),
let _span = span!(Level::TRACE, "fetching finalized block", height = height);
// Fetch the header and relevant shares via RPC
info!("Fetching header at height={}...", height);
let header = client
.request::<serde_json::Value, _>("header.GetByHeight", vec![height])
.await?;
debug!(header_result = ?header);
debug!("Fetching shares...");
let (rollup_shares, tx_data) =
fetch_needed_shares_by_header(rollup_namespace, &client, &header).await?;

debug!("Fetching EDS...");
// Fetch entire extended data square
let data_square = client
.request::<ExtendedDataSquare, _>(
"share.GetEDS",
vec![header
.get("dah")
.ok_or(BoxError::msg("missing 'dah' in block header"))?],
)
.await?;

debug!("Decoding pfb protobufs...");
// Parse out the pfds and store them for later retrieval
let pfds = parse_pfb_namespace(tx_data)?;
let mut pfd_map = HashMap::new();
for tx in pfds {
for (idx, nid) in tx.0.namespace_ids.iter().enumerate() {
if nid == &rollup_namespace.0[..] {
// TODO: Retool this map to avoid cloning txs
pfd_map.insert(tx.0.share_commitments[idx].clone(), tx.clone());
}
let unmarshalled_header: CelestiaHeaderResponse = serde_json::from_value(header)?;
let dah: DataAvailabilityHeader = unmarshalled_header.dah.try_into()?;
debug!("Parsing namespaces...");
// Parse out all of the rows containing etxs
let etx_rows =
get_rows_containing_namespace(PFB_NAMESPACE, &dah, data_square.rows()?.into_iter())
.await?;
// Parse out all of the rows containing rollup data
let rollup_rows =
get_rows_containing_namespace(rollup_namespace, &dah, data_square.rows()?.into_iter())
.await?;

debug!("Decoding pfb protobufs...");
// Parse out the pfds and store them for later retrieval
let pfds = parse_pfb_namespace(tx_data)?;
let mut pfd_map = HashMap::new();
for tx in pfds {
for (idx, nid) in tx.0.namespace_ids.iter().enumerate() {
if nid == &rollup_namespace.0[..] {
// TODO: Retool this map to avoid cloning txs
pfd_map.insert(tx.0.share_commitments[idx].clone(), tx.clone());
}
}
}

let filtered_block = FilteredCelestiaBlock {
header: CelestiaHeader::new(dah, unmarshalled_header.header.into()),
rollup_data: rollup_shares,
relevant_pfbs: pfd_map,
rollup_rows,
pfb_rows: etx_rows,
};
let filtered_block = FilteredCelestiaBlock {
header: CelestiaHeader::new(dah, unmarshalled_header.header.into()),
rollup_data: rollup_shares,
relevant_pfbs: pfd_map,
rollup_rows,
pfb_rows: etx_rows,
};

Ok::<Self::FilteredBlock, BoxError>(filtered_block)
})
Ok::<Self::FilteredBlock, BoxError>(filtered_block)
}

fn get_block_at(&self, height: u64) -> Self::Future<Self::FilteredBlock> {
self.get_finalized_at(height)
async fn get_block_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
self.get_finalized_at(height).await
}

fn extract_relevant_txs(
Expand Down Expand Up @@ -261,7 +255,7 @@ impl DaService for CelestiaService {
(etx_proofs.0, rollup_row_proofs.0)
}

fn send_transaction(&self, blob: &[u8]) -> <Self as DaService>::Future<()> {
async fn send_transaction(&self, blob: &[u8]) -> Result<(), Self::Error> {
// https://node-rpc-docs.celestia.org/
let client = self.client.clone();
info!("Sending {} bytes of raw data to Celestia.", blob.len());
Expand All @@ -271,26 +265,24 @@ impl DaService for CelestiaService {
// We factor extra share to be occupied for namespace, which is pessimistic
let gas_limit = get_gas_limit_for_bytes(blob.len());

Box::pin(async move {
let mut params = ArrayParams::new();
params.insert(namespace)?;
params.insert(blob)?;
params.insert(fee.to_string())?;
params.insert(gas_limit)?;
// Note, we only deserialize what we can use, other fields might be left over
let response = client
.request::<CelestiaBasicResponse, _>("state.SubmitPayForBlob", params)
.await?;
if !response.is_success() {
anyhow::bail!("Error returned from Celestia node: {:?}", response);
}
debug!("Response after submitting blob: {:?}", response);
info!(
"Blob has been submitted to Celestia. tx-hash={}",
response.tx_hash,
);
Ok::<(), BoxError>(())
})
let mut params = ArrayParams::new();
params.insert(namespace)?;
params.insert(blob)?;
params.insert(fee.to_string())?;
params.insert(gas_limit)?;
// Note, we only deserialize what we can use, other fields might be left over
let response = client
.request::<CelestiaBasicResponse, _>("state.SubmitPayForBlob", params)
.await?;
if !response.is_success() {
anyhow::bail!("Error returned from Celestia node: {:?}", response);
}
debug!("Response after submitting blob: {:?}", response);
info!(
"Blob has been submitted to Celestia. tx-hash={}",
response.tx_hash,
);
Ok::<(), BoxError>(())
}
}

Expand Down Expand Up @@ -406,7 +398,8 @@ mod tests {
RollupParams {
namespace: NamespaceId(namespace),
},
);
)
.await;

(mock_server, config, da_service, namespace)
}
Expand Down
Loading

0 comments on commit 70e563d

Please sign in to comment.