From ea0837973e911d01265d9f62ad326e8d82ec9760 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Tue, 26 Feb 2019 09:18:24 -0800 Subject: [PATCH] blocktree_processor to use slots as bank ids, and squash --- src/bank_forks.rs | 22 ++++-- src/blocktree_processor.rs | 138 ++++++++++++++++++++++++++----------- src/replay_stage.rs | 7 +- 3 files changed, 119 insertions(+), 48 deletions(-) diff --git a/src/bank_forks.rs b/src/bank_forks.rs index cfc38ca9f7..e71451acc5 100644 --- a/src/bank_forks.rs +++ b/src/bank_forks.rs @@ -23,8 +23,18 @@ impl BankForks { self.banks[&self.working_bank_id].clone() } + // TODO: use the bank's own ID instead of receiving a parameter pub fn insert(&mut self, bank_id: u64, bank: Bank) { - self.banks.insert(bank_id, Arc::new(bank)); + let mut bank = Arc::new(bank); + self.banks.insert(bank_id, bank.clone()); + + // TODO: this really only needs to look at the first + // parent if we're always calling insert() + // when we construct a child bank + while let Some(parent) = bank.parent() { + self.banks.remove(&parent.id()); + bank = parent; + } } pub fn set_working_bank_id(&mut self, bank_id: u64) { @@ -42,9 +52,8 @@ mod tests { #[test] fn test_bank_forks_root() { let bank = Bank::default(); - let tick_height = bank.tick_height(); let bank_forks = BankForks::new(0, bank); - assert_eq!(bank_forks.working_bank().tick_height(), tick_height); + assert_eq!(bank_forks.working_bank().tick_height(), 0); } #[test] @@ -53,9 +62,8 @@ mod tests { let mut bank_forks = BankForks::new(0, bank); let child_bank = Bank::new_from_parent(&bank_forks.working_bank()); child_bank.register_tick(&Hash::default()); - let child_bank_id = 1; - bank_forks.insert(child_bank_id, child_bank); - bank_forks.set_working_bank_id(child_bank_id); - assert_eq!(bank_forks.working_bank().tick_height(), child_bank_id); + bank_forks.insert(1, child_bank); + bank_forks.set_working_bank_id(1); + assert_eq!(bank_forks.working_bank().tick_height(), 1); } } diff --git a/src/blocktree_processor.rs b/src/blocktree_processor.rs index e98d35ab7e..270810a569 100644 --- a/src/blocktree_processor.rs +++ b/src/blocktree_processor.rs @@ -136,7 +136,6 @@ pub fn process_blocktree( // Setup bank for slot 0 let (mut bank_forks, mut pending_slots) = { let bank0 = Bank::new(&genesis_block); - let bank_id = 0; let slot = 0; let entry_height = 0; leader_scheduler @@ -146,16 +145,16 @@ pub fn process_blocktree( let last_entry_id = bank0.last_id(); ( - BankForks::new(bank_id, bank0), - vec![(slot, bank_id, entry_height, last_entry_id)], + BankForks::new(slot, bank0), + vec![(slot, entry_height, last_entry_id)], ) }; let mut bank_forks_info = vec![]; while !pending_slots.is_empty() { - let (slot, bank_id, mut entry_height, mut last_entry_id) = pending_slots.pop().unwrap(); + let (slot, mut entry_height, mut last_entry_id) = pending_slots.pop().unwrap(); - bank_forks.set_working_bank_id(bank_id); + bank_forks.set_working_bank_id(slot); let bank = bank_forks.working_bank(); // Load the metadata for this slot @@ -207,37 +206,48 @@ pub fn process_blocktree( entry_height += entries.len() as u64; } - match meta.next_slots.len() { - 0 => { - let next_blob_index = { - if meta.is_full() { - 0 - } else { - meta.consumed - } - }; - // Reached the end of this fork. Record the final entry height and last entry id - bank_forks_info.push(BankForksInfo { - bank_id, - entry_height, - last_entry_id, - next_blob_index, - }) - } - 1 => pending_slots.push((meta.next_slots[0], bank_id, entry_height, last_entry_id)), - _ => { - // This is a fork point, create a new child bank for each fork - pending_slots.extend(meta.next_slots.iter().map(|next_slot| { - let child_bank = Bank::new_from_parent(&bank); - trace!("Add child bank for slot={}", next_slot); - let child_bank_id = *next_slot; - bank_forks.insert(child_bank_id, child_bank); - (*next_slot, child_bank_id, entry_height, last_entry_id) - })); - } + let slot_complete = leader_scheduler + .read() + .unwrap() + .num_ticks_left_in_slot(bank.tick_height()) + == 0; + + if !slot_complete || meta.next_slots.is_empty() { + // Reached the end of this fork. Record the final entry height and last entry id + + bank_forks_info.push(BankForksInfo { + bank_id: slot, + entry_height, + last_entry_id, + next_blob_index: meta.consumed, + }); + + continue; + } + + if slot_complete && !meta.next_slots.is_empty() { + // reached end of slot, look for next slots + + // TODO merge with locktower, voting + bank.squash(); + + // This is a fork point, create a new child bank for each fork + pending_slots.extend(meta.next_slots.iter().map(|next_slot| { + let leader = leader_scheduler + .read() + .unwrap() + .get_leader_for_slot(*next_slot) + .unwrap(); + let child_bank = Bank::new_from_parent_and_id(&bank, leader, *next_slot); + trace!("Add child bank for slot={}", next_slot); + bank_forks.insert(*next_slot, child_bank); + (*next_slot, entry_height, last_entry_id) + })); + + // reverse sort by slot, so the next slot to be processed can be pop()ed + // TODO: remove me once leader_scheduler can hang with out-of-order slots? + pending_slots.sort_by(|a, b| b.0.cmp(&a.0)); } - // reverse sort by slot, so the next slot to be processed can be pop()ed - pending_slots.sort_by(|a, b| b.0.cmp(&a.0)); } info!( @@ -275,6 +285,56 @@ mod tests { last_entry_id } + #[test] + fn test_process_blocktree_with_incomplete_slot() { + solana_logger::setup(); + + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); + let (genesis_block, _mint_keypair) = GenesisBlock::new(10_000); + let ticks_per_slot = genesis_block.ticks_per_slot; + + // Create a new ledger with slot 0 full of ticks + let (ledger_path, tick_height, _entry_height, _last_id, last_entry_id) = + create_tmp_sample_blocktree( + "blocktree_with_incomplete_slot", + &genesis_block, + ticks_per_slot - 2, // last tick missing in slot 0 + ); + debug!("ledger_path: {:?}", ledger_path); + assert_eq!(tick_height, ticks_per_slot - 1); + + /* + Build a blocktree in the ledger with the following fork structure: + + slot 0 + | + slot 1 + + where slot 0 is incomplete + */ + + let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot) + .expect("Expected to successfully open database ledger"); + + // slot 1, points at slot 0 + let _last_slot1_entry_id = + fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 1, 0, last_entry_id); + + let (mut _bank_forks, bank_forks_info) = + process_blocktree(&genesis_block, &blocktree, &leader_scheduler).unwrap(); + + assert_eq!(bank_forks_info.len(), 1); + assert_eq!( + bank_forks_info[0], + BankForksInfo { + bank_id: 0, // never finished first slot + entry_height: ticks_per_slot - 1, + last_entry_id: last_entry_id, + next_blob_index: ticks_per_slot - 1, + } + ); + } + #[test] fn test_process_blocktree_with_two_forks() { solana_logger::setup(); @@ -332,19 +392,19 @@ mod tests { assert_eq!( bank_forks_info[0], BankForksInfo { - bank_id: 2, // Fork 1 diverged with slot 2 + bank_id: 3, // Fork 1's head is slot 3 entry_height: ticks_per_slot * 4, last_entry_id: last_fork1_entry_id, - next_blob_index: 0, + next_blob_index: ticks_per_slot, // this fork is done, but we need to look for children in replay } ); assert_eq!( bank_forks_info[1], BankForksInfo { - bank_id: 4, // Fork 2 diverged with slot 4 + bank_id: 4, // Fork 2's head is slot 4 entry_height: ticks_per_slot * 3, last_entry_id: last_fork2_entry_id, - next_blob_index: 0, + next_blob_index: ticks_per_slot, // this fork is done, but we need to look for children in replay } ); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 97a0e35a33..5116bc9af3 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -235,8 +235,11 @@ impl ReplayStage { let old_bank = bank.clone(); // If the next slot is going to be a new slot and we're the leader for that slot, // make a new working bank, set it as the working bank. - if tick_height + 1 == first_tick_in_slot && leader_id == my_id { - bank = Self::create_and_set_working_bank(slot, &bank_forks, &old_bank); + if tick_height + 1 == first_tick_in_slot { + if leader_id == my_id { + bank = Self::create_and_set_working_bank(slot, &bank_forks, &old_bank); + } + current_blob_index = 0; } // Send a rotation notification back to Fullnode to initialize the TPU to the right