Move ledger write to its own stage (#1577)

* Move ledger write to its own stage

- Also, rename write_stage to leader_vote_stage, as write functionality
  is moved to a different stage

* Address review comments

* Fix leader rotation test failure

* address review comments
This commit is contained in:
Pankaj Garg
2018-10-23 14:42:48 -07:00
committed by GitHub
parent c77b1c9687
commit 8d9912b4e2
13 changed files with 181 additions and 113 deletions

View File

@ -54,7 +54,7 @@ fn main() -> Result<(), Box<error::Error>> {
let mint = Mint::new_with_pkcs8(tokens, pkcs8); let mint = Mint::new_with_pkcs8(tokens, pkcs8);
let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?; let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?;
ledger_writer.write_entries(mint.create_entries())?; ledger_writer.write_entries(&mint.create_entries())?;
Ok(()) Ok(())
} }

View File

@ -104,7 +104,7 @@ mod tests {
let ledger_path = get_tmp_ledger_path(ledger_dir); let ledger_path = get_tmp_ledger_path(ledger_dir);
{ {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap(); writer.write_entries(&entries).unwrap();
} }
let out_path = Path::new("test_chacha_encrypt_file_many_keys_output.txt.enc"); let out_path = Path::new("test_chacha_encrypt_file_many_keys_output.txt.enc");

View File

@ -1823,7 +1823,7 @@ mod tests {
let zero = Hash::default(); let zero = Hash::default();
let one = hash(&zero.as_ref()); let one = hash(&zero.as_ref());
writer writer
.write_entries(vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec()) .write_entries(&vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec())
.unwrap(); .unwrap();
path path
} }

View File

