Poh subsystem cleanup, genesis plumbing, enable real PoH on edge testnet (#4292)

* Remove unused PohServiceConfig::Step

* Clarify variable name

* Poh::hash() now takes an iteration counter

* man -> max

* Inline functions with single call site

* Move PohServiceConfig into GenesisBlock

* Add plumbing to enable real PoH on testnets

* Batch hashes to improve PoH hash rate

* Ensure a constant hashes_per_tick

* Remove PohEntry mixin field

* Poh/PohEntry no longer maintains tick_height

* Ensure a constant hashes_per_tick

* ci/localnet-sanity.sh: Use real PoH

* Rework Poh/PohService to keep PohRecorder unlocked as much as possible while hashing
This commit is contained in:
Michael Vines
2019-05-18 14:01:36 -07:00
committed by GitHub
parent 7e1a7862db
commit 392a39dd54
26 changed files with 625 additions and 319 deletions

View File

@ -9,7 +9,7 @@ use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::packet;
use crate::packet::{Packet, Packets};
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
use crate::poh_service::{PohService, PohServiceConfig};
use crate::poh_service::PohService;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
@ -19,6 +19,7 @@ use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counte
use solana_runtime::accounts_db::ErrorCounters;
use solana_runtime::bank::Bank;
use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{
self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES,
@ -755,6 +756,7 @@ pub fn create_test_recorder(
Receiver<WorkingBankEntries>,
) {
let exit = Arc::new(AtomicBool::new(false));
let poh_config = Arc::new(PohConfig::default());
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
@ -764,11 +766,12 @@ pub fn create_test_recorder(
&Pubkey::default(),
blocktree,
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&poh_config,
);
poh_recorder.set_bank(&bank);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit);
let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
(exit, poh_recorder, poh_service, entry_receiver)
}
@ -1082,6 +1085,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
@ -1367,6 +1371,7 @@ mod tests {
&pubkey,
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
@ -1451,6 +1456,7 @@ mod tests {
&pubkey,
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));

View File

@ -8,21 +8,22 @@ use crate::contact_info::ContactInfo;
use crate::entry::{Entry, EntrySlice};
use crate::gossip_service::discover_cluster;
use crate::locktower::VOTE_THRESHOLD_DEPTH;
use crate::poh_service::PohServiceConfig;
use solana_client::thin_client::create_client;
use solana_runtime::epoch_schedule::MINIMUM_SLOT_LENGTH;
use solana_sdk::client::SyncClient;
use solana_sdk::hash::Hash;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction;
use solana_sdk::timing::{
duration_as_ms, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND,
duration_as_ms, DEFAULT_NUM_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT,
NUM_CONSECUTIVE_LEADER_SLOTS,
};
use solana_sdk::transport::TransportError;
use std::thread::sleep;
use std::time::Duration;
const SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / NUM_TICKS_PER_SECOND;
const DEFAULT_SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_NUM_TICKS_PER_SECOND;
/// Spend and verify from every node in the network
pub fn spend_and_verify_all_nodes(
@ -87,7 +88,7 @@ pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) {
let client = create_client(node.client_facing_addr(), FULLNODE_PORT_RANGE);
assert!(client.fullnode_exit().unwrap());
}
sleep(Duration::from_millis(SLOT_MILLIS));
sleep(Duration::from_millis(DEFAULT_SLOT_MILLIS));
for node in &cluster_nodes {
let client = create_client(node.client_facing_addr(), FULLNODE_PORT_RANGE);
assert!(client.fullnode_exit().is_err());
@ -129,21 +130,15 @@ pub fn verify_ledger_ticks(ledger_path: &str, ticks_per_slot: usize) {
pub fn sleep_n_epochs(
num_epochs: f64,
config: &PohServiceConfig,
config: &PohConfig,
ticks_per_slot: u64,
slots_per_epoch: u64,
) {
let num_ticks_per_second = {
match config {
PohServiceConfig::Sleep(d) => (1000 / duration_as_ms(d)) as f64,
_ => panic!("Unsuppported tick config for testing"),
}
};
let num_ticks_per_second = (1000 / duration_as_ms(&config.target_tick_duration)) as f64;
let num_ticks_to_sleep = num_epochs * ticks_per_slot as f64 * slots_per_epoch as f64;
sleep(Duration::from_secs(
((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64,
));
let secs = ((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64;
warn!("sleep_n_epochs: {} seconds", secs);
sleep(Duration::from_secs(secs));
}
pub fn kill_entry_and_spend_and_verify_rest(

View File

@ -171,16 +171,12 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -
return *start_hash;
}
let mut poh = Poh::new(*start_hash, 0);
for _ in 1..num_hashes {
poh.hash();
}
let mut poh = Poh::new(*start_hash, None);
poh.hash(num_hashes.saturating_sub(1));
if transactions.is_empty() {
poh.tick().hash
poh.tick().unwrap().hash
} else {
poh.record(hash_transactions(transactions)).hash
poh.record(hash_transactions(transactions)).unwrap().hash
}
}

View File

@ -8,7 +8,7 @@ use crate::contact_info::ContactInfo;
use crate::gossip_service::{discover_cluster, GossipService};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_recorder::PohRecorder;
use crate::poh_service::{PohService, PohServiceConfig};
use crate::poh_service::PohService;
use crate::rpc::JsonRpcConfig;
use crate::rpc_pubsub_service::PubSubService;
use crate::rpc_service::JsonRpcService;
@ -20,6 +20,7 @@ use crate::tvu::{Sockets, Tvu};
use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp;
@ -36,7 +37,6 @@ pub struct FullnodeConfig {
pub voting_disabled: bool,
pub blockstream: Option<String>,
pub storage_rotate_count: u64,
pub tick_config: PohServiceConfig,
pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
}
@ -51,7 +51,6 @@ impl Default for FullnodeConfig {
voting_disabled: false,
blockstream: None,
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
tick_config: PohServiceConfig::default(),
account_paths: None,
rpc_config: JsonRpcConfig::default(),
}
@ -101,6 +100,7 @@ impl Fullnode {
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
poh_config,
) = new_banks_from_blocktree(ledger_path, config.account_paths.clone());
let leader_schedule_cache = Arc::new(leader_schedule_cache);
@ -115,6 +115,7 @@ impl Fullnode {
);
let blocktree = Arc::new(blocktree);
let poh_config = Arc::new(poh_config);
let (poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
@ -125,9 +126,10 @@ impl Fullnode {
&blocktree,
blocktree.new_blobs_signals.first().cloned(),
&leader_schedule_cache,
&poh_config,
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit);
let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
assert_eq!(
blocktree.new_blobs_signals.len(),
1,
@ -298,6 +300,7 @@ pub fn new_banks_from_blocktree(
Receiver<bool>,
CompletedSlotsReceiver,
LeaderScheduleCache,
PohConfig,
) {
let genesis_block =
GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block");
@ -317,6 +320,7 @@ pub fn new_banks_from_blocktree(
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
genesis_block.poh_config,
)
}

View File

@ -11,6 +11,7 @@ use solana_client::thin_client::create_client;
use solana_client::thin_client::ThinClient;
use solana_sdk::client::SyncClient;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
@ -62,6 +63,7 @@ pub struct ClusterConfig {
pub ticks_per_slot: u64,
pub slots_per_epoch: u64,
pub native_instruction_processors: Vec<(String, Pubkey)>,
pub poh_config: PohConfig,
}
impl Default for ClusterConfig {
@ -75,6 +77,7 @@ impl Default for ClusterConfig {
ticks_per_slot: DEFAULT_TICKS_PER_SLOT,
slots_per_epoch: DEFAULT_SLOTS_PER_EPOCH,
native_instruction_processors: vec![],
poh_config: PohConfig::default(),
}
}
}
@ -120,6 +123,7 @@ impl LocalCluster {
);
genesis_block.ticks_per_slot = config.ticks_per_slot;
genesis_block.slots_per_epoch = config.slots_per_epoch;
genesis_block.poh_config = config.poh_config.clone();
genesis_block
.native_instruction_processors
.extend_from_slice(&config.native_instruction_processors);

View File

@ -1,94 +1,111 @@
//! The `Poh` module provhashes an object for generating a Proof of History.
//! It records Hashes items on behalf of its users.
//! The `Poh` module provides an object for generating a Proof of History.
use solana_sdk::hash::{hash, hashv, Hash};
pub struct Poh {
pub hash: Hash,
num_hashes: u64,
pub tick_height: u64,
hashes_per_tick: u64,
remaining_hashes: u64,
}
#[derive(Debug)]
pub struct PohEntry {
pub tick_height: u64,
pub num_hashes: u64,
pub hash: Hash,
pub mixin: Option<Hash>,
}
impl Poh {
pub fn new(hash: Hash, tick_height: u64) -> Self {
pub fn new(hash: Hash, hashes_per_tick: Option<u64>) -> Self {
let hashes_per_tick = hashes_per_tick.unwrap_or(std::u64::MAX);
assert!(hashes_per_tick > 1);
Poh {
num_hashes: 0,
hash,
tick_height,
num_hashes: 0,
hashes_per_tick,
remaining_hashes: hashes_per_tick,
}
}
pub fn hash(&mut self) {
self.hash = hash(&self.hash.as_ref());
self.num_hashes += 1;
pub fn reset(&mut self, hash: Hash, hashes_per_tick: Option<u64>) {
let mut poh = Poh::new(hash, hashes_per_tick);
std::mem::swap(&mut poh, self);
}
pub fn record(&mut self, mixin: Hash) -> PohEntry {
self.hash = hashv(&[&self.hash.as_ref(), &mixin.as_ref()]);
pub fn hash(&mut self, max_num_hashes: u64) -> bool {
let num_hashes = std::cmp::min(self.remaining_hashes - 1, max_num_hashes);
for _ in 0..num_hashes {
self.hash = hash(&self.hash.as_ref());
}
self.num_hashes += num_hashes;
self.remaining_hashes -= num_hashes;
assert!(self.remaining_hashes > 0);
self.remaining_hashes == 1 // Return `true` if caller needs to `tick()` next
}
pub fn record(&mut self, mixin: Hash) -> Option<PohEntry> {
if self.remaining_hashes == 1 {
return None; // Caller needs to `tick()` first
}
self.hash = hashv(&[&self.hash.as_ref(), &mixin.as_ref()]);
let num_hashes = self.num_hashes + 1;
self.num_hashes = 0;
self.remaining_hashes -= 1;
PohEntry {
tick_height: self.tick_height,
Some(PohEntry {
num_hashes,
hash: self.hash,
mixin: Some(mixin),
}
})
}
// emissions of Ticks (i.e. PohEntries without a mixin) allows
// validators to parallelize the work of catching up
pub fn tick(&mut self) -> PohEntry {
self.hash();
pub fn tick(&mut self) -> Option<PohEntry> {
self.hash = hash(&self.hash.as_ref());
self.num_hashes += 1;
self.remaining_hashes -= 1;
// If the hashes_per_tick is variable (std::u64::MAX) then always generate a tick.
// Otherwise only tick if there are no remaining hashes
if self.hashes_per_tick < std::u64::MAX && self.remaining_hashes != 0 {
return None;
}
let num_hashes = self.num_hashes;
self.remaining_hashes = self.hashes_per_tick;
self.num_hashes = 0;
self.tick_height += 1;
PohEntry {
tick_height: self.tick_height,
Some(PohEntry {
num_hashes,
hash: self.hash,
mixin: None,
}
})
}
}
#[cfg(test)]
pub fn verify(initial_hash: Hash, entries: &[PohEntry]) -> bool {
let mut current_hash = initial_hash;
for entry in entries {
assert!(entry.num_hashes != 0);
for _ in 1..entry.num_hashes {
current_hash = hash(&current_hash.as_ref());
}
current_hash = match entry.mixin {
Some(mixin) => hashv(&[&current_hash.as_ref(), &mixin.as_ref()]),
None => hash(&current_hash.as_ref()),
};
if current_hash != entry.hash {
return false;
}
}
true
}
#[cfg(test)]
mod tests {
use crate::poh::{verify, Poh, PohEntry};
use crate::poh::{Poh, PohEntry};
use solana_sdk::hash::{hash, hashv, Hash};
fn verify(initial_hash: Hash, entries: &[(PohEntry, Option<Hash>)]) -> bool {
let mut current_hash = initial_hash;
for (entry, mixin) in entries {
assert_ne!(entry.num_hashes, 0);
for _ in 1..entry.num_hashes {
current_hash = hash(&current_hash.as_ref());
}
current_hash = match mixin {
Some(mixin) => hashv(&[&current_hash.as_ref(), &mixin.as_ref()]),
None => hash(&current_hash.as_ref()),
};
if current_hash != entry.hash {
return false;
}
}
true
}
#[test]
fn test_poh_verify() {
let zero = Hash::default();
@ -96,11 +113,16 @@ mod tests {
let two = hash(&one.as_ref());
let one_with_zero = hashv(&[&zero.as_ref(), &zero.as_ref()]);
let mut poh = Poh::new(zero, 0);
let mut poh = Poh::new(zero, None);
assert_eq!(
verify(
zero,
&[poh.tick(), poh.record(zero), poh.record(zero), poh.tick(),],
&[
(poh.tick().unwrap(), None),
(poh.record(zero).unwrap(), Some(zero)),
(poh.record(zero).unwrap(), Some(zero)),
(poh.tick().unwrap(), None),
],
),
true
);
@ -108,24 +130,26 @@ mod tests {
assert_eq!(
verify(
zero,
&[PohEntry {
tick_height: 0,
num_hashes: 1,
hash: one,
mixin: None,
}],
&[(
PohEntry {
num_hashes: 1,
hash: one,
},
None
)],
),
true
);
assert_eq!(
verify(
zero,
&[PohEntry {
tick_height: 0,
num_hashes: 2,
hash: two,
mixin: None,
}]
&[(
PohEntry {
num_hashes: 2,
hash: two,
},
None
)]
),
true
);
@ -133,24 +157,26 @@ mod tests {
assert_eq!(
verify(
zero,
&[PohEntry {
tick_height: 0,
num_hashes: 1,
hash: one_with_zero,
mixin: Some(zero),
}]
&[(
PohEntry {
num_hashes: 1,
hash: one_with_zero,
},
Some(zero)
)]
),
true
);
assert_eq!(
verify(
zero,
&[PohEntry {
tick_height: 0,
num_hashes: 1,
hash: zero,
mixin: None
}]
&[(
PohEntry {
num_hashes: 1,
hash: zero,
},
None
)]
),
false
);
@ -159,18 +185,20 @@ mod tests {
verify(
zero,
&[
PohEntry {
tick_height: 0,
num_hashes: 1,
hash: one_with_zero,
mixin: Some(zero),
},
PohEntry {
tick_height: 0,
num_hashes: 1,
hash: hash(&one_with_zero.as_ref()),
mixin: None
},
(
PohEntry {
num_hashes: 1,
hash: one_with_zero,
},
Some(zero)
),
(
PohEntry {
num_hashes: 1,
hash: hash(&one_with_zero.as_ref()),
},
None
)
]
),
true
@ -182,13 +210,52 @@ mod tests {
fn test_poh_verify_assert() {
verify(
Hash::default(),
&[PohEntry {
tick_height: 0,
num_hashes: 0,
hash: Hash::default(),
mixin: None,
}],
&[(
PohEntry {
num_hashes: 0,
hash: Hash::default(),
},
None,
)],
);
}
#[test]
fn test_poh_tick() {
let mut poh = Poh::new(Hash::default(), Some(2));
assert_eq!(poh.remaining_hashes, 2);
assert!(poh.tick().is_none());
assert_eq!(poh.remaining_hashes, 1);
assert!(poh.tick().is_some());
assert_eq!(poh.remaining_hashes, 2); // Ready for the next tick
}
#[test]
fn test_poh_tick_large_batch() {
let mut poh = Poh::new(Hash::default(), Some(2));
assert_eq!(poh.remaining_hashes, 2);
assert!(poh.hash(1_000_000)); // Stop hashing before the next tick
assert_eq!(poh.remaining_hashes, 1);
assert!(poh.hash(1_000_000)); // Does nothing...
assert_eq!(poh.remaining_hashes, 1);
poh.tick();
assert_eq!(poh.remaining_hashes, 2); // Ready for the next tick
}
#[test]
fn test_poh_tick_too_soon() {
let mut poh = Poh::new(Hash::default(), Some(2));
assert_eq!(poh.remaining_hashes, 2);
assert!(poh.tick().is_none());
}
#[test]
fn test_poh_record_not_permitted_at_final_hash() {
let mut poh = Poh::new(Hash::default(), Some(10));
assert!(poh.hash(9));
assert_eq!(poh.remaining_hashes, 1);
assert!(poh.record(Hash::default()).is_none()); // <-- record() rejected to avoid exceeding hashes_per_tick
poh.tick();
assert!(poh.record(Hash::default()).is_some()); // <-- record() ok
}
}

View File

@ -5,10 +5,10 @@
//! within the specified WorkingBank range.
//!
//! For Ticks:
//! * tick must be > WorkingBank::min_tick_height && tick must be <= WorkingBank::man_tick_height
//! * tick must be > WorkingBank::min_tick_height && tick must be <= WorkingBank::max_tick_height
//!
//! For Entries:
//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::man_tick_height
//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height
//!
use crate::blocktree::Blocktree;
use crate::entry::Entry;
@ -18,10 +18,11 @@ use crate::poh::Poh;
use crate::result::{Error, Result};
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::Transaction;
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
const MAX_LAST_LEADER_GRACE_TICKS_FACTOR: u64 = 2;
@ -42,7 +43,8 @@ pub struct WorkingBank {
}
pub struct PohRecorder {
poh: Poh,
pub poh: Arc<Mutex<Poh>>,
tick_height: u64,
clear_bank_signal: Option<SyncSender<bool>>,
start_slot: u64,
start_tick: u64,
@ -55,6 +57,7 @@ pub struct PohRecorder {
id: Pubkey,
blocktree: Arc<Blocktree>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
poh_config: Arc<PohConfig>,
ticks_per_slot: u64,
}
@ -99,12 +102,6 @@ impl PohRecorder {
self.leader_schedule_cache.slot_leader_at(slot + 1, None)
}
pub fn hash(&mut self) {
// TODO: amortize the cost of this lock by doing the loop in here for
// some min amount of hashes
self.poh.hash();
}
pub fn start_slot(&self) -> u64 {
self.start_slot
}
@ -114,7 +111,7 @@ impl PohRecorder {
}
pub fn tick_height(&self) -> u64 {
self.poh.tick_height
self.tick_height
}
// returns if leader tick has reached, and how many grace ticks were afforded
@ -176,14 +173,19 @@ impl PohRecorder {
) {
self.clear_bank();
let mut cache = vec![];
info!(
"reset poh from: {},{} to: {},{}",
self.poh.hash, self.poh.tick_height, blockhash, tick_height,
);
{
let mut poh = self.poh.lock().unwrap();
info!(
"reset poh from: {},{} to: {},{}",
poh.hash, self.tick_height, blockhash, tick_height,
);
poh.reset(blockhash, self.poh_config.hashes_per_tick);
}
std::mem::swap(&mut cache, &mut self.tick_cache);
self.start_slot = start_slot;
self.start_tick = tick_height + 1;
self.poh = Poh::new(blockhash, tick_height);
self.tick_height = tick_height;
self.max_last_leader_grace_ticks = ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR;
let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks(
&my_next_leader_slot,
@ -221,31 +223,31 @@ impl PohRecorder {
.working_bank
.as_ref()
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
if self.poh.tick_height < working_bank.min_tick_height {
if self.tick_height < working_bank.min_tick_height {
return Err(Error::PohRecorderError(
PohRecorderError::MinHeightNotReached,
));
}
if tick && self.poh.tick_height == working_bank.min_tick_height {
if tick && self.tick_height == working_bank.min_tick_height {
return Err(Error::PohRecorderError(
PohRecorderError::MinHeightNotReached,
));
}
let cnt = self
let entry_count = self
.tick_cache
.iter()
.take_while(|x| x.1 <= working_bank.max_tick_height)
.count();
let e = if cnt > 0 {
let send_result = if entry_count > 0 {
debug!(
"flush_cache: bank_slot: {} tick_height: {} max: {} sending: {}",
working_bank.bank.slot(),
working_bank.bank.tick_height(),
working_bank.max_tick_height,
cnt,
entry_count,
);
let cache = &self.tick_cache[..cnt];
let cache = &self.tick_cache[..entry_count];
for t in cache {
working_bank.bank.register_tick(&t.0.hash);
}
@ -254,7 +256,7 @@ impl PohRecorder {
} else {
Ok(())
};
if self.poh.tick_height >= working_bank.max_tick_height {
if self.tick_height >= working_bank.max_tick_height {
info!(
"poh_record: max_tick_height reached, setting working bank {} to None",
working_bank.bank.slot()
@ -263,35 +265,76 @@ impl PohRecorder {
self.start_tick = (self.start_slot + 1) * working_bank.bank.ticks_per_slot();
self.clear_bank();
}
if e.is_err() {
info!("WorkingBank::sender disconnected {:?}", e);
//revert the cache, but clear the working bank
if send_result.is_err() {
info!("WorkingBank::sender disconnected {:?}", send_result);
// revert the cache, but clear the working bank
self.clear_bank();
} else {
//commit the flush
let _ = self.tick_cache.drain(..cnt);
// commit the flush
let _ = self.tick_cache.drain(..entry_count);
}
Ok(())
}
pub fn tick(&mut self) {
let tick = self.generate_tick();
trace!("tick {}", tick.1);
let poh_entry = self.poh.lock().unwrap().tick();
if let Some(poh_entry) = poh_entry {
self.tick_height += 1;
trace!("tick {}", self.tick_height);
if self.start_leader_at_tick.is_none() {
return;
if self.start_leader_at_tick.is_none() {
return;
}
let entry = Entry {
num_hashes: poh_entry.num_hashes,
hash: poh_entry.hash,
transactions: vec![],
};
self.tick_cache.push((entry, self.tick_height));
let _ = self.flush_cache(true);
}
self.tick_cache.push(tick);
let _ = self.flush_cache(true);
}
pub fn record(&mut self, bank_slot: u64, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
self.flush_cache(false)?;
self.record_and_send_txs(bank_slot, mixin, txs)
pub fn record(
&mut self,
bank_slot: u64,
mixin: Hash,
transactions: Vec<Transaction>,
) -> Result<()> {
// Entries without transactions are used to track real-time passing in the ledger and
// cannot be generated by `record()`
assert!(!transactions.is_empty(), "No transactions provided");
loop {
self.flush_cache(false)?;
let working_bank = self
.working_bank
.as_ref()
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
if bank_slot != working_bank.bank.slot() {
return Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached));
}
if let Some(poh_entry) = self.poh.lock().unwrap().record(mixin) {
let entry = Entry {
num_hashes: poh_entry.num_hashes,
hash: poh_entry.hash,
transactions,
};
self.sender
.send((working_bank.bank.clone(), vec![(entry, self.tick_height)]))?;
return Ok(());
}
// record() might fail if the next PoH hash needs to be a tick. But that's ok, tick()
// and re-record()
self.tick();
}
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_clear_signal(
tick_height: u64,
last_entry_hash: Hash,
@ -302,8 +345,12 @@ impl PohRecorder {
blocktree: &Arc<Blocktree>,
clear_bank_signal: Option<SyncSender<bool>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
poh_config: &Arc<PohConfig>,
) -> (Self, Receiver<WorkingBankEntries>) {
let poh = Poh::new(last_entry_hash, tick_height);
let poh = Arc::new(Mutex::new(Poh::new(
last_entry_hash,
poh_config.hashes_per_tick,
)));
let (sender, receiver) = channel();
let max_last_leader_grace_ticks = ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR;
let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks(
@ -314,6 +361,7 @@ impl PohRecorder {
(
Self {
poh,
tick_height,
tick_cache: vec![],
working_bank: None,
sender,
@ -327,6 +375,7 @@ impl PohRecorder {
blocktree: blocktree.clone(),
leader_schedule_cache: leader_schedule_cache.clone(),
ticks_per_slot,
poh_config: poh_config.clone(),
},
receiver,
)
@ -344,6 +393,7 @@ impl PohRecorder {
id: &Pubkey,
blocktree: &Arc<Blocktree>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
poh_config: &Arc<PohConfig>,
) -> (Self, Receiver<WorkingBankEntries>) {
Self::new_with_clear_signal(
tick_height,
@ -355,47 +405,7 @@ impl PohRecorder {
blocktree,
None,
leader_schedule_cache,
)
}
fn record_and_send_txs(
&mut self,
bank_slot: u64,
mixin: Hash,
txs: Vec<Transaction>,
) -> Result<()> {
let working_bank = self
.working_bank
.as_ref()
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
if bank_slot != working_bank.bank.slot() {
return Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached));
}
let poh_entry = self.poh.record(mixin);
assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
let recorded_entry = Entry {
num_hashes: poh_entry.num_hashes,
hash: poh_entry.hash,
transactions: txs,
};
trace!("sending entry {}", recorded_entry.is_tick());
self.sender.send((
working_bank.bank.clone(),
vec![(recorded_entry, poh_entry.tick_height)],
))?;
Ok(())
}
fn generate_tick(&mut self) -> (Entry, u64) {
let tick = self.poh.tick();
assert_ne!(tick.tick_height, 0);
(
Entry {
num_hashes: tick.num_hashes,
hash: tick.hash,
transactions: vec![],
},
tick.tick_height,
poh_config,
)
}
}
@ -405,12 +415,10 @@ mod tests {
use super::*;
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::genesis_utils::create_genesis_block;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::test_tx::test_tx;
use solana_sdk::hash::hash;
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
#[test]
fn test_poh_recorder_no_zero_tick() {
@ -429,11 +437,12 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
assert_eq!(poh_recorder.tick_cache[0].1, 1);
assert_eq!(poh_recorder.poh.tick_height, 1);
assert_eq!(poh_recorder.tick_height, 1);
}
Blocktree::destroy(&ledger_path).unwrap();
}
@ -455,12 +464,13 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
assert_eq!(poh_recorder.tick_cache[1].1, 2);
assert_eq!(poh_recorder.poh.tick_height, 2);
assert_eq!(poh_recorder.tick_height, 2);
}
Blocktree::destroy(&ledger_path).unwrap();
}
@ -480,6 +490,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
@ -507,6 +518,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let working_bank = WorkingBank {
@ -540,6 +552,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let working_bank = WorkingBank {
@ -557,7 +570,7 @@ mod tests {
// all ticks are sent after height > min
poh_recorder.tick();
assert_eq!(poh_recorder.poh.tick_height, 3);
assert_eq!(poh_recorder.tick_height, 3);
assert_eq!(poh_recorder.tick_cache.len(), 0);
let (bank_, e) = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 3);
@ -585,6 +598,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
poh_recorder.tick();
@ -592,7 +606,7 @@ mod tests {
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 4);
assert_eq!(poh_recorder.poh.tick_height, 4);
assert_eq!(poh_recorder.tick_height, 4);
let working_bank = WorkingBank {
bank,
@ -602,7 +616,7 @@ mod tests {
poh_recorder.set_working_bank(working_bank);
poh_recorder.tick();
assert_eq!(poh_recorder.poh.tick_height, 5);
assert_eq!(poh_recorder.tick_height, 5);
assert!(poh_recorder.working_bank.is_none());
let (_, e) = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 3);
@ -628,6 +642,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let working_bank = WorkingBank {
@ -665,6 +680,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let working_bank = WorkingBank {
@ -675,7 +691,7 @@ mod tests {
poh_recorder.set_working_bank(working_bank);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
assert_eq!(poh_recorder.poh.tick_height, 1);
assert_eq!(poh_recorder.tick_height, 1);
let tx = test_tx();
let h1 = hash(b"hello world!");
assert_matches!(
@ -704,6 +720,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let working_bank = WorkingBank {
@ -714,7 +731,7 @@ mod tests {
poh_recorder.set_working_bank(working_bank);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
assert_eq!(poh_recorder.poh.tick_height, 1);
assert_eq!(poh_recorder.tick_height, 1);
let tx = test_tx();
let h1 = hash(b"hello world!");
assert!(poh_recorder
@ -750,6 +767,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let working_bank = WorkingBank {
@ -760,7 +778,7 @@ mod tests {
poh_recorder.set_working_bank(working_bank);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.poh.tick_height, 2);
assert_eq!(poh_recorder.tick_height, 2);
let tx = test_tx();
let h1 = hash(b"hello world!");
assert!(poh_recorder
@ -793,6 +811,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let working_bank = WorkingBank {
@ -803,7 +822,7 @@ mod tests {
poh_recorder.set_working_bank(working_bank);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.poh.tick_height, 2);
assert_eq!(poh_recorder.tick_height, 2);
drop(entry_receiver);
poh_recorder.tick();
assert!(poh_recorder.working_bank.is_none());
@ -827,13 +846,15 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
let hash = poh_recorder.poh.lock().unwrap().hash;
poh_recorder.reset(
poh_recorder.poh.tick_height,
poh_recorder.poh.hash,
poh_recorder.tick_height,
hash,
0,
Some(4),
DEFAULT_TICKS_PER_SLOT,
@ -858,6 +879,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.tick();
poh_recorder.tick();
@ -889,16 +911,17 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.tick();
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 3);
assert_eq!(poh_recorder.poh.tick_height, 3);
assert_eq!(poh_recorder.tick_height, 3);
poh_recorder.reset(1, hash(b"hello"), 0, Some(4), DEFAULT_TICKS_PER_SLOT);
assert_eq!(poh_recorder.tick_cache.len(), 0);
poh_recorder.tick();
assert_eq!(poh_recorder.poh.tick_height, 2);
assert_eq!(poh_recorder.tick_height, 2);
}
Blocktree::destroy(&ledger_path).unwrap();
}
@ -920,6 +943,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let ticks_per_slot = bank.ticks_per_slot();
let working_bank = WorkingBank {
@ -953,6 +977,7 @@ mod tests {
&Arc::new(blocktree),
Some(sender),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.clear_bank();
@ -982,6 +1007,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let end_slot = 3;
@ -1027,6 +1053,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
// Test that with no leader slot, we don't reach the leader tick
@ -1188,6 +1215,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
// Test that with no leader slot, we don't reach the leader tick

View File

@ -2,62 +2,39 @@
//! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::PohRecorder;
use crate::service::Service;
use solana_sdk::timing::{self, NUM_TICKS_PER_SECOND};
use solana_sdk::poh_config::PohConfig;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Mutex};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
#[derive(Clone, Debug)]
pub enum PohServiceConfig {
/// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before
/// transmitting a new entry.
Tick(usize),
/// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1
/// PoH once and producing 1 tick.
Sleep(Duration),
/// each node in simulation will be blocked until the receiver reads their step
Step(SyncSender<()>),
}
impl Default for PohServiceConfig {
fn default() -> PohServiceConfig {
// TODO: Change this to Tick to enable PoH
PohServiceConfig::Sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND))
}
}
impl PohServiceConfig {
pub fn ticks_to_ms(&self, num_ticks: u64) -> u64 {
match self {
PohServiceConfig::Sleep(d) => timing::duration_as_ms(d) * num_ticks,
_ => panic!("Unsuppported tick config"),
}
}
}
pub struct PohService {
tick_producer: JoinHandle<()>,
}
// Number of hashes to batch together.
// * If this number is too small, PoH hash rate will suffer.
// * The larger this number is from 1, the speed of recording transactions will suffer due to lock
// contention with the PoH hashing within `tick_producer()`.
//
// See benches/poh.rs for some benchmarks that attempt to justify this magic number.
pub const NUM_HASHES_PER_BATCH: u64 = 128;
impl PohService {
pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>,
config: &PohServiceConfig,
poh_config: &Arc<PohConfig>,
poh_exit: &Arc<AtomicBool>,
) -> Self {
// PohService is a headless producer, so when it exits it should notify the banking stage.
// Since channel are not used to talk between these threads an AtomicBool is used as a
// signal.
let poh_exit_ = poh_exit.clone();
// Single thread to generate ticks
let config = config.clone();
let poh_config = poh_config.clone();
let tick_producer = Builder::new()
.name("solana-poh-service-tick_producer".to_string())
.spawn(move || {
let poh_recorder = poh_recorder;
Self::tick_producer(&poh_recorder, &config, &poh_exit_);
if poh_config.hashes_per_tick.is_none() {
Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_);
} else {
Self::tick_producer(poh_recorder, &poh_exit_);
}
poh_exit_.store(true, Ordering::Relaxed);
})
.unwrap();
@ -65,31 +42,26 @@ impl PohService {
Self { tick_producer }
}
fn tick_producer(
poh: &Arc<Mutex<PohRecorder>>,
config: &PohServiceConfig,
fn sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
) {
while !poh_exit.load(Ordering::Relaxed) {
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
}
}
fn tick_producer(poh_recorder: Arc<Mutex<PohRecorder>>, poh_exit: &AtomicBool) {
let poh = poh_recorder.lock().unwrap().poh.clone();
loop {
match config {
PohServiceConfig::Tick(num) => {
for _ in 1..*num {
poh.lock().unwrap().hash();
}
if poh.lock().unwrap().hash(NUM_HASHES_PER_BATCH) {
// Lock PohRecorder only for the final hash...
poh_recorder.lock().unwrap().tick();
if poh_exit.load(Ordering::Relaxed) {
break;
}
PohServiceConfig::Sleep(duration) => {
sleep(*duration);
}
PohServiceConfig::Step(sender) => {
let r = sender.send(());
if r.is_err() {
break;
}
}
}
poh.lock().unwrap().tick();
if poh_exit.load(Ordering::Relaxed) {
return;
}
}
}
@ -115,6 +87,7 @@ mod tests {
use solana_runtime::bank::Bank;
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use std::time::Duration;
#[test]
fn test_poh_service() {
@ -125,6 +98,10 @@ mod tests {
{
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let poh_config = Arc::new(PohConfig {
hashes_per_tick: Some(2),
target_tick_duration: Duration::from_millis(42),
});
let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
prev_hash,
@ -134,6 +111,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&poh_config,
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let exit = Arc::new(AtomicBool::new(false));
@ -154,11 +132,10 @@ mod tests {
// send some data
let h1 = hash(b"hello world!");
let tx = test_tx();
poh_recorder
let _ = poh_recorder
.lock()
.unwrap()
.record(bank.slot(), h1, vec![tx])
.unwrap();
.record(bank.slot(), h1, vec![tx]);
if exit.load(Ordering::Relaxed) {
break Ok(());
@ -168,12 +145,7 @@ mod tests {
.unwrap()
};
const HASHES_PER_TICK: u64 = 2;
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::Tick(HASHES_PER_TICK as usize),
&exit,
);
let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
// get some events
@ -186,9 +158,16 @@ mod tests {
for entry in entry_receiver.recv().unwrap().1 {
let entry = &entry.0;
if entry.is_tick() {
assert!(entry.num_hashes <= HASHES_PER_TICK);
assert!(
entry.num_hashes <= poh_config.hashes_per_tick.unwrap(),
format!(
"{} <= {}",
entry.num_hashes,
poh_config.hashes_per_tick.unwrap()
)
);
if entry.num_hashes == HASHES_PER_TICK {
if entry.num_hashes == poh_config.hashes_per_tick.unwrap() {
need_tick = false;
} else {
need_partial = false;
@ -196,13 +175,13 @@ mod tests {
hashes += entry.num_hashes;
assert_eq!(hashes, HASHES_PER_TICK);
assert_eq!(hashes, poh_config.hashes_per_tick.unwrap());
hashes = 0;
} else {
assert!(entry.num_hashes >= 1);
need_entry = false;
hashes += entry.num_hashes - 1;
hashes += entry.num_hashes;
}
}
}