diff --git a/src/crdt.rs b/src/crdt.rs index df01d89ec1..b7f712a0c7 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -168,7 +168,7 @@ impl Crdt { ) -> Result<()> { let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking durring IO - let robj = obj.read().unwrap(); + let robj = obj.read().expect("'obj' read lock in pub fn broadcast"); let cloned_table: Vec = robj.table.values().cloned().collect(); (robj.table[&robj.me].clone(), cloned_table) }; @@ -194,10 +194,10 @@ impl Crdt { .map(|((i, v), b)| { // only leader should be broadcasting assert!(me.current_leader_id != v.id); - let mut blob = b.write().unwrap(); - blob.set_id(me.id).expect("set_id"); + let mut blob = b.write().expect("'b' write lock in pub fn broadcast"); + blob.set_id(me.id).expect("set_id in pub fn broadcast"); blob.set_index(*transmit_index + i as u64) - .expect("set_index"); + .expect("set_index in pub fn broadcast"); //TODO profile this, may need multiple sockets for par_iter s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr) }) @@ -219,10 +219,10 @@ impl Crdt { pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking durring IO - let s = obj.read().unwrap(); + let s = obj.read().expect("'obj' read lock in pub fn retransmit"); (s.table[&s.me].clone(), s.table.values().cloned().collect()) }; - let rblob = blob.read().unwrap(); + let rblob = blob.read().expect("'blob' read lock in pub fn retransmit"); let daddr = "0.0.0.0:0".parse().unwrap(); let orders: Vec<_> = table .iter() @@ -261,9 +261,9 @@ impl Crdt { fn random() -> u64 { let rnd = SystemRandom::new(); let mut buf = [0u8; 8]; - rnd.fill(&mut buf).unwrap(); + rnd.fill(&mut buf).expect("rnd.fill in pub fn random"); let mut rdr = Cursor::new(&buf); - rdr.read_u64::().unwrap() + rdr.read_u64::().expect("rdr.read_u64 in fn random") } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); @@ -287,10 +287,10 @@ impl Crdt { return Err(Error::GeneralError); } let mut n = (Self::random() as usize) % self.table.len(); - while self.table.values().nth(n).unwrap().id == self.me { + while self.table.values().nth(n).expect("'values().nth(n)' while loop in fn gossip_request").id == self.me { n = (Self::random() as usize) % self.table.len(); } - let v = self.table.values().nth(n).unwrap().clone(); + let v = self.table.values().nth(n).expect("'values().nth(n)' in fn gossip_request").clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); Ok((v.gossip_addr, req)) @@ -303,7 +303,7 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?; + let (remote_gossip_addr, req) = obj.read().expect("'obj' read lock in fn run_gossip").gossip_request()?; let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have @@ -335,7 +335,7 @@ impl Crdt { return; } //TODO this should be a tuned parameter - sleep(obj.read().unwrap().timeout); + sleep(obj.read().expect("'obj' read lock in pub fn gossip").timeout); }) } @@ -353,18 +353,18 @@ impl Crdt { trace!("RequestUpdates {}", v); let addr = reqdata.gossip_addr; // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` - let (from, ups, data) = obj.read().unwrap().get_updates_since(v); + let (from, ups, data) = obj.read().expect("'obj' read lock in RequestUpdates").get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; trace!("send_to {}", addr); //TODO verify reqdata belongs to sender - obj.write().unwrap().insert(reqdata); - sock.send_to(&rsp, addr).unwrap(); + obj.write().expect("'obj' write lock in RequestUpdates").insert(reqdata); + sock.send_to(&rsp, addr).expect("'sock.send_to' in RequestUpdates"); trace!("send_to done!"); } Protocol::ReceiveUpdates(from, ups, data) => { trace!("ReceivedUpdates"); - obj.write().unwrap().apply_updates(from, ups, &data); + obj.write().expect("'obj' write lock in ReceiveUpdates").apply_updates(from, ups, &data); } } Ok(()) @@ -374,7 +374,7 @@ impl Crdt { sock: UdpSocket, exit: Arc, ) -> JoinHandle<()> { - sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap(); + sock.set_read_timeout(Some(Duration::new(2, 0))).expect("'sock.set_read_timeout' in crdt.rs"); spawn(move || loop { let _ = Self::run_listen(&obj, &sock); if exit.load(Ordering::Relaxed) {