Add a client for BankForks (#10728)

Also:
* Use BanksClient in solana-tokens
This commit is contained in:
Greg Fitzgerald
2020-08-07 08:45:17 -06:00
committed by GitHub
parent 4f2f9bd26f
commit bad486823c
29 changed files with 1239 additions and 342 deletions

26
banks-server/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "solana-banks-server"
version = "1.4.0"
description = "Solana banks server"
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
edition = "2018"
[dependencies]
bincode = "1.3.1"
futures = "0.3"
solana-banks-interface = { path = "../banks-interface", version = "1.4.0" }
solana-runtime = { path = "../runtime", version = "1.4.0" }
solana-sdk = { path = "../sdk", version = "1.4.0" }
tarpc = { version = "0.21.0", features = ["full"] }
tokio = "0.2"
tokio-serde = { version = "0.6", features = ["bincode"] }
[lib]
crate-type = ["lib"]
name = "solana_banks_server"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@@ -0,0 +1,272 @@
use bincode::{deserialize, serialize};
use futures::{
future,
prelude::stream::{self, StreamExt},
};
use solana_banks_interface::{Banks, BanksRequest, BanksResponse, TransactionStatus};
use solana_runtime::{
bank::Bank,
bank_forks::BankForks,
commitment::{BlockCommitmentCache, CommitmentSlots},
send_transaction_service::{SendTransactionService, TransactionInfo},
};
use solana_sdk::{
account::Account,
clock::Slot,
commitment_config::CommitmentLevel,
fee_calculator::FeeCalculator,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
transaction::{self, Transaction},
};
use std::{
collections::HashMap,
io,
net::SocketAddr,
sync::{
atomic::AtomicBool,
mpsc::{channel, Receiver, Sender},
Arc, RwLock,
},
thread::Builder,
time::Duration,
};
use tarpc::{
context::Context,
rpc::{transport::channel::UnboundedChannel, ClientMessage, Response},
serde_transport::tcp,
server::{self, Channel, Handler},
transport,
};
use tokio::time::delay_for;
use tokio_serde::formats::Bincode;
#[derive(Clone)]
struct BanksServer {
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
transaction_sender: Sender<TransactionInfo>,
}
impl BanksServer {
/// Return a BanksServer that forwards transactions to the
/// given sender. If unit-testing, those transactions can go to
/// a bank in the given BankForks. Otherwise, the receiver should
/// forward them to a validator in the leader schedule.
fn new(
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
transaction_sender: Sender<TransactionInfo>,
) -> Self {
Self {
bank_forks,
block_commitment_cache,
transaction_sender,
}
}
fn run(bank: &Bank, transaction_receiver: Receiver<TransactionInfo>) {
while let Ok(info) = transaction_receiver.recv() {
let mut transaction_infos = vec![info];
while let Ok(info) = transaction_receiver.try_recv() {
transaction_infos.push(info);
}
let transactions: Vec<_> = transaction_infos
.into_iter()
.map(|info| deserialize(&info.wire_transaction).unwrap())
.collect();
let _ = bank.process_transactions(&transactions);
}
}
/// Useful for unit-testing
fn new_loopback(bank_forks: Arc<RwLock<BankForks>>) -> Self {
let (transaction_sender, transaction_receiver) = channel();
let bank = bank_forks.read().unwrap().working_bank();
let slot = bank.slot();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new(
HashMap::default(),
0,
CommitmentSlots {
slot,
root: 0,
highest_confirmed_slot: 0,
highest_confirmed_root: 0,
},
)));
Builder::new()
.name("solana-bank-forks-client".to_string())
.spawn(move || Self::run(&bank, transaction_receiver))
.unwrap();
Self::new(bank_forks, block_commitment_cache, transaction_sender)
}
fn slot(&self, commitment: CommitmentLevel) -> Slot {
self.block_commitment_cache
.read()
.unwrap()
.slot_with_commitment(commitment)
}
fn bank(&self, commitment: CommitmentLevel) -> Arc<Bank> {
self.bank_forks.read().unwrap()[self.slot(commitment)].clone()
}
async fn poll_signature_status(
self,
signature: Signature,
last_valid_slot: Slot,
commitment: CommitmentLevel,
) -> Option<transaction::Result<()>> {
let mut status = self.bank(commitment).get_signature_status(&signature);
while status.is_none() {
delay_for(Duration::from_millis(200)).await;
let bank = self.bank(commitment);
if bank.slot() > last_valid_slot {
break;
}
status = bank.get_signature_status(&signature);
}
status
}
}
#[tarpc::server]
impl Banks for BanksServer {
async fn send_transaction_with_context(self, _: Context, transaction: Transaction) {
let blockhash = &transaction.message.recent_blockhash;
let last_valid_slot = self
.bank_forks
.read()
.unwrap()
.root_bank()
.get_blockhash_last_valid_slot(&blockhash)
.unwrap();
let signature = transaction.signatures.get(0).cloned().unwrap_or_default();
let info =
TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot);
self.transaction_sender.send(info).unwrap();
}
async fn get_fees_with_commitment_and_context(
self,
_: Context,
commitment: CommitmentLevel,
) -> (FeeCalculator, Hash, Slot) {
let bank = self.bank(commitment);
let (blockhash, fee_calculator) = bank.last_blockhash_with_fee_calculator();
let last_valid_slot = bank.get_blockhash_last_valid_slot(&blockhash).unwrap();
(fee_calculator, blockhash, last_valid_slot)
}
async fn get_transaction_status_with_context(
self,
_: Context,
signature: Signature,
) -> Option<TransactionStatus> {
let bank = self.bank(CommitmentLevel::Recent);
let (slot, status) = bank.get_signature_status_slot(&signature)?;
let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
let confirmations = if r_block_commitment_cache.root() >= slot {
None
} else {
r_block_commitment_cache
.get_confirmation_count(slot)
.or(Some(0))
};
Some(TransactionStatus {
slot,
confirmations,
err: status.err(),
})
}
async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot {
self.slot(commitment)
}
async fn process_transaction_with_commitment_and_context(
self,
_: Context,
transaction: Transaction,
commitment: CommitmentLevel,
) -> Option<transaction::Result<()>> {
let blockhash = &transaction.message.recent_blockhash;
let last_valid_slot = self
.bank_forks
.read()
.unwrap()
.root_bank()
.get_blockhash_last_valid_slot(&blockhash)
.unwrap();
let signature = transaction.signatures.get(0).cloned().unwrap_or_default();
let info =
TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot);
self.transaction_sender.send(info).unwrap();
self.poll_signature_status(signature, last_valid_slot, commitment)
.await
}
async fn get_account_with_commitment_and_context(
self,
_: Context,
address: Pubkey,
commitment: CommitmentLevel,
) -> Option<Account> {
let bank = self.bank(commitment);
bank.get_account(&address)
}
}
pub async fn start_local_server(
bank_forks: &Arc<RwLock<BankForks>>,
) -> UnboundedChannel<Response<BanksResponse>, ClientMessage<BanksRequest>> {
let banks_server = BanksServer::new_loopback(bank_forks.clone());
let (client_transport, server_transport) = transport::channel::unbounded();
let server = server::new(server::Config::default())
.incoming(stream::once(future::ready(server_transport)))
.respond_with(banks_server.serve());
tokio::spawn(server);
client_transport
}
pub async fn start_tcp_server(
listen_addr: SocketAddr,
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
) -> io::Result<()> {
// Note: These settings are copied straight from the tarpc example.
let server = tcp::listen(listen_addr, Bincode::default)
.await?
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 1 per IP.
.max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip())
// serve is generated by the service attribute. It takes as input any type implementing
// the generated Banks trait.
.map(move |chan| {
let (sender, receiver) = channel();
let exit_send_transaction_service = Arc::new(AtomicBool::new(false));
SendTransactionService::new(
tpu_addr,
&bank_forks,
&exit_send_transaction_service,
receiver,
);
let server =
BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender);
chan.respond_with(server.serve()).execute()
})
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| async {});
server.await;
Ok(())
}

