From c1a3b6ecc2ca0e9553361b57b47f401c3199dc40 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Sat, 28 Mar 2020 00:33:40 +0800 Subject: [PATCH] Add RPC subscription api for rooted slots (#9118) automerge --- core/src/replay_stage.rs | 3 ++ core/src/rpc_pubsub.rs | 45 +++++++++++++++--- core/src/rpc_subscriptions.rs | 86 ++++++++++++++++++++++++++++++++--- docs/src/apps/jsonrpc-api.md | 54 +++++++++++++++++++++- 4 files changed, 174 insertions(+), 14 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 6f63a4bf14..f89b1dbde4 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -334,6 +334,7 @@ impl ReplayStage { &latest_root_senders, &mut earliest_vote_on_fork, &mut all_pubkeys, + &subscriptions, )?; ancestors @@ -712,6 +713,7 @@ impl ReplayStage { latest_root_senders: &[Sender], earliest_vote_on_fork: &mut Slot, all_pubkeys: &mut HashSet>, + subscriptions: &Arc, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -745,6 +747,7 @@ impl ReplayStage { earliest_vote_on_fork, all_pubkeys, ); + subscriptions.notify_roots(rooted_slots); latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { trace!("latest root send failed: {:?}", e); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 7eb138ed40..89e8ace666 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -5,7 +5,7 @@ use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount}; -use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature, transaction}; use std::sync::{atomic, Arc}; // Suppress needless_return due to @@ -102,6 +102,18 @@ pub trait RpcSolPubSub { name = "slotUnsubscribe" )] fn slot_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + + // Get notification when a new root is set + #[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")] + fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); + + // Unsubscribe from slot notification subscription. + #[pubsub( + subscription = "rootNotification", + unsubscribe, + name = "rootUnsubscribe" + )] + fn root_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; } #[derive(Default)] @@ -274,6 +286,27 @@ impl RpcSolPubSub for RpcSolPubSubImpl { }) } } + + fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { + info!("root_subscribe"); + let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); + let sub_id = SubscriptionId::Number(id as u64); + info!("root_subscribe: id={:?}", sub_id); + self.subscriptions.add_root_subscription(sub_id, subscriber); + } + + fn root_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { + info!("root_unsubscribe"); + if self.subscriptions.remove_root_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } } #[cfg(test)] @@ -338,7 +371,7 @@ mod tests { process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); // Test signature confirmation notification - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); let expected_res: Option> = Some(Ok(())); let expected = json!({ "jsonrpc": "2.0", @@ -480,7 +513,7 @@ mod tests { } }); - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash); @@ -629,7 +662,7 @@ mod tests { "subscription": 0, } }); - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); assert_eq!(serde_json::to_string(&expected).unwrap(), response); } @@ -642,7 +675,7 @@ mod tests { rpc.subscriptions.notify_slot(0, 0, 0); // Test slot confirmation notification - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); let expected_res = SlotInfo { parent: 0, slot: 0, @@ -664,7 +697,7 @@ mod tests { let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); rpc.subscriptions.notify_slot(0, 0, 0); - let response = robust_poll_or_panic(receiver); + let (response, _) = robust_poll_or_panic(receiver); let expected_res = SlotInfo { parent: 0, slot: 0, diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 649c9f6b0d..42dccbf3e1 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -38,12 +38,14 @@ pub struct SlotInfo { enum NotificationEntry { Slot(SlotInfo), + Root(Slot), Bank((Slot, Arc>)), } impl std::fmt::Debug for NotificationEntry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { + NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Bank((current_slot, _)) => { write!(f, "Bank({{current_slot: {:?}}})", current_slot) @@ -64,6 +66,7 @@ type RpcSignatureSubscriptions = RwLock< >, >; type RpcSlotSubscriptions = RwLock>>; +type RpcRootSubscriptions = RwLock>>; fn add_subscription( subscriptions: &mut HashMap, Confirmations)>>, @@ -224,6 +227,7 @@ pub struct RpcSubscriptions { program_subscriptions: Arc, signature_subscriptions: Arc, slot_subscriptions: Arc, + root_subscriptions: Arc, notification_sender: Arc>>, t_cleanup: Option>, notifier_runtime: Option, @@ -255,6 +259,7 @@ impl RpcSubscriptions { let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); + let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let notification_sender = Arc::new(Mutex::new(notification_sender)); let exit_clone = exit.clone(); @@ -262,6 +267,7 @@ impl RpcSubscriptions { let program_subscriptions_clone = program_subscriptions.clone(); let signature_subscriptions_clone = signature_subscriptions.clone(); let slot_subscriptions_clone = slot_subscriptions.clone(); + let root_subscriptions_clone = root_subscriptions.clone(); let notifier_runtime = RuntimeBuilder::new() .core_threads(1) @@ -281,6 +287,7 @@ impl RpcSubscriptions { program_subscriptions_clone, signature_subscriptions_clone, slot_subscriptions_clone, + root_subscriptions_clone, ); }) .unwrap(); @@ -290,6 +297,7 @@ impl RpcSubscriptions { program_subscriptions, signature_subscriptions, slot_subscriptions, + root_subscriptions, notification_sender, notifier_runtime: Some(notifier_runtime), t_cleanup: Some(t_cleanup), @@ -447,6 +455,24 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); } + pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let mut subscriptions = self.root_subscriptions.write().unwrap(); + subscriptions.insert(sub_id, sink); + } + + pub fn remove_root_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.root_subscriptions.write().unwrap(); + subscriptions.remove(id).is_some() + } + + pub fn notify_roots(&self, mut rooted_slots: Vec) { + rooted_slots.sort(); + rooted_slots.into_iter().for_each(|root| { + self.enqueue_notification(NotificationEntry::Root(root)); + }); + } + fn enqueue_notification(&self, notification_entry: NotificationEntry) { match self .notification_sender @@ -472,6 +498,7 @@ impl RpcSubscriptions { program_subscriptions: Arc, signature_subscriptions: Arc, slot_subscriptions: Arc, + root_subscriptions: Arc, ) { loop { if exit.load(Ordering::Relaxed) { @@ -485,6 +512,12 @@ impl RpcSubscriptions { notifier.notify(slot_info, sink); } } + NotificationEntry::Root(root) => { + let subscriptions = root_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + notifier.notify(root, sink); + } + } NotificationEntry::Bank((current_slot, bank_forks)) => { let pubkeys: Vec<_> = { let subs = account_subscriptions.read().unwrap(); @@ -576,7 +609,7 @@ pub(crate) mod tests { pub(crate) fn robust_poll_or_panic( receiver: futures::sync::mpsc::Receiver, - ) -> T { + ) -> (T, futures::sync::mpsc::Receiver) { let (inner_sender, inner_receiver) = channel(); let mut rt = Runtime::new().unwrap(); rt.spawn(futures::lazy(|| { @@ -584,7 +617,9 @@ pub(crate) mod tests { .into_future() .timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) .map(move |result| match result { - (Some(value), _) => inner_sender.send(value).expect("send error"), + (Some(value), receiver) => { + inner_sender.send((value, receiver)).expect("send error") + } (None, _) => panic!("unexpected end of stream"), }) .map_err(|err| panic!("stream error {:?}", err)); @@ -638,7 +673,7 @@ pub(crate) mod tests { .contains_key(&alice.pubkey())); subscriptions.notify_subscribers(0, &bank_forks); - let response = robust_poll_or_panic(transport_receiver); + let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", @@ -712,7 +747,7 @@ pub(crate) mod tests { .contains_key(&solana_budget_program::id())); subscriptions.notify_subscribers(0, &bank_forks); - let response = robust_poll_or_panic(transport_receiver); + let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", "method": "programNotification", @@ -833,7 +868,7 @@ pub(crate) mod tests { "subscription": 2, } }); - let response = robust_poll_or_panic(past_bank_recv); + let (response, _) = robust_poll_or_panic(past_bank_recv); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let expected = json!({ @@ -847,7 +882,7 @@ pub(crate) mod tests { "subscription": 3, } }); - let response = robust_poll_or_panic(processed_recv); + let (response, _) = robust_poll_or_panic(processed_recv); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); @@ -881,7 +916,7 @@ pub(crate) mod tests { .contains_key(&sub_id)); subscriptions.notify_slot(0, 0, 0); - let response = robust_poll_or_panic(transport_receiver); + let (response, _) = robust_poll_or_panic(transport_receiver); let expected_res = SlotInfo { parent: 0, slot: 0, @@ -903,6 +938,43 @@ pub(crate) mod tests { .contains_key(&sub_id)); } + #[test] + fn test_check_root_subscribe() { + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("rootNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); + subscriptions.add_root_subscription(sub_id.clone(), subscriber); + + assert!(subscriptions + .root_subscriptions + .read() + .unwrap() + .contains_key(&sub_id)); + + subscriptions.notify_roots(vec![2, 1, 3]); + + for expected_root in 1..=3 { + let (response, receiver) = robust_poll_or_panic(transport_receiver); + transport_receiver = receiver; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_root).unwrap()).unwrap(); + let expected = format!( + r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{},"subscription":0}}}}"#, + expected_res_str + ); + assert_eq!(expected, response); + } + + subscriptions.remove_root_subscription(&sub_id); + assert!(!subscriptions + .root_subscriptions + .read() + .unwrap() + .contains_key(&sub_id)); + } + #[test] fn test_add_and_remove_subscription() { let mut subscriptions: HashMap, Confirmations)>> = diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index 847509b134..1e1be07ef4 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -1255,7 +1255,7 @@ None ### slotUnsubscribe -Unsubscribe from signature confirmation notification +Unsubscribe from slot notifications #### Parameters: @@ -1274,3 +1274,55 @@ Unsubscribe from signature confirmation notification // Result {"jsonrpc": "2.0","result": true,"id": 1} ``` + +### rootSubscribe + +Subscribe to receive notification anytime a new root is set by the validator. + +#### Parameters: + +None + +#### Results: + +* `integer` - subscription id \(needed to unsubscribe\) + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"rootSubscribe"} + +// Result +{"jsonrpc": "2.0","result": 0,"id": 1} +``` + +#### Notification Format: + +The result is the latest root slot number. + +```bash +{"jsonrpc": "2.0","method": "rootNotification", "params": {"result":42,"subscription":0}} +``` + +### rootUnsubscribe + +Unsubscribe from root notifications + +#### Parameters: + +* `` - subscription id to cancel + +#### Results: + +* `` - unsubscribe success message + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"rootUnsubscribe", "params":[0]} + +// Result +{"jsonrpc": "2.0","result": true,"id": 1} +```