From a7b3436b1e02f175e5ed564e49a3fe34323e02e1 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sat, 6 Nov 2021 15:22:07 +0000 Subject: [PATCH] report pubsub stats every 2s (backport #21192) (#21196) * report pubsub stats every 2s (#21192) (cherry picked from commit 7659a2edc2901bc32246e79a61db0cce0ba0f80e) * remove use of Duration::MAX Co-authored-by: Jeff Biseda --- rpc/src/rpc_subscriptions.rs | 48 ++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index c12c58fdcf..a6ba0942e8 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -437,6 +437,40 @@ fn initial_last_notified_slot( } } +#[derive(Default)] +struct PubsubNotificationStats { + since: Option, + notification_entry_processing_count: u64, + notification_entry_processing_time_us: u64, +} + +impl PubsubNotificationStats { + fn maybe_submit(&mut self) { + const SUBMIT_CADENCE: Duration = Duration::from_secs(2); + let elapsed = self.since.as_ref().map(Instant::elapsed); + if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default() { + return; + } + datapoint_info!( + "pubsub_notification_entries", + ( + "notification_entry_processing_count", + self.notification_entry_processing_count, + i64 + ), + ( + "notification_entry_processing_time_us", + self.notification_entry_processing_time_us, + i64 + ), + ); + *self = Self { + since: Some(Instant::now()), + ..Self::default() + }; + } +} + pub struct RpcSubscriptions { notification_sender: Sender, t_cleanup: Option>, @@ -624,6 +658,8 @@ impl RpcSubscriptions { block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, ) { + let mut stats = PubsubNotificationStats::default(); + loop { if exit.load(Ordering::Relaxed) { break; @@ -744,14 +780,9 @@ impl RpcSubscriptions { } } } - datapoint_info!( - "pubsub_notification_entries", - ( - "notification_entry_processing_time_us", - queued_at.elapsed().as_micros() as i64, - i64 - ) - ); + stats.notification_entry_processing_time_us += + queued_at.elapsed().as_micros() as u64; + stats.notification_entry_processing_count += 1; } Err(RecvTimeoutError::Timeout) => { // not a problem - try reading again @@ -761,6 +792,7 @@ impl RpcSubscriptions { break; } } + stats.maybe_submit(); } }