persist set_root() and use it in blocktree_processor to limit squashes (#3782)

* rename locktower's slot to epoch

* persist set_root() and use it in blocktree_processor to limit squashes
This commit is contained in:
Rob Walker
2019-04-15 13:12:28 -07:00
committed by GitHub
parent 8963500aa8
commit 64c6f05da2
4 changed files with 61 additions and 62 deletions

View File

@ -160,20 +160,6 @@ impl Blocktree {
self.orphans_cf.get(slot) self.orphans_cf.get(slot)
} }
pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> {
if let Some(mut meta) = self.meta_cf.get(slot)? {
for index in 0..meta.received {
self.data_cf.delete((slot, index))?;
}
meta.consumed = 0;
meta.received = 0;
meta.last_index = std::u64::MAX;
meta.next_slots = vec![];
self.meta_cf.put(0, &meta)?;
}
Ok(())
}
pub fn get_next_slot(&self, slot: u64) -> Result<Option<u64>> { pub fn get_next_slot(&self, slot: u64) -> Result<Option<u64>> {
let mut db_iterator = self.db.cursor::<cf::SlotMeta>()?; let mut db_iterator = self.db.cursor::<cf::SlotMeta>()?;
db_iterator.seek(slot + 1); db_iterator.seek(slot + 1);
@ -771,8 +757,22 @@ impl Blocktree {
Ok(entries) Ok(entries)
} }
pub fn set_root(&self, root: u64) { pub fn is_root(&self, slot: u64) -> bool {
*self.root_slot.write().unwrap() = root; if let Ok(Some(meta)) = self.meta(slot) {
meta.is_root
} else {
false
}
}
pub fn set_root(&self, slot: u64) -> Result<()> {
*self.root_slot.write().unwrap() = slot;
if let Some(mut meta) = self.meta_cf.get(slot)? {
meta.is_root = true;
self.meta_cf.put(slot, &meta)?;
}
Ok(())
} }
pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> { pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
@ -1481,35 +1481,6 @@ pub mod tests {
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
} }
#[test]
fn test_overwrite_entries() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
let ticks_per_slot = 10;
let num_ticks = 2;
let mut ticks = create_ticks(num_ticks * 2, Hash::default());
let ticks2 = ticks.split_off(num_ticks as usize);
assert_eq!(ticks.len(), ticks2.len());
{
let ledger = Blocktree::open(&ledger_path).unwrap();
ledger
.write_entries(0, 0, 0, ticks_per_slot, &ticks)
.unwrap();
ledger.reset_slot_consumed(0).unwrap();
ledger
.write_entries(0, 0, 0, ticks_per_slot, &ticks2)
.unwrap();
let ledger_ticks = ledger.get_slot_entries(0, 0, None).unwrap();
assert_eq!(ledger_ticks.len(), ticks2.len());
assert_eq!(ledger_ticks, ticks2);
}
Blocktree::destroy(&ledger_path).unwrap();
}
#[test] #[test]
fn test_put_get_simple() { fn test_put_get_simple() {
let ledger_path = get_tmp_ledger_path("test_put_get_simple"); let ledger_path = get_tmp_ledger_path("test_put_get_simple");

View File

@ -24,6 +24,8 @@ pub struct SlotMeta {
// True if this slot is full (consumed == last_index + 1) and if every // True if this slot is full (consumed == last_index + 1) and if every
// slot that is a parent of this slot is also connected. // slot that is a parent of this slot is also connected.
pub is_connected: bool, pub is_connected: bool,
// True if this slot is a root
pub is_root: bool,
} }
impl SlotMeta { impl SlotMeta {
@ -51,6 +53,7 @@ impl SlotMeta {
parent_slot, parent_slot,
next_slots: vec![], next_slots: vec![],
is_connected: slot == 0, is_connected: slot == 0,
is_root: false,
last_index: std::u64::MAX, last_index: std::u64::MAX,
} }
} }

View File

