This reverts commit e61257695f.
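Folded back in by this revert: the serve_repair module and its dedicated socket are removed, the RequestWindowIndex, RequestHighestWindowIndex, and RequestOrphan messages return to the gossip Protocol enum in cluster_info, RepairType moves back to repair_service, and GossipService/ClusterInfo::listen regain an optional Blockstore handle so the gossip listen thread can serve repair requests again.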
		| @@ -16,7 +16,6 @@ use solana_core::{ | ||||
|     packet::{limited_deserialize, PACKET_DATA_SIZE}, | ||||
|     repair_service, | ||||
|     repair_service::{RepairService, RepairSlotRange, RepairStrategy}, | ||||
|     serve_repair::ServeRepair, | ||||
|     shred_fetch_stage::ShredFetchStage, | ||||
|     sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, | ||||
|     storage_stage::NUM_STORAGE_SAMPLES, | ||||
| @@ -196,7 +195,13 @@ impl Archiver { | ||||
|             Blockstore::open(ledger_path).expect("Expected to be able to open database ledger"), | ||||
|         ); | ||||
|  | ||||
|         let gossip_service = GossipService::new(&cluster_info, None, node.sockets.gossip, &exit); | ||||
|         let gossip_service = GossipService::new( | ||||
|             &cluster_info, | ||||
|             Some(blockstore.clone()), | ||||
|             None, | ||||
|             node.sockets.gossip, | ||||
|             &exit, | ||||
|         ); | ||||
|  | ||||
|         info!("Connecting to the cluster via {:?}", cluster_entrypoint); | ||||
|         let (nodes, _) = | ||||
| @@ -809,7 +814,7 @@ impl Archiver { | ||||
|     /// It is recommended to use a temporary blockstore for this, since the download does not | ||||
|     /// verify received shreds and may disturb the chaining of shreds across slots | ||||
|     pub fn download_from_archiver( | ||||
|         serve_repair: &ServeRepair, | ||||
|         cluster_info: &Arc<RwLock<ClusterInfo>>, | ||||
|         archiver_info: &ContactInfo, | ||||
|         blockstore: &Arc<Blockstore>, | ||||
|         slots_per_segment: u64, | ||||
| @@ -829,10 +834,10 @@ impl Archiver { | ||||
|             Recycler::default(), | ||||
|             "archiver_reeciver", | ||||
|         ); | ||||
|         let id = serve_repair.keypair().pubkey(); | ||||
|         let id = cluster_info.read().unwrap().id(); | ||||
|         info!( | ||||
|             "Sending repair requests from: {} to: {}", | ||||
|             serve_repair.my_info().id, | ||||
|             cluster_info.read().unwrap().my_data().id, | ||||
|             archiver_info.gossip | ||||
|         ); | ||||
|         let repair_slot_range = RepairSlotRange { | ||||
| @@ -852,7 +857,9 @@ impl Archiver { | ||||
|                 let reqs: Vec<_> = repairs | ||||
|                     .into_iter() | ||||
|                     .filter_map(|repair_request| { | ||||
|                         serve_repair | ||||
|                         cluster_info | ||||
|                             .read() | ||||
|                             .unwrap() | ||||
|                             .map_repair_request(&repair_request) | ||||
|                             .map(|result| ((archiver_info.gossip, result), repair_request)) | ||||
|                             .ok() | ||||
|   | ||||
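For orientation, a minimal sketch of how a caller drives the API restored below (the socket wiring and the slot/index values are illustrative; repair_request, RepairType, and the returned gossip address come from the hunks that follow):

    use solana_core::{cluster_info::ClusterInfo, repair_service::RepairType};
    use std::net::UdpSocket;
    use std::sync::{Arc, RwLock};

    fn request_missing_shred(
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        repair_socket: &UdpSocket,
    ) -> std::io::Result<()> {
        // repair_request picks a random peer that should have the slot and
        // serializes the request; addr is the peer's gossip port, since the
        // gossip listen thread now services repairs.
        if let Ok((addr, bytes)) = cluster_info
            .read()
            .unwrap()
            .repair_request(&RepairType::Shred(42, 0))
        {
            repair_socket.send_to(&bytes, addr)?;
        }
        Ok(())
    }
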
| @@ -21,6 +21,7 @@ use crate::{ | ||||
|     crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, | ||||
|     crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote}, | ||||
|     packet::{Packet, PACKET_DATA_SIZE}, | ||||
|     repair_service::RepairType, | ||||
|     result::{Error, Result}, | ||||
|     sendmmsg::{multicast, send_mmsg}, | ||||
|     weighted_shuffle::{weighted_best, weighted_shuffle}, | ||||
| @@ -28,7 +29,8 @@ use crate::{ | ||||
| use bincode::{serialize, serialized_size}; | ||||
| use core::cmp; | ||||
| use itertools::Itertools; | ||||
| use solana_ledger::{bank_forks::BankForks, staking_utils}; | ||||
| use rand::{thread_rng, Rng}; | ||||
| use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore, staking_utils}; | ||||
| use solana_measure::thread_mem_usage; | ||||
| use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; | ||||
| use solana_net_utils::{ | ||||
| @@ -61,12 +63,15 @@ pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); | ||||
| pub const DATA_PLANE_FANOUT: usize = 200; | ||||
| /// the number of milliseconds to sleep between gossip requests | ||||
| pub const GOSSIP_SLEEP_MILLIS: u64 = 100; | ||||
|  | ||||
| /// the number of slots to respond with when responding to `Orphan` requests | ||||
| pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; | ||||
| /// The maximum size of a bloom filter | ||||
| pub const MAX_BLOOM_SIZE: usize = 1018; | ||||
| pub const MAX_BLOOM_SIZE: usize = 1028; | ||||
| /// The maximum size of a protocol payload | ||||
| const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; | ||||
| /// The largest protocol header size | ||||
| const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; | ||||
| const MAX_PROTOCOL_HEADER_SIZE: u64 = 204; | ||||
|  | ||||
| #[derive(Debug, PartialEq, Eq)] | ||||
| pub enum ClusterInfoError { | ||||
| @@ -169,6 +174,12 @@ enum Protocol { | ||||
|     PullResponse(Pubkey, Vec<CrdsValue>), | ||||
|     PushMessage(Pubkey, Vec<CrdsValue>), | ||||
|     PruneMessage(Pubkey, PruneData), | ||||
|  | ||||
|     /// Window protocol messages | ||||
|     /// TODO: move these messages to a different module | ||||
|     RequestWindowIndex(ContactInfo, u64, u64), | ||||
|     RequestHighestWindowIndex(ContactInfo, u64, u64), | ||||
|     RequestOrphan(ContactInfo, u64), | ||||
| } | ||||
|  | ||||
| impl ClusterInfo { | ||||
| @@ -518,7 +529,7 @@ impl ClusterInfo { | ||||
|     } | ||||
|  | ||||
|     /// all tvu peers with valid gossip addrs that likely have the slot being requested | ||||
|     pub fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> { | ||||
|     fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> { | ||||
|         let me = self.my_data(); | ||||
|         ClusterInfo::tvu_peers(self) | ||||
|             .into_iter() | ||||
| @@ -860,6 +871,61 @@ impl ClusterInfo { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> { | ||||
|         let req = Protocol::RequestWindowIndex(self.my_data(), slot, shred_index); | ||||
|         let out = serialize(&req)?; | ||||
|         Ok(out) | ||||
|     } | ||||
|  | ||||
|     fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> { | ||||
|         let req = Protocol::RequestHighestWindowIndex(self.my_data(), slot, shred_index); | ||||
|         let out = serialize(&req)?; | ||||
|         Ok(out) | ||||
|     } | ||||
|  | ||||
|     fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> { | ||||
|         let req = Protocol::RequestOrphan(self.my_data(), slot); | ||||
|         let out = serialize(&req)?; | ||||
|         Ok(out) | ||||
|     } | ||||
|  | ||||
|     pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> { | ||||
|         // find a peer that appears to be accepting replication and has the desired slot, as indicated | ||||
|         // by a valid tvu port location | ||||
|         let valid: Vec<_> = self.repair_peers(repair_request.slot()); | ||||
|         if valid.is_empty() { | ||||
|             return Err(ClusterInfoError::NoPeers.into()); | ||||
|         } | ||||
|         let n = thread_rng().gen::<usize>() % valid.len(); | ||||
|         let addr = valid[n].gossip; // send the request to the peer's gossip port | ||||
|         let out = self.map_repair_request(repair_request)?; | ||||
|  | ||||
|         Ok((addr, out)) | ||||
|     } | ||||
|     pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> { | ||||
|         match repair_request { | ||||
|             RepairType::Shred(slot, shred_index) => { | ||||
|                 datapoint_debug!( | ||||
|                     "cluster_info-repair", | ||||
|                     ("repair-slot", *slot, i64), | ||||
|                     ("repair-ix", *shred_index, i64) | ||||
|                 ); | ||||
|                 Ok(self.window_index_request_bytes(*slot, *shred_index)?) | ||||
|             } | ||||
|             RepairType::HighestShred(slot, shred_index) => { | ||||
|                 datapoint_debug!( | ||||
|                     "cluster_info-repair_highest", | ||||
|                     ("repair-highest-slot", *slot, i64), | ||||
|                     ("repair-highest-ix", *shred_index, i64) | ||||
|                 ); | ||||
|                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?) | ||||
|             } | ||||
|             RepairType::Orphan(slot) => { | ||||
|                 datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64)); | ||||
|                 Ok(self.orphan_bytes(*slot)?) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     // If the network entrypoint hasn't been discovered yet, add it to the crds table | ||||
|     fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) { | ||||
|         let pull_from_entrypoint = if let Some(entrypoint) = &mut self.entrypoint { | ||||
| @@ -1111,9 +1177,117 @@ impl ClusterInfo { | ||||
|             .unwrap() | ||||
|     } | ||||
|  | ||||
|     fn get_data_shred_as_packet( | ||||
|         blockstore: &Arc<Blockstore>, | ||||
|         slot: Slot, | ||||
|         shred_index: u64, | ||||
|         dest: &SocketAddr, | ||||
|     ) -> Result<Option<Packet>> { | ||||
|         let data = blockstore.get_data_shred(slot, shred_index)?; | ||||
|         Ok(data.map(|data| { | ||||
|             let mut packet = Packet::default(); | ||||
|             packet.meta.size = data.len(); | ||||
|             packet.meta.set_addr(dest); | ||||
|             packet.data.copy_from_slice(&data); | ||||
|             packet | ||||
|         })) | ||||
|     } | ||||
|  | ||||
|     fn run_window_request( | ||||
|         recycler: &PacketsRecycler, | ||||
|         from: &ContactInfo, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         me: &ContactInfo, | ||||
|         slot: Slot, | ||||
|         shred_index: u64, | ||||
|     ) -> Option<Packets> { | ||||
|         if let Some(blockstore) = blockstore { | ||||
|             // Try to find the requested index in one of the slots | ||||
|             let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr); | ||||
|  | ||||
|             if let Ok(Some(packet)) = packet { | ||||
|                 inc_new_counter_debug!("cluster_info-window-request-ledger", 1); | ||||
|                 return Some(Packets::new_with_recycler_data( | ||||
|                     recycler, | ||||
|                     "run_window_request", | ||||
|                     vec![packet], | ||||
|                 )); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         inc_new_counter_debug!("cluster_info-window-request-fail", 1); | ||||
|         trace!( | ||||
|             "{}: failed RequestWindowIndex {} {} {}", | ||||
|             me.id, | ||||
|             from.id, | ||||
|             slot, | ||||
|             shred_index, | ||||
|         ); | ||||
|  | ||||
|         None | ||||
|     } | ||||
|  | ||||
|     fn run_highest_window_request( | ||||
|         recycler: &PacketsRecycler, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         slot: Slot, | ||||
|         highest_index: u64, | ||||
|     ) -> Option<Packets> { | ||||
|         let blockstore = blockstore?; | ||||
|         // Try to find the requested index in one of the slots | ||||
|         let meta = blockstore.meta(slot).ok()??; | ||||
|         if meta.received > highest_index { | ||||
|             // meta.received must be at least 1 by this point | ||||
|             let packet = | ||||
|                 Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr) | ||||
|                     .ok()??; | ||||
|             return Some(Packets::new_with_recycler_data( | ||||
|                 recycler, | ||||
|                 "run_highest_window_request", | ||||
|                 vec![packet], | ||||
|             )); | ||||
|         } | ||||
|         None | ||||
|     } | ||||
|  | ||||
|     fn run_orphan( | ||||
|         recycler: &PacketsRecycler, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         mut slot: Slot, | ||||
|         max_responses: usize, | ||||
|     ) -> Option<Packets> { | ||||
|         let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); | ||||
|         if let Some(blockstore) = blockstore { | ||||
|             // Try to find the next "n" parent slots of the input slot | ||||
|             while let Ok(Some(meta)) = blockstore.meta(slot) { | ||||
|                 if meta.received == 0 { | ||||
|                     break; | ||||
|                 } | ||||
|                 let packet = | ||||
|                     Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr); | ||||
|                 if let Ok(Some(packet)) = packet { | ||||
|                     res.packets.push(packet); | ||||
|                 } | ||||
|                 if meta.is_parent_set() && res.packets.len() <= max_responses { | ||||
|                     slot = meta.parent_slot; | ||||
|                 } else { | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         if res.is_empty() { | ||||
|             return None; | ||||
|         } | ||||
|         Some(res) | ||||
|     } | ||||
|  | ||||
|     fn handle_packets( | ||||
|         me: &Arc<RwLock<Self>>, | ||||
|         recycler: &PacketsRecycler, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         stakes: &HashMap<Pubkey, u64>, | ||||
|         packets: Packets, | ||||
|         response_sender: &PacketSender, | ||||
| @@ -1220,6 +1394,13 @@ impl ClusterInfo { | ||||
|                             ("prune_message", (allocated.get() - start) as i64, i64), | ||||
|                         ); | ||||
|                     } | ||||
|                     _ => { | ||||
|                         let rsp = | ||||
|                             Self::handle_repair(me, recycler, &from_addr, blockstore, request); | ||||
|                         if let Some(rsp) = rsp { | ||||
|                             let _ignore_disconnect = response_sender.send(rsp); | ||||
|                         } | ||||
|                     } | ||||
|                 }) | ||||
|         }); | ||||
|         // process the collected pulls together | ||||
| @@ -1347,10 +1528,104 @@ impl ClusterInfo { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn get_repair_sender(request: &Protocol) -> &ContactInfo { | ||||
|         match request { | ||||
|             Protocol::RequestWindowIndex(ref from, _, _) => from, | ||||
|             Protocol::RequestHighestWindowIndex(ref from, _, _) => from, | ||||
|             Protocol::RequestOrphan(ref from, _) => from, | ||||
|             _ => panic!("Not a repair request"), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn handle_repair( | ||||
|         me: &Arc<RwLock<Self>>, | ||||
|         recycler: &PacketsRecycler, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         request: Protocol, | ||||
|     ) -> Option<Packets> { | ||||
|         let now = Instant::now(); | ||||
|  | ||||
|         //TODO this doesn't depend on the cluster_info module and could be moved, | ||||
|         //but we are using the listen thread to service these requests | ||||
|         //TODO verify from is signed | ||||
|  | ||||
|         let self_id = me.read().unwrap().gossip.id; | ||||
|         let from = Self::get_repair_sender(&request); | ||||
|         if from.id == self_id { | ||||
|             warn!( | ||||
|                 "{}: Ignored received repair request from ME {}", | ||||
|                 self_id, from.id, | ||||
|             ); | ||||
|             inc_new_counter_debug!("cluster_info-handle-repair--eq", 1); | ||||
|             return None; | ||||
|         } | ||||
|  | ||||
|         me.write() | ||||
|             .unwrap() | ||||
|             .gossip | ||||
|             .crds | ||||
|             .update_record_timestamp(&from.id, timestamp()); | ||||
|         let my_info = me.read().unwrap().my_data(); | ||||
|  | ||||
|         let (res, label) = { | ||||
|             match &request { | ||||
|                 Protocol::RequestWindowIndex(from, slot, shred_index) => { | ||||
|                     inc_new_counter_debug!("cluster_info-request-window-index", 1); | ||||
|                     ( | ||||
|                         Self::run_window_request( | ||||
|                             recycler, | ||||
|                             from, | ||||
|                             &from_addr, | ||||
|                             blockstore, | ||||
|                             &my_info, | ||||
|                             *slot, | ||||
|                             *shred_index, | ||||
|                         ), | ||||
|                         "RequestWindowIndex", | ||||
|                     ) | ||||
|                 } | ||||
|  | ||||
|                 Protocol::RequestHighestWindowIndex(_, slot, highest_index) => { | ||||
|                     inc_new_counter_debug!("cluster_info-request-highest-window-index", 1); | ||||
|                     ( | ||||
|                         Self::run_highest_window_request( | ||||
|                             recycler, | ||||
|                             &from_addr, | ||||
|                             blockstore, | ||||
|                             *slot, | ||||
|                             *highest_index, | ||||
|                         ), | ||||
|                         "RequestHighestWindowIndex", | ||||
|                     ) | ||||
|                 } | ||||
|                 Protocol::RequestOrphan(_, slot) => { | ||||
|                     inc_new_counter_debug!("cluster_info-request-orphan", 1); | ||||
|                     ( | ||||
|                         Self::run_orphan( | ||||
|                             recycler, | ||||
|                             &from_addr, | ||||
|                             blockstore, | ||||
|                             *slot, | ||||
|                             MAX_ORPHAN_REPAIR_RESPONSES, | ||||
|                         ), | ||||
|                         "RequestOrphan", | ||||
|                     ) | ||||
|                 } | ||||
|                 _ => panic!("Not a repair request"), | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         trace!("{}: received repair request: {:?}", self_id, request); | ||||
|         report_time_spent(label, &now.elapsed(), ""); | ||||
|         res | ||||
|     } | ||||
|  | ||||
|     /// Process messages from the network | ||||
|     fn run_listen( | ||||
|         obj: &Arc<RwLock<Self>>, | ||||
|         recycler: &PacketsRecycler, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         bank_forks: Option<&Arc<RwLock<BankForks>>>, | ||||
|         requests_receiver: &PacketReceiver, | ||||
|         response_sender: &PacketSender, | ||||
| @@ -1365,11 +1640,12 @@ impl ClusterInfo { | ||||
|             None => HashMap::new(), | ||||
|         }; | ||||
|  | ||||
|         Self::handle_packets(obj, &recycler, &stakes, reqs, response_sender); | ||||
|         Self::handle_packets(obj, &recycler, blockstore, &stakes, reqs, response_sender); | ||||
|         Ok(()) | ||||
|     } | ||||
|     pub fn listen( | ||||
|         me: Arc<RwLock<Self>>, | ||||
|         blockstore: Option<Arc<Blockstore>>, | ||||
|         bank_forks: Option<Arc<RwLock<BankForks>>>, | ||||
|         requests_receiver: PacketReceiver, | ||||
|         response_sender: PacketSender, | ||||
| @@ -1383,6 +1659,7 @@ impl ClusterInfo { | ||||
|                 let e = Self::run_listen( | ||||
|                     &me, | ||||
|                     &recycler, | ||||
|                     blockstore.as_ref(), | ||||
|                     bank_forks.as_ref(), | ||||
|                     &requests_receiver, | ||||
|                     &response_sender, | ||||
| @@ -1417,7 +1694,6 @@ impl ClusterInfo { | ||||
|             dummy_addr, | ||||
|             dummy_addr, | ||||
|             dummy_addr, | ||||
|             dummy_addr, | ||||
|             timestamp(), | ||||
|         ) | ||||
|     } | ||||
| @@ -1498,7 +1774,6 @@ pub struct Sockets { | ||||
|     pub repair: UdpSocket, | ||||
|     pub retransmit_sockets: Vec<UdpSocket>, | ||||
|     pub storage: Option<UdpSocket>, | ||||
|     pub serve_repair: UdpSocket, | ||||
| } | ||||
|  | ||||
| #[derive(Debug)] | ||||
| @@ -1519,10 +1794,9 @@ impl Node { | ||||
|         let storage = UdpSocket::bind("127.0.0.1:0").unwrap(); | ||||
|         let empty = "0.0.0.0:0".parse().unwrap(); | ||||
|         let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); | ||||
|  | ||||
|         let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; | ||||
|         let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); | ||||
|         let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); | ||||
|  | ||||
|         let info = ContactInfo::new( | ||||
|             pubkey, | ||||
|             gossip.local_addr().unwrap(), | ||||
| @@ -1534,7 +1808,6 @@ impl Node { | ||||
|             storage.local_addr().unwrap(), | ||||
|             empty, | ||||
|             empty, | ||||
|             serve_repair.local_addr().unwrap(), | ||||
|             timestamp(), | ||||
|         ); | ||||
|  | ||||
| @@ -1549,7 +1822,6 @@ impl Node { | ||||
|                 broadcast, | ||||
|                 repair, | ||||
|                 retransmit_sockets: vec![retransmit], | ||||
|                 serve_repair, | ||||
|                 storage: Some(storage), | ||||
|                 ip_echo: None, | ||||
|             }, | ||||
| @@ -1572,7 +1844,6 @@ impl Node { | ||||
|         let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; | ||||
|         let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); | ||||
|         let storage = UdpSocket::bind("0.0.0.0:0").unwrap(); | ||||
|         let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); | ||||
|         let info = ContactInfo::new( | ||||
|             pubkey, | ||||
|             gossip_addr, | ||||
| @@ -1584,7 +1855,6 @@ impl Node { | ||||
|             storage.local_addr().unwrap(), | ||||
|             rpc_addr, | ||||
|             rpc_pubsub_addr, | ||||
|             serve_repair.local_addr().unwrap(), | ||||
|             timestamp(), | ||||
|         ); | ||||
|         Node { | ||||
| @@ -1600,7 +1870,6 @@ impl Node { | ||||
|                 repair, | ||||
|                 retransmit_sockets: vec![retransmit_socket], | ||||
|                 storage: None, | ||||
|                 serve_repair, | ||||
|             }, | ||||
|         } | ||||
|     } | ||||
| @@ -1643,8 +1912,6 @@ impl Node { | ||||
|             multi_bind_in_range(port_range, 8).expect("retransmit multi_bind"); | ||||
|  | ||||
|         let (repair_port, repair) = Self::bind(port_range); | ||||
|         let (serve_repair_port, serve_repair) = Self::bind(port_range); | ||||
|  | ||||
|         let (_, broadcast) = multi_bind_in_range(port_range, 4).expect("broadcast multi_bind"); | ||||
|  | ||||
|         let info = ContactInfo::new( | ||||
| @@ -1655,7 +1922,6 @@ impl Node { | ||||
|             SocketAddr::new(gossip_addr.ip(), repair_port), | ||||
|             SocketAddr::new(gossip_addr.ip(), tpu_port), | ||||
|             SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), | ||||
|             SocketAddr::new(gossip_addr.ip(), serve_repair_port), | ||||
|             socketaddr_any!(), | ||||
|             socketaddr_any!(), | ||||
|             socketaddr_any!(), | ||||
| @@ -1675,7 +1941,6 @@ impl Node { | ||||
|                 repair, | ||||
|                 retransmit_sockets, | ||||
|                 storage: None, | ||||
|                 serve_repair, | ||||
|                 ip_echo: Some(ip_echo), | ||||
|             }, | ||||
|         } | ||||
| @@ -1712,8 +1977,18 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { | ||||
| mod tests { | ||||
|     use super::*; | ||||
|     use crate::crds_value::CrdsValueLabel; | ||||
|     use crate::repair_service::RepairType; | ||||
|     use crate::result::Error; | ||||
|     use rayon::prelude::*; | ||||
|     use solana_ledger::blockstore::make_many_slot_entries; | ||||
|     use solana_ledger::blockstore::Blockstore; | ||||
|     use solana_ledger::blockstore_processor::fill_blockstore_slot_with_ticks; | ||||
|     use solana_ledger::get_tmp_ledger_path; | ||||
|     use solana_ledger::shred::{ | ||||
|         max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, | ||||
|     }; | ||||
|     use solana_perf::test_tx::test_tx; | ||||
|     use solana_sdk::hash::Hash; | ||||
|     use solana_sdk::signature::{Keypair, KeypairUtil}; | ||||
|     use std::collections::HashSet; | ||||
|     use std::net::{IpAddr, Ipv4Addr}; | ||||
| @@ -1784,6 +2059,242 @@ mod tests { | ||||
|         let label = CrdsValueLabel::ContactInfo(d.id); | ||||
|         assert!(cluster_info.gossip.crds.lookup(&label).is_none()); | ||||
|     } | ||||
|     #[test] | ||||
|     fn window_index_request() { | ||||
|         let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); | ||||
|         let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me); | ||||
|         let rv = cluster_info.repair_request(&RepairType::Shred(0, 0)); | ||||
|         assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); | ||||
|  | ||||
|         let gossip_addr = socketaddr!([127, 0, 0, 1], 1234); | ||||
|         let nxt = ContactInfo::new( | ||||
|             &Pubkey::new_rand(), | ||||
|             gossip_addr, | ||||
|             socketaddr!([127, 0, 0, 1], 1235), | ||||
|             socketaddr!([127, 0, 0, 1], 1236), | ||||
|             socketaddr!([127, 0, 0, 1], 1237), | ||||
|             socketaddr!([127, 0, 0, 1], 1238), | ||||
|             socketaddr!([127, 0, 0, 1], 1239), | ||||
|             socketaddr!([127, 0, 0, 1], 1240), | ||||
|             socketaddr!([127, 0, 0, 1], 1241), | ||||
|             socketaddr!([127, 0, 0, 1], 1242), | ||||
|             0, | ||||
|         ); | ||||
|         cluster_info.insert_info(nxt.clone()); | ||||
|         let rv = cluster_info | ||||
|             .repair_request(&RepairType::Shred(0, 0)) | ||||
|             .unwrap(); | ||||
|         assert_eq!(nxt.gossip, gossip_addr); | ||||
|         assert_eq!(rv.0, nxt.gossip); | ||||
|  | ||||
|         let gossip_addr2 = socketaddr!([127, 0, 0, 2], 1234); | ||||
|         let nxt = ContactInfo::new( | ||||
|             &Pubkey::new_rand(), | ||||
|             gossip_addr2, | ||||
|             socketaddr!([127, 0, 0, 1], 1235), | ||||
|             socketaddr!([127, 0, 0, 1], 1236), | ||||
|             socketaddr!([127, 0, 0, 1], 1237), | ||||
|             socketaddr!([127, 0, 0, 1], 1238), | ||||
|             socketaddr!([127, 0, 0, 1], 1239), | ||||
|             socketaddr!([127, 0, 0, 1], 1240), | ||||
|             socketaddr!([127, 0, 0, 1], 1241), | ||||
|             socketaddr!([127, 0, 0, 1], 1242), | ||||
|             0, | ||||
|         ); | ||||
|         cluster_info.insert_info(nxt); | ||||
|         let mut one = false; | ||||
|         let mut two = false; | ||||
|         while !one || !two { | ||||
|             //this randomly picks an option, so eventually it should pick both | ||||
|             let rv = cluster_info | ||||
|                 .repair_request(&RepairType::Shred(0, 0)) | ||||
|                 .unwrap(); | ||||
|             if rv.0 == gossip_addr { | ||||
|                 one = true; | ||||
|             } | ||||
|             if rv.0 == gossip_addr2 { | ||||
|                 two = true; | ||||
|             } | ||||
|         } | ||||
|         assert!(one && two); | ||||
|     } | ||||
|  | ||||
|     /// test window requests respond with the right shred, and do not overrun | ||||
|     #[test] | ||||
|     fn run_window_request() { | ||||
|         let recycler = PacketsRecycler::default(); | ||||
|         solana_logger::setup(); | ||||
|         let ledger_path = get_tmp_ledger_path!(); | ||||
|         { | ||||
|             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); | ||||
|             let me = ContactInfo::new( | ||||
|                 &Pubkey::new_rand(), | ||||
|                 socketaddr!("127.0.0.1:1234"), | ||||
|                 socketaddr!("127.0.0.1:1235"), | ||||
|                 socketaddr!("127.0.0.1:1236"), | ||||
|                 socketaddr!("127.0.0.1:1237"), | ||||
|                 socketaddr!("127.0.0.1:1238"), | ||||
|                 socketaddr!("127.0.0.1:1239"), | ||||
|                 socketaddr!("127.0.0.1:1240"), | ||||
|                 socketaddr!("127.0.0.1:1241"), | ||||
|                 socketaddr!("127.0.0.1:1242"), | ||||
|                 0, | ||||
|             ); | ||||
|             let rv = ClusterInfo::run_window_request( | ||||
|                 &recycler, | ||||
|                 &me, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 &me, | ||||
|                 0, | ||||
|                 0, | ||||
|             ); | ||||
|             assert!(rv.is_none()); | ||||
|             let mut common_header = ShredCommonHeader::default(); | ||||
|             common_header.slot = 2; | ||||
|             common_header.index = 1; | ||||
|             let mut data_header = DataShredHeader::default(); | ||||
|             data_header.parent_offset = 1; | ||||
|             let shred_info = Shred::new_empty_from_header( | ||||
|                 common_header, | ||||
|                 data_header, | ||||
|                 CodingShredHeader::default(), | ||||
|             ); | ||||
|  | ||||
|             blockstore | ||||
|                 .insert_shreds(vec![shred_info], None, false) | ||||
|                 .expect("Expect successful ledger write"); | ||||
|  | ||||
|             let rv = ClusterInfo::run_window_request( | ||||
|                 &recycler, | ||||
|                 &me, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 &me, | ||||
|                 2, | ||||
|                 1, | ||||
|             ); | ||||
|             assert!(rv.is_some()); | ||||
|             let rv: Vec<Shred> = rv | ||||
|                 .expect("packets") | ||||
|                 .packets | ||||
|                 .into_iter() | ||||
|                 .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) | ||||
|                 .collect(); | ||||
|             assert_eq!(rv[0].index(), 1); | ||||
|             assert_eq!(rv[0].slot(), 2); | ||||
|         } | ||||
|  | ||||
|         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); | ||||
|     } | ||||
|  | ||||
|     /// test that run_highest_window_request responds with the right shred, and does not overrun | ||||
|     #[test] | ||||
|     fn run_highest_window_request() { | ||||
|         let recycler = PacketsRecycler::default(); | ||||
|         solana_logger::setup(); | ||||
|         let ledger_path = get_tmp_ledger_path!(); | ||||
|         { | ||||
|             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); | ||||
|             let rv = ClusterInfo::run_highest_window_request( | ||||
|                 &recycler, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 0, | ||||
|                 0, | ||||
|             ); | ||||
|             assert!(rv.is_none()); | ||||
|  | ||||
|             let _ = fill_blockstore_slot_with_ticks( | ||||
|                 &blockstore, | ||||
|                 max_ticks_per_n_shreds(1) + 1, | ||||
|                 2, | ||||
|                 1, | ||||
|                 Hash::default(), | ||||
|             ); | ||||
|  | ||||
|             let rv = ClusterInfo::run_highest_window_request( | ||||
|                 &recycler, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 2, | ||||
|                 1, | ||||
|             ); | ||||
|             let rv: Vec<Shred> = rv | ||||
|                 .expect("packets") | ||||
|                 .packets | ||||
|                 .into_iter() | ||||
|                 .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) | ||||
|                 .collect(); | ||||
|             assert!(!rv.is_empty()); | ||||
|             let index = blockstore.meta(2).unwrap().unwrap().received - 1; | ||||
|             assert_eq!(rv[0].index(), index as u32); | ||||
|             assert_eq!(rv[0].slot(), 2); | ||||
|  | ||||
|             let rv = ClusterInfo::run_highest_window_request( | ||||
|                 &recycler, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 2, | ||||
|                 index + 1, | ||||
|             ); | ||||
|             assert!(rv.is_none()); | ||||
|         } | ||||
|  | ||||
|         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn run_orphan() { | ||||
|         solana_logger::setup(); | ||||
|         let recycler = PacketsRecycler::default(); | ||||
|         let ledger_path = get_tmp_ledger_path!(); | ||||
|         { | ||||
|             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); | ||||
|             let rv = | ||||
|                 ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0); | ||||
|             assert!(rv.is_none()); | ||||
|  | ||||
|             // Create slots 1, 2, 3 with 5 shreds apiece | ||||
|             let (shreds, _) = make_many_slot_entries(1, 3, 5); | ||||
|  | ||||
|             blockstore | ||||
|                 .insert_shreds(shreds, None, false) | ||||
|                 .expect("Expect successful ledger write"); | ||||
|  | ||||
|             // We don't have slot 4, so we don't know how to service this request | ||||
|             let rv = | ||||
|                 ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5); | ||||
|             assert!(rv.is_none()); | ||||
|  | ||||
|             // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively | ||||
|             // for this request | ||||
|             let rv: Vec<_> = | ||||
|                 ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5) | ||||
|                     .expect("run_orphan packets") | ||||
|                     .packets | ||||
|                     .iter() | ||||
|                     .cloned() | ||||
|                     .collect(); | ||||
|             let expected: Vec<_> = (1..=3) | ||||
|                 .rev() | ||||
|                 .map(|slot| { | ||||
|                     let index = blockstore.meta(slot).unwrap().unwrap().received - 1; | ||||
|                     ClusterInfo::get_data_shred_as_packet( | ||||
|                         &blockstore, | ||||
|                         slot, | ||||
|                         index, | ||||
|                         &socketaddr_any!(), | ||||
|                     ) | ||||
|                     .unwrap() | ||||
|                     .unwrap() | ||||
|                 }) | ||||
|                 .collect(); | ||||
|             assert_eq!(rv, expected) | ||||
|         } | ||||
|  | ||||
|         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); | ||||
|     } | ||||
|  | ||||
|     fn assert_in_range(x: u16, range: (u16, u16)) { | ||||
|         assert!(x >= range.0); | ||||
| @@ -2164,16 +2675,13 @@ mod tests { | ||||
|     } | ||||
|  | ||||
|     fn test_split_messages(value: CrdsValue) { | ||||
|         const NUM_VALUES: u64 = 30; | ||||
|         const NUM_VALUES: usize = 30; | ||||
|         let value_size = value.size(); | ||||
|         let num_values_per_payload = (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1); | ||||
|  | ||||
|         // Expected len is the ceiling of the division | ||||
|         let expected_len = (NUM_VALUES + num_values_per_payload - 1) / num_values_per_payload; | ||||
|         let msgs = vec![value; NUM_VALUES as usize]; | ||||
|         let expected_len = NUM_VALUES / (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize; | ||||
|         let msgs = vec![value; NUM_VALUES]; | ||||
|  | ||||
|         let split = ClusterInfo::split_gossip_messages(msgs); | ||||
|         assert!(split.len() as u64 <= expected_len); | ||||
|         assert!(split.len() <= expected_len); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
| @@ -2346,6 +2854,25 @@ mod tests { | ||||
|                 - serialized_size(&PruneData::default()).unwrap(), | ||||
|         ); | ||||
|  | ||||
|         // make sure repairs are always smaller than the gossip messages | ||||
|         assert!( | ||||
|             max_protocol_size | ||||
|                 > serialized_size(&Protocol::RequestWindowIndex(ContactInfo::default(), 0, 0)) | ||||
|                     .unwrap() | ||||
|         ); | ||||
|         assert!( | ||||
|             max_protocol_size | ||||
|                 > serialized_size(&Protocol::RequestHighestWindowIndex( | ||||
|                     ContactInfo::default(), | ||||
|                     0, | ||||
|                     0 | ||||
|                 )) | ||||
|                 .unwrap() | ||||
|         ); | ||||
|         assert!( | ||||
|             max_protocol_size | ||||
|                 > serialized_size(&Protocol::RequestOrphan(ContactInfo::default(), 0)).unwrap() | ||||
|         ); | ||||
|         // finally assert the header size estimation is correct | ||||
|         assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size); | ||||
|     } | ||||
|   | ||||
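The constant changes above keep every repair message inside a single packet. A sketch of the arithmetic, assuming PACKET_DATA_SIZE is 1232 bytes in this tree (worth double-checking):

    // Size budget implied by the cluster_info hunks above.
    const PACKET_DATA_SIZE: u64 = 1232; // assumed value of packet::PACKET_DATA_SIZE
    const MAX_PROTOCOL_HEADER_SIZE: u64 = 204; // largest Protocol header after this change
    const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE - MAX_PROTOCOL_HEADER_SIZE; // 1028

    // MAX_BLOOM_SIZE is bumped from 1018 to 1028 to match the payload budget,
    // so a maximal bloom filter plus the protocol header still fits in one
    // packet; the serialized_size assertions in the tests above check this.
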
| @@ -17,7 +17,7 @@ pub struct ContactInfo { | ||||
|     pub tvu: SocketAddr, | ||||
|     /// address to forward shreds to | ||||
|     pub tvu_forwards: SocketAddr, | ||||
|     /// address to send repair responses to | ||||
|     /// address to send repairs to | ||||
|     pub repair: SocketAddr, | ||||
|     /// transactions address | ||||
|     pub tpu: SocketAddr, | ||||
| @@ -29,8 +29,6 @@ pub struct ContactInfo { | ||||
|     pub rpc: SocketAddr, | ||||
|     /// websocket for JSON-RPC push notifications | ||||
|     pub rpc_pubsub: SocketAddr, | ||||
|     /// address to send repair requests to | ||||
|     pub serve_repair: SocketAddr, | ||||
|     /// latest wallclock picked | ||||
|     pub wallclock: u64, | ||||
|     /// node shred version | ||||
| @@ -87,7 +85,6 @@ impl Default for ContactInfo { | ||||
|             storage_addr: socketaddr_any!(), | ||||
|             rpc: socketaddr_any!(), | ||||
|             rpc_pubsub: socketaddr_any!(), | ||||
|             serve_repair: socketaddr_any!(), | ||||
|             wallclock: 0, | ||||
|             shred_version: 0, | ||||
|         } | ||||
| @@ -107,7 +104,6 @@ impl ContactInfo { | ||||
|         storage_addr: SocketAddr, | ||||
|         rpc: SocketAddr, | ||||
|         rpc_pubsub: SocketAddr, | ||||
|         serve_repair: SocketAddr, | ||||
|         now: u64, | ||||
|     ) -> Self { | ||||
|         Self { | ||||
| @@ -121,7 +117,6 @@ impl ContactInfo { | ||||
|             storage_addr, | ||||
|             rpc, | ||||
|             rpc_pubsub, | ||||
|             serve_repair, | ||||
|             wallclock: now, | ||||
|             shred_version: 0, | ||||
|         } | ||||
| @@ -139,7 +134,6 @@ impl ContactInfo { | ||||
|             socketaddr!("127.0.0.1:1240"), | ||||
|             socketaddr!("127.0.0.1:1241"), | ||||
|             socketaddr!("127.0.0.1:1242"), | ||||
|             socketaddr!("127.0.0.1:1243"), | ||||
|             now, | ||||
|         ) | ||||
|     } | ||||
| @@ -160,7 +154,6 @@ impl ContactInfo { | ||||
|             addr, | ||||
|             addr, | ||||
|             addr, | ||||
|             addr, | ||||
|             0, | ||||
|         ) | ||||
|     } | ||||
| @@ -181,7 +174,6 @@ impl ContactInfo { | ||||
|         let repair = next_port(&bind_addr, 5); | ||||
|         let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); | ||||
|         let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); | ||||
|         let serve_repair = next_port(&bind_addr, 6); | ||||
|         Self::new( | ||||
|             pubkey, | ||||
|             gossip_addr, | ||||
| @@ -193,7 +185,6 @@ impl ContactInfo { | ||||
|             "0.0.0.0:0".parse().unwrap(), | ||||
|             rpc_addr, | ||||
|             rpc_pubsub_addr, | ||||
|             serve_repair, | ||||
|             timestamp(), | ||||
|         ) | ||||
|     } | ||||
| @@ -218,7 +209,6 @@ impl ContactInfo { | ||||
|             daddr, | ||||
|             daddr, | ||||
|             daddr, | ||||
|             daddr, | ||||
|             timestamp(), | ||||
|         ) | ||||
|     } | ||||
| @@ -277,7 +267,6 @@ mod tests { | ||||
|         assert!(ci.rpc_pubsub.ip().is_unspecified()); | ||||
|         assert!(ci.tpu.ip().is_unspecified()); | ||||
|         assert!(ci.storage_addr.ip().is_unspecified()); | ||||
|         assert!(ci.serve_repair.ip().is_unspecified()); | ||||
|     } | ||||
|     #[test] | ||||
|     fn test_multicast() { | ||||
| @@ -289,7 +278,6 @@ mod tests { | ||||
|         assert!(ci.rpc_pubsub.ip().is_multicast()); | ||||
|         assert!(ci.tpu.ip().is_multicast()); | ||||
|         assert!(ci.storage_addr.ip().is_multicast()); | ||||
|         assert!(ci.serve_repair.ip().is_multicast()); | ||||
|     } | ||||
|     #[test] | ||||
|     fn test_entry_point() { | ||||
| @@ -302,7 +290,6 @@ mod tests { | ||||
|         assert!(ci.rpc_pubsub.ip().is_unspecified()); | ||||
|         assert!(ci.tpu.ip().is_unspecified()); | ||||
|         assert!(ci.storage_addr.ip().is_unspecified()); | ||||
|         assert!(ci.serve_repair.ip().is_unspecified()); | ||||
|     } | ||||
|     #[test] | ||||
|     fn test_socketaddr() { | ||||
| @@ -315,9 +302,7 @@ mod tests { | ||||
|         assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); | ||||
|         assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); | ||||
|         assert!(ci.storage_addr.ip().is_unspecified()); | ||||
|         assert_eq!(ci.serve_repair.port(), 16); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn replayed_data_new_with_socketaddr_with_pubkey() { | ||||
|         let keypair = Keypair::new(); | ||||
| @@ -338,9 +323,6 @@ mod tests { | ||||
|             d1.rpc_pubsub, | ||||
|             socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT)) | ||||
|         ); | ||||
|         assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); | ||||
|         assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); | ||||
|         assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|   | ||||
| @@ -6,6 +6,7 @@ use crate::streamer; | ||||
| use rand::{thread_rng, Rng}; | ||||
| use solana_client::thin_client::{create_client, ThinClient}; | ||||
| use solana_ledger::bank_forks::BankForks; | ||||
| use solana_ledger::blockstore::Blockstore; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use solana_sdk::pubkey::Pubkey; | ||||
| use solana_sdk::signature::{Keypair, KeypairUtil}; | ||||
| @@ -23,6 +24,7 @@ pub struct GossipService { | ||||
| impl GossipService { | ||||
|     pub fn new( | ||||
|         cluster_info: &Arc<RwLock<ClusterInfo>>, | ||||
|         blockstore: Option<Arc<Blockstore>>, | ||||
|         bank_forks: Option<Arc<RwLock<BankForks>>>, | ||||
|         gossip_socket: UdpSocket, | ||||
|         exit: &Arc<AtomicBool>, | ||||
| @@ -45,6 +47,7 @@ impl GossipService { | ||||
|         let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); | ||||
|         let t_listen = ClusterInfo::listen( | ||||
|             cluster_info.clone(), | ||||
|             blockstore, | ||||
|             bank_forks.clone(), | ||||
|             request_receiver, | ||||
|             response_sender.clone(), | ||||
| @@ -280,7 +283,8 @@ fn make_gossip_node( | ||||
|         cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint)); | ||||
|     } | ||||
|     let cluster_info = Arc::new(RwLock::new(cluster_info)); | ||||
|     let gossip_service = GossipService::new(&cluster_info.clone(), None, gossip_socket, &exit); | ||||
|     let gossip_service = | ||||
|         GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit); | ||||
|     (gossip_service, ip_echo, cluster_info) | ||||
| } | ||||
|  | ||||
| @@ -299,7 +303,7 @@ mod tests { | ||||
|         let tn = Node::new_localhost(); | ||||
|         let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone()); | ||||
|         let c = Arc::new(RwLock::new(cluster_info)); | ||||
|         let d = GossipService::new(&c, None, tn.sockets.gossip, &exit); | ||||
|         let d = GossipService::new(&c, None, None, tn.sockets.gossip, &exit); | ||||
|         exit.store(true, Ordering::Relaxed); | ||||
|         d.join().unwrap(); | ||||
|     } | ||||
|   | ||||
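Every GossipService::new caller now threads an optional blockstore through, as the hunks above show; a minimal sketch of the validator-side wiring, with the surrounding names assumed:

    // Nodes that should answer repairs pass their blockstore; pure observers,
    // such as make_gossip_node above, pass None.
    let gossip_service = GossipService::new(
        &cluster_info,            // &Arc<RwLock<ClusterInfo>>
        Some(blockstore.clone()), // Option<Arc<Blockstore>>
        bank_forks,               // Option<Arc<RwLock<BankForks>>>
        node.sockets.gossip,
        &exit,
    );
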
| @@ -43,8 +43,6 @@ pub mod rpc_pubsub_service; | ||||
| pub mod rpc_service; | ||||
| pub mod rpc_subscriptions; | ||||
| pub mod sendmmsg; | ||||
| pub mod serve_repair; | ||||
| pub mod serve_repair_service; | ||||
| pub mod sigverify; | ||||
| pub mod sigverify_shreds; | ||||
| pub mod sigverify_stage; | ||||
|   | ||||
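With serve_repair and serve_repair_service gone from the module list, the RepairType definition returns to repair_service, as the next hunks show.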
| @@ -1,10 +1,8 @@ | ||||
| //! The `repair_service` module implements the tools necessary to generate a thread which | ||||
| //! regularly finds missing shreds in the ledger and sends repair requests for those shreds | ||||
| use crate::{ | ||||
|     cluster_info::ClusterInfo, | ||||
|     cluster_info_repair_listener::ClusterInfoRepairListener, | ||||
|     cluster_info::ClusterInfo, cluster_info_repair_listener::ClusterInfoRepairListener, | ||||
|     result::Result, | ||||
|     serve_repair::{RepairType, ServeRepair}, | ||||
| }; | ||||
| use solana_ledger::{ | ||||
|     bank_forks::BankForks, | ||||
| @@ -35,6 +33,23 @@ pub enum RepairStrategy { | ||||
|     }, | ||||
| } | ||||
|  | ||||
| #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] | ||||
| pub enum RepairType { | ||||
|     Orphan(Slot), | ||||
|     HighestShred(Slot, u64), | ||||
|     Shred(Slot, u64), | ||||
| } | ||||
|  | ||||
| impl RepairType { | ||||
|     pub fn slot(&self) -> Slot { | ||||
|         match self { | ||||
|             RepairType::Orphan(slot) => *slot, | ||||
|             RepairType::HighestShred(slot, _) => *slot, | ||||
|             RepairType::Shred(slot, _) => *slot, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct RepairSlotRange { | ||||
|     pub start: Slot, | ||||
|     pub end: Slot, | ||||
| @@ -101,7 +116,6 @@ impl RepairService { | ||||
|         cluster_info: &Arc<RwLock<ClusterInfo>>, | ||||
|         repair_strategy: RepairStrategy, | ||||
|     ) { | ||||
|         let serve_repair = ServeRepair::new(cluster_info.clone()); | ||||
|         let mut epoch_slots: BTreeSet<Slot> = BTreeSet::new(); | ||||
|         let id = cluster_info.read().unwrap().id(); | ||||
|         let mut current_root = 0; | ||||
| @@ -159,7 +173,9 @@ impl RepairService { | ||||
|                 let reqs: Vec<_> = repairs | ||||
|                     .into_iter() | ||||
|                     .filter_map(|repair_request| { | ||||
|                         serve_repair | ||||
|                         cluster_info | ||||
|                             .read() | ||||
|                             .unwrap() | ||||
|                             .repair_request(&repair_request) | ||||
|                             .map(|result| (result, repair_request)) | ||||
|                             .ok() | ||||
|   | ||||
| @@ -1,676 +0,0 @@ | ||||
| use crate::packet::limited_deserialize; | ||||
| use crate::streamer::{PacketReceiver, PacketSender}; | ||||
| use crate::{ | ||||
|     cluster_info::{ClusterInfo, ClusterInfoError}, | ||||
|     contact_info::ContactInfo, | ||||
|     packet::Packet, | ||||
|     result::Result, | ||||
| }; | ||||
| use bincode::serialize; | ||||
| use rand::{thread_rng, Rng}; | ||||
| use solana_ledger::blockstore::Blockstore; | ||||
| use solana_measure::thread_mem_usage; | ||||
| use solana_metrics::{datapoint_debug, inc_new_counter_debug}; | ||||
| use solana_perf::packet::{Packets, PacketsRecycler}; | ||||
| use solana_sdk::{ | ||||
|     clock::Slot, | ||||
|     signature::{Keypair, KeypairUtil}, | ||||
|     timing::duration_as_ms, | ||||
| }; | ||||
| use std::{ | ||||
|     net::SocketAddr, | ||||
|     sync::atomic::{AtomicBool, Ordering}, | ||||
|     sync::{Arc, RwLock}, | ||||
|     thread::{Builder, JoinHandle}, | ||||
|     time::{Duration, Instant}, | ||||
| }; | ||||
|  | ||||
| /// the number of slots to respond with when responding to `Orphan` requests | ||||
| pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; | ||||
|  | ||||
| #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] | ||||
| pub enum RepairType { | ||||
|     Orphan(Slot), | ||||
|     HighestShred(Slot, u64), | ||||
|     Shred(Slot, u64), | ||||
| } | ||||
|  | ||||
| impl RepairType { | ||||
|     pub fn slot(&self) -> Slot { | ||||
|         match self { | ||||
|             RepairType::Orphan(slot) => *slot, | ||||
|             RepairType::HighestShred(slot, _) => *slot, | ||||
|             RepairType::Shred(slot, _) => *slot, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Window protocol messages | ||||
| #[derive(Serialize, Deserialize, Debug)] | ||||
| enum RepairProtocol { | ||||
|     WindowIndex(ContactInfo, u64, u64), | ||||
|     HighestWindowIndex(ContactInfo, u64, u64), | ||||
|     Orphan(ContactInfo, u64), | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct ServeRepair { | ||||
|     /// set the keypair that will be used to sign repair responses | ||||
|     keypair: Arc<Keypair>, | ||||
|     my_info: ContactInfo, | ||||
|     cluster_info: Arc<RwLock<ClusterInfo>>, | ||||
| } | ||||
|  | ||||
| impl ServeRepair { | ||||
|     /// Without a valid keypair gossip will not function. Only useful for tests. | ||||
|     pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self { | ||||
|         Self::new(Arc::new(RwLock::new( | ||||
|             ClusterInfo::new_with_invalid_keypair(contact_info), | ||||
|         ))) | ||||
|     } | ||||
|  | ||||
|     pub fn new(cluster_info: Arc<RwLock<ClusterInfo>>) -> Self { | ||||
|         let (keypair, my_info) = { | ||||
|             let r_cluster_info = cluster_info.read().unwrap(); | ||||
|             (r_cluster_info.keypair.clone(), r_cluster_info.my_data()) | ||||
|         }; | ||||
|         Self { | ||||
|             keypair, | ||||
|             my_info, | ||||
|             cluster_info, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn my_info(&self) -> &ContactInfo { | ||||
|         &self.my_info | ||||
|     } | ||||
|  | ||||
|     pub fn keypair(&self) -> &Arc<Keypair> { | ||||
|         &self.keypair | ||||
|     } | ||||
|  | ||||
|     fn get_repair_sender(request: &RepairProtocol) -> &ContactInfo { | ||||
|         match request { | ||||
|             RepairProtocol::WindowIndex(ref from, _, _) => from, | ||||
|             RepairProtocol::HighestWindowIndex(ref from, _, _) => from, | ||||
|             RepairProtocol::Orphan(ref from, _) => from, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn handle_repair( | ||||
|         me: &Arc<RwLock<Self>>, | ||||
|         recycler: &PacketsRecycler, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         request: RepairProtocol, | ||||
|     ) -> Option<Packets> { | ||||
|         let now = Instant::now(); | ||||
|  | ||||
|         //TODO verify from is signed | ||||
|         let my_id = me.read().unwrap().keypair.pubkey(); | ||||
|         let from = Self::get_repair_sender(&request); | ||||
|         if from.id == my_id { | ||||
|             warn!( | ||||
|                 "{}: Ignored received repair request from ME {}", | ||||
|                 my_id, from.id, | ||||
|             ); | ||||
|             inc_new_counter_debug!("serve_repair-handle-repair--eq", 1); | ||||
|             return None; | ||||
|         } | ||||
|  | ||||
|         let (res, label) = { | ||||
|             match &request { | ||||
|                 RepairProtocol::WindowIndex(from, slot, shred_index) => { | ||||
|                     inc_new_counter_debug!("serve_repair-request-window-index", 1); | ||||
|                     ( | ||||
|                         Self::run_window_request( | ||||
|                             recycler, | ||||
|                             from, | ||||
|                             &from_addr, | ||||
|                             blockstore, | ||||
|                             &me.read().unwrap().my_info, | ||||
|                             *slot, | ||||
|                             *shred_index, | ||||
|                         ), | ||||
|                         "WindowIndex", | ||||
|                     ) | ||||
|                 } | ||||
|  | ||||
|                 RepairProtocol::HighestWindowIndex(_, slot, highest_index) => { | ||||
|                     inc_new_counter_debug!("serve_repair-request-highest-window-index", 1); | ||||
|                     ( | ||||
|                         Self::run_highest_window_request( | ||||
|                             recycler, | ||||
|                             &from_addr, | ||||
|                             blockstore, | ||||
|                             *slot, | ||||
|                             *highest_index, | ||||
|                         ), | ||||
|                         "HighestWindowIndex", | ||||
|                     ) | ||||
|                 } | ||||
|                 RepairProtocol::Orphan(_, slot) => { | ||||
|                     inc_new_counter_debug!("serve_repair-request-orphan", 1); | ||||
|                     ( | ||||
|                         Self::run_orphan( | ||||
|                             recycler, | ||||
|                             &from_addr, | ||||
|                             blockstore, | ||||
|                             *slot, | ||||
|                             MAX_ORPHAN_REPAIR_RESPONSES, | ||||
|                         ), | ||||
|                         "Orphan", | ||||
|                     ) | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         trace!("{}: received repair request: {:?}", my_id, request); | ||||
|         Self::report_time_spent(label, &now.elapsed(), ""); | ||||
|         res | ||||
|     } | ||||
|  | ||||
|     fn report_time_spent(label: &str, time: &Duration, extra: &str) { | ||||
|         let count = duration_as_ms(time); | ||||
|         if count > 5 { | ||||
|             info!("{} took: {} ms {}", label, count, extra); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Process messages from the network | ||||
|     fn run_listen( | ||||
|         obj: &Arc<RwLock<Self>>, | ||||
|         recycler: &PacketsRecycler, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         requests_receiver: &PacketReceiver, | ||||
|         response_sender: &PacketSender, | ||||
|     ) -> Result<()> { | ||||
|         //TODO cache connections | ||||
|         let timeout = Duration::new(1, 0); | ||||
|         let reqs = requests_receiver.recv_timeout(timeout)?; | ||||
|  | ||||
|         Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender); | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn listen( | ||||
|         me: Arc<RwLock<Self>>, | ||||
|         blockstore: Option<Arc<Blockstore>>, | ||||
|         requests_receiver: PacketReceiver, | ||||
|         response_sender: PacketSender, | ||||
|         exit: &Arc<AtomicBool>, | ||||
|     ) -> JoinHandle<()> { | ||||
|         let exit = exit.clone(); | ||||
|         let recycler = PacketsRecycler::default(); | ||||
|         Builder::new() | ||||
|             .name("solana-repair-listen".to_string()) | ||||
|             .spawn(move || loop { | ||||
|                 let e = Self::run_listen( | ||||
|                     &me, | ||||
|                     &recycler, | ||||
|                     blockstore.as_ref(), | ||||
|                     &requests_receiver, | ||||
|                     &response_sender, | ||||
|                 ); | ||||
|                 if exit.load(Ordering::Relaxed) { | ||||
|                     return; | ||||
|                 } | ||||
|                 if e.is_err() { | ||||
|                     info!("repair listener error: {:?}", e); | ||||
|                 } | ||||
|                 thread_mem_usage::datapoint("solana-repair-listen"); | ||||
|             }) | ||||
|             .unwrap() | ||||
|     } | ||||
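|  | ||||
|     // Hedged wiring sketch (illustrative, mirroring ServeRepairService below): the | ||||
|     // listener is fed packets from a UDP socket via one channel and drains its | ||||
|     // responses out through another. | ||||
|     // | ||||
|     //     let (request_sender, request_receiver) = channel(); | ||||
|     //     let (response_sender, response_receiver) = channel(); | ||||
|     //     let t_listen = ServeRepair::listen( | ||||
|     //         me.clone(), | ||||
|     //         Some(blockstore), | ||||
|     //         request_receiver, | ||||
|     //         response_sender, | ||||
|     //         &exit, | ||||
|     //     ); | ||||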
|  | ||||
|     fn handle_packets( | ||||
|         me: &Arc<RwLock<Self>>, | ||||
|         recycler: &PacketsRecycler, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         packets: Packets, | ||||
|         response_sender: &PacketSender, | ||||
|     ) { | ||||
|         // iterate over the packets and answer any repair requests they contain | ||||
|         let allocated = thread_mem_usage::Allocatedp::default(); | ||||
|         packets.packets.iter().for_each(|packet| { | ||||
|             let start = allocated.get(); | ||||
|             let from_addr = packet.meta.addr(); | ||||
|             limited_deserialize(&packet.data[..packet.meta.size]) | ||||
|                 .into_iter() | ||||
|                 .for_each(|request| { | ||||
|                     let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request); | ||||
|                     if let Some(rsp) = rsp { | ||||
|                         let _ignore_disconnect = response_sender.send(rsp); | ||||
|                     } | ||||
|                 }); | ||||
|             datapoint_debug!( | ||||
|                 "solana-serve-repair-memory", | ||||
|                 ("serve_repair", (allocated.get() - start) as i64, i64), | ||||
|             ); | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> { | ||||
|         let req = RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index); | ||||
|         let out = serialize(&req)?; | ||||
|         Ok(out) | ||||
|     } | ||||
|  | ||||
|     fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> { | ||||
|         let req = RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index); | ||||
|         let out = serialize(&req)?; | ||||
|         Ok(out) | ||||
|     } | ||||
|  | ||||
|     fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> { | ||||
|         let req = RepairProtocol::Orphan(self.my_info.clone(), slot); | ||||
|         let out = serialize(&req)?; | ||||
|         Ok(out) | ||||
|     } | ||||
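|  | ||||
|     // Illustrative request encodings (slot and index values are arbitrary): | ||||
|     // | ||||
|     //     window_index_request_bytes(2, 1)         == serialize(&RepairProtocol::WindowIndex(my_info, 2, 1)) | ||||
|     //     window_highest_index_request_bytes(2, 1) == serialize(&RepairProtocol::HighestWindowIndex(my_info, 2, 1)) | ||||
|     //     orphan_bytes(2)                          == serialize(&RepairProtocol::Orphan(my_info, 2)) | ||||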
|  | ||||
|     pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> { | ||||
|         // find a peer that appears to be accepting repair requests and has the desired slot, as indicated | ||||
|         // by a valid serve_repair port location | ||||
|         let valid: Vec<_> = self | ||||
|             .cluster_info | ||||
|             .read() | ||||
|             .unwrap() | ||||
|             .repair_peers(repair_request.slot()); | ||||
|         if valid.is_empty() { | ||||
|             return Err(ClusterInfoError::NoPeers.into()); | ||||
|         } | ||||
|         let n = thread_rng().gen::<usize>() % valid.len(); | ||||
|         let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port | ||||
|         let out = self.map_repair_request(repair_request)?; | ||||
|  | ||||
|         Ok((addr, out)) | ||||
|     } | ||||
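|  | ||||
|     // Minimal usage sketch (assumes `serve_repair` and a bound `socket: UdpSocket` | ||||
|     // are in scope): | ||||
|     // | ||||
|     //     let (addr, bytes) = serve_repair.repair_request(&RepairType::Shred(slot, shred_index))?; | ||||
|     //     socket.send_to(&bytes, addr)?; | ||||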
|  | ||||
|     pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> { | ||||
|         match repair_request { | ||||
|             RepairType::Shred(slot, shred_index) => { | ||||
|                 datapoint_debug!( | ||||
|                     "serve_repair-repair", | ||||
|                     ("repair-slot", *slot, i64), | ||||
|                     ("repair-ix", *shred_index, i64) | ||||
|                 ); | ||||
|                 Ok(self.window_index_request_bytes(*slot, *shred_index)?) | ||||
|             } | ||||
|             RepairType::HighestShred(slot, shred_index) => { | ||||
|                 datapoint_debug!( | ||||
|                     "serve_repair-repair_highest", | ||||
|                     ("repair-highest-slot", *slot, i64), | ||||
|                     ("repair-highest-ix", *shred_index, i64) | ||||
|                 ); | ||||
|                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?) | ||||
|             } | ||||
|             RepairType::Orphan(slot) => { | ||||
|                 datapoint_debug!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64)); | ||||
|                 Ok(self.orphan_bytes(*slot)?) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn run_window_request( | ||||
|         recycler: &PacketsRecycler, | ||||
|         from: &ContactInfo, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         me: &ContactInfo, | ||||
|         slot: Slot, | ||||
|         shred_index: u64, | ||||
|     ) -> Option<Packets> { | ||||
|         if let Some(blockstore) = blockstore { | ||||
|             // Try to find the requested shred index in this slot's ledger | ||||
|             let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr); | ||||
|  | ||||
|             if let Ok(Some(packet)) = packet { | ||||
|                 inc_new_counter_debug!("serve_repair-window-request-ledger", 1); | ||||
|                 return Some(Packets::new_with_recycler_data( | ||||
|                     recycler, | ||||
|                     "run_window_request", | ||||
|                     vec![packet], | ||||
|                 )); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         inc_new_counter_debug!("serve_repair-window-request-fail", 1); | ||||
|         trace!( | ||||
|             "{}: failed WindowIndex {} {} {}", | ||||
|             me.id, | ||||
|             from.id, | ||||
|             slot, | ||||
|             shred_index, | ||||
|         ); | ||||
|  | ||||
|         None | ||||
|     } | ||||
|  | ||||
|     fn run_highest_window_request( | ||||
|         recycler: &PacketsRecycler, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         slot: Slot, | ||||
|         highest_index: u64, | ||||
|     ) -> Option<Packets> { | ||||
|         let blockstore = blockstore?; | ||||
|         // Look up the slot meta to learn the highest shred index received for this slot | ||||
|         let meta = blockstore.meta(slot).ok()??; | ||||
|         if meta.received > highest_index { | ||||
|             // meta.received must be at least 1 by this point | ||||
|             let packet = | ||||
|                 Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr) | ||||
|                     .ok()??; | ||||
|             return Some(Packets::new_with_recycler_data( | ||||
|                 recycler, | ||||
|                 "run_highest_window_request", | ||||
|                 vec![packet], | ||||
|             )); | ||||
|         } | ||||
|         None | ||||
|     } | ||||
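|  | ||||
|     // Example: if meta.received == 5 for the slot, any request with highest_index <= 4 | ||||
|     // is answered with the stored shred at index 4; larger indices return None. | ||||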
|  | ||||
|     fn run_orphan( | ||||
|         recycler: &PacketsRecycler, | ||||
|         from_addr: &SocketAddr, | ||||
|         blockstore: Option<&Arc<Blockstore>>, | ||||
|         mut slot: Slot, | ||||
|         max_responses: usize, | ||||
|     ) -> Option<Packets> { | ||||
|         let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); | ||||
|         if let Some(blockstore) = blockstore { | ||||
|             // Try to find the next "n" parent slots of the input slot | ||||
|             while let Ok(Some(meta)) = blockstore.meta(slot) { | ||||
|                 if meta.received == 0 { | ||||
|                     break; | ||||
|                 } | ||||
|                 let packet = | ||||
|                     Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr); | ||||
|                 if let Ok(Some(packet)) = packet { | ||||
|                     res.packets.push(packet); | ||||
|                 } | ||||
|                 if meta.is_parent_set() && res.packets.len() <= max_responses { | ||||
|                     slot = meta.parent_slot; | ||||
|                 } else { | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         if res.is_empty() { | ||||
|             return None; | ||||
|         } | ||||
|         Some(res) | ||||
|     } | ||||
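|  | ||||
|     // Example: with chained slots 1 <- 2 <- 3 in the ledger, run_orphan(.., 3, 5) | ||||
|     // returns the highest shred of slot 3, then of its parents 2 and 1, stopping | ||||
|     // once max_responses packets have been collected or a parent is missing. | ||||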
|  | ||||
|     fn get_data_shred_as_packet( | ||||
|         blockstore: &Arc<Blockstore>, | ||||
|         slot: Slot, | ||||
|         shred_index: u64, | ||||
|         dest: &SocketAddr, | ||||
|     ) -> Result<Option<Packet>> { | ||||
|         let data = blockstore.get_data_shred(slot, shred_index)?; | ||||
|         Ok(data.map(|data| { | ||||
|             let mut packet = Packet::default(); | ||||
|             packet.meta.size = data.len(); | ||||
|             packet.meta.set_addr(dest); | ||||
|             packet.data[..data.len()].copy_from_slice(&data); | ||||
|             packet | ||||
|         })) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
|     use crate::result::Error; | ||||
|     use solana_ledger::get_tmp_ledger_path; | ||||
|     use solana_ledger::{ | ||||
|         blockstore::make_many_slot_entries, | ||||
|         blockstore_processor::fill_blockstore_slot_with_ticks, | ||||
|         shred::{ | ||||
|             max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, | ||||
|         }, | ||||
|     }; | ||||
|     use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp}; | ||||
|  | ||||
|     /// test run_highest_window_request responds with the right shred, and does not overrun | ||||
|     #[test] | ||||
|     fn run_highest_window_request() { | ||||
|         let recycler = PacketsRecycler::default(); | ||||
|         solana_logger::setup(); | ||||
|         let ledger_path = get_tmp_ledger_path!(); | ||||
|         { | ||||
|             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); | ||||
|             let rv = ServeRepair::run_highest_window_request( | ||||
|                 &recycler, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 0, | ||||
|                 0, | ||||
|             ); | ||||
|             assert!(rv.is_none()); | ||||
|  | ||||
|             let _ = fill_blockstore_slot_with_ticks( | ||||
|                 &blockstore, | ||||
|                 max_ticks_per_n_shreds(1) + 1, | ||||
|                 2, | ||||
|                 1, | ||||
|                 Hash::default(), | ||||
|             ); | ||||
|  | ||||
|             let rv = ServeRepair::run_highest_window_request( | ||||
|                 &recycler, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 2, | ||||
|                 1, | ||||
|             ); | ||||
|             let rv: Vec<Shred> = rv | ||||
|                 .expect("packets") | ||||
|                 .packets | ||||
|                 .into_iter() | ||||
|                 .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) | ||||
|                 .collect(); | ||||
|             assert!(!rv.is_empty()); | ||||
|             let index = blockstore.meta(2).unwrap().unwrap().received - 1; | ||||
|             assert_eq!(rv[0].index(), index as u32); | ||||
|             assert_eq!(rv[0].slot(), 2); | ||||
|  | ||||
|             let rv = ServeRepair::run_highest_window_request( | ||||
|                 &recycler, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 2, | ||||
|                 index + 1, | ||||
|             ); | ||||
|             assert!(rv.is_none()); | ||||
|         } | ||||
|  | ||||
|         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); | ||||
|     } | ||||
|  | ||||
|     /// test window requests respond with the right shred, and do not overrun | ||||
|     #[test] | ||||
|     fn run_window_request() { | ||||
|         let recycler = PacketsRecycler::default(); | ||||
|         solana_logger::setup(); | ||||
|         let ledger_path = get_tmp_ledger_path!(); | ||||
|         { | ||||
|             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); | ||||
|             let me = ContactInfo::new( | ||||
|                 &Pubkey::new_rand(), | ||||
|                 socketaddr!("127.0.0.1:1234"), | ||||
|                 socketaddr!("127.0.0.1:1235"), | ||||
|                 socketaddr!("127.0.0.1:1236"), | ||||
|                 socketaddr!("127.0.0.1:1237"), | ||||
|                 socketaddr!("127.0.0.1:1238"), | ||||
|                 socketaddr!("127.0.0.1:1239"), | ||||
|                 socketaddr!("127.0.0.1:1240"), | ||||
|                 socketaddr!("127.0.0.1:1241"), | ||||
|                 socketaddr!("127.0.0.1:1242"), | ||||
|                 socketaddr!("127.0.0.1:1243"), | ||||
|                 0, | ||||
|             ); | ||||
|             let rv = ServeRepair::run_window_request( | ||||
|                 &recycler, | ||||
|                 &me, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 &me, | ||||
|                 0, | ||||
|                 0, | ||||
|             ); | ||||
|             assert!(rv.is_none()); | ||||
|             let mut common_header = ShredCommonHeader::default(); | ||||
|             common_header.slot = 2; | ||||
|             common_header.index = 1; | ||||
|             let mut data_header = DataShredHeader::default(); | ||||
|             data_header.parent_offset = 1; | ||||
|             let shred_info = Shred::new_empty_from_header( | ||||
|                 common_header, | ||||
|                 data_header, | ||||
|                 CodingShredHeader::default(), | ||||
|             ); | ||||
|  | ||||
|             blockstore | ||||
|                 .insert_shreds(vec![shred_info], None, false) | ||||
|                 .expect("Expect successful ledger write"); | ||||
|  | ||||
|             let rv = ServeRepair::run_window_request( | ||||
|                 &recycler, | ||||
|                 &me, | ||||
|                 &socketaddr_any!(), | ||||
|                 Some(&blockstore), | ||||
|                 &me, | ||||
|                 2, | ||||
|                 1, | ||||
|             ); | ||||
|             assert!(rv.is_some()); | ||||
|             let rv: Vec<Shred> = rv | ||||
|                 .expect("packets") | ||||
|                 .packets | ||||
|                 .into_iter() | ||||
|                 .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) | ||||
|                 .collect(); | ||||
|             assert_eq!(rv[0].index(), 1); | ||||
|             assert_eq!(rv[0].slot(), 2); | ||||
|         } | ||||
|  | ||||
|         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn window_index_request() { | ||||
|         let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); | ||||
|         let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me))); | ||||
|         let serve_repair = ServeRepair::new(cluster_info.clone()); | ||||
|         let rv = serve_repair.repair_request(&RepairType::Shred(0, 0)); | ||||
|         assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); | ||||
|  | ||||
|         let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243); | ||||
|         let nxt = ContactInfo::new( | ||||
|             &Pubkey::new_rand(), | ||||
|             socketaddr!([127, 0, 0, 1], 1234), | ||||
|             socketaddr!([127, 0, 0, 1], 1235), | ||||
|             socketaddr!([127, 0, 0, 1], 1236), | ||||
|             socketaddr!([127, 0, 0, 1], 1237), | ||||
|             socketaddr!([127, 0, 0, 1], 1238), | ||||
|             socketaddr!([127, 0, 0, 1], 1239), | ||||
|             socketaddr!([127, 0, 0, 1], 1240), | ||||
|             socketaddr!([127, 0, 0, 1], 1241), | ||||
|             socketaddr!([127, 0, 0, 1], 1242), | ||||
|             serve_repair_addr, | ||||
|             0, | ||||
|         ); | ||||
|         cluster_info.write().unwrap().insert_info(nxt.clone()); | ||||
|         let rv = serve_repair | ||||
|             .repair_request(&RepairType::Shred(0, 0)) | ||||
|             .unwrap(); | ||||
|         assert_eq!(nxt.serve_repair, serve_repair_addr); | ||||
|         assert_eq!(rv.0, nxt.serve_repair); | ||||
|  | ||||
|         let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243); | ||||
|         let nxt = ContactInfo::new( | ||||
|             &Pubkey::new_rand(), | ||||
|             socketaddr!([127, 0, 0, 1], 1234), | ||||
|             socketaddr!([127, 0, 0, 1], 1235), | ||||
|             socketaddr!([127, 0, 0, 1], 1236), | ||||
|             socketaddr!([127, 0, 0, 1], 1237), | ||||
|             socketaddr!([127, 0, 0, 1], 1238), | ||||
|             socketaddr!([127, 0, 0, 1], 1239), | ||||
|             socketaddr!([127, 0, 0, 1], 1240), | ||||
|             socketaddr!([127, 0, 0, 1], 1241), | ||||
|             socketaddr!([127, 0, 0, 1], 1242), | ||||
|             serve_repair_addr2, | ||||
|             0, | ||||
|         ); | ||||
|         cluster_info.write().unwrap().insert_info(nxt); | ||||
|         let mut one = false; | ||||
|         let mut two = false; | ||||
|         while !one || !two { | ||||
|             // repair_request picks a peer at random, so eventually it should pick both | ||||
|             let rv = serve_repair | ||||
|                 .repair_request(&RepairType::Shred(0, 0)) | ||||
|                 .unwrap(); | ||||
|             if rv.0 == serve_repair_addr { | ||||
|                 one = true; | ||||
|             } | ||||
|             if rv.0 == serve_repair_addr2 { | ||||
|                 two = true; | ||||
|             } | ||||
|         } | ||||
|         assert!(one && two); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn run_orphan() { | ||||
|         solana_logger::setup(); | ||||
|         let recycler = PacketsRecycler::default(); | ||||
|         let ledger_path = get_tmp_ledger_path!(); | ||||
|         { | ||||
|             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); | ||||
|             let rv = | ||||
|                 ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0); | ||||
|             assert!(rv.is_none()); | ||||
|  | ||||
|             // Create slots 1, 2, 3 with 5 shreds apiece | ||||
|             let (shreds, _) = make_many_slot_entries(1, 3, 5); | ||||
|  | ||||
|             blockstore | ||||
|                 .insert_shreds(shreds, None, false) | ||||
|                 .expect("Expect successful ledger write"); | ||||
|  | ||||
|             // We don't have slot 4, so we don't know how to service this request | ||||
|             let rv = | ||||
|                 ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5); | ||||
|             assert!(rv.is_none()); | ||||
|  | ||||
|             // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively | ||||
|             // for this request | ||||
|             let rv: Vec<_> = | ||||
|                 ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5) | ||||
|                     .expect("run_orphan packets") | ||||
|                     .packets | ||||
|                     .iter() | ||||
|                     .cloned() | ||||
|                     .collect(); | ||||
|             let expected: Vec<_> = (1..=3) | ||||
|                 .rev() | ||||
|                 .map(|slot| { | ||||
|                     let index = blockstore.meta(slot).unwrap().unwrap().received - 1; | ||||
|                     ServeRepair::get_data_shred_as_packet( | ||||
|                         &blockstore, | ||||
|                         slot, | ||||
|                         index, | ||||
|                         &socketaddr_any!(), | ||||
|                     ) | ||||
|                     .unwrap() | ||||
|                     .unwrap() | ||||
|                 }) | ||||
|                 .collect(); | ||||
|             assert_eq!(rv, expected) | ||||
|         } | ||||
|  | ||||
|         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); | ||||
|     } | ||||
| } | ||||
| @@ -1,57 +0,0 @@ | ||||
| use crate::serve_repair::ServeRepair; | ||||
| use crate::streamer; | ||||
| use solana_ledger::blockstore::Blockstore; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use std::net::UdpSocket; | ||||
| use std::sync::atomic::AtomicBool; | ||||
| use std::sync::mpsc::channel; | ||||
| use std::sync::{Arc, RwLock}; | ||||
| use std::thread::{self, JoinHandle}; | ||||
|  | ||||
| pub struct ServeRepairService { | ||||
|     thread_hdls: Vec<JoinHandle<()>>, | ||||
| } | ||||
|  | ||||
| impl ServeRepairService { | ||||
|     pub fn new( | ||||
|         serve_repair: &Arc<RwLock<ServeRepair>>, | ||||
|         blockstore: Option<Arc<Blockstore>>, | ||||
|         serve_repair_socket: UdpSocket, | ||||
|         exit: &Arc<AtomicBool>, | ||||
|     ) -> Self { | ||||
|         let (request_sender, request_receiver) = channel(); | ||||
|         let serve_repair_socket = Arc::new(serve_repair_socket); | ||||
|         trace!( | ||||
|             "ServeRepairService: id: {}, listening on: {:?}", | ||||
|             &serve_repair.read().unwrap().my_info().id, | ||||
|             serve_repair_socket.local_addr().unwrap() | ||||
|         ); | ||||
|         let t_receiver = streamer::receiver( | ||||
|             serve_repair_socket.clone(), | ||||
|             &exit, | ||||
|             request_sender, | ||||
|             Recycler::default(), | ||||
|             "serve_repair_receiver", | ||||
|         ); | ||||
|         let (response_sender, response_receiver) = channel(); | ||||
|         let t_responder = | ||||
|             streamer::responder("serve-repairs", serve_repair_socket, response_receiver); | ||||
|         let t_listen = ServeRepair::listen( | ||||
|             serve_repair.clone(), | ||||
|             blockstore, | ||||
|             request_receiver, | ||||
|             response_sender, | ||||
|             exit, | ||||
|         ); | ||||
|  | ||||
|         let thread_hdls = vec![t_receiver, t_responder, t_listen]; | ||||
|         Self { thread_hdls } | ||||
|     } | ||||
|  | ||||
|     pub fn join(self) -> thread::Result<()> { | ||||
|         for thread_hdl in self.thread_hdls { | ||||
|             thread_hdl.join()?; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
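|  | ||||
| // A hedged construction sketch (assumes `cluster_info`, `blockstore`, `socket`, | ||||
| // and `exit` are already in scope): | ||||
| // | ||||
| //     let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone()))); | ||||
| //     let service = ServeRepairService::new(&serve_repair, Some(blockstore.clone()), socket, &exit); | ||||
| //     // ... signal shutdown via `exit`, then: | ||||
| //     service.join().unwrap(); | ||||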
| @@ -12,8 +12,6 @@ use crate::{ | ||||
|     rpc_pubsub_service::PubSubService, | ||||
|     rpc_service::JsonRpcService, | ||||
|     rpc_subscriptions::RpcSubscriptions, | ||||
|     serve_repair::ServeRepair, | ||||
|     serve_repair_service::ServeRepairService, | ||||
|     sigverify, | ||||
|     storage_stage::StorageState, | ||||
|     tpu::Tpu, | ||||
| @@ -123,7 +121,6 @@ pub struct Validator { | ||||
|     rpc_service: Option<(JsonRpcService, PubSubService)>, | ||||
|     transaction_status_service: Option<TransactionStatusService>, | ||||
|     gossip_service: GossipService, | ||||
|     serve_repair_service: ServeRepairService, | ||||
|     poh_recorder: Arc<Mutex<PohRecorder>>, | ||||
|     poh_service: PohService, | ||||
|     tpu: Tpu, | ||||
| @@ -305,19 +302,12 @@ impl Validator { | ||||
|  | ||||
|         let gossip_service = GossipService::new( | ||||
|             &cluster_info, | ||||
|             Some(blockstore.clone()), | ||||
|             Some(bank_forks.clone()), | ||||
|             node.sockets.gossip, | ||||
|             &exit, | ||||
|         ); | ||||
|  | ||||
|         let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone()))); | ||||
|         let serve_repair_service = ServeRepairService::new( | ||||
|             &serve_repair, | ||||
|             Some(blockstore.clone()), | ||||
|             node.sockets.serve_repair, | ||||
|             &exit, | ||||
|         ); | ||||
|  | ||||
|         // Insert the entrypoint info, should only be None if this node | ||||
|         // is the bootstrap validator | ||||
|         if let Some(entrypoint_info) = entrypoint_info_option { | ||||
| @@ -413,7 +403,6 @@ impl Validator { | ||||
|         Self { | ||||
|             id, | ||||
|             gossip_service, | ||||
|             serve_repair_service, | ||||
|             rpc_service, | ||||
|             transaction_status_service, | ||||
|             tpu, | ||||
| @@ -474,7 +463,6 @@ impl Validator { | ||||
|         } | ||||
|  | ||||
|         self.gossip_service.join()?; | ||||
|         self.serve_repair_service.join()?; | ||||
|         self.tpu.join()?; | ||||
|         self.tvu.join()?; | ||||
|         self.ip_echo_server.shutdown_now(); | ||||
|   | ||||
| @@ -21,7 +21,8 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService | ||||
|         test_node.info.clone(), | ||||
|         keypair, | ||||
|     ))); | ||||
|     let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit); | ||||
|     let gossip_service = | ||||
|         GossipService::new(&cluster_info, None, None, test_node.sockets.gossip, exit); | ||||
|     let _ = cluster_info.read().unwrap().my_data(); | ||||
|     ( | ||||
|         cluster_info, | ||||
|   | ||||
| @@ -6,7 +6,6 @@ use solana_core::{ | ||||
|     cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE}, | ||||
|     contact_info::ContactInfo, | ||||
|     gossip_service::discover_cluster, | ||||
|     serve_repair::ServeRepair, | ||||
|     storage_stage::SLOTS_PER_TURN_TEST, | ||||
|     validator::ValidatorConfig, | ||||
| }; | ||||
| @@ -62,11 +61,10 @@ fn run_archiver_startup_basic(num_nodes: usize, num_archivers: usize) { | ||||
|     let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( | ||||
|         cluster_nodes[0].clone(), | ||||
|     ))); | ||||
|     let serve_repair = ServeRepair::new(cluster_info); | ||||
|     let path = get_tmp_ledger_path!(); | ||||
|     let blockstore = Arc::new(Blockstore::open(&path).unwrap()); | ||||
|     Archiver::download_from_archiver( | ||||
|         &serve_repair, | ||||
|         &cluster_info, | ||||
|         &archiver_info, | ||||
|         &blockstore, | ||||
|         slots_per_segment, | ||||
|   | ||||
| @@ -6903,7 +6903,7 @@ | ||||
|       "renderer": "flot", | ||||
|       "seriesOverrides": [ | ||||
|         { | ||||
|           "alias": "serve_repair-repair_highest.ix", | ||||
|           "alias": "cluster_info-repair_highest.ix", | ||||
|           "yaxis": 2 | ||||
|         } | ||||
|       ], | ||||
| @@ -6928,7 +6928,7 @@ | ||||
|           ], | ||||
|           "orderByTime": "ASC", | ||||
|           "policy": "default", | ||||
|           "query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "rawQuery": true, | ||||
|           "refId": "C", | ||||
|           "resultFormat": "time_series", | ||||
| @@ -6965,7 +6965,7 @@ | ||||
|           ], | ||||
|           "orderByTime": "ASC", | ||||
|           "policy": "default", | ||||
|           "query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "rawQuery": true, | ||||
|           "refId": "A", | ||||
|           "resultFormat": "time_series", | ||||
| @@ -7064,7 +7064,7 @@ | ||||
|       "renderer": "flot", | ||||
|       "seriesOverrides": [ | ||||
|         { | ||||
|           "alias": "serve_repair-repair.repair-ix", | ||||
|           "alias": "cluster_info-repair.repair-ix", | ||||
|           "yaxis": 2 | ||||
|         } | ||||
|       ], | ||||
| @@ -7089,7 +7089,7 @@ | ||||
|           ], | ||||
|           "orderByTime": "ASC", | ||||
|           "policy": "default", | ||||
|           "query": "SELECT last(\"repair-ix\") AS \"repair-ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "query": "SELECT last(\"repair-ix\") AS \"repair-ix\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "rawQuery": true, | ||||
|           "refId": "C", | ||||
|           "resultFormat": "time_series", | ||||
| @@ -7126,7 +7126,7 @@ | ||||
|           ], | ||||
|           "orderByTime": "ASC", | ||||
|           "policy": "default", | ||||
|           "query": "SELECT last(\"repair-slot\") AS \"repair-slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "query": "SELECT last(\"repair-slot\") AS \"repair-slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "rawQuery": true, | ||||
|           "refId": "A", | ||||
|           "resultFormat": "time_series", | ||||
| @@ -7245,7 +7245,7 @@ | ||||
|           ], | ||||
|           "orderByTime": "ASC", | ||||
|           "policy": "default", | ||||
|           "query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_orphan\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_orphan\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", | ||||
|           "rawQuery": true, | ||||
|           "refId": "C", | ||||
|           "resultFormat": "time_series", | ||||
|   | ||||
| @@ -213,6 +213,7 @@ fn get_rpc_addr( | ||||
|     let gossip_service = GossipService::new( | ||||
|         &cluster_info.clone(), | ||||
|         None, | ||||
|         None, | ||||
|         node.sockets.gossip.try_clone().unwrap(), | ||||
|         &gossip_exit_flag, | ||||
|     ); | ||||
|   | ||||