Backport: add slot to signature notification & respect confirmations param (#9036)

automerge
This commit is contained in:
Tyera Eulberg
2020-03-23 19:32:05 -06:00
committed by GitHub
parent f34ce94347
commit 9eb39df93f
4 changed files with 272 additions and 109 deletions

View File

@ -4,7 +4,7 @@ use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo};
use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc; use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount}; use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount};
use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction}; use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction};
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
@ -26,7 +26,7 @@ pub trait RpcSolPubSub {
fn account_subscribe( fn account_subscribe(
&self, &self,
meta: Self::Metadata, meta: Self::Metadata,
subscriber: Subscriber<RpcAccount>, subscriber: Subscriber<RpcResponse<RpcAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
); );
@ -50,7 +50,7 @@ pub trait RpcSolPubSub {
fn program_subscribe( fn program_subscribe(
&self, &self,
meta: Self::Metadata, meta: Self::Metadata,
subscriber: Subscriber<RpcKeyedAccount>, subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
); );
@ -74,7 +74,7 @@ pub trait RpcSolPubSub {
fn signature_subscribe( fn signature_subscribe(
&self, &self,
meta: Self::Metadata, meta: Self::Metadata,
subscriber: Subscriber<transaction::Result<()>>, subscriber: Subscriber<RpcResponse<transaction::Result<()>>>,
signature_str: String, signature_str: String,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
); );
@ -133,7 +133,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
fn account_subscribe( fn account_subscribe(
&self, &self,
_meta: Self::Metadata, _meta: Self::Metadata,
subscriber: Subscriber<RpcAccount>, subscriber: Subscriber<RpcResponse<RpcAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
) { ) {
@ -171,7 +171,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
fn program_subscribe( fn program_subscribe(
&self, &self,
_meta: Self::Metadata, _meta: Self::Metadata,
subscriber: Subscriber<RpcKeyedAccount>, subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
) { ) {
@ -209,7 +209,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
fn signature_subscribe( fn signature_subscribe(
&self, &self,
_meta: Self::Metadata, _meta: Self::Metadata,
subscriber: Subscriber<transaction::Result<()>>, subscriber: Subscriber<RpcResponse<transaction::Result<()>>>,
signature_str: String, signature_str: String,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
) { ) {
@ -340,13 +340,18 @@ mod tests {
// Test signature confirmation notification // Test signature confirmation notification
let response = robust_poll_or_panic(receiver); let response = robust_poll_or_panic(receiver);
let expected_res: Option<transaction::Result<()>> = Some(Ok(())); let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected_res_str = let expected = json!({
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); "jsonrpc": "2.0",
let expected = format!( "method": "signatureNotification",
r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, "params": {
expected_res_str "result": {
); "context": { "slot": 0 },
assert_eq!(expected, response); "value": expected_res,
},
"subscription": 0,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
} }
#[test] #[test]
@ -462,12 +467,15 @@ mod tests {
"method": "accountNotification", "method": "accountNotification",
"params": { "params": {
"result": { "result": {
"context": { "slot": 0 },
"value": {
"owner": budget_program_id.to_string(), "owner": budget_program_id.to_string(),
"lamports": 51, "lamports": 51,
"data": bs58::encode(expected_data).into_string(), "data": bs58::encode(expected_data).into_string(),
"executable": false, "executable": false,
"rentEpoch": 1, "rentEpoch": 1,
}, },
},
"subscription": 0, "subscription": 0,
} }
}); });
@ -609,12 +617,15 @@ mod tests {
"method": "accountNotification", "method": "accountNotification",
"params": { "params": {
"result": { "result": {
"context": { "slot": 0 },
"value": {
"owner": system_program::id().to_string(), "owner": system_program::id().to_string(),
"lamports": 100, "lamports": 100,
"data": "", "data": "",
"executable": false, "executable": false,
"rentEpoch": 1, "rentEpoch": 1,
}, },
},
"subscription": 0, "subscription": 0,
} }
}); });

View File

@ -4,7 +4,7 @@ use core::hash::Hash;
use jsonrpc_core::futures::Future; use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{typed::Sink, SubscriptionId}; use jsonrpc_pubsub::{typed::Sink, SubscriptionId};
use serde::Serialize; use serde::Serialize;
use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount}; use solana_client::rpc_response::{Response, RpcAccount, RpcKeyedAccount, RpcResponseContext};
use solana_ledger::bank_forks::BankForks; use solana_ledger::bank_forks::BankForks;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::{ use solana_sdk::{
@ -50,11 +50,15 @@ impl std::fmt::Debug for NotificationEntry {
} }
type RpcAccountSubscriptions = type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcAccount>, Confirmations)>>>; RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Response<RpcAccount>>, Confirmations)>>>;
type RpcProgramSubscriptions = type RpcProgramSubscriptions = RwLock<
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcKeyedAccount>, Confirmations)>>>; HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Response<RpcKeyedAccount>>, Confirmations)>>,
>;
type RpcSignatureSubscriptions = RwLock< type RpcSignatureSubscriptions = RwLock<
HashMap<Signature, HashMap<SubscriptionId, (Sink<transaction::Result<()>>, Confirmations)>>, HashMap<
Signature,
HashMap<SubscriptionId, (Sink<Response<transaction::Result<()>>>, Confirmations)>,
>,
>; >;
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>; type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
@ -105,8 +109,9 @@ where
found found
} }
#[allow(clippy::type_complexity)]
fn check_confirmations_and_notify<K, S, B, F, X>( fn check_confirmations_and_notify<K, S, B, F, X>(
subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>, subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<Response<S>>, Confirmations)>>,
hashmap_key: &K, hashmap_key: &K,
current_slot: Slot, current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
@ -131,7 +136,7 @@ where
let mut notified_set: HashSet<SubscriptionId> = HashSet::new(); let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
if let Some(hashmap) = subscriptions.get(hashmap_key) { if let Some(hashmap) = subscriptions.get(hashmap_key) {
for (bank_sub_id, (sink, confirmations)) in hashmap.iter() { for (sub_id, (sink, confirmations)) in hashmap.iter() {
let desired_slot: Vec<u64> = current_ancestors let desired_slot: Vec<u64> = current_ancestors
.iter() .iter()
.filter(|(_, &v)| v == *confirmations) .filter(|(_, &v)| v == *confirmations)
@ -146,16 +151,18 @@ where
.collect(); .collect();
let root = if root.len() == 1 { root[0] } else { 0 }; let root = if root.len() == 1 { root[0] } else { 0 };
if desired_slot.len() == 1 { if desired_slot.len() == 1 {
let desired_bank = bank_forks let slot = desired_slot[0];
.read() let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone();
.unwrap()
.get(desired_slot[0])
.unwrap()
.clone();
let results = bank_method(&desired_bank, hashmap_key); let results = bank_method(&desired_bank, hashmap_key);
for result in filter_results(results, root) { for result in filter_results(results, root) {
notifier.notify(result, sink); notifier.notify(
notified_set.insert(bank_sub_id.clone()); Response {
context: RpcResponseContext { slot },
value: result,
},
sink,
);
notified_set.insert(sub_id.clone());
} }
} }
} }
@ -337,7 +344,7 @@ impl RpcSubscriptions {
signature, signature,
current_slot, current_slot,
bank_forks, bank_forks,
Bank::get_signature_status, Bank::get_signature_status_processed_since_parent,
filter_signature_result, filter_signature_result,
notifier, notifier,
); );
@ -354,7 +361,7 @@ impl RpcSubscriptions {
pubkey: &Pubkey, pubkey: &Pubkey,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
sub_id: &SubscriptionId, sub_id: &SubscriptionId,
sink: &Sink<RpcAccount>, sink: &Sink<Response<RpcAccount>>,
) { ) {
let mut subscriptions = self.account_subscriptions.write().unwrap(); let mut subscriptions = self.account_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink); add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink);
@ -370,7 +377,7 @@ impl RpcSubscriptions {
program_id: &Pubkey, program_id: &Pubkey,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
sub_id: &SubscriptionId, sub_id: &SubscriptionId,
sink: &Sink<RpcKeyedAccount>, sink: &Sink<Response<RpcKeyedAccount>>,
) { ) {
let mut subscriptions = self.program_subscriptions.write().unwrap(); let mut subscriptions = self.program_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink); add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink);
@ -386,7 +393,7 @@ impl RpcSubscriptions {
signature: &Signature, signature: &Signature,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
sub_id: &SubscriptionId, sub_id: &SubscriptionId,
sink: &Sink<transaction::Result<()>>, sink: &Sink<Response<transaction::Result<()>>>,
) { ) {
let mut subscriptions = self.signature_subscriptions.write().unwrap(); let mut subscriptions = self.signature_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink); add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink);
@ -606,10 +613,24 @@ pub(crate) mod tests {
subscriptions.notify_subscribers(0, &bank_forks); subscriptions.notify_subscribers(0, &bank_forks);
let response = robust_poll_or_panic(transport_receiver); let response = robust_poll_or_panic(transport_receiver);
let expected = format!( let expected = json!({
r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"# "jsonrpc": "2.0",
); "method": "accountNotification",
assert_eq!(expected, response); "params": {
"result": {
"context": { "slot": 0 },
"value": {
"data": "1111111111111111",
"executable": false,
"lamports": 1,
"owner": "Budget1111111111111111111111111111111111111",
"rentEpoch": 1,
},
},
"subscription": 0,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_account_subscription(&sub_id); subscriptions.remove_account_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
@ -662,11 +683,27 @@ pub(crate) mod tests {
subscriptions.notify_subscribers(0, &bank_forks); subscriptions.notify_subscribers(0, &bank_forks);
let response = robust_poll_or_panic(transport_receiver); let response = robust_poll_or_panic(transport_receiver);
let expected = format!( let expected = json!({
r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#, "jsonrpc": "2.0",
alice.pubkey() "method": "programNotification",
); "params": {
assert_eq!(expected, response); "result": {
"context": { "slot": 0 },
"value": {
"account": {
"data": "1111111111111111",
"executable": false,
"lamports": 1,
"owner": "Budget1111111111111111111111111111111111111",
"rentEpoch": 1,
},
"pubkey": alice.pubkey().to_string(),
},
},
"subscription": 0,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_program_subscription(&sub_id); subscriptions.remove_program_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
@ -685,66 +722,122 @@ pub(crate) mod tests {
} = create_genesis_config(100); } = create_genesis_config(100);
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let mut bank_forks = BankForks::new(0, bank);
let alice = Keypair::new(); let alice = Keypair::new();
let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash);
let signature = tx.signatures[0]; let past_bank_tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash);
let unprocessed_tx = let unprocessed_tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 10, blockhash); system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash);
let not_ready_signature = unprocessed_tx.signatures[0]; let processed_tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash);
bank_forks bank_forks
.write()
.unwrap()
.get(0) .get(0)
.unwrap() .unwrap()
.process_transaction(&tx) .process_transaction(&past_bank_tx)
.unwrap(); .unwrap();
let next_bank =
Bank::new_from_parent(&bank_forks.banks[&0].clone(), &Pubkey::new_rand(), 1);
bank_forks.insert(next_bank);
bank_forks
.get(1)
.unwrap()
.process_transaction(&processed_tx)
.unwrap();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let (subscriber, _id_receiver, transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap();
let remaining_sub_id = SubscriptionId::Number(1 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink.clone());
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
&not_ready_signature, &past_bank_tx.signatures[0],
None, Some(0),
&remaining_sub_id, &SubscriptionId::Number(1 as u64),
&sink.clone(),
);
subscriptions.add_signature_subscription(
&processed_tx.signatures[0],
Some(0),
&SubscriptionId::Number(2 as u64),
&sink.clone(),
);
subscriptions.add_signature_subscription(
&unprocessed_tx.signatures[0],
Some(0),
&SubscriptionId::Number(3 as u64),
&sink.clone(), &sink.clone(),
); );
{ {
let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
assert!(sig_subs.contains_key(&signature)); assert!(sig_subs.contains_key(&past_bank_tx.signatures[0]));
assert!(sig_subs.contains_key(&not_ready_signature)); assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0]));
assert!(sig_subs.contains_key(&processed_tx.signatures[0]));
} }
subscriptions.notify_subscribers(0, &bank_forks); subscriptions.notify_subscribers(1, &bank_forks);
let response = robust_poll_or_panic(transport_receiver); let response = robust_poll_or_panic(transport_receiver);
let expected_res: Option<transaction::Result<()>> = Some(Ok(())); let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected_res_str = let expected = json!({
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); "jsonrpc": "2.0",
let expected = format!( "method": "signatureNotification",
r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, "params": {
expected_res_str "result": {
); "context": { "slot": 1 },
assert_eq!(expected, response); "value": expected_res,
},
"subscription": 0,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
// Subscription should be automatically removed after notification // Subscription should be automatically removed after notification
assert!(!subscriptions assert!(!sig_subs.contains_key(&processed_tx.signatures[0]));
.signature_subscriptions
.read() // Only one notification is expected for signature processed in previous bank
.unwrap() assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 1);
.contains_key(&signature));
// Unprocessed signature subscription should not be removed // Unprocessed signature subscription should not be removed
assert!(subscriptions assert_eq!(
.signature_subscriptions sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(),
.read() 1
.unwrap() );
.contains_key(&not_ready_signature));
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("signatureNotification");
let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap();
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_signature_subscription(
&past_bank_tx.signatures[0],
Some(1),
&SubscriptionId::Number(1 as u64),
&sink.clone(),
);
subscriptions.notify_subscribers(1, &bank_forks);
let response = robust_poll_or_panic(transport_receiver);
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected = json!({
"jsonrpc": "2.0",
"method": "signatureNotification",
"params": {
"result": {
"context": { "slot": 0 },
"value": expected_res,
},
"subscription": 0,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
} }
#[test] #[test]

View File

@ -7,9 +7,18 @@ use jsonrpc_core_client::transports::ws;
use log::*; use log::*;
use reqwest::{self, header::CONTENT_TYPE}; use reqwest::{self, header::CONTENT_TYPE};
use serde_json::{json, Value}; use serde_json::{json, Value};
use solana_client::rpc_client::get_rpc_request_str; use solana_client::{
rpc_client::{get_rpc_request_str, RpcClient},
rpc_response::Response,
};
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator}; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator};
use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction}; use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash,
pubkey::Pubkey,
system_transaction,
transaction::{self, Transaction},
};
use std::{ use std::{
collections::HashSet, collections::HashSet,
fs::remove_dir_all, fs::remove_dir_all,
@ -17,8 +26,7 @@ use std::{
sync::mpsc::channel, sync::mpsc::channel,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread::sleep, thread::sleep,
time::Duration, time::{Duration, Instant},
time::SystemTime,
}; };
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
@ -204,35 +212,43 @@ fn test_rpc_subscriptions() {
// Create transaction signatures to subscribe to // Create transaction signatures to subscribe to
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut signature_set: HashSet<String> = (0..1000) let transactions: Vec<Transaction> = (0..500)
.map(|_| { .map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash))
let tx = system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash); .collect();
transactions_socket let mut signature_set: HashSet<String> = transactions
.send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu) .iter()
.unwrap(); .map(|tx| tx.signatures[0].to_string())
tx.signatures[0].to_string()
})
.collect(); .collect();
// Create the pub sub runtime // Create the pub sub runtime
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub); let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);
let (sender, receiver) = channel::<(String, transaction::Result<()>)>();
let sender = Arc::new(Mutex::new(sender));
let (status_sender, status_receiver) = channel::<(String, Response<transaction::Result<()>>)>();
let status_sender = Arc::new(Mutex::new(status_sender));
let (sent_sender, sent_receiver) = channel::<()>();
let sent_sender = Arc::new(Mutex::new(sent_sender));
// Subscribe to all signatures
rt.spawn({ rt.spawn({
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap(); let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
let signature_set = signature_set.clone(); let signature_set = signature_set.clone();
connect connect
.and_then(move |client| { .and_then(move |client| {
for sig in signature_set { for sig in signature_set {
let sender = sender.clone(); let status_sender = status_sender.clone();
let sent_sender = sent_sender.clone();
tokio::spawn( tokio::spawn(
client client
.signature_subscribe(sig.clone(), None) .signature_subscribe(sig.clone(), None)
.and_then(move |sig_stream| { .and_then(move |sig_stream| {
sent_sender.lock().unwrap().send(()).unwrap();
sig_stream.for_each(move |result| { sig_stream.for_each(move |result| {
sender.lock().unwrap().send((sig.clone(), result)).unwrap(); status_sender
.lock()
.unwrap()
.send((sig.clone(), result))
.unwrap();
future::ok(()) future::ok(())
}) })
}) })
@ -246,18 +262,49 @@ fn test_rpc_subscriptions() {
.map_err(|_| ()) .map_err(|_| ())
}); });
// Wait for signature subscriptions
let deadline = Instant::now() + Duration::from_secs(2);
(0..transactions.len()).for_each(|_| {
sent_receiver
.recv_timeout(deadline.saturating_duration_since(Instant::now()))
.unwrap();
});
let rpc_client = RpcClient::new_socket(leader_data.rpc);
let transaction_count = rpc_client
.get_transaction_count_with_commitment(CommitmentConfig::recent())
.unwrap();
// Send all transactions to tpu socket for processing
transactions.iter().for_each(|tx| {
transactions_socket
.send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu)
.unwrap();
});
let mut x = 0;
let now = Instant::now();
while x < transaction_count + 500 || now.elapsed() > Duration::from_secs(5) {
x = rpc_client
.get_transaction_count_with_commitment(CommitmentConfig::recent())
.unwrap();
sleep(Duration::from_millis(200));
}
// Wait for all signature subscriptions // Wait for all signature subscriptions
let now = SystemTime::now(); let deadline = Instant::now() + Duration::from_secs(5);
let timeout = Duration::from_secs(5);
while !signature_set.is_empty() { while !signature_set.is_empty() {
assert!(now.elapsed().unwrap() < timeout); let timeout = deadline.saturating_duration_since(Instant::now());
match receiver.recv_timeout(Duration::from_secs(1)) { match status_receiver.recv_timeout(timeout) {
Ok((sig, result)) => { Ok((sig, result)) => {
assert!(result.is_ok()); assert!(result.value.is_ok());
assert!(signature_set.remove(&sig)); assert!(signature_set.remove(&sig));
} }
Err(_err) => { Err(_err) => {
eprintln!("unexpected receive timeout"); eprintln!(
"recv_timeout, {}/{} signatures remaining",
signature_set.len(),
transactions.len()
);
assert!(false) assert!(false)
} }
} }

View File

@ -1817,6 +1817,18 @@ impl Bank {
.fetch_add(signature_count, Ordering::Relaxed); .fetch_add(signature_count, Ordering::Relaxed);
} }
pub fn get_signature_status_processed_since_parent(
&self,
signature: &Signature,
) -> Option<Result<()>> {
if let Some(status) = self.get_signature_confirmation_status(signature) {
if status.slot == self.slot() {
return Some(status.status);
}
}
None
}
pub fn get_signature_confirmation_status( pub fn get_signature_confirmation_status(
&self, &self,
signature: &Signature, signature: &Signature,