Forward transactions to the expected leader instead of your own TPU port (#12012)

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin
2020-09-04 15:22:18 -07:00
committed by GitHub
parent 2b4af48537
commit 7b7b7be99c
4 changed files with 122 additions and 42 deletions

View File

@ -2664,6 +2664,7 @@ pub mod tests {
Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
None,
&exit,
)),
&runtime::Runtime::new().unwrap(),
@ -2717,6 +2718,7 @@ pub mod tests {
Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
None,
&exit,
)),
&runtime::Runtime::new().unwrap(),
@ -3983,6 +3985,7 @@ pub mod tests {
Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
None,
&exit,
)),
&runtime::Runtime::new().unwrap(),
@ -4029,6 +4032,7 @@ pub mod tests {
Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
None,
&exit,
)),
&runtime::Runtime::new().unwrap(),
@ -4225,6 +4229,7 @@ pub mod tests {
Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
None,
&exit,
)),
&runtime::Runtime::new().unwrap(),
@ -4259,6 +4264,7 @@ pub mod tests {
Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
None,
&exit,
)),
&runtime::Runtime::new().unwrap(),
@ -4352,6 +4358,7 @@ pub mod tests {
Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
None,
&exit,
)),
&runtime::Runtime::new().unwrap(),

View File

@ -1,7 +1,8 @@
//! The `rpc_service` module implements the Solana JSON RPC service.
use crate::{
cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, rpc_health::*,
cluster_info::ClusterInfo, commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*,
rpc_health::*, send_transaction_service::LeaderInfo,
send_transaction_service::SendTransactionService, validator::ValidatorExit,
};
use jsonrpc_core::MetaIoHandler;
@ -21,7 +22,7 @@ use std::{
net::SocketAddr,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering},
sync::{mpsc::channel, Arc, RwLock},
sync::{mpsc::channel, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;
@ -237,6 +238,7 @@ impl JsonRpcService {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Option<Arc<Mutex<PohRecorder>>>,
genesis_hash: Hash,
ledger_path: &Path,
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
@ -254,9 +256,12 @@ impl JsonRpcService {
));
let exit_send_transaction_service = Arc::new(AtomicBool::new(false));
let leader_info =
poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder));
let send_transaction_service = Arc::new(SendTransactionService::new(
&cluster_info,
&bank_forks,
leader_info,
&exit_send_transaction_service,
));
@ -418,6 +423,7 @@ mod tests {
block_commitment_cache,
blockstore,
cluster_info,
None,
Hash::default(),
&PathBuf::from("farf"),
validator_exit,

View File

@ -1,8 +1,9 @@
use crate::cluster_info::ClusterInfo;
use crate::{cluster_info::ClusterInfo, poh_recorder::PohRecorder};
use log::*;
use solana_ledger::bank_forks::BankForks;
use solana_metrics::{datapoint_warn, inc_new_counter_info};
use solana_runtime::bank::Bank;
use solana_sdk::{clock::Slot, signature::Signature};
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
use std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
@ -25,6 +26,39 @@ pub struct SendTransactionService {
tpu_address: SocketAddr,
}
pub struct LeaderInfo {
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
recent_peers: HashMap<Pubkey, SocketAddr>,
}
impl LeaderInfo {
pub fn new(cluster_info: Arc<ClusterInfo>, poh_recorder: Arc<Mutex<PohRecorder>>) -> Self {
Self {
cluster_info,
poh_recorder,
recent_peers: HashMap::new(),
}
}
pub fn refresh_recent_peers(&mut self) {
self.recent_peers = self
.cluster_info
.tpu_peers()
.into_iter()
.map(|ci| (ci.id, ci.tpu))
.collect();
}
pub fn get_leader_tpu(&self) -> Option<&SocketAddr> {
self.poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(0)
.and_then(|leader| self.recent_peers.get(&leader))
}
}
struct TransactionInfo {
signature: Signature,
wire_transaction: Vec<u8>,
@ -44,12 +78,19 @@ impl SendTransactionService {
pub fn new(
cluster_info: &Arc<ClusterInfo>,
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<LeaderInfo>,
exit: &Arc<AtomicBool>,
) -> Self {
let (sender, receiver) = channel::<TransactionInfo>();
let tpu_address = cluster_info.my_contact_info().tpu;
let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone());
let thread = Self::retry_thread(
receiver,
bank_forks.clone(),
leader_info,
tpu_address,
exit.clone(),
);
Self {
thread,
sender: Mutex::new(sender),
@ -61,6 +102,7 @@ impl SendTransactionService {
fn retry_thread(
receiver: Receiver<TransactionInfo>,
bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<LeaderInfo>,
tpu_address: SocketAddr,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
@ -68,6 +110,10 @@ impl SendTransactionService {
let mut transactions = HashMap::new();
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
Builder::new()
.name("send-tx-svc".to_string())
.spawn(move || loop {
@ -76,6 +122,15 @@ impl SendTransactionService {
}
if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) {
let address = leader_info
.as_ref()
.and_then(|leader_info| leader_info.get_leader_tpu())
.unwrap_or(&tpu_address);
Self::send_transaction(
&send_socket,
address,
&transaction_info.wire_transaction,
);
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
transactions.insert(transaction_info.signature, transaction_info);
} else {
@ -99,9 +154,13 @@ impl SendTransactionService {
&send_socket,
&tpu_address,
&mut transactions,
&leader_info,
);
}
last_status_check = Instant::now();
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
}
})
.unwrap()
@ -113,6 +172,7 @@ impl SendTransactionService {
send_socket: &UdpSocket,
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: &Option<LeaderInfo>,
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();
@ -137,7 +197,10 @@ impl SendTransactionService {
inc_new_counter_info!("send_transaction_service-retry", 1);
Self::send_transaction(
&send_socket,
&tpu_address,
leader_info
.as_ref()
.and_then(|leader_info| leader_info.get_leader_tpu())
.unwrap_or(&tpu_address),
&transaction_info.wire_transaction,
);
true
@ -203,7 +266,7 @@ mod test {
let exit = Arc::new(AtomicBool::new(false));
let send_tranaction_service =
SendTransactionService::new(&cluster_info, &bank_forks, &exit);
SendTransactionService::new(&cluster_info, &bank_forks, None, &exit);
exit.store(true, Ordering::Relaxed);
send_tranaction_service.join().unwrap();
@ -211,8 +274,6 @@ mod test {
#[test]
fn process_transactions() {
solana_logger::setup();
let (bank_forks, mint_keypair, _voting_keypair) = new_bank_forks();
let cluster_info = ClusterInfo::default();
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -263,6 +324,7 @@ mod test {
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
@ -288,6 +350,7 @@ mod test {
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
@ -313,6 +376,7 @@ mod test {
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
@ -338,6 +402,7 @@ mod test {
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -364,6 +429,7 @@ mod test {
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert_eq!(transactions.len(), 1);
assert_eq!(

View File

@ -279,38 +279,6 @@ impl Validator {
block_commitment_cache.clone(),
));
let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| {
if ContactInfo::is_valid_address(&node.info.rpc) {
assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub));
assert_eq!(rpc_port, node.info.rpc.port());
assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port());
} else {
assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub));
}
(
JsonRpcService::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port),
config.rpc_config.clone(),
config.snapshot_config.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
genesis_config.hash(),
ledger_path,
validator_exit.clone(),
config.trusted_validators.clone(),
rpc_override_health_check.clone(),
),
PubSubService::new(
&subscriptions,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port),
&exit,
),
)
});
info!(
"Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}",
bank.epoch(),
@ -333,7 +301,7 @@ impl Validator {
std::thread::park();
}
let poh_config = Arc::new(genesis_config.poh_config);
let poh_config = Arc::new(genesis_config.poh_config.clone());
let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
@ -357,6 +325,39 @@ impl Validator {
}
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| {
if ContactInfo::is_valid_address(&node.info.rpc) {
assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub));
assert_eq!(rpc_port, node.info.rpc.port());
assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port());
} else {
assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub));
}
(
JsonRpcService::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port),
config.rpc_config.clone(),
config.snapshot_config.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
Some(poh_recorder.clone()),
genesis_config.hash(),
ledger_path,
validator_exit.clone(),
config.trusted_validators.clone(),
rpc_override_health_check.clone(),
),
PubSubService::new(
&subscriptions,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port),
&exit,
),
)
});
let ip_echo_server = solana_net_utils::ip_echo_server(node.sockets.ip_echo.unwrap());
let gossip_service = GossipService::new(