Create solana-rpc crate and move subscriptions (#17320)

* Move non_circulating_supply to runtime

* Add solana-rpc crate and move max_slots

* Move subscriptions to solana-rpc

* Single use statements
This commit is contained in:
Tyera Eulberg
2021-05-19 00:54:28 -06:00
committed by GitHub
parent a3c0833a1c
commit 827355a6b1
29 changed files with 459 additions and 354 deletions

44
rpc/Cargo.toml Normal file
View File

@ -0,0 +1,44 @@
[package]
name = "solana-rpc"
version = "1.7.0"
description = "Solana RPC"
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-rpc"
edition = "2018"
[dependencies]
bs58 = "0.3.1"
crossbeam-channel = "0.4"
jsonrpc-core = "17.0.0"
jsonrpc-core-client = { version = "17.0.0", features = ["ipc", "ws"] }
jsonrpc-derive = "17.0.0"
jsonrpc-pubsub = "17.0.0"
jsonrpc-ws-server = "17.0.0"
log = "0.4.11"
serde = "1.0.122"
serde_derive = "1.0.103"
serde_json = "1.0.56"
solana-account-decoder = { path = "../account-decoder", version = "=1.7.0" }
solana-client = { path = "../client", version = "=1.7.0" }
solana-ledger = { path = "../ledger", version = "=1.7.0" }
solana-measure = { path = "../measure", version = "=1.7.0" }
solana-metrics = { path = "../metrics", version = "=1.7.0" }
solana-runtime = { path = "../runtime", version = "=1.7.0" }
solana-sdk = { path = "../sdk", version = "=1.7.0" }
solana-vote-program = { path = "../programs/vote", version = "=1.7.0" }
spl-token-v2-0 = { package = "spl-token", version = "=3.1.0", features = ["no-entrypoint"] }
[dev-dependencies]
serial_test = "0.4.0"
solana-stake-program = { path = "../programs/stake", version = "=1.7.0" }
tokio = { version = "1", features = ["full"] }
[lib]
crate-type = ["lib"]
name = "solana_rpc"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

21
rpc/src/lib.rs Normal file
View File

@ -0,0 +1,21 @@
#![allow(clippy::integer_arithmetic)]
pub mod max_slots;
pub mod optimistically_confirmed_bank_tracker;
pub mod parsed_token_accounts;
pub mod rpc_completed_slots_service;
pub mod rpc_pubsub;
pub mod rpc_pubsub_service;
pub mod rpc_subscriptions;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[cfg(test)]
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate solana_metrics;

7
rpc/src/max_slots.rs Normal file
View File

@ -0,0 +1,7 @@
use std::sync::atomic::AtomicU64;
#[derive(Default)]
pub struct MaxSlots {
pub retransmit: AtomicU64,
pub shred_insert: AtomicU64,
}

View File

@ -0,0 +1,328 @@
//! The `optimistically_confirmed_bank_tracker` module implements a threaded service to track the
//! most recent optimistically confirmed bank for use in rpc services, and triggers gossip
//! subscription notifications
use {
crate::rpc_subscriptions::RpcSubscriptions,
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
solana_client::rpc_response::{SlotTransactionStats, SlotUpdate},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, timing::timestamp},
std::{
collections::HashSet,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
pub struct OptimisticallyConfirmedBank {
pub bank: Arc<Bank>,
}
impl OptimisticallyConfirmedBank {
pub fn locked_from_bank_forks_root(bank_forks: &Arc<RwLock<BankForks>>) -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(Self {
bank: bank_forks.read().unwrap().root_bank(),
}))
}
}
pub enum BankNotification {
OptimisticallyConfirmed(Slot),
Frozen(Arc<Bank>),
Root(Arc<Bank>),
}
impl std::fmt::Debug for BankNotification {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
BankNotification::OptimisticallyConfirmed(slot) => {
write!(f, "OptimisticallyConfirmed({:?})", slot)
}
BankNotification::Frozen(bank) => write!(f, "Frozen({})", bank.slot()),
BankNotification::Root(bank) => write!(f, "Root({})", bank.slot()),
}
}
}
pub type BankNotificationReceiver = Receiver<BankNotification>;
pub type BankNotificationSender = Sender<BankNotification>;
pub struct OptimisticallyConfirmedBankTracker {
thread_hdl: JoinHandle<()>,
}
impl OptimisticallyConfirmedBankTracker {
pub fn new(
receiver: BankNotificationReceiver,
exit: &Arc<AtomicBool>,
bank_forks: Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: Arc<RpcSubscriptions>,
) -> Self {
let exit_ = exit.clone();
let mut pending_optimistically_confirmed_banks = HashSet::new();
let thread_hdl = Builder::new()
.name("solana-optimistic-bank-tracker".to_string())
.spawn(move || loop {
if exit_.load(Ordering::Relaxed) {
break;
}
if let Err(RecvTimeoutError::Disconnected) = Self::recv_notification(
&receiver,
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
) {
break;
}
})
.unwrap();
Self { thread_hdl }
}
fn recv_notification(
receiver: &Receiver<BankNotification>,
bank_forks: &Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) -> Result<(), RecvTimeoutError> {
let notification = receiver.recv_timeout(Duration::from_secs(1))?;
Self::process_notification(
notification,
bank_forks,
optimistically_confirmed_bank,
subscriptions,
&mut pending_optimistically_confirmed_banks,
);
Ok(())
}
pub fn process_notification(
notification: BankNotification,
bank_forks: &Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) {
debug!("received bank notification: {:?}", notification);
match notification {
BankNotification::OptimisticallyConfirmed(slot) => {
if let Some(bank) = bank_forks
.read()
.unwrap()
.get(slot)
.filter(|b| b.is_frozen())
{
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if bank.slot() > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank.clone();
subscriptions.notify_gossip_subscribers(slot);
}
drop(w_optimistically_confirmed_bank);
} else if slot > bank_forks.read().unwrap().root_bank().slot() {
pending_optimistically_confirmed_banks.insert(slot);
} else {
inc_new_counter_info!("dropped-already-rooted-optimistic-bank-notification", 1);
}
// Send slot notification regardless of whether the bank is replayed
subscriptions.notify_slot_update(SlotUpdate::OptimisticConfirmation {
slot,
timestamp: timestamp(),
});
}
BankNotification::Frozen(bank) => {
let frozen_slot = bank.slot();
if let Some(parent) = bank.parent() {
let num_successful_transactions = bank
.transaction_count()
.saturating_sub(parent.transaction_count());
subscriptions.notify_slot_update(SlotUpdate::Frozen {
slot: frozen_slot,
timestamp: timestamp(),
stats: SlotTransactionStats {
num_transaction_entries: bank.transaction_entries_count(),
num_successful_transactions,
num_failed_transactions: bank.transaction_error_count(),
max_transactions_per_entry: bank.transactions_per_entry_max(),
},
});
}
if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if frozen_slot > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank;
subscriptions.notify_gossip_subscribers(frozen_slot);
}
drop(w_optimistically_confirmed_bank);
}
}
BankNotification::Root(bank) => {
let root_slot = bank.slot();
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if root_slot > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank;
}
drop(w_optimistically_confirmed_bank);
pending_optimistically_confirmed_banks.retain(|&s| s > root_slot);
}
}
}
pub fn close(self) -> thread::Result<()> {
self.join()
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use {
super::*,
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_runtime::{
accounts_background_service::AbsRequestSender, commitment::BlockCommitmentCache,
},
solana_sdk::pubkey::Pubkey,
};
#[test]
fn test_process_notification() {
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3);
bank_forks.write().unwrap().insert(bank3);
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache,
optimistically_confirmed_bank.clone(),
));
let mut pending_optimistically_confirmed_banks = HashSet::new();
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
// Test max optimistically confirmed bank remains in the cache
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(1),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
// Test bank will only be cached when frozen
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert_eq!(pending_optimistically_confirmed_banks.contains(&3), true);
// Test bank will only be cached when frozen
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
// Test higher root will be cached and clear pending_optimistically_confirmed_banks
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
let bank4 = Bank::new_from_parent(&bank3, &Pubkey::default(), 4);
bank_forks.write().unwrap().insert(bank4);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(4),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert_eq!(pending_optimistically_confirmed_banks.contains(&4), true);
let bank4 = bank_forks.read().unwrap().get(4).unwrap().clone();
let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5);
bank_forks.write().unwrap().insert(bank5);
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Root(bank5),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert_eq!(pending_optimistically_confirmed_banks.contains(&4), false);
// Banks <= root do not get added to pending list, even if not frozen
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
let bank6 = Bank::new_from_parent(&bank5, &Pubkey::default(), 6);
bank_forks.write().unwrap().insert(bank6);
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
let bank7 = Bank::new_from_parent(&bank5, &Pubkey::default(), 7);
bank_forks.write().unwrap().insert(bank7);
bank_forks
.write()
.unwrap()
.set_root(7, &AbsRequestSender::default(), None);
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(6),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert_eq!(pending_optimistically_confirmed_banks.contains(&6), false);
}
}

