Do not generate gossip requests to unspecified addresses (#657)
* Do not generate gossip requests to unspecified addresses * review comments
This commit is contained in:
parent
2bd48b4207
commit
1fb537deb9
@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
|
|||||||
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
|
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
|
||||||
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
|
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
|
||||||
if options.is_empty() {
|
if options.is_empty() {
|
||||||
Err(CrdtError::TooSmall)?;
|
Err(CrdtError::NoPeers)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let n = ((self.random)() as usize) % options.len();
|
let n = ((self.random)() as usize) % options.len();
|
||||||
@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
|
|||||||
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
|
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
|
||||||
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
|
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
|
||||||
if options.is_empty() {
|
if options.is_empty() {
|
||||||
Err(CrdtError::TooSmall)?;
|
Err(CrdtError::NoPeers)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut weighted_peers = vec![];
|
let mut weighted_peers = vec![];
|
||||||
|
105
src/crdt.rs
105
src/crdt.rs
@ -46,7 +46,7 @@ const MIN_TABLE_SIZE: usize = 2;
|
|||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub enum CrdtError {
|
pub enum CrdtError {
|
||||||
TooSmall,
|
NoPeers,
|
||||||
NoLeader,
|
NoLeader,
|
||||||
BadContactInfo,
|
BadContactInfo,
|
||||||
BadNodeInfo,
|
BadNodeInfo,
|
||||||
@ -150,7 +150,7 @@ impl NodeInfo {
|
|||||||
rpu: SocketAddr,
|
rpu: SocketAddr,
|
||||||
tpu: SocketAddr,
|
tpu: SocketAddr,
|
||||||
tvu_window: SocketAddr,
|
tvu_window: SocketAddr,
|
||||||
) -> NodeInfo {
|
) -> Self {
|
||||||
NodeInfo {
|
NodeInfo {
|
||||||
id,
|
id,
|
||||||
version: 0,
|
version: 0,
|
||||||
@ -169,6 +169,35 @@ impl NodeInfo {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[cfg(test)]
|
||||||
|
/// NodeInfo with unspecified addresses for adversarial testing.
|
||||||
|
pub fn new_unspecified() -> Self {
|
||||||
|
let addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
assert!(addr.ip().is_unspecified());
|
||||||
|
Self::new(
|
||||||
|
KeyPair::new().pubkey(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
#[cfg(test)]
|
||||||
|
/// NodeInfo with multicast addresses for adversarial testing.
|
||||||
|
pub fn new_multicast() -> Self {
|
||||||
|
let addr: SocketAddr = "224.0.1.255:1000".parse().unwrap();
|
||||||
|
assert!(addr.ip().is_multicast());
|
||||||
|
Self::new(
|
||||||
|
KeyPair::new().pubkey(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
addr.clone(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn debug_id(&self) -> u64 {
|
pub fn debug_id(&self) -> u64 {
|
||||||
make_debug_id(&self.id)
|
make_debug_id(&self.id)
|
||||||
}
|
}
|
||||||
@ -538,7 +567,7 @@ impl Crdt {
|
|||||||
if broadcast_table.is_empty() {
|
if broadcast_table.is_empty() {
|
||||||
warn!("{:x}:not enough peers in crdt table", me.debug_id());
|
warn!("{:x}:not enough peers in crdt table", me.debug_id());
|
||||||
inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1, 1);
|
inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1, 1);
|
||||||
Err(CrdtError::TooSmall)?;
|
Err(CrdtError::NoPeers)?;
|
||||||
}
|
}
|
||||||
trace!("broadcast nodes {}", broadcast_table.len());
|
trace!("broadcast nodes {}", broadcast_table.len());
|
||||||
|
|
||||||
@ -686,7 +715,7 @@ impl Crdt {
|
|||||||
.filter(|r| r.id != self.me && r.contact_info.tvu_window != daddr)
|
.filter(|r| r.id != self.me && r.contact_info.tvu_window != daddr)
|
||||||
.collect();
|
.collect();
|
||||||
if valid.is_empty() {
|
if valid.is_empty() {
|
||||||
Err(CrdtError::TooSmall)?;
|
Err(CrdtError::NoPeers)?;
|
||||||
}
|
}
|
||||||
let n = (Self::random() as usize) % valid.len();
|
let n = (Self::random() as usize) % valid.len();
|
||||||
let addr = valid[n].contact_info.ncp;
|
let addr = valid[n].contact_info.ncp;
|
||||||
@ -701,7 +730,14 @@ impl Crdt {
|
|||||||
/// * A - Address to send to
|
/// * A - Address to send to
|
||||||
/// * B - RequestUpdates protocol message
|
/// * B - RequestUpdates protocol message
|
||||||
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
||||||
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
|
let options: Vec<_> = self.table
|
||||||
|
.values()
|
||||||
|
.filter(|v| {
|
||||||
|
v.id != self.me
|
||||||
|
&& !v.contact_info.ncp.ip().is_unspecified()
|
||||||
|
&& !v.contact_info.ncp.ip().is_multicast()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
|
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
|
||||||
&self.remote,
|
&self.remote,
|
||||||
@ -711,7 +747,7 @@ impl Crdt {
|
|||||||
|
|
||||||
let choose_peer_result = choose_peer_strategy.choose_peer(options);
|
let choose_peer_result = choose_peer_strategy.choose_peer(options);
|
||||||
|
|
||||||
if let Err(Error::CrdtError(CrdtError::TooSmall)) = &choose_peer_result {
|
if let Err(Error::CrdtError(CrdtError::NoPeers)) = &choose_peer_result {
|
||||||
trace!(
|
trace!(
|
||||||
"crdt too small for gossip {:x} {}",
|
"crdt too small for gossip {:x} {}",
|
||||||
self.debug_id(),
|
self.debug_id(),
|
||||||
@ -1233,35 +1269,14 @@ mod tests {
|
|||||||
Crdt::new(d2).err(),
|
Crdt::new(d2).err(),
|
||||||
Some(Error::CrdtError(CrdtError::BadContactInfo))
|
Some(Error::CrdtError(CrdtError::BadContactInfo))
|
||||||
);
|
);
|
||||||
let d3 = NodeInfo::new(
|
let d3 = NodeInfo::new_unspecified();
|
||||||
KeyPair::new().pubkey(),
|
|
||||||
"0.0.0.0:0".parse().unwrap(),
|
|
||||||
"0.0.0.0:0".parse().unwrap(),
|
|
||||||
"0.0.0.0:0".parse().unwrap(),
|
|
||||||
"0.0.0.0:0".parse().unwrap(),
|
|
||||||
"0.0.0.0:0".parse().unwrap(),
|
|
||||||
);
|
|
||||||
assert_eq!(Crdt::new(d3).is_ok(), true);
|
assert_eq!(Crdt::new(d3).is_ok(), true);
|
||||||
let d4 = NodeInfo::new(
|
let d4 = NodeInfo::new_multicast();
|
||||||
KeyPair::new().pubkey(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
);
|
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
Crdt::new(d4).err(),
|
Crdt::new(d4).err(),
|
||||||
Some(Error::CrdtError(CrdtError::BadContactInfo))
|
Some(Error::CrdtError(CrdtError::BadContactInfo))
|
||||||
);
|
);
|
||||||
let mut d5 = NodeInfo::new(
|
let mut d5 = NodeInfo::new_multicast();
|
||||||
KeyPair::new().pubkey(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
"255.255.255.255:0".parse().unwrap(),
|
|
||||||
);
|
|
||||||
d5.version = 1;
|
d5.version = 1;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
Crdt::new(d5).err(),
|
Crdt::new(d5).err(),
|
||||||
@ -1466,7 +1481,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
|
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
|
||||||
let rv = crdt.window_index_request(0);
|
let rv = crdt.window_index_request(0);
|
||||||
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
|
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
|
||||||
let nxt = NodeInfo::new(
|
let nxt = NodeInfo::new(
|
||||||
KeyPair::new().pubkey(),
|
KeyPair::new().pubkey(),
|
||||||
"127.0.0.1:1234".parse().unwrap(),
|
"127.0.0.1:1234".parse().unwrap(),
|
||||||
@ -1477,7 +1492,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
crdt.insert(&nxt);
|
crdt.insert(&nxt);
|
||||||
let rv = crdt.window_index_request(0);
|
let rv = crdt.window_index_request(0);
|
||||||
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
|
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
|
||||||
let nxt = NodeInfo::new(
|
let nxt = NodeInfo::new(
|
||||||
KeyPair::new().pubkey(),
|
KeyPair::new().pubkey(),
|
||||||
"127.0.0.2:1234".parse().unwrap(),
|
"127.0.0.2:1234".parse().unwrap(),
|
||||||
@ -1515,6 +1530,30 @@ mod tests {
|
|||||||
assert!(one && two);
|
assert!(one && two);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn gossip_request_bad_addr() {
|
||||||
|
let me = NodeInfo::new(
|
||||||
|
KeyPair::new().pubkey(),
|
||||||
|
"127.0.0.1:127".parse().unwrap(),
|
||||||
|
"127.0.0.1:127".parse().unwrap(),
|
||||||
|
"127.0.0.1:127".parse().unwrap(),
|
||||||
|
"127.0.0.1:127".parse().unwrap(),
|
||||||
|
"127.0.0.1:127".parse().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut crdt = Crdt::new(me).expect("Crdt::new");
|
||||||
|
let nxt1 = NodeInfo::new_unspecified();
|
||||||
|
// Filter out unspecified addresses
|
||||||
|
crdt.insert(&nxt1); //<--- attack!
|
||||||
|
let rv = crdt.gossip_request();
|
||||||
|
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
|
||||||
|
let nxt2 = NodeInfo::new_multicast();
|
||||||
|
// Filter out multicast addresses
|
||||||
|
crdt.insert(&nxt2); //<--- attack!
|
||||||
|
let rv = crdt.gossip_request();
|
||||||
|
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
|
||||||
|
}
|
||||||
|
|
||||||
/// test that gossip requests are eventually generated for all nodes
|
/// test that gossip requests are eventually generated for all nodes
|
||||||
#[test]
|
#[test]
|
||||||
fn gossip_request() {
|
fn gossip_request() {
|
||||||
@ -1528,7 +1567,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
|
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
|
||||||
let rv = crdt.gossip_request();
|
let rv = crdt.gossip_request();
|
||||||
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
|
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
|
||||||
let nxt1 = NodeInfo::new(
|
let nxt1 = NodeInfo::new(
|
||||||
KeyPair::new().pubkey(),
|
KeyPair::new().pubkey(),
|
||||||
"127.0.0.2:1234".parse().unwrap(),
|
"127.0.0.2:1234".parse().unwrap(),
|
||||||
|
@ -731,7 +731,7 @@ pub fn broadcaster(
|
|||||||
match e {
|
match e {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
||||||
_ => {
|
_ => {
|
||||||
inc_new_counter!("streamer-broadcaster-error", 1, 1);
|
inc_new_counter!("streamer-broadcaster-error", 1, 1);
|
||||||
error!("broadcaster error: {:?}", e);
|
error!("broadcaster error: {:?}", e);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user