flag end-of-slot when poh bank is gone (backport #23069) (#23174)

* flag end-of-slot when poh bank is gone

(cherry picked from commit 03bf66a51b)

# Conflicts:
#	core/src/banking_stage.rs

* merge fix

Co-authored-by: Tao Zhu <tao@solana.com>
This commit is contained in:
mergify[bot]
2022-02-16 18:08:03 +00:00
committed by GitHub
parent 68934353f2
commit 41bbc11a46

View File

@ -316,6 +316,12 @@ pub struct BatchedTransactionCostDetails {
pub batched_execute_cost: u64,
}
#[derive(Debug, Default)]
struct EndOfSlot {
next_slot_leader: Option<Pubkey>,
working_bank: Option<Arc<Bank>>,
}
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>,
@ -510,38 +516,44 @@ impl BankingStage {
let mut consumed_buffered_packets_count = 0;
let buffered_packet_batches_len = buffered_packet_batches.len();
let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot = None;
let mut reached_end_of_slot: Option<EndOfSlot> = None;
buffered_packet_batches.retain_mut(|buffered_packet_batch_and_offsets| {
let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
buffered_packet_batch_and_offsets;
if let Some((next_leader, bank)) = &reached_end_of_slot {
if let Some(end_of_slot) = &reached_end_of_slot {
// We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
bank,
packet_batch,
original_unprocessed_indexes,
my_pubkey,
*next_leader,
banking_stage_stats,
);
let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
.len()
.saturating_sub(new_unprocessed_indexes.len());
// if the working_bank is available
if let Some(bank) = &end_of_slot.working_bank {
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
bank,
packet_batch,
original_unprocessed_indexes,
my_pubkey,
end_of_slot.next_slot_leader,
banking_stage_stats,
);
slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
end_of_slot_filtered_invalid_count as u64,
);
let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
.len()
.saturating_sub(new_unprocessed_indexes.len());
banking_stage_stats
.end_of_slot_filtered_invalid_count
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
end_of_slot_filtered_invalid_count as u64,
);
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
new_unprocessed_indexes,
)
banking_stage_stats
.end_of_slot_filtered_invalid_count
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
new_unprocessed_indexes,
)
} else {
true
}
} else {
let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some(BankStart {
@ -573,10 +585,10 @@ impl BankingStage {
max_tx_ingestion_ns,
)
{
reached_end_of_slot = Some((
poh_recorder.lock().unwrap().next_slot_leader(),
working_bank,
));
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(),
working_bank: Some(working_bank),
});
}
// The difference between all transactions passed to execution and the ones that
@ -603,6 +615,13 @@ impl BankingStage {
}
has_more_unprocessed_transactions
} else {
// mark as end-of-slot to avoid aggressively lock poh for the remaining for
// packet batches in buffer
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(),
working_bank: None,
});
// `original_unprocessed_indexes` must have remaining packets to process
// if not yet processed.
assert!(Self::packet_has_more_unprocessed_transactions(