diff --git a/src/request_processor.rs b/src/request_processor.rs index 6b7fd03f98..2ce0ecf487 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -1,19 +1,9 @@ //! The `request_processor` processes thin client Request messages. use bank::Bank; -use bincode::{deserialize, serialize}; -use packet; -use packet::SharedPackets; -use rayon::prelude::*; use request::{Request, Response}; -use result::Result; -use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::mpsc::Receiver; -use std::time::Instant; -use streamer; -use timing; pub struct RequestProcessor { bank: Arc, @@ -61,91 +51,4 @@ impl RequestProcessor { .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) .collect() } - - pub fn deserialize_requests(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - /// Split Request list into verified transactions and the rest - fn serialize_response( - resp: Response, - rsp_addr: SocketAddr, - blob_recycler: &packet::BlobRecycler, - ) -> Result { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) - } - - fn serialize_responses( - rsps: Vec<(Response, SocketAddr)>, - blob_recycler: &packet::BlobRecycler, - ) -> Result> { - let mut blobs = VecDeque::new(); - for (resp, rsp_addr) in rsps { - blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); - } - Ok(blobs) - } - - pub fn process_request_packets( - &self, - packet_receiver: &Receiver, - blob_sender: &streamer::BlobSender, - packet_recycler: &packet::PacketRecycler, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; - - info!( - "@{:?} request_stage: processing: {}", - timing::timestamp(), - batch_len - ); - - let mut reqs_len = 0; - let proc_start = Instant::now(); - for msgs in batch { - let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) - .into_iter() - .filter_map(|x| x) - .collect(); - reqs_len += reqs.len(); - - let rsps = self.process_requests(reqs); - - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - info!("process: sending blobs: {}", blobs.len()); - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; - } - packet_recycler.recycle(msgs); - } - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); - info!( - "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", - timing::timestamp(), - batch_len, - total_time_ms, - reqs_len, - (reqs_len as f32) / (total_time_s) - ); - Ok(()) - } } diff --git a/src/request_stage.rs b/src/request_stage.rs index cd98a7d439..8b4e0db09e 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,13 +1,21 @@ //! The `request_stage` processes thin client Request messages. +use bincode::{deserialize, serialize}; use packet; use packet::SharedPackets; +use rayon::prelude::*; +use request::{Request, Response}; use request_processor::RequestProcessor; +use result::Result; +use std::collections::VecDeque; +use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::thread::{spawn, JoinHandle}; +use std::time::Instant; use streamer; +use timing; pub struct RequestStage { pub thread_hdl: JoinHandle<()>, @@ -16,6 +24,92 @@ pub struct RequestStage { } impl RequestStage { + pub fn deserialize_requests(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + /// Split Request list into verified transactions and the rest + fn serialize_response( + resp: Response, + rsp_addr: SocketAddr, + blob_recycler: &packet::BlobRecycler, + ) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) + } + + fn serialize_responses( + rsps: Vec<(Response, SocketAddr)>, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) + } + + pub fn process_request_packets( + request_processor: &RequestProcessor, + packet_receiver: &Receiver, + blob_sender: &streamer::BlobSender, + packet_recycler: &packet::PacketRecycler, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; + + info!( + "@{:?} request_stage: processing: {}", + timing::timestamp(), + batch_len + ); + + let mut reqs_len = 0; + let proc_start = Instant::now(); + for msgs in batch { + let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) + .into_iter() + .filter_map(|x| x) + .collect(); + reqs_len += reqs.len(); + + let rsps = request_processor.process_requests(reqs); + + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + info!("process: sending blobs: {}", blobs.len()); + //don't wake up the other side if there is nothing + blob_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + batch_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); + Ok(()) + } pub fn new( request_processor: RequestProcessor, exit: Arc, @@ -27,7 +121,8 @@ impl RequestStage { let request_processor_ = request_processor.clone(); let (blob_sender, blob_receiver) = channel(); let thread_hdl = spawn(move || loop { - let e = request_processor_.process_request_packets( + let e = Self::process_request_packets( + &request_processor_, &packet_receiver, &blob_sender, &packet_recycler,