diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 22d6a3e2bb..3c354c9d83 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -97,7 +97,12 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let len = x.read().unwrap().packets.len(); (x, iter::repeat(1).take(len).collect()) }).collect(); - let (_stage, signal_receiver) = BankingStage::new(&bank, verified_receiver, Default::default()); + let (_stage, signal_receiver) = BankingStage::new( + &bank, + verified_receiver, + Default::default(), + &mint.last_id(), + ); bencher.iter(move || { for v in verified.chunks(verified.len() / NUM_THREADS) { verified_sender.send(v.to_vec()).unwrap(); @@ -182,7 +187,12 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { let len = x.read().unwrap().packets.len(); (x, iter::repeat(1).take(len).collect()) }).collect(); - let (_stage, signal_receiver) = BankingStage::new(&bank, verified_receiver, Default::default()); + let (_stage, signal_receiver) = BankingStage::new( + &bank, + verified_receiver, + Default::default(), + &mint.last_id(), + ); bencher.iter(move || { for v in verified.chunks(verified.len() / NUM_THREADS) { verified_sender.send(v.to_vec()).unwrap(); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index b3e30ed6f4..a2ea97288c 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -6,6 +6,7 @@ use bank::{Bank, NUM_TICKS_PER_SECOND}; use bincode::deserialize; use counter::Counter; use entry::Entry; +use hash::Hash; use log::Level; use packet::Packets; use poh_recorder::PohRecorder; @@ -53,10 +54,11 @@ impl BankingStage { bank: &Arc, verified_receiver: Receiver, config: Config, + last_entry_id: &Hash, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); - let poh = PohRecorder::new(bank.clone(), entry_sender); + let poh = PohRecorder::new(bank.clone(), entry_sender, *last_entry_id); let tick_poh = poh.clone(); // Tick producer is a headless producer, so when it exits it should notify the banking stage. // Since channel are not used to talk between these threads an AtomicBool is used as a @@ -264,20 +266,28 @@ mod tests { #[test] fn test_banking_stage_shutdown1() { - let bank = Bank::new(&Mint::new(2)); + let bank = Arc::new(Bank::new(&Mint::new(2))); let (verified_sender, verified_receiver) = channel(); - let (banking_stage, _entry_receiver) = - BankingStage::new(&Arc::new(bank), verified_receiver, Default::default()); + let (banking_stage, _entry_receiver) = BankingStage::new( + &bank, + verified_receiver, + Default::default(), + &bank.last_id(), + ); drop(verified_sender); assert_eq!(banking_stage.join().unwrap(), ()); } #[test] fn test_banking_stage_shutdown2() { - let bank = Bank::new(&Mint::new(2)); + let bank = Arc::new(Bank::new(&Mint::new(2))); let (_verified_sender, verified_receiver) = channel(); - let (banking_stage, entry_receiver) = - BankingStage::new(&Arc::new(bank), verified_receiver, Default::default()); + let (banking_stage, entry_receiver) = BankingStage::new( + &bank, + verified_receiver, + Default::default(), + &bank.last_id(), + ); drop(entry_receiver); assert_eq!(banking_stage.join().unwrap(), ()); } @@ -291,6 +301,7 @@ mod tests { &bank, verified_receiver, Config::Sleep(Duration::from_millis(1)), + &bank.last_id(), ); sleep(Duration::from_millis(500)); drop(verified_sender); @@ -308,8 +319,12 @@ mod tests { let bank = Arc::new(Bank::new(&mint)); let start_hash = bank.last_id(); let (verified_sender, verified_receiver) = channel(); - let (banking_stage, entry_receiver) = - BankingStage::new(&bank, verified_receiver, Default::default()); + let (banking_stage, entry_receiver) = BankingStage::new( + &bank, + verified_receiver, + Default::default(), + &bank.last_id(), + ); // good tx let keypair = mint.keypair(); @@ -354,8 +369,12 @@ mod tests { let mint = Mint::new(2); let bank = Arc::new(Bank::new(&mint)); let (verified_sender, verified_receiver) = channel(); - let (banking_stage, entry_receiver) = - BankingStage::new(&bank, verified_receiver, Default::default()); + let (banking_stage, entry_receiver) = BankingStage::new( + &bank, + verified_receiver, + Default::default(), + &bank.last_id(), + ); // Process a batch that includes a transaction that receives two tokens. let alice = Keypair::new(); diff --git a/src/fullnode.rs b/src/fullnode.rs index e91d2f48ce..13f9bc5e81 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -5,6 +5,7 @@ use broadcast_stage::BroadcastStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; use drone::DRONE_PORT; use entry::Entry; +use hash::Hash; use leader_scheduler::LeaderScheduler; use ledger::read_ledger; use ncp::Ncp; @@ -275,6 +276,11 @@ impl Fullnode { exit.clone(), ); + let last_entry_id = &ledger_tail + .last() + .expect("Expected at least one entry in the ledger") + .id; + let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info); let shared_window = Arc::new(RwLock::new(window)); let cluster_info = Arc::new(RwLock::new( @@ -347,6 +353,7 @@ impl Fullnode { ledger_path, sigverify_disabled, entry_height, + last_entry_id, leader_scheduler.clone(), ); @@ -455,7 +462,7 @@ impl Fullnode { Ok(()) } - fn validator_to_leader(&mut self, entry_height: u64) { + fn validator_to_leader(&mut self, entry_height: u64, last_entry_id: Hash) { self.cluster_info .write() .unwrap() @@ -472,6 +479,10 @@ impl Fullnode { &self.ledger_path, self.sigverify_disabled, entry_height, + // We pass the last_entry_id from the replicate stage because we can't trust that + // the window didn't overwrite the slot at for the last entry that the replicate stage + // processed. We also want to avoid reading processing the ledger for the last id. + &last_entry_id, self.leader_scheduler.clone(), ); @@ -509,8 +520,8 @@ impl Fullnode { _ => Ok(None), }, Some(NodeRole::Validator(validator_services)) => match validator_services.join()? { - Some(TvuReturnType::LeaderRotation(entry_height)) => { - self.validator_to_leader(entry_height); + Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)) => { + self.validator_to_leader(entry_height, last_entry_id); Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) } _ => Ok(None), @@ -568,7 +579,7 @@ impl Service for Fullnode { match self.node_role { Some(NodeRole::Validator(validator_service)) => { - if let Some(TvuReturnType::LeaderRotation(_)) = validator_service.join()? { + if let Some(TvuReturnType::LeaderRotation(_, _)) = validator_service.join()? { return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)); } } @@ -590,7 +601,7 @@ mod tests { use cluster_info::Node; use fullnode::{Fullnode, NodeRole, TvuReturnType}; use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; - use ledger::{genesis, LedgerWriter}; + use ledger::{create_sample_ledger, genesis, LedgerWriter}; use packet::make_consecutive_blobs; use service::Service; use signature::{Keypair, KeypairUtil}; @@ -605,14 +616,17 @@ mod tests { fn validator_exit() { let keypair = Keypair::new(); let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); - let (alice, validator_ledger_path) = genesis("validator_exit", 10_000); - let bank = Bank::new(&alice); + let (mint, validator_ledger_path) = genesis("validator_exit", 10_000); + let bank = Bank::new(&mint); let entry = tn.info.clone(); + let genesis_entries = &mint.create_entries(); + let entry_height = genesis_entries.len() as u64; + let v = Fullnode::new_with_bank( keypair, bank, - 0, - &[], + entry_height, + &genesis_entries, tn, Some(&entry), &validator_ledger_path, @@ -631,16 +645,19 @@ mod tests { .map(|i| { let keypair = Keypair::new(); let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); - let (alice, validator_ledger_path) = + let (mint, validator_ledger_path) = genesis(&format!("validator_parallel_exit_{}", i), 10_000); ledger_paths.push(validator_ledger_path.clone()); - let bank = Bank::new(&alice); + let bank = Bank::new(&mint); let entry = tn.info.clone(); + + let genesis_entries = &mint.create_entries(); + let entry_height = genesis_entries.len() as u64; Fullnode::new_with_bank( keypair, bank, - 0, - &[], + entry_height, + &genesis_entries, tn, Some(&entry), &validator_ledger_path, @@ -664,7 +681,6 @@ mod tests { } #[test] - #[ignore] fn test_wrong_role_transition() { // Create the leader node information let bootstrap_leader_keypair = Keypair::new(); @@ -677,9 +693,9 @@ mod tests { let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); // Make a common mint and a genesis entry for both leader + validator's ledgers - let (mint, bootstrap_leader_ledger_path) = genesis("test_wrong_role_transition", 10_000); + let (mint, bootstrap_leader_ledger_path, genesis_entries) = + create_sample_ledger("test_wrong_role_transition", 10_000); - let genesis_entries = mint.create_entries(); let last_id = genesis_entries .last() .expect("expected at least one genesis entry") @@ -688,7 +704,8 @@ mod tests { // Write the entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - let first_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + let first_entries = + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); let ledger_initial_len = (genesis_entries.len() + first_entries.len()) as u64; ledger_writer.write_entries(first_entries).unwrap(); @@ -746,7 +763,6 @@ mod tests { } #[test] - #[ignore] fn test_validator_to_leader_transition() { // Make a leader identity let leader_keypair = Keypair::new(); @@ -755,12 +771,12 @@ mod tests { let leader_ncp = leader_node.info.contact_info.ncp; // Create validator identity - let (mint, validator_ledger_path) = genesis("test_validator_to_leader_transition", 10_000); + let (mint, validator_ledger_path, genesis_entries) = + create_sample_ledger("test_validator_to_leader_transition", 10_000); let validator_keypair = Keypair::new(); let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); let validator_info = validator_node.info.clone(); - let genesis_entries = mint.create_entries(); let mut last_id = genesis_entries .last() .expect("expected at least one genesis entry") @@ -775,7 +791,7 @@ mod tests { // 2) A vote from the validator let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap(); let bootstrap_entries = - make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); let bootstrap_entries_len = bootstrap_entries.len(); last_id = bootstrap_entries.last().unwrap().id; ledger_writer.write_entries(bootstrap_entries).unwrap(); @@ -845,10 +861,11 @@ mod tests { let join_result = validator_services .join() .expect("Expected successful validator join"); - assert_eq!( - join_result, - Some(TvuReturnType::LeaderRotation(bootstrap_height)) - ); + if let Some(TvuReturnType::LeaderRotation(result_bh, _)) = join_result { + assert_eq!(result_bh, bootstrap_height); + } else { + panic!("Expected validator to have exited due to leader rotation"); + } } _ => panic!("Role should not be leader"), } diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index be889dcb59..605d857422 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -430,20 +430,22 @@ pub fn set_new_leader(bank: &Bank, leader_scheduler: &mut LeaderScheduler, vote_ pub fn make_active_set_entries( active_keypair: &Keypair, token_source: &Keypair, - last_id: &Hash, + last_entry_id: &Hash, + last_tick_id: &Hash, ) -> Vec { // 1) Create transfer token entry - let transfer_tx = Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_id); - let transfer_entry = Entry::new(last_id, 0, vec![transfer_tx]); - let last_id = transfer_entry.id; + let transfer_tx = + Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_tick_id); + let transfer_entry = Entry::new(last_entry_id, 0, vec![transfer_tx]); + let last_entry_id = transfer_entry.id; // 2) Create vote entry let vote = Vote { version: 0, contact_info_version: 0, }; - let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, last_id, 0); - let vote_entry = Entry::new(&last_id, 0, vec![vote_tx]); + let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, *last_tick_id, 0); + let vote_entry = Entry::new(&last_entry_id, 0, vec![vote_tx]); vec![transfer_entry, vote_entry] } diff --git a/src/ledger.rs b/src/ledger.rs index de0ba7e961..1a89b853f2 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -8,12 +8,10 @@ use budget_transaction::BudgetTransaction; use entry::Entry; use hash::Hash; use log::Level::Trace; -#[cfg(test)] use mint::Mint; use packet::{SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; -#[cfg(test)] use signature::{Keypair, KeypairUtil}; use solana_program_interface::pubkey::Pubkey; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; @@ -466,7 +464,8 @@ impl Block for [Entry] { let r = x1.verify(&x0.id); if !r { warn!( - "entry invalid!: {:?} num txs: {}", + "entry invalid!: x0: {:?}, x1: {:?} num txs: {}", + x0.id, x1.id, x1.transactions.len() ); @@ -592,7 +591,6 @@ pub fn next_entries( next_entries_mut(&mut id, &mut num_hashes, transactions) } -#[cfg(test)] pub fn tmp_ledger_path(name: &str) -> String { use std::env; let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); @@ -611,6 +609,31 @@ pub fn genesis(name: &str, num: i64) -> (Mint, String) { (mint, path) } +fn create_ticks(num_ticks: usize, hash: &mut Hash) -> Vec { + let mut ticks = Vec::with_capacity(num_ticks); + let mut num_hashes = 0; + for _ in 0..num_ticks { + ticks.push(Entry::new_mut(hash, &mut num_hashes, vec![])); + } + + ticks +} + +pub fn create_sample_ledger(name: &str, num: i64) -> (Mint, String, Vec) { + let mint = Mint::new(num); + let path = tmp_ledger_path(name); + + // Create the entries + let mut genesis = mint.create_entries(); + let ticks = create_ticks(1, &mut mint.last_id()); + genesis.extend(ticks); + + let mut writer = LedgerWriter::open(&path, true).unwrap(); + writer.write_entries(genesis.clone()).unwrap(); + + (mint, path, genesis) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 6f444c42f8..4c528cb4b3 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -21,8 +21,8 @@ impl PohRecorder { /// A recorder to synchronize PoH with the following data structures /// * bank - the LastId's queue is updated on `tick` and `record` events /// * sender - the Entry channel that outputs to the ledger - pub fn new(bank: Arc, sender: Sender>) -> Self { - let poh = Arc::new(Mutex::new(Poh::new(bank.last_id()))); + pub fn new(bank: Arc, sender: Sender>, last_entry_id: Hash) -> Self { + let poh = Arc::new(Mutex::new(Poh::new(last_entry_id))); PohRecorder { poh, bank, sender } } @@ -77,8 +77,9 @@ mod tests { fn test_poh() { let mint = Mint::new(1); let bank = Arc::new(Bank::new(&mint)); + let last_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(bank, entry_sender); + let poh_recorder = PohRecorder::new(bank, entry_sender, last_id); //send some data let h1 = hash(b"hello world!"); diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index d7f107b569..c22bb67245 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -4,6 +4,7 @@ use bank::Bank; use cluster_info::ClusterInfo; use counter::Counter; use entry::EntryReceiver; +use hash::Hash; use leader_scheduler::LeaderScheduler; use ledger::{Block, LedgerWriter}; use log::Level; @@ -23,7 +24,7 @@ use vote_stage::send_validator_vote; #[derive(Debug, PartialEq, Eq, Clone)] pub enum ReplicateStageReturnType { - LeaderRotation(u64), + LeaderRotation(u64, Hash), } // Implement a destructor for the ReplicateStage thread to signal it exited @@ -60,7 +61,7 @@ impl ReplicateStage { vote_blob_sender: Option<&BlobSender>, entry_height: &mut u64, leader_scheduler: &Arc>, - ) -> Result<()> { + ) -> Result { let timer = Duration::new(1, 0); //coalesce all the available entries into a single vote let mut entries = window_receiver.recv_timeout(timer)?; @@ -69,7 +70,7 @@ impl ReplicateStage { } let mut res = Ok(()); - { + let last_entry_id = { let mut num_entries_to_write = entries.len(); for (i, entry) in entries.iter().enumerate() { res = bank.process_entry(&entry); @@ -111,7 +112,11 @@ impl ReplicateStage { // If leader rotation happened, only write the entries up to leader rotation. entries.truncate(num_entries_to_write); - } + entries + .last() + .expect("Entries cannot be empty at this point") + .id + }; if let Some(sender) = vote_blob_sender { send_validator_vote(bank, keypair, cluster_info, sender)?; @@ -135,7 +140,7 @@ impl ReplicateStage { *entry_height += entries_len; res?; - Ok(()) + Ok(last_entry_id) } pub fn new( @@ -162,6 +167,7 @@ impl ReplicateStage { let now = Instant::now(); let mut next_vote_secs = 1; let mut entry_height_ = entry_height; + let mut last_entry_id = None; loop { let leader_id = leader_scheduler .read() @@ -169,7 +175,14 @@ impl ReplicateStage { .get_scheduled_leader(entry_height_) .expect("Scheduled leader id should never be unknown at this point"); if leader_id == keypair.pubkey() { - return Some(ReplicateStageReturnType::LeaderRotation(entry_height_)); + return Some(ReplicateStageReturnType::LeaderRotation( + entry_height_, + // We should never start the TPU / this stage on an exact entry that causes leader + // rotation (Fullnode should automatically transition on startup if it detects + // are no longer a validator. Hence we can assume that some entry must have + // triggered leader rotation + last_entry_id.expect("Must exist an entry that triggered rotation"), + )); } // Only vote once a second. @@ -180,7 +193,7 @@ impl ReplicateStage { None }; - if let Err(e) = Self::replicate_requests( + match Self::replicate_requests( &bank, &cluster_info, &window_receiver, @@ -190,10 +203,11 @@ impl ReplicateStage { &mut entry_height_, &leader_scheduler, ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), + Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break, + Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), + Err(e) => error!("{:?}", e), + Ok(last_entry_id_) => { + last_entry_id = Some(last_entry_id_); } } } @@ -222,7 +236,7 @@ mod test { use cluster_info::{ClusterInfo, Node}; use fullnode::Fullnode; use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; - use ledger::{genesis, next_entries_mut, LedgerWriter}; + use ledger::{create_sample_ledger, next_entries_mut, LedgerWriter}; use logger; use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use service::Service; @@ -232,7 +246,6 @@ mod test { use std::sync::{Arc, RwLock}; #[test] - #[ignore] pub fn test_replicate_stage_leader_rotation_exit() { logger::setup(); @@ -243,8 +256,8 @@ mod test { let cluster_info_me = ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new"); // Create a ledger - let (mint, my_ledger_path) = genesis("test_replicate_stage_leader_rotation_exit", 10_000); - let genesis_entries = mint.create_entries(); + let (mint, my_ledger_path, genesis_entries) = + create_sample_ledger("test_replicate_stage_leader_rotation_exit", 10_000); let mut last_id = genesis_entries .last() .expect("expected at least one genesis entry") @@ -254,7 +267,8 @@ mod test { // 1) Give the validator a nonzero number of tokens 2) A vote from the validator . // This will cause leader rotation after the bootstrap height let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap(); - let bootstrap_entries = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id); + let bootstrap_entries = + make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id); last_id = bootstrap_entries.last().unwrap().id; let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64; ledger_writer.write_entries(bootstrap_entries).unwrap(); @@ -307,11 +321,15 @@ mod test { } entries_to_send.truncate(total_entries_to_send); + let last_id = entries_to_send[(bootstrap_height - 1) as usize].id; entry_sender.send(entries_to_send).unwrap(); // Wait for replicate_stage to exit and check return value is correct assert_eq!( - Some(ReplicateStageReturnType::LeaderRotation(bootstrap_height)), + Some(ReplicateStageReturnType::LeaderRotation( + bootstrap_height, + last_id + )), replicate_stage.join().expect("replicate stage join") ); diff --git a/src/thin_client.rs b/src/thin_client.rs index e41c543eab..62487e260a 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -575,11 +575,13 @@ mod tests { let leader_data = leader.info.clone(); let ledger_path = tmp_ledger("client_check_signature", &alice); + let genesis_entries = &alice.create_entries(); + let entry_height = genesis_entries.len() as u64; let server = Fullnode::new_with_bank( leader_keypair, bank, - 0, - &[], + entry_height, + &genesis_entries, leader, None, &ledger_path, @@ -636,11 +638,13 @@ mod tests { let leader_data = leader.info.clone(); let ledger_path = tmp_ledger("zero_balance_check", &alice); + let genesis_entries = &alice.create_entries(); + let entry_height = genesis_entries.len() as u64; let server = Fullnode::new_with_bank( leader_keypair, bank, - 0, - &[], + entry_height, + &genesis_entries, leader, None, &ledger_path, diff --git a/src/tpu.rs b/src/tpu.rs index f4ca8fb072..2f4d1b16c1 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -30,6 +30,7 @@ use banking_stage::{BankingStage, Config}; use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; +use hash::Hash; use leader_scheduler::LeaderScheduler; use service::Service; use signature::Keypair; @@ -54,6 +55,7 @@ pub struct Tpu { } impl Tpu { + #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new( keypair: Arc, bank: &Arc, @@ -63,6 +65,7 @@ impl Tpu { ledger_path: &str, sigverify_disabled: bool, entry_height: u64, + last_entry_id: &Hash, leader_scheduler: Arc>, ) -> (Self, Receiver>, Arc) { let exit = Arc::new(AtomicBool::new(false)); @@ -73,7 +76,7 @@ impl Tpu { SigVerifyStage::new(packet_receiver, sigverify_disabled); let (banking_stage, entry_receiver) = - BankingStage::new(&bank, verified_receiver, tick_duration); + BankingStage::new(&bank, verified_receiver, tick_duration, last_entry_id); let (write_stage, entry_forwarder) = WriteStage::new( keypair, diff --git a/src/tvu.rs b/src/tvu.rs index f9459d601f..8b7669673d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -39,6 +39,7 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use cluster_info::ClusterInfo; +use hash::Hash; use leader_scheduler::LeaderScheduler; use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use retransmit_stage::RetransmitStage; @@ -52,7 +53,7 @@ use window::SharedWindow; #[derive(Debug, PartialEq, Eq, Clone)] pub enum TvuReturnType { - LeaderRotation(u64), + LeaderRotation(u64, Hash), } pub struct Tvu { @@ -148,9 +149,9 @@ impl Service for Tvu { self.retransmit_stage.join()?; self.fetch_stage.join()?; match self.replicate_stage.join()? { - Some(ReplicateStageReturnType::LeaderRotation(entry_height)) => { - Ok(Some(TvuReturnType::LeaderRotation(entry_height))) - } + Some(ReplicateStageReturnType::LeaderRotation(entry_height, last_entry_id)) => Ok( + Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)), + ), _ => Ok(None), } } diff --git a/src/wallet.rs b/src/wallet.rs index 824f04d095..8941797df0 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1017,11 +1017,13 @@ mod tests { let rpc_port = 11111; // Needs to be distinct known number to not conflict with other tests + let genesis_entries = &alice.create_entries(); + let entry_height = genesis_entries.len() as u64; let server = Fullnode::new_with_bank( leader_keypair, bank, - 0, - &[], + entry_height, + &genesis_entries, leader, None, &ledger_path, diff --git a/tests/multinode.rs b/tests/multinode.rs index 7bd35f7b1e..482aa502e1 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -11,7 +11,7 @@ use solana::entry::Entry; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::hash::Hash; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; -use solana::ledger::{read_ledger, LedgerWriter}; +use solana::ledger::{create_sample_ledger, read_ledger, LedgerWriter}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; @@ -787,7 +787,6 @@ fn test_multi_node_dynamic_network() { } #[test] -#[ignore] fn test_leader_to_validator_transition() { logger::setup(); let leader_rotation_interval = 20; @@ -805,7 +804,7 @@ fn test_leader_to_validator_transition() { // Initialize the leader ledger. Make a mint and a genesis entry // in the leader ledger let (mint, leader_ledger_path, genesis_entries) = - genesis("test_leader_to_validator_transition", 10_000); + create_sample_ledger("test_leader_to_validator_transition", 10_000); let last_id = genesis_entries .last() @@ -815,7 +814,8 @@ fn test_leader_to_validator_transition() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + let bootstrap_entries = + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); let bootstrap_entries_len = bootstrap_entries.len(); ledger_writer.write_entries(bootstrap_entries).unwrap(); let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64; @@ -927,7 +927,6 @@ fn test_leader_to_validator_transition() { } #[test] -#[ignore] fn test_leader_validator_basic() { logger::setup(); let leader_rotation_interval = 10; @@ -946,7 +945,7 @@ fn test_leader_validator_basic() { // Make a common mint and a genesis entry for both leader + validator ledgers let (mint, leader_ledger_path, genesis_entries) = - genesis("test_leader_validator_basic", 10_000); + create_sample_ledger("test_leader_to_validator_transition", 10_000); let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic"); @@ -954,7 +953,6 @@ fn test_leader_validator_basic() { .last() .expect("expected at least one genesis entry") .id; - let genesis_height = genesis_entries.len(); // Initialize both leader + validator ledger let mut ledger_paths = Vec::new(); @@ -964,7 +962,9 @@ fn test_leader_validator_basic() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + let bootstrap_entries = + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); + let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64; ledger_writer.write_entries(bootstrap_entries).unwrap(); // Create the leader scheduler config @@ -1004,12 +1004,20 @@ fn test_leader_validator_basic() { // Send transactions to the leader let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1); - let total_transactions_to_send = bootstrap_height + extra_transactions; // Push "extra_transactions" past leader_rotation_interval entry height, // make sure the validator stops. - for _ in genesis_height as u64..total_transactions_to_send { - send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None); + for i in ledger_initial_len..(bootstrap_height + extra_transactions) { + let expected_balance = std::cmp::min( + bootstrap_height - ledger_initial_len, + i - ledger_initial_len + 1, + ); + let result = send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None); + // If the transaction wasn't reflected in the node, then we assume + // the node has transitioned already + if result != Some(expected_balance as i64) { + break; + } } // Wait for validator to shut down tvu and restart tpu @@ -1092,7 +1100,7 @@ fn test_dropped_handoff_recovery() { // Make a common mint and a genesis entry for both leader + validator's ledgers let (mint, bootstrap_leader_ledger_path, genesis_entries) = - genesis("test_dropped_handoff_recovery", 10_000); + create_sample_ledger("test_dropped_handoff_recovery", 10_000); let last_id = genesis_entries .last() @@ -1110,7 +1118,8 @@ fn test_dropped_handoff_recovery() { // Make the entries to give the next_leader validator some stake so that he will be in // leader election active set - let first_entries = make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id); + let first_entries = + make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id); let first_entries_len = first_entries.len(); // Write the entries @@ -1212,8 +1221,8 @@ fn test_dropped_handoff_recovery() { } #[test] -//TODO: Ignore for now due to bug exposed by the test "test_dropped_handoff_recovery" #[ignore] +//TODO: Ignore for now due to bug exposed by the test "test_dropped_handoff_recovery" fn test_full_leader_validator_network() { logger::setup(); // The number of validators @@ -1236,9 +1245,14 @@ fn test_full_leader_validator_network() { // Make a common mint and a genesis entry for both leader + validator's ledgers let (mint, bootstrap_leader_ledger_path, genesis_entries) = - genesis("test_full_leader_validator_network", 10_000); + create_sample_ledger("test_full_leader_validator_network", 10_000); - let mut last_id = genesis_entries + let last_tick_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + + let mut last_entry_id = genesis_entries .last() .expect("expected at least one genesis entry") .id; @@ -1254,11 +1268,12 @@ fn test_full_leader_validator_network() { for node_keypair in node_keypairs.iter() { // Make entries to give the validator some stake so that he will be in // leader election active set - let bootstrap_entries = make_active_set_entries(node_keypair, &mint.keypair(), &last_id); + let bootstrap_entries = + make_active_set_entries(node_keypair, &mint.keypair(), &last_entry_id, &last_tick_id); // Write the entries let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - last_id = bootstrap_entries + last_entry_id = bootstrap_entries .last() .expect("expected at least one genesis entry") .id;