* rpc-send-tx-svc: add with_config constructor (cherry picked from commitfe098b5ddc
) # Conflicts: # Cargo.lock # core/Cargo.toml # replica-node/Cargo.toml # rpc/src/rpc_service.rs # rpc/src/send_transaction_service.rs # validator/Cargo.toml * rpc-send-tx-svc: server-side retry knobs (cherry picked from commit2744a2128c
) Co-authored-by: Trent Nelson <trent@solana.com>
This commit is contained in:
@ -45,6 +45,7 @@ use {
|
||||
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
|
||||
poh_service::{self, PohService},
|
||||
},
|
||||
solana_rpc::send_transaction_service,
|
||||
solana_rpc::{
|
||||
max_slots::MaxSlots,
|
||||
optimistically_confirmed_bank_tracker::{
|
||||
@ -138,8 +139,7 @@ pub struct ValidatorConfig {
|
||||
pub contact_debug_interval: u64,
|
||||
pub contact_save_interval: u64,
|
||||
pub bpf_jit: bool,
|
||||
pub send_transaction_retry_ms: u64,
|
||||
pub send_transaction_leader_forward_count: u64,
|
||||
pub send_transaction_service_config: send_transaction_service::Config,
|
||||
pub no_poh_speed_test: bool,
|
||||
pub poh_pinned_cpu_core: usize,
|
||||
pub poh_hashes_per_batch: u64,
|
||||
@ -197,8 +197,7 @@ impl Default for ValidatorConfig {
|
||||
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
|
||||
contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
|
||||
bpf_jit: false,
|
||||
send_transaction_retry_ms: 2000,
|
||||
send_transaction_leader_forward_count: 2,
|
||||
send_transaction_service_config: send_transaction_service::Config::default(),
|
||||
no_poh_speed_test: true,
|
||||
poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
|
||||
poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
|
||||
@ -592,8 +591,7 @@ impl Validator {
|
||||
config.trusted_validators.clone(),
|
||||
rpc_override_health_check.clone(),
|
||||
optimistically_confirmed_bank.clone(),
|
||||
config.send_transaction_retry_ms,
|
||||
config.send_transaction_leader_forward_count,
|
||||
config.send_transaction_service_config.clone(),
|
||||
max_slots.clone(),
|
||||
leader_schedule_cache.clone(),
|
||||
max_complete_transaction_status_slot,
|
||||
|
@ -43,8 +43,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
|
||||
contact_debug_interval: config.contact_debug_interval,
|
||||
contact_save_interval: config.contact_save_interval,
|
||||
bpf_jit: config.bpf_jit,
|
||||
send_transaction_retry_ms: config.send_transaction_retry_ms,
|
||||
send_transaction_leader_forward_count: config.send_transaction_leader_forward_count,
|
||||
send_transaction_service_config: config.send_transaction_service_config.clone(),
|
||||
no_poh_speed_test: config.no_poh_speed_test,
|
||||
poh_pinned_cpu_core: config.poh_pinned_cpu_core,
|
||||
account_indexes: config.account_indexes.clone(),
|
||||
|
@ -6,7 +6,7 @@ use {
|
||||
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
|
||||
rpc::{rpc_deprecated_v1_7::*, rpc_full::*, rpc_minimal::*, rpc_obsolete_v1_7::*, *},
|
||||
rpc_health::*,
|
||||
send_transaction_service::{LeaderInfo, SendTransactionService},
|
||||
send_transaction_service::{self, LeaderInfo, SendTransactionService},
|
||||
},
|
||||
jsonrpc_core::{futures::prelude::*, MetaIoHandler},
|
||||
jsonrpc_http_server::{
|
||||
@ -280,8 +280,7 @@ impl JsonRpcService {
|
||||
trusted_validators: Option<HashSet<Pubkey>>,
|
||||
override_health_check: Arc<AtomicBool>,
|
||||
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
||||
send_transaction_retry_ms: u64,
|
||||
send_transaction_leader_forward_count: u64,
|
||||
send_transaction_service_config: send_transaction_service::Config,
|
||||
max_slots: Arc<MaxSlots>,
|
||||
leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||
current_transaction_status_slot: Arc<AtomicU64>,
|
||||
@ -378,13 +377,12 @@ impl JsonRpcService {
|
||||
|
||||
let leader_info =
|
||||
poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder));
|
||||
let _send_transaction_service = Arc::new(SendTransactionService::new(
|
||||
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
|
||||
tpu_address,
|
||||
&bank_forks,
|
||||
leader_info,
|
||||
receiver,
|
||||
send_transaction_retry_ms,
|
||||
send_transaction_leader_forward_count,
|
||||
send_transaction_service_config,
|
||||
));
|
||||
|
||||
#[cfg(test)]
|
||||
@ -540,8 +538,11 @@ mod tests {
|
||||
None,
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
optimistically_confirmed_bank,
|
||||
1000,
|
||||
1,
|
||||
send_transaction_service::Config {
|
||||
retry_rate_ms: 1000,
|
||||
leader_forward_count: 1,
|
||||
..send_transaction_service::Config::default()
|
||||
},
|
||||
Arc::new(MaxSlots::default()),
|
||||
Arc::new(LeaderScheduleCache::default()),
|
||||
Arc::new(AtomicU64::default()),
|
||||
|
@ -23,6 +23,12 @@ use {
|
||||
|
||||
/// Maximum size of the transaction queue
|
||||
const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day
|
||||
/// Default retry interval
|
||||
const DEFAULT_RETRY_RATE_MS: u64 = 2_000;
|
||||
/// Default number of leaders to forward transactions to
|
||||
const DEFAULT_LEADER_FORWARD_COUNT: u64 = 2;
|
||||
/// Default max number of time the service will retry broadcast
|
||||
const DEFAULT_SERVICE_MAX_RETRIES: usize = usize::MAX;
|
||||
|
||||
pub struct SendTransactionService {
|
||||
thread: JoinHandle<()>,
|
||||
@ -108,6 +114,25 @@ struct ProcessTransactionsResult {
|
||||
retained: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
pub retry_rate_ms: u64,
|
||||
pub leader_forward_count: u64,
|
||||
pub default_max_retries: Option<usize>,
|
||||
pub service_max_retries: usize,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
retry_rate_ms: DEFAULT_RETRY_RATE_MS,
|
||||
leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT,
|
||||
default_max_retries: None,
|
||||
service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SendTransactionService {
|
||||
pub fn new(
|
||||
tpu_address: SocketAddr,
|
||||
@ -116,14 +141,28 @@ impl SendTransactionService {
|
||||
receiver: Receiver<TransactionInfo>,
|
||||
retry_rate_ms: u64,
|
||||
leader_forward_count: u64,
|
||||
) -> Self {
|
||||
let config = Config {
|
||||
retry_rate_ms,
|
||||
leader_forward_count,
|
||||
..Config::default()
|
||||
};
|
||||
Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
|
||||
}
|
||||
|
||||
pub fn new_with_config(
|
||||
tpu_address: SocketAddr,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
leader_info: Option<LeaderInfo>,
|
||||
receiver: Receiver<TransactionInfo>,
|
||||
config: Config,
|
||||
) -> Self {
|
||||
let thread = Self::retry_thread(
|
||||
tpu_address,
|
||||
receiver,
|
||||
bank_forks.clone(),
|
||||
leader_info,
|
||||
retry_rate_ms,
|
||||
leader_forward_count,
|
||||
config,
|
||||
);
|
||||
Self { thread }
|
||||
}
|
||||
@ -133,8 +172,7 @@ impl SendTransactionService {
|
||||
receiver: Receiver<TransactionInfo>,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
mut leader_info: Option<LeaderInfo>,
|
||||
retry_rate_ms: u64,
|
||||
leader_forward_count: u64,
|
||||
config: Config,
|
||||
) -> JoinHandle<()> {
|
||||
let mut last_status_check = Instant::now();
|
||||
let mut last_leader_refresh = Instant::now();
|
||||
@ -148,13 +186,13 @@ impl SendTransactionService {
|
||||
Builder::new()
|
||||
.name("send-tx-sv2".to_string())
|
||||
.spawn(move || loop {
|
||||
match receiver.recv_timeout(Duration::from_millis(1000.min(retry_rate_ms))) {
|
||||
match receiver.recv_timeout(Duration::from_millis(1000.min(config.retry_rate_ms))) {
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
Err(RecvTimeoutError::Timeout) => {}
|
||||
Ok(transaction_info) => {
|
||||
let addresses = leader_info
|
||||
.as_ref()
|
||||
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
|
||||
let addresses = leader_info.as_ref().map(|leader_info| {
|
||||
leader_info.get_leader_tpus(config.leader_forward_count)
|
||||
});
|
||||
let addresses = addresses
|
||||
.map(|address_list| {
|
||||
if address_list.is_empty() {
|
||||
@ -179,7 +217,7 @@ impl SendTransactionService {
|
||||
}
|
||||
}
|
||||
|
||||
if last_status_check.elapsed().as_millis() as u64 >= retry_rate_ms {
|
||||
if last_status_check.elapsed().as_millis() as u64 >= config.retry_rate_ms {
|
||||
if !transactions.is_empty() {
|
||||
datapoint_info!(
|
||||
"send_transaction_service-queue-size",
|
||||
@ -200,7 +238,7 @@ impl SendTransactionService {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&leader_info,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
}
|
||||
last_status_check = Instant::now();
|
||||
@ -222,7 +260,7 @@ impl SendTransactionService {
|
||||
tpu_address: &SocketAddr,
|
||||
transactions: &mut HashMap<Signature, TransactionInfo>,
|
||||
leader_info: &Option<LeaderInfo>,
|
||||
leader_forward_count: u64,
|
||||
config: &Config,
|
||||
) -> ProcessTransactionsResult {
|
||||
let mut result = ProcessTransactionsResult::default();
|
||||
|
||||
@ -253,7 +291,13 @@ impl SendTransactionService {
|
||||
inc_new_counter_info!("send_transaction_service-expired", 1);
|
||||
return false;
|
||||
}
|
||||
if let Some(max_retries) = transaction_info.max_retries {
|
||||
|
||||
let max_retries = transaction_info
|
||||
.max_retries
|
||||
.or(config.default_max_retries)
|
||||
.map(|max_retries| max_retries.min(config.service_max_retries));
|
||||
|
||||
if let Some(max_retries) = max_retries {
|
||||
if transaction_info.retries >= max_retries {
|
||||
info!("Dropping transaction due to max retries: {}", signature);
|
||||
result.max_retries_elapsed += 1;
|
||||
@ -270,9 +314,9 @@ impl SendTransactionService {
|
||||
result.retried += 1;
|
||||
transaction_info.retries += 1;
|
||||
inc_new_counter_info!("send_transaction_service-retry", 1);
|
||||
let addresses = leader_info
|
||||
.as_ref()
|
||||
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
|
||||
let addresses = leader_info.as_ref().map(|leader_info| {
|
||||
leader_info.get_leader_tpus(config.leader_forward_count)
|
||||
});
|
||||
let addresses = addresses
|
||||
.map(|address_list| {
|
||||
if address_list.is_empty() {
|
||||
@ -372,7 +416,10 @@ mod test {
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let tpu_address = "127.0.0.1:0".parse().unwrap();
|
||||
let leader_forward_count = 1;
|
||||
let config = Config {
|
||||
leader_forward_count: 1,
|
||||
..Config::default()
|
||||
};
|
||||
|
||||
let root_bank = Arc::new(Bank::new_from_parent(
|
||||
&bank_forks.read().unwrap().working_bank(),
|
||||
@ -418,7 +465,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -447,7 +494,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -476,7 +523,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -505,7 +552,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert_eq!(transactions.len(), 1);
|
||||
assert_eq!(
|
||||
@ -535,7 +582,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert_eq!(transactions.len(), 1);
|
||||
assert_eq!(
|
||||
@ -575,7 +622,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert_eq!(transactions.len(), 1);
|
||||
assert_eq!(
|
||||
@ -593,7 +640,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -614,7 +661,10 @@ mod test {
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let tpu_address = "127.0.0.1:0".parse().unwrap();
|
||||
let leader_forward_count = 1;
|
||||
let config = Config {
|
||||
leader_forward_count: 1,
|
||||
..Config::default()
|
||||
};
|
||||
|
||||
let root_bank = Arc::new(Bank::new_from_parent(
|
||||
&bank_forks.read().unwrap().working_bank(),
|
||||
@ -673,7 +723,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -701,7 +751,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -731,7 +781,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -759,7 +809,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -788,7 +838,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert!(transactions.is_empty());
|
||||
assert_eq!(
|
||||
@ -817,7 +867,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert_eq!(transactions.len(), 1);
|
||||
assert_eq!(
|
||||
@ -847,7 +897,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert_eq!(transactions.len(), 1);
|
||||
assert_eq!(
|
||||
@ -875,7 +925,7 @@ mod test {
|
||||
&tpu_address,
|
||||
&mut transactions,
|
||||
&None,
|
||||
leader_forward_count,
|
||||
&config,
|
||||
);
|
||||
assert_eq!(transactions.len(), 0);
|
||||
assert_eq!(
|
||||
|
@ -36,7 +36,7 @@ use {
|
||||
solana_ledger::blockstore_db::BlockstoreRecoveryMode,
|
||||
solana_perf::recycler::enable_recycler_warming,
|
||||
solana_poh::poh_service,
|
||||
solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig},
|
||||
solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig, send_transaction_service},
|
||||
solana_runtime::{
|
||||
accounts_db::{
|
||||
AccountShrinkThreshold, DEFAULT_ACCOUNTS_SHRINK_OPTIMIZE_TOTAL_SPACE,
|
||||
@ -1062,11 +1062,15 @@ pub fn main() {
|
||||
PubSubConfig::default().queue_capacity_items.to_string();
|
||||
let default_rpc_pubsub_queue_capacity_bytes =
|
||||
PubSubConfig::default().queue_capacity_bytes.to_string();
|
||||
let default_rpc_send_transaction_retry_ms = ValidatorConfig::default()
|
||||
.send_transaction_retry_ms
|
||||
let default_send_transaction_service_config = send_transaction_service::Config::default();
|
||||
let default_rpc_send_transaction_retry_ms = default_send_transaction_service_config
|
||||
.retry_rate_ms
|
||||
.to_string();
|
||||
let default_rpc_send_transaction_leader_forward_count = ValidatorConfig::default()
|
||||
.send_transaction_leader_forward_count
|
||||
let default_rpc_send_transaction_leader_forward_count = default_send_transaction_service_config
|
||||
.leader_forward_count
|
||||
.to_string();
|
||||
let default_rpc_send_transaction_service_max_retries = default_send_transaction_service_config
|
||||
.service_max_retries
|
||||
.to_string();
|
||||
let default_rpc_threads = num_cpus::get().to_string();
|
||||
let default_max_snapshot_to_retain = &DEFAULT_MAX_SNAPSHOTS_TO_RETAIN.to_string();
|
||||
@ -1721,6 +1725,23 @@ pub fn main() {
|
||||
.default_value(&default_rpc_send_transaction_leader_forward_count)
|
||||
.help("The number of upcoming leaders to which to forward transactions sent via rpc service."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_send_transaction_default_max_retries")
|
||||
.long("rpc-send-default-max-retries")
|
||||
.value_name("NUMBER")
|
||||
.takes_value(true)
|
||||
.validator(is_parsable::<usize>)
|
||||
.help("The maximum number of transaction broadcast retries when unspecified by the request, otherwise retried until expiration."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_send_transaction_service_max_retries")
|
||||
.long("rpc-send-service-max-retries")
|
||||
.value_name("NUMBER")
|
||||
.takes_value(true)
|
||||
.validator(is_parsable::<usize>)
|
||||
.default_value(&default_rpc_send_transaction_service_max_retries)
|
||||
.help("The maximum number of transaction broadcast retries, regardless of requested value."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_scan_and_fix_roots")
|
||||
.long("rpc-scan-and-fix-roots")
|
||||
@ -2423,12 +2444,25 @@ pub fn main() {
|
||||
debug_keys,
|
||||
contact_debug_interval,
|
||||
bpf_jit: !matches.is_present("no_bpf_jit"),
|
||||
send_transaction_retry_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64),
|
||||
send_transaction_leader_forward_count: value_t_or_exit!(
|
||||
matches,
|
||||
"rpc_send_transaction_leader_forward_count",
|
||||
u64
|
||||
),
|
||||
send_transaction_service_config: send_transaction_service::Config {
|
||||
retry_rate_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64),
|
||||
leader_forward_count: value_t_or_exit!(
|
||||
matches,
|
||||
"rpc_send_transaction_leader_forward_count",
|
||||
u64
|
||||
),
|
||||
default_max_retries: value_t!(
|
||||
matches,
|
||||
"rpc_send_transaction_default_max_retries",
|
||||
usize
|
||||
)
|
||||
.ok(),
|
||||
service_max_retries: value_t_or_exit!(
|
||||
matches,
|
||||
"rpc_send_transaction_service_max_retries",
|
||||
usize
|
||||
),
|
||||
},
|
||||
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
|
||||
poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core")
|
||||
.unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
|
||||
|
Reference in New Issue
Block a user