@ -817,7 +817,7 @@ mod tests {
.iter() .iter()
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64) .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64)
+ num_ending_ticks as u64; + num_ending_ticks as u64;
ledger_writer.write_entries(active_set_entries).unwrap(); ledger_writer.write_entries(&active_set_entries).unwrap();
// Create the common leader scheduling configuration // Create the common leader scheduling configuration
let num_slots_per_epoch = 3; let num_slots_per_epoch = 3;
@ -913,7 +913,7 @@ mod tests {
let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
let active_set_entries_len = active_set_entries.len() as u64; let active_set_entries_len = active_set_entries.len() as u64;
last_id = active_set_entries.last().unwrap().id; last_id = active_set_entries.last().unwrap().id;
ledger_writer.write_entries(active_set_entries).unwrap(); ledger_writer.write_entries(&active_set_entries).unwrap();
let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len; let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len;
// Set the leader scheduler for the validator // Set the leader scheduler for the validator

View File

@ -1,12 +1,12 @@
//! The `write_stage` module implements the TPU's write stage. It //! The `leader_vote_stage` module implements the TPU's vote stage. It
//! writes entries to the given writer, which is typically a file or //! computes and notes the votes for the entries, and then sends the
//! stdout, and then sends the Entry to its output channel. //! Entry to its output channel.
use bank::Bank; use bank::Bank;
use cluster_info::ClusterInfo; use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use entry::Entry; use entry::Entry;
use ledger::{Block, LedgerWriter}; use ledger::Block;
use log::Level; use log::Level;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
@ -18,20 +18,19 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use streamer::responder; use streamer::responder;
use timing::{duration_as_ms, duration_as_s}; use timing::duration_as_ms;
use vote_stage::send_leader_vote; use vote_stage::send_leader_vote;
pub struct WriteStage { pub struct LeaderVoteStage {
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
write_thread: JoinHandle<()>, vote_thread: JoinHandle<()>,
} }
impl WriteStage { impl LeaderVoteStage {
/// Process any Entry items that have been published by the RecordStage. /// Process any Entry items that have been published by the RecordStage.
/// continuosly send entries out /// continuosly send entries out
pub fn write_and_send_entries( pub fn compute_vote_and_send_entries(
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
ledger_writer: &mut LedgerWriter,
entry_sender: &Sender<Vec<Entry>>, entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>, entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> { ) -> Result<()> {
@ -39,7 +38,6 @@ impl WriteStage {
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let now = Instant::now(); let now = Instant::now();
let mut num_new_entries = 0; let mut num_new_entries = 0;
let mut num_txs = 0;
loop { loop {
num_new_entries += received_entries.len(); num_new_entries += received_entries.len();
@ -51,84 +49,60 @@ impl WriteStage {
break; break;
} }
} }
inc_new_counter_info!("write_stage-entries_received", num_new_entries); inc_new_counter_info!("leader_vote_stage-entries_received", num_new_entries);
debug!("leader_vote_stage entries: {}", num_new_entries);
debug!("write_stage entries: {}", num_new_entries);
let mut entries_send_total = 0;
let mut cluster_info_votes_total = 0;
let start = Instant::now();
for entries in ventries { for entries in ventries {
let cluster_info_votes_start = Instant::now();
let votes = &entries.votes(); let votes = &entries.votes();
cluster_info.write().unwrap().insert_votes(&votes); cluster_info.write().unwrap().insert_votes(&votes);
cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed());
for e in &entries { inc_new_counter_info!("leader_vote_stage-write_entries", entries.len());
num_txs += e.transactions.len();
ledger_writer.write_entry_noflush(&e)?;
}
inc_new_counter_info!("write_stage-write_entries", entries.len());
//TODO(anatoly): real stake based voting needs to change this //TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted //leader simply votes if the current set of validators have voted
//on a valid last id //on a valid last id
trace!("New entries? {}", entries.len()); trace!("New entries? {}", entries.len());
let entries_send_start = Instant::now();
if !entries.is_empty() { if !entries.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len()); inc_new_counter_info!("leader_vote_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-entries_sent", entries.len()); inc_new_counter_info!("leader_vote_stage-entries_sent", entries.len());
trace!("broadcasting {}", entries.len()); trace!("broadcasting {}", entries.len());
entry_sender.send(entries)?; entry_sender.send(entries)?;
} }
entries_send_total += duration_as_ms(&entries_send_start.elapsed());
} }
ledger_writer.flush()?;
inc_new_counter_info!( inc_new_counter_info!(
"write_stage-time_ms", "leader_vote_stage-time_ms",
duration_as_ms(&now.elapsed()) as usize duration_as_ms(&now.elapsed()) as usize
); );
debug!("done write_stage txs: {} time {} ms txs/s: {} entries_send_total: {} cluster_info_votes_total: {}",
num_txs, duration_as_ms(&start.elapsed()),
num_txs as f32 / duration_as_s(&start.elapsed()),
entries_send_total,
cluster_info_votes_total);
Ok(()) Ok(())
} }
/// Create a new WriteStage for writing and broadcasting entries. /// Create a new LeaderVoteStage for voting and broadcasting entries.
pub fn new( pub fn new(
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
bank: Arc<Bank>, bank: Arc<Bank>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>, entry_receiver: Receiver<Vec<Entry>>,
) -> (Self, Receiver<Vec<Entry>>) { ) -> (Self, Receiver<Vec<Entry>>) {
let (vote_blob_sender, vote_blob_receiver) = channel(); let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder( let t_responder = responder(
"write_stage_vote_sender", "leader_vote_stage_vote_sender",
Arc::new(send), Arc::new(send),
vote_blob_receiver, vote_blob_receiver,
); );
let (entry_sender, entry_receiver_forward) = channel(); let (entry_sender, entry_receiver_forward) = channel();
let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap();
let write_thread = Builder::new() let vote_thread = Builder::new()
.name("solana-writer".to_string()) .name("solana-writer".to_string())
.spawn(move || { .spawn(move || {
let mut last_vote = 0; let mut last_vote = 0;
let mut last_valid_validator_timestamp = 0; let mut last_valid_validator_timestamp = 0;
let id = cluster_info.read().unwrap().id; let id = cluster_info.read().unwrap().id;
loop { loop {
if let Err(e) = Self::write_and_send_entries( if let Err(e) = Self::compute_vote_and_send_entries(
&cluster_info, &cluster_info,
&mut ledger_writer,
&entry_sender, &entry_sender,
&entry_receiver, &entry_receiver,
) { ) {
@ -139,7 +113,7 @@ impl WriteStage {
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => { _ => {
inc_new_counter_info!( inc_new_counter_info!(
"write_stage-write_and_send_entries-error", "leader_vote_stage-compute_vote_and_send_entries-error",
1 1
); );
error!("{:?}", e); error!("{:?}", e);
@ -155,7 +129,7 @@ impl WriteStage {
&mut last_vote, &mut last_vote,
&mut last_valid_validator_timestamp, &mut last_valid_validator_timestamp,
) { ) {
inc_new_counter_info!("write_stage-leader_vote-error", 1); inc_new_counter_info!("leader_vote_stage-leader_vote-error", 1);
error!("{:?}", e); error!("{:?}", e);
} }
} }
@ -163,8 +137,8 @@ impl WriteStage {
let thread_hdls = vec![t_responder]; let thread_hdls = vec![t_responder];
( (
WriteStage { LeaderVoteStage {
write_thread, vote_thread,
thread_hdls, thread_hdls,
}, },
entry_receiver_forward, entry_receiver_forward,
@ -172,7 +146,7 @@ impl WriteStage {
} }
} }
impl Service for WriteStage { impl Service for LeaderVoteStage {
type JoinReturnType = (); type JoinReturnType = ();
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
@ -180,6 +154,6 @@ impl Service for WriteStage {
thread_hdl.join()?; thread_hdl.join()?;
} }
self.write_thread.join() self.vote_thread.join()
} }
} }

View File

@ -372,7 +372,7 @@ impl LedgerWriter {
Ok(LedgerWriter { index, data }) Ok(LedgerWriter { index, data })
} }
pub fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> { fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> {
let len = serialized_size(&entry).map_err(err_bincode_to_io)?; let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
@ -399,25 +399,23 @@ impl LedgerWriter {
Ok(()) Ok(())
} }
pub fn flush(&mut self) -> io::Result<()> { pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
self.write_entry_noflush(&entry)?;
self.index.flush()?; self.index.flush()?;
self.data.flush()?; self.data.flush()?;
Ok(()) Ok(())
} }
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { pub fn write_entries<'a, I>(&mut self, entries: I) -> io::Result<()>
self.write_entry_noflush(&entry)?;
self.flush()
}
pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
where where
I: IntoIterator<Item = Entry>, I: IntoIterator<Item = &'a Entry>,
{ {
for entry in entries { for entry in entries {
self.write_entry_noflush(&entry)?; self.write_entry_noflush(&entry)?;
} }
self.flush() self.index.flush()?;
self.data.flush()?;
Ok(())
} }
} }
@ -616,7 +614,7 @@ pub fn create_tmp_ledger_with_mint(name: &str, mint: &Mint) -> String {
let path = get_tmp_ledger_path(name); let path = get_tmp_ledger_path(name);
let mut writer = LedgerWriter::open(&path, true).unwrap(); let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap(); writer.write_entries(&mint.create_entries()).unwrap();
path path
} }
@ -653,7 +651,7 @@ pub fn create_tmp_sample_ledger(
genesis.extend(ticks); genesis.extend(ticks);
let mut writer = LedgerWriter::open(&path, true).unwrap(); let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(genesis.clone()).unwrap(); writer.write_entries(&genesis.clone()).unwrap();
(mint, path, genesis) (mint, path, genesis)
} }
@ -830,7 +828,7 @@ mod tests {
{ {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap(); writer.write_entries(&entries.clone()).unwrap();
// drops writer, flushes buffers // drops writer, flushes buffers
} }
verify_ledger(&ledger_path).unwrap(); verify_ledger(&ledger_path).unwrap();
@ -862,7 +860,7 @@ mod tests {
fn truncated_last_entry(ledger_path: &str, entries: Vec<Entry>) { fn truncated_last_entry(ledger_path: &str, entries: Vec<Entry>) {
let len = { let len = {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries).unwrap(); writer.write_entries(&entries).unwrap();
writer.data.seek(SeekFrom::Current(0)).unwrap() writer.data.seek(SeekFrom::Current(0)).unwrap()
}; };
verify_ledger(&ledger_path).unwrap(); verify_ledger(&ledger_path).unwrap();
@ -876,7 +874,7 @@ mod tests {
fn garbage_on_data(ledger_path: &str, entries: Vec<Entry>) { fn garbage_on_data(ledger_path: &str, entries: Vec<Entry>) {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries).unwrap(); writer.write_entries(&entries).unwrap();
writer.data.write_all(b"hi there!").unwrap(); writer.data.write_all(b"hi there!").unwrap();
} }
@ -959,7 +957,7 @@ mod tests {
let ledger_path = get_tmp_ledger_path("test_verify_ledger"); let ledger_path = get_tmp_ledger_path("test_verify_ledger");
{ {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap(); writer.write_entries(&entries).unwrap();
} }
// TODO more cases that make ledger_verify() fail // TODO more cases that make ledger_verify() fail
// assert!(verify_ledger(&ledger_path).is_err()); // assert!(verify_ledger(&ledger_path).is_err());
@ -976,7 +974,7 @@ mod tests {
let ledger_path = get_tmp_ledger_path("test_raw_entries"); let ledger_path = get_tmp_ledger_path("test_raw_entries");
{ {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap(); writer.write_entries(&entries).unwrap();
} }
let mut window = LedgerWindow::open(&ledger_path).unwrap(); let mut window = LedgerWindow::open(&ledger_path).unwrap();

87
src/ledger_write_stage.rs Normal file
View File

@ -0,0 +1,87 @@
//! The `ledger_write_stage` module implements the ledger write stage. It
//! writes entries to the given writer, which is typically a file
use counter::Counter;
use entry::{EntryReceiver, EntrySender};
use ledger::LedgerWriter;
use log::Level;
use result::{Error, Result};
use service::Service;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
pub struct LedgerWriteStage {
write_thread: JoinHandle<()>,
}
impl LedgerWriteStage {
pub fn write(
ledger_writer: Option<&mut LedgerWriter>,
entry_receiver: &EntryReceiver,
forwarder: &Option<EntrySender>,
) -> Result<()> {
let mut ventries = Vec::new();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
loop {
ventries.push(received_entries);
if let Ok(n) = entry_receiver.try_recv() {
received_entries = n;
} else {
break;
}
}
if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(ventries.iter().flatten())?;
}
if let Some(forwarder) = forwarder {
for entries in ventries {
forwarder.send(entries)?;
}
}
Ok(())
}
pub fn new(
ledger_path: Option<&str>,
entry_receiver: EntryReceiver,
forwarder: Option<EntrySender>,
) -> Self {
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap());
let write_thread = Builder::new()
.name("solana-ledger-writer".to_string())
.spawn(move || loop {
if let Err(e) = Self::write(ledger_writer.as_mut(), &entry_receiver, &forwarder) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
break;
}
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!(
"ledger-write_stage-write_and_send_entries-error",
1
);
error!("{:?}", e);
}
}
};
}).unwrap();
LedgerWriteStage { write_thread }
}
}
impl Service for LedgerWriteStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.write_thread.join()
}
}

