Add encoding and filters parameters to rpc Subscriptions (#11065)

* Plumb account configs and enable encoding

* Enable filters for pubsub program accounts

* Update docs
This commit is contained in:
Tyera Eulberg
2020-07-23 13:38:28 -06:00
committed by GitHub
parent ca6480a8ac
commit c90de8978d
3 changed files with 281 additions and 54 deletions

View File

@@ -8,8 +8,10 @@ use jsonrpc_pubsub::{
};
use serde::Serialize;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::rpc_response::{
Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult,
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::RpcFilterType,
rpc_response::{Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult},
};
use solana_runtime::{
bank::Bank,
@@ -79,29 +81,44 @@ impl std::fmt::Debug for NotificationEntry {
}
}
struct SubscriptionData<S> {
struct SubscriptionData<S, T> {
sink: Sink<S>,
commitment: CommitmentConfig,
last_notified_slot: RwLock<Slot>,
config: Option<T>,
}
type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, SubscriptionData<Response<UiAccount>>>>>;
type RpcProgramSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, SubscriptionData<Response<RpcKeyedAccount>>>>>;
#[derive(Default, Clone)]
struct ProgramConfig {
filters: Vec<RpcFilterType>,
encoding: Option<UiAccountEncoding>,
}
type RpcAccountSubscriptions = RwLock<
HashMap<
Pubkey,
HashMap<SubscriptionId, SubscriptionData<Response<UiAccount>, UiAccountEncoding>>,
>,
>;
type RpcProgramSubscriptions = RwLock<
HashMap<
Pubkey,
HashMap<SubscriptionId, SubscriptionData<Response<RpcKeyedAccount>, ProgramConfig>>,
>,
>;
type RpcSignatureSubscriptions = RwLock<
HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>>>>,
HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>, ()>>>,
>;
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
type RpcVoteSubscriptions = RwLock<HashMap<SubscriptionId, Sink<RpcVote>>>;
type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
fn add_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S>>>,
fn add_subscription<K, S, T>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S, T>>>,
hashmap_key: K,
commitment: Option<CommitmentConfig>,
sub_id: SubscriptionId,
subscriber: Subscriber<S>,
last_notified_slot: Slot,
config: Option<T>,
) where
K: Eq + Hash,
S: Clone,
@@ -112,6 +129,7 @@ fn add_subscription<K, S>(
sink,
commitment,
last_notified_slot: RwLock::new(last_notified_slot),
config,
};
if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) {
current_hashmap.insert(sub_id, subscription_data);
@@ -122,8 +140,8 @@ fn add_subscription<K, S>(
subscriptions.insert(hashmap_key, hashmap);
}
fn remove_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S>>>,
fn remove_subscription<K, S, T>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S, T>>>,
sub_id: &SubscriptionId,
) -> bool
where
@@ -145,8 +163,8 @@ where
}
#[allow(clippy::type_complexity)]
fn check_commitment_and_notify<K, S, B, F, X>(
subscriptions: &HashMap<K, HashMap<SubscriptionId, SubscriptionData<Response<S>>>>,
fn check_commitment_and_notify<K, S, B, F, X, T>(
subscriptions: &HashMap<K, HashMap<SubscriptionId, SubscriptionData<Response<S>, T>>>,
hashmap_key: &K,
bank_forks: &Arc<RwLock<BankForks>>,
commitment_slots: &CommitmentSlots,
@@ -158,8 +176,9 @@ where
K: Eq + Hash + Clone + Copy,
S: Clone + Serialize,
B: Fn(&Bank, &K) -> X,
F: Fn(X, Slot) -> (Box<dyn Iterator<Item = S>>, Slot),
F: Fn(X, Slot, Option<T>) -> (Box<dyn Iterator<Item = S>>, Slot),
X: Clone + Serialize + Default,
T: Clone,
{
let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
if let Some(hashmap) = subscriptions.get(hashmap_key) {
@@ -169,6 +188,7 @@ where
sink,
commitment,
last_notified_slot,
config,
},
) in hashmap.iter()
{
@@ -188,7 +208,8 @@ where
.unwrap_or_default()
};
let mut w_last_notified_slot = last_notified_slot.write().unwrap();
let (filter_results, result_slot) = filter_results(results, *w_last_notified_slot);
let (filter_results, result_slot) =
filter_results(results, *w_last_notified_slot, config.as_ref().cloned());
for result in filter_results {
notifier.notify(
Response {
@@ -220,16 +241,15 @@ impl RpcNotifier {
fn filter_account_result(
result: Option<(Account, Slot)>,
last_notified_slot: Slot,
encoding: Option<UiAccountEncoding>,
) -> (Box<dyn Iterator<Item = UiAccount>>, Slot) {
if let Some((account, fork)) = result {
// If fork < last_notified_slot this means that we last notified for a fork
// and should notify that the account state has been reverted.
if fork != last_notified_slot {
let encoding = encoding.unwrap_or(UiAccountEncoding::Binary);
return (
Box::new(iter::once(UiAccount::encode(
account,
UiAccountEncoding::Binary,
))),
Box::new(iter::once(UiAccount::encode(account, encoding))),
fork,
);
}
@@ -240,6 +260,7 @@ fn filter_account_result(
fn filter_signature_result(
result: Option<transaction::Result<()>>,
last_notified_slot: Slot,
_config: Option<()>,
) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) {
(
Box::new(
@@ -254,14 +275,24 @@ fn filter_signature_result(
fn filter_program_results(
accounts: Vec<(Pubkey, Account)>,
last_notified_slot: Slot,
config: Option<ProgramConfig>,
) -> (Box<dyn Iterator<Item = RpcKeyedAccount>>, Slot) {
let config = config.unwrap_or_default();
let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary);
let filters = config.filters;
(
Box::new(
accounts
.into_iter()
.map(|(pubkey, account)| RpcKeyedAccount {
.filter(move |(_, account)| {
filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data.len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data),
})
})
.map(move |(pubkey, account)| RpcKeyedAccount {
pubkey: pubkey.to_string(),
account: UiAccount::encode(account, UiAccountEncoding::Binary),
account: UiAccount::encode(account, encoding.clone()),
}),
),
last_notified_slot,
@@ -448,11 +479,13 @@ impl RpcSubscriptions {
pub fn add_account_subscription(
&self,
pubkey: Pubkey,
commitment: Option<CommitmentConfig>,
config: Option<RpcAccountInfoConfig>,
sub_id: SubscriptionId,
subscriber: Subscriber<Response<UiAccount>>,
) {
let commitment_level = commitment
let config = config.unwrap_or_default();
let commitment_level = config
.commitment
.unwrap_or_else(CommitmentConfig::single)
.commitment;
let slot = match commitment_level {
@@ -498,10 +531,11 @@ impl RpcSubscriptions {
add_subscription(
&mut subscriptions,
pubkey,
commitment,
config.commitment,
sub_id,
subscriber,
last_notified_slot,
config.encoding,
);
}
@@ -522,11 +556,14 @@ impl RpcSubscriptions {
pub fn add_program_subscription(
&self,
program_id: Pubkey,
commitment: Option<CommitmentConfig>,
config: Option<RpcProgramAccountsConfig>,
sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcKeyedAccount>>,
) {
let commitment_level = commitment
let config = config.unwrap_or_default();
let commitment_level = config
.account_config
.commitment
.unwrap_or_else(CommitmentConfig::recent)
.commitment;
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
@@ -540,10 +577,14 @@ impl RpcSubscriptions {
add_subscription(
&mut subscriptions,
program_id,
commitment,
config.account_config.commitment,
sub_id,
subscriber,
0, // last_notified_slot is not utilized for program subscriptions
Some(ProgramConfig {
filters: config.filters.unwrap_or_default(),
encoding: config.account_config.encoding,
}),
);
}
@@ -586,6 +627,7 @@ impl RpcSubscriptions {
sub_id,
subscriber,
0, // last_notified_slot is not utilized for signature subscriptions
None,
);
}
@@ -964,7 +1006,10 @@ pub(crate) mod tests {
);
subscriptions.add_account_subscription(
alice.pubkey(),
Some(CommitmentConfig::recent()),
Some(RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::recent()),
encoding: None,
}),
sub_id.clone(),
subscriber,
);
@@ -1368,7 +1413,7 @@ pub(crate) mod tests {
#[test]
#[serial]
fn test_add_and_remove_subscription() {
let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, SubscriptionData<()>>> =
let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, SubscriptionData<(), ()>>> =
HashMap::new();
let num_keys = 5;
@@ -1376,7 +1421,7 @@ pub(crate) mod tests {
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("notification");
let sub_id = SubscriptionId::Number(key);
add_subscription(&mut subscriptions, key, None, sub_id, subscriber, 0);
add_subscription(&mut subscriptions, key, None, sub_id, subscriber, 0, None);
}
// Add another subscription to the "0" key
@@ -1389,6 +1434,7 @@ pub(crate) mod tests {
extra_sub_id.clone(),
subscriber,
0,
None,
);
assert_eq!(subscriptions.len(), num_keys as usize);
@@ -1444,7 +1490,10 @@ pub(crate) mod tests {
let sub_id0 = SubscriptionId::Number(0 as u64);
subscriptions.add_account_subscription(
alice.pubkey(),
Some(CommitmentConfig::single_gossip()),
Some(RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::single_gossip()),
encoding: None,
}),
sub_id0.clone(),
subscriber0,
);
@@ -1509,7 +1558,10 @@ pub(crate) mod tests {
let sub_id1 = SubscriptionId::Number(1 as u64);
subscriptions.add_account_subscription(
alice.pubkey(),
Some(CommitmentConfig::single_gossip()),
Some(RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::single_gossip()),
encoding: None,
}),
sub_id1.clone(),
subscriber1,
);