Pacify clippy
This commit is contained in:
@ -490,79 +490,82 @@ impl BankingStage {
|
||||
let mut proc_start = Measure::start("consume_buffered_process");
|
||||
let mut reached_end_of_slot = None;
|
||||
|
||||
RetainMut::retain_mut(buffered_packet_batches, |buffered_packet_batch_and_offsets| {
|
||||
let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
|
||||
buffered_packet_batch_and_offsets;
|
||||
if let Some((next_leader, bank)) = &reached_end_of_slot {
|
||||
// We've hit the end of this slot, no need to perform more processing,
|
||||
// just filter the remaining packets for the invalid (e.g. too old) ones
|
||||
let new_unprocessed_indexes = Self::filter_unprocessed_packets(
|
||||
bank,
|
||||
packet_batch,
|
||||
original_unprocessed_indexes,
|
||||
my_pubkey,
|
||||
*next_leader,
|
||||
banking_stage_stats,
|
||||
);
|
||||
Self::update_buffered_packets_with_new_unprocessed(
|
||||
original_unprocessed_indexes,
|
||||
new_unprocessed_indexes,
|
||||
)
|
||||
} else {
|
||||
let bank_start = poh_recorder.lock().unwrap().bank_start();
|
||||
if let Some(BankStart {
|
||||
working_bank,
|
||||
bank_creation_time,
|
||||
}) = bank_start
|
||||
{
|
||||
let (processed, verified_txs_len, new_unprocessed_indexes) =
|
||||
Self::process_packets_transactions(
|
||||
&working_bank,
|
||||
&bank_creation_time,
|
||||
recorder,
|
||||
packet_batch,
|
||||
original_unprocessed_indexes.to_owned(),
|
||||
transaction_status_sender.clone(),
|
||||
gossip_vote_sender,
|
||||
banking_stage_stats,
|
||||
qos_service,
|
||||
);
|
||||
if processed < verified_txs_len
|
||||
|| !Bank::should_bank_still_be_processing_txs(
|
||||
&bank_creation_time,
|
||||
max_tx_ingestion_ns,
|
||||
)
|
||||
{
|
||||
reached_end_of_slot = Some((
|
||||
poh_recorder.lock().unwrap().next_slot_leader(),
|
||||
working_bank,
|
||||
));
|
||||
}
|
||||
new_tx_count += processed;
|
||||
|
||||
// Out of the buffered packets just retried, collect any still unprocessed
|
||||
// transactions in this batch for forwarding
|
||||
rebuffered_packet_count += new_unprocessed_indexes.len();
|
||||
let has_more_unprocessed_transactions =
|
||||
Self::update_buffered_packets_with_new_unprocessed(
|
||||
original_unprocessed_indexes,
|
||||
new_unprocessed_indexes,
|
||||
);
|
||||
if let Some(test_fn) = &test_fn {
|
||||
test_fn();
|
||||
}
|
||||
has_more_unprocessed_transactions
|
||||
RetainMut::retain_mut(
|
||||
buffered_packet_batches,
|
||||
|buffered_packet_batch_and_offsets| {
|
||||
let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
|
||||
buffered_packet_batch_and_offsets;
|
||||
if let Some((next_leader, bank)) = &reached_end_of_slot {
|
||||
// We've hit the end of this slot, no need to perform more processing,
|
||||
// just filter the remaining packets for the invalid (e.g. too old) ones
|
||||
let new_unprocessed_indexes = Self::filter_unprocessed_packets(
|
||||
bank,
|
||||
packet_batch,
|
||||
original_unprocessed_indexes,
|
||||
my_pubkey,
|
||||
*next_leader,
|
||||
banking_stage_stats,
|
||||
);
|
||||
Self::update_buffered_packets_with_new_unprocessed(
|
||||
original_unprocessed_indexes,
|
||||
new_unprocessed_indexes,
|
||||
)
|
||||
} else {
|
||||
rebuffered_packet_count += original_unprocessed_indexes.len();
|
||||
// `original_unprocessed_indexes` must have remaining packets to process
|
||||
// if not yet processed.
|
||||
assert!(Self::packet_has_more_unprocessed_transactions(
|
||||
original_unprocessed_indexes
|
||||
));
|
||||
true
|
||||
let bank_start = poh_recorder.lock().unwrap().bank_start();
|
||||
if let Some(BankStart {
|
||||
working_bank,
|
||||
bank_creation_time,
|
||||
}) = bank_start
|
||||
{
|
||||
let (processed, verified_txs_len, new_unprocessed_indexes) =
|
||||
Self::process_packets_transactions(
|
||||
&working_bank,
|
||||
&bank_creation_time,
|
||||
recorder,
|
||||
packet_batch,
|
||||
original_unprocessed_indexes.to_owned(),
|
||||
transaction_status_sender.clone(),
|
||||
gossip_vote_sender,
|
||||
banking_stage_stats,
|
||||
qos_service,
|
||||
);
|
||||
if processed < verified_txs_len
|
||||
|| !Bank::should_bank_still_be_processing_txs(
|
||||
&bank_creation_time,
|
||||
max_tx_ingestion_ns,
|
||||
)
|
||||
{
|
||||
reached_end_of_slot = Some((
|
||||
poh_recorder.lock().unwrap().next_slot_leader(),
|
||||
working_bank,
|
||||
));
|
||||
}
|
||||
new_tx_count += processed;
|
||||
|
||||
// Out of the buffered packets just retried, collect any still unprocessed
|
||||
// transactions in this batch for forwarding
|
||||
rebuffered_packet_count += new_unprocessed_indexes.len();
|
||||
let has_more_unprocessed_transactions =
|
||||
Self::update_buffered_packets_with_new_unprocessed(
|
||||
original_unprocessed_indexes,
|
||||
new_unprocessed_indexes,
|
||||
);
|
||||
if let Some(test_fn) = &test_fn {
|
||||
test_fn();
|
||||
}
|
||||
has_more_unprocessed_transactions
|
||||
} else {
|
||||
rebuffered_packet_count += original_unprocessed_indexes.len();
|
||||
// `original_unprocessed_indexes` must have remaining packets to process
|
||||
// if not yet processed.
|
||||
assert!(Self::packet_has_more_unprocessed_transactions(
|
||||
original_unprocessed_indexes
|
||||
));
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
proc_start.stop();
|
||||
|
||||
|
@ -4327,9 +4327,9 @@ pub mod tests {
|
||||
for i in 0..std::cmp::max(new_vote_pubkeys.len(), new_node_pubkeys.len()) {
|
||||
propagated_stats.is_propagated = false;
|
||||
let len = std::cmp::min(i, new_vote_pubkeys.len());
|
||||
let mut voted_pubkeys = new_vote_pubkeys[..len].iter().copied().collect();
|
||||
let mut voted_pubkeys = new_vote_pubkeys[..len].to_vec();
|
||||
let len = std::cmp::min(i, new_node_pubkeys.len());
|
||||
let mut node_pubkeys = new_node_pubkeys[..len].iter().copied().collect();
|
||||
let mut node_pubkeys = new_node_pubkeys[..len].to_vec();
|
||||
let did_newly_reach_threshold =
|
||||
ReplayStage::update_slot_propagated_threshold_from_votes(
|
||||
&mut voted_pubkeys,
|
||||
|
Reference in New Issue
Block a user