Cleanup consecutive entries code from window_service (#2697)

* Remove returning entries from db_ledger on insert

* Fix tests to check for correctness

* Delete generate_repairs and max_repair_entry_height
This commit is contained in:
carllin 2019-02-08 14:19:28 -08:00 committed by GitHub
parent 0e29868e34
commit 1278396bd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 48 additions and 404 deletions

View File

@ -462,7 +462,7 @@ impl Blocktree {
}
}
pub fn write_shared_blobs<I>(&self, shared_blobs: I) -> Result<Vec<Entry>>
pub fn write_shared_blobs<I>(&self, shared_blobs: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<SharedBlob>,
@ -476,21 +476,18 @@ impl Blocktree {
let blobs = r_blobs.iter().map(|s| &**s);
let new_entries = self.insert_data_blobs(blobs)?;
Ok(new_entries)
self.insert_data_blobs(blobs)
}
pub fn write_blobs<I>(&self, blobs: I) -> Result<Vec<Entry>>
pub fn write_blobs<I>(&self, blobs: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<Blob>,
{
//let blobs = blobs.into_iter().map(|b| *b.borrow());
let entries = self.insert_data_blobs(blobs)?;
Ok(entries)
self.insert_data_blobs(blobs)
}
pub fn write_entries<I>(&self, slot: u64, index: u64, entries: I) -> Result<Vec<Entry>>
pub fn write_entries<I>(&self, slot: u64, index: u64, entries: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<Entry>,
@ -509,7 +506,7 @@ impl Blocktree {
self.write_blobs(&blobs)
}
pub fn insert_data_blobs<I>(&self, new_blobs: I) -> Result<Vec<Entry>>
pub fn insert_data_blobs<I>(&self, new_blobs: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<Blob>,
@ -521,7 +518,6 @@ impl Blocktree {
let new_blobs: Vec<_> = new_blobs.into_iter().collect();
let mut prev_inserted_blob_datas = HashMap::new();
let mut consecutive_entries = vec![];
for blob in new_blobs.iter() {
let blob = blob.borrow();
let blob_slot = blob.slot();
@ -562,15 +558,12 @@ impl Blocktree {
continue;
}
let entries = self.insert_data_blob(
let _ = self.insert_data_blob(
blob,
&mut prev_inserted_blob_datas,
slot_meta,
&mut write_batch,
);
if let Ok(entries) = entries {
consecutive_entries.extend(entries);
}
}
// Handle chaining for the working set
@ -599,11 +592,7 @@ impl Blocktree {
}
}
// TODO: Delete returning these entries and instead have replay_stage query blocktree
// for updates. Returning these entries is to temporarily support current API as to
// not break functionality in db_window.
// Issue: https://github.com/solana-labs/solana/issues/2444
Ok(consecutive_entries)
Ok(())
}
// Fill 'buf' with num_blobs or most number of consecutive
@ -859,7 +848,7 @@ impl Blocktree {
// Return error if there was a database error during lookup of any of the
// slot indexes
let slots: Result<Vec<Option<SlotMeta>>> = slot_heights
.into_iter()
.iter()
.map(|slot_height| self.meta_cf.get_slot_meta(*slot_height))
.collect();
@ -1100,7 +1089,7 @@ impl Blocktree {
prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>,
slot_meta: &mut SlotMeta,
write_batch: &mut WriteBatch,
) -> Result<Vec<Entry>> {
) -> Result<()> {
let blob_index = blob_to_insert.index();
let blob_slot = blob_to_insert.slot();
let blob_size = blob_to_insert.size();
@ -1111,7 +1100,7 @@ impl Blocktree {
return Err(Error::BlocktreeError(BlocktreeError::BlobForIndexExists));
}
let (new_consumed, new_consumed_ticks, blob_datas) = {
let (new_consumed, new_consumed_ticks) = {
if slot_meta.consumed == blob_index {
let blob_datas = self.get_slot_consecutive_blobs(
blob_slot,
@ -1125,7 +1114,6 @@ impl Blocktree {
let blob_to_insert = Cow::Borrowed(&blob_to_insert.data[..]);
let mut new_consumed_ticks = 0;
let mut entries = vec![];
// Check all the consecutive blobs for ticks
for blob_data in once(&blob_to_insert).chain(blob_datas.iter()) {
let serialized_entry_data = &blob_data[BLOB_HEADER_SIZE..];
@ -1135,7 +1123,6 @@ impl Blocktree {
if entry.is_tick() {
new_consumed_ticks += 1;
}
entries.push(entry);
}
(
@ -1143,10 +1130,9 @@ impl Blocktree {
// get_slot_consecutive_blobs() earlier
slot_meta.consumed + blob_datas.len() as u64 + 1,
new_consumed_ticks,
entries,
)
} else {
(slot_meta.consumed, 0, vec![])
(slot_meta.consumed, 0)
}
};
@ -1162,11 +1148,7 @@ impl Blocktree {
slot_meta.received = cmp::max(blob_index + 1, slot_meta.received);
slot_meta.consumed = new_consumed;
slot_meta.consumed_ticks += new_consumed_ticks;
// TODO: Remove returning these entries and instead have replay_stage query blocktree
// for updates. Returning these entries is to temporarily support current API as to
// not break functionality in db_window.
// Issue: https://github.com/solana-labs/solana/issues/2444
Ok(blob_datas)
Ok(())
}
/// Returns the next consumed index and the number of ticks in the new consumed

View File

@ -1,8 +1,6 @@
//! Set of functions for emulating windowing functions from a database ledger implementation
use crate::blocktree::*;
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::entry::Entry;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::leader_scheduler::LeaderScheduler;
@ -13,148 +11,11 @@ use log::Level;
use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::cmp;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
pub const MAX_REPAIR_LENGTH: usize = 128;
pub fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<Vec<(u64, u64)>> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<(u64, u64)> = vec![];
let mut slots = vec![0];
while repairs.len() < max_repairs && !slots.is_empty() {
let slot_height = slots.pop().unwrap();
let slot = blocktree.meta(slot_height)?;
if slot.is_none() {
continue;
}
let slot = slot.unwrap();
slots.extend(slot.next_slots.clone());
if slot.contains_all_ticks(blocktree) {
continue;
} else {
let num_unreceived_ticks = {
if slot.consumed == slot.received {
slot.num_expected_ticks(blocktree) - slot.consumed_ticks
} else {
0
}
};
let upper = slot.received + num_unreceived_ticks;
let reqs = blocktree.find_missing_data_indexes(
0,
slot.consumed,
upper,
max_repairs - repairs.len(),
);
repairs.extend(reqs.into_iter().map(|i| (slot_height, i)))
}
}
Ok(repairs)
}
pub fn repair(
blocktree: &Blocktree,
slot_index: u64,
cluster_info: &Arc<RwLock<ClusterInfo>>,
id: &Pubkey,
times: usize,
tick_height: u64,
max_entry_height: u64,
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
let rcluster_info = cluster_info.read().unwrap();
let is_next_leader = false;
let meta = blocktree.meta(slot_index)?;
if meta.is_none() {
return Ok(vec![]);
}
let meta = meta.unwrap();
let consumed = meta.consumed;
let received = meta.received;
// Repair should only be called when received > consumed, enforced in window_service
assert!(received > consumed);
// Check if we are the next next slot leader
{
let leader_scheduler = leader_scheduler_option.read().unwrap();
let next_slot = leader_scheduler.tick_height_to_slot(tick_height) + 1;
match leader_scheduler.get_leader_for_slot(next_slot) {
Some(leader_id) if leader_id == *id => true,
// In the case that we are not in the current scope of the leader schedule
// window then either:
//
// 1) The replay stage hasn't caught up to the "consumed" entries we sent,
// in which case it will eventually catch up
//
// 2) We are on the border between ticks_per_epochs, so the
// schedule won't be known until the entry on that cusp is received
// by the replay stage (which comes after this stage). Hence, the next
// leader at the beginning of that next epoch will not know they are the
// leader until they receive that last "cusp" entry. The leader also won't ask for repairs
// for that entry because "is_next_leader" won't be set here. In this case,
// everybody will be blocking waiting for that "cusp" entry instead of repairing,
// until the leader hits "times" >= the max times in calculate_max_repair_entry_height().
// The impact of this, along with the similar problem from broadcast for the transitioning
// leader, can be observed in the multinode test, test_full_leader_validator_network(),
None => false,
_ => false,
}
};
let num_peers = rcluster_info.repair_peers().len() as u64;
// Check if there's a max_entry_height limitation
let max_repair_entry_height = if max_entry_height == 0 {
calculate_max_repair_entry_height(num_peers, consumed, received, times, is_next_leader)
} else {
max_entry_height + 2
};
let idxs = blocktree.find_missing_data_indexes(
DEFAULT_SLOT_HEIGHT,
consumed,
max_repair_entry_height - 1,
MAX_REPAIR_LENGTH,
);
let reqs: Vec<_> = idxs
.into_iter()
.filter_map(|pix| rcluster_info.window_index_request(slot_index, pix).ok())
.collect();
drop(rcluster_info);
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
if log_enabled!(Level::Trace) {
trace!(
"{}: repair_window counter times: {} consumed: {} received: {} max_repair_entry_height: {} missing: {}",
id,
times,
consumed,
received,
max_repair_entry_height,
reqs.len()
);
for (to, _) in &reqs {
trace!("{}: repair_window request to {}", id, to);
}
}
Ok(reqs)
}
pub fn retransmit_all_leader_blocks(
dq: &[SharedBlob],
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@ -207,17 +68,11 @@ pub fn add_blob_to_retransmit_queue(
}
}
/// Process a blob: Add blob to the ledger window. If a continuous set of blobs
/// starting from consumed is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage.
/// Process a blob: Add blob to the ledger window.
pub fn process_blob(
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
blocktree: &Arc<Blocktree>,
blob: &SharedBlob,
max_ix: u64,
consume_queue: &mut Vec<Entry>,
tick_height: &mut u64,
done: &Arc<AtomicBool>,
) -> Result<()> {
let is_coding = blob.read().unwrap().is_coding();
@ -238,23 +93,18 @@ pub fn process_blob(
return Ok(()); // Occurs as a leader is rotating into a validator
}
// Insert the new blob into the window
let mut consumed_entries = if is_coding {
// Insert the new blob into block tree
if is_coding {
let blob = &blob.read().unwrap();
blocktree.put_coding_blob_bytes(slot, pix, &blob.data[..BLOB_HEADER_SIZE + blob.size()])?;
vec![]
} else {
blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?
};
blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?;
}
#[cfg(feature = "erasure")]
{
// If write_shared_blobs() of these recovered blobs fails fails, don't return
// because consumed_entries might be nonempty from earlier, and tick height needs to
// be updated. Hopefully we can recover these blobs next time successfully.
// TODO: Support per-slot erasure. Issue: https://github.com/solana-labs/solana/issues/2441
if let Err(e) = try_erasure(blocktree, &mut consumed_entries, 0) {
if let Err(e) = try_erasure(blocktree, 0) {
trace!(
"erasure::recover failed to write recovered coding blobs. Err: {:?}",
e
@ -262,60 +112,11 @@ pub fn process_blob(
}
}
for entry in &consumed_entries {
*tick_height += entry.is_tick() as u64;
}
// For downloading storage blobs,
// we only want up to a certain index
// then stop
if max_ix != 0 && !consumed_entries.is_empty() {
let meta = blocktree
.meta(0)?
.expect("Expect metadata to exist if consumed entries is nonzero");
let consumed = meta.consumed;
// Check if we ran over the last wanted entry
if consumed > max_ix {
let consumed_entries_len = consumed_entries.len();
let extra_unwanted_entries_len =
cmp::min(consumed_entries_len, (consumed - (max_ix + 1)) as usize);
consumed_entries.truncate(consumed_entries_len - extra_unwanted_entries_len);
done.store(true, Ordering::Relaxed);
}
}
consume_queue.extend(consumed_entries);
Ok(())
}
pub fn calculate_max_repair_entry_height(
num_peers: u64,
consumed: u64,
received: u64,
times: usize,
is_next_leader: bool,
) -> u64 {
// Calculate the highest blob index that this node should have already received
// via avalanche. The avalanche splits data stream into nodes and each node retransmits
// the data to their peer nodes. So there's a possibility that a blob (with index lower
// than current received index) is being retransmitted by a peer node.
if times >= 8 || is_next_leader {
// if repair backoff is getting high, or if we are the next leader,
// don't wait for avalanche. received - 1 is the index of the highest blob.
received
} else {
cmp::max(consumed, received.saturating_sub(num_peers))
}
}
#[cfg(feature = "erasure")]
fn try_erasure(
blocktree: &Arc<Blocktree>,
consume_queue: &mut Vec<Entry>,
slot_index: u64,
) -> Result<()> {
fn try_erasure(blocktree: &Arc<Blocktree>, slot_index: u64) -> Result<()> {
let meta = blocktree.meta(slot_index)?;
if let Some(meta) = meta {
@ -329,11 +130,10 @@ fn try_erasure(
)?;
}
let entries = blocktree.write_shared_blobs(data)?;
consume_queue.extend(entries);
blocktree.write_shared_blobs(data)
} else {
Ok(())
}
Ok(())
}
#[cfg(test)]
@ -418,23 +218,6 @@ mod test {
t_responder.join().expect("join");
}
#[test]
pub fn test_calculate_max_repair_entry_height() {
assert_eq!(calculate_max_repair_entry_height(20, 4, 11, 0, false), 4);
assert_eq!(calculate_max_repair_entry_height(0, 10, 90, 0, false), 90);
assert_eq!(calculate_max_repair_entry_height(15, 10, 90, 32, false), 90);
assert_eq!(calculate_max_repair_entry_height(15, 10, 90, 0, false), 75);
assert_eq!(calculate_max_repair_entry_height(90, 10, 90, 0, false), 10);
assert_eq!(calculate_max_repair_entry_height(90, 10, 50, 0, false), 10);
assert_eq!(calculate_max_repair_entry_height(90, 10, 99, 0, false), 10);
assert_eq!(calculate_max_repair_entry_height(90, 10, 101, 0, false), 11);
assert_eq!(calculate_max_repair_entry_height(90, 10, 101, 0, true), 101);
assert_eq!(
calculate_max_repair_entry_height(90, 10, 101, 30, true),
101
);
}
#[test]
pub fn test_retransmit() {
let leader = Keypair::new().pubkey();
@ -483,79 +266,6 @@ mod test {
assert!(blob_receiver.try_recv().is_err());
}
#[test]
pub fn test_generate_repairs() {
let blocktree_path = get_tmp_ledger_path("test_generate_repairs");
let num_ticks_per_slot = 10;
let blocktree_config = BlocktreeConfig::new(num_ticks_per_slot);
let blocktree = Blocktree::open_config(&blocktree_path, blocktree_config).unwrap();
let num_entries_per_slot = 10;
let num_slots = 2;
let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs();
// Insert every nth entry for each slot
let nth = 3;
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(((i % num_entries_per_slot) * nth) as u64);
b.set_slot((i / num_entries_per_slot) as u64);
}
blocktree.write_blobs(&blobs).unwrap();
let missing_indexes_per_slot: Vec<u64> = (0..num_entries_per_slot - 1)
.flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
.collect();
let expected: Vec<(u64, u64)> = (0..num_slots)
.flat_map(|slot_height| {
missing_indexes_per_slot
.iter()
.map(move |blob_index| (slot_height as u64, *blob_index))
})
.collect();
// Across all slots, find all missing indexes in the range [0, num_entries_per_slot * nth]
assert_eq!(
generate_repairs(&blocktree, std::usize::MAX).unwrap(),
expected
);
assert_eq!(
generate_repairs(&blocktree, expected.len() - 2).unwrap()[..],
expected[0..expected.len() - 2]
);
// Now fill in all the holes for each slot such that for each slot, consumed == received.
// Because none of the slots contain ticks, we should see that the repair requests
// ask for ticks, starting from the last received index for that slot
for (slot_height, blob_index) in expected {
let mut b = make_tiny_test_entries(1).to_blobs().pop().unwrap();
b.set_index(blob_index);
b.set_slot(slot_height);
blocktree.write_blobs(&vec![b]).unwrap();
}
let last_index_per_slot = ((num_entries_per_slot - 1) * nth) as u64;
let missing_indexes_per_slot: Vec<u64> =
(last_index_per_slot + 1..last_index_per_slot + 1 + num_ticks_per_slot).collect();
let expected: Vec<(u64, u64)> = (0..num_slots)
.flat_map(|slot_height| {
missing_indexes_per_slot
.iter()
.map(move |blob_index| (slot_height as u64, *blob_index))
})
.collect();
assert_eq!(
generate_repairs(&blocktree, std::usize::MAX).unwrap(),
expected
);
assert_eq!(
generate_repairs(&blocktree, expected.len() - 2).unwrap()[..],
expected[0..expected.len() - 2]
);
}
#[test]
pub fn test_find_missing_data_indexes_sanity() {
let slot = DEFAULT_SLOT_HEIGHT;
@ -813,9 +523,7 @@ mod test {
let ledger_path = get_tmp_ledger_path("test_try_erasure");
let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, false));
let mut consume_queue = vec![];
try_erasure(&blocktree, &mut consume_queue, DEFAULT_SLOT_HEIGHT)
.expect("Expected successful erasure attempt");
try_erasure(&blocktree, DEFAULT_SLOT_HEIGHT).expect("Expected successful erasure attempt");
window[erased_index].data = erased_data;
{
@ -829,7 +537,17 @@ mod test {
let locked_data: Vec<&Blob> = locks.iter().map(|lock| &**lock).collect();
let (expected, _) = reconstruct_entries_from_blobs(locked_data).unwrap();
assert_eq!(consume_queue, expected);
assert_eq!(
blocktree
.get_slot_entries(
0,
erased_index as u64,
Some((end_index - erased_index) as u64)
)
.unwrap(),
expected
);
}
let erased_coding_l = erased_coding.read().unwrap();
@ -862,24 +580,15 @@ mod test {
&vec![DEFAULT_SLOT_HEIGHT; num_entries],
);
let mut consume_queue = vec![];
let mut tick_height = 2;
let done = Arc::new(AtomicBool::new(false));
for blob in shared_blobs.iter().rev() {
process_blob(
&leader_scheduler,
&blocktree,
blob,
0,
&mut consume_queue,
&mut tick_height,
&done,
)
.expect("Expect successful processing of blob");
process_blob(&leader_scheduler, &blocktree, blob)
.expect("Expect successful processing of blob");
}
assert_eq!(consume_queue, original_entries);
assert_eq!(
blocktree.get_slot_entries(0, 0, None).unwrap(),
original_entries
);
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");

View File

@ -82,7 +82,7 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
fn get_entry_heights_from_last_id(
signature: &ring::signature::Signature,
storage_entry_height: u64,
) -> (u64, u64) {
) -> u64 {
let signature_vec = signature.as_ref();
let mut segment_index = u64::from(signature_vec[0])
| (u64::from(signature_vec[1]) << 8)
@ -90,10 +90,7 @@ fn get_entry_heights_from_last_id(
| (u64::from(signature_vec[2]) << 24);
let max_segment_index = get_segment_from_entry(storage_entry_height);
segment_index %= max_segment_index as u64;
let entry_height = segment_index * ENTRIES_PER_SEGMENT;
let max_entry_height = entry_height + ENTRIES_PER_SEGMENT;
(entry_height, max_entry_height)
segment_index * ENTRIES_PER_SEGMENT
}
impl Replicator {
@ -116,7 +113,6 @@ impl Replicator {
timeout: Option<Duration>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
let timeout = timeout.unwrap_or_else(|| Duration::new(30, 0));
info!("Replicator: id: {}", keypair.pubkey());
@ -156,8 +152,7 @@ impl Replicator {
Self::poll_for_last_id_and_entry_height(&cluster_info)?;
let signature = keypair.sign(storage_last_id.as_ref());
let (entry_height, max_entry_height) =
get_entry_heights_from_last_id(&signature, storage_entry_height);
let entry_height = get_entry_heights_from_last_id(&signature, storage_entry_height);
info!("replicating entry_height: {}", entry_height);
@ -175,13 +170,10 @@ impl Replicator {
let window_service = WindowService::new(
blocktree.clone(),
cluster_info.clone(),
0,
max_entry_height,
blob_fetch_receiver,
retransmit_sender,
repair_socket,
Arc::new(RwLock::new(LeaderScheduler::default())),
done.clone(),
exit.clone(),
);

View File

@ -128,7 +128,6 @@ impl RetransmitStage {
bank: &Arc<Bank>,
blocktree: Arc<Blocktree>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
tick_height: u64,
retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver,
@ -143,17 +142,13 @@ impl RetransmitStage {
cluster_info.clone(),
retransmit_receiver,
);
let done = Arc::new(AtomicBool::new(false));
let window_service = WindowService::new(
blocktree,
cluster_info.clone(),
tick_height,
0,
fetch_stage_receiver,
retransmit_sender,
repair_socket,
leader_scheduler,
done,
exit,
);

View File

@ -154,7 +154,7 @@ pub fn generate_offsets(batches: &[SharedPackets]) -> Result<TxOffsets> {
let mut msg_sizes: Vec<_> = Vec::new();
let mut current_packet = 0;
let mut v_sig_lens = Vec::new();
batches.into_iter().for_each(|p| {
batches.iter().for_each(|p| {
let mut sig_lens = Vec::new();
p.read().unwrap().packets.iter().for_each(|packet| {
let current_offset = current_packet as u32 * size_of::<Packet>() as u32;

View File

@ -110,7 +110,6 @@ impl Tvu {
bank,
blocktree.clone(),
&cluster_info,
bank.tick_height(),
Arc::new(retransmit_socket),
repair_socket,
blob_fetch_receiver,

View File

@ -32,11 +32,8 @@ fn recv_window(
blocktree: &Arc<Blocktree>,
id: &Pubkey,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
tick_height: &mut u64,
max_ix: u64,
r: &BlobReceiver,
retransmit: &BlobSender,
done: &Arc<AtomicBool>,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
@ -56,8 +53,6 @@ fn recv_window(
retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit, id)?;
//send a contiguous set of blocks
let mut consume_queue = Vec::new();
trace!("{} num blobs received: {}", id, dq.len());
for b in dq {
@ -68,15 +63,7 @@ fn recv_window(
trace!("{} window pix: {} size: {}", id, pix, meta_size);
let _ = process_blob(
leader_scheduler,
blocktree,
&b,
max_ix,
&mut consume_queue,
tick_height,
done,
);
let _ = process_blob(leader_scheduler, blocktree, &b);
}
trace!(
@ -115,13 +102,10 @@ impl WindowService {
pub fn new(
blocktree: Arc<Blocktree>,
cluster_info: Arc<RwLock<ClusterInfo>>,
tick_height: u64,
max_entry_height: u64,
r: BlobReceiver,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
done: Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> WindowService {
let exit_ = exit.clone();
@ -135,23 +119,14 @@ impl WindowService {
.name("solana-window".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_);
let mut tick_height_ = tick_height;
let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id);
loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = recv_window(
&blocktree,
&id,
&leader_scheduler,
&mut tick_height_,
max_entry_height,
&r,
&retransmit,
&done,
) {
if let Err(e) = recv_window(&blocktree, &id, &leader_scheduler, &r, &retransmit)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -217,7 +192,6 @@ mod test {
let t_receiver =
blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader);
let (s_retransmit, r_retransmit) = channel();
let done = Arc::new(AtomicBool::new(false));
let blocktree_path = get_tmp_ledger_path("window_send_test");
let blocktree = Arc::new(
Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
@ -227,13 +201,10 @@ mod test {
let t_window = WindowService::new(
blocktree,
subs,
0,
0,
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
Arc::new(RwLock::new(leader_schedule)),
done,
exit.clone(),
);
let t_responder = {
@ -298,7 +269,6 @@ mod test {
let t_receiver =
blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader);
let (s_retransmit, r_retransmit) = channel();
let done = Arc::new(AtomicBool::new(false));
let blocktree_path = get_tmp_ledger_path("window_send_late_leader_test");
let blocktree = Arc::new(
Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
@ -308,13 +278,10 @@ mod test {
let t_window = WindowService::new(
blocktree,
subs.clone(),
0,
0,
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
Arc::new(RwLock::new(leader_schedule)),
done,
exit.clone(),
);
let t_responder = {