Accountsdb replication installment 2 (#19325)

This is the 2nd installment for the AccountsDb replication.

Summary of Changes

The basic google protocol buffer protocol for replicating updated slots and accounts. tonic/tokio is used for transporting the messages.

The basic framework of the client and server for replicating slots and accounts -- the persisting of accounts in the replica-side will be done at the next PR -- right now -- the accounts are streamed to the replica-node and dumped. Replication for information about Bank is also not done in this PR -- to be addressed in the next PR to limit the change size.

Functionality used by both the client and server side are encapsulated in the replica-lib crate.

There is no impact to the existing validator by default.

Tests:

Observe the confirmed slots replicated to the replica-node.
Observe the accounts for the confirmed slot are received at the replica-node side.
This commit is contained in:
Lijun Wang
2021-09-01 14:10:16 -07:00
committed by GitHub
parent 27c2180db9
commit 8378e8790f
28 changed files with 994 additions and 27 deletions

34
replica-lib/Cargo.toml Normal file
View File

@@ -0,0 +1,34 @@
[package]
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
edition = "2018"
name = "solana-replica-lib"
description = "The library used for replication by both the client and server"
version = "1.8.0"
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-validator"
[dependencies]
bincode = "1.3.1"
chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-channel = "0.5"
futures = "0.3"
futures-util = "0.3"
log = "0.4.11"
prost = "0.8.0"
prost-types = "0.8.0"
serde = "1.0.112"
solana-logger = { path = "../logger", version = "=1.8.0" }
solana-metrics = { path = "../metrics", version = "=1.8.0" }
solana-rpc = { path = "../rpc", version = "=1.8.0" }
solana-runtime = { path = "../runtime", version = "=1.8.0" }
solana-sdk = { path = "../sdk", version = "=1.8.0" }
tokio = { version = "1", features = ["full"] }
tonic = { version = "0.5.0", features = ["tls", "transport"] }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[build-dependencies]
tonic-build = "0.5.1"

5
replica-lib/build.rs Normal file
View File

@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// compiling protos using path on build time
tonic_build::configure().compile(&["proto/accountsdb_repl.proto"], &["proto"])?;
Ok(())
}

View File

@@ -0,0 +1,44 @@
// version of prorocol buffer used
syntax = "proto3";
package accountsdb_repl;
message ReplicaSlotConfirmationRequest {
uint64 last_replicated_slot = 1;
}
message ReplicaSlotConfirmationResponse {
repeated uint64 updated_slots = 1;
}
message ReplicaAccountsRequest {
uint64 slot = 1;
}
message ReplicaAccountMeta {
bytes Pubkey = 1;
uint64 lamports = 2;
bytes owner = 3;
bool executable = 4;
uint64 rent_epoch = 5;
}
message ReplicaAccountData {
bytes data = 1;
}
message ReplicaAccountInfo {
ReplicaAccountMeta account_meta = 1;
bytes hash = 2;
ReplicaAccountData data = 3;
}
message ReplicaAccountsResponse {
repeated ReplicaAccountInfo accounts = 1;
}
service AccountsDbRepl {
rpc get_confirmed_slots(ReplicaSlotConfirmationRequest) returns (ReplicaSlotConfirmationResponse);
rpc get_slot_accounts(ReplicaAccountsRequest) returns (ReplicaAccountsResponse);
}

View File

