diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index f366260943..f5ccdee9d9 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -167,7 +167,7 @@ impl BroadcastStage { let my_id = rcrdt.my_data().id; match rcrdt.get_scheduled_leader(transmit_index.data) { Some(id) if id == my_id => (), - // If the leader stays in power for the next + // If the leader stays in power for the next // round as well, then we don't exit. Otherwise, exit. _ => { return; @@ -219,7 +219,15 @@ impl BroadcastStage { let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { - Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver); + Self::run( + &sock, + &crdt, + &window, + entry_height, + &recycler, + &receiver, + exit_sender, + ); }) .unwrap(); @@ -238,32 +246,40 @@ impl Service for BroadcastStage { #[cfg(test)] mod tests { - use crdt::{Crdt, LEADER_ROTATION_INTERVAL, Node}; + use broadcast_stage::BroadcastStage; + use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL}; use entry::Entry; - use ledger::Block; use hash::Hash; + use ledger::Block; + use mint::Mint; use packet::BlobRecycler; use recorder::Recorder; use service::Service; use signature::{Keypair, KeypairUtil, Pubkey}; - use std::sync::{Arc, RwLock}; - use std::sync::mpsc::{channel, Receiver}; - use broadcast_stage::BroadcastStage; - use mint::Mint; - use streamer::BlobSender; use std::cmp; - use window::{new_window_from_entries, SharedWindow}; + use std::sync::mpsc::{channel, Receiver}; + use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; + use streamer::BlobSender; + use window::{new_window_from_entries, SharedWindow}; - fn setup_dummy_broadcast_stage() -> - (Pubkey, Pubkey, BroadcastStage, SharedWindow, BlobSender, BlobRecycler, Arc>, Vec, Receiver) - { + fn setup_dummy_broadcast_stage() -> ( + Pubkey, + Pubkey, + BroadcastStage, + SharedWindow, + BlobSender, + BlobRecycler, + Arc>, + Vec, + Receiver, + ) { // Setup dummy leader info let leader_keypair = Keypair::new(); let id = leader_keypair.pubkey(); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - + // Give the leader somebody to broadcast to so he isn't lonely let buddy_keypair = Keypair::new(); let buddy_id = buddy_keypair.pubkey(); @@ -346,8 +362,7 @@ mod tests { } let genesis_len = entries.len() as u64; - let last_entry_hash = - entries.last().expect("Ledger should not be empty").id; + let last_entry_hash = entries.last().expect("Ledger should not be empty").id; // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will // trigger a check for leader rotation. Because the next scheduled leader @@ -366,7 +381,7 @@ mod tests { .set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, buddy_id); // Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us - // past the point of the leader rotation. The write_stage will see that + // past the point of the leader rotation. The write_stage will see that // it's no longer the leader after checking the crdt, and exit for _ in 0..LEADER_ROTATION_INTERVAL { let new_entry = recorder.record(vec![]); @@ -379,8 +394,7 @@ mod tests { } match exit_receiver.recv() { - Ok(x) if x == false => - panic!("Unexpected value on exit channel for Broadcast stage"), + Ok(x) if x == false => panic!("Unexpected value on exit channel for Broadcast stage"), _ => (), } diff --git a/src/fullnode.rs b/src/fullnode.rs index e8aa6b1434..38d6e4f21d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -344,10 +344,10 @@ impl Service for Fullnode { match self.node_role { Some(NodeRole::Validator(validator_service)) => { validator_service.join()?; - }, + } Some(NodeRole::Leader(leader_service)) => { leader_service.join()?; - }, + } _ => (), } diff --git a/src/ledger.rs b/src/ledger.rs index 321c961ff9..58469cbabf 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -12,9 +12,9 @@ use mint::Mint; use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; +use signature::Pubkey; #[cfg(test)] use signature::{Keypair, KeypairUtil}; -use signature::Pubkey; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; @@ -449,7 +449,7 @@ impl Block for [Entry] { .flat_map(|entry| entry.transactions.iter().filter_map(Transaction::vote)) .collect() } -} +} pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result> { let mut entries: Vec = Vec::with_capacity(blobs.len());