From f886b3b12bd4054247c7e801d08de8bd0dc88f69 Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 29 Mar 2019 20:00:36 -0700 Subject: [PATCH] Fix resetting PohRecorder to wrong bank (#3553) * Check whether future slot already has transmission --- core/benches/banking_stage.rs | 118 +-- core/src/banking_stage.rs | 448 ++++++----- core/src/fullnode.rs | 5 +- core/src/leader_schedule_utils.rs | 101 ++- core/src/poh_recorder.rs | 1183 ++++++++++++++++------------- core/src/poh_service.rs | 147 ++-- core/src/replay_stage.rs | 40 +- core/src/tvu.rs | 6 +- core/tests/tvu.rs | 173 ++--- 9 files changed, 1260 insertions(+), 961 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index feec976a3f..7ad63a05f1 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -1,10 +1,13 @@ #![feature(test)] extern crate test; +#[macro_use] +extern crate solana; use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana::banking_stage::{create_test_recorder, BankingStage}; +use solana::blocktree::{get_tmp_ledger_path, Blocktree}; use solana::cluster_info::ClusterInfo; use solana::cluster_info::Node; use solana::packet::to_packets_chunked; @@ -104,33 +107,41 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); - let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); - poh_recorder.lock().unwrap().set_bank(&bank); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + let (exit, poh_recorder, poh_service, signal_receiver) = + create_test_recorder(&bank, &blocktree); + let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + poh_recorder.lock().unwrap().set_bank(&bank); - let mut id = genesis_block.hash(); - for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { - id = hash(&id.as_ref()); - bank.register_tick(&id); - } - - let half_len = verified.len() / 2; - let mut start = 0; - bencher.iter(move || { - // make sure the transactions are still valid - bank.register_tick(&genesis_block.hash()); - for v in verified[start..start + half_len].chunks(verified.len() / num_threads) { - verified_sender.send(v.to_vec()).unwrap(); + let mut id = genesis_block.hash(); + for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { + id = hash(&id.as_ref()); + bank.register_tick(&id); } - check_txs(&signal_receiver, txes / 2); - bank.clear_signatures(); - start += half_len; - start %= verified.len(); - }); - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); + + let half_len = verified.len() / 2; + let mut start = 0; + bencher.iter(move || { + // make sure the transactions are still valid + bank.register_tick(&genesis_block.hash()); + for v in verified[start..start + half_len].chunks(verified.len() / num_threads) { + verified_sender.send(v.to_vec()).unwrap(); + } + check_txs(&signal_receiver, txes / 2); + bank.clear_signatures(); + start += half_len; + start %= verified.len(); + }); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[bench] @@ -211,31 +222,40 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); - let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); - poh_recorder.lock().unwrap().set_bank(&bank); - let mut id = genesis_block.hash(); - for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { - id = hash(&id.as_ref()); - bank.register_tick(&id); - } + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + let (exit, poh_recorder, poh_service, signal_receiver) = + create_test_recorder(&bank, &blocktree); + let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + poh_recorder.lock().unwrap().set_bank(&bank); - let half_len = verified.len() / 2; - let mut start = 0; - bencher.iter(move || { - // make sure the transactions are still valid - bank.register_tick(&genesis_block.hash()); - for v in verified[start..start + half_len].chunks(verified.len() / num_threads) { - verified_sender.send(v.to_vec()).unwrap(); + let mut id = genesis_block.hash(); + for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { + id = hash(&id.as_ref()); + bank.register_tick(&id); } - check_txs(&signal_receiver, txes / 2); - bank.clear_signatures(); - start += half_len; - start %= verified.len(); - }); - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); + + let half_len = verified.len() / 2; + let mut start = 0; + bencher.iter(move || { + // make sure the transactions are still valid + bank.register_tick(&genesis_block.hash()); + for v in verified[start..start + half_len].chunks(verified.len() / num_threads) { + verified_sender.send(v.to_vec()).unwrap(); + } + check_txs(&signal_receiver, txes / 2); + bank.clear_signatures(); + start += half_len; + start %= verified.len(); + }); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blocktree::destroy(&ledger_path).unwrap(); } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5be3308c94..608ea4470a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,7 +1,7 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. - +use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::entry; use crate::entry::{hash_transactions, Entry}; @@ -472,6 +472,7 @@ impl Service for BankingStage { pub fn create_test_recorder( bank: &Arc, + blocktree: &Arc, ) -> ( Arc, Arc>, @@ -486,6 +487,7 @@ pub fn create_test_recorder( Some(4), bank.ticks_per_slot(), &Pubkey::default(), + blocktree, ); poh_recorder.set_bank(&bank); @@ -498,10 +500,12 @@ pub fn create_test_recorder( #[cfg(test)] mod tests { use super::*; + use crate::blocktree::get_tmp_ledger_path; use crate::cluster_info::Node; use crate::entry::EntrySlice; use crate::packet::to_packets; use crate::poh_recorder::WorkingBank; + use crate::{get_tmp_ledger_path, tmp_ledger_name}; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::instruction::InstructionError; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -514,14 +518,22 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (exit, poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank); - let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); - drop(verified_sender); - exit.store(true, Ordering::Relaxed); - banking_stage.join().unwrap(); - poh_service.join().unwrap(); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + let (exit, poh_recorder, poh_service, _entry_receiever) = + create_test_recorder(&bank, &blocktree); + let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + drop(verified_sender); + exit.store(true, Ordering::Relaxed); + banking_stage.join().unwrap(); + poh_service.join().unwrap(); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] @@ -532,27 +544,35 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); - let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); - let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); - trace!("sending bank"); - sleep(Duration::from_millis(600)); - drop(verified_sender); - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - drop(poh_recorder); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + let (exit, poh_recorder, poh_service, entry_receiver) = + create_test_recorder(&bank, &blocktree); + let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + trace!("sending bank"); + sleep(Duration::from_millis(600)); + drop(verified_sender); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + drop(poh_recorder); - trace!("getting entries"); - let entries: Vec<_> = entry_receiver - .iter() - .flat_map(|x| x.1.into_iter().map(|e| e.0)) - .collect(); - trace!("done"); - assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1); - assert!(entries.verify(&start_hash)); - assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); - banking_stage.join().unwrap(); + trace!("getting entries"); + let entries: Vec<_> = entry_receiver + .iter() + .flat_map(|x| x.1.into_iter().map(|e| e.0)) + .collect(); + trace!("done"); + assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1); + assert!(entries.verify(&start_hash)); + assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); + banking_stage.join().unwrap(); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] @@ -562,76 +582,84 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); - let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); - let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + let (exit, poh_recorder, poh_service, entry_receiver) = + create_test_recorder(&bank, &blocktree); + let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); - // fund another account so we can send 2 good transactions in a single batch. - let keypair = Keypair::new(); - let fund_tx = - SystemTransaction::new_account(&mint_keypair, &keypair.pubkey(), 2, start_hash, 0); - bank.process_transaction(&fund_tx).unwrap(); + // fund another account so we can send 2 good transactions in a single batch. + let keypair = Keypair::new(); + let fund_tx = + SystemTransaction::new_account(&mint_keypair, &keypair.pubkey(), 2, start_hash, 0); + bank.process_transaction(&fund_tx).unwrap(); - // good tx - let to = Keypair::new().pubkey(); - let tx = SystemTransaction::new_account(&mint_keypair, &to, 1, start_hash, 0); + // good tx + let to = Keypair::new().pubkey(); + let tx = SystemTransaction::new_account(&mint_keypair, &to, 1, start_hash, 0); - // good tx, but no verify - let to2 = Keypair::new().pubkey(); - let tx_no_ver = SystemTransaction::new_account(&keypair, &to2, 2, start_hash, 0); + // good tx, but no verify + let to2 = Keypair::new().pubkey(); + let tx_no_ver = SystemTransaction::new_account(&keypair, &to2, 2, start_hash, 0); - // bad tx, AccountNotFound - let keypair = Keypair::new(); - let to3 = Keypair::new().pubkey(); - let tx_anf = SystemTransaction::new_account(&keypair, &to3, 1, start_hash, 0); + // bad tx, AccountNotFound + let keypair = Keypair::new(); + let to3 = Keypair::new().pubkey(); + let tx_anf = SystemTransaction::new_account(&keypair, &to3, 1, start_hash, 0); - // send 'em over - let packets = to_packets(&[tx_no_ver, tx_anf, tx]); + // send 'em over + let packets = to_packets(&[tx_no_ver, tx_anf, tx]); - // glad they all fit - assert_eq!(packets.len(), 1); - verified_sender // no_ver, anf, tx - .send(vec![(packets[0].clone(), vec![0u8, 1u8, 1u8])]) - .unwrap(); + // glad they all fit + assert_eq!(packets.len(), 1); + verified_sender // no_ver, anf, tx + .send(vec![(packets[0].clone(), vec![0u8, 1u8, 1u8])]) + .unwrap(); - drop(verified_sender); - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - drop(poh_recorder); + drop(verified_sender); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + drop(poh_recorder); - let mut blockhash = start_hash; - let bank = Bank::new(&genesis_block); - bank.process_transaction(&fund_tx).unwrap(); - //receive entries + ticks - for _ in 0..10 { - let ventries: Vec> = entry_receiver - .iter() - .map(|x| x.1.into_iter().map(|e| e.0).collect()) - .collect(); + let mut blockhash = start_hash; + let bank = Bank::new(&genesis_block); + bank.process_transaction(&fund_tx).unwrap(); + //receive entries + ticks + for _ in 0..10 { + let ventries: Vec> = entry_receiver + .iter() + .map(|x| x.1.into_iter().map(|e| e.0).collect()) + .collect(); - for entries in &ventries { - for entry in entries { - bank.process_transactions(&entry.transactions) - .iter() - .for_each(|x| assert_eq!(*x, Ok(()))); + for entries in &ventries { + for entry in entries { + bank.process_transactions(&entry.transactions) + .iter() + .for_each(|x| assert_eq!(*x, Ok(()))); + } + assert!(entries.verify(&blockhash)); + blockhash = entries.last().unwrap().hash; } - assert!(entries.verify(&blockhash)); - blockhash = entries.last().unwrap().hash; + + if bank.get_balance(&to) == 1 { + break; + } + + sleep(Duration::from_millis(200)); } - if bank.get_balance(&to) == 1 { - break; - } + assert_eq!(bank.get_balance(&to), 1); + assert_eq!(bank.get_balance(&to2), 0); - sleep(Duration::from_millis(200)); + drop(entry_receiver); + banking_stage.join().unwrap(); } - - assert_eq!(bank.get_balance(&to), 1); - assert_eq!(bank.get_balance(&to2), 0); - - drop(entry_receiver); - banking_stage.join().unwrap(); + Blocktree::destroy(&ledger_path).unwrap(); } #[test] @@ -671,43 +699,57 @@ mod tests { .send(vec![(packets[0].clone(), vec![1u8])]) .unwrap(); - let entry_receiver = { - // start a banking_stage to eat verified receiver - let bank = Arc::new(Bank::new(&genesis_block)); - let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); - let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let _banking_stage = - BankingStage::new_num_threads(&cluster_info, &poh_recorder, verified_receiver, 1); + let ledger_path = get_tmp_ledger_path!(); + { + let entry_receiver = { + // start a banking_stage to eat verified receiver + let bank = Arc::new(Bank::new(&genesis_block)); + let blocktree = Arc::new( + Blocktree::open(&ledger_path) + .expect("Expected to be able to open database ledger"), + ); + let (exit, poh_recorder, poh_service, entry_receiver) = + create_test_recorder(&bank, &blocktree); + let cluster_info = + ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let _banking_stage = BankingStage::new_num_threads( + &cluster_info, + &poh_recorder, + verified_receiver, + 1, + ); - // wait for banking_stage to eat the packets - while bank.get_balance(&alice.pubkey()) != 1 { - sleep(Duration::from_millis(100)); - } - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - entry_receiver - }; - drop(verified_sender); + // wait for banking_stage to eat the packets + while bank.get_balance(&alice.pubkey()) != 1 { + sleep(Duration::from_millis(100)); + } + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + entry_receiver + }; + drop(verified_sender); - // consume the entire entry_receiver, feed it into a new bank - // check that the balance is what we expect. - let entries: Vec<_> = entry_receiver - .iter() - .flat_map(|x| x.1.into_iter().map(|e| e.0)) - .collect(); - - let bank = Bank::new(&genesis_block); - for entry in &entries { - bank.process_transactions(&entry.transactions) + // consume the entire entry_receiver, feed it into a new bank + // check that the balance is what we expect. + let entries: Vec<_> = entry_receiver .iter() - .for_each(|x| assert_eq!(*x, Ok(()))); - } + .flat_map(|x| x.1.into_iter().map(|e| e.0)) + .collect(); - // Assert the user holds one lamport, not two. If the stage only outputs one - // entry, then the second transaction will be rejected, because it drives - // the account balance below zero before the credit is added. - assert_eq!(bank.get_balance(&alice.pubkey()), 1); + let bank = Bank::new(&genesis_block); + for entry in &entries { + bank.process_transactions(&entry.transactions) + .iter() + .for_each(|x| assert_eq!(*x, Ok(()))); + } + + // Assert the user holds one lamport, not two. If the stage only outputs one + // entry, then the second transaction will be rejected, because it drives + // the account balance below zero before the credit is added. + assert_eq!(bank.get_balance(&alice.pubkey()), 1); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] @@ -719,47 +761,53 @@ mod tests { min_tick_height: bank.tick_height(), max_tick_height: std::u64::MAX, }; + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + None, + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); - let (poh_recorder, entry_receiver) = PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - bank.slot(), - None, - bank.ticks_per_slot(), - &Pubkey::default(), - ); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + poh_recorder.lock().unwrap().set_working_bank(working_bank); + let pubkey = Keypair::new().pubkey(); - poh_recorder.lock().unwrap().set_working_bank(working_bank); - let pubkey = Keypair::new().pubkey(); + let transactions = vec![ + SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + ]; - let transactions = vec![ - SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), - SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), - ]; + let mut results = vec![Ok(()), Ok(())]; + BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) + .unwrap(); + let (_, entries) = entry_receiver.recv().unwrap(); + assert_eq!(entries[0].0.transactions.len(), transactions.len()); - let mut results = vec![Ok(()), Ok(())]; - BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) - .unwrap(); - let (_, entries) = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].0.transactions.len(), transactions.len()); + // InstructionErrors should still be recorded + results[0] = Err(TransactionError::InstructionError( + 1, + InstructionError::new_result_with_negative_lamports(), + )); + BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) + .unwrap(); + let (_, entries) = entry_receiver.recv().unwrap(); + assert_eq!(entries[0].0.transactions.len(), transactions.len()); - // InstructionErrors should still be recorded - results[0] = Err(TransactionError::InstructionError( - 1, - InstructionError::new_result_with_negative_lamports(), - )); - BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) - .unwrap(); - let (_, entries) = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].0.transactions.len(), transactions.len()); - - // Other TransactionErrors should not be recorded - results[0] = Err(TransactionError::AccountNotFound); - BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) - .unwrap(); - let (_, entries) = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1); + // Other TransactionErrors should not be recorded + results[0] = Err(TransactionError::AccountNotFound); + BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) + .unwrap(); + let (_, entries) = entry_receiver.recv().unwrap(); + assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] @@ -782,53 +830,61 @@ mod tests { min_tick_height: bank.tick_height(), max_tick_height: bank.tick_height() + 1, }; - let (poh_recorder, entry_receiver) = PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - bank.slot(), - Some(4), - bank.ticks_per_slot(), - &pubkey, - ); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some(4), + bank.ticks_per_slot(), + &pubkey, + &Arc::new(blocktree), + ); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); - poh_recorder.lock().unwrap().set_working_bank(working_bank); + poh_recorder.lock().unwrap().set_working_bank(working_bank); - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); - poh_recorder.lock().unwrap().tick(); + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder) + .unwrap(); + poh_recorder.lock().unwrap().tick(); - let mut done = false; - // read entries until I find mine, might be ticks... - while let Ok((_, entries)) = entry_receiver.recv() { - for (entry, _) in entries { - if !entry.is_tick() { - trace!("got entry"); - assert_eq!(entry.transactions.len(), transactions.len()); - assert_eq!(bank.get_balance(&pubkey), 1); - done = true; + let mut done = false; + // read entries until I find mine, might be ticks... + while let Ok((_, entries)) = entry_receiver.recv() { + for (entry, _) in entries { + if !entry.is_tick() { + trace!("got entry"); + assert_eq!(entry.transactions.len(), transactions.len()); + assert_eq!(bank.get_balance(&pubkey), 1); + done = true; + } + } + if done { + break; } } - if done { - break; - } + trace!("done ticking"); + + assert_eq!(done, true); + + let transactions = vec![SystemTransaction::new_move( + &mint_keypair, + &pubkey, + 2, + genesis_block.hash(), + 0, + )]; + + assert_matches!( + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + ); + + assert_eq!(bank.get_balance(&pubkey), 1); } - trace!("done ticking"); - - assert_eq!(done, true); - - let transactions = vec![SystemTransaction::new_move( - &mint_keypair, - &pubkey, - 2, - genesis_block.hash(), - 0, - )]; - - assert_matches!( - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - ); - - assert_eq!(bank.get_balance(&pubkey), 1); + Blocktree::destroy(&ledger_path).unwrap(); } } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 5ee36acd04..b509cec2fa 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -105,13 +105,15 @@ impl Fullnode { bank.tick_height(), bank.last_blockhash(), ); + let blocktree = Arc::new(blocktree); let (poh_recorder, entry_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), bank.slot(), - leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank), + leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank, Some(&blocktree)), bank.ticks_per_slot(), &id, + &blocktree, ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); @@ -130,7 +132,6 @@ impl Fullnode { node.sockets.gossip.local_addr().unwrap() ); - let blocktree = Arc::new(blocktree); let bank_forks = Arc::new(RwLock::new(bank_forks)); node.info.wallclock = timestamp(); diff --git a/core/src/leader_schedule_utils.rs b/core/src/leader_schedule_utils.rs index 925a94e590..f7918519e9 100644 --- a/core/src/leader_schedule_utils.rs +++ b/core/src/leader_schedule_utils.rs @@ -1,3 +1,4 @@ +use crate::blocktree::Blocktree; use crate::leader_schedule::LeaderSchedule; use crate::staking_utils; use solana_runtime::bank::Bank; @@ -44,7 +45,12 @@ pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option { } /// Return the next slot after the given current_slot that the given node will be leader -pub fn next_leader_slot(pubkey: &Pubkey, mut current_slot: u64, bank: &Bank) -> Option { +pub fn next_leader_slot( + pubkey: &Pubkey, + mut current_slot: u64, + bank: &Bank, + blocktree: Option<&Blocktree>, +) -> Option { let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1); while let Some(leader_schedule) = leader_schedule(epoch, bank) { // clippy thinks I should do this: @@ -59,6 +65,15 @@ pub fn next_leader_slot(pubkey: &Pubkey, mut current_slot: u64, bank: &Bank) -> for i in start_index..bank.get_slots_in_epoch(epoch) { current_slot += 1; if *pubkey == leader_schedule[i] { + if let Some(blocktree) = blocktree { + if let Some(meta) = blocktree.meta(current_slot).unwrap() { + // We have already sent a blob for this slot, so skip it + if meta.received > 0 { + continue; + } + } + } + return Some(current_slot); } } @@ -82,6 +97,8 @@ pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 { #[cfg(test)] mod tests { use super::*; + use crate::blocktree::get_tmp_ledger_path; + use crate::blocktree::tests::make_slot_entries; use crate::staking_utils; use crate::voting_keypair::tests::new_vote_account_with_delegate; use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS}; @@ -101,13 +118,14 @@ mod tests { let bank = Bank::new(&genesis_block); assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey); - assert_eq!(next_leader_slot(&pubkey, 0, &bank), Some(1)); - assert_eq!(next_leader_slot(&pubkey, 1, &bank), Some(2)); + assert_eq!(next_leader_slot(&pubkey, 0, &bank, None), Some(1)); + assert_eq!(next_leader_slot(&pubkey, 1, &bank, None), Some(2)); assert_eq!( next_leader_slot( &pubkey, 2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2 - &bank + &bank, + None ), None ); @@ -116,12 +134,81 @@ mod tests { next_leader_slot( &Keypair::new().pubkey(), // not in leader_schedule 0, - &bank + &bank, + None ), None ); } + #[test] + fn test_next_leader_slot_blocktree() { + let pubkey = Keypair::new().pubkey(); + let mut genesis_block = GenesisBlock::new_with_leader( + BOOTSTRAP_LEADER_LAMPORTS, + &pubkey, + BOOTSTRAP_LEADER_LAMPORTS, + ) + .0; + genesis_block.epoch_warmup = false; + + let bank = Bank::new(&genesis_block); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + + assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey); + // Check that the next leader slot after 0 is slot 1 + assert_eq!( + next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), + Some(1) + ); + + // Write a blob into slot 2 that chains to slot 1, + // but slot 1 is empty so should not be skipped + let (blobs, _) = make_slot_entries(2, 1, 1); + blocktree.write_blobs(&blobs[..]).unwrap(); + assert_eq!( + next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), + Some(1) + ); + + // Write a blob into slot 1 + let (blobs, _) = make_slot_entries(1, 0, 1); + + // Check that slot 1 and 2 are skipped + blocktree.write_blobs(&blobs[..]).unwrap(); + assert_eq!( + next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)), + Some(3) + ); + + // Integrity checks + assert_eq!( + next_leader_slot( + &pubkey, + 2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2 + &bank, + Some(&blocktree) + ), + None + ); + + assert_eq!( + next_leader_slot( + &Keypair::new().pubkey(), // not in leader_schedule + 0, + &bank, + Some(&blocktree) + ), + None + ); + } + Blocktree::destroy(&ledger_path).unwrap(); + } + #[test] fn test_next_leader_slot_next_epoch() { let pubkey = Keypair::new().pubkey(); @@ -169,8 +256,8 @@ mod tests { expected_slot += index; assert_eq!( - next_leader_slot(&delegate_id, 0, &bank), - Some(expected_slot) + next_leader_slot(&delegate_id, 0, &bank, None), + Some(expected_slot), ); } diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 2e389ae842..2838d5ffd2 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -10,6 +10,7 @@ //! For Entries: //! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::man_tick_height //! +use crate::blocktree::Blocktree; use crate::entry::Entry; use crate::leader_schedule_utils; use crate::poh::Poh; @@ -51,14 +52,19 @@ pub struct PohRecorder { last_leader_tick: Option, max_last_leader_grace_ticks: u64, id: Pubkey, + blocktree: Arc, } impl PohRecorder { pub fn clear_bank(&mut self) { if let Some(working_bank) = self.working_bank.take() { let bank = working_bank.bank; - let next_leader_slot = - leader_schedule_utils::next_leader_slot(&self.id, bank.slot(), &bank); + let next_leader_slot = leader_schedule_utils::next_leader_slot( + &self.id, + bank.slot(), + &bank, + Some(&self.blocktree), + ); let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks( &next_leader_slot, bank.ticks_per_slot(), @@ -273,6 +279,7 @@ impl PohRecorder { my_leader_slot_index: Option, ticks_per_slot: u64, id: &Pubkey, + blocktree: &Arc, ) -> (Self, Receiver) { let poh = Poh::new(last_entry_hash, tick_height); let (sender, receiver) = channel(); @@ -295,6 +302,7 @@ impl PohRecorder { last_leader_tick, max_last_leader_grace_ticks, id: *id, + blocktree: blocktree.clone(), }, receiver, ) @@ -345,6 +353,7 @@ impl PohRecorder { #[cfg(test)] mod tests { use super::*; + use crate::blocktree::{get_tmp_ledger_path, Blocktree}; use crate::test_tx::test_tx; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; @@ -355,620 +364,750 @@ mod tests { #[test] fn test_poh_recorder_no_zero_tick() { let prev_hash = Hash::default(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - &Pubkey::default(), - ); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 1); - assert_eq!(poh_recorder.tick_cache[0].1, 1); - assert_eq!(poh_recorder.poh.tick_height, 1); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + &Pubkey::default(), + &Arc::new(blocktree), + ); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + assert_eq!(poh_recorder.tick_cache[0].1, 1); + assert_eq!(poh_recorder.poh.tick_height, 1); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_tick_height_is_last_tick() { let prev_hash = Hash::default(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - &Pubkey::default(), - ); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 2); - assert_eq!(poh_recorder.tick_cache[1].1, 2); - assert_eq!(poh_recorder.poh.tick_height, 2); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + &Pubkey::default(), + &Arc::new(blocktree), + ); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 2); + assert_eq!(poh_recorder.tick_cache[1].1, 2); + assert_eq!(poh_recorder.poh.tick_height, 2); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_reset_clears_cache() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - Hash::default(), - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - &Pubkey::default(), - ); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 1); - poh_recorder.reset(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); - assert_eq!(poh_recorder.tick_cache.len(), 0); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + Hash::default(), + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + &Pubkey::default(), + &Arc::new(blocktree), + ); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + poh_recorder.reset(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); + assert_eq!(poh_recorder.tick_cache.len(), 0); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_clear() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let working_bank = WorkingBank { - bank, - min_tick_height: 2, - max_tick_height: 3, - }; - poh_recorder.set_working_bank(working_bank); - assert!(poh_recorder.working_bank.is_some()); - poh_recorder.clear_bank(); - assert!(poh_recorder.working_bank.is_none()); + let working_bank = WorkingBank { + bank, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + assert!(poh_recorder.working_bank.is_some()); + poh_recorder.clear_bank(); + assert!(poh_recorder.working_bank.is_none()); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_tick_sent_after_min() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let working_bank = WorkingBank { - bank: bank.clone(), - min_tick_height: 2, - max_tick_height: 3, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.tick(); - poh_recorder.tick(); - //tick height equal to min_tick_height - //no tick has been sent - assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 2); - assert!(entry_receiver.try_recv().is_err()); + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + poh_recorder.tick(); + //tick height equal to min_tick_height + //no tick has been sent + assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 2); + assert!(entry_receiver.try_recv().is_err()); - // all ticks are sent after height > min - poh_recorder.tick(); - assert_eq!(poh_recorder.poh.tick_height, 3); - assert_eq!(poh_recorder.tick_cache.len(), 0); - let (bank_, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 3); - assert_eq!(bank_.slot(), bank.slot()); - assert!(poh_recorder.working_bank.is_none()); + // all ticks are sent after height > min + poh_recorder.tick(); + assert_eq!(poh_recorder.poh.tick_height, 3); + assert_eq!(poh_recorder.tick_cache.len(), 0); + let (bank_, e) = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 3); + assert_eq!(bank_.slot(), bank.slot()); + assert!(poh_recorder.working_bank.is_none()); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_tick_sent_upto_and_including_max() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - poh_recorder.tick(); - poh_recorder.tick(); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 4); - assert_eq!(poh_recorder.poh.tick_height, 4); + poh_recorder.tick(); + poh_recorder.tick(); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 4); + assert_eq!(poh_recorder.poh.tick_height, 4); - let working_bank = WorkingBank { - bank, - min_tick_height: 2, - max_tick_height: 3, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.tick(); + let working_bank = WorkingBank { + bank, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); - assert_eq!(poh_recorder.poh.tick_height, 5); - assert!(poh_recorder.working_bank.is_none()); - let (_, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 3); + assert_eq!(poh_recorder.poh.tick_height, 5); + assert!(poh_recorder.working_bank.is_none()); + let (_, e) = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 3); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_record_to_early() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let working_bank = WorkingBank { - bank: bank.clone(), - min_tick_height: 2, - max_tick_height: 3, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.tick(); - let tx = test_tx(); - let h1 = hash(b"hello world!"); - assert!(poh_recorder - .record(bank.slot(), h1, vec![tx.clone()]) - .is_err()); - assert!(entry_receiver.try_recv().is_err()); + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_err()); + assert!(entry_receiver.try_recv().is_err()); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_record_bad_slot() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let working_bank = WorkingBank { - bank: bank.clone(), - min_tick_height: 1, - max_tick_height: 2, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 1); - assert_eq!(poh_recorder.poh.tick_height, 1); - let tx = test_tx(); - let h1 = hash(b"hello world!"); - assert_matches!( - poh_recorder.record(bank.slot() + 1, h1, vec![tx.clone()]), - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - ); + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: 1, + max_tick_height: 2, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + assert_eq!(poh_recorder.poh.tick_height, 1); + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert_matches!( + poh_recorder.record(bank.slot() + 1, h1, vec![tx.clone()]), + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + ); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_record_at_min_passes() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let working_bank = WorkingBank { - bank: bank.clone(), - min_tick_height: 1, - max_tick_height: 2, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 1); - assert_eq!(poh_recorder.poh.tick_height, 1); - let tx = test_tx(); - let h1 = hash(b"hello world!"); - assert!(poh_recorder - .record(bank.slot(), h1, vec![tx.clone()]) - .is_ok()); - assert_eq!(poh_recorder.tick_cache.len(), 0); + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: 1, + max_tick_height: 2, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + assert_eq!(poh_recorder.poh.tick_height, 1); + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_ok()); + assert_eq!(poh_recorder.tick_cache.len(), 0); - //tick in the cache + entry - let (_b, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 1); - assert!(e[0].0.is_tick()); - let (_b, e) = entry_receiver.recv().expect("recv 2"); - assert!(!e[0].0.is_tick()); + //tick in the cache + entry + let (_b, e) = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 1); + assert!(e[0].0.is_tick()); + let (_b, e) = entry_receiver.recv().expect("recv 2"); + assert!(!e[0].0.is_tick()); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_record_at_max_fails() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let working_bank = WorkingBank { - bank: bank.clone(), - min_tick_height: 1, - max_tick_height: 2, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.poh.tick_height, 2); - let tx = test_tx(); - let h1 = hash(b"hello world!"); - assert!(poh_recorder - .record(bank.slot(), h1, vec![tx.clone()]) - .is_err()); + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: 1, + max_tick_height: 2, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.poh.tick_height, 2); + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_err()); - let (_bank, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 2); - assert!(e[0].0.is_tick()); - assert!(e[1].0.is_tick()); + let (_bank, e) = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 2); + assert!(e[0].0.is_tick()); + assert!(e[1].0.is_tick()); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_cache_on_disconnect() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let working_bank = WorkingBank { - bank, - min_tick_height: 2, - max_tick_height: 3, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.poh.tick_height, 2); - drop(entry_receiver); - poh_recorder.tick(); - assert!(poh_recorder.working_bank.is_none()); - assert_eq!(poh_recorder.tick_cache.len(), 3); + let working_bank = WorkingBank { + bank, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.poh.tick_height, 2); + drop(entry_receiver); + poh_recorder.tick(); + assert!(poh_recorder.working_bank.is_none()); + assert_eq!(poh_recorder.tick_cache.len(), 3); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_reset_current() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - Hash::default(), - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - &Pubkey::default(), - ); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 2); - poh_recorder.reset( - poh_recorder.poh.tick_height, - poh_recorder.poh.hash, - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - ); - assert_eq!(poh_recorder.tick_cache.len(), 0); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + Hash::default(), + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + &Pubkey::default(), + &Arc::new(blocktree), + ); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 2); + poh_recorder.reset( + poh_recorder.poh.tick_height, + poh_recorder.poh.hash, + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + ); + assert_eq!(poh_recorder.tick_cache.len(), 0); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_reset_with_cached() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - Hash::default(), - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - &Pubkey::default(), - ); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 2); - poh_recorder.reset( - poh_recorder.tick_cache[0].1, - poh_recorder.tick_cache[0].0.hash, - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - ); - assert_eq!(poh_recorder.tick_cache.len(), 0); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + Hash::default(), + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + &Pubkey::default(), + &Arc::new(blocktree), + ); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 2); + poh_recorder.reset( + poh_recorder.tick_cache[0].1, + poh_recorder.tick_cache[0].0.hash, + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + ); + assert_eq!(poh_recorder.tick_cache.len(), 0); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_reset_to_new_value() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - Hash::default(), - 0, - Some(4), - DEFAULT_TICKS_PER_SLOT, - &Pubkey::default(), - ); - poh_recorder.tick(); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 3); - assert_eq!(poh_recorder.poh.tick_height, 3); - poh_recorder.reset(1, hash(b"hello"), 0, Some(4), DEFAULT_TICKS_PER_SLOT); - assert_eq!(poh_recorder.tick_cache.len(), 0); - poh_recorder.tick(); - assert_eq!(poh_recorder.poh.tick_height, 2); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + Hash::default(), + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + &Pubkey::default(), + &Arc::new(blocktree), + ); + poh_recorder.tick(); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 3); + assert_eq!(poh_recorder.poh.tick_height, 3); + poh_recorder.reset(1, hash(b"hello"), 0, Some(4), DEFAULT_TICKS_PER_SLOT); + assert_eq!(poh_recorder.tick_cache.len(), 0); + poh_recorder.tick(); + assert_eq!(poh_recorder.poh.tick_height, 2); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_reset_clear_bank() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - Hash::default(), - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); - let ticks_per_slot = bank.ticks_per_slot(); - let working_bank = WorkingBank { - bank, - min_tick_height: 2, - max_tick_height: 3, - }; - poh_recorder.set_working_bank(working_bank); - poh_recorder.reset(1, hash(b"hello"), 0, Some(4), ticks_per_slot); - assert!(poh_recorder.working_bank.is_none()); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + Hash::default(), + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); + let ticks_per_slot = bank.ticks_per_slot(); + let working_bank = WorkingBank { + bank, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.reset(1, hash(b"hello"), 0, Some(4), ticks_per_slot); + assert!(poh_recorder.working_bank.is_none()); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] pub fn test_clear_signal() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - Hash::default(), - 0, - None, - bank.ticks_per_slot(), - &Pubkey::default(), - ); - let (sender, receiver) = sync_channel(1); - poh_recorder.set_bank(&bank); - poh_recorder.clear_bank_signal = Some(sender); - poh_recorder.clear_bank(); - assert!(receiver.try_recv().is_ok()); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + Hash::default(), + 0, + None, + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); + let (sender, receiver) = sync_channel(1); + poh_recorder.set_bank(&bank); + poh_recorder.clear_bank_signal = Some(sender); + poh_recorder.clear_bank(); + assert!(receiver.try_recv().is_ok()); + } + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_poh_recorder_reset_start_slot() { - let ticks_per_slot = 5; - let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2); - genesis_block.ticks_per_slot = ticks_per_slot; - let bank = Arc::new(Bank::new(&genesis_block)); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let ticks_per_slot = 5; + let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2); + genesis_block.ticks_per_slot = ticks_per_slot; + let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - let end_slot = 3; - let max_tick_height = (end_slot + 1) * ticks_per_slot - 1; - let working_bank = WorkingBank { - bank: bank.clone(), - min_tick_height: 1, - max_tick_height, - }; + let end_slot = 3; + let max_tick_height = (end_slot + 1) * ticks_per_slot - 1; + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: 1, + max_tick_height, + }; - poh_recorder.set_working_bank(working_bank); - for _ in 0..max_tick_height { - poh_recorder.tick(); + poh_recorder.set_working_bank(working_bank); + for _ in 0..max_tick_height { + poh_recorder.tick(); + } + + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_err()); + assert!(poh_recorder.working_bank.is_none()); + // Make sure the starting slot is updated + assert_eq!(poh_recorder.start_slot(), end_slot); } - - let tx = test_tx(); - let h1 = hash(b"hello world!"); - assert!(poh_recorder - .record(bank.slot(), h1, vec![tx.clone()]) - .is_err()); - assert!(poh_recorder.working_bank.is_none()); - // Make sure the starting slot is updated - assert_eq!(poh_recorder.start_slot(), end_slot); + Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_reached_leader_tick() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( - 0, - prev_hash, - 0, - None, - bank.ticks_per_slot(), - &Pubkey::default(), - ); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + None, + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); - // Test that with no leader slot, we don't reach the leader tick - assert_eq!(poh_recorder.reached_leader_tick().0, false); + // Test that with no leader slot, we don't reach the leader tick + assert_eq!(poh_recorder.reached_leader_tick().0, false); - for _ in 0..bank.ticks_per_slot() { + for _ in 0..bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // Tick should not be recorded + assert_eq!(poh_recorder.tick_height(), 0); + + // Test that with no leader slot, we don't reach the leader tick after sending some ticks + assert_eq!(poh_recorder.reached_leader_tick().0, false); + + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 0, + None, + bank.ticks_per_slot(), + ); + + // Test that with no leader slot in reset(), we don't reach the leader tick + assert_eq!(poh_recorder.reached_leader_tick().0, false); + + // Provide a leader slot 1 slot down + poh_recorder.reset( + bank.ticks_per_slot(), + bank.last_blockhash(), + 0, + Some(2), + bank.ticks_per_slot(), + ); + + let init_ticks = poh_recorder.tick_height(); + + // Send one slot worth of ticks + for _ in 0..bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // Tick should be recorded + assert_eq!( + poh_recorder.tick_height(), + init_ticks + bank.ticks_per_slot() + ); + + // Test that we don't reach the leader tick because of grace ticks + assert_eq!(poh_recorder.reached_leader_tick().0, false); + + // reset poh now. it should discard the grace ticks wait + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 1, + Some(2), + bank.ticks_per_slot(), + ); + // without sending more ticks, we should be leader now + assert_eq!(poh_recorder.reached_leader_tick().0, true); + assert_eq!(poh_recorder.reached_leader_tick().1, 0); + + // Now test that with grace ticks we can reach leader ticks + // Set the leader slot 1 slot down + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 2, + Some(3), + bank.ticks_per_slot(), + ); + + // Send one slot worth of ticks + for _ in 0..bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // We are not the leader yet, as expected + assert_eq!(poh_recorder.reached_leader_tick().0, false); + + // Send 1 less tick than the grace ticks + for _ in 0..bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR - 1 { + poh_recorder.tick(); + } + // We are still not the leader + assert_eq!(poh_recorder.reached_leader_tick().0, false); + + // Send one more tick poh_recorder.tick(); - } - // Tick should not be recorded - assert_eq!(poh_recorder.tick_height(), 0); + // We should be the leader now + assert_eq!(poh_recorder.reached_leader_tick().0, true); + assert_eq!( + poh_recorder.reached_leader_tick().1, + bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR + ); - // Test that with no leader slot, we don't reach the leader tick after sending some ticks - assert_eq!(poh_recorder.reached_leader_tick().0, false); + // Let's test that correct grace ticks are reported + // Set the leader slot 1 slot down + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 3, + Some(4), + bank.ticks_per_slot(), + ); - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 0, - None, - bank.ticks_per_slot(), - ); + // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) + for _ in + bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR..bank.ticks_per_slot() + { + poh_recorder.tick(); + } - // Test that with no leader slot in reset(), we don't reach the leader tick - assert_eq!(poh_recorder.reached_leader_tick().0, false); - - // Provide a leader slot 1 slot down - poh_recorder.reset( - bank.ticks_per_slot(), - bank.last_blockhash(), - 0, - Some(2), - bank.ticks_per_slot(), - ); - - let init_ticks = poh_recorder.tick_height(); - - // Send one slot worth of ticks - for _ in 0..bank.ticks_per_slot() { + // Send one extra tick before resetting (so that there's one grace tick) poh_recorder.tick(); + + // We are not the leader yet, as expected + assert_eq!(poh_recorder.reached_leader_tick().0, false); + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 3, + Some(4), + bank.ticks_per_slot(), + ); + // without sending more ticks, we should be leader now + assert_eq!(poh_recorder.reached_leader_tick().0, true); + assert_eq!(poh_recorder.reached_leader_tick().1, 1); + + // Let's test that if a node overshoots the ticks for its target + // leader slot, reached_leader_tick() will return false + // Set the leader slot 1 slot down + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 4, + Some(5), + bank.ticks_per_slot(), + ); + + // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) + for _ in 0..4 * bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // We are not the leader, as expected + assert_eq!(poh_recorder.reached_leader_tick().0, false); } - - // Tick should be recorded - assert_eq!( - poh_recorder.tick_height(), - init_ticks + bank.ticks_per_slot() - ); - - // Test that we don't reach the leader tick because of grace ticks - assert_eq!(poh_recorder.reached_leader_tick().0, false); - - // reset poh now. it should discard the grace ticks wait - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 1, - Some(2), - bank.ticks_per_slot(), - ); - // without sending more ticks, we should be leader now - assert_eq!(poh_recorder.reached_leader_tick().0, true); - assert_eq!(poh_recorder.reached_leader_tick().1, 0); - - // Now test that with grace ticks we can reach leader ticks - // Set the leader slot 1 slot down - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 2, - Some(3), - bank.ticks_per_slot(), - ); - - // Send one slot worth of ticks - for _ in 0..bank.ticks_per_slot() { - poh_recorder.tick(); - } - - // We are not the leader yet, as expected - assert_eq!(poh_recorder.reached_leader_tick().0, false); - - // Send 1 less tick than the grace ticks - for _ in 0..bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR - 1 { - poh_recorder.tick(); - } - // We are still not the leader - assert_eq!(poh_recorder.reached_leader_tick().0, false); - - // Send one more tick - poh_recorder.tick(); - - // We should be the leader now - assert_eq!(poh_recorder.reached_leader_tick().0, true); - assert_eq!( - poh_recorder.reached_leader_tick().1, - bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR - ); - - // Let's test that correct grace ticks are reported - // Set the leader slot 1 slot down - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 3, - Some(4), - bank.ticks_per_slot(), - ); - - // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) - for _ in bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR..bank.ticks_per_slot() { - poh_recorder.tick(); - } - - // Send one extra tick before resetting (so that there's one grace tick) - poh_recorder.tick(); - - // We are not the leader yet, as expected - assert_eq!(poh_recorder.reached_leader_tick().0, false); - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 3, - Some(4), - bank.ticks_per_slot(), - ); - // without sending more ticks, we should be leader now - assert_eq!(poh_recorder.reached_leader_tick().0, true); - assert_eq!(poh_recorder.reached_leader_tick().1, 1); - - // Let's test that if a node overshoots the ticks for its target - // leader slot, reached_leader_tick() will return false - // Set the leader slot 1 slot down - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 4, - Some(5), - bank.ticks_per_slot(), - ); - - // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) - for _ in 0..4 * bank.ticks_per_slot() { - poh_recorder.tick(); - } - - // We are not the leader, as expected - assert_eq!(poh_recorder.reached_leader_tick().0, false); + Blocktree::destroy(&ledger_path).unwrap(); } } diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 8744ea5e61..17b00dfeda 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -1,6 +1,5 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream - use crate::poh_recorder::PohRecorder; use crate::service::Service; use solana_sdk::timing::NUM_TICKS_PER_SECOND; @@ -98,6 +97,7 @@ impl Service for PohService { #[cfg(test)] mod tests { use super::*; + use crate::blocktree::{get_tmp_ledger_path, Blocktree}; use crate::poh_recorder::WorkingBank; use crate::result::Result; use crate::test_tx::test_tx; @@ -111,87 +111,94 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (poh_recorder, entry_receiver) = PohRecorder::new( - bank.tick_height(), - prev_hash, - bank.slot(), - Some(4), - bank.ticks_per_slot(), - &Pubkey::default(), - ); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); - let exit = Arc::new(AtomicBool::new(false)); - let working_bank = WorkingBank { - bank: bank.clone(), - min_tick_height: bank.tick_height(), - max_tick_height: std::u64::MAX, - }; + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + prev_hash, + bank.slot(), + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + ); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let exit = Arc::new(AtomicBool::new(false)); + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: bank.tick_height(), + max_tick_height: std::u64::MAX, + }; - let entry_producer: JoinHandle> = { - let poh_recorder = poh_recorder.clone(); - let exit = exit.clone(); + let entry_producer: JoinHandle> = { + let poh_recorder = poh_recorder.clone(); + let exit = exit.clone(); - Builder::new() - .name("solana-poh-service-entry_producer".to_string()) - .spawn(move || { - loop { - // send some data - let h1 = hash(b"hello world!"); - let tx = test_tx(); - poh_recorder - .lock() - .unwrap() - .record(bank.slot(), h1, vec![tx]) - .unwrap(); + Builder::new() + .name("solana-poh-service-entry_producer".to_string()) + .spawn(move || { + loop { + // send some data + let h1 = hash(b"hello world!"); + let tx = test_tx(); + poh_recorder + .lock() + .unwrap() + .record(bank.slot(), h1, vec![tx]) + .unwrap(); - if exit.load(Ordering::Relaxed) { - break Ok(()); + if exit.load(Ordering::Relaxed) { + break Ok(()); + } } - } - }) - .unwrap() - }; + }) + .unwrap() + }; - const HASHES_PER_TICK: u64 = 2; - let poh_service = PohService::new( - poh_recorder.clone(), - &PohServiceConfig::Tick(HASHES_PER_TICK as usize), - &exit, - ); - poh_recorder.lock().unwrap().set_working_bank(working_bank); + const HASHES_PER_TICK: u64 = 2; + let poh_service = PohService::new( + poh_recorder.clone(), + &PohServiceConfig::Tick(HASHES_PER_TICK as usize), + &exit, + ); + poh_recorder.lock().unwrap().set_working_bank(working_bank); - // get some events - let mut hashes = 0; - let mut need_tick = true; - let mut need_entry = true; - let mut need_partial = true; + // get some events + let mut hashes = 0; + let mut need_tick = true; + let mut need_entry = true; + let mut need_partial = true; - while need_tick || need_entry || need_partial { - for entry in entry_receiver.recv().unwrap().1 { - let entry = &entry.0; - if entry.is_tick() { - assert!(entry.num_hashes <= HASHES_PER_TICK); + while need_tick || need_entry || need_partial { + for entry in entry_receiver.recv().unwrap().1 { + let entry = &entry.0; + if entry.is_tick() { + assert!(entry.num_hashes <= HASHES_PER_TICK); - if entry.num_hashes == HASHES_PER_TICK { - need_tick = false; + if entry.num_hashes == HASHES_PER_TICK { + need_tick = false; + } else { + need_partial = false; + } + + hashes += entry.num_hashes; + + assert_eq!(hashes, HASHES_PER_TICK); + + hashes = 0; } else { - need_partial = false; + assert!(entry.num_hashes >= 1); + need_entry = false; + hashes += entry.num_hashes - 1; } - - hashes += entry.num_hashes; - - assert_eq!(hashes, HASHES_PER_TICK); - - hashes = 0; - } else { - assert!(entry.num_hashes >= 1); - need_entry = false; - hashes += entry.num_hashes - 1; } } + exit.store(true, Ordering::Relaxed); + let _ = poh_service.join().unwrap(); + let _ = entry_producer.join().unwrap(); } - exit.store(true, Ordering::Relaxed); - let _ = poh_service.join().unwrap(); - let _ = entry_producer.join().unwrap(); + Blocktree::destroy(&ledger_path).unwrap(); } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d2b0593c6b..0be3dfc1c2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -147,7 +147,13 @@ impl ReplayStage { &cluster_info, ); - Self::reset_poh_recorder(&my_id, &bank, &poh_recorder, ticks_per_slot); + Self::reset_poh_recorder( + &my_id, + &blocktree, + &bank, + &poh_recorder, + ticks_per_slot, + ); is_tpu_bank_active = false; } @@ -171,7 +177,6 @@ impl ReplayStage { &bank_forks, &poh_recorder, &cluster_info, - &blocktree, poh_slot, reached_leader_tick, grace_ticks, @@ -200,35 +205,11 @@ impl ReplayStage { bank_forks: &Arc>, poh_recorder: &Arc>, cluster_info: &Arc>, - blocktree: &Blocktree, poh_slot: u64, reached_leader_tick: bool, grace_ticks: u64, ) { trace!("{} checking poh slot {}", my_id, poh_slot); - if blocktree.meta(poh_slot).unwrap().is_some() { - // We've already broadcasted entries for this slot, skip it - - // Since we are skipping our leader slot, let's tell poh recorder when we should be - // leader again - if reached_leader_tick { - let _ = bank_forks.read().unwrap().get(poh_slot).map(|bank| { - let next_leader_slot = - leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank); - let mut poh = poh_recorder.lock().unwrap(); - let start_slot = poh.start_slot(); - poh.reset( - bank.tick_height(), - bank.last_blockhash(), - start_slot, - next_leader_slot, - bank.ticks_per_slot(), - ); - }); - } - - return; - } if bank_forks.read().unwrap().get(poh_slot).is_none() { let parent_slot = poh_recorder.lock().unwrap().start_slot(); let parent = { @@ -332,11 +313,13 @@ impl ReplayStage { fn reset_poh_recorder( my_id: &Pubkey, + blocktree: &Blocktree, bank: &Arc, poh_recorder: &Arc>, ticks_per_slot: u64, ) { - let next_leader_slot = leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank); + let next_leader_slot = + leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank, Some(blocktree)); poh_recorder.lock().unwrap().reset( bank.tick_height(), bank.last_blockhash(), @@ -635,7 +618,8 @@ mod test { let bank = bank_forks.working_bank(); let blocktree = Arc::new(blocktree); - let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blocktree); let (ledger_writer_sender, ledger_writer_receiver) = channel(); let (replay_stage, _slot_full_receiver) = ReplayStage::new( &my_keypair.pubkey(), diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 6ccb3d8f43..59081ab173 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -208,8 +208,10 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path!(); let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path) .expect("Expected to successfully open ledger"); + let blocktree = Arc::new(blocktree); let bank = bank_forks.working_bank(); - let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blocktree); let voting_keypair = Keypair::new(); let (storage_entry_sender, storage_entry_receiver) = channel(); let tvu = Tvu::new( @@ -225,7 +227,7 @@ pub mod tests { fetch: target1.sockets.tvu, } }, - Arc::new(blocktree), + blocktree, STORAGE_ROTATE_TEST_COUNT, &StorageState::default(), None, diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index f2ad23a1a5..5d1989fbdc 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -99,95 +99,98 @@ fn test_replay() { let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit); let voting_keypair = Keypair::new(); - let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank); - let (storage_sender, storage_receiver) = channel(); - let tvu = Tvu::new( - &voting_keypair.pubkey(), - Some(Arc::new(voting_keypair)), - &bank_forks, - &bank_forks_info, - &cref1, - { - Sockets { - repair: target1.sockets.repair, - retransmit: target1.sockets.retransmit, - fetch: target1.sockets.tvu, - } - }, - Arc::new(blocktree), - STORAGE_ROTATE_TEST_COUNT, - &StorageState::default(), - None, - ledger_signal_receiver, - &Arc::new(RpcSubscriptions::default()), - &poh_recorder, - storage_sender, - storage_receiver, - &exit, - ); - - let mut mint_ref_balance = starting_mint_balance; - let mut msgs = Vec::new(); - let mut blob_idx = 0; - let num_transfers = 10; - let mut transfer_amount = 501; - let bob_keypair = Keypair::new(); - let mut cur_hash = blockhash; - for i in 0..num_transfers { - let entry0 = next_entry_mut(&mut cur_hash, i, vec![]); - let entry_tick0 = next_entry_mut(&mut cur_hash, i + 1, vec![]); - - let tx0 = SystemTransaction::new_account( - &mint_keypair, - &bob_keypair.pubkey(), - transfer_amount, - blockhash, - 0, + let blocktree = Arc::new(blocktree); + { + let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blocktree); + let (storage_sender, storage_receiver) = channel(); + let tvu = Tvu::new( + &voting_keypair.pubkey(), + Some(Arc::new(voting_keypair)), + &bank_forks, + &bank_forks_info, + &cref1, + { + Sockets { + repair: target1.sockets.repair, + retransmit: target1.sockets.retransmit, + fetch: target1.sockets.tvu, + } + }, + blocktree, + STORAGE_ROTATE_TEST_COUNT, + &StorageState::default(), + None, + ledger_signal_receiver, + &Arc::new(RpcSubscriptions::default()), + &poh_recorder, + storage_sender, + storage_receiver, + &exit, ); - let entry_tick1 = next_entry_mut(&mut cur_hash, i + 1, vec![]); - let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]); - let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]); - mint_ref_balance -= transfer_amount; - transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors + let mut mint_ref_balance = starting_mint_balance; + let mut msgs = Vec::new(); + let mut blob_idx = 0; + let num_transfers = 10; + let mut transfer_amount = 501; + let bob_keypair = Keypair::new(); + let mut cur_hash = blockhash; + for i in 0..num_transfers { + let entry0 = next_entry_mut(&mut cur_hash, i, vec![]); + let entry_tick0 = next_entry_mut(&mut cur_hash, i + 1, vec![]); - let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; - let blobs = entries.to_shared_blobs(); - index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0); - blob_idx += blobs.len() as u64; - blobs - .iter() - .for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr)); - msgs.extend(blobs.into_iter()); + let tx0 = SystemTransaction::new_account( + &mint_keypair, + &bob_keypair.pubkey(), + transfer_amount, + blockhash, + 0, + ); + let entry_tick1 = next_entry_mut(&mut cur_hash, i + 1, vec![]); + let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]); + let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]); + + mint_ref_balance -= transfer_amount; + transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors + + let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; + let blobs = entries.to_shared_blobs(); + index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0); + blob_idx += blobs.len() as u64; + blobs + .iter() + .for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr)); + msgs.extend(blobs.into_iter()); + } + + // send the blobs into the socket + s_responder.send(msgs).expect("send"); + drop(s_responder); + + // receive retransmitted messages + let timer = Duration::new(1, 0); + while let Ok(_msg) = r_reader.recv_timeout(timer) { + trace!("got msg"); + } + + let working_bank = bank_forks.read().unwrap().working_bank(); + let final_mint_balance = working_bank.get_balance(&mint_keypair.pubkey()); + assert_eq!(final_mint_balance, mint_ref_balance); + + let bob_balance = working_bank.get_balance(&bob_keypair.pubkey()); + assert_eq!(bob_balance, starting_mint_balance - mint_ref_balance); + + exit.store(true, Ordering::Relaxed); + poh_service_exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + tvu.join().unwrap(); + dr_l.join().unwrap(); + dr_2.join().unwrap(); + dr_1.join().unwrap(); + t_receiver.join().unwrap(); + t_responder.join().unwrap(); } - - // send the blobs into the socket - s_responder.send(msgs).expect("send"); - drop(s_responder); - - // receive retransmitted messages - let timer = Duration::new(1, 0); - while let Ok(_msg) = r_reader.recv_timeout(timer) { - trace!("got msg"); - } - - let working_bank = bank_forks.read().unwrap().working_bank(); - let final_mint_balance = working_bank.get_balance(&mint_keypair.pubkey()); - assert_eq!(final_mint_balance, mint_ref_balance); - - let bob_balance = working_bank.get_balance(&bob_keypair.pubkey()); - assert_eq!(bob_balance, starting_mint_balance - mint_ref_balance); - - exit.store(true, Ordering::Relaxed); - poh_service_exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - tvu.join().unwrap(); - dr_l.join().unwrap(); - dr_2.join().unwrap(); - dr_1.join().unwrap(); - t_receiver.join().unwrap(); - t_responder.join().unwrap(); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); let _ignored = remove_dir_all(&blocktree_path); }