Rpc -> proper optimistic confirmation (#12514) (#12537)

* Add service to track the most recent optimistically confirmed bank

* Plumb service into ClusterInfoVoteListener and ReplayStage

* Clean up test

* Use OptimisticallyConfirmedBank in RPC

* Remove superfluous notifications from RpcSubscriptions

* Use crossbeam to avoid mpsc recv_timeout panic

* Review comments

* Remove superfluous last_checked_slots, but pass in OptimisticallyConfirmedBank for complete correctness

(cherry picked from commit 89621adca7)

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
mergify[bot]
2020-09-29 03:49:18 +00:00
committed by GitHub
parent 63d9f32bb4
commit 5b322a995f
14 changed files with 684 additions and 150 deletions

View File

@@ -1,6 +1,9 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc::{get_parsed_token_account, get_parsed_token_accounts};
use crate::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc::{get_parsed_token_account, get_parsed_token_accounts},
};
use core::hash::Hash;
use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{
@@ -61,7 +64,6 @@ enum NotificationEntry {
Slot(SlotInfo),
Vote(Vote),
Root(Slot),
Frozen(Slot),
Bank(CommitmentSlots),
Gossip(Slot),
SignaturesReceived((Slot, Vec<Signature>)),
@@ -71,7 +73,6 @@ impl std::fmt::Debug for NotificationEntry {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
NotificationEntry::Root(root) => write!(f, "Root({})", root),
NotificationEntry::Frozen(slot) => write!(f, "Frozen({})", slot),
NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote),
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
NotificationEntry::Bank(commitment_slots) => {
@@ -346,7 +347,7 @@ pub struct RpcSubscriptions {
notifier_runtime: Option<Runtime>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
exit: Arc<AtomicBool>,
}
@@ -363,6 +364,7 @@ impl RpcSubscriptions {
exit: &Arc<AtomicBool>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) -> Self {
let (notification_sender, notification_receiver): (
Sender<NotificationEntry>,
@@ -396,9 +398,6 @@ impl RpcSubscriptions {
};
let _subscriptions = subscriptions.clone();
let last_checked_slots = Arc::new(RwLock::new(HashMap::new()));
let _last_checked_slots = last_checked_slots.clone();
let notifier_runtime = RuntimeBuilder::new()
.core_threads(1)
.name_prefix("solana-rpc-notifier-")
@@ -415,7 +414,6 @@ impl RpcSubscriptions {
notification_receiver,
_subscriptions,
_bank_forks,
_last_checked_slots,
);
})
.unwrap();
@@ -427,16 +425,19 @@ impl RpcSubscriptions {
t_cleanup: Some(t_cleanup),
bank_forks,
block_commitment_cache,
last_checked_slots,
optimistically_confirmed_bank,
exit: exit.clone(),
}
}
pub fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
Self::new(
&Arc::new(AtomicBool::new(false)),
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
)
}
@@ -529,12 +530,12 @@ impl RpcSubscriptions {
.read()
.unwrap()
.highest_confirmed_slot(),
CommitmentLevel::SingleGossip => *self
.last_checked_slots
CommitmentLevel::SingleGossip => self
.optimistically_confirmed_bank
.read()
.unwrap()
.get(&CommitmentLevel::SingleGossip)
.unwrap_or(&0),
.bank
.slot(),
};
let last_notified_slot = if let Some((_account, slot)) = self
.bank_forks
@@ -724,10 +725,6 @@ impl RpcSubscriptions {
self.enqueue_notification(NotificationEntry::Vote(vote.clone()));
}
pub fn notify_frozen(&self, frozen_slot: Slot) {
self.enqueue_notification(NotificationEntry::Frozen(frozen_slot));
}
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
@@ -769,9 +766,7 @@ impl RpcSubscriptions {
notification_receiver: Receiver<NotificationEntry>,
subscriptions: Subscriptions,
bank_forks: Arc<RwLock<BankForks>>,
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
) {
let mut pending_gossip_notifications = HashSet::new();
loop {
if exit.load(Ordering::Relaxed) {
break;
@@ -804,18 +799,10 @@ impl RpcSubscriptions {
}
NotificationEntry::Root(root) => {
debug!("root notify: {:?}", root);
{
let subscriptions = subscriptions.root_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
notifier.notify(root, sink);
}
let subscriptions = subscriptions.root_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
notifier.notify(root, sink);
}
// Prune old pending notifications
pending_gossip_notifications = pending_gossip_notifications
.into_iter()
.filter(|&s| s > root)
.collect();
}
NotificationEntry::Bank(commitment_slots) => {
RpcSubscriptions::notify_accounts_programs_signatures(
@@ -828,36 +815,13 @@ impl RpcSubscriptions {
"bank",
)
}
NotificationEntry::Frozen(slot) => {
if pending_gossip_notifications.remove(&slot) {
Self::process_gossip_notification(
slot,
&notifier,
&subscriptions,
&bank_forks,
&last_checked_slots,
);
}
}
NotificationEntry::Gossip(slot) => {
let bank_frozen = bank_forks
.read()
.unwrap()
.get(slot)
.filter(|b| b.is_frozen())
.is_some();
if !bank_frozen {
pending_gossip_notifications.insert(slot);
} else {
Self::process_gossip_notification(
slot,
&notifier,
&subscriptions,
&bank_forks,
&last_checked_slots,
);
}
Self::process_gossip_notification(
slot,
&notifier,
&subscriptions,
&bank_forks,
);
}
NotificationEntry::SignaturesReceived(slot_signatures) => {
RpcSubscriptions::process_signatures_received(
@@ -883,23 +847,7 @@ impl RpcSubscriptions {
notifier: &RpcNotifier,
subscriptions: &Subscriptions,
bank_forks: &Arc<RwLock<BankForks>>,
last_checked_slots: &Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
) {
let mut last_checked_slots_lock = last_checked_slots.write().unwrap();
let last_checked_slot = last_checked_slots_lock
.get(&CommitmentLevel::SingleGossip)
.cloned()
.unwrap_or_default();
if slot > last_checked_slot {
last_checked_slots_lock.insert(CommitmentLevel::SingleGossip, slot);
} else {
// Avoid sending stale or duplicate notifications
return;
}
drop(last_checked_slots_lock);
let commitment_slots = CommitmentSlots {
highest_confirmed_slot: slot,
..CommitmentSlots::default()
@@ -1053,6 +1001,9 @@ impl RpcSubscriptions {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::optimistically_confirmed_bank_tracker::{
BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
};
use jsonrpc_core::futures::{self, stream::Stream};
use jsonrpc_pubsub::typed::Subscriber;
use serial_test_derive::serial;
@@ -1118,6 +1069,7 @@ pub(crate) mod tests {
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
))),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
subscriptions.add_account_subscription(
alice.pubkey(),
@@ -1216,10 +1168,13 @@ pub(crate) mod tests {
Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new(
&exit,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
);
subscriptions.add_program_subscription(
solana_budget_program::id(),
@@ -1326,10 +1281,13 @@ pub(crate) mod tests {
);
let exit = Arc::new(AtomicBool::new(false));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new(
&exit,
bank_forks,
Arc::new(RwLock::new(block_commitment_cache)),
optimistically_confirmed_bank,
);
let (past_bank_sub1, _id_receiver, past_bank_recv1) =
@@ -1482,10 +1440,13 @@ pub(crate) mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new(
&exit,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
);
subscriptions.add_slot_subscription(sub_id.clone(), subscriber);
@@ -1530,10 +1491,13 @@ pub(crate) mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new(
&exit,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
);
subscriptions.add_root_subscription(sub_id.clone(), subscriber);
@@ -1632,18 +1596,23 @@ pub(crate) mod tests {
bank_forks.write().unwrap().insert(bank2);
let alice = Keypair::new();
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let (subscriber0, _id_receiver, transport_receiver0) =
Subscriber::new_test("accountNotification");
let (subscriber1, _id_receiver, transport_receiver1) =
Subscriber::new_test("accountNotification");
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
))),
);
optimistically_confirmed_bank.clone(),
));
let sub_id0 = SubscriptionId::Number(0 as u64);
subscriptions.add_account_subscription(
alice.pubkey(),
@@ -1687,10 +1656,22 @@ pub(crate) mod tests {
.unwrap();
// First, notify the unfrozen bank first to queue pending notification
subscriptions.notify_gossip_subscribers(2);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
// Now, notify the frozen bank and ensure its notifications are processed
subscriptions.notify_gossip_subscribers(1);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(1),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let (response, _) = robust_poll_or_panic(transport_receiver0);
let expected = json!({
@@ -1725,7 +1706,14 @@ pub(crate) mod tests {
subscriber1,
);
subscriptions.notify_frozen(2);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let (response, _) = robust_poll_or_panic(transport_receiver1);
let expected = json!({
"jsonrpc": "2.0",