diff --git a/cli/src/program.rs b/cli/src/program.rs index 630d23dd18..e10c6d341a 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -1,4 +1,4 @@ -use crate::send_tpu::{get_leader_tpu, send_transaction_tpu}; +use crate::send_tpu::{get_leader_tpus, send_transaction_tpu}; use crate::{ checks::*, cli::{ @@ -55,6 +55,7 @@ use std::{ }; const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE +const NUM_TPU_LEADERS: u64 = 2; #[derive(Debug, PartialEq)] pub enum ProgramCliCommand { @@ -1578,7 +1579,7 @@ fn send_and_confirm_transactions_with_spinner( let cluster_nodes = rpc_client.get_cluster_nodes().ok(); loop { - progress_bar.set_message("Finding leader node..."); + progress_bar.set_message("Finding leader nodes..."); let epoch_info = rpc_client.get_epoch_info()?; let mut slot = epoch_info.absolute_slot; let mut last_epoch_fetch = Instant::now(); @@ -1587,8 +1588,9 @@ fn send_and_confirm_transactions_with_spinner( leader_schedule_epoch = epoch_info.epoch; } - let mut tpu_address = get_leader_tpu( + let mut tpu_addresses = get_leader_tpus( min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), + NUM_TPU_LEADERS, leader_schedule.as_ref(), cluster_nodes.as_ref(), ); @@ -1597,10 +1599,12 @@ fn send_and_confirm_transactions_with_spinner( let mut pending_transactions = HashMap::new(); let num_transactions = transactions.len(); for transaction in transactions { - if let Some(tpu_address) = tpu_address { + if !tpu_addresses.is_empty() { let wire_transaction = serialize(&transaction).expect("serialization should succeed"); - send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); + for tpu_address in &tpu_addresses { + send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); + } } else { let _result = rpc_client .send_transaction_with_config( @@ -1626,8 +1630,9 @@ fn send_and_confirm_transactions_with_spinner( if last_epoch_fetch.elapsed() > Duration::from_millis(400) { let epoch_info = rpc_client.get_epoch_info()?; last_epoch_fetch = Instant::now(); - tpu_address = get_leader_tpu( + tpu_addresses = get_leader_tpus( min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), + NUM_TPU_LEADERS, leader_schedule.as_ref(), cluster_nodes.as_ref(), ); @@ -1678,17 +1683,20 @@ fn send_and_confirm_transactions_with_spinner( } let epoch_info = rpc_client.get_epoch_info()?; - tpu_address = get_leader_tpu( + tpu_addresses = get_leader_tpus( min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), + NUM_TPU_LEADERS, leader_schedule.as_ref(), cluster_nodes.as_ref(), ); for transaction in pending_transactions.values() { - if let Some(tpu_address) = tpu_address { + if !tpu_addresses.is_empty() { let wire_transaction = - serialize(transaction).expect("serialization should succeed"); - send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); + serialize(&transaction).expect("serialization should succeed"); + for tpu_address in &tpu_addresses { + send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); + } } else { let _result = rpc_client .send_transaction_with_config( diff --git a/cli/src/send_tpu.rs b/cli/src/send_tpu.rs index 72c2525aae..320e88e47a 100644 --- a/cli/src/send_tpu.rs +++ b/cli/src/send_tpu.rs @@ -1,21 +1,38 @@ use log::*; use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule}; +use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS; use std::net::{SocketAddr, UdpSocket}; -pub fn get_leader_tpu( +pub fn get_leader_tpus( slot_index: u64, + num_leaders: u64, leader_schedule: Option<&RpcLeaderSchedule>, cluster_nodes: Option<&Vec>, -) -> Option { - leader_schedule? - .iter() - .find(|(_pubkey, slots)| slots.iter().any(|slot| *slot as u64 == slot_index)) - .and_then(|(pubkey, _)| { - cluster_nodes? +) -> Vec { + let leaders: Vec<_> = (0..num_leaders) + .filter_map(|i| { + leader_schedule? .iter() - .find(|contact_info| contact_info.pubkey == *pubkey) - .and_then(|contact_info| contact_info.tpu) + .find(|(_pubkey, slots)| { + slots.iter().any(|slot| { + *slot as u64 == (slot_index + (i * NUM_CONSECUTIVE_LEADER_SLOTS)) + }) + }) + .and_then(|(pubkey, _)| { + cluster_nodes? + .iter() + .find(|contact_info| contact_info.pubkey == *pubkey) + .and_then(|contact_info| contact_info.tpu) + }) }) + .collect(); + let mut unique_leaders = vec![]; + for leader in leaders.into_iter() { + if !unique_leaders.contains(&leader) { + unique_leaders.push(leader); + } + } + unique_leaders } pub fn send_transaction_tpu(