Add Vote PubSub endpoint for live gossip votes. (#10045)
* Add Vote PubSub endpoint for live gossip votes. * Updated tests for Vote RPC and Vote Listener * Add JSON RPC documentation for Vote RPC. * Base58 encode hash in Vote RPC response.
This commit is contained in:
@ -3,6 +3,7 @@ use crate::{
|
|||||||
crds_value::CrdsValueLabel,
|
crds_value::CrdsValueLabel,
|
||||||
poh_recorder::PohRecorder,
|
poh_recorder::PohRecorder,
|
||||||
result::{Error, Result},
|
result::{Error, Result},
|
||||||
|
rpc_subscriptions::RpcSubscriptions,
|
||||||
sigverify,
|
sigverify,
|
||||||
verified_vote_packets::VerifiedVotePackets,
|
verified_vote_packets::VerifiedVotePackets,
|
||||||
};
|
};
|
||||||
@ -202,6 +203,7 @@ impl ClusterInfoVoteListener {
|
|||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
vote_tracker: Arc<VoteTracker>,
|
vote_tracker: Arc<VoteTracker>,
|
||||||
bank_forks: Arc<RwLock<BankForks>>,
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let exit_ = exit.clone();
|
let exit_ = exit.clone();
|
||||||
|
|
||||||
@ -242,6 +244,7 @@ impl ClusterInfoVoteListener {
|
|||||||
verified_vote_transactions_receiver,
|
verified_vote_transactions_receiver,
|
||||||
vote_tracker,
|
vote_tracker,
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
|
subscriptions,
|
||||||
);
|
);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -364,6 +367,7 @@ impl ClusterInfoVoteListener {
|
|||||||
vote_txs_receiver: VerifiedVoteTransactionsReceiver,
|
vote_txs_receiver: VerifiedVoteTransactionsReceiver,
|
||||||
vote_tracker: Arc<VoteTracker>,
|
vote_tracker: Arc<VoteTracker>,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
@ -373,9 +377,12 @@ impl ClusterInfoVoteListener {
|
|||||||
let root_bank = bank_forks.read().unwrap().root_bank().clone();
|
let root_bank = bank_forks.read().unwrap().root_bank().clone();
|
||||||
vote_tracker.process_new_root_bank(&root_bank);
|
vote_tracker.process_new_root_bank(&root_bank);
|
||||||
|
|
||||||
if let Err(e) =
|
if let Err(e) = Self::get_and_process_votes(
|
||||||
Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker, root_bank.slot())
|
&vote_txs_receiver,
|
||||||
{
|
&vote_tracker,
|
||||||
|
root_bank.slot(),
|
||||||
|
subscriptions.clone(),
|
||||||
|
) {
|
||||||
match e {
|
match e {
|
||||||
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@ -389,21 +396,37 @@ impl ClusterInfoVoteListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn get_and_process_votes_for_tests(
|
||||||
|
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
|
||||||
|
vote_tracker: &Arc<VoteTracker>,
|
||||||
|
last_root: Slot,
|
||||||
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
|
) -> Result<()> {
|
||||||
|
Self::get_and_process_votes(vote_txs_receiver, vote_tracker, last_root, subscriptions)
|
||||||
|
}
|
||||||
|
|
||||||
fn get_and_process_votes(
|
fn get_and_process_votes(
|
||||||
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
|
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
|
||||||
vote_tracker: &Arc<VoteTracker>,
|
vote_tracker: &Arc<VoteTracker>,
|
||||||
last_root: Slot,
|
last_root: Slot,
|
||||||
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::from_millis(200);
|
let timer = Duration::from_millis(200);
|
||||||
let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?;
|
let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?;
|
||||||
while let Ok(new_txs) = vote_txs_receiver.try_recv() {
|
while let Ok(new_txs) = vote_txs_receiver.try_recv() {
|
||||||
vote_txs.extend(new_txs);
|
vote_txs.extend(new_txs);
|
||||||
}
|
}
|
||||||
Self::process_votes(vote_tracker, vote_txs, last_root);
|
Self::process_votes(vote_tracker, vote_txs, last_root, subscriptions);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_votes(vote_tracker: &VoteTracker, vote_txs: Vec<Transaction>, root: Slot) {
|
fn process_votes(
|
||||||
|
vote_tracker: &VoteTracker,
|
||||||
|
vote_txs: Vec<Transaction>,
|
||||||
|
root: Slot,
|
||||||
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
|
) {
|
||||||
let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new();
|
let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new();
|
||||||
{
|
{
|
||||||
let all_slot_trackers = &vote_tracker.slot_vote_trackers;
|
let all_slot_trackers = &vote_tracker.slot_vote_trackers;
|
||||||
@ -455,7 +478,7 @@ impl ClusterInfoVoteListener {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
for slot in vote.slots {
|
for &slot in vote.slots.iter() {
|
||||||
if slot <= root {
|
if slot <= root {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -480,6 +503,8 @@ impl ClusterInfoVoteListener {
|
|||||||
.or_default()
|
.or_default()
|
||||||
.insert(unduplicated_pubkey.unwrap());
|
.insert(unduplicated_pubkey.unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
subscriptions.notify_vote(&vote);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -519,6 +544,8 @@ impl ClusterInfoVoteListener {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::commitment::BlockCommitmentCache;
|
||||||
|
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
|
||||||
use solana_perf::packet;
|
use solana_perf::packet;
|
||||||
use solana_runtime::{
|
use solana_runtime::{
|
||||||
bank::Bank,
|
bank::Bank,
|
||||||
@ -615,7 +642,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_update_new_root() {
|
fn test_update_new_root() {
|
||||||
let (vote_tracker, bank, _) = setup();
|
let (vote_tracker, bank, _, _) = setup();
|
||||||
|
|
||||||
// Check outdated slots are purged with new root
|
// Check outdated slots are purged with new root
|
||||||
let new_voter = Arc::new(Pubkey::new_rand());
|
let new_voter = Arc::new(Pubkey::new_rand());
|
||||||
@ -656,7 +683,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_update_new_leader_schedule_epoch() {
|
fn test_update_new_leader_schedule_epoch() {
|
||||||
let (vote_tracker, bank, _) = setup();
|
let (vote_tracker, bank, _, _) = setup();
|
||||||
|
|
||||||
// Check outdated slots are purged with new root
|
// Check outdated slots are purged with new root
|
||||||
let leader_schedule_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
let leader_schedule_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
||||||
@ -698,7 +725,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_process_votes() {
|
fn test_process_votes() {
|
||||||
// Create some voters at genesis
|
// Create some voters at genesis
|
||||||
let (vote_tracker, _, validator_voting_keypairs) = setup();
|
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
|
||||||
let (votes_sender, votes_receiver) = unbounded();
|
let (votes_sender, votes_receiver) = unbounded();
|
||||||
|
|
||||||
let vote_slots = vec![1, 2];
|
let vote_slots = vec![1, 2];
|
||||||
@ -717,7 +744,13 @@ mod tests {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Check that all the votes were registered for each validator correctly
|
// Check that all the votes were registered for each validator correctly
|
||||||
ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap();
|
ClusterInfoVoteListener::get_and_process_votes(
|
||||||
|
&votes_receiver,
|
||||||
|
&vote_tracker,
|
||||||
|
0,
|
||||||
|
subscriptions,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
for vote_slot in vote_slots {
|
for vote_slot in vote_slots {
|
||||||
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
|
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
|
||||||
let r_slot_vote_tracker = slot_vote_tracker.read().unwrap();
|
let r_slot_vote_tracker = slot_vote_tracker.read().unwrap();
|
||||||
@ -736,7 +769,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_process_votes2() {
|
fn test_process_votes2() {
|
||||||
// Create some voters at genesis
|
// Create some voters at genesis
|
||||||
let (vote_tracker, _, validator_voting_keypairs) = setup();
|
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
|
||||||
// Send some votes to process
|
// Send some votes to process
|
||||||
let (votes_sender, votes_receiver) = unbounded();
|
let (votes_sender, votes_receiver) = unbounded();
|
||||||
|
|
||||||
@ -760,7 +793,13 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check that all the votes were registered for each validator correctly
|
// Check that all the votes were registered for each validator correctly
|
||||||
ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap();
|
ClusterInfoVoteListener::get_and_process_votes(
|
||||||
|
&votes_receiver,
|
||||||
|
&vote_tracker,
|
||||||
|
0,
|
||||||
|
subscriptions,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
|
for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
|
||||||
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap();
|
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap();
|
||||||
let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap();
|
let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap();
|
||||||
@ -779,7 +818,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_get_voters_by_epoch() {
|
fn test_get_voters_by_epoch() {
|
||||||
// Create some voters at genesis
|
// Create some voters at genesis
|
||||||
let (vote_tracker, bank, validator_voting_keypairs) = setup();
|
let (vote_tracker, bank, validator_voting_keypairs, _) = setup();
|
||||||
let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot());
|
||||||
let last_known_slot = bank
|
let last_known_slot = bank
|
||||||
.epoch_schedule()
|
.epoch_schedule()
|
||||||
@ -850,11 +889,23 @@ mod tests {
|
|||||||
100,
|
100,
|
||||||
);
|
);
|
||||||
let bank = Bank::new(&genesis_config);
|
let bank = Bank::new(&genesis_config);
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let bank_forks = BankForks::new(0, bank);
|
||||||
|
let bank = bank_forks.get(0).unwrap().clone();
|
||||||
|
let vote_tracker = VoteTracker::new(&bank);
|
||||||
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
|
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||||
|
let subscriptions = Arc::new(RpcSubscriptions::new(
|
||||||
|
&exit,
|
||||||
|
Arc::new(RwLock::new(bank_forks)),
|
||||||
|
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
|
||||||
|
blockstore.clone(),
|
||||||
|
))),
|
||||||
|
));
|
||||||
|
|
||||||
// Send a vote to process, should add a reference to the pubkey for that voter
|
// Send a vote to process, should add a reference to the pubkey for that voter
|
||||||
// in the tracker
|
// in the tracker
|
||||||
let validator0_keypairs = &validator_voting_keypairs[0];
|
let validator0_keypairs = &validator_voting_keypairs[0];
|
||||||
let vote_tracker = VoteTracker::new(&bank);
|
|
||||||
let vote_tx = vec![vote_transaction::new_vote_transaction(
|
let vote_tx = vec![vote_transaction::new_vote_transaction(
|
||||||
// Must vote > root to be processed
|
// Must vote > root to be processed
|
||||||
vec![bank.slot() + 1],
|
vec![bank.slot() + 1],
|
||||||
@ -865,7 +916,7 @@ mod tests {
|
|||||||
&validator0_keypairs.vote_keypair,
|
&validator0_keypairs.vote_keypair,
|
||||||
)];
|
)];
|
||||||
|
|
||||||
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0);
|
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0, subscriptions.clone());
|
||||||
let ref_count = Arc::strong_count(
|
let ref_count = Arc::strong_count(
|
||||||
&vote_tracker
|
&vote_tracker
|
||||||
.keys
|
.keys
|
||||||
@ -915,7 +966,7 @@ mod tests {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0);
|
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions);
|
||||||
|
|
||||||
let ref_count = Arc::strong_count(
|
let ref_count = Arc::strong_count(
|
||||||
&vote_tracker
|
&vote_tracker
|
||||||
@ -929,7 +980,12 @@ mod tests {
|
|||||||
assert_eq!(ref_count, current_ref_count);
|
assert_eq!(ref_count, current_ref_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn setup() -> (Arc<VoteTracker>, Arc<Bank>, Vec<ValidatorVoteKeypairs>) {
|
fn setup() -> (
|
||||||
|
Arc<VoteTracker>,
|
||||||
|
Arc<Bank>,
|
||||||
|
Vec<ValidatorVoteKeypairs>,
|
||||||
|
Arc<RpcSubscriptions>,
|
||||||
|
) {
|
||||||
let validator_voting_keypairs: Vec<_> = (0..10)
|
let validator_voting_keypairs: Vec<_> = (0..10)
|
||||||
.map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
|
.map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
|
||||||
.collect();
|
.collect();
|
||||||
@ -941,6 +997,18 @@ mod tests {
|
|||||||
);
|
);
|
||||||
let bank = Bank::new(&genesis_config);
|
let bank = Bank::new(&genesis_config);
|
||||||
let vote_tracker = VoteTracker::new(&bank);
|
let vote_tracker = VoteTracker::new(&bank);
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let bank_forks = BankForks::new(0, bank);
|
||||||
|
let bank = bank_forks.get(0).unwrap().clone();
|
||||||
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
|
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||||
|
let subscriptions = Arc::new(RpcSubscriptions::new(
|
||||||
|
&exit,
|
||||||
|
Arc::new(RwLock::new(bank_forks)),
|
||||||
|
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
|
||||||
|
blockstore.clone(),
|
||||||
|
))),
|
||||||
|
));
|
||||||
|
|
||||||
// Integrity Checks
|
// Integrity Checks
|
||||||
let current_epoch = bank.epoch();
|
let current_epoch = bank.epoch();
|
||||||
@ -967,8 +1035,9 @@ mod tests {
|
|||||||
assert_eq!(*vote_tracker.current_epoch.read().unwrap(), current_epoch);
|
assert_eq!(*vote_tracker.current_epoch.read().unwrap(), current_epoch);
|
||||||
(
|
(
|
||||||
Arc::new(vote_tracker),
|
Arc::new(vote_tracker),
|
||||||
Arc::new(bank),
|
bank,
|
||||||
validator_voting_keypairs,
|
validator_voting_keypairs,
|
||||||
|
subscriptions,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
//! The `pubsub` module implements a threaded subscription service on client RPC request
|
//! The `pubsub` module implements a threaded subscription service on client RPC request
|
||||||
|
|
||||||
use crate::rpc_subscriptions::{RpcSubscriptions, SlotInfo};
|
use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote, SlotInfo};
|
||||||
use jsonrpc_core::{Error, ErrorCode, Result};
|
use jsonrpc_core::{Error, ErrorCode, Result};
|
||||||
use jsonrpc_derive::rpc;
|
use jsonrpc_derive::rpc;
|
||||||
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
|
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
|
||||||
@ -114,6 +114,18 @@ pub trait RpcSolPubSub {
|
|||||||
)]
|
)]
|
||||||
fn slot_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
|
fn slot_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
|
||||||
|
|
||||||
|
// Get notification when vote is encountered
|
||||||
|
#[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")]
|
||||||
|
fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcVote>);
|
||||||
|
|
||||||
|
// Unsubscribe from vote notification subscription.
|
||||||
|
#[pubsub(
|
||||||
|
subscription = "voteNotification",
|
||||||
|
unsubscribe,
|
||||||
|
name = "voteUnsubscribe"
|
||||||
|
)]
|
||||||
|
fn vote_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
|
||||||
|
|
||||||
// Get notification when a new root is set
|
// Get notification when a new root is set
|
||||||
#[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")]
|
#[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")]
|
||||||
fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<Slot>);
|
fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<Slot>);
|
||||||
@ -295,6 +307,27 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<RpcVote>) {
|
||||||
|
info!("vote_subscribe");
|
||||||
|
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
|
||||||
|
let sub_id = SubscriptionId::Number(id as u64);
|
||||||
|
info!("vote_subscribe: id={:?}", sub_id);
|
||||||
|
self.subscriptions.add_vote_subscription(sub_id, subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vote_unsubscribe(&self, _meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
|
||||||
|
info!("vote_unsubscribe");
|
||||||
|
if self.subscriptions.remove_vote_subscription(&id) {
|
||||||
|
Ok(true)
|
||||||
|
} else {
|
||||||
|
Err(Error {
|
||||||
|
code: ErrorCode::InvalidParams,
|
||||||
|
message: "Invalid Request: Subscription id does not exist".into(),
|
||||||
|
data: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<Slot>) {
|
fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<Slot>) {
|
||||||
info!("root_subscribe");
|
info!("root_subscribe");
|
||||||
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
|
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
|
||||||
@ -321,9 +354,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
|
cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
|
||||||
commitment::{BlockCommitment, BlockCommitmentCache},
|
commitment::{BlockCommitment, BlockCommitmentCache},
|
||||||
rpc_subscriptions::tests::robust_poll_or_panic,
|
rpc_subscriptions::tests::robust_poll_or_panic,
|
||||||
};
|
};
|
||||||
|
use crossbeam_channel::unbounded;
|
||||||
use jsonrpc_core::{futures::sync::mpsc, Response};
|
use jsonrpc_core::{futures::sync::mpsc, Response};
|
||||||
use jsonrpc_pubsub::{PubSubHandler, Session};
|
use jsonrpc_pubsub::{PubSubHandler, Session};
|
||||||
use serial_test_derive::serial;
|
use serial_test_derive::serial;
|
||||||
@ -333,13 +368,18 @@ mod tests {
|
|||||||
genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
||||||
get_tmp_ledger_path,
|
get_tmp_ledger_path,
|
||||||
};
|
};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::{
|
||||||
|
bank::Bank,
|
||||||
|
genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs},
|
||||||
|
};
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
|
hash::Hash,
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
signature::{Keypair, Signer},
|
signature::{Keypair, Signer},
|
||||||
system_program, system_transaction,
|
system_program, system_transaction,
|
||||||
transaction::{self, Transaction},
|
transaction::{self, Transaction},
|
||||||
};
|
};
|
||||||
|
use solana_vote_program::vote_transaction;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
sync::{atomic::AtomicBool, RwLock},
|
sync::{atomic::AtomicBool, RwLock},
|
||||||
@ -831,4 +871,97 @@ mod tests {
|
|||||||
.slot_unsubscribe(Some(session), SubscriptionId::Number(0))
|
.slot_unsubscribe(Some(session), SubscriptionId::Number(0))
|
||||||
.is_ok());
|
.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_vote_subscribe() {
|
||||||
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
|
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||||
|
let block_commitment_cache = Arc::new(RwLock::new(
|
||||||
|
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()),
|
||||||
|
));
|
||||||
|
|
||||||
|
let validator_voting_keypairs: Vec<_> = (0..10)
|
||||||
|
.map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
|
||||||
|
.collect();
|
||||||
|
let GenesisConfigInfo { genesis_config, .. } =
|
||||||
|
create_genesis_config_with_vote_accounts(10_000, &validator_voting_keypairs, 100);
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let bank = Bank::new(&genesis_config);
|
||||||
|
let bank_forks = BankForks::new(0, bank);
|
||||||
|
let bank = bank_forks.get(0).unwrap().clone();
|
||||||
|
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||||
|
|
||||||
|
// Setup RPC
|
||||||
|
let mut rpc =
|
||||||
|
RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks.clone());
|
||||||
|
let session = create_session();
|
||||||
|
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("voteNotification");
|
||||||
|
|
||||||
|
// Setup Subscriptions
|
||||||
|
let subscriptions =
|
||||||
|
RpcSubscriptions::new(&exit, bank_forks.clone(), block_commitment_cache.clone());
|
||||||
|
rpc.subscriptions = Arc::new(subscriptions);
|
||||||
|
rpc.vote_subscribe(session, subscriber);
|
||||||
|
|
||||||
|
// Create some voters at genesis
|
||||||
|
let vote_tracker = VoteTracker::new(&bank);
|
||||||
|
let (votes_sender, votes_receiver) = unbounded();
|
||||||
|
let (vote_tracker, validator_voting_keypairs) =
|
||||||
|
(Arc::new(vote_tracker), validator_voting_keypairs);
|
||||||
|
|
||||||
|
let vote_slots = vec![1, 2];
|
||||||
|
validator_voting_keypairs.iter().for_each(|keypairs| {
|
||||||
|
let node_keypair = &keypairs.node_keypair;
|
||||||
|
let vote_keypair = &keypairs.vote_keypair;
|
||||||
|
let vote_tx = vote_transaction::new_vote_transaction(
|
||||||
|
vote_slots.clone(),
|
||||||
|
Hash::default(),
|
||||||
|
Hash::default(),
|
||||||
|
node_keypair,
|
||||||
|
vote_keypair,
|
||||||
|
vote_keypair,
|
||||||
|
);
|
||||||
|
votes_sender.send(vec![vote_tx]).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Process votes and check they were notified.
|
||||||
|
ClusterInfoVoteListener::get_and_process_votes_for_tests(
|
||||||
|
&votes_receiver,
|
||||||
|
&vote_tracker,
|
||||||
|
0,
|
||||||
|
rpc.subscriptions.clone(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let (response, _) = robust_poll_or_panic(receiver);
|
||||||
|
assert_eq!(
|
||||||
|
response,
|
||||||
|
r#"{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"11111111111111111111111111111111","slots":[1,2],"timestamp":null},"subscription":0}}"#
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_vote_unsubscribe() {
|
||||||
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
|
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||||
|
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
|
||||||
|
let bank = Bank::new(&genesis_config);
|
||||||
|
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
|
||||||
|
let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks);
|
||||||
|
let session = create_session();
|
||||||
|
let (subscriber, _id_receiver, _) = Subscriber::new_test("voteNotification");
|
||||||
|
rpc.vote_subscribe(session, subscriber);
|
||||||
|
|
||||||
|
let session = create_session();
|
||||||
|
assert!(rpc
|
||||||
|
.vote_unsubscribe(Some(session), SubscriptionId::Number(42))
|
||||||
|
.is_err());
|
||||||
|
|
||||||
|
let session = create_session();
|
||||||
|
assert!(rpc
|
||||||
|
.vote_unsubscribe(Some(session), SubscriptionId::Number(0))
|
||||||
|
.is_ok());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,12 +15,13 @@ use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore};
|
|||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
account::Account,
|
account::Account,
|
||||||
clock::Slot,
|
clock::{Slot, UnixTimestamp},
|
||||||
commitment_config::{CommitmentConfig, CommitmentLevel},
|
commitment_config::{CommitmentConfig, CommitmentLevel},
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
signature::Signature,
|
signature::Signature,
|
||||||
transaction,
|
transaction,
|
||||||
};
|
};
|
||||||
|
use solana_vote_program::vote_state::Vote;
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
mpsc::{Receiver, RecvTimeoutError, SendError, Sender},
|
mpsc::{Receiver, RecvTimeoutError, SendError, Sender},
|
||||||
@ -43,8 +44,17 @@ pub struct SlotInfo {
|
|||||||
pub root: Slot,
|
pub root: Slot,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A more human-friendly version of Vote, with the bank state signature base58 encoded.
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct RpcVote {
|
||||||
|
pub slots: Vec<Slot>,
|
||||||
|
pub hash: String,
|
||||||
|
pub timestamp: Option<UnixTimestamp>,
|
||||||
|
}
|
||||||
|
|
||||||
enum NotificationEntry {
|
enum NotificationEntry {
|
||||||
Slot(SlotInfo),
|
Slot(SlotInfo),
|
||||||
|
Vote(Vote),
|
||||||
Root(Slot),
|
Root(Slot),
|
||||||
Bank(Slot),
|
Bank(Slot),
|
||||||
}
|
}
|
||||||
@ -53,6 +63,7 @@ impl std::fmt::Debug for NotificationEntry {
|
|||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
NotificationEntry::Root(root) => write!(f, "Root({})", root),
|
NotificationEntry::Root(root) => write!(f, "Root({})", root),
|
||||||
|
NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote),
|
||||||
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
|
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
|
||||||
NotificationEntry::Bank(current_slot) => {
|
NotificationEntry::Bank(current_slot) => {
|
||||||
write!(f, "Bank({{current_slot: {:?}}})", current_slot)
|
write!(f, "Bank({{current_slot: {:?}}})", current_slot)
|
||||||
@ -74,6 +85,7 @@ type RpcSignatureSubscriptions = RwLock<
|
|||||||
HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>>>>,
|
HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>>>>,
|
||||||
>;
|
>;
|
||||||
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
|
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
|
||||||
|
type RpcVoteSubscriptions = RwLock<HashMap<SubscriptionId, Sink<RpcVote>>>;
|
||||||
type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
|
type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
|
||||||
|
|
||||||
fn add_subscription<K, S>(
|
fn add_subscription<K, S>(
|
||||||
@ -250,6 +262,7 @@ struct Subscriptions {
|
|||||||
program_subscriptions: Arc<RpcProgramSubscriptions>,
|
program_subscriptions: Arc<RpcProgramSubscriptions>,
|
||||||
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
|
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
|
||||||
slot_subscriptions: Arc<RpcSlotSubscriptions>,
|
slot_subscriptions: Arc<RpcSlotSubscriptions>,
|
||||||
|
vote_subscriptions: Arc<RpcVoteSubscriptions>,
|
||||||
root_subscriptions: Arc<RpcRootSubscriptions>,
|
root_subscriptions: Arc<RpcRootSubscriptions>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,6 +299,7 @@ impl RpcSubscriptions {
|
|||||||
let program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
|
let program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
|
||||||
let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
|
let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
|
||||||
let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default());
|
let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default());
|
||||||
|
let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default());
|
||||||
let root_subscriptions = Arc::new(RpcRootSubscriptions::default());
|
let root_subscriptions = Arc::new(RpcRootSubscriptions::default());
|
||||||
let notification_sender = Arc::new(Mutex::new(notification_sender));
|
let notification_sender = Arc::new(Mutex::new(notification_sender));
|
||||||
|
|
||||||
@ -297,6 +311,7 @@ impl RpcSubscriptions {
|
|||||||
program_subscriptions,
|
program_subscriptions,
|
||||||
signature_subscriptions,
|
signature_subscriptions,
|
||||||
slot_subscriptions,
|
slot_subscriptions,
|
||||||
|
vote_subscriptions,
|
||||||
root_subscriptions,
|
root_subscriptions,
|
||||||
};
|
};
|
||||||
let _subscriptions = subscriptions.clone();
|
let _subscriptions = subscriptions.clone();
|
||||||
@ -522,6 +537,21 @@ impl RpcSubscriptions {
|
|||||||
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
|
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) {
|
||||||
|
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||||
|
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
|
||||||
|
subscriptions.insert(sub_id, sink);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool {
|
||||||
|
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
|
||||||
|
subscriptions.remove(id).is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn notify_vote(&self, vote: &Vote) {
|
||||||
|
self.enqueue_notification(NotificationEntry::Vote(vote.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
|
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
|
||||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||||
let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
|
let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
|
||||||
@ -577,6 +607,19 @@ impl RpcSubscriptions {
|
|||||||
notifier.notify(slot_info, sink);
|
notifier.notify(slot_info, sink);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
NotificationEntry::Vote(ref vote_info) => {
|
||||||
|
let subscriptions = subscriptions.vote_subscriptions.read().unwrap();
|
||||||
|
for (_, sink) in subscriptions.iter() {
|
||||||
|
notifier.notify(
|
||||||
|
RpcVote {
|
||||||
|
slots: vote_info.slots.clone(),
|
||||||
|
hash: bs58::encode(vote_info.hash).into_string(),
|
||||||
|
timestamp: vote_info.timestamp,
|
||||||
|
},
|
||||||
|
sink,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
NotificationEntry::Root(root) => {
|
NotificationEntry::Root(root) => {
|
||||||
let subscriptions = subscriptions.root_subscriptions.read().unwrap();
|
let subscriptions = subscriptions.root_subscriptions.read().unwrap();
|
||||||
for (_, sink) in subscriptions.iter() {
|
for (_, sink) in subscriptions.iter() {
|
||||||
|
@ -8,6 +8,7 @@ use crate::{
|
|||||||
cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
|
cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
|
||||||
fetch_stage::FetchStage,
|
fetch_stage::FetchStage,
|
||||||
poh_recorder::{PohRecorder, WorkingBankEntry},
|
poh_recorder::{PohRecorder, WorkingBankEntry},
|
||||||
|
rpc_subscriptions::RpcSubscriptions,
|
||||||
sigverify::TransactionSigVerifier,
|
sigverify::TransactionSigVerifier,
|
||||||
sigverify_stage::SigVerifyStage,
|
sigverify_stage::SigVerifyStage,
|
||||||
};
|
};
|
||||||
@ -43,6 +44,7 @@ impl Tpu {
|
|||||||
transactions_sockets: Vec<UdpSocket>,
|
transactions_sockets: Vec<UdpSocket>,
|
||||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
tpu_forwards_sockets: Vec<UdpSocket>,
|
||||||
broadcast_sockets: Vec<UdpSocket>,
|
broadcast_sockets: Vec<UdpSocket>,
|
||||||
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
blockstore: &Arc<Blockstore>,
|
blockstore: &Arc<Blockstore>,
|
||||||
broadcast_type: &BroadcastStageType,
|
broadcast_type: &BroadcastStageType,
|
||||||
@ -74,6 +76,7 @@ impl Tpu {
|
|||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
vote_tracker,
|
vote_tracker,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
|
subscriptions.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let banking_stage = BankingStage::new(
|
let banking_stage = BankingStage::new(
|
||||||
|
@ -448,6 +448,7 @@ impl Validator {
|
|||||||
node.sockets.tpu,
|
node.sockets.tpu,
|
||||||
node.sockets.tpu_forwards,
|
node.sockets.tpu_forwards,
|
||||||
node.sockets.broadcast,
|
node.sockets.broadcast,
|
||||||
|
&subscriptions,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
&blockstore,
|
&blockstore,
|
||||||
&config.broadcast_stage_type,
|
&config.broadcast_stage_type,
|
||||||
|
@ -1452,3 +1452,57 @@ Unsubscribe from root notifications
|
|||||||
// Result
|
// Result
|
||||||
{"jsonrpc": "2.0","result": true,"id": 1}
|
{"jsonrpc": "2.0","result": true,"id": 1}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### voteSubscribe
|
||||||
|
|
||||||
|
Subscribe to receive notification anytime a new vote is observed in gossip.
|
||||||
|
These votes are pre-consensus therefore there is no guarantee these votes will
|
||||||
|
enter the ledger.
|
||||||
|
|
||||||
|
#### Parameters:
|
||||||
|
|
||||||
|
None
|
||||||
|
|
||||||
|
#### Results:
|
||||||
|
|
||||||
|
* `integer` - subscription id \(needed to unsubscribe\)
|
||||||
|
|
||||||
|
#### Example:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
// Request
|
||||||
|
{"jsonrpc":"2.0", "id":1, "method":"voteSubscribe"}
|
||||||
|
|
||||||
|
// Result
|
||||||
|
{"jsonrpc": "2.0","result": 0,"id": 1}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Notification Format:
|
||||||
|
|
||||||
|
The result is the latest vote, containing its hash, a list of voted slots, and an optional timestamp.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"8Rshv2oMkPu5E4opXTRyuyBeZBqQ4S477VG26wUTFxUM","slots":[1,2],"timestamp":null},"subscription":0}}
|
||||||
|
```
|
||||||
|
|
||||||
|
### voteUnsubscribe
|
||||||
|
|
||||||
|
Unsubscribe from vote notifications
|
||||||
|
|
||||||
|
#### Parameters:
|
||||||
|
|
||||||
|
* `<integer>` - subscription id to cancel
|
||||||
|
|
||||||
|
#### Results:
|
||||||
|
|
||||||
|
* `<bool>` - unsubscribe success message
|
||||||
|
|
||||||
|
#### Example:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
// Request
|
||||||
|
{"jsonrpc":"2.0", "id":1, "method":"voteUnsubscribe", "params":[0]}
|
||||||
|
|
||||||
|
// Result
|
||||||
|
{"jsonrpc": "2.0","result": true,"id": 1}
|
||||||
|
```
|
||||||
|
Reference in New Issue
Block a user