Update TpuConnection interface to be compatible with versioned txs (#23760) (#23913)

* Update TpuConnection interface to be compatible with versioned txs

* Add convenience method for sending txs

* use parallel iterator to serialize transactions

(cherry picked from commit 016d3c450a)

Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
mergify[bot]
2022-03-24 22:09:34 +00:00
committed by GitHub
parent d135e3b839
commit 8222f3a675
4 changed files with 52 additions and 35 deletions

View File

@ -7,10 +7,8 @@ use {
futures::future::join_all, futures::future::join_all,
itertools::Itertools, itertools::Itertools,
quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError}, quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError},
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_sdk::{ solana_sdk::{
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET}, quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET},
transaction::Transaction,
transport::Result as TransportResult, transport::Result as TransportResult,
}, },
std::{ std::{
@ -65,21 +63,19 @@ impl TpuConnection for QuicTpuConnection {
&self.client.addr &self.client.addr
} }
fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()> { fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> {
let _guard = self.client.runtime.enter(); let _guard = self.client.runtime.enter();
let send_buffer = self.client.send_buffer(data); let send_buffer = self.client.send_buffer(wire_transaction);
self.client.runtime.block_on(send_buffer)?; self.client.runtime.block_on(send_buffer)?;
Ok(()) Ok(())
} }
fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { fn send_wire_transaction_batch(
let buffers = transactions &self,
.into_par_iter() wire_transaction_batch: &[Vec<u8>],
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) ) -> TransportResult<()> {
.collect::<Vec<_>>();
let _guard = self.client.runtime.enter(); let _guard = self.client.runtime.enter();
let send_batch = self.client.send_batch(&buffers); let send_batch = self.client.send_batch(wire_transaction_batch);
self.client.runtime.block_on(send_batch)?; self.client.runtime.block_on(send_batch)?;
Ok(()) Ok(())
} }

View File

@ -24,7 +24,7 @@ use {
signers::Signers, signers::Signers,
system_instruction, system_instruction,
timing::duration_as_ms, timing::duration_as_ms,
transaction::{self, Transaction}, transaction::{self, Transaction, VersionedTransaction},
transport::Result as TransportResult, transport::Result as TransportResult,
}, },
std::{ std::{
@ -215,10 +215,13 @@ impl<C: 'static + TpuConnection> ThinClient<C> {
let mut num_confirmed = 0; let mut num_confirmed = 0;
let mut wait_time = MAX_PROCESSING_AGE; let mut wait_time = MAX_PROCESSING_AGE;
// resend the same transaction until the transaction has no chance of succeeding // resend the same transaction until the transaction has no chance of succeeding
let wire_transaction =
bincode::serialize(&transaction).expect("transaction serialization failed");
while now.elapsed().as_secs() < wait_time as u64 { while now.elapsed().as_secs() < wait_time as u64 {
if num_confirmed == 0 { if num_confirmed == 0 {
// Send the transaction if there has been no confirmation (e.g. the first time) // Send the transaction if there has been no confirmation (e.g. the first time)
self.tpu_connection().send_transaction(transaction)?; self.tpu_connection()
.send_wire_transaction(&wire_transaction)?;
} }
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
@ -601,12 +604,17 @@ impl<C: 'static + TpuConnection> SyncClient for ThinClient<C> {
impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> { impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> { fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> {
self.tpu_connection().send_transaction(&transaction)?; let transaction = VersionedTransaction::from(transaction);
self.tpu_connection()
.serialize_and_send_transaction(&transaction)?;
Ok(transaction.signatures[0]) Ok(transaction.signatures[0])
} }
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> { fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
self.tpu_connection().send_batch(&transactions) let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect();
self.tpu_connection()
.par_serialize_and_send_transaction_batch(&batch)?;
Ok(())
} }
fn async_send_message<T: Signers>( fn async_send_message<T: Signers>(

View File

@ -1,5 +1,6 @@
use { use {
solana_sdk::{transaction::Transaction, transport::Result as TransportResult}, rayon::iter::{IntoParallelRefIterator, ParallelIterator},
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::net::{SocketAddr, UdpSocket}, std::net::{SocketAddr, UdpSocket},
}; };
@ -10,12 +11,35 @@ pub trait TpuConnection {
fn tpu_addr(&self) -> &SocketAddr; fn tpu_addr(&self) -> &SocketAddr;
fn send_transaction(&self, tx: &Transaction) -> TransportResult<()> { fn serialize_and_send_transaction(
let data = bincode::serialize(tx).expect("serialize Transaction in send_transaction"); &self,
self.send_wire_transaction(&data) transaction: &VersionedTransaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(&wire_transaction)
} }
fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()>; fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()>;
fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()>; fn par_serialize_and_send_transaction_batch(
&self,
transaction_batch: &[VersionedTransaction],
) -> TransportResult<()> {
let wire_transaction_batch: Vec<_> = transaction_batch
.par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect();
self.send_wire_transaction_batch(&wire_transaction_batch)
}
fn send_wire_transaction_batch(
&self,
wire_transaction_batch: &[Vec<u8>],
) -> TransportResult<()> {
for wire_transaction in wire_transaction_batch {
self.send_wire_transaction(wire_transaction)?;
}
Ok(())
}
} }

View File

@ -3,7 +3,7 @@
use { use {
crate::tpu_connection::TpuConnection, crate::tpu_connection::TpuConnection,
solana_sdk::{transaction::Transaction, transport::Result as TransportResult}, solana_sdk::transport::Result as TransportResult,
std::net::{SocketAddr, UdpSocket}, std::net::{SocketAddr, UdpSocket},
}; };
@ -24,19 +24,8 @@ impl TpuConnection for UdpTpuConnection {
&self.addr &self.addr
} }
fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()> { fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> {
self.socket.send_to(data, self.addr)?; self.socket.send_to(wire_transaction, self.addr)?;
Ok(())
}
fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
transactions
.iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.try_for_each(|buff| -> TransportResult<()> {
self.socket.send_to(&buff, self.addr)?;
Ok(())
})?;
Ok(()) Ok(())
} }
} }