diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index a0a2f044a6..16a4f5054c 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1383,7 +1383,7 @@ impl ClusterInfo {
     /// We need to avoid having obj locked while doing a io, such as the `send_to`
     pub fn retransmit_to(
         peers: &[&ContactInfo],
-        packet: &mut Packet,
+        packet: &Packet,
         s: &UdpSocket,
         forwarded: bool,
     ) -> Result<()> {
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 9f9ba13855..fd2c76c5bd 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -64,6 +64,7 @@ struct RetransmitStats {
     retransmit_total: AtomicU64,
     last_ts: AtomicU64,
     compute_turbine_peers_total: AtomicU64,
+    retransmit_tree_mismatch: AtomicU64,
     packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
     packets_by_source: Mutex<BTreeMap<String, usize>>,
 }
@@ -82,6 +83,7 @@ fn update_retransmit_stats(
     packets_by_source: HashMap<String, usize>,
     epoch_fetch: u64,
     epoch_cach_update: u64,
+    retransmit_tree_mismatch: u64,
 ) {
     stats.total_time.fetch_add(total_time, Ordering::Relaxed);
     stats
@@ -104,6 +106,9 @@ fn update_retransmit_stats(
     stats
         .epoch_cache_update
         .fetch_add(epoch_cach_update, Ordering::Relaxed);
+    stats
+        .retransmit_tree_mismatch
+        .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed);
     {
         let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
         for (slot, count) in packets_by_slot {
@@ -156,6 +161,11 @@ fn update_retransmit_stats(
             stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
             i64
         ),
+        (
+            "retransmit_tree_mismatch",
+            stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64,
+            i64
+        ),
         (
             "compute_turbine",
             stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
@@ -314,15 +324,15 @@ fn retransmit(
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
     rpc_subscriptions: &Option<Arc<RpcSubscriptions>>,
 ) -> Result<()> {
-    let timer = Duration::new(1, 0);
+    const RECV_TIMEOUT: Duration = Duration::from_secs(1);
     let r_lock = r.lock().unwrap();
-    let packets = r_lock.recv_timeout(timer)?;
+    let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
     let mut timer_start = Measure::start("retransmit");
     let mut total_packets = packets.packets.len();
-    let mut packet_v = vec![packets];
+    let mut packets = vec![packets];
     while let Ok(nq) = r_lock.try_recv() {
         total_packets += nq.packets.len();
-        packet_v.push(nq);
+        packets.push(nq);
         if total_packets >= MAX_PACKET_BATCH_SIZE {
             break;
         }
@@ -367,91 +377,97 @@ fn retransmit(
     let mut repair_total = 0;
     let mut retransmit_total = 0;
     let mut compute_turbine_peers_total = 0;
+    let mut retransmit_tree_mismatch = 0;
     let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
     let mut packets_by_source: HashMap<String, usize> = HashMap::new();
     let mut max_slot = 0;
-    for mut packets in packet_v {
-        for packet in packets.packets.iter_mut() {
-            // skip discarded packets and repair packets
-            if packet.meta.discard {
-                total_packets -= 1;
-                discard_total += 1;
-                continue;
-            }
-            if packet.meta.repair {
-                total_packets -= 1;
-                repair_total += 1;
-                continue;
-            }
-            let shred_slot = match check_if_already_received(packet, shreds_received) {
-                Some(slot) => slot,
-                None => continue,
-            };
-            max_slot = max_slot.max(shred_slot);
-
-            if let Some(rpc_subscriptions) = rpc_subscriptions {
-                if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
-                    rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
-                        slot: shred_slot,
-                        timestamp: timestamp(),
-                    });
-                }
-            }
-
-            let mut compute_turbine_peers = Measure::start("turbine_start");
-            let stakes_and_index = get_retransmit_peers(
-                my_id,
-                shred_slot,
-                leader_schedule_cache,
-                r_bank.deref(),
-                r_epoch_stakes_cache.deref(),
-            );
-            let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
-                &my_id,
-                &r_epoch_stakes_cache.peers,
-                &stakes_and_index,
-                packet.meta.seed,
-            );
-            peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
-            // split off the indexes, we don't need the stakes anymore
-            let indexes: Vec<_> = shuffled_stakes_and_index
-                .into_iter()
-                .map(|(_, index)| index)
-                .collect();
-
-            let (neighbors, children) =
-                compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
-            let neighbors: Vec<_> = neighbors
-                .into_iter()
-                .filter_map(|index| {
-                    let peer = &r_epoch_stakes_cache.peers[index];
-                    if peer.id == my_id {
-                        None
-                    } else {
-                        Some(peer)
-                    }
-                })
-                .collect();
-            let children: Vec<_> = children
-                .into_iter()
-                .map(|index| &r_epoch_stakes_cache.peers[index])
-                .collect();
-            compute_turbine_peers.stop();
-            compute_turbine_peers_total += compute_turbine_peers.as_us();
-
-            *packets_by_slot.entry(packet.meta.slot).or_insert(0) += 1;
-            *packets_by_source
-                .entry(packet.meta.addr().to_string())
-                .or_insert(0) += 1;
-
-            let mut retransmit_time = Measure::start("retransmit_to");
-            if !packet.meta.forward {
-                ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
-            }
-            ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
-            retransmit_time.stop();
-            retransmit_total += retransmit_time.as_us();
+    for packet in packets.iter().flat_map(|p| p.packets.iter()) {
+        // skip discarded packets and repair packets
+        if packet.meta.discard {
+            total_packets -= 1;
+            discard_total += 1;
+            continue;
         }
+        if packet.meta.repair {
+            total_packets -= 1;
+            repair_total += 1;
+            continue;
+        }
+        let shred_slot = match check_if_already_received(packet, shreds_received) {
+            Some(slot) => slot,
+            None => continue,
+        };
+        max_slot = max_slot.max(shred_slot);
+
+        if let Some(rpc_subscriptions) = rpc_subscriptions {
+            if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
+                rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
+                    slot: shred_slot,
+                    timestamp: timestamp(),
+                });
+            }
+        }
+
+        let mut compute_turbine_peers = Measure::start("turbine_start");
+        let stakes_and_index = get_retransmit_peers(
+            my_id,
+            shred_slot,
+            leader_schedule_cache,
+            r_bank.deref(),
+            r_epoch_stakes_cache.deref(),
+        );
+        let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
+            &my_id,
+            &r_epoch_stakes_cache.peers,
+            &stakes_and_index,
+            packet.meta.seed,
+        );
+        // If the node is on the critical path (i.e. the first node in each
+        // neighborhood), then we expect that the packet arrives at tvu socket
+        // as opposed to tvu-forwards. If this is not the case, then the
+        // turbine broadcast/retransmit tree is mismatched across nodes.
+        if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) {
+            retransmit_tree_mismatch += 1;
+        }
+        peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
+        // split off the indexes, we don't need the stakes anymore
+        let indexes: Vec<_> = shuffled_stakes_and_index
+            .into_iter()
+            .map(|(_, index)| index)
+            .collect();
+        debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id);
+
+        let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
+        let neighbors: Vec<_> = neighbors
+            .into_iter()
+            .filter_map(|index| {
+                let peer = &r_epoch_stakes_cache.peers[index];
+                if peer.id == my_id {
+                    None
+                } else {
+                    Some(peer)
+                }
+            })
+            .collect();
+        let children: Vec<_> = children
+            .into_iter()
+            .map(|index| &r_epoch_stakes_cache.peers[index])
+            .collect();
+        compute_turbine_peers.stop();
+        compute_turbine_peers_total += compute_turbine_peers.as_us();
+
+        *packets_by_slot.entry(packet.meta.slot).or_default() += 1;
+        *packets_by_source
+            .entry(packet.meta.addr().to_string())
+            .or_default() += 1;
+
+        let mut retransmit_time = Measure::start("retransmit_to");
+        if !packet.meta.forward {
+            ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
+        }
+        ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
+        retransmit_time.stop();
+        retransmit_total += retransmit_time.as_us();
     }
     max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
     timer_start.stop();
@@ -475,6 +491,7 @@ fn retransmit(
         packets_by_source,
         epoch_fetch.as_us(),
         epoch_cache_update.as_us(),
+        retransmit_tree_mismatch,
     );
 
     Ok(())
diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs
index ad036109ad..b66b034cb8 100644
--- a/core/tests/gossip.rs
+++ b/core/tests/gossip.rs
@@ -204,7 +204,7 @@ pub fn cluster_info_retransmit() {
     p.meta.size = 10;
     let peers = c1.tvu_peers();
     let retransmit_peers: Vec<_> = peers.iter().collect();
-    ClusterInfo::retransmit_to(&retransmit_peers, &mut p, &tn1, false).unwrap();
+    ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false).unwrap();
     let res: Vec<_> = [tn1, tn2, tn3]
         .into_par_iter()
        .map(|s| {
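Note on the new retransmit_tree_mismatch counter: the predicate `packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0)` flags packets that arrived on the wrong socket for this node's position in the turbine tree. Below is a minimal, self-contained sketch of that check, for illustration only; the helper name `is_tree_mismatch` and the fanout value are assumptions, not part of this patch.

    // Illustrative sketch only; `is_tree_mismatch` is a hypothetical helper,
    // not a function introduced by this patch.
    const DATA_PLANE_FANOUT: usize = 200; // assumed value of the fanout constant

    // The first node of each turbine neighborhood (index divisible by the
    // fanout) is fed directly via the tvu socket (forward == false); every
    // other node in the neighborhood is fed by that first node via
    // tvu-forwards (forward == true). Equality between `forward` and
    // "am I the first node of my neighborhood?" therefore means the sender
    // computed a different retransmit tree than the receiver.
    fn is_tree_mismatch(forward: bool, my_index: usize) -> bool {
        forward == (my_index % DATA_PLANE_FANOUT == 0)
    }

    fn main() {
        assert!(!is_tree_mismatch(false, 0)); // first node, tvu socket: expected
        assert!(is_tree_mismatch(true, 0));   // first node, tvu-forwards: mismatch
        assert!(!is_tree_mismatch(true, 7));  // other node, tvu-forwards: expected
        assert!(is_tree_mismatch(false, 7));  // other node, tvu socket: mismatch
    }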