View File

@ -0,0 +1,94 @@
use {
jsonrpc_core::{Error, Result},
solana_account_decoder::{
parse_account_data::AccountAdditionalData,
parse_token::{get_token_account_mint, spl_token_id_v2_0, spl_token_v2_0_native_mint},
UiAccount, UiAccountData, UiAccountEncoding,
},
solana_client::rpc_response::RpcKeyedAccount,
solana_runtime::bank::Bank,
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
pubkey::Pubkey,
},
spl_token_v2_0::{solana_program::program_pack::Pack, state::Mint},
std::{collections::HashMap, sync::Arc},
};
pub fn get_parsed_token_account(
bank: Arc<Bank>,
pubkey: &Pubkey,
account: AccountSharedData,
) -> UiAccount {
let additional_data = get_token_account_mint(&account.data())
.and_then(|mint_pubkey| get_mint_owner_and_decimals(&bank, &mint_pubkey).ok())
.map(|(_, decimals)| AccountAdditionalData {
spl_token_decimals: Some(decimals),
});
UiAccount::encode(
pubkey,
account,
UiAccountEncoding::JsonParsed,
additional_data,
None,
)
}
pub fn get_parsed_token_accounts<I>(
bank: Arc<Bank>,
keyed_accounts: I,
) -> impl Iterator<Item = RpcKeyedAccount>
where
I: Iterator<Item = (Pubkey, AccountSharedData)>,
{
let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();
keyed_accounts.filter_map(move |(pubkey, account)| {
let additional_data = get_token_account_mint(&account.data()).map(|mint_pubkey| {
let spl_token_decimals = mint_decimals.get(&mint_pubkey).cloned().or_else(|| {
let (_, decimals) = get_mint_owner_and_decimals(&bank, &mint_pubkey).ok()?;
mint_decimals.insert(mint_pubkey, decimals);
Some(decimals)
});
AccountAdditionalData { spl_token_decimals }
});
let maybe_encoded_account = UiAccount::encode(
&pubkey,
account,
UiAccountEncoding::JsonParsed,
additional_data,
None,
);
if let UiAccountData::Json(_) = &maybe_encoded_account.data {
Some(RpcKeyedAccount {
pubkey: pubkey.to_string(),
account: maybe_encoded_account,
})
} else {
None
}
})
}
/// Analyze a mint Pubkey that may be the native_mint and get the mint-account owner (token
/// program_id) and decimals
pub fn get_mint_owner_and_decimals(bank: &Arc<Bank>, mint: &Pubkey) -> Result<(Pubkey, u8)> {
if mint == &spl_token_v2_0_native_mint() {
Ok((spl_token_id_v2_0(), spl_token_v2_0::native_mint::DECIMALS))
} else {
let mint_account = bank.get_account(mint).ok_or_else(|| {
Error::invalid_params("Invalid param: could not find mint".to_string())
})?;
let decimals = get_mint_decimals(&mint_account.data())?;
Ok((*mint_account.owner(), decimals))
}
}
fn get_mint_decimals(data: &[u8]) -> Result<u8> {
Mint::unpack(data)
.map_err(|_| {
Error::invalid_params("Invalid param: Token mint could not be unpacked".to_string())
})
.map(|mint| mint.decimals)
}

