v1.3: Backport block time updates (#12423)

* Submit a vote timestamp every vote (#10630)

* Submit a timestamp for every vote

* Submit at most one vote timestamp per second

* Submit a timestamp for every new vote

Co-authored-by: Tyera Eulberg <tyera@solana.com>

* Timestamp first vote (#11856)

* Cache block time in Blockstore (#11955)

* Add blockstore column to cache block times

* Add method to cache block time

* Add service to cache block time

* Update rpc getBlockTime to use new method, and refactor blockstore slightly

* Return block_time with confirmed block, if available

* Add measure and warning to cache-block-time

Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
Tyera Eulberg
2020-09-23 13:54:49 -06:00
committed by GitHub
parent 0f3a555af5
commit 35a1ab981c
12 changed files with 308 additions and 146 deletions

View File

@ -0,0 +1,76 @@
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use solana_sdk::timing::slot_duration_from_slots_per_year;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub type CacheBlockTimeReceiver = Receiver<Arc<Bank>>;
pub type CacheBlockTimeSender = Sender<Arc<Bank>>;
pub struct CacheBlockTimeService {
thread_hdl: JoinHandle<()>,
}
const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150;
impl CacheBlockTimeService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
cache_block_time_receiver: CacheBlockTimeReceiver,
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-cache-block-time".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let recv_result = cache_block_time_receiver.recv_timeout(Duration::from_secs(1));
match recv_result {
Err(RecvTimeoutError::Disconnected) => {
break;
}
Ok(bank) => {
let mut cache_block_time_timer = Measure::start("cache_block_time_timer");
Self::cache_block_time(bank, &blockstore);
cache_block_time_timer.stop();
if cache_block_time_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS {
warn!(
"cache_block_time operation took: {}ms",
cache_block_time_timer.as_ms()
);
}
}
_ => {}
}
})
.unwrap();
Self { thread_hdl }
}
fn cache_block_time(bank: Arc<Bank>, blockstore: &Arc<Blockstore>) {
let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
let epoch = bank.epoch_schedule().get_epoch(bank.slot());
let stakes = HashMap::new();
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
if let Err(e) = blockstore.cache_block_time(bank.slot(), slot_duration, stakes) {
error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e);
}
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -13,9 +13,7 @@ use solana_sdk::{
}; };
use solana_vote_program::{ use solana_vote_program::{
vote_instruction, vote_instruction,
vote_state::{ vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY},
BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL,
},
}; };
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
@ -345,6 +343,22 @@ impl Tower {
last_vote last_vote
} }
fn maybe_timestamp(&mut self, current_slot: Slot) -> Option<UnixTimestamp> {
if current_slot > self.last_timestamp.slot
|| self.last_timestamp.slot == 0 && current_slot == self.last_timestamp.slot
{
let timestamp = Utc::now().timestamp();
if timestamp >= self.last_timestamp.timestamp {
self.last_timestamp = BlockTimestamp {
slot: current_slot,
timestamp,
};
return Some(timestamp);
}
}
None
}
pub fn root(&self) -> Option<Slot> { pub fn root(&self) -> Option<Slot> {
self.lockouts.root_slot self.lockouts.root_slot
} }
@ -653,21 +667,6 @@ impl Tower {
); );
} }
} }
fn maybe_timestamp(&mut self, current_slot: Slot) -> Option<UnixTimestamp> {
if self.last_timestamp.slot == 0
|| self.last_timestamp.slot < (current_slot - (current_slot % TIMESTAMP_SLOT_INTERVAL))
{
let timestamp = Utc::now().timestamp();
self.last_timestamp = BlockTimestamp {
slot: current_slot,
timestamp,
};
Some(timestamp)
} else {
None
}
}
} }
#[cfg(test)] #[cfg(test)]
@ -694,12 +693,7 @@ pub mod test {
vote_state::{Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY}, vote_state::{Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY},
vote_transaction, vote_transaction,
}; };
use std::{ use std::{collections::HashMap, rc::Rc, sync::RwLock};
collections::HashMap,
rc::Rc,
sync::RwLock,
{thread::sleep, time::Duration},
};
use trees::{tr, Tree, TreeWalk}; use trees::{tr, Tree, TreeWalk};
pub(crate) struct VoteSimulator { pub(crate) struct VoteSimulator {
@ -1832,17 +1826,15 @@ pub mod test {
#[test] #[test]
fn test_maybe_timestamp() { fn test_maybe_timestamp() {
let mut tower = Tower::default(); let mut tower = Tower::default();
assert!(tower.maybe_timestamp(TIMESTAMP_SLOT_INTERVAL).is_some()); assert!(tower.maybe_timestamp(0).is_some());
let BlockTimestamp { slot, timestamp } = tower.last_timestamp; assert!(tower.maybe_timestamp(1).is_some());
assert!(tower.maybe_timestamp(0).is_none()); // Refuse to timestamp an older slot
assert!(tower.maybe_timestamp(1).is_none()); // Refuse to timestamp the same slot twice
assert_eq!(tower.maybe_timestamp(1), None); tower.last_timestamp.timestamp -= 1; // Move last_timestamp into the past
assert_eq!(tower.maybe_timestamp(slot), None); assert!(tower.maybe_timestamp(2).is_some()); // slot 2 gets a timestamp
assert_eq!(tower.maybe_timestamp(slot + 1), None);
sleep(Duration::from_secs(1)); tower.last_timestamp.timestamp += 1_000_000; // Move last_timestamp well into the future
assert!(tower assert!(tower.maybe_timestamp(3).is_none()); // slot 3 gets no timestamp
.maybe_timestamp(slot + TIMESTAMP_SLOT_INTERVAL + 1)
.is_some());
assert!(tower.last_timestamp.timestamp > timestamp);
} }
} }

View File

@ -11,6 +11,7 @@ pub mod accounts_hash_verifier;
pub mod banking_stage; pub mod banking_stage;
pub mod bigtable_upload_service; pub mod bigtable_upload_service;
pub mod broadcast_stage; pub mod broadcast_stage;
pub mod cache_block_time_service;
pub mod cluster_info_vote_listener; pub mod cluster_info_vote_listener;
pub mod commitment_service; pub mod commitment_service;
pub mod completed_data_sets_service; pub mod completed_data_sets_service;

View File

@ -3,6 +3,7 @@
use crate::{ use crate::{
bank_weight_fork_choice::BankWeightForkChoice, bank_weight_fork_choice::BankWeightForkChoice,
broadcast_stage::RetransmitSlotsSender, broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker, cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
@ -106,6 +107,7 @@ pub struct ReplayStageConfig {
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>, pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>, pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_time_sender: Option<CacheBlockTimeSender>,
} }
#[derive(Default)] #[derive(Default)]
@ -235,6 +237,7 @@ impl ReplayStage {
block_commitment_cache, block_commitment_cache,
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
cache_block_time_sender,
} = config; } = config;
trace!("replay stage"); trace!("replay stage");
@ -494,6 +497,7 @@ impl ReplayStage {
&subscriptions, &subscriptions,
&block_commitment_cache, &block_commitment_cache,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&cache_block_time_sender,
)?; )?;
}; };
voting_time.stop(); voting_time.stop();
@ -1004,6 +1008,7 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) -> Result<()> { ) -> Result<()> {
if bank.is_empty() { if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1); inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@ -1029,6 +1034,12 @@ impl ReplayStage {
blockstore blockstore
.set_roots(&rooted_slots) .set_roots(&rooted_slots)
.expect("Ledger set roots failed"); .expect("Ledger set roots failed");
Self::cache_block_times(
blockstore,
bank_forks,
&rooted_slots,
cache_block_time_sender,
);
let highest_confirmed_root = Some( let highest_confirmed_root = Some(
block_commitment_cache block_commitment_cache
.read() .read()
@ -1855,6 +1866,36 @@ impl ReplayStage {
} }
} }
fn cache_block_times(
blockstore: &Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>,
rooted_slots: &[Slot],
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) {
if let Some(cache_block_time_sender) = cache_block_time_sender {
for slot in rooted_slots {
if blockstore
.get_block_time(*slot)
.unwrap_or_default()
.is_none()
{
if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) {
cache_block_time_sender
.send(rooted_bank.clone())
.unwrap_or_else(|err| {
warn!("cache_block_time_sender failed: {:?}", err)
});
} else {
error!(
"rooted_bank {:?} not available in BankForks; block time not cached",
slot
);
}
}
}
}
}
pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot { pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot {
match cluster_type { match cluster_type {
ClusterType::Development => 0, ClusterType::Development => 0,

View File

@ -57,7 +57,6 @@ use solana_sdk::{
stake_history::StakeHistory, stake_history::StakeHistory,
system_instruction, system_instruction,
sysvar::{stake_history, Sysvar}, sysvar::{stake_history, Sysvar},
timing::slot_duration_from_slots_per_year,
transaction::{self, Transaction}, transaction::{self, Transaction},
}; };
use solana_stake_program::stake_state::StakeState; use solana_stake_program::stake_state::StakeState;
@ -697,18 +696,7 @@ impl JsonRpcRequestProcessor {
.unwrap() .unwrap()
.highest_confirmed_root() .highest_confirmed_root()
{ {
// This calculation currently assumes that bank.slots_per_year will remain unchanged after let result = self.blockstore.get_block_time(slot);
// genesis (ie. that this bank's slot_per_year will be applicable to any rooted slot being
// queried). If these values will be variable in the future, those timing parameters will
// need to be stored persistently, and the slot_duration calculation will likely need to be
// moved upstream into blockstore. Also, an explicit commitment level will need to be set.
let bank = self.bank(None);
let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
let epoch = bank.epoch_schedule().get_epoch(slot);
let stakes = HashMap::new();
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
let result = self.blockstore.get_block_time(slot, slot_duration, stakes);
self.check_slot_cleaned_up(&result, slot)?; self.check_slot_cleaned_up(&result, slot)?;
Ok(result.ok().unwrap_or(None)) Ok(result.ok().unwrap_or(None))
} else { } else {
@ -2577,6 +2565,7 @@ pub mod tests {
nonce, rpc_port, nonce, rpc_port,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
system_program, system_transaction, system_program, system_transaction,
timing::slot_duration_from_slots_per_year,
transaction::{self, TransactionError}, transaction::{self, TransactionError},
}; };
use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage}; use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage};
@ -2589,7 +2578,7 @@ pub mod tests {
state::AccountState as TokenAccountState, state::AccountState as TokenAccountState,
state::Mint, state::Mint,
}; };
use std::collections::HashMap; use std::{collections::HashMap, time::Duration};
const TEST_MINT_LAMPORTS: u64 = 1_000_000; const TEST_MINT_LAMPORTS: u64 = 1_000_000;
const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1; const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1;
@ -2695,6 +2684,11 @@ pub mod tests {
for root in roots.iter() { for root in roots.iter() {
bank_forks.write().unwrap().set_root(*root, &None, Some(0)); bank_forks.write().unwrap().set_root(*root, &None, Some(0));
let mut stakes = HashMap::new();
stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default()));
blockstore
.cache_block_time(*root, Duration::from_millis(400), &stakes)
.unwrap();
} }
} }

View File

@ -50,7 +50,7 @@ impl From<RpcCustomError> for Error {
}, },
RpcCustomError::BlockNotAvailable { slot } => Self { RpcCustomError::BlockNotAvailable { slot } => Self {
code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_4), code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_4),
message: format!("Block not available for slot {}", slot,), message: format!("Block not available for slot {}", slot),
data: None, data: None,
}, },
RpcCustomError::RpcNodeUnhealthy => Self { RpcCustomError::RpcNodeUnhealthy => Self {

View File

@ -5,6 +5,7 @@ use crate::{
accounts_background_service::AccountsBackgroundService, accounts_background_service::AccountsBackgroundService,
accounts_hash_verifier::AccountsHashVerifier, accounts_hash_verifier::AccountsHashVerifier,
broadcast_stage::RetransmitSlotsSender, broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker}, cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker},
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
@ -96,6 +97,7 @@ impl Tvu {
cfg: Option<Arc<AtomicBool>>, cfg: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>, rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
snapshot_package_sender: Option<AccountsPackageSender>, snapshot_package_sender: Option<AccountsPackageSender>,
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
@ -191,6 +193,7 @@ impl Tvu {
block_commitment_cache, block_commitment_cache,
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
cache_block_time_sender,
}; };
let replay_stage = ReplayStage::new( let replay_stage = ReplayStage::new(
@ -327,6 +330,7 @@ pub mod tests {
None, None,
None, None,
None, None,
None,
Arc::new(VoteTracker::new(&bank)), Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,

View File

@ -2,6 +2,7 @@
use crate::{ use crate::{
broadcast_stage::BroadcastStageType, broadcast_stage::BroadcastStageType,
cache_block_time_service::{CacheBlockTimeSender, CacheBlockTimeService},
cluster_info::{ClusterInfo, Node}, cluster_info::{ClusterInfo, Node},
cluster_info_vote_listener::VoteTracker, cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService, completed_data_sets_service::CompletedDataSetsService,
@ -155,6 +156,8 @@ struct TransactionHistoryServices {
transaction_status_service: Option<TransactionStatusService>, transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_sender: Option<RewardsRecorderSender>, rewards_recorder_sender: Option<RewardsRecorderSender>,
rewards_recorder_service: Option<RewardsRecorderService>, rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
cache_block_time_service: Option<CacheBlockTimeService>,
} }
pub struct Validator { pub struct Validator {
@ -163,6 +166,7 @@ pub struct Validator {
rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>,
transaction_status_service: Option<TransactionStatusService>, transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>, rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_service: Option<CacheBlockTimeService>,
sample_performance_service: Option<SamplePerformanceService>, sample_performance_service: Option<SamplePerformanceService>,
gossip_service: GossipService, gossip_service: GossipService,
serve_repair_service: ServeRepairService, serve_repair_service: ServeRepairService,
@ -257,6 +261,8 @@ impl Validator {
transaction_status_service, transaction_status_service,
rewards_recorder_sender, rewards_recorder_sender,
rewards_recorder_service, rewards_recorder_service,
cache_block_time_sender,
cache_block_time_service,
}, },
) = new_banks_from_ledger(config, ledger_path, config.poh_verify, &exit); ) = new_banks_from_ledger(config, ledger_path, config.poh_verify, &exit);
@ -501,6 +507,7 @@ impl Validator {
config.enable_partition.clone(), config.enable_partition.clone(),
transaction_status_sender.clone(), transaction_status_sender.clone(),
rewards_recorder_sender, rewards_recorder_sender,
cache_block_time_sender,
snapshot_package_sender, snapshot_package_sender,
vote_tracker.clone(), vote_tracker.clone(),
retransmit_slots_sender, retransmit_slots_sender,
@ -547,6 +554,7 @@ impl Validator {
rpc_service, rpc_service,
transaction_status_service, transaction_status_service,
rewards_recorder_service, rewards_recorder_service,
cache_block_time_service,
sample_performance_service, sample_performance_service,
snapshot_packager_service, snapshot_packager_service,
completed_data_sets_service, completed_data_sets_service,
@ -612,6 +620,10 @@ impl Validator {
rewards_recorder_service.join()?; rewards_recorder_service.join()?;
} }
if let Some(cache_block_time_service) = self.cache_block_time_service {
cache_block_time_service.join()?;
}
if let Some(sample_performance_service) = self.sample_performance_service { if let Some(sample_performance_service) = self.sample_performance_service {
sample_performance_service.join()?; sample_performance_service.join()?;
} }
@ -801,6 +813,14 @@ fn initialize_rpc_transaction_history_services(
let rewards_recorder_sender = Some(rewards_recorder_sender); let rewards_recorder_sender = Some(rewards_recorder_sender);
let rewards_recorder_service = Some(RewardsRecorderService::new( let rewards_recorder_service = Some(RewardsRecorderService::new(
rewards_receiver, rewards_receiver,
blockstore.clone(),
exit,
));
let (cache_block_time_sender, cache_block_time_receiver) = unbounded();
let cache_block_time_sender = Some(cache_block_time_sender);
let cache_block_time_service = Some(CacheBlockTimeService::new(
cache_block_time_receiver,
blockstore, blockstore,
exit, exit,
)); ));
@ -809,6 +829,8 @@ fn initialize_rpc_transaction_history_services(
transaction_status_service, transaction_status_service,
rewards_recorder_sender, rewards_recorder_sender,
rewards_recorder_service, rewards_recorder_service,
cache_block_time_sender,
cache_block_time_service,
} }
} }

View File

@ -41,7 +41,7 @@ use solana_transaction_status::{
EncodedTransaction, Rewards, TransactionStatusMeta, TransactionWithStatusMeta, EncodedTransaction, Rewards, TransactionStatusMeta, TransactionWithStatusMeta,
UiTransactionEncoding, UiTransactionStatusMeta, UiTransactionEncoding, UiTransactionStatusMeta,
}; };
use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::TIMESTAMP_SLOT_INTERVAL}; use solana_vote_program::vote_instruction::VoteInstruction;
use std::{ use std::{
cell::RefCell, cell::RefCell,
cmp, cmp,
@ -136,7 +136,7 @@ pub struct Blockstore {
transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>, transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
active_transaction_status_index: RwLock<u64>, active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>, rewards_cf: LedgerColumn<cf::Rewards>,
_blocktime_cf: LedgerColumn<cf::Blocktime>, blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>, perf_samples_cf: LedgerColumn<cf::PerfSamples>,
last_root: Arc<RwLock<Slot>>, last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>, insert_shreds_lock: Arc<Mutex<()>>,
@ -293,9 +293,7 @@ impl Blockstore {
let address_signatures_cf = db.column(); let address_signatures_cf = db.column();
let transaction_status_index_cf = db.column(); let transaction_status_index_cf = db.column();
let rewards_cf = db.column(); let rewards_cf = db.column();
// This column is created (but never populated) in order to maintain compatibility with let blocktime_cf = db.column();
// newer versions of Blockstore.
let _blocktime_cf = db.column();
let perf_samples_cf = db.column(); let perf_samples_cf = db.column();
let db = Arc::new(db); let db = Arc::new(db);
@ -341,7 +339,7 @@ impl Blockstore {
transaction_status_index_cf, transaction_status_index_cf,
active_transaction_status_index: RwLock::new(active_transaction_status_index), active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf, rewards_cf,
_blocktime_cf, blocktime_cf,
perf_samples_cf, perf_samples_cf,
new_shreds_signals: vec![], new_shreds_signals: vec![],
completed_slots_senders: vec![], completed_slots_senders: vec![],
@ -1564,12 +1562,7 @@ impl Blockstore {
} }
} }
pub fn get_block_time( pub fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
&self,
slot: Slot,
slot_duration: Duration,
stakes: &HashMap<Pubkey, (u64, Account)>,
) -> Result<Option<UnixTimestamp>> {
datapoint_info!( datapoint_info!(
"blockstore-rpc-api", "blockstore-rpc-api",
("method", "get_block_time".to_string(), String) ("method", "get_block_time".to_string(), String)
@ -1580,18 +1573,56 @@ impl Blockstore {
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot { if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp); return Err(BlockstoreError::SlotCleanedUp);
} }
self.blocktime_cf.get(slot)
}
fn get_timestamp_slots(&self, slot: Slot, timestamp_sample_range: usize) -> Vec<Slot> {
let root_iterator = self
.db
.iter::<cf::Root>(IteratorMode::From(slot, IteratorDirection::Reverse));
if !self.is_root(slot) || root_iterator.is_err() {
return vec![];
}
let mut get_slots = Measure::start("get_slots");
let mut timestamp_slots: Vec<Slot> = root_iterator
.unwrap()
.map(|(iter_slot, _)| iter_slot)
.take(timestamp_sample_range)
.collect();
timestamp_slots.sort();
get_slots.stop();
datapoint_info!(
"blockstore-get-timestamp-slots",
("slot", slot as i64, i64),
("get_slots_us", get_slots.as_us() as i64, i64)
);
timestamp_slots
}
pub fn cache_block_time(
&self,
slot: Slot,
slot_duration: Duration,
stakes: &HashMap<Pubkey, (u64, Account)>,
) -> Result<()> {
if !self.is_root(slot) {
return Err(BlockstoreError::SlotNotRooted);
}
let mut get_unique_timestamps = Measure::start("get_unique_timestamps"); let mut get_unique_timestamps = Measure::start("get_unique_timestamps");
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
.get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE) .get_timestamp_slots(slot, TIMESTAMP_SLOT_RANGE)
.into_iter() .into_iter()
.flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default()) .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
.collect(); .collect();
get_unique_timestamps.stop(); get_unique_timestamps.stop();
if unique_timestamps.is_empty() {
return Err(BlockstoreError::NoVoteTimestampsInRange);
}
let mut calculate_timestamp = Measure::start("calculate_timestamp"); let mut calculate_timestamp = Measure::start("calculate_timestamp");
let stake_weighted_timestamps = let stake_weighted_timestamp =
calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration); calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration)
.ok_or(BlockstoreError::EmptyEpochStakes)?;
calculate_timestamp.stop(); calculate_timestamp.stop();
datapoint_info!( datapoint_info!(
"blockstore-get-block-time", "blockstore-get-block-time",
@ -1607,52 +1638,7 @@ impl Blockstore {
i64 i64
) )
); );
self.blocktime_cf.put(slot, &stake_weighted_timestamp)
Ok(stake_weighted_timestamps)
}
fn get_timestamp_slots(
&self,
slot: Slot,
timestamp_interval: u64,
timestamp_sample_range: usize,
) -> Vec<Slot> {
let baseline_slot = slot - (slot % timestamp_interval);
let root_iterator = self.db.iter::<cf::Root>(IteratorMode::From(
baseline_slot,
IteratorDirection::Forward,
));
if !self.is_root(slot) || root_iterator.is_err() {
return vec![];
}
let mut get_slots = Measure::start("get_slots");
let mut slots: Vec<Slot> = root_iterator
.unwrap()
.map(|(iter_slot, _)| iter_slot)
.take(timestamp_sample_range)
.filter(|&iter_slot| iter_slot <= slot)
.collect();
if slots.len() < timestamp_sample_range && baseline_slot >= timestamp_interval {
let earlier_baseline = baseline_slot - timestamp_interval;
let earlier_root_iterator = self.db.iter::<cf::Root>(IteratorMode::From(
earlier_baseline,
IteratorDirection::Forward,
));
if let Ok(iterator) = earlier_root_iterator {
slots = iterator
.map(|(iter_slot, _)| iter_slot)
.take(timestamp_sample_range)
.collect();
}
}
get_slots.stop();
datapoint_info!(
"blockstore-get-timestamp-slots",
("slot", slot as i64, i64),
("get_slots_us", get_slots.as_us() as i64, i64)
);
slots
} }
pub fn get_first_available_block(&self) -> Result<Slot> { pub fn get_first_available_block(&self) -> Result<Slot> {
@ -1705,6 +1691,7 @@ impl Blockstore {
.unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot)); .unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot));
let rewards = self.rewards_cf.get(slot)?.unwrap_or_else(Vec::new); let rewards = self.rewards_cf.get(slot)?.unwrap_or_else(Vec::new);
let block_time = self.blocktime_cf.get(slot)?;
let block = ConfirmedBlock { let block = ConfirmedBlock {
previous_blockhash: previous_blockhash.to_string(), previous_blockhash: previous_blockhash.to_string(),
@ -1716,7 +1703,7 @@ impl Blockstore {
slot_transaction_iterator, slot_transaction_iterator,
), ),
rewards, rewards,
block_time: None, // See https://github.com/solana-labs/solana/issues/10089 block_time,
}; };
return Ok(block); return Ok(block);
} }
@ -5584,8 +5571,6 @@ pub mod tests {
fn test_get_timestamp_slots() { fn test_get_timestamp_slots() {
let timestamp_sample_range = 5; let timestamp_sample_range = 5;
let ticks_per_slot = 5; let ticks_per_slot = 5;
// Smaller interval than TIMESTAMP_SLOT_INTERVAL for convenience of building blockstore
let timestamp_interval = 7;
/* /*
Build a blockstore with < TIMESTAMP_SLOT_RANGE roots Build a blockstore with < TIMESTAMP_SLOT_RANGE roots
*/ */
@ -5612,11 +5597,11 @@ pub mod tests {
blockstore.set_roots(&[1, 2, 3]).unwrap(); blockstore.set_roots(&[1, 2, 3]).unwrap();
assert_eq!( assert_eq!(
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), blockstore.get_timestamp_slots(2, timestamp_sample_range),
vec![0, 1, 2] vec![0, 1, 2]
); );
assert_eq!( assert_eq!(
blockstore.get_timestamp_slots(3, timestamp_interval, timestamp_sample_range), blockstore.get_timestamp_slots(3, timestamp_sample_range),
vec![0, 1, 2, 3] vec![0, 1, 2, 3]
); );
@ -5624,14 +5609,13 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
/* /*
Build a blockstore in the ledger with the following rooted slots: Build a blockstore in the ledger with gaps in rooted slot sequence
[0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 14, 15, 16, 17]
*/ */
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap(); let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.set_roots(&[0]).unwrap(); blockstore.set_roots(&[0]).unwrap();
let desired_roots = vec![1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19]; let desired_roots = vec![1, 2, 3, 5, 6, 8, 11];
let mut last_entry_hash = Hash::default(); let mut last_entry_hash = Hash::default();
for (i, slot) in desired_roots.iter().enumerate() { for (i, slot) in desired_roots.iter().enumerate() {
let parent = { let parent = {
@ -5652,28 +5636,20 @@ pub mod tests {
blockstore.set_roots(&desired_roots).unwrap(); blockstore.set_roots(&desired_roots).unwrap();
assert_eq!( assert_eq!(
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), blockstore.get_timestamp_slots(2, timestamp_sample_range),
vec![0, 1, 2] vec![0, 1, 2]
); );
assert_eq!( assert_eq!(
blockstore.get_timestamp_slots(6, timestamp_interval, timestamp_sample_range), blockstore.get_timestamp_slots(6, timestamp_sample_range),
vec![0, 1, 2, 3, 4] vec![1, 2, 3, 5, 6]
); );
assert_eq!( assert_eq!(
blockstore.get_timestamp_slots(8, timestamp_interval, timestamp_sample_range), blockstore.get_timestamp_slots(8, timestamp_sample_range),
vec![0, 1, 2, 3, 4] vec![2, 3, 5, 6, 8]
); );
assert_eq!( assert_eq!(
blockstore.get_timestamp_slots(13, timestamp_interval, timestamp_sample_range), blockstore.get_timestamp_slots(11, timestamp_sample_range),
vec![8, 9, 10, 11, 12] vec![3, 5, 6, 8, 11]
);
assert_eq!(
blockstore.get_timestamp_slots(18, timestamp_interval, timestamp_sample_range),
vec![8, 9, 10, 11, 12]
);
assert_eq!(
blockstore.get_timestamp_slots(19, timestamp_interval, timestamp_sample_range),
vec![14, 16, 17, 18, 19]
); );
} }
@ -5777,7 +5753,7 @@ pub mod tests {
let confirmed_block = ledger.get_confirmed_block(slot + 1, None).unwrap(); let confirmed_block = ledger.get_confirmed_block(slot + 1, None).unwrap();
assert_eq!(confirmed_block.transactions.len(), 100); assert_eq!(confirmed_block.transactions.len(), 100);
let expected_block = ConfirmedBlock { let mut expected_block = ConfirmedBlock {
transactions: expected_transactions transactions: expected_transactions
.iter() .iter()
.cloned() .cloned()
@ -5797,6 +5773,14 @@ pub mod tests {
let not_root = ledger.get_confirmed_block(slot + 2, None).unwrap_err(); let not_root = ledger.get_confirmed_block(slot + 2, None).unwrap_err();
assert_matches!(not_root, BlockstoreError::SlotNotRooted); assert_matches!(not_root, BlockstoreError::SlotNotRooted);
// Test block_time returns, if available
let timestamp = 1_576_183_541;
ledger.blocktime_cf.put(slot + 1, &timestamp).unwrap();
expected_block.block_time = Some(timestamp);
let confirmed_block = ledger.get_confirmed_block(slot + 1, None).unwrap();
assert_eq!(confirmed_block, expected_block);
drop(ledger); drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
} }
@ -5844,14 +5828,25 @@ pub mod tests {
); );
assert_eq!(blockstore.get_block_timestamps(2).unwrap(), vec![]); assert_eq!(blockstore.get_block_timestamps(2).unwrap(), vec![]);
// Build epoch vote_accounts HashMap to test stake-weighted block time
blockstore.set_roots(&[3, 8]).unwrap(); blockstore.set_roots(&[3, 8]).unwrap();
let mut stakes = HashMap::new(); let mut stakes = HashMap::new();
let slot_duration = Duration::from_millis(400);
for slot in &[1, 2, 3, 8] {
assert!(blockstore
.cache_block_time(*slot, slot_duration, &stakes)
.is_err());
}
// Build epoch vote_accounts HashMap to test stake-weighted block time
for (i, keypair) in vote_keypairs.iter().enumerate() { for (i, keypair) in vote_keypairs.iter().enumerate() {
stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default())); stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default()));
} }
let slot_duration = Duration::from_millis(400); for slot in &[1, 2, 3, 8] {
let block_time_slot_3 = blockstore.get_block_time(3, slot_duration, &stakes); blockstore
.cache_block_time(*slot, slot_duration, &stakes)
.unwrap();
}
let block_time_slot_3 = blockstore.get_block_time(3);
let mut total_stake = 0; let mut total_stake = 0;
let mut expected_time: u64 = (0..6) let mut expected_time: u64 = (0..6)
@ -5867,14 +5862,53 @@ pub mod tests {
expected_time /= total_stake; expected_time /= total_stake;
assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time); assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time);
assert_eq!( assert_eq!(
blockstore blockstore.get_block_time(8).unwrap().unwrap() as u64,
.get_block_time(8, slot_duration, &stakes)
.unwrap()
.unwrap() as u64,
expected_time + 2 // At 400ms block duration, 5 slots == 2sec expected_time + 2 // At 400ms block duration, 5 slots == 2sec
); );
} }
#[test]
fn test_get_block_time_no_timestamps() {
let vote_keypairs: Vec<Keypair> = (0..6).map(|_| Keypair::new()).collect();
// Populate slot 1 with vote transactions, none of which have timestamps
let mut vote_entries: Vec<Entry> = Vec::new();
for (i, keypair) in vote_keypairs.iter().enumerate() {
let vote = Vote {
slots: vec![1],
hash: Hash::default(),
timestamp: None,
};
let vote_ix = vote_instruction::vote(&keypair.pubkey(), &keypair.pubkey(), vote);
let vote_msg = Message::new(&[vote_ix], Some(&keypair.pubkey()));
let vote_tx = Transaction::new(&[keypair], vote_msg, Hash::default());
vote_entries.push(next_entry_mut(&mut Hash::default(), 0, vec![vote_tx]));
let mut tick = create_ticks(1, 0, hash(&serialize(&i).unwrap()));
vote_entries.append(&mut tick);
}
let shreds = entries_to_test_shreds(vote_entries, 1, 0, true, 0);
let ledger_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&ledger_path).unwrap();
blockstore.insert_shreds(shreds, None, false).unwrap();
// Populate slot 2 with ticks only
fill_blockstore_slot_with_ticks(&blockstore, 6, 2, 1, Hash::default());
blockstore.set_roots(&[0, 1, 2]).unwrap();
// Build epoch vote_accounts HashMap to test stake-weighted block time
let mut stakes = HashMap::new();
for (i, keypair) in vote_keypairs.iter().enumerate() {
stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default()));
}
let slot_duration = Duration::from_millis(400);
for slot in &[1, 2, 3, 8] {
assert!(blockstore
.cache_block_time(*slot, slot_duration, &stakes)
.is_err());
assert_eq!(blockstore.get_block_time(*slot).unwrap(), None);
}
}
#[test] #[test]
fn test_calculate_stake_weighted_timestamp() { fn test_calculate_stake_weighted_timestamp() {
let recent_timestamp: UnixTimestamp = 1_578_909_061; let recent_timestamp: UnixTimestamp = 1_578_909_061;

View File

@ -233,7 +233,7 @@ impl Blockstore {
.compact_range(from_slot, to_slot) .compact_range(from_slot, to_slot)
.unwrap_or(false) .unwrap_or(false)
&& self && self
._blocktime_cf .blocktime_cf
.compact_range(from_slot, to_slot) .compact_range(from_slot, to_slot)
.unwrap_or(false) .unwrap_or(false)
&& self && self

View File

@ -69,6 +69,8 @@ pub enum BlockstoreError {
UnpackError(#[from] UnpackError), UnpackError(#[from] UnpackError),
UnableToSetOpenFileDescriptorLimit, UnableToSetOpenFileDescriptorLimit,
TransactionStatusSlotMismatch, TransactionStatusSlotMismatch,
EmptyEpochStakes,
NoVoteTimestampsInRange,
} }
pub type Result<T> = std::result::Result<T, BlockstoreError>; pub type Result<T> = std::result::Result<T, BlockstoreError>;

View File

@ -32,10 +32,6 @@ pub const INITIAL_LOCKOUT: usize = 2;
// smaller numbers makes // smaller numbers makes
pub const MAX_EPOCH_CREDITS_HISTORY: usize = 64; pub const MAX_EPOCH_CREDITS_HISTORY: usize = 64;
// Frequency of timestamp Votes. In v0.22.0, this is approximately 30min with cluster clock
// defaults, intended to limit block time drift to < 1hr
pub const TIMESTAMP_SLOT_INTERVAL: u64 = 4500;
#[frozen_abi(digest = "69hYtmmcuqPbhpc64ZaNJDidaUcg66CW6wzPFiuYZ3To")] #[frozen_abi(digest = "69hYtmmcuqPbhpc64ZaNJDidaUcg66CW6wzPFiuYZ3To")]
#[derive(Serialize, Default, Deserialize, Debug, PartialEq, Eq, Clone, AbiExample)] #[derive(Serialize, Default, Deserialize, Debug, PartialEq, Eq, Clone, AbiExample)]
pub struct Vote { pub struct Vote {