diff --git a/src/tpu.rs b/src/tpu.rs index 7865e6b400..b979d8742a 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -220,6 +220,31 @@ impl Tpu { }) } + fn verifier_service( + exit: Arc, + packets_receiver: Arc>, + verified_sender: Arc)>>>>, + ) -> JoinHandle<()> { + spawn(move || loop { + let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }) + } + + fn verifier_services( + exit: Arc, + packets_receiver: streamer::PacketReceiver, + verified_sender: Sender)>>, + ) -> Vec> { + let sender = Arc::new(Mutex::new(verified_sender)); + let receiver = Arc::new(Mutex::new(packets_receiver)); + (0..4) + .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) + .collect() + } + /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -257,23 +282,10 @@ impl Tpu { blob_recycler.clone(), responder_receiver, ); - let (verified_sender, verified_receiver) = channel(); - let mut verify_threads = Vec::new(); - let shared_verified_sender = Arc::new(Mutex::new(verified_sender)); - let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver)); - for _ in 0..4 { - let exit_ = exit.clone(); - let recv = shared_packet_receiver.clone(); - let sender = shared_verified_sender.clone(); - let thread = spawn(move || loop { - let e = Self::verifier(&recv, &sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); - verify_threads.push(thread); - } + let (verified_sender, verified_receiver) = channel(); + let verify_threads: Vec<_> = + Self::verifier_services(exit.clone(), packet_receiver, verified_sender); let (broadcast_sender, broadcast_receiver) = channel(); @@ -422,22 +434,9 @@ impl Tpu { responder_receiver, ); let (verified_sender, verified_receiver) = channel(); + let verify_threads: Vec<_> = + Self::verifier_services(exit.clone(), packet_receiver, verified_sender); - let mut verify_threads = Vec::new(); - let shared_verified_sender = Arc::new(Mutex::new(verified_sender)); - let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver)); - for _ in 0..4 { - let exit_ = exit.clone(); - let recv = shared_packet_receiver.clone(); - let sender = shared_verified_sender.clone(); - let thread = spawn(move || loop { - let e = Self::verifier(&recv, &sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); - verify_threads.push(thread); - } let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); let t_thin_client = Self::thin_client_service(