diff --git a/src/bank.rs b/src/bank.rs index 3b2ef23c98..e983b62d13 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -70,7 +70,8 @@ impl Bank { /// Commit funds to the 'to' party. fn apply_payment(&self, payment: &Payment) { // First we check balances with a read lock to maximize potential parallelization. - if self.balances + if self + .balances .read() .expect("'balances' read lock in apply_payment") .contains_key(&payment.to) @@ -119,7 +120,8 @@ impl Bank { } fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) { - if let Some(entry) = self.last_ids + if let Some(entry) = self + .last_ids .read() .expect("'last_ids' read lock in forget_signature_with_last_id") .iter() @@ -131,7 +133,8 @@ impl Bank { } fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> Result<()> { - if let Some(entry) = self.last_ids + if let Some(entry) = self + .last_ids .read() .expect("'last_ids' read lock in reserve_signature_with_last_id") .iter() @@ -148,7 +151,8 @@ impl Bank { /// the oldest ones once its internal cache is full. Once boot, the /// bank will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { - let mut last_ids = self.last_ids + let mut last_ids = self + .last_ids .write() .expect("'last_ids' write lock in register_entry_id"); if last_ids.len() >= MAX_ENTRY_IDS { @@ -166,7 +170,8 @@ impl Bank { return Err(BankError::NegativeTokens); } } - let bals = self.balances + let bals = self + .balances .read() .expect("'balances' read lock in apply_debits"); let option = bals.get(&tx.from); @@ -211,14 +216,18 @@ impl Bank { match &tx.instruction { Instruction::NewContract(contract) => { let mut plan = contract.plan.clone(); - plan.apply_witness(&Witness::Timestamp(*self.last_time - .read() - .expect("timestamp creation in apply_credits"))); + plan.apply_witness(&Witness::Timestamp( + *self + .last_time + .read() + .expect("timestamp creation in apply_credits"), + )); if let Some(ref payment) = plan.final_payment() { self.apply_payment(payment); } else { - let mut pending = self.pending + let mut pending = self + .pending .write() .expect("'pending' write lock in apply_credits"); pending.insert(tx.sig, plan); @@ -245,7 +254,8 @@ impl Bank { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. info!("processing Transactions {}", txs.len()); - let results: Vec<_> = txs.into_par_iter() + let results: Vec<_> = txs + .into_par_iter() .map(|tx| self.apply_debits(&tx).map(|_| tx)) .collect(); // Calling collect() here forces all debits to complete before moving on. @@ -272,7 +282,8 @@ impl Bank { /// Process a Witness Signature. fn apply_signature(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { - if let Occupied(mut e) = self.pending + if let Occupied(mut e) = self + .pending .write() .expect("write() in apply_signature") .entry(tx_sig) @@ -291,7 +302,8 @@ impl Bank { fn apply_timestamp(&self, from: PublicKey, dt: DateTime) -> Result<()> { // If this is the first timestamp we've seen, it probably came from the genesis block, // so we'll trust it. - if *self.last_time + if *self + .last_time .read() .expect("'last_time' read lock on first timestamp check") == Utc.timestamp(0, 0) @@ -302,7 +314,8 @@ impl Bank { .insert(from); } - if self.time_sources + if self + .time_sources .read() .expect("'time_sources' read lock") .contains(&from) @@ -319,13 +332,17 @@ impl Bank { // Hold 'pending' write lock until the end of this function. Otherwise another thread can // double-spend if it enters before the modified plan is removed from 'pending'. - let mut pending = self.pending + let mut pending = self + .pending .write() .expect("'pending' write lock in apply_timestamp"); for (key, plan) in pending.iter_mut() { - plan.apply_witness(&Witness::Timestamp(*self.last_time - .read() - .expect("'last_time' read lock when creating timestamp"))); + plan.apply_witness(&Witness::Timestamp( + *self + .last_time + .read() + .expect("'last_time' read lock when creating timestamp"), + )); if let Some(ref payment) = plan.final_payment() { self.apply_payment(payment); completed.push(key.clone()); @@ -370,7 +387,8 @@ impl Bank { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - let bals = self.balances + let bals = self + .balances .read() .expect("'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) @@ -512,7 +530,8 @@ mod tests { let bank = Bank::new(&mint); let pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - let sig = bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id()) + let sig = bank + .transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id()) .unwrap(); // Assert the debit counts as a transaction. diff --git a/src/crdt.rs b/src/crdt.rs index 63a1e1b993..453614a800 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -326,7 +326,8 @@ impl Crdt { } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); - let data = self.table + let data = self + .table .values() .filter(|x| self.local[&x.id] > v) .cloned() @@ -338,7 +339,8 @@ impl Crdt { pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { let daddr = "0.0.0.0:0".parse().unwrap(); - let valid: Vec<_> = self.table + let valid: Vec<_> = self + .table .values() .filter(|r| r.id != self.me && r.replicate_addr != daddr) .collect(); @@ -391,7 +393,8 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, req) = obj.read() + let (remote_gossip_addr, req) = obj + .read() .expect("'obj' read lock in fn run_gossip") .gossip_request()?; // TODO this will get chatty, so we need to first ask for number of updates since @@ -483,7 +486,8 @@ impl Crdt { trace!("RequestUpdates {}", v); let addr = reqdata.gossip_addr; // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` - let (from, ups, data) = obj.read() + let (from, ups, data) = obj + .read() .expect("'obj' read lock in RequestUpdates") .get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); @@ -554,7 +558,8 @@ impl Crdt { while let Ok(mut more) = requests_receiver.try_recv() { reqs.append(&mut more); } - let resp: VecDeque<_> = reqs.iter() + let resp: VecDeque<_> = reqs + .iter() .filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap())) .collect(); response_sender.send(resp)?; diff --git a/src/streamer.rs b/src/streamer.rs index d069eef479..3b2fff7903 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -29,7 +29,8 @@ fn recv_loop( let msgs = re.allocate(); let msgs_ = msgs.clone(); loop { - match msgs.write() + match msgs + .write() .expect("write lock in fn recv_loop") .recv_from(sock) { @@ -200,7 +201,8 @@ fn recv_window( ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; - let leader_id = crdt.read() + let leader_id = crdt + .read() .expect("'crdt' read lock in fn recv_window") .leader_data() .id; @@ -596,8 +598,8 @@ mod test { use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; - use streamer::{default_window, BlobReceiver, PacketReceiver}; use streamer::{blob_receiver, receiver, responder, window}; + use streamer::{default_window, BlobReceiver, PacketReceiver}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { diff --git a/src/thin_client.rs b/src/thin_client.rs index 22c624ddb4..4d079a8041 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -184,8 +184,8 @@ mod tests { use server::Server; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; - use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use transaction::{Instruction, Plan}; diff --git a/src/transaction.rs b/src/transaction.rs index 82c88fa473..92baad360d 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -155,7 +155,8 @@ impl Transaction { pub fn verify_plan(&self) -> bool { if let Instruction::NewContract(contract) = &self.instruction { - self.fee >= 0 && self.fee <= contract.tokens + self.fee >= 0 + && self.fee <= contract.tokens && contract.plan.verify(contract.tokens - self.fee) } else { true diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 644f3d8c4e..78a7764c4a 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -150,7 +150,8 @@ pub fn crdt_retransmit() { trace!("waiting to converge:"); let mut done = false; for _ in 0..30 { - done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3 + done = c1.read().unwrap().table.len() == 3 + && c2.read().unwrap().table.len() == 3 && c3.read().unwrap().table.len() == 3; if done { break;