move fullnode trace generation into crdt
This commit is contained in:
@ -18,7 +18,7 @@ use solana::logger;
|
|||||||
use solana::metrics;
|
use solana::metrics;
|
||||||
use solana::ncp::Ncp;
|
use solana::ncp::Ncp;
|
||||||
use solana::service::Service;
|
use solana::service::Service;
|
||||||
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil, Pubkey};
|
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
|
||||||
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
|
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
|
||||||
use solana::timing::{duration_as_ms, duration_as_s};
|
use solana::timing::{duration_as_ms, duration_as_s};
|
||||||
use solana::transaction::Transaction;
|
use solana::transaction::Transaction;
|
||||||
@ -509,31 +509,6 @@ fn main() {
|
|||||||
let mut c_threads = vec![];
|
let mut c_threads = vec![];
|
||||||
let (nodes, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads);
|
let (nodes, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads);
|
||||||
|
|
||||||
let leader_id = if let Some(leader) = &leader {
|
|
||||||
leader.id
|
|
||||||
} else {
|
|
||||||
Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
fn print_gossip_info(nodes: &Vec<NodeInfo>, leader_id: &Pubkey) -> () {
|
|
||||||
println!(" Node gossip address | Node identifier");
|
|
||||||
println!("---------------------+------------------");
|
|
||||||
for node in nodes {
|
|
||||||
println!(
|
|
||||||
" {:20} | {}{}",
|
|
||||||
node.contact_info.ncp.to_string(),
|
|
||||||
node.id,
|
|
||||||
if node.id == *leader_id {
|
|
||||||
" <==== leader"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
println!("Nodes: {}", nodes.len());
|
|
||||||
}
|
|
||||||
print_gossip_info(&nodes, &leader_id);
|
|
||||||
|
|
||||||
if nodes.len() < num_nodes {
|
if nodes.len() < num_nodes {
|
||||||
println!(
|
println!(
|
||||||
"Error: Insufficient nodes discovered. Expecting {} or more",
|
"Error: Insufficient nodes discovered. Expecting {} or more",
|
||||||
@ -734,16 +709,15 @@ fn converge(
|
|||||||
let window = Arc::new(RwLock::new(default_window()));
|
let window = Arc::new(RwLock::new(default_window()));
|
||||||
let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
|
let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
|
||||||
let mut v: Vec<NodeInfo> = vec![];
|
let mut v: Vec<NodeInfo> = vec![];
|
||||||
//wait for the network to converge, 30 seconds should be plenty
|
// wait for the network to converge, 30 seconds should be plenty
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
if spy_ref.read().unwrap().leader_data().is_none() {
|
{
|
||||||
sleep(Duration::new(1, 0));
|
let spy_ref = spy_ref.read().unwrap();
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
println!("{}", spy_ref.node_info_trace());
|
||||||
|
|
||||||
|
if spy_ref.leader_data().is_some() {
|
||||||
v = spy_ref
|
v = spy_ref
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.table
|
.table
|
||||||
.values()
|
.values()
|
||||||
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
|
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
|
||||||
@ -760,6 +734,8 @@ fn converge(
|
|||||||
num_nodes
|
num_nodes
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
sleep(Duration::new(1, 0));
|
sleep(Duration::new(1, 0));
|
||||||
}
|
}
|
||||||
threads.extend(ncp.thread_hdls().into_iter());
|
threads.extend(ncp.thread_hdls().into_iter());
|
||||||
|
36
src/crdt.rs
36
src/crdt.rs
@ -254,6 +254,42 @@ impl Crdt {
|
|||||||
self.table.get(&leader_id)
|
self.table.get(&leader_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn node_info_trace(&self) -> String {
|
||||||
|
let leader_id = self.table[&self.id].leader_id;
|
||||||
|
|
||||||
|
let nodes: Vec<_> = self
|
||||||
|
.table
|
||||||
|
.values()
|
||||||
|
.filter(|n| Self::is_valid_address(&n.contact_info.rpu))
|
||||||
|
.cloned()
|
||||||
|
.map(|node| {
|
||||||
|
format!(
|
||||||
|
" ncp: {:20} | {}{}\n \
|
||||||
|
rpu: {:20} |\n \
|
||||||
|
tpu: {:20} |\n",
|
||||||
|
node.contact_info.ncp.to_string(),
|
||||||
|
node.id,
|
||||||
|
if node.id == leader_id {
|
||||||
|
" <==== leader"
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
},
|
||||||
|
node.contact_info.rpu.to_string(),
|
||||||
|
node.contact_info.tpu.to_string()
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
format!(
|
||||||
|
" NodeInfo.contact_info | Node identifier\n\
|
||||||
|
---------------------------+------------------\n\
|
||||||
|
{}\n \
|
||||||
|
Nodes: {}",
|
||||||
|
nodes.join(""),
|
||||||
|
nodes.len()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn set_leader(&mut self, key: Pubkey) -> () {
|
pub fn set_leader(&mut self, key: Pubkey) -> () {
|
||||||
let mut me = self.my_data().clone();
|
let mut me = self.my_data().clone();
|
||||||
warn!("{}: LEADER_UPDATE TO {} from {}", me.id, key, me.leader_id);
|
warn!("{}: LEADER_UPDATE TO {} from {}", me.id, key, me.leader_id);
|
||||||
|
@ -371,26 +371,6 @@ impl Drop for ThinClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn trace_node_info(nodes: &Vec<NodeInfo>, leader_id: &Pubkey) -> () {
|
|
||||||
trace!(" NodeInfo.contact_info | Node identifier");
|
|
||||||
trace!("---------------------------+------------------");
|
|
||||||
for node in nodes {
|
|
||||||
trace!(
|
|
||||||
" ncp: {:20} | {}{}",
|
|
||||||
node.contact_info.ncp.to_string(),
|
|
||||||
node.id,
|
|
||||||
if node.id == *leader_id {
|
|
||||||
" <==== leader"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
}
|
|
||||||
);
|
|
||||||
trace!(" rpu: {:20} | ", node.contact_info.rpu.to_string(),);
|
|
||||||
trace!(" tpu: {:20} | ", node.contact_info.tpu.to_string(),);
|
|
||||||
}
|
|
||||||
trace!("Nodes: {}", nodes.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
|
pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let (node, gossip_socket) = Crdt::spy_node();
|
let (node, gossip_socket) = Crdt::spy_node();
|
||||||
@ -421,16 +401,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
|
|||||||
}
|
}
|
||||||
|
|
||||||
if log_enabled!(Level::Trace) {
|
if log_enabled!(Level::Trace) {
|
||||||
// print validators/fullnodes
|
trace!("{}", crdt.read().unwrap().node_info_trace());
|
||||||
let nodes: Vec<NodeInfo> = crdt
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.table
|
|
||||||
.values()
|
|
||||||
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
|
|
||||||
.cloned()
|
|
||||||
.collect();
|
|
||||||
trace_node_info(&nodes, &Default::default());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if now.elapsed() > deadline {
|
if now.elapsed() > deadline {
|
||||||
@ -443,22 +414,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
|
|||||||
ncp.close()?;
|
ncp.close()?;
|
||||||
|
|
||||||
if log_enabled!(Level::Trace) {
|
if log_enabled!(Level::Trace) {
|
||||||
let leader_id = if let Some(leader) = &leader {
|
trace!("{}", crdt.read().unwrap().node_info_trace());
|
||||||
leader.id
|
|
||||||
} else {
|
|
||||||
Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
// print validators/fullnodes
|
|
||||||
let nodes: Vec<NodeInfo> = crdt
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.table
|
|
||||||
.values()
|
|
||||||
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
|
|
||||||
.cloned()
|
|
||||||
.collect();
|
|
||||||
trace_node_info(&nodes, &leader_id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(leader.unwrap().clone())
|
Ok(leader.unwrap().clone())
|
||||||
|
Reference in New Issue
Block a user