From 00abcbe1bef8e68b07e293514ea879a706f4ec51 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 14 Feb 2022 10:23:48 +0000 Subject: [PATCH] Introduce slot-specific packet metrics (backport #22906) (#23076) * Resolve conflicts * add end_of_slot_filtered_invalid_count * Increment end_of_slot_filtered_invalid_count * Fixup comment * Remove comment * Move all process_tx metris into common function * Switch to saturating_add_assign macro * Refactor timings so each struct reports own timing * Move into accumulate * Remove unnecessary struct Co-authored-by: Carl Lin --- core/benches/banking_stage.rs | 6 +- core/src/banking_stage.rs | 180 +++-- core/src/leader_slot_banking_stage_metrics.rs | 717 ++++++++++++++++++ core/src/lib.rs | 1 + 4 files changed, 851 insertions(+), 53 deletions(-) create mode 100644 core/src/leader_slot_banking_stage_metrics.rs diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 044eb4820b..edcaf95fde 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -8,7 +8,10 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, - solana_core::banking_stage::{BankingStage, BankingStageStats}, + solana_core::{ + banking_stage::{BankingStage, BankingStageStats}, + leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, + }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ blockstore::Blockstore, @@ -95,6 +98,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), + &mut LeaderSlotMetricsTracker::new(0), ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 37cd3628bb..eefc3e06dd 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2,6 +2,9 @@ //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. use { + crate::leader_slot_banking_stage_metrics::{ + LeaderSlotMetricsTracker, ProcessTransactionsSummary, + }, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, histogram::Histogram, itertools::Itertools, @@ -83,47 +86,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; -/// A summary of what happened to transactions passed to the execution pipeline. -/// Transactions can -/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse -/// lock conflictss or CostModel compute limits. These types of errors are retryable and -/// counted in `Self::retryable_transaction_indexes`. -/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These -/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes` -/// 3) Were executed and committed, captured by `committed_transactions_count` below. -/// 4) Were executed and failed commit, captured by `failed_commit_count` below. -struct ProcessTransactionsSummary { - // Returns true if we hit the end of the block/max PoH height for the block before - // processing all the transactions in the batch. - reached_max_poh_height: bool, - - // Total number of transactions that were passed as candidates for execution. 
See description - // of struct above for possible outcomes for these transactions - #[allow(dead_code)] - transactions_attempted_execution_count: usize, - - // Total number of transactions that made it into the block - #[allow(dead_code)] - committed_transactions_count: usize, - - // Total number of transactions that made it into the block where the transactions - // output from execution was success/no error. - #[allow(dead_code)] - committed_transactions_with_successful_result_count: usize, - - // All transactions that were executed but then failed record because the - // slot ended - #[allow(dead_code)] - failed_commit_count: usize, - - // Indexes of transactions in the transactions slice that were not committed but are retryable - retryable_transaction_indexes: Vec, - - // The number of transactions filtered out by the cost model - #[allow(dead_code)] - cost_model_throttled_transactions_count: usize, -} - pub struct ExecuteAndCommitTransactionsOutput { // Total number of transactions that were passed as candidates for execution transactions_attempted_execution_count: usize, @@ -155,6 +117,7 @@ pub struct BankingStageStats { current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, + end_of_slot_filtered_invalid_count: AtomicUsize, cost_tracker_check_count: AtomicUsize, cost_forced_retry_transactions_count: AtomicUsize, batch_packet_indexes_len: Histogram, @@ -242,6 +205,12 @@ impl BankingStageStats { self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "end_of_slot_filtered_invalid_count", + self.end_of_slot_filtered_invalid_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "cost_forced_retry_transactions_count", self.cost_forced_retry_transactions_count @@ -451,13 +420,14 @@ impl BankingStage { .collect() } + /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns + /// the number of successfully forwarded packets in second part of tuple fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, - buffered_packet_batches: &UnprocessedPacketBatches, + packets: Vec<&Packet>, data_budget: &DataBudget, - ) -> std::io::Result<()> { - let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); + ) -> (std::io::Result<()>, usize) { const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; @@ -489,7 +459,7 @@ impl BankingStage { inc_new_counter_info!("banking_stage-forwarded_packets", forwarded_packet_count); } - forward_result + (forward_result, forwarded_packet_count) } // Returns whether the given `PacketBatch` has any more remaining unprocessed @@ -518,6 +488,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, cost_model: &Arc>, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { let mut rebuffered_packet_count = 0; let mut consumed_buffered_packets_count = 0; @@ -531,7 +502,7 @@ impl BankingStage { if let Some((next_leader, bank)) = &reached_end_of_slot { // We've hit the end of this slot, no need to perform more processing, // just filter the remaining packets for the invalid (e.g. 
too old) ones
-                let new_unprocessed_indexes = Self::filter_unprocessed_packets(
+                let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
                     bank,
                     packet_batch,
                     original_unprocessed_indexes,
@@ -539,6 +510,18 @@
                     *next_leader,
                     banking_stage_stats,
                 );
+                let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
+                    .len()
+                    .saturating_sub(new_unprocessed_indexes.len());
+
+                slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
+                    end_of_slot_filtered_invalid_count as u64,
+                );
+
+                banking_stage_stats
+                    .end_of_slot_filtered_invalid_count
+                    .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
+
                 Self::update_buffered_packets_with_new_unprocessed(
                     original_unprocessed_indexes,
                     new_unprocessed_indexes,
@@ -556,6 +539,7 @@
                         gossip_vote_sender,
                         banking_stage_stats,
                         cost_model,
+                        slot_metrics_tracker,
                     );
                 let ProcessTransactionsSummary {
                     reached_max_poh_height,
@@ -674,6 +658,7 @@
         recorder: &TransactionRecorder,
         data_budget: &DataBudget,
         cost_model: &Arc<RwLock<CostModel>>,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
     ) -> BufferedPacketsDecision {
         let bank_start;
         let (
@@ -715,6 +700,7 @@
                     banking_stage_stats,
                     recorder,
                     cost_model,
+                    slot_metrics_tracker,
                 );
             }
             BufferedPacketsDecision::Forward => {
@@ -726,6 +712,7 @@
                     socket,
                     false,
                     data_budget,
+                    slot_metrics_tracker,
                 );
             }
             BufferedPacketsDecision::ForwardAndHold => {
@@ -737,10 +724,12 @@
                     socket,
                     true,
                     data_budget,
+                    slot_metrics_tracker,
                 );
             }
             _ => (),
         }
+        decision
     }

@@ -752,6 +741,7 @@
         socket: &UdpSocket,
         hold: bool,
         data_budget: &DataBudget,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
     ) {
         let addr = match forward_option {
             ForwardOption::NotForward => {
@@ -769,13 +759,35 @@
             Some(addr) => addr,
             None => return,
         };
-        let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget);
+
+        let forwardable_packets =
+            Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
+        let forwardable_packets_len = forwardable_packets.len();
+        let (_forward_result, successful_forwarded_packets_count) =
+            Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget);
+        let failed_forwarded_packets_count =
+            forwardable_packets_len.saturating_sub(successful_forwarded_packets_count);
+
+        if failed_forwarded_packets_count > 0 {
+            slot_metrics_tracker
+                .increment_failed_forwarded_packets_count(failed_forwarded_packets_count as u64);
+            slot_metrics_tracker.increment_packet_batch_forward_failure_count(1);
+        }
+
+        if successful_forwarded_packets_count > 0 {
+            slot_metrics_tracker.increment_successful_forwarded_packets_count(
+                successful_forwarded_packets_count as u64,
+            );
+        }
+
         if hold {
             buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
             for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
                 *forwarded = true;
             }
         } else {
+            slot_metrics_tracker
+                .increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64);
             buffered_packet_batches.clear();
         }
     }
@@ -799,6 +811,7 @@
         let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
         let mut banking_stage_stats = BankingStageStats::new(id);
+        let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
         loop {
             while !buffered_packet_batches.is_empty() {
                 let decision = Self::process_buffered_packets(
@@ -814,6 +827,7 @@ impl
BankingStage { &recorder, data_budget, &cost_model, + &mut slot_metrics_tracker, ); if matches!(decision, BufferedPacketsDecision::Hold) || matches!(decision, BufferedPacketsDecision::ForwardAndHold) @@ -824,6 +838,12 @@ impl BankingStage { } } + let current_poh_bank = { + let poh = poh_recorder.lock().unwrap(); + poh.bank_start() + }; + slot_metrics_tracker.update_on_leader_slot_boundary(¤t_poh_bank); + let recv_timeout = if !buffered_packet_batches.is_empty() { // If packets are buffered, let's wait for less time on recv from the channel. // This helps detect the next leader faster, and processing the buffered @@ -848,6 +868,7 @@ impl BankingStage { &mut banking_stage_stats, &recorder, &cost_model, + &mut slot_metrics_tracker, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, @@ -1411,6 +1432,7 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, cost_model: &Arc>, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> ProcessTransactionsSummary { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes, cost_model_throttled_packet_indexes) = @@ -1470,8 +1492,14 @@ impl BankingStage { ); } }); + cost_tracking_time.stop(); + slot_metrics_tracker.accumulate_process_transactions_summary(&process_transactions_summary); + + let retryable_tx_count = retryable_transaction_indexes.len(); + inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count); + let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); let mut filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs( bank, @@ -1481,6 +1509,12 @@ impl BankingStage { ); filter_pending_packets_time.stop(); + let retryable_packets_filtered_count = retryable_transaction_indexes + .len() + .saturating_sub(filtered_retryable_tx_indexes.len()); + slot_metrics_tracker + .increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64); + inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", retryable_transaction_indexes @@ -1510,7 +1544,7 @@ impl BankingStage { process_transactions_summary } - fn filter_unprocessed_packets( + fn filter_unprocessed_packets_at_end_of_slot( bank: &Arc, packet_batch: &PacketBatch, transaction_indexes: &[usize], @@ -1599,6 +1633,7 @@ impl BankingStage { banking_stage_stats: &mut BankingStageStats, recorder: &TransactionRecorder, cost_model: &Arc>, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let packet_batches = verified_receiver.recv_timeout(recv_timeout)?; @@ -1623,6 +1658,15 @@ impl BankingStage { let mut newly_buffered_packets_count = 0; while let Some(packet_batch) = packet_batch_iter.next() { let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); + // Track all the packets incoming from sigverify, both valid and invalid + slot_metrics_tracker.increment_total_new_valid_packets(packet_indexes.len() as u64); + slot_metrics_tracker.increment_newly_failed_sigverify_count( + packet_batch + .packets + .len() + .saturating_sub(packet_indexes.len()) as u64, + ); + let bank_start = poh.lock().unwrap().bank_start(); if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() { Self::push_unprocessed( @@ -1634,6 +1678,7 @@ impl BankingStage { &mut newly_buffered_packets_count, batch_limit, banking_stage_stats, + 
slot_metrics_tracker, ); continue; } @@ -1651,6 +1696,7 @@ impl BankingStage { gossip_vote_sender, banking_stage_stats, cost_model, + slot_metrics_tracker, ); let ProcessTransactionsSummary { @@ -1669,6 +1715,7 @@ impl BankingStage { &mut newly_buffered_packets_count, batch_limit, banking_stage_stats, + slot_metrics_tracker, ); // If there were retryable transactions, add the unexpired ones to the buffered queue @@ -1678,24 +1725,39 @@ impl BankingStage { // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones #[allow(clippy::while_let_on_iterator)] while let Some(packet_batch) = packet_batch_iter.next() { - let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); - let unprocessed_indexes = Self::filter_unprocessed_packets( + let original_unprocessed_indexes = + Self::generate_packet_indexes(&packet_batch.packets); + let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( &bank, &packet_batch, - &packet_indexes, + &original_unprocessed_indexes, my_pubkey, next_leader, banking_stage_stats, ); + + let end_of_slot_filtered_invalid_count = original_unprocessed_indexes + .len() + .saturating_sub(new_unprocessed_indexes.len()); + + slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( + end_of_slot_filtered_invalid_count as u64, + ); + + banking_stage_stats + .end_of_slot_filtered_invalid_count + .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); + Self::push_unprocessed( buffered_packet_batches, packet_batch, - unprocessed_indexes, + new_unprocessed_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, banking_stage_stats, + slot_metrics_tracker, ); } handle_retryable_packets_time.stop(); @@ -1757,12 +1819,16 @@ impl BankingStage { newly_buffered_packets_count: &mut usize, batch_limit: usize, banking_stage_stats: &mut BankingStageStats, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packet_batches.len() >= batch_limit { *dropped_packet_batches_count += 1; if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() { *dropped_packets_count += dropped_batch.1.len(); + slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( + dropped_batch.1.len() as u64, + ); } } let _ = banking_stage_stats @@ -1770,6 +1836,8 @@ impl BankingStage { .increment(packet_indexes.len() as u64); *newly_buffered_packets_count += packet_indexes.len(); + slot_metrics_tracker + .increment_newly_buffered_packets_count(packet_indexes.len() as u64); unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); } } @@ -3213,6 +3281,7 @@ mod tests { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), + &mut LeaderSlotMetricsTracker::new(0), ); assert_eq!( buffered_packet_batches[0].1.len(), @@ -3233,6 +3302,7 @@ mod tests { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), + &mut LeaderSlotMetricsTracker::new(0), ); if num_expected_unprocessed == 0 { assert!(buffered_packet_batches.is_empty()) @@ -3299,6 +3369,7 @@ mod tests { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), + &mut LeaderSlotMetricsTracker::new(0), ); // Check everything is correct. 
All indexes after `interrupted_iteration` @@ -3396,6 +3467,7 @@ mod tests { &send_socket, true, &data_budget, + &mut LeaderSlotMetricsTracker::new(0), ); recv_socket @@ -3495,6 +3567,7 @@ mod tests { &send_socket, hold, &DataBudget::default(), + &mut LeaderSlotMetricsTracker::new(0), ); recv_socket @@ -3556,6 +3629,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &mut banking_stage_stats, + &mut LeaderSlotMetricsTracker::new(0), ); assert_eq!(unprocessed_packets.len(), 1); assert_eq!(dropped_packet_batches_count, 0); @@ -3574,6 +3648,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &mut banking_stage_stats, + &mut LeaderSlotMetricsTracker::new(0), ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(dropped_packet_batches_count, 0); @@ -3597,6 +3672,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &mut banking_stage_stats, + &mut LeaderSlotMetricsTracker::new(0), ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!( diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs new file mode 100644 index 0000000000..83c110b641 --- /dev/null +++ b/core/src/leader_slot_banking_stage_metrics.rs @@ -0,0 +1,717 @@ +use { + solana_poh::poh_recorder::BankStart, + solana_sdk::{clock::Slot, saturating_add_assign}, + std::time::Instant, +}; + +/// A summary of what happened to transactions passed to the execution pipeline. +/// Transactions can +/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse +/// lock conflicts or CostModel compute limits. These types of errors are retryable and +/// counted in `Self::retryable_transaction_indexes`. +/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These +/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes` +/// 3) Were executed and committed, captured by `committed_transactions_count` below. +/// 4) Were executed and failed commit, captured by `failed_commit_count` below. +pub(crate) struct ProcessTransactionsSummary { + // Returns true if we hit the end of the block/max PoH height for the block before + // processing all the transactions in the batch. + pub reached_max_poh_height: bool, + + // Total number of transactions that were passed as candidates for execution. See description + // of struct above for possible outcomes for these transactions + pub transactions_attempted_execution_count: usize, + + // Total number of transactions that made it into the block + pub committed_transactions_count: usize, + + // Total number of transactions that made it into the block where the transactions + // output from execution was success/no error. 
+    pub committed_transactions_with_successful_result_count: usize,
+
+    // All transactions that were executed but then failed to record because the
+    // slot ended
+    pub failed_commit_count: usize,
+
+    // Indexes of transactions in the transactions slice that were not committed but are retryable
+    pub retryable_transaction_indexes: Vec<usize>,
+
+    // The number of transactions filtered out by the cost model
+    pub cost_model_throttled_transactions_count: usize,
+}
+
+// Metrics capturing wallclock time spent in various parts of BankingStage during this
+// validator's leader slot
+#[derive(Debug)]
+struct LeaderSlotTimingMetrics {
+    bank_detected_time: Instant,
+
+    // Delay from when the bank was created to when this thread detected it
+    bank_detected_delay_us: u64,
+}
+
+impl LeaderSlotTimingMetrics {
+    fn new(bank_creation_time: &Instant) -> Self {
+        Self {
+            bank_detected_time: Instant::now(),
+            bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64,
+        }
+    }
+
+    fn report(&self, id: u32, slot: Slot) {
+        let bank_detected_to_now = self.bank_detected_time.elapsed().as_micros() as u64;
+        datapoint_info!(
+            "banking_stage-leader_slot_loop_timings",
+            ("id", id as i64, i64),
+            ("slot", slot as i64, i64),
+            ("bank_detected_to_now_us", bank_detected_to_now, i64),
+            (
+                "bank_creation_to_now_us",
+                bank_detected_to_now + self.bank_detected_delay_us,
+                i64
+            ),
+            ("bank_detected_delay_us", self.bank_detected_delay_us, i64),
+        );
+    }
+}
+
+// Metrics describing packets ingested/processed in various parts of BankingStage during this
+// validator's leader slot
+#[derive(Debug, Default)]
+struct LeaderSlotPacketCountMetrics {
+    // total number of live packets TPU received from the verified receiver for processing.
+    total_new_valid_packets: u64,
+
+    // total number of packets TPU received from sigverify that failed signature verification.
+    newly_failed_sigverify_count: u64,
+
+    // total number of packets dropped due to the thread's buffered packets capacity being reached.
+    exceeded_buffer_limit_dropped_packets_count: u64,
+
+    // total number of packets that got added to the pending buffer after arriving at BankingStage
+    newly_buffered_packets_count: u64,
+
+    // total number of transactions in the buffer that were filtered out due to things like age and
+    // duplicate signature checks
+    retryable_packets_filtered_count: u64,
+
+    // total number of transactions that attempted execution in this slot. Should equal the sum
+    // of `committed_transactions_count`, `retryable_errored_transaction_count`, and
+    // `nonretryable_errored_transactions_count`.
+    transactions_attempted_execution_count: u64,
+
+    // total number of transactions that were executed and committed into the block
+    // on this thread
+    committed_transactions_count: u64,
+
+    // total number of transactions that were executed, got a successful execution output/no error,
+    // and were then committed into the block
+    committed_transactions_with_successful_result_count: u64,
+
+    // total number of transactions that were not executed or failed commit, BUT were added back to the buffered
+    // queue because they were retryable errors
+    retryable_errored_transaction_count: u64,
+
+    // total number of transactions that attempted execution but failed with some fatal error (too old,
+    // duplicate signature, etc.) AND were dropped from the buffered queue
+    nonretryable_errored_transactions_count: u64,
+
+    // total number of transactions that were executed, but failed to be committed into the PoH stream because
+    // the block ended. Some of these may be already counted in `nonretryable_errored_transactions_count` if they
+    // then hit the age limit after failing to be committed.
+    executed_transactions_failed_commit_count: u64,
+
+    // total number of transactions that were excluded from the block because they were too expensive
+    // according to the cost model. These transactions are added back to the buffered queue and are
+    // already counted in `self.retryable_errored_transaction_count`.
+    cost_model_throttled_transactions_count: u64,
+
+    // total number of forwardable packets that failed forwarding
+    failed_forwarded_packets_count: u64,
+
+    // total number of forwardable packets that were successfully forwarded
+    successful_forwarded_packets_count: u64,
+
+    // total number of attempted forwards that failed. Note this is not a count of the number of packets
+    // that failed, just the total number of batches of packets that failed forwarding
+    packet_batch_forward_failure_count: u64,
+
+    // total number of valid unprocessed packets in the buffer that were removed after being forwarded
+    cleared_from_buffer_after_forward_count: u64,
+
+    // total number of packets removed at the end of the slot due to being too old, duplicate, etc.
+    end_of_slot_filtered_invalid_count: u64,
+}
+
+impl LeaderSlotPacketCountMetrics {
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn report(&self, id: u32, slot: Slot) {
+        datapoint_info!(
+            "banking_stage-leader_slot_packet_counts",
+            ("id", id as i64, i64),
+            ("slot", slot as i64, i64),
+            (
+                "total_new_valid_packets",
+                self.total_new_valid_packets as i64,
+                i64
+            ),
+            (
+                "newly_failed_sigverify_count",
+                self.newly_failed_sigverify_count as i64,
+                i64
+            ),
+            (
+                "exceeded_buffer_limit_dropped_packets_count",
+                self.exceeded_buffer_limit_dropped_packets_count as i64,
+                i64
+            ),
+            (
+                "newly_buffered_packets_count",
+                self.newly_buffered_packets_count as i64,
+                i64
+            ),
+            (
+                "retryable_packets_filtered_count",
+                self.retryable_packets_filtered_count as i64,
+                i64
+            ),
+            (
+                "transactions_attempted_execution_count",
+                self.transactions_attempted_execution_count as i64,
+                i64
+            ),
+            (
+                "committed_transactions_count",
+                self.committed_transactions_count as i64,
+                i64
+            ),
+            (
+                "committed_transactions_with_successful_result_count",
+                self.committed_transactions_with_successful_result_count as i64,
+                i64
+            ),
+            (
+                "retryable_errored_transaction_count",
+                self.retryable_errored_transaction_count as i64,
+                i64
+            ),
+            (
+                "nonretryable_errored_transactions_count",
+                self.nonretryable_errored_transactions_count as i64,
+                i64
+            ),
+            (
+                "executed_transactions_failed_commit_count",
+                self.executed_transactions_failed_commit_count as i64,
+                i64
+            ),
+            (
+                "cost_model_throttled_transactions_count",
+                self.cost_model_throttled_transactions_count as i64,
+                i64
+            ),
+            (
+                "failed_forwarded_packets_count",
+                self.failed_forwarded_packets_count as i64,
+                i64
+            ),
+            (
+                "successful_forwarded_packets_count",
+                self.successful_forwarded_packets_count as i64,
+                i64
+            ),
+            (
+                "packet_batch_forward_failure_count",
+                self.packet_batch_forward_failure_count as i64,
+                i64
+            ),
+            (
+                "cleared_from_buffer_after_forward_count",
+                self.cleared_from_buffer_after_forward_count as i64,
+                i64
+            ),
+            (
+                "end_of_slot_filtered_invalid_count",
+                self.end_of_slot_filtered_invalid_count as i64,
+                i64
+            ),
+        );
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct LeaderSlotMetrics {
+    // banking_stage creates one QosService instance per worker thread, each uniquely
+    // identified by id.
+    // This field is used to categorize the metrics for gossip votes, TPU votes,
+    // and other transactions.
+    id: u32,
+
+    // aggregate metrics per slot
+    slot: Slot,
+
+    packet_count_metrics: LeaderSlotPacketCountMetrics,
+
+    timing_metrics: LeaderSlotTimingMetrics,
+
+    // Used by tests to check if the `self.report()` method was called
+    is_reported: bool,
+}
+
+impl LeaderSlotMetrics {
+    pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
+        Self {
+            id,
+            slot,
+            packet_count_metrics: LeaderSlotPacketCountMetrics::new(),
+            timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time),
+            is_reported: false,
+        }
+    }
+
+    pub(crate) fn report(&mut self) {
+        self.is_reported = true;
+
+        self.timing_metrics.report(self.id, self.slot);
+        self.packet_count_metrics.report(self.id, self.slot);
+    }
+
+    /// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
+    fn reported_slot(&self) -> Option<Slot> {
+        if self.is_reported {
+            Some(self.slot)
+        } else {
+            None
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct LeaderSlotMetricsTracker {
+    // Only `Some` if BankingStage detects it's time to construct our leader slot,
+    // otherwise `None`
+    leader_slot_metrics: Option<LeaderSlotMetrics>,
+    id: u32,
+}
+
+impl LeaderSlotMetricsTracker {
+    pub fn new(id: u32) -> Self {
+        Self {
+            leader_slot_metrics: None,
+            id,
+        }
+    }
+
+    // Returns the reported slot if metrics were reported
+    pub(crate) fn update_on_leader_slot_boundary(
+        &mut self,
+        bank_start: &Option<BankStart>,
+    ) -> Option<Slot> {
+        match (self.leader_slot_metrics.as_mut(), bank_start) {
+            (None, None) => None,
+
+            (Some(leader_slot_metrics), None) => {
+                // Slot has ended, time to report metrics
+                leader_slot_metrics.report();
+                // Returning the reported slot lets tests verify that `report()` was called
+                let reported_slot = leader_slot_metrics.reported_slot();
+                self.leader_slot_metrics = None;
+                reported_slot
+            }
+
+            (None, Some(bank_start)) => {
+                // Our leader slot has begun, time to create a new slot tracker
+                self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
+                    self.id,
+                    bank_start.0.slot(),
+                    &bank_start.1,
+                ));
+                self.leader_slot_metrics.as_ref().unwrap().reported_slot()
+            }
+
+            (Some(leader_slot_metrics), Some(bank_start)) => {
+                if leader_slot_metrics.slot != bank_start.0.slot() {
+                    // The last slot has ended, a new slot has begun
+                    leader_slot_metrics.report();
+                    // Returning the reported slot lets tests verify that `report()` was called
+                    let reported_slot = leader_slot_metrics.reported_slot();
+                    self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
+                        self.id,
+                        bank_start.0.slot(),
+                        &bank_start.1,
+                    ));
+                    reported_slot
+                } else {
+                    leader_slot_metrics.reported_slot()
+                }
+            }
+        }
+    }
+
+    pub(crate) fn accumulate_process_transactions_summary(
+        &mut self,
+        process_transactions_summary: &ProcessTransactionsSummary,
+    ) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            let ProcessTransactionsSummary {
+                transactions_attempted_execution_count,
+                committed_transactions_count,
+                committed_transactions_with_successful_result_count,
+                failed_commit_count,
+                ref retryable_transaction_indexes,
+                cost_model_throttled_transactions_count,
+                ..
+ } = process_transactions_summary; + + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .transactions_attempted_execution_count, + *transactions_attempted_execution_count as u64 + ); + + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .committed_transactions_count, + *committed_transactions_count as u64 + ); + + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .committed_transactions_with_successful_result_count, + *committed_transactions_with_successful_result_count as u64 + ); + + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .executed_transactions_failed_commit_count, + *failed_commit_count as u64 + ); + + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .retryable_errored_transaction_count, + retryable_transaction_indexes.len() as u64 + ); + + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .nonretryable_errored_transactions_count, + transactions_attempted_execution_count + .saturating_sub(*committed_transactions_count) + .saturating_sub(retryable_transaction_indexes.len()) as u64 + ); + + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .cost_model_throttled_transactions_count, + *cost_model_throttled_transactions_count as u64 + ); + } + } + + pub(crate) fn increment_total_new_valid_packets(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .total_new_valid_packets, + count + ); + } + } + + pub(crate) fn increment_newly_failed_sigverify_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .newly_failed_sigverify_count, + count + ); + } + } + + pub(crate) fn increment_exceeded_buffer_limit_dropped_packets_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .exceeded_buffer_limit_dropped_packets_count, + count + ); + } + } + + pub(crate) fn increment_newly_buffered_packets_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .newly_buffered_packets_count, + count + ); + } + } + + pub(crate) fn increment_retryable_packets_filtered_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .retryable_packets_filtered_count, + count + ); + } + } + + pub(crate) fn increment_failed_forwarded_packets_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .failed_forwarded_packets_count, + count + ); + } + } + + pub(crate) fn increment_successful_forwarded_packets_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .successful_forwarded_packets_count, + count + ); + } + } + + pub(crate) fn increment_packet_batch_forward_failure_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + 
.packet_batch_forward_failure_count, + count + ); + } + } + + pub(crate) fn increment_cleared_from_buffer_after_forward_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .cleared_from_buffer_after_forward_count, + count + ); + } + } + + pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .end_of_slot_filtered_invalid_count, + count + ); + } + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_runtime::{bank::Bank, genesis_utils::create_genesis_config}, + solana_sdk::pubkey::Pubkey, + std::sync::Arc, + }; + + struct TestSlotBoundaryComponents { + first_bank: Arc, + first_poh_recorder_bank: BankStart, + next_bank: Arc, + next_poh_recorder_bank: BankStart, + leader_slot_metrics_tracker: LeaderSlotMetricsTracker, + } + + fn setup_test_slot_boundary_banks() -> TestSlotBoundaryComponents { + let genesis = create_genesis_config(10); + let first_bank = Arc::new(Bank::new(&genesis.genesis_config)); + let first_poh_recorder_bank = (first_bank.clone(), Arc::new(Instant::now())); + + // Create a child descended from the first bank + let next_bank = Arc::new(Bank::new_from_parent( + &first_bank, + &Pubkey::new_unique(), + first_bank.slot() + 1, + )); + let next_poh_recorder_bank = (next_bank.clone(), Arc::new(Instant::now())); + + let banking_stage_thread_id = 0; + let leader_slot_metrics_tracker = LeaderSlotMetricsTracker::new(banking_stage_thread_id); + + TestSlotBoundaryComponents { + first_bank, + first_poh_recorder_bank, + next_bank, + next_poh_recorder_bank, + leader_slot_metrics_tracker, + } + } + + #[test] + pub fn test_update_on_leader_slot_boundary_not_leader_to_not_leader() { + let TestSlotBoundaryComponents { + mut leader_slot_metrics_tracker, + .. + } = setup_test_slot_boundary_banks(); + // Test that with no bank being tracked, and no new bank being tracked, nothing is reported + assert!(leader_slot_metrics_tracker + .update_on_leader_slot_boundary(&None) + .is_none()); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + } + + #[test] + pub fn test_update_on_leader_slot_boundary_not_leader_to_leader() { + let TestSlotBoundaryComponents { + first_poh_recorder_bank, + mut leader_slot_metrics_tracker, + .. + } = setup_test_slot_boundary_banks(); + + // Test case where the thread has not detected a leader bank, and now sees a leader bank. + // Metrics should not be reported because leader slot has not ended + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + assert!(leader_slot_metrics_tracker + .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank)) + .is_none()); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some()); + } + + #[test] + pub fn test_update_on_leader_slot_boundary_leader_to_not_leader() { + let TestSlotBoundaryComponents { + first_bank, + first_poh_recorder_bank, + mut leader_slot_metrics_tracker, + .. + } = setup_test_slot_boundary_banks(); + + // Test case where the thread has a leader bank, and now detects there's no more leader bank, + // implying the slot has ended. Metrics should be reported for `first_bank.slot()`, + // because that leader slot has just ended. 
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+            .is_none());
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&None)
+                .unwrap(),
+            first_bank.slot()
+        );
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&None)
+            .is_none());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_leader_to_leader_same_slot() {
+        let TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+            ..
+        } = setup_test_slot_boundary_banks();
+
+        // Test case where the thread has a leader bank, and now detects the same leader bank,
+        // implying the slot is still running. Metrics should not be reported
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank.clone()))
+            .is_none());
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+            .is_none());
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&None)
+                .unwrap(),
+            first_bank.slot()
+        );
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_leader_to_leader_bigger_slot() {
+        let TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            next_bank,
+            next_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+        } = setup_test_slot_boundary_banks();
+
+        // Test case where the thread has a leader bank, and now detects there's a new leader bank
+        // for a bigger slot, implying the slot has ended. Metrics should be reported for the
+        // smaller slot
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+            .is_none());
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&Some(next_poh_recorder_bank))
+                .unwrap(),
+            first_bank.slot()
+        );
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&None)
+                .unwrap(),
+            next_bank.slot()
+        );
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_leader_to_leader_smaller_slot() {
+        let TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            next_bank,
+            next_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+        } = setup_test_slot_boundary_banks();
+        // Test case where the thread has a leader bank, and now detects there's a new leader bank
+        // for a smaller slot, implying the slot has ended.
Metrics should be reported for the + // bigger slot + assert!(leader_slot_metrics_tracker + .update_on_leader_slot_boundary(&Some(next_poh_recorder_bank)) + .is_none()); + assert_eq!( + leader_slot_metrics_tracker + .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank)) + .unwrap(), + next_bank.slot() + ); + assert_eq!( + leader_slot_metrics_tracker + .update_on_leader_slot_boundary(&None) + .unwrap(), + first_bank.slot() + ); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 68bca8d15e..2ab4849f67 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -26,6 +26,7 @@ pub mod fork_choice; pub mod gen_keys; pub mod heaviest_subtree_fork_choice; pub mod latest_validator_votes_for_frozen_banks; +pub mod leader_slot_banking_stage_metrics; pub mod ledger_cleanup_service; pub mod optimistic_confirmation_verifier; pub mod outstanding_requests;
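
Editor's note, not part of the patch: the tracker added above is driven entirely by `update_on_leader_slot_boundary()`, which each BankingStage worker calls once per loop iteration with the current PoH bank, while all increment methods silently no-op outside a leader slot. The following self-contained Rust sketch mirrors that state machine; `Tracker`, `SlotMetrics`, and the simplified `BankStart` below are illustrative stand-ins for this patch's LeaderSlotMetricsTracker, LeaderSlotMetrics, and (Arc<Bank>, Arc<Instant>) BankStart, not the real types.

type Slot = u64;

// Simplified stand-in for the bank handle BankingStage gets from PohRecorder.
struct BankStart {
    slot: Slot,
}

#[derive(Debug)]
struct SlotMetrics {
    slot: Slot,
    newly_buffered_packets_count: u64,
}

struct Tracker {
    current: Option<SlotMetrics>,
}

impl Tracker {
    // Mirrors update_on_leader_slot_boundary(): report and drop the metrics when the
    // tracked slot ends, start tracking when a new leader bank appears, and return
    // the slot that was reported (if any).
    fn update_on_leader_slot_boundary(&mut self, bank_start: &Option<BankStart>) -> Option<Slot> {
        match (self.current.as_mut(), bank_start) {
            (None, None) => None,
            (Some(metrics), None) => {
                // Slot ended: flush the per-slot datapoint (a println! here) and reset.
                println!("report: {:?}", metrics);
                let reported = metrics.slot;
                self.current = None;
                Some(reported)
            }
            (None, Some(bank_start)) => {
                // Leader slot began: start a fresh set of counters.
                self.current = Some(SlotMetrics {
                    slot: bank_start.slot,
                    newly_buffered_packets_count: 0,
                });
                None
            }
            (Some(metrics), Some(bank_start)) if metrics.slot != bank_start.slot => {
                // Previous slot ended and a new one began: report, then re-arm.
                println!("report: {:?}", metrics);
                let reported = metrics.slot;
                self.current = Some(SlotMetrics {
                    slot: bank_start.slot,
                    newly_buffered_packets_count: 0,
                });
                Some(reported)
            }
            // Same slot still running: nothing to report.
            (Some(_), Some(_)) => None,
        }
    }

    // Increments are deliberate no-ops outside a leader slot, matching the
    // `if let Some(leader_slot_metrics)` guards in the patch.
    fn increment_newly_buffered_packets_count(&mut self, count: u64) {
        if let Some(metrics) = &mut self.current {
            metrics.newly_buffered_packets_count =
                metrics.newly_buffered_packets_count.saturating_add(count);
        }
    }
}

fn main() {
    let mut tracker = Tracker { current: None };
    tracker.update_on_leader_slot_boundary(&Some(BankStart { slot: 42 }));
    tracker.increment_newly_buffered_packets_count(128); // counted against slot 42
    assert_eq!(tracker.update_on_leader_slot_boundary(&None), Some(42)); // slot ended, reported
}

Keeping the Option<SlotMetrics> as the single source of truth is what makes the counters slot-scoped: there is no window where an increment can be attributed to the wrong slot, because counters only exist between the two boundary calls that open and close a leader slot.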