diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 4da5cf216a..7bc6cb6d47 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -344,7 +344,7 @@ impl RpcSubscriptions { signature, current_slot, bank_forks, - Bank::get_signature_status, + Bank::get_signature_status_processed_since_parent, filter_signature_result, notifier, ); @@ -726,43 +726,108 @@ pub(crate) mod tests { } = create_genesis_config(100); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); - let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let mut bank_forks = BankForks::new(0, bank); let alice = Keypair::new(); - let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash); - let signature = tx.signatures[0]; + + let past_bank_tx = + system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash); let unprocessed_tx = - system_transaction::transfer(&mint_keypair, &alice.pubkey(), 10, blockhash); - let not_ready_signature = unprocessed_tx.signatures[0]; + system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash); + let processed_tx = + system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash); + bank_forks - .write() - .unwrap() .get(0) .unwrap() - .process_transaction(&tx) + .process_transaction(&past_bank_tx) .unwrap(); + let next_bank = + Bank::new_from_parent(&bank_forks.banks[&0].clone(), &Pubkey::new_rand(), 1); + bank_forks.insert(next_bank); + + bank_forks + .get(1) + .unwrap() + .process_transaction(&processed_tx) + .unwrap(); + + let bank_forks = Arc::new(RwLock::new(bank_forks)); + let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("signatureNotification"); - let sub_id = SubscriptionId::Number(0 as u64); - let remaining_sub_id = SubscriptionId::Number(1 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new(&exit); - subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink.clone()); subscriptions.add_signature_subscription( - ¬_ready_signature, - None, - &remaining_sub_id, + &past_bank_tx.signatures[0], + Some(0), + &SubscriptionId::Number(1 as u64), + &sink.clone(), + ); + subscriptions.add_signature_subscription( + &processed_tx.signatures[0], + Some(0), + &SubscriptionId::Number(2 as u64), + &sink.clone(), + ); + subscriptions.add_signature_subscription( + &unprocessed_tx.signatures[0], + Some(0), + &SubscriptionId::Number(3 as u64), &sink.clone(), ); { let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); - assert!(sig_subs.contains_key(&signature)); - assert!(sig_subs.contains_key(¬_ready_signature)); + assert!(sig_subs.contains_key(&past_bank_tx.signatures[0])); + assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); + assert!(sig_subs.contains_key(&processed_tx.signatures[0])); } - subscriptions.notify_subscribers(0, &bank_forks); + subscriptions.notify_subscribers(1, &bank_forks); + let response = robust_poll_or_panic(transport_receiver); + let expected_res: Option> = Some(Ok(())); + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": 1 }, + "value": expected_res, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); + + // Subscription should be automatically removed after notification + assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); + + // Only one notification is expected for signature processed in previous bank + assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 1); + + // Unprocessed signature subscription should not be removed + assert_eq!( + sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(), + 1 + ); + + let (subscriber, _id_receiver, transport_receiver) = + Subscriber::new_test("signatureNotification"); + let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); + + subscriptions.add_signature_subscription( + &past_bank_tx.signatures[0], + Some(1), + &SubscriptionId::Number(1 as u64), + &sink.clone(), + ); + subscriptions.notify_subscribers(1, &bank_forks); let response = robust_poll_or_panic(transport_receiver); let expected_res: Option> = Some(Ok(())); let expected = json!({ @@ -777,20 +842,6 @@ pub(crate) mod tests { } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); - - // Subscription should be automatically removed after notification - assert!(!subscriptions - .signature_subscriptions - .read() - .unwrap() - .contains_key(&signature)); - - // Unprocessed signature subscription should not be removed - assert!(subscriptions - .signature_subscriptions - .read() - .unwrap() - .contains_key(¬_ready_signature)); } #[test] diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 866b2f8e6a..e4b594aa27 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -7,9 +7,18 @@ use jsonrpc_core_client::transports::ws; use log::*; use reqwest::{self, header::CONTENT_TYPE}; use serde_json::{json, Value}; -use solana_client::{rpc_client::get_rpc_request_str, rpc_response::Response}; +use solana_client::{ + rpc_client::{get_rpc_request_str, RpcClient}, + rpc_response::Response, +}; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator}; -use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction}; +use solana_sdk::{ + commitment_config::CommitmentConfig, + hash::Hash, + pubkey::Pubkey, + system_transaction, + transaction::{self, Transaction}, +}; use std::{ collections::HashSet, fs::remove_dir_all, @@ -17,8 +26,7 @@ use std::{ sync::mpsc::channel, sync::{Arc, Mutex}, thread::sleep, - time::Duration, - time::SystemTime, + time::{Duration, Instant}, }; use tokio::runtime::Runtime; @@ -204,35 +212,43 @@ fn test_rpc_subscriptions() { // Create transaction signatures to subscribe to let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut signature_set: HashSet = (0..1000) - .map(|_| { - let tx = system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash); - transactions_socket - .send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu) - .unwrap(); - tx.signatures[0].to_string() - }) + let transactions: Vec = (0..500) + .map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash)) + .collect(); + let mut signature_set: HashSet = transactions + .iter() + .map(|tx| tx.signatures[0].to_string()) .collect(); // Create the pub sub runtime let mut rt = Runtime::new().unwrap(); let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub); - let (sender, receiver) = channel::<(String, Response>)>(); - let sender = Arc::new(Mutex::new(sender)); + let (status_sender, status_receiver) = channel::<(String, Response>)>(); + let status_sender = Arc::new(Mutex::new(status_sender)); + let (sent_sender, sent_receiver) = channel::<()>(); + let sent_sender = Arc::new(Mutex::new(sent_sender)); + + // Subscribe to all signatures rt.spawn({ let connect = ws::try_connect::(&rpc_pubsub_url).unwrap(); let signature_set = signature_set.clone(); connect .and_then(move |client| { for sig in signature_set { - let sender = sender.clone(); + let status_sender = status_sender.clone(); + let sent_sender = sent_sender.clone(); tokio::spawn( client .signature_subscribe(sig.clone(), None) .and_then(move |sig_stream| { + sent_sender.lock().unwrap().send(()).unwrap(); sig_stream.for_each(move |result| { - sender.lock().unwrap().send((sig.clone(), result)).unwrap(); + status_sender + .lock() + .unwrap() + .send((sig.clone(), result)) + .unwrap(); future::ok(()) }) }) @@ -246,18 +262,49 @@ fn test_rpc_subscriptions() { .map_err(|_| ()) }); + // Wait for signature subscriptions + let deadline = Instant::now() + Duration::from_secs(2); + (0..transactions.len()).for_each(|_| { + sent_receiver + .recv_timeout(deadline.saturating_duration_since(Instant::now())) + .unwrap(); + }); + + let rpc_client = RpcClient::new_socket(leader_data.rpc); + let transaction_count = rpc_client + .get_transaction_count_with_commitment(CommitmentConfig::recent()) + .unwrap(); + + // Send all transactions to tpu socket for processing + transactions.iter().for_each(|tx| { + transactions_socket + .send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu) + .unwrap(); + }); + let mut x = 0; + let now = Instant::now(); + while x < transaction_count + 500 || now.elapsed() > Duration::from_secs(5) { + x = rpc_client + .get_transaction_count_with_commitment(CommitmentConfig::recent()) + .unwrap(); + sleep(Duration::from_millis(200)); + } + // Wait for all signature subscriptions - let now = SystemTime::now(); - let timeout = Duration::from_secs(5); + let deadline = Instant::now() + Duration::from_secs(5); while !signature_set.is_empty() { - assert!(now.elapsed().unwrap() < timeout); - match receiver.recv_timeout(Duration::from_secs(1)) { + let timeout = deadline.saturating_duration_since(Instant::now()); + match status_receiver.recv_timeout(timeout) { Ok((sig, result)) => { assert!(result.value.is_ok()); assert!(signature_set.remove(&sig)); } Err(_err) => { - eprintln!("unexpected receive timeout"); + eprintln!( + "recv_timeout, {}/{} signatures remaining", + signature_set.len(), + transactions.len() + ); assert!(false) } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 01e0dd2f2b..88839af5d6 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1823,6 +1823,18 @@ impl Bank { .fetch_add(signature_count, Ordering::Relaxed); } + pub fn get_signature_status_processed_since_parent( + &self, + signature: &Signature, + ) -> Option> { + if let Some(status) = self.get_signature_confirmation_status(signature) { + if status.slot == self.slot() { + return Some(status.status); + } + } + None + } + pub fn get_signature_confirmation_status( &self, signature: &Signature,