repair socket and receiver thread (#303)

repair socket and receiver thread
This commit is contained in:
anatoly yakovenko
2018-06-02 08:32:51 -07:00
committed by GitHub
parent fd338c3097
commit 216510c573
7 changed files with 159 additions and 28 deletions

View File

@ -235,7 +235,14 @@ fn spy_node(client_addr: &Arc<RwLock<SocketAddr>>) -> (ReplicatedData, UdpSocket
addr.set_port(port + 1); addr.set_port(port + 1);
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr); let node = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
daddr,
daddr,
daddr,
daddr,
);
(node, gossip) (node, gossip)
} }

View File

@ -129,6 +129,7 @@ fn main() {
UdpSocket::bind("0.0.0.0:0").unwrap(), UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind(repl_data.replicate_addr).unwrap(), UdpSocket::bind(repl_data.replicate_addr).unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(), UdpSocket::bind(repl_data.gossip_addr).unwrap(),
UdpSocket::bind(repl_data.repair_addr).unwrap(),
leader, leader,
exit.clone(), exit.clone(),
); );

View File

@ -77,6 +77,9 @@ pub struct ReplicatedData {
pub requests_addr: SocketAddr, pub requests_addr: SocketAddr,
/// transactions address /// transactions address
pub transactions_addr: SocketAddr, pub transactions_addr: SocketAddr,
/// repair address, we use this to jump ahead of the packets
/// destined to the replicate_addr
pub repair_addr: SocketAddr,
/// current leader identity /// current leader identity
pub current_leader_id: PublicKey, pub current_leader_id: PublicKey,
/// last verified hash that was submitted to the leader /// last verified hash that was submitted to the leader
@ -92,6 +95,7 @@ impl ReplicatedData {
replicate_addr: SocketAddr, replicate_addr: SocketAddr,
requests_addr: SocketAddr, requests_addr: SocketAddr,
transactions_addr: SocketAddr, transactions_addr: SocketAddr,
repair_addr: SocketAddr,
) -> ReplicatedData { ) -> ReplicatedData {
ReplicatedData { ReplicatedData {
id, id,
@ -101,6 +105,7 @@ impl ReplicatedData {
replicate_addr, replicate_addr,
requests_addr, requests_addr,
transactions_addr, transactions_addr,
repair_addr,
current_leader_id: PublicKey::default(), current_leader_id: PublicKey::default(),
last_verified_hash: Hash::default(), last_verified_hash: Hash::default(),
last_verified_count: 0, last_verified_count: 0,
@ -118,6 +123,7 @@ impl ReplicatedData {
let gossip_addr = Self::next_port(&bind_addr, 1); let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2); let replicate_addr = Self::next_port(&bind_addr, 2);
let requests_addr = Self::next_port(&bind_addr, 3); let requests_addr = Self::next_port(&bind_addr, 3);
let repair_addr = Self::next_port(&bind_addr, 4);
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
ReplicatedData::new( ReplicatedData::new(
pubkey, pubkey,
@ -125,6 +131,7 @@ impl ReplicatedData {
replicate_addr, replicate_addr,
requests_addr, requests_addr,
transactions_addr, transactions_addr,
repair_addr,
) )
} }
} }
@ -390,7 +397,7 @@ impl Crdt {
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let valid: Vec<_> = self.table let valid: Vec<_> = self.table
.values() .values()
.filter(|r| r.id != self.me && r.replicate_addr != daddr) .filter(|r| r.id != self.me && r.repair_addr != daddr)
.collect(); .collect();
if valid.is_empty() { if valid.is_empty() {
return Err(Error::CrdtTooSmall); return Err(Error::CrdtTooSmall);
@ -509,7 +516,7 @@ impl Crdt {
let sz = rblob.meta.size; let sz = rblob.meta.size;
outblob.meta.size = sz; outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&rblob.data[..sz]); outblob.data[..sz].copy_from_slice(&rblob.data[..sz]);
outblob.meta.set_addr(&from.replicate_addr); outblob.meta.set_addr(&from.repair_addr);
//TODO, set the sender id to the requester so we dont retransmit //TODO, set the sender id to the requester so we dont retransmit
//come up with a cleaner solution for this when sender signatures are checked //come up with a cleaner solution for this when sender signatures are checked
outblob.set_id(from.id).expect("blob set_id"); outblob.set_id(from.id).expect("blob set_id");
@ -518,7 +525,7 @@ impl Crdt {
} }
} else { } else {
assert!(window.read().unwrap()[pos].is_none()); assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr); info!("failed RequestWindowIndex {} {}", ix, from.repair_addr);
} }
None None
} }
@ -580,10 +587,10 @@ impl Crdt {
trace!( trace!(
"received RequestWindowIndex {} {} myaddr {}", "received RequestWindowIndex {} {} myaddr {}",
ix, ix,
from.replicate_addr, from.repair_addr,
me.replicate_addr me.repair_addr
); );
assert_ne!(from.replicate_addr, me.replicate_addr); assert_ne!(from.repair_addr, me.repair_addr);
Self::run_window_request(&window, &from, ix, blob_recycler) Self::run_window_request(&window, &from, ix, blob_recycler)
} }
Err(_) => { Err(_) => {
@ -656,6 +663,7 @@ pub struct Sockets {
pub transaction: UdpSocket, pub transaction: UdpSocket,
pub respond: UdpSocket, pub respond: UdpSocket,
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
pub repair: UdpSocket,
} }
pub struct TestNode { pub struct TestNode {
@ -672,6 +680,7 @@ impl TestNode {
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let repair = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new( let data = ReplicatedData::new(
pubkey, pubkey,
@ -679,6 +688,7 @@ impl TestNode {
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
requests.local_addr().unwrap(), requests.local_addr().unwrap(),
transaction.local_addr().unwrap(), transaction.local_addr().unwrap(),
repair.local_addr().unwrap(),
); );
TestNode { TestNode {
data: data, data: data,
@ -690,6 +700,7 @@ impl TestNode {
transaction, transaction,
respond, respond,
broadcast, broadcast,
repair,
}, },
} }
} }
@ -698,6 +709,7 @@ impl TestNode {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
use result::Error;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
#[test] #[test]
@ -719,6 +731,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
); );
assert_eq!(d.version, 0); assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()); let mut crdt = Crdt::new(d.clone());
@ -736,6 +749,15 @@ mod tests {
copy copy
} }
#[test] #[test]
/// Checks the addresses `ReplicatedData::new_leader` derives from one bind address:
/// transactions stay on the bind port, the other services use the following ports.
fn replicated_data_new_leader() {
    let bind_addr = "127.0.0.1:1234".parse().unwrap();
    let leader = ReplicatedData::new_leader(&bind_addr);
    assert_eq!(leader.gossip_addr, "127.0.0.1:1235".parse().unwrap());
    assert_eq!(leader.replicate_addr, "127.0.0.1:1236".parse().unwrap());
    assert_eq!(leader.requests_addr, "127.0.0.1:1237".parse().unwrap());
    assert_eq!(leader.transactions_addr, "127.0.0.1:1234".parse().unwrap());
    assert_eq!(leader.repair_addr, "127.0.0.1:1238".parse().unwrap());
}
#[test]
fn update_test() { fn update_test() {
let d1 = ReplicatedData::new( let d1 = ReplicatedData::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
@ -743,6 +765,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
); );
let d2 = ReplicatedData::new( let d2 = ReplicatedData::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
@ -750,6 +773,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
); );
let d3 = ReplicatedData::new( let d3 = ReplicatedData::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
@ -757,6 +781,7 @@ mod tests {
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
); );
let mut crdt = Crdt::new(d1.clone()); let mut crdt = Crdt::new(d1.clone());
let (key, ix, ups) = crdt.get_updates_since(0); let (key, ix, ups) = crdt.get_updates_since(0);
@ -784,5 +809,65 @@ mod tests {
sorted(&crdt.table.values().map(|x| x.clone()).collect()) sorted(&crdt.table.values().map(|x| x.clone()).collect())
); );
} }
/// Test that `window_index_request` fails with `CrdtTooSmall` until a peer with a
/// usable repair address is known, and afterwards selects among such peers.
#[test]
fn window_index_request() {
    // Builds a node with a fresh key; only the gossip and repair addresses vary.
    let make_node = |gossip: &str, repair: &str| {
        ReplicatedData::new(
            KeyPair::new().pubkey(),
            gossip.parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
            repair.parse().unwrap(),
        )
    };

    // With only ourselves in the table there is nobody to ask.
    let me = make_node("127.0.0.1:1234", "127.0.0.1:1238");
    let mut crdt = Crdt::new(me.clone());
    let rv = crdt.window_index_request(0);
    assert_matches!(rv, Err(Error::CrdtTooSmall));

    // A peer advertising the unspecified repair address does not count.
    let unreachable_peer = make_node("127.0.0.1:1234", "0.0.0.0:0");
    crdt.insert(&unreachable_peer);
    let rv = crdt.window_index_request(0);
    assert_matches!(rv, Err(Error::CrdtTooSmall));

    // The first peer with a real repair address is the only candidate; the
    // request is addressed to that peer's gossip address.
    let second = make_node("127.0.0.2:1234", "127.0.0.1:1238");
    crdt.insert(&second);
    let rv = crdt.window_index_request(0).unwrap();
    assert_eq!(second.gossip_addr, "127.0.0.2:1234".parse().unwrap());
    assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap());

    let third = make_node("127.0.0.3:1234", "127.0.0.1:1238");
    crdt.insert(&third);

    let mut saw_second = false;
    let mut saw_third = false;
    // The peer is picked at random, so keep asking until both have shown up.
    while !(saw_second && saw_third) {
        let rv = crdt.window_index_request(0).unwrap();
        saw_second |= rv.0 == "127.0.0.2:1234".parse().unwrap();
        saw_third |= rv.0 == "127.0.0.3:1234".parse().unwrap();
    }
    assert!(saw_second && saw_third);
}
} }

