Change replicators to slot-based (#4118)

This commit is contained in:
Sagar Dhawan
2019-05-03 16:27:53 -07:00
committed by GitHub
parent 5bb75a5894
commit a7b695c27a
17 changed files with 255 additions and 484 deletions

View File

@@ -9,7 +9,7 @@ use crate::packet::to_shared_blob;
use crate::repair_service::RepairSlotRange;
use crate::result::Result;
use crate::service::Service;
use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
use crate::storage_stage::SLOTS_PER_SEGMENT;
use crate::streamer::receiver;
use crate::streamer::responder;
use crate::window_service::WindowService;
@@ -26,7 +26,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction;
use solana_sdk::transaction::Transaction;
use solana_sdk::transport::TransportError;
use solana_storage_api::storage_instruction;
use solana_storage_api::{get_segment_from_slot, storage_instruction};
use std::fs::File;
use std::io;
use std::io::BufReader;
@@ -107,18 +107,15 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
Ok(hasher.result())
}
fn get_entry_heights_from_blockhash(
signature: &ed25519_dalek::Signature,
storage_entry_height: u64,
) -> u64 {
fn get_slot_from_blockhash(signature: &ed25519_dalek::Signature, storage_slot: u64) -> u64 {
let signature_vec = signature.to_bytes();
let mut segment_index = u64::from(signature_vec[0])
| (u64::from(signature_vec[1]) << 8)
| (u64::from(signature_vec[1]) << 16)
| (u64::from(signature_vec[2]) << 24);
let max_segment_index = get_segment_from_entry(storage_entry_height);
let max_segment_index = get_segment_from_slot(storage_slot);
segment_index %= max_segment_index as u64;
segment_index * ENTRIES_PER_SEGMENT
segment_index * SLOTS_PER_SEGMENT
}
fn create_request_processor(
@@ -213,16 +210,15 @@ impl Replicator {
info!("Looking for leader at {:?}", cluster_entrypoint);
crate::gossip_service::discover_nodes(&cluster_entrypoint.gossip, 1)?;
let (storage_blockhash, storage_entry_height) =
Self::poll_for_blockhash_and_entry_height(&cluster_info)?;
let (storage_blockhash, storage_slot) = Self::poll_for_blockhash_and_slot(&cluster_info)?;
let node_info = node.info.clone();
let signature = storage_keypair.sign(storage_blockhash.as_ref());
let slot = get_entry_heights_from_blockhash(&signature, storage_entry_height);
let slot = get_slot_from_blockhash(&signature, storage_slot);
info!("replicating slot: {}", slot);
let mut repair_slot_range = RepairSlotRange::default();
repair_slot_range.end = slot + ENTRIES_PER_SEGMENT;
repair_slot_range.end = slot + SLOTS_PER_SEGMENT;
repair_slot_range.start = slot;
let repair_socket = Arc::new(node.sockets.repair);
@@ -324,7 +320,7 @@ impl Replicator {
if meta.is_connected {
current_slot += 1;
warn!("current slot: {}", current_slot);
if current_slot >= start_slot + ENTRIES_PER_SEGMENT {
if current_slot >= start_slot + SLOTS_PER_SEGMENT {
break 'outer;
}
} else {
@@ -481,11 +477,11 @@ impl Replicator {
}
}
pub fn entry_height(&self) -> u64 {
pub fn slot(&self) -> u64 {
self.slot
}
fn poll_for_blockhash_and_entry_height(
fn poll_for_blockhash_and_slot(
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Result<(String, u64)> {
for _ in 0..10 {
@@ -500,20 +496,20 @@ impl Replicator {
.retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0)
.expect("rpc request")
.to_string();
let storage_entry_height = rpc_client
.retry_make_rpc_request(&RpcRequest::GetStorageEntryHeight, None, 0)
let storage_slot = rpc_client
.retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0)
.expect("rpc request")
.as_u64()
.unwrap();
info!("max entry_height: {}", storage_entry_height);
if get_segment_from_entry(storage_entry_height) != 0 {
return Ok((storage_blockhash, storage_entry_height));
info!("max slot: {}", storage_slot);
if get_segment_from_slot(storage_slot) != 0 {
return Ok((storage_blockhash, storage_slot));
}
sleep(Duration::from_secs(3));
}
Err(Error::new(
ErrorKind::Other,
"Couldn't get blockhash or entry_height",
"Couldn't get blockhash or slot",
))?
}
}