* reimplement rpc pubsub with a broadcast queue * update tests for new pubsub implementation * fix: fix review suggestions * chore(rpc): add additional pubsub metrics * integrate max subscriptions check into SubscriptionTracker to reduce locking * separate subscription control from tracker * limit memory usage of items in pubsub broadcast queue, improve error handling * add more pubsub metrics * add final count metrics to pubsub * add metric for total number of subscriptions * fix small review suggestions * remove by_params from SubscriptionTracker and add node_progress_watchers map instead * add subscription tracker tests * add metrics for number of pubsub notifications as a counter * ignore clippy lint in TokenCounter * fix underflow in token counter * reduce queue capacity in pubsub tests * fix(rpc): fix test timeouts * fix race in account subscription test * Add RpcSubscriptions::new_for_tests Co-authored-by: Pavel Strakhov <p.strakhov@iconic.vc> Co-authored-by: Nikita Podoliako <n.podoliako@zubr.io> Co-authored-by: Tyera Eulberg <tyera@solana.com>
64 lines
1.8 KiB
Rust
64 lines
1.8 KiB
Rust
#![allow(clippy::integer_arithmetic)]
|
|
pub mod counter;
|
|
pub mod datapoint;
|
|
mod metrics;
|
|
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
|
|
|
|
use std::sync::Arc;
|
|
|
|
/// Helper that reports, as a datapoint, how many tokens created from it exist.
///
/// The wrapped value is the metric name; the strong reference count of the
/// surrounding `Arc` is what the rest of this file reports as the `count` field.
// The `Arc` around a `&'static str` is intentional: the allocation is shared
// with every token so that its reference count can serve as the counter,
// hence the clippy lint is silenced.
#[allow(clippy::redundant_allocation)]
pub struct TokenCounter(Arc<&'static str>);
|
|
|
|
impl TokenCounter {
|
|
/// Creates a new counter with the specified metrics `name`.
|
|
pub fn new(name: &'static str) -> Self {
|
|
Self(Arc::new(name))
|
|
}
|
|
|
|
/// Creates a new token for this counter. The metric's value will be equal
|
|
/// to the number of `CounterToken`s.
|
|
pub fn create_token(&self) -> CounterToken {
|
|
// new_count = strong_count
|
|
// - 1 (in TokenCounter)
|
|
// + 1 (token that's being created)
|
|
datapoint_info!(*self.0, ("count", Arc::strong_count(&self.0), i64));
|
|
CounterToken(self.0.clone())
|
|
}
|
|
}
|
|
|
|
/// A token handed out by a `TokenCounter`.
///
/// Each token holds a clone of the counter's `Arc`, so the number of tokens
/// in existence is reflected in that `Arc`'s strong reference count.
// The `Arc` around a `&'static str` is intentional: the shared allocation's
// reference count is the counter, hence the clippy lint is silenced.
#[allow(clippy::redundant_allocation)]
pub struct CounterToken(Arc<&'static str>);
|
|
|
|
impl Clone for CounterToken {
|
|
fn clone(&self) -> Self {
|
|
// new_count = strong_count
|
|
// - 1 (in TokenCounter)
|
|
// + 1 (token that's being created)
|
|
datapoint_info!(*self.0, ("count", Arc::strong_count(&self.0), i64));
|
|
CounterToken(self.0.clone())
|
|
}
|
|
}
|
|
|
|
impl Drop for CounterToken {
|
|
fn drop(&mut self) {
|
|
// new_count = strong_count
|
|
// - 1 (in TokenCounter, if it still exists)
|
|
// - 1 (token that's being dropped)
|
|
datapoint_info!(
|
|
*self.0,
|
|
("count", Arc::strong_count(&self.0).saturating_sub(2), i64)
|
|
);
|
|
}
|
|
}
|
|
|
|
impl Drop for TokenCounter {
|
|
fn drop(&mut self) {
|
|
datapoint_info!(
|
|
*self.0,
|
|
("count", Arc::strong_count(&self.0).saturating_sub(2), i64)
|
|
);
|
|
}
|
|
}
|