diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index e35ce8778a..4d6debcda7 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,8 +1,10 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + consensus::VOTE_THRESHOLD_SIZE, crds_value::CrdsValueLabel, poh_recorder::PohRecorder, result::{Error, Result}, + rpc_subscriptions::RpcSubscriptions, sigverify, verified_vote_packets::VerifiedVotePackets, }; @@ -14,7 +16,10 @@ use log::*; use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; use solana_perf::packet::{self, Packets}; -use solana_runtime::{bank::Bank, epoch_stakes::EpochAuthorizedVoters}; +use solana_runtime::{ + bank::Bank, + epoch_stakes::{EpochAuthorizedVoters, EpochStakes}, +}; use solana_sdk::{ clock::{Epoch, Slot}, epoch_schedule::EpochSchedule, @@ -43,6 +48,7 @@ pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub struct SlotVoteTracker { voted: HashSet>, updates: Option>>, + total_stake: u64, } impl SlotVoteTracker { @@ -203,6 +209,7 @@ impl ClusterInfoVoteListener { poh_recorder: &Arc>, vote_tracker: Arc, bank_forks: Arc>, + subscriptions: Arc, ) -> Self { let exit_ = exit.clone(); @@ -244,6 +251,7 @@ impl ClusterInfoVoteListener { verified_vote_transactions_receiver, vote_tracker, &bank_forks, + subscriptions, ); }) .unwrap(); @@ -372,6 +380,7 @@ impl ClusterInfoVoteListener { vote_txs_receiver: VerifiedVoteTransactionsReceiver, vote_tracker: Arc, bank_forks: &RwLock, + subscriptions: Arc, ) -> Result<()> { loop { if exit.load(Ordering::Relaxed) { @@ -380,10 +389,15 @@ impl ClusterInfoVoteListener { let root_bank = bank_forks.read().unwrap().root_bank().clone(); vote_tracker.process_new_root_bank(&root_bank); + let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()); - if let Err(e) = - Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker, root_bank.slot()) - { + if let Err(e) = Self::get_and_process_votes( + &vote_txs_receiver, + &vote_tracker, + root_bank.slot(), + subscriptions.clone(), + epoch_stakes, + ) { match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { return Ok(()); @@ -397,21 +411,51 @@ impl ClusterInfoVoteListener { } } + #[cfg(test)] + pub fn get_and_process_votes_for_tests( + vote_txs_receiver: &VerifiedVoteTransactionsReceiver, + vote_tracker: &Arc, + last_root: Slot, + subscriptions: Arc, + ) -> Result<()> { + Self::get_and_process_votes( + vote_txs_receiver, + vote_tracker, + last_root, + subscriptions, + None, + ) + } + fn get_and_process_votes( vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &Arc, last_root: Slot, + subscriptions: Arc, + epoch_stakes: Option<&EpochStakes>, ) -> Result<()> { let timer = Duration::from_millis(200); let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; while let Ok(new_txs) = vote_txs_receiver.try_recv() { vote_txs.extend(new_txs); } - Self::process_votes(vote_tracker, vote_txs, last_root); + Self::process_votes( + vote_tracker, + vote_txs, + last_root, + subscriptions, + epoch_stakes, + ); Ok(()) } - fn process_votes(vote_tracker: &VoteTracker, vote_txs: Vec, root: Slot) { + fn process_votes( + vote_tracker: &VoteTracker, + vote_txs: Vec, + root: Slot, + subscriptions: Arc, + epoch_stakes: Option<&EpochStakes>, + ) { let mut diff: HashMap>> = HashMap::new(); { let all_slot_trackers = &vote_tracker.slot_vote_trackers; @@ -463,7 +507,7 @@ impl ClusterInfoVoteListener { continue; } - for slot in vote.slots { + for &slot in vote.slots.iter() { if slot <= root { continue; } @@ -488,6 +532,8 @@ impl ClusterInfoVoteListener { .or_default() .insert(unduplicated_pubkey.unwrap()); } + + subscriptions.notify_vote(&vote); } } } @@ -504,15 +550,35 @@ impl ClusterInfoVoteListener { if w_slot_tracker.updates.is_none() { w_slot_tracker.updates = Some(vec![]); } - for pk in slot_diff { - w_slot_tracker.voted.insert(pk.clone()); - w_slot_tracker.updates.as_mut().unwrap().push(pk); + let mut current_stake = 0; + for pubkey in slot_diff { + Self::sum_stake(&mut current_stake, epoch_stakes, &pubkey); + + w_slot_tracker.voted.insert(pubkey.clone()); + w_slot_tracker.updates.as_mut().unwrap().push(pubkey); } + Self::notify_for_stake_change( + current_stake, + w_slot_tracker.total_stake, + &subscriptions, + epoch_stakes, + slot, + ); + w_slot_tracker.total_stake += current_stake; } else { - let voted: HashSet<_> = slot_diff.into_iter().collect(); + let mut total_stake = 0; + let voted: HashSet<_> = slot_diff + .into_iter() + .map(|pubkey| { + Self::sum_stake(&mut total_stake, epoch_stakes, &pubkey); + pubkey + }) + .collect(); + Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot); let new_slot_tracker = SlotVoteTracker { voted: voted.clone(), updates: Some(voted.into_iter().collect()), + total_stake, }; vote_tracker .slot_vote_trackers @@ -522,11 +588,38 @@ impl ClusterInfoVoteListener { } } } + + fn notify_for_stake_change( + current_stake: u64, + previous_stake: u64, + subscriptions: &Arc, + epoch_stakes: Option<&EpochStakes>, + slot: Slot, + ) { + if let Some(stakes) = epoch_stakes { + let supermajority_stake = (stakes.total_stake() as f64 * VOTE_THRESHOLD_SIZE) as u64; + if previous_stake < supermajority_stake + && (previous_stake + current_stake) > supermajority_stake + { + subscriptions.notify_gossip_subscribers(slot); + } + } + } + + fn sum_stake(sum: &mut u64, epoch_stakes: Option<&EpochStakes>, pubkey: &Pubkey) { + if let Some(stakes) = epoch_stakes { + if let Some(vote_account) = stakes.stakes().vote_accounts().get(pubkey) { + *sum += vote_account.0; + } + } + } } #[cfg(test)] mod tests { use super::*; + use crate::commitment::BlockCommitmentCache; + use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; use solana_perf::packet; use solana_runtime::{ bank::Bank, @@ -623,7 +716,7 @@ mod tests { #[test] fn test_update_new_root() { - let (vote_tracker, bank, _) = setup(); + let (vote_tracker, bank, _, _) = setup(); // Check outdated slots are purged with new root let new_voter = Arc::new(Pubkey::new_rand()); @@ -664,7 +757,7 @@ mod tests { #[test] fn test_update_new_leader_schedule_epoch() { - let (vote_tracker, bank, _) = setup(); + let (vote_tracker, bank, _, _) = setup(); // Check outdated slots are purged with new root let leader_schedule_epoch = bank.get_leader_schedule_epoch(bank.slot()); @@ -706,7 +799,7 @@ mod tests { #[test] fn test_process_votes() { // Create some voters at genesis - let (vote_tracker, _, validator_voting_keypairs) = setup(); + let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (votes_sender, votes_receiver) = unbounded(); let vote_slots = vec![1, 2]; @@ -725,7 +818,14 @@ mod tests { }); // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap(); + ClusterInfoVoteListener::get_and_process_votes( + &votes_receiver, + &vote_tracker, + 0, + subscriptions, + None, + ) + .unwrap(); for vote_slot in vote_slots { let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); let r_slot_vote_tracker = slot_vote_tracker.read().unwrap(); @@ -744,7 +844,7 @@ mod tests { #[test] fn test_process_votes2() { // Create some voters at genesis - let (vote_tracker, _, validator_voting_keypairs) = setup(); + let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); // Send some votes to process let (votes_sender, votes_receiver) = unbounded(); @@ -769,7 +869,14 @@ mod tests { } // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap(); + ClusterInfoVoteListener::get_and_process_votes( + &votes_receiver, + &vote_tracker, + 0, + subscriptions, + None, + ) + .unwrap(); for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap(); let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap(); @@ -788,7 +895,7 @@ mod tests { #[test] fn test_get_voters_by_epoch() { // Create some voters at genesis - let (vote_tracker, bank, validator_voting_keypairs) = setup(); + let (vote_tracker, bank, validator_voting_keypairs, _) = setup(); let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot()); let last_known_slot = bank .epoch_schedule() @@ -859,11 +966,23 @@ mod tests { 100, ); let bank = Bank::new(&genesis_config); + let exit = Arc::new(AtomicBool::new(false)); + let bank_forks = BankForks::new(0, bank); + let bank = bank_forks.get(0).unwrap().clone(); + let vote_tracker = VoteTracker::new(&bank); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(bank_forks)), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore.clone(), + ))), + )); // Send a vote to process, should add a reference to the pubkey for that voter // in the tracker let validator0_keypairs = &validator_voting_keypairs[0]; - let vote_tracker = VoteTracker::new(&bank); let vote_tx = vec![vote_transaction::new_vote_transaction( // Must vote > root to be processed vec![bank.slot() + 1], @@ -874,7 +993,13 @@ mod tests { &validator0_keypairs.vote_keypair, )]; - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0); + ClusterInfoVoteListener::process_votes( + &vote_tracker, + vote_tx, + 0, + subscriptions.clone(), + None, + ); let ref_count = Arc::strong_count( &vote_tracker .keys @@ -924,7 +1049,7 @@ mod tests { }) .collect(); - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0); + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions, None); let ref_count = Arc::strong_count( &vote_tracker @@ -938,7 +1063,12 @@ mod tests { assert_eq!(ref_count, current_ref_count); } - fn setup() -> (Arc, Arc, Vec) { + fn setup() -> ( + Arc, + Arc, + Vec, + Arc, + ) { let validator_voting_keypairs: Vec<_> = (0..10) .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) .collect(); @@ -950,6 +1080,18 @@ mod tests { ); let bank = Bank::new(&genesis_config); let vote_tracker = VoteTracker::new(&bank); + let exit = Arc::new(AtomicBool::new(false)); + let bank_forks = BankForks::new(0, bank); + let bank = bank_forks.get(0).unwrap().clone(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(bank_forks)), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore.clone(), + ))), + )); // Integrity Checks let current_epoch = bank.epoch(); @@ -976,8 +1118,9 @@ mod tests { assert_eq!(*vote_tracker.current_epoch.read().unwrap(), current_epoch); ( Arc::new(vote_tracker), - Arc::new(bank), + bank, validator_voting_keypairs, + subscriptions, ) } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index b3c15e11a4..d59903e03e 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -93,7 +93,10 @@ impl JsonRpcRequestProcessor { debug!("RPC using node root: {:?}", slot); Ok(r_bank_forks.get(slot).cloned().unwrap()) } - Some(commitment_config) if commitment_config.commitment == CommitmentLevel::Single => { + Some(commitment_config) + if commitment_config.commitment == CommitmentLevel::Single + || commitment_config.commitment == CommitmentLevel::SingleGossip => + { let slot = self .block_commitment_cache .read() diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 92dbbd1619..7cb7662d12 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_subscriptions::{RpcSubscriptions, SlotInfo}; +use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote, SlotInfo}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; @@ -114,6 +114,18 @@ pub trait RpcSolPubSub { )] fn slot_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + // Get notification when vote is encountered + #[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")] + fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); + + // Unsubscribe from vote notification subscription. + #[pubsub( + subscription = "voteNotification", + unsubscribe, + name = "voteUnsubscribe" + )] + fn vote_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + // Get notification when a new root is set #[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")] fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); @@ -295,6 +307,27 @@ impl RpcSolPubSub for RpcSolPubSubImpl { } } + fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { + info!("vote_subscribe"); + let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); + let sub_id = SubscriptionId::Number(id as u64); + info!("vote_subscribe: id={:?}", sub_id); + self.subscriptions.add_vote_subscription(sub_id, subscriber); + } + + fn vote_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { + info!("vote_unsubscribe"); + if self.subscriptions.remove_vote_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { info!("root_subscribe"); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); @@ -321,9 +354,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl { mod tests { use super::*; use crate::{ + cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, commitment::{BlockCommitmentCache, CacheSlotInfo}, rpc_subscriptions::tests::robust_poll_or_panic, }; + use crossbeam_channel::unbounded; use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_pubsub::{PubSubHandler, Session}; use serial_test_derive::serial; @@ -333,13 +368,18 @@ mod tests { genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, }; - use solana_runtime::bank::Bank; + use solana_runtime::{ + bank::Bank, + genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs}, + }; use solana_sdk::{ + hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, system_program, system_transaction, transaction::{self, Transaction}, }; + use solana_vote_program::vote_transaction; use std::{ sync::{atomic::AtomicBool, RwLock}, thread::sleep, @@ -519,7 +559,7 @@ mod tests { session, subscriber, contract_state.pubkey().to_string(), - None, + Some(CommitmentConfig::recent()), ); let tx = system_transaction::transfer(&alice, &contract_funds.pubkey(), 51, blockhash); @@ -836,4 +876,97 @@ mod tests { .slot_unsubscribe(Some(session), SubscriptionId::Number(0)) .is_ok()); } + + #[test] + #[serial] + fn test_vote_subscribe() { + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()), + )); + + let validator_voting_keypairs: Vec<_> = (0..10) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + create_genesis_config_with_vote_accounts(10_000, &validator_voting_keypairs, 100); + let exit = Arc::new(AtomicBool::new(false)); + let bank = Bank::new(&genesis_config); + let bank_forks = BankForks::new(0, bank); + let bank = bank_forks.get(0).unwrap().clone(); + let bank_forks = Arc::new(RwLock::new(bank_forks)); + + // Setup RPC + let mut rpc = + RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks.clone()); + let session = create_session(); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("voteNotification"); + + // Setup Subscriptions + let subscriptions = + RpcSubscriptions::new(&exit, bank_forks.clone(), block_commitment_cache.clone()); + rpc.subscriptions = Arc::new(subscriptions); + rpc.vote_subscribe(session, subscriber); + + // Create some voters at genesis + let vote_tracker = VoteTracker::new(&bank); + let (votes_sender, votes_receiver) = unbounded(); + let (vote_tracker, validator_voting_keypairs) = + (Arc::new(vote_tracker), validator_voting_keypairs); + + let vote_slots = vec![1, 2]; + validator_voting_keypairs.iter().for_each(|keypairs| { + let node_keypair = &keypairs.node_keypair; + let vote_keypair = &keypairs.vote_keypair; + let vote_tx = vote_transaction::new_vote_transaction( + vote_slots.clone(), + Hash::default(), + Hash::default(), + node_keypair, + vote_keypair, + vote_keypair, + ); + votes_sender.send(vec![vote_tx]).unwrap(); + }); + + // Process votes and check they were notified. + ClusterInfoVoteListener::get_and_process_votes_for_tests( + &votes_receiver, + &vote_tracker, + 0, + rpc.subscriptions.clone(), + ) + .unwrap(); + + let (response, _) = robust_poll_or_panic(receiver); + assert_eq!( + response, + r#"{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"11111111111111111111111111111111","slots":[1,2],"timestamp":null},"subscription":0}}"# + ); + } + + #[test] + #[serial] + fn test_vote_unsubscribe() { + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks); + let session = create_session(); + let (subscriber, _id_receiver, _) = Subscriber::new_test("voteNotification"); + rpc.vote_subscribe(session, subscriber); + + let session = create_session(); + assert!(rpc + .vote_unsubscribe(Some(session), SubscriptionId::Number(42)) + .is_err()); + + let session = create_session(); + assert!(rpc + .vote_unsubscribe(Some(session), SubscriptionId::Number(0)) + .is_ok()); + } } diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 242639cf1b..8d3c40b5a2 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -15,12 +15,13 @@ use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; use solana_runtime::bank::Bank; use solana_sdk::{ account::Account, - clock::Slot, + clock::{Slot, UnixTimestamp}, commitment_config::{CommitmentConfig, CommitmentLevel}, pubkey::Pubkey, signature::Signature, transaction, }; +use solana_vote_program::vote_state::Vote; use std::sync::{ atomic::{AtomicBool, Ordering}, mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, @@ -43,22 +44,34 @@ pub struct SlotInfo { pub root: Slot, } +// A more human-friendly version of Vote, with the bank state signature base58 encoded. +#[derive(Serialize, Deserialize, Debug)] +pub struct RpcVote { + pub slots: Vec, + pub hash: String, + pub timestamp: Option, +} + enum NotificationEntry { Slot(SlotInfo), + Vote(Vote), Root(Slot), Bank(CacheSlotInfo), + Gossip(Slot), } impl std::fmt::Debug for NotificationEntry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { NotificationEntry::Root(root) => write!(f, "Root({})", root), + NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Bank(cache_slot_info) => write!( f, "Bank({{current_slot: {:?}}})", cache_slot_info.current_slot ), + NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot), } } } @@ -76,6 +89,7 @@ type RpcSignatureSubscriptions = RwLock< HashMap>>>, >; type RpcSlotSubscriptions = RwLock>>; +type RpcVoteSubscriptions = RwLock>>; type RpcRootSubscriptions = RwLock>>; fn add_subscription( @@ -90,7 +104,7 @@ fn add_subscription( S: Clone, { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let commitment = commitment.unwrap_or_else(CommitmentConfig::recent); + let commitment = commitment.unwrap_or_else(CommitmentConfig::single); let subscription_data = SubscriptionData { sink, commitment, @@ -159,7 +173,9 @@ where CommitmentLevel::Max => cache_slot_info.largest_confirmed_root, CommitmentLevel::Recent => cache_slot_info.current_slot, CommitmentLevel::Root => cache_slot_info.node_root, - CommitmentLevel::Single => cache_slot_info.highest_confirmed_slot, + CommitmentLevel::Single | CommitmentLevel::SingleGossip => { + cache_slot_info.highest_confirmed_slot + } }; let results = { let bank_forks = bank_forks.read().unwrap(); @@ -246,7 +262,11 @@ struct Subscriptions { account_subscriptions: Arc, program_subscriptions: Arc, signature_subscriptions: Arc, + gossip_account_subscriptions: Arc, + gossip_program_subscriptions: Arc, + gossip_signature_subscriptions: Arc, slot_subscriptions: Arc, + vote_subscriptions: Arc, root_subscriptions: Arc, } @@ -257,6 +277,7 @@ pub struct RpcSubscriptions { notifier_runtime: Option, bank_forks: Arc>, block_commitment_cache: Arc>, + last_checked_slots: Arc>>, exit: Arc, } @@ -282,7 +303,11 @@ impl RpcSubscriptions { let account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); + let gossip_account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); + let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); + let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); + let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default()); let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let notification_sender = Arc::new(Mutex::new(notification_sender)); @@ -293,11 +318,18 @@ impl RpcSubscriptions { account_subscriptions, program_subscriptions, signature_subscriptions, + gossip_account_subscriptions, + gossip_program_subscriptions, + gossip_signature_subscriptions, slot_subscriptions, + vote_subscriptions, root_subscriptions, }; let _subscriptions = subscriptions.clone(); + let last_checked_slots = Arc::new(RwLock::new(HashMap::new())); + let _last_checked_slots = last_checked_slots.clone(); + let notifier_runtime = RuntimeBuilder::new() .core_threads(1) .name_prefix("solana-rpc-notifier-") @@ -314,6 +346,7 @@ impl RpcSubscriptions { notification_receiver, _subscriptions, _bank_forks, + _last_checked_slots, ); }) .unwrap(); @@ -325,6 +358,7 @@ impl RpcSubscriptions { t_cleanup: Some(t_cleanup), bank_forks, block_commitment_cache, + last_checked_slots, exit: exit.clone(), } } @@ -412,11 +446,10 @@ impl RpcSubscriptions { sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap(); - let slot = match commitment - .unwrap_or_else(CommitmentConfig::recent) - .commitment - { + let commitment_level = commitment + .unwrap_or_else(CommitmentConfig::single) + .commitment; + let slot = match commitment_level { CommitmentLevel::Max => self .block_commitment_cache .read() @@ -429,6 +462,12 @@ impl RpcSubscriptions { .read() .unwrap() .highest_confirmed_slot(), + CommitmentLevel::SingleGossip => *self + .last_checked_slots + .read() + .unwrap() + .get(&CommitmentLevel::SingleGossip) + .unwrap_or(&0), }; let last_notified_slot = if let Some((_account, slot)) = self .bank_forks @@ -441,6 +480,15 @@ impl RpcSubscriptions { } else { 0 }; + + let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + self.subscriptions + .gossip_account_subscriptions + .write() + .unwrap() + } else { + self.subscriptions.account_subscriptions.write().unwrap() + }; add_subscription( &mut subscriptions, pubkey, @@ -453,7 +501,16 @@ impl RpcSubscriptions { pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap(); - remove_subscription(&mut subscriptions, id) + if remove_subscription(&mut subscriptions, id) { + true + } else { + let mut subscriptions = self + .subscriptions + .gossip_account_subscriptions + .write() + .unwrap(); + remove_subscription(&mut subscriptions, id) + } } pub fn add_program_subscription( @@ -463,7 +520,17 @@ impl RpcSubscriptions { sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap(); + let commitment_level = commitment + .unwrap_or_else(CommitmentConfig::recent) + .commitment; + let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + self.subscriptions + .gossip_program_subscriptions + .write() + .unwrap() + } else { + self.subscriptions.program_subscriptions.write().unwrap() + }; add_subscription( &mut subscriptions, program_id, @@ -476,7 +543,16 @@ impl RpcSubscriptions { pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap(); - remove_subscription(&mut subscriptions, id) + if remove_subscription(&mut subscriptions, id) { + true + } else { + let mut subscriptions = self + .subscriptions + .gossip_program_subscriptions + .write() + .unwrap(); + remove_subscription(&mut subscriptions, id) + } } pub fn add_signature_subscription( @@ -486,7 +562,17 @@ impl RpcSubscriptions { sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap(); + let commitment_level = commitment + .unwrap_or_else(CommitmentConfig::recent) + .commitment; + let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + self.subscriptions + .gossip_signature_subscriptions + .write() + .unwrap() + } else { + self.subscriptions.signature_subscriptions.write().unwrap() + }; add_subscription( &mut subscriptions, signature, @@ -499,7 +585,16 @@ impl RpcSubscriptions { pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap(); - remove_subscription(&mut subscriptions, id) + if remove_subscription(&mut subscriptions, id) { + true + } else { + let mut subscriptions = self + .subscriptions + .gossip_signature_subscriptions + .write() + .unwrap(); + remove_subscription(&mut subscriptions, id) + } } /// Notify subscribers of changes to any accounts or new signatures since @@ -508,6 +603,12 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Bank(cache_slot_info)); } + /// Notify SingleGossip commitment-level subscribers of changes to any accounts or new + /// signatures. + pub fn notify_gossip_subscribers(&self, slot: Slot) { + self.enqueue_notification(NotificationEntry::Gossip(slot)); + } + pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap(); @@ -523,6 +624,21 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); } + pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); + subscriptions.insert(sub_id, sink); + } + + pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); + subscriptions.remove(id).is_some() + } + + pub fn notify_vote(&self, vote: &Vote) { + self.enqueue_notification(NotificationEntry::Vote(vote.clone())); + } + pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); @@ -564,6 +680,7 @@ impl RpcSubscriptions { notification_receiver: Receiver, subscriptions: Subscriptions, bank_forks: Arc>, + last_checked_slots: Arc>>, ) { loop { if exit.load(Ordering::Relaxed) { @@ -577,6 +694,19 @@ impl RpcSubscriptions { notifier.notify(slot_info, sink); } } + NotificationEntry::Vote(ref vote_info) => { + let subscriptions = subscriptions.vote_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + notifier.notify( + RpcVote { + slots: vote_info.slots.clone(), + hash: bs58::encode(vote_info.hash).into_string(), + timestamp: vote_info.timestamp, + }, + sink, + ); + } + } NotificationEntry::Root(root) => { let subscriptions = subscriptions.root_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() { @@ -584,47 +714,32 @@ impl RpcSubscriptions { } } NotificationEntry::Bank(cache_slot_info) => { - let pubkeys: Vec<_> = { - let subs = subscriptions.account_subscriptions.read().unwrap(); - subs.keys().cloned().collect() + RpcSubscriptions::notify_accounts_programs_signatures( + &subscriptions.account_subscriptions, + &subscriptions.program_subscriptions, + &subscriptions.signature_subscriptions, + &bank_forks, + &cache_slot_info, + ¬ifier, + ) + } + NotificationEntry::Gossip(slot) => { + let _ = last_checked_slots + .write() + .unwrap() + .insert(CommitmentLevel::SingleGossip, slot); + let cache_slot_info = CacheSlotInfo { + highest_confirmed_slot: slot, + ..CacheSlotInfo::default() }; - for pubkey in &pubkeys { - Self::check_account( - pubkey, - &bank_forks, - subscriptions.account_subscriptions.clone(), - ¬ifier, - &cache_slot_info, - ); - } - - let programs: Vec<_> = { - let subs = subscriptions.program_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for program_id in &programs { - Self::check_program( - program_id, - &bank_forks, - subscriptions.program_subscriptions.clone(), - ¬ifier, - &cache_slot_info, - ); - } - - let signatures: Vec<_> = { - let subs = subscriptions.signature_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for signature in &signatures { - Self::check_signature( - signature, - &bank_forks, - subscriptions.signature_subscriptions.clone(), - ¬ifier, - &cache_slot_info, - ); - } + RpcSubscriptions::notify_accounts_programs_signatures( + &subscriptions.gossip_account_subscriptions, + &subscriptions.gossip_program_subscriptions, + &subscriptions.gossip_signature_subscriptions, + &bank_forks, + &cache_slot_info, + ¬ifier, + ) } }, Err(RecvTimeoutError::Timeout) => { @@ -638,6 +753,57 @@ impl RpcSubscriptions { } } + fn notify_accounts_programs_signatures( + account_subscriptions: &Arc, + program_subscriptions: &Arc, + signature_subscriptions: &Arc, + bank_forks: &Arc>, + cache_slot_info: &CacheSlotInfo, + notifier: &RpcNotifier, + ) { + let pubkeys: Vec<_> = { + let subs = account_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for pubkey in &pubkeys { + Self::check_account( + pubkey, + &bank_forks, + account_subscriptions.clone(), + ¬ifier, + &cache_slot_info, + ); + } + + let programs: Vec<_> = { + let subs = program_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for program_id in &programs { + Self::check_program( + program_id, + &bank_forks, + program_subscriptions.clone(), + ¬ifier, + &cache_slot_info, + ); + } + + let signatures: Vec<_> = { + let subs = signature_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for signature in &signatures { + Self::check_signature( + signature, + &bank_forks, + signature_subscriptions.clone(), + ¬ifier, + &cache_slot_info, + ); + } + } + fn shutdown(&mut self) -> std::thread::Result<()> { if let Some(runtime) = self.notifier_runtime.take() { info!("RPC Notifier runtime - shutting down"); @@ -736,7 +902,12 @@ pub(crate) mod tests { ), )), ); - subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); + subscriptions.add_account_subscription( + alice.pubkey(), + Some(CommitmentConfig::recent()), + sub_id.clone(), + subscriber, + ); assert!(subscriptions .subscriptions @@ -1185,4 +1356,135 @@ pub(crate) mod tests { assert_eq!(subscriptions.len(), (num_keys - 1) as usize); assert!(subscriptions.get(&0).is_none()); } + + #[test] + #[serial] + fn test_gossip_separate_account_notifications() { + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(100); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let bank = Bank::new(&genesis_config); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let alice = Keypair::new(); + + let (subscriber0, _id_receiver, transport_receiver0) = + Subscriber::new_test("accountNotification"); + let (subscriber1, _id_receiver, transport_receiver1) = + Subscriber::new_test("accountNotification"); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore_bank( + blockstore, + bank_forks.read().unwrap().get(1).unwrap().clone(), + 1, + ), + )), + ); + let sub_id0 = SubscriptionId::Number(0 as u64); + subscriptions.add_account_subscription( + alice.pubkey(), + Some(CommitmentConfig::single_gossip()), + sub_id0.clone(), + subscriber0, + ); + let sub_id1 = SubscriptionId::Number(1 as u64); + subscriptions.add_account_subscription( + alice.pubkey(), + Some(CommitmentConfig::recent()), + sub_id1.clone(), + subscriber1, + ); + + assert!(subscriptions + .subscriptions + .account_subscriptions + .read() + .unwrap() + .contains_key(&alice.pubkey())); + + let tx = system_transaction::create_account( + &mint_keypair, + &alice, + blockhash, + 1, + 16, + &solana_budget_program::id(), + ); + bank_forks + .write() + .unwrap() + .get(1) + .unwrap() + .process_transaction(&tx) + .unwrap(); + let mut cache_slot_info = CacheSlotInfo::default(); + cache_slot_info.current_slot = 1; + subscriptions.notify_subscribers(cache_slot_info); + let (response, _) = robust_poll_or_panic(transport_receiver1); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 1 }, + "value": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + }, + "subscription": 1, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + subscriptions.notify_gossip_subscribers(1); + let (response, _) = robust_poll_or_panic(transport_receiver0); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 1 }, + "value": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + subscriptions.remove_account_subscription(&sub_id0); + assert!(subscriptions + .subscriptions + .account_subscriptions + .read() + .unwrap() + .contains_key(&alice.pubkey())); + subscriptions.remove_account_subscription(&sub_id1); + assert!(!subscriptions + .subscriptions + .account_subscriptions + .read() + .unwrap() + .contains_key(&alice.pubkey())); + } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 2d484e0542..3346e37c2f 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -8,6 +8,7 @@ use crate::{ cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, fetch_stage::FetchStage, poh_recorder::{PohRecorder, WorkingBankEntry}, + rpc_subscriptions::RpcSubscriptions, sigverify::TransactionSigVerifier, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, }; @@ -44,6 +45,7 @@ impl Tpu { tpu_forwards_sockets: Vec, broadcast_sockets: Vec, sigverify_disabled: bool, + subscriptions: &Arc, transaction_status_sender: Option, blockstore: &Arc, broadcast_type: &BroadcastStageType, @@ -79,6 +81,7 @@ impl Tpu { &poh_recorder, vote_tracker, bank_forks, + subscriptions.clone(), ); let banking_stage = BankingStage::new( diff --git a/core/src/validator.rs b/core/src/validator.rs index 55b0fd5715..1ecae8f5c1 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -470,6 +470,7 @@ impl Validator { node.sockets.tpu_forwards, node.sockets.broadcast, config.dev_sigverify_disabled, + &subscriptions, transaction_status_sender, &blockstore, &config.broadcast_stage_type, diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index fa0f9f3144..0221900566 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -1193,7 +1193,7 @@ After connecting to the RPC PubSub websocket at `ws://
/`: * Submit subscription requests to the websocket using the methods below * Multiple subscriptions may be active at once -* Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `recent`. +* Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `"single"`. ### accountSubscribe @@ -1204,8 +1204,6 @@ Subscribe to an account to receive notifications when the lamports or data for a * `` - account Pubkey, as base-58 encoded string * `` - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment) - Default: 0, Max: `MAX_LOCKOUT_HISTORY` \(greater integers rounded down\) - #### Results: * `` - Subscription id \(needed to unsubscribe\) @@ -1216,7 +1214,7 @@ Subscribe to an account to receive notifications when the lamports or data for a // Request {"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12"]} -{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12", 15]} +{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12", {"commitment": "single"}]} // Result {"jsonrpc": "2.0","result": 0,"id": 1} @@ -1225,7 +1223,25 @@ Subscribe to an account to receive notifications when the lamports or data for a #### Notification Format: ```bash -{"jsonrpc": "2.0","method": "accountNotification", "params": {"result": {"executable":false,"owner":"4uQeVj5tqViQh7yWWGStvkEG1Zmhx6uasJtWCJziofM","lamports":1,"data":"Joig2k8Ax4JPMpWhXRyc2jMa7Wejz4X1xqVi3i7QRkmVj1ChUgNc4VNpGUQePJGBAui3c6886peU9GEbjsyeANN8JGStprwLbLwcw5wpPjuQQb9mwrjVmoDQBjj3MzZKgeHn6wmnQ5k8DBFuoCYKWWsJfH2gv9FvCzrN6K1CRcQZzF","rentEpoch":28},"subscription":0}} +{ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { + "slot": 5199307 + }, + "value": { + "data": "9qRxMDwy1ntDhBBoiy4Na9uDLbRTSzUS989mpwz", + "executable": false, + "lamports": 33594, + "owner": "H9oaJujXETwkmjyweuqKPFtk2no4SumoU9A3hi3dC8U6", + "rentEpoch": 635 + } + }, + "subscription": 23784 + } +} ``` ### accountUnsubscribe @@ -1259,8 +1275,6 @@ Subscribe to a program to receive notifications when the lamports or data for a * `` - program\_id Pubkey, as base-58 encoded string * `` - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment) - Default: 0, Max: `MAX_LOCKOUT_HISTORY` \(greater integers rounded down\) - #### Results: * `` - Subscription id \(needed to unsubscribe\) @@ -1269,9 +1283,9 @@ Subscribe to a program to receive notifications when the lamports or data for a ```bash // Request -{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV"]} +{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["7BwE8yitxiWkD8jVPFvPmV7rs2Znzi4NHzJGLu2dzpUq"]} -{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV", 15]} +{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["7BwE8yitxiWkD8jVPFvPmV7rs2Znzi4NHzJGLu2dzpUq", {"commitment": "single"}]} // Result {"jsonrpc": "2.0","result": 0,"id": 1} @@ -1279,12 +1293,30 @@ Subscribe to a program to receive notifications when the lamports or data for a #### Notification Format: -* `` - account Pubkey, as base-58 encoded string -* `` - account info JSON object \(see [getAccountInfo](jsonrpc-api.md#getaccountinfo) for field details\) - - ```bash - {"jsonrpc":"2.0","method":"programNotification","params":{{"result":["8Rshv2oMkPu5E4opXTRyuyBeZBqQ4S477VG26wUTFxUM",{"executable":false,"lamports":1,"owner":"9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV","data":"4SZWhnbSt3njU4QHVgPrWeekz1BudU4ttmdr9ezmrL4X6XeLeL83xVAo6ZdxwU3oXgHNeF2q6tWZbnVnBXmvNyeLVEGt8ZQ4ZmgjHfVNCEwBtzh2aDrHgQSjBFLYAdmM3uwBhcm1EyHJLeUiFqpsoAUhn6Vphwrpf44dWRAGsAJZbzvVrUW9bfucpR7xudHHg2MxQ2CdqsfS3TfWUJY3vaf2A4AUNzfAmNPHBGi99nU2hYubGSVSPcpVPpdRWQkydgqasBmTosd","rentEpoch":28}],"subscription":0}} - ``` +```bash +{ + "jsonrpc": "2.0", + "method": "programNotification", + "params": { + "result": { + "context": { + "slot": 5208469 + }, + "value": { + "pubkey": "H4vnBqifaSACnKa7acsxstsY1iV1bvJNxsCY7enrd1hq" + "account": { + "data": "9qRxMDwy1ntDhBBoiy4Na9uDLbRTSzUS989m", + "executable": false, + "lamports": 33594, + "owner": "7BwE8yitxiWkD8jVPFvPmV7rs2Znzi4NHzJGLu2dzpUq", + "rentEpoch": 636 + }, + } + }, + "subscription": 24040 + } +} +``` ### programUnsubscribe @@ -1329,7 +1361,7 @@ Subscribe to a transaction signature to receive notification when the transactio // Request {"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b"]} -{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b", 15]} +{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b", {"commitment": "max"}]} // Result {"jsonrpc": "2.0","result": 0,"id": 1} @@ -1338,7 +1370,21 @@ Subscribe to a transaction signature to receive notification when the transactio #### Notification Format: ```bash -{"jsonrpc": "2.0","method": "signatureNotification", "params": {"result": {"err": null}, "subscription":0}} +{ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { + "slot": 5207624 + }, + "value": { + "err": null + } + }, + "subscription": 24006 + } +} ``` ### signatureUnsubscribe @@ -1388,7 +1434,18 @@ None #### Notification Format: ```bash -{"jsonrpc": "2.0","method": "slotNotification", "params": {"result":{"parent":75,"root":44,"slot":76},"subscription":0}} +{ + "jsonrpc": "2.0", + "method": "slotNotification", + "params": { + "result": { + "parent": 75, + "root": 44, + "slot": 76 + }, + "subscription": 0 + } +} ``` ### slotUnsubscribe @@ -1440,7 +1497,14 @@ None The result is the latest root slot number. ```bash -{"jsonrpc": "2.0","method": "rootNotification", "params": {"result":42,"subscription":0}} +{ + "jsonrpc": "2.0", + "method": "rootNotification", + "params": { + "result": 42, + "subscription": 0 + } +} ``` ### rootUnsubscribe @@ -1464,3 +1528,57 @@ Unsubscribe from root notifications // Result {"jsonrpc": "2.0","result": true,"id": 1} ``` + +### voteSubscribe + +Subscribe to receive notification anytime a new vote is observed in gossip. +These votes are pre-consensus therefore there is no guarantee these votes will +enter the ledger. + +#### Parameters: + +None + +#### Results: + +* `integer` - subscription id \(needed to unsubscribe\) + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"voteSubscribe"} + +// Result +{"jsonrpc": "2.0","result": 0,"id": 1} +``` + +#### Notification Format: + +The result is the latest vote, containing its hash, a list of voted slots, and an optional timestamp. + +```bash +{"jsonrpc":"2.0","method":"voteNotification","params":{"result":{"hash":"8Rshv2oMkPu5E4opXTRyuyBeZBqQ4S477VG26wUTFxUM","slots":[1,2],"timestamp":null},"subscription":0}} +``` + +### voteUnsubscribe + +Unsubscribe from vote notifications + +#### Parameters: + +* `` - subscription id to cancel + +#### Results: + +* `` - unsubscribe success message + +#### Example: + +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"voteUnsubscribe", "params":[0]} + +// Result +{"jsonrpc": "2.0","result": true,"id": 1} +``` diff --git a/sdk/src/commitment_config.rs b/sdk/src/commitment_config.rs index b72da97bb1..206102c080 100644 --- a/sdk/src/commitment_config.rs +++ b/sdk/src/commitment_config.rs @@ -37,6 +37,12 @@ impl CommitmentConfig { } } + pub fn single_gossip() -> Self { + Self { + commitment: CommitmentLevel::SingleGossip, + } + } + pub fn ok(self) -> Option { if self == Self::default() { None @@ -46,11 +52,12 @@ impl CommitmentConfig { } } -#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] #[serde(rename_all = "camelCase")] pub enum CommitmentLevel { Max, Recent, Root, Single, + SingleGossip, }