Rpc -> proper optimistic confirmation (#12514)

* Add service to track the most recent optimistically confirmed bank

* Plumb service into ClusterInfoVoteListener and ReplayStage

* Clean up test

* Use OptimisticallyConfirmedBank in RPC

* Remove superfluous notifications from RpcSubscriptions

* Use crossbeam to avoid mpsc recv_timeout panic

* Review comments

* Remove superfluous last_checked_slots, but pass in OptimisticallyConfirmedBank for complete correctness
This commit is contained in:
Tyera Eulberg
2020-09-28 20:43:05 -06:00
committed by GitHub
parent 06f84c65f1
commit 89621adca7
14 changed files with 684 additions and 150 deletions

View File

@ -2,6 +2,7 @@ use crate::{
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
crds_value::CrdsValueLabel, crds_value::CrdsValueLabel,
optimistic_confirmation_verifier::OptimisticConfirmationVerifier, optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
poh_recorder::PohRecorder, poh_recorder::PohRecorder,
pubkey_references::LockedPubkeyReferences, pubkey_references::LockedPubkeyReferences,
result::{Error, Result}, result::{Error, Result},
@ -248,6 +249,7 @@ impl ClusterInfoVoteListener {
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVoteReceiver, replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
bank_notification_sender: Option<BankNotificationSender>,
) -> Self { ) -> Self {
let exit_ = exit.clone(); let exit_ = exit.clone();
@ -293,6 +295,7 @@ impl ClusterInfoVoteListener {
verified_vote_sender, verified_vote_sender,
replay_votes_receiver, replay_votes_receiver,
blockstore, blockstore,
bank_notification_sender,
); );
}) })
.unwrap(); .unwrap();
@ -420,6 +423,7 @@ impl ClusterInfoVoteListener {
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVoteReceiver, replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
bank_notification_sender: Option<BankNotificationSender>,
) -> Result<()> { ) -> Result<()> {
let mut optimistic_confirmation_verifier = let mut optimistic_confirmation_verifier =
OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root()); OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root());
@ -451,6 +455,7 @@ impl ClusterInfoVoteListener {
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&bank_notification_sender,
); );
if let Err(e) = optimistic_confirmed_slots { if let Err(e) = optimistic_confirmed_slots {
@ -485,6 +490,7 @@ impl ClusterInfoVoteListener {
subscriptions, subscriptions,
verified_vote_sender, verified_vote_sender,
replay_votes_receiver, replay_votes_receiver,
&None,
) )
} }
@ -495,6 +501,7 @@ impl ClusterInfoVoteListener {
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVoteReceiver, replay_votes_receiver: &ReplayVoteReceiver,
bank_notification_sender: &Option<BankNotificationSender>,
) -> Result<Vec<(Slot, Hash)>> { ) -> Result<Vec<(Slot, Hash)>> {
let mut sel = Select::new(); let mut sel = Select::new();
sel.recv(gossip_vote_txs_receiver); sel.recv(gossip_vote_txs_receiver);
@ -523,6 +530,7 @@ impl ClusterInfoVoteListener {
root_bank, root_bank,
subscriptions, subscriptions,
verified_vote_sender, verified_vote_sender,
bank_notification_sender,
)); ));
} else { } else {
remaining_wait_time = remaining_wait_time remaining_wait_time = remaining_wait_time
@ -543,6 +551,7 @@ impl ClusterInfoVoteListener {
diff: &mut HashMap<Slot, HashMap<Arc<Pubkey>, bool>>, diff: &mut HashMap<Slot, HashMap<Arc<Pubkey>, bool>>,
new_optimistic_confirmed_slots: &mut Vec<(Slot, Hash)>, new_optimistic_confirmed_slots: &mut Vec<(Slot, Hash)>,
is_gossip_vote: bool, is_gossip_vote: bool,
bank_notification_sender: &Option<BankNotificationSender>,
) { ) {
if vote.slots.is_empty() { if vote.slots.is_empty() {
return; return;
@ -595,7 +604,13 @@ impl ClusterInfoVoteListener {
if is_confirmed { if is_confirmed {
new_optimistic_confirmed_slots.push((*slot, last_vote_hash)); new_optimistic_confirmed_slots.push((*slot, last_vote_hash));
// Notify subscribers about new optimistic confirmation // Notify subscribers about new optimistic confirmation
subscriptions.notify_gossip_subscribers(*slot); if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::OptimisticallyConfirmed(*slot))
.unwrap_or_else(|err| {
warn!("bank_notification_sender failed: {:?}", err)
});
}
} }
if !is_new && !is_gossip_vote { if !is_new && !is_gossip_vote {
@ -636,6 +651,7 @@ impl ClusterInfoVoteListener {
root_bank: &Bank, root_bank: &Bank,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
) -> Vec<(Slot, Hash)> { ) -> Vec<(Slot, Hash)> {
let mut diff: HashMap<Slot, HashMap<Arc<Pubkey>, bool>> = HashMap::new(); let mut diff: HashMap<Slot, HashMap<Arc<Pubkey>, bool>> = HashMap::new();
let mut new_optimistic_confirmed_slots = vec![]; let mut new_optimistic_confirmed_slots = vec![];
@ -686,6 +702,7 @@ impl ClusterInfoVoteListener {
&mut diff, &mut diff,
&mut new_optimistic_confirmed_slots, &mut new_optimistic_confirmed_slots,
is_gossip, is_gossip,
bank_notification_sender,
); );
} }
@ -769,6 +786,7 @@ impl ClusterInfoVoteListener {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank;
use solana_perf::packet; use solana_perf::packet;
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank::Bank,
@ -996,6 +1014,7 @@ mod tests {
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None,
) )
.unwrap(); .unwrap();
@ -1024,6 +1043,7 @@ mod tests {
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None,
) )
.unwrap(); .unwrap();
@ -1101,6 +1121,7 @@ mod tests {
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None,
) )
.unwrap(); .unwrap();
@ -1219,6 +1240,7 @@ mod tests {
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None,
) )
.unwrap(); .unwrap();
@ -1313,6 +1335,7 @@ mod tests {
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None,
); );
} }
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
@ -1423,13 +1446,16 @@ mod tests {
); );
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let bank_forks = BankForks::new(bank); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = bank_forks.get(0).unwrap().clone(); let bank = bank_forks.read().unwrap().get(0).unwrap().clone();
let vote_tracker = VoteTracker::new(&bank); let vote_tracker = VoteTracker::new(&bank);
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
Arc::new(RwLock::new(bank_forks)), bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())), Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
)); ));
// Send a vote to process, should add a reference to the pubkey for that voter // Send a vote to process, should add a reference to the pubkey for that voter
@ -1460,6 +1486,7 @@ mod tests {
&bank, &bank,
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&None,
); );
let ref_count = Arc::strong_count( let ref_count = Arc::strong_count(
&vote_tracker &vote_tracker
@ -1529,6 +1556,7 @@ mod tests {
&new_root_bank, &new_root_bank,
&subscriptions, &subscriptions,
&verified_vote_sender, &verified_vote_sender,
&None,
); );
// Check new replay vote pubkey first // Check new replay vote pubkey first
@ -1578,12 +1606,15 @@ mod tests {
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let vote_tracker = VoteTracker::new(&bank); let vote_tracker = VoteTracker::new(&bank);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let bank_forks = BankForks::new(bank); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = bank_forks.get(0).unwrap().clone(); let bank = bank_forks.read().unwrap().get(0).unwrap().clone();
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
Arc::new(RwLock::new(bank_forks)), bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())), Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
)); ));
// Integrity Checks // Integrity Checks

