Fixes to TPS calculation and reporting (#3836)
Fixes to TPS calculations and reporting
This commit is contained in:
@ -115,6 +115,7 @@ impl BankingStage {
|
||||
.iter()
|
||||
.flat_map(|(p, start_index)| &p.packets[**start_index..])
|
||||
.collect();
|
||||
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
|
||||
let blobs = packet::packets_to_blobs(&packets);
|
||||
|
||||
for blob in blobs {
|
||||
@ -132,12 +133,14 @@ impl BankingStage {
|
||||
let mut bank_shutdown = false;
|
||||
for (msgs, offset, vers) in buffered_packets {
|
||||
if bank_shutdown {
|
||||
inc_new_counter_info!("banking_stage-rebuffered_packets", vers.len() - *offset);
|
||||
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned()));
|
||||
continue;
|
||||
}
|
||||
|
||||
let bank = poh_recorder.lock().unwrap().bank();
|
||||
if bank.is_none() {
|
||||
inc_new_counter_info!("banking_stage-rebuffered_packets", vers.len() - *offset);
|
||||
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned()));
|
||||
continue;
|
||||
}
|
||||
@ -145,7 +148,12 @@ impl BankingStage {
|
||||
|
||||
let (processed, verified_txs, verified_indexes) =
|
||||
Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?;
|
||||
|
||||
inc_new_counter_info!("banking_stage-consumed_buffered_packets", processed);
|
||||
inc_new_counter_info!("banking_stage-process_transactions", processed);
|
||||
inc_new_counter_info!(
|
||||
"banking_stage-rebuffered_packets",
|
||||
verified_txs.len() - processed
|
||||
);
|
||||
if processed < verified_txs.len() {
|
||||
bank_shutdown = true;
|
||||
// Collect any unprocessed transactions in this batch for forwarding
|
||||
@ -221,14 +229,14 @@ impl BankingStage {
|
||||
|
||||
// Buffer the packets if I am the next leader
|
||||
// or, if it was getting sent to me
|
||||
// or, the next leader is unknown
|
||||
let leader_id = match poh_recorder.lock().unwrap().bank() {
|
||||
Some(bank) => {
|
||||
leader_schedule_utils::slot_leader_at(bank.slot() + 1, &bank).unwrap_or_default()
|
||||
}
|
||||
None => rcluster_info
|
||||
.leader_data()
|
||||
.map(|x| x.id)
|
||||
.unwrap_or_default(),
|
||||
.map_or(rcluster_info.id(), |x| x.id),
|
||||
};
|
||||
|
||||
leader_id == rcluster_info.id()
|
||||
@ -269,6 +277,13 @@ impl BankingStage {
|
||||
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
|
||||
Ok(unprocessed_packets) => {
|
||||
if Self::should_buffer_packets(poh_recorder, cluster_info) {
|
||||
let num = unprocessed_packets
|
||||
.iter()
|
||||
.map(|(x, start, _)| {
|
||||
x.read().unwrap().packets.len().saturating_sub(*start)
|
||||
})
|
||||
.sum();
|
||||
inc_new_counter_info!("banking_stage-buffered_packets", num);
|
||||
buffered_packets.extend_from_slice(&unprocessed_packets);
|
||||
continue;
|
||||
}
|
||||
@ -426,6 +441,7 @@ impl BankingStage {
|
||||
poh,
|
||||
);
|
||||
trace!("process_transcations: {:?}", result);
|
||||
chunk_start = chunk_end;
|
||||
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
|
||||
info!(
|
||||
"process transactions: max height reached slot: {} height: {}",
|
||||
@ -435,7 +451,6 @@ impl BankingStage {
|
||||
break;
|
||||
}
|
||||
result?;
|
||||
chunk_start = chunk_end;
|
||||
}
|
||||
Ok(chunk_start)
|
||||
}
|
||||
|
Reference in New Issue
Block a user