diff --git a/src/tpu_forwarder.rs b/src/tpu_forwarder.rs index 45e84119eb..5e2796aa15 100644 --- a/src/tpu_forwarder.rs +++ b/src/tpu_forwarder.rs @@ -5,11 +5,15 @@ use crate::cluster_info::ClusterInfo; use crate::contact_info::ContactInfo; use crate::counter::Counter; +use crate::packet::Packets; use crate::result::Result; use crate::service::Service; use crate::streamer::{self, PacketReceiver}; use log::Level; +use solana_sdk::pubkey::Pubkey; +use std::error::Error; use std::net::UdpSocket; +use std::result; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -36,24 +40,37 @@ impl TpuForwarder { "tpu_forwarder-msgs_received", msgs.read().unwrap().packets.len() ); - - if let Some(leader_data) = cluster_info + let leader_data = cluster_info .read() .expect("cluster_info.read() in TpuForwarder::forward()") .leader_data() - .cloned() - { - if leader_data.id == my_id || !ContactInfo::is_valid_address(&leader_data.tpu) { + .cloned(); + match TpuForwarder::update_addrs(leader_data, &my_id, &msgs.clone()) { + Ok(_) => msgs.read().unwrap().send_to(&socket)?, + Err(_) => continue, + } + } + } + + fn update_addrs( + leader_data: Option, + my_id: &Pubkey, + msgs: &Arc>, + ) -> result::Result<(), Box> { + match leader_data { + Some(leader_data) => { + if leader_data.id == *my_id || !ContactInfo::is_valid_address(&leader_data.tpu) { // weird cases, but we don't want to broadcast, send to ANY, or // induce an infinite loop, but this shouldn't happen, or shouldn't be true for long... - continue; + return Err("Invalid leader addr")?; } for m in msgs.write().unwrap().packets.iter_mut() { m.meta.set_addr(&leader_data.tpu); } - msgs.read().unwrap().send_to(&socket)? + Ok(()) } + _ => Err("No leader contact data")?, } } @@ -105,96 +122,52 @@ impl Service for TpuForwarder { #[cfg(test)] mod tests { use super::*; - use crate::cluster_info::ClusterInfo; use crate::contact_info::ContactInfo; + use crate::packet::Packet; use solana_netutil::bind_in_range; - use solana_sdk::pubkey::Pubkey; - use std::net::UdpSocket; + use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::{Ipv4Addr, SocketAddr}; - use std::thread::sleep; - use std::time::Duration; #[test] - #[ignore] - pub fn test_tpu_forwarder() { - let nodes: Vec<_> = (0..3) - .map(|_| { - let (port, s) = bind_in_range((8000, 10000)).unwrap(); - s.set_nonblocking(true).unwrap(); - ( - s, - ContactInfo::new_with_socketaddr(&socketaddr!([127, 0, 0, 1], port)), - ) - }) - .collect(); - - let mut cluster_info = ClusterInfo::new(nodes[0].1.clone()); - - cluster_info.insert_info(nodes[1].1.clone()); - cluster_info.insert_info(nodes[2].1.clone()); - cluster_info.insert_info(Default::default()); - - let cluster_info = Arc::new(RwLock::new(cluster_info)); - - let (transaction_port, transaction_socket) = bind_in_range((8000, 10000)).unwrap(); - let transaction_addr = socketaddr!([127, 0, 0, 1], transaction_port); - - let tpu_forwarder = TpuForwarder::new(vec![transaction_socket], cluster_info.clone()); - - let test_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - - // no leader set in cluster_info, drop the "transaction" - test_socket - .send_to(b"alice pays bob", &transaction_addr) - .unwrap(); - sleep(Duration::from_millis(100)); - - let mut data = vec![0u8; 64]; - // should be nothing on any socket - assert!(nodes[0].0.recv_from(&mut data).is_err()); - assert!(nodes[1].0.recv_from(&mut data).is_err()); - assert!(nodes[2].0.recv_from(&mut data).is_err()); - - // set leader to host with no tpu - cluster_info.write().unwrap().set_leader(Pubkey::default()); - test_socket - .send_to(b"alice pays bart", &transaction_addr) - .unwrap(); - sleep(Duration::from_millis(100)); - - let mut data = vec![0u8; 64]; - // should be nothing on any gossip socket - assert!(nodes[0].0.recv_from(&mut data).is_err()); - assert!(nodes[1].0.recv_from(&mut data).is_err()); - assert!(nodes[2].0.recv_from(&mut data).is_err()); - - cluster_info.write().unwrap().set_leader(nodes[0].1.id); // set leader to myself, bytes get dropped :-( - - test_socket - .send_to(b"alice pays bill", &transaction_addr) - .unwrap(); - sleep(Duration::from_millis(100)); - - // should *still* be nothing on any socket - assert!(nodes[0].0.recv_from(&mut data).is_err()); - assert!(nodes[1].0.recv_from(&mut data).is_err()); - assert!(nodes[2].0.recv_from(&mut data).is_err()); - - cluster_info.write().unwrap().set_leader(nodes[1].1.id); // set leader to node[1] - - test_socket - .send_to(b"alice pays chuck", &transaction_addr) - .unwrap(); - sleep(Duration::from_millis(100)); - - // should only be data on node[1]'s socket - assert!(nodes[0].0.recv_from(&mut data).is_err()); - assert!(nodes[2].0.recv_from(&mut data).is_err()); - - assert!(nodes[1].0.recv_from(&mut data).is_ok()); - assert_eq!(&data[..b"alice pays chuck".len()], b"alice pays chuck"); - - assert!(tpu_forwarder.join().is_ok()); + pub fn test_update_addrs() { + let keypair = Keypair::new(); + let my_id = keypair.pubkey(); + // test with no pubkey + assert!(!TpuForwarder::update_addrs( + None, + &my_id, + &Arc::new(RwLock::new(Packets::default())) + ) + .is_ok()); + // test with no tpu + assert!(!TpuForwarder::update_addrs( + Some(ContactInfo::default()), + &my_id, + &Arc::new(RwLock::new(Packets::default())) + ) + .is_ok()); + // test with my pubkey + let leader_data = ContactInfo::new_localhost(my_id, 0); + assert!(!TpuForwarder::update_addrs( + Some(leader_data), + &my_id, + &Arc::new(RwLock::new(Packets::default())) + ) + .is_ok()); + // test that the address is actually being updated + let (port, _) = bind_in_range((8000, 10000)).unwrap(); + let leader_data = ContactInfo::new_with_socketaddr(&socketaddr!([127, 0, 0, 1], port)); + let packet = Packet::default(); + let p = Packets { + packets: vec![packet], + }; + let msgs = Arc::new(RwLock::new(p)); + assert!( + TpuForwarder::update_addrs(Some(leader_data.clone()), &my_id, &msgs.clone()).is_ok() + ); + assert_eq!( + SocketAddr::from(msgs.read().unwrap().packets[0].meta.addr()), + leader_data.tpu + ); } - }