diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index aa32ceeb87..58fe4d936e 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -275,6 +275,7 @@ impl ReplayStage { transaction_status_sender.clone(), &verify_recyclers, &mut heaviest_subtree_fork_choice, + &subscriptions, ); Self::report_memory(&allocated, "replay_active_banks", start); @@ -1101,6 +1102,7 @@ impl ReplayStage { transaction_status_sender: Option, verify_recyclers: &VerifyRecyclers, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + subscriptions: &Arc, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1170,6 +1172,7 @@ impl ReplayStage { bank.freeze(); heaviest_subtree_fork_choice .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); + subscriptions.notify_frozen(bank.slot()); } else { trace!( "bank {} not completed tick_height: {}, max_tick_height: {}", diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 4104ee1400..df93120abc 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -64,6 +64,7 @@ enum NotificationEntry { Slot(SlotInfo), Vote(Vote), Root(Slot), + Frozen(Slot), Bank(CacheSlotInfo), Gossip(Slot), } @@ -72,6 +73,7 @@ impl std::fmt::Debug for NotificationEntry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { NotificationEntry::Root(root) => write!(f, "Root({})", root), + NotificationEntry::Frozen(slot) => write!(f, "Frozen({})", slot), NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Bank(cache_slot_info) => write!( @@ -227,6 +229,8 @@ fn filter_account_result( last_notified_slot: Slot, ) -> (Box>, Slot) { if let Some((account, fork)) = result { + // If fork < last_notified_slot this means that we last notified for a fork + // and should notify that the account state has been reverted. if fork != last_notified_slot { return (Box::new(iter::once(RpcAccount::encode(account))), fork); } @@ -647,6 +651,10 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Vote(vote.clone())); } + pub fn notify_frozen(&self, frozen_slot: Slot) { + self.enqueue_notification(NotificationEntry::Frozen(frozen_slot)); + } + pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); @@ -690,6 +698,7 @@ impl RpcSubscriptions { bank_forks: Arc>, last_checked_slots: Arc>>, ) { + let mut pending_gossip_notifications = HashSet::new(); loop { if exit.load(Ordering::Relaxed) { break; @@ -720,6 +729,12 @@ impl RpcSubscriptions { for (_, sink) in subscriptions.iter() { notifier.notify(root, sink); } + + // Prune old pending notifications + pending_gossip_notifications = pending_gossip_notifications + .into_iter() + .filter(|&s| s > root) + .collect(); } NotificationEntry::Bank(cache_slot_info) => { RpcSubscriptions::notify_accounts_programs_signatures( @@ -731,23 +746,36 @@ impl RpcSubscriptions { ¬ifier, ) } + NotificationEntry::Frozen(slot) => { + if pending_gossip_notifications.remove(&slot) { + Self::process_gossip_notification( + slot, + ¬ifier, + &subscriptions, + &bank_forks, + &last_checked_slots, + ); + } + } NotificationEntry::Gossip(slot) => { - let _ = last_checked_slots - .write() + let bank_frozen = bank_forks + .read() .unwrap() - .insert(CommitmentLevel::SingleGossip, slot); - let cache_slot_info = CacheSlotInfo { - highest_confirmed_slot: slot, - ..CacheSlotInfo::default() - }; - RpcSubscriptions::notify_accounts_programs_signatures( - &subscriptions.gossip_account_subscriptions, - &subscriptions.gossip_program_subscriptions, - &subscriptions.gossip_signature_subscriptions, - &bank_forks, - &cache_slot_info, - ¬ifier, - ) + .get(slot) + .filter(|b| b.is_frozen()) + .is_some(); + + if !bank_frozen { + pending_gossip_notifications.insert(slot); + } else { + Self::process_gossip_notification( + slot, + ¬ifier, + &subscriptions, + &bank_forks, + &last_checked_slots, + ); + } } }, Err(RecvTimeoutError::Timeout) => { @@ -761,6 +789,42 @@ impl RpcSubscriptions { } } + fn process_gossip_notification( + slot: Slot, + notifier: &RpcNotifier, + subscriptions: &Subscriptions, + bank_forks: &Arc>, + last_checked_slots: &Arc>>, + ) { + let mut last_checked_slots_lock = last_checked_slots.write().unwrap(); + let last_checked_slot = last_checked_slots_lock + .get(&CommitmentLevel::SingleGossip) + .cloned() + .unwrap_or_default(); + + if slot > last_checked_slot { + last_checked_slots_lock.insert(CommitmentLevel::SingleGossip, slot); + } else { + // Avoid sending stale or duplicate notifications + return; + } + + drop(last_checked_slots_lock); + + let cache_slot_info = CacheSlotInfo { + highest_confirmed_slot: slot, + ..CacheSlotInfo::default() + }; + RpcSubscriptions::notify_accounts_programs_signatures( + &subscriptions.gossip_account_subscriptions, + &subscriptions.gossip_program_subscriptions, + &subscriptions.gossip_signature_subscriptions, + &bank_forks, + &cache_slot_info, + ¬ifier, + ); + } + fn notify_accounts_programs_signatures( account_subscriptions: &Arc, program_subscriptions: &Arc, @@ -1380,6 +1444,8 @@ pub(crate) mod tests { let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); + let bank2 = Bank::new_from_parent(&bank0, &Pubkey::default(), 2); + bank_forks.write().unwrap().insert(bank2); let alice = Keypair::new(); let (subscriber0, _id_receiver, transport_receiver0) = @@ -1405,17 +1471,10 @@ pub(crate) mod tests { sub_id0.clone(), subscriber0, ); - let sub_id1 = SubscriptionId::Number(1 as u64); - subscriptions.add_account_subscription( - alice.pubkey(), - Some(CommitmentConfig::recent()), - sub_id1.clone(), - subscriber1, - ); assert!(subscriptions .subscriptions - .account_subscriptions + .gossip_account_subscriptions .read() .unwrap() .contains_key(&alice.pubkey())); @@ -1428,37 +1487,27 @@ pub(crate) mod tests { 16, &solana_budget_program::id(), ); + + // Add the transaction to the 1st bank and then freeze the bank + let bank1 = bank_forks.write().unwrap().get(1).cloned().unwrap(); + bank1.process_transaction(&tx).unwrap(); + bank1.freeze(); + + // Add the same transaction to the unfrozen 2nd bank bank_forks .write() .unwrap() - .get(1) + .get(2) .unwrap() .process_transaction(&tx) .unwrap(); - let mut cache_slot_info = CacheSlotInfo::default(); - cache_slot_info.current_slot = 1; - subscriptions.notify_subscribers(cache_slot_info); - let (response, _) = robust_poll_or_panic(transport_receiver1); - let expected = json!({ - "jsonrpc": "2.0", - "method": "accountNotification", - "params": { - "result": { - "context": { "slot": 1 }, - "value": { - "data": "1111111111111111", - "executable": false, - "lamports": 1, - "owner": "Budget1111111111111111111111111111111111111", - "rentEpoch": 1, - }, - }, - "subscription": 1, - } - }); - assert_eq!(serde_json::to_string(&expected).unwrap(), response); + // First, notify the unfrozen bank first to queue pending notification + subscriptions.notify_gossip_subscribers(2); + + // Now, notify the frozen bank and ensure its notifications are processed subscriptions.notify_gossip_subscribers(1); + let (response, _) = robust_poll_or_panic(transport_receiver0); let expected = json!({ "jsonrpc": "2.0", @@ -1478,18 +1527,41 @@ pub(crate) mod tests { } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); - subscriptions.remove_account_subscription(&sub_id0); - assert!(subscriptions - .subscriptions - .account_subscriptions - .read() - .unwrap() - .contains_key(&alice.pubkey())); + + let sub_id1 = SubscriptionId::Number(1 as u64); + subscriptions.add_account_subscription( + alice.pubkey(), + Some(CommitmentConfig::single_gossip()), + sub_id1.clone(), + subscriber1, + ); + + subscriptions.notify_frozen(2); + let (response, _) = robust_poll_or_panic(transport_receiver1); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 2 }, + "value": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + }, + "subscription": 1, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_account_subscription(&sub_id1); + assert!(!subscriptions .subscriptions - .account_subscriptions + .gossip_account_subscriptions .read() .unwrap() .contains_key(&alice.pubkey()));