ledger_cleanup_service: compact at a slower rate than purging (#10414)
This commit is contained in:
@ -3,7 +3,7 @@
|
|||||||
use solana_ledger::blockstore::{Blockstore, PurgeType};
|
use solana_ledger::blockstore::{Blockstore, PurgeType};
|
||||||
use solana_ledger::blockstore_db::Result as BlockstoreResult;
|
use solana_ledger::blockstore_db::Result as BlockstoreResult;
|
||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_sdk::clock::Slot;
|
use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY};
|
||||||
use std::string::ToString;
|
use std::string::ToString;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError};
|
use std::sync::mpsc::{Receiver, RecvTimeoutError};
|
||||||
@ -32,6 +32,10 @@ pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
|
|||||||
// Delay between purges to cooperate with other blockstore users
|
// Delay between purges to cooperate with other blockstore users
|
||||||
pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500);
|
pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
|
// Compacting at a slower interval than purging helps keep IOPS down.
|
||||||
|
// Once a day should be ample
|
||||||
|
const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;
|
||||||
|
|
||||||
pub struct LedgerCleanupService {
|
pub struct LedgerCleanupService {
|
||||||
t_cleanup: JoinHandle<()>,
|
t_cleanup: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
@ -49,6 +53,8 @@ impl LedgerCleanupService {
|
|||||||
);
|
);
|
||||||
let exit = exit.clone();
|
let exit = exit.clone();
|
||||||
let mut last_purge_slot = 0;
|
let mut last_purge_slot = 0;
|
||||||
|
let mut last_compaction_slot = 0;
|
||||||
|
|
||||||
let t_cleanup = Builder::new()
|
let t_cleanup = Builder::new()
|
||||||
.name("solana-ledger-cleanup".to_string())
|
.name("solana-ledger-cleanup".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
@ -62,6 +68,8 @@ impl LedgerCleanupService {
|
|||||||
&mut last_purge_slot,
|
&mut last_purge_slot,
|
||||||
DEFAULT_PURGE_SLOT_INTERVAL,
|
DEFAULT_PURGE_SLOT_INTERVAL,
|
||||||
Some(DEFAULT_DELAY_BETWEEN_PURGES),
|
Some(DEFAULT_DELAY_BETWEEN_PURGES),
|
||||||
|
&mut last_compaction_slot,
|
||||||
|
DEFAULT_COMPACTION_SLOT_INTERVAL,
|
||||||
) {
|
) {
|
||||||
match e {
|
match e {
|
||||||
RecvTimeoutError::Disconnected => break,
|
RecvTimeoutError::Disconnected => break,
|
||||||
@ -116,7 +124,7 @@ impl LedgerCleanupService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(true, lowest_cleanup_slot, first_slot, total_shreds)
|
(true, first_slot, lowest_cleanup_slot, total_shreds)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
|
fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
|
||||||
@ -135,6 +143,8 @@ impl LedgerCleanupService {
|
|||||||
last_purge_slot: &mut u64,
|
last_purge_slot: &mut u64,
|
||||||
purge_interval: u64,
|
purge_interval: u64,
|
||||||
delay_between_purges: Option<Duration>,
|
delay_between_purges: Option<Duration>,
|
||||||
|
last_compaction_slot: &mut u64,
|
||||||
|
compaction_interval: u64,
|
||||||
) -> Result<(), RecvTimeoutError> {
|
) -> Result<(), RecvTimeoutError> {
|
||||||
let root = Self::receive_new_roots(new_root_receiver)?;
|
let root = Self::receive_new_roots(new_root_receiver)?;
|
||||||
if root - *last_purge_slot <= purge_interval {
|
if root - *last_purge_slot <= purge_interval {
|
||||||
@ -143,19 +153,20 @@ impl LedgerCleanupService {
|
|||||||
|
|
||||||
let disk_utilization_pre = blockstore.storage_size();
|
let disk_utilization_pre = blockstore.storage_size();
|
||||||
info!(
|
info!(
|
||||||
"purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}",
|
"purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}",
|
||||||
root, last_purge_slot, purge_interval, disk_utilization_pre
|
root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre
|
||||||
);
|
);
|
||||||
*last_purge_slot = root;
|
*last_purge_slot = root;
|
||||||
|
|
||||||
let (slots_to_clean, lowest_cleanup_slot, first_slot, total_shreds) =
|
let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
|
||||||
Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds);
|
Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds);
|
||||||
|
|
||||||
if slots_to_clean {
|
if slots_to_clean {
|
||||||
info!(
|
let mut compact_first_slot = std::u64::MAX;
|
||||||
"purging data from slots {} to {}",
|
if lowest_cleanup_slot.saturating_sub(*last_compaction_slot) > compaction_interval {
|
||||||
first_slot, lowest_cleanup_slot
|
compact_first_slot = *last_compaction_slot;
|
||||||
);
|
*last_compaction_slot = lowest_cleanup_slot;
|
||||||
|
}
|
||||||
|
|
||||||
let purge_complete = Arc::new(AtomicBool::new(false));
|
let purge_complete = Arc::new(AtomicBool::new(false));
|
||||||
let blockstore = blockstore.clone();
|
let blockstore = blockstore.clone();
|
||||||
@ -167,15 +178,37 @@ impl LedgerCleanupService {
|
|||||||
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
|
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
|
||||||
slot_update_time.stop();
|
slot_update_time.stop();
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"purging data from slots {} to {}",
|
||||||
|
purge_first_slot, lowest_cleanup_slot
|
||||||
|
);
|
||||||
|
|
||||||
let mut purge_time = Measure::start("purge_slots_with_delay");
|
let mut purge_time = Measure::start("purge_slots_with_delay");
|
||||||
blockstore.purge_slots_with_delay(
|
blockstore.purge_slots_with_delay(
|
||||||
first_slot,
|
purge_first_slot,
|
||||||
lowest_cleanup_slot,
|
lowest_cleanup_slot,
|
||||||
delay_between_purges,
|
delay_between_purges,
|
||||||
PurgeType::PrimaryIndex,
|
PurgeType::PrimaryIndex,
|
||||||
);
|
);
|
||||||
purge_time.stop();
|
purge_time.stop();
|
||||||
info!("{}", purge_time);
|
info!("{}", purge_time);
|
||||||
|
|
||||||
|
if compact_first_slot < lowest_cleanup_slot {
|
||||||
|
info!(
|
||||||
|
"compacting data from slots {} to {}",
|
||||||
|
compact_first_slot, lowest_cleanup_slot
|
||||||
|
);
|
||||||
|
if let Err(err) =
|
||||||
|
blockstore.compact_storage(compact_first_slot, lowest_cleanup_slot)
|
||||||
|
{
|
||||||
|
// This error is not fatal and indicates an internal error?
|
||||||
|
error!(
|
||||||
|
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
|
||||||
|
err, compact_first_slot, lowest_cleanup_slot
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
purge_complete1.store(true, Ordering::Relaxed);
|
purge_complete1.store(true, Ordering::Relaxed);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -234,6 +267,7 @@ mod tests {
|
|||||||
|
|
||||||
//send a signal to kill all but 5 shreds, which will be in the newest slots
|
//send a signal to kill all but 5 shreds, which will be in the newest slots
|
||||||
let mut last_purge_slot = 0;
|
let mut last_purge_slot = 0;
|
||||||
|
let mut last_compaction_slot = 0;
|
||||||
sender.send(50).unwrap();
|
sender.send(50).unwrap();
|
||||||
LedgerCleanupService::cleanup_ledger(
|
LedgerCleanupService::cleanup_ledger(
|
||||||
&receiver,
|
&receiver,
|
||||||
@ -242,6 +276,8 @@ mod tests {
|
|||||||
&mut last_purge_slot,
|
&mut last_purge_slot,
|
||||||
10,
|
10,
|
||||||
None,
|
None,
|
||||||
|
&mut last_compaction_slot,
|
||||||
|
10,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -273,6 +309,7 @@ mod tests {
|
|||||||
info!("{}", first_insert);
|
info!("{}", first_insert);
|
||||||
|
|
||||||
let mut last_purge_slot = 0;
|
let mut last_purge_slot = 0;
|
||||||
|
let mut last_compaction_slot = 0;
|
||||||
let mut slot = initial_slots;
|
let mut slot = initial_slots;
|
||||||
let mut num_slots = 6;
|
let mut num_slots = 6;
|
||||||
for _ in 0..5 {
|
for _ in 0..5 {
|
||||||
@ -297,6 +334,8 @@ mod tests {
|
|||||||
&mut last_purge_slot,
|
&mut last_purge_slot,
|
||||||
10,
|
10,
|
||||||
None,
|
None,
|
||||||
|
&mut last_compaction_slot,
|
||||||
|
10,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
time.stop();
|
time.stop();
|
||||||
|
@ -374,14 +374,17 @@ mod tests {
|
|||||||
// send signal to cleanup slots
|
// send signal to cleanup slots
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
sender.send(n).unwrap();
|
sender.send(n).unwrap();
|
||||||
let mut next_purge_batch = 0;
|
let mut last_purge_slot = 0;
|
||||||
|
let mut last_compaction_slot = 0;
|
||||||
LedgerCleanupService::cleanup_ledger(
|
LedgerCleanupService::cleanup_ledger(
|
||||||
&receiver,
|
&receiver,
|
||||||
&blockstore,
|
&blockstore,
|
||||||
max_ledger_shreds,
|
max_ledger_shreds,
|
||||||
&mut next_purge_batch,
|
&mut last_purge_slot,
|
||||||
10,
|
10,
|
||||||
None,
|
None,
|
||||||
|
&mut last_compaction_slot,
|
||||||
|
10,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -35,20 +35,18 @@ impl Blockstore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.no_compaction {
|
|
||||||
if let Err(e) = self.compact_storage(from_slot, to_slot) {
|
|
||||||
// This error is not fatal and indicates an internal error
|
|
||||||
error!(
|
|
||||||
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
|
|
||||||
e, from_slot, to_slot
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: rename purge_slots() to purge_and_compact_slots()
|
||||||
pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
|
pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
|
||||||
self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact)
|
self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact);
|
||||||
|
if let Err(e) = self.compact_storage(from_slot, to_slot) {
|
||||||
|
// This error is not fatal and indicates an internal error?
|
||||||
|
error!(
|
||||||
|
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
|
||||||
|
e, from_slot, to_slot
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the
|
/// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the
|
||||||
@ -169,6 +167,10 @@ impl Blockstore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
|
pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
|
||||||
|
if self.no_compaction {
|
||||||
|
info!("compact_storage: compaction disabled");
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
info!("compact_storage: from {} to {}", from_slot, to_slot);
|
info!("compact_storage: from {} to {}", from_slot, to_slot);
|
||||||
let mut compact_timer = Measure::start("compact_range");
|
let mut compact_timer = Measure::start("compact_range");
|
||||||
let result = self
|
let result = self
|
||||||
|
Reference in New Issue
Block a user