Compute finality computation in new ComputeLeaderFinalityService (#1652)
* Move finality computation into a service run from the banking stage, ComputeLeaderFinalityService * Change last ids nth to tick height, remove separate tick height from bank
This commit is contained in:
		
							
								
								
									
										114
									
								
								src/bank.rs
									
									
									
									
									
								
							
							
						
						
									
										114
									
								
								src/bank.rs
									
									
									
									
									
								
							@@ -123,22 +123,22 @@ pub struct LastIds {
 | 
			
		||||
    /// values are so old that the `last_id` has been pulled out of the queue.
 | 
			
		||||
 | 
			
		||||
    /// updated whenever an id is registered
 | 
			
		||||
    nth: isize,
 | 
			
		||||
    tick_height: u64,
 | 
			
		||||
 | 
			
		||||
    /// last id to be registered
 | 
			
		||||
    last: Option<Hash>,
 | 
			
		||||
 | 
			
		||||
    /// Mapping of hashes to signature sets along with timestamp and what nth
 | 
			
		||||
    /// Mapping of hashes to signature sets along with timestamp and what tick_height
 | 
			
		||||
    /// was when the id was added. The bank uses this data to
 | 
			
		||||
    /// reject transactions with signatures it's seen before and to reject
 | 
			
		||||
    /// transactions that are too old (nth is too small)
 | 
			
		||||
    sigs: HashMap<Hash, (SignatureStatusMap, u64, isize)>,
 | 
			
		||||
    /// transactions that are too old (tick_height is too small)
 | 
			
		||||
    sigs: HashMap<Hash, (SignatureStatusMap, u64, u64)>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for LastIds {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        LastIds {
 | 
			
		||||
            nth: 0,
 | 
			
		||||
            tick_height: 0,
 | 
			
		||||
            last: None,
 | 
			
		||||
            sigs: HashMap::new(),
 | 
			
		||||
        }
 | 
			
		||||
@@ -172,9 +172,6 @@ pub struct Bank {
 | 
			
		||||
    /// Tracks and updates the leader schedule based on the votes and account stakes
 | 
			
		||||
    /// processed by the bank
 | 
			
		||||
    pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
 | 
			
		||||
 | 
			
		||||
    // The number of ticks that have elapsed since genesis
 | 
			
		||||
    tick_height: Mutex<u64>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for Bank {
 | 
			
		||||
@@ -188,7 +185,6 @@ impl Default for Bank {
 | 
			
		||||
            account_subscriptions: RwLock::new(HashMap::new()),
 | 
			
		||||
            signature_subscriptions: RwLock::new(HashMap::new()),
 | 
			
		||||
            leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
 | 
			
		||||
            tick_height: Mutex::new(0),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -287,7 +283,7 @@ impl Bank {
 | 
			
		||||
        let entry = last_ids.sigs.get(&entry_id);
 | 
			
		||||
 | 
			
		||||
        match entry {
 | 
			
		||||
            Some(entry) => ((last_ids.nth - entry.2) as usize) < max_age,
 | 
			
		||||
            Some(entry) => ((last_ids.tick_height - entry.2) as usize) < max_age,
 | 
			
		||||
            _ => false,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@@ -298,7 +294,7 @@ impl Bank {
 | 
			
		||||
        sig: &Signature,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        if let Some(entry) = last_ids.sigs.get_mut(last_id) {
 | 
			
		||||
            if ((last_ids.nth - entry.2) as usize) <= MAX_ENTRY_IDS {
 | 
			
		||||
            if ((last_ids.tick_height - entry.2) as usize) < MAX_ENTRY_IDS {
 | 
			
		||||
                return Self::reserve_signature(&mut entry.0, sig);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@@ -321,7 +317,7 @@ impl Bank {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn update_signature_status_with_last_id(
 | 
			
		||||
        last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64, isize)>,
 | 
			
		||||
        last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64, u64)>,
 | 
			
		||||
        signature: &Signature,
 | 
			
		||||
        result: &Result<()>,
 | 
			
		||||
        last_id: &Hash,
 | 
			
		||||
@@ -356,6 +352,16 @@ impl Bank {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Maps a tick height to a timestamp
 | 
			
		||||
    fn tick_height_to_timestamp(last_ids: &LastIds, tick_height: u64) -> Option<u64> {
 | 
			
		||||
        for entry in last_ids.sigs.values() {
 | 
			
		||||
            if entry.2 == tick_height {
 | 
			
		||||
                return Some(entry.1);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        None
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Look through the last_ids and find all the valid ids
 | 
			
		||||
    /// This is batched to avoid holding the lock for a significant amount of time
 | 
			
		||||
    ///
 | 
			
		||||
@@ -366,7 +372,7 @@ impl Bank {
 | 
			
		||||
        let mut ret = Vec::new();
 | 
			
		||||
        for (i, id) in ids.iter().enumerate() {
 | 
			
		||||
            if let Some(entry) = last_ids.sigs.get(id) {
 | 
			
		||||
                if ((last_ids.nth - entry.2) as usize) <= MAX_ENTRY_IDS {
 | 
			
		||||
                if ((last_ids.tick_height - entry.2) as usize) < MAX_ENTRY_IDS {
 | 
			
		||||
                    ret.push((i, entry.1));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
@@ -374,6 +380,42 @@ impl Bank {
 | 
			
		||||
        ret
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Looks through a list of tick heights and stakes, and finds the latest
 | 
			
		||||
    /// tick that has achieved finality
 | 
			
		||||
    pub fn get_finality_timestamp(
 | 
			
		||||
        &self,
 | 
			
		||||
        ticks_and_stakes: &mut [(u64, i64)],
 | 
			
		||||
        supermajority_stake: i64,
 | 
			
		||||
    ) -> Option<u64> {
 | 
			
		||||
        // Sort by tick height
 | 
			
		||||
        ticks_and_stakes.sort_by(|a, b| a.0.cmp(&b.0));
 | 
			
		||||
        let last_ids = self.last_ids.read().unwrap();
 | 
			
		||||
        let current_tick_height = last_ids.tick_height;
 | 
			
		||||
        let mut total = 0;
 | 
			
		||||
        for (tick_height, stake) in ticks_and_stakes.iter() {
 | 
			
		||||
            if ((current_tick_height - tick_height) as usize) < MAX_ENTRY_IDS {
 | 
			
		||||
                total += stake;
 | 
			
		||||
                if total > supermajority_stake {
 | 
			
		||||
                    return Self::tick_height_to_timestamp(&last_ids, *tick_height);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        None
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Tell the bank about the genesis Entry IDs.
 | 
			
		||||
    pub fn register_genesis_entry(&self, last_id: &Hash) {
 | 
			
		||||
        let mut last_ids = self.last_ids.write().unwrap();
 | 
			
		||||
 | 
			
		||||
        last_ids
 | 
			
		||||
            .sigs
 | 
			
		||||
            .insert(*last_id, (HashMap::new(), timestamp(), 0));
 | 
			
		||||
 | 
			
		||||
        last_ids.last = Some(*last_id);
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!("bank-register_genesis_entry_id-registered", 1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Tell the bank which Entry IDs exist on the ledger. This function
 | 
			
		||||
    /// assumes subsequent calls correspond to later entries, and will boot
 | 
			
		||||
    /// the oldest ones once its internal cache is full. Once boot, the
 | 
			
		||||
@@ -381,21 +423,22 @@ impl Bank {
 | 
			
		||||
    pub fn register_entry_id(&self, last_id: &Hash) {
 | 
			
		||||
        let mut last_ids = self.last_ids.write().unwrap();
 | 
			
		||||
 | 
			
		||||
        let last_ids_nth = last_ids.nth;
 | 
			
		||||
        last_ids.tick_height += 1;
 | 
			
		||||
        let last_ids_tick_height = last_ids.tick_height;
 | 
			
		||||
 | 
			
		||||
        // this clean up can be deferred until sigs gets larger
 | 
			
		||||
        //  because we verify entry.nth every place we check for validity
 | 
			
		||||
        // because we verify entry.tick_height every place we check for validity
 | 
			
		||||
        if last_ids.sigs.len() >= MAX_ENTRY_IDS {
 | 
			
		||||
            last_ids
 | 
			
		||||
                .sigs
 | 
			
		||||
                .retain(|_, (_, _, nth)| ((last_ids_nth - *nth) as usize) <= MAX_ENTRY_IDS);
 | 
			
		||||
            last_ids.sigs.retain(|_, (_, _, tick_height)| {
 | 
			
		||||
                ((last_ids_tick_height - *tick_height) as usize) < MAX_ENTRY_IDS
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        last_ids
 | 
			
		||||
            .sigs
 | 
			
		||||
            .insert(*last_id, (HashMap::new(), timestamp(), last_ids_nth));
 | 
			
		||||
        last_ids.sigs.insert(
 | 
			
		||||
            *last_id,
 | 
			
		||||
            (HashMap::new(), timestamp(), last_ids_tick_height),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        last_ids.nth += 1;
 | 
			
		||||
        last_ids.last = Some(*last_id);
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!("bank-register_entry_id-registered", 1);
 | 
			
		||||
@@ -412,6 +455,7 @@ impl Bank {
 | 
			
		||||
            Ok(_) => Ok(()),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn lock_account(
 | 
			
		||||
        account_locks: &mut HashSet<Pubkey>,
 | 
			
		||||
        keys: &[Pubkey],
 | 
			
		||||
@@ -908,17 +952,12 @@ impl Bank {
 | 
			
		||||
                result?;
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            let tick_height = {
 | 
			
		||||
                let mut tick_height_lock = self.tick_height.lock().unwrap();
 | 
			
		||||
                *tick_height_lock += 1;
 | 
			
		||||
                *tick_height_lock
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            self.register_entry_id(&entry.id);
 | 
			
		||||
            let tick_height = self.last_ids.read().unwrap().tick_height;
 | 
			
		||||
            self.leader_scheduler
 | 
			
		||||
                .write()
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .update_height(tick_height, self);
 | 
			
		||||
            self.register_entry_id(&entry.id);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
@@ -963,7 +1002,6 @@ impl Bank {
 | 
			
		||||
                // if its a tick, execute the group and register the tick
 | 
			
		||||
                self.par_execute_entries(&mt_group)?;
 | 
			
		||||
                self.register_entry_id(&entry.id);
 | 
			
		||||
                *self.tick_height.lock().unwrap() += 1;
 | 
			
		||||
                mt_group = vec![];
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
@@ -1111,8 +1149,8 @@ impl Bank {
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        self.register_entry_id(&entry0.id);
 | 
			
		||||
        self.register_entry_id(&entry1.id);
 | 
			
		||||
        self.register_genesis_entry(&entry0.id);
 | 
			
		||||
        self.register_genesis_entry(&entry1.id);
 | 
			
		||||
 | 
			
		||||
        Ok(self.process_ledger_blocks(entry1.id, 2, entries)?)
 | 
			
		||||
    }
 | 
			
		||||
@@ -1148,6 +1186,12 @@ impl Bank {
 | 
			
		||||
            .unwrap_or(0)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// TODO: Need to implement a real staking contract to hold node stake.
 | 
			
		||||
    /// Right now this just gets the account balances. See github issue #1655.
 | 
			
		||||
    pub fn get_stake(&self, pubkey: &Pubkey) -> i64 {
 | 
			
		||||
        self.get_balance(pubkey)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
 | 
			
		||||
        let accounts = self
 | 
			
		||||
            .accounts
 | 
			
		||||
@@ -1233,12 +1277,12 @@ impl Bank {
 | 
			
		||||
 | 
			
		||||
    pub fn get_current_leader(&self) -> Option<Pubkey> {
 | 
			
		||||
        let ls_lock = self.leader_scheduler.read().unwrap();
 | 
			
		||||
        let tick_height = self.tick_height.lock().unwrap();
 | 
			
		||||
        ls_lock.get_scheduled_leader(*tick_height)
 | 
			
		||||
        let tick_height = self.last_ids.read().unwrap().tick_height;
 | 
			
		||||
        ls_lock.get_scheduled_leader(tick_height)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn get_tick_height(&self) -> u64 {
 | 
			
		||||
        *self.tick_height.lock().unwrap()
 | 
			
		||||
        self.last_ids.read().unwrap().tick_height
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn check_account_subscriptions(&self, pubkey: &Pubkey, account: &Account) {
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,7 @@
 | 
			
		||||
 | 
			
		||||
use bank::Bank;
 | 
			
		||||
use bincode::deserialize;
 | 
			
		||||
use compute_leader_finality_service::ComputeLeaderFinalityService;
 | 
			
		||||
use counter::Counter;
 | 
			
		||||
use entry::Entry;
 | 
			
		||||
use hash::Hash;
 | 
			
		||||
@@ -38,6 +39,7 @@ pub struct BankingStage {
 | 
			
		||||
    /// Handle to the stage's thread.
 | 
			
		||||
    bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>>,
 | 
			
		||||
    poh_service: PohService,
 | 
			
		||||
    compute_finality_service: ComputeLeaderFinalityService,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl BankingStage {
 | 
			
		||||
@@ -65,6 +67,10 @@ impl BankingStage {
 | 
			
		||||
        // Once an entry has been recorded, its last_id is registered with the bank.
 | 
			
		||||
        let poh_service = PohService::new(poh_recorder.clone(), config);
 | 
			
		||||
 | 
			
		||||
        // Single thread to compute finality
 | 
			
		||||
        let compute_finality_service =
 | 
			
		||||
            ComputeLeaderFinalityService::new(bank.clone(), poh_service.poh_exit.clone());
 | 
			
		||||
 | 
			
		||||
        // Many banks that process transactions in parallel.
 | 
			
		||||
        let bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>> = (0..NUM_THREADS)
 | 
			
		||||
            .map(|_| {
 | 
			
		||||
@@ -112,6 +118,7 @@ impl BankingStage {
 | 
			
		||||
            BankingStage {
 | 
			
		||||
                bank_thread_hdls,
 | 
			
		||||
                poh_service,
 | 
			
		||||
                compute_finality_service,
 | 
			
		||||
            },
 | 
			
		||||
            entry_receiver,
 | 
			
		||||
        )
 | 
			
		||||
@@ -228,6 +235,8 @@ impl Service for BankingStage {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.compute_finality_service.join()?;
 | 
			
		||||
 | 
			
		||||
        let poh_return_value = self.poh_service.join()?;
 | 
			
		||||
        match poh_return_value {
 | 
			
		||||
            Ok(_) => (),
 | 
			
		||||
 
 | 
			
		||||
@@ -108,8 +108,6 @@ pub struct NodeInfo {
 | 
			
		||||
    pub contact_info: ContactInfo,
 | 
			
		||||
    /// current leader identity
 | 
			
		||||
    pub leader_id: Pubkey,
 | 
			
		||||
    /// information about the state of the ledger
 | 
			
		||||
    pub ledger_state: LedgerState,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl NodeInfo {
 | 
			
		||||
@@ -133,9 +131,6 @@ impl NodeInfo {
 | 
			
		||||
                version: 0,
 | 
			
		||||
            },
 | 
			
		||||
            leader_id: Pubkey::default(),
 | 
			
		||||
            ledger_state: LedgerState {
 | 
			
		||||
                last_id: Hash::default(),
 | 
			
		||||
            },
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -707,17 +702,6 @@ impl ClusterInfo {
 | 
			
		||||
        (id, max_updated_node, updated_data)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn valid_last_ids(&self) -> Vec<Hash> {
 | 
			
		||||
        self.table
 | 
			
		||||
            .values()
 | 
			
		||||
            .filter(|r| {
 | 
			
		||||
                r.id != Pubkey::default()
 | 
			
		||||
                    && (Self::is_valid_address(&r.contact_info.tpu)
 | 
			
		||||
                        || Self::is_valid_address(&r.contact_info.tvu))
 | 
			
		||||
            }).map(|x| x.ledger_state.last_id)
 | 
			
		||||
            .collect()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
 | 
			
		||||
        // find a peer that appears to be accepting replication, as indicated
 | 
			
		||||
        //  by a valid tvu port location
 | 
			
		||||
@@ -1776,36 +1760,6 @@ mod tests {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_valid_last_ids() {
 | 
			
		||||
        logger::setup();
 | 
			
		||||
        let mut leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
 | 
			
		||||
        leader0.ledger_state.last_id = hash(b"0");
 | 
			
		||||
        let mut leader1 = NodeInfo::new_multicast();
 | 
			
		||||
        leader1.ledger_state.last_id = hash(b"1");
 | 
			
		||||
        let mut leader2 =
 | 
			
		||||
            NodeInfo::new_with_pubkey_socketaddr(Pubkey::default(), &socketaddr!("127.0.0.2:1234"));
 | 
			
		||||
        leader2.ledger_state.last_id = hash(b"2");
 | 
			
		||||
        // test that only valid tvu or tpu are retured as nodes
 | 
			
		||||
        let mut leader3 = NodeInfo::new(
 | 
			
		||||
            Keypair::new().pubkey(),
 | 
			
		||||
            socketaddr!("127.0.0.1:1234"),
 | 
			
		||||
            socketaddr_any!(),
 | 
			
		||||
            socketaddr!("127.0.0.1:1236"),
 | 
			
		||||
            socketaddr_any!(),
 | 
			
		||||
            socketaddr_any!(),
 | 
			
		||||
        );
 | 
			
		||||
        leader3.ledger_state.last_id = hash(b"3");
 | 
			
		||||
        let mut cluster_info = ClusterInfo::new(leader0.clone()).expect("ClusterInfo::new");
 | 
			
		||||
        cluster_info.insert(&leader1);
 | 
			
		||||
        cluster_info.insert(&leader2);
 | 
			
		||||
        cluster_info.insert(&leader3);
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            cluster_info.valid_last_ids(),
 | 
			
		||||
            vec![leader0.ledger_state.last_id]
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Validates the node that sent Protocol::ReceiveUpdates gets its
 | 
			
		||||
    /// liveness updated, but not if the node sends Protocol::ReceiveUpdates
 | 
			
		||||
    /// to itself.
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										204
									
								
								src/compute_leader_finality_service.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										204
									
								
								src/compute_leader_finality_service.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,204 @@
 | 
			
		||||
//! The `compute_leader_finality_service` module implements the tools necessary
 | 
			
		||||
//! to generate a thread which regularly calculates the last finality times
 | 
			
		||||
//! observed by the leader
 | 
			
		||||
 | 
			
		||||
use bank::Bank;
 | 
			
		||||
use influx_db_client as influxdb;
 | 
			
		||||
use metrics;
 | 
			
		||||
use service::Service;
 | 
			
		||||
use std::result;
 | 
			
		||||
use std::sync::atomic::{AtomicBool, Ordering};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::thread::sleep;
 | 
			
		||||
use std::thread::{self, Builder, JoinHandle};
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use timing;
 | 
			
		||||
use vote_program::VoteProgram;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, PartialEq, Eq)]
 | 
			
		||||
pub enum FinalityError {
 | 
			
		||||
    NoValidSupermajority,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub const COMPUTE_FINALITY_MS: u64 = 1000;
 | 
			
		||||
 | 
			
		||||
pub struct ComputeLeaderFinalityService {
 | 
			
		||||
    compute_finality_thread: JoinHandle<()>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ComputeLeaderFinalityService {
 | 
			
		||||
    fn get_last_supermajority_timestamp(
 | 
			
		||||
        bank: &Arc<Bank>,
 | 
			
		||||
        now: u64,
 | 
			
		||||
        last_valid_validator_timestamp: u64,
 | 
			
		||||
    ) -> result::Result<u64, FinalityError> {
 | 
			
		||||
        let mut total_stake = 0;
 | 
			
		||||
 | 
			
		||||
        let mut ticks_and_stakes: Vec<(u64, i64)> = {
 | 
			
		||||
            let bank_accounts = bank.accounts.read().unwrap();
 | 
			
		||||
            // TODO: Doesn't account for duplicates since a single validator could potentially register
 | 
			
		||||
            // multiple vote accounts. Once that is no longer possible (see the TODO in vote_program.rs,
 | 
			
		||||
            // process_transaction(), case VoteInstruction::RegisterAccount), this will be more accurate.
 | 
			
		||||
            // See github issue 1654.
 | 
			
		||||
            bank_accounts
 | 
			
		||||
                .values()
 | 
			
		||||
                .filter_map(|account| {
 | 
			
		||||
                    // Filter out any accounts that don't belong to the VoteProgram
 | 
			
		||||
                    // by returning None
 | 
			
		||||
                    if VoteProgram::check_id(&account.program_id) {
 | 
			
		||||
                        if let Ok(vote_state) = VoteProgram::deserialize(&account.userdata) {
 | 
			
		||||
                            let validator_stake = bank.get_stake(&vote_state.node_id);
 | 
			
		||||
                            total_stake += validator_stake;
 | 
			
		||||
                            // Filter out any validators that don't have at least one vote
 | 
			
		||||
                            // by returning None
 | 
			
		||||
                            return vote_state
 | 
			
		||||
                                .votes
 | 
			
		||||
                                .back()
 | 
			
		||||
                                .map(|vote| (vote.tick_height, validator_stake));
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    None
 | 
			
		||||
                }).collect()
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let super_majority_stake = (2 * total_stake) / 3;
 | 
			
		||||
 | 
			
		||||
        if let Some(last_valid_validator_timestamp) =
 | 
			
		||||
            bank.get_finality_timestamp(&mut ticks_and_stakes, super_majority_stake)
 | 
			
		||||
        {
 | 
			
		||||
            return Ok(last_valid_validator_timestamp);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if last_valid_validator_timestamp != 0 {
 | 
			
		||||
            metrics::submit(
 | 
			
		||||
                influxdb::Point::new(&"leader-finality")
 | 
			
		||||
                    .add_field(
 | 
			
		||||
                        "duration_ms",
 | 
			
		||||
                        influxdb::Value::Integer((now - last_valid_validator_timestamp) as i64),
 | 
			
		||||
                    ).to_owned(),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Err(FinalityError::NoValidSupermajority)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn compute_finality(bank: &Arc<Bank>, last_valid_validator_timestamp: &mut u64) {
 | 
			
		||||
        let now = timing::timestamp();
 | 
			
		||||
        if let Ok(super_majority_timestamp) =
 | 
			
		||||
            Self::get_last_supermajority_timestamp(bank, now, *last_valid_validator_timestamp)
 | 
			
		||||
        {
 | 
			
		||||
            let finality_ms = now - super_majority_timestamp;
 | 
			
		||||
 | 
			
		||||
            *last_valid_validator_timestamp = super_majority_timestamp;
 | 
			
		||||
            bank.set_finality((now - *last_valid_validator_timestamp) as usize);
 | 
			
		||||
 | 
			
		||||
            metrics::submit(
 | 
			
		||||
                influxdb::Point::new(&"leader-finality")
 | 
			
		||||
                    .add_field("duration_ms", influxdb::Value::Integer(finality_ms as i64))
 | 
			
		||||
                    .to_owned(),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Create a new ComputeLeaderFinalityService for computing finality.
 | 
			
		||||
    pub fn new(bank: Arc<Bank>, exit: Arc<AtomicBool>) -> Self {
 | 
			
		||||
        let compute_finality_thread = Builder::new()
 | 
			
		||||
            .name("solana-leader-finality-stage".to_string())
 | 
			
		||||
            .spawn(move || {
 | 
			
		||||
                let mut last_valid_validator_timestamp = 0;
 | 
			
		||||
                loop {
 | 
			
		||||
                    if exit.load(Ordering::Relaxed) {
 | 
			
		||||
                        break;
 | 
			
		||||
                    }
 | 
			
		||||
                    Self::compute_finality(&bank, &mut last_valid_validator_timestamp);
 | 
			
		||||
                    sleep(Duration::from_millis(COMPUTE_FINALITY_MS));
 | 
			
		||||
                }
 | 
			
		||||
            }).unwrap();
 | 
			
		||||
 | 
			
		||||
        (ComputeLeaderFinalityService {
 | 
			
		||||
            compute_finality_thread,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Service for ComputeLeaderFinalityService {
 | 
			
		||||
    type JoinReturnType = ();
 | 
			
		||||
 | 
			
		||||
    fn join(self) -> thread::Result<()> {
 | 
			
		||||
        self.compute_finality_thread.join()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
pub mod tests {
 | 
			
		||||
    use bank::Bank;
 | 
			
		||||
    use bincode::serialize;
 | 
			
		||||
    use compute_leader_finality_service::ComputeLeaderFinalityService;
 | 
			
		||||
    use hash::hash;
 | 
			
		||||
    use logger;
 | 
			
		||||
    use mint::Mint;
 | 
			
		||||
    use signature::{Keypair, KeypairUtil};
 | 
			
		||||
    use std::sync::Arc;
 | 
			
		||||
    use std::thread::sleep;
 | 
			
		||||
    use std::time::Duration;
 | 
			
		||||
    use transaction::Transaction;
 | 
			
		||||
    use vote_program::Vote;
 | 
			
		||||
    use vote_transaction::{create_vote_account, VoteTransaction};
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_compute_finality() {
 | 
			
		||||
        logger::setup();
 | 
			
		||||
 | 
			
		||||
        let mint = Mint::new(1234);
 | 
			
		||||
        let bank = Arc::new(Bank::new(&mint));
 | 
			
		||||
        // generate 10 validators, but only vote for the first 6 validators
 | 
			
		||||
        let ids: Vec<_> = (0..10)
 | 
			
		||||
            .map(|i| {
 | 
			
		||||
                let last_id = hash(&serialize(&i).unwrap()); // Unique hash
 | 
			
		||||
                bank.register_entry_id(&last_id);
 | 
			
		||||
                // sleep to get a different timestamp in the bank
 | 
			
		||||
                sleep(Duration::from_millis(1));
 | 
			
		||||
                last_id
 | 
			
		||||
            }).collect();
 | 
			
		||||
 | 
			
		||||
        // Create a total of 10 vote accounts, each will have a balance of 1 (after giving 1 to
 | 
			
		||||
        // their vote account), for a total staking pool of 10 tokens.
 | 
			
		||||
        let vote_accounts: Vec<_> = (0..10)
 | 
			
		||||
            .map(|i| {
 | 
			
		||||
                // Create new validator to vote
 | 
			
		||||
                let validator_keypair = Keypair::new();
 | 
			
		||||
                let last_id = ids[i];
 | 
			
		||||
 | 
			
		||||
                // Give the validator some tokens
 | 
			
		||||
                bank.transfer(2, &mint.keypair(), validator_keypair.pubkey(), last_id)
 | 
			
		||||
                    .unwrap();
 | 
			
		||||
                let vote_account = create_vote_account(&validator_keypair, &bank, 1, last_id)
 | 
			
		||||
                    .expect("Expected successful creation of account");
 | 
			
		||||
 | 
			
		||||
                if i < 6 {
 | 
			
		||||
                    let vote = Vote {
 | 
			
		||||
                        tick_height: (i + 1) as u64,
 | 
			
		||||
                    };
 | 
			
		||||
                    let vote_tx = Transaction::vote_new(&vote_account, vote, last_id, 0);
 | 
			
		||||
                    bank.process_transaction(&vote_tx).unwrap();
 | 
			
		||||
                }
 | 
			
		||||
                vote_account
 | 
			
		||||
            }).collect();
 | 
			
		||||
 | 
			
		||||
        // There isn't 2/3 consensus, so the bank's finality value should be the default
 | 
			
		||||
        let mut last_finality_time = 0;
 | 
			
		||||
        ComputeLeaderFinalityService::compute_finality(&bank, &mut last_finality_time);
 | 
			
		||||
        assert_eq!(bank.finality(), std::usize::MAX);
 | 
			
		||||
 | 
			
		||||
        // Get another validator to vote, so we now have 2/3 consensus
 | 
			
		||||
        let vote_account = &vote_accounts[7];
 | 
			
		||||
        let vote = Vote { tick_height: 7 };
 | 
			
		||||
        let vote_tx = Transaction::vote_new(&vote_account, vote, ids[6], 0);
 | 
			
		||||
        bank.process_transaction(&vote_tx).unwrap();
 | 
			
		||||
 | 
			
		||||
        ComputeLeaderFinalityService::compute_finality(&bank, &mut last_finality_time);
 | 
			
		||||
        assert!(bank.finality() != std::usize::MAX);
 | 
			
		||||
        assert!(last_finality_time > 0);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -286,7 +286,7 @@ impl LeaderScheduler {
 | 
			
		||||
        let lower_bound = height.saturating_sub(self.active_window_length);
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            let bank_accounts = &*bank.accounts.read().unwrap();
 | 
			
		||||
            let bank_accounts = &bank.accounts.read().unwrap();
 | 
			
		||||
 | 
			
		||||
            bank_accounts
 | 
			
		||||
                .values()
 | 
			
		||||
@@ -374,17 +374,13 @@ impl LeaderScheduler {
 | 
			
		||||
        self.last_seed_height = Some(height);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn get_stake(id: &Pubkey, bank: &Bank) -> i64 {
 | 
			
		||||
        bank.get_balance(id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn rank_active_set<'a, I>(bank: &Bank, active: I) -> Vec<(&'a Pubkey, u64)>
 | 
			
		||||
    where
 | 
			
		||||
        I: Iterator<Item = &'a Pubkey>,
 | 
			
		||||
    {
 | 
			
		||||
        let mut active_accounts: Vec<(&'a Pubkey, u64)> = active
 | 
			
		||||
            .filter_map(|pk| {
 | 
			
		||||
                let stake = Self::get_stake(pk, bank);
 | 
			
		||||
                let stake = bank.get_stake(pk);
 | 
			
		||||
                if stake > 0 {
 | 
			
		||||
                    Some((pk, stake as u64))
 | 
			
		||||
                } else {
 | 
			
		||||
@@ -500,7 +496,6 @@ mod tests {
 | 
			
		||||
        DEFAULT_LEADER_ROTATION_INTERVAL, DEFAULT_SEED_ROTATION_INTERVAL,
 | 
			
		||||
    };
 | 
			
		||||
    use mint::Mint;
 | 
			
		||||
    use result::Result;
 | 
			
		||||
    use signature::{Keypair, KeypairUtil};
 | 
			
		||||
    use solana_sdk::pubkey::Pubkey;
 | 
			
		||||
    use std::collections::HashSet;
 | 
			
		||||
@@ -508,7 +503,7 @@ mod tests {
 | 
			
		||||
    use std::iter::FromIterator;
 | 
			
		||||
    use transaction::Transaction;
 | 
			
		||||
    use vote_program::Vote;
 | 
			
		||||
    use vote_transaction::VoteTransaction;
 | 
			
		||||
    use vote_transaction::{create_vote_account, VoteTransaction};
 | 
			
		||||
 | 
			
		||||
    fn to_hashset_owned<T>(slice: &[T]) -> HashSet<T>
 | 
			
		||||
    where
 | 
			
		||||
@@ -527,31 +522,6 @@ mod tests {
 | 
			
		||||
        bank.process_transaction(&new_vote_tx).unwrap();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn create_vote_account(
 | 
			
		||||
        node_keypair: &Keypair,
 | 
			
		||||
        bank: &Bank,
 | 
			
		||||
        num_tokens: i64,
 | 
			
		||||
        last_id: Hash,
 | 
			
		||||
    ) -> Result<Keypair> {
 | 
			
		||||
        let new_vote_account = Keypair::new();
 | 
			
		||||
 | 
			
		||||
        // Create the new vote account
 | 
			
		||||
        let tx = Transaction::vote_account_new(
 | 
			
		||||
            node_keypair,
 | 
			
		||||
            new_vote_account.pubkey(),
 | 
			
		||||
            last_id,
 | 
			
		||||
            num_tokens,
 | 
			
		||||
        );
 | 
			
		||||
        bank.process_transaction(&tx)?;
 | 
			
		||||
 | 
			
		||||
        // Register the vote account to the validator
 | 
			
		||||
        let tx =
 | 
			
		||||
            Transaction::vote_account_register(node_keypair, new_vote_account.pubkey(), last_id, 0);
 | 
			
		||||
        bank.process_transaction(&tx)?;
 | 
			
		||||
 | 
			
		||||
        Ok(new_vote_account)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn run_scheduler_test(
 | 
			
		||||
        num_validators: usize,
 | 
			
		||||
        bootstrap_height: u64,
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,7 @@ pub mod client;
 | 
			
		||||
#[macro_use]
 | 
			
		||||
pub mod cluster_info;
 | 
			
		||||
pub mod budget_program;
 | 
			
		||||
pub mod compute_leader_finality_service;
 | 
			
		||||
pub mod drone;
 | 
			
		||||
pub mod entry;
 | 
			
		||||
pub mod entry_writer;
 | 
			
		||||
 
 | 
			
		||||
@@ -34,7 +34,7 @@ impl PohRecorder {
 | 
			
		||||
        // TODO: amortize the cost of this lock by doing the loop in here for
 | 
			
		||||
        // some min amount of hashes
 | 
			
		||||
        let mut poh = self.poh.lock().unwrap();
 | 
			
		||||
        if self.is_max_tick_height_reached(&*poh) {
 | 
			
		||||
        if self.is_max_tick_height_reached(&poh) {
 | 
			
		||||
            Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
 | 
			
		||||
        } else {
 | 
			
		||||
            poh.hash();
 | 
			
		||||
@@ -47,7 +47,7 @@ impl PohRecorder {
 | 
			
		||||
        // hasn't been reached.
 | 
			
		||||
        // This guarantees PoH order and Entry production and banks LastId queue is the same
 | 
			
		||||
        let mut poh = self.poh.lock().unwrap();
 | 
			
		||||
        if self.is_max_tick_height_reached(&*poh) {
 | 
			
		||||
        if self.is_max_tick_height_reached(&poh) {
 | 
			
		||||
            Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
 | 
			
		||||
        } else if self.is_virtual {
 | 
			
		||||
            self.generate_and_store_tick(&mut *poh);
 | 
			
		||||
@@ -67,7 +67,7 @@ impl PohRecorder {
 | 
			
		||||
        // Register and send the entry out while holding the lock.
 | 
			
		||||
        // This guarantees PoH order and Entry production and banks LastId queue is the same.
 | 
			
		||||
        let mut poh = self.poh.lock().unwrap();
 | 
			
		||||
        if self.is_max_tick_height_reached(&*poh) {
 | 
			
		||||
        if self.is_max_tick_height_reached(&poh) {
 | 
			
		||||
            Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
 | 
			
		||||
        } else {
 | 
			
		||||
            self.record_and_send_txs(&mut *poh, mixin, txs)?;
 | 
			
		||||
 
 | 
			
		||||
@@ -90,6 +90,7 @@ impl Tpu {
 | 
			
		||||
            ledger_write_stage,
 | 
			
		||||
            exit: exit.clone(),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        (tpu, entry_forwarder, exit)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -104,7 +104,7 @@ impl VoteProgram {
 | 
			
		||||
        match deserialize(tx.userdata(instruction_index)) {
 | 
			
		||||
            Ok(VoteInstruction::RegisterAccount) => {
 | 
			
		||||
                // TODO: a single validator could register multiple "vote accounts"
 | 
			
		||||
                // which would clutter the "accounts" structure.
 | 
			
		||||
                // which would clutter the "accounts" structure. See github issue 1654.
 | 
			
		||||
                accounts[1].program_id = Self::id();
 | 
			
		||||
 | 
			
		||||
                let mut vote_state = VoteProgram {
 | 
			
		||||
 
 | 
			
		||||
@@ -17,11 +17,9 @@ use transaction::Transaction;
 | 
			
		||||
use vote_program::Vote;
 | 
			
		||||
use vote_transaction::VoteTransaction;
 | 
			
		||||
 | 
			
		||||
pub const VOTE_TIMEOUT_MS: u64 = 1000;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, PartialEq, Eq)]
 | 
			
		||||
pub enum VoteError {
 | 
			
		||||
    NoValidLastIdsToVoteOn,
 | 
			
		||||
    NoValidSupermajority,
 | 
			
		||||
    NoLeader,
 | 
			
		||||
    LeaderInfoNotFound,
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,14 @@
 | 
			
		||||
//! The `vote_transaction` module provides functionality for creating vote transactions.
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
use bank::Bank;
 | 
			
		||||
use bincode::{deserialize, serialize};
 | 
			
		||||
use hash::Hash;
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
use result::Result;
 | 
			
		||||
use signature::Keypair;
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
use signature::KeypairUtil;
 | 
			
		||||
use solana_sdk::pubkey::Pubkey;
 | 
			
		||||
use system_transaction::SystemTransaction;
 | 
			
		||||
use transaction::Transaction;
 | 
			
		||||
@@ -81,5 +87,27 @@ impl VoteTransaction for Transaction {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
pub fn create_vote_account(
 | 
			
		||||
    node_keypair: &Keypair,
 | 
			
		||||
    bank: &Bank,
 | 
			
		||||
    num_tokens: i64,
 | 
			
		||||
    last_id: Hash,
 | 
			
		||||
) -> Result<Keypair> {
 | 
			
		||||
    let new_vote_account = Keypair::new();
 | 
			
		||||
 | 
			
		||||
    // Create the new vote account
 | 
			
		||||
    let tx =
 | 
			
		||||
        Transaction::vote_account_new(node_keypair, new_vote_account.pubkey(), last_id, num_tokens);
 | 
			
		||||
    bank.process_transaction(&tx)?;
 | 
			
		||||
 | 
			
		||||
    // Register the vote account to the validator
 | 
			
		||||
    let tx =
 | 
			
		||||
        Transaction::vote_account_register(node_keypair, new_vote_account.pubkey(), last_id, 0);
 | 
			
		||||
    bank.process_transaction(&tx)?;
 | 
			
		||||
 | 
			
		||||
    Ok(new_vote_account)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests {}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user