Add banking metrics for buffered and dropped packets (#19902) (#19926)

(cherry picked from commit ca3f147670)

Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
mergify[bot]
2021-09-16 00:39:02 +00:00
committed by GitHub
parent 0c6a133b63
commit ccef24c44e

View File

@ -91,9 +91,11 @@ pub struct BankingStageStats {
id: u32, id: u32,
process_packets_count: AtomicUsize, process_packets_count: AtomicUsize,
new_tx_count: AtomicUsize, new_tx_count: AtomicUsize,
dropped_batches_count: AtomicUsize, dropped_packet_batches_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize,
current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize,
@ -143,8 +145,13 @@ impl BankingStageStats {
i64 i64
), ),
( (
"dropped_batches_count", "dropped_packet_batches_count",
self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64, self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"dropped_packets_count",
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
( (
@ -152,6 +159,12 @@ impl BankingStageStats {
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"current_buffered_packet_batches_count",
self.current_buffered_packet_batches_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"current_buffered_packets_count", "current_buffered_packets_count",
self.current_buffered_packets_count self.current_buffered_packets_count
@ -1225,7 +1238,8 @@ impl BankingStage {
let mut new_tx_count = 0; let mut new_tx_count = 0;
let mut mms_iter = mms.into_iter(); let mut mms_iter = mms.into_iter();
let mut dropped_batches_count = 0; let mut dropped_packets_count = 0;
let mut dropped_packet_batches_count = 0;
let mut newly_buffered_packets_count = 0; let mut newly_buffered_packets_count = 0;
while let Some(msgs) = mms_iter.next() { while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
@ -1235,7 +1249,8 @@ impl BankingStage {
buffered_packets, buffered_packets,
msgs, msgs,
packet_indexes, packet_indexes,
&mut dropped_batches_count, &mut dropped_packets_count,
&mut dropped_packet_batches_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
duplicates, duplicates,
@ -1264,7 +1279,8 @@ impl BankingStage {
buffered_packets, buffered_packets,
msgs, msgs,
unprocessed_indexes, unprocessed_indexes,
&mut dropped_batches_count, &mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
duplicates, duplicates,
@ -1290,7 +1306,8 @@ impl BankingStage {
buffered_packets, buffered_packets,
msgs, msgs,
unprocessed_indexes, unprocessed_indexes,
&mut dropped_batches_count, &mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
duplicates, duplicates,
@ -1325,14 +1342,18 @@ impl BankingStage {
.new_tx_count .new_tx_count
.fetch_add(new_tx_count, Ordering::Relaxed); .fetch_add(new_tx_count, Ordering::Relaxed);
banking_stage_stats banking_stage_stats
.dropped_batches_count .dropped_packet_batches_count
.fetch_add(dropped_batches_count, Ordering::Relaxed); .fetch_add(dropped_packet_batches_count, Ordering::Relaxed);
banking_stage_stats banking_stage_stats
.newly_buffered_packets_count .newly_buffered_packets_count
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed); .fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
banking_stage_stats banking_stage_stats
.current_buffered_packets_count .current_buffered_packet_batches_count
.swap(buffered_packets.len(), Ordering::Relaxed); .swap(buffered_packets.len(), Ordering::Relaxed);
banking_stage_stats.current_buffered_packets_count.swap(
buffered_packets.iter().map(|packets| packets.1.len()).sum(),
Ordering::Relaxed,
);
*recv_start = Instant::now(); *recv_start = Instant::now();
Ok(()) Ok(())
} }
@ -1341,7 +1362,8 @@ impl BankingStage {
unprocessed_packets: &mut UnprocessedPackets, unprocessed_packets: &mut UnprocessedPackets,
packets: Packets, packets: Packets,
mut packet_indexes: Vec<usize>, mut packet_indexes: Vec<usize>,
dropped_batches_count: &mut usize, dropped_packet_batches_count: &mut usize,
dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize, newly_buffered_packets_count: &mut usize,
batch_limit: usize, batch_limit: usize,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>, duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
@ -1368,8 +1390,10 @@ impl BankingStage {
} }
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packets.len() >= batch_limit { if unprocessed_packets.len() >= batch_limit {
*dropped_batches_count += 1; *dropped_packet_batches_count += 1;
unprocessed_packets.pop_front(); if let Some(dropped_batch) = unprocessed_packets.pop_front() {
*dropped_packets_count += dropped_batch.1.len();
}
} }
*newly_buffered_packets_count += packet_indexes.len(); *newly_buffered_packets_count += packet_indexes.len();
unprocessed_packets.push_back((packets, packet_indexes, false)); unprocessed_packets.push_back((packets, packet_indexes, false));
@ -2772,23 +2796,22 @@ mod tests {
#[test] #[test]
fn test_push_unprocessed_batch_limit() { fn test_push_unprocessed_batch_limit() {
solana_logger::setup(); solana_logger::setup();
// Create `Packets` with 1 unprocessed element // Create `Packets` with 2 unprocessed elements
let single_element_packets = Packets::new(vec![Packet::default()]); let new_packets = Packets::new(vec![Packet::default(); 2]);
let mut unprocessed_packets: UnprocessedPackets = let mut unprocessed_packets: UnprocessedPackets =
vec![(single_element_packets.clone(), vec![0], false)] vec![(new_packets, vec![0, 1], false)].into_iter().collect();
.into_iter()
.collect();
// Set the limit to 2 // Set the limit to 2
let batch_limit = 2; let batch_limit = 2;
// Create some new unprocessed packets // Create some new unprocessed packets
let new_packets = single_element_packets; let new_packets = Packets::new(vec![Packet::default()]);
let packet_indexes = vec![]; let packet_indexes = vec![];
let duplicates = Arc::new(Mutex::new(( let duplicates = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE), LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(), PacketHasher::default(),
))); )));
let mut dropped_batches_count = 0; let mut dropped_packet_batches_count = 0;
let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0; let mut newly_buffered_packets_count = 0;
let banking_stage_stats = BankingStageStats::default(); let banking_stage_stats = BankingStageStats::default();
// Because the set of unprocessed `packet_indexes` is empty, the // Because the set of unprocessed `packet_indexes` is empty, the
@ -2797,14 +2820,16 @@ mod tests {
&mut unprocessed_packets, &mut unprocessed_packets,
new_packets.clone(), new_packets.clone(),
packet_indexes, packet_indexes,
&mut dropped_batches_count, &mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
&duplicates, &duplicates,
&banking_stage_stats, &banking_stage_stats,
); );
assert_eq!(unprocessed_packets.len(), 1); assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_batches_count, 0); assert_eq!(dropped_packet_batches_count, 0);
assert_eq!(dropped_packets_count, 0);
assert_eq!(newly_buffered_packets_count, 0); assert_eq!(newly_buffered_packets_count, 0);
// Because the set of unprocessed `packet_indexes` is non-empty, the // Because the set of unprocessed `packet_indexes` is non-empty, the
@ -2814,14 +2839,16 @@ mod tests {
&mut unprocessed_packets, &mut unprocessed_packets,
new_packets, new_packets,
packet_indexes.clone(), packet_indexes.clone(),
&mut dropped_batches_count, &mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
&duplicates, &duplicates,
&banking_stage_stats, &banking_stage_stats,
); );
assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_batches_count, 0); assert_eq!(dropped_packet_batches_count, 0);
assert_eq!(dropped_packets_count, 0);
assert_eq!(newly_buffered_packets_count, 1); assert_eq!(newly_buffered_packets_count, 1);
// Because we've reached the batch limit, old unprocessed packets are // Because we've reached the batch limit, old unprocessed packets are
@ -2836,7 +2863,8 @@ mod tests {
&mut unprocessed_packets, &mut unprocessed_packets,
new_packets.clone(), new_packets.clone(),
packet_indexes.clone(), packet_indexes.clone(),
&mut dropped_batches_count, &mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
&duplicates, &duplicates,
@ -2844,15 +2872,17 @@ mod tests {
); );
assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1); assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(dropped_packets_count, 2);
assert_eq!(newly_buffered_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2);
// Check duplicates are dropped // Check duplicates are dropped (newly buffered shouldn't change)
BankingStage::push_unprocessed( BankingStage::push_unprocessed(
&mut unprocessed_packets, &mut unprocessed_packets,
new_packets.clone(), new_packets.clone(),
packet_indexes, packet_indexes,
&mut dropped_batches_count, &mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
3, 3,
&duplicates, &duplicates,
@ -2860,7 +2890,8 @@ mod tests {
); );
assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1); assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(dropped_packets_count, 2);
assert_eq!(newly_buffered_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2);
} }