diff --git a/src/crdt.rs b/src/crdt.rs index 50a37b403d..b8aabbab7f 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1210,7 +1210,7 @@ impl Crdt { pub struct Sockets { pub gossip: UdpSocket, pub requests: UdpSocket, - pub replicate: UdpSocket, + pub replicate: Vec, pub transaction: Vec, pub respond: UdpSocket, pub broadcast: UdpSocket, @@ -1250,7 +1250,7 @@ impl Node { sockets: Sockets { gossip, requests, - replicate, + replicate: vec![replicate], transaction: vec![transaction], respond, broadcast, @@ -1270,7 +1270,9 @@ impl Node { bind() }; - let (replicate_port, replicate) = bind(); + let (replicate_port, replicate_sockets) = + multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind"); + let (requests_port, requests) = bind(); let (transaction_port, transaction_sockets) = @@ -1299,7 +1301,7 @@ impl Node { sockets: Sockets { gossip, requests, - replicate, + replicate: replicate_sockets, transaction: transaction_sockets, respond, broadcast, @@ -1992,7 +1994,10 @@ mod tests { let ip = Ipv4Addr::from(0); let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(ip, 0)); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); - assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); + assert!(node.sockets.replicate.len() > 1); + for tx_socket in node.sockets.replicate.iter() { + assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); + } assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert!(node.sockets.transaction.len() > 1); for tx_socket in node.sockets.transaction.iter() { @@ -2002,8 +2007,12 @@ mod tests { assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.gossip.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); - assert!(node.sockets.replicate.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); - assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); + let tx_port = node.sockets.replicate[0].local_addr().unwrap().port(); + assert!(tx_port >= FULLNODE_PORT_RANGE.0); + assert!(tx_port < FULLNODE_PORT_RANGE.1); + for tx_socket in node.sockets.replicate.iter() { + assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); + } assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); @@ -2021,7 +2030,10 @@ mod tests { let ip = IpAddr::V4(Ipv4Addr::from(0)); let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(0, 8050)); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); - assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); + assert!(node.sockets.replicate.len() > 1); + for tx_socket in node.sockets.replicate.iter() { + assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); + } assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert!(node.sockets.transaction.len() > 1); for tx_socket in node.sockets.transaction.iter() { @@ -2030,8 +2042,12 @@ mod tests { assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); - assert!(node.sockets.replicate.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); - assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); + let tx_port = node.sockets.replicate[0].local_addr().unwrap().port(); + assert!(tx_port >= FULLNODE_PORT_RANGE.0); + assert!(tx_port < FULLNODE_PORT_RANGE.1); + for tx_socket in node.sockets.replicate.iter() { + assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); + } assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); diff --git a/src/tvu.rs b/src/tvu.rs index fef15b46fa..eebe84abce 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -76,18 +76,18 @@ impl Tvu { crdt: Arc>, window: SharedWindow, blob_recycler: BlobRecycler, - replicate_socket: UdpSocket, + replicate_sockets: Vec, repair_socket: UdpSocket, retransmit_socket: UdpSocket, ledger_path: Option<&str>, exit: Arc, ) -> Self { let repair_socket = Arc::new(repair_socket); - let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( - vec![Arc::new(replicate_socket), repair_socket.clone()], - exit.clone(), - &blob_recycler, - ); + let mut blob_sockets: Vec> = + replicate_sockets.into_iter().map(Arc::new).collect(); + blob_sockets.push(repair_socket.clone()); + let (fetch_stage, blob_fetch_receiver) = + BlobFetchStage::new_multi_socket(blob_sockets, exit.clone(), &blob_recycler); //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction @@ -200,8 +200,15 @@ pub mod tests { // simulate target peer let recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); + let blob_sockets: Vec> = target2 + .sockets + .replicate + .into_iter() + .map(Arc::new) + .collect(); + let t_receiver = streamer::blob_receiver( - Arc::new(target2.sockets.replicate), + blob_sockets[0].clone(), exit.clone(), recycler.clone(), s_reader, diff --git a/src/window_service.rs b/src/window_service.rs index 510479ea6c..c32aa4822c 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -302,6 +302,7 @@ mod test { use crdt::{Crdt, Node}; use logger; use packet::{BlobRecycler, PACKET_DATA_SIZE}; + use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -361,9 +362,12 @@ mod test { ); let t_responder = { let (s_responder, r_responder) = channel(); + let blob_sockets: Vec> = + tn.sockets.replicate.into_iter().map(Arc::new).collect(); + let t_responder = responder( "window_send_test", - Arc::new(tn.sockets.replicate), + blob_sockets[0].clone(), resp_recycler.clone(), r_responder, ); @@ -431,9 +435,11 @@ mod test { ); let t_responder = { let (s_responder, r_responder) = channel(); + let blob_sockets: Vec> = + tn.sockets.replicate.into_iter().map(Arc::new).collect(); let t_responder = responder( "window_send_test", - Arc::new(tn.sockets.replicate), + blob_sockets[0].clone(), resp_recycler.clone(), r_responder, ); @@ -494,9 +500,11 @@ mod test { ); let t_responder = { let (s_responder, r_responder) = channel(); + let blob_sockets: Vec> = + tn.sockets.replicate.into_iter().map(Arc::new).collect(); let t_responder = responder( "window_send_test", - Arc::new(tn.sockets.replicate), + blob_sockets[0].clone(), resp_recycler.clone(), r_responder, ); diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 30e8e5347c..39846e6356 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -17,7 +17,7 @@ use std::thread::sleep; use std::time::Duration; fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { - let tn = Node::new_localhost(); + let mut tn = Node::new_localhost(); let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); @@ -29,7 +29,7 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { tn.sockets.gossip, exit, ); - (c, d, tn.sockets.replicate) + (c, d, tn.sockets.replicate.pop().unwrap()) } /// Test that the network converges.