, T>>>,
- hashmap_key: &K,
+fn check_commitment_and_notify(
+ params: &P,
+ subscription: &SubscriptionInfo,
bank_forks: &Arc>,
commitment_slots: &CommitmentSlots,
bank_method: B,
filter_results: F,
- notifier: &RpcNotifier,
-) -> HashSet
+ notifier: &mut RpcNotifier,
+ is_final: bool,
+) -> bool
where
- K: Eq + Hash + Clone + Copy,
S: Clone + Serialize,
- B: Fn(&Bank, &K) -> X,
- F: Fn(X, &K, Slot, Option, Arc) -> (Box>, Slot),
+ B: Fn(&Bank, &P) -> X,
+ F: Fn(X, &P, Slot, Arc) -> (Box>, Slot),
X: Clone + Default,
- T: Clone,
{
- let mut notified_set: HashSet = HashSet::new();
- if let Some(hashmap) = subscriptions.get(hashmap_key) {
- for (
- sub_id,
- SubscriptionData {
- sink,
- commitment,
- last_notified_slot,
- config,
- },
- ) in hashmap.iter()
- {
- let slot = if commitment.is_finalized() {
- commitment_slots.highest_confirmed_root
- } else if commitment.is_confirmed() {
- commitment_slots.highest_confirmed_slot
- } else {
- commitment_slots.slot
- };
+ let commitment = if let Some(commitment) = subscription.commitment() {
+ commitment
+ } else {
+ error!("missing commitment in check_commitment_and_notify");
+ return false;
+ };
+ let slot = if commitment.is_finalized() {
+ commitment_slots.highest_confirmed_root
+ } else if commitment.is_confirmed() {
+ commitment_slots.highest_confirmed_slot
+ } else {
+ commitment_slots.slot
+ };
- if let Some(bank) = bank_forks.read().unwrap().get(slot).cloned() {
- let results = bank_method(&bank, hashmap_key);
- let mut w_last_notified_slot = last_notified_slot.write().unwrap();
- let (filter_results, result_slot) = filter_results(
- results,
- hashmap_key,
- *w_last_notified_slot,
- config.as_ref().cloned(),
- bank,
- );
- for result in filter_results {
- notifier.notify(
- Response {
- context: RpcResponseContext { slot },
- value: result,
- },
- sink,
- );
- notified_set.insert(sub_id.clone());
- *w_last_notified_slot = result_slot;
- }
- }
+ let mut notified = false;
+ if let Some(bank) = bank_forks.read().unwrap().get(slot).cloned() {
+ let results = bank_method(&bank, params);
+ let mut w_last_notified_slot = subscription.last_notified_slot.write().unwrap();
+ let (filter_results, result_slot) =
+ filter_results(results, params, *w_last_notified_slot, bank);
+ for result in filter_results {
+ notifier.notify(
+ Response {
+ context: RpcResponseContext { slot },
+ value: result,
+ },
+ subscription,
+ is_final,
+ );
+ *w_last_notified_slot = result_slot;
+ notified = true;
}
}
- notified_set
+ notified
}
-struct RpcNotifier;
+#[derive(Debug, Clone)]
+pub struct RpcNotification {
+ pub subscription_id: SubscriptionId,
+ pub is_final: bool,
+ pub json: Weak,
+}
+
+struct RecentItems {
+ queue: VecDeque>,
+ total_bytes: usize,
+ max_len: usize,
+ max_total_bytes: usize,
+}
+
+impl RecentItems {
+ fn new(max_len: usize, max_total_bytes: usize) -> Self {
+ Self {
+ queue: VecDeque::new(),
+ total_bytes: 0,
+ max_len,
+ max_total_bytes,
+ }
+ }
+
+ fn push(&mut self, item: Arc) {
+ self.total_bytes = self
+ .total_bytes
+ .checked_add(item.len())
+ .expect("total bytes overflow");
+ self.queue.push_back(item);
+
+ while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len {
+ let item = self.queue.pop_front().expect("can't be empty");
+ self.total_bytes = self
+ .total_bytes
+ .checked_sub(item.len())
+ .expect("total bytes underflow");
+ }
+ }
+}
+
+struct RpcNotifier {
+ sender: broadcast::Sender,
+ buf: Vec,
+ recent_items: RecentItems,
+}
+
+#[derive(Debug, Serialize)]
+struct NotificationParams {
+ result: T,
+ subscription: SubscriptionId,
+}
+
+#[derive(Debug, Serialize)]
+struct Notification {
+ jsonrpc: Option,
+ method: &'static str,
+ params: NotificationParams,
+}
impl RpcNotifier {
- fn notify(&self, value: T, sink: &Sink)
+ fn notify(&mut self, value: T, subscription: &SubscriptionInfo, is_final: bool)
where
T: serde::Serialize,
{
- let _ = sink.notify(Ok(value));
+ self.buf.clear();
+ let notification = Notification {
+ jsonrpc: Some(jsonrpc_core::Version::V2),
+ method: subscription.method(),
+ params: NotificationParams {
+ result: value,
+ subscription: subscription.id(),
+ },
+ };
+ serde_json::to_writer(Cursor::new(&mut self.buf), ¬ification)
+ .expect("serialization never fails");
+ let buf_str = str::from_utf8(&self.buf).expect("json is always utf-8");
+ let buf_arc = Arc::new(String::from(buf_str));
+
+ let notification = RpcNotification {
+ subscription_id: subscription.id(),
+ json: Arc::downgrade(&buf_arc),
+ is_final,
+ };
+ // There is an unlikely case where this can fail: if the last subscription is closed
+ // just as the notifier generates a notification for it.
+ let _ = self.sender.send(notification);
+
+ inc_new_counter_info!("rpc-pubsub-messages", 1);
+ inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());
+
+ self.recent_items.push(buf_arc);
}
}
fn filter_account_result(
result: Option<(AccountSharedData, Slot)>,
- pubkey: &Pubkey,
+ params: &AccountSubscriptionParams,
last_notified_slot: Slot,
- encoding: Option,
bank: Arc,
) -> (Box>, Slot) {
// If the account is not found, `last_modified_slot` will default to zero and
@@ -289,12 +281,20 @@ fn filter_account_result(
// If last_modified_slot < last_notified_slot this means that we last notified for a fork
// and should notify that the account state has been reverted.
let results: Box> = if last_modified_slot != last_notified_slot {
- let encoding = encoding.unwrap_or(UiAccountEncoding::Binary);
- if account.owner == spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed {
- Box::new(iter::once(get_parsed_token_account(bank, pubkey, account)))
+ if account.owner == spl_token_id_v2_0() && params.encoding == UiAccountEncoding::JsonParsed
+ {
+ Box::new(iter::once(get_parsed_token_account(
+ bank,
+ ¶ms.pubkey,
+ account,
+ )))
} else {
Box::new(iter::once(UiAccount::encode(
- pubkey, &account, encoding, None, None,
+ ¶ms.pubkey,
+ &account,
+ params.encoding,
+ None,
+ None,
)))
}
} else {
@@ -306,9 +306,8 @@ fn filter_account_result(
fn filter_signature_result(
result: Option>,
- _signature: &Signature,
+ _params: &SignatureSubscriptionParams,
last_notified_slot: Slot,
- _config: Option,
_bank: Arc,
) -> (Box>, Slot) {
(
@@ -321,23 +320,22 @@ fn filter_signature_result(
fn filter_program_results(
accounts: Vec<(Pubkey, AccountSharedData)>,
- program_id: &Pubkey,
+ params: &ProgramSubscriptionParams,
last_notified_slot: Slot,
- config: Option,
bank: Arc,
) -> (Box>, Slot) {
- let config = config.unwrap_or_default();
- let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary);
- let filters = config.filters;
let accounts_is_empty = accounts.is_empty();
+ let encoding = params.encoding;
+ let filters = params.filters.clone();
let keyed_accounts = accounts.into_iter().filter(move |(_, account)| {
filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
})
});
- let accounts: Box