diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 8bab8e4c44..c81185cdcc 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -623,9 +623,9 @@ impl RetransmitStage {
             verified_receiver,
             retransmit_sender,
             repair_socket,
-            exit,
+            exit.clone(),
             repair_info,
-            leader_schedule_cache,
+            leader_schedule_cache.clone(),
             move |id, shred, working_bank, last_root| {
                 let is_connected = cfg
                     .as_ref()
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 8ee0b620f8..cfd1d4ed52 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -1,44 +1,47 @@
 //! `window_service` handles the data plane incoming shreds, storing them in
 //! blockstore and retransmitting where required
 //!
-use crate::{
-    cluster_info_vote_listener::VerifiedVoteReceiver,
-    cluster_slots::ClusterSlots,
-    completed_data_sets_service::CompletedDataSetsSender,
-    outstanding_requests::OutstandingRequests,
-    repair_response,
-    repair_service::{OutstandingRepairs, RepairInfo, RepairService},
-    result::{Error, Result},
-};
-use crossbeam_channel::{
-    unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
-};
-use rayon::{prelude::*, ThreadPool};
-use solana_gossip::cluster_info::ClusterInfo;
-use solana_ledger::{
-    blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
-    leader_schedule_cache::LeaderScheduleCache,
-    shred::{Nonce, Shred},
-};
-use solana_measure::measure::Measure;
-use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
-use solana_perf::packet::{Packet, Packets};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
-use solana_streamer::streamer::PacketSender;
-use std::collections::HashSet;
-use std::{
-    net::{SocketAddr, UdpSocket},
-    ops::Deref,
-    sync::atomic::{AtomicBool, Ordering},
-    sync::{Arc, RwLock},
-    thread::{self, Builder, JoinHandle},
-    time::{Duration, Instant},
+use {
+    crate::{
+        cluster_info_vote_listener::VerifiedVoteReceiver,
+        cluster_slots::ClusterSlots,
+        completed_data_sets_service::CompletedDataSetsSender,
+        repair_response,
+        repair_service::{OutstandingRepairs, RepairInfo, RepairService},
+        result::{Error, Result},
+    },
+    crossbeam_channel::{
+        unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
+    },
+    rayon::{prelude::*, ThreadPool},
+    solana_gossip::cluster_info::ClusterInfo,
+    solana_ledger::{
+        blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
+        leader_schedule_cache::LeaderScheduleCache,
+        shred::{Nonce, Shred},
+    },
+    solana_measure::measure::Measure,
+    solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
+    solana_perf::packet::{Packet, Packets},
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms},
+    solana_streamer::streamer::PacketSender,
+    std::collections::HashSet,
+    std::{
+        net::{SocketAddr, UdpSocket},
+        ops::Deref,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc, RwLock,
+        },
+        thread::{self, Builder, JoinHandle},
+        time::{Duration, Instant},
+    },
 };
 
-pub type DuplicateSlotSender = CrossbeamSender<Slot>;
-pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
+type DuplicateSlotSender = CrossbeamSender<Slot>;
+pub(crate) type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
 
 #[derive(Default)]
 struct WindowServiceMetrics {
@@ -49,7 +52,7 @@ struct WindowServiceMetrics {
 }
 
 impl WindowServiceMetrics {
-    pub fn report_metrics(&self, metric_name: &'static str) {
+    fn report_metrics(&self, metric_name: &'static str) {
         datapoint_info!(
             metric_name,
             ("run_insert_count", self.run_insert_count as i64, i64),
@@ -80,18 +83,15 @@ fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
 
 /// drop shreds that are from myself or not from the correct leader for the
 /// shred's slot
-pub fn should_retransmit_and_persist(
+pub(crate) fn should_retransmit_and_persist(
     shred: &Shred,
     bank: Option<Arc<Bank>>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    leader_schedule_cache: &LeaderScheduleCache,
     my_pubkey: &Pubkey,
     root: u64,
     shred_version: u16,
 ) -> bool {
-    let slot_leader_pubkey = match bank {
-        None => leader_schedule_cache.slot_leader_at(shred.slot(), None),
-        Some(bank) => leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank)),
-    };
+    let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref());
     if let Some(leader_id) = slot_leader_pubkey {
         if leader_id == *my_pubkey {
             inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
@@ -172,7 +172,7 @@ fn verify_repair(
 fn prune_shreds_invalid_repair(
     shreds: &mut Vec<Shred>,
     repair_infos: &mut Vec<Option<RepairMeta>>,
-    outstanding_requests: &Arc<RwLock<OutstandingRepairs>>,
+    outstanding_requests: &RwLock<OutstandingRepairs>,
 ) {
     assert_eq!(shreds.len(), repair_infos.len());
     let mut i = 0;
@@ -198,13 +198,13 @@
 
 fn run_insert<F>(
     shred_receiver: &CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
-    outstanding_requests: &Arc<RwLock<OutstandingRepairs>>,
+    outstanding_requests: &RwLock<OutstandingRepairs>,
 ) -> Result<()>
 where
     F: Fn(Shred),
@@ -251,9 +251,9 @@ where
 }
 
 fn recv_window<F>(
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    bank_forks: &Arc<RwLock<BankForks>>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
+    bank_forks: &RwLock<BankForks>,
     insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     my_pubkey: &Pubkey,
     verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
@@ -262,7 +262,7 @@ fn recv_window<F>(
     thread_pool: &ThreadPool,
 ) -> Result<()>
 where
-    F: Fn(&Shred, u64) -> bool + Sync,
+    F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync,
 {
     let timer = Duration::from_millis(200);
     let mut packets = verified_receiver.recv_timeout(timer)?;
@@ -271,7 +271,10 @@ where
 
     let now = Instant::now();
     inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
-    let root_bank = bank_forks.read().unwrap().root_bank();
+    let (root_bank, working_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.root_bank(), bank_forks.working_bank())
+    };
     let last_root = blockstore.last_root();
     let handle_packet = |packet: &mut Packet| {
         if packet.meta.discard {
@@ -283,8 +286,9 @@ where
         // call to `new_from_serialized_shred` is safe.
         assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
         let serialized_shred = packet.data.to_vec();
+        let working_bank = Arc::clone(&working_bank);
         let shred = match Shred::new_from_serialized_shred(serialized_shred) {
-            Ok(shred) if shred_filter(&shred, last_root) => {
+            Ok(shred) if shred_filter(&shred, working_bank, last_root) => {
                 let leader_pubkey =
                     leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref()));
                 packet.meta.slot = shred.slot();
@@ -358,7 +362,7 @@ impl Drop for Finalizer {
     }
 }
 
-pub struct WindowService {
+pub(crate) struct WindowService {
     t_window: JoinHandle<()>,
     t_insert: JoinHandle<()>,
     t_check_duplicate: JoinHandle<()>,
@@ -367,15 +371,15 @@
 
 impl WindowService {
     #[allow(clippy::too_many_arguments)]
-    pub fn new<F>(
+    pub(crate) fn new<F>(
        blockstore: Arc<Blockstore>,
         cluster_info: Arc<ClusterInfo>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         retransmit: PacketSender,
         repair_socket: Arc<UdpSocket>,
-        exit: &Arc<AtomicBool>,
+        exit: Arc<AtomicBool>,
         repair_info: RepairInfo,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         shred_filter: F,
         cluster_slots: Arc<ClusterSlots>,
         verified_vote_receiver: VerifiedVoteReceiver,
@@ -384,12 +388,11 @@ impl WindowService {
     ) -> WindowService
     where
         F: 'static
-            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, u64) -> bool
+            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, /*last root:*/ Slot) -> bool
             + std::marker::Send
             + std::marker::Sync,
     {
-        let outstanding_requests: Arc<RwLock<OutstandingRepairs>> =
-            Arc::new(RwLock::new(OutstandingRequests::default()));
+        let outstanding_requests = Arc::<RwLock<OutstandingRepairs>>::default();
 
         let bank_forks = repair_info.bank_forks.clone();
 
@@ -416,9 +419,9 @@ impl WindowService {
         );
 
         let t_insert = Self::start_window_insert_thread(
-            exit,
-            &blockstore,
-            leader_schedule_cache,
+            exit.clone(),
+            blockstore.clone(),
+            leader_schedule_cache.clone(),
             insert_receiver,
             duplicate_sender,
             completed_data_sets_sender,
@@ -428,12 +431,12 @@ impl WindowService {
         let t_window = Self::start_recv_window_thread(
             cluster_info.id(),
             exit,
-            &blockstore,
+            blockstore,
             insert_sender,
             verified_receiver,
             shred_filter,
             leader_schedule_cache,
-            &bank_forks,
+            bank_forks,
             retransmit,
         );
 
@@ -478,17 +481,14 @@ impl WindowService {
     }
 
     fn start_window_insert_thread(
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         check_duplicate_sender: CrossbeamSender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
         outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
     ) -> JoinHandle<()> {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let leader_schedule_cache = leader_schedule_cache.clone();
         let mut handle_timeout = || {};
         let handle_error = || {
             inc_new_counter_error!("solana-window-insert-error", 1, 1);
@@ -538,13 +538,13 @@ impl WindowService {
     #[allow(clippy::too_many_arguments)]
     fn start_recv_window_thread<F>(
         id: Pubkey,
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
         insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         shred_filter: F,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        bank_forks: &Arc<RwLock<BankForks>>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
+        bank_forks: Arc<RwLock<BankForks>>,
         retransmit: PacketSender,
     ) -> JoinHandle<()>
     where
@@ -553,11 +553,6 @@ impl WindowService {
             + std::marker::Send
             + std::marker::Sync,
     {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let bank_forks = bank_forks.clone();
-        let bank_forks_opt = Some(bank_forks.clone());
-        let leader_schedule_cache = leader_schedule_cache.clone();
         Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
@@ -591,13 +586,11 @@ impl WindowService {
                         &id,
                         &verified_receiver,
                         &retransmit,
-                        |shred, last_root| {
+                        |shred, bank, last_root| {
                             shred_filter(
                                 &id,
                                 shred,
-                                bank_forks_opt
-                                    .as_ref()
-                                    .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
+                                Some(bank),
                                 last_root,
                             )
                         },
@@ -634,7 +627,7 @@ impl WindowService {
         }
     }
 
-    pub fn join(self) -> thread::Result<()> {
+    pub(crate) fn join(self) -> thread::Result<()> {
         self.t_window.join()?;
         self.t_insert.join()?;
         self.t_check_duplicate.join()?;
@@ -644,23 +637,24 @@ impl WindowService {
 
 #[cfg(test)]
 mod test {
-    use super::*;
-    use solana_gossip::contact_info::ContactInfo;
-    use solana_ledger::{
-        blockstore::{make_many_slot_entries, Blockstore},
-        entry::{create_ticks, Entry},
-        genesis_utils::create_genesis_config_with_leader,
-        get_tmp_ledger_path,
-        shred::{DataShredHeader, Shredder},
+    use {
+        super::*,
+        solana_gossip::contact_info::ContactInfo,
+        solana_ledger::{
+            blockstore::{make_many_slot_entries, Blockstore},
+            entry::{create_ticks, Entry},
+            genesis_utils::create_genesis_config_with_leader,
+            get_tmp_ledger_path,
+            shred::{DataShredHeader, Shredder},
+        },
+        solana_sdk::{
+            epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
+            hash::Hash,
+            signature::{Keypair, Signer},
+            timing::timestamp,
+        },
+        solana_streamer::socket::SocketAddrSpace,
     };
-    use solana_sdk::{
-        epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
-        hash::Hash,
-        signature::{Keypair, Signer},
-        timing::timestamp,
-    };
-    use solana_streamer::socket::SocketAddrSpace;
-    use std::sync::Arc;
 
     fn local_entries_to_shred(
         entries: &[Entry],
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 2a461e6216..9a60a74797 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -802,7 +802,7 @@ impl Blockstore {
         &self,
         shreds: Vec<Shred>,
         is_repaired: Vec<bool>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
@@ -1045,7 +1045,7 @@ impl Blockstore {
     pub fn insert_shreds(
         &self,
         shreds: Vec<Shred>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
         let shreds_len = shreds.len();
@@ -1228,7 +1228,7 @@ impl Blockstore {
         index_meta_time: &mut u64,
         is_trusted: bool,
         handle_duplicate: &F,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError>
     where
@@ -1363,7 +1363,7 @@ impl Blockstore {
         slot_meta: &SlotMeta,
         just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
         last_root: &RwLock<u64>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> bool {
         use crate::shred::SHRED_PAYLOAD_SIZE;