Persist gossip contact info

Michael Vines
2020-12-25 22:31:25 -08:00
parent c693ffaa08
commit 9ddd6f08e8
5 changed files with 152 additions and 11 deletions

core/src/cluster_info.rs

@@ -69,8 +69,11 @@ use std::{
cmp::min,
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::{self, Debug},
+    fs::{self, File},
+    io::BufReader,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
ops::{Deref, DerefMut},
+    path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
thread::{sleep, Builder, JoinHandle},
@@ -107,7 +110,8 @@ const MAX_PRUNE_DATA_NODES: usize = 32;
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
-pub const DEFAULT_CONTACT_DEBUG_INTERVAL: u64 = 10_000;
+pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
+pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
/// Minimum serialized size of a Protocol::PullResponse packet.
const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 167;
@@ -297,8 +301,10 @@ pub struct ClusterInfo {
stats: GossipStats,
socket: UdpSocket,
local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
-    contact_debug_interval: u64,
+    contact_debug_interval: u64, // milliseconds, 0 = disabled
+    contact_save_interval: u64,  // milliseconds, 0 = disabled
instance: NodeInstance,
+    contact_info_path: PathBuf,
}
impl Default for ClusterInfo {
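Both new interval fields are plain u64 millisecond counts that use 0 as a disabled sentinel rather than an Option. A minimal illustration of the check the gossip loop performs against them (a hypothetical helper, not part of this commit):

    // Hypothetical helper showing the 0-means-disabled convention used by
    // contact_debug_interval and contact_save_interval.
    fn interval_elapsed(interval_millis: u64, last_millis: u64, now_millis: u64) -> bool {
        interval_millis != 0 && now_millis.saturating_sub(last_millis) > interval_millis
    }
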
@@ -554,8 +560,10 @@ impl ClusterInfo {
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
local_message_pending_push_queue: RwLock::new(vec![]),
-            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
+            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
instance: NodeInstance::new(&mut thread_rng(), id, timestamp()),
+            contact_info_path: PathBuf::default(),
+            contact_save_interval: 0, // disabled
};
{
let mut gossip = me.gossip.write().unwrap();
@@ -591,6 +599,8 @@ impl ClusterInfo {
),
contact_debug_interval: self.contact_debug_interval,
instance: NodeInstance::new(&mut thread_rng(), *new_id, timestamp()),
+            contact_info_path: PathBuf::default(),
+            contact_save_interval: 0, // disabled
}
}
@@ -649,6 +659,117 @@ impl ClusterInfo {
*self.entrypoints.write().unwrap() = entrypoints;
}
+    pub fn save_contact_info(&self) {
+        let nodes = {
+            let gossip = self.gossip.read().unwrap();
+            let entrypoint_gossip_addrs = self
+                .entrypoints
+                .read()
+                .unwrap()
+                .iter()
+                .map(|contact_info| contact_info.gossip)
+                .collect::<HashSet<_>>();
+            gossip
+                .crds
+                .get_nodes()
+                .filter_map(|v| {
+                    // Don't save:
+                    // 1. Our own ContactInfo. No point
+                    // 2. Entrypoint ContactInfo. This avoids adopting the incorrect shred
+                    //    version on restart if the entrypoint shred version changes. Also
+                    //    there's not much point in saving entrypoint ContactInfo since by
+                    //    definition that information is already available
+                    let contact_info = v.value.contact_info().unwrap();
+                    if contact_info.id != self.id()
+                        && !entrypoint_gossip_addrs.contains(&contact_info.gossip)
+                    {
+                        return Some(v.value.clone());
+                    }
+                    None
+                })
+                .collect::<Vec<_>>()
+        };
+
+        if nodes.is_empty() {
+            return;
+        }
+
+        let filename = self.contact_info_path.join("contact-info.bin");
+        let tmp_filename = filename.with_extension("tmp");
+
+        match File::create(&tmp_filename) {
+            Ok(mut file) => {
+                if let Err(err) = bincode::serialize_into(&mut file, &nodes) {
+                    warn!(
+                        "Failed to serialize contact info into {}: {}",
+                        tmp_filename.display(),
+                        err
+                    );
+                    return;
+                }
+            }
+            Err(err) => {
+                warn!("Failed to create {}: {}", tmp_filename.display(), err);
+                return;
+            }
+        }
+
+        match fs::rename(&tmp_filename, &filename) {
+            Ok(()) => {
+                info!(
+                    "Saved contact info for {} nodes into {}",
+                    nodes.len(),
+                    filename.display()
+                );
+            }
+            Err(err) => {
+                warn!(
+                    "Failed to rename {} to {}: {}",
+                    tmp_filename.display(),
+                    filename.display(),
+                    err
+                );
+            }
+        }
+    }
+    pub fn restore_contact_info(&mut self, contact_info_path: &Path, contact_save_interval: u64) {
+        self.contact_info_path = contact_info_path.into();
+        self.contact_save_interval = contact_save_interval;
+
+        let filename = contact_info_path.join("contact-info.bin");
+        if !filename.exists() {
+            return;
+        }
+
+        let nodes: Vec<CrdsValue> = match File::open(&filename) {
+            Ok(file) => {
+                bincode::deserialize_from(&mut BufReader::new(file)).unwrap_or_else(|err| {
+                    warn!("Failed to deserialize {}: {}", filename.display(), err);
+                    vec![]
+                })
+            }
+            Err(err) => {
+                warn!("Failed to open {}: {}", filename.display(), err);
+                vec![]
+            }
+        };
+
+        info!(
+            "Loaded contact info for {} nodes from {}",
+            nodes.len(),
+            filename.display()
+        );
+        let now = timestamp();
+        let mut gossip = self.gossip.write().unwrap();
+        for node in nodes {
+            if let Err(err) = gossip.crds.insert(node, now) {
+                warn!("crds insert failed {:?}", err);
+            }
+        }
+    }
pub fn id(&self) -> Pubkey {
self.id
}
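save_contact_info writes to a sibling .tmp file first and only then renames it over contact-info.bin, so a crash mid-write cannot leave a truncated file in place. A minimal standalone sketch of the same write-then-rename pattern (atomic_save is a hypothetical helper, not part of this commit; it assumes the bincode and serde crates):

    use serde::Serialize;
    use std::{fs, io, path::Path};

    // Hypothetical helper: serialize `data` into a temporary sibling of
    // `path`, then atomically rename it into place.
    fn atomic_save<T: Serialize>(data: &T, path: &Path) -> io::Result<()> {
        let tmp = path.with_extension("tmp");
        let file = fs::File::create(&tmp)?;
        bincode::serialize_into(file, data)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
        // A rename within a single directory is atomic on POSIX filesystems, so
        // readers see either the old contents or the complete new contents.
        fs::rename(&tmp, path)
    }
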
@@ -1805,6 +1926,7 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
+                let mut last_contact_info_save = timestamp();
let mut entrypoints_processed = false;
let recycler = PacketsRecycler::default();
let crds_data = vec![
@@ -1822,7 +1944,7 @@
if self.contact_debug_interval != 0
&& start - last_contact_info_trace > self.contact_debug_interval
{
-                        // Log contact info every 10 seconds
+                        // Log contact info
info!(
"\n{}\n\n{}",
self.contact_info_trace(),
@@ -1831,6 +1953,13 @@
last_contact_info_trace = start;
}
+                    if self.contact_save_interval != 0
+                        && start - last_contact_info_save > self.contact_save_interval
+                    {
+                        self.save_contact_info();
+                        last_contact_info_save = start;
+                    }
let stakes: HashMap<_, _> = match bank_forks {
Some(ref bank_forks) => {
bank_forks.read().unwrap().root_bank().staked_nodes()
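Note that restore_contact_info doubles as the setter for contact_info_path and contact_save_interval, so it has to run once at startup even on a cold start where contact-info.bin does not exist yet; only then can the gossip loop above persist anything. A hedged usage sketch with identifiers from this diff (the surrounding setup is assumed):

    // Assumes `cluster_info` is a freshly built, still-mutable ClusterInfo and
    // `ledger_path: &Path` points at the validator's ledger directory.
    cluster_info.restore_contact_info(ledger_path, DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS);
    // The gossip loop then calls this every contact_save_interval milliseconds,
    // rewriting <ledger>/contact-info.bin in place:
    cluster_info.save_contact_info();
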

core/src/validator.rs

@@ -3,7 +3,10 @@
use crate::{
broadcast_stage::BroadcastStageType,
cache_block_time_service::{CacheBlockTimeSender, CacheBlockTimeService},
-    cluster_info::{ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL},
+    cluster_info::{
+        ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
+        DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
+    },
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower},
@@ -107,6 +110,7 @@ pub struct ValidatorConfig {
pub require_tower: bool,
pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
pub contact_debug_interval: u64,
+    pub contact_save_interval: u64,
pub bpf_jit: bool,
pub send_transaction_retry_ms: u64,
pub send_transaction_leader_forward_count: u64,
@@ -147,7 +151,8 @@ impl Default for ValidatorConfig {
cuda: false,
require_tower: false,
debug_keys: None,
-            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
+            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
+            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
bpf_jit: false,
send_transaction_retry_ms: 2000,
send_transaction_leader_forward_count: 2,
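Since the default now enables saving every 60 seconds, an operator or test that wants the old behavior back can lean on the 0-means-disabled convention. A hypothetical override using struct-update syntax on the Default impl above (not part of this commit):

    // Hypothetical config tweak: keep all defaults but never write contact-info.bin.
    let config = ValidatorConfig {
        contact_save_interval: 0, // 0 disables periodic saves
        ..ValidatorConfig::default()
    };
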
@@ -365,6 +370,8 @@ impl Validator {
let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone());
cluster_info.set_contact_debug_interval(config.contact_debug_interval);
+        cluster_info.set_entrypoints(cluster_entrypoints);
+        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
let cluster_info = Arc::new(cluster_info);
let mut block_commitment_cache = BlockCommitmentCache::default();
block_commitment_cache.initialize_slots(bank.slot());
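restore_contact_info takes &mut self, so it must run before the ClusterInfo is frozen behind Arc::new; moving set_entrypoints up next to it (its old call site is removed below) also guarantees the entrypoints are installed well before the first save_contact_info call needs them for filtering. A condensed sketch of the resulting setup order, with signatures as shown in this diff:

    let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone());
    cluster_info.set_contact_debug_interval(config.contact_debug_interval);
    cluster_info.set_entrypoints(cluster_entrypoints);
    cluster_info.restore_contact_info(ledger_path, config.contact_save_interval); // needs &mut self
    let cluster_info = Arc::new(cluster_info); // immutable from here on
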
@@ -495,8 +502,6 @@
config.gossip_validators.clone(),
&exit,
);
-        cluster_info.set_entrypoints(cluster_entrypoints);
let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
let serve_repair_service = ServeRepairService::new(
&serve_repair,