From bfcfbab818162d14dad451932444960b53af6d66 Mon Sep 17 00:00:00 2001 From: Reisen Date: Sun, 17 May 2020 22:01:08 +0100 Subject: [PATCH] Add Vote PubSub endpoint for live gossip votes. (#10045) * Add Vote PubSub endpoint for live gossip votes. * Updated tests for Vote RPC and Vote Listener * Add JSON RPC documentation for Vote RPC. * Base58 encode hash in Vote RPC response. --- core/src/cluster_info_vote_listener.rs | 105 +++++++++++++++---- core/src/rpc_pubsub.rs | 137 ++++++++++++++++++++++++- core/src/rpc_subscriptions.rs | 45 +++++++- core/src/tpu.rs | 3 + core/src/validator.rs | 1 + docs/src/apps/jsonrpc-api.md | 54 ++++++++++ 6 files changed, 324 insertions(+), 21 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 944306dae1..5162ca1a99 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -3,6 +3,7 @@ use crate::{ crds_value::CrdsValueLabel, poh_recorder::PohRecorder, result::{Error, Result}, + rpc_subscriptions::RpcSubscriptions, sigverify, verified_vote_packets::VerifiedVotePackets, }; @@ -202,6 +203,7 @@ impl ClusterInfoVoteListener { poh_recorder: &Arc>, vote_tracker: Arc, bank_forks: Arc>, + subscriptions: Arc, ) -> Self { let exit_ = exit.clone(); @@ -242,6 +244,7 @@ impl ClusterInfoVoteListener { verified_vote_transactions_receiver, vote_tracker, &bank_forks, + subscriptions, ); }) .unwrap(); @@ -364,6 +367,7 @@ impl ClusterInfoVoteListener { vote_txs_receiver: VerifiedVoteTransactionsReceiver, vote_tracker: Arc, bank_forks: &RwLock, + subscriptions: Arc, ) -> Result<()> { loop { if exit.load(Ordering::Relaxed) { @@ -373,9 +377,12 @@ impl ClusterInfoVoteListener { let root_bank = bank_forks.read().unwrap().root_bank().clone(); vote_tracker.process_new_root_bank(&root_bank); - if let Err(e) = - Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker, root_bank.slot()) - { + if let Err(e) = Self::get_and_process_votes( + &vote_txs_receiver, + &vote_tracker, + root_bank.slot(), + subscriptions.clone(), + ) { match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { return Ok(()); @@ -389,21 +396,37 @@ impl ClusterInfoVoteListener { } } + #[cfg(test)] + pub fn get_and_process_votes_for_tests( + vote_txs_receiver: &VerifiedVoteTransactionsReceiver, + vote_tracker: &Arc, + last_root: Slot, + subscriptions: Arc, + ) -> Result<()> { + Self::get_and_process_votes(vote_txs_receiver, vote_tracker, last_root, subscriptions) + } + fn get_and_process_votes( vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &Arc, last_root: Slot, + subscriptions: Arc, ) -> Result<()> { let timer = Duration::from_millis(200); let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; while let Ok(new_txs) = vote_txs_receiver.try_recv() { vote_txs.extend(new_txs); } - Self::process_votes(vote_tracker, vote_txs, last_root); + Self::process_votes(vote_tracker, vote_txs, last_root, subscriptions); Ok(()) } - fn process_votes(vote_tracker: &VoteTracker, vote_txs: Vec, root: Slot) { + fn process_votes( + vote_tracker: &VoteTracker, + vote_txs: Vec, + root: Slot, + subscriptions: Arc, + ) { let mut diff: HashMap>> = HashMap::new(); { let all_slot_trackers = &vote_tracker.slot_vote_trackers; @@ -455,7 +478,7 @@ impl ClusterInfoVoteListener { continue; } - for slot in vote.slots { + for &slot in vote.slots.iter() { if slot <= root { continue; } @@ -480,6 +503,8 @@ impl ClusterInfoVoteListener { .or_default() .insert(unduplicated_pubkey.unwrap()); } + + subscriptions.notify_vote(&vote); } } } @@ -519,6 +544,8 @@ impl ClusterInfoVoteListener { #[cfg(test)] mod tests { use super::*; + use crate::commitment::BlockCommitmentCache; + use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; use solana_perf::packet; use solana_runtime::{ bank::Bank, @@ -615,7 +642,7 @@ mod tests { #[test] fn test_update_new_root() { - let (vote_tracker, bank, _) = setup(); + let (vote_tracker, bank, _, _) = setup(); // Check outdated slots are purged with new root let new_voter = Arc::new(Pubkey::new_rand()); @@ -656,7 +683,7 @@ mod tests { #[test] fn test_update_new_leader_schedule_epoch() { - let (vote_tracker, bank, _) = setup(); + let (vote_tracker, bank, _, _) = setup(); // Check outdated slots are purged with new root let leader_schedule_epoch = bank.get_leader_schedule_epoch(bank.slot()); @@ -698,7 +725,7 @@ mod tests { #[test] fn test_process_votes() { // Create some voters at genesis - let (vote_tracker, _, validator_voting_keypairs) = setup(); + let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (votes_sender, votes_receiver) = unbounded(); let vote_slots = vec![1, 2]; @@ -717,7 +744,13 @@ mod tests { }); // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap(); + ClusterInfoVoteListener::get_and_process_votes( + &votes_receiver, + &vote_tracker, + 0, + subscriptions, + ) + .unwrap(); for vote_slot in vote_slots { let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); let r_slot_vote_tracker = slot_vote_tracker.read().unwrap(); @@ -736,7 +769,7 @@ mod tests { #[test] fn test_process_votes2() { // Create some voters at genesis - let (vote_tracker, _, validator_voting_keypairs) = setup(); + let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); // Send some votes to process let (votes_sender, votes_receiver) = unbounded(); @@ -760,7 +793,13 @@ mod tests { } // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap(); + ClusterInfoVoteListener::get_and_process_votes( + &votes_receiver, + &vote_tracker, + 0, + subscriptions, + ) + .unwrap(); for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap(); let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap(); @@ -779,7 +818,7 @@ mod tests { #[test] fn test_get_voters_by_epoch() { // Create some voters at genesis - let (vote_tracker, bank, validator_voting_keypairs) = setup(); + let (vote_tracker, bank, validator_voting_keypairs, _) = setup(); let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot()); let last_known_slot = bank .epoch_schedule() @@ -850,11 +889,23 @@ mod tests { 100, ); let bank = Bank::new(&genesis_config); + let exit = Arc::new(AtomicBool::new(false)); + let bank_forks = BankForks::new(0, bank); + let bank = bank_forks.get(0).unwrap().clone(); + let vote_tracker = VoteTracker::new(&bank); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(bank_forks)), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore.clone(), + ))), + )); // Send a vote to process, should add a reference to the pubkey for that voter // in the tracker let validator0_keypairs = &validator_voting_keypairs[0]; - let vote_tracker = VoteTracker::new(&bank); let vote_tx = vec![vote_transaction::new_vote_transaction( // Must vote > root to be processed vec![bank.slot() + 1], @@ -865,7 +916,7 @@ mod tests { &validator0_keypairs.vote_keypair, )]; - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0); + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0, subscriptions.clone()); let ref_count = Arc::strong_count( &vote_tracker .keys @@ -915,7 +966,7 @@ mod tests { }) .collect(); - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0); + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions); let ref_count = Arc::strong_count( &vote_tracker @@ -929,7 +980,12 @@ mod tests { assert_eq!(ref_count, current_ref_count); } - fn setup() -> (Arc, Arc, Vec) { + fn setup() -> ( + Arc, + Arc, + Vec, + Arc, + ) { let validator_voting_keypairs: Vec<_> = (0..10) .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) .collect(); @@ -941,6 +997,18 @@ mod tests { ); let bank = Bank::new(&genesis_config); let vote_tracker = VoteTracker::new(&bank); + let exit = Arc::new(AtomicBool::new(false)); + let bank_forks = BankForks::new(0, bank); + let bank = bank_forks.get(0).unwrap().clone(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(bank_forks)), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore.clone(), + ))), + )); // Integrity Checks let current_epoch = bank.epoch(); @@ -967,8 +1035,9 @@ mod tests { assert_eq!(*vote_tracker.current_epoch.read().unwrap(), current_epoch); ( Arc::new(vote_tracker), - Arc::new(bank), + bank, validator_voting_keypairs, + subscriptions, ) } diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 235c440462..708836d9cd 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_subscriptions::{RpcSubscriptions, SlotInfo}; +use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote, SlotInfo}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; @@ -114,6 +114,18 @@ pub trait RpcSolPubSub { )] fn slot_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + // Get notification when vote is encountered + #[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")] + fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); + + // Unsubscribe from vote notification subscription. + #[pubsub( + subscription = "voteNotification", + unsubscribe, + name = "voteUnsubscribe" + )] + fn vote_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + // Get notification when a new root is set #[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")] fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); @@ -295,6 +307,27 @@ impl RpcSolPubSub for RpcSolPubSubImpl { } } + fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { + info!("vote_subscribe"); + let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); + let sub_id = SubscriptionId::Number(id as u64); + info!("vote_subscribe: id={:?}", sub_id); + self.subscriptions.add_vote_subscription(sub_id, subscriber); + } + + fn vote_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { + info!("vote_unsubscribe"); + if self.subscriptions.remove_vote_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { info!("root_subscribe"); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); @@ -321,9 +354,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl { mod tests { use super::*; use crate::{ + cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, commitment::{BlockCommitment, BlockCommitmentCache}, rpc_subscriptions::tests::robust_poll_or_panic, }; + use crossbeam_channel::unbounded; use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_pubsub::{PubSubHandler, Session}; use serial_test_derive::serial; @@ -333,13 +368,18 @@ mod tests { genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, }; - use solana_runtime::bank::Bank; + use solana_runtime::{ + bank::Bank, + genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs}, + }; use solana_sdk::{ + hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, system_program, system_transaction, transaction::{self, Transaction}, }; + use solana_vote_program::vote_transaction; use std::{ collections::HashMap, sync::{atomic::AtomicBool, RwLock}, @@ -831,4 +871,97 @@ mod tests { .slot_unsubscribe(Some(session), SubscriptionId::Number(0)) .is_ok()); } + + #[test] + #[serial] + fn test_vote_subscribe() { + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()), + )); + + let validator_voting_keypairs: Vec<_> = (0..10) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + create_genesis_config_with_vote_accounts(10_000, &validator_voting_keypairs, 100); + let exit = Arc::new(AtomicBool::new(false)); + let bank = Bank::new(&genesis_config); + let bank_forks = BankForks::new(0, bank); + let bank = bank_forks.get(0).unwrap().clone(); + let bank_forks = Arc::new(RwLock::new(bank_forks)); + + // Setup RPC + let mut rpc = + RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks.clone()); + let session = create_session(); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("voteNotification"); + + // Setup Subscriptions + let subscriptions = + RpcSubscriptions::new(&exit, bank_forks.clone(), block_commitment_cache.clone()); + rpc.subscriptions = Arc::new(subscriptions); + rpc.vote_subscribe(session, subscriber); + + // Create some voters at genesis + let vote_tracker = VoteTracker::new(&bank); + let (votes_sender, votes_receiver) = unbounded(); + let (vote_tracker, validator_voting_keypairs) = + (Arc::new(vote_tracker), validator_voting_keypairs); + + let vote_slots = vec![1, 2]; + validator_voting_keypairs.iter().for_each(|keypairs| { + let node_keypair = &keypairs.node_keypair; + let vote_keypair = &keypairs.vote_keypair; + let vote_tx = vote_transaction::new_vote_transaction( + vote_slots.clone(), + Hash::default(), + Hash::default(), + node_keypair, + vote_keypair, + vote_keypair, + ); + votes_sender.send(vec![vote_tx]).unwrap(); + }); + + // Process votes and check they were notified. + ClusterInfoVoteListener::get_and_process_votes_for_tests( + &votes_receiver, + &vote_tracker, + 0, + rpc.subscriptions.clone(), + ) + .unwrap(); + + let (response, _) = robust_poll_or_panic(receiver); + assert_eq!( + response, + r#"{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"11111111111111111111111111111111","slots":[1,2],"timestamp":null},"subscription":0}}"# + ); + } + + #[test] + #[serial] + fn test_vote_unsubscribe() { + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks); + let session = create_session(); + let (subscriber, _id_receiver, _) = Subscriber::new_test("voteNotification"); + rpc.vote_subscribe(session, subscriber); + + let session = create_session(); + assert!(rpc + .vote_unsubscribe(Some(session), SubscriptionId::Number(42)) + .is_err()); + + let session = create_session(); + assert!(rpc + .vote_unsubscribe(Some(session), SubscriptionId::Number(0)) + .is_ok()); + } } diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 49b2fece7c..2b4157c71e 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -15,12 +15,13 @@ use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; use solana_runtime::bank::Bank; use solana_sdk::{ account::Account, - clock::Slot, + clock::{Slot, UnixTimestamp}, commitment_config::{CommitmentConfig, CommitmentLevel}, pubkey::Pubkey, signature::Signature, transaction, }; +use solana_vote_program::vote_state::Vote; use std::sync::{ atomic::{AtomicBool, Ordering}, mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, @@ -43,8 +44,17 @@ pub struct SlotInfo { pub root: Slot, } +// A more human-friendly version of Vote, with the bank state signature base58 encoded. +#[derive(Serialize, Deserialize, Debug)] +pub struct RpcVote { + pub slots: Vec, + pub hash: String, + pub timestamp: Option, +} + enum NotificationEntry { Slot(SlotInfo), + Vote(Vote), Root(Slot), Bank(Slot), } @@ -53,6 +63,7 @@ impl std::fmt::Debug for NotificationEntry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { NotificationEntry::Root(root) => write!(f, "Root({})", root), + NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Bank(current_slot) => { write!(f, "Bank({{current_slot: {:?}}})", current_slot) @@ -74,6 +85,7 @@ type RpcSignatureSubscriptions = RwLock< HashMap>>>, >; type RpcSlotSubscriptions = RwLock>>; +type RpcVoteSubscriptions = RwLock>>; type RpcRootSubscriptions = RwLock>>; fn add_subscription( @@ -250,6 +262,7 @@ struct Subscriptions { program_subscriptions: Arc, signature_subscriptions: Arc, slot_subscriptions: Arc, + vote_subscriptions: Arc, root_subscriptions: Arc, } @@ -286,6 +299,7 @@ impl RpcSubscriptions { let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); + let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default()); let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let notification_sender = Arc::new(Mutex::new(notification_sender)); @@ -297,6 +311,7 @@ impl RpcSubscriptions { program_subscriptions, signature_subscriptions, slot_subscriptions, + vote_subscriptions, root_subscriptions, }; let _subscriptions = subscriptions.clone(); @@ -522,6 +537,21 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); } + pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); + subscriptions.insert(sub_id, sink); + } + + pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); + subscriptions.remove(id).is_some() + } + + pub fn notify_vote(&self, vote: &Vote) { + self.enqueue_notification(NotificationEntry::Vote(vote.clone())); + } + pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); @@ -577,6 +607,19 @@ impl RpcSubscriptions { notifier.notify(slot_info, sink); } } + NotificationEntry::Vote(ref vote_info) => { + let subscriptions = subscriptions.vote_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + notifier.notify( + RpcVote { + slots: vote_info.slots.clone(), + hash: bs58::encode(vote_info.hash).into_string(), + timestamp: vote_info.timestamp, + }, + sink, + ); + } + } NotificationEntry::Root(root) => { let subscriptions = subscriptions.root_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 35699d963c..6f77824fea 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -8,6 +8,7 @@ use crate::{ cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, fetch_stage::FetchStage, poh_recorder::{PohRecorder, WorkingBankEntry}, + rpc_subscriptions::RpcSubscriptions, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }; @@ -43,6 +44,7 @@ impl Tpu { transactions_sockets: Vec, tpu_forwards_sockets: Vec, broadcast_sockets: Vec, + subscriptions: &Arc, transaction_status_sender: Option, blockstore: &Arc, broadcast_type: &BroadcastStageType, @@ -74,6 +76,7 @@ impl Tpu { &poh_recorder, vote_tracker, bank_forks, + subscriptions.clone(), ); let banking_stage = BankingStage::new( diff --git a/core/src/validator.rs b/core/src/validator.rs index c4ab4886d9..25064683fd 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -448,6 +448,7 @@ impl Validator { node.sockets.tpu, node.sockets.tpu_forwards, node.sockets.broadcast, + &subscriptions, transaction_status_sender, &blockstore, &config.broadcast_stage_type, diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index 938ee88d7f..a375fb73ef 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -1452,3 +1452,57 @@ Unsubscribe from root notifications // Result {"jsonrpc": "2.0","result": true,"id": 1} ``` + +### voteSubscribe + +Subscribe to receive notification anytime a new vote is observed in gossip. +These votes are pre-consensus therefore there is no guarantee these votes will +enter the ledger. + +#### Parameters: + +None + +#### Results: + +* `integer` - subscription id \(needed to unsubscribe\) + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"voteSubscribe"} + +// Result +{"jsonrpc": "2.0","result": 0,"id": 1} +``` + +#### Notification Format: + +The result is the latest vote, containing its hash, a list of voted slots, and an optional timestamp. + +```bash +{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"8Rshv2oMkPu5E4opXTRyuyBeZBqQ4S477VG26wUTFxUM","slots":[1,2],"timestamp":null},"subscription":0}} +``` + +### voteUnsubscribe + +Unsubscribe from vote notifications + +#### Parameters: + +* `` - subscription id to cancel + +#### Results: + +* `` - unsubscribe success message + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"voteUnsubscribe", "params":[0]} + +// Result +{"jsonrpc": "2.0","result": true,"id": 1} +```