- make cost_tracker a member of bank, remove shared instance from TPU; (#20627)

- decouple cost_model from cost_tracker, allowing one cost_model
  instance to be shared within a validator;
- update cost_model api to calculate_cost(&self...)->transaction_cost
This commit is contained in:
Tao Zhu
2021-10-19 14:37:33 -05:00
committed by GitHub
parent 27d1850c3d
commit 7496b5784b
8 changed files with 152 additions and 296 deletions

View File

@ -25,6 +25,7 @@ use solana_runtime::{
TransactionExecutionResult,
},
bank_utils,
cost_model::CostModel,
cost_tracker::CostTracker,
cost_tracker_stats::CostTrackerStats,
transaction_batch::TransactionBatch,
@ -55,7 +56,7 @@ use std::{
net::{SocketAddr, UdpSocket},
ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock, RwLockReadGuard},
thread::{self, Builder, JoinHandle},
time::Duration,
time::Instant,
@ -96,7 +97,6 @@ pub struct BankingStageStats {
current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,
reset_cost_tracker_count: AtomicUsize,
cost_tracker_check_count: AtomicUsize,
cost_forced_retry_transactions_count: AtomicUsize,
@ -175,11 +175,6 @@ impl BankingStageStats {
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"reset_cost_tracker_count",
self.reset_cost_tracker_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_tracker_check_count",
self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64,
@ -288,7 +283,7 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_tracker: Arc<RwLock<CostTracker>>,
cost_model: Arc<RwLock<CostModel>>,
) -> Self {
Self::new_num_threads(
cluster_info,
@ -299,7 +294,7 @@ impl BankingStage {
Self::num_threads(),
transaction_status_sender,
gossip_vote_sender,
cost_tracker,
cost_model,
)
}
@ -312,7 +307,7 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_tracker: Arc<RwLock<CostTracker>>,
cost_model: Arc<RwLock<CostModel>>,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks.
@ -346,8 +341,8 @@ impl BankingStage {
let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
let duplicates = duplicates.clone();
let cost_tracker = cost_tracker.clone();
let data_budget = data_budget.clone();
let cost_model = cost_model.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
@ -362,8 +357,8 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
&duplicates,
&cost_tracker,
&data_budget,
cost_model,
);
})
.unwrap()
@ -426,24 +421,6 @@ impl BankingStage {
has_more_unprocessed_transactions
}
/// Asks the shared cost tracker to reset itself for `bank_slot` and counts
/// how often that happens.
///
/// Takes the write lock on `cost_tracker` and calls `reset_if_new_bank` with
/// `bank_slot` and `cost_tracker_stats`. When that call returns `true`,
/// `banking_stage_stats.reset_cost_tracker_count` is incremented by one
/// using `Ordering::Relaxed`.
///
/// NOTE(review): `reset_if_new_bank` is defined outside this file; presumably
/// it returns `true` only when `bank_slot` differs from the slot the tracker
/// was last reset for -- confirm against `CostTracker`.
///
/// # Panics
///
/// Panics if the `cost_tracker` lock is poisoned (`write().unwrap()`).
fn reset_cost_tracker_if_new_bank(
cost_tracker: &Arc<RwLock<CostTracker>>,
bank_slot: Slot,
banking_stage_stats: &BankingStageStats,
cost_tracker_stats: &mut CostTrackerStats,
) {
// The write guard is a temporary of the `if` condition, so the lock is
// released before the counter below is updated.
if cost_tracker
.write()
.unwrap()
.reset_if_new_bank(bank_slot, cost_tracker_stats)
{
// only increase counter when bank changed
banking_stage_stats
.reset_cost_tracker_count
.fetch_add(1, Ordering::Relaxed);
}
}
#[allow(clippy::too_many_arguments)]
pub fn consume_buffered_packets(
my_pubkey: &Pubkey,
@ -455,7 +432,7 @@ impl BankingStage {
test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) {
let mut rebuffered_packets_len = 0;
@ -474,8 +451,8 @@ impl BankingStage {
original_unprocessed_indexes,
my_pubkey,
*next_leader,
cost_tracker,
banking_stage_stats,
cost_model,
cost_tracker_stats,
);
Self::update_buffered_packets_with_new_unprocessed(
@ -489,12 +466,6 @@ impl BankingStage {
bank_creation_time,
}) = bank_start
{
Self::reset_cost_tracker_if_new_bank(
cost_tracker,
working_bank.slot(),
banking_stage_stats,
cost_tracker_stats,
);
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_packets_transactions(
&working_bank,
@ -505,7 +476,7 @@ impl BankingStage {
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_tracker,
cost_model,
cost_tracker_stats,
);
if processed < verified_txs_len
@ -611,8 +582,8 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_tracker: &Arc<RwLock<CostTracker>>,
data_budget: &DataBudget,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> BufferedPacketsDecision {
let bank_start;
@ -624,15 +595,6 @@ impl BankingStage {
) = {
let poh = poh_recorder.lock().unwrap();
bank_start = poh.bank_start();
if let Some(ref bank_start) = bank_start {
Self::reset_cost_tracker_if_new_bank(
cost_tracker,
bank_start.working_bank.slot(),
banking_stage_stats,
cost_tracker_stats,
);
};
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()),
@ -663,7 +625,7 @@ impl BankingStage {
None::<Box<dyn Fn()>>,
banking_stage_stats,
recorder,
cost_tracker,
cost_model,
cost_tracker_stats,
);
}
@ -742,8 +704,8 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
data_budget: &DataBudget,
cost_model: Arc<RwLock<CostModel>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -764,8 +726,8 @@ impl BankingStage {
&gossip_vote_sender,
&banking_stage_stats,
&recorder,
cost_tracker,
data_budget,
&cost_model,
&mut cost_tracker_stats,
);
if matches!(decision, BufferedPacketsDecision::Hold)
@ -801,7 +763,7 @@ impl BankingStage {
&banking_stage_stats,
duplicates,
&recorder,
cost_tracker,
&cost_model,
&mut cost_tracker_stats,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
@ -906,7 +868,6 @@ impl BankingStage {
};
let mut execute_timings = ExecuteTimings::default();
let (
mut loaded_accounts,
results,
@ -1137,10 +1098,11 @@ impl BankingStage {
msgs: &Packets,
transaction_indexes: &[usize],
feature_set: &Arc<feature_set::FeatureSet>,
cost_tracker: &Arc<RwLock<CostTracker>>,
read_cost_tracker: &RwLockReadGuard<CostTracker>,
banking_stage_stats: &BankingStageStats,
demote_program_write_locks: bool,
votes_only: bool,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> (Vec<SanitizedTransaction>, Vec<usize>, Vec<usize>) {
let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];
@ -1171,17 +1133,19 @@ impl BankingStage {
let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time");
let (filtered_transactions, filter_transaction_packet_indexes) = {
let cost_tracker_readonly = cost_tracker.read().unwrap();
verified_transactions_with_packet_indexes
.into_iter()
.filter_map(|(tx, tx_index)| {
// excluding vote TX from cost_model, for now
let is_vote = &msgs.packets[tx_index].meta.is_simple_vote_tx;
if !is_vote
&& cost_tracker_readonly
&& read_cost_tracker
.would_transaction_fit(
&tx,
demote_program_write_locks,
&cost_model
.read()
.unwrap()
.calculate_cost(&tx, demote_program_write_locks),
cost_tracker_stats,
)
.is_err()
@ -1258,7 +1222,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
@ -1267,10 +1231,11 @@ impl BankingStage {
msgs,
&packet_indexes,
&bank.feature_set,
cost_tracker,
&bank.read_cost_tracker().unwrap(),
banking_stage_stats,
bank.demote_program_write_locks(),
bank.vote_only_bank(),
cost_model,
cost_tracker_stats,
);
packet_conversion_time.stop();
@ -1308,9 +1273,12 @@ impl BankingStage {
let mut cost_tracking_time = Measure::start("cost_tracking_time");
transactions.iter().enumerate().for_each(|(index, tx)| {
if unprocessed_tx_indexes.iter().all(|&i| i != index) {
cost_tracker.write().unwrap().add_transaction_cost(
bank.write_cost_tracker().unwrap().add_transaction_cost(
tx,
bank.demote_program_write_locks(),
&cost_model
.read()
.unwrap()
.calculate_cost(tx, bank.demote_program_write_locks()),
cost_tracker_stats,
);
}
@ -1357,8 +1325,8 @@ impl BankingStage {
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
cost_tracker: &Arc<RwLock<CostTracker>>,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
@ -1377,10 +1345,11 @@ impl BankingStage {
msgs,
transaction_indexes,
&bank.feature_set,
cost_tracker,
&bank.read_cost_tracker().unwrap(),
banking_stage_stats,
bank.demote_program_write_locks(),
bank.vote_only_bank(),
cost_model,
cost_tracker_stats,
);
unprocessed_packet_conversion_time.stop();
@ -1442,7 +1411,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
@ -1490,12 +1459,6 @@ impl BankingStage {
working_bank,
bank_creation_time,
} = &*working_bank_start.unwrap();
Self::reset_cost_tracker_if_new_bank(
cost_tracker,
working_bank.slot(),
banking_stage_stats,
cost_tracker_stats,
);
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions(
@ -1507,7 +1470,7 @@ impl BankingStage {
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_tracker,
cost_model,
cost_tracker_stats,
);
@ -1540,8 +1503,8 @@ impl BankingStage {
&packet_indexes,
my_pubkey,
next_leader,
cost_tracker,
banking_stage_stats,
cost_model,
cost_tracker_stats,
);
Self::push_unprocessed(
@ -1772,9 +1735,7 @@ mod tests {
gossip_verified_vote_receiver,
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
Arc::new(RwLock::new(CostModel::default())),
);
drop(verified_sender);
drop(gossip_verified_vote_sender);
@ -1823,9 +1784,7 @@ mod tests {
verified_gossip_vote_receiver,
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
Arc::new(RwLock::new(CostModel::default())),
);
trace!("sending bank");
drop(verified_sender);
@ -1898,9 +1857,7 @@ mod tests {
gossip_verified_vote_receiver,
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
Arc::new(RwLock::new(CostModel::default())),
);
// fund another account so we can send 2 good transactions in a single batch.
@ -2051,9 +2008,7 @@ mod tests {
3,
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
Arc::new(RwLock::new(CostModel::default())),
);
// wait for banking_stage to eat the packets
@ -2852,9 +2807,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
@ -2872,9 +2825,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
if num_expected_unprocessed == 0 {
@ -2941,9 +2892,7 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
@ -3199,12 +3148,11 @@ mod tests {
&packets,
&packet_indexes,
&Arc::new(FeatureSet::default()),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&RwLock::new(CostTracker::default()).read().unwrap(),
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
assert_eq!(2, txs.len());
@ -3216,12 +3164,11 @@ mod tests {
&packets,
&packet_indexes,
&Arc::new(FeatureSet::default()),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&RwLock::new(CostTracker::default()).read().unwrap(),
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
assert_eq!(0, txs.len());
@ -3242,12 +3189,11 @@ mod tests {
&packets,
&packet_indexes,
&Arc::new(FeatureSet::default()),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&RwLock::new(CostTracker::default()).read().unwrap(),
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
assert_eq!(3, txs.len());
@ -3259,12 +3205,11 @@ mod tests {
&packets,
&packet_indexes,
&Arc::new(FeatureSet::default()),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&RwLock::new(CostTracker::default()).read().unwrap(),
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
assert_eq!(2, txs.len());
@ -3285,12 +3230,11 @@ mod tests {
&packets,
&packet_indexes,
&Arc::new(FeatureSet::default()),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&RwLock::new(CostTracker::default()).read().unwrap(),
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
assert_eq!(3, txs.len());
@ -3302,12 +3246,11 @@ mod tests {
&packets,
&packet_indexes,
&Arc::new(FeatureSet::default()),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&RwLock::new(CostTracker::default()).read().unwrap(),
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
&mut CostTrackerStats::default(),
);
assert_eq!(3, txs.len());

View File

@ -23,7 +23,6 @@ use solana_rpc::{
use solana_runtime::{
bank_forks::BankForks,
cost_model::CostModel,
cost_tracker::CostTracker,
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
};
use std::{
@ -123,7 +122,6 @@ impl Tpu {
cluster_confirmed_slot_sender,
);
let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model.clone())));
let banking_stage = BankingStage::new(
cluster_info,
poh_recorder,
@ -132,7 +130,7 @@ impl Tpu {
verified_gossip_vote_packets_receiver,
transaction_status_sender,
replay_vote_sender,
cost_tracker,
cost_model.clone(),
);
let broadcast_stage = broadcast_type.new_broadcast_stage(