diff --git a/Cargo.toml b/Cargo.toml index 433712ae68..747cd4f38d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,10 @@ path = "src/bin/genesis-demo.rs" name = "solana-mint" path = "src/bin/mint.rs" +[[bin]] +name = "solana-mint-demo" +path = "src/bin/mint-demo.rs" + [badges] codecov = { repository = "solana-labs/solana", branch = "master", service = "github" } diff --git a/README.md b/README.md index 611be8a836..729d99ac72 100644 --- a/README.md +++ b/README.md @@ -39,25 +39,28 @@ $ cd solana The testnode server is initialized with a ledger from stdin and generates new ledger entries on stdout. To create the input ledger, we'll need to create *the mint* and use it to generate a *genesis ledger*. It's done in -two steps because the mint.json file contains a private key that will be +two steps because the mint-demo.json file contains private keys that will be used later in this demo. ```bash - $ echo 1000000000 | cargo run --release --bin solana-mint | tee mint.json - $ cat mint.json | cargo run --release --bin solana-genesis | tee genesis.log + $ echo 1000000000 | cargo run --release --bin solana-mint-demo > mint-demo.json + $ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log ``` Now you can start the server: ```bash - $ cat genesis.log | cargo run --release --bin solana-testnode | tee transactions0.log + $ cat genesis.log | cargo run --release --bin solana-testnode > transactions0.log ``` +Wait a few seconds for the server to initialize. It will print "Ready." when it's safe +to start sending it transactions. + Then, in a separate shell, let's execute some transactions. Note we pass in the JSON configuration file here, not the genesis ledger. ```bash - $ cat mint.json | cargo run --release --bin solana-client-demo + $ cat mint-demo.json | cargo run --release --bin solana-client-demo ``` Now kill the server with Ctrl-C, and take a look at the ledger. You should @@ -73,14 +76,14 @@ Now restart the server from where we left off. Pass it both the genesis ledger, the transaction ledger. ```bash - $ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode | tee transactions1.log + $ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode > transactions1.log ``` Lastly, run the client demo again, and verify that all funds were spent in the previous round, and so no additional transactions are added. ```bash - $ cat mint.json | cargo run --release --bin solana-client-demo + $ cat mint-demo.json | cargo run --release --bin solana-client-demo ``` Stop the server again, and verify there are only Tick entries, and no Transaction entries. diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b4bf805c63..c3811b4214 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -33,6 +33,7 @@ pub struct AccountantSkel { last_id: Hash, writer: W, historian: Historian, + entry_info_subscribers: Vec, } #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] @@ -41,6 +42,19 @@ pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, GetLastId, + Subscribe { subscriptions: Vec }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Subscription { + EntryInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EntryInfo { + pub id: Hash, + pub num_hashes: u64, + pub num_events: u64, } impl Request { @@ -56,7 +70,7 @@ impl Request { #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, - Entries { entries: Vec }, + EntryInfo(EntryInfo), LastId { id: Hash }, } @@ -68,6 +82,22 @@ impl AccountantSkel { last_id, writer, historian, + entry_info_subscribers: vec![], + } + } + + fn notify_entry_info_subscribers(&mut self, entry: &Entry) { + // TODO: No need to bind(). + let socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + + for addr in &self.entry_info_subscribers { + let entry_info = EntryInfo { + id: entry.id, + num_hashes: entry.num_hashes, + num_events: entry.events.len() as u64, + }; + let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); + let _res = socket.send_to(&data, addr); } } @@ -77,6 +107,7 @@ impl AccountantSkel { self.last_id = entry.id; self.acc.register_entry_id(&self.last_id); writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + self.notify_entry_info_subscribers(&entry); } self.last_id } @@ -94,6 +125,14 @@ impl AccountantSkel { } Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)), Request::Transaction(_) => unreachable!(), + Request::Subscribe { subscriptions } => { + for subscription in subscriptions { + match subscription { + Subscription::EntryInfo => self.entry_info_subscribers.push(rsp_addr), + } + } + None + } } } @@ -242,7 +281,11 @@ impl AccountantSkel { blob_sender.send(blobs)?; } packet_recycler.recycle(msgs); + + // Write new entries to the ledger and notify subscribers. + obj.lock().unwrap().sync(); } + Ok(()) } @@ -286,8 +329,13 @@ impl AccountantSkel { &packet_recycler, &blob_recycler, ); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; + if e.is_err() { + // Assume this was a timeout, so sync any empty entries. + skel.lock().unwrap().sync(); + + if exit.load(Ordering::Relaxed) { + break; + } } }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) @@ -426,7 +474,7 @@ mod tests { let socket = UdpSocket::bind(send_addr).unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let acc = AccountantStub::new(&addr, socket); + let mut acc = AccountantStub::new(&addr, socket); let last_id = acc.get_last_id().wait().unwrap(); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 5edfa6a053..2dd331da7b 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -3,11 +3,12 @@ //! this object instead of writing messages to the network directly. The binary //! encoding of its messages are unstable and may change in future releases. -use accountant_skel::{Request, Response}; +use accountant_skel::{Request, Response, Subscription}; use bincode::{deserialize, serialize}; -use futures::future::{err, ok, FutureResult}; +use futures::future::{ok, FutureResult}; use hash::Hash; use signature::{KeyPair, PublicKey, Signature}; +use std::collections::HashMap; use std::io; use std::net::UdpSocket; use transaction::Transaction; @@ -15,6 +16,9 @@ use transaction::Transaction; pub struct AccountantStub { pub addr: String, pub socket: UdpSocket, + last_id: Option, + num_events: u64, + balances: HashMap>, } impl AccountantStub { @@ -22,9 +26,43 @@ impl AccountantStub { /// over `socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking AccountantStub methods. pub fn new(addr: &str, socket: UdpSocket) -> Self { - AccountantStub { + let stub = AccountantStub { addr: addr.to_string(), socket, + last_id: None, + num_events: 0, + balances: HashMap::new(), + }; + stub.init(); + stub + } + + pub fn init(&self) { + let subscriptions = vec![Subscription::EntryInfo]; + let req = Request::Subscribe { subscriptions }; + let data = serialize(&req).expect("serialize Subscribe"); + let _res = self.socket.send_to(&data, &self.addr); + } + + pub fn recv_response(&self) -> io::Result { + let mut buf = vec![0u8; 1024]; + self.socket.recv_from(&mut buf)?; + let resp = deserialize(&buf).expect("deserialize balance"); + Ok(resp) + } + + pub fn process_response(&mut self, resp: Response) { + match resp { + Response::Balance { key, val } => { + self.balances.insert(key, val); + } + Response::LastId { id } => { + self.last_id = Some(id); + } + Response::EntryInfo(entry_info) => { + self.last_id = Some(entry_info.id); + self.num_events += entry_info.num_events; + } } } @@ -52,42 +90,67 @@ impl AccountantStub { /// Request the balance of the user holding `pubkey`. This method blocks /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. - pub fn get_balance(&self, pubkey: &PublicKey) -> FutureResult { + pub fn get_balance(&mut self, pubkey: &PublicKey) -> FutureResult { let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance"); self.socket .send_to(&data, &self.addr) .expect("buffer error"); - let mut buf = vec![0u8; 1024]; - self.socket.recv_from(&mut buf).expect("buffer error"); - let resp = deserialize(&buf).expect("deserialize balance"); - if let Response::Balance { key, val } = resp { - assert_eq!(key, *pubkey); - return match val { - Some(x) => ok(x), - _ => err(0), - }; + let mut done = false; + while !done { + let resp = self.recv_response().expect("recv response"); + if let &Response::Balance { ref key, .. } = &resp { + done = key == pubkey; + } + self.process_response(resp); } - err(0) + ok(self.balances[pubkey].unwrap()) } /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. At the time of this writing, /// it also has the side-effect of causing the server to log any /// entries that have been published by the Historian. - pub fn get_last_id(&self) -> FutureResult { + pub fn get_last_id(&mut self) -> FutureResult { let req = Request::GetLastId; let data = serialize(&req).expect("serialize GetId"); self.socket .send_to(&data, &self.addr) .expect("buffer error"); - let mut buf = vec![0u8; 1024]; - self.socket.recv_from(&mut buf).expect("buffer error"); - let resp = deserialize(&buf).expect("deserialize Id"); - if let Response::LastId { id } = resp { - return ok(id); + let mut done = false; + while !done { + let resp = self.recv_response().expect("recv response"); + if let &Response::LastId { .. } = &resp { + done = true; + } + self.process_response(resp); } - ok(Default::default()) + ok(self.last_id.unwrap_or(Hash::default())) + } + + /// Return the number of transactions the server processed since creating + /// this stub instance. + pub fn transaction_count(&mut self) -> u64 { + // Wait for at least one EntryInfo. + let mut done = false; + while !done { + let resp = self.recv_response().expect("recv response"); + if let &Response::EntryInfo(_) = &resp { + done = true; + } + self.process_response(resp); + } + + // Then take the rest. + self.socket.set_nonblocking(true).expect("set nonblocking"); + loop { + match self.recv_response() { + Err(_) => break, + Ok(resp) => self.process_response(resp), + } + } + self.socket.set_nonblocking(false).expect("set blocking"); + self.num_events } } @@ -128,7 +191,7 @@ mod tests { let socket = UdpSocket::bind(send_addr).unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let acc = AccountantStub::new(addr, socket); + let mut acc = AccountantStub::new(addr, socket); let last_id = acc.get_last_id().wait().unwrap(); let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index e4ca981597..50f2e8a2ec 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -4,21 +4,22 @@ extern crate isatty; extern crate rayon; extern crate serde_json; extern crate solana; +extern crate untrusted; use futures::Future; use getopts::Options; use isatty::stdin_isatty; use rayon::prelude::*; use solana::accountant_stub::AccountantStub; -use solana::mint::Mint; +use solana::mint::MintDemo; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; use std::env; use std::io::{stdin, Read}; use std::net::UdpSocket; use std::process::exit; -use std::thread::sleep; -use std::time::{Duration, Instant}; +use std::time::Instant; +use untrusted::Input; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); @@ -75,74 +76,68 @@ fn main() { exit(1); } - let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { + println!("Parsing stdin..."); + let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); - let mint_keypair = mint.keypair(); - let mint_pubkey = mint.pubkey(); let socket = UdpSocket::bind(&send_addr).unwrap(); - println!("Stub new"); - let acc = AccountantStub::new(&addr, socket); - println!("Get last id"); + let mut acc = AccountantStub::new(&addr, socket); + + println!("Get last ID..."); let last_id = acc.get_last_id().wait().unwrap(); - println!("Get Balance"); - let mint_balance = acc.get_balance(&mint_pubkey).wait().unwrap(); - println!("Mint's Initial Balance {}", mint_balance); + println!("Creating keypairs..."); + let txs = demo.users.len() / 2; + let keypairs: Vec<_> = demo.users + .into_par_iter() + .map(|(pkcs8, _)| KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap()) + .collect(); + let keypair_pairs: Vec<_> = keypairs.chunks(2).collect(); println!("Signing transactions..."); - let txs = 1_000_000; let now = Instant::now(); - let transactions: Vec<_> = (0..txs) + let transactions: Vec<_> = keypair_pairs .into_par_iter() - .map(|_| { - let rando_pubkey = KeyPair::new().pubkey(); - Transaction::new(&mint_keypair, rando_pubkey, 1, last_id) - }) + .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) .collect(); let duration = now.elapsed(); - let ns = duration.as_secs() * 2_000_000_000 + u64::from(duration.subsec_nanos()); - let bsps = f64::from(txs) / ns as f64; - let nsps = ns as f64 / f64::from(txs); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let bsps = txs as f64 / ns as f64; + let nsps = ns as f64 / txs as f64; println!( "Done. {} thousand signatures per second, {}us per signature", bsps * 1_000_000_f64, nsps / 1_000_f64 ); + let initial_tx_count = acc.transaction_count(); + println!("Transfering {} transactions in {} batches", txs, threads); let now = Instant::now(); let sz = transactions.len() / threads; let chunks: Vec<_> = transactions.chunks(sz).collect(); - let _: Vec<_> = chunks - .into_par_iter() - .map(|trs| { - println!("Transferring 1 unit {} times...", trs.len()); - let send_addr = "0.0.0.0:0"; - let socket = UdpSocket::bind(send_addr).unwrap(); - let acc = AccountantStub::new(&addr, socket); - for tr in trs { - acc.transfer_signed(tr.clone()).unwrap(); - } - () - }) - .collect(); - println!("Waiting for last transaction to be confirmed...",); - let mut val = mint_balance; - let mut prev = 0; - while val != prev { - sleep(Duration::from_millis(20)); - prev = val; - val = acc.get_balance(&mint_pubkey).wait().unwrap(); + chunks.into_par_iter().for_each(|trs| { + println!("Transferring 1 unit {} times...", trs.len()); + let send_addr = "0.0.0.0:0"; + let socket = UdpSocket::bind(send_addr).unwrap(); + let acc = AccountantStub::new(&addr, socket); + for tr in trs { + acc.transfer_signed(tr.clone()).unwrap(); + } + }); + + println!("Waiting for half the transactions to complete...",); + let mut tx_count = acc.transaction_count(); + while tx_count < transactions.len() as u64 / 2 { + tx_count = acc.transaction_count(); } - println!("Mint's Final Balance {}", val); - let txs = mint_balance - val; - println!("Successful transactions {}", txs); + let txs = tx_count - initial_tx_count; + println!("Transactions processed {}", txs); let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = (txs * 1_000_000_000) as f64 / ns as f64; - println!("Done. {} tps!", tps); + println!("Done. {} tps", tps); } diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index 340f09c6cf..68c10b9928 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -1,21 +1,23 @@ extern crate isatty; +extern crate rayon; +extern crate ring; extern crate serde_json; extern crate solana; +extern crate untrusted; use isatty::stdin_isatty; -use solana::entry::create_entry; +use rayon::prelude::*; +use solana::accountant::MAX_ENTRY_IDS; +use solana::entry::{create_entry, next_tick}; use solana::event::Event; -use solana::hash::Hash; -use solana::mint::Mint; -use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; +use solana::mint::MintDemo; +use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; use std::io::{stdin, Read}; use std::process::exit; +use untrusted::Input; -fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { - Event::Transaction(Transaction::new(from, to, tokens, last_id)) -} - +// Generate a ledger with lots and lots of accounts. fn main() { if stdin_isatty() { eprintln!("nothing found on stdin, expected a json file"); @@ -29,20 +31,39 @@ fn main() { exit(1); } - let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { + let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); - let mut entries = mint.create_entries(); - let from = mint.keypair(); - let seed = mint.seed(); - let alice = (KeyPair::new().pubkey(), 200); - let bob = (KeyPair::new().pubkey(), 100); - let events = vec![transfer(&from, alice, seed), transfer(&from, bob, seed)]; - entries.push(create_entry(&seed, 0, events)); + let num_accounts = demo.users.len(); + let last_id = demo.mint.last_id(); + let mint_keypair = demo.mint.keypair(); - for entry in entries { + eprintln!("Signing {} transactions...", num_accounts); + let events: Vec<_> = demo.users + .into_par_iter() + .map(|(pkcs8, tokens)| { + let rando = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap(); + let tr = Transaction::new(&mint_keypair, rando.pubkey(), tokens, last_id); + Event::Transaction(tr) + }) + .collect(); + + for entry in demo.mint.create_entries() { + println!("{}", serde_json::to_string(&entry).unwrap()); + } + + eprintln!("Logging the creation of {} accounts...", num_accounts); + let entry = create_entry(&last_id, 0, events); + println!("{}", serde_json::to_string(&entry).unwrap()); + + eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS); + // Offer client lots of entry IDs to use for each transaction's last_id. + let mut last_id = last_id; + for _ in 0..MAX_ENTRY_IDS { + let entry = next_tick(&last_id, 1); + last_id = entry.id; let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| { eprintln!("failed to serialize: {}", e); exit(1); diff --git a/src/bin/mint-demo.rs b/src/bin/mint-demo.rs new file mode 100644 index 0000000000..1127ce19b5 --- /dev/null +++ b/src/bin/mint-demo.rs @@ -0,0 +1,33 @@ +extern crate rayon; +extern crate ring; +extern crate serde_json; +extern crate solana; + +use rayon::prelude::*; +use ring::rand::SystemRandom; +use solana::mint::{Mint, MintDemo}; +use solana::signature::KeyPair; +use std::io; + +fn main() { + let mut input_text = String::new(); + io::stdin().read_line(&mut input_text).unwrap(); + let trimmed = input_text.trim(); + let tokens = trimmed.parse::().unwrap(); + + let mint = Mint::new(tokens); + let tokens_per_user = 1_000; + let num_accounts = tokens / tokens_per_user; + let rnd = SystemRandom::new(); + + let users: Vec<_> = (0..num_accounts) + .into_par_iter() + .map(|_| { + let pkcs8 = KeyPair::generate_pkcs8(&rnd).unwrap().to_vec(); + (pkcs8, tokens_per_user) + }) + .collect(); + + let demo = MintDemo { mint, users }; + println!("{}", serde_json::to_string(&demo).unwrap()); +} diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 3fa995a718..8b6da2c1c8 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -62,6 +62,7 @@ fn main() { exit(1); } + eprintln!("Initializing..."); let mut entries = buffer.lines().map(|line| { serde_json::from_str(&line).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); @@ -71,7 +72,7 @@ fn main() { // The first item in the ledger is required to be an entry with zero num_hashes, // which implies its id can be used as the ledger's seed. - entries.next().unwrap(); + let entry0 = entries.next().unwrap(); // The second item in the ledger is a special transaction where the to and from // fields are the same. That entry should be treated as a deposit, not a @@ -84,11 +85,14 @@ fn main() { }; let acc = Accountant::new_from_deposit(&deposit.unwrap()); + acc.register_entry_id(&entry0.id); + acc.register_entry_id(&entry1.id); let mut last_id = entry1.id; for entry in entries { last_id = entry.id; acc.process_verified_events(entry.events).unwrap(); + acc.register_entry_id(&last_id); } let historian = Historian::new(&last_id, Some(1000)); @@ -99,8 +103,8 @@ fn main() { stdout(), historian, ))); - eprintln!("Listening on {}", addr); let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap(); + eprintln!("Ready. Listening on {}", addr); for t in threads { t.join().expect("join"); } diff --git a/src/entry.rs b/src/entry.rs index e672e71c04..cd23276353 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -61,7 +61,7 @@ fn add_event_data(hash_data: &mut Vec, event: &Event) { } /// Creates the hash `num_hashes` after `start_hash`. If the event contains -/// signature, the final hash will be a hash of both the previous ID and +/// a signature, the final hash will be a hash of both the previous ID and /// the signature. pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash { let mut id = *start_hash; @@ -76,10 +76,12 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash { } if !hash_data.is_empty() { - return extend_and_hash(&id, &hash_data); + extend_and_hash(&id, &hash_data) + } else if num_hashes != 0 { + hash(&id) + } else { + id } - - id } /// Creates the next Entry `num_hashes` after `start_hash`. @@ -167,6 +169,8 @@ mod tests { #[test] fn test_next_tick() { let zero = Hash::default(); - assert_eq!(next_tick(&zero, 1).num_hashes, 1) + let tick = next_tick(&zero, 1); + assert_eq!(tick.num_hashes, 1); + assert_ne!(tick.id, zero); } } diff --git a/src/mint.rs b/src/mint.rs index f3b6c5e598..754cacaa41 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -58,6 +58,12 @@ impl Mint { } } +#[derive(Serialize, Deserialize, Debug)] +pub struct MintDemo { + pub mint: Mint, + pub users: Vec<(Vec, i64)>, +} + #[cfg(test)] mod tests { use super::*;