(cherry picked from commit 0feac57cb0
)
Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
@ -33,7 +33,7 @@ use solana_runtime::{
|
|||||||
vote_sender_types::{ReplayVoteReceiver, ReplayedVote},
|
vote_sender_types::{ReplayVoteReceiver, ReplayedVote},
|
||||||
};
|
};
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
clock::{Epoch, Slot, DEFAULT_MS_PER_SLOT},
|
clock::{Epoch, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT},
|
||||||
epoch_schedule::EpochSchedule,
|
epoch_schedule::EpochSchedule,
|
||||||
hash::Hash,
|
hash::Hash,
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
@ -384,9 +384,14 @@ impl ClusterInfoVoteListener {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let would_be_leader = poh_recorder
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.would_be_leader(20 * DEFAULT_TICKS_PER_SLOT);
|
||||||
if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
|
if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
|
||||||
&verified_vote_label_packets_receiver,
|
&verified_vote_label_packets_receiver,
|
||||||
&mut update_version,
|
&mut update_version,
|
||||||
|
would_be_leader,
|
||||||
) {
|
) {
|
||||||
match e {
|
match e {
|
||||||
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
||||||
|
@ -15,17 +15,25 @@ impl VerifiedVotePackets {
|
|||||||
&mut self,
|
&mut self,
|
||||||
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
|
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
|
||||||
last_update_version: &mut u64,
|
last_update_version: &mut u64,
|
||||||
|
would_be_leader: bool,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::from_millis(200);
|
let timer = Duration::from_millis(200);
|
||||||
let vote_packets = vote_packets_receiver.recv_timeout(timer)?;
|
let vote_packets = vote_packets_receiver.recv_timeout(timer)?;
|
||||||
*last_update_version += 1;
|
*last_update_version += 1;
|
||||||
for (label, slot, packet) in vote_packets {
|
if would_be_leader {
|
||||||
self.0.insert(label, (*last_update_version, slot, packet));
|
|
||||||
}
|
|
||||||
while let Ok(vote_packets) = vote_packets_receiver.try_recv() {
|
|
||||||
for (label, slot, packet) in vote_packets {
|
for (label, slot, packet) in vote_packets {
|
||||||
self.0.insert(label, (*last_update_version, slot, packet));
|
self.0.insert(label, (*last_update_version, slot, packet));
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
self.0.clear();
|
||||||
|
self.0.shrink_to_fit();
|
||||||
|
}
|
||||||
|
while let Ok(vote_packets) = vote_packets_receiver.try_recv() {
|
||||||
|
if would_be_leader {
|
||||||
|
for (label, slot, packet) in vote_packets {
|
||||||
|
self.0.insert(label, (*last_update_version, slot, packet));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -137,7 +145,7 @@ mod tests {
|
|||||||
s.send(vec![(label1.clone(), 42, later_packets)]).unwrap();
|
s.send(vec![(label1.clone(), 42, later_packets)]).unwrap();
|
||||||
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
||||||
verified_vote_packets
|
verified_vote_packets
|
||||||
.receive_and_process_vote_packets(&r, &mut update_version)
|
.receive_and_process_vote_packets(&r, &mut update_version, true)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Test timestamps for same batch are the same
|
// Test timestamps for same batch are the same
|
||||||
@ -171,7 +179,7 @@ mod tests {
|
|||||||
s.send(vec![(label2.clone(), 51, Packets::default())])
|
s.send(vec![(label2.clone(), 51, Packets::default())])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
verified_vote_packets
|
verified_vote_packets
|
||||||
.receive_and_process_vote_packets(&r, &mut update_version)
|
.receive_and_process_vote_packets(&r, &mut update_version, true)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let update_version2 = verified_vote_packets.get_vote_packets(&label2).unwrap().0;
|
let update_version2 = verified_vote_packets.get_vote_packets(&label2).unwrap().0;
|
||||||
assert!(update_version2 > update_version1);
|
assert!(update_version2 > update_version1);
|
||||||
@ -179,7 +187,7 @@ mod tests {
|
|||||||
// Test empty doesn't bump the version
|
// Test empty doesn't bump the version
|
||||||
let before = update_version;
|
let before = update_version;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
verified_vote_packets.receive_and_process_vote_packets(&r, &mut update_version),
|
verified_vote_packets.receive_and_process_vote_packets(&r, &mut update_version, true),
|
||||||
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout))
|
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout))
|
||||||
);
|
);
|
||||||
assert_eq!(before, update_version);
|
assert_eq!(before, update_version);
|
||||||
|
Reference in New Issue
Block a user