@@ -0,0 +1,119 @@
use {
log::*,
solana_sdk::clock::Slot,
std::{net::SocketAddr, sync::Arc},
tokio::runtime::Runtime,
tonic::{self, transport::Endpoint, Request},
};
tonic::include_proto!("accountsdb_repl");
pub struct AccountsDbReplClient {
client: accounts_db_repl_client::AccountsDbReplClient<tonic::transport::Channel>,
}
#[derive(Debug)]
pub enum ReplicaRpcError {
InvalidUrl(String),
ConnectionError(String),
GetSlotsError(String),
GetAccountsError(String),
}
impl From<tonic::transport::Error> for ReplicaRpcError {
fn from(err: tonic::transport::Error) -> Self {
ReplicaRpcError::ConnectionError(err.to_string())
}
}
impl AccountsDbReplClient {
pub async fn connect(rpc_peer: &SocketAddr) -> Result<Self, ReplicaRpcError> {
let url = format!("http://{}", rpc_peer);
let endpoint = match Endpoint::from_shared(url.to_string()) {
Ok(endpoint) => endpoint,
Err(e) => {
return Err(ReplicaRpcError::InvalidUrl(e.to_string()));
}
};
let client = accounts_db_repl_client::AccountsDbReplClient::connect(endpoint).await?;
info!(
"Successfully connected to the AccountsDb Replication server: {:?}",
url
);
Ok(AccountsDbReplClient { client })
}
pub async fn get_confirmed_slots(
&mut self,
last_slot: Slot,
) -> Result<Vec<Slot>, ReplicaRpcError> {
let request = ReplicaSlotConfirmationRequest {
last_replicated_slot: last_slot,
};
let response = self.client.get_confirmed_slots(Request::new(request)).await;
match response {
Ok(response) => Ok(response.into_inner().updated_slots),
Err(status) => Err(ReplicaRpcError::GetSlotsError(status.to_string())),
}
}
pub async fn get_slot_accounts(
&mut self,
slot: Slot,
) -> Result<Vec<ReplicaAccountInfo>, ReplicaRpcError> {
let request = ReplicaAccountsRequest { slot };
let response = self.client.get_slot_accounts(Request::new(request)).await;
match response {
Ok(response) => Ok(response.into_inner().accounts),
Err(status) => Err(ReplicaRpcError::GetAccountsError(status.to_string())),
}
}
}
#[derive(Clone)]
pub struct AccountsDbReplClientServiceConfig {
pub worker_threads: usize,
pub replica_server_addr: SocketAddr,
}
/// The service wrapper over AccountsDbReplClient to make it run in the tokio runtime
pub struct AccountsDbReplClientService {
runtime: Arc<Runtime>,
accountsdb_repl_client: AccountsDbReplClient,
}
impl AccountsDbReplClientService {
pub fn new(config: AccountsDbReplClientServiceConfig) -> Result<Self, ReplicaRpcError> {
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.worker_threads)
.thread_name("sol-accountsdb-repl-wrk")
.enable_all()
.build()
.expect("Runtime"),
);
let accountsdb_repl_client =
runtime.block_on(AccountsDbReplClient::connect(&config.replica_server_addr))?;
Ok(Self {
runtime,
accountsdb_repl_client,
})
}
pub fn get_confirmed_slots(&mut self, last_slot: Slot) -> Result<Vec<Slot>, ReplicaRpcError> {
self.runtime
.block_on(self.accountsdb_repl_client.get_confirmed_slots(last_slot))
}
pub fn get_slot_accounts(
&mut self,
slot: Slot,
) -> Result<Vec<ReplicaAccountInfo>, ReplicaRpcError> {
self.runtime
.block_on(self.accountsdb_repl_client.get_slot_accounts(slot))
}
}

View File

