diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 696d8e30ca..a64a4c81a8 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -23,7 +23,8 @@ use { mpsc::{channel, Receiver}, Arc, RwLock, }, - thread::JoinHandle, + thread::{sleep, JoinHandle}, + time::Duration, }, thiserror::Error, tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket}, @@ -165,6 +166,40 @@ pub type SignatureSubscription = ( pub struct PubsubClient {} +fn connect_with_retry( + url: Url, +) -> Result>, tungstenite::Error> { + let mut connection_retries = 5; + loop { + let result = connect(url.clone()).map(|(socket, _)| socket); + if let Err(tungstenite::Error::Http(response)) = &result { + if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 + { + let mut duration = Duration::from_millis(500); + if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) { + if let Ok(retry_after) = retry_after.to_str() { + if let Ok(retry_after) = retry_after.parse::() { + if retry_after < 120 { + duration = Duration::from_secs(retry_after); + } + } + } + } + + connection_retries -= 1; + debug!( + "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}", + response, connection_retries, duration + ); + + sleep(duration); + continue; + } + } + return result; + } +} + impl PubsubClient { pub fn logs_subscribe( url: &str, @@ -172,7 +207,7 @@ impl PubsubClient { config: RpcTransactionLogsConfig, ) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); @@ -227,7 +262,7 @@ impl PubsubClient { pub fn slot_subscribe(url: &str) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel::(); let socket = Arc::new(RwLock::new(socket)); @@ -283,7 +318,7 @@ impl PubsubClient { config: Option, ) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); @@ -349,7 +384,7 @@ impl PubsubClient { handler: impl Fn(SlotUpdate) + Send + 'static, ) -> Result, PubsubClientError> { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let socket = Arc::new(RwLock::new(socket)); let exit = Arc::new(AtomicBool::new(false));