adds validator flag to allow private ip addresses (#18850)

This commit is contained in:
behzad nouri
2021-07-23 15:25:03 +00:00
committed by GitHub
parent 63aec9728f
commit d2d5f36a3c
69 changed files with 1263 additions and 391 deletions

View File

@ -64,7 +64,7 @@ use {
solana_streamer::{
packet,
sendmmsg::{multi_target_send, SendPktsError},
socket::is_global,
socket::SocketAddrSpace,
streamer::{PacketReceiver, PacketSender},
},
solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
@ -157,12 +157,7 @@ pub struct ClusterInfo {
contact_save_interval: u64, // milliseconds, 0 = disabled
instance: RwLock<NodeInstance>,
contact_info_path: PathBuf,
}
impl Default for ClusterInfo {
fn default() -> Self {
Self::new_with_invalid_keypair(ContactInfo::default())
}
socket_addr_space: SocketAddrSpace,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample)]
@ -391,12 +386,11 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
}
impl ClusterInfo {
/// Without a valid keypair gossip will not function. Only useful for tests.
pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
Self::new(contact_info, Arc::new(Keypair::new()))
}
pub fn new(contact_info: ContactInfo, keypair: Arc<Keypair>) -> Self {
pub fn new(
contact_info: ContactInfo,
keypair: Arc<Keypair>,
socket_addr_space: SocketAddrSpace,
) -> Self {
let id = contact_info.id;
let me = Self {
gossip: CrdsGossip::default(),
@ -415,6 +409,7 @@ impl ClusterInfo {
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
contact_info_path: PathBuf::default(),
contact_save_interval: 0, // disabled
socket_addr_space,
};
me.insert_self();
me.push_self(&HashMap::new(), None);
@ -444,6 +439,7 @@ impl ClusterInfo {
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), *new_id, timestamp())),
contact_info_path: PathBuf::default(),
contact_save_interval: 0, // disabled
..*self
}
}
@ -451,6 +447,10 @@ impl ClusterInfo {
self.contact_debug_interval = new;
}
pub fn socket_addr_space(&self) -> &SocketAddrSpace {
&self.socket_addr_space
}
fn push_self(
&self,
stakes: &HashMap<Pubkey, u64>,
@ -474,8 +474,13 @@ impl ClusterInfo {
shred_version,
..
} = *self.my_contact_info.read().unwrap();
self.gossip
.refresh_push_active_set(&self_pubkey, shred_version, stakes, gossip_validators);
self.gossip.refresh_push_active_set(
&self_pubkey,
shred_version,
stakes,
gossip_validators,
&self.socket_addr_space,
);
}
// TODO kill insert_info, only used by tests
@ -667,7 +672,7 @@ impl ClusterInfo {
.all_peers()
.into_iter()
.filter_map(|(node, last_updated)| {
if !ContactInfo::is_valid_address(&node.rpc) {
if !ContactInfo::is_valid_address(&node.rpc, &self.socket_addr_space) {
return None;
}
@ -678,8 +683,8 @@ impl ClusterInfo {
return None;
}
fn addr_to_string(default_ip: &IpAddr, addr: &SocketAddr) -> String {
if ContactInfo::is_valid_address(addr) {
let addr_to_string = |default_ip: &IpAddr, addr: &SocketAddr| -> String {
if ContactInfo::is_valid_address(addr, &self.socket_addr_space) {
if &addr.ip() == default_ip {
addr.port().to_string()
} else {
@ -688,7 +693,7 @@ impl ClusterInfo {
} else {
"none".to_string()
}
}
};
let rpc_addr = node.rpc.ip();
Some(format!(
@ -732,7 +737,7 @@ impl ClusterInfo {
.all_peers()
.into_iter()
.filter_map(|(node, last_updated)| {
let is_spy_node = Self::is_spy_node(&node);
let is_spy_node = Self::is_spy_node(&node, &self.socket_addr_space);
if is_spy_node {
total_spy_nodes = total_spy_nodes.saturating_add(1);
}
@ -745,8 +750,8 @@ impl ClusterInfo {
if is_spy_node {
shred_spy_nodes = shred_spy_nodes.saturating_add(1);
}
fn addr_to_string(default_ip: &IpAddr, addr: &SocketAddr) -> String {
if ContactInfo::is_valid_address(addr) {
let addr_to_string = |default_ip: &IpAddr, addr: &SocketAddr| -> String {
if ContactInfo::is_valid_address(addr, &self.socket_addr_space) {
if &addr.ip() == default_ip {
addr.port().to_string()
} else {
@ -755,11 +760,11 @@ impl ClusterInfo {
} else {
"none".to_string()
}
}
};
let ip_addr = node.gossip.ip();
Some(format!(
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
if ContactInfo::is_valid_address(&node.gossip) {
if ContactInfo::is_valid_address(&node.gossip, &self.socket_addr_space) {
ip_addr.to_string()
} else {
"none".to_string()
@ -1131,7 +1136,10 @@ impl ClusterInfo {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_nodes_contact_info()
.filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.rpc))
.filter(|x| {
x.id != self_pubkey
&& ContactInfo::is_valid_address(&x.rpc, &self.socket_addr_space)
})
.cloned()
.collect()
}
@ -1151,7 +1159,9 @@ impl ClusterInfo {
gossip_crds
.get_nodes_contact_info()
// shred_version not considered for gossip peers (ie, spy nodes do not set shred_version)
.filter(|x| x.id != me && ContactInfo::is_valid_address(&x.gossip))
.filter(|x| {
x.id != me && ContactInfo::is_valid_address(&x.gossip, &self.socket_addr_space)
})
.cloned()
.collect()
}
@ -1161,7 +1171,10 @@ impl ClusterInfo {
let self_pubkey = self.id();
self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers)
.get_nodes_contact_info()
.filter(|x| ContactInfo::is_valid_address(&x.tvu) && x.id != self_pubkey)
.filter(|x| {
ContactInfo::is_valid_address(&x.tvu, &self.socket_addr_space)
&& x.id != self_pubkey
})
.cloned()
.collect()
}
@ -1193,7 +1206,7 @@ impl ClusterInfo {
node.id != self_pubkey
&& node.shred_version == self_shred_version
&& ContactInfo::is_valid_tvu_address(&node.tvu)
&& ContactInfo::is_valid_address(&node.serve_repair)
&& ContactInfo::is_valid_address(&node.serve_repair, &self.socket_addr_space)
&& match gossip_crds.get::<&LowestSlot>(node.id) {
None => true, // fallback to legacy behavior
Some(lowest_slot) => lowest_slot.lowest <= slot,
@ -1203,10 +1216,10 @@ impl ClusterInfo {
.collect()
}
fn is_spy_node(contact_info: &ContactInfo) -> bool {
!ContactInfo::is_valid_address(&contact_info.tpu)
|| !ContactInfo::is_valid_address(&contact_info.gossip)
|| !ContactInfo::is_valid_address(&contact_info.tvu)
fn is_spy_node(contact_info: &ContactInfo, socket_addr_space: &SocketAddrSpace) -> bool {
!ContactInfo::is_valid_address(&contact_info.tpu, socket_addr_space)
|| !ContactInfo::is_valid_address(&contact_info.gossip, socket_addr_space)
|| !ContactInfo::is_valid_address(&contact_info.tvu, socket_addr_space)
}
/// compute broadcast table
@ -1215,7 +1228,10 @@ impl ClusterInfo {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_nodes_contact_info()
.filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.tpu))
.filter(|x| {
x.id != self_pubkey
&& ContactInfo::is_valid_address(&x.tpu, &self.socket_addr_space)
})
.cloned()
.collect()
}
@ -1223,20 +1239,25 @@ impl ClusterInfo {
/// retransmit messages to a list of nodes
/// # Remarks
/// We need to avoid having obj locked while doing a io, such as the `send_to`
pub fn retransmit_to(peers: &[&ContactInfo], packet: &Packet, s: &UdpSocket, forwarded: bool) {
pub fn retransmit_to(
peers: &[&ContactInfo],
packet: &Packet,
s: &UdpSocket,
forwarded: bool,
socket_addr_space: &SocketAddrSpace,
) {
trace!("retransmit orders {}", peers.len());
let dests: Vec<_> = if forwarded {
peers
.iter()
.map(|peer| &peer.tvu_forwards)
.filter(|addr| ContactInfo::is_valid_address(addr))
.filter(|addr| is_global(addr))
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect()
} else {
peers
.iter()
.map(|peer| &peer.tvu)
.filter(|addr| is_global(addr))
.filter(|addr| socket_addr_space.check(addr))
.collect()
};
let data = &packet.data[..packet.meta.size];
@ -1377,6 +1398,7 @@ impl ClusterInfo {
MAX_BLOOM_SIZE,
&self.ping_cache,
&mut pings,
&self.socket_addr_space,
) {
Err(_) => Vec::default(),
Ok((peer, filters)) => vec![(peer, filters)],
@ -1850,7 +1872,7 @@ impl ClusterInfo {
// incoming pull-requests, pings are also sent to request.from_addr (as
// opposed to caller.gossip address).
move |request| {
ContactInfo::is_valid_address(&request.from_addr) && {
ContactInfo::is_valid_address(&request.from_addr, &self.socket_addr_space) && {
let node = (request.caller.pubkey(), request.from_addr);
*cache.entry(node).or_insert_with(|| hard_check(node))
}
@ -2219,7 +2241,7 @@ impl ClusterInfo {
let new_push_requests = self.new_push_requests(stakes, require_stake_for_gossip);
inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len());
for (address, request) in new_push_requests {
if ContactInfo::is_valid_address(&address) {
if ContactInfo::is_valid_address(&address, &self.socket_addr_space) {
match Packet::from_data(Some(&address), &request) {
Ok(packet) => packets.packets.push(packet),
Err(err) => error!("failed to write push-request packet: {:?}", err),
@ -2869,13 +2891,14 @@ pub fn push_messages_to_peer(
messages: Vec<CrdsValue>,
self_id: Pubkey,
peer_gossip: SocketAddr,
socket_addr_space: &SocketAddrSpace,
) -> Result<(), GossipError> {
let reqs: Vec<_> = ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, messages)
.map(move |payload| (peer_gossip, Protocol::PushMessage(self_id, payload)))
.collect();
let packets = to_packets_with_destination(PacketsRecycler::default(), &reqs);
let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
packet::send_to(&packets, &sock)?;
packet::send_to(&packets, &sock, socket_addr_space)?;
Ok(())
}
@ -2969,20 +2992,30 @@ mod tests {
fn test_gossip_node() {
//check that a gossip nodes always show up as spies
let (node, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0);
assert!(ClusterInfo::is_spy_node(&node));
assert!(ClusterInfo::is_spy_node(
&node,
&SocketAddrSpace::Unspecified
));
let (node, _, _) = ClusterInfo::gossip_node(
solana_sdk::pubkey::new_rand(),
&"1.1.1.1:1111".parse().unwrap(),
0,
);
assert!(ClusterInfo::is_spy_node(&node));
assert!(ClusterInfo::is_spy_node(
&node,
&SocketAddrSpace::Unspecified
));
}
#[test]
fn test_handle_pull() {
solana_logger::setup();
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
let cluster_info = Arc::new(ClusterInfo::new(
node.info,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
));
let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
let data = test_crds_values(entrypoint_pubkey);
@ -3039,6 +3072,7 @@ mod tests {
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&this_node.pubkey(), timestamp()),
this_node.clone(),
SocketAddrSpace::Unspecified,
);
let remote_nodes: Vec<(Keypair, SocketAddr)> =
repeat_with(|| new_rand_remote_node(&mut rng))
@ -3093,6 +3127,7 @@ mod tests {
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&this_node.pubkey(), timestamp()),
this_node.clone(),
SocketAddrSpace::Unspecified,
);
let remote_nodes: Vec<(Keypair, SocketAddr)> =
repeat_with(|| new_rand_remote_node(&mut rng))
@ -3252,13 +3287,18 @@ mod tests {
//check that gossip doesn't try to push to invalid addresses
let node = Node::new_localhost();
let (spy, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0);
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
let cluster_info = Arc::new(ClusterInfo::new(
node.info,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
));
cluster_info.insert_info(spy);
cluster_info.gossip.refresh_push_active_set(
&cluster_info.id(),
cluster_info.my_shred_version(),
&HashMap::new(), // stakes
None, // gossip validators
&SocketAddrSpace::Unspecified,
);
let reqs = cluster_info.generate_new_gossip_requests(
&thread_pool,
@ -3269,7 +3309,7 @@ mod tests {
);
//assert none of the addrs are invalid.
reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr);
let res = ContactInfo::is_valid_address(addr, &SocketAddrSpace::Unspecified);
assert!(res);
res
});
@ -3278,14 +3318,19 @@ mod tests {
#[test]
fn test_cluster_info_new() {
let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone());
let cluster_info = ClusterInfo::new(
d.clone(),
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
assert_eq!(d.id, cluster_info.id());
}
#[test]
fn insert_info_test() {
let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = ClusterInfo::new_with_invalid_keypair(d);
let cluster_info =
ClusterInfo::new(d, Arc::new(Keypair::new()), SocketAddrSpace::Unspecified);
let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let label = CrdsValueLabel::ContactInfo(d.id);
cluster_info.insert_info(d);
@ -3364,7 +3409,11 @@ mod tests {
let peer_keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let peer = ContactInfo::new_localhost(&peer_keypair.pubkey(), 0);
let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair));
let cluster_info = ClusterInfo::new(
contact_info,
Arc::new(keypair),
SocketAddrSpace::Unspecified,
);
cluster_info
.ping_cache
.lock()
@ -3376,6 +3425,7 @@ mod tests {
cluster_info.my_shred_version(),
&HashMap::new(), // stakes
None, // gossip validators
&SocketAddrSpace::Unspecified,
);
//check that all types of gossip messages are signed correctly
let push_messages = cluster_info
@ -3400,6 +3450,7 @@ mod tests {
MAX_BLOOM_SIZE,
&cluster_info.ping_cache,
&mut pings,
&cluster_info.socket_addr_space,
)
.ok()
.unwrap();
@ -3409,7 +3460,11 @@ mod tests {
fn test_refresh_vote() {
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let cluster_info = ClusterInfo::new(
contact_info,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
// Construct and push a vote for some other slot
let unrefresh_slot = 5;
@ -3496,7 +3551,11 @@ mod tests {
let mut rng = rand::thread_rng();
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let cluster_info = ClusterInfo::new(
contact_info,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
// make sure empty crds is handled correctly
let mut cursor = Cursor::default();
@ -3567,7 +3626,11 @@ mod tests {
let mut rng = rand::thread_rng();
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let cluster_info = ClusterInfo::new(
contact_info,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
let mut tower = Vec::new();
for k in 0..MAX_LOCKOUT_HISTORY {
let slot = k as Slot;
@ -3613,7 +3676,11 @@ mod tests {
fn test_push_epoch_slots() {
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let cluster_info = ClusterInfo::new(
contact_info,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
let slots = cluster_info.get_epoch_slots(&mut Cursor::default());
assert!(slots.is_empty());
cluster_info.push_epoch_slots(&[0]);
@ -3670,6 +3737,7 @@ mod tests {
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
SocketAddrSpace::Unspecified,
);
let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
@ -3820,7 +3888,11 @@ mod tests {
#[test]
fn test_tvu_peers_and_stakes() {
let d = ContactInfo::new_localhost(&Pubkey::new(&[0; 32]), timestamp());
let cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone());
let cluster_info = ClusterInfo::new(
d.clone(),
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
let mut stakes = HashMap::new();
// no stake
@ -3861,6 +3933,7 @@ mod tests {
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
SocketAddrSpace::Unspecified,
);
let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
let mut entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
@ -3916,6 +3989,7 @@ mod tests {
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
SocketAddrSpace::Unspecified,
);
for i in 0..10 {
// make these invalid for the upcoming repair request
@ -3987,6 +4061,7 @@ mod tests {
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
SocketAddrSpace::Unspecified,
);
let mut range: Vec<Slot> = vec![];
//random should be hard to compress
@ -4034,6 +4109,7 @@ mod tests {
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
SocketAddrSpace::Unspecified,
));
assert_eq!(cluster_info.my_shred_version(), 0);
@ -4117,6 +4193,7 @@ mod tests {
contact_info
},
node_keypair,
SocketAddrSpace::Unspecified,
));
assert_eq!(cluster_info.my_shred_version(), 2);
@ -4288,7 +4365,11 @@ mod tests {
#[ignore] // TODO: debug why this is flaky on buildkite!
fn test_pull_request_time_pruning() {
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
let cluster_info = Arc::new(ClusterInfo::new(
node.info,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
));
let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint);

View File

@ -7,6 +7,7 @@ use {
signature::{Keypair, Signer},
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
std::net::{IpAddr, SocketAddr},
};
@ -200,16 +201,22 @@ impl ContactInfo {
(addr.port() != 0) && Self::is_valid_ip(addr.ip())
}
pub fn is_valid_address(addr: &SocketAddr) -> bool {
Self::is_valid_tvu_address(addr) && solana_streamer::socket::is_global(addr)
// TODO: Replace this entirely with streamer SocketAddrSpace.
pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool {
Self::is_valid_tvu_address(addr) && socket_addr_space.check(addr)
}
pub fn client_facing_addr(&self) -> (SocketAddr, SocketAddr) {
(self.rpc, self.tpu)
}
pub fn valid_client_facing_addr(&self) -> Option<(SocketAddr, SocketAddr)> {
if ContactInfo::is_valid_address(&self.rpc) && ContactInfo::is_valid_address(&self.tpu) {
pub fn valid_client_facing_addr(
&self,
socket_addr_space: &SocketAddrSpace,
) -> Option<(SocketAddr, SocketAddr)> {
if ContactInfo::is_valid_address(&self.rpc, socket_addr_space)
&& ContactInfo::is_valid_address(&self.tpu, socket_addr_space)
{
Some((self.rpc, self.tpu))
} else {
None
@ -224,13 +231,25 @@ mod tests {
#[test]
fn test_is_valid_address() {
let bad_address_port = socketaddr!("127.0.0.1:0");
assert!(!ContactInfo::is_valid_address(&bad_address_port));
assert!(!ContactInfo::is_valid_address(
&bad_address_port,
&SocketAddrSpace::Unspecified
));
let bad_address_unspecified = socketaddr!(0, 1234);
assert!(!ContactInfo::is_valid_address(&bad_address_unspecified));
assert!(!ContactInfo::is_valid_address(
&bad_address_unspecified,
&SocketAddrSpace::Unspecified
));
let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234);
assert!(!ContactInfo::is_valid_address(&bad_address_multicast));
assert!(!ContactInfo::is_valid_address(
&bad_address_multicast,
&SocketAddrSpace::Unspecified
));
let loopback = socketaddr!("127.0.0.1:1234");
assert!(ContactInfo::is_valid_address(&loopback));
assert!(ContactInfo::is_valid_address(
&loopback,
&SocketAddrSpace::Unspecified
));
// assert!(!ContactInfo::is_valid_ip_internal(loopback.ip(), false));
}
@ -313,11 +332,19 @@ mod tests {
#[test]
fn test_valid_client_facing() {
let mut ci = ContactInfo::default();
assert_eq!(ci.valid_client_facing_addr(), None);
assert_eq!(
ci.valid_client_facing_addr(&SocketAddrSpace::Unspecified),
None
);
ci.tpu = socketaddr!("127.0.0.1:123");
assert_eq!(ci.valid_client_facing_addr(), None);
assert_eq!(
ci.valid_client_facing_addr(&SocketAddrSpace::Unspecified),
None
);
ci.rpc = socketaddr!("127.0.0.1:234");
assert!(ci.valid_client_facing_addr().is_some());
assert!(ci
.valid_client_facing_addr(&SocketAddrSpace::Unspecified)
.is_some());
}
#[test]

View File

@ -24,6 +24,7 @@ use {
signature::{Keypair, Signer},
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
std::{
collections::{HashMap, HashSet},
net::SocketAddr,
@ -176,6 +177,7 @@ impl CrdsGossip {
self_shred_version: u16,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
socket_addr_space: &SocketAddrSpace,
) {
let network_size = self.crds.read().unwrap().num_nodes();
self.push.refresh_push_active_set(
@ -186,6 +188,7 @@ impl CrdsGossip {
self_shred_version,
network_size,
CRDS_GOSSIP_NUM_ACTIVE,
socket_addr_space,
)
}
@ -202,6 +205,7 @@ impl CrdsGossip {
bloom_size: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> {
self.pull.new_pull_request(
thread_pool,
@ -214,6 +218,7 @@ impl CrdsGossip {
bloom_size,
ping_cache,
pings,
socket_addr_space,
)
}
@ -370,6 +375,7 @@ mod test {
0, // shred version
&HashMap::new(), // stakes
None, // gossip validators
&SocketAddrSpace::Unspecified,
);
let now = timestamp();
//incorrect dest

View File

@ -32,6 +32,7 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::{
collections::{HashMap, HashSet, VecDeque},
convert::TryInto,
@ -226,6 +227,7 @@ impl CrdsGossipPull {
bloom_size: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> {
let (weights, peers): (Vec<_>, Vec<_>) = {
self.pull_options(
@ -235,6 +237,7 @@ impl CrdsGossipPull {
now,
gossip_validators,
stakes,
socket_addr_space,
)
.into_iter()
.map(|(weight, node, gossip_addr)| (weight, (node, gossip_addr)))
@ -281,6 +284,7 @@ impl CrdsGossipPull {
now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<(
u64, // weight
Pubkey, // node
@ -307,7 +311,7 @@ impl CrdsGossipPull {
})
.filter(|v| {
v.id != *self_id
&& ContactInfo::is_valid_address(&v.gossip)
&& ContactInfo::is_valid_address(&v.gossip, socket_addr_space)
&& (self_shred_version == 0 || self_shred_version == v.shred_version)
&& gossip_validators
.map_or(true, |gossip_validators| gossip_validators.contains(&v.id))
@ -734,7 +738,15 @@ pub(crate) mod tests {
}
let now = 1024;
let crds = RwLock::new(crds);
let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes);
let mut options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
now,
None,
&stakes,
&SocketAddrSpace::Unspecified,
);
assert!(!options.is_empty());
options
.sort_by(|(weight_l, _, _), (weight_r, _, _)| weight_r.partial_cmp(weight_l).unwrap());
@ -783,7 +795,15 @@ pub(crate) mod tests {
// shred version 123 should ignore nodes with versions 0 and 456
let options = node
.pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes)
.pull_options(
&crds,
&me.label().pubkey(),
123,
0,
None,
&stakes,
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|(_, pk, _)| *pk)
.collect::<Vec<_>>();
@ -793,7 +813,15 @@ pub(crate) mod tests {
// spy nodes will see all
let options = node
.pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes)
.pull_options(
&crds,
&spy.label().pubkey(),
0,
0,
None,
&stakes,
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|(_, pk, _)| *pk)
.collect::<Vec<_>>();
@ -834,6 +862,7 @@ pub(crate) mod tests {
0,
Some(&gossip_validators),
&stakes,
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
@ -846,6 +875,7 @@ pub(crate) mod tests {
0,
Some(&gossip_validators),
&stakes,
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
@ -858,6 +888,7 @@ pub(crate) mod tests {
0,
Some(&gossip_validators),
&stakes,
&SocketAddrSpace::Unspecified,
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].1, node_123.pubkey());
@ -978,6 +1009,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
),
Err(CrdsGossipError::NoPeers)
);
@ -995,6 +1027,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
),
Err(CrdsGossipError::NoPeers)
);
@ -1017,6 +1050,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
);
let (peer, _) = req.unwrap();
assert_eq!(peer, *new.contact_info().unwrap());
@ -1036,6 +1070,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
);
// Even though the offline node should have higher weight, we shouldn't request from it
// until we receive a ping.
@ -1091,6 +1126,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE, // bloom_size
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
)
.unwrap();
peer
@ -1170,6 +1206,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE,
&Mutex::new(ping_cache),
&mut pings,
&SocketAddrSpace::Unspecified,
);
let dest_crds = RwLock::<Crds>::default();
@ -1260,6 +1297,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE,
&Mutex::new(ping_cache),
&mut pings,
&SocketAddrSpace::Unspecified,
);
let dest_crds = RwLock::<Crds>::default();
@ -1339,6 +1377,7 @@ pub(crate) mod tests {
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
);
let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();

View File

@ -28,6 +28,7 @@ use {
rand::{seq::SliceRandom, Rng},
solana_runtime::bloom::{AtomicBloom, Bloom},
solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp},
solana_streamer::socket::SocketAddrSpace,
std::{
cmp,
collections::{HashMap, HashSet},
@ -342,6 +343,7 @@ impl CrdsGossipPush {
self_shred_version: u16,
network_size: usize,
ratio: usize,
socket_addr_space: &SocketAddrSpace,
) {
const BLOOM_FALSE_RATE: f64 = 0.1;
const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;
@ -352,9 +354,16 @@ impl CrdsGossipPush {
let mut rng = rand::thread_rng();
let mut new_items = HashMap::new();
let (weights, peers): (Vec<_>, Vec<_>) = {
self.push_options(crds, self_id, self_shred_version, stakes, gossip_validators)
.into_iter()
.unzip()
self.push_options(
crds,
self_id,
self_shred_version,
stakes,
gossip_validators,
socket_addr_space,
)
.into_iter()
.unzip()
};
if peers.is_empty() {
return;
@ -396,6 +405,7 @@ impl CrdsGossipPush {
self_shred_version: u16,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<(/*weight:*/ u64, /*node:*/ Pubkey)> {
let now = timestamp();
let mut rng = rand::thread_rng();
@ -420,7 +430,7 @@ impl CrdsGossipPush {
})
.filter(|info| {
info.id != *self_id
&& ContactInfo::is_valid_address(&info.gossip)
&& ContactInfo::is_valid_address(&info.gossip, socket_addr_space)
&& self_shred_version == info.shred_version
&& gossip_validators.map_or(true, |gossip_validators| {
gossip_validators.contains(&info.id)
@ -657,7 +667,16 @@ mod test {
assert_eq!(crds.insert(value1.clone(), now), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&SocketAddrSpace::Unspecified,
);
let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value1.label().pubkey()).is_some());
@ -669,7 +688,16 @@ mod test {
drop(active_set);
assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(()));
for _ in 0..30 {
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&SocketAddrSpace::Unspecified,
);
let active_set = push.active_set.read().unwrap();
if active_set.get(&value2.label().pubkey()).is_some() {
break;
@ -685,7 +713,16 @@ mod test {
));
assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(()));
}
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&SocketAddrSpace::Unspecified,
);
assert_eq!(push.active_set.read().unwrap().len(), push.num_active);
}
#[test]
@ -706,7 +743,14 @@ mod test {
push.last_pushed_to.write().unwrap().put(id, time);
}
let crds = RwLock::new(crds);
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
let mut options = push.push_options(
&crds,
&Pubkey::default(),
0,
&stakes,
None,
&SocketAddrSpace::Unspecified,
);
assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted.
@ -755,7 +799,14 @@ mod test {
// shred version 123 should ignore nodes with versions 0 and 456
let options = node
.push_options(&crds, &me.label().pubkey(), 123, &stakes, None)
.push_options(
&crds,
&me.label().pubkey(),
123,
&stakes,
None,
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|(_, pk)| *pk)
.collect::<Vec<_>>();
@ -764,7 +815,14 @@ mod test {
assert!(options.contains(&node_123.pubkey()));
// spy nodes should not push to people on different shred versions
let options = node.push_options(&crds, &spy.label().pubkey(), 0, &stakes, None);
let options = node.push_options(
&crds,
&spy.label().pubkey(),
0,
&stakes,
None,
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
}
@ -799,6 +857,7 @@ mod test {
0,
&stakes,
Some(&gossip_validators),
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
@ -811,6 +870,7 @@ mod test {
0,
&stakes,
Some(&gossip_validators),
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
@ -822,6 +882,7 @@ mod test {
0,
&stakes,
Some(&gossip_validators),
&SocketAddrSpace::Unspecified,
);
assert_eq!(options.len(), 1);
@ -839,7 +900,16 @@ mod test {
)));
assert_eq!(crds.insert(peer.clone(), now), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&SocketAddrSpace::Unspecified,
);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
@ -877,7 +947,16 @@ mod test {
push.process_push_message(&crds, &Pubkey::default(), vec![peers[2].clone()], now),
[Ok(origin[2])],
);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&SocketAddrSpace::Unspecified,
);
// push 3's contact info to 1 and 2 and 3
let expected: HashMap<_, _> = vec![
@ -900,7 +979,16 @@ mod test {
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&SocketAddrSpace::Unspecified,
);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
@ -929,7 +1017,16 @@ mod test {
)));
assert_eq!(crds.insert(peer, 0), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&SocketAddrSpace::Unspecified,
);
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ci.wallclock = 1;

View File

@ -13,6 +13,7 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::streamer,
std::{
collections::HashSet,
@ -47,6 +48,7 @@ impl GossipService {
&cluster_info.id(),
gossip_socket.local_addr().unwrap()
);
let socket_addr_space = *cluster_info.socket_addr_space();
let t_receiver = streamer::receiver(
gossip_socket.clone(),
exit,
@ -82,7 +84,12 @@ impl GossipService {
// https://github.com/rust-lang/rust/issues/54267
// responder thread should start after response_sender.clone(). see:
// https://github.com/rust-lang/rust/issues/39364#issuecomment-381446873
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
let t_responder = streamer::responder(
"gossip",
gossip_socket,
response_receiver,
socket_addr_space,
);
let thread_hdls = vec![
t_receiver,
t_responder,
@ -105,6 +112,7 @@ impl GossipService {
pub fn discover_cluster(
entrypoint: &SocketAddr,
num_nodes: usize,
socket_addr_space: SocketAddrSpace,
) -> std::io::Result<Vec<ContactInfo>> {
const DISCOVER_CLUSTER_TIMEOUT: Duration = Duration::from_secs(120);
let (_all_peers, validators) = discover(
@ -116,6 +124,7 @@ pub fn discover_cluster(
None, // find_node_by_gossip_addr
None, // my_gossip_addr
0, // my_shred_version
socket_addr_space,
)?;
Ok(validators)
}
@ -129,6 +138,7 @@ pub fn discover(
find_node_by_gossip_addr: Option<&SocketAddr>,
my_gossip_addr: Option<&SocketAddr>,
my_shred_version: u16,
socket_addr_space: SocketAddrSpace,
) -> std::io::Result<(
Vec<ContactInfo>, // all gossip peers
Vec<ContactInfo>, // tvu peers (validators)
@ -145,6 +155,7 @@ pub fn discover(
my_gossip_addr,
my_shred_version,
true, // should_check_duplicate_instance,
socket_addr_space,
);
let id = spy_ref.id();
@ -191,28 +202,31 @@ pub fn discover(
}
/// Creates a ThinClient per valid node
pub fn get_clients(nodes: &[ContactInfo]) -> Vec<ThinClient> {
pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> Vec<ThinClient> {
nodes
.iter()
.filter_map(ContactInfo::valid_client_facing_addr)
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
.map(|addrs| create_client(addrs, VALIDATOR_PORT_RANGE))
.collect()
}
/// Creates a ThinClient by selecting a valid node at random
pub fn get_client(nodes: &[ContactInfo]) -> ThinClient {
pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> ThinClient {
let nodes: Vec<_> = nodes
.iter()
.filter_map(ContactInfo::valid_client_facing_addr)
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
.collect();
let select = thread_rng().gen_range(0, nodes.len());
create_client(nodes[select], VALIDATOR_PORT_RANGE)
}
pub fn get_multi_client(nodes: &[ContactInfo]) -> (ThinClient, usize) {
pub fn get_multi_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
) -> (ThinClient, usize) {
let addrs: Vec<_> = nodes
.iter()
.filter_map(ContactInfo::valid_client_facing_addr)
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
.collect();
let rpc_addrs: Vec<_> = addrs.iter().map(|addr| addr.0).collect();
let tpu_addrs: Vec<_> = addrs.iter().map(|addr| addr.1).collect();
@ -303,13 +317,14 @@ pub fn make_gossip_node(
gossip_addr: Option<&SocketAddr>,
shred_version: u16,
should_check_duplicate_instance: bool,
socket_addr_space: SocketAddrSpace,
) -> (GossipService, Option<TcpListener>, Arc<ClusterInfo>) {
let (node, gossip_socket, ip_echo) = if let Some(gossip_addr) = gossip_addr {
ClusterInfo::gossip_node(keypair.pubkey(), gossip_addr, shred_version)
} else {
ClusterInfo::spy_node(keypair.pubkey(), shred_version)
};
let cluster_info = ClusterInfo::new(node, Arc::new(keypair));
let cluster_info = ClusterInfo::new(node, Arc::new(keypair), socket_addr_space);
if let Some(entrypoint) = entrypoint {
cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint));
}
@ -339,7 +354,11 @@ mod tests {
fn test_exit() {
let exit = Arc::new(AtomicBool::new(false));
let tn = Node::new_localhost();
let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone());
let cluster_info = ClusterInfo::new(
tn.info.clone(),
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
let c = Arc::new(cluster_info);
let d = GossipService::new(
&c,
@ -362,7 +381,11 @@ mod tests {
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let peer0_info = ContactInfo::new_localhost(&peer0, 0);
let peer1_info = ContactInfo::new_localhost(&peer1, 0);
let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair));
let cluster_info = ClusterInfo::new(
contact_info,
Arc::new(keypair),
SocketAddrSpace::Unspecified,
);
cluster_info.insert_info(peer0_info.clone());
cluster_info.insert_info(peer1_info);

View File

@ -11,6 +11,7 @@ use {
},
solana_gossip::{contact_info::ContactInfo, gossip_service::discover},
solana_sdk::pubkey::Pubkey,
solana_streamer::socket::SocketAddrSpace,
std::{
error,
net::{IpAddr, Ipv4Addr, SocketAddr},
@ -31,6 +32,13 @@ fn parse_matches() -> ArgMatches<'static> {
.about(crate_description!())
.version(solana_version::version!())
.setting(AppSettings::SubcommandRequiredElseHelp)
.arg(
Arg::with_name("allow_private_addr")
.long("allow-private-addr")
.takes_value(false)
.help("Allow contacting private ip addresses")
.hidden(true),
)
.subcommand(
SubCommand::with_name("rpc-url")
.about("Get an RPC URL for the cluster")
@ -223,6 +231,7 @@ fn process_spy(matches: &ArgMatches) -> std::io::Result<()> {
let pubkey = matches
.value_of("node_pubkey")
.map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr"));
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
let identity_keypair = keypair_of(matches, "identity");
@ -250,6 +259,7 @@ fn process_spy(matches: &ArgMatches) -> std::io::Result<()> {
None, // find_node_by_gossip_addr
Some(&gossip_addr), // my_gossip_addr
shred_version,
socket_addr_space,
)?;
process_spy_results(timeout, validators, num_nodes, num_nodes_exactly, pubkey);
@ -272,6 +282,7 @@ fn process_rpc_url(matches: &ArgMatches) -> std::io::Result<()> {
let entrypoint_addr = parse_entrypoint(matches);
let timeout = value_t_or_exit!(matches, "timeout", u64);
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr"));
let (_all_peers, validators) = discover(
None, // keypair
entrypoint_addr.as_ref(),
@ -281,13 +292,14 @@ fn process_rpc_url(matches: &ArgMatches) -> std::io::Result<()> {
entrypoint_addr.as_ref(), // find_node_by_gossip_addr
None, // my_gossip_addr
shred_version,
socket_addr_space,
)?;
let rpc_addrs: Vec<_> = validators
.iter()
.filter_map(|contact_info| {
if (any || all || Some(contact_info.gossip) == entrypoint_addr)
&& ContactInfo::is_valid_address(&contact_info.rpc)
&& ContactInfo::is_valid_address(&contact_info.rpc, &socket_addr_space)
{
return Some(contact_info.rpc);
}