From ded28c705f46e2e020b16163db567fdd71b54a06 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 9 May 2018 12:25:13 -0600 Subject: [PATCH] Tuck away the Historian The Historian is now just a utility of the accounting stage. --- src/accounting_stage.rs | 13 +++++-------- src/bin/testnode.rs | 8 +++----- src/thin_client.rs | 18 +++++++----------- src/tpu.rs | 18 ++++++------------ 4 files changed, 21 insertions(+), 36 deletions(-) diff --git a/src/accounting_stage.rs b/src/accounting_stage.rs index 74bea95efc..5de02a6565 100644 --- a/src/accounting_stage.rs +++ b/src/accounting_stage.rs @@ -25,7 +25,9 @@ pub struct AccountingStage { impl AccountingStage { /// Create a new Tpu that wraps the given Accountant. - pub fn new(acc: Accountant, historian_input: Sender, historian: Historian) -> Self { + pub fn new(acc: Accountant, start_hash: &Hash, ms_per_tick: Option) -> Self { + let (historian_input, event_receiver) = channel(); + let historian = Historian::new(event_receiver, start_hash, ms_per_tick); let (entry_sender, output) = channel(); AccountingStage { output: Arc::new(Mutex::new(output)), @@ -157,10 +159,8 @@ mod tests { use accounting_stage::AccountingStage; use entry::Entry; use event::Event; - use historian::Historian; use mint::Mint; use signature::{KeyPair, KeyPairUtil}; - use std::sync::mpsc::channel; use transaction::Transaction; #[test] @@ -170,9 +170,7 @@ mod tests { // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let acc = Accountant::new(&mint); - let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input, historian); + let stage = AccountingStage::new(acc, &mint.last_id(), None); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); @@ -262,8 +260,7 @@ mod bench { .collect(); let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input, historian); + let stage = AccountingStage::new(acc, &mint.last_id(), None); let now = Instant::now(); assert!(stage.process_events(events).is_ok()); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index b2bfd17c44..cddd4e1e7e 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -7,10 +7,10 @@ extern crate solana; use getopts::Options; use isatty::stdin_isatty; use solana::accountant::Accountant; +use solana::accounting_stage::AccountingStage; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; -use solana::historian::Historian; use solana::signature::{KeyPair, KeyPairUtil}; use solana::tpu::Tpu; use std::env; @@ -18,7 +18,6 @@ use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; use std::process::exit; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::channel; use std::sync::Arc; fn print_usage(program: &str, opts: Options) { @@ -116,10 +115,9 @@ fn main() { eprintln!("creating networking stack..."); - let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &last_id, Some(1000)); + let accounting = AccountingStage::new(acc, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let tpu = Arc::new(Tpu::new(acc, input, historian)); + let tpu = Arc::new(Tpu::new(accounting)); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 096b43eaad..c33a2314f5 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -148,15 +148,14 @@ impl ThinClient { mod tests { use super::*; use accountant::Accountant; + use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; use futures::Future; - use historian::Historian; use logger; use mint::Mint; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -183,9 +182,8 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(Tpu::new(acc, input, historian)); + let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); + let acc = Arc::new(Tpu::new(accounting)); let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); @@ -240,17 +238,15 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let leader_acc = { - let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); - Arc::new(Tpu::new(acc, input, historian)) + let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); + Arc::new(Tpu::new(accounting)) }; let replicant_acc = { - let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); - Arc::new(Tpu::new(acc, input, historian)) + let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); + Arc::new(Tpu::new(accounting)) }; let leader_threads = Tpu::serve( diff --git a/src/tpu.rs b/src/tpu.rs index 406e3b8913..58e5623daa 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -1,19 +1,16 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! 5-stage transaction processing pipeline in software. -use accountant::Accountant; use accounting_stage::{AccountingStage, Request, Response}; use bincode::{deserialize, serialize, serialize_into}; use crdt::{Crdt, ReplicatedData}; use ecdsa; use entry::Entry; use event::Event; -use historian::Historian; use packet; use packet::{SharedBlob, SharedPackets, BLOB_SIZE}; use rand::{thread_rng, Rng}; use rayon::prelude::*; -use recorder::Signal; use result::Result; use serde_json; use std::collections::VecDeque; @@ -38,8 +35,7 @@ type SharedTpu = Arc; impl Tpu { /// Create a new Tpu that wraps the given Accountant. - pub fn new(acc: Accountant, historian_input: Sender, historian: Historian) -> Self { - let accounting = AccountingStage::new(acc, historian_input, historian); + pub fn new(accounting: AccountingStage) -> Self { Tpu { accounting } } @@ -676,6 +672,7 @@ mod tests { use transaction::{memfind, test_tx}; use accountant::Accountant; + use accounting_stage::AccountingStage; use chrono::prelude::*; use crdt::Crdt; use crdt::ReplicatedData; @@ -683,7 +680,6 @@ mod tests { use event::Event; use futures::Future; use hash::{hash, Hash}; - use historian::Historian; use logger; use mint::Mint; use plan::Plan; @@ -734,9 +730,8 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let tpu = Arc::new(Tpu::new(acc, input, historian)); + let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); + let tpu = Arc::new(Tpu::new(accounting)); let serve_addr = leader_serve.local_addr().unwrap(); let threads = Tpu::serve( &tpu, @@ -843,9 +838,8 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let acc = Accountant::new(&alice); - let (input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let tpu = Arc::new(Tpu::new(acc, input, historian)); + let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); + let tpu = Arc::new(Tpu::new(accounting)); let replicate_addr = target1_data.replicate_addr; let threads = Tpu::replicate( &tpu,