* collect stats on packet batch indicies
* cleanup
* cleanup
* cleanup
* change name
(cherry picked from commit 650882217c
)
# Conflicts:
# core/src/banking_stage.rs
Co-authored-by: buffalu <85544055+buffalu@users.noreply.github.com>
This commit is contained in:
@ -4,6 +4,7 @@
|
||||
use {
|
||||
crate::{packet_hasher::PacketHasher, qos_service::QosService},
|
||||
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
|
||||
histogram::Histogram,
|
||||
itertools::Itertools,
|
||||
lru::LruCache,
|
||||
retain_mut::RetainMut,
|
||||
@ -100,6 +101,7 @@ pub struct BankingStageStats {
|
||||
current_buffered_packet_batches_count: AtomicUsize,
|
||||
rebuffered_packets_count: AtomicUsize,
|
||||
consumed_buffered_packets_count: AtomicUsize,
|
||||
batch_packet_indexes_len: Histogram,
|
||||
|
||||
// Timing
|
||||
consume_buffered_packets_elapsed: AtomicU64,
|
||||
@ -116,6 +118,10 @@ impl BankingStageStats {
|
||||
pub fn new(id: u32) -> Self {
|
||||
BankingStageStats {
|
||||
id,
|
||||
batch_packet_indexes_len: Histogram::configure()
|
||||
.max_value(PACKETS_PER_BATCH as u64)
|
||||
.build()
|
||||
.unwrap(),
|
||||
..BankingStageStats::default()
|
||||
}
|
||||
}
|
||||
@ -149,9 +155,10 @@ impl BankingStageStats {
|
||||
.unprocessed_packet_conversion_elapsed
|
||||
.load(Ordering::Relaxed)
|
||||
+ self.transaction_processing_elapsed.load(Ordering::Relaxed)
|
||||
+ self.batch_packet_indexes_len.entries()
|
||||
}
|
||||
|
||||
fn report(&self, report_interval_ms: u64) {
|
||||
fn report(&mut self, report_interval_ms: u64) {
|
||||
// skip repoting metrics if stats is empty
|
||||
if self.is_empty() {
|
||||
return;
|
||||
@ -260,7 +267,28 @@ impl BankingStageStats {
|
||||
.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"packet_batch_indices_len_min",
|
||||
self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"packet_batch_indices_len_max",
|
||||
self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"packet_batch_indices_len_mean",
|
||||
self.batch_packet_indexes_len.mean().unwrap_or(0) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"packet_batch_indices_len_90pct",
|
||||
self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64,
|
||||
i64
|
||||
)
|
||||
);
|
||||
self.batch_packet_indexes_len.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -731,7 +759,7 @@ impl BankingStage {
|
||||
let recorder = poh_recorder.lock().unwrap().recorder();
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
|
||||
let banking_stage_stats = BankingStageStats::new(id);
|
||||
let mut banking_stage_stats = BankingStageStats::new(id);
|
||||
loop {
|
||||
let my_pubkey = cluster_info.id();
|
||||
while !buffered_packet_batches.is_empty() {
|
||||
@ -779,7 +807,7 @@ impl BankingStage {
|
||||
transaction_status_sender.clone(),
|
||||
&gossip_vote_sender,
|
||||
&mut buffered_packet_batches,
|
||||
&banking_stage_stats,
|
||||
&mut banking_stage_stats,
|
||||
duplicates,
|
||||
&recorder,
|
||||
&qos_service,
|
||||
@ -1331,7 +1359,7 @@ impl BankingStage {
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
banking_stage_stats: &mut BankingStageStats,
|
||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||
recorder: &TransactionRecorder,
|
||||
qos_service: &Arc<QosService>,
|
||||
@ -1497,7 +1525,7 @@ impl BankingStage {
|
||||
newly_buffered_packets_count: &mut usize,
|
||||
batch_limit: usize,
|
||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
banking_stage_stats: &mut BankingStageStats,
|
||||
) {
|
||||
{
|
||||
let original_packets_count = packet_indexes.len();
|
||||
@ -1532,6 +1560,10 @@ impl BankingStage {
|
||||
*dropped_packets_count += dropped_batch.1.len();
|
||||
}
|
||||
}
|
||||
let _ = banking_stage_stats
|
||||
.batch_packet_indexes_len
|
||||
.increment(packet_indexes.len() as u64);
|
||||
|
||||
*newly_buffered_packets_count += packet_indexes.len();
|
||||
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
|
||||
}
|
||||
@ -3101,7 +3133,7 @@ mod tests {
|
||||
let mut dropped_packet_batches_count = 0;
|
||||
let mut dropped_packets_count = 0;
|
||||
let mut newly_buffered_packets_count = 0;
|
||||
let banking_stage_stats = BankingStageStats::default();
|
||||
let mut banking_stage_stats = BankingStageStats::default();
|
||||
// Because the set of unprocessed `packet_indexes` is empty, the
|
||||
// packets are not added to the unprocessed queue
|
||||
BankingStage::push_unprocessed(
|
||||
@ -3113,7 +3145,7 @@ mod tests {
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
&duplicates,
|
||||
&banking_stage_stats,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 1);
|
||||
assert_eq!(dropped_packet_batches_count, 0);
|
||||
@ -3132,7 +3164,7 @@ mod tests {
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
&duplicates,
|
||||
&banking_stage_stats,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
assert_eq!(dropped_packet_batches_count, 0);
|
||||
@ -3156,7 +3188,7 @@ mod tests {
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
&duplicates,
|
||||
&banking_stage_stats,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
assert_eq!(
|
||||
@ -3177,7 +3209,7 @@ mod tests {
|
||||
&mut newly_buffered_packets_count,
|
||||
3,
|
||||
&duplicates,
|
||||
&banking_stage_stats,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
assert_eq!(
|
||||
|
Reference in New Issue
Block a user