formatted code
This commit is contained in:
@ -167,7 +167,7 @@ impl BroadcastStage {
|
|||||||
let my_id = rcrdt.my_data().id;
|
let my_id = rcrdt.my_data().id;
|
||||||
match rcrdt.get_scheduled_leader(transmit_index.data) {
|
match rcrdt.get_scheduled_leader(transmit_index.data) {
|
||||||
Some(id) if id == my_id => (),
|
Some(id) if id == my_id => (),
|
||||||
// If the leader stays in power for the next
|
// If the leader stays in power for the next
|
||||||
// round as well, then we don't exit. Otherwise, exit.
|
// round as well, then we don't exit. Otherwise, exit.
|
||||||
_ => {
|
_ => {
|
||||||
return;
|
return;
|
||||||
@ -219,7 +219,15 @@ impl BroadcastStage {
|
|||||||
let thread_hdl = Builder::new()
|
let thread_hdl = Builder::new()
|
||||||
.name("solana-broadcaster".to_string())
|
.name("solana-broadcaster".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver);
|
Self::run(
|
||||||
|
&sock,
|
||||||
|
&crdt,
|
||||||
|
&window,
|
||||||
|
entry_height,
|
||||||
|
&recycler,
|
||||||
|
&receiver,
|
||||||
|
exit_sender,
|
||||||
|
);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -238,32 +246,40 @@ impl Service for BroadcastStage {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crdt::{Crdt, LEADER_ROTATION_INTERVAL, Node};
|
use broadcast_stage::BroadcastStage;
|
||||||
|
use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL};
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
use ledger::Block;
|
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
|
use ledger::Block;
|
||||||
|
use mint::Mint;
|
||||||
use packet::BlobRecycler;
|
use packet::BlobRecycler;
|
||||||
use recorder::Recorder;
|
use recorder::Recorder;
|
||||||
use service::Service;
|
use service::Service;
|
||||||
use signature::{Keypair, KeypairUtil, Pubkey};
|
use signature::{Keypair, KeypairUtil, Pubkey};
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
use std::sync::mpsc::{channel, Receiver};
|
|
||||||
use broadcast_stage::BroadcastStage;
|
|
||||||
use mint::Mint;
|
|
||||||
use streamer::BlobSender;
|
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use window::{new_window_from_entries, SharedWindow};
|
use std::sync::mpsc::{channel, Receiver};
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use streamer::BlobSender;
|
||||||
|
use window::{new_window_from_entries, SharedWindow};
|
||||||
|
|
||||||
fn setup_dummy_broadcast_stage() ->
|
fn setup_dummy_broadcast_stage() -> (
|
||||||
(Pubkey, Pubkey, BroadcastStage, SharedWindow, BlobSender, BlobRecycler, Arc<RwLock<Crdt>>, Vec<Entry>, Receiver<bool>)
|
Pubkey,
|
||||||
{
|
Pubkey,
|
||||||
|
BroadcastStage,
|
||||||
|
SharedWindow,
|
||||||
|
BlobSender,
|
||||||
|
BlobRecycler,
|
||||||
|
Arc<RwLock<Crdt>>,
|
||||||
|
Vec<Entry>,
|
||||||
|
Receiver<bool>,
|
||||||
|
) {
|
||||||
// Setup dummy leader info
|
// Setup dummy leader info
|
||||||
let leader_keypair = Keypair::new();
|
let leader_keypair = Keypair::new();
|
||||||
let id = leader_keypair.pubkey();
|
let id = leader_keypair.pubkey();
|
||||||
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
|
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
|
||||||
|
|
||||||
// Give the leader somebody to broadcast to so he isn't lonely
|
// Give the leader somebody to broadcast to so he isn't lonely
|
||||||
let buddy_keypair = Keypair::new();
|
let buddy_keypair = Keypair::new();
|
||||||
let buddy_id = buddy_keypair.pubkey();
|
let buddy_id = buddy_keypair.pubkey();
|
||||||
@ -346,8 +362,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let genesis_len = entries.len() as u64;
|
let genesis_len = entries.len() as u64;
|
||||||
let last_entry_hash =
|
let last_entry_hash = entries.last().expect("Ledger should not be empty").id;
|
||||||
entries.last().expect("Ledger should not be empty").id;
|
|
||||||
|
|
||||||
// Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
|
// Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
|
||||||
// trigger a check for leader rotation. Because the next scheduled leader
|
// trigger a check for leader rotation. Because the next scheduled leader
|
||||||
@ -366,7 +381,7 @@ mod tests {
|
|||||||
.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, buddy_id);
|
.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, buddy_id);
|
||||||
|
|
||||||
// Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us
|
// Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us
|
||||||
// past the point of the leader rotation. The write_stage will see that
|
// past the point of the leader rotation. The write_stage will see that
|
||||||
// it's no longer the leader after checking the crdt, and exit
|
// it's no longer the leader after checking the crdt, and exit
|
||||||
for _ in 0..LEADER_ROTATION_INTERVAL {
|
for _ in 0..LEADER_ROTATION_INTERVAL {
|
||||||
let new_entry = recorder.record(vec![]);
|
let new_entry = recorder.record(vec![]);
|
||||||
@ -379,8 +394,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
match exit_receiver.recv() {
|
match exit_receiver.recv() {
|
||||||
Ok(x) if x == false =>
|
Ok(x) if x == false => panic!("Unexpected value on exit channel for Broadcast stage"),
|
||||||
panic!("Unexpected value on exit channel for Broadcast stage"),
|
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,10 +344,10 @@ impl Service for Fullnode {
|
|||||||
match self.node_role {
|
match self.node_role {
|
||||||
Some(NodeRole::Validator(validator_service)) => {
|
Some(NodeRole::Validator(validator_service)) => {
|
||||||
validator_service.join()?;
|
validator_service.join()?;
|
||||||
},
|
}
|
||||||
Some(NodeRole::Leader(leader_service)) => {
|
Some(NodeRole::Leader(leader_service)) => {
|
||||||
leader_service.join()?;
|
leader_service.join()?;
|
||||||
},
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,9 +12,9 @@ use mint::Mint;
|
|||||||
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
|
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use result::{Error, Result};
|
use result::{Error, Result};
|
||||||
|
use signature::Pubkey;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use signature::{Keypair, KeypairUtil};
|
use signature::{Keypair, KeypairUtil};
|
||||||
use signature::Pubkey;
|
|
||||||
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
|
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
|
||||||
use std::io::prelude::*;
|
use std::io::prelude::*;
|
||||||
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
|
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
|
||||||
@ -449,7 +449,7 @@ impl Block for [Entry] {
|
|||||||
.flat_map(|entry| entry.transactions.iter().filter_map(Transaction::vote))
|
.flat_map(|entry| entry.transactions.iter().filter_map(Transaction::vote))
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entry>> {
|
pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entry>> {
|
||||||
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
|
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
|
||||||
|
Reference in New Issue
Block a user