Skip to content

Commit 72c6288

Browse files
committed
shim: parallel submission
rollkit demo submits a lot of blobs. In my testing I saw dozens of blobs submitted at the same time. Ofc submitting those serially won't cut it, esp. so if waiting until finalization. This changeset solves the problem by splitting the signing part, which is still serial, and the submission, which is now parallel. As a drive-by fix, all the errors for rollkit are now encapsulated into an enum. Closes #259
1 parent c58e0ba commit 72c6288

File tree

5 files changed

+249
-65
lines changed

5 files changed

+249
-65
lines changed

ikura/shim/src/cmd/query/submit.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@ pub async fn run(params: Params) -> anyhow::Result<()> {
2020
let namespace = read_namespace(&namespace)?;
2121
let client = connect_rpc(rpc).await?;
2222
tracing::info!("submitting blob to namespace {}", namespace);
23-
let (block_hash, _) = client.submit_blob(blob, namespace, key).await?;
23+
let nonce = client.get_last_nonce(&key).await?;
24+
let blob_extrinsic = client
25+
.make_blob_extrinsic(blob, namespace, &key, nonce)
26+
.await?;
27+
let (block_hash, _) = client.submit_blob(&blob_extrinsic).await?;
2428
tracing::info!("submitted blob to block hash 0x{}", hex::encode(block_hash));
2529
Ok(())
2630
}

ikura/shim/src/dock/rollkit.rs

+134-47
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use std::{collections::HashMap, fmt};
1+
use std::{collections::HashMap, fmt, sync::Arc};
2+
use tokio::sync::Mutex;
23
use tonic::{transport::Server, Request, Response, Status};
34
use tracing::info;
45

@@ -57,6 +58,7 @@ struct RollkitDock {
5758
client: ikura_rpc::Client,
5859
submit_key: Option<Keypair>,
5960
namespace: Option<ikura_nmt::Namespace>,
61+
cur_nonce: Arc<Mutex<Option<u64>>>,
6062
}
6163

