* report pubsub stats every 2s (#21192)
(cherry picked from commit 7659a2edc2
)
* remove use of Duration::MAX
Co-authored-by: Jeff Biseda <jbiseda@gmail.com>
This commit is contained in:
@@ -437,6 +437,40 @@ fn initial_last_notified_slot(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct PubsubNotificationStats {
|
||||||
|
since: Option<Instant>,
|
||||||
|
notification_entry_processing_count: u64,
|
||||||
|
notification_entry_processing_time_us: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PubsubNotificationStats {
|
||||||
|
fn maybe_submit(&mut self) {
|
||||||
|
const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
|
||||||
|
let elapsed = self.since.as_ref().map(Instant::elapsed);
|
||||||
|
if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
datapoint_info!(
|
||||||
|
"pubsub_notification_entries",
|
||||||
|
(
|
||||||
|
"notification_entry_processing_count",
|
||||||
|
self.notification_entry_processing_count,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"notification_entry_processing_time_us",
|
||||||
|
self.notification_entry_processing_time_us,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
);
|
||||||
|
*self = Self {
|
||||||
|
since: Some(Instant::now()),
|
||||||
|
..Self::default()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct RpcSubscriptions {
|
pub struct RpcSubscriptions {
|
||||||
notification_sender: Sender<TimestampedNotificationEntry>,
|
notification_sender: Sender<TimestampedNotificationEntry>,
|
||||||
t_cleanup: Option<JoinHandle<()>>,
|
t_cleanup: Option<JoinHandle<()>>,
|
||||||
@@ -624,6 +658,8 @@ impl RpcSubscriptions {
|
|||||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||||
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
||||||
) {
|
) {
|
||||||
|
let mut stats = PubsubNotificationStats::default();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
@@ -744,14 +780,9 @@ impl RpcSubscriptions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
datapoint_info!(
|
stats.notification_entry_processing_time_us +=
|
||||||
"pubsub_notification_entries",
|
queued_at.elapsed().as_micros() as u64;
|
||||||
(
|
stats.notification_entry_processing_count += 1;
|
||||||
"notification_entry_processing_time_us",
|
|
||||||
queued_at.elapsed().as_micros() as i64,
|
|
||||||
i64
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
Err(RecvTimeoutError::Timeout) => {
|
Err(RecvTimeoutError::Timeout) => {
|
||||||
// not a problem - try reading again
|
// not a problem - try reading again
|
||||||
@@ -761,6 +792,7 @@ impl RpcSubscriptions {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
stats.maybe_submit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user