From abbb0378885f844e9f496e55c213a2f4fa4a2575 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 17 Jan 2019 14:41:48 -0800 Subject: [PATCH] Implement storage contract logic --- programs/native/storage/src/lib.rs | 327 ++++++++++++++++++++++++++++- sdk/src/storage_program.rs | 130 +++++++++++- src/bank.rs | 95 +++++++++ src/storage_stage.rs | 138 ++++++++++-- src/tvu.rs | 5 +- 5 files changed, 675 insertions(+), 20 deletions(-) diff --git a/programs/native/storage/src/lib.rs b/programs/native/storage/src/lib.rs index 6748227864..df67895d79 100644 --- a/programs/native/storage/src/lib.rs +++ b/programs/native/storage/src/lib.rs @@ -3,12 +3,27 @@ //! and give reward for good proofs. use log::*; +extern crate solana_sdk; + use solana_sdk::account::KeyedAccount; use solana_sdk::native_program::ProgramError; use solana_sdk::pubkey::Pubkey; use solana_sdk::solana_entrypoint; use solana_sdk::storage_program::*; +pub const TOTAL_VALIDATOR_REWARDS: u64 = 1000; +pub const TOTAL_REPLICATOR_REWARDS: u64 = 1000; + +fn count_valid_proofs(proofs: &[ProofStatus]) -> u64 { + let mut num = 0; + for proof in proofs { + if let ProofStatus::Valid = proof { + num += 1; + } + } + num +} + solana_entrypoint!(entrypoint); fn entrypoint( _program_id: &Pubkey, @@ -18,25 +33,149 @@ fn entrypoint( ) -> Result<(), ProgramError> { solana_logger::setup(); + if keyed_accounts.len() != 2 { + // keyed_accounts[1] should be the main storage key + // to access its userdata + Err(ProgramError::InvalidArgument)?; + } + // accounts_keys[0] must be signed if keyed_accounts[0].signer_key().is_none() { info!("account[0] is unsigned"); + Err(ProgramError::GenericError)?; + } + + if *keyed_accounts[1].unsigned_key() != system_id() { + info!( + "invalid account id owner: {:?} system_id: {:?}", + keyed_accounts[1].unsigned_key(), + system_id() + ); Err(ProgramError::InvalidArgument)?; } if let Ok(syscall) = bincode::deserialize(data) { + let mut storage_account_state = if let Ok(storage_account_state) = + bincode::deserialize(&keyed_accounts[1].account.userdata) + { + storage_account_state + } else { + StorageProgramState::default() + }; + + debug!( + "deserialized state height: {}", + storage_account_state.entry_height + ); match syscall { StorageProgram::SubmitMiningProof { sha_state, entry_height, - .. + signature, } => { - info!( + let segment_index = get_segment_from_entry(entry_height); + let current_segment_index = + get_segment_from_entry(storage_account_state.entry_height); + if segment_index >= current_segment_index { + return Err(ProgramError::InvalidArgument); + } + + debug!( "Mining proof submitted with state {:?} entry_height: {}", sha_state, entry_height ); + + let proof_info = ProofInfo { + id: *keyed_accounts[0].signer_key().unwrap(), + sha_state, + signature, + }; + storage_account_state.proofs[segment_index].push(proof_info); + } + StorageProgram::AdvertiseStorageLastId { id, entry_height } => { + let original_segments = storage_account_state.entry_height / ENTRIES_PER_SEGMENT; + let segments = entry_height / ENTRIES_PER_SEGMENT; + debug!( + "advertise new last id segments: {} orig: {}", + segments, original_segments + ); + if segments <= original_segments { + return Err(ProgramError::InvalidArgument); + } + + storage_account_state.entry_height = entry_height; + storage_account_state.id = id; + + // move the proofs to previous_proofs + storage_account_state.previous_proofs = storage_account_state.proofs.clone(); + storage_account_state.proofs.clear(); + storage_account_state + .proofs + .resize(segments as usize, Vec::new()); + + // move lockout_validations to reward_validations + storage_account_state.reward_validations = + storage_account_state.lockout_validations.clone(); + storage_account_state.lockout_validations.clear(); + storage_account_state + .lockout_validations + .resize(segments as usize, Vec::new()); + } + StorageProgram::ProofValidation { + entry_height, + proof_mask, + } => { + if entry_height >= storage_account_state.entry_height { + return Err(ProgramError::InvalidArgument); + } + + let segment_index = get_segment_from_entry(entry_height); + if storage_account_state.previous_proofs[segment_index].len() != proof_mask.len() { + return Err(ProgramError::InvalidArgument); + } + + // TODO: Check that each proof mask matches the signature + /*for (i, entry) in proof_mask.iter().enumerate() { + if storage_account_state.previous_proofs[segment_index][i] != signature.as_ref[0] { + return Err(ProgramError::InvalidArgument); + } + }*/ + + let info = ValidationInfo { + id: *keyed_accounts[0].signer_key().unwrap(), + proof_mask, + }; + storage_account_state.lockout_validations[segment_index].push(info); + } + StorageProgram::ClaimStorageReward { entry_height } => { + let claims_index = get_segment_from_entry(entry_height); + let account_key = keyed_accounts[0].signer_key().unwrap(); + let mut num_validations = 0; + let mut total_validations = 0; + for validation in &storage_account_state.reward_validations[claims_index] { + if *account_key == validation.id { + num_validations += count_valid_proofs(&validation.proof_mask); + } else { + total_validations += count_valid_proofs(&validation.proof_mask); + } + } + total_validations += num_validations; + if total_validations > 0 { + keyed_accounts[0].account.tokens += + (TOTAL_VALIDATOR_REWARDS * num_validations) / total_validations; + } } } + + if bincode::serialize_into( + &mut keyed_accounts[1].account.userdata[..], + &storage_account_state, + ) + .is_err() + { + return Err(ProgramError::UserdataTooSmall); + } + Ok(()) } else { info!("Invalid instruction userdata: {:?}", data); @@ -47,8 +186,42 @@ fn entrypoint( #[cfg(test)] mod test { use super::*; - use solana_sdk::account::create_keyed_accounts; - use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::account::{create_keyed_accounts, Account}; + use solana_sdk::hash::Hash; + use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; + use solana_sdk::storage_program; + use solana_sdk::storage_program::ProofStatus; + use solana_sdk::storage_program::StorageTransaction; + use solana_sdk::transaction::{Instruction, Transaction}; + + fn test_transaction( + tx: &Transaction, + program_accounts: &mut [Account], + ) -> Result<(), ProgramError> { + assert_eq!(tx.instructions.len(), 1); + let Instruction { + ref accounts, + ref userdata, + .. + } = tx.instructions[0]; + + info!("accounts: {:?}", accounts); + + let mut keyed_accounts: Vec<_> = accounts + .iter() + .map(|&index| { + let index = index as usize; + let key = &tx.account_keys[index]; + (key, index < tx.signatures.len()) + }) + .zip(program_accounts.iter_mut()) + .map(|((key, is_signer), account)| KeyedAccount::new(key, is_signer, account)) + .collect(); + + let ret = entrypoint(&id(), &mut keyed_accounts, &userdata, 42); + info!("ret: {:?}", ret); + ret + } #[test] fn test_storage_tx() { @@ -57,4 +230,150 @@ mod test { let mut keyed_accounts = create_keyed_accounts(&mut accounts); assert!(entrypoint(&id(), &mut keyed_accounts, &[], 42).is_err()); } + + #[test] + fn test_serialize_overflow() { + let keypair = Keypair::new(); + let mut keyed_accounts = Vec::new(); + let mut user_account = Account::default(); + let mut system_account = Account::default(); + let pubkey = keypair.pubkey(); + let system_key = storage_program::system_id(); + keyed_accounts.push(KeyedAccount::new(&pubkey, true, &mut user_account)); + keyed_accounts.push(KeyedAccount::new(&system_key, false, &mut system_account)); + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT, + ); + + assert_eq!( + entrypoint(&id(), &mut keyed_accounts, &tx.instructions[0].userdata, 42), + Err(ProgramError::UserdataTooSmall) + ); + } + + #[test] + fn test_invalid_accounts_len() { + let keypair = Keypair::new(); + let mut accounts = [Default::default()]; + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + 0, + Signature::default(), + ); + assert!(test_transaction(&tx, &mut accounts).is_err()); + + let mut accounts = [Default::default(), Default::default(), Default::default()]; + + assert!(test_transaction(&tx, &mut accounts).is_err()); + } + + #[test] + fn test_submit_mining_invalid_entry_height() { + solana_logger::setup(); + let keypair = Keypair::new(); + let mut accounts = [Account::default(), Account::default()]; + accounts[1].userdata.resize(16 * 1024, 0); + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + 0, + Signature::default(), + ); + + // Haven't seen a transaction to roll over the epoch, so this should fail + assert!(test_transaction(&tx, &mut accounts).is_err()); + } + + #[test] + fn test_submit_mining_ok() { + solana_logger::setup(); + let keypair = Keypair::new(); + let mut accounts = [Account::default(), Account::default()]; + accounts[1].userdata.resize(16 * 1024, 0); + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT, + ); + + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + 0, + Signature::default(), + ); + + assert!(test_transaction(&tx, &mut accounts).is_ok()); + } + + #[test] + fn test_validate_mining() { + solana_logger::setup(); + let keypair = Keypair::new(); + let mut accounts = [Account::default(), Account::default()]; + accounts[1].userdata.resize(16 * 1024, 0); + + let entry_height = 0; + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT, + ); + + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + entry_height, + Signature::default(), + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT * 2, + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_proof_validation( + &keypair, + Hash::default(), + entry_height, + vec![ProofStatus::Valid], + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT * 3, + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_reward_claim(&keypair, Hash::default(), entry_height); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + assert!(accounts[0].tokens == TOTAL_VALIDATOR_REWARDS); + } } diff --git a/sdk/src/storage_program.rs b/sdk/src/storage_program.rs index 4f21cd8864..be48ac258b 100644 --- a/sdk/src/storage_program.rs +++ b/sdk/src/storage_program.rs @@ -1,8 +1,46 @@ use crate::hash::Hash; use crate::pubkey::Pubkey; -use crate::signature::{Keypair, KeypairUtil, Signature}; +use crate::signature::{Keypair, Signature}; use crate::transaction::Transaction; +pub const ENTRIES_PER_SEGMENT: u64 = 16; + +pub fn get_segment_from_entry(entry_height: u64) -> usize { + (entry_height / ENTRIES_PER_SEGMENT) as usize +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum ProofStatus { + Valid, + NotValid, + Skipped, +} + +#[derive(Default, Debug, Serialize, Deserialize, Clone)] +pub struct ProofInfo { + pub id: Pubkey, + pub signature: Signature, + pub sha_state: Hash, +} + +#[derive(Default, Debug, Serialize, Deserialize, Clone)] +pub struct ValidationInfo { + pub id: Pubkey, + pub proof_mask: Vec, +} + +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct StorageProgramState { + pub entry_height: u64, + pub id: Hash, + + pub proofs: Vec>, + pub previous_proofs: Vec>, + + pub lockout_validations: Vec>, + pub reward_validations: Vec>, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum StorageProgram { SubmitMiningProof { @@ -10,6 +48,17 @@ pub enum StorageProgram { entry_height: u64, signature: Signature, }, + AdvertiseStorageLastId { + id: Hash, + entry_height: u64, + }, + ClaimStorageReward { + entry_height: u64, + }, + ProofValidation { + entry_height: u64, + proof_mask: Vec, + }, } pub const STORAGE_PROGRAM_ID: [u8; 32] = [ @@ -17,6 +66,11 @@ pub const STORAGE_PROGRAM_ID: [u8; 32] = [ 0, ]; +pub const STORAGE_SYSTEM_ACCOUNT_ID: [u8; 32] = [ + 133, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, +]; + pub fn check_id(program_id: &Pubkey) -> bool { program_id.as_ref() == STORAGE_PROGRAM_ID } @@ -25,6 +79,10 @@ pub fn id() -> Pubkey { Pubkey::new(&STORAGE_PROGRAM_ID) } +pub fn system_id() -> Pubkey { + Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID) +} + pub trait StorageTransaction { fn storage_new_mining_proof( from_keypair: &Keypair, @@ -33,6 +91,22 @@ pub trait StorageTransaction { entry_height: u64, signature: Signature, ) -> Self; + + fn storage_new_advertise_last_id( + from_keypair: &Keypair, + storage_last_id: Hash, + last_id: Hash, + entry_height: u64, + ) -> Self; + + fn storage_new_proof_validation( + from_keypair: &Keypair, + last_id: Hash, + entry_height: u64, + proof_mask: Vec, + ) -> Self; + + fn storage_new_reward_claim(from_keypair: &Keypair, last_id: Hash, entry_height: u64) -> Self; } impl StorageTransaction for Transaction { @@ -50,7 +124,59 @@ impl StorageTransaction for Transaction { }; Transaction::new( from_keypair, - &[from_keypair.pubkey()], + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], + id(), + &program, + last_id, + 0, + ) + } + + fn storage_new_advertise_last_id( + from_keypair: &Keypair, + storage_id: Hash, + last_id: Hash, + entry_height: u64, + ) -> Self { + let program = StorageProgram::AdvertiseStorageLastId { + id: storage_id, + entry_height, + }; + Transaction::new( + from_keypair, + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], + id(), + &program, + last_id, + 0, + ) + } + + fn storage_new_proof_validation( + from_keypair: &Keypair, + last_id: Hash, + entry_height: u64, + proof_mask: Vec, + ) -> Self { + let program = StorageProgram::ProofValidation { + entry_height, + proof_mask, + }; + Transaction::new( + from_keypair, + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], + id(), + &program, + last_id, + 0, + ) + } + + fn storage_new_reward_claim(from_keypair: &Keypair, last_id: Hash, entry_height: u64) -> Self { + let program = StorageProgram::ClaimStorageReward { entry_height }; + Transaction::new( + from_keypair, + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], id(), &program, last_id, diff --git a/src/bank.rs b/src/bank.rs index 199289b144..359ce8bbfa 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -234,6 +234,16 @@ impl Bank { self.accounts .store_slow(&storage_program::id(), &storage_program_account); + let storage_system_account = Account { + tokens: 1, + owner: storage_program::system_id(), + userdata: vec![0; 16 * 1024], + executable: false, + loader: Pubkey::default(), + }; + self.accounts + .store_slow(&storage_program::system_id(), &storage_system_account); + // Bpf Loader let bpf_loader_account = Account { tokens: 1, @@ -284,6 +294,33 @@ impl Bank { .get_pubkeys_for_entry_height(entry_height) } + pub fn get_storage_entry_height(&self) -> u64 { + match self.get_account(&storage_program::system_id()) { + Some(storage_system_account) => { + let state = deserialize(&storage_system_account.userdata); + if let Ok(state) = state { + let state: storage_program::StorageProgramState = state; + return state.entry_height; + } + } + None => { + info!("error in reading entry_height"); + } + } + 0 + } + + pub fn get_storage_last_id(&self) -> Hash { + if let Some(storage_system_account) = self.get_account(&storage_program::system_id()) { + let state = deserialize(&storage_system_account.userdata); + if let Ok(state) = state { + let state: storage_program::StorageProgramState = state; + return state.id; + } + } + Hash::default() + } + /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { self.last_ids.write().unwrap().clear_signatures(); @@ -964,6 +1001,7 @@ mod tests { use solana_sdk::native_program::ProgramError; use solana_sdk::signature::Keypair; use solana_sdk::signature::KeypairUtil; + use solana_sdk::storage_program::{StorageTransaction, ENTRIES_PER_SEGMENT}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Instruction; use std; @@ -1699,6 +1737,10 @@ mod tests { 132, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ]); + let storage_system = Pubkey::new(&[ + 133, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, + ]); assert_eq!(system_program::id(), system); assert_eq!(solana_native_loader::id(), native); @@ -1707,6 +1749,7 @@ mod tests { assert_eq!(storage_program::id(), storage); assert_eq!(token_program::id(), token); assert_eq!(vote_program::id(), vote); + assert_eq!(storage_program::system_id(), storage_system); } #[test] @@ -1720,6 +1763,7 @@ mod tests { storage_program::id(), token_program::id(), vote_program::id(), + storage_program::system_id(), ]; assert!(ids.into_iter().all(move |id| unique.insert(id))); } @@ -1930,4 +1974,55 @@ mod tests { let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len() - 1); } + + #[test] + fn test_bank_storage() { + solana_logger::setup(); + let alice = Mint::new(1000); + let bank = Bank::new(&alice); + + let bob = Keypair::new(); + let jack = Keypair::new(); + let jill = Keypair::new(); + + let x = 42; + let last_id = hash(&[x]); + let x2 = x * 2; + let storage_last_id = hash(&[x2]); + + bank.register_tick(&last_id); + + bank.transfer(10, &alice.keypair(), jill.pubkey(), last_id) + .unwrap(); + + bank.transfer(10, &alice.keypair(), bob.pubkey(), last_id) + .unwrap(); + bank.transfer(10, &alice.keypair(), jack.pubkey(), last_id) + .unwrap(); + + let tx = Transaction::storage_new_advertise_last_id( + &bob, + storage_last_id, + last_id, + ENTRIES_PER_SEGMENT, + ); + + assert!(bank.process_transaction(&tx).is_ok()); + + let entry_height = 0; + + let tx = Transaction::storage_new_mining_proof( + &jack, + Hash::default(), + last_id, + entry_height, + Signature::default(), + ); + + assert!(bank.process_transaction(&tx).is_ok()); + + assert_eq!(bank.get_storage_entry_height(), ENTRIES_PER_SEGMENT); + assert_eq!(bank.get_storage_last_id(), storage_last_id); + assert_eq!(bank.get_pubkeys_for_entry_height(0), vec![]); + } } diff --git a/src/storage_stage.rs b/src/storage_stage.rs index cd777985f1..3da9327072 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -4,6 +4,8 @@ #[cfg(all(feature = "chacha", feature = "cuda"))] use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; +use crate::client::mk_client; +use crate::cluster_info::ClusterInfo; use crate::db_ledger::DbLedger; use crate::entry::EntryReceiver; use crate::result::{Error, Result}; @@ -17,12 +19,17 @@ use solana_sdk::signature::Keypair; use solana_sdk::signature::Signature; use solana_sdk::storage_program; use solana_sdk::storage_program::StorageProgram; +use solana_sdk::storage_program::StorageTransaction; +use solana_sdk::transaction::Transaction; use solana_sdk::vote_program; use std::collections::HashSet; +use std::io; use std::mem::size_of; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::RecvTimeoutError; +use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, RwLock}; +use std::thread::sleep; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -48,6 +55,7 @@ pub struct StorageState { pub struct StorageStage { t_storage_mining_verifier: JoinHandle<()>, + t_storage_create_accounts: JoinHandle<()>, } macro_rules! cross_boundary { @@ -63,6 +71,8 @@ pub const NUM_STORAGE_SAMPLES: usize = 4; pub const ENTRIES_PER_SEGMENT: u64 = 16; const KEY_SIZE: usize = 64; +type TransactionSender = Sender; + pub fn get_segment_from_entry(entry_height: u64) -> u64 { entry_height / ENTRIES_PER_SEGMENT } @@ -136,25 +146,30 @@ impl StorageStage { storage_state: &StorageState, storage_entry_receiver: EntryReceiver, db_ledger: Option>, - keypair: Arc, - exit: Arc, + keypair: &Arc, + exit: &Arc, entry_height: u64, storage_rotate_count: u64, + cluster_info: &Arc>, ) -> Self { debug!("storage_stage::new: entry_height: {}", entry_height); storage_state.state.write().unwrap().entry_height = entry_height; let storage_state_inner = storage_state.state.clone(); + let exit0 = exit.clone(); + let keypair0 = keypair.clone(); + + let (tx_sender, tx_receiver) = channel(); + let t_storage_mining_verifier = Builder::new() .name("solana-storage-mining-verify-stage".to_string()) .spawn(move || { - let exit = exit.clone(); let mut poh_height = 0; let mut current_key = 0; let mut entry_height = entry_height; loop { if let Some(ref some_db_ledger) = db_ledger { if let Err(e) = Self::process_entries( - &keypair, + &keypair0, &storage_state_inner, &storage_entry_receiver, &some_db_ledger, @@ -162,6 +177,7 @@ impl StorageStage { &mut entry_height, &mut current_key, storage_rotate_count, + &tx_sender, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -170,28 +186,105 @@ impl StorageStage { } } } - if exit.load(Ordering::Relaxed) { + if exit0.load(Ordering::Relaxed) { break; } } }) .unwrap(); + let cluster_info0 = cluster_info.clone(); + let exit1 = exit.clone(); + let keypair1 = keypair.clone(); + let t_storage_create_accounts = Builder::new() + .name("solana-storage-create-accounts".to_string()) + .spawn(move || loop { + match tx_receiver.recv_timeout(Duration::from_secs(1)) { + Ok(mut tx) => { + if Self::send_tx(&cluster_info0, &mut tx, &exit1, &keypair1, None).is_ok() { + debug!("sent tx: {:?}", tx); + } + } + Err(e) => match e { + RecvTimeoutError::Disconnected => break, + RecvTimeoutError::Timeout => (), + }, + }; + + if exit1.load(Ordering::Relaxed) { + break; + } + sleep(Duration::from_millis(100)); + }) + .unwrap(); + StorageStage { t_storage_mining_verifier, + t_storage_create_accounts, } } + fn send_tx( + cluster_info: &Arc>, + tx: &mut Transaction, + exit: &Arc, + keypair: &Arc, + account_to_create: Option, + ) -> io::Result<()> { + if let Some(leader_info) = cluster_info.read().unwrap().leader_data() { + let mut client = mk_client(leader_info); + + if let Some(account) = account_to_create { + if client.get_account_userdata(&account).is_ok() { + return Ok(()); + } + } + + let last_id = client.get_last_id(); + + tx.sign(&[&keypair], last_id); + + if exit.load(Ordering::Relaxed) { + Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; + } + + if let Ok(signature) = client.transfer_signed(&tx) { + for _ in 0..10 { + if client.check_signature(&signature) { + return Ok(()); + } + + if exit.load(Ordering::Relaxed) { + Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; + } + + sleep(Duration::from_millis(200)); + } + } + } + + Err(io::Error::new(io::ErrorKind::Other, "leader not found")) + } + pub fn process_entry_crossing( state: &Arc>, keypair: &Arc, _db_ledger: &Arc, entry_id: Hash, entry_height: u64, + tx_sender: &TransactionSender, ) -> Result<()> { let mut seed = [0u8; 32]; let signature = keypair.sign(&entry_id.as_ref()); + let tx = Transaction::storage_new_advertise_last_id( + keypair, + entry_id, + Hash::default(), + entry_height, + ); + tx_sender.send(tx)?; + seed.copy_from_slice(&signature.as_ref()[..32]); let mut rng = ChaChaRng::from_seed(seed); @@ -259,6 +352,7 @@ impl StorageStage { entry_height: &mut u64, current_key_idx: &mut usize, storage_rotate_count: u64, + tx_sender: &TransactionSender, ) -> Result<()> { let timeout = Duration::new(1, 0); let entries = entry_receiver.recv_timeout(timeout)?; @@ -302,6 +396,7 @@ impl StorageStage { } debug!("storage proof: entry_height: {}", entry_height); } + Ok(_) => {} Err(e) => { info!("error: {:?}", e); } @@ -320,6 +415,7 @@ impl StorageStage { &db_ledger, entry.id, *entry_height, + tx_sender, )?; } *entry_height += 1; @@ -333,6 +429,7 @@ impl Service for StorageStage { type JoinReturnType = (); fn join(self) -> thread::Result<()> { + self.t_storage_create_accounts.join().unwrap(); self.t_storage_mining_verifier.join() } } @@ -343,6 +440,7 @@ mod tests { use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use crate::entry::{make_tiny_test_entries, Entry}; + use crate::cluster_info::{ClusterInfo, NodeInfo}; use crate::service::Service; use crate::storage_stage::StorageState; use crate::storage_stage::NUM_IDENTITIES; @@ -352,6 +450,7 @@ mod tests { use rayon::prelude::*; use solana_sdk::hash::Hash; use solana_sdk::hash::Hasher; + use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::transaction::Transaction; use solana_sdk::vote_program::Vote; @@ -360,7 +459,7 @@ mod tests { use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -369,21 +468,30 @@ mod tests { let keypair = Arc::new(Keypair::new()); let exit = Arc::new(AtomicBool::new(false)); + let cluster_info = test_cluster_info(keypair.pubkey()); + let (_storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, storage_entry_receiver, None, - keypair, - exit.clone(), + &keypair, + &exit.clone(), 0, STORAGE_ROTATE_TEST_COUNT, + &cluster_info, ); exit.store(true, Ordering::Relaxed); storage_stage.join().unwrap(); } + fn test_cluster_info(id: Pubkey) -> Arc> { + let node_info = NodeInfo::new_localhost(id, 0); + let cluster_info = ClusterInfo::new(node_info); + Arc::new(RwLock::new(cluster_info)) + } + #[test] fn test_storage_stage_process_entries() { solana_logger::setup(); @@ -404,16 +512,19 @@ mod tests { .write_entries(DEFAULT_SLOT_HEIGHT, genesis_entries.len() as u64, &entries) .unwrap(); + let cluster_info = test_cluster_info(keypair.pubkey()); + let (storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, storage_entry_receiver, Some(Arc::new(db_ledger)), - keypair, - exit.clone(), + &keypair, + &exit.clone(), 0, STORAGE_ROTATE_TEST_COUNT, + &cluster_info, ); storage_entry_sender.send(entries.clone()).unwrap(); @@ -469,16 +580,19 @@ mod tests { .write_entries(DEFAULT_SLOT_HEIGHT, genesis_entries.len() as u64, &entries) .unwrap(); + let cluster_info = test_cluster_info(keypair.pubkey()); + let (storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, storage_entry_receiver, Some(Arc::new(db_ledger)), - keypair, - exit.clone(), + &keypair, + &exit.clone(), 0, STORAGE_ROTATE_TEST_COUNT, + &cluster_info, ); storage_entry_sender.send(entries.clone()).unwrap(); diff --git a/src/tvu.rs b/src/tvu.rs index 0e3ddf303b..7f85801636 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -117,10 +117,11 @@ impl Tvu { &bank.storage_state, ledger_entry_receiver, Some(db_ledger), - keypair, - exit.clone(), + &keypair, + &exit.clone(), entry_height, storage_rotate_count, + &cluster_info, ); Tvu {