@@ -15,6 +15,8 @@ bzip2 = "0.3.3"
|
||||
chrono = { version = "0.4.11", features = ["serde"] }
|
||||
crossbeam-channel = "0.4"
|
||||
dir-diff = "0.3.2"
|
||||
dlopen_derive = "0.1.4"
|
||||
dlopen = "0.1.8"
|
||||
sha2 = "0.8.2"
|
||||
flate2 = "1.0.14"
|
||||
zstd = "0.5.1"
|
||||
|
@@ -3,7 +3,10 @@
|
||||
//! transactions within it. Entries cannot be reordered, and its field `num_hashes`
|
||||
//! represents an approximate amount of time since the last Entry was created.
|
||||
use crate::poh::Poh;
|
||||
use dlopen::symbor::{Container, SymBorApi, Symbol};
|
||||
use dlopen_derive::SymBorApi;
|
||||
use log::*;
|
||||
use rand::{thread_rng, Rng};
|
||||
use rayon::prelude::*;
|
||||
use rayon::ThreadPool;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -18,7 +21,9 @@ use solana_sdk::hash::Hash;
|
||||
use solana_sdk::timing;
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use std::cell::RefCell;
|
||||
use std::ffi::OsStr;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::sync::Once;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Instant;
|
||||
@@ -33,6 +38,56 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
|
||||
pub type EntrySender = Sender<Vec<Entry>>;
|
||||
pub type EntryReceiver = Receiver<Vec<Entry>>;
|
||||
|
||||
// Handle to the dynamically loaded PoH SIMD library. Starts out as `None`; `init` is the
// writer and `api()` hands out shared references to the contents.
// NOTE(review): this is a `static mut` accessed through `unsafe`; nothing visible here
// synchronizes readers in `api()` with the write in `init` -- confirm call ordering.
static mut API: Option<Container<Api>> = None;
|
||||
|
||||
pub fn init_poh() {
|
||||
init(OsStr::new("libpoh-simd.so"));
|
||||
}
|
||||
|
||||
fn init(name: &OsStr) {
|
||||
static INIT_HOOK: Once = Once::new();
|
||||
|
||||
info!("Loading {:?}", name);
|
||||
unsafe {
|
||||
INIT_HOOK.call_once(|| {
|
||||
let path;
|
||||
let lib_name = if let Some(perf_libs_path) = solana_perf::perf_libs::locate_perf_libs()
|
||||
{
|
||||
solana_perf::perf_libs::append_to_ld_library_path(
|
||||
perf_libs_path.to_str().unwrap_or("").to_string(),
|
||||
);
|
||||
path = perf_libs_path.join(name);
|
||||
path.as_os_str()
|
||||
} else {
|
||||
name
|
||||
};
|
||||
|
||||
API = Container::load(lib_name).ok();
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn api() -> Option<&'static Container<Api<'static>>> {
|
||||
{
|
||||
static INIT_HOOK: Once = Once::new();
|
||||
INIT_HOOK.call_once(|| {
|
||||
if std::env::var("TEST_PERF_LIBS").is_ok() {
|
||||
init_poh()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
unsafe { API.as_ref() }
|
||||
}
|
||||
|
||||
#[derive(SymBorApi)]
|
||||
pub struct Api<'a> {
|
||||
pub poh_verify_many_simd_avx512skx:
|
||||
Symbol<'a, unsafe extern "C" fn(hashes: *mut u8, num_hashes: *const u64)>,
|
||||
pub poh_verify_many_simd_avx2:
|
||||
Symbol<'a, unsafe extern "C" fn(hashes: *mut u8, num_hashes: *const u64)>,
|
||||
}
|
||||
|
||||
/// Each Entry contains three pieces of data. The `num_hashes` field is the number
|
||||
/// of hashes performed since the previous entry. The `hash` field is the result
|
||||
/// of hashing `hash` from the previous entry `num_hashes` times. The `transactions`
|
||||
@@ -248,10 +303,26 @@ impl EntryVerificationState {
|
||||
}
|
||||
}
|
||||
|
||||
fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool {
|
||||
if ref_entry.num_hashes == 0 {
|
||||
computed_hash == ref_entry.hash
|
||||
} else {
|
||||
let mut poh = Poh::new(computed_hash, None);
|
||||
if ref_entry.transactions.is_empty() {
|
||||
poh.tick().unwrap().hash == ref_entry.hash
|
||||
} else {
|
||||
let tx_hash = hash_transactions(&ref_entry.transactions);
|
||||
poh.record(tx_hash).unwrap().hash == ref_entry.hash
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// an EntrySlice is a slice of Entries
|
||||
pub trait EntrySlice {
|
||||
/// Verifies the hashes and counts of a slice of transactions are all consistent.
|
||||
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState;
|
||||
fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState;
|
||||
fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState;
|
||||
fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers)
|
||||
-> EntryVerificationState;
|
||||
fn verify(&self, start_hash: &Hash) -> bool;
|
||||
@@ -269,7 +340,8 @@ impl EntrySlice for [Entry] {
|
||||
self.start_verify(start_hash, VerifyRecyclers::default())
|
||||
.finish_verify(self)
|
||||
}
|
||||
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState {
|
||||
|
||||
fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState {
|
||||
let now = Instant::now();
|
||||
let genesis = [Entry {
|
||||
num_hashes: 0,
|
||||
@@ -293,6 +365,7 @@ impl EntrySlice for [Entry] {
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
let poh_duration_us = timing::duration_as_us(&now.elapsed());
|
||||
EntryVerificationState {
|
||||
verification_status: if res {
|
||||
@@ -306,6 +379,112 @@ impl EntrySlice for [Entry] {
|
||||
}
|
||||
}
|
||||
|
||||
fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState {
|
||||
use solana_sdk::hash::HASH_BYTES;
|
||||
let now = Instant::now();
|
||||
let genesis = [Entry {
|
||||
num_hashes: 0,
|
||||
hash: *start_hash,
|
||||
transactions: vec![],
|
||||
}];
|
||||
|
||||
let aligned_len = ((self.len() + simd_len - 1) / simd_len) * simd_len;
|
||||
let mut hashes_bytes = vec![0u8; HASH_BYTES * aligned_len];
|
||||
genesis
|
||||
.iter()
|
||||
.chain(self)
|
||||
.enumerate()
|
||||
.for_each(|(i, entry)| {
|
||||
if i < self.len() {
|
||||
let start = i * HASH_BYTES;
|
||||
let end = start + HASH_BYTES;
|
||||
hashes_bytes[start..end].copy_from_slice(&entry.hash.to_bytes());
|
||||
}
|
||||
});
|
||||
let mut hashes_chunked: Vec<_> = hashes_bytes.chunks_mut(simd_len * HASH_BYTES).collect();
|
||||
|
||||
let mut num_hashes: Vec<u64> = self
|
||||
.iter()
|
||||
.map(|entry| entry.num_hashes.saturating_sub(1))
|
||||
.collect();
|
||||
num_hashes.resize(aligned_len, 0);
|
||||
let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect();
|
||||
|
||||
let res = PAR_THREAD_POOL.with(|thread_pool| {
|
||||
thread_pool.borrow().install(|| {
|
||||
hashes_chunked
|
||||
.par_iter_mut()
|
||||
.zip(num_hashes)
|
||||
.enumerate()
|
||||
.all(|(i, (chunk, num_hashes))| {
|
||||
match simd_len {
|
||||
8 => unsafe {
|
||||
(api().unwrap().poh_verify_many_simd_avx2)(
|
||||
chunk.as_mut_ptr(),
|
||||
num_hashes.as_ptr(),
|
||||
);
|
||||
},
|
||||
16 => unsafe {
|
||||
(api().unwrap().poh_verify_many_simd_avx512skx)(
|
||||
chunk.as_mut_ptr(),
|
||||
num_hashes.as_ptr(),
|
||||
);
|
||||
},
|
||||
_ => {
|
||||
panic!("unsupported simd len: {}", simd_len);
|
||||
}
|
||||
}
|
||||
let entry_start = i * simd_len;
|
||||
// The last chunk may produce indexes larger than what we have in the reference entries
|
||||
// because it is aligned to simd_len.
|
||||
let entry_end = std::cmp::min(entry_start + simd_len, self.len());
|
||||
self[entry_start..entry_end]
|
||||
.iter()
|
||||
.enumerate()
|
||||
.all(|(j, ref_entry)| {
|
||||
let start = j * HASH_BYTES;
|
||||
let end = start + HASH_BYTES;
|
||||
let hash = Hash::new(&chunk[start..end]);
|
||||
compare_hashes(hash, ref_entry)
|
||||
})
|
||||
})
|
||||
})
|
||||
});
|
||||
let poh_duration_us = timing::duration_as_us(&now.elapsed());
|
||||
EntryVerificationState {
|
||||
verification_status: if res {
|
||||
EntryVerificationStatus::Success
|
||||
} else {
|
||||
EntryVerificationStatus::Failure
|
||||
},
|
||||
poh_duration_us,
|
||||
transaction_duration_us: 0,
|
||||
device_verification_data: DeviceVerificationData::CPU(),
|
||||
}
|
||||
}
|
||||
|
||||
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState {
|
||||
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
|
||||
let (has_avx2, has_avx512) = (
|
||||
is_x86_feature_detected!("avx2"),
|
||||
is_x86_feature_detected!("avx512f"),
|
||||
);
|
||||
#[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
|
||||
let (has_avx2, has_avx512) = (false, false);
|
||||
|
||||
if api().is_some() {
|
||||
if has_avx512 && self.len() >= 128 {
|
||||
self.verify_cpu_x86_simd(start_hash, 16)
|
||||
} else if has_avx2 && self.len() >= 48 {
|
||||
self.verify_cpu_x86_simd(start_hash, 8)
|
||||
} else {
|
||||
self.verify_cpu_generic(start_hash)
|
||||
}
|
||||
} else {
|
||||
self.verify_cpu_generic(start_hash)
|
||||
}
|
||||
}
|
||||
|
||||
fn verify_transaction_signatures(&self) -> bool {
|
||||
PAR_THREAD_POOL.with(|thread_pool| {
|
||||
thread_pool.borrow().install(|| {
|
||||
@@ -343,7 +522,7 @@ impl EntrySlice for [Entry] {
|
||||
return res;
|
||||
}
|
||||
let api = api.unwrap();
|
||||
inc_new_counter_warn!("entry_verify-num_entries", self.len() as usize);
|
||||
inc_new_counter_info!("entry_verify-num_entries", self.len() as usize);
|
||||
|
||||
let genesis = [Entry {
|
||||
num_hashes: 0,
|
||||
@@ -469,6 +648,17 @@ pub fn create_ticks(num_ticks: u64, hashes_per_tick: u64, mut hash: Hash) -> Vec
|
||||
ticks
|
||||
}
|
||||
|
||||
pub fn create_random_ticks(num_ticks: u64, max_hashes_per_tick: u64, mut hash: Hash) -> Vec<Entry> {
|
||||
let mut ticks = Vec::with_capacity(num_ticks as usize);
|
||||
for _ in 0..num_ticks {
|
||||
let hashes_per_tick = thread_rng().gen_range(1, max_hashes_per_tick);
|
||||
let new_tick = next_entry_mut(&mut hash, hashes_per_tick, vec![]);
|
||||
ticks.push(new_tick);
|
||||
}
|
||||
|
||||
ticks
|
||||
}
|
||||
|
||||
/// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`.
|
||||
pub fn next_entry(prev_hash: &Hash, num_hashes: u64, transactions: Vec<Transaction>) -> Entry {
|
||||
assert!(num_hashes > 0 || transactions.is_empty());
|
||||
@@ -618,7 +808,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_verify_slice() {
|
||||
fn test_verify_slice1() {
|
||||
solana_logger::setup();
|
||||
let zero = Hash::default();
|
||||
let one = hash(&zero.as_ref());
|
||||
@@ -636,7 +826,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_verify_slice_with_hashes() {
|
||||
fn test_verify_slice_with_hashes1() {
|
||||
solana_logger::setup();
|
||||
let zero = Hash::default();
|
||||
let one = hash(&zero.as_ref());
|
||||
@@ -739,4 +929,30 @@ mod tests {
|
||||
assert!(!too_many_tx_entries.verify_tick_hash_count(&mut tick_hash_count, hashes_per_tick));
|
||||
assert_eq!(tick_hash_count, hashes_per_tick);
|
||||
}
|
||||
|
||||
/// Fuzz test: builds random tick chains, corrupts one entry's hash in roughly half of the
/// rounds, and checks that `verify` succeeds exactly when the chain was left untouched.
#[test]
fn test_poh_verify_fuzz() {
    solana_logger::setup();
    for _ in 0..100 {
        let mut tick_timer = Measure::start("ticks");
        let num_ticks = thread_rng().gen_range(1, 100);
        info!("create {} ticks:", num_ticks);
        let mut entries = create_random_ticks(num_ticks, 100, Hash::default());
        tick_timer.stop();

        // Coin flip: tamper with one randomly chosen entry so verification must fail.
        let modified = thread_rng().gen_ratio(1, 2);
        if modified {
            let modify_idx = thread_rng().gen_range(0, num_ticks) as usize;
            entries[modify_idx].hash = hash(&[1, 2, 3]);
        }

        info!("done.. {}", tick_timer);
        let mut poh_timer = Measure::start("poh");
        let res = entries.verify(&Hash::default());
        assert_eq!(res, !modified);
        poh_timer.stop();
        info!("{} {}", poh_timer, res);
    }
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user