diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 33e3dff9ca..c0a63f2f91 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -336,6 +336,7 @@ impl ReplayStage { &lockouts_sender, &accounts_hash_sender, &latest_root_senders, + &subscriptions, )?; } datapoint_debug!( @@ -607,6 +608,7 @@ impl ReplayStage { lockouts_sender: &Sender, accounts_hash_sender: &Option, latest_root_senders: &[Sender], + subscriptions: &Arc, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -633,6 +635,7 @@ impl ReplayStage { .set_roots(&rooted_slots) .expect("Ledger set roots failed"); Self::handle_new_root(new_root, &bank_forks, progress, accounts_hash_sender); + subscriptions.notify_roots(rooted_slots); latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { trace!("latest root send failed: {:?}", e); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index b65444eca2..6421ad4482 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -5,7 +5,7 @@ use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount}; -use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature, transaction}; use std::sync::{atomic, Arc}; // Suppress needless_return due to @@ -102,6 +102,18 @@ pub trait RpcSolPubSub { name = "slotUnsubscribe" )] fn slot_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + + // Get notification when a new root is set + #[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")] + fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); + + // Unsubscribe from slot notification subscription. + #[pubsub( + subscription = "rootNotification", + unsubscribe, + name = "rootUnsubscribe" + )] + fn root_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; } #[derive(Default)] @@ -274,6 +286,27 @@ impl RpcSolPubSub for RpcSolPubSubImpl { }) } } + + fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { + info!("root_subscribe"); + let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); + let sub_id = SubscriptionId::Number(id as u64); + info!("root_subscribe: id={:?}", sub_id); + self.subscriptions.add_root_subscription(sub_id, subscriber); + } + + fn root_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { + info!("root_unsubscribe"); + if self.subscriptions.remove_root_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } } #[cfg(test)] @@ -338,7 +371,7 @@ mod tests { process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); // Test signature confirmation notification - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); let expected_res: Option> = Some(Ok(())); let expected = json!({ "jsonrpc": "2.0", @@ -480,7 +513,7 @@ mod tests { } }); - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash); @@ -629,7 +662,7 @@ mod tests { "subscription": 0, } }); - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); assert_eq!(serde_json::to_string(&expected).unwrap(), response); } @@ -642,7 +675,7 @@ mod tests { rpc.subscriptions.notify_slot(0, 0, 0); // Test slot confirmation notification - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); let expected_res = SlotInfo { parent: 0, slot: 0, @@ -664,7 +697,7 @@ mod tests { let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); rpc.subscriptions.notify_slot(0, 0, 0); - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); let expected_res = SlotInfo { parent: 0, slot: 0, diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index af652ea6a1..8d02bde22e 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -38,12 +38,14 @@ pub struct SlotInfo { enum NotificationEntry { Slot(SlotInfo), + Root(Slot), Bank((Slot, Arc>)), } impl std::fmt::Debug for NotificationEntry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { + NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Bank((current_slot, _)) => { write!(f, "Bank({{current_slot: {:?}}})", current_slot) @@ -64,6 +66,7 @@ type RpcSignatureSubscriptions = RwLock< >, >; type RpcSlotSubscriptions = RwLock>>; +type RpcRootSubscriptions = RwLock>>; fn add_subscription( subscriptions: &mut HashMap, Confirmations)>>, @@ -224,6 +227,7 @@ pub struct RpcSubscriptions { program_subscriptions: Arc, signature_subscriptions: Arc, slot_subscriptions: Arc, + root_subscriptions: Arc, notification_sender: Arc>>, t_cleanup: Option>, notifier_runtime: Option, @@ -255,6 +259,7 @@ impl RpcSubscriptions { let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); + let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let notification_sender = Arc::new(Mutex::new(notification_sender)); let exit_clone = exit.clone(); @@ -262,6 +267,7 @@ impl RpcSubscriptions { let program_subscriptions_clone = program_subscriptions.clone(); let signature_subscriptions_clone = signature_subscriptions.clone(); let slot_subscriptions_clone = slot_subscriptions.clone(); + let root_subscriptions_clone = root_subscriptions.clone(); let notifier_runtime = RuntimeBuilder::new() .core_threads(1) @@ -281,6 +287,7 @@ impl RpcSubscriptions { program_subscriptions_clone, signature_subscriptions_clone, slot_subscriptions_clone, + root_subscriptions_clone, ); }) .unwrap(); @@ -290,6 +297,7 @@ impl RpcSubscriptions { program_subscriptions, signature_subscriptions, slot_subscriptions, + root_subscriptions, notification_sender, notifier_runtime: Some(notifier_runtime), t_cleanup: Some(t_cleanup), @@ -447,6 +455,24 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); } + pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let mut subscriptions = self.root_subscriptions.write().unwrap(); + subscriptions.insert(sub_id, sink); + } + + pub fn remove_root_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.root_subscriptions.write().unwrap(); + subscriptions.remove(id).is_some() + } + + pub fn notify_roots(&self, mut rooted_slots: Vec) { + rooted_slots.sort(); + rooted_slots.into_iter().for_each(|root| { + self.enqueue_notification(NotificationEntry::Root(root)); + }); + } + fn enqueue_notification(&self, notification_entry: NotificationEntry) { match self .notification_sender @@ -472,6 +498,7 @@ impl RpcSubscriptions { program_subscriptions: Arc, signature_subscriptions: Arc, slot_subscriptions: Arc, + root_subscriptions: Arc, ) { loop { if exit.load(Ordering::Relaxed) { @@ -485,6 +512,12 @@ impl RpcSubscriptions { notifier.notify(slot_info, sink); } } + NotificationEntry::Root(root) => { + let subscriptions = root_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + notifier.notify(root, sink); + } + } NotificationEntry::Bank((current_slot, bank_forks)) => { let pubkeys: Vec<_> = { let subs = account_subscriptions.read().unwrap(); @@ -564,33 +597,39 @@ impl RpcSubscriptions { pub(crate) mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use jsonrpc_core::futures; + use jsonrpc_core::futures::{self, stream::Stream}; use jsonrpc_pubsub::typed::Subscriber; use solana_budget_program; use solana_sdk::{ signature::{Keypair, Signer}, system_transaction, }; - use tokio::prelude::{Async, Stream}; + use std::{fmt::Debug, sync::mpsc::channel, time::Instant}; + use tokio::{prelude::FutureExt, runtime::Runtime, timer::Delay}; - pub(crate) fn robust_poll( - mut receiver: futures::sync::mpsc::Receiver, - ) -> Result { - const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2; + pub(crate) fn robust_poll_or_panic( + receiver: futures::sync::mpsc::Receiver, + ) -> (T, futures::sync::mpsc::Receiver) { + let (inner_sender, inner_receiver) = channel(); + let mut rt = Runtime::new().unwrap(); + rt.spawn(futures::lazy(|| { + let recv_timeout = receiver + .into_future() + .timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) + .map(move |result| match result { + (Some(value), receiver) => { + inner_sender.send((value, receiver)).expect("send error") + } + (None, _) => panic!("unexpected end of stream"), + }) + .map_err(|err| panic!("stream error {:?}", err)); - std::thread::sleep(Duration::from_millis(INITIAL_DELAY_MS)); - for _i in 0..5 { - let found = receiver.poll(); - if let Ok(Async::Ready(Some(result))) = found { - return Ok(result); - } - std::thread::sleep(Duration::from_millis(RECEIVE_DELAY_MILLIS)); - } - Err(RecvTimeoutError::Timeout) - } - - pub(crate) fn robust_poll_or_panic(receiver: futures::sync::mpsc::Receiver) -> T { - robust_poll(receiver).unwrap_or_else(|err| panic!("expected response! {}", err)) + const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2; + Delay::new(Instant::now() + Duration::from_millis(INITIAL_DELAY_MS)) + .and_then(|_| recv_timeout) + .map_err(|err| panic!("timer error {:?}", err)) + })); + inner_receiver.recv().expect("recv error") } #[test] @@ -634,7 +673,7 @@ pub(crate) mod tests { .contains_key(&alice.pubkey())); subscriptions.notify_subscribers(0, &bank_forks); - let response = robust_poll_or_panic(transport_receiver); + let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", @@ -708,7 +747,7 @@ pub(crate) mod tests { .contains_key(&solana_budget_program::id())); subscriptions.notify_subscribers(0, &bank_forks); - let response = robust_poll_or_panic(transport_receiver); + let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", "method": "programNotification", @@ -829,7 +868,7 @@ pub(crate) mod tests { "subscription": 2, } }); - let response = robust_poll_or_panic(past_bank_recv); + let (response, _) = robust_poll_or_panic(past_bank_recv); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let expected = json!({ @@ -843,7 +882,7 @@ pub(crate) mod tests { "subscription": 3, } }); - let response = robust_poll_or_panic(processed_recv); + let (response, _) = robust_poll_or_panic(processed_recv); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); @@ -877,7 +916,7 @@ pub(crate) mod tests { .contains_key(&sub_id)); subscriptions.notify_slot(0, 0, 0); - let response = robust_poll_or_panic(transport_receiver); + let (response, _) = robust_poll_or_panic(transport_receiver); let expected_res = SlotInfo { parent: 0, slot: 0, @@ -899,6 +938,43 @@ pub(crate) mod tests { .contains_key(&sub_id)); } + #[test] + fn test_check_root_subscribe() { + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("rootNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); + subscriptions.add_root_subscription(sub_id.clone(), subscriber); + + assert!(subscriptions + .root_subscriptions + .read() + .unwrap() + .contains_key(&sub_id)); + + subscriptions.notify_roots(vec![2, 1, 3]); + + for expected_root in 1..=3 { + let (response, receiver) = robust_poll_or_panic(transport_receiver); + transport_receiver = receiver; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_root).unwrap()).unwrap(); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{},"subscription":0}}}}"#, + expected_res_str + ); + assert_eq!(expected, response); + } + + subscriptions.remove_root_subscription(&sub_id); + assert!(!subscriptions + .root_subscriptions + .read() + .unwrap() + .contains_key(&sub_id)); + } + #[test] fn test_add_and_remove_subscription() { let mut subscriptions: HashMap, Confirmations)>> = diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index 3332dcd7da..6c0539d8e3 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -1255,7 +1255,7 @@ None ### slotUnsubscribe -Unsubscribe from signature confirmation notification +Unsubscribe from slot notifications #### Parameters: @@ -1274,3 +1274,55 @@ Unsubscribe from signature confirmation notification // Result {"jsonrpc": "2.0","result": true,"id": 1} ``` + +### rootSubscribe + +Subscribe to receive notification anytime a new root is set by the validator. + +#### Parameters: + +None + +#### Results: + +* `integer` - subscription id \(needed to unsubscribe\) + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"rootSubscribe"} + +// Result +{"jsonrpc": "2.0","result": 0,"id": 1} +``` + +#### Notification Format: + +The result is the latest root slot number. + +```bash +{"jsonrpc": "2.0","method": "rootNotification", "params": {"result":42,"subscription":0}} +``` + +### rootUnsubscribe + +Unsubscribe from root notifications + +#### Parameters: + +* `` - subscription id to cancel + +#### Results: + +* `` - unsubscribe success message + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"rootUnsubscribe", "params":[0]} + +// Result +{"jsonrpc": "2.0","result": true,"id": 1} +```