Don't propogate errors we'll never handle
This commit is contained in:
@ -671,7 +671,7 @@ fn converge(
|
|||||||
None,
|
None,
|
||||||
gossip_socket,
|
gossip_socket,
|
||||||
exit_signal.clone(),
|
exit_signal.clone(),
|
||||||
).expect("DataReplicator::new");
|
);
|
||||||
let mut v: Vec<NodeInfo> = vec![];
|
let mut v: Vec<NodeInfo> = vec![];
|
||||||
//wait for the network to converge, 30 seconds should be plenty
|
//wait for the network to converge, 30 seconds should be plenty
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
|
@ -32,7 +32,6 @@ impl BlobFetchStage {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|socket| {
|
.map(|socket| {
|
||||||
streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone())
|
streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone())
|
||||||
.expect("blob receiver init")
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
@ -211,7 +211,7 @@ impl Fullnode {
|
|||||||
ledger_path,
|
ledger_path,
|
||||||
node.sockets.gossip,
|
node.sockets.gossip,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
).expect("Ncp::new");
|
);
|
||||||
thread_hdls.extend(ncp.thread_hdls());
|
thread_hdls.extend(ncp.thread_hdls());
|
||||||
|
|
||||||
match leader_info {
|
match leader_info {
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
|
|
||||||
use crdt::Crdt;
|
use crdt::Crdt;
|
||||||
use packet::BlobRecycler;
|
use packet::BlobRecycler;
|
||||||
use result::Result;
|
|
||||||
use service::Service;
|
use service::Service;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@ -24,7 +23,7 @@ impl Ncp {
|
|||||||
ledger_path: Option<&str>,
|
ledger_path: Option<&str>,
|
||||||
gossip_socket: UdpSocket,
|
gossip_socket: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> Result<Ncp> {
|
) -> Self {
|
||||||
let blob_recycler = BlobRecycler::default();
|
let blob_recycler = BlobRecycler::default();
|
||||||
let (request_sender, request_receiver) = channel();
|
let (request_sender, request_receiver) = channel();
|
||||||
let gossip_socket = Arc::new(gossip_socket);
|
let gossip_socket = Arc::new(gossip_socket);
|
||||||
@ -38,7 +37,7 @@ impl Ncp {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
blob_recycler.clone(),
|
blob_recycler.clone(),
|
||||||
request_sender,
|
request_sender,
|
||||||
)?;
|
);
|
||||||
let (response_sender, response_receiver) = channel();
|
let (response_sender, response_receiver) = channel();
|
||||||
let t_responder = streamer::responder(
|
let t_responder = streamer::responder(
|
||||||
"ncp",
|
"ncp",
|
||||||
@ -57,7 +56,7 @@ impl Ncp {
|
|||||||
);
|
);
|
||||||
let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit.clone());
|
let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit.clone());
|
||||||
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
|
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
|
||||||
Ok(Ncp { exit, thread_hdls })
|
Ncp { exit, thread_hdls }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn close(self) -> thread::Result<()> {
|
pub fn close(self) -> thread::Result<()> {
|
||||||
@ -95,7 +94,7 @@ mod tests {
|
|||||||
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
||||||
let c = Arc::new(RwLock::new(crdt));
|
let c = Arc::new(RwLock::new(crdt));
|
||||||
let w = Arc::new(RwLock::new(vec![]));
|
let w = Arc::new(RwLock::new(vec![]));
|
||||||
let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()).unwrap();
|
let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone());
|
||||||
d.close().expect("thread join");
|
d.close().expect("thread join");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,12 +124,13 @@ pub fn blob_receiver(
|
|||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
recycler: BlobRecycler,
|
recycler: BlobRecycler,
|
||||||
s: BlobSender,
|
s: BlobSender,
|
||||||
) -> Result<JoinHandle<()>> {
|
) -> JoinHandle<()> {
|
||||||
//DOCUMENTED SIDE-EFFECT
|
//DOCUMENTED SIDE-EFFECT
|
||||||
//1 second timeout on socket read
|
//1 second timeout on socket read
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
sock.set_read_timeout(Some(timer))?;
|
sock.set_read_timeout(Some(timer))
|
||||||
let t = Builder::new()
|
.expect("set socket timeout");
|
||||||
|
Builder::new()
|
||||||
.name("solana-blob_receiver".to_string())
|
.name("solana-blob_receiver".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
@ -137,8 +138,7 @@ pub fn blob_receiver(
|
|||||||
}
|
}
|
||||||
let _ = recv_blobs(&recycler, &sock, &s);
|
let _ = recv_blobs(&recycler, &sock, &s);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap()
|
||||||
Ok(t)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -366,7 +366,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
|
|||||||
let (node, gossip_socket) = Crdt::spy_node();
|
let (node, gossip_socket) = Crdt::spy_node();
|
||||||
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new")));
|
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new")));
|
||||||
let window = Arc::new(RwLock::new(vec![]));
|
let window = Arc::new(RwLock::new(vec![]));
|
||||||
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()).unwrap();
|
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone());
|
||||||
let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
|
let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
|
||||||
crdt.write().unwrap().insert(&leader_entry_point);
|
crdt.write().unwrap().insert(&leader_entry_point);
|
||||||
|
|
||||||
|
15
src/tvu.rs
15
src/tvu.rs
@ -152,7 +152,6 @@ pub mod tests {
|
|||||||
use mint::Mint;
|
use mint::Mint;
|
||||||
use ncp::Ncp;
|
use ncp::Ncp;
|
||||||
use packet::BlobRecycler;
|
use packet::BlobRecycler;
|
||||||
use result::Result;
|
|
||||||
use service::Service;
|
use service::Service;
|
||||||
use signature::{Keypair, KeypairUtil};
|
use signature::{Keypair, KeypairUtil};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
@ -170,10 +169,10 @@ pub mod tests {
|
|||||||
crdt: Arc<RwLock<Crdt>>,
|
crdt: Arc<RwLock<Crdt>>,
|
||||||
gossip: UdpSocket,
|
gossip: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> Result<(Ncp, SharedWindow)> {
|
) -> (Ncp, SharedWindow) {
|
||||||
let window = window::default_window();
|
let window = window::default_window();
|
||||||
let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit)?;
|
let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit);
|
||||||
Ok((ncp, window))
|
(ncp, window)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test that message sent from leader to target1 and replicated to target2
|
/// Test that message sent from leader to target1 and replicated to target2
|
||||||
@ -191,7 +190,7 @@ pub mod tests {
|
|||||||
crdt_l.set_leader(leader.info.id);
|
crdt_l.set_leader(leader.info.id);
|
||||||
|
|
||||||
let cref_l = Arc::new(RwLock::new(crdt_l));
|
let cref_l = Arc::new(RwLock::new(crdt_l));
|
||||||
let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()).unwrap();
|
let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone());
|
||||||
|
|
||||||
//start crdt2
|
//start crdt2
|
||||||
let mut crdt2 = Crdt::new(target2.info.clone()).expect("Crdt::new");
|
let mut crdt2 = Crdt::new(target2.info.clone()).expect("Crdt::new");
|
||||||
@ -199,7 +198,7 @@ pub mod tests {
|
|||||||
crdt2.set_leader(leader.info.id);
|
crdt2.set_leader(leader.info.id);
|
||||||
let leader_id = leader.info.id;
|
let leader_id = leader.info.id;
|
||||||
let cref2 = Arc::new(RwLock::new(crdt2));
|
let cref2 = Arc::new(RwLock::new(crdt2));
|
||||||
let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone()).unwrap();
|
let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone());
|
||||||
|
|
||||||
// setup some blob services to send blobs into the socket
|
// setup some blob services to send blobs into the socket
|
||||||
// to simulate the source peer and get blobs out of the socket to
|
// to simulate the source peer and get blobs out of the socket to
|
||||||
@ -212,7 +211,7 @@ pub mod tests {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
recv_recycler.clone(),
|
recv_recycler.clone(),
|
||||||
s_reader,
|
s_reader,
|
||||||
).unwrap();
|
);
|
||||||
|
|
||||||
// simulate leader sending messages
|
// simulate leader sending messages
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
@ -233,7 +232,7 @@ pub mod tests {
|
|||||||
crdt1.insert(&leader.info);
|
crdt1.insert(&leader.info);
|
||||||
crdt1.set_leader(leader.info.id);
|
crdt1.set_leader(leader.info.id);
|
||||||
let cref1 = Arc::new(RwLock::new(crdt1));
|
let cref1 = Arc::new(RwLock::new(crdt1));
|
||||||
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap();
|
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone());
|
||||||
|
|
||||||
let tvu = Tvu::new(
|
let tvu = Tvu::new(
|
||||||
target1_keypair,
|
target1_keypair,
|
||||||
|
@ -799,7 +799,7 @@ mod test {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
resp_recycler.clone(),
|
resp_recycler.clone(),
|
||||||
s_reader,
|
s_reader,
|
||||||
).unwrap();
|
);
|
||||||
let (s_window, r_window) = channel();
|
let (s_window, r_window) = channel();
|
||||||
let (s_retransmit, r_retransmit) = channel();
|
let (s_retransmit, r_retransmit) = channel();
|
||||||
let win = default_window();
|
let win = default_window();
|
||||||
@ -869,7 +869,7 @@ mod test {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
resp_recycler.clone(),
|
resp_recycler.clone(),
|
||||||
s_reader,
|
s_reader,
|
||||||
).unwrap();
|
);
|
||||||
let (s_window, _r_window) = channel();
|
let (s_window, _r_window) = channel();
|
||||||
let (s_retransmit, r_retransmit) = channel();
|
let (s_retransmit, r_retransmit) = channel();
|
||||||
let win = default_window();
|
let win = default_window();
|
||||||
@ -932,7 +932,7 @@ mod test {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
resp_recycler.clone(),
|
resp_recycler.clone(),
|
||||||
s_reader,
|
s_reader,
|
||||||
).unwrap();
|
);
|
||||||
let (s_window, _r_window) = channel();
|
let (s_window, _r_window) = channel();
|
||||||
let (s_retransmit, r_retransmit) = channel();
|
let (s_retransmit, r_retransmit) = channel();
|
||||||
let win = default_window();
|
let win = default_window();
|
||||||
|
@ -21,7 +21,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
|
|||||||
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
||||||
let c = Arc::new(RwLock::new(crdt));
|
let c = Arc::new(RwLock::new(crdt));
|
||||||
let w = Arc::new(RwLock::new(vec![]));
|
let w = Arc::new(RwLock::new(vec![]));
|
||||||
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit).unwrap();
|
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit);
|
||||||
(c, d, tn.sockets.replicate)
|
(c, d, tn.sockets.replicate)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
|
|||||||
spy_crdt.set_leader(leader.id);
|
spy_crdt.set_leader(leader.id);
|
||||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||||
let spy_window = default_window();
|
let spy_window = default_window();
|
||||||
let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone()).unwrap();
|
let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone());
|
||||||
//wait for the network to converge
|
//wait for the network to converge
|
||||||
let mut converged = false;
|
let mut converged = false;
|
||||||
let mut rv = vec![];
|
let mut rv = vec![];
|
||||||
|
Reference in New Issue
Block a user