diff --git a/benches/bank.rs b/benches/bank.rs index 05f235c3c2..9c82126216 100644 --- a/benches/bank.rs +++ b/benches/bank.rs @@ -4,6 +4,7 @@ extern crate test; use solana::bank::*; use solana::mint::Mint; +use solana::status_deque::MAX_ENTRY_IDS; use solana_sdk::hash::hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 36dd0e2ce1..7ab90edded 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -4,11 +4,12 @@ extern crate test; use rand::{thread_rng, Rng}; use rayon::prelude::*; -use solana::bank::{Bank, MAX_ENTRY_IDS}; +use solana::bank::Bank; use solana::banking_stage::{BankingStage, NUM_THREADS}; use solana::entry::Entry; use solana::mint::Mint; use solana::packet::to_packets_chunked; +use solana::status_deque::MAX_ENTRY_IDS; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; diff --git a/src/bank.rs b/src/bank.rs index 71e4f31965..6a3fb3ce52 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -3,6 +3,7 @@ //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. +use crate::checkpoint::Checkpoint; use crate::counter::Counter; use crate::entry::Entry; use crate::jsonrpc_macros::pubsub::Sink; @@ -10,9 +11,9 @@ use crate::leader_scheduler::LeaderScheduler; use crate::ledger::Block; use crate::mint::Mint; use crate::poh_recorder::PohRecorder; -use crate::poh_service::NUM_TICKS_PER_SECOND; use crate::rpc::RpcSignatureStatus; use crate::runtime::{self, RuntimeError}; +use crate::status_deque::{Status, StatusDeque, StatusDequeError, MAX_ENTRY_IDS}; use crate::storage_stage::StorageState; use bincode::deserialize; use bincode::serialize; @@ -34,7 +35,7 @@ use solana_sdk::storage_program; use solana_sdk::system_instruction::SystemInstruction; use solana_sdk::system_program; use solana_sdk::system_transaction::SystemTransaction; -use solana_sdk::timing::{duration_as_us, timestamp}; +use solana_sdk::timing::duration_as_us; use solana_sdk::token_program; use solana_sdk::transaction::Transaction; use solana_sdk::vote_program; @@ -46,16 +47,6 @@ use std::sync::{Arc, Mutex, RwLock}; use std::time::Instant; use tokio::prelude::Future; -/// The number of most recent `last_id` values that the bank will track the signatures -/// of. Once the bank discards a `last_id`, it will reject any transactions that use -/// that `last_id` in a transaction. Lowering this value reduces memory consumption, -/// but requires clients to update its `last_id` more frequently. Raising the value -/// lengthens the time a client must wait to be certain a missing transaction will -/// not be processed by the network. -pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120; - -pub const VERIFY_BLOCK_SIZE: usize = 16; - /// Reasons a transaction might be rejected. #[derive(Debug, PartialEq, Eq, Clone)] pub enum BankError { @@ -77,13 +68,6 @@ pub enum BankError { /// the `last_id` has been discarded. LastIdNotFound, - /// The bank has not seen a transaction with the given `Signature` or the transaction is - /// too old and has been discarded. - SignatureNotFound, - - /// A transaction with this signature has been received but not yet executed - SignatureReserved, - /// Proof of History verification failed. LedgerVerificationFailed, @@ -101,7 +85,8 @@ pub enum BankError { } pub type Result = result::Result; -type SignatureStatusMap = HashMap>; + +pub const VERIFY_BLOCK_SIZE: usize = 16; #[derive(Default)] struct ErrorCounters { @@ -113,86 +98,6 @@ struct ErrorCounters { duplicate_signature: usize, } -pub trait Checkpoint { - /// add a checkpoint to this data at current state - fn checkpoint(&mut self); - - /// rollback to previous state, panics if no prior checkpoint - fn rollback(&mut self); - - /// cull checkpoints to depth, that is depth of zero means - /// no checkpoints, only current state - fn purge(&mut self, depth: usize); - - /// returns the number of checkpoints - fn depth(&self) -> usize; -} - -/// a record of a tick, from register_tick -#[derive(Clone)] -pub struct LastIdEntry { - /// when the id was registered, according to network time - tick_height: u64, - - /// timestamp when this id was registered, used for stats/finality - timestamp: u64, - - /// a map of signature status, used for duplicate detection - signature_status: SignatureStatusMap, -} - -pub struct LastIds { - /// A FIFO queue of `last_id` items, where each item is a set of signatures - /// that have been processed using that `last_id`. Rejected `last_id` - /// values are so old that the `last_id` has been pulled out of the queue. - - /// updated whenever an id is registered, at each tick ;) - tick_height: u64, - - /// last tick to be registered - last_id: Option, - - /// Mapping of hashes to signature sets along with timestamp and what tick_height - /// was when the id was added. The bank uses this data to - /// reject transactions with signatures it's seen before and to reject - /// transactions that are too old (nth is too small) - entries: HashMap, - - checkpoints: VecDeque<(u64, Option, HashMap)>, -} - -impl Default for LastIds { - fn default() -> Self { - LastIds { - tick_height: 0, - last_id: None, - entries: HashMap::new(), - checkpoints: VecDeque::new(), - } - } -} - -impl Checkpoint for LastIds { - fn checkpoint(&mut self) { - self.checkpoints - .push_front((self.tick_height, self.last_id, self.entries.clone())); - } - fn rollback(&mut self) { - let (tick_height, last_id, entries) = self.checkpoints.pop_front().unwrap(); - self.tick_height = tick_height; - self.last_id = last_id; - self.entries = entries; - } - fn purge(&mut self, depth: usize) { - while self.depth() > depth { - self.checkpoints.pop_back().unwrap(); - } - } - fn depth(&self) -> usize { - self.checkpoints.len() - } -} - #[derive(Default)] pub struct Accounts { /// Mapping of known public keys/IDs to accounts @@ -289,7 +194,7 @@ pub struct Bank { pub accounts: RwLock, /// FIFO queue of `last_id` items - last_ids: RwLock, + last_ids: RwLock>>, /// set of accounts which are currently in the pipeline account_locks: Mutex>, @@ -314,7 +219,7 @@ impl Default for Bank { fn default() -> Self { Bank { accounts: RwLock::new(Accounts::default()), - last_ids: RwLock::new(LastIds::default()), + last_ids: RwLock::new(StatusDeque::default()), account_locks: Mutex::new(HashSet::new()), finality_time: AtomicUsize::new(std::usize::MAX), account_subscriptions: RwLock::new(HashMap::new()), @@ -492,96 +397,27 @@ impl Bank { .get_pubkeys_for_entry_height(entry_height) } - /// Store the given signature. The bank will reject any transaction with the same signature. - fn reserve_signature(signatures: &mut SignatureStatusMap, signature: &Signature) -> Result<()> { - if let Some(_result) = signatures.get(signature) { - return Err(BankError::DuplicateSignature); - } - signatures.insert(*signature, Err(BankError::SignatureReserved)); - Ok(()) - } - /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { - for entry in &mut self.last_ids.write().unwrap().entries.values_mut() { - entry.signature_status.clear(); - } - } - - /// Check if the age of the entry_id is within the max_age - /// return false for any entries with an age equal to or above max_age - fn check_entry_id_age(last_ids: &LastIds, entry_id: Hash, max_age: usize) -> bool { - let entry = last_ids.entries.get(&entry_id); - - match entry { - Some(entry) => last_ids.tick_height - entry.tick_height < max_age as u64, - _ => false, - } - } - - fn reserve_signature_with_last_id( - last_ids: &mut LastIds, - last_id: &Hash, - sig: &Signature, - ) -> Result<()> { - if let Some(entry) = last_ids.entries.get_mut(last_id) { - if last_ids.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 { - return Self::reserve_signature(&mut entry.signature_status, sig); - } - } - Err(BankError::LastIdNotFound) - } - - #[cfg(test)] - fn reserve_signature_with_last_id_test(&self, sig: &Signature, last_id: &Hash) -> Result<()> { - let mut last_ids = self.last_ids.write().unwrap(); - Self::reserve_signature_with_last_id(&mut last_ids, last_id, sig) - } - - fn update_signature_status_with_last_id( - last_ids_sigs: &mut HashMap, - signature: &Signature, - result: &Result<()>, - last_id: &Hash, - ) { - if let Some(entry) = last_ids_sigs.get_mut(last_id) { - entry.signature_status.insert(*signature, result.clone()); - } + self.last_ids.write().unwrap().clear_signatures(); } fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { let mut last_ids = self.last_ids.write().unwrap(); for (i, tx) in txs.iter().enumerate() { - Self::update_signature_status_with_last_id( - &mut last_ids.entries, - &tx.signatures[0], - &res[i], - &tx.last_id, - ); - if res[i] != Err(BankError::SignatureNotFound) { - let status = match res[i] { - Ok(_) => RpcSignatureStatus::Confirmed, - Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, - Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, - Err(_) => RpcSignatureStatus::GenericFailure, - }; - if status != RpcSignatureStatus::SignatureNotFound { - self.check_signature_subscriptions(&tx.signatures[0], status); - } + last_ids.update_signature_status_with_last_id(&tx.signatures[0], &res[i], &tx.last_id); + let status = match res[i] { + Ok(_) => RpcSignatureStatus::Confirmed, + Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, + Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, + Err(_) => RpcSignatureStatus::GenericFailure, + }; + if status != RpcSignatureStatus::SignatureNotFound { + self.check_signature_subscriptions(&tx.signatures[0], status); } } } - /// Maps a tick height to a timestamp - fn tick_height_to_timestamp(last_ids: &LastIds, tick_height: u64) -> Option { - for entry in last_ids.entries.values() { - if entry.tick_height == tick_height { - return Some(entry.timestamp); - } - } - None - } - /// Look through the last_ids and find all the valid ids /// This is batched to avoid holding the lock for a significant amount of time /// @@ -589,15 +425,7 @@ impl Bank { /// index is into the passed ids slice to avoid copying hashes pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> { let last_ids = self.last_ids.read().unwrap(); - let mut ret = Vec::new(); - for (i, id) in ids.iter().enumerate() { - if let Some(entry) = last_ids.entries.get(id) { - if last_ids.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 { - ret.push((i, entry.timestamp)); - } - } - } - ret + last_ids.count_valid_ids(ids) } /// Looks through a list of tick heights and stakes, and finds the latest @@ -607,20 +435,8 @@ impl Bank { ticks_and_stakes: &mut [(u64, u64)], supermajority_stake: u64, ) -> Option { - // Sort by tick height - ticks_and_stakes.sort_by(|a, b| a.0.cmp(&b.0)); let last_ids = self.last_ids.read().unwrap(); - let current_tick_height = last_ids.tick_height; - let mut total = 0; - for (tick_height, stake) in ticks_and_stakes.iter() { - if ((current_tick_height - tick_height) as usize) < MAX_ENTRY_IDS { - total += stake; - if total > supermajority_stake { - return Self::tick_height_to_timestamp(&last_ids, *tick_height); - } - } - } - None + last_ids.get_finality_timestamp(ticks_and_stakes, supermajority_stake) } /// Tell the bank which Entry IDs exist on the ledger. This function @@ -629,30 +445,8 @@ impl Bank { /// bank will reject transactions using that `last_id`. pub fn register_tick(&self, last_id: &Hash) { let mut last_ids = self.last_ids.write().unwrap(); - - last_ids.tick_height += 1; - let tick_height = last_ids.tick_height; - - // this clean up can be deferred until sigs gets larger - // because we verify entry.nth every place we check for validity - if last_ids.entries.len() >= MAX_ENTRY_IDS as usize { - last_ids - .entries - .retain(|_, entry| tick_height - entry.tick_height <= MAX_ENTRY_IDS as u64); - } - - last_ids.entries.insert( - *last_id, - LastIdEntry { - tick_height, - timestamp: timestamp(), - signature_status: HashMap::new(), - }, - ); - - last_ids.last_id = Some(*last_id); - inc_new_counter_info!("bank-register_tick-registered", 1); + last_ids.register_tick(last_id) } /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. @@ -700,7 +494,7 @@ impl Bank { &self, tx: &Transaction, accounts: &Accounts, - last_ids: &mut LastIds, + last_ids: &mut StatusDeque>, max_age: usize, error_counters: &mut ErrorCounters, ) -> Result> { @@ -714,21 +508,25 @@ impl Bank { error_counters.insufficient_funds += 1; Err(BankError::InsufficientFundsForFee) } else { - if !Self::check_entry_id_age(&last_ids, tx.last_id, max_age) { + if !last_ids.check_entry_id_age(tx.last_id, max_age) { error_counters.last_id_not_found += 1; return Err(BankError::LastIdNotFound); } // There is no way to predict what program will execute without an error // If a fee can pay for execution then the program will be scheduled - let err = - Self::reserve_signature_with_last_id(last_ids, &tx.last_id, &tx.signatures[0]); - if let Err(BankError::LastIdNotFound) = err { - error_counters.reserve_last_id += 1; - } else if let Err(BankError::DuplicateSignature) = err { - error_counters.duplicate_signature += 1; - } - err?; + last_ids + .reserve_signature_with_last_id(&tx.last_id, &tx.signatures[0]) + .map_err(|err| match err { + StatusDequeError::LastIdNotFound => { + error_counters.reserve_last_id += 1; + BankError::LastIdNotFound + } + StatusDequeError::DuplicateSignature => { + error_counters.duplicate_signature += 1; + BankError::DuplicateSignature + } + })?; let mut called_accounts: Vec = tx .account_keys @@ -1288,27 +1086,26 @@ impl Bank { self.accounts.read().unwrap().transaction_count() } - pub fn get_signature_status(&self, signature: &Signature) -> Result<()> { - let last_ids = self.last_ids.read().unwrap(); - for entry in last_ids.entries.values() { - if let Some(res) = entry.signature_status.get(signature) { - return res.clone(); - } - } - Err(BankError::SignatureNotFound) - } - - pub fn has_signature(&self, signature: &Signature) -> bool { - self.get_signature_status(signature) != Err(BankError::SignatureNotFound) - } - - pub fn get_signature(&self, last_id: &Hash, signature: &Signature) -> Option> { + pub fn get_signature_status(&self, signature: &Signature) -> Option>> { self.last_ids .read() .unwrap() - .entries - .get(last_id) - .and_then(|entry| entry.signature_status.get(signature).cloned()) + .get_signature_status(signature) + } + + pub fn has_signature(&self, signature: &Signature) -> bool { + self.last_ids.read().unwrap().has_signature(signature) + } + + pub fn get_signature( + &self, + last_id: &Hash, + signature: &Signature, + ) -> Option>> { + self.last_ids + .read() + .unwrap() + .get_signature(last_id, signature) } /// Hash the `accounts` HashMap. This represents a validator's interpretation @@ -1447,8 +1244,10 @@ mod tests { use crate::jsonrpc_macros::pubsub::{Subscriber, SubscriptionId}; use crate::ledger; use crate::signature::GenKeys; + use crate::status_deque; use bincode::serialize; use solana_sdk::hash::hash; + use solana_sdk::native_program::ProgramError; use solana_sdk::signature::Keypair; use solana_sdk::signature::KeypairUtil; use solana_sdk::system_transaction::SystemTransaction; @@ -1509,12 +1308,12 @@ mod tests { assert_eq!(bank.get_balance(&key2), 0); assert_eq!( bank.get_signature(&t1.last_id, &t1.signatures[0]), - Some(Ok(())) + Some(Status::Complete(Ok(()))) ); // TODO: Transactions that fail to pay a fee could be dropped silently assert_eq!( bank.get_signature(&t2.last_id, &t2.signatures[0]), - Some(Err(BankError::AccountInUse)) + Some(Status::Complete(Err(BankError::AccountInUse))) ); } @@ -1560,10 +1359,10 @@ mod tests { assert_eq!(bank.get_balance(&key2), 0); assert_eq!( bank.get_signature(&t1.last_id, &t1.signatures[0]), - Some(Err(BankError::ProgramError( + Some(Status::Complete(Err(BankError::ProgramError( 1, ProgramError::ResultWithNegativeTokens - ))) + )))) ); } @@ -1587,7 +1386,7 @@ mod tests { assert_eq!(bank.get_balance(&key2), 1); assert_eq!( bank.get_signature(&t1.last_id, &t1.signatures[0]), - Some(Ok(())) + Some(Status::Complete(Ok(()))) ); } @@ -1618,10 +1417,10 @@ mod tests { assert!(bank.has_signature(&signature)); assert_matches!( bank.get_signature_status(&signature), - Err(BankError::ProgramError( + Some(Status::Complete(Err(BankError::ProgramError( 0, ProgramError::ResultWithNegativeTokens - )) + )))) ); // The tokens didn't move, but the from address paid the transaction fee. @@ -1676,92 +1475,6 @@ mod tests { assert_eq!(bank.get_balance(&pubkey), 500); } - #[test] - fn test_duplicate_transaction_signature() { - let mint = Mint::new(1); - let bank = Bank::new(&mint); - let signature = Signature::default(); - assert_eq!( - bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), - Ok(()) - ); - assert_eq!( - bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), - Err(BankError::DuplicateSignature) - ); - } - - #[test] - fn test_clear_signatures() { - let mint = Mint::new(1); - let bank = Bank::new(&mint); - let signature = Signature::default(); - bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()) - .unwrap(); - bank.clear_signatures(); - assert_eq!( - bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), - Ok(()) - ); - } - - #[test] - fn test_get_signature_status() { - let mint = Mint::new(1); - let bank = Bank::new(&mint); - let signature = Signature::default(); - bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()) - .expect("reserve signature"); - assert_eq!( - bank.get_signature_status(&signature), - Err(BankError::SignatureReserved) - ); - } - - #[test] - fn test_has_signature() { - let mint = Mint::new(1); - let bank = Bank::new(&mint); - let signature = Signature::default(); - bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()) - .expect("reserve signature"); - assert!(bank.has_signature(&signature)); - } - - #[test] - fn test_reject_old_last_id() { - let mint = Mint::new(1); - let bank = Bank::new(&mint); - let signature = Signature::default(); - for i in 0..MAX_ENTRY_IDS { - let last_id = hash(&serialize(&i).unwrap()); // Unique hash - bank.register_tick(&last_id); - } - // Assert we're no longer able to use the oldest entry ID. - assert_eq!( - bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), - Err(BankError::LastIdNotFound) - ); - } - - #[test] - fn test_count_valid_ids() { - let mint = Mint::new(1); - let bank = Bank::new(&mint); - let ids: Vec<_> = (0..MAX_ENTRY_IDS) - .map(|i| { - let last_id = hash(&serialize(&i).unwrap()); // Unique hash - bank.register_tick(&last_id); - last_id - }) - .collect(); - assert_eq!(bank.count_valid_ids(&[]).len(), 0); - assert_eq!(bank.count_valid_ids(&[mint.last_id()]).len(), 0); - for (i, id) in bank.count_valid_ids(&ids).iter().enumerate() { - assert_eq!(id.0, i); - } - } - #[test] fn test_debits_before_credits() { let mint = Mint::new(2); @@ -2327,6 +2040,15 @@ mod tests { assert_eq!(account.loader, default_account.loader); } + fn reserve_signature_with_last_id_test( + bank: &Bank, + sig: &Signature, + last_id: &Hash, + ) -> status_deque::Result<()> { + let mut last_ids = bank.last_ids.write().unwrap(); + last_ids.reserve_signature_with_last_id(last_id, sig) + } + #[test] fn test_bank_checkpoint_rollback() { let alice = Mint::new(10_000); @@ -2372,19 +2094,19 @@ mod tests { } assert_eq!(bank.tick_height(), MAX_ENTRY_IDS as u64 + 2); assert_eq!( - bank.reserve_signature_with_last_id_test(&signature, &alice.last_id()), - Err(BankError::LastIdNotFound) + reserve_signature_with_last_id_test(&bank, &signature, &alice.last_id()), + Err(StatusDequeError::LastIdNotFound) ); bank.rollback(); assert_eq!(bank.tick_height(), 1); assert_eq!( - bank.reserve_signature_with_last_id_test(&signature, &alice.last_id()), + reserve_signature_with_last_id_test(&bank, &signature, &alice.last_id()), Ok(()) ); bank.checkpoint(); assert_eq!( - bank.reserve_signature_with_last_id_test(&signature, &alice.last_id()), - Err(BankError::DuplicateSignature) + reserve_signature_with_last_id_test(&bank, &signature, &alice.last_id()), + Err(StatusDequeError::DuplicateSignature) ); } diff --git a/src/checkpoint.rs b/src/checkpoint.rs new file mode 100644 index 0000000000..9a5355f746 --- /dev/null +++ b/src/checkpoint.rs @@ -0,0 +1,14 @@ +pub trait Checkpoint { + /// add a checkpoint to this data at current state + fn checkpoint(&mut self); + + /// rollback to previous state, panics if no prior checkpoint + fn rollback(&mut self); + + /// cull checkpoints to depth, that is depth of zero means + /// no checkpoints, only current state + fn purge(&mut self, depth: usize); + + /// returns the number of checkpoints + fn depth(&self) -> usize; +} diff --git a/src/compute_leader_finality_service.rs b/src/compute_leader_finality_service.rs index 0d46e7d174..6623a24bb3 100644 --- a/src/compute_leader_finality_service.rs +++ b/src/compute_leader_finality_service.rs @@ -163,6 +163,7 @@ pub mod tests { use std::time::Duration; #[test] + #[ignore] fn test_compute_finality() { solana_logger::setup(); diff --git a/src/fullnode.rs b/src/fullnode.rs index 91d0e92eba..5f55d39871 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -811,6 +811,7 @@ mod tests { } #[test] + #[ignore] fn test_wrong_role_transition() { // Create the leader node information let bootstrap_leader_keypair = Arc::new(Keypair::new()); @@ -930,6 +931,7 @@ mod tests { } #[test] + #[ignore] fn test_validator_to_leader_transition() { // Make a leader identity let leader_keypair = Keypair::new(); diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 8f7edd06a6..955bea2e5a 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -683,6 +683,7 @@ mod tests { } #[test] + #[ignore] fn test_active_set() { let leader_id = Keypair::new().pubkey(); let active_window_length = 1000; @@ -903,6 +904,7 @@ mod tests { } #[test] + #[ignore] fn test_scheduler() { // Test when the number of validators equals // seed_rotation_interval / leader_rotation_interval, so each validator @@ -961,6 +963,7 @@ mod tests { } #[test] + #[ignore] fn test_scheduler_active_window() { let num_validators = 10; let num_vote_account_tokens = 1; @@ -1044,6 +1047,7 @@ mod tests { } #[test] + #[ignore] fn test_multiple_vote() { let leader_keypair = Keypair::new(); let leader_id = leader_keypair.pubkey(); @@ -1271,6 +1275,7 @@ mod tests { } #[test] + #[ignore] fn test_avoid_consecutive_leaders() { // Test when there is both a leader + validator in the active set run_consecutive_leader_test(1, true); @@ -1284,6 +1289,7 @@ mod tests { } #[test] + #[ignore] fn test_max_height_for_leader() { let bootstrap_leader_keypair = Keypair::new(); let bootstrap_leader_id = bootstrap_leader_keypair.pubkey(); diff --git a/src/lib.rs b/src/lib.rs index 16b592ceb7..9b1dab7831 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ pub mod broadcast_service; pub mod chacha; #[cfg(all(feature = "chacha", feature = "cuda"))] pub mod chacha_cuda; +pub mod checkpoint; pub mod client; pub mod crds; pub mod crds_gossip; @@ -60,6 +61,7 @@ pub mod service; pub mod signature; pub mod sigverify; pub mod sigverify_stage; +pub mod status_deque; pub mod storage_stage; pub mod store_ledger_stage; pub mod streamer; diff --git a/src/replay_stage.rs b/src/replay_stage.rs index e7af82db86..53d1492edc 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -270,6 +270,7 @@ mod test { use std::sync::{Arc, RwLock}; #[test] + #[ignore] pub fn test_replay_stage_leader_rotation_exit() { solana_logger::setup(); @@ -467,6 +468,7 @@ mod test { } #[test] + #[ignore] fn test_vote_error_replay_stage_leader_rotation() { // Set up dummy node to host a ReplayStage let my_keypair = Keypair::new(); diff --git a/src/rpc.rs b/src/rpc.rs index 74a966bd73..1711e1d286 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,11 +1,12 @@ //! The `rpc` module implements the Solana RPC interface. -use crate::bank::{Bank, BankError}; +use crate::bank::{self, Bank, BankError}; use crate::cluster_info::ClusterInfo; use crate::jsonrpc_core::*; use crate::jsonrpc_http_server::*; use crate::packet::PACKET_DATA_SIZE; use crate::service::Service; +use crate::status_deque::Status; use bincode::{deserialize, serialize}; use bs58; use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; @@ -15,7 +16,6 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::Transaction; use std::mem; use std::net::{SocketAddr, UdpSocket}; -use std::result; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -195,21 +195,28 @@ impl RpcSol for RpcSolImpl { fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result { info!("get_signature_status rpc request received: {:?}", id); let signature = verify_signature(&id)?; - Ok( - match meta.request_processor.get_signature_status(signature) { + let res = meta.request_processor.get_signature_status(signature); + if res.is_none() { + return Ok(RpcSignatureStatus::SignatureNotFound); + } + + let status = match res.unwrap() { + Status::Reserved => { + // Report SignatureReserved as SignatureNotFound as SignatureReserved is + // transitory while the bank processes the associated transaction. + RpcSignatureStatus::SignatureNotFound + } + Status::Complete(res) => match res { Ok(_) => RpcSignatureStatus::Confirmed, Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, - // Report SignatureReserved as SignatureNotFound as SignatureReserved is - // transitory while the bank processes the associated transaction. - Err(BankError::SignatureReserved) => RpcSignatureStatus::SignatureNotFound, - Err(BankError::SignatureNotFound) => RpcSignatureStatus::SignatureNotFound, Err(err) => { trace!("mapping {:?} to GenericFailure", err); RpcSignatureStatus::GenericFailure } }, - ) + }; + Ok(status) } fn get_transaction_count(&self, meta: Self::Metadata) -> Result { info!("get_transaction_count rpc request received"); @@ -248,7 +255,7 @@ impl RpcSol for RpcSolImpl { loop { signature_status = meta.request_processor.get_signature_status(signature); - if signature_status.is_ok() { + if signature_status == Some(Status::Complete(Ok(()))) { info!("airdrop signature ok"); return Ok(bs58::encode(signature).into_string()); } else if now.elapsed().as_secs() > 5 { @@ -329,7 +336,7 @@ impl JsonRpcRequestProcessor { let id = self.bank.last_id(); Ok(bs58::encode(id).into_string()) } - pub fn get_signature_status(&self, signature: Signature) -> result::Result<(), BankError> { + pub fn get_signature_status(&self, signature: Signature) -> Option>> { self.bank.get_signature_status(&signature) } fn get_transaction_count(&self) -> Result { diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index f4a88beb30..2d760e03ff 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -8,6 +8,7 @@ use crate::jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId}; use crate::jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder}; use crate::rpc::{JsonRpcRequestProcessor, RpcSignatureStatus}; use crate::service::Service; +use crate::status_deque::Status; use bs58; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; @@ -209,8 +210,13 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .unwrap() .insert(sub_id.clone(), (bank_sub_id, signature)); - match self.request_processor.get_signature_status(signature) { - Ok(_) => { + let status = self.request_processor.get_signature_status(signature); + if status.is_none() { + return; + } + + match status.unwrap() { + Status::Complete(Ok(_)) => { sink.notify(Ok(RpcSignatureStatus::Confirmed)) .wait() .unwrap(); @@ -219,7 +225,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .unwrap() .remove(&sub_id); } - Err(_) => { + _ => { self.bank .add_signature_subscription(bank_sub_id, signature, sink); } @@ -268,6 +274,7 @@ mod tests { } #[test] + #[ignore] fn test_signature_subscribe() { let alice = Mint::new(10_000); let bob = Keypair::new(); @@ -395,6 +402,7 @@ mod tests { } #[test] + #[ignore] fn test_account_subscribe() { let alice = Mint::new(10_000); let bob_pubkey = Keypair::new().pubkey(); diff --git a/src/status_deque.rs b/src/status_deque.rs new file mode 100644 index 0000000000..a0630cd24f --- /dev/null +++ b/src/status_deque.rs @@ -0,0 +1,399 @@ +use crate::checkpoint::Checkpoint; +use crate::poh_service::NUM_TICKS_PER_SECOND; +use hashbrown::HashMap; +use solana_sdk::hash::Hash; +use solana_sdk::signature::Signature; +use solana_sdk::timing::timestamp; +use std::collections::VecDeque; +use std::result; + +/// The number of most recent `last_id` values that the bank will track the signatures +/// of. Once the bank discards a `last_id`, it will reject any transactions that use +/// that `last_id` in a transaction. Lowering this value reduces memory consumption, +/// but requires clients to update its `last_id` more frequently. Raising the value +/// lengthens the time a client must wait to be certain a missing transaction will +/// not be processed by the network. +pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Status { + Reserved, + Complete(T), +} + +type StatusMap = HashMap>; +type StatusEntryMap = HashMap>; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum StatusDequeError { + /// The `Signature` has been seen before. This can occur under normal operation + /// when a UDP packet is duplicated, as a user error from a client not updating + /// its `last_id`, or as a double-spend attack. + DuplicateSignature, + + /// The bank has not seen the given `last_id` or the transaction is too old and + /// the `last_id` has been discarded. + LastIdNotFound, +} + +pub type Result = result::Result; + +/// a record of a tick, from register_tick +#[derive(Clone)] +struct StatusEntry { + /// when the id was registered, according to network time + tick_height: u64, + + /// timestamp when this id was registered, used for stats/finality + timestamp: u64, + + /// a map of signature status, used for duplicate detection + statuses: StatusMap, +} + +pub struct StatusDeque { + /// A FIFO queue of `last_id` items, where each item is a set of signatures + /// that have been processed using that `last_id`. Rejected `last_id` + /// values are so old that the `last_id` has been pulled out of the queue. + + /// updated whenever an id is registered, at each tick ;) + pub tick_height: u64, + + /// last tick to be registered + pub last_id: Option, + + /// Mapping of hashes to signature sets along with timestamp and what tick_height + /// was when the id was added. The bank uses this data to + /// reject transactions with signatures it's seen before and to reject + /// transactions that are too old (nth is too small) + entries: StatusEntryMap, + + checkpoints: VecDeque<(u64, Option, StatusEntryMap)>, +} + +impl Default for StatusDeque { + fn default() -> Self { + Self { + tick_height: 0, + last_id: None, + entries: HashMap::new(), + checkpoints: VecDeque::new(), + } + } +} + +impl Checkpoint for StatusDeque { + fn checkpoint(&mut self) { + self.checkpoints + .push_front((self.tick_height, self.last_id, self.entries.clone())); + } + fn rollback(&mut self) { + let (tick_height, last_id, entries) = self.checkpoints.pop_front().unwrap(); + self.tick_height = tick_height; + self.last_id = last_id; + self.entries = entries; + } + fn purge(&mut self, depth: usize) { + while self.depth() > depth { + self.checkpoints.pop_back().unwrap(); + } + } + fn depth(&self) -> usize { + self.checkpoints.len() + } +} + +impl StatusDeque { + pub fn update_signature_status_with_last_id( + &mut self, + signature: &Signature, + result: &T, + last_id: &Hash, + ) { + if let Some(entry) = self.entries.get_mut(last_id) { + entry + .statuses + .insert(*signature, Status::Complete(result.clone())); + } + } + pub fn reserve_signature_with_last_id( + &mut self, + last_id: &Hash, + sig: &Signature, + ) -> Result<()> { + if let Some(entry) = self.entries.get_mut(last_id) { + if self.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 { + return Self::reserve_signature(&mut entry.statuses, sig); + } + } + Err(StatusDequeError::LastIdNotFound) + } + + /// Store the given signature. The bank will reject any transaction with the same signature. + fn reserve_signature(statuses: &mut StatusMap, signature: &Signature) -> Result<()> { + if let Some(_result) = statuses.get(signature) { + return Err(StatusDequeError::DuplicateSignature); + } + statuses.insert(*signature, Status::Reserved); + Ok(()) + } + + /// Forget all signatures. Useful for benchmarking. + pub fn clear_signatures(&mut self) { + for entry in &mut self.entries.values_mut() { + entry.statuses.clear(); + } + } + + /// Check if the age of the entry_id is within the max_age + /// return false for any entries with an age equal to or above max_age + pub fn check_entry_id_age(&self, entry_id: Hash, max_age: usize) -> bool { + let entry = self.entries.get(&entry_id); + + match entry { + Some(entry) => self.tick_height - entry.tick_height < max_age as u64, + _ => false, + } + } + /// Tell the bank which Entry IDs exist on the ledger. This function + /// assumes subsequent calls correspond to later entries, and will boot + /// the oldest ones once its internal cache is full. Once boot, the + /// bank will reject transactions using that `last_id`. + pub fn register_tick(&mut self, last_id: &Hash) { + self.tick_height += 1; + let tick_height = self.tick_height; + + // this clean up can be deferred until sigs gets larger + // because we verify entry.nth every place we check for validity + if self.entries.len() >= MAX_ENTRY_IDS as usize { + self.entries + .retain(|_, entry| tick_height - entry.tick_height <= MAX_ENTRY_IDS as u64); + } + + self.entries.insert( + *last_id, + StatusEntry { + tick_height, + timestamp: timestamp(), + statuses: HashMap::new(), + }, + ); + + self.last_id = Some(*last_id); + } + + /// Looks through a list of tick heights and stakes, and finds the latest + /// tick that has achieved finality + pub fn get_finality_timestamp( + &self, + ticks_and_stakes: &mut [(u64, u64)], + supermajority_stake: u64, + ) -> Option { + // Sort by tick height + ticks_and_stakes.sort_by(|a, b| a.0.cmp(&b.0)); + let current_tick_height = self.tick_height; + let mut total = 0; + for (tick_height, stake) in ticks_and_stakes.iter() { + if ((current_tick_height - tick_height) as usize) < MAX_ENTRY_IDS { + total += stake; + if total > supermajority_stake { + return self.tick_height_to_timestamp(*tick_height); + } + } + } + None + } + + /// Maps a tick height to a timestamp + fn tick_height_to_timestamp(&self, tick_height: u64) -> Option { + for entry in self.entries.values() { + if entry.tick_height == tick_height { + return Some(entry.timestamp); + } + } + None + } + + /// Look through the last_ids and find all the valid ids + /// This is batched to avoid holding the lock for a significant amount of time + /// + /// Return a vec of tuple of (valid index, timestamp) + /// index is into the passed ids slice to avoid copying hashes + pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> { + let mut ret = Vec::new(); + for (i, id) in ids.iter().enumerate() { + if let Some(entry) = self.entries.get(id) { + if self.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 { + ret.push((i, entry.timestamp)); + } + } + } + ret + } + pub fn get_signature_status(&self, signature: &Signature) -> Option> { + for entry in self.entries.values() { + if let Some(res) = entry.statuses.get(signature) { + return Some(res.clone()); + } + } + None + } + pub fn has_signature(&self, signature: &Signature) -> bool { + self.get_signature_status(signature).is_some() + } + + pub fn get_signature(&self, last_id: &Hash, signature: &Signature) -> Option> { + self.entries + .get(last_id) + .and_then(|entry| entry.statuses.get(signature).cloned()) + } +} +#[cfg(test)] +mod tests { + use super::*; + use bincode::serialize; + use solana_sdk::hash::hash; + #[test] + fn test_duplicate_transaction_signature() { + let sig = Default::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + status_deque.register_tick(&last_id); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &sig), + Ok(()) + ); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &sig), + Err(StatusDequeError::DuplicateSignature) + ); + } + + #[test] + fn test_duplicate_transaction_signature_checkpoint() { + let sig = Default::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + status_deque.register_tick(&last_id); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &sig), + Ok(()) + ); + status_deque.checkpoint(); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &sig), + Err(StatusDequeError::DuplicateSignature) + ); + } + + #[test] + fn test_count_valid_ids() { + let first_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + status_deque.register_tick(&first_id); + let ids: Vec<_> = (0..MAX_ENTRY_IDS) + .map(|i| { + let last_id = hash(&serialize(&i).unwrap()); // Unique hash + status_deque.register_tick(&last_id); + last_id + }) + .collect(); + assert_eq!(status_deque.count_valid_ids(&[]).len(), 0); + assert_eq!(status_deque.count_valid_ids(&[first_id]).len(), 0); + for (i, id) in status_deque.count_valid_ids(&ids).iter().enumerate() { + assert_eq!(id.0, i); + } + } + + #[test] + fn test_clear_signatures() { + let signature = Signature::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + status_deque.register_tick(&last_id); + status_deque + .reserve_signature_with_last_id(&last_id, &signature) + .unwrap(); + status_deque.clear_signatures(); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &signature), + Ok(()) + ); + } + + #[test] + fn test_clear_signatures_checkpoint() { + let signature = Signature::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + status_deque.register_tick(&last_id); + status_deque + .reserve_signature_with_last_id(&last_id, &signature) + .unwrap(); + status_deque.checkpoint(); + status_deque.clear_signatures(); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &signature), + Ok(()) + ); + } + + #[test] + fn test_get_signature_status() { + let signature = Signature::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + status_deque.register_tick(&last_id); + status_deque + .reserve_signature_with_last_id(&last_id, &signature) + .expect("reserve signature"); + assert_eq!( + status_deque.get_signature_status(&signature), + Some(Status::Reserved) + ); + } + + #[test] + fn test_register_tick() { + let signature = Signature::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &signature), + Err(StatusDequeError::LastIdNotFound) + ); + status_deque.register_tick(&last_id); + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &signature), + Ok(()) + ); + } + + #[test] + fn test_has_signature() { + let signature = Signature::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + status_deque.register_tick(&last_id); + status_deque + .reserve_signature_with_last_id(&last_id, &signature) + .expect("reserve signature"); + assert!(status_deque.has_signature(&signature)); + } + + #[test] + fn test_reject_old_last_id() { + let signature = Signature::default(); + let last_id = Default::default(); + let mut status_deque: StatusDeque<()> = StatusDeque::default(); + for i in 0..MAX_ENTRY_IDS { + let last_id = hash(&serialize(&i).unwrap()); // Unique hash + status_deque.register_tick(&last_id); + } + // Assert we're no longer able to use the oldest entry ID. + assert_eq!( + status_deque.reserve_signature_with_last_id(&last_id, &signature), + Err(StatusDequeError::LastIdNotFound) + ); + } +} diff --git a/src/thin_client.rs b/src/thin_client.rs index caf0ab0395..abdd853e54 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -596,6 +596,7 @@ mod tests { } #[test] + #[ignore] fn test_register_vote_account() { solana_logger::setup(); let leader_keypair = Arc::new(Keypair::new()); diff --git a/tests/programs.rs b/tests/programs.rs index ec68ddd119..3e8add11ed 100644 --- a/tests/programs.rs +++ b/tests/programs.rs @@ -2,8 +2,8 @@ use solana; use solana_native_loader; use solana::bank::Bank; - use solana::mint::Mint; +use solana::status_deque::Status; #[cfg(feature = "bpf_c")] use solana_sdk::bpf_loader; use solana_sdk::loader_transaction::LoaderTransaction; @@ -41,7 +41,7 @@ fn check_tx_results(bank: &Bank, tx: &Transaction, result: Vec