Send RPC notification when account is deleted (#13440)

* Send RPC notification when account is deleted

* Remove unwrap
This commit is contained in:
Justin Starry
2020-11-10 19:48:42 +08:00
committed by GitHub
parent 122b707a7d
commit a97c04b400

View File

@ -182,7 +182,7 @@ where
K: Eq + Hash + Clone + Copy, K: Eq + Hash + Clone + Copy,
S: Clone + Serialize, S: Clone + Serialize,
B: Fn(&Bank, &K) -> X, B: Fn(&Bank, &K) -> X,
F: Fn(X, &K, Slot, Option<T>, Option<Arc<Bank>>) -> (Box<dyn Iterator<Item = S>>, Slot), F: Fn(X, &K, Slot, Option<T>, Arc<Bank>) -> (Box<dyn Iterator<Item = S>>, Slot),
X: Clone + Serialize + Default, X: Clone + Serialize + Default,
T: Clone, T: Clone,
{ {
@ -206,29 +206,27 @@ where
commitment_slots.highest_confirmed_slot commitment_slots.highest_confirmed_slot
} }
}; };
let bank = bank_forks.read().unwrap().get(slot).cloned(); if let Some(bank) = bank_forks.read().unwrap().get(slot).cloned() {
let results = bank let results = bank_method(&bank, hashmap_key);
.clone() let mut w_last_notified_slot = last_notified_slot.write().unwrap();
.map(|desired_bank| bank_method(&desired_bank, hashmap_key)) let (filter_results, result_slot) = filter_results(
.unwrap_or_default(); results,
let mut w_last_notified_slot = last_notified_slot.write().unwrap(); hashmap_key,
let (filter_results, result_slot) = filter_results( *w_last_notified_slot,
results, config.as_ref().cloned(),
hashmap_key, bank,
*w_last_notified_slot,
config.as_ref().cloned(),
bank,
);
for result in filter_results {
notifier.notify(
Response {
context: RpcResponseContext { slot },
value: result,
},
sink,
); );
notified_set.insert(sub_id.clone()); for result in filter_results {
*w_last_notified_slot = result_slot; notifier.notify(
Response {
context: RpcResponseContext { slot },
value: result,
},
sink,
);
notified_set.insert(sub_id.clone());
*w_last_notified_slot = result_slot;
}
} }
} }
} }
@ -252,30 +250,28 @@ fn filter_account_result(
pubkey: &Pubkey, pubkey: &Pubkey,
last_notified_slot: Slot, last_notified_slot: Slot,
encoding: Option<UiAccountEncoding>, encoding: Option<UiAccountEncoding>,
bank: Option<Arc<Bank>>, bank: Arc<Bank>,
) -> (Box<dyn Iterator<Item = UiAccount>>, Slot) { ) -> (Box<dyn Iterator<Item = UiAccount>>, Slot) {
if let Some((account, fork)) = result { // If the account is not found, `last_modified_slot` will default to zero and
// If fork < last_notified_slot this means that we last notified for a fork // we will notify clients that the account no longer exists if we haven't already
// and should notify that the account state has been reverted. let (account, last_modified_slot) = result.unwrap_or_default();
if fork != last_notified_slot {
let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); // If last_modified_slot < last_notified_slot this means that we last notified for a fork
if account.owner == spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed { // and should notify that the account state has been reverted.
let bank = bank.unwrap(); // If result.is_some(), bank must also be Some let results: Box<dyn Iterator<Item = UiAccount>> = if last_modified_slot != last_notified_slot {
return ( let encoding = encoding.unwrap_or(UiAccountEncoding::Binary);
Box::new(iter::once(get_parsed_token_account(bank, pubkey, account))), if account.owner == spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed {
fork, Box::new(iter::once(get_parsed_token_account(bank, pubkey, account)))
); } else {
} else { Box::new(iter::once(UiAccount::encode(
return ( pubkey, account, encoding, None, None,
Box::new(iter::once(UiAccount::encode( )))
pubkey, account, encoding, None, None,
))),
fork,
);
}
} }
} } else {
(Box::new(iter::empty()), last_notified_slot) Box::new(iter::empty())
};
(results, last_modified_slot)
} }
fn filter_signature_result( fn filter_signature_result(
@ -283,7 +279,7 @@ fn filter_signature_result(
_signature: &Signature, _signature: &Signature,
last_notified_slot: Slot, last_notified_slot: Slot,
_config: Option<bool>, _config: Option<bool>,
_bank: Option<Arc<Bank>>, _bank: Arc<Bank>,
) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) { ) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) {
( (
Box::new(result.into_iter().map(|result| { Box::new(result.into_iter().map(|result| {
@ -298,7 +294,7 @@ fn filter_program_results(
program_id: &Pubkey, program_id: &Pubkey,
last_notified_slot: Slot, last_notified_slot: Slot,
config: Option<ProgramConfig>, config: Option<ProgramConfig>,
bank: Option<Arc<Bank>>, bank: Arc<Bank>,
) -> (Box<dyn Iterator<Item = RpcKeyedAccount>>, Slot) { ) -> (Box<dyn Iterator<Item = RpcKeyedAccount>>, Slot) {
let config = config.unwrap_or_default(); let config = config.unwrap_or_default();
let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary); let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary);
@ -314,7 +310,6 @@ fn filter_program_results(
&& encoding == UiAccountEncoding::JsonParsed && encoding == UiAccountEncoding::JsonParsed
&& !accounts_is_empty && !accounts_is_empty
{ {
let bank = bank.unwrap(); // If !accounts_is_empty, bank must be Some
Box::new(get_parsed_token_accounts(bank, keyed_accounts)) Box::new(get_parsed_token_accounts(bank, keyed_accounts))
} else { } else {
Box::new( Box::new(
@ -1012,8 +1007,10 @@ pub(crate) mod tests {
genesis_utils::{create_genesis_config, GenesisConfigInfo}, genesis_utils::{create_genesis_config, GenesisConfigInfo},
}; };
use solana_sdk::{ use solana_sdk::{
message::Message,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
system_transaction, system_instruction, system_program, system_transaction,
transaction::Transaction,
}; };
use std::{fmt::Debug, sync::mpsc::channel, time::Instant}; use std::{fmt::Debug, sync::mpsc::channel, time::Instant};
use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay}; use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay};
@ -1059,9 +1056,12 @@ pub(crate) mod tests {
bank_forks.write().unwrap().insert(bank1); bank_forks.write().unwrap().insert(bank1);
let alice = Keypair::new(); let alice = Keypair::new();
let (subscriber, _id_receiver, transport_receiver) = let (create_sub, _id_receiver, create_recv) = Subscriber::new_test("accountNotification");
Subscriber::new_test("accountNotification"); let (close_sub, _id_receiver, close_recv) = Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let create_sub_id = SubscriptionId::Number(0 as u64);
let close_sub_id = SubscriptionId::Number(1 as u64);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
@ -1078,8 +1078,8 @@ pub(crate) mod tests {
encoding: None, encoding: None,
data_slice: None, data_slice: None,
}), }),
sub_id.clone(), create_sub_id.clone(),
subscriber, create_sub,
); );
assert!(subscriptions assert!(subscriptions
@ -1094,8 +1094,8 @@ pub(crate) mod tests {
&alice, &alice,
blockhash, blockhash,
1, 1,
16, 0,
&solana_stake_program::id(), &system_program::id(),
); );
bank_forks bank_forks
.write() .write()
@ -1107,7 +1107,7 @@ pub(crate) mod tests {
let mut commitment_slots = CommitmentSlots::default(); let mut commitment_slots = CommitmentSlots::default();
commitment_slots.slot = 1; commitment_slots.slot = 1;
subscriptions.notify_subscribers(commitment_slots); subscriptions.notify_subscribers(commitment_slots);
let (response, _) = robust_poll_or_panic(transport_receiver); let (response, _) = robust_poll_or_panic(create_recv);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "accountNotification", "method": "accountNotification",
@ -1115,10 +1115,10 @@ pub(crate) mod tests {
"result": { "result": {
"context": { "slot": 1 }, "context": { "slot": 1 },
"value": { "value": {
"data": "1111111111111111", "data": "",
"executable": false, "executable": false,
"lamports": 1, "lamports": 1,
"owner": "Stake11111111111111111111111111111111111111", "owner": "11111111111111111111111111111111",
"rentEpoch": 0, "rentEpoch": 0,
}, },
}, },
@ -1126,8 +1126,55 @@ pub(crate) mod tests {
} }
}); });
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_account_subscription(&create_sub_id);
subscriptions.add_account_subscription(
alice.pubkey(),
Some(RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::recent()),
encoding: None,
data_slice: None,
}),
close_sub_id.clone(),
close_sub,
);
let tx = {
let instruction =
system_instruction::transfer(&alice.pubkey(), &mint_keypair.pubkey(), 1);
let message = Message::new(&[instruction], Some(&mint_keypair.pubkey()));
Transaction::new(&[&alice, &mint_keypair], message, blockhash)
};
bank_forks
.write()
.unwrap()
.get(1)
.unwrap()
.process_transaction(&tx)
.unwrap();
subscriptions.notify_subscribers(commitment_slots);
let (response, _) = robust_poll_or_panic(close_recv);
let expected = json!({
"jsonrpc": "2.0",
"method": "accountNotification",
"params": {
"result": {
"context": { "slot": 1 },
"value": {
"data": "",
"executable": false,
"lamports": 0,
"owner": "11111111111111111111111111111111",
"rentEpoch": 0,
},
},
"subscription": 1,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_account_subscription(&close_sub_id);
subscriptions.remove_account_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
.subscriptions .subscriptions
.account_subscriptions .account_subscriptions