From 1c97b31eaf3fe30f3634d39d2739306fcbe2e55c Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Fri, 14 Feb 2020 01:00:50 +0800 Subject: [PATCH] Retain signature subscriptions that haven't been notified (#8261) --- core/src/rpc_subscriptions.rs | 71 ++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index fa193e5393..edd543971c 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -17,7 +17,7 @@ use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, sync::{Arc, Mutex, RwLock}, }; @@ -113,11 +113,12 @@ fn check_confirmations_and_notify( bank_forks: &Arc>, bank_method: F, notify: N, -) where +) -> HashSet +where K: Eq + Hash + Clone + Copy, S: Clone + Serialize, F: Fn(&Bank, &K) -> X, - N: Fn(X, &Sink, u64), + N: Fn(X, &Sink, u64) -> bool, X: Clone + Serialize, { let current_ancestors = bank_forks @@ -127,8 +128,10 @@ fn check_confirmations_and_notify( .unwrap() .ancestors .clone(); + + let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { - for (_bank_sub_id, (sink, confirmations)) in hashmap.iter() { + for (bank_sub_id, (sink, confirmations)) in hashmap.iter() { let desired_slot: Vec = current_ancestors .iter() .filter(|(_, &v)| v == *confirmations) @@ -150,30 +153,41 @@ fn check_confirmations_and_notify( .unwrap() .clone(); let result = bank_method(&desired_bank, hashmap_key); - notify(result, &sink, root); + if notify(result, &sink, root) { + notified_set.insert(bank_sub_id.clone()); + } } } } + notified_set } -fn notify_account(result: Option<(Account, Slot)>, sink: &Sink, root: Slot) { +fn notify_account(result: Option<(Account, Slot)>, sink: &Sink, root: Slot) -> bool { if let Some((account, fork)) = result { if fork >= root { sink.notify(Ok(RpcAccount::encode(account))).wait().unwrap(); + return true; } } + false } -fn notify_signature(result: Option, sink: &Sink, _root: Slot) +fn notify_signature(result: Option, sink: &Sink, _root: Slot) -> bool where S: Clone + Serialize, { if let Some(result) = result { sink.notify(Ok(result)).wait().unwrap(); + return true; } + false } -fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink, _root: Slot) { +fn notify_program( + accounts: Vec<(Pubkey, Account)>, + sink: &Sink, + _root: Slot, +) -> bool { for (pubkey, account) in accounts.iter() { sink.notify(Ok(RpcKeyedAccount { pubkey: pubkey.to_string(), @@ -182,6 +196,7 @@ fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink .wait() .unwrap(); } + !accounts.is_empty() } pub struct RpcSubscriptions { @@ -293,7 +308,7 @@ impl RpcSubscriptions { signature_subscriptions: Arc, ) { let mut subscriptions = signature_subscriptions.write().unwrap(); - check_confirmations_and_notify( + let notified_ids = check_confirmations_and_notify( &subscriptions, signature, current_slot, @@ -301,7 +316,12 @@ impl RpcSubscriptions { Bank::get_signature_status, notify_signature, ); - subscriptions.remove(&signature); + if let Some(subscription_ids) = subscriptions.get_mut(signature) { + subscription_ids.retain(|k, _| !notified_ids.contains(k)); + if subscription_ids.is_empty() { + subscriptions.remove(&signature); + } + } } pub fn add_account_subscription( @@ -637,6 +657,9 @@ pub(crate) mod tests { let alice = Keypair::new(); let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash); let signature = tx.signatures[0]; + let unprocessed_tx = + system_transaction::transfer(&mint_keypair, &alice.pubkey(), 10, blockhash); + let not_ready_signature = unprocessed_tx.signatures[0]; bank_forks .write() .unwrap() @@ -648,16 +671,23 @@ pub(crate) mod tests { let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("signatureNotification"); let sub_id = SubscriptionId::Number(0 as u64); + let remaining_sub_id = SubscriptionId::Number(1 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new(&exit); - subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink); + subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink.clone()); + subscriptions.add_signature_subscription( + ¬_ready_signature, + None, + &remaining_sub_id, + &sink.clone(), + ); - assert!(subscriptions - .signature_subscriptions - .read() - .unwrap() - .contains_key(&signature)); + { + let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); + assert!(sig_subs.contains_key(&signature)); + assert!(sig_subs.contains_key(¬_ready_signature)); + } subscriptions.notify_subscribers(0, &bank_forks); let response = robust_poll_or_panic(transport_receiver); @@ -670,12 +700,19 @@ pub(crate) mod tests { ); assert_eq!(expected, response); - subscriptions.remove_signature_subscription(&sub_id); + // Subscription should be automatically removed after notification assert!(!subscriptions .signature_subscriptions .read() .unwrap() .contains_key(&signature)); + + // Unprocessed signature subscription should not be removed + assert!(subscriptions + .signature_subscriptions + .read() + .unwrap() + .contains_key(¬_ready_signature)); } #[test]