@@ -0,0 +1,176 @@
use {
futures_util::FutureExt,
log::*,
std::{
net::SocketAddr,
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
},
tokio::{
runtime::Runtime,
sync::oneshot::{self, Receiver, Sender},
},
tonic::{self, transport},
};
tonic::include_proto!("accountsdb_repl");
pub trait ReplicaSlotConfirmationServer {
fn get_confirmed_slots(
&self,
request: &ReplicaSlotConfirmationRequest,
) -> Result<ReplicaSlotConfirmationResponse, tonic::Status>;
fn join(&mut self) -> thread::Result<()>;
}
pub trait ReplicaAccountsServer {
fn get_slot_accounts(
&self,
request: &ReplicaAccountsRequest,
) -> Result<ReplicaAccountsResponse, tonic::Status>;
fn join(&mut self) -> thread::Result<()>;
}
#[derive(Clone)]
struct AccountsDbReplServer {
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
}
/// Implementing the AccountsDbRepl interface declared by the protocol
#[tonic::async_trait]
impl accounts_db_repl_server::AccountsDbRepl for AccountsDbReplServer {
async fn get_confirmed_slots(
&self,
request: tonic::Request<ReplicaSlotConfirmationRequest>,
) -> Result<tonic::Response<ReplicaSlotConfirmationResponse>, tonic::Status> {
let server = self.confirmed_slots_server.read().unwrap();
let result = server.get_confirmed_slots(&request.into_inner());
result.map(tonic::Response::new)
}
async fn get_slot_accounts(
&self,
request: tonic::Request<ReplicaAccountsRequest>,
) -> Result<tonic::Response<ReplicaAccountsResponse>, tonic::Status> {
let server = self.accounts_server.read().unwrap();
let result = server.get_slot_accounts(&request.into_inner());
result.map(tonic::Response::new)
}
}
impl AccountsDbReplServer {
pub fn new(
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
) -> Self {
Self {
confirmed_slots_server,
accounts_server,
}
}
pub fn join(self) -> thread::Result<()> {
self.confirmed_slots_server.write().unwrap().join()?;
self.accounts_server.write().unwrap().join()
}
}
#[derive(Clone)]
pub struct AccountsDbReplServiceConfig {
pub worker_threads: usize,
pub replica_server_addr: SocketAddr,
}
/// The service wraps the AccountsDbReplServer to make runnable in the tokio runtime
/// and handles start and stop of the service.
pub struct AccountsDbReplService {
accountsdb_repl_server: AccountsDbReplServer,
thread: JoinHandle<()>,
exit_signal_sender: Sender<()>,
}
impl AccountsDbReplService {
pub fn new(
config: AccountsDbReplServiceConfig,
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
) -> Self {
let accountsdb_repl_server =
AccountsDbReplServer::new(confirmed_slots_server, accounts_server);
let worker_threads = config.worker_threads;
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.thread_name("sol-accountsdb-repl-wrk")
.enable_all()
.build()
.expect("Runtime"),
);
let server_cloned = accountsdb_repl_server.clone();
let (exit_signal_sender, exit_signal_receiver) = oneshot::channel::<()>();
let thread = Builder::new()
.name("sol-accountsdb-repl-rt".to_string())
.spawn(move || {
Self::run_accountsdb_repl_server_in_runtime(
config,
runtime,
server_cloned,
exit_signal_receiver,
);
})
.unwrap();
Self {
accountsdb_repl_server,
thread,
exit_signal_sender,
}
}
async fn run_accountsdb_repl_server(
config: AccountsDbReplServiceConfig,
server: AccountsDbReplServer,
exit_signal: Receiver<()>,
) -> Result<(), tonic::transport::Error> {
info!(
"Running AccountsDbReplServer at the endpoint: {:?}",
config.replica_server_addr
);
transport::Server::builder()
.add_service(accounts_db_repl_server::AccountsDbReplServer::new(server))
.serve_with_shutdown(config.replica_server_addr, exit_signal.map(drop))
.await
}
fn run_accountsdb_repl_server_in_runtime(
config: AccountsDbReplServiceConfig,
runtime: Arc<Runtime>,
server: AccountsDbReplServer,
exit_signal: Receiver<()>,
) {
let result = runtime.block_on(Self::run_accountsdb_repl_server(
config,
server,
exit_signal,
));
match result {
Ok(_) => {
info!("AccountsDbReplServer finished");
}
Err(err) => {
error!("AccountsDbReplServer finished in error: {:}?", err);
}
}
}
pub fn join(self) -> thread::Result<()> {
let _ = self.exit_signal_sender.send(());
self.accountsdb_repl_server.join()?;
self.thread.join()
}
}

View File

