rpc-send-tx-svc: add with_config constructor

This commit is contained in:
Trent Nelson
2021-10-19 17:11:46 -06:00
committed by Trent Nelson
parent 95e91a4863
commit fe098b5ddc
10 changed files with 109 additions and 59 deletions

View File

@ -18,6 +18,10 @@ use {
/// Maximum size of the transaction queue
const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day
/// Default retry interval
const DEFAULT_RETRY_RATE_MS: u64 = 2_000;
/// Default number of leaders to forward transactions to
const DEFAULT_LEADER_FORWARD_COUNT: u64 = 2;
pub struct SendTransactionService {
thread: JoinHandle<()>,
@ -61,6 +65,21 @@ struct ProcessTransactionsResult {
retained: u64,
}
#[derive(Clone, Debug)]
pub struct Config {
pub retry_rate_ms: u64,
pub leader_forward_count: u64,
}
impl Default for Config {
fn default() -> Self {
Self {
retry_rate_ms: DEFAULT_RETRY_RATE_MS,
leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT,
}
}
}
impl SendTransactionService {
pub fn new<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
@ -69,14 +88,28 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>,
retry_rate_ms: u64,
leader_forward_count: u64,
) -> Self {
let config = Config {
retry_rate_ms,
leader_forward_count,
..Config::default()
};
Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
}
pub fn new_with_config<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<T>,
receiver: Receiver<TransactionInfo>,
config: Config,
) -> Self {
let thread = Self::retry_thread(
tpu_address,
receiver,
bank_forks.clone(),
leader_info,
retry_rate_ms,
leader_forward_count,
config,
);
Self { thread }
}
@ -86,8 +119,7 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>,
bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<T>,
retry_rate_ms: u64,
leader_forward_count: u64,
config: Config,
) -> JoinHandle<()> {
let mut last_status_check = Instant::now();
let mut last_leader_refresh = Instant::now();
@ -101,13 +133,13 @@ impl SendTransactionService {
Builder::new()
.name("send-tx-sv2".to_string())
.spawn(move || loop {
match receiver.recv_timeout(Duration::from_millis(1000.min(retry_rate_ms))) {
match receiver.recv_timeout(Duration::from_millis(1000.min(config.retry_rate_ms))) {
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => {
let addresses = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
let addresses = leader_info.as_ref().map(|leader_info| {
leader_info.get_leader_tpus(config.leader_forward_count)
});
let addresses = addresses
.map(|address_list| {
if address_list.is_empty() {
@ -132,7 +164,7 @@ impl SendTransactionService {
}
}
if last_status_check.elapsed().as_millis() as u64 >= retry_rate_ms {
if last_status_check.elapsed().as_millis() as u64 >= config.retry_rate_ms {
if !transactions.is_empty() {
datapoint_info!(
"send_transaction_service-queue-size",
@ -153,7 +185,7 @@ impl SendTransactionService {
&tpu_address,
&mut transactions,
&leader_info,
leader_forward_count,
&config,
);
}
last_status_check = Instant::now();
@ -175,7 +207,7 @@ impl SendTransactionService {
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: &Option<T>,
leader_forward_count: u64,
config: &Config,
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();
@ -223,9 +255,9 @@ impl SendTransactionService {
result.retried += 1;
transaction_info.retries += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
let addresses = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
let addresses = leader_info.as_ref().map(|leader_info| {
leader_info.get_leader_tpus(config.leader_forward_count)
});
let addresses = addresses
.map(|address_list| {
if address_list.is_empty() {
@ -318,7 +350,10 @@ mod test {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let tpu_address = "127.0.0.1:0".parse().unwrap();
let leader_forward_count = 1;
let config = Config {
leader_forward_count: 1,
..Config::default()
};
let root_bank = Arc::new(Bank::new_from_parent(
&bank_forks.read().unwrap().working_bank(),
@ -364,7 +399,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -393,7 +428,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -422,7 +457,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -451,7 +486,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -481,7 +516,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -521,7 +556,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -539,7 +574,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -560,7 +595,10 @@ mod test {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let tpu_address = "127.0.0.1:0".parse().unwrap();
let leader_forward_count = 1;
let config = Config {
leader_forward_count: 1,
..Config::default()
};
let root_bank = Arc::new(Bank::new_from_parent(
&bank_forks.read().unwrap().working_bank(),
@ -619,7 +657,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -647,7 +685,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -677,7 +715,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -705,7 +743,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -734,7 +772,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert!(transactions.is_empty());
assert_eq!(
@ -763,7 +801,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -793,7 +831,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -821,7 +859,7 @@ mod test {
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
&config,
);
assert_eq!(transactions.len(), 0);
assert_eq!(