Replace channel with Mutex<Option> for AccountsPackage (#24013)

This commit is contained in:
Brooks Prumo
2022-04-06 05:47:19 -05:00
committed by GitHub
parent 07f4a9040a
commit c322842257
10 changed files with 232 additions and 118 deletions

View File

@ -5,19 +5,22 @@
// set and halt the node if a mismatch is detected.
use {
crossbeam_channel::RecvTimeoutError,
solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES},
solana_measure::measure::Measure,
solana_runtime::{
accounts_hash::{CalcAccountsHashConfig, HashStats},
snapshot_config::SnapshotConfig,
snapshot_package::{
AccountsPackage, AccountsPackageReceiver, PendingSnapshotPackage, SnapshotPackage,
AccountsPackage, PendingAccountsPackage, PendingSnapshotPackage, SnapshotPackage,
SnapshotType,
},
sorted_storages::SortedStorages,
},
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey},
solana_sdk::{
clock::{Slot, SLOT_MS},
hash::Hash,
pubkey::Pubkey,
},
std::{
collections::{HashMap, HashSet},
sync::{
@ -35,7 +38,7 @@ pub struct AccountsHashVerifier {
impl AccountsHashVerifier {
pub fn new(
accounts_package_receiver: AccountsPackageReceiver,
pending_accounts_package: PendingAccountsPackage,
pending_snapshot_package: Option<PendingSnapshotPackage>,
exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>,
@ -55,23 +58,24 @@ impl AccountsHashVerifier {
break;
}
match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(accounts_package) => {
Self::process_accounts_package(
accounts_package,
&cluster_info,
known_validators.as_ref(),
halt_on_known_validators_accounts_hash_mismatch,
pending_snapshot_package.as_ref(),
&mut hashes,
&exit,
fault_injection_rate_slots,
snapshot_config.as_ref(),
);
}
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => (),
let accounts_package = pending_accounts_package.lock().unwrap().take();
if accounts_package.is_none() {
std::thread::sleep(Duration::from_millis(SLOT_MS));
continue;
}
let accounts_package = accounts_package.unwrap();
Self::process_accounts_package(
accounts_package,
&cluster_info,
known_validators.as_ref(),
halt_on_known_validators_accounts_hash_mismatch,
pending_snapshot_package.as_ref(),
&mut hashes,
&exit,
fault_injection_rate_slots,
snapshot_config.as_ref(),
);
}
})
.unwrap();

View File

@ -48,9 +48,7 @@ use {
commitment::BlockCommitmentCache,
cost_model::CostModel,
snapshot_config::SnapshotConfig,
snapshot_package::{
AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage,
},
snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage},
transaction_cost_metrics_sender::{
TransactionCostMetricsSender, TransactionCostMetricsService,
},
@ -143,7 +141,7 @@ impl Tvu {
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
cost_model: &Arc<RwLock<CostModel>>,
accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver),
pending_accounts_package: PendingAccountsPackage,
last_full_snapshot_slot: Option<Slot>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
wait_to_vote_slot: Option<Slot>,
@ -220,9 +218,8 @@ impl Tvu {
(Some(snapshot_config), Some(pending_snapshot_package))
})
.unwrap_or((None, None));
let (accounts_package_sender, accounts_package_receiver) = accounts_package_channel;
let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_receiver,
Arc::clone(&pending_accounts_package),
pending_snapshot_package,
exit,
cluster_info,
@ -241,7 +238,7 @@ impl Tvu {
Some(SnapshotRequestHandler {
snapshot_config,
snapshot_request_receiver,
accounts_package_sender,
pending_accounts_package,
}),
)
}
@ -444,7 +441,6 @@ pub mod tests {
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let tower = Tower::default();
let accounts_package_channel = unbounded();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
let tvu = Tvu::new(
@ -492,7 +488,7 @@ pub mod tests {
TvuConfig::default(),
&Arc::new(MaxSlots::default()),
&Arc::new(RwLock::new(CostModel::default())),
accounts_package_channel,
PendingAccountsPackage::default(),
None,
None,
None,

View File

@ -80,7 +80,7 @@ use {
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_package::{AccountsPackageSender, PendingSnapshotPackage},
snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage},
snapshot_utils,
},
solana_sdk::{
@ -460,8 +460,6 @@ impl Validator {
.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
}
let accounts_package_channel = unbounded();
let accounts_update_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_accounts_update_notifier());
@ -520,6 +518,7 @@ impl Validator {
Some(poh_timing_point_sender.clone()),
);
let pending_accounts_package = PendingAccountsPackage::default();
let last_full_snapshot_slot = process_blockstore(
&blockstore,
&mut bank_forks,
@ -528,7 +527,7 @@ impl Validator {
transaction_status_sender.as_ref(),
cache_block_meta_sender.as_ref(),
config.snapshot_config.as_ref(),
accounts_package_channel.0.clone(),
Arc::clone(&pending_accounts_package),
blockstore_root_scan,
pruned_banks_receiver.clone(),
);
@ -934,7 +933,7 @@ impl Validator {
},
&max_slots,
&cost_model,
accounts_package_channel,
pending_accounts_package,
last_full_snapshot_slot,
block_metadata_notifier,
config.wait_to_vote_slot,
@ -1414,7 +1413,7 @@ fn process_blockstore(
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
pending_accounts_package: PendingAccountsPackage,
blockstore_root_scan: BlockstoreRootScan,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> Option<Slot> {
@ -1426,7 +1425,7 @@ fn process_blockstore(
transaction_status_sender,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
pending_accounts_package,
pruned_banks_receiver,
)
.unwrap_or_else(|err| {

View File

@ -69,7 +69,8 @@ mod tests {
snapshot_archive_info::FullSnapshotArchiveInfo,
snapshot_config::SnapshotConfig,
snapshot_package::{
AccountsPackage, PendingSnapshotPackage, SnapshotPackage, SnapshotType,
AccountsPackage, PendingAccountsPackage, PendingSnapshotPackage, SnapshotPackage,
SnapshotType,
},
snapshot_utils::{self, ArchiveFormat, SnapshotVersion},
status_cache::MAX_CACHE_ENTRIES,
@ -247,12 +248,11 @@ mod tests {
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let (s, snapshot_request_receiver) = unbounded();
let (accounts_package_sender, _r) = unbounded();
let request_sender = AbsRequestSender::new(Some(s));
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
accounts_package_sender,
pending_accounts_package: PendingAccountsPackage::default(),
};
for slot in 1..=last_slot {
let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot);
@ -366,8 +366,8 @@ mod tests {
.unwrap();
// Set up snapshotting channels
let (sender, receiver) = unbounded();
let (fake_sender, _fake_receiver) = unbounded();
let real_pending_accounts_package = PendingAccountsPackage::default();
let fake_pending_accounts_package = PendingAccountsPackage::default();
// Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted
// and the snapshot purging logic will run on every snapshot taken. This means the three
@ -394,21 +394,21 @@ mod tests {
bank.squash();
let accounts_hash = bank.update_accounts_hash();
let package_sender = {
let pending_accounts_package = {
if slot == saved_slot as u64 {
// Only send one package on the real sender so that the packaging service
// doesn't take forever to run the packaging logic on all MAX_CACHE_ENTRIES
// later
&sender
// Only send one package on the real pending_accounts_package so that the
// packaging service doesn't take forever to run the packaging logic on all
// MAX_CACHE_ENTRIES later
&real_pending_accounts_package
} else {
&fake_sender
&fake_pending_accounts_package
}
};
snapshot_utils::snapshot_bank(
&bank,
vec![],
package_sender,
pending_accounts_package,
bank_snapshots_dir,
snapshot_archives_dir,
snapshot_config.snapshot_version,
@ -506,15 +506,16 @@ mod tests {
let _package_receiver = std::thread::Builder::new()
.name("package-receiver".to_string())
.spawn(move || {
while let Ok(mut accounts_package) = receiver.recv() {
// Only package the latest
while let Ok(new_accounts_package) = receiver.try_recv() {
accounts_package = new_accounts_package;
}
let snapshot_package = SnapshotPackage::from(accounts_package);
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
}
let accounts_package = real_pending_accounts_package
.lock()
.unwrap()
.take()
.unwrap();
let snapshot_package = SnapshotPackage::from(accounts_package);
pending_snapshot_package
.lock()
.unwrap()
.replace(snapshot_package);
// Wait until the package is consumed by SnapshotPackagerService
while pending_snapshot_package.lock().unwrap().is_some() {
@ -526,10 +527,6 @@ mod tests {
})
.unwrap();
// Close the channel so that the package receiver will exit after reading all the
// packages off the channel
drop(sender);
// Wait for service to finish
snapshot_packager_service
.join()
@ -669,12 +666,11 @@ mod tests {
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let (accounts_package_sender, _accounts_package_receiver) = unbounded();
let request_sender = AbsRequestSender::new(Some(snapshot_request_sender));
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
accounts_package_sender,
pending_accounts_package: PendingAccountsPackage::default(),
};
let mut last_full_snapshot_slot = None;
@ -892,7 +888,7 @@ mod tests {
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let (accounts_package_sender, accounts_package_receiver) = unbounded();
let pending_accounts_package = PendingAccountsPackage::default();
let pending_snapshot_package = PendingSnapshotPackage::default();
let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks));
@ -912,7 +908,7 @@ mod tests {
let snapshot_request_handler = Some(SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
accounts_package_sender,
pending_accounts_package: Arc::clone(&pending_accounts_package),
});
let abs_request_handler = AbsRequestHandler {
snapshot_request_handler,
@ -930,7 +926,7 @@ mod tests {
);
let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_receiver,
pending_accounts_package,
Some(pending_snapshot_package),
&exit,
&cluster_info,