Disable the PubSub vote subscription by default (#13599)
The --rpc-pubsub-enable-vote-subscription flag may be used to enable it.
The current vote subscription is problematic because it emits a
notification for *every* vote, so hundreds a second in a real cluster.
Critically it's also missing information about *who* is voting,
rendering all those notifications practically useless.
Until these two issues can be resolved, the vote subscription is not
much more than a potential DoS vector.
(cherry picked from commit 5d72e52ad0)
Co-authored-by: Michael Vines <mvines@gmail.com>
			
			
This commit is contained in:
		| @@ -1006,11 +1006,12 @@ mod tests { | |||||||
|         // Setup Subscriptions |         // Setup Subscriptions | ||||||
|         let optimistically_confirmed_bank = |         let optimistically_confirmed_bank = | ||||||
|             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); |             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); | ||||||
|         let subscriptions = RpcSubscriptions::new( |         let subscriptions = RpcSubscriptions::new_with_vote_subscription( | ||||||
|             &exit, |             &exit, | ||||||
|             bank_forks, |             bank_forks, | ||||||
|             block_commitment_cache, |             block_commitment_cache, | ||||||
|             optimistically_confirmed_bank, |             optimistically_confirmed_bank, | ||||||
|  |             true, | ||||||
|         ); |         ); | ||||||
|         rpc.subscriptions = Arc::new(subscriptions); |         rpc.subscriptions = Arc::new(subscriptions); | ||||||
|         rpc.vote_subscribe(session, subscriber); |         rpc.vote_subscribe(session, subscriber); | ||||||
|   | |||||||
| @@ -18,6 +18,8 @@ use std::{ | |||||||
|  |  | ||||||
| #[derive(Debug, Clone)] | #[derive(Debug, Clone)] | ||||||
| pub struct PubSubConfig { | pub struct PubSubConfig { | ||||||
|  |     pub enable_vote_subscription: bool, | ||||||
|  |  | ||||||
|     // See the corresponding fields in |     // See the corresponding fields in | ||||||
|     // https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131 |     // https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131 | ||||||
|     // for a complete description of each field in this struct |     // for a complete description of each field in this struct | ||||||
| @@ -30,8 +32,9 @@ pub struct PubSubConfig { | |||||||
| impl Default for PubSubConfig { | impl Default for PubSubConfig { | ||||||
|     fn default() -> Self { |     fn default() -> Self { | ||||||
|         Self { |         Self { | ||||||
|             max_connections: 1000,             // Arbitrary, default of 100 is too low |             enable_vote_subscription: false, | ||||||
|             max_fragment_size: 50 * 1024,      // 50KB |             max_connections: 1000, // Arbitrary, default of 100 is too low | ||||||
|  |             max_fragment_size: 50 * 1024, // 50KB | ||||||
|             max_in_buffer_capacity: 50 * 1024, // 50KB |             max_in_buffer_capacity: 50 * 1024, // 50KB | ||||||
|             max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc |             max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc | ||||||
|         } |         } | ||||||
|   | |||||||
| @@ -349,6 +349,7 @@ pub struct RpcSubscriptions { | |||||||
|     block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, |     block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, | ||||||
|     optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, |     optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, | ||||||
|     exit: Arc<AtomicBool>, |     exit: Arc<AtomicBool>, | ||||||
|  |     enable_vote_subscription: bool, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Drop for RpcSubscriptions { | impl Drop for RpcSubscriptions { | ||||||
| @@ -365,6 +366,22 @@ impl RpcSubscriptions { | |||||||
|         bank_forks: Arc<RwLock<BankForks>>, |         bank_forks: Arc<RwLock<BankForks>>, | ||||||
|         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, |         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, | ||||||
|         optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, |         optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, | ||||||
|  |     ) -> Self { | ||||||
|  |         Self::new_with_vote_subscription( | ||||||
|  |             exit, | ||||||
|  |             bank_forks, | ||||||
|  |             block_commitment_cache, | ||||||
|  |             optimistically_confirmed_bank, | ||||||
|  |             false, | ||||||
|  |         ) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn new_with_vote_subscription( | ||||||
|  |         exit: &Arc<AtomicBool>, | ||||||
|  |         bank_forks: Arc<RwLock<BankForks>>, | ||||||
|  |         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, | ||||||
|  |         optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, | ||||||
|  |         enable_vote_subscription: bool, | ||||||
|     ) -> Self { |     ) -> Self { | ||||||
|         let (notification_sender, notification_receiver): ( |         let (notification_sender, notification_receiver): ( | ||||||
|             Sender<NotificationEntry>, |             Sender<NotificationEntry>, | ||||||
| @@ -427,17 +444,20 @@ impl RpcSubscriptions { | |||||||
|             block_commitment_cache, |             block_commitment_cache, | ||||||
|             optimistically_confirmed_bank, |             optimistically_confirmed_bank, | ||||||
|             exit: exit.clone(), |             exit: exit.clone(), | ||||||
|  |             enable_vote_subscription, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     // For tests only... | ||||||
|     pub fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self { |     pub fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self { | ||||||
|         let optimistically_confirmed_bank = |         let optimistically_confirmed_bank = | ||||||
|             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); |             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); | ||||||
|         Self::new( |         Self::new_with_vote_subscription( | ||||||
|             &Arc::new(AtomicBool::new(false)), |             &Arc::new(AtomicBool::new(false)), | ||||||
|             bank_forks, |             bank_forks, | ||||||
|             Arc::new(RwLock::new(BlockCommitmentCache::default())), |             Arc::new(RwLock::new(BlockCommitmentCache::default())), | ||||||
|             optimistically_confirmed_bank, |             optimistically_confirmed_bank, | ||||||
|  |             true, | ||||||
|         ) |         ) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -711,9 +731,15 @@ impl RpcSubscriptions { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) { |     pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) { | ||||||
|         let sink = subscriber.assign_id(sub_id.clone()).unwrap(); |         if self.enable_vote_subscription { | ||||||
|         let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); |             let sink = subscriber.assign_id(sub_id.clone()).unwrap(); | ||||||
|         subscriptions.insert(sub_id, sink); |             let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); | ||||||
|  |             subscriptions.insert(sub_id, sink); | ||||||
|  |         } else { | ||||||
|  |             let _ = subscriber.reject(jsonrpc_core::Error::new( | ||||||
|  |                 jsonrpc_core::ErrorCode::MethodNotFound, | ||||||
|  |             )); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool { |     pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool { | ||||||
|   | |||||||
| @@ -332,11 +332,12 @@ impl Validator { | |||||||
|         let optimistically_confirmed_bank = |         let optimistically_confirmed_bank = | ||||||
|             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); |             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); | ||||||
|  |  | ||||||
|         let subscriptions = Arc::new(RpcSubscriptions::new( |         let subscriptions = Arc::new(RpcSubscriptions::new_with_vote_subscription( | ||||||
|             &exit, |             &exit, | ||||||
|             bank_forks.clone(), |             bank_forks.clone(), | ||||||
|             block_commitment_cache.clone(), |             block_commitment_cache.clone(), | ||||||
|             optimistically_confirmed_bank.clone(), |             optimistically_confirmed_bank.clone(), | ||||||
|  |             config.pubsub_config.enable_vote_subscription, | ||||||
|         )); |         )); | ||||||
|  |  | ||||||
|         let (completed_data_sets_sender, completed_data_sets_receiver) = |         let (completed_data_sets_sender, completed_data_sets_receiver) = | ||||||
|   | |||||||
| @@ -1551,7 +1551,7 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m | |||||||
| {"jsonrpc":"2.0","result":true,"id":1} | {"jsonrpc":"2.0","result":true,"id":1} | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| ### Subscription Websocket | ## Subscription Websocket | ||||||
|  |  | ||||||
| After connecting to the RPC PubSub websocket at `ws://<ADDRESS>/`: | After connecting to the RPC PubSub websocket at `ws://<ADDRESS>/`: | ||||||
|  |  | ||||||
| @@ -1973,7 +1973,11 @@ Unsubscribe from root notifications | |||||||
| {"jsonrpc": "2.0","result": true,"id": 1} | {"jsonrpc": "2.0","result": true,"id": 1} | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| ### voteSubscribe | ### voteSubscribe - Unstable, disabled by default | ||||||
|  |  | ||||||
|  | **This subscription is unstable and only available if the validator was started | ||||||
|  | with the `--rpc-pubsub-enable-vote-subscription` flag.  The format of this | ||||||
|  | subscription may change in the future** | ||||||
|  |  | ||||||
| Subscribe to receive notification anytime a new vote is observed in gossip. | Subscribe to receive notification anytime a new vote is observed in gossip. | ||||||
| These votes are pre-consensus therefore there is no guarantee these votes will | These votes are pre-consensus therefore there is no guarantee these votes will | ||||||
|   | |||||||
| @@ -1252,6 +1252,12 @@ pub fn main() { | |||||||
|                 .validator(solana_net_utils::is_host) |                 .validator(solana_net_utils::is_host) | ||||||
|                 .help("IP address to bind the RPC port [default: use --bind-address]"), |                 .help("IP address to bind the RPC port [default: use --bind-address]"), | ||||||
|         ) |         ) | ||||||
|  |         .arg( | ||||||
|  |             Arg::with_name("rpc_pubsub_enable_vote_subscription") | ||||||
|  |                 .long("rpc-pubsub-enable-vote-subscription") | ||||||
|  |                 .takes_value(false) | ||||||
|  |                 .help("Enable the unstable RPC PubSub `voteSubscribe` subscription"), | ||||||
|  |         ) | ||||||
|         .arg( |         .arg( | ||||||
|             Arg::with_name("rpc_pubsub_max_connections") |             Arg::with_name("rpc_pubsub_max_connections") | ||||||
|                 .long("rpc-pubsub-max-connections") |                 .long("rpc-pubsub-max-connections") | ||||||
| @@ -1455,6 +1461,7 @@ pub fn main() { | |||||||
|             ) |             ) | ||||||
|         }), |         }), | ||||||
|         pubsub_config: PubSubConfig { |         pubsub_config: PubSubConfig { | ||||||
|  |             enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"), | ||||||
|             max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize), |             max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize), | ||||||
|             max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize), |             max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize), | ||||||
|             max_in_buffer_capacity: value_t_or_exit!( |             max_in_buffer_capacity: value_t_or_exit!( | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user