From 0afb330db0cafa7e097a6f359c70946c4e63d400 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 18 Jun 2021 00:35:26 +0000 Subject: [PATCH] validator: expose max active pubsub subscriptions to CLI (#18035) (cherry picked from commit 5efc48fc692bb34e32324c86ee4a952ef93c6f84) # Conflicts: # core/src/rpc_pubsub_service.rs Co-authored-by: Trent Nelson --- core/src/rpc_pubsub.rs | 23 ++++++++++++++++++----- core/src/rpc_pubsub_service.rs | 9 +++++++-- validator/src/main.rs | 17 +++++++++++++++++ 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index ad5182a67c..7909379f10 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -25,7 +25,7 @@ use std::{ sync::{atomic, Arc}, }; -const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000; +pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000; // Suppress needless_return due to // https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204 @@ -192,25 +192,35 @@ pub trait RpcSolPubSub { pub struct RpcSolPubSubImpl { uid: Arc, subscriptions: Arc, + max_active_subscriptions: usize, } impl RpcSolPubSubImpl { - pub fn new(subscriptions: Arc) -> Self { + pub fn new(subscriptions: Arc, max_active_subscriptions: usize) -> Self { let uid = Arc::new(atomic::AtomicUsize::default()); - Self { uid, subscriptions } + Self { + uid, + subscriptions, + max_active_subscriptions, + } } #[cfg(test)] fn default_with_bank_forks(bank_forks: Arc>) -> Self { let uid = Arc::new(atomic::AtomicUsize::default()); let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); - Self { uid, subscriptions } + let max_active_subscriptions = MAX_ACTIVE_SUBSCRIPTIONS; + Self { + uid, + subscriptions, + max_active_subscriptions, + } } fn check_subscription_count(&self) -> Result<()> { let num_subscriptions = self.subscriptions.total(); debug!("Total existing subscriptions: {}", num_subscriptions); - if num_subscriptions >= MAX_ACTIVE_SUBSCRIPTIONS { + if num_subscriptions >= self.max_active_subscriptions { info!("Node subscription limit reached"); Err(Error { code: ErrorCode::InternalError, @@ -627,6 +637,7 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), uid: Arc::new(atomic::AtomicUsize::default()), + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, }; // Test signature subscriptions @@ -807,6 +818,7 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), uid: Arc::new(atomic::AtomicUsize::default()), + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, }; let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); @@ -917,6 +929,7 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), uid: Arc::new(atomic::AtomicUsize::default()), + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, }; let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index a9ef61b6f3..8f11997fa0 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -1,7 +1,7 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request use crate::{ - rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}, + rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl, MAX_ACTIVE_SUBSCRIPTIONS}, rpc_subscriptions::RpcSubscriptions, }; use jsonrpc_pubsub::{PubSubHandler, Session}; @@ -27,6 +27,7 @@ pub struct PubSubConfig { pub max_fragment_size: usize, pub max_in_buffer_capacity: usize, pub max_out_buffer_capacity: usize, + pub max_active_subscriptions: usize, } impl Default for PubSubConfig { @@ -37,6 +38,7 @@ impl Default for PubSubConfig { max_fragment_size: 50 * 1024, // 50KB max_in_buffer_capacity: 50 * 1024, // 50KB max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, } } } @@ -53,7 +55,10 @@ impl PubSubService { exit: &Arc, ) -> Self { info!("rpc_pubsub bound to {:?}", pubsub_addr); - let rpc = RpcSolPubSubImpl::new(subscriptions.clone()); + let rpc = RpcSolPubSubImpl::new( + subscriptions.clone(), + pubsub_config.max_active_subscriptions, + ); let exit_ = exit.clone(); let thread_hdl = Builder::new() diff --git a/validator/src/main.rs b/validator/src/main.rs index 2f50512e45..6ecfb9acc1 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1021,6 +1021,8 @@ pub fn main() { PubSubConfig::default().max_in_buffer_capacity.to_string(); let default_rpc_pubsub_max_out_buffer_capacity = PubSubConfig::default().max_out_buffer_capacity.to_string(); + let default_rpc_pubsub_max_active_subscriptions = + PubSubConfig::default().max_active_subscriptions.to_string(); let default_rpc_send_transaction_retry_ms = ValidatorConfig::default() .send_transaction_retry_ms .to_string(); @@ -1589,6 +1591,16 @@ pub fn main() { .default_value(&default_rpc_pubsub_max_out_buffer_capacity) .help("The maximum size in bytes to which the outgoing websocket buffer can grow."), ) + .arg( + Arg::with_name("rpc_pubsub_max_active_subscriptions") + .long("rpc-pubsub-max-active-subscriptions") + .takes_value(true) + .value_name("NUMBER") + .validator(is_parsable::) + .default_value(&default_rpc_pubsub_max_active_subscriptions) + .help("The maximum number of active subscriptions that RPC PubSub will accept \ + across all connections."), + ) .arg( Arg::with_name("rpc_send_transaction_retry_ms") .long("rpc-send-retry-ms") @@ -2125,6 +2137,11 @@ pub fn main() { "rpc_pubsub_max_out_buffer_capacity", usize ), + max_active_subscriptions: value_t_or_exit!( + matches, + "rpc_pubsub_max_active_subscriptions", + usize + ), }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),