Move block-time caching earlier (#17109)
* Require that blockstore block-time only be recognized slot, instead of root * Move cache_block_time to after Bank freeze * Single use statement * Pass transaction_status_sender by reference * Remove unnecessary slot-existence check before caching block time altogether * Move block-time existence check into Blockstore::cache_block_time, Blockstore no longer needed in blockstore_processor helper
This commit is contained in:
@ -1,18 +1,20 @@
|
||||
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
|
||||
use solana_ledger::blockstore::Blockstore;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_runtime::bank::Bank;
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
pub use solana_ledger::blockstore_processor::CacheBlockTimeSender;
|
||||
use {
|
||||
crossbeam_channel::{Receiver, RecvTimeoutError},
|
||||
solana_ledger::blockstore::Blockstore,
|
||||
solana_measure::measure::Measure,
|
||||
solana_runtime::bank::Bank,
|
||||
std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
pub type CacheBlockTimeReceiver = Receiver<Arc<Bank>>;
|
||||
pub type CacheBlockTimeSender = Sender<Arc<Bank>>;
|
||||
|
||||
pub struct CacheBlockTimeService {
|
||||
thread_hdl: JoinHandle<()>,
|
||||
|
@ -372,7 +372,8 @@ impl ReplayStage {
|
||||
&my_pubkey,
|
||||
&vote_account,
|
||||
&mut progress,
|
||||
transaction_status_sender.clone(),
|
||||
transaction_status_sender.as_ref(),
|
||||
cache_block_time_sender.as_ref(),
|
||||
&verify_recyclers,
|
||||
&mut heaviest_subtree_fork_choice,
|
||||
&replay_vote_sender,
|
||||
@ -565,7 +566,6 @@ impl ReplayStage {
|
||||
&subscriptions,
|
||||
&block_commitment_cache,
|
||||
&mut heaviest_subtree_fork_choice,
|
||||
&cache_block_time_sender,
|
||||
&bank_notification_sender,
|
||||
&mut gossip_duplicate_confirmed_slots,
|
||||
&mut unfrozen_gossip_verified_vote_hashes,
|
||||
@ -1187,7 +1187,7 @@ impl ReplayStage {
|
||||
bank: &Arc<Bank>,
|
||||
blockstore: &Blockstore,
|
||||
bank_progress: &mut ForkProgress,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||
replay_vote_sender: &ReplayVoteSender,
|
||||
verify_recyclers: &VerifyRecyclers,
|
||||
) -> result::Result<usize, BlockstoreProcessorError> {
|
||||
@ -1294,7 +1294,6 @@ impl ReplayStage {
|
||||
subscriptions: &Arc<RpcSubscriptions>,
|
||||
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
|
||||
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
|
||||
cache_block_time_sender: &Option<CacheBlockTimeSender>,
|
||||
bank_notification_sender: &Option<BankNotificationSender>,
|
||||
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
|
||||
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
|
||||
@ -1331,12 +1330,6 @@ impl ReplayStage {
|
||||
blockstore
|
||||
.set_roots(&rooted_slots)
|
||||
.expect("Ledger set roots failed");
|
||||
Self::cache_block_times(
|
||||
blockstore,
|
||||
bank_forks,
|
||||
&rooted_slots,
|
||||
cache_block_time_sender,
|
||||
);
|
||||
let highest_confirmed_root = Some(
|
||||
block_commitment_cache
|
||||
.read()
|
||||
@ -1630,7 +1623,8 @@ impl ReplayStage {
|
||||
my_pubkey: &Pubkey,
|
||||
vote_account: &Pubkey,
|
||||
progress: &mut ProgressMap,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||
cache_block_time_sender: Option<&CacheBlockTimeSender>,
|
||||
verify_recyclers: &VerifyRecyclers,
|
||||
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
|
||||
replay_vote_sender: &ReplayVoteSender,
|
||||
@ -1694,7 +1688,7 @@ impl ReplayStage {
|
||||
&bank,
|
||||
&blockstore,
|
||||
bank_progress,
|
||||
transaction_status_sender.clone(),
|
||||
transaction_status_sender,
|
||||
replay_vote_sender,
|
||||
verify_recyclers,
|
||||
);
|
||||
@ -1729,7 +1723,7 @@ impl ReplayStage {
|
||||
);
|
||||
did_complete_bank = true;
|
||||
info!("bank frozen: {}", bank.slot());
|
||||
if let Some(transaction_status_sender) = transaction_status_sender.clone() {
|
||||
if let Some(transaction_status_sender) = transaction_status_sender {
|
||||
transaction_status_sender.send_transaction_status_freeze_message(&bank);
|
||||
}
|
||||
bank.freeze();
|
||||
@ -1755,6 +1749,7 @@ impl ReplayStage {
|
||||
.send(BankNotification::Frozen(bank.clone()))
|
||||
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
|
||||
}
|
||||
blockstore_processor::cache_block_time(&bank, cache_block_time_sender);
|
||||
|
||||
let bank_hash = bank.hash();
|
||||
if let Some(new_frozen_voters) =
|
||||
@ -2455,36 +2450,6 @@ impl ReplayStage {
|
||||
}
|
||||
}
|
||||
|
||||
fn cache_block_times(
|
||||
blockstore: &Arc<Blockstore>,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
rooted_slots: &[Slot],
|
||||
cache_block_time_sender: &Option<CacheBlockTimeSender>,
|
||||
) {
|
||||
if let Some(cache_block_time_sender) = cache_block_time_sender {
|
||||
for slot in rooted_slots {
|
||||
if blockstore
|
||||
.get_block_time(*slot)
|
||||
.unwrap_or_default()
|
||||
.is_none()
|
||||
{
|
||||
if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) {
|
||||
cache_block_time_sender
|
||||
.send(rooted_bank.clone())
|
||||
.unwrap_or_else(|err| {
|
||||
warn!("cache_block_time_sender failed: {:?}", err)
|
||||
});
|
||||
} else {
|
||||
error!(
|
||||
"rooted_bank {:?} not available in BankForks; block time not cached",
|
||||
slot
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot {
|
||||
match cluster_type {
|
||||
ClusterType::Development => 0,
|
||||
@ -3381,7 +3346,7 @@ pub(crate) mod tests {
|
||||
&bank,
|
||||
&mut entries,
|
||||
true,
|
||||
Some(TransactionStatusSender {
|
||||
Some(&TransactionStatusSender {
|
||||
sender: transaction_status_sender,
|
||||
enable_cpi_and_log_storage: false,
|
||||
}),
|
||||
|
@ -693,7 +693,7 @@ mod tests {
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (bank_forks, cached_leader_schedule) =
|
||||
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap();
|
||||
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap();
|
||||
let leader_schedule_cache = Arc::new(cached_leader_schedule);
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
|
||||
|
@ -1119,7 +1119,10 @@ fn new_banks_from_ledger(
|
||||
process_options,
|
||||
transaction_history_services
|
||||
.transaction_status_sender
|
||||
.clone(),
|
||||
.as_ref(),
|
||||
transaction_history_services
|
||||
.cache_block_time_sender
|
||||
.as_ref(),
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Failed to load ledger: {:?}", err);
|
||||
|
Reference in New Issue
Block a user