From 6b24dd1c6ac20426e9a2917be7fbf9739c5ca76f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 20 Sep 2021 12:13:06 +0000 Subject: [PATCH] client: Add retry logic on Pubsub 429 error during connect (backport #19990) (#20002) * client: Add retry logic on Pubsub 429s (#19990) (cherry picked from commit e9b066d4973960b073a1b9c3386e35bd85df43a6) * Use exponential backoff for older version of tungstenite Co-authored-by: Jon Cinque --- client/src/pubsub_client.rs | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index d3dd63bcc3..725ee901f0 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -22,7 +22,8 @@ use { mpsc::{channel, Receiver}, Arc, RwLock, }, - thread::JoinHandle, + thread::{sleep, JoinHandle}, + time::Duration, }, thiserror::Error, tungstenite::{client::AutoStream, connect, Message, WebSocket}, @@ -164,6 +165,28 @@ pub type SignatureSubscription = ( pub struct PubsubClient {} +fn connect_with_retry(url: Url) -> Result, tungstenite::Error> { + let mut connection_retries = 5; + loop { + let result = connect(url.clone()).map(|(socket, _)| socket); + if let Err(tungstenite::Error::Http(status_code)) = &result { + if *status_code == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 { + let duration = Duration::from_millis(500) * 2u32.pow(5 - connection_retries); + + connection_retries -= 1; + debug!( + "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}", + status_code, connection_retries, duration + ); + + sleep(duration); + continue; + } + } + return result; + } +} + impl PubsubClient { pub fn logs_subscribe( url: &str, @@ -171,7 +194,7 @@ impl PubsubClient { config: RpcTransactionLogsConfig, ) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); @@ -226,7 +249,7 @@ impl PubsubClient { pub fn slot_subscribe(url: &str) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel::(); let socket = Arc::new(RwLock::new(socket)); @@ -282,7 +305,7 @@ impl PubsubClient { config: Option, ) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); @@ -348,7 +371,7 @@ impl PubsubClient { handler: impl Fn(SlotUpdate) + Send + 'static, ) -> Result, PubsubClientError> { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let socket = Arc::new(RwLock::new(socket)); let exit = Arc::new(AtomicBool::new(false));