Move the historian up to accountant_skel

This commit is contained in:
Greg Fitzgerald
2018-04-02 14:41:07 -06:00
parent 90cd9bd533
commit 2b788d06b7
4 changed files with 52 additions and 76 deletions

View File

@ -7,15 +7,12 @@ use chrono::prelude::*;
use entry::Entry; use entry::Entry;
use event::Event; use event::Event;
use hash::Hash; use hash::Hash;
use historian::Historian;
use mint::Mint; use mint::Mint;
use plan::{Payment, Plan, Witness}; use plan::{Payment, Plan, Witness};
use recorder::Signal;
use signature::{KeyPair, PublicKey, Signature}; use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied; use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::result; use std::result;
use std::sync::mpsc::{Receiver, SendError};
use transaction::Transaction; use transaction::Transaction;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@ -23,7 +20,6 @@ pub enum AccountingError {
InsufficientFunds, InsufficientFunds,
InvalidTransfer, InvalidTransfer,
InvalidTransferSignature, InvalidTransferSignature,
SendError,
} }
pub type Result<T> = result::Result<T, AccountingError>; pub type Result<T> = result::Result<T, AccountingError>;
@ -34,7 +30,6 @@ fn apply_payment(balances: &mut HashMap<PublicKey, i64>, payment: &Payment) {
} }
pub struct Accountant { pub struct Accountant {
historian: Historian,
balances: HashMap<PublicKey, i64>, balances: HashMap<PublicKey, i64>,
pending: HashMap<Signature, Plan>, pending: HashMap<Signature, Plan>,
signatures: HashSet<Signature>, signatures: HashSet<Signature>,
@ -44,16 +39,10 @@ pub struct Accountant {
impl Accountant { impl Accountant {
/// Create an Accountant using a deposit. /// Create an Accountant using a deposit.
pub fn new_from_deposit( pub fn new_from_deposit(deposit: &Payment) -> Self {
start_hash: &Hash,
deposit: &Payment,
ms_per_tick: Option<u64>,
) -> Self {
let mut balances = HashMap::new(); let mut balances = HashMap::new();
apply_payment(&mut balances, &deposit); apply_payment(&mut balances, &deposit);
let historian = Historian::new(&start_hash, ms_per_tick);
Accountant { Accountant {
historian,
balances, balances,
pending: HashMap::new(), pending: HashMap::new(),
signatures: HashSet::new(), signatures: HashSet::new(),
@ -63,16 +52,16 @@ impl Accountant {
} }
/// Create an Accountant with only a Mint. Typically used by unit tests. /// Create an Accountant with only a Mint. Typically used by unit tests.
pub fn new(mint: &Mint, ms_per_tick: Option<u64>) -> Self { pub fn new(mint: &Mint) -> Self {
let deposit = Payment { let deposit = Payment {
to: mint.pubkey(), to: mint.pubkey(),
tokens: mint.tokens, tokens: mint.tokens,
}; };
Self::new_from_deposit(&mint.seed(), &deposit, ms_per_tick) Self::new_from_deposit(&deposit)
} }
/// Create an Accountant using an existing ledger. /// Create an Accountant using an existing ledger.
pub fn new_from_entries<I>(entries: I, ms_per_tick: Option<u64>) -> (Self, Hash) pub fn new_from_entries<I>(entries: I) -> (Self, Hash)
where where
I: IntoIterator<Item = Entry>, I: IntoIterator<Item = Entry>,
{ {
@ -80,8 +69,7 @@ impl Accountant {
// The first item in the ledger is required to be an entry with zero num_hashes, // The first item in the ledger is required to be an entry with zero num_hashes,
// which implies its id can be used as the ledger's seed. // which implies its id can be used as the ledger's seed.
let entry0 = entries.next().unwrap(); entries.next().unwrap();
let start_hash = entry0.id;
// The second item in the ledger is a special transaction where the to and from // The second item in the ledger is a special transaction where the to and from
// fields are the same. That entry should be treated as a deposit, not a // fields are the same. That entry should be treated as a deposit, not a
@ -93,7 +81,7 @@ impl Accountant {
None None
}; };
let mut acc = Self::new_from_deposit(&start_hash, &deposit.unwrap(), ms_per_tick); let mut acc = Self::new_from_deposit(&deposit.unwrap());
let mut last_id = entry1.id; let mut last_id = entry1.id;
for entry in entries { for entry in entries {
@ -105,30 +93,13 @@ impl Accountant {
(acc, last_id) (acc, last_id)
} }
pub fn receiver(&self) -> &Receiver<Entry> {
&self.historian.receiver
}
/// Process and log the given Transaction.
pub fn log_verified_transaction(&mut self, tr: Transaction) -> Result<()> {
self.process_verified_transaction(&tr)?;
if let Err(SendError(_)) = self.historian
.sender
.send(Signal::Event(Event::Transaction(tr)))
{
return Err(AccountingError::SendError);
}
Ok(())
}
/// Verify and process the given Transaction. /// Verify and process the given Transaction.
pub fn log_transaction(&mut self, tr: Transaction) -> Result<()> { pub fn process_transaction(&mut self, tr: Transaction) -> Result<()> {
if !tr.verify() { if !tr.verify() {
return Err(AccountingError::InvalidTransfer); return Err(AccountingError::InvalidTransfer);
} }
self.log_verified_transaction(tr) self.process_verified_transaction(&tr)
} }
fn reserve_signature(&mut self, sig: &Signature) -> bool { fn reserve_signature(&mut self, sig: &Signature) -> bool {
@ -231,7 +202,7 @@ impl Accountant {
) -> Result<Signature> { ) -> Result<Signature> {
let tr = Transaction::new(keypair, to, n, last_id); let tr = Transaction::new(keypair, to, n, last_id);
let sig = tr.sig; let sig = tr.sig;
self.log_transaction(tr).map(|_| sig) self.process_transaction(tr).map(|_| sig)
} }
/// Create, sign, and process a postdated Transaction from `keypair` /// Create, sign, and process a postdated Transaction from `keypair`
@ -247,7 +218,7 @@ impl Accountant {
) -> Result<Signature> { ) -> Result<Signature> {
let tr = Transaction::new_on_date(keypair, to, dt, n, last_id); let tr = Transaction::new_on_date(keypair, to, dt, n, last_id);
let sig = tr.sig; let sig = tr.sig;
self.log_transaction(tr).map(|_| sig) self.process_transaction(tr).map(|_| sig)
} }
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> { pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
@ -258,14 +229,13 @@ impl Accountant {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use recorder::ExitReason;
use signature::KeyPairUtil; use signature::KeyPairUtil;
#[test] #[test]
fn test_accountant() { fn test_accountant() {
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice);
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed()) acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
@ -273,18 +243,12 @@ mod tests {
acc.transfer(500, &alice.keypair(), bob_pubkey, alice.seed()) acc.transfer(500, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);
drop(acc.historian.sender);
assert_eq!(
acc.historian.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
} }
#[test] #[test]
fn test_invalid_transfer() { fn test_invalid_transfer() {
let alice = Mint::new(11_000); let alice = Mint::new(11_000);
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed()) acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap(); .unwrap();
@ -296,25 +260,19 @@ mod tests {
let alice_pubkey = alice.keypair().pubkey(); let alice_pubkey = alice.keypair().pubkey();
assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000); assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000);
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
drop(acc.historian.sender);
assert_eq!(
acc.historian.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
} }
#[test] #[test]
fn test_overspend_attack() { fn test_overspend_attack() {
let alice = Mint::new(1); let alice = Mint::new(1);
let mut acc = Accountant::new(&alice, None); let mut acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let mut tr = Transaction::new(&alice.keypair(), bob_pubkey, 1, alice.seed()); let mut tr = Transaction::new(&alice.keypair(), bob_pubkey, 1, alice.seed());
if let Plan::Pay(ref mut payment) = tr.plan { if let Plan::Pay(ref mut payment) = tr.plan {
payment.tokens = 2; // <-- attack! payment.tokens = 2; // <-- attack!
} }
assert_eq!( assert_eq!(
acc.log_transaction(tr.clone()), acc.process_transaction(tr.clone()),
Err(AccountingError::InvalidTransfer) Err(AccountingError::InvalidTransfer)
); );
@ -323,7 +281,7 @@ mod tests {
payment.tokens = 0; // <-- whoops! payment.tokens = 0; // <-- whoops!
} }
assert_eq!( assert_eq!(
acc.log_transaction(tr.clone()), acc.process_transaction(tr.clone()),
Err(AccountingError::InvalidTransfer) Err(AccountingError::InvalidTransfer)
); );
} }
@ -331,24 +289,18 @@ mod tests {
#[test] #[test]
fn test_transfer_to_newb() { fn test_transfer_to_newb() {
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(500, &alice_keypair, bob_pubkey, alice.seed()) acc.transfer(500, &alice_keypair, bob_pubkey, alice.seed())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
drop(acc.historian.sender);
assert_eq!(
acc.historian.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
} }
#[test] #[test]
fn test_transfer_on_date() { fn test_transfer_on_date() {
let alice = Mint::new(1); let alice = Mint::new(1);
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
@ -374,7 +326,7 @@ mod tests {
#[test] #[test]
fn test_transfer_after_date() { fn test_transfer_after_date() {
let alice = Mint::new(1); let alice = Mint::new(1);
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
@ -391,7 +343,7 @@ mod tests {
#[test] #[test]
fn test_cancel_transfer() { fn test_cancel_transfer() {
let alice = Mint::new(1); let alice = Mint::new(1);
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
@ -417,7 +369,7 @@ mod tests {
#[test] #[test]
fn test_duplicate_event_signature() { fn test_duplicate_event_signature() {
let alice = Mint::new(1); let alice = Mint::new(1);
let mut acc = Accountant::new(&alice, None); let mut acc = Accountant::new(&alice);
let sig = Signature::default(); let sig = Signature::default();
assert!(acc.reserve_signature(&sig)); assert!(acc.reserve_signature(&sig));
assert!(!acc.reserve_signature(&sig)); assert!(!acc.reserve_signature(&sig));

View File

@ -3,8 +3,11 @@
//! in flux. Clients should use AccountantStub to interact with it. //! in flux. Clients should use AccountantStub to interact with it.
use accountant::Accountant; use accountant::Accountant;
use historian::Historian;
use recorder::Signal;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use entry::Entry; use entry::Entry;
use event::Event;
use hash::Hash; use hash::Hash;
use result::Result; use result::Result;
use serde_json; use serde_json;
@ -13,7 +16,7 @@ use std::default::Default;
use std::io::Write; use std::io::Write;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::{channel, SendError};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -25,6 +28,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
acc: Accountant, acc: Accountant,
last_id: Hash, last_id: Hash,
writer: W, writer: W,
historian: Historian,
} }
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
@ -59,17 +63,18 @@ pub enum Response {
impl<W: Write + Send + 'static> AccountantSkel<W> { impl<W: Write + Send + 'static> AccountantSkel<W> {
/// Create a new AccountantSkel that wraps the given Accountant. /// Create a new AccountantSkel that wraps the given Accountant.
pub fn new(acc: Accountant, last_id: Hash, writer: W) -> Self { pub fn new(acc: Accountant, last_id: Hash, writer: W, historian: Historian) -> Self {
AccountantSkel { AccountantSkel {
acc, acc,
last_id, last_id,
writer, writer,
historian,
} }
} }
/// Process any Entry items that have been published by the Historian. /// Process any Entry items that have been published by the Historian.
pub fn sync(&mut self) -> Hash { pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.acc.receiver().try_recv() { while let Ok(entry) = self.historian.receiver.try_recv() {
self.last_id = entry.id; self.last_id = entry.id;
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
} }
@ -80,8 +85,13 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
pub fn log_verified_request(&mut self, msg: Request) -> Option<Response> { pub fn log_verified_request(&mut self, msg: Request) -> Option<Response> {
match msg { match msg {
Request::Transaction(tr) => { Request::Transaction(tr) => {
if let Err(err) = self.acc.log_verified_transaction(tr) { if let Err(err) = self.acc.process_verified_transaction(&tr) {
eprintln!("Transaction error: {:?}", err); eprintln!("Transaction error: {:?}", err);
} else if let Err(SendError(_)) = self.historian
.sender
.send(Signal::Event(Event::Transaction(tr)))
{
eprintln!("Channel send error");
} }
None None
} }

View File

@ -87,6 +87,7 @@ impl AccountantStub {
mod tests { mod tests {
use super::*; use super::*;
use accountant::Accountant; use accountant::Accountant;
use historian::Historian;
use accountant_skel::AccountantSkel; use accountant_skel::AccountantSkel;
use mint::Mint; use mint::Mint;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
@ -102,10 +103,16 @@ mod tests {
let addr = "127.0.0.1:9000"; let addr = "127.0.0.1:9000";
let send_addr = "127.0.0.1:9001"; let send_addr = "127.0.0.1:9001";
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let acc = Accountant::new(&alice, Some(30)); let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, alice.seed(), sink()))); let historian = Historian::new(&alice.seed(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.seed(),
sink(),
historian,
)));
let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));

View File

@ -2,6 +2,7 @@ extern crate serde_json;
extern crate solana; extern crate solana;
use solana::accountant::Accountant; use solana::accountant::Accountant;
use solana::historian::Historian;
use solana::accountant_skel::AccountantSkel; use solana::accountant_skel::AccountantSkel;
use std::io::{self, stdout, BufRead}; use std::io::{self, stdout, BufRead};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -14,9 +15,15 @@ fn main() {
.lock() .lock()
.lines() .lines()
.map(|line| serde_json::from_str(&line.unwrap()).unwrap()); .map(|line| serde_json::from_str(&line.unwrap()).unwrap());
let (acc, last_id) = Accountant::new_from_entries(entries, Some(1000)); let (acc, last_id) = Accountant::new_from_entries(entries);
let historian = Historian::new(&last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(acc, last_id, stdout()))); let skel = Arc::new(Mutex::new(AccountantSkel::new(
acc,
last_id,
stdout(),
historian,
)));
eprintln!("Listening on {}", addr); eprintln!("Listening on {}", addr);
let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap();
for t in threads { for t in threads {