diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index f08ac8b826..c0a3473bdd 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -23,7 +23,7 @@ use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_runtime::bank::Bank; use solana_sdk::timing::timestamp; use solana_sdk::{clock::Slot, pubkey::Pubkey}; -use solana_streamer::sendmmsg::send_mmsg; +use solana_streamer::{sendmmsg::send_mmsg, socket::is_global}; use std::sync::atomic::AtomicU64; use std::{ collections::HashMap, @@ -387,10 +387,15 @@ pub fn broadcast_shreds( let mut shred_select = Measure::start("shred_select"); let packets: Vec<_> = shreds .iter() - .map(|shred| { + .filter_map(|shred| { let broadcast_index = weighted_best(&peers_and_stakes, shred.seed()); + let node = &peers[broadcast_index]; - (&shred.payload, &peers[broadcast_index].tvu) + if is_global(&node.tvu) { + Some((&shred.payload, &node.tvu)) + } else { + None + } }) .collect(); shred_select.stop(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 82b6f5f715..8499e34f2b 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -61,8 +61,11 @@ use solana_sdk::{ timing::timestamp, transaction::Transaction, }; -use solana_streamer::sendmmsg::multicast; -use solana_streamer::streamer::{PacketReceiver, PacketSender}; +use solana_streamer::{ + sendmmsg::multicast, + socket::is_global, + streamer::{PacketReceiver, PacketSender}, +}; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ borrow::Cow, @@ -1250,7 +1253,7 @@ impl ClusterInfo { .filter(|node| { node.id != self_pubkey && node.shred_version == self_shred_version - && ContactInfo::is_valid_address(&node.tvu) + && ContactInfo::is_valid_tvu_address(&node.tvu) }) .cloned() .collect() @@ -1382,9 +1385,14 @@ impl ClusterInfo { .iter() .map(|peer| &peer.tvu_forwards) .filter(|addr| ContactInfo::is_valid_address(addr)) + .filter(|addr| is_global(addr)) .collect() } else { - peers.iter().map(|peer| &peer.tvu).collect() + peers + .iter() + .map(|peer| &peer.tvu) + .filter(|addr| is_global(addr)) + .collect() }; let mut dests = &dests[..]; let data = &packet.data[..packet.meta.size]; diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index eafaf636ba..17630a38b0 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -191,10 +191,17 @@ impl ContactInfo { /// port must not be 0 /// ip must be specified and not multicast /// loopback ip is only allowed in tests - pub fn is_valid_address(addr: &SocketAddr) -> bool { + // Keeping this for now not to break tvu-peers and turbine shuffle order of + // nodes when arranging nodes on retransmit tree. Private IP addresses in + // turbine are filtered out just before sending packets. + pub(crate) fn is_valid_tvu_address(addr: &SocketAddr) -> bool { (addr.port() != 0) && Self::is_valid_ip(addr.ip()) } + pub fn is_valid_address(addr: &SocketAddr) -> bool { + Self::is_valid_tvu_address(addr) && solana_streamer::socket::is_global(addr) + } + pub fn client_facing_addr(&self) -> (SocketAddr, SocketAddr) { (self.rpc, self.tpu) } diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index e0b9bd7176..c8ba5f6d4b 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -2,6 +2,7 @@ pub mod packet; pub mod recvmmsg; pub mod sendmmsg; +pub mod socket; pub mod streamer; #[macro_use] diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index f8837d3b43..faa67237d5 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -1,5 +1,8 @@ //! The `packet` module defines data structures and methods to pull data from the network. -use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; +use crate::{ + recvmmsg::{recv_mmsg, NUM_RCVMMSGS}, + socket::is_global, +}; pub use solana_perf::packet::{ limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS, PACKETS_PER_BATCH, @@ -56,8 +59,10 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Res pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> { for p in &obj.packets { - let a = p.meta.addr(); - socket.send_to(&p.data[..p.meta.size], &a)?; + let addr = p.meta.addr(); + if is_global(&addr) { + socket.send_to(&p.data[..p.meta.size], &addr)?; + } } Ok(()) } diff --git a/streamer/src/socket.rs b/streamer/src/socket.rs new file mode 100644 index 0000000000..08d40f171b --- /dev/null +++ b/streamer/src/socket.rs @@ -0,0 +1,28 @@ +use std::net::SocketAddr; + +// TODO: remove these once IpAddr::is_global is stable. + +#[cfg(test)] +pub fn is_global(_: &SocketAddr) -> bool { + true +} + +#[cfg(not(test))] +pub fn is_global(addr: &SocketAddr) -> bool { + use std::net::IpAddr; + + match addr.ip() { + IpAddr::V4(addr) => { + // TODO: Consider excluding: + // addr.is_loopback() || addr.is_link_local() + // || addr.is_broadcast() || addr.is_documentation() + // || addr.is_unspecified() + !addr.is_private() + } + IpAddr::V6(_) => { + // TODO: Consider excluding: + // addr.is_loopback() || addr.is_unspecified(), + true + } + } +}