diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 68e50414f9..faee12d5fe 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -117,4 +117,43 @@ mod tests { drop(blocktree); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + + #[test] + fn test_compaction() { + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap()); + + let n = 10_000; + let batch_size = 100; + let batches = n / batch_size; + let max_ledger_slots = 100; + + for i in 0..batches { + let (shreds, _) = make_many_slot_entries(i * batch_size, batch_size, 1); + blocktree.insert_shreds(shreds, None, false).unwrap(); + } + + let u1 = blocktree.storage_size().unwrap() as f64; + + // send signal to cleanup slots + let (sender, receiver) = channel(); + sender.send((n, Pubkey::default())).unwrap(); + LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, max_ledger_slots).unwrap(); + + thread::sleep(Duration::from_secs(2)); + + let u2 = blocktree.storage_size().unwrap() as f64; + + assert!(u2 < u1, "insufficient compaction! pre={},post={}", u1, u2,); + + // check that early slots don't exist + let max_slot = n - max_ledger_slots; + blocktree + .slot_meta_iterator(0) + .unwrap() + .for_each(|(slot, _)| assert!(slot > max_slot)); + + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } } diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index b29c2d17b0..168a0e7f97 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -275,8 +275,8 @@ impl Blocktree { // Returns whether or not all iterators have reached their end fn run_purge_batch(&self, from_slot: Slot, batch_end: Slot) -> Result { - let from_slot = Some(from_slot); - let batch_end = Some(batch_end); + let some_from_slot = Some(from_slot); + let some_batch_end = Some(batch_end); let mut write_batch = self .db @@ -284,40 +284,77 @@ impl Blocktree { .expect("Database Error: Failed to get write batch"); let end = self .meta_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) .unwrap_or(false) + & self + .meta_cf + .compact_range(from_slot, batch_end) + .unwrap_or(false) & self .erasure_meta_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .erasure_meta_cf + .compact_range(from_slot, batch_end) .unwrap_or(false) & self .data_shred_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .data_shred_cf + .compact_range(from_slot, batch_end) .unwrap_or(false) & self .code_shred_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .code_shred_cf + .compact_range(from_slot, batch_end) .unwrap_or(false) & self .transaction_status_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .transaction_status_cf + .compact_range(from_slot, batch_end) .unwrap_or(false) & self .orphans_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .orphans_cf + .compact_range(from_slot, batch_end) .unwrap_or(false) & self .index_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .index_cf + .compact_range(from_slot, batch_end) .unwrap_or(false) & self .dead_slots_cf - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .dead_slots_cf + .compact_range(from_slot, batch_end) .unwrap_or(false) & self .db .column::() - .delete_slot(&mut write_batch, from_slot, batch_end) + .delete_slot(&mut write_batch, some_from_slot, some_batch_end) + .unwrap_or(false) + & self + .db + .column::() + .compact_range(from_slot, batch_end) .unwrap_or(false); if let Err(e) = self.db.write(write_batch) { diff --git a/ledger/src/blocktree_db.rs b/ledger/src/blocktree_db.rs index 60e5c9bbc6..383c6a5c2d 100644 --- a/ledger/src/blocktree_db.rs +++ b/ledger/src/blocktree_db.rs @@ -641,6 +641,17 @@ where Ok(end) } + pub fn compact_range(&self, from: Slot, to: Slot) -> Result + where + C::Index: PartialOrd + Copy, + { + let cf = self.handle(); + let from = Some(C::key(C::as_index(from))); + let to = Some(C::key(C::as_index(to))); + self.backend.0.compact_range_cf(cf, from, to); + Ok(true) + } + #[inline] pub fn handle(&self) -> &ColumnFamily { self.backend.cf_handle(C::NAME)