poll_gossip_for_leader() code cleanup

This commit is contained in:
Michael Vines
2019-03-06 15:41:21 -08:00
parent b083e4db48
commit 679a718cbf
2 changed files with 15 additions and 27 deletions

View File

@ -117,8 +117,7 @@ fn make_spy_node(
) -> (GossipService, Arc<RwLock<ClusterInfo>>) { ) -> (GossipService, Arc<RwLock<ClusterInfo>>) {
let keypair = Arc::new(Keypair::new()); let keypair = Arc::new(Keypair::new());
let (node, gossip_socket) = ClusterInfo::spy_node(&keypair.pubkey()); let (node, gossip_socket) = ClusterInfo::spy_node(&keypair.pubkey());
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(node);
let mut cluster_info = ClusterInfo::new(node, keypair);
cluster_info.insert_info(entry_point.clone()); cluster_info.insert_info(entry_point.clone());
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));

View File

@ -390,7 +390,7 @@ impl Drop for ThinClient {
} }
} }
pub fn poll_gossip_for_leader(leader_gossip: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> { pub fn poll_gossip_for_leader(gossip_addr: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (node, gossip_socket) = ClusterInfo::spy_node(); let (node, gossip_socket) = ClusterInfo::spy_node();
let my_addr = gossip_socket.local_addr().unwrap(); let my_addr = gossip_socket.local_addr().unwrap();
@ -398,28 +398,23 @@ pub fn poll_gossip_for_leader(leader_gossip: SocketAddr, timeout: Option<u64>) -
let gossip_service = let gossip_service =
GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit); GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit);
let leader_entry_point = NodeInfo::new_entry_point(&leader_gossip); let entry_point = NodeInfo::new_entry_point(&gossip_addr);
cluster_info cluster_info.write().unwrap().insert_info(entry_point);
.write()
.unwrap()
.insert_info(leader_entry_point);
sleep(Duration::from_millis(100));
let deadline = match timeout { let deadline = match timeout {
Some(timeout) => Duration::new(timeout, 0), Some(timeout) => Duration::new(timeout, 0),
None => Duration::new(std::u64::MAX, 0), None => Duration::new(std::u64::MAX, 0),
}; };
let now = Instant::now(); let now = Instant::now();
// Block until leader's correct contact info is received let result = loop {
let leader; sleep(Duration::from_millis(100));
trace!("polling {:?} for leader from {:?}", gossip_addr, my_addr);
loop { if let Some(leader) = cluster_info.read().unwrap().get_gossip_top_leader() {
trace!("polling {:?} for leader from {:?}", leader_gossip, my_addr); if log_enabled!(log::Level::Trace) {
trace!("{}", cluster_info.read().unwrap().node_info_trace());
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() { }
leader = Some(l.clone()); break Ok(leader.clone());
break;
} }
if log_enabled!(log::Level::Trace) { if log_enabled!(log::Level::Trace) {
@ -427,20 +422,14 @@ pub fn poll_gossip_for_leader(leader_gossip: SocketAddr, timeout: Option<u64>) -
} }
if now.elapsed() > deadline { if now.elapsed() > deadline {
return Err(Error::ClusterInfoError(ClusterInfoError::NoLeader)); break Err(Error::ClusterInfoError(ClusterInfoError::NoLeader));
} }
};
sleep(Duration::from_millis(100));
}
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
gossip_service.join()?; gossip_service.join()?;
if log_enabled!(log::Level::Trace) { result
trace!("{}", cluster_info.read().unwrap().node_info_trace());
}
Ok(leader.unwrap().clone())
} }
pub fn retry_get_balance( pub fn retry_get_balance(