@ -4,7 +4,7 @@ use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo};
|
|||||||
use jsonrpc_core::{Error, ErrorCode, Result};
|
use jsonrpc_core::{Error, ErrorCode, Result};
|
||||||
use jsonrpc_derive::rpc;
|
use jsonrpc_derive::rpc;
|
||||||
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
|
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
|
||||||
use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount};
|
use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount};
|
||||||
use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction};
|
use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction};
|
||||||
use std::sync::{atomic, Arc};
|
use std::sync::{atomic, Arc};
|
||||||
|
|
||||||
@ -26,7 +26,7 @@ pub trait RpcSolPubSub {
|
|||||||
fn account_subscribe(
|
fn account_subscribe(
|
||||||
&self,
|
&self,
|
||||||
meta: Self::Metadata,
|
meta: Self::Metadata,
|
||||||
subscriber: Subscriber<RpcAccount>,
|
subscriber: Subscriber<RpcResponse<RpcAccount>>,
|
||||||
pubkey_str: String,
|
pubkey_str: String,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
);
|
);
|
||||||
@ -50,7 +50,7 @@ pub trait RpcSolPubSub {
|
|||||||
fn program_subscribe(
|
fn program_subscribe(
|
||||||
&self,
|
&self,
|
||||||
meta: Self::Metadata,
|
meta: Self::Metadata,
|
||||||
subscriber: Subscriber<RpcKeyedAccount>,
|
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
|
||||||
pubkey_str: String,
|
pubkey_str: String,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
);
|
);
|
||||||
@ -74,7 +74,7 @@ pub trait RpcSolPubSub {
|
|||||||
fn signature_subscribe(
|
fn signature_subscribe(
|
||||||
&self,
|
&self,
|
||||||
meta: Self::Metadata,
|
meta: Self::Metadata,
|
||||||
subscriber: Subscriber<transaction::Result<()>>,
|
subscriber: Subscriber<RpcResponse<transaction::Result<()>>>,
|
||||||
signature_str: String,
|
signature_str: String,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
);
|
);
|
||||||
@ -133,7 +133,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
|||||||
fn account_subscribe(
|
fn account_subscribe(
|
||||||
&self,
|
&self,
|
||||||
_meta: Self::Metadata,
|
_meta: Self::Metadata,
|
||||||
subscriber: Subscriber<RpcAccount>,
|
subscriber: Subscriber<RpcResponse<RpcAccount>>,
|
||||||
pubkey_str: String,
|
pubkey_str: String,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
) {
|
) {
|
||||||
@ -171,7 +171,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
|||||||
fn program_subscribe(
|
fn program_subscribe(
|
||||||
&self,
|
&self,
|
||||||
_meta: Self::Metadata,
|
_meta: Self::Metadata,
|
||||||
subscriber: Subscriber<RpcKeyedAccount>,
|
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
|
||||||
pubkey_str: String,
|
pubkey_str: String,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
) {
|
) {
|
||||||
@ -209,7 +209,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
|||||||
fn signature_subscribe(
|
fn signature_subscribe(
|
||||||
&self,
|
&self,
|
||||||
_meta: Self::Metadata,
|
_meta: Self::Metadata,
|
||||||
subscriber: Subscriber<transaction::Result<()>>,
|
subscriber: Subscriber<RpcResponse<transaction::Result<()>>>,
|
||||||
signature_str: String,
|
signature_str: String,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
) {
|
) {
|
||||||
@ -340,13 +340,18 @@ mod tests {
|
|||||||
// Test signature confirmation notification
|
// Test signature confirmation notification
|
||||||
let response = robust_poll_or_panic(receiver);
|
let response = robust_poll_or_panic(receiver);
|
||||||
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
||||||
let expected_res_str =
|
let expected = json!({
|
||||||
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
|
"jsonrpc": "2.0",
|
||||||
let expected = format!(
|
"method": "signatureNotification",
|
||||||
r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#,
|
"params": {
|
||||||
expected_res_str
|
"result": {
|
||||||
);
|
"context": { "slot": 0 },
|
||||||
assert_eq!(expected, response);
|
"value": expected_res,
|
||||||
|
},
|
||||||
|
"subscription": 0,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -462,11 +467,14 @@ mod tests {
|
|||||||
"method": "accountNotification",
|
"method": "accountNotification",
|
||||||
"params": {
|
"params": {
|
||||||
"result": {
|
"result": {
|
||||||
"owner": budget_program_id.to_string(),
|
"context": { "slot": 0 },
|
||||||
"lamports": 51,
|
"value": {
|
||||||
"data": bs58::encode(expected_data).into_string(),
|
"owner": budget_program_id.to_string(),
|
||||||
"executable": false,
|
"lamports": 51,
|
||||||
"rentEpoch": 1,
|
"data": bs58::encode(expected_data).into_string(),
|
||||||
|
"executable": false,
|
||||||
|
"rentEpoch": 1,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"subscription": 0,
|
"subscription": 0,
|
||||||
}
|
}
|
||||||
@ -609,11 +617,14 @@ mod tests {
|
|||||||
"method": "accountNotification",
|
"method": "accountNotification",
|
||||||
"params": {
|
"params": {
|
||||||
"result": {
|
"result": {
|
||||||
"owner": system_program::id().to_string(),
|
"context": { "slot": 0 },
|
||||||
"lamports": 100,
|
"value": {
|
||||||
"data": "",
|
"owner": system_program::id().to_string(),
|
||||||
"executable": false,
|
"lamports": 100,
|
||||||
"rentEpoch": 1,
|
"data": "",
|
||||||
|
"executable": false,
|
||||||
|
"rentEpoch": 1,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"subscription": 0,
|
"subscription": 0,
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,7 @@ use core::hash::Hash;
|
|||||||
use jsonrpc_core::futures::Future;
|
use jsonrpc_core::futures::Future;
|
||||||
use jsonrpc_pubsub::{typed::Sink, SubscriptionId};
|
use jsonrpc_pubsub::{typed::Sink, SubscriptionId};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount};
|
use solana_client::rpc_response::{Response, RpcAccount, RpcKeyedAccount, RpcResponseContext};
|
||||||
use solana_ledger::bank_forks::BankForks;
|
use solana_ledger::bank_forks::BankForks;
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
@ -50,11 +50,15 @@ impl std::fmt::Debug for NotificationEntry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type RpcAccountSubscriptions =
|
type RpcAccountSubscriptions =
|
||||||
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcAccount>, Confirmations)>>>;
|
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Response<RpcAccount>>, Confirmations)>>>;
|
||||||
type RpcProgramSubscriptions =
|
type RpcProgramSubscriptions = RwLock<
|
||||||
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcKeyedAccount>, Confirmations)>>>;
|
HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Response<RpcKeyedAccount>>, Confirmations)>>,
|
||||||
|
>;
|
||||||
type RpcSignatureSubscriptions = RwLock<
|
type RpcSignatureSubscriptions = RwLock<
|
||||||
HashMap<Signature, HashMap<SubscriptionId, (Sink<transaction::Result<()>>, Confirmations)>>,
|
HashMap<
|
||||||
|
Signature,
|
||||||
|
HashMap<SubscriptionId, (Sink<Response<transaction::Result<()>>>, Confirmations)>,
|
||||||
|
>,
|
||||||
>;
|
>;
|
||||||
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
|
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
|
||||||
|
|
||||||
@ -105,8 +109,9 @@ where
|
|||||||
found
|
found
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
fn check_confirmations_and_notify<K, S, B, F, X>(
|
fn check_confirmations_and_notify<K, S, B, F, X>(
|
||||||
subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
|
subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<Response<S>>, Confirmations)>>,
|
||||||
hashmap_key: &K,
|
hashmap_key: &K,
|
||||||
current_slot: Slot,
|
current_slot: Slot,
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
@ -131,7 +136,7 @@ where
|
|||||||
|
|
||||||
let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
|
let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
|
||||||
if let Some(hashmap) = subscriptions.get(hashmap_key) {
|
if let Some(hashmap) = subscriptions.get(hashmap_key) {
|
||||||
for (bank_sub_id, (sink, confirmations)) in hashmap.iter() {
|
for (sub_id, (sink, confirmations)) in hashmap.iter() {
|
||||||
let desired_slot: Vec<u64> = current_ancestors
|
let desired_slot: Vec<u64> = current_ancestors
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, &v)| v == *confirmations)
|
.filter(|(_, &v)| v == *confirmations)
|
||||||
@ -146,16 +151,18 @@ where
|
|||||||
.collect();
|
.collect();
|
||||||
let root = if root.len() == 1 { root[0] } else { 0 };
|
let root = if root.len() == 1 { root[0] } else { 0 };
|
||||||
if desired_slot.len() == 1 {
|
if desired_slot.len() == 1 {
|
||||||
let desired_bank = bank_forks
|
let slot = desired_slot[0];
|
||||||
.read()
|
let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone();
|
||||||
.unwrap()
|
|
||||||
.get(desired_slot[0])
|
|
||||||
.unwrap()
|
|
||||||
.clone();
|
|
||||||
let results = bank_method(&desired_bank, hashmap_key);
|
let results = bank_method(&desired_bank, hashmap_key);
|
||||||
for result in filter_results(results, root) {
|
for result in filter_results(results, root) {
|
||||||
notifier.notify(result, sink);
|
notifier.notify(
|
||||||
notified_set.insert(bank_sub_id.clone());
|
Response {
|
||||||
|
context: RpcResponseContext { slot },
|
||||||
|
value: result,
|
||||||
|
},
|
||||||
|
sink,
|
||||||
|
);
|
||||||
|
notified_set.insert(sub_id.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -354,7 +361,7 @@ impl RpcSubscriptions {
|
|||||||
pubkey: &Pubkey,
|
pubkey: &Pubkey,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
sub_id: &SubscriptionId,
|
sub_id: &SubscriptionId,
|
||||||
sink: &Sink<RpcAccount>,
|
sink: &Sink<Response<RpcAccount>>,
|
||||||
) {
|
) {
|
||||||
let mut subscriptions = self.account_subscriptions.write().unwrap();
|
let mut subscriptions = self.account_subscriptions.write().unwrap();
|
||||||
add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink);
|
add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink);
|
||||||
@ -370,7 +377,7 @@ impl RpcSubscriptions {
|
|||||||
program_id: &Pubkey,
|
program_id: &Pubkey,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
sub_id: &SubscriptionId,
|
sub_id: &SubscriptionId,
|
||||||
sink: &Sink<RpcKeyedAccount>,
|
sink: &Sink<Response<RpcKeyedAccount>>,
|
||||||
) {
|
) {
|
||||||
let mut subscriptions = self.program_subscriptions.write().unwrap();
|
let mut subscriptions = self.program_subscriptions.write().unwrap();
|
||||||
add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink);
|
add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink);
|
||||||
@ -386,7 +393,7 @@ impl RpcSubscriptions {
|
|||||||
signature: &Signature,
|
signature: &Signature,
|
||||||
confirmations: Option<Confirmations>,
|
confirmations: Option<Confirmations>,
|
||||||
sub_id: &SubscriptionId,
|
sub_id: &SubscriptionId,
|
||||||
sink: &Sink<transaction::Result<()>>,
|
sink: &Sink<Response<transaction::Result<()>>>,
|
||||||
) {
|
) {
|
||||||
let mut subscriptions = self.signature_subscriptions.write().unwrap();
|
let mut subscriptions = self.signature_subscriptions.write().unwrap();
|
||||||
add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink);
|
add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink);
|
||||||
@ -610,10 +617,24 @@ pub(crate) mod tests {
|
|||||||
|
|
||||||
subscriptions.notify_subscribers(0, &bank_forks);
|
subscriptions.notify_subscribers(0, &bank_forks);
|
||||||
let response = robust_poll_or_panic(transport_receiver);
|
let response = robust_poll_or_panic(transport_receiver);
|
||||||
let expected = format!(
|
let expected = json!({
|
||||||
r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"#
|
"jsonrpc": "2.0",
|
||||||
);
|
"method": "accountNotification",
|
||||||
assert_eq!(expected, response);
|
"params": {
|
||||||
|
"result": {
|
||||||
|
"context": { "slot": 0 },
|
||||||
|
"value": {
|
||||||
|
"data": "1111111111111111",
|
||||||
|
"executable": false,
|
||||||
|
"lamports": 1,
|
||||||
|
"owner": "Budget1111111111111111111111111111111111111",
|
||||||
|
"rentEpoch": 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"subscription": 0,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||||
|
|
||||||
subscriptions.remove_account_subscription(&sub_id);
|
subscriptions.remove_account_subscription(&sub_id);
|
||||||
assert!(!subscriptions
|
assert!(!subscriptions
|
||||||
@ -666,11 +687,27 @@ pub(crate) mod tests {
|
|||||||
|
|
||||||
subscriptions.notify_subscribers(0, &bank_forks);
|
subscriptions.notify_subscribers(0, &bank_forks);
|
||||||
let response = robust_poll_or_panic(transport_receiver);
|
let response = robust_poll_or_panic(transport_receiver);
|
||||||
let expected = format!(
|
let expected = json!({
|
||||||
r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#,
|
"jsonrpc": "2.0",
|
||||||
alice.pubkey()
|
"method": "programNotification",
|
||||||
);
|
"params": {
|
||||||
assert_eq!(expected, response);
|
"result": {
|
||||||
|
"context": { "slot": 0 },
|
||||||
|
"value": {
|
||||||
|
"account": {
|
||||||
|
"data": "1111111111111111",
|
||||||
|
"executable": false,
|
||||||
|
"lamports": 1,
|
||||||
|
"owner": "Budget1111111111111111111111111111111111111",
|
||||||
|
"rentEpoch": 1,
|
||||||
|
},
|
||||||
|
"pubkey": alice.pubkey().to_string(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"subscription": 0,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||||
|
|
||||||
subscriptions.remove_program_subscription(&sub_id);
|
subscriptions.remove_program_subscription(&sub_id);
|
||||||
assert!(!subscriptions
|
assert!(!subscriptions
|
||||||
@ -728,13 +765,18 @@ pub(crate) mod tests {
|
|||||||
subscriptions.notify_subscribers(0, &bank_forks);
|
subscriptions.notify_subscribers(0, &bank_forks);
|
||||||
let response = robust_poll_or_panic(transport_receiver);
|
let response = robust_poll_or_panic(transport_receiver);
|
||||||
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
||||||
let expected_res_str =
|
let expected = json!({
|
||||||
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
|
"jsonrpc": "2.0",
|
||||||
let expected = format!(
|
"method": "signatureNotification",
|
||||||
r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#,
|
"params": {
|
||||||
expected_res_str
|
"result": {
|
||||||
);
|
"context": { "slot": 0 },
|
||||||
assert_eq!(expected, response);
|
"value": expected_res,
|
||||||
|
},
|
||||||
|
"subscription": 0,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||||
|
|
||||||
// Subscription should be automatically removed after notification
|
// Subscription should be automatically removed after notification
|
||||||
assert!(!subscriptions
|
assert!(!subscriptions
|
||||||
|
@ -7,7 +7,7 @@ use jsonrpc_core_client::transports::ws;
|
|||||||
use log::*;
|
use log::*;
|
||||||
use reqwest::{self, header::CONTENT_TYPE};
|
use reqwest::{self, header::CONTENT_TYPE};
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use solana_client::rpc_client::get_rpc_request_str;
|
use solana_client::{rpc_client::get_rpc_request_str, rpc_response::Response};
|
||||||
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator};
|
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator};
|
||||||
use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction};
|
use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction};
|
||||||
use std::{
|
use std::{
|
||||||
@ -217,7 +217,7 @@ fn test_rpc_subscriptions() {
|
|||||||
// Create the pub sub runtime
|
// Create the pub sub runtime
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);
|
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);
|
||||||
let (sender, receiver) = channel::<(String, transaction::Result<()>)>();
|
let (sender, receiver) = channel::<(String, Response<transaction::Result<()>>)>();
|
||||||
let sender = Arc::new(Mutex::new(sender));
|
let sender = Arc::new(Mutex::new(sender));
|
||||||
|
|
||||||
rt.spawn({
|
rt.spawn({
|
||||||
@ -253,7 +253,7 @@ fn test_rpc_subscriptions() {
|
|||||||
assert!(now.elapsed().unwrap() < timeout);
|
assert!(now.elapsed().unwrap() < timeout);
|
||||||
match receiver.recv_timeout(Duration::from_secs(1)) {
|
match receiver.recv_timeout(Duration::from_secs(1)) {
|
||||||
Ok((sig, result)) => {
|
Ok((sig, result)) => {
|
||||||
assert!(result.is_ok());
|
assert!(result.value.is_ok());
|
||||||
assert!(signature_set.remove(&sig));
|
assert!(signature_set.remove(&sig));
|
||||||
}
|
}
|
||||||
Err(_err) => {
|
Err(_err) => {
|
||||||
|
Reference in New Issue
Block a user