View File

@ -79,6 +79,7 @@ impl Server {
respond_socket: UdpSocket, respond_socket: UdpSocket,
replicate_socket: UdpSocket, replicate_socket: UdpSocket,
gossip_socket: UdpSocket, gossip_socket: UdpSocket,
repair_socket: UdpSocket,
leader_repl_data: ReplicatedData, leader_repl_data: ReplicatedData,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
@ -91,6 +92,7 @@ impl Server {
me, me,
gossip_socket, gossip_socket,
replicate_socket, replicate_socket,
repair_socket,
leader_repl_data, leader_repl_data,
exit.clone(), exit.clone(),
); );
@ -98,3 +100,34 @@ impl Server {
Server { thread_hdls } Server { thread_hdls }
} }
} }
#[cfg(test)]
mod tests {
    use bank::Bank;
    use crdt::TestNode;
    use mint::Mint;
    use server::Server;
    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    /// Starts a validator on a `TestNode`'s sockets, raises the exit flag and
    /// checks that every thread the server spawned can be joined.
    #[test]
    fn validator_exit() {
        let node = TestNode::new();
        let mint = Mint::new(10_000);
        let bank = Bank::new(&mint);
        let exit = Arc::new(AtomicBool::new(false));

        // The node's own data is passed twice: once as the validator's identity
        // and once as the leader argument.
        let server = Server::new_validator(
            bank,
            node.data.clone(),
            node.sockets.requests,
            node.sockets.respond,
            node.sockets.replicate,
            node.sockets.gossip,
            node.sockets.repair,
            node.data,
            exit.clone(),
        );

        // Signal shutdown, then wait for each worker thread to finish.
        exit.store(true, Ordering::Relaxed);
        for handle in server.thread_hdls {
            handle.join().unwrap();
        }
    }
}

