Wait until bank is frozen before sending RPC notifications (#10654)

This commit is contained in:
Justin Starry
2020-06-18 00:44:51 +08:00
committed by GitHub
parent 6ee222363e
commit 39984cdcc3
2 changed files with 129 additions and 54 deletions

View File

@ -275,6 +275,7 @@ impl ReplayStage {
transaction_status_sender.clone(), transaction_status_sender.clone(),
&verify_recyclers, &verify_recyclers,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&subscriptions,
); );
Self::report_memory(&allocated, "replay_active_banks", start); Self::report_memory(&allocated, "replay_active_banks", start);
@ -1101,6 +1102,7 @@ impl ReplayStage {
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
verify_recyclers: &VerifyRecyclers, verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
subscriptions: &Arc<RpcSubscriptions>,
) -> bool { ) -> bool {
let mut did_complete_bank = false; let mut did_complete_bank = false;
let mut tx_count = 0; let mut tx_count = 0;
@ -1170,6 +1172,7 @@ impl ReplayStage {
bank.freeze(); bank.freeze();
heaviest_subtree_fork_choice heaviest_subtree_fork_choice
.add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot()));
subscriptions.notify_frozen(bank.slot());
} else { } else {
trace!( trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}", "bank {} not completed tick_height: {}, max_tick_height: {}",

View File

@ -64,6 +64,7 @@ enum NotificationEntry {
Slot(SlotInfo), Slot(SlotInfo),
Vote(Vote), Vote(Vote),
Root(Slot), Root(Slot),
Frozen(Slot),
Bank(CacheSlotInfo), Bank(CacheSlotInfo),
Gossip(Slot), Gossip(Slot),
} }
@ -72,6 +73,7 @@ impl std::fmt::Debug for NotificationEntry {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self { match self {
NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Root(root) => write!(f, "Root({})", root),
NotificationEntry::Frozen(slot) => write!(f, "Frozen({})", slot),
NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote),
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
NotificationEntry::Bank(cache_slot_info) => write!( NotificationEntry::Bank(cache_slot_info) => write!(
@ -227,6 +229,8 @@ fn filter_account_result(
last_notified_slot: Slot, last_notified_slot: Slot,
) -> (Box<dyn Iterator<Item = RpcAccount>>, Slot) { ) -> (Box<dyn Iterator<Item = RpcAccount>>, Slot) {
if let Some((account, fork)) = result { if let Some((account, fork)) = result {
// If fork < last_notified_slot this means that we last notified for a fork
// and should notify that the account state has been reverted.
if fork != last_notified_slot { if fork != last_notified_slot {
return (Box::new(iter::once(RpcAccount::encode(account))), fork); return (Box::new(iter::once(RpcAccount::encode(account))), fork);
} }
@ -647,6 +651,10 @@ impl RpcSubscriptions {
self.enqueue_notification(NotificationEntry::Vote(vote.clone())); self.enqueue_notification(NotificationEntry::Vote(vote.clone()));
} }
pub fn notify_frozen(&self, frozen_slot: Slot) {
self.enqueue_notification(NotificationEntry::Frozen(frozen_slot));
}
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) { pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
@ -690,6 +698,7 @@ impl RpcSubscriptions {
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>, last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
) { ) {
let mut pending_gossip_notifications = HashSet::new();
loop { loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
@ -720,6 +729,12 @@ impl RpcSubscriptions {
for (_, sink) in subscriptions.iter() { for (_, sink) in subscriptions.iter() {
notifier.notify(root, sink); notifier.notify(root, sink);
} }
// Prune old pending notifications
pending_gossip_notifications = pending_gossip_notifications
.into_iter()
.filter(|&s| s > root)
.collect();
} }
NotificationEntry::Bank(cache_slot_info) => { NotificationEntry::Bank(cache_slot_info) => {
RpcSubscriptions::notify_accounts_programs_signatures( RpcSubscriptions::notify_accounts_programs_signatures(
@ -731,23 +746,36 @@ impl RpcSubscriptions {
&notifier, &notifier,
) )
} }
NotificationEntry::Gossip(slot) => { NotificationEntry::Frozen(slot) => {
let _ = last_checked_slots if pending_gossip_notifications.remove(&slot) {
.write() Self::process_gossip_notification(
.unwrap() slot,
.insert(CommitmentLevel::SingleGossip, slot);
let cache_slot_info = CacheSlotInfo {
highest_confirmed_slot: slot,
..CacheSlotInfo::default()
};
RpcSubscriptions::notify_accounts_programs_signatures(
&subscriptions.gossip_account_subscriptions,
&subscriptions.gossip_program_subscriptions,
&subscriptions.gossip_signature_subscriptions,
&bank_forks,
&cache_slot_info,
&notifier, &notifier,
) &subscriptions,
&bank_forks,
&last_checked_slots,
);
}
}
NotificationEntry::Gossip(slot) => {
let bank_frozen = bank_forks
.read()
.unwrap()
.get(slot)
.filter(|b| b.is_frozen())
.is_some();
if !bank_frozen {
pending_gossip_notifications.insert(slot);
} else {
Self::process_gossip_notification(
slot,
&notifier,
&subscriptions,
&bank_forks,
&last_checked_slots,
);
}
} }
}, },
Err(RecvTimeoutError::Timeout) => { Err(RecvTimeoutError::Timeout) => {
@ -761,6 +789,42 @@ impl RpcSubscriptions {
} }
} }
fn process_gossip_notification(
slot: Slot,
notifier: &RpcNotifier,
subscriptions: &Subscriptions,
bank_forks: &Arc<RwLock<BankForks>>,
last_checked_slots: &Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
) {
let mut last_checked_slots_lock = last_checked_slots.write().unwrap();
let last_checked_slot = last_checked_slots_lock
.get(&CommitmentLevel::SingleGossip)
.cloned()
.unwrap_or_default();
if slot > last_checked_slot {
last_checked_slots_lock.insert(CommitmentLevel::SingleGossip, slot);
} else {
// Avoid sending stale or duplicate notifications
return;
}
drop(last_checked_slots_lock);
let cache_slot_info = CacheSlotInfo {
highest_confirmed_slot: slot,
..CacheSlotInfo::default()
};
RpcSubscriptions::notify_accounts_programs_signatures(
&subscriptions.gossip_account_subscriptions,
&subscriptions.gossip_program_subscriptions,
&subscriptions.gossip_signature_subscriptions,
&bank_forks,
&cache_slot_info,
&notifier,
);
}
fn notify_accounts_programs_signatures( fn notify_accounts_programs_signatures(
account_subscriptions: &Arc<RpcAccountSubscriptions>, account_subscriptions: &Arc<RpcAccountSubscriptions>,
program_subscriptions: &Arc<RpcProgramSubscriptions>, program_subscriptions: &Arc<RpcProgramSubscriptions>,
@ -1380,6 +1444,8 @@ pub(crate) mod tests {
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1); bank_forks.write().unwrap().insert(bank1);
let bank2 = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
let alice = Keypair::new(); let alice = Keypair::new();
let (subscriber0, _id_receiver, transport_receiver0) = let (subscriber0, _id_receiver, transport_receiver0) =
@ -1405,17 +1471,10 @@ pub(crate) mod tests {
sub_id0.clone(), sub_id0.clone(),
subscriber0, subscriber0,
); );
let sub_id1 = SubscriptionId::Number(1 as u64);
subscriptions.add_account_subscription(
alice.pubkey(),
Some(CommitmentConfig::recent()),
sub_id1.clone(),
subscriber1,
);
assert!(subscriptions assert!(subscriptions
.subscriptions .subscriptions
.account_subscriptions .gossip_account_subscriptions
.read() .read()
.unwrap() .unwrap()
.contains_key(&alice.pubkey())); .contains_key(&alice.pubkey()));
@ -1428,37 +1487,27 @@ pub(crate) mod tests {
16, 16,
&solana_budget_program::id(), &solana_budget_program::id(),
); );
// Add the transaction to the 1st bank and then freeze the bank
let bank1 = bank_forks.write().unwrap().get(1).cloned().unwrap();
bank1.process_transaction(&tx).unwrap();
bank1.freeze();
// Add the same transaction to the unfrozen 2nd bank
bank_forks bank_forks
.write() .write()
.unwrap() .unwrap()
.get(1) .get(2)
.unwrap() .unwrap()
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
let mut cache_slot_info = CacheSlotInfo::default();
cache_slot_info.current_slot = 1;
subscriptions.notify_subscribers(cache_slot_info);
let (response, _) = robust_poll_or_panic(transport_receiver1);
let expected = json!({
"jsonrpc": "2.0",
"method": "accountNotification",
"params": {
"result": {
"context": { "slot": 1 },
"value": {
"data": "1111111111111111",
"executable": false,
"lamports": 1,
"owner": "Budget1111111111111111111111111111111111111",
"rentEpoch": 1,
},
},
"subscription": 1,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
// First, notify the unfrozen bank first to queue pending notification
subscriptions.notify_gossip_subscribers(2);
// Now, notify the frozen bank and ensure its notifications are processed
subscriptions.notify_gossip_subscribers(1); subscriptions.notify_gossip_subscribers(1);
let (response, _) = robust_poll_or_panic(transport_receiver0); let (response, _) = robust_poll_or_panic(transport_receiver0);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -1478,18 +1527,41 @@ pub(crate) mod tests {
} }
}); });
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_account_subscription(&sub_id0); subscriptions.remove_account_subscription(&sub_id0);
assert!(subscriptions
.subscriptions let sub_id1 = SubscriptionId::Number(1 as u64);
.account_subscriptions subscriptions.add_account_subscription(
.read() alice.pubkey(),
.unwrap() Some(CommitmentConfig::single_gossip()),
.contains_key(&alice.pubkey())); sub_id1.clone(),
subscriber1,
);
subscriptions.notify_frozen(2);
let (response, _) = robust_poll_or_panic(transport_receiver1);
let expected = json!({
"jsonrpc": "2.0",
"method": "accountNotification",
"params": {
"result": {
"context": { "slot": 2 },
"value": {
"data": "1111111111111111",
"executable": false,
"lamports": 1,
"owner": "Budget1111111111111111111111111111111111111",
"rentEpoch": 1,
},
},
"subscription": 1,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_account_subscription(&sub_id1); subscriptions.remove_account_subscription(&sub_id1);
assert!(!subscriptions assert!(!subscriptions
.subscriptions .subscriptions
.account_subscriptions .gossip_account_subscriptions
.read() .read()
.unwrap() .unwrap()
.contains_key(&alice.pubkey())); .contains_key(&alice.pubkey()));