diff --git a/Cargo.toml b/Cargo.toml index c5bd748293..23f262848f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,10 @@ path = "src/bin/demo.rs" name = "silk-client-demo" path = "src/bin/client-demo.rs" +[[bin]] +name = "silk-testnode" +path = "src/bin/testnode.rs" + [badges] codecov = { repository = "loomprotocol/silk", branch = "master", service = "github" } diff --git a/src/accountant.rs b/src/accountant.rs index 6596e7af26..968ae6d3dd 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -2,7 +2,7 @@ //! event log to record transactions. Its users can deposit funds and //! transfer funds to other users. -use log::{verify_entry, Event, PublicKey, Sha256Hash, Signature}; +use log::{Event, PublicKey, Sha256Hash, Signature}; use historian::Historian; use ring::signature::Ed25519KeyPair; use std::sync::mpsc::{RecvError, SendError}; @@ -24,8 +24,9 @@ impl Accountant { } } - pub fn process_event(self: &mut Self, event: Event) { - match event { + pub fn process_event(self: &mut Self, event: &Event) { + println!("accountant: Processing event: {:?}", event); + match *event { Event::Claim { key, data, .. } => { if self.balances.contains_key(&key) { if let Some(x) = self.balances.get_mut(&key) { @@ -52,11 +53,21 @@ impl Accountant { } pub fn sync(self: &mut Self) { + let mut entries = vec![]; while let Ok(entry) = self.historian.receiver.try_recv() { - assert!(verify_entry(&entry, &self.end_hash)); - self.end_hash = entry.end_hash; - - self.process_event(entry.event); + println!("accountant: got event {:?}", entry.event); + entries.push(entry); + } + // TODO: Does this cause the historian's channel to get blocked? + //use log::verify_slice_u64; + //println!("accountant: verifying {} entries...", entries.len()); + //assert!(verify_slice_u64(&entries, &self.end_hash)); + //println!("accountant: Done verifying {} entries.", entries.len()); + if let Some(last_entry) = entries.last() { + self.end_hash = last_entry.end_hash; + } + for e in &entries { + self.process_event(&e.event); } } @@ -88,16 +99,20 @@ impl Accountant { data: u64, sig: Signature, ) -> Result<(), SendError>> { + println!("accountant: Checking funds (needs sync)..."); if self.get_balance(&from).unwrap() < data { // TODO: Replace the SendError result with a custom one. + println!("Error: Insufficient funds"); return Ok(()); } + println!("accountant: Sufficient funds."); let event = Event::Transaction { from, to, data, sig, }; + println!("accountant: Sending Transaction to historian."); self.historian.sender.send(event) } @@ -115,7 +130,9 @@ impl Accountant { } pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result { + println!("accountant: syncing the log..."); self.sync(); + println!("accountant: done syncing."); Ok(*self.balances.get(pubkey).unwrap_or(&0)) } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 008b0e7d40..7d5c39bee3 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -35,14 +35,16 @@ impl AccountantSkel { AccountantSkel { obj } } - pub fn process_message(self: &mut Self, msg: Request) -> Option { + pub fn process_request(self: &mut Self, msg: Request) -> Option { match msg { Request::Deposit { key, val, sig } => { let _ = self.obj.deposit_signed(key, val, sig); None } Request::Transfer { from, to, val, sig } => { + println!("skel: Invoking transfer_signed..."); let _ = self.obj.transfer_signed(from, to, val, sig); + println!("skel: transfer_signed done."); None } Request::GetBalance { key } => { @@ -60,17 +62,21 @@ impl AccountantSkel { let listener = TcpListener::bind(addr)?; let mut buf = vec![0u8; 1024]; loop { - let (mut stream, _addr) = listener.accept()?; - - // TODO: Guard against large message DoS attack. + println!("skel: Waiting for incoming connections..."); + let (mut stream, addr) = listener.accept()?; + println!("skel: Accepted incoming connection frm {:?}.", addr); let _sz = stream.read(&mut buf)?; // TODO: Return a descriptive error message if deserialization fails. let req = deserialize(&buf).expect("deserialize request"); + println!("skel: Got request {:?}", req); - if let Some(resp) = self.process_message(req) { + println!("skel: Processing request..."); + if let Some(resp) = self.process_request(req) { + println!("skel: Writing response..."); stream.write(&serialize(&resp).expect("serialize response"))?; } + println!("skel: Done processing request."); } } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 5c5749b1bb..10568eee6d 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -49,8 +49,13 @@ impl AccountantStub { ) -> io::Result { let req = Request::Transfer { from, to, val, sig }; let data = serialize(&req).unwrap(); + println!("TcpStream::connect()..."); let mut stream = TcpStream::connect(&self.addr)?; - stream.write(&data) + println!("Connected."); + println!("accountant_stub: Writing transfer message..."); + let ret = stream.write(&data); + println!("Done."); + ret } pub fn transfer( diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 209893f4ac..fd8b6be2d0 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,13 +10,18 @@ fn main() { let mut acc = AccountantStub::new(addr); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); + println!("Depositing..."); acc.deposit(10_000, &alice_keypair).unwrap(); acc.deposit(1_000, &bob_keypair).unwrap(); + println!("Done."); sleep(Duration::from_millis(30)); let bob_pubkey = get_pubkey(&bob_keypair); + println!("Transferring..."); acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + println!("Done."); - sleep(Duration::from_millis(300)); - assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); + sleep(Duration::from_millis(30)); + println!("Done. Checking balance."); + println!("Balance {}", acc.get_balance(&bob_pubkey).unwrap()); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs new file mode 100644 index 0000000000..8e4adb9f71 --- /dev/null +++ b/src/bin/testnode.rs @@ -0,0 +1,13 @@ +extern crate silk; + +use silk::accountant_skel::AccountantSkel; +use silk::accountant::Accountant; +use silk::log::Sha256Hash; + +fn main() { + let addr = "127.0.0.1:8000"; + let zero = Sha256Hash::default(); + let acc = Accountant::new(&zero, Some(1000)); + let mut skel = AccountantSkel::new(acc); + skel.serve(addr).unwrap(); +} diff --git a/src/historian.rs b/src/historian.rs index 80df3ba6f5..0ed4fcbe7d 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -10,6 +10,7 @@ use std::sync::mpsc::{Receiver, Sender}; use std::time::{Duration, SystemTime}; use log::{hash, hash_event, verify_event, Entry, Event, Sha256Hash}; use serde::Serialize; +use std::fmt::Debug; pub struct Historian { pub sender: Sender>, @@ -22,13 +23,14 @@ pub enum ExitReason { RecvDisconnected, SendDisconnected, } -fn log_event( +fn log_event( sender: &Sender>, num_hashes: &mut u64, end_hash: &mut Sha256Hash, event: Event, ) -> Result<(), (Entry, ExitReason)> { *end_hash = hash_event(end_hash, &event); + println!("historian: logging event {:?}", event); let entry = Entry { end_hash: *end_hash, num_hashes: *num_hashes, @@ -41,7 +43,7 @@ fn log_event( Ok(()) } -fn log_events( +fn log_events( receiver: &Receiver>, sender: &Sender>, num_hashes: &mut u64, @@ -82,7 +84,7 @@ fn log_events( /// A background thread that will continue tagging received Event messages and /// sending back Entry messages until either the receiver or sender channel is closed. -pub fn create_logger( +pub fn create_logger( start_hash: Sha256Hash, ms_per_tick: Option, receiver: Receiver>, @@ -112,7 +114,7 @@ pub fn create_logger( }) } -impl Historian { +impl Historian { pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { use std::sync::mpsc::channel; let (sender, event_receiver) = channel(); diff --git a/src/log.rs b/src/log.rs index a0e6b739ce..523a12afaa 100644 --- a/src/log.rs +++ b/src/log.rs @@ -219,6 +219,14 @@ pub fn verify_slice(events: &[Entry], start_hash: &Sha256Hash) -> bo event_pairs.all(|(x0, x1)| verify_entry(&x1, &x0.end_hash)) } +/// Verifies the hashes and counts of a slice of events are all consistent. +pub fn verify_slice_u64(events: &[Entry], start_hash: &Sha256Hash) -> bool { + use rayon::prelude::*; + let genesis = [Entry::new_tick(Default::default(), start_hash)]; + let event_pairs = genesis.par_iter().chain(events).zip(events); + event_pairs.all(|(x0, x1)| verify_entry(&x1, &x0.end_hash)) +} + /// Verifies the hashes and events serially. Exists only for reference. pub fn verify_slice_seq(events: &[Entry], start_hash: &Sha256Hash) -> bool { let genesis = [Entry::new_tick(0, start_hash)];