tpu.rs - panic cleanup

Author: Jackson Sandland
Date:   2018-05-10 18:21:10 -07:00
Commit: bb654f286c
Parent: 18d3659b91

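Every change below follows one pattern: bare `.unwrap()` calls are replaced with `.expect(msg)`. Both panic when the `Result` is an `Err`, but `.expect` prefixes the panic output with a caller-supplied message naming the lock and function, which is what makes a panic in a multithreaded pipeline like the TPU traceable. A minimal sketch of the difference (error value hypothetical, panic messages abbreviated):

    fn main() {
        // Both calls panic on an Err; only .expect() names the call site.
        let res: Result<u32, &str> = Err("connection reset");
        // res.unwrap();
        //   -> panicked at 'called `Result::unwrap()` on an `Err` value: "connection reset"'
        res.expect("'acc' lock in fn demo");
        //   -> panicked at ''acc' lock in fn demo: "connection reset"'
    }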

@@ -93,7 +93,7 @@ impl Tpu {
         let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
         // copy subscribers to avoid taking lock while doing io
-        let addrs = obj.entry_info_subscribers.lock().unwrap().clone();
+        let addrs = obj.entry_info_subscribers.lock().expect("'entry_info_subscribers' lock").clone();
         trace!("Sending to {} addrs", addrs.len());
         for addr in addrs {
             let entry_info = EntryInfo {
@@ -113,12 +113,12 @@ impl Tpu {
     fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
         trace!("update_entry entry");
-        obj.acc.lock().unwrap().register_entry_id(&entry.id);
+        obj.acc.lock().expect("'acc' lock in fn update_entry").register_entry_id(&entry.id);
         writeln!(
-            writer.lock().unwrap(),
+            writer.lock().expect("'writer' lock in fn update_entry"),
             "{}",
-            serde_json::to_string(&entry).unwrap()
-        ).unwrap();
+            serde_json::to_string(&entry).expect("entry to_string in fn update_entry")
+        ).expect("writeln! in fn update_entry");
         Self::notify_entry_info_subscribers(obj, &entry);
     }
@@ -128,7 +128,7 @@ impl Tpu {
         let entry = obj.historian
             .output
             .lock()
-            .unwrap()
+            .expect("'ouput' lock in fn receive_all")
             .recv_timeout(Duration::new(1, 0))?;
         Self::update_entry(obj, writer, &entry);
         l.push(entry);
@@ -166,13 +166,13 @@ impl Tpu {
             let b = blob_recycler.allocate();
             let pos = {
-                let mut bd = b.write().unwrap();
+                let mut bd = b.write().expect("'b' write lock in pos in fn process_entry_list_into_blobs");
                 let mut out = Cursor::new(bd.data_mut());
-                serialize_into(&mut out, &list[start..end]).expect("failed to serialize output");
+                serialize_into(&mut out, &list[start..end]).expect("serialize_into in fn process_entry_list_into_blobs");
                 out.position() as usize
             };
             assert!(pos < BLOB_SIZE);
-            b.write().unwrap().set_size(pos);
+            b.write().expect("'b' write lock in fn process_entry_list_into_blobs").set_size(pos);
             q.push_back(b);
             start = end;
         }
@@ -255,7 +255,7 @@ impl Tpu {
     ) -> Option<(Response, SocketAddr)> {
         match msg {
             Request::GetBalance { key } => {
-                let val = self.acc.lock().unwrap().get_balance(&key);
+                let val = self.acc.lock().expect("'acc' lock in pub fn process_request").get_balance(&key);
                 let rsp = (Response::Balance { key, val }, rsp_addr);
                 info!("Response::Balance {:?}", rsp);
                 Some(rsp)
@@ -265,7 +265,7 @@ impl Tpu {
                 for subscription in subscriptions {
                     match subscription {
                         Subscription::EntryInfo => {
-                            self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
+                            self.entry_info_subscribers.lock().expect("lock in Subscribe in fn process_request").push(rsp_addr)
                         }
                     }
                 }
@@ -278,11 +278,11 @@ impl Tpu {
         let timer = Duration::new(1, 0);
         let msgs = recvr.recv_timeout(timer)?;
         debug!("got msgs");
-        let mut len = msgs.read().unwrap().packets.len();
+        let mut len = msgs.read().expect("'msgs' read lock in fn recv_batch").packets.len();
         let mut batch = vec![msgs];
         while let Ok(more) = recvr.try_recv() {
             trace!("got more msgs");
-            len += more.read().unwrap().packets.len();
+            len += more.read().expect("'more' read lock in fn recv_batch").packets.len();
             batch.push(more);
             if len > 100_000 {
@@ -299,7 +299,7 @@ impl Tpu {
     ) -> Result<()> {
         let r = ecdsa::ed25519_verify(&batch);
         let res = batch.into_iter().zip(r).collect();
-        sendr.lock().unwrap().send(res)?;
+        sendr.lock().expect("lock in fn verify_batch in tpu").send(res)?;
         // TODO: fix error handling here?
         Ok(())
     }
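Note the split in this hunk: a failed channel `send` is propagated to the caller with `?`, while a poisoned `sendr` lock still panics, now with a named message (the trailing TODO in the source flags exactly this asymmetry). A hedged sketch of the two paths, with the TPU's types simplified to `u32`:

    use std::sync::mpsc::{channel, SendError, Sender};
    use std::sync::{Arc, Mutex};

    // Hypothetical reduction of verify_batch's error handling:
    // lock poisoning panics with a named message, send failure returns Err.
    fn forward(sendr: &Arc<Mutex<Sender<u32>>>, res: u32) -> Result<(), SendError<u32>> {
        sendr
            .lock()
            .expect("lock in fn forward") // unrecoverable: a thread died holding the lock
            .send(res)?;                  // recoverable: receiver hung up, caller decides
        Ok(())
    }

    fn main() {
        let (tx, rx) = channel();
        let sendr = Arc::new(Mutex::new(tx));
        forward(&sendr, 42).expect("forward in fn main");
        assert_eq!(rx.recv(), Ok(42));
    }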
@@ -308,7 +308,7 @@ impl Tpu {
         recvr: &Arc<Mutex<streamer::PacketReceiver>>,
         sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
     ) -> Result<()> {
-        let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?;
+        let (batch, len) = Self::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
         let now = Instant::now();
         let batch_len = batch.len();
         let rand_id = thread_rng().gen_range(0, 100);
@@ -319,7 +319,7 @@ impl Tpu {
             rand_id
         );
-        Self::verify_batch(batch, sendr).unwrap();
+        Self::verify_batch(batch, sendr).expect("verify_batch in fn verifier");
         let total_time_ms = timing::duration_as_ms(&now.elapsed());
         let total_time_s = timing::duration_as_s(&now.elapsed());
@@ -367,18 +367,18 @@ impl Tpu {
     /// Process the transactions in parallel and then log the successful ones.
     fn process_events(&self, events: Vec<Event>) -> Result<()> {
-        for result in self.acc.lock().unwrap().process_verified_events(events) {
+        for result in self.acc.lock().expect("'acc' lock in fn process_events").process_verified_events(events) {
             if let Ok(event) = result {
                 self.historian_input
                     .lock()
-                    .unwrap()
+                    .expect("historian_input lock in in for loop fn process_events")
                     .send(Signal::Event(event))?;
             }
         }
         // Let validators know they should not attempt to process additional
         // transactions in parallel.
-        self.historian_input.lock().unwrap().send(Signal::Tick)?;
+        self.historian_input.lock().expect("'historian_input' lock in fn process_events").send(Signal::Tick)?;
         debug!("after historian_input");
         Ok(())
@@ -397,7 +397,7 @@ impl Tpu {
     ) -> Result<packet::SharedBlob> {
         let blob = blob_recycler.allocate();
         {
-            let mut b = blob.write().unwrap();
+            let mut b = blob.write().expect("write in fn serialize_response");
             let v = serialize(&resp)?;
             let len = v.len();
             b.data[..len].copy_from_slice(&v);
@@ -438,7 +438,7 @@ impl Tpu {
         );
         let proc_start = Instant::now();
         for (msgs, vers) in mms {
-            let reqs = Self::deserialize_packets(&msgs.read().unwrap());
+            let reqs = Self::deserialize_packets(&msgs.read().expect("'msgs' read lock in fn process in tpu"));
             reqs_len += reqs.len();
             let req_vers = reqs.into_iter()
                 .zip(vers)
@@ -492,9 +492,9 @@ impl Tpu {
         let blobs = verified_receiver.recv_timeout(timer)?;
         trace!("replicating blobs {}", blobs.len());
         for msgs in &blobs {
-            let blob = msgs.read().unwrap();
-            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
-            let acc = obj.acc.lock().unwrap();
+            let blob = msgs.read().expect("'msgs' read lock in fn replicate_state in tpu");
+            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).expect("deserialize in fn replicate_state");
+            let acc = obj.acc.lock().expect("'acc' lock in fn replicate_state");
             for entry in entries {
                 acc.register_entry_id(&entry.id);
                 for result in acc.process_verified_events(entry.events) {
@@ -640,8 +640,8 @@ impl Tpu {
     ) -> Result<Vec<JoinHandle<()>>> {
         //replicate pipeline
         let crdt = Arc::new(RwLock::new(Crdt::new(me)));
-        crdt.write().unwrap().set_leader(leader.id);
-        crdt.write().unwrap().insert(leader);
+        crdt.write().expect("'crdt' write lock in pub fn replicate").set_leader(leader.id);
+        crdt.write().expect("'crdt' write lock before insert() in pub fn replicate").insert(leader);
         let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
         let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());
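For context on when all of these `lock().expect(...)` calls can actually fire: `std::sync::Mutex::lock` only returns `Err` when the mutex is poisoned, i.e. another thread panicked while holding the guard. The new messages make that secondary panic point at the lock that was poisoned instead of an anonymous unwrap failure. A small self-contained demonstration (names hypothetical):

    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        let acc = Arc::new(Mutex::new(0u64));

        // Poison the mutex: this thread panics while holding the guard.
        let poisoner = Arc::clone(&acc);
        let _ = thread::spawn(move || {
            let _guard = poisoner.lock().expect("'acc' lock in poisoner thread");
            panic!("simulated failure");
        })
        .join();

        // lock() now returns Err(PoisonError); .expect() panics with the
        // named message followed by the Debug form of the error.
        let _val = acc.lock().expect("'acc' lock in fn main");
    }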