Clean up default commitment handling for subscriptions
This commit is contained in:
@ -1,4 +1,7 @@
|
|||||||
use crate::rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo};
|
use crate::{
|
||||||
|
rpc_config::RpcSignatureSubscribeConfig,
|
||||||
|
rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo},
|
||||||
|
};
|
||||||
use log::*;
|
use log::*;
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde_json::{
|
use serde_json::{
|
||||||
@ -205,6 +208,7 @@ impl PubsubClient {
|
|||||||
pub fn signature_subscribe(
|
pub fn signature_subscribe(
|
||||||
url: &str,
|
url: &str,
|
||||||
signature: &Signature,
|
signature: &Signature,
|
||||||
|
config: Option<RpcSignatureSubscribeConfig>,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
(
|
(
|
||||||
PubsubSignatureResponse,
|
PubsubSignatureResponse,
|
||||||
@ -226,7 +230,7 @@ impl PubsubClient {
|
|||||||
"method":format!("{}Subscribe", SIGNATURE_OPERATION),
|
"method":format!("{}Subscribe", SIGNATURE_OPERATION),
|
||||||
"params":[
|
"params":[
|
||||||
signature.to_string(),
|
signature.to_string(),
|
||||||
{"enableReceivedNotification": true }
|
config
|
||||||
]
|
]
|
||||||
})
|
})
|
||||||
.to_string();
|
.to_string();
|
||||||
|
@ -442,7 +442,15 @@ mod tests {
|
|||||||
|
|
||||||
let session = create_session();
|
let session = create_session();
|
||||||
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification");
|
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification");
|
||||||
rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None);
|
rpc.signature_subscribe(
|
||||||
|
session,
|
||||||
|
subscriber,
|
||||||
|
tx.signatures[0].to_string(),
|
||||||
|
Some(RpcSignatureSubscribeConfig {
|
||||||
|
commitment: Some(CommitmentConfig::single()),
|
||||||
|
..RpcSignatureSubscribeConfig::default()
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 0).unwrap();
|
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 0).unwrap();
|
||||||
|
|
||||||
@ -472,7 +480,7 @@ mod tests {
|
|||||||
subscriber,
|
subscriber,
|
||||||
tx.signatures[0].to_string(),
|
tx.signatures[0].to_string(),
|
||||||
Some(RpcSignatureSubscribeConfig {
|
Some(RpcSignatureSubscribeConfig {
|
||||||
commitment: None,
|
commitment: Some(CommitmentConfig::single()),
|
||||||
enable_received_notification: Some(true),
|
enable_received_notification: Some(true),
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
@ -122,7 +122,7 @@ type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
|
|||||||
fn add_subscription<K, S, T>(
|
fn add_subscription<K, S, T>(
|
||||||
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S, T>>>,
|
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S, T>>>,
|
||||||
hashmap_key: K,
|
hashmap_key: K,
|
||||||
commitment: Option<CommitmentConfig>,
|
commitment: CommitmentConfig,
|
||||||
sub_id: SubscriptionId,
|
sub_id: SubscriptionId,
|
||||||
subscriber: Subscriber<S>,
|
subscriber: Subscriber<S>,
|
||||||
last_notified_slot: Slot,
|
last_notified_slot: Slot,
|
||||||
@ -132,7 +132,6 @@ fn add_subscription<K, S, T>(
|
|||||||
S: Clone,
|
S: Clone,
|
||||||
{
|
{
|
||||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||||
let commitment = commitment.unwrap_or_else(CommitmentConfig::single);
|
|
||||||
let subscription_data = SubscriptionData {
|
let subscription_data = SubscriptionData {
|
||||||
sink,
|
sink,
|
||||||
commitment,
|
commitment,
|
||||||
@ -528,11 +527,11 @@ impl RpcSubscriptions {
|
|||||||
subscriber: Subscriber<Response<UiAccount>>,
|
subscriber: Subscriber<Response<UiAccount>>,
|
||||||
) {
|
) {
|
||||||
let config = config.unwrap_or_default();
|
let config = config.unwrap_or_default();
|
||||||
let commitment_level = config
|
let commitment = config
|
||||||
.commitment
|
.commitment
|
||||||
.unwrap_or_else(CommitmentConfig::single)
|
.unwrap_or_else(CommitmentConfig::single_gossip);
|
||||||
.commitment;
|
|
||||||
let slot = match commitment_level {
|
let slot = match commitment.commitment {
|
||||||
CommitmentLevel::Max => self
|
CommitmentLevel::Max => self
|
||||||
.block_commitment_cache
|
.block_commitment_cache
|
||||||
.read()
|
.read()
|
||||||
@ -564,7 +563,7 @@ impl RpcSubscriptions {
|
|||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
|
let mut subscriptions = if commitment.commitment == CommitmentLevel::SingleGossip {
|
||||||
self.subscriptions
|
self.subscriptions
|
||||||
.gossip_account_subscriptions
|
.gossip_account_subscriptions
|
||||||
.write()
|
.write()
|
||||||
@ -572,10 +571,11 @@ impl RpcSubscriptions {
|
|||||||
} else {
|
} else {
|
||||||
self.subscriptions.account_subscriptions.write().unwrap()
|
self.subscriptions.account_subscriptions.write().unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
add_subscription(
|
add_subscription(
|
||||||
&mut subscriptions,
|
&mut subscriptions,
|
||||||
pubkey,
|
pubkey,
|
||||||
config.commitment,
|
commitment,
|
||||||
sub_id,
|
sub_id,
|
||||||
subscriber,
|
subscriber,
|
||||||
last_notified_slot,
|
last_notified_slot,
|
||||||
@ -605,12 +605,12 @@ impl RpcSubscriptions {
|
|||||||
subscriber: Subscriber<Response<RpcKeyedAccount>>,
|
subscriber: Subscriber<Response<RpcKeyedAccount>>,
|
||||||
) {
|
) {
|
||||||
let config = config.unwrap_or_default();
|
let config = config.unwrap_or_default();
|
||||||
let commitment_level = config
|
let commitment = config
|
||||||
.account_config
|
.account_config
|
||||||
.commitment
|
.commitment
|
||||||
.unwrap_or_else(CommitmentConfig::recent)
|
.unwrap_or_else(CommitmentConfig::single_gossip);
|
||||||
.commitment;
|
|
||||||
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
|
let mut subscriptions = if commitment.commitment == CommitmentLevel::SingleGossip {
|
||||||
self.subscriptions
|
self.subscriptions
|
||||||
.gossip_program_subscriptions
|
.gossip_program_subscriptions
|
||||||
.write()
|
.write()
|
||||||
@ -618,10 +618,11 @@ impl RpcSubscriptions {
|
|||||||
} else {
|
} else {
|
||||||
self.subscriptions.program_subscriptions.write().unwrap()
|
self.subscriptions.program_subscriptions.write().unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
add_subscription(
|
add_subscription(
|
||||||
&mut subscriptions,
|
&mut subscriptions,
|
||||||
program_id,
|
program_id,
|
||||||
config.account_config.commitment,
|
commitment,
|
||||||
sub_id,
|
sub_id,
|
||||||
subscriber,
|
subscriber,
|
||||||
0, // last_notified_slot is not utilized for program subscriptions
|
0, // last_notified_slot is not utilized for program subscriptions
|
||||||
@ -657,11 +658,9 @@ impl RpcSubscriptions {
|
|||||||
.map(|config| (config.commitment, config.enable_received_notification))
|
.map(|config| (config.commitment, config.enable_received_notification))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let commitment_level = commitment
|
let commitment = commitment.unwrap_or_else(CommitmentConfig::single_gossip);
|
||||||
.unwrap_or_else(CommitmentConfig::recent)
|
|
||||||
.commitment;
|
|
||||||
|
|
||||||
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
|
let mut subscriptions = if commitment.commitment == CommitmentLevel::SingleGossip {
|
||||||
self.subscriptions
|
self.subscriptions
|
||||||
.gossip_signature_subscriptions
|
.gossip_signature_subscriptions
|
||||||
.write()
|
.write()
|
||||||
@ -669,6 +668,7 @@ impl RpcSubscriptions {
|
|||||||
} else {
|
} else {
|
||||||
self.subscriptions.signature_subscriptions.write().unwrap()
|
self.subscriptions.signature_subscriptions.write().unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
add_subscription(
|
add_subscription(
|
||||||
&mut subscriptions,
|
&mut subscriptions,
|
||||||
signature,
|
signature,
|
||||||
@ -1286,7 +1286,13 @@ pub(crate) mod tests {
|
|||||||
);
|
);
|
||||||
subscriptions.add_program_subscription(
|
subscriptions.add_program_subscription(
|
||||||
solana_stake_program::id(),
|
solana_stake_program::id(),
|
||||||
None,
|
Some(RpcProgramAccountsConfig {
|
||||||
|
account_config: RpcAccountInfoConfig {
|
||||||
|
commitment: Some(CommitmentConfig::recent()),
|
||||||
|
..RpcAccountInfoConfig::default()
|
||||||
|
},
|
||||||
|
..RpcProgramAccountsConfig::default()
|
||||||
|
}),
|
||||||
sub_id.clone(),
|
sub_id.clone(),
|
||||||
subscriber,
|
subscriber,
|
||||||
);
|
);
|
||||||
@ -1645,13 +1651,22 @@ pub(crate) mod tests {
|
|||||||
fn test_add_and_remove_subscription() {
|
fn test_add_and_remove_subscription() {
|
||||||
let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, SubscriptionData<(), ()>>> =
|
let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, SubscriptionData<(), ()>>> =
|
||||||
HashMap::new();
|
HashMap::new();
|
||||||
|
let commitment = CommitmentConfig::single_gossip();
|
||||||
|
|
||||||
let num_keys = 5;
|
let num_keys = 5;
|
||||||
for key in 0..num_keys {
|
for key in 0..num_keys {
|
||||||
let (subscriber, _id_receiver, _transport_receiver) =
|
let (subscriber, _id_receiver, _transport_receiver) =
|
||||||
Subscriber::new_test("notification");
|
Subscriber::new_test("notification");
|
||||||
let sub_id = SubscriptionId::Number(key);
|
let sub_id = SubscriptionId::Number(key);
|
||||||
add_subscription(&mut subscriptions, key, None, sub_id, subscriber, 0, None);
|
add_subscription(
|
||||||
|
&mut subscriptions,
|
||||||
|
key,
|
||||||
|
commitment,
|
||||||
|
sub_id,
|
||||||
|
subscriber,
|
||||||
|
0,
|
||||||
|
None,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add another subscription to the "0" key
|
// Add another subscription to the "0" key
|
||||||
@ -1660,7 +1675,7 @@ pub(crate) mod tests {
|
|||||||
add_subscription(
|
add_subscription(
|
||||||
&mut subscriptions,
|
&mut subscriptions,
|
||||||
0,
|
0,
|
||||||
None,
|
commitment,
|
||||||
extra_sub_id.clone(),
|
extra_sub_id.clone(),
|
||||||
subscriber,
|
subscriber,
|
||||||
0,
|
0,
|
||||||
|
@ -2843,7 +2843,7 @@ After connecting to the RPC PubSub websocket at `ws://<ADDRESS>/`:
|
|||||||
|
|
||||||
- Submit subscription requests to the websocket using the methods below
|
- Submit subscription requests to the websocket using the methods below
|
||||||
- Multiple subscriptions may be active at once
|
- Multiple subscriptions may be active at once
|
||||||
- Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `"single"`.
|
- Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `"singleGossip"`.
|
||||||
|
|
||||||
### accountSubscribe
|
### accountSubscribe
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ use serial_test_derive::serial;
|
|||||||
use solana_client::{
|
use solana_client::{
|
||||||
pubsub_client::PubsubClient,
|
pubsub_client::PubsubClient,
|
||||||
rpc_client::RpcClient,
|
rpc_client::RpcClient,
|
||||||
rpc_config::RpcProgramAccountsConfig,
|
rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
|
||||||
rpc_response::RpcSignatureResult,
|
rpc_response::RpcSignatureResult,
|
||||||
thin_client::{create_client, ThinClient},
|
thin_client::{create_client, ThinClient},
|
||||||
};
|
};
|
||||||
@ -179,6 +179,10 @@ fn test_local_cluster_signature_subscribe() {
|
|||||||
let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe(
|
let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe(
|
||||||
&format!("ws://{}", &non_bootstrap_info.rpc_pubsub.to_string()),
|
&format!("ws://{}", &non_bootstrap_info.rpc_pubsub.to_string()),
|
||||||
&transaction.signatures[0],
|
&transaction.signatures[0],
|
||||||
|
Some(RpcSignatureSubscribeConfig {
|
||||||
|
commitment: Some(CommitmentConfig::recent()),
|
||||||
|
enable_received_notification: Some(true),
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user