Replay Stage start_leader() can use wrong parent fork() (#3238)

*  Make sure start_leader starts on the last voted block, not necessarily the biggest indexed bank in frozen_slots()

* Fix tvu test
This commit is contained in:
carllin
2019-03-12 17:42:53 -07:00
committed by GitHub
parent 76feb2098e
commit cb3eeace56
7 changed files with 147 additions and 130 deletions

View File

@ -432,7 +432,7 @@ pub fn create_test_recorder(
) { ) {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (poh_recorder, entry_receiver) = let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash()); PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit);
(exit, poh_recorder, poh_service, entry_receiver) (exit, poh_recorder, poh_service, entry_receiver)
@ -641,7 +641,7 @@ mod tests {
}; };
let (poh_recorder, entry_receiver) = let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash()); PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
@ -694,7 +694,7 @@ mod tests {
max_tick_height: bank.tick_height() + 1, max_tick_height: bank.tick_height() + 1,
}; };
let (poh_recorder, entry_receiver) = let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash()); PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);

View File

@ -107,7 +107,7 @@ impl Fullnode {
bank.last_blockhash(), bank.last_blockhash(),
); );
let (poh_recorder, entry_receiver) = let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash()); PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit);
poh_recorder.lock().unwrap().clear_bank_signal = poh_recorder.lock().unwrap().clear_bank_signal =

View File

@ -43,8 +43,8 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 {
bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1 bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1
} }
pub fn tick_height_to_slot(bank: &Bank, tick_height: u64) -> u64 { pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
tick_height / bank.ticks_per_slot() tick_height / ticks_per_slot
} }
#[cfg(test)] #[cfg(test)]

View File

