diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 6b435e58d6..b68b5e9734 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -20,7 +20,10 @@ use solana_runtime::accounts_db::ErrorCounters; use solana_runtime::bank::Bank; use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::pubkey::Pubkey; -use solana_sdk::timing::{self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; +use solana_sdk::timing::{ + self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES, + MAX_TRANSACTION_FORWARDING_DELAY, +}; use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::cmp; use std::net::UdpSocket; @@ -114,17 +117,19 @@ impl BankingStage { Self { bank_thread_hdls } } + fn filter_valid_packets_for_forwarding(all_packets: &[(Packets, Vec)]) -> Vec<&Packet> { + all_packets + .iter() + .flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x])) + .collect() + } + fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_via_blobs: &std::net::SocketAddr, unprocessed_packets: &[(Packets, Vec)], ) -> std::io::Result<()> { - let packets: Vec<&Packet> = unprocessed_packets - .iter() - .flat_map(|(p, unprocessed_indexes)| { - unprocessed_indexes.iter().map(move |x| &p.packets[*x]) - }) - .collect(); + let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); let blobs = packet::packets_to_blobs(&packets); @@ -577,7 +582,7 @@ impl BankingStage { let result = bank.check_transactions( &transactions, &filter, - MAX_RECENT_BLOCKHASHES / 2, + (MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2, &mut error_counters, ); @@ -621,13 +626,17 @@ impl BankingStage { .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) .collect(); if bank_shutdown { - unprocessed_packets.push((msgs, packet_indexes)); + if !packet_indexes.is_empty() { + unprocessed_packets.push((msgs, packet_indexes)); + } continue; } let bank = poh.lock().unwrap().bank(); if bank.is_none() { - unprocessed_packets.push((msgs, packet_indexes)); + if !packet_indexes.is_empty() { + unprocessed_packets.push((msgs, packet_indexes)); + } continue; } let bank = bank.unwrap(); @@ -718,6 +727,7 @@ mod tests { use crate::packet::to_packets; use crate::poh_recorder::WorkingBank; use crate::{get_tmp_ledger_path, tmp_ledger_name}; + use itertools::Itertools; use solana_sdk::instruction::InstructionError; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; @@ -1402,4 +1412,41 @@ mod tests { } Blocktree::destroy(&ledger_path).unwrap(); } + + #[test] + fn test_filter_valid_packets() { + solana_logger::setup(); + + let all_packets = (0..16) + .map(|packets_id| { + let packets = Packets::new( + (0..32) + .map(|packet_id| { + let mut p = Packet::default(); + p.meta.port = packets_id << 8 | packet_id; + p + }) + .collect_vec(), + ); + let valid_indexes = (0..32) + .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None }) + .collect_vec(); + (packets, valid_indexes) + }) + .collect_vec(); + + let result = BankingStage::filter_valid_packets_for_forwarding(&all_packets); + + assert_eq!(result.len(), 256); + + let _ = result + .into_iter() + .enumerate() + .map(|(index, p)| { + let packets_id = index / 16; + let packet_id = (index % 16) * 2 + 1; + assert_eq!(p.meta.port, (packets_id << 8 | packet_id) as u16); + }) + .collect_vec(); + } } diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs index b663323c6e..9858b94c17 100644 --- a/sdk/src/timing.rs +++ b/sdk/src/timing.rs @@ -24,6 +24,10 @@ pub const MAX_HASH_AGE_IN_SECONDS: usize = 120; // This must be <= MAX_HASH_AGE_IN_SECONDS, otherwise there's risk for DuplicateSignature errors pub const MAX_RECENT_BLOCKHASHES: usize = MAX_HASH_AGE_IN_SECONDS; +/// This is maximum time consumed in forwarding a transaction from one node to next, before +/// it can be processed in the target node +pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 3; + pub fn duration_as_ns(d: &Duration) -> u64 { d.as_secs() * 1_000_000_000 + u64::from(d.subsec_nanos()) }