diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 2eeb58aeb5..8bbebe2c35 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -51,20 +51,17 @@ fn sample_tx_count( now = Instant::now(); let sample = tx_count - initial_tx_count; initial_tx_count = tx_count; - println!( - "{}: Transactions processed {}", - v.addrs.transactions, sample - ); + println!("{}: Transactions processed {}", v.contact_info.tpu, sample); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = (sample * 1_000_000_000) as f64 / ns as f64; if tps > max_tps { max_tps = tps; } - println!("{}: {:.2} tps", v.addrs.transactions, tps); + println!("{}: {:.2} tps", v.contact_info.tpu, tps); total = tx_count - first_count; println!( "{}: Total Transactions processed {}", - v.addrs.transactions, total + v.contact_info.tpu, total ); sleep(Duration::new(sample_period, 0)); @@ -116,7 +113,7 @@ fn generate_and_send_txs( println!( "Transferring 1 unit {} times... to {:?}", txs.len(), - leader.addrs.transactions + leader.contact_info.tpu ); for tx in txs { client.transfer_signed(tx.clone()).unwrap(); @@ -215,7 +212,7 @@ fn main() { time_sec = s.to_string().parse().expect("integer"); } - let mut drone_addr = leader.addrs.transactions.clone(); + let mut drone_addr = leader.contact_info.tpu.clone(); drone_addr.set_port(9900); let signal = Arc::new(AtomicBool::new(false)); @@ -330,9 +327,9 @@ fn mk_client(r: &ReplicatedData) -> ThinClient { .unwrap(); ThinClient::new( - r.addrs.requests, + r.contact_info.rpu, requests_socket, - r.addrs.transactions, + r.contact_info.tpu, transactions_socket, ) } @@ -384,7 +381,7 @@ fn converge( .table .values() .into_iter() - .filter(|x| x.addrs.requests != daddr) + .filter(|x| x.contact_info.rpu != daddr) .cloned() .collect(); if v.len() >= num_nodes { diff --git a/src/bin/drone.rs b/src/bin/drone.rs index 7a299ed387..ff494e515d 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -94,8 +94,8 @@ fn main() { let drone = Arc::new(Mutex::new(Drone::new( mint_keypair, drone_addr, - leader.addrs.transactions, - leader.addrs.requests, + leader.contact_info.tpu, + leader.contact_info.rpu, time_slice, request_cap, ))); diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 6431682c9e..dc46a5dd08 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -156,7 +156,7 @@ fn parse_args() -> Result> { exit(1); }; - let mut drone_addr = leader.addrs.transactions.clone(); + let mut drone_addr = leader.contact_info.tpu.clone(); drone_addr.set_port(9900); let command = match matches.subcommand() { @@ -305,9 +305,9 @@ fn mk_client(r: &ReplicatedData) -> io::Result { .unwrap(); Ok(ThinClient::new( - r.addrs.requests, + r.contact_info.rpu, requests_socket, - r.addrs.transactions, + r.contact_info.tpu, transactions_socket, )) } diff --git a/src/crdt.rs b/src/crdt.rs index d8d1a8dc60..448a312f1f 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -91,18 +91,18 @@ pub fn get_ip_addr() -> Option { /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct Addrs { +pub struct ContactInfo { /// gossip address - pub gossip: SocketAddr, + pub ncp: SocketAddr, /// address to connect to for replication - pub replicate: SocketAddr, + pub tvu: SocketAddr, /// address to connect to when this node is leader - pub requests: SocketAddr, + pub rpu: SocketAddr, /// transactions address - pub transactions: SocketAddr, + pub tpu: SocketAddr, /// repair address, we use this to jump ahead of the packets /// destined to the replciate_addr - pub repair: SocketAddr, + pub tvu_window: SocketAddr, /// if this struture changes update this value as well /// Always update `ReplicatedData` version too /// This separate version for addresses allows us to use the `Vote` @@ -117,7 +117,7 @@ pub struct ReplicatedData { /// If any of the bits change, update increment this value pub version: u64, /// network addresses - pub addrs: Addrs, + pub contact_info: ContactInfo, /// current leader identity pub current_leader_id: PublicKey, /// last verified hash that was submitted to the leader @@ -135,21 +135,21 @@ fn make_debug_id(buf: &[u8]) -> u64 { impl ReplicatedData { pub fn new( id: PublicKey, - gossip: SocketAddr, - replicate: SocketAddr, - requests: SocketAddr, - transactions: SocketAddr, - repair: SocketAddr, + ncp: SocketAddr, + tvu: SocketAddr, + rpu: SocketAddr, + tpu: SocketAddr, + tvu_window: SocketAddr, ) -> ReplicatedData { ReplicatedData { id, version: 0, - addrs: Addrs { - gossip, - replicate, - requests, - transactions, - repair, + contact_info: ContactInfo { + ncp, + tvu, + rpu, + tpu, + tvu_window, version: 0, }, current_leader_id: PublicKey::default(), @@ -415,7 +415,7 @@ impl Crdt { if me.id == v.id { //filter myself false - } else if v.addrs.replicate == daddr { + } else if v.contact_info.tvu == daddr { trace!( "{:x}:broadcast skip not listening {:x}", me.debug_id(), @@ -427,7 +427,7 @@ impl Crdt { "{:x}:broadcast node {:x} {}", me.debug_id(), v.debug_id(), - v.addrs.replicate + v.contact_info.tvu ); true } @@ -482,17 +482,17 @@ impl Crdt { blob.get_index().unwrap(), blob.meta.size, v.debug_id(), - v.addrs.replicate, + v.contact_info.tvu, blob.is_coding() ); assert!(blob.meta.size < BLOB_SIZE); - let e = s.send_to(&blob.data[..blob.meta.size], &v.addrs.replicate); + let e = s.send_to(&blob.data[..blob.meta.size], &v.contact_info.tvu); trace!( "{:x}: done broadcast {} to {:x} {}", me.debug_id(), blob.meta.size, v.debug_id(), - v.addrs.replicate + v.contact_info.tvu ); e }) @@ -531,7 +531,7 @@ impl Crdt { } else if me.current_leader_id == v.id { trace!("skip retransmit to leader {:?}", v.id); false - } else if v.addrs.replicate == daddr { + } else if v.contact_info.tvu == daddr { trace!("skip nodes that are not listening {:?}", v.id); false } else { @@ -551,7 +551,7 @@ impl Crdt { ); //TODO profile this, may need multiple sockets for par_iter assert!(rblob.meta.size < BLOB_SIZE); - s.send_to(&rblob.data[..rblob.meta.size], &v.addrs.replicate) + s.send_to(&rblob.data[..rblob.meta.size], &v.contact_info.tvu) }) .collect(); for e in errs { @@ -594,13 +594,13 @@ impl Crdt { let daddr = "0.0.0.0:0".parse().unwrap(); let valid: Vec<_> = self.table .values() - .filter(|r| r.id != self.me && r.addrs.repair != daddr) + .filter(|r| r.id != self.me && r.contact_info.tvu_window != daddr) .collect(); if valid.is_empty() { Err(CrdtError::TooSmall)?; } let n = (Self::random() as usize) % valid.len(); - let addr = valid[n].addrs.gossip.clone(); + let addr = valid[n].contact_info.ncp.clone(); let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); let out = serialize(&req)?; Ok((addr, out)) @@ -637,10 +637,10 @@ impl Crdt { "created gossip request from {:x} to {:x} {}", self.debug_id(), v.debug_id(), - v.addrs.gossip + v.contact_info.ncp ); - Ok((v.addrs.gossip, req)) + Ok((v.contact_info.ncp, req)) } /// At random pick a node and try to get updated changes from them @@ -809,7 +809,7 @@ impl Crdt { let sz = wblob.meta.size; outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); - outblob.meta.set_addr(&from.addrs.repair); + outblob.meta.set_addr(&from.contact_info.tvu_window); outblob.set_id(sender_id).expect("blob set_id"); } @@ -845,7 +845,7 @@ impl Crdt { // TODO sigverify these Ok(Protocol::RequestUpdates(v, from_rd)) => { trace!("RequestUpdates {}", v); - let addr = from_rd.addrs.gossip; + let addr = from_rd.contact_info.ncp; let me = obj.read().unwrap(); // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = me.get_updates_since(v); @@ -905,7 +905,7 @@ impl Crdt { from.debug_id(), ix, ); - assert_ne!(from.addrs.repair, me.addrs.repair); + assert_ne!(from.contact_info.tvu_window, me.contact_info.tvu_window); Self::run_window_request(&window, &me, &from, ix, blob_recycler) } Err(_) => { @@ -1029,19 +1029,19 @@ impl TestNode { } pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode { let mut local_gossip_addr = bind_addr.clone(); - local_gossip_addr.set_port(data.addrs.gossip.port()); + local_gossip_addr.set_port(data.contact_info.ncp.port()); let mut local_replicate_addr = bind_addr.clone(); - local_replicate_addr.set_port(data.addrs.replicate.port()); + local_replicate_addr.set_port(data.contact_info.tvu.port()); let mut local_requests_addr = bind_addr.clone(); - local_requests_addr.set_port(data.addrs.requests.port()); + local_requests_addr.set_port(data.contact_info.rpu.port()); let mut local_transactions_addr = bind_addr.clone(); - local_transactions_addr.set_port(data.addrs.transactions.port()); + local_transactions_addr.set_port(data.contact_info.tpu.port()); let mut local_repair_addr = bind_addr.clone(); - local_repair_addr.set_port(data.addrs.repair.port()); + local_repair_addr.set_port(data.contact_info.tvu_window.port()); let transaction = UdpSocket::bind(local_transactions_addr).unwrap(); let gossip = UdpSocket::bind(local_gossip_addr).unwrap(); @@ -1133,11 +1133,14 @@ mod tests { &"127.0.0.1:1234".parse().unwrap(), ); assert_eq!(d1.id, kp.pubkey()); - assert_eq!(d1.addrs.gossip, "127.0.0.1:1235".parse().unwrap()); - assert_eq!(d1.addrs.replicate, "127.0.0.1:1236".parse().unwrap()); - assert_eq!(d1.addrs.requests, "127.0.0.1:1237".parse().unwrap()); - assert_eq!(d1.addrs.transactions, "127.0.0.1:1234".parse().unwrap()); - assert_eq!(d1.addrs.repair, "127.0.0.1:1238".parse().unwrap()); + assert_eq!(d1.contact_info.ncp, "127.0.0.1:1235".parse().unwrap()); + assert_eq!(d1.contact_info.tvu, "127.0.0.1:1236".parse().unwrap()); + assert_eq!(d1.contact_info.rpu, "127.0.0.1:1237".parse().unwrap()); + assert_eq!(d1.contact_info.tpu, "127.0.0.1:1234".parse().unwrap()); + assert_eq!( + d1.contact_info.tvu_window, + "127.0.0.1:1238".parse().unwrap() + ); } #[test] fn update_test() { @@ -1232,7 +1235,7 @@ mod tests { ); crdt.insert(&nxt); let rv = crdt.window_index_request(0).unwrap(); - assert_eq!(nxt.addrs.gossip, "127.0.0.2:1234".parse().unwrap()); + assert_eq!(nxt.contact_info.ncp, "127.0.0.2:1234".parse().unwrap()); assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); let nxt = ReplicatedData::new( @@ -1285,7 +1288,7 @@ mod tests { crdt.insert(&nxt1); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt1.addrs.gossip); + assert_eq!(rv.0, nxt1.contact_info.ncp); let nxt2 = ReplicatedData::new_entry_point("127.0.0.3:1234".parse().unwrap()); crdt.insert(&nxt2); @@ -1306,9 +1309,9 @@ mod tests { } assert!(rv.len() > 0); for i in rv.iter() { - if i.read().unwrap().meta.addr() == nxt1.addrs.gossip { + if i.read().unwrap().meta.addr() == nxt1.contact_info.ncp { one = true; - } else if i.read().unwrap().meta.addr() == nxt2.addrs.gossip { + } else if i.read().unwrap().meta.addr() == nxt2.contact_info.ncp { two = true; } else { //unexpected request @@ -1335,19 +1338,19 @@ mod tests { crdt.set_leader(me.id); crdt.insert(&nxt); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.addrs.gossip); + assert_eq!(rv.0, nxt.contact_info.ncp); let now = crdt.alive[&nxt.id]; crdt.purge(now); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.addrs.gossip); + assert_eq!(rv.0, nxt.contact_info.ncp); crdt.purge(now + GOSSIP_PURGE_MILLIS); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.addrs.gossip); + assert_eq!(rv.0, nxt.contact_info.ncp); crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.addrs.gossip); + assert_eq!(rv.0, nxt.contact_info.ncp); let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt2.id); @@ -1365,7 +1368,7 @@ mod tests { crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); assert_eq!(len as usize - 1, crdt.table.len()); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.addrs.gossip); + assert_eq!(rv.0, nxt.contact_info.ncp); } /// test window requests respond with the right blob, and do not overrun diff --git a/src/drone.rs b/src/drone.rs index 70de99f0fe..37a8bf4a57 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -287,8 +287,8 @@ mod tests { let mut drone = Drone::new( alice.keypair(), addr, - leader_data.addrs.transactions, - leader_data.addrs.requests, + leader_data.contact_info.tpu, + leader_data.contact_info.rpu, None, Some(150_000), ); @@ -312,9 +312,9 @@ mod tests { UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); let mut client = ThinClient::new( - leader_data.addrs.requests, + leader_data.contact_info.rpu, requests_socket, - leader_data.addrs.transactions, + leader_data.contact_info.tpu, transactions_socket, ); diff --git a/src/fullnode.rs b/src/fullnode.rs index 040f889f9e..956ae49788 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -67,9 +67,9 @@ impl FullNode { let local_requests_addr = node.sockets.requests.local_addr().unwrap(); info!( "starting... local gossip address: {} (advertising {})", - local_gossip_addr, node.data.addrs.gossip + local_gossip_addr, node.data.contact_info.ncp ); - let requests_addr = node.data.addrs.requests.clone(); + let requests_addr = node.data.contact_info.rpu.clone(); if !leader { let testnet_addr = network_entry_for_validator.expect("validator requires entry"); diff --git a/src/streamer.rs b/src/streamer.rs index cd57c27f8e..baaeaad791 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -980,7 +980,7 @@ mod test { w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.addrs.gossip); + w.meta.set_addr(&tn.data.contact_info.ncp); } msgs.push_back(b); } diff --git a/src/thin_client.rs b/src/thin_client.rs index e48d2be715..731b568aae 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -302,9 +302,9 @@ mod tests { let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.addrs.requests, + leader_data.contact_info.rpu, requests_socket, - leader_data.addrs.transactions, + leader_data.contact_info.tpu, transactions_socket, ); let last_id = client.get_last_id(); @@ -344,9 +344,9 @@ mod tests { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.addrs.requests, + leader_data.contact_info.rpu, requests_socket, - leader_data.addrs.transactions, + leader_data.contact_info.tpu, transactions_socket, ); let last_id = client.get_last_id(); @@ -396,9 +396,9 @@ mod tests { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.addrs.requests, + leader_data.contact_info.rpu, requests_socket, - leader_data.addrs.transactions, + leader_data.contact_info.tpu, transactions_socket, ); let last_id = client.get_last_id(); diff --git a/src/tvu.rs b/src/tvu.rs index c23649615e..522b2d1c9e 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -198,7 +198,7 @@ pub mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let replicate_addr = target1.data.addrs.replicate; + let replicate_addr = target1.data.contact_info.tvu; let bank = Arc::new(Bank::new(&mint)); //start crdt1 diff --git a/tests/multinode.rs b/tests/multinode.rs index 02887be883..edfe57e398 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -29,8 +29,8 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { let mut spy = TestNode::new(); let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.data.id.clone(); - spy.data.addrs.replicate = daddr; - spy.data.addrs.requests = daddr; + spy.data.contact_info.tvu = daddr; + spy.data.contact_info.rpu = daddr; let mut spy_crdt = Crdt::new(spy.data); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); @@ -55,7 +55,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { .values() .into_iter() .filter(|x| x.id != me) - .filter(|x| x.addrs.requests != daddr) + .filter(|x| x.contact_info.rpu != daddr) .cloned() .collect(); if num >= num_nodes as u64 && v.len() >= num_nodes { @@ -110,7 +110,7 @@ fn test_multi_node_validator_catchup_from_zero() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.addrs.gossip), + Some(leader_data.contact_info.ncp), None, exit.clone(), ); @@ -143,7 +143,7 @@ fn test_multi_node_validator_catchup_from_zero() { TestNode::new(), false, InFile::Path(ledger_path.clone()), - Some(leader_data.addrs.gossip), + Some(leader_data.contact_info.ncp), None, exit.clone(), ); @@ -211,7 +211,7 @@ fn test_multi_node_basic() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.addrs.gossip), + Some(leader_data.contact_info.ncp), None, exit.clone(), ); @@ -272,7 +272,7 @@ fn test_boot_validator_from_file() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.addrs.gossip), + Some(leader_data.contact_info.ncp), None, exit.clone(), ); @@ -356,7 +356,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { validator, false, InFile::Path(stale_ledger_path.clone()), - Some(leader_data.addrs.gossip), + Some(leader_data.contact_info.ncp), None, exit.clone(), ); @@ -425,7 +425,7 @@ fn test_multi_node_dynamic_network() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.addrs.gossip), + Some(leader_data.contact_info.ncp), Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); @@ -482,7 +482,7 @@ fn test_multi_node_dynamic_network() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.addrs.gossip), + Some(leader_data.contact_info.ncp), Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); @@ -518,12 +518,12 @@ fn mk_client(leader: &ReplicatedData) -> ThinClient { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let daddr = "0.0.0.0:0".parse().unwrap(); - assert!(leader.addrs.requests != daddr); - assert!(leader.addrs.transactions != daddr); + assert!(leader.contact_info.rpu != daddr); + assert!(leader.contact_info.tpu != daddr); ThinClient::new( - leader.addrs.requests, + leader.contact_info.rpu, requests_socket, - leader.addrs.transactions, + leader.contact_info.tpu, transactions_socket, ) }