Use multicast to send retransmit packets (#6319)
This commit is contained in:
		| @@ -21,7 +21,8 @@ use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}; | |||||||
| use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; | use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; | ||||||
| use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob}; | use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob}; | ||||||
| use crate::repair_service::RepairType; | use crate::repair_service::RepairType; | ||||||
| use crate::result::Result; | use crate::result::{Error, Result}; | ||||||
|  | use crate::sendmmsg::multicast; | ||||||
| use crate::staking_utils; | use crate::staking_utils; | ||||||
| use crate::streamer::{BlobReceiver, BlobSender}; | use crate::streamer::{BlobReceiver, BlobSender}; | ||||||
| use crate::weighted_shuffle::{weighted_best, weighted_shuffle}; | use crate::weighted_shuffle::{weighted_best, weighted_shuffle}; | ||||||
| @@ -737,29 +738,33 @@ impl ClusterInfo { | |||||||
|     /// # Remarks |     /// # Remarks | ||||||
|     /// We need to avoid having obj locked while doing a io, such as the `send_to` |     /// We need to avoid having obj locked while doing a io, such as the `send_to` | ||||||
|     pub fn retransmit_to( |     pub fn retransmit_to( | ||||||
|         id: &Pubkey, |  | ||||||
|         peers: &[&ContactInfo], |         peers: &[&ContactInfo], | ||||||
|         packet: &Packet, |         packet: &mut Packet, | ||||||
|         slot_leader_pubkey: Option<Pubkey>, |         slot_leader_pubkey: Option<Pubkey>, | ||||||
|         s: &UdpSocket, |         s: &UdpSocket, | ||||||
|         forwarded: bool, |         forwarded: bool, | ||||||
|     ) -> Result<()> { |     ) -> Result<()> { | ||||||
|         trace!("retransmit orders {}", peers.len()); |         trace!("retransmit orders {}", peers.len()); | ||||||
|         let errs: Vec<_> = peers |         let dests: Vec<_> = peers | ||||||
|             .iter() |             .iter() | ||||||
|             .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default()) |             .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default()) | ||||||
|             .map(|v| { |             .map(|v| if forwarded { &v.tvu_forwards } else { &v.tvu }) | ||||||
|                 let dest = if forwarded { &v.tvu_forwards } else { &v.tvu }; |  | ||||||
|                 debug!("{}: retransmit packet to {} {}", id, v.id, *dest,); |  | ||||||
|                 s.send_to(&packet.data, dest) |  | ||||||
|             }) |  | ||||||
|             .collect(); |             .collect(); | ||||||
|         for e in errs { |  | ||||||
|             if let Err(e) = &e { |         let mut sent = 0; | ||||||
|                 inc_new_counter_error!("cluster_info-retransmit-send_to_error", 1, 1); |         while sent < dests.len() { | ||||||
|  |             match multicast(s, packet, &dests[sent..]) { | ||||||
|  |                 Ok(n) => sent += n, | ||||||
|  |                 Err(e) => { | ||||||
|  |                     inc_new_counter_error!( | ||||||
|  |                         "cluster_info-retransmit-send_to_error", | ||||||
|  |                         dests.len() - sent, | ||||||
|  |                         1 | ||||||
|  |                     ); | ||||||
|                     error!("retransmit result {:?}", e); |                     error!("retransmit result {:?}", e); | ||||||
|  |                     return Err(Error::IO(e)); | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|             e?; |  | ||||||
|         } |         } | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -67,8 +67,8 @@ fn retransmit( | |||||||
|     let me = cluster_info.read().unwrap().my_data().clone(); |     let me = cluster_info.read().unwrap().my_data().clone(); | ||||||
|     let mut retransmit_total = 0; |     let mut retransmit_total = 0; | ||||||
|     let mut compute_turbine_peers_total = 0; |     let mut compute_turbine_peers_total = 0; | ||||||
|     for packets in packet_v { |     for mut packets in packet_v { | ||||||
|         for packet in &packets.packets { |         for packet in packets.packets.iter_mut() { | ||||||
|             // skip repair packets |             // skip repair packets | ||||||
|             if packet.meta.repair { |             if packet.meta.repair { | ||||||
|                 total_packets -= 1; |                 total_packets -= 1; | ||||||
| @@ -100,10 +100,10 @@ fn retransmit( | |||||||
|                 leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); |                 leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); | ||||||
|             let mut retransmit_time = Measure::start("retransmit_to"); |             let mut retransmit_time = Measure::start("retransmit_to"); | ||||||
|             if !packet.meta.forward { |             if !packet.meta.forward { | ||||||
|                 ClusterInfo::retransmit_to(&me.id, &neighbors, packet, leader, sock, true)?; |                 ClusterInfo::retransmit_to(&neighbors, packet, leader, sock, true)?; | ||||||
|                 ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, false)?; |                 ClusterInfo::retransmit_to(&children, packet, leader, sock, false)?; | ||||||
|             } else { |             } else { | ||||||
|                 ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, true)?; |                 ClusterInfo::retransmit_to(&children, packet, leader, sock, true)?; | ||||||
|             } |             } | ||||||
|             retransmit_time.stop(); |             retransmit_time.stop(); | ||||||
|             retransmit_total += retransmit_time.as_ms(); |             retransmit_total += retransmit_time.as_ms(); | ||||||
|   | |||||||
| @@ -177,9 +177,8 @@ pub fn cluster_info_retransmit() -> result::Result<()> { | |||||||
|     let mut p = Packet::default(); |     let mut p = Packet::default(); | ||||||
|     p.meta.size = 10; |     p.meta.size = 10; | ||||||
|     let peers = c1.read().unwrap().retransmit_peers(); |     let peers = c1.read().unwrap().retransmit_peers(); | ||||||
|     let self_id = c1.read().unwrap().id(); |  | ||||||
|     let retransmit_peers: Vec<_> = peers.iter().collect(); |     let retransmit_peers: Vec<_> = peers.iter().collect(); | ||||||
|     ClusterInfo::retransmit_to(&self_id, &retransmit_peers, &p, None, &tn1, false)?; |     ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false)?; | ||||||
|     let res: Vec<_> = [tn1, tn2, tn3] |     let res: Vec<_> = [tn1, tn2, tn3] | ||||||
|         .into_par_iter() |         .into_par_iter() | ||||||
|         .map(|s| { |         .map(|s| { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user