removes repeated bank-forks locking in window-service

The window service repeatedly locks bank-forks to look up the working bank
for every single shred:
https://github.com/solana-labs/solana/blob/5fde4ee3a/core/src/window_service.rs#L597-L606

This commit updates the shred_filter signature in recv_window so that,
where we already obtain the lock on bank-forks, we also look up the
working bank once for the entire batch of packets:
https://github.com/solana-labs/solana/blob/5fde4ee3a/core/src/window_service.rs#L256-L277
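
For illustration, a minimal sketch of the before/after locking pattern; Bank,
BankForks, and the shred handling here are simplified stand-ins, not the real
window-service types:

// A minimal sketch, assuming simplified stand-in types; the real Bank and
// BankForks live in solana-runtime, and the real filter is shred_filter.
use std::sync::{Arc, RwLock};

struct Bank;

struct BankForks {
    working_bank: Arc<Bank>,
}

impl BankForks {
    fn working_bank(&self) -> Arc<Bank> {
        Arc::clone(&self.working_bank)
    }
}

fn filter(_shred: &str, _bank: &Bank) -> bool {
    true
}

fn process_batch(bank_forks: &RwLock<BankForks>, shreds: &[&str]) {
    // Before: the read lock on bank-forks is re-acquired for every shred.
    for shred in shreds {
        let bank = bank_forks.read().unwrap().working_bank();
        filter(shred, &bank);
    }
    // After: lock once per batch; each shred then gets a cheap Arc clone,
    // mirroring the `Arc::clone(&working_bank)` in the diff below.
    let working_bank = bank_forks.read().unwrap().working_bank();
    for shred in shreds {
        let bank = Arc::clone(&working_bank);
        filter(shred, &bank);
    }
}

fn main() {
    let bank_forks = RwLock::new(BankForks {
        working_bank: Arc::new(Bank),
    });
    process_batch(&bank_forks, &["shred-a", "shred-b"]);
}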
behzad nouri
2021-08-13 10:46:02 -04:00
parent 01b00bc593
commit d57398a959

core/src/window_service.rs

@@ -5,7 +5,6 @@ use crate::{
     ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
     cluster_info_vote_listener::VerifiedVoteReceiver,
     completed_data_sets_service::CompletedDataSetsSender,
-    outstanding_requests::OutstandingRequests,
     repair_response,
     repair_service::{OutstandingShredRepairs, RepairInfo, RepairService},
     result::{Error, Result},
@@ -88,10 +87,7 @@ pub fn should_retransmit_and_persist(
     root: u64,
     shred_version: u16,
 ) -> bool {
-    let slot_leader_pubkey = match bank {
-        None => leader_schedule_cache.slot_leader_at(shred.slot(), None),
-        Some(bank) => leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank)),
-    };
+    let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref());
     if let Some(leader_id) = slot_leader_pubkey {
         if leader_id == *my_pubkey {
             inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
@@ -265,7 +261,7 @@ fn recv_window<F>(
     thread_pool: &ThreadPool,
 ) -> Result<()>
 where
-    F: Fn(&Shred, u64) -> bool + Sync,
+    F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync,
 {
     let timer = Duration::from_millis(200);
     let mut packets = verified_receiver.recv_timeout(timer)?;
@@ -274,7 +270,10 @@ where
     let now = Instant::now();
     inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
-    let root_bank = bank_forks.read().unwrap().root_bank();
+    let (root_bank, working_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.root_bank(), bank_forks.working_bank())
+    };
     let last_root = blockstore.last_root();
     let handle_packet = |packet: &mut Packet| {
         if packet.meta.discard {
@@ -286,8 +285,9 @@ where
         // call to `new_from_serialized_shred` is safe.
         assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
         let serialized_shred = packet.data.to_vec();
+        let working_bank = Arc::clone(&working_bank);
         let shred = match Shred::new_from_serialized_shred(serialized_shred) {
-            Ok(shred) if shred_filter(&shred, last_root) => {
+            Ok(shred) if shred_filter(&shred, working_bank, last_root) => {
                 let leader_pubkey =
                     leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref()));
                 packet.meta.slot = shred.slot();
@@ -386,12 +386,11 @@ impl WindowService {
     ) -> WindowService
     where
         F: 'static
-            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, u64) -> bool
+            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, /*last root:*/ Slot) -> bool
             + std::marker::Send
             + std::marker::Sync,
     {
-        let outstanding_requests: Arc<RwLock<OutstandingShredRepairs>> =
-            Arc::new(RwLock::new(OutstandingRequests::default()));
+        let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
         let bank_forks = repair_info.bank_forks.clone();
         let cluster_info = repair_info.cluster_info.clone();
@@ -559,7 +558,6 @@ impl WindowService {
         let exit = exit.clone();
         let blockstore = blockstore.clone();
         let bank_forks = bank_forks.clone();
-        let bank_forks_opt = Some(bank_forks.clone());
         let leader_schedule_cache = leader_schedule_cache.clone();
         Builder::new()
             .name("solana-window".to_string())
@@ -594,13 +592,11 @@ impl WindowService {
                         &id,
                         &verified_receiver,
                         &retransmit,
-                        |shred, last_root| {
+                        |shred, bank, last_root| {
                             shred_filter(
                                 &id,
                                 shred,
-                                bank_forks_opt
-                                    .as_ref()
-                                    .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
+                                Some(bank),
                                 last_root,
                             )
                         },