diff --git a/src/ncp.rs b/src/ncp.rs index b72c7a54c2..7e8d05aa57 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -38,8 +38,12 @@ impl Ncp { request_sender, )?; let (response_sender, response_receiver) = channel(); - let t_responder = - streamer::responder(gossip_send_socket, blob_recycler.clone(), response_receiver); + let t_responder = streamer::responder( + "ncp", + gossip_send_socket, + blob_recycler.clone(), + response_receiver, + ); let t_listen = Crdt::listen( crdt.clone(), window, diff --git a/src/packet.rs b/src/packet.rs index f01db16b27..037ac51e33 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -422,7 +422,13 @@ impl Blob { { let p = r.read().expect("'r' read lock in pub fn send_to"); let a = p.meta.addr(); - socket.send_to(&p.data[..p.meta.size], &a)?; + if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { + info!( + "error sending {} byte packet to {:?}: {:?}", + p.meta.size, a, e + ); + Err(e)?; + } } re.recycle(r); } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 7d56ccd7e6..bb2cd8e231 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -97,7 +97,12 @@ impl ReplicateStage { ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); - let t_responder = responder(send, blob_recycler.clone(), vote_blob_receiver); + let t_responder = responder( + "replicate_stage", + send, + blob_recycler.clone(), + vote_blob_receiver, + ); let skeypair = Arc::new(keypair); let t_replicate = Builder::new() diff --git a/src/rpu.rs b/src/rpu.rs index b5e7f45dc5..ca5ac7d6a2 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -64,7 +64,8 @@ impl Rpu { blob_recycler.clone(), ); - let t_responder = streamer::responder(respond_socket, blob_recycler.clone(), blob_receiver); + let t_responder = + streamer::responder("rpu", respond_socket, blob_recycler.clone(), blob_receiver); let mut thread_hdls = vec![t_receiver, t_responder]; thread_hdls.extend(request_stage.thread_hdls().into_iter()); diff --git a/src/streamer.rs b/src/streamer.rs index a358c0f3dd..cea54aa40d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -104,15 +104,20 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> Ok((batch, len)) } -pub fn responder(sock: UdpSocket, recycler: BlobRecycler, r: BlobReceiver) -> JoinHandle<()> { +pub fn responder( + name: &'static str, + sock: UdpSocket, + recycler: BlobRecycler, + r: BlobReceiver, +) -> JoinHandle<()> { Builder::new() - .name("solana-responder".to_string()) + .name(format!("solana-responder-{}", name)) .spawn(move || loop { if let Err(e) = recv_send(&sock, &recycler, &r) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), + _ => error!("{} responder error: {:?}", name, e), } } }) @@ -549,7 +554,7 @@ pub fn window( match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), + _ => error!("window error: {:?}", e), } } let _ = repair_window( @@ -697,7 +702,7 @@ pub fn broadcaster( Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these? - _ => error!("{:?}", e), + _ => error!("broadcaster error: {:?}", e), } } } @@ -750,7 +755,7 @@ pub fn retransmitter( match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), + _ => error!("retransmitter error: {:?}", e), } } } @@ -912,7 +917,12 @@ mod test { let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); let t_responder = { let (s_responder, r_responder) = channel(); - let t_responder = responder(send, resp_recycler.clone(), r_responder); + let t_responder = responder( + "streamer_send_test", + send, + resp_recycler.clone(), + r_responder, + ); let mut msgs = VecDeque::new(); for i in 0..10 { let b = resp_recycler.allocate(); @@ -986,7 +996,12 @@ mod test { ); let t_responder = { let (s_responder, r_responder) = channel(); - let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder); + let t_responder = responder( + "window_send_test", + tn.sockets.replicate, + resp_recycler.clone(), + r_responder, + ); let mut msgs = VecDeque::new(); for v in 0..10 { let i = 9 - v; diff --git a/src/tvu.rs b/src/tvu.rs index 89e12ea4cd..4469bf73f9 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -204,8 +204,12 @@ pub mod tests { // simulate leader sending messages let (s_responder, r_responder) = channel(); - let t_responder = - streamer::responder(leader.sockets.requests, resp_recycler.clone(), r_responder); + let t_responder = streamer::responder( + "test_replicate", + leader.sockets.requests, + resp_recycler.clone(), + r_responder, + ); let starting_balance = 10_000; let mint = Mint::new(starting_balance);