Use poh would_be_leader check in banking stage to hold or forward txs (#3948)

This commit is contained in:
Pankaj Garg
2019-04-23 11:56:30 -07:00
committed by GitHub
parent 840a64ee8b
commit 7372ec9e1a
3 changed files with 55 additions and 31 deletions

View File

@ -19,7 +19,7 @@ use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES}; use solana_sdk::timing::{self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use solana_sdk::transaction::{self, Transaction, TransactionError}; use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::cmp; use std::cmp;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -186,6 +186,7 @@ impl BankingStage {
fn process_or_forward_packets( fn process_or_forward_packets(
leader_data: Option<&ContactInfo>, leader_data: Option<&ContactInfo>,
bank_is_available: bool, bank_is_available: bool,
would_be_leader: bool,
my_id: &Pubkey, my_id: &Pubkey,
) -> BufferedPacketsDecision { ) -> BufferedPacketsDecision {
leader_data.map_or( leader_data.map_or(
@ -196,6 +197,9 @@ impl BankingStage {
if bank_is_available { if bank_is_available {
// If the bank is available, this node is the leader // If the bank is available, this node is the leader
BufferedPacketsDecision::Consume BufferedPacketsDecision::Consume
} else if would_be_leader {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
} else if x.id != *my_id { } else if x.id != *my_id {
// If the current node is not the leader, forward the buffered packets // If the current node is not the leader, forward the buffered packets
BufferedPacketsDecision::Forward BufferedPacketsDecision::Forward
@ -216,11 +220,15 @@ impl BankingStage {
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
let rcluster_info = cluster_info.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
let decision = Self::process_or_forward_packets( let decision = {
rcluster_info.leader_data(), let poh = poh_recorder.lock().unwrap();
poh_recorder.lock().unwrap().bank().is_some(), Self::process_or_forward_packets(
&rcluster_info.id(), rcluster_info.leader_data(),
); poh.bank().is_some(),
poh.would_be_leader(DEFAULT_TICKS_PER_SLOT),
&rcluster_info.id(),
)
};
match decision { match decision {
BufferedPacketsDecision::Consume => { BufferedPacketsDecision::Consume => {
@ -250,13 +258,20 @@ impl BankingStage {
// Buffer the packets if I am the next leader // Buffer the packets if I am the next leader
// or, if it was getting sent to me // or, if it was getting sent to me
// or, the next leader is unknown // or, the next leader is unknown
let leader_id = match poh_recorder.lock().unwrap().bank() { let poh = poh_recorder.lock().unwrap();
let leader_id = match poh.bank() {
Some(bank) => leader_schedule_cache Some(bank) => leader_schedule_cache
.slot_leader_at_else_compute(bank.slot() + 1, &bank) .slot_leader_at_else_compute(bank.slot() + 1, &bank)
.unwrap_or_default(), .unwrap_or_default(),
None => rcluster_info None => {
.leader_data() if poh.would_be_leader(DEFAULT_TICKS_PER_SLOT) {
.map_or(rcluster_info.id(), |x| x.id), rcluster_info.id()
} else {
rcluster_info
.leader_data()
.map_or(rcluster_info.id(), |x| x.id)
}
}
}; };
leader_id == rcluster_info.id() leader_id == rcluster_info.id()
@ -997,34 +1012,38 @@ mod tests {
let my_id1 = Pubkey::new_rand(); let my_id1 = Pubkey::new_rand();
assert_eq!( assert_eq!(
BankingStage::process_or_forward_packets(None, true, &my_id), BankingStage::process_or_forward_packets(None, true, false, &my_id),
BufferedPacketsDecision::Hold BufferedPacketsDecision::Hold
); );
assert_eq!( assert_eq!(
BankingStage::process_or_forward_packets(None, false, &my_id), BankingStage::process_or_forward_packets(None, false, false, &my_id),
BufferedPacketsDecision::Hold BufferedPacketsDecision::Hold
); );
assert_eq!( assert_eq!(
BankingStage::process_or_forward_packets(None, false, &my_id1), BankingStage::process_or_forward_packets(None, false, false, &my_id1),
BufferedPacketsDecision::Hold BufferedPacketsDecision::Hold
); );
let mut contact_info = ContactInfo::default(); let mut contact_info = ContactInfo::default();
contact_info.id = my_id1; contact_info.id = my_id1;
assert_eq!( assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id), BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id),
BufferedPacketsDecision::Forward BufferedPacketsDecision::Forward
); );
assert_eq!( assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id), BankingStage::process_or_forward_packets(Some(&contact_info), false, true, &my_id),
BufferedPacketsDecision::Consume
);
assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id1),
BufferedPacketsDecision::Hold BufferedPacketsDecision::Hold
); );
assert_eq!( assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id1), BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id),
BufferedPacketsDecision::Consume
);
assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id1),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id1),
BufferedPacketsDecision::Consume BufferedPacketsDecision::Consume
); );
} }

View File

@ -64,7 +64,7 @@ impl FetchStage {
if poh_recorder if poh_recorder
.lock() .lock()
.unwrap() .unwrap()
.would_be_leader(1, DEFAULT_TICKS_PER_SLOT) .would_be_leader(DEFAULT_TICKS_PER_SLOT)
{ {
inc_new_counter_info!("fetch_stage-honor_forwards", len); inc_new_counter_info!("fetch_stage-honor_forwards", len);
for packets in batch { for packets in batch {

View File

@ -79,14 +79,13 @@ impl PohRecorder {
} }
} }
pub fn would_be_leader(&self, within_next_n_slots: u64, ticks_per_slot: u64) -> bool { pub fn would_be_leader(&self, within_next_n_ticks: u64) -> bool {
let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| { let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| {
let leader_ideal_start_tick = let leader_ideal_start_tick =
leader_tick.saturating_sub(self.max_last_leader_grace_ticks); leader_tick.saturating_sub(self.max_last_leader_grace_ticks);
self.tick_height() <= self.last_leader_tick.unwrap_or(0) self.tick_height() <= self.last_leader_tick.unwrap_or(0)
&& self.tick_height() && self.tick_height() >= leader_ideal_start_tick.saturating_sub(within_next_n_ticks)
>= leader_ideal_start_tick.saturating_sub(within_next_n_slots * ticks_per_slot)
}); });
self.working_bank.is_some() || close_to_leader_tick self.working_bank.is_some() || close_to_leader_tick
@ -1191,7 +1190,7 @@ mod tests {
// Test that with no leader slot, we don't reach the leader tick // Test that with no leader slot, we don't reach the leader tick
assert_eq!( assert_eq!(
poh_recorder.would_be_leader(2, bank.ticks_per_slot()), poh_recorder.would_be_leader(2 * bank.ticks_per_slot()),
false false
); );
@ -1201,7 +1200,7 @@ mod tests {
// Test that with no leader slot, we don't reach the leader tick after sending some ticks // Test that with no leader slot, we don't reach the leader tick after sending some ticks
assert_eq!( assert_eq!(
poh_recorder.would_be_leader(2, bank.ticks_per_slot()), poh_recorder.would_be_leader(2 * bank.ticks_per_slot()),
false false
); );
@ -1214,7 +1213,7 @@ mod tests {
); );
assert_eq!( assert_eq!(
poh_recorder.would_be_leader(2, bank.ticks_per_slot()), poh_recorder.would_be_leader(2 * bank.ticks_per_slot()),
false false
); );
@ -1229,21 +1228,27 @@ mod tests {
// Test that the node won't be leader in next 2 slots // Test that the node won't be leader in next 2 slots
assert_eq!( assert_eq!(
poh_recorder.would_be_leader(2, bank.ticks_per_slot()), poh_recorder.would_be_leader(2 * bank.ticks_per_slot()),
false false
); );
// Test that the node will be leader in next 3 slots // Test that the node will be leader in next 3 slots
assert_eq!(poh_recorder.would_be_leader(3, bank.ticks_per_slot()), true); assert_eq!(
poh_recorder.would_be_leader(3 * bank.ticks_per_slot()),
true
);
assert_eq!( assert_eq!(
poh_recorder.would_be_leader(2, bank.ticks_per_slot()), poh_recorder.would_be_leader(2 * bank.ticks_per_slot()),
false false
); );
// If we set the working bank, the node should be leader within next 2 slots // If we set the working bank, the node should be leader within next 2 slots
poh_recorder.set_bank(&bank); poh_recorder.set_bank(&bank);
assert_eq!(poh_recorder.would_be_leader(2, bank.ticks_per_slot()), true); assert_eq!(
poh_recorder.would_be_leader(2 * bank.ticks_per_slot()),
true
);
} }
} }
} }