Consistent naming of senders and receivers

This commit is contained in:
Greg Fitzgerald
2018-05-12 15:24:17 -06:00
parent a3869dd4c1
commit 6264508f5e
6 changed files with 24 additions and 24 deletions

View File

@@ -37,7 +37,7 @@ impl EventProcessor {
sender.send(Signal::Events(events))?; sender.send(Signal::Events(events))?;
// Wait for the historian to tag our Events with an ID and then register it. // Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.output.lock().unwrap().recv()?; let entry = historian.entry_receiver.lock().unwrap().recv()?;
self.accountant.register_entry_id(&entry.id); self.accountant.register_entry_id(&entry.id);
Ok(entry) Ok(entry)
} }

View File

@@ -10,7 +10,7 @@ use std::thread::{spawn, JoinHandle};
use std::time::Instant; use std::time::Instant;
pub struct Historian { pub struct Historian {
pub output: Mutex<Receiver<Entry>>, pub entry_receiver: Mutex<Receiver<Entry>>,
pub thread_hdl: JoinHandle<ExitReason>, pub thread_hdl: JoinHandle<ExitReason>,
} }
@@ -20,11 +20,11 @@ impl Historian {
start_hash: &Hash, start_hash: &Hash,
ms_per_tick: Option<u64>, ms_per_tick: Option<u64>,
) -> Self { ) -> Self {
let (entry_sender, output) = channel(); let (entry_sender, entry_receiver) = channel();
let thread_hdl = let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
Historian { Historian {
output: Mutex::new(output), entry_receiver: Mutex::new(entry_receiver),
thread_hdl, thread_hdl,
} }
} }
@@ -52,9 +52,9 @@ impl Historian {
} }
pub fn receive(self: &Self) -> Result<Entry, TryRecvError> { pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
self.output self.entry_receiver
.lock() .lock()
.expect("'output' lock in pub fn receive") .expect("'entry_receiver' lock in pub fn receive")
.try_recv() .try_recv()
} }
} }
@@ -78,9 +78,9 @@ mod tests {
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
let entry0 = hist.output.lock().unwrap().recv().unwrap(); let entry0 = hist.entry_receiver.lock().unwrap().recv().unwrap();
let entry1 = hist.output.lock().unwrap().recv().unwrap(); let entry1 = hist.entry_receiver.lock().unwrap().recv().unwrap();
let entry2 = hist.output.lock().unwrap().recv().unwrap(); let entry2 = hist.entry_receiver.lock().unwrap().recv().unwrap();
assert_eq!(entry0.num_hashes, 0); assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0); assert_eq!(entry1.num_hashes, 0);
@@ -100,7 +100,7 @@ mod tests {
let (input, event_receiver) = channel(); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, None); let hist = Historian::new(event_receiver, &zero, None);
drop(hist.output); drop(hist.entry_receiver);
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
assert_eq!( assert_eq!(
hist.thread_hdl.join().unwrap(), hist.thread_hdl.join().unwrap(),
@@ -116,7 +116,7 @@ mod tests {
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
drop(input); drop(input);
let entries: Vec<Entry> = hist.output.lock().unwrap().iter().collect(); let entries: Vec<Entry> = hist.entry_receiver.lock().unwrap().iter().collect();
assert!(entries.len() > 1); assert!(entries.len() > 1);
// Ensure the ID is not the seed. // Ensure the ID is not the seed.

View File

@@ -208,7 +208,7 @@ impl RequestProcessor {
event_processor: &EventProcessor, event_processor: &EventProcessor,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
entry_sender: &Sender<Entry>, entry_sender: &Sender<Entry>,
responder_sender: &streamer::BlobSender, blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler, packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
) -> Result<()> { ) -> Result<()> {
@@ -253,7 +253,7 @@ impl RequestProcessor {
if !blobs.is_empty() { if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len()); info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing //don't wake up the other side if there is nothing
responder_sender.send(blobs)?; blob_sender.send(blobs)?;
} }
packet_recycler.recycle(msgs); packet_recycler.recycle(msgs);
} }
@@ -274,7 +274,7 @@ impl RequestProcessor {
pub struct RequestStage { pub struct RequestStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub entry_receiver: Receiver<Entry>, pub entry_receiver: Receiver<Entry>,
pub output: streamer::BlobReceiver, pub blob_receiver: streamer::BlobReceiver,
pub request_processor: Arc<RequestProcessor>, pub request_processor: Arc<RequestProcessor>,
} }
@@ -290,13 +290,13 @@ impl RequestStage {
let request_processor = Arc::new(request_processor); let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let (responder_sender, output) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop { let thread_hdl = spawn(move || loop {
let e = request_processor_.process_request_packets( let e = request_processor_.process_request_packets(
&event_processor, &event_processor,
&verified_receiver, &verified_receiver,
&entry_sender, &entry_sender,
&responder_sender, &blob_sender,
&packet_recycler, &packet_recycler,
&blob_recycler, &blob_recycler,
); );
@@ -309,7 +309,7 @@ impl RequestStage {
RequestStage { RequestStage {
thread_hdl, thread_hdl,
entry_receiver, entry_receiver,
output, blob_receiver,
request_processor, request_processor,
} }
} }

View File

@@ -89,7 +89,7 @@ impl Rpu {
request_processor, request_processor,
self.event_processor.clone(), self.event_processor.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.output, sig_verify_stage.verified_receiver,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
@@ -119,7 +119,7 @@ impl Rpu {
respond_socket, respond_socket,
exit.clone(), exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
request_stage.output, request_stage.blob_receiver,
); );
let mut threads = vec![ let mut threads = vec![

View File

@@ -13,17 +13,17 @@ use streamer;
use timing; use timing;
pub struct SigVerifyStage { pub struct SigVerifyStage {
pub output: Receiver<Vec<(SharedPackets, Vec<u8>)>>, pub verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl SigVerifyStage { impl SigVerifyStage {
pub fn new(exit: Arc<AtomicBool>, packets_receiver: Receiver<SharedPackets>) -> Self { pub fn new(exit: Arc<AtomicBool>, packets_receiver: Receiver<SharedPackets>) -> Self {
let (verified_sender, output) = channel(); let (verified_sender, verified_receiver) = channel();
let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender); let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender);
SigVerifyStage { SigVerifyStage {
thread_hdls, thread_hdls,
output, verified_receiver,
} }
} }

View File

@@ -174,7 +174,7 @@ impl Tvu {
request_processor, request_processor,
obj.event_processor.clone(), obj.event_processor.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.output, sig_verify_stage.verified_receiver,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
@@ -190,7 +190,7 @@ impl Tvu {
respond_socket, respond_socket,
exit.clone(), exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
request_stage.output, request_stage.blob_receiver,
); );
let mut threads = vec![ let mut threads = vec![