From c4a5442146d235ee657a053f1c746e5c0d2f8c67 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Wed, 4 Sep 2019 23:10:35 -0700 Subject: [PATCH] Confirm validator ports are reachable by the entrypoint at startup (#5795) --- Cargo.lock | 4 ++ netutil/Cargo.toml | 4 ++ netutil/src/ip_echo_server.rs | 124 ++++++++++++++++++++++++++++++---- netutil/src/lib.rs | 109 ++++++++++++++++++++++++++++-- validator/src/main.rs | 54 +++++++++------ 5 files changed, 255 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e58dee64ec..cd33bd5cac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3621,13 +3621,17 @@ name = "solana-netutil" version = "0.19.0-pre0" dependencies = [ "bincode 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", "solana-logger 0.19.0-pre0", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/netutil/Cargo.toml b/netutil/Cargo.toml index 631eb4586c..2782e05931 100644 --- a/netutil/Cargo.toml +++ b/netutil/Cargo.toml @@ -10,13 +10,17 @@ edition = "2018" [dependencies] bincode = "1.1.4" +bytes = "0.4" clap = "2.33.0" log = "0.4.8" nix = "0.15.0" rand = "0.6.1" +serde = "1.0.99" +serde_derive = "1.0.99" socket2 = "0.3.11" solana-logger = { path = "../logger", version = "0.19.0-pre0" } tokio = "0.1" +tokio-codec = "0.1" [lib] name = 
"solana_netutil" diff --git a/netutil/src/ip_echo_server.rs b/netutil/src/ip_echo_server.rs index 36189bfcc4..5d7ff101bc 100644 --- a/netutil/src/ip_echo_server.rs +++ b/netutil/src/ip_echo_server.rs @@ -1,12 +1,35 @@ +use bytes::Bytes; use log::*; +use serde_derive::{Deserialize, Serialize}; +use std::io; use std::net::SocketAddr; +use std::time::Duration; use tokio; use tokio::net::TcpListener; -use tokio::prelude::{Future, Stream}; +use tokio::prelude::*; use tokio::runtime::Runtime; +use tokio_codec::{BytesCodec, Decoder}; pub type IpEchoServer = Runtime; +#[derive(Serialize, Deserialize, Default)] +pub(crate) struct IpEchoServerMessage { + tcp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde + udp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde +} + +impl IpEchoServerMessage { + pub fn new(tcp_ports: &[u16], udp_ports: &[u16]) -> Self { + let mut msg = Self::default(); + assert!(tcp_ports.len() <= msg.tcp_ports.len()); + assert!(udp_ports.len() <= msg.udp_ports.len()); + + msg.tcp_ports[..tcp_ports.len()].copy_from_slice(tcp_ports); + msg.udp_ports[..udp_ports.len()].copy_from_slice(udp_ports); + msg + } +} + /// Starts a simple TCP server on the given port that echos the IP address of any peer that /// connects. 
Used by |get_public_ip_addr| pub fn ip_echo_server(port: u16) -> IpEchoServer { @@ -19,23 +42,96 @@ pub fn ip_echo_server(port: u16) -> IpEchoServer { .incoming() .map_err(|err| warn!("accept failed: {:?}", err)) .for_each(move |socket| { - let ip = socket - .peer_addr() - .and_then(|peer_addr| { - bincode::serialize(&peer_addr.ip()).map_err(|err| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to serialize: {:?}", err), - ) + let ip = socket.peer_addr().expect("Expect peer_addr()").ip(); + info!("connection from {:?}", ip); + + let framed = BytesCodec::new().framed(socket); + let (writer, reader) = framed.split(); + + let processor = reader + .and_then(move |bytes| { + bincode::deserialize::<IpEchoServerMessage>(&bytes).or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Failed to deserialize IpEchoServerMessage: {:?}", err), + )) }) }) - .unwrap_or_else(|_| vec![]); + .and_then(move |msg| { + // Fire a datagram at each non-zero UDP port + if !msg.udp_ports.is_empty() { + match std::net::UdpSocket::bind("0.0.0.0:0") { + Ok(udp_socket) => { + for udp_port in &msg.udp_ports { + if *udp_port != 0 { + match udp_socket + .send_to(&[0], SocketAddr::from((ip, *udp_port))) + { + Ok(_) => debug!("Successful send_to udp/{}", udp_port), + Err(err) => { + info!("Failed to send_to udp/{}: {}", udp_port, err) + } + } + } + } + } + Err(err) => { + warn!("Failed to bind local udp socket: {}", err); + } + } + } - let write_future = tokio::io::write_all(socket, ip) - .map_err(|err| warn!("write error: {:?}", err)) - .map(|_| ()); + // Try to connect to each non-zero TCP port + let tcp_futures: Vec<_> = msg + .tcp_ports + .iter() + .filter_map(|tcp_port| { + let tcp_port = *tcp_port; + if tcp_port == 0 { + None + } else { + Some( + tokio::net::TcpStream::connect(&SocketAddr::new(ip, tcp_port)) + .and_then(move |tcp_stream| { + debug!("Connection established to tcp/{}", tcp_port); + let _ = tcp_stream.shutdown(std::net::Shutdown::Both); + Ok(()) + }) +
.timeout(Duration::from_secs(5)) + .or_else(move |err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Connection timeout to {}: {:?}", + tcp_port, err + ), + )) + }), + ) + } + }) + .collect(); + future::join_all(tcp_futures) + }) + .and_then(move |_| { + let ip = bincode::serialize(&ip).unwrap_or_else(|err| { + warn!("Failed to serialize: {:?}", err); + vec![] + }); + Ok(Bytes::from(ip)) + }); - tokio::spawn(write_future) + let connection = writer + .send_all(processor) + .timeout(Duration::from_secs(5)) + .then(|result| { + if let Err(err) = result { + info!("Session failed: {:?}", err); + } + Ok(()) + }); + + tokio::spawn(connection) }); let mut rt = Runtime::new().expect("Failed to create Runtime"); diff --git a/netutil/src/lib.rs b/netutil/src/lib.rs index 67851b73a3..771d997983 100644 --- a/netutil/src/lib.rs +++ b/netutil/src/lib.rs @@ -2,13 +2,14 @@ use log::*; use rand::{thread_rng, Rng}; use socket2::{Domain, SockAddr, Socket, Type}; -use std::io; -use std::io::Read; +use std::io::{self, Read, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}; +use std::sync::mpsc::channel; use std::time::Duration; mod ip_echo_server; -pub use ip_echo_server::*; +use ip_echo_server::IpEchoServerMessage; +pub use ip_echo_server::{ip_echo_server, IpEchoServer}; /// A data type representing a public Udp socket pub struct UdpSocketPair { @@ -19,14 +20,18 @@ pub struct UdpSocketPair { pub type PortRange = (u16, u16); -/// Determine the public IP address of this machine by asking an ip_echo_server at the given -/// address -pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> { +fn ip_echo_server_request( + ip_echo_server_addr: &SocketAddr, + msg: IpEchoServerMessage, +) -> Result<IpAddr, String> { let mut data = Vec::new(); let timeout = Duration::new(5, 0); TcpStream::connect_timeout(ip_echo_server_addr, timeout) .and_then(|mut stream| { + let msg = bincode::serialize(&msg).expect("serialize
IpEchoServerMessage"); + stream.write_all(&msg)?; + stream.shutdown(std::net::Shutdown::Write)?; stream .set_read_timeout(Some(Duration::new(10, 0))) .expect("set_read_timeout"); @@ -43,6 +48,98 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result Result { + ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default()) +} + +// Aborts the process if any of the provided TCP/UDP ports are not reachable by the machine at +// `ip_echo_server_addr` +pub fn verify_reachable_ports( + ip_echo_server_addr: &SocketAddr, + tcp_ports: &[u16], + udp_sockets: &[&UdpSocket], +) { + let tcp: Vec<(_, _)> = tcp_ports + .iter() + .map(|port| { + ( + port, + TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], *port))).unwrap_or_else(|err| { + error!("Unable to bind to tcp/{}: {}", port, err); + std::process::exit(1); + }), + ) + }) + .collect(); + + let udp: Vec<(_, _)> = udp_sockets + .iter() + .map(|udp_socket| { + ( + udp_socket.local_addr().unwrap().port(), + udp_socket.try_clone().expect("Unable to clone udp socket"), + ) + }) + .collect(); + + let udp_ports: Vec<_> = udp.iter().map(|x| x.0).collect(); + + info!( + "Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}", + tcp_ports, udp_ports, ip_echo_server_addr + ); + + let _ = ip_echo_server_request( + ip_echo_server_addr, + IpEchoServerMessage::new(&tcp_ports, &udp_ports), + ) + .map_err(|err| warn!("ip_echo_server request failed: {}", err)); + + // Wait for a connection to open on each TCP port + for (port, tcp_listener) in tcp { + let (sender, receiver) = channel(); + let port = *port; + std::thread::spawn(move || { + debug!("Waiting for incoming connection on tcp/{}", port); + let _ = tcp_listener.incoming().next().expect("tcp incoming failed"); + sender.send(()).expect("send failure"); + }); + receiver + .recv_timeout(Duration::from_secs(5)) + .unwrap_or_else(|err| { + error!( + "Received no response at tcp/{}, check your port configuration: {}", + port, err + ); + 
std::process::exit(1); + }); + info!("tdp/{} is reachable", port); + } + + // Wait for a datagram to arrive at each UDP port + for (port, udp_socket) in udp { + let (sender, receiver) = channel(); + std::thread::spawn(move || { + let mut buf = [0; 1]; + debug!("Waiting for incoming datagram on udp/{}", port); + let _ = udp_socket.recv(&mut buf).expect("udp recv failure"); + sender.send(()).expect("send failure"); + }); + receiver + .recv_timeout(Duration::from_secs(5)) + .unwrap_or_else(|err| { + error!( + "Received no response at udp/{}, check your port configuration: {}", + port, err + ); + std::process::exit(1); + }); + info!("udp/{} is reachable", port); + } +} + pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr { if let Some(addrstr) = optstr { if let Ok(port) = addrstr.parse() { diff --git a/validator/src/main.rs b/validator/src/main.rs index 7e68fd613a..717f1e3958 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -12,7 +12,6 @@ use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS; use solana_core::service::Service; use solana_core::socketaddr; use solana_core::validator::{Validator, ValidatorConfig}; -use solana_netutil::parse_port_range; use solana_sdk::hash::Hash; use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil}; use solana_sdk::timing::Slot; @@ -25,7 +24,7 @@ use std::sync::Arc; use std::time::Instant; fn port_range_validator(port_range: String) -> Result<(), String> { - if parse_port_range(&port_range).is_some() { + if solana_netutil::parse_port_range(&port_range).is_some() { Ok(()) } else { Err("Invalid port range".to_string()) @@ -155,7 +154,6 @@ fn download_tar_bz2( fn initialize_ledger_path( entrypoint: &ContactInfo, - gossip_addr: &SocketAddr, ledger_path: &Path, no_snapshot_fetch: bool, ) -> Result<Hash, String> { @@ -165,7 +163,7 @@ fn initialize_ledger_path( Some(60), None, Some(entrypoint.gossip.ip()), - Some(&gossip_addr), + None, ) .map_err(|err| err.to_string())?; @@ -443,8
+441,9 @@ fn main() { solana_netutil::parse_host_port(address).expect("failed to parse drone address") }); - let dynamic_port_range = parse_port_range(matches.value_of("dynamic_port_range").unwrap()) - .expect("invalid dynamic_port_range"); + let dynamic_port_range = + solana_netutil::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) + .expect("invalid dynamic_port_range"); let mut gossip_addr = solana_netutil::parse_port_or_addr( matches.value_of("gossip_port"), @@ -528,10 +527,36 @@ fn main() { ); solana_metrics::set_host_id(keypair.pubkey().to_string()); - if let Some(ref entrypoint_addr) = cluster_entrypoint { + let mut tcp_ports = vec![gossip_addr.port()]; + + let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range); + if let Some(port) = matches.value_of("rpc_port") { + let port_number = port.to_string().parse().expect("integer"); + if port_number == 0 { + eprintln!("Invalid RPC port requested: {:?}", port); + exit(1); + } + node.info.rpc = SocketAddr::new(gossip_addr.ip(), port_number); + node.info.rpc_pubsub = SocketAddr::new(gossip_addr.ip(), port_number + 1); + tcp_ports.extend_from_slice(&[port_number, port_number + 1]); + }; + + if let Some(ref cluster_entrypoint) = cluster_entrypoint { + let udp_sockets = [ + &node.sockets.gossip, + &node.sockets.broadcast, + &node.sockets.repair, + &node.sockets.retransmit, + ]; + + solana_netutil::verify_reachable_ports( + &cluster_entrypoint.gossip, + &tcp_ports, + &udp_sockets, + ); + let expected_genesis_blockhash = initialize_ledger_path( - entrypoint_addr, - &gossip_addr, + cluster_entrypoint, &ledger_path, matches.is_present("no_snapshot_fetch"), ) @@ -551,17 +576,6 @@ fn main() { } } - let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range); - if let Some(port) = matches.value_of("rpc_port") { - let port_number = port.to_string().parse().expect("integer"); - if port_number == 0 { - eprintln!("Invalid RPC port 
requested: {:?}", port); - exit(1); - } - node.info.rpc = SocketAddr::new(gossip_addr.ip(), port_number); - node.info.rpc_pubsub = SocketAddr::new(gossip_addr.ip(), port_number + 1); - }; - let validator = Validator::new( node, &Arc::new(keypair),