diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 381c443c5b..1f4bff3974 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -601,9 +601,9 @@ impl RetransmitStage {
             verified_receiver,
             retransmit_sender,
             repair_socket,
-            exit,
+            exit.clone(),
             repair_info,
-            leader_schedule_cache,
+            leader_schedule_cache.clone(),
             move |id, shred, working_bank, last_root| {
                 let is_connected = cfg
                     .as_ref()
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 39082010f1..8de3a1d9a2 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -1,43 +1,47 @@
 //! `window_service` handles the data plane incoming shreds, storing them in
 //!   blockstore and retransmitting where required
 //!
-use crate::{
-    ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
-    cluster_info_vote_listener::VerifiedVoteReceiver,
-    completed_data_sets_service::CompletedDataSetsSender,
-    repair_response,
-    repair_service::{OutstandingShredRepairs, RepairInfo, RepairService},
-    result::{Error, Result},
-};
-use crossbeam_channel::{
-    unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
-};
-use rayon::{prelude::*, ThreadPool};
-use solana_gossip::cluster_info::ClusterInfo;
-use solana_ledger::{
-    blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
-    leader_schedule_cache::LeaderScheduleCache,
-    shred::{Nonce, Shred},
-};
-use solana_measure::measure::Measure;
-use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
-use solana_perf::packet::{Packet, Packets};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
-use solana_streamer::streamer::PacketSender;
-use std::collections::HashSet;
-use std::{
-    net::{SocketAddr, UdpSocket},
-    ops::Deref,
-    sync::atomic::{AtomicBool, Ordering},
-    sync::{Arc, RwLock},
-    thread::{self, Builder, JoinHandle},
-    time::{Duration, Instant},
+use {
+    crate::{
+        ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
+        cluster_info_vote_listener::VerifiedVoteReceiver,
+        completed_data_sets_service::CompletedDataSetsSender,
+        repair_response,
+        repair_service::{OutstandingShredRepairs, RepairInfo, RepairService},
+        result::{Error, Result},
+    },
+    crossbeam_channel::{
+        unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
+    },
+    rayon::{prelude::*, ThreadPool},
+    solana_gossip::cluster_info::ClusterInfo,
+    solana_ledger::{
+        blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
+        leader_schedule_cache::LeaderScheduleCache,
+        shred::{Nonce, Shred},
+    },
+    solana_measure::measure::Measure,
+    solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
+    solana_perf::packet::{Packet, Packets},
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms},
+    solana_streamer::streamer::PacketSender,
+    std::collections::HashSet,
+    std::{
+        net::{SocketAddr, UdpSocket},
+        ops::Deref,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc, RwLock,
+        },
+        thread::{self, Builder, JoinHandle},
+        time::{Duration, Instant},
+    },
 };
 
-pub type DuplicateSlotSender = CrossbeamSender<Slot>;
-pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
+type DuplicateSlotSender = CrossbeamSender<Slot>;
+pub(crate) type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
 
 #[derive(Default)]
 struct WindowServiceMetrics {
@@ -48,7 +52,7 @@ struct WindowServiceMetrics {
 }
 
 impl WindowServiceMetrics {
-    pub fn report_metrics(&self, metric_name: &'static str) {
+    fn report_metrics(&self, metric_name: &'static str) {
         datapoint_info!(
             metric_name,
             ("run_insert_count", self.run_insert_count as i64, i64),
@@ -79,10 +83,10 @@ fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
 
 /// drop shreds that are from myself or not from the correct leader for the
 /// shred's slot
-pub fn should_retransmit_and_persist(
+pub(crate) fn should_retransmit_and_persist(
     shred: &Shred,
     bank: Option<Arc<Bank>>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    leader_schedule_cache: &LeaderScheduleCache,
     my_pubkey: &Pubkey,
     root: u64,
     shred_version: u16,
@@ -171,7 +175,7 @@ fn verify_repair(
 fn prune_shreds_invalid_repair(
     shreds: &mut Vec<Shred>,
     repair_infos: &mut Vec<Option<RepairMeta>>,
-    outstanding_requests: &Arc<RwLock<OutstandingShredRepairs>>,
+    outstanding_requests: &RwLock<OutstandingShredRepairs>,
 ) {
     assert_eq!(shreds.len(), repair_infos.len());
     let mut i = 0;
@@ -197,13 +201,13 @@ fn prune_shreds_invalid_repair(
 
 fn run_insert<F>(
     shred_receiver: &CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
-    outstanding_requests: &Arc<RwLock<OutstandingShredRepairs>>,
+    outstanding_requests: &RwLock<OutstandingShredRepairs>,
 ) -> Result<()>
 where
     F: Fn(Shred),
@@ -250,9 +254,9 @@ where
 }
 
 fn recv_window<F>(
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    bank_forks: &Arc<RwLock<BankForks>>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
+    bank_forks: &RwLock<BankForks>,
     insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     my_pubkey: &Pubkey,
     verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
@@ -361,7 +365,7 @@ impl Drop for Finalizer {
     }
 }
 
-pub struct WindowService {
+pub(crate) struct WindowService {
     t_window: JoinHandle<()>,
     t_insert: JoinHandle<()>,
     t_check_duplicate: JoinHandle<()>,
@@ -370,14 +374,14 @@ pub struct WindowService {
 
 impl WindowService {
     #[allow(clippy::too_many_arguments)]
-    pub fn new<F>(
+    pub(crate) fn new<F>(
        blockstore: Arc<Blockstore>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         retransmit: PacketSender,
         repair_socket: Arc<UdpSocket>,
-        exit: &Arc<AtomicBool>,
+        exit: Arc<AtomicBool>,
         repair_info: RepairInfo,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         shred_filter: F,
         verified_vote_receiver: VerifiedVoteReceiver,
         completed_data_sets_sender: CompletedDataSetsSender,
@@ -418,9 +422,9 @@ impl WindowService {
         );
 
         let t_insert = Self::start_window_insert_thread(
-            exit,
-            &blockstore,
-            leader_schedule_cache,
+            exit.clone(),
+            blockstore.clone(),
+            leader_schedule_cache.clone(),
             insert_receiver,
             duplicate_sender,
             completed_data_sets_sender,
@@ -430,12 +434,12 @@ impl WindowService {
         let t_window = Self::start_recv_window_thread(
             id,
             exit,
-            &blockstore,
+            blockstore,
             insert_sender,
             verified_receiver,
             shred_filter,
             leader_schedule_cache,
-            &bank_forks,
+            bank_forks,
             retransmit,
         );
 
@@ -480,17 +484,14 @@ impl WindowService {
     }
 
     fn start_window_insert_thread(
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         check_duplicate_sender: CrossbeamSender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
         outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> JoinHandle<()> {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let leader_schedule_cache = leader_schedule_cache.clone();
         let mut handle_timeout = || {};
         let handle_error = || {
             inc_new_counter_error!("solana-window-insert-error", 1, 1);
@@ -540,13 +541,13 @@ impl WindowService {
     #[allow(clippy::too_many_arguments)]
     fn start_recv_window_thread<F>(
         id: Pubkey,
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
         insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         shred_filter: F,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        bank_forks: &Arc<RwLock<BankForks>>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
+        bank_forks: Arc<RwLock<BankForks>>,
         retransmit: PacketSender,
     ) -> JoinHandle<()>
     where
@@ -555,10 +556,6 @@ impl WindowService {
             + std::marker::Send
             + std::marker::Sync,
     {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let bank_forks = bank_forks.clone();
-        let leader_schedule_cache = leader_schedule_cache.clone();
         Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
@@ -633,7 +630,7 @@ impl WindowService {
         }
     }
 
-    pub fn join(self) -> thread::Result<()> {
+    pub(crate) fn join(self) -> thread::Result<()> {
         self.t_window.join()?;
         self.t_insert.join()?;
         self.t_check_duplicate.join()?;
@@ -643,23 +640,24 @@
 
 #[cfg(test)]
 mod test {
-    use super::*;
-    use solana_entry::entry::{create_ticks, Entry};
-    use solana_gossip::contact_info::ContactInfo;
-    use solana_ledger::{
-        blockstore::{make_many_slot_entries, Blockstore},
-        genesis_utils::create_genesis_config_with_leader,
-        get_tmp_ledger_path,
-        shred::{DataShredHeader, Shredder},
+    use {
+        super::*,
+        solana_entry::entry::{create_ticks, Entry},
+        solana_gossip::contact_info::ContactInfo,
+        solana_ledger::{
+            blockstore::{make_many_slot_entries, Blockstore},
+            genesis_utils::create_genesis_config_with_leader,
+            get_tmp_ledger_path,
+            shred::{DataShredHeader, Shredder},
+        },
+        solana_sdk::{
+            epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
+            hash::Hash,
+            signature::{Keypair, Signer},
+            timing::timestamp,
+        },
+        solana_streamer::socket::SocketAddrSpace,
     };
-    use solana_sdk::{
-        epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
-        hash::Hash,
-        signature::{Keypair, Signer},
-        timing::timestamp,
-    };
-    use solana_streamer::socket::SocketAddrSpace;
-    use std::sync::Arc;
 
     fn local_entries_to_shred(
         entries: &[Entry],
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 3d781f94a8..f1eb114c37 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -797,7 +797,7 @@ impl Blockstore {
         &self,
         shreds: Vec<Shred>,
         is_repaired: Vec<bool>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
@@ -1040,7 +1040,7 @@ impl Blockstore {
     pub fn insert_shreds(
         &self,
         shreds: Vec<Shred>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
         let shreds_len = shreds.len();
@@ -1223,7 +1223,7 @@ impl Blockstore {
         index_meta_time: &mut u64,
         is_trusted: bool,
         handle_duplicate: &F,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError>
     where
@@ -1358,7 +1358,7 @@ impl Blockstore {
         slot_meta: &SlotMeta,
         just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
         last_root: &RwLock<u64>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> bool {
         use crate::shred::SHRED_PAYLOAD_SIZE;
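
Reviewer note: the whole patch applies the same two-part cleanup. Read-only parameters drop the `&Arc<T>` wrapper in favor of a plain `&T` (deref coercion lets call sites keep passing an `Arc`), and thread constructors take `Arc<T>` by value so the caller clones explicitly instead of the callee cloning a borrowed Arc before the `move` closure. A minimal, self-contained sketch of both halves; `Cache`, `describe`, and `start_worker` are hypothetical stand-ins, not names from this patch:

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, JoinHandle},
};

// Hypothetical stand-in for Blockstore/LeaderScheduleCache-style shared state.
struct Cache {
    name: String,
}

// Borrowing `&Cache` instead of `&Arc<Cache>`: a caller holding an
// `Arc<Cache>` can still pass `&cache` via deref coercion, and the signature
// no longer advertises a shared-ownership detail the function never uses.
fn describe(cache: &Cache) -> &str {
    &cache.name
}

// Taking `Arc<...>` by value moves ownership straight into the spawned
// thread, so the body needs no `let exit = exit.clone();` boilerplate;
// clones happen where the caller can see them.
fn start_worker(exit: Arc<AtomicBool>, cache: Arc<Cache>) -> JoinHandle<()> {
    thread::spawn(move || {
        while !exit.load(Ordering::Relaxed) {
            let _ = describe(&cache); // `&Arc<Cache>` coerces to `&Cache`
            thread::yield_now();
        }
    })
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let cache = Arc::new(Cache {
        name: "cache".to_string(),
    });
    // Mirrors the `exit.clone()` / `leader_schedule_cache.clone()` call-site
    // changes above: the caller decides how many handles exist.
    let worker = start_worker(exit.clone(), Arc::clone(&cache));
    exit.store(true, Ordering::Relaxed);
    worker.join().unwrap();
}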