Parallel notifications in RPC PubSub (#20543)

* generate rpc notifications in parallel

* Use multithreaded runtime for pubsub

* add metric for time since creation of rpc notification to queue

* measure notification entry processing

* fix: add n_threads config argument

* configure rayon thread pool for rpc notifications

* add config option for pubsub notification threads

* rename metric to created_to_queue_time_us

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* rename metric to notification_entry_processing_time_us

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* use value_of for rpc_pubsub_notification_threads parsing

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* rename threads to sol-sub-notif-N

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* fix rpc tests for TimestampedNotificationEntry

* rustfmt

* use rayon thread limit for rpc

Co-authored-by: Pavel Strakhov <p.strakhov@iconic.vc>
Co-authored-by: Alexander Polakov <a.polakov@zubr.io>
Co-authored-by: Nikita Podoliako <bananaelecitrus@gmail.com>
Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
Pavel Strakhov
2021-11-01 09:17:24 +03:00
committed by GitHub
parent 484ead01ed
commit 9fabff5129
6 changed files with 225 additions and 106 deletions

24
Cargo.lock generated
View File

@ -94,9 +94,9 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.7.1" version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4dc07131ffa69b8072d35f5007352af944213cde02545e2103680baed38fcd" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]] [[package]]
name = "ascii" name = "ascii"
@ -335,7 +335,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2607a74355ce2e252d0c483b2d8a348e1bba36036e786ccc2dcd777213c86ffd" checksum = "2607a74355ce2e252d0c483b2d8a348e1bba36036e786ccc2dcd777213c86ffd"
dependencies = [ dependencies = [
"arrayref", "arrayref",
"arrayvec 0.7.1", "arrayvec 0.7.2",
"cc", "cc",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"constant_time_eq", "constant_time_eq",
@ -2807,9 +2807,9 @@ dependencies = [
[[package]] [[package]]
name = "parity-ws" name = "parity-ws"
version = "0.11.0" version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ab8a461779bd022964cae2b4989fa9c99deb270bec162da2125ec03c09fcaa" checksum = "5983d3929ad50f12c3eb9a6743f19d691866ecd44da74c0a3308c3f8a56df0c6"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"bytes 0.4.12", "bytes 0.4.12",
@ -5505,6 +5505,7 @@ dependencies = [
"jsonrpc-ws-server", "jsonrpc-ws-server",
"libc", "libc",
"log 0.4.14", "log 0.4.14",
"rayon",
"regex", "regex",
"serde", "serde",
"serde_derive", "serde_derive",
@ -5522,6 +5523,7 @@ dependencies = [
"solana-net-utils", "solana-net-utils",
"solana-perf", "solana-perf",
"solana-poh", "solana-poh",
"solana-rayon-threadlimit",
"solana-runtime", "solana-runtime",
"solana-sdk", "solana-sdk",
"solana-send-transaction-service", "solana-send-transaction-service",
@ -6574,9 +6576,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "1.5.0" version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2dd85aeaba7b68df939bd357c6afb36c87951be9e80bf9c859f2fc3e9fca0fd" checksum = "114383b041aa6212c579467afa0075fbbdd0718de036100bc0ba7961d8cb9095"
dependencies = [ dependencies = [
"proc-macro2 1.0.32", "proc-macro2 1.0.32",
"quote 1.0.10", "quote 1.0.10",
@ -6595,9 +6597,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-postgres" name = "tokio-postgres"
version = "0.7.4" version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5c07a6ceeeb8515d53998ac4487788a21884e79d5651490bc31a7289f20a7d7" checksum = "4b6c8b33df661b548dcd8f9bf87debb8c56c05657ed291122e1188698c2ece95"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"byteorder", "byteorder",
@ -6709,9 +6711,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.6.8" version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd" checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.1.0",
"futures-core", "futures-core",

View File

@ -24,6 +24,7 @@ jsonrpc-pubsub = "18.0.0"
jsonrpc-ws-server = "18.0.0" jsonrpc-ws-server = "18.0.0"
libc = "0.2.105" libc = "0.2.105"
log = "0.4.14" log = "0.4.14"
rayon = "1.5.1"
regex = "1.5.4" regex = "1.5.4"
serde = "1.0.130" serde = "1.0.130"
serde_derive = "1.0.103" serde_derive = "1.0.103"
@ -39,6 +40,7 @@ solana-measure = { path = "../measure", version = "=1.9.0" }
solana-metrics = { path = "../metrics", version = "=1.9.0" } solana-metrics = { path = "../metrics", version = "=1.9.0" }
solana-perf = { path = "../perf", version = "=1.9.0" } solana-perf = { path = "../perf", version = "=1.9.0" }
solana-poh = { path = "../poh", version = "=1.9.0" } solana-poh = { path = "../poh", version = "=1.9.0" }
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.9.0" }
solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-runtime = { path = "../runtime", version = "=1.9.0" }
solana-sdk = { path = "../sdk", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" }
solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.9.0" } solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.9.0" }

View File

@ -29,6 +29,7 @@ pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 1_000_000;
pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000_000; pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000_000;
pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100; pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100;
pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 * 1024 * 1024; pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 * 1024 * 1024;
pub const DEFAULT_WORKER_THREADS: usize = 1;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PubSubConfig { pub struct PubSubConfig {
@ -36,6 +37,8 @@ pub struct PubSubConfig {
pub max_active_subscriptions: usize, pub max_active_subscriptions: usize,
pub queue_capacity_items: usize, pub queue_capacity_items: usize,
pub queue_capacity_bytes: usize, pub queue_capacity_bytes: usize,
pub worker_threads: usize,
pub notification_threads: Option<usize>,
} }
impl Default for PubSubConfig { impl Default for PubSubConfig {
@ -45,6 +48,8 @@ impl Default for PubSubConfig {
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS,
queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
worker_threads: DEFAULT_WORKER_THREADS,
notification_threads: None,
} }
} }
} }
@ -56,6 +61,8 @@ impl PubSubConfig {
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS, queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
worker_threads: DEFAULT_WORKER_THREADS,
notification_threads: Some(2),
} }
} }
} }
@ -77,7 +84,8 @@ impl PubSubService {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-pubsub".to_string()) .name("solana-pubsub".to_string())
.spawn(move || { .spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread() let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(pubsub_config.worker_threads)
.enable_all() .enable_all()
.build() .build()
.expect("runtime creation failed"); .expect("runtime creation failed");
@ -145,6 +153,17 @@ impl BroadcastHandler {
{ {
count_final(entry.get().params()); count_final(entry.get().params());
let time_since_created = notification.created_at.elapsed();
datapoint_info!(
"pubsub_notifications",
(
"created_to_queue_time_us",
time_since_created.as_micros() as i64,
i64
),
);
if notification.is_final { if notification.is_final {
entry.remove(); entry.remove();
} }

View File

@ -1,5 +1,5 @@
use { use {
crate::rpc_subscriptions::{NotificationEntry, RpcNotification}, crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry},
dashmap::{mapref::entry::Entry as DashEntry, DashMap}, dashmap::{mapref::entry::Entry as DashEntry, DashMap},
solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig}, solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
solana_client::rpc_filter::RpcFilterType, solana_client::rpc_filter::RpcFilterType,
@ -164,7 +164,7 @@ struct SubscriptionControlInner {
subscriptions: DashMap<SubscriptionParams, Weak<SubscriptionTokenInner>>, subscriptions: DashMap<SubscriptionParams, Weak<SubscriptionTokenInner>>,
next_id: AtomicU64, next_id: AtomicU64,
max_active_subscriptions: usize, max_active_subscriptions: usize,
sender: crossbeam_channel::Sender<NotificationEntry>, sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
broadcast_sender: broadcast::Sender<RpcNotification>, broadcast_sender: broadcast::Sender<RpcNotification>,
counter: TokenCounter, counter: TokenCounter,
} }
@ -172,7 +172,7 @@ struct SubscriptionControlInner {
impl SubscriptionControl { impl SubscriptionControl {
pub fn new( pub fn new(
max_active_subscriptions: usize, max_active_subscriptions: usize,
sender: crossbeam_channel::Sender<NotificationEntry>, sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
broadcast_sender: broadcast::Sender<RpcNotification>, broadcast_sender: broadcast::Sender<RpcNotification>,
) -> Self { ) -> Self {
Self(Arc::new(SubscriptionControlInner { Self(Arc::new(SubscriptionControlInner {
@ -220,7 +220,7 @@ impl SubscriptionControl {
let _ = self let _ = self
.0 .0
.sender .sender
.send(NotificationEntry::Subscribed(token.0.params.clone(), id)); .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
entry.insert(Arc::downgrade(&token.0)); entry.insert(Arc::downgrade(&token.0));
datapoint_info!( datapoint_info!(
"rpc-subscription", "rpc-subscription",
@ -506,10 +506,10 @@ impl Drop for SubscriptionTokenInner {
warn!("Subscriptions inconsistency (missing entry in by_params)"); warn!("Subscriptions inconsistency (missing entry in by_params)");
} }
DashEntry::Occupied(entry) => { DashEntry::Occupied(entry) => {
let _ = self.control.sender.send(NotificationEntry::Unsubscribed( let _ = self
self.params.clone(), .control
self.id, .sender
)); .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into());
entry.remove(); entry.remove();
datapoint_info!( datapoint_info!(
"rpc-subscription", "rpc-subscription",
@ -543,7 +543,7 @@ mod tests {
struct ControlWrapper { struct ControlWrapper {
control: SubscriptionControl, control: SubscriptionControl,
receiver: crossbeam_channel::Receiver<NotificationEntry>, receiver: crossbeam_channel::Receiver<TimestampedNotificationEntry>,
} }
impl ControlWrapper { impl ControlWrapper {
@ -560,7 +560,7 @@ mod tests {
} }
fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) { fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap() { if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap().entry {
assert_eq!(&params, expected_params); assert_eq!(&params, expected_params);
assert_eq!(id, SubscriptionId::from(expected_id)); assert_eq!(id, SubscriptionId::from(expected_id));
} else { } else {
@ -570,7 +570,8 @@ mod tests {
} }
fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) { fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap() { if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap().entry
{
assert_eq!(&params, expected_params); assert_eq!(&params, expected_params);
assert_eq!(id, SubscriptionId::from(expected_id)); assert_eq!(id, SubscriptionId::from(expected_id));
} else { } else {

View File

@ -12,6 +12,7 @@ use {
}, },
}, },
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::prelude::*,
serde::Serialize, serde::Serialize,
solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding}, solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding},
solana_client::{ solana_client::{
@ -22,6 +23,7 @@ use {
}, },
}, },
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{ solana_runtime::{
bank::{Bank, TransactionLogInfo}, bank::{Bank, TransactionLogInfo},
bank_forks::BankForks, bank_forks::BankForks,
@ -37,15 +39,17 @@ use {
}, },
solana_vote_program::vote_state::Vote, solana_vote_program::vote_state::Vote,
std::{ std::{
cell::RefCell,
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
io::Cursor, io::Cursor,
iter, str, iter, str,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock, Weak, Arc, Mutex, RwLock, Weak,
}, },
thread::{Builder, JoinHandle}, thread::{Builder, JoinHandle},
time::Duration, time::Duration,
time::Instant,
}, },
tokio::sync::broadcast, tokio::sync::broadcast,
}; };
@ -69,6 +73,21 @@ fn get_transaction_logs(
} }
logs logs
} }
#[derive(Debug)]
pub struct TimestampedNotificationEntry {
pub entry: NotificationEntry,
pub queued_at: Instant,
}
impl From<NotificationEntry> for TimestampedNotificationEntry {
fn from(entry: NotificationEntry) -> Self {
TimestampedNotificationEntry {
entry,
queued_at: Instant::now(),
}
}
}
pub enum NotificationEntry { pub enum NotificationEntry {
Slot(SlotInfo), Slot(SlotInfo),
SlotUpdate(SlotUpdate), SlotUpdate(SlotUpdate),
@ -115,7 +134,7 @@ fn check_commitment_and_notify<P, S, B, F, X>(
commitment_slots: &CommitmentSlots, commitment_slots: &CommitmentSlots,
bank_method: B, bank_method: B,
filter_results: F, filter_results: F,
notifier: &mut RpcNotifier, notifier: &RpcNotifier,
is_final: bool, is_final: bool,
) -> bool ) -> bool
where where
@ -165,6 +184,7 @@ pub struct RpcNotification {
pub subscription_id: SubscriptionId, pub subscription_id: SubscriptionId,
pub is_final: bool, pub is_final: bool,
pub json: Weak<String>, pub json: Weak<String>,
pub created_at: Instant,
} }
struct RecentItems { struct RecentItems {
@ -209,8 +229,11 @@ impl RecentItems {
struct RpcNotifier { struct RpcNotifier {
sender: broadcast::Sender<RpcNotification>, sender: broadcast::Sender<RpcNotification>,
buf: Vec<u8>, recent_items: Mutex<RecentItems>,
recent_items: RecentItems, }
thread_local! {
static RPC_NOTIFIER_BUF: RefCell<Vec<u8>> = RefCell::new(Vec::new());
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -227,28 +250,32 @@ struct Notification<T> {
} }
impl RpcNotifier { impl RpcNotifier {
fn notify<T>(&mut self, value: T, subscription: &SubscriptionInfo, is_final: bool) fn notify<T>(&self, value: T, subscription: &SubscriptionInfo, is_final: bool)
where where
T: serde::Serialize, T: serde::Serialize,
{ {
self.buf.clear(); let buf_arc = RPC_NOTIFIER_BUF.with(|buf| {
let notification = Notification { let mut buf = buf.borrow_mut();
jsonrpc: Some(jsonrpc_core::Version::V2), buf.clear();
method: subscription.method(), let notification = Notification {
params: NotificationParams { jsonrpc: Some(jsonrpc_core::Version::V2),
result: value, method: subscription.method(),
subscription: subscription.id(), params: NotificationParams {
}, result: value,
}; subscription: subscription.id(),
serde_json::to_writer(Cursor::new(&mut self.buf), &notification) },
.expect("serialization never fails"); };
let buf_str = str::from_utf8(&self.buf).expect("json is always utf-8"); serde_json::to_writer(Cursor::new(&mut *buf), &notification)
let buf_arc = Arc::new(String::from(buf_str)); .expect("serialization never fails");
let buf_str = str::from_utf8(&buf).expect("json is always utf-8");
Arc::new(String::from(buf_str))
});
let notification = RpcNotification { let notification = RpcNotification {
subscription_id: subscription.id(), subscription_id: subscription.id(),
json: Arc::downgrade(&buf_arc), json: Arc::downgrade(&buf_arc),
is_final, is_final,
created_at: Instant::now(),
}; };
// There is an unlikely case where this can fail: if the last subscription is closed // There is an unlikely case where this can fail: if the last subscription is closed
// just as the notifier generates a notification for it. // just as the notifier generates a notification for it.
@ -257,7 +284,7 @@ impl RpcNotifier {
inc_new_counter_info!("rpc-pubsub-messages", 1); inc_new_counter_info!("rpc-pubsub-messages", 1);
inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len()); inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());
self.recent_items.push(buf_arc); self.recent_items.lock().unwrap().push(buf_arc);
} }
} }
@ -405,8 +432,7 @@ fn initial_last_notified_slot(
} }
pub struct RpcSubscriptions { pub struct RpcSubscriptions {
notification_sender: Sender<NotificationEntry>, notification_sender: Sender<TimestampedNotificationEntry>,
t_cleanup: Option<JoinHandle<()>>, t_cleanup: Option<JoinHandle<()>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -468,24 +494,31 @@ impl RpcSubscriptions {
let notifier = RpcNotifier { let notifier = RpcNotifier {
sender: broadcast_sender.clone(), sender: broadcast_sender.clone(),
buf: Vec::new(), recent_items: Mutex::new(RecentItems::new(
recent_items: RecentItems::new(
config.queue_capacity_items, config.queue_capacity_items,
config.queue_capacity_bytes, config.queue_capacity_bytes,
), )),
}; };
let notification_threads = config.notification_threads;
let t_cleanup = Builder::new() let t_cleanup = Builder::new()
.name("solana-rpc-notifications".to_string()) .name("solana-rpc-notifications".to_string())
.spawn(move || { .spawn(move || {
Self::process_notifications( let pool = rayon::ThreadPoolBuilder::new()
exit_clone, .num_threads(notification_threads.unwrap_or_else(get_thread_count))
notifier, .thread_name(|i| format!("sol-sub-notif-{}", i))
notification_receiver, .build()
subscriptions, .unwrap();
bank_forks, pool.install(|| {
block_commitment_cache, Self::process_notifications(
optimistically_confirmed_bank, exit_clone,
); notifier,
notification_receiver,
subscriptions,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
)
});
}) })
.unwrap(); .unwrap();
@ -565,7 +598,7 @@ impl RpcSubscriptions {
} }
fn enqueue_notification(&self, notification_entry: NotificationEntry) { fn enqueue_notification(&self, notification_entry: NotificationEntry) {
match self.notification_sender.send(notification_entry) { match self.notification_sender.send(notification_entry.into()) {
Ok(()) => (), Ok(()) => (),
Err(SendError(notification)) => { Err(SendError(notification)) => {
warn!( warn!(
@ -578,8 +611,8 @@ impl RpcSubscriptions {
fn process_notifications( fn process_notifications(
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
mut notifier: RpcNotifier, notifier: RpcNotifier,
notification_receiver: Receiver<NotificationEntry>, notification_receiver: Receiver<TimestampedNotificationEntry>,
mut subscriptions: SubscriptionsTracker, mut subscriptions: SubscriptionsTracker,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
@ -591,7 +624,8 @@ impl RpcSubscriptions {
} }
match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
Ok(notification_entry) => { Ok(notification_entry) => {
match notification_entry { let TimestampedNotificationEntry { entry, queued_at } = notification_entry;
match entry {
NotificationEntry::Subscribed(params, id) => { NotificationEntry::Subscribed(params, id) => {
subscriptions.subscribe(params.clone(), id, || { subscriptions.subscribe(params.clone(), id, || {
initial_last_notified_slot( initial_last_notified_slot(
@ -658,7 +692,7 @@ impl RpcSubscriptions {
subscriptions.commitment_watchers(), subscriptions.commitment_watchers(),
&bank_forks, &bank_forks,
&commitment_slots, &commitment_slots,
&mut notifier, &notifier,
"bank", "bank",
) )
} }
@ -672,7 +706,7 @@ impl RpcSubscriptions {
subscriptions.gossip_watchers(), subscriptions.gossip_watchers(),
&bank_forks, &bank_forks,
&commitment_slots, &commitment_slots,
&mut notifier, &notifier,
"gossip", "gossip",
) )
} }
@ -704,6 +738,14 @@ impl RpcSubscriptions {
} }
} }
} }
datapoint_info!(
"pubsub_notification_entries",
(
"notification_entry_processing_time_us",
queued_at.elapsed().as_micros() as i64,
i64
)
);
} }
Err(RecvTimeoutError::Timeout) => { Err(RecvTimeoutError::Timeout) => {
// not a problem - try reading again // not a problem - try reading again
@ -720,23 +762,24 @@ impl RpcSubscriptions {
subscriptions: &HashMap<SubscriptionId, Arc<SubscriptionInfo>>, subscriptions: &HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
commitment_slots: &CommitmentSlots, commitment_slots: &CommitmentSlots,
notifier: &mut RpcNotifier, notifier: &RpcNotifier,
source: &'static str, source: &'static str,
) { ) {
let mut total_time = Measure::start("notify_accounts_logs_programs_signatures"); let mut total_time = Measure::start("notify_accounts_logs_programs_signatures");
let mut num_accounts_found = 0; let num_accounts_found = AtomicUsize::new(0);
let mut num_accounts_notified = 0; let num_accounts_notified = AtomicUsize::new(0);
let mut num_logs_found = 0; let num_logs_found = AtomicUsize::new(0);
let mut num_logs_notified = 0; let num_logs_notified = AtomicUsize::new(0);
let mut num_signatures_found = 0; let num_signatures_found = AtomicUsize::new(0);
let mut num_signatures_notified = 0; let num_signatures_notified = AtomicUsize::new(0);
let mut num_programs_found = 0; let num_programs_found = AtomicUsize::new(0);
let mut num_programs_notified = 0; let num_programs_notified = AtomicUsize::new(0);
for subscription in subscriptions.values() { let subscriptions = subscriptions.into_par_iter();
subscriptions.for_each(|(_id, subscription)| {
match subscription.params() { match subscription.params() {
SubscriptionParams::Account(params) => { SubscriptionParams::Account(params) => {
let notified = check_commitment_and_notify( let notified = check_commitment_and_notify(
@ -750,10 +793,10 @@ impl RpcSubscriptions {
false, false,
); );
num_accounts_found += 1; num_accounts_found.fetch_add(1, Ordering::Relaxed);
if notified { if notified {
num_accounts_notified += 1; num_accounts_notified.fetch_add(1, Ordering::Relaxed);
} }
} }
SubscriptionParams::Logs(params) => { SubscriptionParams::Logs(params) => {
@ -767,10 +810,10 @@ impl RpcSubscriptions {
notifier, notifier,
false, false,
); );
num_logs_found += 1; num_logs_found.fetch_add(1, Ordering::Relaxed);
if notified { if notified {
num_logs_notified += 1; num_logs_notified.fetch_add(1, Ordering::Relaxed);
} }
} }
SubscriptionParams::Program(params) => { SubscriptionParams::Program(params) => {
@ -786,10 +829,10 @@ impl RpcSubscriptions {
notifier, notifier,
false, false,
); );
num_programs_found += 1; num_programs_found.fetch_add(1, Ordering::Relaxed);
if notified { if notified {
num_programs_notified += 1; num_programs_notified.fetch_add(1, Ordering::Relaxed);
} }
} }
SubscriptionParams::Signature(params) => { SubscriptionParams::Signature(params) => {
@ -805,65 +848,97 @@ impl RpcSubscriptions {
notifier, notifier,
true, // Unsubscribe. true, // Unsubscribe.
); );
num_signatures_found += 1; num_signatures_found.fetch_add(1, Ordering::Relaxed);
if notified { if notified {
num_signatures_notified += 1; num_signatures_notified.fetch_add(1, Ordering::Relaxed);
} }
} }
_ => error!("wrong subscription type in alps map"), _ => error!("wrong subscription type in alps map"),
} }
} });
total_time.stop(); total_time.stop();
let total_notified = num_accounts_notified let total_notified = num_accounts_notified.load(Ordering::Relaxed)
+ num_logs_notified + num_logs_notified.load(Ordering::Relaxed)
+ num_programs_notified + num_programs_notified.load(Ordering::Relaxed)
+ num_signatures_notified; + num_signatures_notified.load(Ordering::Relaxed);
let total_ms = total_time.as_ms(); let total_ms = total_time.as_ms();
if total_notified > 0 || total_ms > 10 { if total_notified > 0 || total_ms > 10 {
debug!( debug!(
"notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / {}", "notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / {}",
source, source,
num_accounts_found, num_accounts_found.load(Ordering::Relaxed),
num_accounts_notified, num_accounts_notified.load(Ordering::Relaxed),
num_logs_found, num_logs_found.load(Ordering::Relaxed),
num_logs_notified, num_logs_notified.load(Ordering::Relaxed),
num_programs_found, num_programs_found.load(Ordering::Relaxed),
num_programs_notified, num_programs_notified.load(Ordering::Relaxed),
num_signatures_found, num_signatures_found.load(Ordering::Relaxed),
num_signatures_notified, num_signatures_notified.load(Ordering::Relaxed),
); );
inc_new_counter_info!("rpc-subscription-notify-bank-or-gossip", total_notified); inc_new_counter_info!("rpc-subscription-notify-bank-or-gossip", total_notified);
datapoint_info!( datapoint_info!(
"rpc_subscriptions", "rpc_subscriptions",
("source", source.to_string(), String), ("source", source.to_string(), String),
("num_account_subscriptions", num_accounts_found, i64), (
("num_account_pubkeys_notified", num_accounts_notified, i64), "num_account_subscriptions",
("num_logs_subscriptions", num_logs_found, i64), num_accounts_found.load(Ordering::Relaxed),
("num_logs_notified", num_logs_notified, i64), i64
("num_program_subscriptions", num_programs_found, i64), ),
("num_programs_notified", num_programs_notified, i64), (
("num_signature_subscriptions", num_signatures_found, i64), "num_account_pubkeys_notified",
("num_signatures_notified", num_signatures_notified, i64), num_accounts_notified.load(Ordering::Relaxed),
i64
),
(
"num_logs_subscriptions",
num_logs_found.load(Ordering::Relaxed),
i64
),
(
"num_logs_notified",
num_logs_notified.load(Ordering::Relaxed),
i64
),
(
"num_program_subscriptions",
num_programs_found.load(Ordering::Relaxed),
i64
),
(
"num_programs_notified",
num_programs_notified.load(Ordering::Relaxed),
i64
),
(
"num_signature_subscriptions",
num_signatures_found.load(Ordering::Relaxed),
i64
),
(
"num_signatures_notified",
num_signatures_notified.load(Ordering::Relaxed),
i64
),
("notifications_time", total_time.as_us() as i64, i64), ("notifications_time", total_time.as_us() as i64, i64),
); );
inc_new_counter_info!( inc_new_counter_info!(
"rpc-subscription-counter-num_accounts_notified", "rpc-subscription-counter-num_accounts_notified",
num_accounts_notified num_accounts_notified.load(Ordering::Relaxed)
); );
inc_new_counter_info!( inc_new_counter_info!(
"rpc-subscription-counter-num_logs_notified", "rpc-subscription-counter-num_logs_notified",
num_logs_notified num_logs_notified.load(Ordering::Relaxed)
); );
inc_new_counter_info!( inc_new_counter_info!(
"rpc-subscription-counter-num_programs_notified", "rpc-subscription-counter-num_programs_notified",
num_programs_notified num_programs_notified.load(Ordering::Relaxed)
); );
inc_new_counter_info!( inc_new_counter_info!(
"rpc-subscription-counter-num_signatures_notified", "rpc-subscription-counter-num_signatures_notified",
num_signatures_notified num_signatures_notified.load(Ordering::Relaxed)
); );
} }
} }

View File

@ -1179,6 +1179,15 @@ pub fn main() {
.default_value("30") .default_value("30")
.help("Number of seconds before timing out RPC requests backed by BigTable"), .help("Number of seconds before timing out RPC requests backed by BigTable"),
) )
.arg(
Arg::with_name("rpc_pubsub_worker_threads")
.long("rpc-pubsub-worker-threads")
.takes_value(true)
.value_name("NUMBER")
.validator(is_parsable::<usize>)
.default_value("4")
.help("PubSub worker threads"),
)
.arg( .arg(
Arg::with_name("rpc_pubsub_enable_vote_subscription") Arg::with_name("rpc_pubsub_enable_vote_subscription")
.long("rpc-pubsub-enable-vote-subscription") .long("rpc-pubsub-enable-vote-subscription")
@ -1256,6 +1265,15 @@ pub fn main() {
.help("The maximum total size of notifications that RPC PubSub will store \ .help("The maximum total size of notifications that RPC PubSub will store \
across all connections."), across all connections."),
) )
.arg(
Arg::with_name("rpc_pubsub_notification_threads")
.long("rpc-pubsub-notification-threads")
.takes_value(true)
.value_name("NUM_THREADS")
.validator(is_parsable::<usize>)
.help("The maximum number of threads that RPC PubSub will use \
for generating notifications."),
)
.arg( .arg(
Arg::with_name("rpc_send_transaction_retry_ms") Arg::with_name("rpc_send_transaction_retry_ms")
.long("rpc-send-retry-ms") .long("rpc-send-retry-ms")
@ -2194,6 +2212,8 @@ pub fn main() {
"rpc_pubsub_queue_capacity_bytes", "rpc_pubsub_queue_capacity_bytes",
usize usize
), ),
worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize),
notification_threads: value_of(&matches, "rpc_pubsub_notification_threads"),
}, },
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),