diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index d8b3fc9066..5da4c4f5db 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -238,7 +238,7 @@ impl CrdsGossipPull {
             if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
                 || now + timeout < r.wallclock()
             {
-                inc_new_counter_info!(
+                inc_new_counter_warn!(
                     "cluster_info-gossip_pull_response_value_timeout",
                     1
                 );
@@ -250,7 +250,7 @@ impl CrdsGossipPull {
                 // Before discarding this value, check if a ContactInfo for the owner
                 // exists in the table. If it doesn't, that implies that this value can be discarded
                 if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
-                    inc_new_counter_info!(
+                    inc_new_counter_warn!(
                         "cluster_info-gossip_pull_response_value_timeout",
                         1
                     );
diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index 74c9fc4081..3f72f5ae62 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -131,6 +131,7 @@ impl LedgerCleanupService {
         while let Ok(new_root) = new_root_receiver.try_recv() {
             root = new_root;
         }
+
         if root - *last_purge_slot > purge_interval {
             let disk_utilization_pre = blockstore.storage_size();
             info!(
diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs
index 23f612f025..06e3993ed0 100644
--- a/core/src/poh_recorder.rs
+++ b/core/src/poh_recorder.rs
@@ -354,7 +354,7 @@ impl PohRecorder {
     pub fn tick(&mut self) {
         let now = Instant::now();
         let poh_entry = self.poh.lock().unwrap().tick();
-        inc_new_counter_info!(
+        inc_new_counter_warn!(
            "poh_recorder-tick_lock_contention",
            timing::duration_as_us(&now.elapsed()) as usize
        );
@@ -364,7 +364,7 @@ impl PohRecorder {
            trace!("tick_height {}", self.tick_height);
 
            if self.leader_first_tick_height.is_none() {
-                inc_new_counter_info!(
+                inc_new_counter_warn!(
                    "poh_recorder-tick_overhead",
                    timing::duration_as_us(&now.elapsed()) as usize
                );
@@ -380,7 +380,7 @@
             self.tick_cache.push((entry, self.tick_height));
             let _ = self.flush_cache(true);
         }
-        inc_new_counter_info!(
+        inc_new_counter_warn!(
            "poh_recorder-tick_overhead",
            timing::duration_as_us(&now.elapsed()) as usize
        );
@@ -409,13 +409,13 @@ impl PohRecorder {
         {
             let now = Instant::now();
             let mut poh_lock = self.poh.lock().unwrap();
-            inc_new_counter_info!(
+            inc_new_counter_warn!(
                "poh_recorder-record_lock_contention",
                timing::duration_as_us(&now.elapsed()) as usize
            );
             let now = Instant::now();
             let res = poh_lock.record(mixin);
-            inc_new_counter_info!(
+            inc_new_counter_warn!(
                "poh_recorder-record_ms",
                timing::duration_as_us(&now.elapsed()) as usize
            );
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index d75d4133b0..7c8f7c1d7f 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -262,7 +262,7 @@ impl RepairService {
             } else if slot_meta.consumed == slot_meta.received {
                 vec![RepairType::HighestShred(slot, slot_meta.received)]
             } else {
-                let reqs = blockstore.find_missing_data_indexes_ts(
+                let reqs = blockstore.find_missing_data_indexes(
                     slot,
                     slot_meta.first_shred_timestamp,
                     slot_meta.consumed,
diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs
index 3a7445ed6e..a3baff7919 100644
--- a/core/src/snapshot_packager_service.rs
+++ b/core/src/snapshot_packager_service.rs
@@ -1,5 +1,4 @@
 use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
-use solana_ledger::blockstore::Blockstore;
 use solana_ledger::{snapshot_package::AccountsPackageReceiver, snapshot_utils};
 use solana_sdk::{clock::Slot, hash::Hash};
 use std::{
@@ -22,7 +21,6 @@ impl SnapshotPackagerService {
         starting_snapshot_hash: Option<(Slot, Hash)>,
         exit: &Arc<AtomicBool>,
         cluster_info: &Arc<ClusterInfo>,
-        blockstore: Option<Arc<Blockstore>>,
     ) -> Self {
         let exit = exit.clone();
         let cluster_info = cluster_info.clone();
@@ -59,9 +57,6 @@ impl SnapshotPackagerService {
                         }
                         cluster_info.push_snapshot_hashes(hashes.clone());
                     }
-                    if let Some(ref blockstore) = blockstore {
-                        let _ = blockstore.tar_shreds(snapshot_package.root);
-                    }
                 }
                 Err(RecvTimeoutError::Disconnected) => break,
                 Err(RecvTimeoutError::Timeout) => (),
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index ec889e14f0..83b37e9052 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -291,7 +291,6 @@ pub mod tests {
         Blockstore::open_with_signal(&blockstore_path)
             .expect("Expected to successfully open ledger");
         let blockstore = Arc::new(blockstore);
-
         let bank = bank_forks.working_bank();
         let (exit, poh_recorder, poh_service, _entry_receiver) =
             create_test_recorder(&bank, &blockstore, None);
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 7cb44462b8..feb1ada18f 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -198,10 +198,6 @@ impl Validator {
         let bank_info = &bank_forks_info[0];
         let bank = bank_forks[bank_info.bank_slot].clone();
 
-        blockstore
-            .reconcile_shreds(Some(&leader_schedule_cache))
-            .expect("Expected to successfully reconcile shreds");
-
         info!("Starting validator from slot {}", bank.slot());
         {
             let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@@ -376,13 +372,8 @@ impl Validator {
         if config.snapshot_config.is_some() {
             // Start a snapshot packaging service
             let (sender, receiver) = channel();
-            let snapshot_packager_service = SnapshotPackagerService::new(
-                receiver,
-                snapshot_hash,
-                &exit,
-                &cluster_info,
-                Some(blockstore.clone()),
-            );
+            let snapshot_packager_service =
+                SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info);
             (Some(snapshot_packager_service), Some(sender))
         } else {
             (None, None)
diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs
index 06cd06bea4..da84b01526 100644
--- a/core/tests/bank_forks.rs
+++ b/core/tests/bank_forks.rs
@@ -321,7 +321,7 @@ mod tests {
         let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));
 
         let snapshot_packager_service =
-            SnapshotPackagerService::new(receiver, None, &exit, &cluster_info, None);
+            SnapshotPackagerService::new(receiver, None, &exit, &cluster_info);
 
         // Close the channel so that the package service will exit after reading all the
         // packages off the channel
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 40b87e8915..450945f7b4 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -494,6 +494,8 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
     analyze_column::<ErasureMeta>(database, "ErasureMeta", ErasureMeta::key_size())?;
     analyze_column::<Root>(database, "Root", Root::key_size())?;
     analyze_column::<Index>(database, "Index", Index::key_size())?;
+    analyze_column::<ShredData>(database, "ShredData", ShredData::key_size())?;
+    analyze_column::<ShredCode>(database, "ShredCode", ShredCode::key_size())?;
     analyze_column::<TransactionStatus>(
         database,
         "TransactionStatus",
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 5f48ccb9a5..3bce56ba09 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -20,19 +20,19 @@ use rayon::{
     iter::{IntoParallelRefIterator, ParallelIterator},
     ThreadPool,
 };
+use rocksdb::DBRawIterator;
 use solana_measure::measure::Measure;
 use solana_metrics::{datapoint_debug, datapoint_error};
 use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::{
     account::Account,
-    clock::DEFAULT_MS_PER_SLOT,
     clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
     genesis_config::GenesisConfig,
     hash::Hash,
     program_utils::limited_deserialize,
     pubkey::Pubkey,
     signature::{Keypair, Signature, Signer},
-    timing::{duration_as_ms, timestamp},
+    timing::timestamp,
     transaction::Transaction,
 };
@@ -43,10 +43,8 @@ use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::TIMESTA
 use std::{
     cell::RefCell,
     cmp,
-    collections::{HashMap, HashSet},
+    collections::HashMap,
     fs,
-    io::Read,
-    io::Write,
     path::{Path, PathBuf},
     rc::Rc,
     sync::{
@@ -57,7 +55,6 @@ use std::{
 };
 
 pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";
-pub const SHREDS_DIRECTORY: &str = "shreds";
 
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
                     .num_threads(get_thread_count())
@@ -86,6 +83,8 @@ pub struct Blockstore {
     erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
     orphans_cf: LedgerColumn<cf::Orphans>,
     index_cf: LedgerColumn<cf::Index>,
+    data_shred_cf: LedgerColumn<cf::ShredData>,
+    code_shred_cf: LedgerColumn<cf::ShredCode>,
     transaction_status_cf: LedgerColumn<cf::TransactionStatus>,
     address_signatures_cf: LedgerColumn<cf::AddressSignatures>,
     transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
@@ -97,7 +96,6 @@ pub struct Blockstore {
     pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
     pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
     no_compaction: bool,
-    shreds_dir: String,
 }
 
 pub struct IndexMetaWorkingSetEntry {
@@ -179,11 +177,6 @@ impl Blockstore {
     pub fn open(ledger_path: &Path) -> Result<Blockstore> {
         fs::create_dir_all(&ledger_path)?;
         let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);
-        let shreds_dir = ledger_path
-            .join(SHREDS_DIRECTORY)
-            .to_str()
-            .unwrap()
-            .to_string();
 
         adjust_ulimit_nofile();
 
@@ -206,6 +199,8 @@ impl Blockstore {
         let orphans_cf = db.column();
         let index_cf = db.column();
+        let data_shred_cf = db.column();
+        let code_shred_cf = db.column();
         let transaction_status_cf = db.column();
         let address_signatures_cf = db.column();
         let transaction_status_index_cf = db.column();
@@ -247,6 +242,8 @@ impl Blockstore {
             erasure_meta_cf,
             orphans_cf,
             index_cf,
+            data_shred_cf,
+            code_shred_cf,
             transaction_status_cf,
             address_signatures_cf,
             transaction_status_index_cf,
@@ -258,7 +255,6 @@ impl Blockstore {
             last_root,
             lowest_cleanup_slot: Arc::new(RwLock::new(0)),
             no_compaction: false,
-            shreds_dir,
         };
         if initialize_transaction_status_index {
             blockstore.initialize_transaction_status_index()?;
@@ -343,49 +339,6 @@ impl Blockstore {
         }
     }
 
-    fn tar_dir(dir: String, archive: String) -> Result<()> {
-        let args = ["cfzP", &archive, &dir];
-        let output = std::process::Command::new("tar").args(&args).output()?;
-        if !output.status.success() {
-            warn!(
-                "tar shreds {} command failed with exit code: {}",
-                dir, output.status,
-            );
-            use std::str::from_utf8;
-            info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
-            info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
-        } else {
-            let _ = fs::remove_dir_all(dir);
-        }
-        Ok(())
-    }
-
-    pub fn tar_shreds(&self, max_slot: Slot) -> Result<()> {
-        fs::create_dir_all(Path::new(&self.shreds_dir).join("data").join("tars"))?;
-        fs::create_dir_all(Path::new(&self.shreds_dir).join("coding").join("tars"))?;
-        let dir = fs::read_dir(Path::new(&self.shreds_dir).join("data"))?;
-        let slots = dir
-            .filter_map(|e| {
-                let e = e.ok()?;
-                let path = e.path();
-                if !path.is_dir() {
-                    return None;
-                }
-                let ix: Slot = std::str::FromStr::from_str(e.file_name().to_str()?).ok()?;
-                Some(ix)
-            })
-            .filter(|ix| *ix < max_slot);
-        for slot in slots {
-            let dir = self.slot_data_dir(slot);
-            let archive = self.slot_data_tar_path(slot);
-            let _ = Self::tar_dir(dir, archive);
-            let dir = self.slot_coding_dir(slot);
-            let archive = self.slot_coding_tar_path(slot);
-            let _ = Self::tar_dir(dir, archive);
-        }
-        Ok(())
-    }
-
     // Returns whether or not all columns have been purged until their end
     fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
         let mut write_batch = self
             .db
             .batch()
             .expect("Database Error: Failed to get write batch");
         // delete range cf is not inclusive
         let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
-        for s in from_slot..to_slot {
-            let _ = fs::remove_dir_all(self.slot_data_dir(s));
-            let _ = fs::remove_file(self.slot_data_tar_path(s));
-            let _ = fs::remove_dir_all(self.slot_coding_dir(s));
-            let _ = fs::remove_file(self.slot_coding_tar_path(s));
-        }
         let mut columns_empty = self
             .db
             .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
             .unwrap_or(false)
             & self
                 .db
                 .delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
                 .unwrap_or(false)
+            & self
+                .db
+                .delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or(false)
+            & self
+                .db
+                .delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or(false)
             & self
                 .db
                 .delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
                 .unwrap_or(false)
@@ -474,6 +429,14 @@ impl Blockstore {
             .column::<cf::Root>()
             .compact_range(from_slot, to_slot)
             .unwrap_or(false)
+            && self
+                .data_shred_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .code_shred_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
             && self
                 .dead_slots_cf
                 .compact_range(from_slot, to_slot)
                 .unwrap_or(false)
@@ -547,40 +510,29 @@ impl Blockstore {
         let orphans_iter = self.orphans_iterator(root + 1).unwrap();
         root_forks.chain(orphans_iter.flat_map(move |orphan| NextSlotsIterator::new(orphan, self)))
     }
-    pub fn dir_iterator(
-        slot: u64,
-        path: String,
-        index: u64,
-    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> {
-        let dir = fs::read_dir(path)?;
-        Ok(dir.filter_map(move |e| {
-            let e = e.ok()?;
-            let ix: u64 = std::str::FromStr::from_str(e.file_name().to_str()?).ok()?;
-            if ix >= index {
-                let buf = Self::get_data(&e.path().to_str().unwrap().to_string(), None).ok()??;
-                Some(((slot, ix), buf.into_boxed_slice()))
-            } else {
-                None
-            }
-        }))
-    }
-    pub fn slot_data_iterator(
-        &self,
+    pub fn slot_data_iterator<'a>(
+        &'a self,
         slot: Slot,
         index: u64,
-    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> {
-        let dir = self.slot_data_dir(slot);
-        Self::dir_iterator(slot, dir, index)
+    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
+        let slot_iterator = self.db.iter::<cf::ShredData>(IteratorMode::From(
+            (slot, index),
+            IteratorDirection::Forward,
+        ))?;
+        Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
     }
 
-    pub fn slot_coding_iterator(
-        &self,
+    pub fn slot_coding_iterator<'a>(
+        &'a self,
         slot: Slot,
         index: u64,
-    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> {
-        let dir = self.slot_coding_dir(slot);
-        Self::dir_iterator(slot, dir, index)
+    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
+        let slot_iterator = self.db.iter::<cf::ShredCode>(IteratorMode::From(
+            (slot, index),
+            IteratorDirection::Forward,
+        ))?;
+        Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
     }
 
     pub fn rooted_slot_iterator<'a>(
@@ -594,12 +546,14 @@ impl Blockstore {
     }
 
     fn try_shred_recovery(
-        &self,
+        db: &Database,
         erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
         prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
     ) -> Vec<Shred> {
+        let data_cf = db.column::<cf::ShredData>();
+        let code_cf = db.column::<cf::ShredCode>();
         let mut recovered_data_shreds = vec![];
         // Recovery rules:
         // 1. Only try recovery around indexes for which new data or coding shreds are received
@@ -634,8 +588,8 @@ impl Blockstore {
                         if index.data().is_present(i) {
                             if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
-                                let some_data = self
-                                    .get_data_shred(slot, i)
+                                let some_data = data_cf
+                                    .get_bytes((slot, i))
                                     .expect("Database failure, could not fetch data shred");
                                 if let Some(data) = some_data {
                                     Shred::new_from_serialized_shred(data).ok()
@@ -665,8 +619,8 @@ impl Blockstore {
                             })
                             .or_else(|| {
                                 if index.coding().is_present(i) {
-                                    let some_code = self
-                                        .get_coding_shred(slot, i)
+                                    let some_code = code_cf
+                                        .get_bytes((slot, i))
                                         .expect("Database failure, could not fetch code shred");
                                     if let Some(code) = some_code {
                                         Shred::new_from_serialized_shred(code).ok()
@@ -756,6 +710,7 @@ impl Blockstore {
                 &mut erasure_metas,
                 &mut index_working_set,
                 &mut slot_meta_working_set,
+                &mut write_batch,
                 &mut just_inserted_data_shreds,
                 &mut index_meta_time,
                 is_trusted,
@@ -782,7 +737,8 @@ impl Blockstore {
         let mut start = Measure::start("Shred recovery");
         let mut num_recovered = 0;
         if let Some(leader_schedule_cache) = leader_schedule {
-            let recovered_data = self.try_shred_recovery(
+            let recovered_data = Self::try_shred_recovery(
+                &db,
                 &erasure_metas,
                 &mut index_working_set,
                 &mut just_inserted_data_shreds,
@@ -798,6 +754,7 @@ impl Blockstore {
                     &mut erasure_metas,
                     &mut index_working_set,
                     &mut slot_meta_working_set,
+                    &mut write_batch,
                     &mut just_inserted_data_shreds,
                     &mut index_meta_time,
                     is_trusted,
@@ -813,7 +770,12 @@ impl Blockstore {
         just_inserted_coding_shreds
             .into_iter()
             .for_each(|((_, _), shred)| {
-                self.check_insert_coding_shred(shred, &mut index_working_set, &mut index_meta_time);
+                self.check_insert_coding_shred(
+                    shred,
+                    &mut index_working_set,
+                    &mut write_batch,
+                    &mut index_meta_time,
+                );
                 num_inserted += 1;
             });
 
@@ -891,6 +853,7 @@ impl Blockstore {
         &self,
         shred: Shred,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
+        write_batch: &mut WriteBatch,
         index_meta_time: &mut u64,
     ) -> bool {
         let slot = shred.slot();
@@ -901,7 +864,7 @@ impl Blockstore {
         let index_meta = &mut index_meta_working_set_entry.index;
         // This gives the index of first coding shred in this FEC block
         // So, all coding shreds in a given FEC block will have the same set index
-        self.insert_coding_shred(index_meta, &shred)
+        self.insert_coding_shred(index_meta, &shred, write_batch)
             .map(|_| {
                 index_meta_working_set_entry.did_insert_occur = true;
             })
@@ -942,7 +905,6 @@ impl Blockstore {
                 .get((slot, set_index))
                 .expect("Expect database get to succeed")
                 .unwrap_or_else(|| {
-                    fs::create_dir_all(self.slot_coding_dir(slot)).unwrap();
                     ErasureMeta::new(set_index, first_coding_index, &erasure_config)
                 })
         });
@@ -955,6 +917,7 @@ impl Blockstore {
                 erasure_meta.config, erasure_config
             );
         }
+        // Should be safe to modify index_meta here. Two cases
         // 1) Recovery happens: Then all inserted erasure metas are removed
         // from just_received_coding_shreds, and nothing will be committed by
@@ -979,6 +942,7 @@ impl Blockstore {
         erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
+        write_batch: &mut WriteBatch,
         just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
         index_meta_time: &mut u64,
         is_trusted: bool,
@@ -1009,10 +973,9 @@ impl Blockstore {
         }
 
         let set_index = u64::from(shred.common_header.fec_set_index);
-        if slot_meta.received == 0 {
-            fs::create_dir_all(self.slot_data_dir(shred.slot())).unwrap();
-        }
-        if let Ok(()) = self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred) {
+        if let Ok(()) =
+            self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
+        {
             just_inserted_data_shreds.insert((slot, shred_index), shred);
             index_meta_working_set_entry.did_insert_occur = true;
             slot_meta_entry.did_insert_occur = true;
@@ -1051,18 +1014,12 @@ impl Blockstore {
             || slot <= *last_root.read().unwrap())
     }
 
-    fn write_all(path: &str, payload: &[u8]) -> Result<()> {
-        let tmp_name = format!("{}.tmp", path);
-        let tmp_path = Path::new(&tmp_name);
-        let mut f = fs::File::create(tmp_path)?;
-        f.write_all(payload)?;
-        let real_path = Path::new(path);
-        //after rename syscall returns, the real path is on disk
-        fs::rename(tmp_path, real_path)?;
-        Ok(())
-    }
-
-    fn insert_coding_shred(&self, index_meta: &mut Index, shred: &Shred) -> Result<()> {
+    fn insert_coding_shred(
+        &self,
+        index_meta: &mut Index,
+        shred: &Shred,
+        write_batch: &mut WriteBatch,
+    ) -> Result<()> {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -1072,8 +1029,7 @@ impl Blockstore {
 
         // Commit step: commit all changes to the mutable structures at once, or none at all.
         // We don't want only a subset of these changes going through.
-        let path = self.coding_shred_path(slot, shred_index);
-        Self::write_all(&path, &shred.payload)?;
+        write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
         index_meta.coding_mut().set_present(shred_index, true);
 
         Ok(())
@@ -1142,6 +1098,7 @@ impl Blockstore {
         slot_meta: &mut SlotMeta,
         data_index: &mut ShredIndex,
         shred: &Shred,
+        write_batch: &mut WriteBatch,
     ) -> Result<()> {
         let slot = shred.slot();
         let index = u64::from(shred.index());
@@ -1176,8 +1133,7 @@ impl Blockstore {
         // Commit step: commit all changes to the mutable structures at once, or none at all.
         // We don't want only a subset of these changes going through.
-        let path = self.data_shred_path(slot, index);
-        Self::write_all(&path, &shred.payload)?;
+        write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
         update_slot_meta(
             last_in_slot,
             last_in_data,
@@ -1191,93 +1147,8 @@ impl Blockstore {
         Ok(())
     }
 
-    fn slot_data_tar_path(&self, slot: Slot) -> String {
-        Path::new(&self.shreds_dir)
-            .join("data")
-            .join("tars")
-            .join(slot.to_string())
-            .with_extension("tar.gz")
-            .to_str()
-            .unwrap()
-            .to_string()
-    }
-
-    fn slot_data_dir(&self, slot: Slot) -> String {
-        Path::new(&self.shreds_dir)
-            .join("data")
-            .join(slot.to_string())
-            .to_str()
-            .unwrap()
-            .to_string()
-    }
-    fn slot_coding_dir(&self, slot: Slot) -> String {
-        Path::new(&self.shreds_dir)
-            .join("coding")
-            .join(slot.to_string())
-            .to_str()
-            .unwrap()
-            .to_string()
-    }
-    fn slot_coding_tar_path(&self, slot: Slot) -> String {
-        Path::new(&self.shreds_dir)
-            .join("coding")
-            .join("tars")
-            .join(slot.to_string())
-            .with_extension("tar.gz")
-            .to_str()
-            .unwrap()
-            .to_string()
-    }
-    fn data_shred_path(&self, slot: Slot, index: u64) -> String {
-        Path::new(&self.slot_data_dir(slot))
-            .join(index.to_string())
-            .to_str()
-            .unwrap()
-            .to_string()
-    }
-    fn coding_shred_path(&self, slot: Slot, index: u64) -> String {
-        Path::new(&self.slot_coding_dir(slot))
-            .join(index.to_string())
-            .to_str()
-            .unwrap()
-            .to_string()
-    }
-    fn extract_data(archive: &str, file: &str) -> Result<Option<Vec<u8>>> {
-        let args = ["xfzO", archive, file];
-        let output = std::process::Command::new("tar").args(&args).output()?;
-        if !output.status.success() {
-            warn!(
-                "tar extract shred {} {} command failed with exit code: {}",
-                archive, file, output.status,
-            );
-            use std::str::from_utf8;
-            info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
-            info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
-            Ok(None)
-        } else {
-            Ok(Some(output.stdout))
-        }
-    }
-    fn get_data(shred_path: &str, tgz: Option<&str>) -> Result<Option<Vec<u8>>> {
-        let path = Path::new(shred_path);
-        let f = fs::File::open(path);
-        if f.is_err() {
-            if let Some(archive) = tgz {
-                if Path::new(archive).is_file() {
-                    return Self::extract_data(archive, shred_path);
-                }
-            }
-            Ok(None)
-        } else {
-            let mut buf = vec![];
-            f?.read_to_end(&mut buf)?;
-            Ok(Some(buf))
-        }
-    }
     pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
-        let shred_path = self.data_shred_path(slot, index);
-        let archive_path = self.slot_data_tar_path(slot);
-        Self::get_data(&shred_path, Some(&archive_path))
+        self.data_shred_cf.get_bytes((slot, index))
     }
 
     pub fn get_data_shreds_for_slot(
@@ -1285,15 +1156,10 @@ impl Blockstore {
         slot: Slot,
         start_index: u64,
     ) -> ShredResult<Vec<Shred>> {
-        let mut vec: Vec<Shred> = self
-            .slot_data_iterator(slot, start_index)
-            .map(|iter| {
-                iter.filter_map(|data| Shred::new_from_serialized_shred(data.1.to_vec()).ok())
-                    .collect()
-            })
-            .unwrap_or_else(|_| vec![]);
-        vec.sort_by_key(|s| s.index());
-        Ok(vec)
+        self.slot_data_iterator(slot, start_index)
+            .expect("blockstore couldn't fetch iterator")
+            .map(|data| Shred::new_from_serialized_shred(data.1.to_vec()))
+            .collect()
     }
 
     pub fn get_data_shreds(
@@ -1342,9 +1208,7 @@ impl Blockstore {
     }
 
     pub fn get_coding_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
-        let shred_path = self.coding_shred_path(slot, index);
-        let archive_path = self.slot_data_tar_path(slot);
-        Self::get_data(&shred_path, Some(&archive_path))
+        self.code_shred_cf.get_bytes((slot, index))
     }
 
     pub fn get_coding_shreds_for_slot(
@@ -1352,15 +1216,10 @@ impl Blockstore {
         slot: Slot,
         start_index: u64,
     ) -> ShredResult<Vec<Shred>> {
-        let mut vec: Vec<Shred> = self
-            .slot_coding_iterator(slot, start_index)
-            .map(|iter| {
-                iter.filter_map(|code| Shred::new_from_serialized_shred(code.1.to_vec()).ok())
-                    .collect()
-            })
-            .unwrap_or_else(|_| vec![]);
-        vec.sort_by_key(|s| s.index());
-        Ok(vec)
+        self.slot_coding_iterator(slot, start_index)
+            .expect("blockstore couldn't fetch iterator")
+            .map(|code| Shred::new_from_serialized_shred(code.1.to_vec()))
+            .collect()
     }
 
     // Only used by tests
@@ -1447,71 +1306,110 @@ impl Blockstore {
         self.meta_cf.put_bytes(slot, bytes)
     }
 
+    // Given a start and end entry index, find all the missing
+    // indexes in the ledger in the range [start_index, end_index)
+    // for the specified slot
+    fn find_missing_indexes<C>(
+        db_iterator: &mut DBRawIterator,
+        slot: Slot,
+        first_timestamp: u64,
+        start_index: u64,
+        end_index: u64,
+        max_missing: usize,
+    ) -> Vec<u64>
+    where
+        C: Column<Index = (u64, u64)>,
+    {
+        if start_index >= end_index || max_missing == 0 {
+            return vec![];
+        }
+
+        let mut missing_indexes = vec![];
+        let ticks_since_first_insert =
+            DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000;
+
+        // Seek to the first shred with index >= start_index
+        db_iterator.seek(&C::key((slot, start_index)));
+
+        // The index of the first missing shred in the slot
+        let mut prev_index = start_index;
+        'outer: loop {
+            if !db_iterator.valid() {
+                for i in prev_index..end_index {
+                    missing_indexes.push(i);
+                    if missing_indexes.len() == max_missing {
+                        break;
+                    }
+                }
+                break;
+            }
+            let (current_slot, index) = C::index(&db_iterator.key().expect("Expect a valid key"));
+
+            let current_index = {
+                if current_slot > slot {
+                    end_index
+                } else {
+                    index
+                }
+            };
+
+            let upper_index = cmp::min(current_index, end_index);
+            // the tick that will be used to figure out the timeout for this hole
+            let reference_tick = u64::from(Shred::reference_tick_from_data(
+                &db_iterator.value().expect("couldn't read value"),
+            ));
+
+            if ticks_since_first_insert < reference_tick + MAX_TURBINE_DELAY_IN_TICKS {
+                // The higher index holes have not timed out yet
+                break 'outer;
+            }
+            for i in prev_index..upper_index {
+                missing_indexes.push(i);
+                if missing_indexes.len() == max_missing {
+                    break 'outer;
+                }
+            }
+
+            if current_slot > slot {
+                break;
+            }
+
+            if current_index >= end_index {
+                break;
+            }
+
+            prev_index = current_index + 1;
+            db_iterator.next();
+        }
+
+        missing_indexes
+    }
+
     pub fn find_missing_data_indexes(
         &self,
         slot: Slot,
+        first_timestamp: u64,
         start_index: u64,
         end_index: u64,
         max_missing: usize,
     ) -> Vec<u64> {
-        self.find_missing_data_indexes_ts(slot, 0, start_index, end_index, max_missing)
-    }
-    fn find_data_indexes(
-        &self,
-        slot: Slot,
-        start_index: u64,
-        end_index: u64,
-        first_ts: &mut u64,
-    ) -> HashSet<u64> {
-        let dir = fs::read_dir(self.slot_data_dir(slot));
-        let min_ts = *first_ts;
-        if let Ok(dir) = dir {
-            dir.filter_map(|e| {
-                let e = e.ok()?;
-                let ix: u64 = std::str::FromStr::from_str(e.file_name().to_str()?).ok()?;
-                if ix >= start_index || ix <= end_index {
-                    if min_ts > 0 {
-                        let ts = fs::metadata(e.path())
-                            .ok()?
-                            .modified()
-                            .ok()?
-                            .duration_since(std::time::UNIX_EPOCH)
-                            .ok()?;
-                        let ts = duration_as_ms(&ts);
-                        if ts > min_ts {
-                            *first_ts = cmp::min(ts, *first_ts);
-                        }
-                    }
-                    Some(ix)
-                } else {
-                    None
-                }
-            })
-            .collect()
+        if let Ok(mut db_iterator) = self
+            .db
+            .raw_iterator_cf(self.db.cf_handle::<cf::ShredData>())
+        {
+            Self::find_missing_indexes::<cf::ShredData>(
+                &mut db_iterator,
+                slot,
+                first_timestamp,
+                start_index,
+                end_index,
+                max_missing,
+            )
         } else {
-            HashSet::new()
+            vec![]
         }
     }
 
-    pub fn find_missing_data_indexes_ts(
-        &self,
-        slot: Slot,
-        mut first_ts: u64,
-        start_index: u64,
-        end_index: u64,
-        max_missing: usize,
-    ) -> Vec<u64> {
-        let current = self.find_data_indexes(slot, start_index, end_index, &mut first_ts);
-        let now = timestamp();
-        if now < first_ts || now - first_ts < DEFAULT_MS_PER_SLOT / 2 {
-            return vec![];
-        }
-        (start_index..end_index)
-            .filter(|ix| !current.contains(ix))
-            .take(max_missing)
-            .collect()
-    }
-
     pub fn get_block_time(
         &self,
         slot: Slot,
@@ -1987,75 +1885,6 @@ impl Blockstore {
             .map(|x| x.0)
     }
 
-    fn verify_shred(&self, leader: &Pubkey, slot: Slot, x: u64) {
-        let payload = self.get_data_shred(slot, x).unwrap().unwrap();
-        let shred = Shred::new_from_serialized_shred(payload).unwrap();
-        assert!(
-            shred.verify(leader),
-            "shred failed verification {} {} {}",
-            slot,
-            x,
-            self.shreds_dir,
-        );
-    }
-
-    fn verify_expected_shreds(
-        &self,
-        ls: Option<&Arc<LeaderScheduleCache>>,
-        slot: Slot,
-        total: &mut usize,
-    ) -> ShredIndex {
-        let index = self
-            .index_cf
-            .get(slot)
-            .unwrap()
-            .unwrap_or_else(|| Index::new(slot));
-        let data = index.data();
-        if slot != 0 {
-            if let Some(ls) = ls {
-                let leader = ls.slot_leader_at(slot, None).unwrap();
-                for ix in data.index.iter() {
-                    self.verify_shred(&leader, slot, *ix);
-                    *total += 1;
-                }
-            }
-        }
-        data.clone()
-    }
-
-    pub fn reconcile_shreds(
-        &self,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
-    ) -> Result<usize> {
-        let root_slot: Slot = *self.last_root.read().unwrap();
-        let mut total = 0;
-        info!("reconciling shreds from root {}", root_slot);
-        let slot_iterator = self
-            .db
-            .iter::<cf::SlotMeta>(IteratorMode::From(root_slot, IteratorDirection::Forward))?;
-        for (slot, _) in slot_iterator {
-            info!("reconciling shreds slot {}", slot);
-            let existing = self.find_data_indexes(slot, 0, std::u64::MAX, &mut 0);
-            let expected = self.verify_expected_shreds(leader_schedule, slot, &mut total);
-            let new_shreds: Vec<Shred> = existing
-                .into_iter()
-                .filter_map(|x| {
-                    if expected.is_present(x) {
-                        None
-                    } else {
-                        let payload = self.get_data_shred(slot, x).ok()??;
-                        Shred::new_from_serialized_shred(payload).ok()
-                    }
-                })
-                .collect();
-            if !new_shreds.is_empty() {
-                self.insert_shreds(new_shreds, leader_schedule, false)?;
-            }
-        }
-        info!("Done reconciling shreds. Verified {}", total);
-        Ok(total)
-    }
-
     /// Returns the entry vector for the slot starting with `shred_start_index`, the number of
     /// shreds that comprise the entry vector, and whether the slot is full (consumed all shreds).
     pub fn get_slot_entries_with_shred_info(
@@ -2144,26 +1973,26 @@ impl Blockstore {
         end_index: u32,
         slot_meta: &SlotMeta,
     ) -> Result<Vec<Entry>> {
+        let data_shred_cf = self.db.column::<cf::ShredData>();
+
         // Short circuit on first error
         let data_shreds: Result<Vec<Shred>> = (start_index..=end_index)
             .map(|i| {
-                self.get_data_shred(slot, u64::from(i))
+                data_shred_cf
+                    .get_bytes((slot, u64::from(i)))
                     .and_then(|serialized_shred| {
                         Shred::new_from_serialized_shred(serialized_shred.unwrap_or_else(|| {
-                            let index = self.index_cf.get(slot).unwrap().map(|i| i.data().clone());
                             panic!(
                                 "Shred with slot: {}, index: {}, consumed: {},
-                                completed_indexes: {:?},
-                                index_meta data: {:?},
+                                completed_indexes: {:?}
                                 must exist if shred index was included in a range: {} {}",
                                 slot,
                                 i,
                                 slot_meta.consumed,
                                 slot_meta.completed_data_indexes,
-                                index,
                                 start_index,
                                 end_index
                             )
@@ -2360,27 +2189,7 @@ impl Blockstore {
     }
 
     pub fn storage_size(&self) -> Result<u64> {
-        let storage = self.db.storage_size()?;
-        let mut dir_len = 0;
-        let path = Path::new(&self.shreds_dir);
-        Self::dir_size(path, &mut dir_len)?;
-        Ok(storage + dir_len)
-    }
-
-    fn dir_size(dir: &Path, size: &mut u64) -> Result<()> {
-        if dir.is_dir() {
-            for entry in fs::read_dir(dir)? {
-                let entry = entry?;
-                let path = entry.path();
-                if path.is_dir() {
-                    Self::dir_size(&path, size)?;
-                } else {
-                    let len = fs::metadata(path)?.len();
-                    *size += len;
-                }
-            }
-        }
-        Ok(())
+        self.db.storage_size()
     }
 }
 
@@ -3100,6 +2909,20 @@ pub mod tests {
             .next()
             .map(|(slot, _)| slot >= min_slot)
             .unwrap_or(true)
+            & blockstore
+                .db
+                .iter::<cf::ShredData>(IteratorMode::Start)
+                .unwrap()
+                .next()
+                .map(|((slot, _), _)| slot >= min_slot)
+                .unwrap_or(true)
+            & blockstore
+                .db
+                .iter::<cf::ShredCode>(IteratorMode::Start)
+                .unwrap()
+                .next()
+                .map(|((slot, _), _)| slot >= min_slot)
+                .unwrap_or(true)
             & blockstore
                 .db
                 .iter::<cf::DeadSlots>(IteratorMode::Start)
@@ -3199,7 +3022,8 @@ pub mod tests {
             .unwrap();
 
         let serialized_shred = ledger
-            .get_data_shred(0, last_shred.index() as u64)
+            .data_shred_cf
+            .get_bytes((0, last_shred.index() as u64))
             .unwrap()
             .unwrap();
         let deserialized_shred = Shred::new_from_serialized_shred(serialized_shred).unwrap();
@@ -3309,6 +3133,56 @@ pub mod tests {
         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    fn test_put_get_simple() {
+        let ledger_path = get_tmp_ledger_path!();
+        let ledger = Blockstore::open(&ledger_path).unwrap();
+
+        // Test meta column family
+        let meta = SlotMeta::new(0, 1);
+        ledger.meta_cf.put(0, &meta).unwrap();
+        let result = ledger
+            .meta_cf
+            .get(0)
+            .unwrap()
+            .expect("Expected meta object to exist");
+
+        assert_eq!(result, meta);
+
+        // Test erasure column family
+        let erasure = vec![1u8; 16];
+        let erasure_key = (0, 0);
+        ledger
+            .code_shred_cf
+            .put_bytes(erasure_key, &erasure)
+            .unwrap();
+
+        let result = ledger
+            .code_shred_cf
+            .get_bytes(erasure_key)
+            .unwrap()
+            .expect("Expected erasure object to exist");
+
+        assert_eq!(result, erasure);
+
+        // Test data column family
+        let data = vec![2u8; 16];
+        let data_key = (0, 0);
+        ledger.data_shred_cf.put_bytes(data_key, &data).unwrap();
+
+        let result = ledger
+            .data_shred_cf
+            .get_bytes(data_key)
+            .unwrap()
+            .expect("Expected data object to exist");
+
+        assert_eq!(result, data);
+
+        // Destroying database without closing it first is undefined behavior
+        drop(ledger);
+        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     fn test_read_shred_bytes() {
         let slot = 0;
@@ -4427,27 +4301,27 @@ pub mod tests {
         // range of [0, gap)
         let expected: Vec<u64> = (1..gap).collect();
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, gap, gap as usize),
+            blockstore.find_missing_data_indexes(slot, 0, 0, gap, gap as usize),
             expected
         );
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
+            blockstore.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize),
             expected,
         );
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
+            blockstore.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize),
             &expected[..expected.len() - 1],
         );
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
+            blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize),
             vec![gap - 2, gap - 1],
        );
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, gap - 2, gap, 1),
+            blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, 1),
             vec![gap - 2],
         );
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, gap, 1),
+            blockstore.find_missing_data_indexes(slot, 0, 0, gap, 1),
             vec![1],
         );
 
@@ -4456,11 +4330,11 @@ pub mod tests {
         let mut expected: Vec<u64> = (1..gap).collect();
         expected.push(gap + 1);
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
+            blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize),
             expected,
         );
         assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
+            blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize),
             &expected[..expected.len() - 1],
         );
 
@@ -4476,6 +4350,7 @@ pub mod tests {
                 assert_eq!(
                     blockstore.find_missing_data_indexes(
                         slot,
+                        0,
                         j * gap,
                         i * gap,
                         ((i - j) * gap) as usize
@@ -4513,14 +4388,15 @@ pub mod tests {
             })
             .collect();
         blockstore.insert_shreds(shreds, None, false).unwrap();
+
         let empty: Vec<u64> = vec![];
         assert_eq!(
-            blockstore.find_missing_data_indexes_ts(slot, timestamp() + 1, 0, 50, 1),
+            blockstore.find_missing_data_indexes(slot, timestamp(), 0, 50, 1),
             empty
         );
         let expected: Vec<_> = (1..=9).collect();
         assert_eq!(
-            blockstore.find_missing_data_indexes_ts(slot, timestamp() - 400, 0, 50, 9),
+            blockstore.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9),
             expected
         );
 
@@ -4537,10 +4413,22 @@ pub mod tests {
 
         // Early exit conditions
         let empty: Vec<u64> = vec![];
-        assert_eq!(blockstore.find_missing_data_indexes(slot, 0, 0, 1), empty);
-        assert_eq!(blockstore.find_missing_data_indexes(slot, 5, 5, 1), empty);
-        assert_eq!(blockstore.find_missing_data_indexes(slot, 4, 3, 1), empty);
-        assert_eq!(blockstore.find_missing_data_indexes(slot, 1, 2, 0), empty);
+        assert_eq!(
+            blockstore.find_missing_data_indexes(slot, 0, 0, 0, 1),
+            empty
+        );
+        assert_eq!(
+            blockstore.find_missing_data_indexes(slot, 0, 5, 5, 1),
+            empty
+        );
+        assert_eq!(
+            blockstore.find_missing_data_indexes(slot, 0, 4, 3, 1),
+            empty
+        );
+        assert_eq!(
+            blockstore.find_missing_data_indexes(slot, 0, 1, 2, 0),
+            empty
+        );
 
         let entries = create_ticks(100, 0, Hash::default());
         let mut shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
@@ -4564,7 +4452,7 @@ pub mod tests {
         // [i, first_index - 1]
         for start in 0..STARTS {
             let result = blockstore.find_missing_data_indexes(
-                slot, start, // start
+                slot, 0, start, // start
                 END, //end
                 MAX, //max
             );
@@ -4594,7 +4482,7 @@ pub mod tests {
         for i in 0..num_shreds as u64 {
             for j in 0..i {
                 assert_eq!(
-                    blockstore.find_missing_data_indexes(slot, j, i, (i - j) as usize),
+                    blockstore.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize),
                     empty
                 );
             }
@@ -4894,8 +4782,9 @@ pub mod tests {
         }
 
        // Slot doesn't exist, iterator should be empty
-        let shred_iter = blockstore.slot_data_iterator(5, 0);
-        assert!(shred_iter.is_err());
+        let shred_iter = blockstore.slot_data_iterator(5, 0).unwrap();
+        let result: Vec<_> = shred_iter.collect();
+        assert_eq!(result, vec![]);
 
         // Test that the iterator for slot 8 contains what was inserted earlier
         let shred_iter = blockstore.slot_data_iterator(8, 0).unwrap();
@@ -4949,11 +4838,14 @@ pub mod tests {
             assert!(slot <= 5);
             assert_eq!(meta.last_index, shreds_per_slot - 1)
         });
-        for slot in 0..50 {
+
+        let data_iter = blockstore
+            .data_shred_cf
+            .iter(IteratorMode::From((0, 0), IteratorDirection::Forward))
+            .unwrap();
+        for ((slot, _), _) in data_iter {
             if slot > 5 {
-                assert!(blockstore.slot_data_iterator(slot, 0).is_err());
-            } else {
-                assert!(blockstore.slot_data_iterator(slot, 0).is_ok());
+                assert!(false);
             }
         }
 
@@ -6671,13 +6563,11 @@ pub mod tests {
             let index = blockstore.get_index(slot).unwrap().unwrap();
             // Test the set of data shreds in the index and in the data column
             // family are the same
+            let data_iter = blockstore.slot_data_iterator(slot, 0).unwrap();
             let mut num_data = 0;
-            let data_iter = blockstore.slot_data_iterator(slot, 0);
-            if data_iter.is_ok() {
-                for ((slot, index), _) in data_iter.unwrap() {
-                    num_data += 1;
-                    assert!(blockstore.get_data_shred(slot, index).unwrap().is_some());
-                }
+            for ((slot, index), _) in data_iter {
+                num_data += 1;
+                assert!(blockstore.get_data_shred(slot, index).unwrap().is_some());
             }
 
             // Test the data index doesn't have anything extra
@@ -6747,48 +6637,4 @@ pub mod tests {
 
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
-
-    #[test]
-    fn test_reconcile() {
-        let slot = 1;
-        let num_entries = 100;
-        let (data_shreds, _, leader_schedule_cache) =
-            setup_erasure_shreds(slot, 0, num_entries, 1.0);
-
-        let ledger_path = get_tmp_ledger_path!();
-        let ledger = Blockstore::open(&ledger_path).unwrap();
-
-        let len = data_shreds.len();
-        ledger
-            .insert_shreds(data_shreds, Some(&leader_schedule_cache), false)
-            .unwrap();
-
-        let num = ledger
-            .reconcile_shreds(Some(&leader_schedule_cache))
-            .unwrap();
-        assert_eq!(num, len);
-        drop(ledger);
-        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
-    }
-
-    #[test]
-    fn test_reconcile_missing() {
-        let slot = 1;
-        let num_entries = 100;
-        let (data_shreds, _, leader_schedule_cache) =
-            setup_erasure_shreds(slot, 0, num_entries, 1.0);
-
-        let ledger_path = get_tmp_ledger_path!();
-        let ledger = Blockstore::open(&ledger_path).unwrap();
-
-        ledger
-            .insert_shreds(data_shreds, Some(&leader_schedule_cache), false)
-            .unwrap();
-        let shred_path = ledger.data_shred_path(slot, 0);
-        fs::remove_file(shred_path.clone()).unwrap();
-        let result = std::panic::catch_unwind(|| {
-            let _ = ledger.reconcile_shreds(Some(&leader_schedule_cache));
-        });
-        assert!(result.is_err());
-    }
 }
diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs
index b82134fa60..0488f24c30 100644
--- a/ledger/src/blockstore_meta.rs
+++ b/ledger/src/blockstore_meta.rs
@@ -43,7 +43,7 @@ pub struct Index {
 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
 pub struct ShredIndex {
     /// Map representing presence/absence of shreds
-    pub index: BTreeSet<u64>,
+    index: BTreeSet<u64>,
 }
 
 #[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
diff --git a/ledger/src/hardened_unpack.rs b/ledger/src/hardened_unpack.rs
index cd0fc27471..3ba7d9345e 100644
--- a/ledger/src/hardened_unpack.rs
+++ b/ledger/src/hardened_unpack.rs
@@ -206,9 +206,6 @@ fn is_valid_genesis_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool
         (["rocksdb"], Directory) => true,
         (["rocksdb", ..], GNUSparse) => true,
         (["rocksdb", ..], Regular) => true,
-        (["shreds", ..], Directory) => true,
-        (["shreds", ..], GNUSparse) => true,
-        (["shreds", ..], Regular) => true,
         _ => false,
     }
 }
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 64d7d66e84..8b687de76b 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -5,7 +5,7 @@ use solana_client::rpc_client::RpcClient;
 use solana_client::thin_client::create_client;
 use solana_core::{
     broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
-    contact_info::ContactInfo, gossip_service::discover_cluster, validator::ValidatorConfig,
+    gossip_service::discover_cluster, validator::ValidatorConfig,
 };
 use solana_download_utils::download_snapshot;
 use solana_ledger::bank_forks::CompressionType;
@@ -274,7 +274,7 @@ fn run_cluster_partition(
     for node in &cluster_nodes {
         let node_client = RpcClient::new_socket(node.rpc);
         if let Ok(epoch_info) = node_client.get_epoch_info() {
-            debug!("slots_per_epoch: {:?}", epoch_info);
+            info!("slots_per_epoch: {:?}", epoch_info);
             if epoch_info.slots_in_epoch <= (1 << VOTE_THRESHOLD_DEPTH) {
                 reached_epoch = false;
                 break;
@@ -343,16 +343,13 @@ fn run_cluster_partition(
         alive_node_contact_infos.len(),
     )
     .unwrap();
-    assert!(wait_for_new_roots(&alive_node_contact_infos, 1024, 16));
     info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len());
-}
-
-pub fn wait_for_new_roots(nodes: &[ContactInfo], mut tries: usize, min_roots: usize) -> bool {
-    info!("looking for new roots on all nodes");
-    let mut roots = vec![HashSet::new(); nodes.len()];
+    info!("PARTITION_TEST looking for new roots on all nodes");
+    let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
+    let mut done = false;
     let mut last_print = Instant::now();
-    while tries > 0 {
-        for (i, ingress_node) in nodes.iter().enumerate() {
+    while !done {
+        for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
             let client = create_client(
                 ingress_node.client_facing_addr(),
                 solana_core::cluster_info::VALIDATOR_PORT_RANGE,
@@ -361,24 +358,14 @@ pub fn wait_for_new_roots(nodes: &[ContactInfo], mut tries: usize, min_roots: us
                 roots[i].insert(slot);
                 let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
                 if last_print.elapsed().as_secs() > 3 {
-                    info!(
-                        "{}: min observed roots {}/{} in {} nodes",
-                        tries,
-                        min_node,
-                        min_roots,
-                        roots.len()
-                    );
+                    info!("PARTITION_TEST min observed roots {}/16", min_node);
                     last_print = Instant::now();
                 }
-                if min_node >= min_roots {
-                    return true;
-                }
+                done = min_node >= 16;
             }
             sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
-            tries -= 1;
         }
     }
-    info!("failed waiting for roots");
-    false
+    info!("PARTITION_TEST done waiting for roots");
 }
 
 #[allow(unused_attributes)]
@@ -876,7 +863,6 @@ fn test_snapshot_download() {
 #[test]
 #[serial]
 fn test_snapshot_restart_tower() {
-    solana_logger::setup();
     // First set up the cluster with 2 nodes
     let snapshot_interval_slots = 10;
     let num_account_paths = 2;
@@ -934,11 +920,12 @@ fn test_snapshot_restart_tower() {
     // Use the restarted node as the discovery point so that we get updated
     // validator's ContactInfo
     let restarted_node_info = cluster.get_contact_info(&validator_id).unwrap();
-
-    let (cluster_nodes, _) =
-        discover_cluster(&restarted_node_info.gossip, cluster.validators.len()).unwrap();
-
-    assert!(wait_for_new_roots(&cluster_nodes, 512, 16));
+    cluster_tests::spend_and_verify_all_nodes(
+        &restarted_node_info,
+        &cluster.funding_keypair,
+        1,
+        HashSet::new(),
+    );
 }
 
 #[test]
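
The new slot_data_iterator/slot_coding_iterator above replace per-slot directory listings with a single key-range walk: seek to (slot, index) in the shred column, then stop at the first key belonging to another slot. A minimal, std-only sketch of that shape follows; the ShredStore type and its sorted map are illustrative stand-ins, not the Blockstore API, but RocksDB's big-endian (slot, index) keys sort the same way:

use std::collections::BTreeMap;

struct ShredStore {
    // Stand-in for the ShredData column family: payloads keyed by
    // (slot, index) in sorted key order.
    data: BTreeMap<(u64, u64), Vec<u8>>,
}

impl ShredStore {
    // Seek to (slot, index) and stop at the first key from a different slot,
    // matching the IteratorMode::From + take_while pattern in the patch.
    fn slot_data_iterator<'a>(
        &'a self,
        slot: u64,
        index: u64,
    ) -> impl Iterator<Item = (&'a (u64, u64), &'a Vec<u8>)> + 'a {
        self.data
            .range((slot, index)..)
            .take_while(move |((shred_slot, _), _)| *shred_slot == slot)
    }
}

fn main() {
    let mut store = ShredStore { data: BTreeMap::new() };
    store.data.insert((7, 0), vec![1]);
    store.data.insert((7, 3), vec![2]);
    store.data.insert((8, 0), vec![3]);
    // Only slot 7's shreds are visited; the first slot-8 key ends the walk.
    let indexes: Vec<u64> = store.slot_data_iterator(7, 0).map(|(k, _)| k.1).collect();
    assert_eq!(indexes, vec![0, 3]);
    // An absent slot yields an empty iterator rather than an error, which is
    // why the updated tests assert emptiness instead of is_err().
    assert_eq!(store.slot_data_iterator(5, 0).count(), 0);
}

Because keys come back already ordered, callers also no longer need the sort_by_key pass that the directory-based reader required.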
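insert_data_shred and insert_coding_shred now stage payloads into a single WriteBatch that insert_shreds commits in one write, instead of writing loose files per shred. A sketch of that atomicity pattern against the raw rocksdb crate; this assumes a recent rust-rocksdb (around 0.21, where WriteBatch::put_cf returns unit), and the "data_shred" column-family name plus the 16-byte key layout are illustrative rather than Blockstore's actual schema:

use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};

// Stage every shred under its (slot, index) key, then commit once: either the
// whole batch lands or none of it does. This is the property the patch
// restores by threading one WriteBatch through the shred insert path.
fn insert_shreds_atomically(
    db: &DB,
    shreds: &[(u64, u64, Vec<u8>)],
) -> Result<(), rocksdb::Error> {
    let cf = db.cf_handle("data_shred").expect("column family was opened");
    let mut batch = WriteBatch::default();
    for (slot, index, payload) in shreds {
        // Big-endian encoding makes byte-wise key order match numeric
        // (slot, index) order, which slot-scoped iteration depends on.
        let mut key = [0u8; 16];
        key[..8].copy_from_slice(&slot.to_be_bytes());
        key[8..].copy_from_slice(&index.to_be_bytes());
        batch.put_cf(cf, key, payload);
    }
    db.write(batch) // the single atomic commit point
}

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    let cfs = vec![ColumnFamilyDescriptor::new("data_shred", Options::default())];
    let db = DB::open_cf_descriptors(&opts, "/tmp/shred-batch-demo", cfs)?;
    insert_shreds_atomically(&db, &[(7, 0, vec![1, 2, 3]), (7, 1, vec![4, 5, 6])])?;
    Ok(())
}

The batch also makes the old write_all tmp-file-plus-rename dance unnecessary: crash consistency comes from the database's write-ahead log rather than filesystem rename semantics.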
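find_missing_indexes walks the ShredData column once, emitting every hole in [start_index, end_index) until max_missing is reached. A std-only model of that scan over a sorted set of present indexes; it omits the reference-tick deferral that skips holes younger than MAX_TURBINE_DELAY_IN_TICKS, and the function name here is illustrative:

use std::collections::BTreeSet;

// Given the set of shred indexes present for one slot, report up to
// `max_missing` holes in [start_index, end_index). The real code walks a raw
// RocksDB iterator over (slot, index) keys in exactly this order.
fn missing_indexes(
    present: &BTreeSet<u64>,
    start_index: u64,
    end_index: u64,
    max_missing: usize,
) -> Vec<u64> {
    // Same early exits as the patch: empty range or zero budget.
    if start_index >= end_index || max_missing == 0 {
        return vec![];
    }
    let mut missing = Vec::new();
    let mut prev = start_index;
    for &ix in present.range(start_index..) {
        // Everything between the previous present index and this one is a hole.
        let upper = ix.min(end_index);
        for hole in prev..upper {
            missing.push(hole);
            if missing.len() == max_missing {
                return missing;
            }
        }
        if ix >= end_index {
            return missing;
        }
        prev = ix + 1;
    }
    // Ran off the end of the present set; the tail of the range is missing.
    for hole in prev..end_index {
        missing.push(hole);
        if missing.len() == max_missing {
            break;
        }
    }
    missing
}

fn main() {
    let present: BTreeSet<u64> = [0, 1, 2, 5, 6, 9].iter().copied().collect();
    assert_eq!(missing_indexes(&present, 0, 10, 10), vec![3, 4, 7, 8]);
    // The max_missing cap bounds repair-request size, as in RepairService.
    assert_eq!(missing_indexes(&present, 0, 10, 1), vec![3]);
    println!("ok");
}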