* Add TPU client for sending txs to the current leader tpu port
* Update tpu_client.rs
(cherry picked from commit 75b8434b76)
Co-authored-by: Justin Starry <justin@solana.com>
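For orientation before the diff: the change replaces the CLI's hand-rolled leader lookup and raw UDP sends with solana_client's TpuClient, which discovers upcoming leaders via the websocket endpoint and forwards transactions to their TPU ports. A rough standalone sketch of the new send path follows; it is not code from this commit, the URLs and the 1-lamport transfer are placeholders, and it assumes the 1.6-era solana-client / solana-sdk APIs this commit targets.

    use std::sync::Arc;

    use solana_client::{
        rpc_client::RpcClient,
        tpu_client::{TpuClient, TpuClientConfig},
    };
    use solana_sdk::{
        signature::{Keypair, Signer},
        system_transaction,
    };

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Shared RPC handle: TpuClient stores its own copy, hence the Arc.
        let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));

        // The TpuClient tracks upcoming leaders (leader schedule + slot updates over
        // the websocket endpoint) and sends serialized transactions to their TPU ports.
        let tpu_client = TpuClient::new(
            rpc_client.clone(),
            "ws://127.0.0.1:8900",
            TpuClientConfig::default(),
        )?;

        // Placeholder transaction: a 1-lamport transfer between two throwaway keys.
        let from = Keypair::new();
        let to = Keypair::new();
        let (blockhash, _fee_calculator) = rpc_client.get_recent_blockhash()?;
        let tx = system_transaction::transfer(&from, &to.pubkey(), 1, blockhash);

        // send_transaction() returns false if the TPU send failed; the CLI code in
        // this diff falls back to a plain RPC submission in that case.
        if !tpu_client.send_transaction(&tx) {
            let _ = rpc_client.send_transaction(&tx);
        }
        Ok(())
    }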
@@ -444,7 +444,7 @@ pub struct CliConfig<'a> {
     pub websocket_url: String,
     pub signers: Vec<&'a dyn Signer>,
     pub keypair_path: String,
-    pub rpc_client: Option<RpcClient>,
+    pub rpc_client: Option<Arc<RpcClient>>,
     pub rpc_timeout: Duration,
     pub verbose: bool,
     pub output_format: OutputFormat,
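The reason for the Arc wrapper shows up further down: TpuClient::new takes an Arc<RpcClient> and keeps it, so the CLI command handlers and the TPU client end up sharing one client instance. A tiny standalone illustration (not from the commit, URL is a placeholder):

    use std::sync::Arc;

    use solana_client::rpc_client::RpcClient;

    fn main() {
        // One underlying RpcClient, two owners: the CLI keeps one handle and can
        // hand the other to TpuClient::new(), which stores it for its own RPC calls.
        let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
        let _handle_for_tpu_client = Arc::clone(&rpc_client); // clones the Arc, not the client
    }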
@@ -1284,17 +1284,15 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
         println_name_value("Commitment:", &config.commitment.commitment.to_string());
     }
 
-    let mut _rpc_client;
     let rpc_client = if config.rpc_client.is_none() {
-        _rpc_client = RpcClient::new_with_timeout_and_commitment(
+        Arc::new(RpcClient::new_with_timeout_and_commitment(
             config.json_rpc_url.to_string(),
             config.rpc_timeout,
             config.commitment,
-        );
-        &_rpc_client
+        ))
     } else {
         // Primarily for testing
-        config.rpc_client.as_ref().unwrap()
+        config.rpc_client.as_ref().unwrap().clone()
     };
 
     match &config.command {
@@ -1502,7 +1500,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
             use_deprecated_loader,
             allow_excessive_balance,
         } => process_deploy(
-            &rpc_client,
+            rpc_client,
             config,
             program_location,
             *address,
@@ -1510,7 +1508,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
            *allow_excessive_balance,
         ),
         CliCommand::Program(program_subcommand) => {
-            process_program_subcommand(&rpc_client, config, program_subcommand)
+            process_program_subcommand(rpc_client, config, program_subcommand)
         }
 
         // Stake Commands
@@ -2585,7 +2583,7 @@ mod tests {
     fn test_cli_process_command() {
         // Success cases
         let mut config = CliConfig {
-            rpc_client: Some(RpcClient::new_mock("succeeds".to_string())),
+            rpc_client: Some(Arc::new(RpcClient::new_mock("succeeds".to_string()))),
            json_rpc_url: "http://127.0.0.1:8899".to_string(),
             ..CliConfig::default()
         };
@@ -2785,13 +2783,13 @@ mod tests {
         assert!(process_command(&config).is_ok());
 
         // sig_not_found case
-        config.rpc_client = Some(RpcClient::new_mock("sig_not_found".to_string()));
+        config.rpc_client = Some(Arc::new(RpcClient::new_mock("sig_not_found".to_string())));
         let missing_signature = Signature::new(&bs58::decode("5VERv8NMvzbJMEkV8xnrLkEaWRtSz9CosKDYjCJjBRnbJLgp8uirBgmQpjKhoR4tjF3ZpRzrFmBV6UjKdiSZkQUW").into_vec().unwrap());
         config.command = CliCommand::Confirm(missing_signature);
         assert_eq!(process_command(&config).unwrap(), "Not found");
 
         // Tx error case
-        config.rpc_client = Some(RpcClient::new_mock("account_in_use".to_string()));
+        config.rpc_client = Some(Arc::new(RpcClient::new_mock("account_in_use".to_string())));
         let any_signature = Signature::new(&bs58::decode(SIGNATURE).into_vec().unwrap());
         config.command = CliCommand::Confirm(any_signature);
         assert_eq!(
@@ -2800,7 +2798,7 @@ mod tests {
         );
 
         // Failure cases
-        config.rpc_client = Some(RpcClient::new_mock("fails".to_string()));
+        config.rpc_client = Some(Arc::new(RpcClient::new_mock("fails".to_string())));
 
         config.command = CliCommand::Airdrop {
             pubkey: None,
@@ -2870,7 +2868,7 @@ mod tests {
         mocks.insert(RpcRequest::GetAccountInfo, account_info_response);
         let rpc_client = RpcClient::new_mock_with_mocks("".to_string(), mocks);
 
-        config.rpc_client = Some(rpc_client);
+        config.rpc_client = Some(Arc::new(rpc_client));
         let default_keypair = Keypair::new();
         config.signers = vec![&default_keypair];
 

@@ -29,7 +29,6 @@ pub mod inflation;
 pub mod memo;
 pub mod nonce;
 pub mod program;
-pub mod send_tpu;
 pub mod spend_utils;
 pub mod stake;
 pub mod test_utils;

@@ -1,4 +1,3 @@
-use crate::send_tpu::{get_leader_tpus, send_transaction_tpu};
 use crate::{
     checks::*,
     cli::{
@@ -6,7 +5,6 @@ use crate::{
         ProcessResult,
     },
 };
-use bincode::serialize;
 use bip39::{Language, Mnemonic, MnemonicType, Seed};
 use clap::{App, AppSettings, Arg, ArgMatches, SubCommand};
 use log::*;
@@ -25,7 +23,7 @@ use solana_client::{
     rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
     rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
     rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
-    rpc_response::RpcLeaderSchedule,
+    tpu_client::{TpuClient, TpuClientConfig},
 };
 use solana_rbpf::vm::{Config, Executable};
 use solana_remote_wallet::remote_wallet::RemoteWalletManager;
@@ -51,20 +49,17 @@ use solana_sdk::{
 };
 use solana_transaction_status::TransactionConfirmationStatus;
 use std::{
-    cmp::min,
     collections::HashMap,
     error,
     fs::File,
     io::{Read, Write},
-    net::UdpSocket,
     path::PathBuf,
     sync::Arc,
     thread::sleep,
-    time::{Duration, Instant},
+    time::Duration,
 };
 
 const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE
-const NUM_TPU_LEADERS: u64 = 2;
 
 #[derive(Debug, PartialEq)]
 pub enum ProgramCliCommand {
@@ -622,7 +617,7 @@ pub fn parse_program_subcommand(
 }
 
 pub fn process_program_subcommand(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
     config: &CliConfig,
     program_subcommand: &ProgramCliCommand,
 ) -> ProcessResult {
@@ -638,7 +633,7 @@ pub fn process_program_subcommand(
             max_len,
             allow_excessive_balance,
         } => process_program_deploy(
-            &rpc_client,
+            rpc_client,
             config,
             program_location,
             *program_signer_index,
@@ -657,7 +652,7 @@ pub fn process_program_subcommand(
             buffer_authority_signer_index,
             max_len,
         } => process_write_buffer(
-            &rpc_client,
+            rpc_client,
             config,
             program_location,
             *buffer_signer_index,
@@ -746,7 +741,7 @@ fn get_default_program_keypair(program_location: &Option<String>) -> Keypair {
 /// Deploy using upgradeable loader
 #[allow(clippy::too_many_arguments)]
 fn process_program_deploy(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
     config: &CliConfig,
     program_location: &Option<String>,
     program_signer_index: Option<SignerIndex>,
@@ -892,7 +887,7 @@ fn process_program_deploy(
 
     let result = if do_deploy {
         do_process_program_write_and_deploy(
-            rpc_client,
+            rpc_client.clone(),
             config,
             &program_data,
             buffer_data_len,
@@ -907,7 +902,7 @@ fn process_program_deploy(
         )
     } else {
         do_process_program_upgrade(
-            rpc_client,
+            rpc_client.clone(),
             config,
             &program_data,
             &program_pubkey,
@@ -918,7 +913,7 @@ fn process_program_deploy(
     };
     if result.is_ok() && is_final {
         process_set_authority(
-            rpc_client,
+            &rpc_client,
             config,
             Some(program_pubkey),
             None,
@@ -933,7 +928,7 @@ fn process_program_deploy(
 }
 
 fn process_write_buffer(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
     config: &CliConfig,
     program_location: &str,
     buffer_signer_index: Option<SignerIndex>,
@@ -1450,7 +1445,7 @@ fn process_close(
 
 /// Deploy using non-upgradeable loader
 pub fn process_deploy(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
     config: &CliConfig,
     program_location: &str,
     buffer_signer_index: Option<SignerIndex>,
@@ -1495,7 +1490,7 @@ pub fn process_deploy(
 
 #[allow(clippy::too_many_arguments)]
 fn do_process_program_write_and_deploy(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
     config: &CliConfig,
     program_data: &[u8],
     buffer_data_len: usize,
@@ -1633,7 +1628,7 @@ fn do_process_program_write_and_deploy(
         messages.push(message);
     }
 
-    check_payer(rpc_client, config, balance_needed, &messages)?;
+    check_payer(&rpc_client, config, balance_needed, &messages)?;
 
     send_deploy_messages(
         rpc_client,
@@ -1660,7 +1655,7 @@ fn do_process_program_write_and_deploy(
 }
 
 fn do_process_program_upgrade(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
     config: &CliConfig,
     program_data: &[u8],
     program_id: &Pubkey,
@@ -1756,7 +1751,7 @@ fn do_process_program_upgrade(
     );
     messages.push(&final_message);
 
-    check_payer(rpc_client, config, balance_needed, &messages)?;
+    check_payer(&rpc_client, config, balance_needed, &messages)?;
     send_deploy_messages(
         rpc_client,
         config,
@@ -1861,7 +1856,7 @@ fn check_payer(
 }
 
 fn send_deploy_messages(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
     config: &CliConfig,
     initial_message: &Option<Message>,
     write_messages: &Option<Vec<Message>>,
@@ -1909,7 +1904,8 @@ fn send_deploy_messages(
             }
 
             send_and_confirm_transactions_with_spinner(
-                &rpc_client,
+                rpc_client.clone(),
+                &config.websocket_url,
                 write_transactions,
                 &[payer_signer, write_signer],
                 config.commitment,
@@ -1978,7 +1974,8 @@ fn report_ephemeral_mnemonic(words: usize, mnemonic: bip39::Mnemonic) {
 }
 
 fn send_and_confirm_transactions_with_spinner<T: Signers>(
-    rpc_client: &RpcClient,
+    rpc_client: Arc<RpcClient>,
+    websocket_url: &str,
     mut transactions: Vec<Transaction>,
     signer_keys: &T,
     commitment: CommitmentConfig,
@@ -1986,39 +1983,19 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
 ) -> Result<(), Box<dyn error::Error>> {
     let progress_bar = new_spinner_progress_bar();
     let mut send_retries = 5;
-    let mut leader_schedule: Option<RpcLeaderSchedule> = None;
-    let mut leader_schedule_epoch = 0;
-    let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
-    let cluster_nodes = rpc_client.get_cluster_nodes().ok();
 
+    progress_bar.set_message("Finding leader nodes...");
+    let tpu_client = TpuClient::new(
+        rpc_client.clone(),
+        websocket_url,
+        TpuClientConfig::default(),
+    )?;
     loop {
-        progress_bar.set_message("Finding leader nodes...");
-        let epoch_info = rpc_client.get_epoch_info()?;
-        let mut slot = epoch_info.absolute_slot;
-        let mut last_epoch_fetch = Instant::now();
-        if epoch_info.epoch > leader_schedule_epoch || leader_schedule.is_none() {
-            leader_schedule = rpc_client.get_leader_schedule(Some(epoch_info.absolute_slot))?;
-            leader_schedule_epoch = epoch_info.epoch;
-        }
-
-        let mut tpu_addresses = get_leader_tpus(
-            min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
-            NUM_TPU_LEADERS,
-            leader_schedule.as_ref(),
-            cluster_nodes.as_ref(),
-        );
-
         // Send all transactions
         let mut pending_transactions = HashMap::new();
         let num_transactions = transactions.len();
         for transaction in transactions {
-            if !tpu_addresses.is_empty() {
-                let wire_transaction =
-                    serialize(&transaction).expect("serialization should succeed");
-                for tpu_address in &tpu_addresses {
-                    send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
-                }
-            } else {
+            if !tpu_client.send_transaction(&transaction) {
                 let _result = rpc_client
                     .send_transaction_with_config(
                         &transaction,
@@ -2038,22 +2015,11 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
 
             // Throttle transactions to about 100 TPS
             sleep(Duration::from_millis(10));
-            // Update leader periodically
-            if last_epoch_fetch.elapsed() > Duration::from_millis(400) {
-                let epoch_info = rpc_client.get_epoch_info()?;
-                last_epoch_fetch = Instant::now();
-                tpu_addresses = get_leader_tpus(
-                    min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
-                    NUM_TPU_LEADERS,
-                    leader_schedule.as_ref(),
-                    cluster_nodes.as_ref(),
-                );
-            }
         }
 
         // Collect statuses for all the transactions, drop those that are confirmed
         loop {
             let mut slot = 0;
             let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
             for pending_signatures_chunk in
                 pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
@@ -2095,22 +2061,8 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
                 break;
             }
 
-            let epoch_info = rpc_client.get_epoch_info()?;
-            tpu_addresses = get_leader_tpus(
-                min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
-                NUM_TPU_LEADERS,
-                leader_schedule.as_ref(),
-                cluster_nodes.as_ref(),
-            );
-
             for transaction in pending_transactions.values() {
-                if !tpu_addresses.is_empty() {
-                    let wire_transaction =
-                        serialize(&transaction).expect("serialization should succeed");
-                    for tpu_address in &tpu_addresses {
-                        send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
-                    }
-                } else {
+                if !tpu_client.send_transaction(transaction) {
                     let _result = rpc_client
                         .send_transaction_with_config(
                             transaction,
@@ -2933,7 +2885,7 @@ mod tests {
         write_keypair_file(&program_pubkey, &program_keypair_location).unwrap();
 
         let config = CliConfig {
-            rpc_client: Some(RpcClient::new_mock("".to_string())),
+            rpc_client: Some(Arc::new(RpcClient::new_mock("".to_string()))),
             command: CliCommand::Program(ProgramCliCommand::Deploy {
                 program_location: Some(program_location.to_str().unwrap().to_string()),
                 buffer_signer_index: None,

@@ -1,46 +0,0 @@
-use log::*;
-use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule};
-use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
-use std::net::{SocketAddr, UdpSocket};
-
-pub fn get_leader_tpus(
-    slot_index: u64,
-    num_leaders: u64,
-    leader_schedule: Option<&RpcLeaderSchedule>,
-    cluster_nodes: Option<&Vec<RpcContactInfo>>,
-) -> Vec<SocketAddr> {
-    let leaders: Vec<_> = (0..num_leaders)
-        .filter_map(|i| {
-            leader_schedule?
-                .iter()
-                .find(|(_pubkey, slots)| {
-                    slots.iter().any(|slot| {
-                        *slot as u64 == (slot_index + (i * NUM_CONSECUTIVE_LEADER_SLOTS))
-                    })
-                })
-                .and_then(|(pubkey, _)| {
-                    cluster_nodes?
-                        .iter()
-                        .find(|contact_info| contact_info.pubkey == *pubkey)
-                        .and_then(|contact_info| contact_info.tpu)
-                })
-        })
-        .collect();
-    let mut unique_leaders = vec![];
-    for leader in leaders.into_iter() {
-        if !unique_leaders.contains(&leader) {
-            unique_leaders.push(leader);
-        }
-    }
-    unique_leaders
-}
-
-pub fn send_transaction_tpu(
-    send_socket: &UdpSocket,
-    tpu_address: &SocketAddr,
-    wire_transaction: &[u8],
-) {
-    if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) {
-        warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
-    }
-}