diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 4431474a33..39082010f1 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -5,7 +5,6 @@ use crate::{ ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, cluster_info_vote_listener::VerifiedVoteReceiver, completed_data_sets_service::CompletedDataSetsSender, - outstanding_requests::OutstandingRequests, repair_response, repair_service::{OutstandingShredRepairs, RepairInfo, RepairService}, result::{Error, Result}, @@ -88,10 +87,7 @@ pub fn should_retransmit_and_persist( root: u64, shred_version: u16, ) -> bool { - let slot_leader_pubkey = match bank { - None => leader_schedule_cache.slot_leader_at(shred.slot(), None), - Some(bank) => leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank)), - }; + let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref()); if let Some(leader_id) = slot_leader_pubkey { if leader_id == *my_pubkey { inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1); @@ -265,7 +261,7 @@ fn recv_window( thread_pool: &ThreadPool, ) -> Result<()> where - F: Fn(&Shred, u64) -> bool + Sync, + F: Fn(&Shred, Arc, /*last root:*/ Slot) -> bool + Sync, { let timer = Duration::from_millis(200); let mut packets = verified_receiver.recv_timeout(timer)?; @@ -274,7 +270,10 @@ where let now = Instant::now(); inc_new_counter_debug!("streamer-recv_window-recv", total_packets); - let root_bank = bank_forks.read().unwrap().root_bank(); + let (root_bank, working_bank) = { + let bank_forks = bank_forks.read().unwrap(); + (bank_forks.root_bank(), bank_forks.working_bank()) + }; let last_root = blockstore.last_root(); let handle_packet = |packet: &mut Packet| { if packet.meta.discard { @@ -286,8 +285,9 @@ where // call to `new_from_serialized_shred` is safe. assert_eq!(packet.data.len(), PACKET_DATA_SIZE); let serialized_shred = packet.data.to_vec(); + let working_bank = Arc::clone(&working_bank); let shred = match Shred::new_from_serialized_shred(serialized_shred) { - Ok(shred) if shred_filter(&shred, last_root) => { + Ok(shred) if shred_filter(&shred, working_bank, last_root) => { let leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref())); packet.meta.slot = shred.slot(); @@ -386,12 +386,11 @@ impl WindowService { ) -> WindowService where F: 'static - + Fn(&Pubkey, &Shred, Option>, u64) -> bool + + Fn(&Pubkey, &Shred, Option>, /*last root:*/ Slot) -> bool + std::marker::Send + std::marker::Sync, { - let outstanding_requests: Arc> = - Arc::new(RwLock::new(OutstandingRequests::default())); + let outstanding_requests = Arc::>::default(); let bank_forks = repair_info.bank_forks.clone(); let cluster_info = repair_info.cluster_info.clone(); @@ -559,7 +558,6 @@ impl WindowService { let exit = exit.clone(); let blockstore = blockstore.clone(); let bank_forks = bank_forks.clone(); - let bank_forks_opt = Some(bank_forks.clone()); let leader_schedule_cache = leader_schedule_cache.clone(); Builder::new() .name("solana-window".to_string()) @@ -594,13 +592,11 @@ impl WindowService { &id, &verified_receiver, &retransmit, - |shred, last_root| { + |shred, bank, last_root| { shred_filter( &id, shred, - bank_forks_opt - .as_ref() - .map(|bank_forks| bank_forks.read().unwrap().working_bank()), + Some(bank), last_root, ) },