From dcaeacc5077542b8cf310f1862048bdcd51f3177 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 17:10:14 -0600 Subject: [PATCH] request_stage::serialize_packets -> packet::to_blobs Good stuff - no need to hide them. --- src/packet.rs | 28 ++++++++++++++++++++++++++++ src/request_stage.rs | 35 ++--------------------------------- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/src/packet.rs b/src/packet.rs index 086cd8e0cc..064f65d4e0 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -236,6 +236,34 @@ pub fn to_packets(r: &PacketRecycler, xs: Vec) -> Vec( + resp: T, + rsp_addr: SocketAddr, + blob_recycler: &BlobRecycler, +) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) +} + +pub fn to_blobs( + rsps: Vec<(T, SocketAddr)>, + blob_recycler: &BlobRecycler, +) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) +} + const BLOB_INDEX_END: usize = size_of::(); const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::() + size_of::(); diff --git a/src/request_stage.rs b/src/request_stage.rs index 63344ede78..33d845aab4 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,14 +1,12 @@ //! The `request_stage` processes thin client Request messages. -use bincode::{deserialize, serialize}; +use bincode::deserialize; use packet; use packet::SharedPackets; use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; use result::Result; -use serde::Serialize; -use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -36,35 +34,6 @@ impl RequestStage { .collect() } - /// Split Request list into verified transactions and the rest - fn serialize_response( - resp: T, - rsp_addr: SocketAddr, - blob_recycler: &packet::BlobRecycler, - ) -> Result { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) - } - - fn serialize_responses( - rsps: Vec<(T, SocketAddr)>, - blob_recycler: &packet::BlobRecycler, - ) -> Result> { - let mut blobs = VecDeque::new(); - for (resp, rsp_addr) in rsps { - blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); - } - Ok(blobs) - } - pub fn process_request_packets( request_processor: &RequestProcessor, packet_receiver: &Receiver, @@ -91,7 +60,7 @@ impl RequestStage { let rsps = request_processor.process_requests(reqs); - let blobs = Self::serialize_responses(rsps, blob_recycler)?; + let blobs = packet::to_blobs(rsps, blob_recycler)?; if !blobs.is_empty() { info!("process: sending blobs: {}", blobs.len()); //don't wake up the other side if there is nothing