diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index d1d4e0893e..74880c1367 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1272,7 +1272,17 @@ impl ClusterInfo { let (_, gossip_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); let daddr = socketaddr_any!(); - let node = ContactInfo::new(id, daddr, daddr, daddr, daddr, daddr, daddr, timestamp()); + let node = ContactInfo::new( + *id, + daddr, + daddr, + daddr, + daddr, + daddr, + daddr, + daddr, + timestamp(), + ); (node, gossip_socket) } } @@ -1331,6 +1341,7 @@ pub struct Sockets { pub gossip: UdpSocket, pub tvu: Vec, pub tpu: Vec, + pub forwarder: Vec, pub broadcast: UdpSocket, pub repair: UdpSocket, pub retransmit: UdpSocket, @@ -1351,6 +1362,7 @@ impl Node { let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); + let forwarder = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let rpc_port = find_available_port_in_range((1024, 65535)).unwrap(); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); @@ -1366,6 +1378,7 @@ impl Node { gossip.local_addr().unwrap(), tvu.local_addr().unwrap(), tpu.local_addr().unwrap(), + forwarder.local_addr().unwrap(), storage.local_addr().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -1377,6 +1390,7 @@ impl Node { gossip, tvu: vec![tvu], tpu: vec![tpu], + forwarder: vec![forwarder], broadcast, repair, retransmit, @@ -1405,6 +1419,9 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind"); + let (forwarder_port, forwarder_sockets) = + multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tpu multi_bind"); + let (_, repair) = bind(); let (_, broadcast) = bind(); let (_, retransmit) = bind(); @@ -1415,6 +1432,7 @@ impl Node { SocketAddr::new(gossip_addr.ip(), gossip_port), SocketAddr::new(gossip_addr.ip(), tvu_port), SocketAddr::new(gossip_addr.ip(), tpu_port), + SocketAddr::new(gossip_addr.ip(), forwarder_port), SocketAddr::new(gossip_addr.ip(), storage_port), SocketAddr::new(gossip_addr.ip(), RPC_PORT), SocketAddr::new(gossip_addr.ip(), RPC_PORT + 1), @@ -1428,6 +1446,7 @@ impl Node { gossip, tvu: tvu_sockets, tpu: tpu_sockets, + forwarder: forwarder_sockets, broadcast, repair, retransmit, @@ -1528,6 +1547,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1238), socketaddr!([127, 0, 0, 1], 1239), + socketaddr!([127, 0, 0, 1], 1240), 0, ); cluster_info.insert_info(nxt.clone()); @@ -1544,6 +1564,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1238), socketaddr!([127, 0, 0, 1], 1239), + socketaddr!([127, 0, 0, 1], 1240), 0, ); cluster_info.insert_info(nxt); @@ -1577,6 +1598,7 @@ mod tests { socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1238"), socketaddr!("127.0.0.1:1239"), + socketaddr!("127.0.0.1:1240"), 0, ); let rv = ClusterInfo::run_window_request( diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 6fc2f4e2f0..91924e9c59 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -18,6 +18,8 @@ pub struct ContactInfo { pub tvu: SocketAddr, /// transactions address pub tpu: SocketAddr, + // forwarer address + pub forwarder: SocketAddr, /// storage data address pub storage_addr: SocketAddr, /// address to which to send JSON-RPC requests @@ -72,6 +74,7 @@ impl Default for ContactInfo { gossip: socketaddr_any!(), tvu: socketaddr_any!(), tpu: socketaddr_any!(), + forwarder: socketaddr_any!(), storage_addr: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), @@ -87,6 +90,7 @@ impl ContactInfo { gossip: SocketAddr, tvu: SocketAddr, tpu: SocketAddr, + forwarder: SocketAddr, storage_addr: SocketAddr, rpc: SocketAddr, rpc_pubsub: SocketAddr, @@ -98,6 +102,7 @@ impl ContactInfo { gossip, tvu, tpu, + forwarder, storage_addr, rpc, rpc_pubsub, @@ -114,6 +119,7 @@ impl ContactInfo { socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1238"), socketaddr!("127.0.0.1:1239"), + socketaddr!("127.0.0.1:1240"), now, ) } @@ -131,6 +137,7 @@ impl ContactInfo { addr, addr, addr, + addr, 0, ) } @@ -143,6 +150,7 @@ impl ContactInfo { let tpu_addr = *bind_addr; let gossip_addr = Self::next_port(&bind_addr, 1); let tvu_addr = Self::next_port(&bind_addr, 2); + let forwarder_addr = Self::next_port(&bind_addr, 3); let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT); let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1); Self::new( @@ -150,6 +158,7 @@ impl ContactInfo { gossip_addr, tvu_addr, tpu_addr, + forwarder_addr, "0.0.0.0:0".parse().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -172,6 +181,7 @@ impl ContactInfo { daddr, daddr, daddr, + daddr, timestamp(), ) } @@ -251,6 +261,7 @@ mod tests { let ci = ContactInfo::default(); assert!(ci.gossip.ip().is_unspecified()); assert!(ci.tvu.ip().is_unspecified()); + assert!(ci.forwarder.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); @@ -261,6 +272,7 @@ mod tests { let ci = ContactInfo::new_multicast(); assert!(ci.gossip.ip().is_multicast()); assert!(ci.tvu.ip().is_multicast()); + assert!(ci.forwarder.ip().is_multicast()); assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); @@ -272,6 +284,7 @@ mod tests { let ci = ContactInfo::new_gossip_entry_point(&addr); assert_eq!(ci.gossip, addr); assert!(ci.tvu.ip().is_unspecified()); + assert!(ci.forwarder.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); @@ -284,6 +297,7 @@ mod tests { assert_eq!(ci.tpu, addr); assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); + assert_eq!(ci.forwarder.port(), 13); assert_eq!(ci.rpc.port(), 8899); assert_eq!(ci.rpc_pubsub.port(), 8900); assert!(ci.storage_addr.ip().is_unspecified()); @@ -298,6 +312,7 @@ mod tests { assert_eq!(d1.id, keypair.pubkey()); assert_eq!(d1.gossip, socketaddr!("127.0.0.1:1235")); assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236")); + assert_eq!(d1.forwarder, socketaddr!("127.0.0.1:1237")); assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234")); assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899")); assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900")); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index d5b064627d..217ac25c49 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -14,12 +14,20 @@ pub struct FetchStage { impl FetchStage { #[allow(clippy::new_ret_no_self)] - pub fn new(sockets: Vec, exit: &Arc) -> (Self, PacketReceiver) { + pub fn new( + sockets: Vec, + forwarder_sockets: Vec, + exit: &Arc, + ) -> (Self, PacketReceiver) { let (sender, receiver) = channel(); - (Self::new_with_sender(sockets, exit, &sender), receiver) + ( + Self::new_with_sender(sockets, forwarder_sockets, exit, &sender), + receiver, + ) } pub fn new_with_sender( sockets: Vec, + forwarder_sockets: Vec, exit: &Arc, sender: &PacketSender, ) -> Self { diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 475de8a379..99ab9d6e99 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -220,6 +220,7 @@ impl Fullnode { &poh_recorder, entry_receiver, node.sockets.tpu, + node.sockets.forwarder, node.sockets.broadcast, config.sigverify_disabled, &blocktree, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 781dbe2a5a..8517e298a8 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -32,6 +32,7 @@ impl Tpu { poh_recorder: &Arc>, entry_receiver: Receiver, transactions_sockets: Vec, + forwarder_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, blocktree: &Arc, @@ -40,8 +41,12 @@ impl Tpu { cluster_info.write().unwrap().set_leader(id); let (packet_sender, packet_receiver) = channel(); - let fetch_stage = - FetchStage::new_with_sender(transactions_sockets, &exit, &packet_sender.clone()); + let fetch_stage = FetchStage::new_with_sender( + transactions_sockets, + forwarder_sockets, + &exit, + &packet_sender.clone(), + ); let cluster_info_vote_listener = ClusterInfoVoteListener::new(&exit, cluster_info.clone(), packet_sender);