diff --git a/src/db_window.rs b/src/db_window.rs index 3e4b012df9..63a309eff5 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -120,6 +120,7 @@ pub fn retransmit_all_leader_blocks( dq: &[SharedBlob], leader_scheduler: &Arc>, retransmit: &BlobSender, + id: &Pubkey, ) -> Result<()> { let mut retransmit_queue: Vec = Vec::new(); for b in dq { @@ -127,7 +128,9 @@ pub fn retransmit_all_leader_blocks( // add to the retransmit_queue let slot = b.read().unwrap().slot(); if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) { - add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); + if leader_id != *id { + add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); + } } } @@ -398,8 +401,13 @@ mod test { // Expect blob from leader to be retransmitted blob.write().unwrap().set_id(&leader); - retransmit_all_leader_blocks(&vec![blob.clone()], &leader_scheduler, &blob_sender) - .expect("Expect successful retransmit"); + retransmit_all_leader_blocks( + &vec![blob.clone()], + &leader_scheduler, + &blob_sender, + &nonleader, + ) + .expect("Expect successful retransmit"); let output_blob = blob_receiver .try_recv() .expect("Expect input blob to be retransmitted"); @@ -412,7 +420,18 @@ mod test { // Expect blob from nonleader to not be retransmitted blob.write().unwrap().set_id(&nonleader); - retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender) + retransmit_all_leader_blocks( + &vec![blob.clone()], + &leader_scheduler, + &blob_sender, + &nonleader, + ) + .expect("Expect successful retransmit"); + assert!(blob_receiver.try_recv().is_err()); + + // Expect blob from leader while currently leader to not be retransmitted + blob.write().unwrap().set_id(&leader); + retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender, &leader) .expect("Expect successful retransmit"); assert!(blob_receiver.try_recv().is_err()); } diff --git a/src/window_service.rs b/src/window_service.rs index ab3bf2ec00..895f2e42e5 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -76,7 +76,7 @@ fn recv_window( .to_owned(), ); - retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?; + retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit, id)?; //send a contiguous set of blocks let mut consume_queue = Vec::new(); @@ -249,15 +249,19 @@ mod test { #[test] pub fn window_send_test() { solana_logger::setup(); - let tn = Node::new_localhost(); + // setup a leader whose id is used to generates blobs and a validator + // node whose window service will retransmit leader blobs. + let leader_node = Node::new_localhost(); + let validator_node = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let mut cluster_info_me = ClusterInfo::new(tn.info.clone()); - let me_id = cluster_info_me.my_data().id; + let mut cluster_info_me = ClusterInfo::new(validator_node.info.clone()); + let me_id = leader_node.info.id; cluster_info_me.set_leader(me_id); let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); + let t_receiver = + blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); let (s_window, r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let done = Arc::new(AtomicBool::new(false)); @@ -274,7 +278,7 @@ mod test { r_reader, Some(s_window), s_retransmit, - Arc::new(tn.sockets.repair), + Arc::new(leader_node.sockets.repair), Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), done, exit.clone(), @@ -282,11 +286,11 @@ mod test { let t_responder = { let (s_responder, r_responder) = channel(); let blob_sockets: Vec> = - tn.sockets.tvu.into_iter().map(Arc::new).collect(); + leader_node.sockets.tvu.into_iter().map(Arc::new).collect(); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let num_blobs_to_make = 10; - let gossip_address = &tn.info.gossip; + let gossip_address = &leader_node.info.gossip; let msgs = make_consecutive_blobs( &me_id, num_blobs_to_make, @@ -320,14 +324,18 @@ mod test { #[test] pub fn window_send_leader_test2() { solana_logger::setup(); - let tn = Node::new_localhost(); + // setup a leader whose id is used to generates blobs and a validator + // node whose window service will retransmit leader blobs. + let leader_node = Node::new_localhost(); + let validator_node = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let cluster_info_me = ClusterInfo::new(tn.info.clone()); - let me_id = cluster_info_me.my_data().id; + let cluster_info_me = ClusterInfo::new(validator_node.info.clone()); + let me_id = leader_node.info.id; let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); + let t_receiver = + blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let done = Arc::new(AtomicBool::new(false)); @@ -344,7 +352,7 @@ mod test { r_reader, Some(s_window), s_retransmit, - Arc::new(tn.sockets.repair), + Arc::new(leader_node.sockets.repair), Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), done, exit.clone(), @@ -352,11 +360,16 @@ mod test { let t_responder = { let (s_responder, r_responder) = channel(); let blob_sockets: Vec> = - tn.sockets.tvu.into_iter().map(Arc::new).collect(); + leader_node.sockets.tvu.into_iter().map(Arc::new).collect(); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut msgs = Vec::new(); - let blobs = - make_consecutive_blobs(&me_id, 14u64, 0, Default::default(), &tn.info.gossip); + let blobs = make_consecutive_blobs( + &me_id, + 14u64, + 0, + Default::default(), + &leader_node.info.gossip, + ); for v in 0..10 { let i = 9 - v;