View File

@ -0,0 +1,35 @@
use {
crate::rpc_subscriptions::RpcSubscriptions,
solana_client::rpc_response::SlotUpdate,
solana_ledger::blockstore::CompletedSlotsReceiver,
solana_sdk::timing::timestamp,
std::{
sync::Arc,
thread::{Builder, JoinHandle},
},
};
pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Option<JoinHandle<()>> {
let rpc_subscriptions = rpc_subscriptions?;
Some(
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
timestamp: timestamp(),
});
}
}
})
.unwrap(),
)
}
}

1287
rpc/src/rpc_pubsub.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,145 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use {
crate::{
rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl},
rpc_subscriptions::RpcSubscriptions,
},
jsonrpc_pubsub::{PubSubHandler, Session},
jsonrpc_ws_server::{RequestContext, ServerBuilder},
std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
};
#[derive(Debug, Clone)]
pub struct PubSubConfig {
pub enable_vote_subscription: bool,
// See the corresponding fields in
// https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131
// for a complete description of each field in this struct
pub max_connections: usize,
pub max_fragment_size: usize,
pub max_in_buffer_capacity: usize,
pub max_out_buffer_capacity: usize,
}
impl Default for PubSubConfig {
fn default() -> Self {
Self {
enable_vote_subscription: false,
max_connections: 1000, // Arbitrary, default of 100 is too low
max_fragment_size: 50 * 1024, // 50KB
max_in_buffer_capacity: 50 * 1024, // 50KB
max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
}
}
}
pub struct PubSubService {
thread_hdl: JoinHandle<()>,
}
impl PubSubService {
pub fn new(
pubsub_config: PubSubConfig,
subscriptions: &Arc<RpcSubscriptions>,
pubsub_addr: SocketAddr,
exit: &Arc<AtomicBool>,
) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr);
let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
let exit_ = exit.clone();
let thread_hdl = Builder::new()
.name("solana-pubsub".to_string())
.spawn(move || {
let mut io = PubSubHandler::default();
io.extend_with(rpc.to_delegate());
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
info!("New pubsub connection");
let session = Arc::new(Session::new(context.sender()));
session.on_drop(|| {
info!("Pubsub connection dropped");
});
session
})
.max_connections(pubsub_config.max_connections)
.max_payload(pubsub_config.max_fragment_size)
.max_in_buffer_capacity(pubsub_config.max_in_buffer_capacity)
.max_out_buffer_capacity(pubsub_config.max_out_buffer_capacity)
.start(&pubsub_addr);
if let Err(e) = server {
warn!(
"Pubsub service unavailable error: {:?}. \n\
Also, check that port {} is not already in use by another application",
e,
pubsub_addr.port()
);
return;
}
while !exit_.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
server.unwrap().close();
})
.unwrap();
Self { thread_hdl }
}
pub fn close(self) -> thread::Result<()> {
self.join()
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
commitment::BlockCommitmentCache,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
std::{
net::{IpAddr, Ipv4Addr},
sync::RwLock,
},
};
#[test]
fn test_pubsub_new() {
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
));
let pubsub_service =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);
let thread = pubsub_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-pubsub");
}
}

2297
rpc/src/rpc_subscriptions.rs Normal file

File diff suppressed because it is too large Load Diff