Cache block time in Blockstore (#11955)

* Add blockstore column to cache block times

* Add method to cache block time

* Add service to cache block time

* Update rpc getBlockTime to use new method, and refactor blockstore slightly

* Return block_time with confirmed block, if available

* Add measure and warning to cache-block-time
This commit is contained in:
Tyera Eulberg
2020-09-09 09:33:14 -06:00
committed by GitHub
parent 28f2fa3fd5
commit 05db41fe9c
9 changed files with 316 additions and 106 deletions

View File

@ -0,0 +1,76 @@
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use solana_sdk::timing::slot_duration_from_slots_per_year;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub type CacheBlockTimeReceiver = Receiver<Arc<Bank>>;
pub type CacheBlockTimeSender = Sender<Arc<Bank>>;
pub struct CacheBlockTimeService {
thread_hdl: JoinHandle<()>,
}
const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150;
impl CacheBlockTimeService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
cache_block_time_receiver: CacheBlockTimeReceiver,
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-cache-block-time".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let recv_result = cache_block_time_receiver.recv_timeout(Duration::from_secs(1));
match recv_result {
Err(RecvTimeoutError::Disconnected) => {
break;
}
Ok(bank) => {
let mut cache_block_time_timer = Measure::start("cache_block_time_timer");
Self::cache_block_time(bank, &blockstore);
cache_block_time_timer.stop();
if cache_block_time_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS {
warn!(
"cache_block_time operation took: {}ms",
cache_block_time_timer.as_ms()
);
}
}
_ => {}
}
})
.unwrap();
Self { thread_hdl }
}
fn cache_block_time(bank: Arc<Bank>, blockstore: &Arc<Blockstore>) {
let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
let epoch = bank.epoch_schedule().get_epoch(bank.slot());
let stakes = HashMap::new();
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
if let Err(e) = blockstore.cache_block_time(bank.slot(), slot_duration, stakes) {
error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e);
}
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -11,6 +11,7 @@ pub mod accounts_hash_verifier;
pub mod banking_stage;
pub mod bigtable_upload_service;
pub mod broadcast_stage;
pub mod cache_block_time_service;
pub mod cluster_info_vote_listener;
pub mod commitment_service;
pub mod completed_data_sets_service;

View File

