Optimize RPC pubsub for multiple clients with the same subscription (backport #18943) (#19987)

* Optimize RPC pubsub for multiple clients with the same subscription (#18943)

* reimplement rpc pubsub with a broadcast queue

* update tests for new pubsub implementation

* fix: fix review suggestions

* chore(rpc): add additional pubsub metrics

* integrate max subscriptions check into SubscriptionTracker to reduce locking

* separate subscription control from tracker

* limit memory usage of items in pubsub broadcast queue, improve error handling

* add more pubsub metrics

* add final count metrics to pubsub

* add metric for total number of subscriptions

* fix small review suggestions

* remove by_params from SubscriptionTracker and add node_progress_watchers map instead

* add subscription tracker tests

* add metrics for number of pubsub notifications as a counter

* ignore clippy lint in TokenCounter

* fix underflow in token counter

* reduce queue capacity in pubsub tests

* fix(rpc): fix test timeouts

* fix race in account subscription test

* Add RpcSubscriptions::new_for_tests

Co-authored-by: Pavel Strakhov <p.strakhov@iconic.vc>
Co-authored-by: Nikita Podoliako <n.podoliako@zubr.io>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
(cherry picked from commit 65227f44dc)

# Conflicts:
#	Cargo.lock
#	core/Cargo.toml
#	core/src/replay_stage.rs
#	core/src/validator.rs
#	replica-node/src/replica_node.rs
#	rpc/Cargo.toml

* Fix conflicts (and standardize naming to make future subscription backports easier

Co-authored-by: Pavel Strakhov <ri@idzaaus.org>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
mergify[bot]
2021-09-20 06:00:08 +00:00
committed by GitHub
parent 07b71329a7
commit 03da3eaa81
22 changed files with 2936 additions and 2098 deletions

View File

@@ -1029,15 +1029,13 @@ pub fn main() {
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
let default_genesis_archive_unpacked_size = &MAX_GENESIS_ARCHIVE_UNPACKED_SIZE.to_string();
let default_rpc_max_multiple_accounts = &MAX_MULTIPLE_ACCOUNTS.to_string();
let default_rpc_pubsub_max_connections = PubSubConfig::default().max_connections.to_string();
let default_rpc_pubsub_max_fragment_size =
PubSubConfig::default().max_fragment_size.to_string();
let default_rpc_pubsub_max_in_buffer_capacity =
PubSubConfig::default().max_in_buffer_capacity.to_string();
let default_rpc_pubsub_max_out_buffer_capacity =
PubSubConfig::default().max_out_buffer_capacity.to_string();
let default_rpc_pubsub_max_active_subscriptions =
PubSubConfig::default().max_active_subscriptions.to_string();
let default_rpc_pubsub_queue_capacity_items =
PubSubConfig::default().queue_capacity_items.to_string();
let default_rpc_pubsub_queue_capacity_bytes =
PubSubConfig::default().queue_capacity_bytes.to_string();
let default_rpc_send_transaction_retry_ms = ValidatorConfig::default()
.send_transaction_retry_ms
.to_string();
@@ -1614,10 +1612,10 @@ pub fn main() {
.value_name("NUMBER")
.takes_value(true)
.validator(is_parsable::<usize>)
.default_value(&default_rpc_pubsub_max_connections)
.hidden(true)
.help("The maximum number of connections that RPC PubSub will support. \
This is a hard limit and no new connections beyond this limit can \
be made until an old connection is dropped."),
be made until an old connection is dropped. (Obsolete)"),
)
.arg(
Arg::with_name("rpc_pubsub_max_fragment_size")
@@ -1625,9 +1623,9 @@ pub fn main() {
.value_name("BYTES")
.takes_value(true)
.validator(is_parsable::<usize>)
.default_value(&default_rpc_pubsub_max_fragment_size)
.hidden(true)
.help("The maximum length in bytes of acceptable incoming frames. Messages longer \
than this will be rejected."),
than this will be rejected. (Obsolete)"),
)
.arg(
Arg::with_name("rpc_pubsub_max_in_buffer_capacity")
@@ -1635,8 +1633,9 @@ pub fn main() {
.value_name("BYTES")
.takes_value(true)
.validator(is_parsable::<usize>)
.default_value(&default_rpc_pubsub_max_in_buffer_capacity)
.help("The maximum size in bytes to which the incoming websocket buffer can grow."),
.hidden(true)
.help("The maximum size in bytes to which the incoming websocket buffer can grow. \
(Obsolete)"),
)
.arg(
Arg::with_name("rpc_pubsub_max_out_buffer_capacity")
@@ -1644,8 +1643,9 @@ pub fn main() {
.value_name("BYTES")
.takes_value(true)
.validator(is_parsable::<usize>)
.default_value(&default_rpc_pubsub_max_out_buffer_capacity)
.help("The maximum size in bytes to which the outgoing websocket buffer can grow."),
.hidden(true)
.help("The maximum size in bytes to which the outgoing websocket buffer can grow. \
(Obsolete)"),
)
.arg(
Arg::with_name("rpc_pubsub_max_active_subscriptions")
@@ -1657,6 +1657,26 @@ pub fn main() {
.help("The maximum number of active subscriptions that RPC PubSub will accept \
across all connections."),
)
.arg(
Arg::with_name("rpc_pubsub_queue_capacity_items")
.long("rpc-pubsub-queue-capacity-items")
.takes_value(true)
.value_name("NUMBER")
.validator(is_parsable::<usize>)
.default_value(&default_rpc_pubsub_queue_capacity_items)
.help("The maximum number of notifications that RPC PubSub will store \
across all connections."),
)
.arg(
Arg::with_name("rpc_pubsub_queue_capacity_bytes")
.long("rpc-pubsub-queue-capacity-bytes")
.takes_value(true)
.value_name("BYTES")
.validator(is_parsable::<usize>)
.default_value(&default_rpc_pubsub_queue_capacity_bytes)
.help("The maximum total size of notifications that RPC PubSub will store \
across all connections."),
)
.arg(
Arg::with_name("rpc_send_transaction_retry_ms")
.long("rpc-send-retry-ms")
@@ -2282,23 +2302,21 @@ pub fn main() {
}),
pubsub_config: PubSubConfig {
enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"),
max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize),
max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize),
max_in_buffer_capacity: value_t_or_exit!(
matches,
"rpc_pubsub_max_in_buffer_capacity",
usize
),
max_out_buffer_capacity: value_t_or_exit!(
matches,
"rpc_pubsub_max_out_buffer_capacity",
usize
),
max_active_subscriptions: value_t_or_exit!(
matches,
"rpc_pubsub_max_active_subscriptions",
usize
),
queue_capacity_items: value_t_or_exit!(
matches,
"rpc_pubsub_queue_capacity_items",
usize
),
queue_capacity_bytes: value_t_or_exit!(
matches,
"rpc_pubsub_queue_capacity_bytes",
usize
),
},
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),