diff --git a/Cargo.toml b/Cargo.toml index 8cb20021f1..31e6919a42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "silk" description = "A silky smooth implementation of the Loom architecture" -version = "0.2.1" +version = "0.2.2" documentation = "https://docs.rs/silk" homepage = "http://loomprotocol.com/" repository = "https://github.com/loomprotocol/silk" diff --git a/README.md b/README.md index e0d4fb0f38..074643e857 100644 --- a/README.md +++ b/README.md @@ -24,28 +24,27 @@ Create a *Historian* and send it *events* to generate an *event log*, where each is tagged with the historian's latest *hash*. Then ensure the order of events was not tampered with by verifying each entry's hash can be generated from the hash in the previous entry: -![historian](https://user-images.githubusercontent.com/55449/36492930-97a572be-16eb-11e8-8289-358e9507189e.png) +![historian](https://user-images.githubusercontent.com/55449/36499105-7c8db6a0-16fd-11e8-8b88-c6e0f52d7a50.png) ```rust extern crate silk; use silk::historian::Historian; use silk::log::{verify_slice, Entry, Event, Sha256Hash}; -use std::{thread, time}; +use std::thread::sleep; +use std::time::Duration; use std::sync::mpsc::SendError; fn create_log(hist: &Historian) -> Result<(), SendError> { - hist.sender.send(Event::Tick)?; - thread::sleep(time::Duration::new(0, 100_000)); + sleep(Duration::from_millis(15)); hist.sender.send(Event::UserDataKey(Sha256Hash::default()))?; - thread::sleep(time::Duration::new(0, 100_000)); - hist.sender.send(Event::Tick)?; + sleep(Duration::from_millis(10)); Ok(()) } fn main() { let seed = Sha256Hash::default(); - let hist = Historian::new(&seed); + let hist = Historian::new(&seed, Some(10)); create_log(&hist).expect("send error"); drop(hist.sender); let entries: Vec = hist.receiver.iter().collect(); diff --git a/diagrams/historian.msc b/diagrams/historian.msc index 703c51bb2e..ee67f49d12 100644 --- a/diagrams/historian.msc +++ b/diagrams/historian.msc @@ -1,8 +1,6 @@ msc { client,historian,logger; - client=>historian [ label = "Tick" ] ; - historian=>logger [ label = "Tick" ] ; logger=>historian [ label = "e0 = Entry{hash: h0, n: 0, event: Tick}" ] ; logger=>logger [ label = "h1 = hash(h0)" ] ; logger=>logger [ label = "h2 = hash(h1)" ] ; @@ -13,8 +11,6 @@ msc { logger=>logger [ label = "h4 = hash(h3)" ] ; logger=>logger [ label = "h5 = hash(h4)" ] ; logger=>logger [ label = "h6 = hash(h5)" ] ; - client=>historian [ label = "Tick" ] ; - historian=>logger [ label = "Tick" ] ; logger=>historian [ label = "e2 = Entry{hash: h6, n: 3, event: Tick}" ] ; client=>historian [ label = "collect()" ] ; historian=>client [ label = "entries = [e0, e1, e2]" ] ; diff --git a/src/bin/demo.rs b/src/bin/demo.rs index ac7864c619..e4930521af 100644 --- a/src/bin/demo.rs +++ b/src/bin/demo.rs @@ -2,21 +2,20 @@ extern crate silk; use silk::historian::Historian; use silk::log::{verify_slice, Entry, Event, Sha256Hash}; -use std::{thread, time}; +use std::thread::sleep; +use std::time::Duration; use std::sync::mpsc::SendError; fn create_log(hist: &Historian) -> Result<(), SendError> { - hist.sender.send(Event::Tick)?; - thread::sleep(time::Duration::new(0, 100_000)); + sleep(Duration::from_millis(15)); hist.sender.send(Event::UserDataKey(Sha256Hash::default()))?; - thread::sleep(time::Duration::new(0, 100_000)); - hist.sender.send(Event::Tick)?; + sleep(Duration::from_millis(10)); Ok(()) } fn main() { let seed = Sha256Hash::default(); - let hist = Historian::new(&seed); + let hist = Historian::new(&seed, Some(10)); create_log(&hist).expect("send error"); drop(hist.sender); let entries: Vec = hist.receiver.iter().collect(); diff --git a/src/historian.rs b/src/historian.rs index f958e0b45a..37592dee83 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -7,6 +7,7 @@ use std::thread::JoinHandle; use std::sync::mpsc::{Receiver, Sender}; +use std::time::{Duration, SystemTime}; use log::{extend_and_hash, hash, Entry, Event, Sha256Hash}; pub struct Historian { @@ -20,29 +21,48 @@ pub enum ExitReason { RecvDisconnected, SendDisconnected, } +fn log_event( + sender: &Sender, + num_hashes: &mut u64, + end_hash: &mut Sha256Hash, + event: Event, +) -> Result<(), (Entry, ExitReason)> { + if let Event::UserDataKey(key) = event { + *end_hash = extend_and_hash(end_hash, &key); + } + let entry = Entry { + end_hash: *end_hash, + num_hashes: *num_hashes, + event, + }; + if let Err(_) = sender.send(entry.clone()) { + return Err((entry, ExitReason::SendDisconnected)); + } + *num_hashes = 0; + Ok(()) +} fn log_events( receiver: &Receiver, sender: &Sender, num_hashes: &mut u64, end_hash: &mut Sha256Hash, + epoch: SystemTime, + num_ticks: &mut u64, + ms_per_tick: Option, ) -> Result<(), (Entry, ExitReason)> { use std::sync::mpsc::TryRecvError; loop { + if let Some(ms) = ms_per_tick { + let now = SystemTime::now(); + if now > epoch + Duration::from_millis((*num_ticks + 1) * ms) { + log_event(sender, num_hashes, end_hash, Event::Tick)?; + *num_ticks += 1; + } + } match receiver.try_recv() { Ok(event) => { - if let Event::UserDataKey(key) = event { - *end_hash = extend_and_hash(end_hash, &key); - } - let entry = Entry { - end_hash: *end_hash, - num_hashes: *num_hashes, - event, - }; - if let Err(_) = sender.send(entry.clone()) { - return Err((entry, ExitReason::SendDisconnected)); - } - *num_hashes = 0; + log_event(sender, num_hashes, end_hash, event)?; } Err(TryRecvError::Empty) => { return Ok(()); @@ -63,6 +83,7 @@ fn log_events( /// sending back Entry messages until either the receiver or sender channel is closed. pub fn create_logger( start_hash: Sha256Hash, + ms_per_tick: Option, receiver: Receiver, sender: Sender, ) -> JoinHandle<(Entry, ExitReason)> { @@ -70,8 +91,18 @@ pub fn create_logger( thread::spawn(move || { let mut end_hash = start_hash; let mut num_hashes = 0; + let mut num_ticks = 0; + let epoch = SystemTime::now(); loop { - if let Err(err) = log_events(&receiver, &sender, &mut num_hashes, &mut end_hash) { + if let Err(err) = log_events( + &receiver, + &sender, + &mut num_hashes, + &mut end_hash, + epoch, + &mut num_ticks, + ms_per_tick, + ) { return err; } end_hash = hash(&end_hash); @@ -81,11 +112,11 @@ pub fn create_logger( } impl Historian { - pub fn new(start_hash: &Sha256Hash) -> Self { + pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { use std::sync::mpsc::channel; let (sender, event_receiver) = channel(); let (entry_sender, receiver) = channel(); - let thread_hdl = create_logger(*start_hash, event_receiver, entry_sender); + let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { sender, receiver, @@ -98,14 +129,13 @@ impl Historian { mod tests { use super::*; use log::*; + use std::thread::sleep; + use std::time::Duration; #[test] fn test_historian() { - use std::thread::sleep; - use std::time::Duration; - let zero = Sha256Hash::default(); - let hist = Historian::new(&zero); + let hist = Historian::new(&zero, None); hist.sender.send(Event::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); @@ -129,7 +159,7 @@ mod tests { #[test] fn test_historian_closed_sender() { let zero = Sha256Hash::default(); - let hist = Historian::new(&zero); + let hist = Historian::new(&zero, None); drop(hist.receiver); hist.sender.send(Event::Tick).unwrap(); assert_eq!( @@ -137,4 +167,22 @@ mod tests { ExitReason::SendDisconnected ); } + + #[test] + fn test_ticking_historian() { + let zero = Sha256Hash::default(); + let hist = Historian::new(&zero, Some(20)); + sleep(Duration::from_millis(30)); + hist.sender.send(Event::UserDataKey(zero)).unwrap(); + sleep(Duration::from_millis(15)); + drop(hist.sender); + assert_eq!( + hist.thread_hdl.join().unwrap().1, + ExitReason::RecvDisconnected + ); + + let entries: Vec = hist.receiver.iter().collect(); + assert!(entries.len() > 1); + assert!(verify_slice(&entries, &zero)); + } }