Add compact_cf
calls to reclaim storage during ledger slot purge (#7264)
This commit is contained in:
@ -117,4 +117,43 @@ mod tests {
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compaction() {
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
|
||||
|
||||
let n = 10_000;
|
||||
let batch_size = 100;
|
||||
let batches = n / batch_size;
|
||||
let max_ledger_slots = 100;
|
||||
|
||||
for i in 0..batches {
|
||||
let (shreds, _) = make_many_slot_entries(i * batch_size, batch_size, 1);
|
||||
blocktree.insert_shreds(shreds, None, false).unwrap();
|
||||
}
|
||||
|
||||
let u1 = blocktree.storage_size().unwrap() as f64;
|
||||
|
||||
// send signal to cleanup slots
|
||||
let (sender, receiver) = channel();
|
||||
sender.send((n, Pubkey::default())).unwrap();
|
||||
LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, max_ledger_slots).unwrap();
|
||||
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
|
||||
let u2 = blocktree.storage_size().unwrap() as f64;
|
||||
|
||||
assert!(u2 < u1, "insufficient compaction! pre={},post={}", u1, u2,);
|
||||
|
||||
// check that early slots don't exist
|
||||
let max_slot = n - max_ledger_slots;
|
||||
blocktree
|
||||
.slot_meta_iterator(0)
|
||||
.unwrap()
|
||||
.for_each(|(slot, _)| assert!(slot > max_slot));
|
||||
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
}
|
||||
|
@ -275,8 +275,8 @@ impl Blocktree {
|
||||
|
||||
// Returns whether or not all iterators have reached their end
|
||||
fn run_purge_batch(&self, from_slot: Slot, batch_end: Slot) -> Result<bool> {
|
||||
let from_slot = Some(from_slot);
|
||||
let batch_end = Some(batch_end);
|
||||
let some_from_slot = Some(from_slot);
|
||||
let some_batch_end = Some(batch_end);
|
||||
|
||||
let mut write_batch = self
|
||||
.db
|
||||
@ -284,40 +284,77 @@ impl Blocktree {
|
||||
.expect("Database Error: Failed to get write batch");
|
||||
let end = self
|
||||
.meta_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.meta_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.erasure_meta_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.erasure_meta_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.data_shred_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.data_shred_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.code_shred_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.code_shred_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.transaction_status_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.transaction_status_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.orphans_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.orphans_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.index_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.index_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.dead_slots_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.dead_slots_cf
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.db
|
||||
.column::<cf::Root>()
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.delete_slot(&mut write_batch, some_from_slot, some_batch_end)
|
||||
.unwrap_or(false)
|
||||
& self
|
||||
.db
|
||||
.column::<cf::Root>()
|
||||
.compact_range(from_slot, batch_end)
|
||||
.unwrap_or(false);
|
||||
|
||||
if let Err(e) = self.db.write(write_batch) {
|
||||
|
@ -641,6 +641,17 @@ where
|
||||
Ok(end)
|
||||
}
|
||||
|
||||
pub fn compact_range(&self, from: Slot, to: Slot) -> Result<bool>
|
||||
where
|
||||
C::Index: PartialOrd + Copy,
|
||||
{
|
||||
let cf = self.handle();
|
||||
let from = Some(C::key(C::as_index(from)));
|
||||
let to = Some(C::key(C::as_index(to)));
|
||||
self.backend.0.compact_range_cf(cf, from, to);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn handle(&self) -> &ColumnFamily {
|
||||
self.backend.cf_handle(C::NAME)
|
||||
|
Reference in New Issue
Block a user