This commit is contained in:
Anatoly Yakovenko
2018-05-30 13:38:15 -07:00
committed by Greg Fitzgerald
parent a2b92c35e1
commit a8e1c44663
5 changed files with 92 additions and 74 deletions

View File

@ -11,7 +11,7 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{spawn, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use timing; use timing;
@ -30,19 +30,22 @@ impl BankingStage {
packet_recycler: packet::PacketRecycler, packet_recycler: packet::PacketRecycler,
) -> Self { ) -> Self {
let (signal_sender, signal_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
let thread_hdl = spawn(move || loop { let thread_hdl = Builder::new()
let e = Self::process_packets( .name("solana-banking-stage".to_string())
bank.clone(), .spawn(move || loop {
&verified_receiver, let e = Self::process_packets(
&signal_sender, bank.clone(),
&packet_recycler, &verified_receiver,
); &signal_sender,
if e.is_err() { &packet_recycler,
if exit.load(Ordering::Relaxed) { );
break; if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
} }
} })
}); .unwrap();
BankingStage { BankingStage {
thread_hdl, thread_hdl,
signal_receiver, signal_receiver,

View File

@ -9,7 +9,7 @@ use entry::Entry;
use hash::Hash; use hash::Hash;
use recorder::Recorder; use recorder::Recorder;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{spawn, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use transaction::Transaction; use transaction::Transaction;
@ -35,23 +35,26 @@ impl RecordStage {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone(); let start_hash = start_hash.clone();
let thread_hdl = spawn(move || { let thread_hdl = Builder::new()
let mut recorder = Recorder::new(start_hash); .name("solana-record-stage".to_string())
let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); .spawn(move || {
loop { let mut recorder = Recorder::new(start_hash);
if let Err(_) = Self::process_transactions( let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
&mut recorder, loop {
duration_data, if let Err(_) = Self::process_transactions(
&transaction_receiver, &mut recorder,
&entry_sender, duration_data,
) { &transaction_receiver,
return; &entry_sender,
) {
return;
}
if duration_data.is_some() {
recorder.hash();
}
} }
if duration_data.is_some() { })
recorder.hash(); .unwrap();
}
}
});
RecordStage { RecordStage {
entry_receiver, entry_receiver,

View File

@ -6,7 +6,7 @@ use packet;
use result::Result; use result::Result;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{spawn, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
@ -41,12 +41,15 @@ impl ReplicateStage {
window_receiver: streamer::BlobReceiver, window_receiver: streamer::BlobReceiver,
blob_recycler: packet::BlobRecycler, blob_recycler: packet::BlobRecycler,
) -> Self { ) -> Self {
let thread_hdl = spawn(move || loop { let thread_hdl = Builder::new()
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler); .name("solana-replicate-stage".to_string())
if e.is_err() && exit.load(Ordering::Relaxed) { .spawn(move || loop {
break; let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
} if e.is_err() && exit.load(Ordering::Relaxed) {
}); break;
}
})
.unwrap();
ReplicateStage { thread_hdl } ReplicateStage { thread_hdl }
} }
} }

View File

@ -11,7 +11,7 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{spawn, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Instant; use std::time::Instant;
use streamer; use streamer;
use timing; use timing;
@ -90,20 +90,23 @@ impl RequestStage {
let request_processor = Arc::new(request_processor); let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop { let thread_hdl = Builder::new()
let e = Self::process_request_packets( .name("solana-request-stage".to_string())
&request_processor_, .spawn(move || loop {
&packet_receiver, let e = Self::process_request_packets(
&blob_sender, &request_processor_,
&packet_recycler, &packet_receiver,
&blob_recycler, &blob_sender,
); &packet_recycler,
if e.is_err() { &blob_recycler,
if exit.load(Ordering::Relaxed) { );
break; if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
} }
} })
}); .unwrap();
RequestStage { RequestStage {
thread_hdl, thread_hdl,
blob_receiver, blob_receiver,

View File

@ -8,7 +8,7 @@ use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle}; use std::thread::{Builder, JoinHandle};
use streamer; use streamer;
pub struct WriteStage { pub struct WriteStage {
@ -26,19 +26,22 @@ impl WriteStage {
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Entry>,
) -> Self { ) -> Self {
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop { let thread_hdl = Builder::new()
let entry_writer = EntryWriter::new(&bank); .name("solana-writer".to_string())
let _ = entry_writer.write_and_send_entries( .spawn(move || loop {
&blob_sender, let entry_writer = EntryWriter::new(&bank);
&blob_recycler, let _ = entry_writer.write_and_send_entries(
&writer, &blob_sender,
&entry_receiver, &blob_recycler,
); &writer,
if exit.load(Ordering::Relaxed) { &entry_receiver,
info!("broadcat_service exiting"); );
break; if exit.load(Ordering::Relaxed) {
} info!("broadcat_service exiting");
}); break;
}
})
.unwrap();
WriteStage { WriteStage {
thread_hdl, thread_hdl,
@ -52,16 +55,19 @@ impl WriteStage {
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Entry>,
) -> Self { ) -> Self {
let (_blob_sender, blob_receiver) = channel(); let (_blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || { let thread_hdl = Builder::new()
let entry_writer = EntryWriter::new(&bank); .name("solana-drain".to_string())
loop { .spawn(move || {
let _ = entry_writer.drain_entries(&entry_receiver); let entry_writer = EntryWriter::new(&bank);
if exit.load(Ordering::Relaxed) { loop {
info!("drain_service exiting"); let _ = entry_writer.drain_entries(&entry_receiver);
break; if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;
}
} }
} })
}); .unwrap();
WriteStage { WriteStage {
thread_hdl, thread_hdl,