Added check in broadcast stage to exit after transmitting last blob before leader rotation. Also added tests

This commit is contained in:
Carl
2018-09-12 13:59:19 -07:00
committed by Greg Fitzgerald
parent f28ba3937b
commit 98b47d2540
5 changed files with 50 additions and 45 deletions

View File

@ -124,5 +124,18 @@ fn main() -> () {
} }
} }
fullnode.join().expect("to never happen"); /*loop {
match fullnode.node_role {
NodeRole::Leader(leader_services) => {
// TODO: return an exit code that signals we should do a role switch
leader_services.join();
//fullnode.start_tvu();
},
NodeRole::Validator(validator_services) => {
validator_services.join();
}
}
}*/
let _ = fullnode.join();
} }

View File

@ -1,7 +1,7 @@
//! The `broadcast_stage` broadcasts data from a leader node to validators //! The `broadcast_stage` broadcasts data from a leader node to validators
//! //!
use counter::Counter; use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo}; use crdt::{Crdt, CrdtError, NodeInfo, LEADER_ROTATION_INTERVAL};
use entry::Entry; use entry::Entry;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
@ -162,6 +162,19 @@ impl BroadcastStage {
let mut receive_index = entry_height; let mut receive_index = entry_height;
let me = crdt.read().unwrap().my_data().clone(); let me = crdt.read().unwrap().my_data().clone();
loop { loop {
if transmit_index.data % (LEADER_ROTATION_INTERVAL as u64) == 0 {
let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id;
match rcrdt.get_scheduled_leader(transmit_index.data) {
Some(id) if id == my_id => (),
// If the leader stays in power for the next
// round as well, then we don't exit. Otherwise, exit.
_ => {
return;
}
}
}
let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast( if let Err(e) = broadcast(
&me, &me,
@ -207,7 +220,8 @@ impl BroadcastStage {
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver); Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver);
}).unwrap(); })
.unwrap();
BroadcastStage { thread_hdl } BroadcastStage { thread_hdl }
} }

View File

