Skip retransmit when node is leader (#2625)
* Skip retransmit when node is leader * Fix window test
This commit is contained in:
		| @@ -120,6 +120,7 @@ pub fn retransmit_all_leader_blocks( | |||||||
|     dq: &[SharedBlob], |     dq: &[SharedBlob], | ||||||
|     leader_scheduler: &Arc<RwLock<LeaderScheduler>>, |     leader_scheduler: &Arc<RwLock<LeaderScheduler>>, | ||||||
|     retransmit: &BlobSender, |     retransmit: &BlobSender, | ||||||
|  |     id: &Pubkey, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     let mut retransmit_queue: Vec<SharedBlob> = Vec::new(); |     let mut retransmit_queue: Vec<SharedBlob> = Vec::new(); | ||||||
|     for b in dq { |     for b in dq { | ||||||
| @@ -127,9 +128,11 @@ pub fn retransmit_all_leader_blocks( | |||||||
|         // add to the retransmit_queue |         // add to the retransmit_queue | ||||||
|         let slot = b.read().unwrap().slot(); |         let slot = b.read().unwrap().slot(); | ||||||
|         if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) { |         if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) { | ||||||
|  |             if leader_id != *id { | ||||||
|                 add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); |                 add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     submit( |     submit( | ||||||
|         influxdb::Point::new("retransmit-queue") |         influxdb::Point::new("retransmit-queue") | ||||||
| @@ -398,7 +401,12 @@ mod test { | |||||||
|  |  | ||||||
|         // Expect blob from leader to be retransmitted |         // Expect blob from leader to be retransmitted | ||||||
|         blob.write().unwrap().set_id(&leader); |         blob.write().unwrap().set_id(&leader); | ||||||
|         retransmit_all_leader_blocks(&vec![blob.clone()], &leader_scheduler, &blob_sender) |         retransmit_all_leader_blocks( | ||||||
|  |             &vec![blob.clone()], | ||||||
|  |             &leader_scheduler, | ||||||
|  |             &blob_sender, | ||||||
|  |             &nonleader, | ||||||
|  |         ) | ||||||
|         .expect("Expect successful retransmit"); |         .expect("Expect successful retransmit"); | ||||||
|         let output_blob = blob_receiver |         let output_blob = blob_receiver | ||||||
|             .try_recv() |             .try_recv() | ||||||
| @@ -412,7 +420,18 @@ mod test { | |||||||
|  |  | ||||||
|         // Expect blob from nonleader to not be retransmitted |         // Expect blob from nonleader to not be retransmitted | ||||||
|         blob.write().unwrap().set_id(&nonleader); |         blob.write().unwrap().set_id(&nonleader); | ||||||
|         retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender) |         retransmit_all_leader_blocks( | ||||||
|  |             &vec![blob.clone()], | ||||||
|  |             &leader_scheduler, | ||||||
|  |             &blob_sender, | ||||||
|  |             &nonleader, | ||||||
|  |         ) | ||||||
|  |         .expect("Expect successful retransmit"); | ||||||
|  |         assert!(blob_receiver.try_recv().is_err()); | ||||||
|  |  | ||||||
|  |         // Expect blob from leader while currently leader to not be retransmitted | ||||||
|  |         blob.write().unwrap().set_id(&leader); | ||||||
|  |         retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender, &leader) | ||||||
|             .expect("Expect successful retransmit"); |             .expect("Expect successful retransmit"); | ||||||
|         assert!(blob_receiver.try_recv().is_err()); |         assert!(blob_receiver.try_recv().is_err()); | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -76,7 +76,7 @@ fn recv_window( | |||||||
|             .to_owned(), |             .to_owned(), | ||||||
|     ); |     ); | ||||||
|  |  | ||||||
|     retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?; |     retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit, id)?; | ||||||
|  |  | ||||||
|     //send a contiguous set of blocks |     //send a contiguous set of blocks | ||||||
|     let mut consume_queue = Vec::new(); |     let mut consume_queue = Vec::new(); | ||||||
| @@ -249,15 +249,19 @@ mod test { | |||||||
|     #[test] |     #[test] | ||||||
|     pub fn window_send_test() { |     pub fn window_send_test() { | ||||||
|         solana_logger::setup(); |         solana_logger::setup(); | ||||||
|         let tn = Node::new_localhost(); |         // setup a leader whose id is used to generates blobs and a validator | ||||||
|  |         // node whose window service will retransmit leader blobs. | ||||||
|  |         let leader_node = Node::new_localhost(); | ||||||
|  |         let validator_node = Node::new_localhost(); | ||||||
|         let exit = Arc::new(AtomicBool::new(false)); |         let exit = Arc::new(AtomicBool::new(false)); | ||||||
|         let mut cluster_info_me = ClusterInfo::new(tn.info.clone()); |         let mut cluster_info_me = ClusterInfo::new(validator_node.info.clone()); | ||||||
|         let me_id = cluster_info_me.my_data().id; |         let me_id = leader_node.info.id; | ||||||
|         cluster_info_me.set_leader(me_id); |         cluster_info_me.set_leader(me_id); | ||||||
|         let subs = Arc::new(RwLock::new(cluster_info_me)); |         let subs = Arc::new(RwLock::new(cluster_info_me)); | ||||||
|  |  | ||||||
|         let (s_reader, r_reader) = channel(); |         let (s_reader, r_reader) = channel(); | ||||||
|         let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); |         let t_receiver = | ||||||
|  |             blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); | ||||||
|         let (s_window, r_window) = channel(); |         let (s_window, r_window) = channel(); | ||||||
|         let (s_retransmit, r_retransmit) = channel(); |         let (s_retransmit, r_retransmit) = channel(); | ||||||
|         let done = Arc::new(AtomicBool::new(false)); |         let done = Arc::new(AtomicBool::new(false)); | ||||||
| @@ -274,7 +278,7 @@ mod test { | |||||||
|             r_reader, |             r_reader, | ||||||
|             Some(s_window), |             Some(s_window), | ||||||
|             s_retransmit, |             s_retransmit, | ||||||
|             Arc::new(tn.sockets.repair), |             Arc::new(leader_node.sockets.repair), | ||||||
|             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), |             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), | ||||||
|             done, |             done, | ||||||
|             exit.clone(), |             exit.clone(), | ||||||
| @@ -282,11 +286,11 @@ mod test { | |||||||
|         let t_responder = { |         let t_responder = { | ||||||
|             let (s_responder, r_responder) = channel(); |             let (s_responder, r_responder) = channel(); | ||||||
|             let blob_sockets: Vec<Arc<UdpSocket>> = |             let blob_sockets: Vec<Arc<UdpSocket>> = | ||||||
|                 tn.sockets.tvu.into_iter().map(Arc::new).collect(); |                 leader_node.sockets.tvu.into_iter().map(Arc::new).collect(); | ||||||
|  |  | ||||||
|             let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); |             let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); | ||||||
|             let num_blobs_to_make = 10; |             let num_blobs_to_make = 10; | ||||||
|             let gossip_address = &tn.info.gossip; |             let gossip_address = &leader_node.info.gossip; | ||||||
|             let msgs = make_consecutive_blobs( |             let msgs = make_consecutive_blobs( | ||||||
|                 &me_id, |                 &me_id, | ||||||
|                 num_blobs_to_make, |                 num_blobs_to_make, | ||||||
| @@ -320,14 +324,18 @@ mod test { | |||||||
|     #[test] |     #[test] | ||||||
|     pub fn window_send_leader_test2() { |     pub fn window_send_leader_test2() { | ||||||
|         solana_logger::setup(); |         solana_logger::setup(); | ||||||
|         let tn = Node::new_localhost(); |         // setup a leader whose id is used to generates blobs and a validator | ||||||
|  |         // node whose window service will retransmit leader blobs. | ||||||
|  |         let leader_node = Node::new_localhost(); | ||||||
|  |         let validator_node = Node::new_localhost(); | ||||||
|         let exit = Arc::new(AtomicBool::new(false)); |         let exit = Arc::new(AtomicBool::new(false)); | ||||||
|         let cluster_info_me = ClusterInfo::new(tn.info.clone()); |         let cluster_info_me = ClusterInfo::new(validator_node.info.clone()); | ||||||
|         let me_id = cluster_info_me.my_data().id; |         let me_id = leader_node.info.id; | ||||||
|         let subs = Arc::new(RwLock::new(cluster_info_me)); |         let subs = Arc::new(RwLock::new(cluster_info_me)); | ||||||
|  |  | ||||||
|         let (s_reader, r_reader) = channel(); |         let (s_reader, r_reader) = channel(); | ||||||
|         let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); |         let t_receiver = | ||||||
|  |             blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); | ||||||
|         let (s_window, _r_window) = channel(); |         let (s_window, _r_window) = channel(); | ||||||
|         let (s_retransmit, r_retransmit) = channel(); |         let (s_retransmit, r_retransmit) = channel(); | ||||||
|         let done = Arc::new(AtomicBool::new(false)); |         let done = Arc::new(AtomicBool::new(false)); | ||||||
| @@ -344,7 +352,7 @@ mod test { | |||||||
|             r_reader, |             r_reader, | ||||||
|             Some(s_window), |             Some(s_window), | ||||||
|             s_retransmit, |             s_retransmit, | ||||||
|             Arc::new(tn.sockets.repair), |             Arc::new(leader_node.sockets.repair), | ||||||
|             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), |             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), | ||||||
|             done, |             done, | ||||||
|             exit.clone(), |             exit.clone(), | ||||||
| @@ -352,11 +360,16 @@ mod test { | |||||||
|         let t_responder = { |         let t_responder = { | ||||||
|             let (s_responder, r_responder) = channel(); |             let (s_responder, r_responder) = channel(); | ||||||
|             let blob_sockets: Vec<Arc<UdpSocket>> = |             let blob_sockets: Vec<Arc<UdpSocket>> = | ||||||
|                 tn.sockets.tvu.into_iter().map(Arc::new).collect(); |                 leader_node.sockets.tvu.into_iter().map(Arc::new).collect(); | ||||||
|             let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); |             let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); | ||||||
|             let mut msgs = Vec::new(); |             let mut msgs = Vec::new(); | ||||||
|             let blobs = |             let blobs = make_consecutive_blobs( | ||||||
|                 make_consecutive_blobs(&me_id, 14u64, 0, Default::default(), &tn.info.gossip); |                 &me_id, | ||||||
|  |                 14u64, | ||||||
|  |                 0, | ||||||
|  |                 Default::default(), | ||||||
|  |                 &leader_node.info.gossip, | ||||||
|  |             ); | ||||||
|  |  | ||||||
|             for v in 0..10 { |             for v in 0..10 { | ||||||
|                 let i = 9 - v; |                 let i = 9 - v; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user