diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs
index 4db0803e6d..18e2335d2a 100644
--- a/core/src/gossip_service.rs
+++ b/core/src/gossip_service.rs
@@ -65,8 +65,8 @@ pub fn discover(gossip_addr: &SocketAddr, num_nodes: usize) -> std::io::Result<Vec<ContactInfo>> {
-        if spy_ref.read().unwrap().tvu_peers().len() >= num_nodes {
+        let tvu_peers = spy_ref.read().unwrap().tvu_peers();
+        if tvu_peers.len() >= num_nodes {
             info!(
                 "discover success in {}s...\n{}",
                 now.elapsed().as_secs(),
@@ -75,7 +75,7 @@ pub fn discover(gossip_addr: &SocketAddr, num_nodes: usize) -> std::io::Result<Vec<ContactInfo>> {
diff --git a/core/src/replicator.rs b/core/src/replicator.rs
--- a/core/src/replicator.rs
+++ b/core/src/replicator.rs
 pub struct Replicator {
-    t_retransmit: JoinHandle<()>,
+    thread_handles: Vec<JoinHandle<()>>,
     exit: Arc<AtomicBool>,
     slot: u64,
     ledger_path: String,
     keypair: Arc<Keypair>,
     signature: ring::signature::Signature,
     cluster_entrypoint: ContactInfo,
-    node_info: ContactInfo,
-    cluster_info: Arc<RwLock<ClusterInfo>>,
     ledger_data_file_encrypted: PathBuf,
     sampling_offsets: Vec<u64>,
     hash: Hash,
-    blocktree: Arc<Blocktree>,
     #[cfg(feature = "chacha")]
     num_chacha_blocks: usize,
+    #[cfg(feature = "chacha")]
+    blocktree: Arc<Blocktree>,
 }
 
 pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
@@ -108,6 +117,54 @@ fn get_entry_heights_from_blockhash(
     segment_index * ENTRIES_PER_SEGMENT
 }
 
+fn create_request_processor(
+    socket: UdpSocket,
+    exit: &Arc<AtomicBool>,
+    slot: u64,
+) -> Vec<JoinHandle<()>> {
+    let mut thread_handles = vec![];
+    let (s_reader, r_reader) = channel();
+    let (s_responder, r_responder) = channel();
+    let storage_socket = Arc::new(socket);
+    let t_receiver = receiver(
+        storage_socket.clone(),
+        exit,
+        s_reader,
+        "replicator-receiver",
+    );
+    thread_handles.push(t_receiver);
+
+    let t_responder = responder("replicator-responder", storage_socket.clone(), r_responder);
+    thread_handles.push(t_responder);
+
+    let exit4 = exit.clone();
+    let t_processor = spawn(move || loop {
+        let packets = r_reader.recv_timeout(Duration::from_secs(1));
+        if let Ok(packets) = packets {
+            for packet in &packets.read().unwrap().packets {
+                let req: result::Result<ReplicatorRequest, Box<bincode::ErrorKind>> =
+                    deserialize(&packet.data[..packet.meta.size]);
+                match req {
+                    Ok(ReplicatorRequest::GetSlotHeight(from)) => {
+                        if let Ok(blob) = to_shared_blob(slot, from) {
+                            let _ = s_responder.send(vec![blob]);
+                        }
+                    }
+                    Err(e) => {
+                        info!("invalid request: {:?}", e);
+                    }
+                }
+            }
+        }
+        if exit4.load(Ordering::Relaxed) {
+            break;
+        }
+    });
+    thread_handles.push(t_processor);
+
+    thread_handles
+}
+
 impl Replicator {
     /// Returns a Result that contains a replicator on success
     ///
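The request processor above matches on ReplicatorRequest::GetSlotHeight, and the integration test below imports solana::replicator::ReplicatorRequest, but the enum definition itself is not part of the hunks shown here. A minimal sketch of the shape this diff implies — a serde-serializable request whose variant carries the requester's reply address — follows; it is illustrative only, not the exact definition added by this change:

use std::net::SocketAddr;

use serde::{Deserialize, Serialize};

// Hypothetical definition sketched from how the diff uses the type.
// GetSlotHeight carries the UDP address the reply blob should be sent to.
#[derive(Serialize, Deserialize, Debug)]
pub enum ReplicatorRequest {
    GetSlotHeight(SocketAddr),
}

A client bincode-serializes such a request and sends it to the replicator's storage port; the processor thread deserializes it and answers with a blob addressed to `from`, as create_request_processor does above.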
@@ -194,6 +251,9 @@ impl Replicator {
             repair_slot_range,
         );
 
+        let mut thread_handles =
+            create_request_processor(node.sockets.storage.unwrap(), &exit, slot);
+
         // receive blobs from retransmit and drop them.
         let exit2 = exit.clone();
         let t_retransmit = spawn(move || loop {
@@ -202,31 +262,40 @@ impl Replicator {
                 break;
             }
         });
+        thread_handles.push(t_retransmit);
+
+        let exit3 = exit.clone();
+        let blocktree1 = blocktree.clone();
+        let t_replicate = spawn(move || loop {
+            Self::wait_for_ledger_download(slot, &blocktree1, &exit3, &node_info, &cluster_info);
+            if exit3.load(Ordering::Relaxed) {
+                break;
+            }
+        });
+        thread_handles.push(t_replicate);
 
         Ok(Self {
             gossip_service,
             fetch_stage,
             window_service,
-            t_retransmit,
+            thread_handles,
             exit,
             slot,
             ledger_path: ledger_path.to_string(),
             keypair: keypair.clone(),
             signature,
             cluster_entrypoint,
-            node_info,
-            cluster_info,
             ledger_data_file_encrypted: PathBuf::default(),
             sampling_offsets: vec![],
             hash: Hash::default(),
-            blocktree,
             #[cfg(feature = "chacha")]
             num_chacha_blocks: 0,
+            #[cfg(feature = "chacha")]
+            blocktree,
         })
     }
 
     pub fn run(&mut self) {
-        self.wait_for_ledger_download();
         self.encrypt_ledger()
             .expect("ledger encrypt not successful");
         loop {
@@ -239,30 +308,61 @@ impl Replicator {
         }
     }
 
-    fn wait_for_ledger_download(&self) {
+    fn wait_for_ledger_download(
+        start_slot: u64,
+        blocktree: &Arc<Blocktree>,
+        exit: &Arc<AtomicBool>,
+        node_info: &ContactInfo,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+    ) {
         info!("window created, waiting for ledger download done");
         let _start = Instant::now();
         let mut _received_so_far = 0;
+        let mut current_slot = start_slot;
+        let mut done = false;
         loop {
-            if let Ok(entries) = self.blocktree.get_slot_entries(self.slot, 0, None) {
-                if !entries.is_empty() {
+            loop {
+                if let Ok(meta) = blocktree.meta(current_slot) {
+                    if let Some(meta) = meta {
+                        if meta.is_rooted {
+                            current_slot += 1;
+                            info!("current slot: {}", current_slot);
+                        } else {
+                            break;
+                        }
+                    } else {
+                        break;
+                    }
+                } else {
                     break;
                 }
+                if current_slot >= start_slot + ENTRIES_PER_SEGMENT {
+                    info!("current slot: {} start: {}", current_slot, start_slot);
+                    done = true;
+                    break;
+                }
+            }
+
+            if done {
+                break;
+            }
+
+            if exit.load(Ordering::Relaxed) {
+                break;
             }
             sleep(Duration::from_secs(1));
         }
         info!("Done receiving entries from window_service");
 
-        let mut contact_info = self.node_info.clone();
+        // Remove replicator from the data plane
+        let mut contact_info = node_info.clone();
         contact_info.tvu = "0.0.0.0:0".parse().unwrap();
         {
-            let mut cluster_info_w = self.cluster_info.write().unwrap();
+            let mut cluster_info_w = cluster_info.write().unwrap();
             cluster_info_w.insert_self(contact_info);
         }
-
-        info!("Done downloading ledger at {}", self.ledger_path);
     }
 
     fn encrypt_ledger(&mut self) -> Result<()> {
@@ -345,7 +445,9 @@ impl Replicator {
         self.gossip_service.join().unwrap();
         self.fetch_stage.join().unwrap();
         self.window_service.join().unwrap();
-        self.t_retransmit.join().unwrap();
+        for handle in self.thread_handles {
+            handle.join().unwrap();
+        }
     }
 
     pub fn entry_height(&self) -> u64 {
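With this change the replicator no longer holds a single t_retransmit handle: every worker it spawns (blob receiver, responder, request processor, the retransmit sink, and the ledger-download watcher) is pushed into one Vec<JoinHandle<()>>, and join waits on every collected handle. A minimal standalone sketch of that shutdown pattern, using only std and an illustrative worker body:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let mut thread_handles: Vec<JoinHandle<()>> = vec![];

    // Spawn several workers that all poll the same exit flag, as the
    // replicator's receiver/responder/processor threads do.
    for i in 0..3 {
        let exit = exit.clone();
        thread_handles.push(spawn(move || loop {
            // ... per-thread work would go here ...
            if exit.load(Ordering::Relaxed) {
                println!("worker {} exiting", i);
                break;
            }
            sleep(Duration::from_millis(100));
        }));
    }

    // Signal shutdown, then join every collected handle, mirroring Replicator::join.
    exit.store(true, Ordering::Relaxed);
    for handle in thread_handles {
        handle.join().unwrap();
    }
}

Collecting the handles keeps shutdown uniform as more background threads are added, which matters here because create_request_processor alone contributes three handles.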
diff --git a/core/tests/replicator.rs b/core/tests/replicator.rs
index b1b0f3e2d9..72f3d37795 100644
--- a/core/tests/replicator.rs
+++ b/core/tests/replicator.rs
@@ -4,49 +4,134 @@ extern crate log;
 #[macro_use]
 extern crate solana;
 
+use bincode::{deserialize, serialize};
 use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree};
-use solana::cluster_info::Node;
+use solana::cluster_info::{ClusterInfo, Node};
 use solana::contact_info::ContactInfo;
+use solana::entry::Entry;
 use solana::fullnode::{Fullnode, FullnodeConfig};
 use solana::gossip_service::discover;
 use solana::local_cluster::LocalCluster;
 use solana::replicator::Replicator;
+use solana::replicator::ReplicatorRequest;
 use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
+use solana::streamer::blob_receiver;
 use solana_sdk::genesis_block::GenesisBlock;
+use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
 use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
 use std::fs::remove_dir_all;
+use std::net::SocketAddr;
+use std::net::UdpSocket;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::channel;
 use std::sync::Arc;
+use std::thread::sleep;
 use std::time::Duration;
 
+fn get_slot_height(to: SocketAddr) -> u64 {
+    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
+    socket
+        .set_read_timeout(Some(Duration::from_secs(5)))
+        .unwrap();
+
+    let req = ReplicatorRequest::GetSlotHeight(socket.local_addr().unwrap());
+    let serialized_req = serialize(&req).unwrap();
+    for _ in 0..10 {
+        socket.send_to(&serialized_req, to).unwrap();
+        let mut buf = [0; 1024];
+        if let Ok((size, _addr)) = socket.recv_from(&mut buf) {
+            return deserialize(&buf[..size]).unwrap();
+        }
+        sleep(Duration::from_millis(500));
+    }
+    panic!("Couldn't get slot height!");
+}
+
+fn download_from_replicator(replicator_info: &ContactInfo) {
+    // Create a client which downloads from the replicator and see that it
+    // can respond with blobs.
+    let tn = Node::new_localhost();
+    let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone());
+    let mut repair_index = get_slot_height(replicator_info.storage_addr);
+    info!("repair index: {}", repair_index);
+
+    repair_index = 0;
+    let req = cluster_info
+        .window_index_request_bytes(0, repair_index)
+        .unwrap();
+
+    let exit = Arc::new(AtomicBool::new(false));
+    let (s_reader, r_reader) = channel();
+    let repair_socket = Arc::new(tn.sockets.repair);
+    let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader);
+
+    info!(
+        "Sending repair requests from: {} to: {}",
+        tn.info.id, replicator_info.gossip
+    );
+
+    let mut received_blob = false;
+    for _ in 0..5 {
+        repair_socket.send_to(&req, replicator_info.gossip).unwrap();
+
+        let x = r_reader.recv_timeout(Duration::new(1, 0));
+
+        if let Ok(blobs) = x {
+            for b in blobs {
+                let br = b.read().unwrap();
+                assert!(br.index() == repair_index);
+                let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap();
+                info!("entry: {:?}", entry);
+                assert_ne!(entry.hash, Hash::default());
+                received_blob = true;
+            }
+            break;
+        }
+    }
+    exit.store(true, Ordering::Relaxed);
+    t_receiver.join().unwrap();
+
+    assert!(received_blob);
+}
+
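get_slot_height above deserializes the reply bytes straight into a u64, while the replicator side builds its reply with to_shared_blob(slot, from). The working assumption is therefore that the reply payload is simply the bincode encoding of the slot height; a tiny round trip illustrating that assumed wire format (not the actual to_shared_blob implementation):

use bincode::{deserialize, serialize};

fn main() {
    let slot: u64 = 42;

    // What the replicator is assumed to place in the reply blob's data section.
    let payload = serialize(&slot).unwrap();

    // What get_slot_height does with the bytes it reads off the UDP socket.
    let decoded: u64 = deserialize(&payload).unwrap();
    assert_eq!(decoded, slot);
}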
 #[test]
 fn test_replicator_startup_basic() {
     solana_logger::setup();
     info!("starting replicator test");
 
     const NUM_NODES: usize = 2;
+    let num_replicators = 1;
 
     let mut fullnode_config = FullnodeConfig::default();
     fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
-    let _cluster = LocalCluster::new_with_config_replicators(
+    let cluster = LocalCluster::new_with_config_replicators(
         &[100; NUM_NODES],
         10_000,
         &fullnode_config,
-        1,
+        num_replicators,
         DEFAULT_TICKS_PER_SLOT,
         DEFAULT_SLOTS_PER_EPOCH,
     );
-    let cluster_nodes = discover(&cluster.entry_point_info.gossip, 3).unwrap();
-    assert_eq!(cluster_nodes.len(), 3);
+    let cluster_nodes = discover(
+        &cluster.entry_point_info.gossip,
+        NUM_NODES + num_replicators,
+    )
+    .unwrap();
+    assert_eq!(cluster_nodes.len(), NUM_NODES + num_replicators);
     let mut replicator_count = 0;
+    let mut replicator_info = ContactInfo::default();
     for node in &cluster_nodes {
         info!("storage: {:?} rpc: {:?}", node.storage_addr, node.rpc);
         if ContactInfo::is_valid_address(&node.storage_addr) {
             replicator_count += 1;
+            replicator_info = node.clone();
         }
     }
-    assert_eq!(replicator_count, 1);
+    assert_eq!(replicator_count, num_replicators);
+
+    download_from_replicator(&replicator_info);
 }
 
 #[test]