View File

@ -45,6 +45,7 @@ pub mod ledger_cleanup_service;
pub mod local_vote_signer_service; pub mod local_vote_signer_service;
pub mod non_circulating_supply; pub mod non_circulating_supply;
pub mod optimistic_confirmation_verifier; pub mod optimistic_confirmation_verifier;
pub mod optimistically_confirmed_bank_tracker;
pub mod poh_recorder; pub mod poh_recorder;
pub mod poh_service; pub mod poh_service;
pub mod progress_map; pub mod progress_map;

View File

@ -0,0 +1,294 @@
//! The `optimistically_confirmed_bank_tracker` module implements a threaded service to track the
//! most recent optimistically confirmed bank for use in rpc services, and triggers gossip
//! subscription notifications
use crate::rpc_subscriptions::RpcSubscriptions;
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::clock::Slot;
use std::{
collections::HashSet,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub struct OptimisticallyConfirmedBank {
pub bank: Arc<Bank>,
}
impl OptimisticallyConfirmedBank {
pub fn locked_from_bank_forks_root(bank_forks: &Arc<RwLock<BankForks>>) -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(Self {
bank: bank_forks.read().unwrap().root_bank().clone(),
}))
}
}
pub enum BankNotification {
OptimisticallyConfirmed(Slot),
Frozen(Arc<Bank>),
Root(Arc<Bank>),
}
impl std::fmt::Debug for BankNotification {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
BankNotification::OptimisticallyConfirmed(slot) => {
write!(f, "OptimisticallyConfirmed({:?})", slot)
}
BankNotification::Frozen(bank) => write!(f, "Frozen({})", bank.slot()),
BankNotification::Root(bank) => write!(f, "Root({})", bank.slot()),
}
}
}
pub type BankNotificationReceiver = Receiver<BankNotification>;
pub type BankNotificationSender = Sender<BankNotification>;
pub struct OptimisticallyConfirmedBankTracker {
thread_hdl: JoinHandle<()>,
}
impl OptimisticallyConfirmedBankTracker {
pub fn new(
receiver: BankNotificationReceiver,
exit: &Arc<AtomicBool>,
bank_forks: Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: Arc<RpcSubscriptions>,
) -> Self {
let exit_ = exit.clone();
let mut pending_optimistically_confirmed_banks = HashSet::new();
let thread_hdl = Builder::new()
.name("solana-optimistic-bank-tracker".to_string())
.spawn(move || loop {
if exit_.load(Ordering::Relaxed) {
break;
}
if let Err(RecvTimeoutError::Disconnected) = Self::recv_notification(
&receiver,
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
) {
break;
}
})
.unwrap();
Self { thread_hdl }
}
fn recv_notification(
receiver: &Receiver<BankNotification>,
bank_forks: &Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) -> Result<(), RecvTimeoutError> {
let notification = receiver.recv_timeout(Duration::from_secs(1))?;
Self::process_notification(
notification,
bank_forks,
optimistically_confirmed_bank,
subscriptions,
&mut pending_optimistically_confirmed_banks,
);
Ok(())
}
pub(crate) fn process_notification(
notification: BankNotification,
bank_forks: &Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) {
debug!("received bank notification: {:?}", notification);
match notification {
BankNotification::OptimisticallyConfirmed(slot) => {
if let Some(bank) = bank_forks
.read()
.unwrap()
.get(slot)
.filter(|b| b.is_frozen())
{
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if bank.slot() > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank.clone();
subscriptions.notify_gossip_subscribers(slot);
}
drop(w_optimistically_confirmed_bank);
} else if slot > bank_forks.read().unwrap().root_bank().slot() {
pending_optimistically_confirmed_banks.insert(slot);
}
}
BankNotification::Frozen(bank) => {
let frozen_slot = bank.slot();
if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if frozen_slot > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank;
subscriptions.notify_gossip_subscribers(frozen_slot);
}
drop(w_optimistically_confirmed_bank);
}
}
BankNotification::Root(bank) => {
let root_slot = bank.slot();
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if root_slot > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank;
}
drop(w_optimistically_confirmed_bank);
pending_optimistically_confirmed_banks.retain(|&s| s > root_slot);
}
}
}
pub fn close(self) -> thread::Result<()> {
self.join()
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_runtime::commitment::BlockCommitmentCache;
use solana_sdk::pubkey::Pubkey;
#[test]
fn test_process_notification() {
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3);
bank_forks.write().unwrap().insert(bank3);
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache,
optimistically_confirmed_bank.clone(),
));
let mut pending_optimistically_confirmed_banks = HashSet::new();
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
// Test max optimistically confirmed bank remains in the cache
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(1),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
// Test bank will only be cached when frozen
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert_eq!(pending_optimistically_confirmed_banks.contains(&3), true);
// Test bank will only be cached when frozen
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
// Test higher root will be cached and clear pending_optimistically_confirmed_banks
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
let bank4 = Bank::new_from_parent(&bank3, &Pubkey::default(), 4);
bank_forks.write().unwrap().insert(bank4);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(4),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert_eq!(pending_optimistically_confirmed_banks.contains(&4), true);
let bank4 = bank_forks.read().unwrap().get(4).unwrap().clone();
let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5);
bank_forks.write().unwrap().insert(bank5);
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Root(bank5),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert_eq!(pending_optimistically_confirmed_banks.contains(&4), false);
// Banks <= root do not get added to pending list, even if not frozen
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
let bank6 = Bank::new_from_parent(&bank5, &Pubkey::default(), 6);
bank_forks.write().unwrap().insert(bank6);
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
let bank7 = Bank::new_from_parent(&bank5, &Pubkey::default(), 7);
bank_forks.write().unwrap().insert(bank7);
bank_forks.write().unwrap().set_root(7, &None, None);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(6),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert_eq!(pending_optimistically_confirmed_banks.contains(&6), false);
}
}

