Bump jsonrpc crates and remove old tokio (backport #18779) (#19453)

* Bump jsonrpc crates and remove old tokio (#18779)

* Bump jsonrpc crates and replace old tokio

* Bump tokio

* getBlockTime

* getBlocks

* getBlocksWithLimit, getInflationReward

* getBlock

* getFirstAvailableBlock

* getTransaction

* getSignaturesForAddress

* getSignatureStatuses

* Remove superfluous runtime

(cherry picked from commit 8596db8f53)

# Conflicts:
#	Cargo.lock
#	client/Cargo.toml
#	core/Cargo.toml
#	programs/bpf/Cargo.lock
#	rpc/Cargo.toml
#	rpc/src/rpc.rs
#	validator/Cargo.toml

* Fix conflicts

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
mergify[bot]
2021-08-27 00:55:02 +00:00
committed by GitHub
parent c734db59cb
commit 52dfb4a09c
12 changed files with 340 additions and 927 deletions

View File

@@ -9,7 +9,7 @@ use {
send_transaction_service::{SendTransactionService, TransactionInfo},
},
bincode::{config::Options, serialize},
jsonrpc_core::{types::error, Error, Metadata, Result},
jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result},
jsonrpc_derive::rpc,
serde::{Deserialize, Serialize},
solana_account_decoder::{
@@ -90,7 +90,6 @@ use {
},
time::Duration,
},
tokio::runtime::Runtime,
};
type RpcCustomResult<T> = std::result::Result<T, RpcCustomError>;
@@ -157,7 +156,6 @@ pub struct JsonRpcRequestProcessor {
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>,
runtime: Arc<Runtime>,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
@@ -247,7 +245,6 @@ impl JsonRpcRequestProcessor {
health: Arc<RpcHealth>,
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
runtime: Arc<Runtime>,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
@@ -268,7 +265,6 @@ impl JsonRpcRequestProcessor {
cluster_info,
genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)),
runtime,
bigtable_ledger_storage,
optimistically_confirmed_bank,
largest_accounts_cache,
@@ -313,7 +309,6 @@ impl JsonRpcRequestProcessor {
cluster_info,
genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)),
runtime: Arc::new(Runtime::new().expect("Runtime")),
bigtable_ledger_storage: None,
optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank {
bank: bank.clone(),
@@ -408,14 +403,14 @@ impl JsonRpcRequestProcessor {
})
}
pub fn get_inflation_reward(
pub async fn get_inflation_reward(
&self,
addresses: Vec<Pubkey>,
config: Option<RpcEpochConfig>,
) -> Result<Vec<Option<RpcInflationReward>>> {
let config = config.unwrap_or_default();
let epoch_schedule = self.get_epoch_schedule();
let first_available_block = self.get_first_available_block();
let first_available_block = self.get_first_available_block().await;
let epoch = config.epoch.unwrap_or_else(|| {
epoch_schedule
.get_epoch(self.get_slot(config.commitment))
@@ -440,16 +435,20 @@ impl JsonRpcRequestProcessor {
}
let first_confirmed_block_in_epoch = *self
.get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment)?
.get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment)
.await?
.get(0)
.ok_or(RpcCustomError::BlockNotAvailable {
slot: first_slot_in_epoch,
})?;
let first_confirmed_block = if let Ok(Some(first_confirmed_block)) = self.get_block(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
) {
let first_confirmed_block = if let Ok(Some(first_confirmed_block)) = self
.get_block(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
)
.await
{
first_confirmed_block
} else {
return Err(RpcCustomError::BlockNotAvailable {
@@ -941,7 +940,7 @@ impl JsonRpcRequestProcessor {
Ok(())
}
pub fn get_block(
pub async fn get_block(
&self,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
@@ -977,9 +976,8 @@ impl JsonRpcRequestProcessor {
};
if result.is_err() {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_result = self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_block(slot));
let bigtable_result =
bigtable_ledger_storage.get_confirmed_block(slot).await;
self.check_bigtable_result(&bigtable_result)?;
return Ok(bigtable_result.ok().map(configure_block));
}
@@ -1021,7 +1019,7 @@ impl JsonRpcRequestProcessor {
Err(RpcCustomError::BlockNotAvailable { slot }.into())
}
pub fn get_blocks(
pub async fn get_blocks(
&self,
start_slot: Slot,
end_slot: Option<Slot>,
@@ -1060,12 +1058,9 @@ impl JsonRpcRequestProcessor {
// [start_slot..end_slot] can be fetched from BigTable. This range should not ever run
// into unfinalized confirmed blocks due to MAX_GET_CONFIRMED_BLOCKS_RANGE
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return self
.runtime
.block_on(
bigtable_ledger_storage
.get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize + 1), // increment limit by 1 to ensure returned range is inclusive of both start_slot and end_slot
)
return bigtable_ledger_storage
.get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize + 1) // increment limit by 1 to ensure returned range is inclusive of both start_slot and end_slot
.await
.map(|mut bigtable_blocks| {
bigtable_blocks.retain(|&slot| slot <= end_slot);
bigtable_blocks
@@ -1105,7 +1100,7 @@ impl JsonRpcRequestProcessor {
Ok(blocks)
}
pub fn get_blocks_with_limit(
pub async fn get_blocks_with_limit(
&self,
start_slot: Slot,
limit: usize,
@@ -1128,9 +1123,9 @@ impl JsonRpcRequestProcessor {
// range can be fetched from BigTable. This range should not ever run into unfinalized
// confirmed blocks due to MAX_GET_CONFIRMED_BLOCKS_RANGE
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_blocks(start_slot, limit))
return Ok(bigtable_ledger_storage
.get_confirmed_blocks(start_slot, limit)
.await
.unwrap_or_default());
}
}
@@ -1169,7 +1164,7 @@ impl JsonRpcRequestProcessor {
Ok(blocks)
}
pub fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
pub async fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
if slot == 0 {
return Ok(Some(self.genesis_creation_time()));
}
@@ -1184,9 +1179,7 @@ impl JsonRpcRequestProcessor {
self.check_blockstore_root(&result, slot)?;
if result.is_err() || matches!(result, Ok(None)) {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_result = self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_block(slot));
let bigtable_result = bigtable_ledger_storage.get_confirmed_block(slot).await;
self.check_bigtable_result(&bigtable_result)?;
return Ok(bigtable_result
.ok()
@@ -1231,7 +1224,7 @@ impl JsonRpcRequestProcessor {
Some(status)
}
pub fn get_signature_statuses(
pub async fn get_signature_statuses(
&self,
signatures: Vec<Signature>,
config: Option<RpcSignatureStatusConfig>,
@@ -1251,7 +1244,8 @@ impl JsonRpcRequestProcessor {
let status = if let Some(status) = self.get_transaction_status(signature, &bank) {
Some(status)
} else if self.config.enable_rpc_transaction_history && search_transaction_history {
self.blockstore
if let Some(status) = self
.blockstore
.get_rooted_transaction_status(signature)
.map_err(|_| Error::internal_error())?
.filter(|(slot, _status_meta)| {
@@ -1271,16 +1265,17 @@ impl JsonRpcRequestProcessor {
confirmation_status: Some(TransactionConfirmationStatus::Finalized),
}
})
.or_else(|| {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
self.runtime
.block_on(bigtable_ledger_storage.get_signature_status(&signature))
.map(Some)
.unwrap_or(None)
} else {
None
}
})
{
Some(status)
} else if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
bigtable_ledger_storage
.get_signature_status(&signature)
.await
.map(Some)
.unwrap_or(None)
} else {
None
}
} else {
None
};
@@ -1326,7 +1321,7 @@ impl JsonRpcRequestProcessor {
})
}
pub fn get_transaction(
pub async fn get_transaction(
&self,
signature: Signature,
config: Option<RpcEncodingConfigWrapper<RpcTransactionConfig>>,
@@ -1374,9 +1369,9 @@ impl JsonRpcRequestProcessor {
}
None => {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime
.block_on(bigtable_ledger_storage.get_confirmed_transaction(&signature))
return Ok(bigtable_ledger_storage
.get_confirmed_transaction(&signature)
.await
.unwrap_or(None)
.map(|confirmed| confirmed.encode(encoding)));
}
@@ -1412,7 +1407,7 @@ impl JsonRpcRequestProcessor {
}
}
pub fn get_signatures_for_address(
pub async fn get_signatures_for_address(
&self,
address: Pubkey,
mut before: Option<Signature>,
@@ -1448,14 +1443,14 @@ impl JsonRpcRequestProcessor {
before = results.last().map(|x| x.signature);
}
let bigtable_results = self.runtime.block_on(
bigtable_ledger_storage.get_confirmed_signatures_for_address(
let bigtable_results = bigtable_ledger_storage
.get_confirmed_signatures_for_address(
&address,
before.as_ref(),
until.as_ref(),
limit,
),
);
)
.await;
match bigtable_results {
Ok(bigtable_results) => {
results.extend(bigtable_results.into_iter().map(|x| x.0));
@@ -1490,16 +1485,16 @@ impl JsonRpcRequestProcessor {
}
}
pub fn get_first_available_block(&self) -> Slot {
pub async fn get_first_available_block(&self) -> Slot {
let slot = self
.blockstore
.get_first_available_block()
.unwrap_or_default();
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_slot = self
.runtime
.block_on(bigtable_ledger_storage.get_first_available_block())
let bigtable_slot = bigtable_ledger_storage
.get_first_available_block()
.await
.unwrap_or(None)
.unwrap_or(slot);
@@ -1977,6 +1972,28 @@ fn verify_token_account_filter(
}
}
fn verify_and_parse_signatures_for_address_params(
address: String,
before: Option<String>,
until: Option<String>,
limit: Option<usize>,
) -> Result<(Pubkey, Option<Signature>, Option<Signature>, usize)> {
let address = verify_pubkey(&address)?;
let before = before
.map(|ref before| verify_signature(before))
.transpose()?;
let until = until.map(|ref until| verify_signature(until)).transpose()?;
let limit = limit.unwrap_or(MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT);
if limit == 0 || limit > MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT
)));
}
Ok((address, before, until, limit))
}
fn check_is_at_least_confirmed(commitment: CommitmentConfig) -> Result<()> {
if !commitment.is_at_least_confirmed() {
return Err(Error::invalid_params(
@@ -2427,7 +2444,7 @@ pub mod rpc_full {
meta: Self::Metadata,
address_strs: Vec<String>,
config: Option<RpcEpochConfig>,
) -> Result<Vec<Option<RpcInflationReward>>>;
) -> BoxFuture<Result<Vec<Option<RpcInflationReward>>>>;
#[rpc(meta, name = "getInflationGovernor")]
fn get_inflation_governor(
@@ -2496,7 +2513,7 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_strs: Vec<String>,
config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>>;
) -> BoxFuture<Result<RpcResponse<Vec<Option<TransactionStatus>>>>>;
#[rpc(meta, name = "getMaxRetransmitSlot")]
fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result<Slot>;
@@ -2567,11 +2584,14 @@ pub mod rpc_full {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>>;
) -> BoxFuture<Result<Option<UiConfirmedBlock>>>;
#[rpc(meta, name = "getBlockTime")]
fn get_block_time(&self, meta: Self::Metadata, slot: Slot)
-> Result<Option<UnixTimestamp>>;
fn get_block_time(
&self,
meta: Self::Metadata,
slot: Slot,
) -> BoxFuture<Result<Option<UnixTimestamp>>>;
#[rpc(meta, name = "getBlocks")]
fn get_blocks(
@@ -2580,7 +2600,7 @@ pub mod rpc_full {
start_slot: Slot,
config: Option<RpcBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
#[rpc(meta, name = "getBlocksWithLimit")]
fn get_blocks_with_limit(
@@ -2589,7 +2609,7 @@ pub mod rpc_full {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
#[rpc(meta, name = "getTransaction")]
fn get_transaction(
@@ -2597,7 +2617,7 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>>;
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>>;
#[rpc(meta, name = "getSignaturesForAddress")]
fn get_signatures_for_address(
@@ -2605,10 +2625,10 @@ pub mod rpc_full {
meta: Self::Metadata,
address: String,
config: Option<RpcSignaturesForAddressConfig>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>>;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>>;
#[rpc(meta, name = "getFirstAvailableBlock")]
fn get_first_available_block(&self, meta: Self::Metadata) -> Result<Slot>;
fn get_first_available_block(&self, meta: Self::Metadata) -> BoxFuture<Result<Slot>>;
#[rpc(meta, name = "getStakeActivation")]
fn get_stake_activation(
@@ -2914,22 +2934,27 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_strs: Vec<String>,
config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
) -> BoxFuture<Result<RpcResponse<Vec<Option<TransactionStatus>>>>> {
debug!(
"get_signature_statuses rpc request received: {:?}",
signature_strs.len()
);
if signature_strs.len() > MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS {
return Err(Error::invalid_params(format!(
return Box::pin(future::err(Error::invalid_params(format!(
"Too many inputs provided; max {}",
MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS
)));
))));
}
let mut signatures: Vec<Signature> = vec![];
for signature_str in signature_strs {
signatures.push(verify_signature(&signature_str)?);
match verify_signature(&signature_str) {
Ok(signature) => {
signatures.push(signature);
}
Err(err) => return Box::pin(future::err(err)),
}
}
meta.get_signature_statuses(signatures, config)
Box::pin(async move { meta.get_signature_statuses(signatures, config).await })
}
fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result<Slot> {
@@ -3239,9 +3264,9 @@ pub mod rpc_full {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>> {
) -> BoxFuture<Result<Option<UiConfirmedBlock>>> {
debug!("get_block rpc request received: {:?}", slot);
meta.get_block(slot, config)
Box::pin(async move { meta.get_block(slot, config).await })
}
fn get_blocks(
@@ -3250,14 +3275,17 @@ pub mod rpc_full {
start_slot: Slot,
config: Option<RpcBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
let (end_slot, maybe_commitment) =
config.map(|config| config.unzip()).unwrap_or_default();
debug!(
"get_blocks rpc request received: {}-{:?}",
start_slot, end_slot
);
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
Box::pin(async move {
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
.await
})
}
fn get_blocks_with_limit(
@@ -3266,20 +3294,23 @@ pub mod rpc_full {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
debug!(
"get_blocks_with_limit rpc request received: {}-{}",
start_slot, limit,
);
meta.get_blocks_with_limit(start_slot, limit, commitment)
Box::pin(async move {
meta.get_blocks_with_limit(start_slot, limit, commitment)
.await
})
}
fn get_block_time(
&self,
meta: Self::Metadata,
slot: Slot,
) -> Result<Option<UnixTimestamp>> {
meta.get_block_time(slot)
) -> BoxFuture<Result<Option<UnixTimestamp>>> {
Box::pin(async move { meta.get_block_time(slot).await })
}
fn get_transaction(
@@ -3287,10 +3318,13 @@ pub mod rpc_full {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>> {
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>> {
debug!("get_transaction rpc request received: {:?}", signature_str);
let signature = verify_signature(&signature_str)?;
meta.get_transaction(signature, config)
let signature = verify_signature(&signature_str);
if let Err(err) = signature {
return Box::pin(future::err(err));
}
Box::pin(async move { meta.get_transaction(signature.unwrap(), config).await })
}
fn get_signatures_for_address(
@@ -3298,35 +3332,28 @@ pub mod rpc_full {
meta: Self::Metadata,
address: String,
config: Option<RpcSignaturesForAddressConfig>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>> {
let address = verify_pubkey(&address)?;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>> {
let config = config.unwrap_or_default();
let before = config
.before
.map(|ref before| verify_signature(before))
.transpose()?;
let until = config
.until
.map(|ref until| verify_signature(until))
.transpose()?;
let limit = config
.limit
.unwrap_or(MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT);
let commitment = config.commitment;
let verification = verify_and_parse_signatures_for_address_params(
address,
config.before,
config.until,
config.limit,
);
if limit == 0 || limit > MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT
)));
match verification {
Err(err) => Box::pin(future::err(err)),
Ok((address, before, until, limit)) => Box::pin(async move {
meta.get_signatures_for_address(address, before, until, limit, commitment)
.await
}),
}
meta.get_signatures_for_address(address, before, until, limit, config.commitment)
}
fn get_first_available_block(&self, meta: Self::Metadata) -> Result<Slot> {
fn get_first_available_block(&self, meta: Self::Metadata) -> BoxFuture<Result<Slot>> {
debug!("get_first_available_block rpc request received");
Ok(meta.get_first_available_block())
Box::pin(async move { Ok(meta.get_first_available_block().await) })
}
fn get_stake_activation(
@@ -3437,7 +3464,7 @@ pub mod rpc_full {
meta: Self::Metadata,
address_strs: Vec<String>,
config: Option<RpcEpochConfig>,
) -> Result<Vec<Option<RpcInflationReward>>> {
) -> BoxFuture<Result<Vec<Option<RpcInflationReward>>>> {
debug!(
"get_inflation_reward rpc request received: {:?}",
address_strs.len()
@@ -3445,10 +3472,15 @@ pub mod rpc_full {
let mut addresses: Vec<Pubkey> = vec![];
for address_str in address_strs {
addresses.push(verify_pubkey(&address_str)?);
match verify_pubkey(&address_str) {
Ok(pubkey) => {
addresses.push(pubkey);
}
Err(err) => return Box::pin(future::err(err)),
}
}
meta.get_inflation_reward(addresses, config)
Box::pin(async move { meta.get_inflation_reward(addresses, config).await })
}
fn get_token_account_balance(
@@ -3539,7 +3571,7 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>>;
) -> BoxFuture<Result<Option<UiConfirmedBlock>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedBlocks")]
@@ -3549,7 +3581,7 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
config: Option<RpcConfirmedBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedBlocksWithLimit")]
@@ -3559,7 +3591,7 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>>;
) -> BoxFuture<Result<Vec<Slot>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedTransaction")]
@@ -3568,7 +3600,7 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>>;
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>>;
// DEPRECATED
#[rpc(meta, name = "getConfirmedSignaturesForAddress2")]
@@ -3577,7 +3609,7 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
address: String,
config: Option<RpcGetConfirmedSignaturesForAddress2Config>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>>;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>>;
}
pub struct DeprecatedV1_7Impl;
@@ -3589,9 +3621,12 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
slot: Slot,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>> {
) -> BoxFuture<Result<Option<UiConfirmedBlock>>> {
debug!("get_confirmed_block rpc request received: {:?}", slot);
meta.get_block(slot, config.map(|config| config.convert()))
Box::pin(async move {
meta.get_block(slot, config.map(|config| config.convert()))
.await
})
}
fn get_confirmed_blocks(
@@ -3600,14 +3635,17 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
config: Option<RpcConfirmedBlocksConfigWrapper>,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
let (end_slot, maybe_commitment) =
config.map(|config| config.unzip()).unwrap_or_default();
debug!(
"get_confirmed_blocks rpc request received: {}-{:?}",
start_slot, end_slot
);
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
Box::pin(async move {
meta.get_blocks(start_slot, end_slot, commitment.or(maybe_commitment))
.await
})
}
fn get_confirmed_blocks_with_limit(
@@ -3616,12 +3654,15 @@ pub mod rpc_deprecated_v1_7 {
start_slot: Slot,
limit: usize,
commitment: Option<CommitmentConfig>,
) -> Result<Vec<Slot>> {
) -> BoxFuture<Result<Vec<Slot>>> {
debug!(
"get_confirmed_blocks_with_limit rpc request received: {}-{}",
start_slot, limit,
);
meta.get_blocks_with_limit(start_slot, limit, commitment)
Box::pin(async move {
meta.get_blocks_with_limit(start_slot, limit, commitment)
.await
})
}
fn get_confirmed_transaction(
@@ -3629,13 +3670,19 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
signature_str: String,
config: Option<RpcEncodingConfigWrapper<RpcConfirmedTransactionConfig>>,
) -> Result<Option<EncodedConfirmedTransaction>> {
) -> BoxFuture<Result<Option<EncodedConfirmedTransaction>>> {
debug!(
"get_confirmed_transaction rpc request received: {:?}",
signature_str
);
let signature = verify_signature(&signature_str)?;
meta.get_transaction(signature, config.map(|config| config.convert()))
let signature = verify_signature(&signature_str);
if let Err(err) = signature {
return Box::pin(future::err(err));
}
Box::pin(async move {
meta.get_transaction(signature.unwrap(), config.map(|config| config.convert()))
.await
})
}
fn get_confirmed_signatures_for_address2(
@@ -3643,30 +3690,23 @@ pub mod rpc_deprecated_v1_7 {
meta: Self::Metadata,
address: String,
config: Option<RpcGetConfirmedSignaturesForAddress2Config>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>> {
let address = verify_pubkey(&address)?;
) -> BoxFuture<Result<Vec<RpcConfirmedTransactionStatusWithSignature>>> {
let config = config.unwrap_or_default();
let before = config
.before
.map(|ref before| verify_signature(before))
.transpose()?;
let until = config
.until
.map(|ref until| verify_signature(until))
.transpose()?;
let limit = config
.limit
.unwrap_or(MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT);
let commitment = config.commitment;
let verification = verify_and_parse_signatures_for_address_params(
address,
config.before,
config.until,
config.limit,
);
if limit == 0 || limit > MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT
)));
match verification {
Err(err) => Box::pin(future::err(err)),
Ok((address, before, until, limit)) => Box::pin(async move {
meta.get_signatures_for_address(address, before, until, limit, commitment)
.await
}),
}
meta.get_signatures_for_address(address, before, until, limit, config.commitment)
}
}
}
@@ -4181,7 +4221,6 @@ pub mod tests {
RpcHealth::stub(),
cluster_info.clone(),
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
@@ -5740,7 +5779,6 @@ pub mod tests {
health.clone(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
@@ -6021,7 +6059,6 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
@@ -7447,7 +7484,6 @@ pub mod tests {
RpcHealth::stub(),
cluster_info,
Hash::default(),
Arc::new(tokio::runtime::Runtime::new().unwrap()),
None,
optimistically_confirmed_bank.clone(),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),

View File

@@ -39,7 +39,6 @@ use {
sync::{mpsc::channel, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
},
tokio::runtime,
tokio_util::codec::{BytesCodec, FramedRead},
};
@@ -118,10 +117,8 @@ impl RpcRequestMiddleware {
}
#[cfg(unix)]
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio_02::fs::File> {
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
use tokio_02::fs::os::unix::OpenOptionsExt;
tokio_02::fs::OpenOptions::new()
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
tokio::fs::OpenOptions::new()
.read(true)
.write(false)
.create(false)
@@ -131,10 +128,9 @@ impl RpcRequestMiddleware {
}
#[cfg(not(unix))]
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio_02::fs::File> {
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
// TODO: Is there any way to achieve the same on Windows?
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
tokio_02::fs::File::open(path).await
tokio::fs::File::open(path).await
}
fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
@@ -306,9 +302,17 @@ impl JsonRpcService {
)));
let tpu_address = cluster_info.my_contact_info().tpu;
// sadly, some parts of our current rpc implemention block the jsonrpc's
// _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU,
// causing no further processing of incoming requests and ultimatily innocent clients timing-out.
// So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1,
// so that we avoid the single-threaded event loops from being created automatically by
// jsonrpc for threads when .threads(N > 1) is given.
let runtime = Arc::new(
runtime::Builder::new_multi_thread()
.thread_name("rpc-runtime")
tokio::runtime::Builder::new_multi_thread()
.worker_threads(rpc_threads)
.thread_name("sol-rpc-el")
.enable_all()
.build()
.expect("Runtime"),
@@ -364,7 +368,6 @@ impl JsonRpcService {
health.clone(),
cluster_info.clone(),
genesis_hash,
runtime,
bigtable_ledger_storage,
optimistically_confirmed_bank,
largest_accounts_cache,
@@ -389,23 +392,6 @@ impl JsonRpcService {
let ledger_path = ledger_path.to_path_buf();
// sadly, some parts of our current rpc implemention block the jsonrpc's
// _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU,
// causing no further processing of incoming requests and ultimatily innocent clients timing-out.
// So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1,
// so that we avoid the single-threaded event loops from being created automatically by
// jsonrpc for threads when .threads(N > 1) is given.
let event_loop = {
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
tokio_02::runtime::Builder::new()
.core_threads(rpc_threads)
.threaded_scheduler()
.enable_all()
.thread_name("sol-rpc-el")
.build()
.unwrap()
};
let (close_handle_sender, close_handle_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-jsonrpc".to_string())
@@ -431,7 +417,7 @@ impl JsonRpcService {
io,
move |_req: &hyper::Request<hyper::Body>| request_processor.clone(),
)
.event_loop_executor(event_loop.handle().clone())
.event_loop_executor(runtime.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
@@ -510,6 +496,7 @@ mod tests {
io::Write,
net::{IpAddr, Ipv4Addr},
},
tokio::runtime::Runtime,
};
#[test]
@@ -648,7 +635,7 @@ mod tests {
#[test]
fn test_process_file_get() {
let mut runtime = tokio_02::runtime::Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let ledger_path = get_tmp_ledger_path!();
std::fs::create_dir(&ledger_path).unwrap();