From bb654f286c9953aeaa36d04ba67ea6fed1447a9e Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 18:21:10 -0700 Subject: [PATCH] tpu.rs - panic cleanup --- src/tpu.rs | 52 ++++++++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index 6da34e1332..86a9a1056b 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -93,7 +93,7 @@ impl Tpu { let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); // copy subscribers to avoid taking lock while doing io - let addrs = obj.entry_info_subscribers.lock().unwrap().clone(); + let addrs = obj.entry_info_subscribers.lock().expect("'entry_info_subscribers' lock").clone(); trace!("Sending to {} addrs", addrs.len()); for addr in addrs { let entry_info = EntryInfo { @@ -113,12 +113,12 @@ impl Tpu { fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); - obj.acc.lock().unwrap().register_entry_id(&entry.id); + obj.acc.lock().expect("'acc' lock in fn update_entry").register_entry_id(&entry.id); writeln!( - writer.lock().unwrap(), + writer.lock().expect("'writer' lock in fn update_entry"), "{}", - serde_json::to_string(&entry).unwrap() - ).unwrap(); + serde_json::to_string(&entry).expect("entry to_string in fn update_entry") + ).expect("writeln! in fn update_entry"); Self::notify_entry_info_subscribers(obj, &entry); } @@ -128,7 +128,7 @@ impl Tpu { let entry = obj.historian .output .lock() - .unwrap() + .expect("'ouput' lock in fn receive_all") .recv_timeout(Duration::new(1, 0))?; Self::update_entry(obj, writer, &entry); l.push(entry); @@ -166,13 +166,13 @@ impl Tpu { let b = blob_recycler.allocate(); let pos = { - let mut bd = b.write().unwrap(); + let mut bd = b.write().expect("'b' write lock in pos in fn process_entry_list_into_blobs"); let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &list[start..end]).expect("failed to serialize output"); + serialize_into(&mut out, &list[start..end]).expect("serialize_into in fn process_entry_list_into_blobs"); out.position() as usize }; assert!(pos < BLOB_SIZE); - b.write().unwrap().set_size(pos); + b.write().expect("'b' write lock in fn process_entry_list_into_blobs").set_size(pos); q.push_back(b); start = end; } @@ -255,7 +255,7 @@ impl Tpu { ) -> Option<(Response, SocketAddr)> { match msg { Request::GetBalance { key } => { - let val = self.acc.lock().unwrap().get_balance(&key); + let val = self.acc.lock().expect("'acc' lock in pub fn process_request").get_balance(&key); let rsp = (Response::Balance { key, val }, rsp_addr); info!("Response::Balance {:?}", rsp); Some(rsp) @@ -265,7 +265,7 @@ impl Tpu { for subscription in subscriptions { match subscription { Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) + self.entry_info_subscribers.lock().expect("lock in Subscribe in fn process_request").push(rsp_addr) } } } @@ -278,11 +278,11 @@ impl Tpu { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; debug!("got msgs"); - let mut len = msgs.read().unwrap().packets.len(); + let mut len = msgs.read().expect("'msgs' read lock in fn recv_batch").packets.len(); let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { trace!("got more msgs"); - len += more.read().unwrap().packets.len(); + len += more.read().expect("'more' read lock in fn recv_batch").packets.len(); batch.push(more); if len > 100_000 { @@ -299,7 +299,7 @@ impl Tpu { ) -> Result<()> { let r = ecdsa::ed25519_verify(&batch); let res = batch.into_iter().zip(r).collect(); - sendr.lock().unwrap().send(res)?; + sendr.lock().expect("lock in fn verify_batch in tpu").send(res)?; // TODO: fix error handling here? Ok(()) } @@ -308,7 +308,7 @@ impl Tpu { recvr: &Arc>, sendr: &Arc)>>>>, ) -> Result<()> { - let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?; + let (batch, len) = Self::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; let now = Instant::now(); let batch_len = batch.len(); let rand_id = thread_rng().gen_range(0, 100); @@ -319,7 +319,7 @@ impl Tpu { rand_id ); - Self::verify_batch(batch, sendr).unwrap(); + Self::verify_batch(batch, sendr).expect("verify_batch in fn verifier"); let total_time_ms = timing::duration_as_ms(&now.elapsed()); let total_time_s = timing::duration_as_s(&now.elapsed()); @@ -367,18 +367,18 @@ impl Tpu { /// Process the transactions in parallel and then log the successful ones. fn process_events(&self, events: Vec) -> Result<()> { - for result in self.acc.lock().unwrap().process_verified_events(events) { + for result in self.acc.lock().expect("'acc' lock in fn process_events").process_verified_events(events) { if let Ok(event) = result { self.historian_input .lock() - .unwrap() + .expect("historian_input lock in in for loop fn process_events") .send(Signal::Event(event))?; } } // Let validators know they should not attempt to process additional // transactions in parallel. - self.historian_input.lock().unwrap().send(Signal::Tick)?; + self.historian_input.lock().expect("'historian_input' lock in fn process_events").send(Signal::Tick)?; debug!("after historian_input"); Ok(()) @@ -397,7 +397,7 @@ impl Tpu { ) -> Result { let blob = blob_recycler.allocate(); { - let mut b = blob.write().unwrap(); + let mut b = blob.write().expect("write in fn serialize_response"); let v = serialize(&resp)?; let len = v.len(); b.data[..len].copy_from_slice(&v); @@ -438,7 +438,7 @@ impl Tpu { ); let proc_start = Instant::now(); for (msgs, vers) in mms { - let reqs = Self::deserialize_packets(&msgs.read().unwrap()); + let reqs = Self::deserialize_packets(&msgs.read().expect("'msgs' read lock in fn process in tpu")); reqs_len += reqs.len(); let req_vers = reqs.into_iter() .zip(vers) @@ -492,9 +492,9 @@ impl Tpu { let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); for msgs in &blobs { - let blob = msgs.read().unwrap(); - let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - let acc = obj.acc.lock().unwrap(); + let blob = msgs.read().expect("'msgs' read lock in fn replicate_state in tpu"); + let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).expect("deserialize in fn replicate_state"); + let acc = obj.acc.lock().expect("'acc' lock in fn replicate_state"); for entry in entries { acc.register_entry_id(&entry.id); for result in acc.process_verified_events(entry.events) { @@ -640,8 +640,8 @@ impl Tpu { ) -> Result>> { //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write().unwrap().set_leader(leader.id); - crdt.write().unwrap().insert(leader); + crdt.write().expect("'crdt' write lock in pub fn replicate").set_leader(leader.id); + crdt.write().expect("'crdt' write lock before insert() in pub fn replicate").insert(leader); let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());