Upgrade jsonrpc crates to v17.0.0 (#15018)

* Upgrade to jsonrpc 17.0.0

* Fix test

* tree

Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
Tyera Eulberg
2021-02-02 19:53:08 -07:00
committed by GitHub
parent a1b9e00c14
commit 98aa1fa4ea
10 changed files with 285 additions and 408 deletions

View File

@ -2964,9 +2964,7 @@ pub mod tests {
rpc_subscriptions::RpcSubscriptions,
};
use bincode::deserialize;
use jsonrpc_core::{
futures::future::Future, ErrorCode, MetaIoHandler, Output, Response, Value,
};
use jsonrpc_core::{futures, ErrorCode, MetaIoHandler, Output, Response, Value};
use jsonrpc_core_client::transports::local;
use solana_client::rpc_filter::{Memcmp, MemcmpEncodedBytes};
use solana_ledger::{
@ -3231,15 +3229,23 @@ pub mod tests {
let mut io = MetaIoHandler::default();
io.extend_with(RpcSolImpl.to_delegate());
let fut = {
let (client, server) =
local::connect_with_metadata::<gen_client::Client, _, _>(&io, meta);
async fn use_client(client: gen_client::Client, mint_pubkey: Pubkey) -> u64 {
client
.get_balance(mint_pubkey.to_string(), None)
.join(server)
.await
.unwrap()
.value
}
let fut = async {
let (client, server) =
local::connect_with_metadata::<gen_client::Client, _, _>(&io, meta);
let client = use_client(client, mint_pubkey);
futures::join!(client, server)
};
let (response, _) = fut.wait().unwrap();
assert_eq!(response.value, 20);
let (response, _) = futures::executor::block_on(fut);
assert_eq!(response, 20);
}
#[test]

View File

@ -488,7 +488,7 @@ mod tests {
rpc_subscriptions::tests::robust_poll_or_panic,
};
use crossbeam_channel::unbounded;
use jsonrpc_core::{futures::sync::mpsc, Response};
use jsonrpc_core::{futures::channel::mpsc, Response};
use jsonrpc_pubsub::{PubSubHandler, Session};
use serial_test::serial;
use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding};
@ -543,7 +543,7 @@ mod tests {
}
fn create_session() -> Arc<Session> {
Arc::new(Session::new(mpsc::channel(1).0))
Arc::new(Session::new(mpsc::unbounded().0))
}
#[test]

View File

@ -10,7 +10,7 @@ use crate::{
send_transaction_service::{LeaderInfo, SendTransactionService},
validator::ValidatorExit,
};
use jsonrpc_core::MetaIoHandler;
use jsonrpc_core::{futures::prelude::*, MetaIoHandler};
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
RequestMiddlewareAction, ServerBuilder,
@ -32,7 +32,8 @@ use std::{
sync::{mpsc::channel, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;
use tokio::{self, runtime};
use tokio_util::codec::{BytesCodec, FramedRead};
pub struct JsonRpcService {
thread_hdl: JoinHandle<()>,
@ -108,9 +109,6 @@ impl RpcRequestMiddleware {
}
fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
// Stuck on tokio 0.1 until the jsonrpc-http-server crate upgrades to tokio 0.2
use tokio_01::prelude::*;
let stem = path.split_at(1).1; // Drop leading '/' from path
let filename = {
match path {
@ -134,25 +132,23 @@ impl RpcRequestMiddleware {
.unwrap_or(0)
.to_string();
info!("get {} -> {:?} ({} bytes)", path, filename, file_length);
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(
tokio_fs_01::file::File::open(filename)
.and_then(|file| {
use tokio_codec_01::{BytesCodec, FramedRead};
let stream = FramedRead::new(file, BytesCodec::new())
.map(tokio_01_bytes::BytesMut::freeze);
response: Box::pin(async {
match tokio::fs::File::open(filename).await {
Err(_) => Ok(Self::internal_server_error()),
Ok(file) => {
let stream =
FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
let body = hyper::Body::wrap_stream(stream);
Ok(hyper::Response::builder()
.header(hyper::header::CONTENT_LENGTH, file_length)
.body(body)
.unwrap())
})
.or_else(|_| Ok(RpcRequestMiddleware::not_found())),
),
}
}
}),
}
}
@ -173,57 +169,41 @@ impl RequestMiddleware for RpcRequestMiddleware {
if let Some(ref snapshot_config) = self.snapshot_config {
if request.uri().path() == "/snapshot.tar.bz2" {
// Convenience redirect to the latest snapshot
return RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(jsonrpc_core::futures::future::ok(
if let Some((snapshot_archive, _)) =
snapshot_utils::get_highest_snapshot_archive_path(
&snapshot_config.snapshot_package_output_path,
)
{
RpcRequestMiddleware::redirect(&format!(
"/{}",
snapshot_archive
.file_name()
.unwrap_or_else(|| std::ffi::OsStr::new(""))
.to_str()
.unwrap_or(&"")
))
} else {
RpcRequestMiddleware::not_found()
},
)),
};
return if let Some((snapshot_archive, _)) =
snapshot_utils::get_highest_snapshot_archive_path(
&snapshot_config.snapshot_package_output_path,
) {
RpcRequestMiddleware::redirect(&format!(
"/{}",
snapshot_archive
.file_name()
.unwrap_or_else(|| std::ffi::OsStr::new(""))
.to_str()
.unwrap_or(&"")
))
} else {
RpcRequestMiddleware::not_found()
}
.into();
}
}
if let Some(result) = process_rest(&self.bank_forks, request.uri().path()) {
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(jsonrpc_core::futures::future::ok(
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.body(hyper::Body::from(result))
.unwrap(),
)),
}
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.body(hyper::Body::from(result))
.unwrap()
.into()
} else if self.is_file_get_path(request.uri().path()) {
self.process_file_get(request.uri().path())
} else if request.uri().path() == "/health" {
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(jsonrpc_core::futures::future::ok(
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.body(hyper::Body::from(self.health_check()))
.unwrap(),
)),
}
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.body(hyper::Body::from(self.health_check()))
.unwrap()
.into()
} else {
RequestMiddlewareAction::Proceed {
should_continue_on_invalid_cors: false,
request,
}
request.into()
}
}
}
@ -366,9 +346,11 @@ impl JsonRpcService {
// so that we avoid the single-threaded event loops from being created automatically by
// jsonrpc for threads when .threads(N > 1) is given.
let event_loop = {
tokio_01::runtime::Builder::new()
runtime::Builder::new()
.core_threads(rpc_threads)
.name_prefix("sol-rpc-el")
.threaded_scheduler()
.enable_all()
.thread_name("sol-rpc-el")
.build()
.unwrap()
};
@ -391,7 +373,7 @@ impl JsonRpcService {
io,
move |_req: &hyper::Request<hyper::Body>| request_processor.clone(),
)
.event_loop_executor(event_loop.executor())
.event_loop_executor(event_loop.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,

View File

@ -5,7 +5,6 @@ use crate::{
rpc::{get_parsed_token_account, get_parsed_token_accounts},
};
use core::hash::Hash;
use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{
typed::{Sink, Subscriber},
SubscriptionId,
@ -49,9 +48,6 @@ use std::{
time::Duration,
};
// Stuck on tokio 0.1 until the jsonrpc-pubsub crate upgrades to tokio 0.2
use tokio_01::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor};
const RECEIVE_DELAY_MILLIS: u64 = 100;
trait BankGetTransactionLogsAdapter {
@ -262,15 +258,14 @@ where
notified_set
}
struct RpcNotifier(TaskExecutor);
struct RpcNotifier;
impl RpcNotifier {
fn notify<T>(&self, value: T, sink: &Sink<T>)
where
T: serde::Serialize,
{
self.0
.spawn(sink.notify(Ok(value)).map(|_| ()).map_err(|_| ()));
let _ = sink.notify(Ok(value));
}
}
@ -418,7 +413,6 @@ pub struct RpcSubscriptions {
subscriptions: Subscriptions,
notification_sender: Arc<Mutex<Sender<NotificationEntry>>>,
t_cleanup: Option<JoinHandle<()>>,
notifier_runtime: Option<Runtime>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
@ -493,13 +487,7 @@ impl RpcSubscriptions {
};
let _subscriptions = subscriptions.clone();
let notifier_runtime = RuntimeBuilder::new()
.core_threads(1)
.name_prefix("solana-rpc-notifier-")
.build()
.unwrap();
let notifier = RpcNotifier(notifier_runtime.executor());
let notifier = RpcNotifier {};
let t_cleanup = Builder::new()
.name("solana-rpc-notifications".to_string())
.spawn(move || {
@ -516,7 +504,6 @@ impl RpcSubscriptions {
Self {
subscriptions,
notification_sender,
notifier_runtime: Some(notifier_runtime),
t_cleanup: Some(t_cleanup),
bank_forks,
block_commitment_cache,
@ -1269,12 +1256,6 @@ impl RpcSubscriptions {
}
fn shutdown(&mut self) -> std::thread::Result<()> {
if let Some(runtime) = self.notifier_runtime.take() {
info!("RPC Notifier runtime - shutting down");
let _ = runtime.shutdown_now().wait();
info!("RPC Notifier runtime - shut down");
}
if self.t_cleanup.is_some() {
info!("RPC Notification thread - shutting down");
self.exit.store(true, Ordering::Relaxed);
@ -1294,7 +1275,7 @@ pub(crate) mod tests {
use crate::optimistically_confirmed_bank_tracker::{
BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
};
use jsonrpc_core::futures::{self, stream::Stream};
use jsonrpc_core::futures::StreamExt;
use jsonrpc_pubsub::typed::Subscriber;
use serial_test::serial;
use solana_runtime::{
@ -1307,31 +1288,37 @@ pub(crate) mod tests {
system_instruction, system_program, system_transaction,
transaction::Transaction,
};
use std::{fmt::Debug, sync::mpsc::channel, time::Instant};
use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay};
use std::{fmt::Debug, sync::mpsc::channel};
use tokio::{
runtime::Runtime,
time::{delay_for, timeout},
};
pub(crate) fn robust_poll_or_panic<T: Debug + Send + 'static>(
receiver: futures::sync::mpsc::Receiver<T>,
) -> (T, futures::sync::mpsc::Receiver<T>) {
receiver: jsonrpc_core::futures::channel::mpsc::UnboundedReceiver<T>,
) -> (
T,
jsonrpc_core::futures::channel::mpsc::UnboundedReceiver<T>,
) {
let (inner_sender, inner_receiver) = channel();
let mut rt = Runtime::new().unwrap();
rt.spawn(futures::lazy(|| {
let recv_timeout = receiver
.into_future()
.timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS))
.map(move |result| match result {
(Some(value), receiver) => {
inner_sender.send((value, receiver)).expect("send error")
}
(None, _) => panic!("unexpected end of stream"),
})
.map_err(|err| panic!("stream error {:?}", err));
let rt = Runtime::new().unwrap();
rt.spawn(async move {
let result = timeout(
Duration::from_millis(RECEIVE_DELAY_MILLIS),
receiver.into_future(),
)
.await
.unwrap_or_else(|err| panic!("stream error {:?}", err));
const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2;
Delay::new(Instant::now() + Duration::from_millis(INITIAL_DELAY_MS))
.and_then(|_| recv_timeout)
.map_err(|err| panic!("timer error {:?}", err))
}));
match result {
(Some(value), receiver) => {
inner_sender.send((value, receiver)).expect("send error")
}
(None, _) => panic!("unexpected end of stream"),
}
delay_for(Duration::from_millis(RECEIVE_DELAY_MILLIS * 2)).await;
});
inner_receiver.recv().expect("recv error")
}