From a97c04b400b43cdb0c142c46158636ab7d245e39 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Tue, 10 Nov 2020 19:48:42 +0800 Subject: [PATCH] Send RPC notification when account is deleted (#13440) * Send RPC notification when account is deleted * Remove unwrap --- core/src/rpc_subscriptions.rs | 167 ++++++++++++++++++++++------------ 1 file changed, 107 insertions(+), 60 deletions(-) diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 0e71472859..0a2364830b 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -182,7 +182,7 @@ where K: Eq + Hash + Clone + Copy, S: Clone + Serialize, B: Fn(&Bank, &K) -> X, - F: Fn(X, &K, Slot, Option, Option>) -> (Box>, Slot), + F: Fn(X, &K, Slot, Option, Arc) -> (Box>, Slot), X: Clone + Serialize + Default, T: Clone, { @@ -206,29 +206,27 @@ where commitment_slots.highest_confirmed_slot } }; - let bank = bank_forks.read().unwrap().get(slot).cloned(); - let results = bank - .clone() - .map(|desired_bank| bank_method(&desired_bank, hashmap_key)) - .unwrap_or_default(); - let mut w_last_notified_slot = last_notified_slot.write().unwrap(); - let (filter_results, result_slot) = filter_results( - results, - hashmap_key, - *w_last_notified_slot, - config.as_ref().cloned(), - bank, - ); - for result in filter_results { - notifier.notify( - Response { - context: RpcResponseContext { slot }, - value: result, - }, - sink, + if let Some(bank) = bank_forks.read().unwrap().get(slot).cloned() { + let results = bank_method(&bank, hashmap_key); + let mut w_last_notified_slot = last_notified_slot.write().unwrap(); + let (filter_results, result_slot) = filter_results( + results, + hashmap_key, + *w_last_notified_slot, + config.as_ref().cloned(), + bank, ); - notified_set.insert(sub_id.clone()); - *w_last_notified_slot = result_slot; + for result in filter_results { + notifier.notify( + Response { + context: RpcResponseContext { slot }, + value: result, + }, + sink, + ); + notified_set.insert(sub_id.clone()); + *w_last_notified_slot = result_slot; + } } } } @@ -252,30 +250,28 @@ fn filter_account_result( pubkey: &Pubkey, last_notified_slot: Slot, encoding: Option, - bank: Option>, + bank: Arc, ) -> (Box>, Slot) { - if let Some((account, fork)) = result { - // If fork < last_notified_slot this means that we last notified for a fork - // and should notify that the account state has been reverted. - if fork != last_notified_slot { - let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); - if account.owner == spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed { - let bank = bank.unwrap(); // If result.is_some(), bank must also be Some - return ( - Box::new(iter::once(get_parsed_token_account(bank, pubkey, account))), - fork, - ); - } else { - return ( - Box::new(iter::once(UiAccount::encode( - pubkey, account, encoding, None, None, - ))), - fork, - ); - } + // If the account is not found, `last_modified_slot` will default to zero and + // we will notify clients that the account no longer exists if we haven't already + let (account, last_modified_slot) = result.unwrap_or_default(); + + // If last_modified_slot < last_notified_slot this means that we last notified for a fork + // and should notify that the account state has been reverted. + let results: Box> = if last_modified_slot != last_notified_slot { + let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); + if account.owner == spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed { + Box::new(iter::once(get_parsed_token_account(bank, pubkey, account))) + } else { + Box::new(iter::once(UiAccount::encode( + pubkey, account, encoding, None, None, + ))) } - } - (Box::new(iter::empty()), last_notified_slot) + } else { + Box::new(iter::empty()) + }; + + (results, last_modified_slot) } fn filter_signature_result( @@ -283,7 +279,7 @@ fn filter_signature_result( _signature: &Signature, last_notified_slot: Slot, _config: Option, - _bank: Option>, + _bank: Arc, ) -> (Box>, Slot) { ( Box::new(result.into_iter().map(|result| { @@ -298,7 +294,7 @@ fn filter_program_results( program_id: &Pubkey, last_notified_slot: Slot, config: Option, - bank: Option>, + bank: Arc, ) -> (Box>, Slot) { let config = config.unwrap_or_default(); let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary); @@ -314,7 +310,6 @@ fn filter_program_results( && encoding == UiAccountEncoding::JsonParsed && !accounts_is_empty { - let bank = bank.unwrap(); // If !accounts_is_empty, bank must be Some Box::new(get_parsed_token_accounts(bank, keyed_accounts)) } else { Box::new( @@ -1012,8 +1007,10 @@ pub(crate) mod tests { genesis_utils::{create_genesis_config, GenesisConfigInfo}, }; use solana_sdk::{ + message::Message, signature::{Keypair, Signer}, - system_transaction, + system_instruction, system_program, system_transaction, + transaction::Transaction, }; use std::{fmt::Debug, sync::mpsc::channel, time::Instant}; use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay}; @@ -1059,9 +1056,12 @@ pub(crate) mod tests { bank_forks.write().unwrap().insert(bank1); let alice = Keypair::new(); - let (subscriber, _id_receiver, transport_receiver) = - Subscriber::new_test("accountNotification"); - let sub_id = SubscriptionId::Number(0 as u64); + let (create_sub, _id_receiver, create_recv) = Subscriber::new_test("accountNotification"); + let (close_sub, _id_receiver, close_recv) = Subscriber::new_test("accountNotification"); + + let create_sub_id = SubscriptionId::Number(0 as u64); + let close_sub_id = SubscriptionId::Number(1 as u64); + let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( &exit, @@ -1078,8 +1078,8 @@ pub(crate) mod tests { encoding: None, data_slice: None, }), - sub_id.clone(), - subscriber, + create_sub_id.clone(), + create_sub, ); assert!(subscriptions @@ -1094,8 +1094,8 @@ pub(crate) mod tests { &alice, blockhash, 1, - 16, - &solana_stake_program::id(), + 0, + &system_program::id(), ); bank_forks .write() @@ -1107,7 +1107,7 @@ pub(crate) mod tests { let mut commitment_slots = CommitmentSlots::default(); commitment_slots.slot = 1; subscriptions.notify_subscribers(commitment_slots); - let (response, _) = robust_poll_or_panic(transport_receiver); + let (response, _) = robust_poll_or_panic(create_recv); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", @@ -1115,10 +1115,10 @@ pub(crate) mod tests { "result": { "context": { "slot": 1 }, "value": { - "data": "1111111111111111", + "data": "", "executable": false, "lamports": 1, - "owner": "Stake11111111111111111111111111111111111111", + "owner": "11111111111111111111111111111111", "rentEpoch": 0, }, }, @@ -1126,8 +1126,55 @@ pub(crate) mod tests { } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); + subscriptions.remove_account_subscription(&create_sub_id); + + subscriptions.add_account_subscription( + alice.pubkey(), + Some(RpcAccountInfoConfig { + commitment: Some(CommitmentConfig::recent()), + encoding: None, + data_slice: None, + }), + close_sub_id.clone(), + close_sub, + ); + + let tx = { + let instruction = + system_instruction::transfer(&alice.pubkey(), &mint_keypair.pubkey(), 1); + let message = Message::new(&[instruction], Some(&mint_keypair.pubkey())); + Transaction::new(&[&alice, &mint_keypair], message, blockhash) + }; + + bank_forks + .write() + .unwrap() + .get(1) + .unwrap() + .process_transaction(&tx) + .unwrap(); + subscriptions.notify_subscribers(commitment_slots); + let (response, _) = robust_poll_or_panic(close_recv); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 1 }, + "value": { + "data": "", + "executable": false, + "lamports": 0, + "owner": "11111111111111111111111111111111", + "rentEpoch": 0, + }, + }, + "subscription": 1, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + subscriptions.remove_account_subscription(&close_sub_id); - subscriptions.remove_account_subscription(&sub_id); assert!(!subscriptions .subscriptions .account_subscriptions