@ -1,8 +1,10 @@
|
||||
use crate::{
|
||||
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
|
||||
consensus::VOTE_THRESHOLD_SIZE,
|
||||
crds_value::CrdsValueLabel,
|
||||
poh_recorder::PohRecorder,
|
||||
result::{Error, Result},
|
||||
rpc_subscriptions::RpcSubscriptions,
|
||||
sigverify,
|
||||
verified_vote_packets::VerifiedVotePackets,
|
||||
};
|
||||
@ -14,7 +16,10 @@ use log::*;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_metrics::inc_new_counter_debug;
|
||||
use solana_perf::packet::{self, Packets};
|
||||
use solana_runtime::{bank::Bank, epoch_stakes::EpochAuthorizedVoters};
|
||||
use solana_runtime::{
|
||||
bank::Bank,
|
||||
epoch_stakes::{EpochAuthorizedVoters, EpochStakes},
|
||||
};
|
||||
use solana_sdk::{
|
||||
clock::{Epoch, Slot},
|
||||
epoch_schedule::EpochSchedule,
|
||||
@ -43,6 +48,7 @@ pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
|
||||
pub struct SlotVoteTracker {
|
||||
voted: HashSet<Arc<Pubkey>>,
|
||||
updates: Option<Vec<Arc<Pubkey>>>,
|
||||
total_stake: u64,
|
||||
}
|
||||
|
||||
impl SlotVoteTracker {
|
||||
@ -203,6 +209,7 @@ impl ClusterInfoVoteListener {
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
vote_tracker: Arc<VoteTracker>,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
subscriptions: Arc<RpcSubscriptions>,
|
||||
) -> Self {
|
||||
let exit_ = exit.clone();
|
||||
|
||||
@ -244,6 +251,7 @@ impl ClusterInfoVoteListener {
|
||||
verified_vote_transactions_receiver,
|
||||
vote_tracker,
|
||||
&bank_forks,
|
||||
subscriptions,
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
@ -372,6 +380,7 @@ impl ClusterInfoVoteListener {
|
||||
vote_txs_receiver: VerifiedVoteTransactionsReceiver,
|
||||
vote_tracker: Arc<VoteTracker>,
|
||||
bank_forks: &RwLock<BankForks>,
|
||||
subscriptions: Arc<RpcSubscriptions>,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
@ -380,10 +389,15 @@ impl ClusterInfoVoteListener {
|
||||
|
||||
let root_bank = bank_forks.read().unwrap().root_bank().clone();
|
||||
vote_tracker.process_new_root_bank(&root_bank);
|
||||
let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch());
|
||||
|
||||
if let Err(e) =
|
||||
Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker, root_bank.slot())
|
||||
{
|
||||
if let Err(e) = Self::get_and_process_votes(
|
||||
&vote_txs_receiver,
|
||||
&vote_tracker,
|
||||
root_bank.slot(),
|
||||
subscriptions.clone(),
|
||||
epoch_stakes,
|
||||
) {
|
||||
match e {
|
||||
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
||||
return Ok(());
|
||||
@ -397,21 +411,51 @@ impl ClusterInfoVoteListener {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn get_and_process_votes_for_tests(
|
||||
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
|
||||
vote_tracker: &Arc<VoteTracker>,
|
||||
last_root: Slot,
|
||||
subscriptions: Arc<RpcSubscriptions>,
|
||||
) -> Result<()> {
|
||||
Self::get_and_process_votes(
|
||||
vote_txs_receiver,
|
||||
vote_tracker,
|
||||
last_root,
|
||||
subscriptions,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn get_and_process_votes(
|
||||
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
|
||||
vote_tracker: &Arc<VoteTracker>,
|
||||
last_root: Slot,
|
||||
subscriptions: Arc<RpcSubscriptions>,
|
||||
epoch_stakes: Option<&EpochStakes>,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::from_millis(200);
|
||||
let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?;
|
||||
while let Ok(new_txs) = vote_txs_receiver.try_recv() {
|
||||
vote_txs.extend(new_txs);
|
||||
}
|
||||
Self::process_votes(vote_tracker, vote_txs, last_root);
|
||||
Self::process_votes(
|
||||
vote_tracker,
|
||||
vote_txs,
|
||||
last_root,
|
||||
subscriptions,
|
||||
epoch_stakes,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_votes(vote_tracker: &VoteTracker, vote_txs: Vec<Transaction>, root: Slot) {
|
||||
fn process_votes(
|
||||
vote_tracker: &VoteTracker,
|
||||
vote_txs: Vec<Transaction>,
|
||||
root: Slot,
|
||||
subscriptions: Arc<RpcSubscriptions>,
|
||||
epoch_stakes: Option<&EpochStakes>,
|
||||
) {
|
||||
let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new();
|
||||
{
|
||||
let all_slot_trackers = &vote_tracker.slot_vote_trackers;
|
||||
@ -463,7 +507,7 @@ impl ClusterInfoVoteListener {
|
||||
continue;
|
||||
}
|
||||
|
||||
for slot in vote.slots {
|
||||
for &slot in vote.slots.iter() {
|
||||
if slot <= root {
|
||||
continue;
|
||||
}
|
||||
@ -488,6 +532,8 @@ impl ClusterInfoVoteListener {
|
||||
.or_default()
|
||||
.insert(unduplicated_pubkey.unwrap());
|
||||
}
|
||||
|
||||
subscriptions.notify_vote(&vote);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -504,15 +550,35 @@ impl ClusterInfoVoteListener {
|
||||
if w_slot_tracker.updates.is_none() {
|
||||
w_slot_tracker.updates = Some(vec![]);
|
||||
}
|
||||
for pk in slot_diff {
|
||||
w_slot_tracker.voted.insert(pk.clone());
|
||||
w_slot_tracker.updates.as_mut().unwrap().push(pk);
|
||||
let mut current_stake = 0;
|
||||
for pubkey in slot_diff {
|
||||
Self::sum_stake(&mut current_stake, epoch_stakes, &pubkey);
|
||||
|
||||
w_slot_tracker.voted.insert(pubkey.clone());
|
||||
w_slot_tracker.updates.as_mut().unwrap().push(pubkey);
|
||||
}
|
||||
Self::notify_for_stake_change(
|
||||
current_stake,
|
||||
w_slot_tracker.total_stake,
|
||||
&subscriptions,
|
||||
epoch_stakes,
|
||||
slot,
|
||||
);
|
||||
w_slot_tracker.total_stake += current_stake;
|
||||
} else {
|
||||
let voted: HashSet<_> = slot_diff.into_iter().collect();
|
||||
let mut total_stake = 0;
|
||||
let voted: HashSet<_> = slot_diff
|
||||
.into_iter()
|
||||
.map(|pubkey| {
|
||||
Self::sum_stake(&mut total_stake, epoch_stakes, &pubkey);
|
||||
pubkey
|
||||
})
|
||||
.collect();
|
||||
Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot);
|
||||
let new_slot_tracker = SlotVoteTracker {
|
||||
voted: voted.clone(),
|
||||
updates: Some(voted.into_iter().collect()),
|
||||
total_stake,
|
||||
};
|
||||
vote_tracker
|
||||
.slot_vote_trackers
|
||||
@ -522,11 +588,38 @@ impl ClusterInfoVoteListener {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_for_stake_change(
|
||||
current_stake: u64,
|
||||
previous_stake: u64,
|
||||
subscriptions: &Arc<RpcSubscriptions>,
|
||||
epoch_stakes: Option<&EpochStakes>,
|
||||
slot: Slot,
|
||||
) {
|
||||
if let Some(stakes) = epoch_stakes {
|
||||
let supermajority_stake = (stakes.total_stake() as f64 * VOTE_THRESHOLD_SIZE) as u64;
|
||||
if previous_stake < supermajority_stake
|
||||
&& (previous_stake + current_stake) > supermajority_stake
|
||||
{
|
||||
subscriptions.notify_gossip_subscribers(slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sum_stake(sum: &mut u64, epoch_stakes: Option<&EpochStakes>, pubkey: &Pubkey) {
|
||||
if let Some(stakes) = epoch_stakes {
|
||||
if let Some(vote_account) = stakes.stakes().vote_accounts().get(pubkey) {
|
||||
*sum += vote_account.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::commitment::BlockCommitmentCache;
|
||||
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
|
||||
use solana_perf::packet;
|
||||
use solana_runtime::{
|
||||
bank::Bank,
|
||||
@ -623,7 +716,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_update_new_root() {
|
||||
let (vote_tracker, bank, _) = setup();
|
||||
let (vote_tracker, bank, _, _) = setup();
|
||||
|
||||
// Check outdated slots are purged with new root
|
||||
let new_voter = Arc::new(Pubkey::new_rand());
|
||||
@ -664,7 +757,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_update_new_leader_schedule_epoch() {
|
||||
let (vote_tracker, bank, _) = setup();
|
||||
let (vote_tracker, bank, _, _) = setup();
|
||||
|
||||
// Check outdated slots are purged with new root
|
||||
let leader_schedule_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
||||
@ -706,7 +799,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_process_votes() {
|
||||
// Create some voters at genesis
|
||||
let (vote_tracker, _, validator_voting_keypairs) = setup();
|
||||
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
|
||||
let (votes_sender, votes_receiver) = unbounded();
|
||||
|
||||
let vote_slots = vec![1, 2];
|
||||
@ -725,7 +818,14 @@ mod tests {
|
||||
});
|
||||
|
||||
// Check that all the votes were registered for each validator correctly
|
||||
ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap();
|
||||
ClusterInfoVoteListener::get_and_process_votes(
|
||||
&votes_receiver,
|
||||
&vote_tracker,
|
||||
0,
|
||||
subscriptions,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
for vote_slot in vote_slots {
|
||||
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
|
||||
let r_slot_vote_tracker = slot_vote_tracker.read().unwrap();
|
||||
@ -744,7 +844,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_process_votes2() {
|
||||
// Create some voters at genesis
|
||||
let (vote_tracker, _, validator_voting_keypairs) = setup();
|
||||
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
|
||||
// Send some votes to process
|
||||
let (votes_sender, votes_receiver) = unbounded();
|
||||
|
||||
@ -769,7 +869,14 @@ mod tests {
|
||||
}
|
||||
|
||||
// Check that all the votes were registered for each validator correctly
|
||||
ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap();
|
||||
ClusterInfoVoteListener::get_and_process_votes(
|
||||
&votes_receiver,
|
||||
&vote_tracker,
|
||||
0,
|
||||
subscriptions,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
|
||||
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap();
|
||||
let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap();
|
||||
@ -788,7 +895,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_get_voters_by_epoch() {
|
||||
// Create some voters at genesis
|
||||
let (vote_tracker, bank, validator_voting_keypairs) = setup();
|
||||
let (vote_tracker, bank, validator_voting_keypairs, _) = setup();
|
||||
let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
||||
let last_known_slot = bank
|
||||
.epoch_schedule()
|
||||
@ -859,11 +966,23 @@ mod tests {
|
||||
100,
|
||||
);
|
||||
let bank = Bank::new(&genesis_config);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let bank_forks = BankForks::new(0, bank);
|
||||
let bank = bank_forks.get(0).unwrap().clone();
|
||||
let vote_tracker = VoteTracker::new(&bank);
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
||||
&exit,
|
||||
Arc::new(RwLock::new(bank_forks)),
|
||||
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
|
||||
blockstore.clone(),
|
||||
))),
|
||||
));
|
||||
|
||||
// Send a vote to process, should add a reference to the pubkey for that voter
|
||||
// in the tracker
|
||||
let validator0_keypairs = &validator_voting_keypairs[0];
|
||||
let vote_tracker = VoteTracker::new(&bank);
|
||||
let vote_tx = vec![vote_transaction::new_vote_transaction(
|
||||
// Must vote > root to be processed
|
||||
vec![bank.slot() + 1],
|
||||
@ -874,7 +993,13 @@ mod tests {
|
||||
&validator0_keypairs.vote_keypair,
|
||||
)];
|
||||
|
||||
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0);
|
||||
ClusterInfoVoteListener::process_votes(
|
||||
&vote_tracker,
|
||||
vote_tx,
|
||||
0,
|
||||
subscriptions.clone(),
|
||||
None,
|
||||
);
|
||||
let ref_count = Arc::strong_count(
|
||||
&vote_tracker
|
||||
.keys
|
||||
@ -924,7 +1049,7 @@ mod tests {
|
||||
})
|
||||
.collect();
|
||||
|
||||
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0);
|
||||
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions, None);
|
||||
|
||||
let ref_count = Arc::strong_count(
|
||||
&vote_tracker
|
||||
@ -938,7 +1063,12 @@ mod tests {
|
||||
assert_eq!(ref_count, current_ref_count);
|
||||
}
|
||||
|
||||
fn setup() -> (Arc<VoteTracker>, Arc<Bank>, Vec<ValidatorVoteKeypairs>) {
|
||||
fn setup() -> (
|
||||
Arc<VoteTracker>,
|
||||
Arc<Bank>,
|
||||
Vec<ValidatorVoteKeypairs>,
|
||||
Arc<RpcSubscriptions>,
|
||||
) {
|
||||
let validator_voting_keypairs: Vec<_> = (0..10)
|
||||
.map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
|
||||
.collect();
|
||||
@ -950,6 +1080,18 @@ mod tests {
|
||||
);
|
||||
let bank = Bank::new(&genesis_config);
|
||||
let vote_tracker = VoteTracker::new(&bank);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let bank_forks = BankForks::new(0, bank);
|
||||
let bank = bank_forks.get(0).unwrap().clone();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
||||
&exit,
|
||||
Arc::new(RwLock::new(bank_forks)),
|
||||
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
|
||||
blockstore.clone(),
|
||||
))),
|
||||
));
|
||||
|
||||
// Integrity Checks
|
||||
let current_epoch = bank.epoch();
|
||||
@ -976,8 +1118,9 @@ mod tests {
|
||||
assert_eq!(*vote_tracker.current_epoch.read().unwrap(), current_epoch);
|
||||
(
|
||||
Arc::new(vote_tracker),
|
||||
Arc::new(bank),
|
||||
bank,
|
||||
validator_voting_keypairs,
|
||||
subscriptions,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,10 @@ impl JsonRpcRequestProcessor {
|
||||
debug!("RPC using node root: {:?}", slot);
|
||||
Ok(r_bank_forks.get(slot).cloned().unwrap())
|
||||
}
|
||||
Some(commitment_config) if commitment_config.commitment == CommitmentLevel::Single => {
|
||||
Some(commitment_config)
|
||||
if commitment_config.commitment == CommitmentLevel::Single
|
||||
|| commitment_config.commitment == CommitmentLevel::SingleGossip =>
|
||||
{
|
||||
let slot = self
|
||||
.block_commitment_cache
|
||||
.read()
|
||||
|
@ -1,6 +1,6 @@
|
||||
//! The `pubsub` module implements a threaded subscription service on client RPC request
|
||||
|
||||
use crate::rpc_subscriptions::{RpcSubscriptions, SlotInfo};
|
||||
use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote, SlotInfo};
|
||||
use jsonrpc_core::{Error, ErrorCode, Result};
|
||||
use jsonrpc_derive::rpc;
|
||||
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
|
||||
@ -114,6 +114,18 @@ pub trait RpcSolPubSub {
|
||||
)]
|
||||
fn slot_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
|
||||
|
||||
// Get notification when vote is encountered
|
||||
#[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")]
|
||||
fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcVote>);
|
||||
|
||||
// Unsubscribe from vote notification subscription.
|
||||
#[pubsub(
|
||||
subscription = "voteNotification",
|
||||
unsubscribe,
|
||||
name = "voteUnsubscribe"
|
||||
)]
|
||||
fn vote_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
|
||||
|
||||
// Get notification when a new root is set
|
||||
#[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")]
|
||||
fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<Slot>);
|
||||
@ -295,6 +307,27 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||
}
|
||||
}
|
||||
|
||||
fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<RpcVote>) {
|
||||
info!("vote_subscribe");
|
||||
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
let sub_id = SubscriptionId::Number(id as u64);
|
||||
info!("vote_subscribe: id={:?}", sub_id);
|
||||
self.subscriptions.add_vote_subscription(sub_id, subscriber);
|
||||
}
|
||||
|
||||
fn vote_unsubscribe(&self, _meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
|
||||
info!("vote_unsubscribe");
|
||||
if self.subscriptions.remove_vote_subscription(&id) {
|
||||
Ok(true)
|
||||
} else {
|
||||
Err(Error {
|
||||
code: ErrorCode::InvalidParams,
|
||||
message: "Invalid Request: Subscription id does not exist".into(),
|
||||
data: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<Slot>) {
|
||||
info!("root_subscribe");
|
||||
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
@ -321,9 +354,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
|
||||
commitment::{BlockCommitmentCache, CacheSlotInfo},
|
||||
rpc_subscriptions::tests::robust_poll_or_panic,
|
||||
};
|
||||
use crossbeam_channel::unbounded;
|
||||
use jsonrpc_core::{futures::sync::mpsc, Response};
|
||||
use jsonrpc_pubsub::{PubSubHandler, Session};
|
||||
use serial_test_derive::serial;
|
||||
@ -333,13 +368,18 @@ mod tests {
|
||||
genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
||||
get_tmp_ledger_path,
|
||||
};
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_runtime::{
|
||||
bank::Bank,
|
||||
genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs},
|
||||
};
|
||||
use solana_sdk::{
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
system_program, system_transaction,
|
||||
transaction::{self, Transaction},
|
||||
};
|
||||
use solana_vote_program::vote_transaction;
|
||||
use std::{
|
||||
sync::{atomic::AtomicBool, RwLock},
|
||||
thread::sleep,
|
||||
@ -519,7 +559,7 @@ mod tests {
|
||||
session,
|
||||
subscriber,
|
||||
contract_state.pubkey().to_string(),
|
||||
None,
|
||||
Some(CommitmentConfig::recent()),
|
||||
);
|
||||
|
||||
let tx = system_transaction::transfer(&alice, &contract_funds.pubkey(), 51, blockhash);
|
||||
@ -836,4 +876,97 @@ mod tests {
|
||||
.slot_unsubscribe(Some(session), SubscriptionId::Number(0))
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_vote_subscribe() {
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let block_commitment_cache = Arc::new(RwLock::new(
|
||||
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()),
|
||||
));
|
||||
|
||||
let validator_voting_keypairs: Vec<_> = (0..10)
|
||||
.map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
|
||||
.collect();
|
||||
let GenesisConfigInfo { genesis_config, .. } =
|
||||
create_genesis_config_with_vote_accounts(10_000, &validator_voting_keypairs, 100);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let bank = Bank::new(&genesis_config);
|
||||
let bank_forks = BankForks::new(0, bank);
|
||||
let bank = bank_forks.get(0).unwrap().clone();
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
|
||||
// Setup RPC
|
||||
let mut rpc =
|
||||
RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks.clone());
|
||||
let session = create_session();
|
||||
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("voteNotification");
|
||||
|
||||
// Setup Subscriptions
|
||||
let subscriptions =
|
||||
RpcSubscriptions::new(&exit, bank_forks.clone(), block_commitment_cache.clone());
|
||||
rpc.subscriptions = Arc::new(subscriptions);
|
||||
rpc.vote_subscribe(session, subscriber);
|
||||
|
||||
// Create some voters at genesis
|
||||
let vote_tracker = VoteTracker::new(&bank);
|
||||
let (votes_sender, votes_receiver) = unbounded();
|
||||
let (vote_tracker, validator_voting_keypairs) =
|
||||
(Arc::new(vote_tracker), validator_voting_keypairs);
|
||||
|
||||
let vote_slots = vec![1, 2];
|
||||
validator_voting_keypairs.iter().for_each(|keypairs| {
|
||||
let node_keypair = &keypairs.node_keypair;
|
||||
let vote_keypair = &keypairs.vote_keypair;
|
||||
let vote_tx = vote_transaction::new_vote_transaction(
|
||||
vote_slots.clone(),
|
||||
Hash::default(),
|
||||
Hash::default(),
|
||||
node_keypair,
|
||||
vote_keypair,
|
||||
vote_keypair,
|
||||
);
|
||||
votes_sender.send(vec![vote_tx]).unwrap();
|
||||
});
|
||||
|
||||
// Process votes and check they were notified.
|
||||
ClusterInfoVoteListener::get_and_process_votes_for_tests(
|
||||
&votes_receiver,
|
||||
&vote_tracker,
|
||||
0,
|
||||
rpc.subscriptions.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let (response, _) = robust_poll_or_panic(receiver);
|
||||
assert_eq!(
|
||||
response,
|
||||
r#"{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"11111111111111111111111111111111","slots":[1,2],"timestamp":null},"subscription":0}}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_vote_unsubscribe() {
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
|
||||
let bank = Bank::new(&genesis_config);
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
|
||||
let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks);
|
||||
let session = create_session();
|
||||
let (subscriber, _id_receiver, _) = Subscriber::new_test("voteNotification");
|
||||
rpc.vote_subscribe(session, subscriber);
|
||||
|
||||
let session = create_session();
|
||||
assert!(rpc
|
||||
.vote_unsubscribe(Some(session), SubscriptionId::Number(42))
|
||||
.is_err());
|
||||
|
||||
let session = create_session();
|
||||
assert!(rpc
|
||||
.vote_unsubscribe(Some(session), SubscriptionId::Number(0))
|
||||
.is_ok());
|
||||
}
|
||||
}
|
||||
|
@ -15,12 +15,13 @@ use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore};
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::{
|
||||
account::Account,
|
||||
clock::Slot,
|
||||
clock::{Slot, UnixTimestamp},
|
||||
commitment_config::{CommitmentConfig, CommitmentLevel},
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
transaction,
|
||||
};
|
||||
use solana_vote_program::vote_state::Vote;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::{Receiver, RecvTimeoutError, SendError, Sender},
|
||||
@ -43,22 +44,34 @@ pub struct SlotInfo {
|
||||
pub root: Slot,
|
||||
}
|
||||
|
||||
// A more human-friendly version of Vote, with the bank state signature base58 encoded.
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct RpcVote {
|
||||
pub slots: Vec<Slot>,
|
||||
pub hash: String,
|
||||
pub timestamp: Option<UnixTimestamp>,
|
||||
}
|
||||
|
||||
enum NotificationEntry {
|
||||
Slot(SlotInfo),
|
||||
Vote(Vote),
|
||||
Root(Slot),
|
||||
Bank(CacheSlotInfo),
|
||||
Gossip(Slot),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NotificationEntry {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
NotificationEntry::Root(root) => write!(f, "Root({})", root),
|
||||
NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote),
|
||||
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
|
||||
NotificationEntry::Bank(cache_slot_info) => write!(
|
||||
f,
|
||||
"Bank({{current_slot: {:?}}})",
|
||||
cache_slot_info.current_slot
|
||||
),
|
||||
NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -76,6 +89,7 @@ type RpcSignatureSubscriptions = RwLock<
|
||||
HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>>>>,
|
||||
>;
|
||||
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
|
||||
type RpcVoteSubscriptions = RwLock<HashMap<SubscriptionId, Sink<RpcVote>>>;
|
||||
type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
|
||||
|
||||
fn add_subscription<K, S>(
|
||||
@ -90,7 +104,7 @@ fn add_subscription<K, S>(
|
||||
S: Clone,
|
||||
{
|
||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||
let commitment = commitment.unwrap_or_else(CommitmentConfig::recent);
|
||||
let commitment = commitment.unwrap_or_else(CommitmentConfig::single);
|
||||
let subscription_data = SubscriptionData {
|
||||
sink,
|
||||
commitment,
|
||||
@ -159,7 +173,9 @@ where
|
||||
CommitmentLevel::Max => cache_slot_info.largest_confirmed_root,
|
||||
CommitmentLevel::Recent => cache_slot_info.current_slot,
|
||||
CommitmentLevel::Root => cache_slot_info.node_root,
|
||||
CommitmentLevel::Single => cache_slot_info.highest_confirmed_slot,
|
||||
CommitmentLevel::Single | CommitmentLevel::SingleGossip => {
|
||||
cache_slot_info.highest_confirmed_slot
|
||||
}
|
||||
};
|
||||
let results = {
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
@ -246,7 +262,11 @@ struct Subscriptions {
|
||||
account_subscriptions: Arc<RpcAccountSubscriptions>,
|
||||
program_subscriptions: Arc<RpcProgramSubscriptions>,
|
||||
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
|
||||
gossip_account_subscriptions: Arc<RpcAccountSubscriptions>,
|
||||
gossip_program_subscriptions: Arc<RpcProgramSubscriptions>,
|
||||
gossip_signature_subscriptions: Arc<RpcSignatureSubscriptions>,
|
||||
slot_subscriptions: Arc<RpcSlotSubscriptions>,
|
||||
vote_subscriptions: Arc<RpcVoteSubscriptions>,
|
||||
root_subscriptions: Arc<RpcRootSubscriptions>,
|
||||
}
|
||||
|
||||
@ -257,6 +277,7 @@ pub struct RpcSubscriptions {
|
||||
notifier_runtime: Option<Runtime>,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
@ -282,7 +303,11 @@ impl RpcSubscriptions {
|
||||
let account_subscriptions = Arc::new(RpcAccountSubscriptions::default());
|
||||
let program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
|
||||
let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
|
||||
let gossip_account_subscriptions = Arc::new(RpcAccountSubscriptions::default());
|
||||
let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
|
||||
let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
|
||||
let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default());
|
||||
let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default());
|
||||
let root_subscriptions = Arc::new(RpcRootSubscriptions::default());
|
||||
let notification_sender = Arc::new(Mutex::new(notification_sender));
|
||||
|
||||
@ -293,11 +318,18 @@ impl RpcSubscriptions {
|
||||
account_subscriptions,
|
||||
program_subscriptions,
|
||||
signature_subscriptions,
|
||||
gossip_account_subscriptions,
|
||||
gossip_program_subscriptions,
|
||||
gossip_signature_subscriptions,
|
||||
slot_subscriptions,
|
||||
vote_subscriptions,
|
||||
root_subscriptions,
|
||||
};
|
||||
let _subscriptions = subscriptions.clone();
|
||||
|
||||
let last_checked_slots = Arc::new(RwLock::new(HashMap::new()));
|
||||
let _last_checked_slots = last_checked_slots.clone();
|
||||
|
||||
let notifier_runtime = RuntimeBuilder::new()
|
||||
.core_threads(1)
|
||||
.name_prefix("solana-rpc-notifier-")
|
||||
@ -314,6 +346,7 @@ impl RpcSubscriptions {
|
||||
notification_receiver,
|
||||
_subscriptions,
|
||||
_bank_forks,
|
||||
_last_checked_slots,
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
@ -325,6 +358,7 @@ impl RpcSubscriptions {
|
||||
t_cleanup: Some(t_cleanup),
|
||||
bank_forks,
|
||||
block_commitment_cache,
|
||||
last_checked_slots,
|
||||
exit: exit.clone(),
|
||||
}
|
||||
}
|
||||
@ -412,11 +446,10 @@ impl RpcSubscriptions {
|
||||
sub_id: SubscriptionId,
|
||||
subscriber: Subscriber<Response<RpcAccount>>,
|
||||
) {
|
||||
let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap();
|
||||
let slot = match commitment
|
||||
.unwrap_or_else(CommitmentConfig::recent)
|
||||
.commitment
|
||||
{
|
||||
let commitment_level = commitment
|
||||
.unwrap_or_else(CommitmentConfig::single)
|
||||
.commitment;
|
||||
let slot = match commitment_level {
|
||||
CommitmentLevel::Max => self
|
||||
.block_commitment_cache
|
||||
.read()
|
||||
@ -429,6 +462,12 @@ impl RpcSubscriptions {
|
||||
.read()
|
||||
.unwrap()
|
||||
.highest_confirmed_slot(),
|
||||
CommitmentLevel::SingleGossip => *self
|
||||
.last_checked_slots
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(&CommitmentLevel::SingleGossip)
|
||||
.unwrap_or(&0),
|
||||
};
|
||||
let last_notified_slot = if let Some((_account, slot)) = self
|
||||
.bank_forks
|
||||
@ -441,6 +480,15 @@ impl RpcSubscriptions {
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
|
||||
self.subscriptions
|
||||
.gossip_account_subscriptions
|
||||
.write()
|
||||
.unwrap()
|
||||
} else {
|
||||
self.subscriptions.account_subscriptions.write().unwrap()
|
||||
};
|
||||
add_subscription(
|
||||
&mut subscriptions,
|
||||
pubkey,
|
||||
@ -453,8 +501,17 @@ impl RpcSubscriptions {
|
||||
|
||||
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
|
||||
let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap();
|
||||
if remove_subscription(&mut subscriptions, id) {
|
||||
true
|
||||
} else {
|
||||
let mut subscriptions = self
|
||||
.subscriptions
|
||||
.gossip_account_subscriptions
|
||||
.write()
|
||||
.unwrap();
|
||||
remove_subscription(&mut subscriptions, id)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_program_subscription(
|
||||
&self,
|
||||
@ -463,7 +520,17 @@ impl RpcSubscriptions {
|
||||
sub_id: SubscriptionId,
|
||||
subscriber: Subscriber<Response<RpcKeyedAccount>>,
|
||||
) {
|
||||
let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap();
|
||||
let commitment_level = commitment
|
||||
.unwrap_or_else(CommitmentConfig::recent)
|
||||
.commitment;
|
||||
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
|
||||
self.subscriptions
|
||||
.gossip_program_subscriptions
|
||||
.write()
|
||||
.unwrap()
|
||||
} else {
|
||||
self.subscriptions.program_subscriptions.write().unwrap()
|
||||
};
|
||||
add_subscription(
|
||||
&mut subscriptions,
|
||||
program_id,
|
||||
@ -476,8 +543,17 @@ impl RpcSubscriptions {
|
||||
|
||||
pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool {
|
||||
let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap();
|
||||
if remove_subscription(&mut subscriptions, id) {
|
||||
true
|
||||
} else {
|
||||
let mut subscriptions = self
|
||||
.subscriptions
|
||||
.gossip_program_subscriptions
|
||||
.write()
|
||||
.unwrap();
|
||||
remove_subscription(&mut subscriptions, id)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_signature_subscription(
|
||||
&self,
|
||||
@ -486,7 +562,17 @@ impl RpcSubscriptions {
|
||||
sub_id: SubscriptionId,
|
||||
subscriber: Subscriber<Response<RpcSignatureResult>>,
|
||||
) {
|
||||
let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap();
|
||||
let commitment_level = commitment
|
||||
.unwrap_or_else(CommitmentConfig::recent)
|
||||
.commitment;
|
||||
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
|
||||
self.subscriptions
|
||||
.gossip_signature_subscriptions
|
||||
.write()
|
||||
.unwrap()
|
||||
} else {
|
||||
self.subscriptions.signature_subscriptions.write().unwrap()
|
||||
};
|
||||
add_subscription(
|
||||
&mut subscriptions,
|
||||
signature,
|
||||
@ -499,8 +585,17 @@ impl RpcSubscriptions {
|
||||
|
||||
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
|
||||
let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap();
|
||||
if remove_subscription(&mut subscriptions, id) {
|
||||
true
|
||||
} else {
|
||||
let mut subscriptions = self
|
||||
.subscriptions
|
||||
.gossip_signature_subscriptions
|
||||
.write()
|
||||
.unwrap();
|
||||
remove_subscription(&mut subscriptions, id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify subscribers of changes to any accounts or new signatures since
|
||||
/// the bank's last checkpoint.
|
||||
@ -508,6 +603,12 @@ impl RpcSubscriptions {
|
||||
self.enqueue_notification(NotificationEntry::Bank(cache_slot_info));
|
||||
}
|
||||
|
||||
/// Notify SingleGossip commitment-level subscribers of changes to any accounts or new
|
||||
/// signatures.
|
||||
pub fn notify_gossip_subscribers(&self, slot: Slot) {
|
||||
self.enqueue_notification(NotificationEntry::Gossip(slot));
|
||||
}
|
||||
|
||||
pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<SlotInfo>) {
|
||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||
let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap();
|
||||
@ -523,6 +624,21 @@ impl RpcSubscriptions {
|
||||
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
|
||||
}
|
||||
|
||||
pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) {
|
||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
|
||||
subscriptions.insert(sub_id, sink);
|
||||
}
|
||||
|
||||
pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool {
|
||||
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
|
||||
subscriptions.remove(id).is_some()
|
||||
}
|
||||
|
||||
pub fn notify_vote(&self, vote: &Vote) {
|
||||
self.enqueue_notification(NotificationEntry::Vote(vote.clone()));
|
||||
}
|
||||
|
||||
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
|
||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||
let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
|
||||
@ -564,6 +680,7 @@ impl RpcSubscriptions {
|
||||
notification_receiver: Receiver<NotificationEntry>,
|
||||
subscriptions: Subscriptions,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
|
||||
) {
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
@ -577,6 +694,19 @@ impl RpcSubscriptions {
|
||||
notifier.notify(slot_info, sink);
|
||||
}
|
||||
}
|
||||
NotificationEntry::Vote(ref vote_info) => {
|
||||
let subscriptions = subscriptions.vote_subscriptions.read().unwrap();
|
||||
for (_, sink) in subscriptions.iter() {
|
||||
notifier.notify(
|
||||
RpcVote {
|
||||
slots: vote_info.slots.clone(),
|
||||
hash: bs58::encode(vote_info.hash).into_string(),
|
||||
timestamp: vote_info.timestamp,
|
||||
},
|
||||
sink,
|
||||
);
|
||||
}
|
||||
}
|
||||
NotificationEntry::Root(root) => {
|
||||
let subscriptions = subscriptions.root_subscriptions.read().unwrap();
|
||||
for (_, sink) in subscriptions.iter() {
|
||||
@ -584,47 +714,32 @@ impl RpcSubscriptions {
|
||||
}
|
||||
}
|
||||
NotificationEntry::Bank(cache_slot_info) => {
|
||||
let pubkeys: Vec<_> = {
|
||||
let subs = subscriptions.account_subscriptions.read().unwrap();
|
||||
subs.keys().cloned().collect()
|
||||
};
|
||||
for pubkey in &pubkeys {
|
||||
Self::check_account(
|
||||
pubkey,
|
||||
RpcSubscriptions::notify_accounts_programs_signatures(
|
||||
&subscriptions.account_subscriptions,
|
||||
&subscriptions.program_subscriptions,
|
||||
&subscriptions.signature_subscriptions,
|
||||
&bank_forks,
|
||||
subscriptions.account_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
¬ifier,
|
||||
)
|
||||
}
|
||||
|
||||
let programs: Vec<_> = {
|
||||
let subs = subscriptions.program_subscriptions.read().unwrap();
|
||||
subs.keys().cloned().collect()
|
||||
NotificationEntry::Gossip(slot) => {
|
||||
let _ = last_checked_slots
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(CommitmentLevel::SingleGossip, slot);
|
||||
let cache_slot_info = CacheSlotInfo {
|
||||
highest_confirmed_slot: slot,
|
||||
..CacheSlotInfo::default()
|
||||
};
|
||||
for program_id in &programs {
|
||||
Self::check_program(
|
||||
program_id,
|
||||
RpcSubscriptions::notify_accounts_programs_signatures(
|
||||
&subscriptions.gossip_account_subscriptions,
|
||||
&subscriptions.gossip_program_subscriptions,
|
||||
&subscriptions.gossip_signature_subscriptions,
|
||||
&bank_forks,
|
||||
subscriptions.program_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
|
||||
let signatures: Vec<_> = {
|
||||
let subs = subscriptions.signature_subscriptions.read().unwrap();
|
||||
subs.keys().cloned().collect()
|
||||
};
|
||||
for signature in &signatures {
|
||||
Self::check_signature(
|
||||
signature,
|
||||
&bank_forks,
|
||||
subscriptions.signature_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
)
|
||||
}
|
||||
},
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
@ -638,6 +753,57 @@ impl RpcSubscriptions {
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_accounts_programs_signatures(
|
||||
account_subscriptions: &Arc<RpcAccountSubscriptions>,
|
||||
program_subscriptions: &Arc<RpcProgramSubscriptions>,
|
||||
signature_subscriptions: &Arc<RpcSignatureSubscriptions>,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
cache_slot_info: &CacheSlotInfo,
|
||||
notifier: &RpcNotifier,
|
||||
) {
|
||||
let pubkeys: Vec<_> = {
|
||||
let subs = account_subscriptions.read().unwrap();
|
||||
subs.keys().cloned().collect()
|
||||
};
|
||||
for pubkey in &pubkeys {
|
||||
Self::check_account(
|
||||
pubkey,
|
||||
&bank_forks,
|
||||
account_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
|
||||
let programs: Vec<_> = {
|
||||
let subs = program_subscriptions.read().unwrap();
|
||||
subs.keys().cloned().collect()
|
||||
};
|
||||
for program_id in &programs {
|
||||
Self::check_program(
|
||||
program_id,
|
||||
&bank_forks,
|
||||
program_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
|
||||
let signatures: Vec<_> = {
|
||||
let subs = signature_subscriptions.read().unwrap();
|
||||
subs.keys().cloned().collect()
|
||||
};
|
||||
for signature in &signatures {
|
||||
Self::check_signature(
|
||||
signature,
|
||||
&bank_forks,
|
||||
signature_subscriptions.clone(),
|
||||
¬ifier,
|
||||
&cache_slot_info,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn shutdown(&mut self) -> std::thread::Result<()> {
|
||||
if let Some(runtime) = self.notifier_runtime.take() {
|
||||
info!("RPC Notifier runtime - shutting down");
|
||||
@ -736,7 +902,12 @@ pub(crate) mod tests {
|
||||
),
|
||||
)),
|
||||
);
|
||||
subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber);
|
||||
subscriptions.add_account_subscription(
|
||||
alice.pubkey(),
|
||||
Some(CommitmentConfig::recent()),
|
||||
sub_id.clone(),
|
||||
subscriber,
|
||||
);
|
||||
|
||||
assert!(subscriptions
|
||||
.subscriptions
|
||||
@ -1185,4 +1356,135 @@ pub(crate) mod tests {
|
||||
assert_eq!(subscriptions.len(), (num_keys - 1) as usize);
|
||||
assert!(subscriptions.get(&0).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_gossip_separate_account_notifications() {
|
||||
let GenesisConfigInfo {
|
||||
genesis_config,
|
||||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_config(100);
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let bank = Bank::new(&genesis_config);
|
||||
let blockhash = bank.last_blockhash();
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
|
||||
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
|
||||
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
|
||||
bank_forks.write().unwrap().insert(bank1);
|
||||
let alice = Keypair::new();
|
||||
|
||||
let (subscriber0, _id_receiver, transport_receiver0) =
|
||||
Subscriber::new_test("accountNotification");
|
||||
let (subscriber1, _id_receiver, transport_receiver1) =
|
||||
Subscriber::new_test("accountNotification");
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let subscriptions = RpcSubscriptions::new(
|
||||
&exit,
|
||||
bank_forks.clone(),
|
||||
Arc::new(RwLock::new(
|
||||
BlockCommitmentCache::new_for_tests_with_blockstore_bank(
|
||||
blockstore,
|
||||
bank_forks.read().unwrap().get(1).unwrap().clone(),
|
||||
1,
|
||||
),
|
||||
)),
|
||||
);
|
||||
let sub_id0 = SubscriptionId::Number(0 as u64);
|
||||
subscriptions.add_account_subscription(
|
||||
alice.pubkey(),
|
||||
Some(CommitmentConfig::single_gossip()),
|
||||
sub_id0.clone(),
|
||||
subscriber0,
|
||||
);
|
||||
let sub_id1 = SubscriptionId::Number(1 as u64);
|
||||
subscriptions.add_account_subscription(
|
||||
alice.pubkey(),
|
||||
Some(CommitmentConfig::recent()),
|
||||
sub_id1.clone(),
|
||||
subscriber1,
|
||||
);
|
||||
|
||||
assert!(subscriptions
|
||||
.subscriptions
|
||||
.account_subscriptions
|
||||
.read()
|
||||
.unwrap()
|
||||
.contains_key(&alice.pubkey()));
|
||||
|
||||
let tx = system_transaction::create_account(
|
||||
&mint_keypair,
|
||||
&alice,
|
||||
blockhash,
|
||||
1,
|
||||
16,
|
||||
&solana_budget_program::id(),
|
||||
);
|
||||
bank_forks
|
||||
.write()
|
||||
.unwrap()
|
||||
.get(1)
|
||||
.unwrap()
|
||||
.process_transaction(&tx)
|
||||
.unwrap();
|
||||
let mut cache_slot_info = CacheSlotInfo::default();
|
||||
cache_slot_info.current_slot = 1;
|
||||
subscriptions.notify_subscribers(cache_slot_info);
|
||||
let (response, _) = robust_poll_or_panic(transport_receiver1);
|
||||
let expected = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "accountNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"context": { "slot": 1 },
|
||||
"value": {
|
||||
"data": "1111111111111111",
|
||||
"executable": false,
|
||||
"lamports": 1,
|
||||
"owner": "Budget1111111111111111111111111111111111111",
|
||||
"rentEpoch": 1,
|
||||
},
|
||||
},
|
||||
"subscription": 1,
|
||||
}
|
||||
});
|
||||
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||
|
||||
subscriptions.notify_gossip_subscribers(1);
|
||||
let (response, _) = robust_poll_or_panic(transport_receiver0);
|
||||
let expected = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "accountNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"context": { "slot": 1 },
|
||||
"value": {
|
||||
"data": "1111111111111111",
|
||||
"executable": false,
|
||||
"lamports": 1,
|
||||
"owner": "Budget1111111111111111111111111111111111111",
|
||||
"rentEpoch": 1,
|
||||
},
|
||||
},
|
||||
"subscription": 0,
|
||||
}
|
||||
});
|
||||
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||
|
||||
subscriptions.remove_account_subscription(&sub_id0);
|
||||
assert!(subscriptions
|
||||
.subscriptions
|
||||
.account_subscriptions
|
||||
.read()
|
||||
.unwrap()
|
||||
.contains_key(&alice.pubkey()));
|
||||
subscriptions.remove_account_subscription(&sub_id1);
|
||||
assert!(!subscriptions
|
||||
.subscriptions
|
||||
.account_subscriptions
|
||||
.read()
|
||||
.unwrap()
|
||||
.contains_key(&alice.pubkey()));
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ use crate::{
|
||||
cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
|
||||
fetch_stage::FetchStage,
|
||||
poh_recorder::{PohRecorder, WorkingBankEntry},
|
||||
rpc_subscriptions::RpcSubscriptions,
|
||||
sigverify::TransactionSigVerifier,
|
||||
sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
|
||||
};
|
||||
@ -44,6 +45,7 @@ impl Tpu {
|
||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
||||
broadcast_sockets: Vec<UdpSocket>,
|
||||
sigverify_disabled: bool,
|
||||
subscriptions: &Arc<RpcSubscriptions>,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
broadcast_type: &BroadcastStageType,
|
||||
@ -79,6 +81,7 @@ impl Tpu {
|
||||
&poh_recorder,
|
||||
vote_tracker,
|
||||
bank_forks,
|
||||
subscriptions.clone(),
|
||||
);
|
||||
|
||||
let banking_stage = BankingStage::new(
|
||||
|
@ -470,6 +470,7 @@ impl Validator {
|
||||
node.sockets.tpu_forwards,
|
||||
node.sockets.broadcast,
|
||||
config.dev_sigverify_disabled,
|
||||
&subscriptions,
|
||||
transaction_status_sender,
|
||||
&blockstore,
|
||||
&config.broadcast_stage_type,
|
||||
|
@ -1193,7 +1193,7 @@ After connecting to the RPC PubSub websocket at `ws://<ADDRESS>/`:
|
||||
|
||||
* Submit subscription requests to the websocket using the methods below
|
||||
* Multiple subscriptions may be active at once
|
||||
* Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `recent`.
|
||||
* Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `"single"`.
|
||||
|
||||
### accountSubscribe
|
||||
|
||||
@ -1204,8 +1204,6 @@ Subscribe to an account to receive notifications when the lamports or data for a
|
||||
* `<string>` - account Pubkey, as base-58 encoded string
|
||||
* `<object>` - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment)
|
||||
|
||||
Default: 0, Max: `MAX_LOCKOUT_HISTORY` \(greater integers rounded down\)
|
||||
|
||||
#### Results:
|
||||
|
||||
* `<number>` - Subscription id \(needed to unsubscribe\)
|
||||
@ -1216,7 +1214,7 @@ Subscribe to an account to receive notifications when the lamports or data for a
|
||||
// Request
|
||||
{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12"]}
|
||||
|
||||
{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12", 15]}
|
||||
{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12", {"commitment": "single"}]}
|
||||
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": 0,"id": 1}
|
||||
@ -1225,7 +1223,25 @@ Subscribe to an account to receive notifications when the lamports or data for a
|
||||
#### Notification Format:
|
||||
|
||||
```bash
|
||||
{"jsonrpc": "2.0","method": "accountNotification", "params": {"result": {"executable":false,"owner":"4uQeVj5tqViQh7yWWGStvkEG1Zmhx6uasJtWCJziofM","lamports":1,"data":"Joig2k8Ax4JPMpWhXRyc2jMa7Wejz4X1xqVi3i7QRkmVj1ChUgNc4VNpGUQePJGBAui3c6886peU9GEbjsyeANN8JGStprwLbLwcw5wpPjuQQb9mwrjVmoDQBjj3MzZKgeHn6wmnQ5k8DBFuoCYKWWsJfH2gv9FvCzrN6K1CRcQZzF","rentEpoch":28},"subscription":0}}
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "accountNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"context": {
|
||||
"slot": 5199307
|
||||
},
|
||||
"value": {
|
||||
"data": "9qRxMDwy1ntDhBBoiy4Na9uDLbRTSzUS989mpwz",
|
||||
"executable": false,
|
||||
"lamports": 33594,
|
||||
"owner": "H9oaJujXETwkmjyweuqKPFtk2no4SumoU9A3hi3dC8U6",
|
||||
"rentEpoch": 635
|
||||
}
|
||||
},
|
||||
"subscription": 23784
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### accountUnsubscribe
|
||||
@ -1259,8 +1275,6 @@ Subscribe to a program to receive notifications when the lamports or data for a
|
||||
* `<string>` - program\_id Pubkey, as base-58 encoded string
|
||||
* `<object>` - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment)
|
||||
|
||||
Default: 0, Max: `MAX_LOCKOUT_HISTORY` \(greater integers rounded down\)
|
||||
|
||||
#### Results:
|
||||
|
||||
* `<integer>` - Subscription id \(needed to unsubscribe\)
|
||||
@ -1269,9 +1283,9 @@ Subscribe to a program to receive notifications when the lamports or data for a
|
||||
|
||||
```bash
|
||||
// Request
|
||||
{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV"]}
|
||||
{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["7BwE8yitxiWkD8jVPFvPmV7rs2Znzi4NHzJGLu2dzpUq"]}
|
||||
|
||||
{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV", 15]}
|
||||
{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["7BwE8yitxiWkD8jVPFvPmV7rs2Znzi4NHzJGLu2dzpUq", {"commitment": "single"}]}
|
||||
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": 0,"id": 1}
|
||||
@ -1279,12 +1293,30 @@ Subscribe to a program to receive notifications when the lamports or data for a
|
||||
|
||||
#### Notification Format:
|
||||
|
||||
* `<string>` - account Pubkey, as base-58 encoded string
|
||||
* `<object>` - account info JSON object \(see [getAccountInfo](jsonrpc-api.md#getaccountinfo) for field details\)
|
||||
|
||||
```bash
|
||||
{"jsonrpc":"2.0","method":"programNotification","params":{{"result":["8Rshv2oMkPu5E4opXTRyuyBeZBqQ4S477VG26wUTFxUM",{"executable":false,"lamports":1,"owner":"9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV","data":"4SZWhnbSt3njU4QHVgPrWeekz1BudU4ttmdr9ezmrL4X6XeLeL83xVAo6ZdxwU3oXgHNeF2q6tWZbnVnBXmvNyeLVEGt8ZQ4ZmgjHfVNCEwBtzh2aDrHgQSjBFLYAdmM3uwBhcm1EyHJLeUiFqpsoAUhn6Vphwrpf44dWRAGsAJZbzvVrUW9bfucpR7xudHHg2MxQ2CdqsfS3TfWUJY3vaf2A4AUNzfAmNPHBGi99nU2hYubGSVSPcpVPpdRWQkydgqasBmTosd","rentEpoch":28}],"subscription":0}}
|
||||
```
|
||||
```bash
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "programNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"context": {
|
||||
"slot": 5208469
|
||||
},
|
||||
"value": {
|
||||
"pubkey": "H4vnBqifaSACnKa7acsxstsY1iV1bvJNxsCY7enrd1hq"
|
||||
"account": {
|
||||
"data": "9qRxMDwy1ntDhBBoiy4Na9uDLbRTSzUS989m",
|
||||
"executable": false,
|
||||
"lamports": 33594,
|
||||
"owner": "7BwE8yitxiWkD8jVPFvPmV7rs2Znzi4NHzJGLu2dzpUq",
|
||||
"rentEpoch": 636
|
||||
},
|
||||
}
|
||||
},
|
||||
"subscription": 24040
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### programUnsubscribe
|
||||
|
||||
@ -1329,7 +1361,7 @@ Subscribe to a transaction signature to receive notification when the transactio
|
||||
// Request
|
||||
{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b"]}
|
||||
|
||||
{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b", 15]}
|
||||
{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b", {"commitment": "max"}]}
|
||||
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": 0,"id": 1}
|
||||
@ -1338,7 +1370,21 @@ Subscribe to a transaction signature to receive notification when the transactio
|
||||
#### Notification Format:
|
||||
|
||||
```bash
|
||||
{"jsonrpc": "2.0","method": "signatureNotification", "params": {"result": {"err": null}, "subscription":0}}
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "signatureNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"context": {
|
||||
"slot": 5207624
|
||||
},
|
||||
"value": {
|
||||
"err": null
|
||||
}
|
||||
},
|
||||
"subscription": 24006
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### signatureUnsubscribe
|
||||
@ -1388,7 +1434,18 @@ None
|
||||
#### Notification Format:
|
||||
|
||||
```bash
|
||||
{"jsonrpc": "2.0","method": "slotNotification", "params": {"result":{"parent":75,"root":44,"slot":76},"subscription":0}}
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "slotNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"parent": 75,
|
||||
"root": 44,
|
||||
"slot": 76
|
||||
},
|
||||
"subscription": 0
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### slotUnsubscribe
|
||||
@ -1440,7 +1497,14 @@ None
|
||||
The result is the latest root slot number.
|
||||
|
||||
```bash
|
||||
{"jsonrpc": "2.0","method": "rootNotification", "params": {"result":42,"subscription":0}}
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "rootNotification",
|
||||
"params": {
|
||||
"result": 42,
|
||||
"subscription": 0
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### rootUnsubscribe
|
||||
@ -1464,3 +1528,57 @@ Unsubscribe from root notifications
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": true,"id": 1}
|
||||
```
|
||||
|
||||
### voteSubscribe
|
||||
|
||||
Subscribe to receive notification anytime a new vote is observed in gossip.
|
||||
These votes are pre-consensus therefore there is no guarantee these votes will
|
||||
enter the ledger.
|
||||
|
||||
#### Parameters:
|
||||
|
||||
None
|
||||
|
||||
#### Results:
|
||||
|
||||
* `integer` - subscription id \(needed to unsubscribe\)
|
||||
|
||||
#### Example:
|
||||
|
||||
```bash
|
||||
// Request
|
||||
{"jsonrpc":"2.0", "id":1, "method":"voteSubscribe"}
|
||||
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": 0,"id": 1}
|
||||
```
|
||||
|
||||
#### Notification Format:
|
||||
|
||||
The result is the latest vote, containing its hash, a list of voted slots, and an optional timestamp.
|
||||
|
||||
```bash
|
||||
{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"8Rshv2oMkPu5E4opXTRyuyBeZBqQ4S477VG26wUTFxUM","slots":[1,2],"timestamp":null},"subscription":0}}
|
||||
```
|
||||
|
||||
### voteUnsubscribe
|
||||
|
||||
Unsubscribe from vote notifications
|
||||
|
||||
#### Parameters:
|
||||
|
||||
* `<integer>` - subscription id to cancel
|
||||
|
||||
#### Results:
|
||||
|
||||
* `<bool>` - unsubscribe success message
|
||||
|
||||
#### Example:
|
||||
|
||||
```bash
|
||||
// Request
|
||||
{"jsonrpc":"2.0", "id":1, "method":"voteUnsubscribe", "params":[0]}
|
||||
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": true,"id": 1}
|
||||
```
|
||||
|
@ -37,6 +37,12 @@ impl CommitmentConfig {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn single_gossip() -> Self {
|
||||
Self {
|
||||
commitment: CommitmentLevel::SingleGossip,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ok(self) -> Option<Self> {
|
||||
if self == Self::default() {
|
||||
None
|
||||
@ -46,11 +52,12 @@ impl CommitmentConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum CommitmentLevel {
|
||||
Max,
|
||||
Recent,
|
||||
Root,
|
||||
Single,
|
||||
SingleGossip,
|
||||
}
|
||||
|
Reference in New Issue
Block a user