Config recorder with any kind of Duration, not just milliseconds

This commit is contained in:
Greg Fitzgerald
2018-05-14 14:12:36 -06:00
parent 27984e469a
commit 3f10bf44db
7 changed files with 47 additions and 26 deletions

View File

@ -19,6 +19,7 @@ use std::net::UdpSocket;
use std::process::exit; use std::process::exit;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::time::Duration;
fn print_usage(program: &str, opts: Options) { fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program); let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
@ -115,7 +116,8 @@ fn main() {
eprintln!("creating networking stack..."); eprintln!("creating networking stack...");
let event_processor = EventProcessor::new(accountant, &last_id, Some(1000)); let event_processor =
EventProcessor::new(accountant, &last_id, Some(Duration::from_millis(1000)));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(event_processor); let rpu = Rpu::new(event_processor);
let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let serve_sock = UdpSocket::bind(&serve_addr).unwrap();

View File

@ -9,26 +9,27 @@ use recorder::Signal;
use result::Result; use result::Result;
use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration;
pub struct EventProcessor { pub struct EventProcessor {
pub accountant: Arc<Accountant>, pub accountant: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>, historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>, historian: Mutex<Historian>,
pub start_hash: Hash, pub start_hash: Hash,
pub ms_per_tick: Option<u64>, pub tick_duration: Option<Duration>,
} }
impl EventProcessor { impl EventProcessor {
/// Create a new stage of the TPU for event and transaction processing /// Create a new stage of the TPU for event and transaction processing
pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self { pub fn new(accountant: Accountant, start_hash: &Hash, tick_duration: Option<Duration>) -> Self {
let (historian_input, event_receiver) = channel(); let (historian_input, event_receiver) = channel();
let historian = Historian::new(event_receiver, start_hash, ms_per_tick); let historian = Historian::new(event_receiver, start_hash, tick_duration);
EventProcessor { EventProcessor {
accountant: Arc::new(accountant), accountant: Arc::new(accountant),
historian_input: Mutex::new(historian_input), historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian), historian: Mutex::new(historian),
start_hash: *start_hash, start_hash: *start_hash,
ms_per_tick, tick_duration,
} }
} }

View File

@ -6,7 +6,7 @@ use hash::Hash;
use recorder::{ExitReason, Recorder, Signal}; use recorder::{ExitReason, Recorder, Signal};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Instant; use std::time::{Duration, Instant};
pub struct Historian { pub struct Historian {
pub entry_receiver: Receiver<Entry>, pub entry_receiver: Receiver<Entry>,
@ -17,11 +17,11 @@ impl Historian {
pub fn new( pub fn new(
event_receiver: Receiver<Signal>, event_receiver: Receiver<Signal>,
start_hash: &Hash, start_hash: &Hash,
ms_per_tick: Option<u64>, tick_duration: Option<Duration>,
) -> Self { ) -> Self {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let thread_hdl = let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender);
Historian { Historian {
entry_receiver, entry_receiver,
thread_hdl, thread_hdl,
@ -32,18 +32,18 @@ impl Historian {
/// sending back Entry messages until either the receiver or sender channel is closed. /// sending back Entry messages until either the receiver or sender channel is closed.
fn create_recorder( fn create_recorder(
start_hash: Hash, start_hash: Hash,
ms_per_tick: Option<u64>, tick_duration: Option<Duration>,
receiver: Receiver<Signal>, receiver: Receiver<Signal>,
sender: Sender<Entry>, sender: Sender<Entry>,
) -> JoinHandle<ExitReason> { ) -> JoinHandle<ExitReason> {
spawn(move || { spawn(move || {
let mut recorder = Recorder::new(receiver, sender, start_hash); let mut recorder = Recorder::new(receiver, sender, start_hash);
let now = Instant::now(); let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
loop { loop {
if let Err(err) = recorder.process_events(now, ms_per_tick) { if let Err(err) = recorder.process_events(duration_data) {
return err; return err;
} }
if ms_per_tick.is_some() { if duration_data.is_some() {
recorder.hash(); recorder.hash();
} }
} }
@ -60,7 +60,6 @@ mod tests {
use super::*; use super::*;
use ledger::Block; use ledger::Block;
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration;
#[test] #[test]
fn test_historian() { fn test_historian() {
@ -109,7 +108,7 @@ mod tests {
fn test_ticking_historian() { fn test_ticking_historian() {
let (input, event_receiver) = channel(); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, Some(20)); let hist = Historian::new(event_receiver, &zero, Some(Duration::from_millis(20)));
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
drop(input); drop(input);

View File

@ -57,12 +57,11 @@ impl Recorder {
pub fn process_events( pub fn process_events(
&mut self, &mut self,
epoch: Instant, duration_data: Option<(Instant, Duration)>,
ms_per_tick: Option<u64>,
) -> Result<(), ExitReason> { ) -> Result<(), ExitReason> {
loop { loop {
if let Some(ms) = ms_per_tick { if let Some((start_time, tick_duration)) = duration_data {
if epoch.elapsed() > Duration::from_millis(ms) * (self.num_ticks + 1) { if start_time.elapsed() > tick_duration * (self.num_ticks + 1) {
self.record_entry(vec![])?; self.record_entry(vec![])?;
// TODO: don't let this overflow u32 // TODO: don't let this overflow u32
self.num_ticks += 1; self.num_ticks += 1;
@ -105,7 +104,7 @@ mod tests {
signal_sender signal_sender
.send(Signal::Events(vec![event0, event1])) .send(Signal::Events(vec![event0, event1]))
.unwrap(); .unwrap();
recorder.process_events(Instant::now(), None).unwrap(); recorder.process_events(None).unwrap();
drop(recorder.sender); drop(recorder.sender);
let entries: Vec<_> = entry_receiver.iter().collect(); let entries: Vec<_> = entry_receiver.iter().collect();

View File

@ -98,7 +98,7 @@ impl Rpu {
let historian_stage = Historian::new( let historian_stage = Historian::new(
request_stage.signal_receiver, request_stage.signal_receiver,
&self.event_processor.start_hash, &self.event_processor.start_hash,
self.event_processor.ms_per_tick, self.event_processor.tick_duration,
); );
let (broadcast_sender, broadcast_receiver) = channel(); let (broadcast_sender, broadcast_receiver) = channel();

View File

@ -191,7 +191,11 @@ mod tests {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
let rpu = Rpu::new(event_processor); let rpu = Rpu::new(event_processor);
let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -230,7 +234,11 @@ mod tests {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
let rpu = Rpu::new(event_processor); let rpu = Rpu::new(event_processor);
let serve_addr = leader_serve.local_addr().unwrap(); let serve_addr = leader_serve.local_addr().unwrap();
let threads = rpu.serve( let threads = rpu.serve(
@ -300,13 +308,21 @@ mod tests {
let leader_acc = { let leader_acc = {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
Rpu::new(event_processor) Rpu::new(event_processor)
}; };
let replicant_acc = { let replicant_acc = {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
Arc::new(Tvu::new(event_processor)) Arc::new(Tvu::new(event_processor))
}; };

View File

@ -183,7 +183,7 @@ impl Tvu {
let historian_stage = Historian::new( let historian_stage = Historian::new(
request_stage.signal_receiver, request_stage.signal_receiver,
&obj.event_processor.start_hash, &obj.event_processor.start_hash,
obj.event_processor.ms_per_tick, obj.event_processor.tick_duration,
); );
let t_write = Self::drain_service( let t_write = Self::drain_service(
@ -311,7 +311,11 @@ mod tests {
let starting_balance = 10_000; let starting_balance = 10_000;
let alice = Mint::new(starting_balance); let alice = Mint::new(starting_balance);
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
let tvu = Arc::new(Tvu::new(event_processor)); let tvu = Arc::new(Tvu::new(event_processor));
let replicate_addr = target1_data.replicate_addr; let replicate_addr = target1_data.replicate_addr;
let threads = Tvu::serve( let threads = Tvu::serve(