From 0fee854220202b0c90919fe71f9941dd32538dc6 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Fri, 26 Oct 2018 09:38:18 -0700
Subject: [PATCH] Revert "Vote contract (#1552)"

This reverts commit f6c8e1a4bf86c9bf0e981a1b7bbf5fbb2676c80c.
---
 src/bank.rs               | 135 ++++-----
 src/bin/fullnode.rs       |  56 +----
 src/bin/ledger-tool.rs    |   8 +-
 src/budget_instruction.rs |  16 +-
 src/budget_program.rs     |   5 +
 src/budget_transaction.rs |  22 +-
 src/cluster_info.rs       | 117 ++++++++-
 src/drone.rs              |  15 +-
 src/fullnode.rs           | 292 +++++++++--------------
 src/leader_scheduler.rs   | 489 ++++++++++++++------------------
 src/leader_vote_stage.rs  | 159 +++++++++++++
 src/ledger.rs             |  30 ++-
 src/lib.rs                |   3 +-
 src/replicate_stage.rs    |  91 ++++---
 src/replicator.rs         |   4 +-
 src/result.rs             |   7 -
 src/rpc.rs                |  33 +--
 src/rpc_pubsub.rs         |  21 +-
 src/thin_client.rs        |  88 +------
 src/tpu.rs                |  22 +-
 src/tvu.rs                |  26 +-
 src/vote_program.rs       | 151 ------------
 src/vote_stage.rs         | 335 ++++++++++++++++++++++----
 src/vote_transaction.rs   |  85 -------
 src/wallet.rs             |  57 ++---
 src/window.rs             |   4 +-
 tests/multinode.rs        | 112 ++++-----
 27 files changed, 1194 insertions(+), 1189 deletions(-)
 create mode 100644 src/leader_vote_stage.rs
 delete mode 100644 src/vote_program.rs
 delete mode 100644 src/vote_transaction.rs

diff --git a/src/bank.rs b/src/bank.rs
index 6661912c97..db2383a00f 100644
--- a/src/bank.rs
+++ b/src/bank.rs
@@ -7,6 +7,7 @@ use bincode::deserialize;
 use bincode::serialize;
 use bpf_loader;
 use budget_program::BudgetState;
+use budget_transaction::BudgetTransaction;
 use counter::Counter;
 use entry::Entry;
 use hash::{hash, Hash};
@@ -30,7 +31,7 @@ use std;
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::result;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::{Mutex, RwLock};
 use std::time::Instant;
 use storage_program::StorageProgram;
 use system_program::SystemProgram;
@@ -41,7 +42,6 @@ use timing::{duration_as_us, timestamp};
 use token_program::TokenProgram;
 use tokio::prelude::Future;
 use transaction::Transaction;
-use vote_program::VoteProgram;
 use window::WINDOW_SIZE;
 
 /// The number of most recent `last_id` values that the bank will track the signatures
@@ -151,7 +151,7 @@ impl Default for LastIds {
 /// The state of all accounts and contracts after processing its entries.
 pub struct Bank {
     /// A map of account public keys to the balance in that account.
-    pub accounts: RwLock<HashMap<Pubkey, Account>>,
+    accounts: RwLock<HashMap<Pubkey, Account>>,
 
     /// set of accounts which are currently in the pipeline
     account_locks: Mutex<HashSet<Pubkey>>,
@@ -171,13 +171,6 @@ pub struct Bank {
 
     // Mapping of signatures to Subscriber ids and sinks to notify on confirmation
     signature_subscriptions: RwLock<HashMap<Signature, HashMap<Pubkey, Sink<RpcSignatureStatus>>>>,
-
-    /// Tracks and updates the leader schedule based on the votes and account stakes
-    /// processed by the bank
-    pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
-
-    // The number of ticks that have elapsed since genesis
-    tick_height: Mutex<u64>,
 }
 
 impl Default for Bank {
@@ -190,8 +183,6 @@ impl Default for Bank {
             finality_time: AtomicUsize::new(std::usize::MAX),
             account_subscriptions: RwLock::new(HashMap::new()),
             signature_subscriptions: RwLock::new(HashMap::new()),
-            leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
-            tick_height: Mutex::new(0),
         }
     }
 }
@@ -622,8 +613,6 @@ impl Bank {
             {
                 return Err(BankError::ProgramRuntimeError(instruction_index as u8));
             }
-        } else if VoteProgram::check_id(&tx_program_id) {
-            VoteProgram::process_transaction(&tx, instruction_index, program_accounts).is_err();
         } else {
             let mut depth = 0;
             let mut keys = Vec::new();
@@ -901,28 +890,41 @@ impl Bank {
         results
     }
 
-    pub fn process_entry(&self, entry: &Entry) -> Result<()> {
+    pub fn process_entry(
+        &self,
+        entry: &Entry,
+        tick_height: &mut u64,
+        leader_scheduler: &mut LeaderScheduler,
+    ) -> Result<()> {
         if !entry.is_tick() {
             for result in self.process_transactions(&entry.transactions) {
                 result?;
             }
         } else {
-            let tick_height = {
-                let mut tick_height_lock = self.tick_height.lock().unwrap();
-                *tick_height_lock += 1;
-                *tick_height_lock
-            };
-
-            self.leader_scheduler
-                .write()
-                .unwrap()
-                .update_height(tick_height, self);
+            *tick_height += 1;
             self.register_entry_id(&entry.id);
         }
 
+        self.process_entry_votes(entry, *tick_height, leader_scheduler);
         Ok(())
     }
 
+    fn process_entry_votes(
+        &self,
+        entry: &Entry,
+        tick_height: u64,
+        leader_scheduler: &mut LeaderScheduler,
+    ) {
+        for tx in &entry.transactions {
+            if tx.vote().is_some() {
+                // Update the active set in the leader scheduler
+                leader_scheduler.push_vote(*tx.from(), tick_height);
+            }
+        }
+
+        leader_scheduler.update_height(tick_height, self);
+    }
+
     /// Process an ordered list of entries, populating a circular buffer "tail"
     /// as we go.
     fn process_entries_tail(
@@ -930,6 +932,8 @@ impl Bank {
         entries: &[Entry],
         tail: &mut Vec<Entry>,
         tail_idx: &mut usize,
+        tick_height: &mut u64,
+        leader_scheduler: &mut LeaderScheduler,
     ) -> Result<u64> {
         let mut entry_count = 0;
 
@@ -947,7 +951,7 @@ impl Bank {
             // the leader scheduler. Next we will extract the vote tracking structure
             // out of the leader scheduler, and into the bank, and remove the leader
             // scheduler from these banking functions.
-            self.process_entry(entry)?;
+            self.process_entry(entry, tick_height, leader_scheduler)?;
         }
 
         Ok(entry_count)
     }
@@ -992,7 +996,6 @@ impl Bank {
                 // if it's a tick, execute the group and register the tick
                 self.par_execute_entries(&mt_group)?;
                 self.register_entry_id(&entry.id);
-                *self.tick_height.lock().unwrap() += 1;
                 mt_group = vec![];
                 continue;
             }
@@ -1022,18 +1025,17 @@ impl Bank {
         entries: I,
         tail: &mut Vec<Entry>,
         tail_idx: &mut usize,
-    ) -> Result<u64>
+        leader_scheduler: &mut LeaderScheduler,
+    ) -> Result<(u64, u64)>
     where
         I: IntoIterator<Item = Vec<Entry>>,
     {
         // Ledger verification needs to be parallelized, but we can't pull the whole
         // thing into memory. We therefore chunk it.
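A minimal sketch of the calling convention the hunks above restore, following the call shape used by the updated tests later in this diff (the `entries` iterator is assumed): the caller owns the `LeaderScheduler` and threads it, together with a mutable tick height, through replay, instead of reading either one out of the `Bank`:

    let bank = Bank::default();
    let mut leader_scheduler = LeaderScheduler::default();
    // Whole-ledger replay returns the heights instead of storing them in the Bank
    let (tick_height, entry_height, tail) = bank
        .process_ledger(entries, &mut leader_scheduler)
        .expect("process_ledger");
    // Per-entry replay takes the same two pieces of state by &mut
    // bank.process_entry(&entry, &mut tick_height, &mut leader_scheduler)?;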
let mut entry_height = *tail_idx as u64; - + let mut tick_height = 0; for entry in &tail[0..*tail_idx] { - if entry.is_tick() { - *self.tick_height.lock().unwrap() += 1; - } + tick_height += entry.is_tick() as u64 } let mut id = start_hash; @@ -1044,15 +1046,25 @@ impl Bank { return Err(BankError::LedgerVerificationFailed); } id = block.last().unwrap().id; - let entry_count = self.process_entries_tail(&block, tail, tail_idx)?; + let entry_count = self.process_entries_tail( + &block, + tail, + tail_idx, + &mut tick_height, + leader_scheduler, + )?; entry_height += entry_count; } - Ok(entry_height) + Ok((tick_height, entry_height)) } /// Process a full ledger. - pub fn process_ledger(&self, entries: I) -> Result<(u64, u64, Vec)> + pub fn process_ledger( + &self, + entries: I, + leader_scheduler: &mut LeaderScheduler, + ) -> Result<(u64, u64, Vec)> where I: IntoIterator, { @@ -1094,14 +1106,20 @@ impl Bank { tail.push(entry0); tail.push(entry1); let mut tail_idx = 2; - let entry_height = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?; + let (tick_height, entry_height) = self.process_blocks( + entry1_id, + entries, + &mut tail, + &mut tail_idx, + leader_scheduler, + )?; // check if we need to rotate tail if tail.len() == WINDOW_SIZE as usize { tail.rotate_left(tail_idx) } - Ok((*self.tick_height.lock().unwrap(), entry_height, tail)) + Ok((tick_height, entry_height, tail)) } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -1218,16 +1236,6 @@ impl Bank { subscriptions.remove(pubkey).is_some() } - pub fn get_current_leader(&self) -> Option { - let ls_lock = self.leader_scheduler.read().unwrap(); - let tick_height = self.tick_height.lock().unwrap(); - ls_lock.get_scheduled_leader(*tick_height) - } - - pub fn get_tick_height(&self) -> u64 { - *self.tick_height.lock().unwrap() - } - fn check_account_subscriptions(&self, pubkey: &Pubkey, account: &Account) { let subscriptions = self.account_subscriptions.read().unwrap(); if let Some(hashmap) = subscriptions.get(pubkey) { @@ -1280,6 +1288,13 @@ impl Bank { } subscriptions.remove(&signature); } + + #[cfg(test)] + // Used to access accounts for things like controlling stake to control + // the eligible set of nodes for leader selection + pub fn accounts(&self) -> &RwLock> { + &self.accounts + } } #[cfg(test)] @@ -1292,6 +1307,7 @@ mod tests { use entry_writer::{self, EntryWriter}; use hash::hash; use jsonrpc_macros::pubsub::{Subscriber, SubscriptionId}; + use leader_scheduler::LeaderScheduler; use ledger; use logger; use signature::Keypair; @@ -1624,7 +1640,8 @@ mod tests { let mint = Mint::new(1); let genesis = mint.create_entries(); let bank = Bank::default(); - bank.process_ledger(genesis).unwrap(); + bank.process_ledger(genesis, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&mint.pubkey()), 1); } @@ -1701,7 +1718,9 @@ mod tests { let (ledger, pubkey) = create_sample_ledger(1); let (ledger, dup) = ledger.tee(); let bank = Bank::default(); - let (tick_height, ledger_height, tail) = bank.process_ledger(ledger).unwrap(); + let (tick_height, ledger_height, tail) = bank + .process_ledger(ledger, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, 4); assert_eq!(tick_height, 2); @@ -1727,7 +1746,9 @@ mod tests { for entry_count in window_size - 3..window_size + 2 { let (ledger, pubkey) = create_sample_ledger(entry_count); let bank = Bank::default(); - let (tick_height, ledger_height, tail) = 
bank.process_ledger(ledger).unwrap(); + let (tick_height, ledger_height, tail) = bank + .process_ledger(ledger, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, entry_count as u64 + 3); assert_eq!(tick_height, 2); @@ -1753,7 +1774,8 @@ mod tests { let ledger = to_file_iter(ledger); let bank = Bank::default(); - bank.process_ledger(ledger).unwrap(); + bank.process_ledger(ledger, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); } @@ -1764,7 +1786,8 @@ mod tests { let block = to_file_iter(create_sample_block_with_ticks(&mint, 1, 1)); let bank = Bank::default(); - bank.process_ledger(genesis.chain(block)).unwrap(); + bank.process_ledger(genesis.chain(block), &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&mint.pubkey()), 1); } @@ -1778,9 +1801,13 @@ mod tests { let ledger1 = create_sample_ledger_with_mint_and_keypairs(&mint, &keypairs); let bank0 = Bank::default(); - bank0.process_ledger(ledger0).unwrap(); + bank0 + .process_ledger(ledger0, &mut LeaderScheduler::default()) + .unwrap(); let bank1 = Bank::default(); - bank1.process_ledger(ledger1).unwrap(); + bank1 + .process_ledger(ledger1, &mut LeaderScheduler::default()) + .unwrap(); let initial_state = bank0.hash_internal_state(); diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 9e69a186d3..b9427ef7c2 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -17,16 +17,14 @@ use solana::logger; use solana::metrics::set_panic_hook; use solana::signature::{Keypair, KeypairUtil}; use solana::thin_client::poll_gossip_for_leader; -use solana::vote_program::VoteProgram; use solana::wallet::request_airdrop; use std::fs::File; use std::net::{Ipv4Addr, SocketAddr}; use std::process::exit; -use std::sync::Arc; use std::thread::sleep; use std::time::Duration; -fn main() { +fn main() -> () { logger::setup(); set_panic_hook("fullnode"); let matches = App::new("fullnode") @@ -84,6 +82,7 @@ fn main() { // save off some stuff for airdrop let node_info = node.info.clone(); + let pubkey = keypair.pubkey(); let leader = match network { Some(network) => { @@ -92,16 +91,10 @@ fn main() { None => node_info, }; - let vote_account_keypair = Arc::new(Keypair::new()); - let vote_account_id = vote_account_keypair.pubkey(); - let keypair = Arc::new(keypair); - let pubkey = keypair.pubkey(); - let mut fullnode = Fullnode::new( node, ledger_path, - keypair.clone(), - vote_account_keypair, + keypair, network, false, LeaderScheduler::from_bootstrap_leader(leader.id), @@ -136,49 +129,6 @@ fn main() { } } - // Create the vote account - loop { - let last_id = client.get_last_id(); - if client - .create_vote_account(&keypair, vote_account_id, &last_id, 1) - .is_err() - { - sleep(Duration::from_secs(2)); - continue; - } - - let balance = client.poll_get_balance(&vote_account_id).unwrap_or(0); - - if balance > 0 { - break; - } - - sleep(Duration::from_secs(2)); - } - - // Register the vote account to this node - loop { - let last_id = client.get_last_id(); - if client - .register_vote_account(&keypair, vote_account_id, &last_id) - .is_err() - { - sleep(Duration::from_secs(2)); - continue; - } - - let account_user_data = client.get_account_userdata(&vote_account_id); - if let Ok(Some(account_user_data)) = account_user_data { - if let Ok(vote_state) = VoteProgram::deserialize(&account_user_data) { - if vote_state.node_id == pubkey { - break; - } - } - } - - sleep(Duration::from_secs(2)); - } - loop { let status = 
fullnode.handle_role_transition(); match status { diff --git a/src/bin/ledger-tool.rs b/src/bin/ledger-tool.rs index dfe9c20222..c8fbcacebb 100644 --- a/src/bin/ledger-tool.rs +++ b/src/bin/ledger-tool.rs @@ -5,6 +5,7 @@ extern crate solana; use clap::{App, Arg, SubCommand}; use solana::bank::Bank; +use solana::leader_scheduler::LeaderScheduler; use solana::ledger::{read_ledger, verify_ledger}; use solana::logger; use std::io::{stdout, Write}; @@ -115,7 +116,7 @@ fn main() { }; let genesis = genesis.take(2).map(|e| e.unwrap()); - if let Err(e) = bank.process_ledger(genesis) { + if let Err(e) = bank.process_ledger(genesis, &mut LeaderScheduler::default()) { eprintln!("verify failed at genesis err: {:?}", e); if !matches.is_present("continue") { exit(1); @@ -141,7 +142,10 @@ fn main() { } last_id = entry.id; - if let Err(e) = bank.process_entry(&entry) { + let mut tick_height = 0; + let mut leader_scheduler = LeaderScheduler::default(); + if let Err(e) = bank.process_entry(&entry, &mut tick_height, &mut leader_scheduler) + { eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e); if !matches.is_present("continue") { exit(1); diff --git a/src/budget_instruction.rs b/src/budget_instruction.rs index c7e83a2cff..bf8b575359 100644 --- a/src/budget_instruction.rs +++ b/src/budget_instruction.rs @@ -1,12 +1,15 @@ use budget::Budget; use chrono::prelude::{DateTime, Utc}; -/// A smart contract. #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct Contract { - /// The number of tokens allocated to the `Budget` and any transaction fees. - pub tokens: i64, - pub budget: Budget, +pub struct Vote { + /// We send some gossip specific membership information through the vote to shortcut + /// liveness voting + /// The version of the ClusterInfo struct that the last_id of this network voted with + pub version: u64, + /// The version of the ClusterInfo struct that has the same network configuration as this one + pub contact_info_version: u64, + // TODO: add signature of the state here as well } /// An instruction to progress the smart contract. @@ -21,4 +24,7 @@ pub enum Instruction { /// Tell the budget that the `NewBudget` with `Signature` has been /// signed by the containing transaction's `Pubkey`. ApplySignature, + + /// Vote for a PoH that is equal to the lastid of this transaction + NewVote(Vote), } diff --git a/src/budget_program.rs b/src/budget_program.rs index 51c89ff4dd..d6a2827bc2 100644 --- a/src/budget_program.rs +++ b/src/budget_program.rs @@ -172,6 +172,11 @@ impl BudgetState { Err(BudgetError::UninitializedContract) } } + Instruction::NewVote(_vote) => { + // TODO: move vote instruction into a different contract + trace!("GOT VOTE! 
last_id={}", tx.last_id); + Ok(()) + } } } fn serialize(&self, output: &mut [u8]) -> Result<(), BudgetError> { diff --git a/src/budget_transaction.rs b/src/budget_transaction.rs index 06c8d017ef..87e2a293d6 100644 --- a/src/budget_transaction.rs +++ b/src/budget_transaction.rs @@ -2,7 +2,7 @@ use bincode::{deserialize, serialize}; use budget::{Budget, Condition}; -use budget_instruction::Instruction; +use budget_instruction::{Instruction, Vote}; use budget_program::BudgetState; use chrono::prelude::*; use hash::Hash; @@ -38,6 +38,8 @@ pub trait BudgetTransaction { last_id: Hash, ) -> Self; + fn budget_new_vote(from_keypair: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self; + fn budget_new_on_date( from_keypair: &Keypair, to: Pubkey, @@ -59,6 +61,8 @@ pub trait BudgetTransaction { last_id: Hash, ) -> Self; + fn vote(&self) -> Option<(Pubkey, Vote, Hash)>; + fn instruction(&self, program_index: usize) -> Option; fn system_instruction(&self, program_index: usize) -> Option; @@ -149,6 +153,12 @@ impl BudgetTransaction for Transaction { ) } + fn budget_new_vote(from_keypair: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self { + let instruction = Instruction::NewVote(vote); + let userdata = serialize(&instruction).expect("serialize instruction"); + Self::new(from_keypair, &[], BudgetState::id(), userdata, last_id, fee) + } + /// Create and sign a postdated Transaction. Used for unit-testing. fn budget_new_on_date( from_keypair: &Keypair, @@ -209,6 +219,16 @@ impl BudgetTransaction for Transaction { ) } + fn vote(&self) -> Option<(Pubkey, Vote, Hash)> { + if self.instructions.len() > 1 { + None + } else if let Some(Instruction::NewVote(vote)) = self.instruction(0) { + Some((self.account_keys[0], vote, self.last_id)) + } else { + None + } + } + fn instruction(&self, instruction_index: usize) -> Option { deserialize(&self.userdata(instruction_index)).ok() } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index b3734beee8..633066d6d8 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -13,6 +13,7 @@ //! //! Bank needs to provide an interface for us to query the stake weight use bincode::{deserialize, serialize, serialized_size}; +use budget_instruction::Vote; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use counter::Counter; use hash::Hash; @@ -337,6 +338,47 @@ impl ClusterInfo { self.external_liveness.get(key) } + pub fn insert_vote(&mut self, pubkey: &Pubkey, v: &Vote, last_id: Hash) { + if self.table.get(pubkey).is_none() { + warn!("{}: VOTE for unknown id: {}", self.id, pubkey); + return; + } + if v.contact_info_version > self.table[pubkey].contact_info.version { + warn!( + "{}: VOTE for new address version from: {} ours: {} vote: {:?}", + self.id, pubkey, self.table[pubkey].contact_info.version, v, + ); + return; + } + if *pubkey == self.my_data().leader_id { + info!("{}: LEADER_VOTED! {}", self.id, pubkey); + inc_new_counter_info!("cluster_info-insert_vote-leader_voted", 1); + } + + if v.version <= self.table[pubkey].version { + debug!("{}: VOTE for old version: {}", self.id, pubkey); + self.update_liveness(*pubkey); + return; + } else { + let mut data = self.table[pubkey].clone(); + data.version = v.version; + data.ledger_state.last_id = last_id; + + debug!("{}: INSERTING VOTE! 
for {}", self.id, data.id); + self.update_liveness(data.id); + self.insert(&data); + } + } + pub fn insert_votes(&mut self, votes: &[(Pubkey, Vote, Hash)]) { + inc_new_counter_info!("cluster_info-vote-count", votes.len()); + if !votes.is_empty() { + info!("{}: INSERTING VOTES {}", self.id, votes.len()); + } + for v in votes { + self.insert_vote(&v.0, &v.1, v.2); + } + } + pub fn insert(&mut self, v: &NodeInfo) -> usize { // TODO check that last_verified types are always increasing // update the peer table @@ -497,7 +539,7 @@ impl ClusterInfo { ); // Make sure the next leader in line knows about the entries before his slot in the leader - // rotation so they can initiate repairs if necessary + // rotation so he can initiate repairs if necessary { let ls_lock = leader_scheduler.read().unwrap(); let next_leader_height = ls_lock.max_height_for_leader(tick_height); @@ -782,6 +824,22 @@ impl ClusterInfo { Ok((v.contact_info.ncp, req)) } + pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> { + let mut me = self.my_data().clone(); + let leader = self + .leader_data() + .ok_or(ClusterInfoError::NoLeader)? + .clone(); + me.version += 1; + me.ledger_state.last_id = last_id; + let vote = Vote { + version: me.version, + contact_info_version: me.contact_info.version, + }; + self.insert(&me); + Ok((vote, leader.contact_info.tpu)) + } + /// At random pick a node and try to get updated changes from them fn run_gossip(obj: &Arc>, blob_sender: &BlobSender) -> Result<()> { //TODO we need to keep track of stakes and weight the selection by stake size @@ -1330,6 +1388,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { #[cfg(test)] mod tests { use bincode::serialize; + use budget_instruction::Vote; use cluster_info::{ ClusterInfo, ClusterInfoError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, @@ -1377,6 +1436,62 @@ mod tests { assert_eq!(cluster_info.table[&d.id].version, 3); assert!(liveness < cluster_info.alive[&d.id]); } + #[test] + fn test_new_vote() { + let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); + assert_eq!(d.version, 0); + let mut cluster_info = ClusterInfo::new(d.clone()).unwrap(); + assert_eq!(cluster_info.table[&d.id].version, 0); + let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1235")); + assert_ne!(d.id, leader.id); + assert_matches!( + cluster_info.new_vote(Hash::default()).err(), + Some(Error::ClusterInfoError(ClusterInfoError::NoLeader)) + ); + cluster_info.insert(&leader); + assert_matches!( + cluster_info.new_vote(Hash::default()).err(), + Some(Error::ClusterInfoError(ClusterInfoError::NoLeader)) + ); + cluster_info.set_leader(leader.id); + assert_eq!(cluster_info.table[&d.id].version, 1); + let v = Vote { + version: 2, //version should increase when we vote + contact_info_version: 0, + }; + let expected = (v, cluster_info.table[&leader.id].contact_info.tpu); + assert_eq!(cluster_info.new_vote(Hash::default()).unwrap(), expected); + } + + #[test] + fn test_insert_vote() { + let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); + assert_eq!(d.version, 0); + let mut cluster_info = ClusterInfo::new(d.clone()).unwrap(); + assert_eq!(cluster_info.table[&d.id].version, 0); + let vote_same_version = Vote { + version: d.version, + contact_info_version: 0, + }; + cluster_info.insert_vote(&d.id, &vote_same_version, Hash::default()); + assert_eq!(cluster_info.table[&d.id].version, 0); + + let vote_new_version_new_addrs = Vote { + version: 
d.version + 1, + contact_info_version: 1, + }; + cluster_info.insert_vote(&d.id, &vote_new_version_new_addrs, Hash::default()); + //should be dropped since the address is newer then we know + assert_eq!(cluster_info.table[&d.id].version, 0); + + let vote_new_version_old_addrs = Vote { + version: d.version + 1, + contact_info_version: 0, + }; + cluster_info.insert_vote(&d.id, &vote_new_version_old_addrs, Hash::default()); + //should be accepted, since the update is for the same address field as the one we know + assert_eq!(cluster_info.table[&d.id].version, 1); + } fn sorted(ls: &Vec<(NodeInfo, u64)>) -> Vec<(NodeInfo, u64)> { let mut copy: Vec<_> = ls.iter().cloned().collect(); copy.sort_by(|x, y| x.0.id.cmp(&y.0.id)); diff --git a/src/drone.rs b/src/drone.rs index 3cd46b7901..311298508d 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -235,7 +235,6 @@ mod tests { use signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; use std::net::{SocketAddr, UdpSocket}; - use std::sync::{Arc, RwLock}; use std::time::Duration; use thin_client::ThinClient; @@ -314,24 +313,18 @@ mod tests { const TPS_BATCH: i64 = 5_000_000; logger::setup(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000_000); - let mut bank = Bank::new(&alice); - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader.info.id, - ))); - bank.leader_scheduler = leader_scheduler; + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let carlos_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = get_tmp_ledger_path("send_airdrop"); - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, 0, @@ -340,6 +333,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); @@ -374,14 +368,13 @@ mod tests { // restart the leader, drone should find the new one at the same gossip port server.close().unwrap(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let server = Fullnode::new( leader, &ledger_path, leader_keypair, - Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_data.id), diff --git a/src/fullnode.rs b/src/fullnode.rs index 142ff80de6..3b15b3a4ba 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -85,12 +85,12 @@ pub enum FullnodeReturnType { pub struct Fullnode { pub node_role: Option, + pub leader_scheduler: Arc>, keypair: Arc, - vote_account_keypair: Arc, exit: Arc, rpu: Option, - rpc_service: Option, - rpc_pubsub_service: Option, + rpc_service: JsonRpcService, + rpc_pubsub_service: PubSubService, ncp: Ncp, bank: Arc, cluster_info: Arc>, @@ -104,7 +104,6 @@ pub struct Fullnode { broadcast_socket: UdpSocket, requests_socket: UdpSocket, respond_socket: UdpSocket, - rpc_port: Option, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -133,17 +132,14 @@ impl Fullnode { pub fn new( node: Node, ledger_path: &str, - keypair: Arc, - vote_account_keypair: Arc, + keypair: Keypair, leader_addr: Option, sigverify_disabled: bool, - leader_scheduler: LeaderScheduler, + mut leader_scheduler: LeaderScheduler, ) -> Self { - let leader_scheduler = 
Arc::new(RwLock::new(leader_scheduler)); - info!("creating bank..."); let (bank, tick_height, entry_height, ledger_tail) = - Self::new_bank_from_ledger(ledger_path, leader_scheduler); + Self::new_bank_from_ledger(ledger_path, &mut leader_scheduler); info!("creating networking stack..."); let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); @@ -158,7 +154,6 @@ impl Fullnode { let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i)); let server = Self::new_with_bank( keypair, - vote_account_keypair, bank, tick_height, entry_height, @@ -167,6 +162,7 @@ impl Fullnode { leader_info.as_ref(), ledger_path, sigverify_disabled, + leader_scheduler, None, ); @@ -240,8 +236,7 @@ impl Fullnode { /// ``` #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new_with_bank( - keypair: Arc, - vote_account_keypair: Arc, + keypair: Keypair, bank: Bank, tick_height: u64, entry_height: u64, @@ -250,6 +245,7 @@ impl Fullnode { bootstrap_leader_info_option: Option<&NodeInfo>, ledger_path: &str, sigverify_disabled: bool, + leader_scheduler: LeaderScheduler, rpc_port: Option, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); @@ -278,8 +274,21 @@ impl Fullnode { ClusterInfo::new(node.info).expect("ClusterInfo::new"), )); - let (rpc_service, rpc_pubsub_service) = - Self::startup_rpc_services(rpc_port, &bank, &cluster_info); + // Use custom RPC port, if provided (`Some(port)`) + // RPC port may be any open port on the node + // If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module + // If rpc_port == `Some(0)`, node will dynamically choose any open port for both + // Rpc and RpcPubsub serivces. Useful for tests. + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT)); + // TODO: The RPC service assumes that there is a drone running on the leader + // Drone location/id will need to be handled a different way as soon as leader rotation begins + let rpc_service = JsonRpcService::new(&bank, &cluster_info, rpc_addr, exit.clone()); + + let rpc_pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::from(0)), + rpc_port.map_or(RPC_PORT + 1, |port| if port == 0 { port } else { port + 1 }), + ); + let rpc_pubsub_service = PubSubService::new(&bank, rpc_pubsub_addr, exit.clone()); let ncp = Ncp::new( &cluster_info, @@ -289,6 +298,9 @@ impl Fullnode { exit.clone(), ); + let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); + let keypair = Arc::new(keypair); + // Insert the bootstrap leader info, should only be None if this node // is the bootstrap leader if let Some(bootstrap_leader_info) = bootstrap_leader_info_option { @@ -296,8 +308,10 @@ impl Fullnode { } // Get the scheduled leader - let scheduled_leader = bank - .get_current_leader() + let scheduled_leader = leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(tick_height) .expect("Leader not known after processing bank"); cluster_info.write().unwrap().set_leader(scheduled_leader); @@ -305,8 +319,8 @@ impl Fullnode { // Start in validator mode. 
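The port-selection comment above compresses three cases into one `map_or`; a standalone sketch of the rule (hypothetical helper, assuming this codebase's default `RPC_PORT` of 8899 from the rpc module):

    fn rpc_pubsub_port(rpc_port: Option<u16>) -> u16 {
        const RPC_PORT: u16 = 8899; // default RPC port constant
        rpc_port.map_or(RPC_PORT + 1, |port| if port == 0 { port } else { port + 1 })
    }

    assert_eq!(rpc_pubsub_port(None), 8900);       // default: one above RPC_PORT
    assert_eq!(rpc_pubsub_port(Some(0)), 0);       // tests: OS assigns both ports
    assert_eq!(rpc_pubsub_port(Some(9000)), 9001); // custom: pubsub one above RPC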
let tvu = Tvu::new( keypair.clone(), - vote_account_keypair.clone(), &bank, + tick_height, entry_height, cluster_info.clone(), shared_window.clone(), @@ -324,17 +338,20 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(ledger_path), + leader_scheduler.clone(), ); let validator_state = ValidatorServices::new(tvu); Some(NodeRole::Validator(validator_state)) } else { let max_tick_height = { - let ls_lock = bank.leader_scheduler.read().unwrap(); + let ls_lock = leader_scheduler.read().unwrap(); ls_lock.max_height_for_leader(tick_height) }; // Start in leader mode. let (tpu, entry_receiver, tpu_exit) = Tpu::new( + keypair.clone(), &bank, + &cluster_info, Default::default(), node.sockets .transaction @@ -357,7 +374,7 @@ impl Fullnode { shared_window.clone(), entry_height, entry_receiver, - bank.leader_scheduler.clone(), + leader_scheduler.clone(), tick_height, tpu_exit, ); @@ -367,15 +384,14 @@ impl Fullnode { Fullnode { keypair, - vote_account_keypair, cluster_info, shared_window, bank, sigverify_disabled, rpu, ncp, - rpc_service: Some(rpc_service), - rpc_pubsub_service: Some(rpc_pubsub_service), + rpc_service, + rpc_pubsub_service, node_role, ledger_path: ledger_path.to_owned(), exit, @@ -386,50 +402,27 @@ impl Fullnode { broadcast_socket: node.sockets.broadcast, requests_socket: node.sockets.requests, respond_socket: node.sockets.respond, - rpc_port, + leader_scheduler, } } fn leader_to_validator(&mut self) -> Result<()> { - // Close down any services that could have a reference to the bank - if self.rpu.is_some() { - let old_rpu = self.rpu.take().unwrap(); - old_rpu.close()?; - } + let (scheduled_leader, tick_height, entry_height, last_entry_id) = { + let mut ls_lock = self.leader_scheduler.write().unwrap(); + // Clear the leader scheduler + ls_lock.reset(); - if self.rpc_service.is_some() { - let old_rpc_service = self.rpc_service.take().unwrap(); - old_rpc_service.close()?; - } - - if self.rpc_pubsub_service.is_some() { - let old_rpc_pubsub_service = self.rpc_pubsub_service.take().unwrap(); - old_rpc_pubsub_service.close()?; - } - - // Correctness check: Ensure that references to the bank and leader scheduler are no - // longer held by any running thread - let mut new_leader_scheduler = self.bank.leader_scheduler.read().unwrap().clone(); - - // Clear the leader scheduler - new_leader_scheduler.reset(); - - let (new_bank, scheduled_leader, tick_height, entry_height, last_entry_id) = { // TODO: We can avoid building the bank again once RecordStage is // integrated with BankingStage - let (new_bank, tick_height, entry_height, ledger_tail) = Self::new_bank_from_ledger( - &self.ledger_path, - Arc::new(RwLock::new(new_leader_scheduler)), - ); + let (bank, tick_height, entry_height, ledger_tail) = + Self::new_bank_from_ledger(&self.ledger_path, &mut *ls_lock); - let new_bank = Arc::new(new_bank); - let scheduled_leader = new_bank - .get_current_leader() - .expect("Scheduled leader should exist after rebuilding bank"); + self.bank = Arc::new(bank); ( - new_bank, - scheduled_leader, + ls_lock + .get_scheduled_leader(entry_height) + .expect("Scheduled leader should exist after rebuilding bank"), tick_height, entry_height, ledger_tail @@ -444,23 +437,21 @@ impl Fullnode { .unwrap() .set_leader(scheduled_leader); - // Spin up new versions of all the services that relied on the bank, passing in the - // new bank - self.rpu = Some(Rpu::new( - &new_bank, - self.requests_socket - .try_clone() - .expect("Failed to clone requests socket"), - self.respond_socket - 
.try_clone() - .expect("Failed to clone respond socket"), - )); - - let (rpc_service, rpc_pubsub_service) = - Self::startup_rpc_services(self.rpc_port, &new_bank, &self.cluster_info); - self.rpc_service = Some(rpc_service); - self.rpc_pubsub_service = Some(rpc_pubsub_service); - self.bank = new_bank; + // Make a new RPU to serve requests out of the new bank we've created + // instead of the old one + if self.rpu.is_some() { + let old_rpu = self.rpu.take().unwrap(); + old_rpu.close()?; + self.rpu = Some(Rpu::new( + &self.bank, + self.requests_socket + .try_clone() + .expect("Failed to clone requests socket"), + self.respond_socket + .try_clone() + .expect("Failed to clone respond socket"), + )); + } // In the rare case that the leader exited on a multiple of seed_rotation_interval // when the new leader schedule was being generated, and there are no other validators @@ -468,31 +459,32 @@ impl Fullnode { // check for that if scheduled_leader == self.keypair.pubkey() { self.validator_to_leader(tick_height, entry_height, last_entry_id); - Ok(()) - } else { - let tvu = Tvu::new( - self.keypair.clone(), - self.vote_account_keypair.clone(), - &self.bank, - entry_height, - self.cluster_info.clone(), - self.shared_window.clone(), - self.replicate_socket - .iter() - .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) - .collect(), - self.repair_socket - .try_clone() - .expect("Failed to clone repair socket"), - self.retransmit_socket - .try_clone() - .expect("Failed to clone retransmit socket"), - Some(&self.ledger_path), - ); - let validator_state = ValidatorServices::new(tvu); - self.node_role = Some(NodeRole::Validator(validator_state)); - Ok(()) + return Ok(()); } + + let tvu = Tvu::new( + self.keypair.clone(), + &self.bank, + tick_height, + entry_height, + self.cluster_info.clone(), + self.shared_window.clone(), + self.replicate_socket + .iter() + .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) + .collect(), + self.repair_socket + .try_clone() + .expect("Failed to clone repair socket"), + self.retransmit_socket + .try_clone() + .expect("Failed to clone retransmit socket"), + Some(&self.ledger_path), + self.leader_scheduler.clone(), + ); + let validator_state = ValidatorServices::new(tvu); + self.node_role = Some(NodeRole::Validator(validator_state)); + Ok(()) } fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) { @@ -502,12 +494,14 @@ impl Fullnode { .set_leader(self.keypair.pubkey()); let max_tick_height = { - let ls_lock = self.bank.leader_scheduler.read().unwrap(); + let ls_lock = self.leader_scheduler.read().unwrap(); ls_lock.max_height_for_leader(tick_height) }; let (tpu, blob_receiver, tpu_exit) = Tpu::new( + self.keypair.clone(), &self.bank, + &self.cluster_info, Default::default(), self.transaction_sockets .iter() @@ -531,7 +525,7 @@ impl Fullnode { self.shared_window.clone(), entry_height, blob_receiver, - self.bank.leader_scheduler.clone(), + self.leader_scheduler.clone(), tick_height, tpu_exit, ); @@ -575,12 +569,6 @@ impl Fullnode { if let Some(ref rpu) = self.rpu { rpu.exit(); } - if let Some(ref rpc_service) = self.rpc_service { - rpc_service.exit(); - } - if let Some(ref rpc_pubsub_service) = self.rpc_pubsub_service { - rpc_pubsub_service.exit(); - } match self.node_role { Some(NodeRole::Leader(ref leader_services)) => leader_services.exit(), Some(NodeRole::Validator(ref validator_services)) => validator_services.exit(), @@ -595,50 +583,21 @@ impl Fullnode { pub fn new_bank_from_ledger( ledger_path: 
&str, - leader_scheduler: Arc>, + leader_scheduler: &mut LeaderScheduler, ) -> (Bank, u64, u64, Vec) { - let mut bank = Bank::new_with_builtin_programs(); - bank.leader_scheduler = leader_scheduler; + let bank = Bank::new_with_builtin_programs(); let entries = read_ledger(ledger_path, true).expect("opening ledger"); let entries = entries .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); info!("processing ledger..."); - let (tick_height, entry_height, ledger_tail) = - bank.process_ledger(entries).expect("process_ledger"); + let (tick_height, entry_height, ledger_tail) = bank + .process_ledger(entries, leader_scheduler) + .expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); (bank, tick_height, entry_height, ledger_tail) } - - pub fn get_leader_scheduler(&self) -> &Arc> { - &self.bank.leader_scheduler - } - - fn startup_rpc_services( - rpc_port: Option, - bank: &Arc, - cluster_info: &Arc>, - ) -> (JsonRpcService, PubSubService) { - // Use custom RPC port, if provided (`Some(port)`) - // RPC port may be any open port on the node - // If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module - // If rpc_port == `Some(0)`, node will dynamically choose any open port for both - // Rpc and RpcPubsub serivces. Useful for tests. - - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT)); - let rpc_pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::from(0)), - rpc_port.map_or(RPC_PORT + 1, |port| if port == 0 { port } else { port + 1 }), - ); - - // TODO: The RPC service assumes that there is a drone running on the leader - // Drone location/id will need to be handled a different way as soon as leader rotation begins - ( - JsonRpcService::new(bank, cluster_info, rpc_addr), - PubSubService::new(bank, rpc_pubsub_addr), - ) - } } impl Service for Fullnode { @@ -648,14 +607,9 @@ impl Service for Fullnode { if let Some(rpu) = self.rpu { rpu.join()?; } - if let Some(rpc_service) = self.rpc_service { - rpc_service.join()?; - } - if let Some(rpc_pubsub_service) = self.rpc_pubsub_service { - rpc_pubsub_service.join()?; - } - self.ncp.join()?; + self.rpc_service.join()?; + self.rpc_pubsub_service.join()?; match self.node_role { Some(NodeRole::Validator(validator_service)) => { @@ -689,7 +643,7 @@ mod tests { use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; use streamer::responder; #[test] @@ -697,19 +651,13 @@ mod tests { let keypair = Keypair::new(); let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let (mint, validator_ledger_path) = create_tmp_genesis("validator_exit", 10_000); - let mut bank = Bank::new(&mint); + let bank = Bank::new(&mint); let entry = tn.info.clone(); let genesis_entries = &mint.create_entries(); let entry_height = genesis_entries.len() as u64; - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - entry.id, - ))); - bank.leader_scheduler = leader_scheduler; - let v = Fullnode::new_with_bank( - Arc::new(keypair), - Arc::new(Keypair::new()), + keypair, bank, 0, entry_height, @@ -718,6 +666,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, + LeaderScheduler::from_bootstrap_leader(entry.id), Some(0), ); v.close().unwrap(); @@ -734,20 +683,13 @@ mod tests { let (mint, validator_ledger_path) = 
create_tmp_genesis(&format!("validator_parallel_exit_{}", i), 10_000); ledger_paths.push(validator_ledger_path.clone()); - let mut bank = Bank::new(&mint); + let bank = Bank::new(&mint); let entry = tn.info.clone(); let genesis_entries = &mint.create_entries(); let entry_height = genesis_entries.len() as u64; - - let leader_scheduler = Arc::new(RwLock::new( - LeaderScheduler::from_bootstrap_leader(entry.id), - )); - bank.leader_scheduler = leader_scheduler; - Fullnode::new_with_bank( - Arc::new(keypair), - Arc::new(Keypair::new()), + keypair, bank, 0, entry_height, @@ -756,6 +698,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, + LeaderScheduler::from_bootstrap_leader(entry.id), Some(0), ) }).collect(); @@ -814,8 +757,7 @@ mod tests { let mut bootstrap_leader = Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, - Arc::new(bootstrap_leader_keypair), - Arc::new(Keypair::new()), + bootstrap_leader_keypair, Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -841,7 +783,7 @@ mod tests { #[test] fn test_wrong_role_transition() { // Create the leader node information - let bootstrap_leader_keypair = Arc::new(Keypair::new()); + let bootstrap_leader_keypair = Keypair::new(); let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); let bootstrap_leader_info = bootstrap_leader_node.info.clone(); @@ -863,7 +805,7 @@ mod tests { // Write the entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - let (active_set_entries, validator_vote_account_keypair) = make_active_set_entries( + let active_set_entries = make_active_set_entries( &validator_keypair, &mint.keypair(), &last_id, @@ -895,12 +837,10 @@ mod tests { ); // Test that a node knows to transition to a validator based on parsing the ledger - let leader_vote_account_keypair = Arc::new(Keypair::new()); let bootstrap_leader = Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, bootstrap_leader_keypair, - leader_vote_account_keypair, Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -917,8 +857,7 @@ mod tests { let validator = Fullnode::new( validator_node, &bootstrap_leader_ledger_path, - Arc::new(validator_keypair), - Arc::new(validator_vote_account_keypair), + validator_keypair, Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -966,7 +905,7 @@ mod tests { // // 2) A vote from the validator let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap(); - let (active_set_entries, validator_vote_account_keypair) = + let active_set_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); let initial_tick_height = genesis_entries .iter() @@ -994,8 +933,7 @@ mod tests { let mut validator = Fullnode::new( validator_node, &validator_ledger_path, - Arc::new(validator_keypair), - Arc::new(validator_vote_account_keypair), + validator_keypair, Some(leader_ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1055,7 +993,7 @@ mod tests { // transitioned after tick_height = bootstrap_height. 
         let (_, tick_height, entry_height, _) = Fullnode::new_bank_from_ledger(
             &validator_ledger_path,
             &mut LeaderScheduler::new(&leader_scheduler_config),
         );
 
         assert_eq!(tick_height, bootstrap_height);
diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs
index 19b02b01d6..58f93a5f24 100644
--- a/src/leader_scheduler.rs
+++ b/src/leader_scheduler.rs
@@ -4,24 +4,82 @@
 use bank::Bank;
 use bincode::serialize;
+use budget_instruction::Vote;
+use budget_transaction::BudgetTransaction;
 use byteorder::{LittleEndian, ReadBytesExt};
 use entry::Entry;
 use hash::{hash, Hash};
 use ledger::create_ticks;
 use signature::{Keypair, KeypairUtil};
+#[cfg(test)]
+use solana_sdk::account::Account;
 use solana_sdk::pubkey::Pubkey;
-use std::collections::HashSet;
+use std::collections::HashMap;
 use std::io::Cursor;
 use system_transaction::SystemTransaction;
 use transaction::Transaction;
-use vote_program::{Vote, VoteProgram};
-use vote_transaction::VoteTransaction;
 
 pub const DEFAULT_BOOTSTRAP_HEIGHT: u64 = 1000;
 pub const DEFAULT_LEADER_ROTATION_INTERVAL: u64 = 100;
 pub const DEFAULT_SEED_ROTATION_INTERVAL: u64 = 1000;
 pub const DEFAULT_ACTIVE_WINDOW_LENGTH: u64 = 1000;
 
+#[derive(Debug)]
+pub struct ActiveValidators {
+    // Map from validator id to the last PoH height at which they voted,
+    pub active_validators: HashMap<Pubkey, u64>,
+    pub active_window_length: u64,
+}
+
+impl ActiveValidators {
+    pub fn new(active_window_length_option: Option<u64>) -> Self {
+        let mut active_window_length = DEFAULT_ACTIVE_WINDOW_LENGTH;
+        if let Some(input) = active_window_length_option {
+            active_window_length = input;
+        }
+
+        ActiveValidators {
+            active_validators: HashMap::new(),
+            active_window_length,
+        }
+    }
+
+    // Finds all the active voters who have voted in the range
+    // (height - active_window_length, height], and removes
+    // anybody who hasn't voted in that range from the map
+    pub fn get_active_set(&mut self, height: u64) -> Vec<Pubkey> {
+        // Don't filter anything if height is less than the
+        // size of the active window. Otherwise, calculate the acceptable
+        // window and filter the active_validators
+
+        // Note: height == 0 will only be included for all
+        // height < self.active_window_length
+        let upper_bound = height;
+        if height >= self.active_window_length {
+            let lower_bound = height - self.active_window_length;
+            self.active_validators
+                .retain(|_, height| *height > lower_bound);
+        }
+
+        self.active_validators
+            .iter()
+            .filter_map(|(k, v)| if *v <= upper_bound { Some(*k) } else { None })
+            .collect()
+    }
+
+    // Push a vote for a validator with id == "id" who voted at PoH height == "height"
+    pub fn push_vote(&mut self, id: Pubkey, height: u64) -> () {
+        let old_height = self.active_validators.entry(id).or_insert(height);
+        if height > *old_height {
+            *old_height = height;
+        }
+    }
+
+    pub fn reset(&mut self) -> () {
+        self.active_validators.clear();
+    }
+}
+
 pub struct LeaderSchedulerConfig {
     // The first leader who will bootstrap the network
     pub bootstrap_leader: Pubkey,
@@ -61,7 +119,7 @@ impl LeaderSchedulerConfig {
     }
 }
 
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct LeaderScheduler {
     // Set to true if we want the default implementation of the LeaderScheduler,
     // where only the bootstrap leader is used
@@ -81,13 +139,12 @@ pub struct LeaderScheduler {
     // the leader rotation process begins to pick future leaders
     pub bootstrap_height: u64,
 
+    // Maintain the set of active validators
+    pub active_validators: ActiveValidators,
+
     // The last height at which the seed + schedule was generated
     pub last_seed_height: Option<u64>,
 
-    // The length of time in ticks for which a vote qualifies a candidate for leader
-    // selection
-    pub active_window_length: u64,
-
     // Round-robin ordering for the validators
     leader_schedule: Vec<Pubkey>,
 
@@ -136,11 +193,6 @@ impl LeaderScheduler {
             seed_rotation_interval = input;
         }
 
-        let mut active_window_length = DEFAULT_ACTIVE_WINDOW_LENGTH;
-        if let Some(input) = config.active_window_length_option {
-            active_window_length = input;
-        }
-
         // Enforced invariants
         assert!(seed_rotation_interval >= leader_rotation_interval);
         assert!(bootstrap_height > 0);
@@ -148,13 +200,13 @@ impl LeaderScheduler {
 
         LeaderScheduler {
             use_only_bootstrap_leader: false,
+            active_validators: ActiveValidators::new(config.active_window_length_option),
             leader_rotation_interval,
             seed_rotation_interval,
             leader_schedule: Vec::new(),
             last_seed_height: None,
             bootstrap_leader: config.bootstrap_leader,
             bootstrap_height,
-            active_window_length,
             seed: 0,
         }
     }
@@ -228,6 +280,15 @@ impl LeaderScheduler {
 
     pub fn reset(&mut self) {
         self.last_seed_height = None;
+        self.active_validators.reset();
+    }
+
+    pub fn push_vote(&mut self, id: Pubkey, height: u64) {
+        if self.use_only_bootstrap_leader {
+            return;
+        }
+
+        self.active_validators.push_vote(id, height);
     }
 
     pub fn update_height(&mut self, height: u64, bank: &Bank) {
@@ -282,34 +343,8 @@ impl LeaderScheduler {
         Some(self.leader_schedule[validator_index])
     }
 
-    // TODO: We use a HashSet for now because a single validator could potentially register
-    // multiple vote accounts. Once that is no longer possible (see the TODO in vote_program.rs,
-    // process_transaction(), case VoteInstruction::RegisterAccount), we can use a vector.
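A worked example of the `(height - active_window_length, height]` rule implemented by `ActiveValidators::get_active_set` above (assumed test-style usage, not taken from the patch); note that each call also prunes voters at or below the lower bound:

    let mut active = ActiveValidators::new(Some(1000));
    let (a, b) = (Keypair::new().pubkey(), Keypair::new().pubkey());
    active.push_vote(a, 1);   // oldest vote
    active.push_vote(b, 600);
    assert!(active.get_active_set(1000).contains(&a));  // window (0, 1000] keeps a
    assert!(!active.get_active_set(1001).contains(&a)); // window (1, 1001] drops a
    assert!(active.get_active_set(1001).contains(&b));  // b voted inside the window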
- fn get_active_set(&mut self, height: u64, bank: &Bank) -> HashSet { - let upper_bound = height; - let lower_bound = height.saturating_sub(self.active_window_length); - - { - let bank_accounts = &*bank.accounts.read().unwrap(); - - bank_accounts - .values() - .filter_map(|account| { - if VoteProgram::check_id(&account.program_id) { - if let Ok(vote_state) = VoteProgram::deserialize(&account.userdata) { - return vote_state - .votes - .back() - .filter(|vote| { - vote.tick_height > lower_bound - && vote.tick_height <= upper_bound - }).map(|_| vote_state.node_id); - } - } - - None - }).collect() - } + fn get_active_set(&mut self, height: u64) -> Vec { + self.active_validators.get_active_set(height) } // Called every seed_rotation_interval entries, generates the leader schedule @@ -319,8 +354,8 @@ impl LeaderScheduler { assert!((height - self.bootstrap_height) % self.seed_rotation_interval == 0); let seed = Self::calculate_seed(height); self.seed = seed; - let active_set = self.get_active_set(height, &bank); - let ranked_active_set = Self::rank_active_set(bank, active_set.iter()); + let active_set = self.get_active_set(height); + let ranked_active_set = Self::rank_active_set(bank, &active_set[..]); // Handle case where there are no active validators with // non-zero stake. In this case, use the bootstrap leader for @@ -382,11 +417,9 @@ impl LeaderScheduler { bank.get_balance(id) } - fn rank_active_set<'a, I>(bank: &Bank, active: I) -> Vec<(&'a Pubkey, u64)> - where - I: Iterator, - { + fn rank_active_set<'a>(bank: &Bank, active: &'a [Pubkey]) -> Vec<(&'a Pubkey, u64)> { let mut active_accounts: Vec<(&'a Pubkey, u64)> = active + .iter() .filter_map(|pk| { let stake = Self::get_stake(pk, bank); if stake > 0 { @@ -445,6 +478,24 @@ impl Default for LeaderScheduler { } } +// Remove all candiates for leader selection from the active set by clearing the bank, +// and then set a single new candidate who will be eligible starting at height = vote_height +// by adding one new account to the bank +#[cfg(test)] +pub fn set_new_leader(bank: &Bank, leader_scheduler: &mut LeaderScheduler, vote_height: u64) { + // Set the scheduled next leader to some other node + let new_leader_keypair = Keypair::new(); + let new_leader_id = new_leader_keypair.pubkey(); + leader_scheduler.push_vote(new_leader_id, vote_height); + let dummy_id = Keypair::new().pubkey(); + let new_account = Account::new(1, 10, dummy_id.clone()); + + // Remove the previous acounts from the active set + let mut accounts = bank.accounts().write().unwrap(); + accounts.clear(); + accounts.insert(new_leader_id, new_account); +} + // Create two entries so that the node with keypair == active_keypair // is in the active set for leader selection: // 1) Give the node a nonzero number of tokens, @@ -455,107 +506,50 @@ pub fn make_active_set_entries( last_entry_id: &Hash, last_tick_id: &Hash, num_ending_ticks: usize, -) -> (Vec, Keypair) { +) -> Vec { // 1) Create transfer token entry let transfer_tx = - Transaction::system_new(&token_source, active_keypair.pubkey(), 2, *last_tick_id); + Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_tick_id); let transfer_entry = Entry::new(last_entry_id, 1, vec![transfer_tx]); let mut last_entry_id = transfer_entry.id; - // 2) Create the vote account - let vote_account = Keypair::new(); - let create_vote_account_tx = - Transaction::vote_account_new(active_keypair, vote_account.pubkey(), *last_tick_id, 1); - - let create_vote_account_entry = Entry::new(&last_entry_id, 1, 
vec![create_vote_account_tx]); - last_entry_id = create_vote_account_entry.id; - - // 3) Register the vote account - let register_vote_account_tx = - Transaction::vote_account_register(active_keypair, vote_account.pubkey(), *last_tick_id, 0); - - let register_vote_account_entry = Entry::new(&last_entry_id, 1, vec![register_vote_account_tx]); - last_entry_id = register_vote_account_entry.id; - - // 4) Create vote entry - let vote = Vote { tick_height: 1 }; - let vote_tx = Transaction::vote_new(&vote_account, vote, *last_tick_id, 0); + // 2) Create vote entry + let vote = Vote { + version: 0, + contact_info_version: 0, + }; + let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, *last_tick_id, 0); let vote_entry = Entry::new(&last_entry_id, 1, vec![vote_tx]); last_entry_id = vote_entry.id; - // 5) Create the ending empty ticks - let mut txs = vec![ - transfer_entry, - create_vote_account_entry, - register_vote_account_entry, - vote_entry, - ]; + // 3) Create the ending empty ticks + let mut txs = vec![transfer_entry, vote_entry]; let empty_ticks = create_ticks(num_ending_ticks, last_entry_id); txs.extend(empty_ticks); - (txs, vote_account) + txs } #[cfg(test)] mod tests { use bank::Bank; - use hash::Hash; use leader_scheduler::{ - LeaderScheduler, LeaderSchedulerConfig, DEFAULT_BOOTSTRAP_HEIGHT, - DEFAULT_LEADER_ROTATION_INTERVAL, DEFAULT_SEED_ROTATION_INTERVAL, + ActiveValidators, LeaderScheduler, LeaderSchedulerConfig, DEFAULT_ACTIVE_WINDOW_LENGTH, + DEFAULT_BOOTSTRAP_HEIGHT, DEFAULT_LEADER_ROTATION_INTERVAL, DEFAULT_SEED_ROTATION_INTERVAL, }; use mint::Mint; - use result::Result; use signature::{Keypair, KeypairUtil}; use solana_sdk::pubkey::Pubkey; use std::collections::HashSet; - use std::hash::Hash as StdHash; + use std::hash::Hash; use std::iter::FromIterator; - use transaction::Transaction; - use vote_program::Vote; - use vote_transaction::VoteTransaction; fn to_hashset_owned(slice: &[T]) -> HashSet where - T: Eq + StdHash + Clone, + T: Eq + Hash + Clone, { HashSet::from_iter(slice.iter().cloned()) } - fn push_vote(vote_account: &Keypair, bank: &Bank, height: u64, last_id: Hash) { - let vote = Vote { - tick_height: height, - }; - - let new_vote_tx = Transaction::vote_new(vote_account, vote, last_id, 0); - - bank.process_transaction(&new_vote_tx).unwrap(); - } - - fn create_vote_account( - node_keypair: &Keypair, - bank: &Bank, - num_tokens: i64, - last_id: Hash, - ) -> Result { - let new_vote_account = Keypair::new(); - - // Create the new vote account - let tx = Transaction::vote_account_new( - node_keypair, - new_vote_account.pubkey(), - last_id, - num_tokens, - ); - bank.process_transaction(&tx)?; - - // Register the vote account to the validator - let tx = - Transaction::vote_account_register(node_keypair, new_vote_account.pubkey(), last_id, 0); - bank.process_transaction(&tx)?; - - Ok(new_vote_account) - } - fn run_scheduler_test( num_validators: usize, bootstrap_height: u64, @@ -578,11 +572,7 @@ mod tests { let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); // Create the bank and validators, which are inserted in order of account balance - let num_vote_account_tokens = 1; - let mint = Mint::new( - (((num_validators + 1) / 2) * (num_validators + 1) - + num_vote_account_tokens * num_validators) as i64, - ); + let mint = Mint::new((((num_validators + 1) / 2) * (num_validators + 1)) as i64); let bank = Bank::new(&mint); let mut validators = vec![]; let last_id = mint @@ -594,24 +584,11 @@ mod tests { let new_validator = Keypair::new(); let 
new_pubkey = new_validator.pubkey(); validators.push(new_pubkey); - // Give the validator some tokens - bank.transfer( - (i + 1 + num_vote_account_tokens) as i64, - &mint.keypair(), - new_pubkey, - last_id, - ).unwrap(); - - // Create a vote account - let new_vote_account = create_vote_account( - &new_validator, - &bank, - num_vote_account_tokens as i64, - mint.last_id(), - ).unwrap(); // Vote to make the validator part of the active set for the entire test // (we made the active_window_length large enough at the beginning of the test) - push_vote(&new_vote_account, &bank, 1, mint.last_id()); + leader_scheduler.push_vote(new_pubkey, 1); + bank.transfer((i + 1) as i64, &mint.keypair(), new_pubkey, last_id) + .unwrap(); } // The scheduled leader during the bootstrapping period (assuming a seed + schedule @@ -689,9 +666,6 @@ mod tests { fn test_active_set() { let leader_id = Keypair::new().pubkey(); let active_window_length = 1000; - let mint = Mint::new(10000); - let bank = Bank::new(&mint); - let leader_scheduler_config = LeaderSchedulerConfig::new( leader_id, Some(100), @@ -707,60 +681,40 @@ mod tests { let num_old_ids = 20; let mut old_ids = HashSet::new(); for _ in 0..num_old_ids { - let new_keypair = Keypair::new(); - let pk = new_keypair.pubkey(); - old_ids.insert(pk.clone()); - - // Give the account some stake - bank.transfer(5, &mint.keypair(), pk, mint.last_id()) - .unwrap(); - - // Create a vote account - let new_vote_account = - create_vote_account(&new_keypair, &bank, 1, mint.last_id()).unwrap(); - - // Push a vote for the account - push_vote(&new_vote_account, &bank, start_height, mint.last_id()); + let pk = Keypair::new().pubkey(); + old_ids.insert(pk); + leader_scheduler.push_vote(pk, start_height); } // Insert a bunch of votes at height "start_height + active_window_length" let num_new_ids = 10; let mut new_ids = HashSet::new(); for _ in 0..num_new_ids { - let new_keypair = Keypair::new(); - let pk = new_keypair.pubkey(); + let pk = Keypair::new().pubkey(); new_ids.insert(pk); - // Give the account some stake - bank.transfer(5, &mint.keypair(), pk, mint.last_id()) - .unwrap(); - - // Create a vote account - let new_vote_account = - create_vote_account(&new_keypair, &bank, 1, mint.last_id()).unwrap(); - - push_vote( - &new_vote_account, - &bank, - start_height + active_window_length, - mint.last_id(), - ); + leader_scheduler.push_vote(pk, start_height + active_window_length); } // Queries for the active set - let result = - leader_scheduler.get_active_set(active_window_length + start_height - 1, &bank); - assert_eq!(result, old_ids); + let result = leader_scheduler.get_active_set(active_window_length + start_height - 1); + assert_eq!(result.len(), num_old_ids); + let result_set = to_hashset_owned(&result); + assert_eq!(result_set, old_ids); - let result = leader_scheduler.get_active_set(active_window_length + start_height, &bank); - assert_eq!(result, new_ids); + let result = leader_scheduler.get_active_set(active_window_length + start_height); + assert_eq!(result.len(), num_new_ids); + let result_set = to_hashset_owned(&result); + assert_eq!(result_set, new_ids); - let result = - leader_scheduler.get_active_set(2 * active_window_length + start_height - 1, &bank); - assert_eq!(result, new_ids); + let result = leader_scheduler.get_active_set(2 * active_window_length + start_height - 1); + assert_eq!(result.len(), num_new_ids); + let result_set = to_hashset_owned(&result); + assert_eq!(result_set, new_ids); - let result = - leader_scheduler.get_active_set(2 * 
active_window_length + start_height, &bank); - assert!(result.is_empty()); + let result = leader_scheduler.get_active_set(2 * active_window_length + start_height); + assert_eq!(result.len(), 0); + let result_set = to_hashset_owned(&result); + assert!(result_set.is_empty()); } #[test] @@ -800,7 +754,7 @@ mod tests { } let validators_pk: Vec = validators.iter().map(Keypair::pubkey).collect(); - let result = LeaderScheduler::rank_active_set(&bank, validators_pk.iter()); + let result = LeaderScheduler::rank_active_set(&bank, &validators_pk[..]); assert_eq!(result.len(), validators.len()); @@ -830,7 +784,7 @@ mod tests { .chain(new_validators.iter()) .map(Keypair::pubkey) .collect(); - let result = LeaderScheduler::rank_active_set(&bank, all_validators.iter()); + let result = LeaderScheduler::rank_active_set(&bank, &all_validators[..]); assert_eq!(result.len(), new_validators.len()); for (i, (pk, balance)) in result.into_iter().enumerate() { @@ -856,7 +810,7 @@ mod tests { .unwrap(); } - let result = LeaderScheduler::rank_active_set(&bank, tied_validators_pk.iter()); + let result = LeaderScheduler::rank_active_set(&bank, &tied_validators_pk[..]); let mut sorted: Vec<&Pubkey> = tied_validators_pk.iter().map(|x| x).collect(); sorted.sort_by(|pk1, pk2| pk1.cmp(pk2)); assert_eq!(result.len(), tied_validators_pk.len()); @@ -968,7 +922,6 @@ mod tests { #[test] fn test_scheduler_active_window() { let num_validators = 10; - let num_vote_account_tokens = 1; // Set up the LeaderScheduler struct let bootstrap_leader_id = Keypair::new().pubkey(); let bootstrap_height = 500; @@ -990,10 +943,7 @@ mod tests { let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); // Create the bank and validators - let mint = Mint::new( - ((((num_validators + 1) / 2) * (num_validators + 1)) - + (num_vote_account_tokens * num_validators)) as i64, - ); + let mint = Mint::new((((num_validators + 1) / 2) * (num_validators + 1)) as i64); let bank = Bank::new(&mint); let mut validators = vec![]; let last_id = mint @@ -1005,29 +955,10 @@ mod tests { let new_validator = Keypair::new(); let new_pubkey = new_validator.pubkey(); validators.push(new_pubkey); - // Give the validator some tokens - bank.transfer( - (i + 1 + num_vote_account_tokens) as i64, - &mint.keypair(), - new_pubkey, - last_id, - ).unwrap(); - - // Create a vote account - let new_vote_account = create_vote_account( - &new_validator, - &bank, - num_vote_account_tokens as i64, - mint.last_id(), - ).unwrap(); - // Vote at height i * active_window_length for validator i - push_vote( - &new_vote_account, - &bank, - i * active_window_length + bootstrap_height, - mint.last_id(), - ); + leader_scheduler.push_vote(new_pubkey, i * active_window_length + bootstrap_height); + bank.transfer((i + 1) as i64, &mint.keypair(), new_pubkey, last_id) + .unwrap(); } // Generate schedule every active_window_length entries and check that @@ -1048,12 +979,8 @@ mod tests { #[test] fn test_multiple_vote() { - let leader_keypair = Keypair::new(); - let leader_id = leader_keypair.pubkey(); + let leader_id = Keypair::new().pubkey(); let active_window_length = 1000; - let mint = Mint::new(10000); - let bank = Bank::new(&mint); - let leader_scheduler_config = LeaderSchedulerConfig::new( leader_id, Some(100), @@ -1064,38 +991,18 @@ mod tests { let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); - // Give the node some tokens - bank.transfer(5, &mint.keypair(), leader_id, bank.last_id()) - .unwrap(); - - // Check that a node that votes twice in a row will get 
included in the active + // Check that a validator that votes twice in a row will get included in the active // window let initial_vote_height = 1; - // Create a vote account - let new_vote_account = - create_vote_account(&leader_keypair, &bank, 1, mint.last_id()).unwrap(); - // Vote twice - push_vote( - &new_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - ); - push_vote( - &new_vote_account, - &bank, - initial_vote_height + 1, - mint.last_id(), - ); - + leader_scheduler.push_vote(leader_id, initial_vote_height); + leader_scheduler.push_vote(leader_id, initial_vote_height + 1); + let result = leader_scheduler.get_active_set(initial_vote_height + active_window_length); + assert_eq!(result, vec![leader_id]); let result = - leader_scheduler.get_active_set(initial_vote_height + active_window_length, &bank); - assert_eq!(result, to_hashset_owned(&vec![leader_id])); - let result = - leader_scheduler.get_active_set(initial_vote_height + active_window_length + 1, &bank); - assert!(result.is_empty()); + leader_scheduler.get_active_set(initial_vote_height + active_window_length + 1); + assert_eq!(result, vec![]); } #[test] @@ -1156,6 +1063,13 @@ mod tests { DEFAULT_SEED_ROTATION_INTERVAL ); + // Check defaults for ActiveValidators + let active_validators = ActiveValidators::new(None); + assert_eq!( + active_validators.active_window_length, + DEFAULT_ACTIVE_WINDOW_LENGTH + ); + // Check actual arguments for LeaderScheduler let bootstrap_height = 500; let leader_rotation_interval = 100; @@ -1182,11 +1096,14 @@ mod tests { leader_scheduler.seed_rotation_interval, seed_rotation_interval ); + + // Check actual arguments for ActiveValidators + let active_validators = ActiveValidators::new(Some(active_window_length)); + assert_eq!(active_validators.active_window_length, active_window_length); } fn run_consecutive_leader_test(num_slots_per_epoch: u64, add_validator: bool) { - let bootstrap_leader_keypair = Keypair::new(); - let bootstrap_leader_id = bootstrap_leader_keypair.pubkey(); + let bootstrap_leader_id = Keypair::new().pubkey(); let bootstrap_height = 500; let leader_rotation_interval = 100; let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; @@ -1213,20 +1130,11 @@ mod tests { let initial_vote_height = 1; // Create and add validator to the active set - let validator_keypair = Keypair::new(); - let validator_id = validator_keypair.pubkey(); + let validator_id = Keypair::new().pubkey(); if add_validator { - bank.transfer(5, &mint.keypair(), validator_id, last_id) + leader_scheduler.push_vote(validator_id, initial_vote_height); + bank.transfer(1, &mint.keypair(), validator_id, last_id) .unwrap(); - // Create a vote account - let new_vote_account = - create_vote_account(&validator_keypair, &bank, 1, mint.last_id()).unwrap(); - push_vote( - &new_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - ); } // Make sure the bootstrap leader, not the validator, is picked again on next slot @@ -1243,29 +1151,10 @@ mod tests { } }; - let vote_account_tokens = 1; - bank.transfer( - leader_stake + vote_account_tokens, - &mint.keypair(), - bootstrap_leader_id, - last_id, - ).unwrap(); - - // Create a vote account - let new_vote_account = create_vote_account( - &bootstrap_leader_keypair, - &bank, - vote_account_tokens, - mint.last_id(), - ).unwrap(); - // Add leader to the active set - push_vote( - &new_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - ); + leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height); + 
bank.transfer(leader_stake, &mint.keypair(), bootstrap_leader_id, last_id)
+            .unwrap();
 
         leader_scheduler.generate_schedule(bootstrap_height, &bank);
 
@@ -1293,8 +1182,7 @@ mod tests {
 
     #[test]
     fn test_max_height_for_leader() {
-        let bootstrap_leader_keypair = Keypair::new();
-        let bootstrap_leader_id = bootstrap_leader_keypair.pubkey();
+        let bootstrap_leader_id = Keypair::new().pubkey();
         let bootstrap_height = 500;
         let leader_rotation_interval = 100;
         let seed_rotation_interval = 2 * leader_rotation_interval;
@@ -1366,34 +1254,15 @@ mod tests {
         // Now test when the active set > 1 node
         // Create and add validator to the active set
-        let validator_keypair = Keypair::new();
-        let validator_id = validator_keypair.pubkey();
-
-        // Create a vote account for the validator
-        bank.transfer(5, &mint.keypair(), validator_id, last_id)
+        let validator_id = Keypair::new().pubkey();
+        leader_scheduler.push_vote(validator_id, initial_vote_height);
+        bank.transfer(1, &mint.keypair(), validator_id, last_id)
             .unwrap();
-        let new_validator_vote_account =
-            create_vote_account(&validator_keypair, &bank, 1, mint.last_id()).unwrap();
-        push_vote(
-            &new_validator_vote_account,
-            &bank,
-            initial_vote_height,
-            mint.last_id(),
-        );
-
-        // Create a vote account for the leader
-        bank.transfer(5, &mint.keypair(), bootstrap_leader_id, last_id)
-            .unwrap();
-        let new_leader_vote_account =
-            create_vote_account(&bootstrap_leader_keypair, &bank, 1, mint.last_id()).unwrap();
 
         // Add leader to the active set
-        push_vote(
-            &new_leader_vote_account,
-            &bank,
-            initial_vote_height,
-            mint.last_id(),
-        );
+        leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height);
+        bank.transfer(1, &mint.keypair(), bootstrap_leader_id, last_id)
+            .unwrap();
 
         // Generate the schedule
         leader_scheduler.generate_schedule(bootstrap_height, &bank);
diff --git a/src/leader_vote_stage.rs b/src/leader_vote_stage.rs
new file mode 100644
index 0000000000..7eae6b273c
--- /dev/null
+++ b/src/leader_vote_stage.rs
@@ -0,0 +1,159 @@
+//! The `leader_vote_stage` module implements the TPU's vote stage. It
+//! computes and notes the votes for the entries, and then sends the
+//! Entry to its output channel.
+
+use bank::Bank;
+use cluster_info::ClusterInfo;
+use counter::Counter;
+use entry::Entry;
+use ledger::Block;
+use log::Level;
+use result::{Error, Result};
+use service::Service;
+use signature::Keypair;
+use std::net::UdpSocket;
+use std::sync::atomic::AtomicUsize;
+use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
+use std::sync::{Arc, RwLock};
+use std::thread::{self, Builder, JoinHandle};
+use std::time::{Duration, Instant};
+use streamer::responder;
+use timing::duration_as_ms;
+use vote_stage::send_leader_vote;
+
+pub struct LeaderVoteStage {
+    thread_hdls: Vec<JoinHandle<()>>,
+    vote_thread: JoinHandle<()>,
+}
+
+impl LeaderVoteStage {
+    /// Process any Entry items that have been published by the RecordStage.
+    /// Continuously send entries out.
+    pub fn compute_vote_and_send_entries(
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        entry_sender: &Sender<Vec<Entry>>,
+        entry_receiver: &Receiver<Vec<Entry>>,
+    ) -> Result<()> {
+        let mut ventries = Vec::new();
+        let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
+        let now = Instant::now();
+        let mut num_new_entries = 0;
+
+        loop {
+            num_new_entries += received_entries.len();
+            ventries.push(received_entries);
+
+            if let Ok(n) = entry_receiver.try_recv() {
+                received_entries = n;
+            } else {
+                break;
+            }
+        }
+        inc_new_counter_info!("leader_vote_stage-entries_received", num_new_entries);
+        debug!("leader_vote_stage entries: {}", num_new_entries);
+
+        for entries in ventries {
+            let votes = &entries.votes();
+            cluster_info.write().unwrap().insert_votes(&votes);
+
+            inc_new_counter_info!("leader_vote_stage-write_entries", entries.len());
+
+            //TODO(anatoly): real stake based voting needs to change this
+            //leader simply votes if the current set of validators have voted
+            //on a valid last id
+
+            trace!("New entries? {}", entries.len());
+            if !entries.is_empty() {
+                inc_new_counter_info!("leader_vote_stage-recv_vote", votes.len());
+                inc_new_counter_info!("leader_vote_stage-entries_sent", entries.len());
+                trace!("broadcasting {}", entries.len());
+                entry_sender.send(entries)?;
+            }
+        }
+        inc_new_counter_info!(
+            "leader_vote_stage-time_ms",
+            duration_as_ms(&now.elapsed()) as usize
+        );
+
+        Ok(())
+    }
+
+    /// Create a new LeaderVoteStage for voting and broadcasting entries.
+    pub fn new(
+        keypair: Arc<Keypair>,
+        bank: Arc<Bank>,
+        cluster_info: Arc<RwLock<ClusterInfo>>,
+        entry_receiver: Receiver<Vec<Entry>>,
+    ) -> (Self, Receiver<Vec<Entry>>) {
+        let (vote_blob_sender, vote_blob_receiver) = channel();
+        let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
+        let t_responder = responder(
+            "leader_vote_stage_vote_sender",
+            Arc::new(send),
+            vote_blob_receiver,
+        );
+        let (entry_sender, entry_receiver_forward) = channel();
+
+        let vote_thread = Builder::new()
+            .name("solana-writer".to_string())
+            .spawn(move || {
+                let mut last_vote = 0;
+                let mut last_valid_validator_timestamp = 0;
+                let id = cluster_info.read().unwrap().id;
+                loop {
+                    if let Err(e) = Self::compute_vote_and_send_entries(
+                        &cluster_info,
+                        &entry_sender,
+                        &entry_receiver,
+                    ) {
+                        match e {
+                            Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
+                                break;
+                            }
+                            Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                            _ => {
+                                inc_new_counter_info!(
+                                    "leader_vote_stage-compute_vote_and_send_entries-error",
+                                    1
+                                );
+                                error!("{:?}", e);
+                            }
+                        }
+                    };
+                    if let Err(e) = send_leader_vote(
+                        &id,
+                        &keypair,
+                        &bank,
+                        &cluster_info,
+                        &vote_blob_sender,
+                        &mut last_vote,
+                        &mut last_valid_validator_timestamp,
+                    ) {
+                        inc_new_counter_info!("leader_vote_stage-leader_vote-error", 1);
+                        error!("{:?}", e);
+                    }
+                }
+            }).unwrap();
+
+        let thread_hdls = vec![t_responder];
+        (
+            LeaderVoteStage {
+                vote_thread,
+                thread_hdls,
+            },
+            entry_receiver_forward,
+        )
+    }
+}
+
+impl Service for LeaderVoteStage {
+    type JoinReturnType = ();
+
+    fn join(self) -> thread::Result<()> {
+        for thread_hdl in self.thread_hdls {
+            thread_hdl.join()?;
+        }
+
+        self.vote_thread.join()
+    }
+}
diff --git a/src/ledger.rs b/src/ledger.rs
index f8f0aeee5c..0e11dd7dc2 100644
--- a/src/ledger.rs
+++ b/src/ledger.rs
@@ -3,7 +3,7 @@
 //! access read to a persistent file-based ledger.
use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; -#[cfg(test)] +use budget_instruction::Vote; use budget_transaction::BudgetTransaction; #[cfg(test)] use chrono::prelude::Utc; @@ -25,8 +25,6 @@ use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::Path; use transaction::Transaction; -use vote_program::Vote; -use vote_transaction::VoteTransaction; use window::WINDOW_SIZE; // @@ -498,7 +496,7 @@ impl Block for [Entry] { entry .transactions .iter() - .flat_map(VoteTransaction::get_votes) + .filter_map(BudgetTransaction::vote) }).collect() } } @@ -686,6 +684,7 @@ pub fn make_tiny_test_entries(num: usize) -> Vec { mod tests { use super::*; use bincode::serialized_size; + use budget_instruction::Vote; use budget_transaction::BudgetTransaction; use entry::{next_entry, Entry}; use hash::hash; @@ -694,7 +693,6 @@ mod tests { use std; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use transaction::Transaction; - use vote_program::Vote; #[test] fn test_verify_slice() { @@ -716,8 +714,15 @@ mod tests { let zero = Hash::default(); let one = hash(&zero.as_ref()); let keypair = Keypair::new(); - let vote_account = Keypair::new(); - let tx0 = Transaction::vote_new(&vote_account, Vote { tick_height: 1 }, one, 1); + let tx0 = Transaction::budget_new_vote( + &keypair, + Vote { + version: 0, + contact_info_version: 1, + }, + one, + 1, + ); let tx1 = Transaction::budget_new_timestamp( &keypair, keypair.pubkey(), @@ -767,8 +772,15 @@ mod tests { let id = Hash::default(); let next_id = hash(&id.as_ref()); let keypair = Keypair::new(); - let vote_account = Keypair::new(); - let tx_small = Transaction::vote_new(&vote_account, Vote { tick_height: 1 }, next_id, 2); + let tx_small = Transaction::budget_new_vote( + &keypair, + Vote { + version: 0, + contact_info_version: 2, + }, + next_id, + 2, + ); let tx_large = Transaction::budget_new(&keypair, keypair.pubkey(), 1, next_id); let tx_small_size = serialized_size(&tx_small).unwrap() as usize; diff --git a/src/lib.rs b/src/lib.rs index 1f9e9024bb..5a4b20c238 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,7 @@ pub mod fetch_stage; pub mod fullnode; pub mod hash; pub mod leader_scheduler; +pub mod leader_vote_stage; pub mod ledger; pub mod ledger_write_stage; pub mod loader_transaction; @@ -79,9 +80,7 @@ pub mod token_program; pub mod tpu; pub mod transaction; pub mod tvu; -pub mod vote_program; pub mod vote_stage; -pub mod vote_transaction; pub mod wallet; pub mod window; pub mod window_service; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 791e6289cc..2a758043c9 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -6,6 +6,8 @@ use counter::Counter; use entry::{EntryReceiver, EntrySender}; use hash::Hash; use influx_db_client as influxdb; +use leader_scheduler::LeaderScheduler; +use ledger::Block; use log::Level; use metrics; use result::{Error, Result}; @@ -57,10 +59,11 @@ impl ReplicateStage { cluster_info: &Arc>, window_receiver: &EntryReceiver, keypair: &Arc, - vote_account_keypair: &Arc, vote_blob_sender: Option<&BlobSender>, ledger_entry_sender: &EntrySender, + tick_height: &mut u64, entry_height: &mut u64, + leader_scheduler: &Arc>, ) -> Result { let timer = Duration::new(1, 0); //coalesce all the available entries into a single vote @@ -78,23 +81,37 @@ impl ReplicateStage { let mut res = Ok(()); let last_entry_id = { let mut num_entries_to_write = entries.len(); - let current_leader = bank - .get_current_leader() - .expect("Scheduled leader id should never 
be unknown while processing entries"); for (i, entry) in entries.iter().enumerate() { - res = bank.process_entry(&entry); - let my_id = keypair.pubkey(); - let scheduled_leader = bank - .get_current_leader() - .expect("Scheduled leader id should never be unknown while processing entries"); + // max_tick_height is the PoH height at which the next leader rotation will + // happen. The leader should send an entry such that the total PoH is equal + // to max_tick_height - guard. + // TODO: Introduce a "guard" for the end of transmission periods, the guard + // is assumed to be zero for now. + let max_tick_height = { + let ls_lock = leader_scheduler.read().unwrap(); + ls_lock.max_height_for_leader(*tick_height) + }; - // TODO: Remove this soon once we boot the leader from ClusterInfo - if scheduled_leader != current_leader { - cluster_info.write().unwrap().set_leader(scheduled_leader); - } - if my_id == scheduled_leader { - num_entries_to_write = i + 1; - break; + res = bank.process_entry( + &entry, + tick_height, + &mut *leader_scheduler.write().unwrap(), + ); + + // Will run only if leader_scheduler.use_only_bootstrap_leader is false + if let Some(max_tick_height) = max_tick_height { + let ls_lock = leader_scheduler.read().unwrap(); + if *tick_height == max_tick_height { + let my_id = keypair.pubkey(); + let scheduled_leader = ls_lock.get_scheduled_leader(*tick_height).expect( + "Scheduled leader id should never be unknown while processing entries", + ); + cluster_info.write().unwrap().set_leader(scheduled_leader); + if my_id == scheduled_leader { + num_entries_to_write = i + 1; + break; + } + } } if res.is_err() { @@ -117,9 +134,11 @@ impl ReplicateStage { }; if let Some(sender) = vote_blob_sender { - send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?; + send_validator_vote(bank, keypair, cluster_info, sender)?; } + cluster_info.write().unwrap().insert_votes(&entries.votes()); + inc_new_counter_info!( "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() @@ -141,12 +160,13 @@ impl ReplicateStage { pub fn new( keypair: Arc, - vote_account_keypair: Arc, bank: Arc, cluster_info: Arc>, window_receiver: EntryReceiver, exit: Arc, + tick_height: u64, entry_height: u64, + leader_scheduler: Arc>, ) -> (Self, EntryReceiver) { let (vote_blob_sender, vote_blob_receiver) = channel(); let (ledger_entry_sender, ledger_entry_receiver) = channel(); @@ -162,15 +182,17 @@ impl ReplicateStage { let now = Instant::now(); let mut next_vote_secs = 1; let mut entry_height_ = entry_height; + let mut tick_height_ = tick_height; let mut last_entry_id = None; loop { - let leader_id = - bank.get_current_leader() - .expect("Scheduled leader id should never be unknown at this point"); - + let leader_id = leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(tick_height_) + .expect("Scheduled leader id should never be unknown at this point"); if leader_id == keypair.pubkey() { return Some(ReplicateStageReturnType::LeaderRotation( - bank.get_tick_height(), + tick_height_, entry_height_, // We should never start the TPU / this stage on an exact entry that causes leader // rotation (Fullnode should automatically transition on startup if it detects @@ -193,10 +215,11 @@ impl ReplicateStage { &cluster_info, &window_receiver, &keypair, - &vote_account_keypair, vote_sender, &ledger_entry_sender, + &mut tick_height_, &mut entry_height_, + &leader_scheduler, ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break, 
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), @@ -271,7 +294,7 @@ mod test { // 1) Give the validator a nonzero number of tokens 2) A vote from the validator . // This will cause leader rotation after the bootstrap height let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap(); - let (active_set_entries, vote_account_keypair) = + let active_set_entries = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0); last_id = active_set_entries.last().unwrap().id; let initial_tick_height = genesis_entries @@ -296,23 +319,26 @@ mod test { Some(bootstrap_height), ); - let leader_scheduler = - Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))); + let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); // Set up the bank - let (bank, _, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); + let (bank, _, _, _) = + Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); + + let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); // Set up the replicate stage let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); let (replicate_stage, _ledger_writer_recv) = ReplicateStage::new( Arc::new(my_keypair), - Arc::new(vote_account_keypair), Arc::new(bank), Arc::new(RwLock::new(cluster_info_me)), entry_receiver, exit.clone(), + initial_tick_height, initial_entry_len, + leader_scheduler.clone(), ); // Send enough ticks to trigger leader rotation @@ -349,6 +375,13 @@ mod test { assert_eq!(exit.load(Ordering::Relaxed), true); + // Check ledger height is correct + let mut leader_scheduler = Arc::try_unwrap(leader_scheduler) + .expect("Multiple references to this RwLock still exist") + .into_inner() + .expect("RwLock for LeaderScheduler is still locked"); + + leader_scheduler.reset(); let _ignored = remove_dir_all(&my_ledger_path); } } diff --git a/src/replicator.rs b/src/replicator.rs index ac53ce737c..fa51dd3a15 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -198,16 +198,14 @@ mod tests { let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100); info!("starting leader node"); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let network_addr = leader_node.sockets.gossip.local_addr().unwrap(); let leader_info = leader_node.info.clone(); - let vote_account_keypair = Arc::new(Keypair::new()); let leader = Fullnode::new( leader_node, &leader_ledger_path, leader_keypair, - vote_account_keypair, None, false, LeaderScheduler::from_bootstrap_leader(leader_info.id), diff --git a/src/result.rs b/src/result.rs index bb63cae72a..c66d236596 100644 --- a/src/result.rs +++ b/src/result.rs @@ -10,7 +10,6 @@ use poh_recorder; use serde_json; use std; use std::any::Any; -use vote_stage; #[derive(Debug)] pub enum Error { @@ -28,7 +27,6 @@ pub enum Error { ErasureError(erasure::ErasureError), SendError, PohRecorderError(poh_recorder::PohRecorderError), - VoteError(vote_stage::VoteError), } pub type Result = std::result::Result; @@ -102,11 +100,6 @@ impl std::convert::From for Error { Error::PohRecorderError(e) } } -impl std::convert::From for Error { - fn from(e: vote_stage::VoteError) -> Error { - Error::VoteError(e) - } -} #[cfg(test)] mod tests { diff --git a/src/rpc.rs b/src/rpc.rs index 7ba02da438..c898bbec4b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -28,7 +28,6 @@ pub const RPC_PORT: u16 = 8899; pub struct 
JsonRpcService { thread_hdl: JoinHandle<()>, - exit: Arc, } impl JsonRpcService { @@ -36,12 +35,11 @@ impl JsonRpcService { bank: &Arc, cluster_info: &Arc>, rpc_addr: SocketAddr, + exit: Arc, ) -> Self { - let exit = Arc::new(AtomicBool::new(false)); let request_processor = JsonRpcRequestProcessor::new(bank.clone()); let info = cluster_info.clone(); let exit_pubsub = exit.clone(); - let exit_ = exit.clone(); let thread_hdl = Builder::new() .name("solana-jsonrpc".to_string()) .spawn(move || { @@ -64,23 +62,14 @@ impl JsonRpcService { warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port()); return; } - while !exit_.load(Ordering::Relaxed) { + while !exit.load(Ordering::Relaxed) { sleep(Duration::from_millis(100)); } server.unwrap().close(); () }) .unwrap(); - JsonRpcService { thread_hdl, exit } - } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() + JsonRpcService { thread_hdl } } } @@ -390,7 +379,8 @@ mod tests { ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(), )); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 24680); - let rpc_service = JsonRpcService::new(&Arc::new(bank), &cluster_info, rpc_addr); + let exit = Arc::new(AtomicBool::new(false)); + let rpc_service = JsonRpcService::new(&Arc::new(bank), &cluster_info, rpc_addr, exit); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); @@ -596,11 +586,11 @@ mod tests { #[test] fn test_rpc_send_tx() { - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("rpc_send_tx", &alice); @@ -612,16 +602,8 @@ mod tests { let genesis_entries = &alice.create_entries(); let entry_height = genesis_entries.len() as u64; - - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, entry_height, @@ -630,6 +612,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 4a920e1b1e..f5de442e9e 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -27,7 +27,6 @@ pub enum ClientState { pub struct PubSubService { thread_hdl: JoinHandle<()>, - exit: Arc, } impl Service for PubSubService { @@ -39,10 +38,8 @@ impl Service for PubSubService { } impl PubSubService { - pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { + pub fn new(bank: &Arc, pubsub_addr: SocketAddr, exit: Arc) -> Self { let rpc = RpcSolPubSubImpl::new(JsonRpcRequestProcessor::new(bank.clone()), bank.clone()); - let exit = Arc::new(AtomicBool::new(false)); - let exit_ = exit.clone(); let thread_hdl = Builder::new() .name("solana-pubsub".to_string()) .spawn(move || { @@ -63,23 +60,14 @@ impl PubSubService { warn!("Pubsub service unavailable: unable to bind to port {}. 
\nMake sure this port is not already in use by another application", pubsub_addr.port()); return; } - while !exit_.load(Ordering::Relaxed) { + while !exit.load(Ordering::Relaxed) { sleep(Duration::from_millis(100)); } server.unwrap().close(); () }) .unwrap(); - PubSubService { thread_hdl, exit } - } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() + PubSubService { thread_hdl } } } @@ -261,7 +249,8 @@ mod tests { let alice = Mint::new(10_000); let bank = Bank::new(&alice); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr); + let exit = Arc::new(AtomicBool::new(false)); + let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr, exit); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); } diff --git a/src/thin_client.rs b/src/thin_client.rs index 4feb155de5..fa8f1b8294 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -26,7 +26,6 @@ use std::time::Instant; use system_transaction::SystemTransaction; use timing; use transaction::Transaction; -use vote_transaction::VoteTransaction; use influx_db_client as influxdb; use metrics; @@ -149,29 +148,6 @@ impl ThinClient { )) } - pub fn create_vote_account( - &self, - node_keypair: &Keypair, - vote_account_id: Pubkey, - last_id: &Hash, - num_tokens: i64, - ) -> io::Result { - let tx = - Transaction::vote_account_new(&node_keypair, vote_account_id, *last_id, num_tokens); - self.transfer_signed(&tx) - } - - /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. - pub fn register_vote_account( - &self, - node_keypair: &Keypair, - vote_account_id: Pubkey, - last_id: &Hash, - ) -> io::Result { - let tx = Transaction::vote_account_register(node_keypair, vote_account_id, *last_id, 0); - self.transfer_signed(&tx) - } - /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. pub fn transfer( &self, @@ -194,24 +170,6 @@ impl ThinClient { result } - pub fn get_account_userdata(&mut self, pubkey: &Pubkey) -> io::Result>> { - let req = Request::GetAccount { key: *pubkey }; - let data = serialize(&req).expect("serialize GetAccount in pub fn get_account_userdata"); - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn get_account_userdata"); - - loop { - let resp = self.recv_response()?; - trace!("recv_response {:?}", resp); - if let Response::Account { key, account } = resp { - if key == *pubkey { - return Ok(account.map(|account| account.userdata)); - } - } - } - } - /// Request the balance of the user holding `pubkey`. This method blocks /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. 
@@ -488,23 +446,17 @@ mod tests { #[ignore] fn test_thin_client() { logger::setup(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let alice = Mint::new(10_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let ledger_path = create_tmp_ledger_with_mint("thin_client", &alice); - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, 0, @@ -513,6 +465,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(900)); @@ -542,22 +495,16 @@ mod tests { #[ignore] fn test_bad_sig() { logger::setup(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("bad_sig", &alice); - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, 0, @@ -566,6 +513,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); //TODO: remove this sleep, or add a retry so CI is stable @@ -608,25 +556,18 @@ mod tests { #[test] fn test_client_check_signature() { logger::setup(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("client_check_signature", &alice); let genesis_entries = &alice.create_entries(); let entry_height = genesis_entries.len() as u64; - - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, entry_height, @@ -635,6 +576,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(300)); @@ -678,25 +620,18 @@ mod tests { #[test] fn test_zero_balance_after_nonzero() { logger::setup(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_keypair = Keypair::new(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("zero_balance_check", &alice); let genesis_entries = 
&alice.create_entries(); let entry_height = genesis_entries.len() as u64; - - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, entry_height, @@ -705,6 +640,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(900)); diff --git a/src/tpu.rs b/src/tpu.rs index 531be39591..66fe9a902d 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -27,18 +27,21 @@ use bank::Bank; use banking_stage::{BankingStage, BankingStageReturnType}; +use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; use hash::Hash; +use leader_vote_stage::LeaderVoteStage; use ledger_write_stage::LedgerWriteStage; use poh_service::Config; use service::Service; +use signature::Keypair; use sigverify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread; pub enum TpuReturnType { @@ -49,6 +52,7 @@ pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, + leader_vote_stage: LeaderVoteStage, ledger_write_stage: LedgerWriteStage, exit: Arc, } @@ -56,7 +60,9 @@ pub struct Tpu { impl Tpu { #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new( + keypair: Arc, bank: &Arc, + cluster_info: &Arc>, tick_duration: Config, transactions_sockets: Vec, ledger_path: &str, @@ -81,21 +87,28 @@ impl Tpu { max_tick_height, ); + let (leader_vote_stage, ledger_entry_receiver) = + LeaderVoteStage::new(keypair, bank.clone(), cluster_info.clone(), entry_receiver); + let (ledger_entry_sender, entry_forwarder) = channel(); - let ledger_write_stage = - LedgerWriteStage::new(Some(ledger_path), entry_receiver, Some(ledger_entry_sender)); + let ledger_write_stage = LedgerWriteStage::new( + Some(ledger_path), + ledger_entry_receiver, + Some(ledger_entry_sender), + ); let tpu = Tpu { fetch_stage, sigverify_stage, banking_stage, + leader_vote_stage, ledger_write_stage, exit: exit.clone(), }; (tpu, entry_forwarder, exit) } - pub fn exit(&self) { + pub fn exit(&self) -> () { self.exit.store(true, Ordering::Relaxed); } @@ -115,6 +128,7 @@ impl Service for Tpu { fn join(self) -> thread::Result<(Option)> { self.fetch_stage.join()?; self.sigverify_stage.join()?; + self.leader_vote_stage.join()?; self.ledger_write_stage.join()?; match self.banking_stage.join()? 
{ Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)), diff --git a/src/tvu.rs b/src/tvu.rs index d13b2ed70a..950ccca04e 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -40,6 +40,7 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use cluster_info::ClusterInfo; use hash::Hash; +use leader_scheduler::LeaderScheduler; use ledger_write_stage::LedgerWriteStage; use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use retransmit_stage::RetransmitStage; @@ -79,8 +80,8 @@ impl Tvu { #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new( keypair: Arc, - vote_account_keypair: Arc, bank: &Arc, + tick_height: u64, entry_height: u64, cluster_info: Arc>, window: SharedWindow, @@ -88,6 +89,7 @@ impl Tvu { repair_socket: UdpSocket, retransmit_socket: UdpSocket, ledger_path: Option<&str>, + leader_scheduler: Arc>, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); @@ -103,22 +105,23 @@ impl Tvu { let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( &cluster_info, window, - bank.get_tick_height(), + tick_height, entry_height, Arc::new(retransmit_socket), repair_socket, blob_fetch_receiver, - bank.leader_scheduler.clone(), + leader_scheduler.clone(), ); let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new( keypair, - vote_account_keypair, bank.clone(), cluster_info, blob_window_receiver, exit.clone(), + tick_height, entry_height, + leader_scheduler, ); let ledger_write_stage = LedgerWriteStage::new(ledger_path, ledger_entry_receiver, None); @@ -136,7 +139,7 @@ impl Tvu { self.exit.load(Ordering::Relaxed) } - pub fn exit(&self) { + pub fn exit(&self) -> () { self.exit.store(true, Ordering::Relaxed); } @@ -252,12 +255,7 @@ pub mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); let replicate_addr = target1.info.contact_info.tvu; - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_id, - ))); - let mut bank = Bank::new(&mint); - bank.leader_scheduler = leader_scheduler; - let bank = Arc::new(bank); + let bank = Arc::new(Bank::new(&mint)); //start cluster_info1 let mut cluster_info1 = ClusterInfo::new(target1.info.clone()).expect("ClusterInfo::new"); @@ -266,18 +264,20 @@ pub mod tests { let cref1 = Arc::new(RwLock::new(cluster_info1)); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); - let vote_account_keypair = Arc::new(Keypair::new()); let tvu = Tvu::new( Arc::new(target1_keypair), - vote_account_keypair, &bank, 0, + 0, cref1, dr_1.1, target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, None, + Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_id, + ))), ); let mut alice_ref_balance = starting_balance; diff --git a/src/vote_program.rs b/src/vote_program.rs deleted file mode 100644 index 40e0881688..0000000000 --- a/src/vote_program.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! Vote program -//! 
Receive and processes votes from validators - -use bincode::{deserialize, serialize}; -use byteorder::{ByteOrder, LittleEndian}; -use solana_sdk::account::Account; -use solana_sdk::pubkey::Pubkey; -use std; -use std::collections::VecDeque; -use transaction::Transaction; - -// Upper limit on the size of the Vote State -pub const MAX_STATE_SIZE: usize = 1024; - -// Maximum number of votes to keep around -const MAX_VOTE_HISTORY: usize = 32; - -#[derive(Debug, PartialEq)] -pub enum Error { - UserdataDeserializeFailure, - InvalidArguments, - InvalidUserdata, - UserdataTooSmall, -} -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "error") - } -} -pub type Result = std::result::Result; - -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct Vote { - // TODO: add signature of the state here as well - /// A vote for height tick_height - pub tick_height: u64, -} - -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] -pub enum VoteInstruction { - /// Register a new "vote account" to represent a particular validator in the Vote Contract, - /// and initialize the VoteState for this "vote account" - /// * Transaction::keys[0] - the validator id - /// * Transaction::keys[1] - the new "vote account" to be associated with the validator - /// identified by keys[0] for voting - RegisterAccount, - NewVote(Vote), -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct VoteProgram { - pub votes: VecDeque, - pub node_id: Pubkey, -} - -pub const VOTE_PROGRAM_ID: [u8; 32] = [ - 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -]; - -impl VoteProgram { - pub fn check_id(program_id: &Pubkey) -> bool { - program_id.as_ref() == VOTE_PROGRAM_ID - } - - pub fn id() -> Pubkey { - Pubkey::new(&VOTE_PROGRAM_ID) - } - - pub fn deserialize(input: &[u8]) -> Result { - let len = LittleEndian::read_u16(&input[0..2]) as usize; - - if len == 0 || input.len() < len + 1 { - Err(Error::InvalidUserdata) - } else { - deserialize(&input[2..=len + 1]).map_err(|err| { - error!("Unable to deserialize vote state: {:?}", err); - Error::InvalidUserdata - }) - } - } - - pub fn serialize(self: &VoteProgram, output: &mut [u8]) -> Result<()> { - let self_serialized = serialize(self).unwrap(); - - if output.len() + 2 < self_serialized.len() { - warn!( - "{} bytes required to serialize but only have {} bytes", - self_serialized.len(), - output.len() + 2, - ); - return Err(Error::UserdataTooSmall); - } - - let serialized_len = self_serialized.len() as u16; - LittleEndian::write_u16(&mut output[0..2], serialized_len); - output[2..=serialized_len as usize + 1].clone_from_slice(&self_serialized); - Ok(()) - } - - pub fn process_transaction( - tx: &Transaction, - instruction_index: usize, - accounts: &mut [&mut Account], - ) -> Result<()> { - match deserialize(tx.userdata(instruction_index)) { - Ok(VoteInstruction::RegisterAccount) => { - // TODO: a single validator could register multiple "vote accounts" - // which would clutter the "accounts" structure. 
- accounts[1].program_id = Self::id(); - - let mut vote_state = VoteProgram { - votes: VecDeque::new(), - node_id: *tx.from(), - }; - - vote_state.serialize(&mut accounts[1].userdata)?; - - Ok(()) - } - Ok(VoteInstruction::NewVote(vote)) => { - if !Self::check_id(&accounts[0].program_id) { - error!("accounts[0] is not assigned to the VOTE_PROGRAM"); - Err(Error::InvalidArguments)?; - } - - let mut vote_state = Self::deserialize(&accounts[0].userdata)?; - - // TODO: Integrity checks - // a) Verify the vote's bank hash matches what is expected - // b) Verify vote is older than previous votes - - // Only keep around the most recent MAX_VOTE_HISTORY votes - if vote_state.votes.len() == MAX_VOTE_HISTORY { - vote_state.votes.pop_front(); - } - - vote_state.votes.push_back(vote); - vote_state.serialize(&mut accounts[0].userdata)?; - - Ok(()) - } - Err(_) => { - info!( - "Invalid vote transaction userdata: {:?}", - tx.userdata(instruction_index) - ); - Err(Error::UserdataDeserializeFailure) - } - } - } -} diff --git a/src/vote_stage.rs b/src/vote_stage.rs index eee722b980..ce07fb11e5 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -2,90 +2,341 @@ use bank::Bank; use bincode::serialize; +use budget_transaction::BudgetTransaction; use cluster_info::ClusterInfo; use counter::Counter; use hash::Hash; +use influx_db_client as influxdb; use log::Level; +use metrics; use packet::SharedBlob; -use result::{Error, Result}; +use result::Result; use signature::Keypair; -use std::net::SocketAddr; +use solana_sdk::pubkey::Pubkey; +use std::result; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; use streamer::BlobSender; +use timing; use transaction::Transaction; -use vote_program::Vote; -use vote_transaction::VoteTransaction; pub const VOTE_TIMEOUT_MS: u64 = 1000; #[derive(Debug, PartialEq, Eq)] -pub enum VoteError { +enum VoteError { NoValidLastIdsToVoteOn, - NoLeader, - LeaderInfoNotFound, } -// TODO: Change voting to be on fixed tick intervals based on bank state pub fn create_new_signed_vote_blob( last_id: &Hash, - vote_account: &Keypair, - bank: &Arc, + keypair: &Keypair, cluster_info: &Arc>, ) -> Result { let shared_blob = SharedBlob::default(); - let tick_height = bank.get_tick_height(); - - let leader_tpu = get_leader_tpu(bank, cluster_info)?; - //TODO: doesn't seem like there is a synchronous call to get height and id - debug!("voting on {:?}", &last_id.as_ref()[..8]); - let vote = Vote { tick_height }; - let tx = Transaction::vote_new(&vote_account, vote, *last_id, 0); + let (vote, addr) = { + let mut wcluster_info = cluster_info.write().unwrap(); + //TODO: doesn't seem like there is a synchronous call to get height and id + debug!("voting on {:?}", &last_id.as_ref()[..8]); + wcluster_info.new_vote(*last_id) + }?; + let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0); { let mut blob = shared_blob.write().unwrap(); let bytes = serialize(&tx)?; let len = bytes.len(); blob.data[..len].copy_from_slice(&bytes); - blob.meta.set_addr(&leader_tpu); + blob.meta.set_addr(&addr); blob.meta.size = len; - }; - + } Ok(shared_blob) } -fn get_leader_tpu(bank: &Bank, cluster_info: &Arc>) -> Result { - let leader_id = { - if let Some(leader_id) = bank.get_current_leader() { - leader_id - } else { - return Err(Error::VoteError(VoteError::NoLeader)); - } - }; +fn get_last_id_to_vote_on( + id: &Pubkey, + ids: &[Hash], + bank: &Arc, + now: u64, + last_vote: &mut u64, + last_valid_validator_timestamp: &mut u64, +) -> result::Result<(Hash, u64), VoteError> { + let mut valid_ids = 
bank.count_valid_ids(&ids); + let super_majority_index = (2 * ids.len()) / 3; - let rcluster_info = cluster_info.read().unwrap(); - let leader_tpu = rcluster_info - .table - .get(&leader_id) - .map(|leader| leader.contact_info.tpu); + //TODO(anatoly): this isn't stake based voting + debug!( + "{}: valid_ids {}/{} {}", + id, + valid_ids.len(), + ids.len(), + super_majority_index, + ); - if let Some(leader_tpu) = leader_tpu { - Ok(leader_tpu) - } else { - Err(Error::VoteError(VoteError::LeaderInfoNotFound)) + metrics::submit( + influxdb::Point::new("vote_stage-peer_count") + .add_field("total_peers", influxdb::Value::Integer(ids.len() as i64)) + .add_field( + "valid_peers", + influxdb::Value::Integer(valid_ids.len() as i64), + ).to_owned(), + ); + + if valid_ids.len() > super_majority_index { + *last_vote = now; + + // Sort by timestamp + valid_ids.sort_by(|a, b| a.1.cmp(&b.1)); + + let last_id = ids[valid_ids[super_majority_index].0]; + return Ok((last_id, valid_ids[super_majority_index].1)); } + + if *last_valid_validator_timestamp != 0 { + metrics::submit( + influxdb::Point::new(&"leader-finality") + .add_field( + "duration_ms", + influxdb::Value::Integer((now - *last_valid_validator_timestamp) as i64), + ).to_owned(), + ); + } + + Err(VoteError::NoValidLastIdsToVoteOn) +} + +pub fn send_leader_vote( + id: &Pubkey, + keypair: &Keypair, + bank: &Arc, + cluster_info: &Arc>, + vote_blob_sender: &BlobSender, + last_vote: &mut u64, + last_valid_validator_timestamp: &mut u64, +) -> Result<()> { + let now = timing::timestamp(); + if now - *last_vote > VOTE_TIMEOUT_MS { + let ids: Vec<_> = cluster_info.read().unwrap().valid_last_ids(); + if let Ok((last_id, super_majority_timestamp)) = get_last_id_to_vote_on( + id, + &ids, + bank, + now, + last_vote, + last_valid_validator_timestamp, + ) { + if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) { + vote_blob_sender.send(vec![shared_blob])?; + let finality_ms = now - super_majority_timestamp; + + *last_valid_validator_timestamp = super_majority_timestamp; + debug!("{} leader_sent_vote finality: {} ms", id, finality_ms); + inc_new_counter_info!("vote_stage-leader_sent_vote", 1); + + bank.set_finality((now - *last_valid_validator_timestamp) as usize); + + metrics::submit( + influxdb::Point::new(&"leader-finality") + .add_field("duration_ms", influxdb::Value::Integer(finality_ms as i64)) + .to_owned(), + ); + } + } + } + Ok(()) } pub fn send_validator_vote( bank: &Arc, - vote_account: &Keypair, + keypair: &Arc, cluster_info: &Arc>, vote_blob_sender: &BlobSender, ) -> Result<()> { let last_id = bank.last_id(); + if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) { + inc_new_counter_info!("replicate-vote_sent", 1); - let shared_blob = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)?; - inc_new_counter_info!("replicate-vote_sent", 1); - vote_blob_sender.send(vec![shared_blob])?; - + vote_blob_sender.send(vec![shared_blob])?; + } Ok(()) } + +#[cfg(test)] +pub mod tests { + use super::*; + use bank::Bank; + use bincode::deserialize; + use budget_instruction::Vote; + use cluster_info::{ClusterInfo, NodeInfo}; + use entry::next_entry; + use hash::{hash, Hash}; + use logger; + use mint::Mint; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + use std::thread::sleep; + use std::time::Duration; + use system_transaction::SystemTransaction; + use transaction::Transaction; + + #[test] + fn test_send_leader_vote() { + logger::setup(); + + // create a mint/bank 
+ let mint = Mint::new(1000); + let bank = Arc::new(Bank::new(&mint)); + let hash0 = Hash::default(); + + // get a non-default hash last_id + let entry = next_entry(&hash0, 1, vec![]); + bank.register_entry_id(&entry.id); + + // Create a leader + let leader_data = NodeInfo::new_with_socketaddr(&"127.0.0.1:1234".parse().unwrap()); + let leader_pubkey = leader_data.id.clone(); + let mut leader_cluster_info = ClusterInfo::new(leader_data).unwrap(); + + // give the leader some tokens + let give_leader_tokens_tx = + Transaction::system_new(&mint.keypair(), leader_pubkey.clone(), 100, entry.id); + bank.process_transaction(&give_leader_tokens_tx).unwrap(); + + leader_cluster_info.set_leader(leader_pubkey); + + // Insert 7 agreeing validators / 3 disagreeing + // and votes for new last_id + for i in 0..10 { + let mut validator = + NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap()); + + let vote = Vote { + version: validator.version + 1, + contact_info_version: 1, + }; + + if i < 7 { + validator.ledger_state.last_id = entry.id; + } + + leader_cluster_info.insert(&validator); + trace!("validator id: {:?}", validator.id); + + leader_cluster_info.insert_vote(&validator.id, &vote, entry.id); + } + let leader = Arc::new(RwLock::new(leader_cluster_info)); + let (vote_blob_sender, vote_blob_receiver) = channel(); + let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1; + let mut last_valid_validator_timestamp = 0; + let res = send_leader_vote( + &mint.pubkey(), + &mint.keypair(), + &bank, + &leader, + &vote_blob_sender, + &mut last_vote, + &mut last_valid_validator_timestamp, + ); + trace!("vote result: {:?}", res); + assert!(res.is_ok()); + let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500)); + trace!("vote_blob: {:?}", vote_blob); + + // leader shouldn't vote yet, not enough votes + assert!(vote_blob.is_err()); + + // add two more nodes and see that it succeeds + for i in 0..2 { + let mut validator = + NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap()); + + let vote = Vote { + version: validator.version + 1, + contact_info_version: 1, + }; + + validator.ledger_state.last_id = entry.id; + + leader.write().unwrap().insert(&validator); + trace!("validator id: {:?}", validator.id); + + leader + .write() + .unwrap() + .insert_vote(&validator.id, &vote, entry.id); + } + + last_vote = timing::timestamp() - VOTE_TIMEOUT_MS - 1; + let res = send_leader_vote( + &Pubkey::default(), + &mint.keypair(), + &bank, + &leader, + &vote_blob_sender, + &mut last_vote, + &mut last_valid_validator_timestamp, + ); + trace!("vote result: {:?}", res); + assert!(res.is_ok()); + let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500)); + trace!("vote_blob: {:?}", vote_blob); + + // leader should vote now + assert!(vote_blob.is_ok()); + + // vote should be valid + let blob = &vote_blob.unwrap()[0]; + let tx = deserialize(&(blob.read().unwrap().data)).unwrap(); + assert_eq!(bank.process_transaction(&tx), Ok(())); + } + + #[test] + fn test_get_last_id_to_vote_on() { + logger::setup(); + + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let mut last_vote = 0; + let mut last_valid_validator_timestamp = 0; + + // generate 10 last_ids, register 6 with the bank + let ids: Vec<_> = (0..10) + .map(|i| { + let last_id = hash(&serialize(&i).unwrap()); // Unique hash + if i < 6 { + bank.register_entry_id(&last_id); + } + // sleep to get a different timestamp in the bank + sleep(Duration::from_millis(1)); + 
last_id + }).collect(); + + // see that we fail to have 2/3rds consensus + assert!( + get_last_id_to_vote_on( + &Pubkey::default(), + &ids, + &bank, + 0, + &mut last_vote, + &mut last_valid_validator_timestamp + ).is_err() + ); + + // register another, see passing + bank.register_entry_id(&ids[6]); + + let res = get_last_id_to_vote_on( + &Pubkey::default(), + &ids, + &bank, + 0, + &mut last_vote, + &mut last_valid_validator_timestamp, + ); + if let Ok((hash, timestamp)) = res { + assert!(hash == ids[6]); + assert!(timestamp != 0); + } else { + assert!(false, "get_last_id returned error!: {:?}", res); + } + } +} diff --git a/src/vote_transaction.rs b/src/vote_transaction.rs deleted file mode 100644 index fe5edb835a..0000000000 --- a/src/vote_transaction.rs +++ /dev/null @@ -1,85 +0,0 @@ -//! The `vote_transaction` module provides functionality for creating vote transactions. - -use bincode::{deserialize, serialize}; -use hash::Hash; -use signature::Keypair; -use solana_sdk::pubkey::Pubkey; -use system_transaction::SystemTransaction; -use transaction::Transaction; -use vote_program::{Vote, VoteInstruction, VoteProgram, MAX_STATE_SIZE}; - -pub trait VoteTransaction { - fn vote_new(vote_account: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self; - fn vote_account_new( - validator_id: &Keypair, - new_vote_account_id: Pubkey, - last_id: Hash, - num_tokens: i64, - ) -> Self; - fn vote_account_register( - validator_id: &Keypair, - vote_account_id: Pubkey, - last_id: Hash, - fee: i64, - ) -> Self; - fn get_votes(&self) -> Vec<(Pubkey, Vote, Hash)>; -} - -impl VoteTransaction for Transaction { - fn vote_new(vote_account: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self { - let instruction = VoteInstruction::NewVote(vote); - let userdata = serialize(&instruction).expect("serialize instruction"); - Transaction::new(vote_account, &[], VoteProgram::id(), userdata, last_id, fee) - } - - fn vote_account_new( - validator_id: &Keypair, - new_vote_account_id: Pubkey, - last_id: Hash, - num_tokens: i64, - ) -> Self { - Transaction::system_create( - validator_id, - new_vote_account_id, - last_id, - num_tokens, - MAX_STATE_SIZE as u64, - VoteProgram::id(), - 0, - ) - } - - fn vote_account_register( - validator_id: &Keypair, - vote_account_id: Pubkey, - last_id: Hash, - fee: i64, - ) -> Self { - let register_tx = VoteInstruction::RegisterAccount; - let userdata = serialize(®ister_tx).unwrap(); - Transaction::new( - validator_id, - &[vote_account_id], - VoteProgram::id(), - userdata, - last_id, - fee, - ) - } - - fn get_votes(&self) -> Vec<(Pubkey, Vote, Hash)> { - let mut votes = vec![]; - for i in 0..self.instructions.len() { - let tx_program_id = self.program_id(i); - if VoteProgram::check_id(&tx_program_id) { - if let Ok(Some(VoteInstruction::NewVote(vote))) = deserialize(&self.userdata(i)) { - votes.push((self.account_keys[0], vote, self.last_id)) - } - } - } - votes - } -} - -#[cfg(test)] -mod tests {} diff --git a/src/wallet.rs b/src/wallet.rs index c82525d923..af7abb6aeb 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -780,7 +780,6 @@ mod tests { use signature::{read_keypair, read_pkcs8, Keypair, KeypairUtil}; use std::fs::remove_dir_all; use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; #[test] fn test_wallet_parse_command() { @@ -1075,11 +1074,11 @@ mod tests { #[ignore] fn test_wallet_process_command() { let (alice, ledger_path) = create_tmp_genesis("wallet_process_command", 10_000_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = 
Keypair::new().pubkey(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1087,14 +1086,8 @@ mod tests { let mut config = WalletConfig::default(); let rpc_port = 12345; // Needs to be distinct known number to not conflict with other tests - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, 0, @@ -1103,6 +1096,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1158,10 +1152,10 @@ mod tests { #[test] fn test_wallet_request_airdrop() { let (alice, ledger_path) = create_tmp_genesis("wallet_request_airdrop", 10_000_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); @@ -1169,15 +1163,8 @@ mod tests { let genesis_entries = &alice.create_entries(); let entry_height = genesis_entries.len() as u64; - - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, entry_height, @@ -1186,6 +1173,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1239,11 +1227,11 @@ mod tests { #[ignore] fn test_wallet_timestamp_tx() { let (alice, ledger_path) = create_tmp_genesis("wallet_timestamp_tx", 10_000_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1253,14 +1241,8 @@ mod tests { let mut config_witness = WalletConfig::default(); let rpc_port = 13579; // Needs to be distinct known number to not conflict with other tests - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, 0, @@ -1269,6 +1251,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1366,9 +1349,9 @@ mod tests { #[ignore] fn test_wallet_witness_tx() { let (alice, ledger_path) = create_tmp_genesis("wallet_witness_tx", 10_000_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = 
Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1378,14 +1361,8 @@ mod tests { let mut config_witness = WalletConfig::default(); let rpc_port = 11223; // Needs to be distinct known number to not conflict with other tests - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, 0, @@ -1394,6 +1371,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1489,9 +1467,9 @@ mod tests { #[ignore] fn test_wallet_cancel_tx() { let (alice, ledger_path) = create_tmp_genesis("wallet_cancel_tx", 10_000_000); - let mut bank = Bank::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1501,14 +1479,8 @@ mod tests { let mut config_witness = WalletConfig::default(); let rpc_port = 13456; // Needs to be distinct known number to not conflict with other tests - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, - vote_account_keypair, bank, 0, 0, @@ -1517,6 +1489,7 @@ mod tests { None, &ledger_path, false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); diff --git a/src/window.rs b/src/window.rs index 970354afa9..d5f3c97c1e 100644 --- a/src/window.rs +++ b/src/window.rs @@ -127,8 +127,8 @@ impl WindowUtil for Window { // 2) We are on the border between seed_rotation_intervals, so the // schedule won't be known until the entry on that cusp is received // by the replicate stage (which comes after this stage). Hence, the next - // leader at the beginning of that next epoch will not know they are the - // leader until they receive that last "cusp" entry. The leader also won't ask for repairs + // leader at the beginning of that next epoch will not know he is the + // leader until he receives that last "cusp" entry. He also won't ask for repairs // for that entry because "is_next_leader" won't be set here. In this case, // everybody will be blocking waiting for that "cusp" entry instead of repairing, // until the leader hits "times" >= the max times in calculate_max_repair(). 
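The window.rs hunk above restores the description of a genuinely subtle liveness caveat: at a seed_rotation_interval boundary, the upcoming leader cannot know it is next until the "cusp" entry has been replayed, so it neither claims leadership nor requests repairs for that entry. As a hedged sketch (the function names and u64 ids below are illustrative stand-ins, not this crate's API), the repair decision reduces to a three-valued lookup in which an unknown schedule must degrade to "do not repair":

// Stand-in for the leader schedule: (tick_height, leader_id) pairs. None
// models the "cusp" case above, where the schedule past `tick` has not been
// computed yet because the seeding entry has not been replayed.
fn next_leader_known(schedule: &[(u64, u64)], tick: u64, my_id: u64) -> Option<bool> {
    schedule
        .iter()
        .find(|(t, _)| *t == tick)
        .map(|(_, leader)| *leader == my_id)
}

fn should_request_repair(schedule: &[(u64, u64)], tick: u64, my_id: u64) -> bool {
    // Only a node that knows it is the next leader repairs past the received
    // window; an unknown schedule falls through to false, so every node
    // blocks until the cusp entry arrives or the current leader retries past
    // calculate_max_repair's threshold.
    next_leader_known(schedule, tick, my_id).unwrap_or(false)
}

fn main() {
    let schedule = [(100, 7), (200, 9)];
    assert!(should_request_repair(&schedule, 200, 9));
    assert!(!should_request_repair(&schedule, 300, 9)); // cusp: schedule unknown
}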
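Stepping back to the vote_stage tests restored at the top of this section: both test_send_leader_vote and test_get_last_id_to_vote_on hinge on a strict greater-than-two-thirds super-majority. A minimal sketch of that predicate (the name is illustrative; the crate computes this inside vote_stage):

// Strictly more than two thirds of `total` must agree: 6 of 10 fails and
// 7 of 10 passes, matching test_get_last_id_to_vote_on.
fn has_supermajority(agreeing: usize, total: usize) -> bool {
    3 * agreeing > 2 * total
}

The 7-agreeing-of-10 failure, followed by success once two more agreeing validators are inserted in test_send_leader_vote, is consistent with the leader's own node being counted in the total: 3 * 7 = 21 does not exceed 2 * 11 = 22, while 3 * 9 = 27 exceeds 2 * 13 = 26.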
diff --git a/tests/multinode.rs b/tests/multinode.rs index a68d35ca05..03606ff18c 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -109,7 +109,7 @@ fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec { fn test_multi_node_ledger_window() -> result::Result<()> { logger::setup(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); @@ -136,7 +136,6 @@ fn test_multi_node_ledger_window() -> result::Result<()> { leader, &leader_ledger_path, leader_keypair, - Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -149,7 +148,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // start up another validator from zero, converge and then check // balances - let keypair = Arc::new(Keypair::new()); + let keypair = Keypair::new(); let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); @@ -157,7 +156,6 @@ fn test_multi_node_ledger_window() -> result::Result<()> { validator, &zero_ledger_path, keypair, - Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -208,7 +206,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { logger::setup(); const N: usize = 5; trace!("test_multi_node_validator_catchup_from_zero"); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); @@ -229,7 +227,6 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { leader, &leader_ledger_path, leader_keypair, - Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -242,7 +239,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { let mut nodes = vec![server]; for _ in 0..N { - let keypair = Arc::new(Keypair::new()); + let keypair = Keypair::new(); let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger( @@ -261,7 +258,6 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { validator, &ledger_path, keypair, - Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -292,13 +288,12 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { success = 0; // start up another validator from zero, converge and then check everyone's // balances - let keypair = Arc::new(Keypair::new()); + let keypair = Keypair::new(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let val = Fullnode::new( validator, &zero_ledger_path, keypair, - Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -352,7 +347,7 @@ fn test_multi_node_basic() { logger::setup(); const N: usize = 5; trace!("test_multi_node_basic"); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let 
leader_data = leader.info.clone(); @@ -365,7 +360,6 @@ fn test_multi_node_basic() { leader, &leader_ledger_path, leader_keypair, - Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -378,7 +372,7 @@ fn test_multi_node_basic() { let mut nodes = vec![server]; for _ in 0..N { - let keypair = Arc::new(Keypair::new()); + let keypair = Keypair::new(); let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic"); @@ -394,7 +388,6 @@ fn test_multi_node_basic() { validator, &ledger_path, keypair, - Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -433,7 +426,7 @@ fn test_multi_node_basic() { #[ignore] fn test_boot_validator_from_file() -> result::Result<()> { logger::setup(); - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = Keypair::new().pubkey(); @@ -446,7 +439,6 @@ fn test_boot_validator_from_file() -> result::Result<()> { leader, &leader_ledger_path, leader_keypair, - Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -458,7 +450,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap(); assert_eq!(leader_balance, 1000); - let keypair = Arc::new(Keypair::new()); + let keypair = Keypair::new(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file"); @@ -467,7 +459,6 @@ fn test_boot_validator_from_file() -> result::Result<()> { validator, &ledger_path, keypair, - Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -486,14 +477,13 @@ fn test_boot_validator_from_file() -> result::Result<()> { } fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) { - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_fullnode = Fullnode::new( leader, &ledger_path, leader_keypair, - Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_data.id), @@ -542,7 +532,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { let (leader_data, leader_fullnode) = create_leader(&ledger_path); // start validator from old ledger - let keypair = Arc::new(Keypair::new()); + let keypair = Keypair::new(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); @@ -550,7 +540,6 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { validator, &stale_ledger_path, keypair, - Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_data.id), @@ -599,7 +588,7 @@ fn test_multi_node_dynamic_network() { Err(_) => 120, }; - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = 
Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = Keypair::new().pubkey(); @@ -615,7 +604,6 @@ fn test_multi_node_dynamic_network() { leader, &leader_ledger_path, leader_keypair, - Arc::new(Keypair::new()), None, true, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -687,8 +675,7 @@ fn test_multi_node_dynamic_network() { let val = Fullnode::new( validator, &ledger_path, - Arc::new(keypair), - Arc::new(Keypair::new()), + keypair, Some(leader_data.contact_info.ncp), true, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -793,7 +780,7 @@ fn test_leader_to_validator_transition() { let validator_keypair = Keypair::new(); // Create the leader node information - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = leader_node.info.clone(); @@ -814,7 +801,7 @@ fn test_leader_to_validator_transition() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - let (bootstrap_entries, _) = + let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); ledger_writer.write_entries(&bootstrap_entries).unwrap(); @@ -832,7 +819,6 @@ fn test_leader_to_validator_transition() { leader_node, &leader_ledger_path, leader_keypair, - Arc::new(Keypair::new()), Some(leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -885,7 +871,7 @@ fn test_leader_to_validator_transition() { _ => panic!("Expected reason for exit to be leader rotation"), } - // Query newly transitioned validator to make sure that they have the proper balances in + // Query newly transitioned validator to make sure that he has the proper balances in // the after the transitions let mut leader_client = mk_client(&leader_info); @@ -897,10 +883,8 @@ fn test_leader_to_validator_transition() { // Check the ledger to make sure it's the right height, we should've // transitioned after tick_height == bootstrap_height - let (_, tick_height, _, _) = Fullnode::new_bank_from_ledger( - &leader_ledger_path, - Arc::new(RwLock::new(LeaderScheduler::default())), - ); + let (_, tick_height, _, _) = + Fullnode::new_bank_from_ledger(&leader_ledger_path, &mut LeaderScheduler::default()); assert_eq!(tick_height, bootstrap_height); @@ -919,12 +903,12 @@ fn test_leader_validator_basic() { let bob_pubkey = Keypair::new().pubkey(); // Create the leader node information - let leader_keypair = Arc::new(Keypair::new()); + let leader_keypair = Keypair::new(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = leader_node.info.clone(); // Create the validator node information - let validator_keypair = Arc::new(Keypair::new()); + let validator_keypair = Keypair::new(); let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); // Make a common mint and a genesis entry for both leader + validator ledgers @@ -947,7 +931,7 @@ fn test_leader_validator_basic() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - let (active_set_entries, vote_account_keypair) = + let active_set_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); 
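// A hedged sketch (names are illustrative, not this crate's API) of the tick
// arithmetic behind the rotation points this test steers toward: the
// bootstrap leader owns ticks [0, bootstrap_height), and a new schedule is
// seeded from the active set every seed_rotation_interval ticks thereafter,
// which is why active-set entries are written above before the nodes start.
fn next_schedule_boundary(tick: u64, bootstrap_height: u64, interval: u64) -> u64 {
    if tick < bootstrap_height {
        bootstrap_height
    } else {
        bootstrap_height + ((tick - bootstrap_height) / interval + 1) * interval
    }
}
// With bootstrap_height = 1000 and interval = 500:
// next_schedule_boundary(999, 1000, 500) == 1000, and
// next_schedule_boundary(1000, 1000, 500) == 1500.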
ledger_writer.write_entries(&active_set_entries).unwrap(); @@ -962,23 +946,21 @@ fn test_leader_validator_basic() { Some(bootstrap_height), ); - // Start the validator node - let mut validator = Fullnode::new( - validator_node, - &validator_ledger_path, - validator_keypair, - Arc::new(vote_account_keypair), - Some(leader_info.contact_info.ncp), - false, - LeaderScheduler::new(&leader_scheduler_config), - ); - // Start the leader fullnode let mut leader = Fullnode::new( leader_node, &leader_ledger_path, leader_keypair, - Arc::new(Keypair::new()), + Some(leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + // Start the validator node + let mut validator = Fullnode::new( + validator_node, + &validator_ledger_path, + validator_keypair, Some(leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1017,7 +999,7 @@ fn test_leader_validator_basic() { _ => panic!("Expected reason for exit to be leader rotation"), } - // Query newly transitioned validator to make sure they have the proper balances + // Query newly transitioned validator to make sure that he has the proper balances // in the bank after the transitions let mut leader_client = mk_client(&leader_info); @@ -1089,7 +1071,7 @@ fn test_dropped_handoff_recovery() { logger::setup(); // Create the bootstrap leader node information - let bootstrap_leader_keypair = Arc::new(Keypair::new()); + let bootstrap_leader_keypair = Keypair::new(); let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); let bootstrap_leader_info = bootstrap_leader_node.info.clone(); @@ -1104,17 +1086,17 @@ fn test_dropped_handoff_recovery() { .id; // Create the validator keypair that will be the next leader in line - let next_leader_keypair = Arc::new(Keypair::new()); + let next_leader_keypair = Keypair::new(); // Create a common ledger with entries in the beginning that will add only // the "next_leader" validator to the active set for leader election, guaranteeing - // they are the next leader after bootstrap_height + // he is the next leader after bootstrap_height let mut ledger_paths = Vec::new(); ledger_paths.push(bootstrap_leader_ledger_path.clone()); - // Make the entries to give the next_leader validator some stake so that they will be in + // Make the entries to give the next_leader validator some stake so that he will be in // leader election active set - let (active_set_entries, vote_account_keypair) = + let active_set_entries = make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id, 0); // Write the entries @@ -1149,7 +1131,6 @@ fn test_dropped_handoff_recovery() { bootstrap_leader_node, &bootstrap_leader_ledger_path, bootstrap_leader_keypair, - Arc::new(Keypair::new()), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1159,7 +1140,7 @@ fn test_dropped_handoff_recovery() { // Start up the validators other than the "next_leader" validator for _ in 0..(N - 1) { - let kp = Arc::new(Keypair::new()); + let kp = Keypair::new(); let validator_ledger_path = tmp_copy_ledger( &bootstrap_leader_ledger_path, "test_dropped_handoff_recovery", @@ -1171,7 +1152,6 @@ fn test_dropped_handoff_recovery() { validator_node, &validator_ledger_path, kp, - Arc::new(Keypair::new()), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1196,7 +1176,6 @@ fn test_dropped_handoff_recovery() { next_leader_node, &next_leader_ledger_path, 
next_leader_keypair, - Arc::new(vote_account_keypair), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1268,11 +1247,10 @@ fn test_full_leader_validator_network() { let mut ledger_paths = Vec::new(); ledger_paths.push(bootstrap_leader_ledger_path.clone()); - let mut vote_account_keypairs = VecDeque::new(); for node_keypair in node_keypairs.iter() { - // Make entries to give each node some stake so that they will be in the + // Make entries to give the validator some stake so that he will be in // leader election active set - let (bootstrap_entries, vote_account_keypair) = make_active_set_entries( + let bootstrap_entries = make_active_set_entries( node_keypair, &mint.keypair(), &last_entry_id, @@ -1280,8 +1258,6 @@ fn test_full_leader_validator_network() { 0, ); - vote_account_keypairs.push_back(vote_account_keypair); - // Write the entries let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); last_entry_id = bootstrap_entries @@ -1310,8 +1286,7 @@ fn test_full_leader_validator_network() { let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, - Arc::new(node_keypairs.pop_front().unwrap()), - Arc::new(vote_account_keypairs.pop_front().unwrap()), + node_keypairs.pop_front().unwrap(), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1336,8 +1311,7 @@ fn test_full_leader_validator_network() { let validator = Arc::new(RwLock::new(Fullnode::new( validator_node, &validator_ledger_path, - Arc::new(kp), - Arc::new(vote_account_keypairs.pop_front().unwrap()), + kp, Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1364,7 +1338,7 @@ fn test_full_leader_validator_network() { num_reached_target_height = 0; for n in nodes.iter() { let node_lock = n.read().unwrap(); - let ls_lock = node_lock.get_leader_scheduler(); + let ls_lock = &node_lock.leader_scheduler; if let Some(sh) = ls_lock.read().unwrap().last_seed_height { if sh >= target_height { num_reached_target_height += 1;
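To close the section: the final hunk swaps the removed get_leader_scheduler() accessor for a direct read of the leader_scheduler field when polling nodes for convergence. A self-contained sketch of that poll, with stub types standing in for this crate's Fullnode and LeaderScheduler (the real test holds Arc<RwLock<Fullnode>> handles and compares last_seed_height against target_height):

use std::sync::{Arc, RwLock};

// Stubs for the real types; only the fields the poll touches are modeled.
struct LeaderScheduler {
    last_seed_height: Option<u64>,
}

struct Fullnode {
    leader_scheduler: Arc<RwLock<LeaderScheduler>>,
}

// Count how many nodes have seeded a leader schedule at or past
// `target_height`, mirroring the loop at the end of
// test_full_leader_validator_network.
fn num_reached(nodes: &[Arc<RwLock<Fullnode>>], target_height: u64) -> usize {
    nodes
        .iter()
        .filter(|n| {
            let node = n.read().unwrap();
            let last_seed_height = node.leader_scheduler.read().unwrap().last_seed_height;
            last_seed_height.map_or(false, |sh| sh >= target_height)
        })
        .count()
}

fn main() {
    let node = Fullnode {
        leader_scheduler: Arc::new(RwLock::new(LeaderScheduler {
            last_seed_height: Some(1_000),
        })),
    };
    let nodes = vec![Arc::new(RwLock::new(node))];
    assert_eq!(num_reached(&nodes, 1_000), 1);
}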