Replicator timeout (#2480)

* Add timeout to Replicator::new; used when polling for leader

* Add timeout functionality to replicator ledger download

Shares the same timeout as polling for leader

Defaults to 30 seconds

* Add docs for Replicator::new
This commit is contained in:
Mark
2019-01-21 15:37:41 -06:00
committed by GitHub
parent 6e8b69fc88
commit f37eb533f1
3 changed files with 162 additions and 10 deletions

View File

@@ -6,7 +6,7 @@ use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
use crate::db_ledger::DbLedger;
use crate::gossip_service::GossipService;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::Result;
use crate::result::{self, Result};
use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler};
use crate::service::Service;
use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
@@ -35,7 +35,7 @@ use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::{Duration, Instant};
pub struct Replicator {
gossip_service: GossipService,
@@ -99,15 +99,27 @@ fn get_entry_heights_from_last_id(
}
impl Replicator {
/// Returns a Result that contains a replicator on success
///
/// # Arguments
/// * `ledger_path` - (Not actually optional) path to where the the ledger will be stored.
/// Causes panic if none
/// * `node` - The replicator node
/// * `leader_info` - NodeInfo representing the leader
/// * `keypair` - Keypair for this replicator
/// * `timeout` - (optional) timeout for polling for leader/downloading the ledger. Defaults to
/// 30 seconds
#[allow(clippy::new_ret_no_self)]
pub fn new(
ledger_path: Option<&str>,
node: Node,
leader_info: &NodeInfo,
keypair: &Keypair,
timeout: Option<Duration>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
let timeout = timeout.unwrap_or_else(|| Duration::new(30, 0));
info!("Replicator: id: {}", keypair.pubkey());
info!("Creating cluster info....");
@@ -138,7 +150,7 @@ impl Replicator {
);
info!("polling for leader");
let leader = Self::poll_for_leader(&cluster_info)?;
let leader = Self::poll_for_leader(&cluster_info, timeout)?;
info!("Got leader: {:?}", leader);
@@ -161,6 +173,8 @@ impl Replicator {
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let t_window = window_service(
db_ledger.clone(),
cluster_info.clone(),
@@ -168,7 +182,7 @@ impl Replicator {
entry_height,
max_entry_height,
blob_fetch_receiver,
None,
Some(entry_sender),
retransmit_sender,
repair_socket,
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
@@ -178,10 +192,25 @@ impl Replicator {
);
info!("window created, waiting for ledger download done");
let start = Instant::now();
let mut received_so_far = 0;
while !done.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
let elapsed = start.elapsed();
received_so_far += entry_receiver.try_recv().map(|v| v.len()).unwrap_or(0);
if received_so_far == 0 && elapsed > timeout {
return Err(result::Error::IO(io::Error::new(
ErrorKind::TimedOut,
"Timed out waiting to receive any blocks",
)));
}
}
info!("Done receiving entries from window_service");
let mut node_info = node.info.clone();
node_info.tvu = "0.0.0.0:0".parse().unwrap();
{
@@ -279,16 +308,27 @@ impl Replicator {
self.entry_height
}
fn poll_for_leader(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<NodeInfo> {
for _ in 0..30 {
fn poll_for_leader(
cluster_info: &Arc<RwLock<ClusterInfo>>,
timeout: Duration,
) -> Result<NodeInfo> {
let start = Instant::now();
loop {
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
return Ok(l.clone());
}
let elapsed = start.elapsed();
if elapsed > timeout {
return Err(result::Error::IO(io::Error::new(
ErrorKind::TimedOut,
"Timed out waiting to receive any blocks",
)));
}
sleep(Duration::from_millis(900));
info!("{}", cluster_info.read().unwrap().node_info_trace());
}
Err(Error::new(ErrorKind::Other, "Couldn't find leader"))?
}
fn poll_for_last_id_and_entry_height(
@@ -355,7 +395,6 @@ impl Replicator {
#[cfg(test)]
mod tests {
use crate::replicator::sample_file;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -429,5 +468,4 @@ mod tests {
let res = sample_file(&in_path, &samples);
assert!(res.is_err());
}
}