committed by
GitHub
parent
2d5957a4b4
commit
cb5106a15b
@@ -48,6 +48,7 @@ solana-account-decoder = { path = "../account-decoder", version = "=1.8.14" }
|
||||
solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.8.14" }
|
||||
solana-banks-server = { path = "../banks-server", version = "=1.8.14" }
|
||||
solana-clap-utils = { path = "../clap-utils", version = "=1.8.14" }
|
||||
solana-bloom = { path = "../bloom", version = "=1.8.14" }
|
||||
solana-client = { path = "../client", version = "=1.8.14" }
|
||||
solana-gossip = { path = "../gossip", version = "=1.8.14" }
|
||||
solana-ledger = { path = "../ledger", version = "=1.8.14" }
|
||||
|
@@ -2,11 +2,9 @@
|
||||
//! to contruct a software pipeline. The stage uses all available CPU cores and
|
||||
//! can do its processing in parallel with signature verification on the GPU.
|
||||
use {
|
||||
crate::packet_hasher::PacketHasher,
|
||||
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
|
||||
histogram::Histogram,
|
||||
itertools::Itertools,
|
||||
lru::LruCache,
|
||||
retain_mut::RetainMut,
|
||||
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
||||
solana_ledger::{blockstore_processor::TransactionStatusSender, entry::hash_transactions},
|
||||
@@ -53,7 +51,6 @@ use {
|
||||
env,
|
||||
mem::size_of,
|
||||
net::{SocketAddr, UdpSocket},
|
||||
ops::DerefMut,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc, Mutex, RwLock, RwLockReadGuard,
|
||||
@@ -80,8 +77,6 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000;
|
||||
|
||||
const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
|
||||
|
||||
const DEFAULT_LRU_SIZE: usize = 200_000;
|
||||
|
||||
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
|
||||
const MIN_THREADS_BANKING: u32 = 1;
|
||||
|
||||
@@ -93,7 +88,6 @@ pub struct BankingStageStats {
|
||||
new_tx_count: AtomicUsize,
|
||||
dropped_packet_batches_count: AtomicUsize,
|
||||
dropped_packets_count: AtomicUsize,
|
||||
dropped_duplicated_packets_count: AtomicUsize,
|
||||
newly_buffered_packets_count: AtomicUsize,
|
||||
current_buffered_packets_count: AtomicUsize,
|
||||
current_buffered_packet_batches_count: AtomicUsize,
|
||||
@@ -108,7 +102,6 @@ pub struct BankingStageStats {
|
||||
process_packets_elapsed: AtomicU64,
|
||||
handle_retryable_packets_elapsed: AtomicU64,
|
||||
filter_pending_packets_elapsed: AtomicU64,
|
||||
packet_duplicate_check_elapsed: AtomicU64,
|
||||
packet_conversion_elapsed: AtomicU64,
|
||||
unprocessed_packet_conversion_elapsed: AtomicU64,
|
||||
transaction_processing_elapsed: AtomicU64,
|
||||
@@ -154,12 +147,6 @@ impl BankingStageStats {
|
||||
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"dropped_duplicated_packets_count",
|
||||
self.dropped_duplicated_packets_count
|
||||
.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"newly_buffered_packets_count",
|
||||
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||
@@ -222,12 +209,6 @@ impl BankingStageStats {
|
||||
.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"packet_duplicate_check_elapsed",
|
||||
self.packet_duplicate_check_elapsed
|
||||
.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"packet_conversion_elapsed",
|
||||
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
|
||||
@@ -348,10 +329,6 @@ impl BankingStage {
|
||||
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
|
||||
// Once an entry has been recorded, its blockhash is registered with the bank.
|
||||
let my_pubkey = cluster_info.id();
|
||||
let duplicates = Arc::new(Mutex::new((
|
||||
LruCache::new(DEFAULT_LRU_SIZE),
|
||||
PacketHasher::default(),
|
||||
)));
|
||||
let data_budget = Arc::new(DataBudget::default());
|
||||
// Many banks that process transactions in parallel.
|
||||
assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING);
|
||||
@@ -375,7 +352,6 @@ impl BankingStage {
|
||||
let mut recv_start = Instant::now();
|
||||
let transaction_status_sender = transaction_status_sender.clone();
|
||||
let gossip_vote_sender = gossip_vote_sender.clone();
|
||||
let duplicates = duplicates.clone();
|
||||
let data_budget = data_budget.clone();
|
||||
let cost_model = cost_model.clone();
|
||||
Builder::new()
|
||||
@@ -392,7 +368,6 @@ impl BankingStage {
|
||||
batch_limit,
|
||||
transaction_status_sender,
|
||||
gossip_vote_sender,
|
||||
&duplicates,
|
||||
&data_budget,
|
||||
cost_model,
|
||||
);
|
||||
@@ -743,7 +718,6 @@ impl BankingStage {
|
||||
batch_limit: usize,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: ReplayVoteSender,
|
||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||
data_budget: &DataBudget,
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
) {
|
||||
@@ -798,7 +772,6 @@ impl BankingStage {
|
||||
&gossip_vote_sender,
|
||||
&mut buffered_packet_batches,
|
||||
&mut banking_stage_stats,
|
||||
duplicates,
|
||||
&recorder,
|
||||
&cost_model,
|
||||
) {
|
||||
@@ -1456,7 +1429,6 @@ impl BankingStage {
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||
banking_stage_stats: &mut BankingStageStats,
|
||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||
recorder: &TransactionRecorder,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
) -> Result<(), RecvTimeoutError> {
|
||||
@@ -1493,7 +1465,6 @@ impl BankingStage {
|
||||
&mut dropped_packets_count,
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
duplicates,
|
||||
banking_stage_stats,
|
||||
);
|
||||
continue;
|
||||
@@ -1524,7 +1495,6 @@ impl BankingStage {
|
||||
&mut dropped_packets_count,
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
duplicates,
|
||||
banking_stage_stats,
|
||||
);
|
||||
|
||||
@@ -1553,7 +1523,6 @@ impl BankingStage {
|
||||
&mut dropped_packets_count,
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
duplicates,
|
||||
banking_stage_stats,
|
||||
);
|
||||
}
|
||||
@@ -1610,40 +1579,13 @@ impl BankingStage {
|
||||
fn push_unprocessed(
|
||||
unprocessed_packet_batches: &mut UnprocessedPacketBatches,
|
||||
packet_batch: PacketBatch,
|
||||
mut packet_indexes: Vec<usize>,
|
||||
packet_indexes: Vec<usize>,
|
||||
dropped_packet_batches_count: &mut usize,
|
||||
dropped_packets_count: &mut usize,
|
||||
newly_buffered_packets_count: &mut usize,
|
||||
batch_limit: usize,
|
||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||
banking_stage_stats: &mut BankingStageStats,
|
||||
) {
|
||||
{
|
||||
let original_packets_count = packet_indexes.len();
|
||||
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
|
||||
let mut duplicates = duplicates.lock().unwrap();
|
||||
let (cache, hasher) = duplicates.deref_mut();
|
||||
packet_indexes.retain(|i| {
|
||||
let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]);
|
||||
match cache.get_mut(&packet_hash) {
|
||||
Some(_hash) => false,
|
||||
None => {
|
||||
cache.put(packet_hash, ());
|
||||
true
|
||||
}
|
||||
}
|
||||
});
|
||||
packet_duplicate_check_time.stop();
|
||||
banking_stage_stats
|
||||
.packet_duplicate_check_elapsed
|
||||
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
|
||||
banking_stage_stats
|
||||
.dropped_duplicated_packets_count
|
||||
.fetch_add(
|
||||
original_packets_count.saturating_sub(packet_indexes.len()),
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
||||
if unprocessed_packet_batches.len() >= batch_limit {
|
||||
*dropped_packet_batches_count += 1;
|
||||
@@ -3221,10 +3163,6 @@ mod tests {
|
||||
let new_packet_batch = PacketBatch::new(vec![Packet::default()]);
|
||||
let packet_indexes = vec![];
|
||||
|
||||
let duplicates = Arc::new(Mutex::new((
|
||||
LruCache::new(DEFAULT_LRU_SIZE),
|
||||
PacketHasher::default(),
|
||||
)));
|
||||
let mut dropped_packet_batches_count = 0;
|
||||
let mut dropped_packets_count = 0;
|
||||
let mut newly_buffered_packets_count = 0;
|
||||
@@ -3239,7 +3177,6 @@ mod tests {
|
||||
&mut dropped_packets_count,
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
&duplicates,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 1);
|
||||
@@ -3258,7 +3195,6 @@ mod tests {
|
||||
&mut dropped_packets_count,
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
&duplicates,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
@@ -3274,27 +3210,6 @@ mod tests {
|
||||
)
|
||||
.unwrap()]);
|
||||
assert_eq!(unprocessed_packets.len(), batch_limit);
|
||||
BankingStage::push_unprocessed(
|
||||
&mut unprocessed_packets,
|
||||
new_packet_batch.clone(),
|
||||
packet_indexes.clone(),
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
&mut newly_buffered_packets_count,
|
||||
batch_limit,
|
||||
&duplicates,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
assert_eq!(
|
||||
unprocessed_packets[1].0.packets[0],
|
||||
new_packet_batch.packets[0]
|
||||
);
|
||||
assert_eq!(dropped_packet_batches_count, 1);
|
||||
assert_eq!(dropped_packets_count, 2);
|
||||
assert_eq!(newly_buffered_packets_count, 2);
|
||||
|
||||
// Check duplicates are dropped (newly buffered shouldn't change)
|
||||
BankingStage::push_unprocessed(
|
||||
&mut unprocessed_packets,
|
||||
new_packet_batch.clone(),
|
||||
@@ -3302,8 +3217,7 @@ mod tests {
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
&mut newly_buffered_packets_count,
|
||||
3,
|
||||
&duplicates,
|
||||
batch_limit,
|
||||
&mut banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
|
@@ -7,10 +7,13 @@
|
||||
|
||||
use {
|
||||
crate::sigverify,
|
||||
core::time::Duration,
|
||||
crossbeam_channel::{SendError, Sender as CrossbeamSender},
|
||||
itertools::Itertools,
|
||||
solana_bloom::bloom::{AtomicBloom, Bloom},
|
||||
solana_measure::measure::Measure,
|
||||
solana_perf::packet::PacketBatch,
|
||||
solana_perf::sigverify::dedup_packets,
|
||||
solana_sdk::timing,
|
||||
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
|
||||
std::{
|
||||
@@ -50,10 +53,13 @@ struct SigVerifierStats {
|
||||
recv_batches_us_hist: histogram::Histogram, // time to call recv_batch
|
||||
verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
|
||||
discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
|
||||
dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
|
||||
batches_hist: histogram::Histogram, // number of packet batches per verify call
|
||||
packets_hist: histogram::Histogram, // number of packets per verify call
|
||||
total_batches: usize,
|
||||
total_packets: usize,
|
||||
total_dedup: usize,
|
||||
total_excess_fail: usize,
|
||||
}
|
||||
|
||||
impl SigVerifierStats {
|
||||
@@ -122,6 +128,26 @@ impl SigVerifierStats {
|
||||
self.discard_packets_pp_us_hist.mean().unwrap_or(0),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"dedup_packets_pp_us_90pct",
|
||||
self.dedup_packets_pp_us_hist.percentile(90.0).unwrap_or(0),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"dedup_packets_pp_us_min",
|
||||
self.dedup_packets_pp_us_hist.minimum().unwrap_or(0),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"dedup_packets_pp_us_max",
|
||||
self.dedup_packets_pp_us_hist.maximum().unwrap_or(0),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"dedup_packets_pp_us_mean",
|
||||
self.dedup_packets_pp_us_hist.mean().unwrap_or(0),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"batches_90pct",
|
||||
self.batches_hist.percentile(90.0).unwrap_or(0),
|
||||
@@ -140,6 +166,8 @@ impl SigVerifierStats {
|
||||
("packets_mean", self.packets_hist.mean().unwrap_or(0), i64),
|
||||
("total_batches", self.total_batches, i64),
|
||||
("total_packets", self.total_packets, i64),
|
||||
("total_dedup", self.total_dedup, i64),
|
||||
("total_excess_fail", self.total_excess_fail, i64),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -187,22 +215,36 @@ impl SigVerifyStage {
|
||||
}
|
||||
|
||||
fn verifier<T: SigVerifier>(
|
||||
bloom: &AtomicBloom<&[u8]>,
|
||||
recvr: &PacketBatchReceiver,
|
||||
sendr: &CrossbeamSender<Vec<PacketBatch>>,
|
||||
verifier: &T,
|
||||
stats: &mut SigVerifierStats,
|
||||
) -> Result<()> {
|
||||
let (mut batches, len, recv_time) = streamer::recv_batch(recvr)?;
|
||||
let (mut batches, num_packets, recv_time) = streamer::recv_batch(recvr)?;
|
||||
|
||||
let batches_len = batches.len();
|
||||
debug!(
|
||||
"@{:?} verifier: verifying: {}",
|
||||
timing::timestamp(),
|
||||
num_packets,
|
||||
);
|
||||
|
||||
let mut dedup_time = Measure::start("sigverify_dedup_time");
|
||||
let dedup_fail = dedup_packets(bloom, &mut batches) as usize;
|
||||
dedup_time.stop();
|
||||
let valid_packets = num_packets.saturating_sub(dedup_fail);
|
||||
|
||||
let mut discard_time = Measure::start("sigverify_discard_time");
|
||||
if valid_packets > MAX_SIGVERIFY_BATCH {
|
||||
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH)
|
||||
};
|
||||
let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH);
|
||||
discard_time.stop();
|
||||
|
||||
let mut verify_batch_time = Measure::start("sigverify_batch_time");
|
||||
let batches_len = batches.len();
|
||||
debug!("@{:?} verifier: verifying: {}", timing::timestamp(), len,);
|
||||
let mut discard_time = Measure::start("sigverify_discard_time");
|
||||
if len > MAX_SIGVERIFY_BATCH {
|
||||
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH);
|
||||
}
|
||||
discard_time.stop();
|
||||
sendr.send(verifier.verify_batches(batches))?;
|
||||
let batches = verifier.verify_batches(batches);
|
||||
sendr.send(batches)?;
|
||||
verify_batch_time.stop();
|
||||
|
||||
debug!(
|
||||
@@ -210,14 +252,14 @@ impl SigVerifyStage {
|
||||
timing::timestamp(),
|
||||
batches_len,
|
||||
verify_batch_time.as_ms(),
|
||||
len,
|
||||
(len as f32 / verify_batch_time.as_s())
|
||||
num_packets,
|
||||
(num_packets as f32 / verify_batch_time.as_s())
|
||||
);
|
||||
|
||||
datapoint_debug!(
|
||||
"sigverify_stage-total_verify_time",
|
||||
("num_batches", batches_len, i64),
|
||||
("num_packets", len, i64),
|
||||
("num_packets", num_packets, i64),
|
||||
("verify_time_ms", verify_batch_time.as_ms(), i64),
|
||||
("recv_time", recv_time, i64),
|
||||
);
|
||||
@@ -228,16 +270,22 @@ impl SigVerifyStage {
|
||||
.unwrap();
|
||||
stats
|
||||
.verify_batches_pp_us_hist
|
||||
.increment(verify_batch_time.as_us() / (len as u64))
|
||||
.increment(verify_batch_time.as_us() / (num_packets as u64))
|
||||
.unwrap();
|
||||
stats
|
||||
.discard_packets_pp_us_hist
|
||||
.increment(discard_time.as_us() / (len as u64))
|
||||
.increment(discard_time.as_us() / (num_packets as u64))
|
||||
.unwrap();
|
||||
stats
|
||||
.dedup_packets_pp_us_hist
|
||||
.increment(dedup_time.as_us() / (num_packets as u64))
|
||||
.unwrap();
|
||||
stats.batches_hist.increment(batches_len as u64).unwrap();
|
||||
stats.packets_hist.increment(len as u64).unwrap();
|
||||
stats.packets_hist.increment(num_packets as u64).unwrap();
|
||||
stats.total_batches += batches_len;
|
||||
stats.total_packets += len;
|
||||
stats.total_packets += num_packets;
|
||||
stats.total_dedup += dedup_fail;
|
||||
stats.total_excess_fail += excess_fail;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -250,29 +298,48 @@ impl SigVerifyStage {
|
||||
let verifier = verifier.clone();
|
||||
let mut stats = SigVerifierStats::default();
|
||||
let mut last_print = Instant::now();
|
||||
const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000);
|
||||
const MAX_BLOOM_ITEMS: usize = 1_000_000;
|
||||
const MAX_BLOOM_FAIL: f64 = 0.0001;
|
||||
const MAX_BLOOM_BITS: usize = 8 << 22;
|
||||
Builder::new()
|
||||
.name("solana-verifier".to_string())
|
||||
.spawn(move || loop {
|
||||
if let Err(e) =
|
||||
Self::verifier(&packet_receiver, &verified_sender, &verifier, &mut stats)
|
||||
{
|
||||
match e {
|
||||
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
|
||||
RecvTimeoutError::Disconnected,
|
||||
)) => break,
|
||||
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
|
||||
RecvTimeoutError::Timeout,
|
||||
)) => (),
|
||||
SigVerifyServiceError::Send(_) => {
|
||||
break;
|
||||
}
|
||||
_ => error!("{:?}", e),
|
||||
.spawn(move || {
|
||||
let mut bloom =
|
||||
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
|
||||
let mut bloom_age = Instant::now();
|
||||
loop {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(bloom_age) > MAX_BLOOM_AGE {
|
||||
bloom =
|
||||
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
|
||||
bloom_age = now;
|
||||
}
|
||||
if let Err(e) = Self::verifier(
|
||||
&bloom,
|
||||
&packet_receiver,
|
||||
&verified_sender,
|
||||
&verifier,
|
||||
&mut stats,
|
||||
) {
|
||||
match e {
|
||||
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
|
||||
RecvTimeoutError::Disconnected,
|
||||
)) => break,
|
||||
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
|
||||
RecvTimeoutError::Timeout,
|
||||
)) => (),
|
||||
SigVerifyServiceError::Send(_) => {
|
||||
break;
|
||||
}
|
||||
_ => error!("{:?}", e),
|
||||
}
|
||||
}
|
||||
if last_print.elapsed().as_secs() > 2 {
|
||||
stats.report();
|
||||
stats = SigVerifierStats::default();
|
||||
last_print = Instant::now();
|
||||
}
|
||||
}
|
||||
if last_print.elapsed().as_secs() > 2 {
|
||||
stats.report();
|
||||
stats = SigVerifierStats::default();
|
||||
last_print = Instant::now();
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
|
Reference in New Issue
Block a user