(cherry picked from commit ca3f147670
)
Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
@ -82,9 +82,11 @@ pub struct BankingStageStats {
|
|||||||
id: u32,
|
id: u32,
|
||||||
process_packets_count: AtomicUsize,
|
process_packets_count: AtomicUsize,
|
||||||
new_tx_count: AtomicUsize,
|
new_tx_count: AtomicUsize,
|
||||||
dropped_batches_count: AtomicUsize,
|
dropped_packet_batches_count: AtomicUsize,
|
||||||
|
dropped_packets_count: AtomicUsize,
|
||||||
newly_buffered_packets_count: AtomicUsize,
|
newly_buffered_packets_count: AtomicUsize,
|
||||||
current_buffered_packets_count: AtomicUsize,
|
current_buffered_packets_count: AtomicUsize,
|
||||||
|
current_buffered_packet_batches_count: AtomicUsize,
|
||||||
rebuffered_packets_count: AtomicUsize,
|
rebuffered_packets_count: AtomicUsize,
|
||||||
consumed_buffered_packets_count: AtomicUsize,
|
consumed_buffered_packets_count: AtomicUsize,
|
||||||
|
|
||||||
@ -122,8 +124,13 @@ impl BankingStageStats {
|
|||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"dropped_batches_count",
|
"dropped_packet_batches_count",
|
||||||
self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64,
|
self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"dropped_packets_count",
|
||||||
|
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
@ -131,6 +138,12 @@ impl BankingStageStats {
|
|||||||
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"current_buffered_packet_batches_count",
|
||||||
|
self.current_buffered_packet_batches_count
|
||||||
|
.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"current_buffered_packets_count",
|
"current_buffered_packets_count",
|
||||||
self.current_buffered_packets_count
|
self.current_buffered_packets_count
|
||||||
@ -1178,7 +1191,8 @@ impl BankingStage {
|
|||||||
let mut new_tx_count = 0;
|
let mut new_tx_count = 0;
|
||||||
|
|
||||||
let mut mms_iter = mms.into_iter();
|
let mut mms_iter = mms.into_iter();
|
||||||
let mut dropped_batches_count = 0;
|
let mut dropped_packets_count = 0;
|
||||||
|
let mut dropped_packet_batches_count = 0;
|
||||||
let mut newly_buffered_packets_count = 0;
|
let mut newly_buffered_packets_count = 0;
|
||||||
while let Some(msgs) = mms_iter.next() {
|
while let Some(msgs) = mms_iter.next() {
|
||||||
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
|
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
|
||||||
@ -1188,7 +1202,8 @@ impl BankingStage {
|
|||||||
buffered_packets,
|
buffered_packets,
|
||||||
msgs,
|
msgs,
|
||||||
packet_indexes,
|
packet_indexes,
|
||||||
&mut dropped_batches_count,
|
&mut dropped_packets_count,
|
||||||
|
&mut dropped_packet_batches_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
duplicates,
|
duplicates,
|
||||||
@ -1217,7 +1232,8 @@ impl BankingStage {
|
|||||||
buffered_packets,
|
buffered_packets,
|
||||||
msgs,
|
msgs,
|
||||||
unprocessed_indexes,
|
unprocessed_indexes,
|
||||||
&mut dropped_batches_count,
|
&mut dropped_packet_batches_count,
|
||||||
|
&mut dropped_packets_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
duplicates,
|
duplicates,
|
||||||
@ -1243,7 +1259,8 @@ impl BankingStage {
|
|||||||
buffered_packets,
|
buffered_packets,
|
||||||
msgs,
|
msgs,
|
||||||
unprocessed_indexes,
|
unprocessed_indexes,
|
||||||
&mut dropped_batches_count,
|
&mut dropped_packet_batches_count,
|
||||||
|
&mut dropped_packets_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
duplicates,
|
duplicates,
|
||||||
@ -1278,14 +1295,18 @@ impl BankingStage {
|
|||||||
.new_tx_count
|
.new_tx_count
|
||||||
.fetch_add(new_tx_count, Ordering::Relaxed);
|
.fetch_add(new_tx_count, Ordering::Relaxed);
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.dropped_batches_count
|
.dropped_packet_batches_count
|
||||||
.fetch_add(dropped_batches_count, Ordering::Relaxed);
|
.fetch_add(dropped_packet_batches_count, Ordering::Relaxed);
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.newly_buffered_packets_count
|
.newly_buffered_packets_count
|
||||||
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
|
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.current_buffered_packets_count
|
.current_buffered_packet_batches_count
|
||||||
.swap(buffered_packets.len(), Ordering::Relaxed);
|
.swap(buffered_packets.len(), Ordering::Relaxed);
|
||||||
|
banking_stage_stats.current_buffered_packets_count.swap(
|
||||||
|
buffered_packets.iter().map(|packets| packets.1.len()).sum(),
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
*recv_start = Instant::now();
|
*recv_start = Instant::now();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -1294,7 +1315,8 @@ impl BankingStage {
|
|||||||
unprocessed_packets: &mut UnprocessedPackets,
|
unprocessed_packets: &mut UnprocessedPackets,
|
||||||
packets: Packets,
|
packets: Packets,
|
||||||
mut packet_indexes: Vec<usize>,
|
mut packet_indexes: Vec<usize>,
|
||||||
dropped_batches_count: &mut usize,
|
dropped_packet_batches_count: &mut usize,
|
||||||
|
dropped_packets_count: &mut usize,
|
||||||
newly_buffered_packets_count: &mut usize,
|
newly_buffered_packets_count: &mut usize,
|
||||||
batch_limit: usize,
|
batch_limit: usize,
|
||||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||||
@ -1321,8 +1343,10 @@ impl BankingStage {
|
|||||||
}
|
}
|
||||||
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
||||||
if unprocessed_packets.len() >= batch_limit {
|
if unprocessed_packets.len() >= batch_limit {
|
||||||
*dropped_batches_count += 1;
|
*dropped_packet_batches_count += 1;
|
||||||
unprocessed_packets.pop_front();
|
if let Some(dropped_batch) = unprocessed_packets.pop_front() {
|
||||||
|
*dropped_packets_count += dropped_batch.1.len();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
*newly_buffered_packets_count += packet_indexes.len();
|
*newly_buffered_packets_count += packet_indexes.len();
|
||||||
unprocessed_packets.push_back((packets, packet_indexes, false));
|
unprocessed_packets.push_back((packets, packet_indexes, false));
|
||||||
@ -2651,23 +2675,22 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_push_unprocessed_batch_limit() {
|
fn test_push_unprocessed_batch_limit() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
// Create `Packets` with 1 unprocessed element
|
// Create `Packets` with 2 unprocessed elements
|
||||||
let single_element_packets = Packets::new(vec![Packet::default()]);
|
let new_packets = Packets::new(vec![Packet::default(); 2]);
|
||||||
let mut unprocessed_packets: UnprocessedPackets =
|
let mut unprocessed_packets: UnprocessedPackets =
|
||||||
vec![(single_element_packets.clone(), vec![0], false)]
|
vec![(new_packets, vec![0, 1], false)].into_iter().collect();
|
||||||
.into_iter()
|
|
||||||
.collect();
|
|
||||||
// Set the limit to 2
|
// Set the limit to 2
|
||||||
let batch_limit = 2;
|
let batch_limit = 2;
|
||||||
// Create some new unprocessed packets
|
// Create some new unprocessed packets
|
||||||
let new_packets = single_element_packets;
|
let new_packets = Packets::new(vec![Packet::default()]);
|
||||||
let packet_indexes = vec![];
|
let packet_indexes = vec![];
|
||||||
|
|
||||||
let duplicates = Arc::new(Mutex::new((
|
let duplicates = Arc::new(Mutex::new((
|
||||||
LruCache::new(DEFAULT_LRU_SIZE),
|
LruCache::new(DEFAULT_LRU_SIZE),
|
||||||
PacketHasher::default(),
|
PacketHasher::default(),
|
||||||
)));
|
)));
|
||||||
let mut dropped_batches_count = 0;
|
let mut dropped_packet_batches_count = 0;
|
||||||
|
let mut dropped_packets_count = 0;
|
||||||
let mut newly_buffered_packets_count = 0;
|
let mut newly_buffered_packets_count = 0;
|
||||||
let banking_stage_stats = BankingStageStats::default();
|
let banking_stage_stats = BankingStageStats::default();
|
||||||
// Because the set of unprocessed `packet_indexes` is empty, the
|
// Because the set of unprocessed `packet_indexes` is empty, the
|
||||||
@ -2676,14 +2699,16 @@ mod tests {
|
|||||||
&mut unprocessed_packets,
|
&mut unprocessed_packets,
|
||||||
new_packets.clone(),
|
new_packets.clone(),
|
||||||
packet_indexes,
|
packet_indexes,
|
||||||
&mut dropped_batches_count,
|
&mut dropped_packet_batches_count,
|
||||||
|
&mut dropped_packets_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
&duplicates,
|
&duplicates,
|
||||||
&banking_stage_stats,
|
&banking_stage_stats,
|
||||||
);
|
);
|
||||||
assert_eq!(unprocessed_packets.len(), 1);
|
assert_eq!(unprocessed_packets.len(), 1);
|
||||||
assert_eq!(dropped_batches_count, 0);
|
assert_eq!(dropped_packet_batches_count, 0);
|
||||||
|
assert_eq!(dropped_packets_count, 0);
|
||||||
assert_eq!(newly_buffered_packets_count, 0);
|
assert_eq!(newly_buffered_packets_count, 0);
|
||||||
|
|
||||||
// Because the set of unprocessed `packet_indexes` is non-empty, the
|
// Because the set of unprocessed `packet_indexes` is non-empty, the
|
||||||
@ -2693,14 +2718,16 @@ mod tests {
|
|||||||
&mut unprocessed_packets,
|
&mut unprocessed_packets,
|
||||||
new_packets,
|
new_packets,
|
||||||
packet_indexes.clone(),
|
packet_indexes.clone(),
|
||||||
&mut dropped_batches_count,
|
&mut dropped_packet_batches_count,
|
||||||
|
&mut dropped_packets_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
&duplicates,
|
&duplicates,
|
||||||
&banking_stage_stats,
|
&banking_stage_stats,
|
||||||
);
|
);
|
||||||
assert_eq!(unprocessed_packets.len(), 2);
|
assert_eq!(unprocessed_packets.len(), 2);
|
||||||
assert_eq!(dropped_batches_count, 0);
|
assert_eq!(dropped_packet_batches_count, 0);
|
||||||
|
assert_eq!(dropped_packets_count, 0);
|
||||||
assert_eq!(newly_buffered_packets_count, 1);
|
assert_eq!(newly_buffered_packets_count, 1);
|
||||||
|
|
||||||
// Because we've reached the batch limit, old unprocessed packets are
|
// Because we've reached the batch limit, old unprocessed packets are
|
||||||
@ -2715,7 +2742,8 @@ mod tests {
|
|||||||
&mut unprocessed_packets,
|
&mut unprocessed_packets,
|
||||||
new_packets.clone(),
|
new_packets.clone(),
|
||||||
packet_indexes.clone(),
|
packet_indexes.clone(),
|
||||||
&mut dropped_batches_count,
|
&mut dropped_packet_batches_count,
|
||||||
|
&mut dropped_packets_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
&duplicates,
|
&duplicates,
|
||||||
@ -2723,15 +2751,17 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(unprocessed_packets.len(), 2);
|
assert_eq!(unprocessed_packets.len(), 2);
|
||||||
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
||||||
assert_eq!(dropped_batches_count, 1);
|
assert_eq!(dropped_packet_batches_count, 1);
|
||||||
|
assert_eq!(dropped_packets_count, 2);
|
||||||
assert_eq!(newly_buffered_packets_count, 2);
|
assert_eq!(newly_buffered_packets_count, 2);
|
||||||
|
|
||||||
// Check duplicates are dropped
|
// Check duplicates are dropped (newly buffered shouldn't change)
|
||||||
BankingStage::push_unprocessed(
|
BankingStage::push_unprocessed(
|
||||||
&mut unprocessed_packets,
|
&mut unprocessed_packets,
|
||||||
new_packets.clone(),
|
new_packets.clone(),
|
||||||
packet_indexes,
|
packet_indexes,
|
||||||
&mut dropped_batches_count,
|
&mut dropped_packet_batches_count,
|
||||||
|
&mut dropped_packets_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
3,
|
3,
|
||||||
&duplicates,
|
&duplicates,
|
||||||
@ -2739,7 +2769,8 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(unprocessed_packets.len(), 2);
|
assert_eq!(unprocessed_packets.len(), 2);
|
||||||
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
||||||
assert_eq!(dropped_batches_count, 1);
|
assert_eq!(dropped_packet_batches_count, 1);
|
||||||
|
assert_eq!(dropped_packets_count, 2);
|
||||||
assert_eq!(newly_buffered_packets_count, 2);
|
assert_eq!(newly_buffered_packets_count, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user