Remove slots past wait-for-supermajority slot. (#10720)
This commit is contained in:
parent
0952b76f02
commit
2ba8fc5243
@ -23,9 +23,10 @@ use crate::{
|
||||
tvu::{Sockets, Tvu, TvuConfig},
|
||||
};
|
||||
use crossbeam_channel::unbounded;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_ledger::{
|
||||
bank_forks_utils,
|
||||
blockstore::{Blockstore, CompletedSlotsReceiver},
|
||||
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
|
||||
blockstore_processor, create_new_tmp_ledger,
|
||||
leader_schedule::FixedSchedule,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
@ -382,7 +383,14 @@ impl Validator {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check);
|
||||
wait_for_supermajority(
|
||||
config,
|
||||
&bank,
|
||||
&cluster_info,
|
||||
rpc_override_health_check,
|
||||
&blockstore,
|
||||
ledger_path,
|
||||
);
|
||||
|
||||
let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
|
||||
assert_eq!(
|
||||
@ -628,16 +636,58 @@ fn new_banks_from_blockstore(
|
||||
)
|
||||
}
|
||||
|
||||
/// Backs up and then deletes all blockstore data from `start_slot` onward.
///
/// Shreds for every slot at or after `start_slot` are copied into a freshly
/// opened blockstore under a randomly named `backup_rocksdb_*` directory
/// inside `ledger_path`; the copied slot range is then purged from the live
/// blockstore and the underlying storage is compacted.
///
/// The backup is best-effort: if the backup blockstore fails to open, the
/// copy step is silently skipped but the purge still proceeds.
fn backup_and_clear_blockstore(blockstore: &Arc<Blockstore>, ledger_path: &Path, start_slot: Slot) {
    use std::time::Instant;
    // Random suffix so repeated runs don't collide with an earlier backup dir.
    let folder_name = format!("backup_rocksdb_{}", thread_rng().gen_range(0, 99999));
    let backup_blockstore = Blockstore::open(&ledger_path.join(folder_name));
    if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) {
        let mut last_print = Instant::now();
        let mut copied = 0;
        // Tracks the highest slot actually seen, so the purge below covers
        // exactly the iterated range.
        let mut end_slot = start_slot;
        for (slot, _meta) in slot_meta_iterator {
            if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) {
                // Only copy when the backup store opened successfully;
                // insert errors are deliberately ignored (best-effort backup).
                if let Ok(ref backup_blockstore) = backup_blockstore {
                    copied += shreds.len();
                    let _ = backup_blockstore.insert_shreds(shreds, None, true);
                }
                end_slot = slot;
            }
            // Throttled progress logging (at most once every ~3 seconds).
            if last_print.elapsed().as_millis() > 3000 {
                info!(
                    "Copying shreds from slot {} copied {} so far.",
                    start_slot, copied
                );
                last_print = Instant::now();
            }
        }

        // Remove the copied range from the live blockstore, then reclaim space.
        info!("Purging slots {} to {}", start_slot, end_slot);
        blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact);
        info!("Purging done, compacting db..");
        if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
            // Compaction failure is non-fatal; the purge itself has completed.
            warn!(
                "Error from compacting storage from {} to {}: {:?}",
                start_slot, end_slot, e
            );
        }
        info!("done");
    }
}
|
||||
|
||||
fn wait_for_supermajority(
|
||||
config: &ValidatorConfig,
|
||||
bank: &Bank,
|
||||
cluster_info: &ClusterInfo,
|
||||
rpc_override_health_check: Arc<AtomicBool>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
ledger_path: &Path,
|
||||
) {
|
||||
if config.wait_for_supermajority != Some(bank.slot()) {
|
||||
return;
|
||||
}
|
||||
|
||||
backup_and_clear_blockstore(blockstore, ledger_path, bank.slot() + 1);
|
||||
|
||||
info!(
|
||||
"Waiting for 80% of activated stake at slot {} to be in gossip...",
|
||||
bank.slot()
|
||||
@ -904,6 +954,39 @@ mod tests {
|
||||
remove_dir_all(validator_ledger_path).unwrap();
|
||||
}
|
||||
|
||||
#[test]
fn test_backup_and_clear_blockstore() {
    use std::time::Instant;
    solana_logger::setup();
    use solana_ledger::get_tmp_ledger_path;
    use solana_ledger::{blockstore, entry};
    let blockstore_path = get_tmp_ledger_path!();
    // Inner scope so the blockstore is dropped (and its files closed) before
    // the temp ledger path goes away.
    {
        let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());

        // Populate slots 1..=9, each chaining to its parent (i - 1).
        info!("creating shreds");
        let mut last_print = Instant::now();
        for i in 1..10 {
            let entries = entry::create_ticks(1, 0, Hash::default());
            let shreds = blockstore::entries_to_test_shreds(entries, i, i - 1, true, 1);
            blockstore.insert_shreds(shreds, None, true).unwrap();
            // Throttled progress logging (at most once every ~5 seconds).
            if last_print.elapsed().as_millis() > 5000 {
                info!("inserted {}", i);
                last_print = Instant::now();
            }
        }

        // Back up and clear everything from slot 5 onward.
        backup_and_clear_blockstore(&blockstore, &blockstore_path, 5);

        // Slots past the clear point must now be empty in the live blockstore.
        // NOTE(review): only 6..10 is checked here, though the clear starts at
        // slot 5 — presumably slot 5 is covered implicitly; confirm if needed.
        for i in 6..10 {
            assert!(blockstore
                .get_data_shreds_for_slot(i, 0)
                .unwrap()
                .is_empty());
        }
    }
}
|
||||
|
||||
#[test]
|
||||
fn validator_parallel_exit() {
|
||||
let leader_keypair = Keypair::new();
|
||||
|
@ -1,4 +1,11 @@
|
||||
use super::*;
|
||||
use std::time::Instant;
|
||||
|
||||
/// Accumulated timing statistics for blockstore purge operations,
/// aggregated across purge batches and periodically reported.
#[derive(Default)]
pub struct PurgeStats {
    // Total microseconds spent in delete-range operations.
    delete_range: u64,
    // Total microseconds spent committing write batches.
    write_batch: u64,
}
|
||||
|
||||
impl Blockstore {
|
||||
/// Silently deletes all blockstore column families in the range [from_slot,to_slot]
|
||||
@ -15,9 +22,27 @@ impl Blockstore {
|
||||
// if there's no upper bound, split the purge request into batches of 1000 slots
|
||||
const PURGE_BATCH_SIZE: u64 = 1000;
|
||||
let mut batch_start = from_slot;
|
||||
let mut purge_stats = PurgeStats::default();
|
||||
let mut last_datapoint = Instant::now();
|
||||
while batch_start < to_slot {
|
||||
let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
|
||||
match self.run_purge(batch_start, batch_end, purge_type) {
|
||||
|
||||
let purge_result =
|
||||
self.run_purge_with_stats(batch_start, batch_end, purge_type, &mut purge_stats);
|
||||
|
||||
if last_datapoint.elapsed().as_millis() > 1000 {
|
||||
datapoint_info!(
|
||||
"blockstore-purge",
|
||||
("from_slot", batch_start as i64, i64),
|
||||
("to_slot", to_slot as i64, i64),
|
||||
("delete_range_us", purge_stats.delete_range as i64, i64),
|
||||
("write_batch_us", purge_stats.write_batch as i64, i64)
|
||||
);
|
||||
last_datapoint = Instant::now();
|
||||
purge_stats = PurgeStats::default();
|
||||
}
|
||||
|
||||
match purge_result {
|
||||
Ok(_all_columns_purged) => {
|
||||
batch_start = batch_end;
|
||||
|
||||
@ -76,12 +101,22 @@ impl Blockstore {
|
||||
}
|
||||
}
|
||||
|
||||
// Returns whether or not all columns successfully purged the slot range
|
||||
pub(crate) fn run_purge(
|
||||
&self,
|
||||
from_slot: Slot,
|
||||
to_slot: Slot,
|
||||
purge_type: PurgeType,
|
||||
) -> Result<bool> {
|
||||
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
|
||||
}
|
||||
|
||||
// Returns whether or not all columns successfully purged the slot range
|
||||
pub(crate) fn run_purge_with_stats(
|
||||
&self,
|
||||
from_slot: Slot,
|
||||
to_slot: Slot,
|
||||
purge_type: PurgeType,
|
||||
purge_stats: &mut PurgeStats,
|
||||
) -> Result<bool> {
|
||||
let mut write_batch = self
|
||||
.db
|
||||
@ -156,13 +191,8 @@ impl Blockstore {
|
||||
return Err(e);
|
||||
}
|
||||
write_timer.stop();
|
||||
datapoint_info!(
|
||||
"blockstore-purge",
|
||||
("from_slot", from_slot as i64, i64),
|
||||
("to_slot", to_slot as i64, i64),
|
||||
("delete_range_us", delete_range_timer.as_us() as i64, i64),
|
||||
("write_batch_us", write_timer.as_us() as i64, i64)
|
||||
);
|
||||
purge_stats.delete_range += delete_range_timer.as_us();
|
||||
purge_stats.write_batch += write_timer.as_us();
|
||||
Ok(columns_purged)
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user