Create function for thin client thread

This commit is contained in:
Greg Fitzgerald
2018-05-11 17:58:27 -06:00
parent 55100854d6
commit f0be595e4c

View File

@ -16,7 +16,7 @@ use std::io::Write;
use std::io::sink; use std::io::sink;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -196,6 +196,30 @@ impl Tpu {
Ok(()) Ok(())
} }
fn thin_client_service(
obj: SharedTpu,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
responder_sender: streamer::BlobSender,
packet_recycler: packet::PacketRecycler,
blob_recycler: packet::BlobRecycler,
) -> JoinHandle<()> {
spawn(move || loop {
let e = obj.thin_client_service.process_request_packets(
&obj.accounting_stage,
&verified_receiver,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
}
})
}
/// Create a UDP microservice that forwards messages the given Tpu. /// Create a UDP microservice that forwards messages the given Tpu.
/// This service is the network leader /// This service is the network leader
/// Set `exit` to shutdown its threads. /// Set `exit` to shutdown its threads.
@ -270,26 +294,19 @@ impl Tpu {
Mutex::new(writer), Mutex::new(writer),
); );
let tpu = obj.clone(); let t_thin_client = Self::thin_client_service(
let t_server = spawn(move || loop { obj.clone(),
let e = tpu.thin_client_service.process_request_packets( exit.clone(),
&tpu.accounting_stage, verified_receiver,
&verified_receiver, responder_sender,
&responder_sender, packet_recycler.clone(),
&packet_recycler, blob_recycler.clone(),
&blob_recycler, );
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
}
});
let mut threads = vec![ let mut threads = vec![
t_receiver, t_receiver,
t_responder, t_responder,
t_server, t_thin_client,
t_sync, t_sync,
t_gossip, t_gossip,
t_listen, t_listen,
@ -423,22 +440,14 @@ impl Tpu {
} }
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());
let tpu = obj.clone(); let t_thin_client = Self::thin_client_service(
let s_exit = exit.clone(); obj.clone(),
let t_server = spawn(move || loop { exit.clone(),
let e = tpu.thin_client_service.process_request_packets( verified_receiver,
&tpu.accounting_stage, responder_sender,
&verified_receiver, packet_recycler.clone(),
&responder_sender, blob_recycler.clone(),
&packet_recycler, );
&blob_recycler,
);
if e.is_err() {
if s_exit.load(Ordering::Relaxed) {
break;
}
}
});
let mut threads = vec![ let mut threads = vec![
//replicate threads //replicate threads
@ -451,7 +460,7 @@ impl Tpu {
//serve threads //serve threads
t_packet_receiver, t_packet_receiver,
t_responder, t_responder,
t_server, t_thin_client,
t_sync, t_sync,
]; ];
threads.extend(verify_threads.into_iter()); threads.extend(verify_threads.into_iter());