Files
solana/src/request_processor.rs

152 lines
4.7 KiB
Rust
Raw Normal View History

2018-05-12 17:57:28 -06:00
//! The `request_stage` processes thin client Request messages.
2018-05-14 15:33:11 -06:00
use bank::Bank;
2018-05-12 17:57:28 -06:00
use bincode::{deserialize, serialize};
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use request::{Request, Response};
2018-05-12 17:57:28 -06:00
use result::Result;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
2018-05-12 17:57:28 -06:00
use std::time::Instant;
use streamer;
use timing;
pub struct RequestProcessor {
2018-05-14 15:33:11 -06:00
bank: Arc<Bank>,
2018-05-12 17:57:28 -06:00
}
impl RequestProcessor {
2018-05-14 15:33:11 -06:00
/// Create a new Tpu that wraps the given Bank.
pub fn new(bank: Arc<Bank>) -> Self {
RequestProcessor { bank }
2018-05-12 17:57:28 -06:00
}
/// Process Request items sent by clients.
fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
2018-05-14 15:33:11 -06:00
let val = self.bank.get_balance(&key);
2018-05-12 17:57:28 -06:00
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
2018-05-14 09:35:10 -06:00
Request::GetLastId => {
2018-05-14 15:33:11 -06:00
let id = self.bank.last_id();
2018-05-14 09:35:10 -06:00
let rsp = (Response::LastId { id }, rsp_addr);
info!("Response::LastId {:?}", rsp);
Some(rsp)
}
Request::GetTransactionCount => {
2018-05-14 15:33:11 -06:00
let transaction_count = self.bank.transaction_count() as u64;
let rsp = (Response::TransactionCount { transaction_count }, rsp_addr);
info!("Response::TransactionCount {:?}", rsp);
Some(rsp)
}
2018-05-12 17:57:28 -06:00
}
}
/// Answer each `(Request, SocketAddr)` pair in order.
///
/// Requests for which `process_request` returns `None` contribute nothing
/// to the output.
pub fn process_requests(
    &self,
    reqs: Vec<(Request, SocketAddr)>,
) -> Vec<(Response, SocketAddr)> {
    let mut replies = Vec::new();
    for (req, rsp_addr) in reqs {
        if let Some(reply) = self.process_request(req, rsp_addr) {
            replies.push(reply);
        }
    }
    replies
}
2018-05-25 15:51:41 -06:00
pub fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
2018-05-12 17:57:28 -06:00
p.packets
.par_iter()
.map(|x| {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}
/// Serialize one `Response` into a blob taken from `blob_recycler`.
///
/// The encoded bytes are copied to the front of the blob's data buffer,
/// `meta.size` is set to the encoded length, and the blob is addressed to
/// `rsp_addr`.
///
/// Returns an error if serialization fails. Panics if `write()` on the
/// blob fails or if the encoded response is longer than the blob's buffer.
fn serialize_response(
    resp: Response,
    rsp_addr: SocketAddr,
    blob_recycler: &packet::BlobRecycler,
) -> Result<packet::SharedBlob> {
    let shared_blob = blob_recycler.allocate();
    {
        // Keep the write guard in its own scope so it is released before
        // the blob is handed back to the caller.
        let mut guard = shared_blob.write().unwrap();
        let encoded = serialize(&resp)?;
        let encoded_len = encoded.len();
        guard.data[..encoded_len].copy_from_slice(&encoded);
        guard.meta.size = encoded_len;
        guard.meta.set_addr(&rsp_addr);
    }
    Ok(shared_blob)
}
/// Serialize each response into its own blob, preserving input order.
///
/// Stops at the first response that fails to serialize and returns that
/// error; blobs built before the failure are dropped.
fn serialize_responses(
    rsps: Vec<(Response, SocketAddr)>,
    blob_recycler: &packet::BlobRecycler,
) -> Result<VecDeque<packet::SharedBlob>> {
    rsps.into_iter()
        .map(|(resp, rsp_addr)| Self::serialize_response(resp, rsp_addr, blob_recycler))
        .collect()
}
pub fn process_request_packets(
&self,
packet_receiver: &Receiver<SharedPackets>,
2018-05-12 17:57:28 -06:00
blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
2018-05-12 17:57:28 -06:00
info!(
"@{:?} request_stage: processing: {}",
2018-05-12 17:57:28 -06:00
timing::timestamp(),
batch_len
2018-05-12 17:57:28 -06:00
);
let mut reqs_len = 0;
2018-05-12 17:57:28 -06:00
let proc_start = Instant::now();
for msgs in batch {
let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
.into_iter()
.filter_map(|x| x)
2018-05-12 17:57:28 -06:00
.collect();
reqs_len += reqs.len();
2018-05-12 17:57:28 -06:00
let rsps = self.process_requests(reqs);
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing
blob_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),
batch_len,
2018-05-12 17:57:28 -06:00
total_time_ms,
reqs_len,
(reqs_len as f32) / (total_time_s)
);
Ok(())
}
}