Defer repair request for blobs that may still be in avalanche transit (#814)
This commit is contained in:
32
src/streamer.rs
Normal file → Executable file
32
src/streamer.rs
Normal file → Executable file
@ -205,6 +205,14 @@ fn find_next_missing(
|
|||||||
Ok(reqs)
|
Ok(reqs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the highest blob index that this node should already have received
/// via avalanche, i.e. the upper bound below which a missing blob is treated as
/// lost rather than still in transit.
///
/// Avalanche splits the data stream across nodes, and each node retransmits its
/// share to its peers. A blob with an index lower than the current `received`
/// index may therefore still be on its way from a peer, so the most recent
/// `num_peers` indices are excluded from the result.
///
/// * `num_peers` - number of peer nodes taking part in the retransmission.
/// * `consumed`  - index up to which blobs have already been consumed; the
///   result is never lower than this value.
/// * `received`  - highest blob index received so far.
///
/// The subtraction saturates at zero, so `num_peers > received` cannot
/// underflow; in that case the result is simply `consumed`.
fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
    cmp::max(consumed, received.saturating_sub(num_peers))
}
|
||||||
|
|
||||||
fn repair_window(
|
fn repair_window(
|
||||||
debug_id: u64,
|
debug_id: u64,
|
||||||
window: &SharedWindow,
|
window: &SharedWindow,
|
||||||
@ -227,16 +235,21 @@ fn repair_window(
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let reqs = find_next_missing(window, crdt, recycler, consumed, received)?;
|
let highest_lost = calculate_highest_lost_blob_index(
|
||||||
|
crdt.read().unwrap().table.len() as u64,
|
||||||
|
consumed,
|
||||||
|
received,
|
||||||
|
);
|
||||||
|
let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?;
|
||||||
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
|
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
|
||||||
if !reqs.is_empty() {
|
if !reqs.is_empty() {
|
||||||
inc_new_counter!("streamer-repair_window-repair", reqs.len());
|
inc_new_counter!("streamer-repair_window-repair", reqs.len());
|
||||||
debug!(
|
debug!(
|
||||||
"{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}",
|
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
|
||||||
debug_id,
|
debug_id,
|
||||||
*times,
|
*times,
|
||||||
consumed,
|
consumed,
|
||||||
received,
|
highest_lost,
|
||||||
reqs.len()
|
reqs.len()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -245,7 +258,7 @@ fn repair_window(
|
|||||||
//todo cache socket
|
//todo cache socket
|
||||||
debug!(
|
debug!(
|
||||||
"{:x}: repair_window request {} {} {}",
|
"{:x}: repair_window request {} {} {}",
|
||||||
debug_id, consumed, received, to
|
debug_id, consumed, highest_lost, to
|
||||||
);
|
);
|
||||||
assert!(req.len() <= BLOB_SIZE);
|
assert!(req.len() <= BLOB_SIZE);
|
||||||
sock.send_to(&req, to)?;
|
sock.send_to(&req, to)?;
|
||||||
@ -902,6 +915,7 @@ mod test {
|
|||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use streamer::calculate_highest_lost_blob_index;
|
||||||
use streamer::{blob_receiver, receiver, responder, window};
|
use streamer::{blob_receiver, receiver, responder, window};
|
||||||
use streamer::{default_window, BlobReceiver, PacketReceiver};
|
use streamer::{default_window, BlobReceiver, PacketReceiver};
|
||||||
|
|
||||||
@ -1053,4 +1067,14 @@ mod test {
|
|||||||
t_responder.join().expect("join");
|
t_responder.join().expect("join");
|
||||||
t_window.join().expect("join");
|
t_window.join().expect("join");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks `calculate_highest_lost_blob_index(num_peers, consumed, received)`
/// against the expected `max(consumed, received - num_peers)` bound, including
/// the cases where the subtraction would otherwise underflow.
#[test]
pub fn calculate_highest_lost_blob_index_test() {
    // No peers: nothing can still be in transit, so the bound is `received`.
    assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);
    // The most recent `num_peers` indices are excluded from the bound.
    assert_eq!(calculate_highest_lost_blob_index(15, 10, 90), 75);
    // `received - num_peers` is zero: the bound falls back to `consumed`.
    assert_eq!(calculate_highest_lost_blob_index(90, 10, 90), 10);
    // `num_peers > received`: the subtraction saturates, result is `consumed`.
    assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10);
    // `received - num_peers` (9) is still below `consumed` (10).
    assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10);
    // `received - num_peers` (11) now exceeds `consumed` (10).
    assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11);
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user