diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 412939247f..7fb3a090c1 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -1,7 +1,8 @@ use crate::{ - checks::*, cluster_query::*, feature::*, inflation::*, nonce::*, spend_utils::*, stake::*, - validator_info::*, vote::*, + checks::*, cluster_query::*, feature::*, inflation::*, nonce::*, send_tpu::*, spend_utils::*, + stake::*, validator_info::*, vote::*, }; +use bincode::serialize; use bip39::{Language, Mnemonic, MnemonicType, Seed}; use clap::{value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand}; use log::*; @@ -32,7 +33,7 @@ use solana_client::{ rpc_client::RpcClient, rpc_config::{RpcLargestAccountsFilter, RpcSendTransactionConfig}, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, - rpc_response::RpcKeyedAccount, + rpc_response::{RpcKeyedAccount, RpcLeaderSchedule}, }; #[cfg(not(test))] use solana_faucet::faucet::request_airdrop_transaction; @@ -42,7 +43,7 @@ use solana_rbpf::vm::EbpfVm; use solana_remote_wallet::remote_wallet::RemoteWalletManager; use solana_sdk::{ bpf_loader, bpf_loader_deprecated, - clock::{Epoch, Slot, DEFAULT_TICKS_PER_SECOND}, + clock::{Epoch, Slot}, commitment_config::CommitmentConfig, decode_error::DecodeError, hash::Hash, @@ -64,12 +65,13 @@ use solana_stake_program::{ use solana_transaction_status::{EncodedTransaction, UiTransactionEncoding}; use solana_vote_program::vote_state::VoteAuthorize; use std::{ + cmp::min, collections::HashMap, error, fmt::Write as FmtWrite, fs::File, io::{Read, Write}, - net::{IpAddr, SocketAddr}, + net::{IpAddr, SocketAddr, UdpSocket}, str::FromStr, sync::Arc, thread::sleep, @@ -1031,33 +1033,50 @@ fn send_and_confirm_transactions_with_spinner( ) -> Result<(), Box> { let progress_bar = new_spinner_progress_bar(); let mut send_retries = 5; + let mut leader_schedule: Option = None; + let mut leader_schedule_epoch = 0; + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let cluster_nodes = rpc_client.get_cluster_nodes().ok(); + loop { let mut status_retries = 15; + progress_bar.set_message("Finding leader node..."); + let epoch_info = rpc_client.get_epoch_info_with_commitment(commitment)?; + if epoch_info.epoch > leader_schedule_epoch || leader_schedule.is_none() { + leader_schedule = rpc_client + .get_leader_schedule_with_commitment(Some(epoch_info.absolute_slot), commitment)?; + leader_schedule_epoch = epoch_info.epoch; + } + let tpu_address = get_leader_tpu( + min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), + leader_schedule.as_ref(), + cluster_nodes.as_ref(), + ); + // Send all transactions let mut pending_transactions = HashMap::new(); let num_transactions = transactions.len(); for transaction in transactions { - if cfg!(not(test)) { - // Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors - // when all the write transactions modify the same program account (eg, deploying a - // new program) - sleep(Duration::from_millis(1000 / DEFAULT_TICKS_PER_SECOND)); + if let Some(tpu_address) = tpu_address { + let wire_transaction = + serialize(&transaction).expect("serialization should succeed"); + send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); + } else { + let _result = rpc_client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + preflight_commitment: Some(commitment.commitment), + ..RpcSendTransactionConfig::default() + }, + ) + .ok(); } - - let _result = rpc_client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - preflight_commitment: Some(commitment.commitment), - ..RpcSendTransactionConfig::default() - }, - ) - .ok(); pending_transactions.insert(transaction.signatures[0], transaction); progress_bar.set_message(&format!( - "[{}/{}] Transactions sent", + "[{}/{}] Total Transactions sent", pending_transactions.len(), num_transactions )); @@ -1093,6 +1112,11 @@ fn send_and_confirm_transactions_with_spinner( let _ = pending_transactions.remove(&signature); } } + progress_bar.set_message(&format!( + "[{}/{}] Transactions confirmed", + num_transactions - pending_transactions.len(), + num_transactions + )); } if pending_transactions.is_empty() { diff --git a/cli/src/lib.rs b/cli/src/lib.rs index c2816dfc6a..e735fa3a46 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -26,6 +26,7 @@ pub mod cluster_query; pub mod feature; pub mod inflation; pub mod nonce; +pub mod send_tpu; pub mod spend_utils; pub mod stake; pub mod test_utils; diff --git a/cli/src/send_tpu.rs b/cli/src/send_tpu.rs new file mode 100644 index 0000000000..72c2525aae --- /dev/null +++ b/cli/src/send_tpu.rs @@ -0,0 +1,29 @@ +use log::*; +use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule}; +use std::net::{SocketAddr, UdpSocket}; + +pub fn get_leader_tpu( + slot_index: u64, + leader_schedule: Option<&RpcLeaderSchedule>, + cluster_nodes: Option<&Vec>, +) -> Option { + leader_schedule? + .iter() + .find(|(_pubkey, slots)| slots.iter().any(|slot| *slot as u64 == slot_index)) + .and_then(|(pubkey, _)| { + cluster_nodes? + .iter() + .find(|contact_info| contact_info.pubkey == *pubkey) + .and_then(|contact_info| contact_info.tpu) + }) +} + +pub fn send_transaction_tpu( + send_socket: &UdpSocket, + tpu_address: &SocketAddr, + wire_transaction: &[u8], +) { + if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) { + warn!("Failed to send transaction to {}: {:?}", tpu_address, err); + } +} diff --git a/client/src/mock_sender.rs b/client/src/mock_sender.rs index 0588f6eaaa..1ec492b849 100644 --- a/client/src/mock_sender.rs +++ b/client/src/mock_sender.rs @@ -6,6 +6,7 @@ use crate::{ }; use serde_json::{json, Number, Value}; use solana_sdk::{ + epoch_info::EpochInfo, fee_calculator::{FeeCalculator, FeeRateGovernor}, instruction::InstructionError, signature::Signature, @@ -58,6 +59,13 @@ impl RpcSender for MockSender { serde_json::to_value(FeeCalculator::default()).unwrap(), ), })?, + RpcRequest::GetEpochInfo => serde_json::to_value(EpochInfo { + epoch: 1, + slot_index: 2, + slots_in_epoch: 32, + absolute_slot: 34, + block_height: 34, + })?, RpcRequest::GetFeeCalculatorForBlockhash => { let value = if self.url == "blockhash_expired" { Value::Null