From f7083e09234872ca85b3507353ef30bf8afd9688 Mon Sep 17 00:00:00 2001
From: Greg Fitzgerald
Date: Tue, 15 May 2018 12:15:29 -0600
Subject: [PATCH] Remove transaction processing from RPU and request processing from TVU

---
 src/ecdsa.rs             |  15 +++-
 src/request.rs           |  21 +----
 src/request_processor.rs |  69 ++++---------------
 src/request_stage.rs     | 145 +--------------------------------------
 src/rpu.rs               |   9 +--
 src/sig_verify_stage.rs  |  12 ++--
 src/thin_client.rs       |   5 +-
 src/tvu.rs               |  23 ++-----
 8 files changed, 46 insertions(+), 253 deletions(-)

diff --git a/src/ecdsa.rs b/src/ecdsa.rs
index b2477b6712..12ecb306ae 100644
--- a/src/ecdsa.rs
+++ b/src/ecdsa.rs
@@ -136,14 +136,23 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
 mod tests {
     use bincode::serialize;
     use ecdsa;
+    use event::Event;
     use packet::{Packet, Packets, SharedPackets};
-    use request::Request;
     use std::sync::RwLock;
     use transaction::Transaction;
-    use transaction::test_tx;
+    use transaction::{memfind, test_tx};
+
+    #[test]
+    fn test_layout() {
+        let tr = test_tx();
+        let tx = serialize(&tr).unwrap();
+        let packet = serialize(&Event::Transaction(tr)).unwrap();
+        assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
+        assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
+    }
 
     fn make_packet_from_transaction(tr: Transaction) -> Packet {
-        let tx = serialize(&Request::Transaction(tr)).unwrap();
+        let tx = serialize(&Event::Transaction(tr)).unwrap();
         let mut packet = Packet::default();
         packet.meta.size = tx.len();
         packet.data[..packet.meta.size].copy_from_slice(&tx);
diff --git a/src/request.rs b/src/request.rs
index d1c692ae42..1be8ee01dc 100644
--- a/src/request.rs
+++ b/src/request.rs
@@ -5,12 +5,10 @@ use hash::Hash;
 use packet;
 use packet::SharedPackets;
 use signature::PublicKey;
-use transaction::Transaction;
 
 #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub enum Request {
-    Transaction(Transaction),
     GetBalance { key: PublicKey },
     GetLastId,
     GetTransactionCount,
@@ -19,10 +17,7 @@ pub enum Request {
 impl Request {
     /// Verify the request is valid.
     pub fn verify(&self) -> bool {
-        match *self {
-            Request::Transaction(ref tr) => tr.verify_plan(),
-            _ => true,
-        }
+        true
     }
 }
 
@@ -54,24 +49,12 @@ pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets>
 #[cfg(test)]
 mod tests {
-    use bincode::serialize;
-    use ecdsa;
     use packet::{PacketRecycler, NUM_PACKETS};
     use request::{to_request_packets, Request};
-    use transaction::{memfind, test_tx};
-
-    #[test]
-    fn test_layout() {
-        let tr = test_tx();
-        let tx = serialize(&tr).unwrap();
-        let packet = serialize(&Request::Transaction(tr)).unwrap();
-        assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
-        assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
-    }
 
     #[test]
     fn test_to_packets() {
-        let tr = Request::Transaction(test_tx());
+        let tr = Request::GetTransactionCount;
         let re = PacketRecycler::default();
         let rv = to_request_packets(&re, vec![tr.clone(); 1]);
         assert_eq!(rv.len(), 1);
diff --git a/src/request_processor.rs b/src/request_processor.rs
index f070bbfe3d..63f6c6ddb9 100644
--- a/src/request_processor.rs
+++ b/src/request_processor.rs
@@ -6,14 +6,12 @@ use event::Event;
 use packet;
 use packet::SharedPackets;
 use rayon::prelude::*;
-use recorder::Signal;
 use request::{Request, Response};
 use result::Result;
 use std::collections::VecDeque;
 use std::net::SocketAddr;
 use std::sync::Arc;
-use std::sync::mpsc::{Receiver, Sender};
-use std::time::Duration;
+use std::sync::mpsc::Receiver;
 use std::time::Instant;
 use streamer;
 use timing;
@@ -53,7 +51,6 @@ impl RequestProcessor {
                 info!("Response::TransactionCount {:?}", rsp);
                 Some(rsp)
             }
-            Request::Transaction(_) => unreachable!(),
         }
     }
@@ -91,24 +88,6 @@ impl RequestProcessor {
     }
 
     /// Split Request list into verified transactions and the rest
-    fn partition_requests(
-        req_vers: Vec<(Request, SocketAddr, u8)>,
-    ) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
-        let mut events = vec![];
-        let mut reqs = vec![];
-        for (msg, rsp_addr, verify) in req_vers {
-            match msg {
-                Request::Transaction(tr) => {
-                    if verify != 0 {
-                        events.push(Event::Transaction(tr));
-                    }
-                }
-                _ => reqs.push((msg, rsp_addr)),
-            }
-        }
-        (events, reqs)
-    }
-
     fn serialize_response(
         resp: Response,
         rsp_addr: SocketAddr,
@@ -139,49 +118,29 @@ impl RequestProcessor {
     pub fn process_request_packets(
         &self,
-        verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
-        signal_sender: &Sender<Signal>,
+        packet_receiver: &Receiver<SharedPackets>,
         blob_sender: &streamer::BlobSender,
         packet_recycler: &packet::PacketRecycler,
         blob_recycler: &packet::BlobRecycler,
     ) -> Result<()> {
-        let timer = Duration::new(1, 0);
-        let recv_start = Instant::now();
-        let mms = verified_receiver.recv_timeout(timer)?;
-        let mut reqs_len = 0;
-        let mms_len = mms.len();
+        let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
+
         info!(
-            "@{:?} process start stalled for: {:?}ms batches: {}",
+            "@{:?} request_stage: processing: {}",
             timing::timestamp(),
-            timing::duration_as_ms(&recv_start.elapsed()),
-            mms.len(),
+            batch_len
         );
+
+        let mut reqs_len = 0;
         let proc_start = Instant::now();
-        for (msgs, vers) in mms {
-            let reqs = Self::deserialize_requests(&msgs.read().unwrap());
-            reqs_len += reqs.len();
-            let req_vers = reqs.into_iter()
-                .zip(vers)
-                .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
-                .filter(|x| {
-                    let v = x.0.verify();
-                    v
-                })
+        for msgs in batch {
+            let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
+                .into_iter()
+                .filter_map(|x| x)
                 .collect();
+            reqs_len += reqs.len();
 
-            debug!("partitioning");
-            let (events, reqs) =
-                Self::partition_requests(req_vers);
-            debug!("events: {} reqs: {}", events.len(), reqs.len());
-
-            debug!("process_events");
-            let results = self.bank.process_verified_events(events);
-            let events = results.into_iter().filter_map(|x| x.ok()).collect();
-            signal_sender.send(Signal::Events(events))?;
-            debug!("done process_events");
-
-            debug!("process_requests");
             let rsps = self.process_requests(reqs);
-            debug!("done process_requests");
 
             let blobs = Self::serialize_responses(rsps, blob_recycler)?;
             if !blobs.is_empty() {
@@ -196,7 +155,7 @@ impl RequestProcessor {
         info!(
             "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
             timing::timestamp(),
-            mms_len,
+            batch_len,
             total_time_ms,
             reqs_len,
             (reqs_len as f32) / (total_time_s)
diff --git a/src/request_stage.rs b/src/request_stage.rs
index 8efc4f8237..cd98a7d439 100644
--- a/src/request_stage.rs
+++ b/src/request_stage.rs
@@ -2,7 +2,6 @@ use packet;
 use packet::SharedPackets;
-use recorder::Signal;
 use request_processor::RequestProcessor;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -12,7 +11,6 @@ use streamer;
 
 pub struct RequestStage {
     pub thread_hdl: JoinHandle<()>,
-    pub signal_receiver: Receiver<Signal>,
     pub blob_receiver: streamer::BlobReceiver,
     pub request_processor: Arc<RequestProcessor>,
 }
@@ -21,18 +19,16 @@ impl RequestStage {
     pub fn new(
         request_processor: RequestProcessor,
         exit: Arc<AtomicBool>,
-        verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
+        packet_receiver: Receiver<SharedPackets>,
         packet_recycler: packet::PacketRecycler,
         blob_recycler: packet::BlobRecycler,
     ) -> Self {
         let request_processor = Arc::new(request_processor);
         let request_processor_ = request_processor.clone();
-        let (signal_sender, signal_receiver) = channel();
         let (blob_sender, blob_receiver) = channel();
         let thread_hdl = spawn(move || loop {
             let e = request_processor_.process_request_packets(
-                &verified_receiver,
-                &signal_sender,
+                &packet_receiver,
                 &blob_sender,
                 &packet_recycler,
                 &blob_recycler,
@@ -45,145 +41,8 @@ impl RequestStage {
         });
         RequestStage {
             thread_hdl,
-            signal_receiver,
             blob_receiver,
             request_processor,
         }
     }
 }
-
-// TODO: When banking is pulled out of RequestStage, add this test back in.
-
-//use bank::Bank;
-//use entry::Entry;
-//use event::Event;
-//use hash::Hash;
-//use record_stage::RecordStage;
-//use recorder::Signal;
-//use result::Result;
-//use std::sync::mpsc::{channel, Sender};
-//use std::sync::{Arc, Mutex};
-//use std::time::Duration;
-//
-//#[cfg(test)]
-//mod tests {
-//    use bank::Bank;
-//    use event::Event;
-//    use event_processor::EventProcessor;
-//    use mint::Mint;
-//    use signature::{KeyPair, KeyPairUtil};
-//    use transaction::Transaction;
-//
-//    #[test]
-//    // TODO: Move this test banking_stage. Calling process_events() directly
-//    // defeats the purpose of this test.
-//    fn test_banking_sequential_consistency() {
-//        // In this attack we'll demonstrate that a verifier can interpret the ledger
-//        // differently if either the server doesn't signal the ledger to add an
-//        // Entry OR if the verifier tries to parallelize across multiple Entries.
-//        let mint = Mint::new(2);
-//        let bank = Bank::new(&mint);
-//        let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
-//
-//        // Process a batch that includes a transaction that receives two tokens.
-//        let alice = KeyPair::new();
-//        let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
-//        let events = vec![Event::Transaction(tr)];
-//        let entry0 = event_processor.process_events(events).unwrap();
-//
-//        // Process a second batch that spends one of those tokens.
-//        let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
-//        let events = vec![Event::Transaction(tr)];
-//        let entry1 = event_processor.process_events(events).unwrap();
-//
-//        // Collect the ledger and feed it to a new bank.
-//        let entries = vec![entry0, entry1];
-//
-//        // Assert the user holds one token, not two. If the server only output one
-//        // entry, then the second transaction will be rejected, because it drives
-//        // the account balance below zero before the credit is added.
-//        let bank = Bank::new(&mint);
-//        for entry in entries {
-//            assert!(
-//                bank
-//                    .process_verified_events(entry.events)
-//                    .into_iter()
-//                    .all(|x| x.is_ok())
-//            );
-//        }
-//        assert_eq!(bank.get_balance(&alice.pubkey()), Some(1));
-//    }
-//}
-//
-//#[cfg(all(feature = "unstable", test))]
-//mod bench {
-//    extern crate test;
-//    use self::test::Bencher;
-//    use bank::{Bank, MAX_ENTRY_IDS};
-//    use bincode::serialize;
-//    use event_processor::*;
-//    use hash::hash;
-//    use mint::Mint;
-//    use rayon::prelude::*;
-//    use signature::{KeyPair, KeyPairUtil};
-//    use std::collections::HashSet;
-//    use std::time::Instant;
-//    use transaction::Transaction;
-//
-//    #[bench]
-//    fn process_events_bench(_bencher: &mut Bencher) {
-//        let mint = Mint::new(100_000_000);
-//        let bank = Bank::new(&mint);
-//        // Create transactions between unrelated parties.
-//        let txs = 100_000;
-//        let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
-//        let transactions: Vec<_> = (0..txs)
-//            .into_par_iter()
-//            .map(|i| {
-//                // Seed the 'to' account and a cell for its signature.
-//                let dummy_id = i % (MAX_ENTRY_IDS as i32);
-//                let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
-//                {
-//                    let mut last_ids = last_ids.lock().unwrap();
-//                    if !last_ids.contains(&last_id) {
-//                        last_ids.insert(last_id);
-//                        bank.register_entry_id(&last_id);
-//                    }
-//                }
-//
-//                // Seed the 'from' account.
-//                let rando0 = KeyPair::new();
-//                let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
-//                bank.process_verified_transaction(&tr).unwrap();
-//
-//                let rando1 = KeyPair::new();
-//                let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
-//                bank.process_verified_transaction(&tr).unwrap();
-//
-//                // Finally, return a transaction that's unique
-//                Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
-//            })
-//            .collect();
-//
-//        let events: Vec<_> = transactions
-//            .into_iter()
-//            .map(|tr| Event::Transaction(tr))
-//            .collect();
-//
-//        let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
-//
-//        let now = Instant::now();
-//        assert!(event_processor.process_events(events).is_ok());
-//        let duration = now.elapsed();
-//        let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
-//        let tps = txs as f64 / sec;
-//
-//        // Ensure that all transactions were successfully logged.
-//        drop(event_processor.historian_input);
-//        let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
-//        assert_eq!(entries.len(), 1);
-//        assert_eq!(entries[0].events.len(), txs as usize);
-//
-//        println!("{} tps", tps);
-//    }
-//}
diff --git a/src/rpu.rs b/src/rpu.rs
index ba4c4efc6f..eef1515d21 100644
--- a/src/rpu.rs
+++ b/src/rpu.rs
@@ -5,7 +5,6 @@ use bank::Bank;
 use packet;
 use request_processor::RequestProcessor;
 use request_stage::RequestStage;
-use sig_verify_stage::SigVerifyStage;
 use std::net::UdpSocket;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
@@ -33,14 +32,12 @@ impl Rpu {
             packet_sender,
         );
 
-        let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
-
         let blob_recycler = packet::BlobRecycler::default();
         let request_processor = RequestProcessor::new(bank.clone());
         let request_stage = RequestStage::new(
             request_processor,
             exit.clone(),
-            sig_verify_stage.verified_receiver,
+            packet_receiver,
             packet_recycler.clone(),
             blob_recycler.clone(),
         );
@@ -52,9 +49,7 @@ impl Rpu {
             request_stage.blob_receiver,
         );
 
-        let mut thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl];
-        thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter());
-
+        let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl];
         Rpu { thread_hdls }
     }
 }
diff --git a/src/sig_verify_stage.rs b/src/sig_verify_stage.rs
index 6528537c6a..ccb201e3ae 100644
--- a/src/sig_verify_stage.rs
+++ b/src/sig_verify_stage.rs
@@ -18,9 +18,9 @@ pub struct SigVerifyStage {
 }
 
 impl SigVerifyStage {
-    pub fn new(exit: Arc<AtomicBool>, packets_receiver: Receiver<SharedPackets>) -> Self {
+    pub fn new(exit: Arc<AtomicBool>, packet_receiver: Receiver<SharedPackets>) -> Self {
         let (verified_sender, verified_receiver) = channel();
-        let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender);
+        let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender);
         SigVerifyStage {
             thread_hdls,
             verified_receiver,
@@ -71,11 +71,11 @@ impl SigVerifyStage {
     fn verifier_service(
         exit: Arc<AtomicBool>,
-        packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
+        packet_receiver: Arc<Mutex<streamer::PacketReceiver>>,
         verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
     ) -> JoinHandle<()> {
         spawn(move || loop {
-            let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
+            let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone());
             if e.is_err() && exit.load(Ordering::Relaxed) {
                 break;
             }
@@ -84,11 +84,11 @@ impl SigVerifyStage {
     fn verifier_services(
         exit: Arc<AtomicBool>,
-        packets_receiver: streamer::PacketReceiver,
+        packet_receiver: streamer::PacketReceiver,
         verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
     ) -> Vec<JoinHandle<()>> {
         let sender = Arc::new(Mutex::new(verified_sender));
-        let receiver = Arc::new(Mutex::new(packets_receiver));
+        let receiver = Arc::new(Mutex::new(packet_receiver));
         (0..4)
             .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
             .collect()
diff --git a/src/thin_client.rs b/src/thin_client.rs
index 6214ef520a..1979d42ff2 100644
--- a/src/thin_client.rs
+++ b/src/thin_client.rs
@@ -4,6 +4,7 @@ //! unstable and may change in future releases.
 
 use bincode::{deserialize, serialize};
+use event::Event;
 use futures::future::{ok, FutureResult};
 use hash::Hash;
 use request::{Request, Response};
@@ -67,8 +68,8 @@ impl ThinClient {
     /// Send a signed Transaction to the server for processing. This method
     /// does not wait for a response.
     pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> {
-        let req = Request::Transaction(tr);
-        let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed");
+        let event = Event::Transaction(tr);
+        let data = serialize(&event).expect("serialize Transaction in pub fn transfer_signed");
         self.events_socket.send_to(&data, &self.addr)
     }
diff --git a/src/tvu.rs b/src/tvu.rs
index fece1980cf..3a87a6dab4 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -2,13 +2,12 @@ //! 5-stage transaction validation pipeline in software.
 
 use bank::Bank;
+use banking_stage::BankingStage;
 use crdt::{Crdt, ReplicatedData};
 use hash::Hash;
 use ledger;
 use packet;
 use record_stage::RecordStage;
-use request_processor::RequestProcessor;
-use request_stage::RequestStage;
 use result::Result;
 use sig_verify_stage::SigVerifyStage;
 use std::net::UdpSocket;
@@ -146,10 +145,8 @@ impl Tvu {
         // make sure we are on the same interface
         let mut local = requests_socket.local_addr()?;
         local.set_port(0);
-        let respond_socket = UdpSocket::bind(local.clone())?;
 
         let packet_recycler = packet::PacketRecycler::default();
-        let blob_recycler = packet::BlobRecycler::default();
         let (packet_sender, packet_receiver) = channel();
         let t_packet_receiver = streamer::receiver(
             requests_socket,
@@ -160,17 +157,15 @@ impl Tvu {
 
         let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
 
-        let request_processor = RequestProcessor::new(obj.bank.clone());
-        let request_stage = RequestStage::new(
-            request_processor,
+        let banking_stage = BankingStage::new(
+            obj.bank.clone(),
             exit.clone(),
             sig_verify_stage.verified_receiver,
             packet_recycler.clone(),
-            blob_recycler.clone(),
         );
 
         let record_stage = RecordStage::new(
-            request_stage.signal_receiver,
+            banking_stage.signal_receiver,
             &obj.start_hash,
             obj.tick_duration,
         );
@@ -178,13 +173,6 @@ impl Tvu {
 
         let write_stage = WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);
 
-        let t_responder = streamer::responder(
-            respond_socket,
-            exit.clone(),
-            blob_recycler.clone(),
-            request_stage.blob_receiver,
-        );
-
         let mut threads = vec![
             //replicate threads
             t_blob_receiver,
@@ -195,8 +183,7 @@ impl Tvu {
             t_listen,
             //serve threads
             t_packet_receiver,
-            t_responder,
-            request_stage.thread_hdl,
+            banking_stage.thread_hdl,
             write_stage.thread_hdl,
         ];
         threads.extend(sig_verify_stage.thread_hdls.into_iter());
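
Note for reviewers: the reworked process_request_packets drains its input through a streamer::recv_batch helper that this patch does not show, and tvu.rs now constructs a BankingStage that is expected to expose signal_receiver and thread_hdl just as RequestStage did. Below is a minimal sketch of the shape the new call site appears to assume for recv_batch; the one-second timeout, the drain loop, and the exact return type are assumptions inferred from the old code path, not the crate's confirmed API.

    // Assumed shape of streamer::recv_batch (not part of this patch): block briefly
    // for the first bundle of packets, then drain whatever else is already queued,
    // returning the batch plus a batch count for the request_stage log lines.
    use packet::SharedPackets;
    use result::Result;
    use std::sync::mpsc::Receiver;
    use std::time::Duration;

    pub fn recv_batch(recvr: &Receiver<SharedPackets>) -> Result<(Vec<SharedPackets>, usize)> {
        // Hypothetical 1-second wait, mirroring the timeout the old verified_receiver path used.
        let timer = Duration::new(1, 0);
        let first = recvr.recv_timeout(timer)?;
        let mut batch = vec![first];
        // Opportunistically pull anything already pending so one pass handles a burst.
        while let Ok(more) = recvr.try_recv() {
            batch.push(more);
        }
        let len = batch.len();
        Ok((batch, len))
    }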