From f906112c03118eb29640046db48d63894e9c5505 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 3 Mar 2018 13:52:57 -0700 Subject: [PATCH 1/5] Move logging thread's state into a struct --- src/historian.rs | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/historian.rs b/src/historian.rs index 515a5746e9..197bc10837 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -26,6 +26,15 @@ pub enum ExitReason { RecvDisconnected, SendDisconnected, } + +pub struct Logger { + pub sender: SyncSender>, + pub receiver: Receiver>, + pub end_hash: Sha256Hash, + pub num_hashes: u64, + pub num_ticks: u64, +} + fn log_event( sender: &SyncSender>, num_hashes: &mut u64, @@ -108,24 +117,28 @@ pub fn create_logger( ) -> JoinHandle<(Entry, ExitReason)> { use std::thread; thread::spawn(move || { - let mut end_hash = start_hash; - let mut num_hashes = 0; - let mut num_ticks = 0; + let mut logger = Logger { + receiver: receiver, + sender: sender, + end_hash: start_hash, + num_hashes: 0, + num_ticks: 0, + }; let epoch = SystemTime::now(); loop { if let Err(err) = log_events( - &receiver, - &sender, - &mut num_hashes, - &mut end_hash, + &logger.receiver, + &logger.sender, + &mut logger.num_hashes, + &mut logger.end_hash, epoch, - &mut num_ticks, + &mut logger.num_ticks, ms_per_tick, ) { return err; } - end_hash = hash(&end_hash); - num_hashes += 1; + logger.end_hash = hash(&logger.end_hash); + logger.num_hashes += 1; } }) } From c7de48c9826c4188600ce2237d6838b54323420f Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 3 Mar 2018 14:00:37 -0700 Subject: [PATCH 2/5] Convert log_events from function to method --- src/historian.rs | 80 ++++++++++++++++++++++++------------------------ 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/src/historian.rs b/src/historian.rs index 197bc10837..4e6ed0601a 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -70,38 +70,46 @@ fn verify_event_and_reserve_signature( true } -fn log_events( - receiver: &Receiver>, - sender: &SyncSender>, - num_hashes: &mut u64, - end_hash: &mut Sha256Hash, - epoch: SystemTime, - num_ticks: &mut u64, - ms_per_tick: Option, -) -> Result<(), (Entry, ExitReason)> { - use std::sync::mpsc::TryRecvError; - loop { - if let Some(ms) = ms_per_tick { - let now = SystemTime::now(); - if now > epoch + Duration::from_millis((*num_ticks + 1) * ms) { - log_event(sender, num_hashes, end_hash, Event::Tick)?; - *num_ticks += 1; +impl Logger { + fn log_events( + &mut self, + epoch: SystemTime, + ms_per_tick: Option, + ) -> Result<(), (Entry, ExitReason)> { + use std::sync::mpsc::TryRecvError; + loop { + if let Some(ms) = ms_per_tick { + let now = SystemTime::now(); + if now > epoch + Duration::from_millis((self.num_ticks + 1) * ms) { + log_event( + &self.sender, + &mut self.num_hashes, + &mut self.end_hash, + Event::Tick, + )?; + self.num_ticks += 1; + } } - } - match receiver.try_recv() { - Ok(event) => { - log_event(sender, num_hashes, end_hash, event)?; - } - Err(TryRecvError::Empty) => { - return Ok(()); - } - Err(TryRecvError::Disconnected) => { - let entry = Entry { - end_hash: *end_hash, - num_hashes: *num_hashes, - event: Event::Tick, - }; - return Err((entry, ExitReason::RecvDisconnected)); + match self.receiver.try_recv() { + Ok(event) => { + log_event( + &self.sender, + &mut self.num_hashes, + &mut self.end_hash, + event, + )?; + } + Err(TryRecvError::Empty) => { + return Ok(()); + } + Err(TryRecvError::Disconnected) => { + let entry = Entry { + end_hash: self.end_hash, + num_hashes: self.num_hashes, + event: Event::Tick, + }; + return Err((entry, ExitReason::RecvDisconnected)); + } } } } @@ -126,15 +134,7 @@ pub fn create_logger( }; let epoch = SystemTime::now(); loop { - if let Err(err) = log_events( - &logger.receiver, - &logger.sender, - &mut logger.num_hashes, - &mut logger.end_hash, - epoch, - &mut logger.num_ticks, - ms_per_tick, - ) { + if let Err(err) = logger.log_events(epoch, ms_per_tick) { return err; } logger.end_hash = hash(&logger.end_hash); From afb2bf442c4db2123d303cb94a6c4decd06cb868 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 3 Mar 2018 14:08:51 -0700 Subject: [PATCH 3/5] Use Instant instead of SystemTime for more precise ticking And convert log_event from function to method --- src/historian.rs | 80 ++++++++++++++++++++++-------------------------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/src/historian.rs b/src/historian.rs index 4e6ed0601a..f66790a4b9 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -8,7 +8,7 @@ use std::thread::JoinHandle; use std::collections::HashSet; use std::sync::mpsc::{Receiver, SyncSender}; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant}; use log::{hash, hash_event, Entry, Sha256Hash}; use event::{get_signature, verify_event, Event, Signature}; use serde::Serialize; @@ -35,25 +35,6 @@ pub struct Logger { pub num_ticks: u64, } -fn log_event( - sender: &SyncSender>, - num_hashes: &mut u64, - end_hash: &mut Sha256Hash, - event: Event, -) -> Result<(), (Entry, ExitReason)> { - *end_hash = hash_event(end_hash, &event); - let entry = Entry { - end_hash: *end_hash, - num_hashes: *num_hashes, - event, - }; - if let Err(_) = sender.send(entry.clone()) { - return Err((entry, ExitReason::SendDisconnected)); - } - *num_hashes = 0; - Ok(()) -} - fn verify_event_and_reserve_signature( signatures: &mut HashSet, event: &Event, @@ -71,33 +52,50 @@ fn verify_event_and_reserve_signature( } impl Logger { + fn new( + receiver: Receiver>, + sender: SyncSender>, + start_hash: Sha256Hash, + ) -> Self { + Logger { + receiver, + sender, + end_hash: start_hash, + num_hashes: 0, + num_ticks: 0, + } + } + + fn log_event(&mut self, event: Event) -> Result<(), (Entry, ExitReason)> { + self.end_hash = hash_event(&self.end_hash, &event); + let entry = Entry { + end_hash: self.end_hash, + num_hashes: self.num_hashes, + event, + }; + if let Err(_) = self.sender.send(entry.clone()) { + return Err((entry, ExitReason::SendDisconnected)); + } + self.num_hashes = 0; + Ok(()) + } + fn log_events( &mut self, - epoch: SystemTime, + epoch: Instant, ms_per_tick: Option, ) -> Result<(), (Entry, ExitReason)> { use std::sync::mpsc::TryRecvError; loop { if let Some(ms) = ms_per_tick { - let now = SystemTime::now(); - if now > epoch + Duration::from_millis((self.num_ticks + 1) * ms) { - log_event( - &self.sender, - &mut self.num_hashes, - &mut self.end_hash, - Event::Tick, - )?; + if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { + self.log_event(Event::Tick)?; self.num_ticks += 1; } } match self.receiver.try_recv() { Ok(event) => { - log_event( - &self.sender, - &mut self.num_hashes, - &mut self.end_hash, - event, - )?; + self.log_event(event)?; } Err(TryRecvError::Empty) => { return Ok(()); @@ -125,16 +123,10 @@ pub fn create_logger( ) -> JoinHandle<(Entry, ExitReason)> { use std::thread; thread::spawn(move || { - let mut logger = Logger { - receiver: receiver, - sender: sender, - end_hash: start_hash, - num_hashes: 0, - num_ticks: 0, - }; - let epoch = SystemTime::now(); + let mut logger = Logger::new(receiver, sender, start_hash); + let now = Instant::now(); loop { - if let Err(err) = logger.log_events(epoch, ms_per_tick) { + if let Err(err) = logger.log_events(now, ms_per_tick) { return err; } logger.end_hash = hash(&logger.end_hash); From bebba7dc1fb657173e4ddfc4005f32bd49dcac99 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 3 Mar 2018 14:24:32 -0700 Subject: [PATCH 4/5] Give logger its own crate --- src/accountant.rs | 2 +- src/historian.rs | 127 ++----------------------------------------- src/lib.rs | 1 + src/logger.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 142 insertions(+), 123 deletions(-) create mode 100644 src/logger.rs diff --git a/src/accountant.rs b/src/accountant.rs index 69966c49de..ebaec39c4d 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -152,7 +152,7 @@ impl Accountant { mod tests { use super::*; use event::{generate_keypair, get_pubkey}; - use historian::ExitReason; + use logger::ExitReason; #[test] fn test_accountant() { diff --git a/src/historian.rs b/src/historian.rs index f66790a4b9..6011bddcdd 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -1,16 +1,13 @@ //! The `historian` crate provides a microservice for generating a Proof-of-History. -//! It logs Event items on behalf of its users. It continuously generates -//! new hashes, only stopping to check if it has been sent an Event item. It -//! tags each Event with an Entry and sends it back. The Entry includes the -//! Event, the latest hash, and the number of hashes since the last event. -//! The resulting stream of entries represents ordered events in time. +//! It manages a thread containing a Proof-of-History Logger. use std::thread::JoinHandle; use std::collections::HashSet; use std::sync::mpsc::{Receiver, SyncSender}; -use std::time::{Duration, Instant}; -use log::{hash, hash_event, Entry, Sha256Hash}; -use event::{get_signature, verify_event, Event, Signature}; +use std::time::Instant; +use log::{hash, Entry, Sha256Hash}; +use logger::{verify_event_and_reserve_signature, ExitReason, Logger}; +use event::{Event, Signature}; use serde::Serialize; use std::fmt::Debug; @@ -21,98 +18,6 @@ pub struct Historian { pub signatures: HashSet, } -#[derive(Debug, PartialEq, Eq)] -pub enum ExitReason { - RecvDisconnected, - SendDisconnected, -} - -pub struct Logger { - pub sender: SyncSender>, - pub receiver: Receiver>, - pub end_hash: Sha256Hash, - pub num_hashes: u64, - pub num_ticks: u64, -} - -fn verify_event_and_reserve_signature( - signatures: &mut HashSet, - event: &Event, -) -> bool { - if !verify_event(&event) { - return false; - } - if let Some(sig) = get_signature(&event) { - if signatures.contains(&sig) { - return false; - } - signatures.insert(sig); - } - true -} - -impl Logger { - fn new( - receiver: Receiver>, - sender: SyncSender>, - start_hash: Sha256Hash, - ) -> Self { - Logger { - receiver, - sender, - end_hash: start_hash, - num_hashes: 0, - num_ticks: 0, - } - } - - fn log_event(&mut self, event: Event) -> Result<(), (Entry, ExitReason)> { - self.end_hash = hash_event(&self.end_hash, &event); - let entry = Entry { - end_hash: self.end_hash, - num_hashes: self.num_hashes, - event, - }; - if let Err(_) = self.sender.send(entry.clone()) { - return Err((entry, ExitReason::SendDisconnected)); - } - self.num_hashes = 0; - Ok(()) - } - - fn log_events( - &mut self, - epoch: Instant, - ms_per_tick: Option, - ) -> Result<(), (Entry, ExitReason)> { - use std::sync::mpsc::TryRecvError; - loop { - if let Some(ms) = ms_per_tick { - if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { - self.log_event(Event::Tick)?; - self.num_ticks += 1; - } - } - match self.receiver.try_recv() { - Ok(event) => { - self.log_event(event)?; - } - Err(TryRecvError::Empty) => { - return Ok(()); - } - Err(TryRecvError::Disconnected) => { - let entry = Entry { - end_hash: self.end_hash, - num_hashes: self.num_hashes, - event: Event::Tick, - }; - return Err((entry, ExitReason::RecvDisconnected)); - } - } - } - } -} - /// A background thread that will continue tagging received Event messages and /// sending back Entry messages until either the receiver or sender channel is closed. pub fn create_logger( @@ -215,26 +120,4 @@ mod tests { assert!(entries.len() > 1); assert!(verify_slice(&entries, &zero)); } - - #[test] - fn test_bad_event_signature() { - let keypair = generate_keypair(); - let sig = sign_claim_data(&hash(b"hello, world"), &keypair); - let event0 = Event::new_claim(get_pubkey(&keypair), hash(b"goodbye cruel world"), sig); - let mut sigs = HashSet::new(); - assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); - assert!(!sigs.contains(&sig)); - } - - #[test] - fn test_duplicate_event_signature() { - let keypair = generate_keypair(); - let to = get_pubkey(&keypair); - let data = &hash(b"hello, world"); - let sig = sign_claim_data(data, &keypair); - let event0 = Event::new_claim(to, data, sig); - let mut sigs = HashSet::new(); - assert!(verify_event_and_reserve_signature(&mut sigs, &event0)); - assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); - } } diff --git a/src/lib.rs b/src/lib.rs index 851262000a..588c798493 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod log; +pub mod logger; pub mod event; pub mod historian; pub mod accountant; diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000000..ce8040711c --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,135 @@ +//! The `logger` crate provides an object for generating a Proof-of-History. +//! It logs Event items on behalf of its users. It continuously generates +//! new hashes, only stopping to check if it has been sent an Event item. It +//! tags each Event with an Entry and sends it back. The Entry includes the +//! Event, the latest hash, and the number of hashes since the last event. +//! The resulting stream of entries represents ordered events in time. + +use std::collections::HashSet; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::time::{Duration, Instant}; +use log::{hash_event, Entry, Sha256Hash}; +use event::{get_signature, verify_event, Event, Signature}; +use serde::Serialize; +use std::fmt::Debug; + +#[derive(Debug, PartialEq, Eq)] +pub enum ExitReason { + RecvDisconnected, + SendDisconnected, +} + +pub struct Logger { + pub sender: SyncSender>, + pub receiver: Receiver>, + pub end_hash: Sha256Hash, + pub num_hashes: u64, + pub num_ticks: u64, +} + +pub fn verify_event_and_reserve_signature( + signatures: &mut HashSet, + event: &Event, +) -> bool { + if !verify_event(&event) { + return false; + } + if let Some(sig) = get_signature(&event) { + if signatures.contains(&sig) { + return false; + } + signatures.insert(sig); + } + true +} + +impl Logger { + pub fn new( + receiver: Receiver>, + sender: SyncSender>, + start_hash: Sha256Hash, + ) -> Self { + Logger { + receiver, + sender, + end_hash: start_hash, + num_hashes: 0, + num_ticks: 0, + } + } + + pub fn log_event(&mut self, event: Event) -> Result<(), (Entry, ExitReason)> { + self.end_hash = hash_event(&self.end_hash, &event); + let entry = Entry { + end_hash: self.end_hash, + num_hashes: self.num_hashes, + event, + }; + if let Err(_) = self.sender.send(entry.clone()) { + return Err((entry, ExitReason::SendDisconnected)); + } + self.num_hashes = 0; + Ok(()) + } + + pub fn log_events( + &mut self, + epoch: Instant, + ms_per_tick: Option, + ) -> Result<(), (Entry, ExitReason)> { + use std::sync::mpsc::TryRecvError; + loop { + if let Some(ms) = ms_per_tick { + if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { + self.log_event(Event::Tick)?; + self.num_ticks += 1; + } + } + match self.receiver.try_recv() { + Ok(event) => { + self.log_event(event)?; + } + Err(TryRecvError::Empty) => { + return Ok(()); + } + Err(TryRecvError::Disconnected) => { + let entry = Entry { + end_hash: self.end_hash, + num_hashes: self.num_hashes, + event: Event::Tick, + }; + return Err((entry, ExitReason::RecvDisconnected)); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use log::*; + use event::*; + + #[test] + fn test_bad_event_signature() { + let keypair = generate_keypair(); + let sig = sign_claim_data(&hash(b"hello, world"), &keypair); + let event0 = Event::new_claim(get_pubkey(&keypair), hash(b"goodbye cruel world"), sig); + let mut sigs = HashSet::new(); + assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); + assert!(!sigs.contains(&sig)); + } + + #[test] + fn test_duplicate_event_signature() { + let keypair = generate_keypair(); + let to = get_pubkey(&keypair); + let data = &hash(b"hello, world"); + let sig = sign_claim_data(data, &keypair); + let event0 = Event::new_claim(to, data, sig); + let mut sigs = HashSet::new(); + assert!(verify_event_and_reserve_signature(&mut sigs, &event0)); + assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); + } +} From f7496ea6d10f1883718eb22dddc944f663ca5661 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 3 Mar 2018 14:26:57 -0700 Subject: [PATCH 5/5] Make create_logger a static method Allows us to share the super long type signature in impl. --- src/historian.rs | 48 +++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/src/historian.rs b/src/historian.rs index 6011bddcdd..446570642e 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -18,34 +18,13 @@ pub struct Historian { pub signatures: HashSet, } -/// A background thread that will continue tagging received Event messages and -/// sending back Entry messages until either the receiver or sender channel is closed. -pub fn create_logger( - start_hash: Sha256Hash, - ms_per_tick: Option, - receiver: Receiver>, - sender: SyncSender>, -) -> JoinHandle<(Entry, ExitReason)> { - use std::thread; - thread::spawn(move || { - let mut logger = Logger::new(receiver, sender, start_hash); - let now = Instant::now(); - loop { - if let Err(err) = logger.log_events(now, ms_per_tick) { - return err; - } - logger.end_hash = hash(&logger.end_hash); - logger.num_hashes += 1; - } - }) -} - impl Historian { pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { use std::sync::mpsc::sync_channel; let (sender, event_receiver) = sync_channel(1000); let (entry_sender, receiver) = sync_channel(1000); - let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); + let thread_hdl = + Historian::create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); let signatures = HashSet::new(); Historian { sender, @@ -54,9 +33,32 @@ impl Historian { signatures, } } + pub fn verify_event(self: &mut Self, event: &Event) -> bool { return verify_event_and_reserve_signature(&mut self.signatures, event); } + + /// A background thread that will continue tagging received Event messages and + /// sending back Entry messages until either the receiver or sender channel is closed. + fn create_logger( + start_hash: Sha256Hash, + ms_per_tick: Option, + receiver: Receiver>, + sender: SyncSender>, + ) -> JoinHandle<(Entry, ExitReason)> { + use std::thread; + thread::spawn(move || { + let mut logger = Logger::new(receiver, sender, start_hash); + let now = Instant::now(); + loop { + if let Err(err) = logger.log_events(now, ms_per_tick) { + return err; + } + logger.end_hash = hash(&logger.end_hash); + logger.num_hashes += 1; + } + }) + } } #[cfg(test)]