Disable the PubSub vote subscription by default
The --rpc-pubsub-enable-vote-subscription flag may be used to enable it. The current vote subscription is problematic because it emits a notification for *every* vote, so hundreds a second in a real cluster. Critically it's also missing information about *who* is voting, rendering all those notifications practically useless. Until these two issues can be resolved, the vote subscription is not much more than a potential DoS vector.
This commit is contained in:
@ -1002,11 +1002,12 @@ mod tests {
|
|||||||
// Setup Subscriptions
|
// Setup Subscriptions
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
let subscriptions = RpcSubscriptions::new(
|
let subscriptions = RpcSubscriptions::new_with_vote_subscription(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
optimistically_confirmed_bank,
|
optimistically_confirmed_bank,
|
||||||
|
true,
|
||||||
);
|
);
|
||||||
rpc.subscriptions = Arc::new(subscriptions);
|
rpc.subscriptions = Arc::new(subscriptions);
|
||||||
rpc.vote_subscribe(session, subscriber);
|
rpc.vote_subscribe(session, subscriber);
|
||||||
|
@ -18,6 +18,8 @@ use std::{
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct PubSubConfig {
|
pub struct PubSubConfig {
|
||||||
|
pub enable_vote_subscription: bool,
|
||||||
|
|
||||||
// See the corresponding fields in
|
// See the corresponding fields in
|
||||||
// https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131
|
// https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131
|
||||||
// for a complete description of each field in this struct
|
// for a complete description of each field in this struct
|
||||||
@ -30,8 +32,9 @@ pub struct PubSubConfig {
|
|||||||
impl Default for PubSubConfig {
|
impl Default for PubSubConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
max_connections: 1000, // Arbitrary, default of 100 is too low
|
enable_vote_subscription: false,
|
||||||
max_fragment_size: 50 * 1024, // 50KB
|
max_connections: 1000, // Arbitrary, default of 100 is too low
|
||||||
|
max_fragment_size: 50 * 1024, // 50KB
|
||||||
max_in_buffer_capacity: 50 * 1024, // 50KB
|
max_in_buffer_capacity: 50 * 1024, // 50KB
|
||||||
max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
|
max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
|
||||||
}
|
}
|
||||||
|
@ -344,6 +344,7 @@ pub struct RpcSubscriptions {
|
|||||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||||
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
|
enable_vote_subscription: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for RpcSubscriptions {
|
impl Drop for RpcSubscriptions {
|
||||||
@ -360,6 +361,22 @@ impl RpcSubscriptions {
|
|||||||
bank_forks: Arc<RwLock<BankForks>>,
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||||
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
||||||
|
) -> Self {
|
||||||
|
Self::new_with_vote_subscription(
|
||||||
|
exit,
|
||||||
|
bank_forks,
|
||||||
|
block_commitment_cache,
|
||||||
|
optimistically_confirmed_bank,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_with_vote_subscription(
|
||||||
|
exit: &Arc<AtomicBool>,
|
||||||
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||||
|
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
|
||||||
|
enable_vote_subscription: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (notification_sender, notification_receiver): (
|
let (notification_sender, notification_receiver): (
|
||||||
Sender<NotificationEntry>,
|
Sender<NotificationEntry>,
|
||||||
@ -422,17 +439,20 @@ impl RpcSubscriptions {
|
|||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
optimistically_confirmed_bank,
|
optimistically_confirmed_bank,
|
||||||
exit: exit.clone(),
|
exit: exit.clone(),
|
||||||
|
enable_vote_subscription,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For tests only...
|
||||||
pub fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self {
|
pub fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self {
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
Self::new(
|
Self::new_with_vote_subscription(
|
||||||
&Arc::new(AtomicBool::new(false)),
|
&Arc::new(AtomicBool::new(false)),
|
||||||
bank_forks,
|
bank_forks,
|
||||||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||||
optimistically_confirmed_bank,
|
optimistically_confirmed_bank,
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -706,9 +726,15 @@ impl RpcSubscriptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) {
|
pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) {
|
||||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
if self.enable_vote_subscription {
|
||||||
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
|
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||||
subscriptions.insert(sub_id, sink);
|
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
|
||||||
|
subscriptions.insert(sub_id, sink);
|
||||||
|
} else {
|
||||||
|
let _ = subscriber.reject(jsonrpc_core::Error::new(
|
||||||
|
jsonrpc_core::ErrorCode::MethodNotFound,
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool {
|
pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool {
|
||||||
|
@ -344,11 +344,12 @@ impl Validator {
|
|||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
|
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new_with_vote_subscription(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
block_commitment_cache.clone(),
|
block_commitment_cache.clone(),
|
||||||
optimistically_confirmed_bank.clone(),
|
optimistically_confirmed_bank.clone(),
|
||||||
|
config.pubsub_config.enable_vote_subscription,
|
||||||
));
|
));
|
||||||
|
|
||||||
let (completed_data_sets_sender, completed_data_sets_receiver) =
|
let (completed_data_sets_sender, completed_data_sets_receiver) =
|
||||||
|
@ -2834,7 +2834,7 @@ Result:
|
|||||||
{"jsonrpc":"2.0","result":true,"id":1}
|
{"jsonrpc":"2.0","result":true,"id":1}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Subscription Websocket
|
## Subscription Websocket
|
||||||
|
|
||||||
After connecting to the RPC PubSub websocket at `ws://<ADDRESS>/`:
|
After connecting to the RPC PubSub websocket at `ws://<ADDRESS>/`:
|
||||||
|
|
||||||
@ -3355,7 +3355,11 @@ Result:
|
|||||||
{"jsonrpc": "2.0","result": true,"id": 1}
|
{"jsonrpc": "2.0","result": true,"id": 1}
|
||||||
```
|
```
|
||||||
|
|
||||||
### voteSubscribe
|
### voteSubscribe - Unstable, disabled by default
|
||||||
|
|
||||||
|
**This subscription is unstable and only available if the validator was started
|
||||||
|
with the `--rpc-pubsub-enable-vote-subscription` flag. The format of this
|
||||||
|
subscription may change in the future**
|
||||||
|
|
||||||
Subscribe to receive notification anytime a new vote is observed in gossip.
|
Subscribe to receive notification anytime a new vote is observed in gossip.
|
||||||
These votes are pre-consensus therefore there is no guarantee these votes will
|
These votes are pre-consensus therefore there is no guarantee these votes will
|
||||||
|
@ -1264,6 +1264,12 @@ pub fn main() {
|
|||||||
.validator(solana_net_utils::is_host)
|
.validator(solana_net_utils::is_host)
|
||||||
.help("IP address to bind the RPC port [default: use --bind-address]"),
|
.help("IP address to bind the RPC port [default: use --bind-address]"),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_pubsub_enable_vote_subscription")
|
||||||
|
.long("rpc-pubsub-enable-vote-subscription")
|
||||||
|
.takes_value(false)
|
||||||
|
.help("Enable the unstable RPC PubSub `voteSubscribe` subscription"),
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("rpc_pubsub_max_connections")
|
Arg::with_name("rpc_pubsub_max_connections")
|
||||||
.long("rpc-pubsub-max-connections")
|
.long("rpc-pubsub-max-connections")
|
||||||
@ -1469,6 +1475,7 @@ pub fn main() {
|
|||||||
)
|
)
|
||||||
}),
|
}),
|
||||||
pubsub_config: PubSubConfig {
|
pubsub_config: PubSubConfig {
|
||||||
|
enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"),
|
||||||
max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize),
|
max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize),
|
||||||
max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize),
|
max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize),
|
||||||
max_in_buffer_capacity: value_t_or_exit!(
|
max_in_buffer_capacity: value_t_or_exit!(
|
||||||
|
Reference in New Issue
Block a user