Use runtime executor to send pubsub notifications (#8353)

automerge
This commit is contained in:
Justin Starry
2020-02-26 12:23:54 +08:00
committed by GitHub
parent 407d058611
commit 8839dbfe5b
13 changed files with 598 additions and 148 deletions

View File

@@ -25,6 +25,7 @@ fs_extra = "1.1.0"
indexmap = "1.3"
itertools = "0.8.2"
jsonrpc-core = "14.0.5"
jsonrpc-core-client = { version = "14.0.5", features = ["ws"] }
jsonrpc-derive = "14.0.5"
jsonrpc-http-server = "14.0.6"
jsonrpc-pubsub = "14.0.6"

View File

@@ -12,7 +12,7 @@ use std::sync::{atomic, Arc};
// https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204
// Once https://github.com/paritytech/jsonrpc/issues/418 is resolved, try to remove this clippy allow
#[allow(clippy::needless_return)]
#[rpc(server)]
#[rpc]
pub trait RpcSolPubSub {
type Metadata;

View File

@@ -17,14 +17,16 @@ use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
iter,
sync::{Arc, Mutex, RwLock},
};
use tokio::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor};
const RECEIVE_DELAY_MILLIS: u64 = 100;
pub type Confirmations = usize;
#[derive(Serialize, Clone, Copy, Debug)]
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct SlotInfo {
pub slot: Slot,
pub parent: Slot,
@@ -103,19 +105,20 @@ where
found
}
fn check_confirmations_and_notify<K, S, F, N, X>(
fn check_confirmations_and_notify<K, S, B, F, X>(
subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
hashmap_key: &K,
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>,
bank_method: F,
notify: N,
bank_method: B,
filter_results: F,
notifier: &RpcNotifier,
) -> HashSet<SubscriptionId>
where
K: Eq + Hash + Clone + Copy,
S: Clone + Serialize,
F: Fn(&Bank, &K) -> X,
N: Fn(X, &Sink<S>, u64) -> bool,
B: Fn(&Bank, &K) -> X,
F: Fn(X, u64) -> Box<dyn Iterator<Item = S>>,
X: Clone + Serialize,
{
let current_ancestors = bank_forks
@@ -149,8 +152,9 @@ where
.get(desired_slot[0])
.unwrap()
.clone();
let result = bank_method(&desired_bank, hashmap_key);
if notify(result, &sink, root) {
let results = bank_method(&desired_bank, hashmap_key);
for result in filter_results(results, root) {
notifier.notify(result, sink);
notified_set.insert(bank_sub_id.clone());
}
}
@@ -159,41 +163,49 @@ where
notified_set
}
fn notify_account(result: Option<(Account, Slot)>, sink: &Sink<RpcAccount>, root: Slot) -> bool {
struct RpcNotifier(TaskExecutor);
impl RpcNotifier {
fn notify<T>(&self, value: T, sink: &Sink<T>)
where
T: serde::Serialize,
{
self.0
.spawn(sink.notify(Ok(value)).map(|_| ()).map_err(|_| ()));
}
}
fn filter_account_result(
result: Option<(Account, Slot)>,
root: Slot,
) -> Box<dyn Iterator<Item = RpcAccount>> {
if let Some((account, fork)) = result {
if fork >= root {
sink.notify(Ok(RpcAccount::encode(account))).wait().unwrap();
return true;
return Box::new(iter::once(RpcAccount::encode(account)));
}
}
false
Box::new(iter::empty())
}
fn notify_signature<S>(result: Option<S>, sink: &Sink<S>, _root: Slot) -> bool
fn filter_signature_result<S>(result: Option<S>, _root: Slot) -> Box<dyn Iterator<Item = S>>
where
S: Clone + Serialize,
S: 'static + Clone + Serialize,
{
if let Some(result) = result {
sink.notify(Ok(result)).wait().unwrap();
return true;
}
false
Box::new(result.into_iter())
}
fn notify_program(
fn filter_program_results(
accounts: Vec<(Pubkey, Account)>,
sink: &Sink<RpcKeyedAccount>,
_root: Slot,
) -> bool {
for (pubkey, account) in accounts.iter() {
sink.notify(Ok(RpcKeyedAccount {
pubkey: pubkey.to_string(),
account: RpcAccount::encode(account.clone()),
}))
.wait()
.unwrap();
}
!accounts.is_empty()
) -> Box<dyn Iterator<Item = RpcKeyedAccount>> {
Box::new(
accounts
.into_iter()
.map(|(pubkey, account)| RpcKeyedAccount {
pubkey: pubkey.to_string(),
account: RpcAccount::encode(account),
}),
)
}
pub struct RpcSubscriptions {
@@ -203,6 +215,7 @@ pub struct RpcSubscriptions {
slot_subscriptions: Arc<RpcSlotSubscriptions>,
notification_sender: Arc<Mutex<Sender<NotificationEntry>>>,
t_cleanup: Option<JoinHandle<()>>,
notifier_runtime: Option<Runtime>,
exit: Arc<AtomicBool>,
}
@@ -239,11 +252,19 @@ impl RpcSubscriptions {
let signature_subscriptions_clone = signature_subscriptions.clone();
let slot_subscriptions_clone = slot_subscriptions.clone();
let notifier_runtime = RuntimeBuilder::new()
.core_threads(1)
.name_prefix("solana-rpc-notifier-")
.build()
.unwrap();
let notifier = RpcNotifier(notifier_runtime.executor());
let t_cleanup = Builder::new()
.name("solana-rpc-notifications".to_string())
.spawn(move || {
Self::process_notifications(
exit_clone,
notifier,
notification_receiver,
account_subscriptions_clone,
program_subscriptions_clone,
@@ -259,6 +280,7 @@ impl RpcSubscriptions {
signature_subscriptions,
slot_subscriptions,
notification_sender,
notifier_runtime: Some(notifier_runtime),
t_cleanup: Some(t_cleanup),
exit: exit.clone(),
}
@@ -269,6 +291,7 @@ impl RpcSubscriptions {
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>,
account_subscriptions: Arc<RpcAccountSubscriptions>,
notifier: &RpcNotifier,
) {
let subscriptions = account_subscriptions.read().unwrap();
check_confirmations_and_notify(
@@ -277,7 +300,8 @@ impl RpcSubscriptions {
current_slot,
bank_forks,
Bank::get_account_modified_since_parent,
notify_account,
filter_account_result,
notifier,
);
}
@@ -286,6 +310,7 @@ impl RpcSubscriptions {
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>,
program_subscriptions: Arc<RpcProgramSubscriptions>,
notifier: &RpcNotifier,
) {
let subscriptions = program_subscriptions.read().unwrap();
check_confirmations_and_notify(
@@ -294,7 +319,8 @@ impl RpcSubscriptions {
current_slot,
bank_forks,
Bank::get_program_accounts_modified_since_parent,
notify_program,
filter_program_results,
notifier,
);
}
@@ -303,6 +329,7 @@ impl RpcSubscriptions {
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
notifier: &RpcNotifier,
) {
let mut subscriptions = signature_subscriptions.write().unwrap();
let notified_ids = check_confirmations_and_notify(
@@ -311,7 +338,8 @@ impl RpcSubscriptions {
current_slot,
bank_forks,
Bank::get_signature_status,
notify_signature,
filter_signature_result,
notifier,
);
if let Some(subscription_ids) = subscriptions.get_mut(signature) {
subscription_ids.retain(|k, _| !notified_ids.contains(k));
@@ -408,6 +436,7 @@ impl RpcSubscriptions {
fn process_notifications(
exit: Arc<AtomicBool>,
notifier: RpcNotifier,
notification_receiver: Receiver<NotificationEntry>,
account_subscriptions: Arc<RpcAccountSubscriptions>,
program_subscriptions: Arc<RpcProgramSubscriptions>,
@@ -423,7 +452,7 @@ impl RpcSubscriptions {
NotificationEntry::Slot(slot_info) => {
let subscriptions = slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
sink.notify(Ok(slot_info)).wait().unwrap();
notifier.notify(slot_info, sink);
}
}
NotificationEntry::Bank((current_slot, bank_forks)) => {
@@ -437,6 +466,7 @@ impl RpcSubscriptions {
current_slot,
&bank_forks,
account_subscriptions.clone(),
&notifier,
);
}
@@ -450,6 +480,7 @@ impl RpcSubscriptions {
current_slot,
&bank_forks,
program_subscriptions.clone(),
&notifier,
);
}
@@ -463,6 +494,7 @@ impl RpcSubscriptions {
current_slot,
&bank_forks,
signature_subscriptions.clone(),
&notifier,
);
}
}
@@ -479,6 +511,12 @@ impl RpcSubscriptions {
}
fn shutdown(&mut self) -> std::thread::Result<()> {
if let Some(runtime) = self.notifier_runtime.take() {
info!("RPC Notifier runtime - shutting down");
let _ = runtime.shutdown_now().wait();
info!("RPC Notifier runtime - shut down");
}
if self.t_cleanup.is_some() {
info!("RPC Notification thread - shutting down");
self.exit.store(true, Ordering::Relaxed);

View File

@@ -642,74 +642,94 @@ fn wait_for_supermajority(
}
}
pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) {
let (node, contact_info, mint_keypair, ledger_path, _vote_pubkey) =
new_validator_for_tests_with_vote_pubkey();
(node, contact_info, mint_keypair, ledger_path)
pub struct TestValidator {
pub server: Validator,
pub leader_data: ContactInfo,
pub alice: Keypair,
pub ledger_path: PathBuf,
pub genesis_hash: Hash,
pub vote_pubkey: Pubkey,
}
pub fn new_validator_for_tests_with_vote_pubkey(
) -> (Validator, ContactInfo, Keypair, PathBuf, Pubkey) {
use crate::genesis_utils::BOOTSTRAP_VALIDATOR_LAMPORTS;
new_validator_for_tests_ex(0, BOOTSTRAP_VALIDATOR_LAMPORTS)
pub struct TestValidatorOptions {
pub fees: u64,
pub bootstrap_validator_lamports: u64,
}
pub fn new_validator_for_tests_ex(
fees: u64,
bootstrap_validator_lamports: u64,
) -> (Validator, ContactInfo, Keypair, PathBuf, Pubkey) {
use crate::genesis_utils::{create_genesis_config_with_leader_ex, GenesisConfigInfo};
use solana_sdk::fee_calculator::FeeCalculator;
impl Default for TestValidatorOptions {
fn default() -> Self {
use crate::genesis_utils::BOOTSTRAP_VALIDATOR_LAMPORTS;
TestValidatorOptions {
fees: 0,
bootstrap_validator_lamports: BOOTSTRAP_VALIDATOR_LAMPORTS,
}
}
}
let node_keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&node_keypair.pubkey());
let contact_info = node.info.clone();
impl TestValidator {
pub fn run() -> Self {
Self::run_with_options(TestValidatorOptions::default())
}
let GenesisConfigInfo {
mut genesis_config,
mint_keypair,
voting_keypair,
} = create_genesis_config_with_leader_ex(
1_000_000,
&contact_info.id,
42,
bootstrap_validator_lamports,
);
genesis_config
.native_instruction_processors
.push(solana_budget_program!());
pub fn run_with_options(options: TestValidatorOptions) -> Self {
use crate::genesis_utils::{create_genesis_config_with_leader_ex, GenesisConfigInfo};
use solana_sdk::fee_calculator::FeeCalculator;
genesis_config.rent.lamports_per_byte_year = 1;
genesis_config.rent.exemption_threshold = 1.0;
genesis_config.fee_calculator = FeeCalculator::new(fees, 0);
let TestValidatorOptions {
fees,
bootstrap_validator_lamports,
} = options;
let node_keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&node_keypair.pubkey());
let contact_info = node.info.clone();
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
let GenesisConfigInfo {
mut genesis_config,
mint_keypair,
voting_keypair,
} = create_genesis_config_with_leader_ex(
1_000_000,
&contact_info.id,
42,
bootstrap_validator_lamports,
);
genesis_config
.native_instruction_processors
.push(solana_budget_program!());
let leader_voting_keypair = Arc::new(voting_keypair);
let storage_keypair = Arc::new(Keypair::new());
let config = ValidatorConfig {
rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())),
..ValidatorConfig::default()
};
let node = Validator::new(
node,
&node_keypair,
&ledger_path,
&leader_voting_keypair.pubkey(),
&leader_voting_keypair,
&storage_keypair,
None,
true,
&config,
);
discover_cluster(&contact_info.gossip, 1).expect("Node startup failed");
(
node,
contact_info,
mint_keypair,
ledger_path,
leader_voting_keypair.pubkey(),
)
genesis_config.rent.lamports_per_byte_year = 1;
genesis_config.rent.exemption_threshold = 1.0;
genesis_config.fee_calculator = FeeCalculator::new(fees, 0);
let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config);
let leader_voting_keypair = Arc::new(voting_keypair);
let storage_keypair = Arc::new(Keypair::new());
let config = ValidatorConfig {
rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())),
..ValidatorConfig::default()
};
let node = Validator::new(
node,
&node_keypair,
&ledger_path,
&leader_voting_keypair.pubkey(),
&leader_voting_keypair,
&storage_keypair,
None,
true,
&config,
);
discover_cluster(&contact_info.gossip, 1).expect("Node startup failed");
TestValidator {
server: node,
leader_data: contact_info,
alice: mint_keypair,
ledger_path,
genesis_hash: blockhash,
vote_pubkey: leader_voting_keypair.pubkey(),
}
}
}
fn report_target_features() {

View File

@@ -4,7 +4,7 @@ use solana_client::{
};
use solana_core::{
rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions,
validator::new_validator_for_tests,
validator::TestValidator,
};
use solana_sdk::{
commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer,
@@ -26,7 +26,13 @@ use systemstat::Ipv4Addr;
fn test_rpc_client() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_validator_for_tests();
let TestValidator {
server,
leader_data,
alice,
ledger_path,
..
} = TestValidator::run();
let bob_pubkey = Pubkey::new_rand();
let client = RpcClient::new_socket(leader_data.rpc);

View File

@@ -1,21 +1,38 @@
use bincode::serialize;
use jsonrpc_core::futures::{
future::{self, Future},
stream::Stream,
};
use jsonrpc_core_client::transports::ws;
use log::*;
use reqwest::{self, header::CONTENT_TYPE};
use serde_json::{json, Value};
use solana_client::rpc_client::get_rpc_request_str;
use solana_core::validator::new_validator_for_tests;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::system_transaction;
use std::fs::remove_dir_all;
use std::thread::sleep;
use std::time::Duration;
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator};
use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction};
use std::{
collections::HashSet,
fs::remove_dir_all,
net::UdpSocket,
sync::mpsc::channel,
sync::{Arc, Mutex},
thread::sleep,
time::Duration,
time::SystemTime,
};
use tokio::runtime::Runtime;
#[test]
fn test_rpc_send_tx() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_validator_for_tests();
let TestValidator {
server,
leader_data,
alice,
ledger_path,
..
} = TestValidator::run();
let bob_pubkey = Pubkey::new_rand();
let client = reqwest::blocking::Client::new();
@@ -100,7 +117,12 @@ fn test_rpc_send_tx() {
fn test_rpc_invalid_requests() {
solana_logger::setup();
let (server, leader_data, _alice, ledger_path) = new_validator_for_tests();
let TestValidator {
server,
leader_data,
ledger_path,
..
} = TestValidator::run();
let bob_pubkey = Pubkey::new_rand();
// test invalid get_balance request
@@ -166,3 +188,82 @@ fn test_rpc_invalid_requests() {
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
#[test]
fn test_rpc_subscriptions() {
solana_logger::setup();
let TestValidator {
server,
leader_data,
alice,
ledger_path,
genesis_hash,
..
} = TestValidator::run();
// Create transaction signatures to subscribe to
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut signature_set: HashSet<String> = (0..1000)
.map(|_| {
let tx = system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash);
transactions_socket
.send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu)
.unwrap();
tx.signatures[0].to_string()
})
.collect();
// Create the pub sub runtime
let mut rt = Runtime::new().unwrap();
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);
let (sender, receiver) = channel::<(String, transaction::Result<()>)>();
let sender = Arc::new(Mutex::new(sender));
rt.spawn({
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
let signature_set = signature_set.clone();
connect
.and_then(move |client| {
for sig in signature_set {
let sender = sender.clone();
tokio::spawn(
client
.signature_subscribe(sig.clone(), None)
.and_then(move |sig_stream| {
sig_stream.for_each(move |result| {
sender.lock().unwrap().send((sig.clone(), result)).unwrap();
future::ok(())
})
})
.map_err(|err| {
eprintln!("sig sub err: {:#?}", err);
}),
);
}
future::ok(())
})
.map_err(|_| ())
});
// Wait for all signature subscriptions
let now = SystemTime::now();
let timeout = Duration::from_secs(5);
while !signature_set.is_empty() {
assert!(now.elapsed().unwrap() < timeout);
match receiver.recv_timeout(Duration::from_secs(1)) {
Ok((sig, result)) => {
assert!(result.is_ok());
assert!(signature_set.remove(&sig));
}
Err(_err) => {
eprintln!("unexpected receive timeout");
assert!(false)
}
}
}
rt.shutdown_now().wait().unwrap();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}