Expose validator cli arguments for pubsub buffer tuning
This commit is contained in:
@ -16,12 +16,35 @@ use std::{
|
|||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct PubSubConfig {
|
||||||
|
// See the corresponding fields in
|
||||||
|
// https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131
|
||||||
|
// for a complete description of each field in this struct
|
||||||
|
pub max_connections: usize,
|
||||||
|
pub max_fragment_size: usize,
|
||||||
|
pub max_in_buffer_capacity: usize,
|
||||||
|
pub max_out_buffer_capacity: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for PubSubConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
max_connections: 1000, // Arbitrary, default of 100 is too low
|
||||||
|
max_fragment_size: 50 * 1024, // 50KB
|
||||||
|
max_in_buffer_capacity: 50 * 1024, // 50KB
|
||||||
|
max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct PubSubService {
|
pub struct PubSubService {
|
||||||
thread_hdl: JoinHandle<()>,
|
thread_hdl: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PubSubService {
|
impl PubSubService {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
|
pubsub_config: PubSubConfig,
|
||||||
subscriptions: &Arc<RpcSubscriptions>,
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
pubsub_addr: SocketAddr,
|
pubsub_addr: SocketAddr,
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
@ -29,6 +52,20 @@ impl PubSubService {
|
|||||||
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
||||||
let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
|
let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
|
||||||
let exit_ = exit.clone();
|
let exit_ = exit.clone();
|
||||||
|
|
||||||
|
// TODO: Once https://github.com/paritytech/jsonrpc/pull/594 lands, use
|
||||||
|
// `ServerBuilder::max_in_buffer_capacity()` and `Server::max_out_buffer_capacity() methods
|
||||||
|
// instead of only `ServerBuilder::max_payload`
|
||||||
|
let max_payload = *[
|
||||||
|
pubsub_config.max_fragment_size,
|
||||||
|
pubsub_config.max_in_buffer_capacity,
|
||||||
|
pubsub_config.max_out_buffer_capacity,
|
||||||
|
]
|
||||||
|
.iter()
|
||||||
|
.max()
|
||||||
|
.unwrap();
|
||||||
|
info!("rpc_pubsub max_payload: {}", max_payload);
|
||||||
|
|
||||||
let thread_hdl = Builder::new()
|
let thread_hdl = Builder::new()
|
||||||
.name("solana-pubsub".to_string())
|
.name("solana-pubsub".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
@ -36,19 +73,24 @@ impl PubSubService {
|
|||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
|
|
||||||
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
|
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
|
||||||
info!("New pubsub connection");
|
info!("New pubsub connection");
|
||||||
let session = Arc::new(Session::new(context.sender()));
|
let session = Arc::new(Session::new(context.sender()));
|
||||||
session.on_drop(|| {
|
session.on_drop(|| {
|
||||||
info!("Pubsub connection dropped");
|
info!("Pubsub connection dropped");
|
||||||
});
|
});
|
||||||
session
|
session
|
||||||
})
|
})
|
||||||
.max_connections(1000) // Arbitrary, default of 100 is too low
|
.max_connections(pubsub_config.max_connections)
|
||||||
.max_payload(10 * 1024 * 1024 + 1024) // max account size (10MB) + extra (1K)
|
.max_payload(max_payload)
|
||||||
.start(&pubsub_addr);
|
.start(&pubsub_addr);
|
||||||
|
|
||||||
if let Err(e) = server {
|
if let Err(e) = server {
|
||||||
warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port());
|
warn!(
|
||||||
|
"Pubsub service unavailable error: {:?}. \n\
|
||||||
|
Also, check that port {} is not already in use by another application",
|
||||||
|
e,
|
||||||
|
pubsub_addr.port()
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
while !exit_.load(Ordering::Relaxed) {
|
while !exit_.load(Ordering::Relaxed) {
|
||||||
@ -99,7 +141,8 @@ mod tests {
|
|||||||
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
|
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
|
||||||
optimistically_confirmed_bank,
|
optimistically_confirmed_bank,
|
||||||
));
|
));
|
||||||
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
|
let pubsub_service =
|
||||||
|
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);
|
||||||
let thread = pubsub_service.thread_hdl.thread();
|
let thread = pubsub_service.thread_hdl.thread();
|
||||||
assert_eq!(thread.name().unwrap(), "solana-pubsub");
|
assert_eq!(thread.name().unwrap(), "solana-pubsub");
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@ use crate::{
|
|||||||
poh_service::PohService,
|
poh_service::PohService,
|
||||||
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
|
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
|
||||||
rpc::JsonRpcConfig,
|
rpc::JsonRpcConfig,
|
||||||
rpc_pubsub_service::PubSubService,
|
rpc_pubsub_service::{PubSubConfig, PubSubService},
|
||||||
rpc_service::JsonRpcService,
|
rpc_service::JsonRpcService,
|
||||||
rpc_subscriptions::RpcSubscriptions,
|
rpc_subscriptions::RpcSubscriptions,
|
||||||
sample_performance_service::SamplePerformanceService,
|
sample_performance_service::SamplePerformanceService,
|
||||||
@ -83,6 +83,7 @@ pub struct ValidatorConfig {
|
|||||||
pub account_paths: Vec<PathBuf>,
|
pub account_paths: Vec<PathBuf>,
|
||||||
pub rpc_config: JsonRpcConfig,
|
pub rpc_config: JsonRpcConfig,
|
||||||
pub rpc_addrs: Option<(SocketAddr, SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub, Banks)
|
pub rpc_addrs: Option<(SocketAddr, SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub, Banks)
|
||||||
|
pub pubsub_config: PubSubConfig,
|
||||||
pub snapshot_config: Option<SnapshotConfig>,
|
pub snapshot_config: Option<SnapshotConfig>,
|
||||||
pub max_ledger_shreds: Option<u64>,
|
pub max_ledger_shreds: Option<u64>,
|
||||||
pub broadcast_stage_type: BroadcastStageType,
|
pub broadcast_stage_type: BroadcastStageType,
|
||||||
@ -118,6 +119,7 @@ impl Default for ValidatorConfig {
|
|||||||
account_paths: Vec::new(),
|
account_paths: Vec::new(),
|
||||||
rpc_config: JsonRpcConfig::default(),
|
rpc_config: JsonRpcConfig::default(),
|
||||||
rpc_addrs: None,
|
rpc_addrs: None,
|
||||||
|
pubsub_config: PubSubConfig::default(),
|
||||||
snapshot_config: None,
|
snapshot_config: None,
|
||||||
broadcast_stage_type: BroadcastStageType::Standard,
|
broadcast_stage_type: BroadcastStageType::Standard,
|
||||||
enable_partition: None,
|
enable_partition: None,
|
||||||
@ -435,7 +437,12 @@ impl Validator {
|
|||||||
rpc_override_health_check.clone(),
|
rpc_override_health_check.clone(),
|
||||||
optimistically_confirmed_bank.clone(),
|
optimistically_confirmed_bank.clone(),
|
||||||
),
|
),
|
||||||
pubsub_service: PubSubService::new(&subscriptions, rpc_pubsub_addr, &exit),
|
pubsub_service: PubSubService::new(
|
||||||
|
config.pubsub_config.clone(),
|
||||||
|
&subscriptions,
|
||||||
|
rpc_pubsub_addr,
|
||||||
|
&exit,
|
||||||
|
),
|
||||||
rpc_banks_service: RpcBanksService::new(
|
rpc_banks_service: RpcBanksService::new(
|
||||||
rpc_banks_addr,
|
rpc_banks_addr,
|
||||||
tpu_address,
|
tpu_address,
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
|
use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
|
||||||
use solana_core::{
|
use solana_core::{
|
||||||
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
|
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
|
||||||
rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions,
|
rpc_pubsub_service::{PubSubConfig, PubSubService},
|
||||||
|
rpc_subscriptions::RpcSubscriptions,
|
||||||
test_validator::TestValidator,
|
test_validator::TestValidator,
|
||||||
};
|
};
|
||||||
use solana_runtime::{
|
use solana_runtime::{
|
||||||
@ -100,7 +101,8 @@ fn test_slot_subscription() {
|
|||||||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||||
optimistically_confirmed_bank,
|
optimistically_confirmed_bank,
|
||||||
));
|
));
|
||||||
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
|
let pubsub_service =
|
||||||
|
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);
|
||||||
std::thread::sleep(Duration::from_millis(400));
|
std::thread::sleep(Duration::from_millis(400));
|
||||||
|
|
||||||
let (mut client, receiver) =
|
let (mut client, receiver) =
|
||||||
|
@ -20,6 +20,7 @@ use solana_core::{
|
|||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
gossip_service::GossipService,
|
gossip_service::GossipService,
|
||||||
rpc::JsonRpcConfig,
|
rpc::JsonRpcConfig,
|
||||||
|
rpc_pubsub_service::PubSubConfig,
|
||||||
validator::{Validator, ValidatorConfig},
|
validator::{Validator, ValidatorConfig},
|
||||||
};
|
};
|
||||||
use solana_download_utils::{download_genesis_if_missing, download_snapshot};
|
use solana_download_utils::{download_genesis_if_missing, download_snapshot};
|
||||||
@ -786,6 +787,13 @@ pub fn main() {
|
|||||||
let default_dynamic_port_range =
|
let default_dynamic_port_range =
|
||||||
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
|
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
|
||||||
let default_genesis_archive_unpacked_size = &MAX_GENESIS_ARCHIVE_UNPACKED_SIZE.to_string();
|
let default_genesis_archive_unpacked_size = &MAX_GENESIS_ARCHIVE_UNPACKED_SIZE.to_string();
|
||||||
|
let default_rpc_pubsub_max_connections = PubSubConfig::default().max_connections.to_string();
|
||||||
|
let default_rpc_pubsub_max_fragment_size =
|
||||||
|
PubSubConfig::default().max_fragment_size.to_string();
|
||||||
|
let default_rpc_pubsub_max_in_buffer_capacity =
|
||||||
|
PubSubConfig::default().max_in_buffer_capacity.to_string();
|
||||||
|
let default_rpc_pubsub_max_out_buffer_capacity =
|
||||||
|
PubSubConfig::default().max_out_buffer_capacity.to_string();
|
||||||
|
|
||||||
let matches = App::new(crate_name!()).about(crate_description!())
|
let matches = App::new(crate_name!()).about(crate_description!())
|
||||||
.version(solana_version::version!())
|
.version(solana_version::version!())
|
||||||
@ -1186,6 +1194,45 @@ pub fn main() {
|
|||||||
.validator(solana_net_utils::is_host)
|
.validator(solana_net_utils::is_host)
|
||||||
.help("IP address to bind the RPC port [default: use --bind-address]"),
|
.help("IP address to bind the RPC port [default: use --bind-address]"),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_pubsub_max_connections")
|
||||||
|
.long("rpc-pubsub-max-connections")
|
||||||
|
.value_name("NUMBER")
|
||||||
|
.takes_value(true)
|
||||||
|
.validator(is_parsable::<usize>)
|
||||||
|
.default_value(&default_rpc_pubsub_max_connections)
|
||||||
|
.help("The maximum number of connections that RPC PubSub will support. \
|
||||||
|
This is a hard limit and no new connections beyond this limit can \
|
||||||
|
be made until an old connection is dropped."),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_pubsub_max_fragment_size")
|
||||||
|
.long("rpc-pubsub-max-fragment-size")
|
||||||
|
.value_name("BYTES")
|
||||||
|
.takes_value(true)
|
||||||
|
.validator(is_parsable::<usize>)
|
||||||
|
.default_value(&default_rpc_pubsub_max_fragment_size)
|
||||||
|
.help("The maximum length in bytes of acceptable incoming frames. Messages longer \
|
||||||
|
than this will be rejected."),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_pubsub_max_in_buffer_capacity")
|
||||||
|
.long("rpc-pubsub-max-in-buffer-capacity")
|
||||||
|
.value_name("BYTES")
|
||||||
|
.takes_value(true)
|
||||||
|
.validator(is_parsable::<usize>)
|
||||||
|
.default_value(&default_rpc_pubsub_max_in_buffer_capacity)
|
||||||
|
.help("The maximum size in bytes to which the incoming websocket buffer can grow."),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_pubsub_max_out_buffer_capacity")
|
||||||
|
.long("rpc-pubsub-max-out-buffer-capacity")
|
||||||
|
.value_name("BYTES")
|
||||||
|
.takes_value(true)
|
||||||
|
.validator(is_parsable::<usize>)
|
||||||
|
.default_value(&default_rpc_pubsub_max_out_buffer_capacity)
|
||||||
|
.help("The maximum size in bytes to which the outgoing websocket buffer can grow."),
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
|
Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
|
||||||
.long("halt-on-trusted-validators-accounts-hash-mismatch")
|
.long("halt-on-trusted-validators-accounts-hash-mismatch")
|
||||||
@ -1312,7 +1359,6 @@ pub fn main() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
|
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
|
||||||
|
|
||||||
let mut validator_config = ValidatorConfig {
|
let mut validator_config = ValidatorConfig {
|
||||||
require_tower: matches.is_present("require_tower"),
|
require_tower: matches.is_present("require_tower"),
|
||||||
dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
|
dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
|
||||||
@ -1349,6 +1395,20 @@ pub fn main() {
|
|||||||
SocketAddr::new(rpc_bind_address, rpc_port + 3),
|
SocketAddr::new(rpc_bind_address, rpc_port + 3),
|
||||||
)
|
)
|
||||||
}),
|
}),
|
||||||
|
pubsub_config: PubSubConfig {
|
||||||
|
max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize),
|
||||||
|
max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize),
|
||||||
|
max_in_buffer_capacity: value_t_or_exit!(
|
||||||
|
matches,
|
||||||
|
"rpc_pubsub_max_in_buffer_capacity",
|
||||||
|
usize
|
||||||
|
),
|
||||||
|
max_out_buffer_capacity: value_t_or_exit!(
|
||||||
|
matches,
|
||||||
|
"rpc_pubsub_max_out_buffer_capacity",
|
||||||
|
usize
|
||||||
|
),
|
||||||
|
},
|
||||||
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
|
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
|
||||||
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
|
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
|
||||||
trusted_validators,
|
trusted_validators,
|
||||||
|
Reference in New Issue
Block a user