From 9eb39df93fd22e542beff1511aa427fe672b03f3 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 23 Mar 2020 19:32:05 -0600 Subject: [PATCH] Backport: add slot to signature notification & respect confirmations param (#9036) automerge --- core/src/rpc_pubsub.rs | 59 +++++---- core/src/rpc_subscriptions.rs | 219 ++++++++++++++++++++++++---------- core/tests/rpc.rs | 91 ++++++++++---- runtime/src/bank.rs | 12 ++ 4 files changed, 272 insertions(+), 109 deletions(-) diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 0288b3e074..000efd3eaf 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -4,7 +4,7 @@ use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; -use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount}; +use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount}; use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction}; use std::sync::{atomic, Arc}; @@ -26,7 +26,7 @@ pub trait RpcSolPubSub { fn account_subscribe( &self, meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ); @@ -50,7 +50,7 @@ pub trait RpcSolPubSub { fn program_subscribe( &self, meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ); @@ -74,7 +74,7 @@ pub trait RpcSolPubSub { fn signature_subscribe( &self, meta: Self::Metadata, - subscriber: Subscriber>, + subscriber: Subscriber>>, signature_str: String, confirmations: Option, ); @@ -133,7 +133,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn account_subscribe( &self, _meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ) { @@ -171,7 +171,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn program_subscribe( &self, _meta: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ) { @@ -209,7 +209,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn signature_subscribe( &self, _meta: Self::Metadata, - subscriber: Subscriber>, + subscriber: Subscriber>>, signature_str: String, confirmations: Option, ) { @@ -340,13 +340,18 @@ mod tests { // Test signature confirmation notification let response = robust_poll_or_panic(receiver); let expected_res: Option> = Some(Ok(())); - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": expected_res, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); } #[test] @@ -462,11 +467,14 @@ mod tests { "method": "accountNotification", "params": { "result": { - "owner": budget_program_id.to_string(), - "lamports": 51, - "data": bs58::encode(expected_data).into_string(), - "executable": false, - "rentEpoch": 1, + "context": { "slot": 0 }, + "value": { + "owner": budget_program_id.to_string(), + "lamports": 51, + "data": bs58::encode(expected_data).into_string(), + "executable": false, + "rentEpoch": 1, + }, }, "subscription": 0, } @@ -609,11 +617,14 @@ mod tests { "method": "accountNotification", "params": { "result": { - "owner": system_program::id().to_string(), - "lamports": 100, - "data": "", - "executable": false, - "rentEpoch": 1, + "context": { "slot": 0 }, + "value": { + "owner": system_program::id().to_string(), + "lamports": 100, + "data": "", + "executable": false, + "rentEpoch": 1, + }, }, "subscription": 0, } diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index cc33444295..6d8e1f38b4 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -4,7 +4,7 @@ use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{typed::Sink, SubscriptionId}; use serde::Serialize; -use solana_client::rpc_response::{RpcAccount, RpcKeyedAccount}; +use solana_client::rpc_response::{Response, RpcAccount, RpcKeyedAccount, RpcResponseContext}; use solana_ledger::bank_forks::BankForks; use solana_runtime::bank::Bank; use solana_sdk::{ @@ -50,11 +50,15 @@ impl std::fmt::Debug for NotificationEntry { } type RpcAccountSubscriptions = - RwLock, Confirmations)>>>; -type RpcProgramSubscriptions = - RwLock, Confirmations)>>>; + RwLock>, Confirmations)>>>; +type RpcProgramSubscriptions = RwLock< + HashMap>, Confirmations)>>, +>; type RpcSignatureSubscriptions = RwLock< - HashMap>, Confirmations)>>, + HashMap< + Signature, + HashMap>>, Confirmations)>, + >, >; type RpcSlotSubscriptions = RwLock>>; @@ -105,8 +109,9 @@ where found } +#[allow(clippy::type_complexity)] fn check_confirmations_and_notify( - subscriptions: &HashMap, Confirmations)>>, + subscriptions: &HashMap>, Confirmations)>>, hashmap_key: &K, current_slot: Slot, bank_forks: &Arc>, @@ -131,7 +136,7 @@ where let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { - for (bank_sub_id, (sink, confirmations)) in hashmap.iter() { + for (sub_id, (sink, confirmations)) in hashmap.iter() { let desired_slot: Vec = current_ancestors .iter() .filter(|(_, &v)| v == *confirmations) @@ -146,16 +151,18 @@ where .collect(); let root = if root.len() == 1 { root[0] } else { 0 }; if desired_slot.len() == 1 { - let desired_bank = bank_forks - .read() - .unwrap() - .get(desired_slot[0]) - .unwrap() - .clone(); + let slot = desired_slot[0]; + let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone(); let results = bank_method(&desired_bank, hashmap_key); for result in filter_results(results, root) { - notifier.notify(result, sink); - notified_set.insert(bank_sub_id.clone()); + notifier.notify( + Response { + context: RpcResponseContext { slot }, + value: result, + }, + sink, + ); + notified_set.insert(sub_id.clone()); } } } @@ -337,7 +344,7 @@ impl RpcSubscriptions { signature, current_slot, bank_forks, - Bank::get_signature_status, + Bank::get_signature_status_processed_since_parent, filter_signature_result, notifier, ); @@ -354,7 +361,7 @@ impl RpcSubscriptions { pubkey: &Pubkey, confirmations: Option, sub_id: &SubscriptionId, - sink: &Sink, + sink: &Sink>, ) { let mut subscriptions = self.account_subscriptions.write().unwrap(); add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink); @@ -370,7 +377,7 @@ impl RpcSubscriptions { program_id: &Pubkey, confirmations: Option, sub_id: &SubscriptionId, - sink: &Sink, + sink: &Sink>, ) { let mut subscriptions = self.program_subscriptions.write().unwrap(); add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink); @@ -386,7 +393,7 @@ impl RpcSubscriptions { signature: &Signature, confirmations: Option, sub_id: &SubscriptionId, - sink: &Sink>, + sink: &Sink>>, ) { let mut subscriptions = self.signature_subscriptions.write().unwrap(); add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink); @@ -606,10 +613,24 @@ pub(crate) mod tests { subscriptions.notify_subscribers(0, &bank_forks); let response = robust_poll_or_panic(transport_receiver); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"# - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_account_subscription(&sub_id); assert!(!subscriptions @@ -662,11 +683,27 @@ pub(crate) mod tests { subscriptions.notify_subscribers(0, &bank_forks); let response = robust_poll_or_panic(transport_receiver); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#, - alice.pubkey() - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "programNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": { + "account": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + "pubkey": alice.pubkey().to_string(), + }, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_program_subscription(&sub_id); assert!(!subscriptions @@ -685,66 +722,122 @@ pub(crate) mod tests { } = create_genesis_config(100); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); - let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let mut bank_forks = BankForks::new(0, bank); let alice = Keypair::new(); - let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash); - let signature = tx.signatures[0]; + + let past_bank_tx = + system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash); let unprocessed_tx = - system_transaction::transfer(&mint_keypair, &alice.pubkey(), 10, blockhash); - let not_ready_signature = unprocessed_tx.signatures[0]; + system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash); + let processed_tx = + system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash); + bank_forks - .write() - .unwrap() .get(0) .unwrap() - .process_transaction(&tx) + .process_transaction(&past_bank_tx) .unwrap(); + let next_bank = + Bank::new_from_parent(&bank_forks.banks[&0].clone(), &Pubkey::new_rand(), 1); + bank_forks.insert(next_bank); + + bank_forks + .get(1) + .unwrap() + .process_transaction(&processed_tx) + .unwrap(); + + let bank_forks = Arc::new(RwLock::new(bank_forks)); + let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("signatureNotification"); - let sub_id = SubscriptionId::Number(0 as u64); - let remaining_sub_id = SubscriptionId::Number(1 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new(&exit); - subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink.clone()); subscriptions.add_signature_subscription( - ¬_ready_signature, - None, - &remaining_sub_id, + &past_bank_tx.signatures[0], + Some(0), + &SubscriptionId::Number(1 as u64), + &sink.clone(), + ); + subscriptions.add_signature_subscription( + &processed_tx.signatures[0], + Some(0), + &SubscriptionId::Number(2 as u64), + &sink.clone(), + ); + subscriptions.add_signature_subscription( + &unprocessed_tx.signatures[0], + Some(0), + &SubscriptionId::Number(3 as u64), &sink.clone(), ); { let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); - assert!(sig_subs.contains_key(&signature)); - assert!(sig_subs.contains_key(¬_ready_signature)); + assert!(sig_subs.contains_key(&past_bank_tx.signatures[0])); + assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); + assert!(sig_subs.contains_key(&processed_tx.signatures[0])); } - subscriptions.notify_subscribers(0, &bank_forks); + subscriptions.notify_subscribers(1, &bank_forks); let response = robust_poll_or_panic(transport_receiver); let expected_res: Option> = Some(Ok(())); - let expected_res_str = - serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); - let expected = format!( - r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, - expected_res_str - ); - assert_eq!(expected, response); + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": 1 }, + "value": expected_res, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); // Subscription should be automatically removed after notification - assert!(!subscriptions - .signature_subscriptions - .read() - .unwrap() - .contains_key(&signature)); + assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); + + // Only one notification is expected for signature processed in previous bank + assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 1); // Unprocessed signature subscription should not be removed - assert!(subscriptions - .signature_subscriptions - .read() - .unwrap() - .contains_key(¬_ready_signature)); + assert_eq!( + sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(), + 1 + ); + + let (subscriber, _id_receiver, transport_receiver) = + Subscriber::new_test("signatureNotification"); + let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new(&exit); + + subscriptions.add_signature_subscription( + &past_bank_tx.signatures[0], + Some(1), + &SubscriptionId::Number(1 as u64), + &sink.clone(), + ); + subscriptions.notify_subscribers(1, &bank_forks); + let response = robust_poll_or_panic(transport_receiver); + let expected_res: Option> = Some(Ok(())); + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": expected_res, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); } #[test] diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 8c51a6c028..e4b594aa27 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -7,9 +7,18 @@ use jsonrpc_core_client::transports::ws; use log::*; use reqwest::{self, header::CONTENT_TYPE}; use serde_json::{json, Value}; -use solana_client::rpc_client::get_rpc_request_str; +use solana_client::{ + rpc_client::{get_rpc_request_str, RpcClient}, + rpc_response::Response, +}; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator}; -use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction}; +use solana_sdk::{ + commitment_config::CommitmentConfig, + hash::Hash, + pubkey::Pubkey, + system_transaction, + transaction::{self, Transaction}, +}; use std::{ collections::HashSet, fs::remove_dir_all, @@ -17,8 +26,7 @@ use std::{ sync::mpsc::channel, sync::{Arc, Mutex}, thread::sleep, - time::Duration, - time::SystemTime, + time::{Duration, Instant}, }; use tokio::runtime::Runtime; @@ -204,35 +212,43 @@ fn test_rpc_subscriptions() { // Create transaction signatures to subscribe to let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut signature_set: HashSet = (0..1000) - .map(|_| { - let tx = system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash); - transactions_socket - .send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu) - .unwrap(); - tx.signatures[0].to_string() - }) + let transactions: Vec = (0..500) + .map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash)) + .collect(); + let mut signature_set: HashSet = transactions + .iter() + .map(|tx| tx.signatures[0].to_string()) .collect(); // Create the pub sub runtime let mut rt = Runtime::new().unwrap(); let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub); - let (sender, receiver) = channel::<(String, transaction::Result<()>)>(); - let sender = Arc::new(Mutex::new(sender)); + let (status_sender, status_receiver) = channel::<(String, Response>)>(); + let status_sender = Arc::new(Mutex::new(status_sender)); + let (sent_sender, sent_receiver) = channel::<()>(); + let sent_sender = Arc::new(Mutex::new(sent_sender)); + + // Subscribe to all signatures rt.spawn({ let connect = ws::try_connect::(&rpc_pubsub_url).unwrap(); let signature_set = signature_set.clone(); connect .and_then(move |client| { for sig in signature_set { - let sender = sender.clone(); + let status_sender = status_sender.clone(); + let sent_sender = sent_sender.clone(); tokio::spawn( client .signature_subscribe(sig.clone(), None) .and_then(move |sig_stream| { + sent_sender.lock().unwrap().send(()).unwrap(); sig_stream.for_each(move |result| { - sender.lock().unwrap().send((sig.clone(), result)).unwrap(); + status_sender + .lock() + .unwrap() + .send((sig.clone(), result)) + .unwrap(); future::ok(()) }) }) @@ -246,18 +262,49 @@ fn test_rpc_subscriptions() { .map_err(|_| ()) }); + // Wait for signature subscriptions + let deadline = Instant::now() + Duration::from_secs(2); + (0..transactions.len()).for_each(|_| { + sent_receiver + .recv_timeout(deadline.saturating_duration_since(Instant::now())) + .unwrap(); + }); + + let rpc_client = RpcClient::new_socket(leader_data.rpc); + let transaction_count = rpc_client + .get_transaction_count_with_commitment(CommitmentConfig::recent()) + .unwrap(); + + // Send all transactions to tpu socket for processing + transactions.iter().for_each(|tx| { + transactions_socket + .send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu) + .unwrap(); + }); + let mut x = 0; + let now = Instant::now(); + while x < transaction_count + 500 || now.elapsed() > Duration::from_secs(5) { + x = rpc_client + .get_transaction_count_with_commitment(CommitmentConfig::recent()) + .unwrap(); + sleep(Duration::from_millis(200)); + } + // Wait for all signature subscriptions - let now = SystemTime::now(); - let timeout = Duration::from_secs(5); + let deadline = Instant::now() + Duration::from_secs(5); while !signature_set.is_empty() { - assert!(now.elapsed().unwrap() < timeout); - match receiver.recv_timeout(Duration::from_secs(1)) { + let timeout = deadline.saturating_duration_since(Instant::now()); + match status_receiver.recv_timeout(timeout) { Ok((sig, result)) => { - assert!(result.is_ok()); + assert!(result.value.is_ok()); assert!(signature_set.remove(&sig)); } Err(_err) => { - eprintln!("unexpected receive timeout"); + eprintln!( + "recv_timeout, {}/{} signatures remaining", + signature_set.len(), + transactions.len() + ); assert!(false) } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 8a14a72f4d..f61fd3738b 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1817,6 +1817,18 @@ impl Bank { .fetch_add(signature_count, Ordering::Relaxed); } + pub fn get_signature_status_processed_since_parent( + &self, + signature: &Signature, + ) -> Option> { + if let Some(status) = self.get_signature_confirmation_status(signature) { + if status.slot == self.slot() { + return Some(status.status); + } + } + None + } + pub fn get_signature_confirmation_status( &self, signature: &Signature,