Merge pull request #183 from sakridge/verify_thread_rework

Rework sig processing threads and add perf for process/verify
Greg Fitzgerald 2018-05-08 13:15:41 -06:00 committed by GitHub
commit 3bb06d8364
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
5 changed files with 140 additions and 51 deletions

Cargo.toml

@@ -68,3 +68,4 @@ libc = "^0.2.1"
 getopts = "^0.2"
 isatty = "0.1"
 futures = "0.1"
+rand = "0.4.2"

src/accountant_skel.rs

@@ -17,7 +17,6 @@ use recorder::Signal;
 use result::Result;
 use serde_json;
 use signature::PublicKey;
-use std::cmp::max;
 use std::collections::VecDeque;
 use std::io::sink;
 use std::io::{Cursor, Write};
@@ -30,6 +29,9 @@ use std::thread::{spawn, JoinHandle};
 use std::time::Duration;
 use streamer;
 use transaction::Transaction;
+use timing;
+use std::time::Instant;
+use rand::{thread_rng, Rng};

 pub struct AccountantSkel {
     acc: Mutex<Accountant>,
@@ -256,41 +258,64 @@ impl AccountantSkel {
         }
     }

-    fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
+    fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
         let timer = Duration::new(1, 0);
         let msgs = recvr.recv_timeout(timer)?;
         debug!("got msgs");
+        let mut len = msgs.read().unwrap().packets.len();
         let mut batch = vec![msgs];
         while let Ok(more) = recvr.try_recv() {
-            debug!("got more msgs");
+            trace!("got more msgs");
+            len += more.read().unwrap().packets.len();
             batch.push(more);
+            if len > 100_000 {
+                break;
+            }
         }
-        info!("batch len {}", batch.len());
-        Ok(batch)
+        debug!("batch len {}", batch.len());
+        Ok((batch, len))
     }

-    fn verify_batch(batch: Vec<SharedPackets>) -> Vec<Vec<(SharedPackets, Vec<u8>)>> {
-        let chunk_size = max(1, (batch.len() + 3) / 4);
-        let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect();
-        batches
-            .into_par_iter()
-            .map(|batch| {
-                let r = ecdsa::ed25519_verify(&batch);
-                batch.into_iter().zip(r).collect()
-            })
-            .collect()
+    fn verify_batch(
+        batch: Vec<SharedPackets>,
+        sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
+    ) -> Result<()> {
+        let r = ecdsa::ed25519_verify(&batch);
+        let res = batch.into_iter().zip(r).collect();
+        sendr.lock().unwrap().send(res)?;
+        // TODO: fix error handling here?
+        Ok(())
     }

     fn verifier(
-        recvr: &streamer::PacketReceiver,
-        sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
+        recvr: &Arc<Mutex<streamer::PacketReceiver>>,
+        sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
     ) -> Result<()> {
-        let batch = Self::recv_batch(recvr)?;
-        let verified_batches = Self::verify_batch(batch);
-        debug!("verified batches: {}", verified_batches.len());
-        for xs in verified_batches {
-            sendr.send(xs)?;
-        }
+        let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?;
+        let now = Instant::now();
+        let batch_len = batch.len();
+        let rand_id = thread_rng().gen_range(0, 100);
+        info!(
+            "@{:?} verifier: verifying: {} id: {}",
+            timing::timestamp(),
+            batch.len(),
+            rand_id
+        );
+        Self::verify_batch(batch, sendr).unwrap();
+        let total_time_ms = timing::duration_as_ms(&now.elapsed());
+        let total_time_s = timing::duration_as_s(&now.elapsed());
+        info!(
+            "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
+            timing::timestamp(),
+            batch_len,
+            total_time_ms,
+            rand_id,
+            len,
+            (len as f32 / total_time_s)
+        );
         Ok(())
     }
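Taken together, the hunk above changes the verify path in three ways: `recv_batch` now also returns a packet count and stops draining once it has gathered more than 100_000 packets, `verify_batch` pushes its results straight into a shared `Sender` instead of returning them, and `verifier` logs batch size plus a verified-packets-per-second rate. A minimal, self-contained sketch of that same pull/verify/forward pass, with placeholder `Packets` and `verify` stand-ins rather than the real `SharedPackets`/`ecdsa::ed25519_verify`:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Instant;

// Placeholder type: the real code moves Arc<RwLock<Packets>> batches and
// gets one u8 verification flag back per packet.
type Packets = Vec<u8>;

fn verify(batch: &[Packets]) -> Vec<u8> {
    batch.iter().map(|_| 1).collect() // pretend every packet verifies
}

// One pass of a verifier worker: block for a first batch, drain whatever
// else is already queued, verify, then forward (batch, results) downstream.
fn verifier_pass(
    recvr: &Arc<Mutex<Receiver<Packets>>>,
    sendr: &Arc<Mutex<Sender<(Vec<Packets>, Vec<u8>)>>>,
) {
    let first = match recvr.lock().unwrap().recv() {
        Ok(p) => p,
        Err(_) => return, // channel closed
    };
    let mut batch = vec![first];
    while let Ok(more) = recvr.lock().unwrap().try_recv() {
        batch.push(more);
    }
    let now = Instant::now();
    let results = verify(&batch);
    let batches = batch.len();
    let _ = sendr.lock().unwrap().send((batch, results));
    println!("verified {} batches in {:?}", batches, now.elapsed());
}

fn main() {
    let (packet_tx, packet_rx) = channel();
    let (verified_tx, _verified_rx) = channel();
    let recvr = Arc::new(Mutex::new(packet_rx));
    let sendr = Arc::new(Mutex::new(verified_tx));
    packet_tx.send(vec![1, 2, 3]).unwrap();
    verifier_pass(&recvr, &sendr);
}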
@@ -335,8 +360,6 @@ impl AccountantSkel {
             }
         }
-        debug!("processing verified");
         // Let validators know they should not attempt to process additional
         // transactions in parallel.
         self.historian_input.lock().unwrap().send(Signal::Tick)?;
@@ -387,16 +410,25 @@ impl AccountantSkel {
         blob_recycler: &packet::BlobRecycler,
     ) -> Result<()> {
         let timer = Duration::new(1, 0);
+        let recv_start = Instant::now();
         let mms = verified_receiver.recv_timeout(timer)?;
-        debug!("got some messages: {}", mms.len());
+        let mut reqs_len = 0;
+        let mms_len = mms.len();
+        info!(
+            "@{:?} process start stalled for: {:?}ms batches: {}",
+            timing::timestamp(),
+            timing::duration_as_ms(&recv_start.elapsed()),
+            mms.len(),
+        );
+        let proc_start = Instant::now();
         for (msgs, vers) in mms {
             let reqs = Self::deserialize_packets(&msgs.read().unwrap());
+            reqs_len += reqs.len();
             let req_vers = reqs.into_iter()
                 .zip(vers)
                 .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
                 .filter(|x| {
                     let v = x.0.verify();
+                    trace!("v:{} x:{:?}", v, x);
                     v
                 })
                 .collect();
@@ -421,7 +453,16 @@ impl AccountantSkel {
             }
             packet_recycler.recycle(msgs);
         }
-        debug!("done responding");
+        let total_time_s = timing::duration_as_s(&proc_start.elapsed());
+        let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
+        info!(
+            "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
+            timing::timestamp(),
+            mms_len,
+            total_time_ms,
+            reqs_len,
+            (reqs_len as f32) / (total_time_s)
+        );
         Ok(())
     }

     /// Process verified blobs, already in order
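The two new `info!` lines above boil down to simple rate arithmetic: convert the elapsed `Duration` to milliseconds for logging and to fractional seconds for a requests-per-second figure (the real code uses the `timing::duration_as_ms`/`duration_as_s` helpers added by this commit). A tiny worked example with made-up numbers:

use std::time::Duration;

fn main() {
    // Suppose one call processed 12_000 requests in 250 ms.
    let reqs_len: u64 = 12_000;
    let elapsed = Duration::from_millis(250);

    let total_time_ms = elapsed.as_secs() * 1000 + u64::from(elapsed.subsec_nanos()) / 1_000_000;
    let total_time_s = elapsed.as_secs() as f32 + elapsed.subsec_nanos() as f32 / 1_000_000_000.0;

    let reqs_per_s = reqs_len as f32 / total_time_s;
    // Prints: time: 250ms reqs: 12000 reqs/s: 48000
    println!("time: {}ms reqs: {} reqs/s: {}", total_time_ms, reqs_len, reqs_per_s);
}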
@@ -486,13 +527,21 @@ impl AccountantSkel {
         );
         let (verified_sender, verified_receiver) = channel();
-        let exit_ = exit.clone();
-        let t_verifier = spawn(move || loop {
-            let e = Self::verifier(&packet_receiver, &verified_sender);
-            if e.is_err() && exit_.load(Ordering::Relaxed) {
-                break;
-            }
-        });
+        let mut verify_threads = Vec::new();
+        let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
+        let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
+        for _ in 0..4 {
+            let exit_ = exit.clone();
+            let recv = shared_packet_receiver.clone();
+            let sender = shared_verified_sender.clone();
+            let thread = spawn(move || loop {
+                let e = Self::verifier(&recv, &sender);
+                if e.is_err() && exit_.load(Ordering::Relaxed) {
+                    break;
+                }
+            });
+            verify_threads.push(thread);
+        }

         let (broadcast_sender, broadcast_receiver) = channel();
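Both the leader and validator pipelines now use this fan-out shape: `std::sync::mpsc::Receiver` is not `Sync`, so it is wrapped in `Arc<Mutex<_>>` and cloned into four identical worker threads, each of which leaves its loop once a channel error coincides with the `exit` flag. A self-contained sketch of the same shape with a trivial worker in place of `Self::verifier` (the names and the `u32` payload are illustrative, not from the diff):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;

fn spawn_workers(receiver: Receiver<u32>, exit: Arc<AtomicBool>) -> Vec<JoinHandle<()>> {
    // Receiver is not Sync, so share it behind Arc<Mutex<_>>.
    let shared_receiver = Arc::new(Mutex::new(receiver));
    let mut threads = Vec::new();
    for _ in 0..4 {
        let exit_ = exit.clone();
        let recv = shared_receiver.clone();
        threads.push(spawn(move || loop {
            // recv_timeout keeps idle workers checking the exit flag about once a second.
            let e = recv.lock().unwrap().recv_timeout(Duration::new(1, 0));
            if e.is_err() && exit_.load(Ordering::Relaxed) {
                break;
            }
            if let Ok(item) = e {
                // ...verify / process `item` here...
                let _ = item;
            }
        }));
    }
    threads
}

fn main() {
    let (sender, receiver) = channel();
    let exit = Arc::new(AtomicBool::new(false));
    let threads = spawn_workers(receiver, exit.clone());
    for i in 0..10u32 {
        sender.send(i).unwrap();
    }
    exit.store(true, Ordering::Relaxed);
    drop(sender); // close the channel so waiting workers see an error and exit
    for t in threads {
        t.join().unwrap();
    }
}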
@@ -528,16 +577,18 @@ impl AccountantSkel {
                 }
             }
         });
-        Ok(vec![
+        let mut threads = vec![
             t_receiver,
             t_responder,
             t_server,
-            t_verifier,
             t_sync,
             t_gossip,
             t_listen,
             t_broadcast,
-        ])
+        ];
+        threads.extend(verify_threads.into_iter());
+        Ok(threads)
     }

     /// This service receives messages from a leader in the network and processes the transactions
@@ -639,15 +690,21 @@ impl AccountantSkel {
         );
         let (verified_sender, verified_receiver) = channel();
-        let exit_ = exit.clone();
-        let t_verifier = spawn(move || loop {
-            let e = Self::verifier(&packet_receiver, &verified_sender);
-            if e.is_err() && exit_.load(Ordering::Relaxed) {
-                trace!("verifier exiting");
-                break;
-            }
-        });
+        let mut verify_threads = Vec::new();
+        let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
+        let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
+        for _ in 0..4 {
+            let exit_ = exit.clone();
+            let recv = shared_packet_receiver.clone();
+            let sender = shared_verified_sender.clone();
+            let thread = spawn(move || loop {
+                let e = Self::verifier(&recv, &sender);
+                if e.is_err() && exit_.load(Ordering::Relaxed) {
+                    break;
+                }
+            });
+            verify_threads.push(thread);
+        }

         let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());
         let skel = obj.clone();
@@ -667,7 +724,7 @@ impl AccountantSkel {
             }
         });

-        Ok(vec![
+        let mut threads = vec![
             //replicate threads
             t_blob_receiver,
             t_retransmit,
@@ -679,9 +736,10 @@ impl AccountantSkel {
             t_packet_receiver,
             t_responder,
             t_server,
-            t_verifier,
             t_sync,
-        ])
+        ];
+        threads.extend(verify_threads.into_iter());
+        Ok(threads)
     }
 }

src/accountant_stub.rs

@@ -161,6 +161,7 @@ mod tests {
     use std::sync::{Arc, RwLock};
     use std::thread::sleep;
     use std::time::Duration;
+    use std::time::Instant;

     // TODO: Figure out why this test sometimes hangs on TravisCI.
     #[test]
@@ -193,7 +194,18 @@ mod tests {
         let last_id = acc.get_last_id().wait().unwrap();
         let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
             .unwrap();
-        assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
+        let mut balance;
+        let now = Instant::now();
+        loop {
+            balance = acc.get_balance(&bob_pubkey);
+            if balance.is_ok() {
+                break;
+            }
+            if now.elapsed().as_secs() > 0 {
+                break;
+            }
+        }
+        assert_eq!(balance.unwrap(), 500);
         exit.store(true, Ordering::Relaxed);
         for t in threads {
             t.join().unwrap();
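Because transactions now pass through the asynchronous verify threads before reaching the accountant, the test above can no longer assert the balance immediately; it polls until `get_balance` succeeds or roughly a second has passed. A hedged sketch of the same retry shape as a generic helper (the helper name and the 10 ms back-off are illustrative, not part of the diff):

use std::thread::sleep;
use std::time::{Duration, Instant};

/// Poll `f` until it returns Ok or `timeout` has elapsed; return the last result.
fn poll_until_ok<T, E>(timeout: Duration, mut f: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let now = Instant::now();
    loop {
        let r = f();
        if r.is_ok() || now.elapsed() > timeout {
            return r;
        }
        sleep(Duration::from_millis(10)); // back off a little between attempts
    }
}

fn main() {
    // Stand-in for acc.get_balance(&bob_pubkey): fails for the first ~50 ms.
    let start = Instant::now();
    let balance = poll_until_ok(Duration::from_secs(1), || {
        if start.elapsed() > Duration::from_millis(50) {
            Ok(500i64)
        } else {
            Err("not yet processed")
        }
    });
    assert_eq!(balance.unwrap(), 500);
}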

src/lib.rs

@@ -20,6 +20,7 @@ pub mod result;
 pub mod signature;
 pub mod streamer;
 pub mod transaction;
+pub mod timing;
 extern crate bincode;
 extern crate byteorder;
 extern crate chrono;
@@ -41,3 +42,5 @@ extern crate futures;
 #[cfg(test)]
 #[macro_use]
 extern crate matches;
+extern crate rand;

src/timing.rs (new file, 15 lines)

@@ -0,0 +1,15 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+use std::time::Duration;
+
+pub fn duration_as_ms(d: &Duration) -> u64 {
+    return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);
+}
+
+pub fn duration_as_s(d: &Duration) -> f32 {
+    return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0);
+}
+
+pub fn timestamp() -> u64 {
+    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
+    return duration_as_ms(&now);
+}
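For reference, a short usage sketch of the three new helpers; the inner `mod timing` simply repeats the functions from the new file above so the example compiles on its own:

use std::time::{Duration, Instant};

mod timing {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    pub fn duration_as_ms(d: &Duration) -> u64 {
        (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000)
    }

    pub fn duration_as_s(d: &Duration) -> f32 {
        d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0)
    }

    pub fn timestamp() -> u64 {
        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        duration_as_ms(&now)
    }
}

fn main() {
    let start = Instant::now();
    std::thread::sleep(Duration::from_millis(20));
    let elapsed = start.elapsed();

    println!("now (ms since the unix epoch): {}", timing::timestamp());
    println!(
        "elapsed: {} ms ({} s)",
        timing::duration_as_ms(&elapsed),
        timing::duration_as_s(&elapsed)
    );
}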