Files
solana/replica-lib/src/accountsdb_repl_server.rs
Lijun Wang 8378e8790f Accountsdb replication installment 2 (#19325)
This is the 2nd installment for the AccountsDb replication.

Summary of Changes

Adds the basic Google Protocol Buffers protocol for replicating updated slots and accounts. tonic/tokio is used to transport the messages.

Adds the basic client and server framework for replicating slots and accounts. Persisting accounts on the replica side will be done in the next PR; for now, the accounts are streamed to the replica node and dumped. Replication of Bank information is also not done in this PR; it will be addressed in the next PR to limit the change size.

Functionality used by both the client and the server is encapsulated in the replica-lib crate.

There is no impact to the existing validator by default.

Tests:

Observe the confirmed slots replicated to the replica-node.
Observe the accounts for the confirmed slot are received at the replica-node side.
2021-09-01 14:10:16 -07:00

177 lines
5.4 KiB
Rust

use {
futures_util::FutureExt,
log::*,
std::{
net::SocketAddr,
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
},
tokio::{
runtime::Runtime,
sync::oneshot::{self, Receiver, Sender},
},
tonic::{self, transport},
};
// Pull in the code generated at build time for the `accountsdb_repl` protobuf package.
// The request/response message types and the `accounts_db_repl_server` module referenced
// below are not defined in this file and presumably come from this generated code.
tonic::include_proto!("accountsdb_repl");
/// Backend that answers replica requests for confirmed slots.
///
/// The gRPC front end holds implementations behind an `Arc<RwLock<_>>`: request handling
/// calls `get_confirmed_slots` under a read lock, and shutdown calls `join` under a write lock.
pub trait ReplicaSlotConfirmationServer {
    /// Handles one `get_confirmed_slots` request, producing either the response or the
    /// gRPC status to report back to the client.
    fn get_confirmed_slots(
        &self,
        req: &ReplicaSlotConfirmationRequest,
    ) -> Result<ReplicaSlotConfirmationResponse, tonic::Status>;

    /// Joins this backend during service shutdown; an `Err` carries a thread panic payload.
    fn join(&mut self) -> std::thread::Result<()>;
}
/// Backend that answers replica requests for the accounts of a slot.
///
/// The gRPC front end holds implementations behind an `Arc<RwLock<_>>`: request handling
/// calls `get_slot_accounts` under a read lock, and shutdown calls `join` under a write lock.
pub trait ReplicaAccountsServer {
    /// Handles one `get_slot_accounts` request, producing either the response or the
    /// gRPC status to report back to the client.
    fn get_slot_accounts(
        &self,
        req: &ReplicaAccountsRequest,
    ) -> Result<ReplicaAccountsResponse, tonic::Status>;

    /// Joins this backend during service shutdown; an `Err` carries a thread panic payload.
    fn join(&mut self) -> std::thread::Result<()>;
}
#[derive(Clone)]
struct AccountsDbReplServer {
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
}
/// Implementation of the `AccountsDbRepl` gRPC interface declared by the protocol.
///
/// Each RPC unwraps the incoming tonic request and delegates it synchronously to the matching
/// backend while holding that backend's read lock. A successful result is wrapped in a tonic
/// response; an error status from the backend is returned to the caller unchanged.
#[tonic::async_trait]
impl accounts_db_repl_server::AccountsDbRepl for AccountsDbReplServer {
    async fn get_confirmed_slots(
        &self,
        request: tonic::Request<ReplicaSlotConfirmationRequest>,
    ) -> Result<tonic::Response<ReplicaSlotConfirmationResponse>, tonic::Status> {
        let slot_request = request.into_inner();
        let backend = self.confirmed_slots_server.read().unwrap();
        let slot_response = backend.get_confirmed_slots(&slot_request)?;
        Ok(tonic::Response::new(slot_response))
    }

    async fn get_slot_accounts(
        &self,
        request: tonic::Request<ReplicaAccountsRequest>,
    ) -> Result<tonic::Response<ReplicaAccountsResponse>, tonic::Status> {
        let accounts_request = request.into_inner();
        let backend = self.accounts_server.read().unwrap();
        let accounts_response = backend.get_slot_accounts(&accounts_request)?;
        Ok(tonic::Response::new(accounts_response))
    }
}
impl AccountsDbReplServer {
    /// Wraps the two replication backends so they can be served through tonic.
    pub fn new(
        confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
        accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
    ) -> Self {
        Self {
            confirmed_slots_server,
            accounts_server,
        }
    }

    /// Joins both backends, holding each one's write lock for the duration of its `join`.
    ///
    /// Both backends are always joined, even if the first one fails, so an error from the
    /// confirmed-slots backend does not prevent the accounts backend from being joined.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered, checking the confirmed-slots backend first.
    ///
    /// # Panics
    ///
    /// Panics if either backend's lock is poisoned.
    pub fn join(self) -> thread::Result<()> {
        let confirmed_slots_result = self.confirmed_slots_server.write().unwrap().join();
        let accounts_result = self.accounts_server.write().unwrap().join();
        confirmed_slots_result.and(accounts_result)
    }
}
/// Configuration for the AccountsDb replication gRPC service.
#[derive(Clone, Debug)]
pub struct AccountsDbReplServiceConfig {
    /// Number of worker threads for the service's dedicated tokio runtime.
    pub worker_threads: usize,
    /// Socket address the gRPC server is served at.
    pub replica_server_addr: SocketAddr,
}
/// Hosts an `AccountsDbReplServer` on a dedicated tokio runtime and owns its lifecycle.
///
/// The gRPC server runs on a background thread; `join` signals it to stop and waits for
/// both the server and its backends to finish.
pub struct AccountsDbReplService {
    /// Handle retained so the backends can be joined at shutdown.
    accountsdb_repl_server: AccountsDbReplServer,
    /// Background thread that blocks on the runtime driving the gRPC server.
    thread: std::thread::JoinHandle<()>,
    /// One-shot sender used to request the gRPC server's shutdown.
    exit_signal_sender: Sender<()>,
}
impl AccountsDbReplService {
pub fn new(
config: AccountsDbReplServiceConfig,
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
) -> Self {
let accountsdb_repl_server =
AccountsDbReplServer::new(confirmed_slots_server, accounts_server);
let worker_threads = config.worker_threads;
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.thread_name("sol-accountsdb-repl-wrk")
.enable_all()
.build()
.expect("Runtime"),
);
let server_cloned = accountsdb_repl_server.clone();
let (exit_signal_sender, exit_signal_receiver) = oneshot::channel::<()>();
let thread = Builder::new()
.name("sol-accountsdb-repl-rt".to_string())
.spawn(move || {
Self::run_accountsdb_repl_server_in_runtime(
config,
runtime,
server_cloned,
exit_signal_receiver,
);
})
.unwrap();
Self {
accountsdb_repl_server,
thread,
exit_signal_sender,
}
}
async fn run_accountsdb_repl_server(
config: AccountsDbReplServiceConfig,
server: AccountsDbReplServer,
exit_signal: Receiver<()>,
) -> Result<(), tonic::transport::Error> {
info!(
"Running AccountsDbReplServer at the endpoint: {:?}",
config.replica_server_addr
);
transport::Server::builder()
.add_service(accounts_db_repl_server::AccountsDbReplServer::new(server))
.serve_with_shutdown(config.replica_server_addr, exit_signal.map(drop))
.await
}
fn run_accountsdb_repl_server_in_runtime(
config: AccountsDbReplServiceConfig,
runtime: Arc<Runtime>,
server: AccountsDbReplServer,
exit_signal: Receiver<()>,
) {
let result = runtime.block_on(Self::run_accountsdb_repl_server(
config,
server,
exit_signal,
));
match result {
Ok(_) => {
info!("AccountsDbReplServer finished");
}
Err(err) => {
error!("AccountsDbReplServer finished in error: {:}?", err);
}
}
}
pub fn join(self) -> thread::Result<()> {
let _ = self.exit_signal_sender.send(());
self.accountsdb_repl_server.join()?;
self.thread.join()
}
}