@@ -0,0 +1,29 @@
use {
crate::{
accountsdb_repl_server::{AccountsDbReplService, AccountsDbReplServiceConfig},
replica_accounts_server::ReplicaAccountsServerImpl,
replica_confirmed_slots_server::ReplicaSlotConfirmationServerImpl,
},
crossbeam_channel::Receiver,
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::Slot,
std::sync::{Arc, RwLock},
};
pub struct AccountsDbReplServerFactory {}
impl AccountsDbReplServerFactory {
pub fn build_accountsdb_repl_server(
config: AccountsDbReplServiceConfig,
confirmed_bank_receiver: Receiver<Slot>,
bank_forks: Arc<RwLock<BankForks>>,
) -> AccountsDbReplService {
AccountsDbReplService::new(
config,
Arc::new(RwLock::new(ReplicaSlotConfirmationServerImpl::new(
confirmed_bank_receiver,
))),
Arc::new(RwLock::new(ReplicaAccountsServerImpl::new(bank_forks))),
)
}
}

7
replica-lib/src/lib.rs Normal file
View File

@@ -0,0 +1,7 @@
#![allow(clippy::integer_arithmetic)]
pub mod accountsdb_repl_client;
pub mod accountsdb_repl_server;
pub mod accountsdb_repl_server_factory;
pub mod replica_accounts_server;
pub mod replica_confirmed_slots_server;

View File

@@ -0,0 +1,95 @@
use {
crate::accountsdb_repl_server::{
self, ReplicaAccountData, ReplicaAccountInfo, ReplicaAccountMeta, ReplicaAccountsServer,
},
solana_runtime::{
accounts_cache::CachedAccount, accounts_db::LoadedAccount, append_vec::StoredAccountMeta,
bank_forks::BankForks,
},
solana_sdk::account::Account,
std::{
cmp::Eq,
sync::{Arc, RwLock},
thread,
},
};
pub(crate) struct ReplicaAccountsServerImpl {
bank_forks: Arc<RwLock<BankForks>>,
}
impl Eq for ReplicaAccountInfo {}
impl ReplicaAccountInfo {
fn from_stored_account_meta(stored_account_meta: &StoredAccountMeta) -> Self {
let account_meta = Some(ReplicaAccountMeta {
pubkey: stored_account_meta.meta.pubkey.to_bytes().to_vec(),
lamports: stored_account_meta.account_meta.lamports,
owner: stored_account_meta.account_meta.owner.to_bytes().to_vec(),
executable: stored_account_meta.account_meta.executable,
rent_epoch: stored_account_meta.account_meta.rent_epoch,
});
let data = Some(ReplicaAccountData {
data: stored_account_meta.data.to_vec(),
});
ReplicaAccountInfo {
account_meta,
hash: stored_account_meta.hash.0.to_vec(),
data,
}
}
fn from_cached_account(cached_account: &CachedAccount) -> Self {
let account = Account::from(cached_account.account.clone());
let account_meta = Some(ReplicaAccountMeta {
pubkey: cached_account.pubkey().to_bytes().to_vec(),
lamports: account.lamports,
owner: account.owner.to_bytes().to_vec(),
executable: account.executable,
rent_epoch: account.rent_epoch,
});
let data = Some(ReplicaAccountData {
data: account.data.to_vec(),
});
ReplicaAccountInfo {
account_meta,
hash: cached_account.hash().0.to_vec(),
data,
}
}
}
impl ReplicaAccountsServer for ReplicaAccountsServerImpl {
fn get_slot_accounts(
&self,
request: &accountsdb_repl_server::ReplicaAccountsRequest,
) -> Result<accountsdb_repl_server::ReplicaAccountsResponse, tonic::Status> {
let slot = request.slot;
match self.bank_forks.read().unwrap().get(slot) {
None => Err(tonic::Status::not_found("The slot is not found")),
Some(bank) => {
let accounts = bank.rc.accounts.scan_slot(slot, |account| match account {
LoadedAccount::Stored(stored_account_meta) => Some(
ReplicaAccountInfo::from_stored_account_meta(&stored_account_meta),
),
LoadedAccount::Cached((_pubkey, cached_account)) => {
Some(ReplicaAccountInfo::from_cached_account(&cached_account))
}
});
Ok(accountsdb_repl_server::ReplicaAccountsResponse { accounts })
}
}
}
fn join(&mut self) -> thread::Result<()> {
Ok(())
}
}
impl ReplicaAccountsServerImpl {
pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
Self { bank_forks }
}
}

