From 98aa1fa4ea7d737cb02fad88d74fb958b2fe8f7d Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 2 Feb 2021 19:53:08 -0700 Subject: [PATCH] Upgrade jsonrpc crates to v17.0.0 (#15018) * Upgrade to jsonrpc 17.0.0 * Fix test * tree Co-authored-by: Michael Vines --- Cargo.lock | 227 +++++++++++----------------------- client/Cargo.toml | 5 +- client/src/rpc_client.rs | 8 +- core/Cargo.toml | 18 ++- core/src/rpc.rs | 24 ++-- core/src/rpc_pubsub.rs | 4 +- core/src/rpc_service.rs | 106 +++++++--------- core/src/rpc_subscriptions.rs | 75 +++++------ core/tests/rpc.rs | 105 +++++++--------- programs/bpf/Cargo.lock | 121 +++++++++--------- 10 files changed, 285 insertions(+), 408 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f55f9ccaf..86a292c765 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -890,6 +890,17 @@ dependencies = [ "syn 1.0.48", ] +[[package]] +name = "derive_more" +version = "0.99.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb0e6161ad61ed084a36ba71fbba9e3ac5aee3606fb607fe08da6acbcf3d8c" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.6", + "syn 1.0.48", +] + [[package]] name = "dialoguer" version = "0.6.2" @@ -1251,16 +1262,6 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" -[[package]] -name = "futures-cpupool" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" -dependencies = [ - "futures 0.1.29", - "num_cpus", -] - [[package]] name = "futures-executor" version = "0.3.8" @@ -1270,6 +1271,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -1311,6 +1313,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" dependencies = [ + "futures 0.1.29", "futures-channel", "futures-core", "futures-io", @@ -1442,24 +1445,6 @@ dependencies = [ "scroll", ] -[[package]] -name = "h2" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462" -dependencies = [ - "byteorder", - "bytes 0.4.12", - "fnv", - "futures 0.1.29", - "http 0.1.21", - "indexmap", - "log 0.4.11", - "slab", - "string", - "tokio-io", -] - [[package]] name = "h2" version = "0.2.5" @@ -1471,7 +1456,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.1", + "http", "indexmap", "log 0.4.11", "slab", @@ -1575,17 +1560,6 @@ dependencies = [ "hmac 0.7.1", ] -[[package]] -name = "http" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6ccf5ede3a895d8856620237b2f02972c1bbc78d2965ad7fe8838d4a0ed41f0" -dependencies = [ - "bytes 0.4.12", - "fnv", - "itoa", -] - [[package]] name = "http" version = "0.2.1" @@ -1597,18 +1571,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.29", - "http 0.1.21", - "tokio-buf", -] - [[package]] name = "http-body" version = "0.3.1" @@ -1616,7 +1578,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ "bytes 0.5.4", - "http 0.2.1", + "http", ] [[package]] @@ -1659,36 +1621,6 @@ dependencies = [ "url 1.7.2", ] -[[package]] -name = "hyper" -version = "0.12.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dbe6ed1438e1f8ad955a4701e9a944938e9519f6888d12d8558b645e247d5f6" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.29", - "futures-cpupool", - "h2 0.1.26", - "http 0.1.21", - "http-body 0.1.0", - "httparse", - "iovec", - "itoa", - "log 0.4.11", - "net2", - "rustc_version", - "time 0.1.43", - "tokio 0.1.22", - "tokio-buf", - "tokio-executor", - "tokio-io", - "tokio-reactor", - "tokio-tcp", - "tokio-threadpool", - "tokio-timer", - "want 0.2.0", -] - [[package]] name = "hyper" version = "0.13.5" @@ -1699,9 +1631,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.2.5", - "http 0.2.1", - "http-body 0.3.1", + "h2", + "http", + "http-body", "httparse", "itoa", "log 0.4.11", @@ -1710,7 +1642,7 @@ dependencies = [ "time 0.1.43", "tokio 0.2.22", "tower-service", - "want 0.3.0", + "want", ] [[package]] @@ -1905,29 +1837,29 @@ dependencies = [ [[package]] name = "jsonrpc-client-transports" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "489b9c612e60c766f751ab40fcb43cbb55a1e10bb44a9b4307ed510ca598cbd7" +checksum = "15b6c6ad01c7354d60de493148c30ac8a82b759e22ae678c8705e9b8e0c566a4" dependencies = [ - "failure", - "futures 0.1.29", + "derive_more", + "futures 0.3.8", "jsonrpc-core", "jsonrpc-pubsub", "log 0.4.11", "serde", "serde_json", - "tokio 0.1.22", + "tokio 0.2.22", "url 1.7.2", "websocket", ] [[package]] name = "jsonrpc-core" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0745a6379e3edc893c84ec203589790774e4247420033e71a76d3ab4687991fa" +checksum = "07569945133257ff557eb37b015497104cea61a2c9edaf126c1cbd6e8332397f" dependencies = [ - "futures 0.1.29", + "futures 0.3.8", "log 0.4.11", "serde", "serde_derive", @@ -1936,18 +1868,19 @@ dependencies = [ [[package]] name = "jsonrpc-core-client" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f764902d7b891344a0acb65625f32f6f7c6db006952143bd650209fbe7d94db" +checksum = "7ac9d56dc729912796637c30f475bbf834594607b27740dfea6e5fa7ba40d1f1" dependencies = [ + "futures 0.3.8", "jsonrpc-client-transports", ] [[package]] name = "jsonrpc-derive" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99a847f9ec7bb52149b2786a17c9cb260d6effc6b8eeb8c16b343a487a7563a3" +checksum = "b68ba7e76e5c7796cfa4d2a30e83986550c34404c6d40551c902ca6f7bd4a137" dependencies = [ "proc-macro-crate", "proc-macro2 1.0.24", @@ -1957,59 +1890,64 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb5c4513b7b542f42da107942b7b759f27120b5cc894729f88254b28dff44b7" +checksum = "eff2303c4f0562afcbd2dae75e3e21815095f8994749a80fbcd365877e44ed64" dependencies = [ - "hyper 0.12.35", + "futures 0.3.8", + "hyper 0.13.5", "jsonrpc-core", "jsonrpc-server-utils", "log 0.4.11", "net2", - "parking_lot 0.10.2", + "parking_lot 0.11.0", "unicase 2.6.0", ] [[package]] name = "jsonrpc-pubsub" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "639558e0604013be9787ae52f798506ae42bf4220fe587bdc5625871cc8b9c77" +checksum = "0c48dbebce7a9c88ab272a4db7d6478aa4c6d9596e6c086366e89efc4e9ed89e" dependencies = [ + "futures 0.3.8", "jsonrpc-core", + "lazy_static", "log 0.4.11", - "parking_lot 0.10.2", + "parking_lot 0.11.0", "rand 0.7.3", "serde", ] [[package]] name = "jsonrpc-server-utils" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72f1f3990650c033bd8f6bd46deac76d990f9bbfb5f8dc8c4767bf0a00392176" +checksum = "f4207cce738bf713a82525065b750a008f28351324f438f56b33d698ada95bb4" dependencies = [ - "bytes 0.4.12", + "bytes 0.5.4", + "futures 0.3.8", "globset", "jsonrpc-core", "lazy_static", "log 0.4.11", - "tokio 0.1.22", - "tokio-codec", + "tokio 0.2.22", + "tokio-util 0.3.1", "unicase 2.6.0", ] [[package]] name = "jsonrpc-ws-server" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6596fe75209b73a2a75ebe1dce4e60e03b88a2b25e8807b667597f6315150d22" +checksum = "abe06e1385e4a912711703123ba44f735627d666f87e5fec764ad1338ec617dc" dependencies = [ + "futures 0.3.8", "jsonrpc-core", "jsonrpc-server-utils", "log 0.4.11", "parity-ws", - "parking_lot 0.10.2", + "parking_lot 0.11.0", "slab", ] @@ -3230,8 +3168,8 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "http 0.2.1", - "http-body 0.3.1", + "http", + "http-body", "hyper 0.13.5", "hyper-rustls", "hyper-tls", @@ -4076,7 +4014,6 @@ dependencies = [ "bs58", "bv", "byteorder", - "bytes 0.4.12", "chrono", "core_affinity", "crossbeam-channel 0.4.4", @@ -4140,11 +4077,8 @@ dependencies = [ "systemstat", "tempfile", "thiserror", - "tokio 0.1.22", "tokio 0.2.22", - "tokio-codec", - "tokio-fs", - "tokio-io", + "tokio-util 0.2.0", "trees", ] @@ -5430,15 +5364,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" -[[package]] -name = "string" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d24114bfcceb867ca7f71a0d3fe45d45619ec47a6fbfa98cb14e14250bfa5d6d" -dependencies = [ - "bytes 0.4.12", -] - [[package]] name = "strsim" version = "0.8.0" @@ -5789,17 +5714,6 @@ dependencies = [ "winapi 0.3.8", ] -[[package]] -name = "tokio-buf" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" -dependencies = [ - "bytes 0.4.12", - "either", - "futures 0.1.29", -] - [[package]] name = "tokio-codec" version = "0.1.2" @@ -6027,6 +5941,20 @@ dependencies = [ "tokio-reactor", ] +[[package]] +name = "tokio-util" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" +dependencies = [ + "bytes 0.5.4", + "futures-core", + "futures-sink", + "log 0.4.11", + "pin-project-lite 0.1.5", + "tokio 0.2.22", +] + [[package]] name = "tokio-util" version = "0.3.1" @@ -6076,8 +6004,8 @@ dependencies = [ "bytes 0.5.4", "futures-core", "futures-util", - "http 0.2.1", - "http-body 0.3.1", + "http", + "http-body", "hyper 0.13.5", "percent-encoding 2.1.0", "pin-project 0.4.23", @@ -6351,7 +6279,7 @@ dependencies = [ "base64 0.11.0", "byteorder", "bytes 0.5.4", - "http 0.2.1", + "http", "httparse", "input_buffer", "log 0.4.11", @@ -6552,17 +6480,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "want" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6395efa4784b027708f7451087e647ec73cc74f5d9bc2e418404248d679a230" -dependencies = [ - "futures 0.1.29", - "log 0.4.11", - "try-lock", -] - [[package]] name = "want" version = "0.3.0" diff --git a/client/Cargo.toml b/client/Cargo.toml index 6651552569..514239b5b5 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -14,7 +14,7 @@ bincode = "1.3.1" bs58 = "0.3.1" clap = "2.33.0" indicatif = "0.15.0" -jsonrpc-core = "15.0.0" +jsonrpc-core = "17.0.0" log = "0.4.11" net2 = "0.2.37" rayon = "1.5.0" @@ -36,8 +36,7 @@ url = "2.1.1" [dev-dependencies] assert_matches = "1.3.0" -jsonrpc-core = "15.0.0" -jsonrpc-http-server = "15.0.0" +jsonrpc-http-server = "17.0.0" solana-logger = { path = "../logger", version = "1.6.0" } [package.metadata.docs.rs] diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 840c6031da..4dd8d7343b 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -1569,7 +1569,7 @@ mod tests { use super::*; use crate::{client_error::ClientErrorKind, mock_sender::PUBKEY}; use assert_matches::assert_matches; - use jsonrpc_core::{Error, IoHandler, Params}; + use jsonrpc_core::{futures::prelude::*, Error, IoHandler, Params}; use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; use serde_json::Number; use solana_sdk::{ @@ -1586,14 +1586,14 @@ mod tests { let mut io = IoHandler::default(); // Successful request io.add_method("getBalance", |_params: Params| { - Ok(Value::Number(Number::from(50))) + future::ok(Value::Number(Number::from(50))) }); // Failed request io.add_method("getRecentBlockhash", |params: Params| { if params != Params::None { - Err(Error::invalid_request()) + future::err(Error::invalid_request()) } else { - Ok(Value::String( + future::ok(Value::String( "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx".to_string(), )) } diff --git a/core/Cargo.toml b/core/Cargo.toml index 735cc15aa5..28399cb38a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -28,12 +28,12 @@ fs_extra = "1.1.0" flate2 = "1.0" indexmap = { version = "1.5", features = ["rayon"] } itertools = "0.9.0" -jsonrpc-core = "15.0.0" -jsonrpc-core-client = { version = "15.0.0", features = ["ws"] } -jsonrpc-derive = "15.0.0" -jsonrpc-http-server = "15.0.0" -jsonrpc-pubsub = "15.0.0" -jsonrpc-ws-server = "15.0.0" +jsonrpc-core = "17.0.0" +jsonrpc-core-client = { version = "17.0.0", features = ["ws"] } +jsonrpc-derive = "17.0.0" +jsonrpc-http-server = "17.0.0" +jsonrpc-pubsub = "17.0.0" +jsonrpc-ws-server = "17.0.0" log = "0.4.11" lru = "0.6.1" miow = "0.2.2" @@ -77,11 +77,7 @@ spl-token-v2-0 = { package = "spl-token", version = "=3.0.1", features = ["no-en tempfile = "3.1.0" thiserror = "1.0" tokio = { version = "0.2", features = ["full"] } -tokio_01 = { version = "0.1", package = "tokio" } -tokio_01_bytes = { version = "0.4.7", package = "bytes" } -tokio_fs_01 = { version = "0.1", package = "tokio-fs" } -tokio_io_01 = { version = "0.1", package = "tokio-io" } -tokio_codec_01 = { version = "0.1", package = "tokio-codec" } +tokio-util = { version = "0.2", features = ["codec"] } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.6.0" } trees = "0.2.1" diff --git a/core/src/rpc.rs b/core/src/rpc.rs index f063ad97db..770d779914 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -2964,9 +2964,7 @@ pub mod tests { rpc_subscriptions::RpcSubscriptions, }; use bincode::deserialize; - use jsonrpc_core::{ - futures::future::Future, ErrorCode, MetaIoHandler, Output, Response, Value, - }; + use jsonrpc_core::{futures, ErrorCode, MetaIoHandler, Output, Response, Value}; use jsonrpc_core_client::transports::local; use solana_client::rpc_filter::{Memcmp, MemcmpEncodedBytes}; use solana_ledger::{ @@ -3231,15 +3229,23 @@ pub mod tests { let mut io = MetaIoHandler::default(); io.extend_with(RpcSolImpl.to_delegate()); - let fut = { - let (client, server) = - local::connect_with_metadata::(&io, meta); + async fn use_client(client: gen_client::Client, mint_pubkey: Pubkey) -> u64 { client .get_balance(mint_pubkey.to_string(), None) - .join(server) + .await + .unwrap() + .value + } + + let fut = async { + let (client, server) = + local::connect_with_metadata::(&io, meta); + let client = use_client(client, mint_pubkey); + + futures::join!(client, server) }; - let (response, _) = fut.wait().unwrap(); - assert_eq!(response.value, 20); + let (response, _) = futures::executor::block_on(fut); + assert_eq!(response, 20); } #[test] diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 2ac5e54460..d15aad0b8a 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -488,7 +488,7 @@ mod tests { rpc_subscriptions::tests::robust_poll_or_panic, }; use crossbeam_channel::unbounded; - use jsonrpc_core::{futures::sync::mpsc, Response}; + use jsonrpc_core::{futures::channel::mpsc, Response}; use jsonrpc_pubsub::{PubSubHandler, Session}; use serial_test::serial; use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding}; @@ -543,7 +543,7 @@ mod tests { } fn create_session() -> Arc { - Arc::new(Session::new(mpsc::channel(1).0)) + Arc::new(Session::new(mpsc::unbounded().0)) } #[test] diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 406434716b..cf2a67d4a7 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -10,7 +10,7 @@ use crate::{ send_transaction_service::{LeaderInfo, SendTransactionService}, validator::ValidatorExit, }; -use jsonrpc_core::MetaIoHandler; +use jsonrpc_core::{futures::prelude::*, MetaIoHandler}; use jsonrpc_http_server::{ hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware, RequestMiddlewareAction, ServerBuilder, @@ -32,7 +32,8 @@ use std::{ sync::{mpsc::channel, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; -use tokio::runtime; +use tokio::{self, runtime}; +use tokio_util::codec::{BytesCodec, FramedRead}; pub struct JsonRpcService { thread_hdl: JoinHandle<()>, @@ -108,9 +109,6 @@ impl RpcRequestMiddleware { } fn process_file_get(&self, path: &str) -> RequestMiddlewareAction { - // Stuck on tokio 0.1 until the jsonrpc-http-server crate upgrades to tokio 0.2 - use tokio_01::prelude::*; - let stem = path.split_at(1).1; // Drop leading '/' from path let filename = { match path { @@ -134,25 +132,23 @@ impl RpcRequestMiddleware { .unwrap_or(0) .to_string(); info!("get {} -> {:?} ({} bytes)", path, filename, file_length); - RequestMiddlewareAction::Respond { should_validate_hosts: true, - response: Box::new( - tokio_fs_01::file::File::open(filename) - .and_then(|file| { - use tokio_codec_01::{BytesCodec, FramedRead}; - - let stream = FramedRead::new(file, BytesCodec::new()) - .map(tokio_01_bytes::BytesMut::freeze); + response: Box::pin(async { + match tokio::fs::File::open(filename).await { + Err(_) => Ok(Self::internal_server_error()), + Ok(file) => { + let stream = + FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze()); let body = hyper::Body::wrap_stream(stream); Ok(hyper::Response::builder() .header(hyper::header::CONTENT_LENGTH, file_length) .body(body) .unwrap()) - }) - .or_else(|_| Ok(RpcRequestMiddleware::not_found())), - ), + } + } + }), } } @@ -173,57 +169,41 @@ impl RequestMiddleware for RpcRequestMiddleware { if let Some(ref snapshot_config) = self.snapshot_config { if request.uri().path() == "/snapshot.tar.bz2" { // Convenience redirect to the latest snapshot - return RequestMiddlewareAction::Respond { - should_validate_hosts: true, - response: Box::new(jsonrpc_core::futures::future::ok( - if let Some((snapshot_archive, _)) = - snapshot_utils::get_highest_snapshot_archive_path( - &snapshot_config.snapshot_package_output_path, - ) - { - RpcRequestMiddleware::redirect(&format!( - "/{}", - snapshot_archive - .file_name() - .unwrap_or_else(|| std::ffi::OsStr::new("")) - .to_str() - .unwrap_or(&"") - )) - } else { - RpcRequestMiddleware::not_found() - }, - )), - }; + return if let Some((snapshot_archive, _)) = + snapshot_utils::get_highest_snapshot_archive_path( + &snapshot_config.snapshot_package_output_path, + ) { + RpcRequestMiddleware::redirect(&format!( + "/{}", + snapshot_archive + .file_name() + .unwrap_or_else(|| std::ffi::OsStr::new("")) + .to_str() + .unwrap_or(&"") + )) + } else { + RpcRequestMiddleware::not_found() + } + .into(); } } if let Some(result) = process_rest(&self.bank_forks, request.uri().path()) { - RequestMiddlewareAction::Respond { - should_validate_hosts: true, - response: Box::new(jsonrpc_core::futures::future::ok( - hyper::Response::builder() - .status(hyper::StatusCode::OK) - .body(hyper::Body::from(result)) - .unwrap(), - )), - } + hyper::Response::builder() + .status(hyper::StatusCode::OK) + .body(hyper::Body::from(result)) + .unwrap() + .into() } else if self.is_file_get_path(request.uri().path()) { self.process_file_get(request.uri().path()) } else if request.uri().path() == "/health" { - RequestMiddlewareAction::Respond { - should_validate_hosts: true, - response: Box::new(jsonrpc_core::futures::future::ok( - hyper::Response::builder() - .status(hyper::StatusCode::OK) - .body(hyper::Body::from(self.health_check())) - .unwrap(), - )), - } + hyper::Response::builder() + .status(hyper::StatusCode::OK) + .body(hyper::Body::from(self.health_check())) + .unwrap() + .into() } else { - RequestMiddlewareAction::Proceed { - should_continue_on_invalid_cors: false, - request, - } + request.into() } } } @@ -366,9 +346,11 @@ impl JsonRpcService { // so that we avoid the single-threaded event loops from being created automatically by // jsonrpc for threads when .threads(N > 1) is given. let event_loop = { - tokio_01::runtime::Builder::new() + runtime::Builder::new() .core_threads(rpc_threads) - .name_prefix("sol-rpc-el") + .threaded_scheduler() + .enable_all() + .thread_name("sol-rpc-el") .build() .unwrap() }; @@ -391,7 +373,7 @@ impl JsonRpcService { io, move |_req: &hyper::Request| request_processor.clone(), ) - .event_loop_executor(event_loop.executor()) + .event_loop_executor(event_loop.handle().clone()) .threads(1) .cors(DomainsValidation::AllowOnly(vec![ AccessControlAllowOrigin::Any, diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 3619dfb6d6..ece3782752 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -5,7 +5,6 @@ use crate::{ rpc::{get_parsed_token_account, get_parsed_token_accounts}, }; use core::hash::Hash; -use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{ typed::{Sink, Subscriber}, SubscriptionId, @@ -49,9 +48,6 @@ use std::{ time::Duration, }; -// Stuck on tokio 0.1 until the jsonrpc-pubsub crate upgrades to tokio 0.2 -use tokio_01::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor}; - const RECEIVE_DELAY_MILLIS: u64 = 100; trait BankGetTransactionLogsAdapter { @@ -262,15 +258,14 @@ where notified_set } -struct RpcNotifier(TaskExecutor); +struct RpcNotifier; impl RpcNotifier { fn notify(&self, value: T, sink: &Sink) where T: serde::Serialize, { - self.0 - .spawn(sink.notify(Ok(value)).map(|_| ()).map_err(|_| ())); + let _ = sink.notify(Ok(value)); } } @@ -418,7 +413,6 @@ pub struct RpcSubscriptions { subscriptions: Subscriptions, notification_sender: Arc>>, t_cleanup: Option>, - notifier_runtime: Option, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, @@ -493,13 +487,7 @@ impl RpcSubscriptions { }; let _subscriptions = subscriptions.clone(); - let notifier_runtime = RuntimeBuilder::new() - .core_threads(1) - .name_prefix("solana-rpc-notifier-") - .build() - .unwrap(); - - let notifier = RpcNotifier(notifier_runtime.executor()); + let notifier = RpcNotifier {}; let t_cleanup = Builder::new() .name("solana-rpc-notifications".to_string()) .spawn(move || { @@ -516,7 +504,6 @@ impl RpcSubscriptions { Self { subscriptions, notification_sender, - notifier_runtime: Some(notifier_runtime), t_cleanup: Some(t_cleanup), bank_forks, block_commitment_cache, @@ -1269,12 +1256,6 @@ impl RpcSubscriptions { } fn shutdown(&mut self) -> std::thread::Result<()> { - if let Some(runtime) = self.notifier_runtime.take() { - info!("RPC Notifier runtime - shutting down"); - let _ = runtime.shutdown_now().wait(); - info!("RPC Notifier runtime - shut down"); - } - if self.t_cleanup.is_some() { info!("RPC Notification thread - shutting down"); self.exit.store(true, Ordering::Relaxed); @@ -1294,7 +1275,7 @@ pub(crate) mod tests { use crate::optimistically_confirmed_bank_tracker::{ BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, }; - use jsonrpc_core::futures::{self, stream::Stream}; + use jsonrpc_core::futures::StreamExt; use jsonrpc_pubsub::typed::Subscriber; use serial_test::serial; use solana_runtime::{ @@ -1307,31 +1288,37 @@ pub(crate) mod tests { system_instruction, system_program, system_transaction, transaction::Transaction, }; - use std::{fmt::Debug, sync::mpsc::channel, time::Instant}; - use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay}; + use std::{fmt::Debug, sync::mpsc::channel}; + use tokio::{ + runtime::Runtime, + time::{delay_for, timeout}, + }; pub(crate) fn robust_poll_or_panic( - receiver: futures::sync::mpsc::Receiver, - ) -> (T, futures::sync::mpsc::Receiver) { + receiver: jsonrpc_core::futures::channel::mpsc::UnboundedReceiver, + ) -> ( + T, + jsonrpc_core::futures::channel::mpsc::UnboundedReceiver, + ) { let (inner_sender, inner_receiver) = channel(); - let mut rt = Runtime::new().unwrap(); - rt.spawn(futures::lazy(|| { - let recv_timeout = receiver - .into_future() - .timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) - .map(move |result| match result { - (Some(value), receiver) => { - inner_sender.send((value, receiver)).expect("send error") - } - (None, _) => panic!("unexpected end of stream"), - }) - .map_err(|err| panic!("stream error {:?}", err)); + let rt = Runtime::new().unwrap(); + rt.spawn(async move { + let result = timeout( + Duration::from_millis(RECEIVE_DELAY_MILLIS), + receiver.into_future(), + ) + .await + .unwrap_or_else(|err| panic!("stream error {:?}", err)); - const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2; - Delay::new(Instant::now() + Duration::from_millis(INITIAL_DELAY_MS)) - .and_then(|_| recv_timeout) - .map_err(|err| panic!("timer error {:?}", err)) - })); + match result { + (Some(value), receiver) => { + inner_sender.send((value, receiver)).expect("send error") + } + (None, _) => panic!("unexpected end of stream"), + } + + delay_for(Duration::from_millis(RECEIVE_DELAY_MILLIS * 2)).await; + }); inner_receiver.recv().expect("recv error") } diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 2f46dc2e36..952d8e49ec 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -1,8 +1,5 @@ use bincode::serialize; -use jsonrpc_core::futures::{ - future::{self, Future}, - stream::Stream, -}; +use jsonrpc_core::futures::StreamExt; use jsonrpc_core_client::transports::ws; use log::*; use reqwest::{self, header::CONTENT_TYPE}; @@ -27,7 +24,7 @@ use std::{ thread::sleep, time::{Duration, Instant}, }; -use tokio_01::runtime::Runtime; +use tokio::runtime::Runtime; macro_rules! json_req { ($method: expr, $params: expr) => {{ @@ -184,62 +181,48 @@ fn test_rpc_subscriptions() { let (status_sender, status_receiver) = channel::<(String, Response)>(); // Create the pub sub runtime - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); + let rpc_pubsub_url = test_validator.rpc_pubsub_url(); + let signature_set_clone = signature_set.clone(); + rt.spawn(async move { + let connect = ws::try_connect::(&rpc_pubsub_url).unwrap(); + let client = connect.await.unwrap(); - // Subscribe to all signatures - rt.spawn({ - let connect = ws::try_connect::(&test_validator.rpc_pubsub_url()).unwrap(); - let signature_set = signature_set.clone(); - connect - .and_then(move |client| { - for sig in signature_set { - let status_sender = status_sender.clone(); - tokio_01::spawn( - client - .signature_subscribe(sig.clone(), None) - .and_then(move |sig_stream| { - sig_stream.for_each(move |result| { - status_sender.send((sig.clone(), result)).unwrap(); - future::ok(()) - }) - }) - .map_err(|err| { - eprintln!("sig sub err: {:#?}", err); - }), - ); - } - tokio_01::spawn( - client - .slot_subscribe() - .and_then(move |slot_stream| { - slot_stream.for_each(move |_| { - ready_sender.send(()).unwrap(); - future::ok(()) - }) - }) - .map_err(|err| { - eprintln!("slot sub err: {:#?}", err); - }), - ); - for pubkey in account_set { - let account_sender = account_sender.clone(); - tokio_01::spawn( - client - .account_subscribe(pubkey, None) - .and_then(move |account_stream| { - account_stream.for_each(move |result| { - account_sender.send(result).unwrap(); - future::ok(()) - }) - }) - .map_err(|err| { - eprintln!("acct sub err: {:#?}", err); - }), - ); - } - future::ok(()) - }) - .map_err(|_| ()) + // Subscribe to signature notifications + for sig in signature_set_clone { + let status_sender = status_sender.clone(); + let mut sig_sub = client + .signature_subscribe(sig.clone(), None) + .unwrap_or_else(|err| panic!("sig sub err: {:#?}", err)); + + tokio::spawn(async move { + let response = sig_sub.next().await.unwrap(); + status_sender + .send((sig.clone(), response.unwrap())) + .unwrap(); + }); + } + + // Subscribe to account notifications + for pubkey in account_set { + let account_sender = account_sender.clone(); + let mut client_sub = client + .account_subscribe(pubkey, None) + .unwrap_or_else(|err| panic!("acct sub err: {:#?}", err)); + tokio::spawn(async move { + let response = client_sub.next().await.unwrap(); + account_sender.send(response.unwrap()).unwrap(); + }); + } + + // Signal ready after the next slot notification + let mut slot_sub = client + .slot_subscribe() + .unwrap_or_else(|err| panic!("sig sub err: {:#?}", err)); + tokio::spawn(async move { + let _response = slot_sub.next().await.unwrap(); + ready_sender.send(()).unwrap(); + }); }); // Wait for signature subscriptions @@ -311,6 +294,4 @@ fn test_rpc_subscriptions() { } } } - - rt.shutdown_now().wait().unwrap(); } diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index d06c3941fc..0ded55b78b 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -830,31 +830,58 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" [[package]] -name = "futures-channel" -version = "0.3.7" +name = "futures" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0448174b01148032eed37ac4aed28963aaaa8cfa93569a08e5b479bbc6c2c151" +checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.7" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18eaa56102984bed2c88ea39026cff3ce3b4c7f508ca970cedf2450ea10d4e46" +checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65" + +[[package]] +name = "futures-executor" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] [[package]] name = "futures-io" -version = "0.3.7" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e1798854a4727ff944a7b12aa999f58ce7aa81db80d2dfaaf2ba06f065ddd2b" +checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" [[package]] name = "futures-macro" -version = "0.3.7" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36fccf3fc58563b4a14d265027c627c3b665d7fed489427e88e7cc929559efe" +checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" dependencies = [ "proc-macro-hack", "proc-macro2 1.0.24", @@ -864,31 +891,33 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" +checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6" [[package]] name = "futures-task" -version = "0.3.7" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96d502af37186c4fef99453df03e374683f8a1eec9dcc1e66b3b82dc8278ce3c" +checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86" dependencies = [ "once_cell", ] [[package]] name = "futures-util" -version = "0.3.7" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abcb44342f62e6f3e8ac427b8aa815f724fd705dfad060b18ac7866c15bb8e34" +checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", + "futures-sink", "futures-task", "memchr", - "pin-project 1.0.1", + "pin-project-lite 0.2.4", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1110,7 +1139,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 0.4.26", + "pin-project", "socket2", "tokio 0.2.21", "tower-service", @@ -1275,11 +1304,11 @@ dependencies = [ [[package]] name = "jsonrpc-core" -version = "15.1.0" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0745a6379e3edc893c84ec203589790774e4247420033e71a76d3ab4687991fa" +checksum = "07569945133257ff557eb37b015497104cea61a2c9edaf126c1cbd6e8332397f" dependencies = [ - "futures", + "futures 0.3.12", "log", "serde", "serde_derive", @@ -1849,16 +1878,7 @@ version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13fbdfd6bdee3dc9be46452f86af4a4072975899cf8592466668620bebfbcc17" dependencies = [ - "pin-project-internal 0.4.26", -] - -[[package]] -name = "pin-project" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee41d838744f60d959d7074e3afb6b35c7456d0f61cad38a24e35e6553f73841" -dependencies = [ - "pin-project-internal 1.0.1", + "pin-project-internal", ] [[package]] @@ -1872,17 +1892,6 @@ dependencies = [ "syn 1.0.48", ] -[[package]] -name = "pin-project-internal" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81a4ffa594b66bff340084d4081df649a7dc049ac8d7fc458d8e628bfbbb2f86" -dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.6", - "syn 1.0.48", -] - [[package]] name = "pin-project-lite" version = "0.1.5" @@ -1930,9 +1939,9 @@ dependencies = [ [[package]] name = "proc-macro-hack" -version = "0.5.16" +version = "0.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro-nested" @@ -3573,7 +3582,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "mio 0.6.22", "num_cpus", "tokio-codec", @@ -3637,7 +3646,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "tokio-io", ] @@ -3647,7 +3656,7 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e" dependencies = [ - "futures", + "futures 0.1.29", "tokio-executor", ] @@ -3658,7 +3667,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" dependencies = [ "crossbeam-utils 0.7.2", - "futures", + "futures 0.1.29", ] [[package]] @@ -3667,7 +3676,7 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" dependencies = [ - "futures", + "futures 0.1.29", "tokio-io", "tokio-threadpool", ] @@ -3679,7 +3688,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "log", ] @@ -3701,7 +3710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" dependencies = [ "crossbeam-utils 0.7.2", - "futures", + "futures 0.1.29", "lazy_static", "log", "mio 0.6.22", @@ -3732,7 +3741,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee" dependencies = [ "fnv", - "futures", + "futures 0.1.29", ] [[package]] @@ -3742,7 +3751,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98df18ed66e3b72e742f185882a9e201892407957e45fbff8da17ae7a7c51f72" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "iovec", "mio 0.6.22", "tokio-io", @@ -3758,7 +3767,7 @@ dependencies = [ "crossbeam-deque 0.7.3", "crossbeam-queue", "crossbeam-utils 0.7.2", - "futures", + "futures 0.1.29", "lazy_static", "log", "num_cpus", @@ -3773,7 +3782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" dependencies = [ "crossbeam-utils 0.7.2", - "futures", + "futures 0.1.29", "slab", "tokio-executor", ] @@ -3785,7 +3794,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "log", "mio 0.6.22", "tokio-codec", @@ -3800,7 +3809,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5076db410d6fdc6523df7595447629099a1fdc47b3d9f896220780fa48faf798" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "iovec", "libc", "log",