@ -3,6 +3,7 @@
use crate::{
bank_weight_fork_choice::BankWeightForkChoice,
broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
@ -106,6 +107,7 @@ pub struct ReplayStageConfig {
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_time_sender: Option<CacheBlockTimeSender>,
}
#[derive(Default)]
@ -235,6 +237,7 @@ impl ReplayStage {
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_time_sender,
} = config;
trace!("replay stage");
@ -494,6 +497,7 @@ impl ReplayStage {
&subscriptions,
&block_commitment_cache,
&mut heaviest_subtree_fork_choice,
&cache_block_time_sender,
)?;
};
voting_time.stop();
@ -1004,6 +1008,7 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) -> Result<()> {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@ -1029,6 +1034,12 @@ impl ReplayStage {
blockstore
.set_roots(&rooted_slots)
.expect("Ledger set roots failed");
Self::cache_block_times(
blockstore,
bank_forks,
&rooted_slots,
cache_block_time_sender,
);
let highest_confirmed_root = Some(
block_commitment_cache
.read()
@ -1855,6 +1866,36 @@ impl ReplayStage {
}
}
fn cache_block_times(
blockstore: &Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>,
rooted_slots: &[Slot],
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) {
if let Some(cache_block_time_sender) = cache_block_time_sender {
for slot in rooted_slots {
if blockstore
.get_block_time(*slot)
.unwrap_or_default()
.is_none()
{
if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) {
cache_block_time_sender
.send(rooted_bank.clone())
.unwrap_or_else(|err| {
warn!("cache_block_time_sender failed: {:?}", err)
});
} else {
error!(
"rooted_bank {:?} not available in BankForks; block time not cached",
slot
);
}
}
}
}
}
pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot {
match cluster_type {
ClusterType::Development => 0,

View File

@ -54,7 +54,6 @@ use solana_sdk::{
stake_history::StakeHistory,
system_instruction,
sysvar::{stake_history, Sysvar},
timing::slot_duration_from_slots_per_year,
transaction::{self, Transaction},
};
use solana_stake_program::stake_state::StakeState;
@ -686,18 +685,7 @@ impl JsonRpcRequestProcessor {
.unwrap()
.highest_confirmed_root()
{
// This calculation currently assumes that bank.slots_per_year will remain unchanged after
// genesis (ie. that this bank's slot_per_year will be applicable to any rooted slot being
// queried). If these values will be variable in the future, those timing parameters will
// need to be stored persistently, and the slot_duration calculation will likely need to be
// moved upstream into blockstore. Also, an explicit commitment level will need to be set.
let bank = self.bank(None);
let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
let epoch = bank.epoch_schedule().get_epoch(slot);
let stakes = HashMap::new();
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
let result = self.blockstore.get_block_time(slot, slot_duration, stakes);
let result = self.blockstore.get_block_time(slot);
self.check_slot_cleaned_up(&result, slot)?;
Ok(result.ok().unwrap_or(None))
} else {
@ -2544,6 +2532,7 @@ pub mod tests {
nonce, rpc_port,
signature::{Keypair, Signer},
system_program, system_transaction,
timing::slot_duration_from_slots_per_year,
transaction::{self, TransactionError},
};
use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage};
@ -2555,7 +2544,7 @@ pub mod tests {
option::COption, solana_sdk::pubkey::Pubkey as SplTokenPubkey,
state::AccountState as TokenAccountState, state::Mint,
};
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};
const TEST_MINT_LAMPORTS: u64 = 1_000_000;
const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1;
@ -2656,6 +2645,11 @@ pub mod tests {
for root in roots.iter() {
bank_forks.write().unwrap().set_root(*root, &None, Some(0));
let mut stakes = HashMap::new();
stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default()));
blockstore
.cache_block_time(*root, Duration::from_millis(400), &stakes)
.unwrap();
}
}

View File

@ -5,6 +5,7 @@ use crate::{
accounts_background_service::AccountsBackgroundService,
accounts_hash_verifier::AccountsHashVerifier,
broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker},
cluster_slots::ClusterSlots,
@ -96,6 +97,7 @@ impl Tvu {
cfg: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
snapshot_package_sender: Option<AccountsPackageSender>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
@ -191,6 +193,7 @@ impl Tvu {
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_time_sender,
};
let replay_stage = ReplayStage::new(
@ -327,6 +330,7 @@ pub mod tests {
None,
None,
None,
None,
Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender,
verified_vote_receiver,

View File

@ -2,6 +2,7 @@
use crate::{
broadcast_stage::BroadcastStageType,
cache_block_time_service::{CacheBlockTimeSender, CacheBlockTimeService},
cluster_info::{ClusterInfo, Node},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
@ -149,6 +150,8 @@ struct TransactionHistoryServices {
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
cache_block_time_service: Option<CacheBlockTimeService>,
}
pub struct Validator {
@ -157,6 +160,7 @@ pub struct Validator {
rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_service: Option<CacheBlockTimeService>,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService,
@ -244,6 +248,8 @@ impl Validator {
transaction_status_service,
rewards_recorder_sender,
rewards_recorder_service,
cache_block_time_sender,
cache_block_time_service,
},
) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit);
@ -477,6 +483,7 @@ impl Validator {
config.enable_partition.clone(),
transaction_status_sender.clone(),
rewards_recorder_sender,
cache_block_time_sender,
snapshot_package_sender,
vote_tracker.clone(),
retransmit_slots_sender,
@ -523,6 +530,7 @@ impl Validator {
rpc_service,
transaction_status_service,
rewards_recorder_service,
cache_block_time_service,
snapshot_packager_service,
completed_data_sets_service,
tpu,
@ -587,6 +595,10 @@ impl Validator {
rewards_recorder_service.join()?;
}
if let Some(cache_block_time_service) = self.cache_block_time_service {
cache_block_time_service.join()?;
}
if let Some(s) = self.snapshot_packager_service {
s.join()?;
}
@ -772,6 +784,14 @@ fn initialize_rpc_transaction_history_services(
let rewards_recorder_sender = Some(rewards_recorder_sender);
let rewards_recorder_service = Some(RewardsRecorderService::new(
rewards_receiver,
blockstore.clone(),
exit,
));
let (cache_block_time_sender, cache_block_time_receiver) = unbounded();
let cache_block_time_sender = Some(cache_block_time_sender);
let cache_block_time_service = Some(CacheBlockTimeService::new(
cache_block_time_receiver,
blockstore,
exit,
));
@ -780,6 +800,8 @@ fn initialize_rpc_transaction_history_services(
transaction_status_service,
rewards_recorder_sender,
rewards_recorder_service,
cache_block_time_sender,
cache_block_time_service,
}
}