Tell verifiers when not to parallelize accounting
Without this patch, many batches of transactions could be tossed into a single entry, but the parallelized accountant can only guarentee the transactions in the batch can be processed in parallel. This patch signals the historian to generate a new Entry after each batch. Validators must maintain sequential consistency across Entries.
This commit is contained in:
		| @@ -155,8 +155,12 @@ impl Accountant { | |||||||
|     pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> { |     pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> { | ||||||
|         // Run all debits first to filter out any transactions that can't be processed |         // Run all debits first to filter out any transactions that can't be processed | ||||||
|         // in parallel deterministically. |         // in parallel deterministically. | ||||||
|         trs.into_par_iter() |         let results: Vec<_> = trs.into_par_iter() | ||||||
|             .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) |             .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) | ||||||
|  |             .collect(); // Calling collect() here forces all debits to complete before moving on. | ||||||
|  |  | ||||||
|  |         results | ||||||
|  |             .into_par_iter() | ||||||
|             .map(|result| { |             .map(|result| { | ||||||
|                 result.map(|tr| { |                 result.map(|tr| { | ||||||
|                     self.process_verified_transaction_credits(&tr); |                     self.process_verified_transaction_credits(&tr); | ||||||
| @@ -166,6 +170,27 @@ impl Accountant { | |||||||
|             .collect() |             .collect() | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn partition_events(events: Vec<Event>) -> (Vec<Transaction>, Vec<Event>) { | ||||||
|  |         let mut trs = vec![]; | ||||||
|  |         let mut rest = vec![]; | ||||||
|  |         for event in events { | ||||||
|  |             match event { | ||||||
|  |                 Event::Transaction(tr) => trs.push(tr), | ||||||
|  |                 _ => rest.push(event), | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         (trs, rest) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn process_verified_events(&self, events: Vec<Event>) -> Result<()> { | ||||||
|  |         let (trs, rest) = Self::partition_events(events); | ||||||
|  |         self.process_verified_transactions(trs); | ||||||
|  |         for event in rest { | ||||||
|  |             self.process_verified_event(&event)?; | ||||||
|  |         } | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Process a Witness Signature that has already been verified. |     /// Process a Witness Signature that has already been verified. | ||||||
|     fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { |     fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { | ||||||
|         if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { |         if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { | ||||||
| @@ -410,6 +435,17 @@ mod tests { | |||||||
|         // Assert we're no longer able to use the oldest entry ID. |         // Assert we're no longer able to use the oldest entry ID. | ||||||
|         assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id())); |         assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id())); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_debits_before_credits() { | ||||||
|  |         let mint = Mint::new(2); | ||||||
|  |         let acc = Accountant::new(&mint); | ||||||
|  |         let alice = KeyPair::new(); | ||||||
|  |         let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); | ||||||
|  |         let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); | ||||||
|  |         let trs = vec![tr0, tr1]; | ||||||
|  |         assert!(acc.process_verified_transactions(trs)[1].is_err()); | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[cfg(all(feature = "unstable", test))] | #[cfg(all(feature = "unstable", test))] | ||||||
|   | |||||||
| @@ -179,6 +179,10 @@ impl<W: Write + Send + 'static> AccountantSkel<W> { | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         // Let validators know they should not attempt to process additional | ||||||
|  |         // transactions in parallel. | ||||||
|  |         self.historian.sender.send(Signal::Tick)?; | ||||||
|  |  | ||||||
|         // Process the remaining requests serially. |         // Process the remaining requests serially. | ||||||
|         let rsps = reqs.into_iter() |         let rsps = reqs.into_iter() | ||||||
|             .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) |             .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) | ||||||
| @@ -321,12 +325,14 @@ mod tests { | |||||||
|     use accountant::Accountant; |     use accountant::Accountant; | ||||||
|     use accountant_skel::AccountantSkel; |     use accountant_skel::AccountantSkel; | ||||||
|     use accountant_stub::AccountantStub; |     use accountant_stub::AccountantStub; | ||||||
|  |     use entry::Entry; | ||||||
|     use historian::Historian; |     use historian::Historian; | ||||||
|     use mint::Mint; |     use mint::Mint; | ||||||
|     use plan::Plan; |     use plan::Plan; | ||||||
|  |     use recorder::Signal; | ||||||
|     use signature::{KeyPair, KeyPairUtil}; |     use signature::{KeyPair, KeyPairUtil}; | ||||||
|     use std::io::sink; |     use std::io::sink; | ||||||
|     use std::net::UdpSocket; |     use std::net::{SocketAddr, UdpSocket}; | ||||||
|     use std::sync::atomic::{AtomicBool, Ordering}; |     use std::sync::atomic::{AtomicBool, Ordering}; | ||||||
|     use std::sync::{Arc, Mutex}; |     use std::sync::{Arc, Mutex}; | ||||||
|     use std::thread::sleep; |     use std::thread::sleep; | ||||||
| @@ -359,6 +365,43 @@ mod tests { | |||||||
|         assert_eq!(rv[1].read().unwrap().packets.len(), 1); |         assert_eq!(rv[1].read().unwrap().packets.len(), 1); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_accounting_sequential_consistency() { | ||||||
|  |         // In this attack we'll demonstrate that a verifier can interpret the ledger | ||||||
|  |         // differently if either the server doesn't signal the ledger to add an | ||||||
|  |         // Entry OR if the verifier tries to parallelize across multiple Entries. | ||||||
|  |         let mint = Mint::new(2); | ||||||
|  |         let acc = Accountant::new(&mint); | ||||||
|  |         let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); | ||||||
|  |         let historian = Historian::new(&mint.last_id(), None); | ||||||
|  |         let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian); | ||||||
|  |  | ||||||
|  |         // Process a batch that includes a transaction that receives two tokens. | ||||||
|  |         let alice = KeyPair::new(); | ||||||
|  |         let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); | ||||||
|  |         let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; | ||||||
|  |         assert!(skel.process_packets(req_vers).is_ok()); | ||||||
|  |  | ||||||
|  |         // Process a second batch that spends one of those tokens. | ||||||
|  |         let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); | ||||||
|  |         let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; | ||||||
|  |         assert!(skel.process_packets(req_vers).is_ok()); | ||||||
|  |  | ||||||
|  |         // Collect the ledger and feed it to a new accountant. | ||||||
|  |         skel.historian.sender.send(Signal::Tick).unwrap(); | ||||||
|  |         drop(skel.historian.sender); | ||||||
|  |         let entries: Vec<Entry> = skel.historian.receiver.iter().collect(); | ||||||
|  |  | ||||||
|  |         // Assert the user holds one token, not two. If the server only output one | ||||||
|  |         // entry, then the second transaction will be rejected, because it drives | ||||||
|  |         // the account balance below zero before the credit is added. | ||||||
|  |         let acc = Accountant::new(&mint); | ||||||
|  |         for entry in entries { | ||||||
|  |             acc.process_verified_events(entry.events).unwrap(); | ||||||
|  |         } | ||||||
|  |         assert_eq!(acc.get_balance(&alice.pubkey()), Some(1)); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_accountant_bad_sig() { |     fn test_accountant_bad_sig() { | ||||||
|         let serve_port = 9002; |         let serve_port = 9002; | ||||||
|   | |||||||
| @@ -53,9 +53,7 @@ fn main() { | |||||||
|     let mut last_id = entry1.id; |     let mut last_id = entry1.id; | ||||||
|     for entry in entries { |     for entry in entries { | ||||||
|         last_id = entry.id; |         last_id = entry.id; | ||||||
|         for event in entry.events { |         acc.process_verified_events(entry.events).unwrap(); | ||||||
|             acc.process_verified_event(&event).unwrap(); |  | ||||||
|         } |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     let historian = Historian::new(&last_id, Some(1000)); |     let historian = Historian::new(&last_id, Some(1000)); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user