diff --git a/Cargo.lock b/Cargo.lock index 7ccee814f2..66b89e5e3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4848,6 +4848,7 @@ dependencies = [ "socket2", "solana-clap-utils", "solana-logger 1.7.3", + "solana-sdk", "solana-version", "tokio 1.1.1", "url 2.2.0", diff --git a/core/src/validator.rs b/core/src/validator.rs index 8f2fe577c8..34e83d9795 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -570,9 +570,13 @@ impl Validator { *start_progress.write().unwrap() = ValidatorStartProgress::Halted; std::thread::park(); } - - let ip_echo_server = node.sockets.ip_echo.map(solana_net_utils::ip_echo_server); - + let ip_echo_server = match node.sockets.ip_echo { + None => None, + Some(tcp_listener) => Some(solana_net_utils::ip_echo_server( + tcp_listener, + Some(node.info.shred_version), + )), + }; let gossip_service = GossipService::new( &cluster_info, Some(bank_forks.clone()), diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index e06f2577cf..04042ef5c5 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -151,9 +151,8 @@ pub fn discover( if let Some(my_gossip_addr) = my_gossip_addr { info!("Gossip Address: {:?}", my_gossip_addr); } - - let _ip_echo_server = ip_echo.map(solana_net_utils::ip_echo_server); - + let _ip_echo_server = ip_echo + .map(|tcp_listener| solana_net_utils::ip_echo_server(tcp_listener, Some(my_shred_version))); let (met_criteria, elapsed, all_peers, tvu_peers) = spy( spy_ref.clone(), num_nodes, diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index b106996a54..003794e83d 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -20,6 +20,7 @@ serde_derive = "1.0.103" socket2 = "0.3.17" solana-clap-utils = { path = "../clap-utils", version = "=1.7.3" } solana-logger = { path = "../logger", version = "=1.7.3" } +solana-sdk = { path = "../sdk", version = "=1.7.3" } solana-version = { path = "../version", version = "=1.7.3" } tokio = { version = "1", features = ["full"] } url = "2.1.1" diff --git a/net-utils/src/bin/ip_address_server.rs b/net-utils/src/bin/ip_address_server.rs index d1b2e60d32..dbf55f8edb 100644 --- a/net-utils/src/bin/ip_address_server.rs +++ b/net-utils/src/bin/ip_address_server.rs @@ -19,7 +19,7 @@ fn main() { .unwrap_or_else(|_| panic!("Unable to parse {}", port)); let bind_addr = SocketAddr::from(([0, 0, 0, 0], port)); let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener"); - let _runtime = solana_net_utils::ip_echo_server(tcp_listener); + let _runtime = solana_net_utils::ip_echo_server(tcp_listener, /*shred_version=*/ None); loop { std::thread::park(); } diff --git a/net-utils/src/ip_echo_server.rs b/net-utils/src/ip_echo_server.rs index df46be3559..31f0b704dd 100644 --- a/net-utils/src/ip_echo_server.rs +++ b/net-utils/src/ip_echo_server.rs @@ -1,8 +1,13 @@ use { - crate::{ip_echo_server_reply_length, HEADER_LENGTH}, + crate::{HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH}, log::*, serde_derive::{Deserialize, Serialize}, - std::{io, net::SocketAddr, time::Duration}, + solana_sdk::deserialize_utils::default_on_eof, + std::{ + io, + net::{IpAddr, SocketAddr}, + time::Duration, + }, tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, @@ -23,6 +28,15 @@ pub(crate) struct IpEchoServerMessage { udp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct IpEchoServerResponse { + // Public IP address of request echoed back to the node. + pub(crate) address: IpAddr, + // Cluster shred-version of the node running the server. + #[serde(deserialize_with = "default_on_eof")] + pub(crate) shred_version: Option, +} + impl IpEchoServerMessage { pub fn new(tcp_ports: &[u16], udp_ports: &[u16]) -> Self { let mut msg = Self::default(); @@ -42,7 +56,11 @@ pub(crate) fn ip_echo_server_request_length() -> usize { + REQUEST_TERMINUS_LENGTH } -async fn process_connection(mut socket: TcpStream, peer_addr: SocketAddr) -> io::Result<()> { +async fn process_connection( + mut socket: TcpStream, + peer_addr: SocketAddr, + shred_version: Option, +) -> io::Result<()> { info!("connection from {:?}", peer_addr); let mut data = vec![0u8; ip_echo_server_request_length()]; @@ -113,16 +131,19 @@ async fn process_connection(mut socket: TcpStream, peer_addr: SocketAddr) -> io: let _ = tcp_stream.shutdown(); } } - + let response = IpEchoServerResponse { + address: peer_addr.ip(), + shred_version, + }; // "\0\0\0\0" header is added to ensure a valid response will never // conflict with the first four bytes of a valid HTTP response. - let mut bytes = vec![0u8; ip_echo_server_reply_length()]; - bincode::serialize_into(&mut bytes[HEADER_LENGTH..], &peer_addr.ip()).unwrap(); + let mut bytes = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH]; + bincode::serialize_into(&mut bytes[HEADER_LENGTH..], &response).unwrap(); trace!("response: {:?}", bytes); writer.write_all(&bytes).await } -async fn run_echo_server(tcp_listener: std::net::TcpListener) { +async fn run_echo_server(tcp_listener: std::net::TcpListener, shred_version: Option) { info!("bound to {:?}", tcp_listener.local_addr().unwrap()); let tcp_listener = TcpListener::from_std(tcp_listener).expect("Failed to convert std::TcpListener"); @@ -131,7 +152,7 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener) { match tcp_listener.accept().await { Ok((socket, peer_addr)) => { runtime::Handle::current().spawn(async move { - if let Err(err) = process_connection(socket, peer_addr).await { + if let Err(err) = process_connection(socket, peer_addr, shred_version).await { info!("session failed: {:?}", err); } }); @@ -143,10 +164,14 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener) { /// Starts a simple TCP server on the given port that echos the IP address of any peer that /// connects. Used by |get_public_ip_addr| -pub fn ip_echo_server(tcp_listener: std::net::TcpListener) -> IpEchoServer { +pub fn ip_echo_server( + tcp_listener: std::net::TcpListener, + // Cluster shred-version of the node running the server. + shred_version: Option, +) -> IpEchoServer { tcp_listener.set_nonblocking(true).unwrap(); let runtime = Runtime::new().expect("Failed to create Runtime"); - runtime.spawn(run_echo_server(tcp_listener)); + runtime.spawn(run_echo_server(tcp_listener, shred_version)); runtime } diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 6c66f3d7fd..55d702baeb 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -15,8 +15,8 @@ use { }; mod ip_echo_server; -use ip_echo_server::IpEchoServerMessage; pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE}; +use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse}; /// A data type representing a public Udp socket pub struct UdpSocketPair { @@ -28,17 +28,12 @@ pub struct UdpSocketPair { pub type PortRange = (u16, u16); pub(crate) const HEADER_LENGTH: usize = 4; -pub(crate) fn ip_echo_server_reply_length() -> usize { - let largest_ip_addr = IpAddr::from([0u16; 8]); // IPv6 variant - HEADER_LENGTH + bincode::serialized_size(&largest_ip_addr).unwrap() as usize -} +pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23; fn ip_echo_server_request( ip_echo_server_addr: &SocketAddr, msg: IpEchoServerMessage, -) -> Result { - let mut data = vec![0u8; ip_echo_server_reply_length()]; - +) -> Result { let timeout = Duration::new(5, 0); TcpStream::connect_timeout(ip_echo_server_addr, timeout) .and_then(|mut stream| { @@ -54,9 +49,11 @@ fn ip_echo_server_request( stream.set_read_timeout(Some(Duration::new(10, 0)))?; stream.write_all(&bytes)?; stream.shutdown(std::net::Shutdown::Write)?; - stream.read(data.as_mut_slice()) + let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH]; + let _ = stream.read(&mut data[..])?; + Ok(data) }) - .and_then(|_| { + .and_then(|data| { // It's common for users to accidentally confuse the validator's gossip port and JSON // RPC port. Attempt to detect when this occurs by looking for the standard HTTP // response header and provide the user with a helpful error message @@ -102,7 +99,14 @@ fn ip_echo_server_request( /// Determine the public IP address of this machine by asking an ip_echo_server at the given /// address pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result { - ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default()) + let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?; + Ok(resp.address) +} + +pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result { + let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?; + resp.shred_version + .ok_or_else(|| String::from("IP echo server does not return a shred-version")) } // Checks if any of the provided TCP/UDP ports are not reachable by the machine at @@ -524,6 +528,57 @@ mod tests { use super::*; use std::net::Ipv4Addr; + #[test] + fn test_response_length() { + let resp = IpEchoServerResponse { + address: IpAddr::from([u16::MAX; 8]), // IPv6 variant + shred_version: Some(u16::MAX), + }; + let resp_size = bincode::serialized_size(&resp).unwrap(); + assert_eq!( + IP_ECHO_SERVER_RESPONSE_LENGTH, + HEADER_LENGTH + resp_size as usize + ); + } + + // Asserts that an old client can parse the response from a new server. + #[test] + fn test_backward_compat() { + let address = IpAddr::from([ + 525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16, + ]); + let response = IpEchoServerResponse { + address, + shred_version: Some(42), + }; + let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH]; + bincode::serialize_into(&mut data[HEADER_LENGTH..], &response).unwrap(); + data.truncate(HEADER_LENGTH + 20); + assert_eq!( + bincode::deserialize::(&data[HEADER_LENGTH..]).unwrap(), + address + ); + } + + // Asserts that a new client can parse the response from an old server. + #[test] + fn test_forward_compat() { + let address = IpAddr::from([ + 525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16, + ]); + let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH]; + bincode::serialize_into(&mut data[HEADER_LENGTH..], &address).unwrap(); + let response: Result = + bincode::deserialize(&data[HEADER_LENGTH..]); + assert_eq!( + response.unwrap(), + IpEchoServerResponse { + address, + shred_version: None, + } + ); + } + #[test] fn test_parse_port_or_addr() { let p1 = parse_port_or_addr(Some("9000"), SocketAddr::from(([1, 2, 3, 4], 1))); @@ -624,14 +679,14 @@ mod tests { let (_server_port, (server_udp_socket, server_tcp_listener)) = bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); - let _runtime = ip_echo_server(server_tcp_listener); + let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(42)); let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); assert_eq!( get_public_ip_addr(&server_ip_echo_addr), parse_host("127.0.0.1"), ); - + assert_eq!(get_cluster_shred_version(&server_ip_echo_addr), Ok(42)); assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],)); } @@ -644,14 +699,14 @@ mod tests { let (client_port, (client_udp_socket, client_tcp_listener)) = bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); - let _runtime = ip_echo_server(server_tcp_listener); + let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(65535)); let ip_echo_server_addr = server_udp_socket.local_addr().unwrap(); assert_eq!( get_public_ip_addr(&ip_echo_server_addr), parse_host("127.0.0.1"), ); - + assert_eq!(get_cluster_shred_version(&ip_echo_server_addr), Ok(65535)); assert!(verify_reachable_ports( &ip_echo_server_addr, vec![(client_port, client_tcp_listener)], diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 2dfff193bd..7eb5de1741 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -3334,6 +3334,7 @@ dependencies = [ "socket2", "solana-clap-utils", "solana-logger 1.7.3", + "solana-sdk", "solana-version", "tokio 1.4.0", "url", diff --git a/validator/src/main.rs b/validator/src/main.rs index cd17204227..b740e7e62f 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -991,6 +991,28 @@ fn rpc_bootstrap( } } +fn get_cluster_shred_version(entrypoints: &[SocketAddr]) -> Option { + let entrypoints = { + let mut index: Vec<_> = (0..entrypoints.len()).collect(); + index.shuffle(&mut rand::thread_rng()); + index.into_iter().map(|i| &entrypoints[i]) + }; + for entrypoint in entrypoints { + match solana_net_utils::get_cluster_shred_version(entrypoint) { + Err(err) => eprintln!("get_cluster_shred_version failed: {}, {}", entrypoint, err), + Ok(0) => eprintln!("zero sherd-version from entrypoint: {}", entrypoint), + Ok(shred_version) => { + info!( + "obtained shred-version {} from {}", + shred_version, entrypoint + ); + return Some(shred_version); + } + } + } + None +} + pub fn main() { let default_dynamic_port_range = &format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1); @@ -2133,6 +2155,25 @@ pub fn main() { } else { AccountShrinkThreshold::IndividalStore { shrink_ratio } }; + let entrypoint_addrs = values_t!(matches, "entrypoint", String) + .unwrap_or_default() + .into_iter() + .map(|entrypoint| { + solana_net_utils::parse_host_port(&entrypoint).unwrap_or_else(|e| { + eprintln!("failed to parse entrypoint address: {}", e); + exit(1); + }) + }) + .collect::>() + .into_iter() + .collect::>(); + // TODO: Once entrypoints are updated to return shred-version, this should + // abort if it fails to obtain a shred-version, so that nodes always join + // gossip with a valid shred-version. The code to adopt entrypoint shred + // version can then be deleted from gossip and get_rpc_node above. + let expected_shred_version = value_t!(matches, "expected_shred_version", u16) + .ok() + .or_else(|| get_cluster_shred_version(&entrypoint_addrs)); let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), @@ -2145,7 +2186,7 @@ pub fn main() { expected_bank_hash: matches .value_of("expected_bank_hash") .map(|s| Hash::from_str(s).unwrap()), - expected_shred_version: value_t!(matches, "expected_shred_version", u16).ok(), + expected_shred_version, new_hard_forks: hardforks_of(&matches, "hard_forks"), rpc_config: JsonRpcConfig { enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"), @@ -2396,19 +2437,6 @@ pub fn main() { validator_config.halt_on_trusted_validators_accounts_hash_mismatch = true; } - let entrypoint_addrs = values_t!(matches, "entrypoint", String) - .unwrap_or_default() - .into_iter() - .map(|entrypoint| { - solana_net_utils::parse_host_port(&entrypoint).unwrap_or_else(|e| { - eprintln!("failed to parse entrypoint address: {}", e); - exit(1); - }) - }) - .collect::>() - .into_iter() - .collect::>(); - let public_rpc_addr = matches.value_of("public_rpc_addr").map(|addr| { solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| { eprintln!("failed to parse public rpc address: {}", e);