Skip to content

Commit

Permalink
Use tokio unboundedsender / receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
rakanalh committed Jan 25, 2025
1 parent 177b0b7 commit fc8ed59
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 25 deletions.
3 changes: 2 additions & 1 deletion bin/citrea/tests/evm/fee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn test_minimum_base_fee() -> Result<(), anyhow::Error> {
let rollup_config =
create_default_rollup_config(true, &sequencer_db_dir, &da_db_dir, NodeMode::SequencerNode);
let sequencer_config = SequencerConfig::default();
tokio::spawn(async {
let seq_task = tokio::spawn(async {
// Don't provide a prover since the EVM is not currently provable
start_rollup(
port_tx,
Expand Down Expand Up @@ -59,5 +59,6 @@ async fn test_minimum_base_fee() -> Result<(), anyhow::Error> {
// Base fee should at most be 0.01 gwei
assert_eq!(block.header.base_fee_per_gas.unwrap(), 10000000);

seq_task.abort();
Ok(())
}
4 changes: 2 additions & 2 deletions bin/citrea/tests/test_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub async fn start_rollup(
);
let span = info_span!("Sequencer");

let (mut sequencer, rpc_methods) = CitreaRollupBlueprint::create_sequencer(
let (mut sequencer, rpc_module) = CitreaRollupBlueprint::create_sequencer(
&mock_demo_rollup,
genesis_config,
rollup_config.clone(),
Expand All @@ -180,7 +180,7 @@ pub async fn start_rollup(
start_rpc_server(
rollup_config.rpc,
&mut task_manager,
rpc_methods,
rpc_module,
Some(rpc_reporting_channel),
);

Expand Down
5 changes: 2 additions & 3 deletions crates/sequencer/src/commitment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use futures::channel::mpsc::UnboundedReceiver;
use futures::StreamExt;
use parking_lot::RwLock;
use rs_merkle::algorithms::Sha256;
use rs_merkle::MerkleTree;
Expand All @@ -14,6 +12,7 @@ use sov_modules_api::StateDiff;
use sov_rollup_interface::da::{BlockHeaderTrait, DaTxRequest, SequencerCommitment};
use sov_rollup_interface::services::da::{DaService, TxRequestWithNotifier};
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument};
Expand Down Expand Up @@ -73,7 +72,7 @@ where
_ = cancellation_token.cancelled() => {
return;
},
info = self.soft_confirmation_rx.next() => {
info = self.soft_confirmation_rx.recv() => {
let Some((height, state_diff)) = info else {
// An error is returned because the channel is either
// closed or lagged.
Expand Down
4 changes: 2 additions & 2 deletions crates/sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use citrea_common::RollupPublicKeys;
pub use citrea_common::SequencerConfig;
use db_provider::DbProvider;
use deposit_data_mempool::DepositDataMempool;
use futures::channel::mpsc::unbounded;
use jsonrpsee::RpcModule;
use mempool::CitreaMempool;
use parking_lot::Mutex;
Expand All @@ -20,6 +19,7 @@ use sov_rollup_interface::services::da::DaService;
use sov_state::ProverStorage;
use sov_stf_runner::InitParams;
use tokio::sync::broadcast;
use tokio::sync::mpsc::unbounded_channel;

mod commitment;
pub mod db_migrations;
Expand Down Expand Up @@ -51,7 +51,7 @@ where
DB: SequencerLedgerOps + Send + Sync + Clone + 'static,
RT: Runtime<C, Da::Spec>,
{
let (l2_force_block_tx, l2_force_block_rx) = unbounded();
let (l2_force_block_tx, l2_force_block_rx) = unbounded_channel();
// used as client of reth's mempool
let db_provider = DbProvider::new(prover_storage.clone());
let mempool = Arc::new(CitreaMempool::new(
Expand Down
19 changes: 8 additions & 11 deletions crates/sequencer/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use alloy_eips::eip2718::Encodable2718;
use alloy_network::AnyNetwork;
use alloy_primitives::{Bytes, B256};
use citrea_evm::Evm;
use futures::channel::mpsc::UnboundedSender;
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::error::{INTERNAL_ERROR_CODE, INTERNAL_ERROR_MSG};
Expand All @@ -17,6 +16,7 @@ use reth_rpc_types_compat::transaction::from_recovered;
use reth_transaction_pool::{EthPooledTransaction, PoolTransaction};
use sov_db::ledger_db::SequencerLedgerOps;
use sov_modules_api::{Context, WorkingSet};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error};

use crate::deposit_data_mempool::DepositDataMempool;
Expand Down Expand Up @@ -213,16 +213,13 @@ impl<C: sov_modules_api::Context, DB: SequencerLedgerOps + Send + Sync + 'static
}

debug!("Sequencer: citrea_testPublishBlock");
self.context
.l2_force_block_tx
.unbounded_send(())
.map_err(|e| {
ErrorObjectOwned::owned(
INTERNAL_ERROR_CODE,
INTERNAL_ERROR_MSG,
Some(format!("Could not send L2 force block transaction: {e}")),
)
})
self.context.l2_force_block_tx.send(()).map_err(|e| {
ErrorObjectOwned::owned(
INTERNAL_ERROR_CODE,
INTERNAL_ERROR_MSG,
Some(format!("Could not send L2 force block transaction: {e}")),
)
})
}
}

Expand Down
11 changes: 5 additions & 6 deletions crates/sequencer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use citrea_evm::{CallMessage, RlpEvmTransaction, MIN_TRANSACTION_GAS};
use citrea_primitives::basefee::calculate_next_block_base_fee;
use citrea_primitives::types::SoftConfirmationHash;
use citrea_stf::runtime::Runtime;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::StreamExt;
use parking_lot::Mutex;
use reth_execution_types::ChangedAccount;
use reth_provider::{AccountReader, BlockReaderIdExt};
Expand All @@ -43,6 +41,7 @@ use sov_rollup_interface::stf::StateTransitionFunction;
use sov_state::ProverStorage;
use sov_stf_runner::InitParams;
use tokio::signal;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::{broadcast, mpsc};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -591,7 +590,7 @@ where

// Setup required workers to update our knowledge of the DA layer every X seconds (configurable).
let (da_height_update_tx, mut da_height_update_rx) = mpsc::channel(1);
let (da_commitment_tx, da_commitment_rx) = unbounded::<(u64, StateDiff)>();
let (da_commitment_tx, da_commitment_rx) = unbounded_channel::<(u64, StateDiff)>();

let mut commitment_service = CommitmentService::new(
self.ledger_db.clone(),
Expand Down Expand Up @@ -646,7 +645,7 @@ where
// If sequencer is in test mode, it will build a block every time it receives a message
// The RPC from which the sender can be called is only registered for test mode. This means
// that evey though we check the receiver here, it'll never be "ready" to be consumed unless in test mode.
_ = self.l2_force_block_rx.next(), if self.config.test_mode => {
_ = self.l2_force_block_rx.recv(), if self.config.test_mode => {
if missed_da_blocks_count > 0 {
if let Err(e) = self.process_missed_da_blocks(missed_da_blocks_count, last_used_l1_height, l1_fee_rate).await {
error!("Sequencer error: {}", e);
Expand All @@ -663,7 +662,7 @@ where
// Only errors when there are no receivers
let _ = self.soft_confirmation_tx.send(l2_height);

let _ = da_commitment_tx.unbounded_send((l2_height, state_diff));
let _ = da_commitment_tx.send((l2_height, state_diff));
},
Err(e) => {
error!("Sequencer error: {}", e);
Expand Down Expand Up @@ -695,7 +694,7 @@ where
// Only errors when there are no receivers
let _ = self.soft_confirmation_tx.send(l2_height);

let _ = da_commitment_tx.unbounded_send((l2_height, state_diff));
let _ = da_commitment_tx.send((l2_height, state_diff));
},
Err(e) => {
error!("Sequencer error: {}", e);
Expand Down

0 comments on commit fc8ed59

Please sign in to comment.