diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 70e5d0fa5b..a3eb94245b 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -18,9 +18,10 @@ use solana_sdk::message::Message; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::transaction::Transaction; -use solana_storage_api::storage_instruction::StorageInstruction; +use solana_storage_api::storage_contract::{CheckedProof, Proof, ProofStatus}; +use solana_storage_api::storage_instruction::{proof_validation, StorageInstruction}; use solana_storage_api::{get_segment_from_slot, storage_instruction}; -use std::collections::HashSet; +use std::collections::HashMap; use std::io; use std::mem::size_of; use std::net::UdpSocket; @@ -34,7 +35,7 @@ use std::time::Duration; // Vec of [ledger blocks] x [keys] type StorageResults = Vec; type StorageKeys = Vec; -type ReplicatorMap = Vec>; +type ReplicatorMap = Vec>>; #[derive(Default)] pub struct StorageStateInner { @@ -117,7 +118,7 @@ impl StorageState { let replicator_map = &self.state.read().unwrap().replicator_map; if index < replicator_map.len() { replicator_map[index] - .iter() + .keys() .cloned() .take(MAX_PUBKEYS_TO_RETURN) .collect::>() @@ -350,8 +351,6 @@ impl StorageStage { } } } - // TODO: bundle up mining submissions from replicators - // and submit them in a tx to the leader to get reward. Ok(()) } @@ -360,13 +359,13 @@ impl StorageStage { slot: u64, storage_state: &Arc>, current_key_idx: &mut usize, - transaction_key0: Pubkey, + storage_account_key: Pubkey, ) { match deserialize(data) { Ok(StorageInstruction::SubmitMiningProof { slot: proof_slot, signature, - .. + sha_state, }) => { if proof_slot < slot { { @@ -383,14 +382,21 @@ impl StorageStage { let mut statew = storage_state.write().unwrap(); let max_segment_index = get_segment_from_slot(slot) as usize; - if statew.replicator_map.len() <= max_segment_index { + if statew.replicator_map.len() < max_segment_index { statew .replicator_map - .resize(max_segment_index, HashSet::new()); + .resize(max_segment_index, HashMap::new()); } let proof_segment_index = get_segment_from_slot(proof_slot) as usize; if proof_segment_index < statew.replicator_map.len() { - statew.replicator_map[proof_segment_index].insert(transaction_key0); + // Copy the submitted proof + statew.replicator_map[proof_segment_index] + .entry(storage_account_key) + .or_default() + .push(Proof { + signature, + sha_state, + }); } } debug!("storage proof: slot: {}", slot); @@ -426,14 +432,18 @@ impl StorageStage { // Go through the transactions, find proofs, and use them to update // the storage_keys with their signatures for tx in &entry.transactions { - for (i, program_id) in tx.message.program_ids().iter().enumerate() { + for instruction in tx.message.instructions.iter() { + let program_id = + tx.message.account_keys[instruction.program_ids_index as usize]; if solana_storage_api::check_id(&program_id) { + let storage_account_key = + tx.message.account_keys[instruction.accounts[0] as usize]; Self::process_storage_transaction( - &tx.message().instructions[i].data, + &instruction.data, slot, storage_state, current_key_idx, - tx.message.account_keys[0], + storage_account_key, ); } } @@ -454,6 +464,47 @@ impl StorageStage { slot, instruction_sender, )?; + // bundle up mining submissions from replicators + // and submit them in a tx to the leader to get rewarded. + let mut w_state = storage_state.write().unwrap(); + let instructions: Vec<_> = w_state + .replicator_map + .iter_mut() + .enumerate() + .flat_map(|(segment, proof_map)| { + let checked_proofs = proof_map + .iter_mut() + .map(|(id, proofs)| { + ( + *id, + proofs + .drain(..) + .map(|proof| CheckedProof { + proof, + status: ProofStatus::Valid, + }) + .collect::>(), + ) + }) + .collect::>(); + if !checked_proofs.is_empty() { + let ix = proof_validation( + &storage_keypair.pubkey(), + segment as u64, + checked_proofs, + ); + Some(ix) + } else { + None + } + }) + .collect(); + // TODO Avoid AccountInUse errors in this loop + let res: std::result::Result<_, _> = instructions + .into_iter() + .map(|ix| instruction_sender.send(ix)) + .collect(); + res? } } } diff --git a/programs/storage_api/src/storage_contract.rs b/programs/storage_api/src/storage_contract.rs index 4f748103ec..e5e2082ae7 100644 --- a/programs/storage_api/src/storage_contract.rs +++ b/programs/storage_api/src/storage_contract.rs @@ -13,7 +13,7 @@ use std::collections::HashMap; pub const TOTAL_VALIDATOR_REWARDS: u64 = 1; pub const TOTAL_REPLICATOR_REWARDS: u64 = 1; // Todo Tune this for actual use cases when replicators are feature complete -pub const STORAGE_ACCOUNT_SPACE: u64 = 1024 * 4; +pub const STORAGE_ACCOUNT_SPACE: u64 = 1024 * 8; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub enum ProofStatus { @@ -30,7 +30,6 @@ impl Default for ProofStatus { #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Proof { - pub id: Pubkey, pub signature: Signature, pub sha_state: Hash, } @@ -46,17 +45,19 @@ pub enum StorageContract { Uninitialized, // Must be first (aka, 0) ValidatorStorage { + // Most recently advertised slot slot: u64, + // Most recently advertised blockhash hash: Hash, - lockout_validations: HashMap>, - reward_validations: HashMap>, + lockout_validations: HashMap>, + reward_validations: HashMap>, }, ReplicatorStorage { /// Map of Proofs per segment, in a HashMap based on the sha_state proofs: HashMap>, /// Map of Rewards per segment, in a HashMap based on the sha_state /// Multiple validators can validate the same set of proofs so it needs a Vec - reward_validations: HashMap>>, + reward_validations: HashMap>>, }, MiningPool, @@ -127,7 +128,6 @@ impl<'a> StorageAccount<'a> { pub fn submit_mining_proof( &mut self, - id: Pubkey, sha_state: Hash, slot: u64, signature: Signature, @@ -148,14 +148,19 @@ impl<'a> StorageAccount<'a> { sha_state, slot ); - proofs.entry(segment_index).or_default().insert( + let segment_proofs = proofs.entry(segment_index).or_default(); + if segment_proofs.contains_key(&sha_state) { + // do not accept duplicate proofs + return Err(InstructionError::InvalidArgument); + } + segment_proofs.insert( sha_state, Proof { - id, sha_state, signature, }, ); + self.account.set_state(storage_contract) } else { Err(InstructionError::InvalidArgument)? @@ -200,8 +205,8 @@ impl<'a> StorageAccount<'a> { pub fn proof_validation( &mut self, - slot: u64, - proofs: Vec, + segment: u64, + proofs: Vec<(Pubkey, Vec)>, replicator_accounts: &mut [StorageAccount], ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; @@ -211,14 +216,14 @@ impl<'a> StorageAccount<'a> { .. } = &mut storage_contract { - let segment_index = get_segment_from_slot(slot); + let segment_index = segment as usize; let state_segment = get_segment_from_slot(*state_slot); if segment_index > state_segment { return Err(InstructionError::InvalidArgument); } - let mut previous_proofs = replicator_accounts + let accounts_and_proofs = replicator_accounts .iter_mut() .filter_map(|account| { account @@ -226,32 +231,33 @@ impl<'a> StorageAccount<'a> { .state() .ok() .map(move |contract| match contract { - StorageContract::ReplicatorStorage { proofs, .. } => Some(( - account, - proofs.get(&segment_index).cloned().unwrap_or_default(), - )), + StorageContract::ReplicatorStorage { proofs, .. } => { + if let Some(proofs) = proofs.get(&segment_index).cloned() { + Some((account, proofs)) + } else { + None + } + } _ => None, }) }) .flatten() .collect::>(); - if previous_proofs.len() != proofs.len() { + if accounts_and_proofs.len() != proofs.len() { // don't have all the accounts to validate the proofs against return Err(InstructionError::InvalidArgument); } let valid_proofs: Vec<_> = proofs .into_iter() - .enumerate() - .filter_map(|(i, entry)| { - let (account, proofs) = &mut previous_proofs[i]; - proofs.get(&entry.proof.sha_state).map(|proof| { - if process_validation(account, segment_index, &proof, &entry).is_ok() { - Some(entry) - } else { - None - } + .zip(accounts_and_proofs.into_iter()) + .flat_map(|((_id, checked_proofs), (account, proofs))| { + checked_proofs.into_iter().filter_map(move |checked_proof| { + proofs.get(&checked_proof.proof.sha_state).map(|proof| { + process_validation(account, segment_index, &proof, &checked_proof) + .map(|_| checked_proof) + }) }) }) .flatten() @@ -262,8 +268,9 @@ impl<'a> StorageAccount<'a> { lockout_validations .entry(segment_index) .or_default() - .insert(proof.proof.sha_state, proof); + .insert(proof.proof.sha_state, proof.status); }); + self.account.set_state(storage_contract) } else { Err(InstructionError::InvalidArgument)? @@ -333,11 +340,11 @@ impl<'a> StorageAccount<'a> { .map(|mut proofs| { proofs .drain() - .map(|(_, proof)| { + .map(|(sha_state, proof)| { proof .into_iter() .map(|proof| { - segment_proofs.remove(&proof.proof.sha_state); + segment_proofs.remove(&sha_state); proof }) .collect::>() @@ -346,16 +353,12 @@ impl<'a> StorageAccount<'a> { .collect::>() }) .unwrap_or_default(); - let _num_validations = count_valid_proofs(&checked_proofs); - - // TODO enable when rewards are working - /* - let reward = num_validations - * TOTAL_REPLICATOR_REWARDS - * (num_validations / reward_validations[&claim_segment].len() as u64); + let total_proofs = checked_proofs.len() as u64; + let num_validations = count_valid_proofs(&checked_proofs); + let reward = + num_validations * TOTAL_REPLICATOR_REWARDS * (num_validations / total_proofs); mining_pool.account.lamports -= reward; self.account.lamports += reward; - */ self.account.set_state(storage_contract) } else { Err(InstructionError::InvalidArgument)? @@ -390,7 +393,7 @@ fn store_validation_result( .or_default() .entry(checked_proof.proof.sha_state) .or_default() - .push(checked_proof); + .push(checked_proof.status); } else { return Err(InstructionError::InvalidAccountData); } @@ -400,10 +403,10 @@ fn store_validation_result( storage_account.account.set_state(&storage_contract) } -fn count_valid_proofs(proofs: &[CheckedProof]) -> u64 { +fn count_valid_proofs(proofs: &[ProofStatus]) -> u64 { let mut num = 0; for proof in proofs { - if let ProofStatus::Valid = proof.status { + if let ProofStatus::Valid = proof { num += 1; } } @@ -477,7 +480,6 @@ mod tests { }; let segment_index = 0_usize; let proof = Proof { - id: Pubkey::default(), signature: Signature::default(), sha_state: Hash::default(), }; diff --git a/programs/storage_api/src/storage_instruction.rs b/programs/storage_api/src/storage_instruction.rs index 40063cefb4..e91af834ae 100644 --- a/programs/storage_api/src/storage_instruction.rs +++ b/programs/storage_api/src/storage_instruction.rs @@ -6,6 +6,7 @@ use solana_sdk::instruction::{AccountMeta, Instruction}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use solana_sdk::system_instruction; +use std::collections::HashMap; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum StorageInstruction { @@ -35,8 +36,8 @@ pub enum StorageInstruction { slot: u64, }, ProofValidation { - slot: u64, - proofs: Vec, + segment: u64, + proofs: Vec<(Pubkey, Vec)>, }, } @@ -131,16 +132,18 @@ pub fn advertise_recent_blockhash( Instruction::new(id(), &storage_instruction, account_metas) } -pub fn proof_validation( +pub fn proof_validation( storage_pubkey: &Pubkey, - slot: u64, - proofs: Vec, + segment: u64, + checked_proofs: HashMap, S>, ) -> Instruction { let mut account_metas = vec![AccountMeta::new(*storage_pubkey, true)]; - proofs.iter().for_each(|checked_proof| { - account_metas.push(AccountMeta::new(checked_proof.proof.id, false)) + let mut proofs = vec![]; + checked_proofs.into_iter().for_each(|(id, p)| { + proofs.push((id, p)); + account_metas.push(AccountMeta::new(id, false)) }); - let storage_instruction = StorageInstruction::ProofValidation { slot, proofs }; + let storage_instruction = StorageInstruction::ProofValidation { segment, proofs }; Instruction::new(id(), &storage_instruction, account_metas) } diff --git a/programs/storage_api/src/storage_processor.rs b/programs/storage_api/src/storage_processor.rs index df3494797b..9ea5b7ff14 100644 --- a/programs/storage_api/src/storage_processor.rs +++ b/programs/storage_api/src/storage_processor.rs @@ -18,7 +18,6 @@ pub fn process_instruction( let (me, rest) = keyed_accounts.split_at_mut(1); let me_unsigned = me[0].signer_key().is_none(); - let storage_account_pubkey = *me[0].unsigned_key(); let mut storage_account = StorageAccount::new(&mut me[0].account); match bincode::deserialize(data).map_err(|_| InstructionError::InvalidInstructionData)? { @@ -50,7 +49,6 @@ pub fn process_instruction( Err(InstructionError::InvalidArgument)?; } storage_account.submit_mining_proof( - storage_account_pubkey, sha_state, slot, signature, @@ -78,16 +76,16 @@ pub fn process_instruction( tick_height / DEFAULT_TICKS_PER_SLOT, ) } - StorageInstruction::ProofValidation { slot, proofs } => { + StorageInstruction::ProofValidation { segment, proofs } => { if me_unsigned || rest.is_empty() { - // This instruction must be signed by `me` + // This instruction must be signed by `me` and `rest` cannot be empty Err(InstructionError::InvalidArgument)?; } let mut rest: Vec<_> = rest .iter_mut() .map(|keyed_account| StorageAccount::new(&mut keyed_account.account)) .collect(); - storage_account.proof_validation(slot, proofs, &mut rest) + storage_account.proof_validation(segment, proofs, &mut rest) } } } @@ -95,13 +93,13 @@ pub fn process_instruction( #[cfg(test)] mod tests { use super::*; - use crate::id; use crate::storage_contract::{ CheckedProof, Proof, ProofStatus, StorageContract, STORAGE_ACCOUNT_SPACE, - TOTAL_VALIDATOR_REWARDS, + TOTAL_REPLICATOR_REWARDS, TOTAL_VALIDATOR_REWARDS, }; use crate::storage_instruction; use crate::SLOTS_PER_SEGMENT; + use crate::{get_segment_from_slot, id}; use assert_matches::assert_matches; use bincode::deserialize; use log::*; @@ -115,6 +113,7 @@ mod tests { use solana_sdk::message::Message; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; + use std::collections::HashMap; use std::sync::Arc; const TICKS_IN_SEGMENT: u64 = SLOTS_PER_SEGMENT * DEFAULT_TICKS_PER_SLOT; @@ -251,10 +250,16 @@ mod tests { solana_logger::setup(); let (genesis_block, mint_keypair) = create_genesis_block(1000); let mint_pubkey = mint_keypair.pubkey(); - let replicator_keypair = Keypair::new(); - let replicator_pubkey = replicator_keypair.pubkey(); - let validator_keypair = Keypair::new(); - let validator_pubkey = validator_keypair.pubkey(); + + let replicator_1_storage_keypair = Keypair::new(); + let replicator_1_storage_id = replicator_1_storage_keypair.pubkey(); + + let replicator_2_storage_keypair = Keypair::new(); + let replicator_2_storage_id = replicator_2_storage_keypair.pubkey(); + + let validator_storage_keypair = Keypair::new(); + let validator_storage_id = validator_storage_keypair.pubkey(); + let mining_pool_keypair = Keypair::new(); let mining_pool_pubkey = mining_pool_keypair.pubkey(); @@ -264,20 +269,13 @@ mod tests { let slot = 0; let bank_client = BankClient::new_shared(&bank); - let message = Message::new(storage_instruction::create_validator_storage_account( - &mint_pubkey, - &validator_pubkey, + init_storage_accounts( + &bank_client, + &mint_keypair, + &[&validator_storage_id], + &[&replicator_1_storage_id, &replicator_2_storage_id], 10, - )); - bank_client.send_message(&[&mint_keypair], message).unwrap(); - - let message = Message::new(storage_instruction::create_replicator_storage_account( - &mint_pubkey, - &replicator_pubkey, - 10, - )); - bank_client.send_message(&[&mint_keypair], message).unwrap(); - + ); let message = Message::new(storage_instruction::create_mining_pool_account( &mint_pubkey, &mining_pool_pubkey, @@ -294,35 +292,42 @@ mod tests { // advertise for storage segment 1 let message = Message::new_with_payer( vec![storage_instruction::advertise_recent_blockhash( - &validator_pubkey, + &validator_storage_id, Hash::default(), SLOTS_PER_SEGMENT, )], Some(&mint_pubkey), ); assert_matches!( - bank_client.send_message(&[&mint_keypair, &validator_keypair], message), - Ok(_) - ); - - let message = Message::new_with_payer( - vec![storage_instruction::mining_proof( - &replicator_pubkey, - Hash::default(), - slot, - Signature::default(), - )], - Some(&mint_pubkey), - ); - - assert_matches!( - bank_client.send_message(&[&mint_keypair, &replicator_keypair], message), + bank_client.send_message(&[&mint_keypair, &validator_storage_keypair], message), Ok(_) ); + // submit proofs 5 proofs for each replicator for segment 0 + let mut checked_proofs: HashMap<_, Vec<_>> = HashMap::new(); + for slot in 0..5 { + checked_proofs + .entry(replicator_1_storage_id) + .or_default() + .push(submit_proof( + &mint_keypair, + &replicator_1_storage_keypair, + slot, + &bank_client, + )); + checked_proofs + .entry(replicator_2_storage_id) + .or_default() + .push(submit_proof( + &mint_keypair, + &replicator_2_storage_keypair, + slot, + &bank_client, + )); + } let message = Message::new_with_payer( vec![storage_instruction::advertise_recent_blockhash( - &validator_pubkey, + &validator_storage_id, Hash::default(), SLOTS_PER_SEGMENT * 2, )], @@ -335,34 +340,27 @@ mod tests { } assert_matches!( - bank_client.send_message(&[&mint_keypair, &validator_keypair], message), + bank_client.send_message(&[&mint_keypair, &validator_storage_keypair], message), Ok(_) ); let message = Message::new_with_payer( vec![storage_instruction::proof_validation( - &validator_pubkey, - slot, - vec![CheckedProof { - proof: Proof { - id: replicator_pubkey, - signature: Signature::default(), - sha_state: Hash::default(), - }, - status: ProofStatus::Valid, - }], + &validator_storage_id, + get_segment_from_slot(slot) as u64, + checked_proofs, )], Some(&mint_pubkey), ); assert_matches!( - bank_client.send_message(&[&mint_keypair, &validator_keypair], message), + bank_client.send_message(&[&mint_keypair, &validator_storage_keypair], message), Ok(_) ); let message = Message::new_with_payer( vec![storage_instruction::advertise_recent_blockhash( - &validator_pubkey, + &validator_storage_id, Hash::default(), SLOTS_PER_SEGMENT * 3, )], @@ -375,25 +373,24 @@ mod tests { } assert_matches!( - bank_client.send_message(&[&mint_keypair, &validator_keypair], message), + bank_client.send_message(&[&mint_keypair, &validator_storage_keypair], message), Ok(_) ); - assert_eq!(bank_client.get_balance(&validator_pubkey).unwrap(), 10,); + assert_eq!(bank_client.get_balance(&validator_storage_id).unwrap(), 10); let message = Message::new_with_payer( vec![storage_instruction::claim_reward( - &validator_pubkey, + &validator_storage_id, &mining_pool_pubkey, slot, )], Some(&mint_pubkey), ); assert_matches!(bank_client.send_message(&[&mint_keypair], message), Ok(_)); - assert_eq!( - bank_client.get_balance(&validator_pubkey).unwrap(), - 10 + TOTAL_VALIDATOR_REWARDS + bank_client.get_balance(&validator_storage_id).unwrap(), + 10 + (TOTAL_VALIDATOR_REWARDS * 10) ); // tick the bank into the next storage epoch so that rewards can be claimed @@ -401,11 +398,24 @@ mod tests { bank.register_tick(&bank.last_blockhash()); } - assert_eq!(bank_client.get_balance(&replicator_pubkey).unwrap(), 10); + assert_eq!( + bank_client.get_balance(&replicator_1_storage_id).unwrap(), + 10 + ); let message = Message::new_with_payer( vec![storage_instruction::claim_reward( - &replicator_pubkey, + &replicator_1_storage_id, + &mining_pool_pubkey, + slot, + )], + Some(&mint_pubkey), + ); + assert_matches!(bank_client.send_message(&[&mint_keypair], message), Ok(_)); + + let message = Message::new_with_payer( + vec![storage_instruction::claim_reward( + &replicator_2_storage_id, &mining_pool_pubkey, slot, )], @@ -414,7 +424,40 @@ mod tests { assert_matches!(bank_client.send_message(&[&mint_keypair], message), Ok(_)); // TODO enable when rewards are working - // assert_eq!(bank_client.get_balance(&replicator_pubkey).unwrap(), 10 + TOTAL_REPLICATOR_REWARDS); + assert_eq!( + bank_client.get_balance(&replicator_1_storage_id).unwrap(), + 10 + (TOTAL_REPLICATOR_REWARDS * 5) + ); + } + + fn init_storage_accounts( + client: &BankClient, + mint: &Keypair, + validator_accounts_to_create: &[&Pubkey], + replicator_accounts_to_create: &[&Pubkey], + lamports: u64, + ) { + let mut ixs: Vec<_> = validator_accounts_to_create + .into_iter() + .flat_map(|account| { + storage_instruction::create_validator_storage_account( + &mint.pubkey(), + account, + lamports, + ) + }) + .collect(); + replicator_accounts_to_create + .into_iter() + .for_each(|account| { + ixs.append(&mut storage_instruction::create_replicator_storage_account( + &mint.pubkey(), + account, + lamports, + )) + }); + let message = Message::new(ixs); + client.send_message(&[mint], message).unwrap(); } fn get_storage_slot(client: &C, account: &Pubkey) -> u64 { @@ -437,6 +480,36 @@ mod tests { 0 } + fn submit_proof( + mint_keypair: &Keypair, + storage_keypair: &Keypair, + slot: u64, + bank_client: &BankClient, + ) -> CheckedProof { + let sha_state = Hash::new(Pubkey::new_rand().as_ref()); + let message = Message::new_with_payer( + vec![storage_instruction::mining_proof( + &storage_keypair.pubkey(), + sha_state, + slot, + Signature::default(), + )], + Some(&mint_keypair.pubkey()), + ); + + assert_matches!( + bank_client.send_message(&[&mint_keypair, &storage_keypair], message), + Ok(_) + ); + CheckedProof { + proof: Proof { + signature: Signature::default(), + sha_state, + }, + status: ProofStatus::Valid, + } + } + fn get_storage_blockhash(client: &C, account: &Pubkey) -> Hash { if let Some(storage_system_account_data) = client.get_account_data(&account).unwrap() { let contract = deserialize(&storage_system_account_data);