Move Tick out of Event
Every Entry is now a Tick and the entries contain events.
This commit is contained in:
@ -9,6 +9,7 @@ use transaction::{Condition, Transaction};
|
|||||||
use signature::{KeyPair, PublicKey, Signature};
|
use signature::{KeyPair, PublicKey, Signature};
|
||||||
use mint::Mint;
|
use mint::Mint;
|
||||||
use historian::{reserve_signature, Historian};
|
use historian::{reserve_signature, Historian};
|
||||||
|
use logger::Signal;
|
||||||
use std::sync::mpsc::SendError;
|
use std::sync::mpsc::SendError;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::result;
|
use std::result;
|
||||||
@ -96,7 +97,10 @@ impl Accountant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.process_verified_transaction(&tr, false)?;
|
self.process_verified_transaction(&tr, false)?;
|
||||||
if let Err(SendError(_)) = self.historian.sender.send(Event::Transaction(tr)) {
|
if let Err(SendError(_)) = self.historian
|
||||||
|
.sender
|
||||||
|
.send(Signal::Event(Event::Transaction(tr)))
|
||||||
|
{
|
||||||
return Err(AccountingError::SendError);
|
return Err(AccountingError::SendError);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,7 +247,6 @@ impl Accountant {
|
|||||||
|
|
||||||
fn process_verified_event(self: &mut Self, event: &Event, allow_deposits: bool) -> Result<()> {
|
fn process_verified_event(self: &mut Self, event: &Event, allow_deposits: bool) -> Result<()> {
|
||||||
match *event {
|
match *event {
|
||||||
Event::Tick => Ok(()),
|
|
||||||
Event::Transaction(ref tr) => self.process_verified_transaction(tr, allow_deposits),
|
Event::Transaction(ref tr) => self.process_verified_transaction(tr, allow_deposits),
|
||||||
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
|
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
|
||||||
Event::Timestamp { from, dt, .. } => self.process_verified_timestamp(from, dt),
|
Event::Timestamp { from, dt, .. } => self.process_verified_timestamp(from, dt),
|
||||||
|
@ -4,6 +4,7 @@ use silk::historian::Historian;
|
|||||||
use silk::hash::Hash;
|
use silk::hash::Hash;
|
||||||
use silk::entry::Entry;
|
use silk::entry::Entry;
|
||||||
use silk::log::verify_slice;
|
use silk::log::verify_slice;
|
||||||
|
use silk::logger::Signal;
|
||||||
use silk::signature::{KeyPair, KeyPairUtil};
|
use silk::signature::{KeyPair, KeyPairUtil};
|
||||||
use silk::transaction::Transaction;
|
use silk::transaction::Transaction;
|
||||||
use silk::event::Event;
|
use silk::event::Event;
|
||||||
@ -11,12 +12,12 @@ use std::thread::sleep;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::sync::mpsc::SendError;
|
use std::sync::mpsc::SendError;
|
||||||
|
|
||||||
fn create_log(hist: &Historian, seed: &Hash) -> Result<(), SendError<Event>> {
|
fn create_log(hist: &Historian, seed: &Hash) -> Result<(), SendError<Signal>> {
|
||||||
sleep(Duration::from_millis(15));
|
sleep(Duration::from_millis(15));
|
||||||
let keypair = KeyPair::new();
|
let keypair = KeyPair::new();
|
||||||
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
|
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
|
||||||
let event0 = Event::Transaction(tr);
|
let signal0 = Signal::Event(Event::Transaction(tr));
|
||||||
hist.sender.send(event0)?;
|
hist.sender.send(signal0)?;
|
||||||
sleep(Duration::from_millis(10));
|
sleep(Duration::from_millis(10));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -5,14 +5,8 @@ use transaction::Transaction;
|
|||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
|
|
||||||
/// When 'event' is Tick, the event represents a simple clock tick, and exists for the
|
|
||||||
/// sole purpose of improving the performance of event log verification. A tick can
|
|
||||||
/// be generated in 'num_hashes' hashes and verified in 'num_hashes' hashes. By logging
|
|
||||||
/// a hash alongside the tick, each tick and be verified in parallel using the 'id'
|
|
||||||
/// of the preceding tick to seed its hashing.
|
|
||||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
|
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
|
||||||
pub enum Event {
|
pub enum Event {
|
||||||
Tick,
|
|
||||||
Transaction(Transaction<i64>),
|
Transaction(Transaction<i64>),
|
||||||
Signature {
|
Signature {
|
||||||
from: PublicKey,
|
from: PublicKey,
|
||||||
@ -40,7 +34,6 @@ impl Event {
|
|||||||
// TODO: Rename this to transaction_signature().
|
// TODO: Rename this to transaction_signature().
|
||||||
pub fn get_signature(&self) -> Option<Signature> {
|
pub fn get_signature(&self) -> Option<Signature> {
|
||||||
match *self {
|
match *self {
|
||||||
Event::Tick => None,
|
|
||||||
Event::Transaction(ref tr) => Some(tr.sig),
|
Event::Transaction(ref tr) => Some(tr.sig),
|
||||||
Event::Signature { .. } => None,
|
Event::Signature { .. } => None,
|
||||||
Event::Timestamp { .. } => None,
|
Event::Timestamp { .. } => None,
|
||||||
@ -49,7 +42,6 @@ impl Event {
|
|||||||
|
|
||||||
pub fn verify(&self) -> bool {
|
pub fn verify(&self) -> bool {
|
||||||
match *self {
|
match *self {
|
||||||
Event::Tick => true,
|
|
||||||
Event::Transaction(ref tr) => tr.verify(),
|
Event::Transaction(ref tr) => tr.verify(),
|
||||||
Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig),
|
Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig),
|
||||||
Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).unwrap()),
|
Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).unwrap()),
|
||||||
|
@ -7,12 +7,11 @@ use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
|
|||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use hash::{hash, Hash};
|
use hash::{hash, Hash};
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
use logger::{ExitReason, Logger};
|
use logger::{ExitReason, Logger, Signal};
|
||||||
use signature::Signature;
|
use signature::Signature;
|
||||||
use event::Event;
|
|
||||||
|
|
||||||
pub struct Historian {
|
pub struct Historian {
|
||||||
pub sender: SyncSender<Event>,
|
pub sender: SyncSender<Signal>,
|
||||||
pub receiver: Receiver<Entry>,
|
pub receiver: Receiver<Entry>,
|
||||||
pub thread_hdl: JoinHandle<ExitReason>,
|
pub thread_hdl: JoinHandle<ExitReason>,
|
||||||
pub signatures: HashSet<Signature>,
|
pub signatures: HashSet<Signature>,
|
||||||
@ -38,7 +37,7 @@ impl Historian {
|
|||||||
fn create_logger(
|
fn create_logger(
|
||||||
start_hash: Hash,
|
start_hash: Hash,
|
||||||
ms_per_tick: Option<u64>,
|
ms_per_tick: Option<u64>,
|
||||||
receiver: Receiver<Event>,
|
receiver: Receiver<Signal>,
|
||||||
sender: SyncSender<Entry>,
|
sender: SyncSender<Entry>,
|
||||||
) -> JoinHandle<ExitReason> {
|
) -> JoinHandle<ExitReason> {
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
@ -69,7 +68,6 @@ pub fn reserve_signature(sigs: &mut HashSet<Signature>, sig: &Signature) -> bool
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use log::*;
|
use log::*;
|
||||||
use event::*;
|
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@ -78,11 +76,11 @@ mod tests {
|
|||||||
let zero = Hash::default();
|
let zero = Hash::default();
|
||||||
let hist = Historian::new(&zero, None);
|
let hist = Historian::new(&zero, None);
|
||||||
|
|
||||||
hist.sender.send(Event::Tick).unwrap();
|
hist.sender.send(Signal::Tick).unwrap();
|
||||||
sleep(Duration::new(0, 1_000_000));
|
sleep(Duration::new(0, 1_000_000));
|
||||||
hist.sender.send(Event::Tick).unwrap();
|
hist.sender.send(Signal::Tick).unwrap();
|
||||||
sleep(Duration::new(0, 1_000_000));
|
sleep(Duration::new(0, 1_000_000));
|
||||||
hist.sender.send(Event::Tick).unwrap();
|
hist.sender.send(Signal::Tick).unwrap();
|
||||||
|
|
||||||
let entry0 = hist.receiver.recv().unwrap();
|
let entry0 = hist.receiver.recv().unwrap();
|
||||||
let entry1 = hist.receiver.recv().unwrap();
|
let entry1 = hist.receiver.recv().unwrap();
|
||||||
@ -106,7 +104,7 @@ mod tests {
|
|||||||
let zero = Hash::default();
|
let zero = Hash::default();
|
||||||
let hist = Historian::new(&zero, None);
|
let hist = Historian::new(&zero, None);
|
||||||
drop(hist.receiver);
|
drop(hist.receiver);
|
||||||
hist.sender.send(Event::Tick).unwrap();
|
hist.sender.send(Signal::Tick).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
hist.thread_hdl.join().unwrap(),
|
hist.thread_hdl.join().unwrap(),
|
||||||
ExitReason::SendDisconnected
|
ExitReason::SendDisconnected
|
||||||
@ -126,7 +124,7 @@ mod tests {
|
|||||||
let zero = Hash::default();
|
let zero = Hash::default();
|
||||||
let hist = Historian::new(&zero, Some(20));
|
let hist = Historian::new(&zero, Some(20));
|
||||||
sleep(Duration::from_millis(30));
|
sleep(Duration::from_millis(30));
|
||||||
hist.sender.send(Event::Tick).unwrap();
|
hist.sender.send(Signal::Tick).unwrap();
|
||||||
drop(hist.sender);
|
drop(hist.sender);
|
||||||
let entries: Vec<Entry> = hist.receiver.iter().collect();
|
let entries: Vec<Entry> = hist.receiver.iter().collect();
|
||||||
|
|
||||||
|
@ -13,6 +13,11 @@ use entry::{create_entry_mut, Entry};
|
|||||||
use event::Event;
|
use event::Event;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
|
|
||||||
|
pub enum Signal {
|
||||||
|
Tick,
|
||||||
|
Event(Event),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub enum ExitReason {
|
pub enum ExitReason {
|
||||||
RecvDisconnected,
|
RecvDisconnected,
|
||||||
@ -21,7 +26,7 @@ pub enum ExitReason {
|
|||||||
|
|
||||||
pub struct Logger {
|
pub struct Logger {
|
||||||
pub sender: SyncSender<Entry>,
|
pub sender: SyncSender<Entry>,
|
||||||
pub receiver: Receiver<Event>,
|
pub receiver: Receiver<Signal>,
|
||||||
pub last_id: Hash,
|
pub last_id: Hash,
|
||||||
pub events: Vec<Event>,
|
pub events: Vec<Event>,
|
||||||
pub num_hashes: u64,
|
pub num_hashes: u64,
|
||||||
@ -29,7 +34,7 @@ pub struct Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Logger {
|
impl Logger {
|
||||||
pub fn new(receiver: Receiver<Event>, sender: SyncSender<Entry>, start_hash: Hash) -> Self {
|
pub fn new(receiver: Receiver<Signal>, sender: SyncSender<Entry>, start_hash: Hash) -> Self {
|
||||||
Logger {
|
Logger {
|
||||||
receiver,
|
receiver,
|
||||||
sender,
|
sender,
|
||||||
@ -61,16 +66,17 @@ impl Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
match self.receiver.try_recv() {
|
match self.receiver.try_recv() {
|
||||||
Ok(event) => {
|
Ok(signal) => match signal {
|
||||||
if let Event::Tick = event {
|
Signal::Tick => {
|
||||||
let entry = self.log_entry()?;
|
let entry = self.log_entry()?;
|
||||||
self.sender
|
self.sender
|
||||||
.send(entry)
|
.send(entry)
|
||||||
.or(Err(ExitReason::SendDisconnected))?;
|
.or(Err(ExitReason::SendDisconnected))?;
|
||||||
} else {
|
}
|
||||||
|
Signal::Event(event) => {
|
||||||
self.events.push(event);
|
self.events.push(event);
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
Err(TryRecvError::Empty) => return Ok(()),
|
Err(TryRecvError::Empty) => return Ok(()),
|
||||||
Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected),
|
Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected),
|
||||||
};
|
};
|
||||||
|
Reference in New Issue
Block a user