diff --git a/src/bank_forks.rs b/src/bank_forks.rs index 5054b17160..6a943562b4 100644 --- a/src/bank_forks.rs +++ b/src/bank_forks.rs @@ -28,6 +28,18 @@ impl BankForks { } } + pub fn new_from_banks(initial_banks: &[Arc]) -> Self { + let mut banks = HashMap::new(); + let working_bank = initial_banks[0].clone(); + for bank in initial_banks { + banks.insert(bank.slot(), bank.clone()); + } + Self { + banks, + working_bank, + } + } + // TODO: use the bank's own ID instead of receiving a parameter? pub fn insert(&mut self, bank_id: u64, bank: Bank) { let mut bank = Arc::new(bank); diff --git a/src/blocktree.rs b/src/blocktree.rs index 7aec2151e9..e78888770a 100644 --- a/src/blocktree.rs +++ b/src/blocktree.rs @@ -1290,7 +1290,7 @@ impl Iterator for EntryIterator { // Creates a new ledger with slot 0 full of ticks (and only ticks). // -// Returns the last_id that can be used to start slot 1 entries with. +// Returns the last_id that can be used to append entries with. pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Result { let ticks_per_slot = genesis_block.ticks_per_slot; Blocktree::destroy(ledger_path)?; @@ -1298,7 +1298,7 @@ pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Res // Fill slot 0 with ticks that link back to the genesis_block to bootstrap the ledger. let blocktree = Blocktree::open_config(ledger_path, ticks_per_slot)?; - let entries = crate::entry::create_ticks(genesis_block.ticks_per_slot, genesis_block.last_id()); + let entries = crate::entry::create_ticks(ticks_per_slot, genesis_block.last_id()); blocktree.write_entries(0, 0, 0, &entries)?; Ok(entries.last().unwrap().id) diff --git a/src/blocktree_processor.rs b/src/blocktree_processor.rs index a5c111b8f6..b813feaf08 100644 --- a/src/blocktree_processor.rs +++ b/src/blocktree_processor.rs @@ -9,6 +9,7 @@ use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::MAX_ENTRY_IDS; +use std::sync::Arc; use std::time::Instant; pub fn process_entry(bank: &Bank, entry: &Entry) -> Result<()> { @@ -113,23 +114,25 @@ pub fn process_blocktree( info!("processing ledger..."); // Setup bank for slot 0 - let (mut bank_forks, mut pending_slots) = { - let bank = Bank::new_with_paths(&genesis_block, account_paths); + let mut pending_slots = { let slot = 0; + let bank = Arc::new(Bank::new_with_paths(&genesis_block, account_paths)); let entry_height = 0; let last_entry_id = bank.last_id(); - - ( - BankForks::new(slot, bank), - vec![(slot, entry_height, last_entry_id)], - ) + vec![(slot, bank, entry_height, last_entry_id)] }; - let mut bank_forks_info = vec![]; + let mut fork_info = vec![]; while !pending_slots.is_empty() { - let (slot, mut entry_height, mut last_entry_id) = pending_slots.pop().unwrap(); + let (slot, starting_bank, starting_entry_height, mut last_entry_id) = + pending_slots.pop().unwrap(); - let bank = bank_forks[slot].clone(); + let bank = Arc::new(Bank::new_from_parent_and_id( + &starting_bank, + leader_schedule_utils::slot_leader_at(slot, &starting_bank), + starting_bank.slot(), + )); + let mut entry_height = starting_entry_height; // Load the metadata for this slot let meta = blocktree @@ -181,32 +184,50 @@ pub fn process_blocktree( let slot_complete = leader_schedule_utils::num_ticks_left_in_slot(&bank, bank.tick_height()) == 0; + if !slot_complete { + // Slot was not complete, clear out any partial entries + // TODO: Walk |meta.next_slots| and clear all child slots too? + blocktree.reset_slot_consumed(slot).map_err(|err| { + warn!("Failed to reset partial slot {}: {:?}", slot, err); + BankError::LedgerVerificationFailed + })?; - if !slot_complete || meta.next_slots.is_empty() { - // Reached the end of this fork. Record the final entry height and last entry id - - bank_forks_info.push(BankForksInfo { + let bfi = BankForksInfo { bank_id: slot, - entry_height, - last_entry_id, - next_blob_index: meta.consumed, - }); - + entry_height: starting_entry_height, + last_entry_id: starting_bank.last_id(), + next_blob_index: 0, + }; + fork_info.push((starting_bank, bfi)); continue; } - // reached end of slot, look for next slots - // TODO merge with locktower, voting bank.squash(); + if meta.next_slots.is_empty() { + // Reached the end of this fork. Record the final entry height and last entry id + + let bfi = BankForksInfo { + bank_id: slot, + entry_height, + last_entry_id: bank.last_id(), + next_blob_index: meta.consumed, + }; + fork_info.push((bank, bfi)); + continue; + } + // This is a fork point, create a new child bank for each fork pending_slots.extend(meta.next_slots.iter().map(|next_slot| { - let leader = leader_schedule_utils::slot_leader_at(*next_slot, &bank); - let child_bank = Bank::new_from_parent_and_id(&bank, leader, *next_slot); + let child_bank = Arc::new(Bank::new_from_parent_and_id( + &bank, + leader_schedule_utils::slot_leader_at(*next_slot, &bank), + *next_slot, + )); trace!("Add child bank for slot={}", next_slot); - bank_forks.insert(*next_slot, child_bank); - (*next_slot, entry_height, last_entry_id) + // bank_forks.insert(*next_slot, child_bank); + (*next_slot, child_bank, entry_height, last_entry_id) })); // reverse sort by slot, so the next slot to be processed can be pop()ed @@ -214,6 +235,8 @@ pub fn process_blocktree( pending_slots.sort_by(|a, b| b.0.cmp(&a.0)); } + let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip(); + let bank_forks = BankForks::new_from_banks(&banks); info!( "processed ledger in {}ms, forks={}...", duration_as_ms(&now.elapsed()), @@ -275,7 +298,7 @@ mod tests { let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot) .expect("Expected to successfully open database ledger"); - let expected_last_entry_id; + let expected_last_entry_id = last_id; // Write slot 1 // slot 1, points at slot 0. Missing one tick @@ -286,7 +309,6 @@ mod tests { last_id = entries.last().unwrap().id; entries.pop(); - expected_last_entry_id = entries.last().unwrap().id; let blobs = entries_to_blobs(&entries, slot, parent_slot); blocktree.insert_data_blobs(blobs.iter()).unwrap(); @@ -303,9 +325,9 @@ mod tests { bank_forks_info[0], BankForksInfo { bank_id: 1, // never finished first slot - entry_height: 2 * ticks_per_slot - 1, + entry_height: ticks_per_slot, last_entry_id: expected_last_entry_id, - next_blob_index: ticks_per_slot - 1, + next_blob_index: 0, } ); } @@ -379,7 +401,7 @@ mod tests { // Ensure bank_forks holds the right banks for info in bank_forks_info { - assert_eq!(bank_forks[info.bank_id].last_id(), info.last_entry_id) + assert_eq!(bank_forks[info.bank_id].slot(), info.bank_id); } } @@ -488,6 +510,28 @@ mod tests { assert_eq!(bank.last_id(), entries.last().unwrap().id); } + #[test] + fn test_process_ledger_with_one_tick_per_slot() { + let (mut genesis_block, _mint_keypair) = GenesisBlock::new(123); + genesis_block.ticks_per_slot = 1; + let (ledger_path, _last_id) = create_new_tmp_ledger!(&genesis_block); + + let blocktree = Blocktree::open(&ledger_path).unwrap(); + let (bank_forks, bank_forks_info) = + process_blocktree(&genesis_block, &blocktree, None).unwrap(); + + assert_eq!(bank_forks_info.len(), 1); + assert_eq!( + bank_forks_info[0], + BankForksInfo { + bank_id: 0, + entry_height: 1, + } + ); + let bank = bank_forks[0].clone(); + assert_eq!(bank.tick_height(), 0); + } + #[test] fn test_par_process_entries_tick() { let (genesis_block, _mint_keypair) = GenesisBlock::new(1000); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index f838a34eb0..6368b00c0c 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -195,6 +195,7 @@ impl ReplayStage { bank_forks_info[0].next_blob_index, ) }; + assert_eq!(bank.last_id(), last_entry_id); // TODO: remove last_entry_id, this assert proves it's unnecessary // Update Tpu and other fullnode components with the current bank let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = { @@ -213,6 +214,7 @@ impl ReplayStage { } current_blob_index = 0; } + assert_eq!(current_blob_index, 0); // TODO: remove next_blob_index, this assert proves it's unnecessary // Send a rotation notification back to Fullnode to initialize the TPU to the right // state. After this point, the bank.tick_height() is live, which it means it can diff --git a/src/tvu.rs b/src/tvu.rs index 37066ee153..82ef5effb2 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -203,7 +203,6 @@ pub mod tests { use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; - use solana_sdk::hash::Hash; #[test] fn test_tvu_exit() { @@ -219,7 +218,7 @@ pub mod tests { let bank_forks_info = vec![BankForksInfo { bank_id: 0, entry_height: 0, - last_entry_id: Hash::default(), + last_entry_id: bank_forks.working_bank().last_id(), next_blob_index: 0, }]; diff --git a/tests/multinode.rs b/tests/multinode.rs index 4139acf977..eac0783cca 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -4,6 +4,7 @@ extern crate solana; use log::*; use solana::blob_fetch_stage::BlobFetchStage; use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree}; +use solana::blocktree_processor; use solana::client::mk_client; use solana::cluster_info::{Node, NodeInfo}; use solana::contact_info::ContactInfo; @@ -43,6 +44,68 @@ fn read_ledger(ledger_path: &str, ticks_per_slot: u64) -> Vec { .collect() } +#[test] +fn test_start_with_partial_slot_in_ledger() { + solana_logger::setup(); + + let leader_keypair = Arc::new(Keypair::new()); + + let ticks_per_slot = 4; + let (mut genesis_block, _mint_keypair) = + GenesisBlock::new_with_leader(10_000, leader_keypair.pubkey(), 500); + genesis_block.ticks_per_slot = ticks_per_slot; + + for i in 0..ticks_per_slot { + info!("Ledger will contain {} ticks in slot 1...", i); + + let (ledger_path, last_id) = create_new_tmp_ledger!(&genesis_block); + // Write `i` extra ticks into ledger to create a partially filled slot + { + let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap(); + let entries = solana::entry::create_ticks(i, last_id); + blocktree.write_entries(1, 0, 0, &entries).unwrap(); + } + + let leader = Fullnode::new( + Node::new_localhost_with_pubkey(leader_keypair.pubkey()), + &leader_keypair, + &ledger_path, + VotingKeypair::new_local(&leader_keypair), + None, + &FullnodeConfig::default(), + ); + let (rotation_sender, rotation_receiver) = channel(); + let leader_exit = leader.run(Some(rotation_sender)); + + // Wait for the fullnode to rotate twice, indicating that it was able to ingest the ledger + // and work with it + assert_eq!( + (FullnodeReturnType::LeaderToLeaderRotation, 1), + rotation_receiver.recv().unwrap() + ); + assert_eq!( + (FullnodeReturnType::LeaderToLeaderRotation, 2), + rotation_receiver.recv().unwrap() + ); + + info!("Pass"); + leader_exit(); + + // Ensure the ledger is still valid + { + let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap(); + let (_bank_forks, bank_forks_info) = + blocktree_processor::process_blocktree(&genesis_block, &blocktree, None).unwrap(); + assert_eq!(bank_forks_info.len(), 1); + + // The node processed two slots, ensure entry_height reflects that + assert!(bank_forks_info[0].entry_height >= ticks_per_slot * 2); + } + + remove_dir_all(ledger_path).unwrap(); + } +} + #[test] fn test_multi_node_ledger_window() -> result::Result<()> { solana_logger::setup(); diff --git a/tests/tvu.rs b/tests/tvu.rs index 9ddabed7d4..421cd200bb 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -19,7 +19,6 @@ use solana::tvu::{Sockets, Tvu}; use solana::voting_keypair::VotingKeypair; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; -use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; use std::fs::remove_dir_all; @@ -86,8 +85,9 @@ fn test_replay() { let ticks_per_slot = genesis_block.ticks_per_slot; let tvu_addr = target1.info.tvu; - let mut cur_hash = Hash::default(); - let bank_forks = BankForks::new(0, Bank::new(&genesis_block)); + let bank = Bank::new(&genesis_block); + let mut cur_hash = bank.last_id(); + let bank_forks = BankForks::new(0, bank); let bank_forks_info = vec![BankForksInfo { bank_id: 0, entry_height: 0,