diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 97d071cc55..c2a64c8683 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1311,7 +1311,8 @@ pub(crate) mod tests { let genesis_config = create_genesis_config(10_000).genesis_config; let bank0 = Bank::new(&genesis_config); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); - let subscriptions = Arc::new(RpcSubscriptions::default()); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let bank_forks = BankForks::new(0, bank0); bank_forks.working_bank().freeze(); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index dae247348a..f040e90b47 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -280,6 +280,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; + use crate::rpc_subscriptions::tests::robust_poll_or_panic; use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_pubsub::{PubSubHandler, Session}; use solana_budget_program::{self, budget_instruction}; @@ -292,7 +293,6 @@ mod tests { transaction::{self, Transaction}, }; use std::{sync::RwLock, thread::sleep, time::Duration}; - use tokio::prelude::{Async, Stream}; fn process_transaction_and_notify( bank_forks: &Arc>, @@ -332,25 +332,21 @@ mod tests { let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); let session = create_session(); - let (subscriber, _id_receiver, mut receiver) = - Subscriber::new_test("signatureNotification"); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification"); rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); - sleep(Duration::from_millis(200)); // Test signature confirmation notification - let string = receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected_res: Option> = Some(Ok(())); - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); - } + let response = robust_poll_or_panic(receiver); + let expected_res: Option> = Some(Ok(())); + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, + expected_res_str + ); + assert_eq!(expected, response); } #[test] @@ -425,7 +421,7 @@ mod tests { let rpc = RpcSolPubSubImpl::default(); let session = create_session(); - let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe( session, subscriber, @@ -453,7 +449,6 @@ mod tests { sleep(Duration::from_millis(200)); // Test signature confirmation notification #1 - let string = receiver.poll(); let expected_data = bank_forks .read() .unwrap() @@ -477,9 +472,8 @@ mod tests { } }); - if let Async::Ready(Some(response)) = string.unwrap() { - assert_eq!(serde_json::to_string(&expected).unwrap(), response); - } + let response = robust_poll_or_panic(receiver); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); @@ -558,7 +552,7 @@ mod tests { let rpc = RpcSolPubSubImpl::default(); let session = create_session(); - let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); @@ -570,7 +564,9 @@ mod tests { .process_transaction(&tx) .unwrap(); rpc.subscriptions.notify_subscribers(0, &bank_forks); - let _panic = receiver.poll(); + // allow 200ms for notification thread to wake + std::thread::sleep(Duration::from_millis(200)); + let _panic = robust_poll_or_panic(receiver); } #[test] @@ -587,7 +583,7 @@ mod tests { let rpc = RpcSolPubSubImpl::default(); let session = create_session(); - let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); @@ -608,7 +604,6 @@ mod tests { let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); bank_forks.write().unwrap().insert(bank2); rpc.subscriptions.notify_subscribers(2, &bank_forks); - let string = receiver.poll(); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", @@ -623,61 +618,54 @@ mod tests { "subscription": 0, } }); - if let Async::Ready(Some(response)) = string.unwrap() { - assert_eq!(serde_json::to_string(&expected).unwrap(), response); - } + let response = robust_poll_or_panic(receiver); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); } #[test] fn test_slot_subscribe() { let rpc = RpcSolPubSubImpl::default(); let session = create_session(); - let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification"); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); rpc.subscriptions.notify_slot(0, 0, 0); - // Test slot confirmation notification - let string = receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected_res = SlotInfo { - parent: 0, - slot: 0, - root: 0, - }; - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); - } + let response = robust_poll_or_panic(receiver); + let expected_res = SlotInfo { + parent: 0, + slot: 0, + root: 0, + }; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, + expected_res_str + ); + assert_eq!(expected, response); } #[test] fn test_slot_unsubscribe() { let rpc = RpcSolPubSubImpl::default(); let session = create_session(); - let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification"); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); rpc.subscriptions.notify_slot(0, 0, 0); - - let string = receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected_res = SlotInfo { - parent: 0, - slot: 0, - root: 0, - }; - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); - } + let response = robust_poll_or_panic(receiver); + let expected_res = SlotInfo { + parent: 0, + slot: 0, + root: 0, + }; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, + expected_res_str + ); + assert_eq!(expected, response); let session = create_session(); assert!(rpc diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index 75cd06dd7b..c1c5fee152 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -68,9 +68,9 @@ mod tests { #[test] fn test_pubsub_new() { - let subscriptions = Arc::new(RpcSubscriptions::default()); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 7fc7587064..e7a06f6bcb 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -11,20 +11,45 @@ use solana_sdk::{ account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; +use std::ops::DerefMut; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; +use std::thread::{Builder, JoinHandle}; +use std::time::Duration; use std::{ collections::HashMap, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, }; +const RECEIVE_DELAY_MILLIS: u64 = 100; + pub type Confirmations = usize; -#[derive(Serialize, Clone)] +#[derive(Serialize, Clone, Copy, Debug)] pub struct SlotInfo { pub slot: Slot, pub parent: Slot, pub root: Slot, } +enum NotificationEntry { + Slot(SlotInfo), + Bank((Slot, Arc>)), +} + +impl std::fmt::Debug for NotificationEntry { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), + NotificationEntry::Bank((current_slot, _)) => { + write!(f, "Bank({{current_slot: {:?}}})", current_slot) + } + } + } +} + +type NotificationSend = Arc>; + type RpcAccountSubscriptions = RwLock, Confirmations)>>>; type RpcProgramSubscriptions = @@ -159,31 +184,80 @@ fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink } pub struct RpcSubscriptions { - account_subscriptions: RpcAccountSubscriptions, - program_subscriptions: RpcProgramSubscriptions, - signature_subscriptions: RpcSignatureSubscriptions, - slot_subscriptions: RpcSlotSubscriptions, + account_subscriptions: Arc, + program_subscriptions: Arc, + signature_subscriptions: Arc, + slot_subscriptions: Arc, + notification_sender: Arc>>>>, + t_cleanup: Option>, + exit: Arc, } impl Default for RpcSubscriptions { fn default() -> Self { - RpcSubscriptions { - account_subscriptions: RpcAccountSubscriptions::default(), - program_subscriptions: RpcProgramSubscriptions::default(), - signature_subscriptions: RpcSignatureSubscriptions::default(), - slot_subscriptions: RpcSlotSubscriptions::default(), - } + Self::new(&Arc::new(AtomicBool::new(false))) + } +} + +impl Drop for RpcSubscriptions { + fn drop(&mut self) { + self.shutdown().unwrap_or_else(|err| { + warn!("RPC Notification - shutdown error: {:?}", err); + }); } } impl RpcSubscriptions { - pub fn check_account( - &self, + pub fn new(exit: &Arc) -> Self { + let (notification_sender, notification_receiver): ( + Sender, + Receiver, + ) = std::sync::mpsc::channel(); + + let account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); + let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); + let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); + let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); + let notification_sender = Arc::new(Mutex::new(notification_sender)); + + let exit_clone = exit.clone(); + let account_subscriptions_clone = account_subscriptions.clone(); + let program_subscriptions_clone = program_subscriptions.clone(); + let signature_subscriptions_clone = signature_subscriptions.clone(); + let slot_subscriptions_clone = slot_subscriptions.clone(); + + let t_cleanup = Builder::new() + .name("solana-rpc-notifications".to_string()) + .spawn(move || { + Self::process_notifications( + exit_clone, + notification_receiver, + account_subscriptions_clone, + program_subscriptions_clone, + signature_subscriptions_clone, + slot_subscriptions_clone, + ); + }) + .unwrap(); + + Self { + account_subscriptions, + program_subscriptions, + signature_subscriptions, + slot_subscriptions, + notification_sender, + t_cleanup: Some(t_cleanup), + exit: exit.clone(), + } + } + + fn check_account( pubkey: &Pubkey, current_slot: Slot, bank_forks: &Arc>, + account_subscriptions: Arc, ) { - let subscriptions = self.account_subscriptions.read().unwrap(); + let subscriptions = account_subscriptions.read().unwrap(); check_confirmations_and_notify( &subscriptions, pubkey, @@ -194,13 +268,13 @@ impl RpcSubscriptions { ); } - pub fn check_program( - &self, + fn check_program( program_id: &Pubkey, current_slot: Slot, bank_forks: &Arc>, + program_subscriptions: Arc, ) { - let subscriptions = self.program_subscriptions.write().unwrap(); + let subscriptions = program_subscriptions.read().unwrap(); check_confirmations_and_notify( &subscriptions, program_id, @@ -211,13 +285,13 @@ impl RpcSubscriptions { ); } - pub fn check_signature( - &self, + fn check_signature( signature: &Signature, current_slot: Slot, bank_forks: &Arc>, + signature_subscriptions: Arc, ) { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); + let mut subscriptions = signature_subscriptions.write().unwrap(); check_confirmations_and_notify( &subscriptions, signature, @@ -280,29 +354,7 @@ impl RpcSubscriptions { /// Notify subscribers of changes to any accounts or new signatures since /// the bank's last checkpoint. pub fn notify_subscribers(&self, current_slot: Slot, bank_forks: &Arc>) { - let pubkeys: Vec<_> = { - let subs = self.account_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for pubkey in &pubkeys { - self.check_account(pubkey, current_slot, bank_forks); - } - - let programs: Vec<_> = { - let subs = self.program_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for program_id in &programs { - self.check_program(program_id, current_slot, bank_forks); - } - - let signatures: Vec<_> = { - let subs = self.signature_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for signature in &signatures { - self.check_signature(signature, current_slot, bank_forks); - } + self.enqueue_notification(NotificationEntry::Bank((current_slot, bank_forks.clone()))); } pub fn add_slot_subscription(&self, sub_id: &SubscriptionId, sink: &Sink) { @@ -316,19 +368,120 @@ impl RpcSubscriptions { } pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) { - let subscriptions = self.slot_subscriptions.read().unwrap(); - for (_, sink) in subscriptions.iter() { - sink.notify(Ok(SlotInfo { slot, parent, root })) - .wait() - .unwrap(); + self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); + } + + fn enqueue_notification(&self, notification_entry: NotificationEntry) { + match self + .notification_sender + .lock() + .unwrap() + .send(Arc::new(Mutex::new(notification_entry))) + { + Ok(()) => (), + Err(SendError(notification)) => { + warn!( + "Dropped RPC Notification - receiver disconnected : {:?}", + notification + ); + } + } + } + + fn process_notifications( + exit: Arc, + notification_receiver: Receiver>>, + account_subscriptions: Arc, + program_subscriptions: Arc, + signature_subscriptions: Arc, + slot_subscriptions: Arc, + ) { + loop { + if exit.load(Ordering::Relaxed) { + break; + } + match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { + Ok(notification_entry) => { + let mut notification_entry = notification_entry.lock().unwrap(); + match notification_entry.deref_mut() { + NotificationEntry::Slot(slot_info) => { + let subscriptions = slot_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + sink.notify(Ok(*slot_info)).wait().unwrap(); + } + } + NotificationEntry::Bank((current_slot, bank_forks)) => { + let pubkeys: Vec<_> = { + let subs = account_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for pubkey in &pubkeys { + Self::check_account( + pubkey, + *current_slot, + &bank_forks, + account_subscriptions.clone(), + ); + } + + let programs: Vec<_> = { + let subs = program_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for program_id in &programs { + Self::check_program( + program_id, + *current_slot, + &bank_forks, + program_subscriptions.clone(), + ); + } + + let signatures: Vec<_> = { + let subs = signature_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for signature in &signatures { + Self::check_signature( + signature, + *current_slot, + &bank_forks, + signature_subscriptions.clone(), + ); + } + } + } + } + Err(RecvTimeoutError::Timeout) => { + // not a problem - try reading again + } + Err(RecvTimeoutError::Disconnected) => { + warn!("RPC Notification thread - sender disconnected"); + break; + } + } + } + } + + fn shutdown(&mut self) -> std::thread::Result<()> { + if self.t_cleanup.is_some() { + info!("RPC Notification thread - shutting down"); + self.exit.store(true, Ordering::Relaxed); + let x = self.t_cleanup.take().unwrap().join(); + info!("RPC Notification thread - shut down."); + x + } else { + warn!("RPC Notification thread - already shut down."); + Ok(()) } } } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; + use jsonrpc_core::futures; use jsonrpc_pubsub::typed::Subscriber; use solana_budget_program; use solana_sdk::{ @@ -337,6 +490,26 @@ mod tests { }; use tokio::prelude::{Async, Stream}; + pub(crate) fn robust_poll( + mut receiver: futures::sync::mpsc::Receiver, + ) -> Result { + const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2; + + std::thread::sleep(Duration::from_millis(INITIAL_DELAY_MS)); + for _i in 0..5 { + let found = receiver.poll(); + if let Ok(Async::Ready(Some(result))) = found { + return Ok(result); + } + std::thread::sleep(Duration::from_millis(RECEIVE_DELAY_MILLIS)); + } + Err(RecvTimeoutError::Timeout) + } + + pub(crate) fn robust_poll_or_panic(receiver: futures::sync::mpsc::Receiver) -> T { + robust_poll(receiver).unwrap_or_else(|err| panic!("expected response! {}", err)) + } + #[test] fn test_check_account_subscribe() { let GenesisConfigInfo { @@ -364,11 +537,12 @@ mod tests { .process_transaction(&tx) .unwrap(); - let (subscriber, _id_receiver, mut transport_receiver) = + let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("accountNotification"); let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let subscriptions = RpcSubscriptions::default(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); subscriptions.add_account_subscription(&alice.pubkey(), None, &sub_id, &sink); assert!(subscriptions @@ -377,14 +551,12 @@ mod tests { .unwrap() .contains_key(&alice.pubkey())); - subscriptions.check_account(&alice.pubkey(), 0, &bank_forks); - let string = transport_receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"# - ); - assert_eq!(expected, response); - } + subscriptions.notify_subscribers(0, &bank_forks); + let response = robust_poll_or_panic(transport_receiver); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"# + ); + assert_eq!(expected, response); subscriptions.remove_account_subscription(&sub_id); assert!(!subscriptions @@ -421,11 +593,12 @@ mod tests { .process_transaction(&tx) .unwrap(); - let (subscriber, _id_receiver, mut transport_receiver) = + let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("programNotification"); let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let subscriptions = RpcSubscriptions::default(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); subscriptions.add_program_subscription(&solana_budget_program::id(), None, &sub_id, &sink); assert!(subscriptions @@ -434,15 +607,13 @@ mod tests { .unwrap() .contains_key(&solana_budget_program::id())); - subscriptions.check_program(&solana_budget_program::id(), 0, &bank_forks); - let string = transport_receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#, - alice.pubkey() - ); - assert_eq!(expected, response); - } + subscriptions.notify_subscribers(0, &bank_forks); + let response = robust_poll_or_panic(transport_receiver); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#, + alice.pubkey() + ); + assert_eq!(expected, response); subscriptions.remove_program_subscription(&sub_id); assert!(!subscriptions @@ -472,11 +643,12 @@ mod tests { .process_transaction(&tx) .unwrap(); - let (subscriber, _id_receiver, mut transport_receiver) = + let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("signatureNotification"); let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let subscriptions = RpcSubscriptions::default(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink); assert!(subscriptions @@ -485,18 +657,16 @@ mod tests { .unwrap() .contains_key(&signature)); - subscriptions.check_signature(&signature, 0, &bank_forks); - let string = transport_receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected_res: Option> = Some(Ok(())); - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); - } + subscriptions.notify_subscribers(0, &bank_forks); + let response = robust_poll_or_panic(transport_receiver); + let expected_res: Option> = Some(Ok(())); + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, + expected_res_str + ); + assert_eq!(expected, response); subscriptions.remove_signature_subscription(&sub_id); assert!(!subscriptions @@ -507,11 +677,12 @@ mod tests { } #[test] fn test_check_slot_subscribe() { - let (subscriber, _id_receiver, mut transport_receiver) = + let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("slotNotification"); let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let subscriptions = RpcSubscriptions::default(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); subscriptions.add_slot_subscription(&sub_id, &sink); assert!(subscriptions @@ -521,21 +692,19 @@ mod tests { .contains_key(&sub_id)); subscriptions.notify_slot(0, 0, 0); - let string = transport_receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected_res = SlotInfo { - parent: 0, - slot: 0, - root: 0, - }; - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); - } + let response = robust_poll_or_panic(transport_receiver); + let expected_res = SlotInfo { + parent: 0, + slot: 0, + root: 0, + }; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, + expected_res_str + ); + assert_eq!(expected, response); subscriptions.remove_slot_subscription(&sub_id); assert!(!subscriptions diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 09c72318ab..c7824e8e66 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -303,7 +303,7 @@ pub mod tests { None, None, l_receiver, - &Arc::new(RpcSubscriptions::default()), + &Arc::new(RpcSubscriptions::new(&exit)), &poh_recorder, &leader_schedule_cache, &exit, diff --git a/core/src/validator.rs b/core/src/validator.rs index 480a4232af..1d851ea008 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -215,7 +215,7 @@ impl Validator { )) }; - let subscriptions = Arc::new(RpcSubscriptions::default()); + let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let rpc_pubsub_service = if node.info.rpc_pubsub.port() == 0 { None } else {