Add snapshotting integration test (#5519)
* Add snapshotting integration test * Update ContactInfo on restart in local cluster nodes
This commit is contained in:
		
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -3535,6 +3535,7 @@ name = "solana-local-cluster" | ||||
| version = "0.18.0-pre2" | ||||
| dependencies = [ | ||||
|  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "serial_test 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "serial_test_derive 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "solana-client 0.18.0-pre2", | ||||
| @@ -3547,6 +3548,7 @@ dependencies = [ | ||||
|  "solana-storage-program 0.18.0-pre2", | ||||
|  "solana-vote-api 0.18.0-pre2", | ||||
|  "symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
| ] | ||||
|  | ||||
| [[package]] | ||||
|   | ||||
| @@ -238,6 +238,7 @@ impl BankForks { | ||||
|                 .snapshot_config | ||||
|                 .as_ref() | ||||
|                 .expect("Called package_snapshot without a snapshot configuration"); | ||||
|             info!("setting snapshot root: {}", root); | ||||
|             if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 { | ||||
|                 let mut snapshot_time = Measure::start("total-snapshot-ms"); | ||||
|                 let r = self.generate_snapshot( | ||||
|   | ||||
| @@ -1,8 +1,9 @@ | ||||
| use crate::validator::ValidatorConfig; | ||||
| use solana_client::thin_client::ThinClient; | ||||
| use solana_sdk::pubkey::Pubkey; | ||||
|  | ||||
| pub trait Cluster { | ||||
|     fn get_node_pubkeys(&self) -> Vec<Pubkey>; | ||||
|     fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>; | ||||
|     fn restart_node(&mut self, pubkey: Pubkey); | ||||
|     fn restart_node(&mut self, pubkey: Pubkey, config: &ValidatorConfig); | ||||
| } | ||||
|   | ||||
| @@ -14,6 +14,7 @@ use std::fs::File; | ||||
| use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind}; | ||||
| use std::path::{Path, PathBuf}; | ||||
| use tar::Archive; | ||||
| use tempfile::TempDir; | ||||
|  | ||||
| const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; | ||||
|  | ||||
| @@ -57,8 +58,6 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>( | ||||
|     snapshot_package_output_file: P, | ||||
|     snapshot_path: Q, | ||||
| ) -> Result<SnapshotPackage> { | ||||
|     let slot = bank.slot(); | ||||
|  | ||||
|     // Hard link all the snapshots we need for this package | ||||
|     let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?; | ||||
|  | ||||
| @@ -73,7 +72,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>( | ||||
|     // Create a snapshot package | ||||
|     info!( | ||||
|         "Snapshot for bank: {} has {} account storage entries", | ||||
|         slot, | ||||
|         bank.slot(), | ||||
|         account_storage_entries.len() | ||||
|     ); | ||||
|  | ||||
| @@ -172,6 +171,20 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<() | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<u64> { | ||||
|     let tempdir = TempDir::new()?; | ||||
|     untar_snapshot_in(&snapshot_tar, &tempdir)?; | ||||
|     let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR); | ||||
|     let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); | ||||
|     let last_root_paths = snapshot_paths | ||||
|         .last() | ||||
|         .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; | ||||
|     let file = File::open(&last_root_paths.snapshot_file_path)?; | ||||
|     let mut stream = BufReader::new(file); | ||||
|     let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?; | ||||
|     Ok(bank.slot()) | ||||
| } | ||||
|  | ||||
| pub fn bank_from_archive<P: AsRef<Path>>( | ||||
|     account_paths: String, | ||||
|     snapshot_config: &SnapshotConfig, | ||||
|   | ||||
| @@ -10,6 +10,7 @@ homepage = "https://solana.com/" | ||||
|  | ||||
| [dependencies] | ||||
| log = "0.4.8" | ||||
| rand = "0.6.5" | ||||
| solana-core = { path = "../core", version = "0.18.0-pre2" } | ||||
| solana-client = { path = "../client", version = "0.18.0-pre2" } | ||||
| solana-logger = { path = "../logger", version = "0.18.0-pre2" } | ||||
| @@ -20,6 +21,7 @@ solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre2" | ||||
| solana-storage-program = { path = "../programs/storage_program", version = "0.18.0-pre2" } | ||||
| solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre2" } | ||||
| symlink = "0.1.0" | ||||
| tempfile = "3.1.0" | ||||
|  | ||||
| [dev-dependencies] | ||||
| serial_test = "0.2.0" | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| use rand::{thread_rng, Rng}; | ||||
| use solana_client::thin_client::create_client; | ||||
| /// Cluster independant integration tests | ||||
| /// | ||||
| @@ -25,7 +26,12 @@ use solana_sdk::{ | ||||
|     }, | ||||
|     transport::TransportError, | ||||
| }; | ||||
| use std::{collections::HashSet, path::Path, thread::sleep, time::Duration}; | ||||
| use std::{ | ||||
|     collections::{HashMap, HashSet}, | ||||
|     path::Path, | ||||
|     thread::sleep, | ||||
|     time::Duration, | ||||
| }; | ||||
|  | ||||
| const DEFAULT_SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_TICKS_PER_SECOND; | ||||
|  | ||||
| @@ -65,8 +71,25 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher>( | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num_txs: u64) { | ||||
| pub fn verify_balances<S: ::std::hash::BuildHasher>( | ||||
|     expected_balances: HashMap<Pubkey, u64, S>, | ||||
|     node: &ContactInfo, | ||||
| ) { | ||||
|     let client = create_client(node.client_facing_addr(), FULLNODE_PORT_RANGE); | ||||
|     for (pk, b) in expected_balances { | ||||
|         let bal = client.poll_get_balance(&pk).expect("balance in source"); | ||||
|         assert_eq!(bal, b); | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub fn send_many_transactions( | ||||
|     node: &ContactInfo, | ||||
|     funding_keypair: &Keypair, | ||||
|     max_tokens_per_transfer: u64, | ||||
|     num_txs: u64, | ||||
| ) -> HashMap<Pubkey, u64> { | ||||
|     let client = create_client(node.client_facing_addr(), FULLNODE_PORT_RANGE); | ||||
|     let mut expected_balances = HashMap::new(); | ||||
|     for _ in 0..num_txs { | ||||
|         let random_keypair = Keypair::new(); | ||||
|         let bal = client | ||||
| @@ -74,12 +97,23 @@ pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num | ||||
|             .expect("balance in source"); | ||||
|         assert!(bal > 0); | ||||
|         let (blockhash, _fee_calculator) = client.get_recent_blockhash().unwrap(); | ||||
|         let mut transaction = | ||||
|             system_transaction::transfer(&funding_keypair, &random_keypair.pubkey(), 1, blockhash); | ||||
|         let transfer_amount = thread_rng().gen_range(1, max_tokens_per_transfer); | ||||
|  | ||||
|         let mut transaction = system_transaction::transfer( | ||||
|             &funding_keypair, | ||||
|             &random_keypair.pubkey(), | ||||
|             transfer_amount, | ||||
|             blockhash, | ||||
|         ); | ||||
|  | ||||
|         client | ||||
|             .retry_transfer(&funding_keypair, &mut transaction, 5) | ||||
|             .unwrap(); | ||||
|  | ||||
|         expected_balances.insert(random_keypair.pubkey(), transfer_amount); | ||||
|     } | ||||
|  | ||||
|     expected_balances | ||||
| } | ||||
|  | ||||
| pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) { | ||||
|   | ||||
| @@ -9,3 +9,5 @@ extern crate solana_core; | ||||
|  | ||||
| #[macro_use] | ||||
| extern crate solana_storage_program; | ||||
|  | ||||
| extern crate tempfile; | ||||
|   | ||||
| @@ -585,19 +585,28 @@ impl Cluster for LocalCluster { | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     fn restart_node(&mut self, pubkey: Pubkey) { | ||||
|     fn restart_node(&mut self, pubkey: Pubkey, config: &ValidatorConfig) { | ||||
|         // Shut down the fullnode | ||||
|         let mut node = self.fullnodes.remove(&pubkey).unwrap(); | ||||
|         node.exit(); | ||||
|         node.join().unwrap(); | ||||
|  | ||||
|         // Restart the node | ||||
|         let fullnode_info = &self.fullnode_infos[&pubkey].info; | ||||
|         let config = &self.fullnode_infos[&pubkey].config; | ||||
|         let node = Node::new_localhost_with_pubkey(&fullnode_info.keypair.pubkey()); | ||||
|         // Update the stored ContactInfo for this node | ||||
|         let node_pubkey = &self.fullnode_infos[&pubkey].info.keypair.pubkey(); | ||||
|         let node = Node::new_localhost_with_pubkey(&node_pubkey); | ||||
|         self.fullnode_infos | ||||
|             .get_mut(&pubkey) | ||||
|             .unwrap() | ||||
|             .info | ||||
|             .contact_info = node.info.clone(); | ||||
|         if pubkey == self.entry_point_info.id { | ||||
|             self.entry_point_info = node.info.clone(); | ||||
|         } | ||||
|  | ||||
|         // Restart the node | ||||
|         self.fullnode_infos.get_mut(&pubkey).unwrap().config = config.clone(); | ||||
|         let fullnode_info = &self.fullnode_infos[&pubkey].info; | ||||
|  | ||||
|         let restarted_node = Validator::new( | ||||
|             node, | ||||
|             &fullnode_info.keypair, | ||||
|   | ||||
| @@ -3,16 +3,25 @@ extern crate solana_core; | ||||
| use log::*; | ||||
| use serial_test_derive::serial; | ||||
| use solana_core::{ | ||||
|     blocktree::Blocktree, broadcast_stage::BroadcastStageType, cluster::Cluster, | ||||
|     gossip_service::discover_cluster, validator::ValidatorConfig, | ||||
|     bank_forks::SnapshotConfig, blocktree::Blocktree, broadcast_stage::BroadcastStageType, | ||||
|     cluster::Cluster, gossip_service::discover_cluster, snapshot_utils, validator::ValidatorConfig, | ||||
| }; | ||||
| use solana_local_cluster::{ | ||||
|     cluster_tests, | ||||
|     local_cluster::{ClusterConfig, LocalCluster}, | ||||
| }; | ||||
| use solana_runtime::epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}; | ||||
| use solana_runtime::{ | ||||
|     accounts_db::AccountsDB, | ||||
|     epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}, | ||||
| }; | ||||
| use solana_sdk::{client::SyncClient, poh_config::PohConfig, timing}; | ||||
| use std::{collections::HashSet, thread::sleep, time::Duration}; | ||||
| use std::{ | ||||
|     collections::{HashMap, HashSet}, | ||||
|     fs, | ||||
|     thread::sleep, | ||||
|     time::Duration, | ||||
| }; | ||||
| use tempfile::TempDir; | ||||
|  | ||||
| #[test] | ||||
| #[serial] | ||||
| @@ -232,7 +241,7 @@ fn test_forwarding() { | ||||
|         .unwrap(); | ||||
|  | ||||
|     // Confirm that transactions were forwarded to and processed by the leader. | ||||
|     cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 20); | ||||
|     cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 10, 20); | ||||
| } | ||||
|  | ||||
| #[test] | ||||
| @@ -242,10 +251,11 @@ fn test_restart_node() { | ||||
|     error!("test_restart_node"); | ||||
|     let slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH as u64; | ||||
|     let ticks_per_slot = 16; | ||||
|     let validator_config = ValidatorConfig::default(); | ||||
|     let mut cluster = LocalCluster::new(&ClusterConfig { | ||||
|         node_stakes: vec![3], | ||||
|         cluster_lamports: 100, | ||||
|         validator_configs: vec![ValidatorConfig::default()], | ||||
|         validator_configs: vec![validator_config.clone()], | ||||
|         ticks_per_slot, | ||||
|         slots_per_epoch, | ||||
|         ..ClusterConfig::default() | ||||
| @@ -257,14 +267,19 @@ fn test_restart_node() { | ||||
|         timing::DEFAULT_TICKS_PER_SLOT, | ||||
|         slots_per_epoch, | ||||
|     ); | ||||
|     cluster.restart_node(nodes[0]); | ||||
|     cluster.restart_node(nodes[0], &validator_config); | ||||
|     cluster_tests::sleep_n_epochs( | ||||
|         0.5, | ||||
|         &cluster.genesis_block.poh_config, | ||||
|         timing::DEFAULT_TICKS_PER_SLOT, | ||||
|         slots_per_epoch, | ||||
|     ); | ||||
|     cluster_tests::send_many_transactions(&cluster.entry_point_info, &cluster.funding_keypair, 1); | ||||
|     cluster_tests::send_many_transactions( | ||||
|         &cluster.entry_point_info, | ||||
|         &cluster.funding_keypair, | ||||
|         10, | ||||
|         1, | ||||
|     ); | ||||
| } | ||||
|  | ||||
| #[test] | ||||
| @@ -282,6 +297,100 @@ fn test_listener_startup() { | ||||
|     assert_eq!(cluster_nodes.len(), 4); | ||||
| } | ||||
|  | ||||
| #[test] | ||||
| #[serial] | ||||
| fn test_snapshots_restart_validity() { | ||||
|     let temp_dir = TempDir::new().unwrap(); | ||||
|     let snapshot_path = temp_dir.path().join("bank_states"); | ||||
|     let snapshot_package_output_path = temp_dir.path().join("tar"); | ||||
|     let snapshot_interval_slots = 25; | ||||
|  | ||||
|     // Create the snapshot directories | ||||
|     fs::create_dir_all(&snapshot_path).expect("Failed to create snapshots bank state directory"); | ||||
|     fs::create_dir_all(&snapshot_package_output_path) | ||||
|         .expect("Failed to create snapshots tar directory"); | ||||
|  | ||||
|     // Set up the cluster with 1 snapshotting validator | ||||
|     let mut snapshot_validator_config = ValidatorConfig::default(); | ||||
|     snapshot_validator_config.rpc_config.enable_fullnode_exit = true; | ||||
|     snapshot_validator_config.snapshot_config = Some(SnapshotConfig::new( | ||||
|         snapshot_path, | ||||
|         snapshot_package_output_path.clone(), | ||||
|         snapshot_interval_slots, | ||||
|     )); | ||||
|     let num_account_paths = 4; | ||||
|     let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths); | ||||
|     let mut all_account_storage_dirs = vec![account_storage_dirs]; | ||||
|     snapshot_validator_config.account_paths = Some(account_storage_paths); | ||||
|  | ||||
|     let config = ClusterConfig { | ||||
|         node_stakes: vec![10000], | ||||
|         cluster_lamports: 100000, | ||||
|         validator_configs: vec![snapshot_validator_config.clone()], | ||||
|         ..ClusterConfig::default() | ||||
|     }; | ||||
|  | ||||
|     // Create and reboot the node from snapshot `num_runs` times | ||||
|     let num_runs = 3; | ||||
|     let mut expected_balances = HashMap::new(); | ||||
|     let mut cluster = LocalCluster::new(&config); | ||||
|     for _ in 0..num_runs { | ||||
|         // Push transactions to one of the nodes and confirm that transactions were | ||||
|         // forwarded to and processed. | ||||
|         trace!("Sending transactions"); | ||||
|         let new_balances = cluster_tests::send_many_transactions( | ||||
|             &cluster.entry_point_info, | ||||
|             &cluster.funding_keypair, | ||||
|             10, | ||||
|             10, | ||||
|         ); | ||||
|  | ||||
|         expected_balances.extend(new_balances); | ||||
|  | ||||
|         // Get slot after which this was generated | ||||
|         let client = cluster | ||||
|             .get_validator_client(&cluster.entry_point_info.id) | ||||
|             .unwrap(); | ||||
|         let last_slot = client.get_slot().expect("Couldn't get slot"); | ||||
|  | ||||
|         // Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot | ||||
|         // must include the transactions just pushed | ||||
|         let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path); | ||||
|         trace!("Waiting for tar to be generated"); | ||||
|         loop { | ||||
|             if tar.exists() && snapshot_utils::bank_slot_from_archive(&tar).unwrap() >= last_slot { | ||||
|                 break; | ||||
|             } | ||||
|             sleep(Duration::from_millis(100)); | ||||
|         } | ||||
|  | ||||
|         // Create new account paths since fullnode exit is not guaranteed to cleanup RPC threads, | ||||
|         // which may delete the old accounts on exit at any point | ||||
|         let (new_account_storage_dirs, new_account_storage_paths) = | ||||
|             generate_account_paths(num_account_paths); | ||||
|         all_account_storage_dirs.push(new_account_storage_dirs); | ||||
|         snapshot_validator_config.account_paths = Some(new_account_storage_paths); | ||||
|  | ||||
|         // Restart a node | ||||
|         trace!("Restarting cluster from snapshot"); | ||||
|         let nodes = cluster.get_node_pubkeys(); | ||||
|         cluster.restart_node(nodes[0], &snapshot_validator_config); | ||||
|  | ||||
|         // Verify account balances on validator | ||||
|         trace!("Verifying balances"); | ||||
|         cluster_tests::verify_balances(expected_balances.clone(), &cluster.entry_point_info); | ||||
|  | ||||
|         // Check that we can still push transactions | ||||
|         trace!("Spending and verifying"); | ||||
|         cluster_tests::spend_and_verify_all_nodes( | ||||
|             &cluster.entry_point_info, | ||||
|             &cluster.funding_keypair, | ||||
|             1, | ||||
|             HashSet::new(), | ||||
|         ); | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[allow(unused_attributes)] | ||||
| #[test] | ||||
| #[serial] | ||||
| @@ -453,3 +562,15 @@ fn run_repairman_catchup(num_repairmen: u64) { | ||||
|         sleep(Duration::from_secs(1)); | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn generate_account_paths(num_account_paths: usize) -> (Vec<TempDir>, String) { | ||||
|     let account_storage_dirs: Vec<TempDir> = (0..num_account_paths) | ||||
|         .map(|_| TempDir::new().unwrap()) | ||||
|         .collect(); | ||||
|     let account_storage_paths: Vec<_> = account_storage_dirs | ||||
|         .iter() | ||||
|         .map(|a| a.path().to_str().unwrap().to_string()) | ||||
|         .collect(); | ||||
|     let account_storage_paths = AccountsDB::format_paths(account_storage_paths); | ||||
|     (account_storage_dirs, account_storage_paths) | ||||
| } | ||||
|   | ||||
| @@ -427,13 +427,10 @@ impl AccountsDB { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn paths(&self) -> String { | ||||
|         let paths: Vec<String> = self | ||||
|             .paths | ||||
|             .read() | ||||
|             .unwrap() | ||||
|     pub fn format_paths<P: AsRef<Path>>(paths: Vec<P>) -> String { | ||||
|         let paths: Vec<String> = paths | ||||
|             .iter() | ||||
|             .map(|p| p.to_str().unwrap().to_owned()) | ||||
|             .map(|p| p.as_ref().to_str().unwrap().to_owned()) | ||||
|             .collect(); | ||||
|         paths.join(",") | ||||
|     } | ||||
| @@ -1417,7 +1414,11 @@ pub mod tests { | ||||
|         let buf = writer.into_inner(); | ||||
|         let mut reader = BufReader::new(&buf[..]); | ||||
|         let daccounts = AccountsDB::new(None); | ||||
|         let local_paths = daccounts.paths(); | ||||
|  | ||||
|         let local_paths = { | ||||
|             let paths = daccounts.paths.read().unwrap(); | ||||
|             AccountsDB::format_paths(paths.to_vec()) | ||||
|         }; | ||||
|         let copied_accounts = TempDir::new().unwrap(); | ||||
|         // Simulate obtaining a copy of the AppendVecs from a tarball | ||||
|         copy_append_vecs(&accounts, copied_accounts.path()).unwrap(); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user