diff --git a/cli/src/cli.rs b/cli/src/cli.rs
index 0cbd0418a4..237e02c1ff 100644
--- a/cli/src/cli.rs
+++ b/cli/src/cli.rs
@@ -522,6 +522,7 @@ impl CliConfig<'_> {
             commitment: CommitmentConfig::recent(),
             send_transaction_config: RpcSendTransactionConfig {
                 skip_preflight: true,
+                preflight_commitment: Some(CommitmentConfig::recent().commitment),
                 ..RpcSendTransactionConfig::default()
             },
             ..Self::default()
@@ -1406,6 +1407,7 @@ fn send_deploy_messages(
         config.commitment,
         RpcSendTransactionConfig {
             skip_preflight: true,
+            preflight_commitment: Some(config.commitment.commitment),
             ..RpcSendTransactionConfig::default()
         },
     )
@@ -1815,6 +1817,7 @@ fn process_set_program_upgrade_authority(
         config.commitment,
         RpcSendTransactionConfig {
             skip_preflight: true,
+            preflight_commitment: Some(config.commitment.commitment),
             ..RpcSendTransactionConfig::default()
         },
     )
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index a1645365e4..33622ca393 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -58,7 +58,7 @@ type PacketsAndOffsets = (Packets, Vec<usize>);
 pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
 
 /// Transaction forwarding
-pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 1;
+pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
 
 // Fixed thread size seems to be fastest on GCP setup
 pub const NUM_THREADS: u32 = 4;
diff --git a/core/src/rpc.rs b/core/src/rpc.rs
index a577a7be72..79e161cd26 100644
--- a/core/src/rpc.rs
+++ b/core/src/rpc.rs
@@ -237,7 +237,7 @@ impl JsonRpcRequestProcessor {
         let cluster_info = Arc::new(ClusterInfo::default());
         let tpu_address = cluster_info.my_contact_info().tpu;
         let (sender, receiver) = channel();
-        SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
+        SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
 
         Self {
             config: JsonRpcConfig::default(),
@@ -2322,8 +2322,13 @@ impl RpcSol for RpcSolImpl {
         let config = config.unwrap_or_default();
         let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base58);
         let (wire_transaction, transaction) = deserialize_transaction(data, encoding)?;
-        let bank = &*meta.bank(None);
-        let last_valid_slot = bank
+
+        let preflight_commitment = config
+            .preflight_commitment
+            .map(|commitment| CommitmentConfig { commitment });
+        let preflight_bank = &*meta.bank(preflight_commitment);
+
+        let last_valid_slot = preflight_bank
             .get_blockhash_last_valid_slot(&transaction.message.recent_blockhash)
             .unwrap_or(0);
 
@@ -2335,11 +2340,6 @@ impl RpcSol for RpcSolImpl {
             if meta.health.check() != RpcHealthStatus::Ok {
                 return Err(RpcCustomError::RpcNodeUnhealthy.into());
             }
-
-            let preflight_commitment = config
-                .preflight_commitment
-                .map(|commitment| CommitmentConfig { commitment });
-            let preflight_bank = &*meta.bank(preflight_commitment);
             if let (Err(err), logs) = preflight_bank.simulate_transaction(transaction.clone()) {
                 return Err(RpcCustomError::SendTransactionPreflightFailure {
                     message: format!("Transaction simulation failed: {}", err),
@@ -2906,7 +2906,7 @@ pub mod tests {
            None,
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
        );
-        SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
+        SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
 
         cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
             &leader_pubkey,
@@ -4306,7 +4306,7 @@ pub mod tests {
            None,
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
        );
-        SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
+        SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
 
         let mut bad_transaction = system_transaction::transfer(
             &mint_keypair,
@@ -4502,7 +4502,7 @@ pub mod tests {
            None,
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
        );
-        SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
+        SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
         assert_eq!(request_processor.validator_exit(), false);
         assert_eq!(exit.load(Ordering::Relaxed), false);
     }
@@ -4534,7 +4534,7 @@ pub mod tests {
            None,
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
        );
-        SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
+        SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
         assert_eq!(request_processor.validator_exit(), true);
         assert_eq!(exit.load(Ordering::Relaxed), true);
     }
@@ -4625,7 +4625,7 @@ pub mod tests {
            None,
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
        );
-        SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
+        SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
         assert_eq!(
             request_processor.get_block_commitment(0),
             RpcBlockCommitment {
diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs
index 43b9566e24..a059a5d4a0 100644
--- a/core/src/rpc_service.rs
+++ b/core/src/rpc_service.rs
@@ -251,6 +251,8 @@ impl JsonRpcService {
         trusted_validators: Option<HashSet<Pubkey>>,
         override_health_check: Arc<AtomicBool>,
         optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
+        send_transaction_retry_ms: u64,
+        send_transaction_leader_forward_count: u64,
     ) -> Self {
         info!("rpc bound to {:?}", rpc_addr);
         info!("rpc configuration: {:?}", config);
@@ -323,6 +325,8 @@ impl JsonRpcService {
             &bank_forks,
             leader_info,
             receiver,
+            send_transaction_retry_ms,
+            send_transaction_leader_forward_count,
         ));
 
         #[cfg(test)]
@@ -456,6 +460,8 @@ mod tests {
             None,
             Arc::new(AtomicBool::new(false)),
             optimistically_confirmed_bank,
+            1000,
+            1,
         );
         let thread = rpc_service.thread_hdl.thread();
         assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
diff --git a/core/src/send_transaction_service.rs b/core/src/send_transaction_service.rs
index 79f9a268d8..fafe5b1022 100644
--- a/core/src/send_transaction_service.rs
+++ b/core/src/send_transaction_service.rs
@@ -4,7 +4,11 @@ use crate::poh_recorder::PohRecorder;
 use log::*;
 use solana_metrics::{datapoint_warn, inc_new_counter_info};
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
+use solana_sdk::{
+    clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
+    pubkey::Pubkey,
+    signature::Signature,
+};
 use std::sync::Mutex;
 use std::{
     collections::HashMap,
@@ -64,12 +68,21 @@ impl LeaderInfo {
             .collect();
     }
 
-    pub fn get_leader_tpu(&self) -> Option<&SocketAddr> {
-        self.poh_recorder
-            .lock()
-            .unwrap()
-            .leader_after_n_slots(0)
-            .and_then(|leader| self.recent_peers.get(&leader))
+    pub fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> {
+        let recorder = self.poh_recorder.lock().unwrap();
+        let leaders: Vec<_> = (0..max_count)
+            .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
+            .collect();
+        drop(recorder);
+        let mut unique_leaders = vec![];
+        for leader in leaders.iter() {
+            if let Some(addr) = self.recent_peers.get(leader) {
+                if !unique_leaders.contains(&addr) {
+                    unique_leaders.push(addr);
+                }
+            }
+        }
+        unique_leaders
     }
 }
 
@@ -88,8 +101,17 @@ impl SendTransactionService {
         bank_forks: &Arc<RwLock<BankForks>>,
         leader_info: Option<LeaderInfo>,
         receiver: Receiver<TransactionInfo>,
+        retry_rate_ms: u64,
+        leader_forward_count: u64,
     ) -> Self {
-        let thread = Self::retry_thread(tpu_address, receiver, bank_forks.clone(), leader_info);
+        let thread = Self::retry_thread(
+            tpu_address,
+            receiver,
+            bank_forks.clone(),
+            leader_info,
+            retry_rate_ms,
+            leader_forward_count,
+        );
         Self { thread }
     }
 
@@ -98,8 +120,11 @@ impl SendTransactionService {
         receiver: Receiver<TransactionInfo>,
         bank_forks: Arc<RwLock<BankForks>>,
         mut leader_info: Option<LeaderInfo>,
+        retry_rate_ms: u64,
+        leader_forward_count: u64,
     ) -> JoinHandle<()> {
         let mut last_status_check = Instant::now();
+        let mut last_leader_refresh = Instant::now();
         let mut transactions = HashMap::new();
         let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
 
@@ -110,19 +135,21 @@ impl SendTransactionService {
         Builder::new()
             .name("send-tx-sv2".to_string())
             .spawn(move || loop {
-                match receiver.recv_timeout(Duration::from_secs(1)) {
+                match receiver.recv_timeout(Duration::from_millis(1000.min(retry_rate_ms))) {
                     Err(RecvTimeoutError::Disconnected) => break,
                     Err(RecvTimeoutError::Timeout) => {}
                     Ok(transaction_info) => {
-                        let address = leader_info
+                        let addresses = leader_info
                             .as_ref()
-                            .and_then(|leader_info| leader_info.get_leader_tpu())
-                            .unwrap_or(&tpu_address);
-                        Self::send_transaction(
-                            &send_socket,
-                            address,
-                            &transaction_info.wire_transaction,
-                        );
+                            .map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
+                        let addresses = addresses.unwrap_or_else(|| vec![&tpu_address]);
+                        for address in addresses {
+                            Self::send_transaction(
+                                &send_socket,
+                                address,
+                                &transaction_info.wire_transaction,
+                            );
+                        }
                         if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
                             transactions.insert(transaction_info.signature, transaction_info);
                         } else {
@@ -131,15 +158,19 @@ impl SendTransactionService {
                     }
                 }
 
-                if Instant::now().duration_since(last_status_check).as_secs() >= 5 {
+                if last_status_check.elapsed().as_millis() as u64 >= retry_rate_ms {
                     if !transactions.is_empty() {
                         datapoint_info!(
                             "send_transaction_service-queue-size",
                             ("len", transactions.len(), i64)
                         );
-                        let bank_forks = bank_forks.read().unwrap();
-                        let root_bank = bank_forks.root_bank();
-                        let working_bank = bank_forks.working_bank();
+                        let (root_bank, working_bank) = {
+                            let bank_forks = bank_forks.read().unwrap();
+                            (
+                                bank_forks.root_bank().clone(),
+                                bank_forks.working_bank().clone(),
+                            )
+                        };
 
                         let _result = Self::process_transactions(
                             &working_bank,
@@ -151,8 +182,11 @@ impl SendTransactionService {
                         );
                     }
                     last_status_check = Instant::now();
-                    if let Some(leader_info) = leader_info.as_mut() {
-                        leader_info.refresh_recent_peers();
+                    if last_leader_refresh.elapsed().as_millis() > 1000 {
+                        if let Some(leader_info) = leader_info.as_mut() {
+                            leader_info.refresh_recent_peers();
+                        }
+                        last_leader_refresh = Instant::now();
                     }
                 }
             })
@@ -188,12 +222,21 @@ impl SendTransactionService {
                 info!("Retrying transaction: {}", signature);
                 result.retried += 1;
                 inc_new_counter_info!("send_transaction_service-retry", 1);
+                let leaders = leader_info
+                    .as_ref()
+                    .map(|leader_info| leader_info.get_leader_tpus(1));
+                let leader = if let Some(leaders) = leaders {
+                    if leaders.is_empty() {
+                        &tpu_address
+                    } else {
+                        leaders[0]
+                    }
+                } else {
+                    &tpu_address
+                };
                 Self::send_transaction(
                     &send_socket,
-                    leader_info
-                        .as_ref()
-                        .and_then(|leader_info| leader_info.get_leader_tpu())
-                        .unwrap_or(&tpu_address),
+                    leader,
                     &transaction_info.wire_transaction,
                 );
                 true
@@ -234,9 +277,20 @@ impl SendTransactionService {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::contact_info::ContactInfo;
+    use solana_ledger::{
+        blockstore::Blockstore, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache,
+    };
+    use solana_runtime::genesis_utils::{
+        create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
+    };
     use solana_sdk::{
-        genesis_config::create_genesis_config, pubkey::Pubkey, signature::Signer,
+        genesis_config::create_genesis_config,
+        poh_config::PohConfig,
+        pubkey::Pubkey,
+        signature::{Keypair, Signer},
         system_transaction,
+        timing::timestamp,
     };
     use std::sync::mpsc::channel;
 
@@ -248,7 +302,7 @@ mod test {
         let (sender, receiver) = channel();
 
         let send_tranaction_service =
-            SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
+            SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
 
         drop(sender);
         send_tranaction_service.join().unwrap();
@@ -401,4 +455,113 @@ mod test {
             }
         );
     }
+
+    #[test]
+    fn test_get_leader_tpus() {
+        let ledger_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&ledger_path).unwrap();
+
+            let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand();
+            let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand();
+            let validator_vote_keypairs2 = ValidatorVoteKeypairs::new_rand();
+            let validator_keypairs = vec![
+                &validator_vote_keypairs0,
+                &validator_vote_keypairs1,
+                &validator_vote_keypairs2,
+            ];
+            let GenesisConfigInfo {
+                genesis_config,
+                mint_keypair: _,
+                voting_keypair: _,
+            } = create_genesis_config_with_vote_accounts(
+                1_000_000_000,
+                &validator_keypairs,
+                vec![10_000; 3],
+            );
+            let bank = Arc::new(Bank::new(&genesis_config));
+
+            let (poh_recorder, _entry_receiver) = PohRecorder::new(
+                0,
+                bank.last_blockhash(),
+                0,
+                Some((2, 2)),
+                bank.ticks_per_slot(),
+                &Pubkey::default(),
+                &Arc::new(blockstore),
+                &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+                &Arc::new(PohConfig::default()),
+            );
+
+            let node_keypair = Arc::new(Keypair::new());
+            let cluster_info = Arc::new(ClusterInfo::new(
+                ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
+                node_keypair,
+            ));
+
+            let validator0_socket = SocketAddr::from(([127, 0, 0, 1], 1111));
+            let validator1_socket = SocketAddr::from(([127, 0, 0, 1], 2222));
+            let validator2_socket = SocketAddr::from(([127, 0, 0, 1], 3333));
+            let recent_peers: HashMap<_, _> = vec![
+                (
+                    validator_vote_keypairs0.node_keypair.pubkey(),
+                    validator0_socket,
+                ),
+                (
+                    validator_vote_keypairs1.node_keypair.pubkey(),
+                    validator1_socket,
+                ),
+                (
+                    validator_vote_keypairs2.node_keypair.pubkey(),
+                    validator2_socket,
+                ),
+            ]
+            .iter()
+            .cloned()
+            .collect();
+            let leader_info = LeaderInfo {
+                cluster_info,
+                poh_recorder: Arc::new(Mutex::new(poh_recorder)),
+                recent_peers: recent_peers.clone(),
+            };
+
+            let slot = bank.slot();
+            let first_leader =
+                solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
+            assert_eq!(
+                leader_info.get_leader_tpus(1),
+                vec![recent_peers.get(&first_leader).unwrap()]
+            );
+
+            let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
+                slot + NUM_CONSECUTIVE_LEADER_SLOTS,
+                &bank,
+            )
+            .unwrap();
+            let mut expected_leader_sockets = vec![
+                recent_peers.get(&first_leader).unwrap(),
+                recent_peers.get(&second_leader).unwrap(),
+            ];
+            expected_leader_sockets.dedup();
+            assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets);
+
+            let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
+                slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS),
+                &bank,
+            )
+            .unwrap();
+            let mut expected_leader_sockets = vec![
+                recent_peers.get(&first_leader).unwrap(),
+                recent_peers.get(&second_leader).unwrap(),
+                recent_peers.get(&third_leader).unwrap(),
+            ];
+            expected_leader_sockets.dedup();
+            assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets);
+
+            for x in 4..8 {
+                assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len());
+            }
+        }
+        Blockstore::destroy(&ledger_path).unwrap();
+    }
 }
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 7288a54b3e..fcef13dbcd 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -106,6 +106,8 @@ pub struct ValidatorConfig {
     pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
     pub contact_debug_interval: u64,
     pub bpf_jit: bool,
+    pub send_transaction_retry_ms: u64,
+    pub send_transaction_leader_forward_count: u64,
 }
 
 impl Default for ValidatorConfig {
@@ -144,6 +146,8 @@ impl Default for ValidatorConfig {
             debug_keys: None,
             contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
             bpf_jit: false,
+            send_transaction_retry_ms: 2000,
+            send_transaction_leader_forward_count: 2,
         }
     }
 }
@@ -433,6 +437,8 @@ impl Validator {
                 config.trusted_validators.clone(),
                 rpc_override_health_check.clone(),
                 optimistically_confirmed_bank.clone(),
+                config.send_transaction_retry_ms,
+                config.send_transaction_leader_forward_count,
             ),
             pubsub_service: PubSubService::new(
                 config.pubsub_config.clone(),
diff --git a/validator/src/main.rs b/validator/src/main.rs
index cd8d3b57ad..8d210f425c 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -798,6 +798,12 @@ pub fn main() {
         PubSubConfig::default().max_in_buffer_capacity.to_string();
     let default_rpc_pubsub_max_out_buffer_capacity =
         PubSubConfig::default().max_out_buffer_capacity.to_string();
+    let default_rpc_send_transaction_retry_ms = ValidatorConfig::default()
+        .send_transaction_retry_ms
+        .to_string();
+    let default_rpc_send_transaction_leader_forward_count = ValidatorConfig::default()
+        .send_transaction_leader_forward_count
+        .to_string();
 
     let matches = App::new(crate_name!()).about(crate_description!())
         .version(solana_version::version!())
@@ -1278,6 +1284,24 @@ pub fn main() {
                 .default_value(&default_rpc_pubsub_max_out_buffer_capacity)
                 .help("The maximum size in bytes to which the outgoing websocket buffer can grow."),
         )
+        .arg(
+            Arg::with_name("rpc_send_transaction_retry_ms")
+                .long("rpc-send-retry-ms")
+                .value_name("MILLISECS")
+                .takes_value(true)
+                .validator(is_parsable::<u64>)
+                .default_value(&default_rpc_send_transaction_retry_ms)
+                .help("The rate at which transactions sent via rpc service are retried."),
+        )
+        .arg(
+            Arg::with_name("rpc_send_transaction_leader_forward_count")
+                .long("rpc-send-leader-count")
+                .value_name("NUMBER")
+                .takes_value(true)
+                .validator(is_parsable::<u64>)
+                .default_value(&default_rpc_send_transaction_leader_forward_count)
+                .help("The number of upcoming leaders to which to forward transactions sent via rpc service."),
+        )
         .arg(
             Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
                 .long("halt-on-trusted-validators-accounts-hash-mismatch")
@@ -1483,6 +1507,12 @@ pub fn main() {
         debug_keys,
         contact_debug_interval,
         bpf_jit: matches.is_present("bpf_jit"),
+        send_transaction_retry_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64),
+        send_transaction_leader_forward_count: value_t_or_exit!(
+            matches,
+            "rpc_send_transaction_leader_forward_count",
+            u64
+        ),
         ..ValidatorConfig::default()
     };
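
Note (illustrative, not part of the change): the cli.rs hunks above pair with the rpc.rs change. Even when preflight simulation is skipped, the client now passes a preflight commitment, because sendTransaction uses that commitment to pick the bank that determines last_valid_slot for retries. A minimal sketch of the client-side config, using the type and field names from the diff; the import paths are assumed to match the solana_client and solana_sdk crates of this revision:

    use solana_client::rpc_config::RpcSendTransactionConfig;
    use solana_sdk::commitment_config::CommitmentConfig;

    fn send_config() -> RpcSendTransactionConfig {
        RpcSendTransactionConfig {
            // Preflight simulation is still skipped...
            skip_preflight: true,
            // ...but the commitment is forwarded anyway, since the RPC node now
            // uses it to choose the bank for the last_valid_slot calculation.
            preflight_commitment: Some(CommitmentConfig::recent().commitment),
            ..RpcSendTransactionConfig::default()
        }
    }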
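Note (illustrative, not part of the change): get_leader_tpus() probes the leader schedule once per leader rotation (every NUM_CONSECUTIVE_LEADER_SLOTS slots) and keeps only the first occurrence of each TPU address, so a leader who owns several upcoming rotations is forwarded to once. A standalone sketch of that fan-out rule under assumed names: leader_at_offset and tpu_by_leader stand in for PohRecorder::leader_after_n_slots() and the LeaderInfo::recent_peers map, and the local constant mirrors the sdk value of 4:

    use std::collections::HashMap;
    use std::net::SocketAddr;

    const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;

    fn upcoming_leader_tpus<'a>(
        max_count: u64,
        leader_at_offset: impl Fn(u64) -> Option<&'a str>,
        tpu_by_leader: &HashMap<&'a str, SocketAddr>,
    ) -> Vec<SocketAddr> {
        let mut unique_tpus = Vec::new();
        for i in 0..max_count {
            // Each step of NUM_CONSECUTIVE_LEADER_SLOTS lands in the next rotation's slot range.
            if let Some(leader) = leader_at_offset(i * NUM_CONSECUTIVE_LEADER_SLOTS) {
                if let Some(addr) = tpu_by_leader.get(leader) {
                    if !unique_tpus.contains(addr) {
                        unique_tpus.push(*addr);
                    }
                }
            }
        }
        unique_tpus
    }

    fn main() {
        // Hypothetical schedule: "alice" leads the first two rotations, "bob" the third.
        let schedule = ["alice", "alice", "bob"];
        let tpus: HashMap<&str, SocketAddr> = vec![
            ("alice", SocketAddr::from(([127, 0, 0, 1], 1111))),
            ("bob", SocketAddr::from(([127, 0, 0, 1], 2222))),
        ]
        .into_iter()
        .collect();
        let leaders = upcoming_leader_tpus(
            3,
            |offset| {
                schedule
                    .get((offset / NUM_CONSECUTIVE_LEADER_SLOTS) as usize)
                    .copied()
            },
            &tpus,
        );
        // "alice" owns two rotations but her TPU appears only once in the result.
        assert_eq!(leaders.len(), 2);
    }

The fan-out width and the retry cadence of the real service are operator-tunable through the new --rpc-send-leader-count and --rpc-send-retry-ms flags added in validator/src/main.rs, defaulting to 2 leaders and 2000 ms.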