6264
impl RollkitDock {
@@ -69,6 +71,7 @@ impl RollkitDock {
6971
client,
7072
submit_key,
7173
namespace,
74+
cur_nonce: Arc::new(Mutex::new(None)),
7275
}
7376
}
7477
}
@@ -91,18 +94,16 @@ impl da_service_server::DaService for RollkitDock {
9194
let mut cache = HashMap::new();
9295
let mut response = GetResponse { blobs: vec![] };
9396
for (index, id) in ids.into_iter().enumerate() {
94-
let blob_id = BlobId::try_from(id)
95-
.map_err(|_| Status::invalid_argument(format!("not a valid ID at {index}")))?;
97+
let blob_id =
98+
BlobId::try_from(id).map_err(|_| RollkitDockError::GetInvalidBlobId { index })?;
9699
let block_number = blob_id.block_number;
97100
if !cache.contains_key(&block_number) {
98101
let block_hash = self.client.await_finalized_height(block_number).await;
99102
let block = self
100103
.client
101104
.await_block_at(Some(block_hash))
102105
.await
103-
.map_err(|_| {
104-
Status::internal("failed to retrieve block number {block_number}")
105-
})?;
106+
.map_err(|_| RollkitDockError::GetRetrieveBlock { block_number })?;
106107
cache.insert(blob_id.block_number, block);
107108
}
108109
// unwrap: at this point we know the block is in the cache, because at this point
@@ -117,7 +118,7 @@ impl da_service_server::DaService for RollkitDock {
117118
value: needle.data.clone(),
118119
});
119120
} else {
120-
return Err(Status::not_found(format!("blob not found at {blob_id}")));
121+
return Err(RollkitDockError::CantResolveBlobId(blob_id).into());
121122
}
122123
}
123124
Ok(Response::new(response))
@@ -158,57 +159,73 @@ impl da_service_server::DaService for RollkitDock {
158159
.submit_key
159160
.as_ref()
160161
.cloned()
161-
.ok_or_else(|| Status::failed_precondition("no key for signing blobs"))?;
162-
let namespace = self.namespace.ok_or_else(|| {
163-
Status::failed_precondition("no namespace provided, and no default namespace set")
164-
})?;
162+
.ok_or_else(|| RollkitDockError::NoSigningKey)?;
163+
let namespace = self
164+
.namespace
165+
.ok_or_else(|| RollkitDockError::NamespaceNotProvided)?;
165166
let SubmitRequest {
166167
blobs,
167168
gas_price: _,
168169
} = request.into_inner();
169-
let mut response = SubmitResponse {
170-
ids: vec![],
171-
proofs: vec![],
172-
};
173170
let blob_n = blobs.len();
171+
172+
// First, prepare a list of extrinsics to submit.
173+
let mut extrinsics = vec![];
174174
for (i, blob) in blobs.into_iter().enumerate() {
175175
let data_hash = sha2_hash(&blob.value);
176-
info!(
177-
"submitting blob {i}/{blob_n} (0x{}) to namespace {}",
178-
hex::encode(&data_hash),
179-
namespace,
180-
);
181-
let (block_hash, extrinsic_index) = self
182-
.client
183-
.submit_blob(blob.value, namespace, submit_key.clone())
176+
let nonce = self
177+
.gen_nonce()
184178
.await
185-
.map_err(|err| Status::internal(format!("failed to submit blob: {err}")))?;
186-
// TODO: getting the whole block is a bit inefficient, consider optimizing.
187-
let block_number = match self
179+
.map_err(RollkitDockError::NonceGeneration)?;
180+
let extrinsic = self
188181
.client
189-
.await_block_at(Some(block_hash))
182+
.make_blob_extrinsic(blob.value, namespace, &submit_key, nonce)
190183
.await
191-
.map(|block| block.number)
192-
{
193-
Ok(block_number) => block_number,
194-
Err(err) => {
195-
return Err(Status::internal(format!(
196-
"failed to obtain block number for 0x{}: {:?}",
197-
hex::encode(&block_hash),
198-
err,
199-
)));
200-
}
201-
};
202-
let blob_id = BlobId {
203-
block_number,
204-
extrinsic_index,
205-
data_hash,
206-
};
207-
info!("blob landed: {blob_id}");
208-
response.ids.push(blob_id.into());
209-
response.proofs.push(pbda::Proof { value: vec![] });
184+
.map_err(RollkitDockError::MakeSubmitBlobExtrinsic)?;
185+
extrinsics.push((i, data_hash, extrinsic));
210186
}
211-
Ok(Response::new(response))
187+
188+
// Then, submit the extrinsics in parallel and collect the results.
189+
let futs = extrinsics
190+
.into_iter()
191+
.map(|(i, data_hash, extrinsic)| async move {
192+
info!(
193+
"submitting blob {i}/{blob_n} (0x{}) to namespace {}",
194+
hex::encode(&data_hash),
195+
namespace
196+
);
197+
let (block_hash, extrinsic_index) = self
198+
.client
199+
.submit_blob(&extrinsic)
200+
.await
201+
.map_err(RollkitDockError::SubmitBlob)?;
202+
// TODO: getting the whole block is a bit inefficient, consider optimizing.
203+
let block_number = match self
204+
.client
205+
.await_block_at(Some(block_hash))
206+
.await
207+
.map(|block| block.number)
208+
{
209+
Ok(block_number) => block_number,
210+
Err(err) => {
211+
return Err(RollkitDockError::SubmitRetrieveBlockNumber {
212+
block_hash,
213+
err,
214+
});
215+
}
216+
};
217+
let blob_id = BlobId {
218+
block_number,
219+
extrinsic_index,
220+
data_hash,
221+
};
222+
info!("blob landed: {blob_id}");
223+
Ok(blob_id.into())
224+
});
225+
226+
let ids: Vec<_> = futures::future::try_join_all(futs).await?;
227+
let proofs = ids.iter().map(|_| pbda::Proof { value: vec![] }).collect();
228+
Ok(Response::new(SubmitResponse { proofs, ids }))
212229
}
213230

214231
async fn validate(
@@ -241,11 +258,81 @@ impl da_service_server::DaService for RollkitDock {
241258
}
242259
}
243260

261+
impl RollkitDock {
262+
/// Generates a new nonce suitable for signing an extrinsic from the signer.
263+
async fn gen_nonce(&self) -> anyhow::Result<u64> {
264+
let submit_key = self
265+
.submit_key
266+
.as_ref()
267+
.ok_or_else(|| anyhow::anyhow!("no key for signing blobs"))?; // should be unreachable
268+
let mut cur_nonce = self.cur_nonce.lock().await;
269+
let nonce = match *cur_nonce {
270+
Some(nonce) => nonce,
271+
None => self.client.get_last_nonce(&submit_key).await?,
272+
};
273+
cur_nonce.replace(nonce + 1);
274+
Ok(nonce)
275+
}
276+
}
277+
244278
fn sha2_hash(data: &[u8]) -> [u8; 32] {
245279
use sha2::Digest;
246280
sha2::Sha256::digest(data).into()
247281
}
248282

