RpcClient no longer panics in a tokio multi-threaded runtime (#16393)

(cherry picked from commit a4f0d8636a)

Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
mergify[bot]
2021-04-14 03:17:33 +00:00
committed by GitHub
parent cdc10712b1
commit 31ed985fd0
5 changed files with 82 additions and 19 deletions

1
Cargo.lock generated
View File

@ -4260,6 +4260,7 @@ dependencies = [
"solana-version", "solana-version",
"solana-vote-program", "solana-vote-program",
"thiserror", "thiserror",
"tokio 1.1.1",
"tungstenite", "tungstenite",
"url 2.2.0", "url 2.2.0",
] ]

View File

@ -33,6 +33,7 @@ solana-transaction-status = { path = "../transaction-status", version = "=1.6.5"
solana-version = { path = "../version", version = "=1.6.5" } solana-version = { path = "../version", version = "=1.6.5" }
solana-vote-program = { path = "../programs/vote", version = "=1.6.5" } solana-vote-program = { path = "../programs/vote", version = "=1.6.5" }
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
tungstenite = "0.10.1" tungstenite = "0.10.1"
url = "2.1.1" url = "2.1.1"

View File

@ -8,12 +8,20 @@ use {
}, },
log::*, log::*,
reqwest::{self, header::CONTENT_TYPE, StatusCode}, reqwest::{self, header::CONTENT_TYPE, StatusCode},
std::{thread::sleep, time::Duration}, std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
thread::sleep,
time::Duration,
},
}; };
pub struct HttpSender { pub struct HttpSender {
client: reqwest::blocking::Client, client: Arc<reqwest::blocking::Client>,
url: String, url: String,
request_id: AtomicU64,
} }
impl HttpSender { impl HttpSender {
@ -22,12 +30,22 @@ impl HttpSender {
} }
pub fn new_with_timeout(url: String, timeout: Duration) -> Self { pub fn new_with_timeout(url: String, timeout: Duration) -> Self {
let client = reqwest::blocking::Client::builder() // `reqwest::blocking::Client` panics if run in a tokio async context. Shuttle the
.timeout(timeout) // request to a different tokio thread to avoid this
.build() let client = Arc::new(
.expect("build rpc client"); tokio::task::block_in_place(move || {
reqwest::blocking::Client::builder()
.timeout(timeout)
.build()
})
.expect("build rpc client"),
);
Self { client, url } Self {
client,
url,
request_id: AtomicU64::new(0),
}
} }
} }
@ -40,20 +58,26 @@ struct RpcErrorObject {
impl RpcSender for HttpSender { impl RpcSender for HttpSender {
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> { fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
// Concurrent requests are not supported so reuse the same request id for all requests let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
let request_id = 1; let request_json = request.build_request_json(request_id, params).to_string();
let request_json = request.build_request_json(request_id, params);
let mut too_many_requests_retries = 5; let mut too_many_requests_retries = 5;
loop { loop {
match self // `reqwest::blocking::Client` panics if run in a tokio async context. Shuttle the
.client // request to a different tokio thread to avoid this
.post(&self.url) let response = {
.header(CONTENT_TYPE, "application/json") let client = self.client.clone();
.body(request_json.to_string()) let request_json = request_json.clone();
.send() tokio::task::block_in_place(move || {
{ client
.post(&self.url)
.header(CONTENT_TYPE, "application/json")
.body(request_json)
.send()
})
};
match response {
Ok(response) => { Ok(response) => {
if !response.status().is_success() { if !response.status().is_success() {
if response.status() == StatusCode::TOO_MANY_REQUESTS if response.status() == StatusCode::TOO_MANY_REQUESTS
@ -72,7 +96,9 @@ impl RpcSender for HttpSender {
return Err(response.error_for_status().unwrap_err().into()); return Err(response.error_for_status().unwrap_err().into());
} }
let json: serde_json::Value = serde_json::from_str(&response.text()?)?; let response_text = tokio::task::block_in_place(move || response.text())?;
let json: serde_json::Value = serde_json::from_str(&response_text)?;
if json["error"].is_object() { if json["error"].is_object() {
return match serde_json::from_value::<RpcErrorObject>(json["error"].clone()) return match serde_json::from_value::<RpcErrorObject>(json["error"].clone())
{ {
@ -122,3 +148,22 @@ impl RpcSender for HttpSender {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn http_sender_on_tokio_multi_thread() {
let http_sender = HttpSender::new("http://localhost:1234".to_string());
let _ = http_sender.send(RpcRequest::GetVersion, serde_json::Value::Null);
}
#[tokio::test(flavor = "current_thread")]
#[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")]
async fn http_sender_ontokio_current_thread_should_panic() {
// RpcClient::new() will panic in the tokio current-thread runtime due to `tokio::task::block_in_place()` usage, and there
// doesn't seem to be a way to detect whether the tokio runtime is multi_thread or current_thread...
let _ = HttpSender::new("http://localhost:1234".to_string());
}
}

View File

@ -1726,6 +1726,21 @@ mod tests {
#[test] #[test]
fn test_send() { fn test_send() {
_test_send();
}
#[tokio::test(flavor = "current_thread")]
#[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")]
async fn test_send_async_current_thread_should_panic() {
_test_send();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_send_async_multi_thread() {
_test_send();
}
fn _test_send() {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
thread::spawn(move || { thread::spawn(move || {
let rpc_addr = "0.0.0.0:0".parse().unwrap(); let rpc_addr = "0.0.0.0:0".parse().unwrap();

View File

@ -3014,6 +3014,7 @@ dependencies = [
"solana-version", "solana-version",
"solana-vote-program", "solana-vote-program",
"thiserror", "thiserror",
"tokio 1.1.1",
"tungstenite", "tungstenite",
"url", "url",
] ]