Let thin client own the receiver channel

This commit is contained in:
Greg Fitzgerald
2018-05-11 23:45:57 -06:00
parent d2f95d5319
commit 2376dfc139
2 changed files with 15 additions and 23 deletions

View File

@ -15,15 +15,14 @@ use signature::PublicKey;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use transaction::Transaction;
//use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::Receiver;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use streamer; use streamer;
use timing; use timing;
use transaction::Transaction;
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
@ -62,8 +61,6 @@ pub enum Response {
} }
pub struct RequestProcessor { pub struct RequestProcessor {
//pub output: Mutex<Receiver<Response>>,
//response_sender: Mutex<Sender<Response>>,
accountant: Arc<Accountant>, accountant: Arc<Accountant>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>, entry_info_subscribers: Mutex<Vec<SocketAddr>>,
} }
@ -71,10 +68,7 @@ pub struct RequestProcessor {
impl RequestProcessor { impl RequestProcessor {
/// Create a new Tpu that wraps the given Accountant. /// Create a new Tpu that wraps the given Accountant.
pub fn new(accountant: Arc<Accountant>) -> Self { pub fn new(accountant: Arc<Accountant>) -> Self {
//let (response_sender, output) = channel();
RequestProcessor { RequestProcessor {
//output: Mutex::new(output),
//response_sender: Mutex::new(response_sender),
accountant, accountant,
entry_info_subscribers: Mutex::new(vec![]), entry_info_subscribers: Mutex::new(vec![]),
} }
@ -278,6 +272,7 @@ impl RequestProcessor {
pub struct ThinClientService { pub struct ThinClientService {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub output: streamer::BlobReceiver,
} }
impl ThinClientService { impl ThinClientService {
@ -286,10 +281,10 @@ impl ThinClientService {
accounting_stage: Arc<AccountingStage>, accounting_stage: Arc<AccountingStage>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
responder_sender: streamer::BlobSender,
packet_recycler: packet::PacketRecycler, packet_recycler: packet::PacketRecycler,
blob_recycler: packet::BlobRecycler, blob_recycler: packet::BlobRecycler,
) -> Self { ) -> Self {
let (responder_sender, output) = channel();
let thread_hdl = spawn(move || loop { let thread_hdl = spawn(move || loop {
let e = request_processor.process_request_packets( let e = request_processor.process_request_packets(
&accounting_stage, &accounting_stage,
@ -304,7 +299,7 @@ impl ThinClientService {
} }
} }
}); });
ThinClientService { thread_hdl } ThinClientService { thread_hdl, output }
} }
} }

View File

@ -97,13 +97,11 @@ impl Tpu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = packet::BlobRecycler::default();
let (responder_sender, responder_receiver) = channel();
let thin_client_service = ThinClientService::new( let thin_client_service = ThinClientService::new(
obj.request_processor.clone(), obj.request_processor.clone(),
obj.accounting_stage.clone(), obj.accounting_stage.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.output, sig_verify_stage.output,
responder_sender,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
@ -131,7 +129,7 @@ impl Tpu {
respond_socket, respond_socket,
exit.clone(), exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
responder_receiver, thin_client_service.output,
); );
let mut threads = vec![ let mut threads = vec![
@ -265,28 +263,27 @@ impl Tpu {
packet_recycler.clone(), packet_recycler.clone(),
packet_sender, packet_sender,
)?; )?;
let (responder_sender, responder_receiver) = channel();
let t_responder = streamer::responder(
respond_socket,
exit.clone(),
blob_recycler.clone(),
responder_receiver,
);
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let t_write = Self::drain_service(obj.clone(), exit.clone());
let thin_client_service = ThinClientService::new( let thin_client_service = ThinClientService::new(
obj.request_processor.clone(), obj.request_processor.clone(),
obj.accounting_stage.clone(), obj.accounting_stage.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.output, sig_verify_stage.output,
responder_sender,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
let t_write = Self::drain_service(obj.clone(), exit.clone());
let t_responder = streamer::responder(
respond_socket,
exit.clone(),
blob_recycler.clone(),
thin_client_service.output,
);
let mut threads = vec![ let mut threads = vec![
//replicate threads //replicate threads
t_blob_receiver, t_blob_receiver,