From 377d45c9ddcb5a6a05720edc254623d47bb87f9f Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sun, 17 Feb 2019 16:33:25 -0700 Subject: [PATCH] Pull RpcSubscriptions out of the Bank --- src/bank.rs | 41 --------------------- src/rpc_pubsub.rs | 75 +++++++++++++++++++-------------------- src/rpc_pubsub_service.rs | 1 - 3 files changed, 36 insertions(+), 81 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index ef271e9313..0d4c1178fb 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -8,7 +8,6 @@ use crate::counter::Counter; use crate::genesis_block::GenesisBlock; use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; use crate::poh_service::NUM_TICKS_PER_SECOND; -use crate::rpc_subscriptions::RpcSubscriptions; use crate::status_cache::StatusCache; use bincode::{deserialize, serialize}; use log::Level; @@ -89,8 +88,6 @@ pub struct Bank { /// FIFO queue of `last_id` items last_id_queue: RwLock, - subscriptions: RwLock>>, - parent: Option>, parent_hash: Hash, @@ -102,7 +99,6 @@ impl Default for Bank { accounts: Accounts::default(), last_id_queue: RwLock::new(LastIdQueue::default()), status_cache: RwLock::new(BankStatusCache::default()), - subscriptions: RwLock::new(None), parent: None, parent_hash: Hash::default(), } @@ -131,11 +127,6 @@ impl Bank { self.parent.clone() } - pub fn set_subscriptions(&self, subscriptions: Arc) { - let mut sub = self.subscriptions.write().unwrap(); - *sub = Some(subscriptions) - } - fn process_genesis_block(&self, genesis_block: &GenesisBlock) { assert!(genesis_block.mint_id != Pubkey::default()); assert!(genesis_block.bootstrap_leader_id != Pubkey::default()); @@ -258,13 +249,6 @@ impl Bank { self.status_cache.write().unwrap().clear(); } - fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) { - for (i, tx) in txs.iter().enumerate() { - if let Some(ref subs) = *self.subscriptions.read().unwrap() { - subs.check_signature(&tx.signatures[0], &res[i]); - } - } - } fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { let mut status_cache = self.status_cache.write().unwrap(); for (i, tx) in txs.iter().enumerate() { @@ -491,9 +475,6 @@ impl Bank { self.accounts .store_accounts(true, txs, executed, loaded_accounts); - // Check account subscriptions and send notifications - self.send_account_notifications(txs, executed, loaded_accounts); - // once committed there is no way to unroll let write_elapsed = now.elapsed(); debug!( @@ -502,7 +483,6 @@ impl Bank { txs.len(), ); self.update_transaction_statuses(txs, &executed); - self.update_subscriptions(txs, &executed); } /// Process a batch of transactions. @@ -609,27 +589,6 @@ impl Bank { extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap()) } - fn send_account_notifications( - &self, - txs: &[Transaction], - res: &[Result<()>], - loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], - ) { - for (i, raccs) in loaded.iter().enumerate() { - if res[i].is_err() || raccs.is_err() { - continue; - } - - let tx = &txs[i]; - let accs = raccs.as_ref().unwrap(); - for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { - if let Some(ref subs) = *self.subscriptions.read().unwrap() { - subs.check_account(&key, account) - } - } - } - } - pub fn vote_states(&self, cond: F) -> Vec where F: Fn(&VoteState) -> bool, diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index ac9b619df3..27387a8315 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -186,6 +186,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { #[cfg(test)] mod tests { use super::*; + use crate::bank; use crate::genesis_block::GenesisBlock; use jsonrpc_core::futures::sync::mpsc; use jsonrpc_core::Response; @@ -194,10 +195,31 @@ mod tests { use solana_sdk::budget_transaction::BudgetTransaction; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; + use solana_sdk::transaction::Transaction; use std::thread::sleep; use std::time::Duration; use tokio::prelude::{Async, Stream}; + pub fn process_transaction_and_notify( + bank: &Bank, + tx: &Transaction, + subscriptions: &RpcSubscriptions, + ) -> bank::Result<()> { + bank.process_transaction(tx)?; + + for pubkey in &tx.account_keys { + if let Some(account) = &bank.get_account(pubkey) { + subscriptions.check_account(pubkey, account); + } + } + + let signature = &tx.signatures[0]; + let status = bank.get_signature_status(signature).unwrap(); + subscriptions.check_signature(signature, &status); + + Ok(()) + } + fn create_session() -> Arc { Arc::new(Session::new(mpsc::channel(1).0)) } @@ -212,8 +234,6 @@ mod tests { let last_id = arc_bank.last_id(); let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); - let subscriptions = rpc.subscriptions.clone(); - arc_bank.set_subscriptions(subscriptions); // Test signature subscriptions let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); @@ -223,9 +243,7 @@ mod tests { Subscriber::new_test("signatureNotification"); rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string()); - arc_bank - .process_transaction(&tx) - .expect("process transaction"); + process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); // Test signature confirmation notification @@ -262,11 +280,9 @@ mod tests { let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); + let expected: Response = serde_json::from_str(&expected).unwrap(); - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); + let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); // Test bad parameter @@ -274,11 +290,9 @@ mod tests { format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[1]}}"#); let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); + let expected: Response = serde_json::from_str(&expected).unwrap(); - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); + let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); } @@ -296,9 +310,6 @@ mod tests { let last_id = arc_bank.last_id(); let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); - let subscriptions = rpc.subscriptions.clone(); - arc_bank.set_subscriptions(subscriptions); - let session = create_session(); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, contract_state.pubkey().to_string()); @@ -312,9 +323,7 @@ mod tests { budget_program_id, 0, ); - arc_bank - .process_transaction(&tx) - .expect("process transaction"); + process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); let tx = SystemTransaction::new_program_account( &alice, @@ -326,9 +335,7 @@ mod tests { 0, ); - arc_bank - .process_transaction(&tx) - .expect("process transaction"); + process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); // Test signature confirmation notification #1 let string = receiver.poll(); @@ -366,9 +373,7 @@ mod tests { 50, last_id, ); - arc_bank - .process_transaction(&tx) - .expect("process transaction"); + process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); // Test signature confirmation notification #2 @@ -396,9 +401,7 @@ mod tests { } let tx = SystemTransaction::new_account(&alice, witness.pubkey(), 1, last_id, 0); - arc_bank - .process_transaction(&tx) - .expect("process transaction"); + process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); let tx = BudgetTransaction::new_signature( &witness, @@ -406,9 +409,7 @@ mod tests { bob_pubkey, last_id, ); - arc_bank - .process_transaction(&tx) - .expect("process transaction"); + process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); let expected_userdata = arc_bank @@ -459,11 +460,9 @@ mod tests { let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); + let expected: Response = serde_json::from_str(&expected).unwrap(); - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); + let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); // Test bad parameter @@ -471,11 +470,9 @@ mod tests { format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[1]}}"#); let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); + let expected: Response = serde_json::from_str(&expected).unwrap(); - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); + let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); } } diff --git a/src/rpc_pubsub_service.rs b/src/rpc_pubsub_service.rs index 163bef1168..d846f1ad10 100644 --- a/src/rpc_pubsub_service.rs +++ b/src/rpc_pubsub_service.rs @@ -30,7 +30,6 @@ impl PubSubService { info!("rpc_pubsub bound to {:?}", pubsub_addr); let subscriptions = Arc::new(RpcSubscriptions::default()); let rpc = RpcSolPubSubImpl::new_with_subscriptions(bank.clone(), subscriptions.clone()); - bank.set_subscriptions(subscriptions); let exit = Arc::new(AtomicBool::new(false)); let exit_ = exit.clone(); let thread_hdl = Builder::new()