283+
enum RollkitDockError {
284+
NoSigningKey,
285+
MakeSubmitBlobExtrinsic(anyhow::Error),
286+
SubmitBlob(anyhow::Error),
287+
NonceGeneration(anyhow::Error),
288+
GetInvalidBlobId {
289+
index: usize,
290+
},
291+
GetRetrieveBlock {
292+
block_number: u64,
293+
},
294+
SubmitRetrieveBlockNumber {
295+
block_hash: [u8; 32],
296+
err: anyhow::Error,
297+
},
298+
CantResolveBlobId(BlobId),
299+
NamespaceNotProvided,
300+
}
301+
302+
impl From<RollkitDockError> for Status {
303+
fn from(me: RollkitDockError) -> Status {
304+
use RollkitDockError::*;
305+
match me {
306+
NoSigningKey => {
307+
Status::failed_precondition("the key for signing blobs is not provided")
308+
}
309+
MakeSubmitBlobExtrinsic(err) => {
310+
Status::internal(format!("failed to create a submit blob extrinsic: {err}"))
311+
}
312+
SubmitBlob(err) => Status::internal(format!("failed to submit blob: {err}")),
313+
NonceGeneration(err) => Status::internal(format!("failed to generate a nonce: {err}")),
314+
GetInvalidBlobId { index } => {
315+
Status::invalid_argument(format!("not a valid blob ID at index {index}"))
316+
}
317+
GetRetrieveBlock { block_number } => {
318+
Status::internal(format!("failed to retrieve block number {block_number}"))
319+
}
320+
SubmitRetrieveBlockNumber { block_hash, err } => Status::internal(format!(
321+
"failed to obtain block number for 0x{}: {}",
322+
hex::encode(block_hash),
323+
err,
324+
)),
325+
CantResolveBlobId(blob_id) => {
326+
Status::not_found(format!("cannot resolve blob ID: {blob_id}"))
327+
}
328+
NamespaceNotProvided => Status::failed_precondition(
329+
"no namespace provided, and no default names
330+
pace set",
331+
),
332+
}
333+
}
334+
}
335+
249336
struct BlobId {
250337
/// The block number at which the blob in question has been landed.
251338
block_number: u64,

ikura/shim/src/dock/rpc_error.rs

+19
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,25 @@ pub fn no_signing_key() -> ErrorObjectOwned {
88
)
99
}
1010

11+
pub fn nonce_obtain_error(e: anyhow::Error) -> ErrorObjectOwned {
12+
ErrorObjectOwned::owned(
13+
jsonrpsee::types::error::INTERNAL_ERROR_CODE,
14+
format!("Internal Error: failed to obtain nonce: {:?}", e),
15+
None::<()>,
16+
)
17+
}
18+
19+
pub fn submit_extrinsic_error(e: anyhow::Error) -> ErrorObjectOwned {
20+
ErrorObjectOwned::owned(
21+
jsonrpsee::types::error::INTERNAL_ERROR_CODE,
22+
format!(
23+
"Internal Error: failed to create a submit blob extrinsic: {:?}",
24+
e
25+
),
26+
None::<()>,
27+
)
28+
}
29+
1130
pub fn submission_error(e: anyhow::Error) -> ErrorObjectOwned {
1231
ErrorObjectOwned::owned(
1332
jsonrpsee::types::error::INTERNAL_ERROR_CODE,

ikura/shim/src/dock/sovereign.rs

+32-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
use std::sync::Arc;
2+
13
use ikura_shim_common_sovereign::{Block, SovereignRPCServer};
24
use jsonrpsee::{server::Server, types::ErrorObjectOwned};
5+
use tokio::sync::Mutex;
36
use tracing::info;
47

58
use super::rpc_error as err;
@@ -32,11 +35,16 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
3235
struct SovereignDock {
3336
client: ikura_rpc::Client,
3437
submit_key: Option<Keypair>,
38+
cur_nonce: Arc<Mutex<Option<u64>>>,
3539
}
3640

3741
impl SovereignDock {
3842
fn new(client: ikura_rpc::Client, submit_key: Option<Keypair>) -> Self {
39-
Self { client, submit_key }
43+
Self {
44+
client,
45+
submit_key,
46+
cur_nonce: Arc::new(Mutex::new(None)),
47+
}
4048
}
4149
}
4250

@@ -81,14 +89,36 @@ impl SovereignRPCServer for SovereignDock {
8189
.as_ref()
8290
.cloned()
8391
.ok_or_else(err::no_signing_key)?;
92+
let nonce = self.gen_nonce().await.map_err(err::nonce_obtain_error)?;
93+
let blob_extrinsic = self
94+
.client
95+
.make_blob_extrinsic(blob, namespace, &submit_key, nonce)
96+
.await
97+
.map_err(err::submit_extrinsic_error)?;
8498
self.client
85-
.submit_blob(blob, namespace, submit_key)
99+
.submit_blob(&blob_extrinsic)
86100
.await
87101
.map_err(err::submission_error)?;
88102
Ok(())
89103
}
90104
}
91105

106+
impl SovereignDock {
107+
async fn gen_nonce(&self) -> anyhow::Result<u64> {
108+
let submit_key = self
109+
.submit_key
110+
.as_ref()
111+
.ok_or_else(|| anyhow::anyhow!("no key for signing blobs"))?; // should be unreachable
112+
let mut cur_nonce = self.cur_nonce.lock().await;
113+
let nonce = match *cur_nonce {
114+
Some(nonce) => nonce,
115+
None => self.client.get_last_nonce(&submit_key).await?,
116+
};
117+
cur_nonce.replace(nonce + 1);
118+
Ok(nonce)
119+
}
120+
}
121+
92122
/// Creates a namespace proof for the given namespace in the given block.
93123
fn make_namespace_proof(
94124
block: &ikura_rpc::Block,

0 commit comments

Comments
 (0)