Avoid tmp snapshot backlog in SnapshotPackagerService under high load (#14516)

This commit is contained in:
Michael Vines
2021-01-11 10:21:15 -08:00
parent b15603b4eb
commit f46ad1b7b7
6 changed files with 95 additions and 56 deletions

View File

@ -4,10 +4,11 @@
// hash on gossip. Monitor gossip for messages from validators in the --trusted-validators // hash on gossip. Monitor gossip for messages from validators in the --trusted-validators
// set and halt the node if a mismatch is detected. // set and halt the node if a mismatch is detected.
use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; use crate::{
use solana_runtime::snapshot_package::{ cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES},
AccountsPackage, AccountsPackageReceiver, AccountsPackageSender, snapshot_packager_service::PendingSnapshotPackage,
}; };
use solana_runtime::snapshot_package::{AccountsPackage, AccountsPackageReceiver};
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::{ use std::{
@ -27,7 +28,7 @@ pub struct AccountsHashVerifier {
impl AccountsHashVerifier { impl AccountsHashVerifier {
pub fn new( pub fn new(
accounts_package_receiver: AccountsPackageReceiver, accounts_package_receiver: AccountsPackageReceiver,
accounts_package_sender: Option<AccountsPackageSender>, pending_snapshot_package: Option<PendingSnapshotPackage>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>, cluster_info: &Arc<ClusterInfo>,
trusted_validators: Option<HashSet<Pubkey>>, trusted_validators: Option<HashSet<Pubkey>>,
@ -53,7 +54,7 @@ impl AccountsHashVerifier {
&cluster_info, &cluster_info,
&trusted_validators, &trusted_validators,
halt_on_trusted_validators_accounts_hash_mismatch, halt_on_trusted_validators_accounts_hash_mismatch,
&accounts_package_sender, &pending_snapshot_package,
&mut hashes, &mut hashes,
&exit, &exit,
fault_injection_rate_slots, fault_injection_rate_slots,
@ -76,7 +77,7 @@ impl AccountsHashVerifier {
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
trusted_validators: &Option<HashSet<Pubkey>>, trusted_validators: &Option<HashSet<Pubkey>>,
halt_on_trusted_validator_accounts_hash_mismatch: bool, halt_on_trusted_validator_accounts_hash_mismatch: bool,
accounts_package_sender: &Option<AccountsPackageSender>, pending_snapshot_package: &Option<PendingSnapshotPackage>,
hashes: &mut Vec<(Slot, Hash)>, hashes: &mut Vec<(Slot, Hash)>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
fault_injection_rate_slots: u64, fault_injection_rate_slots: u64,
@ -111,8 +112,8 @@ impl AccountsHashVerifier {
} }
if accounts_package.block_height % snapshot_interval_slots == 0 { if accounts_package.block_height % snapshot_interval_slots == 0 {
if let Some(sender) = accounts_package_sender.as_ref() { if let Some(pending_snapshot_package) = pending_snapshot_package.as_ref() {
if sender.send(accounts_package).is_err() {} *pending_snapshot_package.lock().unwrap() = Some(accounts_package);
} }
} }

View File

@ -1,23 +1,24 @@
use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_runtime::{snapshot_package::AccountsPackageReceiver, snapshot_utils}; use solana_runtime::{snapshot_package::AccountsPackage, snapshot_utils};
use solana_sdk::{clock::Slot, hash::Hash}; use solana_sdk::{clock::Slot, hash::Hash};
use std::{ use std::{
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
mpsc::RecvTimeoutError, Arc, Mutex,
Arc,
}, },
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration, time::Duration,
}; };
pub type PendingSnapshotPackage = Arc<Mutex<Option<AccountsPackage>>>;
pub struct SnapshotPackagerService { pub struct SnapshotPackagerService {
t_snapshot_packager: JoinHandle<()>, t_snapshot_packager: JoinHandle<()>,
} }
impl SnapshotPackagerService { impl SnapshotPackagerService {
pub fn new( pub fn new(
snapshot_package_receiver: AccountsPackageReceiver, pending_snapshot_package: PendingSnapshotPackage,
starting_snapshot_hash: Option<(Slot, Hash)>, starting_snapshot_hash: Option<(Slot, Hash)>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>, cluster_info: &Arc<ClusterInfo>,
@ -26,7 +27,7 @@ impl SnapshotPackagerService {
let cluster_info = cluster_info.clone(); let cluster_info = cluster_info.clone();
let t_snapshot_packager = Builder::new() let t_snapshot_packager = Builder::new()
.name("solana-snapshot-packager".to_string()) .name("snapshot-packager".to_string())
.spawn(move || { .spawn(move || {
let mut hashes = vec![]; let mut hashes = vec![];
if let Some(starting_snapshot_hash) = starting_snapshot_hash { if let Some(starting_snapshot_hash) = starting_snapshot_hash {
@ -38,14 +39,8 @@ impl SnapshotPackagerService {
break; break;
} }
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) { let snapshot_package = pending_snapshot_package.lock().unwrap().take();
Ok(mut snapshot_package) => { if let Some(snapshot_package) = snapshot_package {
// Only package the latest
while let Ok(new_snapshot_package) =
snapshot_package_receiver.try_recv()
{
snapshot_package = new_snapshot_package;
}
if let Err(err) = if let Err(err) =
snapshot_utils::archive_snapshot_package(&snapshot_package) snapshot_utils::archive_snapshot_package(&snapshot_package)
{ {
@ -57,13 +52,13 @@ impl SnapshotPackagerService {
} }
cluster_info.push_snapshot_hashes(hashes.clone()); cluster_info.push_snapshot_hashes(hashes.clone());
} }
} } else {
Err(RecvTimeoutError::Disconnected) => break, std::thread::sleep(Duration::from_millis(100));
Err(RecvTimeoutError::Timeout) => (),
} }
} }
}) })
.unwrap(); .unwrap();
Self { Self {
t_snapshot_packager, t_snapshot_packager,
} }

View File

@ -20,6 +20,7 @@ use crate::{
shred_fetch_stage::ShredFetchStage, shred_fetch_stage::ShredFetchStage,
sigverify_shreds::ShredSigVerifier, sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage, sigverify_stage::SigVerifyStage,
snapshot_packager_service::PendingSnapshotPackage,
}; };
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use solana_ledger::{ use solana_ledger::{
@ -34,7 +35,6 @@ use solana_runtime::{
}, },
bank_forks::{BankForks, SnapshotConfig}, bank_forks::{BankForks, SnapshotConfig},
commitment::BlockCommitmentCache, commitment::BlockCommitmentCache,
snapshot_package::AccountsPackageSender,
vote_sender_types::ReplayVoteSender, vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{ use solana_sdk::{
@ -107,7 +107,7 @@ impl Tvu {
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>, rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_time_sender: Option<CacheBlockTimeSender>, cache_block_time_sender: Option<CacheBlockTimeSender>,
snapshot_config_and_package_sender: Option<(SnapshotConfig, AccountsPackageSender)>, snapshot_config_and_pending_package: Option<(SnapshotConfig, PendingSnapshotPackage)>,
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
@ -179,15 +179,15 @@ impl Tvu {
} }
}; };
info!("snapshot_interval_slots: {}", snapshot_interval_slots); info!("snapshot_interval_slots: {}", snapshot_interval_slots);
let (snapshot_config, accounts_package_sender) = snapshot_config_and_package_sender let (snapshot_config, pending_snapshot_package) = snapshot_config_and_pending_package
.map(|(snapshot_config, accounts_package_sender)| { .map(|(snapshot_config, pending_snapshot_package)| {
(Some(snapshot_config), Some(accounts_package_sender)) (Some(snapshot_config), Some(pending_snapshot_package))
}) })
.unwrap_or((None, None)); .unwrap_or((None, None));
let (accounts_hash_sender, accounts_hash_receiver) = channel(); let (accounts_hash_sender, accounts_hash_receiver) = channel();
let accounts_hash_verifier = AccountsHashVerifier::new( let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_hash_receiver, accounts_hash_receiver,
accounts_package_sender, pending_snapshot_package,
exit, exit,
&cluster_info, &cluster_info,
tvu_config.trusted_validators.clone(), tvu_config.trusted_validators.clone(),

View File

@ -26,7 +26,7 @@ use crate::{
serve_repair::ServeRepair, serve_repair::ServeRepair,
serve_repair_service::ServeRepairService, serve_repair_service::ServeRepairService,
sigverify, sigverify,
snapshot_packager_service::SnapshotPackagerService, snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService},
tpu::Tpu, tpu::Tpu,
transaction_status_service::TransactionStatusService, transaction_status_service::TransactionStatusService,
tvu::{Sockets, Tvu, TvuConfig}, tvu::{Sockets, Tvu, TvuConfig},
@ -70,7 +70,7 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering}, sync::atomic::{AtomicBool, Ordering},
sync::mpsc::Receiver, sync::mpsc::Receiver,
sync::{mpsc::channel, Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
thread::sleep, thread::sleep,
time::Duration, time::Duration,
}; };
@ -518,7 +518,7 @@ impl Validator {
&exit, &exit,
); );
let (snapshot_packager_service, snapshot_config_and_package_sender) = let (snapshot_packager_service, snapshot_config_and_pending_package) =
if let Some(snapshot_config) = config.snapshot_config.clone() { if let Some(snapshot_config) = config.snapshot_config.clone() {
if is_snapshot_config_invalid( if is_snapshot_config_invalid(
snapshot_config.snapshot_interval_slots, snapshot_config.snapshot_interval_slots,
@ -528,12 +528,17 @@ impl Validator {
} }
// Start a snapshot packaging service // Start a snapshot packaging service
let (sender, receiver) = channel(); let pending_snapshot_package = PendingSnapshotPackage::default();
let snapshot_packager_service =
SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info); let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(),
snapshot_hash,
&exit,
&cluster_info,
);
( (
Some(snapshot_packager_service), Some(snapshot_packager_service),
Some((snapshot_config, sender)), Some((snapshot_config, pending_snapshot_package)),
) )
} else { } else {
(None, None) (None, None)
@ -609,7 +614,7 @@ impl Validator {
transaction_status_sender.clone(), transaction_status_sender.clone(),
rewards_recorder_sender, rewards_recorder_sender,
cache_block_time_sender, cache_block_time_sender,
snapshot_config_and_package_sender, snapshot_config_and_pending_package,
vote_tracker.clone(), vote_tracker.clone(),
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,

View File

@ -39,8 +39,9 @@ mod tests {
use fs_extra::dir::CopyOptions; use fs_extra::dir::CopyOptions;
use itertools::Itertools; use itertools::Itertools;
use solana_core::{ use solana_core::{
cluster_info::ClusterInfo, contact_info::ContactInfo, cluster_info::ClusterInfo,
snapshot_packager_service::SnapshotPackagerService, contact_info::ContactInfo,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService},
}; };
use solana_runtime::{ use solana_runtime::{
accounts_background_service::{ABSRequestSender, SnapshotRequestHandler}, accounts_background_service::{ABSRequestSender, SnapshotRequestHandler},
@ -60,8 +61,15 @@ mod tests {
system_transaction, system_transaction,
}; };
use std::{ use std::{
collections::HashSet, fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, collections::HashSet,
sync::Arc, fs,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
time::Duration,
}; };
use tempfile::TempDir; use tempfile::TempDir;
@ -398,10 +406,37 @@ mod tests {
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));
let snapshot_packager_service = let pending_snapshot_package = PendingSnapshotPackage::default();
SnapshotPackagerService::new(receiver, None, &exit, &cluster_info); let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(),
None,
&exit,
&cluster_info,
);
// Close the channel so that the package service will exit after reading all the let _package_receiver = std::thread::Builder::new()
.name("package-receiver".to_string())
.spawn(move || {
while let Ok(mut snapshot_package) = receiver.recv() {
// Only package the latest
while let Ok(new_snapshot_package) = receiver.try_recv() {
snapshot_package = new_snapshot_package;
}
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
}
// Wait until the package is consumed by SnapshotPackagerService
while pending_snapshot_package.lock().unwrap().is_some() {
std::thread::sleep(Duration::from_millis(100));
}
// Shutdown SnapshotPackagerService
exit.store(true, Ordering::Relaxed);
})
.unwrap();
// Close the channel so that the package receiver will exit after reading all the
// packages off the channel // packages off the channel
drop(sender); drop(sender);

View File

@ -163,7 +163,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
) -> Result<AccountsPackage> { ) -> Result<AccountsPackage> {
// Hard link all the snapshots we need for this package // Hard link all the snapshots we need for this package
let snapshot_hard_links_dir = tempfile::Builder::new() let snapshot_hard_links_dir = tempfile::Builder::new()
.prefix(TMP_SNAPSHOT_DIR_PREFIX) .prefix(&format!("{}{}-", TMP_SNAPSHOT_DIR_PREFIX, bank.slot()))
.tempdir_in(snapshot_path)?; .tempdir_in(snapshot_path)?;
// Create a snapshot package // Create a snapshot package
@ -251,7 +251,10 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
// Create the staging directories // Create the staging directories
let staging_dir = tempfile::Builder::new() let staging_dir = tempfile::Builder::new()
.prefix(TMP_SNAPSHOT_DIR_PREFIX) .prefix(&format!(
"{}{}-",
TMP_SNAPSHOT_DIR_PREFIX, snapshot_package.slot
))
.tempdir_in(tar_dir)?; .tempdir_in(tar_dir)?;
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR); let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);