makes turbine peer computation consistent between broadcast and retransmit (#14910) (#16143)

get_broadcast_peers uses tvu_peers:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/broadcast_stage.rs#L362-L370
which is potentially inconsistent with retransmit_peers:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/cluster_info.rs#L1332-L1345
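
For illustration, a minimal, self-contained sketch (hypothetical Peer type and
helper names, not the actual solana-core code) of how the two peer sets can
diverge: tvu_peers only requires a valid tvu address, while retransmit_peers
additionally requires a valid tvu_forwards address, so a node with a missing
tvu_forwards receives broadcast shreds but never appears in the retransmit tree.

    use std::net::SocketAddr;

    // Hypothetical stand-in for ContactInfo with only the relevant fields.
    struct Peer {
        id: u64,
        shred_version: u16,
        tvu: Option<SocketAddr>,          // Some(..) stands in for "valid address"
        tvu_forwards: Option<SocketAddr>,
    }

    // Roughly the tvu_peers predicate: not self, same shred version, valid tvu.
    fn tvu_peers(peers: &[Peer], self_id: u64, shred_version: u16) -> Vec<&Peer> {
        peers
            .iter()
            .filter(|p| p.id != self_id && p.shred_version == shred_version && p.tvu.is_some())
            .collect()
    }

    // retransmit_peers (removed by this commit) additionally required a valid tvu_forwards.
    fn retransmit_peers(peers: &[Peer], self_id: u64, shred_version: u16) -> Vec<&Peer> {
        peers
            .iter()
            .filter(|p| {
                p.id != self_id
                    && p.shred_version == shred_version
                    && p.tvu.is_some()
                    && p.tvu_forwards.is_some()
            })
            .collect()
    }

    fn main() {
        let addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
        let peers = vec![
            Peer { id: 1, shred_version: 3, tvu: Some(addr), tvu_forwards: Some(addr) },
            Peer { id: 2, shred_version: 3, tvu: Some(addr), tvu_forwards: None },
        ];
        // Peer 2 is a broadcast target but not a retransmit target.
        assert_eq!(tvu_peers(&peers, 0, 3).len(), 2);
        assert_eq!(retransmit_peers(&peers, 0, 3).len(), 1);
    }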

Also, the leader does not include its own contact-info when broadcasting
shreds:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/cluster_info.rs#L1324
but on the retransmit side, the slot leader is removed only _after_ neighbors
and children are computed:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/retransmit_stage.rs#L383-L384
So the turbine broadcast tree is different between the two stages.
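
As a toy illustration (plain string ids and a tiny fanout, not the real turbine
code), removing the leader only after positions have been computed shifts every
node that follows the leader down by one index, which changes the neighborhood
(index / fanout) it lands in:

    const FANOUT: usize = 2; // the real DATA_PLANE_FANOUT is much larger

    // Neighborhood of a node, given the shuffled peer list the tree is built from.
    fn neighborhood(shuffled: &[&str], node: &str) -> usize {
        let index = shuffled.iter().position(|&p| p == node).unwrap();
        index / FANOUT
    }

    fn main() {
        let leader = "L";
        // Pretend this is the stake-weighted shuffle of the retransmit peer list.
        let shuffled = ["A", leader, "B", "C", "D"];

        // Broadcast side: the leader never appears in the list at all.
        let without_leader: Vec<&str> =
            shuffled.iter().copied().filter(|&p| p != leader).collect();

        // Old retransmit behavior computed positions with the leader still present,
        // so "B" ends up in a different neighborhood than the broadcast side assumes.
        assert_eq!(neighborhood(&shuffled, "B"), 1);
        assert_eq!(neighborhood(&without_leader, "B"), 0);
    }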

This commit:
* Removes retransmit_peers. Broadcast and retransmit stages will use tvu_peers
  consistently.
* Retransmit stage removes the slot leader _before_ computing children and
  neighbors (a condensed sketch of the resulting computation follows below).
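
Condensed into one self-contained sketch (hypothetical helper name, plain
strings instead of ContactInfo), the retransmit-side computation after this
change is roughly: take tvu_peers, add self for the layer computation, shuffle
by stake, and drop the slot leader before neighbors and children are derived,
which lines it up with the leader-less list the broadcast stage builds.

    // The slot leader is excluded *before* the tree is computed, rather than
    // being stripped from the computed neighbor/children sets afterwards.
    fn retransmit_peer_list(
        tvu_peers: Vec<String>,       // stand-in for ClusterInfo::tvu_peers()
        me: String,                   // this node's own contact info
        slot_leader: Option<&str>,
    ) -> Vec<String> {
        let mut peers = tvu_peers;
        peers.push(me);               // self participates in the layer computation
        peers.sort();                 // stand-in for the stake-weighted shuffle
        if let Some(leader) = slot_leader {
            peers.retain(|p| p != leader); // leader removed before the tree is built
        }
        peers
    }

    fn main() {
        let peers = vec!["node-b".to_string(), "node-c".to_string()];
        let list = retransmit_peer_list(peers, "node-a".to_string(), Some("node-c"));
        assert_eq!(list, vec!["node-a".to_string(), "node-b".to_string()]);
    }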

(cherry picked from commit 570fd3f810)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
Author: mergify[bot]
Committed by: GitHub
Date: 2021-03-26 00:16:48 +00:00
Parent: 86ce650661
Commit: 7475a6f444
7 changed files with 68 additions and 53 deletions

core/src/cluster_info.rs

@@ -252,7 +252,6 @@ struct GossipStats {
     get_accounts_hash: Counter,
     all_tvu_peers: Counter,
     tvu_peers: Counter,
-    retransmit_peers: Counter,
     repair_peers: Counter,
     new_push_requests: Counter,
     new_push_requests2: Counter,
@@ -1383,21 +1382,6 @@ impl ClusterInfo {
             .collect()
     }
-    /// all peers that have a valid tvu
-    pub fn retransmit_peers(&self) -> Vec<ContactInfo> {
-        self.time_gossip_read_lock("retransmit_peers", &self.stats.retransmit_peers)
-            .crds
-            .get_nodes_contact_info()
-            .filter(|x| {
-                x.id != self.id()
-                    && x.shred_version == self.my_shred_version()
-                    && ContactInfo::is_valid_address(&x.tvu)
-                    && ContactInfo::is_valid_address(&x.tvu_forwards)
-            })
-            .cloned()
-            .collect()
-    }
     /// all tvu peers with valid gossip addrs that likely have the slot being requested
     pub fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
         let mut time = Measure::start("repair_peers");
@@ -1461,9 +1445,9 @@ impl ClusterInfo {
         stakes_and_index: &[(u64, usize)],
         seed: [u8; 32],
     ) -> Vec<(u64, usize)> {
-        let stake_weights = stakes_and_index.iter().map(|(w, _)| *w).collect();
+        let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();
-        let shuffle = weighted_shuffle(stake_weights, seed);
+        let shuffle = weighted_shuffle(&stake_weights, seed);
         shuffle.iter().map(|x| stakes_and_index[*x]).collect()
     }
@@ -1473,7 +1457,7 @@ impl ClusterInfo {
         &self,
         stakes: Option<&HashMap<Pubkey, u64>>,
     ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
-        let mut peers = self.retransmit_peers();
+        let mut peers = self.tvu_peers();
         // insert "self" into this list for the layer and neighborhood computation
         peers.push(self.my_contact_info());
         let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
@@ -1520,20 +1504,22 @@ impl ClusterInfo {
     pub fn retransmit_to(
         peers: &[&ContactInfo],
         packet: &mut Packet,
-        slot_leader_pubkey: Option<Pubkey>,
         s: &UdpSocket,
         forwarded: bool,
     ) -> Result<()> {
         trace!("retransmit orders {}", peers.len());
-        let dests: Vec<_> = peers
-            .iter()
-            .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
-            .map(|v| if forwarded { &v.tvu_forwards } else { &v.tvu })
-            .collect();
+        let dests: Vec<_> = if forwarded {
+            peers
+                .iter()
+                .map(|peer| &peer.tvu_forwards)
+                .filter(|addr| ContactInfo::is_valid_address(addr))
+                .collect()
+        } else {
+            peers.iter().map(|peer| &peer.tvu).collect()
+        };
         let mut sent = 0;
         while sent < dests.len() {
-            match multicast(s, &mut packet.data[..packet.meta.size], &dests[sent..]) {
+            match multicast(s, &packet.data[..packet.meta.size], &dests[sent..]) {
                 Ok(n) => sent += n,
                 Err(e) => {
                     inc_new_counter_error!(
@@ -2902,7 +2888,6 @@ impl ClusterInfo {
                 self.stats.gossip_packets_dropped_count.clear(),
                 i64
             ),
-            ("retransmit_peers", self.stats.retransmit_peers.clear(), i64),
             ("repair_peers", self.stats.repair_peers.clear(), i64),
             (
                 "new_push_requests",