From 788290ad8299c2d61cdc391b60f9dae71ad87a37 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 17 May 2019 14:52:54 -0700 Subject: [PATCH] Rework Storage Program to accept multiple proofs per segment (#4319) automerge --- core/src/fullnode.rs | 3 +- core/src/storage_stage.rs | 38 +-- programs/storage_api/src/storage_contract.rs | 235 +++++++++++------- programs/storage_api/src/storage_processor.rs | 81 ++++-- 4 files changed, 238 insertions(+), 119 deletions(-) diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index ad2ee545d9..17f38654f3 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -23,6 +23,7 @@ use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; +use solana_storage_api::SLOTS_PER_SEGMENT; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Receiver; @@ -44,7 +45,7 @@ impl Default for FullnodeConfig { // TODO: remove this, temporary parameter to configure // storage amount differently for test configurations // so tests don't take forever to run. - const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 128; + const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = SLOTS_PER_SEGMENT; Self { sigverify_disabled: false, voting_disabled: false, diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 76b1fe8386..19b8ecb4f1 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -400,13 +400,15 @@ impl StorageStage { ) -> Result<()> { let timeout = Duration::new(1, 0); let slot: u64 = slot_receiver.recv_timeout(timeout)?; - storage_state.write().unwrap().slot = slot; *slot_count += 1; + // Todo check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root + // Update the advertised blockhash to the latest root directly. + if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) { - for entry in entries { + for entry in &entries { // Go through the transactions, find proofs, and use them to update // the storage_keys with their signatures - for tx in entry.transactions { + for tx in &entry.transactions { for (i, program_id) in tx.message.program_ids().iter().enumerate() { if solana_storage_api::check_id(&program_id) { Self::process_storage_transaction( @@ -419,20 +421,22 @@ impl StorageStage { } } } - if *slot_count % storage_rotate_count == 0 { - debug!( - "crosses sending at slot: {}! hashes: {}", - slot, entry.num_hashes - ); - Self::process_entry_crossing( - &storage_keypair, - &storage_state, - &blocktree, - entry.hash, - slot, - instruction_sender, - )?; - } + } + if *slot_count % storage_rotate_count == 0 { + // assume the last entry in the slot is the blockhash for that slot + let entry_hash = entries.last().unwrap().hash; + debug!( + "crosses sending at root slot: {}! with last entry's hash {}", + slot_count, entry_hash + ); + Self::process_entry_crossing( + &storage_keypair, + &storage_state, + &blocktree, + entries.last().unwrap().hash, + slot, + instruction_sender, + )?; } } Ok(()) diff --git a/programs/storage_api/src/storage_contract.rs b/programs/storage_api/src/storage_contract.rs index a9591fa951..64c1d8a2dd 100644 --- a/programs/storage_api/src/storage_contract.rs +++ b/programs/storage_api/src/storage_contract.rs @@ -7,7 +7,7 @@ use solana_sdk::instruction::InstructionError; use solana_sdk::instruction_processor_utils::State; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; -use std::cmp; +use std::collections::HashMap; pub const TOTAL_VALIDATOR_REWARDS: u64 = 1; pub const TOTAL_REPLICATOR_REWARDS: u64 = 1; @@ -25,7 +25,7 @@ impl Default for ProofStatus { } } -#[derive(Default, Debug, Serialize, Deserialize, Clone)] +#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Proof { pub id: Pubkey, pub signature: Signature, @@ -46,12 +46,15 @@ pub enum StorageContract { ValidatorStorage { slot: u64, hash: Hash, - lockout_validations: Vec>, - reward_validations: Vec>, + lockout_validations: HashMap>, + reward_validations: HashMap>, }, ReplicatorStorage { - proofs: Vec, - reward_validations: Vec>, + /// Map of Proofs per segment, in a HashMap based on the sha_state + proofs: HashMap>, + /// Map of Rewards per segment, in a HashMap based on the sha_state + /// Multiple validators can validate the same set of proofs so it needs a Vec + reward_validations: HashMap>>, }, } @@ -70,23 +73,22 @@ impl<'a> StorageAccount<'a> { sha_state: Hash, slot: u64, signature: Signature, + current_slot: u64, ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; if let StorageContract::Default = storage_contract { *storage_contract = StorageContract::ReplicatorStorage { - proofs: vec![], - reward_validations: vec![], + proofs: HashMap::new(), + reward_validations: HashMap::new(), }; }; if let StorageContract::ReplicatorStorage { proofs, .. } = &mut storage_contract { let segment_index = get_segment_from_slot(slot); - if segment_index >= proofs.len() || proofs.is_empty() { - proofs.resize(cmp::max(1, segment_index + 1), Proof::default()); - } + let current_segment = get_segment_from_slot(current_slot); - if segment_index >= proofs.len() { - // only possible if usize max < u64 max + if segment_index >= current_segment { + // attempt to submit proof for unconfirmed segment return Err(InstructionError::InvalidArgument); } @@ -95,12 +97,14 @@ impl<'a> StorageAccount<'a> { sha_state, slot ); - let proof_info = Proof { - id, + proofs.entry(segment_index).or_default().insert( sha_state, - signature, - }; - proofs[segment_index] = proof_info; + Proof { + id, + sha_state, + signature, + }, + ); self.account.set_state(storage_contract) } else { Err(InstructionError::InvalidArgument)? @@ -111,14 +115,15 @@ impl<'a> StorageAccount<'a> { &mut self, hash: Hash, slot: u64, + current_slot: u64, ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; if let StorageContract::Default = storage_contract { *storage_contract = StorageContract::ValidatorStorage { slot: 0, hash: Hash::default(), - lockout_validations: vec![], - reward_validations: vec![], + lockout_validations: HashMap::new(), + reward_validations: HashMap::new(), }; }; @@ -129,23 +134,22 @@ impl<'a> StorageAccount<'a> { lockout_validations, } = &mut storage_contract { - let original_segments = get_segment_from_slot(*state_slot); - let segments = get_segment_from_slot(slot); + let current_segment = get_segment_from_slot(current_slot); + let original_segment = get_segment_from_slot(*state_slot); + let segment = get_segment_from_slot(slot); debug!( - "advertise new last id segments: {} orig: {}", - segments, original_segments + "advertise new segment: {} orig: {}", + segment, current_segment ); - if segments <= original_segments { + if segment < original_segment || segment >= current_segment { return Err(InstructionError::InvalidArgument); } *state_slot = slot; *state_hash = hash; - // move lockout_validations to reward_validations - *reward_validations = lockout_validations.clone(); - lockout_validations.clear(); - lockout_validations.resize(segments as usize, Vec::new()); + // move storage epoch updated, move the lockout_validations to reward_validations + reward_validations.extend(lockout_validations.drain()); self.account.set_state(storage_contract) } else { Err(InstructionError::InvalidArgument)? @@ -163,22 +167,24 @@ impl<'a> StorageAccount<'a> { *storage_contract = StorageContract::ValidatorStorage { slot: 0, hash: Hash::default(), - lockout_validations: vec![], - reward_validations: vec![], + lockout_validations: HashMap::new(), + reward_validations: HashMap::new(), }; }; if let StorageContract::ValidatorStorage { - slot: current_slot, + slot: state_slot, lockout_validations, .. } = &mut storage_contract { - if slot >= *current_slot { + let segment_index = get_segment_from_slot(slot); + let state_segment = get_segment_from_slot(*state_slot); + + if segment_index > state_segment { return Err(InstructionError::InvalidArgument); } - let segment_index = get_segment_from_slot(slot); let mut previous_proofs = replicator_accounts .iter_mut() .filter_map(|account| { @@ -187,9 +193,10 @@ impl<'a> StorageAccount<'a> { .state() .ok() .map(move |contract| match contract { - StorageContract::ReplicatorStorage { proofs, .. } => { - Some((account, proofs[segment_index].clone())) - } + StorageContract::ReplicatorStorage { proofs, .. } => Some(( + account, + proofs.get(&segment_index).cloned().unwrap_or_default(), + )), _ => None, }) }) @@ -201,21 +208,29 @@ impl<'a> StorageAccount<'a> { return Err(InstructionError::InvalidArgument); } - let mut valid_proofs: Vec<_> = proofs + let valid_proofs: Vec<_> = proofs .into_iter() .enumerate() .filter_map(|(i, entry)| { - let (account, proof) = &mut previous_proofs[i]; - if process_validation(account, segment_index, &proof, &entry).is_ok() { - Some(entry) - } else { - None - } + let (account, proofs) = &mut previous_proofs[i]; + proofs.get(&entry.proof.sha_state).map(|proof| { + if process_validation(account, segment_index, &proof, &entry).is_ok() { + Some(entry) + } else { + None + } + }) }) + .flatten() .collect(); // allow validators to store successful validations - lockout_validations[segment_index].append(&mut valid_proofs); + valid_proofs.into_iter().for_each(|proof| { + lockout_validations + .entry(segment_index) + .or_default() + .insert(proof.proof.sha_state, proof); + }); self.account.set_state(storage_contract) } else { Err(InstructionError::InvalidArgument)? @@ -225,7 +240,7 @@ impl<'a> StorageAccount<'a> { pub fn claim_storage_reward( &mut self, slot: u64, - tick_height: u64, + current_slot: u64, ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; if let StorageContract::Default = storage_contract { @@ -233,38 +248,77 @@ impl<'a> StorageAccount<'a> { }; if let StorageContract::ValidatorStorage { - reward_validations, .. + reward_validations, + slot: state_slot, + .. } = &mut storage_contract { - let claims_index = get_segment_from_slot(slot); - let _num_validations = count_valid_proofs(&reward_validations[claims_index]); - // TODO can't just create lamports out of thin air - // self.account.lamports += TOTAL_VALIDATOR_REWARDS * num_validations; - reward_validations.clear(); - self.account.set_state(storage_contract) - } else if let StorageContract::ReplicatorStorage { - reward_validations, .. - } = &mut storage_contract - { - // if current tick height is a full segment away? then allow reward collection - // storage needs to move to tick heights too, until then this makes little sense - let current_index = get_segment_from_slot(tick_height); - let claims_index = get_segment_from_slot(slot); - if current_index <= claims_index || claims_index >= reward_validations.len() { + let state_segment = get_segment_from_slot(*state_slot); + let claim_segment = get_segment_from_slot(slot); + if state_segment <= claim_segment || !reward_validations.contains_key(&claim_segment) { debug!( - "current {:?}, claim {:?}, rewards {:?}", - current_index, - claims_index, + "current {:?}, claim {:?}, have rewards for {:?} segments", + state_segment, + claim_segment, reward_validations.len() ); return Err(InstructionError::InvalidArgument); } - let _num_validations = count_valid_proofs(&reward_validations[claims_index]); + let _num_validations = count_valid_proofs( + &reward_validations + .remove(&claim_segment) + .map(|mut proofs| proofs.drain().map(|(_, proof)| proof).collect::>()) + .unwrap_or_default(), + ); + // TODO can't just create lamports out of thin air + // self.account.lamports += TOTAL_VALIDATOR_REWARDS * num_validations; + self.account.set_state(storage_contract) + } else if let StorageContract::ReplicatorStorage { + proofs, + reward_validations, + } = &mut storage_contract + { + // if current tick height is a full segment away, allow reward collection + let claim_index = get_segment_from_slot(current_slot); + let claim_segment = get_segment_from_slot(slot); + // Todo this might might always be true + if claim_index <= claim_segment + || !reward_validations.contains_key(&claim_segment) + || !proofs.contains_key(&claim_segment) + { + debug!( + "current {:?}, claim {:?}, have rewards for {:?} segments", + claim_index, + claim_segment, + reward_validations.len() + ); + return Err(InstructionError::InvalidArgument); + } + // remove proofs for which rewards have already been collected + let segment_proofs = proofs.get_mut(&claim_segment).unwrap(); + let checked_proofs = reward_validations + .remove(&claim_segment) + .map(|mut proofs| { + proofs + .drain() + .map(|(_, proof)| { + proof + .into_iter() + .map(|proof| { + segment_proofs.remove(&proof.proof.sha_state); + proof + }) + .collect::>() + }) + .flatten() + .collect::>() + }) + .unwrap_or_default(); + let _num_validations = count_valid_proofs(&checked_proofs); // TODO can't just create lamports out of thin air // self.account.lamports += num_validations // * TOTAL_REPLICATOR_REWARDS - // * (num_validations / reward_validations[claims_index].len() as u64); - reward_validations.clear(); + // * (num_validations / reward_validations[claim_segment].len() as u64); self.account.set_state(storage_contract) } else { Err(InstructionError::InvalidArgument)? @@ -275,8 +329,8 @@ impl<'a> StorageAccount<'a> { /// Store the result of a proof validation into the replicator account fn store_validation_result( storage_account: &mut StorageAccount, - segment_index: usize, - status: ProofStatus, + segment: usize, + checked_proof: CheckedProof, ) -> Result<(), InstructionError> { let mut storage_contract = storage_account.account.state()?; match &mut storage_contract { @@ -285,17 +339,24 @@ fn store_validation_result( reward_validations, .. } => { - if segment_index >= proofs.len() { + if !proofs.contains_key(&segment) { return Err(InstructionError::InvalidAccountData); } - if segment_index > reward_validations.len() || reward_validations.is_empty() { - reward_validations.resize(cmp::max(1, segment_index), vec![]); + + if proofs + .get(&segment) + .unwrap() + .contains_key(&checked_proof.proof.sha_state) + { + reward_validations + .entry(segment) + .or_default() + .entry(checked_proof.proof.sha_state) + .or_default() + .push(checked_proof); + } else { + return Err(InstructionError::InvalidAccountData); } - let result = proofs[segment_index].clone(); - reward_validations[segment_index].push(CheckedProof { - proof: result, - status, - }); } _ => return Err(InstructionError::InvalidAccountData), } @@ -318,7 +379,7 @@ fn process_validation( proof: &Proof, checked_proof: &CheckedProof, ) -> Result<(), InstructionError> { - store_validation_result(account, segment_index, checked_proof.status.clone())?; + store_validation_result(account, segment_index, checked_proof.clone())?; if proof.signature != checked_proof.proof.signature || checked_proof.status != ProofStatus::Valid { @@ -350,16 +411,16 @@ mod tests { contract = StorageContract::ValidatorStorage { slot: 0, hash: Hash::default(), - lockout_validations: vec![], - reward_validations: vec![], + lockout_validations: HashMap::new(), + reward_validations: HashMap::new(), }; storage_account.account.set_state(&contract).unwrap(); if let StorageContract::ReplicatorStorage { .. } = contract { panic!("Wrong contract type"); } contract = StorageContract::ReplicatorStorage { - proofs: vec![], - reward_validations: vec![], + proofs: HashMap::new(), + reward_validations: HashMap::new(), }; storage_account.account.set_state(&contract).unwrap(); if let StorageContract::ValidatorStorage { .. } = contract { @@ -394,9 +455,13 @@ mod tests { account.account.data.resize(4 * 1024, 0); let storage_contract = &mut account.account.state().unwrap(); if let StorageContract::Default = storage_contract { + let mut proof_map = HashMap::new(); + proof_map.insert(proof.sha_state, proof.clone()); + let mut proofs = HashMap::new(); + proofs.insert(0, proof_map); *storage_contract = StorageContract::ReplicatorStorage { - proofs: vec![proof.clone()], - reward_validations: vec![], + proofs, + reward_validations: HashMap::new(), }; }; account.account.set_state(storage_contract).unwrap(); diff --git a/programs/storage_api/src/storage_processor.rs b/programs/storage_api/src/storage_processor.rs index d5f8001fab..8030197630 100644 --- a/programs/storage_api/src/storage_processor.rs +++ b/programs/storage_api/src/storage_processor.rs @@ -8,6 +8,7 @@ use log::*; use solana_sdk::account::KeyedAccount; use solana_sdk::instruction::InstructionError; use solana_sdk::pubkey::Pubkey; +use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; pub fn process_instruction( _program_id: &Pubkey, @@ -43,7 +44,13 @@ pub fn process_instruction( if num_keyed_accounts != 1 { Err(InstructionError::InvalidArgument)?; } - storage_account.submit_mining_proof(storage_account_pubkey, sha_state, slot, signature) + storage_account.submit_mining_proof( + storage_account_pubkey, + sha_state, + slot, + signature, + tick_height / DEFAULT_TICKS_PER_SLOT, + ) } StorageInstruction::AdvertiseStorageRecentBlockhash { hash, slot } => { if num_keyed_accounts != 1 { @@ -51,7 +58,11 @@ pub fn process_instruction( // to access its data Err(InstructionError::InvalidArgument)?; } - storage_account.advertise_storage_recent_blockhash(hash, slot) + storage_account.advertise_storage_recent_blockhash( + hash, + slot, + tick_height / DEFAULT_TICKS_PER_SLOT, + ) } StorageInstruction::ClaimStorageReward { slot } => { if num_keyed_accounts != 1 { @@ -59,7 +70,7 @@ pub fn process_instruction( // to access its data Err(InstructionError::InvalidArgument)?; } - storage_account.claim_storage_reward(slot, tick_height) + storage_account.claim_storage_reward(slot, tick_height / DEFAULT_TICKS_PER_SLOT) } StorageInstruction::ProofValidation { slot, proofs } => { if num_keyed_accounts == 1 { @@ -89,10 +100,14 @@ mod tests { use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_instruction; + use std::sync::Arc; + + const TICKS_IN_SEGMENT: u64 = SLOTS_PER_SEGMENT * DEFAULT_TICKS_PER_SLOT; fn test_instruction( ix: &Instruction, program_accounts: &mut [Account], + tick_height: u64, ) -> Result<(), InstructionError> { let mut keyed_accounts: Vec<_> = ix .accounts @@ -103,7 +118,7 @@ mod tests { }) .collect(); - let ret = process_instruction(&id(), &mut keyed_accounts, &ix.data, 42); + let ret = process_instruction(&id(), &mut keyed_accounts, &ix.data, tick_height); info!("ret: {:?}", ret); ret } @@ -122,8 +137,13 @@ mod tests { SLOTS_PER_SEGMENT, Signature::default(), ); + // the proof is for slot 16, which is in segment 0, need to move the tick height into segment 2 + let ticks_till_next_segment = TICKS_IN_SEGMENT * 2; - assert_eq!(test_instruction(&ix, &mut [account]), Ok(())); + assert_eq!( + test_instruction(&ix, &mut [account], ticks_till_next_segment), + Ok(()) + ); } #[test] @@ -160,11 +180,14 @@ mod tests { let ix = storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default()); - assert!(test_instruction(&ix, &mut accounts).is_err()); + // move tick height into segment 1 + let ticks_till_next_segment = TICKS_IN_SEGMENT + 1; + + assert!(test_instruction(&ix, &mut accounts, ticks_till_next_segment).is_err()); let mut accounts = [Account::default(), Account::default(), Account::default()]; - assert!(test_instruction(&ix, &mut accounts).is_err()); + assert!(test_instruction(&ix, &mut accounts, ticks_till_next_segment).is_err()); } #[test] @@ -172,13 +195,14 @@ mod tests { solana_logger::setup(); let pubkey = Pubkey::new_rand(); let mut accounts = [Account::default(), Account::default()]; + accounts[0].data.resize(16 * 1024, 0); accounts[1].data.resize(16 * 1024, 0); let ix = storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default()); - // Haven't seen a transaction to roll over the epoch, so this should fail - assert!(test_instruction(&ix, &mut accounts).is_err()); + // submitting a proof for a slot in the past, so this should fail + assert!(test_instruction(&ix, &mut accounts, 0).is_err()); } #[test] @@ -190,12 +214,13 @@ mod tests { let ix = storage_instruction::mining_proof(&pubkey, Hash::default(), 0, Signature::default()); + // move tick height into segment 1 + let ticks_till_next_segment = TICKS_IN_SEGMENT + 1; - test_instruction(&ix, &mut accounts).unwrap(); + test_instruction(&ix, &mut accounts, ticks_till_next_segment).unwrap(); } #[test] - #[ignore] fn test_validate_mining() { solana_logger::setup(); let (genesis_block, mint_keypair) = create_genesis_block(1000); @@ -207,8 +232,9 @@ mod tests { let mut bank = Bank::new(&genesis_block); bank.add_instruction_processor(id(), process_instruction); + let bank = Arc::new(bank); let slot = 0; - let bank_client = BankClient::new(bank); + let bank_client = BankClient::new_shared(&bank); let ix = system_instruction::create_account(&mint_pubkey, &validator, 10, 4 * 1042, &id()); bank_client.send_instruction(&mint_keypair, ix).unwrap(); @@ -216,6 +242,13 @@ mod tests { let ix = system_instruction::create_account(&mint_pubkey, &replicator, 10, 4 * 1042, &id()); bank_client.send_instruction(&mint_keypair, ix).unwrap(); + // tick the bank up until it's moved into storage segment 2 because the next advertise is for segment 1 + let next_storage_segment_tick_height = TICKS_IN_SEGMENT * 2; + for _ in 0..next_storage_segment_tick_height { + bank.register_tick(&bank.last_blockhash()); + } + + // advertise for storage segment 1 let ix = storage_instruction::advertise_recent_blockhash( &validator, Hash::default(), @@ -241,6 +274,12 @@ mod tests { Hash::default(), SLOTS_PER_SEGMENT * 2, ); + + let next_storage_segment_tick_height = TICKS_IN_SEGMENT; + for _ in 0..next_storage_segment_tick_height { + bank.register_tick(&bank.last_blockhash()); + } + bank_client .send_instruction(&validator_keypair, ix) .unwrap(); @@ -266,6 +305,12 @@ mod tests { Hash::default(), SLOTS_PER_SEGMENT * 3, ); + + let next_storage_segment_tick_height = TICKS_IN_SEGMENT; + for _ in 0..next_storage_segment_tick_height { + bank.register_tick(&bank.last_blockhash()); + } + bank_client .send_instruction(&validator_keypair, ix) .unwrap(); @@ -278,11 +323,10 @@ mod tests { // TODO enable when rewards are working // assert_eq!(bank_client.get_balance(&validator).unwrap(), TOTAL_VALIDATOR_REWARDS); - // TODO extend BankClient with a method to force a block boundary // tick the bank into the next storage epoch so that rewards can be claimed - //for _ in 0..=ENTRIES_PER_SEGMENT { - // bank.register_tick(&bank.last_blockhash()); - //} + for _ in 0..=TICKS_IN_SEGMENT { + bank.register_tick(&bank.last_blockhash()); + } let ix = storage_instruction::reward_claim(&replicator, slot); bank_client @@ -339,6 +383,11 @@ mod tests { let mut bank = Bank::new(&genesis_block); bank.add_instruction_processor(id(), process_instruction); + // tick the bank up until it's moved into storage segment 2 + let next_storage_segment_tick_height = TICKS_IN_SEGMENT * 2; + for _ in 0..next_storage_segment_tick_height { + bank.register_tick(&bank.last_blockhash()); + } let bank_client = BankClient::new(bank); let x = 42;