excludes private ip addresses (#18739)
(cherry picked from commit e316586516
)
# Conflicts:
# core/src/broadcast_stage.rs
# core/src/cluster_info.rs
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -23,7 +23,7 @@ use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
|||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||||
use solana_streamer::sendmmsg::send_mmsg;
|
use solana_streamer::{sendmmsg::send_mmsg, socket::is_global};
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
@ -387,10 +387,15 @@ pub fn broadcast_shreds(
|
|||||||
let mut shred_select = Measure::start("shred_select");
|
let mut shred_select = Measure::start("shred_select");
|
||||||
let packets: Vec<_> = shreds
|
let packets: Vec<_> = shreds
|
||||||
.iter()
|
.iter()
|
||||||
.map(|shred| {
|
.filter_map(|shred| {
|
||||||
let broadcast_index = weighted_best(&peers_and_stakes, shred.seed());
|
let broadcast_index = weighted_best(&peers_and_stakes, shred.seed());
|
||||||
|
let node = &peers[broadcast_index];
|
||||||
|
|
||||||
(&shred.payload, &peers[broadcast_index].tvu)
|
if is_global(&node.tvu) {
|
||||||
|
Some((&shred.payload, &node.tvu))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
shred_select.stop();
|
shred_select.stop();
|
||||||
|
@ -61,8 +61,11 @@ use solana_sdk::{
|
|||||||
timing::timestamp,
|
timing::timestamp,
|
||||||
transaction::Transaction,
|
transaction::Transaction,
|
||||||
};
|
};
|
||||||
use solana_streamer::sendmmsg::multicast;
|
use solana_streamer::{
|
||||||
use solana_streamer::streamer::{PacketReceiver, PacketSender};
|
sendmmsg::multicast,
|
||||||
|
socket::is_global,
|
||||||
|
streamer::{PacketReceiver, PacketSender},
|
||||||
|
};
|
||||||
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
|
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Cow,
|
borrow::Cow,
|
||||||
@ -1250,7 +1253,7 @@ impl ClusterInfo {
|
|||||||
.filter(|node| {
|
.filter(|node| {
|
||||||
node.id != self_pubkey
|
node.id != self_pubkey
|
||||||
&& node.shred_version == self_shred_version
|
&& node.shred_version == self_shred_version
|
||||||
&& ContactInfo::is_valid_address(&node.tvu)
|
&& ContactInfo::is_valid_tvu_address(&node.tvu)
|
||||||
})
|
})
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect()
|
.collect()
|
||||||
@ -1382,9 +1385,14 @@ impl ClusterInfo {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|peer| &peer.tvu_forwards)
|
.map(|peer| &peer.tvu_forwards)
|
||||||
.filter(|addr| ContactInfo::is_valid_address(addr))
|
.filter(|addr| ContactInfo::is_valid_address(addr))
|
||||||
|
.filter(|addr| is_global(addr))
|
||||||
.collect()
|
.collect()
|
||||||
} else {
|
} else {
|
||||||
peers.iter().map(|peer| &peer.tvu).collect()
|
peers
|
||||||
|
.iter()
|
||||||
|
.map(|peer| &peer.tvu)
|
||||||
|
.filter(|addr| is_global(addr))
|
||||||
|
.collect()
|
||||||
};
|
};
|
||||||
let mut dests = &dests[..];
|
let mut dests = &dests[..];
|
||||||
let data = &packet.data[..packet.meta.size];
|
let data = &packet.data[..packet.meta.size];
|
||||||
|
@ -191,10 +191,17 @@ impl ContactInfo {
|
|||||||
/// port must not be 0
|
/// port must not be 0
|
||||||
/// ip must be specified and not multicast
|
/// ip must be specified and not multicast
|
||||||
/// loopback ip is only allowed in tests
|
/// loopback ip is only allowed in tests
|
||||||
pub fn is_valid_address(addr: &SocketAddr) -> bool {
|
// Keeping this for now not to break tvu-peers and turbine shuffle order of
|
||||||
|
// nodes when arranging nodes on retransmit tree. Private IP addresses in
|
||||||
|
// turbine are filtered out just before sending packets.
|
||||||
|
pub(crate) fn is_valid_tvu_address(addr: &SocketAddr) -> bool {
|
||||||
(addr.port() != 0) && Self::is_valid_ip(addr.ip())
|
(addr.port() != 0) && Self::is_valid_ip(addr.ip())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_valid_address(addr: &SocketAddr) -> bool {
|
||||||
|
Self::is_valid_tvu_address(addr) && solana_streamer::socket::is_global(addr)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn client_facing_addr(&self) -> (SocketAddr, SocketAddr) {
|
pub fn client_facing_addr(&self) -> (SocketAddr, SocketAddr) {
|
||||||
(self.rpc, self.tpu)
|
(self.rpc, self.tpu)
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
pub mod packet;
|
pub mod packet;
|
||||||
pub mod recvmmsg;
|
pub mod recvmmsg;
|
||||||
pub mod sendmmsg;
|
pub mod sendmmsg;
|
||||||
|
pub mod socket;
|
||||||
pub mod streamer;
|
pub mod streamer;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
//! The `packet` module defines data structures and methods to pull data from the network.
|
//! The `packet` module defines data structures and methods to pull data from the network.
|
||||||
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
|
use crate::{
|
||||||
|
recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
|
||||||
|
socket::is_global,
|
||||||
|
};
|
||||||
pub use solana_perf::packet::{
|
pub use solana_perf::packet::{
|
||||||
limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS,
|
limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS,
|
||||||
PACKETS_PER_BATCH,
|
PACKETS_PER_BATCH,
|
||||||
@ -56,8 +59,10 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Res
|
|||||||
|
|
||||||
pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> {
|
pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> {
|
||||||
for p in &obj.packets {
|
for p in &obj.packets {
|
||||||
let a = p.meta.addr();
|
let addr = p.meta.addr();
|
||||||
socket.send_to(&p.data[..p.meta.size], &a)?;
|
if is_global(&addr) {
|
||||||
|
socket.send_to(&p.data[..p.meta.size], &addr)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
28
streamer/src/socket.rs
Normal file
28
streamer/src/socket.rs
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
// TODO: remove these once IpAddr::is_global is stable.
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn is_global(_: &SocketAddr) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(test))]
|
||||||
|
pub fn is_global(addr: &SocketAddr) -> bool {
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
match addr.ip() {
|
||||||
|
IpAddr::V4(addr) => {
|
||||||
|
// TODO: Consider excluding:
|
||||||
|
// addr.is_loopback() || addr.is_link_local()
|
||||||
|
// || addr.is_broadcast() || addr.is_documentation()
|
||||||
|
// || addr.is_unspecified()
|
||||||
|
!addr.is_private()
|
||||||
|
}
|
||||||
|
IpAddr::V6(_) => {
|
||||||
|
// TODO: Consider excluding:
|
||||||
|
// addr.is_loopback() || addr.is_unspecified(),
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user