Add PacketBatch packet_indexes stat (#22564) (#22574)

* collect stats on packet batch indicies

* cleanup

* cleanup

* cleanup

* change name

(cherry picked from commit 650882217c)

# Conflicts:
#	core/src/banking_stage.rs

Co-authored-by: buffalu <85544055+buffalu@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2022-01-20 00:48:18 +00:00
committed by GitHub
parent 78b82dedb1
commit 99846eea12

View File

@ -4,6 +4,7 @@
use {
crate::packet_hasher::PacketHasher,
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
histogram::Histogram,
itertools::Itertools,
lru::LruCache,
retain_mut::RetainMut,
@ -100,6 +101,7 @@ pub struct BankingStageStats {
consumed_buffered_packets_count: AtomicUsize,
cost_tracker_check_count: AtomicUsize,
cost_forced_retry_transactions_count: AtomicUsize,
batch_packet_indexes_len: Histogram,
// Timing
consume_buffered_packets_elapsed: AtomicU64,
@ -119,11 +121,15 @@ impl BankingStageStats {
pub fn new(id: u32) -> Self {
BankingStageStats {
id,
batch_packet_indexes_len: Histogram::configure()
.max_value(PACKETS_PER_BATCH as u64)
.build()
.unwrap(),
..BankingStageStats::default()
}
}
fn report(&self, report_interval_ms: u64) {
fn report(&mut self, report_interval_ms: u64) {
if self.last_report.should_update(report_interval_ms) {
datapoint_info!(
"banking_stage-loop-stats",
@ -254,7 +260,28 @@ impl BankingStageStats {
self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"packet_batch_indices_len_min",
self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64,
i64
),
(
"packet_batch_indices_len_max",
self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64,
i64
),
(
"packet_batch_indices_len_mean",
self.batch_packet_indexes_len.mean().unwrap_or(0) as i64,
i64
),
(
"packet_batch_indices_len_90pct",
self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64,
i64
),
);
self.batch_packet_indexes_len.clear();
}
}
}
@ -723,7 +750,7 @@ impl BankingStage {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
let banking_stage_stats = BankingStageStats::new(id);
let mut banking_stage_stats = BankingStageStats::new(id);
loop {
while !buffered_packet_batches.is_empty() {
let decision = Self::process_buffered_packets(
@ -770,7 +797,7 @@ impl BankingStage {
transaction_status_sender.clone(),
&gossip_vote_sender,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut banking_stage_stats,
duplicates,
&recorder,
&cost_model,
@ -1428,7 +1455,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
buffered_packet_batches: &mut UnprocessedPacketBatches,
banking_stage_stats: &BankingStageStats,
banking_stage_stats: &mut BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
@ -1589,7 +1616,7 @@ impl BankingStage {
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
banking_stage_stats: &BankingStageStats,
banking_stage_stats: &mut BankingStageStats,
) {
{
let original_packets_count = packet_indexes.len();
@ -1624,6 +1651,10 @@ impl BankingStage {
*dropped_packets_count += dropped_batch.1.len();
}
}
let _ = banking_stage_stats
.batch_packet_indexes_len
.increment(packet_indexes.len() as u64);
*newly_buffered_packets_count += packet_indexes.len();
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
}
@ -3197,7 +3228,7 @@ mod tests {
let mut dropped_packet_batches_count = 0;
let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0;
let banking_stage_stats = BankingStageStats::default();
let mut banking_stage_stats = BankingStageStats::default();
// Because the set of unprocessed `packet_indexes` is empty, the
// packets are not added to the unprocessed queue
BankingStage::push_unprocessed(
@ -3209,7 +3240,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_packet_batches_count, 0);
@ -3228,7 +3259,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_packet_batches_count, 0);
@ -3252,7 +3283,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(
@ -3273,7 +3304,7 @@ mod tests {
&mut newly_buffered_packets_count,
3,
&duplicates,
&banking_stage_stats,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(