diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 01b199a510..a23a3648f4 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -10,6 +10,7 @@ use solana::banking_stage::{create_test_recorder, BankingStage}; use solana::blocktree::{get_tmp_ledger_path, Blocktree}; use solana::cluster_info::ClusterInfo; use solana::cluster_info::Node; +use solana::leader_schedule_cache::LeaderScheduleCache; use solana::packet::to_packets_chunked; use solana::poh_recorder::WorkingBankEntries; use solana::service::Service; @@ -57,6 +58,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); let bank = Arc::new(Bank::new(&genesis_block)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let dummy = system_transaction::transfer( &mint_keypair, &mint_keypair.pubkey(), @@ -122,6 +124,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { &poh_recorder, verified_receiver, vote_receiver, + &leader_schedule_cache, ); poh_recorder.lock().unwrap().set_bank(&bank); @@ -164,6 +167,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); let bank = Arc::new(Bank::new(&genesis_block)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let dummy = system_transaction::transfer( &mint_keypair, &mint_keypair.pubkey(), @@ -245,6 +249,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { &poh_recorder, verified_receiver, vote_receiver, + &leader_schedule_cache, ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f3a765ca7b..8eb1d9dc2d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -6,7 +6,7 @@ use crate::cluster_info::ClusterInfo; use crate::contact_info::ContactInfo; use crate::entry; use crate::entry::{hash_transactions, Entry}; -use crate::leader_schedule_utils; +use crate::leader_schedule_cache::LeaderScheduleCache; use crate::packet; use crate::packet::{Packet, Packets}; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; @@ -56,6 +56,7 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: Receiver, verified_vote_receiver: Receiver, + leader_schedule_cache: &Arc, ) -> Self { Self::new_num_threads( cluster_info, @@ -63,6 +64,7 @@ impl BankingStage { verified_receiver, verified_vote_receiver, cmp::min(2, Self::num_threads()), + leader_schedule_cache, ) } @@ -72,6 +74,7 @@ impl BankingStage { verified_receiver: Receiver, verified_vote_receiver: Receiver, num_threads: u32, + leader_schedule_cache: &Arc, ) -> Self { let verified_receiver = Arc::new(Mutex::new(verified_receiver)); let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver)); @@ -95,6 +98,7 @@ impl BankingStage { let cluster_info = cluster_info.clone(); let exit = exit.clone(); let mut recv_start = Instant::now(); + let leader_schedule_cache = leader_schedule_cache.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -104,6 +108,7 @@ impl BankingStage { &cluster_info, &mut recv_start, enable_forwarding, + leader_schedule_cache, ); exit.store(true, Ordering::Relaxed); }) @@ -238,6 +243,7 @@ impl BankingStage { fn should_buffer_packets( poh_recorder: &Arc>, cluster_info: &Arc>, + leader_schedule_cache: &Arc, ) -> bool { let rcluster_info = cluster_info.read().unwrap(); @@ -245,9 +251,9 @@ impl BankingStage { // or, if it was getting sent to me // or, the next leader is unknown let leader_id = match poh_recorder.lock().unwrap().bank() { - Some(bank) => { - leader_schedule_utils::slot_leader_at(bank.slot() + 1, &bank).unwrap_or_default() - } + Some(bank) => leader_schedule_cache + .slot_leader_at_else_compute(bank.slot() + 1, &bank) + .unwrap_or_default(), None => rcluster_info .leader_data() .map_or(rcluster_info.id(), |x| x.id), @@ -262,6 +268,7 @@ impl BankingStage { cluster_info: &Arc>, recv_start: &mut Instant, enable_forwarding: bool, + leader_schedule_cache: Arc, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; @@ -292,7 +299,11 @@ impl BankingStage { { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Ok(unprocessed_packets) => { - if Self::should_buffer_packets(poh_recorder, cluster_info) { + if Self::should_buffer_packets( + poh_recorder, + cluster_info, + &leader_schedule_cache, + ) { let num = unprocessed_packets .iter() .map(|(x, start, _)| x.packets.len().saturating_sub(*start)) @@ -618,6 +629,7 @@ pub fn create_test_recorder( bank.ticks_per_slot(), &Pubkey::default(), blocktree, + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); poh_recorder.set_bank(&bank); @@ -647,6 +659,7 @@ mod tests { fn test_banking_stage_shutdown1() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); let ledger_path = get_tmp_ledger_path!(); @@ -663,6 +676,7 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, + &leader_schedule_cache, ); drop(verified_sender); drop(vote_sender); @@ -679,6 +693,7 @@ mod tests { let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2); genesis_block.ticks_per_slot = 4; let bank = Arc::new(Bank::new(&genesis_block)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); @@ -696,6 +711,7 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, + &leader_schedule_cache, ); trace!("sending bank"); sleep(Duration::from_millis(600)); @@ -724,6 +740,7 @@ mod tests { solana_logger::setup(); let (genesis_block, mint_keypair) = GenesisBlock::new(10); let bank = Arc::new(Bank::new(&genesis_block)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); @@ -741,6 +758,7 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, + &leader_schedule_cache, ); // fund another account so we can send 2 good transactions in a single batch. @@ -862,6 +880,7 @@ mod tests { let entry_receiver = { // start a banking_stage to eat verified receiver let bank = Arc::new(Bank::new(&genesis_block)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let blocktree = Arc::new( Blocktree::open(&ledger_path) .expect("Expected to be able to open database ledger"), @@ -877,6 +896,7 @@ mod tests { verified_receiver, vote_receiver, 2, + &leader_schedule_cache, ); // wait for banking_stage to eat the packets @@ -933,6 +953,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); @@ -1040,6 +1061,7 @@ mod tests { bank.ticks_per_slot(), &pubkey, &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 7a1200e067..682b07249b 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -1,7 +1,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::entry::{Entry, EntrySlice}; -use crate::leader_schedule_utils; +use crate::leader_schedule_cache::LeaderScheduleCache; use rayon::prelude::*; use solana_metrics::counter::Counter; use solana_runtime::bank::Bank; @@ -116,10 +116,9 @@ pub fn process_blocktree( genesis_block: &GenesisBlock, blocktree: &Blocktree, account_paths: Option, -) -> result::Result<(BankForks, Vec), BlocktreeProcessorError> { +) -> result::Result<(BankForks, Vec, LeaderScheduleCache), BlocktreeProcessorError> { let now = Instant::now(); info!("processing ledger..."); - // Setup bank for slot 0 let mut pending_slots = { let slot = 0; @@ -139,6 +138,8 @@ pub fn process_blocktree( vec![(slot, meta, bank, entry_height, last_entry_hash)] }; + let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule()); + let mut fork_info = vec![]; while !pending_slots.is_empty() { let (slot, meta, bank, mut entry_height, mut last_entry_hash) = @@ -216,7 +217,9 @@ pub fn process_blocktree( if next_meta.is_full() { let next_bank = Arc::new(Bank::new_from_parent( &bank, - &leader_schedule_utils::slot_leader_at(next_slot, &bank).unwrap(), + &leader_schedule_cache + .slot_leader_at_else_compute(next_slot, &bank) + .unwrap(), next_slot, )); trace!("Add child bank for slot={}", next_slot); @@ -250,7 +253,7 @@ pub fn process_blocktree( bank_forks_info.len(), ); - Ok((bank_forks, bank_forks_info)) + Ok((bank_forks, bank_forks_info, leader_schedule_cache)) } #[cfg(test)] @@ -327,7 +330,7 @@ mod tests { // slot 2, points at slot 1 fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 2, 1, blockhash); - let (mut _bank_forks, bank_forks_info) = + let (mut _bank_forks, bank_forks_info, _) = process_blocktree(&genesis_block, &blocktree, None).unwrap(); assert_eq!(bank_forks_info.len(), 1); @@ -387,7 +390,7 @@ mod tests { blocktree.set_root(0).unwrap(); blocktree.set_root(1).unwrap(); - let (bank_forks, bank_forks_info) = + let (bank_forks, bank_forks_info, _) = process_blocktree(&genesis_block, &blocktree, None).unwrap(); assert_eq!(bank_forks_info.len(), 2); // There are two forks @@ -537,7 +540,7 @@ mod tests { .write_entries(1, 0, 0, genesis_block.ticks_per_slot, &entries) .unwrap(); let entry_height = genesis_block.ticks_per_slot + entries.len() as u64; - let (bank_forks, bank_forks_info) = + let (bank_forks, bank_forks_info, _) = process_blocktree(&genesis_block, &blocktree, None).unwrap(); assert_eq!(bank_forks_info.len(), 1); @@ -562,7 +565,7 @@ mod tests { let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); let blocktree = Blocktree::open(&ledger_path).unwrap(); - let (bank_forks, bank_forks_info) = + let (bank_forks, bank_forks_info, _) = process_blocktree(&genesis_block, &blocktree, None).unwrap(); assert_eq!(bank_forks_info.len(), 1); diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index c6947ba9ed..ec55a6d49f 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -9,8 +9,7 @@ use crate::entry::create_ticks; use crate::entry::next_entry_mut; use crate::entry::Entry; use crate::gossip_service::{discover_nodes, GossipService}; -use crate::leader_schedule_utils; -use crate::poh_recorder::PohRecorder; +use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_service::{PohService, PohServiceConfig}; use crate::rpc::JsonRpcConfig; use crate::rpc_pubsub_service::PubSubService; @@ -20,6 +19,8 @@ use crate::service::Service; use crate::storage_stage::StorageState; use crate::tpu::Tpu; use crate::tvu::{Sockets, Tvu}; + +use crate::poh_recorder::PohRecorder; use solana_metrics::counter::Counter; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; @@ -94,9 +95,10 @@ impl Fullnode { let id = keypair.pubkey(); assert_eq!(id, node.info.id); - let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = + let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) = new_banks_from_blocktree(ledger_path, config.account_paths.clone()); + let leader_schedule_cache = Arc::new(leader_schedule_cache); let exit = Arc::new(AtomicBool::new(false)); let bank_info = &bank_forks_info[0]; let bank = bank_forks[bank_info.bank_slot].clone(); @@ -107,15 +109,17 @@ impl Fullnode { bank.last_blockhash(), ); let blocktree = Arc::new(blocktree); + let (poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( bank.tick_height(), bank.last_blockhash(), bank.slot(), - leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank, Some(&blocktree)), + leader_schedule_cache.next_leader_slot(&id, bank.slot(), &bank, Some(&blocktree)), bank.ticks_per_slot(), &id, &blocktree, blocktree.new_blobs_signals.first().cloned(), + &leader_schedule_cache, ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); @@ -231,6 +235,7 @@ impl Fullnode { &poh_recorder, sender.clone(), receiver, + &leader_schedule_cache, &exit, ); let tpu = Tpu::new( @@ -244,6 +249,7 @@ impl Fullnode { config.sigverify_disabled, &blocktree, sender, + &leader_schedule_cache, &exit, ); @@ -275,14 +281,20 @@ impl Fullnode { pub fn new_banks_from_blocktree( blocktree_path: &str, account_paths: Option, -) -> (BankForks, Vec, Blocktree, Receiver) { +) -> ( + BankForks, + Vec, + Blocktree, + Receiver, + LeaderScheduleCache, +) { let genesis_block = GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block"); let (blocktree, ledger_signal_receiver) = Blocktree::open_with_signal(blocktree_path) .expect("Expected to successfully open database ledger"); - let (bank_forks, bank_forks_info) = + let (bank_forks, bank_forks_info, leader_schedule_cache) = blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) .expect("process_blocktree failed"); @@ -291,6 +303,7 @@ pub fn new_banks_from_blocktree( bank_forks_info, blocktree, ledger_signal_receiver, + leader_schedule_cache, ) } diff --git a/core/src/leader_schedule.rs b/core/src/leader_schedule.rs index 3ac5514a92..c637f845d8 100644 --- a/core/src/leader_schedule.rs +++ b/core/src/leader_schedule.rs @@ -5,7 +5,7 @@ use solana_sdk::pubkey::Pubkey; use std::ops::Index; /// Stake-weighted leader schedule for one epoch. -#[derive(Debug, PartialEq)] +#[derive(Debug, Default, PartialEq)] pub struct LeaderSchedule { slot_leaders: Vec, } diff --git a/core/src/leader_schedule_cache.rs b/core/src/leader_schedule_cache.rs new file mode 100644 index 0000000000..79b3e542e8 --- /dev/null +++ b/core/src/leader_schedule_cache.rs @@ -0,0 +1,405 @@ +use crate::blocktree::Blocktree; +use crate::leader_schedule::LeaderSchedule; +use crate::leader_schedule_utils; +use solana_runtime::bank::{Bank, EpochSchedule}; +use solana_sdk::pubkey::Pubkey; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, RwLock}; + +type CachedSchedules = (HashMap>, VecDeque); +const MAX_SCHEDULES: usize = 10; + +#[derive(Default)] +pub struct LeaderScheduleCache { + // Map from an epoch to a leader schedule for that epoch + pub cached_schedules: RwLock, + epoch_schedule: EpochSchedule, +} + +impl LeaderScheduleCache { + pub fn new_from_bank(bank: &Bank) -> Self { + Self::new(*bank.epoch_schedule()) + } + + pub fn new(epoch_schedule: EpochSchedule) -> Self { + Self { + cached_schedules: RwLock::new((HashMap::new(), VecDeque::new())), + epoch_schedule, + } + } + + pub fn slot_leader_at(&self, slot: u64) -> Option { + let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot); + self.cached_schedules + .read() + .unwrap() + .0 + .get(&epoch) + .map(|schedule| schedule[slot_index]) + } + + pub fn slot_leader_at_else_compute(&self, slot: u64, bank: &Bank) -> Option { + let cache_result = self.slot_leader_at(slot); + if cache_result.is_some() { + cache_result + } else { + let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); + if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) { + Some(epoch_schedule[slot_index]) + } else { + None + } + } + } + + /// Return the next slot after the given current_slot that the given node will be leader + pub fn next_leader_slot( + &self, + pubkey: &Pubkey, + mut current_slot: u64, + bank: &Bank, + blocktree: Option<&Blocktree>, + ) -> Option { + let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1); + while let Some(leader_schedule) = self.get_epoch_schedule_else_compute(epoch, bank) { + // clippy thinks I should do this: + // for (i, ) in leader_schedule + // .iter() + // .enumerate() + // .take(bank.get_slots_in_epoch(epoch)) + // .skip(from_slot_index + 1) { + // + // but leader_schedule doesn't implement Iter... + #[allow(clippy::needless_range_loop)] + for i in start_index..bank.get_slots_in_epoch(epoch) { + current_slot += 1; + if *pubkey == leader_schedule[i] { + if let Some(blocktree) = blocktree { + if let Some(meta) = blocktree.meta(current_slot).unwrap() { + // We have already sent a blob for this slot, so skip it + if meta.received > 0 { + continue; + } + } + } + + return Some(current_slot); + } + } + + epoch += 1; + start_index = 0; + } + None + } + + fn get_epoch_schedule_else_compute( + &self, + epoch: u64, + bank: &Bank, + ) -> Option> { + let epoch_schedule = self.cached_schedules.read().unwrap().0.get(&epoch).cloned(); + + if epoch_schedule.is_some() { + epoch_schedule + } else if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) { + Some(epoch_schedule) + } else { + None + } + } + + fn compute_epoch_schedule(&self, epoch: u64, bank: &Bank) -> Option> { + let leader_schedule = leader_schedule_utils::leader_schedule(epoch, bank); + leader_schedule.map(|leader_schedule| { + let leader_schedule = Arc::new(leader_schedule); + let (ref mut cached_schedules, ref mut order) = *self.cached_schedules.write().unwrap(); + // Check to see if schedule exists in case somebody already inserted in the time we were + // waiting for the lock + let entry = cached_schedules.entry(epoch); + if let Entry::Vacant(v) = entry { + v.insert(leader_schedule.clone()); + order.push_back(epoch); + Self::retain_latest(cached_schedules, order); + } + leader_schedule + }) + } + + fn retain_latest(schedules: &mut HashMap>, order: &mut VecDeque) { + if schedules.len() > MAX_SCHEDULES { + let first = order.pop_front().unwrap(); + schedules.remove(&first); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::blocktree::tests::make_slot_entries; + use crate::voting_keypair::tests::new_vote_account; + use solana_runtime::bank::{Bank, EpochSchedule}; + use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS}; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread::Builder; + + use crate::blocktree::get_tmp_ledger_path; + + #[test] + fn test_slot_leader_at_else_compute() { + let slots_per_epoch = 10; + let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true); + let cache = LeaderScheduleCache::new(epoch_schedule); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Bank::new(&genesis_block); + + // Nothing in the cache, should return None + assert!(cache.slot_leader_at(bank.slot()).is_none()); + + // Add something to the cache + assert!(cache + .slot_leader_at_else_compute(bank.slot(), &bank) + .is_some()); + assert!(cache.slot_leader_at(bank.slot()).is_some()); + assert_eq!(cache.cached_schedules.read().unwrap().0.len(), 1); + } + + #[test] + fn test_retain_latest() { + let mut cached_schedules = HashMap::new(); + let mut order = VecDeque::new(); + for i in 0..=MAX_SCHEDULES { + cached_schedules.insert(i as u64, Arc::new(LeaderSchedule::default())); + order.push_back(i as u64); + } + LeaderScheduleCache::retain_latest(&mut cached_schedules, &mut order); + assert_eq!(cached_schedules.len(), MAX_SCHEDULES); + let mut keys: Vec<_> = cached_schedules.keys().cloned().collect(); + keys.sort(); + let expected: Vec<_> = (1..=MAX_SCHEDULES as u64).collect(); + let expected_order: VecDeque<_> = (1..=MAX_SCHEDULES as u64).collect(); + assert_eq!(expected, keys); + assert_eq!(expected_order, order); + } + + #[test] + fn test_thread_race_leader_schedule_cache() { + let num_runs = 10; + for _ in 0..num_runs { + run_thread_race() + } + } + + fn run_thread_race() { + let slots_per_epoch = 10; + let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true); + let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule)); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + + let num_threads = 10; + let (threads, senders): (Vec<_>, Vec<_>) = (0..num_threads) + .map(|_| { + let cache = cache.clone(); + let bank = bank.clone(); + let (sender, receiver) = channel(); + ( + Builder::new() + .name("test_thread_race_leader_schedule_cache".to_string()) + .spawn(move || { + let _ = receiver.recv(); + cache.slot_leader_at_else_compute(bank.slot(), &bank); + }) + .unwrap(), + sender, + ) + }) + .unzip(); + + for sender in &senders { + sender.send(true).unwrap(); + } + + for t in threads.into_iter() { + t.join().unwrap(); + } + + let (ref cached_schedules, ref order) = *cache.cached_schedules.read().unwrap(); + assert_eq!(cached_schedules.len(), 1); + assert_eq!(order.len(), 1); + } + + #[test] + fn test_next_leader_slot() { + let pubkey = Pubkey::new_rand(); + let mut genesis_block = GenesisBlock::new_with_leader( + BOOTSTRAP_LEADER_LAMPORTS, + &pubkey, + BOOTSTRAP_LEADER_LAMPORTS, + ) + .0; + genesis_block.epoch_warmup = false; + + let bank = Bank::new(&genesis_block); + let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + + assert_eq!( + cache + .slot_leader_at_else_compute(bank.slot(), &bank) + .unwrap(), + pubkey + ); + assert_eq!(cache.next_leader_slot(&pubkey, 0, &bank, None), Some(1)); + assert_eq!(cache.next_leader_slot(&pubkey, 1, &bank, None), Some(2)); + assert_eq!( + cache.next_leader_slot( + &pubkey, + 2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2 + &bank, + None + ), + None + ); + + assert_eq!( + cache.next_leader_slot( + &Pubkey::new_rand(), // not in leader_schedule + 0, + &bank, + None + ), + None + ); + } + + #[test] + fn test_next_leader_slot_blocktree() { + let pubkey = Pubkey::new_rand(); + let mut genesis_block = GenesisBlock::new_with_leader( + BOOTSTRAP_LEADER_LAMPORTS, + &pubkey, + BOOTSTRAP_LEADER_LAMPORTS, + ) + .0; + genesis_block.epoch_warmup = false; + + let bank = Bank::new(&genesis_block); + let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + + assert_eq!( + cache + .slot_leader_at_else_compute(bank.slot(), &bank) + .unwrap(), + pubkey + ); + // Check that the next leader slot after 0 is slot 1 + assert_eq!( + cache.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), + Some(1) + ); + + // Write a blob into slot 2 that chains to slot 1, + // but slot 1 is empty so should not be skipped + let (blobs, _) = make_slot_entries(2, 1, 1); + blocktree.write_blobs(&blobs[..]).unwrap(); + assert_eq!( + cache.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), + Some(1) + ); + + // Write a blob into slot 1 + let (blobs, _) = make_slot_entries(1, 0, 1); + + // Check that slot 1 and 2 are skipped + blocktree.write_blobs(&blobs[..]).unwrap(); + assert_eq!( + cache.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), + Some(3) + ); + + // Integrity checks + assert_eq!( + cache.next_leader_slot( + &pubkey, + 2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2 + &bank, + Some(&blocktree) + ), + None + ); + + assert_eq!( + cache.next_leader_slot( + &Pubkey::new_rand(), // not in leader_schedule + 0, + &bank, + Some(&blocktree) + ), + None + ); + } + Blocktree::destroy(&ledger_path).unwrap(); + } + + #[test] + fn test_next_leader_slot_next_epoch() { + let pubkey = Pubkey::new_rand(); + let (mut genesis_block, mint_keypair) = GenesisBlock::new_with_leader( + 2 * BOOTSTRAP_LEADER_LAMPORTS, + &pubkey, + BOOTSTRAP_LEADER_LAMPORTS, + ); + genesis_block.epoch_warmup = false; + + let bank = Bank::new(&genesis_block); + let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let delegate_id = Pubkey::new_rand(); + + // Create new vote account + let new_voting_keypair = Keypair::new(); + new_vote_account( + &mint_keypair, + &new_voting_keypair, + &delegate_id, + &bank, + BOOTSTRAP_LEADER_LAMPORTS, + ); + + // Have to wait until the epoch at after the epoch stakes generated at genesis + // for the new votes to take effect. + let mut target_slot = 1; + let epoch = bank.get_stakers_epoch(0); + while bank.get_stakers_epoch(target_slot) == epoch { + target_slot += 1; + } + + let bank = Bank::new_from_parent(&Arc::new(bank), &Pubkey::default(), target_slot); + let mut expected_slot = 0; + let epoch = bank.get_stakers_epoch(target_slot); + for i in 0..epoch { + expected_slot += bank.get_slots_in_epoch(i); + } + + let schedule = cache.compute_epoch_schedule(epoch, &bank).unwrap(); + let mut index = 0; + while schedule[index] != delegate_id { + index += 1 + } + + expected_slot += index; + + assert_eq!( + cache.next_leader_slot(&delegate_id, 0, &bank, None), + Some(expected_slot), + ); + } +} diff --git a/core/src/leader_schedule_utils.rs b/core/src/leader_schedule_utils.rs index f8011186a7..1c0bf90eb4 100644 --- a/core/src/leader_schedule_utils.rs +++ b/core/src/leader_schedule_utils.rs @@ -1,4 +1,3 @@ -use crate::blocktree::Blocktree; use crate::leader_schedule::LeaderSchedule; use crate::staking_utils; use solana_runtime::bank::Bank; @@ -6,7 +5,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::NUM_CONSECUTIVE_LEADER_SLOTS; /// Return the leader schedule for the given epoch. -fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option { +pub fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option { staking_utils::delegated_stakes_at_epoch(bank, epoch_height).map(|stakes| { let mut seed = [0u8; 32]; seed[0..8].copy_from_slice(&epoch_height.to_le_bytes()); @@ -21,6 +20,23 @@ fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option { }) } +/// Return the leader for the given slot. +pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option { + let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); + + leader_schedule(epoch, bank).map(|leader_schedule| leader_schedule[slot_index]) +} + +// Returns the number of ticks remaining from the specified tick_height to the end of the +// slot implied by the tick_height +pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 { + bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1 +} + +pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 { + tick_height / ticks_per_slot +} + fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { // Sort first by stake. If stakes are the same, sort by pubkey to ensure a // deterministic result. @@ -37,229 +53,11 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { stakes.dedup(); } -/// Return the leader for the given slot. -pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option { - let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); - - leader_schedule(epoch, bank).map(|leader_schedule| leader_schedule[slot_index]) -} - -/// Return the next slot after the given current_slot that the given node will be leader -pub fn next_leader_slot( - pubkey: &Pubkey, - mut current_slot: u64, - bank: &Bank, - blocktree: Option<&Blocktree>, -) -> Option { - let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1); - while let Some(leader_schedule) = leader_schedule(epoch, bank) { - // clippy thinks I should do this: - // for (i, ) in leader_schedule - // .iter() - // .enumerate() - // .take(bank.get_slots_in_epoch(epoch)) - // .skip(from_slot_index + 1) { - // - // but leader_schedule doesn't implement Iter... - #[allow(clippy::needless_range_loop)] - for i in start_index..bank.get_slots_in_epoch(epoch) { - current_slot += 1; - if *pubkey == leader_schedule[i] { - if let Some(blocktree) = blocktree { - if let Some(meta) = blocktree.meta(current_slot).unwrap() { - // We have already sent a blob for this slot, so skip it - if meta.received > 0 { - continue; - } - } - } - - return Some(current_slot); - } - } - - epoch += 1; - start_index = 0; - } - None -} - -// Returns the number of ticks remaining from the specified tick_height to the end of the -// slot implied by the tick_height -pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 { - bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1 -} - -pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 { - tick_height / ticks_per_slot -} - #[cfg(test)] mod tests { use super::*; - use crate::blocktree::get_tmp_ledger_path; - use crate::blocktree::tests::make_slot_entries; use crate::staking_utils; - use crate::voting_keypair::tests::new_vote_account; use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS}; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use std::sync::Arc; - - #[test] - fn test_next_leader_slot() { - let pubkey = Pubkey::new_rand(); - let mut genesis_block = GenesisBlock::new_with_leader( - BOOTSTRAP_LEADER_LAMPORTS, - &pubkey, - BOOTSTRAP_LEADER_LAMPORTS, - ) - .0; - genesis_block.epoch_warmup = false; - - let bank = Bank::new(&genesis_block); - assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey); - assert_eq!(next_leader_slot(&pubkey, 0, &bank, None), Some(1)); - assert_eq!(next_leader_slot(&pubkey, 1, &bank, None), Some(2)); - assert_eq!( - next_leader_slot( - &pubkey, - 2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2 - &bank, - None - ), - None - ); - - assert_eq!( - next_leader_slot( - &Pubkey::new_rand(), // not in leader_schedule - 0, - &bank, - None - ), - None - ); - } - - #[test] - fn test_next_leader_slot_blocktree() { - let pubkey = Pubkey::new_rand(); - let mut genesis_block = GenesisBlock::new_with_leader( - BOOTSTRAP_LEADER_LAMPORTS, - &pubkey, - BOOTSTRAP_LEADER_LAMPORTS, - ) - .0; - genesis_block.epoch_warmup = false; - - let bank = Bank::new(&genesis_block); - let ledger_path = get_tmp_ledger_path!(); - { - let blocktree = Arc::new( - Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), - ); - - assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey); - // Check that the next leader slot after 0 is slot 1 - assert_eq!( - next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), - Some(1) - ); - - // Write a blob into slot 2 that chains to slot 1, - // but slot 1 is empty so should not be skipped - let (blobs, _) = make_slot_entries(2, 1, 1); - blocktree.write_blobs(&blobs[..]).unwrap(); - assert_eq!( - next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), - Some(1) - ); - - // Write a blob into slot 1 - let (blobs, _) = make_slot_entries(1, 0, 1); - - // Check that slot 1 and 2 are skipped - blocktree.write_blobs(&blobs[..]).unwrap(); - assert_eq!( - next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), - Some(3) - ); - - // Integrity checks - assert_eq!( - next_leader_slot( - &pubkey, - 2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2 - &bank, - Some(&blocktree) - ), - None - ); - - assert_eq!( - next_leader_slot( - &Pubkey::new_rand(), // not in leader_schedule - 0, - &bank, - Some(&blocktree) - ), - None - ); - } - Blocktree::destroy(&ledger_path).unwrap(); - } - - #[test] - fn test_next_leader_slot_next_epoch() { - let pubkey = Pubkey::new_rand(); - let (mut genesis_block, mint_keypair) = GenesisBlock::new_with_leader( - 2 * BOOTSTRAP_LEADER_LAMPORTS, - &pubkey, - BOOTSTRAP_LEADER_LAMPORTS, - ); - genesis_block.epoch_warmup = false; - - let bank = Bank::new(&genesis_block); - let delegate_id = Pubkey::new_rand(); - - // Create new vote account - let new_voting_keypair = Keypair::new(); - new_vote_account( - &mint_keypair, - &new_voting_keypair, - &delegate_id, - &bank, - BOOTSTRAP_LEADER_LAMPORTS, - ); - - // Have to wait until the epoch at after the epoch stakes generated at genesis - // for the new votes to take effect. - let mut target_slot = 1; - let epoch = bank.get_stakers_epoch(0); - while bank.get_stakers_epoch(target_slot) == epoch { - target_slot += 1; - } - - let bank = Bank::new_from_parent(&Arc::new(bank), &Pubkey::default(), target_slot); - let mut expected_slot = 0; - let epoch = bank.get_stakers_epoch(target_slot); - for i in 0..epoch { - expected_slot += bank.get_slots_in_epoch(i); - } - - let schedule = leader_schedule(epoch, &bank).unwrap(); - let mut index = 0; - while schedule[index] != delegate_id { - index += 1 - } - - expected_slot += index; - - assert_eq!( - next_leader_slot(&delegate_id, 0, &bank, None), - Some(expected_slot), - ); - } #[test] fn test_leader_schedule_via_bank() { diff --git a/core/src/lib.rs b/core/src/lib.rs index c6206781f5..6a58b3340f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -37,6 +37,7 @@ pub mod fullnode; pub mod gen_keys; pub mod gossip_service; pub mod leader_schedule; +pub mod leader_schedule_cache; pub mod leader_schedule_utils; pub mod local_cluster; pub mod local_vote_signer_service; diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index df647243de..deb4d41e23 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -12,7 +12,7 @@ //! use crate::blocktree::Blocktree; use crate::entry::Entry; -use crate::leader_schedule_utils; +use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh::Poh; use crate::result::{Error, Result}; use solana_runtime::bank::Bank; @@ -53,13 +53,14 @@ pub struct PohRecorder { max_last_leader_grace_ticks: u64, id: Pubkey, blocktree: Arc, + leader_schedule_cache: Arc, } impl PohRecorder { fn clear_bank(&mut self) { if let Some(working_bank) = self.working_bank.take() { let bank = working_bank.bank; - let next_leader_slot = leader_schedule_utils::next_leader_slot( + let next_leader_slot = self.leader_schedule_cache.next_leader_slot( &self.id, bank.slot(), &bank, @@ -278,6 +279,7 @@ impl PohRecorder { id: &Pubkey, blocktree: &Arc, clear_bank_signal: Option>, + leader_schedule_cache: &Arc, ) -> (Self, Receiver) { let poh = Poh::new(last_entry_hash, tick_height); let (sender, receiver) = channel(); @@ -301,6 +303,7 @@ impl PohRecorder { max_last_leader_grace_ticks, id: *id, blocktree: blocktree.clone(), + leader_schedule_cache: leader_schedule_cache.clone(), }, receiver, ) @@ -317,6 +320,7 @@ impl PohRecorder { ticks_per_slot: u64, id: &Pubkey, blocktree: &Arc, + leader_schedule_cache: &Arc, ) -> (Self, Receiver) { Self::new_with_clear_signal( tick_height, @@ -327,6 +331,7 @@ impl PohRecorder { id, blocktree, None, + leader_schedule_cache, ) } @@ -376,6 +381,7 @@ impl PohRecorder { mod tests { use super::*; use crate::blocktree::{get_tmp_ledger_path, Blocktree}; + use crate::leader_schedule_cache::LeaderScheduleCache; use crate::test_tx::test_tx; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; @@ -399,6 +405,7 @@ mod tests { DEFAULT_TICKS_PER_SLOT, &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::default()), ); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); @@ -424,6 +431,7 @@ mod tests { DEFAULT_TICKS_PER_SLOT, &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::default()), ); poh_recorder.tick(); poh_recorder.tick(); @@ -448,6 +456,7 @@ mod tests { DEFAULT_TICKS_PER_SLOT, &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::default()), ); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); @@ -474,6 +483,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let working_bank = WorkingBank { @@ -506,6 +516,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let working_bank = WorkingBank { @@ -550,6 +561,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); poh_recorder.tick(); @@ -592,6 +604,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let working_bank = WorkingBank { @@ -628,6 +641,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let working_bank = WorkingBank { @@ -666,6 +680,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let working_bank = WorkingBank { @@ -711,6 +726,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let working_bank = WorkingBank { @@ -753,6 +769,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let working_bank = WorkingBank { @@ -786,6 +803,7 @@ mod tests { DEFAULT_TICKS_PER_SLOT, &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::default()), ); poh_recorder.tick(); poh_recorder.tick(); @@ -816,6 +834,7 @@ mod tests { DEFAULT_TICKS_PER_SLOT, &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::default()), ); poh_recorder.tick(); poh_recorder.tick(); @@ -846,6 +865,7 @@ mod tests { DEFAULT_TICKS_PER_SLOT, &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::default()), ); poh_recorder.tick(); poh_recorder.tick(); @@ -876,6 +896,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let ticks_per_slot = bank.ticks_per_slot(); let working_bank = WorkingBank { @@ -908,6 +929,7 @@ mod tests { &Pubkey::default(), &Arc::new(blocktree), Some(sender), + &Arc::new(LeaderScheduleCache::default()), ); poh_recorder.set_bank(&bank); poh_recorder.clear_bank(); @@ -936,6 +958,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let end_slot = 3; @@ -980,6 +1003,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); // Test that with no leader slot, we don't reach the leader tick diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index c994c03dc4..ffdf8cfeaf 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -98,6 +98,7 @@ impl Service for PohService { mod tests { use super::*; use crate::blocktree::{get_tmp_ledger_path, Blocktree}; + use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_recorder::WorkingBank; use crate::result::Result; use crate::test_tx::test_tx; @@ -123,6 +124,7 @@ mod tests { bank.ticks_per_slot(), &Pubkey::default(), &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let exit = Arc::new(AtomicBool::new(false)); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3a8a212763..26dbd916cd 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -5,6 +5,7 @@ use crate::blocktree::Blocktree; use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; use crate::entry::{Entry, EntrySender, EntrySlice}; +use crate::leader_schedule_cache::LeaderScheduleCache; use crate::leader_schedule_utils; use crate::locktower::{Locktower, StakeLockout}; use crate::packet::BlobError; @@ -83,6 +84,7 @@ impl ReplayStage { subscriptions: &Arc, poh_recorder: &Arc>, storage_entry_sender: EntrySender, + leader_schedule_cache: &Arc, ) -> (Self, Receiver<(u64, Pubkey)>) where T: 'static + KeypairUtil + Send + Sync, @@ -103,6 +105,7 @@ impl ReplayStage { .expect("blocktree.set_root() failed at replay_stage startup"); } // Start the replay stage loop + let leader_schedule_cache = leader_schedule_cache.clone(); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { @@ -115,7 +118,11 @@ impl ReplayStage { break; } - Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap()); + Self::generate_new_bank_forks( + &blocktree, + &mut bank_forks.write().unwrap(), + &leader_schedule_cache, + ); let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some(); @@ -158,6 +165,7 @@ impl ReplayStage { &bank, &poh_recorder, ticks_per_slot, + &leader_schedule_cache, ); is_tpu_bank_active = false; @@ -185,6 +193,7 @@ impl ReplayStage { poh_slot, reached_leader_tick, grace_ticks, + &leader_schedule_cache, ); } @@ -213,6 +222,7 @@ impl ReplayStage { poh_slot: u64, reached_leader_tick: bool, grace_ticks: u64, + leader_schedule_cache: &Arc, ) { trace!("{} checking poh slot {}", my_id, poh_slot); if bank_forks.read().unwrap().get(poh_slot).is_none() { @@ -225,7 +235,7 @@ impl ReplayStage { }; assert!(parent.is_frozen()); - leader_schedule_utils::slot_leader_at(poh_slot, &parent) + leader_schedule_cache.slot_leader_at_else_compute(poh_slot, &parent) .map(|next_leader| { debug!( "me: {} leader {} at poh slot {}", @@ -327,9 +337,10 @@ impl ReplayStage { bank: &Arc, poh_recorder: &Arc>, ticks_per_slot: u64, + leader_schedule_cache: &Arc, ) { let next_leader_slot = - leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank, Some(blocktree)); + leader_schedule_cache.next_leader_slot(&my_id, bank.slot(), &bank, Some(blocktree)); poh_recorder.lock().unwrap().reset( bank.tick_height(), bank.last_blockhash(), @@ -557,7 +568,11 @@ impl ReplayStage { } } - fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) { + fn generate_new_bank_forks( + blocktree: &Blocktree, + forks: &mut BankForks, + leader_schedule_cache: &Arc, + ) { // Find the next slot that chains to the old slot let frozen_banks = forks.frozen_banks(); let frozen_bank_slots: Vec = frozen_banks.keys().cloned().collect(); @@ -577,7 +592,9 @@ impl ReplayStage { trace!("child already active or frozen {}", child_id); continue; } - let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank).unwrap(); + let leader = leader_schedule_cache + .slot_leader_at_else_compute(child_id, &parent_bank) + .unwrap(); info!("new fork:{} parent:{}", child_id, parent_id); forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_id)); } @@ -636,10 +653,10 @@ mod test { // Set up the replay stage { let voting_keypair = Arc::new(Keypair::new()); - let (bank_forks, _bank_forks_info, blocktree, l_receiver) = + let (bank_forks, _bank_forks_info, blocktree, l_receiver, leader_schedule_cache) = new_banks_from_blocktree(&my_ledger_path, None); let bank = bank_forks.working_bank(); - + let leader_schedule_cache = Arc::new(leader_schedule_cache); let blocktree = Arc::new(blocktree); let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank, &blocktree); @@ -656,6 +673,7 @@ mod test { &Arc::new(RpcSubscriptions::default()), &poh_recorder, ledger_writer_sender, + &leader_schedule_cache, ); let vote_ix = vote_instruction::vote(&voting_keypair.pubkey(), vec![Vote::new(0)]); let vote_tx = Transaction::new_signed_instructions( @@ -754,6 +772,7 @@ mod test { let genesis_block = GenesisBlock::new(10_000).0; let bank0 = Bank::new(&genesis_block); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let mut bank_forks = BankForks::new(0, bank0); bank_forks.working_bank().freeze(); @@ -763,7 +782,11 @@ mod test { blob_slot_1.set_parent(0); blocktree.insert_data_blobs(&vec![blob_slot_1]).unwrap(); assert!(bank_forks.get(1).is_none()); - ReplayStage::generate_new_bank_forks(&blocktree, &mut bank_forks); + ReplayStage::generate_new_bank_forks( + &blocktree, + &mut bank_forks, + &leader_schedule_cache, + ); assert!(bank_forks.get(1).is_some()); // Insert blob for slot 3, generate new forks, check result @@ -772,7 +795,11 @@ mod test { blob_slot_2.set_parent(0); blocktree.insert_data_blobs(&vec![blob_slot_2]).unwrap(); assert!(bank_forks.get(2).is_none()); - ReplayStage::generate_new_bank_forks(&blocktree, &mut bank_forks); + ReplayStage::generate_new_bank_forks( + &blocktree, + &mut bank_forks, + &leader_schedule_cache, + ); assert!(bank_forks.get(1).is_some()); assert!(bank_forks.get(2).is_some()); } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 92a3749ea4..d2cbb706b8 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -8,6 +8,7 @@ use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::entry::EntrySender; use crate::fetch_stage::FetchStage; +use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; @@ -39,6 +40,7 @@ impl Tpu { sigverify_disabled: bool, blocktree: &Arc, storage_entry_sender: EntrySender, + leader_schedule_cache: &Arc, exit: &Arc, ) -> Self { cluster_info.write().unwrap().set_leader(id); @@ -69,6 +71,7 @@ impl Tpu { poh_recorder, verified_receiver, verified_vote_receiver, + leader_schedule_cache, ); let broadcast_stage = BroadcastStage::new( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 59081ab173..2df8235227 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -19,6 +19,7 @@ use crate::blocktree::Blocktree; use crate::blocktree_processor::BankForksInfo; use crate::cluster_info::ClusterInfo; use crate::entry::{EntryReceiver, EntrySender}; +use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_recorder::PohRecorder; use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; @@ -71,6 +72,7 @@ impl Tvu { poh_recorder: &Arc>, storage_entry_sender: EntrySender, storage_entry_receiver: EntryReceiver, + leader_schedule_cache: &Arc, exit: &Arc, ) -> Self where @@ -121,6 +123,7 @@ impl Tvu { subscriptions, poh_recorder, storage_entry_sender, + leader_schedule_cache, ); let blockstream_service = if blockstream.is_some() { @@ -214,6 +217,7 @@ pub mod tests { create_test_recorder(&bank, &blocktree); let voting_keypair = Keypair::new(); let (storage_entry_sender, storage_entry_receiver) = channel(); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), @@ -236,6 +240,7 @@ pub mod tests { &poh_recorder, storage_entry_sender, storage_entry_receiver, + &leader_schedule_cache, &exit, ); exit.store(true, Ordering::Relaxed); diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index 7946586031..0c85f4a59b 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -83,7 +83,7 @@ fn test_replay() { let tvu_addr = target1.info.tvu; - let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = + let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) = fullnode::new_banks_from_blocktree(&blocktree_path, None); let bank = bank_forks.working_bank(); assert_eq!( @@ -91,6 +91,7 @@ fn test_replay() { starting_mint_balance ); + let leader_schedule_cache = Arc::new(leader_schedule_cache); // start cluster_info1 let bank_forks = Arc::new(RwLock::new(bank_forks)); let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone()); @@ -126,6 +127,7 @@ fn test_replay() { &poh_recorder, storage_sender, storage_receiver, + &leader_schedule_cache, &exit, ); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 048a1a8cdf..b6e76cf314 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -112,7 +112,7 @@ fn main() { stdout().write_all(b"\n]}\n").expect("close array"); } ("verify", _) => match process_blocktree(&genesis_block, &blocktree, None) { - Ok((_bank_forks, bank_forks_info)) => { + Ok((_bank_forks, bank_forks_info, _)) => { println!("{:?}", bank_forks_info); } Err(err) => { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 8015539dc5..3fca1065e1 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -279,6 +279,10 @@ impl Bank { } } + pub fn epoch_schedule(&self) -> &EpochSchedule { + &self.epoch_schedule + } + /// squash the parent's state up into this Bank, /// this Bank becomes a root pub fn squash(&self) {