Set epoch schedule in set_root in leader schedule cache (#4821)
This commit is contained in:
		| @@ -166,7 +166,8 @@ pub fn process_blocktree( | |||||||
|  |  | ||||||
|     blocktree.set_roots(&[0]).expect("Couldn't set first root"); |     blocktree.set_roots(&[0]).expect("Couldn't set first root"); | ||||||
|  |  | ||||||
|     let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), 0); |     let leader_schedule_cache = | ||||||
|  |         LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), &pending_slots[0].2); | ||||||
|  |  | ||||||
|     let mut fork_info = vec![]; |     let mut fork_info = vec![]; | ||||||
|     let mut last_status_report = Instant::now(); |     let mut last_status_report = Instant::now(); | ||||||
| @@ -225,7 +226,7 @@ pub fn process_blocktree( | |||||||
|  |  | ||||||
|         if blocktree.is_root(slot) { |         if blocktree.is_root(slot) { | ||||||
|             root = slot; |             root = slot; | ||||||
|             leader_schedule_cache.set_root(slot); |             leader_schedule_cache.set_root(&bank); | ||||||
|             bank.squash(); |             bank.squash(); | ||||||
|             pending_slots.clear(); |             pending_slots.clear(); | ||||||
|             fork_info.clear(); |             fork_info.clear(); | ||||||
|   | |||||||
| @@ -21,22 +21,42 @@ pub struct LeaderScheduleCache { | |||||||
|  |  | ||||||
| impl LeaderScheduleCache { | impl LeaderScheduleCache { | ||||||
|     pub fn new_from_bank(bank: &Bank) -> Self { |     pub fn new_from_bank(bank: &Bank) -> Self { | ||||||
|         Self::new(*bank.epoch_schedule(), bank.slot()) |         Self::new(*bank.epoch_schedule(), bank) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn new(epoch_schedule: EpochSchedule, root: u64) -> Self { |     pub fn new(epoch_schedule: EpochSchedule, root_bank: &Bank) -> Self { | ||||||
|         let cache = Self { |         let cache = Self { | ||||||
|             cached_schedules: RwLock::new((HashMap::new(), VecDeque::new())), |             cached_schedules: RwLock::new((HashMap::new(), VecDeque::new())), | ||||||
|             epoch_schedule, |             epoch_schedule, | ||||||
|             max_epoch: RwLock::new(0), |             max_epoch: RwLock::new(0), | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         cache.set_root(root); |         // This sets the root and calculates the schedule at stakers_epoch(root) | ||||||
|  |         cache.set_root(root_bank); | ||||||
|  |  | ||||||
|  |         // Calculate the schedule for all epochs between 0 and stakers_epoch(root) | ||||||
|  |         let stakers_epoch = epoch_schedule.get_stakers_epoch(root_bank.slot()); | ||||||
|  |         for epoch in 0..stakers_epoch { | ||||||
|  |             let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch); | ||||||
|  |             cache.slot_leader_at(first_slot_in_epoch, Some(root_bank)); | ||||||
|  |         } | ||||||
|         cache |         cache | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn set_root(&self, root: u64) { |     pub fn set_root(&self, root_bank: &Bank) { | ||||||
|         *self.max_epoch.write().unwrap() = self.epoch_schedule.get_stakers_epoch(root); |         let new_max_epoch = self.epoch_schedule.get_stakers_epoch(root_bank.slot()); | ||||||
|  |         let old_max_epoch = { | ||||||
|  |             let mut max_epoch = self.max_epoch.write().unwrap(); | ||||||
|  |             let old_max_epoch = *max_epoch; | ||||||
|  |             *max_epoch = new_max_epoch; | ||||||
|  |             assert!(new_max_epoch >= old_max_epoch); | ||||||
|  |             old_max_epoch | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         // Calculate the epoch as soon as it's rooted | ||||||
|  |         if new_max_epoch > old_max_epoch { | ||||||
|  |             self.compute_epoch_schedule(new_max_epoch, root_bank); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn slot_leader_at(&self, slot: u64, bank: Option<&Bank>) -> Option<Pubkey> { |     pub fn slot_leader_at(&self, slot: u64, bank: Option<&Bank>) -> Option<Pubkey> { | ||||||
| @@ -180,18 +200,38 @@ mod tests { | |||||||
|     use crate::blocktree::get_tmp_ledger_path; |     use crate::blocktree::get_tmp_ledger_path; | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_slot_leader_at() { |     fn test_new_cache() { | ||||||
|         let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(2); |         let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(2); | ||||||
|         let bank = Bank::new(&genesis_block); |         let bank = Bank::new(&genesis_block); | ||||||
|         let cache = LeaderScheduleCache::new_from_bank(&bank); |         let cache = LeaderScheduleCache::new_from_bank(&bank); | ||||||
|  |         assert_eq!(bank.slot(), 0); | ||||||
|  |  | ||||||
|         // Nothing in the cache, should return None |         // Epoch schedule for all epochs in the range: | ||||||
|         assert!(cache.slot_leader_at(bank.slot(), None).is_none()); |         // [0, stakers_epoch(bank.slot())] should | ||||||
|  |         // be calculated by constructor | ||||||
|  |         let epoch_schedule = bank.epoch_schedule(); | ||||||
|  |         let stakers_epoch = bank.get_stakers_epoch(bank.slot()); | ||||||
|  |         for epoch in 0..=stakers_epoch { | ||||||
|  |             let first_slot_in_stakers_epoch = epoch_schedule.get_first_slot_in_epoch(epoch); | ||||||
|  |             let last_slot_in_stakers_epoch = epoch_schedule.get_last_slot_in_epoch(epoch); | ||||||
|  |             assert!(cache | ||||||
|  |                 .slot_leader_at(first_slot_in_stakers_epoch, None) | ||||||
|  |                 .is_some()); | ||||||
|  |             assert!(cache | ||||||
|  |                 .slot_leader_at(last_slot_in_stakers_epoch, None) | ||||||
|  |                 .is_some()); | ||||||
|  |             if epoch == stakers_epoch { | ||||||
|  |                 assert!(cache | ||||||
|  |                     .slot_leader_at(last_slot_in_stakers_epoch + 1, None) | ||||||
|  |                     .is_none()); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|         // Add something to the cache |         // Should be a schedule for every epoch just checked | ||||||
|         assert!(cache.slot_leader_at(bank.slot(), Some(&bank)).is_some()); |         assert_eq!( | ||||||
|         assert!(cache.slot_leader_at(bank.slot(), None).is_some()); |             cache.cached_schedules.read().unwrap().0.len() as u64, | ||||||
|         assert_eq!(cache.cached_schedules.read().unwrap().0.len(), 1); |             stakers_epoch + 1 | ||||||
|  |         ); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
| @@ -225,7 +265,7 @@ mod tests { | |||||||
|         let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true); |         let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true); | ||||||
|         let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(2); |         let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule, bank.slot())); |         let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule, &bank)); | ||||||
|  |  | ||||||
|         let num_threads = 10; |         let num_threads = 10; | ||||||
|         let (threads, senders): (Vec<_>, Vec<_>) = (0..num_threads) |         let (threads, senders): (Vec<_>, Vec<_>) = (0..num_threads) | ||||||
| @@ -446,7 +486,7 @@ mod tests { | |||||||
|         assert!(bank2.epoch_vote_accounts(2).is_some()); |         assert!(bank2.epoch_vote_accounts(2).is_some()); | ||||||
|  |  | ||||||
|         // Set root for a slot in epoch 1, so that epoch 2 is now confirmed |         // Set root for a slot in epoch 1, so that epoch 2 is now confirmed | ||||||
|         cache.set_root(95); |         cache.set_root(&bank2); | ||||||
|         assert_eq!(*cache.max_epoch.read().unwrap(), 2); |         assert_eq!(*cache.max_epoch.read().unwrap(), 2); | ||||||
|         assert!(cache.slot_leader_at(96, Some(&bank2)).is_some()); |         assert!(cache.slot_leader_at(96, Some(&bank2)).is_some()); | ||||||
|         assert_eq!(bank2.get_epoch_and_slot_index(223).0, 2); |         assert_eq!(bank2.get_epoch_and_slot_index(223).0, 2); | ||||||
|   | |||||||
| @@ -346,7 +346,7 @@ impl ReplayStage { | |||||||
|             // Set root first in leader schedule_cache before bank_forks because bank_forks.root |             // Set root first in leader schedule_cache before bank_forks because bank_forks.root | ||||||
|             // is consumed by repair_service to update gossip, so we don't want to get blobs for |             // is consumed by repair_service to update gossip, so we don't want to get blobs for | ||||||
|             // repair on gossip before we update leader schedule, otherwise they may get dropped. |             // repair on gossip before we update leader schedule, otherwise they may get dropped. | ||||||
|             leader_schedule_cache.set_root(new_root); |             leader_schedule_cache.set_root(rooted_banks.last().unwrap()); | ||||||
|             bank_forks.write().unwrap().set_root(new_root); |             bank_forks.write().unwrap().set_root(new_root); | ||||||
|             Self::handle_new_root(&bank_forks, progress); |             Self::handle_new_root(&bank_forks, progress); | ||||||
|             root_bank_sender.send(rooted_banks)?; |             root_bank_sender.send(rooted_banks)?; | ||||||
|   | |||||||
| @@ -328,12 +328,6 @@ mod test { | |||||||
|         blob.set_id(&leader_pubkey); |         blob.set_id(&leader_pubkey); | ||||||
|         blob.sign(&leader_keypair); |         blob.sign(&leader_keypair); | ||||||
|  |  | ||||||
|         // without a Bank and blobs not from me, blob gets thrown out |  | ||||||
|         assert_eq!( |  | ||||||
|             should_retransmit_and_persist(&blob, None, &cache, &me_id), |  | ||||||
|             false |  | ||||||
|         ); |  | ||||||
|  |  | ||||||
|         // with a Bank for slot 0, blob continues |         // with a Bank for slot 0, blob continues | ||||||
|         assert_eq!( |         assert_eq!( | ||||||
|             should_retransmit_and_persist(&blob, Some(bank.clone()), &cache, &me_id), |             should_retransmit_and_persist(&blob, Some(bank.clone()), &cache, &me_id), | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user