commit
59c1b9983d
253
src/banking_stage.rs
Normal file
253
src/banking_stage.rs
Normal file
@ -0,0 +1,253 @@
|
|||||||
|
//! The `banking_stage` processes Event messages.
|
||||||
|
|
||||||
|
use bank::Bank;
|
||||||
|
use bincode::deserialize;
|
||||||
|
use event::Event;
|
||||||
|
use packet;
|
||||||
|
use packet::SharedPackets;
|
||||||
|
use rayon::prelude::*;
|
||||||
|
use recorder::Signal;
|
||||||
|
use result::Result;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
use std::thread::{spawn, JoinHandle};
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::time::Instant;
|
||||||
|
use timing;
|
||||||
|
|
||||||
|
pub struct BankingStage {
|
||||||
|
pub thread_hdl: JoinHandle<()>,
|
||||||
|
pub signal_receiver: Receiver<Signal>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BankingStage {
|
||||||
|
pub fn new(
|
||||||
|
bank: Arc<Bank>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||||
|
packet_recycler: packet::PacketRecycler,
|
||||||
|
) -> Self {
|
||||||
|
let (signal_sender, signal_receiver) = channel();
|
||||||
|
let thread_hdl = spawn(move || loop {
|
||||||
|
let e = Self::process_packets(
|
||||||
|
bank.clone(),
|
||||||
|
&verified_receiver,
|
||||||
|
&signal_sender,
|
||||||
|
&packet_recycler,
|
||||||
|
);
|
||||||
|
if e.is_err() {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
BankingStage {
|
||||||
|
thread_hdl,
|
||||||
|
signal_receiver,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> {
|
||||||
|
p.packets
|
||||||
|
.par_iter()
|
||||||
|
.map(|x| {
|
||||||
|
deserialize(&x.data[0..x.meta.size])
|
||||||
|
.map(|req| (req, x.meta.addr()))
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_packets(
|
||||||
|
bank: Arc<Bank>,
|
||||||
|
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||||
|
signal_sender: &Sender<Signal>,
|
||||||
|
packet_recycler: &packet::PacketRecycler,
|
||||||
|
) -> Result<()> {
|
||||||
|
let timer = Duration::new(1, 0);
|
||||||
|
let recv_start = Instant::now();
|
||||||
|
let mms = verified_receiver.recv_timeout(timer)?;
|
||||||
|
let mut reqs_len = 0;
|
||||||
|
let mms_len = mms.len();
|
||||||
|
info!(
|
||||||
|
"@{:?} process start stalled for: {:?}ms batches: {}",
|
||||||
|
timing::timestamp(),
|
||||||
|
timing::duration_as_ms(&recv_start.elapsed()),
|
||||||
|
mms.len(),
|
||||||
|
);
|
||||||
|
let proc_start = Instant::now();
|
||||||
|
for (msgs, vers) in mms {
|
||||||
|
let events = Self::deserialize_events(&msgs.read().unwrap());
|
||||||
|
reqs_len += events.len();
|
||||||
|
let events = events
|
||||||
|
.into_iter()
|
||||||
|
.zip(vers)
|
||||||
|
.filter_map(|(event, ver)| match event {
|
||||||
|
None => None,
|
||||||
|
Some((event, _addr)) => if event.verify() && ver != 0 {
|
||||||
|
Some(event)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
debug!("process_events");
|
||||||
|
let results = bank.process_verified_events(events);
|
||||||
|
let events = results.into_iter().filter_map(|x| x.ok()).collect();
|
||||||
|
signal_sender.send(Signal::Events(events))?;
|
||||||
|
debug!("done process_events");
|
||||||
|
|
||||||
|
packet_recycler.recycle(msgs);
|
||||||
|
}
|
||||||
|
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
||||||
|
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
||||||
|
info!(
|
||||||
|
"@{:?} done processing event batches: {} time: {:?}ms reqs: {} reqs/s: {}",
|
||||||
|
timing::timestamp(),
|
||||||
|
mms_len,
|
||||||
|
total_time_ms,
|
||||||
|
reqs_len,
|
||||||
|
(reqs_len as f32) / (total_time_s)
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: When banking is pulled out of RequestStage, add this test back in.
|
||||||
|
|
||||||
|
//use bank::Bank;
|
||||||
|
//use entry::Entry;
|
||||||
|
//use event::Event;
|
||||||
|
//use hash::Hash;
|
||||||
|
//use record_stage::RecordStage;
|
||||||
|
//use recorder::Signal;
|
||||||
|
//use result::Result;
|
||||||
|
//use std::sync::mpsc::{channel, Sender};
|
||||||
|
//use std::sync::{Arc, Mutex};
|
||||||
|
//use std::time::Duration;
|
||||||
|
//
|
||||||
|
//#[cfg(test)]
|
||||||
|
//mod tests {
|
||||||
|
// use bank::Bank;
|
||||||
|
// use event::Event;
|
||||||
|
// use event_processor::EventProcessor;
|
||||||
|
// use mint::Mint;
|
||||||
|
// use signature::{KeyPair, KeyPairUtil};
|
||||||
|
// use transaction::Transaction;
|
||||||
|
//
|
||||||
|
// #[test]
|
||||||
|
// // TODO: Move this test banking_stage. Calling process_events() directly
|
||||||
|
// // defeats the purpose of this test.
|
||||||
|
// fn test_banking_sequential_consistency() {
|
||||||
|
// // In this attack we'll demonstrate that a verifier can interpret the ledger
|
||||||
|
// // differently if either the server doesn't signal the ledger to add an
|
||||||
|
// // Entry OR if the verifier tries to parallelize across multiple Entries.
|
||||||
|
// let mint = Mint::new(2);
|
||||||
|
// let bank = Bank::new(&mint);
|
||||||
|
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
|
||||||
|
//
|
||||||
|
// // Process a batch that includes a transaction that receives two tokens.
|
||||||
|
// let alice = KeyPair::new();
|
||||||
|
// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||||
|
// let events = vec![Event::Transaction(tr)];
|
||||||
|
// let entry0 = event_processor.process_events(events).unwrap();
|
||||||
|
//
|
||||||
|
// // Process a second batch that spends one of those tokens.
|
||||||
|
// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
||||||
|
// let events = vec![Event::Transaction(tr)];
|
||||||
|
// let entry1 = event_processor.process_events(events).unwrap();
|
||||||
|
//
|
||||||
|
// // Collect the ledger and feed it to a new bank.
|
||||||
|
// let entries = vec![entry0, entry1];
|
||||||
|
//
|
||||||
|
// // Assert the user holds one token, not two. If the server only output one
|
||||||
|
// // entry, then the second transaction will be rejected, because it drives
|
||||||
|
// // the account balance below zero before the credit is added.
|
||||||
|
// let bank = Bank::new(&mint);
|
||||||
|
// for entry in entries {
|
||||||
|
// assert!(
|
||||||
|
// bank
|
||||||
|
// .process_verified_events(entry.events)
|
||||||
|
// .into_iter()
|
||||||
|
// .all(|x| x.is_ok())
|
||||||
|
// );
|
||||||
|
// }
|
||||||
|
// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1));
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//#[cfg(all(feature = "unstable", test))]
|
||||||
|
//mod bench {
|
||||||
|
// extern crate test;
|
||||||
|
// use self::test::Bencher;
|
||||||
|
// use bank::{Bank, MAX_ENTRY_IDS};
|
||||||
|
// use bincode::serialize;
|
||||||
|
// use event_processor::*;
|
||||||
|
// use hash::hash;
|
||||||
|
// use mint::Mint;
|
||||||
|
// use rayon::prelude::*;
|
||||||
|
// use signature::{KeyPair, KeyPairUtil};
|
||||||
|
// use std::collections::HashSet;
|
||||||
|
// use std::time::Instant;
|
||||||
|
// use transaction::Transaction;
|
||||||
|
//
|
||||||
|
// #[bench]
|
||||||
|
// fn process_events_bench(_bencher: &mut Bencher) {
|
||||||
|
// let mint = Mint::new(100_000_000);
|
||||||
|
// let bank = Bank::new(&mint);
|
||||||
|
// // Create transactions between unrelated parties.
|
||||||
|
// let txs = 100_000;
|
||||||
|
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
|
||||||
|
// let transactions: Vec<_> = (0..txs)
|
||||||
|
// .into_par_iter()
|
||||||
|
// .map(|i| {
|
||||||
|
// // Seed the 'to' account and a cell for its signature.
|
||||||
|
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
|
||||||
|
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
|
||||||
|
// {
|
||||||
|
// let mut last_ids = last_ids.lock().unwrap();
|
||||||
|
// if !last_ids.contains(&last_id) {
|
||||||
|
// last_ids.insert(last_id);
|
||||||
|
// bank.register_entry_id(&last_id);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Seed the 'from' account.
|
||||||
|
// let rando0 = KeyPair::new();
|
||||||
|
// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
|
||||||
|
// bank.process_verified_transaction(&tr).unwrap();
|
||||||
|
//
|
||||||
|
// let rando1 = KeyPair::new();
|
||||||
|
// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
|
||||||
|
// bank.process_verified_transaction(&tr).unwrap();
|
||||||
|
//
|
||||||
|
// // Finally, return a transaction that's unique
|
||||||
|
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
||||||
|
// })
|
||||||
|
// .collect();
|
||||||
|
//
|
||||||
|
// let events: Vec<_> = transactions
|
||||||
|
// .into_iter()
|
||||||
|
// .map(|tr| Event::Transaction(tr))
|
||||||
|
// .collect();
|
||||||
|
//
|
||||||
|
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
|
||||||
|
//
|
||||||
|
// let now = Instant::now();
|
||||||
|
// assert!(event_processor.process_events(events).is_ok());
|
||||||
|
// let duration = now.elapsed();
|
||||||
|
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
||||||
|
// let tps = txs as f64 / sec;
|
||||||
|
//
|
||||||
|
// // Ensure that all transactions were successfully logged.
|
||||||
|
// drop(event_processor.historian_input);
|
||||||
|
// let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
|
||||||
|
// assert_eq!(entries.len(), 1);
|
||||||
|
// assert_eq!(entries[0].events.len(), txs as usize);
|
||||||
|
//
|
||||||
|
// println!("{} tps", tps);
|
||||||
|
// }
|
||||||
|
//}
|
@ -10,7 +10,7 @@ use solana::bank::Bank;
|
|||||||
use solana::crdt::ReplicatedData;
|
use solana::crdt::ReplicatedData;
|
||||||
use solana::entry::Entry;
|
use solana::entry::Entry;
|
||||||
use solana::event::Event;
|
use solana::event::Event;
|
||||||
use solana::rpu::Rpu;
|
use solana::server::Server;
|
||||||
use solana::signature::{KeyPair, KeyPairUtil};
|
use solana::signature::{KeyPair, KeyPairUtil};
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::io::{stdin, stdout, Read};
|
use std::io::{stdin, stdout, Read};
|
||||||
@ -116,11 +116,14 @@ fn main() {
|
|||||||
eprintln!("creating networking stack...");
|
eprintln!("creating networking stack...");
|
||||||
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000)));
|
|
||||||
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
||||||
|
serve_sock
|
||||||
|
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
||||||
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
||||||
let _events_sock = UdpSocket::bind(&events_addr).unwrap();
|
let events_sock = UdpSocket::bind(&events_addr).unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let d = ReplicatedData::new(
|
let d = ReplicatedData::new(
|
||||||
pubkey,
|
pubkey,
|
||||||
@ -128,11 +131,28 @@ fn main() {
|
|||||||
replicate_sock.local_addr().unwrap(),
|
replicate_sock.local_addr().unwrap(),
|
||||||
serve_sock.local_addr().unwrap(),
|
serve_sock.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut local = serve_sock.local_addr().unwrap();
|
||||||
|
local.set_port(0);
|
||||||
|
let broadcast_socket = UdpSocket::bind(local).unwrap();
|
||||||
|
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
|
||||||
|
|
||||||
eprintln!("starting server...");
|
eprintln!("starting server...");
|
||||||
let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout())
|
let server = Server::new(
|
||||||
.unwrap();
|
bank,
|
||||||
|
last_id,
|
||||||
|
Some(Duration::from_millis(1000)),
|
||||||
|
d,
|
||||||
|
serve_sock,
|
||||||
|
events_sock,
|
||||||
|
broadcast_socket,
|
||||||
|
respond_socket,
|
||||||
|
gossip_sock,
|
||||||
|
exit.clone(),
|
||||||
|
stdout(),
|
||||||
|
);
|
||||||
eprintln!("Ready. Listening on {}", serve_addr);
|
eprintln!("Ready. Listening on {}", serve_addr);
|
||||||
for t in threads {
|
for t in server.thread_hdls {
|
||||||
t.join().expect("join");
|
t.join().expect("join");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
15
src/ecdsa.rs
15
src/ecdsa.rs
@ -136,14 +136,23 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use ecdsa;
|
use ecdsa;
|
||||||
|
use event::Event;
|
||||||
use packet::{Packet, Packets, SharedPackets};
|
use packet::{Packet, Packets, SharedPackets};
|
||||||
use request::Request;
|
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
use transaction::test_tx;
|
use transaction::{memfind, test_tx};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_layout() {
|
||||||
|
let tr = test_tx();
|
||||||
|
let tx = serialize(&tr).unwrap();
|
||||||
|
let packet = serialize(&Event::Transaction(tr)).unwrap();
|
||||||
|
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
|
||||||
|
assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
|
||||||
|
}
|
||||||
|
|
||||||
fn make_packet_from_transaction(tr: Transaction) -> Packet {
|
fn make_packet_from_transaction(tr: Transaction) -> Packet {
|
||||||
let tx = serialize(&Request::Transaction(tr)).unwrap();
|
let tx = serialize(&Event::Transaction(tr)).unwrap();
|
||||||
let mut packet = Packet::default();
|
let mut packet = Packet::default();
|
||||||
packet.meta.size = tx.len();
|
packet.meta.size = tx.len();
|
||||||
packet.data[..packet.meta.size].copy_from_slice(&tx);
|
packet.data[..packet.meta.size].copy_from_slice(&tx);
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#![cfg_attr(feature = "unstable", feature(test))]
|
#![cfg_attr(feature = "unstable", feature(test))]
|
||||||
pub mod bank;
|
pub mod bank;
|
||||||
|
pub mod banking_stage;
|
||||||
pub mod crdt;
|
pub mod crdt;
|
||||||
pub mod ecdsa;
|
pub mod ecdsa;
|
||||||
pub mod entry;
|
pub mod entry;
|
||||||
@ -20,11 +21,13 @@ pub mod request_processor;
|
|||||||
pub mod request_stage;
|
pub mod request_stage;
|
||||||
pub mod result;
|
pub mod result;
|
||||||
pub mod rpu;
|
pub mod rpu;
|
||||||
|
pub mod server;
|
||||||
pub mod sig_verify_stage;
|
pub mod sig_verify_stage;
|
||||||
pub mod signature;
|
pub mod signature;
|
||||||
pub mod streamer;
|
pub mod streamer;
|
||||||
pub mod thin_client;
|
pub mod thin_client;
|
||||||
pub mod timing;
|
pub mod timing;
|
||||||
|
pub mod tpu;
|
||||||
pub mod transaction;
|
pub mod transaction;
|
||||||
pub mod tvu;
|
pub mod tvu;
|
||||||
pub mod write_stage;
|
pub mod write_stage;
|
||||||
|
@ -5,12 +5,10 @@ use hash::Hash;
|
|||||||
use packet;
|
use packet;
|
||||||
use packet::SharedPackets;
|
use packet::SharedPackets;
|
||||||
use signature::PublicKey;
|
use signature::PublicKey;
|
||||||
use transaction::Transaction;
|
|
||||||
|
|
||||||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
Transaction(Transaction),
|
|
||||||
GetBalance { key: PublicKey },
|
GetBalance { key: PublicKey },
|
||||||
GetLastId,
|
GetLastId,
|
||||||
GetTransactionCount,
|
GetTransactionCount,
|
||||||
@ -19,10 +17,7 @@ pub enum Request {
|
|||||||
impl Request {
|
impl Request {
|
||||||
/// Verify the request is valid.
|
/// Verify the request is valid.
|
||||||
pub fn verify(&self) -> bool {
|
pub fn verify(&self) -> bool {
|
||||||
match *self {
|
true
|
||||||
Request::Transaction(ref tr) => tr.verify_plan(),
|
|
||||||
_ => true,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,24 +49,12 @@ pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use bincode::serialize;
|
|
||||||
use ecdsa;
|
|
||||||
use packet::{PacketRecycler, NUM_PACKETS};
|
use packet::{PacketRecycler, NUM_PACKETS};
|
||||||
use request::{to_request_packets, Request};
|
use request::{to_request_packets, Request};
|
||||||
use transaction::{memfind, test_tx};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_layout() {
|
|
||||||
let tr = test_tx();
|
|
||||||
let tx = serialize(&tr).unwrap();
|
|
||||||
let packet = serialize(&Request::Transaction(tr)).unwrap();
|
|
||||||
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
|
|
||||||
assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_to_packets() {
|
fn test_to_packets() {
|
||||||
let tr = Request::Transaction(test_tx());
|
let tr = Request::GetTransactionCount;
|
||||||
let re = PacketRecycler::default();
|
let re = PacketRecycler::default();
|
||||||
let rv = to_request_packets(&re, vec![tr.clone(); 1]);
|
let rv = to_request_packets(&re, vec![tr.clone(); 1]);
|
||||||
assert_eq!(rv.len(), 1);
|
assert_eq!(rv.len(), 1);
|
||||||
|
@ -6,14 +6,12 @@ use event::Event;
|
|||||||
use packet;
|
use packet;
|
||||||
use packet::SharedPackets;
|
use packet::SharedPackets;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use recorder::Signal;
|
|
||||||
use request::{Request, Response};
|
use request::{Request, Response};
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::mpsc::{Receiver, Sender};
|
use std::sync::mpsc::Receiver;
|
||||||
use std::time::Duration;
|
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use streamer;
|
use streamer;
|
||||||
use timing;
|
use timing;
|
||||||
@ -53,7 +51,6 @@ impl RequestProcessor {
|
|||||||
info!("Response::TransactionCount {:?}", rsp);
|
info!("Response::TransactionCount {:?}", rsp);
|
||||||
Some(rsp)
|
Some(rsp)
|
||||||
}
|
}
|
||||||
Request::Transaction(_) => unreachable!(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,24 +88,6 @@ impl RequestProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Split Request list into verified transactions and the rest
|
/// Split Request list into verified transactions and the rest
|
||||||
fn partition_requests(
|
|
||||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
|
||||||
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
|
|
||||||
let mut events = vec![];
|
|
||||||
let mut reqs = vec![];
|
|
||||||
for (msg, rsp_addr, verify) in req_vers {
|
|
||||||
match msg {
|
|
||||||
Request::Transaction(tr) => {
|
|
||||||
if verify != 0 {
|
|
||||||
events.push(Event::Transaction(tr));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => reqs.push((msg, rsp_addr)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(events, reqs)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_response(
|
fn serialize_response(
|
||||||
resp: Response,
|
resp: Response,
|
||||||
rsp_addr: SocketAddr,
|
rsp_addr: SocketAddr,
|
||||||
@ -139,49 +118,29 @@ impl RequestProcessor {
|
|||||||
|
|
||||||
pub fn process_request_packets(
|
pub fn process_request_packets(
|
||||||
&self,
|
&self,
|
||||||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
packet_receiver: &Receiver<SharedPackets>,
|
||||||
signal_sender: &Sender<Signal>,
|
|
||||||
blob_sender: &streamer::BlobSender,
|
blob_sender: &streamer::BlobSender,
|
||||||
packet_recycler: &packet::PacketRecycler,
|
packet_recycler: &packet::PacketRecycler,
|
||||||
blob_recycler: &packet::BlobRecycler,
|
blob_recycler: &packet::BlobRecycler,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
|
||||||
let recv_start = Instant::now();
|
|
||||||
let mms = verified_receiver.recv_timeout(timer)?;
|
|
||||||
let mut reqs_len = 0;
|
|
||||||
let mms_len = mms.len();
|
|
||||||
info!(
|
info!(
|
||||||
"@{:?} process start stalled for: {:?}ms batches: {}",
|
"@{:?} request_stage: processing: {}",
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
timing::duration_as_ms(&recv_start.elapsed()),
|
batch_len
|
||||||
mms.len(),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut reqs_len = 0;
|
||||||
let proc_start = Instant::now();
|
let proc_start = Instant::now();
|
||||||
for (msgs, vers) in mms {
|
for msgs in batch {
|
||||||
let reqs = Self::deserialize_requests(&msgs.read().unwrap());
|
let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
|
||||||
reqs_len += reqs.len();
|
.into_iter()
|
||||||
let req_vers = reqs.into_iter()
|
.filter_map(|x| x)
|
||||||
.zip(vers)
|
|
||||||
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
|
|
||||||
.filter(|x| {
|
|
||||||
let v = x.0.verify();
|
|
||||||
v
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
reqs_len += reqs.len();
|
||||||
|
|
||||||
debug!("partitioning");
|
|
||||||
let (events, reqs) = Self::partition_requests(req_vers);
|
|
||||||
debug!("events: {} reqs: {}", events.len(), reqs.len());
|
|
||||||
|
|
||||||
debug!("process_events");
|
|
||||||
let results = self.bank.process_verified_events(events);
|
|
||||||
let events = results.into_iter().filter_map(|x| x.ok()).collect();
|
|
||||||
signal_sender.send(Signal::Events(events))?;
|
|
||||||
debug!("done process_events");
|
|
||||||
|
|
||||||
debug!("process_requests");
|
|
||||||
let rsps = self.process_requests(reqs);
|
let rsps = self.process_requests(reqs);
|
||||||
debug!("done process_requests");
|
|
||||||
|
|
||||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||||
if !blobs.is_empty() {
|
if !blobs.is_empty() {
|
||||||
@ -196,7 +155,7 @@ impl RequestProcessor {
|
|||||||
info!(
|
info!(
|
||||||
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
|
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
mms_len,
|
batch_len,
|
||||||
total_time_ms,
|
total_time_ms,
|
||||||
reqs_len,
|
reqs_len,
|
||||||
(reqs_len as f32) / (total_time_s)
|
(reqs_len as f32) / (total_time_s)
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
|
|
||||||
use packet;
|
use packet;
|
||||||
use packet::SharedPackets;
|
use packet::SharedPackets;
|
||||||
use recorder::Signal;
|
|
||||||
use request_processor::RequestProcessor;
|
use request_processor::RequestProcessor;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@ -12,7 +11,6 @@ use streamer;
|
|||||||
|
|
||||||
pub struct RequestStage {
|
pub struct RequestStage {
|
||||||
pub thread_hdl: JoinHandle<()>,
|
pub thread_hdl: JoinHandle<()>,
|
||||||
pub signal_receiver: Receiver<Signal>,
|
|
||||||
pub blob_receiver: streamer::BlobReceiver,
|
pub blob_receiver: streamer::BlobReceiver,
|
||||||
pub request_processor: Arc<RequestProcessor>,
|
pub request_processor: Arc<RequestProcessor>,
|
||||||
}
|
}
|
||||||
@ -21,18 +19,16 @@ impl RequestStage {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
request_processor: RequestProcessor,
|
request_processor: RequestProcessor,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
packet_receiver: Receiver<SharedPackets>,
|
||||||
packet_recycler: packet::PacketRecycler,
|
packet_recycler: packet::PacketRecycler,
|
||||||
blob_recycler: packet::BlobRecycler,
|
blob_recycler: packet::BlobRecycler,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let request_processor = Arc::new(request_processor);
|
let request_processor = Arc::new(request_processor);
|
||||||
let request_processor_ = request_processor.clone();
|
let request_processor_ = request_processor.clone();
|
||||||
let (signal_sender, signal_receiver) = channel();
|
|
||||||
let (blob_sender, blob_receiver) = channel();
|
let (blob_sender, blob_receiver) = channel();
|
||||||
let thread_hdl = spawn(move || loop {
|
let thread_hdl = spawn(move || loop {
|
||||||
let e = request_processor_.process_request_packets(
|
let e = request_processor_.process_request_packets(
|
||||||
&verified_receiver,
|
&packet_receiver,
|
||||||
&signal_sender,
|
|
||||||
&blob_sender,
|
&blob_sender,
|
||||||
&packet_recycler,
|
&packet_recycler,
|
||||||
&blob_recycler,
|
&blob_recycler,
|
||||||
@ -45,145 +41,8 @@ impl RequestStage {
|
|||||||
});
|
});
|
||||||
RequestStage {
|
RequestStage {
|
||||||
thread_hdl,
|
thread_hdl,
|
||||||
signal_receiver,
|
|
||||||
blob_receiver,
|
blob_receiver,
|
||||||
request_processor,
|
request_processor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: When banking is pulled out of RequestStage, add this test back in.
|
|
||||||
|
|
||||||
//use bank::Bank;
|
|
||||||
//use entry::Entry;
|
|
||||||
//use event::Event;
|
|
||||||
//use hash::Hash;
|
|
||||||
//use record_stage::RecordStage;
|
|
||||||
//use recorder::Signal;
|
|
||||||
//use result::Result;
|
|
||||||
//use std::sync::mpsc::{channel, Sender};
|
|
||||||
//use std::sync::{Arc, Mutex};
|
|
||||||
//use std::time::Duration;
|
|
||||||
//
|
|
||||||
//#[cfg(test)]
|
|
||||||
//mod tests {
|
|
||||||
// use bank::Bank;
|
|
||||||
// use event::Event;
|
|
||||||
// use event_processor::EventProcessor;
|
|
||||||
// use mint::Mint;
|
|
||||||
// use signature::{KeyPair, KeyPairUtil};
|
|
||||||
// use transaction::Transaction;
|
|
||||||
//
|
|
||||||
// #[test]
|
|
||||||
// // TODO: Move this test banking_stage. Calling process_events() directly
|
|
||||||
// // defeats the purpose of this test.
|
|
||||||
// fn test_banking_sequential_consistency() {
|
|
||||||
// // In this attack we'll demonstrate that a verifier can interpret the ledger
|
|
||||||
// // differently if either the server doesn't signal the ledger to add an
|
|
||||||
// // Entry OR if the verifier tries to parallelize across multiple Entries.
|
|
||||||
// let mint = Mint::new(2);
|
|
||||||
// let bank = Bank::new(&mint);
|
|
||||||
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
|
|
||||||
//
|
|
||||||
// // Process a batch that includes a transaction that receives two tokens.
|
|
||||||
// let alice = KeyPair::new();
|
|
||||||
// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
|
||||||
// let events = vec![Event::Transaction(tr)];
|
|
||||||
// let entry0 = event_processor.process_events(events).unwrap();
|
|
||||||
//
|
|
||||||
// // Process a second batch that spends one of those tokens.
|
|
||||||
// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
|
||||||
// let events = vec![Event::Transaction(tr)];
|
|
||||||
// let entry1 = event_processor.process_events(events).unwrap();
|
|
||||||
//
|
|
||||||
// // Collect the ledger and feed it to a new bank.
|
|
||||||
// let entries = vec![entry0, entry1];
|
|
||||||
//
|
|
||||||
// // Assert the user holds one token, not two. If the server only output one
|
|
||||||
// // entry, then the second transaction will be rejected, because it drives
|
|
||||||
// // the account balance below zero before the credit is added.
|
|
||||||
// let bank = Bank::new(&mint);
|
|
||||||
// for entry in entries {
|
|
||||||
// assert!(
|
|
||||||
// bank
|
|
||||||
// .process_verified_events(entry.events)
|
|
||||||
// .into_iter()
|
|
||||||
// .all(|x| x.is_ok())
|
|
||||||
// );
|
|
||||||
// }
|
|
||||||
// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1));
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//#[cfg(all(feature = "unstable", test))]
|
|
||||||
//mod bench {
|
|
||||||
// extern crate test;
|
|
||||||
// use self::test::Bencher;
|
|
||||||
// use bank::{Bank, MAX_ENTRY_IDS};
|
|
||||||
// use bincode::serialize;
|
|
||||||
// use event_processor::*;
|
|
||||||
// use hash::hash;
|
|
||||||
// use mint::Mint;
|
|
||||||
// use rayon::prelude::*;
|
|
||||||
// use signature::{KeyPair, KeyPairUtil};
|
|
||||||
// use std::collections::HashSet;
|
|
||||||
// use std::time::Instant;
|
|
||||||
// use transaction::Transaction;
|
|
||||||
//
|
|
||||||
// #[bench]
|
|
||||||
// fn process_events_bench(_bencher: &mut Bencher) {
|
|
||||||
// let mint = Mint::new(100_000_000);
|
|
||||||
// let bank = Bank::new(&mint);
|
|
||||||
// // Create transactions between unrelated parties.
|
|
||||||
// let txs = 100_000;
|
|
||||||
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
|
|
||||||
// let transactions: Vec<_> = (0..txs)
|
|
||||||
// .into_par_iter()
|
|
||||||
// .map(|i| {
|
|
||||||
// // Seed the 'to' account and a cell for its signature.
|
|
||||||
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
|
|
||||||
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
|
|
||||||
// {
|
|
||||||
// let mut last_ids = last_ids.lock().unwrap();
|
|
||||||
// if !last_ids.contains(&last_id) {
|
|
||||||
// last_ids.insert(last_id);
|
|
||||||
// bank.register_entry_id(&last_id);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Seed the 'from' account.
|
|
||||||
// let rando0 = KeyPair::new();
|
|
||||||
// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
|
|
||||||
// bank.process_verified_transaction(&tr).unwrap();
|
|
||||||
//
|
|
||||||
// let rando1 = KeyPair::new();
|
|
||||||
// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
|
|
||||||
// bank.process_verified_transaction(&tr).unwrap();
|
|
||||||
//
|
|
||||||
// // Finally, return a transaction that's unique
|
|
||||||
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
|
||||||
// })
|
|
||||||
// .collect();
|
|
||||||
//
|
|
||||||
// let events: Vec<_> = transactions
|
|
||||||
// .into_iter()
|
|
||||||
// .map(|tr| Event::Transaction(tr))
|
|
||||||
// .collect();
|
|
||||||
//
|
|
||||||
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
|
|
||||||
//
|
|
||||||
// let now = Instant::now();
|
|
||||||
// assert!(event_processor.process_events(events).is_ok());
|
|
||||||
// let duration = now.elapsed();
|
|
||||||
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
|
||||||
// let tps = txs as f64 / sec;
|
|
||||||
//
|
|
||||||
// // Ensure that all transactions were successfully logged.
|
|
||||||
// drop(event_processor.historian_input);
|
|
||||||
// let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
|
|
||||||
// assert_eq!(entries.len(), 1);
|
|
||||||
// assert_eq!(entries[0].events.len(), txs as usize);
|
|
||||||
//
|
|
||||||
// println!("{} tps", tps);
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
91
src/rpu.rs
91
src/rpu.rs
@ -2,60 +2,27 @@
|
|||||||
//! 5-stage transaction processing pipeline in software.
|
//! 5-stage transaction processing pipeline in software.
|
||||||
|
|
||||||
use bank::Bank;
|
use bank::Bank;
|
||||||
use crdt::{Crdt, ReplicatedData};
|
|
||||||
use hash::Hash;
|
|
||||||
use packet;
|
use packet;
|
||||||
use record_stage::RecordStage;
|
|
||||||
use request_processor::RequestProcessor;
|
use request_processor::RequestProcessor;
|
||||||
use request_stage::RequestStage;
|
use request_stage::RequestStage;
|
||||||
use result::Result;
|
|
||||||
use sig_verify_stage::SigVerifyStage;
|
|
||||||
use std::io::Write;
|
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, Mutex, RwLock};
|
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
use std::time::Duration;
|
|
||||||
use streamer;
|
use streamer;
|
||||||
use write_stage::WriteStage;
|
|
||||||
|
|
||||||
pub struct Rpu {
|
pub struct Rpu {
|
||||||
bank: Arc<Bank>,
|
pub thread_hdls: Vec<JoinHandle<()>>,
|
||||||
start_hash: Hash,
|
|
||||||
tick_duration: Option<Duration>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Rpu {
|
impl Rpu {
|
||||||
/// Create a new Rpu that wraps the given Bank.
|
pub fn new(
|
||||||
pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option<Duration>) -> Self {
|
bank: Arc<Bank>,
|
||||||
Rpu {
|
|
||||||
bank: Arc::new(bank),
|
|
||||||
start_hash,
|
|
||||||
tick_duration,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a UDP microservice that forwards messages the given Rpu.
|
|
||||||
/// This service is the network leader
|
|
||||||
/// Set `exit` to shutdown its threads.
|
|
||||||
pub fn serve<W: Write + Send + 'static>(
|
|
||||||
&self,
|
|
||||||
me: ReplicatedData,
|
|
||||||
requests_socket: UdpSocket,
|
requests_socket: UdpSocket,
|
||||||
gossip: UdpSocket,
|
respond_socket: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
writer: W,
|
) -> Self {
|
||||||
) -> Result<Vec<JoinHandle<()>>> {
|
|
||||||
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
|
||||||
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
|
||||||
let window = streamer::default_window();
|
|
||||||
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
|
|
||||||
|
|
||||||
// make sure we are on the same interface
|
|
||||||
let mut local = requests_socket.local_addr()?;
|
|
||||||
local.set_port(0);
|
|
||||||
|
|
||||||
let packet_recycler = packet::PacketRecycler::default();
|
let packet_recycler = packet::PacketRecycler::default();
|
||||||
let (packet_sender, packet_receiver) = channel();
|
let (packet_sender, packet_receiver) = channel();
|
||||||
let t_receiver = streamer::receiver(
|
let t_receiver = streamer::receiver(
|
||||||
@ -63,45 +30,18 @@ impl Rpu {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
packet_recycler.clone(),
|
packet_recycler.clone(),
|
||||||
packet_sender,
|
packet_sender,
|
||||||
)?;
|
);
|
||||||
|
|
||||||
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
|
|
||||||
|
|
||||||
let blob_recycler = packet::BlobRecycler::default();
|
let blob_recycler = packet::BlobRecycler::default();
|
||||||
let request_processor = RequestProcessor::new(self.bank.clone());
|
let request_processor = RequestProcessor::new(bank.clone());
|
||||||
let request_stage = RequestStage::new(
|
let request_stage = RequestStage::new(
|
||||||
request_processor,
|
request_processor,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sig_verify_stage.verified_receiver,
|
packet_receiver,
|
||||||
packet_recycler.clone(),
|
packet_recycler.clone(),
|
||||||
blob_recycler.clone(),
|
blob_recycler.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let record_stage = RecordStage::new(
|
|
||||||
request_stage.signal_receiver,
|
|
||||||
&self.start_hash,
|
|
||||||
self.tick_duration,
|
|
||||||
);
|
|
||||||
|
|
||||||
let write_stage = WriteStage::new(
|
|
||||||
self.bank.clone(),
|
|
||||||
exit.clone(),
|
|
||||||
blob_recycler.clone(),
|
|
||||||
Mutex::new(writer),
|
|
||||||
record_stage.entry_receiver,
|
|
||||||
);
|
|
||||||
|
|
||||||
let broadcast_socket = UdpSocket::bind(local)?;
|
|
||||||
let t_broadcast = streamer::broadcaster(
|
|
||||||
broadcast_socket,
|
|
||||||
exit.clone(),
|
|
||||||
crdt.clone(),
|
|
||||||
window,
|
|
||||||
blob_recycler.clone(),
|
|
||||||
write_stage.blob_receiver,
|
|
||||||
);
|
|
||||||
|
|
||||||
let respond_socket = UdpSocket::bind(local.clone())?;
|
|
||||||
let t_responder = streamer::responder(
|
let t_responder = streamer::responder(
|
||||||
respond_socket,
|
respond_socket,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
@ -109,16 +49,7 @@ impl Rpu {
|
|||||||
request_stage.blob_receiver,
|
request_stage.blob_receiver,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut threads = vec![
|
let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl];
|
||||||
t_receiver,
|
Rpu { thread_hdls }
|
||||||
t_responder,
|
|
||||||
request_stage.thread_hdl,
|
|
||||||
write_stage.thread_hdl,
|
|
||||||
t_gossip,
|
|
||||||
t_listen,
|
|
||||||
t_broadcast,
|
|
||||||
];
|
|
||||||
threads.extend(sig_verify_stage.thread_hdls.into_iter());
|
|
||||||
Ok(threads)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
53
src/server.rs
Normal file
53
src/server.rs
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
//! The `server` module hosts all the server microservices.
|
||||||
|
|
||||||
|
use bank::Bank;
|
||||||
|
use crdt::ReplicatedData;
|
||||||
|
use hash::Hash;
|
||||||
|
use rpu::Rpu;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::net::UdpSocket;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::thread::JoinHandle;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tpu::Tpu;
|
||||||
|
|
||||||
|
pub struct Server {
|
||||||
|
pub thread_hdls: Vec<JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Server {
|
||||||
|
pub fn new<W: Write + Send + 'static>(
|
||||||
|
bank: Bank,
|
||||||
|
start_hash: Hash,
|
||||||
|
tick_duration: Option<Duration>,
|
||||||
|
me: ReplicatedData,
|
||||||
|
requests_socket: UdpSocket,
|
||||||
|
events_socket: UdpSocket,
|
||||||
|
broadcast_socket: UdpSocket,
|
||||||
|
respond_socket: UdpSocket,
|
||||||
|
gossip: UdpSocket,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
writer: W,
|
||||||
|
) -> Self {
|
||||||
|
let bank = Arc::new(bank);
|
||||||
|
let mut thread_hdls = vec![];
|
||||||
|
let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone());
|
||||||
|
thread_hdls.extend(rpu.thread_hdls);
|
||||||
|
|
||||||
|
let tpu = Tpu::new(
|
||||||
|
bank.clone(),
|
||||||
|
start_hash,
|
||||||
|
tick_duration,
|
||||||
|
me,
|
||||||
|
events_socket,
|
||||||
|
broadcast_socket,
|
||||||
|
gossip,
|
||||||
|
exit.clone(),
|
||||||
|
writer,
|
||||||
|
);
|
||||||
|
thread_hdls.extend(tpu.thread_hdls);
|
||||||
|
|
||||||
|
Server { thread_hdls }
|
||||||
|
}
|
||||||
|
}
|
@ -18,9 +18,9 @@ pub struct SigVerifyStage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SigVerifyStage {
|
impl SigVerifyStage {
|
||||||
pub fn new(exit: Arc<AtomicBool>, packets_receiver: Receiver<SharedPackets>) -> Self {
|
pub fn new(exit: Arc<AtomicBool>, packet_receiver: Receiver<SharedPackets>) -> Self {
|
||||||
let (verified_sender, verified_receiver) = channel();
|
let (verified_sender, verified_receiver) = channel();
|
||||||
let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender);
|
let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender);
|
||||||
SigVerifyStage {
|
SigVerifyStage {
|
||||||
thread_hdls,
|
thread_hdls,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
@ -71,11 +71,11 @@ impl SigVerifyStage {
|
|||||||
|
|
||||||
fn verifier_service(
|
fn verifier_service(
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
|
packet_receiver: Arc<Mutex<streamer::PacketReceiver>>,
|
||||||
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
|
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
|
let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone());
|
||||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -84,11 +84,11 @@ impl SigVerifyStage {
|
|||||||
|
|
||||||
fn verifier_services(
|
fn verifier_services(
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
packets_receiver: streamer::PacketReceiver,
|
packet_receiver: streamer::PacketReceiver,
|
||||||
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
|
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
|
||||||
) -> Vec<JoinHandle<()>> {
|
) -> Vec<JoinHandle<()>> {
|
||||||
let sender = Arc::new(Mutex::new(verified_sender));
|
let sender = Arc::new(Mutex::new(verified_sender));
|
||||||
let receiver = Arc::new(Mutex::new(packets_receiver));
|
let receiver = Arc::new(Mutex::new(packet_receiver));
|
||||||
(0..4)
|
(0..4)
|
||||||
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
|
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
|
||||||
.collect()
|
.collect()
|
||||||
|
@ -51,14 +51,12 @@ pub fn receiver(
|
|||||||
sock: UdpSocket,
|
sock: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
recycler: PacketRecycler,
|
recycler: PacketRecycler,
|
||||||
channel: PacketSender,
|
packet_sender: PacketSender,
|
||||||
) -> Result<JoinHandle<()>> {
|
) -> JoinHandle<()> {
|
||||||
let timer = Duration::new(1, 0);
|
spawn(move || {
|
||||||
sock.set_read_timeout(Some(timer))?;
|
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
|
||||||
Ok(spawn(move || {
|
|
||||||
let _ = recv_loop(&sock, &exit, &recycler, &channel);
|
|
||||||
()
|
()
|
||||||
}))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
|
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
|
||||||
@ -515,6 +513,8 @@ mod bench {
|
|||||||
}
|
}
|
||||||
fn run_streamer_bench() -> Result<()> {
|
fn run_streamer_bench() -> Result<()> {
|
||||||
let read = UdpSocket::bind("127.0.0.1:0")?;
|
let read = UdpSocket::bind("127.0.0.1:0")?;
|
||||||
|
read.set_read_timeout(Some(Duration::new(1, 0)))?;
|
||||||
|
|
||||||
let addr = read.local_addr()?;
|
let addr = read.local_addr()?;
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let pack_recycler = PacketRecycler::default();
|
let pack_recycler = PacketRecycler::default();
|
||||||
@ -591,13 +591,15 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
pub fn streamer_send_test() {
|
pub fn streamer_send_test() {
|
||||||
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||||
|
|
||||||
let addr = read.local_addr().unwrap();
|
let addr = read.local_addr().unwrap();
|
||||||
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let pack_recycler = PacketRecycler::default();
|
let pack_recycler = PacketRecycler::default();
|
||||||
let resp_recycler = BlobRecycler::default();
|
let resp_recycler = BlobRecycler::default();
|
||||||
let (s_reader, r_reader) = channel();
|
let (s_reader, r_reader) = channel();
|
||||||
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap();
|
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
||||||
let mut msgs = VecDeque::new();
|
let mut msgs = VecDeque::new();
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
//! unstable and may change in future releases.
|
//! unstable and may change in future releases.
|
||||||
|
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize};
|
||||||
|
use event::Event;
|
||||||
use futures::future::{ok, FutureResult};
|
use futures::future::{ok, FutureResult};
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use request::{Request, Response};
|
use request::{Request, Response};
|
||||||
@ -67,9 +68,9 @@ impl ThinClient {
|
|||||||
/// Send a signed Transaction to the server for processing. This method
|
/// Send a signed Transaction to the server for processing. This method
|
||||||
/// does not wait for a response.
|
/// does not wait for a response.
|
||||||
pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> {
|
pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> {
|
||||||
let req = Request::Transaction(tr);
|
let event = Event::Transaction(tr);
|
||||||
let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed");
|
let data = serialize(&event).expect("serialize Transaction in pub fn transfer_signed");
|
||||||
self.requests_socket.send_to(&data, &self.addr)
|
self.events_socket.send_to(&data, &self.addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates, signs, and processes a Transaction. Useful for writing unit-tests.
|
/// Creates, signs, and processes a Transaction. Useful for writing unit-tests.
|
||||||
@ -160,7 +161,7 @@ mod tests {
|
|||||||
use logger;
|
use logger;
|
||||||
use mint::Mint;
|
use mint::Mint;
|
||||||
use plan::Plan;
|
use plan::Plan;
|
||||||
use rpu::Rpu;
|
use server::Server;
|
||||||
use signature::{KeyPair, KeyPairUtil};
|
use signature::{KeyPair, KeyPairUtil};
|
||||||
use std::io::sink;
|
use std::io::sink;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@ -177,23 +178,43 @@ mod tests {
|
|||||||
fn test_thin_client() {
|
fn test_thin_client() {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let _events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
requests_socket
|
||||||
let addr = serve.local_addr().unwrap();
|
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||||
|
.unwrap();
|
||||||
|
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let addr = requests_socket.local_addr().unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let d = ReplicatedData::new(
|
let d = ReplicatedData::new(
|
||||||
pubkey,
|
pubkey,
|
||||||
gossip.local_addr().unwrap(),
|
gossip.local_addr().unwrap(),
|
||||||
"0.0.0.0:0".parse().unwrap(),
|
"0.0.0.0:0".parse().unwrap(),
|
||||||
serve.local_addr().unwrap(),
|
requests_socket.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let alice = Mint::new(10_000);
|
let alice = Mint::new(10_000);
|
||||||
let bank = Bank::new(&alice);
|
let bank = Bank::new(&alice);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
|
|
||||||
let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap();
|
let mut local = requests_socket.local_addr().unwrap();
|
||||||
|
local.set_port(0);
|
||||||
|
let broadcast_socket = UdpSocket::bind(local).unwrap();
|
||||||
|
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
|
||||||
|
|
||||||
|
let server = Server::new(
|
||||||
|
bank,
|
||||||
|
alice.last_id(),
|
||||||
|
Some(Duration::from_millis(30)),
|
||||||
|
d,
|
||||||
|
requests_socket,
|
||||||
|
events_socket,
|
||||||
|
broadcast_socket,
|
||||||
|
respond_socket,
|
||||||
|
gossip,
|
||||||
|
exit.clone(),
|
||||||
|
sink(),
|
||||||
|
);
|
||||||
sleep(Duration::from_millis(900));
|
sleep(Duration::from_millis(900));
|
||||||
|
|
||||||
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
@ -217,7 +238,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
assert_eq!(balance.unwrap(), 500);
|
assert_eq!(balance.unwrap(), 500);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
for t in threads {
|
for t in server.thread_hdls {
|
||||||
t.join().unwrap();
|
t.join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -230,15 +251,27 @@ mod tests {
|
|||||||
let bank = Bank::new(&alice);
|
let bank = Bank::new(&alice);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
|
|
||||||
let serve_addr = leader_serve.local_addr().unwrap();
|
let serve_addr = leader_serve.local_addr().unwrap();
|
||||||
let threads = rpu.serve(
|
|
||||||
|
let mut local = leader_serve.local_addr().unwrap();
|
||||||
|
local.set_port(0);
|
||||||
|
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let broadcast_socket = UdpSocket::bind(local).unwrap();
|
||||||
|
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
|
||||||
|
|
||||||
|
let server = Server::new(
|
||||||
|
bank,
|
||||||
|
alice.last_id(),
|
||||||
|
Some(Duration::from_millis(30)),
|
||||||
leader_data,
|
leader_data,
|
||||||
leader_serve,
|
leader_serve,
|
||||||
|
events_socket,
|
||||||
|
broadcast_socket,
|
||||||
|
respond_socket,
|
||||||
leader_gossip,
|
leader_gossip,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sink(),
|
sink(),
|
||||||
).unwrap();
|
);
|
||||||
sleep(Duration::from_millis(300));
|
sleep(Duration::from_millis(300));
|
||||||
|
|
||||||
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
@ -266,7 +299,7 @@ mod tests {
|
|||||||
trace!("exiting");
|
trace!("exiting");
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
trace!("joining threads");
|
trace!("joining threads");
|
||||||
for t in threads {
|
for t in server.thread_hdls {
|
||||||
t.join().unwrap();
|
t.join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -274,6 +307,8 @@ mod tests {
|
|||||||
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||||
|
|
||||||
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
@ -369,15 +404,28 @@ mod tests {
|
|||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
let leader_bank = {
|
let leader_bank = Bank::new(&alice);
|
||||||
let bank = Bank::new(&alice);
|
|
||||||
Rpu::new(bank, alice.last_id(), None)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut threads = leader_bank
|
let mut local = leader.2.local_addr().unwrap();
|
||||||
.serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink())
|
local.set_port(0);
|
||||||
.unwrap();
|
let broadcast_socket = UdpSocket::bind(local).unwrap();
|
||||||
|
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
|
||||||
|
|
||||||
|
let server = Server::new(
|
||||||
|
leader_bank,
|
||||||
|
alice.last_id(),
|
||||||
|
None,
|
||||||
|
leader.0.clone(),
|
||||||
|
leader.2,
|
||||||
|
leader.4,
|
||||||
|
broadcast_socket,
|
||||||
|
respond_socket,
|
||||||
|
leader.1,
|
||||||
|
exit.clone(),
|
||||||
|
sink(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut threads = server.thread_hdls;
|
||||||
for _ in 0..N {
|
for _ in 0..N {
|
||||||
replicant(&leader.0, exit.clone(), &alice, &mut threads);
|
replicant(&leader.0, exit.clone(), &alice, &mut threads);
|
||||||
}
|
}
|
||||||
|
92
src/tpu.rs
Normal file
92
src/tpu.rs
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
//! The `tpu` module implements the Transaction Processing Unit, a
|
||||||
|
//! 5-stage transaction processing pipeline in software.
|
||||||
|
|
||||||
|
use bank::Bank;
|
||||||
|
use banking_stage::BankingStage;
|
||||||
|
use crdt::{Crdt, ReplicatedData};
|
||||||
|
use hash::Hash;
|
||||||
|
use packet;
|
||||||
|
use record_stage::RecordStage;
|
||||||
|
use sig_verify_stage::SigVerifyStage;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::net::UdpSocket;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
|
use std::thread::JoinHandle;
|
||||||
|
use std::time::Duration;
|
||||||
|
use streamer;
|
||||||
|
use write_stage::WriteStage;
|
||||||
|
|
||||||
|
pub struct Tpu {
|
||||||
|
pub thread_hdls: Vec<JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Tpu {
|
||||||
|
pub fn new<W: Write + Send + 'static>(
|
||||||
|
bank: Arc<Bank>,
|
||||||
|
start_hash: Hash,
|
||||||
|
tick_duration: Option<Duration>,
|
||||||
|
me: ReplicatedData,
|
||||||
|
events_socket: UdpSocket,
|
||||||
|
broadcast_socket: UdpSocket,
|
||||||
|
gossip: UdpSocket,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
writer: W,
|
||||||
|
) -> Self {
|
||||||
|
let packet_recycler = packet::PacketRecycler::default();
|
||||||
|
let (packet_sender, packet_receiver) = channel();
|
||||||
|
let t_receiver = streamer::receiver(
|
||||||
|
events_socket,
|
||||||
|
exit.clone(),
|
||||||
|
packet_recycler.clone(),
|
||||||
|
packet_sender,
|
||||||
|
);
|
||||||
|
|
||||||
|
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
|
||||||
|
|
||||||
|
let blob_recycler = packet::BlobRecycler::default();
|
||||||
|
let banking_stage = BankingStage::new(
|
||||||
|
bank.clone(),
|
||||||
|
exit.clone(),
|
||||||
|
sig_verify_stage.verified_receiver,
|
||||||
|
packet_recycler.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let record_stage =
|
||||||
|
RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration);
|
||||||
|
|
||||||
|
let write_stage = WriteStage::new(
|
||||||
|
bank.clone(),
|
||||||
|
exit.clone(),
|
||||||
|
blob_recycler.clone(),
|
||||||
|
Mutex::new(writer),
|
||||||
|
record_stage.entry_receiver,
|
||||||
|
);
|
||||||
|
|
||||||
|
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
||||||
|
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
||||||
|
let window = streamer::default_window();
|
||||||
|
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
|
||||||
|
|
||||||
|
let t_broadcast = streamer::broadcaster(
|
||||||
|
broadcast_socket,
|
||||||
|
exit.clone(),
|
||||||
|
crdt.clone(),
|
||||||
|
window,
|
||||||
|
blob_recycler.clone(),
|
||||||
|
write_stage.blob_receiver,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut thread_hdls = vec![
|
||||||
|
t_receiver,
|
||||||
|
banking_stage.thread_hdl,
|
||||||
|
write_stage.thread_hdl,
|
||||||
|
t_gossip,
|
||||||
|
t_listen,
|
||||||
|
t_broadcast,
|
||||||
|
];
|
||||||
|
thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter());
|
||||||
|
Tpu { thread_hdls }
|
||||||
|
}
|
||||||
|
}
|
28
src/tvu.rs
28
src/tvu.rs
@ -2,13 +2,12 @@
|
|||||||
//! 5-stage transaction validation pipeline in software.
|
//! 5-stage transaction validation pipeline in software.
|
||||||
|
|
||||||
use bank::Bank;
|
use bank::Bank;
|
||||||
|
use banking_stage::BankingStage;
|
||||||
use crdt::{Crdt, ReplicatedData};
|
use crdt::{Crdt, ReplicatedData};
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use ledger;
|
use ledger;
|
||||||
use packet;
|
use packet;
|
||||||
use record_stage::RecordStage;
|
use record_stage::RecordStage;
|
||||||
use request_processor::RequestProcessor;
|
|
||||||
use request_stage::RequestStage;
|
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use sig_verify_stage::SigVerifyStage;
|
use sig_verify_stage::SigVerifyStage;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
@ -146,31 +145,27 @@ impl Tvu {
|
|||||||
// make sure we are on the same interface
|
// make sure we are on the same interface
|
||||||
let mut local = requests_socket.local_addr()?;
|
let mut local = requests_socket.local_addr()?;
|
||||||
local.set_port(0);
|
local.set_port(0);
|
||||||
let respond_socket = UdpSocket::bind(local.clone())?;
|
|
||||||
|
|
||||||
let packet_recycler = packet::PacketRecycler::default();
|
let packet_recycler = packet::PacketRecycler::default();
|
||||||
let blob_recycler = packet::BlobRecycler::default();
|
|
||||||
let (packet_sender, packet_receiver) = channel();
|
let (packet_sender, packet_receiver) = channel();
|
||||||
let t_packet_receiver = streamer::receiver(
|
let t_packet_receiver = streamer::receiver(
|
||||||
requests_socket,
|
requests_socket,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
packet_recycler.clone(),
|
packet_recycler.clone(),
|
||||||
packet_sender,
|
packet_sender,
|
||||||
)?;
|
);
|
||||||
|
|
||||||
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
|
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
|
||||||
|
|
||||||
let request_processor = RequestProcessor::new(obj.bank.clone());
|
let banking_stage = BankingStage::new(
|
||||||
let request_stage = RequestStage::new(
|
obj.bank.clone(),
|
||||||
request_processor,
|
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sig_verify_stage.verified_receiver,
|
sig_verify_stage.verified_receiver,
|
||||||
packet_recycler.clone(),
|
packet_recycler.clone(),
|
||||||
blob_recycler.clone(),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let record_stage = RecordStage::new(
|
let record_stage = RecordStage::new(
|
||||||
request_stage.signal_receiver,
|
banking_stage.signal_receiver,
|
||||||
&obj.start_hash,
|
&obj.start_hash,
|
||||||
obj.tick_duration,
|
obj.tick_duration,
|
||||||
);
|
);
|
||||||
@ -178,13 +173,6 @@ impl Tvu {
|
|||||||
let write_stage =
|
let write_stage =
|
||||||
WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);
|
WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);
|
||||||
|
|
||||||
let t_responder = streamer::responder(
|
|
||||||
respond_socket,
|
|
||||||
exit.clone(),
|
|
||||||
blob_recycler.clone(),
|
|
||||||
request_stage.blob_receiver,
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut threads = vec![
|
let mut threads = vec![
|
||||||
//replicate threads
|
//replicate threads
|
||||||
t_blob_receiver,
|
t_blob_receiver,
|
||||||
@ -195,8 +183,7 @@ impl Tvu {
|
|||||||
t_listen,
|
t_listen,
|
||||||
//serve threads
|
//serve threads
|
||||||
t_packet_receiver,
|
t_packet_receiver,
|
||||||
t_responder,
|
banking_stage.thread_hdl,
|
||||||
request_stage.thread_hdl,
|
|
||||||
write_stage.thread_hdl,
|
write_stage.thread_hdl,
|
||||||
];
|
];
|
||||||
threads.extend(sig_verify_stage.thread_hdls.into_iter());
|
threads.extend(sig_verify_stage.thread_hdls.into_iter());
|
||||||
@ -212,6 +199,9 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
|
|||||||
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
|
requests_socket
|
||||||
|
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||||
|
.unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let d = ReplicatedData::new(
|
let d = ReplicatedData::new(
|
||||||
pubkey,
|
pubkey,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user