Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Add new and improved ThinClient
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed Jun 22, 2020
1 parent 27d4204 commit d526216
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 40 deletions.
108 changes: 69 additions & 39 deletions runtime/src/bank_forks_client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use crate::{bank::Bank, bank_forks::BankForks};
use futures::future::Future;
use futures::future::{self, Ready};
use futures::{
future::{self, Future, Ready},
prelude::stream,
};
use solana_sdk::{
bank_forks_client::BankForksRpc,
bank_forks_client::{BankForksRpc, BankForksRpcClient},
clock::Slot,
fee_calculator::FeeCalculator,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
transaction::{self, Transaction},
};
use std::{
io,
pin::Pin,
sync::{
mpsc::{channel, Receiver, Sender},
Expand All @@ -18,7 +22,12 @@ use std::{
thread::Builder,
time::Duration,
};
use tarpc::context::Context;
use tarpc::{
client,
context::Context,
server::{self, Handler},
};

use tokio::time::delay_for;

#[derive(Clone)]
Expand Down Expand Up @@ -123,74 +132,95 @@ impl BankForksRpc for BankForksServer {
let status = poll_transaction_status(root_bank.clone(), signature, last_valid_slot);
Box::pin(status)
}

type GetBalanceFut = Ready<u64>;
fn get_balance(self, _: Context, pubkey: Pubkey) -> Self::GetBalanceFut {
let bank = self.bank_forks.root_bank();
future::ready(bank.get_balance(&pubkey))
}
}

pub fn start_local_server(bank_forks: &Arc<BankForks>) -> io::Result<BankForksRpcClient> {
let bank_forks_server = BankForksServer::new(bank_forks.clone());
let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
let server = server::new(server::Config::default())
.incoming(stream::once(future::ready(server_transport)))
.respond_with(bank_forks_server.serve());
tokio::spawn(server);

BankForksRpcClient::new(client::Config::default(), client_transport).spawn()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::genesis_utils::create_genesis_config;
use futures::prelude::*;
use solana_sdk::{
bank_forks_client::BankForksRpcClient, message::Message, pubkey::Pubkey, signature::Signer,
bank_forks_client::ThinClient, message::Message, pubkey::Pubkey, signature::Signer,
system_instruction,
};
use std::io;
use tarpc::{
client, context,
server::{self, Handler},
};
use tarpc::context;

#[tokio::test]
async fn test_bank_forks_rpc_client_transfer() -> io::Result<()> {
let (client_transport, server_transport) = tarpc::transport::channel::unbounded();

async fn test_bank_forks_rpc_client_send() -> io::Result<()> {
let genesis = create_genesis_config(10);
let bank = Bank::new(&genesis.genesis_config);
let bank_forks = Arc::new(BankForks::new(bank));
let bank_forks_server = BankForksServer::new(bank_forks);
let server = server::new(server::Config::default())
.incoming(stream::once(future::ready(server_transport)))
.respond_with(bank_forks_server.serve());
tokio::spawn(server);

let mut client =
BankForksRpcClient::new(client::Config::default(), client_transport).spawn()?;

let (recent_blockhash, _fee_calculator, last_valid_slot) =
client.get_recent_blockhash(context::current()).await?;
let bank_forks = Arc::new(BankForks::new(Bank::new(&genesis.genesis_config)));
let rpc_client = start_local_server(&bank_forks)?;
let mut thin_client = ThinClient::new(rpc_client);

let mint_pubkey = &genesis.mint_keypair.pubkey();
let bob_pubkey = Pubkey::new_rand();
let instruction = system_instruction::transfer(&mint_pubkey, &bob_pubkey, 1);
let message = Message::new_with_payer(&[instruction], Some(&mint_pubkey));
let transaction = Transaction::new(&[&genesis.mint_keypair], message, recent_blockhash);
let signature = client
.send_transaction(context::current(), transaction.clone())
let (signature, last_valid_slot) = thin_client
.send_message(&[&genesis.mint_keypair], message)
.await?;

let mut status = client
let rpc_client = &mut thin_client.rpc_client;
let mut status = rpc_client
.get_signature_status(context::current(), signature)
.await?;
assert_eq!(status, None, "process_transaction() called synchronously");

while status.is_none() {
let root_slot = client.get_root_slot(context::current()).await?;
let root_slot = rpc_client.get_root_slot(context::current()).await?;
if root_slot > last_valid_slot {
break;
}
delay_for(Duration::from_millis(100)).await;
status = client
status = rpc_client
.get_signature_status(context::current(), signature)
.await?;
}

assert_eq!(status, Some(Ok(())));
assert_eq!(
rpc_client
.get_balance(context::current(), bob_pubkey)
.await?,
1
);
Ok(())
}

// Same thing, but all server-side
// TODO: Why didn't this fail with a duplicate signature?
let status = client
.send_and_confirm_transaction(context::current(), transaction)
#[tokio::test]
async fn test_bank_forks_rpc_client_send_and_confirm() -> io::Result<()> {
let genesis = create_genesis_config(10);
let bank_forks = Arc::new(BankForks::new(Bank::new(&genesis.genesis_config)));
let rpc_client = start_local_server(&bank_forks)?;
let mut thin_client = ThinClient::new(rpc_client);

let bob_pubkey = Pubkey::new_rand();
let status = thin_client
.transfer(&genesis.mint_keypair, &bob_pubkey, 1)
.await?;
assert_eq!(status, Some(Ok(())));

assert_eq!(
thin_client
.rpc_client
.get_balance(context::current(), bob_pubkey)
.await?,
1
);
Ok(())
}
}
64 changes: 63 additions & 1 deletion sdk/src/bank_forks_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ use solana_sdk::{
clock::Slot,
fee_calculator::FeeCalculator,
hash::Hash,
signature::Signature,
message::Message,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
signers::Signers,
system_instruction,
transaction::{self, Transaction},
};
use std::io;
use tarpc::context;

#[tarpc::service]
pub trait BankForksRpc {
Expand All @@ -15,6 +21,62 @@ pub trait BankForksRpc {
async fn send_and_confirm_transaction(
transaction: Transaction,
) -> Option<transaction::Result<()>>;
async fn get_balance(pubkey: Pubkey) -> u64;
}

pub struct ThinClient {
pub rpc_client: BankForksRpcClient,
}

impl ThinClient {
pub fn new(rpc_client: BankForksRpcClient) -> Self {
Self { rpc_client }
}

pub async fn send_message<S: Signers>(
&mut self,
signers: &S,
message: Message,
) -> io::Result<(Signature, u64)> {
let (recent_blockhash, _fee_calculator, last_valid_slot) = self
.rpc_client
.get_recent_blockhash(context::current())
.await?;
let transaction = Transaction::new(signers, message, recent_blockhash);
let signature = self
.rpc_client
.send_transaction(context::current(), transaction)
.await?;
Ok((signature, last_valid_slot))
}

pub async fn send_and_confirm_message<S: Signers>(
&mut self,
signers: &S,
message: Message,
) -> io::Result<Option<transaction::Result<()>>> {
let (recent_blockhash, _fee_calculator, _last_valid_slot) = self
.rpc_client
.get_recent_blockhash(context::current())
.await?;
let transaction = Transaction::new(signers, message, recent_blockhash);
self.rpc_client
.send_and_confirm_transaction(context::current(), transaction)
.await
}

pub async fn transfer(
&mut self,
from_keypair: &Keypair,
to_pubkey: &Pubkey,
lamports: u64,
) -> io::Result<Option<transaction::Result<()>>> {
let from_pubkey = from_keypair.pubkey();
let instruction = system_instruction::transfer(&from_pubkey, &to_pubkey, lamports);
let message = Message::new_with_payer(&[instruction], Some(&from_pubkey));
self.send_and_confirm_message(&[from_keypair], message)
.await
}
}

#[cfg(test)]
Expand Down

0 comments on commit d526216

Please sign in to comment.