View File

@ -34,7 +34,9 @@ pub mod fetch_stage;
pub mod fullnode; pub mod fullnode;
pub mod hash; pub mod hash;
pub mod leader_scheduler; pub mod leader_scheduler;
pub mod leader_vote_stage;
pub mod ledger; pub mod ledger;
pub mod ledger_write_stage;
pub mod loader_transaction; pub mod loader_transaction;
pub mod logger; pub mod logger;
pub mod metrics; pub mod metrics;
@ -80,7 +82,6 @@ pub mod vote_stage;
pub mod wallet; pub mod wallet;
pub mod window; pub mod window;
pub mod window_service; pub mod window_service;
pub mod write_stage;
extern crate bincode; extern crate bincode;
extern crate bs58; extern crate bs58;
extern crate byteorder; extern crate byteorder;

View File

@ -3,11 +3,11 @@
use bank::Bank; use bank::Bank;
use cluster_info::ClusterInfo; use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use entry::EntryReceiver; use entry::{EntryReceiver, EntrySender};
use hash::Hash; use hash::Hash;
use influx_db_client as influxdb; use influx_db_client as influxdb;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
use ledger::{Block, LedgerWriter}; use ledger::Block;
use log::Level; use log::Level;
use metrics; use metrics;
use result::{Error, Result}; use result::{Error, Result};
@ -58,9 +58,9 @@ impl ReplicateStage {
bank: &Arc<Bank>, bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
window_receiver: &EntryReceiver, window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
vote_blob_sender: Option<&BlobSender>, vote_blob_sender: Option<&BlobSender>,
ledger_entry_sender: &EntrySender,
tick_height: &mut u64, tick_height: &mut u64,
entry_height: &mut u64, entry_height: &mut u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@ -149,8 +149,8 @@ impl ReplicateStage {
// TODO: In line with previous behavior, this will write all the entries even if // TODO: In line with previous behavior, this will write all the entries even if
// an error occurred processing one of the entries (causing the rest of the entries to // an error occurred processing one of the entries (causing the rest of the entries to
// not be processed). // not be processed).
if let Some(ledger_writer) = ledger_writer { if entries_len != 0 {
ledger_writer.write_entries(entries)?; ledger_entry_sender.send(entries)?;
} }
*entry_height += entries_len; *entry_height += entries_len;
@ -163,17 +163,16 @@ impl ReplicateStage {
bank: Arc<Bank>, bank: Arc<Bank>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
window_receiver: EntryReceiver, window_receiver: EntryReceiver,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
tick_height: u64, tick_height: u64,
entry_height: u64, entry_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>, leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> Self { ) -> (Self, EntryReceiver) {
let (vote_blob_sender, vote_blob_receiver) = channel(); let (vote_blob_sender, vote_blob_receiver) = channel();
let (ledger_entry_sender, ledger_entry_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver); let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver);
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap());
let keypair = Arc::new(keypair); let keypair = Arc::new(keypair);
let t_replicate = Builder::new() let t_replicate = Builder::new()
@ -215,9 +214,9 @@ impl ReplicateStage {
&bank, &bank,
&cluster_info, &cluster_info,
&window_receiver, &window_receiver,
ledger_writer.as_mut(),
&keypair, &keypair,
vote_sender, vote_sender,
&ledger_entry_sender,
&mut tick_height_, &mut tick_height_,
&mut entry_height_, &mut entry_height_,
&leader_scheduler, &leader_scheduler,
@ -234,10 +233,13 @@ impl ReplicateStage {
None None
}).unwrap(); }).unwrap();
(
ReplicateStage { ReplicateStage {
t_responder, t_responder,
t_replicate, t_replicate,
} },
ledger_entry_receiver,
)
} }
} }
@ -301,7 +303,7 @@ mod test {
let active_set_entries_len = active_set_entries.len() as u64; let active_set_entries_len = active_set_entries.len() as u64;
let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len; let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len;
ledger_writer.write_entries(active_set_entries).unwrap(); ledger_writer.write_entries(&active_set_entries).unwrap();
// Set up the LeaderScheduler so that this this node becomes the leader at // Set up the LeaderScheduler so that this this node becomes the leader at
// bootstrap_height = num_bootstrap_slots * leader_rotation_interval // bootstrap_height = num_bootstrap_slots * leader_rotation_interval
@ -328,12 +330,11 @@ mod test {
// Set up the replicate stage // Set up the replicate stage
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let replicate_stage = ReplicateStage::new( let (replicate_stage, _ledger_writer_recv) = ReplicateStage::new(
Arc::new(my_keypair), Arc::new(my_keypair),
Arc::new(bank), Arc::new(bank),
Arc::new(RwLock::new(cluster_info_me)), Arc::new(RwLock::new(cluster_info_me)),
entry_receiver, entry_receiver,
Some(&my_ledger_path),
exit.clone(), exit.clone(),
initial_tick_height, initial_tick_height,
initial_entry_len, initial_entry_len,
@ -381,11 +382,6 @@ mod test {
.expect("RwLock for LeaderScheduler is still locked"); .expect("RwLock for LeaderScheduler is still locked");
leader_scheduler.reset(); leader_scheduler.reset();
let (_, tick_height, entry_height, _) =
Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler);
assert_eq!(tick_height, bootstrap_height);
assert_eq!(entry_height, expected_entry_height);
let _ignored = remove_dir_all(&my_ledger_path); let _ignored = remove_dir_all(&my_ledger_path);
} }
} }

