uses rayon thread-pool for retransmit-stage parallelization (#19486)

This commit is contained in:
behzad nouri
2021-09-07 15:15:01 +00:00
committed by GitHub
parent fe8ba81ce6
commit 01a7ec8198
2 changed files with 146 additions and 224 deletions

View File

@ -31,7 +31,7 @@ use {
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
mpsc::channel, mpsc::channel,
Arc, Mutex, RwLock, Arc, RwLock,
}, },
thread::{sleep, Builder}, thread::{sleep, Builder},
time::Duration, time::Duration,
@ -78,7 +78,6 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let (shreds_sender, shreds_receiver) = channel(); let (shreds_sender, shreds_receiver) = channel();
let shreds_receiver = Arc::new(Mutex::new(shreds_receiver));
const NUM_THREADS: usize = 2; const NUM_THREADS: usize = 2;
let sockets = (0..NUM_THREADS) let sockets = (0..NUM_THREADS)
.map(|_| UdpSocket::bind("0.0.0.0:0").unwrap()) .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap())
@ -165,7 +164,5 @@ fn bench_retransmitter(bencher: &mut Bencher) {
total.store(0, Ordering::Relaxed); total.store(0, Ordering::Relaxed);
}); });
for t in retransmitter_handles { retransmitter_handles.join().unwrap();
t.join().unwrap();
}
} }

View File

