request_stage::serialize_packets -> packet::to_blobs
Good stuff - no need to hide them.
This commit is contained in:
@ -236,6 +236,34 @@ pub fn to_packets<T: Serialize>(r: &PacketRecycler, xs: Vec<T>) -> Vec<SharedPac
|
|||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn to_blob<T: Serialize>(
|
||||||
|
resp: T,
|
||||||
|
rsp_addr: SocketAddr,
|
||||||
|
blob_recycler: &BlobRecycler,
|
||||||
|
) -> Result<SharedBlob> {
|
||||||
|
let blob = blob_recycler.allocate();
|
||||||
|
{
|
||||||
|
let mut b = blob.write().unwrap();
|
||||||
|
let v = serialize(&resp)?;
|
||||||
|
let len = v.len();
|
||||||
|
b.data[..len].copy_from_slice(&v);
|
||||||
|
b.meta.size = len;
|
||||||
|
b.meta.set_addr(&rsp_addr);
|
||||||
|
}
|
||||||
|
Ok(blob)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_blobs<T: Serialize>(
|
||||||
|
rsps: Vec<(T, SocketAddr)>,
|
||||||
|
blob_recycler: &BlobRecycler,
|
||||||
|
) -> Result<VecDeque<SharedBlob>> {
|
||||||
|
let mut blobs = VecDeque::new();
|
||||||
|
for (resp, rsp_addr) in rsps {
|
||||||
|
blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?);
|
||||||
|
}
|
||||||
|
Ok(blobs)
|
||||||
|
}
|
||||||
|
|
||||||
const BLOB_INDEX_END: usize = size_of::<u64>();
|
const BLOB_INDEX_END: usize = size_of::<u64>();
|
||||||
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<usize>() + size_of::<PublicKey>();
|
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<usize>() + size_of::<PublicKey>();
|
||||||
|
|
||||||
|
@ -1,14 +1,12 @@
|
|||||||
//! The `request_stage` processes thin client Request messages.
|
//! The `request_stage` processes thin client Request messages.
|
||||||
|
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::deserialize;
|
||||||
use packet;
|
use packet;
|
||||||
use packet::SharedPackets;
|
use packet::SharedPackets;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use request::Request;
|
use request::Request;
|
||||||
use request_processor::RequestProcessor;
|
use request_processor::RequestProcessor;
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use serde::Serialize;
|
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@ -36,35 +34,6 @@ impl RequestStage {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Split Request list into verified transactions and the rest
|
|
||||||
fn serialize_response<T: Serialize>(
|
|
||||||
resp: T,
|
|
||||||
rsp_addr: SocketAddr,
|
|
||||||
blob_recycler: &packet::BlobRecycler,
|
|
||||||
) -> Result<packet::SharedBlob> {
|
|
||||||
let blob = blob_recycler.allocate();
|
|
||||||
{
|
|
||||||
let mut b = blob.write().unwrap();
|
|
||||||
let v = serialize(&resp)?;
|
|
||||||
let len = v.len();
|
|
||||||
b.data[..len].copy_from_slice(&v);
|
|
||||||
b.meta.size = len;
|
|
||||||
b.meta.set_addr(&rsp_addr);
|
|
||||||
}
|
|
||||||
Ok(blob)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_responses<T: Serialize>(
|
|
||||||
rsps: Vec<(T, SocketAddr)>,
|
|
||||||
blob_recycler: &packet::BlobRecycler,
|
|
||||||
) -> Result<VecDeque<packet::SharedBlob>> {
|
|
||||||
let mut blobs = VecDeque::new();
|
|
||||||
for (resp, rsp_addr) in rsps {
|
|
||||||
blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
|
|
||||||
}
|
|
||||||
Ok(blobs)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn process_request_packets(
|
pub fn process_request_packets(
|
||||||
request_processor: &RequestProcessor,
|
request_processor: &RequestProcessor,
|
||||||
packet_receiver: &Receiver<SharedPackets>,
|
packet_receiver: &Receiver<SharedPackets>,
|
||||||
@ -91,7 +60,7 @@ impl RequestStage {
|
|||||||
|
|
||||||
let rsps = request_processor.process_requests(reqs);
|
let rsps = request_processor.process_requests(reqs);
|
||||||
|
|
||||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
let blobs = packet::to_blobs(rsps, blob_recycler)?;
|
||||||
if !blobs.is_empty() {
|
if !blobs.is_empty() {
|
||||||
info!("process: sending blobs: {}", blobs.len());
|
info!("process: sending blobs: {}", blobs.len());
|
||||||
//don't wake up the other side if there is nothing
|
//don't wake up the other side if there is nothing
|
||||||
|
Reference in New Issue
Block a user