Update replicator sampling and proof generation (#4522)

* Update replicator sampling and proof generation

* Clippy
This commit is contained in:
Sagar Dhawan
2019-06-03 17:27:28 -07:00
committed by GitHub
parent dea663d509
commit 167e15a5ae
4 changed files with 58 additions and 54 deletions

View File

@ -14,10 +14,6 @@ use crate::window_service::WindowService;
use bincode::deserialize;
use rand::thread_rng;
use rand::Rng;
#[cfg(feature = "chacha")]
use rand::SeedableRng;
#[cfg(feature = "chacha")]
use rand_chacha::ChaChaRng;
use solana_client::rpc_client::RpcClient;
use solana_client::rpc_request::RpcRequest;
use solana_client::thin_client::ThinClient;
@ -57,17 +53,16 @@ pub struct Replicator {
ledger_path: String,
keypair: Arc<Keypair>,
storage_keypair: Arc<Keypair>,
blockhash: String,
signature: ed25519_dalek::Signature,
cluster_info: Arc<RwLock<ClusterInfo>>,
ledger_data_file_encrypted: PathBuf,
sampling_offsets: Vec<u64>,
hash: Hash,
sha_state: Hash,
#[cfg(feature = "chacha")]
num_chacha_blocks: usize,
#[cfg(feature = "chacha")]
blocktree: Arc<Blocktree>,
#[cfg(feature = "chacha")]
rng: ChaChaRng,
}
pub(crate) fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
@ -212,7 +207,7 @@ impl Replicator {
let client = crate::gossip_service::get_client(&nodes);
let (storage_blockhash, storage_slot) =
match Self::poll_for_blockhash_and_slot(&cluster_info) {
match Self::poll_for_blockhash_and_slot(&cluster_info, &Hash::default().to_string()) {
Ok(blockhash_and_slot) => blockhash_and_slot,
Err(e) => {
//shutdown services before exiting
@ -285,11 +280,6 @@ impl Replicator {
//always push this last
thread_handles.push(t_replicate);
let mut rng_seed = [0u8; 32];
rng_seed.copy_from_slice(&signature.to_bytes()[0..32]);
#[cfg(feature = "chacha")]
let rng = ChaChaRng::from_seed(rng_seed);
Ok(Self {
gossip_service,
fetch_stage,
@ -300,36 +290,46 @@ impl Replicator {
ledger_path: ledger_path.to_string(),
keypair,
storage_keypair,
blockhash: storage_blockhash,
signature,
cluster_info,
ledger_data_file_encrypted: PathBuf::default(),
sampling_offsets: vec![],
hash: Hash::default(),
sha_state: Hash::default(),
#[cfg(feature = "chacha")]
num_chacha_blocks: 0,
#[cfg(feature = "chacha")]
blocktree,
#[cfg(feature = "chacha")]
rng,
})
}
pub fn run(&mut self) {
info!("waiting for ledger download");
self.thread_handles.pop().unwrap().join().unwrap();
self.encrypt_ledger()
.expect("ledger encrypt not successful");
let mut proof_index = 0;
loop {
self.encrypt_ledger()
.expect("ledger encrypt not successful");
self.create_sampling_offsets();
proof_index += 1;
if let Err(err) = self.sample_file_to_create_mining_hash() {
warn!("Error sampling file, exiting: {:?}", err);
break;
}
self.submit_mining_proof(proof_index);
// TODO: Replicators should be submitting proofs as fast as possible
sleep(Duration::from_secs(2));
self.submit_mining_proof();
// prep the next proof
let (storage_blockhash, _) =
match Self::poll_for_blockhash_and_slot(&self.cluster_info, &self.blockhash) {
Ok(blockhash_and_slot) => blockhash_and_slot,
Err(e) => {
warn!(
"Error could get a newer blockhash than {:?}. {:?}",
self.blockhash, e
);
break;
}
};
self.signature = self.storage_keypair.sign(storage_blockhash.as_ref());
self.blockhash = storage_blockhash;
}
}
@ -406,16 +406,22 @@ impl Replicator {
#[cfg(feature = "chacha")]
{
use crate::storage_stage::NUM_STORAGE_SAMPLES;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
let mut rng_seed = [0u8; 32];
rng_seed.copy_from_slice(&self.signature.to_bytes()[0..32]);
let mut rng = ChaChaRng::from_seed(rng_seed);
for _ in 0..NUM_STORAGE_SAMPLES {
self.sampling_offsets
.push(self.rng.gen_range(0, self.num_chacha_blocks) as u64);
.push(rng.gen_range(0, self.num_chacha_blocks) as u64);
}
}
}
fn sample_file_to_create_mining_hash(&mut self) -> Result<()> {
self.hash = sample_file(&self.ledger_data_file_encrypted, &self.sampling_offsets)?;
info!("sampled hash: {}", self.hash);
self.sha_state = sample_file(&self.ledger_data_file_encrypted, &self.sampling_offsets)?;
info!("sampled sha_state: {}", self.sha_state);
Ok(())
}
@ -457,7 +463,7 @@ impl Replicator {
Ok(())
}
fn submit_mining_proof(&self, proof_index: u64) {
fn submit_mining_proof(&self) {
// No point if we've got no storage account...
let nodes = self.cluster_info.read().unwrap().tvu_peers();
let client = crate::gossip_service::get_client(&nodes);
@ -473,10 +479,9 @@ impl Replicator {
let (blockhash, _) = client.get_recent_blockhash().expect("No recent blockhash");
let instruction = storage_instruction::mining_proof(
&self.storage_keypair.pubkey(),
self.hash,
self.sha_state,
self.slot,
Signature::new(&self.signature.to_bytes()),
proof_index,
);
let message = Message::new_with_payer(vec![instruction], Some(&self.keypair.pubkey()));
let mut transaction = Transaction::new(
@ -508,8 +513,10 @@ impl Replicator {
}
}
/// Poll for a different blockhash and associated max_slot than `previous_blockhash`
fn poll_for_blockhash_and_slot(
cluster_info: &Arc<RwLock<ClusterInfo>>,
previous_blockhash: &str,
) -> Result<(String, u64)> {
for _ in 0..10 {
let rpc_client = {
@ -523,14 +530,16 @@ impl Replicator {
.retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0)
.expect("rpc request")
.to_string();
let storage_slot = rpc_client
.retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0)
.expect("rpc request")
.as_u64()
.unwrap();
info!("storage slot: {}", storage_slot);
if get_segment_from_slot(storage_slot) != 0 {
return Ok((storage_blockhash, storage_slot));
if storage_blockhash != *previous_blockhash {
let storage_slot = rpc_client
.retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0)
.expect("rpc request")
.as_u64()
.unwrap();
info!("storage slot: {}", storage_slot);
if get_segment_from_slot(storage_slot) != 0 {
return Ok((storage_blockhash, storage_slot));
}
}
info!("waiting for segment...");
sleep(Duration::from_secs(5));

View File

@ -290,16 +290,16 @@ impl StorageStage {
storage_keypair: &Arc<Keypair>,
state: &Arc<RwLock<StorageStateInner>>,
_blocktree: &Arc<Blocktree>,
entry_id: Hash,
blockhash: Hash,
slot: u64,
instruction_sender: &InstructionSender,
) -> Result<()> {
let mut seed = [0u8; 32];
let signature = storage_keypair.sign(&entry_id.as_ref());
let signature = storage_keypair.sign(&blockhash.as_ref());
let ix = storage_instruction::advertise_recent_blockhash(
&storage_keypair.pubkey(),
entry_id,
blockhash,
slot,
);
instruction_sender.send(ix)?;
@ -308,7 +308,11 @@ impl StorageStage {
let mut rng = ChaChaRng::from_seed(seed);
state.write().unwrap().slot = slot;
{
let mut w_state = state.write().unwrap();
w_state.slot = slot;
w_state.storage_blockhash = blockhash;
}
// Regenerate the answers
let num_segments = get_segment_from_slot(slot) as usize;
@ -372,7 +376,6 @@ impl StorageStage {
slot: proof_slot,
signature,
sha_state,
..
}) => {
if proof_slot < slot {
{
@ -476,7 +479,7 @@ impl StorageStage {
&storage_keypair,
&storage_state,
&blocktree,
entries.last().unwrap().hash,
entry_hash,
slot,
instruction_sender,
)?;
@ -711,7 +714,6 @@ mod tests {
Hash::default(),
0,
keypair.sign_message(b"test"),
0,
);
let mining_proof_tx = Transaction::new_unsigned_instructions(vec![mining_proof_ix]);
let mining_txs = vec![mining_proof_tx];