@ -37,10 +37,11 @@ pub struct WorkingBank {
pub struct PohRecorder { pub struct PohRecorder {
pub poh: Poh, pub poh: Poh,
pub clear_bank_signal: Option<SyncSender<bool>>,
start_slot: u64,
tick_cache: Vec<(Entry, u64)>, tick_cache: Vec<(Entry, u64)>,
working_bank: Option<WorkingBank>, working_bank: Option<WorkingBank>,
sender: Sender<WorkingBankEntries>, sender: Sender<WorkingBankEntries>,
pub clear_bank_signal: Option<SyncSender<bool>>,
} }
impl PohRecorder { impl PohRecorder {
@ -57,14 +58,20 @@ impl PohRecorder {
self.poh.hash(); self.poh.hash();
} }
pub fn start_slot(&self) -> u64 {
self.start_slot
}
pub fn bank(&self) -> Option<Arc<Bank>> { pub fn bank(&self) -> Option<Arc<Bank>> {
self.working_bank.clone().map(|w| w.bank) self.working_bank.clone().map(|w| w.bank)
} }
pub fn tick_height(&self) -> u64 { pub fn tick_height(&self) -> u64 {
self.poh.tick_height self.poh.tick_height
} }
// synchronize PoH with a bank // synchronize PoH with a bank
pub fn reset(&mut self, tick_height: u64, blockhash: Hash) { pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) {
self.clear_bank(); self.clear_bank();
let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| { let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| {
if entry.hash == blockhash { if entry.hash == blockhash {
@ -85,6 +92,7 @@ impl PohRecorder {
self.poh.hash, self.poh.tick_height, blockhash, tick_height, self.poh.hash, self.poh.tick_height, blockhash, tick_height,
); );
std::mem::swap(&mut cache, &mut self.tick_cache); std::mem::swap(&mut cache, &mut self.tick_cache);
self.start_slot = start_slot;
self.poh = Poh::new(blockhash, tick_height); self.poh = Poh::new(blockhash, tick_height);
} }
@ -180,7 +188,11 @@ impl PohRecorder {
/// A recorder to synchronize PoH with the following data structures /// A recorder to synchronize PoH with the following data structures
/// * bank - the LastId's queue is updated on `tick` and `record` events /// * bank - the LastId's queue is updated on `tick` and `record` events
/// * sender - the Entry channel that outputs to the ledger /// * sender - the Entry channel that outputs to the ledger
pub fn new(tick_height: u64, last_entry_hash: Hash) -> (Self, Receiver<WorkingBankEntries>) { pub fn new(
tick_height: u64,
last_entry_hash: Hash,
start_slot: u64,
) -> (Self, Receiver<WorkingBankEntries>) {
let poh = Poh::new(last_entry_hash, tick_height); let poh = Poh::new(last_entry_hash, tick_height);
let (sender, receiver) = channel(); let (sender, receiver) = channel();
( (
@ -190,6 +202,7 @@ impl PohRecorder {
working_bank: None, working_bank: None,
sender, sender,
clear_bank_signal: None, clear_bank_signal: None,
start_slot,
}, },
receiver, receiver,
) )
@ -241,7 +254,7 @@ mod tests {
#[test] #[test]
fn test_poh_recorder_no_zero_tick() { fn test_poh_recorder_no_zero_tick() {
let prev_hash = Hash::default(); let prev_hash = Hash::default();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0);
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1); assert_eq!(poh_recorder.tick_cache.len(), 1);
assert_eq!(poh_recorder.tick_cache[0].1, 1); assert_eq!(poh_recorder.tick_cache[0].1, 1);
@ -251,7 +264,7 @@ mod tests {
#[test] #[test]
fn test_poh_recorder_tick_height_is_last_tick() { fn test_poh_recorder_tick_height_is_last_tick() {
let prev_hash = Hash::default(); let prev_hash = Hash::default();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0);
poh_recorder.tick(); poh_recorder.tick();
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache.len(), 2);
@ -261,10 +274,10 @@ mod tests {
#[test] #[test]
fn test_poh_recorder_reset_clears_cache() { fn test_poh_recorder_reset_clears_cache() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1); assert_eq!(poh_recorder.tick_cache.len(), 1);
poh_recorder.reset(0, Hash::default()); poh_recorder.reset(0, Hash::default(), 0);
assert_eq!(poh_recorder.tick_cache.len(), 0); assert_eq!(poh_recorder.tick_cache.len(), 0);
} }
@ -273,7 +286,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0);
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank, bank,
@ -291,7 +304,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0);
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank: bank.clone(), bank: bank.clone(),
@ -321,7 +334,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0);
poh_recorder.tick(); poh_recorder.tick();
poh_recorder.tick(); poh_recorder.tick();
@ -349,7 +362,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0);
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank, bank,
@ -369,7 +382,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0);
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank, bank,
@ -398,7 +411,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0);
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank, bank,
@ -424,7 +437,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0);
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank, bank,
@ -443,28 +456,30 @@ mod tests {
#[test] #[test]
fn test_reset_current() { fn test_reset_current() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick(); poh_recorder.tick();
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash); poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0);
assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache.len(), 2);
} }
#[test] #[test]
fn test_reset_with_cached() { fn test_reset_with_cached() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick(); poh_recorder.tick();
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset( poh_recorder.reset(
poh_recorder.tick_cache[0].1, poh_recorder.tick_cache[0].1,
poh_recorder.tick_cache[0].0.hash, poh_recorder.tick_cache[0].0.hash,
0,
); );
assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset( poh_recorder.reset(
poh_recorder.tick_cache[1].1, poh_recorder.tick_cache[1].1,
poh_recorder.tick_cache[1].0.hash, poh_recorder.tick_cache[1].0.hash,
0,
); );
assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache.len(), 2);
} }
@ -472,7 +487,7 @@ mod tests {
#[test] #[test]
#[should_panic] #[should_panic]
fn test_reset_with_cached_bad_height() { fn test_reset_with_cached_bad_height() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick(); poh_recorder.tick();
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache.len(), 2);
@ -480,18 +495,19 @@ mod tests {
poh_recorder.reset( poh_recorder.reset(
poh_recorder.tick_cache[0].1, poh_recorder.tick_cache[0].1,
poh_recorder.tick_cache[1].0.hash, poh_recorder.tick_cache[1].0.hash,
0,
); );
} }
#[test] #[test]
fn test_reset_to_new_value() { fn test_reset_to_new_value() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick(); poh_recorder.tick();
poh_recorder.tick(); poh_recorder.tick();
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 3); assert_eq!(poh_recorder.tick_cache.len(), 3);
assert_eq!(poh_recorder.poh.tick_height, 3); assert_eq!(poh_recorder.poh.tick_height, 3);
poh_recorder.reset(1, hash(b"hello")); poh_recorder.reset(1, hash(b"hello"), 0);
assert_eq!(poh_recorder.tick_cache.len(), 0); assert_eq!(poh_recorder.tick_cache.len(), 0);
poh_recorder.tick(); poh_recorder.tick();
assert_eq!(poh_recorder.poh.tick_height, 2); assert_eq!(poh_recorder.poh.tick_height, 2);
@ -501,14 +517,14 @@ mod tests {
fn test_reset_clear_bank() { fn test_reset_clear_bank() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank, bank,
min_tick_height: 2, min_tick_height: 2,
max_tick_height: 3, max_tick_height: 3,
}; };
poh_recorder.set_working_bank(working_bank); poh_recorder.set_working_bank(working_bank);
poh_recorder.reset(1, hash(b"hello")); poh_recorder.reset(1, hash(b"hello"), 0);
assert!(poh_recorder.working_bank.is_none()); assert!(poh_recorder.working_bank.is_none());
} }
@ -516,7 +532,7 @@ mod tests {
pub fn test_clear_signal() { pub fn test_clear_signal() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
let (sender, receiver) = sync_channel(1); let (sender, receiver) = sync_channel(1);
poh_recorder.set_bank(&bank); poh_recorder.set_bank(&bank);
poh_recorder.clear_bank_signal = Some(sender); poh_recorder.clear_bank_signal = Some(sender);

View File

@ -110,7 +110,8 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (poh_recorder, entry_receiver) = PohRecorder::new(bank.tick_height(), prev_hash); let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), prev_hash, bank.slot());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let working_bank = WorkingBank { let working_bank = WorkingBank {

View File

@ -76,6 +76,7 @@ impl ReplayStage {
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let my_id = *my_id; let my_id = *my_id;
let vote_account = *vote_account; let vote_account = *vote_account;
let mut ticks_per_slot = 0;
// Start the replay stage loop // Start the replay stage loop
let t_replay = Builder::new() let t_replay = Builder::new()
@ -92,10 +93,11 @@ impl ReplayStage {
Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap()); Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap());
let active_banks = bank_forks.read().unwrap().active_banks(); let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks); trace!("active banks {:?}", active_banks);
let mut votable: Vec<u64> = vec![]; let mut votable: Vec<Arc<Bank>> = vec![];
let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some(); let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some();
for bank_slot in &active_banks { for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
ticks_per_slot = bank.ticks_per_slot();
if bank.collector_id() != my_id { if bank.collector_id() != my_id {
Self::replay_blocktree_into_bank( Self::replay_blocktree_into_bank(
&bank, &bank,
@ -108,49 +110,63 @@ impl ReplayStage {
if bank.tick_height() == max_tick_height { if bank.tick_height() == max_tick_height {
bank.freeze(); bank.freeze();
info!("bank frozen {}", bank.slot()); info!("bank frozen {}", bank.slot());
votable.push(*bank_slot);
progress.remove(bank_slot); progress.remove(bank_slot);
if let Err(e) = if let Err(e) =
slot_full_sender.send((bank.slot(), bank.collector_id())) slot_full_sender.send((bank.slot(), bank.collector_id()))
{ {
info!("{} slot_full alert failed: {:?}", my_id, e); info!("{} slot_full alert failed: {:?}", my_id, e);
} }
votable.push(bank);
} }
} }
if ticks_per_slot == 0 {
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
let bank = frozen_banks.values().next().unwrap();
ticks_per_slot = bank.ticks_per_slot();
}
// TODO: fork selection // TODO: fork selection
// vote on the latest one for now // vote on the latest one for now
votable.sort(); votable.sort_by(|b1, b2| b1.slot().cmp(&b2.slot()));
if let Some(latest_slot_vote) = votable.last() { if let Some(bank) = votable.last() {
let parent = bank_forks subscriptions.notify_subscribers(&bank);
.read()
.unwrap()
.get(*latest_slot_vote)
.unwrap()
.clone();
subscriptions.notify_subscribers(&parent);
if let Some(ref voting_keypair) = voting_keypair { if let Some(ref voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref(); let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote( let vote = VoteTransaction::new_vote(
&vote_account, &vote_account,
keypair, keypair,
*latest_slot_vote, bank.slot(),
parent.last_blockhash(), bank.last_blockhash(),
0, 0,
); );
cluster_info.write().unwrap().push_vote(vote); cluster_info.write().unwrap().push_vote(vote);
} }
poh_recorder poh_recorder.lock().unwrap().reset(
.lock() bank.tick_height(),
.unwrap() bank.last_blockhash(),
.reset(parent.tick_height(), parent.last_blockhash()); bank.slot(),
);
is_tpu_bank_active = false; is_tpu_bank_active = false;
} }
if !is_tpu_bank_active { if !is_tpu_bank_active {
Self::start_leader(&my_id, &bank_forks, &poh_recorder, &cluster_info); assert!(ticks_per_slot > 0);
let poh_tick_height = poh_recorder.lock().unwrap().tick_height();
let poh_slot = leader_schedule_utils::tick_height_to_slot(
ticks_per_slot,
poh_tick_height + 1,
);
Self::start_leader(
&my_id,
&bank_forks,
&poh_recorder,
&cluster_info,
&blocktree,
poh_slot,
);
} }
inc_new_counter_info!( inc_new_counter_info!(
@ -179,58 +195,50 @@ impl ReplayStage {
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
blocktree: &Blocktree,
poh_slot: u64,
) { ) {
let frozen = bank_forks.read().unwrap().frozen_banks(); trace!("{} checking poh slot {}", my_id, poh_slot);
if blocktree.meta(poh_slot).unwrap().is_some() {
// We've already broadcasted entries for this slot, skip it
return;
}
if bank_forks.read().unwrap().get(poh_slot).is_none() {
let frozen = bank_forks.read().unwrap().frozen_banks();
let parent_slot = poh_recorder.lock().unwrap().start_slot();
assert!(frozen.contains_key(&parent_slot));
let parent = &frozen[&parent_slot];
// TODO: fork selection leader_schedule_utils::slot_leader_at(poh_slot, parent)
let mut newest_frozen: Vec<(&u64, &Arc<Bank>)> = frozen.iter().collect(); .map(|next_leader| {
newest_frozen.sort_by_key(|x| *x.0); debug!(
if let Some((_, parent)) = newest_frozen.last() { "me: {} leader {} at poh slot {}",
let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); my_id, next_leader, poh_slot
let poh_slot = leader_schedule_utils::tick_height_to_slot(parent, poh_tick_height + 1); );
trace!("checking poh slot for leader {}", poh_slot); cluster_info.write().unwrap().set_leader(&next_leader);
if frozen.get(&poh_slot).is_some() { if next_leader == *my_id {
// Already been a leader for this slot, skip it debug!("{} starting tpu for slot {}", my_id, poh_slot);
return; let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot);
} bank_forks.write().unwrap().insert(poh_slot, tpu_bank);
if bank_forks.read().unwrap().get(poh_slot).is_none() { if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
leader_schedule_utils::slot_leader_at(poh_slot, parent) assert_eq!(
.map(|next_leader| { bank_forks.read().unwrap().working_bank().slot(),
debug!( tpu_bank.slot()
"me: {} leader {} at poh slot {}", );
my_id, next_leader, poh_slot debug!(
); "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
cluster_info.write().unwrap().set_leader(&next_leader); my_id,
if next_leader == *my_id { tpu_bank.slot(),
debug!("starting tpu for slot {}", poh_slot); next_leader
let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); );
bank_forks.write().unwrap().insert(poh_slot, tpu_bank); poh_recorder.lock().unwrap().set_bank(&tpu_bank);
if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
assert_eq!(
bank_forks.read().unwrap().working_bank().slot(),
tpu_bank.slot()
);
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_id,
tpu_bank.slot(),
next_leader
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
inc_new_counter_info!(
"replay_stage-new_leader",
tpu_bank.slot() as usize
);
}
} }
}) }
.or_else(|| { })
error!("No next leader found"); .or_else(|| {
None error!("{} No next leader found", my_id);
}); None
} });
} else {
error!("No frozen banks available!");
} }
} }
pub fn replay_blocktree_into_bank( pub fn replay_blocktree_into_bank(

View File

@ -2,13 +2,12 @@
extern crate solana; extern crate solana;
use log::*; use log::*;
use solana::bank_forks::BankForks;
use solana::banking_stage::create_test_recorder; use solana::banking_stage::create_test_recorder;
use solana::blocktree::{get_tmp_ledger_path, Blocktree}; use solana::blocktree::{create_new_tmp_ledger, Blocktree};
use solana::blocktree_processor::BankForksInfo;
use solana::cluster_info::{ClusterInfo, Node}; use solana::cluster_info::{ClusterInfo, Node};
use solana::entry::next_entry_mut; use solana::entry::next_entry_mut;
use solana::entry::EntrySlice; use solana::entry::EntrySlice;
use solana::fullnode;
use solana::gossip_service::GossipService; use solana::gossip_service::GossipService;
use solana::packet::index_blobs; use solana::packet::index_blobs;
use solana::rpc_subscriptions::RpcSubscriptions; use solana::rpc_subscriptions::RpcSubscriptions;
@ -17,7 +16,6 @@ use solana::storage_stage::StorageState;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana::streamer; use solana::streamer;
use solana::tvu::{Sockets, Tvu}; use solana::tvu::{Sockets, Tvu};
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::system_transaction::SystemTransaction;
@ -74,44 +72,37 @@ fn test_replay() {
r_responder, r_responder,
); );
let starting_balance = 10_000; let total_balance = 10_000;
let (mut genesis_block, mint_keypair) = GenesisBlock::new(starting_balance); let leader_balance = 100;
let starting_mint_balance = total_balance - leader_balance;
let (genesis_block, mint_keypair) =
GenesisBlock::new_with_leader(total_balance, &leader.info.id, leader_balance);
let (blocktree_path, blockhash) = create_new_tmp_ledger!(&genesis_block);
// TODO: Fix this test so it always works with the default GenesisBlock configuration
genesis_block.ticks_per_slot = 64;
let ticks_per_slot = genesis_block.ticks_per_slot;
let tvu_addr = target1.info.tvu; let tvu_addr = target1.info.tvu;
let bank = Bank::new(&genesis_block); let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
let blockhash = bank.last_blockhash(); fullnode::new_banks_from_blocktree(&blocktree_path, None);
let bank_forks = BankForks::new(0, bank);
let bank_forks_info = vec![BankForksInfo {
bank_slot: 0,
entry_height: 0,
}];
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), starting_balance); assert_eq!(
bank.get_balance(&mint_keypair.pubkey()),
starting_mint_balance
);
// start cluster_info1 // start cluster_info1
let bank_forks = Arc::new(RwLock::new(bank_forks));
let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone()); let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone());
cluster_info1.insert_info(leader.info.clone()); cluster_info1.insert_info(leader.info.clone());
let cref1 = Arc::new(RwLock::new(cluster_info1)); let cref1 = Arc::new(RwLock::new(cluster_info1));
let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit); let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit);
let blocktree_path = get_tmp_ledger_path!();
let (blocktree, ledger_signal_receiver) =
Blocktree::open_with_config_signal(&blocktree_path, ticks_per_slot)
.expect("Expected to successfully open ledger");
let voting_keypair = Keypair::new(); let voting_keypair = Keypair::new();
let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank); create_test_recorder(&bank);
let tvu = Tvu::new( let tvu = Tvu::new(
&voting_keypair.pubkey(), &voting_keypair.pubkey(),
Some(Arc::new(voting_keypair)), Some(Arc::new(voting_keypair)),
&Arc::new(RwLock::new(bank_forks)), &bank_forks,
&bank_forks_info, &bank_forks_info,
&cref1, &cref1,
{ {
@ -131,7 +122,7 @@ fn test_replay() {
&exit, &exit,
); );
let mut alice_ref_balance = starting_balance; let mut mint_ref_balance = starting_mint_balance;
let mut msgs = Vec::new(); let mut msgs = Vec::new();
let mut blob_idx = 0; let mut blob_idx = 0;
let num_transfers = 10; let num_transfers = 10;
@ -153,12 +144,12 @@ fn test_replay() {
let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]); let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]);
let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]); let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
alice_ref_balance -= transfer_amount; mint_ref_balance -= transfer_amount;
transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
let blobs = entries.to_shared_blobs(); let blobs = entries.to_shared_blobs();
index_blobs(&blobs, &leader.info.id, blob_idx, 0, 0); index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0);
blob_idx += blobs.len() as u64; blob_idx += blobs.len() as u64;
blobs blobs
.iter() .iter()
@ -176,11 +167,12 @@ fn test_replay() {
trace!("got msg"); trace!("got msg");
} }
let alice_balance = bank.get_balance(&mint_keypair.pubkey()); let working_bank = bank_forks.read().unwrap().working_bank();
assert_eq!(alice_balance, alice_ref_balance); let final_mint_balance = working_bank.get_balance(&mint_keypair.pubkey());
assert_eq!(final_mint_balance, mint_ref_balance);
let bob_balance = bank.get_balance(&bob_keypair.pubkey()); let bob_balance = working_bank.get_balance(&bob_keypair.pubkey());
assert_eq!(bob_balance, starting_balance - alice_ref_balance); assert_eq!(bob_balance, starting_mint_balance - mint_ref_balance);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service_exit.store(true, Ordering::Relaxed); poh_service_exit.store(true, Ordering::Relaxed);