diff --git a/Cargo.lock b/Cargo.lock index 1be9113df0..1f11d941e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2732,9 +2732,9 @@ dependencies = [ [[package]] name = "parity-ws" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0ab8a461779bd022964cae2b4989fa9c99deb270bec162da2125ec03c09fcaa" +checksum = "5983d3929ad50f12c3eb9a6743f19d691866ecd44da74c0a3308c3f8a56df0c6" dependencies = [ "byteorder", "bytes 0.4.12", @@ -3459,9 +3459,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b0d8e0819fadc20c74ea8373106ead0600e3a67ef1fe8da56e39b9ae7275674" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" dependencies = [ "autocfg 1.0.0", "crossbeam-deque", @@ -3471,9 +3471,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" dependencies = [ "crossbeam-channel 0.5.0", "crossbeam-deque", @@ -5377,6 +5377,7 @@ dependencies = [ "jsonrpc-ws-server", "libc", "log 0.4.14", + "rayon", "regex", "serde", "serde_derive", @@ -5394,6 +5395,7 @@ dependencies = [ "solana-net-utils", "solana-perf", "solana-poh", + "solana-rayon-threadlimit", "solana-runtime", "solana-sdk", "solana-stake-program", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 69a5f91475..2a304c3123 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -24,6 +24,7 @@ jsonrpc-pubsub = "18.0.0" jsonrpc-ws-server = "18.0.0" libc = "0.2.81" log = "0.4.11" +rayon = "1.5.1" regex = "1.3.9" serde = "1.0.122" serde_derive = "1.0.103" @@ -38,6 +39,7 @@ solana-measure = { path = "../measure", version = "=1.8.3" } solana-metrics = { path = "../metrics", version = "=1.8.3" } solana-perf = { path = "../perf", version = "=1.8.3" } solana-poh = { path = "../poh", version = "=1.8.3" } +solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.8.3" } solana-runtime = { path = "../runtime", version = "=1.8.3" } solana-sdk = { path = "../sdk", version = "=1.8.3" } solana-streamer = { path = "../streamer", version = "=1.8.3" } diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index efef616bec..1f12e7843a 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -29,6 +29,7 @@ pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 1_000_000; pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000_000; pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100; pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 * 1024 * 1024; +pub const DEFAULT_WORKER_THREADS: usize = 1; #[derive(Debug, Clone)] pub struct PubSubConfig { @@ -36,6 +37,8 @@ pub struct PubSubConfig { pub max_active_subscriptions: usize, pub queue_capacity_items: usize, pub queue_capacity_bytes: usize, + pub worker_threads: usize, + pub notification_threads: Option, } impl Default for PubSubConfig { @@ -45,6 +48,8 @@ impl Default for PubSubConfig { max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, + worker_threads: DEFAULT_WORKER_THREADS, + notification_threads: None, } } } @@ -56,6 +61,8 @@ impl PubSubConfig { max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, + worker_threads: DEFAULT_WORKER_THREADS, + notification_threads: Some(2), } } } @@ -77,7 +84,8 @@ impl PubSubService { let thread_hdl = Builder::new() .name("solana-pubsub".to_string()) .spawn(move || { - let runtime = tokio::runtime::Builder::new_current_thread() + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(pubsub_config.worker_threads) .enable_all() .build() .expect("runtime creation failed"); @@ -145,6 +153,17 @@ impl BroadcastHandler { { count_final(entry.get().params()); + let time_since_created = notification.created_at.elapsed(); + + datapoint_info!( + "pubsub_notifications", + ( + "created_to_queue_time_us", + time_since_created.as_micros() as i64, + i64 + ), + ); + if notification.is_final { entry.remove(); } diff --git a/rpc/src/rpc_subscription_tracker.rs b/rpc/src/rpc_subscription_tracker.rs index 45b4a03a5d..2cca19a5fb 100644 --- a/rpc/src/rpc_subscription_tracker.rs +++ b/rpc/src/rpc_subscription_tracker.rs @@ -1,5 +1,5 @@ use { - crate::rpc_subscriptions::{NotificationEntry, RpcNotification}, + crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry}, dashmap::{mapref::entry::Entry as DashEntry, DashMap}, solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig}, solana_client::rpc_filter::RpcFilterType, @@ -164,7 +164,7 @@ struct SubscriptionControlInner { subscriptions: DashMap>, next_id: AtomicU64, max_active_subscriptions: usize, - sender: crossbeam_channel::Sender, + sender: crossbeam_channel::Sender, broadcast_sender: broadcast::Sender, counter: TokenCounter, } @@ -172,7 +172,7 @@ struct SubscriptionControlInner { impl SubscriptionControl { pub fn new( max_active_subscriptions: usize, - sender: crossbeam_channel::Sender, + sender: crossbeam_channel::Sender, broadcast_sender: broadcast::Sender, ) -> Self { Self(Arc::new(SubscriptionControlInner { @@ -220,7 +220,7 @@ impl SubscriptionControl { let _ = self .0 .sender - .send(NotificationEntry::Subscribed(token.0.params.clone(), id)); + .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into()); entry.insert(Arc::downgrade(&token.0)); datapoint_info!( "rpc-subscription", @@ -506,10 +506,10 @@ impl Drop for SubscriptionTokenInner { warn!("Subscriptions inconsistency (missing entry in by_params)"); } DashEntry::Occupied(entry) => { - let _ = self.control.sender.send(NotificationEntry::Unsubscribed( - self.params.clone(), - self.id, - )); + let _ = self + .control + .sender + .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into()); entry.remove(); datapoint_info!( "rpc-subscription", @@ -543,7 +543,7 @@ mod tests { struct ControlWrapper { control: SubscriptionControl, - receiver: crossbeam_channel::Receiver, + receiver: crossbeam_channel::Receiver, } impl ControlWrapper { @@ -560,7 +560,7 @@ mod tests { } fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) { - if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap() { + if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap().entry { assert_eq!(¶ms, expected_params); assert_eq!(id, SubscriptionId::from(expected_id)); } else { @@ -570,7 +570,8 @@ mod tests { } fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) { - if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap() { + if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap().entry + { assert_eq!(¶ms, expected_params); assert_eq!(id, SubscriptionId::from(expected_id)); } else { diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 3191e5c951..c12c58fdcf 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -12,6 +12,7 @@ use { }, }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, + rayon::prelude::*, serde::Serialize, solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding}, solana_client::{ @@ -22,6 +23,7 @@ use { }, }, solana_measure::measure::Measure, + solana_rayon_threadlimit::get_thread_count, solana_runtime::{ bank::{Bank, TransactionLogInfo}, bank_forks::BankForks, @@ -37,15 +39,17 @@ use { }, solana_vote_program::vote_state::Vote, std::{ + cell::RefCell, collections::{HashMap, VecDeque}, io::Cursor, iter, str, sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, Weak, + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, RwLock, Weak, }, thread::{Builder, JoinHandle}, time::Duration, + time::Instant, }, tokio::sync::broadcast, }; @@ -69,6 +73,21 @@ fn get_transaction_logs( } logs } +#[derive(Debug)] +pub struct TimestampedNotificationEntry { + pub entry: NotificationEntry, + pub queued_at: Instant, +} + +impl From for TimestampedNotificationEntry { + fn from(entry: NotificationEntry) -> Self { + TimestampedNotificationEntry { + entry, + queued_at: Instant::now(), + } + } +} + pub enum NotificationEntry { Slot(SlotInfo), SlotUpdate(SlotUpdate), @@ -121,7 +140,7 @@ fn check_commitment_and_notify( commitment_slots: &CommitmentSlots, bank_method: B, filter_results: F, - notifier: &mut RpcNotifier, + notifier: &RpcNotifier, is_final: bool, ) -> bool where @@ -171,6 +190,7 @@ pub struct RpcNotification { pub subscription_id: SubscriptionId, pub is_final: bool, pub json: Weak, + pub created_at: Instant, } struct RecentItems { @@ -215,8 +235,11 @@ impl RecentItems { struct RpcNotifier { sender: broadcast::Sender, - buf: Vec, - recent_items: RecentItems, + recent_items: Mutex, +} + +thread_local! { + static RPC_NOTIFIER_BUF: RefCell> = RefCell::new(Vec::new()); } #[derive(Debug, Serialize)] @@ -233,28 +256,32 @@ struct Notification { } impl RpcNotifier { - fn notify(&mut self, value: T, subscription: &SubscriptionInfo, is_final: bool) + fn notify(&self, value: T, subscription: &SubscriptionInfo, is_final: bool) where T: serde::Serialize, { - self.buf.clear(); - let notification = Notification { - jsonrpc: Some(jsonrpc_core::Version::V2), - method: subscription.method(), - params: NotificationParams { - result: value, - subscription: subscription.id(), - }, - }; - serde_json::to_writer(Cursor::new(&mut self.buf), ¬ification) - .expect("serialization never fails"); - let buf_str = str::from_utf8(&self.buf).expect("json is always utf-8"); - let buf_arc = Arc::new(String::from(buf_str)); + let buf_arc = RPC_NOTIFIER_BUF.with(|buf| { + let mut buf = buf.borrow_mut(); + buf.clear(); + let notification = Notification { + jsonrpc: Some(jsonrpc_core::Version::V2), + method: subscription.method(), + params: NotificationParams { + result: value, + subscription: subscription.id(), + }, + }; + serde_json::to_writer(Cursor::new(&mut *buf), ¬ification) + .expect("serialization never fails"); + let buf_str = str::from_utf8(&buf).expect("json is always utf-8"); + Arc::new(String::from(buf_str)) + }); let notification = RpcNotification { subscription_id: subscription.id(), json: Arc::downgrade(&buf_arc), is_final, + created_at: Instant::now(), }; // There is an unlikely case where this can fail: if the last subscription is closed // just as the notifier generates a notification for it. @@ -263,7 +290,7 @@ impl RpcNotifier { inc_new_counter_info!("rpc-pubsub-messages", 1); inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len()); - self.recent_items.push(buf_arc); + self.recent_items.lock().unwrap().push(buf_arc); } } @@ -411,8 +438,7 @@ fn initial_last_notified_slot( } pub struct RpcSubscriptions { - notification_sender: Sender, - + notification_sender: Sender, t_cleanup: Option>, exit: Arc, @@ -474,24 +500,31 @@ impl RpcSubscriptions { let notifier = RpcNotifier { sender: broadcast_sender.clone(), - buf: Vec::new(), - recent_items: RecentItems::new( + recent_items: Mutex::new(RecentItems::new( config.queue_capacity_items, config.queue_capacity_bytes, - ), + )), }; + let notification_threads = config.notification_threads; let t_cleanup = Builder::new() .name("solana-rpc-notifications".to_string()) .spawn(move || { - Self::process_notifications( - exit_clone, - notifier, - notification_receiver, - subscriptions, - bank_forks, - block_commitment_cache, - optimistically_confirmed_bank, - ); + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(notification_threads.unwrap_or_else(get_thread_count)) + .thread_name(|i| format!("sol-sub-notif-{}", i)) + .build() + .unwrap(); + pool.install(|| { + Self::process_notifications( + exit_clone, + notifier, + notification_receiver, + subscriptions, + bank_forks, + block_commitment_cache, + optimistically_confirmed_bank, + ) + }); }) .unwrap(); @@ -571,7 +604,7 @@ impl RpcSubscriptions { } fn enqueue_notification(&self, notification_entry: NotificationEntry) { - match self.notification_sender.send(notification_entry) { + match self.notification_sender.send(notification_entry.into()) { Ok(()) => (), Err(SendError(notification)) => { warn!( @@ -584,8 +617,8 @@ impl RpcSubscriptions { fn process_notifications( exit: Arc, - mut notifier: RpcNotifier, - notification_receiver: Receiver, + notifier: RpcNotifier, + notification_receiver: Receiver, mut subscriptions: SubscriptionsTracker, bank_forks: Arc>, block_commitment_cache: Arc>, @@ -597,7 +630,8 @@ impl RpcSubscriptions { } match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { Ok(notification_entry) => { - match notification_entry { + let TimestampedNotificationEntry { entry, queued_at } = notification_entry; + match entry { NotificationEntry::Subscribed(params, id) => { subscriptions.subscribe(params.clone(), id, || { initial_last_notified_slot( @@ -664,7 +698,7 @@ impl RpcSubscriptions { subscriptions.commitment_watchers(), &bank_forks, &commitment_slots, - &mut notifier, + ¬ifier, "bank", ) } @@ -678,7 +712,7 @@ impl RpcSubscriptions { subscriptions.gossip_watchers(), &bank_forks, &commitment_slots, - &mut notifier, + ¬ifier, "gossip", ) } @@ -710,6 +744,14 @@ impl RpcSubscriptions { } } } + datapoint_info!( + "pubsub_notification_entries", + ( + "notification_entry_processing_time_us", + queued_at.elapsed().as_micros() as i64, + i64 + ) + ); } Err(RecvTimeoutError::Timeout) => { // not a problem - try reading again @@ -726,23 +768,24 @@ impl RpcSubscriptions { subscriptions: &HashMap>, bank_forks: &Arc>, commitment_slots: &CommitmentSlots, - notifier: &mut RpcNotifier, + notifier: &RpcNotifier, source: &'static str, ) { let mut total_time = Measure::start("notify_accounts_logs_programs_signatures"); - let mut num_accounts_found = 0; - let mut num_accounts_notified = 0; + let num_accounts_found = AtomicUsize::new(0); + let num_accounts_notified = AtomicUsize::new(0); - let mut num_logs_found = 0; - let mut num_logs_notified = 0; + let num_logs_found = AtomicUsize::new(0); + let num_logs_notified = AtomicUsize::new(0); - let mut num_signatures_found = 0; - let mut num_signatures_notified = 0; + let num_signatures_found = AtomicUsize::new(0); + let num_signatures_notified = AtomicUsize::new(0); - let mut num_programs_found = 0; - let mut num_programs_notified = 0; + let num_programs_found = AtomicUsize::new(0); + let num_programs_notified = AtomicUsize::new(0); - for subscription in subscriptions.values() { + let subscriptions = subscriptions.into_par_iter(); + subscriptions.for_each(|(_id, subscription)| { match subscription.params() { SubscriptionParams::Account(params) => { let notified = check_commitment_and_notify( @@ -756,10 +799,10 @@ impl RpcSubscriptions { false, ); - num_accounts_found += 1; + num_accounts_found.fetch_add(1, Ordering::Relaxed); if notified { - num_accounts_notified += 1; + num_accounts_notified.fetch_add(1, Ordering::Relaxed); } } SubscriptionParams::Logs(params) => { @@ -773,10 +816,10 @@ impl RpcSubscriptions { notifier, false, ); - num_logs_found += 1; + num_logs_found.fetch_add(1, Ordering::Relaxed); if notified { - num_logs_notified += 1; + num_logs_notified.fetch_add(1, Ordering::Relaxed); } } SubscriptionParams::Program(params) => { @@ -792,10 +835,10 @@ impl RpcSubscriptions { notifier, false, ); - num_programs_found += 1; + num_programs_found.fetch_add(1, Ordering::Relaxed); if notified { - num_programs_notified += 1; + num_programs_notified.fetch_add(1, Ordering::Relaxed); } } SubscriptionParams::Signature(params) => { @@ -811,65 +854,97 @@ impl RpcSubscriptions { notifier, true, // Unsubscribe. ); - num_signatures_found += 1; + num_signatures_found.fetch_add(1, Ordering::Relaxed); if notified { - num_signatures_notified += 1; + num_signatures_notified.fetch_add(1, Ordering::Relaxed); } } _ => error!("wrong subscription type in alps map"), } - } + }); total_time.stop(); - let total_notified = num_accounts_notified - + num_logs_notified - + num_programs_notified - + num_signatures_notified; + let total_notified = num_accounts_notified.load(Ordering::Relaxed) + + num_logs_notified.load(Ordering::Relaxed) + + num_programs_notified.load(Ordering::Relaxed) + + num_signatures_notified.load(Ordering::Relaxed); let total_ms = total_time.as_ms(); if total_notified > 0 || total_ms > 10 { debug!( "notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / {}", source, - num_accounts_found, - num_accounts_notified, - num_logs_found, - num_logs_notified, - num_programs_found, - num_programs_notified, - num_signatures_found, - num_signatures_notified, + num_accounts_found.load(Ordering::Relaxed), + num_accounts_notified.load(Ordering::Relaxed), + num_logs_found.load(Ordering::Relaxed), + num_logs_notified.load(Ordering::Relaxed), + num_programs_found.load(Ordering::Relaxed), + num_programs_notified.load(Ordering::Relaxed), + num_signatures_found.load(Ordering::Relaxed), + num_signatures_notified.load(Ordering::Relaxed), ); inc_new_counter_info!("rpc-subscription-notify-bank-or-gossip", total_notified); datapoint_info!( "rpc_subscriptions", ("source", source.to_string(), String), - ("num_account_subscriptions", num_accounts_found, i64), - ("num_account_pubkeys_notified", num_accounts_notified, i64), - ("num_logs_subscriptions", num_logs_found, i64), - ("num_logs_notified", num_logs_notified, i64), - ("num_program_subscriptions", num_programs_found, i64), - ("num_programs_notified", num_programs_notified, i64), - ("num_signature_subscriptions", num_signatures_found, i64), - ("num_signatures_notified", num_signatures_notified, i64), + ( + "num_account_subscriptions", + num_accounts_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_account_pubkeys_notified", + num_accounts_notified.load(Ordering::Relaxed), + i64 + ), + ( + "num_logs_subscriptions", + num_logs_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_logs_notified", + num_logs_notified.load(Ordering::Relaxed), + i64 + ), + ( + "num_program_subscriptions", + num_programs_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_programs_notified", + num_programs_notified.load(Ordering::Relaxed), + i64 + ), + ( + "num_signature_subscriptions", + num_signatures_found.load(Ordering::Relaxed), + i64 + ), + ( + "num_signatures_notified", + num_signatures_notified.load(Ordering::Relaxed), + i64 + ), ("notifications_time", total_time.as_us() as i64, i64), ); inc_new_counter_info!( "rpc-subscription-counter-num_accounts_notified", - num_accounts_notified + num_accounts_notified.load(Ordering::Relaxed) ); inc_new_counter_info!( "rpc-subscription-counter-num_logs_notified", - num_logs_notified + num_logs_notified.load(Ordering::Relaxed) ); inc_new_counter_info!( "rpc-subscription-counter-num_programs_notified", - num_programs_notified + num_programs_notified.load(Ordering::Relaxed) ); inc_new_counter_info!( "rpc-subscription-counter-num_signatures_notified", - num_signatures_notified + num_signatures_notified.load(Ordering::Relaxed) ); } } diff --git a/validator/src/main.rs b/validator/src/main.rs index 368fa2d6a9..26a5694837 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1630,6 +1630,15 @@ pub fn main() { .default_value("30") .help("Number of seconds before timing out RPC requests backed by BigTable"), ) + .arg( + Arg::with_name("rpc_pubsub_worker_threads") + .long("rpc-pubsub-worker-threads") + .takes_value(true) + .value_name("NUMBER") + .validator(is_parsable::) + .default_value("4") + .help("PubSub worker threads"), + ) .arg( Arg::with_name("rpc_pubsub_enable_vote_subscription") .long("rpc-pubsub-enable-vote-subscription") @@ -1707,6 +1716,15 @@ pub fn main() { .help("The maximum total size of notifications that RPC PubSub will store \ across all connections."), ) + .arg( + Arg::with_name("rpc_pubsub_notification_threads") + .long("rpc-pubsub-notification-threads") + .takes_value(true) + .value_name("NUM_THREADS") + .validator(is_parsable::) + .help("The maximum number of threads that RPC PubSub will use \ + for generating notifications."), + ) .arg( Arg::with_name("rpc_send_transaction_retry_ms") .long("rpc-send-retry-ms") @@ -2429,6 +2447,8 @@ pub fn main() { "rpc_pubsub_queue_capacity_bytes", usize ), + worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize), + notification_threads: value_of(&matches, "rpc_pubsub_notification_threads"), }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),