Decouple turns from segments in PoRep (#5004)

* Decouple Segments from Turns in Storage

* Get replicator local cluster tests running in a reasonable amount of time

* Fix unused imports

* Document new RPC APIs

* Check for exit while polling
This commit is contained in:
Sagar Dhawan
2019-07-10 13:33:29 -07:00
committed by GitHub
parent a383ea532f
commit 35ec7a5156
11 changed files with 183 additions and 99 deletions

View File

@@ -104,7 +104,7 @@ pub(crate) fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<
Ok(hasher.result())
}
fn get_slot_from_blockhash(
fn get_slot_from_signature(
signature: &ed25519_dalek::Signature,
storage_turn: u64,
slots_per_segment: u64,
@@ -265,7 +265,7 @@ impl Replicator {
};
spawn(move || {
// setup replicator
let window_service = Self::setup(
let window_service = match Self::setup(
&mut meta,
cluster_info.clone(),
&blocktree,
@@ -275,8 +275,21 @@ impl Replicator {
repair_socket,
blob_fetch_receiver,
slot_sender,
)
.ok();
) {
Ok(window_service) => window_service,
Err(e) => {
// shut down services before exiting
error!("setup failed {:?}; replicator thread exiting...", e);
exit.store(true, Ordering::Relaxed);
request_processor
.into_iter()
.for_each(|t| t.join().unwrap());
fetch_stage.join().unwrap();
gossip_service.join().unwrap();
return;
}
};
info!("setup complete");
// run replicator
Self::run(
@@ -293,9 +306,7 @@ impl Replicator {
.for_each(|t| t.join().unwrap());
fetch_stage.join().unwrap();
gossip_service.join().unwrap();
if let Some(window) = window_service {
window.join().unwrap()
}
window_service.join().unwrap()
})
};
@@ -342,6 +353,7 @@ impl Replicator {
&cluster_info,
meta.slots_per_segment,
&meta.blockhash,
exit,
) {
Ok(blockhash_and_slot) => blockhash_and_slot,
Err(e) => {
@@ -410,27 +422,27 @@ impl Replicator {
return Err(e);
}
};
let (storage_blockhash, storage_slot) = match Self::poll_for_blockhash_and_slot(
let (segment_blockhash, segment_slot) = match Self::poll_for_segment(
&cluster_info,
slots_per_segment,
&Hash::default(),
exit,
) {
Ok(blockhash_and_slot) => blockhash_and_slot,
Err(e) => {
error!("unable to get turn status, exiting...");
// shut down services before exiting
exit.store(true, Ordering::Relaxed);
return Err(e);
}
};
let signature = storage_keypair.sign(storage_blockhash.as_ref());
let slot = get_slot_from_blockhash(&signature, storage_slot, slots_per_segment);
let signature = storage_keypair.sign(segment_blockhash.as_ref());
let slot = get_slot_from_signature(&signature, segment_slot, slots_per_segment);
info!("replicating slot: {}", slot);
slot_sender.send(slot)?;
meta.slot = slot;
meta.slots_per_segment = slots_per_segment;
meta.signature = Signature::new(&signature.to_bytes());
meta.blockhash = storage_blockhash;
meta.blockhash = segment_blockhash;
let mut repair_slot_range = RepairSlotRange::default();
repair_slot_range.end = slot + slots_per_segment;
@@ -682,13 +694,35 @@ impl Replicator {
}
}
/// Keeps polling for a storage turn until the turn's slot maps to a segment other
/// than segment 0, then hands back that turn's blockhash together with its slot.
///
/// Every poll is delegated to `poll_for_blockhash_and_slot`, which receives
/// `cluster_info`, `slots_per_segment`, `previous_blockhash` and `exit` unchanged.
/// Any error it reports is propagated to the caller immediately via `?`.
fn poll_for_segment(
    cluster_info: &Arc<RwLock<ClusterInfo>>,
    slots_per_segment: u64,
    previous_blockhash: &Hash,
    exit: &Arc<AtomicBool>,
) -> result::Result<(Hash, u64), Error> {
    loop {
        // `turn` is the (blockhash, slot) pair reported for the latest turn.
        let turn = Self::poll_for_blockhash_and_slot(
            cluster_info,
            slots_per_segment,
            previous_blockhash,
            exit,
        )?;
        let segment = get_segment_from_slot(turn.1, slots_per_segment);
        if segment == 0 {
            // Still inside segment 0: poll again.
            continue;
        }
        return Ok(turn);
    }
}
/// Poll for a different blockhash and associated max_slot than `previous_blockhash`
fn poll_for_blockhash_and_slot(
cluster_info: &Arc<RwLock<ClusterInfo>>,
slots_per_segment: u64,
previous_blockhash: &Hash,
exit: &Arc<AtomicBool>,
) -> result::Result<(Hash, u64), Error> {
for _ in 0..10 {
info!("waiting for the next turn...");
loop {
let rpc_peers = {
let cluster_info = cluster_info.read().unwrap();
cluster_info.rpc_peers()
@@ -700,19 +734,19 @@ impl Replicator {
RpcClient::new_socket(rpc_peers[node_index].rpc)
};
let response = rpc_client
.retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0)
.retry_make_rpc_request(&RpcRequest::GetStorageTurn, None, 0)
.map_err(|err| {
warn!("Error while making rpc request {:?}", err);
Error::IO(io::Error::new(ErrorKind::Other, "rpc error"))
})?;
let storage_blockhash =
serde_json::from_value::<(String)>(response).map_err(|err| {
let (storage_blockhash, turn_slot) =
serde_json::from_value::<((String, u64))>(response).map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!("Couldn't parse response: {:?}", err),
)
})?;
let storage_blockhash = storage_blockhash.parse().map_err(|err| {
let turn_blockhash = storage_blockhash.parse().map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!(
@@ -721,28 +755,21 @@ impl Replicator {
),
)
})?;
if storage_blockhash != *previous_blockhash {
let storage_slot = rpc_client
.retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0)
.map_err(|err| {
warn!("Error while making rpc request {:?}", err);
Error::IO(io::Error::new(ErrorKind::Other, "rpc error"))
})?
.as_u64()
.unwrap();
info!("storage slot: {}", storage_slot);
if get_segment_from_slot(storage_slot, slots_per_segment) != 0 {
return Ok((storage_blockhash, storage_slot));
if turn_blockhash != *previous_blockhash {
info!("turn slot: {}", turn_slot);
if get_segment_from_slot(turn_slot, slots_per_segment) != 0 {
return Ok((turn_blockhash, turn_slot));
}
}
}
info!("waiting for segment...");
if exit.load(Ordering::Relaxed) {
return Err(Error::IO(io::Error::new(
ErrorKind::Other,
"exit signalled...",
)));
}
sleep(Duration::from_secs(5));
}
Err(io::Error::new(
ErrorKind::Other,
"Couldn't get blockhash or slot",
))?
}
/// Ask a replicator to populate a given blocktree with its segment.
@@ -867,7 +894,7 @@ impl Replicator {
}
sleep(Duration::from_millis(500));
}
panic!("Couldn't get slot height!");
panic!("Couldn't get segment slot from replicator!");
}
}