diff --git a/rpc/src/rpc_pubsub.rs b/rpc/src/rpc_pubsub.rs index 1e897835f3..9d9937237e 100644 --- a/rpc/src/rpc_pubsub.rs +++ b/rpc/src/rpc_pubsub.rs @@ -27,7 +27,7 @@ use { }, }; -const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000; +pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000; // Suppress needless_return due to // https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204 @@ -194,25 +194,35 @@ pub trait RpcSolPubSub { pub struct RpcSolPubSubImpl { uid: Arc, subscriptions: Arc, + max_active_subscriptions: usize, } impl RpcSolPubSubImpl { - pub fn new(subscriptions: Arc) -> Self { + pub fn new(subscriptions: Arc, max_active_subscriptions: usize) -> Self { let uid = Arc::new(atomic::AtomicUsize::default()); - Self { uid, subscriptions } + Self { + uid, + subscriptions, + max_active_subscriptions, + } } #[cfg(test)] fn default_with_bank_forks(bank_forks: Arc>) -> Self { let uid = Arc::new(atomic::AtomicUsize::default()); let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); - Self { uid, subscriptions } + let max_active_subscriptions = MAX_ACTIVE_SUBSCRIPTIONS; + Self { + uid, + subscriptions, + max_active_subscriptions, + } } fn check_subscription_count(&self) -> Result<()> { let num_subscriptions = self.subscriptions.total(); debug!("Total existing subscriptions: {}", num_subscriptions); - if num_subscriptions >= MAX_ACTIVE_SUBSCRIPTIONS { + if num_subscriptions >= self.max_active_subscriptions { info!("Node subscription limit reached"); Err(Error { code: ErrorCode::InternalError, @@ -630,6 +640,7 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), uid: Arc::new(atomic::AtomicUsize::default()), + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, }; // Test signature subscriptions @@ -810,6 +821,7 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), uid: Arc::new(atomic::AtomicUsize::default()), + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, }; let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); @@ -920,6 +932,7 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), uid: Arc::new(atomic::AtomicUsize::default()), + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, }; let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index ea328b9af9..c768736717 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -2,7 +2,7 @@ use { crate::{ - rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}, + rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl, MAX_ACTIVE_SUBSCRIPTIONS}, rpc_subscriptions::RpcSubscriptions, }, jsonrpc_pubsub::{PubSubHandler, Session}, @@ -29,6 +29,7 @@ pub struct PubSubConfig { pub max_fragment_size: usize, pub max_in_buffer_capacity: usize, pub max_out_buffer_capacity: usize, + pub max_active_subscriptions: usize, } impl Default for PubSubConfig { @@ -39,6 +40,7 @@ impl Default for PubSubConfig { max_fragment_size: 50 * 1024, // 50KB max_in_buffer_capacity: 50 * 1024, // 50KB max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc + max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, } } } @@ -55,7 +57,10 @@ impl PubSubService { exit: &Arc, ) -> Self { info!("rpc_pubsub bound to {:?}", pubsub_addr); - let rpc = RpcSolPubSubImpl::new(subscriptions.clone()); + let rpc = RpcSolPubSubImpl::new( + subscriptions.clone(), + pubsub_config.max_active_subscriptions, + ); let exit_ = exit.clone(); let thread_hdl = Builder::new() diff --git a/validator/src/main.rs b/validator/src/main.rs index c21eec8b2b..ebf6fc3c0f 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1003,6 +1003,8 @@ pub fn main() { PubSubConfig::default().max_in_buffer_capacity.to_string(); let default_rpc_pubsub_max_out_buffer_capacity = PubSubConfig::default().max_out_buffer_capacity.to_string(); + let default_rpc_pubsub_max_active_subscriptions = + PubSubConfig::default().max_active_subscriptions.to_string(); let default_rpc_send_transaction_retry_ms = ValidatorConfig::default() .send_transaction_retry_ms .to_string(); @@ -1610,6 +1612,16 @@ pub fn main() { .default_value(&default_rpc_pubsub_max_out_buffer_capacity) .help("The maximum size in bytes to which the outgoing websocket buffer can grow."), ) + .arg( + Arg::with_name("rpc_pubsub_max_active_subscriptions") + .long("rpc-pubsub-max-active-subscriptions") + .takes_value(true) + .value_name("NUMBER") + .validator(is_parsable::) + .default_value(&default_rpc_pubsub_max_active_subscriptions) + .help("The maximum number of active subscriptions that RPC PubSub will accept \ + across all connections."), + ) .arg( Arg::with_name("rpc_send_transaction_retry_ms") .long("rpc-send-retry-ms") @@ -2187,6 +2199,11 @@ pub fn main() { "rpc_pubsub_max_out_buffer_capacity", usize ), + max_active_subscriptions: value_t_or_exit!( + matches, + "rpc_pubsub_max_active_subscriptions", + usize + ), }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),