Add forwarder sockets and address to contact info and sockets structs

This commit is contained in:
Carl 2019-03-08 14:59:11 -08:00 committed by Pankaj Garg
parent fe1f67ea9a
commit 7beefb3f81
5 changed files with 56 additions and 5 deletions

View File

@ -1272,7 +1272,17 @@ impl ClusterInfo {
let (_, gossip_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let daddr = socketaddr_any!();
let node = ContactInfo::new(id, daddr, daddr, daddr, daddr, daddr, daddr, timestamp());
let node = ContactInfo::new(
*id,
daddr,
daddr,
daddr,
daddr,
daddr,
daddr,
daddr,
timestamp(),
);
(node, gossip_socket)
}
}
@ -1331,6 +1341,7 @@ pub struct Sockets {
pub gossip: UdpSocket,
pub tvu: Vec<UdpSocket>,
pub tpu: Vec<UdpSocket>,
pub forwarder: Vec<UdpSocket>,
pub broadcast: UdpSocket,
pub repair: UdpSocket,
pub retransmit: UdpSocket,
@ -1351,6 +1362,7 @@ impl Node {
let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
let forwarder = UdpSocket::bind("127.0.0.1:0").unwrap();
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let rpc_port = find_available_port_in_range((1024, 65535)).unwrap();
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port);
@ -1366,6 +1378,7 @@ impl Node {
gossip.local_addr().unwrap(),
tvu.local_addr().unwrap(),
tpu.local_addr().unwrap(),
forwarder.local_addr().unwrap(),
storage.local_addr().unwrap(),
rpc_addr,
rpc_pubsub_addr,
@ -1377,6 +1390,7 @@ impl Node {
gossip,
tvu: vec![tvu],
tpu: vec![tpu],
forwarder: vec![forwarder],
broadcast,
repair,
retransmit,
@ -1405,6 +1419,9 @@ impl Node {
let (tpu_port, tpu_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind");
let (forwarder_port, forwarder_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tpu multi_bind");
let (_, repair) = bind();
let (_, broadcast) = bind();
let (_, retransmit) = bind();
@ -1415,6 +1432,7 @@ impl Node {
SocketAddr::new(gossip_addr.ip(), gossip_port),
SocketAddr::new(gossip_addr.ip(), tvu_port),
SocketAddr::new(gossip_addr.ip(), tpu_port),
SocketAddr::new(gossip_addr.ip(), forwarder_port),
SocketAddr::new(gossip_addr.ip(), storage_port),
SocketAddr::new(gossip_addr.ip(), RPC_PORT),
SocketAddr::new(gossip_addr.ip(), RPC_PORT + 1),
@ -1428,6 +1446,7 @@ impl Node {
gossip,
tvu: tvu_sockets,
tpu: tpu_sockets,
forwarder: forwarder_sockets,
broadcast,
repair,
retransmit,
@ -1528,6 +1547,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
0,
);
cluster_info.insert_info(nxt.clone());
@ -1544,6 +1564,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
0,
);
cluster_info.insert_info(nxt);
@ -1577,6 +1598,7 @@ mod tests {
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
socketaddr!("127.0.0.1:1239"),
socketaddr!("127.0.0.1:1240"),
0,
);
let rv = ClusterInfo::run_window_request(

View File

@ -18,6 +18,8 @@ pub struct ContactInfo {
pub tvu: SocketAddr,
/// transactions address
pub tpu: SocketAddr,
// forwarer address
pub forwarder: SocketAddr,
/// storage data address
pub storage_addr: SocketAddr,
/// address to which to send JSON-RPC requests
@ -72,6 +74,7 @@ impl Default for ContactInfo {
gossip: socketaddr_any!(),
tvu: socketaddr_any!(),
tpu: socketaddr_any!(),
forwarder: socketaddr_any!(),
storage_addr: socketaddr_any!(),
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
@ -87,6 +90,7 @@ impl ContactInfo {
gossip: SocketAddr,
tvu: SocketAddr,
tpu: SocketAddr,
forwarder: SocketAddr,
storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
@ -98,6 +102,7 @@ impl ContactInfo {
gossip,
tvu,
tpu,
forwarder,
storage_addr,
rpc,
rpc_pubsub,
@ -114,6 +119,7 @@ impl ContactInfo {
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
socketaddr!("127.0.0.1:1239"),
socketaddr!("127.0.0.1:1240"),
now,
)
}
@ -131,6 +137,7 @@ impl ContactInfo {
addr,
addr,
addr,
addr,
0,
)
}
@ -143,6 +150,7 @@ impl ContactInfo {
let tpu_addr = *bind_addr;
let gossip_addr = Self::next_port(&bind_addr, 1);
let tvu_addr = Self::next_port(&bind_addr, 2);
let forwarder_addr = Self::next_port(&bind_addr, 3);
let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT);
let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1);
Self::new(
@ -150,6 +158,7 @@ impl ContactInfo {
gossip_addr,
tvu_addr,
tpu_addr,
forwarder_addr,
"0.0.0.0:0".parse().unwrap(),
rpc_addr,
rpc_pubsub_addr,
@ -172,6 +181,7 @@ impl ContactInfo {
daddr,
daddr,
daddr,
daddr,
timestamp(),
)
}
@ -251,6 +261,7 @@ mod tests {
let ci = ContactInfo::default();
assert!(ci.gossip.ip().is_unspecified());
assert!(ci.tvu.ip().is_unspecified());
assert!(ci.forwarder.ip().is_unspecified());
assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
@ -261,6 +272,7 @@ mod tests {
let ci = ContactInfo::new_multicast();
assert!(ci.gossip.ip().is_multicast());
assert!(ci.tvu.ip().is_multicast());
assert!(ci.forwarder.ip().is_multicast());
assert!(ci.rpc.ip().is_multicast());
assert!(ci.rpc_pubsub.ip().is_multicast());
assert!(ci.tpu.ip().is_multicast());
@ -272,6 +284,7 @@ mod tests {
let ci = ContactInfo::new_gossip_entry_point(&addr);
assert_eq!(ci.gossip, addr);
assert!(ci.tvu.ip().is_unspecified());
assert!(ci.forwarder.ip().is_unspecified());
assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
@ -284,6 +297,7 @@ mod tests {
assert_eq!(ci.tpu, addr);
assert_eq!(ci.gossip.port(), 11);
assert_eq!(ci.tvu.port(), 12);
assert_eq!(ci.forwarder.port(), 13);
assert_eq!(ci.rpc.port(), 8899);
assert_eq!(ci.rpc_pubsub.port(), 8900);
assert!(ci.storage_addr.ip().is_unspecified());
@ -298,6 +312,7 @@ mod tests {
assert_eq!(d1.id, keypair.pubkey());
assert_eq!(d1.gossip, socketaddr!("127.0.0.1:1235"));
assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236"));
assert_eq!(d1.forwarder, socketaddr!("127.0.0.1:1237"));
assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234"));
assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899"));
assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900"));

View File

@ -14,12 +14,20 @@ pub struct FetchStage {
impl FetchStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(sockets: Vec<UdpSocket>, exit: &Arc<AtomicBool>) -> (Self, PacketReceiver) {
pub fn new(
sockets: Vec<UdpSocket>,
forwarder_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
) -> (Self, PacketReceiver) {
let (sender, receiver) = channel();
(Self::new_with_sender(sockets, exit, &sender), receiver)
(
Self::new_with_sender(sockets, forwarder_sockets, exit, &sender),
receiver,
)
}
pub fn new_with_sender(
sockets: Vec<UdpSocket>,
forwarder_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
sender: &PacketSender,
) -> Self {

View File

@ -220,6 +220,7 @@ impl Fullnode {
&poh_recorder,
entry_receiver,
node.sockets.tpu,
node.sockets.forwarder,
node.sockets.broadcast,
config.sigverify_disabled,
&blocktree,

View File

@ -32,6 +32,7 @@ impl Tpu {
poh_recorder: &Arc<Mutex<PohRecorder>>,
entry_receiver: Receiver<WorkingBankEntries>,
transactions_sockets: Vec<UdpSocket>,
forwarder_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
sigverify_disabled: bool,
blocktree: &Arc<Blocktree>,
@ -40,8 +41,12 @@ impl Tpu {
cluster_info.write().unwrap().set_leader(id);
let (packet_sender, packet_receiver) = channel();
let fetch_stage =
FetchStage::new_with_sender(transactions_sockets, &exit, &packet_sender.clone());
let fetch_stage = FetchStage::new_with_sender(
transactions_sockets,
forwarder_sockets,
&exit,
&packet_sender.clone(),
);
let cluster_info_vote_listener =
ClusterInfoVoteListener::new(&exit, cluster_info.clone(), packet_sender);