Add "download from replicator" utility (#4709)

automerge
This commit is contained in:
Sagar Dhawan
2019-06-17 18:12:13 -07:00
committed by Grimes
parent 8fbf0e2d9f
commit cc48773b03
5 changed files with 161 additions and 110 deletions

View File

@ -815,33 +815,34 @@ impl ClusterInfo {
}
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].gossip; // send the request to the peer's gossip port
let out = {
match repair_request {
RepairType::Blob(slot, blob_index) => {
datapoint_debug!(
"cluster_info-repair",
("repair-slot", *slot, i64),
("repair-ix", *blob_index, i64)
);
self.window_index_request_bytes(*slot, *blob_index)?
}
RepairType::HighestBlob(slot, blob_index) => {
datapoint_debug!(
"cluster_info-repair_highest",
("repair-highest-slot", *slot, i64),
("repair-highest-ix", *blob_index, i64)
);
self.window_highest_index_request_bytes(*slot, *blob_index)?
}
RepairType::Orphan(slot) => {
datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64));
self.orphan_bytes(*slot)?
}
}
};
let out = self.map_repair_request(repair_request)?;
Ok((addr, out))
}
/// Turn a `RepairType` into the request bytes to send to a peer.
///
/// Each variant emits its debug datapoint and then delegates to the matching
/// request-building helper on `self`. Any error from that helper is propagated.
pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
    let request_bytes = match repair_request {
        RepairType::Orphan(orphan_slot) => {
            datapoint_debug!(
                "cluster_info-repair_orphan",
                ("repair-orphan", *orphan_slot, i64)
            );
            self.orphan_bytes(*orphan_slot)?
        }
        RepairType::HighestBlob(repair_slot, index) => {
            datapoint_debug!(
                "cluster_info-repair_highest",
                ("repair-highest-slot", *repair_slot, i64),
                ("repair-highest-ix", *index, i64)
            );
            self.window_highest_index_request_bytes(*repair_slot, *index)?
        }
        RepairType::Blob(repair_slot, index) => {
            datapoint_debug!(
                "cluster_info-repair",
                ("repair-slot", *repair_slot, i64),
                ("repair-ix", *index, i64)
            );
            self.window_index_request_bytes(*repair_slot, *index)?
        }
    };
    Ok(request_bytes)
}
// If the network entrypoint hasn't been discovered yet, add it to the crds table
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom<Hash>, SocketAddr, CrdsValue)>) {
match &self.entrypoint {

View File

@ -195,7 +195,7 @@ impl RepairService {
}
// Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
fn generate_repairs_in_range(
pub fn generate_repairs_in_range(
blocktree: &Blocktree,
max_repairs: usize,
repair_range: &RepairSlotRange,

View File

@ -5,11 +5,12 @@ use crate::cluster_info::{ClusterInfo, Node};
use crate::contact_info::ContactInfo;
use crate::gossip_service::GossipService;
use crate::packet::to_shared_blob;
use crate::repair_service::{RepairSlotRange, RepairStrategy};
use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::{receiver, responder};
use crate::streamer::{blob_receiver, receiver, responder};
use crate::window_service::WindowService;
use crate::{repair_service, window_service};
use bincode::deserialize;
use rand::thread_rng;
use rand::Rng;
@ -599,6 +600,126 @@ impl Replicator {
"Couldn't get blockhash or slot",
))?
}
/// Ask a replicator to populate a given blocktree with its segment.
/// Return the slot at the start of the replicator's segment
///
/// It is recommended to use a temporary blocktree for this since the download will not verify
/// blobs received and might impact the chaining of blobs across slots
pub fn download_from_replicator(
cluster_info: &Arc<RwLock<ClusterInfo>>,
replicator_info: &ContactInfo,
blocktree: &Arc<Blocktree>,
) -> Result<(u64)> {
// Create a client which downloads from the replicator and see that it
// can respond with blobs.
let start_slot = Self::get_replicator_segment_slot(replicator_info.storage_addr);
info!("Replicator download: start at {}", start_slot);
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
let repair_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader);
let id = cluster_info.read().unwrap().id();
info!(
"Sending repair requests from: {} to: {}",
cluster_info.read().unwrap().my_data().id,
replicator_info.gossip
);
let repair_slot_range = RepairSlotRange {
start: start_slot,
end: start_slot + SLOTS_PER_SEGMENT,
};
// try for upto 180 seconds //TODO needs tuning if segments are huge
for _ in 0..120 {
// Strategy used by replicators
let repairs = RepairService::generate_repairs_in_range(
blocktree,
repair_service::MAX_REPAIR_LENGTH,
&repair_slot_range,
);
//iter over the repairs and send them
if let Ok(repairs) = repairs {
let reqs: Vec<_> = repairs
.into_iter()
.filter_map(|repair_request| {
cluster_info
.read()
.unwrap()
.map_repair_request(&repair_request)
.map(|result| ((replicator_info.gossip, result), repair_request))
.ok()
})
.collect();
for ((to, req), repair_request) in reqs {
if let Ok(local_addr) = repair_socket.local_addr() {
datapoint_info!(
"replicator_download",
("repair_request", format!("{:?}", repair_request), String),
("to", to.to_string(), String),
("from", local_addr.to_string(), String),
("id", id.to_string(), String)
);
}
repair_socket
.send_to(&req, replicator_info.gossip)
.unwrap_or_else(|e| {
error!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
}
let res = r_reader.recv_timeout(Duration::new(1, 0));
if let Ok(blobs) = res {
window_service::process_blobs(&blobs, blocktree)?;
}
// check if all the slots in the segment are complete
if Self::segment_complete(start_slot, blocktree) {
break;
}
sleep(Duration::from_millis(500));
}
exit.store(true, Ordering::Relaxed);
t_receiver.join().unwrap();
// check if all the slots in the segment are complete
if !Self::segment_complete(start_slot, blocktree) {
Err(io::Error::new(
ErrorKind::Other,
"Unable to download the full segment",
))?
}
Ok(start_slot)
}
/// Returns `true` when `blocktree.is_full` holds for every slot in
/// `start_slot..start_slot + SLOTS_PER_SEGMENT`; stops at the first slot that is not full.
fn segment_complete(start_slot: u64, blocktree: &Arc<Blocktree>) -> bool {
    let segment_end = start_slot + SLOTS_PER_SEGMENT;
    (start_slot..segment_end).all(|candidate| blocktree.is_full(candidate))
}
/// Send a `ReplicatorRequest::GetSlotHeight` to `to` and return the `u64` decoded from the
/// reply.
///
/// The request is sent up to 10 times. Each attempt waits at most 5 seconds for a reply and
/// is followed by a 500ms pause. A reply that cannot be decoded as a `u64` counts as a
/// failed attempt rather than aborting, since any datagram may arrive on the socket.
///
/// # Panics
///
/// Panics if the local socket cannot be set up, if the request cannot be serialized or
/// sent, or if no decodable reply arrives within the 10 attempts.
fn get_replicator_segment_slot(to: SocketAddr) -> u64 {
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    socket
        .set_read_timeout(Some(Duration::from_secs(5)))
        .unwrap();
    // The request carries our own address (presumably so the replicator knows where to
    // reply -- confirm against the request handler).
    let req = ReplicatorRequest::GetSlotHeight(socket.local_addr().unwrap());
    let serialized_req = bincode::serialize(&req).unwrap();
    for _ in 0..10 {
        socket.send_to(&serialized_req, to).unwrap();
        let mut buf = [0; 1024];
        if let Ok((size, _addr)) = socket.recv_from(&mut buf) {
            // Ignore replies that do not decode; fall through and retry.
            if let Ok(slot) = deserialize::<u64>(&buf[..size]) {
                return slot;
            }
        }
        sleep(Duration::from_millis(500));
    }
    panic!("Couldn't get slot height!");
}
}
#[cfg(test)]

View File

@ -50,7 +50,7 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey)
}
/// Process a blob: Add blob to the ledger window.
fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()> {
pub fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()> {
// make an iterator for insert_data_blobs()
let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect();