Add block height to ConfirmedBlock structs (#17523)
* Add BlockHeight CF to blockstore
* Rename CacheBlockTimeService to be more general
* Cache block-height using service
* Fixup previous proto mishandling
* Add block_height to block structs
* Add block-height to solana block
* Fallback to BankForks if block time or block height are not yet written to Blockstore
* Add docs
* Review comments
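For orientation, the sketch below (not code from this commit) shows what the consuming side of the renamed channel might look like: a worker loop that drains `CacheBlockMetaSender` messages and persists both block time and block height through the Blockstore methods added in this diff. The `crossbeam_channel` receiver, the one-second timeout, and using `bank.clock().unix_timestamp` as the block-time source are assumptions, not part of the change.

```rust
use std::{sync::Arc, time::Duration};

use crossbeam_channel::{Receiver, RecvTimeoutError};
use solana_ledger::blockstore::Blockstore;
use solana_runtime::bank::Bank;

/// Hypothetical CacheBlockMetaService-style worker: every frozen Bank arriving
/// on the channel has its block time and block height written to the Blockstore.
fn cache_block_meta_loop(receiver: Receiver<Arc<Bank>>, blockstore: Arc<Blockstore>) {
    loop {
        match receiver.recv_timeout(Duration::from_secs(1)) {
            Ok(bank) => {
                let slot = bank.slot();
                // Block time is taken from the bank's clock sysvar (assumption).
                if let Err(err) = blockstore.cache_block_time(slot, bank.clock().unix_timestamp) {
                    eprintln!("cache_block_time failed for slot {}: {:?}", slot, err);
                }
                // Block height is the new metadata cached alongside block time.
                if let Err(err) = blockstore.cache_block_height(slot, bank.block_height()) {
                    eprintln!("cache_block_height failed for slot {}: {:?}", slot, err);
                }
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}
```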
@@ -1,7 +1,7 @@
 use crate::{
     blockstore::Blockstore,
     blockstore_processor::{
-        self, BlockstoreProcessorError, BlockstoreProcessorResult, CacheBlockTimeSender,
+        self, BlockstoreProcessorError, BlockstoreProcessorResult, CacheBlockMetaSender,
         ProcessOptions, TransactionStatusSender,
     },
     entry::VerifyRecyclers,
@@ -37,7 +37,7 @@ pub fn load(
     snapshot_config: Option<&SnapshotConfig>,
     process_options: ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
-    cache_block_time_sender: Option<&CacheBlockTimeSender>,
+    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
 ) -> LoadResult {
     if let Some(snapshot_config) = snapshot_config.as_ref() {
         info!(
@@ -102,7 +102,7 @@ pub fn load(
                 &process_options,
                 &VerifyRecyclers::default(),
                 transaction_status_sender,
-                cache_block_time_sender,
+                cache_block_meta_sender,
             ),
             Some(deserialized_snapshot_hash),
         );
@@ -120,7 +120,7 @@ pub fn load(
             &blockstore,
             account_paths,
             process_options,
-            cache_block_time_sender,
+            cache_block_meta_sender,
         ),
         None,
     )
@@ -139,6 +139,7 @@ pub struct Blockstore {
     rewards_cf: LedgerColumn<cf::Rewards>,
     blocktime_cf: LedgerColumn<cf::Blocktime>,
     perf_samples_cf: LedgerColumn<cf::PerfSamples>,
+    block_height_cf: LedgerColumn<cf::BlockHeight>,
     last_root: Arc<RwLock<Slot>>,
     insert_shreds_lock: Arc<Mutex<()>>,
     pub new_shreds_signals: Vec<SyncSender<bool>>,
@@ -309,6 +310,7 @@ impl Blockstore {
         let rewards_cf = db.column();
         let blocktime_cf = db.column();
         let perf_samples_cf = db.column();
+        let block_height_cf = db.column();
 
         let db = Arc::new(db);
 
@@ -356,6 +358,7 @@ impl Blockstore {
             rewards_cf,
             blocktime_cf,
             perf_samples_cf,
+            block_height_cf,
             new_shreds_signals: vec![],
             completed_slots_senders: vec![],
             insert_shreds_lock: Arc::new(Mutex::new(())),
@@ -1773,11 +1776,25 @@ impl Blockstore {
     }
 
     pub fn cache_block_time(&self, slot: Slot, timestamp: UnixTimestamp) -> Result<()> {
-        if self.get_block_time(slot).unwrap_or_default().is_none() {
-            self.blocktime_cf.put(slot, &timestamp)
-        } else {
-            Ok(())
-        }
+        self.blocktime_cf.put(slot, &timestamp)
     }
 
+    pub fn get_block_height(&self, slot: Slot) -> Result<Option<u64>> {
+        datapoint_info!(
+            "blockstore-rpc-api",
+            ("method", "get_block_height".to_string(), String)
+        );
+        let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
+        // lowest_cleanup_slot is the last slot that was not cleaned up by
+        // LedgerCleanupService
+        if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
+            return Err(BlockstoreError::SlotCleanedUp);
+        }
+        self.block_height_cf.get(slot)
+    }
+
+    pub fn cache_block_height(&self, slot: Slot, block_height: u64) -> Result<()> {
+        self.block_height_cf.put(slot, &block_height)
+    }
+
     pub fn get_first_available_block(&self) -> Result<Slot> {
@@ -1857,7 +1874,12 @@ impl Blockstore {
                 .get_protobuf_or_bincode::<StoredExtendedRewards>(slot)?
                 .unwrap_or_default()
                 .into();
 
+            // The Blocktime and BlockHeight column families are updated asynchronously; they
+            // may not be written by the time the complete slot entries are available. In this
+            // case, these fields will be `None`.
             let block_time = self.blocktime_cf.get(slot)?;
+            let block_height = self.block_height_cf.get(slot)?;
 
             let block = ConfirmedBlock {
                 previous_blockhash: previous_blockhash.to_string(),
@@ -1867,6 +1889,7 @@ impl Blockstore {
                     .map_transactions_to_statuses(slot, slot_transaction_iterator),
                 rewards,
                 block_time,
+                block_height,
             };
             return Ok(block);
         }
@@ -6063,6 +6086,7 @@ pub mod tests {
             previous_blockhash: Hash::default().to_string(),
             rewards: vec![],
             block_time: None,
+            block_height: None,
         };
         assert_eq!(confirmed_block, expected_block);
 
@@ -6076,6 +6100,7 @@ pub mod tests {
             previous_blockhash: blockhash.to_string(),
             rewards: vec![],
             block_time: None,
+            block_height: None,
         };
         assert_eq!(confirmed_block, expected_block);
 
@@ -6092,13 +6117,17 @@ pub mod tests {
             previous_blockhash: blockhash.to_string(),
             rewards: vec![],
             block_time: None,
+            block_height: None,
         };
         assert_eq!(complete_block, expected_complete_block);
 
-        // Test block_time returns, if available
+        // Test block_time & block_height return, if available
         let timestamp = 1_576_183_541;
         ledger.blocktime_cf.put(slot + 1, &timestamp).unwrap();
         expected_block.block_time = Some(timestamp);
+        let block_height = slot - 2;
+        ledger.block_height_cf.put(slot + 1, &block_height).unwrap();
+        expected_block.block_height = Some(block_height);
 
         let confirmed_block = ledger.get_rooted_block(slot + 1, true).unwrap();
         assert_eq!(confirmed_block, expected_block);
@@ -6106,6 +6135,9 @@ pub mod tests {
         let timestamp = 1_576_183_542;
         ledger.blocktime_cf.put(slot + 2, &timestamp).unwrap();
         expected_complete_block.block_time = Some(timestamp);
+        let block_height = slot - 1;
+        ledger.block_height_cf.put(slot + 2, &block_height).unwrap();
+        expected_complete_block.block_height = Some(block_height);
 
         let complete_block = ledger.get_complete_block(slot + 2, true).unwrap();
         assert_eq!(complete_block, expected_complete_block);
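The Blockstore hunks above add a slot-keyed BlockHeight column plus `cache_block_height`/`get_block_height` accessors. As a reading aid, here is a minimal round-trip sketch modeled on the crate's existing blocktime tests; the test itself is not part of this commit, and it assumes the crate's usual test helpers (`get_tmp_ledger_path!`, `Blockstore::open`, `Blockstore::destroy`).

```rust
#[test]
fn block_height_round_trip_sketch() {
    let ledger_path = get_tmp_ledger_path!();
    let blockstore = Blockstore::open(&ledger_path).unwrap();

    // In the real pipeline the height is cached asynchronously after the bank
    // is frozen; here we write the column directly.
    blockstore.cache_block_height(5, 100).unwrap();
    assert_eq!(blockstore.get_block_height(5).unwrap(), Some(100));

    // A slot whose height was never cached reads back as Ok(None), which is why
    // ConfirmedBlock.block_height may legitimately be None for fresh slots.
    assert_eq!(blockstore.get_block_height(6).unwrap(), None);

    drop(blockstore);
    Blockstore::destroy(&ledger_path).unwrap();
}
```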
@@ -55,6 +55,8 @@ const REWARDS_CF: &str = "rewards";
 const BLOCKTIME_CF: &str = "blocktime";
 /// Column family for Performance Samples
 const PERF_SAMPLES_CF: &str = "perf_samples";
+/// Column family for BlockHeight
+const BLOCK_HEIGHT_CF: &str = "block_height";
 
 #[derive(Error, Debug)]
 pub enum BlockstoreError {
@@ -151,6 +153,10 @@ pub mod columns {
     #[derive(Debug)]
     /// The performance samples column
     pub struct PerfSamples;
+
+    #[derive(Debug)]
+    /// The block height column
+    pub struct BlockHeight;
 }
 
 pub enum AccessType {
@@ -212,9 +218,9 @@ impl Rocks {
         recovery_mode: Option<BlockstoreRecoveryMode>,
     ) -> Result<Rocks> {
         use columns::{
-            AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
-            PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
-            TransactionStatusIndex,
+            AddressSignatures, BlockHeight, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta,
+            Index, Orphans, PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta,
+            TransactionStatus, TransactionStatusIndex,
         };
 
         fs::create_dir_all(&path)?;
@@ -259,6 +265,8 @@ impl Rocks {
             ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type));
         let perf_samples_cf_descriptor =
             ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type));
+        let block_height_cf_descriptor =
+            ColumnFamilyDescriptor::new(BlockHeight::NAME, get_cf_options(&access_type));
 
         let cfs = vec![
             (SlotMeta::NAME, meta_cf_descriptor),
@@ -279,6 +287,7 @@ impl Rocks {
             (Rewards::NAME, rewards_cf_descriptor),
             (Blocktime::NAME, blocktime_cf_descriptor),
             (PerfSamples::NAME, perf_samples_cf_descriptor),
+            (BlockHeight::NAME, block_height_cf_descriptor),
         ];
 
         // Open the database
@@ -316,9 +325,9 @@ impl Rocks {
 
     fn columns(&self) -> Vec<&'static str> {
         use columns::{
-            AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
-            PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
-            TransactionStatusIndex,
+            AddressSignatures, BlockHeight, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta,
+            Index, Orphans, PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta,
+            TransactionStatus, TransactionStatusIndex,
         };
 
         vec![
@@ -337,6 +346,7 @@ impl Rocks {
             Rewards::NAME,
             Blocktime::NAME,
             PerfSamples::NAME,
+            BlockHeight::NAME,
         ]
     }
 
@@ -579,6 +589,14 @@ impl TypedColumn for columns::PerfSamples {
     type Type = blockstore_meta::PerfSample;
 }
 
+impl SlotColumn for columns::BlockHeight {}
+impl ColumnName for columns::BlockHeight {
+    const NAME: &'static str = BLOCK_HEIGHT_CF;
+}
+impl TypedColumn for columns::BlockHeight {
+    type Type = u64;
+}
+
 impl Column for columns::ShredCode {
     type Index = (u64, u64);
 
@@ -380,7 +380,7 @@ pub fn process_blockstore(
     blockstore: &Blockstore,
     account_paths: Vec<PathBuf>,
     opts: ProcessOptions,
-    cache_block_time_sender: Option<&CacheBlockTimeSender>,
+    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
 ) -> BlockstoreProcessorResult {
     if let Some(num_threads) = opts.override_num_threads {
         PAR_THREAD_POOL.with(|pool| {
@@ -409,7 +409,7 @@ pub fn process_blockstore(
             blockstore,
             &opts,
             &recyclers,
-            cache_block_time_sender,
+            cache_block_meta_sender,
         );
         do_process_blockstore_from_root(
             blockstore,
@@ -417,7 +417,7 @@ pub fn process_blockstore(
             &opts,
             &recyclers,
             None,
-            cache_block_time_sender,
+            cache_block_meta_sender,
         )
     }
 
@@ -428,7 +428,7 @@ pub(crate) fn process_blockstore_from_root(
     opts: &ProcessOptions,
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
-    cache_block_time_sender: Option<&CacheBlockTimeSender>,
+    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
 ) -> BlockstoreProcessorResult {
     do_process_blockstore_from_root(
         blockstore,
@@ -436,7 +436,7 @@ pub(crate) fn process_blockstore_from_root(
         opts,
         recyclers,
         transaction_status_sender,
-        cache_block_time_sender,
+        cache_block_meta_sender,
     )
 }
 
@@ -446,7 +446,7 @@ fn do_process_blockstore_from_root(
     opts: &ProcessOptions,
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
-    cache_block_time_sender: Option<&CacheBlockTimeSender>,
+    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
 ) -> BlockstoreProcessorResult {
     info!("processing ledger from slot {}...", bank.slot());
 
@@ -507,7 +507,7 @@ fn do_process_blockstore_from_root(
         opts,
         recyclers,
         transaction_status_sender,
-        cache_block_time_sender,
+        cache_block_meta_sender,
         &mut timing,
     )?;
     initial_forks.sort_by_key(|bank| bank.slot());
@@ -815,7 +815,7 @@ fn process_bank_0(
     blockstore: &Blockstore,
     opts: &ProcessOptions,
     recyclers: &VerifyRecyclers,
-    cache_block_time_sender: Option<&CacheBlockTimeSender>,
+    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
 ) {
     assert_eq!(bank0.slot(), 0);
     let mut progress = ConfirmationProgress::new(bank0.last_blockhash());
@@ -831,7 +831,7 @@ fn process_bank_0(
     )
     .expect("processing for bank 0 must succeed");
     bank0.freeze();
-    cache_block_time(bank0, cache_block_time_sender);
+    cache_block_meta(bank0, cache_block_meta_sender);
 }
 
 // Given a bank, add its children to the pending slots queue if those children slots are
@@ -899,7 +899,7 @@ fn load_frozen_forks(
     opts: &ProcessOptions,
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
-    cache_block_time_sender: Option<&CacheBlockTimeSender>,
+    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
     timing: &mut ExecuteTimings,
 ) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
     let mut initial_forks = HashMap::new();
@@ -954,7 +954,7 @@ fn load_frozen_forks(
                         recyclers,
                         &mut progress,
                         transaction_status_sender,
-                        cache_block_time_sender,
+                        cache_block_meta_sender,
                         None,
                         timing,
                     )
@@ -1128,7 +1128,7 @@ fn process_single_slot(
     recyclers: &VerifyRecyclers,
     progress: &mut ConfirmationProgress,
     transaction_status_sender: Option<&TransactionStatusSender>,
-    cache_block_time_sender: Option<&CacheBlockTimeSender>,
+    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
     replay_vote_sender: Option<&ReplayVoteSender>,
     timing: &mut ExecuteTimings,
 ) -> result::Result<(), BlockstoreProcessorError> {
@@ -1148,7 +1148,7 @@ fn process_single_slot(
     })?;
 
     bank.freeze(); // all banks handled by this routine are created from complete slots
-    cache_block_time(bank, cache_block_time_sender);
+    cache_block_meta(bank, cache_block_meta_sender);
 
     Ok(())
 }
@@ -1226,13 +1226,13 @@ impl TransactionStatusSender {
     }
 }
 
-pub type CacheBlockTimeSender = Sender<Arc<Bank>>;
+pub type CacheBlockMetaSender = Sender<Arc<Bank>>;
 
-pub fn cache_block_time(bank: &Arc<Bank>, cache_block_time_sender: Option<&CacheBlockTimeSender>) {
-    if let Some(cache_block_time_sender) = cache_block_time_sender {
-        cache_block_time_sender
+pub fn cache_block_meta(bank: &Arc<Bank>, cache_block_meta_sender: Option<&CacheBlockMetaSender>) {
+    if let Some(cache_block_meta_sender) = cache_block_meta_sender {
+        cache_block_meta_sender
             .send(bank.clone())
-            .unwrap_or_else(|err| warn!("cache_block_time_sender failed: {:?}", err));
+            .unwrap_or_else(|err| warn!("cache_block_meta_sender failed: {:?}", err));
     }
 }