From 45c368d67081be4b5b4c2b1c20d2955a01a69a18 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 15 Sep 2020 04:44:04 +0000 Subject: [PATCH] Add new validator options for running in more restrictive environments (bp #12191) (#12217) * Add --restricted-repair-only-mode flag (cherry picked from commit 63a67f415ef3c5bb243b8b91d3c9b054dcdc2227) # Conflicts: # validator/src/main.rs * Add --gossip-validator argument (cherry picked from commit daae638781b3fe3b90fef52b4ac355da8c10312d) # Conflicts: # core/src/crds_gossip_pull.rs # core/src/crds_gossip_push.rs # core/src/gossip_service.rs # validator/src/main.rs * Documenet how to reduce validator port exposure (cherry picked from commit c8f03c7f6de9c334501f2e5541c1a2e501a8f59d) * Resolve conflicts Co-authored-by: Michael Vines --- core/src/cluster_info.rs | 62 ++++++++++------ core/src/crds_gossip.rs | 11 ++- core/src/crds_gossip_pull.rs | 88 +++++++++++++++++++++-- core/src/crds_gossip_push.rs | 92 +++++++++++++++++++++--- core/src/gossip_service.rs | 31 +++++--- core/src/validator.rs | 3 + core/tests/crds_gossip.rs | 8 +-- core/tests/gossip.rs | 4 +- docs/src/integrations/exchange.md | 43 +++++++++-- validator/src/main.rs | 115 ++++++++++++++++++++++-------- 10 files changed, 368 insertions(+), 89 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 7d6c2e8b70..0020bd7201 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -418,7 +418,7 @@ impl ClusterInfo { gossip.set_shred_version(me.my_shred_version()); } me.insert_self(); - me.push_self(&HashMap::new()); + me.push_self(&HashMap::new(), None); me } @@ -450,13 +450,17 @@ impl ClusterInfo { self.insert_self() } - fn push_self(&self, stakes: &HashMap) { + fn push_self( + &self, + stakes: &HashMap, + gossip_validators: Option<&HashSet>, + ) { let now = timestamp(); self.my_contact_info.write().unwrap().wallclock = now; let entry = CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); let mut w_gossip = self.gossip.write().unwrap(); - w_gossip.refresh_push_active_set(stakes); + w_gossip.refresh_push_active_set(stakes, gossip_validators); w_gossip.process_push_message(&self.id(), vec![entry], now); } @@ -1344,13 +1348,17 @@ impl ClusterInfo { messages } - fn new_pull_requests(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { + fn new_pull_requests( + &self, + gossip_validators: Option<&HashSet>, + stakes: &HashMap, + ) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let mut pulls: Vec<_> = { let r_gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); r_gossip - .new_pull_request(now, stakes, MAX_BLOOM_SIZE) + .new_pull_request(now, gossip_validators, stakes, MAX_BLOOM_SIZE) .ok() .into_iter() .filter_map(|(peer, filters, me)| { @@ -1411,27 +1419,32 @@ impl ClusterInfo { // Generate new push and pull requests fn generate_new_gossip_requests( &self, + gossip_validators: Option<&HashSet>, stakes: &HashMap, generate_pull_requests: bool, ) -> Vec<(SocketAddr, Protocol)> { - let pulls: Vec<_> = if generate_pull_requests { - self.new_pull_requests(stakes) + let mut pulls: Vec<_> = if generate_pull_requests { + self.new_pull_requests(gossip_validators, stakes) } else { vec![] }; - let pushes: Vec<_> = self.new_push_requests(); - vec![pulls, pushes].into_iter().flatten().collect() + let mut pushes: Vec<_> = self.new_push_requests(); + + pulls.append(&mut pushes); + pulls } /// At random pick a node and try to get updated 
changes from them fn run_gossip( &self, + gossip_validators: Option<&HashSet>, recycler: &PacketsRecycler, stakes: &HashMap, sender: &PacketSender, generate_pull_requests: bool, ) -> Result<()> { - let reqs = self.generate_new_gossip_requests(&stakes, generate_pull_requests); + let reqs = + self.generate_new_gossip_requests(gossip_validators, &stakes, generate_pull_requests); if !reqs.is_empty() { let packets = to_packets_with_destination(recycler.clone(), &reqs); sender.send(packets)?; @@ -1497,6 +1510,7 @@ impl ClusterInfo { self: Arc, bank_forks: Option>>, sender: PacketSender, + gossip_validators: Option>, exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); @@ -1527,7 +1541,13 @@ impl ClusterInfo { None => HashMap::new(), }; - let _ = self.run_gossip(&recycler, &stakes, &sender, generate_pull_requests); + let _ = self.run_gossip( + gossip_validators.as_ref(), + &recycler, + &stakes, + &sender, + generate_pull_requests, + ); if exit.load(Ordering::Relaxed) { return; } @@ -1539,7 +1559,7 @@ impl ClusterInfo { //TODO: possibly tune this parameter //we saw a deadlock passing an self.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - self.push_self(&stakes); + self.push_self(&stakes, gossip_validators.as_ref()); last_push = timestamp(); } let elapsed = timestamp() - start; @@ -2666,8 +2686,8 @@ mod tests { .gossip .write() .unwrap() - .refresh_push_active_set(&HashMap::new()); - let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new(), true); + .refresh_push_active_set(&HashMap::new(), None); + let reqs = cluster_info.generate_new_gossip_requests(None, &HashMap::new(), true); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { let res = ContactInfo::is_valid_address(addr); @@ -2805,7 +2825,7 @@ mod tests { .gossip .write() .unwrap() - .refresh_push_active_set(&HashMap::new()); + .refresh_push_active_set(&HashMap::new(), None); //check that all types of gossip messages are signed correctly let (_, push_messages) = cluster_info .gossip @@ -2822,7 +2842,7 @@ mod tests { .gossip .write() .unwrap() - .new_pull_request(timestamp(), &HashMap::new(), MAX_BLOOM_SIZE) + .new_pull_request(timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE) .ok() .unwrap(); assert!(val.verify()); @@ -3041,7 +3061,7 @@ mod tests { let entrypoint_pubkey = Pubkey::new_rand(); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); - let pulls = cluster_info.new_pull_requests(&HashMap::new()); + let pulls = cluster_info.new_pull_requests(None, &HashMap::new()); assert_eq!(1, pulls.len() as u64); match pulls.get(0) { Some((addr, msg)) => { @@ -3068,7 +3088,7 @@ mod tests { vec![entrypoint_crdsvalue], &timeouts, ); - let pulls = cluster_info.new_pull_requests(&HashMap::new()); + let pulls = cluster_info.new_pull_requests(None, &HashMap::new()); assert_eq!(1, pulls.len() as u64); assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint)); } @@ -3211,7 +3231,7 @@ mod tests { // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a // fresh timestamp). 
There should only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&stakes); + let pulls = cluster_info.new_pull_requests(None, &stakes); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); @@ -3224,14 +3244,14 @@ mod tests { .as_mut() .unwrap() .wallclock = 0; - let pulls = cluster_info.new_pull_requests(&stakes); + let pulls = cluster_info.new_pull_requests(None, &stakes); assert_eq!(2, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip); // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should // only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&stakes); + let pulls = cluster_info.new_pull_requests(None, &stakes); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 53520930f2..c207611770 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -115,10 +115,15 @@ impl CrdsGossip { /// refresh the push active set /// * ratio - number of actives to rotate - pub fn refresh_push_active_set(&mut self, stakes: &HashMap) { + pub fn refresh_push_active_set( + &mut self, + stakes: &HashMap, + gossip_validators: Option<&HashSet>, + ) { self.push.refresh_push_active_set( &self.crds, stakes, + gossip_validators, &self.id, self.shred_version, self.pull.pull_request_time.len(), @@ -130,6 +135,7 @@ impl CrdsGossip { pub fn new_pull_request( &self, now: u64, + gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { @@ -138,6 +144,7 @@ impl CrdsGossip { &self.id, self.shred_version, now, + gossip_validators, stakes, bloom_size, ) @@ -271,7 +278,7 @@ mod test { 0, ) .unwrap(); - crds_gossip.refresh_push_active_set(&HashMap::new()); + crds_gossip.refresh_push_active_set(&HashMap::new(), None); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 1a5c65b6d4..c6c2642572 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -163,10 +163,18 @@ impl CrdsGossipPull { self_id: &Pubkey, self_shred_version: u16, now: u64, + gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { - let options = self.pull_options(crds, &self_id, self_shred_version, now, stakes); + let options = self.pull_options( + crds, + &self_id, + self_shred_version, + now, + gossip_validators, + stakes, + ); if options.is_empty() { return Err(CrdsGossipError::NoPeers); } @@ -185,6 +193,7 @@ impl CrdsGossipPull { self_id: &Pubkey, self_shred_version: u16, now: u64, + gossip_validators: Option<&HashSet>, stakes: &HashMap, ) -> Vec<(f32, &'a ContactInfo)> { crds.table @@ -196,6 +205,8 @@ impl CrdsGossipPull { && (self_shred_version == 0 || v.shred_version == 0 || self_shred_version == v.shred_version) + && gossip_validators + .map_or(true, |gossip_validators| gossip_validators.contains(&v.id)) }) .map(|item| { let max_weight = f32::from(u16::max_value()) - 1.0; @@ -545,7 +556,7 @@ mod test { stakes.insert(id, i * 100); } let now = 1024; - let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, &stakes); + let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes); 
assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is also the heaviest weighted. @@ -595,7 +606,7 @@ mod test { // shred version 123 should ignore 456 nodes let options = node - .pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes) + .pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); @@ -605,7 +616,7 @@ mod test { // spy nodes will see all let options = node - .pull_options(&crds, &spy.label().pubkey(), 0, 0, &stakes) + .pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); @@ -615,6 +626,65 @@ mod test { assert!(options.contains(&node_456.pubkey())); } + #[test] + fn test_pulls_only_from_allowed() { + let mut crds = Crds::default(); + let stakes = HashMap::new(); + let node = CrdsGossipPull::default(); + let gossip = socketaddr!("127.0.0.1:1234"); + + let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + + crds.insert(me.clone(), 0).unwrap(); + crds.insert(node_123.clone(), 0).unwrap(); + + // Empty gossip_validators -- will pull from nobody + let mut gossip_validators = HashSet::new(); + let options = node.pull_options( + &crds, + &me.label().pubkey(), + 0, + 0, + Some(&gossip_validators), + &stakes, + ); + assert!(options.is_empty()); + + // Unknown pubkey in gossip_validators -- will pull from nobody + gossip_validators.insert(Pubkey::new_rand()); + let options = node.pull_options( + &crds, + &me.label().pubkey(), + 0, + 0, + Some(&gossip_validators), + &stakes, + ); + assert!(options.is_empty()); + + // node_123 pubkey in gossip_validators -- will pull from it + gossip_validators.insert(node_123.pubkey()); + let options = node.pull_options( + &crds, + &me.label().pubkey(), + 0, + 0, + Some(&gossip_validators), + &stakes, + ); + assert_eq!(options.len(), 1); + assert_eq!(options[0].1.id, node_123.pubkey()); + } + #[test] fn test_new_pull_request() { let mut crds = Crds::default(); @@ -625,13 +695,13 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); @@ -640,7 +710,7 @@ mod test { 0, ))); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE); + let req = node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -677,6 +747,7 @@ mod test { &node_pubkey, 0, u64::max_value(), + None, &HashMap::new(), PACKET_DATA_SIZE, ); @@ -706,6 +777,7 @@ mod test { &node_pubkey, 0, 0, + None, &HashMap::new(), PACKET_DATA_SIZE, ); @@ -766,6 +838,7 @@ mod test { &node_pubkey, 0, 0, + None, &HashMap::new(), PACKET_DATA_SIZE, ); @@ -840,6 +913,7 @@ mod test { &node_pubkey, 0, 
0, + None, &HashMap::new(), PACKET_DATA_SIZE, ); diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 3efc3afd08..0e9cc39a8b 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -280,6 +280,7 @@ impl CrdsGossipPush { &mut self, crds: &Crds, stakes: &HashMap, + gossip_validators: Option<&HashSet>, self_id: &Pubkey, self_shred_version: u16, network_size: usize, @@ -288,7 +289,13 @@ impl CrdsGossipPush { let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let mut new_items = HashMap::new(); - let options: Vec<_> = self.push_options(crds, &self_id, self_shred_version, stakes); + let options: Vec<_> = self.push_options( + crds, + &self_id, + self_shred_version, + stakes, + gossip_validators, + ); if options.is_empty() { return; } @@ -336,6 +343,7 @@ impl CrdsGossipPush { self_id: &Pubkey, self_shred_version: u16, stakes: &HashMap, + gossip_validators: Option<&HashSet>, ) -> Vec<(f32, &'a ContactInfo)> { crds.table .values() @@ -347,6 +355,9 @@ impl CrdsGossipPush { && (self_shred_version == 0 || info.shred_version == 0 || self_shred_version == info.shred_version) + && gossip_validators.map_or(true, |gossip_validators| { + gossip_validators.contains(&info.id) + }) }) .map(|(info, value)| { let max_weight = f32::from(u16::max_value()) - 1.0; @@ -554,7 +565,7 @@ mod test { ))); assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -564,7 +575,7 @@ mod test { assert!(push.active_set.get(&value2.label().pubkey()).is_none()); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); for _ in 0..30 { - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { break; } @@ -577,7 +588,7 @@ mod test { )); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); } - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert_eq!(push.active_set.len(), push.num_active); } #[test] @@ -595,7 +606,7 @@ mod test { crds.insert(peer.clone(), time).unwrap(); stakes.insert(id, i * 100); } - let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes); + let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None); assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is also the heaviest weighted. 
@@ -645,7 +656,7 @@ mod test { // shred version 123 should ignore 456 nodes let options = node - .push_options(&crds, &me.label().pubkey(), 123, &stakes) + .push_options(&crds, &me.label().pubkey(), 123, &stakes, None) .iter() .map(|(_, c)| c.id) .collect::>(); @@ -655,7 +666,7 @@ mod test { // spy nodes will see all let options = node - .push_options(&crds, &spy.label().pubkey(), 0, &stakes) + .push_options(&crds, &spy.label().pubkey(), 0, &stakes, None) .iter() .map(|(_, c)| c.id) .collect::>(); @@ -664,6 +675,65 @@ mod test { assert!(options.contains(&node_123.pubkey())); assert!(options.contains(&node_456.pubkey())); } + + #[test] + fn test_pushes_only_to_allowed() { + let mut crds = Crds::default(); + let stakes = HashMap::new(); + let node = CrdsGossipPush::default(); + let gossip = socketaddr!("127.0.0.1:1234"); + + let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + + crds.insert(me.clone(), 0).unwrap(); + crds.insert(node_123.clone(), 0).unwrap(); + + // Unknown pubkey in gossip_validators -- will push to nobody + let mut gossip_validators = HashSet::new(); + let options = node.push_options( + &crds, + &me.label().pubkey(), + 0, + &stakes, + Some(&gossip_validators), + ); + + assert!(options.is_empty()); + + // Unknown pubkey in gossip_validators -- will push to nobody + gossip_validators.insert(Pubkey::new_rand()); + let options = node.push_options( + &crds, + &me.label().pubkey(), + 0, + &stakes, + Some(&gossip_validators), + ); + assert!(options.is_empty()); + + // node_123 pubkey in gossip_validators -- will push to it + gossip_validators.insert(node_123.pubkey()); + let options = node.push_options( + &crds, + &me.label().pubkey(), + 0, + &stakes, + Some(&gossip_validators), + ); + + assert_eq!(options.len(), 1); + assert_eq!(options[0].1.id, node_123.pubkey()); + } + #[test] fn test_new_push_messages() { let mut crds = Crds::default(); @@ -673,7 +743,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -710,7 +780,7 @@ mod test { push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0), Ok(None) ); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); // push 3's contact info to 1 and 2 and 3 let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -733,7 +803,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -760,7 +830,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer, 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); 
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); ci.wallclock = 1; diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index c80cdb5bc1..c451c8ea5e 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -9,12 +9,17 @@ use solana_perf::recycler::Recycler; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signer}; use solana_streamer::streamer; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, RwLock}; -use std::thread::{self, sleep, JoinHandle}; -use std::time::{Duration, Instant}; +use std::{ + collections::HashSet, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + {Arc, RwLock}, + }, + thread::{self, sleep, JoinHandle}, + time::{Duration, Instant}, +}; pub struct GossipService { thread_hdls: Vec>, @@ -25,6 +30,7 @@ impl GossipService { cluster_info: &Arc, bank_forks: Option>>, gossip_socket: UdpSocket, + gossip_validators: Option>, exit: &Arc, ) -> Self { let (request_sender, request_receiver) = channel(); @@ -50,7 +56,13 @@ impl GossipService { response_sender.clone(), exit, ); - let t_gossip = ClusterInfo::gossip(cluster_info.clone(), bank_forks, response_sender, exit); + let t_gossip = ClusterInfo::gossip( + cluster_info.clone(), + bank_forks, + response_sender, + gossip_validators, + exit, + ); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; Self { thread_hdls } } @@ -262,7 +274,8 @@ fn make_gossip_node( cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint)); } let cluster_info = Arc::new(cluster_info); - let gossip_service = GossipService::new(&cluster_info.clone(), None, gossip_socket, &exit); + let gossip_service = + GossipService::new(&cluster_info.clone(), None, gossip_socket, None, &exit); (gossip_service, ip_echo, cluster_info) } @@ -281,7 +294,7 @@ mod tests { let tn = Node::new_localhost(); let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone()); let c = Arc::new(cluster_info); - let d = GossipService::new(&c, None, tn.sockets.gossip, &exit); + let d = GossipService::new(&c, None, tn.sockets.gossip, None, &exit); exit.store(true, Ordering::Relaxed); d.join().unwrap(); } diff --git a/core/src/validator.rs b/core/src/validator.rs index c3b9c302bc..fc15f35bb6 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -79,6 +79,7 @@ pub struct ValidatorConfig { pub new_hard_forks: Option>, pub trusted_validators: Option>, // None = trust all pub repair_validators: Option>, // None = repair from all + pub gossip_validators: Option>, // None = gossip with all pub halt_on_trusted_validators_accounts_hash_mismatch: bool, pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection pub frozen_accounts: Vec, @@ -108,6 +109,7 @@ impl Default for ValidatorConfig { new_hard_forks: None, trusted_validators: None, repair_validators: None, + gossip_validators: None, halt_on_trusted_validators_accounts_hash_mismatch: false, accounts_hash_fault_injection_slots: 0, frozen_accounts: vec![], @@ -364,6 +366,7 @@ impl Validator { &cluster_info, Some(bank_forks.clone()), node.sockets.gossip, + config.gossip_validators.clone(), &exit, ); diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 6384b5cf48..451ee3733c 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -222,7 +222,7 @@ fn network_simulator(network: &mut 
Network, max_convergance: f64) { network_values.par_iter().for_each(|node| { node.lock() .unwrap() - .refresh_push_active_set(&HashMap::new()); + .refresh_push_active_set(&HashMap::new(), None); }); let mut total_bytes = bytes_tx; for second in 1..num { @@ -361,7 +361,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, network_values.par_iter().for_each(|node| { node.lock() .unwrap() - .refresh_push_active_set(&HashMap::new()); + .refresh_push_active_set(&HashMap::new(), None); }); } total = network_values @@ -408,7 +408,7 @@ fn network_run_pull( .filter_map(|from| { from.lock() .unwrap() - .new_pull_request(now, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE) + .new_pull_request(now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE) .ok() }) .collect() @@ -581,7 +581,7 @@ fn test_prune_errors() { 0, ) .unwrap(); - crds_gossip.refresh_push_active_set(&HashMap::new()); + crds_gossip.refresh_push_active_set(&HashMap::new(), None); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 0f3783d501..934f2f347f 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -19,7 +19,8 @@ fn test_node(exit: &Arc) -> (Arc, GossipService, UdpSoc let keypair = Arc::new(Keypair::new()); let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey()); let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair)); - let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit); + let gossip_service = + GossipService::new(&cluster_info, None, test_node.sockets.gossip, None, exit); let _ = cluster_info.my_contact_info(); ( cluster_info, @@ -39,6 +40,7 @@ fn test_node_with_bank( &cluster_info, Some(bank_forks), test_node.sockets.gossip, + None, exit, ); let _ = cluster_info.my_contact_info(); diff --git a/docs/src/integrations/exchange.md b/docs/src/integrations/exchange.md index a83d49a952..b25ab4967f 100644 --- a/docs/src/integrations/exchange.md +++ b/docs/src/integrations/exchange.md @@ -14,7 +14,7 @@ much data is retained, and ensure you do not miss any data if one node fails. To run an api node: 1. [Install the Solana command-line tool suite](../cli/install-solana-cli-tools.md) -2. Boot the node with at least the following parameters: +2. Start the validator with at least the following parameters: ```bash solana-validator \ @@ -25,7 +25,7 @@ solana-validator \ --rpc-port 8899 \ --no-voting \ --enable-rpc-transaction-history \ - --limit-ledger-size \ + --limit-ledger-size \ --trusted-validator \ --no-untrusted-rpc ``` @@ -35,7 +35,7 @@ Customize `--ledger` to your desired ledger storage location, and `--rpc-port` t The `--entrypoint`, `--expected-genesis-hash`, and `--expected-shred-version` parameters are all specific to the cluster you are joining. The shred version will change on any hard forks in the cluster, so including `--expected-shred-version` ensures you are receiving current data from the cluster you expect. [Current parameters for Mainnet Beta](../clusters.md#example-solana-validator-command-line-2) -The `--limit-ledger-size` parameter allows you to specify how many ledger [shreds](../terminology.md#shred) your node retains on disk. If you do not include this parameter, the ledger will keep the entire ledger until it runs out of disk space. 
A larger value like `--limit-ledger-size 250000000000` is good for a couple days +The `--limit-ledger-size` parameter allows you to specify how many ledger [shreds](../terminology.md#shred) your node retains on disk. If you do not include this parameter, the validator will keep the entire ledger until it runs out of disk space. The default value is good for at least a couple of days, but larger values may be used by adding an argument to `--limit-ledger-size` if desired. Check `solana-validator --help` for the default limit value used by `--limit-ledger-size`. Specifying one or more `--trusted-validator` parameters can protect you from booting from a malicious snapshot. [More on the value of booting with trusted validators](../running-validator/validator-start.md#trusted-validators) @@ -60,15 +60,46 @@ order to prevent this issue, add the `--no-snapshot-fetch` parameter to your `solana-validator` command to receive historical ledger data instead of a snapshot. -If you pass the `--no-snapshot-fetch` parameter on your initial boot, it will -take your node a very long time to catch up. We recommend booting from a -snapshot first, and then using the `--no-snapshot-fetch` parameter for reboots. +Do not pass the `--no-snapshot-fetch` parameter on your initial boot, as it is not +possible to boot the node all the way from the genesis block. Instead, boot from +a snapshot first and then add the `--no-snapshot-fetch` parameter for reboots. It is important to note that the amount of historical ledger available to your nodes is limited to what your trusted validators retain. You will need to ensure your nodes do not experience downtimes longer than this span, if ledger continuity is crucial for you. + +### Minimizing Validator Port Exposure + +The validator requires that various UDP and TCP ports be open for inbound +traffic from all other Solana validators. While this is the most efficient mode of +operation, and is strongly recommended, it is possible to restrict the +validator to require inbound traffic from only one other Solana validator. + +First, add the `--restricted-repair-only-mode` argument. This will cause the +validator to operate in a restricted mode where it will not receive pushes from +the rest of the validators, and instead will need to continually poll other +validators for blocks. The validator will only transmit UDP packets to other +validators using the *Gossip* and *ServeR* ("serve repair") ports, and only +receive UDP packets on its *Gossip* and *Repair* ports. + +The *Gossip* port is bi-directional and allows your validator to remain in +contact with the rest of the cluster. Your validator transmits on the *ServeR* +port to make repair requests, obtaining new blocks from the rest of the network, +since Turbine is now disabled. Your validator will then receive repair +responses on the *Repair* port from other validators. + +To further restrict the validator to only requesting blocks from one or more +validators, first determine the identity pubkey for each such validator and add the +`--gossip-validator PUBKEY --repair-validator PUBKEY` arguments for each +PUBKEY. This will cause your validator to be a resource drain on each validator +that you add, so please do this sparingly and only after consulting with the +target validator. + +Your validator should now only be communicating with the explicitly listed +validators and only on the *Gossip*, *Repair* and *ServeR* ports.
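+ +As an illustration, a restricted API node combining these flags might be +started as shown below. This is a sketch rather than a drop-in command: the +keypair path, ledger path, entrypoint, and `TRUSTED_PUBKEY` placeholder are +hypothetical and must be replaced with values appropriate for your deployment. + +```bash +solana-validator \ +  --identity ~/validator-keypair.json \ +  --ledger ~/validator-ledger \ +  --entrypoint mainnet-beta.solana.com:8001 \ +  --restricted-repair-only-mode \ +  --gossip-validator TRUSTED_PUBKEY \ +  --repair-validator TRUSTED_PUBKEY +``` + +Both `--gossip-validator` and `--repair-validator` may be repeated to admit +more than one target validator.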
+ ## Setting up Deposit Accounts Solana accounts do not require any on-chain initialization; once they contain diff --git a/validator/src/main.rs b/validator/src/main.rs index 96532a6a4f..4d70987b0b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -142,6 +142,7 @@ fn start_gossip_node( gossip_addr: &SocketAddr, gossip_socket: UdpSocket, expected_shred_version: Option<u16>, + gossip_validators: Option<HashSet<Pubkey>>, ) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) { let cluster_info = ClusterInfo::new( ClusterInfo::gossip_contact_info( @@ -159,6 +160,7 @@ fn start_gossip_node( &cluster_info.clone(), None, gossip_socket, + gossip_validators, &gossip_exit_flag, ); (cluster_info, gossip_exit_flag, gossip_service) @@ -601,6 +603,17 @@ pub fn main() { .requires("entrypoint") .help("Skip the RPC vote account sanity check") ) + .arg( + Arg::with_name("restricted_repair_only_mode") + .long("restricted-repair-only-mode") + .takes_value(false) + .help("Do not publish the Gossip, TPU, TVU or Repair Service ports, causing \ the validator to operate in a limited capacity that reduces its \ exposure to the rest of the cluster. \ \ The --no-voting flag is implicit when this flag is enabled \ "), + ) .arg( Arg::with_name("dev_halt_at_slot") .long("dev-halt-at-slot") @@ -809,7 +822,8 @@ pub fn main() { .requires("expected_bank_hash") .value_name("SLOT") .validator(is_slot) - .help("After processing the ledger and the next slot is SLOT, wait until a supermajority of stake is visible on gossip before starting PoH"), + .help("After processing the ledger and the next slot is SLOT, wait until a \ supermajority of stake is visible on gossip before starting PoH"), ) .arg( Arg::with_name("hard_forks") @@ -844,7 +858,18 @@ pub fn main() { .multiple(true) .takes_value(true) .help("A list of validators to request repairs from. If specified, repair will not \ - request from validators outside this set [default: request repairs from all validators]") + request from validators outside this set [default: all validators]") ) + .arg( + Arg::with_name("gossip_validators") + .long("gossip-validator") + .validator(is_pubkey) + .value_name("PUBKEY") + .multiple(true) + .takes_value(true) + .help("A list of validators to gossip with. If specified, gossip \ will not push to or pull from validators outside this set. 
\ [default: all validators]") ) .arg( Arg::with_name("no_rocksdb_compaction") @@ -960,6 +985,23 @@ pub fn main() { "repair_validators", "--repair-validator", ); + let gossip_validators = validators_set( + &identity_keypair.pubkey(), + &matches, + "gossip_validators", + "--gossip-validator", + ); + + let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap()) + .expect("invalid bind_address"); + let rpc_bind_address = if matches.is_present("rpc_bind_address") { + solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap()) + .expect("invalid rpc_bind_address") + } else { + bind_address + }; + + let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode"); let mut validator_config = ValidatorConfig { dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(), @@ -991,10 +1033,11 @@ pub fn main() { rpc_ports: value_t!(matches, "rpc_port", u16) .ok() .map(|rpc_port| (rpc_port, rpc_port + 1)), - voting_disabled: matches.is_present("no_voting"), + voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), trusted_validators, repair_validators, + gossip_validators, frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(), no_rocksdb_compaction, wal_recovery_mode, @@ -1002,8 +1045,10 @@ pub fn main() { }; let vote_account = pubkey_of(&matches, "vote_account").unwrap_or_else(|| { - warn!("--vote-account not specified, validator will not vote"); - validator_config.voting_disabled = true; + if !validator_config.voting_disabled { + warn!("--vote-account not specified, validator will not vote"); + validator_config.voting_disabled = true; + } Keypair::new().pubkey() }); @@ -1011,15 +1056,6 @@ pub fn main() { solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) .expect("invalid dynamic_port_range"); - let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap()) - .expect("invalid bind_address"); - let rpc_bind_address = if matches.is_present("rpc_bind_address") { - solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap()) - .expect("invalid rpc_bind_address") - } else { - bind_address - }; - let account_paths = if let Some(account_paths) = matches.value_of("account_paths") { account_paths.split(',').map(PathBuf::from).collect() } else { @@ -1211,6 +1247,18 @@ pub fn main() { bind_address, ); + if restricted_repair_only_mode { + let any = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), 0); + // When --restricted-repair-only-mode is enabled, only the gossip and repair ports + // need to be reachable by the entrypoint to respond to gossip pull requests and repair + // requests initiated by the node. All other ports are unused. 
+ node.info.tpu = any; + node.info.tpu_forwards = any; + node.info.tvu = any; + node.info.tvu_forwards = any; + node.info.serve_repair = any; + } + if !private_rpc { if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port); @@ -1219,17 +1267,25 @@ pub fn main() { } if let Some(ref cluster_entrypoint) = cluster_entrypoint { - let mut udp_sockets = vec![ - &node.sockets.gossip, - &node.sockets.repair, - &node.sockets.serve_repair, - ]; - udp_sockets.extend(node.sockets.tpu.iter()); - udp_sockets.extend(node.sockets.tpu_forwards.iter()); - udp_sockets.extend(node.sockets.tvu.iter()); - udp_sockets.extend(node.sockets.tvu_forwards.iter()); - udp_sockets.extend(node.sockets.broadcast.iter()); - udp_sockets.extend(node.sockets.retransmit_sockets.iter()); + let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair]; + + if ContactInfo::is_valid_address(&node.info.serve_repair) { + udp_sockets.push(&node.sockets.serve_repair); + } + if ContactInfo::is_valid_address(&node.info.tpu) { + udp_sockets.extend(node.sockets.tpu.iter()); + } + if ContactInfo::is_valid_address(&node.info.tpu_forwards) { + udp_sockets.extend(node.sockets.tpu_forwards.iter()); + } + if ContactInfo::is_valid_address(&node.info.tvu) { + udp_sockets.extend(node.sockets.tvu.iter()); + udp_sockets.extend(node.sockets.broadcast.iter()); + udp_sockets.extend(node.sockets.retransmit_sockets.iter()); + } + if ContactInfo::is_valid_address(&node.info.tvu_forwards) { + udp_sockets.extend(node.sockets.tvu_forwards.iter()); + } let mut tcp_listeners = vec![]; if !private_rpc { @@ -1247,9 +1303,11 @@ pub fn main() { } } - if let Some(ip_echo) = &node.sockets.ip_echo { - let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener"); - tcp_listeners.push((node.info.gossip.port(), ip_echo)); + if !restricted_repair_only_mode { + if let Some(ip_echo) = &node.sockets.ip_echo { + let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener"); + tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo)); + } } if !solana_net_utils::verify_reachable_ports( @@ -1266,6 +1324,7 @@ pub fn main() { &node.info.gossip, node.sockets.gossip.try_clone().unwrap(), validator_config.expected_shred_version, + validator_config.gossip_validators.clone(), ); let mut blacklisted_rpc_nodes = HashSet::new();
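For review purposes, the semantics of the new `gossip_validators` allowlist threaded through `pull_options()` and `push_options()` reduce to the `map_or` predicate sketched below. This is a self-contained illustration, not code from the patch: the `Pubkey` alias is a stand-in for `solana_sdk::pubkey::Pubkey`. `None` leaves gossip unrestricted, while `Some(set)` admits only members of the set, so an empty or unmatched set selects no peers at all.

```rust
use std::collections::HashSet;

// Stand-in for solana_sdk::pubkey::Pubkey, purely for illustration.
type Pubkey = [u8; 32];

/// Mirrors the gossip_validators check added to pull_options()/push_options().
fn is_allowed(gossip_validators: Option<&HashSet<Pubkey>>, peer: &Pubkey) -> bool {
    gossip_validators.map_or(true, |validators| validators.contains(peer))
}

fn main() {
    let peer = [1u8; 32];

    // No allowlist configured: gossip with every peer.
    assert!(is_allowed(None, &peer));

    // Empty allowlist: gossip with nobody, matching test_pulls_only_from_allowed.
    let mut allowed = HashSet::new();
    assert!(!is_allowed(Some(&allowed), &peer));

    // Peer on the allowlist: admitted.
    allowed.insert(peer);
    assert!(is_allowed(Some(&allowed), &peer));
}
```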