diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs
index 452a170df4..23c8127c1c 100644
--- a/core/src/bank_forks.rs
+++ b/core/src/bank_forks.rs
@@ -57,7 +57,7 @@ pub struct BankForks {
     working_bank: Arc<Bank>,
     root: u64,
     snapshot_config: Option<SnapshotConfig>,
-    last_snapshot: u64,
+    slots_since_snapshot: Vec<u64>,
     confidence: HashMap<u64, Confidence>,
 }
@@ -110,7 +110,7 @@ impl BankForks {
             working_bank,
             root: 0,
             snapshot_config: None,
-            last_snapshot: 0,
+            slots_since_snapshot: vec![],
             confidence: HashMap::new(),
         }
     }
@@ -175,7 +175,7 @@ impl BankForks {
             banks,
             working_bank,
             snapshot_config: None,
-            last_snapshot: 0,
+            slots_since_snapshot: vec![],
             confidence: HashMap::new(),
         }
     }
@@ -208,26 +208,27 @@ impl BankForks {
             .unwrap_or(0);
         root_bank.squash();
         let new_tx_count = root_bank.transaction_count();
-        self.prune_non_root(root);
+        self.slots_since_snapshot.push(root);
 
         // Generate a snapshot if snapshots are configured and it's been an appropriate number
         // of banks since the last snapshot
-        if self.snapshot_config.is_some() {
+        if self.snapshot_config.is_some() && snapshot_package_sender.is_some() {
             let config = self
                 .snapshot_config
                 .as_ref()
                 .expect("Called package_snapshot without a snapshot configuration");
-            if root - self.last_snapshot >= config.snapshot_interval_slots as u64 {
+            if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 {
                 let mut snapshot_time = Measure::start("total-snapshot-ms");
                 let r = self.generate_snapshot(
                     root,
+                    &self.slots_since_snapshot[1..],
                     snapshot_package_sender.as_ref().unwrap(),
                     snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path),
                 );
                 if r.is_err() {
                     warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
                 } else {
-                    self.last_snapshot = root;
+                    self.slots_since_snapshot = vec![root];
                 }
 
                 // Cleanup outdated snapshots
@@ -237,6 +238,8 @@ impl BankForks {
             }
         }
 
+        self.prune_non_root(root);
+
         inc_new_counter_info!(
             "bank-forks_set_root_ms",
             timing::duration_as_ms(&set_root_start.elapsed()) as usize
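// Review note: a minimal sketch of the bookkeeping set_root now performs,
// restated outside the diff. slots_since_snapshot[0] is the root covered by
// the last snapshot; every later entry is a root whose status-cache delta
// still has to ride along with the next snapshot. Struct and method names
// here are illustrative, not part of the patch.
struct SnapshotWindow {
    slots_since_snapshot: Vec<u64>, // [last_snapshotted_root, newer_roots...]
    snapshot_interval_slots: u64,
}

impl SnapshotWindow {
    // returns the deltas to package if this root crosses the interval
    fn on_new_root(&mut self, root: u64) -> Option<&[u64]> {
        self.slots_since_snapshot.push(root);
        if root - self.slots_since_snapshot[0] >= self.snapshot_interval_slots {
            Some(&self.slots_since_snapshot[1..])
        } else {
            None
        }
    }

    // on success the window restarts at the freshly snapshotted root
    fn on_snapshot_success(&mut self, root: u64) {
        self.slots_since_snapshot = vec![root];
    }
}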
@@ -254,10 +257,10 @@
     fn purge_old_snapshots(&self) {
         // Remove outdated snapshots
         let config = self.snapshot_config.as_ref().unwrap();
-        let names = snapshot_utils::get_snapshot_names(&config.snapshot_path);
-        let num_to_remove = names.len().saturating_sub(MAX_CACHE_ENTRIES);
-        for old_root in &names[..num_to_remove] {
-            let r = snapshot_utils::remove_snapshot(*old_root, &config.snapshot_path);
+        let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
+        let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES);
+        for slot_files in &slot_snapshot_paths[..num_to_remove] {
+            let r = snapshot_utils::remove_snapshot(slot_files.slot, &config.snapshot_path);
             if r.is_err() {
                 warn!("Couldn't remove snapshot at: {:?}", config.snapshot_path);
             }
@@ -267,6 +270,7 @@ impl BankForks {
     fn generate_snapshot<P: AsRef<Path>>(
         &self,
         root: u64,
+        slots_since_snapshot: &[u64],
         snapshot_package_sender: &SnapshotPackageSender,
         tar_output_file: P,
     ) -> Result<()> {
@@ -277,19 +281,17 @@ impl BankForks {
             .get(root)
             .cloned()
             .expect("root must exist in BankForks");
-        snapshot_utils::add_snapshot(&config.snapshot_path, &bank)?;
-
+        snapshot_utils::add_snapshot(&config.snapshot_path, &bank, slots_since_snapshot)?;
         // Package the relevant snapshots
-        let names = snapshot_utils::get_snapshot_names(&config.snapshot_path);
+        let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
 
         // We only care about the last MAX_CACHE_ENTRIES snapshots of roots because
         // the status cache of anything older is thrown away by the bank in
         // status_cache.prune_roots()
-        let start = names.len().saturating_sub(MAX_CACHE_ENTRIES);
+        let start = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES);
         let package = snapshot_utils::package_snapshot(
             &bank,
-            &names[start..],
-            &config.snapshot_path,
+            &slot_snapshot_paths[start..],
             tar_output_file,
         )?;
@@ -362,9 +364,10 @@ impl BankForks {
         let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
         let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
+        let snapshot_paths = snapshot_utils::get_snapshot_paths(&unpacked_snapshots_dir);
         let bank = snapshot_utils::bank_from_snapshots(
             account_paths,
-            &unpacked_snapshots_dir,
+            &snapshot_paths,
             unpacked_accounts_dir,
         )?;
@@ -378,34 +381,11 @@ impl BankForks {
         copy_options.overwrite = true;
         fs_extra::move_items(&paths, snapshot_config.snapshot_path(), &copy_options)?;
 
-        /*let mut bank_maps = vec![];
-        let status_cache_rc = StatusCacheRc::default();
-        let id = (names[names.len() - 1] + 1) as usize;
-        let mut bank0 =
-            Bank::create_with_genesis(&genesis_block, account_paths.clone(), &status_cache_rc, id);
-        bank0.freeze();
-        let bank_root = snapshot_utils::load_snapshots(
-            &names,
-            &mut bank0,
-            &mut bank_maps,
-            &status_cache_rc,
-            &snapshot_config.snapshot_path,
-        );
-        if bank_maps.is_empty() || bank_root.is_none() {
-            return Err(Error::IO(IOError::new(
-                ErrorKind::Other,
-                "no snapshots loaded",
-            )));
-        }
-
-        let (banks, last_slot) = BankForks::setup_banks(&mut bank_maps, &bank.rc, &status_cache_rc);
-        let working_bank = banks[&last_slot].clone();*/
-
         let mut banks = HashMap::new();
         banks.insert(bank.slot(), bank.clone());
         let root = bank.slot();
-        let names = snapshot_utils::get_snapshot_names(&snapshot_config.snapshot_path);
-        if names.is_empty() {
+        let snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path);
+        if snapshot_paths.is_empty() {
             return Err(Error::IO(IOError::new(
                 ErrorKind::Other,
                 "no snapshots found",
             )));
@@ -416,9 +396,7 @@
             working_bank: bank,
             root,
             snapshot_config: None,
-            last_snapshot: *names
-                .last()
-                .expect("untarred snapshot should have at least one snapshot"),
+            slots_since_snapshot: vec![snapshot_paths.last().unwrap().slot],
             confidence: HashMap::new(),
         })
     }
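// Review note: the restore path above, condensed into one sketch of the call
// order (paths and error handling elided). It only uses functions this patch
// defines in snapshot_utils; the wrapper function name and the exact home of
// the TAR_* constants are assumptions for illustration, not part of the patch.
fn rebuild_bank_sketch(
    account_paths: String,
    snapshot_tar: &std::path::Path,
    unpack_dir: &std::path::Path,
) -> Result<Bank> {
    // 1. explode the tar into snapshots/ and accounts/ under unpack_dir
    snapshot_utils::untar_snapshot_in(snapshot_tar, &unpack_dir)?;
    // 2. enumerate the per-slot snapshot files, sorted by slot
    let snapshot_paths = snapshot_utils::get_snapshot_paths(&unpack_dir.join(TAR_SNAPSHOTS_DIR));
    // 3. deserialize the newest bank, then replay every slot's status-cache
    //    delta into it (bank.src.append) so old signatures stay detectable
    snapshot_utils::bank_from_snapshots(
        account_paths,
        &snapshot_paths,
        unpack_dir.join(TAR_ACCOUNTS_DIR),
    )
}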
@@ -432,7 +410,7 @@ mod tests {
     use crate::snapshot_package::SnapshotPackagerService;
     use fs_extra::dir::CopyOptions;
     use itertools::Itertools;
-    use solana_sdk::hash::Hash;
+    use solana_sdk::hash::{hashv, Hash};
     use solana_sdk::pubkey::Pubkey;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use solana_sdk::system_transaction;
@@ -576,8 +554,14 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_bank_forks_snapshot_n() {
+    // creates banks up to "last_slot" and runs the input function `f` on each bank created;
+    // also marks each bank as root and generates snapshots.
+    // finally tries to restore from the last bank's snapshot and compares the restored bank to the
+    // `last_slot` bank
+    fn run_bank_forks_snapshot_n<F>(last_slot: u64, f: F)
+    where
+        F: Fn(&mut Bank, &Keypair),
+    {
         solana_logger::setup();
         let accounts_dir = TempDir::new().unwrap();
         let snapshot_dir = TempDir::new().unwrap();
@@ -587,55 +571,107 @@ mod tests {
             mint_keypair,
             ..
         } = create_genesis_block(10_000);
-        for index in 0..4 {
-            let bank0 = Bank::new_with_paths(
-                &genesis_block,
-                Some(accounts_dir.path().to_str().unwrap().to_string()),
-            );
-            bank0.freeze();
-            let mut bank_forks = BankForks::new(0, bank0);
-            let snapshot_config = SnapshotConfig::new(
-                PathBuf::from(snapshot_dir.path()),
-                PathBuf::from(snapshot_output_path.path()),
-                100,
-            );
-            bank_forks.set_snapshot_config(snapshot_config.clone());
-            let bank0 = bank_forks.get(0).unwrap();
-            snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap();
-            for forks in 0..index {
-                let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1);
-                let key1 = Keypair::new().pubkey();
-                let tx = system_transaction::create_user_account(
-                    &mint_keypair,
-                    &key1,
-                    1,
-                    genesis_block.hash(),
-                );
-                assert_eq!(bank.process_transaction(&tx), Ok(()));
-                bank.freeze();
-                snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, &bank).unwrap();
-                bank_forks.insert(bank);
-            }
-            // Generate a snapshot package for last bank
-            let last_bank = bank_forks.get(index.saturating_sub(1)).unwrap();
-            let names: Vec<_> = (0..=index).collect();
-            let snapshot_package = snapshot_utils::package_snapshot(
-                last_bank,
-                &names,
+        let bank0 = Bank::new_with_paths(
+            &genesis_block,
+            Some(accounts_dir.path().to_str().unwrap().to_string()),
+        );
+        bank0.freeze();
+        let mut bank_forks = BankForks::new(0, bank0);
+        let snapshot_config = SnapshotConfig::new(
+            PathBuf::from(snapshot_dir.path()),
+            PathBuf::from(snapshot_output_path.path()),
+            100,
+        );
+        bank_forks.set_snapshot_config(snapshot_config.clone());
+        let bank0 = bank_forks.get(0).unwrap();
+        snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &vec![]).unwrap();
+        for slot in 0..last_slot {
+            let mut bank = Bank::new_from_parent(&bank_forks[slot], &Pubkey::default(), slot + 1);
+            f(&mut bank, &mint_keypair);
+            let bank = bank_forks.insert(bank);
+            // set root to make sure we don't end up with too many account storage entries
+            // and to allow purging logic on status_cache to kick in
+            bank_forks.set_root(bank.slot(), &None);
+            snapshot_utils::add_snapshot(
                 &snapshot_config.snapshot_path,
-                snapshot_utils::get_snapshot_tar_path(
-                    &snapshot_config.snapshot_package_output_path,
-                ),
+                &bank,
+                &(vec![bank.slot()]),
             )
             .unwrap();
-            SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
-
-            restore_from_snapshot(
-                bank_forks,
-                accounts_dir.path().to_str().unwrap().to_string(),
-                index,
-            );
         }
+        // Generate a snapshot package for last bank
+        let last_bank = bank_forks.get(last_slot).unwrap();
+        let slot_snapshot_paths =
+            snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path);
+        let snapshot_package = snapshot_utils::package_snapshot(
+            last_bank,
+            &slot_snapshot_paths,
+            snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path),
+        )
+        .unwrap();
+        SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
+
+        restore_from_snapshot(
+            bank_forks,
+            accounts_dir.path().to_str().unwrap().to_string(),
+            last_slot,
+        );
+    }
+
+    #[test]
+    fn test_bank_forks_snapshot_n() {
+        // create banks up to slot 4 and create 1 new account in each bank. test that bank 4 snapshots
+        // and restores correctly
+        run_bank_forks_snapshot_n(4, |bank, mint_keypair| {
+            let key1 = Keypair::new().pubkey();
+            let tx = system_transaction::create_user_account(
+                &mint_keypair,
+                &key1,
+                1,
+                bank.last_blockhash(),
+            );
+            assert_eq!(bank.process_transaction(&tx), Ok(()));
+            bank.freeze();
+        });
+    }
+
+    fn goto_end_of_slot(bank: &mut Bank) {
+        let mut tick_hash = bank.last_blockhash();
+        loop {
+            tick_hash = hashv(&[&tick_hash.as_ref(), &[42]]);
+            bank.register_tick(&tick_hash);
+            if tick_hash == bank.last_blockhash() {
+                bank.freeze();
+                return;
+            }
+        }
+    }
+
+    #[test]
+    fn test_bank_forks_status_cache_snapshot_n() {
+        // create banks up to slot 256 while transferring 1 lamport into 2 different accounts each time;
+        // this is done to ensure the AccountStorageEntries keep getting cleaned up as the root moves
+        // ahead. Also tests the status_cache purge and status cache snapshotting.
+        // Makes sure that the last bank is restored correctly
+        let key1 = Keypair::new().pubkey();
+        let key2 = Keypair::new().pubkey();
+        run_bank_forks_snapshot_n(256, |bank, mint_keypair| {
+            let tx = system_transaction::transfer(
+                &mint_keypair,
+                &key1,
+                1,
+                bank.parent().unwrap().last_blockhash(),
+            );
+            assert_eq!(bank.process_transaction(&tx), Ok(()));
+            let tx = system_transaction::transfer(
+                &mint_keypair,
+                &key2,
+                1,
+                bank.parent().unwrap().last_blockhash(),
+            );
+            assert_eq!(bank.process_transaction(&tx), Ok(()));
+            goto_end_of_slot(bank);
+        });
+    }
 
     #[test]
@@ -668,7 +704,7 @@
 
         // Take snapshot of zeroth bank
         let bank0 = bank_forks.get(0).unwrap();
-        snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap();
+        snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &vec![]).unwrap();
 
         // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted
         // and the snapshot purging logic will run on every snapshot taken. This means the three
@@ -715,6 +751,7 @@
             bank_forks
                 .generate_snapshot(
                     slot,
+                    &vec![],
                     &package_sender,
                     snapshot_config
                         .snapshot_package_output_path
@@ -732,10 +769,10 @@
 
         // Purge all the outdated snapshots, including the ones needed to generate the package
         // currently sitting in the channel
         bank_forks.purge_old_snapshots();
-        let mut snapshot_names = snapshot_utils::get_snapshot_names(&snapshots_dir);
-        snapshot_names.sort();
+        let mut snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshots_dir);
+        snapshot_paths.sort();
         assert_eq!(
-            snapshot_names,
+            snapshot_paths.iter().map(|path| path.slot).collect_vec(),
             (3..=MAX_CACHE_ENTRIES as u64 + 2).collect_vec()
         );
diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs
index 0ba5400bd6..3811497f8b 100644
--- a/core/src/snapshot_utils.rs
+++ b/core/src/snapshot_utils.rs
@@ -3,16 +3,54 @@
 use crate::snapshot_package::SnapshotPackage;
 use bincode::{deserialize_from, serialize_into};
 use flate2::read::GzDecoder;
 use solana_runtime::bank::Bank;
+use solana_runtime::status_cache::SlotDelta;
+use solana_sdk::transaction;
+use std::cmp::Ordering;
 use std::fs;
 use std::fs::File;
 use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind};
 use std::path::{Path, PathBuf};
 use tar::Archive;
 
-pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
+const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
+
+#[derive(PartialEq, Ord, Eq, Debug)]
+pub struct SlotSnapshotPaths {
+    pub slot: u64,
+    pub snapshot_file_path: PathBuf,
+    pub snapshot_status_cache_path: PathBuf,
+}
+
+impl PartialOrd for SlotSnapshotPaths {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.slot.cmp(&other.slot))
+    }
+}
+
+impl SlotSnapshotPaths {
+    fn hardlink_snapshot_directory<P: AsRef<Path>>(&self, snapshot_hardlink_dir: P) -> Result<()> {
+        // Create a new directory in snapshot_hardlink_dir
+        let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(self.slot.to_string());
+        let _ = fs::remove_dir_all(&new_slot_hardlink_dir);
+        fs::create_dir_all(&new_slot_hardlink_dir)?;
+
+        // Hardlink the snapshot
+        fs::hard_link(
+            &self.snapshot_file_path,
+            &new_slot_hardlink_dir.join(self.slot.to_string()),
+        )?;
+        // Hardlink the status cache
+        fs::hard_link(
+            &self.snapshot_status_cache_path,
+            &new_slot_hardlink_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
+        )?;
+        Ok(())
+    }
+}
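// Review note: what hardlink_snapshot_directory produces, shown as a
// self-contained, std-only sketch (the function and variable names here are
// illustrative, not from the patch). For each slot the stager creates
// <staging>/<slot>/ and hardlinks both files into it, so the tar can be built
// from one tree without copying any data.
use std::path::Path;
use std::{fs, io};

fn stage_slot(slot: u64, snapshot: &Path, status_cache: &Path, staging: &Path) -> io::Result<()> {
    let dir = staging.join(slot.to_string());
    let _ = fs::remove_dir_all(&dir); // ok if it didn't exist yet
    fs::create_dir_all(&dir)?;
    // <staging>/<slot>/<slot> -> same inode as the snapshot file
    fs::hard_link(snapshot, dir.join(slot.to_string()))?;
    // <staging>/<slot>/status_cache -> same inode as the status cache file
    fs::hard_link(status_cache, dir.join("status_cache"))?;
    Ok(())
}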
 
+pub fn package_snapshot<Q: AsRef<Path>>(
     bank: &Bank,
-    snapshot_names: &[u64],
-    snapshot_dir: P,
+    snapshot_files: &[SlotSnapshotPaths],
     snapshot_package_output_file: Q,
 ) -> Result<SnapshotPackage> {
     let slot = bank.slot();
@@ -46,15 +84,15 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
 
     // Any errors from this point on will cause the above SnapshotPackage to drop, clearing
     // any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
-    for name in snapshot_names {
-        hardlink_snapshot_directory(&snapshot_dir, &snapshot_hard_links_dir, *name)?;
+    for files in snapshot_files {
+        files.hardlink_snapshot_directory(&snapshot_hard_links_dir)?;
     }
 
     Ok(package)
 }
 
-pub fn get_snapshot_names<P: AsRef<Path>>(snapshot_path: P) -> Vec<u64> {
-    let paths = fs::read_dir(snapshot_path).expect("Invalid snapshot path");
+pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths> {
+    let paths = fs::read_dir(&snapshot_path).expect("Invalid snapshot path");
     let mut names = paths
         .filter_map(|entry| {
             entry.ok().and_then(|e| {
@@ -64,37 +102,64 @@ pub fn get_snapshot_names<P: AsRef<Path>>(snapshot_path: P) -> Vec<u64> {
                 .unwrap_or(None)
             })
         })
-        .collect::<Vec<u64>>();
+        .map(|slot| {
+            let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
+            SlotSnapshotPaths {
+                slot,
+                snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
+                snapshot_status_cache_path: snapshot_path.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
+            }
+        })
+        .collect::<Vec<SlotSnapshotPaths>>();
 
     names.sort();
     names
 }
 
-pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()> {
+pub fn add_snapshot<P: AsRef<Path>>(
+    snapshot_path: P,
+    bank: &Bank,
+    slots_since_snapshot: &[u64],
+) -> Result<()> {
     let slot = bank.slot();
+    // snapshot_path/slot
     let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
     fs::create_dir_all(slot_snapshot_dir.clone()).map_err(Error::from)?;
 
+    // the snapshot is stored as snapshot_path/slot/slot
     let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
+    // the status cache is stored as snapshot_path/slot/status_cache
+    let snapshot_status_cache_file_path = slot_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
     info!(
-        "creating snapshot {}, path: {:?}",
+        "creating snapshot {}, path: {:?} status_cache: {:?}",
         bank.slot(),
-        snapshot_file_path
+        snapshot_file_path,
+        snapshot_status_cache_file_path
     );
-    let file = File::create(&snapshot_file_path)?;
-    let mut stream = BufWriter::new(file);
+    let snapshot_file = File::create(&snapshot_file_path)?;
+    // snapshot writer
+    let mut snapshot_stream = BufWriter::new(snapshot_file);
+    let status_cache = File::create(&snapshot_status_cache_file_path)?;
+    // status cache writer
+    let mut status_cache_stream = BufWriter::new(status_cache);
 
     // Create the snapshot
-    serialize_into(&mut stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?;
-    serialize_into(&mut stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?;
-    // TODO: Add status cache serialization code
-    /*serialize_into(&mut stream, &bank.src).map_err(|e| get_io_error(&e.to_string()))?;*/
+    serialize_into(&mut snapshot_stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?;
+    serialize_into(&mut snapshot_stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?;
+    // write the status cache
+    serialize_into(
+        &mut status_cache_stream,
+        &bank.src.slot_deltas(slots_since_snapshot),
+    )
+    .map_err(|_| get_io_error("serialize bank status cache error"))?;
 
     info!(
-        "successfully created snapshot {}, path: {:?}",
+        "successfully created snapshot {}, path: {:?} status_cache: {:?}",
        bank.slot(),
-        snapshot_file_path
+        snapshot_file_path,
+        snapshot_status_cache_file_path
     );
+
     Ok(())
 }
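// Review note: the resulting on-disk layout after add_snapshot for, say,
// slot 100 (paths relative to snapshot_path):
//
//   100/100           bincode(Bank) followed by bincode(BankRc)
//   100/status_cache  bincode(Vec<SlotDelta<transaction::Result<()>>>)
//
// A sketch of reading one status-cache file back, mirroring what
// bank_from_snapshots does below (the error type is loosened for illustration):
use solana_runtime::status_cache::SlotDelta;
use solana_sdk::transaction;
use std::{fs::File, io::BufReader, path::Path};

fn read_status_cache(
    path: &Path,
) -> Result<Vec<SlotDelta<transaction::Result<()>>>, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    Ok(bincode::deserialize_from(BufReader::new(file))?)
}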
@@ -105,25 +170,20 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<()> {
     Ok(())
 }
 
-pub fn bank_from_snapshots<P, Q>(
+pub fn bank_from_snapshots<P>(
     local_account_paths: String,
-    snapshot_path: P,
-    append_vecs_path: Q,
+    snapshot_paths: &[SlotSnapshotPaths],
+    append_vecs_path: P,
 ) -> Result<Bank>
 where
     P: AsRef<Path>,
-    Q: AsRef<Path>,
 {
     // Rebuild the last root bank
-    let names = get_snapshot_names(&snapshot_path);
-    let last_root = names
+    let last_root_paths = snapshot_paths
         .last()
         .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
-    let snapshot_file_name = get_snapshot_file_name(*last_root);
-    let snapshot_dir = get_bank_snapshot_dir(&snapshot_path, *last_root);
-    let snapshot_file_path = snapshot_dir.join(&snapshot_file_name);
-    info!("Load from {:?}", snapshot_file_path);
-    let file = File::open(snapshot_file_path)?;
+    info!("Load from {:?}", &last_root_paths.snapshot_file_path);
+    let file = File::open(&last_root_paths.snapshot_file_path)?;
     let mut stream = BufReader::new(file);
     let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?;
@@ -131,39 +191,15 @@ where
     bank.rc
         .accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?;
 
-    for bank_slot in names.iter().rev() {
-        let snapshot_file_name = get_snapshot_file_name(*bank_slot);
-        let snapshot_dir = get_bank_snapshot_dir(&snapshot_path, *bank_slot);
-        let snapshot_file_path = snapshot_dir.join(snapshot_file_name.clone());
-        let file = File::open(snapshot_file_path)?;
-        let mut stream = BufReader::new(file);
-        let _bank: Result<Bank> =
-            deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()));
+    // merge the status caches from all previous banks
+    for slot_paths in snapshot_paths.iter().rev() {
+        let status_cache = File::open(&slot_paths.snapshot_status_cache_path)?;
+        let mut stream = BufReader::new(status_cache);
+        let slot_deltas: Vec<SlotDelta<transaction::Result<()>>> = deserialize_from(&mut stream)
+            .map_err(|_| get_io_error("deserialize root error"))
+            .unwrap_or_default();
 
-        // TODO: Uncomment and deserialize status cache here
-
-        /*let status_cache: Result<StatusCache> = deserialize_from(&mut stream)
-            .map_err(|e| get_io_error(&e.to_string()));
-        if bank_root.is_some() {
-            match bank {
-                Ok(v) => {
-                    if status_cache.is_ok() {
-                        let status_cache = status_cache?;
-                        status_cache_rc.append(&status_cache);
-                        // On the last snapshot, purge all outdated status cache
-                        // entries
-                        if i == names.len() - 1 {
-                            status_cache_rc.purge_roots();
-                        }
-                    }
-
-                    bank_maps.push((*bank_slot, parent_slot, v));
-                }
-                Err(_) => warn!("Load snapshot failed for {}", bank_slot),
-            }
-        } else {
-            warn!("Load snapshot rc failed for {}", bank_slot);
-        }*/
+        bank.src.append(&slot_deltas);
     }
 
     Ok(bank)
 }
@@ -184,28 +220,6 @@ pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
     Ok(())
 }
 
-fn hardlink_snapshot_directory<P: AsRef<Path>, Q: AsRef<Path>>(
-    snapshot_dir: P,
-    snapshot_hardlink_dir: Q,
-    slot: u64,
-) -> Result<()> {
-    // Create a new directory in snapshot_hardlink_dir
-    let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(slot.to_string());
-    let _ = fs::remove_dir_all(&new_slot_hardlink_dir);
-    fs::create_dir_all(&new_slot_hardlink_dir)?;
-
-    // Hardlink the contents of the directory
-    let snapshot_file = snapshot_dir
-        .as_ref()
-        .join(slot.to_string())
-        .join(slot.to_string());
-    fs::hard_link(
-        &snapshot_file,
-        &new_slot_hardlink_dir.join(slot.to_string()),
-    )?;
-    Ok(())
-}
-
 fn get_snapshot_file_name(slot: u64) -> String {
     slot.to_string()
 }
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 2456288f77..c28868d61d 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -23,7 +23,7 @@ maplit = "1.0.1"
 memmap = "0.6.2"
 rand = "0.6.5"
 rayon = "1.1.0"
-serde = "1.0.98"
+serde = { version = "1.0.98", features = ["rc"] } serde_derive = "1.0.98" serde_json = "1.0.40" solana-logger = { path = "../logger", version = "0.18.0-pre1" } diff --git a/runtime/benches/status_cache.rs b/runtime/benches/status_cache.rs index 29310098b7..2903f3dc42 100644 --- a/runtime/benches/status_cache.rs +++ b/runtime/benches/status_cache.rs @@ -28,6 +28,6 @@ fn test_statuscache_serialize(bencher: &mut Bencher) { } } bencher.iter(|| { - let _ = serialize(&status_cache).unwrap(); + let _ = serialize(&status_cache.slot_deltas(&vec![0])).unwrap(); }); } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 4be78ff006..191bec757f 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -143,7 +143,7 @@ impl AccountsIndex { pub fn add_root(&mut self, fork: Fork) { assert!( - (self.last_root == 0 && fork == 0) || (fork > self.last_root), + (self.last_root == 0 && fork == 0) || (fork >= self.last_root), "new roots must be increasing" ); self.last_root = fork; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index c6e92c2db9..c1dd844a5a 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -16,10 +16,10 @@ use crate::serde_utils::{ deserialize_atomicbool, deserialize_atomicusize, serialize_atomicbool, serialize_atomicusize, }; use crate::stakes::Stakes; -use crate::status_cache::StatusCache; +use crate::status_cache::{SlotDelta, StatusCache}; use crate::storage_utils; use crate::storage_utils::StorageAccounts; -use bincode::{deserialize_from, serialize_into, serialized_size}; +use bincode::{deserialize_from, serialize_into}; use byteorder::{ByteOrder, LittleEndian}; use log::*; use serde::{Deserialize, Serialize}; @@ -40,11 +40,10 @@ use solana_sdk::sysvar::{ clock, fees, rewards, slot_hashes::{self, SlotHashes}, }; -use solana_sdk::timing::{duration_as_ns, get_segment_from_slot, MAX_RECENT_BLOCKHASHES}; +use solana_sdk::timing::{duration_as_ns, get_segment_from_slot, Slot, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::{Result, Transaction, TransactionError}; use std::cmp; use std::collections::HashMap; -use std::fmt; use std::io::{BufReader, Cursor, Error as IOError, Read}; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -120,65 +119,15 @@ pub struct StatusCacheRc { status_cache: Arc>, } -impl Serialize for StatusCacheRc { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::ser::Serializer, - { - use serde::ser::Error; - let len = serialized_size(&*self.status_cache).unwrap(); - let mut buf = vec![0u8; len as usize]; - let mut wr = Cursor::new(&mut buf[..]); - { - let mut status_cache = self.status_cache.write().unwrap(); - serialize_into(&mut wr, &*status_cache).map_err(Error::custom)?; - status_cache.merge_caches(); - } - let len = wr.position() as usize; - serializer.serialize_bytes(&wr.into_inner()[..len]) - } -} - -struct StatusCacheRcVisitor; - -impl<'a> serde::de::Visitor<'a> for StatusCacheRcVisitor { - type Value = StatusCacheRc; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("Expecting StatusCacheRc") - } - - #[allow(clippy::mutex_atomic)] - fn visit_bytes(self, data: &[u8]) -> std::result::Result - where - E: serde::de::Error, - { - use serde::de::Error; - let mut rd = Cursor::new(&data[..]); - let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?; - Ok(StatusCacheRc { - status_cache: Arc::new(RwLock::new(status_cache)), - }) - } -} - -impl<'de> Deserialize<'de> 
diff --git a/runtime/benches/status_cache.rs b/runtime/benches/status_cache.rs
index 29310098b7..2903f3dc42 100644
--- a/runtime/benches/status_cache.rs
+++ b/runtime/benches/status_cache.rs
@@ -28,6 +28,6 @@ fn test_statuscache_serialize(bencher: &mut Bencher) {
         }
     }
     bencher.iter(|| {
-        let _ = serialize(&status_cache).unwrap();
+        let _ = serialize(&status_cache.slot_deltas(&vec![0])).unwrap();
     });
 }
diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs
index 4be78ff006..191bec757f 100644
--- a/runtime/src/accounts_index.rs
+++ b/runtime/src/accounts_index.rs
@@ -143,7 +143,7 @@ impl<T: Clone> AccountsIndex<T> {
     pub fn add_root(&mut self, fork: Fork) {
         assert!(
-            (self.last_root == 0 && fork == 0) || (fork > self.last_root),
+            (self.last_root == 0 && fork == 0) || (fork >= self.last_root),
             "new roots must be increasing"
         );
         self.last_root = fork;
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index c6e92c2db9..c1dd844a5a 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -16,10 +16,10 @@ use crate::serde_utils::{
     deserialize_atomicbool, deserialize_atomicusize, serialize_atomicbool, serialize_atomicusize,
 };
 use crate::stakes::Stakes;
-use crate::status_cache::StatusCache;
+use crate::status_cache::{SlotDelta, StatusCache};
 use crate::storage_utils;
 use crate::storage_utils::StorageAccounts;
-use bincode::{deserialize_from, serialize_into, serialized_size};
+use bincode::{deserialize_from, serialize_into};
 use byteorder::{ByteOrder, LittleEndian};
 use log::*;
 use serde::{Deserialize, Serialize};
@@ -40,11 +40,10 @@ use solana_sdk::sysvar::{
     clock, fees, rewards,
     slot_hashes::{self, SlotHashes},
 };
-use solana_sdk::timing::{duration_as_ns, get_segment_from_slot, MAX_RECENT_BLOCKHASHES};
+use solana_sdk::timing::{duration_as_ns, get_segment_from_slot, Slot, MAX_RECENT_BLOCKHASHES};
 use solana_sdk::transaction::{Result, Transaction, TransactionError};
 use std::cmp;
 use std::collections::HashMap;
-use std::fmt;
 use std::io::{BufReader, Cursor, Error as IOError, Read};
 use std::path::Path;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -120,65 +119,15 @@ pub struct StatusCacheRc {
     status_cache: Arc<RwLock<BankStatusCache>>,
 }
 
-impl Serialize for StatusCacheRc {
-    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
-    where
-        S: serde::ser::Serializer,
-    {
-        use serde::ser::Error;
-        let len = serialized_size(&*self.status_cache).unwrap();
-        let mut buf = vec![0u8; len as usize];
-        let mut wr = Cursor::new(&mut buf[..]);
-        {
-            let mut status_cache = self.status_cache.write().unwrap();
-            serialize_into(&mut wr, &*status_cache).map_err(Error::custom)?;
-            status_cache.merge_caches();
-        }
-        let len = wr.position() as usize;
-        serializer.serialize_bytes(&wr.into_inner()[..len])
-    }
-}
-
-struct StatusCacheRcVisitor;
-
-impl<'a> serde::de::Visitor<'a> for StatusCacheRcVisitor {
-    type Value = StatusCacheRc;
-
-    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
-        formatter.write_str("Expecting StatusCacheRc")
-    }
-
-    #[allow(clippy::mutex_atomic)]
-    fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
-    where
-        E: serde::de::Error,
-    {
-        use serde::de::Error;
-        let mut rd = Cursor::new(&data[..]);
-        let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?;
-        Ok(StatusCacheRc {
-            status_cache: Arc::new(RwLock::new(status_cache)),
-        })
-    }
-}
-
-impl<'de> Deserialize<'de> for StatusCacheRc {
-    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
-    where
-        D: ::serde::Deserializer<'de>,
-    {
-        deserializer.deserialize_bytes(StatusCacheRcVisitor)
-    }
-}
-
 impl StatusCacheRc {
-    pub fn append(&self, status_cache_rc: &StatusCacheRc) {
-        let sc = status_cache_rc.status_cache.write().unwrap();
-        self.status_cache.write().unwrap().append(&sc);
+    pub fn slot_deltas(&self, slots: &[Slot]) -> Vec<SlotDelta<Result<()>>> {
+        let sc = self.status_cache.read().unwrap();
+        sc.slot_deltas(slots)
     }
 
-    pub fn purge_roots(&self) {
-        self.status_cache.write().unwrap().purge_roots();
+    pub fn append(&self, slot_deltas: &[SlotDelta<Result<()>>]) {
+        let mut sc = self.status_cache.write().unwrap();
+        sc.append(slot_deltas);
     }
 }
@@ -554,20 +503,22 @@ impl Bank {
     pub fn squash(&self) {
         self.freeze();
 
-        let parents = self.parents();
+        // this bank and all its parents are now on the rooted path
+        let mut roots = vec![self.slot()];
+        roots.append(&mut self.parents().iter().map(|p| p.slot()).collect());
         *self.rc.parent.write().unwrap() = None;
 
         let mut squash_accounts_time = Measure::start("squash_accounts_time");
-        for p in parents.iter().rev() {
+        for slot in roots.iter().rev() {
             // root forks cannot be purged
-            self.rc.accounts.add_root(p.slot());
+            self.rc.accounts.add_root(*slot);
         }
         squash_accounts_time.stop();
 
         let mut squash_cache_time = Measure::start("squash_cache_time");
-        parents
+        roots
             .iter()
-            .for_each(|p| self.src.status_cache.write().unwrap().add_root(p.slot()));
+            .for_each(|slot| self.src.status_cache.write().unwrap().add_root(*slot));
         squash_cache_time.stop();
 
         datapoint_info!(
@@ -1472,9 +1423,8 @@ impl Bank {
         assert_eq!(*bhq, *dbhq);
 
-        // TODO: Uncomment once status cache serialization is done
-        /*let sc = self.src.status_cache.read().unwrap();
+        let sc = self.src.status_cache.read().unwrap();
         let dsc = dbank.src.status_cache.read().unwrap();
-        assert_eq!(*sc, *dsc);*/
+        assert_eq!(*sc, *dsc);
         assert_eq!(
             self.rc.accounts.hash_internal_state(self.slot),
             dbank.rc.accounts.hash_internal_state(dbank.slot)
@@ -2414,7 +2365,10 @@ mod tests {
         let bank3 = Arc::new(Bank::new_from_parent(&bank1, &Pubkey::default(), 3));
         bank1.squash();
 
-        assert_eq!(bank0.get_balance(&key1.pubkey()), 1);
+        // This picks up the values from bank 1, which is the highest root:
+        // TODO: if we need to access rooted banks older than this,
+        // need to fix the lookup.
+        assert_eq!(bank0.get_balance(&key1.pubkey()), 4);
         assert_eq!(bank3.get_balance(&key1.pubkey()), 4);
         assert_eq!(bank2.get_balance(&key1.pubkey()), 3);
         bank3.squash();
diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs
index 1d3b209e14..0c83afa67a 100644
--- a/runtime/src/status_cache.rs
+++ b/runtime/src/status_cache.rs
@@ -1,75 +1,93 @@
 use log::*;
 use rand::{thread_rng, Rng};
-use serde::ser::SerializeSeq;
-use serde::{Deserialize, Serialize};
+use serde::Serialize;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::Signature;
+use solana_sdk::timing::Slot;
 use std::collections::{HashMap, HashSet};
+use std::sync::{Arc, Mutex};
 
 pub const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS;
 const CACHED_SIGNATURE_SIZE: usize = 20;
 
 // Store forks in a single chunk of memory to avoid another lookup.
-pub type ForkId = u64;
-pub type ForkStatus<T> = Vec<(ForkId, T)>;
+pub type ForkStatus<T> = Vec<(Slot, T)>;
 type SignatureSlice = [u8; CACHED_SIGNATURE_SIZE];
 type SignatureMap<T> = HashMap<SignatureSlice, ForkStatus<T>>;
-type StatusMap<T> = HashMap<Hash, (ForkId, usize, SignatureMap<T>)>;
+// Map of Hash and signature status
+pub type SignatureStatus<T> = Arc<Mutex<HashMap<Hash, (usize, Vec<(SignatureSlice, T)>)>>>;
+// A Map of hash + the highest fork it's been observed on, along with
+// the signature offset and a Map of the signature slice + Fork status for that signature
+type StatusMap<T> = HashMap<Hash, (Slot, usize, SignatureMap<T>)>;
 
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+// A map of signatures recorded in each fork; used to serialize for snapshots easily.
+// Doesn't store a `SlotDelta` in it because the bool `root` is usually set much later
+type SlotDeltaMap<T> = HashMap<Slot, SignatureStatus<T>>;
+
+// The signature statuses added during a slot, which can be used to build on top of a
+// status cache or to construct a new one. Usually derived from a status cache's `SlotDeltaMap`
+pub type SlotDelta<T> = (Slot, bool, SignatureStatus<T>);
+
+#[derive(Clone, Debug)]
 pub struct StatusCache<T: Serialize + Clone> {
-    /// all signatures seen during a hash period
-    #[serde(serialize_with = "serialize_statusmap")]
-    cache: Vec<StatusMap<T>>,
-    roots: HashSet<ForkId>,
-}
-
-fn serialize_statusmap<S, T>(x: &[StatusMap<T>], s: S) -> Result<S::Ok, S::Error>
-where
-    T: serde::Serialize + Clone,
-    S: serde::Serializer,
-{
-    let cache0: StatusMap<T> = HashMap::new();
-    let mut seq = s.serialize_seq(Some(x.len()))?;
-    seq.serialize_element(&cache0)?;
-    seq.serialize_element(&x[1])?;
-    seq.end()
+    cache: StatusMap<T>,
+    roots: HashSet<Slot>,
+    /// all signatures seen during a fork/slot
+    slot_deltas: SlotDeltaMap<T>,
 }
 
 impl<T: Serialize + Clone> Default for StatusCache<T> {
     fn default() -> Self {
         Self {
-            cache: vec![HashMap::default(); 2],
-            roots: HashSet::default(),
+            cache: HashMap::default(),
+            // 0 is always a root
+            roots: [0].iter().cloned().collect(),
+            slot_deltas: HashMap::default(),
         }
     }
 }
 
+impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
+    fn eq(&self, other: &Self) -> bool {
+        self.roots == other.roots
+            && self.cache.iter().all(|(hash, (slot, sig_index, sig_map))| {
+                if let Some((other_slot, other_sig_index, other_sig_map)) = other.cache.get(hash) {
+                    if slot == other_slot && sig_index == other_sig_index {
+                        return sig_map.iter().all(|(slice, fork_map)| {
+                            if let Some(other_fork_map) = other_sig_map.get(slice) {
+                                // all this work just to compare the highest forks in the fork map
+                                // per signature
+                                return fork_map.last() == other_fork_map.last();
+                            }
+                            false
+                        });
+                    }
+                }
+                false
+            })
+    }
+}
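// Review note: the nesting above, restated as simplified stand-alone aliases
// (Hash abbreviated to [u8; 32]; a sketch only, not the patch's definitions):
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Slot = u64;
type SigSlice = [u8; 20];
// per blockhash: (highest slot seen, random offset into the signature, per-signature forks)
type CacheEntry<T> = (Slot, usize, HashMap<SigSlice, Vec<(Slot, T)>>);
// what one slot contributes, in the shape the snapshot serializes:
// (slot, is_root, blockhash -> (offset, [(signature slice, status)]))
type Delta<T> = (Slot, bool, Arc<Mutex<HashMap<[u8; 32], (usize, Vec<(SigSlice, T)>)>>>);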
 impl<T: Serialize + Clone> StatusCache<T> {
     /// Check if the signature from a transaction is in any of the forks in the ancestors set.
     pub fn get_signature_status(
         &self,
         sig: &Signature,
         transaction_blockhash: &Hash,
-        ancestors: &HashMap<ForkId, usize>,
-    ) -> Option<(ForkId, T)> {
-        for cache in self.cache.iter() {
-            let map = cache.get(transaction_blockhash);
-            if map.is_none() {
-                continue;
-            }
-            let (_, index, sigmap) = map.unwrap();
-            let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE];
-            sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]);
-            if let Some(stored_forks) = sigmap.get(&sig_slice) {
-                let res = stored_forks
-                    .iter()
-                    .filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some())
-                    .nth(0)
-                    .cloned();
-                if res.is_some() {
-                    return res;
-                }
+        ancestors: &HashMap<Slot, usize>,
+    ) -> Option<(Slot, T)> {
+        let map = self.cache.get(transaction_blockhash)?;
+        let (_, index, sigmap) = map;
+        let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE];
+        sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]);
+        if let Some(stored_forks) = sigmap.get(&sig_slice) {
+            let res = stored_forks
+                .iter()
+                .filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some())
+                .nth(0)
+                .cloned();
+            if res.is_some() {
+                return res;
             }
         }
         None
@@ -79,14 +97,13 @@ impl<T: Serialize + Clone> StatusCache<T> {
     pub fn get_signature_status_slow(
         &self,
         sig: &Signature,
-        ancestors: &HashMap<ForkId, usize>,
+        ancestors: &HashMap<Slot, usize>,
     ) -> Option<(usize, T)> {
         trace!("get_signature_status_slow");
         let mut keys = vec![];
-        for cache in self.cache.iter() {
-            let mut val: Vec<_> = cache.iter().map(|(k, _)| *k).collect();
-            keys.append(&mut val);
-        }
+        let mut val: Vec<_> = self.cache.iter().map(|(k, _)| *k).collect();
+        keys.append(&mut val);
+
         for blockhash in keys.iter() {
             trace!("get_signature_status_slow: trying {}", blockhash);
             if let Some((forkid, res)) = self.get_signature_status(sig, blockhash, ancestors) {
@@ -102,99 +119,122 @@ impl<T: Serialize + Clone> StatusCache<T> {
     /// Add a known root fork.  Roots are always valid ancestors.
     /// After MAX_CACHE_ENTRIES, roots are removed, and any old signatures are cleared.
-    pub fn add_root(&mut self, fork: ForkId) {
+    pub fn add_root(&mut self, fork: Slot) {
         self.roots.insert(fork);
         self.purge_roots();
     }
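// Review note: how a caller is expected to use the lookup above - a sketch
// with T = () and the ancestors map built the way a bank would (slot -> depth).
// `is_duplicate` is an illustrative name, not an API in this patch:
fn is_duplicate(
    cache: &StatusCache<()>,
    sig: &Signature,
    blockhash: &Hash,
    ancestors: &HashMap<Slot, usize>,
) -> bool {
    // hit only if the signature landed on one of this fork's ancestors
    // or on a rooted slot
    cache.get_signature_status(sig, blockhash, ancestors).is_some()
}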
+ pub fn insert(&mut self, transaction_blockhash: &Hash, sig: &Signature, slot: Slot, res: T) { let sig_index: usize; - if let Some(sig_map) = self.cache[0].get(transaction_blockhash) { + if let Some(sig_map) = self.cache.get(transaction_blockhash) { sig_index = sig_map.1; } else { sig_index = thread_rng().gen_range(0, std::mem::size_of::() - CACHED_SIGNATURE_SIZE); } - let sig_map = self.cache[1].entry(*transaction_blockhash).or_insert(( - fork, - sig_index, - HashMap::new(), - )); - sig_map.0 = std::cmp::max(fork, sig_map.0); + + let sig_map = + self.cache + .entry(*transaction_blockhash) + .or_insert((slot, sig_index, HashMap::new())); + sig_map.0 = std::cmp::max(slot, sig_map.0); let index = sig_map.1; let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE]; sig_slice.clone_from_slice(&sig.as_ref()[index..index + CACHED_SIGNATURE_SIZE]); - let sig_forks = sig_map.2.entry(sig_slice).or_insert_with(|| vec![]); - sig_forks.push((fork, res)); + self.insert_with_slice(transaction_blockhash, slot, sig_index, sig_slice, res); } pub fn purge_roots(&mut self) { if self.roots.len() > MAX_CACHE_ENTRIES { if let Some(min) = self.roots.iter().min().cloned() { self.roots.remove(&min); - for cache in self.cache.iter_mut() { - cache.retain(|_, (fork, _, _)| *fork > min); - } + self.cache.retain(|_, (fork, _, _)| *fork > min); + self.slot_deltas.retain(|slot, _| *slot > min); } } } - fn insert_entry( - &mut self, - transaction_blockhash: &Hash, - sig_slice: &[u8; CACHED_SIGNATURE_SIZE], - status: Vec<(ForkId, T)>, - index: usize, - ) { - let fork = status - .iter() - .fold(0, |acc, (f, _)| if acc > *f { acc } else { *f }); - let sig_map = - self.cache[0] - .entry(*transaction_blockhash) - .or_insert((fork, index, HashMap::new())); - sig_map.0 = std::cmp::max(fork, sig_map.0); - let sig_forks = sig_map.2.entry(*sig_slice).or_insert_with(|| vec![]); - sig_forks.extend(status); - } - /// Clear for testing pub fn clear_signatures(&mut self) { - for cache in self.cache.iter_mut() { - for v in cache.values_mut() { - v.2 = HashMap::new(); + for v in self.cache.values_mut() { + v.2 = HashMap::new(); + } + + self.slot_deltas + .iter_mut() + .for_each(|(_, status)| status.lock().unwrap().clear()); + } + + // returns the signature statuses for each slot in the slots provided + pub fn slot_deltas(&self, slots: &[Slot]) -> Vec> { + let empty = Arc::new(Mutex::new(HashMap::new())); + slots + .iter() + .map(|slot| { + ( + *slot, + self.roots.contains(slot), + self.slot_deltas.get(slot).unwrap_or_else(|| &empty).clone(), + ) + }) + .collect() + } + + // replay deltas into a status_cache allows "appending" data + pub fn append(&mut self, slot_deltas: &[SlotDelta]) { + for (slot, is_root, statuses) in slot_deltas { + statuses + .lock() + .unwrap() + .iter() + .for_each(|(tx_hash, (sig_index, statuses))| { + for (sig_slice, res) in statuses.iter() { + self.insert_with_slice(&tx_hash, *slot, *sig_index, *sig_slice, res.clone()) + } + }); + if *is_root { + self.add_root(*slot); } } } - pub fn append(&mut self, status_cache: &StatusCache) { - for (hash, sigmap) in status_cache.cache[1].iter() { - for (signature, fork_status) in sigmap.2.iter() { - self.insert_entry(hash, signature, fork_status.clone(), sigmap.1); - } - } - - self.roots = self.roots.union(&status_cache.roots).cloned().collect(); + pub fn from_slot_deltas(slot_deltas: &[SlotDelta]) -> Self { + // play all deltas back into the the status cache + let mut me = Self::default(); + me.append(slot_deltas); + me } - pub fn merge_caches(&mut self) { - let mut cache = 
 
-    pub fn merge_caches(&mut self) {
-        let mut cache = HashMap::new();
-        std::mem::swap(&mut cache, &mut self.cache[1]);
-        for (hash, sigmap) in cache.iter() {
-            for (signature, fork_status) in sigmap.2.iter() {
-                self.insert_entry(hash, signature, fork_status.clone(), sigmap.1);
-            }
-        }
+    fn insert_with_slice(
+        &mut self,
+        transaction_blockhash: &Hash,
+        slot: Slot,
+        sig_index: usize,
+        sig_slice: [u8; CACHED_SIGNATURE_SIZE],
+        res: T,
+    ) {
+        let sig_map =
+            self.cache
+                .entry(*transaction_blockhash)
+                .or_insert((slot, sig_index, HashMap::new()));
+        sig_map.0 = std::cmp::max(slot, sig_map.0);
+
+        let sig_forks = sig_map.2.entry(sig_slice).or_insert_with(|| vec![]);
+        sig_forks.push((slot, res.clone()));
+        let slot_deltas = self.slot_deltas.entry(slot).or_default();
+        let mut fork_entry = slot_deltas.lock().unwrap();
+        let (_, hash_entry) = fork_entry
+            .entry(*transaction_blockhash)
+            .or_insert((sig_index, vec![]));
+        hash_entry.push((sig_slice, res))
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use bincode::{deserialize_from, serialize_into, serialized_size};
     use solana_sdk::hash::hash;
-    use std::io::Cursor;
 
     type BankStatusCache = StatusCache<()>;
@@ -236,7 +276,7 @@ mod tests {
         let mut status_cache = BankStatusCache::default();
         let blockhash = hash(Hash::default().as_ref());
         let ancestors = HashMap::new();
-        status_cache.insert(&blockhash, &sig, 0, ());
+        status_cache.insert(&blockhash, &sig, 1, ());
         assert_eq!(
             status_cache.get_signature_status(&sig, &blockhash, &ancestors),
             None
         );
@@ -347,91 +387,43 @@ mod tests {
         let blockhash = hash(Hash::default().as_ref());
         status_cache.clear_signatures();
         status_cache.insert(&blockhash, &sig, 0, ());
-        let (_, index, sig_map) = status_cache.cache[1].get(&blockhash).unwrap();
+        let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap();
         let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE];
         sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]);
         assert!(sig_map.get(&sig_slice).is_some());
     }
 
     #[test]
-    fn test_statuscache_append() {
+    fn test_slot_deltas() {
         let sig = Signature::default();
-        let mut status_cache0 = BankStatusCache::default();
-        let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref());
-        status_cache0.add_root(0);
-        status_cache0.insert(&blockhash0, &sig, 0, ());
-
-        let sig = Signature::default();
-        let mut status_cache1 = BankStatusCache::default();
-        let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref());
-        status_cache1.insert(&blockhash0, &sig, 1, ());
-        status_cache1.add_root(1);
-        status_cache1.insert(&blockhash1, &sig, 1, ());
-
-        status_cache0.append(&status_cache1);
-        let roots: HashSet<_> = [0, 1].iter().cloned().collect();
-        assert_eq!(status_cache0.roots, roots);
-        let ancestors = vec![(0, 1), (1, 1)].into_iter().collect();
-        assert!(status_cache0
-            .get_signature_status(&sig, &blockhash0, &ancestors)
-            .is_some());
-        assert!(status_cache0
-            .get_signature_status(&sig, &blockhash1, &ancestors)
-            .is_some());
-    }
-
-    fn test_serialize(sc: &mut BankStatusCache, blockhash: Vec<Hash>, sig: &Signature) {
-        let len = serialized_size(&sc).unwrap();
-        let mut buf = vec![0u8; len as usize];
-        let mut writer = Cursor::new(&mut buf[..]);
-        let cache0 = sc.cache[0].clone();
-        serialize_into(&mut writer, sc).unwrap();
-        for hash in blockhash.iter() {
-            if let Some(map0) = sc.cache[0].get(hash) {
-                if let Some(map1) = sc.cache[1].get(hash) {
-                    assert_eq!(map0.1, map1.1);
-                }
-            }
-        }
-        sc.merge_caches();
-        let len = writer.position() as usize;
-
-        let mut reader = Cursor::new(&mut buf[..len]);
-        let mut status_cache: BankStatusCache = deserialize_from(&mut reader).unwrap();
-        status_cache.cache[0] = cache0;
-        status_cache.merge_caches();
-        assert!(status_cache.cache[0].len() > 0);
-        assert!(status_cache.cache[1].is_empty());
-        let ancestors = vec![(0, 1), (1, 1)].into_iter().collect();
-        assert_eq!(*sc, status_cache);
-        for hash in blockhash.iter() {
-            assert!(status_cache
-                .get_signature_status(&sig, &hash, &ancestors)
-                .is_some());
-        }
+        let mut status_cache = BankStatusCache::default();
+        let blockhash = hash(Hash::default().as_ref());
+        status_cache.clear_signatures();
+        status_cache.insert(&blockhash, &sig, 0, ());
+        let slot_deltas = status_cache.slot_deltas(&[0]);
+        let cache = StatusCache::from_slot_deltas(&slot_deltas);
+        assert_eq!(cache, status_cache);
+        let slot_deltas = cache.slot_deltas(&[0]);
+        let cache = StatusCache::from_slot_deltas(&slot_deltas);
+        assert_eq!(cache, status_cache);
     }
 
     #[test]
-    fn test_statuscache_serialize() {
+    fn test_roots_deltas() {
         let sig = Signature::default();
         let mut status_cache = BankStatusCache::default();
-        let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref());
-        status_cache.add_root(0);
-        status_cache.clear_signatures();
-        status_cache.insert(&blockhash0, &sig, 0, ());
-        test_serialize(&mut status_cache, vec![blockhash0], &sig);
-
-        status_cache.insert(&blockhash0, &sig, 1, ());
-        test_serialize(&mut status_cache, vec![blockhash0], &sig);
-
-        let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref());
-        status_cache.insert(&blockhash1, &sig, 1, ());
-        test_serialize(&mut status_cache, vec![blockhash0, blockhash1], &sig);
-
-        let blockhash2 = hash(Hash::new(&vec![2; 32]).as_ref());
-        let ancestors = vec![(0, 1), (1, 1)].into_iter().collect();
-        assert!(status_cache
-            .get_signature_status(&sig, &blockhash2, &ancestors)
-            .is_none());
+        let blockhash = hash(Hash::default().as_ref());
+        let blockhash2 = hash(blockhash.as_ref());
+        status_cache.insert(&blockhash, &sig, 0, ());
+        status_cache.insert(&blockhash, &sig, 1, ());
+        status_cache.insert(&blockhash2, &sig, 1, ());
+        for i in 0..(MAX_CACHE_ENTRIES + 1) {
+            status_cache.add_root(i as u64);
+        }
+        let slots: Vec<_> = (0_u64..MAX_CACHE_ENTRIES as u64 + 1).collect();
+        let slot_deltas = status_cache.slot_deltas(&slots);
+        let cache = StatusCache::from_slot_deltas(&slot_deltas);
+        assert_eq!(cache, status_cache);
     }
 }