diff --git a/src/blocktree.rs b/src/blocktree.rs index bb4fd628af..f94b99ebab 100644 --- a/src/blocktree.rs +++ b/src/blocktree.rs @@ -462,7 +462,7 @@ impl Blocktree { } } - pub fn write_shared_blobs(&self, shared_blobs: I) -> Result> + pub fn write_shared_blobs(&self, shared_blobs: I) -> Result<()> where I: IntoIterator, I::Item: Borrow, @@ -476,21 +476,18 @@ impl Blocktree { let blobs = r_blobs.iter().map(|s| &**s); - let new_entries = self.insert_data_blobs(blobs)?; - Ok(new_entries) + self.insert_data_blobs(blobs) } - pub fn write_blobs(&self, blobs: I) -> Result> + pub fn write_blobs(&self, blobs: I) -> Result<()> where I: IntoIterator, I::Item: Borrow, { - //let blobs = blobs.into_iter().map(|b| *b.borrow()); - let entries = self.insert_data_blobs(blobs)?; - Ok(entries) + self.insert_data_blobs(blobs) } - pub fn write_entries(&self, slot: u64, index: u64, entries: I) -> Result> + pub fn write_entries(&self, slot: u64, index: u64, entries: I) -> Result<()> where I: IntoIterator, I::Item: Borrow, @@ -509,7 +506,7 @@ impl Blocktree { self.write_blobs(&blobs) } - pub fn insert_data_blobs(&self, new_blobs: I) -> Result> + pub fn insert_data_blobs(&self, new_blobs: I) -> Result<()> where I: IntoIterator, I::Item: Borrow, @@ -521,7 +518,6 @@ impl Blocktree { let new_blobs: Vec<_> = new_blobs.into_iter().collect(); let mut prev_inserted_blob_datas = HashMap::new(); - let mut consecutive_entries = vec![]; for blob in new_blobs.iter() { let blob = blob.borrow(); let blob_slot = blob.slot(); @@ -562,15 +558,12 @@ impl Blocktree { continue; } - let entries = self.insert_data_blob( + let _ = self.insert_data_blob( blob, &mut prev_inserted_blob_datas, slot_meta, &mut write_batch, ); - if let Ok(entries) = entries { - consecutive_entries.extend(entries); - } } // Handle chaining for the working set @@ -599,11 +592,7 @@ impl Blocktree { } } - // TODO: Delete returning these entries and instead have replay_stage query blocktree - // for updates. Returning these entries is to temporarily support current API as to - // not break functionality in db_window. - // Issue: https://github.com/solana-labs/solana/issues/2444 - Ok(consecutive_entries) + Ok(()) } // Fill 'buf' with num_blobs or most number of consecutive @@ -859,7 +848,7 @@ impl Blocktree { // Return error if there was a database error during lookup of any of the // slot indexes let slots: Result>> = slot_heights - .into_iter() + .iter() .map(|slot_height| self.meta_cf.get_slot_meta(*slot_height)) .collect(); @@ -1100,7 +1089,7 @@ impl Blocktree { prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, slot_meta: &mut SlotMeta, write_batch: &mut WriteBatch, - ) -> Result> { + ) -> Result<()> { let blob_index = blob_to_insert.index(); let blob_slot = blob_to_insert.slot(); let blob_size = blob_to_insert.size(); @@ -1111,7 +1100,7 @@ impl Blocktree { return Err(Error::BlocktreeError(BlocktreeError::BlobForIndexExists)); } - let (new_consumed, new_consumed_ticks, blob_datas) = { + let (new_consumed, new_consumed_ticks) = { if slot_meta.consumed == blob_index { let blob_datas = self.get_slot_consecutive_blobs( blob_slot, @@ -1125,7 +1114,6 @@ impl Blocktree { let blob_to_insert = Cow::Borrowed(&blob_to_insert.data[..]); let mut new_consumed_ticks = 0; - let mut entries = vec![]; // Check all the consecutive blobs for ticks for blob_data in once(&blob_to_insert).chain(blob_datas.iter()) { let serialized_entry_data = &blob_data[BLOB_HEADER_SIZE..]; @@ -1135,7 +1123,6 @@ impl Blocktree { if entry.is_tick() { new_consumed_ticks += 1; } - entries.push(entry); } ( @@ -1143,10 +1130,9 @@ impl Blocktree { // get_slot_consecutive_blobs() earlier slot_meta.consumed + blob_datas.len() as u64 + 1, new_consumed_ticks, - entries, ) } else { - (slot_meta.consumed, 0, vec![]) + (slot_meta.consumed, 0) } }; @@ -1162,11 +1148,7 @@ impl Blocktree { slot_meta.received = cmp::max(blob_index + 1, slot_meta.received); slot_meta.consumed = new_consumed; slot_meta.consumed_ticks += new_consumed_ticks; - // TODO: Remove returning these entries and instead have replay_stage query blocktree - // for updates. Returning these entries is to temporarily support current API as to - // not break functionality in db_window. - // Issue: https://github.com/solana-labs/solana/issues/2444 - Ok(blob_datas) + Ok(()) } /// Returns the next consumed index and the number of ticks in the new consumed diff --git a/src/db_window.rs b/src/db_window.rs index 9c68352f91..c932e73ed0 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -1,8 +1,6 @@ //! Set of functions for emulating windowing functions from a database ledger implementation use crate::blocktree::*; -use crate::cluster_info::ClusterInfo; use crate::counter::Counter; -use crate::entry::Entry; #[cfg(feature = "erasure")] use crate::erasure; use crate::leader_scheduler::LeaderScheduler; @@ -13,148 +11,11 @@ use log::Level; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; use std::borrow::Borrow; -use std::cmp; -use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; pub const MAX_REPAIR_LENGTH: usize = 128; -pub fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result> { - // Slot height and blob indexes for blobs we want to repair - let mut repairs: Vec<(u64, u64)> = vec![]; - let mut slots = vec![0]; - while repairs.len() < max_repairs && !slots.is_empty() { - let slot_height = slots.pop().unwrap(); - let slot = blocktree.meta(slot_height)?; - if slot.is_none() { - continue; - } - let slot = slot.unwrap(); - slots.extend(slot.next_slots.clone()); - - if slot.contains_all_ticks(blocktree) { - continue; - } else { - let num_unreceived_ticks = { - if slot.consumed == slot.received { - slot.num_expected_ticks(blocktree) - slot.consumed_ticks - } else { - 0 - } - }; - - let upper = slot.received + num_unreceived_ticks; - - let reqs = blocktree.find_missing_data_indexes( - 0, - slot.consumed, - upper, - max_repairs - repairs.len(), - ); - - repairs.extend(reqs.into_iter().map(|i| (slot_height, i))) - } - } - - Ok(repairs) -} - -pub fn repair( - blocktree: &Blocktree, - slot_index: u64, - cluster_info: &Arc>, - id: &Pubkey, - times: usize, - tick_height: u64, - max_entry_height: u64, - leader_scheduler_option: &Arc>, -) -> Result)>> { - let rcluster_info = cluster_info.read().unwrap(); - let is_next_leader = false; - let meta = blocktree.meta(slot_index)?; - if meta.is_none() { - return Ok(vec![]); - } - let meta = meta.unwrap(); - - let consumed = meta.consumed; - let received = meta.received; - - // Repair should only be called when received > consumed, enforced in window_service - assert!(received > consumed); - - // Check if we are the next next slot leader - { - let leader_scheduler = leader_scheduler_option.read().unwrap(); - let next_slot = leader_scheduler.tick_height_to_slot(tick_height) + 1; - match leader_scheduler.get_leader_for_slot(next_slot) { - Some(leader_id) if leader_id == *id => true, - // In the case that we are not in the current scope of the leader schedule - // window then either: - // - // 1) The replay stage hasn't caught up to the "consumed" entries we sent, - // in which case it will eventually catch up - // - // 2) We are on the border between ticks_per_epochs, so the - // schedule won't be known until the entry on that cusp is received - // by the replay stage (which comes after this stage). Hence, the next - // leader at the beginning of that next epoch will not know they are the - // leader until they receive that last "cusp" entry. The leader also won't ask for repairs - // for that entry because "is_next_leader" won't be set here. In this case, - // everybody will be blocking waiting for that "cusp" entry instead of repairing, - // until the leader hits "times" >= the max times in calculate_max_repair_entry_height(). - // The impact of this, along with the similar problem from broadcast for the transitioning - // leader, can be observed in the multinode test, test_full_leader_validator_network(), - None => false, - _ => false, - } - }; - - let num_peers = rcluster_info.repair_peers().len() as u64; - - // Check if there's a max_entry_height limitation - let max_repair_entry_height = if max_entry_height == 0 { - calculate_max_repair_entry_height(num_peers, consumed, received, times, is_next_leader) - } else { - max_entry_height + 2 - }; - - let idxs = blocktree.find_missing_data_indexes( - DEFAULT_SLOT_HEIGHT, - consumed, - max_repair_entry_height - 1, - MAX_REPAIR_LENGTH, - ); - - let reqs: Vec<_> = idxs - .into_iter() - .filter_map(|pix| rcluster_info.window_index_request(slot_index, pix).ok()) - .collect(); - - drop(rcluster_info); - - inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); - - if log_enabled!(Level::Trace) { - trace!( - "{}: repair_window counter times: {} consumed: {} received: {} max_repair_entry_height: {} missing: {}", - id, - times, - consumed, - received, - max_repair_entry_height, - reqs.len() - ); - for (to, _) in &reqs { - trace!("{}: repair_window request to {}", id, to); - } - } - - Ok(reqs) -} - pub fn retransmit_all_leader_blocks( dq: &[SharedBlob], leader_scheduler: &Arc>, @@ -207,17 +68,11 @@ pub fn add_blob_to_retransmit_queue( } } -/// Process a blob: Add blob to the ledger window. If a continuous set of blobs -/// starting from consumed is thereby formed, add that continuous -/// range of blobs to a queue to be sent on to the next stage. +/// Process a blob: Add blob to the ledger window. pub fn process_blob( leader_scheduler: &Arc>, blocktree: &Arc, blob: &SharedBlob, - max_ix: u64, - consume_queue: &mut Vec, - tick_height: &mut u64, - done: &Arc, ) -> Result<()> { let is_coding = blob.read().unwrap().is_coding(); @@ -238,23 +93,18 @@ pub fn process_blob( return Ok(()); // Occurs as a leader is rotating into a validator } - // Insert the new blob into the window - let mut consumed_entries = if is_coding { + // Insert the new blob into block tree + if is_coding { let blob = &blob.read().unwrap(); blocktree.put_coding_blob_bytes(slot, pix, &blob.data[..BLOB_HEADER_SIZE + blob.size()])?; - vec![] } else { - blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])? - }; + blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?; + } #[cfg(feature = "erasure")] { - // If write_shared_blobs() of these recovered blobs fails fails, don't return - // because consumed_entries might be nonempty from earlier, and tick height needs to - // be updated. Hopefully we can recover these blobs next time successfully. - // TODO: Support per-slot erasure. Issue: https://github.com/solana-labs/solana/issues/2441 - if let Err(e) = try_erasure(blocktree, &mut consumed_entries, 0) { + if let Err(e) = try_erasure(blocktree, 0) { trace!( "erasure::recover failed to write recovered coding blobs. Err: {:?}", e @@ -262,60 +112,11 @@ pub fn process_blob( } } - for entry in &consumed_entries { - *tick_height += entry.is_tick() as u64; - } - - // For downloading storage blobs, - // we only want up to a certain index - // then stop - if max_ix != 0 && !consumed_entries.is_empty() { - let meta = blocktree - .meta(0)? - .expect("Expect metadata to exist if consumed entries is nonzero"); - - let consumed = meta.consumed; - - // Check if we ran over the last wanted entry - if consumed > max_ix { - let consumed_entries_len = consumed_entries.len(); - let extra_unwanted_entries_len = - cmp::min(consumed_entries_len, (consumed - (max_ix + 1)) as usize); - consumed_entries.truncate(consumed_entries_len - extra_unwanted_entries_len); - done.store(true, Ordering::Relaxed); - } - } - - consume_queue.extend(consumed_entries); Ok(()) } -pub fn calculate_max_repair_entry_height( - num_peers: u64, - consumed: u64, - received: u64, - times: usize, - is_next_leader: bool, -) -> u64 { - // Calculate the highest blob index that this node should have already received - // via avalanche. The avalanche splits data stream into nodes and each node retransmits - // the data to their peer nodes. So there's a possibility that a blob (with index lower - // than current received index) is being retransmitted by a peer node. - if times >= 8 || is_next_leader { - // if repair backoff is getting high, or if we are the next leader, - // don't wait for avalanche. received - 1 is the index of the highest blob. - received - } else { - cmp::max(consumed, received.saturating_sub(num_peers)) - } -} - #[cfg(feature = "erasure")] -fn try_erasure( - blocktree: &Arc, - consume_queue: &mut Vec, - slot_index: u64, -) -> Result<()> { +fn try_erasure(blocktree: &Arc, slot_index: u64) -> Result<()> { let meta = blocktree.meta(slot_index)?; if let Some(meta) = meta { @@ -329,11 +130,10 @@ fn try_erasure( )?; } - let entries = blocktree.write_shared_blobs(data)?; - consume_queue.extend(entries); + blocktree.write_shared_blobs(data) + } else { + Ok(()) } - - Ok(()) } #[cfg(test)] @@ -418,23 +218,6 @@ mod test { t_responder.join().expect("join"); } - #[test] - pub fn test_calculate_max_repair_entry_height() { - assert_eq!(calculate_max_repair_entry_height(20, 4, 11, 0, false), 4); - assert_eq!(calculate_max_repair_entry_height(0, 10, 90, 0, false), 90); - assert_eq!(calculate_max_repair_entry_height(15, 10, 90, 32, false), 90); - assert_eq!(calculate_max_repair_entry_height(15, 10, 90, 0, false), 75); - assert_eq!(calculate_max_repair_entry_height(90, 10, 90, 0, false), 10); - assert_eq!(calculate_max_repair_entry_height(90, 10, 50, 0, false), 10); - assert_eq!(calculate_max_repair_entry_height(90, 10, 99, 0, false), 10); - assert_eq!(calculate_max_repair_entry_height(90, 10, 101, 0, false), 11); - assert_eq!(calculate_max_repair_entry_height(90, 10, 101, 0, true), 101); - assert_eq!( - calculate_max_repair_entry_height(90, 10, 101, 30, true), - 101 - ); - } - #[test] pub fn test_retransmit() { let leader = Keypair::new().pubkey(); @@ -483,79 +266,6 @@ mod test { assert!(blob_receiver.try_recv().is_err()); } - #[test] - pub fn test_generate_repairs() { - let blocktree_path = get_tmp_ledger_path("test_generate_repairs"); - let num_ticks_per_slot = 10; - let blocktree_config = BlocktreeConfig::new(num_ticks_per_slot); - let blocktree = Blocktree::open_config(&blocktree_path, blocktree_config).unwrap(); - - let num_entries_per_slot = 10; - let num_slots = 2; - let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs(); - - // Insert every nth entry for each slot - let nth = 3; - for (i, b) in blobs.iter_mut().enumerate() { - b.set_index(((i % num_entries_per_slot) * nth) as u64); - b.set_slot((i / num_entries_per_slot) as u64); - } - - blocktree.write_blobs(&blobs).unwrap(); - - let missing_indexes_per_slot: Vec = (0..num_entries_per_slot - 1) - .flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64)) - .collect(); - - let expected: Vec<(u64, u64)> = (0..num_slots) - .flat_map(|slot_height| { - missing_indexes_per_slot - .iter() - .map(move |blob_index| (slot_height as u64, *blob_index)) - }) - .collect(); - - // Across all slots, find all missing indexes in the range [0, num_entries_per_slot * nth] - assert_eq!( - generate_repairs(&blocktree, std::usize::MAX).unwrap(), - expected - ); - - assert_eq!( - generate_repairs(&blocktree, expected.len() - 2).unwrap()[..], - expected[0..expected.len() - 2] - ); - - // Now fill in all the holes for each slot such that for each slot, consumed == received. - // Because none of the slots contain ticks, we should see that the repair requests - // ask for ticks, starting from the last received index for that slot - for (slot_height, blob_index) in expected { - let mut b = make_tiny_test_entries(1).to_blobs().pop().unwrap(); - b.set_index(blob_index); - b.set_slot(slot_height); - blocktree.write_blobs(&vec![b]).unwrap(); - } - - let last_index_per_slot = ((num_entries_per_slot - 1) * nth) as u64; - let missing_indexes_per_slot: Vec = - (last_index_per_slot + 1..last_index_per_slot + 1 + num_ticks_per_slot).collect(); - let expected: Vec<(u64, u64)> = (0..num_slots) - .flat_map(|slot_height| { - missing_indexes_per_slot - .iter() - .map(move |blob_index| (slot_height as u64, *blob_index)) - }) - .collect(); - assert_eq!( - generate_repairs(&blocktree, std::usize::MAX).unwrap(), - expected - ); - assert_eq!( - generate_repairs(&blocktree, expected.len() - 2).unwrap()[..], - expected[0..expected.len() - 2] - ); - } - #[test] pub fn test_find_missing_data_indexes_sanity() { let slot = DEFAULT_SLOT_HEIGHT; @@ -813,9 +523,7 @@ mod test { let ledger_path = get_tmp_ledger_path("test_try_erasure"); let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, false)); - let mut consume_queue = vec![]; - try_erasure(&blocktree, &mut consume_queue, DEFAULT_SLOT_HEIGHT) - .expect("Expected successful erasure attempt"); + try_erasure(&blocktree, DEFAULT_SLOT_HEIGHT).expect("Expected successful erasure attempt"); window[erased_index].data = erased_data; { @@ -829,7 +537,17 @@ mod test { let locked_data: Vec<&Blob> = locks.iter().map(|lock| &**lock).collect(); let (expected, _) = reconstruct_entries_from_blobs(locked_data).unwrap(); - assert_eq!(consume_queue, expected); + + assert_eq!( + blocktree + .get_slot_entries( + 0, + erased_index as u64, + Some((end_index - erased_index) as u64) + ) + .unwrap(), + expected + ); } let erased_coding_l = erased_coding.read().unwrap(); @@ -862,24 +580,15 @@ mod test { &vec![DEFAULT_SLOT_HEIGHT; num_entries], ); - let mut consume_queue = vec![]; - let mut tick_height = 2; - let done = Arc::new(AtomicBool::new(false)); - for blob in shared_blobs.iter().rev() { - process_blob( - &leader_scheduler, - &blocktree, - blob, - 0, - &mut consume_queue, - &mut tick_height, - &done, - ) - .expect("Expect successful processing of blob"); + process_blob(&leader_scheduler, &blocktree, blob) + .expect("Expect successful processing of blob"); } - assert_eq!(consume_queue, original_entries); + assert_eq!( + blocktree.get_slot_entries(0, 0, None).unwrap(), + original_entries + ); drop(blocktree); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); diff --git a/src/replicator.rs b/src/replicator.rs index 7c70a655b1..56aab3d89a 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -82,7 +82,7 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { fn get_entry_heights_from_last_id( signature: &ring::signature::Signature, storage_entry_height: u64, -) -> (u64, u64) { +) -> u64 { let signature_vec = signature.as_ref(); let mut segment_index = u64::from(signature_vec[0]) | (u64::from(signature_vec[1]) << 8) @@ -90,10 +90,7 @@ fn get_entry_heights_from_last_id( | (u64::from(signature_vec[2]) << 24); let max_segment_index = get_segment_from_entry(storage_entry_height); segment_index %= max_segment_index as u64; - let entry_height = segment_index * ENTRIES_PER_SEGMENT; - let max_entry_height = entry_height + ENTRIES_PER_SEGMENT; - - (entry_height, max_entry_height) + segment_index * ENTRIES_PER_SEGMENT } impl Replicator { @@ -116,7 +113,6 @@ impl Replicator { timeout: Option, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); - let done = Arc::new(AtomicBool::new(false)); let timeout = timeout.unwrap_or_else(|| Duration::new(30, 0)); info!("Replicator: id: {}", keypair.pubkey()); @@ -156,8 +152,7 @@ impl Replicator { Self::poll_for_last_id_and_entry_height(&cluster_info)?; let signature = keypair.sign(storage_last_id.as_ref()); - let (entry_height, max_entry_height) = - get_entry_heights_from_last_id(&signature, storage_entry_height); + let entry_height = get_entry_heights_from_last_id(&signature, storage_entry_height); info!("replicating entry_height: {}", entry_height); @@ -175,13 +170,10 @@ impl Replicator { let window_service = WindowService::new( blocktree.clone(), cluster_info.clone(), - 0, - max_entry_height, blob_fetch_receiver, retransmit_sender, repair_socket, Arc::new(RwLock::new(LeaderScheduler::default())), - done.clone(), exit.clone(), ); diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index ab70bf84d2..da8f4bb1a8 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -128,7 +128,6 @@ impl RetransmitStage { bank: &Arc, blocktree: Arc, cluster_info: &Arc>, - tick_height: u64, retransmit_socket: Arc, repair_socket: Arc, fetch_stage_receiver: BlobReceiver, @@ -143,17 +142,13 @@ impl RetransmitStage { cluster_info.clone(), retransmit_receiver, ); - let done = Arc::new(AtomicBool::new(false)); let window_service = WindowService::new( blocktree, cluster_info.clone(), - tick_height, - 0, fetch_stage_receiver, retransmit_sender, repair_socket, leader_scheduler, - done, exit, ); diff --git a/src/sigverify.rs b/src/sigverify.rs index a227b858ca..2dc0f277aa 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -154,7 +154,7 @@ pub fn generate_offsets(batches: &[SharedPackets]) -> Result { let mut msg_sizes: Vec<_> = Vec::new(); let mut current_packet = 0; let mut v_sig_lens = Vec::new(); - batches.into_iter().for_each(|p| { + batches.iter().for_each(|p| { let mut sig_lens = Vec::new(); p.read().unwrap().packets.iter().for_each(|packet| { let current_offset = current_packet as u32 * size_of::() as u32; diff --git a/src/tvu.rs b/src/tvu.rs index 4bf3a3a9fe..84239698b6 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -110,7 +110,6 @@ impl Tvu { bank, blocktree.clone(), &cluster_info, - bank.tick_height(), Arc::new(retransmit_socket), repair_socket, blob_fetch_receiver, diff --git a/src/window_service.rs b/src/window_service.rs index 82664e9169..e77f719c94 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -32,11 +32,8 @@ fn recv_window( blocktree: &Arc, id: &Pubkey, leader_scheduler: &Arc>, - tick_height: &mut u64, - max_ix: u64, r: &BlobReceiver, retransmit: &BlobSender, - done: &Arc, ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; @@ -56,8 +53,6 @@ fn recv_window( retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit, id)?; //send a contiguous set of blocks - let mut consume_queue = Vec::new(); - trace!("{} num blobs received: {}", id, dq.len()); for b in dq { @@ -68,15 +63,7 @@ fn recv_window( trace!("{} window pix: {} size: {}", id, pix, meta_size); - let _ = process_blob( - leader_scheduler, - blocktree, - &b, - max_ix, - &mut consume_queue, - tick_height, - done, - ); + let _ = process_blob(leader_scheduler, blocktree, &b); } trace!( @@ -115,13 +102,10 @@ impl WindowService { pub fn new( blocktree: Arc, cluster_info: Arc>, - tick_height: u64, - max_entry_height: u64, r: BlobReceiver, retransmit: BlobSender, repair_socket: Arc, leader_scheduler: Arc>, - done: Arc, exit: Arc, ) -> WindowService { let exit_ = exit.clone(); @@ -135,23 +119,14 @@ impl WindowService { .name("solana-window".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_); - let mut tick_height_ = tick_height; let id = cluster_info.read().unwrap().id(); trace!("{}: RECV_WINDOW started", id); loop { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = recv_window( - &blocktree, - &id, - &leader_scheduler, - &mut tick_height_, - max_entry_height, - &r, - &retransmit, - &done, - ) { + if let Err(e) = recv_window(&blocktree, &id, &leader_scheduler, &r, &retransmit) + { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -217,7 +192,6 @@ mod test { let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); let (s_retransmit, r_retransmit) = channel(); - let done = Arc::new(AtomicBool::new(false)); let blocktree_path = get_tmp_ledger_path("window_send_test"); let blocktree = Arc::new( Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"), @@ -227,13 +201,10 @@ mod test { let t_window = WindowService::new( blocktree, subs, - 0, - 0, r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), Arc::new(RwLock::new(leader_schedule)), - done, exit.clone(), ); let t_responder = { @@ -298,7 +269,6 @@ mod test { let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); let (s_retransmit, r_retransmit) = channel(); - let done = Arc::new(AtomicBool::new(false)); let blocktree_path = get_tmp_ledger_path("window_send_late_leader_test"); let blocktree = Arc::new( Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"), @@ -308,13 +278,10 @@ mod test { let t_window = WindowService::new( blocktree, subs.clone(), - 0, - 0, r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), Arc::new(RwLock::new(leader_schedule)), - done, exit.clone(), ); let t_responder = {