View File

@ -11,6 +11,7 @@ use crate::{
consensus::{ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes}, consensus::{ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes},
fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
progress_map::{ForkProgress, ProgressMap, PropagatedStats}, progress_map::{ForkProgress, ProgressMap, PropagatedStats},
pubkey_references::PubkeyReferences, pubkey_references::PubkeyReferences,
@ -108,6 +109,7 @@ pub struct ReplayStageConfig {
pub transaction_status_sender: Option<TransactionStatusSender>, pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>, pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_time_sender: Option<CacheBlockTimeSender>, pub cache_block_time_sender: Option<CacheBlockTimeSender>,
pub bank_notification_sender: Option<BankNotificationSender>,
} }
#[derive(Default)] #[derive(Default)]
@ -239,6 +241,7 @@ impl ReplayStage {
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
cache_block_time_sender, cache_block_time_sender,
bank_notification_sender,
} = config; } = config;
trace!("replay stage"); trace!("replay stage");
@ -308,8 +311,8 @@ impl ReplayStage {
transaction_status_sender.clone(), transaction_status_sender.clone(),
&verify_recyclers, &verify_recyclers,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&subscriptions,
&replay_vote_sender, &replay_vote_sender,
&bank_notification_sender,
); );
replay_active_banks_time.stop(); replay_active_banks_time.stop();
Self::report_memory(&allocated, "replay_active_banks", start); Self::report_memory(&allocated, "replay_active_banks", start);
@ -462,6 +465,7 @@ impl ReplayStage {
&block_commitment_cache, &block_commitment_cache,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&cache_block_time_sender, &cache_block_time_sender,
&bank_notification_sender,
)?; )?;
}; };
voting_time.stop(); voting_time.stop();
@ -1032,6 +1036,7 @@ impl ReplayStage {
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
cache_block_time_sender: &Option<CacheBlockTimeSender>, cache_block_time_sender: &Option<CacheBlockTimeSender>,
bank_notification_sender: &Option<BankNotificationSender>,
) -> Result<()> { ) -> Result<()> {
if bank.is_empty() { if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1); inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@ -1055,7 +1060,7 @@ impl ReplayStage {
.expect("Root bank doesn't exist") .expect("Root bank doesn't exist")
.clone(); .clone();
let mut rooted_banks = root_bank.parents(); let mut rooted_banks = root_bank.parents();
rooted_banks.push(root_bank); rooted_banks.push(root_bank.clone());
let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect(); let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
// Call leader schedule_cache.set_root() before blockstore.set_root() because // Call leader schedule_cache.set_root() before blockstore.set_root() because
// bank_forks.root is consumed by repair_service to update gossip, so we don't want to // bank_forks.root is consumed by repair_service to update gossip, so we don't want to
@ -1087,6 +1092,11 @@ impl ReplayStage {
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
); );
subscriptions.notify_roots(rooted_slots); subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Root(root_bank))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
latest_root_senders.iter().for_each(|s| { latest_root_senders.iter().for_each(|s| {
if let Err(e) = s.send(new_root) { if let Err(e) = s.send(new_root) {
trace!("latest root send failed: {:?}", e); trace!("latest root send failed: {:?}", e);
@ -1253,8 +1263,8 @@ impl ReplayStage {
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
verify_recyclers: &VerifyRecyclers, verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
subscriptions: &Arc<RpcSubscriptions>,
replay_vote_sender: &ReplayVoteSender, replay_vote_sender: &ReplayVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
) -> bool { ) -> bool {
let mut did_complete_bank = false; let mut did_complete_bank = false;
let mut tx_count = 0; let mut tx_count = 0;
@ -1325,7 +1335,11 @@ impl ReplayStage {
bank.freeze(); bank.freeze();
heaviest_subtree_fork_choice heaviest_subtree_fork_choice
.add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot()));
subscriptions.notify_frozen(bank.slot()); if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
} else { } else {
trace!( trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}", "bank {} not completed tick_height: {}, max_tick_height: {}",
@ -1961,6 +1975,7 @@ pub(crate) mod tests {
use crate::{ use crate::{
consensus::test::{initialize_state, VoteSimulator}, consensus::test::{initialize_state, VoteSimulator},
consensus::Tower, consensus::Tower,
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
progress_map::ValidatorStakeInfo, progress_map::ValidatorStakeInfo,
replay_stage::ReplayStage, replay_stage::ReplayStage,
transaction_status_service::TransactionStatusService, transaction_status_service::TransactionStatusService,
@ -2073,11 +2088,14 @@ pub(crate) mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
// RpcSubscriptions // RpcSubscriptions
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let rpc_subscriptions = Arc::new(RpcSubscriptions::new( let rpc_subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks.clone(), bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())), Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
)); ));
ReplayBlockstoreComponents { ReplayBlockstoreComponents {
@ -2568,6 +2586,7 @@ pub(crate) mod tests {
&exit, &exit,
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache.clone(), block_commitment_cache.clone(),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)); ));
let (lockouts_sender, _) = let (lockouts_sender, _) =
AggregateCommitmentService::new(&exit, block_commitment_cache.clone(), subscriptions); AggregateCommitmentService::new(&exit, block_commitment_cache.clone(), subscriptions);

View File

@ -4,6 +4,7 @@ use crate::{
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
contact_info::ContactInfo, contact_info::ContactInfo,
non_circulating_supply::calculate_non_circulating_supply, non_circulating_supply::calculate_non_circulating_supply,
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc_error::RpcCustomError, rpc_error::RpcCustomError,
rpc_health::*, rpc_health::*,
send_transaction_service::{SendTransactionService, TransactionInfo}, send_transaction_service::{SendTransactionService, TransactionInfo},
@ -121,6 +122,7 @@ pub struct JsonRpcRequestProcessor {
transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>, transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>,
runtime_handle: runtime::Handle, runtime_handle: runtime::Handle,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>, bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
} }
impl Metadata for JsonRpcRequestProcessor {} impl Metadata for JsonRpcRequestProcessor {}
@ -134,6 +136,17 @@ impl JsonRpcRequestProcessor {
Some(config) => config.commitment, Some(config) => config.commitment,
}; };
if commitment_level == CommitmentLevel::SingleGossip {
let bank = self
.optimistically_confirmed_bank
.read()
.unwrap()
.bank
.clone();
debug!("RPC using optimistically confirmed slot: {:?}", bank.slot());
return bank;
}
let slot = self let slot = self
.block_commitment_cache .block_commitment_cache
.read() .read()
@ -147,12 +160,13 @@ impl JsonRpcRequestProcessor {
CommitmentLevel::Root => { CommitmentLevel::Root => {
debug!("RPC using node root: {:?}", slot); debug!("RPC using node root: {:?}", slot);
} }
CommitmentLevel::Single | CommitmentLevel::SingleGossip => { CommitmentLevel::Single => {
debug!("RPC using confirmed slot: {:?}", slot); debug!("RPC using confirmed slot: {:?}", slot);
} }
CommitmentLevel::Max => { CommitmentLevel::Max => {
debug!("RPC using block: {:?}", slot); debug!("RPC using block: {:?}", slot);
} }
CommitmentLevel::SingleGossip => unreachable!(),
}; };
r_bank_forks.get(slot).cloned().unwrap_or_else(|| { r_bank_forks.get(slot).cloned().unwrap_or_else(|| {
@ -187,6 +201,7 @@ impl JsonRpcRequestProcessor {
genesis_hash: Hash, genesis_hash: Hash,
runtime: &runtime::Runtime, runtime: &runtime::Runtime,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>, bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) -> (Self, Receiver<TransactionInfo>) { ) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
( (
@ -202,6 +217,7 @@ impl JsonRpcRequestProcessor {
transaction_sender: Arc::new(Mutex::new(sender)), transaction_sender: Arc::new(Mutex::new(sender)),
runtime_handle: runtime.handle().clone(), runtime_handle: runtime.handle().clone(),
bigtable_ledger_storage, bigtable_ledger_storage,
optimistically_confirmed_bank,
}, },
receiver, receiver,
) )
@ -237,6 +253,9 @@ impl JsonRpcRequestProcessor {
transaction_sender: Arc::new(Mutex::new(sender)), transaction_sender: Arc::new(Mutex::new(sender)),
runtime_handle: runtime::Runtime::new().unwrap().handle().clone(), runtime_handle: runtime::Runtime::new().unwrap().handle().clone(),
bigtable_ledger_storage: None, bigtable_ledger_storage: None,
optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank {
bank: bank.clone(),
})),
} }
} }
@ -2553,8 +2572,13 @@ pub(crate) fn create_validator_exit(exit: &Arc<AtomicBool>) -> Arc<RwLock<Option
pub mod tests { pub mod tests {
use super::*; use super::*;
use crate::{ use crate::{
contact_info::ContactInfo, non_circulating_supply::non_circulating_accounts, contact_info::ContactInfo,
non_circulating_supply::non_circulating_accounts,
optimistically_confirmed_bank_tracker::{
BankNotification, OptimisticallyConfirmedBankTracker,
},
replay_stage::tests::create_test_transactions_and_populate_blockstore, replay_stage::tests::create_test_transactions_and_populate_blockstore,
rpc_subscriptions::RpcSubscriptions,
}; };
use bincode::deserialize; use bincode::deserialize;
use jsonrpc_core::{ use jsonrpc_core::{
@ -2752,6 +2776,7 @@ pub mod tests {
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
@ -4096,31 +4121,13 @@ pub mod tests {
#[test] #[test]
fn test_rpc_send_bad_tx() { fn test_rpc_send_bad_tx() {
let exit = Arc::new(AtomicBool::new(false)); let genesis = create_genesis_config(100);
let validator_exit = create_validator_exit(&exit); let bank = Arc::new(Bank::new(&genesis.genesis_config));
let ledger_path = get_tmp_ledger_path!(); let meta = JsonRpcRequestProcessor::new_from_bank(&bank);
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let mut io = MetaIoHandler::default(); let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl; let rpc = RpcSolImpl;
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu;
let bank_forks = new_bank_forks().0;
let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(),
new_bank_forks().0,
block_commitment_cache,
blockstore,
validator_exit,
RpcHealth::stub(),
cluster_info,
Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#; let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#;
let res = io.handle_request_sync(req, meta); let res = io.handle_request_sync(req, meta);
@ -4160,6 +4167,7 @@ pub mod tests {
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
@ -4342,6 +4350,7 @@ pub mod tests {
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
assert_eq!(request_processor.validator_exit(), false); assert_eq!(request_processor.validator_exit(), false);
@ -4371,6 +4380,7 @@ pub mod tests {
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
assert_eq!(request_processor.validator_exit(), true); assert_eq!(request_processor.validator_exit(), true);
@ -4459,6 +4469,7 @@ pub mod tests {
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
); );
SendTransactionService::new(tpu_address, &bank_forks, None, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
assert_eq!( assert_eq!(
@ -5525,4 +5536,118 @@ pub mod tests {
}) })
); );
} }
#[test]
fn test_rpc_single_gossip() {
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);
let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let cluster_info = Arc::new(ClusterInfo::default());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3);
bank_forks.write().unwrap().insert(bank3);
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache.clone(),
optimistically_confirmed_bank.clone(),
));
let (meta, _receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(),
bank_forks.clone(),
block_commitment_cache,
blockstore,
validator_exit,
RpcHealth::stub(),
cluster_info,
Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
optimistically_confirmed_bank.clone(),
);
let mut io = MetaIoHandler::default();
io.extend_with(RpcSolImpl.to_delegate());
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment":"singleGossip"}]}"#;
let res = io.handle_request_sync(req, meta.clone());
let json: Value = serde_json::from_str(&res.unwrap()).unwrap();
let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap();
assert_eq!(slot, 0);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "singleGossip"}]}"#;
let res = io.handle_request_sync(&req, meta.clone());
let json: Value = serde_json::from_str(&res.unwrap()).unwrap();
let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap();
assert_eq!(slot, 2);
// Test rollback does not appear to happen, even if slots are notified out of order
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(1),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "singleGossip"}]}"#;
let res = io.handle_request_sync(&req, meta.clone());
let json: Value = serde_json::from_str(&res.unwrap()).unwrap();
let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap();
assert_eq!(slot, 2);
// Test bank will only be cached when frozen
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "singleGossip"}]}"#;
let res = io.handle_request_sync(&req, meta.clone());
let json: Value = serde_json::from_str(&res.unwrap()).unwrap();
let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap();
assert_eq!(slot, 2);
// Test freezing an optimistically confirmed bank will update cache
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "singleGossip"}]}"#;
let res = io.handle_request_sync(&req, meta);
let json: Value = serde_json::from_str(&res.unwrap()).unwrap();
let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap();
assert_eq!(slot, 3);
}
} }

