diff --git a/Cargo.lock b/Cargo.lock index 1c16c8d1f97..4a0b9031c9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,7 @@ dependencies = [ "auto_impl", "c-kzg", "derive_more 1.0.0", + "k256", "serde", ] @@ -289,6 +290,7 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "derive_more 1.0.0", + "k256", "serde", ] @@ -4789,6 +4791,7 @@ dependencies = [ "ethexe-sequencer", "ethexe-service-utils", "ethexe-signer", + "ethexe-tx-pool", "ethexe-validator", "futures", "futures-timer", @@ -4801,7 +4804,9 @@ dependencies = [ "ntest", "parity-scale-codec", "rand", + "reqwest", "serde", + "serde_json", "static_init", "tempfile", "tokio", @@ -4835,6 +4840,23 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ethexe-tx-pool" +version = "1.7.1" +dependencies = [ + "anyhow", + "ethexe-common", + "ethexe-db", + "ethexe-signer", + "futures", + "gear-utils", + "gprimitives", + "hex", + "log", + "parity-scale-codec", + "tokio", +] + [[package]] name = "ethexe-validator" version = "1.7.1" diff --git a/Cargo.toml b/Cargo.toml index 154a3cda7e3..8f87c10e8d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -311,6 +311,7 @@ ethexe-prometheus = { path = "ethexe/prometheus", default-features = false } ethexe-validator = { path = "ethexe/validator", default-features = false } ethexe-rpc = { path = "ethexe/rpc", default-features = false } ethexe-common = { path = "ethexe/common", default-features = false } +ethexe-tx-pool = { path = "ethexe/tx-pool", default-features = false } # Common executor between `sandbox-host` and `lazy-pages-fuzzer` wasmi = { package = "wasmi", version = "0.38" } diff --git a/core/src/lib.rs b/core/src/lib.rs index 4905fe394ae..4c8fcee6bec 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -44,5 +44,7 @@ pub mod reservation; pub mod str; pub mod tasks; +pub use ids::{hash, hash_of_array}; + // This allows all casts from u32 into usize be safe. const _: () = assert!(size_of::() <= size_of::()); diff --git a/ethexe/cli/src/commands/tx.rs b/ethexe/cli/src/commands/tx.rs index b14411050d3..a9226dc22ec 100644 --- a/ethexe/cli/src/commands/tx.rs +++ b/ethexe/cli/src/commands/tx.rs @@ -218,6 +218,7 @@ impl TxCommand { } // TODO (breathx): impl reply, value claim and exec balance top up with watch. +// TODO (breathx) submit offchain txs /// Available transaction to submit. #[derive(Debug, Subcommand)] pub enum TxSubcommand { diff --git a/ethexe/common/src/lib.rs b/ethexe/common/src/lib.rs index 30dd41081d4..f429e0772fa 100644 --- a/ethexe/common/src/lib.rs +++ b/ethexe/common/src/lib.rs @@ -25,6 +25,7 @@ extern crate alloc; pub mod db; pub mod events; pub mod gear; +pub mod tx_pool; pub use gear_core; pub use gprimitives; diff --git a/ethexe/common/src/tx_pool.rs b/ethexe/common/src/tx_pool.rs new file mode 100644 index 00000000000..bcdebc0a9e1 --- /dev/null +++ b/ethexe/common/src/tx_pool.rs @@ -0,0 +1,132 @@ +// This file is part of Gear. +// +// Copyright (C) 2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! ethexe tx pool types + +use alloc::vec::Vec; +use core::fmt; +use gprimitives::{H160, H256}; +use parity_scale_codec::{Decode, Encode}; +use serde::{Deserialize, Serialize}; + +/// Ethexe transaction with a signature. +#[derive(Clone, Encode, Decode, PartialEq, Eq, Serialize, Deserialize)] +pub struct SignedOffchainTransaction { + pub signature: Vec, + pub transaction: OffchainTransaction, +} + +impl SignedOffchainTransaction { + /// Ethexe transaction blake2b256 hash. + pub fn tx_hash(&self) -> H256 { + gear_core::hash(&self.encode()).into() + } + + /// Ethexe transaction reference block hash + /// + /// Reference block hash is used for a transaction mortality check. + pub fn reference_block(&self) -> H256 { + self.transaction.reference_block + } +} + +impl fmt::Debug for SignedOffchainTransaction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SignedOffchainTransaction") + .field("signature", &hex::encode(&self.signature)) + .field("transaction", &self.transaction) + .finish() + } +} + +impl fmt::Display for SignedOffchainTransaction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "SignedOffchainTransaction {{ signature: 0x{}, transaction: {} }}", + hex::encode(&self.signature), + self.transaction + ) + } +} + +/// Ethexe offchain transaction with a reference block for mortality. +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Serialize, Deserialize)] +pub struct OffchainTransaction { + pub raw: RawOffchainTransaction, + pub reference_block: H256, +} + +impl OffchainTransaction { + /// Recent block hashes window size used to check transaction mortality. + /// + /// ### Rationale + /// The constant could have been defined in the `ethexe-db`, + /// but defined here to ease upgrades without invalidation of the transactions + /// stores. + pub const BLOCK_HASHES_WINDOW_SIZE: u32 = 32; +} + +impl fmt::Display for OffchainTransaction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "OffchainTransaction {{ raw: {}, reference_block: {} }}", + self.raw, self.reference_block + ) + } +} + +/// Raw ethexe offchain transaction. +/// +/// A particular job to be processed without external specifics. +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Serialize, Deserialize)] +pub enum RawOffchainTransaction { + SendMessage { program_id: H160, payload: Vec }, +} + +impl RawOffchainTransaction { + /// Gets the program id of the transaction. + pub fn program_id(&self) -> H160 { + match self { + RawOffchainTransaction::SendMessage { program_id, .. } => *program_id, + } + } + + /// Gets the payload of the transaction. + pub fn payload(&self) -> &[u8] { + match self { + RawOffchainTransaction::SendMessage { payload, .. } => payload, + } + } +} + +impl fmt::Display for RawOffchainTransaction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RawOffchainTransaction::SendMessage { + program_id, + payload, + } => f + .debug_struct("SendMessage") + .field("program_id", program_id) + .field("payload", &hex::encode(payload)) + .finish(), + } + } +} diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index 78d6db05e49..ccf4129a28c 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -22,10 +22,12 @@ use crate::{ overlay::{CASOverlay, KVOverlay}, CASDatabase, KVDatabase, }; +use anyhow::{bail, Result}; use ethexe_common::{ db::{BlockHeader, BlockMetaStorage, CodeInfo, CodesStorage, Schedule}, events::BlockRequestEvent, gear::StateTransition, + tx_pool::{OffchainTransaction, SignedOffchainTransaction}, }; use ethexe_runtime_common::state::{ Allocations, DispatchStash, HashOf, Mailbox, MemoryPages, MemoryPagesRegion, MessageQueue, @@ -58,6 +60,7 @@ enum KeyPrefix { CodeValid = 10, BlockStartSchedule = 11, BlockEndSchedule = 12, + Transaction = 13, } impl KeyPrefix { @@ -444,6 +447,60 @@ impl Database { self.cas.write(data) } + pub fn get_offchain_transaction(&self, tx_hash: H256) -> Option { + self.kv + .get(&KeyPrefix::Transaction.one(tx_hash)) + .map(|data| { + Decode::decode(&mut data.as_slice()) + .expect("failed to data into `SignedTransaction`") + }) + } + + pub fn set_offchain_transaction(&self, tx: SignedOffchainTransaction) { + let tx_hash = tx.tx_hash(); + self.kv + .put(&KeyPrefix::Transaction.one(tx_hash), tx.encode()); + } + + pub fn check_within_recent_blocks(&self, reference_block_hash: H256) -> Result<()> { + const ERR_MSG: &str = "Reference block isn't within recent blocks window"; + + let Some((latest_valid_block_hash, latest_valid_block_header)) = self.latest_valid_block() + else { + bail!("No latest valid block found"); + }; + let Some(reference_block_header) = self.block_header(reference_block_hash) else { + bail!("No reference block found"); + }; + + // If reference block is far away from the latest valid block, it's not in the window. + let Some(actual_window) = latest_valid_block_header + .height + .checked_sub(reference_block_header.height) + else { + bail!("Can't calculate actual window: reference block hash doesn't suit actual blocks state"); + }; + + if actual_window > OffchainTransaction::BLOCK_HASHES_WINDOW_SIZE { + bail!(ERR_MSG); + } + + // Check against reorgs. + let mut block_hash = latest_valid_block_hash; + for _ in 0..OffchainTransaction::BLOCK_HASHES_WINDOW_SIZE { + if block_hash == reference_block_hash { + return Ok(()); + } + + let Some(block_header) = self.block_header(block_hash) else { + bail!("{ERR_MSG}, after possible reorg reference block hash is not actual"); + }; + block_hash = block_header.parent_hash; + } + + bail!(ERR_MSG); + } + fn block_small_meta(&self, block_hash: H256) -> Option { self.kv .get(&KeyPrefix::BlockSmallMeta.two(self.router_address, block_hash)) diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index 8421da4706a..2606c1ba85a 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -367,7 +367,7 @@ impl NetworkService { topic, }, .. - }) if gpu_commitments_topic().hash() == topic => { + }) if commitments_topic().hash() == topic || offchain_tx_topic().hash() == topic => { return Some(NetworkEvent::Message { source, data }); } BehaviourEvent::Gossipsub(gossipsub::Event::GossipsubNotSupported { peer_id }) => { @@ -414,9 +414,20 @@ impl NetworkService { .swarm .behaviour_mut() .gossipsub - .publish(gpu_commitments_topic(), data) + .publish(commitments_topic(), data) { - log::debug!("gossipsub publishing failed: {e}") + log::error!("gossipsub publishing failed: {e}") + } + } + + pub fn publish_offchain_transaction(&mut self, data: Vec) { + if let Err(e) = self + .swarm + .behaviour_mut() + .gossipsub + .publish(offchain_tx_topic(), data) + { + log::error!("gossipsub publishing failed: {e}") } } @@ -532,7 +543,8 @@ impl Behaviour { ) .map_err(|e| anyhow!("`gossipsub` scoring parameters error: {e}"))?; - gossipsub.subscribe(&gpu_commitments_topic())?; + gossipsub.subscribe(&commitments_topic())?; + gossipsub.subscribe(&offchain_tx_topic())?; let db_sync = db_sync::Behaviour::new(db_sync::Config::default(), peer_score_handle, db); @@ -550,9 +562,13 @@ impl Behaviour { } } -fn gpu_commitments_topic() -> gossipsub::IdentTopic { +fn commitments_topic() -> gossipsub::IdentTopic { // TODO: use router address in topic name to avoid obsolete router - gossipsub::IdentTopic::new("gpu-commitments") + gossipsub::IdentTopic::new("ethexe-commitments") +} + +fn offchain_tx_topic() -> gossipsub::IdentTopic { + gossipsub::IdentTopic::new("ethexe-tx-pool") } #[cfg(test)] diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index 0757b75a0af..1e917ef154d 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -10,7 +10,7 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { workspace = true } +tokio = { workspace = true, features = ["sync"] } anyhow.workspace = true futures.workspace = true gprimitives = { workspace = true, features = ["serde", "ethexe"] } diff --git a/ethexe/rpc/src/apis/mod.rs b/ethexe/rpc/src/apis/mod.rs index ebe699a238c..683653c0d2d 100644 --- a/ethexe/rpc/src/apis/mod.rs +++ b/ethexe/rpc/src/apis/mod.rs @@ -20,8 +20,10 @@ mod block; mod code; mod dev; mod program; +mod tx_pool; pub use block::{BlockApi, BlockServer}; pub use code::{CodeApi, CodeServer}; pub use dev::{DevApi, DevServer}; pub use program::{ProgramApi, ProgramServer}; +pub use tx_pool::{TransactionPoolApi, TransactionPoolServer}; diff --git a/ethexe/rpc/src/apis/tx_pool.rs b/ethexe/rpc/src/apis/tx_pool.rs new file mode 100644 index 00000000000..e7908c010ea --- /dev/null +++ b/ethexe/rpc/src/apis/tx_pool.rs @@ -0,0 +1,90 @@ +// This file is part of Gear. +// +// Copyright (C) 2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Transaction pool rpc interface. + +use crate::{errors, RpcEvent}; +use ethexe_common::tx_pool::{OffchainTransaction, SignedOffchainTransaction}; +use gprimitives::H256; +use jsonrpsee::{ + core::{async_trait, RpcResult}, + proc_macros::rpc, +}; +use tokio::sync::{mpsc, oneshot}; + +#[rpc(server)] +pub trait TransactionPool { + #[method(name = "transactionPool_sendMessage")] + async fn send_message( + &self, + ethexe_tx: OffchainTransaction, + signature: Vec, + ) -> RpcResult; +} + +#[derive(Clone)] +pub struct TransactionPoolApi { + rpc_sender: mpsc::UnboundedSender, +} + +impl TransactionPoolApi { + pub fn new(rpc_sender: mpsc::UnboundedSender) -> Self { + Self { rpc_sender } + } +} + +#[async_trait] +impl TransactionPoolServer for TransactionPoolApi { + async fn send_message( + &self, + ethexe_tx: OffchainTransaction, + signature: Vec, + ) -> RpcResult { + let signed_ethexe_tx = SignedOffchainTransaction { + transaction: ethexe_tx, + signature, + }; + log::debug!("Called send_message with vars: {signed_ethexe_tx:#?}"); + + let (response_sender, response_receiver) = oneshot::channel(); + self.rpc_sender + .send(RpcEvent::OffchainTransaction { + transaction: signed_ethexe_tx, + response_sender, + }) + .map_err(|e| { + // That could be a panic case, as rpc_receiver must not be dropped, + // but the main service works independently from rpc and can be malformed. + log::error!( + "Failed to send `RpcEvent::OffchainTransaction` event task: {e}. \ + The receiving end in the main service might have been dropped." + ); + errors::internal() + })?; + + let res = response_receiver.await.map_err(|e| { + // No panic case, as a responsibility of the RPC API is fulfilled. + // The dropped sender signalizes that the main service has crashed + // or is malformed, so problems should be handled there. + log::error!("Response sender for the `RpcEvent::OffchainTransaction` was dropped: {e}"); + errors::internal() + })?; + + res.map_err(errors::tx_pool) + } +} diff --git a/ethexe/rpc/src/errors.rs b/ethexe/rpc/src/errors.rs index 84ef36694e6..c2eed838471 100644 --- a/ethexe/rpc/src/errors.rs +++ b/ethexe/rpc/src/errors.rs @@ -18,6 +18,8 @@ use jsonrpsee::types::ErrorObject; +// TODO #4364: https://github.com/gear-tech/gear/issues/4364 + pub fn db(err: &'static str) -> ErrorObject<'static> { ErrorObject::owned(8000, "Database error", Some(err)) } @@ -29,3 +31,7 @@ pub fn runtime(err: anyhow::Error) -> ErrorObject<'static> { pub fn internal() -> ErrorObject<'static> { ErrorObject::owned(8000, "Internal error", None::<&str>) } + +pub fn tx_pool(err: anyhow::Error) -> ErrorObject<'static> { + ErrorObject::owned(8000, "Transaction pool error", Some(format!("{err}"))) +} diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index ac0732d8ebc..5daaa92aaed 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -19,10 +19,13 @@ use anyhow::{anyhow, Result}; use apis::{ BlockApi, BlockServer, CodeApi, CodeServer, DevApi, DevServer, ProgramApi, ProgramServer, + TransactionPoolApi, TransactionPoolServer, }; +use ethexe_common::tx_pool::SignedOffchainTransaction; use ethexe_db::Database; use ethexe_observer::MockBlobReader; use futures::{stream::FusedStream, FutureExt, Stream}; +use gprimitives::H256; use jsonrpsee::{ server::{ serve_with_graceful_shutdown, stop_channel, Server, ServerHandle, StopHandle, @@ -31,7 +34,6 @@ use jsonrpsee::{ Methods, RpcModule as JsonrpcModule, }; use std::{ - mem, net::SocketAddr, pin::Pin, sync::Arc, @@ -39,7 +41,7 @@ use std::{ }; use tokio::{ net::TcpListener, - sync::mpsc::{self, UnboundedReceiver}, + sync::{mpsc, oneshot}, }; use tower::Service; @@ -89,9 +91,6 @@ impl RpcService { pub async fn run_server(self) -> Result<(ServerHandle, RpcReceiver)> { let (rpc_sender, rpc_receiver) = mpsc::unbounded_channel(); - // TODO: Temporary solution, will be changed with introducing tx pool. - mem::forget(rpc_sender); - let listener = TcpListener::bind(self.config.listen_addr).await?; let cors = util::try_into_cors(self.config.cors)?; @@ -106,6 +105,9 @@ impl RpcService { module.merge(ProgramServer::into_rpc(ProgramApi::new(self.db.clone())))?; module.merge(BlockServer::into_rpc(BlockApi::new(self.db.clone())))?; module.merge(CodeServer::into_rpc(CodeApi::new(self.db.clone())))?; + module.merge(TransactionPoolServer::into_rpc(TransactionPoolApi::new( + rpc_sender, + )))?; if self.config.dev { module.merge(DevServer::into_rpc(DevApi::new( @@ -184,7 +186,7 @@ impl RpcService { } } -pub struct RpcReceiver(UnboundedReceiver); +pub struct RpcReceiver(mpsc::UnboundedReceiver); impl Stream for RpcReceiver { type Item = RpcEvent; @@ -201,4 +203,9 @@ impl FusedStream for RpcReceiver { } #[derive(Debug)] -pub enum RpcEvent {} +pub enum RpcEvent { + OffchainTransaction { + transaction: SignedOffchainTransaction, + response_sender: oneshot::Sender>, + }, +} diff --git a/ethexe/service/Cargo.toml b/ethexe/service/Cargo.toml index 5ba04ec07dc..dbf2039dfb3 100644 --- a/ethexe/service/Cargo.toml +++ b/ethexe/service/Cargo.toml @@ -23,6 +23,7 @@ ethexe-common.workspace = true ethexe-runtime-common.workspace = true ethexe-prometheus.workspace = true ethexe-rpc.workspace = true +ethexe-tx-pool.workspace = true gprimitives.workspace = true clap = { workspace = true, features = ["derive"] } @@ -68,7 +69,8 @@ ntest = "0.9.3" gear-core.workspace = true gear-core-errors.workspace = true gear-utils.workspace = true -ethexe-network.workspace = true +reqwest.workspace = true +serde_json.workspace = true demo-ping = { workspace = true, features = ["debug", "ethexe"] } demo-async = { workspace = true, features = ["debug", "ethexe"] } diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 7ff067f0f78..284e644a283 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -29,11 +29,13 @@ use ethexe_network::{db_sync, NetworkEvent, NetworkService}; use ethexe_observer::{MockBlobReader, ObserverEvent, ObserverService, RequestBlockData}; use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_prometheus::{PrometheusEvent, PrometheusService}; +use ethexe_rpc::RpcEvent; use ethexe_sequencer::{ agro::AggregatedCommitments, SequencerConfig, SequencerEvent, SequencerService, }; use ethexe_service_utils::{OptionFuture as _, OptionStreamNext as _}; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; +use ethexe_tx_pool::{SignedOffchainTransaction, TxPoolService}; use ethexe_validator::BlockCommitmentValidationRequest; use futures::StreamExt; use gprimitives::H256; @@ -53,6 +55,7 @@ pub struct Service { router_query: RouterQuery, processor: ethexe_processor::Processor, signer: ethexe_signer::Signer, + tx_pool: TxPoolService, // Optional services network: Option, @@ -77,6 +80,9 @@ pub enum NetworkMessage { codes: Option<(Digest, Signature)>, blocks: Option<(Digest, Signature)>, }, + OffchainTransaction { + transaction: SignedOffchainTransaction, + }, } impl Service { @@ -227,6 +233,8 @@ impl Service { ethexe_rpc::RpcService::new(config.clone(), db.clone(), mock_blob_reader.clone()) }); + let tx_pool = TxPoolService::new(db.clone()); + Ok(Self { db, network, @@ -239,6 +247,7 @@ impl Service { validator, prometheus, rpc, + tx_pool, }) } @@ -259,6 +268,7 @@ impl Service { router_query: RouterQuery, processor: ethexe_processor::Processor, signer: ethexe_signer::Signer, + tx_pool: TxPoolService, network: Option, sequencer: Option, validator: Option, @@ -277,6 +287,7 @@ impl Service { validator, prometheus, rpc, + tx_pool, } } @@ -386,6 +397,7 @@ impl Service { let mut commitments = vec![]; let last_committed_chain = query.get_last_committed_chain(block_data.hash).await?; + log::debug!("Last committed chain {:#?}", last_committed_chain); for block_hash in last_committed_chain.into_iter().rev() { let transitions = Self::process_one_block(db, query, processor, block_hash).await?; @@ -414,9 +426,8 @@ impl Service { } pub async fn run(self) -> Result<()> { - self.run_inner().await.map_err(|err| { + self.run_inner().await.inspect_err(|err| { log::error!("Service finished work with error: {err:?}"); - err }) } @@ -430,6 +441,7 @@ impl Service { mut processor, mut sequencer, signer: _signer, + tx_pool, mut validator, mut prometheus, rpc, @@ -672,6 +684,11 @@ impl Service { } } }, + NetworkMessage::OffchainTransaction { transaction } => { + if let Err(e) = Self::process_offchain_transaction(transaction, &tx_pool, &db, network.as_mut()) { + log::warn!("Failed to process offchain transaction received by p2p: {e}"); + } + }, }; } NetworkEvent::ExternalValidation(validating_response) => { @@ -716,6 +733,25 @@ impl Service { } event = rpc_receiver.maybe_next_some() => { log::info!("Received RPC event {event:#?}"); + + match event { + RpcEvent::OffchainTransaction { transaction, response_sender } => { + let res = Self::process_offchain_transaction( + transaction, + &tx_pool, + &db, + network.as_mut(), + ).context("Failed to process offchain transaction received from RPC"); + + + if let Err(e) = response_sender.send(res) { + // No panic case as a responsibility of the service is fulfilled. + // The dropped receiver signalizes that the rpc service has crashed + // or is malformed, so problems should be handled there. + log::error!("Response receiver for the `RpcEvent::OffchainTransaction` was dropped: {e:#?}"); + } + } + } } _ = rpc_handle.as_mut().maybe() => { log::info!("`RPCWorker` has terminated, shutting down..."); @@ -747,4 +783,38 @@ impl Service { Ok(true) } + + fn process_offchain_transaction( + transaction: SignedOffchainTransaction, + tx_pool: &TxPoolService, + db: &Database, + network: Option<&mut NetworkService>, + ) -> Result { + let validated_tx = tx_pool + .validate(transaction) + .context("Failed to validate offchain transaction")?; + let tx_hash = validated_tx.tx_hash(); + + // Set valid transaction + db.set_offchain_transaction(validated_tx.clone()); + + // Try propagate transaction + if let Some(n) = network { + n.publish_offchain_transaction( + NetworkMessage::OffchainTransaction { + transaction: validated_tx, + } + .encode(), + ); + } else { + log::debug!( + "Validated offchain transaction won't be propagated, network service isn't defined" + ); + } + + // TODO (breathx) Execute transaction + log::info!("Unimplemented tx execution"); + + Ok(tx_hash) + } } diff --git a/ethexe/service/src/tests.rs b/ethexe/service/src/tests.rs index c57b5046cb0..06a69e0bb82 100644 --- a/ethexe/service/src/tests.rs +++ b/ethexe/service/src/tests.rs @@ -40,6 +40,7 @@ use ethexe_prometheus::PrometheusConfig; use ethexe_rpc::RpcConfig; use ethexe_runtime_common::state::{Storage, ValueWithExpiry}; use ethexe_signer::Signer; +use ethexe_tx_pool::{OffchainTransaction, RawOffchainTransaction, SignedOffchainTransaction}; use ethexe_validator::Validator; use gear_core::{ ids::prelude::*, @@ -963,11 +964,135 @@ async fn multiple_validators() { assert_eq!(res.reply_payload, res.message_id.encode().as_slice()); } +#[tokio::test(flavor = "multi_thread")] +#[ntest::timeout(120_000)] +async fn tx_pool_gossip() { + gear_utils::init_default_logger(); + + let test_env_config = TestEnvConfig { + validators: ValidatorsConfig::Generated(2), + ..Default::default() + }; + + // Setup env of 2 nodes, one of them knows about the other one. + let mut env = TestEnv::new(test_env_config).await.unwrap(); + + log::info!("📗 Starting node 0"); + let mut node0 = env.new_node( + NodeConfig::default() + .validator(env.validators[0]) + .service_rpc(9505) + .network(None, None), + ); + node0.start_service().await; + + log::info!("📗 Starting node 1"); + let mut node1 = env.new_node( + NodeConfig::default() + .validator(env.validators[1]) + .network(None, node0.multiaddr.clone()), + ); + node1.start_service().await; + + log::info!("Populate node-0 and node-1 with 2 valid blocks"); + + env.observer + .provider() + .evm_mine(None) + .await + .expect("failed mining a new block"); + env.observer + .provider() + .evm_mine(None) + .await + .expect("failed mining a new block"); + + // Give some time for nodes to process the blocks + tokio::time::sleep(Duration::from_secs(2)).await; + let reference_block = node0 + .db + .latest_valid_block() + .expect("at least genesis block is latest valid") + .0; + + // Prepare tx data + let signed_ethexe_tx = { + let sender_pub_key = env.signer.generate_key().expect("failed generating key"); + + let ethexe_tx = OffchainTransaction { + raw: RawOffchainTransaction::SendMessage { + program_id: H160::random(), + payload: vec![], + }, + // referring to the latest valid block hash + reference_block, + }; + let signature = env + .signer + .sign(sender_pub_key, ethexe_tx.encode().as_ref()) + .expect("failed signing tx"); + SignedOffchainTransaction { + signature: signature.encode(), + transaction: ethexe_tx, + } + }; + + // Send request + log::info!("Sending tx pool request to node-1"); + let resp = send_json_request(node0.service_rpc_url().expect("rpc server is set"), || { + serde_json::json!({ + "jsonrpc": "2.0", + "method": "transactionPool_sendMessage", + "params": { + "ethexe_tx": signed_ethexe_tx.transaction, + "signature": signed_ethexe_tx.signature, + }, + "id": 1, + }) + }) + .await + .expect("failed sending request"); + + assert!(resp.status().is_success()); + + // This way the response from RPC server is checked to be `Ok`. + // In case of error RPC returns the `Ok` response with error message. + let resp = resp + .json::() + .await + .expect("failed to deserialize json response from rpc"); + assert!(resp.get("result").is_some()); + + // Tx executable validation takes time. + // Sleep for a while so tx is processed by both nodes. + tokio::time::sleep(Duration::from_secs(12)).await; + + // Check that node-1 received the message + let tx_hash = signed_ethexe_tx.tx_hash(); + let node1_db_tx = node1 + .db + .get_offchain_transaction(tx_hash) + .expect("tx not found"); + assert_eq!(node1_db_tx, signed_ethexe_tx); +} + +async fn send_json_request( + rpc_server_url: String, + create_request: impl Fn() -> serde_json::Value, +) -> Result { + let client = reqwest::Client::new(); + let req_body = create_request(); + + client.post(rpc_server_url).json(&req_body).send().await +} + mod utils { use super::*; use ethexe_network::export::Multiaddr; use ethexe_observer::{ObserverEvent, ObserverService, SimpleBlockData}; + use ethexe_rpc::RpcService; use ethexe_sequencer::{SequencerConfig, SequencerService}; + use ethexe_tx_pool::TxPoolService; use futures::StreamExt; use gear_core::message::ReplyCode; use std::{ @@ -1160,6 +1285,7 @@ mod utils { sequencer_public_key, validator_public_key, network, + service_rpc_config, } = config; let db = @@ -1192,6 +1318,7 @@ mod utils { validator_public_key, network_address, network_bootstrap_address, + service_rpc_config, } } @@ -1385,6 +1512,8 @@ mod utils { pub validator_public_key: Option, /// Network configuration, if provided then new node starts with network. pub network: Option, + /// RPC configuration, if provided then new node starts with RPC service. + pub service_rpc_config: Option, } impl NodeConfig { @@ -1414,6 +1543,17 @@ mod utils { }); self } + + pub fn service_rpc(mut self, rpc_port: u16) -> Self { + let service_rpc_config = RpcConfig { + listen_addr: SocketAddr::new("127.0.0.1".parse().unwrap(), rpc_port), + cors: None, + dev: false, + }; + self.service_rpc_config = Some(service_rpc_config); + + self + } } #[derive(Default)] @@ -1553,6 +1693,7 @@ mod utils { validator_public_key: Option, network_address: Option, network_bootstrap_address: Option, + service_rpc_config: Option, } impl Node { @@ -1628,6 +1769,12 @@ mod utils { None => None, }; + let tx_pool_service = TxPoolService::new(self.db.clone()); + + let rpc = self.service_rpc_config.as_ref().map(|service_rpc_config| { + RpcService::new(service_rpc_config.clone(), self.db.clone(), None) + }); + let service = Service::new_from_parts( self.db.clone(), self.observer.clone(), @@ -1635,11 +1782,12 @@ mod utils { router_query, processor, self.signer.clone(), + tx_pool_service, network, sequencer, validator, None, - None, + rpc, ); let handle = task::spawn(service.run()); self.running_service_handle = Some(handle); @@ -1657,6 +1805,12 @@ mod utils { let _ = handle.await; self.multiaddr = None; } + + pub fn service_rpc_url(&self) -> Option { + self.service_rpc_config + .as_ref() + .map(|rpc| format!("http://{}", rpc.listen_addr)) + } } #[derive(Clone)] diff --git a/ethexe/signer/Cargo.toml b/ethexe/signer/Cargo.toml index 6a2d8a310e7..2b43ce2f028 100644 --- a/ethexe/signer/Cargo.toml +++ b/ethexe/signer/Cargo.toml @@ -24,4 +24,4 @@ secp256k1 = { version = "0.30", features = ["rand", "global-context", "hashes", sha3 = { version = "0.10", default-features = false } [dev-dependencies] -alloy.workspace = true +alloy = { workspace = true, features = ["k256"] } diff --git a/ethexe/signer/src/address.rs b/ethexe/signer/src/address.rs new file mode 100644 index 00000000000..f441d3ab55a --- /dev/null +++ b/ethexe/signer/src/address.rs @@ -0,0 +1,126 @@ +// This file is part of Gear. +// +// Copyright (C) 2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Ethereum address. + +use crate::PublicKey; +use anyhow::{anyhow, Error, Result}; +use gprimitives::{ActorId, H160}; +use parity_scale_codec::{Decode, Encode}; +use sha3::Digest as _; +use std::{fmt, str::FromStr}; + +/// Ethereum address type. +/// +/// Basically a 20 bytes buffer, which is obtained from the least significant 20 bytes +/// of the hashed with keccak256 public key. +#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Address(pub [u8; 20]); + +impl Address { + /// Address hex string. + pub fn to_hex(&self) -> String { + hex::encode(self.0) + } +} + +impl From<[u8; 20]> for Address { + fn from(value: [u8; 20]) -> Self { + Self(value) + } +} + +impl From for Address { + fn from(value: H160) -> Self { + Self(value.into()) + } +} + +impl From for Address { + fn from(key: PublicKey) -> Self { + let public_key_uncompressed = secp256k1::PublicKey::from(key).serialize_uncompressed(); + + let mut address = Address::default(); + let hash = sha3::Keccak256::digest(&public_key_uncompressed[1..]); + address.0[..20].copy_from_slice(&hash[12..]); + + address + } +} + +impl FromStr for Address { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(Self(crate::decode_to_array(s)?)) + } +} + +/// Tries to convert `ActorId`` into `Address`. +/// +/// Succeeds if first 12 bytes are 0. +impl TryFrom for Address { + type Error = Error; + + fn try_from(id: ActorId) -> Result { + id.as_ref() + .iter() + .take(12) + .all(|&byte| byte == 0) + .then_some(Address(id.to_address_lossy().0)) + .ok_or_else(|| anyhow!("First 12 bytes are not 0, it is not ethereum address")) + } +} + +impl From for Address { + fn from(value: u64) -> Self { + let actor_id = ActorId::from(value); + actor_id + .try_into() + .expect("actor id from `u64` has first 12 bytes being 0") + } +} + +impl From
for ActorId { + fn from(value: Address) -> Self { + H160(value.0).into() + } +} + +impl fmt::Debug for Address { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "0x{}", self.to_hex()) + } +} + +impl fmt::Display for Address { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "0x{}", self.to_hex()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_u64_to_address() { + // Does not panic + let _ = Address::from(u64::MAX / 2); + } +} diff --git a/ethexe/signer/src/digest.rs b/ethexe/signer/src/digest.rs index a29e299c7c2..5066ca2691e 100644 --- a/ethexe/signer/src/digest.rs +++ b/ethexe/signer/src/digest.rs @@ -16,7 +16,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! Keccak256 digest type. Implements AsDigest hashing for ethexe common types. +//! Keccak256 digest type. +//! +//! Implements `ToDigest` hashing for ethexe common types. use core::fmt; use ethexe_common::gear::{ @@ -25,6 +27,7 @@ use ethexe_common::gear::{ use parity_scale_codec::{Decode, Encode}; use sha3::Digest as _; +/// Common digest type for the ethexe. #[derive( Clone, Copy, @@ -39,7 +42,7 @@ use sha3::Digest as _; derive_more::Into, derive_more::AsRef, )] -pub struct Digest([u8; 32]); +pub struct Digest(pub(crate) [u8; 32]); impl fmt::Debug for Digest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/ethexe/signer/src/lib.rs b/ethexe/signer/src/lib.rs index 5db87627e1f..27138400cda 100644 --- a/ethexe/signer/src/lib.rs +++ b/ethexe/signer/src/lib.rs @@ -17,142 +17,130 @@ // along with this program. If not, see . //! Signer library for ethexe. - +//! +//! The crate defines types and related logic for private keys, public keys types, +//! cryptographic signatures and ethereum address. +//! +//! Cryptographic instrumentary of the crate is based on secp256k1 standard +//! using [secp256k1](https://crates.io/crates/secp256k1) crate, but all the +//! machinery used is wrapped in the crate's types. + +mod address; mod digest; mod signature; +// Exports +pub use address::Address; pub use digest::{Digest, ToDigest}; -use secp256k1::hashes::hex::{Case, DisplayHex}; pub use sha3; pub use signature::Signature; -use anyhow::{anyhow, bail, Result}; -use gprimitives::{ActorId, H160}; +use anyhow::{anyhow, bail, Error, Result}; use parity_scale_codec::{Decode, Encode}; -use sha3::Digest as _; +use secp256k1::{ + hashes::hex::{Case, DisplayHex}, + PublicKey as Secp256k1PublicKey, SecretKey as Secp256k1SecretKey, +}; use signature::RawSignature; use std::{fmt, fs, path::PathBuf, str::FromStr}; -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub struct PublicKey(pub [u8; 33]); - +/// Private key. +/// +/// Private key type used for elliptic curves maths for secp256k1 standard +/// is a 256 bits unsigned integer, which the type stores as a 32 bytes array. #[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, Hash)] pub struct PrivateKey(pub [u8; 32]); -impl From for PublicKey { +impl From for Secp256k1SecretKey { fn from(key: PrivateKey) -> Self { - let secret_key = - secp256k1::SecretKey::from_slice(&key.0[..]).expect("32 bytes, within curve order"); - let public_key = secp256k1::PublicKey::from_secret_key_global(&secret_key); - - PublicKey::from_bytes(public_key.serialize()) + Secp256k1SecretKey::from_byte_array(&key.0).expect("32 bytes; within curve order") } } -#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Address(pub [u8; 20]); - -impl From<[u8; 20]> for Address { - fn from(value: [u8; 20]) -> Self { - Self(value) - } -} +impl FromStr for PrivateKey { + type Err = Error; -impl From for Address { - fn from(value: H160) -> Self { - Self(value.into()) + fn from_str(s: &str) -> Result { + Ok(Self(decode_to_array(s)?)) } } -impl TryFrom for Address { - type Error = anyhow::Error; - - fn try_from(id: ActorId) -> std::result::Result { - id.as_ref() - .iter() - .take(12) - .all(|&byte| byte == 0) - .then_some(Address(id.to_address_lossy().0)) - .ok_or_else(|| anyhow!("First 12 bytes are not 0, it is not ethereum address")) - } -} +/// Public key. +/// +/// Basically, public key is a point on the elliptic curve, which should have +/// two coordinates - `x` and `y`, both 256 bits unsigned integers. But it's possible +/// to store only `x` coordinate, as `y` can be calculated. +/// +/// As the secp256k1 elliptic curve is symmetric, the y can be either positive or +/// negative. To stress the exact position of the `y` the prefix byte is used, so +/// the public key becomes 33 bytes, not 32. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct PublicKey(pub [u8; 33]); -impl From
for ActorId { - fn from(value: Address) -> Self { - H160(value.0).into() - } -} +impl PublicKey { + /// Create public key from the private key. + /// + /// Only `ethexe-signer` types are used. + pub fn from_private(private_key: PrivateKey) -> Self { + let secret_key = private_key.into(); + let public_key = Secp256k1PublicKey::from_secret_key_global(&secret_key); -fn strip_prefix(s: &str) -> &str { - if let Some(s) = s.strip_prefix("0x") { - s - } else { - s + public_key.into() } -} -fn decode_to_array(s: &str) -> Result<[u8; N]> { - let mut buf = [0; N]; - hex::decode_to_slice(strip_prefix(s), &mut buf) - .map_err(|_| anyhow!("invalid hex format for {s:?}"))?; - Ok(buf) -} + pub fn try_from_slice(slice: &[u8]) -> Result { + let bytes = <[u8; 33]>::try_from(slice)?; -impl FromStr for PrivateKey { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - Ok(Self(decode_to_array(s)?)) + Ok(Self::from_bytes(bytes)) } -} -impl PublicKey { + /// Create public key from compressed public key bytes. pub fn from_bytes(bytes: [u8; 33]) -> Self { Self(bytes) } + /// Public key hex string. pub fn to_hex(&self) -> String { hex::encode(self.0) } + /// Convert public key to ethereum address. pub fn to_address(&self) -> Address { - let public_key_uncompressed = secp256k1::PublicKey::from_slice(&self.0) - .expect("Invalid public key") - .serialize_uncompressed(); - - let mut address = Address::default(); - let hash = sha3::Keccak256::digest(&public_key_uncompressed[1..]); - address.0[..20].copy_from_slice(&hash[12..]); - - address + (*self).into() } } -impl FromStr for PublicKey { - type Err = anyhow::Error; +impl From for PublicKey { + fn from(key: PrivateKey) -> Self { + Self::from_private(key) + } +} - fn from_str(s: &str) -> Result { - Ok(Self(decode_to_array(s)?)) +impl From for PublicKey { + fn from(key: Secp256k1PublicKey) -> Self { + Self(key.serialize()) } } -impl Address { - pub fn to_hex(&self) -> String { - hex::encode(self.0) +impl From for Secp256k1PublicKey { + fn from(key: PublicKey) -> Self { + Secp256k1PublicKey::from_byte_array_compressed(&key.0).expect("invalid public key") } } -impl FromStr for Address { - type Err = anyhow::Error; +impl FromStr for PublicKey { + type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { Ok(Self(decode_to_array(s)?)) } } -impl fmt::Debug for Address { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "0x{}", self.to_hex()) +impl TryFrom<&[u8]> for PublicKey { + type Error = Error; + + fn try_from(data: &[u8]) -> Result { + Self::try_from_slice(data) } } @@ -162,24 +150,21 @@ impl fmt::Display for PublicKey { } } -impl fmt::Display for Address { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "0x{}", self.to_hex()) - } -} - +/// Signer which signs data using owned key store. #[derive(Debug, Clone)] pub struct Signer { key_store: PathBuf, } impl Signer { + /// Create a new signer with a key store location. pub fn new(key_store: PathBuf) -> Result { fs::create_dir_all(key_store.as_path())?; Ok(Self { key_store }) } + /// Create a new signer with a key temporary key store location. pub fn tmp() -> Self { let temp_dir = tempfile::tempdir().expect("Cannot create temp dir for keys"); Self { @@ -187,34 +172,40 @@ impl Signer { } } + /// Create a ECDSA recoverable signature with `Electrum` notation for the `v` value. + /// + /// For more info about `v` value read [`RawSignature`] docs. pub fn raw_sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { let private_key = self.get_private_key(public_key)?; RawSignature::create_for_digest(private_key, digest) } + /// Create a ECDSA recoverable signature. + // TODO #4365 pub fn sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { let private_key = self.get_private_key(public_key)?; Signature::create_for_digest(private_key, digest) } + /// Create a ECDSA recoverable signature for the raw bytes data. pub fn sign(&self, public_key: PublicKey, data: &[u8]) -> Result { self.sign_digest(public_key, data.to_digest()) } + /// Create a ECDSA recoverable signature for the raw bytes data with + /// an ethereum address provided instead of the public key. + /// + /// If the private key for the ethereum address is stored, the signature will be returned. pub fn sign_with_addr(&self, address: Address, data: &[u8]) -> Result { - let keys = self.list_keys()?; - - for key in keys { - if key.to_address() == address { - return self.sign(key, data); - } + match self.get_key_by_addr(address)? { + Some(public_key) => self.sign(public_key, data), + None => bail!("Address not found: {}", address), } - - bail!("Address not found: {}", address); } + /// Get a public key for the provided ethereum address. If no key found a `None` is returned. pub fn get_key_by_addr(&self, address: Address) -> Result> { let keys = self.list_keys()?; @@ -227,50 +218,55 @@ impl Signer { Ok(None) } + /// Check if key exists for the ethereum address. pub fn has_addr(&self, address: Address) -> Result { Ok(self.get_key_by_addr(address)?.is_some()) } + /// Check if key exists in the key store. pub fn has_key(&self, key: PublicKey) -> Result { let key_path = self.key_store.join(key.to_hex()); let has_key = fs::metadata(key_path).is_ok(); Ok(has_key) } + /// Add a private key to the key store. pub fn add_key(&self, key: PrivateKey) -> Result { - let secret_key = - secp256k1::SecretKey::from_slice(&key.0[..]).expect("32 bytes, within curve order"); - let public_key = secp256k1::PublicKey::from_secret_key_global(&secret_key); + let public_key: PublicKey = key.into(); - let local_public = PublicKey::from_bytes(public_key.serialize()); + let key_file = self.key_store.join(public_key.to_hex()); + fs::write(key_file, key.0)?; - let key_file = self.key_store.join(local_public.to_hex()); - fs::write(key_file, secret_key.secret_bytes())?; - Ok(local_public) + Ok(public_key) } + /// Generate a new private key and return a public key for it. pub fn generate_key(&self) -> Result { - let (secret_key, public_key) = + let (secp256k1_secret_key, secp256k1_public_key) = secp256k1::generate_keypair(&mut secp256k1::rand::thread_rng()); - let local_public = PublicKey::from_bytes(public_key.serialize()); + let public_key: PublicKey = secp256k1_public_key.into(); log::debug!( "Secret key generated: {}", - secret_key.secret_bytes().to_hex_string(Case::Lower) + secp256k1_secret_key + .secret_bytes() + .to_hex_string(Case::Lower) ); - let key_file = self.key_store.join(local_public.to_hex()); - fs::write(key_file, secret_key.secret_bytes())?; - Ok(local_public) + let key_file = self.key_store.join(public_key.to_hex()); + fs::write(key_file, secp256k1_secret_key.secret_bytes())?; + Ok(public_key) } + /// Remove all the keys from the key store. pub fn clear_keys(&self) -> Result<()> { fs::remove_dir_all(&self.key_store)?; Ok(()) } + /// Get a list of the stored public keys. pub fn list_keys(&self) -> Result> { let mut keys = vec![]; @@ -284,6 +280,7 @@ impl Signer { Ok(keys) } + /// Get a private key for the public one from the key store. pub fn get_private_key(&self, key: PublicKey) -> Result { let mut buf = [0u8; 32]; @@ -300,10 +297,25 @@ impl Signer { } } +// Decodes hexed string to a byte array. +pub(crate) fn decode_to_array(s: &str) -> Result<[u8; N]> { + let mut buf = [0; N]; + + // Strip the "0x" prefix if it exists. + let stripped = s.strip_prefix("0x").unwrap_or(s); + + // Decode + hex::decode_to_slice(stripped, &mut buf) + .map_err(|_| anyhow!("invalid hex format for {stripped:?}"))?; + + Ok(buf) +} + #[cfg(test)] mod tests { use super::*; - use alloy::primitives::{keccak256, PrimitiveSignature as Signature}; + use alloy::primitives::{keccak256, PrimitiveSignature as AlloySignature}; + use gprimitives::ActorId; use std::env::temp_dir; #[test] @@ -333,7 +345,7 @@ mod tests { let hash = keccak256(message); // Recover the address using the signature - let alloy_sig = Signature::try_from(signature.as_ref()).expect("failed to parse sig"); + let alloy_sig = AlloySignature::try_from(signature.as_ref()).expect("failed to parse sig"); let recovered_address = alloy_sig .recover_address_from_prehash(&hash) @@ -371,7 +383,7 @@ mod tests { let hash = keccak256(message); // Recover the address using the signature - let alloy_sig = Signature::try_from(signature.as_ref()).expect("failed to parse sig"); + let alloy_sig = AlloySignature::try_from(signature.as_ref()).expect("failed to parse sig"); let recovered_address = alloy_sig .recover_address_from_prehash(&hash) diff --git a/ethexe/signer/src/signature.rs b/ethexe/signer/src/signature.rs index 270f2002884..62128fbb8ea 100644 --- a/ethexe/signer/src/signature.rs +++ b/ethexe/signer/src/signature.rs @@ -19,7 +19,7 @@ //! Secp256k1 signature types and utilities. use crate::{Digest, PrivateKey, PublicKey}; -use anyhow::{Context, Result}; +use anyhow::{Error, Result}; use parity_scale_codec::{Decode, Encode}; use secp256k1::{ ecdsa::{RecoverableSignature, RecoveryId}, @@ -27,24 +27,29 @@ use secp256k1::{ }; use std::fmt; +/// A recoverable ECDSA signature with `v` value in an `Electrum` notation. +/// +/// 'Electrum' notation signatures define `v` to be from the `{0; 1}` set. #[derive(Clone, Copy, PartialEq, Eq)] pub struct RawSignature([u8; 65]); impl RawSignature { + /// Create a recoverable signature for the provided digest using the private key. pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { - let secp_secret_key = secp256k1::SecretKey::from_slice(&private_key.0) - .with_context(|| "Invalid secret key format")?; - + let secp_secret_key = private_key.into(); let message = Message::from_digest(digest.into()); let recoverable = secp256k1::global::SECP256K1.sign_ecdsa_recoverable(&message, &secp_secret_key); - let (id, signature) = recoverable.serialize_compact(); - let mut bytes = [0u8; 65]; - bytes[..64].copy_from_slice(signature.as_ref()); - bytes[64] = i32::from(id) as u8; - Ok(RawSignature(bytes)) + + let mut ret = [0u8; 65]; + ret[..64].copy_from_slice(signature.as_ref()); + ret[64] = i32::from(id) + .try_into() + .expect("recovery id is within u8 range"); + + Ok(RawSignature(ret)) } } @@ -62,40 +67,67 @@ impl AsRef<[u8]> for RawSignature { impl From for RawSignature { fn from(mut sig: Signature) -> RawSignature { + // TODO #4365: https://github.com/gear-tech/gear/issues/4365 sig.0[64] -= 27; RawSignature(sig.0) } } +/// A recoverable ECDSA signature type with any possible `v`. +/// +/// The signature can be in 'Electrum' notation, pre- or post- EIP-155 notations. #[derive(Clone, Copy, Encode, Decode, PartialEq, Eq)] pub struct Signature([u8; 65]); impl Signature { + /// Create a recoverable signature for the provided digest using the private key. + pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { + let raw_signature = RawSignature::create_for_digest(private_key, digest)?; + Ok(raw_signature.into()) + } + /// # Safety /// This function is unsafe because it does not check the validity of the input bytes. pub const unsafe fn from_bytes(bytes: [u8; 65]) -> Self { Self(bytes) } + /// Covert signature to hex string. pub fn to_hex(&self) -> String { hex::encode(self.0) } + /// Verify the signature with public key recovery from the signature. + pub fn verify_with_public_key_recover(&self, digest: Digest) -> Result<()> { + let public_key = self.recover_from_digest(digest)?; + self.verify(public_key, digest) + } + + /// Recovers public key which was used to create the signature for the signed digest. pub fn recover_from_digest(&self, digest: Digest) -> Result { - let sig = (*self).try_into()?; - let public_key = secp256k1::global::SECP256K1 - .recover_ecdsa(&Message::from_digest(digest.into()), &sig)?; - Ok(PublicKey::from_bytes(public_key.serialize())) + let signature: RecoverableSignature = (*self).try_into()?; + signature + .recover(&Message::from_digest(digest.0)) + .map(PublicKey::from) + .map_err(Into::into) } - pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { - let raw_signature = RawSignature::create_for_digest(private_key, digest)?; - Ok(raw_signature.into()) + /// Verifies the signature using the public key and digest possibly signed with + /// the public key. + pub fn verify(&self, public_key: PublicKey, digest: Digest) -> Result<()> { + let signature: RecoverableSignature = (*self).try_into()?; + let message = Message::from_digest(digest.0); + let secp256k1_public_key = public_key.into(); + + secp256k1::global::SECP256K1 + .verify_ecdsa(&message, &signature.to_standard(), &secp256k1_public_key) + .map_err(Into::into) } } impl From for Signature { fn from(mut sig: RawSignature) -> Self { + // TODO #4365: https://github.com/gear-tech/gear/issues/4365 sig.0[64] += 27; Signature(sig.0) } @@ -107,6 +139,14 @@ impl From for [u8; 65] { } } +impl TryFrom<&[u8]> for Signature { + type Error = Error; + + fn try_from(mut data: &[u8]) -> Result { + Decode::decode(&mut data).map_err(Into::into) + } +} + impl AsRef<[u8]> for Signature { fn as_ref(&self) -> &[u8] { &self.0 @@ -131,6 +171,7 @@ impl TryFrom for RecoverableSignature { fn try_from(sig: Signature) -> Result { RecoverableSignature::from_compact( sig.0[..64].as_ref(), + // TODO: Include chain id, as that's for transaction of pre-EIP-155 (!) RecoveryId::try_from((sig.0[64] - 27) as i32)?, ) .map_err(Into::into) diff --git a/ethexe/tx-pool/Cargo.toml b/ethexe/tx-pool/Cargo.toml new file mode 100644 index 00000000000..7bd08c73537 --- /dev/null +++ b/ethexe/tx-pool/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "ethexe-tx-pool" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +ethexe-signer.workspace = true +ethexe-common.workspace = true +ethexe-db.workspace = true +gprimitives = { workspace = true, features = ["codec"] } + +anyhow.workspace = true +futures.workspace = true +hex.workspace = true +log.workspace = true +parity-scale-codec = { workspace = true, features = ["std", "derive"] } +tokio = { workspace = true, features = ["sync", "rt"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["sync", "rt", "macros"] } +gear-utils.workspace = true diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs new file mode 100644 index 00000000000..115649a52db --- /dev/null +++ b/ethexe/tx-pool/src/lib.rs @@ -0,0 +1,71 @@ +// This file is part of Gear. +// +// Copyright (C) 2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Ethexe transaction pool. + +mod validation; + +#[cfg(test)] +mod tests; + +use anyhow::{Context as _, Result}; +pub use ethexe_common::tx_pool::{ + OffchainTransaction, RawOffchainTransaction, SignedOffchainTransaction, +}; +use ethexe_db::Database; +use ethexe_signer::{Address, Signature, ToDigest}; +use gprimitives::{ActorId, H160}; +use parity_scale_codec::Encode; +use validation::TxValidator; + +/// Transaction pool service. +/// +/// Serves as an interface for the transaction pool core. +pub struct TxPoolService { + db: Database, +} + +impl TxPoolService { + pub fn new(db: Database) -> Self { + Self { db } + } + + /// Basically validates the transaction and includes the transaction + /// to the ready queue, so it's returned by the service stream. + pub fn validate( + &self, + transaction: SignedOffchainTransaction, + ) -> Result { + TxValidator::new(transaction, self.db.clone()) + .with_all_checks() + .validate() + .context("Tx validation failed") + } +} + +/// Gets source of the `SendMessage` transaction recovering it from the signature. +pub fn tx_send_message_source(tx: &SignedOffchainTransaction) -> Result { + Signature::try_from(tx.signature.as_ref()) + .and_then(|signature| signature.recover_from_digest(tx.transaction.encode().to_digest())) + .map(|public_key| H160::from(Address::from(public_key).0).into()) +} + +/// Ethexe transaction signature. +fn tx_signature(tx: &SignedOffchainTransaction) -> Result { + Signature::try_from(tx.signature.as_ref()) +} diff --git a/ethexe/tx-pool/src/tests.rs b/ethexe/tx-pool/src/tests.rs new file mode 100644 index 00000000000..132a040208b --- /dev/null +++ b/ethexe/tx-pool/src/tests.rs @@ -0,0 +1,113 @@ +// This file is part of Gear. +// +// Copyright(C) 2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Testing module for the tx pool. +//! +//! Test here mainly focus on: +//! - the overall logic of the tx pool to work as expected +//! - the channels inside the tx pool service to work as expected + +use crate::{ + OffchainTransaction, RawOffchainTransaction, SignedOffchainTransaction, TxPoolService, +}; +use ethexe_db::{BlockHeader, BlockMetaStorage, Database, MemDb}; +use ethexe_signer::{PrivateKey, Signer, ToDigest}; +use gprimitives::{H160, H256}; +use parity_scale_codec::Encode; +use std::str::FromStr; + +pub(crate) fn generate_signed_ethexe_tx(reference_block_hash: H256) -> SignedOffchainTransaction { + let signer = Signer::tmp(); + let public_key = signer + .add_key( + PrivateKey::from_str( + "4c0883a69102937d6231471b5dbb6204fe51296170827936ea5cce4b76994b0f", + ) + .expect("invalid private key"), + ) + .expect("key addition failed"); + + let transaction = OffchainTransaction { + raw: RawOffchainTransaction::SendMessage { + program_id: H160::random(), + payload: vec![], + }, + reference_block: reference_block_hash, + }; + let signature = signer + .sign_digest(public_key, transaction.encode().to_digest()) + .expect("signing failed"); + + SignedOffchainTransaction { + transaction, + signature: signature.encode(), + } +} + +pub(crate) fn new_block(parent_hash: Option) -> (H256, BlockHeader) { + let block_hash = H256::random(); + let header = BlockHeader { + height: 0, + timestamp: 0, + parent_hash: parent_hash.unwrap_or(H256::random()), + }; + + (block_hash, header) +} + +#[tokio::test] +async fn test_add_transaction() { + gear_utils::init_default_logger(); + + let db = Database::from_one(&MemDb::default(), Default::default()); + + let mut tx_pool = TxPoolService::new(db.clone()); + + // -------------- Test adding a valid transaction -------------- + + // Prepare the database by populating it with blocks + let block_data = new_block(None); + db.set_block_header(block_data.0, block_data.1.clone()); + db.set_latest_valid_block(block_data.0, block_data.1); + let (tx_reference_block_hash, block_header) = new_block(Some(block_data.0)); + db.set_block_header(tx_reference_block_hash, block_header.clone()); + db.set_latest_valid_block(tx_reference_block_hash, block_header); + + // Add the transaction to the service + let signed_ethexe_tx = generate_signed_ethexe_tx(tx_reference_block_hash); + assert!(tx_pool.validate(signed_ethexe_tx.clone()).is_ok()); + + // -------------- Test adding invalid transaction -------------- + + // Populate more blocks in db + let mut block_hash = tx_reference_block_hash; + for _ in 0..32 { + let block_data = new_block(Some(block_hash)); + db.set_block_header(block_data.0, block_data.1.clone()); + db.set_latest_valid_block(block_data.0, block_data.1); + block_hash = block_data.0; + } + + // Rotten block hash + let invalid_tx = generate_signed_ethexe_tx(tx_reference_block_hash); + let res = tx_pool.validate(invalid_tx.clone()); + assert!(res.is_err()); + let err_string = format!("{:?}", res.expect_err("checked")); + println!("{}", err_string); + assert!(err_string.contains("Reference block isn't within recent blocks window")); +} diff --git a/ethexe/tx-pool/src/validation.rs b/ethexe/tx-pool/src/validation.rs new file mode 100644 index 00000000000..6728e72b000 --- /dev/null +++ b/ethexe/tx-pool/src/validation.rs @@ -0,0 +1,255 @@ +// This file is part of Gear. +// +// Copyright (C) 2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Transactions validation. + +use crate::SignedOffchainTransaction; +use anyhow::{anyhow, Context, Result}; +use ethexe_db::Database; +use ethexe_signer::ToDigest; +use parity_scale_codec::Encode; + +// TODO #4424 + +/// Main transaction pool tx validator. +/// +/// Basically consumes a transaction and runs all the defined checks on it. +/// The checks are defined through the `with_*_check` methods. +pub(crate) struct TxValidator { + transaction: SignedOffchainTransaction, + db: Database, + signature_check: bool, + mortality_check: bool, + uniqueness_check: bool, +} + +impl TxValidator { + pub(crate) fn new(transaction: SignedOffchainTransaction, db: Database) -> Self { + Self { + transaction, + db, + signature_check: false, + mortality_check: false, + uniqueness_check: false, + } + } + + pub(crate) fn with_all_checks(self) -> Self { + self.with_signature_check() + .with_mortality_check() + .with_uniqueness_check() + } + + pub(crate) fn with_signature_check(mut self) -> Self { + self.signature_check = true; + self + } + + pub(crate) fn with_mortality_check(mut self) -> Self { + self.mortality_check = true; + self + } + + pub(crate) fn with_uniqueness_check(mut self) -> Self { + self.uniqueness_check = true; + self + } +} + +impl TxValidator { + /// Runs all stateful and stateless sync validators for the transaction. + pub(crate) fn validate(self) -> Result { + if self.signature_check { + self.check_signature()?; + } + + if self.mortality_check { + self.check_mortality()?; + } + + if self.uniqueness_check { + self.check_uniqueness()?; + } + + Ok(self.transaction) + } + + /// Validates transaction signature. + fn check_signature(&self) -> Result<()> { + let tx_digest = self.transaction.encode().to_digest(); + let signature = crate::tx_signature(&self.transaction)?; + + signature.verify_with_public_key_recover(tx_digest) + } + + /// Validates transaction mortality. + /// + /// Basically checks that transaction reference block hash is within the recent blocks window. + fn check_mortality(&self) -> Result<()> { + let block_hash = self.transaction.reference_block(); + + self.db + .check_within_recent_blocks(block_hash) + .context("Transaction mortality check failed") + } + + /// Validates transaction uniqueness. + /// + /// Basically checks that transaction is not already in the database. + fn check_uniqueness(&self) -> Result<()> { + let tx_hash = self.transaction.tx_hash(); + + // TODO #4505 + if self.db.get_offchain_transaction(tx_hash).is_none() { + Ok(()) + } else { + Err(anyhow!("Transaction already exists")) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests; + use ethexe_db::{BlockMetaStorage, Database, MemDb}; + use gprimitives::H256; + + macro_rules! assert_ok { + ( $x:expr ) => { + assert!($x.is_ok()); + }; + } + + macro_rules! assert_err { + ( $x:expr ) => { + assert!($x.is_err()); + }; + } + + #[test] + fn test_signature_validation() { + let signed_transaction = tests::generate_signed_ethexe_tx(H256::random()); + let db = Database::from_one(&MemDb::default(), Default::default()); + let validator = TxValidator::new(signed_transaction, db).with_signature_check(); + assert_ok!(validator.validate()); + } + + #[test] + fn test_valid_mortality() { + let db = Database::from_one(&MemDb::default(), Default::default()); + + // Test valid mortality + let block_data = tests::new_block(None); + db.set_block_header(block_data.0, block_data.1.clone()); + db.set_latest_valid_block(block_data.0, block_data.1); + + let (block_hash, header) = tests::new_block(Some(block_data.0)); + db.set_block_header(block_hash, header.clone()); + db.set_latest_valid_block(block_hash, header); + + let signed_tx = tests::generate_signed_ethexe_tx(block_hash); + + let block_data = tests::new_block(Some(block_hash)); + db.set_block_header(block_data.0, block_data.1.clone()); + db.set_latest_valid_block(block_data.0, block_data.1); + + let tx_validator = TxValidator::new(signed_tx, db).with_mortality_check(); + + // println!("{:?}", tx_validator.validate()); + assert_ok!(tx_validator.validate()); + } + + #[test] + fn test_invalid_mortality_non_existent_block() { + let db = Database::from_one(&MemDb::default(), Default::default()); + let non_window_block_hash = H256::random(); + let invalid_transaction = tests::generate_signed_ethexe_tx(non_window_block_hash); + + let tx_validator = TxValidator::new(invalid_transaction, db).with_mortality_check(); + + assert_err!(tx_validator.validate()); + } + + #[test] + fn test_invalid_mortality_rotten_tx() { + let db = Database::from_one(&MemDb::default(), Default::default()); + + let (first_block_hash, first_block_header) = tests::new_block(None); + db.set_block_header(first_block_hash, first_block_header.clone()); + db.set_latest_valid_block(first_block_hash, first_block_header); + let (second_block_hash, second_block_header) = tests::new_block(Some(first_block_hash)); + db.set_block_header(second_block_hash, second_block_header.clone()); + db.set_latest_valid_block(second_block_hash, second_block_header); + + // Add more 30 blocks + let mut block_hash = second_block_hash; + for _ in 0..30 { + let block_data = tests::new_block(Some(block_hash)); + db.set_block_header(block_data.0, block_data.1.clone()); + db.set_latest_valid_block(block_data.0, block_data.1); + block_hash = block_data.0; + } + + let transaction1 = TxValidator::new( + tests::generate_signed_ethexe_tx(first_block_hash), + db.clone(), + ) + .with_mortality_check() + .validate() + .expect("internal error: transaction1 validation failed"); + + let transaction2 = TxValidator::new( + tests::generate_signed_ethexe_tx(second_block_hash), + db.clone(), + ) + .with_mortality_check() + .validate() + .expect("internal error: transaction2 validation failed"); + + // Adding a new block to the db, which should remove the first block from window + let block_data = tests::new_block(Some(block_hash)); + db.set_block_header(block_data.0, block_data.1.clone()); + db.set_latest_valid_block(block_data.0, block_data.1); + + // `db` is `Arc`, so no need to instantiate a new validator. + assert_err!(TxValidator::new(transaction1, db.clone()) + .with_mortality_check() + .validate()); + assert_ok!(TxValidator::new(transaction2, db.clone()) + .with_mortality_check() + .validate()); + } + + #[test] + fn test_uniqueness_validation() { + let db = Database::from_one(&MemDb::default(), Default::default()); + let transaction = tests::generate_signed_ethexe_tx(H256::random()); + + let transaction = TxValidator::new(transaction, db.clone()) + .with_uniqueness_check() + .validate() + .expect("internal error: uniqueness validation failed"); + + db.set_offchain_transaction(transaction.clone()); + + assert_err!(TxValidator::new(transaction, db.clone()) + .with_uniqueness_check() + .validate()); + } +}