diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 42dccbf3e1..ad7368f3a9 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -159,8 +159,11 @@ where let root = if root.len() == 1 { root[0] } else { 0 }; if desired_slot.len() == 1 { let slot = desired_slot[0]; - let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone(); - let results = bank_method(&desired_bank, hashmap_key); + let results = { + let bank_forks = bank_forks.read().unwrap(); + let desired_bank = bank_forks.get(slot).unwrap(); + bank_method(&desired_bank, hashmap_key) + }; for result in filter_results(results, root) { notifier.notify( Response { diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index ec5b907513..8dd6d8d01b 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -16,6 +16,7 @@ use solana_sdk::{ commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey, + signature::Signer, system_transaction, transaction::{self, Transaction}, }; @@ -24,7 +25,6 @@ use std::{ fs::remove_dir_all, net::UdpSocket, sync::mpsc::channel, - sync::{Arc, Mutex}, thread::sleep, time::{Duration, Instant}, }; @@ -210,9 +210,11 @@ fn test_rpc_subscriptions() { .. } = TestValidator::run(); - // Create transaction signatures to subscribe to let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let transactions: Vec = (0..100) + transactions_socket.connect(leader_data.tpu).unwrap(); + + // Create transaction signatures to subscribe to + let transactions: Vec = (0..1000) .map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash)) .collect(); let mut signature_set: HashSet = transactions @@ -220,15 +222,15 @@ fn test_rpc_subscriptions() { .map(|tx| tx.signatures[0].to_string()) .collect(); + // Track when subscriptions are ready + let (ready_sender, ready_receiver) = channel::<()>(); + // Track when status notifications are received + let (status_sender, status_receiver) = channel::<(String, Response>)>(); + // Create the pub sub runtime let mut rt = Runtime::new().unwrap(); let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub); - let (status_sender, status_receiver) = channel::<(String, Response>)>(); - let status_sender = Arc::new(Mutex::new(status_sender)); - let (sent_sender, sent_receiver) = channel::<()>(); - let sent_sender = Arc::new(Mutex::new(sent_sender)); - // Subscribe to all signatures rt.spawn({ let connect = ws::try_connect::(&rpc_pubsub_url).unwrap(); @@ -237,18 +239,12 @@ fn test_rpc_subscriptions() { .and_then(move |client| { for sig in signature_set { let status_sender = status_sender.clone(); - let sent_sender = sent_sender.clone(); tokio::spawn( client .signature_subscribe(sig.clone(), None) .and_then(move |sig_stream| { - sent_sender.lock().unwrap().send(()).unwrap(); sig_stream.for_each(move |result| { - status_sender - .lock() - .unwrap() - .send((sig.clone(), result)) - .unwrap(); + status_sender.send((sig.clone(), result)).unwrap(); future::ok(()) }) }) @@ -257,37 +253,50 @@ fn test_rpc_subscriptions() { }), ); } + tokio::spawn( + client + .slot_subscribe() + .and_then(move |slot_stream| { + slot_stream.for_each(move |_| { + ready_sender.send(()).unwrap(); + future::ok(()) + }) + }) + .map_err(|err| { + eprintln!("slot sub err: {:#?}", err); + }), + ); future::ok(()) }) .map_err(|_| ()) }); // Wait for signature subscriptions - let deadline = Instant::now() + Duration::from_secs(2); - (0..transactions.len()).for_each(|_| { - sent_receiver - .recv_timeout(deadline.saturating_duration_since(Instant::now())) - .unwrap(); - }); + ready_receiver.recv_timeout(Duration::from_secs(2)).unwrap(); let rpc_client = RpcClient::new_socket(leader_data.rpc); - let mut transaction_count = rpc_client - .get_transaction_count_with_commitment(CommitmentConfig::recent()) - .unwrap(); + let mut mint_balance = rpc_client + .get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::recent()) + .unwrap() + .value; + assert!(mint_balance >= transactions.len() as u64); // Send all transactions to tpu socket for processing transactions.iter().for_each(|tx| { transactions_socket - .send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu) + .send(&bincode::serialize(&tx).unwrap()) .unwrap(); }); + + // Track mint balance to know when transactions have completed let now = Instant::now(); - let expected_transaction_count = transaction_count + transactions.len() as u64; - while transaction_count < expected_transaction_count && now.elapsed() < Duration::from_secs(5) { - transaction_count = rpc_client - .get_transaction_count_with_commitment(CommitmentConfig::recent()) - .unwrap(); - sleep(Duration::from_millis(200)); + let expected_mint_balance = mint_balance - transactions.len() as u64; + while mint_balance != expected_mint_balance && now.elapsed() < Duration::from_secs(5) { + mint_balance = rpc_client + .get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::recent()) + .unwrap() + .value; + sleep(Duration::from_millis(100)); } // Wait for all signature subscriptions @@ -300,12 +309,12 @@ fn test_rpc_subscriptions() { assert!(signature_set.remove(&sig)); } Err(_err) => { - eprintln!( + assert!( + false, "recv_timeout, {}/{} signatures remaining", signature_set.len(), transactions.len() ); - assert!(false) } } }