View File

@ -354,6 +354,7 @@ mod tests {
use super::*; use super::*;
use crate::{ use crate::{
cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc_subscriptions::tests::robust_poll_or_panic, rpc_subscriptions::tests::robust_poll_or_panic,
}; };
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
@ -431,6 +432,7 @@ mod tests {
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
bank_forks.clone(), bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)), )),
uid: Arc::new(atomic::AtomicUsize::default()), uid: Arc::new(atomic::AtomicUsize::default()),
}; };
@ -568,6 +570,7 @@ mod tests {
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1, 1, 1,
))), ))),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)), )),
uid: Arc::new(atomic::AtomicUsize::default()), uid: Arc::new(atomic::AtomicUsize::default()),
}; };
@ -676,6 +679,7 @@ mod tests {
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1, 1, 1,
))), ))),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)), )),
uid: Arc::new(atomic::AtomicUsize::default()), uid: Arc::new(atomic::AtomicUsize::default()),
}; };
@ -798,6 +802,7 @@ mod tests {
&exit, &exit,
bank_forks.clone(), bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
); );
rpc.subscriptions = Arc::new(subscriptions); rpc.subscriptions = Arc::new(subscriptions);
let session = create_session(); let session = create_session();
@ -847,8 +852,12 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests()));
let subscriptions = let subscriptions = RpcSubscriptions::new(
RpcSubscriptions::new(&exit, bank_forks.clone(), block_commitment_cache); &exit,
bank_forks.clone(),
block_commitment_cache,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
rpc.subscriptions = Arc::new(subscriptions); rpc.subscriptions = Arc::new(subscriptions);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
@ -991,7 +1000,14 @@ mod tests {
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("voteNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("voteNotification");
// Setup Subscriptions // Setup Subscriptions
let subscriptions = RpcSubscriptions::new(&exit, bank_forks, block_commitment_cache); let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new(
&exit,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
);
rpc.subscriptions = Arc::new(subscriptions); rpc.subscriptions = Arc::new(subscriptions);
rpc.vote_subscribe(session, subscriber); rpc.vote_subscribe(session, subscriber);

View File

@ -73,6 +73,7 @@ impl PubSubService {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank;
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank::Bank,
bank_forks::BankForks, bank_forks::BankForks,
@ -91,10 +92,13 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks, bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
)); ));
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
let thread = pubsub_service.thread_hdl.thread(); let thread = pubsub_service.thread_hdl.thread();

