Use cluster confirmations in rpc and pubsub (#9138)

* Add runtime methods to simply get status and slot

* Add helper function to get slot confirmation_count from BlockCommitmentCache

* Return cluster confirmations in getSignatureStatus

* Remove use of invalid get_signature_confirmation_status

* Remove unused methods

* Update pubsub to use cluster confirmations

* Fix test_check_signature_subscribe failure

* Refactor confirmations to read commitment cache only once

* Review comments

* Use bank, root from BlockCommitmentCache

* Update docs

* Add metric for block-commitment aggregations

Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
Tyera Eulberg 2020-03-30 17:53:25 -06:00 committed by GitHub
parent 8636ef5e24
commit 50fa577af8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 400 additions and 344 deletions

View File

@ -1,3 +1,6 @@
use crate::consensus::VOTE_THRESHOLD_SIZE;
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY};
@ -88,27 +91,34 @@ impl BlockCommitmentCache {
self.root self.root
} }
pub fn get_block_with_depth_commitment( pub fn get_confirmation_count(&self, slot: Slot) -> Option<usize> {
&self, self.get_lockout_count(slot, VOTE_THRESHOLD_SIZE)
minimum_depth: usize,
minimum_stake_percentage: f64,
) -> Option<Slot> {
self.block_commitment
.iter()
.filter(|&(_, block_commitment)| {
let fork_stake_minimum_depth: u64 = block_commitment.commitment[minimum_depth..]
.iter()
.cloned()
.sum();
fork_stake_minimum_depth as f64 / self.total_stake as f64
>= minimum_stake_percentage
})
.map(|(slot, _)| *slot)
.max()
} }
pub fn get_rooted_block_with_commitment(&self, minimum_stake_percentage: f64) -> Option<u64> { // Returns the lowest level at which at least `minimum_stake_percentage` of the total epoch
self.get_block_with_depth_commitment(MAX_LOCKOUT_HISTORY - 1, minimum_stake_percentage) // stake is locked out
fn get_lockout_count(&self, slot: Slot, minimum_stake_percentage: f64) -> Option<usize> {
self.get_block_commitment(slot).map(|block_commitment| {
let iterator = block_commitment.commitment.iter().enumerate().rev();
let mut sum = 0;
for (i, stake) in iterator {
sum += stake;
if (sum as f64 / self.total_stake as f64) > minimum_stake_percentage {
return i + 1;
}
}
0
})
}
#[cfg(test)]
pub fn new_for_tests() -> Self {
let mut block_commitment: HashMap<Slot, BlockCommitment> = HashMap::new();
block_commitment.insert(0, BlockCommitment::default());
Self {
block_commitment,
total_stake: 42,
..Self::default()
}
} }
} }
@ -184,6 +194,7 @@ impl AggregateCommitmentService {
continue; continue;
} }
let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms");
let block_commitment = Self::aggregate_commitment(&ancestors, &aggregation_data.bank); let block_commitment = Self::aggregate_commitment(&ancestors, &aggregation_data.bank);
let mut new_block_commitment = BlockCommitmentCache::new( let mut new_block_commitment = BlockCommitmentCache::new(
@ -196,6 +207,11 @@ impl AggregateCommitmentService {
let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
aggregate_commitment_time.stop();
inc_new_counter_info!(
"aggregate-commitment-ms",
aggregate_commitment_time.as_ms() as usize
);
} }
} }
@ -290,86 +306,31 @@ mod tests {
} }
#[test] #[test]
fn test_get_block_with_depth_commitment() { fn test_get_confirmations() {
let bank = Arc::new(Bank::default()); let bank = Arc::new(Bank::default());
// Build BlockCommitmentCache with votes at depths 0 and 1 for 2 slots // Build BlockCommitmentCache with votes at depths 0 and 1 for 2 slots
let mut cache0 = BlockCommitment::default(); let mut cache0 = BlockCommitment::default();
cache0.increase_confirmation_stake(1, 15); cache0.increase_confirmation_stake(1, 5);
cache0.increase_confirmation_stake(2, 25); cache0.increase_confirmation_stake(2, 40);
let mut cache1 = BlockCommitment::default(); let mut cache1 = BlockCommitment::default();
cache1.increase_confirmation_stake(1, 10); cache1.increase_confirmation_stake(1, 40);
cache1.increase_confirmation_stake(2, 20); cache1.increase_confirmation_stake(2, 5);
let mut cache2 = BlockCommitment::default();
cache2.increase_confirmation_stake(1, 20);
cache2.increase_confirmation_stake(2, 5);
let mut block_commitment = HashMap::new(); let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone()); block_commitment.entry(0).or_insert(cache0.clone());
block_commitment.entry(1).or_insert(cache1.clone()); block_commitment.entry(1).or_insert(cache1.clone());
block_commitment.entry(2).or_insert(cache2.clone());
let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0); let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0);
// Neither slot has rooted votes assert_eq!(block_commitment_cache.get_confirmation_count(0), Some(2));
assert_eq!( assert_eq!(block_commitment_cache.get_confirmation_count(1), Some(1));
block_commitment_cache.get_rooted_block_with_commitment(0.1), assert_eq!(block_commitment_cache.get_confirmation_count(2), Some(0),);
None assert_eq!(block_commitment_cache.get_confirmation_count(3), None,);
);
// Neither slot meets the minimum level of commitment 0.6 at depth 1
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(1, 0.6),
None
);
// Only slot 0 meets the minimum level of commitment 0.5 at depth 1
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(1, 0.5),
Some(0)
);
// If multiple slots meet the minimum level of commitment, method should return the most recent
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(1, 0.4),
Some(1)
);
// If multiple slots meet the minimum level of commitment, method should return the most recent
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(0, 0.6),
Some(1)
);
// Neither slot meets the minimum level of commitment 0.9 at depth 0
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(0, 0.9),
None
);
}
#[test]
fn test_get_rooted_block_with_commitment() {
let bank = Arc::new(Bank::default());
// Build BlockCommitmentCache with rooted votes
let mut cache0 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]);
cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 40);
cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10);
let mut cache1 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]);
cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 30);
cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10);
cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 2, 10);
let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
block_commitment.entry(1).or_insert(cache1.clone());
let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0);
// Only slot 0 meets the minimum level of commitment 0.66 at root
assert_eq!(
block_commitment_cache.get_rooted_block_with_commitment(0.66),
Some(0)
);
// If multiple slots meet the minimum level of commitment, method should return the most recent
assert_eq!(
block_commitment_cache.get_rooted_block_with_commitment(0.6),
Some(1)
);
// Neither slot meets the minimum level of commitment 0.9 at root
assert_eq!(
block_commitment_cache.get_rooted_block_with_commitment(0.9),
None
);
} }
#[test] #[test]

