Optimize packet dedup (#22571)

* Use bloom filter to dedup packets

* dedup first

* Update bloom/src/bloom.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* fixup

* fixup

* fixup

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
This commit is contained in:
anatoly yakovenko
2022-01-19 13:58:20 -08:00
committed by GitHub
parent b448472037
commit d343713f61
25 changed files with 296 additions and 152 deletions

View File

@ -34,6 +34,7 @@ retain_mut = "0.1.5"
serde = "1.0.133"
serde_derive = "1.0.103"
solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.10.0" }
solana-bloom = { path = "../bloom", version = "=1.10.0" }
solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.10.0" }
solana-client = { path = "../client", version = "=1.10.0" }
solana-entry = { path = "../entry", version = "=1.10.0" }

View File

@ -10,7 +10,6 @@ use {
rayon::prelude::*,
solana_core::{
banking_stage::{BankingStage, BankingStageStats},
packet_deduper::PacketDeduper,
qos_service::QosService,
},
solana_entry::entry::{next_hash, Entry},
@ -222,7 +221,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
);
let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let packet_deduper = PacketDeduper::default();
let _banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
@ -232,7 +230,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
None,
s,
Arc::new(RwLock::new(CostModel::default())),
packet_deduper.clone(),
);
poh_recorder.lock().unwrap().set_bank(&bank);
@ -267,7 +264,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
packet_deduper.reset();
trace!(
"time: {} checked: {} sent: {}",
duration_as_us(&now.elapsed()),

View File

@ -2,7 +2,7 @@
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use {
crate::{packet_deduper::PacketDeduper, qos_service::QosService},
crate::qos_service::QosService,
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
histogram::Histogram,
itertools::Itertools,
@ -328,7 +328,6 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
packet_deduper: PacketDeduper,
) -> Self {
Self::new_num_threads(
cluster_info,
@ -340,7 +339,6 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
cost_model,
packet_deduper,
)
}
@ -355,7 +353,6 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
packet_deduper: PacketDeduper,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks.
@ -384,7 +381,6 @@ impl BankingStage {
let mut recv_start = Instant::now();
let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
let packet_deduper = packet_deduper.clone();
let data_budget = data_budget.clone();
let cost_model = cost_model.clone();
Builder::new()
@ -400,7 +396,6 @@ impl BankingStage {
batch_limit,
transaction_status_sender,
gossip_vote_sender,
&packet_deduper,
&data_budget,
cost_model,
);
@ -755,7 +750,6 @@ impl BankingStage {
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
packet_deduper: &PacketDeduper,
data_budget: &DataBudget,
cost_model: Arc<RwLock<CostModel>>,
) {
@ -808,7 +802,6 @@ impl BankingStage {
batch_limit,
&mut buffered_packet_batches,
&mut banking_stage_stats,
packet_deduper,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
@ -1413,7 +1406,6 @@ impl BankingStage {
batch_limit: usize,
buffered_packet_batches: &mut UnprocessedPacketBatches,
banking_stage_stats: &mut BankingStageStats,
packet_deduper: &PacketDeduper,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
@ -1444,7 +1436,6 @@ impl BankingStage {
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
packet_deduper,
banking_stage_stats,
);
}
@ -1490,15 +1481,13 @@ impl BankingStage {
fn push_unprocessed(
unprocessed_packet_batches: &mut UnprocessedPacketBatches,
packet_batch: PacketBatch,
mut packet_indexes: Vec<usize>,
packet_indexes: Vec<usize>,
dropped_packet_batches_count: &mut usize,
dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
packet_deduper: &PacketDeduper,
banking_stage_stats: &mut BankingStageStats,
) {
packet_deduper.dedupe_packets(&packet_batch, &mut packet_indexes, banking_stage_stats);
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packet_batches.len() >= batch_limit {
*dropped_packet_batches_count += 1;
@ -1658,7 +1647,6 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
drop(verified_sender);
drop(gossip_verified_vote_sender);
@ -1708,7 +1696,6 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
trace!("sending bank");
drop(verified_sender);
@ -1784,7 +1771,6 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
// fund another account so we can send 2 good transactions in a single batch.
@ -1936,7 +1922,6 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
// wait for banking_stage to eat the packets
@ -3237,7 +3222,6 @@ mod tests {
let new_packet_batch = PacketBatch::new(vec![Packet::default()]);
let packet_indexes = vec![];
let packet_deduper = PacketDeduper::default();
let mut dropped_packet_batches_count = 0;
let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0;
@ -3252,7 +3236,6 @@ mod tests {
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&packet_deduper,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 1);
@ -3271,7 +3254,6 @@ mod tests {
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&packet_deduper,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
@ -3287,27 +3269,6 @@ mod tests {
)
.unwrap()]);
assert_eq!(unprocessed_packets.len(), batch_limit);
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packet_batch.clone(),
packet_indexes.clone(),
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&packet_deduper,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(
unprocessed_packets[1].0.packets[0],
new_packet_batch.packets[0]
);
assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(dropped_packets_count, 2);
assert_eq!(newly_buffered_packets_count, 2);
// Check duplicates are dropped (newly buffered shouldn't change)
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packet_batch.clone(),
@ -3315,8 +3276,7 @@ mod tests {
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
3,
&packet_deduper,
batch_limit,
&mut banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);

View File

@ -31,7 +31,6 @@ pub mod latest_validator_votes_for_frozen_banks;
pub mod ledger_cleanup_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_deduper;
pub mod packet_hasher;
pub mod progress_map;
pub mod qos_service;

View File

@ -1,63 +0,0 @@
use {
crate::{banking_stage::BankingStageStats, packet_hasher::PacketHasher},
lru::LruCache,
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
std::{
ops::DerefMut,
sync::{atomic::Ordering, Arc, Mutex},
},
};
const DEFAULT_LRU_SIZE: usize = 200_000;
#[derive(Clone)]
pub struct PacketDeduper(Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>);
impl Default for PacketDeduper {
fn default() -> Self {
Self(Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
))))
}
}
impl PacketDeduper {
pub fn dedupe_packets(
&self,
packet_batch: &PacketBatch,
packet_indexes: &mut Vec<usize>,
banking_stage_stats: &BankingStageStats,
) {
let original_packets_count = packet_indexes.len();
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
let mut duplicates = self.0.lock().unwrap();
let (cache, hasher) = duplicates.deref_mut();
packet_indexes.retain(|i| {
let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]);
match cache.get_mut(&packet_hash) {
Some(_hash) => false,
None => {
cache.put(packet_hash, ());
true
}
}
});
packet_duplicate_check_time.stop();
banking_stage_stats
.packet_duplicate_check_elapsed
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.dropped_duplicated_packets_count
.fetch_add(
original_packets_count.saturating_sub(packet_indexes.len()),
Ordering::Relaxed,
);
}
pub fn reset(&self) {
let mut duplicates = self.0.lock().unwrap();
duplicates.0.clear();
}
}

View File

@ -7,10 +7,13 @@
use {
crate::sigverify,
core::time::Duration,
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
itertools::Itertools,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::sigverify::dedup_packets,
solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
std::{
@ -49,10 +52,13 @@ struct SigVerifierStats {
recv_batches_us_hist: histogram::Histogram, // time to call recv_batch
verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
batches_hist: histogram::Histogram, // number of packet batches per verify call
packets_hist: histogram::Histogram, // number of packets per verify call
total_batches: usize,
total_packets: usize,
total_dedup: usize,
total_excess_fail: usize,
}
impl SigVerifierStats {
@ -121,6 +127,26 @@ impl SigVerifierStats {
self.discard_packets_pp_us_hist.mean().unwrap_or(0),
i64
),
(
"dedup_packets_pp_us_90pct",
self.dedup_packets_pp_us_hist.percentile(90.0).unwrap_or(0),
i64
),
(
"dedup_packets_pp_us_min",
self.dedup_packets_pp_us_hist.minimum().unwrap_or(0),
i64
),
(
"dedup_packets_pp_us_max",
self.dedup_packets_pp_us_hist.maximum().unwrap_or(0),
i64
),
(
"dedup_packets_pp_us_mean",
self.dedup_packets_pp_us_hist.mean().unwrap_or(0),
i64
),
(
"batches_90pct",
self.batches_hist.percentile(90.0).unwrap_or(0),
@ -139,6 +165,8 @@ impl SigVerifierStats {
("packets_mean", self.packets_hist.mean().unwrap_or(0), i64),
("total_batches", self.total_batches, i64),
("total_packets", self.total_packets, i64),
("total_dedup", self.total_dedup, i64),
("total_excess_fail", self.total_excess_fail, i64),
);
}
}
@ -186,6 +214,7 @@ impl SigVerifyStage {
}
fn verifier<T: SigVerifier>(
bloom: &AtomicBloom<&[u8]>,
recvr: &PacketBatchReceiver,
sendr: &Sender<Vec<PacketBatch>>,
verifier: &T,
@ -199,13 +228,22 @@ impl SigVerifyStage {
timing::timestamp(),
num_packets,
);
let mut dedup_time = Measure::start("sigverify_dedup_time");
let dedup_fail = dedup_packets(bloom, &mut batches) as usize;
dedup_time.stop();
let valid_packets = num_packets.saturating_sub(dedup_fail);
let mut discard_time = Measure::start("sigverify_discard_time");
if num_packets > MAX_SIGVERIFY_BATCH {
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH);
}
if valid_packets > MAX_SIGVERIFY_BATCH {
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH)
};
let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH);
discard_time.stop();
let mut verify_batch_time = Measure::start("sigverify_batch_time");
sendr.send(verifier.verify_batches(batches))?;
let batches = verifier.verify_batches(batches);
sendr.send(batches)?;
verify_batch_time.stop();
debug!(
@ -229,10 +267,16 @@ impl SigVerifyStage {
.discard_packets_pp_us_hist
.increment(discard_time.as_us() / (num_packets as u64))
.unwrap();
stats
.dedup_packets_pp_us_hist
.increment(dedup_time.as_us() / (num_packets as u64))
.unwrap();
stats.batches_hist.increment(batches_len as u64).unwrap();
stats.packets_hist.increment(num_packets as u64).unwrap();
stats.total_batches += batches_len;
stats.total_packets += num_packets;
stats.total_dedup += dedup_fail;
stats.total_excess_fail += excess_fail;
Ok(())
}
@ -245,29 +289,48 @@ impl SigVerifyStage {
let verifier = verifier.clone();
let mut stats = SigVerifierStats::default();
let mut last_print = Instant::now();
const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000);
const MAX_BLOOM_ITEMS: usize = 1_000_000;
const MAX_BLOOM_FAIL: f64 = 0.0001;
const MAX_BLOOM_BITS: usize = 8 << 22;
Builder::new()
.name("solana-verifier".to_string())
.spawn(move || loop {
if let Err(e) =
Self::verifier(&packet_receiver, &verified_sender, &verifier, &mut stats)
{
match e {
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
RecvTimeoutError::Disconnected,
)) => break,
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
RecvTimeoutError::Timeout,
)) => (),
SigVerifyServiceError::Send(_) => {
break;
}
_ => error!("{:?}", e),
.spawn(move || {
let mut bloom =
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
let mut bloom_age = Instant::now();
loop {
let now = Instant::now();
if now.duration_since(bloom_age) > MAX_BLOOM_AGE {
bloom =
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
bloom_age = now;
}
if let Err(e) = Self::verifier(
&bloom,
&packet_receiver,
&verified_sender,
&verifier,
&mut stats,
) {
match e {
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
RecvTimeoutError::Disconnected,
)) => break,
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
RecvTimeoutError::Timeout,
)) => (),
SigVerifyServiceError::Send(_) => {
break;
}
_ => error!("{:?}", e),
}
}
if last_print.elapsed().as_secs() > 2 {
stats.report();
stats = SigVerifierStats::default();
last_print = Instant::now();
}
}
if last_print.elapsed().as_secs() > 2 {
stats.report();
stats = SigVerifierStats::default();
last_print = Instant::now();
}
})
.unwrap()

View File

@ -10,7 +10,6 @@ use {
GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker,
},
fetch_stage::FetchStage,
packet_deduper::PacketDeduper,
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
},
@ -141,7 +140,6 @@ impl Tpu {
transaction_status_sender,
replay_vote_sender,
cost_model.clone(),
PacketDeduper::default(),
);
let broadcast_stage = broadcast_type.new_broadcast_stage(