Fix fannout gossip bench (bp #10509) (#10555)

* Fix fannout gossip bench (#10509)

* Gossip benchmark

* Rayon tweaking

* push pulls

* fanout to max nodes

* fixup! fanout to max nodes

* fixup! fixup! fanout to max nodes

* update

* multi vote test

* fixup prune

* fast propagation

* fixups

* compute up to 95%

* test for specific tx

* stats

* stats

* fixed tests

* rename

* track a lagging view of which nodes have the local node in their active set in the local received_cache

* test fixups

* dups are old now

* dont prune your own origin

* send vote to tpu

* tests

* fixed tests

* fixed test

* update

* ignore scale

* lint

* fixup

* fixup

* fixup

* cleanup

Co-authored-by: Stephen Akridge <sakridge@gmail.com>
(cherry picked from commit ba83e4ca50)

* Merge fixes

Co-authored-by: anatoly yakovenko <anatoly@solana.com>
This commit is contained in:
mergify[bot]
2020-06-14 06:28:43 -07:00
committed by GitHub
parent 5c5207b7c4
commit 79b1d49e42
8 changed files with 128 additions and 72 deletions

View File

@ -51,7 +51,7 @@ type PacketsAndOffsets = (Packets, Vec<usize>);
pub type UnprocessedPackets = Vec<PacketsAndOffsets>; pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
/// Transaction forwarding /// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4; pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 1;
// Fixed thread size seems to be fastest on GCP setup // Fixed thread size seems to be fastest on GCP setup
pub const NUM_THREADS: u32 = 4; pub const NUM_THREADS: u32 = 4;

View File

@ -254,6 +254,7 @@ pub struct ClusterInfo {
my_contact_info: RwLock<ContactInfo>, my_contact_info: RwLock<ContactInfo>,
id: Pubkey, id: Pubkey,
stats: GossipStats, stats: GossipStats,
socket: UdpSocket,
} }
impl Default for ClusterInfo { impl Default for ClusterInfo {
@ -409,6 +410,7 @@ impl ClusterInfo {
my_contact_info: RwLock::new(contact_info), my_contact_info: RwLock::new(contact_info),
id, id,
stats: GossipStats::default(), stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
}; };
{ {
let mut gossip = me.gossip.write().unwrap(); let mut gossip = me.gossip.write().unwrap();
@ -434,6 +436,7 @@ impl ClusterInfo {
my_contact_info: RwLock::new(my_contact_info), my_contact_info: RwLock::new(my_contact_info),
id: *new_id, id: *new_id,
stats: GossipStats::default(), stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
} }
} }
@ -747,6 +750,13 @@ impl ClusterInfo {
.process_push_message(&self.id(), vec![entry], now); .process_push_message(&self.id(), vec![entry], now);
} }
pub fn send_vote(&self, vote: &Transaction) -> Result<()> {
let tpu = self.my_contact_info().tpu;
let buf = serialize(vote)?;
self.socket.send_to(&buf, &tpu)?;
Ok(())
}
/// Get votes in the crds /// Get votes in the crds
/// * since - The timestamp of when the vote inserted must be greater than /// * since - The timestamp of when the vote inserted must be greater than
/// since. This allows the bank to query for new votes only. /// since. This allows the bank to query for new votes only.

View File

