diff --git a/src/fullnode.rs b/src/fullnode.rs
index e6419e835a..be48e79464 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -102,6 +102,7 @@ pub struct Fullnode {
     rotation_receiver: TvuRotationReceiver,
     blocktree: Arc<Blocktree>,
     leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+    bank_forks: Arc<RwLock<BankForks>>,
 }
 
 impl Fullnode {
@@ -252,6 +253,7 @@ impl Fullnode {
             rotation_receiver,
             blocktree,
             leader_scheduler,
+            bank_forks,
         }
     }
 
@@ -278,7 +280,7 @@
             None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here...
         };
         self.node_services.tpu.switch_to_leader(
-            Arc::new(rotation_info.bank),
+            self.bank_forks.read().unwrap().working_bank(),
             PohServiceConfig::default(),
             self.tpu_sockets
                 .iter()
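
Reviewer note: `switch_to_leader` now pulls its bank straight from the shared `BankForks` instead of receiving an owned copy. A minimal sketch of the `BankForks` surface this diff leans on, assuming a map of bank id to `Arc<Bank>`; the field names and the `Bank` stub are illustrative, not the crate's real layout:

    // Hedged sketch: roughly how BankForks is assumed to behave here.
    use std::collections::HashMap;
    use std::sync::Arc;

    struct Bank; // stand-in for the crate's Bank

    #[derive(Default)]
    struct BankForks {
        banks: HashMap<u64, Arc<Bank>>, // illustrative field name
        working_bank_id: u64,           // illustrative field name
    }

    impl BankForks {
        fn insert(&mut self, bank_id: u64, bank: Bank) {
            self.banks.insert(bank_id, Arc::new(bank));
        }
        fn set_working_bank_id(&mut self, bank_id: u64) {
            if self.banks.contains_key(&bank_id) {
                self.working_bank_id = bank_id;
            }
        }
        // Cloning the Arc is cheap, so callers such as Tpu::switch_to_leader
        // get their own handle to the same live bank.
        fn working_bank(&self) -> Arc<Bank> {
            self.banks[&self.working_bank_id].clone()
        }
    }

    fn main() {
        let mut forks = BankForks::default();
        forks.insert(0, Bank);
        forks.set_working_bank_id(0);
        let a = forks.working_bank();
        let b = forks.working_bank();
        assert!(Arc::ptr_eq(&a, &b)); // both handles point at the same bank
    }

Because `working_bank()` hands out a clone of the `Arc`, the TPU and ReplayStage share one live bank rather than diverging copies.
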
diff --git a/src/replay_stage.rs b/src/replay_stage.rs
index 6f79aab2fd..f233a05952 100644
--- a/src/replay_stage.rs
+++ b/src/replay_stage.rs
@@ -191,59 +191,65 @@ impl ReplayStage {
         let to_leader_sender = to_leader_sender.clone();
         let subscriptions_ = subscriptions.clone();
 
-        let (bank, last_entry_id, mut current_blob_index) = {
+        // Gather up all the metadata about the current state of the ledger
+        let (mut bank, tick_height, last_entry_id, mut current_blob_index) = {
             let mut bank_forks = bank_forks.write().unwrap();
             bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
+            let bank = bank_forks.working_bank();
+            let tick_height = bank.tick_height();
             (
-                bank_forks.working_bank(),
+                bank,
+                tick_height,
                 bank_forks_info[0].last_entry_id,
                 bank_forks_info[0].entry_height,
             )
         };
+
         let last_entry_id = Arc::new(RwLock::new(last_entry_id));
 
-        {
+        // Update Tpu and other fullnode components with the current bank
+        let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = {
             let leader_scheduler = leader_scheduler.read().unwrap();
-            let slot = leader_scheduler.tick_height_to_slot(bank.tick_height() + 1);
+            let slot = leader_scheduler.tick_height_to_slot(tick_height + 1);
+            let first_tick_in_slot = slot * bank.ticks_per_slot();
+
             let leader_id = leader_scheduler
                 .get_leader_for_slot(slot)
                 .expect("Leader not known after processing bank");
             trace!("node {:?} scheduled as leader for slot {}", leader_id, slot,);
 
+            let old_bank = bank.clone();
+            // If the next slot is going to be a new slot and we're the leader for that slot,
+            // create a new bank and set it as the working bank.
+            if tick_height + 1 == first_tick_in_slot && leader_id == my_id {
+                bank = Self::create_and_set_working_bank(slot, &bank_forks, &old_bank);
+            }
+
             // Send a rotation notification back to Fullnode to initialize the TPU to the right
-            // state
+            // state. After this point, bank.tick_height() is live, which means it can
+            // be updated by the TPU
             to_leader_sender
                 .send(TvuRotationInfo {
-                    bank: Bank::new_from_parent(&bank),
+                    bank: old_bank,
                     last_entry_id: *last_entry_id.read().unwrap(),
                     slot,
                     leader_id,
                 })
                 .unwrap();
-        }
+            let max_tick_height_for_slot =
+                first_tick_in_slot + leader_scheduler.num_ticks_left_in_slot(first_tick_in_slot);
+
+            (Some(slot), leader_id, max_tick_height_for_slot)
+        };
+
+        // Start the replay stage loop
+        let bank_forks = bank_forks.clone();
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
             .spawn(move || {
                 let _exit = Finalizer::new(exit_.clone());
-                let mut last_leader_id = leader_scheduler_
-                    .read()
-                    .unwrap()
-                    .get_leader_for_tick(bank.tick_height() + 1)
-                    .unwrap();
                 let mut prev_slot = None;
-                let (mut current_slot, mut max_tick_height_for_slot) = {
-                    let tick_height = bank.tick_height();
-                    let leader_scheduler = leader_scheduler_.read().unwrap();
-                    let current_slot = leader_scheduler.tick_height_to_slot(tick_height + 1);
-                    let first_tick_in_current_slot = current_slot * bank.ticks_per_slot();
-                    (
-                        Some(current_slot),
-                        first_tick_in_current_slot
-                            + leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot),
-                    )
-                };
 
                 // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each
                 // relevant slot to see if there are any available updates
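
Reviewer note: the slot arithmetic above deserves a worked example. A small, self-contained sketch of the tick/slot bookkeeping, assuming tick 0 opens slot 0 and every slot spans exactly `ticks_per_slot` ticks; the real code routes these queries through `LeaderScheduler`:

    // Hedged sketch of the tick/slot arithmetic used in the diff above.
    fn tick_height_to_slot(tick_height: u64, ticks_per_slot: u64) -> u64 {
        tick_height / ticks_per_slot
    }

    fn first_tick_in_slot(slot: u64, ticks_per_slot: u64) -> u64 {
        slot * ticks_per_slot
    }

    // Last tick of a slot: one tick short of the next slot's first tick.
    fn max_tick_height_for_slot(slot: u64, ticks_per_slot: u64) -> u64 {
        (slot + 1) * ticks_per_slot - 1
    }

    fn main() {
        let ticks_per_slot = 8;
        // A bank that has just completed tick 7 sits on the boundary:
        // tick 8 is the first tick of slot 1.
        assert_eq!(tick_height_to_slot(7 + 1, ticks_per_slot), 1);
        assert_eq!(first_tick_in_slot(1, ticks_per_slot), 8);
        assert_eq!(max_tick_height_for_slot(1, ticks_per_slot), 15);
        println!("slot boundaries check out");
    }

This is why `tick_height + 1 == first_tick_in_slot` detects "we are about to start a fresh slot" and, combined with the leader check, gates the creation of a new working bank.
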
@@ -266,20 +272,35 @@ impl ReplayStage {
                             prev_slot.expect("prev_slot must exist"),
                         );
                         if new_slot.is_some() {
+                            trace!("{} replay_stage: new_slot found: {:?}", my_id, new_slot);
                             // Reset the state
+                            bank = Self::create_and_set_working_bank(
+                                new_slot.unwrap(),
+                                &bank_forks,
+                                &bank,
+                            );
                             current_slot = new_slot;
-                            current_blob_index = 0;
-                            let leader_scheduler = leader_scheduler_.read().unwrap();
-                            let first_tick_in_current_slot =
-                                current_slot.unwrap() * bank.ticks_per_slot();
-                            max_tick_height_for_slot = first_tick_in_current_slot
-                                + leader_scheduler
-                                    .num_ticks_left_in_slot(first_tick_in_current_slot);
+                            Self::reset_state(
+                                bank.ticks_per_slot(),
+                                current_slot.unwrap(),
+                                &mut max_tick_height_for_slot,
+                                &mut current_blob_index,
+                            );
+                        } else {
+                            continue;
                         }
                     }
 
+                    // current_slot must be Some(x) by this point
+                    let slot = current_slot.unwrap();
+
+                    // Fetch the next entries from the database
                     let entries = {
-                        if let Some(slot) = current_slot {
+                        if current_leader_id != my_id {
+                            info!(
+                                "{} replay_stage: asking for entries from slot: {}, bi: {}",
+                                my_id, slot, current_blob_index
+                            );
                             if let Ok(entries) = blocktree.get_slot_entries(
                                 slot,
                                 current_blob_index,
@@ -294,7 +315,6 @@
                         }
                     };
 
-                    // Fetch the next entries from the database
                     if !entries.is_empty() {
                         if let Err(e) = Self::process_entries(
                             entries,
@@ -307,45 +327,87 @@
                             &leader_scheduler_,
                             &subscriptions_,
                         ) {
-                            error!("process_entries failed: {:?}", e);
+                            error!("{} process_entries failed: {:?}", my_id, e);
                         }
+                    }
 
-                        let current_tick_height = bank.tick_height();
+                    let current_tick_height = bank.tick_height();
 
-                        // We've reached the end of a slot, reset our state and check
-                        // for leader rotation
-                        if max_tick_height_for_slot == current_tick_height {
-                            // Check for leader rotation
-                            let (leader_id, next_slot) = {
-                                let leader_scheduler = leader_scheduler_.read().unwrap();
-                                (
-                                    leader_scheduler
-                                        .get_leader_for_tick(current_tick_height + 1)
-                                        .unwrap(),
-                                    leader_scheduler.tick_height_to_slot(current_tick_height + 1),
-                                )
-                            };
+                    // We've reached the end of a slot, reset our state and check
+                    // for leader rotation
+                    if max_tick_height_for_slot == current_tick_height {
+                        // TODO: replace this with generating an actual leader schedule
+                        // from the bank
+                        leader_scheduler_
+                            .write()
+                            .unwrap()
+                            .update_tick_height(current_tick_height, &bank);
+
+                        // Check for leader rotation
+                        let (leader_id, next_slot) = {
+                            let leader_scheduler = leader_scheduler_.read().unwrap();
+                            (
+                                leader_scheduler
+                                    .get_leader_for_tick(current_tick_height + 1)
+                                    .unwrap(),
+                                leader_scheduler.tick_height_to_slot(current_tick_height + 1),
+                            )
+                        };
 
-                            if my_id == leader_id || my_id == last_leader_id {
-                                to_leader_sender
-                                    .send(TvuRotationInfo {
-                                        bank: Bank::new_from_parent(&bank),
-                                        last_entry_id: *last_entry_id.read().unwrap(),
-                                        slot: next_slot,
-                                        leader_id,
-                                    })
-                                    .unwrap();
-                            } else if leader_id != last_leader_id {
-                                // TODO: Remove this soon once we boot the leader from ClusterInfo
-                                cluster_info.write().unwrap().set_leader(leader_id);
+                        // If we were the leader for the last slot, update the last_entry_id,
+                        // because we haven't processed any of the entries for the slot for
+                        // which we were the leader
+                        if current_leader_id == my_id {
+                            let meta = blocktree.meta(slot).unwrap().expect("meta has to exist");
+                            if meta.last_index == std::u64::MAX {
+                                // Ledger hasn't received the last blob yet; wait
+                                // for a signal
+                                continue;
                             }
-
-                            // Check for any slots that chain to this one
-                            prev_slot = current_slot;
-                            current_slot = None;
-                            last_leader_id = leader_id;
-                            continue;
+                            let last_entry = blocktree
+                                .get_slot_entries(slot, meta.last_index, Some(1))
+                                .unwrap();
+                            *(last_entry_id.write().unwrap()) = last_entry[0].id;
                         }
+
+                        let old_bank = bank.clone();
+                        prev_slot = current_slot;
+                        if my_id == leader_id {
+                            // Create a new bank for the next slot if we are the leader for it
+                            bank = Self::create_and_set_working_bank(
+                                next_slot,
+                                &bank_forks,
+                                &old_bank,
+                            );
+                            current_slot = Some(next_slot);
+                            Self::reset_state(
+                                bank.ticks_per_slot(),
+                                next_slot,
+                                &mut max_tick_height_for_slot,
+                                &mut current_blob_index,
+                            );
+                        } else {
+                            current_slot = None;
+                        }
+
+                        if leader_id != current_leader_id {
+                            // TODO: Remove this soon once we boot the leader from ClusterInfo
+                            cluster_info.write().unwrap().set_leader(leader_id);
+                        }
+
+                        // Always send a rotation signal so that other services like
+                        // RPC can be made aware of the last slot's bank
+                        to_leader_sender
+                            .send(TvuRotationInfo {
+                                bank: old_bank,
+                                last_entry_id: *last_entry_id.read().unwrap(),
+                                slot: next_slot,
+                                leader_id,
+                            })
+                            .unwrap();
+
+                        // Check for any slots that chain to this one
+                        current_leader_id = leader_id;
+                        continue;
                     }
                 }
             })
@@ -363,6 +425,28 @@ impl ReplayStage {
         self.exit.store(true, Ordering::Relaxed);
     }
 
+    fn create_and_set_working_bank(
+        slot: u64,
+        bank_forks: &Arc<RwLock<BankForks>>,
+        parent: &Arc<Bank>,
+    ) -> Arc<Bank> {
+        let new_bank = Bank::new_from_parent(&parent);
+        let mut bank_forks = bank_forks.write().unwrap();
+        bank_forks.insert(slot, new_bank);
+        bank_forks.set_working_bank_id(slot);
+        bank_forks.working_bank()
+    }
+
+    fn reset_state(
+        ticks_per_slot: u64,
+        slot: u64,
+        max_tick_height_for_slot: &mut u64,
+        current_blob_index: &mut u64,
+    ) {
+        *current_blob_index = 0;
+        *max_tick_height_for_slot = (slot + 1) * ticks_per_slot - 1;
+    }
+
     fn get_next_slot(blocktree: &Blocktree, slot_index: u64) -> Option<u64> {
         // Find the next slot that chains to the old slot
         let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
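
Reviewer note: the closed form in `reset_state` is equivalent to the `first_tick_in_slot + num_ticks_left_in_slot(first_tick_in_slot)` computation used before the loop, provided `num_ticks_left_in_slot(t)` counts the ticks remaining in `t`'s slot after `t` itself. A quick check under that assumption; the helper below is a guess at the scheduler's semantics, not a quote from it:

    // Hedged check that reset_state's closed form matches the earlier
    // scheduler-based computation. Assumes num_ticks_left_in_slot(t)
    // excludes tick t itself; that is an assumption about LeaderScheduler.
    fn num_ticks_left_in_slot(tick: u64, ticks_per_slot: u64) -> u64 {
        ticks_per_slot - (tick % ticks_per_slot) - 1
    }

    fn main() {
        let ticks_per_slot = 8;
        for slot in 0..4u64 {
            let first_tick = slot * ticks_per_slot;
            let via_scheduler =
                first_tick + num_ticks_left_in_slot(first_tick, ticks_per_slot);
            let closed_form = (slot + 1) * ticks_per_slot - 1;
            assert_eq!(via_scheduler, closed_form);
        }
        println!("reset_state closed form agrees");
    }

If the assumption about `num_ticks_left_in_slot` is wrong, the two code paths would disagree at slot boundaries, so it is worth confirming against the scheduler before merging.
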
diff --git a/src/tvu.rs b/src/tvu.rs
index 440f4e8834..44d4dae94a 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -35,7 +35,7 @@ use std::sync::{Arc, RwLock};
 use std::thread;
 
 pub struct TvuRotationInfo {
-    pub bank: Bank,          // Bank to use
+    pub bank: Arc<Bank>,     // Bank to use
     pub last_entry_id: Hash, // last_entry_id of that bank
     pub slot: u64,           // slot height to initiate a rotation
     pub leader_id: Pubkey,   // leader upon rotation
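
Reviewer note: switching `bank` from an owned `Bank` to `Arc<Bank>` means a rotation message now shares the exact bank tracked by `BankForks` instead of carrying a detached copy, which is what allows the replay_stage comment to say `bank.tick_height()` stays live after the send. A hedged sketch of the handoff; the types and channel names here are stand-ins:

    // Hedged sketch: sending an Arc<Bank> clones the handle, not the Bank,
    // so sender and receiver observe the same live instance.
    use std::sync::{mpsc::channel, Arc};

    struct Bank; // stand-in for the crate's Bank

    struct TvuRotationInfo {
        bank: Arc<Bank>,
        slot: u64,
    }

    fn main() {
        let (to_leader_sender, rotation_receiver) = channel::<TvuRotationInfo>();
        let bank = Arc::new(Bank);
        // Sending clones the Arc; both sides share one bank.
        to_leader_sender
            .send(TvuRotationInfo { bank: bank.clone(), slot: 1 })
            .unwrap();
        let info = rotation_receiver.recv().unwrap();
        assert_eq!(info.slot, 1);
        assert!(Arc::ptr_eq(&bank, &info.bank));
    }
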