diff --git a/core/src/replicator.rs b/core/src/replicator.rs index f752a9d48e..ee03026b8e 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -14,10 +14,6 @@ use crate::window_service::WindowService; use bincode::deserialize; use rand::thread_rng; use rand::Rng; -#[cfg(feature = "chacha")] -use rand::SeedableRng; -#[cfg(feature = "chacha")] -use rand_chacha::ChaChaRng; use solana_client::rpc_client::RpcClient; use solana_client::rpc_request::RpcRequest; use solana_client::thin_client::ThinClient; @@ -57,17 +53,16 @@ pub struct Replicator { ledger_path: String, keypair: Arc, storage_keypair: Arc, + blockhash: String, signature: ed25519_dalek::Signature, cluster_info: Arc>, ledger_data_file_encrypted: PathBuf, sampling_offsets: Vec, - hash: Hash, + sha_state: Hash, #[cfg(feature = "chacha")] num_chacha_blocks: usize, #[cfg(feature = "chacha")] blocktree: Arc, - #[cfg(feature = "chacha")] - rng: ChaChaRng, } pub(crate) fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { @@ -212,7 +207,7 @@ impl Replicator { let client = crate::gossip_service::get_client(&nodes); let (storage_blockhash, storage_slot) = - match Self::poll_for_blockhash_and_slot(&cluster_info) { + match Self::poll_for_blockhash_and_slot(&cluster_info, &Hash::default().to_string()) { Ok(blockhash_and_slot) => blockhash_and_slot, Err(e) => { //shutdown services before exiting @@ -285,11 +280,6 @@ impl Replicator { //always push this last thread_handles.push(t_replicate); - let mut rng_seed = [0u8; 32]; - rng_seed.copy_from_slice(&signature.to_bytes()[0..32]); - #[cfg(feature = "chacha")] - let rng = ChaChaRng::from_seed(rng_seed); - Ok(Self { gossip_service, fetch_stage, @@ -300,36 +290,46 @@ impl Replicator { ledger_path: ledger_path.to_string(), keypair, storage_keypair, + blockhash: storage_blockhash, signature, cluster_info, ledger_data_file_encrypted: PathBuf::default(), sampling_offsets: vec![], - hash: Hash::default(), + sha_state: Hash::default(), #[cfg(feature = "chacha")] num_chacha_blocks: 0, #[cfg(feature = "chacha")] blocktree, - #[cfg(feature = "chacha")] - rng, }) } pub fn run(&mut self) { info!("waiting for ledger download"); self.thread_handles.pop().unwrap().join().unwrap(); - self.encrypt_ledger() - .expect("ledger encrypt not successful"); - let mut proof_index = 0; loop { + self.encrypt_ledger() + .expect("ledger encrypt not successful"); self.create_sampling_offsets(); - proof_index += 1; if let Err(err) = self.sample_file_to_create_mining_hash() { warn!("Error sampling file, exiting: {:?}", err); break; } - self.submit_mining_proof(proof_index); - // TODO: Replicators should be submitting proofs as fast as possible - sleep(Duration::from_secs(2)); + self.submit_mining_proof(); + + // prep the next proof + let (storage_blockhash, _) = + match Self::poll_for_blockhash_and_slot(&self.cluster_info, &self.blockhash) { + Ok(blockhash_and_slot) => blockhash_and_slot, + Err(e) => { + warn!( + "Error could get a newer blockhash than {:?}. {:?}", + self.blockhash, e + ); + break; + } + }; + self.signature = self.storage_keypair.sign(storage_blockhash.as_ref()); + self.blockhash = storage_blockhash; } } @@ -406,16 +406,22 @@ impl Replicator { #[cfg(feature = "chacha")] { use crate::storage_stage::NUM_STORAGE_SAMPLES; + use rand::{Rng, SeedableRng}; + use rand_chacha::ChaChaRng; + + let mut rng_seed = [0u8; 32]; + rng_seed.copy_from_slice(&self.signature.to_bytes()[0..32]); + let mut rng = ChaChaRng::from_seed(rng_seed); for _ in 0..NUM_STORAGE_SAMPLES { self.sampling_offsets - .push(self.rng.gen_range(0, self.num_chacha_blocks) as u64); + .push(rng.gen_range(0, self.num_chacha_blocks) as u64); } } } fn sample_file_to_create_mining_hash(&mut self) -> Result<()> { - self.hash = sample_file(&self.ledger_data_file_encrypted, &self.sampling_offsets)?; - info!("sampled hash: {}", self.hash); + self.sha_state = sample_file(&self.ledger_data_file_encrypted, &self.sampling_offsets)?; + info!("sampled sha_state: {}", self.sha_state); Ok(()) } @@ -457,7 +463,7 @@ impl Replicator { Ok(()) } - fn submit_mining_proof(&self, proof_index: u64) { + fn submit_mining_proof(&self) { // No point if we've got no storage account... let nodes = self.cluster_info.read().unwrap().tvu_peers(); let client = crate::gossip_service::get_client(&nodes); @@ -473,10 +479,9 @@ impl Replicator { let (blockhash, _) = client.get_recent_blockhash().expect("No recent blockhash"); let instruction = storage_instruction::mining_proof( &self.storage_keypair.pubkey(), - self.hash, + self.sha_state, self.slot, Signature::new(&self.signature.to_bytes()), - proof_index, ); let message = Message::new_with_payer(vec![instruction], Some(&self.keypair.pubkey())); let mut transaction = Transaction::new( @@ -508,8 +513,10 @@ impl Replicator { } } + /// Poll for a different blockhash and associated max_slot than `previous_blockhash` fn poll_for_blockhash_and_slot( cluster_info: &Arc>, + previous_blockhash: &str, ) -> Result<(String, u64)> { for _ in 0..10 { let rpc_client = { @@ -523,14 +530,16 @@ impl Replicator { .retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0) .expect("rpc request") .to_string(); - let storage_slot = rpc_client - .retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0) - .expect("rpc request") - .as_u64() - .unwrap(); - info!("storage slot: {}", storage_slot); - if get_segment_from_slot(storage_slot) != 0 { - return Ok((storage_blockhash, storage_slot)); + if storage_blockhash != *previous_blockhash { + let storage_slot = rpc_client + .retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0) + .expect("rpc request") + .as_u64() + .unwrap(); + info!("storage slot: {}", storage_slot); + if get_segment_from_slot(storage_slot) != 0 { + return Ok((storage_blockhash, storage_slot)); + } } info!("waiting for segment..."); sleep(Duration::from_secs(5)); diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 55e5db10b5..be5b26a06e 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -290,16 +290,16 @@ impl StorageStage { storage_keypair: &Arc, state: &Arc>, _blocktree: &Arc, - entry_id: Hash, + blockhash: Hash, slot: u64, instruction_sender: &InstructionSender, ) -> Result<()> { let mut seed = [0u8; 32]; - let signature = storage_keypair.sign(&entry_id.as_ref()); + let signature = storage_keypair.sign(&blockhash.as_ref()); let ix = storage_instruction::advertise_recent_blockhash( &storage_keypair.pubkey(), - entry_id, + blockhash, slot, ); instruction_sender.send(ix)?; @@ -308,7 +308,11 @@ impl StorageStage { let mut rng = ChaChaRng::from_seed(seed); - state.write().unwrap().slot = slot; + { + let mut w_state = state.write().unwrap(); + w_state.slot = slot; + w_state.storage_blockhash = blockhash; + } // Regenerate the answers let num_segments = get_segment_from_slot(slot) as usize; @@ -372,7 +376,6 @@ impl StorageStage { slot: proof_slot, signature, sha_state, - .. }) => { if proof_slot < slot { { @@ -476,7 +479,7 @@ impl StorageStage { &storage_keypair, &storage_state, &blocktree, - entries.last().unwrap().hash, + entry_hash, slot, instruction_sender, )?; @@ -711,7 +714,6 @@ mod tests { Hash::default(), 0, keypair.sign_message(b"test"), - 0, ); let mining_proof_tx = Transaction::new_unsigned_instructions(vec![mining_proof_ix]); let mining_txs = vec![mining_proof_tx]; diff --git a/programs/storage_api/src/storage_instruction.rs b/programs/storage_api/src/storage_instruction.rs index 2b317afa9d..e261913bcf 100644 --- a/programs/storage_api/src/storage_instruction.rs +++ b/programs/storage_api/src/storage_instruction.rs @@ -23,7 +23,6 @@ pub enum StorageInstruction { sha_state: Hash, slot: u64, signature: Signature, - proof_index: u64, }, AdvertiseStorageRecentBlockhash { hash: Hash, @@ -111,13 +110,11 @@ pub fn mining_proof( sha_state: Hash, slot: u64, signature: Signature, - proof_index: u64, ) -> Instruction { let storage_instruction = StorageInstruction::SubmitMiningProof { sha_state, slot, signature, - proof_index, }; let account_metas = vec![ AccountMeta::new(*storage_pubkey, true), diff --git a/programs/storage_api/src/storage_processor.rs b/programs/storage_api/src/storage_processor.rs index d8bf70357e..298efa116b 100644 --- a/programs/storage_api/src/storage_processor.rs +++ b/programs/storage_api/src/storage_processor.rs @@ -43,7 +43,6 @@ pub fn process_instruction( sha_state, slot, signature, - .. } => { if me_unsigned || rest.len() != 1 { // This instruction must be signed by `me` @@ -158,7 +157,6 @@ mod tests { Hash::default(), SLOTS_PER_SEGMENT, Signature::default(), - 0, ); // the proof is for slot 16, which is in segment 0, need to move the tick height into segment 2 let ticks_till_next_segment = TICKS_IN_SEGMENT * 2; @@ -204,7 +202,7 @@ mod tests { let mut accounts = [Account::default()]; let ix = - storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default(), 0); + storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default()); // move tick height into segment 1 let ticks_till_next_segment = TICKS_IN_SEGMENT + 1; let mut tick_account = tick_height::create_account(1); @@ -226,7 +224,7 @@ mod tests { accounts[1].data.resize(STORAGE_ACCOUNT_SPACE as usize, 0); let ix = - storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default(), 0); + storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default()); // submitting a proof for a slot in the past, so this should fail assert!(test_instruction(&ix, &mut accounts).is_err()); @@ -244,7 +242,7 @@ mod tests { } let ix = - storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default(), 0); + storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default()); // move tick height into segment 1 let ticks_till_next_segment = TICKS_IN_SEGMENT + 1; let mut tick_account = tick_height::create_account(1); @@ -501,7 +499,6 @@ mod tests { sha_state, slot, Signature::default(), - 0, )], Some(&mint_keypair.pubkey()), ); @@ -595,7 +592,6 @@ mod tests { Hash::default(), slot, Signature::default(), - 0, )], Some(&mint_pubkey), );