diff --git a/src/fullnode.rs b/src/fullnode.rs
index f30a83a802..4feddd3b2d 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
 use std::thread::Result;
 use tpu::{Tpu, TpuReturnType};
+use tpu_forwarder::TpuForwarder;
 use tvu::{Tvu, TvuReturnType};
 use untrusted::Input;
 use window::{new_window, SharedWindow};
@@ -56,15 +57,18 @@ impl LeaderServices {
 
 pub struct ValidatorServices {
     tvu: Tvu,
+    tpu_forwarder: TpuForwarder,
 }
 
 impl ValidatorServices {
-    fn new(tvu: Tvu) -> Self {
-        ValidatorServices { tvu }
+    fn new(tvu: Tvu, tpu_forwarder: TpuForwarder) -> Self {
+        ValidatorServices { tvu, tpu_forwarder }
     }
 
     pub fn join(self) -> Result<Option<TvuReturnType>> {
-        self.tvu.join()
+        let ret = self.tvu.join(); // TVU calls the shots, we wait for it to shut down
+        self.tpu_forwarder.join()?;
+        ret
     }
 
     pub fn is_exited(&self) -> bool {
@@ -279,7 +283,16 @@ impl Fullnode {
                 .expect("Failed to clone retransmit socket"),
             Some(ledger_path),
         );
-        let validator_state = ValidatorServices::new(tvu);
+        let tpu_forwarder = TpuForwarder::new(
+            node.sockets
+                .transaction
+                .iter()
+                .map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
+                .collect(),
+            cluster_info.clone(),
+        );
+
+        let validator_state = ValidatorServices::new(tvu, tpu_forwarder);
         Some(NodeRole::Validator(validator_state))
     } else {
         let max_tick_height = {
@@ -423,7 +436,15 @@ impl Fullnode {
                 .expect("Failed to clone retransmit socket"),
             Some(&self.ledger_path),
         );
-        let validator_state = ValidatorServices::new(tvu);
+        let tpu_forwarder = TpuForwarder::new(
+            self.transaction_sockets
+                .iter()
+                .map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
+                .collect(),
+            self.cluster_info.clone(),
+        );
+
+        let validator_state = ValidatorServices::new(tvu, tpu_forwarder);
         self.node_role = Some(NodeRole::Validator(validator_state));
         Ok(())
     }
diff --git a/src/lib.rs b/src/lib.rs
index b8d156c2b9..1be37472a9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -77,6 +77,7 @@ pub mod system_transaction;
 pub mod thin_client;
 pub mod token_program;
 pub mod tpu;
+pub mod tpu_forwarder;
 pub mod transaction;
 pub mod tvu;
 pub mod vote_program;
diff --git a/src/tpu_forwarder.rs b/src/tpu_forwarder.rs
new file mode 100644
index 0000000000..9c552046a2
--- /dev/null
+++ b/src/tpu_forwarder.rs
@@ -0,0 +1,197 @@
+//! The `tpu_forwarder` module implements a validator's
+//! transaction processing unit responsibility, which
+//! forwards received packets to the current leader
+
+use cluster_info::ClusterInfo;
+use contact_info::ContactInfo;
+use counter::Counter;
+use log::Level;
+use result::Result;
+use service::Service;
+use std::net::UdpSocket;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::mpsc::channel;
+use std::sync::{Arc, RwLock};
+use std::thread::{self, Builder, JoinHandle};
+use streamer::{self, PacketReceiver};
+
+pub struct TpuForwarder {
+    exit: Arc<AtomicBool>,
+    thread_hdls: Vec<JoinHandle<()>>,
+}
+
+impl TpuForwarder {
+    fn forward(receiver: &PacketReceiver, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<()> {
+        let socket = UdpSocket::bind("0.0.0.0:0")?;
+
+        let my_id = cluster_info
+            .read()
+            .expect("cluster_info.read() in TpuForwarder::forward()")
+            .id();
+
+        loop {
+            let msgs = receiver.recv()?;
+
+            inc_new_counter_info!(
+                "tpu_forwarder-msgs_received",
+                msgs.read().unwrap().packets.len()
+            );
+
+            if let Some(leader_data) = cluster_info
+                .read()
+                .expect("cluster_info.read() in TpuForwarder::forward()")
+                .leader_data()
+                .cloned()
+            {
+                if leader_data.id == my_id || !ContactInfo::is_valid_address(&leader_data.tpu) {
+                    // weird cases, but we don't want to broadcast, send to ANY, or
+                    // induce an infinite loop, but this shouldn't happen, or shouldn't be true for long...
+                    continue;
+                }
+
+                for m in msgs.write().unwrap().packets.iter_mut() {
+                    m.meta.set_addr(&leader_data.tpu);
+                }
+                msgs.read().unwrap().send_to(&socket)?
+            }
+        }
+    }
+
+    pub fn new(sockets: Vec<UdpSocket>, cluster_info: Arc<RwLock<ClusterInfo>>) -> Self {
+        let exit = Arc::new(AtomicBool::new(false));
+        let (sender, receiver) = channel();
+
+        let mut thread_hdls: Vec<_> = sockets
+            .into_iter()
+            .map(|socket| {
+                streamer::receiver(
+                    Arc::new(socket),
+                    exit.clone(),
+                    sender.clone(),
+                    "tpu-forwarder",
+                )
+            }).collect();
+
+        let thread_hdl = Builder::new()
+            .name("solana-tpu_forwarder".to_string())
+            .spawn(move || {
+                let _ignored = Self::forward(&receiver, &cluster_info);
+                ()
+            }).unwrap();
+
+        thread_hdls.push(thread_hdl);
+
+        TpuForwarder { exit, thread_hdls }
+    }
+
+    pub fn close(&self) {
+        self.exit.store(true, Ordering::Relaxed);
+    }
+}
+
+impl Service for TpuForwarder {
+    type JoinReturnType = ();
+
+    fn join(self) -> thread::Result<()> {
+        self.close();
+        for thread_hdl in self.thread_hdls {
+            thread_hdl.join()?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use cluster_info::ClusterInfo;
+    use contact_info::ContactInfo;
+    use netutil::bind_in_range;
+    use solana_sdk::pubkey::Pubkey;
+    use std::net::UdpSocket;
+    use std::net::{Ipv4Addr, SocketAddr};
+    use std::thread::sleep;
+    use std::time::Duration;
+
+    #[test]
+    pub fn test_tpu_forwarder() {
+        let nodes: Vec<_> = (0..3)
+            .map(|_| {
+                let (port, s) = bind_in_range((8000, 10000)).unwrap();
+                s.set_nonblocking(true).unwrap();
+                (
+                    s,
+                    ContactInfo::new_with_socketaddr(&socketaddr!([127, 0, 0, 1], port)),
+                )
+            }).collect();
+
+        let mut cluster_info = ClusterInfo::new(nodes[0].1.clone());
+
+        cluster_info.insert_info(nodes[1].1.clone());
+        cluster_info.insert_info(nodes[2].1.clone());
+        cluster_info.insert_info(Default::default());
+
+        let cluster_info = Arc::new(RwLock::new(cluster_info));
+
+        let (transaction_port, transaction_socket) = bind_in_range((8000, 10000)).unwrap();
+        let transaction_addr = socketaddr!([127, 0, 0, 1], transaction_port);
+
+        let tpu_forwarder = TpuForwarder::new(vec![transaction_socket], cluster_info.clone());
+
+        let test_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
+
+        // no leader set in cluster_info, drop the "transaction"
+        test_socket
+            .send_to(b"alice pays bob", &transaction_addr)
+            .unwrap();
+        sleep(Duration::from_millis(100));
+
+        let mut data = vec![0u8; 64];
+        // should be nothing on any socket
+        assert!(nodes[0].0.recv_from(&mut data).is_err());
+        assert!(nodes[1].0.recv_from(&mut data).is_err());
+        assert!(nodes[2].0.recv_from(&mut data).is_err());
+
+        // set leader to host with no tpu
+        cluster_info.write().unwrap().set_leader(Pubkey::default());
+        test_socket
+            .send_to(b"alice pays bart", &transaction_addr)
+            .unwrap();
+        sleep(Duration::from_millis(100));
+
+        let mut data = vec![0u8; 64];
+        // should be nothing on any socket ncp
+        assert!(nodes[0].0.recv_from(&mut data).is_err());
+        assert!(nodes[1].0.recv_from(&mut data).is_err());
+        assert!(nodes[2].0.recv_from(&mut data).is_err());
+
+        cluster_info.write().unwrap().set_leader(nodes[0].1.id); // set leader to myself, bytes get dropped :-(
+
+        test_socket
+            .send_to(b"alice pays bill", &transaction_addr)
+            .unwrap();
+        sleep(Duration::from_millis(100));
+
+        // should *still* be nothing on any socket
+        assert!(nodes[0].0.recv_from(&mut data).is_err());
+        assert!(nodes[1].0.recv_from(&mut data).is_err());
+        assert!(nodes[2].0.recv_from(&mut data).is_err());
+
+        cluster_info.write().unwrap().set_leader(nodes[1].1.id); // set leader to node[1]
+
+        test_socket
+            .send_to(b"alice pays chuck", &transaction_addr)
+            .unwrap();
+        sleep(Duration::from_millis(100));
+
+        // should only be data on node[1]'s socket
+        assert!(nodes[0].0.recv_from(&mut data).is_err());
+        assert!(nodes[2].0.recv_from(&mut data).is_err());
+
+        assert!(nodes[1].0.recv_from(&mut data).is_ok());
+        assert_eq!(&data[..b"alice pays chuck".len()], b"alice pays chuck");
+
+        assert!(tpu_forwarder.join().is_ok());
+    }
+
+}