diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index 05fd4b5d4a..8db628e885 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -12,7 +12,7 @@
 //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
 //!
 //! Bank needs to provide an interface for us to query the stake weight
-use bincode::{deserialize, serialize};
+use bincode::{deserialize, serialize, serialized_size};
 use budget_instruction::Vote;
 use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
 use counter::Counter;
@@ -228,6 +228,7 @@ pub struct ClusterInfo {
 
 // TODO These messages should be signed, and go through the gpu pipeline for spam filtering
 #[derive(Serialize, Deserialize, Debug)]
+#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
 enum Protocol {
     /// forward your own latest data structure when requesting an update
     /// this doesn't update the `remote` update index, but it allows the
@@ -235,8 +236,14 @@ enum Protocol {
     /// (last update index i saw from you, my replicated data)
     RequestUpdates(u64, NodeInfo),
     //TODO might need a since?
-    /// from id, form's last update index, NodeInfo
-    ReceiveUpdates(Pubkey, u64, Vec<NodeInfo>, Vec<(Pubkey, u64)>),
+    /// * from - from id
+    /// * max_update_index - from's max update index in the response
+    /// * nodes - (NodeInfo, remote_last_update) vector
+    ReceiveUpdates {
+        from: Pubkey,
+        max_update_index: u64,
+        nodes: Vec<(NodeInfo, u64)>,
+    },
     /// ask for a missing index
     /// (my replicated data to keep alive, missing window index)
     RequestWindowIndex(NodeInfo, u64),
@@ -702,17 +709,43 @@ impl ClusterInfo {
         1.0
     }
 
-    fn get_updates_since(&self, v: u64) -> (Pubkey, u64, Vec<NodeInfo>) {
-        //trace!("get updates since {}", v);
-        let data = self
+    fn max_updates(max_bytes: usize) -> usize {
+        let unit = (NodeInfo::new_localhost(Default::default()), 0);
+        let unit_size = serialized_size(&unit).unwrap();
+        let msg = Protocol::ReceiveUpdates {
+            from: Default::default(),
+            max_update_index: 0,
+            nodes: vec![unit],
+        };
+        let msg_size = serialized_size(&msg).unwrap();
+        ((max_bytes - (msg_size as usize)) / (unit_size as usize)) + 1
+    }
+    // Get node updates since `v`, up to a maximum of `max` updates
+    fn get_updates_since(&self, v: u64, max: usize) -> (Pubkey, u64, Vec<(NodeInfo, u64)>) {
+        let nodes: Vec<_> = self
             .table
             .values()
             .filter(|x| x.id != Pubkey::default() && self.local[&x.id] > v)
            .cloned()
            .collect();
+        let liveness: Vec<u64> = nodes
+            .iter()
+            .map(|d| *self.remote.get(&d.id).unwrap_or(&0))
+            .collect();
+        let updates: Vec<u64> = nodes.iter().map(|d| self.local[&d.id]).collect();
+        trace!("{:?}", updates);
         let id = self.id;
-        let ups = self.update_index;
-        (id, ups, data)
+        let mut out: Vec<(u64, (NodeInfo, u64))> = updates
+            .into_iter()
+            .zip(nodes.into_iter().zip(liveness))
+            .collect();
+        out.sort_by_key(|k| k.0);
+        let last_node = std::cmp::max(1, std::cmp::min(out.len(), max)) - 1;
+        let max_updated_node = out.get(last_node).map(|x| x.0).unwrap_or(0);
+        let updated_data: Vec<(NodeInfo, u64)> = out.into_iter().take(max).map(|x| x.1).collect();
+
+        trace!("get updates since response {} {}", v, updated_data.len());
+        (id, max_updated_node, updated_data)
     }
 
     pub fn valid_last_ids(&self) -> Vec<Hash> {
@@ -852,24 +885,19 @@ impl ClusterInfo {
     /// * `from` - identity of the sender of the updates
     /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
     /// * `data` - the update data
-    fn apply_updates(
-        &mut self,
-        from: Pubkey,
-        update_index: u64,
-        data: &[NodeInfo],
-        external_liveness: &[(Pubkey, u64)],
-    ) {
+    fn apply_updates(&mut self, from: Pubkey, update_index: u64, data: &[(NodeInfo, u64)]) {
         trace!("got updates {}", data.len());
         // TODO we need to punish/spam resist here
         // sigverify the whole update and slash anyone who sends a bad update
         let mut insert_total = 0;
         for v in data {
-            insert_total += self.insert(&v);
+            insert_total += self.insert(&v.0);
         }
         inc_new_counter_info!("cluster_info-update-count", insert_total);
 
-        for (pubkey, external_remote_index) in external_liveness {
-            let remote_entry = if let Some(v) = self.remote.get(pubkey) {
+        for (node, external_remote_index) in data {
+            let pubkey = node.id;
+            let remote_entry = if let Some(v) = self.remote.get(&pubkey) {
                 *v
             } else {
                 0
@@ -881,7 +909,7 @@ impl ClusterInfo {
 
             let liveness_entry = self
                 .external_liveness
-                .entry(*pubkey)
+                .entry(pubkey)
                 .or_insert_with(HashMap::new);
             let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index);
             if *external_remote_index > peer_index {
@@ -1055,20 +1083,8 @@ impl ClusterInfo {
                     inc_new_counter_info!("cluster_info-window-request-updates-unspec-ncp", 1);
                     from.contact_info.ncp = *from_addr;
                 }
-
-                let (from_id, ups, data, liveness) = {
-                    let me = me.read().unwrap();
-
-                    // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
-                    let (from_id, ups, data) = me.get_updates_since(version);
-
-                    (
-                        from_id,
-                        ups,
-                        data,
-                        me.remote.iter().map(|(k, v)| (*k, *v)).collect(),
-                    )
-                };
+                let max = Self::max_updates(1024 * 64 - 512);
+                let (from_id, ups, data) = me.read().unwrap().get_updates_since(version, max);
 
                 // update entry only after collecting liveness
                 {
@@ -1077,10 +1093,10 @@ impl ClusterInfo {
                     me.update_liveness(from.id);
                 }
 
-                trace!("get updates since response {} {}", version, data.len());
                 let len = data.len();
+                trace!("get updates since response {} {}", version, len);
 
-                if len < 1 {
+                if data.is_empty() {
                     let me = me.read().unwrap();
                     trace!(
                         "no updates me {} ix {} since {}",
@@ -1090,7 +1106,11 @@ impl ClusterInfo {
                     );
                     None
                 } else {
-                    let rsp = Protocol::ReceiveUpdates(from_id, ups, data, liveness);
+                    let rsp = Protocol::ReceiveUpdates {
+                        from: from_id,
+                        max_update_index: ups,
+                        nodes: data,
+                    };
 
                     if let Ok(r) = to_blob(rsp, from.contact_info.ncp) {
                         trace!(
@@ -1107,22 +1127,26 @@ impl ClusterInfo {
                     }
                 }
             }
-            Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
+            Protocol::ReceiveUpdates {
+                from,
+                max_update_index,
+                nodes,
+            } => {
                 let now = Instant::now();
                 trace!(
                     "ReceivedUpdates from={} update_index={} len={}",
                     from,
-                    update_index,
-                    data.len()
+                    max_update_index,
+                    nodes.len()
                 );
                 me.write()
                     .expect("'me' write lock in ReceiveUpdates")
-                    .apply_updates(from, update_index, &data, &external_liveness);
+                    .apply_updates(from, max_update_index, &nodes);
 
                 report_time_spent(
                     "ReceiveUpdates",
                     &now.elapsed(),
-                    &format!(" len: {}", data.len()),
+                    &format!(" len: {}", nodes.len()),
                 );
                 None
             }
@@ -1361,6 +1385,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
 
 #[cfg(test)]
 mod tests {
+    use bincode::serialize;
     use budget_instruction::Vote;
     use cluster_info::{
         ClusterInfo, ClusterInfoError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE,
@@ -1465,9 +1490,9 @@ mod tests {
         //should be accepted, since the update is for the same address field as the one we know
         assert_eq!(cluster_info.table[&d.id].version, 1);
     }
-    fn sorted(ls: &Vec<NodeInfo>) -> Vec<NodeInfo> {
+    fn sorted(ls: &Vec<(NodeInfo, u64)>) -> Vec<(NodeInfo, u64)> {
         let mut copy: Vec<_> = ls.iter().cloned().collect();
-        copy.sort_by(|x, y| x.id.cmp(&y.id));
+        copy.sort_by(|x, y| x.0.id.cmp(&y.0.id));
         copy
     }
     #[test]
@@ -1484,42 +1509,92 @@ mod tests {
         assert_eq!(d1.contact_info.tpu, socketaddr!("127.0.0.1:1234"));
     }
     #[test]
+    fn max_updates() {
+        let size = 1024 * 64 - 512;
+        let num = ClusterInfo::max_updates(size);
+        let msg = Protocol::ReceiveUpdates {
+            from: Default::default(),
+            max_update_index: 0,
+            nodes: vec![(NodeInfo::new_unspecified(), 0); num],
+        };
+        trace!("{} {} {}", serialize(&msg).unwrap().len(), size, num);
+        assert!(serialize(&msg).unwrap().len() <= size);
+    }
+    #[test]
     fn update_test() {
         let d1 = NodeInfo::new_localhost(Keypair::new().pubkey());
         let d2 = NodeInfo::new_localhost(Keypair::new().pubkey());
         let d3 = NodeInfo::new_localhost(Keypair::new().pubkey());
         let mut cluster_info = ClusterInfo::new(d1.clone()).expect("ClusterInfo::new");
-        let (key, ix, ups) = cluster_info.get_updates_since(0);
+        let (key, ix, ups) = cluster_info.get_updates_since(0, 1);
         assert_eq!(key, d1.id);
         assert_eq!(ix, 1);
         assert_eq!(ups.len(), 1);
-        assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));
+        assert_eq!(sorted(&ups), sorted(&vec![(d1.clone(), 0)]));
         cluster_info.insert(&d2);
-        let (key, ix, ups) = cluster_info.get_updates_since(0);
+        let (key, ix, ups) = cluster_info.get_updates_since(0, 2);
         assert_eq!(key, d1.id);
         assert_eq!(ix, 2);
         assert_eq!(ups.len(), 2);
-        assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));
+        assert_eq!(
+            sorted(&ups),
+            sorted(&vec![(d1.clone(), 0), (d2.clone(), 0)])
+        );
         cluster_info.insert(&d3);
-        let (key, ix, ups) = cluster_info.get_updates_since(0);
+        let (key, ix, ups) = cluster_info.get_updates_since(0, 3);
         assert_eq!(key, d1.id);
         assert_eq!(ix, 3);
         assert_eq!(ups.len(), 3);
         assert_eq!(
             sorted(&ups),
-            sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
+            sorted(&vec![(d1.clone(), 0), (d2.clone(), 0), (d3.clone(), 0)])
         );
         let mut cluster_info2 = ClusterInfo::new(d2.clone()).expect("ClusterInfo::new");
-        cluster_info2.apply_updates(key, ix, &ups, &vec![]);
+        cluster_info2.apply_updates(key, ix, &ups);
         assert_eq!(cluster_info2.table.values().len(), 3);
         assert_eq!(
-            sorted(&cluster_info2.table.values().map(|x| x.clone()).collect()),
-            sorted(&cluster_info.table.values().map(|x| x.clone()).collect())
+            sorted(
+                &cluster_info2
+                    .table
+                    .values()
+                    .map(|x| (x.clone(), 0))
+                    .collect()
+            ),
+            sorted(
+                &cluster_info
+                    .table
+                    .values()
+                    .map(|x| (x.clone(), 0))
+                    .collect()
+            )
         );
         let d4 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.4:1234"));
         cluster_info.insert(&d4);
-        let (_key, _ix, ups) = cluster_info.get_updates_since(0);
-        assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
+        let (_key, ix, ups) = cluster_info.get_updates_since(0, 3);
+        assert_eq!(
+            sorted(&ups),
+            sorted(&vec![(d2.clone(), 0), (d1.clone(), 0), (d3.clone(), 0)])
+        );
+        assert_eq!(ix, 3);
+
+        let (_key, ix, ups) = cluster_info.get_updates_since(0, 2);
+        assert_eq!(
+            sorted(&ups),
+            sorted(&vec![(d2.clone(), 0), (d1.clone(), 0)])
+        );
+        assert_eq!(ix, 2);
+
+        let (_key, ix, ups) = cluster_info.get_updates_since(0, 1);
+        assert_eq!(sorted(&ups), sorted(&vec![(d1.clone(), 0)]));
+        assert_eq!(ix, 1);
+
+        let (_key, ix, ups) = cluster_info.get_updates_since(1, 3);
+        assert_eq!(ups.len(), 2);
+        assert_eq!(ix, 3);
+        assert_eq!(sorted(&ups), sorted(&vec![(d2, 0), (d3, 0)]));
+        let (_key, ix, ups) = cluster_info.get_updates_since(3, 3);
+        assert_eq!(ups.len(), 0);
+        assert_eq!(ix, 0);
     }
     #[test]
     fn window_index_request() {
diff --git a/tests/multinode.rs b/tests/multinode.rs
index 482aa502e1..b0115e9f12 100644
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -651,30 +651,28 @@ fn test_multi_node_dynamic_network() {
     let t1: Vec<_> = (0..num_nodes)
         .into_iter()
         .map(|n| {
-            let leader_data = leader_data.clone();
-            let alice_clone = alice_arc.clone();
             Builder::new()
                 .name("keypair-thread".to_string())
                 .spawn(move || {
                     info!("Spawned thread {}", n);
-                    let keypair = Keypair::new();
-                    //send some tokens to the new validators
-                    let bal = retry_send_tx_and_retry_get_balance(
-                        &leader_data,
-                        &alice_clone.read().unwrap(),
-                        &keypair.pubkey(),
-                        Some(500),
-                    );
-                    assert_eq!(bal, Some(500));
-                    info!("sent balance to[{}/{}] {}", n, num_nodes, keypair.pubkey());
-                    keypair
+                    Keypair::new()
                 }).unwrap()
         }).collect();
 
     info!("Waiting for keypairs to be created");
     let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect();
     info!("keypairs created");
-
+    keypairs.iter().enumerate().for_each(|(n, keypair)| {
+        //send some tokens to the new validators
+        let bal = retry_send_tx_and_retry_get_balance(
+            &leader_data,
+            &alice_arc.read().unwrap(),
+            &keypair.pubkey(),
+            Some(500),
+        );
+        assert_eq!(bal, Some(500));
+        info!("sent balance to [{}/{}] {}", n, num_nodes, keypair.pubkey());
+    });
    let t2: Vec<_> = keypairs
        .into_iter()
        .map(|keypair| {
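
The heart of the change is the byte-budget arithmetic in max_updates: serialize one (NodeInfo, u64) unit to learn its size, serialize a ReceiveUpdates message carrying exactly one unit to learn the base overhead, then derive how many units fit in the budget. Because the division covers only the bytes left after a one-unit message, the trailing `+ 1` adds back the unit already counted inside that message. A minimal standalone sketch of the same arithmetic, assuming bincode 1.x and serde with the derive feature; Entry and Msg are simplified stand-ins for (NodeInfo, u64) and Protocol::ReceiveUpdates, not the real types:

    // Sketch of the byte-budget sizing used by max_updates above.
    use bincode::serialized_size;
    use serde::Serialize;

    #[derive(Serialize)]
    struct Entry {
        id: [u8; 32], // stand-in for Pubkey
        remote_last_update: u64,
    }

    #[derive(Serialize)]
    struct Msg {
        from: [u8; 32],
        max_update_index: u64,
        nodes: Vec<Entry>,
    }

    // How many fixed-size entries fit in `max_bytes`? A message with one
    // entry measures base overhead plus one unit, so the division yields the
    // entries that fit on top of that first one, hence the trailing `+ 1`.
    fn max_entries(max_bytes: u64) -> u64 {
        let unit = Entry { id: [0; 32], remote_last_update: 0 };
        let unit_size = serialized_size(&unit).unwrap();
        let msg = Msg { from: [0; 32], max_update_index: 0, nodes: vec![unit] };
        let msg_size = serialized_size(&msg).unwrap();
        (max_bytes - msg_size) / unit_size + 1
    }

    fn main() {
        // Mirror the production budget: a 64 KiB blob minus 512 bytes of headroom.
        let budget = 1024 * 64 - 512;
        println!("{} entries fit in {} bytes", max_entries(budget), budget);
    }

This works because bincode encodes a Vec as a fixed-width length prefix followed by its elements, so message size grows linearly in the number of units; the max_updates test in the patch asserts exactly that bound.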
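
The second half of the change gives get_updates_since cursor semantics: results are sorted by local update index, truncated to `max` entries, and the returned max_update_index is the update index of the last entry that made the cut, so the requester's next RequestUpdates resumes exactly where the truncated response left off (the new update_test assertions exercise this with max of 1, 2, and 3). A tiny self-contained sketch of that sort-truncate-cursor pattern, with page_updates as a hypothetical helper over plain tuples rather than the real gossip table:

    // Return up to `max` entries ordered by update index, plus the cursor
    // (the highest update index included) for the follow-up request.
    fn page_updates(mut entries: Vec<(u64, &'static str)>, max: usize) -> (u64, Vec<&'static str>) {
        entries.sort_by_key(|e| e.0);
        // Index of the last entry that will be included; the max(1, ..) - 1
        // clamp keeps this at 0 (and the cursor at 0) when nothing fits.
        let last = std::cmp::max(1, std::cmp::min(entries.len(), max)) - 1;
        let cursor = entries.get(last).map(|e| e.0).unwrap_or(0);
        (cursor, entries.into_iter().take(max).map(|e| e.1).collect())
    }

    fn main() {
        let entries = vec![(3, "c"), (1, "a"), (2, "b")];
        let (cursor, page) = page_updates(entries, 2);
        assert_eq!((cursor, page), (2, vec!["a", "b"]));
        // The caller's next request would pass `since = 2`, picking up "c".
    }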