diff --git a/Cargo.lock b/Cargo.lock index 2afa4be2f4..150bf3e26f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,9 +94,9 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrayvec" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4dc07131ffa69b8072d35f5007352af944213cde02545e2103680baed38fcd" +checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "ascii" @@ -335,7 +335,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2607a74355ce2e252d0c483b2d8a348e1bba36036e786ccc2dcd777213c86ffd" dependencies = [ "arrayref", - "arrayvec 0.7.1", + "arrayvec 0.7.2", "cc", "cfg-if 1.0.0", "constant_time_eq", @@ -2807,9 +2807,9 @@ dependencies = [ [[package]] name = "parity-ws" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0ab8a461779bd022964cae2b4989fa9c99deb270bec162da2125ec03c09fcaa" +checksum = "5983d3929ad50f12c3eb9a6743f19d691866ecd44da74c0a3308c3f8a56df0c6" dependencies = [ "byteorder", "bytes 0.4.12", @@ -5505,6 +5505,7 @@ dependencies = [ "jsonrpc-ws-server", "libc", "log 0.4.14", + "rayon", "regex", "serde", "serde_derive", @@ -5522,6 +5523,7 @@ dependencies = [ "solana-net-utils", "solana-perf", "solana-poh", + "solana-rayon-threadlimit", "solana-runtime", "solana-sdk", "solana-send-transaction-service", @@ -6574,9 +6576,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2dd85aeaba7b68df939bd357c6afb36c87951be9e80bf9c859f2fc3e9fca0fd" +checksum = "114383b041aa6212c579467afa0075fbbdd0718de036100bc0ba7961d8cb9095" dependencies = [ "proc-macro2 1.0.32", "quote 1.0.10", @@ -6595,9 +6597,9 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5c07a6ceeeb8515d53998ac4487788a21884e79d5651490bc31a7289f20a7d7" +checksum = "4b6c8b33df661b548dcd8f9bf87debb8c56c05657ed291122e1188698c2ece95" dependencies = [ "async-trait", "byteorder", @@ -6709,9 +6711,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.8" +version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" dependencies = [ "bytes 1.1.0", "futures-core", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index d2593bcbf6..a71aaa6d5a 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -24,6 +24,7 @@ jsonrpc-pubsub = "18.0.0" jsonrpc-ws-server = "18.0.0" libc = "0.2.105" log = "0.4.14" +rayon = "1.5.1" regex = "1.5.4" serde = "1.0.130" serde_derive = "1.0.103" @@ -39,6 +40,7 @@ solana-measure = { path = "../measure", version = "=1.9.0" } solana-metrics = { path = "../metrics", version = "=1.9.0" } solana-perf = { path = "../perf", version = "=1.9.0" } solana-poh = { path = "../poh", version = "=1.9.0" } +solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.9.0" } solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.9.0" } diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index dcac3c0d36..b124a40f58 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -29,6 +29,7 @@ pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 1_000_000; pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000_000; pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100; pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 * 1024 * 1024; +pub const DEFAULT_WORKER_THREADS: usize = 1; #[derive(Debug, Clone)] pub struct PubSubConfig { @@ -36,6 +37,8 @@ pub struct PubSubConfig { pub max_active_subscriptions: usize, pub queue_capacity_items: usize, pub queue_capacity_bytes: usize, + pub worker_threads: usize, + pub notification_threads: Option, } impl Default for PubSubConfig { @@ -45,6 +48,8 @@ impl Default for PubSubConfig { max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, + worker_threads: DEFAULT_WORKER_THREADS, + notification_threads: None, } } } @@ -56,6 +61,8 @@ impl PubSubConfig { max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, + worker_threads: DEFAULT_WORKER_THREADS, + notification_threads: Some(2), } } } @@ -77,7 +84,8 @@ impl PubSubService { let thread_hdl = Builder::new() .name("solana-pubsub".to_string()) .spawn(move || { - let runtime = tokio::runtime::Builder::new_current_thread() + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(pubsub_config.worker_threads) .enable_all() .build() .expect("runtime creation failed"); @@ -145,6 +153,17 @@ impl BroadcastHandler { { count_final(entry.get().params()); + let time_since_created = notification.created_at.elapsed(); + + datapoint_info!( + "pubsub_notifications", + ( + "created_to_queue_time_us", + time_since_created.as_micros() as i64, + i64 + ), + ); + if notification.is_final { entry.remove(); } diff --git a/rpc/src/rpc_subscription_tracker.rs b/rpc/src/rpc_subscription_tracker.rs index 97e8031732..911fdbd1bc 100644 --- a/rpc/src/rpc_subscription_tracker.rs +++ b/rpc/src/rpc_subscription_tracker.rs @@ -1,5 +1,5 @@ use { - crate::rpc_subscriptions::{NotificationEntry, RpcNotification}, + crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry}, dashmap::{mapref::entry::Entry as DashEntry, DashMap}, solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig}, solana_client::rpc_filter::RpcFilterType, @@ -164,7 +164,7 @@ struct SubscriptionControlInner { subscriptions: DashMap>, next_id: AtomicU64, max_active_subscriptions: usize, - sender: crossbeam_channel::Sender, + sender: crossbeam_channel::Sender, broadcast_sender: broadcast::Sender, counter: TokenCounter, } @@ -172,7 +172,7 @@ struct SubscriptionControlInner { impl SubscriptionControl { pub fn new( max_active_subscriptions: usize, - sender: crossbeam_channel::Sender, + sender: crossbeam_channel::Sender, broadcast_sender: broadcast::Sender, ) -> Self { Self(Arc::new(SubscriptionControlInner { @@ -220,7 +220,7 @@ impl SubscriptionControl { let _ = self .0 .sender - .send(NotificationEntry::Subscribed(token.0.params.clone(), id)); + .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into()); entry.insert(Arc::downgrade(&token.0)); datapoint_info!( "rpc-subscription", @@ -506,10 +506,10 @@ impl Drop for SubscriptionTokenInner { warn!("Subscriptions inconsistency (missing entry in by_params)"); } DashEntry::Occupied(entry) => { - let _ = self.control.sender.send(NotificationEntry::Unsubscribed( - self.params.clone(), - self.id, - )); + let _ = self + .control + .sender + .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into()); entry.remove(); datapoint_info!( "rpc-subscription", @@ -543,7 +543,7 @@ mod tests { struct ControlWrapper { control: SubscriptionControl, - receiver: crossbeam_channel::Receiver, + receiver: crossbeam_channel::Receiver, } impl ControlWrapper { @@ -560,7 +560,7 @@ mod tests { } fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) { - if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap() { + if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap().entry { assert_eq!(¶ms, expected_params); assert_eq!(id, SubscriptionId::from(expected_id)); } else { @@ -570,7 +570,8 @@ mod tests { } fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) { - if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap() { + if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap().entry + { assert_eq!(¶ms, expected_params); assert_eq!(id, SubscriptionId::from(expected_id)); } else { diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 033cd563af..adeaa35b70 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -12,6 +12,7 @@ use { }, }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, + rayon::prelude::*, serde::Serialize, solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding}, solana_client::{ @@ -22,6 +23,7 @@ use { }, }, solana_measure::measure::Measure, + solana_rayon_threadlimit::get_thread_count, solana_runtime::{ bank::{Bank, TransactionLogInfo}, bank_forks::BankForks, @@ -37,15 +39,17 @@ use { }, solana_vote_program::vote_state::Vote, std::{ + cell::RefCell, collections::{HashMap, VecDeque}, io::Cursor, iter, str, sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, Weak, + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, RwLock, Weak, }, thread::{Builder, JoinHandle}, time::Duration, + time::Instant, }, tokio::sync::broadcast, }; @@ -69,6 +73,21 @@ fn get_transaction_logs( } logs } +#[derive(Debug)] +pub struct TimestampedNotificationEntry { + pub entry: NotificationEntry, + pub queued_at: Instant, +} + +impl From for TimestampedNotificationEntry { + fn from(entry: NotificationEntry) -> Self { + TimestampedNotificationEntry { + entry, + queued_at: Instant::now(), + } + } +} + pub enum NotificationEntry { Slot(SlotInfo), SlotUpdate(SlotUpdate), @@ -115,7 +134,7 @@ fn check_commitment_and_notify( commitment_slots: &CommitmentSlots, bank_method: B, filter_results: F, - notifier: &mut RpcNotifier, + notifier: &RpcNotifier, is_final: bool, ) -> bool where @@ -165,6 +184,7 @@ pub struct RpcNotification { pub subscription_id: SubscriptionId, pub is_final: bool, pub json: Weak, + pub created_at: Instant, } struct RecentItems { @@ -209,8 +229,11 @@ impl RecentItems { struct RpcNotifier { sender: broadcast::Sender, - buf: Vec, - recent_items: RecentItems, + recent_items: Mutex, +} + +thread_local! { + static RPC_NOTIFIER_BUF: RefCell> = RefCell::new(Vec::new()); } #[derive(Debug, Serialize)] @@ -227,28 +250,32 @@ struct Notification { } impl RpcNotifier { - fn notify(&mut self, value: T, subscription: &SubscriptionInfo, is_final: bool) + fn notify(&self, value: T, subscription: &SubscriptionInfo, is_final: bool) where T: serde::Serialize, { - self.buf.clear(); - let notification = Notification { - jsonrpc: Some(jsonrpc_core::Version::V2), - method: subscription.method(), - params: NotificationParams { - result: value, - subscription: subscription.id(), - }, - }; - serde_json::to_writer(Cursor::new(&mut self.buf), ¬ification) - .expect("serialization never fails"); - let buf_str = str::from_utf8(&self.buf).expect("json is always utf-8"); - let buf_arc = Arc::new(String::from(buf_str)); + let buf_arc = RPC_NOTIFIER_BUF.with(|buf| { + let mut buf = buf.borrow_mut(); + buf.clear(); + let notification = Notification { + jsonrpc: Some(jsonrpc_core::Version::V2), + method: subscription.method(), + params: NotificationParams { + result: value, + subscription: subscription.id(), + }, + }; + serde_json::to_writer(Cursor::new(&mut *buf), ¬ification) + .expect("serialization never fails"); + let buf_str = str::from_utf8(&buf).expect("json is always utf-8"); + Arc::new(String::from(buf_str)) + }); let notification = RpcNotification { subscription_id: subscription.id(), json: Arc::downgrade(&buf_arc), is_final, + created_at: Instant::now(), }; // There is an unlikely case where this can fail: if the last subscription is closed // just as the notifier generates a notification for it. @@ -257,7 +284,7 @@ impl RpcNotifier { inc_new_counter_info!("rpc-pubsub-messages", 1); inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len()); - self.recent_items.push(buf_arc); + self.recent_items.lock().unwrap().push(buf_arc); } } @@ -405,8 +432,7 @@ fn initial_last_notified_slot( } pub struct RpcSubscriptions { - notification_sender: Sender, - + notification_sender: Sender, t_cleanup: Option>, exit: Arc, @@ -468,24 +494,31 @@ impl RpcSubscriptions { let notifier = RpcNotifier { sender: broadcast_sender.clone(), - buf: Vec::new(), - recent_items: RecentItems::new( + recent_items: Mutex::new(RecentItems::new( config.queue_capacity_items, config.queue_capacity_bytes, - ), + )), }; + let notification_threads = config.notification_threads; let t_cleanup = Builder::new() .name("solana-rpc-notifications".to_string()) .spawn(move || { - Self::process_notifications( - exit_clone, - notifier, - notification_receiver, - subscriptions, - bank_forks, - block_commitment_cache, - optimistically_confirmed_bank, - ); + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(notification_threads.unwrap_or_else(get_thread_count)) + .thread_name(|i| format!("sol-sub-notif-{}", i)) + .build() + .unwrap(); + pool.install(|| { + Self::process_notifications( + exit_clone, + notifier, + notification_receiver, + subscriptions, + bank_forks, + block_commitment_cache, + optimistically_confirmed_bank, + ) + }); }) .unwrap(); @@ -565,7 +598,7 @@ impl RpcSubscriptions { } fn enqueue_notification(&self, notification_entry: NotificationEntry) { - match self.notification_sender.send(notification_entry) { + match self.notification_sender.send(notification_entry.into()) { Ok(()) => (), Err(SendError(notification)) => { warn!( @@ -578,8 +611,8 @@ impl RpcSubscriptions { fn process_notifications( exit: Arc, - mut notifier: RpcNotifier, - notification_receiver: Receiver, + notifier: RpcNotifier, + notification_receiver: Receiver, mut subscriptions: SubscriptionsTracker, bank_forks: Arc>, block_commitment_cache: Arc>, @@ -591,7 +624,8 @@ impl RpcSubscriptions { } match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { Ok(notification_entry) => { - match notification_entry { + let TimestampedNotificationEntry { entry, queued_at } = notification_entry; + match entry { NotificationEntry::Subscribed(params, id) => { subscriptions.subscribe(params.clone(), id, || { initial_last_notified_slot( @@ -658,7 +692,7 @@ impl RpcSubscriptions { subscriptions.commitment_watchers(), &bank_forks, &commitment_slots, - &mut notifier, + ¬ifier, "bank", ) } @@ -672,7 +706,7 @@ impl RpcSubscriptions { subscriptions.gossip_watchers(), &bank_forks, &commitment_slots, - &mut notifier, + ¬ifier, "gossip", ) } @@ -704,6 +738,14 @@ impl RpcSubscriptions { } } } + datapoint_info!( + "pubsub_notification_entries", + ( + "notification_entry_processing_time_us", + queued_at.elapsed().as_micros() as i64, + i64 + ) + ); } Err(RecvTimeoutError::Timeout) => { // not a problem - try reading again @@ -720,23 +762,24 @@ impl RpcSubscriptions { subscriptions: &HashMap>, bank_forks: &Arc>, commitment_slots: &CommitmentSlots, - notifier: &mut RpcNotifier, + notifier: &RpcNotifier, source: &'static str, ) { let mut total_time = Measure::start("notify_accounts_logs_programs_signatures"); - let mut num_accounts_found = 0; - let mut num_accounts_notified = 0; + let num_accounts_found = AtomicUsize::new(0); + let num_accounts_notified = AtomicUsize::new(0); - let mut num_logs_found = 0; - let mut num_logs_notified = 0; + let num_logs_found = AtomicUsize::new(0); + let num_logs_notified = AtomicUsize::new(0); - let mut num_signatures_found = 0; - let mut num_signatures_notified = 0; + let num_signatures_found = AtomicUsize::new(0); + let num_signatures_notified = AtomicUsize::new(0); - let mut num_programs_found = 0; - let mut num_programs_notified = 0; + let num_programs_found = AtomicUsize::new(0); + let num_programs_notified = AtomicUsize::new(0); - for subscription in subscriptions.values() { + let subscriptions = subscriptions.into_par_iter(); + subscriptions.for_each(|(_id, subscription)| { match subscription.params() { SubscriptionParams::Account(params) => { let notified = check_commitment_and_notify( @@ -750,10 +793,10 @@ impl RpcSubscriptions { false, ); - num_accounts_found += 1; + num_accounts_found.fetch_add(1, Ordering::Relaxed); if notified { - num_accounts_notified += 1; + num_accounts_notified.fetch_add(1, Ordering::Relaxed); } } SubscriptionParams::Logs(params) => { @@ -767,10 +810,10 @@ impl RpcSubscriptions { notifier, false, ); - num_logs_found += 1; + num_logs_found.fetch_add(1, Ordering::Relaxed); if notified { - num_logs_notified += 1; + num_logs_notified.fetch_add(1, Ordering::Relaxed); } } SubscriptionParams::Program(params) => { @@ -786,10 +829,10 @@ impl RpcSubscriptions { notifier, false, ); - num_programs_found += 1; + num_programs_found.fetch_add(1, Ordering::Relaxed); if notified { - num_programs_notified += 1; + num_programs_notified.fetch_add(1, Ordering::Relaxed); } } SubscriptionParams::Signature(params) => { @@ -805,65 +848,97 @@ impl RpcSubscriptions { notifier, true, // Unsubscribe. ); - num_signatures_found += 1; + num_signatures_found.fetch_add(1, Ordering::Relaxed); if notified { - num_signatures_notified += 1; + num_signatures_notified.fetch_add(1, Ordering::Relaxed); } } _ => error!("wrong subscription type in alps map"), } - } + }); total_time.stop(); - let total_notified = num_accounts_notified - + num_logs_notified - + num_programs_notified - + num_signatures_notified; + let total_notified = num_accounts_notified.load(Ordering::Relaxed) + + num_logs_notified.load(Ordering::Relaxed) + + num_programs_notified.load(Ordering::Relaxed) + + num_signatures_notified.load(Ordering::Relaxed); let total_ms = total_time.as_ms(); if total_notified > 0 || total_ms > 10 { debug!( "notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / {}", source, - num_accounts_found, - num_accounts_notified, - num_logs_found, - num_logs_notified, - num_programs_found, - num_programs_notified, - num_signatures_found, - num_signatures_notified, + num_accounts_found.load(Ordering::Relaxed), + num_accounts_notified.load(Ordering::Relaxed), + num_logs_found.load(Ordering::Relaxed), + num_logs_notified.load(Ordering::Relaxed), + num_programs_found.load(Ordering::Relaxed), + num_programs_notified.load(Ordering::Relaxed), + num_signatures_found.load(Ordering::Relaxed), + num_signatures_notified.load(Ordering::Relaxed), ); inc_new_counter_info!("rpc-subscription-notify-bank-or-gossip", total_notified); datapoint_info!( "rpc_subscriptions", ("source", source.to_string(), String), - ("num_account_subscriptions", num_accounts_found, i64), - ("num_account_pubkeys_notified", num_accounts_notified, i64), - ("num_logs_subscriptions", num_logs_found, i64), - ("num_logs_notified", num_logs_notified, i64), - ("num_program_subscriptions", num_programs_found, i64), - ("num_programs_notified", num_programs_notified, i64), - ("num_signature_subscriptions", num_signatures_found, i64), - ("num_signatures_notified", num_signatures_notified, i64), + ( + "num_account_subscriptions", + num_accounts_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_account_pubkeys_notified", + num_accounts_notified.load(Ordering::Relaxed), + i64 + ), + ( + "num_logs_subscriptions", + num_logs_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_logs_notified", + num_logs_notified.load(Ordering::Relaxed), + i64 + ), + ( + "num_program_subscriptions", + num_programs_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_programs_notified", + num_programs_notified.load(Ordering::Relaxed), + i64 + ), + ( + "num_signature_subscriptions", + num_signatures_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_signatures_notified", + num_signatures_notified.load(Ordering::Relaxed), + i64 + ), ("notifications_time", total_time.as_us() as i64, i64), ); inc_new_counter_info!( "rpc-subscription-counter-num_accounts_notified", - num_accounts_notified + num_accounts_notified.load(Ordering::Relaxed) ); inc_new_counter_info!( "rpc-subscription-counter-num_logs_notified", - num_logs_notified + num_logs_notified.load(Ordering::Relaxed) ); inc_new_counter_info!( "rpc-subscription-counter-num_programs_notified", - num_programs_notified + num_programs_notified.load(Ordering::Relaxed) ); inc_new_counter_info!( "rpc-subscription-counter-num_signatures_notified", - num_signatures_notified + num_signatures_notified.load(Ordering::Relaxed) ); } } diff --git a/validator/src/main.rs b/validator/src/main.rs index c63a4a0db9..7dbaa332bd 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1179,6 +1179,15 @@ pub fn main() { .default_value("30") .help("Number of seconds before timing out RPC requests backed by BigTable"), ) + .arg( + Arg::with_name("rpc_pubsub_worker_threads") + .long("rpc-pubsub-worker-threads") + .takes_value(true) + .value_name("NUMBER") + .validator(is_parsable::) + .default_value("4") + .help("PubSub worker threads"), + ) .arg( Arg::with_name("rpc_pubsub_enable_vote_subscription") .long("rpc-pubsub-enable-vote-subscription") @@ -1256,6 +1265,15 @@ pub fn main() { .help("The maximum total size of notifications that RPC PubSub will store \ across all connections."), ) + .arg( + Arg::with_name("rpc_pubsub_notification_threads") + .long("rpc-pubsub-notification-threads") + .takes_value(true) + .value_name("NUM_THREADS") + .validator(is_parsable::) + .help("The maximum number of threads that RPC PubSub will use \ + for generating notifications."), + ) .arg( Arg::with_name("rpc_send_transaction_retry_ms") .long("rpc-send-retry-ms") @@ -2194,6 +2212,8 @@ pub fn main() { "rpc_pubsub_queue_capacity_bytes", usize ), + worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize), + notification_threads: value_of(&matches, "rpc_pubsub_notification_threads"), }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),