@ -185,8 +185,11 @@ pub fn process_blocktree(
entry_height += entries.len() as u64; entry_height += entries.len() as u64;
} }
// TODO merge with locktower, voting, bank.vote_accounts()... bank.freeze(); // all banks handled by this routine are created from complete slots
bank.squash();
if blocktree.is_root(slot) {
bank.squash();
}
if meta.next_slots.is_empty() { if meta.next_slots.is_empty() {
// Reached the end of this fork. Record the final entry height and last entry.hash // Reached the end of this fork. Record the final entry height and last entry.hash
@ -354,7 +357,7 @@ mod tests {
slot 0 slot 0
| |
slot 1 slot 1 <-- set_root(true)
/ \ / \
slot 2 | slot 2 |
/ | / |
@ -381,6 +384,9 @@ mod tests {
info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash); info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash);
info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash); info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash);
blocktree.set_root(0).unwrap();
blocktree.set_root(1).unwrap();
let (bank_forks, bank_forks_info) = let (bank_forks, bank_forks_info) =
process_blocktree(&genesis_block, &blocktree, None).unwrap(); process_blocktree(&genesis_block, &blocktree, None).unwrap();
@ -392,6 +398,14 @@ mod tests {
entry_height: ticks_per_slot * 4, entry_height: ticks_per_slot * 4,
} }
); );
assert_eq!(
&bank_forks[3]
.parents()
.iter()
.map(|bank| bank.slot())
.collect::<Vec<_>>(),
&[2, 1]
);
assert_eq!( assert_eq!(
bank_forks_info[1], bank_forks_info[1],
BankForksInfo { BankForksInfo {
@ -399,9 +413,16 @@ mod tests {
entry_height: ticks_per_slot * 3, entry_height: ticks_per_slot * 3,
} }
); );
assert_eq!(
&bank_forks[4]
.parents()
.iter()
.map(|bank| bank.slot())
.collect::<Vec<_>>(),
&[1]
);
// Ensure bank_forks holds the right banks, and that everything's // Ensure bank_forks holds the right banks
// frozen
for info in bank_forks_info { for info in bank_forks_info {
assert_eq!(bank_forks[info.bank_slot].slot(), info.bank_slot); assert_eq!(bank_forks[info.bank_slot].slot(), info.bank_slot);
assert!(bank_forks[info.bank_slot].is_frozen()); assert!(bank_forks[info.bank_slot].is_frozen());

View File

@ -9,7 +9,7 @@ use crate::leader_schedule_utils;
use crate::locktower::{Locktower, StakeLockout}; use crate::locktower::{Locktower, StakeLockout};
use crate::packet::BlobError; use crate::packet::BlobError;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::result; use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service; use crate::service::Service;
use hashbrown::HashMap; use hashbrown::HashMap;
@ -50,7 +50,7 @@ impl Drop for Finalizer {
} }
pub struct ReplayStage { pub struct ReplayStage {
t_replay: JoinHandle<result::Result<()>>, t_replay: JoinHandle<Result<()>>,
} }
#[derive(Default)] #[derive(Default)]
@ -98,7 +98,9 @@ impl ReplayStage {
let mut ticks_per_slot = 0; let mut ticks_per_slot = 0;
let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_id); let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_id);
if let Some(root) = locktower.root() { if let Some(root) = locktower.root() {
blocktree.set_root(root); blocktree
.set_root(root)
.expect("blocktree.set_root() failed at replay_stage startup");
} }
// Start the replay stage loop // Start the replay stage loop
let t_replay = Builder::new() let t_replay = Builder::new()
@ -148,7 +150,7 @@ impl ReplayStage {
&vote_account, &vote_account,
&cluster_info, &cluster_info,
&blocktree, &blocktree,
); )?;
Self::reset_poh_recorder( Self::reset_poh_recorder(
&my_id, &my_id,
@ -271,7 +273,7 @@ impl ReplayStage {
blocktree: &Blocktree, blocktree: &Blocktree,
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
forward_entry_sender: &EntrySender, forward_entry_sender: &EntrySender,
) -> result::Result<()> { ) -> Result<()> {
let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?; let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?;
let len = entries.len(); let len = entries.len();
let result = let result =
@ -296,12 +298,13 @@ impl ReplayStage {
vote_account_pubkey: &Pubkey, vote_account_pubkey: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
) where ) -> Result<()>
where
T: 'static + KeypairUtil + Send + Sync, T: 'static + KeypairUtil + Send + Sync,
{ {
if let Some(new_root) = locktower.record_vote(bank.slot()) { if let Some(new_root) = locktower.record_vote(bank.slot()) {
bank_forks.write().unwrap().set_root(new_root); bank_forks.write().unwrap().set_root(new_root);
blocktree.set_root(new_root); blocktree.set_root(new_root)?;
Self::handle_new_root(&bank_forks, progress); Self::handle_new_root(&bank_forks, progress);
} }
locktower.update_epoch(&bank); locktower.update_epoch(&bank);
@ -315,6 +318,7 @@ impl ReplayStage {
); );
cluster_info.write().unwrap().push_vote(vote_tx); cluster_info.write().unwrap().push_vote(vote_tx);
} }
Ok(())
} }
fn reset_poh_recorder( fn reset_poh_recorder(
@ -349,7 +353,7 @@ impl ReplayStage {
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
forward_entry_sender: &EntrySender, forward_entry_sender: &EntrySender,
slot_full_sender: &Sender<(u64, Pubkey)>, slot_full_sender: &Sender<(u64, Pubkey)>,
) -> result::Result<()> { ) -> Result<()> {
let active_banks = bank_forks.read().unwrap().active_banks(); let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks); trace!("active banks {:?}", active_banks);
@ -484,7 +488,7 @@ impl ReplayStage {
bank: &Bank, bank: &Bank,
blocktree: &Blocktree, blocktree: &Blocktree,
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
) -> result::Result<(Vec<Entry>, usize)> { ) -> Result<(Vec<Entry>, usize)> {
let bank_slot = bank.slot(); let bank_slot = bank.slot();
let bank_progress = &mut progress let bank_progress = &mut progress
.entry(bank_slot) .entry(bank_slot)
@ -498,7 +502,7 @@ impl ReplayStage {
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
forward_entry_sender: &EntrySender, forward_entry_sender: &EntrySender,
num: usize, num: usize,
) -> result::Result<()> { ) -> Result<()> {
let bank_progress = &mut progress let bank_progress = &mut progress
.entry(bank.slot()) .entry(bank.slot())
.or_insert(ForkProgress::new(bank.last_blockhash())); .or_insert(ForkProgress::new(bank.last_blockhash()));
@ -517,7 +521,7 @@ impl ReplayStage {
bank: &Bank, bank: &Bank,
entries: &[Entry], entries: &[Entry],
last_entry: &Hash, last_entry: &Hash,
) -> result::Result<()> { ) -> Result<()> {
if !entries.verify(last_entry) { if !entries.verify(last_entry) {
trace!( trace!(
"entry verification failed {} {} {} {}", "entry verification failed {} {} {} {}",
@ -526,7 +530,7 @@ impl ReplayStage {
last_entry, last_entry,
bank.last_blockhash() bank.last_blockhash()
); );
return Err(result::Error::BlobError(BlobError::VerificationFailed)); return Err(Error::BlobError(BlobError::VerificationFailed));
} }
blocktree_processor::process_entries(bank, entries)?; blocktree_processor::process_entries(bank, entries)?;