From 6ed2e4c187b58e744753056c5acd960cd5deefbd Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Wed, 20 Feb 2019 15:42:35 -0800 Subject: [PATCH] process_blocktree now loads forks --- src/blocktree_processor.rs | 337 ++++++++++++++++++++++--------------- src/fullnode.rs | 19 ++- 2 files changed, 212 insertions(+), 144 deletions(-) diff --git a/src/blocktree_processor.rs b/src/blocktree_processor.rs index ab25f1c527..e3bcf9990b 100644 --- a/src/blocktree_processor.rs +++ b/src/blocktree_processor.rs @@ -2,7 +2,6 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::entry::{Entry, EntrySlice}; use crate::leader_scheduler::LeaderScheduler; -use itertools::Itertools; use log::Level; use rayon::prelude::*; use solana_metrics::counter::Counter; @@ -14,8 +13,6 @@ use solana_sdk::timing::MAX_ENTRY_IDS; use std::sync::{Arc, RwLock}; use std::time::Instant; -pub const VERIFY_BLOCK_SIZE: usize = 16; - pub fn process_entry(bank: &Bank, entry: &Entry) -> (Result<()>, u64) { if !entry.is_tick() { let old_results = bank.process_transactions(&entry.transactions); @@ -174,81 +171,128 @@ fn process_block( Ok(()) } -/// Starting from the genesis block, append the provided entries to the ledger verifying them -/// along the way. -fn process_ledger( - bank: &Bank, - entries: I, - leader_scheduler: &Arc>, -) -> Result<(u64, Hash)> -where - I: IntoIterator, -{ - let mut last_entry_id = bank.last_id(); - let mut entries_iter = entries.into_iter(); - - trace!("genesis last_id={}", last_entry_id); - - // The first entry in the ledger is a pseudo-tick used only to ensure the number of ticks - // in slot 0 is the same as the number of ticks in all subsequent slots. It is not - // registered as a tick and thus cannot be used as a last_id - let entry0 = entries_iter - .next() - .ok_or(BankError::LedgerVerificationFailed)?; - if !(entry0.is_tick() && entry0.verify(&last_entry_id)) { - warn!("Ledger proof of history failed at entry0"); - return Err(BankError::LedgerVerificationFailed); - } - last_entry_id = entry0.id; - let mut entry_height = 1; - - // Ledger verification needs to be parallelized, but we can't pull the whole - // thing into memory. We therefore chunk it. - for block in &entries_iter.chunks(VERIFY_BLOCK_SIZE) { - let block: Vec<_> = block.collect(); - - if !block.verify(&last_entry_id) { - warn!("Ledger proof of history failed at entry: {}", entry_height); - return Err(BankError::LedgerVerificationFailed); - } - - process_block(bank, &block, leader_scheduler)?; - - last_entry_id = block.last().unwrap().id; - entry_height += block.len() as u64; - } - Ok((entry_height, last_entry_id)) +#[derive(Debug, PartialEq)] +pub struct BankForksInfo { + pub bank_id: u64, + pub entry_height: u64, + pub last_entry_id: Hash, } pub fn process_blocktree( genesis_block: &GenesisBlock, blocktree: &Blocktree, leader_scheduler: &Arc>, -) -> Result<(BankForks, u64, Hash)> { - let bank = Bank::new(&genesis_block); - let slot_height = 0; // Use the Bank's slot_height as its ID. - let bank_forks = BankForks::new(slot_height, bank); - leader_scheduler - .write() - .unwrap() - .update_tick_height(0, &bank_forks.finalized_bank()); - +) -> Result<(BankForks, Vec)> { let now = Instant::now(); info!("processing ledger..."); - let entries = blocktree.read_ledger().expect("opening ledger"); - let (entry_height, last_entry_id) = - process_ledger(&bank_forks.working_bank(), entries, leader_scheduler)?; + + // Setup bank for slot 0 + let (mut bank_forks, mut pending_slots) = { + let bank0 = Bank::new(&genesis_block); + let bank_id = 0; + let slot = 0; + let entry_height = 0; + leader_scheduler + .write() + .unwrap() + .update_tick_height(slot, &bank0); + let last_entry_id = bank0.last_id(); + + ( + BankForks::new(bank_id, bank0), + vec![(slot, bank_id, entry_height, last_entry_id)], + ) + }; + + let mut bank_forks_info = vec![]; + while !pending_slots.is_empty() { + let (slot, bank_id, mut entry_height, mut last_entry_id) = pending_slots.pop().unwrap(); + + bank_forks.set_working_bank_id(bank_id); + let bank = bank_forks.working_bank(); + + // Load the metadata for this slot + let meta = blocktree + .meta(slot) + .map_err(|err| { + warn!("Failed to load meta for slot {}: {:?}", slot, err); + BankError::LedgerVerificationFailed + })? + .unwrap(); + trace!("processing slot {:?}, meta={:?}", slot, meta); + + // Fetch all entries for this slot + let mut entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| { + warn!("Failed to load entries for slot {}: {:?}", slot, err); + BankError::LedgerVerificationFailed + })?; + + if slot == 0 { + // The first entry in the ledger is a pseudo-tick used only to ensure the number of ticks + // in slot 0 is the same as the number of ticks in all subsequent slots. It is not + // registered as a tick and thus cannot be used as a last_id + if entries.is_empty() { + warn!("entry0 not present"); + return Err(BankError::LedgerVerificationFailed); + } + let entry0 = &entries[0]; + if !(entry0.is_tick() && entry0.verify(&last_entry_id)) { + warn!("Ledger proof of history failed at entry0"); + return Err(BankError::LedgerVerificationFailed); + } + last_entry_id = entry0.id; + entry_height += 1; + entries = entries.drain(1..).collect(); + } + + // Feed the entries into the bank for this slot + if !entries.is_empty() { + if !entries.verify(&last_entry_id) { + warn!("Ledger proof of history failed at entry: {}", entry_height); + return Err(BankError::LedgerVerificationFailed); + } + + process_block(&bank, &entries, &leader_scheduler).map_err(|err| { + warn!("Failed to process entries for slot {}: {:?}", slot, err); + BankError::LedgerVerificationFailed + })?; + + last_entry_id = entries.last().unwrap().id; + entry_height += entries.len() as u64; + } + + match meta.next_slots.len() { + 0 => { + // Reached the end of this fork. Record the final entry height and last entry id + bank_forks_info.push(BankForksInfo { + bank_id, + entry_height, + last_entry_id, + }) + } + 1 => pending_slots.push((meta.next_slots[0], bank_id, entry_height, last_entry_id)), + _ => { + // This is a fork point, create a new child bank for each fork + pending_slots.extend(meta.next_slots.iter().map(|next_slot| { + let child_bank = Bank::new_from_parent(&bank); + trace!("Add child bank for slot={}", next_slot); + let child_bank_id = *next_slot; + bank_forks.insert(child_bank_id, child_bank); + (*next_slot, child_bank_id, entry_height, last_entry_id) + })); + } + } + // reverse sort by slot, so the next slot to be processed can be pop()ed + pending_slots.sort_by(|a, b| b.0.cmp(&a.0)); + } info!( - "processed {} ledger entries in {}ms, tick_height={}...", - entry_height, + "processed ledger in {}ms, forks={}...", duration_as_ms(&now.elapsed()), - bank_forks.working_bank().tick_height() + bank_forks_info.len(), ); - // TODO: probably need to return `entry_height` and `last_entry_id` for *all* banks in - // `bank_forks` instead of just for the `working_bank` - Ok((bank_forks, entry_height, last_entry_id)) + Ok((bank_forks, bank_forks_info)) } #[cfg(test)] @@ -257,7 +301,6 @@ mod tests { use crate::blocktree::tests::entries_to_blobs; use crate::blocktree::{create_tmp_sample_ledger, BlocktreeConfig}; use crate::entry::{create_ticks, next_entry, Entry}; - use crate::leader_scheduler::LeaderSchedulerConfig; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::native_program::ProgramError; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -287,21 +330,15 @@ mod tests { let blocktree_config = &BlocktreeConfig::default(); // Create a new ledger with slot 0 full of ticks - let ( - _mint_keypair, - ledger_path, - tick_height, - _last_entry_height, - _last_id, - mut last_entry_id, - ) = create_tmp_sample_ledger( - "blocktree_with_two_forks", - 10_000, - blocktree_config.ticks_per_slot - 1, - Keypair::new().pubkey(), - 123, - &blocktree_config, - ); + let (_mint_keypair, ledger_path, tick_height, _entry_height, _last_id, mut last_entry_id) = + create_tmp_sample_ledger( + "blocktree_with_two_forks", + 10_000, + blocktree_config.ticks_per_slot - 1, + Keypair::new().pubkey(), + 123, + &blocktree_config, + ); debug!("ledger_path: {:?}", ledger_path); assert_eq!(tick_height, blocktree_config.ticks_per_slot); @@ -321,9 +358,8 @@ mod tests { */ let genesis_block = GenesisBlock::load(&ledger_path).expect("Expected to successfully open genesis block"); - let (blocktree, _ledger_signal_receiver) = - Blocktree::open_with_config_signal(&ledger_path, &blocktree_config) - .expect("Expected to successfully open database ledger"); + let blocktree = Blocktree::open_config(&ledger_path, &blocktree_config) + .expect("Expected to successfully open database ledger"); // Fork 1, ending at slot 3 let last_slot1_entry_id = @@ -350,14 +386,32 @@ mod tests { info!("last_fork1_entry_id: {:?}", last_fork1_entry_id); info!("last_fork2_entry_id: {:?}", last_fork2_entry_id); - let (bank_forks, ledger_height, last_entry_id) = + let (mut bank_forks, bank_forks_info) = process_blocktree(&genesis_block, &blocktree, &leader_scheduler).unwrap(); - // The following asserts loosely demonstrate how `process_blocktree()` currently only - // processes fork1 and ignores fork2. - assert_eq!(last_entry_id, last_fork1_entry_id); - assert_eq!(ledger_height, 4 * blocktree_config.ticks_per_slot); - assert_eq!(bank_forks.working_bank().last_id(), last_entry_id); + assert_eq!(bank_forks_info.len(), 2); // There are two forks + assert_eq!( + bank_forks_info[0], + BankForksInfo { + bank_id: 2, // Fork 1 diverged with slot 2 + entry_height: blocktree_config.ticks_per_slot * 4, + last_entry_id: last_fork1_entry_id, + } + ); + assert_eq!( + bank_forks_info[1], + BankForksInfo { + bank_id: 4, // Fork 2 diverged with slot 4 + entry_height: blocktree_config.ticks_per_slot * 3, + last_entry_id: last_fork2_entry_id, + } + ); + + // Ensure bank_forks holds the right banks + for info in bank_forks_info { + bank_forks.set_working_bank_id(info.bank_id); + assert_eq!(bank_forks.working_bank().last_id(), info.last_entry_id) + } } #[test] @@ -440,66 +494,77 @@ mod tests { assert_eq!(bank.process_transaction(&tx), Ok(())); } - // create a ledger with a tick every `tick_interval` entries and a couple other transactions - fn create_sample_block_with_ticks( - genesis_block: &GenesisBlock, - mint_keypair: &Keypair, - num_one_token_transfers: usize, - tick_interval: usize, - ) -> impl Iterator { + #[test] + fn test_process_ledger_simple() { + let blocktree_config = BlocktreeConfig::default(); + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); + + let ( + mint_keypair, + ledger_path, + tick_height, + mut entry_height, + mut last_id, + mut last_entry_id, + ) = create_tmp_sample_ledger( + "process_ledger_simple", + 100, + 0, + Keypair::new().pubkey(), + 50, + &blocktree_config, + ); + debug!("ledger_path: {:?}", ledger_path); + let genesis_block = + GenesisBlock::load(&ledger_path).expect("Expected to successfully open genesis block"); + let mut entries = vec![]; - - let mut last_id = genesis_block.last_id(); - - // Start off the ledger with the psuedo-tick linked to the genesis block - // (see entry0 in `process_ledger`) - let tick = Entry::new(&genesis_block.last_id(), 1, vec![]); - let mut hash = tick.id; - entries.push(tick); - - for i in 0..num_one_token_transfers { + for _ in 0..3 { // Transfer one token from the mint to a random account let keypair = Keypair::new(); - let tx = SystemTransaction::new_account(mint_keypair, keypair.pubkey(), 1, last_id, 0); - let entry = Entry::new(&hash, 1, vec![tx]); - hash = entry.id; + let tx = SystemTransaction::new_account(&mint_keypair, keypair.pubkey(), 1, last_id, 0); + let entry = Entry::new(&last_entry_id, 1, vec![tx]); + last_entry_id = entry.id; entries.push(entry); // Add a second Transaction that will produce a // ProgramError<0, ResultWithNegativeTokens> error when processed let keypair2 = Keypair::new(); let tx = SystemTransaction::new_account(&keypair, keypair2.pubkey(), 42, last_id, 0); - let entry = Entry::new(&hash, 1, vec![tx]); - hash = entry.id; + let entry = Entry::new(&last_entry_id, 1, vec![tx]); + last_entry_id = entry.id; entries.push(entry); - - if (i + 1) % tick_interval == 0 { - let tick = Entry::new(&hash, 1, vec![]); - hash = tick.id; - last_id = hash; - entries.push(tick); - } } - entries.into_iter() - } - #[test] - fn test_process_ledger_simple() { - let leader_id = Keypair::new().pubkey(); - let leader_scheduler_config = LeaderSchedulerConfig::new(100, 1, 1000); - let (genesis_block, mint_keypair) = GenesisBlock::new_with_leader(100, leader_id, 50); + // Add a tick for good measure + let tick = Entry::new(&last_entry_id, 1, vec![]); + last_entry_id = tick.id; + last_id = last_entry_id; + entries.push(tick); - let ledger = create_sample_block_with_ticks(&genesis_block, &mint_keypair, 3, 3); - let bank = Bank::new(&genesis_block); - assert_eq!(bank.tick_height(), 0); - assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 50); - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new_with_bank( - &leader_scheduler_config, - &bank, - ))); - let (ledger_height, last_id) = process_ledger(&bank, ledger, &leader_scheduler).unwrap(); + let blocktree = Blocktree::open_config(&ledger_path, &blocktree_config) + .expect("Expected to successfully open database ledger"); + + blocktree + .write_entries(0, tick_height, entry_height, &entries) + .unwrap(); + entry_height += entries.len() as u64; + + let (bank_forks, bank_forks_info) = + process_blocktree(&genesis_block, &blocktree, &leader_scheduler).unwrap(); + + assert_eq!(bank_forks_info.len(), 1); + assert_eq!( + bank_forks_info[0], + BankForksInfo { + bank_id: 0, + entry_height, + last_entry_id, + } + ); + + let bank = bank_forks.working_bank(); assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 50 - 3); - assert_eq!(ledger_height, 8); assert_eq!(bank.tick_height(), 1); assert_eq!(bank.last_id(), last_id); } diff --git a/src/fullnode.rs b/src/fullnode.rs index 9aa6a30c0b..6598c43d66 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -1,6 +1,5 @@ //! The `fullnode` module hosts all the fullnode microservices. -use crate::bank_forks::BankForks; use crate::blocktree::{Blocktree, BlocktreeConfig}; use crate::blocktree_processor; use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; @@ -454,21 +453,25 @@ fn new_banks_from_blocktree( blocktree_path: &str, blocktree_config: &BlocktreeConfig, leader_scheduler: &Arc>, -) -> (BankForks, u64, Hash, Blocktree, Receiver) { +) -> (Arc, u64, Hash, Blocktree, Receiver) { let (blocktree, ledger_signal_receiver) = Blocktree::open_with_config_signal(blocktree_path, blocktree_config) .expect("Expected to successfully open database ledger"); let genesis_block = GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block"); - let (bank_forks, entry_height, last_entry_id) = + let (mut bank_forks, bank_forks_info) = blocktree_processor::process_blocktree(&genesis_block, &blocktree, leader_scheduler) .expect("process_blocktree failed"); + if bank_forks_info.len() != 1 { + warn!("TODO: figure out what to do with multiple bank forks"); + } + bank_forks.set_working_bank_id(bank_forks_info[0].bank_id); ( - bank_forks, - entry_height, - last_entry_id, + bank_forks.working_bank(), + bank_forks_info[0].entry_height, + bank_forks_info[0].last_entry_id, blocktree, ledger_signal_receiver, ) @@ -480,10 +483,10 @@ pub fn new_bank_from_ledger( ledger_config: &BlocktreeConfig, leader_scheduler: &Arc>, ) -> (Arc, u64, Hash, Blocktree, Receiver) { - let (bank_forks, entry_height, last_entry_id, blocktree, ledger_signal_receiver) = + let (working_bank, entry_height, last_entry_id, blocktree, ledger_signal_receiver) = new_banks_from_blocktree(ledger_path, ledger_config, leader_scheduler); ( - bank_forks.working_bank(), + working_bank, entry_height, last_entry_id, blocktree,