View File

@ -33,7 +33,7 @@ impl StoreLedgerStage {
); );
if let Some(ledger_writer) = ledger_writer { if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(entries)?; ledger_writer.write_entries(&entries)?;
} }
Ok(()) Ok(())

View File

@ -31,15 +31,17 @@ use cluster_info::ClusterInfo;
use entry::Entry; use entry::Entry;
use fetch_stage::FetchStage; use fetch_stage::FetchStage;
use hash::Hash; use hash::Hash;
use leader_vote_stage::LeaderVoteStage;
use ledger_write_stage::LedgerWriteStage;
use service::Service; use service::Service;
use signature::Keypair; use signature::Keypair;
use sigverify_stage::SigVerifyStage; use sigverify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use write_stage::WriteStage;
pub enum TpuReturnType { pub enum TpuReturnType {
LeaderRotation, LeaderRotation,
@ -49,7 +51,8 @@ pub struct Tpu {
fetch_stage: FetchStage, fetch_stage: FetchStage,
sigverify_stage: SigVerifyStage, sigverify_stage: SigVerifyStage,
banking_stage: BankingStage, banking_stage: BankingStage,
write_stage: WriteStage, leader_vote_stage: LeaderVoteStage,
ledger_write_stage: LedgerWriteStage,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
} }
@ -83,19 +86,22 @@ impl Tpu {
max_tick_height, max_tick_height,
); );
let (write_stage, entry_forwarder) = WriteStage::new( let (leader_vote_stage, ledger_entry_receiver) =
keypair, LeaderVoteStage::new(keypair, bank.clone(), cluster_info.clone(), entry_receiver);
bank.clone(),
cluster_info.clone(), let (ledger_entry_sender, entry_forwarder) = channel();
ledger_path, let ledger_write_stage = LedgerWriteStage::new(
entry_receiver, Some(ledger_path),
ledger_entry_receiver,
Some(ledger_entry_sender),
); );
let tpu = Tpu { let tpu = Tpu {
fetch_stage, fetch_stage,
sigverify_stage, sigverify_stage,
banking_stage, banking_stage,
write_stage, leader_vote_stage,
ledger_write_stage,
exit: exit.clone(), exit: exit.clone(),
}; };
(tpu, entry_forwarder, exit) (tpu, entry_forwarder, exit)
@ -121,7 +127,8 @@ impl Service for Tpu {
fn join(self) -> thread::Result<(Option<TpuReturnType>)> { fn join(self) -> thread::Result<(Option<TpuReturnType>)> {
self.fetch_stage.join()?; self.fetch_stage.join()?;
self.sigverify_stage.join()?; self.sigverify_stage.join()?;
self.write_stage.join()?; self.leader_vote_stage.join()?;
self.ledger_write_stage.join()?;
match self.banking_stage.join()? { match self.banking_stage.join()? {
Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)), Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)),
_ => Ok(None), _ => Ok(None),

View File

@ -41,6 +41,7 @@ use blob_fetch_stage::BlobFetchStage;
use cluster_info::ClusterInfo; use cluster_info::ClusterInfo;
use hash::Hash; use hash::Hash;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
use ledger_write_stage::LedgerWriteStage;
use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
use retransmit_stage::RetransmitStage; use retransmit_stage::RetransmitStage;
use service::Service; use service::Service;
@ -60,6 +61,7 @@ pub struct Tvu {
replicate_stage: ReplicateStage, replicate_stage: ReplicateStage,
fetch_stage: BlobFetchStage, fetch_stage: BlobFetchStage,
retransmit_stage: RetransmitStage, retransmit_stage: RetransmitStage,
ledger_write_stage: LedgerWriteStage,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
} }
@ -111,22 +113,24 @@ impl Tvu {
leader_scheduler.clone(), leader_scheduler.clone(),
); );
let replicate_stage = ReplicateStage::new( let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new(
keypair, keypair,
bank.clone(), bank.clone(),
cluster_info, cluster_info,
blob_window_receiver, blob_window_receiver,
ledger_path,
exit.clone(), exit.clone(),
tick_height, tick_height,
entry_height, entry_height,
leader_scheduler, leader_scheduler,
); );
let ledger_write_stage = LedgerWriteStage::new(ledger_path, ledger_entry_receiver, None);
Tvu { Tvu {
replicate_stage, replicate_stage,
fetch_stage, fetch_stage,
retransmit_stage, retransmit_stage,
ledger_write_stage,
exit, exit,
} }
} }
@ -151,6 +155,7 @@ impl Service for Tvu {
fn join(self) -> thread::Result<Option<TvuReturnType>> { fn join(self) -> thread::Result<Option<TvuReturnType>> {
self.retransmit_stage.join()?; self.retransmit_stage.join()?;
self.fetch_stage.join()?; self.fetch_stage.join()?;
self.ledger_write_stage.join()?;
match self.replicate_stage.join()? { match self.replicate_stage.join()? {
Some(ReplicateStageReturnType::LeaderRotation( Some(ReplicateStageReturnType::LeaderRotation(
tick_height, tick_height,

View File

@ -129,7 +129,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize); let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize);
let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
writer.write_entries(entries).unwrap(); writer.write_entries(&entries).unwrap();
} }
let leader = Fullnode::new( let leader = Fullnode::new(
@ -802,7 +802,7 @@ fn test_leader_to_validator_transition() {
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
let bootstrap_entries = let bootstrap_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0);
ledger_writer.write_entries(bootstrap_entries).unwrap(); ledger_writer.write_entries(&bootstrap_entries).unwrap();
// Start the leader node // Start the leader node
let bootstrap_height = leader_rotation_interval; let bootstrap_height = leader_rotation_interval;
@ -932,7 +932,7 @@ fn test_leader_validator_basic() {
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
let active_set_entries = let active_set_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0);
ledger_writer.write_entries(active_set_entries).unwrap(); ledger_writer.write_entries(&active_set_entries).unwrap();
// Create the leader scheduler config // Create the leader scheduler config
let num_bootstrap_slots = 2; let num_bootstrap_slots = 2;
@ -1100,7 +1100,7 @@ fn test_dropped_handoff_recovery() {
// Write the entries // Write the entries
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
ledger_writer.write_entries(active_set_entries).unwrap(); ledger_writer.write_entries(&active_set_entries).unwrap();
let next_leader_ledger_path = tmp_copy_ledger( let next_leader_ledger_path = tmp_copy_ledger(
&bootstrap_leader_ledger_path, &bootstrap_leader_ledger_path,
@ -1263,7 +1263,7 @@ fn test_full_leader_validator_network() {
.last() .last()
.expect("expected at least one genesis entry") .expect("expected at least one genesis entry")
.id; .id;
ledger_writer.write_entries(bootstrap_entries).unwrap(); ledger_writer.write_entries(&bootstrap_entries).unwrap();
} }
// Create the common leader scheduling configuration // Create the common leader scheduling configuration