LedgerCleanupService no longer causes an OOM and actually purges (bp #10199) (#10221)

automerge
mergify[bot] authored 2020-05-24 23:24:45 -07:00, committed by GitHub
parent 0b5d3df251
commit 82772f95a1
4 changed files with 146 additions and 228 deletions

View File

@@ -29,9 +29,8 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
 // and starve other blockstore users.
 pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
 
-// Remove a limited number of slots at a time, so the operation
-// does not take too long and block other blockstore users.
-pub const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;
+// Delay between purges to cooperate with other blockstore users
+pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500);
 
 pub struct LedgerCleanupService {
     t_cleanup: JoinHandle<()>,
@@ -62,6 +61,7 @@ impl LedgerCleanupService {
                     max_ledger_slots,
                     &mut last_purge_slot,
                     DEFAULT_PURGE_SLOT_INTERVAL,
+                    Some(DEFAULT_DELAY_BETWEEN_PURGES),
                 ) {
                     match e {
                         RecvTimeoutError::Disconnected => break,
@@ -77,8 +77,8 @@ impl LedgerCleanupService {
         blockstore: &Arc<Blockstore>,
         root: Slot,
         max_ledger_shreds: u64,
-    ) -> (u64, Slot, Slot, u64) {
-        let mut shreds = Vec::new();
+    ) -> (bool, Slot, Slot, u64) {
+        let mut total_slots = Vec::new();
         let mut iterate_time = Measure::start("iterate_time");
         let mut total_shreds = 0;
         let mut first_slot = 0;
@@ -89,33 +89,43 @@ impl LedgerCleanupService {
             }
             // Not exact since non-full slots will have holes
             total_shreds += meta.received;
-            shreds.push((slot, meta.received));
+            total_slots.push((slot, meta.received));
             if slot > root {
                 break;
             }
         }
         iterate_time.stop();
         info!(
-            "checking for ledger purge: max_shreds: {} slots: {} total_shreds: {} {}",
-            max_ledger_shreds,
-            shreds.len(),
+            "first_slot={} total_slots={} total_shreds={} max_ledger_shreds={}, {}",
+            first_slot,
+            total_slots.len(),
             total_shreds,
+            max_ledger_shreds,
             iterate_time
         );
         if (total_shreds as u64) < max_ledger_shreds {
-            return (0, 0, 0, total_shreds);
+            return (false, 0, 0, total_shreds);
         }
-        let mut cur_shreds = 0;
-        let mut lowest_slot_to_clean = shreds[0].0;
-        for (slot, num_shreds) in shreds.iter().rev() {
-            cur_shreds += *num_shreds as u64;
-            if cur_shreds > max_ledger_shreds {
-                lowest_slot_to_clean = *slot;
+        let mut num_shreds_to_clean = 0;
+        let mut lowest_cleanup_slot = total_slots[0].0;
+        for (slot, num_shreds) in total_slots.iter().rev() {
+            num_shreds_to_clean += *num_shreds as u64;
+            if num_shreds_to_clean > max_ledger_shreds {
+                lowest_cleanup_slot = *slot;
                 break;
             }
         }
-        (cur_shreds, lowest_slot_to_clean, first_slot, total_shreds)
+
+        (true, lowest_cleanup_slot, first_slot, total_shreds)
+    }
+
+    fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
+        let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
+        // Get the newest root
+        while let Ok(new_root) = new_root_receiver.try_recv() {
+            root = new_root;
+        }
+        Ok(root)
     }
 
     fn cleanup_ledger(
@@ -124,58 +134,63 @@ impl LedgerCleanupService {
         max_ledger_shreds: u64,
         last_purge_slot: &mut u64,
         purge_interval: u64,
+        delay_between_purges: Option<Duration>,
     ) -> Result<(), RecvTimeoutError> {
-        let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
-        // Get the newest root
-        while let Ok(new_root) = new_root_receiver.try_recv() {
-            root = new_root;
+        let root = Self::receive_new_roots(new_root_receiver)?;
+        if root - *last_purge_slot <= purge_interval {
+            return Ok(());
         }
 
-        if root - *last_purge_slot > purge_interval {
-            let disk_utilization_pre = blockstore.storage_size();
-            info!(
-                "purge: new root: {} last_purge: {} purge_interval: {} disk: {:?}",
-                root, last_purge_slot, purge_interval, disk_utilization_pre
-            );
-            *last_purge_slot = root;
-
-            let (num_shreds_to_clean, lowest_slot_to_clean, mut first_slot, total_shreds) =
-                Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);
-
-            if num_shreds_to_clean > 0 {
-                debug!(
-                    "cleaning up to: {} shreds: {} first: {}",
-                    lowest_slot_to_clean, num_shreds_to_clean, first_slot
-                );
-                loop {
-                    let current_lowest =
-                        std::cmp::min(lowest_slot_to_clean, first_slot + DEFAULT_PURGE_BATCH_SIZE);
-
-                    let mut slot_update_time = Measure::start("slot_update");
-                    *blockstore.lowest_cleanup_slot.write().unwrap() = current_lowest;
-                    slot_update_time.stop();
-
-                    let mut clean_time = Measure::start("ledger_clean");
-                    blockstore.purge_slots(first_slot, Some(current_lowest));
-                    clean_time.stop();
-
-                    debug!(
-                        "ledger purge {} -> {}: {} {}",
-                        first_slot, current_lowest, slot_update_time, clean_time
-                    );
-                    first_slot += DEFAULT_PURGE_BATCH_SIZE;
-                    if current_lowest == lowest_slot_to_clean {
-                        break;
-                    }
-                    thread::sleep(Duration::from_millis(500));
-                }
-            }
-
-            let disk_utilization_post = blockstore.storage_size();
-            Self::report_disk_metrics(disk_utilization_pre, disk_utilization_post, total_shreds);
-        }
+        let disk_utilization_pre = blockstore.storage_size();
+        info!(
+            "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}",
+            root, last_purge_slot, purge_interval, disk_utilization_pre
+        );
+        *last_purge_slot = root;
+
+        let (slots_to_clean, lowest_cleanup_slot, first_slot, total_shreds) =
+            Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds);
+
+        if slots_to_clean {
+            info!(
+                "purging data from slots {} to {}",
+                first_slot, lowest_cleanup_slot
+            );
+
+            let purge_complete = Arc::new(AtomicBool::new(false));
+            let blockstore = blockstore.clone();
+            let purge_complete1 = purge_complete.clone();
+            let _t_purge = Builder::new()
+                .name("solana-ledger-purge".to_string())
+                .spawn(move || {
+                    let mut slot_update_time = Measure::start("slot_update");
+                    *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
+                    slot_update_time.stop();
+
+                    let mut purge_time = Measure::start("purge_slots_with_delay");
+                    blockstore.purge_slots_with_delay(
+                        first_slot,
+                        lowest_cleanup_slot,
+                        delay_between_purges,
+                    );
+                    purge_time.stop();
+                    info!("{}", purge_time);
+                    purge_complete1.store(true, Ordering::Relaxed);
+                })
+                .unwrap();
+
+            // Keep pulling roots off `new_root_receiver` while purging to avoid channel buildup
+            while !purge_complete.load(Ordering::Relaxed) {
+                if let Err(err) = Self::receive_new_roots(new_root_receiver) {
+                    debug!("receive_new_roots: {}", err);
+                }
+                thread::sleep(Duration::from_secs(1));
+            }
+
+            let disk_utilization_post = blockstore.storage_size();
+            Self::report_disk_metrics(disk_utilization_pre, disk_utilization_post, total_shreds);
+        }
 
         Ok(())
     }
@@ -219,8 +234,15 @@ mod tests {
         //send a signal to kill all but 5 shreds, which will be in the newest slots
         let mut last_purge_slot = 0;
         sender.send(50).unwrap();
-        LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 5, &mut last_purge_slot, 10)
-            .unwrap();
+        LedgerCleanupService::cleanup_ledger(
+            &receiver,
+            &blockstore,
+            5,
+            &mut last_purge_slot,
+            10,
+            None,
+        )
+        .unwrap();
 
         //check that 0-40 don't exist
         blockstore
@@ -273,6 +295,7 @@ mod tests {
                 initial_slots,
                 &mut last_purge_slot,
                 10,
+                None,
             )
             .unwrap();
             time.stop();
@@ -315,6 +338,7 @@ mod tests {
                 max_ledger_shreds,
                 &mut next_purge_batch,
                 10,
+                None,
            )
            .unwrap();
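Taken together, the service-side changes restructure cleanup_ledger: the slow purge now runs on its own named thread while the calling thread keeps draining the root channel, whereas the old code purged inline and let queued roots pile up (the OOM in the title). The standalone sketch below illustrates only that pattern; it uses a plain std::sync::mpsc channel and a sleep in place of the real Blockstore work, the helper name receive_new_roots mirrors the patch, and everything else (thread names, slot numbers, timings) is illustrative.

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
    use std::sync::Arc;
    use std::thread::{self, Builder};
    use std::time::Duration;

    // Keep only the newest root; draining the channel is what prevents the backlog.
    fn receive_new_roots(receiver: &Receiver<u64>) -> Result<u64, RecvTimeoutError> {
        let mut root = receiver.recv_timeout(Duration::from_secs(1))?;
        while let Ok(newer) = receiver.try_recv() {
            root = newer;
        }
        Ok(root)
    }

    fn main() {
        let (sender, receiver) = channel();

        // Stand-in for the validator thread that keeps announcing new roots.
        let _producer = thread::spawn(move || {
            for slot in 0..500u64 {
                let _ = sender.send(slot);
                thread::sleep(Duration::from_millis(1));
            }
        });

        // The slow purge runs on a dedicated, named thread...
        let purge_complete = Arc::new(AtomicBool::new(false));
        let purge_complete1 = purge_complete.clone();
        let _t_purge = Builder::new()
            .name("purge-sketch".to_string())
            .spawn(move || {
                thread::sleep(Duration::from_millis(300)); // stand-in for the actual purge
                purge_complete1.store(true, Ordering::Relaxed);
            })
            .unwrap();

        // ...while this thread keeps pulling roots so the channel never grows without bound.
        while !purge_complete.load(Ordering::Relaxed) {
            match receive_new_roots(&receiver) {
                Ok(root) => println!("latest root while purging: {}", root),
                Err(err) => println!("receive_new_roots: {}", err),
            }
            thread::sleep(Duration::from_millis(100));
        }
    }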

View File

@@ -733,17 +733,6 @@ fn main() {
                 .arg(&account_paths_arg)
                 .arg(&halt_at_slot_arg)
                 .arg(&hard_forks_arg)
-        ).subcommand(
-            SubCommand::with_name("prune")
-            .about("Prune the ledger at the block height")
-            .arg(
-                Arg::with_name("slot_list")
-                    .long("slot-list")
-                    .value_name("FILENAME")
-                    .takes_value(true)
-                    .required(true)
-                    .help("The location of the YAML file with a list of rollback slot heights and hashes"),
-            )
         ).subcommand(
             SubCommand::with_name("purge")
             .about("Purge the ledger at the block height")
@@ -753,14 +742,14 @@ fn main() {
                     .value_name("SLOT")
                     .takes_value(true)
                     .required(true)
-                    .help("Start slot to purge from."),
+                    .help("Start slot to purge from (inclusive)"),
             )
             .arg(
                 Arg::with_name("end_slot")
                     .index(2)
                     .value_name("SLOT")
-                    .takes_value(true)
-                    .help("Optional ending slot to stop purging."),
+                    .required(true)
+                    .help("Ending slot to stop purging (inclusive)"),
             )
         )
         .subcommand(
@@ -1135,48 +1124,10 @@ fn main() {
         }
         ("purge", Some(arg_matches)) => {
             let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
-            let end_slot = value_t!(arg_matches, "end_slot", Slot);
-            let end_slot = end_slot.map_or(None, Some);
+            let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot);
             let blockstore = open_blockstore(&ledger_path);
             blockstore.purge_slots(start_slot, end_slot);
         }
-        ("prune", Some(arg_matches)) => {
-            if let Some(prune_file_path) = arg_matches.value_of("slot_list") {
-                let blockstore = open_blockstore(&ledger_path);
-                let prune_file = File::open(prune_file_path.to_string()).unwrap();
-                let slot_hashes: BTreeMap<u64, String> =
-                    serde_yaml::from_reader(prune_file).unwrap();
-
-                let iter =
-                    RootedSlotIterator::new(0, &blockstore).expect("Failed to get rooted slot");
-
-                let potential_hashes: Vec<_> = iter
-                    .filter_map(|(slot, _meta)| {
-                        let blockhash = blockstore
-                            .get_slot_entries(slot, 0)
-                            .unwrap()
-                            .last()
-                            .unwrap()
-                            .hash
-                            .to_string();
-
-                        slot_hashes.get(&slot).and_then(|hash| {
-                            if *hash == blockhash {
-                                Some((slot, blockhash))
-                            } else {
-                                None
-                            }
-                        })
-                    })
-                    .collect();
-
-                let (target_slot, target_hash) = potential_hashes
-                    .last()
-                    .expect("Failed to find a valid slot");
-                println!("Prune at slot {:?} hash {:?}", target_slot, target_hash);
-                blockstore.prune(*target_slot);
-            }
-        }
         ("list-roots", Some(arg_matches)) => {
             let blockstore = open_blockstore(&ledger_path);
             let max_height = if let Some(height) = arg_matches.value_of("max_height") {
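With the prune subcommand removed and end_slot now a required positional argument, a ledger-tool purge always names an explicit, inclusive slot range. Assuming the tool's usual --ledger option for the ledger directory, an invocation looks roughly like this (the path and slot numbers are illustrative):

    solana-ledger-tool --ledger <LEDGER_DIR> purge 100 200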

View File

@@ -299,47 +299,56 @@ impl Blockstore {
         false
     }
 
-    /// Silently deletes all blockstore column families starting at the given slot until the `to` slot
+    /// Silently deletes all blockstore column families in the range [from_slot,to_slot]
     /// Dangerous; Use with care:
     /// Does not check for integrity and does not update slot metas that refer to deleted slots
     /// Modifies multiple column families simultaneously
-    pub fn purge_slots(&self, mut from_slot: Slot, to_slot: Option<Slot>) {
+    pub fn purge_slots_with_delay(
+        &self,
+        from_slot: Slot,
+        to_slot: Slot,
+        delay_between_purges: Option<Duration>,
+    ) {
         // if there's no upper bound, split the purge request into batches of 1000 slots
         const PURGE_BATCH_SIZE: u64 = 1000;
-        let mut batch_end = to_slot.unwrap_or(from_slot + PURGE_BATCH_SIZE);
-        while from_slot < batch_end {
-            match self.run_purge(from_slot, batch_end) {
-                Ok(end) => {
-                    if !self.no_compaction {
-                        if let Err(e) = self.compact_storage(from_slot, batch_end) {
-                            // This error is not fatal and indicates an internal error
-                            error!(
-                                "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
-                                e, from_slot, batch_end
-                            );
-                        }
-                    }
-                    if end {
-                        break;
-                    } else {
-                        // update the next batch bounds
-                        from_slot = batch_end;
-                        batch_end = to_slot.unwrap_or(batch_end + PURGE_BATCH_SIZE);
+        let mut batch_start = from_slot;
+        while batch_start < to_slot {
+            let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
+            match self.run_purge(batch_start, batch_end) {
+                Ok(_all_columns_purged) => {
+                    batch_start = batch_end;
+
+                    if let Some(ref duration) = delay_between_purges {
+                        // Cooperate with other blockstore users
+                        std::thread::sleep(*duration);
                     }
                 }
                 Err(e) => {
                     error!(
                         "Error: {:?}; Purge failed in range {:?} to {:?}",
-                        e, from_slot, batch_end
+                        e, batch_start, batch_end
                     );
                     break;
                 }
             }
         }
+
+        if !self.no_compaction {
+            if let Err(e) = self.compact_storage(from_slot, to_slot) {
+                // This error is not fatal and indicates an internal error
+                error!(
+                    "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
+                    e, from_slot, to_slot
+                );
+            }
+        }
     }
 
-    // Returns whether or not all columns have been purged until their end
+    pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
+        self.purge_slots_with_delay(from_slot, to_slot, None)
+    }
+
+    // Returns whether or not all columns successfully purged the slot range
     fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
         let mut write_batch = self
             .db
@@ -347,6 +356,8 @@ impl Blockstore {
             .expect("Database Error: Failed to get write batch");
         // delete range cf is not inclusive
         let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
+
+        let mut delete_range_timer = Measure::start("delete_range");
         let mut columns_empty = self
             .db
             .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
@@ -403,6 +414,7 @@ impl Blockstore {
                 .delete_range_cf::<cf::AddressSignatures>(&mut write_batch, index, index + 1)
                 .unwrap_or(false);
         }
+        delete_range_timer.stop();
         let mut write_timer = Measure::start("write_batch");
         if let Err(e) = self.db.write(write_batch) {
             error!(
@@ -414,12 +426,17 @@ impl Blockstore {
         write_timer.stop();
         datapoint_info!(
             "blockstore-purge",
+            ("from_slot", from_slot as i64, i64),
+            ("to_slot", to_slot as i64, i64),
+            ("delete_range_us", delete_range_timer.as_us() as i64, i64),
             ("write_batch_us", write_timer.as_us() as i64, i64)
         );
         Ok(columns_empty)
     }
 
     pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
+        info!("compact_storage: from {} to {}", from_slot, to_slot);
+        let mut compact_timer = Measure::start("compact_range");
         let result = self
             .meta_cf
             .compact_range(from_slot, to_slot)
@@ -473,6 +490,14 @@ impl Blockstore {
             .rewards_cf
             .compact_range(from_slot, to_slot)
             .unwrap_or(false);
+        compact_timer.stop();
+        if !result {
+            info!("compact_storage incomplete");
+        }
+        datapoint_info!(
+            "blockstore-compact",
+            ("compact_range_us", compact_timer.as_us() as i64, i64),
+        );
         Ok(result)
     }
 
@@ -2137,39 +2162,6 @@ impl Blockstore {
         Ok(orphans_iter.map(|(slot, _)| slot))
     }
 
-    /// Prune blockstore such that slots higher than `target_slot` are deleted and all references to
-    /// higher slots are removed
-    pub fn prune(&self, target_slot: Slot) {
-        let mut meta = self
-            .meta(target_slot)
-            .expect("couldn't read slot meta")
-            .expect("no meta for target slot");
-        meta.next_slots.clear();
-        self.put_meta_bytes(
-            target_slot,
-            &bincode::serialize(&meta).expect("couldn't get meta bytes"),
-        )
-        .expect("unable to update meta for target slot");
-
-        self.purge_slots(target_slot + 1, None);
-
-        // fixup anything that refers to non-root slots and delete the rest
-        for (slot, mut meta) in self
-            .slot_meta_iterator(0)
-            .expect("unable to iterate over meta")
-        {
-            if slot > target_slot {
-                break;
-            }
-            meta.next_slots.retain(|slot| *slot <= target_slot);
-            self.put_meta_bytes(
-                slot,
-                &bincode::serialize(&meta).expect("couldn't update meta"),
-            )
-            .expect("couldn't update meta");
-        }
-    }
-
     pub fn last_root(&self) -> Slot {
         *self.last_root.read().unwrap()
     }
@@ -4808,42 +4800,6 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
-    #[test]
-    fn test_prune() {
-        let blockstore_path = get_tmp_ledger_path!();
-        let blockstore = Blockstore::open(&blockstore_path).unwrap();
-        let (shreds, _) = make_many_slot_entries(0, 50, 6);
-        let shreds_per_slot = shreds.len() as u64 / 50;
-        blockstore.insert_shreds(shreds, None, false).unwrap();
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|(_, meta)| assert_eq!(meta.last_index, shreds_per_slot - 1));
-
-        blockstore.prune(5);
-
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|(slot, meta)| {
-                assert!(slot <= 5);
-                assert_eq!(meta.last_index, shreds_per_slot - 1)
-            });
-
-        let data_iter = blockstore
-            .data_shred_cf
-            .iter(IteratorMode::From((0, 0), IteratorDirection::Forward))
-            .unwrap();
-        for ((slot, _), _) in data_iter {
-            if slot > 5 {
-                assert!(false);
-            }
-        }
-
-        drop(blockstore);
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     fn test_purge_slots() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -4851,11 +4807,11 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 50, 5);
         blockstore.insert_shreds(shreds, None, false).unwrap();
 
-        blockstore.purge_slots(0, Some(5));
+        blockstore.purge_slots(0, 5);
 
         test_all_empty_or_min(&blockstore, 6);
 
-        blockstore.purge_slots(0, None);
+        blockstore.purge_slots(0, 50);
 
        // min slot shouldn't matter, blockstore should be empty
        test_all_empty_or_min(&blockstore, 100);
@@ -4879,7 +4835,7 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 5000, 10);
         blockstore.insert_shreds(shreds, None, false).unwrap();
 
-        blockstore.purge_slots(0, Some(4999));
+        blockstore.purge_slots(0, 4999);
 
         test_all_empty_or_min(&blockstore, 5000);
 
@@ -4887,19 +4843,6 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
-    #[should_panic]
-    #[test]
-    fn test_prune_out_of_bounds() {
-        let blockstore_path = get_tmp_ledger_path!();
-        let blockstore = Blockstore::open(&blockstore_path).unwrap();
-
-        // slot 5 does not exist, prune should panic
-        blockstore.prune(5);
-
-        drop(blockstore);
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     fn test_iter_bounds() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -6400,14 +6343,14 @@ pub mod tests {
                 .insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
 
             // Test inserting just the codes, enough for recovery
             blockstore
                 .insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
 
             // Test inserting some codes, but not enough for recovery
             blockstore
@@ -6418,7 +6361,7 @@ pub mod tests {
                 )
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
 
             // Test inserting just the codes, and some data, enough for recovery
             let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
@@ -6430,7 +6373,7 @@ pub mod tests {
                 .insert_shreds(shreds, Some(&leader_schedule_cache), false)
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
 
             // Test inserting some codes, and some data, but enough for recovery
             let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6442,7 +6385,7 @@ pub mod tests {
                 .insert_shreds(shreds, Some(&leader_schedule_cache), false)
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
 
             // Test inserting all shreds in 2 rounds, make sure nothing is lost
             let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6462,7 +6405,7 @@ pub mod tests {
                 .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
 
             // Test not all, but enough data and coding shreds in 2 rounds to trigger recovery,
             // make sure nothing is lost
@@ -6487,7 +6430,7 @@ pub mod tests {
                 .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
 
             // Test insert shreds in 2 rounds, but not enough to trigger
             // recovery, make sure nothing is lost
@@ -6512,7 +6455,7 @@ pub mod tests {
                 .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
                 .unwrap();
             verify_index_integrity(&blockstore, slot);
-            blockstore.purge_slots(0, Some(slot));
+            blockstore.purge_slots(0, slot);
         }
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");

View File

@@ -15,7 +15,7 @@ fn test_multiple_threads_insert_shred() {
     for _ in 0..100 {
         let num_threads = 10;
 
-        // Create `num_threads` different ticks in slots 1..num_therads + 1, all
+        // Create `num_threads` different ticks in slots 1..num_threads + 1, all
         // with parent = slot 0
         let threads: Vec<_> = (0..num_threads)
             .map(|i| {
@@ -42,7 +42,7 @@ fn test_multiple_threads_insert_shred() {
         assert_eq!(meta0.next_slots, expected_next_slots);
 
         // Delete slots for next iteration
-        blockstore.purge_slots(0, None);
+        blockstore.purge_slots(0, num_threads + 1);
     }
 
     // Cleanup