@ -36,6 +36,7 @@ use std::collections::HashMap;
pub struct Crds { pub struct Crds {
/// Stores the map of labels and values /// Stores the map of labels and values
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>, pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize,
} }
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
@ -84,6 +85,7 @@ impl Default for Crds {
fn default() -> Self { fn default() -> Self {
Crds { Crds {
table: IndexMap::new(), table: IndexMap::new(),
num_inserts: 0,
} }
} }
} }
@ -125,6 +127,7 @@ impl Crds {
.unwrap_or(true); .unwrap_or(true);
if do_insert { if do_insert {
let old = self.table.insert(label, new_value); let old = self.table.insert(label, new_value);
self.num_inserts += 1;
Ok(old) Ok(old)
} else { } else {
trace!("INSERT FAILED data: {} new.wallclock: {}", label, wallclock,); trace!("INSERT FAILED data: {} new.wallclock: {}", label, wallclock,);

View File

@ -76,17 +76,10 @@ impl CrdsGossip {
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> HashMap<Pubkey, HashSet<Pubkey>> { ) -> HashMap<Pubkey, HashSet<Pubkey>> {
let id = &self.id; let id = &self.id;
let crds = &self.crds;
let push = &mut self.push; let push = &mut self.push;
let versioned = labels
.into_iter()
.filter_map(|label| crds.lookup_versioned(&label));
let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new(); let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new();
for val in versioned { for origin in labels.iter().map(|k| k.pubkey()) {
let origin = val.value.pubkey(); let peers = push.prune_received_cache(id, &origin, stakes);
let hash = val.value_hash;
let peers = push.prune_received_cache(id, &origin, hash, stakes);
for from in peers { for from in peers {
prune_map.entry(from).or_default().insert(origin); prune_map.entry(from).or_default().insert(origin);
} }
@ -113,7 +106,7 @@ impl CrdsGossip {
return Err(CrdsGossipError::PruneMessageTimeout); return Err(CrdsGossipError::PruneMessageTimeout);
} }
if self.id == *destination { if self.id == *destination {
self.push.process_prune_msg(peer, origin); self.push.process_prune_msg(&self.id, peer, origin);
Ok(()) Ok(())
} else { } else {
Err(CrdsGossipError::BadPruneDestination) Err(CrdsGossipError::BadPruneDestination)
@ -190,14 +183,15 @@ impl CrdsGossip {
now: u64, now: u64,
process_pull_stats: &mut ProcessPullStats, process_pull_stats: &mut ProcessPullStats,
) { ) {
self.pull.process_pull_responses( let success = self.pull.process_pull_responses(
&mut self.crds, &mut self.crds,
from, from,
responses, responses,
responses_expired_timeout, responses_expired_timeout,
now, now,
process_pull_stats, process_pull_stats,
) );
self.push.push_pull_responses(success, now);
} }
pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> { pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {

View File

@ -2,7 +2,6 @@
pub enum CrdsGossipError { pub enum CrdsGossipError {
NoPeers, NoPeers,
PushMessageTimeout, PushMessageTimeout,
PushMessageAlreadyReceived,
PushMessageOldVersion, PushMessageOldVersion,
BadPruneDestination, BadPruneDestination,
PruneMessageTimeout, PruneMessageTimeout,

View File

@ -134,6 +134,7 @@ pub struct CrdsGossipPull {
purged_values: VecDeque<(Hash, u64)>, purged_values: VecDeque<(Hash, u64)>,
pub crds_timeout: u64, pub crds_timeout: u64,
pub msg_timeout: u64, pub msg_timeout: u64,
pub num_pulls: usize,
} }
impl Default for CrdsGossipPull { impl Default for CrdsGossipPull {
@ -143,6 +144,7 @@ impl Default for CrdsGossipPull {
pull_request_time: HashMap::new(), pull_request_time: HashMap::new(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
num_pulls: 0,
} }
} }
} }
@ -313,18 +315,24 @@ impl CrdsGossipPull {
responses_expired_timeout: Vec<VersionedCrdsValue>, responses_expired_timeout: Vec<VersionedCrdsValue>,
now: u64, now: u64,
stats: &mut ProcessPullStats, stats: &mut ProcessPullStats,
) { ) -> Vec<(CrdsValueLabel, Hash, u64)> {
let mut success = vec![];
let mut owners = HashSet::new(); let mut owners = HashSet::new();
for r in responses_expired_timeout { for r in responses_expired_timeout {
stats.failed_insert += crds.insert_versioned(r).is_err() as usize; stats.failed_insert += crds.insert_versioned(r).is_err() as usize;
} }
for r in responses { for r in responses {
let owner = r.value.label().pubkey(); let owner = r.value.label().pubkey();
let label = r.value.label();
let wc = r.value.wallclock();
let hash = r.value_hash;
let old = crds.insert_versioned(r); let old = crds.insert_versioned(r);
if old.is_err() { if old.is_err() {
stats.failed_insert += 1; stats.failed_insert += 1;
} else { } else {
stats.success += 1; stats.success += 1;
self.num_pulls += 1;
success.push((label, hash, wc));
} }
old.ok().map(|opt| { old.ok().map(|opt| {
owners.insert(owner); owners.insert(owner);
@ -338,6 +346,7 @@ impl CrdsGossipPull {
for owner in owners { for owner in owners {
crds.update_record_timestamp(&owner, now); crds.update_record_timestamp(&owner, now);
} }
success
} }
// build a set of filters of the current crds table // build a set of filters of the current crds table
// num_filters - used to increase the likelyhood of a value in crds being added to some filter // num_filters - used to increase the likelyhood of a value in crds being added to some filter

View File

@ -35,6 +35,7 @@ pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
#[derive(Clone)] #[derive(Clone)]
pub struct CrdsGossipPush { pub struct CrdsGossipPush {
@ -44,12 +45,18 @@ pub struct CrdsGossipPush {
active_set: IndexMap<Pubkey, Bloom<Pubkey>>, active_set: IndexMap<Pubkey, Bloom<Pubkey>>,
/// push message queue /// push message queue
push_messages: HashMap<CrdsValueLabel, Hash>, push_messages: HashMap<CrdsValueLabel, Hash>,
/// cache that tracks which validators a message was received from /// Cache that tracks which validators a message was received from
received_cache: HashMap<Hash, (u64, HashSet<Pubkey>)>, /// bool indicates it has been pruned.
/// This cache represents a lagging view of which validators
/// currently have this node in their `active_set`
received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
pub num_active: usize, pub num_active: usize,
pub push_fanout: usize, pub push_fanout: usize,
pub msg_timeout: u64, pub msg_timeout: u64,
pub prune_timeout: u64, pub prune_timeout: u64,
pub num_total: usize,
pub num_old: usize,
pub num_pushes: usize,
} }
impl Default for CrdsGossipPush { impl Default for CrdsGossipPush {
@ -64,6 +71,9 @@ impl Default for CrdsGossipPush {
push_fanout: CRDS_GOSSIP_PUSH_FANOUT, push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS, prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
num_total: 0,
num_old: 0,
num_pushes: 0,
} }
} }
} }
@ -81,18 +91,21 @@ impl CrdsGossipPush {
&mut self, &mut self,
self_pubkey: &Pubkey, self_pubkey: &Pubkey,
origin: &Pubkey, origin: &Pubkey,
hash: Hash,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> Vec<Pubkey> { ) -> Vec<Pubkey> {
let origin_stake = stakes.get(origin).unwrap_or(&0); let origin_stake = stakes.get(origin).unwrap_or(&0);
let self_stake = stakes.get(self_pubkey).unwrap_or(&0); let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
let cache = self.received_cache.get(&hash); let cache = self.received_cache.get(origin);
if cache.is_none() { if cache.is_none() {
return Vec::new(); return Vec::new();
} }
let peers = cache.unwrap();
let peers = &cache.unwrap().1; let peer_stake_total: u64 = peers
let peer_stake_total: u64 = peers.iter().map(|p| stakes.get(p).unwrap_or(&0)).sum(); .iter()
.filter(|v| !(v.1).0)
.map(|v| stakes.get(v.0).unwrap_or(&0))
.sum();
let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake); let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
if peer_stake_total < prune_stake_threshold { if peer_stake_total < prune_stake_threshold {
return Vec::new(); return Vec::new();
@ -100,7 +113,8 @@ impl CrdsGossipPush {
let staked_peers: Vec<(Pubkey, u64)> = peers let staked_peers: Vec<(Pubkey, u64)> = peers
.iter() .iter()
.filter_map(|p| stakes.get(p).map(|s| (*p, *s))) .filter(|v| !(v.1).0)
.filter_map(|p| stakes.get(p.0).map(|s| (*p.0, *s)))
.filter(|(_, s)| *s > 0) .filter(|(_, s)| *s > 0)
.collect(); .collect();
@ -117,16 +131,27 @@ impl CrdsGossipPush {
let (next_peer, next_stake) = staked_peers[next]; let (next_peer, next_stake) = staked_peers[next];
keep.insert(next_peer); keep.insert(next_peer);
peer_stake_sum += next_stake; peer_stake_sum += next_stake;
if peer_stake_sum >= prune_stake_threshold { if peer_stake_sum >= prune_stake_threshold
&& keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES
{
break; break;
} }
} }
peers let pruned_peers: Vec<Pubkey> = peers
.iter() .keys()
.filter(|p| !keep.contains(p)) .filter(|p| !keep.contains(p))
.cloned() .cloned()
.collect() .collect();
pruned_peers.iter().for_each(|p| {
self.received_cache
.get_mut(origin)
.unwrap()
.get_mut(p)
.unwrap()
.0 = true;
});
pruned_peers
} }
/// process a push message to the network /// process a push message to the network
@ -137,6 +162,7 @@ impl CrdsGossipPush {
value: CrdsValue, value: CrdsValue,
now: u64, now: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> { ) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
self.num_total += 1;
if now if now
> value > value
.wallclock() .wallclock()
@ -149,21 +175,32 @@ impl CrdsGossipPush {
return Err(CrdsGossipError::PushMessageTimeout); return Err(CrdsGossipError::PushMessageTimeout);
} }
let label = value.label(); let label = value.label();
let origin = label.pubkey();
let new_value = crds.new_versioned(now, value); let new_value = crds.new_versioned(now, value);
let value_hash = new_value.value_hash; let value_hash = new_value.value_hash;
if let Some((_, ref mut received_set)) = self.received_cache.get_mut(&value_hash) { let received_set = self
received_set.insert(from.clone()); .received_cache
return Err(CrdsGossipError::PushMessageAlreadyReceived); .entry(origin)
} .or_insert_with(HashMap::new);
received_set.entry(*from).or_insert((false, 0)).1 = now;
let old = crds.insert_versioned(new_value); let old = crds.insert_versioned(new_value);
if old.is_err() { if old.is_err() {
self.num_old += 1;
return Err(CrdsGossipError::PushMessageOldVersion); return Err(CrdsGossipError::PushMessageOldVersion);
} }
let mut received_set = HashSet::new();
received_set.insert(from.clone());
self.push_messages.insert(label, value_hash); self.push_messages.insert(label, value_hash);
self.received_cache.insert(value_hash, (now, received_set)); Ok(old.unwrap())
Ok(old.ok().and_then(|opt| opt)) }
/// push pull responses
pub fn push_pull_responses(&mut self, values: Vec<(CrdsValueLabel, Hash, u64)>, now: u64) {
for (label, value_hash, wc) in values {
if now > wc.checked_add(self.msg_timeout).unwrap_or_else(|| 0) {
continue;
}
self.push_messages.insert(label, value_hash);
}
} }
/// New push message to broadcast to peers. /// New push message to broadcast to peers.
@ -172,18 +209,10 @@ impl CrdsGossipPush {
/// The list of push messages is created such that all the randomly selected peers have not /// The list of push messages is created such that all the randomly selected peers have not
/// pruned the source addresses. /// pruned the source addresses.
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> { pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
let max = self.active_set.len();
let mut nodes: Vec<_> = (0..max).collect();
nodes.shuffle(&mut rand::thread_rng());
let peers: Vec<Pubkey> = nodes
.into_iter()
.filter_map(|n| self.active_set.get_index(n))
.take(self.push_fanout)
.map(|n| *n.0)
.collect();
let mut total_bytes: usize = 0; let mut total_bytes: usize = 0;
let mut values = vec![]; let mut values = vec![];
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new(); let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
trace!("new_push_messages {}", self.push_messages.len());
for (label, hash) in &self.push_messages { for (label, hash) in &self.push_messages {
let res = crds.lookup_versioned(label); let res = crds.lookup_versioned(label);
if res.is_none() { if res.is_none() {
@ -203,21 +232,37 @@ impl CrdsGossipPush {
} }
values.push(value.clone()); values.push(value.clone());
} }
trace!(
"new_push_messages {} {}",
values.len(),
self.active_set.len()
);
for v in values { for v in values {
for p in peers.iter() { //use a consistent index for the same origin so
let filter = self.active_set.get_mut(p); //the active set learns the MST for that origin
if filter.is_some() && !filter.unwrap().contains(&v.label().pubkey()) { let start = v.label().pubkey().as_ref()[0] as usize;
push_messages.entry(*p).or_default().push(v.clone()); let max = self.push_fanout.min(self.active_set.len());
for i in start..(start + max) {
let ix = i % self.active_set.len();
if let Some((p, filter)) = self.active_set.get_index(ix) {
if !filter.contains(&v.label().pubkey()) {
trace!("new_push_messages insert {} {:?}", *p, v);
push_messages.entry(*p).or_default().push(v.clone());
self.num_pushes += 1;
}
} }
self.push_messages.remove(&v.label());
} }
self.push_messages.remove(&v.label());
} }
push_messages push_messages
} }
/// add the `from` to the peer's filter of nodes /// add the `from` to the peer's filter of nodes
pub fn process_prune_msg(&mut self, peer: &Pubkey, origins: &[Pubkey]) { pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
for origin in origins { for origin in origins {
if origin == self_pubkey {
continue;
}
if let Some(p) = self.active_set.get_mut(peer) { if let Some(p) = self.active_set.get_mut(peer) {
p.add(origin) p.add(origin)
} }
@ -339,15 +384,11 @@ impl CrdsGossipPush {
/// purge received push message cache /// purge received push message cache
pub fn purge_old_received_cache(&mut self, min_time: u64) { pub fn purge_old_received_cache(&mut self, min_time: u64) {
let old_msgs: Vec<Hash> = self self.received_cache
.received_cache .iter_mut()
.iter() .for_each(|v| v.1.retain(|_, v| v.1 > min_time));
.filter_map(|(k, (rcvd_time, _))| if *rcvd_time < min_time { Some(k) } else { None })
.cloned() self.received_cache.retain(|_, v| !v.is_empty());
.collect();
for k in old_msgs {
self.received_cache.remove(&k);
}
} }
} }
@ -371,7 +412,6 @@ mod test {
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&origin, 0, &origin, 0,
))); )));
let label = value.label();
let low_staked_peers = (0..10).map(|_| Pubkey::new_rand()); let low_staked_peers = (0..10).map(|_| Pubkey::new_rand());
let mut low_staked_set = HashSet::new(); let mut low_staked_set = HashSet::new();
low_staked_peers.for_each(|p| { low_staked_peers.for_each(|p| {
@ -380,11 +420,7 @@ mod test {
stakes.insert(p, 1); stakes.insert(p, 1);
}); });
let versioned = crds let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
.lookup_versioned(&label)
.expect("versioned value should exist");
let hash = versioned.value_hash;
let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes);
assert!( assert!(
pruned.is_empty(), pruned.is_empty(),
"should not prune if min threshold has not been reached" "should not prune if min threshold has not been reached"
@ -395,7 +431,7 @@ mod test {
stakes.insert(high_staked_peer, high_stake); stakes.insert(high_staked_peer, high_stake);
let _ = push.process_push_message(&mut crds, &high_staked_peer, value.clone(), 0); let _ = push.process_push_message(&mut crds, &high_staked_peer, value.clone(), 0);
let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes); let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
assert!( assert!(
pruned.len() < low_staked_set.len() + 1, pruned.len() < low_staked_set.len() + 1,
"should not prune all peers" "should not prune all peers"
@ -409,7 +445,7 @@ mod test {
} }
#[test] #[test]
fn test_process_push() { fn test_process_push_one() {
let mut crds = Crds::default(); let mut crds = Crds::default();
let mut push = CrdsGossipPush::default(); let mut push = CrdsGossipPush::default();
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -426,8 +462,8 @@ mod test {
// push it again // push it again
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Err(CrdsGossipError::PushMessageAlreadyReceived) Err(CrdsGossipError::PushMessageOldVersion)
); );
} }
#[test] #[test]
@ -690,6 +726,7 @@ mod test {
#[test] #[test]
fn test_process_prune() { fn test_process_prune() {
let mut crds = Crds::default(); let mut crds = Crds::default();
let self_id = Pubkey::new_rand();
let mut push = CrdsGossipPush::default(); let mut push = CrdsGossipPush::default();
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(), &Pubkey::new_rand(),
@ -707,7 +744,11 @@ mod test {
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0), push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
Ok(None) Ok(None)
); );
push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]); push.process_prune_msg(
&self_id,
&peer.label().pubkey(),
&[new_msg.label().pubkey()],
);
assert_eq!(push.new_push_messages(&crds, 0), expected); assert_eq!(push.new_push_messages(&crds, 0), expected);
} }
#[test] #[test]
@ -749,9 +790,9 @@ mod test {
assert_eq!(crds.lookup(&label), Some(&value)); assert_eq!(crds.lookup(&label), Some(&value));
// push it again // push it again
assert_eq!( assert_matches!(
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Err(CrdsGossipError::PushMessageAlreadyReceived) Err(CrdsGossipError::PushMessageOldVersion)
); );
// purge the old pushed // purge the old pushed

View File

@ -758,7 +758,6 @@ impl ReplayStage {
progress.get_fork_stats(bank.slot()).unwrap().total_staked, progress.get_fork_stats(bank.slot()).unwrap().total_staked,
lockouts_sender, lockouts_sender,
); );
Self::push_vote( Self::push_vote(
cluster_info, cluster_info,
bank, bank,
@ -838,6 +837,7 @@ impl ReplayStage {
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash); vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);
let _ = cluster_info.send_vote(&vote_tx);
cluster_info.push_vote(tower_index, vote_tx); cluster_info.push_vote(tower_index, vote_tx);
} }