Revert "Upgrade in-tree tokio 0.2 usage to tokio 0.3"

This reverts commit 444ed768dc.
This commit is contained in:
Michael Vines
2020-12-30 20:55:41 -08:00
committed by mergify[bot]
parent 2d8dacb72b
commit 3d077fb656
9 changed files with 75 additions and 60 deletions

View File

@ -5,7 +5,7 @@ use std::{
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime::Runtime;
use tokio::runtime;
// Delay uploading the largest confirmed root for this many slots. This is done in an attempt to
// ensure that the `CacheBlockTimeService` has had enough time to add the block time for the root
@ -21,7 +21,7 @@ pub struct BigTableUploadService {
impl BigTableUploadService {
pub fn new(
runtime: Arc<Runtime>,
runtime_handle: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
@ -32,7 +32,7 @@ impl BigTableUploadService {
.name("bigtable-upload".to_string())
.spawn(move || {
Self::run(
runtime,
runtime_handle,
bigtable_ledger_storage,
blockstore,
block_commitment_cache,
@ -45,7 +45,7 @@ impl BigTableUploadService {
}
fn run(
runtime: Arc<Runtime>,
runtime: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,

View File

@ -78,7 +78,7 @@ use std::{
Arc, Mutex, RwLock,
},
};
use tokio::runtime::Runtime;
use tokio::runtime;
pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;
@ -122,7 +122,7 @@ pub struct JsonRpcRequestProcessor {
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>,
runtime: Arc<Runtime>,
runtime_handle: runtime::Handle,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
}
@ -201,7 +201,7 @@ impl JsonRpcRequestProcessor {
health: Arc<RpcHealth>,
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
runtime: Arc<Runtime>,
runtime: &runtime::Runtime,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) -> (Self, Receiver<TransactionInfo>) {
@ -217,7 +217,7 @@ impl JsonRpcRequestProcessor {
cluster_info,
genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)),
runtime,
runtime_handle: runtime.handle().clone(),
bigtable_ledger_storage,
optimistically_confirmed_bank,
},
@ -253,7 +253,7 @@ impl JsonRpcRequestProcessor {
cluster_info,
genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)),
runtime: Arc::new(Runtime::new().expect("Runtime")),
runtime_handle: runtime::Runtime::new().unwrap().handle().clone(),
bigtable_ledger_storage: None,
optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank {
bank: bank.clone(),
@ -669,7 +669,7 @@ impl JsonRpcRequestProcessor {
if result.is_err() {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.runtime_handle
.block_on(bigtable_ledger_storage.get_confirmed_block(slot))
.ok()
.map(|confirmed_block| confirmed_block.encode(encoding)));
@ -712,7 +712,7 @@ impl JsonRpcRequestProcessor {
// [start_slot..end_slot] can be fetched from BigTable.
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.runtime_handle
.block_on(
bigtable_ledger_storage
.get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize),
@ -748,7 +748,7 @@ impl JsonRpcRequestProcessor {
// range can be fetched from BigTable.
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.runtime_handle
.block_on(bigtable_ledger_storage.get_confirmed_blocks(start_slot, limit))
.unwrap_or_else(|_| vec![]));
}
@ -775,7 +775,7 @@ impl JsonRpcRequestProcessor {
if result.is_err() || matches!(result, Ok(None)) {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.runtime_handle
.block_on(bigtable_ledger_storage.get_confirmed_block(slot))
.ok()
.and_then(|confirmed_block| confirmed_block.block_time));
@ -851,7 +851,7 @@ impl JsonRpcRequestProcessor {
})
.or_else(|| {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
self.runtime
self.runtime_handle
.block_on(bigtable_ledger_storage.get_signature_status(&signature))
.map(Some)
.unwrap_or(None)
@ -919,7 +919,7 @@ impl JsonRpcRequestProcessor {
None => {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return self
.runtime
.runtime_handle
.block_on(bigtable_ledger_storage.get_confirmed_transaction(&signature))
.unwrap_or(None)
.map(|confirmed| confirmed.encode(encoding));
@ -986,7 +986,7 @@ impl JsonRpcRequestProcessor {
before = results.last().map(|x| x.signature);
}
let bigtable_results = self.runtime.block_on(
let bigtable_results = self.runtime_handle.block_on(
bigtable_ledger_storage.get_confirmed_signatures_for_address(
&address,
before.as_ref(),
@ -1019,7 +1019,7 @@ impl JsonRpcRequestProcessor {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_slot = self
.runtime
.runtime_handle
.block_on(bigtable_ledger_storage.get_first_available_block())
.unwrap_or(None)
.unwrap_or(slot);
@ -2927,7 +2927,7 @@ pub mod tests {
RpcHealth::stub(),
cluster_info.clone(),
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
&runtime::Runtime::new().unwrap(),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
@ -4327,7 +4327,7 @@ pub mod tests {
health.clone(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
&runtime::Runtime::new().unwrap(),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
@ -4523,7 +4523,7 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
&runtime::Runtime::new().unwrap(),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
@ -4555,7 +4555,7 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
&runtime::Runtime::new().unwrap(),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
@ -4646,7 +4646,7 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
&runtime::Runtime::new().unwrap(),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
@ -5826,7 +5826,7 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
&runtime::Runtime::new().unwrap(),
None,
optimistically_confirmed_bank.clone(),
);

View File

@ -32,6 +32,7 @@ use std::{
sync::{mpsc::channel, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;
pub struct JsonRpcService {
thread_hdl: JoinHandle<()>,
@ -40,6 +41,7 @@ pub struct JsonRpcService {
pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()...
close_handle: Option<CloseHandle>,
runtime: runtime::Runtime,
}
struct RpcRequestMiddleware {
@ -280,13 +282,12 @@ impl JsonRpcService {
));
let tpu_address = cluster_info.my_contact_info().tpu;
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.thread_name("rpc-runtime")
.enable_all()
.build()
.expect("Runtime"),
);
let mut runtime = runtime::Builder::new()
.threaded_scheduler()
.thread_name("rpc-runtime")
.enable_all()
.build()
.expect("Runtime");
let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
@ -300,7 +301,7 @@ impl JsonRpcService {
info!("BigTable ledger storage initialized");
let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new(
runtime.clone(),
runtime.handle().clone(),
bigtable_ledger_storage.clone(),
blockstore.clone(),
block_commitment_cache.clone(),
@ -329,7 +330,7 @@ impl JsonRpcService {
health.clone(),
cluster_info.clone(),
genesis_hash,
runtime,
&runtime,
bigtable_ledger_storage,
optimistically_confirmed_bank,
);
@ -403,6 +404,7 @@ impl JsonRpcService {
.register_exit(Box::new(move || close_handle_.close()));
Self {
thread_hdl,
runtime,
#[cfg(test)]
request_processor: test_request_processor,
close_handle: Some(close_handle),
@ -416,6 +418,7 @@ impl JsonRpcService {
}
pub fn join(self) -> thread::Result<()> {
self.runtime.shutdown_background();
self.thread_hdl.join()
}
}