Merge pull request #166 from garious/refactor-historian
TPU-friendly Historian
This commit is contained in:
		@@ -21,7 +21,7 @@ use std::collections::VecDeque;
 | 
			
		||||
use std::io::Write;
 | 
			
		||||
use std::net::{SocketAddr, UdpSocket};
 | 
			
		||||
use std::sync::atomic::{AtomicBool, Ordering};
 | 
			
		||||
use std::sync::mpsc::{channel, Receiver, Sender};
 | 
			
		||||
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
 | 
			
		||||
use std::sync::{Arc, Mutex, RwLock};
 | 
			
		||||
use std::thread::{spawn, JoinHandle};
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
@@ -34,6 +34,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
 | 
			
		||||
    acc: Accountant,
 | 
			
		||||
    last_id: Hash,
 | 
			
		||||
    writer: W,
 | 
			
		||||
    historian_input: SyncSender<Signal>,
 | 
			
		||||
    historian: Historian,
 | 
			
		||||
    entry_info_subscribers: Vec<SocketAddr>,
 | 
			
		||||
}
 | 
			
		||||
@@ -78,11 +79,18 @@ pub enum Response {
 | 
			
		||||
 | 
			
		||||
impl<W: Write + Send + 'static> AccountantSkel<W> {
 | 
			
		||||
    /// Create a new AccountantSkel that wraps the given Accountant.
 | 
			
		||||
    pub fn new(acc: Accountant, last_id: Hash, writer: W, historian: Historian) -> Self {
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        acc: Accountant,
 | 
			
		||||
        last_id: Hash,
 | 
			
		||||
        writer: W,
 | 
			
		||||
        historian_input: SyncSender<Signal>,
 | 
			
		||||
        historian: Historian,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        AccountantSkel {
 | 
			
		||||
            acc,
 | 
			
		||||
            last_id,
 | 
			
		||||
            writer,
 | 
			
		||||
            historian_input,
 | 
			
		||||
            historian,
 | 
			
		||||
            entry_info_subscribers: vec![],
 | 
			
		||||
        }
 | 
			
		||||
@@ -105,7 +113,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
 | 
			
		||||
 | 
			
		||||
    /// Process any Entry items that have been published by the Historian.
 | 
			
		||||
    pub fn sync(&mut self) -> Hash {
 | 
			
		||||
        while let Ok(entry) = self.historian.receiver.try_recv() {
 | 
			
		||||
        while let Ok(entry) = self.historian.output.try_recv() {
 | 
			
		||||
            self.last_id = entry.id;
 | 
			
		||||
            self.acc.register_entry_id(&self.last_id);
 | 
			
		||||
            writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
 | 
			
		||||
@@ -214,15 +222,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
 | 
			
		||||
        // Process the transactions in parallel and then log the successful ones.
 | 
			
		||||
        for result in self.acc.process_verified_transactions(trs) {
 | 
			
		||||
            if let Ok(tr) = result {
 | 
			
		||||
                self.historian
 | 
			
		||||
                    .sender
 | 
			
		||||
                self.historian_input
 | 
			
		||||
                    .send(Signal::Event(Event::Transaction(tr)))?;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Let validators know they should not attempt to process additional
 | 
			
		||||
        // transactions in parallel.
 | 
			
		||||
        self.historian.sender.send(Signal::Tick)?;
 | 
			
		||||
        self.historian_input.send(Signal::Tick)?;
 | 
			
		||||
 | 
			
		||||
        // Process the remaining requests serially.
 | 
			
		||||
        let rsps = reqs.into_iter()
 | 
			
		||||
@@ -482,6 +489,7 @@ mod tests {
 | 
			
		||||
    use std::io::sink;
 | 
			
		||||
    use std::net::{SocketAddr, UdpSocket};
 | 
			
		||||
    use std::sync::atomic::{AtomicBool, Ordering};
 | 
			
		||||
    use std::sync::mpsc::sync_channel;
 | 
			
		||||
    use std::sync::{Arc, Mutex};
 | 
			
		||||
    use std::thread::sleep;
 | 
			
		||||
    use std::time::Duration;
 | 
			
		||||
@@ -530,8 +538,9 @@ mod tests {
 | 
			
		||||
        let mint = Mint::new(2);
 | 
			
		||||
        let acc = Accountant::new(&mint);
 | 
			
		||||
        let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
 | 
			
		||||
        let historian = Historian::new(&mint.last_id(), None);
 | 
			
		||||
        let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let historian = Historian::new(event_receiver, &mint.last_id(), None);
 | 
			
		||||
        let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian);
 | 
			
		||||
 | 
			
		||||
        // Process a batch that includes a transaction that receives two tokens.
 | 
			
		||||
        let alice = KeyPair::new();
 | 
			
		||||
@@ -545,9 +554,9 @@ mod tests {
 | 
			
		||||
        assert!(skel.process_packets(req_vers).is_ok());
 | 
			
		||||
 | 
			
		||||
        // Collect the ledger and feed it to a new accountant.
 | 
			
		||||
        skel.historian.sender.send(Signal::Tick).unwrap();
 | 
			
		||||
        drop(skel.historian.sender);
 | 
			
		||||
        let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
 | 
			
		||||
        skel.historian_input.send(Signal::Tick).unwrap();
 | 
			
		||||
        drop(skel.historian_input);
 | 
			
		||||
        let entries: Vec<Entry> = skel.historian.output.iter().collect();
 | 
			
		||||
 | 
			
		||||
        // Assert the user holds one token, not two. If the server only output one
 | 
			
		||||
        // entry, then the second transaction will be rejected, because it drives
 | 
			
		||||
@@ -569,11 +578,13 @@ mod tests {
 | 
			
		||||
        let acc = Accountant::new(&alice);
 | 
			
		||||
        let bob_pubkey = KeyPair::new().pubkey();
 | 
			
		||||
        let exit = Arc::new(AtomicBool::new(false));
 | 
			
		||||
        let historian = Historian::new(&alice.last_id(), Some(30));
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
 | 
			
		||||
        let acc = Arc::new(Mutex::new(AccountantSkel::new(
 | 
			
		||||
            acc,
 | 
			
		||||
            alice.last_id(),
 | 
			
		||||
            sink(),
 | 
			
		||||
            input,
 | 
			
		||||
            historian,
 | 
			
		||||
        )));
 | 
			
		||||
        let _threads = AccountantSkel::serve(&acc, &addr, exit.clone()).unwrap();
 | 
			
		||||
@@ -651,11 +662,13 @@ mod tests {
 | 
			
		||||
        let starting_balance = 10_000;
 | 
			
		||||
        let alice = Mint::new(starting_balance);
 | 
			
		||||
        let acc = Accountant::new(&alice);
 | 
			
		||||
        let historian = Historian::new(&alice.last_id(), Some(30));
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
 | 
			
		||||
        let acc = Arc::new(Mutex::new(AccountantSkel::new(
 | 
			
		||||
            acc,
 | 
			
		||||
            alice.last_id(),
 | 
			
		||||
            sink(),
 | 
			
		||||
            input,
 | 
			
		||||
            historian,
 | 
			
		||||
        )));
 | 
			
		||||
 | 
			
		||||
@@ -790,8 +803,9 @@ mod bench {
 | 
			
		||||
            .map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        let historian = Historian::new(&mint.last_id(), None);
 | 
			
		||||
        let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let historian = Historian::new(event_receiver, &mint.last_id(), None);
 | 
			
		||||
        let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian);
 | 
			
		||||
 | 
			
		||||
        let now = Instant::now();
 | 
			
		||||
        assert!(skel.process_packets(req_vers).is_ok());
 | 
			
		||||
@@ -800,8 +814,8 @@ mod bench {
 | 
			
		||||
        let tps = txs as f64 / sec;
 | 
			
		||||
 | 
			
		||||
        // Ensure that all transactions were successfully logged.
 | 
			
		||||
        drop(skel.historian.sender);
 | 
			
		||||
        let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
 | 
			
		||||
        drop(input);
 | 
			
		||||
        let entries: Vec<Entry> = skel.historian.output.iter().collect();
 | 
			
		||||
        assert_eq!(entries.len(), 1);
 | 
			
		||||
        assert_eq!(entries[0].events.len(), txs as usize);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -165,6 +165,7 @@ mod tests {
 | 
			
		||||
    use signature::{KeyPair, KeyPairUtil};
 | 
			
		||||
    use std::io::sink;
 | 
			
		||||
    use std::sync::atomic::{AtomicBool, Ordering};
 | 
			
		||||
    use std::sync::mpsc::sync_channel;
 | 
			
		||||
    use std::sync::{Arc, Mutex};
 | 
			
		||||
    use std::thread::sleep;
 | 
			
		||||
    use std::time::Duration;
 | 
			
		||||
@@ -178,11 +179,13 @@ mod tests {
 | 
			
		||||
        let acc = Accountant::new(&alice);
 | 
			
		||||
        let bob_pubkey = KeyPair::new().pubkey();
 | 
			
		||||
        let exit = Arc::new(AtomicBool::new(false));
 | 
			
		||||
        let historian = Historian::new(&alice.last_id(), Some(30));
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
 | 
			
		||||
        let acc = Arc::new(Mutex::new(AccountantSkel::new(
 | 
			
		||||
            acc,
 | 
			
		||||
            alice.last_id(),
 | 
			
		||||
            sink(),
 | 
			
		||||
            input,
 | 
			
		||||
            historian,
 | 
			
		||||
        )));
 | 
			
		||||
        let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap();
 | 
			
		||||
 
 | 
			
		||||
@@ -8,26 +8,27 @@ use solana::ledger::Block;
 | 
			
		||||
use solana::recorder::Signal;
 | 
			
		||||
use solana::signature::{KeyPair, KeyPairUtil};
 | 
			
		||||
use solana::transaction::Transaction;
 | 
			
		||||
use std::sync::mpsc::SendError;
 | 
			
		||||
use std::sync::mpsc::{sync_channel, SendError, SyncSender};
 | 
			
		||||
use std::thread::sleep;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
 | 
			
		||||
fn create_ledger(hist: &Historian, seed: &Hash) -> Result<(), SendError<Signal>> {
 | 
			
		||||
fn create_ledger(input: &SyncSender<Signal>, seed: &Hash) -> Result<(), SendError<Signal>> {
 | 
			
		||||
    sleep(Duration::from_millis(15));
 | 
			
		||||
    let keypair = KeyPair::new();
 | 
			
		||||
    let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
 | 
			
		||||
    let signal0 = Signal::Event(Event::Transaction(tr));
 | 
			
		||||
    hist.sender.send(signal0)?;
 | 
			
		||||
    input.send(signal0)?;
 | 
			
		||||
    sleep(Duration::from_millis(10));
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn main() {
 | 
			
		||||
    let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
    let seed = Hash::default();
 | 
			
		||||
    let hist = Historian::new(&seed, Some(10));
 | 
			
		||||
    create_ledger(&hist, &seed).expect("send error");
 | 
			
		||||
    drop(hist.sender);
 | 
			
		||||
    let entries: Vec<Entry> = hist.receiver.iter().collect();
 | 
			
		||||
    let hist = Historian::new(event_receiver, &seed, Some(10));
 | 
			
		||||
    create_ledger(&input, &seed).expect("send error");
 | 
			
		||||
    drop(input);
 | 
			
		||||
    let entries: Vec<Entry> = hist.output.iter().collect();
 | 
			
		||||
    for entry in &entries {
 | 
			
		||||
        println!("{:?}", entry);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -15,6 +15,7 @@ use std::env;
 | 
			
		||||
use std::io::{stdin, stdout, Read};
 | 
			
		||||
use std::process::exit;
 | 
			
		||||
use std::sync::atomic::AtomicBool;
 | 
			
		||||
use std::sync::mpsc::sync_channel;
 | 
			
		||||
use std::sync::{Arc, Mutex};
 | 
			
		||||
 | 
			
		||||
fn print_usage(program: &str, opts: Options) {
 | 
			
		||||
@@ -95,12 +96,14 @@ fn main() {
 | 
			
		||||
        acc.register_entry_id(&last_id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let historian = Historian::new(&last_id, Some(1000));
 | 
			
		||||
    let (input, event_receiver) = sync_channel(10_000);
 | 
			
		||||
    let historian = Historian::new(event_receiver, &last_id, Some(1000));
 | 
			
		||||
    let exit = Arc::new(AtomicBool::new(false));
 | 
			
		||||
    let skel = Arc::new(Mutex::new(AccountantSkel::new(
 | 
			
		||||
        acc,
 | 
			
		||||
        last_id,
 | 
			
		||||
        stdout(),
 | 
			
		||||
        input,
 | 
			
		||||
        historian,
 | 
			
		||||
    )));
 | 
			
		||||
    let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap();
 | 
			
		||||
 
 | 
			
		||||
@@ -9,22 +9,20 @@ use std::thread::{spawn, JoinHandle};
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
 | 
			
		||||
pub struct Historian {
 | 
			
		||||
    pub sender: SyncSender<Signal>,
 | 
			
		||||
    pub receiver: Receiver<Entry>,
 | 
			
		||||
    pub output: Receiver<Entry>,
 | 
			
		||||
    pub thread_hdl: JoinHandle<ExitReason>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Historian {
 | 
			
		||||
    pub fn new(start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
 | 
			
		||||
        let (sender, event_receiver) = sync_channel(10_000);
 | 
			
		||||
        let (entry_sender, receiver) = sync_channel(10_000);
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        event_receiver: Receiver<Signal>,
 | 
			
		||||
        start_hash: &Hash,
 | 
			
		||||
        ms_per_tick: Option<u64>,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        let (entry_sender, output) = sync_channel(10_000);
 | 
			
		||||
        let thread_hdl =
 | 
			
		||||
            Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
 | 
			
		||||
        Historian {
 | 
			
		||||
            sender,
 | 
			
		||||
            receiver,
 | 
			
		||||
            thread_hdl,
 | 
			
		||||
        }
 | 
			
		||||
        Historian { output, thread_hdl }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// A background thread that will continue tagging received Event messages and
 | 
			
		||||
@@ -59,24 +57,25 @@ mod tests {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_historian() {
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let zero = Hash::default();
 | 
			
		||||
        let hist = Historian::new(&zero, None);
 | 
			
		||||
        let hist = Historian::new(event_receiver, &zero, None);
 | 
			
		||||
 | 
			
		||||
        hist.sender.send(Signal::Tick).unwrap();
 | 
			
		||||
        input.send(Signal::Tick).unwrap();
 | 
			
		||||
        sleep(Duration::new(0, 1_000_000));
 | 
			
		||||
        hist.sender.send(Signal::Tick).unwrap();
 | 
			
		||||
        input.send(Signal::Tick).unwrap();
 | 
			
		||||
        sleep(Duration::new(0, 1_000_000));
 | 
			
		||||
        hist.sender.send(Signal::Tick).unwrap();
 | 
			
		||||
        input.send(Signal::Tick).unwrap();
 | 
			
		||||
 | 
			
		||||
        let entry0 = hist.receiver.recv().unwrap();
 | 
			
		||||
        let entry1 = hist.receiver.recv().unwrap();
 | 
			
		||||
        let entry2 = hist.receiver.recv().unwrap();
 | 
			
		||||
        let entry0 = hist.output.recv().unwrap();
 | 
			
		||||
        let entry1 = hist.output.recv().unwrap();
 | 
			
		||||
        let entry2 = hist.output.recv().unwrap();
 | 
			
		||||
 | 
			
		||||
        assert_eq!(entry0.num_hashes, 0);
 | 
			
		||||
        assert_eq!(entry1.num_hashes, 0);
 | 
			
		||||
        assert_eq!(entry2.num_hashes, 0);
 | 
			
		||||
 | 
			
		||||
        drop(hist.sender);
 | 
			
		||||
        drop(input);
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            hist.thread_hdl.join().unwrap(),
 | 
			
		||||
            ExitReason::RecvDisconnected
 | 
			
		||||
@@ -87,10 +86,11 @@ mod tests {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_historian_closed_sender() {
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let zero = Hash::default();
 | 
			
		||||
        let hist = Historian::new(&zero, None);
 | 
			
		||||
        drop(hist.receiver);
 | 
			
		||||
        hist.sender.send(Signal::Tick).unwrap();
 | 
			
		||||
        let hist = Historian::new(event_receiver, &zero, None);
 | 
			
		||||
        drop(hist.output);
 | 
			
		||||
        input.send(Signal::Tick).unwrap();
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            hist.thread_hdl.join().unwrap(),
 | 
			
		||||
            ExitReason::SendDisconnected
 | 
			
		||||
@@ -99,12 +99,13 @@ mod tests {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_ticking_historian() {
 | 
			
		||||
        let (input, event_receiver) = sync_channel(10);
 | 
			
		||||
        let zero = Hash::default();
 | 
			
		||||
        let hist = Historian::new(&zero, Some(20));
 | 
			
		||||
        let hist = Historian::new(event_receiver, &zero, Some(20));
 | 
			
		||||
        sleep(Duration::from_millis(300));
 | 
			
		||||
        hist.sender.send(Signal::Tick).unwrap();
 | 
			
		||||
        drop(hist.sender);
 | 
			
		||||
        let entries: Vec<Entry> = hist.receiver.iter().collect();
 | 
			
		||||
        input.send(Signal::Tick).unwrap();
 | 
			
		||||
        drop(input);
 | 
			
		||||
        let entries: Vec<Entry> = hist.output.iter().collect();
 | 
			
		||||
        assert!(entries.len() > 1);
 | 
			
		||||
 | 
			
		||||
        // Ensure the ID is not the seed.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user