Add counter for dropped duplicated packets, fix dropped_packets_count (#20834)
This commit is contained in:
@ -92,6 +92,7 @@ pub struct BankingStageStats {
|
|||||||
new_tx_count: AtomicUsize,
|
new_tx_count: AtomicUsize,
|
||||||
dropped_packet_batches_count: AtomicUsize,
|
dropped_packet_batches_count: AtomicUsize,
|
||||||
dropped_packets_count: AtomicUsize,
|
dropped_packets_count: AtomicUsize,
|
||||||
|
dropped_duplicated_packets_count: AtomicUsize,
|
||||||
newly_buffered_packets_count: AtomicUsize,
|
newly_buffered_packets_count: AtomicUsize,
|
||||||
current_buffered_packets_count: AtomicUsize,
|
current_buffered_packets_count: AtomicUsize,
|
||||||
current_buffered_packet_batches_count: AtomicUsize,
|
current_buffered_packet_batches_count: AtomicUsize,
|
||||||
@ -147,6 +148,12 @@ impl BankingStageStats {
|
|||||||
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
|
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"dropped_duplicated_packets_count",
|
||||||
|
self.dropped_duplicated_packets_count
|
||||||
|
.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"newly_buffered_packets_count",
|
"newly_buffered_packets_count",
|
||||||
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||||
@ -1444,8 +1451,8 @@ impl BankingStage {
|
|||||||
buffered_packets,
|
buffered_packets,
|
||||||
msgs,
|
msgs,
|
||||||
packet_indexes,
|
packet_indexes,
|
||||||
&mut dropped_packets_count,
|
|
||||||
&mut dropped_packet_batches_count,
|
&mut dropped_packet_batches_count,
|
||||||
|
&mut dropped_packets_count,
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
duplicates,
|
duplicates,
|
||||||
@ -1549,6 +1556,9 @@ impl BankingStage {
|
|||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.dropped_packet_batches_count
|
.dropped_packet_batches_count
|
||||||
.fetch_add(dropped_packet_batches_count, Ordering::Relaxed);
|
.fetch_add(dropped_packet_batches_count, Ordering::Relaxed);
|
||||||
|
banking_stage_stats
|
||||||
|
.dropped_packets_count
|
||||||
|
.fetch_add(dropped_packets_count, Ordering::Relaxed);
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.newly_buffered_packets_count
|
.newly_buffered_packets_count
|
||||||
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
|
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
|
||||||
@ -1575,6 +1585,7 @@ impl BankingStage {
|
|||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
) {
|
) {
|
||||||
{
|
{
|
||||||
|
let original_packets_count = packet_indexes.len();
|
||||||
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
|
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
|
||||||
let mut duplicates = duplicates.lock().unwrap();
|
let mut duplicates = duplicates.lock().unwrap();
|
||||||
let (cache, hasher) = duplicates.deref_mut();
|
let (cache, hasher) = duplicates.deref_mut();
|
||||||
@ -1592,6 +1603,12 @@ impl BankingStage {
|
|||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.packet_duplicate_check_elapsed
|
.packet_duplicate_check_elapsed
|
||||||
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
|
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
|
||||||
|
banking_stage_stats
|
||||||
|
.dropped_duplicated_packets_count
|
||||||
|
.fetch_add(
|
||||||
|
original_packets_count.saturating_sub(packet_indexes.len()),
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
||||||
if unprocessed_packets.len() >= batch_limit {
|
if unprocessed_packets.len() >= batch_limit {
|
||||||
|
Reference in New Issue
Block a user