diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index 6161aa8464..5d6ea42aac 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -58,7 +58,7 @@ pub struct ClusterConfig { pub fullnode_config: FullnodeConfig, /// Number of replicators in the cluster /// Note- replicators will timeout if ticks_per_slot is much larger than the default 8 - pub num_replicators: u64, + pub num_replicators: usize, /// Number of nodes that are unstaked and not voting (a.k.a listening) pub num_listeners: u64, /// The stakes of each node diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 95878a89ce..feb64d355c 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -141,82 +141,85 @@ impl StorageStage { storage_rotate_count: u64, cluster_info: &Arc>, ) -> Self { - let storage_state_inner = storage_state.state.clone(); - let exit0 = exit.clone(); - let keypair0 = storage_keypair.clone(); - let (instruction_sender, instruction_receiver) = channel(); - let t_storage_mining_verifier = Builder::new() - .name("solana-storage-mining-verify-stage".to_string()) - .spawn(move || { - let mut current_key = 0; - let mut slot_count = 0; - loop { - if let Some(ref some_blocktree) = blocktree { - if let Err(e) = Self::process_entries( - &keypair0, - &storage_state_inner, - &slot_receiver, - &some_blocktree, - &mut slot_count, - &mut current_key, - storage_rotate_count, - &instruction_sender, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => info!("Error from process_entries: {:?}", e), + let t_storage_mining_verifier = { + let storage_state_inner = storage_state.state.clone(); + let exit = exit.clone(); + let storage_keypair = storage_keypair.clone(); + Builder::new() + .name("solana-storage-mining-verify-stage".to_string()) + .spawn(move || { + let mut current_key = 0; + let mut slot_count = 0; + loop { + if let Some(ref some_blocktree) = blocktree { + if let Err(e) = Self::process_entries( + &storage_keypair, + &storage_state_inner, + &slot_receiver, + &some_blocktree, + &mut slot_count, + &mut current_key, + storage_rotate_count, + &instruction_sender, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { + break + } + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => info!("Error from process_entries: {:?}", e), + } } } - } - if exit0.load(Ordering::Relaxed) { - break; - } - } - }) - .unwrap(); - - let cluster_info0 = cluster_info.clone(); - let exit1 = exit.clone(); - let keypair1 = keypair.clone(); - let storage_keypair1 = storage_keypair.clone(); - let bank_forks1 = bank_forks.clone(); - let t_storage_create_accounts = Builder::new() - .name("solana-storage-create-accounts".to_string()) - .spawn(move || { - let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - loop { - match instruction_receiver.recv_timeout(Duration::from_secs(1)) { - Ok(instruction) => { - if Self::send_transaction( - &bank_forks1, - &cluster_info0, - instruction, - &keypair1, - &storage_keypair1, - Some(storage_keypair1.pubkey()), - &transactions_socket, - ) - .is_err() - { - debug!("Failed to send storage transaction"); - } + if exit.load(Ordering::Relaxed) { + break; } - Err(e) => match e { - RecvTimeoutError::Disconnected => break, - RecvTimeoutError::Timeout => (), - }, - }; - - if exit1.load(Ordering::Relaxed) { - break; } - sleep(Duration::from_millis(100)); - } - }) - .unwrap(); + }) + .unwrap() + }; + + let t_storage_create_accounts = { + let cluster_info = cluster_info.clone(); + let exit = exit.clone(); + let keypair = keypair.clone(); + let storage_keypair = storage_keypair.clone(); + let bank_forks = bank_forks.clone(); + Builder::new() + .name("solana-storage-create-accounts".to_string()) + .spawn(move || { + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + loop { + match instruction_receiver.recv_timeout(Duration::from_secs(1)) { + Ok(instruction) => { + Self::send_transaction( + &bank_forks, + &cluster_info, + instruction, + &keypair, + &storage_keypair, + &transactions_socket, + ) + .unwrap_or_else(|err| { + info!("failed to send storage transaction: {:?}", err) + }); + } + Err(e) => match e { + RecvTimeoutError::Disconnected => break, + RecvTimeoutError::Timeout => (), + }, + }; + + if exit.load(Ordering::Relaxed) { + break; + } + sleep(Duration::from_millis(100)); + } + }) + .unwrap() + }; StorageStage { t_storage_mining_verifier, @@ -230,27 +233,27 @@ impl StorageStage { instruction: Instruction, keypair: &Arc, storage_keypair: &Arc, - account_to_create: Option, transactions_socket: &UdpSocket, ) -> io::Result<()> { let working_bank = bank_forks.read().unwrap().working_bank(); let blockhash = working_bank.confirmed_last_blockhash(); let mut instructions = vec![]; let mut signing_keys = vec![]; - if let Some(account) = account_to_create { - if working_bank.get_account(&account).is_none() { - // TODO the account space needs to be well defined somewhere - let create_instruction = system_instruction::create_account( - &keypair.pubkey(), - &storage_keypair.pubkey(), - 1, - 1024 * 4, - &solana_storage_api::id(), - ); - instructions.push(create_instruction); - signing_keys.push(keypair.as_ref()); - info!("storage account requested"); - } + if working_bank + .get_account(&storage_keypair.pubkey()) + .is_none() + { + // TODO the account space needs to be well defined somewhere + let create_instruction = system_instruction::create_account( + &keypair.pubkey(), + &storage_keypair.pubkey(), + 1000, + 1024 * 4, + &solana_storage_api::id(), + ); + instructions.push(create_instruction); + signing_keys.push(keypair.as_ref()); + info!("storage account requested"); } instructions.push(instruction); signing_keys.push(storage_keypair.as_ref()); @@ -264,17 +267,21 @@ impl StorageStage { } fn process_entry_crossing( + storage_keypair: &Arc, state: &Arc>, - keypair: &Arc, _blocktree: &Arc, entry_id: Hash, slot: u64, instruction_sender: &InstructionSender, ) -> Result<()> { let mut seed = [0u8; 32]; - let signature = keypair.sign(&entry_id.as_ref()); + let signature = storage_keypair.sign(&entry_id.as_ref()); - let ix = storage_instruction::advertise_recent_blockhash(&keypair.pubkey(), entry_id, slot); + let ix = storage_instruction::advertise_recent_blockhash( + &storage_keypair.pubkey(), + entry_id, + slot, + ); instruction_sender.send(ix)?; seed.copy_from_slice(&signature.to_bytes()[..32]); @@ -383,7 +390,7 @@ impl StorageStage { } fn process_entries( - keypair: &Arc, + storage_keypair: &Arc, storage_state: &Arc>, slot_receiver: &Receiver, blocktree: &Arc, @@ -419,8 +426,8 @@ impl StorageStage { slot, entry.num_hashes ); Self::process_entry_crossing( + &storage_keypair, &storage_state, - &keypair, &blocktree, entry.hash, slot, diff --git a/core/tests/replicator.rs b/core/tests/replicator.rs index 1cab706ecb..12bb565ef6 100644 --- a/core/tests/replicator.rs +++ b/core/tests/replicator.rs @@ -107,7 +107,7 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) { fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT; let config = ClusterConfig { fullnode_config, - num_replicators: num_replicators as u64, + num_replicators, node_stakes: vec![100; num_nodes], cluster_lamports: 10_000, ..ClusterConfig::default() diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index f5cb836634..bf1d38f973 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -85,8 +85,11 @@ fn test_replay() { let (bank_forks, _bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) = fullnode::new_banks_from_blocktree(&blocktree_path, None); - let bank = bank_forks.working_bank(); - assert_eq!(bank.get_balance(&mint_keypair.pubkey()), mint_balance); + let working_bank = bank_forks.working_bank(); + assert_eq!( + working_bank.get_balance(&mint_keypair.pubkey()), + mint_balance + ); let leader_schedule_cache = Arc::new(leader_schedule_cache); // start cluster_info1 @@ -100,7 +103,7 @@ fn test_replay() { let blocktree = Arc::new(blocktree); { let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blocktree); + create_test_recorder(&working_bank, &blocktree); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)),