Bump jsonrpc crates and remove old tokio (#18779)

* Bump jsonrpc crates and replace old tokio

* Bump tokio

* getBlockTime

* getBlocks

* getBlocksWithLimit, getInflationReward

* getBlock

* getFirstAvailableBlock

* getTransaction

* getSignaturesForAddress

* getSignatureStatuses

* Remove superfluous runtime
This commit is contained in:
Tyera Eulberg
2021-07-26 12:32:17 -06:00
committed by GitHub
parent b97113408b
commit 8596db8f53
12 changed files with 379 additions and 1039 deletions

View File

@ -15,12 +15,12 @@ bincode = "1.3.3"
bs58 = "0.4.0"
crossbeam-channel = "0.5"
itertools = "0.10.1"
jsonrpc-core = "17.1.0"
jsonrpc-core-client = { version = "17.1.0", features = ["ipc", "ws"] }
jsonrpc-derive = "17.1.0"
jsonrpc-http-server = "17.1.0"
jsonrpc-pubsub = "17.1.0"
jsonrpc-ws-server = "17.1.0"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ipc", "ws"] }
jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpc-pubsub = "18.0.0"
jsonrpc-ws-server = "18.0.0"
libc = "0.2.98"
log = "0.4.14"
regex = "1.5.4"
@ -46,8 +46,7 @@ solana-version = { path = "../version", version = "=1.8.0" }
solana-vote-program = { path = "../programs/vote", version = "=1.8.0" }
spl-token-v2-0 = { package = "spl-token", version = "=3.2.0", features = ["no-entrypoint"] }
tokio = { version = "1", features = ["full"] }
tokio_02 = { version = "0.2", package = "tokio", features = ["full"] }
tokio-util = { version = "0.3", features = ["codec"] } # This crate needs to stay in sync with tokio_02, until that dependency can be removed
tokio-util = { version = "0.6", features = ["codec"] }
[dev-dependencies]
serial_test = "0.4.0"

View File

@ -9,7 +9,7 @@ use {
send_transaction_service::{SendTransactionService, TransactionInfo},
},
bincode::{config::Options, serialize},
jsonrpc_core::{types::error, Error, Metadata, Result},
jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result},
jsonrpc_derive::rpc,
serde::{Deserialize, Serialize},
solana_account_decoder::{
@ -91,7 +91,6 @@ use {
},
time::Duration,
},
tokio::runtime::Runtime,
};
type RpcCustomResult<T> = std::result::Result<T, RpcCustomError>;
@ -157,7 +156,6 @@ pub struct JsonRpcRequestProcessor {
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>,
runtime: Arc<Runtime>,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
@ -243,7 +241,6 @@ impl JsonRpcRequestProcessor {
health: Arc<RpcHealth>,
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
runtime: Arc<Runtime>,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
@ -264,7 +261,6 @@ impl JsonRpcRequestProcessor {
cluster_info,
genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)),
runtime,
bigtable_ledger_storage,
optimistically_confirmed_bank,
largest_accounts_cache,
@ -309,7 +305,6 @@ impl JsonRpcRequestProcessor {
cluster_info,
genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)),
runtime: Arc::new(Runtime::new().expect("Runtime")),
bigtable_ledger_storage: None,
optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank {
bank: bank.clone(),
@ -404,14 +399,14 @@ impl JsonRpcRequestProcessor {
})
}
pub fn get_inflation_reward(
pub async fn get_inflation_reward(
&self,
addresses: Vec<Pubkey>,
config: Option<RpcEpochConfig>,
) -> Result<Vec<Option<RpcInflationReward>>> {
let config = config.unwrap_or_default();
let epoch_schedule = self.get_epoch_schedule();
let first_available_block = self.get_first_available_block();
let first_available_block = self.get_first_available_block().await;
let epoch = config.epoch.unwrap_or_else(|| {
epoch_schedule
.get_epoch(self.get_slot(config.commitment))
@ -436,16 +431,20 @@ impl JsonRpcRequestProcessor {
}
let first_confirmed_block_in_epoch = *self
.get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment)?
.get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment)
.await?
.get(0)
.ok_or(RpcCustomError::BlockNotAvailable {
slot: first_slot_in_epoch,
})?;
let first_confirmed_block = if let Ok(Some(first_confirmed_block)) = self.get_block(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
) {
let first_confirmed_block = if let Ok(Some(first_confirmed_block)) = self
.get_block(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
)
.await
{
first_confirmed_block
} else {
return Err(RpcCustomError::BlockNotAvailable {
@ -929,7 +928,7 @@ impl JsonRpcRequestProcessor {
Ok(())
}
pub fn get_block(
pub async fn get_block(
&self,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
@ -956,9 +955,8 @@ impl JsonRpcRequestProcessor {
self.check_blockstore_root(&result, slot)?;
if result.is_err() {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_result = self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_block(slot));
let bigtable_result =
bigtable_ledger_storage.get_confirmed_block(slot).await;
self.check_bigtable_result(&bigtable_result)?;
return Ok(bigtable_result.ok().map(|confirmed_block| {
confirmed_block.configure(encoding, transaction_details, show_rewards)
@ -1004,7 +1002,7 @@ impl JsonRpcRequestProcessor {
Err(RpcCustomError::BlockNotAvailable { slot }.into())
}
pub fn get_blocks(
pub async fn get_blocks(
&self,
start_slot: Slot,
end_slot: Option<Slot>,
@ -1043,12 +1041,9 @@ impl JsonRpcRequestProcessor {
// [start_slot..end_slot] can be fetched from BigTable. This range should not ever run
// into unfinalized confirmed blocks due to MAX_GET_CONFIRMED_BLOCKS_RANGE
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return self
.runtime
.block_on(
bigtable_ledger_storage
.get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize + 1), // increment limit by 1 to ensure returned range is inclusive of both start_slot and end_slot
)
return bigtable_ledger_storage
.get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize + 1) // increment limit by 1 to ensure returned range is inclusive of both start_slot and end_slot
.await
.map(|mut bigtable_blocks| {
bigtable_blocks.retain(|&slot| slot <= end_slot);
bigtable_blocks
@ -1085,7 +1080,7 @@ impl JsonRpcRequestProcessor {
Ok(blocks)
}
pub fn get_blocks_with_limit(
pub async fn get_blocks_with_limit(
&self,
start_slot: Slot,
limit: usize,
@ -1108,9 +1103,9 @@ impl JsonRpcRequestProcessor {
// range can be fetched from BigTable. This range should not ever run into unfinalized
// confirmed blocks due to MAX_GET_CONFIRMED_BLOCKS_RANGE
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_blocks(start_slot, limit))
return Ok(bigtable_ledger_storage
.get_confirmed_blocks(start_slot, limit)
.await
.unwrap_or_default());
}
}
@ -1149,7 +1144,7 @@ impl JsonRpcRequestProcessor {
Ok(blocks)
}
pub fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
pub async fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
if slot
<= self
.block_commitment_cache
@ -1161,9 +1156,7 @@ impl JsonRpcRequestProcessor {
self.check_blockstore_root(&result, slot)?;
if result.is_err() || matches!(result, Ok(None)) {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_result = self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_block(slot));
let bigtable_result = bigtable_ledger_storage.get_confirmed_block(slot).await;
self.check_bigtable_result(&bigtable_result)?;
return Ok(bigtable_result
.ok()
@ -1208,7 +1201,7 @@ impl JsonRpcRequestProcessor {
Some(status)
}
pub fn get_signature_statuses(
pub async fn get_signature_statuses(
&self,
signatures: Vec<Signature>,
config: Option<RpcSignatureStatusConfig>,
@ -1228,7 +1221,8 @@ impl JsonRpcRequestProcessor {
let status = if let Some(status) = self.get_transaction_status(signature, &bank) {
Some(status)
} else if self.config.enable_rpc_transaction_history && search_transaction_history {
self.blockstore
if let Some(status) = self
.blockstore
.get_rooted_transaction_status(signature)
.map_err(|_| Error::internal_error())?
.filter(|(slot, _status_meta)| {
@ -1248,16 +1242,17 @@ impl JsonRpcRequestProcessor {
confirmation_status: Some(TransactionConfirmationStatus::Finalized),
}
})
.or_else(|| {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
self.runtime
.block_on(bigtable_ledger_storage.get_signature_status(&signature))
.map(Some)
.unwrap_or(None)
} else {
None
}
})
{
Some(status)
} else if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
bigtable_ledger_storage
.get_signature_status(&signature)
.await
.map(Some)
.unwrap_or(None)
} else {
None
}
} else {
None
};
@ -1303,7 +1298,7 @@ impl JsonRpcRequestProcessor {
})
}
pub fn get_transaction(
pub async fn get_transaction(
&self,
signature: Signature,
config: Option<RpcEncodingConfigWrapper<RpcTransactionConfig>>,
@ -1351,9 +1346,9 @@ impl JsonRpcRequestProcessor {
}
None => {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_transaction(&signature))
return Ok(bigtable_ledger_storage
.get_confirmed_transaction(&signature)
.await
.unwrap_or(None)
.map(|confirmed| confirmed.encode(encoding)));
}
@ -1389,7 +1384,7 @@ impl JsonRpcRequestProcessor {
}
}
pub fn get_signatures_for_address(
pub async fn get_signatures_for_address(
&self,
address: Pubkey,
mut before: Option<Signature>,
@ -1425,14 +1420,14 @@ impl JsonRpcRequestProcessor {
before = results.last().map(|x| x.signature);
}
let bigtable_results = self.runtime.block_on(
bigtable_ledger_storage.get_confirmed_signatures_for_address(
let bigtable_results = bigtable_ledger_storage
.get_confirmed_signatures_for_address(
&address,
before.as_ref(),
until.as_ref(),
limit,
),
);
)
.await;
match bigtable_results {
Ok(bigtable_results) => {
results.extend(bigtable_results.into_iter().map(|x| x.0));
@ -1467,16 +1462,16 @@ impl JsonRpcRequestProcessor {
}
}
pub fn get_first_available_block(&self) -> Slot {
pub async fn get_first_available_block(&self) -> Slot {
let slot = self
.blockstore
.get_first_available_block()
.unwrap_or_default();
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_slot = self
.runtime
.block_on(bigtable_ledger_storage.get_first_available_block())
let bigtable_slot = bigtable_ledger_storage
.get_first_available_block()
.await
.unwrap_or(None)
.unwrap_or(slot);
@ -1957,6 +1952,28 @@ fn verify_token_account_filter(
}
}
fn verify_and_parse_signatures_for_address_params(
address: String,
before: Option<String>,
until: Option<String>,
limit: Option<usize>,
) -> Result<(Pubkey, Option<Signature>, Option<Signature>, usize)> {
let address = verify_pubkey(&address)?;
let before = before
.map(|ref before| verify_signature(before))
.transpose()?;
let until = until.map(|ref until| verify_signature(until)).transpose()?;
let limit = limit.unwrap_or(MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT);
if limit == 0 || limit > MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT
)));
}
Ok((address, before, until, limit))
}
fn check_is_at_least_confirmed(commitment: CommitmentConfig) -> Result<()> {
if !commitment.is_at_least_confirmed() {
return Err(Error::invalid_params(
@ -2955,7 +2972,7 @@ pub mod rpc_full {
meta: Self::Metadata,
address_strs: Vec<String>,
config: Option<RpcEpochConfig>,
) -> Result<Vec<Option<RpcInflationReward>>>;
) -> BoxFuture<Result<Vec<Option<RpcInflationReward>>>>;
#[rpc(meta, name = "getClusterNodes")]
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>>;
@ -2976,7 +2993,7 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_strs: Vec<String>,
config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>>;
) -> BoxFuture<Result<RpcResponse<Vec<Option<TransactionStatus>>>>>;
#[rpc(meta, name = "getMaxRetransmitSlot")]
fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result<Slot>;
@ -3018,11 +3035,14 @@ pub mod rpc_full {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>>;
) -> BoxFuture<Result<Option<UiConfirmedBlock>>>;
#[rpc(meta, name = "getBlockTime")]
fn get_block_time(&self, meta: Self::Metadata, slot: Slot)
-> Result<Option<UnixTimestamp>>;
fn get_block_time(
&self,
meta: Self::Metadata,
slot: Slot,
) -> BoxFuture<Result<Option<UnixTimestamp>>>;
#[rpc(meta, name = "getBlocks")]
fn get_blocks(
@ -3031,7 +3051,7 @@ pub mod rpc_full {
start_slot: Slot,
config: Option<RpcBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
#[rpc(meta, name = "getBlocksWithLimit")]
fn get_blocks_with_limit(
@ -3040,7 +3060,7 @@ pub mod rpc_full {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
#[rpc(meta, name = "getTransaction")]
fn get_transaction(
@ -3048,7 +3068,7 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>>;
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>>;
#[rpc(meta, name = "getSignaturesForAddress")]
fn get_signatures_for_address(
@ -3056,10 +3076,10 @@ pub mod rpc_full {
meta: Self::Metadata,
address: String,
config: Option<RpcSignaturesForAddressConfig>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>>;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>>;
#[rpc(meta, name = "getFirstAvailableBlock")]
fn get_first_available_block(&self, meta: Self::Metadata) -> Result<Slot>;
fn get_first_available_block(&self, meta: Self::Metadata) -> BoxFuture<Result<Slot>>;
}
pub struct FullImpl;
@ -3151,22 +3171,27 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_strs: Vec<String>,
config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
) -> BoxFuture<Result<RpcResponse<Vec<Option<TransactionStatus>>>>> {
debug!(
"get_signature_statuses rpc request received: {:?}",
signature_strs.len()
);
if signature_strs.len() > MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS {
return Err(Error::invalid_params(format!(
return Box::pin(future::err(Error::invalid_params(format!(
"Too many inputs provided; max {}",
MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS
)));
))));
}
let mut signatures: Vec<Signature> = vec![];
for signature_str in signature_strs {
signatures.push(verify_signature(&signature_str)?);
match verify_signature(&signature_str) {
Ok(signature) => {
signatures.push(signature);
}
Err(err) => return Box::pin(future::err(err)),
}
}
meta.get_signature_statuses(signatures, config)
Box::pin(async move { meta.get_signature_statuses(signatures, config).await })
}
fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result<Slot> {
@ -3425,9 +3450,9 @@ pub mod rpc_full {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>> {
) -> BoxFuture<Result<Option<UiConfirmedBlock>>> {
debug!("get_block rpc request received: {:?}", slot);
meta.get_block(slot, config)
Box::pin(async move { meta.get_block(slot, config).await })
}
fn get_blocks(
@ -3436,14 +3461,17 @@ pub mod rpc_full {
start_slot: Slot,
config: Option<RpcBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
let (end_slot, maybe_commitment) =
config.map(|config| config.unzip()).unwrap_or_default();
debug!(
"get_blocks rpc request received: {}-{:?}",
start_slot, end_slot
);
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
Box::pin(async move {
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
.await
})
}
fn get_blocks_with_limit(
@ -3452,20 +3480,23 @@ pub mod rpc_full {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
debug!(
"get_blocks_with_limit rpc request received: {}-{}",
start_slot, limit,
);
meta.get_blocks_with_limit(start_slot, limit, commitment)
Box::pin(async move {
meta.get_blocks_with_limit(start_slot, limit, commitment)
.await
})
}
fn get_block_time(
&self,
meta: Self::Metadata,
slot: Slot,
) -> Result<Option<UnixTimestamp>> {
meta.get_block_time(slot)
) -> BoxFuture<Result<Option<UnixTimestamp>>> {
Box::pin(async move { meta.get_block_time(slot).await })
}
fn get_transaction(
@ -3473,10 +3504,13 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>> {
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>> {
debug!("get_transaction rpc request received: {:?}", signature_str);
let signature = verify_signature(&signature_str)?;
meta.get_transaction(signature, config)
let signature = verify_signature(&signature_str);
if let Err(err) = signature {
return Box::pin(future::err(err));
}
Box::pin(async move { meta.get_transaction(signature.unwrap(), config).await })
}
fn get_signatures_for_address(
@ -3484,35 +3518,28 @@ pub mod rpc_full {
meta: Self::Metadata,
address: String,
config: Option<RpcSignaturesForAddressConfig>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>> {
let address = verify_pubkey(&address)?;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>> {
let config = config.unwrap_or_default();
let before = config
.before
.map(|ref before| verify_signature(before))
.transpose()?;
let until = config
.until
.map(|ref until| verify_signature(until))
.transpose()?;
let limit = config
.limit
.unwrap_or(MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT);
let commitment = config.commitment;
let verification = verify_and_parse_signatures_for_address_params(
address,
config.before,
config.until,
config.limit,
);
if limit == 0 || limit > MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT
)));
match verification {
Err(err) => Box::pin(future::err(err)),
Ok((address, before, until, limit)) => Box::pin(async move {
meta.get_signatures_for_address(address, before, until, limit, commitment)
.await
}),
}
meta.get_signatures_for_address(address, before, until, limit, config.commitment)
}
fn get_first_available_block(&self, meta: Self::Metadata) -> Result<Slot> {
fn get_first_available_block(&self, meta: Self::Metadata) -> BoxFuture<Result<Slot>> {
debug!("get_first_available_block rpc request received");
Ok(meta.get_first_available_block())
Box::pin(async move { Ok(meta.get_first_available_block().await) })
}
fn get_inflation_reward(
@ -3520,7 +3547,7 @@ pub mod rpc_full {
meta: Self::Metadata,
address_strs: Vec<String>,
config: Option<RpcEpochConfig>,
) -> Result<Vec<Option<RpcInflationReward>>> {
) -> BoxFuture<Result<Vec<Option<RpcInflationReward>>>> {
debug!(
"get_inflation_reward rpc request received: {:?}",
address_strs.len()
@ -3528,10 +3555,15 @@ pub mod rpc_full {
let mut addresses: Vec<Pubkey> = vec![];
for address_str in address_strs {
addresses.push(verify_pubkey(&address_str)?);
match verify_pubkey(&address_str) {
Ok(pubkey) => {
addresses.push(pubkey);
}
Err(err) => return Box::pin(future::err(err)),
}
}
meta.get_inflation_reward(addresses, config)
Box::pin(async move { meta.get_inflation_reward(addresses, config).await })
}
}
}
@ -3551,7 +3583,7 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>>;
) -> BoxFuture<Result<Option<UiConfirmedBlock>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedBlocks")]
@ -3561,7 +3593,7 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
config: Option<RpcConfirmedBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedBlocksWithLimit")]
@ -3571,7 +3603,7 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedTransaction")]
@ -3580,7 +3612,7 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>>;
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedSignaturesForAddress2")]
@ -3589,7 +3621,7 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
address: String,
config: Option<RpcGetConfirmedSignaturesForAddress2Config>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>>;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>>;
}
pub struct DeprecatedV1_7Impl;
@ -3601,9 +3633,12 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>> {
) -> BoxFuture<Result<Option<UiConfirmedBlock>>> {
debug!("get_confirmed_block rpc request received: {:?}", slot);
meta.get_block(slot, config.map(|config| config.convert()))
Box::pin(async move {
meta.get_block(slot, config.map(|config| config.convert()))
.await
})
}
fn get_confirmed_blocks(
@ -3612,14 +3647,17 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
config: Option<RpcConfirmedBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
let (end_slot, maybe_commitment) =
config.map(|config| config.unzip()).unwrap_or_default();
debug!(
"get_confirmed_blocks rpc request received: {}-{:?}",
start_slot, end_slot
);
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
Box::pin(async move {
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
.await
})
}
fn get_confirmed_blocks_with_limit(
@ -3628,12 +3666,15 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
debug!(
"get_confirmed_blocks_with_limit rpc request received: {}-{}",
start_slot, limit,
);
meta.get_blocks_with_limit(start_slot, limit, commitment)
Box::pin(async move {
meta.get_blocks_with_limit(start_slot, limit, commitment)
.await
})
}
fn get_confirmed_transaction(
@ -3641,13 +3682,19 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>> {
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>> {
debug!(
"get_confirmed_transaction rpc request received: {:?}",
signature_str
);
let signature = verify_signature(&signature_str)?;
meta.get_transaction(signature, config.map(|config| config.convert()))
let signature = verify_signature(&signature_str);
if let Err(err) = signature {
return Box::pin(future::err(err));
}
Box::pin(async move {
meta.get_transaction(signature.unwrap(), config.map(|config| config.convert()))
.await
})
}
fn get_confirmed_signatures_for_address2(
@ -3655,30 +3702,23 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
address: String,
config: Option<RpcGetConfirmedSignaturesForAddress2Config>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>> {
let address = verify_pubkey(&address)?;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>> {
let config = config.unwrap_or_default();
let before = config
.before
.map(|ref before| verify_signature(before))
.transpose()?;
let until = config
.until
.map(|ref until| verify_signature(until))
.transpose()?;
let limit = config
.limit
.unwrap_or(MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT);
let commitment = config.commitment;
let verification = verify_and_parse_signatures_for_address_params(
address,
config.before,
config.until,
config.limit,
);
if limit == 0 || limit > MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT
)));
match verification {
Err(err) => Box::pin(future::err(err)),
Ok((address, before, until, limit)) => Box::pin(async move {
meta.get_signatures_for_address(address, before, until, limit, commitment)
.await
}),
}
meta.get_signatures_for_address(address, before, until, limit, config.commitment)
}
}
}
@ -4195,7 +4235,6 @@ pub mod tests {
RpcHealth::stub(),
cluster_info.clone(),
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
@ -5762,7 +5801,6 @@ pub mod tests {
health.clone(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
@ -6045,7 +6083,6 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
@ -7471,7 +7508,6 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
optimistically_confirmed_bank.clone(),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),

View File

@ -41,7 +41,6 @@ use {
sync::{mpsc::channel, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
},
tokio::runtime,
tokio_util::codec::{BytesCodec, FramedRead},
};
@ -120,10 +119,8 @@ impl RpcRequestMiddleware {
}
#[cfg(unix)]
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio_02::fs::File> {
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
use tokio_02::fs::os::unix::OpenOptionsExt;
tokio_02::fs::OpenOptions::new()
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
tokio::fs::OpenOptions::new()
.read(true)
.write(false)
.create(false)
@ -133,10 +130,9 @@ impl RpcRequestMiddleware {
}
#[cfg(not(unix))]
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio_02::fs::File> {
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
// TODO: Is there any way to achieve the same on Windows?
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
tokio_02::fs::File::open(path).await
tokio::fs::File::open(path).await
}
fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
@ -309,9 +305,17 @@ impl JsonRpcService {
)));
let tpu_address = cluster_info.my_contact_info().tpu;
// sadly, some parts of our current rpc implemention block the jsonrpc's
// _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU,
// causing no further processing of incoming requests and ultimatily innocent clients timing-out.
// So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1,
// so that we avoid the single-threaded event loops from being created automatically by
// jsonrpc for threads when .threads(N > 1) is given.
let runtime = Arc::new(
runtime::Builder::new_multi_thread()
.thread_name("rpc-runtime")
tokio::runtime::Builder::new_multi_thread()
.worker_threads(rpc_threads)
.thread_name("sol-rpc-el")
.enable_all()
.build()
.expect("Runtime"),
@ -367,7 +371,6 @@ impl JsonRpcService {
health.clone(),
cluster_info.clone(),
genesis_hash,
runtime,
bigtable_ledger_storage,
optimistically_confirmed_bank,
largest_accounts_cache,
@ -392,23 +395,6 @@ impl JsonRpcService {
let ledger_path = ledger_path.to_path_buf();
// sadly, some parts of our current rpc implemention block the jsonrpc's
// _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU,
// causing no further processing of incoming requests and ultimatily innocent clients timing-out.
// So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1,
// so that we avoid the single-threaded event loops from being created automatically by
// jsonrpc for threads when .threads(N > 1) is given.
let event_loop = {
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
tokio_02::runtime::Builder::new()
.core_threads(rpc_threads)
.threaded_scheduler()
.enable_all()
.thread_name("sol-rpc-el")
.build()
.unwrap()
};
let (close_handle_sender, close_handle_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-jsonrpc".to_string())
@ -436,7 +422,7 @@ impl JsonRpcService {
io,
move |_req: &hyper::Request<hyper::Body>| request_processor.clone(),
)
.event_loop_executor(event_loop.handle().clone())
.event_loop_executor(runtime.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
@ -517,6 +503,7 @@ mod tests {
io::Write,
net::{IpAddr, Ipv4Addr},
},
tokio::runtime::Runtime,
};
#[test]
@ -655,7 +642,7 @@ mod tests {
#[test]
fn test_process_file_get() {
let mut runtime = tokio_02::runtime::Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let ledger_path = get_tmp_ledger_path!();
std::fs::create_dir(&ledger_path).unwrap();