diff --git a/core/Cargo.toml b/core/Cargo.toml index 8796c1b3d5..24f73dd016 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -70,6 +70,7 @@ solana-vote-signer = { path = "../vote-signer", version = "1.4.0" } spl-token-v1-0 = { package = "spl-token", version = "1.0.6", features = ["skip-no-mangle"] } tempfile = "3.1.0" thiserror = "1.0" +tokio = { version = "0.2.22", features = ["full"] } tokio_01 = { version = "0.1", package = "tokio" } tokio_fs_01 = { version = "0.1", package = "tokio-fs" } tokio_io_01 = { version = "0.1", package = "tokio-io" } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index b6782f3ed1..ebe40538a4 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -66,6 +66,7 @@ use std::{ Arc, Mutex, RwLock, }, }; +use tokio::runtime; fn new_response(bank: &Bank, value: T) -> RpcResponse { let context = RpcResponseContext { slot: bank.slot() }; @@ -103,6 +104,7 @@ pub struct JsonRpcRequestProcessor { cluster_info: Arc, genesis_hash: Hash, transaction_sender: Arc>>, + runtime_handle: runtime::Handle, } impl Metadata for JsonRpcRequestProcessor {} @@ -166,6 +168,7 @@ impl JsonRpcRequestProcessor { health: Arc, cluster_info: Arc, genesis_hash: Hash, + runtime: &runtime::Runtime, ) -> (Self, Receiver) { let (sender, receiver) = channel(); ( @@ -217,6 +220,7 @@ impl JsonRpcRequestProcessor { cluster_info, genesis_hash, transaction_sender: Arc::new(Mutex::new(sender)), + runtime_handle: runtime.handle().clone(), } } diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 009d5f5601..be8d40efcb 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -23,6 +23,7 @@ use std::{ sync::{mpsc::channel, Arc, RwLock}, thread::{self, Builder, JoinHandle}, }; +use tokio::runtime; pub struct JsonRpcService { thread_hdl: JoinHandle<()>, @@ -31,6 +32,7 @@ pub struct JsonRpcService { pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()... close_handle: Option, + runtime: runtime::Runtime, } struct RpcRequestMiddleware { @@ -251,6 +253,12 @@ impl JsonRpcService { )); let tpu_address = cluster_info.my_contact_info().tpu; + let runtime = runtime::Builder::new() + .threaded_scheduler() + .thread_name("rpc-runtime") + .enable_all() + .build() + .expect("Runtime"); let (request_processor, receiver) = JsonRpcRequestProcessor::new( config, bank_forks.clone(), @@ -260,6 +268,7 @@ impl JsonRpcService { health.clone(), cluster_info, genesis_hash, + &runtime, ); let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); @@ -327,6 +336,7 @@ impl JsonRpcService { .register_exit(Box::new(move || close_handle_.close())); Self { thread_hdl, + runtime, #[cfg(test)] request_processor: test_request_processor, close_handle: Some(close_handle), @@ -340,6 +350,7 @@ impl JsonRpcService { } pub fn join(self) -> thread::Result<()> { + self.runtime.shutdown_background(); self.thread_hdl.join() } }