- Encapsulate QoS Service metrics reporting within QosService, so client code (e.g. banking_stage) doesn't need to worry about it. (#21191)
- Remove dead cost_* stats from banking_stage; clean up the call path.
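The idea in this change, sketched here with standard-library stand-ins rather than the real Solana types (MetricsService, Metrics, do_work and the println! reporting are all hypothetical names for illustration): the service owns its counters and a background reporting thread, joins that thread on Drop, and callers such as banking_stage only invoke the work methods, never a separate stats object.

    use std::{
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc,
        },
        thread::{self, JoinHandle},
        time::Duration,
    };

    #[derive(Default)]
    struct Metrics {
        work_count: AtomicU64,
    }

    struct MetricsService {
        metrics: Arc<Metrics>,
        running: Arc<AtomicBool>,
        reporter: Option<JoinHandle<()>>,
    }

    impl MetricsService {
        fn new() -> Self {
            let metrics = Arc::new(Metrics::default());
            let running = Arc::new(AtomicBool::new(true));
            let (m, r) = (metrics.clone(), running.clone());
            // The reporting thread is owned by the service, not by its callers.
            let reporter = Some(thread::spawn(move || {
                while r.load(Ordering::Relaxed) {
                    // Stand-in for datapoint_info!: drain the counter and report it.
                    println!("work_count={}", m.work_count.swap(0, Ordering::Relaxed));
                    thread::sleep(Duration::from_millis(100));
                }
            }));
            Self { metrics, running, reporter }
        }

        // Callers just do work; the metrics bookkeeping stays internal.
        fn do_work(&self) {
            self.metrics.work_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    impl Drop for MetricsService {
        fn drop(&mut self) {
            // Stop and join the reporter so the thread does not outlive the service.
            self.running.store(false, Ordering::Relaxed);
            self.reporter.take().unwrap().join().unwrap();
        }
    }

    fn main() {
        let svc = Arc::new(MetricsService::new());
        svc.do_work(); // a caller never touches the counters or the thread
    }

The committed QosService follows the same shape, but reports through datapoint_info! and tracks several counters instead of one.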
@@ -8,6 +8,7 @@ use log::*;
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use solana_core::banking_stage::{BankingStage, BankingStageStats};
+use solana_core::qos_service::QosService;
 use solana_entry::entry::{next_hash, Entry};
 use solana_gossip::cluster_info::ClusterInfo;
 use solana_gossip::cluster_info::Node;
@@ -93,7 +94,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
 None::<Box<dyn Fn()>>,
 &BankingStageStats::default(),
 &recorder,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 );
 });
@@ -1,10 +1,7 @@
 //! The `banking_stage` processes Transaction messages. It is intended to be used
 //! to contruct a software pipeline. The stage uses all available CPU cores and
 //! can do its processing in parallel with signature verification on the GPU.
-use crate::{
-packet_hasher::PacketHasher,
-qos_service::{QosService, QosServiceStats},
-};
+use crate::{packet_hasher::PacketHasher, qos_service::QosService};
 use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
 use itertools::Itertools;
 use lru::LruCache;
@@ -295,6 +292,7 @@ impl BankingStage {
 PacketHasher::default(),
 )));
 let data_budget = Arc::new(DataBudget::default());
+let qos_service = Arc::new(QosService::new(cost_model));
 // Many banks that process transactions in parallel.
 assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING);
 let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
@@ -319,7 +317,7 @@ impl BankingStage {
 let gossip_vote_sender = gossip_vote_sender.clone();
 let duplicates = duplicates.clone();
 let data_budget = data_budget.clone();
-let cost_model = cost_model.clone();
+let qos_service = qos_service.clone();
 Builder::new()
 .name("solana-banking-stage-tx".to_string())
 .spawn(move || {
@@ -335,7 +333,7 @@ impl BankingStage {
 gossip_vote_sender,
 &duplicates,
 &data_budget,
-cost_model,
+qos_service,
 );
 })
 .unwrap()
@@ -409,7 +407,7 @@ impl BankingStage {
 test_fn: Option<impl Fn()>,
 banking_stage_stats: &BankingStageStats,
 recorder: &TransactionRecorder,
-cost_model: &Arc<RwLock<CostModel>>,
+qos_service: &Arc<QosService>,
 ) {
 let mut rebuffered_packets_len = 0;
 let mut new_tx_count = 0;
@@ -450,7 +448,7 @@ impl BankingStage {
 transaction_status_sender.clone(),
 gossip_vote_sender,
 banking_stage_stats,
-cost_model,
+qos_service,
 );
 if processed < verified_txs_len
 || !Bank::should_bank_still_be_processing_txs(
@@ -556,7 +554,7 @@ impl BankingStage {
 banking_stage_stats: &BankingStageStats,
 recorder: &TransactionRecorder,
 data_budget: &DataBudget,
-cost_model: &Arc<RwLock<CostModel>>,
+qos_service: &Arc<QosService>,
 ) -> BufferedPacketsDecision {
 let bank_start;
 let (
@@ -597,7 +595,7 @@ impl BankingStage {
 None::<Box<dyn Fn()>>,
 banking_stage_stats,
 recorder,
-cost_model,
+qos_service,
 );
 }
 BufferedPacketsDecision::Forward => {
@@ -676,7 +674,7 @@ impl BankingStage {
 gossip_vote_sender: ReplayVoteSender,
 duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
 data_budget: &DataBudget,
-cost_model: Arc<RwLock<CostModel>>,
+qos_service: Arc<QosService>,
 ) {
 let recorder = poh_recorder.lock().unwrap().recorder();
 let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@@ -697,7 +695,7 @@ impl BankingStage {
 &banking_stage_stats,
 &recorder,
 data_budget,
-&cost_model,
+&qos_service,
 );
 if matches!(decision, BufferedPacketsDecision::Hold)
 || matches!(decision, BufferedPacketsDecision::ForwardAndHold)
@@ -732,7 +730,7 @@ impl BankingStage {
 &banking_stage_stats,
 duplicates,
 &recorder,
-&cost_model,
+&qos_service,
 ) {
 Ok(()) | Err(RecvTimeoutError::Timeout) => (),
 Err(RecvTimeoutError::Disconnected) => break,
@@ -930,22 +928,13 @@ impl BankingStage {
 chunk_offset: usize,
 transaction_status_sender: Option<TransactionStatusSender>,
 gossip_vote_sender: &ReplayVoteSender,
-cost_model: &Arc<RwLock<CostModel>>,
+qos_service: &Arc<QosService>,
 ) -> (Result<usize, PohRecorderError>, Vec<usize>) {
-let mut qos_service_stats = QosServiceStats::default();
-let qos_service = QosService::new(cost_model.clone());
-let tx_costs = qos_service.compute_transaction_costs(
-txs.iter(),
-bank.demote_program_write_locks(),
-&mut qos_service_stats,
-);
+let tx_costs =
+qos_service.compute_transaction_costs(txs.iter(), bank.demote_program_write_locks());

-let transactions_qos_results = qos_service.select_transactions_per_cost(
-txs.iter(),
-tx_costs.iter(),
-bank,
-&mut qos_service_stats,
-);
+let transactions_qos_results =
+qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);

 // Only lock accounts for those transactions are selected for the block;
 // Once accounts are locked, other threads cannot encode transactions that will modify the
@@ -978,8 +967,6 @@ impl BankingStage {
 txs.len(),
 );

-qos_service_stats.report();
-
 (result, retryable_txs)
 }
@@ -994,7 +981,7 @@ impl BankingStage {
 poh: &TransactionRecorder,
 transaction_status_sender: Option<TransactionStatusSender>,
 gossip_vote_sender: &ReplayVoteSender,
-cost_model: &Arc<RwLock<CostModel>>,
+qos_service: &Arc<QosService>,
 ) -> (usize, Vec<usize>) {
 let mut chunk_start = 0;
 let mut unprocessed_txs = vec![];
@@ -1011,7 +998,7 @@ impl BankingStage {
 chunk_start,
 transaction_status_sender.clone(),
 gossip_vote_sender,
-cost_model,
+qos_service,
 );
 trace!("process_transactions result: {:?}", result);

@@ -1163,7 +1150,7 @@ impl BankingStage {
 transaction_status_sender: Option<TransactionStatusSender>,
 gossip_vote_sender: &ReplayVoteSender,
 banking_stage_stats: &BankingStageStats,
-cost_model: &Arc<RwLock<CostModel>>,
+qos_service: &Arc<QosService>,
 ) -> (usize, usize, Vec<usize>) {
 let mut packet_conversion_time = Measure::start("packet_conversion");
 let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
@@ -1185,7 +1172,7 @@ impl BankingStage {
 poh,
 transaction_status_sender,
 gossip_vote_sender,
-cost_model,
+qos_service,
 );
 process_tx_time.stop();
 let unprocessed_tx_count = unprocessed_tx_indexes.len();
@@ -1303,7 +1290,7 @@ impl BankingStage {
 banking_stage_stats: &BankingStageStats,
 duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
 recorder: &TransactionRecorder,
-cost_model: &Arc<RwLock<CostModel>>,
+qos_service: &Arc<QosService>,
 ) -> Result<(), RecvTimeoutError> {
 let mut recv_time = Measure::start("process_packets_recv");
 let mms = verified_receiver.recv_timeout(recv_timeout)?;
@@ -1361,7 +1348,7 @@ impl BankingStage {
 transaction_status_sender.clone(),
 gossip_vote_sender,
 banking_stage_stats,
-cost_model,
+qos_service,
 );

 new_tx_count += processed;
@@ -1574,7 +1561,6 @@ mod tests {
poh_service::PohService,
};
use solana_rpc::transaction_status_service::TransactionStatusService;
use solana_runtime::cost_model::CostModel;
use solana_sdk::{
hash::Hash,
instruction::InstructionError,
@@ -2239,7 +2225,7 @@ mod tests {
 0,
 None,
 &gossip_vote_sender,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 )
 .0
 .unwrap();
@@ -2281,7 +2267,7 @@ mod tests {
 0,
 None,
 &gossip_vote_sender,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 )
 .0,
 Err(PohRecorderError::MaxHeightReached)
@@ -2369,7 +2355,7 @@ mod tests {
 0,
 None,
 &gossip_vote_sender,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 );

 poh_recorder
@@ -2478,7 +2464,7 @@ mod tests {
 &recorder,
 None,
 &gossip_vote_sender,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 );

 assert_eq!(processed_transactions_count, 0,);
@@ -2571,7 +2557,7 @@ mod tests {
 enable_cpi_and_log_storage: false,
 }),
 &gossip_vote_sender,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 );

 transaction_status_service.join().unwrap();
@@ -2699,7 +2685,7 @@ mod tests {
 None::<Box<dyn Fn()>>,
 &BankingStageStats::default(),
 &recorder,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 );
 assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
 // When the poh recorder has a bank, should process all non conflicting buffered packets.
@@ -2716,7 +2702,7 @@ mod tests {
 None::<Box<dyn Fn()>>,
 &BankingStageStats::default(),
 &recorder,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 );
 if num_expected_unprocessed == 0 {
 assert!(buffered_packets.is_empty())
@@ -2782,7 +2768,7 @@ mod tests {
 test_fn,
 &BankingStageStats::default(),
 &recorder,
-&Arc::new(RwLock::new(CostModel::default())),
+&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
 );

 // Check everything is correct. All indexes after `interrupted_iteration`
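The banking_stage hunks above amount to building one QosService per stage and cloning the Arc handle into each worker thread. A minimal sketch of that sharing pattern, using a hypothetical Service type with an atomic counter in place of QosService:

    use std::{
        sync::{
            atomic::{AtomicU64, Ordering},
            Arc,
        },
        thread,
    };

    #[derive(Default)]
    struct Service {
        processed: AtomicU64,
    }

    impl Service {
        fn process_batch(&self, batch: &[u64]) {
            self.processed.fetch_add(batch.len() as u64, Ordering::Relaxed);
        }
    }

    fn main() {
        // Built once, like Arc::new(QosService::new(cost_model)) in new_num_threads.
        let service = Arc::new(Service::default());
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let service = service.clone(); // each thread gets its own Arc clone
                thread::spawn(move || service.process_batch(&[1, 2, 3]))
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(service.processed.load(Ordering::Relaxed), 12);
    }

This mirrors how each spawned banking thread above receives qos_service.clone() while the single underlying service keeps the shared counters.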
@@ -9,58 +9,69 @@ use {
 cost_model::{CostModel, TransactionCost},
 cost_tracker::CostTrackerError,
 },
-solana_sdk::transaction::{self, SanitizedTransaction, TransactionError},
-std::sync::{Arc, RwLock},
+solana_sdk::{
+timing::AtomicInterval,
+transaction::{self, SanitizedTransaction, TransactionError},
+},
+std::{
+sync::{
+atomic::{AtomicBool, AtomicU64, Ordering},
+Arc, RwLock,
+},
+thread::{self, Builder, JoinHandle},
+time::Duration,
+},
 };

-#[derive(Default)]
-pub struct QosServiceStats {
-compute_cost_time: u64,
-cost_tracking_time: u64,
-selected_txs_count: u64,
-retried_txs_per_block_limit_count: u64,
-retried_txs_per_account_limit_count: u64,
-}
-
-impl QosServiceStats {
-pub fn report(&mut self) {
-datapoint_info!(
-"qos-service-stats",
-("compute_cost_time", self.compute_cost_time, i64),
-("cost_tracking_time", self.cost_tracking_time, i64),
-("selected_txs_count", self.selected_txs_count, i64),
-(
-"retried_txs_per_block_limit_count",
-self.retried_txs_per_block_limit_count,
-i64
-),
-(
-"retried_txs_per_account_limit_count",
-self.retried_txs_per_account_limit_count,
-i64
-),
-);
-}
-}
-
 pub struct QosService {
 cost_model: Arc<RwLock<CostModel>>,
+metrics: Arc<QosServiceMetrics>,
+reporting_thread: Option<JoinHandle<()>>,
+running_flag: Arc<AtomicBool>,
 }

+impl Drop for QosService {
+fn drop(&mut self) {
+self.running_flag.store(false, Ordering::Relaxed);
+self.reporting_thread
+.take()
+.unwrap()
+.join()
+.expect("qos service metrics reporting thread failed to join");
+}
+}
+
 impl QosService {
 pub fn new(cost_model: Arc<RwLock<CostModel>>) -> Self {
-Self { cost_model }
+let running_flag = Arc::new(AtomicBool::new(true));
+let metrics = Arc::new(QosServiceMetrics::default());
+
+let running_flag_clone = running_flag.clone();
+let metrics_clone = metrics.clone();
+let reporting_thread = Some(
+Builder::new()
+.name("solana-qos-service-metrics-repoting".to_string())
+.spawn(move || {
+Self::reporting_loop(running_flag_clone, metrics_clone);
+})
+.unwrap(),
+);
+Self {
+cost_model,
+metrics,
+reporting_thread,
+running_flag,
+}
 }

 pub fn compute_transaction_costs<'a>(
 &self,
 transactions: impl Iterator<Item = &'a SanitizedTransaction>,
 demote_program_write_locks: bool,
-stats: &mut QosServiceStats,
 ) -> Vec<TransactionCost> {
 let mut compute_cost_time = Measure::start("compute_cost_time");
 let cost_model = self.cost_model.read().unwrap();
-let txs_costs = transactions
+let txs_costs: Vec<_> = transactions
 .map(|tx| {
 let cost = cost_model.calculate_cost(tx, demote_program_write_locks);
 debug!(
@@ -73,7 +84,12 @@ impl QosService {
 })
 .collect();
 compute_cost_time.stop();
-stats.compute_cost_time += compute_cost_time.as_us();
+self.metrics
+.compute_cost_time
+.fetch_add(compute_cost_time.as_us(), Ordering::Relaxed);
+self.metrics
+.compute_cost_count
+.fetch_add(txs_costs.len() as u64, Ordering::Relaxed);
 txs_costs
 }

@@ -84,7 +100,6 @@ impl QosService {
 transactions: impl Iterator<Item = &'a SanitizedTransaction>,
 transactions_costs: impl Iterator<Item = &'a TransactionCost>,
 bank: &Arc<Bank>,
-stats: &mut QosServiceStats,
 ) -> Vec<transaction::Result<()>> {
 let mut cost_tracking_time = Measure::start("cost_tracking_time");
 let mut cost_tracker = bank.write_cost_tracker().unwrap();
@@ -93,18 +108,18 @@ impl QosService {
 .map(|(tx, cost)| match cost_tracker.try_add(tx, cost) {
 Ok(current_block_cost) => {
 debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost);
-stats.selected_txs_count += 1;
+self.metrics.selected_txs_count.fetch_add(1, Ordering::Relaxed);
 Ok(())
 },
 Err(e) => {
 debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e);
 match e {
 CostTrackerError::WouldExceedBlockMaxLimit => {
-stats.retried_txs_per_block_limit_count += 1;
+self.metrics.retried_txs_per_block_limit_count.fetch_add(1, Ordering::Relaxed);
 Err(TransactionError::WouldExceedMaxBlockCostLimit)
 }
 CostTrackerError::WouldExceedAccountMaxLimit => {
-stats.retried_txs_per_account_limit_count += 1;
+self.metrics.retried_txs_per_account_limit_count.fetch_add(1, Ordering::Relaxed);
 Err(TransactionError::WouldExceedMaxAccountCostLimit)
 }
 }
@@ -112,9 +127,72 @@ impl QosService {
 })
 .collect();
 cost_tracking_time.stop();
-stats.cost_tracking_time += cost_tracking_time.as_us();
+self.metrics
+.cost_tracking_time
+.fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed);
 select_results
 }
+
+fn reporting_loop(running_flag: Arc<AtomicBool>, metrics: Arc<QosServiceMetrics>) {
+while running_flag.load(Ordering::Relaxed) {
+// hardcode to report every 1000ms
+metrics.report(1000u64);
+thread::sleep(Duration::from_millis(100));
+}
+}
 }

+#[derive(Default)]
+struct QosServiceMetrics {
+last_report: AtomicInterval,
+compute_cost_time: AtomicU64,
+compute_cost_count: AtomicU64,
+cost_tracking_time: AtomicU64,
+selected_txs_count: AtomicU64,
+retried_txs_per_block_limit_count: AtomicU64,
+retried_txs_per_account_limit_count: AtomicU64,
+}
+
+impl QosServiceMetrics {
+pub fn report(&self, report_interval_ms: u64) {
+if self.last_report.should_update(report_interval_ms) {
+datapoint_info!(
+"qos-service-stats",
+(
+"compute_cost_time",
+self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
+i64
+),
+(
+"compute_cost_count",
+self.compute_cost_count.swap(0, Ordering::Relaxed) as i64,
+i64
+),
+(
+"cost_tracking_time",
+self.cost_tracking_time.swap(0, Ordering::Relaxed) as i64,
+i64
+),
+(
+"selected_txs_count",
+self.selected_txs_count.swap(0, Ordering::Relaxed) as i64,
+i64
+),
+(
+"retried_txs_per_block_limit_count",
+self.retried_txs_per_block_limit_count
+.swap(0, Ordering::Relaxed) as i64,
+i64
+),
+(
+"retried_txs_per_account_limit_count",
+self.retried_txs_per_account_limit_count
+.swap(0, Ordering::Relaxed) as i64,
+i64
+),
+);
+}
+}
+}
+
 #[cfg(test)]
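QosServiceMetrics::report above gates submission on AtomicInterval::should_update from solana_sdk and drains each counter with swap(0, Ordering::Relaxed), so every datapoint covers only the last interval. A rough stand-in for that interval gate (ReportInterval and its wall-clock millisecond source are assumptions for illustration, not the solana_sdk implementation):

    use std::{
        sync::atomic::{AtomicU64, Ordering},
        time::{SystemTime, UNIX_EPOCH},
    };

    #[derive(Default)]
    struct ReportInterval {
        last_report_ms: AtomicU64,
    }

    impl ReportInterval {
        /// Returns true at most once per `interval_ms`, even with concurrent callers.
        fn should_update(&self, interval_ms: u64) -> bool {
            let now_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64;
            let last = self.last_report_ms.load(Ordering::Relaxed);
            now_ms.saturating_sub(last) >= interval_ms
                && self
                    .last_report_ms
                    .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
                    .is_ok()
        }
    }

    fn main() {
        let gate = ReportInterval::default();
        assert!(gate.should_update(1000)); // first call reports immediately
        assert!(!gate.should_update(1000)); // a second call within 1s is skipped
    }

With the 100 ms sleep in reporting_loop and the 1000 ms gate, at most one datapoint per second is written no matter how many loop iterations run.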
@@ -135,7 +213,7 @@ mod tests {
 };

 #[test]
-fn test_compute_transactions_costs() {
+fn test_compute_transaction_costs() {
 solana_logger::setup();

 // make a vec of txs
@@ -158,11 +236,7 @@ mod tests {

 let cost_model = Arc::new(RwLock::new(CostModel::default()));
 let qos_service = QosService::new(cost_model.clone());
-let txs_costs = qos_service.compute_transaction_costs(
-txs.iter(),
-false,
-&mut QosServiceStats::default(),
-);
+let txs_costs = qos_service.compute_transaction_costs(txs.iter(), false);

 // verify the size of txs_costs and its contents
 assert_eq!(txs_costs.len(), txs.len());
@@ -214,23 +288,14 @@ mod tests {
 let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx];

 let qos_service = QosService::new(cost_model);
-let txs_costs = qos_service.compute_transaction_costs(
-txs.iter(),
-false,
-&mut QosServiceStats::default(),
-);
+let txs_costs = qos_service.compute_transaction_costs(txs.iter(), false);

 // set cost tracker limit to fit 1 transfer tx, vote tx bypasses limit check
 let cost_limit = transfer_tx_cost;
 bank.write_cost_tracker()
 .unwrap()
 .set_limits(cost_limit, cost_limit);
-let results = qos_service.select_transactions_per_cost(
-txs.iter(),
-txs_costs.iter(),
-&bank,
-&mut QosServiceStats::default(),
-);
+let results = qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);

 // verify that first transfer tx and all votes are allowed
 assert_eq!(results.len(), txs.len());
@@ -239,4 +304,46 @@ mod tests {
 assert!(results[2].is_err());
 assert!(results[3].is_ok());
 }
+
+#[test]
+fn test_async_report_metrics() {
+solana_logger::setup();
+
+// make a vec of txs
+let txs_count = 2048usize;
+let keypair = Keypair::new();
+let transfer_tx = SanitizedTransaction::from_transaction_for_tests(
+system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()),
+);
+let mut txs_1 = Vec::with_capacity(txs_count);
+let mut txs_2 = Vec::with_capacity(txs_count);
+for _i in 0..txs_count {
+txs_1.push(transfer_tx.clone());
+txs_2.push(transfer_tx.clone());
+}
+
+let cost_model = Arc::new(RwLock::new(CostModel::default()));
+let qos_service = Arc::new(QosService::new(cost_model));
+let qos_service_1 = qos_service.clone();
+let qos_service_2 = qos_service.clone();
+
+let th_1 = thread::spawn(move || {
+qos_service_1.compute_transaction_costs(txs_1.iter(), false);
+});
+
+let th_2 = thread::spawn(move || {
+qos_service_2.compute_transaction_costs(txs_2.iter(), false);
+});
+
+th_1.join().expect("qos service 1 faield to join");
+th_2.join().expect("qos service 2 faield to join");
+
+assert_eq!(
+txs_count as u64 * 2,
+qos_service
+.metrics
+.compute_cost_count
+.load(Ordering::Relaxed)
+);
+}
 }