diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 826e2fac6a..82cd42b2c1 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,7 +10,7 @@ use futures::Future; use getopts::Options; use isatty::stdin_isatty; use rayon::prelude::*; -use solana::accountant_stub::AccountantStub; +use solana::thin_client::ThinClient; use solana::mint::MintDemo; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; @@ -87,7 +87,7 @@ fn main() { println!("Binding to {}", client_addr); let socket = UdpSocket::bind(&client_addr).unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut acc = AccountantStub::new(addr.parse().unwrap(), socket); + let mut acc = ThinClient::new(addr.parse().unwrap(), socket); println!("Get last ID..."); let last_id = acc.get_last_id().wait().unwrap(); @@ -129,7 +129,7 @@ fn main() { let mut client_addr: SocketAddr = client_addr.parse().unwrap(); client_addr.set_port(0); let socket = UdpSocket::bind(client_addr).unwrap(); - let acc = AccountantStub::new(addr.parse().unwrap(), socket); + let acc = ThinClient::new(addr.parse().unwrap(), socket); for tr in trs { acc.transfer_signed(tr.clone()).unwrap(); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 5aede0c3f1..736acb0179 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -7,7 +7,7 @@ extern crate solana; use getopts::Options; use isatty::stdin_isatty; use solana::accountant::Accountant; -use solana::accountant_skel::AccountantSkel; +use solana::tpu::Tpu; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; @@ -119,7 +119,7 @@ fn main() { let (input, event_receiver) = sync_channel(10_000); let historian = Historian::new(event_receiver, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let skel = Arc::new(AccountantSkel::new(acc, input, historian)); + let tpu = Arc::new(Tpu::new(acc, input, historian)); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); @@ -132,8 +132,8 @@ fn main() { serve_sock.local_addr().unwrap(), ); eprintln!("starting server..."); - let threads = AccountantSkel::serve( - &skel, + let threads = Tpu::serve( + &tpu, d, serve_sock, skinny_sock, diff --git a/src/ecdsa.rs b/src/ecdsa.rs index c0a06646d0..c029f1e1f9 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -130,7 +130,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { #[cfg(test)] mod tests { - use accountant_skel::Request; + use tpu::Request; use bincode::serialize; use ecdsa; use packet::{Packet, Packets, SharedPackets}; diff --git a/src/lib.rs b/src/lib.rs index b316a79c98..5c7568c54d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,5 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod accountant; -pub mod accountant_skel; -pub mod accountant_stub; pub mod crdt; pub mod ecdsa; pub mod entry; @@ -19,8 +17,10 @@ pub mod recorder; pub mod result; pub mod signature; pub mod streamer; -pub mod transaction; +pub mod thin_client; pub mod timing; +pub mod transaction; +pub mod tpu; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/accountant_stub.rs b/src/thin_client.rs similarity index 88% rename from src/accountant_stub.rs rename to src/thin_client.rs index 274bbb0d38..920081a4f6 100644 --- a/src/accountant_stub.rs +++ b/src/thin_client.rs @@ -1,9 +1,9 @@ -//! The `accountant_stub` module is a client-side object that interfaces with a server-side Accountant -//! object via the network interface exposed by AccountantSkel. Client code should use -//! this object instead of writing messages to the network directly. The binary -//! encoding of its messages are unstable and may change in future releases. +//! The `thin_client` module is a client-side object that interfaces with +//! a server-side TPU. Client code should use this object instead of writing +//! messages to the network directly. The binary encoding of its messages are +//! unstable and may change in future releases. -use accountant_skel::{Request, Response, Subscription}; +use tpu::{Request, Response, Subscription}; use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; @@ -13,7 +13,7 @@ use std::io; use std::net::{SocketAddr, UdpSocket}; use transaction::Transaction; -pub struct AccountantStub { +pub struct ThinClient { pub addr: SocketAddr, pub socket: UdpSocket, last_id: Option, @@ -21,20 +21,20 @@ pub struct AccountantStub { balances: HashMap>, } -impl AccountantStub { - /// Create a new AccountantStub that will interface with AccountantSkel +impl ThinClient { + /// Create a new ThinClient that will interface with Tpu /// over `socket`. To receive responses, the caller must bind `socket` - /// to a public address before invoking AccountantStub methods. + /// to a public address before invoking ThinClient methods. pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self { - let stub = AccountantStub { + let client = ThinClient { addr: addr, socket, last_id: None, num_events: 0, balances: HashMap::new(), }; - stub.init(); - stub + client.init(); + client } pub fn init(&self) { @@ -119,7 +119,7 @@ impl AccountantStub { } /// Return the number of transactions the server processed since creating - /// this stub instance. + /// this client instance. pub fn transaction_count(&mut self) -> u64 { // Wait for at least one EntryInfo. let mut done = false; @@ -148,7 +148,7 @@ impl AccountantStub { mod tests { use super::*; use accountant::Accountant; - use accountant_skel::AccountantSkel; + use tpu::Tpu; use crdt::{Crdt, ReplicatedData}; use futures::Future; use historian::Historian; @@ -165,7 +165,7 @@ mod tests { // TODO: Figure out why this test sometimes hangs on TravisCI. #[test] - fn test_accountant_stub() { + fn test_thin_client() { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -185,14 +185,13 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, input, historian)); - let threads = - AccountantSkel::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); + let acc = Arc::new(Tpu::new(acc, input, historian)); + let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut acc = AccountantStub::new(addr, socket); + let mut acc = ThinClient::new(addr, socket); let last_id = acc.get_last_id().wait().unwrap(); let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); @@ -230,9 +229,9 @@ mod tests { } #[test] - fn test_multi_accountant_stub() { + fn test_multi_node() { logger::setup(); - info!("test_multi_accountant_stub"); + info!("test_multi_node"); let leader = test_node(); let replicant = test_node(); let alice = Mint::new(10_000); @@ -243,17 +242,17 @@ mod tests { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); - Arc::new(AccountantSkel::new(acc, input, historian)) + Arc::new(Tpu::new(acc, input, historian)) }; let replicant_acc = { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); - Arc::new(AccountantSkel::new(acc, input, historian)) + Arc::new(Tpu::new(acc, input, historian)) }; - let leader_threads = AccountantSkel::serve( + let leader_threads = Tpu::serve( &leader_acc, leader.0.clone(), leader.2, @@ -262,7 +261,7 @@ mod tests { exit.clone(), sink(), ).unwrap(); - let replicant_threads = AccountantSkel::replicate( + let replicant_threads = Tpu::replicate( &replicant_acc, replicant.0.clone(), replicant.1, @@ -314,7 +313,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - let mut acc = AccountantStub::new(leader.0.serve_addr, socket); + let mut acc = ThinClient::new(leader.0.serve_addr, socket); info!("getting leader last_id"); let last_id = acc.get_last_id().wait().unwrap(); info!("executing leader transer"); @@ -330,7 +329,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - let mut acc = AccountantStub::new(replicant.0.serve_addr, socket); + let mut acc = ThinClient::new(replicant.0.serve_addr, socket); info!("getting replicant balance"); if let Ok(bal) = acc.get_balance(&bob_pubkey) { replicant_balance = bal; diff --git a/src/accountant_skel.rs b/src/tpu.rs similarity index 93% rename from src/accountant_skel.rs rename to src/tpu.rs index d30975ae83..32142e6ed8 100644 --- a/src/accountant_skel.rs +++ b/src/tpu.rs @@ -1,6 +1,5 @@ -//! The `accountant_skel` module is a microservice that exposes the high-level -//! Accountant API to the network. Its message encoding is currently -//! in flux. Clients should use AccountantStub to interact with it. +//! The `tpu` module implements the Transaction Processing Unit, a +//! 5-stage transaction processing pipeline in software. use accountant::Accountant; use bincode::{deserialize, serialize, serialize_into}; @@ -33,7 +32,7 @@ use timing; use std::time::Instant; use rand::{thread_rng, Rng}; -pub struct AccountantSkel { +pub struct Tpu { acc: Mutex, historian_input: Mutex>, historian: Historian, @@ -70,7 +69,7 @@ impl Request { } } -type SharedSkel = Arc; +type SharedTpu = Arc; #[derive(Serialize, Deserialize, Debug)] pub enum Response { @@ -78,10 +77,10 @@ pub enum Response { EntryInfo(EntryInfo), } -impl AccountantSkel { - /// Create a new AccountantSkel that wraps the given Accountant. +impl Tpu { + /// Create a new Tpu that wraps the given Accountant. pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { - AccountantSkel { + Tpu { acc: Mutex::new(acc), entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), @@ -89,7 +88,7 @@ impl AccountantSkel { } } - fn notify_entry_info_subscribers(obj: &SharedSkel, entry: &Entry) { + fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) { // TODO: No need to bind(). let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); @@ -112,7 +111,7 @@ impl AccountantSkel { } } - fn update_entry(obj: &SharedSkel, writer: &Arc>, entry: &Entry) { + fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); obj.acc.lock().unwrap().register_entry_id(&entry.id); writeln!( @@ -123,7 +122,7 @@ impl AccountantSkel { Self::notify_entry_info_subscribers(obj, &entry); } - fn receive_all(obj: &SharedSkel, writer: &Arc>) -> Result> { + fn receive_all(obj: &SharedTpu, writer: &Arc>) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; let entry = obj.historian @@ -182,7 +181,7 @@ impl AccountantSkel { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync( - obj: SharedSkel, + obj: SharedTpu, broadcast: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, writer: &Arc>, @@ -198,7 +197,7 @@ impl AccountantSkel { } pub fn sync_service( - obj: SharedSkel, + obj: SharedTpu, exit: Arc, broadcast: streamer::BlobSender, blob_recycler: packet::BlobRecycler, @@ -213,12 +212,12 @@ impl AccountantSkel { }) } - fn process_thin_client_requests(_obj: SharedSkel, _socket: &UdpSocket) -> Result<()> { + fn process_thin_client_requests(_obj: SharedTpu, _socket: &UdpSocket) -> Result<()> { Ok(()) } fn thin_client_service( - obj: SharedSkel, + obj: SharedTpu, exit: Arc, socket: UdpSocket, ) -> JoinHandle<()> { @@ -233,12 +232,12 @@ impl AccountantSkel { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out - fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> { + fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> { Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?; Ok(()) } - pub fn sync_no_broadcast_service(obj: SharedSkel, exit: Arc) -> JoinHandle<()> { + pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { spawn(move || loop { let _ = Self::run_sync_no_broadcast(obj.clone()); if exit.load(Ordering::Relaxed) { @@ -420,7 +419,7 @@ impl AccountantSkel { } fn process( - obj: &SharedSkel, + obj: &SharedTpu, verified_receiver: &Receiver)>>, responder_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, @@ -485,7 +484,7 @@ impl AccountantSkel { /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( - obj: &SharedSkel, + obj: &SharedTpu, verified_receiver: &streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { @@ -510,11 +509,11 @@ impl AccountantSkel { Ok(()) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( - obj: &SharedSkel, + obj: &SharedTpu, me: ReplicatedData, serve: UdpSocket, skinny: UdpSocket, @@ -582,10 +581,10 @@ impl AccountantSkel { let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny); - let skel = obj.clone(); + let tpu = obj.clone(); let t_server = spawn(move || loop { let e = Self::process( - &mut skel.clone(), + &mut tpu.clone(), &verified_receiver, &responder_sender, &packet_recycler, @@ -631,7 +630,7 @@ impl AccountantSkel { /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader pub fn replicate( - obj: &SharedSkel, + obj: &SharedTpu, me: ReplicatedData, gossip: UdpSocket, serve: UdpSocket, @@ -682,10 +681,10 @@ impl AccountantSkel { retransmit_sender, ); - let skel = obj.clone(); + let tpu = obj.clone(); let s_exit = exit.clone(); let t_replicator = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler); + let e = Self::replicate_state(&tpu, &window_receiver, &blob_recycler); if e.is_err() && s_exit.load(Ordering::Relaxed) { break; } @@ -728,11 +727,11 @@ impl AccountantSkel { } let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); - let skel = obj.clone(); + let tpu = obj.clone(); let s_exit = exit.clone(); let t_server = spawn(move || loop { let e = Self::process( - &mut skel.clone(), + &mut tpu.clone(), &verified_receiver, &responder_sender, &packet_recycler, @@ -786,15 +785,15 @@ pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec = skel.historian.output.lock().unwrap().iter().collect(); + drop(tpu.historian_input); + let entries: Vec = tpu.historian.output.lock().unwrap().iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -901,10 +900,10 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc_skel = Arc::new(AccountantSkel::new(acc, input, historian)); + let tpu = Arc::new(Tpu::new(acc, input, historian)); let serve_addr = leader_serve.local_addr().unwrap(); - let threads = AccountantSkel::serve( - &acc_skel, + let threads = Tpu::serve( + &tpu, leader_data, leader_serve, leader_skinny, @@ -916,23 +915,23 @@ mod tests { let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut acc_stub = AccountantStub::new(serve_addr, socket); - let last_id = acc_stub.get_last_id().wait().unwrap(); + let mut client = ThinClient::new(serve_addr, socket); + let last_id = client.get_last_id().wait().unwrap(); trace!("doing stuff"); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); - let _sig = acc_stub.transfer_signed(tr).unwrap(); + let _sig = client.transfer_signed(tr).unwrap(); - let last_id = acc_stub.get_last_id().wait().unwrap(); + let last_id = client.get_last_id().wait().unwrap(); let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id); tr2.data.tokens = 502; tr2.data.plan = Plan::new_payment(502, bob_pubkey); - let _sig = acc_stub.transfer_signed(tr2).unwrap(); + let _sig = client.transfer_signed(tr2).unwrap(); - assert_eq!(acc_stub.get_balance(&bob_pubkey).unwrap(), 500); + assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500); trace!("exiting"); exit.store(true, Ordering::Relaxed); trace!("joining threads"); @@ -1009,9 +1008,9 @@ mod tests { let acc = Accountant::new(&alice); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, input, historian)); + let acc = Arc::new(Tpu::new(acc, input, historian)); let replicate_addr = target1_data.replicate_addr; - let threads = AccountantSkel::replicate( + let threads = Tpu::replicate( &acc, target1_data, target1_gossip, @@ -1111,7 +1110,7 @@ mod tests { let entry_list = vec![e0; 1000]; let blob_recycler = BlobRecycler::default(); let mut blob_q = VecDeque::new(); - AccountantSkel::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); + Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); let serialized_entry_list = serialize(&entry_list).unwrap(); let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE; if serialized_entry_list.len() % BLOB_SIZE != 0 { @@ -1127,7 +1126,7 @@ mod bench { extern crate test; use self::test::Bencher; use accountant::{Accountant, MAX_ENTRY_IDS}; - use accountant_skel::*; + use tpu::*; use bincode::serialize; use hash::hash; use mint::Mint; @@ -1180,17 +1179,17 @@ mod bench { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let skel = AccountantSkel::new(acc, input, historian); + let tpu = Tpu::new(acc, input, historian); let now = Instant::now(); - assert!(skel.process_events(req_vers).is_ok()); + assert!(tpu.process_events(req_vers).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(skel.historian_input); - let entries: Vec = skel.historian.output.lock().unwrap().iter().collect(); + drop(tpu.historian_input); + let entries: Vec = tpu.historian.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize);