RpcClient no longer panics in a tokio multi-threaded runtime
This commit is contained in:
		
							
								
								
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@@ -4259,6 +4259,7 @@ dependencies = [
 | 
				
			|||||||
 "solana-version",
 | 
					 "solana-version",
 | 
				
			||||||
 "solana-vote-program",
 | 
					 "solana-vote-program",
 | 
				
			||||||
 "thiserror",
 | 
					 "thiserror",
 | 
				
			||||||
 | 
					 "tokio 1.1.1",
 | 
				
			||||||
 "tungstenite",
 | 
					 "tungstenite",
 | 
				
			||||||
 "url 2.2.0",
 | 
					 "url 2.2.0",
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -32,6 +32,7 @@ solana-transaction-status = { path = "../transaction-status", version = "=1.7.0"
 | 
				
			|||||||
solana-version = { path = "../version", version = "=1.7.0" }
 | 
					solana-version = { path = "../version", version = "=1.7.0" }
 | 
				
			||||||
solana-vote-program = { path = "../programs/vote", version = "=1.7.0" }
 | 
					solana-vote-program = { path = "../programs/vote", version = "=1.7.0" }
 | 
				
			||||||
thiserror = "1.0"
 | 
					thiserror = "1.0"
 | 
				
			||||||
 | 
					tokio = { version = "1", features = ["full"] }
 | 
				
			||||||
tungstenite = "0.10.1"
 | 
					tungstenite = "0.10.1"
 | 
				
			||||||
url = "2.1.1"
 | 
					url = "2.1.1"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -8,12 +8,20 @@ use {
 | 
				
			|||||||
    },
 | 
					    },
 | 
				
			||||||
    log::*,
 | 
					    log::*,
 | 
				
			||||||
    reqwest::{self, header::CONTENT_TYPE, StatusCode},
 | 
					    reqwest::{self, header::CONTENT_TYPE, StatusCode},
 | 
				
			||||||
    std::{thread::sleep, time::Duration},
 | 
					    std::{
 | 
				
			||||||
 | 
					        sync::{
 | 
				
			||||||
 | 
					            atomic::{AtomicU64, Ordering},
 | 
				
			||||||
 | 
					            Arc,
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        thread::sleep,
 | 
				
			||||||
 | 
					        time::Duration,
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct HttpSender {
 | 
					pub struct HttpSender {
 | 
				
			||||||
    client: reqwest::blocking::Client,
 | 
					    client: Arc<reqwest::blocking::Client>,
 | 
				
			||||||
    url: String,
 | 
					    url: String,
 | 
				
			||||||
 | 
					    request_id: AtomicU64,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl HttpSender {
 | 
					impl HttpSender {
 | 
				
			||||||
@@ -22,12 +30,22 @@ impl HttpSender {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn new_with_timeout(url: String, timeout: Duration) -> Self {
 | 
					    pub fn new_with_timeout(url: String, timeout: Duration) -> Self {
 | 
				
			||||||
        let client = reqwest::blocking::Client::builder()
 | 
					        // `reqwest::blocking::Client` panics if run in a tokio async context.  Shuttle the
 | 
				
			||||||
 | 
					        // request to a different tokio thread to avoid this
 | 
				
			||||||
 | 
					        let client = Arc::new(
 | 
				
			||||||
 | 
					            tokio::task::block_in_place(move || {
 | 
				
			||||||
 | 
					                reqwest::blocking::Client::builder()
 | 
				
			||||||
                    .timeout(timeout)
 | 
					                    .timeout(timeout)
 | 
				
			||||||
                    .build()
 | 
					                    .build()
 | 
				
			||||||
            .expect("build rpc client");
 | 
					            })
 | 
				
			||||||
 | 
					            .expect("build rpc client"),
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Self { client, url }
 | 
					        Self {
 | 
				
			||||||
 | 
					            client,
 | 
				
			||||||
 | 
					            url,
 | 
				
			||||||
 | 
					            request_id: AtomicU64::new(0),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -40,20 +58,26 @@ struct RpcErrorObject {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
impl RpcSender for HttpSender {
 | 
					impl RpcSender for HttpSender {
 | 
				
			||||||
    fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
 | 
					    fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
 | 
				
			||||||
        // Concurrent requests are not supported so reuse the same request id for all requests
 | 
					        let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
 | 
				
			||||||
        let request_id = 1;
 | 
					        let request_json = request.build_request_json(request_id, params).to_string();
 | 
				
			||||||
 | 
					 | 
				
			||||||
        let request_json = request.build_request_json(request_id, params);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let mut too_many_requests_retries = 5;
 | 
					        let mut too_many_requests_retries = 5;
 | 
				
			||||||
        loop {
 | 
					        loop {
 | 
				
			||||||
            match self
 | 
					            // `reqwest::blocking::Client` panics if run in a tokio async context.  Shuttle the
 | 
				
			||||||
                .client
 | 
					            // request to a different tokio thread to avoid this
 | 
				
			||||||
 | 
					            let response = {
 | 
				
			||||||
 | 
					                let client = self.client.clone();
 | 
				
			||||||
 | 
					                let request_json = request_json.clone();
 | 
				
			||||||
 | 
					                tokio::task::block_in_place(move || {
 | 
				
			||||||
 | 
					                    client
 | 
				
			||||||
                        .post(&self.url)
 | 
					                        .post(&self.url)
 | 
				
			||||||
                        .header(CONTENT_TYPE, "application/json")
 | 
					                        .header(CONTENT_TYPE, "application/json")
 | 
				
			||||||
                .body(request_json.to_string())
 | 
					                        .body(request_json)
 | 
				
			||||||
                        .send()
 | 
					                        .send()
 | 
				
			||||||
            {
 | 
					                })
 | 
				
			||||||
 | 
					            };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            match response {
 | 
				
			||||||
                Ok(response) => {
 | 
					                Ok(response) => {
 | 
				
			||||||
                    if !response.status().is_success() {
 | 
					                    if !response.status().is_success() {
 | 
				
			||||||
                        if response.status() == StatusCode::TOO_MANY_REQUESTS
 | 
					                        if response.status() == StatusCode::TOO_MANY_REQUESTS
 | 
				
			||||||
@@ -72,7 +96,9 @@ impl RpcSender for HttpSender {
 | 
				
			|||||||
                        return Err(response.error_for_status().unwrap_err().into());
 | 
					                        return Err(response.error_for_status().unwrap_err().into());
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    let json: serde_json::Value = serde_json::from_str(&response.text()?)?;
 | 
					                    let response_text = tokio::task::block_in_place(move || response.text())?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    let json: serde_json::Value = serde_json::from_str(&response_text)?;
 | 
				
			||||||
                    if json["error"].is_object() {
 | 
					                    if json["error"].is_object() {
 | 
				
			||||||
                        return match serde_json::from_value::<RpcErrorObject>(json["error"].clone())
 | 
					                        return match serde_json::from_value::<RpcErrorObject>(json["error"].clone())
 | 
				
			||||||
                        {
 | 
					                        {
 | 
				
			||||||
@@ -122,3 +148,22 @@ impl RpcSender for HttpSender {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(test)]
 | 
				
			||||||
 | 
					mod tests {
 | 
				
			||||||
 | 
					    use super::*;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[tokio::test(flavor = "multi_thread")]
 | 
				
			||||||
 | 
					    async fn http_sender_on_tokio_multi_thread() {
 | 
				
			||||||
 | 
					        let http_sender = HttpSender::new("http://localhost:1234".to_string());
 | 
				
			||||||
 | 
					        let _ = http_sender.send(RpcRequest::GetVersion, serde_json::Value::Null);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[tokio::test(flavor = "current_thread")]
 | 
				
			||||||
 | 
					    #[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")]
 | 
				
			||||||
 | 
					    async fn http_sender_ontokio_current_thread_should_panic() {
 | 
				
			||||||
 | 
					        // RpcClient::new() will panic in the tokio current-thread runtime due to `tokio::task::block_in_place()` usage, and there
 | 
				
			||||||
 | 
					        // doesn't seem to be a way to detect whether the tokio runtime is multi_thread or current_thread...
 | 
				
			||||||
 | 
					        let _ = HttpSender::new("http://localhost:1234".to_string());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1693,6 +1693,21 @@ mod tests {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_send() {
 | 
					    fn test_send() {
 | 
				
			||||||
 | 
					        _test_send();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[tokio::test(flavor = "current_thread")]
 | 
				
			||||||
 | 
					    #[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")]
 | 
				
			||||||
 | 
					    async fn test_send_async_current_thread_should_panic() {
 | 
				
			||||||
 | 
					        _test_send();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[tokio::test(flavor = "multi_thread")]
 | 
				
			||||||
 | 
					    async fn test_send_async_multi_thread() {
 | 
				
			||||||
 | 
					        _test_send();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn _test_send() {
 | 
				
			||||||
        let (sender, receiver) = channel();
 | 
					        let (sender, receiver) = channel();
 | 
				
			||||||
        thread::spawn(move || {
 | 
					        thread::spawn(move || {
 | 
				
			||||||
            let rpc_addr = "0.0.0.0:0".parse().unwrap();
 | 
					            let rpc_addr = "0.0.0.0:0".parse().unwrap();
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								programs/bpf/Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								programs/bpf/Cargo.lock
									
									
									
										generated
									
									
									
								
							@@ -2946,6 +2946,7 @@ dependencies = [
 | 
				
			|||||||
 "solana-version",
 | 
					 "solana-version",
 | 
				
			||||||
 "solana-vote-program",
 | 
					 "solana-vote-program",
 | 
				
			||||||
 "thiserror",
 | 
					 "thiserror",
 | 
				
			||||||
 | 
					 "tokio 1.1.1",
 | 
				
			||||||
 "tungstenite",
 | 
					 "tungstenite",
 | 
				
			||||||
 "url",
 | 
					 "url",
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user