diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index e5d15e82ed..a7a9fc3b01 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -4,7 +4,7 @@ use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; -use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount}; +use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount}; use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction}; use std::sync::{atomic, Arc}; @@ -26,7 +26,7 @@ pub trait RpcSolPubSub { fn account_subscribe( &self, meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ); @@ -50,7 +50,7 @@ pub trait RpcSolPubSub { fn program_subscribe( &self, meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ); @@ -74,7 +74,7 @@ pub trait RpcSolPubSub { fn signature_subscribe( &self, meta: Self::Metadata, - subscriber: Subscriber>, + subscriber: Subscriber>>, signature_str: String, confirmations: Option, ); @@ -133,7 +133,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn account_subscribe( &self, _meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ) { @@ -171,7 +171,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn program_subscribe( &self, _meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ) { @@ -209,7 +209,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn signature_subscribe( &self, _meta: Self::Metadata, - subscriber: Subscriber>, + subscriber: Subscriber>>, signature_str: String, confirmations: Option, ) { @@ -340,13 +340,18 @@ mod tests { // Test signature confirmation notification let response = robust_poll_or_panic(receiver); let expected_res: Option> = Some(Ok(())); - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": expected_res, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); } #[test] @@ -462,11 +467,14 @@ mod tests { "method": "accountNotification", "params": { "result": { - "owner": budget_program_id.to_string(), - "lamports": 51, - "data": bs58::encode(expected_data).into_string(), - "executable": false, - "rentEpoch": 1, + "context": { "slot": 0 }, + "value": { + "owner": budget_program_id.to_string(), + "lamports": 51, + "data": bs58::encode(expected_data).into_string(), + "executable": false, + "rentEpoch": 1, + }, }, "subscription": 0, } @@ -609,11 +617,14 @@ mod tests { "method": "accountNotification", "params": { "result": { - "owner": system_program::id().to_string(), - "lamports": 100, - "data": "", - "executable": false, - "rentEpoch": 1, + "context": { "slot": 0 }, + "value": { + "owner": system_program::id().to_string(), + "lamports": 100, + "data": "", + "executable": false, + "rentEpoch": 1, + }, }, "subscription": 0, } diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 6019812c32..4da5cf216a 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -4,7 +4,7 @@ use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{typed::Sink, SubscriptionId}; use serde::Serialize; -use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount}; +use solana_client::rpc_response::{Response, RpcAccount, RpcKeyedAccount, RpcResponseContext}; use solana_ledger::bank_forks::BankForks; use solana_runtime::bank::Bank; use solana_sdk::{ @@ -50,11 +50,15 @@ impl std::fmt::Debug for NotificationEntry { } type RpcAccountSubscriptions = - RwLock, Confirmations)>>>; -type RpcProgramSubscriptions = - RwLock, Confirmations)>>>; + RwLock>, Confirmations)>>>; +type RpcProgramSubscriptions = RwLock< + HashMap>, Confirmations)>>, +>; type RpcSignatureSubscriptions = RwLock< - HashMap>, Confirmations)>>, + HashMap< + Signature, + HashMap>>, Confirmations)>, + >, >; type RpcSlotSubscriptions = RwLock>>; @@ -105,8 +109,9 @@ where found } +#[allow(clippy::type_complexity)] fn check_confirmations_and_notify( - subscriptions: &HashMap, Confirmations)>>, + subscriptions: &HashMap>, Confirmations)>>, hashmap_key: &K, current_slot: Slot, bank_forks: &Arc>, @@ -131,7 +136,7 @@ where let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { - for (bank_sub_id, (sink, confirmations)) in hashmap.iter() { + for (sub_id, (sink, confirmations)) in hashmap.iter() { let desired_slot: Vec = current_ancestors .iter() .filter(|(_, &v)| v == *confirmations) @@ -146,16 +151,18 @@ where .collect(); let root = if root.len() == 1 { root[0] } else { 0 }; if desired_slot.len() == 1 { - let desired_bank = bank_forks - .read() - .unwrap() - .get(desired_slot[0]) - .unwrap() - .clone(); + let slot = desired_slot[0]; + let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone(); let results = bank_method(&desired_bank, hashmap_key); for result in filter_results(results, root) { - notifier.notify(result, sink); - notified_set.insert(bank_sub_id.clone()); + notifier.notify( + Response { + context: RpcResponseContext { slot }, + value: result, + }, + sink, + ); + notified_set.insert(sub_id.clone()); } } } @@ -354,7 +361,7 @@ impl RpcSubscriptions { pubkey: &Pubkey, confirmations: Option, sub_id: &SubscriptionId, - sink: &Sink, + sink: &Sink>, ) { let mut subscriptions = self.account_subscriptions.write().unwrap(); add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink); @@ -370,7 +377,7 @@ impl RpcSubscriptions { program_id: &Pubkey, confirmations: Option, sub_id: &SubscriptionId, - sink: &Sink, + sink: &Sink>, ) { let mut subscriptions = self.program_subscriptions.write().unwrap(); add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink); @@ -386,7 +393,7 @@ impl RpcSubscriptions { signature: &Signature, confirmations: Option, sub_id: &SubscriptionId, - sink: &Sink>, + sink: &Sink>>, ) { let mut subscriptions = self.signature_subscriptions.write().unwrap(); add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink); @@ -610,10 +617,24 @@ pub(crate) mod tests { subscriptions.notify_subscribers(0, &bank_forks); let response = robust_poll_or_panic(transport_receiver); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"# - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_account_subscription(&sub_id); assert!(!subscriptions @@ -666,11 +687,27 @@ pub(crate) mod tests { subscriptions.notify_subscribers(0, &bank_forks); let response = robust_poll_or_panic(transport_receiver); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#, - alice.pubkey() - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "programNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": { + "account": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + "pubkey": alice.pubkey().to_string(), + }, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_program_subscription(&sub_id); assert!(!subscriptions @@ -728,13 +765,18 @@ pub(crate) mod tests { subscriptions.notify_subscribers(0, &bank_forks); let response = robust_poll_or_panic(transport_receiver); let expected_res: Option> = Some(Ok(())); - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": expected_res, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); // Subscription should be automatically removed after notification assert!(!subscriptions diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 8c51a6c028..866b2f8e6a 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -7,7 +7,7 @@ use jsonrpc_core_client::transports::ws; use log::*; use reqwest::{self, header::CONTENT_TYPE}; use serde_json::{json, Value}; -use solana_client::rpc_client::get_rpc_request_str; +use solana_client::{rpc_client::get_rpc_request_str, rpc_response::Response}; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator}; use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction}; use std::{ @@ -217,7 +217,7 @@ fn test_rpc_subscriptions() { // Create the pub sub runtime let mut rt = Runtime::new().unwrap(); let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub); - let (sender, receiver) = channel::<(String, transaction::Result<()>)>(); + let (sender, receiver) = channel::<(String, Response>)>(); let sender = Arc::new(Mutex::new(sender)); rt.spawn({ @@ -253,7 +253,7 @@ fn test_rpc_subscriptions() { assert!(now.elapsed().unwrap() < timeout); match receiver.recv_timeout(Duration::from_secs(1)) { Ok((sig, result)) => { - assert!(result.is_ok()); + assert!(result.value.is_ok()); assert!(signature_set.remove(&sig)); } Err(_err) => {