Respect confirmations param for signature subscription notifications (#9019)
automerge
This commit is contained in:
@ -344,7 +344,7 @@ impl RpcSubscriptions {
|
|||||||
signature,
|
signature,
|
||||||
current_slot,
|
current_slot,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
Bank::get_signature_status,
|
Bank::get_signature_status_processed_since_parent,
|
||||||
filter_signature_result,
|
filter_signature_result,
|
||||||
notifier,
|
notifier,
|
||||||
);
|
);
|
||||||
@ -726,43 +726,108 @@ pub(crate) mod tests {
|
|||||||
} = create_genesis_config(100);
|
} = create_genesis_config(100);
|
||||||
let bank = Bank::new(&genesis_config);
|
let bank = Bank::new(&genesis_config);
|
||||||
let blockhash = bank.last_blockhash();
|
let blockhash = bank.last_blockhash();
|
||||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
|
let mut bank_forks = BankForks::new(0, bank);
|
||||||
let alice = Keypair::new();
|
let alice = Keypair::new();
|
||||||
let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash);
|
|
||||||
let signature = tx.signatures[0];
|
let past_bank_tx =
|
||||||
|
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash);
|
||||||
let unprocessed_tx =
|
let unprocessed_tx =
|
||||||
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 10, blockhash);
|
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash);
|
||||||
let not_ready_signature = unprocessed_tx.signatures[0];
|
let processed_tx =
|
||||||
|
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash);
|
||||||
|
|
||||||
bank_forks
|
bank_forks
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.get(0)
|
.get(0)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.process_transaction(&tx)
|
.process_transaction(&past_bank_tx)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let next_bank =
|
||||||
|
Bank::new_from_parent(&bank_forks.banks[&0].clone(), &Pubkey::new_rand(), 1);
|
||||||
|
bank_forks.insert(next_bank);
|
||||||
|
|
||||||
|
bank_forks
|
||||||
|
.get(1)
|
||||||
|
.unwrap()
|
||||||
|
.process_transaction(&processed_tx)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||||
|
|
||||||
let (subscriber, _id_receiver, transport_receiver) =
|
let (subscriber, _id_receiver, transport_receiver) =
|
||||||
Subscriber::new_test("signatureNotification");
|
Subscriber::new_test("signatureNotification");
|
||||||
let sub_id = SubscriptionId::Number(0 as u64);
|
let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap();
|
||||||
let remaining_sub_id = SubscriptionId::Number(1 as u64);
|
|
||||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let subscriptions = RpcSubscriptions::new(&exit);
|
let subscriptions = RpcSubscriptions::new(&exit);
|
||||||
subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink.clone());
|
|
||||||
subscriptions.add_signature_subscription(
|
subscriptions.add_signature_subscription(
|
||||||
¬_ready_signature,
|
&past_bank_tx.signatures[0],
|
||||||
None,
|
Some(0),
|
||||||
&remaining_sub_id,
|
&SubscriptionId::Number(1 as u64),
|
||||||
|
&sink.clone(),
|
||||||
|
);
|
||||||
|
subscriptions.add_signature_subscription(
|
||||||
|
&processed_tx.signatures[0],
|
||||||
|
Some(0),
|
||||||
|
&SubscriptionId::Number(2 as u64),
|
||||||
|
&sink.clone(),
|
||||||
|
);
|
||||||
|
subscriptions.add_signature_subscription(
|
||||||
|
&unprocessed_tx.signatures[0],
|
||||||
|
Some(0),
|
||||||
|
&SubscriptionId::Number(3 as u64),
|
||||||
&sink.clone(),
|
&sink.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
{
|
{
|
||||||
let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
|
let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
|
||||||
assert!(sig_subs.contains_key(&signature));
|
assert!(sig_subs.contains_key(&past_bank_tx.signatures[0]));
|
||||||
assert!(sig_subs.contains_key(¬_ready_signature));
|
assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0]));
|
||||||
|
assert!(sig_subs.contains_key(&processed_tx.signatures[0]));
|
||||||
}
|
}
|
||||||
|
|
||||||
subscriptions.notify_subscribers(0, &bank_forks);
|
subscriptions.notify_subscribers(1, &bank_forks);
|
||||||
|
let response = robust_poll_or_panic(transport_receiver);
|
||||||
|
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
||||||
|
let expected = json!({
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "signatureNotification",
|
||||||
|
"params": {
|
||||||
|
"result": {
|
||||||
|
"context": { "slot": 1 },
|
||||||
|
"value": expected_res,
|
||||||
|
},
|
||||||
|
"subscription": 0,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||||
|
|
||||||
|
let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
|
||||||
|
|
||||||
|
// Subscription should be automatically removed after notification
|
||||||
|
assert!(!sig_subs.contains_key(&processed_tx.signatures[0]));
|
||||||
|
|
||||||
|
// Only one notification is expected for signature processed in previous bank
|
||||||
|
assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 1);
|
||||||
|
|
||||||
|
// Unprocessed signature subscription should not be removed
|
||||||
|
assert_eq!(
|
||||||
|
sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
|
||||||
|
let (subscriber, _id_receiver, transport_receiver) =
|
||||||
|
Subscriber::new_test("signatureNotification");
|
||||||
|
let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap();
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let subscriptions = RpcSubscriptions::new(&exit);
|
||||||
|
|
||||||
|
subscriptions.add_signature_subscription(
|
||||||
|
&past_bank_tx.signatures[0],
|
||||||
|
Some(1),
|
||||||
|
&SubscriptionId::Number(1 as u64),
|
||||||
|
&sink.clone(),
|
||||||
|
);
|
||||||
|
subscriptions.notify_subscribers(1, &bank_forks);
|
||||||
let response = robust_poll_or_panic(transport_receiver);
|
let response = robust_poll_or_panic(transport_receiver);
|
||||||
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
||||||
let expected = json!({
|
let expected = json!({
|
||||||
@ -777,20 +842,6 @@ pub(crate) mod tests {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||||
|
|
||||||
// Subscription should be automatically removed after notification
|
|
||||||
assert!(!subscriptions
|
|
||||||
.signature_subscriptions
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.contains_key(&signature));
|
|
||||||
|
|
||||||
// Unprocessed signature subscription should not be removed
|
|
||||||
assert!(subscriptions
|
|
||||||
.signature_subscriptions
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.contains_key(¬_ready_signature));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -7,9 +7,18 @@ use jsonrpc_core_client::transports::ws;
|
|||||||
use log::*;
|
use log::*;
|
||||||
use reqwest::{self, header::CONTENT_TYPE};
|
use reqwest::{self, header::CONTENT_TYPE};
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use solana_client::{rpc_client::get_rpc_request_str, rpc_response::Response};
|
use solana_client::{
|
||||||
|
rpc_client::{get_rpc_request_str, RpcClient},
|
||||||
|
rpc_response::Response,
|
||||||
|
};
|
||||||
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator};
|
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator};
|
||||||
use solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction};
|
use solana_sdk::{
|
||||||
|
commitment_config::CommitmentConfig,
|
||||||
|
hash::Hash,
|
||||||
|
pubkey::Pubkey,
|
||||||
|
system_transaction,
|
||||||
|
transaction::{self, Transaction},
|
||||||
|
};
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
fs::remove_dir_all,
|
fs::remove_dir_all,
|
||||||
@ -17,8 +26,7 @@ use std::{
|
|||||||
sync::mpsc::channel,
|
sync::mpsc::channel,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
thread::sleep,
|
thread::sleep,
|
||||||
time::Duration,
|
time::{Duration, Instant},
|
||||||
time::SystemTime,
|
|
||||||
};
|
};
|
||||||
use tokio::runtime::Runtime;
|
use tokio::runtime::Runtime;
|
||||||
|
|
||||||
@ -204,35 +212,43 @@ fn test_rpc_subscriptions() {
|
|||||||
|
|
||||||
// Create transaction signatures to subscribe to
|
// Create transaction signatures to subscribe to
|
||||||
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let mut signature_set: HashSet<String> = (0..1000)
|
let transactions: Vec<Transaction> = (0..500)
|
||||||
.map(|_| {
|
.map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash))
|
||||||
let tx = system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash);
|
.collect();
|
||||||
transactions_socket
|
let mut signature_set: HashSet<String> = transactions
|
||||||
.send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu)
|
.iter()
|
||||||
.unwrap();
|
.map(|tx| tx.signatures[0].to_string())
|
||||||
tx.signatures[0].to_string()
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Create the pub sub runtime
|
// Create the pub sub runtime
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);
|
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);
|
||||||
let (sender, receiver) = channel::<(String, Response<transaction::Result<()>>)>();
|
|
||||||
let sender = Arc::new(Mutex::new(sender));
|
|
||||||
|
|
||||||
|
let (status_sender, status_receiver) = channel::<(String, Response<transaction::Result<()>>)>();
|
||||||
|
let status_sender = Arc::new(Mutex::new(status_sender));
|
||||||
|
let (sent_sender, sent_receiver) = channel::<()>();
|
||||||
|
let sent_sender = Arc::new(Mutex::new(sent_sender));
|
||||||
|
|
||||||
|
// Subscribe to all signatures
|
||||||
rt.spawn({
|
rt.spawn({
|
||||||
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
|
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
|
||||||
let signature_set = signature_set.clone();
|
let signature_set = signature_set.clone();
|
||||||
connect
|
connect
|
||||||
.and_then(move |client| {
|
.and_then(move |client| {
|
||||||
for sig in signature_set {
|
for sig in signature_set {
|
||||||
let sender = sender.clone();
|
let status_sender = status_sender.clone();
|
||||||
|
let sent_sender = sent_sender.clone();
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
client
|
client
|
||||||
.signature_subscribe(sig.clone(), None)
|
.signature_subscribe(sig.clone(), None)
|
||||||
.and_then(move |sig_stream| {
|
.and_then(move |sig_stream| {
|
||||||
|
sent_sender.lock().unwrap().send(()).unwrap();
|
||||||
sig_stream.for_each(move |result| {
|
sig_stream.for_each(move |result| {
|
||||||
sender.lock().unwrap().send((sig.clone(), result)).unwrap();
|
status_sender
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.send((sig.clone(), result))
|
||||||
|
.unwrap();
|
||||||
future::ok(())
|
future::ok(())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -246,18 +262,49 @@ fn test_rpc_subscriptions() {
|
|||||||
.map_err(|_| ())
|
.map_err(|_| ())
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Wait for signature subscriptions
|
||||||
|
let deadline = Instant::now() + Duration::from_secs(2);
|
||||||
|
(0..transactions.len()).for_each(|_| {
|
||||||
|
sent_receiver
|
||||||
|
.recv_timeout(deadline.saturating_duration_since(Instant::now()))
|
||||||
|
.unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
let rpc_client = RpcClient::new_socket(leader_data.rpc);
|
||||||
|
let transaction_count = rpc_client
|
||||||
|
.get_transaction_count_with_commitment(CommitmentConfig::recent())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Send all transactions to tpu socket for processing
|
||||||
|
transactions.iter().for_each(|tx| {
|
||||||
|
transactions_socket
|
||||||
|
.send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu)
|
||||||
|
.unwrap();
|
||||||
|
});
|
||||||
|
let mut x = 0;
|
||||||
|
let now = Instant::now();
|
||||||
|
while x < transaction_count + 500 || now.elapsed() > Duration::from_secs(5) {
|
||||||
|
x = rpc_client
|
||||||
|
.get_transaction_count_with_commitment(CommitmentConfig::recent())
|
||||||
|
.unwrap();
|
||||||
|
sleep(Duration::from_millis(200));
|
||||||
|
}
|
||||||
|
|
||||||
// Wait for all signature subscriptions
|
// Wait for all signature subscriptions
|
||||||
let now = SystemTime::now();
|
let deadline = Instant::now() + Duration::from_secs(5);
|
||||||
let timeout = Duration::from_secs(5);
|
|
||||||
while !signature_set.is_empty() {
|
while !signature_set.is_empty() {
|
||||||
assert!(now.elapsed().unwrap() < timeout);
|
let timeout = deadline.saturating_duration_since(Instant::now());
|
||||||
match receiver.recv_timeout(Duration::from_secs(1)) {
|
match status_receiver.recv_timeout(timeout) {
|
||||||
Ok((sig, result)) => {
|
Ok((sig, result)) => {
|
||||||
assert!(result.value.is_ok());
|
assert!(result.value.is_ok());
|
||||||
assert!(signature_set.remove(&sig));
|
assert!(signature_set.remove(&sig));
|
||||||
}
|
}
|
||||||
Err(_err) => {
|
Err(_err) => {
|
||||||
eprintln!("unexpected receive timeout");
|
eprintln!(
|
||||||
|
"recv_timeout, {}/{} signatures remaining",
|
||||||
|
signature_set.len(),
|
||||||
|
transactions.len()
|
||||||
|
);
|
||||||
assert!(false)
|
assert!(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1823,6 +1823,18 @@ impl Bank {
|
|||||||
.fetch_add(signature_count, Ordering::Relaxed);
|
.fetch_add(signature_count, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_signature_status_processed_since_parent(
|
||||||
|
&self,
|
||||||
|
signature: &Signature,
|
||||||
|
) -> Option<Result<()>> {
|
||||||
|
if let Some(status) = self.get_signature_confirmation_status(signature) {
|
||||||
|
if status.slot == self.slot() {
|
||||||
|
return Some(status.status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_signature_confirmation_status(
|
pub fn get_signature_confirmation_status(
|
||||||
&self,
|
&self,
|
||||||
signature: &Signature,
|
signature: &Signature,
|
||||||
|
Reference in New Issue
Block a user