@ -39,7 +39,7 @@ impl LeaderServices {
} }
} }
fn join(self) -> Result<()> { pub fn join(self) -> Result<()> {
self.tpu.join()?; self.tpu.join()?;
self.broadcast_stage.join() self.broadcast_stage.join()
} }
@ -54,7 +54,7 @@ impl ValidatorServices {
ValidatorServices { tvu } ValidatorServices { tvu }
} }
fn join(self) -> Result<()> { pub fn join(self) -> Result<()> {
self.tvu.join() self.tvu.join()
} }
} }
@ -68,7 +68,7 @@ pub struct Fullnode {
rpu: Rpu, rpu: Rpu,
rpc_service: JsonRpcService, rpc_service: JsonRpcService,
ncp: Ncp, ncp: Ncp,
pub node_role: NodeRole, pub node_role: Option<NodeRole>,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -140,9 +140,9 @@ impl Fullnode {
match leader_addr { match leader_addr {
Some(leader_addr) => { Some(leader_addr) => {
info!( info!(
"validator ready... local request address: {} (advertising {}) connected to: {}", "validator ready... local request address: {} (advertising {}) connected to: {}",
local_requests_addr, requests_addr, leader_addr local_requests_addr, requests_addr, leader_addr
); );
} }
None => { None => {
info!( info!(
@ -278,7 +278,7 @@ impl Fullnode {
exit.clone(), exit.clone(),
); );
let validator_state = ValidatorServices::new(tvu); let validator_state = ValidatorServices::new(tvu);
node_role = NodeRole::Validator(validator_state); node_role = Some(NodeRole::Validator(validator_state));
} }
None => { None => {
// Start in leader mode. // Start in leader mode.
@ -309,7 +309,7 @@ impl Fullnode {
entry_receiver, entry_receiver,
); );
let leader_state = LeaderServices::new(tpu, broadcast_stage); let leader_state = LeaderServices::new(tpu, broadcast_stage);
node_role = NodeRole::Leader(leader_state); node_role = Some(NodeRole::Leader(leader_state));
} }
} }
@ -340,13 +340,15 @@ impl Service for Fullnode {
self.rpu.join()?; self.rpu.join()?;
self.ncp.join()?; self.ncp.join()?;
self.rpc_service.join()?; self.rpc_service.join()?;
match self.node_role { match self.node_role {
NodeRole::Validator(validator_service) => { Some(NodeRole::Validator(validator_service)) => {
validator_service.join()?; validator_service.join()?;
} },
NodeRole::Leader(leader_service) => { Some(NodeRole::Leader(leader_service)) => {
leader_service.join()?; leader_service.join()?;
} },
_ => (),
} }
// TODO: Case on join values above to determine if // TODO: Case on join values above to determine if

View File

@ -12,9 +12,9 @@ use mint::Mint;
use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*; use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
use signature::Pubkey;
#[cfg(test)] #[cfg(test)]
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
use signature::Pubkey;
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*; use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
@ -449,7 +449,7 @@ impl Block for [Entry] {
.flat_map(|entry| entry.transactions.iter().filter_map(Transaction::vote)) .flat_map(|entry| entry.transactions.iter().filter_map(Transaction::vote))
.collect() .collect()
} }
} }
pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entry>> { pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entry>> {
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len()); let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
@ -549,14 +549,17 @@ pub fn next_entries(
#[cfg(test)] #[cfg(test)]
pub fn tmp_ledger_path(name: &str) -> String { pub fn tmp_ledger_path(name: &str) -> String {
let keypair = Keypair::new(); let keypair = Keypair::new();
format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey()) format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
} }
#[cfg(test)] #[cfg(test)]
pub fn genesis(name: &str, num: i64) -> (Mint, String) { pub fn genesis(name: &str, num: i64) -> (Mint, String) {
let mint = Mint::new(num); let mint = Mint::new(num);
let path = tmp_ledger_path(name); let path = tmp_ledger_path(name);
let mut writer = LedgerWriter::open(&path, true).unwrap(); let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap(); writer.write_entries(mint.create_entries()).unwrap();
(mint, path) (mint, path)
} }
@ -574,14 +577,6 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use transaction::Transaction; use transaction::Transaction;
fn tmp_ledger_path(name: &str) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();
format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey())
}
#[test] #[test]
fn test_verify_slice() { fn test_verify_slice() {
use logger; use logger;

View File

@ -290,11 +290,8 @@ mod tests {
use service::Service; use service::Service;
use signature::{Keypair, KeypairUtil, Pubkey}; use signature::{Keypair, KeypairUtil, Pubkey};
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use write_stage::{WriteStage, WriteStageReturnType}; use write_stage::{WriteStage, WriteStageReturnType};
fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) { fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) {
@ -310,7 +307,6 @@ mod tests {
fn setup_dummy_write_stage() -> ( fn setup_dummy_write_stage() -> (
Pubkey, Pubkey,
WriteStage, WriteStage,
Arc<AtomicBool>,
Sender<Vec<Entry>>, Sender<Vec<Entry>>,
Receiver<Vec<Entry>>, Receiver<Vec<Entry>>,
Arc<RwLock<Crdt>>, Arc<RwLock<Crdt>>,
@ -347,11 +343,9 @@ mod tests {
entry_height, entry_height,
); );
let exit_sender = Arc::new(AtomicBool::new(false));
( (
id, id,
write_stage, write_stage,
exit_sender,
entry_sender, entry_sender,
write_stage_entry_receiver, write_stage_entry_receiver,
crdt, crdt,
@ -366,7 +360,6 @@ mod tests {
let ( let (
id, id,
write_stage, write_stage,
exit_sender,
entry_sender, entry_sender,
_write_stage_entry_receiver, _write_stage_entry_receiver,
crdt, crdt,
@ -392,16 +385,6 @@ mod tests {
entry_sender.send(new_entry).unwrap(); entry_sender.send(new_entry).unwrap();
} }
// Wait until at least LEADER_ROTATION_INTERVAL have been written to the ledger
loop {
sleep(Duration::from_secs(1));
let (current_entry_height, _) = process_ledger(&leader_ledger_path, &bank);
if current_entry_height == LEADER_ROTATION_INTERVAL {
break;
}
}
// Set the scheduled next leader in the crdt to some other node // Set the scheduled next leader in the crdt to some other node
let leader2_keypair = Keypair::new(); let leader2_keypair = Keypair::new();
let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey()); let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey());
@ -415,14 +398,12 @@ mod tests {
// Input another LEADER_ROTATION_INTERVAL dummy entries one at a time, // Input another LEADER_ROTATION_INTERVAL dummy entries one at a time,
// which will take us past the point of the leader rotation. // which will take us past the point of the leader rotation.
// The write_stage will see that it's no longer the leader after // The write_stage will see that it's no longer the leader after
// checking the crdt, and exit // checking the schedule, and exit
for _ in 0..LEADER_ROTATION_INTERVAL { for _ in 0..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]); let new_entry = recorder.record(vec![]);
entry_sender.send(new_entry).unwrap(); entry_sender.send(new_entry).unwrap();
} }
// Make sure the threads closed cleanly
exit_sender.store(true, Ordering::Relaxed);
assert_eq!( assert_eq!(
write_stage.join().unwrap(), write_stage.join().unwrap(),
WriteStageReturnType::LeaderRotation WriteStageReturnType::LeaderRotation