View File

@ -136,7 +136,7 @@ impl ReplayStage {
// Start the replay stage loop // Start the replay stage loop
let (lockouts_sender, commitment_service) = let (lockouts_sender, commitment_service) =
AggregateCommitmentService::new(&exit, block_commitment_cache); AggregateCommitmentService::new(&exit, block_commitment_cache.clone());
#[allow(clippy::cognitive_complexity)] #[allow(clippy::cognitive_complexity)]
let t_replay = Builder::new() let t_replay = Builder::new()
@ -306,7 +306,7 @@ impl ReplayStage {
// Vote on a fork // Vote on a fork
let voted_on_different_fork = { let voted_on_different_fork = {
if let Some(ref vote_bank) = vote_bank { if let Some(ref vote_bank) = vote_bank {
subscriptions.notify_subscribers(vote_bank.slot(), &bank_forks); subscriptions.notify_subscribers(block_commitment_cache.read().unwrap().slot(), &bank_forks);
if let Some(votable_leader) = leader_schedule_cache if let Some(votable_leader) = leader_schedule_cache
.slot_leader_at(vote_bank.slot(), Some(vote_bank)) .slot_leader_at(vote_bank.slot(), Some(vote_bank))
{ {
@ -1904,7 +1904,10 @@ pub(crate) mod tests {
); );
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
));
let mut bank_forks = BankForks::new(0, bank0); let mut bank_forks = BankForks::new(0, bank0);
// Insert a non-root bank so that the propagation logic will update this // Insert a non-root bank so that the propagation logic will update this

View File

@ -13,7 +13,7 @@ use solana_ledger::{
bank_forks::BankForks, blockstore::Blockstore, rooted_slot_iterator::RootedSlotIterator, bank_forks::BankForks, blockstore::Blockstore, rooted_slot_iterator::RootedSlotIterator,
}; };
use solana_perf::packet::PACKET_DATA_SIZE; use solana_perf::packet::PACKET_DATA_SIZE;
use solana_runtime::{bank::Bank, status_cache::SignatureConfirmationStatus}; use solana_runtime::bank::Bank;
use solana_sdk::{ use solana_sdk::{
clock::{Slot, UnixTimestamp}, clock::{Slot, UnixTimestamp},
commitment_config::{CommitmentConfig, CommitmentLevel}, commitment_config::{CommitmentConfig, CommitmentLevel},
@ -196,11 +196,9 @@ impl JsonRpcRequestProcessor {
match signature { match signature {
Err(e) => Err(e), Err(e) => Err(e),
Ok(sig) => { Ok(sig) => {
let status = bank.get_signature_confirmation_status(&sig); let status = bank.get_signature_status(&sig);
match status { match status {
Some(SignatureConfirmationStatus { status, .. }) => { Some(status) => new_response(bank, status.is_ok()),
new_response(bank, status.is_ok())
}
None => new_response(bank, false), None => new_response(bank, false),
} }
} }
@ -409,21 +407,24 @@ impl JsonRpcRequestProcessor {
let bank = self.bank(commitment); let bank = self.bank(commitment);
for signature in signatures { for signature in signatures {
let status = bank.get_signature_confirmation_status(&signature).map( let status = bank
|SignatureConfirmationStatus { .get_signature_status_slot(&signature)
slot, .map(|(slot, status)| {
status, let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
confirmations,
}| TransactionStatus { let confirmations = if r_block_commitment_cache.root() >= slot {
slot,
status,
confirmations: if confirmations <= MAX_LOCKOUT_HISTORY {
Some(confirmations)
} else {
None None
}, } else {
}, r_block_commitment_cache
); .get_confirmation_count(slot)
.or(Some(0))
};
TransactionStatus {
slot,
status,
confirmations,
}
});
statuses.push(status); statuses.push(status);
} }
Ok(Response { Ok(Response {
@ -1237,8 +1238,10 @@ pub mod tests {
blockstore.clone(), blockstore.clone(),
); );
let commitment_slot0 = BlockCommitment::new([8; MAX_LOCKOUT_HISTORY]); let mut commitment_slot0 = BlockCommitment::default();
let commitment_slot1 = BlockCommitment::new([9; MAX_LOCKOUT_HISTORY]); commitment_slot0.increase_confirmation_stake(2, 9);
let mut commitment_slot1 = BlockCommitment::default();
commitment_slot1.increase_confirmation_stake(1, 9);
let mut block_commitment: HashMap<u64, BlockCommitment> = HashMap::new(); let mut block_commitment: HashMap<u64, BlockCommitment> = HashMap::new();
block_commitment block_commitment
.entry(0) .entry(0)
@ -1248,7 +1251,7 @@ pub mod tests {
.or_insert(commitment_slot1.clone()); .or_insert(commitment_slot1.clone());
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new(
block_commitment, block_commitment,
42, 10,
bank.clone(), bank.clone(),
0, 0,
))); )));
@ -1774,7 +1777,9 @@ pub mod tests {
let result: Option<TransactionStatus> = let result: Option<TransactionStatus> =
serde_json::from_value(json["result"]["value"][0].clone()) serde_json::from_value(json["result"]["value"][0].clone())
.expect("actual response deserialization"); .expect("actual response deserialization");
assert_eq!(expected_res, result.as_ref().unwrap().status); let result = result.as_ref().unwrap();
assert_eq!(expected_res, result.status);
assert_eq!(None, result.confirmations);
// Test getSignatureStatus request on unprocessed tx // Test getSignatureStatus request on unprocessed tx
let tx = system_transaction::transfer(&alice, &bob_pubkey, 10, blockhash); let tx = system_transaction::transfer(&alice, &bob_pubkey, 10, blockhash);
@ -2211,7 +2216,7 @@ pub mod tests {
.get_block_commitment(0) .get_block_commitment(0)
.map(|block_commitment| block_commitment.commitment) .map(|block_commitment| block_commitment.commitment)
); );
assert_eq!(total_stake, 42); assert_eq!(total_stake, 10);
let req = let req =
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getBlockCommitment","params":[2]}}"#); format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getBlockCommitment","params":[2]}}"#);
@ -2229,7 +2234,7 @@ pub mod tests {
panic!("Expected single response"); panic!("Expected single response");
}; };
assert_eq!(commitment_response.commitment, None); assert_eq!(commitment_response.commitment, None);
assert_eq!(commitment_response.total_stake, 42); assert_eq!(commitment_response.total_stake, 10);
} }
#[test] #[test]

View File

@ -312,7 +312,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::rpc_subscriptions::tests::robust_poll_or_panic; use crate::{
commitment::{BlockCommitment, BlockCommitmentCache},
rpc_subscriptions::tests::robust_poll_or_panic,
};
use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_core::{futures::sync::mpsc, Response};
use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_pubsub::{PubSubHandler, Session};
use solana_budget_program::{self, budget_instruction}; use solana_budget_program::{self, budget_instruction};
@ -325,7 +328,12 @@ mod tests {
system_program, system_transaction, system_program, system_transaction,
transaction::{self, Transaction}, transaction::{self, Transaction},
}; };
use std::{sync::RwLock, thread::sleep, time::Duration}; use std::{
collections::HashMap,
sync::{atomic::AtomicBool, RwLock},
thread::sleep,
time::Duration,
};
fn process_transaction_and_notify( fn process_transaction_and_notify(
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
@ -358,8 +366,13 @@ mod tests {
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let rpc = RpcSolPubSubImpl {
let rpc = RpcSolPubSubImpl::default(); subscriptions: Arc::new(RpcSubscriptions::new(
&Arc::new(AtomicBool::new(false)),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
)),
..RpcSolPubSubImpl::default()
};
// Test signature subscriptions // Test signature subscriptions
let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash);
@ -457,7 +470,13 @@ mod tests {
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let rpc = RpcSolPubSubImpl::default(); let rpc = RpcSolPubSubImpl {
subscriptions: Arc::new(RpcSubscriptions::new(
&Arc::new(AtomicBool::new(false)),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
)),
..RpcSolPubSubImpl::default()
};
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe( rpc.account_subscribe(
@ -591,7 +610,13 @@ mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bob = Keypair::new(); let bob = Keypair::new();
let rpc = RpcSolPubSubImpl::default(); let mut rpc = RpcSolPubSubImpl::default();
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
);
rpc.subscriptions = Arc::new(subscriptions);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2));
@ -622,7 +647,12 @@ mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bob = Keypair::new(); let bob = Keypair::new();
let rpc = RpcSolPubSubImpl::default(); let mut rpc = RpcSolPubSubImpl::default();
let exit = Arc::new(AtomicBool::new(false));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests()));
let subscriptions = RpcSubscriptions::new(&exit, block_commitment_cache.clone());
rpc.subscriptions = Arc::new(subscriptions);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2));
@ -640,10 +670,32 @@ mod tests {
let bank0 = bank_forks.read().unwrap()[0].clone(); let bank0 = bank_forks.read().unwrap()[0].clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1); bank_forks.write().unwrap().insert(bank1);
rpc.subscriptions.notify_subscribers(1, &bank_forks);
let bank1 = bank_forks.read().unwrap()[1].clone(); let bank1 = bank_forks.read().unwrap()[1].clone();
let mut cache0 = BlockCommitment::default();
cache0.increase_confirmation_stake(1, 10);
let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
let mut new_block_commitment =
BlockCommitmentCache::new(block_commitment, 10, bank1.clone(), 0);
let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
drop(w_block_commitment_cache);
rpc.subscriptions.notify_subscribers(1, &bank_forks);
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2); bank_forks.write().unwrap().insert(bank2);
let bank2 = bank_forks.read().unwrap()[2].clone();
let mut cache0 = BlockCommitment::default();
cache0.increase_confirmation_stake(2, 10);
let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
let mut new_block_commitment = BlockCommitmentCache::new(block_commitment, 10, bank2, 0);
let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
drop(w_block_commitment_cache);
rpc.subscriptions.notify_subscribers(2, &bank_forks); rpc.subscriptions.notify_subscribers(2, &bank_forks);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",

View File

@ -1,14 +1,20 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request //! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}; use crate::{
use crate::rpc_subscriptions::RpcSubscriptions; rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl},
rpc_subscriptions::RpcSubscriptions,
};
use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_pubsub::{PubSubHandler, Session};
use jsonrpc_ws_server::{RequestContext, ServerBuilder}; use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use std::net::SocketAddr; use std::{
use std::sync::atomic::{AtomicBool, Ordering}; net::SocketAddr,
use std::sync::Arc; sync::{
use std::thread::{self, sleep, Builder, JoinHandle}; atomic::{AtomicBool, Ordering},
use std::time::Duration; Arc,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
};
pub struct PubSubService { pub struct PubSubService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
@ -66,13 +72,20 @@ impl PubSubService {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::net::{IpAddr, Ipv4Addr}; use crate::commitment::BlockCommitmentCache;
use std::{
net::{IpAddr, Ipv4Addr},
sync::RwLock,
};
#[test] #[test]
fn test_pubsub_new() { fn test_pubsub_new() {
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
));
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
let thread = pubsub_service.thread_hdl.thread(); let thread = pubsub_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-pubsub"); assert_eq!(thread.name().unwrap(), "solana-pubsub");

View File

@ -1,5 +1,6 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request //! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::commitment::BlockCommitmentCache;
use core::hash::Hash; use core::hash::Hash;
use jsonrpc_core::futures::Future; use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{ use jsonrpc_pubsub::{
@ -14,11 +15,14 @@ use solana_sdk::{
account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction,
}; };
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; atomic::{AtomicBool, Ordering},
mpsc::{Receiver, RecvTimeoutError, SendError, Sender},
};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
cmp::min,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
iter, iter,
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
@ -80,11 +84,7 @@ fn add_subscription<K, S>(
{ {
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let confirmations = confirmations.unwrap_or(0); let confirmations = confirmations.unwrap_or(0);
let confirmations = if confirmations > MAX_LOCKOUT_HISTORY { let confirmations = min(confirmations, MAX_LOCKOUT_HISTORY + 1);
MAX_LOCKOUT_HISTORY
} else {
confirmations
};
if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) { if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) {
current_hashmap.insert(sub_id, (sink, confirmations)); current_hashmap.insert(sub_id, (sink, confirmations));
return; return;
@ -120,8 +120,8 @@ where
fn check_confirmations_and_notify<K, S, B, F, X>( fn check_confirmations_and_notify<K, S, B, F, X>(
subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<Response<S>>, Confirmations)>>, subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<Response<S>>, Confirmations)>>,
hashmap_key: &K, hashmap_key: &K,
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
bank_method: B, bank_method: B,
filter_results: F, filter_results: F,
notifier: &RpcNotifier, notifier: &RpcNotifier,
@ -133,6 +133,10 @@ where
F: Fn(X, u64) -> Box<dyn Iterator<Item = S>>, F: Fn(X, u64) -> Box<dyn Iterator<Item = S>>,
X: Clone + Serialize, X: Clone + Serialize,
{ {
let mut confirmation_slots: HashMap<usize, Slot> = HashMap::new();
let r_block_commitment_cache = block_commitment_cache.read().unwrap();
let current_slot = r_block_commitment_cache.slot();
let root = r_block_commitment_cache.root();
let current_ancestors = bank_forks let current_ancestors = bank_forks
.read() .read()
.unwrap() .unwrap()
@ -140,25 +144,24 @@ where
.unwrap() .unwrap()
.ancestors .ancestors
.clone(); .clone();
for (slot, _) in current_ancestors.iter() {
if let Some(confirmations) = r_block_commitment_cache.get_confirmation_count(*slot) {
confirmation_slots.entry(confirmations).or_insert(*slot);
}
}
drop(r_block_commitment_cache);
let mut notified_set: HashSet<SubscriptionId> = HashSet::new(); let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
if let Some(hashmap) = subscriptions.get(hashmap_key) { if let Some(hashmap) = subscriptions.get(hashmap_key) {
for (sub_id, (sink, confirmations)) in hashmap.iter() { for (sub_id, (sink, confirmations)) in hashmap.iter() {
let desired_slot: Vec<u64> = current_ancestors let desired_slot = if *confirmations == 0 {
.iter() Some(&current_slot)
.filter(|(_, &v)| v == *confirmations) } else if *confirmations == MAX_LOCKOUT_HISTORY + 1 {
.map(|(k, _)| k) Some(&root)
.cloned() } else {
.collect(); confirmation_slots.get(confirmations)
let root: Vec<u64> = current_ancestors };
.iter() if let Some(&slot) = desired_slot {
.filter(|(_, &v)| v == 32)
.map(|(k, _)| k)
.cloned()
.collect();
let root = if root.len() == 1 { root[0] } else { 0 };
if desired_slot.len() == 1 {
let slot = desired_slot[0];
let results = { let results = {
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
let desired_bank = bank_forks.get(slot).unwrap(); let desired_bank = bank_forks.get(slot).unwrap();
@ -239,7 +242,10 @@ pub struct RpcSubscriptions {
impl Default for RpcSubscriptions { impl Default for RpcSubscriptions {
fn default() -> Self { fn default() -> Self {
Self::new(&Arc::new(AtomicBool::new(false))) Self::new(
&Arc::new(AtomicBool::new(false)),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
)
} }
} }
@ -252,7 +258,10 @@ impl Drop for RpcSubscriptions {
} }
impl RpcSubscriptions { impl RpcSubscriptions {
pub fn new(exit: &Arc<AtomicBool>) -> Self { pub fn new(
exit: &Arc<AtomicBool>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
) -> Self {
let (notification_sender, notification_receiver): ( let (notification_sender, notification_receiver): (
Sender<NotificationEntry>, Sender<NotificationEntry>,
Receiver<NotificationEntry>, Receiver<NotificationEntry>,
@ -291,6 +300,7 @@ impl RpcSubscriptions {
signature_subscriptions_clone, signature_subscriptions_clone,
slot_subscriptions_clone, slot_subscriptions_clone,
root_subscriptions_clone, root_subscriptions_clone,
block_commitment_cache,
); );
}) })
.unwrap(); .unwrap();
@ -310,8 +320,8 @@ impl RpcSubscriptions {
fn check_account( fn check_account(
pubkey: &Pubkey, pubkey: &Pubkey,
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
account_subscriptions: Arc<RpcAccountSubscriptions>, account_subscriptions: Arc<RpcAccountSubscriptions>,
notifier: &RpcNotifier, notifier: &RpcNotifier,
) { ) {
@ -319,8 +329,8 @@ impl RpcSubscriptions {
check_confirmations_and_notify( check_confirmations_and_notify(
&subscriptions, &subscriptions,
pubkey, pubkey,
current_slot,
bank_forks, bank_forks,
block_commitment_cache,
Bank::get_account_modified_since_parent, Bank::get_account_modified_since_parent,
filter_account_result, filter_account_result,
notifier, notifier,
@ -329,8 +339,8 @@ impl RpcSubscriptions {
fn check_program( fn check_program(
program_id: &Pubkey, program_id: &Pubkey,
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
program_subscriptions: Arc<RpcProgramSubscriptions>, program_subscriptions: Arc<RpcProgramSubscriptions>,
notifier: &RpcNotifier, notifier: &RpcNotifier,
) { ) {
@ -338,8 +348,8 @@ impl RpcSubscriptions {
check_confirmations_and_notify( check_confirmations_and_notify(
&subscriptions, &subscriptions,
program_id, program_id,
current_slot,
bank_forks, bank_forks,
block_commitment_cache,
Bank::get_program_accounts_modified_since_parent, Bank::get_program_accounts_modified_since_parent,
filter_program_results, filter_program_results,
notifier, notifier,
@ -348,8 +358,8 @@ impl RpcSubscriptions {
fn check_signature( fn check_signature(
signature: &Signature, signature: &Signature,
current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>, signature_subscriptions: Arc<RpcSignatureSubscriptions>,
notifier: &RpcNotifier, notifier: &RpcNotifier,
) { ) {
@ -357,8 +367,8 @@ impl RpcSubscriptions {
let notified_ids = check_confirmations_and_notify( let notified_ids = check_confirmations_and_notify(
&subscriptions, &subscriptions,
signature, signature,
current_slot,
bank_forks, bank_forks,
block_commitment_cache,
Bank::get_signature_status_processed_since_parent, Bank::get_signature_status_processed_since_parent,
filter_signature_result, filter_signature_result,
notifier, notifier,
@ -502,6 +512,7 @@ impl RpcSubscriptions {
signature_subscriptions: Arc<RpcSignatureSubscriptions>, signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>, slot_subscriptions: Arc<RpcSlotSubscriptions>,
root_subscriptions: Arc<RpcRootSubscriptions>, root_subscriptions: Arc<RpcRootSubscriptions>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
) { ) {
loop { loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
@ -521,7 +532,7 @@ impl RpcSubscriptions {
notifier.notify(root, sink); notifier.notify(root, sink);
} }
} }
NotificationEntry::Bank((current_slot, bank_forks)) => { NotificationEntry::Bank((_current_slot, bank_forks)) => {
let pubkeys: Vec<_> = { let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap(); let subs = account_subscriptions.read().unwrap();
subs.keys().cloned().collect() subs.keys().cloned().collect()
@ -529,8 +540,8 @@ impl RpcSubscriptions {
for pubkey in &pubkeys { for pubkey in &pubkeys {
Self::check_account( Self::check_account(
pubkey, pubkey,
current_slot,
&bank_forks, &bank_forks,
&block_commitment_cache,
account_subscriptions.clone(), account_subscriptions.clone(),
&notifier, &notifier,
); );
@ -543,8 +554,8 @@ impl RpcSubscriptions {
for program_id in &programs { for program_id in &programs {
Self::check_program( Self::check_program(
program_id, program_id,
current_slot,
&bank_forks, &bank_forks,
&block_commitment_cache,
program_subscriptions.clone(), program_subscriptions.clone(),
&notifier, &notifier,
); );
@ -557,8 +568,8 @@ impl RpcSubscriptions {
for signature in &signatures { for signature in &signatures {
Self::check_signature( Self::check_signature(
signature, signature,
current_slot,
&bank_forks, &bank_forks,
&block_commitment_cache,
signature_subscriptions.clone(), signature_subscriptions.clone(),
&notifier, &notifier,
); );
@ -599,6 +610,7 @@ impl RpcSubscriptions {
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use super::*; use super::*;
use crate::commitment::BlockCommitment;
use jsonrpc_core::futures::{self, stream::Stream}; use jsonrpc_core::futures::{self, stream::Stream};
use jsonrpc_pubsub::typed::Subscriber; use jsonrpc_pubsub::typed::Subscriber;
use solana_budget_program; use solana_budget_program;
@ -666,7 +678,10 @@ pub(crate) mod tests {
Subscriber::new_test("accountNotification"); Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
);
subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber);
assert!(subscriptions assert!(subscriptions
@ -735,7 +750,10 @@ pub(crate) mod tests {
Subscriber::new_test("programNotification"); Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
);
subscriptions.add_program_subscription( subscriptions.add_program_subscription(
solana_budget_program::id(), solana_budget_program::id(),
None, None,
@ -815,27 +833,41 @@ pub(crate) mod tests {
.unwrap() .unwrap()
.process_transaction(&processed_tx) .process_transaction(&processed_tx)
.unwrap(); .unwrap();
let bank1 = bank_forks[1].clone();
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let exit = Arc::new(AtomicBool::new(false)); let mut cache0 = BlockCommitment::default();
let subscriptions = RpcSubscriptions::new(&exit); cache0.increase_confirmation_stake(1, 10);
let cache1 = BlockCommitment::default();
let (past_bank_sub, _id_receiver, past_bank_recv) = let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
block_commitment.entry(1).or_insert(cache1.clone());
let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 10, bank1, 0);
let exit = Arc::new(AtomicBool::new(false));
let subscriptions =
RpcSubscriptions::new(&exit, Arc::new(RwLock::new(block_commitment_cache)));
let (past_bank_sub1, _id_receiver, past_bank_recv1) =
Subscriber::new_test("signatureNotification");
let (past_bank_sub2, _id_receiver, past_bank_recv2) =
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
let (processed_sub, _id_receiver, processed_recv) = let (processed_sub, _id_receiver, processed_recv) =
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
past_bank_tx.signatures[0], past_bank_tx.signatures[0],
Some(0), Some(0),
SubscriptionId::Number(1 as u64), SubscriptionId::Number(1 as u64),
Subscriber::new_test("signatureNotification").0, past_bank_sub1,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
past_bank_tx.signatures[0], past_bank_tx.signatures[0],
Some(1), Some(1),
SubscriptionId::Number(2 as u64), SubscriptionId::Number(2 as u64),
past_bank_sub, past_bank_sub2,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
processed_tx.signatures[0], processed_tx.signatures[0],
@ -860,41 +892,46 @@ pub(crate) mod tests {
subscriptions.notify_subscribers(1, &bank_forks); subscriptions.notify_subscribers(1, &bank_forks);
let expected_res: Option<transaction::Result<()>> = Some(Ok(())); let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected = json!({ struct Notification {
"jsonrpc": "2.0", slot: Slot,
"method": "signatureNotification", id: u64,
"params": { }
"result": {
"context": { "slot": 0 },
"value": expected_res,
},
"subscription": 2,
}
});
let (response, _) = robust_poll_or_panic(past_bank_recv);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let expected = json!({ let expected_notification = |exp: Notification| -> String {
"jsonrpc": "2.0", let json = json!({
"method": "signatureNotification", "jsonrpc": "2.0",
"params": { "method": "signatureNotification",
"result": { "params": {
"context": { "slot": 1 }, "result": {
"value": expected_res, "context": { "slot": exp.slot },
}, "value": &expected_res,
"subscription": 3, },
} "subscription": exp.id,
}); }
});
serde_json::to_string(&json).unwrap()
};
// Expect to receive a notification from bank 1 because this subscription is
// looking for 0 confirmations and so checks the current bank
let expected = expected_notification(Notification { slot: 1, id: 1 });
let (response, _) = robust_poll_or_panic(past_bank_recv1);
assert_eq!(expected, response);
// Expect to receive a notification from bank 0 because this subscription is
// looking for 1 confirmation and so checks the past bank
let expected = expected_notification(Notification { slot: 0, id: 2 });
let (response, _) = robust_poll_or_panic(past_bank_recv2);
assert_eq!(expected, response);
let expected = expected_notification(Notification { slot: 1, id: 3 });
let (response, _) = robust_poll_or_panic(processed_recv); let (response, _) = robust_poll_or_panic(processed_recv);
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(expected, response);
let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
// Subscription should be automatically removed after notification // Subscription should be automatically removed after notification
let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); assert!(!sig_subs.contains_key(&processed_tx.signatures[0]));
assert!(!sig_subs.contains_key(&past_bank_tx.signatures[0]));
// Only one notification is expected for signature processed in previous bank
assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 1);
// Unprocessed signature subscription should not be removed // Unprocessed signature subscription should not be removed
assert_eq!( assert_eq!(
@ -909,7 +946,10 @@ pub(crate) mod tests {
Subscriber::new_test("slotNotification"); Subscriber::new_test("slotNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
);
subscriptions.add_slot_subscription(sub_id.clone(), subscriber); subscriptions.add_slot_subscription(sub_id.clone(), subscriber);
assert!(subscriptions assert!(subscriptions
@ -947,7 +987,10 @@ pub(crate) mod tests {
Subscriber::new_test("rootNotification"); Subscriber::new_test("rootNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
);
subscriptions.add_root_subscription(sub_id.clone(), subscriber); subscriptions.add_root_subscription(sub_id.clone(), subscriber);
assert!(subscriptions assert!(subscriptions

View File

@ -4,6 +4,7 @@
use crate::{ use crate::{
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
commitment::BlockCommitmentCache,
contact_info::ContactInfo, contact_info::ContactInfo,
result::{Error, Result}, result::{Error, Result},
}; };
@ -11,9 +12,7 @@ use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng; use rand_chacha::ChaChaRng;
use solana_chacha_cuda::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use solana_chacha_cuda::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore};
use solana_runtime::{ use solana_runtime::{bank::Bank, storage_utils::archiver_accounts};
bank::Bank, status_cache::SignatureConfirmationStatus, storage_utils::archiver_accounts,
};
use solana_sdk::{ use solana_sdk::{
account::Account, account::Account,
account_utils::StateMut, account_utils::StateMut,
@ -30,6 +29,7 @@ use solana_storage_program::{
storage_instruction, storage_instruction,
storage_instruction::proof_validation, storage_instruction::proof_validation,
}; };
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::{ use std::{
cmp, cmp,
collections::HashMap, collections::HashMap,
@ -185,6 +185,7 @@ impl StorageStage {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
) -> Self { ) -> Self {
let (instruction_sender, instruction_receiver) = channel(); let (instruction_sender, instruction_receiver) = channel();
@ -256,6 +257,7 @@ impl StorageStage {
&keypair, &keypair,
&storage_keypair, &storage_keypair,
&transactions_socket, &transactions_socket,
&block_commitment_cache,
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
info!("failed to send storage transaction: {:?}", err) info!("failed to send storage transaction: {:?}", err)
@ -289,6 +291,7 @@ impl StorageStage {
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>, storage_keypair: &Arc<Keypair>,
transactions_socket: &UdpSocket, transactions_socket: &UdpSocket,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
) -> io::Result<()> { ) -> io::Result<()> {
let working_bank = bank_forks.read().unwrap().working_bank(); let working_bank = bank_forks.read().unwrap().working_bank();
let blockhash = working_bank.confirmed_last_blockhash().0; let blockhash = working_bank.confirmed_last_blockhash().0;
@ -323,8 +326,13 @@ impl StorageStage {
cluster_info.read().unwrap().my_data().tpu, cluster_info.read().unwrap().my_data().tpu,
)?; )?;
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
if Self::poll_for_signature_confirmation(bank_forks, &transaction.signatures[0], 0) if Self::poll_for_signature_confirmation(
.is_ok() bank_forks,
block_commitment_cache,
&transaction.signatures[0],
0,
)
.is_ok()
{ {
break; break;
}; };
@ -334,23 +342,24 @@ impl StorageStage {
fn poll_for_signature_confirmation( fn poll_for_signature_confirmation(
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
signature: &Signature, signature: &Signature,
min_confirmed_blocks: usize, min_confirmed_blocks: usize,
) -> Result<()> { ) -> Result<()> {
let mut now = Instant::now(); let mut now = Instant::now();
let mut confirmed_blocks = 0; let mut confirmed_blocks = 0;
loop { loop {
let response = bank_forks let working_bank = bank_forks.read().unwrap().working_bank();
.read() let response = working_bank.get_signature_status_slot(signature);
.unwrap() if let Some((slot, status)) = response {
.working_bank() let confirmations = if working_bank.src.roots().contains(&slot) {
.get_signature_confirmation_status(signature); MAX_LOCKOUT_HISTORY + 1
if let Some(SignatureConfirmationStatus { } else {
confirmations, let r_block_commitment_cache = block_commitment_cache.read().unwrap();
status, r_block_commitment_cache
.. .get_confirmation_count(slot)
}) = response .unwrap_or(0)
{ };
if status.is_ok() { if status.is_ok() {
if confirmed_blocks != confirmations { if confirmed_blocks != confirmations {
now = Instant::now(); now = Instant::now();
@ -655,12 +664,18 @@ mod tests {
use rayon::prelude::*; use rayon::prelude::*;
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::hash::Hasher; use solana_sdk::{
use solana_sdk::signature::{Keypair, Signer}; hash::Hasher,
use std::cmp::{max, min}; signature::{Keypair, Signer},
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; };
use std::sync::mpsc::channel; use std::{
use std::sync::{Arc, RwLock}; cmp::{max, min},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::channel,
Arc, RwLock,
},
};
#[test] #[test]
fn test_storage_stage_none_ledger() { fn test_storage_stage_none_ledger() {
@ -675,6 +690,7 @@ mod tests {
&[bank.clone()], &[bank.clone()],
vec![0], vec![0],
))); )));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let (_slot_sender, slot_receiver) = channel(); let (_slot_sender, slot_receiver) = channel();
let storage_state = StorageState::new( let storage_state = StorageState::new(
&bank.last_blockhash(), &bank.last_blockhash(),
@ -690,6 +706,7 @@ mod tests {
&exit.clone(), &exit.clone(),
&bank_forks, &bank_forks,
&cluster_info, &cluster_info,
block_commitment_cache,
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
storage_stage.join().unwrap(); storage_stage.join().unwrap();

View File

@ -185,7 +185,7 @@ impl Tvu {
leader_schedule_cache: leader_schedule_cache.clone(), leader_schedule_cache: leader_schedule_cache.clone(),
latest_root_senders: vec![ledger_cleanup_slot_sender], latest_root_senders: vec![ledger_cleanup_slot_sender],
accounts_hash_sender: Some(accounts_hash_sender), accounts_hash_sender: Some(accounts_hash_sender),
block_commitment_cache, block_commitment_cache: block_commitment_cache.clone(),
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
}; };
@ -221,6 +221,7 @@ impl Tvu {
&exit, &exit,
&bank_forks, &bank_forks,
&cluster_info, &cluster_info,
block_commitment_cache,
); );
Tvu { Tvu {
@ -307,7 +308,10 @@ pub mod tests {
blockstore, blockstore,
&StorageState::default(), &StorageState::default(),
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::new(&exit)), &Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
)),
&poh_recorder, &poh_recorder,
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,

View File

@ -234,7 +234,7 @@ impl Validator {
let blockstore = Arc::new(blockstore); let blockstore = Arc::new(blockstore);
let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let subscriptions = Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone()));
let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| {
if ContactInfo::is_valid_address(&node.info.rpc) { if ContactInfo::is_valid_address(&node.info.rpc) {

View File

@ -3,8 +3,8 @@ use solana_client::{
rpc_client::RpcClient, rpc_client::RpcClient,
}; };
use solana_core::{ use solana_core::{
rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, commitment::BlockCommitmentCache, rpc_pubsub_service::PubSubService,
validator::TestValidator, rpc_subscriptions::RpcSubscriptions, validator::TestValidator,
}; };
use solana_sdk::{ use solana_sdk::{
commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer, commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer,
@ -15,7 +15,7 @@ use std::{
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc, RwLock,
}, },
thread::sleep, thread::sleep,
time::{Duration, Instant}, time::{Duration, Instant},
@ -85,7 +85,10 @@ fn test_slot_subscription() {
rpc_port::DEFAULT_RPC_PUBSUB_PORT, rpc_port::DEFAULT_RPC_PUBSUB_PORT,
); );
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
));
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
std::thread::sleep(Duration::from_millis(400)); std::thread::sleep(Duration::from_millis(400));

View File

@ -3,28 +3,36 @@
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use log::*; use log::*;
use solana_core::storage_stage::{test_cluster_info, SLOTS_PER_TURN_TEST}; use solana_core::{
use solana_core::storage_stage::{StorageStage, StorageState}; commitment::BlockCommitmentCache,
use solana_ledger::bank_forks::BankForks; storage_stage::{test_cluster_info, StorageStage, StorageState, SLOTS_PER_TURN_TEST},
use solana_ledger::blockstore_processor; };
use solana_ledger::entry; use solana_ledger::{
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; bank_forks::BankForks,
use solana_ledger::{blockstore::Blockstore, create_new_tmp_ledger}; blockstore::Blockstore,
blockstore_processor, create_new_tmp_ledger, entry,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; use solana_sdk::{
use solana_sdk::hash::Hash; clock::DEFAULT_TICKS_PER_SLOT,
use solana_sdk::message::Message; hash::Hash,
use solana_sdk::pubkey::Pubkey; message::Message,
use solana_sdk::signature::{Keypair, Signer}; pubkey::Pubkey,
use solana_sdk::transaction::Transaction; signature::{Keypair, Signer},
use solana_storage_program::storage_instruction; transaction::Transaction,
use solana_storage_program::storage_instruction::StorageAccountType; };
use std::fs::remove_dir_all; use solana_storage_program::storage_instruction::{self, StorageAccountType};
use std::sync::atomic::{AtomicBool, Ordering}; use std::{
use std::sync::mpsc::channel; fs::remove_dir_all,
use std::sync::{Arc, RwLock}; sync::{
use std::thread::sleep; atomic::{AtomicBool, Ordering},
use std::time::Duration; mpsc::channel,
Arc, RwLock,
},
thread::sleep,
time::Duration,
};
#[test] #[test]
fn test_storage_stage_process_account_proofs() { fn test_storage_stage_process_account_proofs() {
@ -52,6 +60,7 @@ mod tests {
&[bank.clone()], &[bank.clone()],
vec![0], vec![0],
))); )));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let cluster_info = test_cluster_info(&keypair.pubkey()); let cluster_info = test_cluster_info(&keypair.pubkey());
let (bank_sender, bank_receiver) = channel(); let (bank_sender, bank_receiver) = channel();
@ -69,6 +78,7 @@ mod tests {
&exit.clone(), &exit.clone(),
&bank_forks, &bank_forks,
&cluster_info, &cluster_info,
block_commitment_cache,
); );
bank_sender.send(vec![bank.clone()]).unwrap(); bank_sender.send(vec![bank.clone()]).unwrap();
@ -171,6 +181,7 @@ mod tests {
&[bank.clone()], &[bank.clone()],
vec![0], vec![0],
))); )));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let cluster_info = test_cluster_info(&keypair.pubkey()); let cluster_info = test_cluster_info(&keypair.pubkey());
let (bank_sender, bank_receiver) = channel(); let (bank_sender, bank_receiver) = channel();
@ -188,6 +199,7 @@ mod tests {
&exit.clone(), &exit.clone(),
&bank_forks, &bank_forks,
&cluster_info, &cluster_info,
block_commitment_cache,
); );
bank_sender.send(vec![bank.clone()]).unwrap(); bank_sender.send(vec![bank.clone()]).unwrap();

View File

@ -118,7 +118,7 @@ Many methods that take a commitment parameter return an RpcResponse JSON object
### confirmTransaction ### confirmTransaction
Returns a transaction receipt Returns a transaction receipt. This method only searches the recent status cache of signatures, which retains all active slots plus `MAX_RECENT_BLOCKHASHES` rooted slots.
#### Parameters: #### Parameters:
@ -656,14 +656,13 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m
### getSignatureStatus ### getSignatureStatus
Returns the status of a given signature. This method is similar to [confirmTransaction](jsonrpc-api.md#confirmtransaction) but provides more resolution for error events. Returns the status of a given signature. This method is similar to [confirmTransaction](jsonrpc-api.md#confirmtransaction) but provides more resolution for error events. This method only searches the recent status cache of signatures, which retains all active slots plus `MAX_RECENT_BLOCKHASHES` rooted slots.
#### Parameters: #### Parameters:
* `<array>` - An array of transaction signatures to confirm, as base-58 encoded strings * `<array>` - An array of transaction signatures to confirm, as base-58 encoded strings
* `<object>` - (optional) Extended Rpc configuration, containing the following optional fields: * `<object>` - (optional) Extended Rpc configuration, containing the following optional fields:
* `commitment: <string>` - [Commitment](jsonrpc-api.md#configuring-state-commitment) * `commitment: <string>` - [Commitment](jsonrpc-api.md#configuring-state-commitment)
* `searchTransactionHistory: <bool>` - whether to search the ledger transaction status cache, which may be expensive
#### Results: #### Results:

View File

@ -14,7 +14,7 @@ use crate::{
deserialize_atomicbool, deserialize_atomicu64, serialize_atomicbool, serialize_atomicu64, deserialize_atomicbool, deserialize_atomicu64, serialize_atomicbool, serialize_atomicu64,
}, },
stakes::Stakes, stakes::Stakes,
status_cache::{SignatureConfirmationStatus, SlotDelta, StatusCache}, status_cache::{SlotDelta, StatusCache},
storage_utils, storage_utils,
storage_utils::StorageAccounts, storage_utils::StorageAccounts,
system_instruction_processor::{get_system_account_kind, SystemAccountKind}, system_instruction_processor::{get_system_account_kind, SystemAccountKind},
@ -1845,29 +1845,25 @@ impl Bank {
&self, &self,
signature: &Signature, signature: &Signature,
) -> Option<Result<()>> { ) -> Option<Result<()>> {
if let Some(status) = self.get_signature_confirmation_status(signature) { if let Some((slot, status)) = self.get_signature_status_slot(signature) {
if status.slot == self.slot() { if slot <= self.slot() {
return Some(status.status); return Some(status);
} }
} }
None None
} }
pub fn get_signature_confirmation_status( pub fn get_signature_status_slot(&self, signature: &Signature) -> Option<(Slot, Result<()>)> {
&self,
signature: &Signature,
) -> Option<SignatureConfirmationStatus<Result<()>>> {
let rcache = self.src.status_cache.read().unwrap(); let rcache = self.src.status_cache.read().unwrap();
rcache.get_signature_status_slow(signature, &self.ancestors) rcache.get_signature_slot(signature, &self.ancestors)
} }
pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> { pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
self.get_signature_confirmation_status(signature) self.get_signature_status_slot(signature).map(|v| v.1)
.map(|v| v.status)
} }
pub fn has_signature(&self, signature: &Signature) -> bool { pub fn has_signature(&self, signature: &Signature) -> bool {
self.get_signature_confirmation_status(signature).is_some() self.get_signature_status_slot(signature).is_some()
} }
/// Hash the `accounts` HashMap. This represents a validator's interpretation /// Hash the `accounts` HashMap. This represents a validator's interpretation

View File

@ -1,4 +1,4 @@
use crate::{bank::Bank, status_cache::SignatureConfirmationStatus}; use crate::bank::Bank;
use solana_sdk::{ use solana_sdk::{
account::Account, account::Account,
client::{AsyncClient, Client, SyncClient}, client::{AsyncClient, Client, SyncClient},
@ -184,26 +184,15 @@ impl SyncClient for BankClient {
signature: &Signature, signature: &Signature,
min_confirmed_blocks: usize, min_confirmed_blocks: usize,
) -> Result<usize> { ) -> Result<usize> {
let mut now = Instant::now(); // https://github.com/solana-labs/solana/issues/7199
let mut confirmed_blocks = 0; assert_eq!(min_confirmed_blocks, 1, "BankClient cannot observe the passage of multiple blocks, so min_confirmed_blocks must be 1");
let now = Instant::now();
let confirmed_blocks;
loop { loop {
let response = self.bank.get_signature_confirmation_status(signature); if self.bank.get_signature_status(signature).is_some() {
if let Some(SignatureConfirmationStatus { confirmed_blocks = 1;
confirmations, break;
status, }
..
}) = response
{
if status.is_ok() {
if confirmed_blocks != confirmations {
now = Instant::now();
confirmed_blocks = confirmations;
}
if confirmations >= min_confirmed_blocks {
break;
}
}
};
if now.elapsed().as_secs() > 15 { if now.elapsed().as_secs() > 15 {
return Err(TransportError::IoError(io::Error::new( return Err(TransportError::IoError(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,

View File

@ -103,29 +103,20 @@ impl<T: Serialize + Clone> StatusCache<T> {
None None
} }
pub fn get_signature_status_slow( pub fn get_signature_slot(
&self, &self,
sig: &Signature, signature: &Signature,
ancestors: &HashMap<Slot, usize>, ancestors: &HashMap<Slot, usize>,
) -> Option<SignatureConfirmationStatus<T>> { ) -> Option<(Slot, T)> {
trace!("get_signature_status_slow");
let mut keys = vec![]; let mut keys = vec![];
let mut val: Vec<_> = self.cache.iter().map(|(k, _)| *k).collect(); let mut val: Vec<_> = self.cache.iter().map(|(k, _)| *k).collect();
keys.append(&mut val); keys.append(&mut val);
for blockhash in keys.iter() { for blockhash in keys.iter() {
trace!("get_signature_status_slow: trying {}", blockhash); trace!("get_signature_slot: trying {}", blockhash);
if let Some((forkid, res)) = self.get_signature_status(sig, blockhash, ancestors) { let status = self.get_signature_status(signature, blockhash, ancestors);
trace!("get_signature_status_slow: got {}", forkid); if status.is_some() {
let confirmations = ancestors return status;
.get(&forkid)
.copied()
.unwrap_or_else(|| ancestors.len());
return Some(SignatureConfirmationStatus {
slot: forkid,
confirmations,
status: res,
});
} }
} }
None None
@ -265,10 +256,7 @@ mod tests {
status_cache.get_signature_status(&sig, &blockhash, &HashMap::new()), status_cache.get_signature_status(&sig, &blockhash, &HashMap::new()),
None None
); );
assert_eq!( assert_eq!(status_cache.get_signature_slot(&sig, &HashMap::new()), None);
status_cache.get_signature_status_slow(&sig, &HashMap::new()),
None
);
} }
#[test] #[test]
@ -283,12 +271,8 @@ mod tests {
Some((0, ())) Some((0, ()))
); );
assert_eq!( assert_eq!(
status_cache.get_signature_status_slow(&sig, &ancestors), status_cache.get_signature_slot(&sig, &ancestors),
Some(SignatureConfirmationStatus { Some((0, ()))
slot: 0,
confirmations: 1,
status: ()
})
); );
} }
@ -303,10 +287,7 @@ mod tests {
status_cache.get_signature_status(&sig, &blockhash, &ancestors), status_cache.get_signature_status(&sig, &blockhash, &ancestors),
None None
); );
assert_eq!( assert_eq!(status_cache.get_signature_slot(&sig, &ancestors), None);
status_cache.get_signature_status_slow(&sig, &ancestors),
None
);
} }
#[test] #[test]
@ -323,24 +304,6 @@ mod tests {
); );
} }
#[test]
fn test_find_sig_with_root_ancestor_fork_max_len() {
let sig = Signature::default();
let mut status_cache = BankStatusCache::default();
let blockhash = hash(Hash::default().as_ref());
let ancestors = vec![(2, 2)].into_iter().collect();
status_cache.insert(&blockhash, &sig, 0, ());
status_cache.add_root(0);
assert_eq!(
status_cache.get_signature_status_slow(&sig, &ancestors),
Some(SignatureConfirmationStatus {
slot: 0,
confirmations: ancestors.len(),
status: ()
})
);
}
#[test] #[test]
fn test_insert_picks_latest_blockhash_fork() { fn test_insert_picks_latest_blockhash_fork() {
let sig = Signature::default(); let sig = Signature::default();
@ -371,10 +334,6 @@ mod tests {
status_cache.get_signature_status(&sig, &blockhash, &ancestors), status_cache.get_signature_status(&sig, &blockhash, &ancestors),
None None
); );
assert_eq!(
status_cache.get_signature_status_slow(&sig, &ancestors),
None
);
} }
#[test] #[test]