Move JSON printing up the stack
This commit is contained in:
		@@ -44,7 +44,6 @@ pub struct Accountant {
 | 
				
			|||||||
    pub historian: Historian,
 | 
					    pub historian: Historian,
 | 
				
			||||||
    pub balances: HashMap<PublicKey, i64>,
 | 
					    pub balances: HashMap<PublicKey, i64>,
 | 
				
			||||||
    pub first_id: Hash,
 | 
					    pub first_id: Hash,
 | 
				
			||||||
    pub last_id: Hash,
 | 
					 | 
				
			||||||
    pending: HashMap<Signature, Plan>,
 | 
					    pending: HashMap<Signature, Plan>,
 | 
				
			||||||
    time_sources: HashSet<PublicKey>,
 | 
					    time_sources: HashSet<PublicKey>,
 | 
				
			||||||
    last_time: DateTime<Utc>,
 | 
					    last_time: DateTime<Utc>,
 | 
				
			||||||
@@ -67,7 +66,6 @@ impl Accountant {
 | 
				
			|||||||
            historian: hist,
 | 
					            historian: hist,
 | 
				
			||||||
            balances: HashMap::new(),
 | 
					            balances: HashMap::new(),
 | 
				
			||||||
            first_id: start_hash,
 | 
					            first_id: start_hash,
 | 
				
			||||||
            last_id: start_hash,
 | 
					 | 
				
			||||||
            pending: HashMap::new(),
 | 
					            pending: HashMap::new(),
 | 
				
			||||||
            time_sources: HashSet::new(),
 | 
					            time_sources: HashSet::new(),
 | 
				
			||||||
            last_time: Utc.timestamp(0, 0),
 | 
					            last_time: Utc.timestamp(0, 0),
 | 
				
			||||||
@@ -91,13 +89,6 @@ impl Accountant {
 | 
				
			|||||||
        Self::new_from_entries(mint.create_entries(), ms_per_tick)
 | 
					        Self::new_from_entries(mint.create_entries(), ms_per_tick)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn sync(self: &mut Self) -> Hash {
 | 
					 | 
				
			||||||
        while let Ok(entry) = self.historian.receiver.try_recv() {
 | 
					 | 
				
			||||||
            self.last_id = entry.id;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        self.last_id
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool {
 | 
					    fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool {
 | 
				
			||||||
        if let Plan::Pay(ref payment) = *plan {
 | 
					        if let Plan::Pay(ref payment) = *plan {
 | 
				
			||||||
            allow_deposits && *from == payment.to
 | 
					            allow_deposits && *from == payment.to
 | 
				
			||||||
@@ -210,8 +201,9 @@ impl Accountant {
 | 
				
			|||||||
        n: i64,
 | 
					        n: i64,
 | 
				
			||||||
        keypair: &KeyPair,
 | 
					        keypair: &KeyPair,
 | 
				
			||||||
        to: PublicKey,
 | 
					        to: PublicKey,
 | 
				
			||||||
 | 
					        last_id: Hash,
 | 
				
			||||||
    ) -> Result<Signature> {
 | 
					    ) -> Result<Signature> {
 | 
				
			||||||
        let tr = Transaction::new(keypair, to, n, self.last_id);
 | 
					        let tr = Transaction::new(keypair, to, n, last_id);
 | 
				
			||||||
        let sig = tr.sig;
 | 
					        let sig = tr.sig;
 | 
				
			||||||
        self.process_transaction(tr).map(|_| sig)
 | 
					        self.process_transaction(tr).map(|_| sig)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -222,8 +214,9 @@ impl Accountant {
 | 
				
			|||||||
        keypair: &KeyPair,
 | 
					        keypair: &KeyPair,
 | 
				
			||||||
        to: PublicKey,
 | 
					        to: PublicKey,
 | 
				
			||||||
        dt: DateTime<Utc>,
 | 
					        dt: DateTime<Utc>,
 | 
				
			||||||
 | 
					        last_id: Hash,
 | 
				
			||||||
    ) -> Result<Signature> {
 | 
					    ) -> Result<Signature> {
 | 
				
			||||||
        let tr = Transaction::new_on_date(keypair, to, dt, n, self.last_id);
 | 
					        let tr = Transaction::new_on_date(keypair, to, dt, n, last_id);
 | 
				
			||||||
        let sig = tr.sig;
 | 
					        let sig = tr.sig;
 | 
				
			||||||
        self.process_transaction(tr).map(|_| sig)
 | 
					        self.process_transaction(tr).map(|_| sig)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -244,10 +237,12 @@ mod tests {
 | 
				
			|||||||
        let alice = Mint::new(10_000);
 | 
					        let alice = Mint::new(10_000);
 | 
				
			||||||
        let bob_pubkey = KeyPair::new().pubkey();
 | 
					        let bob_pubkey = KeyPair::new().pubkey();
 | 
				
			||||||
        let mut acc = Accountant::new(&alice, Some(2));
 | 
					        let mut acc = Accountant::new(&alice, Some(2));
 | 
				
			||||||
        acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap();
 | 
					        acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
 | 
				
			||||||
 | 
					            .unwrap();
 | 
				
			||||||
        assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
 | 
					        assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        acc.transfer(500, &alice.keypair(), bob_pubkey).unwrap();
 | 
					        acc.transfer(500, &alice.keypair(), bob_pubkey, alice.seed())
 | 
				
			||||||
 | 
					            .unwrap();
 | 
				
			||||||
        assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);
 | 
					        assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        drop(acc.historian.sender);
 | 
					        drop(acc.historian.sender);
 | 
				
			||||||
@@ -262,9 +257,10 @@ mod tests {
 | 
				
			|||||||
        let alice = Mint::new(11_000);
 | 
					        let alice = Mint::new(11_000);
 | 
				
			||||||
        let mut acc = Accountant::new(&alice, Some(2));
 | 
					        let mut acc = Accountant::new(&alice, Some(2));
 | 
				
			||||||
        let bob_pubkey = KeyPair::new().pubkey();
 | 
					        let bob_pubkey = KeyPair::new().pubkey();
 | 
				
			||||||
        acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap();
 | 
					        acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
 | 
				
			||||||
 | 
					            .unwrap();
 | 
				
			||||||
        assert_eq!(
 | 
					        assert_eq!(
 | 
				
			||||||
            acc.transfer(10_001, &alice.keypair(), bob_pubkey),
 | 
					            acc.transfer(10_001, &alice.keypair(), bob_pubkey, alice.seed()),
 | 
				
			||||||
            Err(AccountingError::InsufficientFunds)
 | 
					            Err(AccountingError::InsufficientFunds)
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -309,7 +305,8 @@ mod tests {
 | 
				
			|||||||
        let mut acc = Accountant::new(&alice, Some(2));
 | 
					        let mut acc = Accountant::new(&alice, Some(2));
 | 
				
			||||||
        let alice_keypair = alice.keypair();
 | 
					        let alice_keypair = alice.keypair();
 | 
				
			||||||
        let bob_pubkey = KeyPair::new().pubkey();
 | 
					        let bob_pubkey = KeyPair::new().pubkey();
 | 
				
			||||||
        acc.transfer(500, &alice_keypair, bob_pubkey).unwrap();
 | 
					        acc.transfer(500, &alice_keypair, bob_pubkey, alice.seed())
 | 
				
			||||||
 | 
					            .unwrap();
 | 
				
			||||||
        assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
 | 
					        assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        drop(acc.historian.sender);
 | 
					        drop(acc.historian.sender);
 | 
				
			||||||
@@ -326,7 +323,7 @@ mod tests {
 | 
				
			|||||||
        let alice_keypair = alice.keypair();
 | 
					        let alice_keypair = alice.keypair();
 | 
				
			||||||
        let bob_pubkey = KeyPair::new().pubkey();
 | 
					        let bob_pubkey = KeyPair::new().pubkey();
 | 
				
			||||||
        let dt = Utc::now();
 | 
					        let dt = Utc::now();
 | 
				
			||||||
        acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt)
 | 
					        acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
 | 
				
			||||||
            .unwrap();
 | 
					            .unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Alice's balance will be zero because all funds are locked up.
 | 
					        // Alice's balance will be zero because all funds are locked up.
 | 
				
			||||||
@@ -355,7 +352,7 @@ mod tests {
 | 
				
			|||||||
        acc.process_verified_timestamp(alice.pubkey(), dt).unwrap();
 | 
					        acc.process_verified_timestamp(alice.pubkey(), dt).unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // It's now past now, so this transfer should be processed immediately.
 | 
					        // It's now past now, so this transfer should be processed immediately.
 | 
				
			||||||
        acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt)
 | 
					        acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
 | 
				
			||||||
            .unwrap();
 | 
					            .unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        assert_eq!(acc.get_balance(&alice.pubkey()), Some(0));
 | 
					        assert_eq!(acc.get_balance(&alice.pubkey()), Some(0));
 | 
				
			||||||
@@ -369,7 +366,7 @@ mod tests {
 | 
				
			|||||||
        let alice_keypair = alice.keypair();
 | 
					        let alice_keypair = alice.keypair();
 | 
				
			||||||
        let bob_pubkey = KeyPair::new().pubkey();
 | 
					        let bob_pubkey = KeyPair::new().pubkey();
 | 
				
			||||||
        let dt = Utc::now();
 | 
					        let dt = Utc::now();
 | 
				
			||||||
        let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt)
 | 
					        let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
 | 
				
			||||||
            .unwrap();
 | 
					            .unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Alice's balance will be zero because all funds are locked up.
 | 
					        // Alice's balance will be zero because all funds are locked up.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -12,9 +12,11 @@ use std::time::Duration;
 | 
				
			|||||||
use std::sync::mpsc::channel;
 | 
					use std::sync::mpsc::channel;
 | 
				
			||||||
use std::thread::{spawn, JoinHandle};
 | 
					use std::thread::{spawn, JoinHandle};
 | 
				
			||||||
use std::default::Default;
 | 
					use std::default::Default;
 | 
				
			||||||
 | 
					use serde_json;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct AccountantSkel {
 | 
					pub struct AccountantSkel {
 | 
				
			||||||
    pub acc: Accountant,
 | 
					    pub acc: Accountant,
 | 
				
			||||||
 | 
					    pub last_id: Hash,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Serialize, Deserialize, Debug)]
 | 
					#[derive(Serialize, Deserialize, Debug)]
 | 
				
			||||||
@@ -34,7 +36,16 @@ pub enum Response {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
impl AccountantSkel {
 | 
					impl AccountantSkel {
 | 
				
			||||||
    pub fn new(acc: Accountant) -> Self {
 | 
					    pub fn new(acc: Accountant) -> Self {
 | 
				
			||||||
        AccountantSkel { acc }
 | 
					        let last_id = acc.first_id;
 | 
				
			||||||
 | 
					        AccountantSkel { acc, last_id }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn sync(self: &mut Self) -> Hash {
 | 
				
			||||||
 | 
					        while let Ok(entry) = self.acc.historian.receiver.try_recv() {
 | 
				
			||||||
 | 
					            self.last_id = entry.id;
 | 
				
			||||||
 | 
					            println!("{}", serde_json::to_string(&entry).unwrap());
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        self.last_id
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn process_request(self: &mut Self, msg: Request) -> Option<Response> {
 | 
					    pub fn process_request(self: &mut Self, msg: Request) -> Option<Response> {
 | 
				
			||||||
@@ -52,8 +63,8 @@ impl AccountantSkel {
 | 
				
			|||||||
            Request::GetEntries { .. } => Some(Response::Entries { entries: vec![] }),
 | 
					            Request::GetEntries { .. } => Some(Response::Entries { entries: vec![] }),
 | 
				
			||||||
            Request::GetId { is_last } => Some(Response::Id {
 | 
					            Request::GetId { is_last } => Some(Response::Id {
 | 
				
			||||||
                id: if is_last {
 | 
					                id: if is_last {
 | 
				
			||||||
                    self.acc.sync();
 | 
					                    self.sync();
 | 
				
			||||||
                    self.acc.last_id
 | 
					                    self.last_id
 | 
				
			||||||
                } else {
 | 
					                } else {
 | 
				
			||||||
                    self.acc.first_id
 | 
					                    self.acc.first_id
 | 
				
			||||||
                },
 | 
					                },
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -5,7 +5,7 @@ use std::thread::{spawn, JoinHandle};
 | 
				
			|||||||
use std::collections::HashSet;
 | 
					use std::collections::HashSet;
 | 
				
			||||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
 | 
					use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
 | 
				
			||||||
use std::time::Instant;
 | 
					use std::time::Instant;
 | 
				
			||||||
use hash::{hash, Hash};
 | 
					use hash::Hash;
 | 
				
			||||||
use entry::Entry;
 | 
					use entry::Entry;
 | 
				
			||||||
use recorder::{ExitReason, Recorder, Signal};
 | 
					use recorder::{ExitReason, Recorder, Signal};
 | 
				
			||||||
use signature::Signature;
 | 
					use signature::Signature;
 | 
				
			||||||
@@ -48,8 +48,7 @@ impl Historian {
 | 
				
			|||||||
                    return err;
 | 
					                    return err;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                if ms_per_tick.is_some() {
 | 
					                if ms_per_tick.is_some() {
 | 
				
			||||||
                    recorder.last_id = hash(&recorder.last_id);
 | 
					                    recorder.hash();
 | 
				
			||||||
                    recorder.num_hashes += 1;
 | 
					 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
@@ -127,12 +126,9 @@ mod tests {
 | 
				
			|||||||
        hist.sender.send(Signal::Tick).unwrap();
 | 
					        hist.sender.send(Signal::Tick).unwrap();
 | 
				
			||||||
        drop(hist.sender);
 | 
					        drop(hist.sender);
 | 
				
			||||||
        let entries: Vec<Entry> = hist.receiver.iter().collect();
 | 
					        let entries: Vec<Entry> = hist.receiver.iter().collect();
 | 
				
			||||||
 | 
					        assert!(entries.len() > 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Ensure one entry is sent back for each tick sent in.
 | 
					        // Ensure the ID is not the seed.
 | 
				
			||||||
        assert_eq!(entries.len(), 1);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        // Ensure the ID is not the seed, which indicates another Tick
 | 
					 | 
				
			||||||
        // was recorded before the one we sent.
 | 
					 | 
				
			||||||
        assert_ne!(entries[0].id, zero);
 | 
					        assert_ne!(entries[0].id, zero);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -8,10 +8,9 @@
 | 
				
			|||||||
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
 | 
					use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
 | 
				
			||||||
use std::time::{Duration, Instant};
 | 
					use std::time::{Duration, Instant};
 | 
				
			||||||
use std::mem;
 | 
					use std::mem;
 | 
				
			||||||
use hash::Hash;
 | 
					use hash::{hash, Hash};
 | 
				
			||||||
use entry::{create_entry_mut, Entry};
 | 
					use entry::{create_entry_mut, Entry};
 | 
				
			||||||
use event::Event;
 | 
					use event::Event;
 | 
				
			||||||
use serde_json;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub enum Signal {
 | 
					pub enum Signal {
 | 
				
			||||||
    Tick,
 | 
					    Tick,
 | 
				
			||||||
@@ -25,12 +24,12 @@ pub enum ExitReason {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct Recorder {
 | 
					pub struct Recorder {
 | 
				
			||||||
    pub sender: SyncSender<Entry>,
 | 
					    sender: SyncSender<Entry>,
 | 
				
			||||||
    pub receiver: Receiver<Signal>,
 | 
					    receiver: Receiver<Signal>,
 | 
				
			||||||
    pub last_id: Hash,
 | 
					    last_hash: Hash,
 | 
				
			||||||
    pub events: Vec<Event>,
 | 
					    events: Vec<Event>,
 | 
				
			||||||
    pub num_hashes: u64,
 | 
					    num_hashes: u64,
 | 
				
			||||||
    pub num_ticks: u64,
 | 
					    num_ticks: u64,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl Recorder {
 | 
					impl Recorder {
 | 
				
			||||||
@@ -38,18 +37,25 @@ impl Recorder {
 | 
				
			|||||||
        Recorder {
 | 
					        Recorder {
 | 
				
			||||||
            receiver,
 | 
					            receiver,
 | 
				
			||||||
            sender,
 | 
					            sender,
 | 
				
			||||||
            last_id: start_hash,
 | 
					            last_hash: start_hash,
 | 
				
			||||||
            events: vec![],
 | 
					            events: vec![],
 | 
				
			||||||
            num_hashes: 0,
 | 
					            num_hashes: 0,
 | 
				
			||||||
            num_ticks: 0,
 | 
					            num_ticks: 0,
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn record_entry(&mut self) -> Result<Entry, ExitReason> {
 | 
					    pub fn hash(&mut self) {
 | 
				
			||||||
 | 
					        self.last_hash = hash(&self.last_hash);
 | 
				
			||||||
 | 
					        self.num_hashes += 1;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn record_entry(&mut self) -> Result<(), ExitReason> {
 | 
				
			||||||
        let events = mem::replace(&mut self.events, vec![]);
 | 
					        let events = mem::replace(&mut self.events, vec![]);
 | 
				
			||||||
        let entry = create_entry_mut(&mut self.last_id, &mut self.num_hashes, events);
 | 
					        let entry = create_entry_mut(&mut self.last_hash, &mut self.num_hashes, events);
 | 
				
			||||||
        println!("{}", serde_json::to_string(&entry).unwrap());
 | 
					        self.sender
 | 
				
			||||||
        Ok(entry)
 | 
					            .send(entry)
 | 
				
			||||||
 | 
					            .or(Err(ExitReason::SendDisconnected))?;
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn process_events(
 | 
					    pub fn process_events(
 | 
				
			||||||
@@ -68,10 +74,7 @@ impl Recorder {
 | 
				
			|||||||
            match self.receiver.try_recv() {
 | 
					            match self.receiver.try_recv() {
 | 
				
			||||||
                Ok(signal) => match signal {
 | 
					                Ok(signal) => match signal {
 | 
				
			||||||
                    Signal::Tick => {
 | 
					                    Signal::Tick => {
 | 
				
			||||||
                        let entry = self.record_entry()?;
 | 
					                        self.record_entry()?;
 | 
				
			||||||
                        self.sender
 | 
					 | 
				
			||||||
                            .send(entry)
 | 
					 | 
				
			||||||
                            .or(Err(ExitReason::SendDisconnected))?;
 | 
					 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    Signal::Event(event) => {
 | 
					                    Signal::Event(event) => {
 | 
				
			||||||
                        self.events.push(event);
 | 
					                        self.events.push(event);
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user