@ -11,11 +11,11 @@ use {
completed_data_sets_service::CompletedDataSetsSender, completed_data_sets_service::CompletedDataSetsSender,
packet_hasher::PacketHasher, packet_hasher::PacketHasher,
repair_service::{DuplicateSlotsResetSender, RepairInfo}, repair_service::{DuplicateSlotsResetSender, RepairInfo},
result::{Error, Result},
window_service::{should_retransmit_and_persist, WindowService}, window_service::{should_retransmit_and_persist, WindowService},
}, },
crossbeam_channel::{Receiver, Sender}, crossbeam_channel::{Receiver, Sender},
lru::LruCache, lru::LruCache,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_client::rpc_response::SlotUpdate, solana_client::rpc_response::SlotUpdate,
solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
solana_ledger::{ solana_ledger::{
@ -23,125 +23,90 @@ use {
{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}, {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
}, },
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::Packets, solana_perf::packet::Packets,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{ solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
clock::Slot,
epoch_schedule::EpochSchedule,
pubkey::Pubkey,
timing::{timestamp, AtomicInterval},
},
std::{ std::{
collections::{BTreeSet, HashSet}, collections::{BTreeSet, HashSet},
net::UdpSocket, net::UdpSocket,
ops::DerefMut, ops::DerefMut,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
mpsc::{self, channel, RecvTimeoutError}, mpsc::{self, channel, RecvTimeoutError},
Arc, Mutex, RwLock, Arc, Mutex, RwLock,
}, },
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration, time::{Duration, Instant},
}, },
}; };
const MAX_DUPLICATE_COUNT: usize = 2; const MAX_DUPLICATE_COUNT: usize = 2;
const DEFAULT_LRU_SIZE: usize = 10_000; const DEFAULT_LRU_SIZE: usize = 10_000;
// Limit a given thread to consume about this many shreds so that
// it doesn't pull up too much work.
const MAX_SHREDS_BATCH_SIZE: usize = 100;
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
#[derive(Default)] #[derive(Default)]
struct RetransmitStats { struct RetransmitStats {
num_shreds: AtomicU64, since: Option<Instant>,
num_shreds_skipped: AtomicU64, num_shreds: usize,
total_batches: AtomicU64, num_shreds_skipped: AtomicUsize,
total_time: AtomicU64, total_batches: usize,
epoch_fetch: AtomicU64, total_time: u64,
epoch_cache_update: AtomicU64, epoch_fetch: u64,
epoch_cache_update: u64,
retransmit_total: AtomicU64, retransmit_total: AtomicU64,
last_ts: AtomicInterval,
compute_turbine_peers_total: AtomicU64, compute_turbine_peers_total: AtomicU64,
unknown_shred_slot_leader: AtomicUsize,
} }
#[allow(clippy::too_many_arguments)] impl RetransmitStats {
fn update_retransmit_stats( fn maybe_submit(
stats: &RetransmitStats, &mut self,
total_time: u64, root_bank: &Bank,
num_shreds: usize, working_bank: &Bank,
num_shreds_skipped: usize, cluster_info: &ClusterInfo,
retransmit_total: u64, cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
compute_turbine_peers_total: u64,
peers_len: usize,
epoch_fetch: u64,
epoch_cach_update: u64,
) { ) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed); const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
stats let elapsed = self.since.as_ref().map(Instant::elapsed);
.num_shreds if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
.fetch_add(num_shreds as u64, Ordering::Relaxed); return;
stats }
.num_shreds_skipped let num_peers = cluster_nodes_cache
.fetch_add(num_shreds_skipped as u64, Ordering::Relaxed); .get(root_bank.slot(), root_bank, working_bank, cluster_info)
stats .num_peers();
.retransmit_total let stats = std::mem::replace(
.fetch_add(retransmit_total, Ordering::Relaxed); self,
stats Self {
.compute_turbine_peers_total since: Some(Instant::now()),
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed); ..Self::default()
stats.total_batches.fetch_add(1, Ordering::Relaxed); },
stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed); );
stats datapoint_info!("retransmit-num_nodes", ("count", num_peers, i64));
.epoch_cache_update
.fetch_add(epoch_cach_update, Ordering::Relaxed);
if stats.last_ts.should_update(2000) {
datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
datapoint_info!( datapoint_info!(
"retransmit-stage", "retransmit-stage",
( ("total_time", stats.total_time, i64),
"total_time", ("epoch_fetch", stats.epoch_fetch, i64),
stats.total_time.swap(0, Ordering::Relaxed) as i64, ("epoch_cache_update", stats.epoch_cache_update, i64),
i64 ("total_batches", stats.total_batches, i64),
), ("num_shreds", stats.num_shreds, i64),
(
"epoch_fetch",
stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_cache_update",
stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_batches",
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_shreds",
stats.num_shreds.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"num_shreds_skipped", "num_shreds_skipped",
stats.num_shreds_skipped.swap(0, Ordering::Relaxed) as i64, stats.num_shreds_skipped.into_inner(),
i64
),
(
"retransmit_total",
stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
("retransmit_total", stats.retransmit_total.into_inner(), i64),
( (
"compute_turbine", "compute_turbine",
stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, stats.compute_turbine_peers_total.into_inner(),
i64
),
(
"unknown_shred_slot_leader",
stats.unknown_shred_slot_leader.into_inner(),
i64 i64
), ),
); );
@ -188,14 +153,11 @@ fn check_if_first_shred_received(
} }
let mut first_shreds_received_locked = first_shreds_received.lock().unwrap(); let mut first_shreds_received_locked = first_shreds_received.lock().unwrap();
if !first_shreds_received_locked.contains(&shred_slot) { if first_shreds_received_locked.insert(shred_slot) {
datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64)); datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64));
first_shreds_received_locked.insert(shred_slot);
if first_shreds_received_locked.len() > 100 { if first_shreds_received_locked.len() > 100 {
let mut slots_before_root = *first_shreds_received_locked =
first_shreds_received_locked.split_off(&(root_bank.slot() + 1)); first_shreds_received_locked.split_off(&(root_bank.slot() + 1));
// `slots_before_root` now contains all slots <= root
std::mem::swap(&mut slots_before_root, &mut first_shreds_received_locked);
} }
true true
} else { } else {
@ -205,16 +167,11 @@ fn check_if_first_shred_received(
fn maybe_reset_shreds_received_cache( fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilterAndHasher>, shreds_received: &Mutex<ShredFilterAndHasher>,
hasher_reset_ts: &AtomicU64, hasher_reset_ts: &mut Instant,
) { ) {
const UPDATE_INTERVAL_MS: u64 = 1000; const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
let now = timestamp(); if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL {
let prev = hasher_reset_ts.load(Ordering::Acquire); *hasher_reset_ts = Instant::now();
if now.saturating_sub(prev) > UPDATE_INTERVAL_MS
&& hasher_reset_ts
.compare_exchange(prev, now, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let mut shreds_received = shreds_received.lock().unwrap(); let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut(); let (cache, hasher) = shreds_received.deref_mut();
cache.clear(); cache.clear();
@ -224,31 +181,26 @@ fn maybe_reset_shreds_received_cache(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn retransmit( fn retransmit(
thread_pool: &ThreadPool,
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache, leader_schedule_cache: &LeaderScheduleCache,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
shreds_receiver: &Mutex<mpsc::Receiver<Vec<Shred>>>, shreds_receiver: &mpsc::Receiver<Vec<Shred>>,
sock: &UdpSocket, sockets: &[UdpSocket],
id: u32, stats: &mut RetransmitStats,
stats: &RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>, cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
hasher_reset_ts: &AtomicU64, hasher_reset_ts: &mut Instant,
shreds_received: &Mutex<ShredFilterAndHasher>, shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots, max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>, first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: Option<&RpcSubscriptions>, rpc_subscriptions: Option<&RpcSubscriptions>,
) -> Result<()> { ) -> Result<(), RecvTimeoutError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1); const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let shreds_receiver = shreds_receiver.lock().unwrap();
let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?;
let mut timer_start = Measure::start("retransmit"); let mut timer_start = Measure::start("retransmit");
while let Ok(more_shreds) = shreds_receiver.try_recv() { shreds.extend(shreds_receiver.try_iter().flatten());
shreds.extend(more_shreds); stats.num_shreds += shreds.len();
if shreds.len() >= MAX_SHREDS_BATCH_SIZE { stats.total_batches += 1;
break;
}
}
drop(shreds_receiver);
let mut epoch_fetch = Measure::start("retransmit_epoch_fetch"); let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
let (working_bank, root_bank) = { let (working_bank, root_bank) = {
@ -256,25 +208,24 @@ fn retransmit(
(bank_forks.working_bank(), bank_forks.root_bank()) (bank_forks.working_bank(), bank_forks.root_bank())
}; };
epoch_fetch.stop(); epoch_fetch.stop();
stats.epoch_fetch += epoch_fetch.as_us();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update"); let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts); maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
epoch_cache_update.stop(); epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();
let num_shreds = shreds.len();
let my_id = cluster_info.id(); let my_id = cluster_info.id();
let socket_addr_space = cluster_info.socket_addr_space(); let socket_addr_space = cluster_info.socket_addr_space();
let mut retransmit_total = 0; let retransmit_shred = |shred: Shred, socket: &UdpSocket| {
let mut num_shreds_skipped = 0;
let mut compute_turbine_peers_total = 0;
let mut max_slot = 0;
for shred in shreds {
if should_skip_retransmit(&shred, shreds_received) { if should_skip_retransmit(&shred, shreds_received) {
num_shreds_skipped += 1; stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
continue; return;
} }
let shred_slot = shred.slot(); let shred_slot = shred.slot();
max_slot = max_slot.max(shred_slot); max_slots
.retransmit
.fetch_max(shred_slot, Ordering::Relaxed);
if let Some(rpc_subscriptions) = rpc_subscriptions { if let Some(rpc_subscriptions) = rpc_subscriptions {
if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) { if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
@ -293,7 +244,12 @@ fn retransmit(
let slot_leader = let slot_leader =
match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) { match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) {
Some(pubkey) => pubkey, Some(pubkey) => pubkey,
None => continue, None => {
stats
.unknown_shred_slot_leader
.fetch_add(1, Ordering::Relaxed);
return;
}
}; };
let cluster_nodes = let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
@ -302,7 +258,9 @@ fn retransmit(
cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader); cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader);
let anchor_node = neighbors[0].id == my_id; let anchor_node = neighbors[0].id == my_id;
compute_turbine_peers.stop(); compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_us(); stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to"); let mut retransmit_time = Measure::start("retransmit_to");
// If the node is on the critical path (i.e. the first node in each // If the node is on the critical path (i.e. the first node in each
@ -314,7 +272,7 @@ fn retransmit(
ClusterInfo::retransmit_to( ClusterInfo::retransmit_to(
&neighbors[1..], &neighbors[1..],
&shred.payload, &shred.payload,
sock, socket,
true, // forward socket true, // forward socket
socket_addr_space, socket_addr_space,
); );
@ -322,36 +280,25 @@ fn retransmit(
ClusterInfo::retransmit_to( ClusterInfo::retransmit_to(
&children, &children,
&shred.payload, &shred.payload,
sock, socket,
!anchor_node, // send to forward socket! !anchor_node, // send to forward socket!
socket_addr_space, socket_addr_space,
); );
retransmit_time.stop(); retransmit_time.stop();
retransmit_total += retransmit_time.as_us(); stats
} .retransmit_total
max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed); .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
};
thread_pool.install(|| {
shreds.into_par_iter().with_min_len(4).for_each(|shred| {
let index = thread_pool.current_thread_index().unwrap();
let socket = &sockets[index % sockets.len()];
retransmit_shred(shred, socket);
});
});
timer_start.stop(); timer_start.stop();
debug!( stats.total_time += timer_start.as_us();
"retransmitted {} shreds in {}ms retransmit_time: {}ms id: {}", stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
num_shreds,
timer_start.as_ms(),
retransmit_total,
id,
);
let cluster_nodes =
cluster_nodes_cache.get(root_bank.slot(), &root_bank, &working_bank, cluster_info);
update_retransmit_stats(
stats,
timer_start.as_us(),
num_shreds,
num_shreds_skipped,
retransmit_total,
compute_turbine_peers_total,
cluster_nodes.num_peers(),
epoch_fetch.as_us(),
epoch_cache_update.as_us(),
);
Ok(()) Ok(())
} }
@ -368,74 +315,56 @@ pub fn retransmitter(
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>, leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
shreds_receiver: Arc<Mutex<mpsc::Receiver<Vec<Shred>>>>, shreds_receiver: mpsc::Receiver<Vec<Shred>>,
max_slots: Arc<MaxSlots>, max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>, rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Vec<JoinHandle<()>> { ) -> JoinHandle<()> {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<RetransmitStage>::new( let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL, CLUSTER_NODES_CACHE_TTL,
)); );
let hasher_reset_ts = Arc::default(); let mut hasher_reset_ts = Instant::now();
let stats = Arc::new(RetransmitStats::default()); let mut stats = RetransmitStats::default();
let shreds_received = Arc::new(Mutex::new(( let shreds_received = Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default()));
LruCache::new(DEFAULT_LRU_SIZE), let first_shreds_received = Mutex::<BTreeSet<Slot>>::default();
PacketHasher::default(), let num_threads = get_thread_count().min(8).max(sockets.len());
))); let thread_pool = ThreadPoolBuilder::new()
let first_shreds_received = Arc::new(Mutex::new(BTreeSet::new())); .num_threads(num_threads)
(0..sockets.len()) .thread_name(|i| format!("retransmit-{}", i))
.map(|s| { .build()
let sockets = sockets.clone(); .unwrap();
let bank_forks = bank_forks.clone();
let leader_schedule_cache = leader_schedule_cache.clone();
let shreds_receiver = shreds_receiver.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
let hasher_reset_ts = Arc::clone(&hasher_reset_ts);
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
let first_shreds_received = first_shreds_received.clone();
let rpc_subscriptions = rpc_subscriptions.clone();
Builder::new() Builder::new()
.name("solana-retransmitter".to_string()) .name("solana-retransmitter".to_string())
.spawn(move || { .spawn(move || {
trace!("retransmitter started"); trace!("retransmitter started");
loop { loop {
if let Err(e) = retransmit( match retransmit(
&thread_pool,
&bank_forks, &bank_forks,
&leader_schedule_cache, &leader_schedule_cache,
&cluster_info, &cluster_info,
&shreds_receiver, &shreds_receiver,
&sockets[s], &sockets,
s as u32, &mut stats,
&stats,
&cluster_nodes_cache, &cluster_nodes_cache,
&hasher_reset_ts, &mut hasher_reset_ts,
&shreds_received, &shreds_received,
&max_slots, &max_slots,
&first_shreds_received, &first_shreds_received,
rpc_subscriptions.as_deref(), rpc_subscriptions.as_deref(),
) { ) {
match e { Ok(()) => (),
Error::RecvTimeout(RecvTimeoutError::Disconnected) => break, Err(RecvTimeoutError::Timeout) => (),
Error::RecvTimeout(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break,
_ => {
inc_new_counter_error!("streamer-retransmit-error", 1, 1);
}
}
} }
} }
trace!("exiting retransmitter"); trace!("exiting retransmitter");
}) })
.unwrap() .unwrap()
})
.collect()
} }
pub(crate) struct RetransmitStage { pub(crate) struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>, retransmit_thread_handle: JoinHandle<()>,
window_service: WindowService, window_service: WindowService,
cluster_slots_service: ClusterSlotsService, cluster_slots_service: ClusterSlotsService,
} }
@ -470,8 +399,7 @@ impl RetransmitStage {
// https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
let _retransmit_sender = retransmit_sender.clone(); let _retransmit_sender = retransmit_sender.clone();
let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver)); let retransmit_thread_handle = retransmitter(
let thread_hdls = retransmitter(
retransmit_sockets, retransmit_sockets,
bank_forks.clone(), bank_forks.clone(),
leader_schedule_cache.clone(), leader_schedule_cache.clone(),
@ -529,19 +457,16 @@ impl RetransmitStage {
); );
Self { Self {
thread_hdls, retransmit_thread_handle,
window_service, window_service,
cluster_slots_service, cluster_slots_service,
} }
} }
pub(crate) fn join(self) -> thread::Result<()> { pub(crate) fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { self.retransmit_thread_handle.join()?;
thread_hdl.join()?;
}
self.window_service.join()?; self.window_service.join()?;
self.cluster_slots_service.join()?; self.cluster_slots_service.join()
Ok(())
} }
} }
@ -613,7 +538,7 @@ mod tests {
bank_forks, bank_forks,
leader_schedule_cache, leader_schedule_cache,
cluster_info, cluster_info,
Arc::new(Mutex::new(retransmit_receiver)), retransmit_receiver,
Arc::default(), // MaxSlots Arc::default(), // MaxSlots
None, None,
); );