diff --git a/src/bank.rs b/src/bank.rs index baff2a06a5..677c7f4323 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -1417,9 +1417,11 @@ impl Bank { subscriptions.remove(pubkey).is_some() } - pub fn get_current_leader(&self) -> Option { - let ls_lock = self.leader_scheduler.read().unwrap(); - ls_lock.get_scheduled_leader(self.tick_height()) + pub fn get_current_leader(&self) -> Option<(Pubkey, u64)> { + self.leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(self.tick_height()) } pub fn tick_height(&self) -> u64 { diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 8c35266f9f..45853ef66f 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -9,7 +9,7 @@ use influx_db_client as influxdb; use ledger::Block; use log::Level; use metrics; -use packet::SharedBlobs; +use packet::{index_blobs, SharedBlobs}; use rayon::prelude::*; use result::{Error, Result}; use service::Service; @@ -20,7 +20,7 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use timing::duration_as_ms; -use window::{self, SharedWindow, WindowIndex, WindowUtil}; +use window::{SharedWindow, WindowIndex, WindowUtil}; #[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStageReturnType { @@ -36,6 +36,7 @@ fn broadcast( sock: &UdpSocket, transmit_index: &mut WindowIndex, receive_index: &mut u64, + leader_slot: u64, ) -> Result<()> { let id = node_info.id; let timer = Duration::new(1, 0); @@ -73,10 +74,7 @@ fn broadcast( let blobs_len = blobs.len(); trace!("{}: broadcast blobs.len: {}", id, blobs_len); - // TODO: move all this into window.rs - // Index the blobs - window::index_blobs(node_info, &blobs, receive_index) - .expect("index blobs for initial window"); + index_blobs(&blobs, &node_info.id, *receive_index, leader_slot); // keep the cache of blobs that are broadcast inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); @@ -84,13 +82,13 @@ fn broadcast( let mut win = window.write().unwrap(); assert!(blobs.len() <= win.len()); for b in &blobs { - let ix = b.read().unwrap().get_index().expect("blob index"); + let ix = b.read().unwrap().index().expect("blob index"); let pos = (ix % window_size) as usize; if let Some(x) = win[pos].data.take() { trace!( "{} popped {} at {}", id, - x.read().unwrap().get_index().unwrap(), + x.read().unwrap().index().unwrap(), pos ); } @@ -98,7 +96,7 @@ fn broadcast( trace!( "{} popped {} at {}", id, - x.read().unwrap().get_index().unwrap(), + x.read().unwrap().index().unwrap(), pos ); } @@ -106,7 +104,7 @@ fn broadcast( trace!("{} null {}", id, pos); } for b in &blobs { - let ix = b.read().unwrap().get_index().expect("blob index"); + let ix = b.read().unwrap().index().expect("blob index"); let pos = (ix % window_size) as usize; trace!("{} caching {} at {}", id, ix, pos); assert!(win[pos].data.is_none()); @@ -188,6 +186,7 @@ impl BroadcastStage { cluster_info: &Arc>, window: &SharedWindow, entry_height: u64, + leader_slot: u64, receiver: &Receiver>, ) -> BroadcastStageReturnType { let mut transmit_index = WindowIndex { @@ -206,6 +205,7 @@ impl BroadcastStage { &sock, &mut transmit_index, &mut receive_index, + leader_slot, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { @@ -242,6 +242,7 @@ impl BroadcastStage { cluster_info: Arc>, window: SharedWindow, entry_height: u64, + leader_slot: u64, receiver: Receiver>, exit_sender: Arc, ) -> Self { @@ -249,7 +250,14 @@ impl BroadcastStage { .name("solana-broadcaster".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_sender); - Self::run(&sock, &cluster_info, &window, entry_height, &receiver) + Self::run( + &sock, + &cluster_info, + &window, + entry_height, + leader_slot, + &receiver, + ) }).unwrap(); BroadcastStage { thread_hdl } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index c7978ddb11..3f81d7c300 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -539,7 +539,7 @@ impl ClusterInfo { trace!( "{}: BROADCAST idx: {} sz: {} to {},{} coding: {}", me.id, - blob.get_index().unwrap(), + blob.index().unwrap(), blob.meta.size, v.id, v.contact_info.tvu, @@ -587,7 +587,7 @@ impl ClusterInfo { }; blob.write() .unwrap() - .set_id(me.id) + .set_id(&me.id) .expect("set_id in pub fn retransmit"); let rblob = blob.read().unwrap(); let orders: Vec<_> = table @@ -617,7 +617,7 @@ impl ClusterInfo { debug!( "{}: retransmit blob {} to {} {}", me.id, - rblob.get_index().unwrap(), + rblob.index().unwrap(), v.id, v.contact_info.tvu, ); @@ -869,7 +869,7 @@ impl ClusterInfo { let pos = (ix as usize) % window.read().unwrap().len(); if let Some(ref mut blob) = &mut window.write().unwrap()[pos].data { let mut wblob = blob.write().unwrap(); - let blob_ix = wblob.get_index().expect("run_window_request get_index"); + let blob_ix = wblob.index().expect("run_window_request index"); if blob_ix == ix { let num_retransmits = wblob.meta.num_retransmits; wblob.meta.num_retransmits += 1; @@ -896,7 +896,7 @@ impl ClusterInfo { outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); outblob.meta.set_addr(from_addr); - outblob.set_id(sender_id).expect("blob set_id"); + outblob.set_id(&sender_id).expect("blob set_id"); } inc_new_counter_info!("cluster_info-window-request-pass", 1); @@ -1735,7 +1735,7 @@ mod tests { } else { mock_peer.id }; - assert_eq!(blob.get_id().unwrap(), id); + assert_eq!(blob.id().unwrap(), id); } } diff --git a/src/entry.rs b/src/entry.rs index 3a1ec67c13..4198a194c3 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -108,7 +108,7 @@ impl Entry { blob_w.set_index(idx).expect("set_index()"); } if let Some(id) = id { - blob_w.set_id(id).expect("set_id()"); + blob_w.set_id(&id).expect("set_id()"); } if let Some(addr) = addr { blob_w.meta.set_addr(addr); diff --git a/src/erasure.rs b/src/erasure.rs index 79adfdfc93..e08d0eb81d 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -295,8 +295,8 @@ pub fn generate_coding( if let Some(data) = &window[n].data { let data_rl = data.read().unwrap(); - let index = data_rl.get_index().unwrap(); - let id = data_rl.get_id().unwrap(); + let index = data_rl.index().unwrap(); + let id = data_rl.id().unwrap(); trace!( "{} copying index {} id {:?} from data to coding", @@ -305,7 +305,7 @@ pub fn generate_coding( id ); coding_wl.set_index(index).unwrap(); - coding_wl.set_id(id).unwrap(); + coding_wl.set_id(&id).unwrap(); } coding_wl.set_size(max_data_size); if coding_wl.set_coding().is_err() { @@ -351,7 +351,7 @@ pub fn generate_coding( // false if slot has a blob with the right index fn is_missing(id: &Pubkey, idx: u64, window_slot: &mut Option, c_or_d: &str) -> bool { if let Some(blob) = window_slot.take() { - let blob_idx = blob.read().unwrap().get_index().unwrap(); + let blob_idx = blob.read().unwrap().index().unwrap(); if blob_idx == idx { trace!("recover {}: idx: {} good {}", id, idx, c_or_d); // put it back @@ -553,7 +553,7 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: us let mut data_size; if n < NUM_DATA { - data_size = locks[n].get_data_size().unwrap() as usize; + data_size = locks[n].data_size().unwrap() as usize; data_size -= BLOB_HEADER_SIZE; if data_size > BLOB_DATA_SIZE { error!("{} corrupt data blob[{}] data_size: {}", id, idx, data_size); @@ -591,15 +591,14 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: us #[cfg(test)] mod test { - use cluster_info; use erasure; use logger; - use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; + use packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; use rand::{thread_rng, Rng}; use signature::{Keypair, KeypairUtil}; use solana_sdk::pubkey::Pubkey; // use std::sync::{Arc, RwLock}; - use window::{index_blobs, WindowSlot}; + use window::WindowSlot; #[test] pub fn test_coding() { @@ -660,7 +659,7 @@ mod test { let window_l2 = window_l1.read().unwrap(); print!( "data index: {:?} meta.size: {} data: ", - window_l2.get_index(), + window_l2.index(), window_l2.meta.size ); for i in 0..64 { @@ -676,7 +675,7 @@ mod test { let window_l2 = window_l1.read().unwrap(); print!( "coding index: {:?} meta.size: {} data: ", - window_l2.get_index(), + window_l2.index(), window_l2.meta.size ); for i in 0..8 { @@ -726,10 +725,9 @@ mod test { blobs.push(b_); } - let d = cluster_info::NodeInfo::new_localhost(Keypair::new().pubkey()); - assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok()); + index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 0); for b in blobs { - let idx = b.read().unwrap().get_index().unwrap() as usize % WINDOW_SIZE; + let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; window[idx].data = Some(b); } @@ -815,7 +813,7 @@ mod test { assert_eq!(window_l2.meta.port, ref_l2.meta.port); assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); assert_eq!( - window_l2.get_index().unwrap(), + window_l2.index().unwrap(), (erase_offset + WINDOW_SIZE) as u64 ); } @@ -850,7 +848,7 @@ mod test { assert_eq!(window_l2.meta.port, ref_l2.meta.port); assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); assert_eq!( - window_l2.get_index().unwrap(), + window_l2.index().unwrap(), (erase_offset + WINDOW_SIZE) as u64 ); } @@ -896,7 +894,7 @@ mod test { assert_eq!(window_l2.meta.port, ref_l2.meta.port); assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); assert_eq!( - window_l2.get_index().unwrap(), + window_l2.index().unwrap(), (erase_offset + WINDOW_SIZE) as u64 ); } diff --git a/src/fullnode.rs b/src/fullnode.rs index 1cde1e535f..e1885bb39b 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -237,7 +237,7 @@ impl Fullnode { } // Get the scheduled leader - let scheduled_leader = bank + let (scheduled_leader, leader_slot) = bank .get_current_leader() .expect("Leader not known after processing bank"); @@ -296,6 +296,7 @@ impl Fullnode { cluster_info.clone(), shared_window.clone(), entry_height, + leader_slot, entry_receiver, tpu_exit, ); @@ -354,7 +355,7 @@ impl Fullnode { ); let new_bank = Arc::new(new_bank); - let scheduled_leader = new_bank + let (scheduled_leader, _) = new_bank .get_current_leader() .expect("Scheduled leader should exist after rebuilding bank"); @@ -446,6 +447,7 @@ impl Fullnode { self.cluster_info.clone(), self.shared_window.clone(), entry_height, + 0, // TODO: get real leader slot from leader_scheduler blob_receiver, tpu_exit, ); diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 5d79d032a7..000334fb79 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -248,15 +248,15 @@ impl LeaderScheduler { // Uses the schedule generated by the last call to generate_schedule() to return the // leader for a given PoH height in round-robin fashion - pub fn get_scheduled_leader(&self, height: u64) -> Option { + pub fn get_scheduled_leader(&self, height: u64) -> Option<(Pubkey, u64)> { if self.use_only_bootstrap_leader { - return Some(self.bootstrap_leader); + return Some((self.bootstrap_leader, 0)); } // This covers cases where the schedule isn't yet generated. if self.last_seed_height == None { if height < self.bootstrap_height { - return Some(self.bootstrap_leader); + return Some((self.bootstrap_leader, 0)); } else { // If there's been no schedule generated yet before we reach the end of the // bootstrapping period, then the leader is unknown @@ -273,9 +273,10 @@ impl LeaderScheduler { } // Find index into the leader_schedule that this PoH height maps to + let leader_slot = (height - self.bootstrap_height) / self.leader_rotation_interval + 1; let index = (height - last_seed_height) / self.leader_rotation_interval; let validator_index = index as usize % self.leader_schedule.len(); - Some(self.leader_schedule[validator_index]) + Some((self.leader_schedule[validator_index], leader_slot)) } // TODO: We use a HashSet for now because a single validator could potentially register @@ -351,7 +352,7 @@ impl LeaderScheduler { // If possible, try to avoid having the same leader twice in a row, but // if there's only one leader to choose from, then we have no other choice if validator_rankings.len() > 1 { - let old_epoch_last_leader = self + let (old_epoch_last_leader, _) = self .get_scheduled_leader(height - 1) .expect("Previous leader schedule should still exist"); let new_epoch_start_leader = validator_rankings[0]; @@ -587,11 +588,11 @@ mod tests { // be the bootstrap leader assert_eq!( leader_scheduler.get_scheduled_leader(0), - Some(bootstrap_leader_id) + Some((bootstrap_leader_id, 0)) ); assert_eq!( leader_scheduler.get_scheduled_leader(bootstrap_height - 1), - Some(bootstrap_leader_id) + Some((bootstrap_leader_id, 0)) ); assert_eq!( leader_scheduler.get_scheduled_leader(bootstrap_height), @@ -625,7 +626,7 @@ mod tests { let mut start_leader_index = None; for i in 0..num_rounds { let begin_height = bootstrap_height + i * leader_rotation_interval; - let current_leader = leader_scheduler + let (current_leader, slot) = leader_scheduler .get_scheduled_leader(begin_height) .expect("Expected a leader from scheduler"); @@ -645,10 +646,11 @@ mod tests { let expected_leader = validators[(start_leader_index.unwrap() + i as usize) % num_validators]; assert_eq!(current_leader, expected_leader); + assert_eq!(slot, i + 1); // Check that the same leader is in power for the next leader_rotation_interval entries assert_eq!( leader_scheduler.get_scheduled_leader(begin_height + leader_rotation_interval - 1), - Some(current_leader) + Some((current_leader, slot)) ); } } diff --git a/src/ledger.rs b/src/ledger.rs index a81ed40e07..a6b248814b 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -508,9 +508,9 @@ pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result(rsps: Vec<(T, SocketAddr)>) -> Result Ok(blobs) } -const BLOB_INDEX_END: usize = size_of::(); +const BLOB_SLOT_END: usize = size_of::(); +const BLOB_INDEX_END: usize = BLOB_SLOT_END + size_of::(); const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::(); const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::(); const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::(); @@ -265,31 +266,42 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1; pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64); impl Blob { - pub fn get_index(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_END]); + pub fn slot(&self) -> Result { + let mut rdr = io::Cursor::new(&self.data[0..BLOB_SLOT_END]); + let r = rdr.read_u64::()?; + Ok(r) + } + pub fn set_slot(&mut self, ix: u64) -> Result<()> { + let mut wtr = vec![]; + wtr.write_u64::(ix)?; + self.data[..BLOB_SLOT_END].clone_from_slice(&wtr); + Ok(()) + } + pub fn index(&self) -> Result { + let mut rdr = io::Cursor::new(&self.data[BLOB_SLOT_END..BLOB_INDEX_END]); let r = rdr.read_u64::()?; Ok(r) } pub fn set_index(&mut self, ix: u64) -> Result<()> { let mut wtr = vec![]; wtr.write_u64::(ix)?; - self.data[..BLOB_INDEX_END].clone_from_slice(&wtr); + self.data[BLOB_SLOT_END..BLOB_INDEX_END].clone_from_slice(&wtr); Ok(()) } /// sender id, we use this for identifying if its a blob from the leader that we should /// retransmit. eventually blobs should have a signature that we can use ffor spam filtering - pub fn get_id(&self) -> Result { + pub fn id(&self) -> Result { let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?; Ok(e) } - pub fn set_id(&mut self, id: Pubkey) -> Result<()> { - let wtr = serialize(&id)?; + pub fn set_id(&mut self, id: &Pubkey) -> Result<()> { + let wtr = serialize(id)?; self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr); Ok(()) } - pub fn get_flags(&self) -> Result { + pub fn flags(&self) -> Result { let mut rdr = io::Cursor::new(&self.data[BLOB_ID_END..BLOB_FLAGS_END]); let r = rdr.read_u32::()?; Ok(r) @@ -303,15 +315,15 @@ impl Blob { } pub fn is_coding(&self) -> bool { - (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 + (self.flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 } pub fn set_coding(&mut self) -> Result<()> { - let flags = self.get_flags().unwrap(); + let flags = self.flags().unwrap(); self.set_flags(flags | BLOB_FLAG_IS_CODING) } - pub fn get_data_size(&self) -> Result { + pub fn data_size(&self) -> Result { let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]); let r = rdr.read_u64::()?; Ok(r) @@ -330,8 +342,8 @@ impl Blob { pub fn data_mut(&mut self) -> &mut [u8] { &mut self.data[BLOB_HEADER_SIZE..] } - pub fn get_size(&self) -> Result { - let size = self.get_data_size()? as usize; + pub fn size(&self) -> Result { + let size = self.data_size()? as usize; if self.meta.size == size { Ok(size - BLOB_HEADER_SIZE) } else { @@ -406,6 +418,20 @@ impl Blob { } } +pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slot: u64) { + // enumerate all the blobs, those are the indices + for b in blobs { + let mut blob = b.write().unwrap(); + + blob.set_index(index).expect("set_index"); + blob.set_slot(slot).expect("set_slot"); + blob.set_id(id).expect("set_id"); + blob.set_flags(0).unwrap(); + + index += 1; + } +} + #[cfg(test)] pub fn make_consecutive_blobs( me_id: Pubkey, @@ -525,10 +551,10 @@ mod tests { pub fn blob_test() { let mut b = Blob::default(); b.set_index(::max_value()).unwrap(); - assert_eq!(b.get_index().unwrap(), ::max_value()); + assert_eq!(b.index().unwrap(), ::max_value()); b.data_mut()[0] = 1; assert_eq!(b.data()[0], 1); - assert_eq!(b.get_index().unwrap(), ::max_value()); + assert_eq!(b.index().unwrap(), ::max_value()); assert_eq!(b.meta, Meta::default()); } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 2ec3b84386..54da567737 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -78,13 +78,13 @@ impl ReplicateStage { let mut res = Ok(()); let last_entry_id = { let mut num_entries_to_write = entries.len(); - let current_leader = bank + let (current_leader, _) = bank .get_current_leader() .expect("Scheduled leader id should never be unknown while processing entries"); for (i, entry) in entries.iter().enumerate() { res = bank.process_entry(&entry); let my_id = keypair.pubkey(); - let scheduled_leader = bank + let (scheduled_leader, _) = bank .get_current_leader() .expect("Scheduled leader id should never be unknown while processing entries"); @@ -164,7 +164,7 @@ impl ReplicateStage { let mut entry_height_ = entry_height; let mut last_entry_id = None; loop { - let leader_id = bank + let (leader_id, _) = bank .get_current_leader() .expect("Scheduled leader id should never be unknown at this point"); diff --git a/src/tvu.rs b/src/tvu.rs index cc0c0c4799..d7f5c6115b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -279,7 +279,7 @@ pub mod tests { let mut alice_ref_balance = starting_balance; let mut msgs = Vec::new(); let mut cur_hash = Hash::default(); - let mut blob_id = 0; + let mut blob_idx = 0; let num_transfers = 10; let transfer_amount = 501; let bob_keypair = Keypair::new(); @@ -306,9 +306,9 @@ pub mod tests { let mut b = SharedBlob::default(); { let mut w = b.write().unwrap(); - w.set_index(blob_id).unwrap(); - blob_id += 1; - w.set_id(leader_id).unwrap(); + w.set_index(blob_idx).unwrap(); + blob_idx += 1; + w.set_id(&leader_id).unwrap(); let serialized_entry = serialize(&entry).unwrap(); diff --git a/src/vote_stage.rs b/src/vote_stage.rs index a51ad0e416..3b73b3211d 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -52,12 +52,9 @@ pub fn create_new_signed_vote_blob( } fn get_leader_tpu(bank: &Bank, cluster_info: &Arc>) -> Result { - let leader_id = { - if let Some(leader_id) = bank.get_current_leader() { - leader_id - } else { - return Err(Error::VoteError(VoteError::NoLeader)); - } + let leader_id = match bank.get_current_leader() { + Some((leader_id, _)) => leader_id, + None => return Err(Error::VoteError(VoteError::NoLeader)), }; let rcluster_info = cluster_info.read().unwrap(); diff --git a/src/window.rs b/src/window.rs index 9a9c994aa8..c81f82daf9 100644 --- a/src/window.rs +++ b/src/window.rs @@ -1,6 +1,6 @@ //! The `window` module defines data structure for storing the tail of the ledger. //! -use cluster_info::{ClusterInfo, NodeInfo}; +use cluster_info::ClusterInfo; use counter::Counter; use entry::Entry; #[cfg(feature = "erasure")] @@ -9,7 +9,6 @@ use leader_scheduler::LeaderScheduler; use ledger::reconstruct_entries_from_blobs; use log::Level; use packet::SharedBlob; -use result::Result; use solana_sdk::pubkey::Pubkey; use std::cmp; use std::mem; @@ -27,7 +26,7 @@ pub struct WindowSlot { impl WindowSlot { fn blob_index(&self) -> Option { match self.data { - Some(ref blob) => blob.read().unwrap().get_index().ok(), + Some(ref blob) => blob.read().unwrap().index().ok(), None => None, } } @@ -154,7 +153,7 @@ impl WindowUtil for Window { ls_lock.max_height_for_leader(tick_height) { match ls_lock.get_scheduled_leader(next_leader_rotation_height) { - Some(leader_id) if leader_id == *id => is_next_leader = true, + Some((leader_id, _)) if leader_id == *id => is_next_leader = true, // In the case that we are not in the current scope of the leader schedule // window then either: // @@ -296,7 +295,7 @@ impl WindowUtil for Window { c_or_d: &str, ) -> bool { if let Some(old) = mem::replace(window_slot, Some(blob)) { - let is_dup = old.read().unwrap().get_index().unwrap() == pix; + let is_dup = old.read().unwrap().index().unwrap() == pix; trace!( "{}: occupied {} window slot {:}, is_dup: {}", id, @@ -341,7 +340,7 @@ impl WindowUtil for Window { let k_data_blob; let k_data_slot = &mut self[k].data; if let Some(blob) = k_data_slot { - if blob.read().unwrap().get_index().unwrap() < *consumed { + if blob.read().unwrap().index().unwrap() < *consumed { // window wrap-around, end of received break; } @@ -407,26 +406,6 @@ pub fn default_window() -> Window { (0..2048).map(|_| WindowSlot::default()).collect() } -pub fn index_blobs( - node_info: &NodeInfo, - blobs: &[SharedBlob], - receive_index: &mut u64, -) -> Result<()> { - // enumerate all the blobs, those are the indices - trace!("{}: INDEX_BLOBS {}", node_info.id, blobs.len()); - for (i, b) in blobs.iter().enumerate() { - // only leader should be broadcasting - let mut blob = b.write().unwrap(); - blob.set_id(node_info.id) - .expect("set_id in pub fn broadcast"); - blob.set_index(*receive_index + i as u64) - .expect("set_index in pub fn broadcast"); - blob.set_flags(0).unwrap(); - } - - Ok(()) -} - #[cfg(test)] mod test { use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; diff --git a/src/window_service.rs b/src/window_service.rs index 12d94ef2f7..7601cbf4cb 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -59,17 +59,14 @@ fn add_block_to_retransmit_queue( //we need to maintain a sequence window trace!( "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.get_index() + p.index() .expect("get_index in fn add_block_to_retransmit_queue"), - p.get_id() + p.id() .expect("get_id in trace! fn add_block_to_retransmit_queue"), p.meta.addr(), leader_id ); - if p.get_id() - .expect("get_id in fn add_block_to_retransmit_queue") - == leader_id - { + if p.id().expect("get_id in fn add_block_to_retransmit_queue") == leader_id { //TODO //need to copy the retransmitted blob //otherwise we get into races with which thread @@ -202,7 +199,7 @@ fn recv_window( for b in dq { let (pix, meta_size) = { let p = b.read().unwrap(); - (p.get_index()?, p.meta.size) + (p.index()?, p.meta.size) }; pixs.push(pix); @@ -495,8 +492,8 @@ mod test { { let mut w = b.write().unwrap(); w.set_index(i).unwrap(); - w.set_id(me_id).unwrap(); - assert_eq!(i, w.get_index().unwrap()); + w.set_id(&me_id).unwrap(); + assert_eq!(i, w.index().unwrap()); w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&tn.info.contact_info.ncp); } @@ -559,8 +556,8 @@ mod test { { let mut w = b.write().unwrap(); w.set_index(i).unwrap(); - w.set_id(me_id).unwrap(); - assert_eq!(i, w.get_index().unwrap()); + w.set_id(&me_id).unwrap(); + assert_eq!(i, w.index().unwrap()); w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&tn.info.contact_info.ncp); } @@ -579,8 +576,8 @@ mod test { { let mut w = b.write().unwrap(); w.set_index(i).unwrap(); - w.set_id(me_id).unwrap(); - assert_eq!(i, w.get_index().unwrap()); + w.set_id(&me_id).unwrap(); + assert_eq!(i, w.index().unwrap()); w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&tn.info.contact_info.ncp); }