SnapshotPackagerService pushes incremental snapshot hashes to CRDS (#20442)

Now that CRDS supports incremental snapshot hashes,
SnapshotPackagerService needs to push 'em!

This commit does two main things:

1. SnapshotPackagerService now knows about incremental snapshot hashes,
   and will push SnapshotPackage::IncrementalSnapshot hashes to CRDS.
2. At startup, when loading from a full + incremental snapshot, the
   hashes need to be passed all the way to SnapshotPackagerService so it
   can push these starting hashes to CRDS.  Those values have been piped
   through (a sketch of the hash types involved follows below).

Fixes #20441 and #20423
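
The hash-container types used throughout this diff are small, and their
shapes can be read off from how the code below constructs them. A minimal
sketch (field names as used in the diff; the exact definitions and derives
in solana_runtime::snapshot_hash are assumed):

    // Sketch of the snapshot_hash types, inferred from their uses below.
    use solana_sdk::{clock::Slot, hash::Hash};

    /// Hash of one full snapshot, as a (slot, hash) pair.
    pub struct FullSnapshotHash {
        pub hash: (Slot, Hash),
    }

    /// Hash of one incremental snapshot, plus the full snapshot it builds on.
    pub struct IncrementalSnapshotHash {
        pub base: (Slot, Hash),
        pub hash: (Slot, Hash),
    }

    /// Rolling window of full snapshot hashes pushed to CRDS.
    #[derive(Default)]
    pub struct FullSnapshotHashes {
        pub hashes: Vec<(Slot, Hash)>,
    }

    /// Rolling window of incremental snapshot hashes, all sharing one base.
    #[derive(Default)]
    pub struct IncrementalSnapshotHashes {
        pub base: (Slot, Hash),
        pub hashes: Vec<(Slot, Hash)>,
    }

    /// Hashes known at startup, recovered from the snapshot archives the
    /// validator loaded from; pushed to CRDS before any new snapshots land.
    pub struct StartingSnapshotHashes {
        pub full: FullSnapshotHash,
        pub incremental: Option<IncrementalSnapshotHash>,
    }
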
Author:    Brooks Prumo
Date:      2021-10-08 15:14:56 -05:00
Committer: GitHub
Parent:    675fa6993b
Commit:    5440c1d2e1
5 changed files with 234 additions and 56 deletions

core/src/snapshot_packager_service.rs

@@ -1,7 +1,13 @@
use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_gossip::cluster_info::{
ClusterInfo, MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_SNAPSHOT_HASHES,
};
use solana_runtime::{
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_hash::{
FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash, IncrementalSnapshotHashes,
StartingSnapshotHashes,
},
snapshot_package::{PendingSnapshotPackage, SnapshotType},
snapshot_utils,
};
@@ -22,26 +28,34 @@ pub struct SnapshotPackagerService {
impl SnapshotPackagerService {
pub fn new(
pending_snapshot_package: PendingSnapshotPackage,
starting_snapshot_hash: Option<(Slot, Hash)>,
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>,
snapshot_config: SnapshotConfig,
) -> Self {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
let max_snapshot_hashes = std::cmp::min(
let max_full_snapshot_hashes = std::cmp::min(
MAX_SNAPSHOT_HASHES,
snapshot_config.maximum_full_snapshot_archives_to_retain,
);
let max_incremental_snapshot_hashes = std::cmp::min(
MAX_INCREMENTAL_SNAPSHOT_HASHES,
snapshot_config.maximum_incremental_snapshot_archives_to_retain,
);
let t_snapshot_packager = Builder::new()
.name("snapshot-packager".to_string())
.spawn(move || {
let mut hashes = vec![];
if let Some(starting_snapshot_hash) = starting_snapshot_hash {
hashes.push(starting_snapshot_hash);
}
cluster_info.push_snapshot_hashes(hashes.clone());
let mut snapshot_gossip_manager = SnapshotGossipManager {
cluster_info,
max_full_snapshot_hashes,
max_incremental_snapshot_hashes,
full_snapshot_hashes: FullSnapshotHashes::default(),
incremental_snapshot_hashes: IncrementalSnapshotHashes::default(),
};
snapshot_gossip_manager.push_starting_snapshot_hashes(starting_snapshot_hashes);
loop {
if exit.load(Ordering::Relaxed) {
break;
@@ -64,15 +78,10 @@ impl SnapshotPackagerService {
)
.expect("failed to archive snapshot package");
// NOTE: For backwards compatibility with version <=1.7, only _full_ snapshots
// can have their hashes pushed out to the cluster.
if snapshot_package.snapshot_type == SnapshotType::FullSnapshot {
hashes.push((snapshot_package.slot(), *snapshot_package.hash()));
while hashes.len() > max_snapshot_hashes {
hashes.remove(0);
}
cluster_info.push_snapshot_hashes(hashes.clone());
}
snapshot_gossip_manager.push_snapshot_hash(
snapshot_package.snapshot_type,
(snapshot_package.slot(), *snapshot_package.hash()),
);
}
})
.unwrap();
@@ -87,6 +96,105 @@ impl SnapshotPackagerService {
}
}
struct SnapshotGossipManager {
cluster_info: Arc<ClusterInfo>,
max_full_snapshot_hashes: usize,
max_incremental_snapshot_hashes: usize,
full_snapshot_hashes: FullSnapshotHashes,
incremental_snapshot_hashes: IncrementalSnapshotHashes,
}
impl SnapshotGossipManager {
/// If there were starting snapshot hashes, add those to their respective vectors, then push
/// those vectors to the cluster via CRDS.
fn push_starting_snapshot_hashes(
&mut self,
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
) {
if let Some(starting_snapshot_hashes) = starting_snapshot_hashes {
let starting_full_snapshot_hash = starting_snapshot_hashes.full;
self.push_full_snapshot_hash(starting_full_snapshot_hash);
if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
self.push_incremental_snapshot_hash(starting_incremental_snapshot_hash);
};
}
}
/// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the
/// cluster via CRDS.
fn push_snapshot_hash(&mut self, snapshot_type: SnapshotType, snapshot_hash: (Slot, Hash)) {
match snapshot_type {
SnapshotType::FullSnapshot => {
self.push_full_snapshot_hash(FullSnapshotHash {
hash: snapshot_hash,
});
}
SnapshotType::IncrementalSnapshot(base_slot) => {
let latest_full_snapshot_hash = *self.full_snapshot_hashes.hashes.last().unwrap();
assert_eq!(
base_slot, latest_full_snapshot_hash.0,
"the incremental snapshot's base slot ({}) must match the latest full snapshot hash's slot ({})",
base_slot, latest_full_snapshot_hash.0,
);
self.push_incremental_snapshot_hash(IncrementalSnapshotHash {
base: latest_full_snapshot_hash,
hash: snapshot_hash,
});
}
}
}
/// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to
/// the cluster via CRDS.
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
self.full_snapshot_hashes
.hashes
.push(full_snapshot_hash.hash);
while self.full_snapshot_hashes.hashes.len() > self.max_full_snapshot_hashes {
self.full_snapshot_hashes.hashes.remove(0);
}
self.cluster_info
.push_snapshot_hashes(self.full_snapshot_hashes.hashes.clone());
}
/// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push
/// that vector to the cluster via CRDS.
fn push_incremental_snapshot_hash(
&mut self,
incremental_snapshot_hash: IncrementalSnapshotHash,
) {
// If the base snapshot hash is different from the one in IncrementalSnapshotHashes, then
// that means the old incremental snapshot hashes are no longer valid, so clear them all
// out.
if incremental_snapshot_hash.base != self.incremental_snapshot_hashes.base {
self.incremental_snapshot_hashes.hashes.clear();
self.incremental_snapshot_hashes.base = incremental_snapshot_hash.base;
}
self.incremental_snapshot_hashes
.hashes
.push(incremental_snapshot_hash.hash);
while self.incremental_snapshot_hashes.hashes.len() > self.max_incremental_snapshot_hashes {
self.incremental_snapshot_hashes.hashes.remove(0);
}
// Pushing incremental snapshot hashes to the cluster should never fail. The only error
// case is when the length of the hashes is too big, but we account for that with
// `max_incremental_snapshot_hashes`. If this call ever does error, it's a programmer bug!
// Check to see what changed in `push_incremental_snapshot_hashes()` and handle the new
// error condition here.
self.cluster_info
.push_incremental_snapshot_hashes(
self.incremental_snapshot_hashes.base,
self.incremental_snapshot_hashes.hashes.clone(),
)
.expect(
"Bug! The programmer contract has changed for push_incremental_snapshot_hashes() \
and a new error case has been added, which has not been handled here.",
);
}
}
#[cfg(test)]
mod tests {
use super::*;

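The subtle part of the new SnapshotGossipManager is the rebase-and-evict
bookkeeping in push_incremental_snapshot_hash(): a new base (i.e. a new full
snapshot) invalidates every previously pushed incremental hash, and the
window is capped so the CRDS payload stays bounded. A self-contained sketch
of just that logic, with (Slot, Hash) simplified to integer pairs so it runs
without the solana crates (all names here are illustrative):

    struct IncrementalHashes {
        base: (u64, u64),
        hashes: Vec<(u64, u64)>,
    }

    fn push_incremental(
        state: &mut IncrementalHashes,
        base: (u64, u64),
        hash: (u64, u64),
        max: usize,
    ) {
        // A different base means a newer full snapshot exists: the old
        // incremental hashes no longer apply, so drop them and rebase.
        if base != state.base {
            state.hashes.clear();
            state.base = base;
        }
        state.hashes.push(hash);
        // Evict the oldest entries to keep the pushed vector bounded.
        while state.hashes.len() > max {
            state.hashes.remove(0);
        }
    }

    fn main() {
        let mut state = IncrementalHashes { base: (0, 0), hashes: vec![] };
        push_incremental(&mut state, (100, 0xAA), (150, 1), 2);
        push_incremental(&mut state, (100, 0xAA), (200, 2), 2);
        push_incremental(&mut state, (100, 0xAA), (250, 3), 2); // evicts (150, 1)
        assert_eq!(state.hashes, vec![(200, 2), (250, 3)]);
        push_incremental(&mut state, (300, 0xBB), (350, 4), 2); // rebases, clears
        assert_eq!(state.hashes, vec![(350, 4)]);
        assert_eq!(state.base, (300, 0xBB));
    }
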
core/src/validator.rs

@@ -71,6 +71,7 @@ use {
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_package::{AccountsPackageSender, PendingSnapshotPackage},
snapshot_utils,
},
@@ -417,7 +418,7 @@ impl Validator {
completed_slots_receiver,
leader_schedule_cache,
last_full_snapshot_slot,
snapshot_hash,
starting_snapshot_hashes,
TransactionHistoryServices {
transaction_status_sender,
transaction_status_service,
@@ -695,7 +696,7 @@ impl Validator {
let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(),
snapshot_hash,
starting_snapshot_hashes,
&exit,
&cluster_info,
snapshot_config.clone(),
@@ -1149,7 +1150,7 @@ fn new_banks_from_ledger(
CompletedSlotsReceiver,
LeaderScheduleCache,
Option<Slot>,
Option<(Slot, Hash)>,
Option<StartingSnapshotHashes>,
TransactionHistoryServices,
Tower,
) {
@@ -1244,27 +1245,31 @@ fn new_banks_from_ledger(
TransactionHistoryServices::default()
};
let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot, snapshot_hash) =
bank_forks_utils::load(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
process_options,
transaction_history_services
.transaction_status_sender
.as_ref(),
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_package_sender,
accounts_update_notifier,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});
let (
mut bank_forks,
mut leader_schedule_cache,
last_full_snapshot_slot,
starting_snapshot_hashes,
) = bank_forks_utils::load(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
process_options,
transaction_history_services
.transaction_status_sender
.as_ref(),
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_package_sender,
accounts_update_notifier,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});
if let Some(warp_slot) = config.warp_slot {
let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| {
@@ -1344,7 +1349,7 @@ fn new_banks_from_ledger(
completed_slots_receiver,
leader_schedule_cache,
last_full_snapshot_slot,
snapshot_hash,
starting_snapshot_hashes,
transaction_history_services,
tower,
)
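
The plumbing above terminates at bank_forks_utils::load(), which now hands
back starting_snapshot_hashes alongside the bank forks. As a hedged sketch
of how such a value could be assembled after loading from a full + optional
incremental archive (the ArchiveInfo types and the slot()/hash() accessors
from SnapshotArchiveInfoGetter exist in snapshot_archive_info, but this
helper itself is illustrative, not the commit's exact code):

    use solana_runtime::{
        snapshot_archive_info::{
            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo,
            SnapshotArchiveInfoGetter,
        },
        snapshot_hash::{FullSnapshotHash, IncrementalSnapshotHash, StartingSnapshotHashes},
    };

    /// Build the hashes that SnapshotPackagerService pushes at startup. The
    /// incremental entry is present only if an incremental archive was
    /// loaded, and it is based on the full snapshot's (slot, hash).
    fn starting_snapshot_hashes(
        full: &FullSnapshotArchiveInfo,
        incremental: Option<&IncrementalSnapshotArchiveInfo>,
    ) -> StartingSnapshotHashes {
        let full_hash = FullSnapshotHash {
            hash: (full.slot(), *full.hash()),
        };
        let incremental_hash = incremental.map(|archive| IncrementalSnapshotHash {
            base: full_hash.hash,
            hash: (archive.slot(), *archive.hash()),
        });
        StartingSnapshotHashes {
            full: full_hash,
            incremental: incremental_hash,
        }
    }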