adds shred-version to ip-echo-server response
When starting a validator, the node initially joins gossip with shred_verison = 0, until it adopts the entrypoint's shred-version: https://github.com/solana-labs/solana/blob/9b182f408/validator/src/main.rs#L417 Depending on the load on the entrypoint, this adopting entrypoint shred-version through gossip sometimes becomes very slow, and causes several problems in gossip because we have to partially support shred_version == 0 which is a source of leaking crds values from one cluster to another. e.g. see https://github.com/solana-labs/solana/pull/17899 and the other linked issues there. In order to remove shred_version == 0 from gossip, this commit adds shred-version to ip-echo-server response. Once the entrypoints are updated, on validator start-up, if --expected_shred_version is not specified we will obtain shred-version from the entrypoint using ip-echo-server.
This commit is contained in:
		@@ -19,7 +19,7 @@ fn main() {
 | 
			
		||||
        .unwrap_or_else(|_| panic!("Unable to parse {}", port));
 | 
			
		||||
    let bind_addr = SocketAddr::from(([0, 0, 0, 0], port));
 | 
			
		||||
    let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener");
 | 
			
		||||
    let _runtime = solana_net_utils::ip_echo_server(tcp_listener);
 | 
			
		||||
    let _runtime = solana_net_utils::ip_echo_server(tcp_listener, /*shred_version=*/ None);
 | 
			
		||||
    loop {
 | 
			
		||||
        std::thread::park();
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,13 @@
 | 
			
		||||
use {
 | 
			
		||||
    crate::{ip_echo_server_reply_length, HEADER_LENGTH},
 | 
			
		||||
    crate::{HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH},
 | 
			
		||||
    log::*,
 | 
			
		||||
    serde_derive::{Deserialize, Serialize},
 | 
			
		||||
    std::{io, net::SocketAddr, time::Duration},
 | 
			
		||||
    solana_sdk::deserialize_utils::default_on_eof,
 | 
			
		||||
    std::{
 | 
			
		||||
        io,
 | 
			
		||||
        net::{IpAddr, SocketAddr},
 | 
			
		||||
        time::Duration,
 | 
			
		||||
    },
 | 
			
		||||
    tokio::{
 | 
			
		||||
        io::{AsyncReadExt, AsyncWriteExt},
 | 
			
		||||
        net::{TcpListener, TcpStream},
 | 
			
		||||
@@ -23,6 +28,15 @@ pub(crate) struct IpEchoServerMessage {
 | 
			
		||||
    udp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
 | 
			
		||||
pub struct IpEchoServerResponse {
 | 
			
		||||
    // Public IP address of request echoed back to the node.
 | 
			
		||||
    pub(crate) address: IpAddr,
 | 
			
		||||
    // Cluster shred-version of the node running the server.
 | 
			
		||||
    #[serde(deserialize_with = "default_on_eof")]
 | 
			
		||||
    pub(crate) shred_version: Option<u16>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl IpEchoServerMessage {
 | 
			
		||||
    pub fn new(tcp_ports: &[u16], udp_ports: &[u16]) -> Self {
 | 
			
		||||
        let mut msg = Self::default();
 | 
			
		||||
@@ -42,7 +56,11 @@ pub(crate) fn ip_echo_server_request_length() -> usize {
 | 
			
		||||
        + REQUEST_TERMINUS_LENGTH
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn process_connection(mut socket: TcpStream, peer_addr: SocketAddr) -> io::Result<()> {
 | 
			
		||||
async fn process_connection(
 | 
			
		||||
    mut socket: TcpStream,
 | 
			
		||||
    peer_addr: SocketAddr,
 | 
			
		||||
    shred_version: Option<u16>,
 | 
			
		||||
) -> io::Result<()> {
 | 
			
		||||
    info!("connection from {:?}", peer_addr);
 | 
			
		||||
 | 
			
		||||
    let mut data = vec![0u8; ip_echo_server_request_length()];
 | 
			
		||||
@@ -113,16 +131,19 @@ async fn process_connection(mut socket: TcpStream, peer_addr: SocketAddr) -> io:
 | 
			
		||||
            let _ = tcp_stream.shutdown();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let response = IpEchoServerResponse {
 | 
			
		||||
        address: peer_addr.ip(),
 | 
			
		||||
        shred_version,
 | 
			
		||||
    };
 | 
			
		||||
    // "\0\0\0\0" header is added to ensure a valid response will never
 | 
			
		||||
    // conflict with the first four bytes of a valid HTTP response.
 | 
			
		||||
    let mut bytes = vec![0u8; ip_echo_server_reply_length()];
 | 
			
		||||
    bincode::serialize_into(&mut bytes[HEADER_LENGTH..], &peer_addr.ip()).unwrap();
 | 
			
		||||
    let mut bytes = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
 | 
			
		||||
    bincode::serialize_into(&mut bytes[HEADER_LENGTH..], &response).unwrap();
 | 
			
		||||
    trace!("response: {:?}", bytes);
 | 
			
		||||
    writer.write_all(&bytes).await
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn run_echo_server(tcp_listener: std::net::TcpListener) {
 | 
			
		||||
async fn run_echo_server(tcp_listener: std::net::TcpListener, shred_version: Option<u16>) {
 | 
			
		||||
    info!("bound to {:?}", tcp_listener.local_addr().unwrap());
 | 
			
		||||
    let tcp_listener =
 | 
			
		||||
        TcpListener::from_std(tcp_listener).expect("Failed to convert std::TcpListener");
 | 
			
		||||
@@ -131,7 +152,7 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener) {
 | 
			
		||||
        match tcp_listener.accept().await {
 | 
			
		||||
            Ok((socket, peer_addr)) => {
 | 
			
		||||
                runtime::Handle::current().spawn(async move {
 | 
			
		||||
                    if let Err(err) = process_connection(socket, peer_addr).await {
 | 
			
		||||
                    if let Err(err) = process_connection(socket, peer_addr, shred_version).await {
 | 
			
		||||
                        info!("session failed: {:?}", err);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
@@ -143,10 +164,14 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener) {
 | 
			
		||||
 | 
			
		||||
/// Starts a simple TCP server on the given port that echos the IP address of any peer that
 | 
			
		||||
/// connects.  Used by |get_public_ip_addr|
 | 
			
		||||
pub fn ip_echo_server(tcp_listener: std::net::TcpListener) -> IpEchoServer {
 | 
			
		||||
pub fn ip_echo_server(
 | 
			
		||||
    tcp_listener: std::net::TcpListener,
 | 
			
		||||
    // Cluster shred-version of the node running the server.
 | 
			
		||||
    shred_version: Option<u16>,
 | 
			
		||||
) -> IpEchoServer {
 | 
			
		||||
    tcp_listener.set_nonblocking(true).unwrap();
 | 
			
		||||
 | 
			
		||||
    let runtime = Runtime::new().expect("Failed to create Runtime");
 | 
			
		||||
    runtime.spawn(run_echo_server(tcp_listener));
 | 
			
		||||
    runtime.spawn(run_echo_server(tcp_listener, shred_version));
 | 
			
		||||
    runtime
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -15,8 +15,8 @@ use {
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
mod ip_echo_server;
 | 
			
		||||
use ip_echo_server::IpEchoServerMessage;
 | 
			
		||||
pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};
 | 
			
		||||
use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse};
 | 
			
		||||
 | 
			
		||||
/// A data type representing a public Udp socket
 | 
			
		||||
pub struct UdpSocketPair {
 | 
			
		||||
@@ -28,17 +28,12 @@ pub struct UdpSocketPair {
 | 
			
		||||
pub type PortRange = (u16, u16);
 | 
			
		||||
 | 
			
		||||
pub(crate) const HEADER_LENGTH: usize = 4;
 | 
			
		||||
pub(crate) fn ip_echo_server_reply_length() -> usize {
 | 
			
		||||
    let largest_ip_addr = IpAddr::from([0u16; 8]); // IPv6 variant
 | 
			
		||||
    HEADER_LENGTH + bincode::serialized_size(&largest_ip_addr).unwrap() as usize
 | 
			
		||||
}
 | 
			
		||||
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
 | 
			
		||||
 | 
			
		||||
fn ip_echo_server_request(
 | 
			
		||||
    ip_echo_server_addr: &SocketAddr,
 | 
			
		||||
    msg: IpEchoServerMessage,
 | 
			
		||||
) -> Result<IpAddr, String> {
 | 
			
		||||
    let mut data = vec![0u8; ip_echo_server_reply_length()];
 | 
			
		||||
 | 
			
		||||
) -> Result<IpEchoServerResponse, String> {
 | 
			
		||||
    let timeout = Duration::new(5, 0);
 | 
			
		||||
    TcpStream::connect_timeout(ip_echo_server_addr, timeout)
 | 
			
		||||
        .and_then(|mut stream| {
 | 
			
		||||
@@ -54,9 +49,11 @@ fn ip_echo_server_request(
 | 
			
		||||
            stream.set_read_timeout(Some(Duration::new(10, 0)))?;
 | 
			
		||||
            stream.write_all(&bytes)?;
 | 
			
		||||
            stream.shutdown(std::net::Shutdown::Write)?;
 | 
			
		||||
            stream.read(data.as_mut_slice())
 | 
			
		||||
            let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
 | 
			
		||||
            let _ = stream.read(&mut data[..])?;
 | 
			
		||||
            Ok(data)
 | 
			
		||||
        })
 | 
			
		||||
        .and_then(|_| {
 | 
			
		||||
        .and_then(|data| {
 | 
			
		||||
            // It's common for users to accidentally confuse the validator's gossip port and JSON
 | 
			
		||||
            // RPC port.  Attempt to detect when this occurs by looking for the standard HTTP
 | 
			
		||||
            // response header and provide the user with a helpful error message
 | 
			
		||||
@@ -102,7 +99,14 @@ fn ip_echo_server_request(
 | 
			
		||||
/// Determine the public IP address of this machine by asking an ip_echo_server at the given
 | 
			
		||||
/// address
 | 
			
		||||
pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
 | 
			
		||||
    ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())
 | 
			
		||||
    let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
 | 
			
		||||
    Ok(resp.address)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
 | 
			
		||||
    let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
 | 
			
		||||
    resp.shred_version
 | 
			
		||||
        .ok_or_else(|| String::from("IP echo server does not return a shred-version"))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Checks if any of the provided TCP/UDP ports are not reachable by the machine at
 | 
			
		||||
@@ -524,6 +528,57 @@ mod tests {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use std::net::Ipv4Addr;
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_response_length() {
 | 
			
		||||
        let resp = IpEchoServerResponse {
 | 
			
		||||
            address: IpAddr::from([u16::MAX; 8]), // IPv6 variant
 | 
			
		||||
            shred_version: Some(u16::MAX),
 | 
			
		||||
        };
 | 
			
		||||
        let resp_size = bincode::serialized_size(&resp).unwrap();
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            IP_ECHO_SERVER_RESPONSE_LENGTH,
 | 
			
		||||
            HEADER_LENGTH + resp_size as usize
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Asserts that an old client can parse the response from a new server.
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_backward_compat() {
 | 
			
		||||
        let address = IpAddr::from([
 | 
			
		||||
            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
 | 
			
		||||
        ]);
 | 
			
		||||
        let response = IpEchoServerResponse {
 | 
			
		||||
            address,
 | 
			
		||||
            shred_version: Some(42),
 | 
			
		||||
        };
 | 
			
		||||
        let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
 | 
			
		||||
        bincode::serialize_into(&mut data[HEADER_LENGTH..], &response).unwrap();
 | 
			
		||||
        data.truncate(HEADER_LENGTH + 20);
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            bincode::deserialize::<IpAddr>(&data[HEADER_LENGTH..]).unwrap(),
 | 
			
		||||
            address
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Asserts that a new client can parse the response from an old server.
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_forward_compat() {
 | 
			
		||||
        let address = IpAddr::from([
 | 
			
		||||
            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
 | 
			
		||||
        ]);
 | 
			
		||||
        let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
 | 
			
		||||
        bincode::serialize_into(&mut data[HEADER_LENGTH..], &address).unwrap();
 | 
			
		||||
        let response: Result<IpEchoServerResponse, _> =
 | 
			
		||||
            bincode::deserialize(&data[HEADER_LENGTH..]);
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            response.unwrap(),
 | 
			
		||||
            IpEchoServerResponse {
 | 
			
		||||
                address,
 | 
			
		||||
                shred_version: None,
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_parse_port_or_addr() {
 | 
			
		||||
        let p1 = parse_port_or_addr(Some("9000"), SocketAddr::from(([1, 2, 3, 4], 1)));
 | 
			
		||||
@@ -624,14 +679,14 @@ mod tests {
 | 
			
		||||
        let (_server_port, (server_udp_socket, server_tcp_listener)) =
 | 
			
		||||
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
 | 
			
		||||
 | 
			
		||||
        let _runtime = ip_echo_server(server_tcp_listener);
 | 
			
		||||
        let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(42));
 | 
			
		||||
 | 
			
		||||
        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            get_public_ip_addr(&server_ip_echo_addr),
 | 
			
		||||
            parse_host("127.0.0.1"),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        assert_eq!(get_cluster_shred_version(&server_ip_echo_addr), Ok(42));
 | 
			
		||||
        assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -644,14 +699,14 @@ mod tests {
 | 
			
		||||
        let (client_port, (client_udp_socket, client_tcp_listener)) =
 | 
			
		||||
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
 | 
			
		||||
 | 
			
		||||
        let _runtime = ip_echo_server(server_tcp_listener);
 | 
			
		||||
        let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(65535));
 | 
			
		||||
 | 
			
		||||
        let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            get_public_ip_addr(&ip_echo_server_addr),
 | 
			
		||||
            parse_host("127.0.0.1"),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        assert_eq!(get_cluster_shred_version(&ip_echo_server_addr), Ok(65535));
 | 
			
		||||
        assert!(verify_reachable_ports(
 | 
			
		||||
            &ip_echo_server_addr,
 | 
			
		||||
            vec![(client_port, client_tcp_listener)],
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user