Process async BankClient transactions in batches (#3738)
* Process async transactions in batches This aims to process transactions at least as fast as LocalCluster * Add benchmark
This commit is contained in:
@ -1,5 +1,5 @@
|
||||
use crate::bank::Bank;
|
||||
use solana_sdk::client::{AsyncClient, SyncClient};
|
||||
use solana_sdk::client::{AsyncClient, Client, SyncClient};
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::instruction::Instruction;
|
||||
use solana_sdk::message::Message;
|
||||
@ -10,17 +10,22 @@ use solana_sdk::system_instruction;
|
||||
use solana_sdk::transaction::{self, Transaction};
|
||||
use solana_sdk::transport::Result;
|
||||
use std::io;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::Arc;
|
||||
use std::thread::Builder;
|
||||
|
||||
pub struct BankClient {
|
||||
bank: Bank,
|
||||
bank: Arc<Bank>,
|
||||
transaction_sender: Sender<Transaction>,
|
||||
}
|
||||
|
||||
impl Client for BankClient {}
|
||||
|
||||
impl AsyncClient for BankClient {
|
||||
fn async_send_transaction(&self, transaction: Transaction) -> io::Result<Signature> {
|
||||
// Ignore the result. Client must use get_signature_status() instead.
|
||||
let _ = self.bank.process_transaction(&transaction);
|
||||
|
||||
Ok(transaction.signatures.get(0).cloned().unwrap_or_default())
|
||||
let signature = transaction.signatures.get(0).cloned().unwrap_or_default();
|
||||
self.transaction_sender.send(transaction).unwrap();
|
||||
Ok(signature)
|
||||
}
|
||||
|
||||
fn async_send_message(
|
||||
@ -104,8 +109,29 @@ impl SyncClient for BankClient {
|
||||
}
|
||||
|
||||
impl BankClient {
|
||||
fn run(bank: &Bank, transaction_receiver: Receiver<Transaction>) {
|
||||
while let Ok(tx) = transaction_receiver.recv() {
|
||||
let mut transactions = vec![tx];
|
||||
while let Ok(tx) = transaction_receiver.try_recv() {
|
||||
transactions.push(tx);
|
||||
}
|
||||
let _ = bank.process_transactions(&transactions);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(bank: Bank) -> Self {
|
||||
Self { bank }
|
||||
let bank = Arc::new(bank);
|
||||
let (transaction_sender, transaction_receiver) = channel();
|
||||
let thread_bank = bank.clone();
|
||||
let bank = bank.clone();
|
||||
Builder::new()
|
||||
.name("solana-bank-client".to_string())
|
||||
.spawn(move || Self::run(&thread_bank, transaction_receiver))
|
||||
.unwrap();
|
||||
Self {
|
||||
bank,
|
||||
transaction_sender,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user