2
banks-server/src/lib.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod banks_server;
pub mod rpc_banks_service;

View File

@@ -0,0 +1,116 @@
//! The `rpc_banks_service` module implements the Solana Banks RPC API.
use crate::banks_server::start_tcp_server;
use futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select};
use solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
};
use tokio::{
runtime::Runtime,
time::{self, Duration},
};
pub struct RpcBanksService {
thread_hdl: JoinHandle<()>,
}
/// Run the TCP service until `exit` is set to true
async fn start_abortable_tcp_server(
listen_addr: SocketAddr,
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) {
let server = start_tcp_server(
listen_addr,
tpu_addr,
bank_forks.clone(),
block_commitment_cache.clone(),
)
.fuse();
let interval = time::interval(Duration::from_millis(100)).fuse();
pin_mut!(server, interval);
loop {
select! {
_ = server => {},
_ = interval.select_next_some() => {
if exit.load(Ordering::Relaxed) {
break;
}
}
}
}
}
impl RpcBanksService {
fn run(
listen_addr: SocketAddr,
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) {
let server = start_abortable_tcp_server(
listen_addr,
tpu_addr,
bank_forks,
block_commitment_cache,
exit,
);
Runtime::new().unwrap().block_on(server);
}
pub fn new(
listen_addr: SocketAddr,
tpu_addr: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
exit: &Arc<AtomicBool>,
) -> Self {
let bank_forks = bank_forks.clone();
let block_commitment_cache = block_commitment_cache.clone();
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-rpc-banks".to_string())
.spawn(move || {
Self::run(
listen_addr,
tpu_addr,
bank_forks,
block_commitment_cache,
exit,
)
})
.unwrap();
Self { thread_hdl }
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use solana_runtime::bank::Bank;
#[test]
fn test_rpc_banks_server_exit() {
let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default())));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let exit = Arc::new(AtomicBool::new(false));
let addr = "127.0.0.1:0".parse().unwrap();
let service = RpcBanksService::new(addr, addr, &bank_forks, &block_commitment_cache, &exit);
exit.store(true, Ordering::Relaxed);
service.join().unwrap();
}
}