diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e47c81f75f..75d54a8f1d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1383,7 +1383,7 @@ impl ClusterInfo { /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( peers: &[&ContactInfo], - packet: &mut Packet, + packet: &Packet, s: &UdpSocket, forwarded: bool, ) -> Result<()> { diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index f7716c2370..8f09cbdbec 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -65,6 +65,7 @@ struct RetransmitStats { retransmit_total: AtomicU64, last_ts: AtomicU64, compute_turbine_peers_total: AtomicU64, + retransmit_tree_mismatch: AtomicU64, packets_by_slot: Mutex>, packets_by_source: Mutex>, } @@ -83,6 +84,7 @@ fn update_retransmit_stats( packets_by_source: HashMap, epoch_fetch: u64, epoch_cach_update: u64, + retransmit_tree_mismatch: u64, ) { stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats @@ -105,6 +107,9 @@ fn update_retransmit_stats( stats .epoch_cache_update .fetch_add(epoch_cach_update, Ordering::Relaxed); + stats + .retransmit_tree_mismatch + .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed); { let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); for (slot, count) in packets_by_slot { @@ -157,6 +162,11 @@ fn update_retransmit_stats( stats.retransmit_total.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "retransmit_tree_mismatch", + stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "compute_turbine", stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, @@ -315,9 +325,9 @@ fn retransmit( first_shreds_received: &Mutex>, rpc_subscriptions: &Option>, ) -> Result<()> { - let timer = Duration::new(1, 0); + const RECV_TIMEOUT: Duration = Duration::from_secs(1); let r_lock = r.lock().unwrap(); - let packets = r_lock.recv_timeout(timer)?; + let packets = r_lock.recv_timeout(RECV_TIMEOUT)?; let mut timer_start = Measure::start("retransmit"); let mut total_packets = packets.packets.len(); let mut packet_v = vec![packets]; @@ -368,11 +378,12 @@ fn retransmit( let mut repair_total = 0; let mut retransmit_total = 0; let mut compute_turbine_peers_total = 0; + let mut retransmit_tree_mismatch = 0; let mut packets_by_slot: HashMap = HashMap::new(); let mut packets_by_source: HashMap = HashMap::new(); let mut max_slot = 0; - for mut packets in packet_v { - for packet in packets.packets.iter_mut() { + for packets in packet_v { + for packet in packets.packets.iter() { // skip discarded packets and repair packets if packet.meta.discard { total_packets -= 1; @@ -413,6 +424,13 @@ fn retransmit( &stakes_and_index, packet.meta.seed, ); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), then we expect that the packet arrives at tvu + // socket as opposed to tvu-forwards. If this is not the case, then + // the turbine broadcast/retransmit tree mismatch across nodes. + if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) { + retransmit_tree_mismatch += 1; + } peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); // split off the indexes, we don't need the stakes anymore let indexes: Vec<_> = shuffled_stakes_and_index @@ -476,6 +494,7 @@ fn retransmit( packets_by_source, epoch_fetch.as_us(), epoch_cache_update.as_us(), + retransmit_tree_mismatch, ); Ok(()) diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index ad036109ad..b66b034cb8 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -204,7 +204,7 @@ pub fn cluster_info_retransmit() { p.meta.size = 10; let peers = c1.tvu_peers(); let retransmit_peers: Vec<_> = peers.iter().collect(); - ClusterInfo::retransmit_to(&retransmit_peers, &mut p, &tn1, false).unwrap(); + ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false).unwrap(); let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| {