diff --git a/src/tpu.rs b/src/tpu.rs index 8ef5bde978..6db538d1f9 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -34,7 +34,7 @@ use sigverify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::JoinHandle; use std::time::Duration; use streamer::BlobReceiver; @@ -81,7 +81,7 @@ impl Tpu { bank.clone(), exit.clone(), blob_recycler.clone(), - Mutex::new(writer), + writer, record_stage.entry_receiver, ); let mut thread_hdls = vec![ diff --git a/src/write_stage.rs b/src/write_stage.rs index 5984e0b7f6..c2810164ed 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -49,24 +49,27 @@ impl WriteStage { bank: Arc, exit: Arc, blob_recycler: BlobRecycler, - writer: Mutex, + writer: W, entry_receiver: Receiver>, ) -> Self { let (blob_sender, blob_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-writer".to_string()) - .spawn(move || loop { + .spawn(move || { let entry_writer = EntryWriter::new(&bank); - let _ = Self::write_and_send_entries( - &entry_writer, - &blob_sender, - &blob_recycler, - &writer, - &entry_receiver, - ); - if exit.load(Ordering::Relaxed) { - info!("broadcat_service exiting"); - break; + let writer = Mutex::new(writer); + loop { + let _ = Self::write_and_send_entries( + &entry_writer, + &blob_sender, + &blob_recycler, + &writer, + &entry_receiver, + ); + if exit.load(Ordering::Relaxed) { + info!("broadcat_service exiting"); + break; + } } }) .unwrap();