* Detect and notify when deserializable shreds are available (#11816)
* Add logic to check for complete data ranges
* Add RPC signature notification
Co-authored-by: Carl <carl@solana.com>
(cherry picked from commit 1c1a3f979d
)
# Conflicts:
# accounts-bench/Cargo.toml
# core/src/rpc_pubsub.rs
* Fix conflicts
Co-authored-by: carllin <wumu727@gmail.com>
Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
@@ -10,9 +10,11 @@ use jsonrpc_pubsub::{
|
||||
use serde::Serialize;
|
||||
use solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding};
|
||||
use solana_client::{
|
||||
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
|
||||
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
|
||||
rpc_filter::RpcFilterType,
|
||||
rpc_response::{Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult},
|
||||
rpc_response::{
|
||||
ProcessedSignatureResult, Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult,
|
||||
},
|
||||
};
|
||||
use solana_runtime::{
|
||||
bank::Bank,
|
||||
@@ -67,6 +69,7 @@ enum NotificationEntry {
|
||||
Frozen(Slot),
|
||||
Bank(CommitmentSlots),
|
||||
Gossip(Slot),
|
||||
SignaturesReceived((Slot, Vec<Signature>)),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NotificationEntry {
|
||||
@@ -79,6 +82,9 @@ impl std::fmt::Debug for NotificationEntry {
|
||||
NotificationEntry::Bank(commitment_slots) => {
|
||||
write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot)
|
||||
}
|
||||
NotificationEntry::SignaturesReceived(slot_signatures) => {
|
||||
write!(f, "SignaturesReceived({:?})", slot_signatures)
|
||||
}
|
||||
NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot),
|
||||
}
|
||||
}
|
||||
@@ -108,7 +114,10 @@ type RpcProgramSubscriptions = RwLock<
|
||||
>,
|
||||
>;
|
||||
type RpcSignatureSubscriptions = RwLock<
|
||||
HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>, ()>>>,
|
||||
HashMap<
|
||||
Signature,
|
||||
HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>, bool>>,
|
||||
>,
|
||||
>;
|
||||
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
|
||||
type RpcVoteSubscriptions = RwLock<HashMap<SubscriptionId, Sink<RpcVote>>>;
|
||||
@@ -134,13 +143,11 @@ fn add_subscription<K, S, T>(
|
||||
last_notified_slot: RwLock::new(last_notified_slot),
|
||||
config,
|
||||
};
|
||||
if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) {
|
||||
current_hashmap.insert(sub_id, subscription_data);
|
||||
return;
|
||||
}
|
||||
let mut hashmap = HashMap::new();
|
||||
hashmap.insert(sub_id, subscription_data);
|
||||
subscriptions.insert(hashmap_key, hashmap);
|
||||
|
||||
subscriptions
|
||||
.entry(hashmap_key)
|
||||
.or_default()
|
||||
.insert(sub_id, subscription_data);
|
||||
}
|
||||
|
||||
fn remove_subscription<K, S, T>(
|
||||
@@ -279,15 +286,15 @@ fn filter_signature_result(
|
||||
result: Option<transaction::Result<()>>,
|
||||
_signature: &Signature,
|
||||
last_notified_slot: Slot,
|
||||
_config: Option<()>,
|
||||
_config: Option<bool>,
|
||||
_bank: Option<Arc<Bank>>,
|
||||
) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) {
|
||||
(
|
||||
Box::new(
|
||||
result
|
||||
.into_iter()
|
||||
.map(|result| RpcSignatureResult { err: result.err() }),
|
||||
),
|
||||
Box::new(result.into_iter().map(|result| {
|
||||
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult {
|
||||
err: result.err(),
|
||||
})
|
||||
})),
|
||||
last_notified_slot,
|
||||
)
|
||||
}
|
||||
@@ -629,13 +636,18 @@ impl RpcSubscriptions {
|
||||
pub fn add_signature_subscription(
|
||||
&self,
|
||||
signature: Signature,
|
||||
commitment: Option<CommitmentConfig>,
|
||||
signature_subscribe_config: Option<RpcSignatureSubscribeConfig>,
|
||||
sub_id: SubscriptionId,
|
||||
subscriber: Subscriber<Response<RpcSignatureResult>>,
|
||||
) {
|
||||
let (commitment, enable_received_notification) = signature_subscribe_config
|
||||
.map(|config| (config.commitment, config.enable_received_notification))
|
||||
.unwrap_or((None, Some(false)));
|
||||
|
||||
let commitment_level = commitment
|
||||
.unwrap_or_else(CommitmentConfig::recent)
|
||||
.commitment;
|
||||
|
||||
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
|
||||
self.subscriptions
|
||||
.gossip_signature_subscriptions
|
||||
@@ -651,7 +663,7 @@ impl RpcSubscriptions {
|
||||
sub_id,
|
||||
subscriber,
|
||||
0, // last_notified_slot is not utilized for signature subscriptions
|
||||
None,
|
||||
enable_received_notification,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -696,6 +708,10 @@ impl RpcSubscriptions {
|
||||
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
|
||||
}
|
||||
|
||||
pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec<Signature>)) {
|
||||
self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures));
|
||||
}
|
||||
|
||||
pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) {
|
||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
|
||||
@@ -840,6 +856,13 @@ impl RpcSubscriptions {
|
||||
);
|
||||
}
|
||||
}
|
||||
NotificationEntry::SignaturesReceived(slot_signatures) => {
|
||||
RpcSubscriptions::process_signatures_received(
|
||||
&slot_signatures,
|
||||
&subscriptions.signature_subscriptions,
|
||||
¬ifier,
|
||||
)
|
||||
}
|
||||
},
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
// not a problem - try reading again
|
||||
@@ -939,6 +962,40 @@ impl RpcSubscriptions {
|
||||
}
|
||||
}
|
||||
|
||||
fn process_signatures_received(
|
||||
(received_slot, signatures): &(Slot, Vec<Signature>),
|
||||
signature_subscriptions: &Arc<RpcSignatureSubscriptions>,
|
||||
notifier: &RpcNotifier,
|
||||
) {
|
||||
for signature in signatures {
|
||||
if let Some(hashmap) = signature_subscriptions.read().unwrap().get(signature) {
|
||||
for (
|
||||
_,
|
||||
SubscriptionData {
|
||||
sink,
|
||||
config: is_received_notification_enabled,
|
||||
..
|
||||
},
|
||||
) in hashmap.iter()
|
||||
{
|
||||
if is_received_notification_enabled
|
||||
.expect("All signature subscriptions must have this config field set")
|
||||
{
|
||||
notifier.notify(
|
||||
Response {
|
||||
context: RpcResponseContext {
|
||||
slot: *received_slot,
|
||||
},
|
||||
value: RpcSignatureResult::ReceivedSignature,
|
||||
},
|
||||
&sink,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn shutdown(&mut self) -> std::thread::Result<()> {
|
||||
if let Some(runtime) = self.notifier_runtime.take() {
|
||||
info!("RPC Notifier runtime - shutting down");
|
||||
@@ -1247,31 +1304,55 @@ pub(crate) mod tests {
|
||||
Subscriber::new_test("signatureNotification");
|
||||
let (processed_sub, _id_receiver, processed_recv) =
|
||||
Subscriber::new_test("signatureNotification");
|
||||
let (processed_sub3, _id_receiver, processed_recv3) =
|
||||
Subscriber::new_test("signatureNotification");
|
||||
|
||||
subscriptions.add_signature_subscription(
|
||||
past_bank_tx.signatures[0],
|
||||
Some(CommitmentConfig::recent()),
|
||||
Some(RpcSignatureSubscribeConfig {
|
||||
commitment: Some(CommitmentConfig::recent()),
|
||||
enable_received_notification: Some(false),
|
||||
}),
|
||||
SubscriptionId::Number(1 as u64),
|
||||
past_bank_sub1,
|
||||
);
|
||||
subscriptions.add_signature_subscription(
|
||||
past_bank_tx.signatures[0],
|
||||
Some(CommitmentConfig::root()),
|
||||
Some(RpcSignatureSubscribeConfig {
|
||||
commitment: Some(CommitmentConfig::root()),
|
||||
enable_received_notification: Some(false),
|
||||
}),
|
||||
SubscriptionId::Number(2 as u64),
|
||||
past_bank_sub2,
|
||||
);
|
||||
subscriptions.add_signature_subscription(
|
||||
processed_tx.signatures[0],
|
||||
Some(CommitmentConfig::recent()),
|
||||
Some(RpcSignatureSubscribeConfig {
|
||||
commitment: Some(CommitmentConfig::recent()),
|
||||
enable_received_notification: Some(false),
|
||||
}),
|
||||
SubscriptionId::Number(3 as u64),
|
||||
processed_sub,
|
||||
);
|
||||
subscriptions.add_signature_subscription(
|
||||
unprocessed_tx.signatures[0],
|
||||
Some(CommitmentConfig::recent()),
|
||||
Some(RpcSignatureSubscribeConfig {
|
||||
commitment: Some(CommitmentConfig::recent()),
|
||||
enable_received_notification: Some(false),
|
||||
}),
|
||||
SubscriptionId::Number(4 as u64),
|
||||
Subscriber::new_test("signatureNotification").0,
|
||||
);
|
||||
// Add a subscription that gets `received` notifications
|
||||
subscriptions.add_signature_subscription(
|
||||
unprocessed_tx.signatures[0],
|
||||
Some(RpcSignatureSubscribeConfig {
|
||||
commitment: Some(CommitmentConfig::recent()),
|
||||
enable_received_notification: Some(true),
|
||||
}),
|
||||
SubscriptionId::Number(5 as u64),
|
||||
processed_sub3,
|
||||
);
|
||||
|
||||
{
|
||||
let sig_subs = subscriptions
|
||||
@@ -1284,46 +1365,62 @@ pub(crate) mod tests {
|
||||
assert!(sig_subs.contains_key(&processed_tx.signatures[0]));
|
||||
}
|
||||
let mut commitment_slots = CommitmentSlots::default();
|
||||
commitment_slots.slot = 1;
|
||||
let received_slot = 1;
|
||||
commitment_slots.slot = received_slot;
|
||||
subscriptions
|
||||
.notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
|
||||
subscriptions.notify_subscribers(commitment_slots);
|
||||
let expected_res = RpcSignatureResult { err: None };
|
||||
|
||||
let expected_res =
|
||||
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None });
|
||||
let received_expected_res = RpcSignatureResult::ReceivedSignature;
|
||||
struct Notification {
|
||||
slot: Slot,
|
||||
id: u64,
|
||||
}
|
||||
|
||||
let expected_notification = |exp: Notification| -> String {
|
||||
let json = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "signatureNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"context": { "slot": exp.slot },
|
||||
"value": &expected_res,
|
||||
},
|
||||
"subscription": exp.id,
|
||||
}
|
||||
});
|
||||
serde_json::to_string(&json).unwrap()
|
||||
};
|
||||
let expected_notification =
|
||||
|exp: Notification, expected_res: &RpcSignatureResult| -> String {
|
||||
let json = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "signatureNotification",
|
||||
"params": {
|
||||
"result": {
|
||||
"context": { "slot": exp.slot },
|
||||
"value": expected_res,
|
||||
},
|
||||
"subscription": exp.id,
|
||||
}
|
||||
});
|
||||
serde_json::to_string(&json).unwrap()
|
||||
};
|
||||
|
||||
// Expect to receive a notification from bank 1 because this subscription is
|
||||
// looking for 0 confirmations and so checks the current bank
|
||||
let expected = expected_notification(Notification { slot: 1, id: 1 });
|
||||
let expected = expected_notification(Notification { slot: 1, id: 1 }, &expected_res);
|
||||
let (response, _) = robust_poll_or_panic(past_bank_recv1);
|
||||
assert_eq!(expected, response);
|
||||
|
||||
// Expect to receive a notification from bank 0 because this subscription is
|
||||
// looking for 1 confirmation and so checks the past bank
|
||||
let expected = expected_notification(Notification { slot: 0, id: 2 });
|
||||
let expected = expected_notification(Notification { slot: 0, id: 2 }, &expected_res);
|
||||
let (response, _) = robust_poll_or_panic(past_bank_recv2);
|
||||
assert_eq!(expected, response);
|
||||
|
||||
let expected = expected_notification(Notification { slot: 1, id: 3 });
|
||||
let expected = expected_notification(Notification { slot: 1, id: 3 }, &expected_res);
|
||||
let (response, _) = robust_poll_or_panic(processed_recv);
|
||||
assert_eq!(expected, response);
|
||||
|
||||
// Expect a "received" notification
|
||||
let expected = expected_notification(
|
||||
Notification {
|
||||
slot: received_slot,
|
||||
id: 5,
|
||||
},
|
||||
&received_expected_res,
|
||||
);
|
||||
let (response, _) = robust_poll_or_panic(processed_recv3);
|
||||
assert_eq!(expected, response);
|
||||
|
||||
// Subscription should be automatically removed after notification
|
||||
let sig_subs = subscriptions
|
||||
.subscriptions
|
||||
@@ -1336,7 +1433,7 @@ pub(crate) mod tests {
|
||||
// Unprocessed signature subscription should not be removed
|
||||
assert_eq!(
|
||||
sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(),
|
||||
1
|
||||
2
|
||||
);
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user