From f28ba3937b82e52a42262859ad8937920d24db90 Mon Sep 17 00:00:00 2001
From: Carl
Date: Tue, 11 Sep 2018 18:40:38 -0700
Subject: [PATCH] Added check in write stage to exit when scheduled
 entry_height for leader rotation is detected

---
 src/crdt.rs        |  22 ++++
 src/fullnode.rs    |   3 +-
 src/ledger.rs      |  18 +++
 src/tpu.rs         |   2 +
 src/write_stage.rs | 291 ++++++++++++++++++++++++++++++++++++++++++---
 5 files changed, 320 insertions(+), 16 deletions(-)

diff --git a/src/crdt.rs b/src/crdt.rs
index 42177d79a8..53b73b3de0 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -37,6 +37,11 @@ use timing::{duration_as_ms, timestamp};
 use window::{SharedWindow, WindowIndex};
 
 pub const FULLNODE_PORT_RANGE: (u16, u16) = (8000, 10_000);
+#[cfg(test)]
+pub const LEADER_ROTATION_INTERVAL: u64 = 10;
+#[cfg(not(test))]
+pub const LEADER_ROTATION_INTERVAL: u64 = 100;
+
 /// milliseconds we sleep for between gossip requests
 const GOSSIP_SLEEP_MILLIS: u64 = 100;
 const GOSSIP_PURGE_MILLIS: u64 = 15000;
@@ -205,6 +210,9 @@ pub struct Crdt {
     /// last time we heard from anyone getting a message from this public key
     /// these are rumors and shouldn't be trusted directly
     external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>>,
+    /// TODO: Clearly not the correct implementation of this, but a temporary abstraction
+    /// for testing
+    pub scheduled_leaders: HashMap<u64, Pubkey>,
 }
 
 // TODO These messages should be signed, and go through the gpu pipeline for spam filtering
 #[derive(Serialize, Deserialize, Debug)]
@@ -235,6 +243,7 @@ impl Crdt {
             external_liveness: HashMap::new(),
             id: node_info.id,
             update_index: 1,
+            scheduled_leaders: HashMap::new(),
         };
         me.local.insert(node_info.id, me.update_index);
         me.table.insert(node_info.id, node_info);
@@ -297,6 +306,19 @@
         self.insert(&me);
     }
 
+    // TODO: Dummy leader scheduler, need to implement actual leader scheduling.
+    pub fn get_scheduled_leader(&self, entry_height: u64) -> Option<Pubkey> {
+        match self.scheduled_leaders.get(&entry_height) {
+            Some(x) => Some(x.clone()),
+            None => Some(self.my_data().leader_id),
+        }
+    }
+
+    // TODO: Dummy leader schedule setter, need to implement actual leader scheduling.
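+    // Note that the getter above falls back to the leader currently recorded in
+    // this node's own table (`my_data().leader_id`) when no entry exists for a
+    // height, so callers only need to insert the heights at which leadership
+    // actually changes hands.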
+    pub fn set_scheduled_leader(&mut self, entry_height: u64, new_leader_id: Pubkey) {
+        self.scheduled_leaders.insert(entry_height, new_leader_id);
+    }
+
     pub fn get_external_liveness_entry(&self, key: &Pubkey) -> Option<&HashMap<Pubkey, u64>> {
         self.external_liveness.get(key)
     }
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 9d9ee31389..5d7d0d7828 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -205,7 +205,7 @@ impl Fullnode {
     /// `--------` | |    `------------`
     ///            `-------------------------------`
     /// ```
-pub fn new_with_bank(
+    pub fn new_with_bank(
         keypair: Keypair,
         bank: Bank,
         entry_height: u64,
@@ -297,6 +297,7 @@ pub fn new_with_bank(
             exit.clone(),
             ledger_path,
             sigverify_disabled,
+            entry_height,
         );
 
         let broadcast_stage = BroadcastStage::new(
diff --git a/src/ledger.rs b/src/ledger.rs
index 77283817f1..ef308c0f0e 100644
--- a/src/ledger.rs
+++ b/src/ledger.rs
@@ -7,10 +7,14 @@ use entry::Entry;
 use hash::Hash;
 use instruction::Vote;
 use log::Level::Trace;
+#[cfg(test)]
+use mint::Mint;
 use packet::{self, SharedBlob, BLOB_DATA_SIZE};
 use rayon::prelude::*;
 use result::{Error, Result};
 use signature::Pubkey;
+#[cfg(test)]
+use signature::{Keypair, KeypairUtil};
 use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
 use std::io::prelude::*;
 use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
@@ -542,6 +546,20 @@ pub fn next_entries(
     next_entries_mut(&mut id, &mut num_hashes, transactions)
 }
 
+#[cfg(test)]
+pub fn tmp_ledger_path(name: &str) -> String {
+    let keypair = Keypair::new();
+    format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
+}
+
+#[cfg(test)]
+pub fn genesis(name: &str, num: i64) -> (Mint, String) {
+    let mint = Mint::new(num);
+    let path = tmp_ledger_path(name);
+    let mut writer = LedgerWriter::open(&path, true).unwrap();
+    writer.write_entries(mint.create_entries()).unwrap();
+    (mint, path)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/tpu.rs b/src/tpu.rs
index f3f4b0e052..122b56a7aa 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -62,6 +62,7 @@ impl Tpu {
         exit: Arc<AtomicBool>,
         ledger_path: &str,
         sigverify_disabled: bool,
+        entry_height: u64,
     ) -> (Self, Receiver<Vec<Entry>>) {
         let mut packet_recycler = PacketRecycler::default();
         packet_recycler.set_name("tpu::Packet");
@@ -89,6 +90,7 @@
             blob_recycler.clone(),
             ledger_path,
             entry_receiver,
+            entry_height,
         );
 
         let tpu = Tpu {
diff --git a/src/write_stage.rs b/src/write_stage.rs
index da36210beb..8fa7dc93d0 100644
--- a/src/write_stage.rs
+++ b/src/write_stage.rs
@@ -4,7 +4,7 @@
 
 use bank::Bank;
 use counter::Counter;
-use crdt::Crdt;
+use crdt::{Crdt, LEADER_ROTATION_INTERVAL};
 use entry::Entry;
 use ledger::{Block, LedgerWriter};
 use log::Level;
@@ -12,6 +12,7 @@ use packet::BlobRecycler;
 use result::{Error, Result};
 use service::Service;
 use signature::Keypair;
+use std::cmp;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicUsize;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
@@ -22,11 +23,60 @@
 use streamer::responder;
 use timing::{duration_as_ms, duration_as_s};
 use vote_stage::send_leader_vote;
 
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum WriteStageReturnType {
+    LeaderRotation,
+    ChannelDisconnected,
+}
+
 pub struct WriteStage {
     thread_hdls: Vec<JoinHandle<()>>,
+    write_thread: JoinHandle<WriteStageReturnType>,
 }
 
 impl WriteStage {
+    // Given a vector of potential new entries to write, return as many as we can
+    // fit before we hit the entry height for leader rotation. Also return a boolean
+    // reflecting whether we actually hit an entry height for leader rotation.
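+    //
+    // For example, with LEADER_ROTATION_INTERVAL = 10, an entry_height of 97 and
+    // 8 new entries, only the first 3 entries fit before the boundary at height
+    // 100; if the leader scheduled for height 100 is some other node, the vector
+    // is truncated to those 3 entries and the boolean comes back true.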
+    fn find_leader_rotation_index(
+        crdt: &Arc<RwLock<Crdt>>,
+        entry_height: u64,
+        mut new_entries: Vec<Entry>,
+    ) -> (Vec<Entry>, bool) {
+        // Find out how many more entries we can squeeze in until the next leader
+        // rotation
+        let entries_until_leader_rotation =
+            LEADER_ROTATION_INTERVAL - (entry_height % LEADER_ROTATION_INTERVAL);
+
+        let new_entries_length = new_entries.len();
+
+        let mut i = cmp::min(entries_until_leader_rotation as usize, new_entries_length);
+
+        let mut is_leader_rotation = false;
+
+        loop {
+            if (entry_height + i as u64) % LEADER_ROTATION_INTERVAL == 0 {
+                let rcrdt = crdt.read().unwrap();
+                let my_id = rcrdt.my_data().id;
+                let next_leader = rcrdt.get_scheduled_leader(entry_height + i as u64);
+                if next_leader != Some(my_id) {
+                    is_leader_rotation = true;
+                    break;
+                }
+            }
+
+            if i == new_entries_length {
+                break;
+            }
+
+            // Step forward to the next rotation boundary, or to the end of the
+            // batch if it falls short of that boundary
+            i += cmp::min(LEADER_ROTATION_INTERVAL as usize, new_entries_length - i);
+        }
+
+        new_entries.truncate(i as usize);
+
+        (new_entries, is_leader_rotation)
+    }
+
     /// Process any Entry items that have been published by the RecordStage.
     /// continuously send entries out
@@ -34,19 +84,37 @@ pub fn write_and_send_entries(
         crdt: &Arc<RwLock<Crdt>>,
         ledger_writer: &mut LedgerWriter,
         entry_sender: &Sender<Vec<Entry>>,
         entry_receiver: &Receiver<Vec<Entry>>,
+        entry_height: &mut u64,
     ) -> Result<()> {
         let mut ventries = Vec::new();
-        let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
-        let mut num_entries = entries.len();
+        let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
+        let mut num_new_entries = 0;
         let mut num_txs = 0;
 
-        ventries.push(entries);
-        while let Ok(more) = entry_receiver.try_recv() {
-            num_entries += more.len();
-            ventries.push(more);
+        loop {
+            // Find out how many more entries we can squeeze in until the next leader
+            // rotation
+            let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index(
+                crdt,
+                *entry_height + num_new_entries as u64,
+                received_entries,
+            );
+
+            num_new_entries += new_entries.len();
+            ventries.push(new_entries);
+
+            if is_leader_rotation {
+                break;
+            }
+
+            if let Ok(n) = entry_receiver.try_recv() {
+                received_entries = n;
+            } else {
+                break;
+            }
         }
-        info!("write_stage entries: {}", num_entries);
+        info!("write_stage entries: {}", num_new_entries);
 
         let to_blobs_total = 0;
         let mut blob_send_total = 0;
@@ -65,6 +133,9 @@
         crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed());
 
         ledger_writer.write_entries(entries.clone())?;
+        // Once the entries have been written to the ledger, then we can
+        // safely increment entry height
+        *entry_height += entries.len() as u64;
 
         let register_entry_start = Instant::now();
         register_entry_total += duration_as_ms(&register_entry_start.elapsed());
@@ -105,6 +176,7 @@
         blob_recycler: BlobRecycler,
         ledger_path: &str,
         entry_receiver: Receiver<Vec<Entry>>,
+        entry_height: u64,
     ) -> (Self, Receiver<Vec<Entry>>) {
         let (vote_blob_sender, vote_blob_receiver) = channel();
         let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
@@ -117,21 +189,47 @@
         let (entry_sender, entry_receiver_forward) = channel();
         let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap();
 
-        let thread_hdl = Builder::new()
+        let write_thread = Builder::new()
             .name("solana-writer".to_string())
             .spawn(move || {
                 let mut last_vote = 0;
                 let mut last_valid_validator_timestamp = 0;
                 let id = crdt.read().unwrap().id;
+                let mut entry_height = entry_height;
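+                // Main write loop: each iteration first checks whether this node
+                // is still the scheduled leader at the current entry height, and
+                // only then drains the entry channel and writes to the ledger.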
                 loop {
+                    // Note that entry height is not zero-indexed; it starts at 1, so the
+                    // old leader is in power up to and including entry height
+                    // n * LEADER_ROTATION_INTERVAL. Once we've forwarded that last block,
+                    // check for the next leader.
+                    if entry_height % (LEADER_ROTATION_INTERVAL as u64) == 0 {
+                        let rcrdt = crdt.read().unwrap();
+                        let my_id = rcrdt.my_data().id;
+                        let scheduled_leader = rcrdt.get_scheduled_leader(entry_height);
+                        drop(rcrdt);
+                        match scheduled_leader {
+                            Some(id) if id == my_id => (),
+                            // If the leader stays in power for the next
+                            // round as well, then we don't exit. Otherwise, exit.
+                            _ => {
+                                // When the broadcast stage has received the last blob, it
+                                // will signal to close the fetch stage, which will in turn
+                                // close down this write stage
+                                return WriteStageReturnType::LeaderRotation;
+                            }
+                        }
+                    }
+
                     if let Err(e) = Self::write_and_send_entries(
                         &crdt,
                         &mut ledger_writer,
                         &entry_sender,
                         &entry_receiver,
+                        &mut entry_height,
                     ) {
                         match e {
-                            Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                            Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
+                                return WriteStageReturnType::ChannelDisconnected
+                            }
                             Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                             _ => {
                                 inc_new_counter_info!(
@@ -158,18 +256,181 @@
                 }
             }).unwrap();
 
-        let thread_hdls = vec![t_responder, thread_hdl];
-        (WriteStage { thread_hdls }, entry_receiver_forward)
+        let thread_hdls = vec![t_responder];
+        (
+            WriteStage {
+                write_thread,
+                thread_hdls,
+            },
+            entry_receiver_forward,
+        )
     }
 }
 
 impl Service for WriteStage {
-    type JoinReturnType = ();
+    type JoinReturnType = WriteStageReturnType;
 
-    fn join(self) -> thread::Result<()> {
+    fn join(self) -> thread::Result<WriteStageReturnType> {
         for thread_hdl in self.thread_hdls {
             thread_hdl.join()?;
         }
-        Ok(())
+
+        self.write_thread.join()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use bank::Bank;
+    use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL};
+    use entry::Entry;
+    use ledger::{genesis, read_ledger};
+    use packet::BlobRecycler;
+    use recorder::Recorder;
+    use service::Service;
+    use signature::{Keypair, KeypairUtil, Pubkey};
+    use std::fs::remove_dir_all;
+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::sync::mpsc::{channel, Receiver, Sender};
+    use std::sync::{Arc, RwLock};
+    use std::thread::sleep;
+    use std::time::Duration;
+    use write_stage::{WriteStage, WriteStageReturnType};
+
+    fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) {
+        let entries = read_ledger(ledger_path, true).expect("opening ledger");
+
+        let entries = entries
+            .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
+
+        info!("processing ledger...");
+        bank.process_ledger(entries).expect("process_ledger")
+    }
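+
+    // Builds a WriteStage over a fresh ledger seeded by `genesis`, then hands the
+    // test everything it needs: the leader id, the stage itself, the entry channel
+    // feeding it, the crdt handle for rescheduling leaders, and the ledger
+    // path/tail for replay and cleanup.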
error: {}", err))); + + info!("processing ledger..."); + bank.process_ledger(entries).expect("process_ledger") + } + + fn setup_dummy_write_stage() -> ( + Pubkey, + WriteStage, + Arc, + Sender>, + Receiver>, + Arc>, + Arc, + String, + Vec, + ) { + // Setup leader info + let leader_keypair = Keypair::new(); + let id = leader_keypair.pubkey(); + let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + + let crdt = Arc::new(RwLock::new(Crdt::new(leader_info.info).expect("Crdt::new"))); + let bank = Bank::new_default(true); + let bank = Arc::new(bank); + let blob_recycler = BlobRecycler::default(); + + // Make a ledger + let (_, leader_ledger_path) = genesis("test_leader_rotation_exit", 10_000); + + let (entry_height, ledger_tail) = process_ledger(&leader_ledger_path, &bank); + + // Make a dummy pipe + let (entry_sender, entry_receiver) = channel(); + + // Start up the write stage + let (write_stage, write_stage_entry_receiver) = WriteStage::new( + leader_keypair, + bank.clone(), + crdt.clone(), + blob_recycler, + &leader_ledger_path, + entry_receiver, + entry_height, + ); + + let exit_sender = Arc::new(AtomicBool::new(false)); + ( + id, + write_stage, + exit_sender, + entry_sender, + write_stage_entry_receiver, + crdt, + bank, + leader_ledger_path, + ledger_tail, + ) + } + + #[test] + fn test_write_stage_leader_rotation_exit() { + let ( + id, + write_stage, + exit_sender, + entry_sender, + _write_stage_entry_receiver, + crdt, + bank, + leader_ledger_path, + ledger_tail, + ) = setup_dummy_write_stage(); + + crdt.write() + .unwrap() + .set_scheduled_leader(LEADER_ROTATION_INTERVAL, id); + + let last_entry_hash = ledger_tail.last().expect("Ledger should not be empty").id; + + let genesis_entry_height = ledger_tail.len() as u64; + + // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will + // trigger a check for leader rotation. Because the next scheduled leader + // is ourselves, we won't exit + let mut recorder = Recorder::new(last_entry_hash); + for _ in genesis_entry_height..LEADER_ROTATION_INTERVAL { + let new_entry = recorder.record(vec![]); + entry_sender.send(new_entry).unwrap(); + } + + // Wait until at least LEADER_ROTATION_INTERVAL have been written to the ledger + loop { + sleep(Duration::from_secs(1)); + let (current_entry_height, _) = process_ledger(&leader_ledger_path, &bank); + + if current_entry_height == LEADER_ROTATION_INTERVAL { + break; + } + } + + // Set the scheduled next leader in the crdt to some other node + let leader2_keypair = Keypair::new(); + let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey()); + + { + let mut wcrdt = crdt.write().unwrap(); + wcrdt.insert(&leader2_info.info); + wcrdt.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, leader2_keypair.pubkey()); + } + + // Input another LEADER_ROTATION_INTERVAL dummy entries one at a time, + // which will take us past the point of the leader rotation. 
+
+        // Input another LEADER_ROTATION_INTERVAL dummy entries one at a time,
+        // which will take us past the point of the leader rotation.
+        // The write_stage will see that it's no longer the leader after
+        // checking the crdt, and exit
+        for _ in 0..LEADER_ROTATION_INTERVAL {
+            let new_entry = recorder.record(vec![]);
+            entry_sender.send(new_entry).unwrap();
+        }
+
+        // Make sure the threads closed cleanly
+        exit_sender.store(true, Ordering::Relaxed);
+        assert_eq!(
+            write_stage.join().unwrap(),
+            WriteStageReturnType::LeaderRotation
+        );
+
+        // Make sure the ledger contains exactly 2 * LEADER_ROTATION_INTERVAL entries
+        let (entry_height, _) = process_ledger(&leader_ledger_path, &bank);
+        remove_dir_all(leader_ledger_path).unwrap();
+        assert_eq!(entry_height, 2 * LEADER_ROTATION_INTERVAL);
+    }
+}