View File

@ -602,10 +602,8 @@ mod bench {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, TestNode};
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use signature::KeyPair;
use signature::KeyPairUtil;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io; use std::io;
use std::io::Write; use std::io::Write;
@ -688,29 +686,21 @@ mod test {
#[test] #[test]
pub fn window_send_test() { pub fn window_send_test() {
let pubkey_me = KeyPair::new().pubkey(); let tn = TestNode::new();
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let transaction = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let rep_data = ReplicatedData::new( let mut crdt_me = Crdt::new(tn.data.clone());
pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
let mut crdt_me = Crdt::new(rep_data);
let me_id = crdt_me.my_data().id; let me_id = crdt_me.my_data().id;
crdt_me.set_leader(me_id); crdt_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me)); let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default(); let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = let t_receiver = blob_receiver(
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, r_window) = channel(); let (s_window, r_window) = channel();
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let win = default_window(); let win = default_window();
@ -724,7 +714,12 @@ mod test {
s_retransmit, s_retransmit,
); );
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); let t_responder = responder(
tn.sockets.replicate,
exit.clone(),
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new(); let mut msgs = VecDeque::new();
for v in 0..10 { for v in 0..10 {
let i = 9 - v; let i = 9 - v;
@ -735,7 +730,7 @@ mod test {
w.set_id(me_id).unwrap(); w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap()); assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE; w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr); w.meta.set_addr(&tn.data.gossip_addr);
msgs.push_back(b_); msgs.push_back(b_);
} }
s_responder.send(msgs).expect("send"); s_responder.send(msgs).expect("send");

View File

@ -51,6 +51,7 @@ impl Tvu {
me: ReplicatedData, me: ReplicatedData,
gossip_listen_socket: UdpSocket, gossip_listen_socket: UdpSocket,
replicate: UdpSocket, replicate: UdpSocket,
repair_socket: UdpSocket,
leader: ReplicatedData, leader: ReplicatedData,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
@ -96,6 +97,12 @@ impl Tvu {
blob_recycler.clone(), blob_recycler.clone(),
retransmit_receiver, retransmit_receiver,
); );
let t_repair_receiver = streamer::blob_receiver(
exit.clone(),
blob_recycler.clone(),
repair_socket,
blob_sender.clone(),
).expect("tvu: blob repair receiver fail");
//TODO //TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified //the packets coming out of blob_receiver need to be sent to the GPU and verified
@ -122,6 +129,7 @@ impl Tvu {
t_blob_receiver, t_blob_receiver,
t_retransmit, t_retransmit,
t_window, t_window,
t_repair_receiver,
replicate_stage.thread_hdl, replicate_stage.thread_hdl,
]; ];
threads.extend(data_replicator.thread_hdls.into_iter()); threads.extend(data_replicator.thread_hdls.into_iter());
@ -218,6 +226,7 @@ pub mod tests {
target1.data, target1.data,
target1.sockets.gossip, target1.sockets.gossip,
target1.sockets.replicate, target1.sockets.replicate,
target1.sockets.repair,
leader.data, leader.data,
exit.clone(), exit.clone(),
); );

View File

@ -37,6 +37,7 @@ fn validator(
validator.sockets.respond, validator.sockets.respond,
validator.sockets.replicate, validator.sockets.replicate,
validator.sockets.gossip, validator.sockets.gossip,
validator.sockets.repair,
leader.clone(), leader.clone(),
exit.clone(), exit.clone(),
); );