diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f79a6cf83c..0c08fcc6ed 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -22,8 +22,8 @@ use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::poh_config::PohConfig; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{ - self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES, - MAX_TRANSACTION_FORWARDING_DELAY, + self, duration_as_us, DEFAULT_NUM_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, + MAX_RECENT_BLOCKHASHES, MAX_TRANSACTION_FORWARDING_DELAY, }; use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::net::UdpSocket; @@ -38,6 +38,9 @@ use sys_info; type PacketsAndOffsets = (Packets, Vec); pub type UnprocessedPackets = Vec; +/// Transaction forwarding +pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4; + // number of threads is 1 until mt bank is ready pub const NUM_THREADS: u32 = 10; @@ -249,17 +252,15 @@ impl BankingStage { buffered_packets: &mut Vec, enable_forwarding: bool, ) -> Result<()> { - let (decision, next_leader) = { + let decision = { let poh = poh_recorder.lock().unwrap(); - let next_leader = poh.next_slot_leader(); - ( - Self::consume_or_forward_packets( - next_leader, - poh.bank().is_some(), - poh.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2), - my_pubkey, + Self::consume_or_forward_packets( + poh.next_slot_leader(), + poh.bank().is_some(), + poh.would_be_leader( + (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT, ), - next_leader, + my_pubkey, ) }; @@ -272,6 +273,9 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { if enable_forwarding { + let poh = poh_recorder.lock().unwrap(); + let next_leader = + poh.leader_after_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET); next_leader.map_or(Ok(()), |leader_pubkey| { let leader_addr = { cluster_info @@ -608,10 +612,20 @@ impl BankingStage { let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes); let mut error_counters = ErrorCounters::default(); + // The following code also checks if the blockhash for a transaction is too old + // The check accounts for + // 1. Transaction forwarding delay + // 2. The slot at which the next leader will actually process the transaction + // Drop the transaction if it will expire by the time the next node receives and processes it let result = bank.check_transactions( transactions, &filter, - (MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2, + (MAX_RECENT_BLOCKHASHES / 2) + .saturating_sub(MAX_TRANSACTION_FORWARDING_DELAY) + .saturating_sub( + (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET * bank.ticks_per_slot() + / DEFAULT_NUM_TICKS_PER_SECOND) as usize, + ), &mut error_counters, ); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 599e29b921..5b2736f1ad 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -1,5 +1,6 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. +use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::service::Service; @@ -61,11 +62,11 @@ impl FetchStage { batch.push(more); } - if poh_recorder - .lock() - .unwrap() - .would_be_leader(DEFAULT_TICKS_PER_SLOT * 2) - { + if poh_recorder.lock().unwrap().would_be_leader( + FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET + .saturating_add(1) + .saturating_mul(DEFAULT_TICKS_PER_SLOT), + ) { inc_new_counter_debug!("fetch_stage-honor_forwards", len); for packets in batch { if sendr.send(packets).is_err() { diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index ccb21ead83..552df01589 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -105,6 +105,13 @@ impl PohRecorder { self.leader_schedule_cache.slot_leader_at(slot + 1, None) } + pub fn leader_after_slots(&self, slots: u64) -> Option { + let slot = + leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height()); + self.leader_schedule_cache + .slot_leader_at(slot + slots, None) + } + pub fn start_slot(&self) -> u64 { self.start_slot }