diff --git a/src/accountant.rs b/src/accountant.rs index b884d5986c..f1e678ca2d 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -16,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; use std::result; -use std::sync::RwLock; use std::sync::atomic::{AtomicIsize, Ordering}; +use std::sync::RwLock; use transaction::Transaction; pub const MAX_ENTRY_IDS: usize = 1024 * 4; @@ -34,13 +34,17 @@ pub type Result = result::Result; /// Commit funds to the 'to' party. fn apply_payment(balances: &RwLock>, payment: &Payment) { // First we check balances with a read lock to maximize potential parallelization. - if balances.read().unwrap().contains_key(&payment.to) { - let bals = balances.read().unwrap(); + if balances + .read() + .expect("'balances' read lock in apply_payment") + .contains_key(&payment.to) + { + let bals = balances.read().expect("'balances' read lock"); bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { // Now we know the key wasn't present a nanosecond ago, but it might be there // by the time we aquire a write lock, so we'll have to check again. - let mut bals = balances.write().unwrap(); + let mut bals = balances.write().expect("'balances' write lock"); if bals.contains_key(&payment.to) { bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { @@ -84,27 +88,37 @@ impl Accountant { /// Return the last entry ID registered pub fn last_id(&self) -> Hash { - let last_ids = self.last_ids.read().unwrap(); - let last_item = last_ids.iter().last().expect("empty last_ids list"); + let last_ids = self.last_ids.read().expect("'last_ids' read lock"); + let last_item = last_ids.iter().last().expect("empty 'last_ids' list"); last_item.0 } fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { - if signatures.read().unwrap().contains(sig) { + if signatures + .read() + .expect("'signatures' read lock") + .contains(sig) + { return false; } - signatures.write().unwrap().insert(*sig); + signatures + .write() + .expect("'signatures' write lock") + .insert(*sig); true } fn forget_signature(signatures: &RwLock>, sig: &Signature) -> bool { - signatures.write().unwrap().remove(sig) + signatures + .write() + .expect("'signatures' write lock in forget_signature") + .remove(sig) } fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { if let Some(entry) = self.last_ids .read() - .unwrap() + .expect("'last_ids' read lock in forget_signature_with_last_id") .iter() .rev() .find(|x| x.0 == *last_id) @@ -117,7 +131,7 @@ impl Accountant { fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { if let Some(entry) = self.last_ids .read() - .unwrap() + .expect("'last_ids' read lock in reserve_signature_with_last_id") .iter() .rev() .find(|x| x.0 == *last_id) @@ -132,7 +146,9 @@ impl Accountant { /// the oldest ones once its internal cache is full. Once boot, the /// accountant will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { - let mut last_ids = self.last_ids.write().unwrap(); + let mut last_ids = self.last_ids + .write() + .expect("'last_ids' write lock in register_entry_id"); if last_ids.len() >= MAX_ENTRY_IDS { last_ids.pop_front(); } @@ -142,7 +158,9 @@ impl Accountant { /// Deduct tokens from the 'from' address the account has sufficient /// funds and isn't a duplicate. pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> { - let bals = self.balances.read().unwrap(); + let bals = self.balances + .read() + .expect("'balances' read lock in process_verified_transaction_debits"); let option = bals.get(&tr.from); if option.is_none() { @@ -154,7 +172,7 @@ impl Accountant { } loop { - let bal = option.unwrap(); + let bal = option.expect("assignment of option to bal"); let current = bal.load(Ordering::Relaxed) as i64; if current < tr.data.tokens { @@ -178,12 +196,16 @@ impl Accountant { pub fn process_verified_transaction_credits(&self, tr: &Transaction) { let mut plan = tr.data.plan.clone(); - plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); + plan.apply_witness(&Witness::Timestamp(*self.last_time + .read() + .expect("timestamp creation in process_verified_transaction_credits"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); } else { - let mut pending = self.pending.write().unwrap(); + let mut pending = self.pending + .write() + .expect("'pending' write lock in process_verified_transaction_credits"); pending.insert(tr.sig, plan); } } @@ -252,7 +274,11 @@ impl Accountant { /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { - if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { + if let Occupied(mut e) = self.pending + .write() + .expect("write() in process_verified_sig") + .entry(tx_sig) + { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(ref payment) = e.get().final_payment() { apply_payment(&self.balances, payment); @@ -267,13 +293,24 @@ impl Accountant { fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime) -> Result<()> { // If this is the first timestamp we've seen, it probably came from the genesis block, // so we'll trust it. - if *self.last_time.read().unwrap() == Utc.timestamp(0, 0) { - self.time_sources.write().unwrap().insert(from); + if *self.last_time + .read() + .expect("'last_time' read lock on first timestamp check") + == Utc.timestamp(0, 0) + { + self.time_sources + .write() + .expect("'time_sources' write lock on first timestamp") + .insert(from); } - if self.time_sources.read().unwrap().contains(&from) { - if dt > *self.last_time.read().unwrap() { - *self.last_time.write().unwrap() = dt; + if self.time_sources + .read() + .expect("'time_sources' read lock") + .contains(&from) + { + if dt > *self.last_time.read().expect("'last_time' read lock") { + *self.last_time.write().expect("'last_time' write lock") = dt; } } else { return Ok(()); @@ -284,9 +321,13 @@ impl Accountant { // Hold 'pending' write lock until the end of this function. Otherwise another thread can // double-spend if it enters before the modified plan is removed from 'pending'. - let mut pending = self.pending.write().unwrap(); + let mut pending = self.pending + .write() + .expect("'pending' write lock in process_verified_timestamp"); for (key, plan) in pending.iter_mut() { - plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); + plan.apply_witness(&Witness::Timestamp(*self.last_time + .read() + .expect("'last_time' read lock when creating timestamp"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); completed.push(key.clone()); @@ -341,7 +382,9 @@ impl Accountant { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - let bals = self.balances.read().unwrap(); + let bals = self.balances + .read() + .expect("'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 0eb38e195e..1c97043c36 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -17,8 +17,8 @@ use std::env; use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; use std::process::exit; -use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::sync::Arc; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); diff --git a/src/crdt.rs b/src/crdt.rs index df01d89ec1..965dac08fe 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -168,7 +168,7 @@ impl Crdt { ) -> Result<()> { let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking durring IO - let robj = obj.read().unwrap(); + let robj = obj.read().expect("'obj' read lock in pub fn broadcast"); let cloned_table: Vec = robj.table.values().cloned().collect(); (robj.table[&robj.me].clone(), cloned_table) }; @@ -194,10 +194,10 @@ impl Crdt { .map(|((i, v), b)| { // only leader should be broadcasting assert!(me.current_leader_id != v.id); - let mut blob = b.write().unwrap(); - blob.set_id(me.id).expect("set_id"); + let mut blob = b.write().expect("'b' write lock in pub fn broadcast"); + blob.set_id(me.id).expect("set_id in pub fn broadcast"); blob.set_index(*transmit_index + i as u64) - .expect("set_index"); + .expect("set_index in pub fn broadcast"); //TODO profile this, may need multiple sockets for par_iter s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr) }) @@ -219,10 +219,10 @@ impl Crdt { pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking durring IO - let s = obj.read().unwrap(); + let s = obj.read().expect("'obj' read lock in pub fn retransmit"); (s.table[&s.me].clone(), s.table.values().cloned().collect()) }; - let rblob = blob.read().unwrap(); + let rblob = blob.read().expect("'blob' read lock in pub fn retransmit"); let daddr = "0.0.0.0:0".parse().unwrap(); let orders: Vec<_> = table .iter() @@ -261,9 +261,10 @@ impl Crdt { fn random() -> u64 { let rnd = SystemRandom::new(); let mut buf = [0u8; 8]; - rnd.fill(&mut buf).unwrap(); + rnd.fill(&mut buf).expect("rnd.fill in pub fn random"); let mut rdr = Cursor::new(&buf); - rdr.read_u64::().unwrap() + rdr.read_u64::() + .expect("rdr.read_u64 in fn random") } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); @@ -287,10 +288,19 @@ impl Crdt { return Err(Error::GeneralError); } let mut n = (Self::random() as usize) % self.table.len(); - while self.table.values().nth(n).unwrap().id == self.me { + while self.table + .values() + .nth(n) + .expect("'values().nth(n)' while loop in fn gossip_request") + .id == self.me + { n = (Self::random() as usize) % self.table.len(); } - let v = self.table.values().nth(n).unwrap().clone(); + let v = self.table + .values() + .nth(n) + .expect("'values().nth(n)' in fn gossip_request") + .clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); Ok((v.gossip_addr, req)) @@ -303,7 +313,9 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?; + let (remote_gossip_addr, req) = obj.read() + .expect("'obj' read lock in fn run_gossip") + .gossip_request()?; let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have @@ -335,7 +347,11 @@ impl Crdt { return; } //TODO this should be a tuned parameter - sleep(obj.read().unwrap().timeout); + sleep( + obj.read() + .expect("'obj' read lock in pub fn gossip") + .timeout, + ); }) } @@ -353,18 +369,25 @@ impl Crdt { trace!("RequestUpdates {}", v); let addr = reqdata.gossip_addr; // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` - let (from, ups, data) = obj.read().unwrap().get_updates_since(v); + let (from, ups, data) = obj.read() + .expect("'obj' read lock in RequestUpdates") + .get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; trace!("send_to {}", addr); //TODO verify reqdata belongs to sender - obj.write().unwrap().insert(reqdata); - sock.send_to(&rsp, addr).unwrap(); + obj.write() + .expect("'obj' write lock in RequestUpdates") + .insert(reqdata); + sock.send_to(&rsp, addr) + .expect("'sock.send_to' in RequestUpdates"); trace!("send_to done!"); } Protocol::ReceiveUpdates(from, ups, data) => { trace!("ReceivedUpdates"); - obj.write().unwrap().apply_updates(from, ups, &data); + obj.write() + .expect("'obj' write lock in ReceiveUpdates") + .apply_updates(from, ups, &data); } } Ok(()) @@ -374,7 +397,8 @@ impl Crdt { sock: UdpSocket, exit: Arc, ) -> JoinHandle<()> { - sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap(); + sock.set_read_timeout(Some(Duration::new(2, 0))) + .expect("'sock.set_read_timeout' in crdt.rs"); spawn(move || loop { let _ = Self::run_listen(&obj, &sock); if exit.load(Ordering::Relaxed) { diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 4110387015..1214302133 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -59,7 +59,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { .into_par_iter() .map(|p| { p.read() - .unwrap() + .expect("'p' read lock in ed25519_verify") .packets .par_iter() .map(verify_packet) @@ -78,7 +78,11 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { let mut rvs = Vec::new(); for packets in batches { - locks.push(packets.read().unwrap()); + locks.push( + packets + .read() + .expect("'packets' read lock in pub fn ed25519_verify"), + ); } let mut num = 0; for p in locks { @@ -135,8 +139,8 @@ mod tests { use packet::{Packet, Packets, SharedPackets}; use std::sync::RwLock; use thin_client_service::Request; - use transaction::Transaction; use transaction::test_tx; + use transaction::Transaction; fn make_packet_from_transaction(tr: Transaction) -> Packet { let tx = serialize(&Request::Transaction(tr)).unwrap(); diff --git a/src/erasure.rs b/src/erasure.rs index 12b4223bb9..35c543f090 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -164,10 +164,14 @@ pub fn generate_coding( let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); for i in consumed..consumed + NUM_DATA { let n = i % window.len(); - data_blobs.push(window[n].clone().unwrap()); + data_blobs.push( + window[n] + .clone() + .expect("'data_blobs' arr in pub fn generate_coding"), + ); } for b in &data_blobs { - data_locks.push(b.write().unwrap()); + data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding")); } for (i, l) in data_locks.iter_mut().enumerate() { trace!("i: {} data: {}", i, l.data[0]); @@ -180,10 +184,17 @@ pub fn generate_coding( for i in coding_start..coding_end { let n = i % window.len(); window[n] = re.allocate(); - coding_blobs.push(window[n].clone().unwrap()); + coding_blobs.push( + window[n] + .clone() + .expect("'coding_blobs' arr in pub fn generate_coding"), + ); } for b in &coding_blobs { - coding_locks.push(b.write().unwrap()); + coding_locks.push( + b.write() + .expect("'coding_locks' arr in pub fn generate_coding"), + ); } for (i, l) in coding_locks.iter_mut().enumerate() { trace!("i: {} data: {}", i, l.data[0]); @@ -231,7 +242,7 @@ pub fn recover( let j = i % window.len(); let mut b = &mut window[j]; if b.is_some() { - blobs.push(b.clone().unwrap()); + blobs.push(b.clone().expect("'blobs' arr in pb fn recover")); continue; } let n = re.allocate(); @@ -244,7 +255,7 @@ pub fn recover( trace!("erasures: {:?}", erasures); //lock everything for b in &blobs { - locks.push(b.write().unwrap()); + locks.push(b.write().expect("'locks' arr in pb fn recover")); } for (i, l) in locks.iter_mut().enumerate() { if i >= NUM_DATA { diff --git a/src/event.rs b/src/event.rs index fabd6d7735..b3a317e199 100644 --- a/src/event.rs +++ b/src/event.rs @@ -24,7 +24,7 @@ pub enum Event { impl Event { /// Create and sign a new Witness Timestamp. Used for unit-testing. pub fn new_timestamp(from: &KeyPair, dt: DateTime) -> Self { - let sign_data = serialize(&dt).unwrap(); + let sign_data = serialize(&dt).expect("serialize 'dt' in pub fn new_timestamp"); let sig = Signature::clone_from_slice(from.sign(&sign_data).as_ref()); Event::Timestamp { from: from.pubkey(), @@ -49,7 +49,10 @@ impl Event { match *self { Event::Transaction(ref tr) => tr.verify_sig(), Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig), - Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).unwrap()), + Event::Timestamp { from, dt, sig } => sig.verify( + &from, + &serialize(&dt).expect("serialize 'dt' in pub fn verify"), + ), } } } diff --git a/src/hash.rs b/src/hash.rs index ee7598a0dc..61dd01468c 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -1,7 +1,7 @@ //! The `hash` module provides functions for creating SHA-256 hashes. -use generic_array::GenericArray; use generic_array::typenum::U32; +use generic_array::GenericArray; use sha2::{Digest, Sha256}; pub type Hash = GenericArray; diff --git a/src/historian.rs b/src/historian.rs index 019ec57d36..7a183c1555 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -4,8 +4,8 @@ use entry::Entry; use hash::Hash; use recorder::{ExitReason, Recorder, Signal}; -use std::sync::Mutex; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::sync::Mutex; use std::thread::{spawn, JoinHandle}; use std::time::Instant; @@ -52,7 +52,10 @@ impl Historian { } pub fn receive(self: &Self) -> Result { - self.output.lock().unwrap().try_recv() + self.output + .lock() + .expect("'output' lock in pub fn receive") + .try_recv() } } diff --git a/src/mint.rs b/src/mint.rs index 7f7f1041fe..52a685e3ea 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -1,7 +1,7 @@ //! The `mint` module is a library for generating the chain's genesis block. -use entry::Entry; use entry::create_entry; +use entry::Entry; use event::Event; use hash::{hash, Hash}; use ring::rand::SystemRandom; @@ -19,8 +19,11 @@ pub struct Mint { impl Mint { pub fn new(tokens: i64) -> Self { let rnd = SystemRandom::new(); - let pkcs8 = KeyPair::generate_pkcs8(&rnd).unwrap().to_vec(); - let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap(); + let pkcs8 = KeyPair::generate_pkcs8(&rnd) + .expect("generate_pkcs8 in mint pub fn new") + .to_vec(); + let keypair = + KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new"); let pubkey = keypair.pubkey(); Mint { pkcs8, @@ -38,7 +41,7 @@ impl Mint { } pub fn keypair(&self) -> KeyPair { - KeyPair::from_pkcs8(Input::from(&self.pkcs8)).unwrap() + KeyPair::from_pkcs8(Input::from(&self.pkcs8)).expect("from_pkcs8 in mint pub fn keypair") } pub fn pubkey(&self) -> PublicKey { diff --git a/src/packet.rs b/src/packet.rs index 713a166f68..258498dcfc 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -156,12 +156,12 @@ impl Clone for Recycler { impl Recycler { pub fn allocate(&self) -> Arc> { - let mut gc = self.gc.lock().expect("recycler lock"); + let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate"); gc.pop() .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))) } pub fn recycle(&self, msgs: Arc>) { - let mut gc = self.gc.lock().expect("recycler lock"); + let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); gc.push(msgs); } } @@ -264,7 +264,7 @@ impl Blob { for i in 0..NUM_BLOBS { let r = re.allocate(); { - let mut p = r.write().unwrap(); + let mut p = r.write().expect("'r' write lock in pub fn recv_from"); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { trace!("got {:?} messages", i); @@ -294,7 +294,7 @@ impl Blob { ) -> Result<()> { while let Some(r) = v.pop_front() { { - let p = r.read().unwrap(); + let p = r.read().expect("'r' read lock in pub fn send_to"); let a = p.meta.addr(); socket.send_to(&p.data[..p.meta.size], &a)?; } diff --git a/src/result.rs b/src/result.rs index fca876ebec..d2cb485add 100644 --- a/src/result.rs +++ b/src/result.rs @@ -78,9 +78,9 @@ mod tests { use std::io; use std::io::Write; use std::net::SocketAddr; + use std::sync::mpsc::channel; use std::sync::mpsc::RecvError; use std::sync::mpsc::RecvTimeoutError; - use std::sync::mpsc::channel; use std::thread; fn addr_parse_error() -> Result { diff --git a/src/signature.rs b/src/signature.rs index 2673b3bc99..fca8dfb234 100644 --- a/src/signature.rs +++ b/src/signature.rs @@ -24,8 +24,10 @@ impl KeyPairUtil for Ed25519KeyPair { /// Return a new ED25519 keypair fn new() -> Self { let rng = rand::SystemRandom::new(); - let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).unwrap(); - signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).unwrap() + let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng) + .expect("generate_pkcs8 in signature pb fn new"); + signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)) + .expect("from_pcks8 in signature pb fn new") } /// Return the public key for the given keypair diff --git a/src/streamer.rs b/src/streamer.rs index 13a8d04a28..d551a890e4 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -27,7 +27,10 @@ fn recv_loop( let msgs = re.allocate(); let msgs_ = msgs.clone(); loop { - match msgs.write().unwrap().recv_from(sock) { + match msgs.write() + .expect("write lock in fn recv_loop") + .recv_from(sock) + { Ok(()) => { channel.send(msgs_)?; break; @@ -136,7 +139,10 @@ fn recv_window( ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; - let leader_id = crdt.read().unwrap().leader_data().id; + let leader_id = crdt.read() + .expect("'crdt' read lock in fn recv_window") + .leader_data() + .id; while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } @@ -144,17 +150,17 @@ fn recv_window( //retransmit all leader blocks let mut retransmitq = VecDeque::new(); for b in &dq { - let p = b.read().unwrap(); + let p = b.read().expect("'b' read lock in fn recv_window"); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window trace!( "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.get_index().unwrap(), - p.get_id().unwrap(), + p.get_index().expect("get_index in fn recv_window"), + p.get_id().expect("get_id in trace! fn recv_window"), p.meta.addr(), leader_id ); - if p.get_id().unwrap() == leader_id { + if p.get_id().expect("get_id in fn recv_window") == leader_id { //TODO //need to copy the retransmited blob //otherwise we get into races with which thread @@ -164,7 +170,7 @@ fn recv_window( //is dropped via a weakref to the recycler let nv = recycler.allocate(); { - let mut mnv = nv.write().unwrap(); + let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); let sz = p.meta.size; mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); @@ -180,7 +186,7 @@ fn recv_window( let mut contq = VecDeque::new(); while let Some(b) = dq.pop_front() { let b_ = b.clone(); - let p = b.write().unwrap(); + let p = b.write().expect("'b' write lock in fn recv_window"); let pix = p.get_index()? as usize; let w = pix % NUM_BLOBS; //TODO, after the block are authenticated @@ -199,7 +205,7 @@ fn recv_window( if window[k].is_none() { break; } - contq.push_back(window[k].clone().unwrap()); + contq.push_back(window[k].clone().expect("clone in fn recv_window")); window[k] = None; *consumed += 1; } @@ -457,8 +463,8 @@ mod test { use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; - use streamer::{BlobReceiver, PacketReceiver}; use streamer::{blob_receiver, receiver, responder, retransmitter, window}; + use streamer::{BlobReceiver, PacketReceiver}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { diff --git a/src/thin_client.rs b/src/thin_client.rs index 274ced7e8a..e1ee2b4dcd 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -40,7 +40,7 @@ impl ThinClient { pub fn init(&self) { let subscriptions = vec![Subscription::EntryInfo]; let req = Request::Subscribe { subscriptions }; - let data = serialize(&req).expect("serialize Subscribe"); + let data = serialize(&req).expect("serialize Subscribe in thin_client"); trace!("subscribing to {}", self.addr); let _res = self.socket.send_to(&data, &self.addr); } @@ -50,7 +50,7 @@ impl ThinClient { info!("start recv_from"); self.socket.recv_from(&mut buf)?; info!("end recv_from"); - let resp = deserialize(&buf).expect("deserialize balance"); + let resp = deserialize(&buf).expect("deserialize balance in thin_client"); Ok(resp) } @@ -72,7 +72,7 @@ impl ThinClient { /// does not wait for a response. pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let req = Request::Transaction(tr); - let data = serialize(&req).unwrap(); + let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); self.socket.send_to(&data, &self.addr) } @@ -95,10 +95,10 @@ impl ThinClient { pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result { info!("get_balance"); let req = Request::GetBalance { key: *pubkey }; - let data = serialize(&req).expect("serialize GetBalance"); + let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance"); self.socket .send_to(&data, &self.addr) - .expect("buffer error"); + .expect("buffer error in pub fn get_balance"); let mut done = false; while !done { let resp = self.recv_response()?; @@ -124,7 +124,8 @@ impl ThinClient { // Wait for at least one EntryInfo. let mut done = false; while !done { - let resp = self.recv_response().expect("recv response"); + let resp = self.recv_response() + .expect("recv_response in pub fn transaction_count"); if let &Response::EntryInfo(_) = &resp { done = true; } @@ -132,14 +133,18 @@ impl ThinClient { } // Then take the rest. - self.socket.set_nonblocking(true).expect("set nonblocking"); + self.socket + .set_nonblocking(true) + .expect("set_nonblocking in pub fn transaction_count"); loop { match self.recv_response() { Err(_) => break, Ok(resp) => self.process_response(resp), } } - self.socket.set_nonblocking(false).expect("set blocking"); + self.socket + .set_nonblocking(false) + .expect("set_nonblocking in pub fn transaction_count"); self.num_events } } diff --git a/src/timing.rs b/src/timing.rs index 0d3c383839..4b0b9ab576 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -10,6 +10,8 @@ pub fn duration_as_s(d: &Duration) -> f32 { } pub fn timestamp() -> u64 { - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("create timestamp in timing"); return duration_as_ms(&now); } diff --git a/src/tpu.rs b/src/tpu.rs index 1cc373fc9a..bb3fce0e04 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -12,8 +12,8 @@ use rand::{thread_rng, Rng}; use result::Result; use serde_json; use std::collections::VecDeque; -use std::io::Write; use std::io::sink; +use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Sender}; @@ -48,10 +48,10 @@ impl Tpu { .accountant .register_entry_id(&entry.id); writeln!( - writer.lock().unwrap(), + writer.lock().expect("'writer' lock in fn fn write_entry"), "{}", - serde_json::to_string(&entry).unwrap() - ).unwrap(); + serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry") + ).expect("writeln! in fn write_entry"); self.thin_client_service .notify_entry_info_subscribers(&entry); } @@ -62,11 +62,16 @@ impl Tpu { let entry = self.accounting_stage .output .lock() - .unwrap() + .expect("'ouput' lock in fn receive_all") .recv_timeout(Duration::new(1, 0))?; self.write_entry(writer, &entry); l.push(entry); - while let Ok(entry) = self.accounting_stage.output.lock().unwrap().try_recv() { + while let Ok(entry) = self.accounting_stage + .output + .lock() + .expect("'output' lock in fn write_entries") + .try_recv() + { self.write_entry(writer, &entry); l.push(entry); } @@ -130,7 +135,10 @@ impl Tpu { ) -> Result<()> { let r = ecdsa::ed25519_verify(&batch); let res = batch.into_iter().zip(r).collect(); - sendr.lock().unwrap().send(res)?; + sendr + .lock() + .expect("lock in fn verify_batch in tpu") + .send(res)?; // TODO: fix error handling here? Ok(()) } @@ -139,7 +147,9 @@ impl Tpu { recvr: &Arc>, sendr: &Arc)>>>>, ) -> Result<()> { - let (batch, len) = streamer::recv_batch(&recvr.lock().unwrap())?; + let (batch, len) = + streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; + let now = Instant::now(); let batch_len = batch.len(); let rand_id = thread_rng().gen_range(0, 100); @@ -150,7 +160,7 @@ impl Tpu { rand_id ); - Self::verify_batch(batch, sendr).unwrap(); + Self::verify_batch(batch, sendr).expect("verify_batch in fn verifier"); let total_time_ms = timing::duration_as_ms(&now.elapsed()); let total_time_s = timing::duration_as_s(&now.elapsed()); @@ -314,8 +324,12 @@ impl Tpu { ) -> Result>> { //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write().unwrap().set_leader(leader.id); - crdt.write().unwrap().insert(leader); + crdt.write() + .expect("'crdt' write lock in pub fn replicate") + .set_leader(leader.id); + crdt.write() + .expect("'crdt' write lock before insert() in pub fn replicate") + .insert(leader); let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone()); diff --git a/src/transaction.rs b/src/transaction.rs index 4080d11be2..ef2acccae4 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -70,7 +70,7 @@ impl Transaction { } fn get_sign_data(&self) -> Vec { - serialize(&(&self.data)).unwrap() + serialize(&(&self.data)).expect("serialize TransactionData in fn get_sign_data") } /// Sign this transaction.