Trigger RPC notifications after block commitment cache update (#10077)
* Fixup commitment-aggregation metric * Trigger notifications after commitment-cache update * Fixup fn name * Add single-confirmation commitment level * Rename to highest_confirmed_slot * Pass commitment-cache info directly to notifications * Use match * Update commitment docs * Update out of date pubsub docs
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
//! The `pubsub` module implements a threaded subscription service on client RPC request
|
||||
|
||||
use crate::commitment::BlockCommitmentCache;
|
||||
use crate::commitment::{BlockCommitmentCache, CacheSlotInfo};
|
||||
use core::hash::Hash;
|
||||
use jsonrpc_core::futures::Future;
|
||||
use jsonrpc_pubsub::{
|
||||
@@ -56,7 +56,7 @@ enum NotificationEntry {
|
||||
Slot(SlotInfo),
|
||||
Vote(Vote),
|
||||
Root(Slot),
|
||||
Bank(Slot),
|
||||
Bank(CacheSlotInfo),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NotificationEntry {
|
||||
@@ -65,9 +65,11 @@ impl std::fmt::Debug for NotificationEntry {
|
||||
NotificationEntry::Root(root) => write!(f, "Root({})", root),
|
||||
NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote),
|
||||
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
|
||||
NotificationEntry::Bank(current_slot) => {
|
||||
write!(f, "Bank({{current_slot: {:?}}})", current_slot)
|
||||
}
|
||||
NotificationEntry::Bank(cache_slot_info) => write!(
|
||||
f,
|
||||
"Bank({{current_slot: {:?}}})",
|
||||
cache_slot_info.current_slot
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -142,7 +144,7 @@ fn check_commitment_and_notify<K, S, B, F, X>(
|
||||
subscriptions: &HashMap<K, HashMap<SubscriptionId, SubscriptionData<Response<S>>>>,
|
||||
hashmap_key: &K,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
|
||||
cache_slot_info: &CacheSlotInfo,
|
||||
bank_method: B,
|
||||
filter_results: F,
|
||||
notifier: &RpcNotifier,
|
||||
@@ -154,12 +156,6 @@ where
|
||||
F: Fn(X, Slot) -> (Box<dyn Iterator<Item = S>>, Slot),
|
||||
X: Clone + Serialize + Default,
|
||||
{
|
||||
let r_block_commitment_cache = block_commitment_cache.read().unwrap();
|
||||
let current_slot = r_block_commitment_cache.slot();
|
||||
let node_root = r_block_commitment_cache.root();
|
||||
let largest_confirmed_root = r_block_commitment_cache.largest_confirmed_root();
|
||||
drop(r_block_commitment_cache);
|
||||
|
||||
let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
|
||||
if let Some(hashmap) = subscriptions.get(hashmap_key) {
|
||||
for (
|
||||
@@ -172,9 +168,10 @@ where
|
||||
) in hashmap.iter()
|
||||
{
|
||||
let slot = match commitment.commitment {
|
||||
CommitmentLevel::Max => largest_confirmed_root,
|
||||
CommitmentLevel::Recent => current_slot,
|
||||
CommitmentLevel::Root => node_root,
|
||||
CommitmentLevel::Max => cache_slot_info.largest_confirmed_root,
|
||||
CommitmentLevel::Recent => cache_slot_info.current_slot,
|
||||
CommitmentLevel::Root => cache_slot_info.node_root,
|
||||
CommitmentLevel::Single => cache_slot_info.highest_confirmed_slot,
|
||||
};
|
||||
let results = {
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
@@ -332,7 +329,6 @@ impl RpcSubscriptions {
|
||||
notification_receiver,
|
||||
_subscriptions,
|
||||
_bank_forks,
|
||||
_block_commitment_cache,
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
@@ -364,16 +360,16 @@ impl RpcSubscriptions {
|
||||
fn check_account(
|
||||
pubkey: &Pubkey,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
|
||||
account_subscriptions: Arc<RpcAccountSubscriptions>,
|
||||
notifier: &RpcNotifier,
|
||||
cache_slot_info: &CacheSlotInfo,
|
||||
) {
|
||||
let subscriptions = account_subscriptions.read().unwrap();
|
||||
check_commitment_and_notify(
|
||||
&subscriptions,
|
||||
pubkey,
|
||||
bank_forks,
|
||||
block_commitment_cache,
|
||||
cache_slot_info,
|
||||
Bank::get_account_modified_slot,
|
||||
filter_account_result,
|
||||
notifier,
|
||||
@@ -383,16 +379,16 @@ impl RpcSubscriptions {
|
||||
fn check_program(
|
||||
program_id: &Pubkey,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
|
||||
program_subscriptions: Arc<RpcProgramSubscriptions>,
|
||||
notifier: &RpcNotifier,
|
||||
cache_slot_info: &CacheSlotInfo,
|
||||
) {
|
||||
let subscriptions = program_subscriptions.read().unwrap();
|
||||
check_commitment_and_notify(
|
||||
&subscriptions,
|
||||
program_id,
|
||||
bank_forks,
|
||||
block_commitment_cache,
|
||||
cache_slot_info,
|
||||
Bank::get_program_accounts_modified_since_parent,
|
||||
filter_program_results,
|
||||
notifier,
|
||||
@@ -402,16 +398,16 @@ impl RpcSubscriptions {
|
||||
fn check_signature(
|
||||
signature: &Signature,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
|
||||
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
|
||||
notifier: &RpcNotifier,
|
||||
cache_slot_info: &CacheSlotInfo,
|
||||
) {
|
||||
let mut subscriptions = signature_subscriptions.write().unwrap();
|
||||
let notified_ids = check_commitment_and_notify(
|
||||
&subscriptions,
|
||||
signature,
|
||||
bank_forks,
|
||||
block_commitment_cache,
|
||||
cache_slot_info,
|
||||
Bank::get_signature_status_processed_since_parent,
|
||||
filter_signature_result,
|
||||
notifier,
|
||||
@@ -443,6 +439,11 @@ impl RpcSubscriptions {
|
||||
.largest_confirmed_root(),
|
||||
CommitmentLevel::Recent => self.block_commitment_cache.read().unwrap().slot(),
|
||||
CommitmentLevel::Root => self.block_commitment_cache.read().unwrap().root(),
|
||||
CommitmentLevel::Single => self
|
||||
.block_commitment_cache
|
||||
.read()
|
||||
.unwrap()
|
||||
.highest_confirmed_slot(),
|
||||
};
|
||||
let last_notified_slot = if let Some((_account, slot)) = self
|
||||
.bank_forks
|
||||
@@ -518,8 +519,8 @@ impl RpcSubscriptions {
|
||||
|
||||
/// Notify subscribers of changes to any accounts or new signatures since
|
||||
/// the bank's last checkpoint.
|
||||
pub fn notify_subscribers(&self, current_slot: Slot) {
|
||||
self.enqueue_notification(NotificationEntry::Bank(current_slot));
|
||||
pub fn notify_subscribers(&self, cache_slot_info: CacheSlotInfo) {
|
||||
self.enqueue_notification(NotificationEntry::Bank(cache_slot_info));
|
||||
}
|
||||
|
||||
pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<SlotInfo>) {
|
||||
@@ -593,7 +594,6 @@ impl RpcSubscriptions {
|
||||
notification_receiver: Receiver<NotificationEntry>,
|
||||
subscriptions: Subscriptions,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||
) {
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
@@ -626,7 +626,7 @@ impl RpcSubscriptions {
|
||||
notifier.notify(root, sink);
|
||||
}
|
||||
}
|
||||
NotificationEntry::Bank(_current_slot) => {
|
||||
NotificationEntry::Bank(cache_slot_info) => {
|
||||
let pubkeys: Vec<_> = {
|
||||
let subs = subscriptions.account_subscriptions.read().unwrap();
|
||||
subs.keys().cloned().collect()
|
||||
@@ -635,9 +635,9 @@ impl RpcSubscriptions {
|
||||
Self::check_account(
|
||||
pubkey,
|
||||
&bank_forks,
|
||||
&block_commitment_cache,
|
||||
subscriptions.account_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -649,9 +649,9 @@ impl RpcSubscriptions {
|
||||
Self::check_program(
|
||||
program_id,
|
||||
&bank_forks,
|
||||
&block_commitment_cache,
|
||||
subscriptions.program_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -663,9 +663,9 @@ impl RpcSubscriptions {
|
||||
Self::check_signature(
|
||||
signature,
|
||||
&bank_forks,
|
||||
&block_commitment_cache,
|
||||
subscriptions.signature_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -802,7 +802,9 @@ pub(crate) mod tests {
|
||||
.unwrap()
|
||||
.process_transaction(&tx)
|
||||
.unwrap();
|
||||
subscriptions.notify_subscribers(1);
|
||||
let mut cache_slot_info = CacheSlotInfo::default();
|
||||
cache_slot_info.current_slot = 1;
|
||||
subscriptions.notify_subscribers(cache_slot_info);
|
||||
let (response, _) = robust_poll_or_panic(transport_receiver);
|
||||
let expected = json!({
|
||||
"jsonrpc": "2.0",
|
||||
@@ -887,7 +889,7 @@ pub(crate) mod tests {
|
||||
.unwrap()
|
||||
.contains_key(&solana_budget_program::id()));
|
||||
|
||||
subscriptions.notify_subscribers(0);
|
||||
subscriptions.notify_subscribers(CacheSlotInfo::default());
|
||||
let (response, _) = robust_poll_or_panic(transport_receiver);
|
||||
let expected = json!({
|
||||
"jsonrpc": "2.0",
|
||||
@@ -969,7 +971,7 @@ pub(crate) mod tests {
|
||||
block_commitment.entry(0).or_insert(cache0);
|
||||
block_commitment.entry(1).or_insert(cache1);
|
||||
let block_commitment_cache =
|
||||
BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0);
|
||||
BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0, 0);
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let subscriptions = RpcSubscriptions::new(
|
||||
@@ -1020,8 +1022,9 @@ pub(crate) mod tests {
|
||||
assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0]));
|
||||
assert!(sig_subs.contains_key(&processed_tx.signatures[0]));
|
||||
}
|
||||
|
||||
subscriptions.notify_subscribers(1);
|
||||
let mut cache_slot_info = CacheSlotInfo::default();
|
||||
cache_slot_info.current_slot = 1;
|
||||
subscriptions.notify_subscribers(cache_slot_info);
|
||||
let expected_res = RpcSignatureResult { err: None };
|
||||
|
||||
struct Notification {
|
||||
|
Reference in New Issue
Block a user