- cost_tracker is data member of a bank, it can report metrics when bank is frozen (#20802)
- removed cost_tracker_stats and histogram - move stats reporting outside of bank freeze
This commit is contained in:
@ -27,7 +27,6 @@ use solana_runtime::{
|
||||
bank_utils,
|
||||
cost_model::CostModel,
|
||||
cost_tracker::CostTracker,
|
||||
cost_tracker_stats::CostTrackerStats,
|
||||
transaction_batch::TransactionBatch,
|
||||
vote_sender_types::ReplayVoteSender,
|
||||
};
|
||||
@ -440,7 +439,6 @@ impl BankingStage {
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
recorder: &TransactionRecorder,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
cost_tracker_stats: &mut CostTrackerStats,
|
||||
) {
|
||||
let mut rebuffered_packets_len = 0;
|
||||
let mut new_tx_count = 0;
|
||||
@ -460,7 +458,6 @@ impl BankingStage {
|
||||
*next_leader,
|
||||
banking_stage_stats,
|
||||
cost_model,
|
||||
cost_tracker_stats,
|
||||
);
|
||||
Self::update_buffered_packets_with_new_unprocessed(
|
||||
original_unprocessed_indexes,
|
||||
@ -484,7 +481,6 @@ impl BankingStage {
|
||||
gossip_vote_sender,
|
||||
banking_stage_stats,
|
||||
cost_model,
|
||||
cost_tracker_stats,
|
||||
);
|
||||
if processed < verified_txs_len
|
||||
|| !Bank::should_bank_still_be_processing_txs(
|
||||
@ -591,7 +587,6 @@ impl BankingStage {
|
||||
recorder: &TransactionRecorder,
|
||||
data_budget: &DataBudget,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
cost_tracker_stats: &mut CostTrackerStats,
|
||||
) -> BufferedPacketsDecision {
|
||||
let bank_start;
|
||||
let (
|
||||
@ -633,7 +628,6 @@ impl BankingStage {
|
||||
banking_stage_stats,
|
||||
recorder,
|
||||
cost_model,
|
||||
cost_tracker_stats,
|
||||
);
|
||||
}
|
||||
BufferedPacketsDecision::Forward => {
|
||||
@ -718,7 +712,6 @@ impl BankingStage {
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
|
||||
let banking_stage_stats = BankingStageStats::new(id);
|
||||
let mut cost_tracker_stats = CostTrackerStats::new(id, 0);
|
||||
loop {
|
||||
let my_pubkey = cluster_info.id();
|
||||
while !buffered_packets.is_empty() {
|
||||
@ -735,7 +728,6 @@ impl BankingStage {
|
||||
&recorder,
|
||||
data_budget,
|
||||
&cost_model,
|
||||
&mut cost_tracker_stats,
|
||||
);
|
||||
if matches!(decision, BufferedPacketsDecision::Hold)
|
||||
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
|
||||
@ -771,7 +763,6 @@ impl BankingStage {
|
||||
duplicates,
|
||||
&recorder,
|
||||
&cost_model,
|
||||
&mut cost_tracker_stats,
|
||||
) {
|
||||
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
@ -1110,7 +1101,6 @@ impl BankingStage {
|
||||
demote_program_write_locks: bool,
|
||||
votes_only: bool,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
cost_tracker_stats: &mut CostTrackerStats,
|
||||
) -> (Vec<SanitizedTransaction>, Vec<usize>, Vec<usize>) {
|
||||
let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];
|
||||
|
||||
@ -1153,7 +1143,6 @@ impl BankingStage {
|
||||
.read()
|
||||
.unwrap()
|
||||
.calculate_cost(&tx, demote_program_write_locks),
|
||||
cost_tracker_stats,
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
@ -1230,7 +1219,6 @@ impl BankingStage {
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
cost_tracker_stats: &mut CostTrackerStats,
|
||||
) -> (usize, usize, Vec<usize>) {
|
||||
let mut packet_conversion_time = Measure::start("packet_conversion");
|
||||
let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) =
|
||||
@ -1243,7 +1231,6 @@ impl BankingStage {
|
||||
bank.demote_program_write_locks(),
|
||||
bank.vote_only_bank(),
|
||||
cost_model,
|
||||
cost_tracker_stats,
|
||||
);
|
||||
packet_conversion_time.stop();
|
||||
inc_new_counter_info!("banking_stage-packet_conversion", 1);
|
||||
@ -1286,7 +1273,6 @@ impl BankingStage {
|
||||
.read()
|
||||
.unwrap()
|
||||
.calculate_cost(tx, bank.demote_program_write_locks()),
|
||||
cost_tracker_stats,
|
||||
);
|
||||
}
|
||||
});
|
||||
@ -1334,7 +1320,6 @@ impl BankingStage {
|
||||
next_leader: Option<Pubkey>,
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
cost_tracker_stats: &mut CostTrackerStats,
|
||||
) -> Vec<usize> {
|
||||
// Check if we are the next leader. If so, let's not filter the packets
|
||||
// as we'll filter it again while processing the packets.
|
||||
@ -1357,7 +1342,6 @@ impl BankingStage {
|
||||
bank.demote_program_write_locks(),
|
||||
bank.vote_only_bank(),
|
||||
cost_model,
|
||||
cost_tracker_stats,
|
||||
);
|
||||
unprocessed_packet_conversion_time.stop();
|
||||
|
||||
@ -1419,7 +1403,6 @@ impl BankingStage {
|
||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||
recorder: &TransactionRecorder,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
cost_tracker_stats: &mut CostTrackerStats,
|
||||
) -> Result<(), RecvTimeoutError> {
|
||||
let mut recv_time = Measure::start("process_packets_recv");
|
||||
let mms = verified_receiver.recv_timeout(recv_timeout)?;
|
||||
@ -1478,7 +1461,6 @@ impl BankingStage {
|
||||
gossip_vote_sender,
|
||||
banking_stage_stats,
|
||||
cost_model,
|
||||
cost_tracker_stats,
|
||||
);
|
||||
|
||||
new_tx_count += processed;
|
||||
@ -1512,7 +1494,6 @@ impl BankingStage {
|
||||
next_leader,
|
||||
banking_stage_stats,
|
||||
cost_model,
|
||||
cost_tracker_stats,
|
||||
);
|
||||
Self::push_unprocessed(
|
||||
buffered_packets,
|
||||
@ -2825,7 +2806,6 @@ mod tests {
|
||||
&BankingStageStats::default(),
|
||||
&recorder,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
|
||||
// When the poh recorder has a bank, should process all non conflicting buffered packets.
|
||||
@ -2843,7 +2823,6 @@ mod tests {
|
||||
&BankingStageStats::default(),
|
||||
&recorder,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
if num_expected_unprocessed == 0 {
|
||||
assert!(buffered_packets.is_empty())
|
||||
@ -2910,7 +2889,6 @@ mod tests {
|
||||
&BankingStageStats::default(),
|
||||
&recorder,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
|
||||
// Check everything is correct. All indexes after `interrupted_iteration`
|
||||
@ -3170,7 +3148,6 @@ mod tests {
|
||||
false,
|
||||
votes_only,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
assert_eq!(2, txs.len());
|
||||
assert_eq!(vec![0, 1], tx_packet_index);
|
||||
@ -3186,7 +3163,6 @@ mod tests {
|
||||
false,
|
||||
votes_only,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
assert_eq!(0, txs.len());
|
||||
assert_eq!(0, tx_packet_index.len());
|
||||
@ -3211,7 +3187,6 @@ mod tests {
|
||||
false,
|
||||
votes_only,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
assert_eq!(3, txs.len());
|
||||
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
||||
@ -3227,7 +3202,6 @@ mod tests {
|
||||
false,
|
||||
votes_only,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
assert_eq!(2, txs.len());
|
||||
assert_eq!(vec![0, 2], tx_packet_index);
|
||||
@ -3252,7 +3226,6 @@ mod tests {
|
||||
false,
|
||||
votes_only,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
assert_eq!(3, txs.len());
|
||||
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
||||
@ -3268,7 +3241,6 @@ mod tests {
|
||||
false,
|
||||
votes_only,
|
||||
&Arc::new(RwLock::new(CostModel::default())),
|
||||
&mut CostTrackerStats::default(),
|
||||
);
|
||||
assert_eq!(3, txs.len());
|
||||
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
use solana_ledger::blockstore::Blockstore;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_runtime::{bank::ExecuteTimings, cost_model::CostModel};
|
||||
use solana_runtime::{bank::Bank, bank::ExecuteTimings, cost_model::CostModel};
|
||||
use solana_sdk::timing::timestamp;
|
||||
use std::{
|
||||
sync::{
|
||||
@ -65,7 +65,12 @@ impl CostUpdateServiceTiming {
|
||||
}
|
||||
}
|
||||
|
||||
pub type CostUpdateReceiver = Receiver<ExecuteTimings>;
|
||||
pub enum CostUpdate {
|
||||
FrozenBank { bank: Arc<Bank> },
|
||||
ExecuteTiming { execute_timings: ExecuteTimings },
|
||||
}
|
||||
|
||||
pub type CostUpdateReceiver = Receiver<CostUpdate>;
|
||||
|
||||
pub struct CostUpdateService {
|
||||
thread_hdl: JoinHandle<()>,
|
||||
@ -113,8 +118,15 @@ impl CostUpdateService {
|
||||
update_count = 0_u64;
|
||||
let mut update_cost_model_time = Measure::start("update_cost_model_time");
|
||||
for cost_update in cost_update_receiver.try_iter() {
|
||||
dirty |= Self::update_cost_model(&cost_model, &cost_update);
|
||||
update_count += 1;
|
||||
match cost_update {
|
||||
CostUpdate::FrozenBank { bank } => {
|
||||
bank.read_cost_tracker().unwrap().report_stats(bank.slot());
|
||||
}
|
||||
CostUpdate::ExecuteTiming { execute_timings } => {
|
||||
dirty |= Self::update_cost_model(&cost_model, &execute_timings);
|
||||
update_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
update_cost_model_time.stop();
|
||||
|
||||
|
@ -14,6 +14,7 @@ use {
|
||||
consensus::{
|
||||
ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD,
|
||||
},
|
||||
cost_update_service::CostUpdate,
|
||||
fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
|
||||
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
|
||||
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
|
||||
@ -323,7 +324,7 @@ impl ReplayStage {
|
||||
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
|
||||
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
|
||||
cluster_slots_update_sender: ClusterSlotsUpdateSender,
|
||||
cost_update_sender: Sender<ExecuteTimings>,
|
||||
cost_update_sender: Sender<CostUpdate>,
|
||||
voting_sender: Sender<VoteOp>,
|
||||
) -> Self {
|
||||
let ReplayStageConfig {
|
||||
@ -1986,7 +1987,7 @@ impl ReplayStage {
|
||||
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
|
||||
latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
|
||||
cluster_slots_update_sender: &ClusterSlotsUpdateSender,
|
||||
cost_update_sender: &Sender<ExecuteTimings>,
|
||||
cost_update_sender: &Sender<CostUpdate>,
|
||||
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
|
||||
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
|
||||
) -> bool {
|
||||
@ -2084,6 +2085,13 @@ impl ReplayStage {
|
||||
transaction_status_sender.send_transaction_status_freeze_message(&bank);
|
||||
}
|
||||
bank.freeze();
|
||||
// report cost tracker stats
|
||||
cost_update_sender
|
||||
.send(CostUpdate::FrozenBank { bank: bank.clone() })
|
||||
.unwrap_or_else(|err| {
|
||||
warn!("cost_update_sender failed sending bank stats: {:?}", err)
|
||||
});
|
||||
|
||||
let bank_hash = bank.hash();
|
||||
assert_ne!(bank_hash, Hash::default());
|
||||
// Needs to be updated before `check_slot_agrees_with_cluster()` so that
|
||||
@ -2150,7 +2158,7 @@ impl ReplayStage {
|
||||
// send accumulated excute-timings to cost_update_service
|
||||
if !execute_timings.details.per_program_timings.is_empty() {
|
||||
cost_update_sender
|
||||
.send(execute_timings)
|
||||
.send(CostUpdate::ExecuteTiming { execute_timings })
|
||||
.unwrap_or_else(|err| warn!("cost_update_sender failed: {:?}", err));
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,6 @@ use solana_runtime::{
|
||||
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler,
|
||||
},
|
||||
accounts_db::AccountShrinkThreshold,
|
||||
bank::ExecuteTimings,
|
||||
bank_forks::BankForks,
|
||||
commitment::BlockCommitmentCache,
|
||||
cost_model::CostModel,
|
||||
@ -54,7 +53,7 @@ use std::{
|
||||
net::UdpSocket,
|
||||
sync::{
|
||||
atomic::AtomicBool,
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
mpsc::{channel, Receiver},
|
||||
Arc, Mutex, RwLock,
|
||||
},
|
||||
thread,
|
||||
@ -295,10 +294,7 @@ impl Tvu {
|
||||
bank_forks.clone(),
|
||||
);
|
||||
|
||||
let (cost_update_sender, cost_update_receiver): (
|
||||
Sender<ExecuteTimings>,
|
||||
Receiver<ExecuteTimings>,
|
||||
) = channel();
|
||||
let (cost_update_sender, cost_update_receiver) = channel();
|
||||
let cost_update_service = CostUpdateService::new(
|
||||
exit.clone(),
|
||||
blockstore.clone(),
|
||||
|
Reference in New Issue
Block a user