Move generic rpc_client functions from wallet/ to client/

This commit is contained in:
Michael Vines
2019-03-16 17:17:44 -07:00
parent 3ad019a176
commit c498775a3d
10 changed files with 363 additions and 423 deletions

View File

@ -1,4 +1,3 @@
use bincode::serialize;
use bs58;
use chrono::prelude::*;
use clap::ArgMatches;
@ -9,7 +8,6 @@ use solana_budget_api;
use solana_budget_api::budget_transaction::BudgetTransaction;
use solana_client::rpc_client::{get_rpc_request_str, RpcClient};
use solana_client::rpc_request::RpcRequest;
use solana_client::rpc_signature_status::RpcSignatureStatus;
#[cfg(not(test))]
use solana_drone::drone::request_airdrop_transaction;
use solana_drone::drone::DRONE_PORT;
@ -22,16 +20,12 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::rpc_port::DEFAULT_RPC_PORT;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND};
use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction::VoteInstruction;
use solana_vote_api::vote_transaction::VoteTransaction;
use std::fs::File;
use std::io::Read;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use std::{error, fmt, mem};
const USERDATA_CHUNK_SIZE: usize = 256;
@ -439,7 +433,7 @@ fn process_configure_staking(
delegate_option: Option<Pubkey>,
authorized_voter_option: Option<Pubkey>,
) -> ProcessResult {
let recent_blockhash = get_recent_blockhash(&rpc_client)?;
let recent_blockhash = rpc_client.get_recent_blockhash()?;
let mut ixs = vec![];
if let Some(delegate_id) = delegate_option {
ixs.push(VoteInstruction::new_delegate_stake(
@ -455,7 +449,7 @@ fn process_configure_staking(
}
let mut tx = Transaction::new(ixs);
tx.sign(&[&config.id], recent_blockhash);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(signature_str.to_string())
}
@ -465,10 +459,10 @@ fn process_create_staking(
voting_account_id: &Pubkey,
lamports: u64,
) -> ProcessResult {
let recent_blockhash = get_recent_blockhash(&rpc_client)?;
let recent_blockhash = rpc_client.get_recent_blockhash()?;
let mut tx =
VoteTransaction::new_account(&config.id, voting_account_id, recent_blockhash, lamports, 0);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(signature_str.to_string())
}
@ -486,7 +480,7 @@ fn process_deploy(
}
}
let blockhash = get_recent_blockhash(&rpc_client)?;
let blockhash = rpc_client.get_recent_blockhash()?;
let program_id = Keypair::new();
let mut file = File::open(program_location).map_err(|err| {
WalletError::DynamicProgramError(
@ -510,9 +504,11 @@ fn process_deploy(
0,
);
trace!("Creating program account");
send_and_confirm_transaction(&rpc_client, &mut tx, &config.id).map_err(|_| {
WalletError::DynamicProgramError("Program allocate space failed".to_string())
})?;
rpc_client
.send_and_confirm_transaction(&mut tx, &config.id)
.map_err(|_| {
WalletError::DynamicProgramError("Program allocate space failed".to_string())
})?;
trace!("Writing program data");
let write_transactions: Vec<_> = program_data
@ -529,13 +525,15 @@ fn process_deploy(
)
})
.collect();
send_and_confirm_transactions(&rpc_client, write_transactions, &program_id)?;
rpc_client.send_and_confirm_transactions(write_transactions, &program_id)?;
trace!("Finalizing program account");
let mut tx = LoaderTransaction::new_finalize(&program_id, &bpf_loader::id(), blockhash, 0);
send_and_confirm_transaction(&rpc_client, &mut tx, &program_id).map_err(|_| {
WalletError::DynamicProgramError("Program finalize transaction failed".to_string())
})?;
rpc_client
.send_and_confirm_transaction(&mut tx, &program_id)
.map_err(|_| {
WalletError::DynamicProgramError("Program finalize transaction failed".to_string())
})?;
Ok(json!({
"programId": format!("{}", program_id.pubkey()),
@ -553,11 +551,11 @@ fn process_pay(
witnesses: &Option<Vec<Pubkey>>,
cancelable: Option<Pubkey>,
) -> ProcessResult {
let blockhash = get_recent_blockhash(&rpc_client)?;
let blockhash = rpc_client.get_recent_blockhash()?;
if timestamp == None && *witnesses == None {
let mut tx = SystemTransaction::new_move(&config.id, to, lamports, blockhash, 0);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(signature_str.to_string())
} else if *witnesses == None {
let dt = timestamp.unwrap();
@ -579,7 +577,7 @@ fn process_pay(
lamports,
blockhash,
);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(json!({
"signature": signature_str,
@ -587,7 +585,7 @@ fn process_pay(
})
.to_string())
} else if timestamp == None {
let blockhash = get_recent_blockhash(&rpc_client)?;
let blockhash = rpc_client.get_recent_blockhash()?;
let witness = if let Some(ref witness_vec) = *witnesses {
witness_vec[0]
@ -609,7 +607,7 @@ fn process_pay(
lamports,
blockhash,
);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(json!({
"signature": signature_str,
@ -622,10 +620,10 @@ fn process_pay(
}
fn process_cancel(rpc_client: &RpcClient, config: &WalletConfig, pubkey: &Pubkey) -> ProcessResult {
let blockhash = get_recent_blockhash(&rpc_client)?;
let blockhash = rpc_client.get_recent_blockhash()?;
let mut tx =
BudgetTransaction::new_signature(&config.id, pubkey, &config.id.pubkey(), blockhash);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(signature_str.to_string())
}
@ -655,10 +653,10 @@ fn process_time_elapsed(
request_and_confirm_airdrop(&rpc_client, &drone_addr, &config.id.pubkey(), 1)?;
}
let blockhash = get_recent_blockhash(&rpc_client)?;
let blockhash = rpc_client.get_recent_blockhash()?;
let mut tx = BudgetTransaction::new_timestamp(&config.id, pubkey, to, dt, blockhash);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(signature_str.to_string())
}
@ -676,9 +674,9 @@ fn process_witness(
request_and_confirm_airdrop(&rpc_client, &drone_addr, &config.id.pubkey(), 1)?;
}
let blockhash = get_recent_blockhash(&rpc_client)?;
let blockhash = rpc_client.get_recent_blockhash()?;
let mut tx = BudgetTransaction::new_signature(&config.id, pubkey, to, blockhash);
let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?;
let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?;
Ok(signature_str.to_string())
}
@ -772,215 +770,6 @@ pub fn process_command(config: &WalletConfig) -> ProcessResult {
}
}
fn get_recent_blockhash(rpc_client: &RpcClient) -> Result<Hash, Box<dyn error::Error>> {
let result = rpc_client.retry_make_rpc_request(&RpcRequest::GetRecentBlockhash, None, 5)?;
if result.as_str().is_none() {
Err(WalletError::RpcRequestError(
"Received bad blockhash".to_string(),
))?
}
let blockhash_str = result.as_str().unwrap();
let blockhash_vec = bs58::decode(blockhash_str)
.into_vec()
.map_err(|_| WalletError::RpcRequestError("Received bad blockhash".to_string()))?;
Ok(Hash::new(&blockhash_vec))
}
fn get_next_blockhash(
rpc_client: &RpcClient,
previous_blockhash: &Hash,
) -> Result<Hash, Box<dyn error::Error>> {
let mut next_blockhash_retries = 3;
loop {
let next_blockhash = get_recent_blockhash(rpc_client)?;
if cfg!(not(test)) {
if next_blockhash != *previous_blockhash {
return Ok(next_blockhash);
}
} else {
// When using MockRpcClient, get_recent_blockhash() returns a constant value
return Ok(next_blockhash);
}
if next_blockhash_retries == 0 {
Err(WalletError::RpcRequestError(
format!(
"Unable to fetch new blockhash, blockhash stuck at {:?}",
next_blockhash
)
.to_string(),
))?;
}
next_blockhash_retries -= 1;
// Retry ~twice during a slot
sleep(Duration::from_millis(
500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND,
));
}
}
fn send_transaction(
rpc_client: &RpcClient,
transaction: &Transaction,
) -> Result<String, Box<dyn error::Error>> {
let serialized = serialize(transaction).unwrap();
let params = json!([serialized]);
let signature =
rpc_client.retry_make_rpc_request(&RpcRequest::SendTransaction, Some(params), 5)?;
if signature.as_str().is_none() {
Err(WalletError::RpcRequestError(
"Received result of an unexpected type".to_string(),
))?
}
Ok(signature.as_str().unwrap().to_string())
}
fn confirm_transaction(
rpc_client: &RpcClient,
signature: &str,
) -> Result<RpcSignatureStatus, Box<dyn error::Error>> {
let params = json!([signature.to_string()]);
let signature_status =
rpc_client.retry_make_rpc_request(&RpcRequest::GetSignatureStatus, Some(params), 5)?;
if let Some(status) = signature_status.as_str() {
let rpc_status = RpcSignatureStatus::from_str(status).map_err(|_| {
WalletError::RpcRequestError("Unable to parse signature status".to_string())
})?;
Ok(rpc_status)
} else {
Err(WalletError::RpcRequestError(
"Received result of an unexpected type".to_string(),
))?
}
}
fn send_and_confirm_transaction<T: KeypairUtil>(
rpc_client: &RpcClient,
transaction: &mut Transaction,
signer: &T,
) -> Result<String, Box<dyn error::Error>> {
let mut send_retries = 5;
loop {
let mut status_retries = 4;
let signature_str = send_transaction(rpc_client, transaction)?;
let status = loop {
let status = confirm_transaction(rpc_client, &signature_str)?;
if status == RpcSignatureStatus::SignatureNotFound {
status_retries -= 1;
if status_retries == 0 {
break status;
}
} else {
break status;
}
if cfg!(not(test)) {
// Retry ~twice during a slot
sleep(Duration::from_millis(
500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND,
));
}
};
match status {
RpcSignatureStatus::AccountInUse | RpcSignatureStatus::SignatureNotFound => {
// Fetch a new blockhash and re-sign the transaction before sending it again
resign_transaction(rpc_client, transaction, signer)?;
send_retries -= 1;
}
RpcSignatureStatus::Confirmed => {
return Ok(signature_str);
}
_ => {
send_retries = 0;
}
}
if send_retries == 0 {
Err(WalletError::RpcRequestError(format!(
"Transaction {:?} failed: {:?}",
signature_str, status
)))?;
}
}
}
fn send_and_confirm_transactions(
rpc_client: &RpcClient,
mut transactions: Vec<Transaction>,
signer: &Keypair,
) -> Result<(), Box<dyn error::Error>> {
let mut send_retries = 5;
loop {
let mut status_retries = 4;
// Send all transactions
let mut transactions_signatures = vec![];
for transaction in transactions {
if cfg!(not(test)) {
// Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors
// since all the write transactions modify the same program account
sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND));
}
let signature = send_transaction(&rpc_client, &transaction).ok();
transactions_signatures.push((transaction, signature))
}
// Collect statuses for all the transactions, drop those that are confirmed
while status_retries > 0 {
status_retries -= 1;
if cfg!(not(test)) {
// Retry ~twice during a slot
sleep(Duration::from_millis(
500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND,
));
}
transactions_signatures = transactions_signatures
.into_iter()
.filter(|(_transaction, signature)| {
if let Some(signature) = signature {
if let Ok(status) = confirm_transaction(rpc_client, &signature) {
return status != RpcSignatureStatus::Confirmed;
}
}
true
})
.collect();
if transactions_signatures.is_empty() {
return Ok(());
}
}
if send_retries == 0 {
Err(WalletError::RpcRequestError(
"Transactions failed".to_string(),
))?;
}
send_retries -= 1;
// Re-sign any failed transactions with a new blockhash and retry
let blockhash =
get_next_blockhash(rpc_client, &transactions_signatures[0].0.recent_blockhash)?;
transactions = transactions_signatures
.into_iter()
.map(|(mut transaction, _)| {
transaction.sign(&[signer], blockhash);
transaction
})
.collect();
}
}
fn resign_transaction<T: KeypairUtil>(
rpc_client: &RpcClient,
tx: &mut Transaction,
signer_key: &T,
) -> Result<(), Box<dyn error::Error>> {
let blockhash = get_next_blockhash(rpc_client, &tx.recent_blockhash)?;
tx.sign(&[signer_key], blockhash);
Ok(())
}
// Quick and dirty Keypair that assumes the client will do retries but not update the
// blockhash. If the client updates the blockhash, the signature will be invalid.
// TODO: Parse `msg` and use that data to make a new airdrop request.
@ -1025,10 +814,10 @@ pub fn request_and_confirm_airdrop(
to_pubkey: &Pubkey,
lamports: u64,
) -> Result<(), Box<dyn error::Error>> {
let blockhash = get_recent_blockhash(rpc_client)?;
let blockhash = rpc_client.get_recent_blockhash()?;
let keypair = DroneKeypair::new_keypair(drone_addr, to_pubkey, lamports, blockhash)?;
let mut tx = keypair.airdrop_transaction();
send_and_confirm_transaction(rpc_client, &mut tx, &keypair)?;
rpc_client.send_and_confirm_transaction(&mut tx, &keypair)?;
Ok(())
}
@ -1037,8 +826,7 @@ mod tests {
use super::*;
use clap::{App, Arg, ArgGroup, SubCommand};
use serde_json::Value;
use solana::socketaddr;
use solana_client::mock_rpc_client_request::{PUBKEY, SIGNATURE};
use solana_client::mock_rpc_client_request::SIGNATURE;
use solana_sdk::signature::{gen_keypair_file, read_keypair, read_pkcs8, Keypair, KeypairUtil};
use std::fs;
use std::net::{Ipv4Addr, SocketAddr};
@ -1705,115 +1493,4 @@ mod tests {
fs::remove_file(&outfile).unwrap();
assert!(!Path::new(&outfile).exists());
}
#[test]
fn test_wallet_get_recent_blockhash() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());
let vec = bs58::decode(PUBKEY).into_vec().unwrap();
let expected_blockhash = Hash::new(&vec);
let blockhash = get_recent_blockhash(&rpc_client);
assert_eq!(blockhash.unwrap(), expected_blockhash);
let rpc_client = RpcClient::new_mock("fails".to_string());
let blockhash = get_recent_blockhash(&rpc_client);
assert!(blockhash.is_err());
}
#[test]
fn test_wallet_send_transaction() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());
let key = Keypair::new();
let to = Keypair::new().pubkey();
let blockhash = Hash::default();
let tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0);
let signature = send_transaction(&rpc_client, &tx);
assert_eq!(signature.unwrap(), SIGNATURE.to_string());
let rpc_client = RpcClient::new_mock("fails".to_string());
let signature = send_transaction(&rpc_client, &tx);
assert!(signature.is_err());
}
#[test]
fn test_wallet_confirm_transaction() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());
let signature = "good_signature";
let status = confirm_transaction(&rpc_client, &signature);
assert_eq!(status.unwrap(), RpcSignatureStatus::Confirmed);
let rpc_client = RpcClient::new_mock("bad_sig_status".to_string());
let signature = "bad_status";
let status = confirm_transaction(&rpc_client, &signature);
assert!(status.is_err());
let rpc_client = RpcClient::new_mock("fails".to_string());
let signature = "bad_status_fmt";
let status = confirm_transaction(&rpc_client, &signature);
assert!(status.is_err());
}
#[test]
fn test_wallet_send_and_confirm_transaction() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());
let key = Keypair::new();
let to = Keypair::new().pubkey();
let blockhash = Hash::default();
let mut tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0);
let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key);
result.unwrap();
let rpc_client = RpcClient::new_mock("account_in_use".to_string());
let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key);
assert!(result.is_err());
let rpc_client = RpcClient::new_mock("fails".to_string());
let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key);
assert!(result.is_err());
}
#[test]
fn test_wallet_resign_transaction() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());
let key = Keypair::new();
let to = Keypair::new().pubkey();
let vec = bs58::decode("HUu3LwEzGRsUkuJS121jzkPJW39Kq62pXCTmTa1F9jDL")
.into_vec()
.unwrap();
let blockhash = Hash::new(&vec);
let prev_tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0);
let mut tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0);
resign_transaction(&rpc_client, &mut tx, &key).unwrap();
assert_ne!(prev_tx, tx);
assert_ne!(prev_tx.signatures, tx.signatures);
assert_ne!(prev_tx.recent_blockhash, tx.recent_blockhash);
assert_eq!(prev_tx.fee, tx.fee);
assert_eq!(prev_tx.account_keys, tx.account_keys);
assert_eq!(prev_tx.instructions, tx.instructions);
}
#[test]
fn test_request_and_confirm_airdrop() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());
let drone_addr = socketaddr!(0, 0);
let pubkey = Keypair::new().pubkey();
let lamports = 50;
assert_eq!(
request_and_confirm_airdrop(&rpc_client, &drone_addr, &pubkey, lamports).unwrap(),
()
);
let lamports = 0;
assert!(request_and_confirm_airdrop(&rpc_client, &drone_addr, &pubkey, lamports).is_err());
}
}