diff --git a/cli/src/program.rs b/cli/src/program.rs index 0ac855386d..7b0974f8a9 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -40,8 +40,16 @@ use solana_sdk::{ }; use solana_transaction_status::TransactionConfirmationStatus; use std::{ - cmp::min, collections::HashMap, error, fs::File, io::Read, net::UdpSocket, path::PathBuf, - sync::Arc, thread::sleep, time::Duration, + cmp::min, + collections::HashMap, + error, + fs::File, + io::Read, + net::UdpSocket, + path::PathBuf, + sync::Arc, + thread::sleep, + time::{Duration, Instant}, }; const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE @@ -1493,15 +1501,16 @@ fn send_and_confirm_transactions_with_spinner( let cluster_nodes = rpc_client.get_cluster_nodes().ok(); loop { - let mut status_retries = 15; - progress_bar.set_message("Finding leader node..."); let epoch_info = rpc_client.get_epoch_info()?; + let mut slot = epoch_info.absolute_slot; + let mut last_epoch_fetch = Instant::now(); if epoch_info.epoch > leader_schedule_epoch || leader_schedule.is_none() { leader_schedule = rpc_client.get_leader_schedule(Some(epoch_info.absolute_slot))?; leader_schedule_epoch = epoch_info.epoch; } - let tpu_address = get_leader_tpu( + + let mut tpu_address = get_leader_tpu( min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), leader_schedule.as_ref(), cluster_nodes.as_ref(), @@ -1527,52 +1536,61 @@ fn send_and_confirm_transactions_with_spinner( .ok(); } pending_transactions.insert(transaction.signatures[0], transaction); - progress_bar.set_message(&format!( - "[{}/{}] Total Transactions sent", + "[{}/{}] Transactions sent", pending_transactions.len(), num_transactions )); + + // Throttle transactions to about 100 TPS + sleep(Duration::from_millis(10)); + + // Update leader periodically + if last_epoch_fetch.elapsed() > Duration::from_millis(400) { + let epoch_info = rpc_client.get_epoch_info()?; + last_epoch_fetch = Instant::now(); + tpu_address = get_leader_tpu( + min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), + leader_schedule.as_ref(), + cluster_nodes.as_ref(), + ); + } } // Collect statuses for all the transactions, drop those that are confirmed - while status_retries > 0 { - status_retries -= 1; - - progress_bar.set_message(&format!( - "[{}/{}] Transactions confirmed", - num_transactions - pending_transactions.len(), - num_transactions - )); - - let mut statuses = vec![]; + loop { let pending_signatures = pending_transactions.keys().cloned().collect::>(); for pending_signatures_chunk in - pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS - 1) + pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS) { - statuses.extend( - rpc_client - .get_signature_statuses_with_history(pending_signatures_chunk)? - .value - .into_iter(), - ); - } - assert_eq!(statuses.len(), pending_signatures.len()); - - for (signature, status) in pending_signatures.into_iter().zip(statuses.into_iter()) { - if let Some(status) = status { - if let Some(confirmation_status) = &status.confirmation_status { - if *confirmation_status != TransactionConfirmationStatus::Processed { - let _ = pending_transactions.remove(&signature); + if let Ok(result) = + rpc_client.get_signature_statuses_with_history(pending_signatures_chunk) + { + let statuses = result.value; + for (signature, status) in + pending_signatures_chunk.iter().zip(statuses.into_iter()) + { + if let Some(status) = status { + if let Some(confirmation_status) = &status.confirmation_status { + if *confirmation_status != TransactionConfirmationStatus::Processed + { + let _ = pending_transactions.remove(signature); + } + } else if status.confirmations.is_none() + || status.confirmations.unwrap() > 1 + { + let _ = pending_transactions.remove(signature); + } } - } else if status.confirmations.is_none() || status.confirmations.unwrap() > 1 { - let _ = pending_transactions.remove(&signature); } } + + slot = rpc_client.get_slot()?; progress_bar.set_message(&format!( - "[{}/{}] Transactions confirmed", + "[{}/{}] Transactions confirmed. Retrying in {} slots", num_transactions - pending_transactions.len(), - num_transactions + num_transactions, + last_valid_slot.saturating_sub(slot) )); } @@ -1580,11 +1598,35 @@ fn send_and_confirm_transactions_with_spinner( return Ok(()); } - let slot = rpc_client.get_slot()?; if slot > last_valid_slot { break; } + let epoch_info = rpc_client.get_epoch_info()?; + tpu_address = get_leader_tpu( + min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), + leader_schedule.as_ref(), + cluster_nodes.as_ref(), + ); + + for transaction in pending_transactions.values() { + if let Some(tpu_address) = tpu_address { + let wire_transaction = + serialize(transaction).expect("serialization should succeed"); + send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); + } else { + let _result = rpc_client + .send_transaction_with_config( + transaction, + RpcSendTransactionConfig { + preflight_commitment: Some(commitment.commitment), + ..RpcSendTransactionConfig::default() + }, + ) + .ok(); + } + } + if cfg!(not(test)) { // Retry twice a second sleep(Duration::from_millis(500));