Revert "Update TpuConnection interface to be compatible with versioned txs (#23760)"
This reverts commit 016d3c450a
.
This commit is contained in:
committed by
GitHub
parent
016d3c450a
commit
88b6797049
@ -7,8 +7,10 @@ use {
|
|||||||
futures::future::join_all,
|
futures::future::join_all,
|
||||||
itertools::Itertools,
|
itertools::Itertools,
|
||||||
quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError},
|
quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError},
|
||||||
|
rayon::iter::{IntoParallelIterator, ParallelIterator},
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET},
|
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET},
|
||||||
|
transaction::Transaction,
|
||||||
transport::Result as TransportResult,
|
transport::Result as TransportResult,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
@ -63,19 +65,21 @@ impl TpuConnection for QuicTpuConnection {
|
|||||||
&self.client.addr
|
&self.client.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> {
|
fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()> {
|
||||||
let _guard = self.client.runtime.enter();
|
let _guard = self.client.runtime.enter();
|
||||||
let send_buffer = self.client.send_buffer(wire_transaction);
|
let send_buffer = self.client.send_buffer(data);
|
||||||
self.client.runtime.block_on(send_buffer)?;
|
self.client.runtime.block_on(send_buffer)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_wire_transaction_batch(
|
fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
|
||||||
&self,
|
let buffers = transactions
|
||||||
wire_transaction_batch: &[Vec<u8>],
|
.into_par_iter()
|
||||||
) -> TransportResult<()> {
|
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let _guard = self.client.runtime.enter();
|
let _guard = self.client.runtime.enter();
|
||||||
let send_batch = self.client.send_batch(wire_transaction_batch);
|
let send_batch = self.client.send_batch(&buffers);
|
||||||
self.client.runtime.block_on(send_batch)?;
|
self.client.runtime.block_on(send_batch)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ use {
|
|||||||
signers::Signers,
|
signers::Signers,
|
||||||
system_instruction,
|
system_instruction,
|
||||||
timing::duration_as_ms,
|
timing::duration_as_ms,
|
||||||
transaction::{self, Transaction, VersionedTransaction},
|
transaction::{self, Transaction},
|
||||||
transport::Result as TransportResult,
|
transport::Result as TransportResult,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
@ -215,13 +215,10 @@ impl<C: 'static + TpuConnection> ThinClient<C> {
|
|||||||
let mut num_confirmed = 0;
|
let mut num_confirmed = 0;
|
||||||
let mut wait_time = MAX_PROCESSING_AGE;
|
let mut wait_time = MAX_PROCESSING_AGE;
|
||||||
// resend the same transaction until the transaction has no chance of succeeding
|
// resend the same transaction until the transaction has no chance of succeeding
|
||||||
let wire_transaction =
|
|
||||||
bincode::serialize(&transaction).expect("transaction serialization failed");
|
|
||||||
while now.elapsed().as_secs() < wait_time as u64 {
|
while now.elapsed().as_secs() < wait_time as u64 {
|
||||||
if num_confirmed == 0 {
|
if num_confirmed == 0 {
|
||||||
// Send the transaction if there has been no confirmation (e.g. the first time)
|
// Send the transaction if there has been no confirmation (e.g. the first time)
|
||||||
self.tpu_connection()
|
self.tpu_connection().send_transaction(transaction)?;
|
||||||
.send_wire_transaction(&wire_transaction)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
|
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
|
||||||
@ -604,17 +601,12 @@ impl<C: 'static + TpuConnection> SyncClient for ThinClient<C> {
|
|||||||
|
|
||||||
impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
|
impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
|
||||||
fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> {
|
fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> {
|
||||||
let transaction = VersionedTransaction::from(transaction);
|
self.tpu_connection().send_transaction(&transaction)?;
|
||||||
self.tpu_connection()
|
|
||||||
.serialize_and_send_transaction(&transaction)?;
|
|
||||||
Ok(transaction.signatures[0])
|
Ok(transaction.signatures[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
|
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
|
||||||
let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect();
|
self.tpu_connection().send_batch(&transactions)
|
||||||
self.tpu_connection()
|
|
||||||
.par_serialize_and_send_transaction_batch(&batch)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn async_send_message<T: Signers>(
|
fn async_send_message<T: Signers>(
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
use {
|
use {
|
||||||
rayon::iter::{IntoParallelRefIterator, ParallelIterator},
|
solana_sdk::{transaction::Transaction, transport::Result as TransportResult},
|
||||||
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
|
|
||||||
std::net::{SocketAddr, UdpSocket},
|
std::net::{SocketAddr, UdpSocket},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -11,35 +10,12 @@ pub trait TpuConnection {
|
|||||||
|
|
||||||
fn tpu_addr(&self) -> &SocketAddr;
|
fn tpu_addr(&self) -> &SocketAddr;
|
||||||
|
|
||||||
fn serialize_and_send_transaction(
|
fn send_transaction(&self, tx: &Transaction) -> TransportResult<()> {
|
||||||
&self,
|
let data = bincode::serialize(tx).expect("serialize Transaction in send_transaction");
|
||||||
transaction: &VersionedTransaction,
|
self.send_wire_transaction(&data)
|
||||||
) -> TransportResult<()> {
|
|
||||||
let wire_transaction =
|
|
||||||
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
|
|
||||||
self.send_wire_transaction(&wire_transaction)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()>;
|
fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()>;
|
||||||
|
|
||||||
fn par_serialize_and_send_transaction_batch(
|
fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()>;
|
||||||
&self,
|
|
||||||
transaction_batch: &[VersionedTransaction],
|
|
||||||
) -> TransportResult<()> {
|
|
||||||
let wire_transaction_batch: Vec<_> = transaction_batch
|
|
||||||
.par_iter()
|
|
||||||
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
|
|
||||||
.collect();
|
|
||||||
self.send_wire_transaction_batch(&wire_transaction_batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_wire_transaction_batch(
|
|
||||||
&self,
|
|
||||||
wire_transaction_batch: &[Vec<u8>],
|
|
||||||
) -> TransportResult<()> {
|
|
||||||
for wire_transaction in wire_transaction_batch {
|
|
||||||
self.send_wire_transaction(wire_transaction)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
use {
|
use {
|
||||||
crate::tpu_connection::TpuConnection,
|
crate::tpu_connection::TpuConnection,
|
||||||
solana_sdk::transport::Result as TransportResult,
|
solana_sdk::{transaction::Transaction, transport::Result as TransportResult},
|
||||||
std::net::{SocketAddr, UdpSocket},
|
std::net::{SocketAddr, UdpSocket},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -24,8 +24,19 @@ impl TpuConnection for UdpTpuConnection {
|
|||||||
&self.addr
|
&self.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> {
|
fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()> {
|
||||||
self.socket.send_to(wire_transaction, self.addr)?;
|
self.socket.send_to(data, self.addr)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
|
||||||
|
transactions
|
||||||
|
.iter()
|
||||||
|
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
|
||||||
|
.try_for_each(|buff| -> TransportResult<()> {
|
||||||
|
self.socket.send_to(&buff, self.addr)?;
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user