View File

@ -3,6 +3,7 @@
use crate::{ use crate::{
bigtable_upload_service::BigTableUploadService, bigtable_upload_service::BigTableUploadService,
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
poh_recorder::PohRecorder, poh_recorder::PohRecorder,
rpc::*, rpc::*,
rpc_health::*, rpc_health::*,
@ -249,6 +250,7 @@ impl JsonRpcService {
validator_exit: Arc<RwLock<Option<ValidatorExit>>>, validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
trusted_validators: Option<HashSet<Pubkey>>, trusted_validators: Option<HashSet<Pubkey>>,
override_health_check: Arc<AtomicBool>, override_health_check: Arc<AtomicBool>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) -> Self { ) -> Self {
info!("rpc bound to {:?}", rpc_addr); info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config); info!("rpc configuration: {:?}", config);
@ -311,6 +313,7 @@ impl JsonRpcService {
genesis_hash, genesis_hash,
&runtime, &runtime,
bigtable_ledger_storage, bigtable_ledger_storage,
optimistically_confirmed_bank,
); );
let leader_info = let leader_info =
@ -436,6 +439,8 @@ mod tests {
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut rpc_service = JsonRpcService::new( let mut rpc_service = JsonRpcService::new(
rpc_addr, rpc_addr,
JsonRpcConfig::default(), JsonRpcConfig::default(),
@ -450,6 +455,7 @@ mod tests {
validator_exit, validator_exit,
None, None,
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),
optimistically_confirmed_bank,
); );
let thread = rpc_service.thread_hdl.thread(); let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); assert_eq!(thread.name().unwrap(), "solana-jsonrpc");

View File

@ -1,6 +1,9 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request //! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc::{get_parsed_token_account, get_parsed_token_accounts}; use crate::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc::{get_parsed_token_account, get_parsed_token_accounts},
};
use core::hash::Hash; use core::hash::Hash;
use jsonrpc_core::futures::Future; use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{ use jsonrpc_pubsub::{
@ -61,7 +64,6 @@ enum NotificationEntry {
Slot(SlotInfo), Slot(SlotInfo),
Vote(Vote), Vote(Vote),
Root(Slot), Root(Slot),
Frozen(Slot),
Bank(CommitmentSlots), Bank(CommitmentSlots),
Gossip(Slot), Gossip(Slot),
SignaturesReceived((Slot, Vec<Signature>)), SignaturesReceived((Slot, Vec<Signature>)),
@ -71,7 +73,6 @@ impl std::fmt::Debug for NotificationEntry {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self { match self {
NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Root(root) => write!(f, "Root({})", root),
NotificationEntry::Frozen(slot) => write!(f, "Frozen({})", slot),
NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote),
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
NotificationEntry::Bank(commitment_slots) => { NotificationEntry::Bank(commitment_slots) => {
@ -346,7 +347,7 @@ pub struct RpcSubscriptions {
notifier_runtime: Option<Runtime>, notifier_runtime: Option<Runtime>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>, optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
} }
@ -363,6 +364,7 @@ impl RpcSubscriptions {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) -> Self { ) -> Self {
let (notification_sender, notification_receiver): ( let (notification_sender, notification_receiver): (
Sender<NotificationEntry>, Sender<NotificationEntry>,
@ -396,9 +398,6 @@ impl RpcSubscriptions {
}; };
let _subscriptions = subscriptions.clone(); let _subscriptions = subscriptions.clone();
let last_checked_slots = Arc::new(RwLock::new(HashMap::new()));
let _last_checked_slots = last_checked_slots.clone();
let notifier_runtime = RuntimeBuilder::new() let notifier_runtime = RuntimeBuilder::new()
.core_threads(1) .core_threads(1)
.name_prefix("solana-rpc-notifier-") .name_prefix("solana-rpc-notifier-")
@ -415,7 +414,6 @@ impl RpcSubscriptions {
notification_receiver, notification_receiver,
_subscriptions, _subscriptions,
_bank_forks, _bank_forks,
_last_checked_slots,
); );
}) })
.unwrap(); .unwrap();
@ -427,16 +425,19 @@ impl RpcSubscriptions {
t_cleanup: Some(t_cleanup), t_cleanup: Some(t_cleanup),
bank_forks, bank_forks,
block_commitment_cache, block_commitment_cache,
last_checked_slots, optimistically_confirmed_bank,
exit: exit.clone(), exit: exit.clone(),
} }
} }
pub fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self { pub fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
Self::new( Self::new(
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
bank_forks, bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())), Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
) )
} }
@ -529,12 +530,12 @@ impl RpcSubscriptions {
.read() .read()
.unwrap() .unwrap()
.highest_confirmed_slot(), .highest_confirmed_slot(),
CommitmentLevel::SingleGossip => *self CommitmentLevel::SingleGossip => self
.last_checked_slots .optimistically_confirmed_bank
.read() .read()
.unwrap() .unwrap()
.get(&CommitmentLevel::SingleGossip) .bank
.unwrap_or(&0), .slot(),
}; };
let last_notified_slot = if let Some((_account, slot)) = self let last_notified_slot = if let Some((_account, slot)) = self
.bank_forks .bank_forks
@ -724,10 +725,6 @@ impl RpcSubscriptions {
self.enqueue_notification(NotificationEntry::Vote(vote.clone())); self.enqueue_notification(NotificationEntry::Vote(vote.clone()));
} }
pub fn notify_frozen(&self, frozen_slot: Slot) {
self.enqueue_notification(NotificationEntry::Frozen(frozen_slot));
}
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) { pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
@ -769,9 +766,7 @@ impl RpcSubscriptions {
notification_receiver: Receiver<NotificationEntry>, notification_receiver: Receiver<NotificationEntry>,
subscriptions: Subscriptions, subscriptions: Subscriptions,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
) { ) {
let mut pending_gossip_notifications = HashSet::new();
loop { loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
@ -804,18 +799,10 @@ impl RpcSubscriptions {
} }
NotificationEntry::Root(root) => { NotificationEntry::Root(root) => {
debug!("root notify: {:?}", root); debug!("root notify: {:?}", root);
{ let subscriptions = subscriptions.root_subscriptions.read().unwrap();
let subscriptions = subscriptions.root_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() {
for (_, sink) in subscriptions.iter() { notifier.notify(root, sink);
notifier.notify(root, sink);
}
} }
// Prune old pending notifications
pending_gossip_notifications = pending_gossip_notifications
.into_iter()
.filter(|&s| s > root)
.collect();
} }
NotificationEntry::Bank(commitment_slots) => { NotificationEntry::Bank(commitment_slots) => {
RpcSubscriptions::notify_accounts_programs_signatures( RpcSubscriptions::notify_accounts_programs_signatures(
@ -828,36 +815,13 @@ impl RpcSubscriptions {
"bank", "bank",
) )
} }
NotificationEntry::Frozen(slot) => {
if pending_gossip_notifications.remove(&slot) {
Self::process_gossip_notification(
slot,
&notifier,
&subscriptions,
&bank_forks,
&last_checked_slots,
);
}
}
NotificationEntry::Gossip(slot) => { NotificationEntry::Gossip(slot) => {
let bank_frozen = bank_forks Self::process_gossip_notification(
.read() slot,
.unwrap() &notifier,
.get(slot) &subscriptions,
.filter(|b| b.is_frozen()) &bank_forks,
.is_some(); );
if !bank_frozen {
pending_gossip_notifications.insert(slot);
} else {
Self::process_gossip_notification(
slot,
&notifier,
&subscriptions,
&bank_forks,
&last_checked_slots,
);
}
} }
NotificationEntry::SignaturesReceived(slot_signatures) => { NotificationEntry::SignaturesReceived(slot_signatures) => {
RpcSubscriptions::process_signatures_received( RpcSubscriptions::process_signatures_received(
@ -883,23 +847,7 @@ impl RpcSubscriptions {
notifier: &RpcNotifier, notifier: &RpcNotifier,
subscriptions: &Subscriptions, subscriptions: &Subscriptions,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
last_checked_slots: &Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
) { ) {
let mut last_checked_slots_lock = last_checked_slots.write().unwrap();
let last_checked_slot = last_checked_slots_lock
.get(&CommitmentLevel::SingleGossip)
.cloned()
.unwrap_or_default();
if slot > last_checked_slot {
last_checked_slots_lock.insert(CommitmentLevel::SingleGossip, slot);
} else {
// Avoid sending stale or duplicate notifications
return;
}
drop(last_checked_slots_lock);
let commitment_slots = CommitmentSlots { let commitment_slots = CommitmentSlots {
highest_confirmed_slot: slot, highest_confirmed_slot: slot,
..CommitmentSlots::default() ..CommitmentSlots::default()
@ -1053,6 +1001,9 @@ impl RpcSubscriptions {
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use super::*; use super::*;
use crate::optimistically_confirmed_bank_tracker::{
BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
};
use jsonrpc_core::futures::{self, stream::Stream}; use jsonrpc_core::futures::{self, stream::Stream};
use jsonrpc_pubsub::typed::Subscriber; use jsonrpc_pubsub::typed::Subscriber;
use serial_test_derive::serial; use serial_test_derive::serial;
@ -1118,6 +1069,7 @@ pub(crate) mod tests {
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1, 1, 1,
))), ))),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
); );
subscriptions.add_account_subscription( subscriptions.add_account_subscription(
alice.pubkey(), alice.pubkey(),
@ -1216,10 +1168,13 @@ pub(crate) mod tests {
Subscriber::new_test("programNotification"); Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks, bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
); );
subscriptions.add_program_subscription( subscriptions.add_program_subscription(
solana_stake_program::id(), solana_stake_program::id(),
@ -1324,10 +1279,13 @@ pub(crate) mod tests {
); );
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks, bank_forks,
Arc::new(RwLock::new(block_commitment_cache)), Arc::new(RwLock::new(block_commitment_cache)),
optimistically_confirmed_bank,
); );
let (past_bank_sub1, _id_receiver, past_bank_recv1) = let (past_bank_sub1, _id_receiver, past_bank_recv1) =
@ -1480,10 +1438,13 @@ pub(crate) mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks, bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
); );
subscriptions.add_slot_subscription(sub_id.clone(), subscriber); subscriptions.add_slot_subscription(sub_id.clone(), subscriber);
@ -1528,10 +1489,13 @@ pub(crate) mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks, bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
); );
subscriptions.add_root_subscription(sub_id.clone(), subscriber); subscriptions.add_root_subscription(sub_id.clone(), subscriber);
@ -1630,18 +1594,23 @@ pub(crate) mod tests {
bank_forks.write().unwrap().insert(bank2); bank_forks.write().unwrap().insert(bank2);
let alice = Keypair::new(); let alice = Keypair::new();
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let (subscriber0, _id_receiver, transport_receiver0) = let (subscriber0, _id_receiver, transport_receiver0) =
Subscriber::new_test("accountNotification"); Subscriber::new_test("accountNotification");
let (subscriber1, _id_receiver, transport_receiver1) = let (subscriber1, _id_receiver, transport_receiver1) =
Subscriber::new_test("accountNotification"); Subscriber::new_test("accountNotification");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks.clone(), bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1, 1, 1,
))), ))),
); optimistically_confirmed_bank.clone(),
));
let sub_id0 = SubscriptionId::Number(0 as u64); let sub_id0 = SubscriptionId::Number(0 as u64);
subscriptions.add_account_subscription( subscriptions.add_account_subscription(
alice.pubkey(), alice.pubkey(),
@ -1685,10 +1654,22 @@ pub(crate) mod tests {
.unwrap(); .unwrap();
// First, notify the unfrozen bank first to queue pending notification // First, notify the unfrozen bank first to queue pending notification
subscriptions.notify_gossip_subscribers(2); OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
// Now, notify the frozen bank and ensure its notifications are processed // Now, notify the frozen bank and ensure its notifications are processed
subscriptions.notify_gossip_subscribers(1); OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(1),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let (response, _) = robust_poll_or_panic(transport_receiver0); let (response, _) = robust_poll_or_panic(transport_receiver0);
let expected = json!({ let expected = json!({
@ -1723,7 +1704,14 @@ pub(crate) mod tests {
subscriber1, subscriber1,
); );
subscriptions.notify_frozen(2); let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
let (response, _) = robust_poll_or_panic(transport_receiver1); let (response, _) = robust_poll_or_panic(transport_receiver1);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",

View File

@ -7,6 +7,7 @@ use crate::{
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker}, cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker},
fetch_stage::FetchStage, fetch_stage::FetchStage,
optimistically_confirmed_bank_tracker::BankNotificationSender,
poh_recorder::{PohRecorder, WorkingBankEntry}, poh_recorder::{PohRecorder, WorkingBankEntry},
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
sigverify::TransactionSigVerifier, sigverify::TransactionSigVerifier,
@ -57,6 +58,7 @@ impl Tpu {
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_vote_receiver: ReplayVoteReceiver, replay_vote_receiver: ReplayVoteReceiver,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
bank_notification_sender: Option<BankNotificationSender>,
) -> Self { ) -> Self {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender( let fetch_stage = FetchStage::new_with_sender(
@ -85,6 +87,7 @@ impl Tpu {
verified_vote_sender, verified_vote_sender,
replay_vote_receiver, replay_vote_receiver,
blockstore.clone(), blockstore.clone(),
bank_notification_sender,
); );
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(

View File

@ -11,6 +11,7 @@ use crate::{
completed_data_sets_service::CompletedDataSetsSender, completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower, consensus::Tower,
ledger_cleanup_service::LedgerCleanupService, ledger_cleanup_service::LedgerCleanupService,
optimistically_confirmed_bank_tracker::BankNotificationSender,
poh_recorder::PohRecorder, poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig}, replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage, retransmit_stage::RetransmitStage,
@ -108,6 +109,7 @@ impl Tvu {
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
completed_data_sets_sender: CompletedDataSetsSender, completed_data_sets_sender: CompletedDataSetsSender,
bank_notification_sender: Option<BankNotificationSender>,
tvu_config: TvuConfig, tvu_config: TvuConfig,
) -> Self { ) -> Self {
let keypair: Arc<Keypair> = cluster_info.keypair.clone(); let keypair: Arc<Keypair> = cluster_info.keypair.clone();
@ -219,6 +221,7 @@ impl Tvu {
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
cache_block_time_sender, cache_block_time_sender,
bank_notification_sender,
}; };
let replay_stage = ReplayStage::new( let replay_stage = ReplayStage::new(
@ -279,6 +282,7 @@ pub mod tests {
use crate::{ use crate::{
banking_stage::create_test_recorder, banking_stage::create_test_recorder,
cluster_info::{ClusterInfo, Node}, cluster_info::{ClusterInfo, Node},
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
}; };
use serial_test_derive::serial; use serial_test_derive::serial;
use solana_ledger::{ use solana_ledger::{
@ -348,6 +352,7 @@ pub mod tests {
&exit, &exit,
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache.clone(), block_commitment_cache.clone(),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)), )),
&poh_recorder, &poh_recorder,
tower, tower,
@ -365,6 +370,7 @@ pub mod tests {
verified_vote_receiver, verified_vote_receiver,
replay_vote_sender, replay_vote_sender,
completed_data_sets_sender, completed_data_sets_sender,
None,
TvuConfig::default(), TvuConfig::default(),
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);

View File

@ -10,6 +10,9 @@ use crate::{
consensus::{reconcile_blockstore_roots_with_tower, Tower, TowerError}, consensus::{reconcile_blockstore_roots_with_tower, Tower, TowerError},
contact_info::ContactInfo, contact_info::ContactInfo,
gossip_service::GossipService, gossip_service::GossipService,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::PohService, poh_service::PohService,
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
@ -166,10 +169,17 @@ struct TransactionHistoryServices {
cache_block_time_service: Option<CacheBlockTimeService>, cache_block_time_service: Option<CacheBlockTimeService>,
} }
struct RpcServices {
json_rpc_service: JsonRpcService,
pubsub_service: PubSubService,
rpc_banks_service: RpcBanksService,
optimistically_confirmed_bank_tracker: OptimisticallyConfirmedBankTracker,
}
pub struct Validator { pub struct Validator {
pub id: Pubkey, pub id: Pubkey,
validator_exit: Arc<RwLock<Option<ValidatorExit>>>, validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, rpc_service: Option<RpcServices>,
transaction_status_service: Option<TransactionStatusService>, transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>, rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_service: Option<CacheBlockTimeService>, cache_block_time_service: Option<CacheBlockTimeService>,
@ -329,10 +339,14 @@ impl Validator {
block_commitment_cache.initialize_slots(bank.slot()); block_commitment_cache.initialize_slots(bank.slot());
let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache)); let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache.clone(), block_commitment_cache.clone(),
optimistically_confirmed_bank.clone(),
)); ));
let (completed_data_sets_sender, completed_data_sets_receiver) = let (completed_data_sets_sender, completed_data_sets_receiver) =
@ -391,9 +405,8 @@ impl Validator {
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let rpc_service = config let (rpc_service, bank_notification_sender) =
.rpc_addrs if let Some((rpc_addr, rpc_pubsub_addr, rpc_banks_addr)) = config.rpc_addrs {
.map(|(rpc_addr, rpc_pubsub_addr, rpc_banks_addr)| {
if ContactInfo::is_valid_address(&node.info.rpc) { if ContactInfo::is_valid_address(&node.info.rpc) {
assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub));
assert_eq!(rpc_addr.port(), node.info.rpc.port()); assert_eq!(rpc_addr.port(), node.info.rpc.port());
@ -403,32 +416,47 @@ impl Validator {
assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub));
} }
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let (bank_notification_sender, bank_notification_receiver) = unbounded();
( (
JsonRpcService::new( Some(RpcServices {
rpc_addr, json_rpc_service: JsonRpcService::new(
config.rpc_config.clone(), rpc_addr,
config.snapshot_config.clone(), config.rpc_config.clone(),
bank_forks.clone(), config.snapshot_config.clone(),
block_commitment_cache.clone(), bank_forks.clone(),
blockstore.clone(), block_commitment_cache.clone(),
cluster_info.clone(), blockstore.clone(),
Some(poh_recorder.clone()), cluster_info.clone(),
genesis_config.hash(), Some(poh_recorder.clone()),
ledger_path, genesis_config.hash(),
validator_exit.clone(), ledger_path,
config.trusted_validators.clone(), validator_exit.clone(),
rpc_override_health_check.clone(), config.trusted_validators.clone(),
), rpc_override_health_check.clone(),
PubSubService::new(&subscriptions, rpc_pubsub_addr, &exit), optimistically_confirmed_bank.clone(),
RpcBanksService::new( ),
rpc_banks_addr, pubsub_service: PubSubService::new(&subscriptions, rpc_pubsub_addr, &exit),
tpu_address, rpc_banks_service: RpcBanksService::new(
&bank_forks, rpc_banks_addr,
&block_commitment_cache, tpu_address,
&exit, &bank_forks,
), &block_commitment_cache,
&exit,
),
optimistically_confirmed_bank_tracker:
OptimisticallyConfirmedBankTracker::new(
bank_notification_receiver,
&exit,
bank_forks.clone(),
optimistically_confirmed_bank,
subscriptions.clone(),
),
}),
Some(bank_notification_sender),
) )
}); } else {
(None, None)
};
let ip_echo_server = solana_net_utils::ip_echo_server(node.sockets.ip_echo.unwrap()); let ip_echo_server = solana_net_utils::ip_echo_server(node.sockets.ip_echo.unwrap());
@ -532,6 +560,7 @@ impl Validator {
verified_vote_receiver, verified_vote_receiver,
replay_vote_sender.clone(), replay_vote_sender.clone(),
completed_data_sets_sender, completed_data_sets_sender,
bank_notification_sender.clone(),
TvuConfig { TvuConfig {
max_ledger_shreds: config.max_ledger_shreds, max_ledger_shreds: config.max_ledger_shreds,
halt_on_trusted_validators_accounts_hash_mismatch: config halt_on_trusted_validators_accounts_hash_mismatch: config
@ -562,6 +591,7 @@ impl Validator {
verified_vote_sender, verified_vote_sender,
replay_vote_receiver, replay_vote_receiver,
replay_vote_sender, replay_vote_sender,
bank_notification_sender,
); );
datapoint_info!("validator-new", ("id", id.to_string(), String)); datapoint_info!("validator-new", ("id", id.to_string(), String));
@ -625,10 +655,17 @@ impl Validator {
pub fn join(self) -> Result<()> { pub fn join(self) -> Result<()> {
self.poh_service.join()?; self.poh_service.join()?;
drop(self.poh_recorder); drop(self.poh_recorder);
if let Some((rpc_service, rpc_pubsub_service, rpc_banks_service)) = self.rpc_service { if let Some(RpcServices {
rpc_service.join()?; json_rpc_service,
rpc_pubsub_service.join()?; pubsub_service,
rpc_banks_service,
optimistically_confirmed_bank_tracker,
}) = self.rpc_service
{
json_rpc_service.join()?;
pubsub_service.join()?;
rpc_banks_service.join()?; rpc_banks_service.join()?;
optimistically_confirmed_bank_tracker.join()?;
} }
if let Some(transaction_status_service) = self.transaction_status_service { if let Some(transaction_status_service) = self.transaction_status_service {
transaction_status_service.join()?; transaction_status_service.join()?;

View File

@ -1,5 +1,6 @@
use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo}; use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
use solana_core::{ use solana_core::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions,
test_validator::TestValidator, test_validator::TestValidator,
}; };
@ -91,10 +92,13 @@ fn test_slot_subscription() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks, bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())), Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
)); ));
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
std::thread::sleep(Duration::from_millis(400)); std::thread::sleep(Duration::from_millis(400));

View File

@ -104,8 +104,8 @@ impl BlockCommitmentCache {
} }
pub fn highest_gossip_confirmed_slot(&self) -> Slot { pub fn highest_gossip_confirmed_slot(&self) -> Slot {
// TODO: see solana_core::RpcSubscriptions: // TODO: combine bank caches
//self.last_checked_slots.get(&CommitmentLevel::SingleGossip).unwrap_or(&0) // Currently, this information is provided by OptimisticallyConfirmedBank::bank.slot()
self.highest_confirmed_slot() self.highest_confirmed_slot()
} }