diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index a1a06e66d4..711fb7b3ca 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -16,12 +16,35 @@ use std::{ time::Duration, }; +#[derive(Debug, Clone)] +pub struct PubSubConfig { + // See the corresponding fields in + // https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131 + // for a complete description of each field in this struct + pub max_connections: usize, + pub max_fragment_size: usize, + pub max_in_buffer_capacity: usize, + pub max_out_buffer_capacity: usize, +} + +impl Default for PubSubConfig { + fn default() -> Self { + Self { + max_connections: 1000, // Arbitrary, default of 100 is too low + max_fragment_size: 50 * 1024, // 50KB + max_in_buffer_capacity: 50 * 1024, // 50KB + max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc + } + } +} + pub struct PubSubService { thread_hdl: JoinHandle<()>, } impl PubSubService { pub fn new( + pubsub_config: PubSubConfig, subscriptions: &Arc, pubsub_addr: SocketAddr, exit: &Arc, @@ -29,6 +52,20 @@ impl PubSubService { info!("rpc_pubsub bound to {:?}", pubsub_addr); let rpc = RpcSolPubSubImpl::new(subscriptions.clone()); let exit_ = exit.clone(); + + // TODO: Once https://github.com/paritytech/jsonrpc/pull/594 lands, use + // `ServerBuilder::max_in_buffer_capacity()` and `Server::max_out_buffer_capacity() methods + // instead of only `ServerBuilder::max_payload` + let max_payload = *[ + pubsub_config.max_fragment_size, + pubsub_config.max_in_buffer_capacity, + pubsub_config.max_out_buffer_capacity, + ] + .iter() + .max() + .unwrap(); + info!("rpc_pubsub max_payload: {}", max_payload); + let thread_hdl = Builder::new() .name("solana-pubsub".to_string()) .spawn(move || { @@ -36,19 +73,24 @@ impl PubSubService { io.extend_with(rpc.to_delegate()); let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| { - info!("New pubsub connection"); - let session = Arc::new(Session::new(context.sender())); - session.on_drop(|| { - info!("Pubsub connection dropped"); - }); - session + info!("New pubsub connection"); + let session = Arc::new(Session::new(context.sender())); + session.on_drop(|| { + info!("Pubsub connection dropped"); + }); + session }) - .max_connections(1000) // Arbitrary, default of 100 is too low - .max_payload(10 * 1024 * 1024 + 1024) // max account size (10MB) + extra (1K) + .max_connections(pubsub_config.max_connections) + .max_payload(max_payload) .start(&pubsub_addr); if let Err(e) = server { - warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port()); + warn!( + "Pubsub service unavailable error: {:?}. \n\ + Also, check that port {} is not already in use by another application", + e, + pubsub_addr.port() + ); return; } while !exit_.load(Ordering::Relaxed) { @@ -99,7 +141,8 @@ mod tests { Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, )); - let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); + let pubsub_service = + PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); } diff --git a/core/src/validator.rs b/core/src/validator.rs index 5427193a61..a206d1a021 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -17,7 +17,7 @@ use crate::{ poh_service::PohService, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, rpc::JsonRpcConfig, - rpc_pubsub_service::PubSubService, + rpc_pubsub_service::{PubSubConfig, PubSubService}, rpc_service::JsonRpcService, rpc_subscriptions::RpcSubscriptions, sample_performance_service::SamplePerformanceService, @@ -83,6 +83,7 @@ pub struct ValidatorConfig { pub account_paths: Vec, pub rpc_config: JsonRpcConfig, pub rpc_addrs: Option<(SocketAddr, SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub, Banks) + pub pubsub_config: PubSubConfig, pub snapshot_config: Option, pub max_ledger_shreds: Option, pub broadcast_stage_type: BroadcastStageType, @@ -118,6 +119,7 @@ impl Default for ValidatorConfig { account_paths: Vec::new(), rpc_config: JsonRpcConfig::default(), rpc_addrs: None, + pubsub_config: PubSubConfig::default(), snapshot_config: None, broadcast_stage_type: BroadcastStageType::Standard, enable_partition: None, @@ -435,7 +437,12 @@ impl Validator { rpc_override_health_check.clone(), optimistically_confirmed_bank.clone(), ), - pubsub_service: PubSubService::new(&subscriptions, rpc_pubsub_addr, &exit), + pubsub_service: PubSubService::new( + config.pubsub_config.clone(), + &subscriptions, + rpc_pubsub_addr, + &exit, + ), rpc_banks_service: RpcBanksService::new( rpc_banks_addr, tpu_address, diff --git a/core/tests/client.rs b/core/tests/client.rs index 77b10ef807..ba849ded44 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -1,7 +1,8 @@ use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo}; use solana_core::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, - rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, + rpc_pubsub_service::{PubSubConfig, PubSubService}, + rpc_subscriptions::RpcSubscriptions, test_validator::TestValidator, }; use solana_runtime::{ @@ -100,7 +101,8 @@ fn test_slot_subscription() { Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, )); - let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); + let pubsub_service = + PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit); std::thread::sleep(Duration::from_millis(400)); let (mut client, receiver) = diff --git a/validator/src/main.rs b/validator/src/main.rs index 88c1917b81..98fe9726ff 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -20,6 +20,7 @@ use solana_core::{ contact_info::ContactInfo, gossip_service::GossipService, rpc::JsonRpcConfig, + rpc_pubsub_service::PubSubConfig, validator::{Validator, ValidatorConfig}, }; use solana_download_utils::{download_genesis_if_missing, download_snapshot}; @@ -786,6 +787,13 @@ pub fn main() { let default_dynamic_port_range = &format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1); let default_genesis_archive_unpacked_size = &MAX_GENESIS_ARCHIVE_UNPACKED_SIZE.to_string(); + let default_rpc_pubsub_max_connections = PubSubConfig::default().max_connections.to_string(); + let default_rpc_pubsub_max_fragment_size = + PubSubConfig::default().max_fragment_size.to_string(); + let default_rpc_pubsub_max_in_buffer_capacity = + PubSubConfig::default().max_in_buffer_capacity.to_string(); + let default_rpc_pubsub_max_out_buffer_capacity = + PubSubConfig::default().max_out_buffer_capacity.to_string(); let matches = App::new(crate_name!()).about(crate_description!()) .version(solana_version::version!()) @@ -1186,6 +1194,45 @@ pub fn main() { .validator(solana_net_utils::is_host) .help("IP address to bind the RPC port [default: use --bind-address]"), ) + .arg( + Arg::with_name("rpc_pubsub_max_connections") + .long("rpc-pubsub-max-connections") + .value_name("NUMBER") + .takes_value(true) + .validator(is_parsable::) + .default_value(&default_rpc_pubsub_max_connections) + .help("The maximum number of connections that RPC PubSub will support. \ + This is a hard limit and no new connections beyond this limit can \ + be made until an old connection is dropped."), + ) + .arg( + Arg::with_name("rpc_pubsub_max_fragment_size") + .long("rpc-pubsub-max-fragment-size") + .value_name("BYTES") + .takes_value(true) + .validator(is_parsable::) + .default_value(&default_rpc_pubsub_max_fragment_size) + .help("The maximum length in bytes of acceptable incoming frames. Messages longer \ + than this will be rejected."), + ) + .arg( + Arg::with_name("rpc_pubsub_max_in_buffer_capacity") + .long("rpc-pubsub-max-in-buffer-capacity") + .value_name("BYTES") + .takes_value(true) + .validator(is_parsable::) + .default_value(&default_rpc_pubsub_max_in_buffer_capacity) + .help("The maximum size in bytes to which the incoming websocket buffer can grow."), + ) + .arg( + Arg::with_name("rpc_pubsub_max_out_buffer_capacity") + .long("rpc-pubsub-max-out-buffer-capacity") + .value_name("BYTES") + .takes_value(true) + .validator(is_parsable::) + .default_value(&default_rpc_pubsub_max_out_buffer_capacity) + .help("The maximum size in bytes to which the outgoing websocket buffer can grow."), + ) .arg( Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch") .long("halt-on-trusted-validators-accounts-hash-mismatch") @@ -1312,7 +1359,6 @@ pub fn main() { }; let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode"); - let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(), @@ -1349,6 +1395,20 @@ pub fn main() { SocketAddr::new(rpc_bind_address, rpc_port + 3), ) }), + pubsub_config: PubSubConfig { + max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize), + max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize), + max_in_buffer_capacity: value_t_or_exit!( + matches, + "rpc_pubsub_max_in_buffer_capacity", + usize + ), + max_out_buffer_capacity: value_t_or_exit!( + matches, + "rpc_pubsub_max_out_buffer_capacity", + usize + ), + }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), trusted_validators,