diff --git a/core/src/commitment.rs b/core/src/commitment.rs index e548547ffa..5e18dbe5a7 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -1,7 +1,7 @@ -use crate::consensus::VOTE_THRESHOLD_SIZE; +use crate::{consensus::VOTE_THRESHOLD_SIZE, rpc_subscriptions::RpcSubscriptions}; use solana_ledger::blockstore::Blockstore; use solana_measure::measure::Measure; -use solana_metrics::inc_new_counter_info; +use solana_metrics::datapoint_info; use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; @@ -14,6 +14,14 @@ use std::{ time::Duration, }; +#[derive(Default)] +pub struct CacheSlotInfo { + pub current_slot: Slot, + pub node_root: Slot, + pub largest_confirmed_root: Slot, + pub highest_confirmed_slot: Slot, +} + pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1]; #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] @@ -53,6 +61,7 @@ pub struct BlockCommitmentCache { bank: Arc, blockstore: Arc, root: Slot, + highest_confirmed_slot: Slot, } impl std::fmt::Debug for BlockCommitmentCache { @@ -77,6 +86,7 @@ impl BlockCommitmentCache { bank: Arc, blockstore: Arc, root: Slot, + highest_confirmed_slot: Slot, ) -> Self { Self { block_commitment, @@ -85,6 +95,7 @@ impl BlockCommitmentCache { bank, blockstore, root, + highest_confirmed_slot, } } @@ -96,6 +107,7 @@ impl BlockCommitmentCache { bank: Arc::new(Bank::default()), blockstore, root: Slot::default(), + highest_confirmed_slot: Slot::default(), } } @@ -123,6 +135,26 @@ impl BlockCommitmentCache { self.root } + pub fn highest_confirmed_slot(&self) -> Slot { + self.highest_confirmed_slot + } + + fn highest_slot_with_confirmation_count(&self, confirmation_count: usize) -> Slot { + assert!(confirmation_count > 0 && confirmation_count <= MAX_LOCKOUT_HISTORY); + for slot in (self.root()..self.slot()).rev() { + if let Some(count) = self.get_confirmation_count(slot) { + if count >= confirmation_count { + return slot; + } + } + } + self.root + } + + fn calculate_highest_confirmed_slot(&self) -> Slot { + self.highest_slot_with_confirmation_count(1) + } + pub fn get_confirmation_count(&self, slot: Slot) -> Option { self.get_lockout_count(slot, VOTE_THRESHOLD_SIZE) } @@ -159,6 +191,7 @@ impl BlockCommitmentCache { largest_confirmed_root: Slot::default(), bank: Arc::new(Bank::default()), root: Slot::default(), + highest_confirmed_slot: Slot::default(), } } @@ -177,10 +210,11 @@ impl BlockCommitmentCache { largest_confirmed_root: root, bank, root, + highest_confirmed_slot: root, } } - pub(crate) fn set_get_largest_confirmed_root(&mut self, root: Slot) { + pub(crate) fn set_largest_confirmed_root(&mut self, root: Slot) { self.largest_confirmed_root = root; } } @@ -221,6 +255,7 @@ impl AggregateCommitmentService { pub fn new( exit: &Arc, block_commitment_cache: Arc>, + subscriptions: Arc, ) -> (Sender, Self) { let (sender, receiver): ( Sender, @@ -238,7 +273,7 @@ impl AggregateCommitmentService { } if let Err(RecvTimeoutError::Disconnected) = - Self::run(&receiver, &block_commitment_cache, &exit_) + Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit_) { break; } @@ -251,6 +286,7 @@ impl AggregateCommitmentService { fn run( receiver: &Receiver, block_commitment_cache: &RwLock, + subscriptions: &Arc, exit: &Arc, ) -> Result<(), RecvTimeoutError> { loop { @@ -283,16 +319,30 @@ impl AggregateCommitmentService { aggregation_data.bank, block_commitment_cache.read().unwrap().blockstore.clone(), aggregation_data.root, + aggregation_data.root, ); + new_block_commitment.highest_confirmed_slot = + new_block_commitment.calculate_highest_confirmed_slot(); let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); aggregate_commitment_time.stop(); - inc_new_counter_info!( - "aggregate-commitment-ms", - aggregate_commitment_time.as_ms() as usize + datapoint_info!( + "block-commitment-cache", + ( + "aggregate-commitment-ms", + aggregate_commitment_time.as_ms() as i64, + i64 + ) ); + + subscriptions.notify_subscribers(CacheSlotInfo { + current_slot: w_block_commitment_cache.slot(), + node_root: w_block_commitment_cache.root(), + largest_confirmed_root: w_block_commitment_cache.largest_confirmed_root(), + highest_confirmed_slot: w_block_commitment_cache.highest_confirmed_slot(), + }); } } @@ -382,7 +432,7 @@ mod tests { genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, }; - use solana_sdk::pubkey::Pubkey; + use solana_sdk::{genesis_config::GenesisConfig, pubkey::Pubkey}; use solana_stake_program::stake_state; use solana_vote_program::vote_state::{self, VoteStateVersions}; @@ -419,7 +469,7 @@ mod tests { block_commitment.entry(1).or_insert(cache1); block_commitment.entry(2).or_insert(cache2); let block_commitment_cache = - BlockCommitmentCache::new(block_commitment, 0, 50, bank, blockstore, 0); + BlockCommitmentCache::new(block_commitment, 0, 50, bank, blockstore, 0, 0); assert_eq!(block_commitment_cache.get_confirmation_count(0), Some(2)); assert_eq!(block_commitment_cache.get_confirmation_count(1), Some(1)); @@ -453,6 +503,7 @@ mod tests { bank, blockstore, 0, + 0, ); assert!(block_commitment_cache.is_confirmed_rooted(0)); @@ -476,6 +527,114 @@ mod tests { assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 1); } + #[test] + fn test_highest_confirmed_slot() { + let bank = Arc::new(Bank::new(&GenesisConfig::default())); + let bank_slot_5 = Arc::new(Bank::new_from_parent(&bank, &Pubkey::default(), 5)); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let total_stake = 50; + + // Build cache with confirmation_count 2 given total_stake + let mut cache0 = BlockCommitment::default(); + cache0.increase_confirmation_stake(1, 5); + cache0.increase_confirmation_stake(2, 40); + + // Build cache with confirmation_count 1 given total_stake + let mut cache1 = BlockCommitment::default(); + cache1.increase_confirmation_stake(1, 40); + cache1.increase_confirmation_stake(2, 5); + + // Build cache with confirmation_count 0 given total_stake + let mut cache2 = BlockCommitment::default(); + cache2.increase_confirmation_stake(1, 20); + cache2.increase_confirmation_stake(2, 5); + + let mut block_commitment = HashMap::new(); + block_commitment.entry(1).or_insert(cache0.clone()); // Slot 1, conf 2 + block_commitment.entry(2).or_insert(cache1.clone()); // Slot 2, conf 1 + block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0 + let block_commitment_cache = BlockCommitmentCache::new( + block_commitment, + 0, + total_stake, + bank_slot_5.clone(), + blockstore.clone(), + 0, + 0, + ); + + assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 2); + + // Build map with multiple slots at conf 1 + let mut block_commitment = HashMap::new(); + block_commitment.entry(1).or_insert(cache1.clone()); // Slot 1, conf 1 + block_commitment.entry(2).or_insert(cache1.clone()); // Slot 2, conf 1 + block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0 + let block_commitment_cache = BlockCommitmentCache::new( + block_commitment, + 0, + total_stake, + bank_slot_5.clone(), + blockstore.clone(), + 0, + 0, + ); + + assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 2); + + // Build map with slot gaps + let mut block_commitment = HashMap::new(); + block_commitment.entry(1).or_insert(cache1.clone()); // Slot 1, conf 1 + block_commitment.entry(3).or_insert(cache1.clone()); // Slot 3, conf 1 + block_commitment.entry(5).or_insert(cache2.clone()); // Slot 5, conf 0 + let block_commitment_cache = BlockCommitmentCache::new( + block_commitment, + 0, + total_stake, + bank_slot_5.clone(), + blockstore.clone(), + 0, + 0, + ); + + assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 3); + + // Build map with no conf 1 slots, but one higher + let mut block_commitment = HashMap::new(); + block_commitment.entry(1).or_insert(cache0.clone()); // Slot 1, conf 2 + block_commitment.entry(2).or_insert(cache2.clone()); // Slot 2, conf 0 + block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0 + let block_commitment_cache = BlockCommitmentCache::new( + block_commitment, + 0, + total_stake, + bank_slot_5.clone(), + blockstore.clone(), + 0, + 0, + ); + + assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 1); + + // Build map with no conf 1 or higher slots + let mut block_commitment = HashMap::new(); + block_commitment.entry(1).or_insert(cache2.clone()); // Slot 1, conf 0 + block_commitment.entry(2).or_insert(cache2.clone()); // Slot 2, conf 0 + block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0 + let block_commitment_cache = BlockCommitmentCache::new( + block_commitment, + 0, + total_stake, + bank_slot_5.clone(), + blockstore.clone(), + 0, + 0, + ); + + assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 0); + } + #[test] fn test_aggregate_commitment_for_vote_account_1() { let ancestors = vec![3, 4, 5, 7, 9, 11]; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 58172d78c7..97a7614f5c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -176,8 +176,11 @@ impl ReplayStage { let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap()); // Start the replay stage loop - let (lockouts_sender, commitment_service) = - AggregateCommitmentService::new(&exit, block_commitment_cache.clone()); + let (lockouts_sender, commitment_service) = AggregateCommitmentService::new( + &exit, + block_commitment_cache.clone(), + subscriptions.clone(), + ); #[allow(clippy::cognitive_complexity)] let t_replay = Builder::new() @@ -353,8 +356,6 @@ impl ReplayStage { // Vote on a fork if let Some(ref vote_bank) = vote_bank { - subscriptions - .notify_subscribers(block_commitment_cache.read().unwrap().slot()); if let Some(votable_leader) = leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank)) { @@ -2595,13 +2596,6 @@ pub(crate) mod tests { let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let block_commitment_cache = Arc::new(RwLock::new( - BlockCommitmentCache::default_with_blockstore(blockstore), - )); - let (lockouts_sender, _) = AggregateCommitmentService::new( - &Arc::new(AtomicBool::new(false)), - block_commitment_cache.clone(), - ); let leader_pubkey = Pubkey::new_rand(); let leader_lamports = 3; @@ -2622,6 +2616,18 @@ pub(crate) mod tests { 0, ))); + let exit = Arc::new(AtomicBool::new(false)); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + block_commitment_cache.clone(), + )); + let (lockouts_sender, _) = + AggregateCommitmentService::new(&exit, block_commitment_cache.clone(), subscriptions); + assert!(block_commitment_cache .read() .unwrap() diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 6d7cd24870..2ab4d9c6e4 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -79,28 +79,42 @@ impl JsonRpcRequestProcessor { fn bank(&self, commitment: Option) -> Result> { debug!("RPC commitment_config: {:?}", commitment); let r_bank_forks = self.bank_forks.read().unwrap(); - if commitment.is_some() && commitment.unwrap().commitment == CommitmentLevel::Recent { - let bank = r_bank_forks.working_bank(); - debug!("RPC using working_bank: {:?}", bank.slot()); - Ok(bank) - } else if commitment.is_some() && commitment.unwrap().commitment == CommitmentLevel::Root { - let slot = r_bank_forks.root(); - debug!("RPC using node root: {:?}", slot); - Ok(r_bank_forks.get(slot).cloned().unwrap()) - } else { - let cluster_root = self - .block_commitment_cache - .read() - .unwrap() - .largest_confirmed_root(); - debug!("RPC using block: {:?}", cluster_root); - r_bank_forks.get(cluster_root).cloned().ok_or_else(|| { - RpcCustomError::NonexistentClusterRoot { - cluster_root, - node_root: r_bank_forks.root(), - } - .into() - }) + + match commitment { + Some(commitment_config) if commitment_config.commitment == CommitmentLevel::Recent => { + let bank = r_bank_forks.working_bank(); + debug!("RPC using working_bank: {:?}", bank.slot()); + Ok(bank) + } + Some(commitment_config) if commitment_config.commitment == CommitmentLevel::Root => { + let slot = r_bank_forks.root(); + debug!("RPC using node root: {:?}", slot); + Ok(r_bank_forks.get(slot).cloned().unwrap()) + } + Some(commitment_config) if commitment_config.commitment == CommitmentLevel::Single => { + let slot = self + .block_commitment_cache + .read() + .unwrap() + .highest_confirmed_slot(); + debug!("RPC using confirmed slot: {:?}", slot); + Ok(r_bank_forks.get(slot).cloned().unwrap()) + } + _ => { + let cluster_root = self + .block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(); + debug!("RPC using block: {:?}", cluster_root); + r_bank_forks.get(cluster_root).cloned().ok_or_else(|| { + RpcCustomError::NonexistentClusterRoot { + cluster_root, + node_root: r_bank_forks.root(), + } + .into() + }) + } } } @@ -1577,6 +1591,7 @@ pub mod tests { bank.clone(), blockstore.clone(), 0, + 0, ))); // Add timestamp vote to blockstore @@ -2627,6 +2642,7 @@ pub mod tests { bank_forks.read().unwrap().working_bank(), blockstore.clone(), 0, + 0, ))); let mut config = JsonRpcConfig::default(); @@ -2822,7 +2838,7 @@ pub mod tests { block_commitment_cache .write() .unwrap() - .set_get_largest_confirmed_root(8); + .set_largest_confirmed_root(8); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getConfirmedBlocks","params":[0]}"#; let res = io.handle_request_sync(&req, meta.clone()); @@ -2878,7 +2894,7 @@ pub mod tests { block_commitment_cache .write() .unwrap() - .set_get_largest_confirmed_root(7); + .set_largest_confirmed_root(7); let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year()); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 708836d9cd..7aa30759ba 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -355,7 +355,7 @@ mod tests { use super::*; use crate::{ cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, - commitment::{BlockCommitment, BlockCommitmentCache}, + commitment::{BlockCommitmentCache, CacheSlotInfo}, rpc_subscriptions::tests::robust_poll_or_panic, }; use crossbeam_channel::unbounded; @@ -381,7 +381,6 @@ mod tests { }; use solana_vote_program::vote_transaction; use std::{ - collections::HashMap, sync::{atomic::AtomicBool, RwLock}, thread::sleep, time::Duration, @@ -391,15 +390,17 @@ mod tests { bank_forks: &Arc>, tx: &Transaction, subscriptions: &RpcSubscriptions, - slot: Slot, + current_slot: Slot, ) -> transaction::Result<()> { bank_forks .write() .unwrap() - .get(slot) + .get(current_slot) .unwrap() .process_transaction(tx)?; - subscriptions.notify_subscribers(slot); + let mut cache_slot_info = CacheSlotInfo::default(); + cache_slot_info.current_slot = current_slot; + subscriptions.notify_subscribers(cache_slot_info); Ok(()) } @@ -714,7 +715,8 @@ mod tests { .unwrap() .process_transaction(&tx) .unwrap(); - rpc.subscriptions.notify_subscribers(0); + rpc.subscriptions + .notify_subscribers(CacheSlotInfo::default()); // allow 200ms for notification thread to wake std::thread::sleep(Duration::from_millis(200)); let _panic = robust_poll_or_panic(receiver); @@ -766,23 +768,17 @@ mod tests { .unwrap() .process_transaction(&tx) .unwrap(); - rpc.subscriptions.notify_subscribers(1); + let mut cache_slot_info = CacheSlotInfo::default(); + cache_slot_info.current_slot = 1; + rpc.subscriptions.notify_subscribers(cache_slot_info); - let bank1 = bank_forks.read().unwrap()[1].clone(); - let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); - bank_forks.write().unwrap().insert(bank2); - bank_forks.write().unwrap().set_root(1, &None, None); - let bank2 = bank_forks.read().unwrap()[2].clone(); - - let mut block_commitment: HashMap = HashMap::new(); - block_commitment.insert(0, BlockCommitment::default()); - let mut new_block_commitment = - BlockCommitmentCache::new(block_commitment, 1, 10, bank2, blockstore, 1); - let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); - std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); - drop(w_block_commitment_cache); - - rpc.subscriptions.notify_subscribers(2); + let cache_slot_info = CacheSlotInfo { + current_slot: 2, + node_root: 1, + largest_confirmed_root: 1, + highest_confirmed_slot: 1, + }; + rpc.subscriptions.notify_subscribers(cache_slot_info); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 2b4157c71e..1910a617ec 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::commitment::BlockCommitmentCache; +use crate::commitment::{BlockCommitmentCache, CacheSlotInfo}; use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{ @@ -56,7 +56,7 @@ enum NotificationEntry { Slot(SlotInfo), Vote(Vote), Root(Slot), - Bank(Slot), + Bank(CacheSlotInfo), } impl std::fmt::Debug for NotificationEntry { @@ -65,9 +65,11 @@ impl std::fmt::Debug for NotificationEntry { NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), - NotificationEntry::Bank(current_slot) => { - write!(f, "Bank({{current_slot: {:?}}})", current_slot) - } + NotificationEntry::Bank(cache_slot_info) => write!( + f, + "Bank({{current_slot: {:?}}})", + cache_slot_info.current_slot + ), } } } @@ -142,7 +144,7 @@ fn check_commitment_and_notify( subscriptions: &HashMap>>>, hashmap_key: &K, bank_forks: &Arc>, - block_commitment_cache: &Arc>, + cache_slot_info: &CacheSlotInfo, bank_method: B, filter_results: F, notifier: &RpcNotifier, @@ -154,12 +156,6 @@ where F: Fn(X, Slot) -> (Box>, Slot), X: Clone + Serialize + Default, { - let r_block_commitment_cache = block_commitment_cache.read().unwrap(); - let current_slot = r_block_commitment_cache.slot(); - let node_root = r_block_commitment_cache.root(); - let largest_confirmed_root = r_block_commitment_cache.largest_confirmed_root(); - drop(r_block_commitment_cache); - let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { for ( @@ -172,9 +168,10 @@ where ) in hashmap.iter() { let slot = match commitment.commitment { - CommitmentLevel::Max => largest_confirmed_root, - CommitmentLevel::Recent => current_slot, - CommitmentLevel::Root => node_root, + CommitmentLevel::Max => cache_slot_info.largest_confirmed_root, + CommitmentLevel::Recent => cache_slot_info.current_slot, + CommitmentLevel::Root => cache_slot_info.node_root, + CommitmentLevel::Single => cache_slot_info.highest_confirmed_slot, }; let results = { let bank_forks = bank_forks.read().unwrap(); @@ -332,7 +329,6 @@ impl RpcSubscriptions { notification_receiver, _subscriptions, _bank_forks, - _block_commitment_cache, ); }) .unwrap(); @@ -364,16 +360,16 @@ impl RpcSubscriptions { fn check_account( pubkey: &Pubkey, bank_forks: &Arc>, - block_commitment_cache: &Arc>, account_subscriptions: Arc, notifier: &RpcNotifier, + cache_slot_info: &CacheSlotInfo, ) { let subscriptions = account_subscriptions.read().unwrap(); check_commitment_and_notify( &subscriptions, pubkey, bank_forks, - block_commitment_cache, + cache_slot_info, Bank::get_account_modified_slot, filter_account_result, notifier, @@ -383,16 +379,16 @@ impl RpcSubscriptions { fn check_program( program_id: &Pubkey, bank_forks: &Arc>, - block_commitment_cache: &Arc>, program_subscriptions: Arc, notifier: &RpcNotifier, + cache_slot_info: &CacheSlotInfo, ) { let subscriptions = program_subscriptions.read().unwrap(); check_commitment_and_notify( &subscriptions, program_id, bank_forks, - block_commitment_cache, + cache_slot_info, Bank::get_program_accounts_modified_since_parent, filter_program_results, notifier, @@ -402,16 +398,16 @@ impl RpcSubscriptions { fn check_signature( signature: &Signature, bank_forks: &Arc>, - block_commitment_cache: &Arc>, signature_subscriptions: Arc, notifier: &RpcNotifier, + cache_slot_info: &CacheSlotInfo, ) { let mut subscriptions = signature_subscriptions.write().unwrap(); let notified_ids = check_commitment_and_notify( &subscriptions, signature, bank_forks, - block_commitment_cache, + cache_slot_info, Bank::get_signature_status_processed_since_parent, filter_signature_result, notifier, @@ -443,6 +439,11 @@ impl RpcSubscriptions { .largest_confirmed_root(), CommitmentLevel::Recent => self.block_commitment_cache.read().unwrap().slot(), CommitmentLevel::Root => self.block_commitment_cache.read().unwrap().root(), + CommitmentLevel::Single => self + .block_commitment_cache + .read() + .unwrap() + .highest_confirmed_slot(), }; let last_notified_slot = if let Some((_account, slot)) = self .bank_forks @@ -518,8 +519,8 @@ impl RpcSubscriptions { /// Notify subscribers of changes to any accounts or new signatures since /// the bank's last checkpoint. - pub fn notify_subscribers(&self, current_slot: Slot) { - self.enqueue_notification(NotificationEntry::Bank(current_slot)); + pub fn notify_subscribers(&self, cache_slot_info: CacheSlotInfo) { + self.enqueue_notification(NotificationEntry::Bank(cache_slot_info)); } pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { @@ -593,7 +594,6 @@ impl RpcSubscriptions { notification_receiver: Receiver, subscriptions: Subscriptions, bank_forks: Arc>, - block_commitment_cache: Arc>, ) { loop { if exit.load(Ordering::Relaxed) { @@ -626,7 +626,7 @@ impl RpcSubscriptions { notifier.notify(root, sink); } } - NotificationEntry::Bank(_current_slot) => { + NotificationEntry::Bank(cache_slot_info) => { let pubkeys: Vec<_> = { let subs = subscriptions.account_subscriptions.read().unwrap(); subs.keys().cloned().collect() @@ -635,9 +635,9 @@ impl RpcSubscriptions { Self::check_account( pubkey, &bank_forks, - &block_commitment_cache, subscriptions.account_subscriptions.clone(), ¬ifier, + &cache_slot_info, ); } @@ -649,9 +649,9 @@ impl RpcSubscriptions { Self::check_program( program_id, &bank_forks, - &block_commitment_cache, subscriptions.program_subscriptions.clone(), ¬ifier, + &cache_slot_info, ); } @@ -663,9 +663,9 @@ impl RpcSubscriptions { Self::check_signature( signature, &bank_forks, - &block_commitment_cache, subscriptions.signature_subscriptions.clone(), ¬ifier, + &cache_slot_info, ); } } @@ -802,7 +802,9 @@ pub(crate) mod tests { .unwrap() .process_transaction(&tx) .unwrap(); - subscriptions.notify_subscribers(1); + let mut cache_slot_info = CacheSlotInfo::default(); + cache_slot_info.current_slot = 1; + subscriptions.notify_subscribers(cache_slot_info); let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", @@ -887,7 +889,7 @@ pub(crate) mod tests { .unwrap() .contains_key(&solana_budget_program::id())); - subscriptions.notify_subscribers(0); + subscriptions.notify_subscribers(CacheSlotInfo::default()); let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", @@ -969,7 +971,7 @@ pub(crate) mod tests { block_commitment.entry(0).or_insert(cache0); block_commitment.entry(1).or_insert(cache1); let block_commitment_cache = - BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0); + BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0, 0); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( @@ -1020,8 +1022,9 @@ pub(crate) mod tests { assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); assert!(sig_subs.contains_key(&processed_tx.signatures[0])); } - - subscriptions.notify_subscribers(1); + let mut cache_slot_info = CacheSlotInfo::default(); + cache_slot_info.current_slot = 1; + subscriptions.notify_subscribers(cache_slot_info); let expected_res = RpcSignatureResult { err: None }; struct Notification { diff --git a/core/src/validator.rs b/core/src/validator.rs index 25064683fd..fde57e7fc8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -309,7 +309,7 @@ impl Validator { block_commitment_cache .write() .unwrap() - .set_get_largest_confirmed_root(bank_forks.read().unwrap().root()); + .set_largest_confirmed_root(bank_forks.read().unwrap().root()); // Park with the RPC service running, ready for inspection! warn!("Validator halted"); diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index a375fb73ef..3fa0ae4433 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -98,7 +98,8 @@ Solana nodes choose which bank state to query based on a commitment requirement set by the client. Clients may specify either: * `{"commitment":"max"}` - the node will query the most recent bank confirmed by the cluster as having reached `MAX_LOCKOUT_HISTORY` confirmations * `{"commitment":"root"}` - the node will query the most recent bank having reached `MAX_LOCKOUT_HISTORY` confirmations on this node -* `{"commitment":"recent"}` - the node will query its most recent bank state +* `{"commitment":"single"}` - the node will query the most recent bank having reached 1 confirmation +* `{"commitment":"recent"}` - the node will query its most recent bank The commitment parameter should be included as the last element in the `params` array: @@ -1163,25 +1164,11 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m ### Subscription Websocket -After connect to the RPC PubSub websocket at `ws://
/`: +After connecting to the RPC PubSub websocket at `ws://
/`: * Submit subscription requests to the websocket using the methods below * Multiple subscriptions may be active at once -* All subscriptions take an optional `confirmations` parameter, which defines - - how many confirmed blocks the node should wait before sending a notification. - - The greater the number, the more likely the notification is to represent - - consensus across the cluster, and the less likely it is to be affected by - - forking or rollbacks. If unspecified, the default value is 0; the node will - - send a notification as soon as it witnesses the event. The maximum - - `confirmations` wait length is the cluster's `MAX_LOCKOUT_HISTORY`, which - - represents the economic finality of the chain. +* Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how . For subscriptions, if commitment is unspecified, the default value is `recent`. ### accountSubscribe @@ -1190,7 +1177,7 @@ Subscribe to an account to receive notifications when the lamports or data for a #### Parameters: * `` - account Pubkey, as base-58 encoded string -* `` - optional, number of confirmed blocks to wait before notification. +* `` - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment) Default: 0, Max: `MAX_LOCKOUT_HISTORY` \(greater integers rounded down\) @@ -1245,7 +1232,7 @@ Subscribe to a program to receive notifications when the lamports or data for a #### Parameters: * `` - program\_id Pubkey, as base-58 encoded string -* `` - optional, number of confirmed blocks to wait before notification. +* `` - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment) Default: 0, Max: `MAX_LOCKOUT_HISTORY` \(greater integers rounded down\) @@ -1303,7 +1290,7 @@ Subscribe to a transaction signature to receive notification when the transactio #### Parameters: * `` - Transaction Signature, as base-58 encoded string -* `` - optional, number of confirmed blocks to wait before notification. +* `` - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment) Default: 0, Max: `MAX_LOCKOUT_HISTORY` \(greater integers rounded down\) diff --git a/sdk/src/commitment_config.rs b/sdk/src/commitment_config.rs index 24ba5f7ebd..b72da97bb1 100644 --- a/sdk/src/commitment_config.rs +++ b/sdk/src/commitment_config.rs @@ -31,6 +31,12 @@ impl CommitmentConfig { } } + pub fn single() -> Self { + Self { + commitment: CommitmentLevel::Single, + } + } + pub fn ok(self) -> Option { if self == Self::default() { None @@ -46,4 +52,5 @@ pub enum CommitmentLevel { Max, Recent, Root, + Single, }