From 8588820a7f9f0de1382397ea3b2007c62132c2ae Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 9 Jul 2020 16:14:30 -0600 Subject: [PATCH] Add BanksService to Validator --- Cargo.lock | 1 + core/src/cluster_info.rs | 3 +- core/src/validator.rs | 83 +++++++++------ local-cluster/src/local_cluster.rs | 9 +- runtime/src/banks_service.rs | 131 ++++++++++++++++-------- runtime/src/lib.rs | 1 + runtime/src/rpc_banks_service.rs | 90 ++++++++++++++++ runtime/src/send_transaction_service.rs | 6 +- tokens/Cargo.toml | 1 + tokens/src/commands.rs | 6 +- tokens/tests/commands.rs | 31 +++--- validator/src/main.rs | 15 ++- 12 files changed, 268 insertions(+), 109 deletions(-) create mode 100644 runtime/src/rpc_banks_service.rs diff --git a/Cargo.lock b/Cargo.lock index 852de743490904..91f26ec5e0bc4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4862,6 +4862,7 @@ dependencies = [ "serde", "solana-clap-utils", "solana-cli-config", + "solana-core", "solana-remote-wallet", "solana-runtime", "solana-sdk 1.3.0", diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 58db301fa87936..17d52cc57648d5 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2421,7 +2421,8 @@ impl Node { let rpc_pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port); let rpc_banks_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); - let rpc_banks_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_banks_port); + let rpc_banks_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_banks_port); let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index b26a9b2fb3b9b1..2f9c58d641d899 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -37,6 +37,7 @@ use solana_runtime::{ bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, + rpc_banks_service::RpcBanksService, }; use solana_sdk::{ clock::Slot, @@ -138,7 +139,7 @@ impl ValidatorExit { pub struct Validator { pub id: Pubkey, validator_exit: Arc>>, - rpc_service: Option<(JsonRpcService, PubSubService)>, + rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, transaction_status_service: Option, rewards_recorder_service: Option, gossip_service: GossipService, @@ -260,37 +261,46 @@ impl Validator { )); let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| { - if ContactInfo::is_valid_address(&node.info.rpc) { - assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - assert_eq!(rpc_port, node.info.rpc.port()); - assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); - assert_eq!(rpc_banks_port, node.info.rpc_banks.port()); - } else { - assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - } - ( - JsonRpcService::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), - config.rpc_config.clone(), - config.snapshot_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - genesis_config.hash(), - ledger_path, - validator_exit.clone(), - config.trusted_validators.clone(), - rpc_override_health_check.clone(), - ), - PubSubService::new( - &subscriptions, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), - &exit, - ), - ) - }); + let rpc_service = config + .rpc_ports + .map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| { + if ContactInfo::is_valid_address(&node.info.rpc) { + assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + assert_eq!(rpc_port, node.info.rpc.port()); + assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); + assert_eq!(rpc_banks_port, node.info.rpc_banks.port()); + } else { + assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + } + let tpu_address = cluster_info.my_contact_info().tpu; + ( + JsonRpcService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), + config.rpc_config.clone(), + config.snapshot_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + validator_exit.clone(), + config.trusted_validators.clone(), + rpc_override_health_check.clone(), + ), + PubSubService::new( + &subscriptions, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), + &exit, + ), + RpcBanksService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_banks_port), + tpu_address, + &bank_forks, + &exit, + ), + ) + }); let (transaction_status_sender, transaction_status_service) = if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history { @@ -549,9 +559,10 @@ impl Validator { pub fn join(self) -> Result<()> { self.poh_service.join()?; drop(self.poh_recorder); - if let Some((rpc_service, rpc_pubsub_service)) = self.rpc_service { + if let Some((rpc_service, rpc_pubsub_service, rpc_banks_service)) = self.rpc_service { rpc_service.join()?; rpc_pubsub_service.join()?; + rpc_banks_service.join()?; } if let Some(transaction_status_service) = self.transaction_status_service { transaction_status_service.join()?; @@ -833,7 +844,11 @@ impl TestValidator { let leader_voting_keypair = Arc::new(voting_keypair); let config = ValidatorConfig { - rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port(), node.info.rpc_banks.port())), + rpc_ports: Some(( + node.info.rpc.port(), + node.info.rpc_pubsub.port(), + node.info.rpc_banks.port(), + )), ..ValidatorConfig::default() }; let node = Validator::new( diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index d56debf63516eb..559e2f09a75f84 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -172,6 +172,7 @@ impl LocalCluster { leader_config.rpc_ports = Some(( leader_node.info.rpc.port(), leader_node.info.rpc_pubsub.port(), + leader_node.info.rpc_banks.port(), )); leader_config.account_paths = vec![leader_ledger_path.join("accounts")]; let leader_server = Validator::new( @@ -299,6 +300,7 @@ impl LocalCluster { config.rpc_ports = Some(( validator_node.info.rpc.port(), validator_node.info.rpc_pubsub.port(), + validator_node.info.rpc_banks.port(), )); let voting_keypair = Arc::new(voting_keypair); config.account_paths = vec![ledger_path.join("accounts")]; @@ -546,8 +548,11 @@ impl Cluster for LocalCluster { // Update the stored ContactInfo for this node let node = Node::new_localhost_with_pubkey(&pubkey); cluster_validator_info.info.contact_info = node.info.clone(); - cluster_validator_info.config.rpc_ports = - Some((node.info.rpc.port(), node.info.rpc_pubsub.port())); + cluster_validator_info.config.rpc_ports = Some(( + node.info.rpc.port(), + node.info.rpc_pubsub.port(), + node.info.rpc_banks.port(), + )); let entry_point_info = { if *pubkey == self.entry_point_info.id { diff --git a/runtime/src/banks_service.rs b/runtime/src/banks_service.rs index 06508bc099bd7d..e1d7072029df25 100644 --- a/runtime/src/banks_service.rs +++ b/runtime/src/banks_service.rs @@ -1,11 +1,16 @@ -use crate::{bank::Bank, bank_forks::BankForks}; +use crate::{ + bank::Bank, + bank_forks::BankForks, + send_transaction_service::{SendTransactionService, TransactionInfo}, +}; +use bincode::{deserialize, serialize}; use futures::{ future, prelude::stream::{self, StreamExt}, }; use solana_sdk::{ account::Account, - banks_client::{start_tcp_client, Banks, BanksClient}, + banks_client::{Banks, BanksClient}, clock::Slot, commitment_config::CommitmentLevel, fee_calculator::FeeCalculator, @@ -16,9 +21,11 @@ use solana_sdk::{ }; use std::{ io, + net::SocketAddr, sync::{ + atomic::AtomicBool, mpsc::{channel, Receiver, Sender}, - Arc, + Arc, RwLock, }, thread::Builder, time::Duration, @@ -35,8 +42,8 @@ use tokio_serde::formats::Bincode; #[derive(Clone)] struct BanksService { - bank_forks: Arc, - transaction_sender: Sender, + bank_forks: Arc>, + transaction_sender: Sender, } impl BanksService { @@ -45,8 +52,8 @@ impl BanksService { /// a bank in the given BankForks. Otherwise, the receiver should /// forward them to a validator in the leader schedule. fn new_with_sender( - bank_forks: Arc, - transaction_sender: Sender, + bank_forks: Arc>, + transaction_sender: Sender, ) -> Self { Self { bank_forks, @@ -54,20 +61,24 @@ impl BanksService { } } - fn run(bank: &Bank, transaction_receiver: Receiver) { - while let Ok(tx) = transaction_receiver.recv() { - let mut transactions = vec![tx]; - while let Ok(tx) = transaction_receiver.try_recv() { - transactions.push(tx); + fn run(bank: &Bank, transaction_receiver: Receiver) { + while let Ok(info) = transaction_receiver.recv() { + let mut transaction_infos = vec![info]; + while let Ok(info) = transaction_receiver.try_recv() { + transaction_infos.push(info); } + let transactions: Vec<_> = transaction_infos + .into_iter() + .map(|info| deserialize(&info.wire_transaction).unwrap()) + .collect(); let _ = bank.process_transactions(&transactions); } } /// Useful for unit-testing - fn new(bank_forks: Arc) -> Self { + fn new(bank_forks: Arc>) -> Self { let (transaction_sender, transaction_receiver) = channel(); - let bank = bank_forks.working_bank(); + let bank = bank_forks.read().unwrap().working_bank(); Builder::new() .name("solana-bank-forks-client".to_string()) .spawn(move || Self::run(&bank, transaction_receiver)) @@ -77,21 +88,21 @@ impl BanksService { fn slot(&self, commitment: CommitmentLevel) -> Slot { match commitment { - CommitmentLevel::Recent => self.bank_forks.highest_slot(), - CommitmentLevel::Root => self.bank_forks.root(), + CommitmentLevel::Recent => self.bank_forks.read().unwrap().highest_slot(), + CommitmentLevel::Root => self.bank_forks.read().unwrap().root(), CommitmentLevel::Single | CommitmentLevel::SingleGossip => { //TODO: self.block_commitment_cache.highest_confirmed_slot() todo!(); } CommitmentLevel::Max => { //TODO: self.block_commitment_cache.largest_confirmed_root() - self.bank_forks.root() + self.bank_forks.read().unwrap().root() } } } - fn bank(&self, commitment: CommitmentLevel) -> &Arc { - &self.bank_forks[self.slot(commitment)] + fn bank(&self, commitment: CommitmentLevel) -> Arc { + self.bank_forks.read().unwrap()[self.slot(commitment)].clone() } } @@ -125,7 +136,18 @@ impl Banks for BanksService { } async fn send_transaction(self, _: Context, transaction: Transaction) { - self.transaction_sender.send(transaction).unwrap(); + let blockhash = &transaction.message.recent_blockhash; + let last_valid_slot = self + .bank_forks + .read() + .unwrap() + .root_bank() + .get_blockhash_last_valid_slot(&blockhash) + .unwrap(); + let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); + let info = + TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot); + self.transaction_sender.send(info).unwrap(); } async fn get_signature_status_with_commitment( @@ -162,11 +184,18 @@ impl Banks for BanksService { commitment: CommitmentLevel, ) -> Option> { let blockhash = &transaction.message.recent_blockhash; - let root_bank = self.bank_forks.root_bank(); - let last_valid_slot = root_bank.get_blockhash_last_valid_slot(&blockhash).unwrap(); + let last_valid_slot = self + .bank_forks + .read() + .unwrap() + .root_bank() + .get_blockhash_last_valid_slot(&blockhash) + .unwrap(); let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); - self.transaction_sender.send(transaction).unwrap(); - let bank = self.bank(commitment).clone(); + let info = + TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot); + self.transaction_sender.send(info).unwrap(); + let bank = self.bank(commitment); poll_transaction_status(bank, signature, last_valid_slot).await } @@ -176,12 +205,15 @@ impl Banks for BanksService { pubkey: Pubkey, _commitment: CommitmentLevel, ) -> Option { - let bank = self.bank_forks.root_bank(); - bank.get_account(&pubkey) + self.bank_forks + .read() + .unwrap() + .root_bank() + .get_account(&pubkey) } } -pub async fn start_local_service(bank_forks: &Arc) -> io::Result { +pub async fn start_local_service(bank_forks: &Arc>) -> io::Result { let banks_service = BanksService::new(bank_forks.clone()); let (client_transport, server_transport) = transport::channel::unbounded(); let server = server::new(server::Config::default()) @@ -193,32 +225,41 @@ pub async fn start_local_service(bank_forks: &Arc) -> io::Result) -> io::Result { - let incoming = tcp::listen(&"localhost:0", Bincode::default) +pub async fn start_tcp_service( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, +) -> io::Result<()> { + // Note: These settings are copied straight from the tarpc example. + let service = tcp::listen(listen_addr, Bincode::default) .await? // Ignore accept errors. - .filter_map(|r| future::ready(r.ok())); - - let addr = incoming.get_ref().local_addr(); - - // Note: These settings are copied straight from the tarpc example. - let service = incoming + .filter_map(|r| future::ready(r.ok())) .map(server::BaseChannel::with_defaults) // Limit channels to 1 per IP. .max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip()) // serve is generated by the service attribute. It takes as input any type implementing // the generated Banks trait. - .map(move |channel| { - let service = BanksService::new(bank_forks.clone()); - channel.respond_with(service.serve()).execute() + .map(move |chan| { + let (sender, receiver) = channel(); + let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); + + SendTransactionService::new( + tpu_addr, + &bank_forks, + &exit_send_transaction_service, + receiver, + ); + + let service = BanksService::new_with_sender(bank_forks.clone(), sender); + chan.respond_with(service.serve()).execute() }) // Max 10 channels. .buffer_unordered(10) .for_each(|_| async {}); - tokio::spawn(service); - - start_tcp_client(addr).await + service.await; + Ok(()) } #[cfg(test)] @@ -239,7 +280,9 @@ mod tests { // `runtime.block_on()` just once, to run all the async code. let genesis = create_genesis_config(10); - let bank_forks = Arc::new(BankForks::new(Bank::new(&genesis.genesis_config))); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new( + &genesis.genesis_config, + )))); let bob_pubkey = Pubkey::new_rand(); let mint_pubkey = genesis.mint_keypair.pubkey(); @@ -273,7 +316,9 @@ mod tests { // server-side functionality is available to the client. let genesis = create_genesis_config(10); - let bank_forks = Arc::new(BankForks::new(Bank::new(&genesis.genesis_config))); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new( + &genesis.genesis_config, + )))); let mint_pubkey = &genesis.mint_keypair.pubkey(); let bob_pubkey = Pubkey::new_rand(); diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index b87c67ef7b3636..f41e8ce3c48848 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -22,6 +22,7 @@ pub mod message_processor; mod native_loader; pub mod nonce_utils; pub mod rent_collector; +pub mod rpc_banks_service; pub mod send_transaction_service; pub mod serde_snapshot; pub mod snapshot_package; diff --git a/runtime/src/rpc_banks_service.rs b/runtime/src/rpc_banks_service.rs new file mode 100644 index 00000000000000..75cc653a2730b3 --- /dev/null +++ b/runtime/src/rpc_banks_service.rs @@ -0,0 +1,90 @@ +//! The `rpc_banks_service` module implements the Solana Banks RPC API. + +use crate::{bank_forks::BankForks, banks_service::start_tcp_service}; +use futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, +}; +use tokio::{ + runtime::Runtime, + time::{self, Duration}, +}; + +pub struct RpcBanksService { + thread_hdl: JoinHandle<()>, +} + +/// Run the TCP service until `exit` is set to true +async fn start_abortable_tcp_service( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + exit: Arc, +) { + let service = start_tcp_service(listen_addr, tpu_addr, bank_forks.clone()).fuse(); + let interval = time::interval(Duration::from_millis(100)).fuse(); + pin_mut!(service, interval); + loop { + select! { + _ = service => {}, + _ = interval.select_next_some() => { + if exit.load(Ordering::Relaxed) { + break; + } + } + } + } +} + +impl RpcBanksService { + fn run( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + exit: Arc, + ) { + let service = start_abortable_tcp_service(listen_addr, tpu_addr, bank_forks, exit); + Runtime::new().unwrap().block_on(service); + } + + pub fn new( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: &Arc>, + exit: &Arc, + ) -> Self { + let bank_forks = bank_forks.clone(); + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-rpc-banks".to_string()) + .spawn(move || Self::run(listen_addr, tpu_addr, bank_forks, exit)) + .unwrap(); + + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bank::Bank; + + #[test] + fn test_rpc_banks_service_exit() { + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default()))); + let exit = Arc::new(AtomicBool::new(false)); + let addr = "127.0.0.1:0".parse().unwrap(); + let service = RpcBanksService::new(addr, addr, &bank_forks, &exit); + exit.store(true, Ordering::Relaxed); + service.join().unwrap(); + } +} diff --git a/runtime/src/send_transaction_service.rs b/runtime/src/send_transaction_service.rs index 0f24fe8a774745..bab219b0e4f7a4 100644 --- a/runtime/src/send_transaction_service.rs +++ b/runtime/src/send_transaction_service.rs @@ -22,9 +22,9 @@ pub struct SendTransactionService { } pub struct TransactionInfo { - signature: Signature, - wire_transaction: Vec, - last_valid_slot: Slot, + pub signature: Signature, + pub wire_transaction: Vec, + pub last_valid_slot: Slot, } impl TransactionInfo { diff --git a/tokens/Cargo.toml b/tokens/Cargo.toml index 6ca345541ae677..145cc151cd4801 100644 --- a/tokens/Cargo.toml +++ b/tokens/Cargo.toml @@ -31,3 +31,4 @@ tokio = "0.2" [dev-dependencies] solana-runtime = { path = "../runtime", version = "1.3.0" } +solana-core = { path = "../core", version = "1.3.0" } diff --git a/tokens/src/commands.rs b/tokens/src/commands.rs index ad3d804c2acac9..5af76bfd364177 100644 --- a/tokens/src/commands.rs +++ b/tokens/src/commands.rs @@ -627,13 +627,13 @@ mod tests { use super::*; use solana_runtime::{bank::Bank, bank_forks::BankForks, banks_service::start_local_service}; use solana_sdk::genesis_config::create_genesis_config; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use tokio::runtime::Runtime; #[test] fn test_process_distribute_tokens() { let (genesis_config, sender_keypair) = create_genesis_config(sol_to_lamports(9_000_000.0)); - let bank_forks = Arc::new(BankForks::new(Bank::new(&genesis_config))); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new(&genesis_config)))); Runtime::new().unwrap().block_on(async { let banks_client = start_local_service(&bank_forks).await.unwrap(); let thin_client = ThinClient::new(banks_client, false); @@ -644,7 +644,7 @@ mod tests { #[test] fn test_process_distribute_stake() { let (genesis_config, sender_keypair) = create_genesis_config(sol_to_lamports(9_000_000.0)); - let bank_forks = Arc::new(BankForks::new(Bank::new(&genesis_config))); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new(&genesis_config)))); Runtime::new().unwrap().block_on(async { let banks_client = start_local_service(&bank_forks).await.unwrap(); let thin_client = ThinClient::new(banks_client, false); diff --git a/tokens/tests/commands.rs b/tokens/tests/commands.rs index be127fe49d7aa2..ecc17bff75ef64 100644 --- a/tokens/tests/commands.rs +++ b/tokens/tests/commands.rs @@ -1,27 +1,22 @@ -use solana_runtime::{ - bank::Bank, bank_forks::BankForks, banks_service::start_local_tcp_service, - genesis_utils::create_genesis_config, -}; -use solana_sdk::native_token::sol_to_lamports; +use solana_core::validator::{TestValidator, TestValidatorOptions}; +use solana_sdk::{banks_client::start_tcp_client, native_token::sol_to_lamports}; use solana_tokens::{ commands::test_process_distribute_tokens_with_client, thin_client::ThinClient, }; -use std::sync::Arc; use tokio::runtime::Runtime; #[test] fn test_process_distribute_with_rpc_client() { - let genesis = create_genesis_config(sol_to_lamports(9_000_000.0)); - let bank_forks = Arc::new(BankForks::new(Bank::new(&genesis.genesis_config))); - - let mut runtime = Runtime::new().unwrap(); - let banks_client = runtime - .block_on(start_local_tcp_service(bank_forks)) - .unwrap(); + let validator = TestValidator::run_with_options(TestValidatorOptions { + mint_lamports: sol_to_lamports(9_000_000.0), + ..TestValidatorOptions::default() + }); - let thin_client = ThinClient::new(banks_client, false); - runtime.block_on(test_process_distribute_tokens_with_client( - thin_client, - genesis.mint_keypair, - )); + Runtime::new().unwrap().block_on(async { + let banks_client = start_tcp_client(validator.leader_data.rpc_banks) + .await + .unwrap(); + let thin_client = ThinClient::new(banks_client, false); + test_process_distribute_tokens_with_client(thin_client, validator.alice).await + }); } diff --git a/validator/src/main.rs b/validator/src/main.rs index 619114d9ddd39a..0e92cf9f25934f 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -599,7 +599,7 @@ pub fn main() { .value_name("PORT") .takes_value(true) .validator(port_validator) - .help("Use this port for JSON RPC, and the next port for the RPC websocket"), + .help("Use this port for JSON RPC, the next port for the RPC websocket, and the following for the RPC banks API"), ) .arg( Arg::with_name("private_rpc") @@ -950,7 +950,7 @@ pub fn main() { }, rpc_ports: value_t!(matches, "rpc_port", u16) .ok() - .map(|rpc_port| (rpc_port, rpc_port + 1)), + .map(|rpc_port| (rpc_port, rpc_port + 1, rpc_port + 2)), voting_disabled: matches.is_present("no_voting"), wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), trusted_validators, @@ -1168,9 +1168,10 @@ pub fn main() { ); if !private_rpc { - if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { + if let Some((rpc_port, rpc_pubsub_port, rpc_banks_port)) = validator_config.rpc_ports { node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port); node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_port); + node.info.rpc_banks = SocketAddr::new(node.info.gossip.ip(), rpc_banks_port); } } @@ -1189,8 +1190,12 @@ pub fn main() { let mut tcp_listeners = vec![]; if !private_rpc { - if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { - for (purpose, port) in &[("RPC", rpc_port), ("RPC pubsub", rpc_pubsub_port)] { + if let Some((rpc_port, rpc_pubsub_port, rpc_banks_port)) = validator_config.rpc_ports { + for (purpose, port) in &[ + ("RPC", rpc_port), + ("RPC pubsub", rpc_pubsub_port), + ("RPC banks", rpc_banks_port), + ] { tcp_listeners.push(( *port, TcpListener::bind(&SocketAddr::from((rpc_bind_address, *port)))