* client: Add retry logic on Pubsub 429s (#19990)
(cherry picked from commit e9b066d497
)
* Use exponential backoff for older version of tungstenite
Co-authored-by: Jon Cinque <jon.cinque@gmail.com>
This commit is contained in:
@ -22,7 +22,8 @@ use {
|
|||||||
mpsc::{channel, Receiver},
|
mpsc::{channel, Receiver},
|
||||||
Arc, RwLock,
|
Arc, RwLock,
|
||||||
},
|
},
|
||||||
thread::JoinHandle,
|
thread::{sleep, JoinHandle},
|
||||||
|
time::Duration,
|
||||||
},
|
},
|
||||||
thiserror::Error,
|
thiserror::Error,
|
||||||
tungstenite::{client::AutoStream, connect, Message, WebSocket},
|
tungstenite::{client::AutoStream, connect, Message, WebSocket},
|
||||||
@ -164,6 +165,28 @@ pub type SignatureSubscription = (
|
|||||||
|
|
||||||
pub struct PubsubClient {}
|
pub struct PubsubClient {}
|
||||||
|
|
||||||
|
fn connect_with_retry(url: Url) -> Result<WebSocket<AutoStream>, tungstenite::Error> {
|
||||||
|
let mut connection_retries = 5;
|
||||||
|
loop {
|
||||||
|
let result = connect(url.clone()).map(|(socket, _)| socket);
|
||||||
|
if let Err(tungstenite::Error::Http(status_code)) = &result {
|
||||||
|
if *status_code == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
|
||||||
|
let duration = Duration::from_millis(500) * 2u32.pow(5 - connection_retries);
|
||||||
|
|
||||||
|
connection_retries -= 1;
|
||||||
|
debug!(
|
||||||
|
"Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
|
||||||
|
status_code, connection_retries, duration
|
||||||
|
);
|
||||||
|
|
||||||
|
sleep(duration);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PubsubClient {
|
impl PubsubClient {
|
||||||
pub fn logs_subscribe(
|
pub fn logs_subscribe(
|
||||||
url: &str,
|
url: &str,
|
||||||
@ -171,7 +194,7 @@ impl PubsubClient {
|
|||||||
config: RpcTransactionLogsConfig,
|
config: RpcTransactionLogsConfig,
|
||||||
) -> Result<LogsSubscription, PubsubClientError> {
|
) -> Result<LogsSubscription, PubsubClientError> {
|
||||||
let url = Url::parse(url)?;
|
let url = Url::parse(url)?;
|
||||||
let (socket, _response) = connect(url)?;
|
let socket = connect_with_retry(url)?;
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
|
|
||||||
let socket = Arc::new(RwLock::new(socket));
|
let socket = Arc::new(RwLock::new(socket));
|
||||||
@ -226,7 +249,7 @@ impl PubsubClient {
|
|||||||
|
|
||||||
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
|
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
|
||||||
let url = Url::parse(url)?;
|
let url = Url::parse(url)?;
|
||||||
let (socket, _response) = connect(url)?;
|
let socket = connect_with_retry(url)?;
|
||||||
let (sender, receiver) = channel::<SlotInfo>();
|
let (sender, receiver) = channel::<SlotInfo>();
|
||||||
|
|
||||||
let socket = Arc::new(RwLock::new(socket));
|
let socket = Arc::new(RwLock::new(socket));
|
||||||
@ -282,7 +305,7 @@ impl PubsubClient {
|
|||||||
config: Option<RpcSignatureSubscribeConfig>,
|
config: Option<RpcSignatureSubscribeConfig>,
|
||||||
) -> Result<SignatureSubscription, PubsubClientError> {
|
) -> Result<SignatureSubscription, PubsubClientError> {
|
||||||
let url = Url::parse(url)?;
|
let url = Url::parse(url)?;
|
||||||
let (socket, _response) = connect(url)?;
|
let socket = connect_with_retry(url)?;
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
|
|
||||||
let socket = Arc::new(RwLock::new(socket));
|
let socket = Arc::new(RwLock::new(socket));
|
||||||
@ -348,7 +371,7 @@ impl PubsubClient {
|
|||||||
handler: impl Fn(SlotUpdate) + Send + 'static,
|
handler: impl Fn(SlotUpdate) + Send + 'static,
|
||||||
) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
|
) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
|
||||||
let url = Url::parse(url)?;
|
let url = Url::parse(url)?;
|
||||||
let (socket, _response) = connect(url)?;
|
let socket = connect_with_retry(url)?;
|
||||||
|
|
||||||
let socket = Arc::new(RwLock::new(socket));
|
let socket = Arc::new(RwLock::new(socket));
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
Reference in New Issue
Block a user