Send BanksClient RPC requests before returning futures (#13539)

* Send RPC requests before returning futures

* Add process_transactions()
This commit is contained in:
Greg Fitzgerald
2020-11-11 18:56:26 -07:00
committed by GitHub
parent eb306da148
commit dadea873a9
3 changed files with 87 additions and 65 deletions

1
Cargo.lock generated
View File

@ -3632,6 +3632,7 @@ dependencies = [
"serde", "serde",
"solana-sdk", "solana-sdk",
"tarpc", "tarpc",
"tokio 0.3.2",
] ]
[[package]] [[package]]

View File

@ -5,7 +5,7 @@
//! but they are undocumented, may change over time, and are generally more //! but they are undocumented, may change over time, and are generally more
//! cumbersome to use. //! cumbersome to use.
use futures::future::join_all; use futures::{future::join_all, Future, FutureExt};
pub use solana_banks_interface::{BanksClient as TarpcClient, TransactionStatus}; pub use solana_banks_interface::{BanksClient as TarpcClient, TransactionStatus};
use solana_banks_interface::{BanksRequest, BanksResponse}; use solana_banks_interface::{BanksRequest, BanksResponse};
use solana_sdk::{ use solana_sdk::{
@ -52,168 +52,187 @@ impl BanksClient {
TarpcClient::new(config, transport) TarpcClient::new(config, transport)
} }
pub async fn send_transaction_with_context( pub fn send_transaction_with_context(
&mut self, &mut self,
ctx: Context, ctx: Context,
transaction: Transaction, transaction: Transaction,
) -> io::Result<()> { ) -> impl Future<Output = io::Result<()>> + '_ {
self.inner self.inner.send_transaction_with_context(ctx, transaction)
.send_transaction_with_context(ctx, transaction)
.await
} }
pub async fn get_fees_with_commitment_and_context( pub fn get_fees_with_commitment_and_context(
&mut self, &mut self,
ctx: Context, ctx: Context,
commitment: CommitmentLevel, commitment: CommitmentLevel,
) -> io::Result<(FeeCalculator, Hash, Slot)> { ) -> impl Future<Output = io::Result<(FeeCalculator, Hash, Slot)>> + '_ {
self.inner self.inner
.get_fees_with_commitment_and_context(ctx, commitment) .get_fees_with_commitment_and_context(ctx, commitment)
.await
} }
pub async fn get_transaction_status_with_context( pub fn get_transaction_status_with_context(
&mut self, &mut self,
ctx: Context, ctx: Context,
signature: Signature, signature: Signature,
) -> io::Result<Option<TransactionStatus>> { ) -> impl Future<Output = io::Result<Option<TransactionStatus>>> + '_ {
self.inner self.inner
.get_transaction_status_with_context(ctx, signature) .get_transaction_status_with_context(ctx, signature)
.await
} }
pub async fn get_slot_with_context( pub fn get_slot_with_context(
&mut self, &mut self,
ctx: Context, ctx: Context,
commitment: CommitmentLevel, commitment: CommitmentLevel,
) -> io::Result<Slot> { ) -> impl Future<Output = io::Result<Slot>> + '_ {
self.inner.get_slot_with_context(ctx, commitment).await self.inner.get_slot_with_context(ctx, commitment)
} }
pub async fn process_transaction_with_commitment_and_context( pub fn process_transaction_with_commitment_and_context(
&mut self, &mut self,
ctx: Context, ctx: Context,
transaction: Transaction, transaction: Transaction,
commitment: CommitmentLevel, commitment: CommitmentLevel,
) -> io::Result<Option<transaction::Result<()>>> { ) -> impl Future<Output = io::Result<Option<transaction::Result<()>>>> + '_ {
self.inner self.inner
.process_transaction_with_commitment_and_context(ctx, transaction, commitment) .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
.await
} }
pub async fn get_account_with_commitment_and_context( pub fn get_account_with_commitment_and_context(
&mut self, &mut self,
ctx: Context, ctx: Context,
address: Pubkey, address: Pubkey,
commitment: CommitmentLevel, commitment: CommitmentLevel,
) -> io::Result<Option<Account>> { ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
self.inner self.inner
.get_account_with_commitment_and_context(ctx, address, commitment) .get_account_with_commitment_and_context(ctx, address, commitment)
.await
} }
/// Send a transaction and return immediately. The server will resend the /// Send a transaction and return immediately. The server will resend the
/// transaction until either it is accepted by the cluster or the transaction's /// transaction until either it is accepted by the cluster or the transaction's
/// blockhash expires. /// blockhash expires.
pub async fn send_transaction(&mut self, transaction: Transaction) -> io::Result<()> { pub fn send_transaction(
&mut self,
transaction: Transaction,
) -> impl Future<Output = io::Result<()>> + '_ {
self.send_transaction_with_context(context::current(), transaction) self.send_transaction_with_context(context::current(), transaction)
.await
} }
/// Return the fee parameters associated with a recent, rooted blockhash. The cluster /// Return the fee parameters associated with a recent, rooted blockhash. The cluster
/// will use the transaction's blockhash to look up these same fee parameters and /// will use the transaction's blockhash to look up these same fee parameters and
/// use them to calculate the transaction fee. /// use them to calculate the transaction fee.
pub async fn get_fees(&mut self) -> io::Result<(FeeCalculator, Hash, Slot)> { pub fn get_fees(
&mut self,
) -> impl Future<Output = io::Result<(FeeCalculator, Hash, Slot)>> + '_ {
self.get_fees_with_commitment_and_context(context::current(), CommitmentLevel::Root) self.get_fees_with_commitment_and_context(context::current(), CommitmentLevel::Root)
.await
} }
/// Return the cluster rent /// Return the cluster rent
pub async fn get_rent(&mut self) -> io::Result<Rent> { pub fn get_rent(&mut self) -> impl Future<Output = io::Result<Rent>> + '_ {
let rent_sysvar = self self.get_account(sysvar::rent::id()).map(|result| {
.get_account(sysvar::rent::id()) let rent_sysvar = result?
.await? .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Rent sysvar not present"))?;
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Rent sysvar not present"))?; from_account::<Rent>(&rent_sysvar).ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "Failed to deserialize Rent sysvar")
from_account::<Rent>(&rent_sysvar).ok_or_else(|| { })
io::Error::new(io::ErrorKind::Other, "Failed to deserialize Rent sysvar")
}) })
} }
/// Return a recent, rooted blockhash from the server. The cluster will only accept /// Return a recent, rooted blockhash from the server. The cluster will only accept
/// transactions with a blockhash that has not yet expired. Use the `get_fees` /// transactions with a blockhash that has not yet expired. Use the `get_fees`
/// method to get both a blockhash and the blockhash's last valid slot. /// method to get both a blockhash and the blockhash's last valid slot.
pub async fn get_recent_blockhash(&mut self) -> io::Result<Hash> { pub fn get_recent_blockhash(&mut self) -> impl Future<Output = io::Result<Hash>> + '_ {
Ok(self.get_fees().await?.1) self.get_fees().map(|result| Ok(result?.1))
} }
/// Send a transaction and return after the transaction has been rejected or /// Send a transaction and return after the transaction has been rejected or
/// reached the given level of commitment. /// reached the given level of commitment.
pub async fn process_transaction_with_commitment( pub fn process_transaction_with_commitment(
&mut self, &mut self,
transaction: Transaction, transaction: Transaction,
commitment: CommitmentLevel, commitment: CommitmentLevel,
) -> transport::Result<()> { ) -> impl Future<Output = transport::Result<()>> + '_ {
let mut ctx = context::current(); let mut ctx = context::current();
ctx.deadline += Duration::from_secs(50); ctx.deadline += Duration::from_secs(50);
let result = self self.process_transaction_with_commitment_and_context(ctx, transaction, commitment)
.process_transaction_with_commitment_and_context(ctx, transaction, commitment) .map(|result| match result? {
.await?; None => {
match result { Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into())
None => Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into()), }
Some(transaction_result) => Ok(transaction_result?), Some(transaction_result) => Ok(transaction_result?),
} })
} }
/// Send a transaction and return after the transaction has been finalized or rejected. /// Send a transaction and return until the transaction has been finalized or rejected.
pub async fn process_transaction(&mut self, transaction: Transaction) -> transport::Result<()> { pub fn process_transaction(
&mut self,
transaction: Transaction,
) -> impl Future<Output = transport::Result<()>> + '_ {
self.process_transaction_with_commitment(transaction, CommitmentLevel::default()) self.process_transaction_with_commitment(transaction, CommitmentLevel::default())
.await }
pub async fn process_transactions_with_commitment(
&mut self,
transactions: Vec<Transaction>,
commitment: CommitmentLevel,
) -> transport::Result<()> {
let mut clients: Vec<_> = transactions.iter().map(|_| self.clone()).collect();
let futures = clients
.iter_mut()
.zip(transactions)
.map(|(client, transaction)| {
client.process_transaction_with_commitment(transaction, commitment)
});
let statuses = join_all(futures).await;
statuses.into_iter().collect() // Convert Vec<Result<_, _>> to Result<Vec<_>>
}
/// Send transactions and return until the transaction has been finalized or rejected.
pub fn process_transactions(
&mut self,
transactions: Vec<Transaction>,
) -> impl Future<Output = transport::Result<()>> + '_ {
self.process_transactions_with_commitment(transactions, CommitmentLevel::default())
} }
/// Return the most recent rooted slot height. All transactions at or below this height /// Return the most recent rooted slot height. All transactions at or below this height
/// are said to be finalized. The cluster will not fork to a higher slot height. /// are said to be finalized. The cluster will not fork to a higher slot height.
pub async fn get_root_slot(&mut self) -> io::Result<Slot> { pub fn get_root_slot(&mut self) -> impl Future<Output = io::Result<Slot>> + '_ {
self.get_slot_with_context(context::current(), CommitmentLevel::Root) self.get_slot_with_context(context::current(), CommitmentLevel::Root)
.await
} }
/// Return the account at the given address at the slot corresponding to the given /// Return the account at the given address at the slot corresponding to the given
/// commitment level. If the account is not found, None is returned. /// commitment level. If the account is not found, None is returned.
pub async fn get_account_with_commitment( pub fn get_account_with_commitment(
&mut self, &mut self,
address: Pubkey, address: Pubkey,
commitment: CommitmentLevel, commitment: CommitmentLevel,
) -> io::Result<Option<Account>> { ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
self.get_account_with_commitment_and_context(context::current(), address, commitment) self.get_account_with_commitment_and_context(context::current(), address, commitment)
.await
} }
/// Return the account at the given address at the time of the most recent root slot. /// Return the account at the given address at the time of the most recent root slot.
/// If the account is not found, None is returned. /// If the account is not found, None is returned.
pub async fn get_account(&mut self, address: Pubkey) -> io::Result<Option<Account>> { pub fn get_account(
&mut self,
address: Pubkey,
) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
self.get_account_with_commitment(address, CommitmentLevel::default()) self.get_account_with_commitment(address, CommitmentLevel::default())
.await
} }
/// Return the balance in lamports of an account at the given address at the slot /// Return the balance in lamports of an account at the given address at the slot
/// corresponding to the given commitment level. /// corresponding to the given commitment level.
pub async fn get_balance_with_commitment( pub fn get_balance_with_commitment(
&mut self, &mut self,
address: Pubkey, address: Pubkey,
commitment: CommitmentLevel, commitment: CommitmentLevel,
) -> io::Result<u64> { ) -> impl Future<Output = io::Result<u64>> + '_ {
let account = self self.get_account_with_commitment_and_context(context::current(), address, commitment)
.get_account_with_commitment_and_context(context::current(), address, commitment) .map(|result| Ok(result?.map(|x| x.lamports).unwrap_or(0)))
.await?;
Ok(account.map(|x| x.lamports).unwrap_or(0))
} }
/// Return the balance in lamports of an account at the given address at the time /// Return the balance in lamports of an account at the given address at the time
/// of the most recent root slot. /// of the most recent root slot.
pub async fn get_balance(&mut self, address: Pubkey) -> io::Result<u64> { pub fn get_balance(&mut self, address: Pubkey) -> impl Future<Output = io::Result<u64>> + '_ {
self.get_balance_with_commitment(address, CommitmentLevel::default()) self.get_balance_with_commitment(address, CommitmentLevel::default())
.await
} }
/// Return the status of a transaction with a signature matching the transaction's first /// Return the status of a transaction with a signature matching the transaction's first
@ -221,12 +240,11 @@ impl BanksClient {
/// blockhash was expired or the fee-paying account had insufficient funds to pay the /// blockhash was expired or the fee-paying account had insufficient funds to pay the
/// transaction fee. Note that servers rarely store the full transaction history. This /// transaction fee. Note that servers rarely store the full transaction history. This
/// method may return None if the transaction status has been discarded. /// method may return None if the transaction status has been discarded.
pub async fn get_transaction_status( pub fn get_transaction_status(
&mut self, &mut self,
signature: Signature, signature: Signature,
) -> io::Result<Option<TransactionStatus>> { ) -> impl Future<Output = io::Result<Option<TransactionStatus>>> + '_ {
self.get_transaction_status_with_context(context::current(), signature) self.get_transaction_status_with_context(context::current(), signature)
.await
} }
/// Same as get_transaction_status, but for multiple transactions. /// Same as get_transaction_status, but for multiple transactions.

View File

@ -13,6 +13,9 @@ serde = { version = "1.0.112", features = ["derive"] }
solana-sdk = { path = "../sdk", version = "1.5.0" } solana-sdk = { path = "../sdk", version = "1.5.0" }
tarpc = { version = "0.23.0", features = ["full"] } tarpc = { version = "0.23.0", features = ["full"] }
[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
[lib] [lib]
crate-type = ["lib"] crate-type = ["lib"]
name = "solana_banks_interface" name = "solana_banks_interface"