View File

@@ -0,0 +1,119 @@
use {
crate::accountsdb_repl_server::{self, ReplicaSlotConfirmationServer},
crossbeam_channel::Receiver,
solana_sdk::{clock::Slot, commitment_config::CommitmentLevel},
std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
tonic,
};
/// The structure modelling the slots eligible for replication and
/// their states.
#[derive(Default, Clone)]
struct ReplicaEligibleSlotSet {
slot_set: Arc<RwLock<VecDeque<(Slot, CommitmentLevel)>>>,
}
pub(crate) struct ReplicaSlotConfirmationServerImpl {
eligible_slot_set: ReplicaEligibleSlotSet,
confirmed_bank_receiver_service: Option<JoinHandle<()>>,
cleanup_service: Option<JoinHandle<()>>,
exit_updated_slot_server: Arc<AtomicBool>,
}
impl ReplicaSlotConfirmationServer for ReplicaSlotConfirmationServerImpl {
fn get_confirmed_slots(
&self,
request: &accountsdb_repl_server::ReplicaSlotConfirmationRequest,
) -> Result<accountsdb_repl_server::ReplicaSlotConfirmationResponse, tonic::Status> {
let slot_set = self.eligible_slot_set.slot_set.read().unwrap();
let updated_slots: Vec<u64> = slot_set
.iter()
.filter(|(slot, _)| *slot > request.last_replicated_slot)
.map(|(slot, _)| *slot)
.collect();
Ok(accountsdb_repl_server::ReplicaSlotConfirmationResponse { updated_slots })
}
fn join(&mut self) -> thread::Result<()> {
self.exit_updated_slot_server.store(true, Ordering::Relaxed);
self.confirmed_bank_receiver_service
.take()
.map(JoinHandle::join)
.unwrap()
.expect("confirmed_bank_receiver_service");
self.cleanup_service.take().map(JoinHandle::join).unwrap()
}
}
const MAX_ELIGIBLE_SLOT_SET_SIZE: usize = 262144;
impl ReplicaSlotConfirmationServerImpl {
pub fn new(confirmed_bank_receiver: Receiver<Slot>) -> Self {
let eligible_slot_set = ReplicaEligibleSlotSet::default();
let exit_updated_slot_server = Arc::new(AtomicBool::new(false));
Self {
eligible_slot_set: eligible_slot_set.clone(),
confirmed_bank_receiver_service: Some(Self::run_confirmed_bank_receiver(
confirmed_bank_receiver,
eligible_slot_set.clone(),
exit_updated_slot_server.clone(),
)),
cleanup_service: Some(Self::run_cleanup_service(
eligible_slot_set,
MAX_ELIGIBLE_SLOT_SET_SIZE,
exit_updated_slot_server.clone(),
)),
exit_updated_slot_server,
}
}
fn run_confirmed_bank_receiver(
confirmed_bank_receiver: Receiver<Slot>,
eligible_slot_set: ReplicaEligibleSlotSet,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("confirmed_bank_receiver".to_string())
.spawn(move || {
while !exit.load(Ordering::Relaxed) {
if let Ok(slot) = confirmed_bank_receiver.recv() {
let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
slot_set.push_back((slot, CommitmentLevel::Confirmed));
}
}
})
.unwrap()
}
fn run_cleanup_service(
eligible_slot_set: ReplicaEligibleSlotSet,
max_set_size: usize,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("cleanup_service".to_string())
.spawn(move || {
while !exit.load(Ordering::Relaxed) {
let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
let count_to_drain = slot_set.len().saturating_sub(max_set_size);
if count_to_drain > 0 {
drop(slot_set.drain(..count_to_drain));
}
drop(slot_set);
sleep(Duration::from_millis(200));
}
})
.unwrap()
}
}