diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 46625292b8..de5d6e07c2 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -2,27 +2,28 @@ // for storage mining. Replicators submit storage proofs, validator then bundles them // to submit its proof for mining to be rewarded. +use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; #[cfg(all(feature = "chacha", feature = "cuda"))] use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; -use crate::cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE}; +use crate::cluster_info::ClusterInfo; use crate::entry::{Entry, EntryReceiver}; use crate::result::{Error, Result}; use crate::service::Service; use bincode::deserialize; use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; -use solana_client::thin_client::{create_client_with_timeout, ThinClient}; -use solana_sdk::client::{AsyncClient, SyncClient}; use solana_sdk::hash::Hash; +use solana_sdk::instruction::Instruction; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; -use solana_sdk::system_transaction; +use solana_sdk::system_instruction; use solana_sdk::transaction::Transaction; use solana_storage_api::storage_instruction::{self, StorageInstruction}; use std::collections::HashSet; use std::io; use std::mem::size_of; +use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; @@ -67,7 +68,7 @@ pub const NUM_STORAGE_SAMPLES: usize = 4; pub const ENTRIES_PER_SEGMENT: u64 = 16; const KEY_SIZE: usize = 64; -type TransactionSender = Sender; +type InstructionSender = Sender; pub fn get_segment_from_entry(entry_height: u64) -> u64 { entry_height / ENTRIES_PER_SEGMENT @@ -138,6 +139,7 @@ impl StorageState { } impl StorageStage { + #[allow(clippy::too_many_arguments)] pub fn new( storage_state: &StorageState, storage_entry_receiver: EntryReceiver, @@ -146,6 +148,7 @@ impl StorageStage { storage_keypair: &Arc, exit: &Arc, entry_height: u64, + bank_forks: &Arc>, storage_rotate_count: u64, cluster_info: &Arc>, ) -> Self { @@ -155,7 +158,7 @@ impl StorageStage { let exit0 = exit.clone(); let keypair0 = storage_keypair.clone(); - let (tx_sender, tx_receiver) = channel(); + let (instruction_sender, instruction_receiver) = channel(); let t_storage_mining_verifier = Builder::new() .name("solana-storage-mining-verify-stage".to_string()) @@ -174,7 +177,7 @@ impl StorageStage { &mut entry_height, &mut current_key, storage_rotate_count, - &tx_sender, + &instruction_sender, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -194,34 +197,39 @@ impl StorageStage { let exit1 = exit.clone(); let keypair1 = keypair.clone(); let storage_keypair1 = storage_keypair.clone(); + let bank_forks1 = bank_forks.clone(); let t_storage_create_accounts = Builder::new() .name("solana-storage-create-accounts".to_string()) - .spawn(move || loop { - match tx_receiver.recv_timeout(Duration::from_secs(1)) { - Ok(mut tx) => { - if Self::send_transaction( - &cluster_info0, - &mut tx, - &exit1, - &keypair1, - &storage_keypair1, - Some(storage_keypair1.pubkey()), - ) - .is_ok() - { - debug!("sent transaction: {:?}", tx); + .spawn(move || { + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + loop { + match instruction_receiver.recv_timeout(Duration::from_secs(1)) { + Ok(instruction) => { + if Self::send_transaction( + &bank_forks1, + &cluster_info0, + instruction, + &keypair1, + &storage_keypair1, + Some(storage_keypair1.pubkey()), + &transactions_socket, + ) + .is_err() + { + debug!("Failed to send storage transaction"); + } } - } - Err(e) => match e { - RecvTimeoutError::Disconnected => break, - RecvTimeoutError::Timeout => (), - }, - }; + Err(e) => match e { + RecvTimeoutError::Disconnected => break, + RecvTimeoutError::Timeout => (), + }, + }; - if exit1.load(Ordering::Relaxed) { - break; + if exit1.load(Ordering::Relaxed) { + break; + } + sleep(Duration::from_millis(100)); } - sleep(Duration::from_millis(100)); }) .unwrap(); @@ -231,81 +239,43 @@ impl StorageStage { } } - fn check_signature( - client: &ThinClient, - signature: &Signature, - exit: &Arc, - ) -> io::Result<()> { - for _ in 0..10 { - if client.check_signature(&signature) { - return Ok(()); - } - - if exit.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; - } - - sleep(Duration::from_millis(200)); - } - Err(io::Error::new(io::ErrorKind::Other, "other failure")) - } - fn send_transaction( + bank_forks: &Arc>, cluster_info: &Arc>, - transaction: &mut Transaction, - exit: &Arc, + instruction: Instruction, keypair: &Arc, storage_keypair: &Arc, account_to_create: Option, + transactions_socket: &UdpSocket, ) -> io::Result<()> { - let contact_info = cluster_info.read().unwrap().my_data(); - let client = create_client_with_timeout( - contact_info.client_facing_addr(), - FULLNODE_PORT_RANGE, - Duration::from_secs(5), - ); - - let mut blockhash = None; - for _ in 0..10 { - if let Ok(new_blockhash) = client.get_recent_blockhash() { - blockhash = Some(new_blockhash); - break; - } - - if exit.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; + let working_bank = bank_forks.read().unwrap().working_bank(); + let blockhash = working_bank.confirmed_last_blockhash(); + let mut instructions = vec![]; + let mut signing_keys = vec![]; + if let Some(account) = account_to_create { + if working_bank.get_account(&account).is_none() { + // TODO the account space needs to be well defined somewhere + let create_instruction = system_instruction::create_account( + &keypair.pubkey(), + &storage_keypair.pubkey(), + 1, + 1024 * 4, + &solana_storage_api::id(), + ); + instructions.push(create_instruction); + signing_keys.push(keypair.as_ref()); + info!("storage account requested"); } } - if let Some(blockhash) = blockhash { - if let Some(account) = account_to_create { - if client.get_account_data(&account).is_err() { - // TODO the account space needs to be well defined somewhere - let tx = system_transaction::create_account( - keypair, - &storage_keypair.pubkey(), - blockhash, - 1, - 1024 * 4, - &solana_storage_api::id(), - 0, - ); - let signature = client.async_send_transaction(tx).unwrap(); - Self::check_signature(&client, &signature, &exit)?; - } - } - transaction.sign(&[storage_keypair.as_ref()], blockhash); - - if exit.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; - } - - if let Ok(signature) = client.async_send_transaction(transaction.clone()) { - Self::check_signature(&client, &signature, &exit)?; - return Ok(()); - } - } - - Err(io::Error::new(io::ErrorKind::Other, "other failure")) + instructions.push(instruction); + signing_keys.push(storage_keypair.as_ref()); + let mut transaction = Transaction::new_unsigned_instructions(instructions); + transaction.sign(&signing_keys, blockhash); + transactions_socket.send_to( + &bincode::serialize(&transaction).unwrap(), + cluster_info.read().unwrap().my_data().tpu, + )?; + Ok(()) } fn process_entry_crossing( @@ -314,7 +284,7 @@ impl StorageStage { _blocktree: &Arc, entry_id: Hash, entry_height: u64, - tx_sender: &TransactionSender, + instruction_sender: &InstructionSender, ) -> Result<()> { let mut seed = [0u8; 32]; let signature = keypair.sign(&entry_id.as_ref()); @@ -324,8 +294,7 @@ impl StorageStage { entry_id, entry_height, ); - let tx = Transaction::new_unsigned_instructions(vec![ix]); - tx_sender.send(tx)?; + instruction_sender.send(ix)?; seed.copy_from_slice(&signature.to_bytes()[..32]); @@ -394,7 +363,7 @@ impl StorageStage { entry_height: &mut u64, current_key_idx: &mut usize, storage_rotate_count: u64, - tx_sender: &TransactionSender, + instruction_sender: &InstructionSender, ) -> Result<()> { let timeout = Duration::new(1, 0); let entries: Vec = entry_receiver.recv_timeout(timeout)?; @@ -465,7 +434,7 @@ impl StorageStage { &blocktree, entry.hash, *entry_height, - tx_sender, + instruction_sender, )?; } *entry_height += 1; @@ -493,6 +462,7 @@ mod tests { use crate::entry::{make_tiny_test_entries, Entry}; use crate::service::Service; use rayon::prelude::*; + use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::pubkey::Pubkey; @@ -512,7 +482,9 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let cluster_info = test_cluster_info(&keypair.pubkey()); - + let (genesis_block, _mint_keypair) = GenesisBlock::new(1000); + let bank = Arc::new(Bank::new(&genesis_block)); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank]))); let (_storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( @@ -523,6 +495,7 @@ mod tests { &storage_keypair, &exit.clone(), 0, + &bank_forks, STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); @@ -549,6 +522,8 @@ mod tests { let entries = make_tiny_test_entries(64); let blocktree = Blocktree::open(&ledger_path).unwrap(); + let bank = Arc::new(Bank::new(&genesis_block)); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank]))); blocktree .write_entries(1, 0, 0, ticks_per_slot, &entries) .unwrap(); @@ -565,6 +540,7 @@ mod tests { &storage_keypair, &exit.clone(), 0, + &bank_forks, STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); @@ -618,7 +594,8 @@ mod tests { blocktree .write_entries(1, 0, 0, ticks_per_slot, &entries) .unwrap(); - + let bank = Arc::new(Bank::new(&genesis_block)); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank]))); let cluster_info = test_cluster_info(&keypair.pubkey()); let (storage_entry_sender, storage_entry_receiver) = channel(); @@ -631,6 +608,7 @@ mod tests { &storage_keypair, &exit.clone(), 0, + &bank_forks, STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 91033d2cbf..d1b11f1a4c 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -151,6 +151,7 @@ impl Tvu { &storage_keypair, &exit, bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still + &bank_forks, storage_rotate_count, &cluster_info, );