comments
This commit is contained in:
@ -222,6 +222,11 @@ fn retransmit(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//service to retransmit messages from the leader to layer 1 nodes
|
||||||
|
//see subscriber.rs for network layer definitions
|
||||||
|
//window receives blobs from the network
|
||||||
|
//for any blobs that originated from the leader, we broadcast
|
||||||
|
//to the rest of the network
|
||||||
pub fn retransmitter(
|
pub fn retransmitter(
|
||||||
sock: UdpSocket,
|
sock: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
@ -485,18 +490,18 @@ mod test {
|
|||||||
)));
|
)));
|
||||||
let n3 = Node::new([0; 8], 1, read.local_addr().unwrap());
|
let n3 = Node::new([0; 8], 1, read.local_addr().unwrap());
|
||||||
subs.write().unwrap().insert(&[n3]);
|
subs.write().unwrap().insert(&[n3]);
|
||||||
let (s_cast, r_cast) = channel();
|
let (s_retransmit, r_retransmit) = channel();
|
||||||
let re = BlobRecycler::default();
|
let blob_recycler = BlobRecycler::default();
|
||||||
let saddr = send.local_addr().unwrap();
|
let saddr = send.local_addr().unwrap();
|
||||||
let t_retransmit = retransmitter(send, exit.clone(), subs, re.clone(), r_cast);
|
let t_retransmit = retransmitter(send, exit.clone(), subs, blob_recycler.clone(), r_retransmit);
|
||||||
let mut bq = VecDeque::new();
|
let mut bq = VecDeque::new();
|
||||||
let b = re.allocate();
|
let b = blob_recycler.allocate();
|
||||||
b.write().unwrap().meta.size = 10;
|
b.write().unwrap().meta.size = 10;
|
||||||
bq.push_back(b);
|
bq.push_back(b);
|
||||||
s_cast.send(bq).unwrap();
|
s_retransmit.send(bq).unwrap();
|
||||||
let (s_recv, r_recv) = channel();
|
let (s_blob_receiver, r_blob_receiver) = channel();
|
||||||
let t_receiver = blob_receiver(exit.clone(), re.clone(), read, s_recv).unwrap();
|
let t_receiver = blob_receiver(exit.clone(), blob_recycler.clone(), read, s_blob_receiver).unwrap();
|
||||||
let mut oq = r_recv.recv().unwrap();
|
let mut oq = r_blob_receiver.recv().unwrap();
|
||||||
assert_eq!(oq.len(), 1);
|
assert_eq!(oq.len(), 1);
|
||||||
let o = oq.pop_front().unwrap();
|
let o = oq.pop_front().unwrap();
|
||||||
let ro = o.read().unwrap();
|
let ro = o.read().unwrap();
|
||||||
|
@ -13,11 +13,7 @@ pub struct Node {
|
|||||||
//sockaddr doesn't implement default
|
//sockaddr doesn't implement default
|
||||||
impl Default for Node {
|
impl Default for Node {
|
||||||
fn default() -> Node {
|
fn default() -> Node {
|
||||||
Node {
|
Node { id: [0; 8], weight: 0, addr: "0.0.0.0:0".parse().unwrap(), }
|
||||||
id: [0; 8],
|
|
||||||
weight: 0,
|
|
||||||
addr: "0.0.0.0:0".parse().unwrap(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,9 +30,13 @@ impl Node {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Subscriber data structure
|
||||||
|
//layer 0, leader
|
||||||
|
//layer 1, as many nodes as we can fit to quickly get reliable 2/3+1 finality
|
||||||
|
//layer 2, everyone else, if layer 1 is 2**10, layer 2 should be 2**20 number of nodes
|
||||||
pub struct Subscribers {
|
pub struct Subscribers {
|
||||||
pub data: Vec<Node>,
|
data: Vec<Node>,
|
||||||
pub me: Node,
|
me: Node,
|
||||||
pub leader: Node,
|
pub leader: Node,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,6 +50,8 @@ impl Subscribers {
|
|||||||
h.insert(&[me, leader]);
|
h.insert(&[me, leader]);
|
||||||
h
|
h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//retransmit messages from the leader to layer 1 nodes
|
||||||
pub fn retransmit(&self, blob: &mut Blob, s: &UdpSocket) -> Result<()> {
|
pub fn retransmit(&self, blob: &mut Blob, s: &UdpSocket) -> Result<()> {
|
||||||
let errs: Vec<_> = self.data
|
let errs: Vec<_> = self.data
|
||||||
.par_iter()
|
.par_iter()
|
||||||
|
Reference in New Issue
Block a user