diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 01c902af12..b484c0c243 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -41,7 +41,7 @@ pub const MAX_FANOUT_SLOTS: u64 = 100; #[derive(Clone, Debug)] pub struct TpuClientConfig { /// The range of upcoming slots to include when determining which - /// leaders to send transactions to (min: 1, max: 100) + /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`) pub fanout_slots: u64, } @@ -63,13 +63,14 @@ pub struct TpuClient { } impl TpuClient { - /// Serializes and sends a transaction to the current leader's TPU port + /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout + /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { let wire_transaction = serialize(transaction).expect("serialization should succeed"); self.send_wire_transaction(&wire_transaction) } - /// Sends a transaction to the current leader's TPU port + /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool { let mut sent = false; for tpu_address in self @@ -119,14 +120,14 @@ struct LeaderTpuCache { } impl LeaderTpuCache { - fn new(rpc_client: &RpcClient, first_slot: Slot) -> Self { - let leaders = Self::fetch_slot_leaders(rpc_client, first_slot).unwrap_or_default(); - let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client).unwrap_or_default(); - Self { + fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result { + let leaders = Self::fetch_slot_leaders(rpc_client, first_slot)?; + let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?; + Ok(Self { first_slot, leaders, leader_tpu_map, - } + }) } // Last slot that has a cached leader pubkey @@ -144,7 +145,13 @@ impl LeaderTpuCache { if leader_set.insert(*leader) { leader_sockets.push(*tpu_socket); } + } else { + // The leader is probably delinquent + trace!("TPU not available for leader {}", leader); } + } else { + // Overran the local leader schedule cache + warn!("Leader not known for slot {}", leader_slot); } } leader_sockets @@ -245,7 +252,7 @@ impl LeaderTpuService { let start_slot = rpc_client.get_max_shred_insert_slot()?; let recent_slots = RecentLeaderSlots::new(start_slot); - let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot))); + let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)?)); let subscription = if !websocket_url.is_empty() { let recent_slots = recent_slots.clone(); @@ -317,42 +324,48 @@ impl LeaderTpuService { break; } + // Sleep a few slots before checking if leader cache needs to be refreshed again + std::thread::sleep(Duration::from_millis(sleep_ms)); + sleep_ms = 1000; + // Refresh cluster TPU ports every 5min in case validators restart with new port configuration // or new validators come online if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) { - if let Ok(leader_tpu_map) = LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) { - leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map; - last_cluster_refresh = Instant::now(); - } else { - sleep_ms = 100; - continue; + match LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) { + Ok(leader_tpu_map) => { + leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map; + last_cluster_refresh = Instant::now(); + } + Err(err) => { + warn!("Failed to fetch cluster tpu sockets: {}", err); + sleep_ms = 100; + } } } - // Sleep a few slots before checking if leader cache needs to be refreshed again - std::thread::sleep(Duration::from_millis(sleep_ms)); - - let current_slot = recent_slots.estimated_current_slot(); - if current_slot + let estimated_current_slot = recent_slots.estimated_current_slot(); + if estimated_current_slot >= leader_tpu_cache .read() .unwrap() .last_slot() .saturating_sub(MAX_FANOUT_SLOTS) { - if let Ok(slot_leaders) = - LeaderTpuCache::fetch_slot_leaders(&rpc_client, current_slot) - { - let mut leader_tpu_cache = leader_tpu_cache.write().unwrap(); - leader_tpu_cache.first_slot = current_slot; - leader_tpu_cache.leaders = slot_leaders; - } else { - sleep_ms = 100; - continue; + match LeaderTpuCache::fetch_slot_leaders(&rpc_client, estimated_current_slot) { + Ok(slot_leaders) => { + let mut leader_tpu_cache = leader_tpu_cache.write().unwrap(); + leader_tpu_cache.first_slot = estimated_current_slot; + leader_tpu_cache.leaders = slot_leaders; + } + Err(err) => { + warn!( + "Failed to fetch slot leaders (current estimated slot: {}): {}", + estimated_current_slot, err + ); + sleep_ms = 100; + } } } - - sleep_ms = 1000; } } }