Multiple entrypoint support

This commit is contained in:
Michael Vines
2020-12-18 10:54:48 -08:00
parent 3373082ffa
commit ace360ade2
7 changed files with 250 additions and 185 deletions

View File

@ -36,6 +36,7 @@ use solana_sdk::sanitize::{Sanitize, SanitizeError};
use bincode::{serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
use rand::thread_rng;
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use serde::ser::Serialize;
@ -291,8 +292,8 @@ pub struct ClusterInfo {
pub gossip: RwLock<CrdsGossip>,
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
pub(crate) keypair: Arc<Keypair>,
/// The network entrypoint
entrypoint: RwLock<Option<ContactInfo>>,
/// Network entrypoints
entrypoints: RwLock<Vec<ContactInfo>>,
outbound_budget: DataBudget,
my_contact_info: RwLock<ContactInfo>,
ping_cache: RwLock<PingCache>,
@ -546,7 +547,7 @@ impl ClusterInfo {
let me = Self {
gossip: RwLock::new(CrdsGossip::default()),
keypair,
entrypoint: RwLock::new(None),
entrypoints: RwLock::new(vec![]),
outbound_budget: DataBudget::default(),
my_contact_info: RwLock::new(contact_info),
ping_cache: RwLock::new(PingCache::new(
@ -558,7 +559,7 @@ impl ClusterInfo {
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
local_message_pending_push_queue: RwLock::new(vec![]),
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
instance: NodeInstance::new(&mut rand::thread_rng(), id, timestamp()),
instance: NodeInstance::new(&mut thread_rng(), id, timestamp()),
};
{
let mut gossip = me.gossip.write().unwrap();
@ -579,7 +580,7 @@ impl ClusterInfo {
ClusterInfo {
gossip: RwLock::new(gossip),
keypair: self.keypair.clone(),
entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()),
outbound_budget: self.outbound_budget.clone_non_atomic(),
my_contact_info: RwLock::new(my_contact_info),
ping_cache: RwLock::new(self.ping_cache.read().unwrap().mock_clone()),
@ -593,7 +594,7 @@ impl ClusterInfo {
.clone(),
),
contact_debug_interval: self.contact_debug_interval,
instance: NodeInstance::new(&mut rand::thread_rng(), *new_id, timestamp()),
instance: NodeInstance::new(&mut thread_rng(), *new_id, timestamp()),
}
}
@ -645,7 +646,11 @@ impl ClusterInfo {
}
pub fn set_entrypoint(&self, entrypoint: ContactInfo) {
*self.entrypoint.write().unwrap() = Some(entrypoint)
self.set_entrypoints(vec![entrypoint]);
}
pub fn set_entrypoints(&self, entrypoints: Vec<ContactInfo>) {
*self.entrypoints.write().unwrap() = entrypoints;
}
pub fn id(&self) -> Pubkey {
@ -1501,52 +1506,49 @@ impl ClusterInfo {
thread_pool: &ThreadPool,
pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>,
) {
let pull_from_entrypoint = {
let mut w_entrypoint = self.entrypoint.write().unwrap();
if let Some(ref mut entrypoint) = &mut *w_entrypoint {
let entrypoint_id_and_gossip = {
let mut entrypoints = self.entrypoints.write().unwrap();
if entrypoints.is_empty() {
None
} else {
let i = thread_rng().gen_range(0, entrypoints.len());
let entrypoint = &mut entrypoints[i];
if pulls.is_empty() {
// Nobody else to pull from, try the entrypoint
true
// Nobody else to pull from, try an entrypoint
Some((entrypoint.id, entrypoint.gossip))
} else {
let now = timestamp();
// Only consider pulling from the entrypoint periodically to avoid spamming it
if timestamp() - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
false
if now - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
None
} else {
entrypoint.wallclock = now;
let found_entrypoint = self
if self
.time_gossip_read_lock("entrypoint", &self.stats.entrypoint)
.crds
.get_nodes_contact_info()
.any(|node| node.gossip == entrypoint.gossip);
!found_entrypoint
.any(|node| node.gossip == entrypoint.gossip)
{
None // Found the entrypoint, no need to pull from it
} else {
Some((entrypoint.id, entrypoint.gossip))
}
}
}
} else {
false
}
};
if pull_from_entrypoint {
let id_and_gossip = {
self.entrypoint
.read()
.unwrap()
.as_ref()
.map(|e| (e.id, e.gossip))
};
if let Some((id, gossip)) = id_and_gossip {
let r_gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2);
let self_info = r_gossip
.crds
.lookup(&CrdsValueLabel::ContactInfo(self.id()))
.unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
r_gossip
.pull
.build_crds_filters(thread_pool, &r_gossip.crds, MAX_BLOOM_SIZE)
.into_iter()
.for_each(|filter| pulls.push((id, filter, gossip, self_info.clone())));
}
if let Some((id, gossip)) = entrypoint_id_and_gossip {
let r_gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2);
let self_info = r_gossip
.crds
.lookup(&CrdsValueLabel::ContactInfo(self.id()))
.unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
r_gossip
.pull
.build_crds_filters(thread_pool, &r_gossip.crds, MAX_BLOOM_SIZE)
.into_iter()
.for_each(|filter| pulls.push((id, filter, gossip, self_info.clone())));
}
}
@ -1731,46 +1733,51 @@ impl ClusterInfo {
Ok(())
}
fn process_entrypoint(&self, entrypoint_processed: &mut bool) {
if *entrypoint_processed {
fn process_entrypoints(&self, entrypoints_processed: &mut bool) {
if *entrypoints_processed {
return;
}
let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful it should exist in the CRDS table
let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);
if let Some(entrypoint) = entrypoint {
// Adopt the entrypoint's `shred_version` if ours is unset
if self.my_shred_version() == 0 {
if entrypoint.shred_version == 0 {
warn!("Unable to adopt entrypoint shred version of 0");
} else {
info!(
"Setting shred version to {:?} from entrypoint {:?}",
entrypoint.shred_version, entrypoint.id
);
self.my_contact_info.write().unwrap().shred_version =
entrypoint.shred_version;
self.gossip
.write()
.unwrap()
.set_shred_version(entrypoint.shred_version);
self.insert_self();
*entrypoint_processed = true;
}
} else {
*entrypoint_processed = true;
}
// Update the entrypoint's id so future entrypoint pulls correctly reference it
*self.entrypoint.write().unwrap() = Some(entrypoint);
}
} else {
let mut entrypoints = self.entrypoints.write().unwrap();
if entrypoints.is_empty() {
// No entrypoint specified. Nothing more to process
*entrypoint_processed = true;
*entrypoints_processed = true;
return;
}
for entrypoint in entrypoints.iter_mut() {
if entrypoint.id == Pubkey::default() {
// If a pull from the entrypoint was successful it should exist in the CRDS table
if let Some(entrypoint_from_gossip) =
self.lookup_contact_info_by_gossip_addr(&entrypoint.gossip)
{
// Update the entrypoint's id so future entrypoint pulls correctly reference it
*entrypoint = entrypoint_from_gossip;
}
}
}
// Adopt an entrypoint's `shred_version` if ours is unset
if self.my_shred_version() == 0 {
if let Some(entrypoint) = entrypoints
.iter()
.find(|entrypoint| entrypoint.shred_version != 0)
{
info!(
"Setting shred version to {:?} from entrypoint {:?}",
entrypoint.shred_version, entrypoint.id
);
self.my_contact_info.write().unwrap().shred_version = entrypoint.shred_version;
self.gossip
.write()
.unwrap()
.set_shred_version(entrypoint.shred_version);
}
}
*entrypoints_processed = self.my_shred_version() != 0
&& entrypoints
.iter()
.all(|entrypoint| entrypoint.id != Pubkey::default());
}
fn handle_purge(
@ -1816,7 +1823,7 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
let mut entrypoint_processed = false;
let mut entrypoints_processed = false;
let recycler = PacketsRecycler::default();
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
@ -1863,7 +1870,7 @@ impl ClusterInfo {
self.handle_purge(&thread_pool, &bank_forks, &stakes);
self.process_entrypoint(&mut entrypoint_processed);
self.process_entrypoints(&mut entrypoints_processed);
//TODO: possibly tune this parameter
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
@ -3920,7 +3927,7 @@ mod tests {
);
let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
assert_eq!(1, pulls.len() as u64);
assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint));
assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]);
}
#[test]
@ -4106,13 +4113,7 @@ mod tests {
// Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should
// now be two pull requests
cluster_info
.entrypoint
.write()
.unwrap()
.as_mut()
.unwrap()
.wallclock = 0;
cluster_info.entrypoints.write().unwrap()[0].wallclock = 0;
let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
assert_eq!(2, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
@ -4242,46 +4243,75 @@ mod tests {
));
assert_eq!(cluster_info.my_shred_version(), 0);
// Simulating starting up with default entrypoint, no known id, only a gossip
// Simulating starting up with two entrypoints, no known id, only a gossip
// address
let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint.gossip = entrypoint_gossip_addr;
assert_eq!(entrypoint.shred_version, 0);
cluster_info.set_entrypoint(entrypoint);
let entrypoint1_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint1 = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint1.gossip = entrypoint1_gossip_addr;
assert_eq!(entrypoint1.shred_version, 0);
// Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of
let entrypoint2_gossip_addr = socketaddr!("127.0.0.2:5678");
let mut entrypoint2 = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint2.gossip = entrypoint2_gossip_addr;
assert_eq!(entrypoint2.shred_version, 0);
cluster_info.set_entrypoints(vec![entrypoint1, entrypoint2]);
// Simulate getting entrypoint ContactInfo from gossip with an entrypoint1 shred version of
// 0
let mut gossiped_entrypoint_info =
let mut gossiped_entrypoint1_info =
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
gossiped_entrypoint_info.gossip = entrypoint_gossip_addr;
gossiped_entrypoint_info.shred_version = 0;
cluster_info.insert_info(gossiped_entrypoint_info.clone());
gossiped_entrypoint1_info.gossip = entrypoint1_gossip_addr;
gossiped_entrypoint1_info.shred_version = 0;
cluster_info.insert_info(gossiped_entrypoint1_info.clone());
assert!(!cluster_info
.entrypoints
.read()
.unwrap()
.iter()
.any(|entrypoint| *entrypoint == gossiped_entrypoint1_info));
// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert!(!entrypoint_processed); // <--- entrypoint processing incomplete because shred adoption still pending
let mut entrypoints_processed = false;
ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 2);
assert!(cluster_info
.entrypoints
.read()
.unwrap()
.iter()
.any(|entrypoint| *entrypoint == gossiped_entrypoint1_info));
assert!(!entrypoints_processed); // <--- entrypoint processing incomplete because shred adoption still pending
assert_eq!(cluster_info.my_shred_version(), 0); // <-- shred version still 0
// Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of
// Simulate getting entrypoint ContactInfo from gossip with an entrypoint2 shred version of
// !0
gossiped_entrypoint_info.shred_version = 1;
cluster_info.insert_info(gossiped_entrypoint_info.clone());
let mut gossiped_entrypoint2_info =
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
gossiped_entrypoint2_info.gossip = entrypoint2_gossip_addr;
gossiped_entrypoint2_info.shred_version = 1;
cluster_info.insert_info(gossiped_entrypoint2_info.clone());
assert!(!cluster_info
.entrypoints
.read()
.unwrap()
.iter()
.any(|entrypoint| *entrypoint == gossiped_entrypoint2_info));
// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert!(entrypoint_processed);
assert_eq!(cluster_info.my_shred_version(), 1); // <-- shred version now adopted from entrypoint
error!("Adopt the entrypoint's gossiped contact info and verify");
let mut entrypoints_processed = false;
ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 2);
assert!(cluster_info
.entrypoints
.read()
.unwrap()
.iter()
.any(|entrypoint| *entrypoint == gossiped_entrypoint2_info));
assert!(entrypoints_processed);
assert_eq!(cluster_info.my_shred_version(), 1); // <-- shred version now adopted from entrypoint2
}
#[test]
@ -4314,13 +4344,14 @@ mod tests {
cluster_info.insert_info(gossiped_entrypoint_info.clone());
// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
let mut entrypoints_processed = false;
ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 1);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
cluster_info.entrypoints.read().unwrap()[0],
gossiped_entrypoint_info
);
assert!(entrypoint_processed);
assert!(entrypoints_processed);
assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version
}
}

