Rework backup and clear function (#10751) (#10754)

(cherry picked from commit a1ef921b88)

Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
mergify[bot]
2020-06-23 22:56:14 +00:00
committed by GitHub
parent 6f48aafd3a
commit 52eea215ce
2 changed files with 51 additions and 24 deletions

View File

@@ -181,6 +181,16 @@ impl Validator {
         sigverify::init();
         info!("Done.");

+        if let Some(shred_version) = config.expected_shred_version {
+            if let Some(wait_for_supermajority_slot) = config.wait_for_supermajority {
+                backup_and_clear_blockstore(
+                    ledger_path,
+                    wait_for_supermajority_slot + 1,
+                    shred_version,
+                );
+            }
+        }
+
         info!("creating bank...");
         let (
             genesis_config,
@@ -381,14 +391,7 @@ impl Validator {
             (None, None)
         };

-        wait_for_supermajority(
-            config,
-            &bank,
-            &cluster_info,
-            rpc_override_health_check,
-            &blockstore,
-            ledger_path,
-        );
+        wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check);

         let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
         assert_eq!(
@@ -634,21 +637,39 @@ fn new_banks_from_blockstore(
     )
 }

-fn backup_and_clear_blockstore(blockstore: &Arc<Blockstore>, ledger_path: &Path, start_slot: Slot) {
+fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_version: u16) {
     use std::time::Instant;
+    let blockstore = Blockstore::open(ledger_path).unwrap();
+    let mut do_copy_and_clear = false;
+
+    // Search for shreds with incompatible version in blockstore
+    if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) {
+        for (slot, _meta) in slot_meta_iterator {
+            if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) {
+                for shred in &shreds {
+                    if shred.version() != shred_version {
+                        do_copy_and_clear = true;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    // If found, then copy shreds to another db and clear from start_slot
+    if do_copy_and_clear {
         let folder_name = format!("backup_rocksdb_{}", thread_rng().gen_range(0, 99999));
         let backup_blockstore = Blockstore::open(&ledger_path.join(folder_name));
-    if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) {
         let mut last_print = Instant::now();
         let mut copied = 0;
-        let mut end_slot = start_slot;
+        let mut last_slot = None;
+        let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot).unwrap();
         for (slot, _meta) in slot_meta_iterator {
             if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) {
                 if let Ok(ref backup_blockstore) = backup_blockstore {
                     copied += shreds.len();
                     let _ = backup_blockstore.insert_shreds(shreds, None, true);
                 }
-                end_slot = slot;
             }
             if last_print.elapsed().as_millis() > 3000 {
                 info!(
@@ -657,10 +678,13 @@ fn backup_and_clear_blockstore(
                 );
                 last_print = Instant::now();
             }
+            last_slot = Some(slot);
         }
+        let end_slot = last_slot.unwrap();
         info!("Purging slots {} to {}", start_slot, end_slot);
         blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact);
+        blockstore.purge_from_next_slots(start_slot, end_slot);
         info!("Purging done, compacting db..");
         if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
             warn!(
@@ -670,6 +694,7 @@ fn backup_and_clear_blockstore(
         }
         info!("done");
     }
+    drop(blockstore);
 }
 fn wait_for_supermajority(
@@ -677,15 +702,11 @@ fn wait_for_supermajority(
     bank: &Bank,
     cluster_info: &ClusterInfo,
     rpc_override_health_check: Arc<AtomicBool>,
-    blockstore: &Arc<Blockstore>,
-    ledger_path: &Path,
 ) {
     if config.wait_for_supermajority != Some(bank.slot()) {
         return;
     }
-    backup_and_clear_blockstore(blockstore, ledger_path, bank.slot() + 1);
-
     info!(
         "Waiting for 80% of activated stake at slot {} to be in gossip...",
         bank.slot()
@@ -960,23 +981,27 @@ mod tests {
         use solana_ledger::{blockstore, entry};
         let blockstore_path = get_tmp_ledger_path!();
         {
-            let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+            let entries = entry::create_ticks(1, 0, Hash::default());

             info!("creating shreds");
             let mut last_print = Instant::now();
             for i in 1..10 {
-                let entries = entry::create_ticks(1, 0, Hash::default());
-                let shreds = blockstore::entries_to_test_shreds(entries, i, i - 1, true, 1);
+                let shreds = blockstore::entries_to_test_shreds(entries.clone(), i, i - 1, true, 1);
                 blockstore.insert_shreds(shreds, None, true).unwrap();
                 if last_print.elapsed().as_millis() > 5000 {
                     info!("inserted {}", i);
                     last_print = Instant::now();
                 }
             }
+            drop(blockstore);

-            backup_and_clear_blockstore(&blockstore, &blockstore_path, 5);
+            backup_and_clear_blockstore(&blockstore_path, 5, 2);

-            for i in 6..10 {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+            assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
+            for i in 5..10 {
                 assert!(blockstore
                     .get_data_shreds_for_slot(i, 0)
                     .unwrap()

View File

@@ -24,6 +24,7 @@ impl Blockstore {
         let mut batch_start = from_slot;
         let mut purge_stats = PurgeStats::default();
         let mut last_datapoint = Instant::now();
+        let mut datapoint_start = batch_start;
         while batch_start < to_slot {
             let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
@@ -33,13 +34,14 @@ impl Blockstore {
             if last_datapoint.elapsed().as_millis() > 1000 {
                 datapoint_info!(
                     "blockstore-purge",
-                    ("from_slot", batch_start as i64, i64),
-                    ("to_slot", to_slot as i64, i64),
+                    ("from_slot", datapoint_start as i64, i64),
+                    ("to_slot", batch_end as i64, i64),
                     ("delete_range_us", purge_stats.delete_range as i64, i64),
                     ("write_batch_us", purge_stats.write_batch as i64, i64)
                 );
                 last_datapoint = Instant::now();
                 purge_stats = PurgeStats::default();
+                datapoint_start = batch_end;
             }
             match purge_result {