From 83d08223fd67184ab3f81666736229a5b4e2b867 Mon Sep 17 00:00:00 2001 From: sakridge Date: Tue, 14 Sep 2021 17:08:53 +0300 Subject: [PATCH] More transaction forwarding (#19834) --- core/src/banking_stage.rs | 73 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f5334541ae..e6dd53d28d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3,6 +3,7 @@ //! can do its processing in parallel with signature verification on the GPU. use crate::{ cluster_info::ClusterInfo, + data_budget::DataBudget, packet_hasher::PacketHasher, poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry}, poh_service::{self, PohService}, @@ -260,6 +261,7 @@ impl BankingStage { LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default(), ))); + let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { @@ -276,6 +278,7 @@ impl BankingStage { let transaction_status_sender = transaction_status_sender.clone(); let gossip_vote_sender = gossip_vote_sender.clone(); let duplicates = duplicates.clone(); + let data_budget = data_budget.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -292,6 +295,7 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, &duplicates, + &data_budget, ); }) .unwrap() @@ -315,11 +319,21 @@ impl BankingStage { socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, unprocessed_packets: &UnprocessedPackets, + data_budget: &DataBudget, ) -> std::io::Result<()> { let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter()); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); + const INTERVAL_MS: u64 = 100; + const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; + const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; + const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; + data_budget.update(INTERVAL_MS, |bytes| { + std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET) + }); for p in packets { - socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; + if data_budget.take(p.meta.size) { + socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; + } } Ok(()) @@ -486,6 +500,7 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, + data_budget: &DataBudget, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -536,6 +551,7 @@ impl BankingStage { poh_recorder, socket, false, + &data_budget, ); } BufferedPacketsDecision::ForwardAndHold => { @@ -546,6 +562,7 @@ impl BankingStage { poh_recorder, socket, true, + &data_budget, ); } _ => (), @@ -560,6 +577,7 @@ impl BankingStage { poh_recorder: &Arc>, socket: &UdpSocket, hold: bool, + data_budget: &DataBudget, ) { if !enable_forwarding { if !hold { @@ -572,7 +590,7 @@ impl BankingStage { Some(addr) => addr, None => return, }; - let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets); + let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets, data_budget); if hold { buffered_packets.retain(|(_, index, _)| !index.is_empty()); for (_, _, forwarded) in buffered_packets.iter_mut() { @@ -596,6 +614,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, + data_budget: &DataBudget, ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -614,6 +633,7 @@ impl BankingStage { &gossip_vote_sender, &banking_stage_stats, &recorder, + &data_budget, ); if matches!(decision, BufferedPacketsDecision::Hold) || matches!(decision, BufferedPacketsDecision::ForwardAndHold) @@ -2694,6 +2714,55 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } + #[test] + fn test_forwarder_budget() { + solana_logger::setup(); + // Create `Packets` with 1 unprocessed element + let single_element_packets = Packets::new(vec![Packet::default()]); + let mut unprocessed_packets: UnprocessedPackets = + vec![(single_element_packets, vec![0], false)] + .into_iter() + .collect(); + + let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); + + let genesis_config_info = create_slow_genesis_config(10_000); + let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + + let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Arc::new( + Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"), + ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config)); + + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let data_budget = DataBudget::default(); + BankingStage::handle_forwarding( + true, + &cluster_info, + &mut unprocessed_packets, + &poh_recorder, + &socket, + false, + &data_budget, + ); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + #[test] fn test_push_unprocessed_batch_limit() { solana_logger::setup();