diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 75b39c3863..36b096f087 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -166,6 +166,7 @@ fn main() { let (verified_sender, verified_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank0 = Bank::new_for_benches(&genesis_config); let mut bank_forks = BankForks::new(bank0); @@ -227,6 +228,7 @@ fn main() { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, None, replay_vote_sender, @@ -384,6 +386,7 @@ fn main() { ); drop(verified_sender); + drop(tpu_vote_sender); drop(vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 1df87ce8b3..992160dbc7 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -165,6 +165,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { genesis_config.ticks_per_slot = 10_000; let (verified_sender, verified_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); let mut bank = Bank::new_for_benches(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench @@ -220,6 +221,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, None, s, @@ -269,6 +271,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { start += chunk_len; start %= verified.len(); }); + drop(tpu_vote_sender); drop(vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4ea86cdae8..b564ab2915 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -9,7 +9,7 @@ use itertools::Itertools; use lru::LruCache; use retain_mut::RetainMut; use solana_entry::entry::hash_transactions; -use solana_gossip::cluster_info::ClusterInfo; +use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}; use solana_ledger::blockstore_processor::TransactionStatusSender; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; @@ -52,7 +52,7 @@ use std::{ collections::{HashMap, VecDeque}, env, mem::size_of, - net::UdpSocket, + net::{SocketAddr, UdpSocket}, ops::DerefMut, sync::atomic::{AtomicU64, AtomicUsize, Ordering}, sync::{Arc, Mutex, RwLock}, @@ -80,6 +80,9 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const DEFAULT_LRU_SIZE: usize = 200_000; +const NUM_VOTE_PROCESSING_THREADS: u32 = 2; +const MIN_THREADS_BANKING: u32 = 1; + #[derive(Debug, Default)] pub struct BankingStageStats { last_report: AtomicInterval, @@ -267,6 +270,13 @@ pub enum BufferedPacketsDecision { Hold, } +#[derive(Debug, Clone)] +pub enum ForwardOption { + NotForward, + ForwardTpuVote, + ForwardTransaction, +} + impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] @@ -274,6 +284,7 @@ impl BankingStage { cluster_info: &Arc, poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -283,6 +294,7 @@ impl BankingStage { cluster_info, poh_recorder, verified_receiver, + tpu_verified_vote_receiver, verified_vote_receiver, Self::num_threads(), transaction_status_sender, @@ -296,6 +308,7 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -311,13 +324,20 @@ impl BankingStage { ))); let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. + assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, enable_forwarding) = if i < num_threads - 1 { - (verified_receiver.clone(), true) - } else { - // Disable forwarding of vote transactions, as votes are gossiped - (verified_vote_receiver.clone(), false) + let (verified_receiver, forward_option) = match i { + 0 => { + // Disable forwarding of vote transactions + // from gossip. Note - votes can also arrive from tpu + (verified_vote_receiver.clone(), ForwardOption::NotForward) + } + 1 => ( + tpu_verified_vote_receiver.clone(), + ForwardOption::ForwardTpuVote, + ), + _ => (verified_receiver.clone(), ForwardOption::ForwardTransaction), }; let poh_recorder = poh_recorder.clone(); @@ -336,7 +356,7 @@ impl BankingStage { &poh_recorder, &cluster_info, &mut recv_start, - enable_forwarding, + forward_option, i, batch_limit, transaction_status_sender, @@ -586,7 +606,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, - enable_forwarding: bool, + forward_option: &ForwardOption, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, @@ -649,7 +669,7 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -660,7 +680,7 @@ impl BankingStage { } BufferedPacketsDecision::ForwardAndHold => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -675,7 +695,7 @@ impl BankingStage { } fn handle_forwarding( - enable_forwarding: bool, + forward_option: &ForwardOption, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, poh_recorder: &Arc>, @@ -683,14 +703,19 @@ impl BankingStage { hold: bool, data_budget: &DataBudget, ) { - if !enable_forwarding { - if !hold { - buffered_packets.clear(); + let addr = match forward_option { + ForwardOption::NotForward => { + if !hold { + buffered_packets.clear(); + } + return; } - return; - } - - let addr = match next_leader_tpu_forwards(cluster_info, poh_recorder) { + ForwardOption::ForwardTransaction => { + next_leader_tpu_forwards(cluster_info, poh_recorder) + } + ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), + }; + let addr = match addr { Some(addr) => addr, None => return, }; @@ -711,7 +736,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, - enable_forwarding: bool, + forward_option: ForwardOption, id: u32, batch_limit: usize, transaction_status_sender: Option, @@ -734,7 +759,7 @@ impl BankingStage { poh_recorder, cluster_info, &mut buffered_packets, - enable_forwarding, + &forward_option, transaction_status_sender.clone(), &gossip_vote_sender, &banking_stage_stats, @@ -788,13 +813,11 @@ impl BankingStage { } pub fn num_threads() -> u32 { - const MIN_THREADS_VOTES: u32 = 1; - const MIN_THREADS_BANKING: u32 = 1; cmp::max( env::var("SOLANA_BANKING_THREADS") .map(|x| x.parse().unwrap_or(NUM_THREADS)) .unwrap_or(NUM_THREADS), - MIN_THREADS_VOTES + MIN_THREADS_BANKING, + NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING, ) } @@ -1126,6 +1149,10 @@ impl BankingStage { .iter() .filter_map(|tx_index| { let p = &msgs.packets[*tx_index]; + if votes_only && !p.meta.is_simple_vote_tx { + return None; + } + let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; let message_bytes = Self::packet_message(p)?; let message_hash = Message::hash_raw_message(message_bytes); @@ -1133,9 +1160,6 @@ impl BankingStage { Err(TransactionError::UnsupportedVersion) }) .ok()?; - if votes_only && !solana_runtime::bank::is_simple_vote_transaction(&tx) { - return None; - } tx.verify_precompiles(feature_set).ok()?; Some((tx, *tx_index)) }) @@ -1627,27 +1651,37 @@ pub(crate) fn next_leader_tpu( cluster_info: &ClusterInfo, poh_recorder: &Mutex, ) -> Option { - if let Some(leader_pubkey) = poh_recorder - .lock() - .unwrap() - .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET) - { - cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu) - } else { - None - } + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu) } fn next_leader_tpu_forwards( cluster_info: &ClusterInfo, - poh_recorder: &Arc>, + poh_recorder: &Mutex, ) -> Option { + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards) +} + +pub(crate) fn next_leader_tpu_vote( + cluster_info: &ClusterInfo, + poh_recorder: &Mutex, +) -> Option { + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote) +} + +fn next_leader_x( + cluster_info: &ClusterInfo, + poh_recorder: &Mutex, + port_selector: F, +) -> Option +where + F: FnOnce(&ContactInfo) -> SocketAddr, +{ if let Some(leader_pubkey) = poh_recorder .lock() .unwrap() .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET) { - cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards) + cluster_info.lookup_contact_info(&leader_pubkey, port_selector) } else { None } @@ -1684,6 +1718,7 @@ mod tests { }; use solana_streamer::socket::SocketAddrSpace; use solana_transaction_status::TransactionWithStatusMeta; + use solana_vote_program::vote_transaction; use std::{ convert::{TryFrom, TryInto}, net::SocketAddr, @@ -1708,8 +1743,8 @@ mod tests { let genesis_config = create_genesis_config(2).genesis_config; let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); - let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1720,11 +1755,14 @@ mod tests { create_test_recorder(&bank, &blockstore, None); let cluster_info = new_test_cluster_info(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, None, gossip_vote_sender, Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( @@ -1732,7 +1770,8 @@ mod tests { ))))), ); drop(verified_sender); - drop(vote_sender); + drop(gossip_verified_vote_sender); + drop(tpu_vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); poh_service.join().unwrap(); @@ -1751,7 +1790,7 @@ mod tests { let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1766,13 +1805,15 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = new_test_cluster_info(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); + let (verified_gossip_vote_sender, verified_gossip_vote_receiver) = unbounded(); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + verified_gossip_vote_receiver, None, gossip_vote_sender, Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( @@ -1781,7 +1822,8 @@ mod tests { ); trace!("sending bank"); drop(verified_sender); - drop(vote_sender); + drop(verified_gossip_vote_sender); + drop(tpu_vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); drop(poh_recorder); @@ -1821,7 +1863,8 @@ mod tests { let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1844,7 +1887,8 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, None, gossip_vote_sender, Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( @@ -1887,7 +1931,8 @@ mod tests { .unwrap(); drop(verified_sender); - drop(vote_sender); + drop(tpu_vote_sender); + drop(gossip_verified_vote_sender); // wait until banking_stage to finish up all packets banking_stage.join().unwrap(); @@ -1968,6 +2013,7 @@ mod tests { verified_sender.send(packets).unwrap(); let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -1993,8 +2039,9 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, - 2, + 3, None, gossip_vote_sender, Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( @@ -2012,6 +2059,7 @@ mod tests { }; drop(verified_sender); drop(vote_sender); + drop(tpu_vote_sender); // consume the entire entry_receiver, feed it into a new bank // check that the balance is what we expect. @@ -2968,7 +3016,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let data_budget = DataBudget::default(); BankingStage::handle_forwarding( - true, + &ForwardOption::ForwardTransaction, &cluster_info, &mut unprocessed_packets, &poh_recorder, @@ -3097,59 +3145,166 @@ mod tests { ); } + #[cfg(test)] + fn make_test_packets( + transactions: Vec, + vote_indexes: Vec, + ) -> (Packets, Vec) { + let capacity = transactions.len(); + let mut packets = Packets::with_capacity(capacity); + let mut packet_indexes = Vec::with_capacity(capacity); + packets.packets.resize(capacity, Packet::default()); + for (index, tx) in transactions.iter().enumerate() { + Packet::populate_packet(&mut packets.packets[index], None, tx).ok(); + packet_indexes.push(index); + } + for index in vote_indexes.iter() { + packets.packets[*index].meta.is_simple_vote_tx = true; + } + (packets, packet_indexes) + } + #[test] fn test_transactions_from_packets() { use solana_sdk::feature_set::FeatureSet; - use solana_vote_program::vote_state::Vote; - solana_logger::setup(); - let mut vote_packet = Packet::default(); - let vote_instruction = solana_vote_program::vote_instruction::vote( - &Pubkey::new_unique(), - &Pubkey::new_unique(), - Vote::default(), - ); - let vote_transaction = - Transaction::new_with_payer(&[vote_instruction], Some(&Pubkey::new_unique())); - Packet::populate_packet(&mut vote_packet, None, &vote_transaction).unwrap(); - let mut non_vote = Packet::default(); - let tx = system_transaction::transfer( - &Keypair::new(), - &Pubkey::new_unique(), - 2, + let keypair = Keypair::new(); + let transfer_tx = + system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()); + let vote_tx = vote_transaction::new_vote_transaction( + vec![42], Hash::default(), + Hash::default(), + &keypair, + &keypair, + &keypair, + None, ); - Packet::populate_packet(&mut non_vote, None, &tx).unwrap(); - let msgs = Packets::new(vec![non_vote, vote_packet]); - let packet_indexes = [0, 1]; - let feature_set = Arc::new(FeatureSet::default()); - let cost_model = Arc::new(RwLock::new(CostModel::default())); - let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model))); - let banking_stage_stats = BankingStageStats::default(); - let (transactions, _transaction_to_packet_indexes, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &msgs, - &packet_indexes, - &feature_set, - &cost_tracker, - &banking_stage_stats, - false, - true, - &mut CostTrackerStats::default(), - ); - assert_eq!(transactions.len(), 1); - assert!(!transactions[0].signatures().is_empty()); - let (transactions, _transaction_to_packet_indexes, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &msgs, - &packet_indexes, - &feature_set, - &cost_tracker, - &banking_stage_stats, - false, - false, - &mut CostTrackerStats::default(), + // packets with no votes + { + let vote_indexes = vec![]; + let (packets, packet_indexes) = + make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); + + let mut votes_only = false; + let (txs, tx_packet_index, _retryable_packet_indexes) = + BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), + &BankingStageStats::default(), + false, + votes_only, + &mut CostTrackerStats::default(), + ); + assert_eq!(2, txs.len()); + assert_eq!(vec![0, 1], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index, _retryable_packet_indexes) = + BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), + &BankingStageStats::default(), + false, + votes_only, + &mut CostTrackerStats::default(), + ); + assert_eq!(0, txs.len()); + assert_eq!(0, tx_packet_index.len()); + } + + // packets with some votes + { + let vote_indexes = vec![0, 2]; + let (packets, packet_indexes) = make_test_packets( + vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], + vote_indexes, ); - assert_eq!(transactions.len(), 2); + + let mut votes_only = false; + let (txs, tx_packet_index, _retryable_packet_indexes) = + BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), + &BankingStageStats::default(), + false, + votes_only, + &mut CostTrackerStats::default(), + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index, _retryable_packet_indexes) = + BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), + &BankingStageStats::default(), + false, + votes_only, + &mut CostTrackerStats::default(), + ); + assert_eq!(2, txs.len()); + assert_eq!(vec![0, 2], tx_packet_index); + } + + // packets with all votes + { + let vote_indexes = vec![0, 1, 2]; + let (packets, packet_indexes) = make_test_packets( + vec![vote_tx.clone(), vote_tx.clone(), vote_tx], + vote_indexes, + ); + + let mut votes_only = false; + let (txs, tx_packet_index, _retryable_packet_indexes) = + BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), + &BankingStageStats::default(), + false, + votes_only, + &mut CostTrackerStats::default(), + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index, _retryable_packet_indexes) = + BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), + &BankingStageStats::default(), + false, + votes_only, + &mut CostTrackerStats::default(), + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + } } } diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index e7fed7a08d..3c1b4d9708 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -350,7 +350,10 @@ impl ClusterInfoVoteListener { labels: Vec, ) -> (Vec, Vec<(CrdsValueLabel, Slot, Packets)>) { let mut msgs = packet::to_packets_chunked(&votes, 1); - sigverify::ed25519_verify_cpu(&mut msgs); + + // Votes should already be filtered by this point. + let reject_non_vote = false; + sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote); let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,) .filter_map(|(label, vote, packet)| { diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 8aea7fe25c..239db3199b 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -23,38 +23,49 @@ impl FetchStage { pub fn new( sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, exit: &Arc, poh_recorder: &Arc>, coalesce_ms: u64, - ) -> (Self, PacketReceiver) { + ) -> (Self, PacketReceiver, PacketReceiver) { let (sender, receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); ( Self::new_with_sender( sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, &sender, + &vote_sender, poh_recorder, coalesce_ms, ), receiver, + vote_receiver, ) } + pub fn new_with_sender( sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, exit: &Arc, sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); + let tpu_vote_sockets = tpu_vote_sockets.into_iter().map(Arc::new).collect(); Self::new_multi_socket( tx_sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, sender, + vote_sender, poh_recorder, coalesce_ms, ) @@ -98,8 +109,10 @@ impl FetchStage { fn new_multi_socket( sockets: Vec>, tpu_forwards_sockets: Vec>, + tpu_vote_sockets: Vec>, exit: &Arc, sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { @@ -130,6 +143,18 @@ impl FetchStage { ) }); + let tpu_vote_threads = tpu_vote_sockets.into_iter().map(|socket| { + streamer::receiver( + socket, + exit, + vote_sender.clone(), + recycler.clone(), + "fetch_vote_stage", + coalesce_ms, + true, + ) + }); + let sender = sender.clone(); let poh_recorder = poh_recorder.clone(); @@ -150,7 +175,10 @@ impl FetchStage { }) .unwrap(); - let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_forwards_threads).collect(); + let mut thread_hdls: Vec<_> = tpu_threads + .chain(tpu_forwards_threads) + .chain(tpu_vote_threads) + .collect(); thread_hdls.push(fwd_thread_hdl); Self { thread_hdls } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0f69de83d1..4d04c67518 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -5621,6 +5621,7 @@ pub mod tests { &poh_recorder, &tower_storage, vote_info, + false, ); let mut cursor = Cursor::default(); @@ -5684,6 +5685,7 @@ pub mod tests { &poh_recorder, &tower_storage, vote_info, + false, ); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); @@ -5751,6 +5753,7 @@ pub mod tests { &poh_recorder, &tower_storage, vote_info, + false, ); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 25ac0dfbaa..4a5321f474 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -855,7 +855,7 @@ mod tests { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + tpu_vote: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -942,7 +942,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + tpu_vote: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr, @@ -972,7 +972,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + tpu_vote: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr2, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index fbf6ea11ae..53b1da56ae 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -17,6 +17,16 @@ pub use solana_perf::sigverify::{ pub struct TransactionSigVerifier { recycler: Recycler, recycler_out: Recycler>, + reject_non_vote: bool, +} + +impl TransactionSigVerifier { + pub fn new_reject_non_vote() -> Self { + TransactionSigVerifier { + reject_non_vote: true, + ..TransactionSigVerifier::default() + } + } } impl Default for TransactionSigVerifier { @@ -25,13 +35,19 @@ impl Default for TransactionSigVerifier { Self { recycler: Recycler::warmed(50, 4096), recycler_out: Recycler::warmed(50, 4096), + reject_non_vote: false, } } } impl SigVerifier for TransactionSigVerifier { fn verify_batch(&self, mut batch: Vec) -> Vec { - sigverify::ed25519_verify(&mut batch, &self.recycler, &self.recycler_out); + sigverify::ed25519_verify( + &mut batch, + &self.recycler, + &self.recycler_out, + self.reject_non_vote, + ); batch } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4836f097da..245e873eb9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -41,6 +41,7 @@ pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, + vote_sigverify_stage: SigVerifyStage, banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, @@ -55,6 +56,7 @@ impl Tpu { retransmit_slots_receiver: RetransmitSlotsReceiver, transactions_sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, broadcast_sockets: Vec, subscriptions: &Arc, transaction_status_sender: Option, @@ -74,11 +76,14 @@ impl Tpu { cost_model: &Arc>, ) -> Self { let (packet_sender, packet_receiver) = channel(); + let (vote_packet_sender, vote_packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, &packet_sender, + &vote_packet_sender, poh_recorder, tpu_coalesce_ms, ); @@ -89,11 +94,23 @@ impl Tpu { SigVerifyStage::new(packet_receiver, verified_sender, verifier) }; - let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded(); + let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); + + let vote_sigverify_stage = { + let verifier = TransactionSigVerifier::new_reject_non_vote(); + SigVerifyStage::new( + vote_packet_receiver, + verified_tpu_vote_packets_sender, + verifier, + ) + }; + + let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = + unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( exit, cluster_info.clone(), - verified_vote_packets_sender, + verified_gossip_vote_packets_sender, poh_recorder, vote_tracker, bank_forks.clone(), @@ -111,7 +128,8 @@ impl Tpu { cluster_info, poh_recorder, verified_receiver, - verified_vote_packets_receiver, + verified_tpu_vote_packets_receiver, + verified_gossip_vote_packets_receiver, transaction_status_sender, replay_vote_sender, cost_tracker, @@ -131,6 +149,7 @@ impl Tpu { Self { fetch_stage, sigverify_stage, + vote_sigverify_stage, banking_stage, cluster_info_vote_listener, broadcast_stage, @@ -141,6 +160,7 @@ impl Tpu { let results = vec![ self.fetch_stage.join(), self.sigverify_stage.join(), + self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), ]; diff --git a/core/src/tvu.rs b/core/src/tvu.rs index cf3ae2e1e7..939103e879 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -292,6 +292,7 @@ impl Tvu { cluster_info.clone(), poh_recorder.clone(), tower_storage, + bank_forks.clone(), ); let (cost_update_sender, cost_update_receiver): ( diff --git a/core/src/validator.rs b/core/src/validator.rs index 70784ada06..8936063a30 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -837,6 +837,7 @@ impl Validator { retransmit_slots_receiver, node.sockets.tpu, node.sockets.tpu_forwards, + node.sockets.tpu_vote, node.sockets.broadcast, &rpc_subscriptions, transaction_status_sender, diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 189d101f0c..5553bc3225 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -2,9 +2,10 @@ use crate::tower_storage::{SavedTower, TowerStorage}; use solana_gossip::cluster_info::ClusterInfo; use solana_measure::measure::Measure; use solana_poh::poh_recorder::PohRecorder; +use solana_runtime::bank_forks::BankForks; use solana_sdk::{clock::Slot, transaction::Transaction}; use std::{ - sync::{mpsc::Receiver, Arc, Mutex}, + sync::{mpsc::Receiver, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; @@ -39,16 +40,20 @@ impl VotingService { cluster_info: Arc, poh_recorder: Arc>, tower_storage: Arc, + bank_forks: Arc>, ) -> Self { let thread_hdl = Builder::new() .name("sol-vote-service".to_string()) .spawn(move || { for vote_op in vote_receiver.iter() { + let rooted_bank = bank_forks.read().unwrap().root_bank().clone(); + let send_to_tpu_vote_port = rooted_bank.send_to_tpu_vote_port_enabled(); Self::handle_vote( &cluster_info, &poh_recorder, tower_storage.as_ref(), vote_op, + send_to_tpu_vote_port, ); } }) @@ -61,6 +66,7 @@ impl VotingService { poh_recorder: &Mutex, tower_storage: &dyn TowerStorage, vote_op: VoteOp, + send_to_tpu_vote_port: bool, ) { if let VoteOp::PushVote { saved_tower, .. } = &vote_op { let mut measure = Measure::start("tower_save-ms"); @@ -72,10 +78,12 @@ impl VotingService { inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize); } - let _ = cluster_info.send_transaction( - vote_op.tx(), - crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), - ); + let target_address = if send_to_tpu_vote_port { + crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder) + } else { + crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder) + }; + let _ = cluster_info.send_transaction(vote_op.tx(), target_address); match vote_op { VoteOp::PushVote { diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 778f8319a9..fb5c6665f0 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -254,7 +254,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "skH6cJ1MHfUyJKVaTfRRDV9y29HjuRpfReFpxXMWNky")] +#[frozen_abi(digest = "4VqzaZbxQkeTgo916HVoLtaWoM8bbGaQZy6Qgw7K9kLf")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -765,7 +765,7 @@ impl ClusterInfo { }; let ip_addr = node.gossip.ip(); Some(format!( - "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", + "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", if ContactInfo::is_valid_address(&node.gossip, &self.socket_addr_space) { ip_addr.to_string() } else { @@ -780,6 +780,7 @@ impl ClusterInfo { "-".to_string() }, addr_to_string(&ip_addr, &node.gossip), + addr_to_string(&ip_addr, &node.tpu_vote), addr_to_string(&ip_addr, &node.tpu), addr_to_string(&ip_addr, &node.tpu_forwards), addr_to_string(&ip_addr, &node.tvu), @@ -794,9 +795,9 @@ impl ClusterInfo { format!( "IP Address |Age(ms)| Node identifier \ - | Version |Gossip| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ + | Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ ------------------+-------+----------------------------------------------+---------+\ - ------+------+------+------+------+------+------+--------\n\ + ------+------+-------+------+------+------+------+------+--------\n\ {}\ Nodes: {}{}{}", nodes.join(""), @@ -2667,6 +2668,7 @@ pub struct Sockets { pub tvu_forwards: Vec, pub tpu: Vec, pub tpu_forwards: Vec, + pub tpu_vote: Vec, pub broadcast: Vec, pub repair: UdpSocket, pub retransmit_sockets: Vec, @@ -2693,6 +2695,7 @@ impl Node { let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tpu_vote = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let rpc_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); @@ -2703,7 +2706,6 @@ impl Node { let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); - let unused = UdpSocket::bind("0.0.0.0:0").unwrap(); let info = ContactInfo { id: *pubkey, gossip: gossip_addr, @@ -2712,7 +2714,7 @@ impl Node { repair: repair.local_addr().unwrap(), tpu: tpu.local_addr().unwrap(), tpu_forwards: tpu_forwards.local_addr().unwrap(), - unused: unused.local_addr().unwrap(), + tpu_vote: tpu_vote.local_addr().unwrap(), rpc: rpc_addr, rpc_pubsub: rpc_pubsub_addr, serve_repair: serve_repair.local_addr().unwrap(), @@ -2728,6 +2730,7 @@ impl Node { tvu_forwards: vec![tvu_forwards], tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], + tpu_vote: vec![tpu_vote], broadcast, repair, retransmit_sockets: vec![retransmit_socket], @@ -2768,6 +2771,7 @@ impl Node { let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range); let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range); let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range); + let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (repair_port, repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); @@ -2784,7 +2788,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), rpc: SocketAddr::new(gossip_addr.ip(), rpc_port), rpc_pubsub: SocketAddr::new(gossip_addr.ip(), rpc_pubsub_port), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), @@ -2802,6 +2806,7 @@ impl Node { tvu_forwards: vec![tvu_forwards], tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], + tpu_vote: vec![tpu_vote], broadcast: vec![broadcast], repair, retransmit_sockets: vec![retransmit_socket], @@ -2831,6 +2836,9 @@ impl Node { let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); + let (tpu_vote_port, tpu_vote_sockets) = + multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); + let (_, retransmit_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind"); @@ -2848,7 +2856,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), @@ -2865,6 +2873,7 @@ impl Node { tvu_forwards: tvu_forwards_sockets, tpu: tpu_sockets, tpu_forwards: tpu_forwards_sockets, + tpu_vote: tpu_vote_sockets, broadcast, repair, retransmit_sockets, diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 30ec785ef4..a614fee7e5 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -28,7 +28,7 @@ pub struct ContactInfo { /// address to forward unprocessed transactions to pub tpu_forwards: SocketAddr, /// address to which to send bank state requests - pub unused: SocketAddr, + pub tpu_vote: SocketAddr, /// address to which to send JSON-RPC requests pub rpc: SocketAddr, /// websocket for JSON-RPC push notifications @@ -76,7 +76,7 @@ impl Default for ContactInfo { repair: socketaddr_any!(), tpu: socketaddr_any!(), tpu_forwards: socketaddr_any!(), - unused: socketaddr_any!(), + tpu_vote: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: socketaddr_any!(), @@ -96,7 +96,7 @@ impl ContactInfo { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + tpu_vote: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -126,7 +126,7 @@ impl ContactInfo { repair: addr, tpu: addr, tpu_forwards: addr, - unused: addr, + tpu_vote: addr, rpc: addr, rpc_pubsub: addr, serve_repair: addr, @@ -152,6 +152,7 @@ impl ContactInfo { let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); let serve_repair = next_port(bind_addr, 6); + let tpu_vote = next_port(bind_addr, 7); Self { id: *pubkey, gossip, @@ -160,7 +161,7 @@ impl ContactInfo { repair, tpu, tpu_forwards, - unused: "0.0.0.0:0".parse().unwrap(), + tpu_vote, rpc, rpc_pubsub, serve_repair, @@ -262,7 +263,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -274,7 +275,7 @@ mod tests { assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); - assert!(ci.unused.ip().is_multicast()); + assert!(ci.tpu_vote.ip().is_multicast()); assert!(ci.serve_repair.ip().is_multicast()); } #[test] @@ -287,7 +288,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -295,12 +296,12 @@ mod tests { let addr = socketaddr!("127.0.0.1:10"); let ci = ContactInfo::new_with_socketaddr(&addr); assert_eq!(ci.tpu, addr); + assert_eq!(ci.tpu_vote.port(), 17); assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.tpu_forwards.port(), 13); assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); - assert!(ci.unused.ip().is_unspecified()); assert_eq!(ci.serve_repair.port(), 16); } @@ -327,6 +328,7 @@ mod tests { assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); + assert_eq!(d1.tpu_vote, socketaddr!("127.0.0.1:1241")); } #[test] diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 2a013ae191..bcc487d12d 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -19,7 +19,7 @@ fn bench_sigverify(bencher: &mut Bencher) { let recycler_out = Recycler::default(); // verify packets bencher.iter(|| { - let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); + let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); }) } @@ -34,6 +34,6 @@ fn bench_get_offsets(bencher: &mut Bencher) { let recycler = Recycler::default(); // verify packets bencher.iter(|| { - let _ans = sigverify::generate_offsets(&mut batches, &recycler); + let _ans = sigverify::generate_offsets(&mut batches, &recycler, false); }) } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 4090dac618..ed09d7a33a 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -102,8 +102,8 @@ pub fn init() { } } -fn verify_packet(packet: &mut Packet) { - let packet_offsets = get_packet_offsets(packet, 0); +fn verify_packet(packet: &mut Packet, reject_non_vote: bool) { + let packet_offsets = get_packet_offsets(packet, 0, reject_non_vote); let mut sig_start = packet_offsets.sig_start as usize; let mut pubkey_start = packet_offsets.pubkey_start as usize; let msg_start = packet_offsets.msg_start as usize; @@ -276,15 +276,20 @@ fn do_get_packet_offsets( )) } -fn get_packet_offsets(packet: &mut Packet, current_offset: usize) -> PacketOffsets { +fn get_packet_offsets( + packet: &mut Packet, + current_offset: usize, + reject_non_vote: bool, +) -> PacketOffsets { let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset); if let Ok(offsets) = unsanitized_packet_offsets { check_for_simple_vote_transaction(packet, &offsets, current_offset).ok(); - offsets - } else { - // force sigverify to fail by returning zeros - PacketOffsets::new(0, 0, 0, 0, 0) + if !reject_non_vote || packet.meta.is_simple_vote_tx { + return offsets; + } } + // force sigverify to fail by returning zeros + PacketOffsets::new(0, 0, 0, 0, 0) } fn check_for_simple_vote_transaction( @@ -355,7 +360,11 @@ fn check_for_simple_vote_transaction( Ok(()) } -pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) -> TxOffsets { +pub fn generate_offsets( + batches: &mut [Packets], + recycler: &Recycler, + reject_non_vote: bool, +) -> TxOffsets { debug!("allocating.."); let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets"); signature_offsets.set_pinnable(); @@ -370,7 +379,7 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) batches.iter_mut().for_each(|p| { let mut sig_lens = Vec::new(); p.packets.iter_mut().for_each(|packet| { - let packet_offsets = get_packet_offsets(packet, current_offset); + let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote); sig_lens.push(packet_offsets.sig_len); @@ -404,14 +413,16 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) ) } -pub fn ed25519_verify_cpu(batches: &mut [Packets]) { +pub fn ed25519_verify_cpu(batches: &mut [Packets], reject_non_vote: bool) { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU ECDSA for {}", batch_size(batches)); PAR_THREAD_POOL.install(|| { - batches - .into_par_iter() - .for_each(|p| p.packets.par_iter_mut().for_each(verify_packet)) + batches.into_par_iter().for_each(|p| { + p.packets + .par_iter_mut() + .for_each(|p| verify_packet(p, reject_non_vote)) + }) }); inc_new_counter_debug!("ed25519_verify_cpu", count); } @@ -491,10 +502,11 @@ pub fn ed25519_verify( batches: &mut [Packets], recycler: &Recycler, recycler_out: &Recycler>, + reject_non_vote: bool, ) { let api = perf_libs::api(); if api.is_none() { - return ed25519_verify_cpu(batches); + return ed25519_verify_cpu(batches, reject_non_vote); } let api = api.unwrap(); @@ -507,11 +519,11 @@ pub fn ed25519_verify( // may be busy doing other things while being a real validator // TODO: dynamically adjust this crossover if count < 64 { - return ed25519_verify_cpu(batches); + return ed25519_verify_cpu(batches, reject_non_vote); } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = - generate_offsets(batches, recycler); + generate_offsets(batches, recycler, reject_non_vote); debug!("CUDA ECDSA for {}", batch_size(batches)); debug!("allocating out.."); @@ -626,7 +638,7 @@ mod tests { let message_data = tx.message_data(); let mut packet = sigverify::make_packet_from_transaction(tx.clone()); - let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0); + let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0, false); assert_eq!( memfind(&tx_bytes, tx.signatures[0].as_ref()), @@ -705,7 +717,7 @@ mod tests { let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); - verify_packet(&mut packet); + verify_packet(&mut packet, false); assert!(packet.meta.discard); packet.meta.discard = false; @@ -741,7 +753,7 @@ mod tests { let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); - verify_packet(&mut packet); + verify_packet(&mut packet, false); assert!(packet.meta.discard); packet.meta.discard = false; @@ -872,7 +884,8 @@ mod tests { // Just like get_packet_offsets, but not returning redundant information. fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets { let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = sigverify::get_packet_offsets(&mut packet, current_offset as usize); + let packet_offsets = + sigverify::get_packet_offsets(&mut packet, current_offset as usize, false); PacketOffsets::new( packet_offsets.sig_len, packet_offsets.sig_start - current_offset, @@ -953,7 +966,7 @@ mod tests { fn ed25519_verify(batches: &mut [Packets]) { let recycler = Recycler::default(); let recycler_out = Recycler::default(); - sigverify::ed25519_verify(batches, &recycler, &recycler_out); + sigverify::ed25519_verify(batches, &recycler, &recycler_out, false); } #[test] @@ -1051,8 +1064,8 @@ mod tests { // verify from GPU verification pipeline (when GPU verification is enabled) are // equivalent to the CPU verification pipeline. let mut batches_cpu = batches.clone(); - sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); - ed25519_verify_cpu(&mut batches_cpu); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); + ed25519_verify_cpu(&mut batches_cpu, false); // check result batches diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d133e62c4c..13cbe87e48 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5925,6 +5925,11 @@ impl Bank { .is_active(&feature_set::stakes_remove_delegation_if_inactive::id()) } + pub fn send_to_tpu_vote_port_enabled(&self) -> bool { + self.feature_set + .is_active(&feature_set::send_to_tpu_vote_port::id()) + } + // Check if the wallclock time from bank creation to now has exceeded the allotted // time for transaction processing pub fn should_bank_still_be_processing_txs( @@ -6257,7 +6262,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) { } } -pub fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool { +fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool { if transaction.message().instructions().len() == 1 { let (program_pubkey, instruction) = transaction .message() diff --git a/sdk/docker-solana/Dockerfile b/sdk/docker-solana/Dockerfile index 1beecc8ae8..6452efdcc5 100644 --- a/sdk/docker-solana/Dockerfile +++ b/sdk/docker-solana/Dockerfile @@ -30,6 +30,8 @@ EXPOSE 8006/udp EXPOSE 8007/udp # broadcast EXPOSE 8008/udp +# tpu_vote +EXPOSE 8009/udp RUN apt update && \ apt-get install -y bzip2 libssl-dev && \ diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index a548a4a473..fcf581660d 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -237,6 +237,10 @@ pub mod remove_native_loader { solana_sdk::declare_id!("HTTgmruMYRZEntyL3EdCDdnS6e4D5wRq1FA7kQsb66qq"); } +pub mod send_to_tpu_vote_port { + solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -291,6 +295,7 @@ lazy_static! { (prevent_calling_precompiles_as_programs::id(), "Prevent calling precompiles as programs"), (optimize_epoch_boundary_updates::id(), "Optimize epoch boundary updates"), (remove_native_loader::id(), "Remove support for the native loader"), + (send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter() diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index d39e3526fa..fc37737724 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -357,6 +357,9 @@ fn verify_reachable_ports( if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); } + if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) { + udp_sockets.extend(node.sockets.tpu_vote.iter()); + } if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) { udp_sockets.extend(node.sockets.tvu.iter()); udp_sockets.extend(node.sockets.broadcast.iter());