From 0bd1412562303763d23fd034adf51c104a5c8dfd Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 18 Oct 2018 22:57:48 -0700 Subject: [PATCH] Switch leader scheduler to use PoH ticks instead of Entry height (#1519) * Add PoH height to process_ledger() * Moved broadcast_stage Leader Scheduling logic to use Poh height instead of entry_height * Moved LeaderScheduler logic to PoH in ReplicateStage * Fix Leader scheduling tests to use PoH instead of entry height * Change is_leader detection in repair() to use PoH instead of entry height * Add tests to LeaderScheduler for new functionality * fix Entry::new and genesis block PoH counts * Moved LeaderScheduler to PoH ticks * Cleanup to resolve PR comments --- benches/banking_stage.rs | 4 + src/bank.rs | 112 ++++---- src/banking_stage.rs | 176 ++++++++++--- src/bin/ledger-tool.rs | 5 +- src/broadcast_stage.rs | 9 + src/cluster_info.rs | 20 +- src/drone.rs | 1 + src/entry.rs | 39 ++- src/fullnode.rs | 198 +++++++++++--- src/leader_scheduler.rs | 299 ++++++++++++++++++++- src/ledger.rs | 23 +- src/mint.rs | 2 +- src/packet.rs | 10 +- src/poh.rs | 6 +- src/poh_recorder.rs | 88 +++++-- src/replicate_stage.rs | 102 ++++--- src/replicator.rs | 1 + src/result.rs | 8 +- src/retransmit_stage.rs | 2 + src/rpc.rs | 1 + src/thin_client.rs | 4 + src/tpu.rs | 27 +- src/tvu.rs | 18 +- src/wallet.rs | 5 + src/window.rs | 53 ++-- src/window_service.rs | 9 + src/write_stage.rs | 555 +-------------------------------------- tests/multinode.rs | 160 +++++------ 28 files changed, 1063 insertions(+), 874 deletions(-) diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index eaa78d220b..992d5851da 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -103,6 +103,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { verified_receiver, Default::default(), &mint.last_id(), + 0, + None, ); let mut id = mint.last_id(); @@ -202,6 +204,8 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { verified_receiver, Default::default(), &mint.last_id(), + 0, + None, ); let mut id = mint.last_id(); diff --git a/src/bank.rs b/src/bank.rs index 22b66a13b5..906d9edd77 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -871,40 +871,50 @@ impl Bank { results } - pub fn process_entry_votes( - bank: &Bank, + pub fn process_entry( + &self, entry: &Entry, - entry_height: u64, + tick_height: &mut u64, + leader_scheduler: &mut LeaderScheduler, + ) -> Result<()> { + if !entry.is_tick() { + for result in self.process_transactions(&entry.transactions) { + result?; + } + } else { + *tick_height += 1; + self.register_entry_id(&entry.id); + } + + self.process_entry_votes(entry, *tick_height, leader_scheduler); + Ok(()) + } + + fn process_entry_votes( + &self, + entry: &Entry, + tick_height: u64, leader_scheduler: &mut LeaderScheduler, ) { for tx in &entry.transactions { if tx.vote().is_some() { // Update the active set in the leader scheduler - leader_scheduler.push_vote(*tx.from(), entry_height); + leader_scheduler.push_vote(*tx.from(), tick_height); } } - leader_scheduler.update_height(entry_height, bank); - } - - pub fn process_entry(&self, entry: &Entry) -> Result<()> { - if !entry.transactions.is_empty() { - for result in self.process_transactions(&entry.transactions) { - result?; - } - } else { - self.register_entry_id(&entry.id); - } - Ok(()) + leader_scheduler.update_height(tick_height, self); } /// Process an ordered list of entries, populating a circular buffer "tail" - /// as we go. + /// as we go. 
fn process_entries_tail( &self, entries: &[Entry], tail: &mut Vec, tail_idx: &mut usize, + tick_height: &mut u64, + leader_scheduler: &mut LeaderScheduler, ) -> Result { let mut entry_count = 0; @@ -917,7 +927,12 @@ impl Bank { *tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize; entry_count += 1; - self.process_entry(entry)?; + // TODO: We prepare for implementing voting contract by making the associated + // process_entries functions aware of the vote-tracking structure inside + // the leader scheduler. Next we will extract the vote tracking structure + // out of the leader scheduler, and into the bank, and remove the leader + // scheduler from these banking functions. + self.process_entry(entry, tick_height, leader_scheduler)?; } Ok(entry_count) @@ -958,7 +973,7 @@ impl Bank { // accumulator for entries that can be processed in parallel let mut mt_group = vec![]; for entry in entries { - if entry.transactions.is_empty() { + if entry.is_tick() { // if its a tick, execute the group and register the tick self.par_execute_entries(&mt_group)?; self.register_entry_id(&entry.id); @@ -992,37 +1007,37 @@ impl Bank { tail: &mut Vec, tail_idx: &mut usize, leader_scheduler: &mut LeaderScheduler, - ) -> Result + ) -> Result<(u64, u64)> where I: IntoIterator, { // Ledger verification needs to be parallelized, but we can't pull the whole // thing into memory. We therefore chunk it. - let mut entry_count = *tail_idx as u64; + let mut entry_height = *tail_idx as u64; + let mut tick_height = 0; + for entry in &tail[0..*tail_idx] { + tick_height += entry.is_tick() as u64 + } + let mut id = start_hash; for block in &entries.into_iter().chunks(VERIFY_BLOCK_SIZE) { let block: Vec<_> = block.collect(); if !block.verify(&id) { - warn!("Ledger proof of history failed at entry: {}", entry_count); + warn!("Ledger proof of history failed at entry: {}", entry_height); return Err(BankError::LedgerVerificationFailed); } id = block.last().unwrap().id; - let tail_count = self.process_entries_tail(&block, tail, tail_idx)?; + let entry_count = self.process_entries_tail( + &block, + tail, + tail_idx, + &mut tick_height, + leader_scheduler, + )?; - if !leader_scheduler.use_only_bootstrap_leader { - for (i, entry) in block.iter().enumerate() { - Self::process_entry_votes( - self, - &entry, - entry_count + i as u64 + 1, - leader_scheduler, - ); - } - } - - entry_count += tail_count; + entry_height += entry_count; } - Ok(entry_count) + Ok((tick_height, entry_height)) } /// Process a full ledger. 
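// Editor's note (illustrative sketch, not part of the patch): the tick-height
// bookkeeping introduced above reduces to counting tick entries, i.e. entries whose
// transaction list is empty (Entry::is_tick()). The fold below mirrors the pattern
// used in process_blocks and in the tests; the helper name `count_ticks` is
// hypothetical and not defined by this change.
fn count_ticks(entries: &[Entry]) -> u64 {
    entries
        .iter()
        .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64)
}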
@@ -1030,7 +1045,7 @@ impl Bank { &self, entries: I, leader_scheduler: &mut LeaderScheduler, - ) -> Result<(u64, Vec)> + ) -> Result<(u64, u64, Vec)> where I: IntoIterator, { @@ -1072,7 +1087,7 @@ impl Bank { tail.push(entry0); tail.push(entry1); let mut tail_idx = 2; - let entry_count = self.process_blocks( + let (tick_height, entry_height) = self.process_blocks( entry1_id, entries, &mut tail, @@ -1085,7 +1100,7 @@ impl Bank { tail.rotate_left(tail_idx) } - Ok((entry_count, tail)) + Ok((tick_height, entry_height, tail)) } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -1618,7 +1633,7 @@ mod tests { let mut last_id = mint.last_id(); let mut hash = mint.last_id(); let mut entries: Vec = vec![]; - let mut num_hashes = 0; + let num_hashes = 1; for k in keypairs { let txs = vec![Transaction::system_new( &mint.keypair(), @@ -1629,7 +1644,8 @@ mod tests { let mut e = ledger::next_entries(&hash, 0, txs); entries.append(&mut e); hash = entries.last().unwrap().id; - let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]); + let tick = Entry::new(&hash, num_hashes, vec![]); + hash = tick.id; last_id = hash; entries.push(tick); } @@ -1645,14 +1661,16 @@ mod tests { let mut entries = Vec::with_capacity(length); let mut hash = mint.last_id(); let mut last_id = mint.last_id(); - let mut num_hashes = 0; + let num_hashes = 1; for i in 0..length { let keypair = Keypair::new(); let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, last_id); - let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]); + let entry = Entry::new(&hash, num_hashes, vec![tx]); + hash = entry.id; entries.push(entry); if (i + 1) % ticks == 0 { - let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]); + let tick = Entry::new(&hash, num_hashes, vec![]); + hash = tick.id; last_id = hash; entries.push(tick); } @@ -1681,11 +1699,12 @@ mod tests { let (ledger, pubkey) = create_sample_ledger(1); let (ledger, dup) = ledger.tee(); let bank = Bank::default(); - let (ledger_height, tail) = bank + let (tick_height, ledger_height, tail) = bank .process_ledger(ledger, &mut LeaderScheduler::default()) .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, 4); + assert_eq!(tick_height, 2); assert_eq!(tail.len(), 4); assert_eq!(tail, dup.collect_vec()); let last_entry = &tail[tail.len() - 1]; @@ -1708,11 +1727,12 @@ mod tests { for entry_count in window_size - 3..window_size + 2 { let (ledger, pubkey) = create_sample_ledger(entry_count); let bank = Bank::default(); - let (ledger_height, tail) = bank + let (tick_height, ledger_height, tail) = bank .process_ledger(ledger, &mut LeaderScheduler::default()) .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, entry_count as u64 + 3); + assert_eq!(tick_height, 2); assert!(tail.len() <= window_size); let last_entry = &tail[tail.len() - 1]; assert_eq!(bank.last_id(), last_entry.id); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index a2ea97288c..702cda37dc 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -9,7 +9,7 @@ use entry::Entry; use hash::Hash; use log::Level; use packet::Packets; -use poh_recorder::PohRecorder; +use poh_recorder::{PohRecorder, PohRecorderError}; use rayon::prelude::*; use result::{Error, Result}; use service::Service; @@ -25,13 +25,20 @@ use std::time::Instant; use timing; use transaction::Transaction; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum BankingStageReturnType { + LeaderRotation, + ChannelDisconnected, +} + // number of threads is 1 
until mt bank is ready pub const NUM_THREADS: usize = 10; /// Stores the stage's thread handle and output receiver. pub struct BankingStage { /// Handle to the stage's thread. - thread_hdls: Vec>, + bank_thread_hdls: Vec>>, + tick_producer: JoinHandle>, } pub enum Config { @@ -55,10 +62,18 @@ impl BankingStage { verified_receiver: Receiver, config: Config, last_entry_id: &Hash, + tick_height: u64, + max_tick_height: Option, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); - let poh = PohRecorder::new(bank.clone(), entry_sender, *last_entry_id); + let poh = PohRecorder::new( + bank.clone(), + entry_sender, + *last_entry_id, + tick_height, + max_tick_height, + ); let tick_poh = poh.clone(); // Tick producer is a headless producer, so when it exits it should notify the banking stage. // Since channel are not used to talk between these threads an AtomicBool is used as a @@ -71,21 +86,25 @@ impl BankingStage { let tick_producer = Builder::new() .name("solana-banking-stage-tick_producer".to_string()) .spawn(move || { - if let Err(e) = Self::tick_producer(&tick_poh, &config, &poh_exit) { - match e { - Error::SendError => (), - _ => error!( + let mut tick_poh_ = tick_poh; + let return_value = match Self::tick_producer(&mut tick_poh_, &config, &poh_exit) { + Err(Error::SendError) => Some(BankingStageReturnType::ChannelDisconnected), + Err(e) => { + error!( "solana-banking-stage-tick_producer unexpected error {:?}", e - ), + ); + None } - } + Ok(x) => x, + }; debug!("tick producer exiting"); poh_exit.store(true, Ordering::Relaxed); + return_value }).unwrap(); // Many banks that process transactions in parallel. - let mut thread_hdls: Vec> = (0..NUM_THREADS) + let bank_thread_hdls: Vec>> = (0..NUM_THREADS) .map(|_| { let thread_bank = bank.clone(); let thread_verified_receiver = shared_verified_receiver.clone(); @@ -94,7 +113,7 @@ impl BankingStage { Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { - loop { + let return_result = loop { if let Err(e) = Self::process_packets( &thread_bank, &thread_verified_receiver, @@ -104,23 +123,37 @@ impl BankingStage { match e { Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { - break + break Some(BankingStageReturnType::ChannelDisconnected); + } + Error::RecvError(_) => { + break Some(BankingStageReturnType::ChannelDisconnected); + } + Error::SendError => { + break Some(BankingStageReturnType::ChannelDisconnected); + } + Error::PohRecorderError(PohRecorderError::MaxHeightReached) => { + break Some(BankingStageReturnType::LeaderRotation); } - Error::RecvError(_) => break, - Error::SendError => break, _ => error!("solana-banking-stage-tx {:?}", e), } } if thread_banking_exit.load(Ordering::Relaxed) { debug!("tick service exited"); - break; + break None; } - } + }; thread_banking_exit.store(true, Ordering::Relaxed); + return_result }).unwrap() }).collect(); - thread_hdls.push(tick_producer); - (BankingStage { thread_hdls }, entry_receiver) + + ( + BankingStage { + bank_thread_hdls, + tick_producer, + }, + entry_receiver, + ) } /// Convert the transactions from a blob of binary data to a vector of transactions and @@ -135,22 +168,43 @@ impl BankingStage { }).collect() } - fn tick_producer(poh: &PohRecorder, config: &Config, poh_exit: &AtomicBool) -> Result<()> { + fn tick_producer( + poh: &mut PohRecorder, + config: &Config, + poh_exit: &AtomicBool, + ) -> Result> { loop { 
match *config { Config::Tick(num) => { for _ in 0..num { - poh.hash(); + match poh.hash() { + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { + return Ok(Some(BankingStageReturnType::LeaderRotation)); + } + Err(e) => { + return Err(e); + } + _ => (), + } } } Config::Sleep(duration) => { sleep(duration); } } - poh.tick()?; + match poh.tick() { + Ok(height) if Some(height) == poh.max_tick_height => { + // CASE 1: We were successful in recording the last tick, so exit + return Ok(Some(BankingStageReturnType::LeaderRotation)); + } + Ok(_) => (), + Err(e) => { + return Err(e); + } + }; if poh_exit.load(Ordering::Relaxed) { debug!("tick service exited"); - return Ok(()); + return Ok(None); } } } @@ -242,13 +296,24 @@ impl BankingStage { } impl Service for BankingStage { - type JoinReturnType = (); + type JoinReturnType = Option; - fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; + fn join(self) -> thread::Result> { + let mut return_value = None; + + for bank_thread_hdl in self.bank_thread_hdls { + let thread_return_value = bank_thread_hdl.join()?; + if thread_return_value.is_some() { + return_value = thread_return_value; + } } - Ok(()) + + let tick_return_value = self.tick_producer.join()?; + if tick_return_value.is_some() { + return_value = tick_return_value; + } + + Ok(return_value) } } @@ -256,6 +321,7 @@ impl Service for BankingStage { mod tests { use super::*; use bank::Bank; + use banking_stage::BankingStageReturnType; use ledger::Block; use mint::Mint; use packet::to_packets; @@ -273,9 +339,14 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), + 0, + None, ); drop(verified_sender); - assert_eq!(banking_stage.join().unwrap(), ()); + assert_eq!( + banking_stage.join().unwrap(), + Some(BankingStageReturnType::ChannelDisconnected) + ); } #[test] @@ -287,9 +358,14 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), + 0, + None, ); drop(entry_receiver); - assert_eq!(banking_stage.join().unwrap(), ()); + assert_eq!( + banking_stage.join().unwrap(), + Some(BankingStageReturnType::ChannelDisconnected) + ); } #[test] @@ -302,6 +378,8 @@ mod tests { verified_receiver, Config::Sleep(Duration::from_millis(1)), &bank.last_id(), + 0, + None, ); sleep(Duration::from_millis(500)); drop(verified_sender); @@ -310,7 +388,10 @@ mod tests { assert!(entries.len() != 0); assert!(entries.verify(&start_hash)); assert_eq!(entries[entries.len() - 1].id, bank.last_id()); - assert_eq!(banking_stage.join().unwrap(), ()); + assert_eq!( + banking_stage.join().unwrap(), + Some(BankingStageReturnType::ChannelDisconnected) + ); } #[test] @@ -324,6 +405,8 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), + 0, + None, ); // good tx @@ -359,7 +442,10 @@ mod tests { last_id = entries.last().unwrap().id; }); drop(entry_receiver); - assert_eq!(banking_stage.join().unwrap(), ()); + assert_eq!( + banking_stage.join().unwrap(), + Some(BankingStageReturnType::ChannelDisconnected) + ); } #[test] fn test_banking_stage_entryfication() { @@ -374,6 +460,8 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), + 0, + None, ); // Process a batch that includes a transaction that receives two tokens. 
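// Editor's note (illustrative sketch, not part of the patch): tick_producer and the
// banking threads above treat `max_tick_height` as the PoH tick at which this node's
// leader slot ends, returning BankingStageReturnType::LeaderRotation once that tick
// is recorded. The predicate below summarizes the check performed by the PohRecorder;
// `reached_leader_rotation` is a hypothetical helper name, not an API from this change.
fn reached_leader_rotation(tick_height: u64, max_tick_height: Option<u64>) -> bool {
    // With no configured max height (e.g. a bootstrap-only leader), never rotate.
    max_tick_height.map_or(false, |max| tick_height >= max)
}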
@@ -392,7 +480,10 @@ mod tests { .send(vec![(packets[0].clone(), vec![1u8])]) .unwrap(); drop(verified_sender); - assert_eq!(banking_stage.join().unwrap(), ()); + assert_eq!( + banking_stage.join().unwrap(), + Some(BankingStageReturnType::ChannelDisconnected) + ); // Collect the ledger and feed it to a new bank. let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); @@ -410,4 +501,25 @@ mod tests { } assert_eq!(bank.get_balance(&alice.pubkey()), 1); } + + // Test that when the max_tick_height is reached, the banking stage exits + // with reason BankingStageReturnType::LeaderRotation + #[test] + fn test_max_tick_height_shutdown() { + let bank = Arc::new(Bank::new(&Mint::new(2))); + let (_verified_sender_, verified_receiver) = channel(); + let max_tick_height = 10; + let (banking_stage, _entry_receiver) = BankingStage::new( + &bank, + verified_receiver, + Default::default(), + &bank.last_id(), + 0, + Some(max_tick_height), + ); + assert_eq!( + banking_stage.join().unwrap(), + Some(BankingStageReturnType::LeaderRotation) + ); + } } diff --git a/src/bin/ledger-tool.rs b/src/bin/ledger-tool.rs index 962bc4d64c..c8fbcacebb 100644 --- a/src/bin/ledger-tool.rs +++ b/src/bin/ledger-tool.rs @@ -142,7 +142,10 @@ fn main() { } last_id = entry.id; - if let Err(e) = bank.process_entry(&entry) { + let mut tick_height = 0; + let mut leader_scheduler = LeaderScheduler::default(); + if let Err(e) = bank.process_entry(&entry, &mut tick_height, &mut leader_scheduler) + { eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e); if !matches.is_present("continue") { exit(1); diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 7b78194cd0..e2ab5a9496 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -32,6 +32,7 @@ pub enum BroadcastStageReturnType { #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] fn broadcast( leader_scheduler: &Arc>, + mut tick_height: u64, node_info: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, @@ -49,6 +50,9 @@ fn broadcast( ventries.push(entries); while let Ok(entries) = receiver.try_recv() { num_entries += entries.len(); + tick_height += entries + .iter() + .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); ventries.push(entries); } inc_new_counter_info!("broadcast_stage-entries_received", num_entries); @@ -134,6 +138,7 @@ fn broadcast( // Send blobs out from the window ClusterInfo::broadcast( &leader_scheduler, + tick_height, &node_info, &broadcast_table, &window, @@ -194,6 +199,7 @@ impl BroadcastStage { entry_height: u64, receiver: &Receiver>, leader_scheduler: &Arc>, + tick_height: u64, ) -> BroadcastStageReturnType { let mut transmit_index = WindowIndex { data: entry_height, @@ -205,6 +211,7 @@ impl BroadcastStage { let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table(); if let Err(e) = broadcast( leader_scheduler, + tick_height, &me, &broadcast_table, &window, @@ -250,6 +257,7 @@ impl BroadcastStage { entry_height: u64, receiver: Receiver>, leader_scheduler: Arc>, + tick_height: u64, exit_sender: Arc, ) -> Self { let thread_hdl = Builder::new() @@ -263,6 +271,7 @@ impl BroadcastStage { entry_height, &receiver, &leader_scheduler, + tick_height, ) }).unwrap(); diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 0e285c17db..eed0ba16e8 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -496,6 +496,7 @@ impl ClusterInfo { #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn broadcast( leader_scheduler: &Arc>, + tick_height: 
u64, me: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, @@ -537,13 +538,13 @@ impl ClusterInfo { br_idx ); - // Make sure the next leader in line knows about the last entry before rotation - // so he can initiate repairs if necessary - let entry_height = idx + 1; - + // Make sure the next leader in line knows about the entries before his slot in the leader + // rotation so he can initiate repairs if necessary { let ls_lock = leader_scheduler.read().unwrap(); - let next_leader_id = ls_lock.get_scheduled_leader(entry_height); + let next_leader_height = ls_lock.max_height_for_leader(tick_height); + let next_leader_id = + next_leader_height.map(|nlh| ls_lock.get_scheduled_leader(nlh)); // In the case the next scheduled leader is None, then the write_stage moved // the schedule too far ahead and we no longer are in the known window // (will happen during calculation of the next set of slots every epoch or @@ -555,10 +556,11 @@ impl ClusterInfo { // scheduled leader, so the next leader will have to rely on avalanche/repairs // to get this last blob, which could cause slowdowns during leader handoffs. // See corresponding issue for repairs in repair() function in window.rs. - if next_leader_id.is_some() && next_leader_id != Some(me.id) { - let info_result = broadcast_table - .iter() - .position(|n| n.id == next_leader_id.unwrap()); + if let Some(Some(next_leader_id)) = next_leader_id { + if next_leader_id == me.id { + break; + } + let info_result = broadcast_table.iter().position(|n| n.id == next_leader_id); if let Some(index) = info_result { orders.push((window_l[w_idx].data.clone(), &broadcast_table[index])); } diff --git a/src/drone.rs b/src/drone.rs index bcaeb2edbd..2c8ba0bfc6 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -327,6 +327,7 @@ mod tests { leader_keypair, bank, 0, + 0, &[], leader, None, diff --git a/src/entry.rs b/src/entry.rs index 31cc4a528e..3f4209f8a3 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -48,12 +48,33 @@ pub struct Entry { impl Entry { /// Creates the next Entry `num_hashes` after `start_hash`. pub fn new(start_hash: &Hash, num_hashes: u64, transactions: Vec) -> Self { - let num_hashes = num_hashes + if transactions.is_empty() { 0 } else { 1 }; - let id = next_hash(start_hash, 0, &transactions); - let entry = Entry { - num_hashes, - id, - transactions, + let entry = { + if num_hashes == 0 && transactions.is_empty() { + Entry { + num_hashes: 0, + id: *start_hash, + transactions, + } + } else if num_hashes == 0 { + // If you passed in transactions, but passed in num_hashes == 0, then + // next_hash will generate the next hash and set num_hashes == 1 + let id = next_hash(start_hash, 1, &transactions); + Entry { + num_hashes: 1, + id, + transactions, + } + } else { + // Otherwise, the next Entry `num_hashes` after `start_hash`. + // If you wanted a tick for instance, then pass in num_hashes = 1 + // and transactions = empty + let id = next_hash(start_hash, num_hashes, &transactions); + Entry { + num_hashes, + id, + transactions, + } + } }; let size = serialized_size(&entry).unwrap(); @@ -175,6 +196,10 @@ impl Entry { } true } + + pub fn is_tick(&self) -> bool { + self.transactions.is_empty() + } } /// Creates the hash `num_hashes` after `start_hash`. 
If the transaction contains @@ -186,7 +211,7 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) - return *start_hash; } - let mut poh = Poh::new(*start_hash); + let mut poh = Poh::new(*start_hash, 0); for _ in 1..num_hashes { poh.hash(); diff --git a/src/fullnode.rs b/src/fullnode.rs index 698663dc84..d16a19519c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -77,6 +77,7 @@ impl ValidatorServices { } } +#[derive(Debug)] pub enum FullnodeReturnType { LeaderToValidatorRotation, ValidatorToLeaderRotation, @@ -137,7 +138,7 @@ impl Fullnode { mut leader_scheduler: LeaderScheduler, ) -> Self { info!("creating bank..."); - let (bank, entry_height, ledger_tail) = + let (bank, tick_height, entry_height, ledger_tail) = Self::new_bank_from_ledger(ledger_path, &mut leader_scheduler); info!("creating networking stack..."); @@ -154,6 +155,7 @@ impl Fullnode { let server = Self::new_with_bank( keypair, bank, + tick_height, entry_height, &ledger_tail, node, @@ -236,6 +238,7 @@ impl Fullnode { pub fn new_with_bank( keypair: Keypair, bank: Bank, + tick_height: u64, entry_height: u64, ledger_tail: &[Entry], node: Node, @@ -308,7 +311,7 @@ impl Fullnode { let scheduled_leader = leader_scheduler .read() .unwrap() - .get_scheduled_leader(entry_height) + .get_scheduled_leader(tick_height) .expect("Leader not known after processing bank"); cluster_info.write().unwrap().set_leader(scheduled_leader); @@ -317,6 +320,7 @@ impl Fullnode { let tvu = Tvu::new( keypair.clone(), &bank, + tick_height, entry_height, cluster_info.clone(), shared_window.clone(), @@ -339,6 +343,10 @@ impl Fullnode { let validator_state = ValidatorServices::new(tvu); Some(NodeRole::Validator(validator_state)) } else { + let max_tick_height = { + let ls_lock = leader_scheduler.read().unwrap(); + ls_lock.max_height_for_leader(tick_height) + }; // Start in leader mode. 
let (tpu, entry_receiver, tpu_exit) = Tpu::new( keypair.clone(), @@ -352,9 +360,9 @@ impl Fullnode { .collect(), ledger_path, sigverify_disabled, - entry_height, + tick_height, + max_tick_height, last_entry_id, - leader_scheduler.clone(), ); let broadcast_stage = BroadcastStage::new( @@ -367,6 +375,7 @@ impl Fullnode { entry_height, entry_receiver, leader_scheduler.clone(), + tick_height, tpu_exit, ); let leader_state = LeaderServices::new(tpu, broadcast_stage); @@ -398,14 +407,14 @@ impl Fullnode { } fn leader_to_validator(&mut self) -> Result<()> { - let (scheduled_leader, entry_height) = { + let (scheduled_leader, tick_height, entry_height, last_entry_id) = { let mut ls_lock = self.leader_scheduler.write().unwrap(); // Clear the leader scheduler ls_lock.reset(); // TODO: We can avoid building the bank again once RecordStage is // integrated with BankingStage - let (bank, entry_height, _) = + let (bank, tick_height, entry_height, ledger_tail) = Self::new_bank_from_ledger(&self.ledger_path, &mut *ls_lock); self.bank = Arc::new(bank); @@ -414,7 +423,12 @@ impl Fullnode { ls_lock .get_scheduled_leader(entry_height) .expect("Scheduled leader should exist after rebuilding bank"), + tick_height, entry_height, + ledger_tail + .last() + .expect("Expected at least one entry in the ledger") + .id, ) }; @@ -439,9 +453,19 @@ impl Fullnode { )); } + // In the rare case that the leader exited on a multiple of seed_rotation_interval + // when the new leader schedule was being generated, and there are no other validators + // in the active set, then the leader scheduler will pick the same leader again, so + // check for that + if scheduled_leader == self.keypair.pubkey() { + self.validator_to_leader(tick_height, entry_height, last_entry_id); + return Ok(()); + } + let tvu = Tvu::new( self.keypair.clone(), &self.bank, + tick_height, entry_height, self.cluster_info.clone(), self.shared_window.clone(), @@ -463,11 +487,17 @@ impl Fullnode { Ok(()) } - fn validator_to_leader(&mut self, entry_height: u64, last_entry_id: Hash) { + fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) { self.cluster_info .write() .unwrap() .set_leader(self.keypair.pubkey()); + + let max_tick_height = { + let ls_lock = self.leader_scheduler.read().unwrap(); + ls_lock.max_height_for_leader(tick_height) + }; + let (tpu, blob_receiver, tpu_exit) = Tpu::new( self.keypair.clone(), &self.bank, @@ -479,12 +509,12 @@ impl Fullnode { .collect(), &self.ledger_path, self.sigverify_disabled, - entry_height, + tick_height, + max_tick_height, // We pass the last_entry_id from the replicate stage because we can't trust that // the window didn't overwrite the slot at for the last entry that the replicate stage // processed. We also want to avoid reading processing the ledger for the last id. &last_entry_id, - self.leader_scheduler.clone(), ); let broadcast_stage = BroadcastStage::new( @@ -496,6 +526,7 @@ impl Fullnode { entry_height, blob_receiver, self.leader_scheduler.clone(), + tick_height, tpu_exit, ); let leader_state = LeaderServices::new(tpu, broadcast_stage); @@ -521,8 +552,9 @@ impl Fullnode { _ => Ok(None), }, Some(NodeRole::Validator(validator_services)) => match validator_services.join()? { - Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)) => { - self.validator_to_leader(entry_height, last_entry_id); + Some(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => { + //TODO: Fix this to return actual poh height. 
+ self.validator_to_leader(tick_height, entry_height, last_entry_id); Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) } _ => Ok(None), @@ -552,19 +584,19 @@ impl Fullnode { pub fn new_bank_from_ledger( ledger_path: &str, leader_scheduler: &mut LeaderScheduler, - ) -> (Bank, u64, Vec) { + ) -> (Bank, u64, u64, Vec) { let bank = Bank::default(); let entries = read_ledger(ledger_path, true).expect("opening ledger"); let entries = entries .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); info!("processing ledger..."); - let (entry_height, ledger_tail) = bank + let (tick_height, entry_height, ledger_tail) = bank .process_ledger(entries, leader_scheduler) .expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); - (bank, entry_height, ledger_tail) + (bank, tick_height, entry_height, ledger_tail) } } @@ -581,7 +613,7 @@ impl Service for Fullnode { match self.node_role { Some(NodeRole::Validator(validator_service)) => { - if let Some(TvuReturnType::LeaderRotation(_, _)) = validator_service.join()? { + if let Some(TvuReturnType::LeaderRotation(_, _, _)) = validator_service.join()? { return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)); } } @@ -601,7 +633,7 @@ impl Service for Fullnode { mod tests { use bank::Bank; use cluster_info::Node; - use fullnode::{Fullnode, NodeRole, TvuReturnType}; + use fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType}; use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use ledger::{create_tmp_genesis, create_tmp_sample_ledger, LedgerWriter}; use packet::make_consecutive_blobs; @@ -627,6 +659,7 @@ mod tests { let v = Fullnode::new_with_bank( keypair, bank, + 0, entry_height, &genesis_entries, tn, @@ -658,6 +691,7 @@ mod tests { Fullnode::new_with_bank( keypair, bank, + 0, entry_height, &genesis_entries, tn, @@ -682,6 +716,70 @@ mod tests { } } + #[test] + fn test_leader_to_leader_transition() { + // Create the leader node information + let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_node = + Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); + let bootstrap_leader_info = bootstrap_leader_node.info.clone(); + + // Make a mint and a genesis entries for leader ledger + let num_ending_ticks = 1; + let (_, bootstrap_leader_ledger_path, genesis_entries) = + create_tmp_sample_ledger("test_leader_to_leader_transition", 10_000, num_ending_ticks); + + let initial_tick_height = genesis_entries + .iter() + .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); + + // Create the common leader scheduling configuration + let num_slots_per_epoch = 3; + let leader_rotation_interval = 5; + let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; + let active_window_length = 5; + + // Set the bootstrap height to be bigger than the initial tick height. + // Once the leader hits the bootstrap height ticks, because there are no other + // choices in the active set, this leader will remain the leader in the next + // epoch. In the next epoch, check that the same leader knows to shut down and + // restart as a leader again. 
+ let bootstrap_height = initial_tick_height + 1; + let leader_scheduler_config = LeaderSchedulerConfig::new( + bootstrap_leader_info.id, + Some(bootstrap_height as u64), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(active_window_length), + ); + + // Start up the leader + let mut bootstrap_leader = Fullnode::new( + bootstrap_leader_node, + &bootstrap_leader_ledger_path, + bootstrap_leader_keypair, + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + // Wait for the leader to transition, ticks should cause the leader to + // reach the height for leader rotation + match bootstrap_leader.handle_role_transition().unwrap() { + Some(FullnodeReturnType::LeaderToValidatorRotation) => (), + _ => { + panic!("Expected a leader transition"); + } + } + + match bootstrap_leader.node_role { + Some(NodeRole::Leader(_)) => (), + _ => { + panic!("Expected bootstrap leader to be a leader"); + } + } + } + #[test] fn test_wrong_role_transition() { // Create the leader node information @@ -695,8 +793,9 @@ mod tests { let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); // Make a common mint and a genesis entry for both leader + validator's ledgers + let num_ending_ticks = 1; let (mint, bootstrap_leader_ledger_path, genesis_entries) = - create_tmp_sample_ledger("test_wrong_role_transition", 10_000); + create_tmp_sample_ledger("test_wrong_role_transition", 10_000, num_ending_ticks); let last_id = genesis_entries .last() @@ -706,27 +805,35 @@ mod tests { // Write the entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - let first_entries = - make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); + let active_set_entries = make_active_set_entries( + &validator_keypair, + &mint.keypair(), + &last_id, + &last_id, + num_ending_ticks, + ); - let ledger_initial_len = (genesis_entries.len() + first_entries.len()) as u64; - ledger_writer.write_entries(first_entries).unwrap(); + let genesis_tick_height = genesis_entries + .iter() + .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64) + + num_ending_ticks as u64; + ledger_writer.write_entries(active_set_entries).unwrap(); // Create the common leader scheduling configuration let num_slots_per_epoch = 3; let leader_rotation_interval = 5; let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; - // Set the bootstrap height exactly the current ledger length, so that we can + // Set the bootstrap height exactly the current tick height, so that we can // test if the bootstrap leader knows to immediately transition to a validator // after parsing the ledger during startup - let bootstrap_height = ledger_initial_len; + let bootstrap_height = genesis_tick_height; let leader_scheduler_config = LeaderSchedulerConfig::new( bootstrap_leader_info.id, Some(bootstrap_height), Some(leader_rotation_interval), Some(seed_rotation_interval), - Some(ledger_initial_len), + Some(genesis_tick_height), ); // Test that a node knows to transition to a validator based on parsing the ledger @@ -774,8 +881,13 @@ mod tests { let leader_ncp = leader_node.info.contact_info.ncp; // Create validator identity - let (mint, validator_ledger_path, genesis_entries) = - create_tmp_sample_ledger("test_validator_to_leader_transition", 10_000); + let num_ending_ticks = 1; + let (mint, validator_ledger_path, 
genesis_entries) = create_tmp_sample_ledger( + "test_validator_to_leader_transition", + 10_000, + num_ending_ticks, + ); + let validator_keypair = Keypair::new(); let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); let validator_info = validator_node.info.clone(); @@ -793,12 +905,16 @@ mod tests { // // 2) A vote from the validator let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap(); - let bootstrap_entries = - make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); - let bootstrap_entries_len = bootstrap_entries.len(); - last_id = bootstrap_entries.last().unwrap().id; - ledger_writer.write_entries(bootstrap_entries).unwrap(); - let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64; + let active_set_entries = + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); + let initial_tick_height = genesis_entries + .iter() + .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); + let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; + let active_set_entries_len = active_set_entries.len() as u64; + last_id = active_set_entries.last().unwrap().id; + ledger_writer.write_entries(active_set_entries).unwrap(); + let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len; // Set the leader scheduler for the validator let leader_rotation_interval = 10; @@ -864,8 +980,8 @@ mod tests { let join_result = validator_services .join() .expect("Expected successful validator join"); - if let Some(TvuReturnType::LeaderRotation(result_bh, _)) = join_result { - assert_eq!(result_bh, bootstrap_height); + if let Some(TvuReturnType::LeaderRotation(tick_height, _, _)) = join_result { + assert_eq!(tick_height, bootstrap_height); } else { panic!("Expected validator to have exited due to leader rotation"); } @@ -873,14 +989,20 @@ mod tests { _ => panic!("Role should not be leader"), } - // Check the validator ledger to make sure it's the right height, we should've - // transitioned after the bootstrap_height entry - let (_, entry_height, _) = Fullnode::new_bank_from_ledger( + // Check the validator ledger for the correct entry + tick heights, we should've + // transitioned after tick_height = bootstrap_height. 
+ let (_, tick_height, entry_height, _) = Fullnode::new_bank_from_ledger( &validator_ledger_path, &mut LeaderScheduler::new(&leader_scheduler_config), ); - assert_eq!(entry_height, bootstrap_height); + assert_eq!(tick_height, bootstrap_height); + assert_eq!( + entry_height, + // Only the first genesis entry has num_hashes = 0, every other entry + // had num_hashes = 1 + bootstrap_height + active_set_entries_len + initial_non_tick_height, + ); // Shut down t_responder.join().expect("responder thread join"); diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 66d800e6c9..cb8b3d9faf 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -9,6 +9,7 @@ use budget_transaction::BudgetTransaction; use byteorder::{LittleEndian, ReadBytesExt}; use entry::Entry; use hash::{hash, Hash}; +use ledger::create_ticks; use signature::{Keypair, KeypairUtil}; #[cfg(test)] use solana_program_interface::account::Account; @@ -222,7 +223,7 @@ impl LeaderScheduler { (height - self.bootstrap_height) % self.leader_rotation_interval == 0 } - pub fn entries_until_next_leader_rotation(&self, height: u64) -> Option { + pub fn count_until_next_leader_rotation(&self, height: u64) -> Option { if self.use_only_bootstrap_leader { return None; } @@ -237,6 +238,46 @@ impl LeaderScheduler { } } + // Let Leader X be the leader at the input tick height. This function returns the + // the PoH height at which Leader X's slot ends. + pub fn max_height_for_leader(&self, height: u64) -> Option { + if self.use_only_bootstrap_leader || self.get_scheduled_leader(height).is_none() { + return None; + } + + let result = { + if height < self.bootstrap_height || self.leader_schedule.len() > 1 { + // Two cases to consider: + // + // 1) If height is less than the bootstrap height, then the current leader's + // slot ends when PoH height = bootstrap_height + // + // 2) Otherwise, if height >= bootstrap height, then we have generated a schedule. + // If this leader is not the only one in the schedule, then they will + // only be leader until the end of this slot (someone else is then guaranteed + // to take over) + // + // Both above cases are calculated by the function: + // count_until_next_leader_rotation() + height + self.count_until_next_leader_rotation(height).expect( + "Should return some value when not using default implementation + of LeaderScheduler", + ) + height + } else { + // If the height is greater than bootstrap_height and this leader is + // the only leader in the schedule, then that leader will be in power + // for every slot until the next epoch, which is seed_rotation_interval + // PoH counts from the beginning of the last epoch. 
+ self.last_seed_height.expect( + "If height >= bootstrap height, then we expect + a seed has been generated", + ) + self.seed_rotation_interval + } + }; + + Some(result) + } + pub fn reset(&mut self) { self.last_seed_height = None; self.active_validators.reset(); @@ -259,6 +300,12 @@ impl LeaderScheduler { return; } + if let Some(last_seed_height) = self.last_seed_height { + if height <= last_seed_height { + return; + } + } + if (height - self.bootstrap_height) % self.seed_rotation_interval == 0 { self.generate_schedule(height, bank); } @@ -303,6 +350,8 @@ impl LeaderScheduler { // Called every seed_rotation_interval entries, generates the leader schedule // for the range of entries: [height, height + seed_rotation_interval) fn generate_schedule(&mut self, height: u64, bank: &Bank) { + assert!(height >= self.bootstrap_height); + assert!((height - self.bootstrap_height) % self.seed_rotation_interval == 0); let seed = Self::calculate_seed(height); self.seed = seed; let active_set = self.get_active_set(height); @@ -330,12 +379,36 @@ impl LeaderScheduler { // schedule let ordered_account_stake = ranked_active_set.into_iter().map(|(_, stake)| stake); let start_index = Self::choose_account(ordered_account_stake, self.seed, total_stake); - validator_rankings.rotate_left(start_index + 1); + validator_rankings.rotate_left(start_index); // There are only seed_rotation_interval / self.leader_rotation_interval slots, so // we only need to keep at most that many validators in the schedule - validator_rankings - .truncate((self.seed_rotation_interval / self.leader_rotation_interval) as usize); + let slots_per_epoch = self.seed_rotation_interval / self.leader_rotation_interval; + + // If possible, try to avoid having the same leader twice in a row, but + // if there's only one leader to choose from, then we have no other choice + if validator_rankings.len() > 1 { + let old_epoch_last_leader = self + .get_scheduled_leader(height - 1) + .expect("Previous leader schedule should still exist"); + let new_epoch_start_leader = validator_rankings[0]; + + if old_epoch_last_leader == new_epoch_start_leader { + if slots_per_epoch == 1 { + // If there is only one slot per epoch, and the same leader as the last slot + // of the previous epoch was chosen, then pick the next leader in the + // rankings instead + validator_rankings[0] = validator_rankings[1]; + } else { + // If there is more than one leader in the schedule, truncate and set the most + // recent leader to the back of the line. This way that node will still remain + // in the rotation, just at a later slot. 
+ validator_rankings.truncate(slots_per_epoch as usize); + validator_rankings.rotate_left(1); + } + } + } + self.leader_schedule = validator_rankings; self.last_seed_height = Some(height); } @@ -432,12 +505,13 @@ pub fn make_active_set_entries( token_source: &Keypair, last_entry_id: &Hash, last_tick_id: &Hash, + num_ending_ticks: usize, ) -> Vec { // 1) Create transfer token entry let transfer_tx = Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_tick_id); - let transfer_entry = Entry::new(last_entry_id, 0, vec![transfer_tx]); - let last_entry_id = transfer_entry.id; + let transfer_entry = Entry::new(last_entry_id, 1, vec![transfer_tx]); + let mut last_entry_id = transfer_entry.id; // 2) Create vote entry let vote = Vote { @@ -445,9 +519,14 @@ pub fn make_active_set_entries( contact_info_version: 0, }; let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, *last_tick_id, 0); - let vote_entry = Entry::new(&last_entry_id, 0, vec![vote_tx]); + let vote_entry = Entry::new(&last_entry_id, 1, vec![vote_tx]); + last_entry_id = vote_entry.id; - vec![transfer_entry, vote_entry] + // 3) Create the ending empty ticks + let mut txs = vec![transfer_entry, vote_entry]; + let empty_ticks = create_ticks(num_ending_ticks, last_entry_id); + txs.extend(empty_ticks); + txs } #[cfg(test)] @@ -851,7 +930,7 @@ mod tests { // validators as part of the schedule each time (we need to check the active window // is the cause of validators being truncated later) let seed_rotation_interval = leader_rotation_interval * num_validators; - let active_window_length = 1; + let active_window_length = seed_rotation_interval; let leader_scheduler_config = LeaderSchedulerConfig::new( bootstrap_leader_id, @@ -877,7 +956,7 @@ mod tests { let new_pubkey = new_validator.pubkey(); validators.push(new_pubkey); // Vote at height i * active_window_length for validator i - leader_scheduler.push_vote(new_pubkey, i * active_window_length); + leader_scheduler.push_vote(new_pubkey, i * active_window_length + bootstrap_height); bank.transfer((i + 1) as i64, &mint.keypair(), new_pubkey, last_id) .unwrap(); } @@ -886,7 +965,7 @@ mod tests { // validators are falling out of the rotation as they fall out of the // active set for i in 0..=num_validators { - leader_scheduler.generate_schedule(i * active_window_length, &bank); + leader_scheduler.generate_schedule(i * active_window_length + bootstrap_height, &bank); let result = &leader_scheduler.leader_schedule; let expected = if i == num_validators { bootstrap_leader_id @@ -1022,4 +1101,202 @@ mod tests { let active_validators = ActiveValidators::new(Some(active_window_length)); assert_eq!(active_validators.active_window_length, active_window_length); } + + fn run_consecutive_leader_test(num_slots_per_epoch: u64, add_validator: bool) { + let bootstrap_leader_id = Keypair::new().pubkey(); + let bootstrap_height = 500; + let leader_rotation_interval = 100; + let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; + let active_window_length = bootstrap_height + seed_rotation_interval; + + let leader_scheduler_config = LeaderSchedulerConfig::new( + bootstrap_leader_id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(active_window_length), + ); + + let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); + + // Create mint and bank + let mint = Mint::new(10000); + let bank = Bank::new(&mint); + let last_id = mint + .create_entries() + .last() + .expect("Mint should not create 
empty genesis entries") + .id; + let initial_vote_height = 1; + + // Create and add validator to the active set + let validator_id = Keypair::new().pubkey(); + if add_validator { + leader_scheduler.push_vote(validator_id, initial_vote_height); + bank.transfer(1, &mint.keypair(), validator_id, last_id) + .unwrap(); + } + + // Make sure the bootstrap leader, not the validator, is picked again on next slot + // Depending on the seed, we make the leader stake either 2, or 3. Because the + // validator stake is always 1, then the rankings will always be + // [(validator, 1), (leader, leader_stake)]. Thus we just need to make sure that + // seed % (leader_stake + 1) > 0 to make sure that the leader is picked again. + let seed = LeaderScheduler::calculate_seed(bootstrap_height); + let leader_stake = { + if seed % 3 == 0 { + 3 + } else { + 2 + } + }; + + // Add leader to the active set + leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height); + bank.transfer(leader_stake, &mint.keypair(), bootstrap_leader_id, last_id) + .unwrap(); + + leader_scheduler.generate_schedule(bootstrap_height, &bank); + + // Make sure the validator, not the leader is selected on the first slot of the + // next epoch + if add_validator { + assert!(leader_scheduler.leader_schedule[0] == validator_id); + } else { + assert!(leader_scheduler.leader_schedule[0] == bootstrap_leader_id); + } + } + + #[test] + fn test_avoid_consecutive_leaders() { + // Test when there is both a leader + validator in the active set + run_consecutive_leader_test(1, true); + run_consecutive_leader_test(2, true); + run_consecutive_leader_test(10, true); + + // Test when there is only one node in the active set + run_consecutive_leader_test(1, false); + run_consecutive_leader_test(2, false); + run_consecutive_leader_test(10, false); + } + + #[test] + fn test_max_height_for_leader() { + let bootstrap_leader_id = Keypair::new().pubkey(); + let bootstrap_height = 500; + let leader_rotation_interval = 100; + let seed_rotation_interval = 2 * leader_rotation_interval; + let active_window_length = bootstrap_height + seed_rotation_interval; + + let leader_scheduler_config = LeaderSchedulerConfig::new( + bootstrap_leader_id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(active_window_length), + ); + + let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); + + // Create mint and bank + let mint = Mint::new(10000); + let bank = Bank::new(&mint); + let last_id = mint + .create_entries() + .last() + .expect("Mint should not create empty genesis entries") + .id; + let initial_vote_height = 1; + + // No schedule generated yet, so for all heights < bootstrap height, the + // max height will be bootstrap leader + assert_eq!( + leader_scheduler.max_height_for_leader(0), + Some(bootstrap_height) + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height - 1), + Some(bootstrap_height) + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height), + None + ); + + // Test when the active set == 1 node + + // Generate schedule where the bootstrap leader will be the only + // choice because the active set is empty. 
Thus if the schedule + // was generated on PoH height bootstrap_height + n * seed_rotation_interval, + // then the same leader will be in power until PoH height + // bootstrap_height + (n + 1) * seed_rotation_interval + leader_scheduler.generate_schedule(bootstrap_height, &bank); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height), + Some(bootstrap_height + seed_rotation_interval) + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height - 1), + None + ); + leader_scheduler.generate_schedule(bootstrap_height + seed_rotation_interval, &bank); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval), + Some(bootstrap_height + 2 * seed_rotation_interval) + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval - 1), + None + ); + + leader_scheduler.reset(); + + // Now test when the active set > 1 node + + // Create and add validator to the active set + let validator_id = Keypair::new().pubkey(); + leader_scheduler.push_vote(validator_id, initial_vote_height); + bank.transfer(1, &mint.keypair(), validator_id, last_id) + .unwrap(); + + // Add leader to the active set + leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height); + bank.transfer(1, &mint.keypair(), bootstrap_leader_id, last_id) + .unwrap(); + + // Generate the schedule + leader_scheduler.generate_schedule(bootstrap_height, &bank); + + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height), + Some(bootstrap_height + leader_rotation_interval) + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height - 1), + None + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height + leader_rotation_interval), + Some(bootstrap_height + 2 * leader_rotation_interval) + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval), + None, + ); + + leader_scheduler.generate_schedule(bootstrap_height + seed_rotation_interval, &bank); + + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval), + Some(bootstrap_height + seed_rotation_interval + leader_rotation_interval) + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval - 1), + None + ); + assert_eq!( + leader_scheduler.max_height_for_leader(bootstrap_height + 2 * seed_rotation_interval), + None + ); + } } diff --git a/src/ledger.rs b/src/ledger.rs index 1ff3e5f5b8..49d625e63d 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -628,13 +628,29 @@ pub fn create_tmp_genesis(name: &str, num: i64) -> (Mint, String) { (mint, path) } -pub fn create_tmp_sample_ledger(name: &str, num: i64) -> (Mint, String, Vec) { - let mint = Mint::new(num); +pub fn create_ticks(num_ticks: usize, mut hash: Hash) -> Vec { + let mut ticks = Vec::with_capacity(num_ticks as usize); + for _ in 0..num_ticks { + let new_tick = Entry::new(&hash, 1, vec![]); + hash = new_tick.id; + ticks.push(new_tick); + } + + ticks +} + +pub fn create_tmp_sample_ledger( + name: &str, + num_tokens: i64, + num_ending_ticks: usize, +) -> (Mint, String, Vec) { + let mint = Mint::new(num_tokens); let path = get_tmp_ledger_path(name); // Create the entries let mut genesis = mint.create_entries(); - genesis.extend(vec![Entry::new(&mint.last_id(), 0, vec![])]); + let ticks = create_ticks(num_ending_ticks, mint.last_id()); + genesis.extend(ticks); let mut writer = LedgerWriter::open(&path, true).unwrap(); 
writer.write_entries(genesis.clone()).unwrap(); @@ -1017,5 +1033,4 @@ mod tests { let _ignored = remove_dir_all(&ledger_path); } - } diff --git a/src/mint.rs b/src/mint.rs index d912d57b59..74e92ad1de 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -60,7 +60,7 @@ impl Mint { pub fn create_entries(&self) -> Vec { let e0 = Entry::new(&self.seed(), 0, vec![]); - let e1 = Entry::new(&e0.id, 0, self.create_transactions()); + let e1 = Entry::new(&e0.id, 1, self.create_transactions()); vec![e0, e1] } } diff --git a/src/packet.rs b/src/packet.rs index b212f1050c..e054a70b4c 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -3,9 +3,11 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use counter::Counter; #[cfg(test)] +use entry::Entry; +#[cfg(test)] use hash::Hash; #[cfg(test)] -use ledger::{next_entries_mut, Block}; +use ledger::Block; use log::Level; use recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; use result::{Error, Result}; @@ -411,10 +413,12 @@ pub fn make_consecutive_blobs( addr: &SocketAddr, ) -> SharedBlobs { let mut last_hash = start_hash; - let mut num_hashes = 0; + let num_hashes = 1; let mut all_entries = Vec::with_capacity(num_blobs_to_make as usize); for _ in 0..num_blobs_to_make { - all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![])); + let entry = Entry::new(&last_hash, num_hashes, vec![]); + last_hash = entry.id; + all_entries.push(entry); } let mut new_blobs = all_entries.to_blobs_with_id(me_id, start_height, addr); new_blobs.truncate(num_blobs_to_make as usize); diff --git a/src/poh.rs b/src/poh.rs index 20b00d772e..701f721638 100644 --- a/src/poh.rs +++ b/src/poh.rs @@ -5,6 +5,7 @@ use hash::{hash, hashv, Hash}; pub struct Poh { last_hash: Hash, num_hashes: u64, + pub tick_height: u64, } #[derive(Debug)] @@ -15,10 +16,11 @@ pub struct PohEntry { } impl Poh { - pub fn new(last_hash: Hash) -> Self { + pub fn new(last_hash: Hash, tick_height: u64) -> Self { Poh { last_hash, num_hashes: 0, + tick_height, } } @@ -32,7 +34,6 @@ impl Poh { self.last_hash = hashv(&[&self.last_hash.as_ref(), &mixin.as_ref()]); self.num_hashes = 0; - PohEntry { num_hashes, id: self.last_hash, @@ -47,6 +48,7 @@ impl Poh { let num_hashes = self.num_hashes; self.num_hashes = 0; + self.tick_height += 1; PohEntry { num_hashes, diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 4c528cb4b3..c525759bda 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -5,53 +5,93 @@ use bank::Bank; use entry::Entry; use hash::Hash; use poh::Poh; -use result::Result; +use result::{Error, Result}; use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use transaction::Transaction; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum PohRecorderError { + MaxHeightReached, +} + #[derive(Clone)] pub struct PohRecorder { poh: Arc>, bank: Arc, sender: Sender>, + // TODO: whe extracting PoH generator into a separate standalone service, + // use this field for checking timeouts when running as a validator, and as + // a transmission guard when running as the leader. 
+ pub max_tick_height: Option, } impl PohRecorder { /// A recorder to synchronize PoH with the following data structures /// * bank - the LastId's queue is updated on `tick` and `record` events /// * sender - the Entry channel that outputs to the ledger - pub fn new(bank: Arc, sender: Sender>, last_entry_id: Hash) -> Self { - let poh = Arc::new(Mutex::new(Poh::new(last_entry_id))); - PohRecorder { poh, bank, sender } + pub fn new( + bank: Arc, + sender: Sender>, + last_entry_id: Hash, + tick_height: u64, + max_tick_height: Option, + ) -> Self { + let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height))); + PohRecorder { + poh, + bank, + sender, + max_tick_height, + } } - pub fn hash(&self) { + pub fn hash(&self) -> Result<()> { // TODO: amortize the cost of this lock by doing the loop in here for // some min amount of hashes let mut poh = self.poh.lock().unwrap(); - poh.hash() + if self.check_max_tick_height_reached(&*poh) { + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + } else { + poh.hash(); + Ok(()) + } } - pub fn tick(&self) -> Result<()> { - // Register and send the entry out while holding the lock. + pub fn tick(&mut self) -> Result { + // Register and send the entry out while holding the lock if the max PoH height + // hasn't been reached. // This guarantees PoH order and Entry production and banks LastId queue is the same let mut poh = self.poh.lock().unwrap(); - let tick = poh.tick(); - self.bank.register_entry_id(&tick.id); - let entry = Entry { - num_hashes: tick.num_hashes, - id: tick.id, - transactions: vec![], - }; - self.sender.send(vec![entry])?; - Ok(()) + if self.check_max_tick_height_reached(&*poh) { + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + } else { + self.register_and_send_tick(&mut *poh)?; + Ok(poh.tick_height) + } } pub fn record(&self, mixin: Hash, txs: Vec) -> Result<()> { // Register and send the entry out while holding the lock. // This guarantees PoH order and Entry production and banks LastId queue is the same. 
let mut poh = self.poh.lock().unwrap(); + if self.check_max_tick_height_reached(&*poh) { + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + } else { + self.record_and_send_txs(&mut *poh, mixin, txs)?; + Ok(()) + } + } + + fn check_max_tick_height_reached(&self, poh: &Poh) -> bool { + if let Some(max_tick_height) = self.max_tick_height { + poh.tick_height >= max_tick_height + } else { + false + } + } + + fn record_and_send_txs(&self, poh: &mut Poh, mixin: Hash, txs: Vec) -> Result<()> { let tick = poh.record(mixin); assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function"); let entry = Entry { @@ -62,6 +102,18 @@ impl PohRecorder { self.sender.send(vec![entry])?; Ok(()) } + + fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> { + let tick = poh.tick(); + self.bank.register_entry_id(&tick.id); + let entry = Entry { + num_hashes: tick.num_hashes, + id: tick.id, + transactions: vec![], + }; + self.sender.send(vec![entry])?; + Ok(()) + } } #[cfg(test)] @@ -79,7 +131,7 @@ mod tests { let bank = Arc::new(Bank::new(&mint)); let last_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(bank, entry_sender, last_id); + let mut poh_recorder = PohRecorder::new(bank, entry_sender, last_id, 0, None); //send some data let h1 = hash(b"hello world!"); diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 26a32d1af6..ae5f6da71d 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -27,7 +27,7 @@ use vote_stage::send_validator_vote; #[derive(Debug, PartialEq, Eq, Clone)] pub enum ReplicateStageReturnType { - LeaderRotation(u64, Hash), + LeaderRotation(u64, u64, Hash), } // Implement a destructor for the ReplicateStage thread to signal it exited @@ -62,6 +62,7 @@ impl ReplicateStage { ledger_writer: Option<&mut LedgerWriter>, keypair: &Arc, vote_blob_sender: Option<&BlobSender>, + tick_height: &mut u64, entry_height: &mut u64, leader_scheduler: &Arc>, ) -> Result { @@ -85,24 +86,30 @@ impl ReplicateStage { let last_entry_id = { let mut num_entries_to_write = entries.len(); for (i, entry) in entries.iter().enumerate() { - res = bank.process_entry(&entry); - Bank::process_entry_votes( - &bank, + // max_tick_height is the PoH height at which the next leader rotation will + // happen. The leader should send an entry such that the total PoH is equal + // to max_tick_height - guard. + // TODO: Introduce a "guard" for the end of transmission periods, the guard + // is assumed to be zero for now. 
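// Illustrative sketch of the rotation-boundary arithmetic assumed here (this is
// not the LeaderScheduler implementation): the test configurations in this patch
// place rotations at bootstrap_height and then every leader_rotation_interval
// ticks after it, so the next boundary at or after a given tick height would be
// roughly the following; exact behavior when tick_height sits on a boundary is
// glossed over.
fn next_rotation_boundary(
    tick_height: u64,
    bootstrap_height: u64,
    leader_rotation_interval: u64,
) -> u64 {
    if tick_height < bootstrap_height {
        bootstrap_height
    } else {
        let ticks_past_bootstrap = tick_height - bootstrap_height;
        let slots_completed = ticks_past_bootstrap / leader_rotation_interval + 1;
        bootstrap_height + slots_completed * leader_rotation_interval
    }
}

fn main() {
    // e.g. bootstrap_height = 20, leader_rotation_interval = 10, as in the tests.
    assert_eq!(next_rotation_boundary(5, 20, 10), 20);
    assert_eq!(next_rotation_boundary(19, 20, 10), 20);
    assert_eq!(next_rotation_boundary(25, 20, 10), 30);
}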
+ let max_tick_height = { + let ls_lock = leader_scheduler.read().unwrap(); + ls_lock.max_height_for_leader(*tick_height) + }; + + res = bank.process_entry( &entry, - *entry_height + i as u64 + 1, + tick_height, &mut *leader_scheduler.write().unwrap(), ); - { + // Will run only if leader_scheduler.use_only_bootstrap_leader is false + if let Some(max_tick_height) = max_tick_height { let ls_lock = leader_scheduler.read().unwrap(); - if ls_lock.is_leader_rotation_height( - // i is zero indexed, so current entry height for this entry is actually the - // old entry height + i + 1 - *entry_height + i as u64 + 1, - ) { + if *tick_height == max_tick_height { let my_id = keypair.pubkey(); - let scheduled_leader = - ls_lock.get_scheduled_leader(*entry_height + i as u64 + 1).expect("Scheduled leader id should never be unknown while processing entries"); + let scheduled_leader = ls_lock.get_scheduled_leader(*tick_height).expect( + "Scheduled leader id should never be unknown while processing entries", + ); cluster_info.write().unwrap().set_leader(scheduled_leader); if my_id == scheduled_leader { num_entries_to_write = i + 1; @@ -162,6 +169,7 @@ impl ReplicateStage { window_receiver: EntryReceiver, ledger_path: Option<&str>, exit: Arc, + tick_height: u64, entry_height: u64, leader_scheduler: Arc>, ) -> Self { @@ -179,15 +187,17 @@ impl ReplicateStage { let now = Instant::now(); let mut next_vote_secs = 1; let mut entry_height_ = entry_height; + let mut tick_height_ = tick_height; let mut last_entry_id = None; loop { let leader_id = leader_scheduler .read() .unwrap() - .get_scheduled_leader(entry_height_) + .get_scheduled_leader(tick_height_) .expect("Scheduled leader id should never be unknown at this point"); if leader_id == keypair.pubkey() { return Some(ReplicateStageReturnType::LeaderRotation( + tick_height_, entry_height_, // We should never start the TPU / this stage on an exact entry that causes leader // rotation (Fullnode should automatically transition on startup if it detects @@ -212,6 +222,7 @@ impl ReplicateStage { ledger_writer.as_mut(), &keypair, vote_sender, + &mut tick_height_, &mut entry_height_, &leader_scheduler, ) { @@ -246,9 +257,10 @@ impl Service for ReplicateStage { #[cfg(test)] mod test { use cluster_info::{ClusterInfo, Node}; + use entry::Entry; use fullnode::Fullnode; use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; - use ledger::{create_tmp_sample_ledger, next_entries_mut, LedgerWriter}; + use ledger::{create_tmp_sample_ledger, LedgerWriter}; use logger; use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use service::Service; @@ -269,8 +281,12 @@ mod test { let cluster_info_me = ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new"); // Create a ledger - let (mint, my_ledger_path, genesis_entries) = - create_tmp_sample_ledger("test_replicate_stage_leader_rotation_exit", 10_000); + let num_ending_ticks = 1; + let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger( + "test_replicate_stage_leader_rotation_exit", + 10_000, + num_ending_ticks, + ); let mut last_id = genesis_entries .last() .expect("expected at least one genesis entry") @@ -280,11 +296,16 @@ mod test { // 1) Give the validator a nonzero number of tokens 2) A vote from the validator . 
// This will cause leader rotation after the bootstrap height let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap(); - let bootstrap_entries = - make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id); - last_id = bootstrap_entries.last().unwrap().id; - let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64; - ledger_writer.write_entries(bootstrap_entries).unwrap(); + let active_set_entries = + make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0); + last_id = active_set_entries.last().unwrap().id; + let initial_tick_height = genesis_entries + .iter() + .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); + let active_set_entries_len = active_set_entries.len() as u64; + let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; + let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len; + ledger_writer.write_entries(active_set_entries).unwrap(); // Set up the LeaderScheduler so that this this node becomes the leader at // bootstrap_height = num_bootstrap_slots * leader_rotation_interval @@ -303,7 +324,8 @@ mod test { let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); // Set up the bank - let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); + let (bank, _, _, _) = + Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); @@ -317,47 +339,57 @@ mod test { entry_receiver, Some(&my_ledger_path), exit.clone(), - ledger_initial_len, + initial_tick_height, + initial_entry_len, leader_scheduler.clone(), ); - // Send enough entries to trigger leader rotation + // Send enough ticks to trigger leader rotation let extra_entries = leader_rotation_interval; let total_entries_to_send = (bootstrap_height + extra_entries) as usize; - let mut num_hashes = 0; + let num_hashes = 1; let mut entries_to_send = vec![]; while entries_to_send.len() < total_entries_to_send { - let entries = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); - last_id = entries.last().expect("expected at least one entry").id; - entries_to_send.extend(entries); + let entry = Entry::new(&mut last_id, num_hashes, vec![]); + last_id = entry.id; + entries_to_send.push(entry); } - entries_to_send.truncate(total_entries_to_send); - let last_id = entries_to_send[(bootstrap_height - 1) as usize].id; + assert!((num_ending_ticks as u64) < bootstrap_height); + + // Add on the only entries that weren't ticks to the bootstrap height to get the + // total expected entry length + let expected_entry_height = + bootstrap_height + initial_non_tick_height + active_set_entries_len; + let expected_last_id = + entries_to_send[(bootstrap_height - initial_tick_height - 1) as usize].id; entry_sender.send(entries_to_send).unwrap(); // Wait for replicate_stage to exit and check return value is correct assert_eq!( Some(ReplicateStageReturnType::LeaderRotation( bootstrap_height, - last_id + expected_entry_height, + expected_last_id, )), replicate_stage.join().expect("replicate stage join") ); assert_eq!(exit.load(Ordering::Relaxed), true); - //Check ledger height is correct + // Check ledger height is correct let mut leader_scheduler = Arc::try_unwrap(leader_scheduler) .expect("Multiple references to this RwLock still exist") .into_inner() .expect("RwLock for LeaderScheduler is still locked"); - let (_, entry_height, _) = + leader_scheduler.reset(); + let (_, tick_height, 
entry_height, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); - assert_eq!(entry_height, bootstrap_height); + assert_eq!(tick_height, bootstrap_height); + assert_eq!(entry_height, expected_entry_height); let _ignored = remove_dir_all(&my_ledger_path); } } diff --git a/src/replicator.rs b/src/replicator.rs index 45a09fe65b..fa51dd3a15 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -108,6 +108,7 @@ impl Replicator { let t_window = window_service( cluster_info.clone(), shared_window.clone(), + 0, entry_height, max_entry_height, blob_fetch_receiver, diff --git a/src/result.rs b/src/result.rs index e5a5ed303b..c66d236596 100644 --- a/src/result.rs +++ b/src/result.rs @@ -6,6 +6,7 @@ use cluster_info; #[cfg(feature = "erasure")] use erasure; use packet; +use poh_recorder; use serde_json; use std; use std::any::Any; @@ -25,6 +26,7 @@ pub enum Error { #[cfg(feature = "erasure")] ErasureError(erasure::ErasureError), SendError, + PohRecorderError(poh_recorder::PohRecorderError), } pub type Result = std::result::Result; @@ -73,7 +75,6 @@ impl std::convert::From> for Error { Error::JoinError(e) } } - impl std::convert::From for Error { fn from(e: std::io::Error) -> Error { Error::IO(e) @@ -94,6 +95,11 @@ impl std::convert::From> for Error { Error::Serialize(e) } } +impl std::convert::From for Error { + fn from(e: poh_recorder::PohRecorderError) -> Error { + Error::PohRecorderError(e) + } +} #[cfg(test)] mod tests { diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 8f788586a4..33a3abb953 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -87,6 +87,7 @@ impl RetransmitStage { pub fn new( cluster_info: &Arc>, window: SharedWindow, + tick_height: u64, entry_height: u64, retransmit_socket: Arc, repair_socket: Arc, @@ -102,6 +103,7 @@ impl RetransmitStage { let t_window = window_service( cluster_info.clone(), window, + tick_height, entry_height, 0, fetch_stage_receiver, diff --git a/src/rpc.rs b/src/rpc.rs index 720c38ceac..7512d4a3e8 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -559,6 +559,7 @@ mod tests { let server = Fullnode::new_with_bank( leader_keypair, bank, + 0, entry_height, &genesis_entries, leader, diff --git a/src/thin_client.rs b/src/thin_client.rs index 4fdab82292..f32fcf375f 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -459,6 +459,7 @@ mod tests { leader_keypair, bank, 0, + 0, &[], leader, None, @@ -506,6 +507,7 @@ mod tests { leader_keypair, bank, 0, + 0, &[], leader, None, @@ -567,6 +569,7 @@ mod tests { let server = Fullnode::new_with_bank( leader_keypair, bank, + 0, entry_height, &genesis_entries, leader, @@ -630,6 +633,7 @@ mod tests { let server = Fullnode::new_with_bank( leader_keypair, bank, + 0, entry_height, &genesis_entries, leader, diff --git a/src/tpu.rs b/src/tpu.rs index 2f4d1b16c1..dd890e62c8 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -26,12 +26,11 @@ //! 
``` use bank::Bank; -use banking_stage::{BankingStage, Config}; +use banking_stage::{BankingStage, BankingStageReturnType, Config}; use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; use hash::Hash; -use leader_scheduler::LeaderScheduler; use service::Service; use signature::Keypair; use sigverify_stage::SigVerifyStage; @@ -40,7 +39,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; use std::thread; -use write_stage::{WriteStage, WriteStageReturnType}; +use write_stage::WriteStage; pub enum TpuReturnType { LeaderRotation, @@ -64,9 +63,9 @@ impl Tpu { transactions_sockets: Vec, ledger_path: &str, sigverify_disabled: bool, - entry_height: u64, + tick_height: u64, + max_tick_height: Option, last_entry_id: &Hash, - leader_scheduler: Arc>, ) -> (Self, Receiver>, Arc) { let exit = Arc::new(AtomicBool::new(false)); @@ -75,8 +74,14 @@ impl Tpu { let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); - let (banking_stage, entry_receiver) = - BankingStage::new(&bank, verified_receiver, tick_duration, last_entry_id); + let (banking_stage, entry_receiver) = BankingStage::new( + &bank, + verified_receiver, + tick_duration, + last_entry_id, + tick_height, + max_tick_height, + ); let (write_stage, entry_forwarder) = WriteStage::new( keypair, @@ -84,8 +89,6 @@ impl Tpu { cluster_info.clone(), ledger_path, entry_receiver, - entry_height, - leader_scheduler, ); let tpu = Tpu { @@ -118,9 +121,9 @@ impl Service for Tpu { fn join(self) -> thread::Result<(Option)> { self.fetch_stage.join()?; self.sigverify_stage.join()?; - self.banking_stage.join()?; - match self.write_stage.join()? { - WriteStageReturnType::LeaderRotation => Ok(Some(TpuReturnType::LeaderRotation)), + self.write_stage.join()?; + match self.banking_stage.join()? { + Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)), _ => Ok(None), } } diff --git a/src/tvu.rs b/src/tvu.rs index 8b7669673d..244ecc09bb 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -53,7 +53,7 @@ use window::SharedWindow; #[derive(Debug, PartialEq, Eq, Clone)] pub enum TvuReturnType { - LeaderRotation(u64, Hash), + LeaderRotation(u64, u64, Hash), } pub struct Tvu { @@ -79,6 +79,7 @@ impl Tvu { pub fn new( keypair: Arc, bank: &Arc, + tick_height: u64, entry_height: u64, cluster_info: Arc>, window: SharedWindow, @@ -102,6 +103,7 @@ impl Tvu { let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( &cluster_info, window, + tick_height, entry_height, Arc::new(retransmit_socket), repair_socket, @@ -116,6 +118,7 @@ impl Tvu { blob_window_receiver, ledger_path, exit.clone(), + tick_height, entry_height, leader_scheduler, ); @@ -149,9 +152,15 @@ impl Service for Tvu { self.retransmit_stage.join()?; self.fetch_stage.join()?; match self.replicate_stage.join()? 
{ - Some(ReplicateStageReturnType::LeaderRotation(entry_height, last_entry_id)) => Ok( - Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)), - ), + Some(ReplicateStageReturnType::LeaderRotation( + tick_height, + entry_height, + last_entry_id, + )) => Ok(Some(TvuReturnType::LeaderRotation( + tick_height, + entry_height, + last_entry_id, + ))), _ => Ok(None), } } @@ -254,6 +263,7 @@ pub mod tests { Arc::new(target1_keypair), &bank, 0, + 0, cref1, dr_1.1, target1.sockets.replicate, diff --git a/src/wallet.rs b/src/wallet.rs index ce4326b5fd..307999c893 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -932,6 +932,7 @@ mod tests { leader_keypair, bank, 0, + 0, &[], leader, None, @@ -1007,6 +1008,7 @@ mod tests { let server = Fullnode::new_with_bank( leader_keypair, bank, + 0, entry_height, &genesis_entries, leader, @@ -1085,6 +1087,7 @@ mod tests { leader_keypair, bank, 0, + 0, &[], leader, None, @@ -1204,6 +1207,7 @@ mod tests { leader_keypair, bank, 0, + 0, &[], leader, None, @@ -1321,6 +1325,7 @@ mod tests { leader_keypair, bank, 0, + 0, &[], leader, None, diff --git a/src/window.rs b/src/window.rs index 8c9648f750..dcf5e38a04 100644 --- a/src/window.rs +++ b/src/window.rs @@ -60,6 +60,7 @@ pub trait WindowUtil { times: usize, consumed: u64, received: u64, + tick_height: u64, max_entry_height: u64, leader_scheduler_option: &Arc>, ) -> Vec<(SocketAddr, Vec)>; @@ -74,6 +75,7 @@ pub trait WindowUtil { pix: u64, consume_queue: &mut Vec, consumed: &mut u64, + tick_height: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, ); @@ -101,6 +103,7 @@ impl WindowUtil for Window { times: usize, consumed: u64, received: u64, + tick_height: u64, max_entry_height: u64, leader_scheduler_option: &Arc>, ) -> Vec<(SocketAddr, Vec)> { @@ -110,28 +113,30 @@ impl WindowUtil for Window { let ls_lock = leader_scheduler_option.read().unwrap(); if !ls_lock.use_only_bootstrap_leader { // Calculate the next leader rotation height and check if we are the leader - let next_leader_rotation_height = consumed + ls_lock.entries_until_next_leader_rotation(consumed).expect("Leader rotation should exist when not using default implementation of LeaderScheduler"); - - match ls_lock.get_scheduled_leader(next_leader_rotation_height) { - Some(leader_id) if leader_id == *id => is_next_leader = true, - // In the case that we are not in the current scope of the leader schedule - // window then either: - // - // 1) The replicate stage hasn't caught up to the "consumed" entries we sent, - // in which case it will eventually catch up - // - // 2) We are on the border between seed_rotation_intervals, so the - // schedule won't be known until the entry on that cusp is received - // by the replicate stage (which comes after this stage). Hence, the next - // leader at the beginning of that next epoch will not know he is the - // leader until he receives that last "cusp" entry. He also won't ask for repairs - // for that entry because "is_next_leader" won't be set here. In this case, - // everybody will be blocking waiting for that "cusp" entry instead of repairing, - // until the leader hits "times" >= the max times in calculate_max_repair(). 
- // The impact of this, along with the similar problem from broadcast for the transitioning - // leader, can be observed in the multinode test, test_full_leader_validator_network(), - None => (), - _ => (), + if let Some(next_leader_rotation_height) = + ls_lock.max_height_for_leader(tick_height) + { + match ls_lock.get_scheduled_leader(next_leader_rotation_height) { + Some(leader_id) if leader_id == *id => is_next_leader = true, + // In the case that we are not in the current scope of the leader schedule + // window then either: + // + // 1) The replicate stage hasn't caught up to the "consumed" entries we sent, + // in which case it will eventually catch up + // + // 2) We are on the border between seed_rotation_intervals, so the + // schedule won't be known until the entry on that cusp is received + // by the replicate stage (which comes after this stage). Hence, the next + // leader at the beginning of that next epoch will not know he is the + // leader until he receives that last "cusp" entry. He also won't ask for repairs + // for that entry because "is_next_leader" won't be set here. In this case, + // everybody will be blocking waiting for that "cusp" entry instead of repairing, + // until the leader hits "times" >= the max times in calculate_max_repair(). + // The impact of this, along with the similar problem from broadcast for the transitioning + // leader, can be observed in the multinode test, test_full_leader_validator_network(), + None => (), + _ => (), + } } } } @@ -228,6 +233,7 @@ impl WindowUtil for Window { pix: u64, consume_queue: &mut Vec, consumed: &mut u64, + tick_height: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, ) { @@ -303,6 +309,9 @@ impl WindowUtil for Window { // Check that we can get the entries from this blob match reconstruct_entries_from_blobs(vec![k_data_blob]) { Ok(entries) => { + for entry in &entries { + *tick_height += entry.is_tick() as u64; + } consume_queue.extend(entries); } Err(_) => { diff --git a/src/window_service.rs b/src/window_service.rs index c5ad3aa9dd..b7e75a51a5 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -153,6 +153,7 @@ fn recv_window( cluster_info: &Arc>, consumed: &mut u64, received: &mut u64, + tick_height: &mut u64, max_ix: u64, r: &BlobReceiver, s: &EntrySender, @@ -231,6 +232,7 @@ fn recv_window( pix, &mut consume_queue, consumed, + tick_height, leader_unknown, pending_retransmits, ); @@ -263,6 +265,7 @@ fn recv_window( pub fn window_service( cluster_info: Arc>, window: SharedWindow, + tick_height: u64, entry_height: u64, max_entry_height: u64, r: BlobReceiver, @@ -275,6 +278,7 @@ pub fn window_service( Builder::new() .name("solana-window".to_string()) .spawn(move || { + let mut tick_height_ = tick_height; let mut consumed = entry_height; let mut received = entry_height; let mut last = entry_height; @@ -290,6 +294,7 @@ pub fn window_service( &cluster_info, &mut consumed, &mut received, + &mut tick_height_, max_entry_height, &r, &s, @@ -340,6 +345,7 @@ pub fn window_service( times, consumed, received, + tick_height_, max_entry_height, &leader_scheduler, ); @@ -406,6 +412,7 @@ mod test { win, 0, 0, + 0, r_reader, s_window, s_retransmit, @@ -468,6 +475,7 @@ mod test { win, 0, 0, + 0, r_reader, s_window, s_retransmit, @@ -531,6 +539,7 @@ mod test { win, 0, 0, + 0, r_reader, s_window, s_retransmit, diff --git a/src/write_stage.rs b/src/write_stage.rs index 46f5bb0b1b..ef4b85e8f7 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -3,18 +3,14 @@ //! 
stdout, and then sends the Entry to its output channel. use bank::Bank; -use budget_transaction::BudgetTransaction; use cluster_info::ClusterInfo; use counter::Counter; use entry::Entry; -use leader_scheduler::LeaderScheduler; use ledger::{Block, LedgerWriter}; use log::Level; use result::{Error, Result}; use service::Service; use signature::Keypair; -use solana_program_interface::pubkey::Pubkey; -use std::cmp; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; @@ -25,107 +21,19 @@ use streamer::responder; use timing::{duration_as_ms, duration_as_s}; use vote_stage::send_leader_vote; -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum WriteStageReturnType { - LeaderRotation, - ChannelDisconnected, -} - pub struct WriteStage { thread_hdls: Vec>, - write_thread: JoinHandle, + write_thread: JoinHandle<()>, } impl WriteStage { - // Given a vector of potential new entries to write, return as many as we can - // fit before we hit the entry height for leader rotation. Also return a boolean - // reflecting whether we actually hit an entry height for leader rotation. - fn find_leader_rotation_index( - bank: &Arc, - my_id: &Pubkey, - leader_scheduler: &mut LeaderScheduler, - entry_height: u64, - mut new_entries: Vec, - ) -> (Vec, bool) { - // In the case we're using the default implemntation of the leader scheduler, - // there won't ever be leader rotations at particular entry heights, so just - // return the entire input vector of entries - if leader_scheduler.use_only_bootstrap_leader { - return (new_entries, false); - } - - let new_entries_length = new_entries.len(); - - // i is the number of entries to take - let mut i = 0; - let mut is_leader_rotation = false; - - loop { - let next_leader = leader_scheduler.get_scheduled_leader(entry_height + i as u64); - - if next_leader != Some(*my_id) { - is_leader_rotation = true; - break; - } - - if i == new_entries_length { - break; - } - - // Find out how many more entries we can squeeze in until the next leader - // rotation - let entries_until_leader_rotation = leader_scheduler.entries_until_next_leader_rotation( - entry_height + (i as u64) - ).expect("Leader rotation should exist when not using default implementation of LeaderScheduler"); - - // Check the next leader rotation height entries in new_entries, or - // if the new_entries doesnt have that many entries remaining, - // just check the rest of the new_entries_vector - let step = cmp::min( - entries_until_leader_rotation as usize, - new_entries_length - i, - ); - - // 1) Since "i" is the current number/count of items from the new_entries vector we have - // have already checked, then "i" is also the INDEX into the new_entries vector of the - // next unprocessed entry. 
Hence this loop checks all entries between indexes: - // [entry_height + i, entry_height + i + step - 1], which is equivalent to the - // entry heights: [entry_height + i + 1, entry_height + i + step] - for (entry, new_entries_index) in new_entries[i..(i + step)].iter().zip(i..(i + step)) { - let votes = entry - .transactions - .iter() - .filter_map(BudgetTransaction::vote); - for (voter_id, _, _) in votes { - leader_scheduler - .push_vote(voter_id, entry_height + new_entries_index as u64 + 1); - } - // TODO: There's an issue here that the bank state may have updated - // while this entry was in flight from the BankingStage, which could cause - // the leader schedule, which is based on stake (tied to the bank account balances) - // right now, to be inconsistent with the rest of the network. Fix once - // we can pin PoH height to bank state - leader_scheduler.update_height(entry_height + new_entries_index as u64 + 1, bank); - } - - i += step - } - - new_entries.truncate(i as usize); - - (new_entries, is_leader_rotation) - } - /// Process any Entry items that have been published by the RecordStage. /// continuosly send entries out pub fn write_and_send_entries( - bank: &Arc, cluster_info: &Arc>, ledger_writer: &mut LedgerWriter, entry_sender: &Sender>, entry_receiver: &Receiver>, - entry_height: &mut u64, - leader_scheduler: &Arc>, ) -> Result<()> { let mut ventries = Vec::new(); let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; @@ -134,22 +42,8 @@ impl WriteStage { let mut num_txs = 0; loop { - // Find out how many more entries we can squeeze in until the next leader - // rotation - let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index( - bank, - &cluster_info.read().unwrap().my_data().id, - &mut *leader_scheduler.write().unwrap(), - *entry_height + num_new_entries as u64, - received_entries, - ); - - num_new_entries += new_entries.len(); - ventries.push(new_entries); - - if is_leader_rotation { - break; - } + num_new_entries += received_entries.len(); + ventries.push(received_entries); if let Ok(n) = entry_receiver.try_recv() { received_entries = n; @@ -175,9 +69,6 @@ impl WriteStage { num_txs += e.transactions.len(); ledger_writer.write_entry_noflush(&e)?; } - // Once the entries have been written to the ledger, then we can - // safely incement entry height - *entry_height += entries.len() as u64; inc_new_counter_info!("write_stage-write_entries", entries.len()); @@ -217,8 +108,6 @@ impl WriteStage { cluster_info: Arc>, ledger_path: &str, entry_receiver: Receiver>, - entry_height: u64, - leader_scheduler: Arc>, ) -> (Self, Receiver>) { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); @@ -236,42 +125,16 @@ impl WriteStage { let mut last_vote = 0; let mut last_valid_validator_timestamp = 0; let id = cluster_info.read().unwrap().id; - let mut entry_height_ = entry_height; loop { - // Note that entry height is not zero indexed, it starts at 1, so the - // old leader is in power up to and including entry height - // n * leader_rotation_interval for some "n". Once we've forwarded - // that last block, check for the next scheduled leader. - match leader_scheduler - .read() - .unwrap() - .get_scheduled_leader(entry_height_) - { - Some(leader_id) if leader_id == id => (), - None => panic!( - "Scheduled leader id should never be unknown while processing entries" - ), - _ => { - // If the leader is no longer in power, exit. 
- // When the broadcast stage has received the last blob, it - // will signal to close the fetch stage, which will in turn - // close down this write stage - return WriteStageReturnType::LeaderRotation; - } - } - if let Err(e) = Self::write_and_send_entries( - &bank, &cluster_info, &mut ledger_writer, &entry_sender, &entry_receiver, - &mut entry_height_, - &leader_scheduler, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { - return WriteStageReturnType::ChannelDisconnected + break; } Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { @@ -310,9 +173,9 @@ impl WriteStage { } impl Service for WriteStage { - type JoinReturnType = WriteStageReturnType; + type JoinReturnType = (); - fn join(self) -> thread::Result { + fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } @@ -320,409 +183,3 @@ impl Service for WriteStage { self.write_thread.join() } } - -#[cfg(test)] -mod tests { - use bank::Bank; - use cluster_info::{ClusterInfo, Node}; - use entry::Entry; - use hash::Hash; - use leader_scheduler::{set_new_leader, LeaderScheduler, LeaderSchedulerConfig}; - use ledger::{create_tmp_genesis, next_entries_mut, read_ledger}; - use service::Service; - use signature::{Keypair, KeypairUtil}; - use solana_program_interface::account::Account; - use solana_program_interface::pubkey::Pubkey; - use std::fs::remove_dir_all; - use std::sync::mpsc::{channel, Receiver, Sender}; - use std::sync::{Arc, RwLock}; - use write_stage::{WriteStage, WriteStageReturnType}; - - struct DummyWriteStage { - write_stage: WriteStage, - entry_sender: Sender>, - _write_stage_entry_receiver: Receiver>, - bank: Arc, - leader_ledger_path: String, - ledger_tail: Vec, - leader_scheduler: Arc>, - } - - fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec) { - let entries = read_ledger(ledger_path, true).expect("opening ledger"); - - let entries = entries - .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. 
error: {}", err))); - - info!("processing ledger..."); - bank.process_ledger(entries, &mut LeaderScheduler::default()) - .expect("process_ledger") - } - - fn setup_dummy_write_stage( - leader_keypair: Arc, - leader_scheduler_config: &LeaderSchedulerConfig, - test_name: &str, - ) -> DummyWriteStage { - // Setup leader info - let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - - let cluster_info = Arc::new(RwLock::new( - ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"), - )); - let bank = Arc::new(Bank::default()); - - // Make a ledger - let (_, leader_ledger_path) = create_tmp_genesis(test_name, 10_000); - - let (entry_height, ledger_tail) = process_ledger(&leader_ledger_path, &bank); - - // Make a dummy pipe - let (entry_sender, entry_receiver) = channel(); - - // Make a dummy leader scheduler we can manipulate - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new(leader_scheduler_config))); - - // Start up the write stage - let (write_stage, _write_stage_entry_receiver) = WriteStage::new( - leader_keypair, - bank.clone(), - cluster_info.clone(), - &leader_ledger_path, - entry_receiver, - entry_height, - leader_scheduler.clone(), - ); - - DummyWriteStage { - write_stage, - entry_sender, - // Need to keep this alive, otherwise the write_stage will detect ChannelClosed - // and shut down - _write_stage_entry_receiver, - bank, - leader_ledger_path, - ledger_tail, - leader_scheduler, - } - } - - #[test] - fn test_write_stage_leader_rotation_exit() { - let leader_keypair = Keypair::new(); - let leader_id = leader_keypair.pubkey(); - - // Make a dummy leader scheduler we can manipulate - let bootstrap_height = 20; - let leader_rotation_interval = 10; - let seed_rotation_interval = 2 * leader_rotation_interval; - let active_window = bootstrap_height + 3 * seed_rotation_interval; - let leader_scheduler_config = LeaderSchedulerConfig::new( - leader_keypair.pubkey(), - Some(bootstrap_height), - Some(leader_rotation_interval), - Some(seed_rotation_interval), - Some(active_window), - ); - - let write_stage_info = setup_dummy_write_stage( - Arc::new(leader_keypair), - &leader_scheduler_config, - "test_write_stage_leader_rotation_exit", - ); - - let mut last_id = write_stage_info - .ledger_tail - .last() - .expect("Ledger should not be empty") - .id; - let mut num_hashes = 0; - - let genesis_entry_height = write_stage_info.ledger_tail.len() as u64; - - // Insert a nonzero balance and vote into the bank to make this node eligible - // for leader selection - write_stage_info - .leader_scheduler - .write() - .unwrap() - .push_vote(leader_id, 1); - let dummy_id = Keypair::new().pubkey(); - let accounts = write_stage_info.bank.accounts(); - let new_account = Account::new(1, 10, dummy_id.clone()); - accounts - .write() - .unwrap() - .insert(leader_id, new_account.clone()); - - // Input enough entries to make exactly leader_rotation_interval entries, which will - // trigger a check for leader rotation. 
Because the next scheduled leader - // is ourselves, we won't exit - for _ in genesis_entry_height..bootstrap_height { - let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); - write_stage_info.entry_sender.send(new_entry).unwrap(); - } - - // Set the next scheduled next leader to some other node - { - let mut leader_scheduler = write_stage_info.leader_scheduler.write().unwrap(); - set_new_leader(&write_stage_info.bank, &mut (*leader_scheduler), 1); - } - - // Input enough dummy entries until the next seed rotation_interval, - // The write_stage will see that it's no longer the leader after - // checking the schedule, and exit - for _ in 0..seed_rotation_interval { - let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); - write_stage_info.entry_sender.send(new_entry).unwrap(); - } - - assert_eq!( - write_stage_info.write_stage.join().unwrap(), - WriteStageReturnType::LeaderRotation - ); - - // Make sure the ledger contains exactly 2 * leader_rotation_interval entries - let (entry_height, _) = - process_ledger(&write_stage_info.leader_ledger_path, &write_stage_info.bank); - remove_dir_all(write_stage_info.leader_ledger_path).unwrap(); - assert_eq!(entry_height, 2 * leader_rotation_interval); - } - - fn make_leader_scheduler( - my_id: Pubkey, - bootstrap_height: u64, - leader_rotation_interval: u64, - seed_rotation_interval: u64, - active_window: u64, - ) -> LeaderScheduler { - let leader_scheduler_config = LeaderSchedulerConfig::new( - my_id, - Some(bootstrap_height), - Some(leader_rotation_interval), - Some(seed_rotation_interval), - Some(active_window), - ); - - let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); - leader_scheduler.push_vote(my_id, 1); - leader_scheduler - } - - #[test] - // Tests for when the leader across slots and epochs are the same - fn test_same_leader_index_calculation() { - // Set up a dummy node - let my_keypair = Arc::new(Keypair::new()); - let my_id = my_keypair.pubkey(); - - // Set up a dummy bank - let bank = Arc::new(Bank::default()); - let accounts = bank.accounts(); - let dummy_id = Keypair::new().pubkey(); - let new_account = Account::new(1, 10, dummy_id.clone()); - accounts.write().unwrap().insert(my_id, new_account.clone()); - - let entry = Entry::new(&Hash::default(), 0, vec![]); - - // Note: An slot is the period of leader_rotation_interval entries - // time during which a leader is in power - - let leader_rotation_interval = 10; - let bootstrap_height = 17; - let seed_rotation_interval = 3 * leader_rotation_interval; - let active_window = bootstrap_height + 3 * seed_rotation_interval; - - let mut leader_scheduler = make_leader_scheduler( - my_id, - bootstrap_height, - leader_rotation_interval, - seed_rotation_interval, - active_window, - ); - - // Case 1: A vector that is completely within a certain slot should return that - // entire vector - let mut len = (leader_scheduler.bootstrap_height - 1) as usize; - let mut input = vec![entry.clone(); len]; - let mut result = WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - 0, - input.clone(), - ); - - assert_eq!(result, (input, false)); - - // Case 2: A vector of new entries that spans multiple slots should return the - // entire vector, assuming that the same leader is in power for all the slots. 
- len = 2 * seed_rotation_interval as usize; - input = vec![entry.clone(); len]; - result = WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - bootstrap_height - 1, - input.clone(), - ); - - assert_eq!(result, (input, false)); - - // Case 3: A vector that triggers a check for leader rotation should return - // the entire vector and signal leader_rotation == false, if the - // same leader is in power for the next slot as well. - let mut leader_scheduler = make_leader_scheduler( - my_id, - bootstrap_height, - leader_rotation_interval, - seed_rotation_interval, - active_window, - ); - - len = 1; - input = vec![entry.clone(); len]; - result = WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - bootstrap_height - 1, - input.clone(), - ); - - assert_eq!(result, (input.clone(), false)); - - result = WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - bootstrap_height + seed_rotation_interval - 1, - input.clone(), - ); - - assert_eq!(result, (input, false)); - } - - // Tests for when the leader across slots / epochs are different - #[test] - fn test_different_leader_index_calculation() { - // Set up a dummy node - let my_keypair = Arc::new(Keypair::new()); - let my_id = my_keypair.pubkey(); - - // Set up a dummy bank - let bank = Arc::new(Bank::default()); - let entry = Entry::new(&Hash::default(), 0, vec![]); - - // Note: An slot is the period of leader_rotation_interval entries - // time during which a leader is in power - - let leader_rotation_interval = 10; - let bootstrap_height = 17; - let seed_rotation_interval = 3 * leader_rotation_interval; - let active_window = 1; - let swap_entry_height = bootstrap_height + 2 * seed_rotation_interval; - - // Case 1: A vector that spans different epochs for different leaders - // should get truncated - - // Set the leader scheduler - let mut leader_scheduler = make_leader_scheduler( - my_id, - bootstrap_height, - leader_rotation_interval, - seed_rotation_interval, - active_window, - ); - - set_new_leader(&bank, &mut leader_scheduler, swap_entry_height); - - // Start test - let mut start_height = bootstrap_height - 1; - let extra_len = leader_rotation_interval; - let expected_len = swap_entry_height - start_height; - let mut len = expected_len + extra_len; - let mut input = vec![entry.clone(); len as usize]; - let mut result = WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - start_height, - input.clone(), - ); - input.truncate(expected_len as usize); - assert_eq!(result, (input, true)); - - // Case 2: Start at entry height == the height where the next leader is elected, should - // return no entries - len = 1; - input = vec![entry.clone(); len as usize]; - result = WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - swap_entry_height, - input.clone(), - ); - - assert_eq!(result, (vec![], true)); - - // Case 3: A vector that lands one before leader rotation happens should not be - // truncated, and should signal leader rotation == false - - // Reset the leader scheduler - leader_scheduler = make_leader_scheduler( - my_id, - bootstrap_height, - leader_rotation_interval, - seed_rotation_interval, - active_window, - ); - - set_new_leader(&bank, &mut leader_scheduler, swap_entry_height); - - // Start test - start_height = bootstrap_height - 1; - let len_remaining = swap_entry_height - start_height; - len = len_remaining - 1; - input = vec![entry.clone(); len as usize]; - result = 
WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - start_height, - input.clone(), - ); - assert_eq!(result, (input, false)); - - // Case 4: A vector that lands exactly where leader rotation happens should not be - // truncated, but should return leader rotation == true - - // Reset the leader scheduler - leader_scheduler = make_leader_scheduler( - my_id, - bootstrap_height, - leader_rotation_interval, - seed_rotation_interval, - active_window, - ); - set_new_leader(&bank, &mut leader_scheduler, swap_entry_height); - - // Generate the schedule - leader_scheduler.update_height(bootstrap_height, &bank); - - // Start test - start_height = bootstrap_height + leader_rotation_interval - 1; - len = swap_entry_height - start_height; - input = vec![entry.clone(); len as usize]; - result = WriteStage::find_leader_rotation_index( - &bank, - &my_id, - &mut leader_scheduler, - start_height, - input.clone(), - ); - - assert_eq!(result, (input, true)); - } -} diff --git a/tests/multinode.rs b/tests/multinode.rs index 682c3d1011..909f0b61e6 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -774,10 +774,8 @@ fn test_leader_to_validator_transition() { logger::setup(); let leader_rotation_interval = 20; - // Make a dummy validator id to be the next leader and - // sink for this test's mock transactions + // Make a dummy validator id to be the next leader let validator_keypair = Keypair::new(); - let validator_id = validator_keypair.pubkey(); // Create the leader node information let leader_keypair = Keypair::new(); @@ -786,8 +784,12 @@ fn test_leader_to_validator_transition() { // Initialize the leader ledger. Make a mint and a genesis entry // in the leader ledger - let (mint, leader_ledger_path, genesis_entries) = - create_tmp_sample_ledger("test_leader_to_validator_transition", 10_000); + let num_ending_ticks = 1; + let (mint, leader_ledger_path, genesis_entries) = create_tmp_sample_ledger( + "test_leader_to_validator_transition", + 10_000, + num_ending_ticks, + ); let last_id = genesis_entries .last() @@ -798,10 +800,8 @@ fn test_leader_to_validator_transition() { // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let bootstrap_entries = - make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); - let bootstrap_entries_len = bootstrap_entries.len(); + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); ledger_writer.write_entries(bootstrap_entries).unwrap(); - let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64; // Start the leader node let bootstrap_height = leader_rotation_interval; @@ -842,39 +842,25 @@ fn test_leader_to_validator_transition() { assert!(converged); - let extra_transactions = std::cmp::max(bootstrap_height / 3, 1); + // Account that will be the sink for all the test's transactions + let bob_pubkey = Keypair::new().pubkey(); - // Push leader "extra_transactions" past bootstrap_height, - // make sure the leader stops. 
- assert!(ledger_initial_len < bootstrap_height); - for i in ledger_initial_len..(bootstrap_height + extra_transactions) { - if i < bootstrap_height { - // Poll to see that the bank state is updated after every transaction - // to ensure that each transaction is packaged as a single entry, - // so that we can be sure leader rotation is triggered - let expected_balance = std::cmp::min( - bootstrap_height - ledger_initial_len, - i - ledger_initial_len + 1, - ); - let result = send_tx_and_retry_get_balance( - &leader_info, - &mint, - &validator_id, - 1, - Some(expected_balance as i64), - ); + // Push transactions until we detect an exit + let mut i = 1; + loop { + // Poll to see that the bank state is updated after every transaction + // to ensure that each transaction is packaged as a single entry, + // so that we can be sure leader rotation is triggered + let result = + send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, Some(i as i64)); - // If the transaction wasn't reflected in the node, then we assume - // the node has transitioned already - if result != Some(expected_balance as i64) { - break; - } - } else { - // After bootstrap entries, we don't care about the - // number of entries generated by these transactions. These are just - // here for testing to make sure the leader stopped at the correct point. - send_tx_and_retry_get_balance(&leader_info, &mint, &validator_id, 1, None); + // If the transaction wasn't reflected in the node, then we assume + // the node has transitioned already + if result != Some(i as i64) { + break; } + + i += 1; } // Wait for leader to shut down tpu and restart tvu @@ -883,25 +869,22 @@ fn test_leader_to_validator_transition() { _ => panic!("Expected reason for exit to be leader rotation"), } - // Query now validator to make sure that he has the proper balances in his bank - // after the transitions, even though we submitted "extra_transactions" - // transactions earlier + // Query newly transitioned validator to make sure that he has the proper balances in + // the after the transitions let mut leader_client = mk_client(&leader_info); - let maximum_bal = bootstrap_height - ledger_initial_len; - let bal = leader_client - .poll_get_balance(&validator_id) - .expect("Expected success when polling newly transitioned validator for balance") - as u64; - - assert!(bal <= maximum_bal); + // Leader could have executed transactions in bank but not recorded them, so + // we only have an upper bound on the balance + if let Ok(bal) = leader_client.poll_get_balance(&bob_pubkey) { + assert!(bal <= i - 1); + } // Check the ledger to make sure it's the right height, we should've - // transitioned after the bootstrap_height entry - let (_, entry_height, _) = + // transitioned after tick_height == bootstrap_height + let (_, tick_height, _, _) = Fullnode::new_bank_from_ledger(&leader_ledger_path, &mut LeaderScheduler::default()); - assert_eq!(entry_height, bootstrap_height); + assert_eq!(tick_height, bootstrap_height); // Shut down ncp.close().unwrap(); @@ -927,8 +910,9 @@ fn test_leader_validator_basic() { let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); // Make a common mint and a genesis entry for both leader + validator ledgers + let num_ending_ticks = 1; let (mint, leader_ledger_path, genesis_entries) = - create_tmp_sample_ledger("test_leader_to_validator_transition", 10_000); + create_tmp_sample_ledger("test_leader_validator_basic", 10_000, num_ending_ticks); let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, 
"test_leader_validator_basic"); @@ -945,10 +929,9 @@ fn test_leader_validator_basic() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - let bootstrap_entries = - make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id); - let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64; - ledger_writer.write_entries(bootstrap_entries).unwrap(); + let active_set_entries = + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); + ledger_writer.write_entries(active_set_entries).unwrap(); // Create the leader scheduler config let num_bootstrap_slots = 2; @@ -985,22 +968,21 @@ fn test_leader_validator_basic() { let servers = converge(&leader_info, 2); assert_eq!(servers.len(), 2); - // Send transactions to the leader - let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1); - - // Push "extra_transactions" past leader_rotation_interval entry height, - // make sure the validator stops. - for i in ledger_initial_len..(bootstrap_height + extra_transactions) { - let expected_balance = std::cmp::min( - bootstrap_height - ledger_initial_len, - i - ledger_initial_len + 1, - ); + // Push transactions until we detect the nodes exit + let mut i = 1; + loop { + // Poll to see that the bank state is updated after every transaction + // to ensure that each transaction is packaged as a single entry, + // so that we can be sure leader rotation is triggered let result = send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None); + // If the transaction wasn't reflected in the node, then we assume // the node has transitioned already - if result != Some(expected_balance as i64) { + if result != Some(i as i64) { break; } + + i += 1; } // Wait for validator to shut down tvu and restart tpu @@ -1015,6 +997,16 @@ fn test_leader_validator_basic() { _ => panic!("Expected reason for exit to be leader rotation"), } + // Query newly transitioned validator to make sure that he has the proper balances + // in the bank after the transitions + let mut leader_client = mk_client(&leader_info); + + // Leader could have executed transactions in bank but not recorded them, so + // we only have an upper bound on the balance + if let Ok(bal) = leader_client.poll_get_balance(&bob_pubkey) { + assert!(bal <= i - 1); + } + // Shut down validator.close().unwrap(); leader.close().unwrap(); @@ -1082,8 +1074,9 @@ fn test_dropped_handoff_recovery() { let bootstrap_leader_info = bootstrap_leader_node.info.clone(); // Make a common mint and a genesis entry for both leader + validator's ledgers + let num_ending_ticks = 1; let (mint, bootstrap_leader_ledger_path, genesis_entries) = - create_tmp_sample_ledger("test_dropped_handoff_recovery", 10_000); + create_tmp_sample_ledger("test_dropped_handoff_recovery", 10_000, num_ending_ticks); let last_id = genesis_entries .last() @@ -1101,15 +1094,12 @@ fn test_dropped_handoff_recovery() { // Make the entries to give the next_leader validator some stake so that he will be in // leader election active set - let first_entries = - make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id); - let first_entries_len = first_entries.len(); + let active_set_entries = + make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id, 0); // Write the entries let mut ledger_writer = 
LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - ledger_writer.write_entries(first_entries).unwrap(); - - let ledger_initial_len = (genesis_entries.len() + first_entries_len) as u64; + ledger_writer.write_entries(active_set_entries).unwrap(); let next_leader_ledger_path = tmp_copy_ledger( &bootstrap_leader_ledger_path, @@ -1119,10 +1109,13 @@ fn test_dropped_handoff_recovery() { ledger_paths.push(next_leader_ledger_path.clone()); // Create the common leader scheduling configuration + let initial_tick_height = genesis_entries + .iter() + .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); let num_slots_per_epoch = (N + 1) as u64; let leader_rotation_interval = 5; let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; - let bootstrap_height = ledger_initial_len + 1; + let bootstrap_height = initial_tick_height + 1; let leader_scheduler_config = LeaderSchedulerConfig::new( bootstrap_leader_info.id, Some(bootstrap_height), @@ -1227,8 +1220,12 @@ fn test_full_leader_validator_network() { } // Make a common mint and a genesis entry for both leader + validator's ledgers - let (mint, bootstrap_leader_ledger_path, genesis_entries) = - create_tmp_sample_ledger("test_full_leader_validator_network", 10_000); + let num_ending_ticks = 1; + let (mint, bootstrap_leader_ledger_path, genesis_entries) = create_tmp_sample_ledger( + "test_full_leader_validator_network", + 10_000, + num_ending_ticks, + ); let last_tick_id = genesis_entries .last() @@ -1251,8 +1248,13 @@ fn test_full_leader_validator_network() { for node_keypair in node_keypairs.iter() { // Make entries to give the validator some stake so that he will be in // leader election active set - let bootstrap_entries = - make_active_set_entries(node_keypair, &mint.keypair(), &last_entry_id, &last_tick_id); + let bootstrap_entries = make_active_set_entries( + node_keypair, + &mint.keypair(), + &last_entry_id, + &last_tick_id, + 0, + ); // Write the entries let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
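// Illustrative sketch of the tick-counting fold used repeatedly in these tests
// (the toy Entry below stands in for the crate's Entry type): an entry carrying
// no transactions is a tick, so a ledger's initial tick height is simply the
// number of such entries it contains.
struct Entry {
    transactions: Vec<u8>, // placeholder for Vec<Transaction>
}

impl Entry {
    fn is_tick(&self) -> bool {
        self.transactions.is_empty()
    }
}

fn initial_tick_height(entries: &[Entry]) -> u64 {
    entries
        .iter()
        .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64)
}

fn main() {
    let genesis = vec![
        Entry { transactions: vec![] },  // tick
        Entry { transactions: vec![1] }, // transaction entry
        Entry { transactions: vec![] },  // tick
    ];
    assert_eq!(initial_tick_height(&genesis), 2);
}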