eliminate lock on record (#15929) (#16073)

* eliminate lock on record

* use same error as MaxHeightReached

* clippy

* review feedback

* refactor should_tick code

* pr feedback

(cherry picked from commit 57ba86c821)

Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2021-03-30 00:46:13 +00:00
committed by GitHub
parent ee06789a66
commit 4d731ecd08
6 changed files with 452 additions and 153 deletions

View File

@ -66,6 +66,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(&bank, &blockstore, None);
let recorder = poh_recorder.lock().unwrap().recorder();
let tx = test_tx();
let len = 4096;
let chunk_size = 1024;
@ -88,6 +90,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&s,
None::<Box<dyn Fn()>>,
None,
&recorder,
);
});

View File

@ -4,7 +4,7 @@
use crate::{
cluster_info::ClusterInfo,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
poh_service::{self, PohService},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
@ -293,6 +293,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
test_fn: Option<impl Fn()>,
banking_stage_stats: Option<&BankingStageStats>,
recorder: &TransactionRecorder,
) {
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
@ -321,7 +322,7 @@ impl BankingStage {
Self::process_packets_transactions(
&bank,
&bank_creation_time,
&poh_recorder,
&recorder,
&msgs,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
@ -426,6 +427,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
) -> BufferedPacketsDecision {
let bank_start;
let (
@ -465,6 +467,7 @@ impl BankingStage {
gossip_vote_sender,
None::<Box<dyn Fn()>>,
Some(banking_stage_stats),
recorder,
);
}
BufferedPacketsDecision::Forward => {
@ -542,6 +545,7 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
let banking_stage_stats = BankingStageStats::new(id);
@ -550,13 +554,14 @@ impl BankingStage {
let decision = Self::process_buffered_packets(
&my_pubkey,
&socket,
poh_recorder,
&poh_recorder,
cluster_info,
&mut buffered_packets,
enable_forwarding,
transaction_status_sender.clone(),
&gossip_vote_sender,
&banking_stage_stats,
&recorder,
);
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
@ -590,6 +595,7 @@ impl BankingStage {
&mut buffered_packets,
&banking_stage_stats,
duplicates,
&recorder,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
@ -623,7 +629,7 @@ impl BankingStage {
bank_slot: Slot,
txs: &[Transaction],
results: &[TransactionExecutionResult],
poh: &Arc<Mutex<PohRecorder>>,
recorder: &TransactionRecorder,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut processed_generation = Measure::start("record::process_generation");
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results
@ -653,10 +659,7 @@ impl BankingStage {
let mut poh_record = Measure::start("record::poh_record");
// record and unlock will unlock all the successful transactions
let res = poh
.lock()
.unwrap()
.record(bank_slot, hash, processed_transactions);
let res = recorder.record(bank_slot, hash, processed_transactions);
match res {
Ok(()) => (),
Err(PohRecorderError::MaxHeightReached) => {
@ -681,7 +684,7 @@ impl BankingStage {
fn process_and_record_transactions_locked(
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@ -799,7 +802,7 @@ impl BankingStage {
pub fn process_and_record_transactions(
bank: &Arc<Bank>,
txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@ -843,7 +846,7 @@ impl BankingStage {
bank: &Arc<Bank>,
bank_creation_time: &Instant,
transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (usize, Vec<usize>) {
@ -1014,7 +1017,7 @@ impl BankingStage {
fn process_packets_transactions(
bank: &Arc<Bank>,
bank_creation_time: &Instant,
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
msgs: &Packets,
packet_indexes: Vec<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
@ -1128,6 +1131,7 @@ impl BankingStage {
buffered_packets: &mut UnprocessedPackets,
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
@ -1170,7 +1174,7 @@ impl BankingStage {
Self::process_packets_transactions(
&bank,
&bank_creation_time,
&poh,
recorder,
&msgs,
packet_indexes,
transaction_status_sender.clone(),
@ -1306,7 +1310,7 @@ pub fn create_test_recorder(
) {
let exit = Arc::new(AtomicBool::new(false));
let poh_config = Arc::new(poh_config.unwrap_or_default());
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -1327,6 +1331,7 @@ pub fn create_test_recorder(
bank.ticks_per_slot(),
poh_service::DEFAULT_PINNED_CPU_CORE,
poh_service::DEFAULT_HASHES_PER_BATCH,
record_receiver,
);
(exit, poh_recorder, poh_service, entry_receiver)
@ -1336,7 +1341,7 @@ pub fn create_test_recorder(
mod tests {
use super::*;
use crate::{
cluster_info::Node, poh_recorder::WorkingBank,
cluster_info::Node, poh_recorder::Record, poh_recorder::WorkingBank,
transaction_status_service::TransactionStatusService,
};
use crossbeam_channel::unbounded;
@ -1356,7 +1361,15 @@ mod tests {
transaction::TransactionError,
};
use solana_transaction_status::TransactionWithStatusMeta;
use std::{net::SocketAddr, path::Path, sync::atomic::Ordering, thread::sleep};
use std::{
net::SocketAddr,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Receiver,
},
thread::sleep,
};
#[test]
fn test_banking_stage_shutdown1() {
@ -1681,6 +1694,8 @@ mod tests {
#[test]
fn test_bank_record_transactions() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
@ -1698,7 +1713,8 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
// TODO use record_receiver
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -1709,8 +1725,11 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
@ -1722,12 +1741,8 @@ mod tests {
];
let mut results = vec![(Ok(()), None), (Ok(()), None)];
let _ = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
let _ =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), transactions.len());
@ -1739,12 +1754,8 @@ mod tests {
)),
None,
);
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
res.unwrap();
assert!(retryable.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
@ -1752,12 +1763,8 @@ mod tests {
// Other TransactionErrors should not be recorded
results[0] = (Err(TransactionError::AccountNotFound), None);
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
res.unwrap();
assert!(retryable.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
@ -1770,7 +1777,7 @@ mod tests {
bank.slot() + 1,
&transactions,
&results,
&poh_recorder,
&recorder,
);
assert_matches!(res, Err(PohRecorderError::MaxHeightReached));
// The first result was an error so it's filtered out. The second result was Ok(),
@ -1778,6 +1785,9 @@ mod tests {
assert_eq!(retryable, vec![1]);
// Should receive nothing from PohRecorder b/c record failed
assert!(entry_receiver.try_recv().is_err());
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}
@ -2049,7 +2059,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2060,15 +2070,18 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
None,
&gossip_vote_sender,
@ -2105,7 +2118,7 @@ mod tests {
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
None,
&gossip_vote_sender,
@ -2114,11 +2127,36 @@ mod tests {
Err(PohRecorderError::MaxHeightReached)
);
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
assert_eq!(bank.get_balance(&pubkey), 1);
}
Blockstore::destroy(&ledger_path).unwrap();
}
/// Test helper: spawns a background thread that services `record_receiver` on
/// behalf of `poh_recorder`, standing in for a real `PohService`.
///
/// Returns the thread's join handle together with the flag that stops it.
/// The thread handles at least one receive attempt (10 ms timeout each) and
/// checks the flag after every attempt, so callers stop it by storing `true`
/// into the flag and then joining the handle.
fn simulate_poh(
    record_receiver: Receiver<Record>,
    poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> (JoinHandle<()>, Arc<AtomicBool>) {
    let exit = Arc::new(AtomicBool::new(false));
    let exit_flag = exit.clone();
    let shared_recorder = poh_recorder.clone();
    let worker = move || {
        // Process first, test the flag afterwards: one pass always runs.
        let mut keep_running = true;
        while keep_running {
            PohService::read_record_receiver_and_process(
                &shared_recorder,
                &record_receiver,
                Duration::from_millis(10),
            );
            keep_running = !exit_flag.load(Ordering::Relaxed);
        }
    };
    let handle = Builder::new()
        .name("solana-simulate_poh".to_string())
        .spawn(worker)
        .unwrap();
    (handle, exit)
}
#[test]
fn test_bank_process_and_record_transactions_account_in_use() {
solana_logger::setup();
@ -2147,7 +2185,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2158,21 +2196,27 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (result, unprocessed) = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
None,
&gossip_vote_sender,
);
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
assert!(result.is_ok());
assert_eq!(unprocessed.len(), 1);
}
@ -2242,7 +2286,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2254,9 +2298,12 @@ mod tests {
&Arc::new(PohConfig::default()),
);
// Poh Recorder has not working bank, so should throw MaxHeightReached error on
// Poh Recorder has no working bank, so should throw MaxHeightReached error on
// record
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let recorder = poh_recorder.recorder();
let (poh_simulator, exit) =
simulate_poh(record_receiver, &Arc::new(Mutex::new(poh_recorder)));
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
@ -2265,7 +2312,7 @@ mod tests {
&bank,
&Instant::now(),
&transactions,
&poh_recorder,
&recorder,
None,
&gossip_vote_sender,
);
@ -2275,6 +2322,9 @@ mod tests {
retryable_txs.sort_unstable();
let expected: Vec<usize> = (0..transactions.len()).collect();
assert_eq!(retryable_txs, expected);
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
@ -2321,7 +2371,7 @@ mod tests {
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let blockstore = Arc::new(blockstore);
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2332,8 +2382,11 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let shreds = entries_to_test_shreds(entries, bank.slot(), 0, true, 0);
@ -2353,7 +2406,7 @@ mod tests {
let _ = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
Some(TransactionStatusSender {
sender: transaction_status_sender,
@ -2386,10 +2439,14 @@ mod tests {
assert_eq!(meta, None);
}
}
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}
#[allow(clippy::type_complexity)]
fn setup_conflicting_transactions(
ledger_path: &Path,
) -> (
@ -2397,6 +2454,8 @@ mod tests {
Arc<Bank>,
Arc<Mutex<PohRecorder>>,
Receiver<WorkingBankEntry>,
JoinHandle<()>,
Arc<AtomicBool>,
) {
Blockstore::destroy(&ledger_path).unwrap();
let genesis_config_info = create_genesis_config(10_000);
@ -2408,7 +2467,7 @@ mod tests {
let blockstore =
Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger");
let bank = Arc::new(Bank::new(&genesis_config));
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2430,15 +2489,25 @@ mod tests {
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey2, 1, genesis_config.hash()),
];
(transactions, bank, poh_recorder, entry_receiver)
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
(
transactions,
bank,
poh_recorder,
entry_receiver,
poh_simulator,
exit,
)
}
#[test]
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path!();
{
let (transactions, bank, poh_recorder, _entry_receiver) =
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) =
setup_conflicting_transactions(&ledger_path);
let recorder = poh_recorder.lock().unwrap().recorder();
let num_conflicting_transactions = transactions.len();
let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions);
assert_eq!(packets_vec.len(), 1);
@ -2466,6 +2535,7 @@ mod tests {
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
&recorder,
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
@ -2481,6 +2551,7 @@ mod tests {
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
&recorder,
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
@ -2488,6 +2559,8 @@ mod tests {
assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed);
}
}
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}
@ -2496,7 +2569,7 @@ mod tests {
fn test_consume_buffered_packets_interrupted() {
let ledger_path = get_tmp_ledger_path!();
{
let (transactions, bank, poh_recorder, _entry_receiver) =
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) =
setup_conflicting_transactions(&ledger_path);
let num_conflicting_transactions = transactions.len();
let packets_vec = to_packets_chunked(&transactions, 1);
@ -2524,6 +2597,7 @@ mod tests {
let interrupted_iteration = 1;
poh_recorder.lock().unwrap().set_bank(&bank);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.lock().unwrap().recorder();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// Start up thread to process the banks
let t_consume = Builder::new()
@ -2538,6 +2612,7 @@ mod tests {
&gossip_vote_sender,
test_fn,
None,
&recorder,
);
// Check everything is correct. All indexes after `interrupted_iteration`
@ -2571,6 +2646,8 @@ mod tests {
}
t_consume.join().unwrap();
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}

View File

@ -51,6 +51,80 @@ type Result<T> = std::result::Result<T, PohRecorderError>;
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
pub type BankStart = (Arc<Bank>, Arc<Instant>);
/// A single request to record a batch of transactions, handed over a channel
/// to the thread that owns the PoH recorder.
pub struct Record {
    /// Hash passed as the `mixin` argument when the request is recorded.
    pub mixin: Hash,
    /// Transactions to record.
    pub transactions: Vec<Transaction>,
    /// Slot the transactions are being recorded for.
    pub slot: Slot,
    /// Channel on which the outcome of the record call is sent back to the requester.
    pub sender: Sender<Result<()>>,
}

impl Record {
    /// Bundles the arguments of a record call with the channel used to reply.
    pub fn new(
        mixin: Hash,
        transactions: Vec<Transaction>,
        slot: Slot,
        sender: Sender<Result<()>>,
    ) -> Self {
        Record {
            mixin,
            transactions,
            slot,
            sender,
        }
    }
}
/// Handle through which a caller asks the PoH recorder to record transactions
/// without taking the recorder's lock itself.
///
/// Requests are sent over `record_sender`; the outcome of each request comes
/// back on this handle's own result channel.
pub struct TransactionRecorder {
    // shared by all users of PohRecorder
    pub record_sender: Sender<Record>,
    // unique to this caller
    pub result_sender: Sender<Result<()>>,
    pub result_receiver: Receiver<Result<()>>,
}

impl Clone for TransactionRecorder {
    /// Clones the shared request sender but gives the clone a brand-new result
    /// channel, so replies are never shared between handles.
    fn clone(&self) -> Self {
        TransactionRecorder::new(self.record_sender.clone())
    }
}

impl TransactionRecorder {
    /// Creates a handle that submits requests on `record_sender` and owns a
    /// fresh result channel for the replies.
    pub fn new(record_sender: Sender<Record>) -> Self {
        let (result_sender, result_receiver) = channel();
        Self {
            // shared
            record_sender,
            // unique to this caller
            result_sender,
            result_receiver,
        }
    }

    /// Submits `transactions` (with `mixin`) for recording in `bank_slot` and
    /// waits up to 2000 ms for the outcome.
    ///
    /// # Errors
    ///
    /// Returns whatever error the servicing side replies with. Returns
    /// `PohRecorderError::MaxHeightReached` when the request channel is closed
    /// or when no reply arrives before the timeout.
    pub fn record(
        &self,
        bank_slot: Slot,
        mixin: Hash,
        transactions: Vec<Transaction>,
    ) -> Result<()> {
        // A reply already queued before this request is even sent cannot belong
        // to it: it is the late answer to an earlier call that timed out.
        // Discard such replies so they are not reported as this call's result.
        // NOTE(review): a late reply that arrives *after* this drain can still
        // be mistaken for the current one; closing that window fully needs a
        // per-request reply channel or a request id.
        while self.result_receiver.try_recv().is_ok() {}

        let request = Record::new(mixin, transactions, bank_slot, self.result_sender.clone());
        if self.record_sender.send(request).is_err() {
            // If the channel is dropped, then the validator is shutting down so return that we are hitting
            // the max tick height to stop transaction processing and flush any transactions in the pipeline.
            return Err(PohRecorderError::MaxHeightReached);
        }

        match self
            .result_receiver
            .recv_timeout(std::time::Duration::from_millis(2000))
        {
            Ok(result) => result,
            // No answer in time: report it the same way as a closed channel.
            Err(_err) => Err(PohRecorderError::MaxHeightReached),
        }
    }
}
#[derive(Clone)]
pub struct WorkingBank {
pub bank: Arc<Bank>,
@ -81,6 +155,7 @@ pub struct PohRecorder {
tick_overhead_us: u64,
record_us: u64,
last_metric: Instant,
record_sender: Sender<Record>,
}
impl PohRecorder {
@ -157,6 +232,10 @@ impl PohRecorder {
self.ticks_per_slot
}
/// Returns a new `TransactionRecorder` built from a clone of this recorder's
/// `record_sender`.
pub fn recorder(&self) -> TransactionRecorder {
    let record_sender = self.record_sender.clone();
    TransactionRecorder::new(record_sender)
}
fn is_same_fork_as_previous_leader(&self, slot: Slot) -> bool {
(slot.saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS)..slot).any(|slot| {
// Check if the last slot Poh reset to was any of the
@ -479,12 +558,13 @@ impl PohRecorder {
clear_bank_signal: Option<SyncSender<bool>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
poh_config: &Arc<PohConfig>,
) -> (Self, Receiver<WorkingBankEntry>) {
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
let poh = Arc::new(Mutex::new(Poh::new(
last_entry_hash,
poh_config.hashes_per_tick,
)));
let (sender, receiver) = channel();
let (record_sender, record_receiver) = channel();
let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
Self::compute_leader_slot_tick_heights(next_leader_slot, ticks_per_slot);
(
@ -510,8 +590,10 @@ impl PohRecorder {
record_us: 0,
tick_overhead_us: 0,
last_metric: Instant::now(),
record_sender,
},
receiver,
record_receiver,
)
}
@ -528,7 +610,7 @@ impl PohRecorder {
blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
poh_config: &Arc<PohConfig>,
) -> (Self, Receiver<WorkingBankEntry>) {
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
Self::new_with_clear_signal(
tick_height,
last_entry_hash,
@ -580,7 +662,7 @@ mod tests {
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -607,7 +689,7 @@ mod tests {
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -633,7 +715,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -661,7 +743,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -697,7 +779,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -748,7 +830,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -797,7 +879,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -835,7 +917,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -877,7 +959,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -923,7 +1005,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -967,7 +1049,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1004,7 +1086,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1031,7 +1113,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1059,7 +1141,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1092,7 +1174,7 @@ mod tests {
.expect("Expected to be able to open database ledger");
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1126,18 +1208,19 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let (sender, receiver) = sync_channel(1);
let (mut poh_recorder, _entry_receiver) = PohRecorder::new_with_clear_signal(
0,
Hash::default(),
0,
None,
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
Some(sender),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
let (mut poh_recorder, _entry_receiver, _record_receiver) =
PohRecorder::new_with_clear_signal(
0,
Hash::default(),
0,
None,
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
Some(sender),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.clear_bank();
assert!(receiver.try_recv().is_ok());
@ -1160,7 +1243,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1209,7 +1292,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1271,7 +1354,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1400,7 +1483,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1468,7 +1551,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_config));
let genesis_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
bank.last_blockhash(),
0,

View File

@ -1,12 +1,13 @@
//! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::PohRecorder;
use crate::poh_recorder::{PohRecorder, Record};
use solana_ledger::poh::Poh;
use solana_measure::measure::Measure;
use solana_sdk::poh_config::PohConfig;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{mpsc::Receiver, Arc, Mutex};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Instant;
use std::time::{Duration, Instant};
pub struct PohService {
tick_producer: JoinHandle<()>,
@ -24,6 +25,54 @@ pub const DEFAULT_PINNED_CPU_CORE: usize = 0;
const TARGET_SLOT_ADJUSTMENT_NS: u64 = 50_000_000;
/// Counters accumulated between two metric reports; `report` submits them as
/// a "poh-service" datapoint and resets them.
#[derive(Debug)]
struct PohTiming {
    num_ticks: u64,
    num_hashes: u64,
    total_sleep_us: u64,
    total_lock_time_ns: u64,
    total_hash_time_ns: u64,
    total_tick_time_ns: u64,
    // Start of the current reporting window.
    last_metric: Instant,
}

impl PohTiming {
    /// Creates zeroed counters with the reporting window starting now.
    fn new() -> Self {
        Self {
            num_ticks: 0,
            num_hashes: 0,
            total_sleep_us: 0,
            total_lock_time_ns: 0,
            total_hash_time_ns: 0,
            total_tick_time_ns: 0,
            last_metric: Instant::now(),
        }
    }

    /// Submits the accumulated counters as a "poh-service" datapoint and starts
    /// a new window, but only once more than 1000 ms have passed since the
    /// previous report; otherwise does nothing.
    ///
    /// The `elapsed_us` field is the window length scaled to one slot:
    /// `elapsed_us * ticks_per_slot / num_ticks`.
    fn report(&mut self, ticks_per_slot: u64) {
        // Read the clock once so the threshold test and the reported value agree.
        let elapsed = self.last_metric.elapsed();
        if elapsed.as_millis() > 1000 {
            let elapsed_us = elapsed.as_micros() as u64;
            // A window without any tick would otherwise divide by zero and
            // panic; report 0 for such a window instead.
            let us_per_slot = (elapsed_us * ticks_per_slot)
                .checked_div(self.num_ticks)
                .unwrap_or(0);
            datapoint_info!(
                "poh-service",
                ("ticks", self.num_ticks as i64, i64),
                ("hashes", self.num_hashes as i64, i64),
                ("elapsed_us", us_per_slot, i64),
                ("total_sleep_us", self.total_sleep_us, i64),
                ("total_tick_time_us", self.total_tick_time_ns / 1000, i64),
                ("total_lock_time_us", self.total_lock_time_ns / 1000, i64),
                ("total_hash_time_us", self.total_hash_time_ns / 1000, i64),
            );
            // Zero every counter and restart the window.
            *self = Self::new();
        }
    }
}
impl PohService {
pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>,
@ -32,6 +81,7 @@ impl PohService {
ticks_per_slot: u64,
pinned_cpu_core: usize,
hashes_per_batch: u64,
record_receiver: Receiver<Record>,
) -> Self {
let poh_exit_ = poh_exit.clone();
let poh_config = poh_config.clone();
@ -41,12 +91,18 @@ impl PohService {
solana_sys_tuner::request_realtime_poh();
if poh_config.hashes_per_tick.is_none() {
if poh_config.target_tick_count.is_none() {
Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_);
Self::sleepy_tick_producer(
poh_recorder,
&poh_config,
&poh_exit_,
record_receiver,
);
} else {
Self::short_lived_sleepy_tick_producer(
poh_recorder,
&poh_config,
&poh_exit_,
record_receiver,
);
}
} else {
@ -69,6 +125,7 @@ impl PohService {
poh_config.target_tick_duration.as_nanos() as u64 - adjustment_per_tick,
ticks_per_slot,
hashes_per_batch,
record_receiver,
);
}
poh_exit_.store(true, Ordering::Relaxed);
@ -82,20 +139,53 @@ impl PohService {
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
) {
while !poh_exit.load(Ordering::Relaxed) {
Self::read_record_receiver_and_process(
&poh_recorder,
&record_receiver,
Duration::from_millis(0),
);
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
}
}
/// Waits up to `timeout` for a single `Record` request and services it.
///
/// If a request arrives, it is recorded through `poh_recorder` (taking its
/// mutex) and the result is sent back on the request's own `sender`.
/// If nothing is received (timeout or any other receive error), the function
/// returns without doing anything.
///
/// # Panics
///
/// Panics if the PohRecorder mutex is poisoned, or if the result cannot be
/// sent back to the requester.
pub fn read_record_receiver_and_process(
    poh_recorder: &Arc<Mutex<PohRecorder>>,
    record_receiver: &Receiver<Record>,
    timeout: Duration,
) {
    let request = match record_receiver.recv_timeout(timeout) {
        Ok(request) => request,
        Err(_) => return,
    };

    // The guard stays alive until the reply has been handed to the channel,
    // and is released before the failure check below.
    let reply = {
        let mut recorder = poh_recorder.lock().unwrap();
        let outcome = recorder.record(request.slot, request.mixin, request.transactions);
        request.sender.send(outcome)
    };

    if reply.is_err() {
        panic!("Error returning mixin hash");
    }
}
fn short_lived_sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
) {
let mut warned = false;
for _ in 0..poh_config.target_tick_count.unwrap() {
Self::read_record_receiver_and_process(
&poh_recorder,
&record_receiver,
Duration::from_millis(0),
);
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
if poh_exit.load(Ordering::Relaxed) && !warned {
@ -105,78 +195,121 @@ impl PohService {
}
}
/// Performs one unit of PoH work: either services queued record requests or
/// hashes until something else needs to happen.
///
/// * If `next_record` holds a request, the PohRecorder mutex is taken once and
///   that request, plus any further requests already waiting on
///   `record_receiver`, are recorded back to back. Returns `false`.
/// * Otherwise the `Poh` mutex is taken and batches of `hashes_per_batch`
///   hashes are computed. Returns `true` as soon as `hash` reports that a tick
///   is required. If a record request shows up first, it is stashed in
///   `next_record` for the next call and `false` is returned.
///
/// Lock-wait and hashing durations, and hash counts, are accumulated into
/// `timing`.
fn record_or_hash(
    next_record: &mut Option<Record>,
    poh_recorder: &Arc<Mutex<PohRecorder>>,
    timing: &mut PohTiming,
    record_receiver: &Receiver<Record>,
    hashes_per_batch: u64,
    poh: &Arc<Mutex<Poh>>,
) -> bool {
    if let Some(mut pending) = next_record.take() {
        // A request is waiting: drain it and everything queued behind it
        // while holding the PohRecorder mutex only once.
        let mut lock_timer = Measure::start("lock");
        let mut recorder = poh_recorder.lock().unwrap();
        lock_timer.stop();
        timing.total_lock_time_ns += lock_timer.as_ns();
        loop {
            let outcome = recorder.record(
                pending.slot,
                pending.mixin,
                std::mem::take(&mut pending.transactions),
            );
            // A failed reply is deliberately ignored for now.
            let _ = pending.sender.send(outcome);
            // Counts the record as one hash; record may also have ticked internally.
            timing.num_hashes += 1;
            match record_receiver.try_recv() {
                // Another request is already queued: handle it under the same lock.
                Ok(queued) => pending = queued,
                // Queue is empty. record is expected to have ticked if one was
                // due, so no tick is requested from the caller.
                Err(_) => return false,
            }
        }
    }

    // Nothing to record: hash in batches until a tick is due or a record
    // request arrives.
    let mut lock_timer = Measure::start("lock");
    let mut hasher = poh.lock().unwrap();
    lock_timer.stop();
    timing.total_lock_time_ns += lock_timer.as_ns();
    loop {
        timing.num_hashes += hashes_per_batch;
        let mut hash_timer = Measure::start("hash");
        let tick_due = hasher.hash(hashes_per_batch);
        hash_timer.stop();
        timing.total_hash_time_ns += hash_timer.as_ns();
        if tick_due {
            // Nothing else can be done until the caller ticks.
            return true;
        }
        if let Ok(request) = record_receiver.try_recv() {
            // Remember the request so the next call records it.
            *next_record = Some(request);
            return false;
        }
    }
}
fn tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_exit: &AtomicBool,
target_tick_ns: u64,
ticks_per_slot: u64,
hashes_per_batch: u64,
record_receiver: Receiver<Record>,
) {
let poh = poh_recorder.lock().unwrap().poh.clone();
let mut now = Instant::now();
let mut last_metric = Instant::now();
let mut num_ticks = 0;
let mut num_hashes = 0;
let mut total_sleep_us = 0;
let mut total_lock_time_ns = 0;
let mut total_hash_time_ns = 0;
let mut total_tick_time_ns = 0;
let mut timing = PohTiming::new();
let mut next_record = None;
loop {
num_hashes += hashes_per_batch;
let should_tick = {
let mut lock_time = Measure::start("lock");
let mut poh_l = poh.lock().unwrap();
lock_time.stop();
total_lock_time_ns += lock_time.as_ns();
let mut hash_time = Measure::start("hash");
let r = poh_l.hash(hashes_per_batch);
hash_time.stop();
total_hash_time_ns += hash_time.as_ns();
r
};
let should_tick = Self::record_or_hash(
&mut next_record,
&poh_recorder,
&mut timing,
&record_receiver,
hashes_per_batch,
&poh,
);
if should_tick {
// Lock PohRecorder only for the final hash...
// Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing.
{
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
lock_time.stop();
total_lock_time_ns += lock_time.as_ns();
timing.total_lock_time_ns += lock_time.as_ns();
let mut tick_time = Measure::start("tick");
poh_recorder_l.tick();
tick_time.stop();
total_tick_time_ns += tick_time.as_ns();
timing.total_tick_time_ns += tick_time.as_ns();
}
num_ticks += 1;
timing.num_ticks += 1;
let elapsed_ns = now.elapsed().as_nanos() as u64;
// sleep is not accurate enough to get a predictable time.
// Kernel can not schedule the thread for a while.
while (now.elapsed().as_nanos() as u64) < target_tick_ns {
std::hint::spin_loop();
}
total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000;
timing.total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000;
now = Instant::now();
if last_metric.elapsed().as_millis() > 1000 {
let elapsed_us = last_metric.elapsed().as_micros() as u64;
let us_per_slot = (elapsed_us * ticks_per_slot) / num_ticks;
datapoint_info!(
"poh-service",
("ticks", num_ticks as i64, i64),
("hashes", num_hashes as i64, i64),
("elapsed_us", us_per_slot, i64),
("total_sleep_us", total_sleep_us, i64),
("total_tick_time_us", total_tick_time_ns / 1000, i64),
("total_lock_time_us", total_lock_time_ns / 1000, i64),
("total_hash_time_us", total_hash_time_ns / 1000, i64),
);
total_sleep_us = 0;
num_ticks = 0;
num_hashes = 0;
total_tick_time_ns = 0;
total_lock_time_ns = 0;
total_hash_time_ns = 0;
last_metric = Instant::now();
}
timing.report(ticks_per_slot);
if poh_exit.load(Ordering::Relaxed) {
break;
}
@ -225,7 +358,7 @@ mod tests {
target_tick_duration,
target_tick_count: None,
});
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
prev_hash,
bank.slot(),
@ -305,6 +438,7 @@ mod tests {
0,
DEFAULT_PINNED_CPU_CORE,
hashes_per_batch,
record_receiver,
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);

View File

@ -801,7 +801,7 @@ mod test {
);
let bank = Arc::new(Bank::new(&genesis_config));
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
bank.last_blockhash(),
0,

View File

@ -490,24 +490,25 @@ impl Validator {
);
let poh_config = Arc::new(genesis_config.poh_config.clone());
let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
leader_schedule_cache.next_leader_slot(
&id,
let (mut poh_recorder, entry_receiver, record_receiver) =
PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
&bank,
Some(&blockstore),
GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS,
),
bank.ticks_per_slot(),
&id,
&blockstore,
blockstore.new_shreds_signals.first().cloned(),
&leader_schedule_cache,
&poh_config,
);
leader_schedule_cache.next_leader_slot(
&id,
bank.slot(),
&bank,
Some(&blockstore),
GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS,
),
bank.ticks_per_slot(),
&id,
&blockstore,
blockstore.new_shreds_signals.first().cloned(),
&leader_schedule_cache,
&poh_config,
);
if config.snapshot_config.is_some() {
poh_recorder.set_bank(&bank);
}
@ -655,6 +656,7 @@ impl Validator {
bank.ticks_per_slot(),
config.poh_pinned_cpu_core,
config.poh_hashes_per_batch,
record_receiver,
);
assert_eq!(
blockstore.new_shreds_signals.len(),