diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index e66af7a7c9..fd001af7c2 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -820,118 +820,121 @@ fn load_frozen_forks( )?; let dev_halt_at_slot = opts.dev_halt_at_slot.unwrap_or(std::u64::MAX); - while !pending_slots.is_empty() { - let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap(); - let slot = bank.slot(); - if last_status_report.elapsed() > Duration::from_secs(2) { - let secs = last_status_report.elapsed().as_secs() as f32; - last_status_report = Instant::now(); - info!( - "processing ledger: slot={}, last root slot={} slots={} slots/s={:?} txs/s={}", + if root_bank.slot() != dev_halt_at_slot { + while !pending_slots.is_empty() { + let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap(); + let slot = bank.slot(); + if last_status_report.elapsed() > Duration::from_secs(2) { + let secs = last_status_report.elapsed().as_secs() as f32; + last_status_report = Instant::now(); + info!( + "processing ledger: slot={}, last root slot={} slots={} slots/s={:?} txs/s={}", + slot, + last_root_slot, + slots_elapsed, + slots_elapsed as f32 / secs, + txs as f32 / secs, + ); + slots_elapsed = 0; + txs = 0; + } + + let allocated = thread_mem_usage::Allocatedp::default(); + let initial_allocation = allocated.get(); + + let mut progress = ConfirmationProgress::new(last_entry_hash); + + if process_single_slot( + blockstore, + &bank, + opts, + recyclers, + &mut progress, + transaction_status_sender.clone(), + None, + ) + .is_err() + { + continue; + } + txs += progress.num_txs; + + // Block must be frozen by this point, otherwise `process_single_slot` would + // have errored above + assert!(bank.is_frozen()); + all_banks.insert(bank.slot(), bank.clone()); + + // If we've reached the last known root in blockstore, start looking + // for newer cluster confirmed roots + let new_root_bank = { + if *root == max_root { + supermajority_root_from_vote_accounts( + bank.slot(), + bank.total_epoch_stake(), + bank.vote_accounts(), + ).and_then(|supermajority_root| { + if supermajority_root > *root { + // If there's a cluster confirmed root greater than our last + // replayed root, then beccause the cluster confirmed root should + // be descended from our last root, it must exist in `all_banks` + let cluster_root_bank = all_banks.get(&supermajority_root).unwrap(); + + // cluster root must be a descendant of our root, otherwise something + // is drastically wrong + assert!(cluster_root_bank.ancestors.contains_key(root)); + info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot()); + Some(cluster_root_bank) + } else { + None + } + }) + } else if blockstore.is_root(slot) { + Some(&bank) + } else { + None + } + }; + + if let Some(new_root_bank) = new_root_bank { + *root = new_root_bank.slot(); + last_root_slot = new_root_bank.slot(); + leader_schedule_cache.set_root(&new_root_bank); + new_root_bank.squash(); + + if last_free.elapsed() > Duration::from_secs(30) { + // This could take few secs; so update last_free later + new_root_bank.exhaustively_free_unused_resource(); + last_free = Instant::now(); + } + + // Filter out all non descendants of the new root + pending_slots + .retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(root)); + initial_forks.retain(|_, fork_tip_bank| fork_tip_bank.ancestors.contains_key(root)); + all_banks.retain(|_, bank| bank.ancestors.contains_key(root)); + } + + slots_elapsed += 1; + + trace!( + "Bank for {}slot {} is complete. {} bytes allocated", + if last_root_slot == slot { "root " } else { "" }, slot, - last_root_slot, - slots_elapsed, - slots_elapsed as f32 / secs, - txs as f32 / secs, + allocated.since(initial_allocation) ); - slots_elapsed = 0; - txs = 0; - } - let allocated = thread_mem_usage::Allocatedp::default(); - let initial_allocation = allocated.get(); + process_next_slots( + &bank, + &meta, + blockstore, + leader_schedule_cache, + &mut pending_slots, + &mut initial_forks, + )?; - let mut progress = ConfirmationProgress::new(last_entry_hash); - - if process_single_slot( - blockstore, - &bank, - opts, - recyclers, - &mut progress, - transaction_status_sender.clone(), - None, - ) - .is_err() - { - continue; - } - txs += progress.num_txs; - - // Block must be frozen by this point, otherwise `process_single_slot` would - // have errored above - assert!(bank.is_frozen()); - all_banks.insert(bank.slot(), bank.clone()); - - // If we've reached the last known root in blockstore, start looking - // for newer cluster confirmed roots - let new_root_bank = { - if *root == max_root { - supermajority_root_from_vote_accounts( - bank.slot(), - bank.total_epoch_stake(), - bank.vote_accounts(), - ).and_then(|supermajority_root| { - if supermajority_root > *root { - // If there's a cluster confirmed root greater than our last - // replayed root, then beccause the cluster confirmed root should - // be descended from our last root, it must exist in `all_banks` - let cluster_root_bank = all_banks.get(&supermajority_root).unwrap(); - - // cluster root must be a descendant of our root, otherwise something - // is drastically wrong - assert!(cluster_root_bank.ancestors.contains_key(root)); - info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot()); - Some(cluster_root_bank) - } else { - None - } - }) - } else if blockstore.is_root(slot) { - Some(&bank) - } else { - None + if slot >= dev_halt_at_slot { + break; } - }; - - if let Some(new_root_bank) = new_root_bank { - *root = new_root_bank.slot(); - last_root_slot = new_root_bank.slot(); - leader_schedule_cache.set_root(&new_root_bank); - new_root_bank.squash(); - - if last_free.elapsed() > Duration::from_secs(30) { - // This could take few secs; so update last_free later - new_root_bank.exhaustively_free_unused_resource(); - last_free = Instant::now(); - } - - // Filter out all non descendants of the new root - pending_slots.retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(root)); - initial_forks.retain(|_, fork_tip_bank| fork_tip_bank.ancestors.contains_key(root)); - all_banks.retain(|_, bank| bank.ancestors.contains_key(root)); - } - - slots_elapsed += 1; - - trace!( - "Bank for {}slot {} is complete. {} bytes allocated", - if last_root_slot == slot { "root " } else { "" }, - slot, - allocated.since(initial_allocation) - ); - - process_next_slots( - &bank, - &meta, - blockstore, - leader_schedule_cache, - &mut pending_slots, - &mut initial_forks, - )?; - - if slot >= dev_halt_at_slot { - break; } } @@ -2607,6 +2610,37 @@ pub mod tests { assert_eq!(bank.process_transaction(&fail_tx), Ok(())); } + #[test] + fn test_halt_at_slot_starting_snapshot_root() { + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123); + + // Create roots at slots 0, 1 + let forks = tr(0) / tr(1); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + blockstore.add_tree( + forks, + false, + true, + genesis_config.ticks_per_slot, + genesis_config.hash(), + ); + blockstore.set_roots(&[0, 1]).unwrap(); + + // Specify halting at slot 0 + let opts = ProcessOptions { + poh_verify: true, + dev_halt_at_slot: Some(0), + ..ProcessOptions::default() + }; + let (bank_forks, _leader_schedule) = + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap(); + + // Should be able to fetch slot 0 because we specified halting at slot 0, even + // if there is a greater root at slot 1. + assert!(bank_forks.get(0).is_some()); + } + #[test] fn test_process_blockstore_from_root() { let GenesisConfigInfo {