Introduce slot-specific packet metrics (backport #22906) (#23076)

* Resolve conflicts

* add end_of_slot_filtered_invalid_count

* Increment end_of_slot_filtered_invalid_count

* Fixup comment

* Remove comment

* Move all process_tx metrics into common function

* Switch to saturating_add_assign macro

* Refactor timings so each struct reports own timing

* Move into accumulate

* Remove unnecessary struct

Co-authored-by: Carl Lin <carl@solana.com>
This commit is contained in:
mergify[bot]
2022-02-14 10:23:48 +00:00
committed by GitHub
parent b4916d2601
commit 00abcbe1be
4 changed files with 851 additions and 53 deletions

View File

@ -8,7 +8,10 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_core::banking_stage::{BankingStage, BankingStageStats},
solana_core::{
banking_stage::{BankingStage, BankingStageStats},
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
blockstore::Blockstore,
@ -95,6 +98,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&mut LeaderSlotMetricsTracker::new(0),
);
});

View File

@ -2,6 +2,9 @@
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use {
crate::leader_slot_banking_stage_metrics::{
LeaderSlotMetricsTracker, ProcessTransactionsSummary,
},
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
histogram::Histogram,
itertools::Itertools,
@ -83,47 +86,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;
/// A summary of what happened to transactions passed to the execution pipeline.
/// Transactions can
/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse
/// lock conflictss or CostModel compute limits. These types of errors are retryable and
/// counted in `Self::retryable_transaction_indexes`.
/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These
/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes`
/// 3) Were executed and committed, captured by `committed_transactions_count` below.
/// 4) Were executed and failed commit, captured by `failed_commit_count` below.
struct ProcessTransactionsSummary {
// Returns true if we hit the end of the block/max PoH height for the block before
// processing all the transactions in the batch.
reached_max_poh_height: bool,
// Total number of transactions that were passed as candidates for execution. See description
// of struct above for possible outcomes for these transactions
#[allow(dead_code)]
transactions_attempted_execution_count: usize,
// Total number of transactions that made it into the block
#[allow(dead_code)]
committed_transactions_count: usize,
// Total number of transactions that made it into the block where the transactions
// output from execution was success/no error.
#[allow(dead_code)]
committed_transactions_with_successful_result_count: usize,
// All transactions that were executed but then failed record because the
// slot ended
#[allow(dead_code)]
failed_commit_count: usize,
// Indexes of transactions in the transactions slice that were not committed but are retryable
retryable_transaction_indexes: Vec<usize>,
// The number of transactions filtered out by the cost model
#[allow(dead_code)]
cost_model_throttled_transactions_count: usize,
}
pub struct ExecuteAndCommitTransactionsOutput {
// Total number of transactions that were passed as candidates for execution
transactions_attempted_execution_count: usize,
@ -155,6 +117,7 @@ pub struct BankingStageStats {
current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,
end_of_slot_filtered_invalid_count: AtomicUsize,
cost_tracker_check_count: AtomicUsize,
cost_forced_retry_transactions_count: AtomicUsize,
batch_packet_indexes_len: Histogram,
@ -242,6 +205,12 @@ impl BankingStageStats {
self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"end_of_slot_filtered_invalid_count",
self.end_of_slot_filtered_invalid_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_forced_retry_transactions_count",
self.cost_forced_retry_transactions_count
@ -451,13 +420,14 @@ impl BankingStage {
.collect()
}
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
tpu_forwards: &std::net::SocketAddr,
buffered_packet_batches: &UnprocessedPacketBatches,
packets: Vec<&Packet>,
data_budget: &DataBudget,
) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
) -> (std::io::Result<()>, usize) {
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
@ -489,7 +459,7 @@ impl BankingStage {
inc_new_counter_info!("banking_stage-forwarded_packets", forwarded_packet_count);
}
forward_result
(forward_result, forwarded_packet_count)
}
// Returns whether the given `PacketBatch` has any more remaining unprocessed
@ -518,6 +488,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) {
let mut rebuffered_packet_count = 0;
let mut consumed_buffered_packets_count = 0;
@ -531,7 +502,7 @@ impl BankingStage {
if let Some((next_leader, bank)) = &reached_end_of_slot {
// We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones
let new_unprocessed_indexes = Self::filter_unprocessed_packets(
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
bank,
packet_batch,
original_unprocessed_indexes,
@ -539,6 +510,18 @@ impl BankingStage {
*next_leader,
banking_stage_stats,
);
let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
.len()
.saturating_sub(new_unprocessed_indexes.len());
slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
end_of_slot_filtered_invalid_count as u64,
);
banking_stage_stats
.end_of_slot_filtered_invalid_count
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
new_unprocessed_indexes,
@ -556,6 +539,7 @@ impl BankingStage {
gossip_vote_sender,
banking_stage_stats,
cost_model,
slot_metrics_tracker,
);
let ProcessTransactionsSummary {
reached_max_poh_height,
@ -674,6 +658,7 @@ impl BankingStage {
recorder: &TransactionRecorder,
data_budget: &DataBudget,
cost_model: &Arc<RwLock<CostModel>>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> BufferedPacketsDecision {
let bank_start;
let (
@ -715,6 +700,7 @@ impl BankingStage {
banking_stage_stats,
recorder,
cost_model,
slot_metrics_tracker,
);
}
BufferedPacketsDecision::Forward => {
@ -726,6 +712,7 @@ impl BankingStage {
socket,
false,
data_budget,
slot_metrics_tracker,
);
}
BufferedPacketsDecision::ForwardAndHold => {
@ -737,10 +724,12 @@ impl BankingStage {
socket,
true,
data_budget,
slot_metrics_tracker,
);
}
_ => (),
}
decision
}
@ -752,6 +741,7 @@ impl BankingStage {
socket: &UdpSocket,
hold: bool,
data_budget: &DataBudget,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) {
let addr = match forward_option {
ForwardOption::NotForward => {
@ -769,13 +759,35 @@ impl BankingStage {
Some(addr) => addr,
None => return,
};
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget);
let forwardable_packets =
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
let forwardable_packets_len = forwardable_packets.len();
let (_forward_result, sucessful_forwarded_packets_count) =
Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget);
let failed_forwarded_packets_count =
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);
if failed_forwarded_packets_count > 0 {
slot_metrics_tracker
.increment_failed_forwarded_packets_count(failed_forwarded_packets_count as u64);
slot_metrics_tracker.increment_packet_batch_forward_failure_count(1);
}
if sucessful_forwarded_packets_count > 0 {
slot_metrics_tracker.increment_successful_forwarded_packets_count(
sucessful_forwarded_packets_count as u64,
);
}
if hold {
buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
*forwarded = true;
}
} else {
slot_metrics_tracker
.increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64);
buffered_packet_batches.clear();
}
}
@ -799,6 +811,7 @@ impl BankingStage {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id);
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
loop {
while !buffered_packet_batches.is_empty() {
let decision = Self::process_buffered_packets(
@ -814,6 +827,7 @@ impl BankingStage {
&recorder,
data_budget,
&cost_model,
&mut slot_metrics_tracker,
);
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
@ -824,6 +838,12 @@ impl BankingStage {
}
}
let current_poh_bank = {
let poh = poh_recorder.lock().unwrap();
poh.bank_start()
};
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
let recv_timeout = if !buffered_packet_batches.is_empty() {
// If packets are buffered, let's wait for less time on recv from the channel.
// This helps detect the next leader faster, and processing the buffered
@ -848,6 +868,7 @@ impl BankingStage {
&mut banking_stage_stats,
&recorder,
&cost_model,
&mut slot_metrics_tracker,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
@ -1411,6 +1432,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> ProcessTransactionsSummary {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes, cost_model_throttled_packet_indexes) =
@ -1470,8 +1492,14 @@ impl BankingStage {
);
}
});
cost_tracking_time.stop();
slot_metrics_tracker.accumulate_process_transactions_summary(&process_transactions_summary);
let retryable_tx_count = retryable_transaction_indexes.len();
inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count);
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
let mut filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs(
bank,
@ -1481,6 +1509,12 @@ impl BankingStage {
);
filter_pending_packets_time.stop();
let retryable_packets_filtered_count = retryable_transaction_indexes
.len()
.saturating_sub(filtered_retryable_tx_indexes.len());
slot_metrics_tracker
.increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
retryable_transaction_indexes
@ -1510,7 +1544,7 @@ impl BankingStage {
process_transactions_summary
}
fn filter_unprocessed_packets(
fn filter_unprocessed_packets_at_end_of_slot(
bank: &Arc<Bank>,
packet_batch: &PacketBatch,
transaction_indexes: &[usize],
@ -1599,6 +1633,7 @@ impl BankingStage {
banking_stage_stats: &mut BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
@ -1623,6 +1658,15 @@ impl BankingStage {
let mut newly_buffered_packets_count = 0;
while let Some(packet_batch) = packet_batch_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
// Track all the packets incoming from sigverify, both valid and invalid
slot_metrics_tracker.increment_total_new_valid_packets(packet_indexes.len() as u64);
slot_metrics_tracker.increment_newly_failed_sigverify_count(
packet_batch
.packets
.len()
.saturating_sub(packet_indexes.len()) as u64,
);
let bank_start = poh.lock().unwrap().bank_start();
if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() {
Self::push_unprocessed(
@ -1634,6 +1678,7 @@ impl BankingStage {
&mut newly_buffered_packets_count,
batch_limit,
banking_stage_stats,
slot_metrics_tracker,
);
continue;
}
@ -1651,6 +1696,7 @@ impl BankingStage {
gossip_vote_sender,
banking_stage_stats,
cost_model,
slot_metrics_tracker,
);
let ProcessTransactionsSummary {
@ -1669,6 +1715,7 @@ impl BankingStage {
&mut newly_buffered_packets_count,
batch_limit,
banking_stage_stats,
slot_metrics_tracker,
);
// If there were retryable transactions, add the unexpired ones to the buffered queue
@ -1678,24 +1725,39 @@ impl BankingStage {
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
#[allow(clippy::while_let_on_iterator)]
while let Some(packet_batch) = packet_batch_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
let unprocessed_indexes = Self::filter_unprocessed_packets(
let original_unprocessed_indexes =
Self::generate_packet_indexes(&packet_batch.packets);
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
&bank,
&packet_batch,
&packet_indexes,
&original_unprocessed_indexes,
my_pubkey,
next_leader,
banking_stage_stats,
);
let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
.len()
.saturating_sub(new_unprocessed_indexes.len());
slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
end_of_slot_filtered_invalid_count as u64,
);
banking_stage_stats
.end_of_slot_filtered_invalid_count
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
unprocessed_indexes,
new_unprocessed_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
banking_stage_stats,
slot_metrics_tracker,
);
}
handle_retryable_packets_time.stop();
@ -1757,12 +1819,16 @@ impl BankingStage {
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
banking_stage_stats: &mut BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) {
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packet_batches.len() >= batch_limit {
*dropped_packet_batches_count += 1;
if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() {
*dropped_packets_count += dropped_batch.1.len();
slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count(
dropped_batch.1.len() as u64,
);
}
}
let _ = banking_stage_stats
@ -1770,6 +1836,8 @@ impl BankingStage {
.increment(packet_indexes.len() as u64);
*newly_buffered_packets_count += packet_indexes.len();
slot_metrics_tracker
.increment_newly_buffered_packets_count(packet_indexes.len() as u64);
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
}
}
@ -3213,6 +3281,7 @@ mod tests {
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&mut LeaderSlotMetricsTracker::new(0),
);
assert_eq!(
buffered_packet_batches[0].1.len(),
@ -3233,6 +3302,7 @@ mod tests {
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&mut LeaderSlotMetricsTracker::new(0),
);
if num_expected_unprocessed == 0 {
assert!(buffered_packet_batches.is_empty())
@ -3299,6 +3369,7 @@ mod tests {
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&mut LeaderSlotMetricsTracker::new(0),
);
// Check everything is correct. All indexes after `interrupted_iteration`
@ -3396,6 +3467,7 @@ mod tests {
&send_socket,
true,
&data_budget,
&mut LeaderSlotMetricsTracker::new(0),
);
recv_socket
@ -3495,6 +3567,7 @@ mod tests {
&send_socket,
hold,
&DataBudget::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
recv_socket
@ -3556,6 +3629,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&mut banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);
assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_packet_batches_count, 0);
@ -3574,6 +3648,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&mut banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_packet_batches_count, 0);
@ -3597,6 +3672,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&mut banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(

View File

@ -0,0 +1,717 @@
use {
solana_poh::poh_recorder::BankStart,
solana_sdk::{clock::Slot, saturating_add_assign},
std::time::Instant,
};
/// A summary of what happened to transactions passed to the execution pipeline.
/// A transaction may have one of the following outcomes:
/// 1) It did not even make it to execution because it was filtered out by things like
/// AccountInUse lock conflicts or CostModel compute limits. These types of errors are
/// retryable and counted in `Self::retryable_transaction_indexes`.
/// 2) It did not execute due to some fatal error like being too old, or a duplicate signature.
/// These are dropped from the transactions queue and not counted in
/// `Self::retryable_transaction_indexes`.
/// 3) It was executed and committed, captured by `committed_transactions_count` below.
/// 4) It was executed and failed commit, captured by `failed_commit_count` below.
pub(crate) struct ProcessTransactionsSummary {
    // True if we hit the end of the block/max PoH height for the block before
    // processing all the transactions in the batch.
    pub reached_max_poh_height: bool,

    // Total number of transactions that were passed as candidates for execution. See the
    // description of the struct above for possible outcomes for these transactions.
    pub transactions_attempted_execution_count: usize,

    // Total number of transactions that made it into the block.
    pub committed_transactions_count: usize,

    // Total number of transactions that made it into the block where the transaction's
    // output from execution was success/no error.
    pub committed_transactions_with_successful_result_count: usize,

    // All transactions that were executed but then failed record because the
    // slot ended.
    pub failed_commit_count: usize,

    // Indexes of transactions in the transactions slice that were not committed but are
    // retryable.
    pub retryable_transaction_indexes: Vec<usize>,

    // The number of transactions filtered out by the cost model.
    pub cost_model_throttled_transactions_count: usize,
}
// Metrics capturing wallclock time spent in various parts of BankingStage during this
// validator's leader slot
#[derive(Debug)]
struct LeaderSlotTimingMetrics {
    // Instant at which these metrics were constructed, i.e. when this thread detected the bank
    bank_detected_time: Instant,

    // Delay, in microseconds, from when the bank was created to when this thread detected it
    bank_detected_delay_us: u64,
}
impl LeaderSlotTimingMetrics {
    /// Starts the timing metrics for a newly detected leader bank.
    ///
    /// `bank_creation_time` is the instant the bank was created; the time elapsed since then
    /// is recorded as the detection delay, and "now" is recorded as the detection time.
    fn new(bank_creation_time: &Instant) -> Self {
        Self {
            bank_detected_time: Instant::now(),
            bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64,
        }
    }

    /// Submits one `banking_stage-leader_slot_loop_timings` datapoint tagged with the given
    /// thread `id` and `slot`. All durations are in microseconds.
    fn report(&self, id: u32, slot: Slot) {
        let bank_detected_to_now_us = self.bank_detected_time.elapsed().as_micros() as u64;
        // Saturate rather than overflow, matching the saturating arithmetic used for every
        // other metric in this module.
        let bank_creation_to_now_us =
            bank_detected_to_now_us.saturating_add(self.bank_detected_delay_us);
        datapoint_info!(
            "banking_stage-leader_slot_loop_timings",
            ("id", id as i64, i64),
            ("slot", slot as i64, i64),
            (
                "bank_detected_to_now_us",
                bank_detected_to_now_us as i64,
                i64
            ),
            (
                "bank_creation_to_now_us",
                bank_creation_to_now_us as i64,
                i64
            ),
            (
                "bank_detected_delay_us",
                self.bank_detected_delay_us as i64,
                i64
            ),
        );
    }
}
// Metrics describing packets ingested/processed in various parts of BankingStage during this
// validator's leader slot
#[derive(Debug, Default)]
struct LeaderSlotPacketCountMetrics {
    // total number of live packets TPU received from verified receiver for processing.
    total_new_valid_packets: u64,

    // total number of packets TPU received from sigverify that failed signature verification.
    newly_failed_sigverify_count: u64,

    // total number of packets dropped due to the thread's buffered packets capacity being
    // reached.
    exceeded_buffer_limit_dropped_packets_count: u64,

    // total number of packets that got added to the pending buffer after arriving to BankingStage
    newly_buffered_packets_count: u64,

    // total number of transactions in the buffer that were filtered out due to things like age and
    // duplicate signature checks
    retryable_packets_filtered_count: u64,

    // total number of transactions that attempted execution in this slot. Should equal the sum
    // of `committed_transactions_count`, `retryable_errored_transaction_count`, and
    // `nonretryable_errored_transactions_count`.
    transactions_attempted_execution_count: u64,

    // total number of transactions that were executed and committed into the block
    // on this thread
    committed_transactions_count: u64,

    // total number of transactions that were executed, got a successful execution output/no error,
    // and were then committed into the block
    committed_transactions_with_successful_result_count: u64,

    // total number of transactions that were not executed or failed commit, BUT were added back
    // to the buffered queue because they were retryable errors
    retryable_errored_transaction_count: u64,

    // total number of transactions that attempted execution but failed due to some fatal error
    // (too old, duplicate signature, etc.) AND were dropped from the buffered queue
    nonretryable_errored_transactions_count: u64,

    // total number of transactions that were executed, but failed to be committed into the Poh
    // stream because the block ended. Some of these may be already counted in
    // `nonretryable_errored_transactions_count` if they then hit the age limit after failing to
    // be committed.
    executed_transactions_failed_commit_count: u64,

    // total number of transactions that were excluded from the block because they were too
    // expensive according to the cost model. These transactions are added back to the buffered
    // queue and are already counted in `self.retryable_errored_transaction_count`.
    cost_model_throttled_transactions_count: u64,

    // total number of forwardable packets that failed forwarding
    failed_forwarded_packets_count: u64,

    // total number of forwardable packets that were successfully forwarded
    successful_forwarded_packets_count: u64,

    // total number of attempted forwards that failed. Note this is not a count of the number of
    // packets that failed, just the total number of batches of packets that failed forwarding
    packet_batch_forward_failure_count: u64,

    // total number of valid unprocessed packets in the buffer that were removed after being
    // forwarded
    cleared_from_buffer_after_forward_count: u64,

    // total number of packets removed at the end of the slot due to being too old, duplicate, etc.
    end_of_slot_filtered_invalid_count: u64,
}
impl LeaderSlotPacketCountMetrics {
    /// Creates a set of counters with every count at zero.
    fn new() -> Self {
        Self::default()
    }

    /// Submits one `banking_stage-leader_slot_packet_counts` datapoint holding every counter,
    /// tagged with the given thread `id` and `slot`.
    fn report(&self, id: u32, slot: Slot) {
        // Destructure without a `..` rest pattern: a counter that is added to the struct but
        // not reported below then fails to compile instead of being silently dropped.
        let Self {
            total_new_valid_packets,
            newly_failed_sigverify_count,
            exceeded_buffer_limit_dropped_packets_count,
            newly_buffered_packets_count,
            retryable_packets_filtered_count,
            transactions_attempted_execution_count,
            committed_transactions_count,
            committed_transactions_with_successful_result_count,
            retryable_errored_transaction_count,
            nonretryable_errored_transactions_count,
            executed_transactions_failed_commit_count,
            cost_model_throttled_transactions_count,
            failed_forwarded_packets_count,
            successful_forwarded_packets_count,
            packet_batch_forward_failure_count,
            cleared_from_buffer_after_forward_count,
            end_of_slot_filtered_invalid_count,
        } = self;

        datapoint_info!(
            "banking_stage-leader_slot_packet_counts",
            ("id", id as i64, i64),
            ("slot", slot as i64, i64),
            (
                "total_new_valid_packets",
                *total_new_valid_packets as i64,
                i64
            ),
            (
                "newly_failed_sigverify_count",
                *newly_failed_sigverify_count as i64,
                i64
            ),
            (
                "exceeded_buffer_limit_dropped_packets_count",
                *exceeded_buffer_limit_dropped_packets_count as i64,
                i64
            ),
            (
                "newly_buffered_packets_count",
                *newly_buffered_packets_count as i64,
                i64
            ),
            (
                "retryable_packets_filtered_count",
                *retryable_packets_filtered_count as i64,
                i64
            ),
            (
                "transactions_attempted_execution_count",
                *transactions_attempted_execution_count as i64,
                i64
            ),
            (
                "committed_transactions_count",
                *committed_transactions_count as i64,
                i64
            ),
            (
                "committed_transactions_with_successful_result_count",
                *committed_transactions_with_successful_result_count as i64,
                i64
            ),
            (
                "retryable_errored_transaction_count",
                *retryable_errored_transaction_count as i64,
                i64
            ),
            (
                "nonretryable_errored_transactions_count",
                *nonretryable_errored_transactions_count as i64,
                i64
            ),
            (
                "executed_transactions_failed_commit_count",
                *executed_transactions_failed_commit_count as i64,
                i64
            ),
            (
                "cost_model_throttled_transactions_count",
                *cost_model_throttled_transactions_count as i64,
                i64
            ),
            (
                "failed_forwarded_packets_count",
                *failed_forwarded_packets_count as i64,
                i64
            ),
            (
                "successful_forwarded_packets_count",
                *successful_forwarded_packets_count as i64,
                i64
            ),
            (
                "packet_batch_forward_failure_count",
                *packet_batch_forward_failure_count as i64,
                i64
            ),
            (
                "cleared_from_buffer_after_forward_count",
                *cleared_from_buffer_after_forward_count as i64,
                i64
            ),
            (
                "end_of_slot_filtered_invalid_count",
                *end_of_slot_filtered_invalid_count as i64,
                i64
            ),
        );
    }
}
/// All metrics gathered by one thread for a single leader slot: packet counters plus
/// wallclock timings, together with the `id`/`slot` they are reported under.
#[derive(Debug)]
pub(crate) struct LeaderSlotMetrics {
    // Identifier supplied by the owner of these metrics and reported as the `id` field of
    // every datapoint. This field allows categorizing metrics for gossip votes, TPU votes
    // and other transactions.
    // NOTE(review): the original comment referred to "one QosService instance per working
    // thread"; presumably this is the BankingStage thread id -- confirm against the caller.
    id: u32,

    // The leader slot these metrics are aggregated for
    slot: Slot,

    packet_count_metrics: LeaderSlotPacketCountMetrics,

    timing_metrics: LeaderSlotTimingMetrics,

    // Set to true by `self.report()`. Used by tests to check if the `self.report()` method
    // was called
    is_reported: bool,
}
impl LeaderSlotMetrics {
    /// Creates zeroed, not-yet-reported metrics for `slot`, owned by the thread with the given
    /// `id`. `bank_creation_time` is forwarded to the timing metrics.
    pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
        let packet_count_metrics = LeaderSlotPacketCountMetrics::new();
        let timing_metrics = LeaderSlotTimingMetrics::new(bank_creation_time);
        Self {
            id,
            slot,
            packet_count_metrics,
            timing_metrics,
            is_reported: false,
        }
    }

    /// Marks these metrics as reported, then reports the timing metrics followed by the
    /// packet count metrics.
    pub(crate) fn report(&mut self) {
        self.is_reported = true;

        let (id, slot) = (self.id, self.slot);
        self.timing_metrics.report(id, slot);
        self.packet_count_metrics.report(id, slot);
    }

    /// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
    fn reported_slot(&self) -> Option<Slot> {
        self.is_reported.then(|| self.slot)
    }
}
/// Owns the `LeaderSlotMetrics` of the leader slot currently being tracked, if any, and
/// creates/reports them as leader slot boundaries are observed.
#[derive(Debug)]
pub struct LeaderSlotMetricsTracker {
    // Only `Some` if BankingStage detects it's time to construct our leader slot,
    // otherwise `None`
    leader_slot_metrics: Option<LeaderSlotMetrics>,

    // Id passed on to every `LeaderSlotMetrics` created by this tracker
    id: u32,
}
impl LeaderSlotMetricsTracker {
pub fn new(id: u32) -> Self {
Self {
leader_slot_metrics: None,
id,
}
}
// Returns reported slot if metrics were reported
pub(crate) fn update_on_leader_slot_boundary(
&mut self,
bank_start: &Option<BankStart>,
) -> Option<Slot> {
match (self.leader_slot_metrics.as_mut(), bank_start) {
(None, None) => None,
(Some(leader_slot_metrics), None) => {
leader_slot_metrics.report();
// Ensure tests catch that `report()` method was called
let reported_slot = leader_slot_metrics.reported_slot();
// Slot has ended, time to report metrics
self.leader_slot_metrics = None;
reported_slot
}
(None, Some(bank_start)) => {
// Our leader slot has begain, time to create a new slot tracker
self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
self.id,
bank_start.0.slot(),
&bank_start.1,
));
self.leader_slot_metrics.as_ref().unwrap().reported_slot()
}
(Some(leader_slot_metrics), Some(bank_start)) => {
if leader_slot_metrics.slot != bank_start.0.slot() {
// Last slot has ended, new slot has began
leader_slot_metrics.report();
// Ensure tests catch that `report()` method was called
let reported_slot = leader_slot_metrics.reported_slot();
self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
self.id,
bank_start.0.slot(),
&bank_start.1,
));
reported_slot
} else {
leader_slot_metrics.reported_slot()
}
}
}
}
pub(crate) fn accumulate_process_transactions_summary(
&mut self,
process_transactions_summary: &ProcessTransactionsSummary,
) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
let ProcessTransactionsSummary {
transactions_attempted_execution_count,
committed_transactions_count,
committed_transactions_with_successful_result_count,
failed_commit_count,
ref retryable_transaction_indexes,
cost_model_throttled_transactions_count,
..
} = process_transactions_summary;
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.transactions_attempted_execution_count,
*transactions_attempted_execution_count as u64
);
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.committed_transactions_count,
*committed_transactions_count as u64
);
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.committed_transactions_with_successful_result_count,
*committed_transactions_with_successful_result_count as u64
);
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.executed_transactions_failed_commit_count,
*failed_commit_count as u64
);
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.retryable_errored_transaction_count,
retryable_transaction_indexes.len() as u64
);
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.nonretryable_errored_transactions_count,
transactions_attempted_execution_count
.saturating_sub(*committed_transactions_count)
.saturating_sub(retryable_transaction_indexes.len()) as u64
);
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.cost_model_throttled_transactions_count,
*cost_model_throttled_transactions_count as u64
);
}
}
pub(crate) fn increment_total_new_valid_packets(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.total_new_valid_packets,
count
);
}
}
pub(crate) fn increment_newly_failed_sigverify_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.newly_failed_sigverify_count,
count
);
}
}
pub(crate) fn increment_exceeded_buffer_limit_dropped_packets_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.exceeded_buffer_limit_dropped_packets_count,
count
);
}
}
pub(crate) fn increment_newly_buffered_packets_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.newly_buffered_packets_count,
count
);
}
}
pub(crate) fn increment_retryable_packets_filtered_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.retryable_packets_filtered_count,
count
);
}
}
pub(crate) fn increment_failed_forwarded_packets_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.failed_forwarded_packets_count,
count
);
}
}
pub(crate) fn increment_successful_forwarded_packets_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.successful_forwarded_packets_count,
count
);
}
}
pub(crate) fn increment_packet_batch_forward_failure_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.packet_batch_forward_failure_count,
count
);
}
}
pub(crate) fn increment_cleared_from_buffer_after_forward_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.cleared_from_buffer_after_forward_count,
count
);
}
}
pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.end_of_slot_filtered_invalid_count,
count
);
}
}
}
#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_runtime::{bank::Bank, genesis_utils::create_genesis_config},
        solana_sdk::pubkey::Pubkey,
        std::sync::Arc,
    };

    /// Everything the slot-boundary tests need: a bank, a child bank one slot after it, the
    /// `BankStart` value built for each of them, and a freshly constructed tracker.
    struct TestSlotBoundaryComponents {
        first_bank: Arc<Bank>,
        first_poh_recorder_bank: BankStart,
        next_bank: Arc<Bank>,
        next_poh_recorder_bank: BankStart,
        leader_slot_metrics_tracker: LeaderSlotMetricsTracker,
    }

    /// Builds the `BankStart` pair for `bank`: a clone of the bank handle plus the current time.
    fn bank_start_for(bank: &Arc<Bank>) -> BankStart {
        (Arc::clone(bank), Arc::new(Instant::now()))
    }

    /// Creates the shared fixture used by every test in this module.
    fn setup_test_slot_boundary_banks() -> TestSlotBoundaryComponents {
        let genesis = create_genesis_config(10);
        let first_bank = Arc::new(Bank::new(&genesis.genesis_config));
        let first_poh_recorder_bank = bank_start_for(&first_bank);

        // The second bank is a direct child of the first, one slot later.
        let next_slot = first_bank.slot() + 1;
        let next_bank = Arc::new(Bank::new_from_parent(
            &first_bank,
            &Pubkey::new_unique(),
            next_slot,
        ));
        let next_poh_recorder_bank = bank_start_for(&next_bank);

        TestSlotBoundaryComponents {
            first_bank,
            first_poh_recorder_bank,
            next_bank,
            next_poh_recorder_bank,
            // The argument is the banking stage thread id.
            leader_slot_metrics_tracker: LeaderSlotMetricsTracker::new(0),
        }
    }

    #[test]
    pub fn test_update_on_leader_slot_boundary_not_leader_to_not_leader() {
        let TestSlotBoundaryComponents {
            leader_slot_metrics_tracker: mut tracker,
            ..
        } = setup_test_slot_boundary_banks();

        // No bank was tracked before and none is supplied now, so nothing should be reported
        // and the tracker should still hold no metrics.
        let reported_slot = tracker.update_on_leader_slot_boundary(&None);
        assert!(reported_slot.is_none());
        assert!(tracker.leader_slot_metrics.is_none());
    }

    #[test]
    pub fn test_update_on_leader_slot_boundary_not_leader_to_leader() {
        let TestSlotBoundaryComponents {
            first_poh_recorder_bank: bank_start,
            leader_slot_metrics_tracker: mut tracker,
            ..
        } = setup_test_slot_boundary_banks();

        // The thread had no leader bank and now sees one. The leader slot has only just
        // started, so nothing should be reported yet, but metrics should now be tracked.
        assert!(tracker.leader_slot_metrics.is_none());
        let reported_slot = tracker.update_on_leader_slot_boundary(&Some(bank_start));
        assert!(reported_slot.is_none());
        assert!(tracker.leader_slot_metrics.is_some());
    }

    #[test]
    pub fn test_update_on_leader_slot_boundary_leader_to_not_leader() {
        let TestSlotBoundaryComponents {
            first_bank,
            first_poh_recorder_bank: bank_start,
            leader_slot_metrics_tracker: mut tracker,
            ..
        } = setup_test_slot_boundary_banks();

        // Start tracking the leader bank; nothing is reported at this point.
        let on_slot_start = tracker.update_on_leader_slot_boundary(&Some(bank_start));
        assert!(on_slot_start.is_none());

        // The leader bank disappears, meaning the slot ended: metrics should be reported for
        // `first_bank.slot()` and the tracker should be emptied.
        let on_slot_end = tracker.update_on_leader_slot_boundary(&None);
        assert_eq!(on_slot_end.unwrap(), first_bank.slot());
        assert!(tracker.leader_slot_metrics.is_none());

        // A further update without a bank has nothing left to report.
        let after_slot_end = tracker.update_on_leader_slot_boundary(&None);
        assert!(after_slot_end.is_none());
    }

    #[test]
    pub fn test_update_on_leader_slot_boundary_leader_to_leader_same_slot() {
        let TestSlotBoundaryComponents {
            first_bank,
            first_poh_recorder_bank: bank_start,
            leader_slot_metrics_tracker: mut tracker,
            ..
        } = setup_test_slot_boundary_banks();

        // Seeing the same leader bank twice means the slot is still running, so neither
        // update should report anything.
        let first_update = tracker.update_on_leader_slot_boundary(&Some(bank_start.clone()));
        assert!(first_update.is_none());
        let repeated_update = tracker.update_on_leader_slot_boundary(&Some(bank_start));
        assert!(repeated_update.is_none());

        // Once the bank goes away, that slot is reported and tracking stops.
        let on_slot_end = tracker.update_on_leader_slot_boundary(&None);
        assert_eq!(on_slot_end.unwrap(), first_bank.slot());
        assert!(tracker.leader_slot_metrics.is_none());
    }

    #[test]
    pub fn test_update_on_leader_slot_boundary_leader_to_leader_bigger_slot() {
        let TestSlotBoundaryComponents {
            first_bank,
            first_poh_recorder_bank: first_bank_start,
            next_bank,
            next_poh_recorder_bank: next_bank_start,
            leader_slot_metrics_tracker: mut tracker,
        } = setup_test_slot_boundary_banks();

        // Begin with the smaller slot; nothing is reported yet.
        let on_first_bank = tracker.update_on_leader_slot_boundary(&Some(first_bank_start));
        assert!(on_first_bank.is_none());

        // A leader bank for a bigger slot shows up, implying the previous slot has ended:
        // metrics should be reported for the smaller slot.
        let on_next_bank = tracker.update_on_leader_slot_boundary(&Some(next_bank_start));
        assert_eq!(on_next_bank.unwrap(), first_bank.slot());

        // When the bigger slot ends as well, it is reported and tracking stops.
        let on_slot_end = tracker.update_on_leader_slot_boundary(&None);
        assert_eq!(on_slot_end.unwrap(), next_bank.slot());
        assert!(tracker.leader_slot_metrics.is_none());
    }

    #[test]
    pub fn test_update_on_leader_slot_boundary_leader_to_leader_smaller_slot() {
        let TestSlotBoundaryComponents {
            first_bank,
            first_poh_recorder_bank: first_bank_start,
            next_bank,
            next_poh_recorder_bank: next_bank_start,
            leader_slot_metrics_tracker: mut tracker,
        } = setup_test_slot_boundary_banks();

        // Begin with the bigger slot; nothing is reported yet.
        let on_next_bank = tracker.update_on_leader_slot_boundary(&Some(next_bank_start));
        assert!(on_next_bank.is_none());

        // A leader bank for a smaller slot shows up, implying the previous slot has ended:
        // metrics should be reported for the bigger slot.
        let on_first_bank = tracker.update_on_leader_slot_boundary(&Some(first_bank_start));
        assert_eq!(on_first_bank.unwrap(), next_bank.slot());

        // When the smaller slot ends as well, it is reported and tracking stops.
        let on_slot_end = tracker.update_on_leader_slot_boundary(&None);
        assert_eq!(on_slot_end.unwrap(), first_bank.slot());
        assert!(tracker.leader_slot_metrics.is_none());
    }
}

View File

@ -26,6 +26,7 @@ pub mod fork_choice;
pub mod gen_keys;
pub mod heaviest_subtree_fork_choice;
pub mod latest_validator_votes_for_frozen_banks;
pub mod leader_slot_banking_stage_metrics;
pub mod ledger_cleanup_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;