From 82772f95a1a26eb91c58cfffb4b447cc2ddeeae3 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Sun, 24 May 2020 23:24:45 -0700
Subject: [PATCH] LedgerCleanupService no longer causes an OOM and actually purges (bp #10199) (#10221)

automerge
---
 core/src/ledger_cleanup_service.rs | 138 +++++++++++++----------
 ledger-tool/src/main.rs            |  57 +---------
 ledger/src/blockstore.rs           | 175 ++++++++++-------------------
 ledger/tests/blockstore.rs         |   4 +-
 4 files changed, 146 insertions(+), 228 deletions(-)

diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index f5a1d05f16..58880fa7dc 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -29,9 +29,8 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
 // and starve other blockstore users.
 pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
 
-// Remove a limited number of slots at a time, so the operation
-// does not take too long and block other blockstore users.
-pub const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;
+// Delay between purges to cooperate with other blockstore users
+pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500);
 
 pub struct LedgerCleanupService {
     t_cleanup: JoinHandle<()>,
@@ -62,6 +61,7 @@ impl LedgerCleanupService {
                     max_ledger_slots,
                     &mut last_purge_slot,
                     DEFAULT_PURGE_SLOT_INTERVAL,
+                    Some(DEFAULT_DELAY_BETWEEN_PURGES),
                 ) {
                     match e {
                         RecvTimeoutError::Disconnected => break,
@@ -77,8 +77,8 @@ impl LedgerCleanupService {
         blockstore: &Arc<Blockstore>,
         root: Slot,
         max_ledger_shreds: u64,
-    ) -> (u64, Slot, Slot, u64) {
-        let mut shreds = Vec::new();
+    ) -> (bool, Slot, Slot, u64) {
+        let mut total_slots = Vec::new();
         let mut iterate_time = Measure::start("iterate_time");
         let mut total_shreds = 0;
         let mut first_slot = 0;
@@ -89,33 +89,43 @@ impl LedgerCleanupService {
             }
             // Not exact since non-full slots will have holes
             total_shreds += meta.received;
-            shreds.push((slot, meta.received));
+            total_slots.push((slot, meta.received));
             if slot > root {
                 break;
             }
         }
         iterate_time.stop();
         info!(
-            "checking for ledger purge: max_shreds: {} slots: {} total_shreds: {} {}",
-            max_ledger_shreds,
-            shreds.len(),
+            "first_slot={} total_slots={} total_shreds={} max_ledger_shreds={}, {}",
+            first_slot,
+            total_slots.len(),
             total_shreds,
+            max_ledger_shreds,
             iterate_time
         );
         if (total_shreds as u64) < max_ledger_shreds {
-            return (0, 0, 0, total_shreds);
+            return (false, 0, 0, total_shreds);
         }
-        let mut cur_shreds = 0;
-        let mut lowest_slot_to_clean = shreds[0].0;
-        for (slot, num_shreds) in shreds.iter().rev() {
-            cur_shreds += *num_shreds as u64;
-            if cur_shreds > max_ledger_shreds {
-                lowest_slot_to_clean = *slot;
+        let mut num_shreds_to_clean = 0;
+        let mut lowest_cleanup_slot = total_slots[0].0;
+        for (slot, num_shreds) in total_slots.iter().rev() {
+            num_shreds_to_clean += *num_shreds as u64;
+            if num_shreds_to_clean > max_ledger_shreds {
+                lowest_cleanup_slot = *slot;
                 break;
             }
         }
-        (cur_shreds, lowest_slot_to_clean, first_slot, total_shreds)
+        (true, lowest_cleanup_slot, first_slot, total_shreds)
+    }
+
+    fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
+        let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
+        // Get the newest root
+        while let Ok(new_root) = new_root_receiver.try_recv() {
+            root = new_root;
+        }
+        Ok(root)
     }
 
     fn cleanup_ledger(
@@ -124,58 +134,63 @@ impl LedgerCleanupService {
         max_ledger_shreds: u64,
         last_purge_slot: &mut u64,
         purge_interval: u64,
+        delay_between_purges: Option<Duration>,
     ) -> Result<(), RecvTimeoutError> {
-        let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
-        // Get the newest root
-        while let Ok(new_root) = new_root_receiver.try_recv() {
-            root = new_root;
+        let root = Self::receive_new_roots(new_root_receiver)?;
+        if root - *last_purge_slot <= purge_interval {
+            return Ok(());
         }
 
-        if root - *last_purge_slot > purge_interval {
-            let disk_utilization_pre = blockstore.storage_size();
+        let disk_utilization_pre = blockstore.storage_size();
+        info!(
+            "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}",
+            root, last_purge_slot, purge_interval, disk_utilization_pre
+        );
+        *last_purge_slot = root;
+
+        let (slots_to_clean, lowest_cleanup_slot, first_slot, total_shreds) =
+            Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds);
+
+        if slots_to_clean {
             info!(
-                "purge: new root: {} last_purge: {} purge_interval: {} disk: {:?}",
-                root, last_purge_slot, purge_interval, disk_utilization_pre
+                "purging data from slots {} to {}",
+                first_slot, lowest_cleanup_slot
             );
-            *last_purge_slot = root;
-
-            let (num_shreds_to_clean, lowest_slot_to_clean, mut first_slot, total_shreds) =
-                Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);
-
-            if num_shreds_to_clean > 0 {
-                debug!(
-                    "cleaning up to: {} shreds: {} first: {}",
-                    lowest_slot_to_clean, num_shreds_to_clean, first_slot
-                );
-                loop {
-                    let current_lowest =
-                        std::cmp::min(lowest_slot_to_clean, first_slot + DEFAULT_PURGE_BATCH_SIZE);
+            let purge_complete = Arc::new(AtomicBool::new(false));
+            let blockstore = blockstore.clone();
+            let purge_complete1 = purge_complete.clone();
+            let _t_purge = Builder::new()
+                .name("solana-ledger-purge".to_string())
+                .spawn(move || {
                     let mut slot_update_time = Measure::start("slot_update");
-                    *blockstore.lowest_cleanup_slot.write().unwrap() = current_lowest;
+                    *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
                     slot_update_time.stop();
 
-                    let mut clean_time = Measure::start("ledger_clean");
-                    blockstore.purge_slots(first_slot, Some(current_lowest));
-                    clean_time.stop();
-
-                    debug!(
-                        "ledger purge {} -> {}: {} {}",
-                        first_slot, current_lowest, slot_update_time, clean_time
+                    let mut purge_time = Measure::start("purge_slots_with_delay");
+                    blockstore.purge_slots_with_delay(
+                        first_slot,
+                        lowest_cleanup_slot,
+                        delay_between_purges,
                     );
-                    first_slot += DEFAULT_PURGE_BATCH_SIZE;
-                    if current_lowest == lowest_slot_to_clean {
-                        break;
-                    }
-                    thread::sleep(Duration::from_millis(500));
+                    purge_time.stop();
+                    info!("{}", purge_time);
+                    purge_complete1.store(true, Ordering::Relaxed);
+                })
+                .unwrap();
+
+            // Keep pulling roots off `new_root_receiver` while purging to avoid channel buildup
+            while !purge_complete.load(Ordering::Relaxed) {
+                if let Err(err) = Self::receive_new_roots(new_root_receiver) {
+                    debug!("receive_new_roots: {}", err);
                 }
+                thread::sleep(Duration::from_secs(1));
             }
-
-            let disk_utilization_post = blockstore.storage_size();
-
-            Self::report_disk_metrics(disk_utilization_pre, disk_utilization_post, total_shreds);
         }
 
+        let disk_utilization_post = blockstore.storage_size();
+        Self::report_disk_metrics(disk_utilization_pre, disk_utilization_post, total_shreds);
+
         Ok(())
     }
 
@@ -219,8 +234,15 @@ mod tests {
         //send a signal to kill all but 5 shreds, which will be in the newest slots
         let mut last_purge_slot = 0;
         sender.send(50).unwrap();
-        LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 5, &mut last_purge_slot, 10)
-            .unwrap();
+        LedgerCleanupService::cleanup_ledger(
+            &receiver,
+            &blockstore,
+            5,
+            &mut last_purge_slot,
+            10,
+            None,
+        )
+        .unwrap();
 
         //check that 0-40 don't exist
         blockstore
@@ -273,6 +295,7 @@ mod tests {
             initial_slots,
             &mut last_purge_slot,
             10,
+            None,
         )
         .unwrap();
         time.stop();
@@ -315,6 +338,7 @@ mod tests {
             max_ledger_shreds,
             &mut next_purge_batch,
             10,
+            None,
        )
        .unwrap();
 
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 686ca2a80b..52e256b949 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -733,17 +733,6 @@ fn main() {
             .arg(&account_paths_arg)
             .arg(&halt_at_slot_arg)
             .arg(&hard_forks_arg)
-        ).subcommand(
-            SubCommand::with_name("prune")
-            .about("Prune the ledger at the block height")
-            .arg(
-                Arg::with_name("slot_list")
-                    .long("slot-list")
-                    .value_name("FILENAME")
-                    .takes_value(true)
-                    .required(true)
-                    .help("The location of the YAML file with a list of rollback slot heights and hashes"),
-            )
         ).subcommand(
             SubCommand::with_name("purge")
             .about("Purge the ledger at the block height")
@@ -753,14 +742,14 @@ fn main() {
                     .value_name("SLOT")
                     .takes_value(true)
                     .required(true)
-                    .help("Start slot to purge from."),
+                    .help("Start slot to purge from (inclusive)"),
            )
            .arg(
                Arg::with_name("end_slot")
                    .index(2)
                    .value_name("SLOT")
-                    .takes_value(true)
-                    .help("Optional ending slot to stop purging."),
+                    .required(true)
+                    .help("Ending slot to stop purging (inclusive)"),
            )
         )
         .subcommand(
@@ -1135,48 +1124,10 @@ fn main() {
         }
         ("purge", Some(arg_matches)) => {
             let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
-            let end_slot = value_t!(arg_matches, "end_slot", Slot);
-            let end_slot = end_slot.map_or(None, Some);
+            let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot);
             let blockstore = open_blockstore(&ledger_path);
             blockstore.purge_slots(start_slot, end_slot);
         }
-        ("prune", Some(arg_matches)) => {
-            if let Some(prune_file_path) = arg_matches.value_of("slot_list") {
-                let blockstore = open_blockstore(&ledger_path);
-                let prune_file = File::open(prune_file_path.to_string()).unwrap();
-                let slot_hashes: BTreeMap<Slot, String> =
-                    serde_yaml::from_reader(prune_file).unwrap();
-
-                let iter =
-                    RootedSlotIterator::new(0, &blockstore).expect("Failed to get rooted slot");
-
-                let potential_hashes: Vec<_> = iter
-                    .filter_map(|(slot, _meta)| {
-                        let blockhash = blockstore
-                            .get_slot_entries(slot, 0)
-                            .unwrap()
-                            .last()
-                            .unwrap()
-                            .hash
-                            .to_string();
-
-                        slot_hashes.get(&slot).and_then(|hash| {
-                            if *hash == blockhash {
-                                Some((slot, blockhash))
-                            } else {
-                                None
-                            }
-                        })
-                    })
-                    .collect();
-
-                let (target_slot, target_hash) = potential_hashes
-                    .last()
-                    .expect("Failed to find a valid slot");
-                println!("Prune at slot {:?} hash {:?}", target_slot, target_hash);
-                blockstore.prune(*target_slot);
-            }
-        }
         ("list-roots", Some(arg_matches)) => {
             let blockstore = open_blockstore(&ledger_path);
             let max_height = if let Some(height) = arg_matches.value_of("max_height") {
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 6298646661..903840fdfd 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -299,47 +299,56 @@ impl Blockstore {
         false
     }
 
-    /// Silently deletes all blockstore column families starting at the given slot until the `to` slot
+    /// Silently deletes all blockstore column families in the range [from_slot,to_slot]
     /// Dangerous; Use with care:
     /// Does not check for integrity and does not update slot metas that refer to deleted slots
     /// Modifies multiple column families simultaneously
-    pub fn purge_slots(&self, mut from_slot: Slot, to_slot: Option<Slot>) {
+    pub fn purge_slots_with_delay(
+        &self,
+        from_slot: Slot,
+        to_slot: Slot,
+        delay_between_purges: Option<Duration>,
+    ) {
         // if there's no upper bound, split the purge request into batches of 1000 slots
         const PURGE_BATCH_SIZE: u64 = 1000;
-        let mut batch_end = to_slot.unwrap_or(from_slot + PURGE_BATCH_SIZE);
-        while from_slot < batch_end {
-            match self.run_purge(from_slot, batch_end) {
-                Ok(end) => {
-                    if !self.no_compaction {
-                        if let Err(e) = self.compact_storage(from_slot, batch_end) {
-                            // This error is not fatal and indicates an internal error
-                            error!(
-                                "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
-                                e, from_slot, batch_end
-                            );
-                        }
-                    }
+        let mut batch_start = from_slot;
+        while batch_start < to_slot {
+            let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
+            match self.run_purge(batch_start, batch_end) {
+                Ok(_all_columns_purged) => {
+                    batch_start = batch_end;
 
-                    if end {
-                        break;
-                    } else {
-                        // update the next batch bounds
-                        from_slot = batch_end;
-                        batch_end = to_slot.unwrap_or(batch_end + PURGE_BATCH_SIZE);
+                    if let Some(ref duration) = delay_between_purges {
+                        // Cooperate with other blockstore users
+                        std::thread::sleep(*duration);
                     }
                 }
                 Err(e) => {
                     error!(
                         "Error: {:?}; Purge failed in range {:?} to {:?}",
-                        e, from_slot, batch_end
+                        e, batch_start, batch_end
                     );
                     break;
                 }
             }
         }
+
+        if !self.no_compaction {
+            if let Err(e) = self.compact_storage(from_slot, to_slot) {
+                // This error is not fatal and indicates an internal error
+                error!(
+                    "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
+                    e, from_slot, to_slot
+                );
+            }
+        }
     }
 
-    // Returns whether or not all columns have been purged until their end
+    pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
+        self.purge_slots_with_delay(from_slot, to_slot, None)
+    }
+
+    // Returns whether or not all columns successfully purged the slot range
     fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
         let mut write_batch = self
             .db
@@ -347,6 +356,8 @@
             .expect("Database Error: Failed to get write batch");
         // delete range cf is not inclusive
         let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
+
+        let mut delete_range_timer = Measure::start("delete_range");
         let mut columns_empty = self
             .db
             .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
             .unwrap_or(false)
@@ -403,6 +414,7 @@
                 .delete_range_cf::<cf::TransactionStatus>(&mut write_batch, index, index + 1)
                 .unwrap_or(false);
         }
+        delete_range_timer.stop();
         let mut write_timer = Measure::start("write_batch");
         if let Err(e) = self.db.write(write_batch) {
             error!(
@@ -414,12 +426,17 @@
         write_timer.stop();
         datapoint_info!(
             "blockstore-purge",
+            ("from_slot", from_slot as i64, i64),
+            ("to_slot", to_slot as i64, i64),
+            ("delete_range_us", delete_range_timer.as_us() as i64, i64),
             ("write_batch_us", write_timer.as_us() as i64, i64)
         );
         Ok(columns_empty)
     }
 
     pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
+        info!("compact_storage: from {} to {}", from_slot, to_slot);
+        let mut compact_timer = Measure::start("compact_range");
         let result = self
             .meta_cf
             .compact_range(from_slot, to_slot)
             .unwrap_or(false)
@@ -473,6 +490,14 @@
             .rewards_cf
             .compact_range(from_slot, to_slot)
             .unwrap_or(false);
+        compact_timer.stop();
+        if !result {
+            info!("compact_storage incomplete");
+        }
+        datapoint_info!(
+            "blockstore-compact",
+            ("compact_range_us", compact_timer.as_us() as i64, i64),
+        );
         Ok(result)
     }
 
@@ -2137,39 +2162,6 @@ impl Blockstore {
         Ok(orphans_iter.map(|(slot, _)| slot))
     }
 
-    /// Prune blockstore such that slots higher than `target_slot` are deleted and all references to
-    /// higher slots are removed
-    pub fn prune(&self, target_slot: Slot) {
-        let mut meta = self
-            .meta(target_slot)
-            .expect("couldn't read slot meta")
-            .expect("no meta for target slot");
-        meta.next_slots.clear();
-        self.put_meta_bytes(
-            target_slot,
-            &bincode::serialize(&meta).expect("couldn't get meta bytes"),
-        )
-        .expect("unable to update meta for target slot");
-
-        self.purge_slots(target_slot + 1, None);
-
-        // fixup anything that refers to non-root slots and delete the rest
-        for (slot, mut meta) in self
-            .slot_meta_iterator(0)
-            .expect("unable to iterate over meta")
-        {
-            if slot > target_slot {
-                break;
-            }
-            meta.next_slots.retain(|slot| *slot <= target_slot);
-            self.put_meta_bytes(
-                slot,
-                &bincode::serialize(&meta).expect("couldn't update meta"),
-            )
-            .expect("couldn't update meta");
-        }
-    }
-
     pub fn last_root(&self) -> Slot {
         *self.last_root.read().unwrap()
     }
@@ -4808,42 +4800,6 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
-    #[test]
-    fn test_prune() {
-        let blockstore_path = get_tmp_ledger_path!();
-        let blockstore = Blockstore::open(&blockstore_path).unwrap();
-        let (shreds, _) = make_many_slot_entries(0, 50, 6);
-        let shreds_per_slot = shreds.len() as u64 / 50;
-        blockstore.insert_shreds(shreds, None, false).unwrap();
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|(_, meta)| assert_eq!(meta.last_index, shreds_per_slot - 1));
-
-        blockstore.prune(5);
-
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|(slot, meta)| {
-                assert!(slot <= 5);
-                assert_eq!(meta.last_index, shreds_per_slot - 1)
-            });
-
-        let data_iter = blockstore
-            .data_shred_cf
-            .iter(IteratorMode::From((0, 0), IteratorDirection::Forward))
-            .unwrap();
-        for ((slot, _), _) in data_iter {
-            if slot > 5 {
-                assert!(false);
-            }
-        }
-
-        drop(blockstore);
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     fn test_purge_slots() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -4851,11 +4807,11 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 50, 5);
         blockstore.insert_shreds(shreds, None, false).unwrap();
 
-        blockstore.purge_slots(0, Some(5));
+        blockstore.purge_slots(0, 5);
 
         test_all_empty_or_min(&blockstore, 6);
 
-        blockstore.purge_slots(0, None);
+        blockstore.purge_slots(0, 50);
 
         // min slot shouldn't matter, blockstore should be empty
         test_all_empty_or_min(&blockstore, 100);
@@ -4879,7 +4835,7 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 5000, 10);
         blockstore.insert_shreds(shreds, None, false).unwrap();
 
-        blockstore.purge_slots(0, Some(4999));
+        blockstore.purge_slots(0, 4999);
 
         test_all_empty_or_min(&blockstore, 5000);
 
@@ -4887,19 +4843,6 @@ pub mod tests {
         drop(blockstore);
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
-    #[should_panic]
-    #[test]
-    fn test_prune_out_of_bounds() {
-        let blockstore_path = get_tmp_ledger_path!();
-        let blockstore = Blockstore::open(&blockstore_path).unwrap();
-
-        // slot 5 does not exist, prune should panic
-        blockstore.prune(5);
-
-        drop(blockstore);
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     fn test_iter_bounds() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -6400,14 +6343,14 @@ pub mod tests {
             .insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting just the codes, enough for recovery
         blockstore
             .insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting some codes, but not enough for recovery
         blockstore
@@ -6418,7 +6361,7 @@ pub mod tests {
             )
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting just the codes, and some data, enough for recovery
         let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
@@ -6430,7 +6373,7 @@ pub mod tests {
             .insert_shreds(shreds, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting some codes, and some data, but enough for recovery
         let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6442,7 +6385,7 @@ pub mod tests {
             .insert_shreds(shreds, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting all shreds in 2 rounds, make sure nothing is lost
         let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6462,7 +6405,7 @@ pub mod tests {
             .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test not all, but enough data and coding shreds in 2 rounds to trigger recovery,
         // make sure nothing is lost
@@ -6487,7 +6430,7 @@ pub mod tests {
             .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test insert shreds in 2 rounds, but not enough to trigger
         // recovery, make sure nothing is lost
@@ -6512,7 +6455,7 @@ pub mod tests {
             .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
         }
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
diff --git a/ledger/tests/blockstore.rs b/ledger/tests/blockstore.rs
index 04588a027e..4fa73002a6 100644
--- a/ledger/tests/blockstore.rs
+++ b/ledger/tests/blockstore.rs
@@ -15,7 +15,7 @@ fn test_multiple_threads_insert_shred() {
     for _ in 0..100 {
         let num_threads = 10;
 
-        // Create `num_threads` different ticks in slots 1..num_therads + 1, all
+        // Create `num_threads` different ticks in slots 1..num_threads + 1, all
         // with parent = slot 0
         let threads: Vec<_> = (0..num_threads)
            .map(|i| {
@@ -42,7 +42,7 @@ fn test_multiple_threads_insert_shred() {
         assert_eq!(meta0.next_slots, expected_next_slots);
 
         // Delete slots for next iteration
-        blockstore.purge_slots(0, None);
+        blockstore.purge_slots(0, num_threads + 1);
     }
 
     // Cleanup
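
The OOM named in the subject came from root notifications piling up in `new_root_receiver` while a long purge blocked the cleanup loop. The patch addresses that by moving the purge onto a background thread and keeping the channel drained until the thread signals completion through an AtomicBool. Below is a minimal, self-contained sketch of that pattern using only std; the names mirror the patch, but the worker body is a stand-in rather than the real Blockstore purge.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{sleep, Builder};
use std::time::Duration;

type Slot = u64;

// Drain the channel down to the newest root, as receive_new_roots() does in the patch.
fn receive_new_roots(receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
    let mut root = receiver.recv_timeout(Duration::from_secs(1))?;
    while let Ok(newer) = receiver.try_recv() {
        root = newer;
    }
    Ok(root)
}

fn main() {
    let (sender, receiver) = channel();
    let purge_complete = Arc::new(AtomicBool::new(false));
    let flag = purge_complete.clone();

    // Stand-in for the purge thread; the real service calls purge_slots_with_delay() here.
    let _worker = Builder::new()
        .name("purge-worker".to_string())
        .spawn(move || {
            sleep(Duration::from_millis(200)); // pretend to purge
            flag.store(true, Ordering::Relaxed);
        })
        .unwrap();

    // Producer side, standing in for the validator's root notifications.
    for slot in 0..100u64 {
        sender.send(slot).unwrap();
    }

    // Keep the channel drained until the worker finishes, so roots never pile up.
    while !purge_complete.load(Ordering::Relaxed) {
        if let Err(err) = receive_new_roots(&receiver) {
            eprintln!("receive_new_roots: {}", err);
        }
        sleep(Duration::from_millis(50));
    }
}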
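find_slots_to_clean() now returns a simple yes/no plus a cutoff slot: it sums shred counts per slot, and if the total exceeds the budget it walks the slots from newest to oldest, accumulating shreds until the budget is spent; the slot it stops on becomes lowest_cleanup_slot, and everything from the first slot up to it is purged. A small sketch of just that selection logic, with illustrative names and a toy data set:

type Slot = u64;

/// Given (slot, shred_count) pairs in ascending slot order and a shred budget,
/// return the highest slot that should be purged so the remaining ledger fits
/// the budget, or None if the ledger is already within the budget.
fn lowest_cleanup_slot(slots: &[(Slot, u64)], max_ledger_shreds: u64) -> Option<Slot> {
    let total: u64 = slots.iter().map(|(_, n)| *n).sum();
    if total < max_ledger_shreds {
        return None;
    }
    // Walk from the newest slot backwards, keeping shreds until the budget is
    // exhausted; everything at or below the slot we stop on gets purged.
    let mut kept: u64 = 0;
    for (slot, num_shreds) in slots.iter().rev() {
        kept += *num_shreds;
        if kept > max_ledger_shreds {
            return Some(*slot);
        }
    }
    Some(slots[0].0)
}

fn main() {
    let slots: Vec<(Slot, u64)> = (0..10).map(|s| (s, 100)).collect();
    // Budget of 250 shreds: slots 8 and 9 (200 shreds) fit, slot 7 tips the
    // count over the budget, so slots 0 through 7 are purged.
    assert_eq!(lowest_cleanup_slot(&slots, 250), Some(7));
    assert_eq!(lowest_cleanup_slot(&slots, 10_000), None);
    println!("ok");
}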
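purge_slots_with_delay() replaces the old DEFAULT_PURGE_BATCH_SIZE loop: it still deletes in fixed batches of 1000 slots, but the caller chooses an optional delay between batches (the service passes DEFAULT_DELAY_BETWEEN_PURGES = 500ms), and compaction now runs once over the whole range instead of after every batch. A sketch of that control flow, with the storage operations abstracted as closures since the real calls go through the Blockstore/RocksDB API:

use std::thread;
use std::time::Duration;

type Slot = u64;

const PURGE_BATCH_SIZE: u64 = 1000;

/// Delete [from_slot, to_slot] in fixed-size batches, optionally sleeping
/// between batches so other readers of the store are not starved, then
/// compact once over the whole range at the end.
fn purge_slots_with_delay(
    from_slot: Slot,
    to_slot: Slot,
    delay_between_purges: Option<Duration>,
    purge_batch: impl Fn(Slot, Slot),
    compact: impl Fn(Slot, Slot),
) {
    let mut batch_start = from_slot;
    while batch_start < to_slot {
        let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
        purge_batch(batch_start, batch_end);
        batch_start = batch_end;
        if let Some(delay) = delay_between_purges {
            thread::sleep(delay); // cooperate with other blockstore users
        }
    }
    // One compaction pass over the whole purged range, not one per batch.
    compact(from_slot, to_slot);
}

fn main() {
    purge_slots_with_delay(
        0,
        2500,
        Some(Duration::from_millis(1)),
        |s, e| println!("purge batch {}..{}", s, e),
        |s, e| println!("compact {}..{}", s, e),
    );
}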