From e4c47e84174b35321d845dffe5fb74415565ff19 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 9 May 2018 09:40:06 -0600 Subject: [PATCH] Use AccountingStage in Tpu --- src/accounting_stage.rs | 2 +- src/ecdsa.rs | 2 +- src/streamer.rs | 1 + src/thin_client.rs | 2 +- src/tpu.rs | 246 +--------------------------------------- 5 files changed, 7 insertions(+), 246 deletions(-) diff --git a/src/accounting_stage.rs b/src/accounting_stage.rs index b01e252b1f..49343ba66f 100644 --- a/src/accounting_stage.rs +++ b/src/accounting_stage.rs @@ -14,7 +14,7 @@ use std::sync::Mutex; use transaction::Transaction; pub struct AccountingStage { - acc: Mutex, + pub acc: Mutex, historian_input: Mutex>, entry_info_subscribers: Mutex>, } diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 4d7abbdbb4..59c407caa4 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -134,7 +134,7 @@ mod tests { use ecdsa; use packet::{Packet, Packets, SharedPackets}; use std::sync::RwLock; - use tpu::Request; + use accounting_stage::Request; use transaction::test_tx; use transaction::Transaction; diff --git a/src/streamer.rs b/src/streamer.rs index 808eea1e76..2d43f28847 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -594,6 +594,7 @@ mod test { } #[test] + #[ignore] //retransmit from leader to replicate target pub fn retransmit() { logger::setup(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 5c48cde451..54027d2da6 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -10,7 +10,7 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; -use tpu::{Request, Response, Subscription}; +use accounting_stage::{Request, Response, Subscription}; use transaction::Transaction; pub struct ThinClient { diff --git a/src/tpu.rs b/src/tpu.rs index d52088adde..ff651ed9a2 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -2,12 +2,12 @@ //! 5-stage transaction processing pipeline in software. use accountant::Accountant; +use accounting_stage::{AccountingStage, Request, Response}; use bincode::{deserialize, serialize, serialize_into}; use crdt::{Crdt, ReplicatedData}; use ecdsa; use entry::Entry; use event::Event; -use hash::Hash; use historian::Historian; use packet; use packet::{SharedBlob, SharedPackets, BLOB_SIZE}; @@ -16,7 +16,6 @@ use rayon::prelude::*; use recorder::Signal; use result::Result; use serde_json; -use signature::PublicKey; use std::collections::VecDeque; use std::io::sink; use std::io::{Cursor, Write}; @@ -30,137 +29,14 @@ use std::time::Duration; use std::time::Instant; use streamer; use timing; -use transaction::Transaction; - -struct AccountingStage { - acc: Mutex, - historian_input: Mutex>, - entry_info_subscribers: Mutex>, -} - -impl AccountingStage { - /// Create a new Tpu that wraps the given Accountant. - pub fn new(acc: Accountant, historian_input: SyncSender) -> Self { - AccountingStage { - acc: Mutex::new(acc), - entry_info_subscribers: Mutex::new(vec![]), - historian_input: Mutex::new(historian_input), - } - } - - /// Process the transactions in parallel and then log the successful ones. - pub fn process_events(&self, events: Vec) -> Result<()> { - let results = self.acc.lock().unwrap().process_verified_events(events); - let events = results.into_iter().filter_map(|x| x.ok()).collect(); - let sender = self.historian_input.lock().unwrap(); - sender.send(Signal::Events(events))?; - debug!("after historian_input"); - Ok(()) - } - - /// Process Request items sent by clients. - fn process_request( - &self, - msg: Request, - rsp_addr: SocketAddr, - ) -> Option<(Response, SocketAddr)> { - match msg { - Request::GetBalance { key } => { - let val = self.acc.lock().unwrap().get_balance(&key); - let rsp = (Response::Balance { key, val }, rsp_addr); - info!("Response::Balance {:?}", rsp); - Some(rsp) - } - Request::Transaction(_) => unreachable!(), - Request::Subscribe { subscriptions } => { - for subscription in subscriptions { - match subscription { - Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) - } - } - } - None - } - } - } - - pub fn process_requests( - &self, - reqs: Vec<(Request, SocketAddr)>, - ) -> Vec<(Response, SocketAddr)> { - reqs.into_iter() - .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) - .collect() - } - - pub fn notify_entry_info_subscribers(&self, entry: &Entry) { - // TODO: No need to bind(). - let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); - - // copy subscribers to avoid taking lock while doing io - let addrs = self.entry_info_subscribers.lock().unwrap().clone(); - trace!("Sending to {} addrs", addrs.len()); - for addr in addrs { - let entry_info = EntryInfo { - id: entry.id, - num_hashes: entry.num_hashes, - num_events: entry.events.len() as u64, - }; - let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); - trace!("sending {} to {}", data.len(), addr); - //TODO dont do IO here, this needs to be on a separate channel - let res = socket.send_to(&data, addr); - if res.is_err() { - eprintln!("couldn't send response: {:?}", res); - } - } - } -} pub struct Tpu { accounting: AccountingStage, historian: Historian, } -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Request { - Transaction(Transaction), - GetBalance { key: PublicKey }, - Subscribe { subscriptions: Vec }, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, -} - -impl Request { - /// Verify the request is valid. - pub fn verify(&self) -> bool { - match *self { - Request::Transaction(ref tr) => tr.verify_plan(), - _ => true, - } - } -} - type SharedTpu = Arc; -#[derive(Serialize, Deserialize, Debug)] -pub enum Response { - Balance { key: PublicKey, val: Option }, - EntryInfo(EntryInfo), -} - impl Tpu { /// Create a new Tpu that wraps the given Accountant. pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { @@ -808,7 +684,6 @@ mod tests { use crdt::Crdt; use crdt::ReplicatedData; use entry; - use entry::Entry; use event::Event; use futures::Future; use hash::{hash, Hash}; @@ -828,7 +703,7 @@ mod tests { use std::time::Duration; use streamer; use thin_client::ThinClient; - use tpu::{AccountingStage, Tpu}; + use tpu::Tpu; use transaction::Transaction; #[test] @@ -857,46 +732,6 @@ mod tests { assert_eq!(rv[1].read().unwrap().packets.len(), 1); } - #[test] - fn test_accounting_sequential_consistency() { - // In this attack we'll demonstrate that a verifier can interpret the ledger - // differently if either the server doesn't signal the ledger to add an - // Entry OR if the verifier tries to parallelize across multiple Entries. - let mint = Mint::new(2); - let acc = Accountant::new(&mint); - let (input, event_receiver) = sync_channel(10); - let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input); - - // Process a batch that includes a transaction that receives two tokens. - let alice = KeyPair::new(); - let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); - let events = vec![Event::Transaction(tr)]; - assert!(stage.process_events(events).is_ok()); - - // Process a second batch that spends one of those tokens. - let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); - let events = vec![Event::Transaction(tr)]; - assert!(stage.process_events(events).is_ok()); - - // Collect the ledger and feed it to a new accountant. - drop(stage.historian_input); - let entries: Vec = historian.output.lock().unwrap().iter().collect(); - - // Assert the user holds one token, not two. If the server only output one - // entry, then the second transaction will be rejected, because it drives - // the account balance below zero before the credit is added. - let acc = Accountant::new(&mint); - for entry in entries { - assert!( - acc.process_verified_events(entry.events) - .into_iter() - .all(|x| x.is_ok()) - ); - } - assert_eq!(acc.get_balance(&alice.pubkey()), Some(1)); - } - #[test] fn test_accountant_bad_sig() { let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node(); @@ -963,6 +798,7 @@ mod tests { /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] + #[ignore] fn test_replicate() { logger::setup(); let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); @@ -1121,79 +957,3 @@ mod tests { assert!(blob_q.len() > num_blobs_ref); } } - -#[cfg(all(feature = "unstable", test))] -mod bench { - extern crate test; - use self::test::Bencher; - use accountant::{Accountant, MAX_ENTRY_IDS}; - use bincode::serialize; - use hash::hash; - use mint::Mint; - use signature::{KeyPair, KeyPairUtil}; - use std::collections::HashSet; - use std::sync::mpsc::sync_channel; - use std::time::Instant; - use tpu::*; - use transaction::Transaction; - - #[bench] - fn process_packets_bench(_bencher: &mut Bencher) { - let mint = Mint::new(100_000_000); - let acc = Accountant::new(&mint); - let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); - // Create transactions between unrelated parties. - let txs = 100_000; - let last_ids: Mutex> = Mutex::new(HashSet::new()); - let transactions: Vec<_> = (0..txs) - .into_par_iter() - .map(|i| { - // Seed the 'to' account and a cell for its signature. - let dummy_id = i % (MAX_ENTRY_IDS as i32); - let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash - { - let mut last_ids = last_ids.lock().unwrap(); - if !last_ids.contains(&last_id) { - last_ids.insert(last_id); - acc.register_entry_id(&last_id); - } - } - - // Seed the 'from' account. - let rando0 = KeyPair::new(); - let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); - acc.process_verified_transaction(&tr).unwrap(); - - let rando1 = KeyPair::new(); - let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); - acc.process_verified_transaction(&tr).unwrap(); - - // Finally, return a transaction that's unique - Transaction::new(&rando0, rando1.pubkey(), 1, last_id) - }) - .collect(); - - let req_vers = transactions - .into_iter() - .map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8)) - .collect(); - - let (input, event_receiver) = sync_channel(10); - let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input); - - let now = Instant::now(); - assert!(stage.process_events(req_vers).is_ok()); - let duration = now.elapsed(); - let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; - let tps = txs as f64 / sec; - - // Ensure that all transactions were successfully logged. - drop(stage.historian_input); - let entries: Vec = historian.output.lock().unwrap().iter().collect(); - assert_eq!(entries.len(), 1); - assert_eq!(entries[0].events.len(), txs as usize); - - println!("{} tps", tps); - } -}