View File

@ -356,7 +356,7 @@ impl TestValidator {
&ledger_path,
&validator_vote_account.pubkey(),
vec![Arc::new(validator_vote_account)],
None,
vec![],
&validator_config,
));

View File

@ -222,7 +222,7 @@ impl Validator {
ledger_path: &Path,
vote_account: &Pubkey,
mut authorized_voter_keypairs: Vec<Arc<Keypair>>,
cluster_entrypoint: Option<&ContactInfo>,
cluster_entrypoints: Vec<ContactInfo>,
config: &ValidatorConfig,
) -> Self {
let id = identity_keypair.pubkey();
@ -241,7 +241,9 @@ impl Validator {
}
report_target_features();
info!("entrypoint: {:?}", cluster_entrypoint);
for cluster_entrypoint in &cluster_entrypoints {
info!("entrypoint: {:?}", cluster_entrypoint);
}
if solana_perf::perf_libs::api().is_some() {
info!("Initializing sigverify, this could take a while...");
@ -492,6 +494,7 @@ impl Validator {
config.gossip_validators.clone(),
&exit,
);
cluster_info.set_entrypoints(cluster_entrypoints);
let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
let serve_repair_service = ServeRepairService::new(
@ -501,12 +504,6 @@ impl Validator {
&exit,
);
// Insert the entrypoint info, should only be None if this node
// is the bootstrap validator
if let Some(cluster_entrypoint) = cluster_entrypoint {
cluster_info.set_entrypoint(cluster_entrypoint.clone());
}
let (snapshot_packager_service, snapshot_config_and_package_sender) =
if let Some(snapshot_config) = config.snapshot_config.clone() {
if is_snapshot_config_invalid(
@ -1287,7 +1284,7 @@ mod tests {
&validator_ledger_path,
&voting_keypair.pubkey(),
vec![voting_keypair.clone()],
Some(&leader_node.info),
vec![leader_node.info],
&config,
);
validator.close();
@ -1357,7 +1354,7 @@ mod tests {
&validator_ledger_path,
&vote_account_keypair.pubkey(),
vec![Arc::new(vote_account_keypair)],
Some(&leader_node.info),
vec![leader_node.info.clone()],
&config,
)
})