Don't vote for empty leader transmissions (#3248)

* Don't vote for empty leader transmissions

* Add is_delta flag to bank to detect empty leader transmissions

* Plumb new is_votable flag through replay stage

* Fix PohRecorder tests

* Change is_delta to AtomicBool to avoid making Bank references mutable

* Reset start slot in poh_recorder when working bank is cleared, so that connsecutive TPU's will start from the correct place

* Use proper max tick height calculation

* Test for not voting on empty transmission

* tests for is_votable
This commit is contained in:
carllin
2019-03-13 14:06:12 -07:00
committed by GitHub
parent 242bcf44db
commit af03df38b9
3 changed files with 141 additions and 49 deletions

View File

@ -73,19 +73,6 @@ impl PohRecorder {
// synchronize PoH with a bank
pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) {
self.clear_bank();
let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| {
if entry.hash == blockhash {
assert_eq!(*entry_tick_height, tick_height);
}
entry.hash == blockhash
});
if existing {
info!(
"reset skipped for: {},{}",
self.poh.hash, self.poh.tick_height
);
return;
}
let mut cache = vec![];
info!(
"reset poh from: {},{} to: {},{}",
@ -159,6 +146,7 @@ impl PohRecorder {
"poh_record: max_tick_height reached, setting working bank {} to None",
working_bank.bank.slot()
);
self.start_slot = working_bank.max_tick_height / working_bank.bank.ticks_per_slot();
self.clear_bank();
}
if e.is_err() {
@ -461,7 +449,7 @@ mod tests {
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0);
assert_eq!(poh_recorder.tick_cache.len(), 2);
assert_eq!(poh_recorder.tick_cache.len(), 0);
}
#[test]
@ -475,28 +463,7 @@ mod tests {
poh_recorder.tick_cache[0].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset(
poh_recorder.tick_cache[1].1,
poh_recorder.tick_cache[1].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 2);
}
#[test]
#[should_panic]
fn test_reset_with_cached_bad_height() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
//mixed up heights
poh_recorder.reset(
poh_recorder.tick_cache[0].1,
poh_recorder.tick_cache[1].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 0);
}
#[test]
@ -539,4 +506,35 @@ mod tests {
poh_recorder.clear_bank();
assert!(receiver.try_recv().is_ok());
}
#[test]
fn test_poh_recorder_reset_start_slot() {
let ticks_per_slot = 5;
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2);
genesis_block.ticks_per_slot = ticks_per_slot;
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0);
let end_slot = 3;
let max_tick_height = (end_slot + 1) * ticks_per_slot - 1;
let working_bank = WorkingBank {
bank,
min_tick_height: 1,
max_tick_height,
};
poh_recorder.set_working_bank(working_bank);
for _ in 0..max_tick_height {
poh_recorder.tick();
}
let tx = test_tx();
let h1 = hash(b"hello world!");
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
assert!(poh_recorder.working_bank.is_none());
// Make sure the starting slot is updated
assert_eq!(poh_recorder.start_slot(), end_slot);
}
}

View File

@ -20,7 +20,7 @@ use solana_sdk::timing::duration_as_ms;
use solana_vote_api::vote_transaction::VoteTransaction;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
@ -108,15 +108,13 @@ impl ReplayStage {
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
bank.freeze();
info!("bank frozen {}", bank.slot());
progress.remove(bank_slot);
if let Err(e) =
slot_full_sender.send((bank.slot(), bank.collector_id()))
{
info!("{} slot_full alert failed: {:?}", my_id, e);
}
votable.push(bank);
Self::process_completed_bank(
&my_id,
bank,
&mut progress,
&mut votable,
&slot_full_sender,
);
}
}
@ -315,6 +313,24 @@ impl ReplayStage {
Ok(())
}
fn process_completed_bank(
my_id: &Pubkey,
bank: Arc<Bank>,
progress: &mut HashMap<u64, (Hash, usize)>,
votable: &mut Vec<Arc<Bank>>,
slot_full_sender: &Sender<(u64, Pubkey)>,
) {
bank.freeze();
info!("bank frozen {}", bank.slot());
progress.remove(&bank.slot());
if let Err(e) = slot_full_sender.send((bank.slot(), bank.collector_id())) {
info!("{} slot_full alert failed: {:?}", my_id, e);
}
if bank.is_votable() {
votable.push(bank);
}
}
fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) {
// Find the next slot that chains to the old slot
let frozen_banks = forks.frozen_banks();
@ -439,6 +455,42 @@ mod test {
let _ignored = remove_dir_all(&my_ledger_path);
}
#[test]
fn test_no_vote_empty_transmission() {
let genesis_block = GenesisBlock::new(10_000).0;
let bank = Arc::new(Bank::new(&genesis_block));
let mut blockhash = bank.last_blockhash();
let mut entries = Vec::new();
for _ in 0..genesis_block.ticks_per_slot {
let entry = next_entry_mut(&mut blockhash, 1, vec![]); //just ticks
entries.push(entry);
}
let (sender, _receiver) = channel();
let mut progress = HashMap::new();
let (forward_entry_sender, _forward_entry_receiver) = channel();
ReplayStage::replay_entries_into_bank(
&bank,
entries.clone(),
&mut progress,
&forward_entry_sender,
0,
)
.unwrap();
let mut votable = vec![];
ReplayStage::process_completed_bank(
&Pubkey::default(),
bank,
&mut progress,
&mut votable,
&sender,
);
assert!(progress.is_empty());
// Don't vote on slot that only contained ticks
assert!(votable.is_empty());
}
#[test]
fn test_replay_stage_poh_ok_entry_receiver() {
let (forward_entry_sender, forward_entry_receiver) = channel();