diff --git a/src/cluster_info.rs b/src/cluster_info.rs index edf409404f..2154a4c179 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -157,6 +157,7 @@ enum Protocol { /// Window protocol messages /// TODO: move this message to a different module RequestWindowIndex(NodeInfo, u64, u64), + RequestHighestWindowIndex(NodeInfo, u64, u64), } impl ClusterInfo { @@ -712,10 +713,22 @@ impl ClusterInfo { Ok(out) } + pub fn window_highest_index_request_bytes( + &self, + slot_height: u64, + blob_index: u64, + ) -> Result> { + let req = + Protocol::RequestHighestWindowIndex(self.my_data().clone(), slot_height, blob_index); + let out = serialize(&req)?; + Ok(out) + } + pub fn window_index_request( &self, slot_height: u64, blob_index: u64, + get_highest: bool, ) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication, as indicated // by a valid tvu port location @@ -725,7 +738,13 @@ impl ClusterInfo { } let n = thread_rng().gen::() % valid.len(); let addr = valid[n].gossip; // send the request to the peer's gossip port - let out = self.window_index_request_bytes(slot_height, blob_index)?; + let out = { + if get_highest { + self.window_highest_index_request_bytes(slot_height, blob_index)? + } else { + self.window_index_request_bytes(slot_height, blob_index)? + } + }; submit( influxdb::Point::new("cluster-info") @@ -735,6 +754,7 @@ impl ClusterInfo { Ok((addr, out)) } + fn new_pull_requests(&mut self) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let pulls: Vec<_> = self.gossip.new_pull_request(now).ok().into_iter().collect(); @@ -854,9 +874,6 @@ impl ClusterInfo { .unwrap() } - // TODO: To support repairing multiple slots, broadcast needs to reset - // blob index for every slot, and window requests should be by slot + index. - // Issue: https://github.com/solana-labs/solana/issues/2440 fn run_window_request( from: &NodeInfo, from_addr: &SocketAddr, @@ -889,6 +906,32 @@ impl ClusterInfo { vec![] } + fn run_highest_window_request( + from_addr: &SocketAddr, + blocktree: Option<&Arc>, + slot_height: u64, + highest_index: u64, + ) -> Vec { + if let Some(blocktree) = blocktree { + // Try to find the requested index in one of the slots + let meta = blocktree.meta(slot_height); + + if let Ok(Some(meta)) = meta { + if meta.received > highest_index { + // meta.received must be at least 1 by this point + let blob = blocktree.get_data_blob(slot_height, meta.received - 1); + + if let Ok(Some(mut blob)) = blob { + blob.meta.set_addr(from_addr); + return vec![Arc::new(RwLock::new(blob))]; + } + } + } + } + + vec![] + } + //TODO we should first coalesce all the requests fn handle_blob( obj: &Arc>, @@ -1012,6 +1055,7 @@ impl ClusterInfo { slot_height: u64, blob_index: u64, from_addr: &SocketAddr, + is_get_highest: bool, ) -> Vec { let now = Instant::now(); @@ -1039,14 +1083,20 @@ impl ClusterInfo { slot_height, blob_index, ); - let res = Self::run_window_request( - &from, - &from_addr, - blocktree, - &my_info, - slot_height, - blob_index, - ); + let res = { + if is_get_highest { + Self::run_highest_window_request(&from_addr, blocktree, slot_height, blob_index) + } else { + Self::run_window_request( + &from, + &from_addr, + blocktree, + &my_info, + slot_height, + blob_index, + ) + } + }; report_time_spent( "RequestWindowIndex", &now.elapsed(), @@ -1054,6 +1104,7 @@ impl ClusterInfo { ); res } + fn handle_protocol( me: &Arc>, from_addr: &SocketAddr, @@ -1120,6 +1171,18 @@ impl ClusterInfo { slot_height, blob_index, from_addr, + false, + ) + } + Protocol::RequestHighestWindowIndex(from, slot_height, highest_index) => { + Self::handle_request_window_index( + me, + &from, + blocktree, + slot_height, + highest_index, + from_addr, + true, ) } } @@ -1368,7 +1431,7 @@ mod tests { fn window_index_request() { let me = NodeInfo::new_localhost(Keypair::new().pubkey(), timestamp()); let mut cluster_info = ClusterInfo::new(me); - let rv = cluster_info.window_index_request(0, 0); + let rv = cluster_info.window_index_request(0, 0, false); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); let gossip_addr = socketaddr!([127, 0, 0, 1], 1234); @@ -1383,7 +1446,7 @@ mod tests { 0, ); cluster_info.insert_info(nxt.clone()); - let rv = cluster_info.window_index_request(0, 0).unwrap(); + let rv = cluster_info.window_index_request(0, 0, false).unwrap(); assert_eq!(nxt.gossip, gossip_addr); assert_eq!(rv.0, nxt.gossip); @@ -1403,7 +1466,7 @@ mod tests { let mut two = false; while !one || !two { //this randomly picks an option, so eventually it should pick both - let rv = cluster_info.window_index_request(0, 0).unwrap(); + let rv = cluster_info.window_index_request(0, 0, false).unwrap(); if rv.0 == gossip_addr { one = true; } @@ -1472,6 +1535,54 @@ mod tests { Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); } + /// test run_window_requestwindow requests respond with the right blob, and do not overrun + #[test] + fn run_highest_window_request() { + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path("run_highest_window_request"); + { + let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); + let rv = + ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 0, 0); + assert!(rv.is_empty()); + + let data_size = 1; + let max_index = 5; + let blobs: Vec<_> = (0..max_index) + .map(|i| { + let mut blob = Blob::default(); + blob.set_size(data_size); + blob.set_index(i); + blob.set_slot(2); + blob.meta.size = data_size + BLOB_HEADER_SIZE; + blob + }) + .collect(); + + blocktree + .write_blobs(&blobs) + .expect("Expect successful ledger write"); + + let rv = + ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1); + assert!(!rv.is_empty()); + let v = rv[0].clone(); + assert_eq!(v.read().unwrap().index(), max_index - 1); + assert_eq!(v.read().unwrap().slot(), 2); + assert_eq!(v.read().unwrap().meta.size, BLOB_HEADER_SIZE + data_size); + + let rv = ClusterInfo::run_highest_window_request( + &socketaddr_any!(), + Some(&blocktree), + 2, + max_index, + ); + assert!(rv.is_empty()); + } + + Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); + } + #[test] fn test_default_leader() { solana_logger::setup(); diff --git a/src/repair_service.rs b/src/repair_service.rs index f01dc4b0f7..fac3b56025 100644 --- a/src/repair_service.rs +++ b/src/repair_service.rs @@ -59,7 +59,7 @@ impl RepairService { cluster_info .read() .unwrap() - .window_index_request(slot_height, blob_index) + .window_index_request(slot_height, blob_index, false) .map(|result| (result, slot_height, blob_index)) .ok() })