removes repeated bank-forks locking in window-service (backport #19210) (#20239)

* removes repeated bank-forks locking in window-service

The window service repeatedly locks bank-forks to look up the working bank
for every single shred:
https://github.com/solana-labs/solana/blob/5fde4ee3a/core/src/window_service.rs#L597-L606

This commit updates the shred_filter signature in recv_window so that,
where we already hold the lock on bank-forks, we can also look up the
working bank once for all packets in the batch:
https://github.com/solana-labs/solana/blob/5fde4ee3a/core/src/window_service.rs#L256-L277
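
In effect the read lock is hoisted out of the per-shred path. A minimal
standalone sketch of the pattern, using stub Bank/BankForks types rather
than the real solana_runtime API:

    use std::sync::{Arc, RwLock};

    // Stand-ins for the real Bank/BankForks types, only to keep the sketch
    // self-contained; the actual types live in solana_runtime.
    struct Bank;
    struct BankForks {
        root_bank: Arc<Bank>,
        working_bank: Arc<Bank>,
    }

    fn main() {
        let bank_forks = Arc::new(RwLock::new(BankForks {
            root_bank: Arc::new(Bank),
            working_bank: Arc::new(Bank),
        }));
        let shreds = vec![(); 1_000]; // pretend this is one received batch

        // Before: every shred paid for a fresh read lock on bank_forks.
        for _shred in &shreds {
            let _working_bank = Arc::clone(&bank_forks.read().unwrap().working_bank);
        }

        // After: lock once per batch, then reuse cheap Arc clones per shred.
        let (_root_bank, working_bank) = {
            let forks = bank_forks.read().unwrap();
            (Arc::clone(&forks.root_bank), Arc::clone(&forks.working_bank))
        };
        for _shred in &shreds {
            let _bank = Arc::clone(&working_bank); // no lock on this path
        }
    }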

(cherry picked from commit d57398a959)

# Conflicts:
#	core/src/window_service.rs

* removes erroneous uses of &Arc<...> from window-service
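
For context: a parameter of type &Arc<T> buys nothing over &T unless the
callee actually needs to clone the Arc, and &T is strictly more general
because &Arc<T> deref-coerces to &T at the call site. An illustrative
sketch (the LeaderScheduleCache below is a stub, not the real type):

    use std::sync::Arc;

    // Stub standing in for the real cache type, just for the example.
    struct LeaderScheduleCache;

    impl LeaderScheduleCache {
        fn slot_leader_at(&self, slot: u64) -> Option<u64> {
            Some(slot) // placeholder body
        }
    }

    // Takes &LeaderScheduleCache instead of &Arc<LeaderScheduleCache>:
    // the function only reads through the reference, so it should not
    // care whether the caller shares the value behind an Arc or owns it.
    fn leader_for(cache: &LeaderScheduleCache, slot: u64) -> Option<u64> {
        cache.slot_leader_at(slot)
    }

    fn main() {
        let shared = Arc::new(LeaderScheduleCache);
        // &Arc<LeaderScheduleCache> coerces to &LeaderScheduleCache here.
        assert_eq!(leader_for(&shared, 42), Some(42));

        let owned = LeaderScheduleCache;
        assert_eq!(leader_for(&owned, 7), Some(7));
    }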

(cherry picked from commit b64eeb7729)

# Conflicts:
#	core/src/window_service.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
mergify[bot] authored on 2021-09-27 13:48:58 +00:00, committed by GitHub
parent 30f0b3cf53
commit 502ae8b319
3 changed files with 101 additions and 107 deletions


@@ -623,9 +623,9 @@ impl RetransmitStage {
             verified_receiver,
             retransmit_sender,
             repair_socket,
-            exit,
+            exit.clone(),
             repair_info,
-            leader_schedule_cache,
+            leader_schedule_cache.clone(),
             move |id, shred, working_bank, last_root| {
                 let is_connected = cfg
                     .as_ref()


@@ -1,44 +1,47 @@
 //! `window_service` handles the data plane incoming shreds, storing them in
 //! blockstore and retransmitting where required
 //!
-use crate::{
-    cluster_info_vote_listener::VerifiedVoteReceiver,
-    cluster_slots::ClusterSlots,
-    completed_data_sets_service::CompletedDataSetsSender,
-    outstanding_requests::OutstandingRequests,
-    repair_response,
-    repair_service::{OutstandingRepairs, RepairInfo, RepairService},
-    result::{Error, Result},
-};
-use crossbeam_channel::{
-    unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
-};
-use rayon::{prelude::*, ThreadPool};
-use solana_gossip::cluster_info::ClusterInfo;
-use solana_ledger::{
-    blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
-    leader_schedule_cache::LeaderScheduleCache,
-    shred::{Nonce, Shred},
-};
-use solana_measure::measure::Measure;
-use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
-use solana_perf::packet::{Packet, Packets};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
-use solana_streamer::streamer::PacketSender;
-use std::collections::HashSet;
-use std::{
-    net::{SocketAddr, UdpSocket},
-    ops::Deref,
-    sync::atomic::{AtomicBool, Ordering},
-    sync::{Arc, RwLock},
-    thread::{self, Builder, JoinHandle},
-    time::{Duration, Instant},
+use {
+    crate::{
+        cluster_info_vote_listener::VerifiedVoteReceiver,
+        cluster_slots::ClusterSlots,
+        completed_data_sets_service::CompletedDataSetsSender,
+        repair_response,
+        repair_service::{OutstandingRepairs, RepairInfo, RepairService},
+        result::{Error, Result},
+    },
+    crossbeam_channel::{
+        unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
+    },
+    rayon::{prelude::*, ThreadPool},
+    solana_gossip::cluster_info::ClusterInfo,
+    solana_ledger::{
+        blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
+        leader_schedule_cache::LeaderScheduleCache,
+        shred::{Nonce, Shred},
+    },
+    solana_measure::measure::Measure,
+    solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
+    solana_perf::packet::{Packet, Packets},
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms},
+    solana_streamer::streamer::PacketSender,
+    std::collections::HashSet,
+    std::{
+        net::{SocketAddr, UdpSocket},
+        ops::Deref,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc, RwLock,
+        },
+        thread::{self, Builder, JoinHandle},
+        time::{Duration, Instant},
+    },
 };
 
-pub type DuplicateSlotSender = CrossbeamSender<Slot>;
-pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
+type DuplicateSlotSender = CrossbeamSender<Slot>;
+pub(crate) type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
 
 #[derive(Default)]
 struct WindowServiceMetrics {
@@ -49,7 +52,7 @@ struct WindowServiceMetrics {
 }
 
 impl WindowServiceMetrics {
-    pub fn report_metrics(&self, metric_name: &'static str) {
+    fn report_metrics(&self, metric_name: &'static str) {
         datapoint_info!(
             metric_name,
             ("run_insert_count", self.run_insert_count as i64, i64),
@@ -80,18 +83,15 @@ fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
 
 /// drop shreds that are from myself or not from the correct leader for the
 /// shred's slot
-pub fn should_retransmit_and_persist(
+pub(crate) fn should_retransmit_and_persist(
     shred: &Shred,
     bank: Option<Arc<Bank>>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    leader_schedule_cache: &LeaderScheduleCache,
     my_pubkey: &Pubkey,
     root: u64,
     shred_version: u16,
 ) -> bool {
-    let slot_leader_pubkey = match bank {
-        None => leader_schedule_cache.slot_leader_at(shred.slot(), None),
-        Some(bank) => leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank)),
-    };
+    let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref());
     if let Some(leader_id) = slot_leader_pubkey {
         if leader_id == *my_pubkey {
             inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
@@ -172,7 +172,7 @@ fn verify_repair(
 fn prune_shreds_invalid_repair(
     shreds: &mut Vec<Shred>,
     repair_infos: &mut Vec<Option<RepairMeta>>,
-    outstanding_requests: &Arc<RwLock<OutstandingRepairs>>,
+    outstanding_requests: &RwLock<OutstandingRepairs>,
 ) {
     assert_eq!(shreds.len(), repair_infos.len());
     let mut i = 0;
@@ -198,13 +198,13 @@ fn prune_shreds_invalid_repair(
 
 fn run_insert<F>(
     shred_receiver: &CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
-    outstanding_requests: &Arc<RwLock<OutstandingRepairs>>,
+    outstanding_requests: &RwLock<OutstandingRepairs>,
 ) -> Result<()>
 where
     F: Fn(Shred),
@@ -251,9 +251,9 @@ where
 }
 
 fn recv_window<F>(
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    bank_forks: &Arc<RwLock<BankForks>>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
+    bank_forks: &RwLock<BankForks>,
     insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     my_pubkey: &Pubkey,
     verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
@@ -262,7 +262,7 @@ fn recv_window<F>(
     thread_pool: &ThreadPool,
 ) -> Result<()>
 where
-    F: Fn(&Shred, u64) -> bool + Sync,
+    F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync,
 {
     let timer = Duration::from_millis(200);
     let mut packets = verified_receiver.recv_timeout(timer)?;
@@ -271,7 +271,10 @@
     let now = Instant::now();
     inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
-    let root_bank = bank_forks.read().unwrap().root_bank();
+    let (root_bank, working_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.root_bank(), bank_forks.working_bank())
+    };
     let last_root = blockstore.last_root();
     let handle_packet = |packet: &mut Packet| {
         if packet.meta.discard {
@@ -283,8 +286,9 @@
         // call to `new_from_serialized_shred` is safe.
         assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
         let serialized_shred = packet.data.to_vec();
+        let working_bank = Arc::clone(&working_bank);
         let shred = match Shred::new_from_serialized_shred(serialized_shred) {
-            Ok(shred) if shred_filter(&shred, last_root) => {
+            Ok(shred) if shred_filter(&shred, working_bank, last_root) => {
                 let leader_pubkey =
                     leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref()));
                 packet.meta.slot = shred.slot();
@@ -358,7 +362,7 @@ impl Drop for Finalizer {
     }
 }
 
-pub struct WindowService {
+pub(crate) struct WindowService {
     t_window: JoinHandle<()>,
     t_insert: JoinHandle<()>,
     t_check_duplicate: JoinHandle<()>,
@@ -367,15 +371,15 @@ pub struct WindowService {
 
 impl WindowService {
     #[allow(clippy::too_many_arguments)]
-    pub fn new<F>(
+    pub(crate) fn new<F>(
         blockstore: Arc<Blockstore>,
         cluster_info: Arc<ClusterInfo>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         retransmit: PacketSender,
         repair_socket: Arc<UdpSocket>,
-        exit: &Arc<AtomicBool>,
+        exit: Arc<AtomicBool>,
         repair_info: RepairInfo,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         shred_filter: F,
         cluster_slots: Arc<ClusterSlots>,
         verified_vote_receiver: VerifiedVoteReceiver,
@@ -384,12 +388,11 @@ impl WindowService {
     ) -> WindowService
     where
         F: 'static
-            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, u64) -> bool
+            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, /*last root:*/ Slot) -> bool
            + std::marker::Send
            + std::marker::Sync,
     {
-        let outstanding_requests: Arc<RwLock<OutstandingRepairs>> =
-            Arc::new(RwLock::new(OutstandingRequests::default()));
+        let outstanding_requests = Arc::<RwLock<OutstandingRepairs>>::default();
 
         let bank_forks = repair_info.bank_forks.clone();
@@ -416,9 +419,9 @@
         );
 
         let t_insert = Self::start_window_insert_thread(
-            exit,
-            &blockstore,
-            leader_schedule_cache,
+            exit.clone(),
+            blockstore.clone(),
+            leader_schedule_cache.clone(),
             insert_receiver,
             duplicate_sender,
             completed_data_sets_sender,
@@ -428,12 +431,12 @@
         let t_window = Self::start_recv_window_thread(
             cluster_info.id(),
             exit,
-            &blockstore,
+            blockstore,
             insert_sender,
             verified_receiver,
             shred_filter,
             leader_schedule_cache,
-            &bank_forks,
+            bank_forks,
             retransmit,
         );
@@ -478,17 +481,14 @@
     }
 
     fn start_window_insert_thread(
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         check_duplicate_sender: CrossbeamSender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
         outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
     ) -> JoinHandle<()> {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let leader_schedule_cache = leader_schedule_cache.clone();
         let mut handle_timeout = || {};
         let handle_error = || {
             inc_new_counter_error!("solana-window-insert-error", 1, 1);
@@ -538,13 +538,13 @@
     #[allow(clippy::too_many_arguments)]
     fn start_recv_window_thread<F>(
         id: Pubkey,
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
         insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         shred_filter: F,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        bank_forks: &Arc<RwLock<BankForks>>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
+        bank_forks: Arc<RwLock<BankForks>>,
         retransmit: PacketSender,
     ) -> JoinHandle<()>
     where
@@ -553,11 +553,6 @@
             + std::marker::Send
             + std::marker::Sync,
     {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let bank_forks = bank_forks.clone();
-        let bank_forks_opt = Some(bank_forks.clone());
-        let leader_schedule_cache = leader_schedule_cache.clone();
         Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
@@ -591,13 +586,11 @@
                         &id,
                         &verified_receiver,
                         &retransmit,
-                        |shred, last_root| {
+                        |shred, bank, last_root| {
                             shred_filter(
                                 &id,
                                 shred,
-                                bank_forks_opt
-                                    .as_ref()
-                                    .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
+                                Some(bank),
                                 last_root,
                             )
                         },
@@ -634,7 +627,7 @@
         }
     }
 
-    pub fn join(self) -> thread::Result<()> {
+    pub(crate) fn join(self) -> thread::Result<()> {
         self.t_window.join()?;
         self.t_insert.join()?;
         self.t_check_duplicate.join()?;
@@ -644,23 +637,24 @@ impl WindowService {
 
 #[cfg(test)]
 mod test {
-    use super::*;
-    use solana_gossip::contact_info::ContactInfo;
-    use solana_ledger::{
-        blockstore::{make_many_slot_entries, Blockstore},
-        entry::{create_ticks, Entry},
-        genesis_utils::create_genesis_config_with_leader,
-        get_tmp_ledger_path,
-        shred::{DataShredHeader, Shredder},
-    };
-    use solana_sdk::{
-        epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
-        hash::Hash,
-        signature::{Keypair, Signer},
-        timing::timestamp,
+    use {
+        super::*,
+        solana_gossip::contact_info::ContactInfo,
+        solana_ledger::{
+            blockstore::{make_many_slot_entries, Blockstore},
+            entry::{create_ticks, Entry},
+            genesis_utils::create_genesis_config_with_leader,
+            get_tmp_ledger_path,
+            shred::{DataShredHeader, Shredder},
+        },
+        solana_sdk::{
+            epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
+            hash::Hash,
+            signature::{Keypair, Signer},
+            timing::timestamp,
+        },
+        solana_streamer::socket::SocketAddrSpace,
     };
-    use solana_streamer::socket::SocketAddrSpace;
-    use std::sync::Arc;
 
     fn local_entries_to_shred(
         entries: &[Entry],


@@ -802,7 +802,7 @@ impl Blockstore {
         &self,
         shreds: Vec<Shred>,
         is_repaired: Vec<bool>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
@@ -1045,7 +1045,7 @@
     pub fn insert_shreds(
         &self,
         shreds: Vec<Shred>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
         let shreds_len = shreds.len();
@@ -1228,7 +1228,7 @@
         index_meta_time: &mut u64,
         is_trusted: bool,
         handle_duplicate: &F,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
     where
@@ -1363,7 +1363,7 @@
         slot_meta: &SlotMeta,
         just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
         last_root: &RwLock<u64>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> bool {
         use crate::shred::SHRED_PAYLOAD_SIZE;