Prevent Requests/Caching of leader schedules for epochs beyond confirmed roots (#4033)
automerge
This commit is contained in:
		| @@ -266,7 +266,7 @@ impl BankingStage { | ||||
|         let poh = poh_recorder.lock().unwrap(); | ||||
|         let leader_id = match poh.bank() { | ||||
|             Some(bank) => leader_schedule_cache | ||||
|                 .slot_leader_at_else_compute(bank.slot() + 1, &bank) | ||||
|                 .slot_leader_at(bank.slot() + 1, Some(&bank)) | ||||
|                 .unwrap_or_default(), | ||||
|             None => { | ||||
|                 if poh.would_be_leader(DEFAULT_TICKS_PER_SLOT) { | ||||
|   | ||||
| @@ -131,7 +131,7 @@ pub fn process_blocktree( | ||||
|         vec![(slot, meta, bank, entry_height, last_entry_hash)] | ||||
|     }; | ||||
|  | ||||
|     let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule()); | ||||
|     let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), 0); | ||||
|  | ||||
|     let mut fork_info = vec![]; | ||||
|     let mut last_status_report = Instant::now(); | ||||
| @@ -188,6 +188,7 @@ pub fn process_blocktree( | ||||
|         bank.freeze(); // all banks handled by this routine are created from complete slots | ||||
|  | ||||
|         if blocktree.is_root(slot) { | ||||
|             leader_schedule_cache.set_root(slot); | ||||
|             bank.squash(); | ||||
|             pending_slots.clear(); | ||||
|             fork_info.clear(); | ||||
| @@ -219,7 +220,7 @@ pub fn process_blocktree( | ||||
|                 let next_bank = Arc::new(Bank::new_from_parent( | ||||
|                     &bank, | ||||
|                     &leader_schedule_cache | ||||
|                         .slot_leader_at_else_compute(next_slot, &bank) | ||||
|                         .slot_leader_at(next_slot, Some(&bank)) | ||||
|                         .unwrap(), | ||||
|                     next_slot, | ||||
|                 )); | ||||
|   | ||||
| @@ -15,41 +15,34 @@ pub struct LeaderScheduleCache { | ||||
|     // Map from an epoch to a leader schedule for that epoch | ||||
|     pub cached_schedules: RwLock<CachedSchedules>, | ||||
|     epoch_schedule: EpochSchedule, | ||||
|     max_epoch: RwLock<u64>, | ||||
| } | ||||
|  | ||||
| impl LeaderScheduleCache { | ||||
|     pub fn new_from_bank(bank: &Bank) -> Self { | ||||
|         Self::new(*bank.epoch_schedule()) | ||||
|         Self::new(*bank.epoch_schedule(), bank.slot()) | ||||
|     } | ||||
|  | ||||
|     pub fn new(epoch_schedule: EpochSchedule) -> Self { | ||||
|         Self { | ||||
|     pub fn new(epoch_schedule: EpochSchedule, root: u64) -> Self { | ||||
|         let cache = Self { | ||||
|             cached_schedules: RwLock::new((HashMap::new(), VecDeque::new())), | ||||
|             epoch_schedule, | ||||
|         } | ||||
|             max_epoch: RwLock::new(0), | ||||
|         }; | ||||
|  | ||||
|         cache.set_root(root); | ||||
|         cache | ||||
|     } | ||||
|  | ||||
|     pub fn slot_leader_at(&self, slot: u64) -> Option<Pubkey> { | ||||
|         let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot); | ||||
|         self.cached_schedules | ||||
|             .read() | ||||
|             .unwrap() | ||||
|             .0 | ||||
|             .get(&epoch) | ||||
|             .map(|schedule| schedule[slot_index]) | ||||
|     pub fn set_root(&self, root: u64) { | ||||
|         *self.max_epoch.write().unwrap() = self.epoch_schedule.get_stakers_epoch(root); | ||||
|     } | ||||
|  | ||||
|     pub fn slot_leader_at_else_compute(&self, slot: u64, bank: &Bank) -> Option<Pubkey> { | ||||
|         let cache_result = self.slot_leader_at(slot); | ||||
|         if cache_result.is_some() { | ||||
|             cache_result | ||||
|     pub fn slot_leader_at(&self, slot: u64, bank: Option<&Bank>) -> Option<Pubkey> { | ||||
|         if let Some(bank) = bank { | ||||
|             self.slot_leader_at_else_compute(slot, bank) | ||||
|         } else { | ||||
|             let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); | ||||
|             if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) { | ||||
|                 Some(epoch_schedule[slot_index]) | ||||
|             } else { | ||||
|                 None | ||||
|             } | ||||
|             self.slot_leader_at_no_compute(slot) | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -94,6 +87,39 @@ impl LeaderScheduleCache { | ||||
|         None | ||||
|     } | ||||
|  | ||||
|     fn slot_leader_at_no_compute(&self, slot: u64) -> Option<Pubkey> { | ||||
|         let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot); | ||||
|         self.cached_schedules | ||||
|             .read() | ||||
|             .unwrap() | ||||
|             .0 | ||||
|             .get(&epoch) | ||||
|             .map(|schedule| schedule[slot_index]) | ||||
|     } | ||||
|  | ||||
|     fn slot_leader_at_else_compute(&self, slot: u64, bank: &Bank) -> Option<Pubkey> { | ||||
|         let cache_result = self.slot_leader_at_no_compute(slot); | ||||
|         // Forbid asking for slots in an unconfirmed epoch | ||||
|         let bank_epoch = self.epoch_schedule.get_epoch_and_slot_index(slot).0; | ||||
|         if bank_epoch > *self.max_epoch.read().unwrap() { | ||||
|             error!( | ||||
|                 "Requested leader in slot: {} of unconfirmed epoch: {}", | ||||
|                 slot, bank_epoch | ||||
|             ); | ||||
|             return None; | ||||
|         } | ||||
|         if cache_result.is_some() { | ||||
|             cache_result | ||||
|         } else { | ||||
|             let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); | ||||
|             if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) { | ||||
|                 Some(epoch_schedule[slot_index]) | ||||
|             } else { | ||||
|                 None | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn get_epoch_schedule_else_compute( | ||||
|         &self, | ||||
|         epoch: u64, | ||||
| @@ -150,19 +176,17 @@ mod tests { | ||||
|     use crate::blocktree::get_tmp_ledger_path; | ||||
|  | ||||
|     #[test] | ||||
|     fn test_slot_leader_at_else_compute() { | ||||
|     fn test_slot_leader_at() { | ||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||
|         let bank = Bank::new(&genesis_block); | ||||
|         let cache = LeaderScheduleCache::new_from_bank(&bank); | ||||
|  | ||||
|         // Nothing in the cache, should return None | ||||
|         assert!(cache.slot_leader_at(bank.slot()).is_none()); | ||||
|         assert!(cache.slot_leader_at(bank.slot(), None).is_none()); | ||||
|  | ||||
|         // Add something to the cache | ||||
|         assert!(cache | ||||
|             .slot_leader_at_else_compute(bank.slot(), &bank) | ||||
|             .is_some()); | ||||
|         assert!(cache.slot_leader_at(bank.slot()).is_some()); | ||||
|         assert!(cache.slot_leader_at(bank.slot(), Some(&bank)).is_some()); | ||||
|         assert!(cache.slot_leader_at(bank.slot(), None).is_some()); | ||||
|         assert_eq!(cache.cached_schedules.read().unwrap().0.len(), 1); | ||||
|     } | ||||
|  | ||||
| @@ -195,9 +219,9 @@ mod tests { | ||||
|     fn run_thread_race() { | ||||
|         let slots_per_epoch = MINIMUM_SLOT_LENGTH as u64; | ||||
|         let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true); | ||||
|         let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule)); | ||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||
|         let bank = Arc::new(Bank::new(&genesis_block)); | ||||
|         let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule, bank.slot())); | ||||
|  | ||||
|         let num_threads = 10; | ||||
|         let (threads, senders): (Vec<_>, Vec<_>) = (0..num_threads) | ||||
| @@ -210,7 +234,7 @@ mod tests { | ||||
|                         .name("test_thread_race_leader_schedule_cache".to_string()) | ||||
|                         .spawn(move || { | ||||
|                             let _ = receiver.recv(); | ||||
|                             cache.slot_leader_at_else_compute(bank.slot(), &bank); | ||||
|                             cache.slot_leader_at(bank.slot(), Some(&bank)); | ||||
|                         }) | ||||
|                         .unwrap(), | ||||
|                     sender, | ||||
| @@ -246,9 +270,7 @@ mod tests { | ||||
|         let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); | ||||
|  | ||||
|         assert_eq!( | ||||
|             cache | ||||
|                 .slot_leader_at_else_compute(bank.slot(), &bank) | ||||
|                 .unwrap(), | ||||
|             cache.slot_leader_at(bank.slot(), Some(&bank)).unwrap(), | ||||
|             pubkey | ||||
|         ); | ||||
|         assert_eq!(cache.next_leader_slot(&pubkey, 0, &bank, None), Some(1)); | ||||
| @@ -294,9 +316,7 @@ mod tests { | ||||
|             ); | ||||
|  | ||||
|             assert_eq!( | ||||
|                 cache | ||||
|                     .slot_leader_at_else_compute(bank.slot(), &bank) | ||||
|                     .unwrap(), | ||||
|                 cache.slot_leader_at(bank.slot(), Some(&bank)).unwrap(), | ||||
|                 pubkey | ||||
|             ); | ||||
|             // Check that the next leader slot after 0 is slot 1 | ||||
| @@ -400,4 +420,35 @@ mod tests { | ||||
|             Some(expected_slot), | ||||
|         ); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_schedule_for_unconfirmed_epoch() { | ||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||
|         let bank = Arc::new(Bank::new(&genesis_block)); | ||||
|         let cache = LeaderScheduleCache::new_from_bank(&bank); | ||||
|  | ||||
|         assert_eq!(*cache.max_epoch.read().unwrap(), 1); | ||||
|  | ||||
|         // Asking for the leader for the last slot in epoch 1 is ok b/c | ||||
|         // epoch 1 is confirmed | ||||
|         assert_eq!(bank.get_epoch_and_slot_index(95).0, 1); | ||||
|         assert!(cache.slot_leader_at(95, Some(&bank)).is_some()); | ||||
|  | ||||
|         // Asking for the lader for the first slot in epoch 2 is not ok | ||||
|         // b/c epoch 2 is unconfirmed | ||||
|         assert_eq!(bank.get_epoch_and_slot_index(96).0, 2); | ||||
|         assert!(cache.slot_leader_at(96, Some(&bank)).is_none()); | ||||
|  | ||||
|         let bank2 = Bank::new_from_parent(&bank, &Pubkey::new_rand(), 95); | ||||
|         assert!(bank2.epoch_vote_accounts(2).is_some()); | ||||
|  | ||||
|         // Set root for a slot in epoch 1, so that epoch 2 is now confirmed | ||||
|         cache.set_root(95); | ||||
|         assert_eq!(*cache.max_epoch.read().unwrap(), 2); | ||||
|         assert!(cache.slot_leader_at(96, Some(&bank2)).is_some()); | ||||
|         assert_eq!(bank2.get_epoch_and_slot_index(223).0, 2); | ||||
|         assert!(cache.slot_leader_at(223, Some(&bank2)).is_some()); | ||||
|         assert_eq!(bank2.get_epoch_and_slot_index(224).0, 3); | ||||
|         assert!(cache.slot_leader_at(224, Some(&bank2)).is_none()); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -157,6 +157,7 @@ impl ReplayStage { | ||||
|                             &vote_account, | ||||
|                             &cluster_info, | ||||
|                             &blocktree, | ||||
|                             &leader_schedule_cache, | ||||
|                         )?; | ||||
|  | ||||
|                         Self::reset_poh_recorder( | ||||
| @@ -235,7 +236,7 @@ impl ReplayStage { | ||||
|             }; | ||||
|             assert!(parent.is_frozen()); | ||||
|  | ||||
|             leader_schedule_cache.slot_leader_at_else_compute(poh_slot, &parent) | ||||
|             leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) | ||||
|                 .map(|next_leader| { | ||||
|                     debug!( | ||||
|                         "me: {} leader {} at poh slot {}", | ||||
| @@ -308,12 +309,14 @@ impl ReplayStage { | ||||
|         vote_account_pubkey: &Pubkey, | ||||
|         cluster_info: &Arc<RwLock<ClusterInfo>>, | ||||
|         blocktree: &Arc<Blocktree>, | ||||
|         leader_schedule_cache: &Arc<LeaderScheduleCache>, | ||||
|     ) -> Result<()> | ||||
|     where | ||||
|         T: 'static + KeypairUtil + Send + Sync, | ||||
|     { | ||||
|         if let Some(new_root) = locktower.record_vote(bank.slot()) { | ||||
|             bank_forks.write().unwrap().set_root(new_root); | ||||
|             leader_schedule_cache.set_root(new_root); | ||||
|             blocktree.set_root(new_root)?; | ||||
|             Self::handle_new_root(&bank_forks, progress); | ||||
|         } | ||||
| @@ -593,7 +596,7 @@ impl ReplayStage { | ||||
|                     continue; | ||||
|                 } | ||||
|                 let leader = leader_schedule_cache | ||||
|                     .slot_leader_at_else_compute(child_id, &parent_bank) | ||||
|                     .slot_leader_at(child_id, Some(&parent_bank)) | ||||
|                     .unwrap(); | ||||
|                 info!("new fork:{} parent:{}", child_id, parent_id); | ||||
|                 forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_id)); | ||||
|   | ||||
| @@ -51,7 +51,7 @@ fn retransmit( | ||||
|     ); | ||||
|     for blob in &blobs { | ||||
|         let leader = leader_schedule_cache | ||||
|             .slot_leader_at_else_compute(blob.read().unwrap().slot(), r_bank.as_ref()); | ||||
|             .slot_leader_at(blob.read().unwrap().slot(), Some(r_bank.as_ref())); | ||||
|         if blob.read().unwrap().meta.forward { | ||||
|             ClusterInfo::retransmit_to(&cluster_info, &neighbors, blob, leader, sock, true)?; | ||||
|             ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, false)?; | ||||
|   | ||||
| @@ -85,10 +85,10 @@ fn should_retransmit_and_persist( | ||||
|     my_id: &Pubkey, | ||||
| ) -> bool { | ||||
|     let slot_leader_id = match bank { | ||||
|         None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot())), | ||||
|         None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot(), None)), | ||||
|         Some(bank) => match leader_schedule_cache { | ||||
|             None => slot_leader_at(blob.slot(), &bank), | ||||
|             Some(cache) => cache.slot_leader_at_else_compute(blob.slot(), bank), | ||||
|             Some(cache) => cache.slot_leader_at(blob.slot(), Some(bank)), | ||||
|         }, | ||||
|     }; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user