Setup a tokio 0.2 runtime for RPC usage

This commit is contained in:
Michael Vines
2020-07-23 09:12:51 -07:00
parent b876fb84ba
commit 0e02740565
3 changed files with 16 additions and 0 deletions

View File

@ -70,6 +70,7 @@ solana-vote-signer = { path = "../vote-signer", version = "1.4.0" }
spl-token-v1-0 = { package = "spl-token", version = "1.0.6", features = ["skip-no-mangle"] } spl-token-v1-0 = { package = "spl-token", version = "1.0.6", features = ["skip-no-mangle"] }
tempfile = "3.1.0" tempfile = "3.1.0"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "0.2.22", features = ["full"] }
tokio_01 = { version = "0.1", package = "tokio" } tokio_01 = { version = "0.1", package = "tokio" }
tokio_fs_01 = { version = "0.1", package = "tokio-fs" } tokio_fs_01 = { version = "0.1", package = "tokio-fs" }
tokio_io_01 = { version = "0.1", package = "tokio-io" } tokio_io_01 = { version = "0.1", package = "tokio-io" }

View File

@ -66,6 +66,7 @@ use std::{
Arc, Mutex, RwLock, Arc, Mutex, RwLock,
}, },
}; };
use tokio::runtime;
fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> { fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
let context = RpcResponseContext { slot: bank.slot() }; let context = RpcResponseContext { slot: bank.slot() };
@ -103,6 +104,7 @@ pub struct JsonRpcRequestProcessor {
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash, genesis_hash: Hash,
transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>, transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>,
runtime_handle: runtime::Handle,
} }
impl Metadata for JsonRpcRequestProcessor {} impl Metadata for JsonRpcRequestProcessor {}
@ -166,6 +168,7 @@ impl JsonRpcRequestProcessor {
health: Arc<RpcHealth>, health: Arc<RpcHealth>,
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash, genesis_hash: Hash,
runtime: &runtime::Runtime,
) -> (Self, Receiver<TransactionInfo>) { ) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
( (
@ -217,6 +220,7 @@ impl JsonRpcRequestProcessor {
cluster_info, cluster_info,
genesis_hash, genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)), transaction_sender: Arc::new(Mutex::new(sender)),
runtime_handle: runtime.handle().clone(),
} }
} }

View File

@ -23,6 +23,7 @@ use std::{
sync::{mpsc::channel, Arc, RwLock}, sync::{mpsc::channel, Arc, RwLock},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
}; };
use tokio::runtime;
pub struct JsonRpcService { pub struct JsonRpcService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
@ -31,6 +32,7 @@ pub struct JsonRpcService {
pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()... pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()...
close_handle: Option<CloseHandle>, close_handle: Option<CloseHandle>,
runtime: runtime::Runtime,
} }
struct RpcRequestMiddleware { struct RpcRequestMiddleware {
@ -251,6 +253,12 @@ impl JsonRpcService {
)); ));
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let runtime = runtime::Builder::new()
.threaded_scheduler()
.thread_name("rpc-runtime")
.enable_all()
.build()
.expect("Runtime");
let (request_processor, receiver) = JsonRpcRequestProcessor::new( let (request_processor, receiver) = JsonRpcRequestProcessor::new(
config, config,
bank_forks.clone(), bank_forks.clone(),
@ -260,6 +268,7 @@ impl JsonRpcService {
health.clone(), health.clone(),
cluster_info, cluster_info,
genesis_hash, genesis_hash,
&runtime,
); );
let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); let exit_send_transaction_service = Arc::new(AtomicBool::new(false));
@ -327,6 +336,7 @@ impl JsonRpcService {
.register_exit(Box::new(move || close_handle_.close())); .register_exit(Box::new(move || close_handle_.close()));
Self { Self {
thread_hdl, thread_hdl,
runtime,
#[cfg(test)] #[cfg(test)]
request_processor: test_request_processor, request_processor: test_request_processor,
close_handle: Some(close_handle), close_handle: Some(close_handle),
@ -340,6 +350,7 @@ impl JsonRpcService {
} }
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
self.runtime.shutdown_background();
self.thread_hdl.join() self.thread_hdl.join()
} }
} }