From 257ddbeee1e8e7db2daa54e86f8eeedf76ace8f1 Mon Sep 17 00:00:00 2001 From: sakridge Date: Wed, 29 Sep 2021 09:12:58 -0700 Subject: [PATCH] Tpu vote 1.7 (#20187) * Add separate vote processing tpu port * Add feature to send to tpu vote port * Add vote rejecting sigverify mode * use packet.meta.is_simple_vote_tx in place of deserialization * consolidate code that identifies vote tx atcommon path for cpu and gpu * new key for feature set * banking forward tpu vote * add tpu vote port to dockerfile and other review changes * Simplify thread id compare * fix a test; updated cluster_info ABI change Co-authored-by: Tao Zhu --- banking-bench/src/main.rs | 3 + core/benches/banking_stage.rs | 3 + core/src/banking_stage.rs | 290 ++++++++++++++++++------- core/src/cluster_info_vote_listener.rs | 5 +- core/src/fetch_stage.rs | 34 ++- core/src/replay_stage.rs | 21 +- core/src/serve_repair.rs | 6 +- core/src/sigverify.rs | 18 +- core/src/tpu.rs | 26 ++- core/src/tvu.rs | 8 +- core/src/validator.rs | 1 + core/src/voting_service.rs | 19 +- gossip/src/cluster_info.rs | 25 ++- gossip/src/contact_info.rs | 20 +- perf/benches/sigverify.rs | 4 +- perf/src/sigverify.rs | 59 +++-- runtime/src/bank.rs | 7 +- sdk/docker-solana/Dockerfile | 2 + sdk/src/feature_set.rs | 5 + validator/src/main.rs | 3 + 20 files changed, 415 insertions(+), 144 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index b2bd5c35fd..614661f713 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -166,6 +166,7 @@ fn main() { let (verified_sender, verified_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank0 = Bank::new(&genesis_config); let mut bank_forks = BankForks::new(bank0); @@ -226,6 +227,7 @@ fn main() { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, None, replay_vote_sender, @@ -381,6 +383,7 @@ fn main() { ); drop(verified_sender); + drop(tpu_vote_sender); drop(vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 59370c38d8..99b8c1b8d0 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -158,6 +158,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { genesis_config.ticks_per_slot = 10_000; let (verified_sender, verified_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); let mut bank = Bank::new(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench @@ -213,6 +214,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, None, s, @@ -259,6 +261,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { start += chunk_len; start %= verified.len(); }); + drop(tpu_vote_sender); drop(vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d037c0c1d0..228bd19ee1 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -6,7 +6,7 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use lru::LruCache; use retain_mut::RetainMut; -use solana_gossip::cluster_info::ClusterInfo; +use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}; use solana_ledger::{blockstore_processor::TransactionStatusSender, entry::hash_transactions}; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; @@ -49,7 +49,7 @@ use std::{ collections::{HashMap, VecDeque}, env, mem::size_of, - net::UdpSocket, + net::{SocketAddr, UdpSocket}, ops::DerefMut, sync::atomic::{AtomicU64, AtomicUsize, Ordering}, sync::{Arc, Mutex}, @@ -77,6 +77,9 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const DEFAULT_LRU_SIZE: usize = 200_000; +const NUM_VOTE_PROCESSING_THREADS: u32 = 2; +const MIN_THREADS_BANKING: u32 = 1; + #[derive(Debug, Default)] pub struct BankingStageStats { last_report: AtomicInterval, @@ -220,6 +223,13 @@ pub enum BufferedPacketsDecision { Hold, } +#[derive(Debug, Clone)] +pub enum ForwardOption { + NotForward, + ForwardTpuVote, + ForwardTransaction, +} + impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] @@ -227,6 +237,7 @@ impl BankingStage { cluster_info: &Arc, poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -235,6 +246,7 @@ impl BankingStage { cluster_info, poh_recorder, verified_receiver, + tpu_verified_vote_receiver, verified_vote_receiver, Self::num_threads(), transaction_status_sender, @@ -247,6 +259,7 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -262,13 +275,20 @@ impl BankingStage { ))); let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. + assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, enable_forwarding) = if i < num_threads - 1 { - (verified_receiver.clone(), true) - } else { - // Disable forwarding of vote transactions, as votes are gossiped - (verified_vote_receiver.clone(), false) + let (verified_receiver, forward_option) = match i { + 0 => { + // Disable forwarding of vote transactions + // from gossip. Note - votes can also arrive from tpu + (verified_vote_receiver.clone(), ForwardOption::NotForward) + } + 1 => ( + tpu_verified_vote_receiver.clone(), + ForwardOption::ForwardTpuVote, + ), + _ => (verified_receiver.clone(), ForwardOption::ForwardTransaction), }; let poh_recorder = poh_recorder.clone(); @@ -287,7 +307,7 @@ impl BankingStage { &poh_recorder, &cluster_info, &mut recv_start, - enable_forwarding, + forward_option, i, batch_limit, transaction_status_sender, @@ -493,7 +513,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, - enable_forwarding: bool, + forward_option: &ForwardOption, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, @@ -543,7 +563,7 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -554,7 +574,7 @@ impl BankingStage { } BufferedPacketsDecision::ForwardAndHold => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -569,7 +589,7 @@ impl BankingStage { } fn handle_forwarding( - enable_forwarding: bool, + forward_option: &ForwardOption, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, poh_recorder: &Arc>, @@ -577,14 +597,19 @@ impl BankingStage { hold: bool, data_budget: &DataBudget, ) { - if !enable_forwarding { - if !hold { - buffered_packets.clear(); + let addr = match forward_option { + ForwardOption::NotForward => { + if !hold { + buffered_packets.clear(); + } + return; } - return; - } - - let addr = match next_leader_tpu_forwards(cluster_info, poh_recorder) { + ForwardOption::ForwardTransaction => { + next_leader_tpu_forwards(cluster_info, poh_recorder) + } + ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), + }; + let addr = match addr { Some(addr) => addr, None => return, }; @@ -606,7 +631,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, - enable_forwarding: bool, + forward_option: ForwardOption, id: u32, batch_limit: usize, transaction_status_sender: Option, @@ -626,7 +651,7 @@ impl BankingStage { poh_recorder, cluster_info, &mut buffered_packets, - enable_forwarding, + &forward_option, transaction_status_sender.clone(), &gossip_vote_sender, &banking_stage_stats, @@ -676,13 +701,11 @@ impl BankingStage { } pub fn num_threads() -> u32 { - const MIN_THREADS_VOTES: u32 = 1; - const MIN_THREADS_BANKING: u32 = 1; cmp::max( env::var("SOLANA_BANKING_THREADS") .map(|x| x.parse().unwrap_or(NUM_THREADS)) .unwrap_or(NUM_THREADS), - MIN_THREADS_VOTES + MIN_THREADS_BANKING, + NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING, ) } @@ -1010,12 +1033,11 @@ impl BankingStage { .iter() .filter_map(|tx_index| { let p = &msgs.packets[*tx_index]; - let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; - - if votes_only && !solana_runtime::bank::is_simple_vote_transaction(&tx) { + if votes_only && !p.meta.is_simple_vote_tx { return None; } + let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; tx.verify_precompiles(libsecp256k1_0_5_upgrade_enabled) .ok()?; let message_bytes = Self::packet_message(p)?; @@ -1403,27 +1425,37 @@ pub(crate) fn next_leader_tpu( cluster_info: &ClusterInfo, poh_recorder: &Mutex, ) -> Option { - if let Some(leader_pubkey) = poh_recorder - .lock() - .unwrap() - .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET) - { - cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu) - } else { - None - } + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu) } fn next_leader_tpu_forwards( cluster_info: &ClusterInfo, - poh_recorder: &Arc>, + poh_recorder: &Mutex, ) -> Option { + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards) +} + +pub(crate) fn next_leader_tpu_vote( + cluster_info: &ClusterInfo, + poh_recorder: &Mutex, +) -> Option { + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote) +} + +fn next_leader_x( + cluster_info: &ClusterInfo, + poh_recorder: &Mutex, + port_selector: F, +) -> Option +where + F: FnOnce(&ContactInfo) -> SocketAddr, +{ if let Some(leader_pubkey) = poh_recorder .lock() .unwrap() .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET) { - cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards) + cluster_info.lookup_contact_info(&leader_pubkey, port_selector) } else { None } @@ -1459,6 +1491,7 @@ mod tests { }; use solana_streamer::socket::SocketAddrSpace; use solana_transaction_status::TransactionWithStatusMeta; + use solana_vote_program::vote_transaction; use std::{ net::SocketAddr, path::Path, @@ -1482,8 +1515,9 @@ mod tests { let genesis_config = create_genesis_config(2).genesis_config; let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); - let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (vote_forward_sender, _vote_forward_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1498,12 +1532,14 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, None, - gossip_vote_sender, + vote_forward_sender, ); drop(verified_sender); - drop(vote_sender); + drop(gossip_verified_vote_sender); + drop(tpu_vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); poh_service.join().unwrap(); @@ -1522,7 +1558,7 @@ mod tests { let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1537,19 +1573,22 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = new_test_cluster_info(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); - let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let (vote_forward_sender, _vote_forward_receiver) = unbounded(); + let (verified_gossip_vote_sender, verified_gossip_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + verified_gossip_vote_receiver, None, - gossip_vote_sender, + vote_forward_sender, ); trace!("sending bank"); drop(verified_sender); - drop(vote_sender); + drop(verified_gossip_vote_sender); + drop(tpu_vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); drop(poh_recorder); @@ -1589,7 +1628,8 @@ mod tests { let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1612,7 +1652,8 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, None, gossip_vote_sender, ); @@ -1652,7 +1693,8 @@ mod tests { .unwrap(); drop(verified_sender); - drop(vote_sender); + drop(tpu_vote_sender); + drop(gossip_verified_vote_sender); // wait until banking_stage to finish up all packets banking_stage.join().unwrap(); @@ -1733,6 +1775,7 @@ mod tests { verified_sender.send(packets).unwrap(); let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -1758,8 +1801,9 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, - 2, + 3, None, gossip_vote_sender, ); @@ -1774,6 +1818,7 @@ mod tests { }; drop(verified_sender); drop(vote_sender); + drop(tpu_vote_sender); // consume the entire entry_receiver, feed it into a new bank // check that the balance is what we expect. @@ -2741,7 +2786,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let data_budget = DataBudget::default(); BankingStage::handle_forwarding( - true, + &ForwardOption::ForwardTransaction, &cluster_info, &mut unprocessed_packets, &poh_recorder, @@ -2870,36 +2915,123 @@ mod tests { ); } + #[cfg(test)] + fn make_test_packets( + transactions: Vec, + vote_indexes: Vec, + ) -> (Packets, Vec) { + let capacity = transactions.len(); + let mut packets = Packets::with_capacity(capacity); + let mut packet_indexes = Vec::with_capacity(capacity); + packets.packets.resize(capacity, Packet::default()); + for (index, tx) in transactions.iter().enumerate() { + Packet::populate_packet(&mut packets.packets[index], None, tx).ok(); + packet_indexes.push(index); + } + for index in vote_indexes.iter() { + packets.packets[*index].meta.is_simple_vote_tx = true; + } + (packets, packet_indexes) + } + #[test] fn test_transactions_from_packets() { - use solana_vote_program::vote_state::Vote; - solana_logger::setup(); - let mut vote_packet = Packet::default(); - let vote_instruction = solana_vote_program::vote_instruction::vote( - &Pubkey::new_unique(), - &Pubkey::new_unique(), - Vote::default(), - ); - let vote_transaction = - Transaction::new_with_payer(&[vote_instruction], Some(&Pubkey::new_unique())); - Packet::populate_packet(&mut vote_packet, None, &vote_transaction).unwrap(); - let mut non_vote = Packet::default(); - let tx = system_transaction::transfer( - &Keypair::new(), - &Pubkey::new_unique(), - 2, + let keypair = Keypair::new(); + let transfer_tx = + system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()); + let vote_tx = vote_transaction::new_vote_transaction( + vec![42], Hash::default(), + Hash::default(), + &keypair, + &keypair, + &keypair, + None, ); - Packet::populate_packet(&mut non_vote, None, &tx).unwrap(); - let msgs = Packets::new(vec![non_vote, vote_packet]); - let packet_indexes = [0, 1]; - let (transactions, _transaction_to_packet_indexes) = - BankingStage::transactions_from_packets(&msgs, &packet_indexes, false, true); - assert_eq!(transactions.len(), 1); - assert!(!transactions[0].transaction().signatures.is_empty()); - let (transactions, _transaction_to_packet_indexes) = - BankingStage::transactions_from_packets(&msgs, &packet_indexes, false, false); - assert_eq!(transactions.len(), 2); + // packets with no votes + { + let vote_indexes = vec![]; + let (packets, packet_indexes) = + make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); + + let mut votes_only = false; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(2, txs.len()); + assert_eq!(vec![0, 1], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(0, txs.len()); + assert_eq!(0, tx_packet_index.len()); + } + + // packets with some votes + { + let vote_indexes = vec![0, 2]; + let (packets, packet_indexes) = make_test_packets( + vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], + vote_indexes, + ); + + let mut votes_only = false; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(2, txs.len()); + assert_eq!(vec![0, 2], tx_packet_index); + } + + // packets with all votes + { + let vote_indexes = vec![0, 1, 2]; + let (packets, packet_indexes) = make_test_packets( + vec![vote_tx.clone(), vote_tx.clone(), vote_tx], + vote_indexes, + ); + + let mut votes_only = false; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + } } } diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index e7e4400325..fc9f7a3db6 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -351,7 +351,10 @@ impl ClusterInfoVoteListener { labels: Vec, ) -> (Vec, Vec<(CrdsValueLabel, Slot, Packets)>) { let mut msgs = packet::to_packets_chunked(&votes, 1); - sigverify::ed25519_verify_cpu(&mut msgs); + + // Votes should already be filtered by this point. + let reject_non_vote = false; + sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote); let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,) .filter_map(|(label, vote, packet)| { diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 8aea7fe25c..019ea4b0e5 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -23,38 +23,49 @@ impl FetchStage { pub fn new( sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, exit: &Arc, poh_recorder: &Arc>, coalesce_ms: u64, - ) -> (Self, PacketReceiver) { + ) -> (Self, PacketReceiver, PacketReceiver) { let (sender, receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); ( Self::new_with_sender( sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, &sender, - poh_recorder, + &vote_sender, + &poh_recorder, coalesce_ms, ), receiver, + vote_receiver, ) } + pub fn new_with_sender( sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, exit: &Arc, sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); + let tpu_vote_sockets = tpu_vote_sockets.into_iter().map(Arc::new).collect(); Self::new_multi_socket( tx_sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, sender, + vote_sender, poh_recorder, coalesce_ms, ) @@ -98,8 +109,10 @@ impl FetchStage { fn new_multi_socket( sockets: Vec>, tpu_forwards_sockets: Vec>, + tpu_vote_sockets: Vec>, exit: &Arc, sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { @@ -130,6 +143,18 @@ impl FetchStage { ) }); + let tpu_vote_threads = tpu_vote_sockets.into_iter().map(|socket| { + streamer::receiver( + socket, + &exit, + vote_sender.clone(), + recycler.clone(), + "fetch_vote_stage", + coalesce_ms, + true, + ) + }); + let sender = sender.clone(); let poh_recorder = poh_recorder.clone(); @@ -150,7 +175,10 @@ impl FetchStage { }) .unwrap(); - let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_forwards_threads).collect(); + let mut thread_hdls: Vec<_> = tpu_threads + .chain(tpu_forwards_threads) + .chain(tpu_vote_threads) + .collect(); thread_hdls.push(fwd_thread_hdl); Self { thread_hdls } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 2f0ea702a5..c6716b9098 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4713,7 +4713,12 @@ mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + vote_info, + false, + ); let mut cursor = Cursor::default(); let (_, votes) = cluster_info.get_votes(&mut cursor); @@ -4770,7 +4775,12 @@ mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + vote_info, + false, + ); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; @@ -4832,7 +4842,12 @@ mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + vote_info, + false, + ); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); let (_, votes) = cluster_info.get_votes(&mut cursor); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 2a8acf75e8..c26c6b2faf 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -738,7 +738,7 @@ mod tests { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + tpu_vote: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -825,7 +825,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + tpu_vote: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr, @@ -855,7 +855,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + tpu_vote: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr2, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index fbf6ea11ae..53b1da56ae 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -17,6 +17,16 @@ pub use solana_perf::sigverify::{ pub struct TransactionSigVerifier { recycler: Recycler, recycler_out: Recycler>, + reject_non_vote: bool, +} + +impl TransactionSigVerifier { + pub fn new_reject_non_vote() -> Self { + TransactionSigVerifier { + reject_non_vote: true, + ..TransactionSigVerifier::default() + } + } } impl Default for TransactionSigVerifier { @@ -25,13 +35,19 @@ impl Default for TransactionSigVerifier { Self { recycler: Recycler::warmed(50, 4096), recycler_out: Recycler::warmed(50, 4096), + reject_non_vote: false, } } } impl SigVerifier for TransactionSigVerifier { fn verify_batch(&self, mut batch: Vec) -> Vec { - sigverify::ed25519_verify(&mut batch, &self.recycler, &self.recycler_out); + sigverify::ed25519_verify( + &mut batch, + &self.recycler, + &self.recycler_out, + self.reject_non_vote, + ); batch } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 42da887e72..dd3e5ae0d8 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -39,6 +39,7 @@ pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, + vote_sigverify_stage: SigVerifyStage, banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, @@ -53,6 +54,7 @@ impl Tpu { retransmit_slots_receiver: RetransmitSlotsReceiver, transactions_sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, broadcast_sockets: Vec, subscriptions: &Arc, transaction_status_sender: Option, @@ -71,11 +73,14 @@ impl Tpu { cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, ) -> Self { let (packet_sender, packet_receiver) = channel(); + let (vote_packet_sender, vote_packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, &packet_sender, + &vote_packet_sender, poh_recorder, tpu_coalesce_ms, ); @@ -86,11 +91,23 @@ impl Tpu { SigVerifyStage::new(packet_receiver, verified_sender, verifier) }; - let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded(); + let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); + + let vote_sigverify_stage = { + let verifier = TransactionSigVerifier::new_reject_non_vote(); + SigVerifyStage::new( + vote_packet_receiver, + verified_tpu_vote_packets_sender, + verifier, + ) + }; + + let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = + unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( exit, cluster_info.clone(), - verified_vote_packets_sender, + verified_gossip_vote_packets_sender, poh_recorder, vote_tracker, bank_forks.clone(), @@ -107,7 +124,8 @@ impl Tpu { cluster_info, poh_recorder, verified_receiver, - verified_vote_packets_receiver, + verified_tpu_vote_packets_receiver, + verified_gossip_vote_packets_receiver, transaction_status_sender, replay_vote_sender, ); @@ -126,6 +144,7 @@ impl Tpu { Self { fetch_stage, sigverify_stage, + vote_sigverify_stage, banking_stage, cluster_info_vote_listener, broadcast_stage, @@ -136,6 +155,7 @@ impl Tpu { let results = vec![ self.fetch_stage.join(), self.sigverify_stage.join(), + self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), ]; diff --git a/core/src/tvu.rs b/core/src/tvu.rs index b7487bc3d2..f948dc6699 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -276,8 +276,12 @@ impl Tvu { }; let (voting_sender, voting_receiver) = channel(); - let voting_service = - VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone()); + let voting_service = VotingService::new( + voting_receiver, + cluster_info.clone(), + poh_recorder.clone(), + bank_forks.clone(), + ); let replay_stage = ReplayStage::new( replay_stage_config, diff --git a/core/src/validator.rs b/core/src/validator.rs index d2a8b31daf..3edf24f9aa 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -764,6 +764,7 @@ impl Validator { retransmit_slots_receiver, node.sockets.tpu, node.sockets.tpu_forwards, + node.sockets.tpu_vote, node.sockets.broadcast, &rpc_subscriptions, transaction_status_sender, diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 4dcf8bf59f..c7d01c99f2 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,8 +1,9 @@ use solana_gossip::cluster_info::ClusterInfo; use solana_poh::poh_recorder::PohRecorder; +use solana_runtime::bank_forks::BankForks; use solana_sdk::{clock::Slot, transaction::Transaction}; use std::{ - sync::{mpsc::Receiver, Arc, Mutex}, + sync::{mpsc::Receiver, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; @@ -38,12 +39,15 @@ impl VotingService { vote_receiver: Receiver, cluster_info: Arc, poh_recorder: Arc>, + bank_forks: Arc>, ) -> Self { let thread_hdl = Builder::new() .name("sol-vote-service".to_string()) .spawn(move || { for vote_op in vote_receiver.iter() { - Self::handle_vote(&cluster_info, &poh_recorder, vote_op); + let rooted_bank = bank_forks.read().unwrap().root_bank().clone(); + let send_to_tpu_vote_port = rooted_bank.send_to_tpu_vote_port_enabled(); + Self::handle_vote(&cluster_info, &poh_recorder, vote_op, send_to_tpu_vote_port); } }) .unwrap(); @@ -54,11 +58,14 @@ impl VotingService { cluster_info: &ClusterInfo, poh_recorder: &Mutex, vote_op: VoteOp, + send_to_tpu_vote_port: bool, ) { - let _ = cluster_info.send_vote( - vote_op.tx(), - crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), - ); + let target_address = if send_to_tpu_vote_port { + crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder) + } else { + crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder) + }; + let _ = cluster_info.send_vote(vote_op.tx(), target_address); match vote_op { VoteOp::PushVote { tx, tower_slots } => { diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index ef5624885e..35853aada9 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -255,7 +255,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "GANv3KVkTYF84kmg1bAuWEZd9MaiYzPquuu13hup3379")] +#[frozen_abi(digest = "3qq56sFGXGbNqr7qKq8x47t144ugdfv5adCkVJUMnMf3")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -759,7 +759,7 @@ impl ClusterInfo { }; let ip_addr = node.gossip.ip(); Some(format!( - "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", + "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", if ContactInfo::is_valid_address(&node.gossip, &self.socket_addr_space) { ip_addr.to_string() } else { @@ -774,6 +774,7 @@ impl ClusterInfo { "-".to_string() }, addr_to_string(&ip_addr, &node.gossip), + addr_to_string(&ip_addr, &node.tpu_vote), addr_to_string(&ip_addr, &node.tpu), addr_to_string(&ip_addr, &node.tpu_forwards), addr_to_string(&ip_addr, &node.tvu), @@ -788,9 +789,9 @@ impl ClusterInfo { format!( "IP Address |Age(ms)| Node identifier \ - | Version |Gossip| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ + | Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ ------------------+-------+----------------------------------------------+---------+\ - ------+------+------+------+------+------+------+--------\n\ + ------+------+-------+------+------+------+------+------+--------\n\ {}\ Nodes: {}{}{}", nodes.join(""), @@ -2705,6 +2706,7 @@ pub struct Sockets { pub tvu_forwards: Vec, pub tpu: Vec, pub tpu_forwards: Vec, + pub tpu_vote: Vec, pub broadcast: Vec, pub repair: UdpSocket, pub retransmit_sockets: Vec, @@ -2731,6 +2733,7 @@ impl Node { let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tpu_vote = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let rpc_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); @@ -2741,7 +2744,6 @@ impl Node { let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); - let unused = UdpSocket::bind("0.0.0.0:0").unwrap(); let info = ContactInfo { id: *pubkey, gossip: gossip_addr, @@ -2750,7 +2752,7 @@ impl Node { repair: repair.local_addr().unwrap(), tpu: tpu.local_addr().unwrap(), tpu_forwards: tpu_forwards.local_addr().unwrap(), - unused: unused.local_addr().unwrap(), + tpu_vote: tpu_vote.local_addr().unwrap(), rpc: rpc_addr, rpc_pubsub: rpc_pubsub_addr, serve_repair: serve_repair.local_addr().unwrap(), @@ -2766,6 +2768,7 @@ impl Node { tvu_forwards: vec![tvu_forwards], tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], + tpu_vote: vec![tpu_vote], broadcast, repair, retransmit_sockets: vec![retransmit_socket], @@ -2806,6 +2809,7 @@ impl Node { let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range); let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range); let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range); + let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (repair_port, repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); @@ -2822,7 +2826,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), rpc: SocketAddr::new(gossip_addr.ip(), rpc_port), rpc_pubsub: SocketAddr::new(gossip_addr.ip(), rpc_pubsub_port), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), @@ -2840,6 +2844,7 @@ impl Node { tvu_forwards: vec![tvu_forwards], tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], + tpu_vote: vec![tpu_vote], broadcast: vec![broadcast], repair, retransmit_sockets: vec![retransmit_socket], @@ -2869,6 +2874,9 @@ impl Node { let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); + let (tpu_vote_port, tpu_vote_sockets) = + multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); + let (_, retransmit_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind"); @@ -2886,7 +2894,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), @@ -2903,6 +2911,7 @@ impl Node { tvu_forwards: tvu_forwards_sockets, tpu: tpu_sockets, tpu_forwards: tpu_forwards_sockets, + tpu_vote: tpu_vote_sockets, broadcast, repair, retransmit_sockets, diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 30ec785ef4..2ca4dcc23c 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -28,7 +28,7 @@ pub struct ContactInfo { /// address to forward unprocessed transactions to pub tpu_forwards: SocketAddr, /// address to which to send bank state requests - pub unused: SocketAddr, + pub tpu_vote: SocketAddr, /// address to which to send JSON-RPC requests pub rpc: SocketAddr, /// websocket for JSON-RPC push notifications @@ -76,7 +76,7 @@ impl Default for ContactInfo { repair: socketaddr_any!(), tpu: socketaddr_any!(), tpu_forwards: socketaddr_any!(), - unused: socketaddr_any!(), + tpu_vote: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: socketaddr_any!(), @@ -96,7 +96,7 @@ impl ContactInfo { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + tpu_vote: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -126,7 +126,7 @@ impl ContactInfo { repair: addr, tpu: addr, tpu_forwards: addr, - unused: addr, + tpu_vote: addr, rpc: addr, rpc_pubsub: addr, serve_repair: addr, @@ -152,6 +152,7 @@ impl ContactInfo { let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); let serve_repair = next_port(bind_addr, 6); + let tpu_vote = next_port(&bind_addr, 7); Self { id: *pubkey, gossip, @@ -160,7 +161,7 @@ impl ContactInfo { repair, tpu, tpu_forwards, - unused: "0.0.0.0:0".parse().unwrap(), + tpu_vote, rpc, rpc_pubsub, serve_repair, @@ -262,7 +263,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -274,7 +275,7 @@ mod tests { assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); - assert!(ci.unused.ip().is_multicast()); + assert!(ci.tpu_vote.ip().is_multicast()); assert!(ci.serve_repair.ip().is_multicast()); } #[test] @@ -287,7 +288,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -295,12 +296,12 @@ mod tests { let addr = socketaddr!("127.0.0.1:10"); let ci = ContactInfo::new_with_socketaddr(&addr); assert_eq!(ci.tpu, addr); + assert_eq!(ci.tpu_vote.port(), 17); assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.tpu_forwards.port(), 13); assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); - assert!(ci.unused.ip().is_unspecified()); assert_eq!(ci.serve_repair.port(), 16); } @@ -327,6 +328,7 @@ mod tests { assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); + assert_eq!(d1.tpu_vote, socketaddr!("127.0.0.1:1241")); } #[test] diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 2a013ae191..bcc487d12d 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -19,7 +19,7 @@ fn bench_sigverify(bencher: &mut Bencher) { let recycler_out = Recycler::default(); // verify packets bencher.iter(|| { - let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); + let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); }) } @@ -34,6 +34,6 @@ fn bench_get_offsets(bencher: &mut Bencher) { let recycler = Recycler::default(); // verify packets bencher.iter(|| { - let _ans = sigverify::generate_offsets(&mut batches, &recycler); + let _ans = sigverify::generate_offsets(&mut batches, &recycler, false); }) } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 8ed29edafa..84ee833372 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -103,8 +103,8 @@ pub fn init() { } } -fn verify_packet(packet: &mut Packet) { - let packet_offsets = get_packet_offsets(packet, 0); +fn verify_packet(packet: &mut Packet, reject_non_vote: bool) { + let packet_offsets = get_packet_offsets(packet, 0, reject_non_vote); let mut sig_start = packet_offsets.sig_start as usize; let mut pubkey_start = packet_offsets.pubkey_start as usize; let msg_start = packet_offsets.msg_start as usize; @@ -249,15 +249,20 @@ fn do_get_packet_offsets( )) } -fn get_packet_offsets(packet: &mut Packet, current_offset: usize) -> PacketOffsets { +fn get_packet_offsets( + packet: &mut Packet, + current_offset: usize, + reject_non_vote: bool, +) -> PacketOffsets { let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset); if let Ok(offsets) = unsanitized_packet_offsets { check_for_simple_vote_transaction(packet, &offsets, current_offset).ok(); - offsets - } else { - // force sigverify to fail by returning zeros - PacketOffsets::new(0, 0, 0, 0, 0) + if !reject_non_vote || packet.meta.is_simple_vote_tx { + return offsets; + } } + // force sigverify to fail by returning zeros + PacketOffsets::new(0, 0, 0, 0, 0) } fn check_for_simple_vote_transaction( @@ -328,7 +333,11 @@ fn check_for_simple_vote_transaction( Ok(()) } -pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) -> TxOffsets { +pub fn generate_offsets( + batches: &mut [Packets], + recycler: &Recycler, + reject_non_vote: bool, +) -> TxOffsets { debug!("allocating.."); let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets"); signature_offsets.set_pinnable(); @@ -343,7 +352,7 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) batches.iter_mut().for_each(|p| { let mut sig_lens = Vec::new(); p.packets.iter_mut().for_each(|packet| { - let packet_offsets = get_packet_offsets(packet, current_offset); + let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote); sig_lens.push(packet_offsets.sig_len); @@ -377,14 +386,16 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) ) } -pub fn ed25519_verify_cpu(batches: &mut [Packets]) { +pub fn ed25519_verify_cpu(batches: &mut [Packets], reject_non_vote: bool) { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU ECDSA for {}", batch_size(batches)); PAR_THREAD_POOL.install(|| { - batches - .into_par_iter() - .for_each(|p| p.packets.par_iter_mut().for_each(|p| verify_packet(p))) + batches.into_par_iter().for_each(|p| { + p.packets + .par_iter_mut() + .for_each(|p| verify_packet(p, reject_non_vote)) + }) }); inc_new_counter_debug!("ed25519_verify_cpu", count); } @@ -464,10 +475,11 @@ pub fn ed25519_verify( batches: &mut [Packets], recycler: &Recycler, recycler_out: &Recycler>, + reject_non_vote: bool, ) { let api = perf_libs::api(); if api.is_none() { - return ed25519_verify_cpu(batches); + return ed25519_verify_cpu(batches, reject_non_vote); } let api = api.unwrap(); @@ -480,11 +492,11 @@ pub fn ed25519_verify( // may be busy doing other things while being a real validator // TODO: dynamically adjust this crossover if count < 64 { - return ed25519_verify_cpu(batches); + return ed25519_verify_cpu(batches, reject_non_vote); } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = - generate_offsets(batches, recycler); + generate_offsets(batches, recycler, reject_non_vote); debug!("CUDA ECDSA for {}", batch_size(batches)); debug!("allocating out.."); @@ -599,7 +611,7 @@ mod tests { let message_data = tx.message_data(); let mut packet = sigverify::make_packet_from_transaction(tx.clone()); - let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0); + let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0, false); assert_eq!( memfind(&tx_bytes, tx.signatures[0].as_ref()), @@ -694,7 +706,7 @@ mod tests { let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); - verify_packet(&mut packet); + verify_packet(&mut packet, false); assert!(packet.meta.discard); packet.meta.discard = false; @@ -730,7 +742,7 @@ mod tests { let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); - verify_packet(&mut packet); + verify_packet(&mut packet, false); assert!(packet.meta.discard); packet.meta.discard = false; @@ -824,7 +836,8 @@ mod tests { // Just like get_packet_offsets, but not returning redundant information. fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets { let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = sigverify::get_packet_offsets(&mut packet, current_offset as usize); + let packet_offsets = + sigverify::get_packet_offsets(&mut packet, current_offset as usize, false); PacketOffsets::new( packet_offsets.sig_len, packet_offsets.sig_start - current_offset, @@ -905,7 +918,7 @@ mod tests { fn ed25519_verify(batches: &mut [Packets]) { let recycler = Recycler::default(); let recycler_out = Recycler::default(); - sigverify::ed25519_verify(batches, &recycler, &recycler_out); + sigverify::ed25519_verify(batches, &recycler, &recycler_out, false); } #[test] @@ -1003,8 +1016,8 @@ mod tests { // verify from GPU verification pipeline (when GPU verification is enabled) are // equivalent to the CPU verification pipeline. let mut batches_cpu = batches.clone(); - sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); - ed25519_verify_cpu(&mut batches_cpu); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); + ed25519_verify_cpu(&mut batches_cpu, false); // check result batches diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 872bc1aa02..707a4145f8 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5135,6 +5135,11 @@ impl Bank { .is_active(&feature_set::stakes_remove_delegation_if_inactive::id()) } + pub fn send_to_tpu_vote_port_enabled(&self) -> bool { + self.feature_set + .is_active(&feature_set::send_to_tpu_vote_port::id()) + } + // Check if the wallclock time from bank creation to now has exceeded the allotted // time for transaction processing pub fn should_bank_still_be_processing_txs( @@ -5449,7 +5454,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) { } } -pub fn is_simple_vote_transaction(transaction: &Transaction) -> bool { +fn is_simple_vote_transaction(transaction: &Transaction) -> bool { if transaction.message.instructions.len() == 1 { let instruction = &transaction.message.instructions[0]; let program_pubkey = diff --git a/sdk/docker-solana/Dockerfile b/sdk/docker-solana/Dockerfile index 1beecc8ae8..6452efdcc5 100644 --- a/sdk/docker-solana/Dockerfile +++ b/sdk/docker-solana/Dockerfile @@ -30,6 +30,8 @@ EXPOSE 8006/udp EXPOSE 8007/udp # broadcast EXPOSE 8008/udp +# tpu_vote +EXPOSE 8009/udp RUN apt update && \ apt-get install -y bzip2 libssl-dev && \ diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 9eb8cffc51..5d092837a2 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -215,6 +215,10 @@ pub mod stakes_remove_delegation_if_inactive { solana_sdk::declare_id!("HFpdDDNQjvcXnXKec697HDDsyk6tFoWS2o8fkxuhQZpL"); } +pub mod send_to_tpu_vote_port { + solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -269,6 +273,7 @@ lazy_static! { (fix_write_privs::id(), "fix native invoke write privileges"), (reduce_required_deploy_balance::id(), "reduce required payer balance for program deploys"), (stakes_remove_delegation_if_inactive::id(), "remove delegations from stakes cache when inactive"), + (send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter() diff --git a/validator/src/main.rs b/validator/src/main.rs index 71edf98900..881f0c3e8e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -681,6 +681,9 @@ fn verify_reachable_ports( if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); } + if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) { + udp_sockets.extend(node.sockets.tpu_vote.iter()); + } if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) { udp_sockets.extend(node.sockets.tvu.iter()); udp_sockets.extend(node.sockets.broadcast.iter());