From 232e1bb8a3f143e04e97daf5f5059542ab8daab8 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 29 Mar 2018 12:55:41 -0600 Subject: [PATCH 1/3] Colocate packet dependencies --- src/accountant_skel.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index eff72389c7..9558fab85c 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -101,6 +101,7 @@ impl AccountantSkel { let mut num = 0; let mut ursps = rsps.write().unwrap(); for packet in &msgs.read().unwrap().packets { + let rsp_addr = packet.meta.get_addr(); let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = obj.lock().unwrap().process_request(req) { @@ -114,7 +115,7 @@ impl AccountantSkel { let len = v.len(); rsp.data[..len].copy_from_slice(&v); rsp.meta.size = len; - rsp.meta.set_addr(&packet.meta.get_addr()); + rsp.meta.set_addr(&rsp_addr); num += 1; } } From c59c38e50e324a35d5118b600f7574e5cff871cc Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 29 Mar 2018 13:09:21 -0600 Subject: [PATCH 2/3] Refactor for batch verification --- src/accountant_skel.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 9558fab85c..4b8c611858 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -11,7 +11,7 @@ use serde_json; use signature::PublicKey; use std::default::Default; use std::io::Write; -use std::net::UdpSocket; +use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; @@ -34,6 +34,10 @@ pub enum Request { GetId { is_last: bool }, } +fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> { + reqs +} + #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, @@ -98,12 +102,18 @@ impl AccountantSkel { let rsps = streamer::allocate(response_recycler); let rsps_ = rsps.clone(); { - let mut num = 0; - let mut ursps = rsps.write().unwrap(); + let mut reqs = vec![]; for packet in &msgs.read().unwrap().packets { let rsp_addr = packet.meta.get_addr(); let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; + reqs.push((req, rsp_addr)); + } + let reqs = filter_valid_requests(reqs); + + let mut num = 0; + let mut ursps = rsps.write().unwrap(); + for (req, rsp_addr) in reqs { if let Some(resp) = obj.lock().unwrap().process_request(req) { if ursps.responses.len() <= num { ursps From 22f5985f1b22ee91fab49cc0df2ce998c8cea81c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 29 Mar 2018 13:18:08 -0600 Subject: [PATCH 3/3] Do request verification in parallel, and then process the verified requests --- src/accountant.rs | 2 +- src/accountant_skel.rs | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 622b96a1c1..8d4d90995f 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -116,7 +116,7 @@ impl Accountant { } /// Process a Transaction that has already been verified. - fn process_verified_transaction( + pub fn process_verified_transaction( self: &mut Self, tr: &Transaction, allow_deposits: bool, diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 4b8c611858..390fab91dc 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -19,6 +19,7 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use rayon::prelude::*; pub struct AccountantSkel { pub acc: Accountant, @@ -34,8 +35,19 @@ pub enum Request { GetId { is_last: bool }, } +impl Request { + /// Verify the request is valid. + pub fn verify(&self) -> bool { + match *self { + Request::Transaction(ref tr) => tr.verify(), + _ => true, + } + } +} + +/// Parallel verfication of a batch of requests. fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> { - reqs + reqs.into_par_iter().filter({ |x| x.0.verify() }).collect() } #[derive(Serialize, Deserialize, Debug)] @@ -66,10 +78,10 @@ impl AccountantSkel { } /// Process Request items sent by clients. - pub fn process_request(self: &mut Self, msg: Request) -> Option { + pub fn process_verified_request(self: &mut Self, msg: Request) -> Option { match msg { Request::Transaction(tr) => { - if let Err(err) = self.acc.process_transaction(tr) { + if let Err(err) = self.acc.process_verified_transaction(&tr, false) { eprintln!("Transaction error: {:?}", err); } None @@ -114,7 +126,7 @@ impl AccountantSkel { let mut num = 0; let mut ursps = rsps.write().unwrap(); for (req, rsp_addr) in reqs { - if let Some(resp) = obj.lock().unwrap().process_request(req) { + if let Some(resp) = obj.lock().unwrap().process_verified_request(req) { if ursps.responses.len() <= num { ursps .responses