diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index cd476d5c31..88c45c9ec6 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -2,6 +2,7 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, crds_value::CrdsValueLabel, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, + optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, poh_recorder::PohRecorder, pubkey_references::LockedPubkeyReferences, result::{Error, Result}, @@ -248,6 +249,7 @@ impl ClusterInfoVoteListener { verified_vote_sender: VerifiedVoteSender, replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, + bank_notification_sender: Option, ) -> Self { let exit_ = exit.clone(); @@ -293,6 +295,7 @@ impl ClusterInfoVoteListener { verified_vote_sender, replay_votes_receiver, blockstore, + bank_notification_sender, ); }) .unwrap(); @@ -420,6 +423,7 @@ impl ClusterInfoVoteListener { verified_vote_sender: VerifiedVoteSender, replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, + bank_notification_sender: Option, ) -> Result<()> { let mut optimistic_confirmation_verifier = OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root()); @@ -451,6 +455,7 @@ impl ClusterInfoVoteListener { &subscriptions, &verified_vote_sender, &replay_votes_receiver, + &bank_notification_sender, ); if let Err(e) = optimistic_confirmed_slots { @@ -485,6 +490,7 @@ impl ClusterInfoVoteListener { subscriptions, verified_vote_sender, replay_votes_receiver, + &None, ) } @@ -495,6 +501,7 @@ impl ClusterInfoVoteListener { subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVoteReceiver, + bank_notification_sender: &Option, ) -> Result> { let mut sel = Select::new(); sel.recv(gossip_vote_txs_receiver); @@ -523,6 +530,7 @@ impl ClusterInfoVoteListener { root_bank, subscriptions, verified_vote_sender, + bank_notification_sender, )); } else { remaining_wait_time = remaining_wait_time @@ -543,6 +551,7 @@ impl ClusterInfoVoteListener { diff: &mut HashMap, bool>>, new_optimistic_confirmed_slots: &mut Vec<(Slot, Hash)>, is_gossip_vote: bool, + bank_notification_sender: &Option, ) { if vote.slots.is_empty() { return; @@ -595,7 +604,13 @@ impl ClusterInfoVoteListener { if is_confirmed { new_optimistic_confirmed_slots.push((*slot, last_vote_hash)); // Notify subscribers about new optimistic confirmation - subscriptions.notify_gossip_subscribers(*slot); + if let Some(sender) = bank_notification_sender { + sender + .send(BankNotification::OptimisticallyConfirmed(*slot)) + .unwrap_or_else(|err| { + warn!("bank_notification_sender failed: {:?}", err) + }); + } } if !is_new && !is_gossip_vote { @@ -636,6 +651,7 @@ impl ClusterInfoVoteListener { root_bank: &Bank, subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, + bank_notification_sender: &Option, ) -> Vec<(Slot, Hash)> { let mut diff: HashMap, bool>> = HashMap::new(); let mut new_optimistic_confirmed_slots = vec![]; @@ -686,6 +702,7 @@ impl ClusterInfoVoteListener { &mut diff, &mut new_optimistic_confirmed_slots, is_gossip, + bank_notification_sender, ); } @@ -769,6 +786,7 @@ impl ClusterInfoVoteListener { #[cfg(test)] mod tests { use super::*; + use crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank; use solana_perf::packet; use solana_runtime::{ bank::Bank, @@ -996,6 +1014,7 @@ mod tests { &subscriptions, &verified_vote_sender, &replay_votes_receiver, + &None, ) .unwrap(); @@ -1024,6 +1043,7 @@ mod tests { &subscriptions, &verified_vote_sender, &replay_votes_receiver, + &None, ) .unwrap(); @@ -1101,6 +1121,7 @@ mod tests { &subscriptions, &verified_vote_sender, &replay_votes_receiver, + &None, ) .unwrap(); @@ -1219,6 +1240,7 @@ mod tests { &subscriptions, &verified_vote_sender, &replay_votes_receiver, + &None, ) .unwrap(); @@ -1313,6 +1335,7 @@ mod tests { &subscriptions, &verified_vote_sender, &replay_votes_receiver, + &None, ); } let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); @@ -1423,13 +1446,16 @@ mod tests { ); let bank = Bank::new(&genesis_config); let exit = Arc::new(AtomicBool::new(false)); - let bank_forks = BankForks::new(bank); - let bank = bank_forks.get(0).unwrap().clone(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = bank_forks.read().unwrap().get(0).unwrap().clone(); let vote_tracker = VoteTracker::new(&bank); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(bank_forks)), + bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), + optimistically_confirmed_bank, )); // Send a vote to process, should add a reference to the pubkey for that voter @@ -1460,6 +1486,7 @@ mod tests { &bank, &subscriptions, &verified_vote_sender, + &None, ); let ref_count = Arc::strong_count( &vote_tracker @@ -1529,6 +1556,7 @@ mod tests { &new_root_bank, &subscriptions, &verified_vote_sender, + &None, ); // Check new replay vote pubkey first @@ -1578,12 +1606,15 @@ mod tests { let bank = Bank::new(&genesis_config); let vote_tracker = VoteTracker::new(&bank); let exit = Arc::new(AtomicBool::new(false)); - let bank_forks = BankForks::new(bank); - let bank = bank_forks.get(0).unwrap().clone(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = bank_forks.read().unwrap().get(0).unwrap().clone(); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(bank_forks)), + bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), + optimistically_confirmed_bank, )); // Integrity Checks diff --git a/core/src/lib.rs b/core/src/lib.rs index d7c2130d4e..771c9ed7e6 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -45,6 +45,7 @@ pub mod ledger_cleanup_service; pub mod local_vote_signer_service; pub mod non_circulating_supply; pub mod optimistic_confirmation_verifier; +pub mod optimistically_confirmed_bank_tracker; pub mod poh_recorder; pub mod poh_service; pub mod progress_map; diff --git a/core/src/optimistically_confirmed_bank_tracker.rs b/core/src/optimistically_confirmed_bank_tracker.rs new file mode 100644 index 0000000000..8de96bac88 --- /dev/null +++ b/core/src/optimistically_confirmed_bank_tracker.rs @@ -0,0 +1,294 @@ +//! The `optimistically_confirmed_bank_tracker` module implements a threaded service to track the +//! most recent optimistically confirmed bank for use in rpc services, and triggers gossip +//! subscription notifications + +use crate::rpc_subscriptions::RpcSubscriptions; +use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; +use solana_runtime::{bank::Bank, bank_forks::BankForks}; +use solana_sdk::clock::Slot; +use std::{ + collections::HashSet, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub struct OptimisticallyConfirmedBank { + pub bank: Arc, +} + +impl OptimisticallyConfirmedBank { + pub fn locked_from_bank_forks_root(bank_forks: &Arc>) -> Arc> { + Arc::new(RwLock::new(Self { + bank: bank_forks.read().unwrap().root_bank().clone(), + })) + } +} + +pub enum BankNotification { + OptimisticallyConfirmed(Slot), + Frozen(Arc), + Root(Arc), +} + +impl std::fmt::Debug for BankNotification { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + BankNotification::OptimisticallyConfirmed(slot) => { + write!(f, "OptimisticallyConfirmed({:?})", slot) + } + BankNotification::Frozen(bank) => write!(f, "Frozen({})", bank.slot()), + BankNotification::Root(bank) => write!(f, "Root({})", bank.slot()), + } + } +} + +pub type BankNotificationReceiver = Receiver; +pub type BankNotificationSender = Sender; + +pub struct OptimisticallyConfirmedBankTracker { + thread_hdl: JoinHandle<()>, +} + +impl OptimisticallyConfirmedBankTracker { + pub fn new( + receiver: BankNotificationReceiver, + exit: &Arc, + bank_forks: Arc>, + optimistically_confirmed_bank: Arc>, + subscriptions: Arc, + ) -> Self { + let exit_ = exit.clone(); + let mut pending_optimistically_confirmed_banks = HashSet::new(); + let thread_hdl = Builder::new() + .name("solana-optimistic-bank-tracker".to_string()) + .spawn(move || loop { + if exit_.load(Ordering::Relaxed) { + break; + } + + if let Err(RecvTimeoutError::Disconnected) = Self::recv_notification( + &receiver, + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ) { + break; + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn recv_notification( + receiver: &Receiver, + bank_forks: &Arc>, + optimistically_confirmed_bank: &Arc>, + subscriptions: &Arc, + mut pending_optimistically_confirmed_banks: &mut HashSet, + ) -> Result<(), RecvTimeoutError> { + let notification = receiver.recv_timeout(Duration::from_secs(1))?; + Self::process_notification( + notification, + bank_forks, + optimistically_confirmed_bank, + subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + Ok(()) + } + + pub(crate) fn process_notification( + notification: BankNotification, + bank_forks: &Arc>, + optimistically_confirmed_bank: &Arc>, + subscriptions: &Arc, + pending_optimistically_confirmed_banks: &mut HashSet, + ) { + debug!("received bank notification: {:?}", notification); + match notification { + BankNotification::OptimisticallyConfirmed(slot) => { + if let Some(bank) = bank_forks + .read() + .unwrap() + .get(slot) + .filter(|b| b.is_frozen()) + { + let mut w_optimistically_confirmed_bank = + optimistically_confirmed_bank.write().unwrap(); + if bank.slot() > w_optimistically_confirmed_bank.bank.slot() { + w_optimistically_confirmed_bank.bank = bank.clone(); + subscriptions.notify_gossip_subscribers(slot); + } + drop(w_optimistically_confirmed_bank); + } else if slot > bank_forks.read().unwrap().root_bank().slot() { + pending_optimistically_confirmed_banks.insert(slot); + } + } + BankNotification::Frozen(bank) => { + let frozen_slot = bank.slot(); + if pending_optimistically_confirmed_banks.remove(&bank.slot()) { + let mut w_optimistically_confirmed_bank = + optimistically_confirmed_bank.write().unwrap(); + if frozen_slot > w_optimistically_confirmed_bank.bank.slot() { + w_optimistically_confirmed_bank.bank = bank; + subscriptions.notify_gossip_subscribers(frozen_slot); + } + drop(w_optimistically_confirmed_bank); + } + } + BankNotification::Root(bank) => { + let root_slot = bank.slot(); + let mut w_optimistically_confirmed_bank = + optimistically_confirmed_bank.write().unwrap(); + if root_slot > w_optimistically_confirmed_bank.bank.slot() { + w_optimistically_confirmed_bank.bank = bank; + } + drop(w_optimistically_confirmed_bank); + pending_optimistically_confirmed_banks.retain(|&s| s > root_slot); + } + } + } + + pub fn close(self) -> thread::Result<()> { + self.join() + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; + use solana_runtime::commitment::BlockCommitmentCache; + use solana_sdk::pubkey::Pubkey; + + #[test] + fn test_process_notification() { + let exit = Arc::new(AtomicBool::new(false)); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); + let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); + bank_forks.write().unwrap().insert(bank2); + let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); + let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3); + bank_forks.write().unwrap().insert(bank3); + + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + block_commitment_cache, + optimistically_confirmed_bank.clone(), + )); + let mut pending_optimistically_confirmed_banks = HashSet::new(); + + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0); + + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(2), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); + + // Test max optimistically confirmed bank remains in the cache + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(1), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); + + // Test bank will only be cached when frozen + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(3), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); + assert_eq!(pending_optimistically_confirmed_banks.len(), 1); + assert_eq!(pending_optimistically_confirmed_banks.contains(&3), true); + + // Test bank will only be cached when frozen + let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone(); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::Frozen(bank3), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); + + // Test higher root will be cached and clear pending_optimistically_confirmed_banks + let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone(); + let bank4 = Bank::new_from_parent(&bank3, &Pubkey::default(), 4); + bank_forks.write().unwrap().insert(bank4); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(4), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); + assert_eq!(pending_optimistically_confirmed_banks.len(), 1); + assert_eq!(pending_optimistically_confirmed_banks.contains(&4), true); + + let bank4 = bank_forks.read().unwrap().get(4).unwrap().clone(); + let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5); + bank_forks.write().unwrap().insert(bank5); + let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone(); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::Root(bank5), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); + assert_eq!(pending_optimistically_confirmed_banks.len(), 0); + assert_eq!(pending_optimistically_confirmed_banks.contains(&4), false); + + // Banks <= root do not get added to pending list, even if not frozen + let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone(); + let bank6 = Bank::new_from_parent(&bank5, &Pubkey::default(), 6); + bank_forks.write().unwrap().insert(bank6); + let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone(); + let bank7 = Bank::new_from_parent(&bank5, &Pubkey::default(), 7); + bank_forks.write().unwrap().insert(bank7); + bank_forks.write().unwrap().set_root(7, &None, None); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(6), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); + assert_eq!(pending_optimistically_confirmed_banks.len(), 0); + assert_eq!(pending_optimistically_confirmed_banks.contains(&6), false); + } +} diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index cbef8f42a0..de42e5ec12 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -11,6 +11,7 @@ use crate::{ consensus::{ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes}, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, progress_map::{ForkProgress, ProgressMap, PropagatedStats}, pubkey_references::PubkeyReferences, @@ -108,6 +109,7 @@ pub struct ReplayStageConfig { pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, pub cache_block_time_sender: Option, + pub bank_notification_sender: Option, } #[derive(Default)] @@ -239,6 +241,7 @@ impl ReplayStage { transaction_status_sender, rewards_recorder_sender, cache_block_time_sender, + bank_notification_sender, } = config; trace!("replay stage"); @@ -308,8 +311,8 @@ impl ReplayStage { transaction_status_sender.clone(), &verify_recyclers, &mut heaviest_subtree_fork_choice, - &subscriptions, &replay_vote_sender, + &bank_notification_sender, ); replay_active_banks_time.stop(); Self::report_memory(&allocated, "replay_active_banks", start); @@ -462,6 +465,7 @@ impl ReplayStage { &block_commitment_cache, &mut heaviest_subtree_fork_choice, &cache_block_time_sender, + &bank_notification_sender, )?; }; voting_time.stop(); @@ -1032,6 +1036,7 @@ impl ReplayStage { block_commitment_cache: &Arc>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, cache_block_time_sender: &Option, + bank_notification_sender: &Option, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1055,7 +1060,7 @@ impl ReplayStage { .expect("Root bank doesn't exist") .clone(); let mut rooted_banks = root_bank.parents(); - rooted_banks.push(root_bank); + rooted_banks.push(root_bank.clone()); let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect(); // Call leader schedule_cache.set_root() before blockstore.set_root() because // bank_forks.root is consumed by repair_service to update gossip, so we don't want to @@ -1087,6 +1092,11 @@ impl ReplayStage { heaviest_subtree_fork_choice, ); subscriptions.notify_roots(rooted_slots); + if let Some(sender) = bank_notification_sender { + sender + .send(BankNotification::Root(root_bank)) + .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); + } latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { trace!("latest root send failed: {:?}", e); @@ -1253,8 +1263,8 @@ impl ReplayStage { transaction_status_sender: Option, verify_recyclers: &VerifyRecyclers, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, - subscriptions: &Arc, replay_vote_sender: &ReplayVoteSender, + bank_notification_sender: &Option, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1325,7 +1335,11 @@ impl ReplayStage { bank.freeze(); heaviest_subtree_fork_choice .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); - subscriptions.notify_frozen(bank.slot()); + if let Some(sender) = bank_notification_sender { + sender + .send(BankNotification::Frozen(bank.clone())) + .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); + } } else { trace!( "bank {} not completed tick_height: {}, max_tick_height: {}", @@ -1961,6 +1975,7 @@ pub(crate) mod tests { use crate::{ consensus::test::{initialize_state, VoteSimulator}, consensus::Tower, + optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, progress_map::ValidatorStakeInfo, replay_stage::ReplayStage, transaction_status_service::TransactionStatusService, @@ -2073,11 +2088,14 @@ pub(crate) mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); // RpcSubscriptions + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let exit = Arc::new(AtomicBool::new(false)); let rpc_subscriptions = Arc::new(RpcSubscriptions::new( &exit, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), + optimistically_confirmed_bank, )); ReplayBlockstoreComponents { @@ -2568,6 +2586,7 @@ pub(crate) mod tests { &exit, bank_forks.clone(), block_commitment_cache.clone(), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )); let (lockouts_sender, _) = AggregateCommitmentService::new(&exit, block_commitment_cache.clone(), subscriptions); diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 973dc1c166..c83b47a766 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -4,6 +4,7 @@ use crate::{ cluster_info::ClusterInfo, contact_info::ContactInfo, non_circulating_supply::calculate_non_circulating_supply, + optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc_error::RpcCustomError, rpc_health::*, send_transaction_service::{SendTransactionService, TransactionInfo}, @@ -121,6 +122,7 @@ pub struct JsonRpcRequestProcessor { transaction_sender: Arc>>, runtime_handle: runtime::Handle, bigtable_ledger_storage: Option, + optimistically_confirmed_bank: Arc>, } impl Metadata for JsonRpcRequestProcessor {} @@ -134,6 +136,17 @@ impl JsonRpcRequestProcessor { Some(config) => config.commitment, }; + if commitment_level == CommitmentLevel::SingleGossip { + let bank = self + .optimistically_confirmed_bank + .read() + .unwrap() + .bank + .clone(); + debug!("RPC using optimistically confirmed slot: {:?}", bank.slot()); + return bank; + } + let slot = self .block_commitment_cache .read() @@ -147,12 +160,13 @@ impl JsonRpcRequestProcessor { CommitmentLevel::Root => { debug!("RPC using node root: {:?}", slot); } - CommitmentLevel::Single | CommitmentLevel::SingleGossip => { + CommitmentLevel::Single => { debug!("RPC using confirmed slot: {:?}", slot); } CommitmentLevel::Max => { debug!("RPC using block: {:?}", slot); } + CommitmentLevel::SingleGossip => unreachable!(), }; r_bank_forks.get(slot).cloned().unwrap_or_else(|| { @@ -187,6 +201,7 @@ impl JsonRpcRequestProcessor { genesis_hash: Hash, runtime: &runtime::Runtime, bigtable_ledger_storage: Option, + optimistically_confirmed_bank: Arc>, ) -> (Self, Receiver) { let (sender, receiver) = channel(); ( @@ -202,6 +217,7 @@ impl JsonRpcRequestProcessor { transaction_sender: Arc::new(Mutex::new(sender)), runtime_handle: runtime.handle().clone(), bigtable_ledger_storage, + optimistically_confirmed_bank, }, receiver, ) @@ -237,6 +253,9 @@ impl JsonRpcRequestProcessor { transaction_sender: Arc::new(Mutex::new(sender)), runtime_handle: runtime::Runtime::new().unwrap().handle().clone(), bigtable_ledger_storage: None, + optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank { + bank: bank.clone(), + })), } } @@ -2553,8 +2572,13 @@ pub(crate) fn create_validator_exit(exit: &Arc) -> Arc>>, trusted_validators: Option>, override_health_check: Arc, + optimistically_confirmed_bank: Arc>, ) -> Self { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); @@ -311,6 +313,7 @@ impl JsonRpcService { genesis_hash, &runtime, bigtable_ledger_storage, + optimistically_confirmed_bank, ); let leader_info = @@ -436,6 +439,8 @@ mod tests { let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut rpc_service = JsonRpcService::new( rpc_addr, JsonRpcConfig::default(), @@ -450,6 +455,7 @@ mod tests { validator_exit, None, Arc::new(AtomicBool::new(false)), + optimistically_confirmed_bank, ); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index d2785bbcf4..d8896d51fa 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,6 +1,9 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc::{get_parsed_token_account, get_parsed_token_accounts}; +use crate::{ + optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, + rpc::{get_parsed_token_account, get_parsed_token_accounts}, +}; use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{ @@ -61,7 +64,6 @@ enum NotificationEntry { Slot(SlotInfo), Vote(Vote), Root(Slot), - Frozen(Slot), Bank(CommitmentSlots), Gossip(Slot), SignaturesReceived((Slot, Vec)), @@ -71,7 +73,6 @@ impl std::fmt::Debug for NotificationEntry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { NotificationEntry::Root(root) => write!(f, "Root({})", root), - NotificationEntry::Frozen(slot) => write!(f, "Frozen({})", slot), NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Bank(commitment_slots) => { @@ -346,7 +347,7 @@ pub struct RpcSubscriptions { notifier_runtime: Option, bank_forks: Arc>, block_commitment_cache: Arc>, - last_checked_slots: Arc>>, + optimistically_confirmed_bank: Arc>, exit: Arc, } @@ -363,6 +364,7 @@ impl RpcSubscriptions { exit: &Arc, bank_forks: Arc>, block_commitment_cache: Arc>, + optimistically_confirmed_bank: Arc>, ) -> Self { let (notification_sender, notification_receiver): ( Sender, @@ -396,9 +398,6 @@ impl RpcSubscriptions { }; let _subscriptions = subscriptions.clone(); - let last_checked_slots = Arc::new(RwLock::new(HashMap::new())); - let _last_checked_slots = last_checked_slots.clone(); - let notifier_runtime = RuntimeBuilder::new() .core_threads(1) .name_prefix("solana-rpc-notifier-") @@ -415,7 +414,6 @@ impl RpcSubscriptions { notification_receiver, _subscriptions, _bank_forks, - _last_checked_slots, ); }) .unwrap(); @@ -427,16 +425,19 @@ impl RpcSubscriptions { t_cleanup: Some(t_cleanup), bank_forks, block_commitment_cache, - last_checked_slots, + optimistically_confirmed_bank, exit: exit.clone(), } } pub fn default_with_bank_forks(bank_forks: Arc>) -> Self { + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); Self::new( &Arc::new(AtomicBool::new(false)), bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), + optimistically_confirmed_bank, ) } @@ -529,12 +530,12 @@ impl RpcSubscriptions { .read() .unwrap() .highest_confirmed_slot(), - CommitmentLevel::SingleGossip => *self - .last_checked_slots + CommitmentLevel::SingleGossip => self + .optimistically_confirmed_bank .read() .unwrap() - .get(&CommitmentLevel::SingleGossip) - .unwrap_or(&0), + .bank + .slot(), }; let last_notified_slot = if let Some((_account, slot)) = self .bank_forks @@ -724,10 +725,6 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Vote(vote.clone())); } - pub fn notify_frozen(&self, frozen_slot: Slot) { - self.enqueue_notification(NotificationEntry::Frozen(frozen_slot)); - } - pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); @@ -769,9 +766,7 @@ impl RpcSubscriptions { notification_receiver: Receiver, subscriptions: Subscriptions, bank_forks: Arc>, - last_checked_slots: Arc>>, ) { - let mut pending_gossip_notifications = HashSet::new(); loop { if exit.load(Ordering::Relaxed) { break; @@ -804,18 +799,10 @@ impl RpcSubscriptions { } NotificationEntry::Root(root) => { debug!("root notify: {:?}", root); - { - let subscriptions = subscriptions.root_subscriptions.read().unwrap(); - for (_, sink) in subscriptions.iter() { - notifier.notify(root, sink); - } + let subscriptions = subscriptions.root_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + notifier.notify(root, sink); } - - // Prune old pending notifications - pending_gossip_notifications = pending_gossip_notifications - .into_iter() - .filter(|&s| s > root) - .collect(); } NotificationEntry::Bank(commitment_slots) => { RpcSubscriptions::notify_accounts_programs_signatures( @@ -828,36 +815,13 @@ impl RpcSubscriptions { "bank", ) } - NotificationEntry::Frozen(slot) => { - if pending_gossip_notifications.remove(&slot) { - Self::process_gossip_notification( - slot, - ¬ifier, - &subscriptions, - &bank_forks, - &last_checked_slots, - ); - } - } NotificationEntry::Gossip(slot) => { - let bank_frozen = bank_forks - .read() - .unwrap() - .get(slot) - .filter(|b| b.is_frozen()) - .is_some(); - - if !bank_frozen { - pending_gossip_notifications.insert(slot); - } else { - Self::process_gossip_notification( - slot, - ¬ifier, - &subscriptions, - &bank_forks, - &last_checked_slots, - ); - } + Self::process_gossip_notification( + slot, + ¬ifier, + &subscriptions, + &bank_forks, + ); } NotificationEntry::SignaturesReceived(slot_signatures) => { RpcSubscriptions::process_signatures_received( @@ -883,23 +847,7 @@ impl RpcSubscriptions { notifier: &RpcNotifier, subscriptions: &Subscriptions, bank_forks: &Arc>, - last_checked_slots: &Arc>>, ) { - let mut last_checked_slots_lock = last_checked_slots.write().unwrap(); - let last_checked_slot = last_checked_slots_lock - .get(&CommitmentLevel::SingleGossip) - .cloned() - .unwrap_or_default(); - - if slot > last_checked_slot { - last_checked_slots_lock.insert(CommitmentLevel::SingleGossip, slot); - } else { - // Avoid sending stale or duplicate notifications - return; - } - - drop(last_checked_slots_lock); - let commitment_slots = CommitmentSlots { highest_confirmed_slot: slot, ..CommitmentSlots::default() @@ -1053,6 +1001,9 @@ impl RpcSubscriptions { #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::optimistically_confirmed_bank_tracker::{ + BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, + }; use jsonrpc_core::futures::{self, stream::Stream}; use jsonrpc_pubsub::typed::Subscriber; use serial_test_derive::serial; @@ -1118,6 +1069,7 @@ pub(crate) mod tests { Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, ))), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), ); subscriptions.add_account_subscription( alice.pubkey(), @@ -1216,10 +1168,13 @@ pub(crate) mod tests { Subscriber::new_test("programNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + optimistically_confirmed_bank, ); subscriptions.add_program_subscription( solana_stake_program::id(), @@ -1324,10 +1279,13 @@ pub(crate) mod tests { ); let exit = Arc::new(AtomicBool::new(false)); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(block_commitment_cache)), + optimistically_confirmed_bank, ); let (past_bank_sub1, _id_receiver, past_bank_recv1) = @@ -1480,10 +1438,13 @@ pub(crate) mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + optimistically_confirmed_bank, ); subscriptions.add_slot_subscription(sub_id.clone(), subscriber); @@ -1528,10 +1489,13 @@ pub(crate) mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + optimistically_confirmed_bank, ); subscriptions.add_root_subscription(sub_id.clone(), subscriber); @@ -1630,18 +1594,23 @@ pub(crate) mod tests { bank_forks.write().unwrap().insert(bank2); let alice = Keypair::new(); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let mut pending_optimistically_confirmed_banks = HashSet::new(); + let (subscriber0, _id_receiver, transport_receiver0) = Subscriber::new_test("accountNotification"); let (subscriber1, _id_receiver, transport_receiver1) = Subscriber::new_test("accountNotification"); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new( + let subscriptions = Arc::new(RpcSubscriptions::new( &exit, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, ))), - ); + optimistically_confirmed_bank.clone(), + )); let sub_id0 = SubscriptionId::Number(0 as u64); subscriptions.add_account_subscription( alice.pubkey(), @@ -1685,10 +1654,22 @@ pub(crate) mod tests { .unwrap(); // First, notify the unfrozen bank first to queue pending notification - subscriptions.notify_gossip_subscribers(2); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(2), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); // Now, notify the frozen bank and ensure its notifications are processed - subscriptions.notify_gossip_subscribers(1); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(1), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); let (response, _) = robust_poll_or_panic(transport_receiver0); let expected = json!({ @@ -1723,7 +1704,14 @@ pub(crate) mod tests { subscriber1, ); - subscriptions.notify_frozen(2); + let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::Frozen(bank2), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + ); let (response, _) = robust_poll_or_panic(transport_receiver1); let expected = json!({ "jsonrpc": "2.0", diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 60c535367b..253748f58e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -7,6 +7,7 @@ use crate::{ cluster_info::ClusterInfo, cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker}, fetch_stage::FetchStage, + optimistically_confirmed_bank_tracker::BankNotificationSender, poh_recorder::{PohRecorder, WorkingBankEntry}, rpc_subscriptions::RpcSubscriptions, sigverify::TransactionSigVerifier, @@ -57,6 +58,7 @@ impl Tpu { verified_vote_sender: VerifiedVoteSender, replay_vote_receiver: ReplayVoteReceiver, replay_vote_sender: ReplayVoteSender, + bank_notification_sender: Option, ) -> Self { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -85,6 +87,7 @@ impl Tpu { verified_vote_sender, replay_vote_receiver, blockstore.clone(), + bank_notification_sender, ); let banking_stage = BankingStage::new( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index a0d7674dd6..97e1413c08 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -11,6 +11,7 @@ use crate::{ completed_data_sets_service::CompletedDataSetsSender, consensus::Tower, ledger_cleanup_service::LedgerCleanupService, + optimistically_confirmed_bank_tracker::BankNotificationSender, poh_recorder::PohRecorder, replay_stage::{ReplayStage, ReplayStageConfig}, retransmit_stage::RetransmitStage, @@ -108,6 +109,7 @@ impl Tvu { verified_vote_receiver: VerifiedVoteReceiver, replay_vote_sender: ReplayVoteSender, completed_data_sets_sender: CompletedDataSetsSender, + bank_notification_sender: Option, tvu_config: TvuConfig, ) -> Self { let keypair: Arc = cluster_info.keypair.clone(); @@ -219,6 +221,7 @@ impl Tvu { transaction_status_sender, rewards_recorder_sender, cache_block_time_sender, + bank_notification_sender, }; let replay_stage = ReplayStage::new( @@ -279,6 +282,7 @@ pub mod tests { use crate::{ banking_stage::create_test_recorder, cluster_info::{ClusterInfo, Node}, + optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, }; use serial_test_derive::serial; use solana_ledger::{ @@ -348,6 +352,7 @@ pub mod tests { &exit, bank_forks.clone(), block_commitment_cache.clone(), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), &poh_recorder, tower, @@ -365,6 +370,7 @@ pub mod tests { verified_vote_receiver, replay_vote_sender, completed_data_sets_sender, + None, TvuConfig::default(), ); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index 59ace24e3d..5427193a61 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -10,6 +10,9 @@ use crate::{ consensus::{reconcile_blockstore_roots_with_tower, Tower, TowerError}, contact_info::ContactInfo, gossip_service::GossipService, + optimistically_confirmed_bank_tracker::{ + OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, + }, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_service::PohService, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, @@ -166,10 +169,17 @@ struct TransactionHistoryServices { cache_block_time_service: Option, } +struct RpcServices { + json_rpc_service: JsonRpcService, + pubsub_service: PubSubService, + rpc_banks_service: RpcBanksService, + optimistically_confirmed_bank_tracker: OptimisticallyConfirmedBankTracker, +} + pub struct Validator { pub id: Pubkey, validator_exit: Arc>>, - rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, + rpc_service: Option, transaction_status_service: Option, rewards_recorder_service: Option, cache_block_time_service: Option, @@ -329,10 +339,14 @@ impl Validator { block_commitment_cache.initialize_slots(bank.slot()); let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache)); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let subscriptions = Arc::new(RpcSubscriptions::new( &exit, bank_forks.clone(), block_commitment_cache.clone(), + optimistically_confirmed_bank.clone(), )); let (completed_data_sets_sender, completed_data_sets_receiver) = @@ -391,9 +405,8 @@ impl Validator { let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - let rpc_service = config - .rpc_addrs - .map(|(rpc_addr, rpc_pubsub_addr, rpc_banks_addr)| { + let (rpc_service, bank_notification_sender) = + if let Some((rpc_addr, rpc_pubsub_addr, rpc_banks_addr)) = config.rpc_addrs { if ContactInfo::is_valid_address(&node.info.rpc) { assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); assert_eq!(rpc_addr.port(), node.info.rpc.port()); @@ -403,32 +416,47 @@ impl Validator { assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); } let tpu_address = cluster_info.my_contact_info().tpu; + let (bank_notification_sender, bank_notification_receiver) = unbounded(); ( - JsonRpcService::new( - rpc_addr, - config.rpc_config.clone(), - config.snapshot_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - Some(poh_recorder.clone()), - genesis_config.hash(), - ledger_path, - validator_exit.clone(), - config.trusted_validators.clone(), - rpc_override_health_check.clone(), - ), - PubSubService::new(&subscriptions, rpc_pubsub_addr, &exit), - RpcBanksService::new( - rpc_banks_addr, - tpu_address, - &bank_forks, - &block_commitment_cache, - &exit, - ), + Some(RpcServices { + json_rpc_service: JsonRpcService::new( + rpc_addr, + config.rpc_config.clone(), + config.snapshot_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + Some(poh_recorder.clone()), + genesis_config.hash(), + ledger_path, + validator_exit.clone(), + config.trusted_validators.clone(), + rpc_override_health_check.clone(), + optimistically_confirmed_bank.clone(), + ), + pubsub_service: PubSubService::new(&subscriptions, rpc_pubsub_addr, &exit), + rpc_banks_service: RpcBanksService::new( + rpc_banks_addr, + tpu_address, + &bank_forks, + &block_commitment_cache, + &exit, + ), + optimistically_confirmed_bank_tracker: + OptimisticallyConfirmedBankTracker::new( + bank_notification_receiver, + &exit, + bank_forks.clone(), + optimistically_confirmed_bank, + subscriptions.clone(), + ), + }), + Some(bank_notification_sender), ) - }); + } else { + (None, None) + }; let ip_echo_server = solana_net_utils::ip_echo_server(node.sockets.ip_echo.unwrap()); @@ -532,6 +560,7 @@ impl Validator { verified_vote_receiver, replay_vote_sender.clone(), completed_data_sets_sender, + bank_notification_sender.clone(), TvuConfig { max_ledger_shreds: config.max_ledger_shreds, halt_on_trusted_validators_accounts_hash_mismatch: config @@ -562,6 +591,7 @@ impl Validator { verified_vote_sender, replay_vote_receiver, replay_vote_sender, + bank_notification_sender, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); @@ -625,10 +655,17 @@ impl Validator { pub fn join(self) -> Result<()> { self.poh_service.join()?; drop(self.poh_recorder); - if let Some((rpc_service, rpc_pubsub_service, rpc_banks_service)) = self.rpc_service { - rpc_service.join()?; - rpc_pubsub_service.join()?; + if let Some(RpcServices { + json_rpc_service, + pubsub_service, + rpc_banks_service, + optimistically_confirmed_bank_tracker, + }) = self.rpc_service + { + json_rpc_service.join()?; + pubsub_service.join()?; rpc_banks_service.join()?; + optimistically_confirmed_bank_tracker.join()?; } if let Some(transaction_status_service) = self.transaction_status_service { transaction_status_service.join()?; diff --git a/core/tests/client.rs b/core/tests/client.rs index bebe1a7773..77b10ef807 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -1,5 +1,6 @@ use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo}; use solana_core::{ + optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, test_validator::TestValidator, }; @@ -91,10 +92,13 @@ fn test_slot_subscription() { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), + optimistically_confirmed_bank, )); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); std::thread::sleep(Duration::from_millis(400)); diff --git a/runtime/src/commitment.rs b/runtime/src/commitment.rs index c6b41b6278..b298db4b9c 100644 --- a/runtime/src/commitment.rs +++ b/runtime/src/commitment.rs @@ -104,8 +104,8 @@ impl BlockCommitmentCache { } pub fn highest_gossip_confirmed_slot(&self) -> Slot { - // TODO: see solana_core::RpcSubscriptions: - //self.last_checked_slots.get(&CommitmentLevel::SingleGossip).unwrap_or(&0) + // TODO: combine bank caches + // Currently, this information is provided by OptimisticallyConfirmedBank::bank.slot() self.highest_confirmed_slot() }