diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index c793f0d5a1..ae20614777 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -246,7 +246,7 @@ fn main() { poh_recorder.lock().unwrap().set_bank(&bank); assert!(poh_recorder.lock().unwrap().bank().is_some()); if bank.slot() > 32 { - bank_forks.set_root(root, &None); + bank_forks.set_root(root, &None, None); root += 1; } debug!( diff --git a/core/src/commitment.rs b/core/src/commitment.rs index ffe9482ba2..ea1bdf463c 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -1,4 +1,5 @@ use crate::consensus::VOTE_THRESHOLD_SIZE; +use solana_ledger::blockstore::Blockstore; use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_info; use solana_runtime::bank::Bank; @@ -45,11 +46,12 @@ impl BlockCommitment { } } -#[derive(Default)] pub struct BlockCommitmentCache { block_commitment: HashMap, + largest_confirmed_root: Slot, total_stake: u64, bank: Arc, + blockstore: Arc, root: Slot, } @@ -70,22 +72,41 @@ impl std::fmt::Debug for BlockCommitmentCache { impl BlockCommitmentCache { pub fn new( block_commitment: HashMap, + largest_confirmed_root: Slot, total_stake: u64, bank: Arc, + blockstore: Arc, root: Slot, ) -> Self { Self { block_commitment, + largest_confirmed_root, total_stake, bank, + blockstore, root, } } + pub fn default_with_blockstore(blockstore: Arc) -> Self { + Self { + block_commitment: HashMap::default(), + largest_confirmed_root: Slot::default(), + total_stake: u64::default(), + bank: Arc::new(Bank::default()), + blockstore, + root: Slot::default(), + } + } + pub fn get_block_commitment(&self, slot: Slot) -> Option<&BlockCommitment> { self.block_commitment.get(&slot) } + pub fn largest_confirmed_root(&self) -> Slot { + self.largest_confirmed_root + } + pub fn total_stake(&self) -> u64 { self.total_stake } @@ -123,24 +144,28 @@ impl BlockCommitmentCache { } pub fn is_confirmed_rooted(&self, slot: Slot) -> bool { - self.get_block_commitment(slot) - .map(|block_commitment| { - (block_commitment.get_rooted_stake() as f64 / self.total_stake as f64) - > VOTE_THRESHOLD_SIZE - }) - .unwrap_or(false) + slot <= self.largest_confirmed_root() + && (self.blockstore.is_root(slot) || self.bank.status_cache_ancestors().contains(&slot)) } #[cfg(test)] - pub fn new_for_tests() -> Self { + pub fn new_for_tests_with_blockstore(blockstore: Arc) -> Self { let mut block_commitment: HashMap = HashMap::new(); block_commitment.insert(0, BlockCommitment::default()); Self { block_commitment, + blockstore, total_stake: 42, - ..Self::default() + largest_confirmed_root: Slot::default(), + bank: Arc::new(Bank::default()), + root: Slot::default(), } } + + #[cfg(test)] + pub(crate) fn set_get_largest_confirmed_root(&mut self, root: Slot) { + self.largest_confirmed_root = root; + } } pub struct CommitmentAggregationData { @@ -159,6 +184,18 @@ impl CommitmentAggregationData { } } +fn get_largest_confirmed_root(mut rooted_stake: Vec<(Slot, u64)>, total_stake: u64) -> Slot { + rooted_stake.sort_by(|a, b| a.0.cmp(&b.0).reverse()); + let mut stake_sum = 0; + for (root, stake) in rooted_stake { + stake_sum += stake; + if (stake_sum as f64 / total_stake as f64) > VOTE_THRESHOLD_SIZE { + return root; + } + } + 0 +} + pub struct AggregateCommitmentService { t_commitment: JoinHandle<()>, } @@ -216,12 +253,18 @@ impl AggregateCommitmentService { } let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms"); - let block_commitment = Self::aggregate_commitment(&ancestors, &aggregation_data.bank); + let (block_commitment, rooted_stake) = + Self::aggregate_commitment(&ancestors, &aggregation_data.bank); + + let largest_confirmed_root = + get_largest_confirmed_root(rooted_stake, aggregation_data.total_staked); let mut new_block_commitment = BlockCommitmentCache::new( block_commitment, + largest_confirmed_root, aggregation_data.total_staked, aggregation_data.bank, + block_commitment_cache.read().unwrap().blockstore.clone(), aggregation_data.root, ); @@ -236,7 +279,10 @@ impl AggregateCommitmentService { } } - pub fn aggregate_commitment(ancestors: &[Slot], bank: &Bank) -> HashMap { + pub fn aggregate_commitment( + ancestors: &[Slot], + bank: &Bank, + ) -> (HashMap, Vec<(Slot, u64)>) { assert!(!ancestors.is_empty()); // Check ancestors is sorted @@ -245,6 +291,7 @@ impl AggregateCommitmentService { } let mut commitment = HashMap::new(); + let mut rooted_stake: Vec<(Slot, u64)> = Vec::new(); for (_, (lamports, account)) in bank.vote_accounts().into_iter() { if lamports == 0 { continue; @@ -257,17 +304,19 @@ impl AggregateCommitmentService { let vote_state = vote_state.unwrap(); Self::aggregate_commitment_for_vote_account( &mut commitment, + &mut rooted_stake, &vote_state, ancestors, lamports, ); } - commitment + (commitment, rooted_stake) } fn aggregate_commitment_for_vote_account( commitment: &mut HashMap, + rooted_stake: &mut Vec<(Slot, u64)>, vote_state: &VoteState, ancestors: &[Slot], lamports: u64, @@ -286,6 +335,7 @@ impl AggregateCommitmentService { break; } } + rooted_stake.push((root, lamports)); } for vote in &vote_state.votes { @@ -312,6 +362,7 @@ impl AggregateCommitmentService { mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; + use solana_ledger::get_tmp_ledger_path; use solana_sdk::pubkey::Pubkey; use solana_stake_program::stake_state; use solana_vote_program::vote_state::{self, VoteStateVersions}; @@ -329,6 +380,8 @@ mod tests { #[test] fn test_get_confirmations() { let bank = Arc::new(Bank::default()); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); // Build BlockCommitmentCache with votes at depths 0 and 1 for 2 slots let mut cache0 = BlockCommitment::default(); cache0.increase_confirmation_stake(1, 5); @@ -346,7 +399,8 @@ mod tests { block_commitment.entry(0).or_insert(cache0.clone()); block_commitment.entry(1).or_insert(cache1.clone()); block_commitment.entry(2).or_insert(cache2.clone()); - let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0); + let block_commitment_cache = + BlockCommitmentCache::new(block_commitment, 0, 50, bank, blockstore, 0); assert_eq!(block_commitment_cache.get_confirmation_count(0), Some(2)); assert_eq!(block_commitment_cache.get_confirmation_count(1), Some(1)); @@ -354,17 +408,68 @@ mod tests { assert_eq!(block_commitment_cache.get_confirmation_count(3), None,); } + #[test] + fn test_is_confirmed_rooted() { + let bank = Arc::new(Bank::default()); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + blockstore.set_roots(&[0, 1]).unwrap(); + // Build BlockCommitmentCache with rooted slots + let mut cache0 = BlockCommitment::default(); + cache0.increase_rooted_stake(50); + let mut cache1 = BlockCommitment::default(); + cache1.increase_rooted_stake(40); + let mut cache2 = BlockCommitment::default(); + cache2.increase_rooted_stake(20); + + let mut block_commitment = HashMap::new(); + block_commitment.entry(1).or_insert(cache0.clone()); + block_commitment.entry(2).or_insert(cache1.clone()); + block_commitment.entry(3).or_insert(cache2.clone()); + let largest_confirmed_root = 1; + let block_commitment_cache = BlockCommitmentCache::new( + block_commitment, + largest_confirmed_root, + 50, + bank, + blockstore, + 0, + ); + + assert!(block_commitment_cache.is_confirmed_rooted(0)); + assert!(block_commitment_cache.is_confirmed_rooted(1)); + assert!(!block_commitment_cache.is_confirmed_rooted(2)); + assert!(!block_commitment_cache.is_confirmed_rooted(3)); + } + + #[test] + fn test_get_largest_confirmed_root() { + assert_eq!(get_largest_confirmed_root(vec![], 10), 0); + let mut rooted_stake = vec![]; + rooted_stake.push((0, 5)); + rooted_stake.push((1, 5)); + assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 0); + let mut rooted_stake = vec![]; + rooted_stake.push((1, 5)); + rooted_stake.push((0, 10)); + rooted_stake.push((2, 5)); + rooted_stake.push((1, 4)); + assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 1); + } + #[test] fn test_aggregate_commitment_for_vote_account_1() { let ancestors = vec![3, 4, 5, 7, 9, 11]; let mut commitment = HashMap::new(); + let mut rooted_stake = vec![]; let lamports = 5; let mut vote_state = VoteState::default(); - let root = ancestors.last().unwrap(); - vote_state.root_slot = Some(*root); + let root = ancestors.last().unwrap().clone(); + vote_state.root_slot = Some(root); AggregateCommitmentService::aggregate_commitment_for_vote_account( &mut commitment, + &mut rooted_stake, &vote_state, &ancestors, lamports, @@ -375,12 +480,14 @@ mod tests { expected.increase_rooted_stake(lamports); assert_eq!(*commitment.get(&a).unwrap(), expected); } + assert_eq!(rooted_stake[0], (root, lamports)); } #[test] fn test_aggregate_commitment_for_vote_account_2() { let ancestors = vec![3, 4, 5, 7, 9, 11]; let mut commitment = HashMap::new(); + let mut rooted_stake = vec![]; let lamports = 5; let mut vote_state = VoteState::default(); @@ -389,6 +496,7 @@ mod tests { vote_state.process_slot_vote_unchecked(*ancestors.last().unwrap()); AggregateCommitmentService::aggregate_commitment_for_vote_account( &mut commitment, + &mut rooted_stake, &vote_state, &ancestors, lamports, @@ -405,12 +513,14 @@ mod tests { assert_eq!(*commitment.get(&a).unwrap(), expected); } } + assert_eq!(rooted_stake[0], (root, lamports)); } #[test] fn test_aggregate_commitment_for_vote_account_3() { let ancestors = vec![3, 4, 5, 7, 9, 10, 11]; let mut commitment = HashMap::new(); + let mut rooted_stake = vec![]; let lamports = 5; let mut vote_state = VoteState::default(); @@ -421,6 +531,7 @@ mod tests { vote_state.process_slot_vote_unchecked(ancestors[6]); AggregateCommitmentService::aggregate_commitment_for_vote_account( &mut commitment, + &mut rooted_stake, &vote_state, &ancestors, lamports, @@ -441,6 +552,7 @@ mod tests { assert_eq!(*commitment.get(&a).unwrap(), expected); } } + assert_eq!(rooted_stake[0], (root, lamports)); } #[test] @@ -450,6 +562,8 @@ mod tests { mut genesis_config, .. } = create_genesis_config(10_000); + let rooted_stake_amount = 40; + let sk1 = Pubkey::new_rand(); let pk1 = Pubkey::new_rand(); let mut vote_account1 = vote_state::create_account(&pk1, &Pubkey::new_rand(), 0, 100); @@ -460,12 +574,36 @@ mod tests { let mut vote_account2 = vote_state::create_account(&pk2, &Pubkey::new_rand(), 0, 50); let stake_account2 = stake_state::create_account(&sk2, &pk2, &vote_account2, &genesis_config.rent, 50); + let sk3 = Pubkey::new_rand(); + let pk3 = Pubkey::new_rand(); + let mut vote_account3 = vote_state::create_account(&pk3, &Pubkey::new_rand(), 0, 1); + let stake_account3 = stake_state::create_account( + &sk3, + &pk3, + &vote_account3, + &genesis_config.rent, + rooted_stake_amount, + ); + let sk4 = Pubkey::new_rand(); + let pk4 = Pubkey::new_rand(); + let mut vote_account4 = vote_state::create_account(&pk4, &Pubkey::new_rand(), 0, 1); + let stake_account4 = stake_state::create_account( + &sk4, + &pk4, + &vote_account4, + &genesis_config.rent, + rooted_stake_amount, + ); genesis_config.accounts.extend(vec![ (pk1, vote_account1.clone()), (sk1, stake_account1), (pk2, vote_account2.clone()), (sk2, stake_account2), + (pk3, vote_account3.clone()), + (sk3, stake_account3), + (pk4, vote_account4.clone()), + (sk4, stake_account4), ]); // Create bank @@ -485,7 +623,20 @@ mod tests { VoteState::to(&versioned, &mut vote_account2).unwrap(); bank.store_account(&pk2, &vote_account2); - let commitment = AggregateCommitmentService::aggregate_commitment(&ancestors, &bank); + let mut vote_state3 = VoteState::from(&vote_account3).unwrap(); + vote_state3.root_slot = Some(1); + let versioned = VoteStateVersions::Current(Box::new(vote_state3)); + VoteState::to(&versioned, &mut vote_account3).unwrap(); + bank.store_account(&pk3, &vote_account3); + + let mut vote_state4 = VoteState::from(&vote_account4).unwrap(); + vote_state4.root_slot = Some(2); + let versioned = VoteStateVersions::Current(Box::new(vote_state4)); + VoteState::to(&versioned, &mut vote_account4).unwrap(); + bank.store_account(&pk4, &vote_account4); + + let (commitment, rooted_stake) = + AggregateCommitmentService::aggregate_commitment(&ancestors, &bank); for a in ancestors { if a <= 3 { @@ -509,5 +660,7 @@ mod tests { assert!(commitment.get(&a).is_none()); } } + assert_eq!(rooted_stake.len(), 2); + assert_eq!(get_largest_confirmed_root(rooted_stake, 100), 1) } } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index fcc9cc7b94..5cf0f26fc9 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -621,7 +621,7 @@ pub mod test { } let vote = tower.new_vote_from_bank(&bank, &my_vote_pubkey).0; if let Some(new_root) = tower.record_bank_vote(vote) { - ReplayStage::handle_new_root(new_root, bank_forks, progress, &None); + ReplayStage::handle_new_root(new_root, bank_forks, progress, &None, None); } // Mark the vote for this bank under this node's pubkey so it will be diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 9aa9aa30d3..5d0c5b818a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -67,7 +67,6 @@ impl Drop for Finalizer { } } -#[derive(Default)] pub struct ReplayStageConfig { pub my_pubkey: Pubkey, pub vote_account: Pubkey, @@ -257,13 +256,15 @@ impl ReplayStage { ); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); + let forks_root = bank_forks.read().unwrap().root(); let start = allocated.get(); let mut frozen_banks: Vec<_> = bank_forks .read() .unwrap() .frozen_banks() - .values() - .cloned() + .into_iter() + .filter(|(slot, _)| *slot >= forks_root) + .map(|(_, bank)| bank) .collect(); let newly_computed_slot_stats = Self::compute_bank_stats( &my_pubkey, @@ -344,6 +345,7 @@ impl ReplayStage { &accounts_hash_sender, &latest_root_senders, &subscriptions, + &block_commitment_cache, )?; } datapoint_debug!( @@ -618,6 +620,7 @@ impl ReplayStage { accounts_hash_sender: &Option, latest_root_senders: &[Sender], subscriptions: &Arc, + block_commitment_cache: &Arc>, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -643,7 +646,20 @@ impl ReplayStage { blockstore .set_roots(&rooted_slots) .expect("Ledger set roots failed"); - Self::handle_new_root(new_root, &bank_forks, progress, accounts_hash_sender); + let largest_confirmed_root = Some( + block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(), + ); + + Self::handle_new_root( + new_root, + &bank_forks, + progress, + accounts_hash_sender, + largest_confirmed_root, + ); subscriptions.notify_roots(rooted_slots); latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { @@ -979,15 +995,17 @@ impl ReplayStage { } pub(crate) fn handle_new_root( - new_root: u64, + new_root: Slot, bank_forks: &RwLock, progress: &mut HashMap, accounts_hash_sender: &Option, + largest_confirmed_root: Option, ) { - bank_forks - .write() - .unwrap() - .set_root(new_root, accounts_hash_sender); + bank_forks.write().unwrap().set_root( + new_root, + accounts_hash_sender, + largest_confirmed_root, + ); let r_bank_forks = bank_forks.read().unwrap(); progress.retain(|k, _| r_bank_forks.get(*k).is_some()); } @@ -1016,7 +1034,11 @@ impl ReplayStage { // Find the next slot that chains to the old slot let forks = forks_lock.read().unwrap(); let frozen_banks = forks.frozen_banks(); - let frozen_bank_slots: Vec = frozen_banks.keys().cloned().collect(); + let frozen_bank_slots: Vec = frozen_banks + .keys() + .cloned() + .filter(|s| *s >= forks.root()) + .collect(); let next_slots = blockstore .get_slots_since(&frozen_bank_slots) .expect("Db error"); @@ -1419,7 +1441,9 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::default())), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore.clone(), + ))), )); let bank_forks = BankForks::new(0, bank0); bank_forks.working_bank().freeze(); @@ -1472,12 +1496,58 @@ pub(crate) mod tests { for i in 0..=root { progress.insert(i, ForkProgress::new(Hash::default())); } - ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None); + ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None, None); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); assert!(progress.get(&root).is_some()); } + #[test] + fn test_handle_new_root_ahead_of_largest_confirmed_root() { + let genesis_config = create_genesis_config(10_000).genesis_config; + let bank0 = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0))); + let confirmed_root = 1; + let fork = 2; + let bank1 = Bank::new_from_parent( + bank_forks.read().unwrap().get(0).unwrap(), + &Pubkey::default(), + confirmed_root, + ); + bank_forks.write().unwrap().insert(bank1); + let bank2 = Bank::new_from_parent( + bank_forks.read().unwrap().get(confirmed_root).unwrap(), + &Pubkey::default(), + fork, + ); + bank_forks.write().unwrap().insert(bank2); + let root = 3; + let root_bank = Bank::new_from_parent( + bank_forks.read().unwrap().get(confirmed_root).unwrap(), + &Pubkey::default(), + root, + ); + bank_forks.write().unwrap().insert(root_bank); + let mut progress = HashMap::new(); + for i in 0..=root { + progress.insert(i, ForkProgress::new(Hash::default())); + } + ReplayStage::handle_new_root( + root, + &bank_forks, + &mut progress, + &None, + Some(confirmed_root), + ); + assert_eq!(bank_forks.read().unwrap().root(), root); + assert!(bank_forks.read().unwrap().get(confirmed_root).is_some()); + assert!(bank_forks.read().unwrap().get(fork).is_none()); + assert_eq!(progress.len(), 2); + assert!(progress.get(&root).is_some()); + assert!(progress.get(&confirmed_root).is_some()); + assert!(progress.get(&fork).is_none()); + } + #[test] fn test_dead_fork_transaction_error() { let keypair1 = Keypair::new(); @@ -1745,7 +1815,11 @@ pub(crate) mod tests { bank.store_account(&pubkey, &leader_vote_account); } - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let (lockouts_sender, _) = AggregateCommitmentService::new( &Arc::new(AtomicBool::new(false)), block_commitment_cache.clone(), diff --git a/core/src/rpc.rs b/core/src/rpc.rs index b9771984f9..75c663f866 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -9,7 +9,7 @@ use crate::{ validator::ValidatorExit, }; use bincode::serialize; -use jsonrpc_core::{Error, Metadata, Result}; +use jsonrpc_core::{Error, ErrorCode, Metadata, Result}; use jsonrpc_derive::rpc; use solana_client::rpc_response::*; use solana_faucet::faucet::request_airdrop_transaction; @@ -40,6 +40,7 @@ use std::{ time::{Duration, Instant}, }; +const JSON_RPC_SERVER_ERROR_0: i64 = -32000; const MAX_QUERY_ITEMS: usize = 256; const MAX_SLOT_RANGE: u64 = 10_000; @@ -79,17 +80,32 @@ pub struct JsonRpcRequestProcessor { } impl JsonRpcRequestProcessor { - fn bank(&self, commitment: Option) -> Arc { + fn bank(&self, commitment: Option) -> Result> { debug!("RPC commitment_config: {:?}", commitment); let r_bank_forks = self.bank_forks.read().unwrap(); if commitment.is_some() && commitment.unwrap().commitment == CommitmentLevel::Recent { let bank = r_bank_forks.working_bank(); debug!("RPC using working_bank: {:?}", bank.slot()); - bank + Ok(bank) } else { - let slot = r_bank_forks.root(); - debug!("RPC using block: {:?}", slot); - r_bank_forks.get(slot).cloned().unwrap() + let cluster_root = self + .block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(); + debug!("RPC using block: {:?}", cluster_root); + r_bank_forks + .get(cluster_root) + .cloned() + .ok_or_else(|| Error { + code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_0), + message: format!( + "Cluster largest_confirmed_root {} does not exist on node. Node root: {}", + cluster_root, + r_bank_forks.root(), + ), + data: None, + }) } } @@ -116,7 +132,7 @@ impl JsonRpcRequestProcessor { pubkey: Result, commitment: Option, ) -> RpcResponse> { - let bank = &*self.bank(commitment); + let bank = &*self.bank(commitment)?; pubkey.and_then(|key| new_response(bank, bank.get_account(&key).map(RpcAccount::encode))) } @@ -126,7 +142,7 @@ impl JsonRpcRequestProcessor { commitment: Option, ) -> Result { Ok(self - .bank(commitment) + .bank(commitment)? .get_minimum_balance_for_rent_exemption(data_len)) } @@ -136,7 +152,7 @@ impl JsonRpcRequestProcessor { commitment: Option, ) -> Result> { Ok(self - .bank(commitment) + .bank(commitment)? .get_program_accounts(Some(&program_id)) .into_iter() .map(|(pubkey, account)| RpcKeyedAccount { @@ -147,13 +163,13 @@ impl JsonRpcRequestProcessor { } pub fn get_inflation(&self, commitment: Option) -> Result { - Ok(self.bank(commitment).inflation()) + Ok(self.bank(commitment)?.inflation()) } pub fn get_epoch_schedule(&self) -> Result { // Since epoch schedule data comes from the genesis config, any commitment level should be // fine - Ok(*self.bank(None).epoch_schedule()) + Ok(*self.bank(None)?.epoch_schedule()) } pub fn get_balance( @@ -161,7 +177,7 @@ impl JsonRpcRequestProcessor { pubkey: Result, commitment: Option, ) -> RpcResponse { - let bank = &*self.bank(commitment); + let bank = &*self.bank(commitment)?; pubkey.and_then(|key| new_response(bank, bank.get_balance(&key))) } @@ -169,7 +185,7 @@ impl JsonRpcRequestProcessor { &self, commitment: Option, ) -> RpcResponse { - let bank = &*self.bank(commitment); + let bank = &*self.bank(commitment)?; let (blockhash, fee_calculator) = bank.confirmed_last_blockhash(); new_response( bank, @@ -184,7 +200,7 @@ impl JsonRpcRequestProcessor { &self, blockhash: &Hash, ) -> RpcResponse> { - let bank = &*self.bank(None); + let bank = &*self.bank(None)?; let fee_calculator = bank.get_fee_calculator(blockhash); new_response( bank, @@ -193,7 +209,7 @@ impl JsonRpcRequestProcessor { } fn get_fee_rate_governor(&self) -> RpcResponse { - let bank = &*self.bank(None); + let bank = &*self.bank(None)?; let fee_rate_governor = bank.get_fee_rate_governor(); new_response( bank, @@ -208,7 +224,7 @@ impl JsonRpcRequestProcessor { signature: Result, commitment: Option, ) -> RpcResponse { - let bank = &*self.bank(commitment); + let bank = &*self.bank(commitment)?; match signature { Err(e) => Err(e), Ok(sig) => { @@ -232,11 +248,11 @@ impl JsonRpcRequestProcessor { } fn get_slot(&self, commitment: Option) -> Result { - Ok(self.bank(commitment).slot()) + Ok(self.bank(commitment)?.slot()) } fn get_slot_leader(&self, commitment: Option) -> Result { - Ok(self.bank(commitment).collector_id().to_string()) + Ok(self.bank(commitment)?.collector_id().to_string()) } fn minimum_ledger_slot(&self) -> Result { @@ -253,18 +269,18 @@ impl JsonRpcRequestProcessor { } fn get_transaction_count(&self, commitment: Option) -> Result { - Ok(self.bank(commitment).transaction_count() as u64) + Ok(self.bank(commitment)?.transaction_count() as u64) } fn get_total_supply(&self, commitment: Option) -> Result { - Ok(self.bank(commitment).capitalization()) + Ok(self.bank(commitment)?.capitalization()) } fn get_vote_accounts( &self, commitment: Option, ) -> Result { - let bank = self.bank(commitment); + let bank = self.bank(commitment)?; let vote_accounts = bank.vote_accounts(); let epoch_vote_accounts = bank .epoch_vote_accounts(bank.get_epoch_and_slot_index(bank.slot()).0) @@ -326,7 +342,7 @@ impl JsonRpcRequestProcessor { } fn get_slots_per_segment(&self, commitment: Option) -> Result { - Ok(self.bank(commitment).slots_per_segment()) + Ok(self.bank(commitment)?.slots_per_segment()) } fn get_storage_pubkeys_for_slot(&self, slot: Slot) -> Result> { @@ -376,7 +392,11 @@ impl JsonRpcRequestProcessor { start_slot: Slot, end_slot: Option, ) -> Result> { - let end_slot = end_slot.unwrap_or_else(|| self.bank(None).slot()); + let end_slot = if let Some(end_slot) = end_slot { + end_slot + } else { + self.bank(None)?.slot() + }; if end_slot < start_slot { return Ok(vec![]); } @@ -394,7 +414,7 @@ impl JsonRpcRequestProcessor { // queried). If these values will be variable in the future, those timing parameters will // need to be stored persistently, and the slot_duration calculation will likely need to be // moved upstream into blockstore. Also, an explicit commitment level will need to be set. - let bank = self.bank(None); + let bank = self.bank(None)?; let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year()); let epoch = bank.epoch_schedule().get_epoch(slot); let stakes = HashMap::new(); @@ -412,7 +432,7 @@ impl JsonRpcRequestProcessor { signature: Signature, commitment: Option, ) -> Option { - self.get_transaction_status(signature, &self.bank(commitment)) + self.get_transaction_status(signature, &self.bank(commitment).ok()?) .map( |TransactionStatus { status, @@ -430,7 +450,7 @@ impl JsonRpcRequestProcessor { signature: Signature, commitment: Option, ) -> Option> { - self.bank(commitment).get_signature_status(&signature) + self.bank(commitment).ok()?.get_signature_status(&signature) } pub fn get_signature_statuses( @@ -449,7 +469,7 @@ impl JsonRpcRequestProcessor { let search_transaction_history = config .and_then(|x| x.search_transaction_history) .unwrap_or(false); - let bank = self.bank(commitment); + let bank = self.bank(commitment)?; for signature in signatures { let status = if let Some(status) = self.get_transaction_status(signature, &bank) { @@ -944,7 +964,7 @@ impl RpcSol for RpcSolImpl { meta: Self::Metadata, commitment: Option, ) -> Result { - let bank = meta.request_processor.read().unwrap().bank(commitment); + let bank = meta.request_processor.read().unwrap().bank(commitment)?; let epoch_schedule = bank.epoch_schedule(); let slot = bank.slot(); @@ -980,7 +1000,7 @@ impl RpcSol for RpcSolImpl { slot: Option, commitment: Option, ) -> Result> { - let bank = meta.request_processor.read().unwrap().bank(commitment); + let bank = meta.request_processor.read().unwrap().bank(commitment)?; let slot = slot.unwrap_or_else(|| bank.slot()); let epoch = bank.epoch_schedule().get_epoch(slot); @@ -1144,7 +1164,7 @@ impl RpcSol for RpcSolImpl { .request_processor .read() .unwrap() - .bank(commitment.clone()) + .bank(commitment.clone())? .confirmed_last_blockhash() .0; let transaction = request_airdrop_transaction(&faucet_addr, &pubkey, lamports, blockhash) @@ -1491,8 +1511,10 @@ pub mod tests { .or_insert(commitment_slot1.clone()); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( block_commitment, + 0, 10, bank.clone(), + blockstore.clone(), 0, ))); @@ -1524,15 +1546,13 @@ pub mod tests { let mut roots = blockstore_roots.clone(); if !roots.is_empty() { - roots.retain(|&x| x > 1); + roots.retain(|&x| x > 0); let mut parent_bank = bank; for (i, root) in roots.iter().enumerate() { let new_bank = Bank::new_from_parent(&parent_bank, parent_bank.collector_id(), *root); parent_bank = bank_forks.write().unwrap().insert(new_bank); - parent_bank.squash(); - bank_forks.write().unwrap().set_root(*root, &None); - let parent = if i > 0 { roots[i - 1] } else { 1 }; + let parent = if i > 0 { roots[i - 1] } else { 0 }; fill_blockstore_slot_with_ticks(&blockstore, 5, *root, parent, Hash::default()); } blockstore.set_roots(&roots).unwrap(); @@ -1542,6 +1562,10 @@ pub mod tests { roots.iter().max().unwrap() + 1, ); bank_forks.write().unwrap().insert(new_bank); + + for root in roots.iter() { + bank_forks.write().unwrap().set_root(*root, &None, Some(0)); + } } let bank = bank_forks.read().unwrap().working_bank(); @@ -1610,14 +1634,16 @@ pub mod tests { let validator_exit = create_validator_exit(&exit); let (bank_forks, alice, _) = new_bank_forks(); let bank = bank_forks.read().unwrap().working_bank(); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let ledger_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let request_processor = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), bank_forks, block_commitment_cache, - Arc::new(blockstore), + blockstore, StorageState::default(), validator_exit, ); @@ -2089,7 +2115,7 @@ pub mod tests { .expect("actual response deserialization"); let result = result.as_ref().unwrap(); assert_eq!(expected_res, result.status); - assert_eq!(Some(2), result.confirmations); + assert_eq!(None, result.confirmations); // Test getSignatureStatus request on unprocessed tx let tx = system_transaction::transfer(&alice, &bob_pubkey, 10, blockhash); @@ -2253,9 +2279,11 @@ pub mod tests { fn test_rpc_send_bad_tx() { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(&exit); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let ledger_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; @@ -2266,7 +2294,7 @@ pub mod tests { JsonRpcConfig::default(), new_bank_forks().0, block_commitment_cache, - Arc::new(blockstore), + blockstore, StorageState::default(), validator_exit, ); @@ -2356,14 +2384,16 @@ pub mod tests { fn test_rpc_request_processor_config_default_trait_validator_exit_fails() { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(&exit); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let ledger_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let request_processor = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), new_bank_forks().0, block_commitment_cache, - Arc::new(blockstore), + blockstore, StorageState::default(), validator_exit, ); @@ -2375,16 +2405,18 @@ pub mod tests { fn test_rpc_request_processor_allow_validator_exit_config() { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(&exit); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let ledger_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let mut config = JsonRpcConfig::default(); config.enable_validator_exit = true; let request_processor = JsonRpcRequestProcessor::new( config, new_bank_forks().0, block_commitment_cache, - Arc::new(blockstore), + blockstore, StorageState::default(), validator_exit, ); @@ -2439,6 +2471,8 @@ pub mod tests { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(&exit); let bank_forks = new_bank_forks().0; + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let commitment_slot0 = BlockCommitment::new([8; MAX_LOCKOUT_HISTORY + 1]); let commitment_slot1 = BlockCommitment::new([9; MAX_LOCKOUT_HISTORY + 1]); @@ -2451,12 +2485,12 @@ pub mod tests { .or_insert(commitment_slot1.clone()); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( block_commitment, + 0, 42, bank_forks.read().unwrap().working_bank(), + blockstore.clone(), 0, ))); - let ledger_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&ledger_path).unwrap(); let mut config = JsonRpcConfig::default(); config.enable_validator_exit = true; @@ -2464,7 +2498,7 @@ pub mod tests { config, bank_forks, block_commitment_cache, - Arc::new(blockstore), + blockstore, StorageState::default(), validator_exit, ); @@ -2648,8 +2682,16 @@ pub mod tests { fn test_get_confirmed_blocks() { let bob_pubkey = Pubkey::new_rand(); let roots = vec![0, 1, 3, 4, 8]; - let RpcHandler { io, meta, .. } = - start_rpc_handler_with_tx_and_blockstore(&bob_pubkey, roots.clone(), 0); + let RpcHandler { + io, + meta, + block_commitment_cache, + .. + } = start_rpc_handler_with_tx_and_blockstore(&bob_pubkey, roots.clone(), 0); + block_commitment_cache + .write() + .unwrap() + .set_get_largest_confirmed_root(8); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getConfirmedBlocks","params":[0]}}"#); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index fd123b71e7..0f525e390e 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -7,8 +7,13 @@ use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use solana_client::rpc_response::{ Response as RpcResponse, RpcAccount, RpcKeyedAccount, RpcSignatureResult, }; +#[cfg(test)] +use solana_ledger::blockstore::Blockstore; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; -use std::sync::{atomic, Arc}; +use std::{ + str::FromStr, + sync::{atomic, Arc}, +}; // Suppress needless_return due to // https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204 @@ -118,7 +123,6 @@ pub trait RpcSolPubSub { fn root_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; } -#[derive(Default)] pub struct RpcSolPubSubImpl { uid: Arc, subscriptions: Arc, @@ -129,9 +133,14 @@ impl RpcSolPubSubImpl { let uid = Arc::new(atomic::AtomicUsize::default()); Self { uid, subscriptions } } -} -use std::str::FromStr; + #[cfg(test)] + fn default_with_blockstore(blockstore: Arc) -> Self { + let uid = Arc::new(atomic::AtomicUsize::default()); + let subscriptions = Arc::new(RpcSubscriptions::default_with_blockstore(blockstore)); + Self { uid, subscriptions } + } +} fn param(param_str: &str, thing: &str) -> Result { param_str.parse::().map_err(|_e| Error { @@ -323,7 +332,7 @@ mod tests { use jsonrpc_pubsub::{PubSubHandler, Session}; use serial_test_derive::serial; use solana_budget_program::{self, budget_instruction}; - use solana_ledger::bank_forks::BankForks; + use solana_ledger::{bank_forks::BankForks, get_tmp_ledger_path}; use solana_runtime::bank::Bank; use solana_sdk::{ pubkey::Pubkey, @@ -370,12 +379,16 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let rpc = RpcSolPubSubImpl { subscriptions: Arc::new(RpcSubscriptions::new( &Arc::new(AtomicBool::new(false)), - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), )), - ..RpcSolPubSubImpl::default() + uid: Arc::new(atomic::AtomicUsize::default()), }; // Test signature subscriptions @@ -416,11 +429,13 @@ mod tests { let bank = Bank::new(&genesis_config); let arc_bank = Arc::new(bank); let blockhash = arc_bank.last_blockhash(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let session = create_session(); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); io.extend_with(rpc.to_delegate()); let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); @@ -475,13 +490,17 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let rpc = RpcSolPubSubImpl { subscriptions: Arc::new(RpcSubscriptions::new( &Arc::new(AtomicBool::new(false)), - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), )), - ..RpcSolPubSubImpl::default() + uid: Arc::new(atomic::AtomicUsize::default()), }; let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); @@ -569,9 +588,11 @@ mod tests { fn test_account_unsubscribe() { let bob_pubkey = Pubkey::new_rand(); let session = create_session(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); io.extend_with(rpc.to_delegate()); @@ -615,13 +636,17 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let bob = Keypair::new(); - let mut rpc = RpcSolPubSubImpl::default(); + let mut rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore.clone()); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), ); rpc.subscriptions = Arc::new(subscriptions); let session = create_session(); @@ -652,11 +677,15 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let bob = Keypair::new(); - let mut rpc = RpcSolPubSubImpl::default(); + let mut rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore.clone()); let exit = Arc::new(AtomicBool::new(false)); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()), + )); let subscriptions = RpcSubscriptions::new(&exit, block_commitment_cache.clone()); rpc.subscriptions = Arc::new(subscriptions); @@ -683,8 +712,14 @@ mod tests { cache0.increase_confirmation_stake(1, 10); let mut block_commitment = HashMap::new(); block_commitment.entry(0).or_insert(cache0.clone()); - let mut new_block_commitment = - BlockCommitmentCache::new(block_commitment, 10, bank1.clone(), 0); + let mut new_block_commitment = BlockCommitmentCache::new( + block_commitment, + 0, + 10, + bank1.clone(), + blockstore.clone(), + 0, + ); let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); drop(w_block_commitment_cache); @@ -698,7 +733,8 @@ mod tests { cache0.increase_confirmation_stake(2, 10); let mut block_commitment = HashMap::new(); block_commitment.entry(0).or_insert(cache0.clone()); - let mut new_block_commitment = BlockCommitmentCache::new(block_commitment, 10, bank2, 0); + let mut new_block_commitment = + BlockCommitmentCache::new(block_commitment, 0, 10, bank2, blockstore.clone(), 0); let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); drop(w_block_commitment_cache); @@ -728,7 +764,9 @@ mod tests { #[test] #[serial] fn test_slot_subscribe() { - let rpc = RpcSolPubSubImpl::default(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); @@ -753,7 +791,9 @@ mod tests { #[test] #[serial] fn test_slot_unsubscribe() { - let rpc = RpcSolPubSubImpl::default(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index e58bdc775c..47586642ce 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -73,6 +73,7 @@ impl PubSubService { mod tests { use super::*; use crate::commitment::BlockCommitmentCache; + use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; use std::{ net::{IpAddr, Ipv4Addr}, sync::RwLock, @@ -82,9 +83,13 @@ mod tests { fn test_pubsub_new() { let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let exit = Arc::new(AtomicBool::new(false)); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), )); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let thread = pubsub_service.thread_hdl.thread(); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 2c364f7082..a096ae7118 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -361,16 +361,18 @@ mod tests { solana_net_utils::find_available_port_in_range(ip_addr, (10000, 65535)).unwrap(), ); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank.slot(), bank))); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let ledger_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let mut rpc_service = JsonRpcService::new( rpc_addr, JsonRpcConfig::default(), None, bank_forks, block_commitment_cache, - Arc::new(blockstore), + blockstore, cluster_info, Hash::default(), &PathBuf::from("farf"), diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 31fc978cf0..0aec77a549 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -11,7 +11,7 @@ use serde::Serialize; use solana_client::rpc_response::{ Response, RpcAccount, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult, }; -use solana_ledger::bank_forks::BankForks; +use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; use solana_runtime::bank::Bank; use solana_sdk::{ account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, @@ -246,15 +246,6 @@ pub struct RpcSubscriptions { exit: Arc, } -impl Default for RpcSubscriptions { - fn default() -> Self { - Self::new( - &Arc::new(AtomicBool::new(false)), - Arc::new(RwLock::new(BlockCommitmentCache::default())), - ) - } -} - impl Drop for RpcSubscriptions { fn drop(&mut self) { self.shutdown().unwrap_or_else(|err| { @@ -324,6 +315,15 @@ impl RpcSubscriptions { } } + pub fn default_with_blockstore(blockstore: Arc) -> Self { + Self::new( + &Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore, + ))), + ) + } + fn check_account( pubkey: &Pubkey, bank_forks: &Arc>, @@ -624,6 +624,7 @@ pub(crate) mod tests { use jsonrpc_pubsub::typed::Subscriber; use serial_test_derive::serial; use solana_budget_program; + use solana_ledger::get_tmp_ledger_path; use solana_sdk::{ signature::{Keypair, Signer}, system_transaction, @@ -664,6 +665,8 @@ pub(crate) mod tests { mint_keypair, .. } = create_genesis_config(100); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); @@ -690,7 +693,9 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), ); subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); @@ -737,6 +742,8 @@ pub(crate) mod tests { mint_keypair, .. } = create_genesis_config(100); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); @@ -763,7 +770,9 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), ); subscriptions.add_program_subscription( solana_budget_program::id(), @@ -818,6 +827,8 @@ pub(crate) mod tests { mint_keypair, .. } = create_genesis_config(100); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let mut bank_forks = BankForks::new(0, bank); @@ -856,7 +867,8 @@ pub(crate) mod tests { let mut block_commitment = HashMap::new(); block_commitment.entry(0).or_insert(cache0.clone()); block_commitment.entry(1).or_insert(cache1.clone()); - let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 10, bank1, 0); + let block_commitment_cache = + BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = @@ -959,9 +971,13 @@ pub(crate) mod tests { Subscriber::new_test("slotNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let subscriptions = RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), ); subscriptions.add_slot_subscription(sub_id.clone(), subscriber); @@ -1001,9 +1017,13 @@ pub(crate) mod tests { Subscriber::new_test("rootNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let subscriptions = RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + )), ); subscriptions.add_root_subscription(sub_id.clone(), subscriber); diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 556bc7a8fb..feabf6c49e 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -663,6 +663,7 @@ mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use rayon::prelude::*; + use solana_ledger::get_tmp_ledger_path; use solana_runtime::bank::Bank; use solana_sdk::{ hash::Hasher, @@ -690,7 +691,11 @@ mod tests { &[bank.clone()], vec![0], ))); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore), + )); let (_slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new( &bank.last_blockhash(), diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 378a4aca2c..3c01d75595 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -301,7 +301,9 @@ pub mod tests { let voting_keypair = Keypair::new(); let storage_keypair = Arc::new(Keypair::new()); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), @@ -320,10 +322,7 @@ pub mod tests { &StorageState::default(), None, l_receiver, - &Arc::new(RpcSubscriptions::new( - &exit, - Arc::new(RwLock::new(BlockCommitmentCache::default())), - )), + &Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())), &poh_recorder, &leader_schedule_cache, &exit, diff --git a/core/src/validator.rs b/core/src/validator.rs index cba029f0c3..5dd788752f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -202,7 +202,6 @@ impl Validator { } let bank_forks = Arc::new(RwLock::new(bank_forks)); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let mut validator_exit = ValidatorExit::default(); let exit_ = exit.clone(); @@ -238,6 +237,9 @@ impl Validator { ); let blockstore = Arc::new(blockstore); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let subscriptions = Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())); diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index 5e1e3350ca..8fe77bef2f 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -137,7 +137,7 @@ mod tests { // and to allow snapshotting of bank and the purging logic on status_cache to // kick in if slot % set_root_interval == 0 || slot == last_slot - 1 { - bank_forks.set_root(bank.slot(), &sender); + bank_forks.set_root(bank.slot(), &sender, None); } } // Generate a snapshot package for last bank @@ -377,9 +377,11 @@ mod tests { snapshot_test_config.bank_forks.insert(new_bank); current_bank = snapshot_test_config.bank_forks[new_slot].clone(); } - snapshot_test_config - .bank_forks - .set_root(current_bank.slot(), &snapshot_sender); + snapshot_test_config.bank_forks.set_root( + current_bank.slot(), + &snapshot_sender, + None, + ); } let num_old_slots = num_set_roots * *add_root_interval - MAX_CACHE_ENTRIES + 1; diff --git a/core/tests/client.rs b/core/tests/client.rs index 7b1d9645df..7e16276a14 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -6,6 +6,7 @@ use solana_core::{ commitment::BlockCommitmentCache, rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, validator::TestValidator, }; +use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; use solana_sdk::{ commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer, system_transaction, @@ -85,9 +86,13 @@ fn test_slot_subscription() { rpc_port::DEFAULT_RPC_PUBSUB_PORT, ); let exit = Arc::new(AtomicBool::new(false)); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, - Arc::new(RwLock::new(BlockCommitmentCache::default())), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore, + ))), )); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); std::thread::sleep(Duration::from_millis(400)); diff --git a/core/tests/storage_stage.rs b/core/tests/storage_stage.rs index bc2354b84b..e4a98ec91e 100644 --- a/core/tests/storage_stage.rs +++ b/core/tests/storage_stage.rs @@ -59,7 +59,9 @@ mod tests { &[bank.clone()], vec![0], ))); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); @@ -180,7 +182,9 @@ mod tests { &[bank.clone()], vec![0], ))); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index ac8ab58aa4..58e18f2d3f 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -165,6 +165,7 @@ impl BankForks { &mut self, root: Slot, snapshot_package_sender: &Option, + largest_confirmed_root: Option, ) { self.root = root; let set_root_start = Instant::now(); @@ -205,7 +206,7 @@ impl BankForks { } } - self.prune_non_root(root); + self.prune_non_root(root, largest_confirmed_root); inc_new_counter_info!( "bank-forks_set_root_ms", @@ -276,10 +277,19 @@ impl BankForks { Ok(()) } - fn prune_non_root(&mut self, root: Slot) { + fn prune_non_root(&mut self, root: Slot, largest_confirmed_root: Option) { let descendants = self.descendants(); - self.banks - .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); + self.banks.retain(|slot, _| { + *slot == root + || descendants[&root].contains(slot) + || (*slot < root + && *slot >= largest_confirmed_root.unwrap_or(root) + && descendants[slot].contains(&root)) + }); + datapoint_debug!( + "bank_forks_purge_non_root", + ("num_banks_retained", self.banks.len(), i64), + ); } pub fn set_snapshot_config(&mut self, snapshot_config: Option) {