* Add --restricted-repair-only-mode flag (cherry picked from commit63a67f415e
) * Add --gossip-validator argument (cherry picked from commitdaae638781
) * Documenet how to reduce validator port exposure (cherry picked from commitc8f03c7f6d
) Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
@@ -421,7 +421,7 @@ impl ClusterInfo {
|
||||
gossip.set_shred_version(me.my_shred_version());
|
||||
}
|
||||
me.insert_self();
|
||||
me.push_self(&HashMap::new());
|
||||
me.push_self(&HashMap::new(), None);
|
||||
me
|
||||
}
|
||||
|
||||
@@ -453,13 +453,17 @@ impl ClusterInfo {
|
||||
self.insert_self()
|
||||
}
|
||||
|
||||
fn push_self(&self, stakes: &HashMap<Pubkey, u64>) {
|
||||
fn push_self(
|
||||
&self,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||
) {
|
||||
let now = timestamp();
|
||||
self.my_contact_info.write().unwrap().wallclock = now;
|
||||
let entry =
|
||||
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
|
||||
let mut w_gossip = self.gossip.write().unwrap();
|
||||
w_gossip.refresh_push_active_set(stakes);
|
||||
w_gossip.refresh_push_active_set(stakes, gossip_validators);
|
||||
w_gossip.process_push_message(&self.id(), vec![entry], now);
|
||||
}
|
||||
|
||||
@@ -1363,13 +1367,17 @@ impl ClusterInfo {
|
||||
messages
|
||||
}
|
||||
|
||||
fn new_pull_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
|
||||
fn new_pull_requests(
|
||||
&self,
|
||||
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
) -> Vec<(SocketAddr, Protocol)> {
|
||||
let now = timestamp();
|
||||
let mut pulls: Vec<_> = {
|
||||
let r_gossip =
|
||||
self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
|
||||
r_gossip
|
||||
.new_pull_request(now, stakes, MAX_BLOOM_SIZE)
|
||||
.new_pull_request(now, gossip_validators, stakes, MAX_BLOOM_SIZE)
|
||||
.ok()
|
||||
.into_iter()
|
||||
.filter_map(|(peer, filters, me)| {
|
||||
@@ -1430,27 +1438,32 @@ impl ClusterInfo {
|
||||
// Generate new push and pull requests
|
||||
fn generate_new_gossip_requests(
|
||||
&self,
|
||||
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
generate_pull_requests: bool,
|
||||
) -> Vec<(SocketAddr, Protocol)> {
|
||||
let pulls: Vec<_> = if generate_pull_requests {
|
||||
self.new_pull_requests(stakes)
|
||||
let mut pulls: Vec<_> = if generate_pull_requests {
|
||||
self.new_pull_requests(gossip_validators, stakes)
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
let pushes: Vec<_> = self.new_push_requests();
|
||||
vec![pulls, pushes].into_iter().flatten().collect()
|
||||
let mut pushes: Vec<_> = self.new_push_requests();
|
||||
|
||||
pulls.append(&mut pushes);
|
||||
pulls
|
||||
}
|
||||
|
||||
/// At random pick a node and try to get updated changes from them
|
||||
fn run_gossip(
|
||||
&self,
|
||||
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||
recycler: &PacketsRecycler,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
sender: &PacketSender,
|
||||
generate_pull_requests: bool,
|
||||
) -> Result<()> {
|
||||
let reqs = self.generate_new_gossip_requests(&stakes, generate_pull_requests);
|
||||
let reqs =
|
||||
self.generate_new_gossip_requests(gossip_validators, &stakes, generate_pull_requests);
|
||||
if !reqs.is_empty() {
|
||||
let packets = to_packets_with_destination(recycler.clone(), &reqs);
|
||||
sender.send(packets)?;
|
||||
@@ -1519,6 +1532,7 @@ impl ClusterInfo {
|
||||
self: Arc<Self>,
|
||||
bank_forks: Option<Arc<RwLock<BankForks>>>,
|
||||
sender: PacketSender,
|
||||
gossip_validators: Option<HashSet<Pubkey>>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
let exit = exit.clone();
|
||||
@@ -1549,7 +1563,13 @@ impl ClusterInfo {
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let _ = self.run_gossip(&recycler, &stakes, &sender, generate_pull_requests);
|
||||
let _ = self.run_gossip(
|
||||
gossip_validators.as_ref(),
|
||||
&recycler,
|
||||
&stakes,
|
||||
&sender,
|
||||
generate_pull_requests,
|
||||
);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
@@ -1561,7 +1581,7 @@ impl ClusterInfo {
|
||||
//TODO: possibly tune this parameter
|
||||
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
|
||||
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
|
||||
self.push_self(&stakes);
|
||||
self.push_self(&stakes, gossip_validators.as_ref());
|
||||
last_push = timestamp();
|
||||
}
|
||||
let elapsed = timestamp() - start;
|
||||
@@ -2703,8 +2723,8 @@ mod tests {
|
||||
.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
.refresh_push_active_set(&HashMap::new());
|
||||
let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new(), true);
|
||||
.refresh_push_active_set(&HashMap::new(), None);
|
||||
let reqs = cluster_info.generate_new_gossip_requests(None, &HashMap::new(), true);
|
||||
//assert none of the addrs are invalid.
|
||||
reqs.iter().all(|(addr, _)| {
|
||||
let res = ContactInfo::is_valid_address(addr);
|
||||
@@ -2842,7 +2862,7 @@ mod tests {
|
||||
.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
.refresh_push_active_set(&HashMap::new());
|
||||
.refresh_push_active_set(&HashMap::new(), None);
|
||||
//check that all types of gossip messages are signed correctly
|
||||
let (_, push_messages) = cluster_info
|
||||
.gossip
|
||||
@@ -2859,7 +2879,7 @@ mod tests {
|
||||
.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
.new_pull_request(timestamp(), &HashMap::new(), MAX_BLOOM_SIZE)
|
||||
.new_pull_request(timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE)
|
||||
.ok()
|
||||
.unwrap();
|
||||
assert!(val.verify());
|
||||
@@ -3078,7 +3098,7 @@ mod tests {
|
||||
let entrypoint_pubkey = Pubkey::new_rand();
|
||||
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
|
||||
cluster_info.set_entrypoint(entrypoint.clone());
|
||||
let pulls = cluster_info.new_pull_requests(&HashMap::new());
|
||||
let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
|
||||
assert_eq!(1, pulls.len() as u64);
|
||||
match pulls.get(0) {
|
||||
Some((addr, msg)) => {
|
||||
@@ -3105,7 +3125,7 @@ mod tests {
|
||||
vec![entrypoint_crdsvalue],
|
||||
&timeouts,
|
||||
);
|
||||
let pulls = cluster_info.new_pull_requests(&HashMap::new());
|
||||
let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
|
||||
assert_eq!(1, pulls.len() as u64);
|
||||
assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint));
|
||||
}
|
||||
@@ -3248,7 +3268,7 @@ mod tests {
|
||||
|
||||
// Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
|
||||
// fresh timestamp). There should only be one pull request to `other_node`
|
||||
let pulls = cluster_info.new_pull_requests(&stakes);
|
||||
let pulls = cluster_info.new_pull_requests(None, &stakes);
|
||||
assert_eq!(1, pulls.len() as u64);
|
||||
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
|
||||
|
||||
@@ -3261,14 +3281,14 @@ mod tests {
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.wallclock = 0;
|
||||
let pulls = cluster_info.new_pull_requests(&stakes);
|
||||
let pulls = cluster_info.new_pull_requests(None, &stakes);
|
||||
assert_eq!(2, pulls.len() as u64);
|
||||
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
|
||||
assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
|
||||
|
||||
// Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
|
||||
// only be one pull request to `other_node`
|
||||
let pulls = cluster_info.new_pull_requests(&stakes);
|
||||
let pulls = cluster_info.new_pull_requests(None, &stakes);
|
||||
assert_eq!(1, pulls.len() as u64);
|
||||
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
|
||||
}
|
||||
|
Reference in New Issue
Block a user