This commit is contained in:
Anatoly Yakovenko
2018-05-23 07:11:11 -07:00
committed by Greg Fitzgerald
parent 79a58da6a9
commit 437c485e5c
5 changed files with 52 additions and 78 deletions

View File

@ -131,6 +131,7 @@ fn main() {
gossip_sock.local_addr().unwrap(),
replicate_sock.local_addr().unwrap(),
serve_sock.local_addr().unwrap(),
events_sock.local_addr().unwrap(),
);
let mut local = serve_sock.local_addr().unwrap();

View File

@ -31,7 +31,7 @@ use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;
/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ReplicatedData {
pub id: PublicKey,
sig: Signature,
@ -42,7 +42,9 @@ pub struct ReplicatedData {
/// address to connect to for replication
pub replicate_addr: SocketAddr,
/// address to connect to when this node is leader
pub serve_addr: SocketAddr,
pub requests_addr: SocketAddr,
/// events address
pub events_addr: SocketAddr,
/// current leader identity
current_leader_id: PublicKey,
/// last verified hash that was submitted to the leader
@ -56,7 +58,8 @@ impl ReplicatedData {
id: PublicKey,
gossip_addr: SocketAddr,
replicate_addr: SocketAddr,
serve_addr: SocketAddr,
requests_addr: SocketAddr,
events_addr: SocketAddr,
) -> ReplicatedData {
ReplicatedData {
id,
@ -64,7 +67,8 @@ impl ReplicatedData {
version: 0,
gossip_addr,
replicate_addr,
serve_addr,
requests_addr,
events_addr,
current_leader_id: PublicKey::default(),
last_verified_hash: Hash::default(),
last_verified_count: 0,
@ -515,12 +519,14 @@ mod test {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let events = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
events.local_addr().unwrap(),
);
let crdt = Crdt::new(d);
trace!(
@ -632,6 +638,7 @@ mod test {
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());

View File

@ -650,12 +650,14 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let event = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let rep_data = ReplicatedData::new(
pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap(),
event.local_addr().unwrap(),
);
let mut crdt_me = Crdt::new(rep_data);
let me_id = crdt_me.my_data().id;
@ -712,21 +714,17 @@ mod test {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
let event = UdpSocket::bind("127.0.0.1:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
event.local_addr().unwrap(),
);
let crdt = Crdt::new(d);
trace!(
"id: {} gossip: {} replicate: {} serve: {}",
crdt.my_data().id[0],
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
trace!("data: {:?}", d);
(Arc::new(RwLock::new(crdt)), gossip, replicate, serve)
}

View File

@ -211,6 +211,7 @@ mod tests {
gossip.local_addr().unwrap(),
"0.0.0.0:0".parse().unwrap(),
requests_socket.local_addr().unwrap(),
events_addr,
);
let alice = Mint::new(10_000);
@ -326,6 +327,7 @@ mod tests {
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
events_socket.local_addr().unwrap(),
);
(leader, gossip, serve, replicate, events_socket)
}
@ -364,7 +366,7 @@ mod tests {
let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.id.clone();
spy.replicate_addr = daddr;
spy.serve_addr = daddr;
spy.requests_addr = daddr;
let mut spy_crdt = Crdt::new(spy);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
@ -393,7 +395,7 @@ mod tests {
.values()
.into_iter()
.filter(|x| x.id != me)
.map(|x| x.serve_addr)
.map(|x| x.requests_addr)
.collect();
v.clone()
}
@ -446,7 +448,7 @@ mod tests {
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader.0.serve_addr,
leader.0.requests_addr,
requests_socket,
events_addr,
events_socket,

View File

@ -1,5 +1,23 @@
//! The `tvu` module implements the Transaction Validation Unit, a
//! 5-stage transaction validation pipeline in software.
//! 1. streamer
//! - Incoming blobs are picked up from the replicate socket.
//! 2. verifier
//! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified
//! along with the ecdsa signature for the blob and each signature in all the transactions.
//! 3.a retransmit
//! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the
//! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate
//! address.
//! 3.b window
//! - Verified blobs are placed into a window, indexed by the counter set by the leader. This could
//! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to
//! recover any missing packets, and requests are made at random to peers and parents to retransmit
//! a missing packet.
//! 4. accountant
//! - Contigous blobs are sent to the accountant for processing transactions
//! 5. validator
//! - TODO Validation messages are sent back to the leader
use bank::Bank;
use banking_stage::BankingStage;
@ -20,48 +38,27 @@ use streamer;
use write_stage::WriteStage;
pub struct Tvu {
bank: Arc<Bank>,
start_hash: Hash,
tick_duration: Option<Duration>,
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl Tvu {
/// Create a new Tvu that wraps the given Bank.
pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option<Duration>) -> Self {
Tvu {
bank: Arc::new(bank),
start_hash,
tick_duration,
}
}
/// This service receives messages from a leader in the network and processes the transactions
/// on the bank state.
/// # Arguments
/// * `obj` - The bank state.
/// * `bank` - The bank state.
/// * `me` - my configuration
/// * `gossip` - my gosisp socket
/// * `replicte` - my replicte socket
/// * `leader` - leader configuration
/// * `exit` - The exit signal.
/// # Remarks
/// The pipeline is constructed as follows:
/// 1. receive blobs from the network, these are out of order
/// 2. verify blobs, PoH, signatures (TODO)
/// 3. reconstruct contiguous window
/// a. order the blobs
/// b. use erasure coding to reconstruct missing blobs
/// c. ask the network for missing blobs, if erasure coding is insufficient
/// d. make sure that the blobs PoH sequences connect (TODO)
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn serve(
obj: &Arc<Tvu>,
pub fn new(
bank: Arc<Bank>,
me: ReplicatedData,
gossip: UdpSocket,
requests_socket: UdpSocket,
replicate: UdpSocket,
leader: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
) -> Self {
//replicate pipeline
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write()
@ -118,39 +115,7 @@ impl Tvu {
blob_recycler.clone(),
);
//serve pipeline
// make sure we are on the same interface
let mut local = requests_socket.local_addr()?;
local.set_port(0);
let packet_recycler = packet::PacketRecycler::default();
let (packet_sender, packet_receiver) = channel();
let t_packet_receiver = streamer::receiver(
requests_socket,
exit.clone(),
packet_recycler.clone(),
packet_sender,
);
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let banking_stage = BankingStage::new(
obj.bank.clone(),
exit.clone(),
sig_verify_stage.verified_receiver,
packet_recycler.clone(),
);
let record_stage = RecordStage::new(
banking_stage.signal_receiver,
&obj.start_hash,
obj.tick_duration,
);
let write_stage =
WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);
let mut threads = vec![
let threads = vec![
//replicate threads
t_blob_receiver,
t_retransmit,
@ -164,7 +129,7 @@ impl Tvu {
write_stage.thread_hdl,
];
threads.extend(sig_verify_stage.thread_hdls.into_iter());
Ok(threads)
Tvu{threads}
}
}
@ -185,6 +150,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests_socket.local_addr().unwrap(),
events_socket.local_addr().unwrap(),
);
(d, gossip, replicate, requests_socket, events_socket)
}
@ -215,7 +181,7 @@ mod tests {
fn test_replicate() {
logger::setup();
let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node();
let (target1_data, target1_gossip, target1_replicate, _, target1_events) = test_node();
let (target2_data, target2_gossip, target2_replicate, _, _) = test_node();
let exit = Arc::new(AtomicBool::new(false));
@ -273,7 +239,7 @@ mod tests {
&tvu,
target1_data,
target1_gossip,
target1_serve,
target1_events,
target1_replicate,
leader_data,
exit.clone(),