Update "limit-ledger-size" to use DeleteRange for much faster deletes (#7515)
* Update "limit-ledger-size" to use DeleteRange for much faster deletes * Update core/src/ledger_cleanup_service.rs Co-Authored-By: Michael Vines <mvines@gmail.com> * Rewrite more idiomatically * Move max_ledger_slots to a fn for clippy * Remove unused import * Detect when all columns have been purged and fix a bug in deletion * Check that more than 1 column is actually deleted * Add helper to test that ledger meets minimum slot bounds * Remove manual batching of deletes * Refactor to keep some N slots older than the highest root * Define MAX_LEDGER_SLOTS that ledger_cleanup_service will try to keep around * Refactor compact range
core/src/ledger_cleanup_service.rs

@@ -3,8 +3,7 @@
 use crate::result::{Error, Result};
 use solana_ledger::blocktree::Blocktree;
 use solana_metrics::datapoint_debug;
-use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH;
-use solana_sdk::pubkey::Pubkey;
+use solana_sdk::clock::Slot;
 use std::string::ToString;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
@@ -13,7 +12,14 @@ use std::thread;
 use std::thread::{Builder, JoinHandle};
 use std::time::Duration;
 
-pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = 3 * DEFAULT_SLOTS_PER_EPOCH;
+// This is chosen to allow enough time for
+// - To try and keep the RocksDB size under 128GB at 50k tps (100 slots take ~2GB).
+// - A validator to download a snapshot from a peer and boot from it
+// - To make sure that if a validator needs to reboot from its own snapshot, it has enough slots locally
+//   to catch back up to where it was when it stopped
+pub const MAX_LEDGER_SLOTS: u64 = 6400;
+// Remove a fixed number of slots at a time, it's more efficient than doing it one-by-one
+pub const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;
 
 pub struct LedgerCleanupService {
     t_cleanup: JoinHandle<()>,
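A quick sanity check of the figures quoted in the new comment: at roughly 2 GB per 100 slots, retaining MAX_LEDGER_SLOTS = 6400 slots bounds the database at about 128 GB. A tiny throwaway sketch of that arithmetic:

fn main() {
    let max_ledger_slots: u64 = 6_400;
    let gb_per_100_slots: u64 = 2; // figure quoted in the comment above
    let approx_gb = max_ledger_slots / 100 * gb_per_100_slots;
    assert_eq!(approx_gb, 128); // matches the "under 128GB" target
    println!("approximate RocksDB size bound: {} GB", approx_gb);
}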
@@ -21,7 +27,7 @@ pub struct LedgerCleanupService {
 
 impl LedgerCleanupService {
     pub fn new(
-        slot_full_receiver: Receiver<(u64, Pubkey)>,
+        new_root_receiver: Receiver<Slot>,
         blocktree: Arc<Blocktree>,
         max_ledger_slots: u64,
         exit: &Arc<AtomicBool>,
@@ -31,15 +37,19 @@ impl LedgerCleanupService {
             max_ledger_slots
         );
         let exit = exit.clone();
+        let mut next_purge_batch = max_ledger_slots;
         let t_cleanup = Builder::new()
            .name("solana-ledger-cleanup".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }
-                if let Err(e) =
-                    Self::cleanup_ledger(&slot_full_receiver, &blocktree, max_ledger_slots)
-                {
+                if let Err(e) = Self::cleanup_ledger(
+                    &new_root_receiver,
+                    &blocktree,
+                    max_ledger_slots,
+                    &mut next_purge_batch,
+                ) {
                    match e {
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -52,16 +62,18 @@ impl LedgerCleanupService {
     }
 
     fn cleanup_ledger(
-        slot_full_receiver: &Receiver<(u64, Pubkey)>,
+        new_root_receiver: &Receiver<Slot>,
         blocktree: &Arc<Blocktree>,
         max_ledger_slots: u64,
+        next_purge_batch: &mut u64,
     ) -> Result<()> {
         let disk_utilization_pre = blocktree.storage_size();
 
-        let (slot, _) = slot_full_receiver.recv_timeout(Duration::from_secs(1))?;
-        if slot > max_ledger_slots {
+        let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
+        if root > *next_purge_batch {
             //cleanup
-            blocktree.purge_slots(0, Some(slot - max_ledger_slots));
+            blocktree.purge_slots(0, Some(root - max_ledger_slots));
+            *next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
         }
 
         let disk_utilization_post = blocktree.storage_size();
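With the new signature, purging is driven by the latest root rather than by slots filling up: nothing is deleted until the root passes *next_purge_batch, each purge removes every slot older than root - max_ledger_slots, and the threshold then advances by DEFAULT_PURGE_BATCH_SIZE so deletes happen in large, infrequent batches. A standalone sketch of that trigger logic, with a stub standing in for Blocktree::purge_slots:

const MAX_LEDGER_SLOTS: u64 = 6_400;
const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;

// Stand-in for Blocktree::purge_slots(0, Some(upper)) -- just reports the range.
fn purge_slots(from: u64, to_inclusive: u64) {
    println!("purging slots {}..={}", from, to_inclusive);
}

fn maybe_purge(root: u64, max_ledger_slots: u64, next_purge_batch: &mut u64) {
    // Because the threshold starts at max_ledger_slots, root > *next_purge_batch
    // also guarantees root - max_ledger_slots cannot underflow.
    if root > *next_purge_batch {
        purge_slots(0, root - max_ledger_slots);
        *next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
    }
}

fn main() {
    // Mirrors LedgerCleanupService::new: the threshold starts at max_ledger_slots.
    let mut next_purge_batch = MAX_LEDGER_SLOTS;
    for root in (6_500..8_000u64).step_by(100) {
        maybe_purge(root, MAX_LEDGER_SLOTS, &mut next_purge_batch);
    }
    // Only roughly every DEFAULT_PURGE_BATCH_SIZE roots trigger a purge, each one
    // deleting everything more than MAX_LEDGER_SLOTS behind the current root.
}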
@@ -105,8 +117,10 @@ mod tests {
         let (sender, receiver) = channel();
 
         //send a signal to kill slots 0-40
-        sender.send((50, Pubkey::default())).unwrap();
-        LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, 10).unwrap();
+        let mut next_purge_slot = 0;
+        sender.send(50).unwrap();
+        LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, 10, &mut next_purge_slot)
+            .unwrap();
 
         //check that 0-40 don't exist
         blocktree
@@ -137,8 +151,15 @@
 
         // send signal to cleanup slots
         let (sender, receiver) = channel();
-        sender.send((n, Pubkey::default())).unwrap();
-        LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, max_ledger_slots).unwrap();
+        sender.send(n).unwrap();
+        let mut next_purge_batch = 0;
+        LedgerCleanupService::cleanup_ledger(
+            &receiver,
+            &blocktree,
+            max_ledger_slots,
+            &mut next_purge_batch,
+        )
+        .unwrap();
 
         thread::sleep(Duration::from_secs(2));
 
core/src/replay_stage.rs

@@ -73,6 +73,7 @@ pub struct ReplayStageConfig {
     pub subscriptions: Arc<RpcSubscriptions>,
     pub leader_schedule_cache: Arc<LeaderScheduleCache>,
     pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
+    pub latest_root_senders: Vec<Sender<Slot>>,
     pub snapshot_package_sender: Option<SnapshotPackageSender>,
     pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
     pub transaction_status_sender: Option<TransactionStatusSender>,
@@ -193,6 +194,7 @@ impl ReplayStage {
             subscriptions,
             leader_schedule_cache,
             slot_full_senders,
+            latest_root_senders,
             snapshot_package_sender,
             block_commitment_cache,
             transaction_status_sender,
@@ -315,6 +317,7 @@ impl ReplayStage {
                         stats.total_staked,
                         &lockouts_sender,
                         &snapshot_package_sender,
+                        &latest_root_senders,
                     )?;
                 }
                 datapoint_debug!(
@@ -615,6 +618,7 @@ impl ReplayStage {
         total_staked: u64,
         lockouts_sender: &Sender<CommitmentAggregationData>,
         snapshot_package_sender: &Option<SnapshotPackageSender>,
+        latest_root_senders: &[Sender<Slot>],
     ) -> Result<()> {
         if bank.is_empty() {
             inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@@ -645,6 +649,11 @@ impl ReplayStage {
                 .unwrap()
                 .set_root(new_root, snapshot_package_sender);
             Self::handle_new_root(&bank_forks, progress);
+            latest_root_senders.iter().for_each(|s| {
+                if let Err(e) = s.send(new_root) {
+                    trace!("latest root send failed: {:?}", e);
+                }
+            });
             trace!("new root {}", new_root);
             if let Err(e) = root_bank_sender.send(rooted_banks) {
                 trace!("root_bank_sender failed: {:?}", e);
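The new latest_root_senders field is a plain std::sync::mpsc fan-out: whenever ReplayStage roots a bank, it sends the new root slot down every registered sender, and each listening service owns the matching receiver. A minimal, self-contained sketch of the pattern (names here are illustrative, not the real wiring):

use std::sync::mpsc::{channel, Sender};

type Slot = u64;

// Send a newly rooted slot to every listener, logging rather than propagating
// failures, mirroring the trace!-and-continue behavior in the hunk above.
fn broadcast_root(latest_root_senders: &[Sender<Slot>], new_root: Slot) {
    for sender in latest_root_senders {
        if let Err(e) = sender.send(new_root) {
            eprintln!("latest root send failed: {:?}", e);
        }
    }
}

fn main() {
    let (ledger_cleanup_sender, ledger_cleanup_receiver) = channel();
    let latest_root_senders = vec![ledger_cleanup_sender];
    broadcast_root(&latest_root_senders, 42);
    assert_eq!(ledger_cleanup_receiver.recv().unwrap(), 42);
}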
core/src/tvu.rs

@@ -166,7 +166,8 @@ impl Tvu {
             exit: exit.clone(),
             subscriptions: subscriptions.clone(),
             leader_schedule_cache: leader_schedule_cache.clone(),
-            slot_full_senders: vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
+            slot_full_senders: vec![blockstream_slot_sender],
+            latest_root_senders: vec![ledger_cleanup_slot_sender],
             snapshot_package_sender,
             block_commitment_cache,
             transaction_status_sender,
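Net effect of this tvu.rs change: the ledger_cleanup channel moves from slot_full_senders to latest_root_senders, so LedgerCleanupService is now notified of newly rooted slots instead of slots that have merely become full. That is what lets it keep a fixed window of MAX_LEDGER_SLOTS behind the highest root, per the "keep some N slots older than the highest root" item in the commit message, and it only ever purges slots well below the latest root.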