Remove transaction processing from RPU and request processing from TVU
This commit is contained in:
@@ -6,14 +6,12 @@ use event::Event;
|
||||
use packet;
|
||||
use packet::SharedPackets;
|
||||
use rayon::prelude::*;
|
||||
use recorder::Signal;
|
||||
use request::{Request, Response};
|
||||
use result::Result;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::time::Duration;
|
||||
use std::sync::mpsc::Receiver;
|
||||
use std::time::Instant;
|
||||
use streamer;
|
||||
use timing;
|
||||
@@ -53,7 +51,6 @@ impl RequestProcessor {
|
||||
info!("Response::TransactionCount {:?}", rsp);
|
||||
Some(rsp)
|
||||
}
|
||||
Request::Transaction(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,24 +88,6 @@ impl RequestProcessor {
|
||||
}
|
||||
|
||||
/// Split Request list into verified transactions and the rest
|
||||
fn partition_requests(
|
||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
||||
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
|
||||
let mut events = vec![];
|
||||
let mut reqs = vec![];
|
||||
for (msg, rsp_addr, verify) in req_vers {
|
||||
match msg {
|
||||
Request::Transaction(tr) => {
|
||||
if verify != 0 {
|
||||
events.push(Event::Transaction(tr));
|
||||
}
|
||||
}
|
||||
_ => reqs.push((msg, rsp_addr)),
|
||||
}
|
||||
}
|
||||
(events, reqs)
|
||||
}
|
||||
|
||||
fn serialize_response(
|
||||
resp: Response,
|
||||
rsp_addr: SocketAddr,
|
||||
@@ -139,49 +118,29 @@ impl RequestProcessor {
|
||||
|
||||
pub fn process_request_packets(
|
||||
&self,
|
||||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||
signal_sender: &Sender<Signal>,
|
||||
packet_receiver: &Receiver<SharedPackets>,
|
||||
blob_sender: &streamer::BlobSender,
|
||||
packet_recycler: &packet::PacketRecycler,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let recv_start = Instant::now();
|
||||
let mms = verified_receiver.recv_timeout(timer)?;
|
||||
let mut reqs_len = 0;
|
||||
let mms_len = mms.len();
|
||||
let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
|
||||
|
||||
info!(
|
||||
"@{:?} process start stalled for: {:?}ms batches: {}",
|
||||
"@{:?} request_stage: processing: {}",
|
||||
timing::timestamp(),
|
||||
timing::duration_as_ms(&recv_start.elapsed()),
|
||||
mms.len(),
|
||||
batch_len
|
||||
);
|
||||
|
||||
let mut reqs_len = 0;
|
||||
let proc_start = Instant::now();
|
||||
for (msgs, vers) in mms {
|
||||
let reqs = Self::deserialize_requests(&msgs.read().unwrap());
|
||||
reqs_len += reqs.len();
|
||||
let req_vers = reqs.into_iter()
|
||||
.zip(vers)
|
||||
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
|
||||
.filter(|x| {
|
||||
let v = x.0.verify();
|
||||
v
|
||||
})
|
||||
for msgs in batch {
|
||||
let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
|
||||
.into_iter()
|
||||
.filter_map(|x| x)
|
||||
.collect();
|
||||
reqs_len += reqs.len();
|
||||
|
||||
debug!("partitioning");
|
||||
let (events, reqs) = Self::partition_requests(req_vers);
|
||||
debug!("events: {} reqs: {}", events.len(), reqs.len());
|
||||
|
||||
debug!("process_events");
|
||||
let results = self.bank.process_verified_events(events);
|
||||
let events = results.into_iter().filter_map(|x| x.ok()).collect();
|
||||
signal_sender.send(Signal::Events(events))?;
|
||||
debug!("done process_events");
|
||||
|
||||
debug!("process_requests");
|
||||
let rsps = self.process_requests(reqs);
|
||||
debug!("done process_requests");
|
||||
|
||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||
if !blobs.is_empty() {
|
||||
@@ -196,7 +155,7 @@ impl RequestProcessor {
|
||||
info!(
|
||||
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
|
||||
timing::timestamp(),
|
||||
mms_len,
|
||||
batch_len,
|
||||
total_time_ms,
|
||||
reqs_len,
|
||||
(reqs_len as f32) / (total_time_s)
|
||||
|
Reference in New Issue
Block a user