* tpu-client: Move `send_messages_with_spinner` from program (#20960)
We have too many ways of sending transactions, and too many
reimplementations of the same logic all over the place.
The program deploy logic and stake-o-matic currently make the
most use of the TPU client, so this merges their implementations into
one place to be reused by both. Yay for consolidation!
(cherry picked from commit 5f7b60576f
)
# Conflicts:
# cli/src/program.rs
# client/src/mock_sender.rs
* Fix merge issues, use older APIs
* Update mock sender fee to match block height
Co-authored-by: Jon Cinque <jon.cinque@gmail.com>
This commit is contained in:
@ -12,9 +12,9 @@ use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig};
|
|||||||
use solana_bpf_loader_program::{bpf_verifier, BpfError, ThisInstructionMeter};
|
use solana_bpf_loader_program::{bpf_verifier, BpfError, ThisInstructionMeter};
|
||||||
use solana_clap_utils::{self, input_parsers::*, input_validators::*, keypair::*};
|
use solana_clap_utils::{self, input_parsers::*, input_validators::*, keypair::*};
|
||||||
use solana_cli_output::{
|
use solana_cli_output::{
|
||||||
display::new_spinner_progress_bar, CliProgram, CliProgramAccountType, CliProgramAuthority,
|
CliProgram, CliProgramAccountType, CliProgramAuthority, CliProgramBuffer, CliProgramId,
|
||||||
CliProgramBuffer, CliProgramId, CliUpgradeableBuffer, CliUpgradeableBuffers,
|
CliUpgradeableBuffer, CliUpgradeableBuffers, CliUpgradeableProgram,
|
||||||
CliUpgradeableProgram, CliUpgradeableProgramClosed, CliUpgradeablePrograms,
|
CliUpgradeableProgramClosed, CliUpgradeablePrograms,
|
||||||
};
|
};
|
||||||
use solana_client::{
|
use solana_client::{
|
||||||
client_error::ClientErrorKind,
|
client_error::ClientErrorKind,
|
||||||
@ -22,8 +22,6 @@ use solana_client::{
|
|||||||
rpc_config::RpcSendTransactionConfig,
|
rpc_config::RpcSendTransactionConfig,
|
||||||
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
|
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
|
||||||
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
|
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
|
||||||
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
|
|
||||||
rpc_response::Fees,
|
|
||||||
tpu_client::{TpuClient, TpuClientConfig},
|
tpu_client::{TpuClient, TpuClientConfig},
|
||||||
};
|
};
|
||||||
use solana_rbpf::vm::{Config, Executable};
|
use solana_rbpf::vm::{Config, Executable};
|
||||||
@ -33,7 +31,6 @@ use solana_sdk::{
|
|||||||
account_utils::StateMut,
|
account_utils::StateMut,
|
||||||
bpf_loader, bpf_loader_deprecated,
|
bpf_loader, bpf_loader_deprecated,
|
||||||
bpf_loader_upgradeable::{self, UpgradeableLoaderState},
|
bpf_loader_upgradeable::{self, UpgradeableLoaderState},
|
||||||
commitment_config::CommitmentConfig,
|
|
||||||
instruction::Instruction,
|
instruction::Instruction,
|
||||||
instruction::InstructionError,
|
instruction::InstructionError,
|
||||||
loader_instruction,
|
loader_instruction,
|
||||||
@ -42,24 +39,18 @@ use solana_sdk::{
|
|||||||
packet::PACKET_DATA_SIZE,
|
packet::PACKET_DATA_SIZE,
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
signature::{keypair_from_seed, read_keypair_file, Keypair, Signature, Signer},
|
signature::{keypair_from_seed, read_keypair_file, Keypair, Signature, Signer},
|
||||||
signers::Signers,
|
|
||||||
system_instruction::{self, SystemError},
|
system_instruction::{self, SystemError},
|
||||||
system_program,
|
system_program,
|
||||||
transaction::Transaction,
|
transaction::Transaction,
|
||||||
transaction::TransactionError,
|
transaction::TransactionError,
|
||||||
};
|
};
|
||||||
use solana_transaction_status::TransactionConfirmationStatus;
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
|
||||||
error,
|
|
||||||
fs::File,
|
fs::File,
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
mem::size_of,
|
mem::size_of,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
thread::sleep,
|
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
@ -2116,29 +2107,29 @@ fn send_deploy_messages(
|
|||||||
if let Some(write_messages) = write_messages {
|
if let Some(write_messages) = write_messages {
|
||||||
if let Some(write_signer) = write_signer {
|
if let Some(write_signer) = write_signer {
|
||||||
trace!("Writing program data");
|
trace!("Writing program data");
|
||||||
let Fees {
|
let tpu_client = TpuClient::new(
|
||||||
blockhash,
|
|
||||||
last_valid_block_height,
|
|
||||||
..
|
|
||||||
} = rpc_client
|
|
||||||
.get_fees_with_commitment(config.commitment)?
|
|
||||||
.value;
|
|
||||||
let mut write_transactions = vec![];
|
|
||||||
for message in write_messages.iter() {
|
|
||||||
let mut tx = Transaction::new_unsigned(message.clone());
|
|
||||||
tx.try_sign(&[payer_signer, write_signer], blockhash)?;
|
|
||||||
write_transactions.push(tx);
|
|
||||||
}
|
|
||||||
|
|
||||||
send_and_confirm_transactions_with_spinner(
|
|
||||||
rpc_client.clone(),
|
rpc_client.clone(),
|
||||||
&config.websocket_url,
|
&config.websocket_url,
|
||||||
write_transactions,
|
TpuClientConfig::default(),
|
||||||
&[payer_signer, write_signer],
|
)?;
|
||||||
config.commitment,
|
let transaction_errors = tpu_client
|
||||||
last_valid_block_height,
|
.send_and_confirm_messages_with_spinner(
|
||||||
)
|
write_messages,
|
||||||
.map_err(|err| format!("Data writes to account failed: {}", err))?;
|
&[payer_signer, write_signer],
|
||||||
|
)
|
||||||
|
.map_err(|err| format!("Data writes to account failed: {}", err))?
|
||||||
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
if !transaction_errors.is_empty() {
|
||||||
|
for transaction_error in &transaction_errors {
|
||||||
|
error!("{:?}", transaction_error);
|
||||||
|
}
|
||||||
|
return Err(
|
||||||
|
format!("{} write transactions failed", transaction_errors.len()).into(),
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2199,134 +2190,6 @@ fn report_ephemeral_mnemonic(words: usize, mnemonic: bip39::Mnemonic) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_and_confirm_transactions_with_spinner<T: Signers>(
|
|
||||||
rpc_client: Arc<RpcClient>,
|
|
||||||
websocket_url: &str,
|
|
||||||
mut transactions: Vec<Transaction>,
|
|
||||||
signer_keys: &T,
|
|
||||||
commitment: CommitmentConfig,
|
|
||||||
mut last_valid_block_height: u64,
|
|
||||||
) -> Result<(), Box<dyn error::Error>> {
|
|
||||||
let progress_bar = new_spinner_progress_bar();
|
|
||||||
let mut send_retries = 5;
|
|
||||||
|
|
||||||
progress_bar.set_message("Finding leader nodes...");
|
|
||||||
let tpu_client = TpuClient::new(
|
|
||||||
rpc_client.clone(),
|
|
||||||
websocket_url,
|
|
||||||
TpuClientConfig::default(),
|
|
||||||
)?;
|
|
||||||
loop {
|
|
||||||
// Send all transactions
|
|
||||||
let mut pending_transactions = HashMap::new();
|
|
||||||
let num_transactions = transactions.len();
|
|
||||||
for transaction in transactions {
|
|
||||||
if !tpu_client.send_transaction(&transaction) {
|
|
||||||
let _result = rpc_client
|
|
||||||
.send_transaction_with_config(
|
|
||||||
&transaction,
|
|
||||||
RpcSendTransactionConfig {
|
|
||||||
preflight_commitment: Some(commitment.commitment),
|
|
||||||
..RpcSendTransactionConfig::default()
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
pending_transactions.insert(transaction.signatures[0], transaction);
|
|
||||||
progress_bar.set_message(&format!(
|
|
||||||
"[{}/{}] Transactions sent",
|
|
||||||
pending_transactions.len(),
|
|
||||||
num_transactions
|
|
||||||
));
|
|
||||||
|
|
||||||
// Throttle transactions to about 100 TPS
|
|
||||||
sleep(Duration::from_millis(10));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect statuses for all the transactions, drop those that are confirmed
|
|
||||||
loop {
|
|
||||||
let mut block_height = 0;
|
|
||||||
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
|
|
||||||
for pending_signatures_chunk in
|
|
||||||
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
|
|
||||||
{
|
|
||||||
if let Ok(result) = rpc_client.get_signature_statuses(pending_signatures_chunk) {
|
|
||||||
let statuses = result.value;
|
|
||||||
for (signature, status) in
|
|
||||||
pending_signatures_chunk.iter().zip(statuses.into_iter())
|
|
||||||
{
|
|
||||||
if let Some(status) = status {
|
|
||||||
if let Some(confirmation_status) = &status.confirmation_status {
|
|
||||||
if *confirmation_status != TransactionConfirmationStatus::Processed
|
|
||||||
{
|
|
||||||
let _ = pending_transactions.remove(signature);
|
|
||||||
}
|
|
||||||
} else if status.confirmations.is_none()
|
|
||||||
|| status.confirmations.unwrap() > 1
|
|
||||||
{
|
|
||||||
let _ = pending_transactions.remove(signature);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
block_height = rpc_client.get_block_height()?;
|
|
||||||
progress_bar.set_message(&format!(
|
|
||||||
"[{}/{}] Transactions confirmed. Retrying in {} blocks",
|
|
||||||
num_transactions - pending_transactions.len(),
|
|
||||||
num_transactions,
|
|
||||||
last_valid_block_height.saturating_sub(block_height)
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
if pending_transactions.is_empty() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if block_height > last_valid_block_height {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
for transaction in pending_transactions.values() {
|
|
||||||
if !tpu_client.send_transaction(transaction) {
|
|
||||||
let _result = rpc_client
|
|
||||||
.send_transaction_with_config(
|
|
||||||
transaction,
|
|
||||||
RpcSendTransactionConfig {
|
|
||||||
preflight_commitment: Some(commitment.commitment),
|
|
||||||
..RpcSendTransactionConfig::default()
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg!(not(test)) {
|
|
||||||
// Retry twice a second
|
|
||||||
sleep(Duration::from_millis(500));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if send_retries == 0 {
|
|
||||||
return Err("Transactions failed".into());
|
|
||||||
}
|
|
||||||
send_retries -= 1;
|
|
||||||
|
|
||||||
// Re-sign any failed transactions with a new blockhash and retry
|
|
||||||
let Fees {
|
|
||||||
blockhash,
|
|
||||||
last_valid_block_height: new_last_valid_block_height,
|
|
||||||
..
|
|
||||||
} = rpc_client.get_fees_with_commitment(commitment)?.value;
|
|
||||||
last_valid_block_height = new_last_valid_block_height;
|
|
||||||
transactions = vec![];
|
|
||||||
for (_, mut transaction) in pending_transactions.into_iter() {
|
|
||||||
transaction.try_sign(signer_keys, blockhash)?;
|
|
||||||
transactions.push(transaction);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -18,5 +18,6 @@ pub mod rpc_filter;
|
|||||||
pub mod rpc_request;
|
pub mod rpc_request;
|
||||||
pub mod rpc_response;
|
pub mod rpc_response;
|
||||||
pub mod rpc_sender;
|
pub mod rpc_sender;
|
||||||
|
pub mod spinner;
|
||||||
pub mod thin_client;
|
pub mod thin_client;
|
||||||
pub mod tpu_client;
|
pub mod tpu_client;
|
||||||
|
@ -146,8 +146,8 @@ impl RpcSender for MockSender {
|
|||||||
value: serde_json::to_value(RpcFees {
|
value: serde_json::to_value(RpcFees {
|
||||||
blockhash: PUBKEY.to_string(),
|
blockhash: PUBKEY.to_string(),
|
||||||
fee_calculator: FeeCalculator::default(),
|
fee_calculator: FeeCalculator::default(),
|
||||||
last_valid_slot: 42,
|
last_valid_slot: 1234,
|
||||||
last_valid_block_height: 42,
|
last_valid_block_height: 1234,
|
||||||
})
|
})
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
})?,
|
})?,
|
||||||
|
@ -21,9 +21,9 @@ use {
|
|||||||
rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter},
|
rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter},
|
||||||
rpc_response::*,
|
rpc_response::*,
|
||||||
rpc_sender::*,
|
rpc_sender::*,
|
||||||
|
spinner,
|
||||||
},
|
},
|
||||||
bincode::serialize,
|
bincode::serialize,
|
||||||
indicatif::{ProgressBar, ProgressStyle},
|
|
||||||
log::*,
|
log::*,
|
||||||
serde_json::{json, Value},
|
serde_json::{json, Value},
|
||||||
solana_account_decoder::{
|
solana_account_decoder::{
|
||||||
@ -1067,7 +1067,7 @@ impl RpcClient {
|
|||||||
};
|
};
|
||||||
let mut confirmations = 0;
|
let mut confirmations = 0;
|
||||||
|
|
||||||
let progress_bar = new_spinner_progress_bar();
|
let progress_bar = spinner::new_progress_bar();
|
||||||
|
|
||||||
progress_bar.set_message(&format!(
|
progress_bar.set_message(&format!(
|
||||||
"[{}/{}] Finalizing transaction {}",
|
"[{}/{}] Finalizing transaction {}",
|
||||||
@ -4725,14 +4725,6 @@ pub struct GetConfirmedSignaturesForAddress2Config {
|
|||||||
pub commitment: Option<CommitmentConfig>,
|
pub commitment: Option<CommitmentConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_spinner_progress_bar() -> ProgressBar {
|
|
||||||
let progress_bar = ProgressBar::new(42);
|
|
||||||
progress_bar
|
|
||||||
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
|
|
||||||
progress_bar.enable_steady_tick(100);
|
|
||||||
progress_bar
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_rpc_request_str(rpc_addr: SocketAddr, tls: bool) -> String {
|
fn get_rpc_request_str(rpc_addr: SocketAddr, tls: bool) -> String {
|
||||||
if tls {
|
if tls {
|
||||||
format!("https://{}", rpc_addr)
|
format!("https://{}", rpc_addr)
|
||||||
|
11
client/src/spinner.rs
Normal file
11
client/src/spinner.rs
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
//! Spinner creator
|
||||||
|
|
||||||
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
|
|
||||||
|
pub(crate) fn new_progress_bar() -> ProgressBar {
|
||||||
|
let progress_bar = ProgressBar::new(42);
|
||||||
|
progress_bar
|
||||||
|
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
|
||||||
|
progress_bar.enable_steady_tick(100);
|
||||||
|
progress_bar
|
||||||
|
}
|
@ -1,12 +1,21 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
|
client_error::ClientError,
|
||||||
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
|
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
|
||||||
rpc_client::RpcClient,
|
rpc_client::RpcClient,
|
||||||
rpc_response::SlotUpdate,
|
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
|
||||||
|
rpc_response::{Fees, SlotUpdate},
|
||||||
|
spinner,
|
||||||
};
|
};
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use log::*;
|
use log::*;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction,
|
clock::Slot,
|
||||||
|
commitment_config::CommitmentConfig,
|
||||||
|
message::Message,
|
||||||
|
pubkey::Pubkey,
|
||||||
|
signature::SignerError,
|
||||||
|
signers::Signers,
|
||||||
|
transaction::{Transaction, TransactionError},
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet, VecDeque},
|
collections::{HashMap, HashSet, VecDeque},
|
||||||
@ -16,7 +25,7 @@ use std::{
|
|||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
Arc, RwLock,
|
Arc, RwLock,
|
||||||
},
|
},
|
||||||
thread::JoinHandle,
|
thread::{sleep, JoinHandle},
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
@ -26,9 +35,13 @@ pub enum TpuSenderError {
|
|||||||
#[error("Pubsub error: {0:?}")]
|
#[error("Pubsub error: {0:?}")]
|
||||||
PubsubError(#[from] PubsubClientError),
|
PubsubError(#[from] PubsubClientError),
|
||||||
#[error("RPC error: {0:?}")]
|
#[error("RPC error: {0:?}")]
|
||||||
RpcError(#[from] crate::client_error::ClientError),
|
RpcError(#[from] ClientError),
|
||||||
#[error("IO error: {0:?}")]
|
#[error("IO error: {0:?}")]
|
||||||
IoError(#[from] std::io::Error),
|
IoError(#[from] std::io::Error),
|
||||||
|
#[error("Signer error: {0:?}")]
|
||||||
|
SignerError(#[from] SignerError),
|
||||||
|
#[error("Custom error: {0}")]
|
||||||
|
Custom(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, TpuSenderError>;
|
type Result<T> = std::result::Result<T, TpuSenderError>;
|
||||||
@ -62,6 +75,7 @@ pub struct TpuClient {
|
|||||||
fanout_slots: u64,
|
fanout_slots: u64,
|
||||||
leader_tpu_service: LeaderTpuService,
|
leader_tpu_service: LeaderTpuService,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
|
rpc_client: Arc<RpcClient>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TpuClient {
|
impl TpuClient {
|
||||||
@ -96,15 +110,161 @@ impl TpuClient {
|
|||||||
config: TpuClientConfig,
|
config: TpuClientConfig,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let leader_tpu_service = LeaderTpuService::new(rpc_client, websocket_url, exit.clone())?;
|
let leader_tpu_service =
|
||||||
|
LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||||
fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
|
fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
|
||||||
leader_tpu_service,
|
leader_tpu_service,
|
||||||
exit,
|
exit,
|
||||||
|
rpc_client,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_and_confirm_messages_with_spinner<T: Signers>(
|
||||||
|
&self,
|
||||||
|
messages: &[Message],
|
||||||
|
signers: &T,
|
||||||
|
) -> Result<Vec<Option<TransactionError>>> {
|
||||||
|
let mut expired_blockhash_retries = 5;
|
||||||
|
/* Send at ~100 TPS */
|
||||||
|
const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
|
||||||
|
/* Retry batch send after 4 seconds */
|
||||||
|
const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
|
||||||
|
|
||||||
|
let progress_bar = spinner::new_progress_bar();
|
||||||
|
progress_bar.set_message("Setting up...");
|
||||||
|
|
||||||
|
let mut transactions = messages
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let num_transactions = transactions.len() as f64;
|
||||||
|
let mut transaction_errors = vec![None; transactions.len()];
|
||||||
|
let set_message = |confirmed_transactions,
|
||||||
|
block_height: Option<u64>,
|
||||||
|
last_valid_block_height: u64,
|
||||||
|
status: &str| {
|
||||||
|
progress_bar.set_message(&format!(
|
||||||
|
"{:>5.1}% | {:<40}{}",
|
||||||
|
confirmed_transactions as f64 * 100. / num_transactions,
|
||||||
|
status,
|
||||||
|
match block_height {
|
||||||
|
Some(block_height) => format!(
|
||||||
|
" [block height {}; re-sign in {} blocks]",
|
||||||
|
block_height,
|
||||||
|
last_valid_block_height.saturating_sub(block_height),
|
||||||
|
),
|
||||||
|
None => String::new(),
|
||||||
|
},
|
||||||
|
));
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut confirmed_transactions = 0;
|
||||||
|
let mut block_height = self.rpc_client.get_block_height()?;
|
||||||
|
while expired_blockhash_retries > 0 {
|
||||||
|
let Fees {
|
||||||
|
blockhash,
|
||||||
|
fee_calculator: _,
|
||||||
|
last_valid_block_height,
|
||||||
|
} = self.rpc_client.get_fees()?;
|
||||||
|
|
||||||
|
let mut pending_transactions = HashMap::new();
|
||||||
|
for (i, mut transaction) in transactions {
|
||||||
|
transaction.try_sign(signers, blockhash)?;
|
||||||
|
pending_transactions.insert(transaction.signatures[0], (i, transaction));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
|
||||||
|
while block_height <= last_valid_block_height {
|
||||||
|
let num_transactions = pending_transactions.len();
|
||||||
|
|
||||||
|
// Periodically re-send all pending transactions
|
||||||
|
if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
|
||||||
|
for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
|
||||||
|
if !self.send_transaction(transaction) {
|
||||||
|
let _result = self.rpc_client.send_transaction(transaction).ok();
|
||||||
|
}
|
||||||
|
set_message(
|
||||||
|
confirmed_transactions,
|
||||||
|
None, //block_height,
|
||||||
|
last_valid_block_height,
|
||||||
|
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
|
||||||
|
);
|
||||||
|
sleep(SEND_TRANSACTION_INTERVAL);
|
||||||
|
}
|
||||||
|
last_resend = Instant::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the next block before checking for transaction statuses
|
||||||
|
let mut block_height_refreshes = 10;
|
||||||
|
set_message(
|
||||||
|
confirmed_transactions,
|
||||||
|
Some(block_height),
|
||||||
|
last_valid_block_height,
|
||||||
|
&format!("Waiting for next block, {} pending...", num_transactions),
|
||||||
|
);
|
||||||
|
let mut new_block_height = block_height;
|
||||||
|
while block_height == new_block_height && block_height_refreshes > 0 {
|
||||||
|
sleep(Duration::from_millis(500));
|
||||||
|
new_block_height = self.rpc_client.get_block_height()?;
|
||||||
|
block_height_refreshes -= 1;
|
||||||
|
}
|
||||||
|
block_height = new_block_height;
|
||||||
|
|
||||||
|
// Collect statuses for the transactions, drop those that are confirmed
|
||||||
|
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
|
||||||
|
for pending_signatures_chunk in
|
||||||
|
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
|
||||||
|
{
|
||||||
|
if let Ok(result) = self
|
||||||
|
.rpc_client
|
||||||
|
.get_signature_statuses(pending_signatures_chunk)
|
||||||
|
{
|
||||||
|
let statuses = result.value;
|
||||||
|
for (signature, status) in
|
||||||
|
pending_signatures_chunk.iter().zip(statuses.into_iter())
|
||||||
|
{
|
||||||
|
if let Some(status) = status {
|
||||||
|
if status.satisfies_commitment(self.rpc_client.commitment()) {
|
||||||
|
if let Some((i, _)) = pending_transactions.remove(signature) {
|
||||||
|
confirmed_transactions += 1;
|
||||||
|
if status.err.is_some() {
|
||||||
|
progress_bar.println(format!(
|
||||||
|
"Failed transaction: {:?}",
|
||||||
|
status
|
||||||
|
));
|
||||||
|
}
|
||||||
|
transaction_errors[i] = status.err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
set_message(
|
||||||
|
confirmed_transactions,
|
||||||
|
Some(block_height),
|
||||||
|
last_valid_block_height,
|
||||||
|
"Checking transaction status...",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if pending_transactions.is_empty() {
|
||||||
|
return Ok(transaction_errors);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect();
|
||||||
|
progress_bar.println(format!(
|
||||||
|
"Blockhash expired. {} retries remaining",
|
||||||
|
expired_blockhash_retries
|
||||||
|
));
|
||||||
|
expired_blockhash_retries -= 1;
|
||||||
|
}
|
||||||
|
Err(TpuSenderError::Custom("Max retries exceeded".into()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for TpuClient {
|
impl Drop for TpuClient {
|
||||||
|
Reference in New Issue
Block a user