Hoist historian

This commit is contained in:
Greg Fitzgerald
2018-05-09 09:26:58 -06:00
parent 876c77d0bc
commit 98ae80f4ed

View File

@ -35,18 +35,16 @@ use transaction::Transaction;
struct AccountingStage { struct AccountingStage {
acc: Mutex<Accountant>, acc: Mutex<Accountant>,
historian_input: Mutex<SyncSender<Signal>>, historian_input: Mutex<SyncSender<Signal>>,
historian: Historian,
entry_info_subscribers: Mutex<Vec<SocketAddr>>, entry_info_subscribers: Mutex<Vec<SocketAddr>>,
} }
impl AccountingStage { impl AccountingStage {
/// Create a new Tpu that wraps the given Accountant. /// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self { pub fn new(acc: Accountant, historian_input: SyncSender<Signal>) -> Self {
AccountingStage { AccountingStage {
acc: Mutex::new(acc), acc: Mutex::new(acc),
entry_info_subscribers: Mutex::new(vec![]), entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input), historian_input: Mutex::new(historian_input),
historian,
} }
} }
@ -122,6 +120,7 @@ impl AccountingStage {
pub struct Tpu { pub struct Tpu {
accounting: AccountingStage, accounting: AccountingStage,
historian: Historian,
} }
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
@ -165,8 +164,11 @@ pub enum Response {
impl Tpu { impl Tpu {
/// Create a new Tpu that wraps the given Accountant. /// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self { pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
let accounting = AccountingStage::new(acc, historian_input, historian); let accounting = AccountingStage::new(acc, historian_input);
Tpu { accounting } Tpu {
accounting,
historian,
}
} }
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) { fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
@ -187,15 +189,14 @@ impl Tpu {
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> { fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations //TODO implement a serialize for channel that does this without allocations
let mut l = vec![]; let mut l = vec![];
let entry = obj.accounting let entry = obj.historian
.historian
.output .output
.lock() .lock()
.unwrap() .unwrap()
.recv_timeout(Duration::new(1, 0))?; .recv_timeout(Duration::new(1, 0))?;
Self::update_entry(obj, writer, &entry); Self::update_entry(obj, writer, &entry);
l.push(entry); l.push(entry);
while let Ok(entry) = obj.accounting.historian.receive() { while let Ok(entry) = obj.historian.receive() {
Self::update_entry(obj, writer, &entry); Self::update_entry(obj, writer, &entry);
l.push(entry); l.push(entry);
} }
@ -865,7 +866,7 @@ mod tests {
let acc = Accountant::new(&mint); let acc = Accountant::new(&mint);
let (input, event_receiver) = sync_channel(10); let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None); let historian = Historian::new(event_receiver, &mint.last_id(), None);
let stage = AccountingStage::new(acc, input, historian); let stage = AccountingStage::new(acc, input);
// Process a batch that includes a transaction that receives two tokens. // Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new(); let alice = KeyPair::new();
@ -880,7 +881,7 @@ mod tests {
// Collect the ledger and feed it to a new accountant. // Collect the ledger and feed it to a new accountant.
drop(stage.historian_input); drop(stage.historian_input);
let entries: Vec<Entry> = stage.historian.output.lock().unwrap().iter().collect(); let entries: Vec<Entry> = historian.output.lock().unwrap().iter().collect();
// Assert the user holds one token, not two. If the server only output one // Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives // entry, then the second transaction will be rejected, because it drives
@ -1179,7 +1180,7 @@ mod bench {
let (input, event_receiver) = sync_channel(10); let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None); let historian = Historian::new(event_receiver, &mint.last_id(), None);
let stage = AccountingStage::new(acc, input, historian); let stage = AccountingStage::new(acc, input);
let now = Instant::now(); let now = Instant::now();
assert!(stage.process_events(req_vers).is_ok()); assert!(stage.process_events(req_vers).is_ok());
@ -1189,7 +1190,7 @@ mod bench {
// Ensure that all transactions were successfully logged. // Ensure that all transactions were successfully logged.
drop(stage.historian_input); drop(stage.historian_input);
let entries: Vec<Entry> = stage.historian.output.lock().unwrap().iter().collect(); let entries: Vec<Entry> = historian.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1); assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize); assert_eq!(entries[0].events.len(), txs as usize);