LedgerCleanupService no longer causes an OOM and actually purges (#10199)

* cleanup_ledger() now services new_root_receiver while purging
* purge_slots() now fully deletes before compacting
* Add ledger pruning Grafana graph
Michael Vines
2020-05-24 21:41:54 -07:00
committed by GitHub
parent 8a8384e674
commit 156387aba4
6 changed files with 374 additions and 260 deletions
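The first bullet is the heart of the OOM fix: while a multi-thousand-slot purge ran, cleanup_ledger() stopped draining new_root_receiver, so root notifications queued up without bound. A minimal sketch of the idea, assuming a std::sync::mpsc channel (the helper name and exact types are illustrative, not code from this commit):

    use std::sync::mpsc::Receiver;

    type Slot = u64;

    /// Drain every root notification currently queued and keep only the
    /// newest, so the channel can no longer grow while a purge is running.
    fn latest_root(new_root_receiver: &Receiver<Slot>) -> Option<Slot> {
        // try_iter() consumes whatever is buffered without blocking
        new_root_receiver.try_iter().last()
    }

The second bullet shows up in the first hunk below: purge_slots() now runs all of its 1000-slot delete batches to completion and compacts the whole [from_slot, to_slot] range once at the end, instead of compacting after every batch.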

@@ -301,47 +301,56 @@ impl Blockstore {
         false
     }
 
-    /// Silently deletes all blockstore column families starting at the given slot until the `to` slot
+    /// Silently deletes all blockstore column families in the range [from_slot,to_slot]
     /// Dangerous; Use with care:
     /// Does not check for integrity and does not update slot metas that refer to deleted slots
     /// Modifies multiple column families simultaneously
-    pub fn purge_slots(&self, mut from_slot: Slot, to_slot: Option<Slot>) {
+    pub fn purge_slots_with_delay(
+        &self,
+        from_slot: Slot,
+        to_slot: Slot,
+        delay_between_purges: Option<Duration>,
+    ) {
         // if there's no upper bound, split the purge request into batches of 1000 slots
         const PURGE_BATCH_SIZE: u64 = 1000;
-        let mut batch_end = to_slot.unwrap_or(from_slot + PURGE_BATCH_SIZE);
-        while from_slot < batch_end {
-            match self.run_purge(from_slot, batch_end) {
-                Ok(end) => {
-                    if !self.no_compaction {
-                        if let Err(e) = self.compact_storage(from_slot, batch_end) {
-                            // This error is not fatal and indicates an internal error
-                            error!(
-                                "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
-                                e, from_slot, batch_end
-                            );
-                        }
-                    }
+        let mut batch_start = from_slot;
+        while batch_start < to_slot {
+            let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
+            match self.run_purge(batch_start, batch_end) {
+                Ok(_all_columns_purged) => {
+                    batch_start = batch_end;
-                    if end {
-                        break;
-                    } else {
-                        // update the next batch bounds
-                        from_slot = batch_end;
-                        batch_end = to_slot.unwrap_or(batch_end + PURGE_BATCH_SIZE);
+                    if let Some(ref duration) = delay_between_purges {
+                        // Cooperate with other blockstore users
+                        std::thread::sleep(*duration);
                     }
                 }
                 Err(e) => {
                     error!(
                         "Error: {:?}; Purge failed in range {:?} to {:?}",
-                        e, from_slot, batch_end
+                        e, batch_start, batch_end
                     );
                     break;
                 }
            }
        }
+        if !self.no_compaction {
+            if let Err(e) = self.compact_storage(from_slot, to_slot) {
+                // This error is not fatal and indicates an internal error
+                error!(
+                    "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
+                    e, from_slot, to_slot
+                );
+            }
+        }
     }
-    // Returns whether or not all columns have been purged until their end
+
+    pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
+        self.purge_slots_with_delay(from_slot, to_slot, None)
+    }
+
+    // Returns whether or not all columns successfully purged the slot range
     fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
         let mut write_batch = self
             .db
@@ -349,6 +358,8 @@ impl Blockstore {
             .expect("Database Error: Failed to get write batch");
         // delete range cf is not inclusive
         let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
+
+        let mut delete_range_timer = Measure::start("delete_range");
         let mut columns_empty = self
             .db
             .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
@@ -405,6 +416,7 @@ impl Blockstore {
                 .delete_range_cf::<cf::AddressSignatures>(&mut write_batch, index, index + 1)
                 .unwrap_or(false);
         }
+        delete_range_timer.stop();
         let mut write_timer = Measure::start("write_batch");
         if let Err(e) = self.db.write(write_batch) {
             error!(
@@ -416,12 +428,17 @@ impl Blockstore {
         write_timer.stop();
         datapoint_info!(
             "blockstore-purge",
+            ("from_slot", from_slot as i64, i64),
+            ("to_slot", to_slot as i64, i64),
+            ("delete_range_us", delete_range_timer.as_us() as i64, i64),
             ("write_batch_us", write_timer.as_us() as i64, i64)
         );
         Ok(columns_empty)
     }
 
     pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
+        info!("compact_storage: from {} to {}", from_slot, to_slot);
+        let mut compact_timer = Measure::start("compact_range");
         let result = self
             .meta_cf
             .compact_range(from_slot, to_slot)
@@ -475,6 +492,14 @@ impl Blockstore {
             .rewards_cf
             .compact_range(from_slot, to_slot)
             .unwrap_or(false);
+        compact_timer.stop();
+        if !result {
+            info!("compact_storage incomplete");
+        }
+        datapoint_info!(
+            "blockstore-compact",
+            ("compact_range_us", compact_timer.as_us() as i64, i64),
+        );
         Ok(result)
     }
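The new timers above feed the "blockstore-purge" and "blockstore-compact" datapoints, which is what the ledger-pruning Grafana graph from the commit message plots. Measure (from the solana_measure crate) is essentially a stopwatch over std::time::Instant; a self-contained sketch of the pattern, with a simplified Timer standing in for the real type (illustrative only, not the crate's API):

    use std::time::Instant;

    /// Simplified stand-in for solana_measure's Measure.
    struct Timer {
        start: Instant,
        elapsed_us: u64,
    }

    impl Timer {
        fn start() -> Self {
            Timer { start: Instant::now(), elapsed_us: 0 }
        }
        fn stop(&mut self) {
            self.elapsed_us = self.start.elapsed().as_micros() as u64;
        }
        fn as_us(&self) -> u64 {
            self.elapsed_us
        }
    }

    fn main() {
        let mut delete_range_timer = Timer::start();
        // ... the delete_range_cf calls for each column family go here ...
        delete_range_timer.stop();
        // run_purge reports this as "delete_range_us" in the
        // "blockstore-purge" datapoint, next to from_slot and to_slot.
        println!("delete_range_us={}", delete_range_timer.as_us());
    }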
@@ -2262,39 +2287,6 @@ impl Blockstore {
         Ok(dead_slots_iterator.map(|(slot, _)| slot))
     }
 
-    /// Prune blockstore such that slots higher than `target_slot` are deleted and all references to
-    /// higher slots are removed
-    pub fn prune(&self, target_slot: Slot) {
-        let mut meta = self
-            .meta(target_slot)
-            .expect("couldn't read slot meta")
-            .expect("no meta for target slot");
-        meta.next_slots.clear();
-        self.put_meta_bytes(
-            target_slot,
-            &bincode::serialize(&meta).expect("couldn't get meta bytes"),
-        )
-        .expect("unable to update meta for target slot");
-
-        self.purge_slots(target_slot + 1, None);
-
-        // fixup anything that refers to non-root slots and delete the rest
-        for (slot, mut meta) in self
-            .slot_meta_iterator(0)
-            .expect("unable to iterate over meta")
-        {
-            if slot > target_slot {
-                break;
-            }
-            meta.next_slots.retain(|slot| *slot <= target_slot);
-            self.put_meta_bytes(
-                slot,
-                &bincode::serialize(&meta).expect("couldn't update meta"),
-            )
-            .expect("couldn't update meta");
-        }
-    }
-
     pub fn last_root(&self) -> Slot {
         *self.last_root.read().unwrap()
     }
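With prune() removed, callers that used to prune the ledger now go through the purge API instead. A sketch of how a caller such as LedgerCleanupService might drive it (purge_slots and purge_slots_with_delay come from this diff; the wrapper function, its parameters, and the 10ms delay are assumptions for illustration):

    use std::time::Duration;

    use solana_ledger::blockstore::Blockstore;

    type Slot = u64;

    /// Purge every slot strictly below `lowest_slot_to_keep`, pausing between
    /// 1000-slot batches so other blockstore users are not starved.
    fn purge_below(blockstore: &Blockstore, lowest_slot_to_keep: Slot) {
        if lowest_slot_to_keep == 0 {
            return; // nothing below slot 0 to purge
        }
        // Both bounds of the purge range are inclusive, hence the - 1.
        blockstore.purge_slots_with_delay(
            0,
            lowest_slot_to_keep - 1,
            Some(Duration::from_millis(10)),
        );
        // Equivalent, but without yielding between batches:
        // blockstore.purge_slots(0, lowest_slot_to_keep - 1);
    }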
@@ -4994,42 +4986,6 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
-    #[test]
-    fn test_prune() {
-        let blockstore_path = get_tmp_ledger_path!();
-        let blockstore = Blockstore::open(&blockstore_path).unwrap();
-        let (shreds, _) = make_many_slot_entries(0, 50, 6);
-        let shreds_per_slot = shreds.len() as u64 / 50;
-        blockstore.insert_shreds(shreds, None, false).unwrap();
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|(_, meta)| assert_eq!(meta.last_index, shreds_per_slot - 1));
-
-        blockstore.prune(5);
-
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|(slot, meta)| {
-                assert!(slot <= 5);
-                assert_eq!(meta.last_index, shreds_per_slot - 1)
-            });
-
-        let data_iter = blockstore
-            .data_shred_cf
-            .iter(IteratorMode::From((0, 0), IteratorDirection::Forward))
-            .unwrap();
-        for ((slot, _), _) in data_iter {
-            if slot > 5 {
-                panic!();
-            }
-        }
-
-        drop(blockstore);
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     fn test_purge_slots() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -5037,11 +4993,11 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 50, 5);
         blockstore.insert_shreds(shreds, None, false).unwrap();
 
-        blockstore.purge_slots(0, Some(5));
+        blockstore.purge_slots(0, 5);
         test_all_empty_or_min(&blockstore, 6);
 
-        blockstore.purge_slots(0, None);
+        blockstore.purge_slots(0, 50);
 
         // min slot shouldn't matter, blockstore should be empty
         test_all_empty_or_min(&blockstore, 100);
@@ -5065,7 +5021,7 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 5000, 10);
         blockstore.insert_shreds(shreds, None, false).unwrap();
 
-        blockstore.purge_slots(0, Some(4999));
+        blockstore.purge_slots(0, 4999);
 
         test_all_empty_or_min(&blockstore, 5000);
@@ -5073,19 +5029,6 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
-    #[should_panic]
-    #[test]
-    fn test_prune_out_of_bounds() {
-        let blockstore_path = get_tmp_ledger_path!();
-        let blockstore = Blockstore::open(&blockstore_path).unwrap();
-
-        // slot 5 does not exist, prune should panic
-        blockstore.prune(5);
-
-        drop(blockstore);
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     fn test_iter_bounds() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -6588,14 +6531,14 @@ pub mod tests {
             .insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting just the codes, enough for recovery
         blockstore
             .insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting some codes, but not enough for recovery
         blockstore
@@ -6606,7 +6549,7 @@ pub mod tests {
             )
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting just the codes, and some data, enough for recovery
         let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
@@ -6618,7 +6561,7 @@ pub mod tests {
            .insert_shreds(shreds, Some(&leader_schedule_cache), false)
            .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting some codes, and some data, but enough for recovery
         let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6630,7 +6573,7 @@ pub mod tests {
            .insert_shreds(shreds, Some(&leader_schedule_cache), false)
            .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test inserting all shreds in 2 rounds, make sure nothing is lost
         let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6650,7 +6593,7 @@ pub mod tests {
            .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
            .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test not all, but enough data and coding shreds in 2 rounds to trigger recovery,
         // make sure nothing is lost
@@ -6675,7 +6618,7 @@ pub mod tests {
            .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
            .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
 
         // Test insert shreds in 2 rounds, but not enough to trigger
         // recovery, make sure nothing is lost
@@ -6700,7 +6643,7 @@ pub mod tests {
            .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
            .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, Some(slot));
+        blockstore.purge_slots(0, slot);
     }
 
     Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
 }

@@ -15,7 +15,7 @@ fn test_multiple_threads_insert_shred() {
     for _ in 0..100 {
         let num_threads = 10;
 
-        // Create `num_threads` different ticks in slots 1..num_therads + 1, all
+        // Create `num_threads` different ticks in slots 1..num_threads + 1, all
         // with parent = slot 0
         let threads: Vec<_> = (0..num_threads)
             .map(|i| {
@@ -42,7 +42,7 @@ fn test_multiple_threads_insert_shred() {
         assert_eq!(meta0.next_slots, expected_next_slots);
 
         // Delete slots for next iteration
-        blockstore.purge_slots(0, None);
+        blockstore.purge_slots(0, num_threads + 1);
     }
 
     // Cleanup