diff --git a/Cargo.lock b/Cargo.lock index c77f6f3a61..0a567cb9a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3999,9 +3999,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.130" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" +checksum = "97565067517b60e2d1ea8b268e59ce036de907ac523ad83a0475da04e818989a" dependencies = [ "serde_derive", ] @@ -4027,9 +4027,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.130" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" +checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537" dependencies = [ "proc-macro2 1.0.32", "quote 1.0.10", @@ -4513,6 +4513,23 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-bloom" +version = "1.9.5" +dependencies = [ + "bv", + "fnv", + "log 0.4.14", + "rand 0.7.3", + "rayon", + "rustc_version 0.4.0", + "serde", + "serde_derive", + "solana-frozen-abi 1.9.5", + "solana-frozen-abi-macro 1.9.5", + "solana-sdk", +] + [[package]] name = "solana-bpf-loader-program" version = "1.9.5" @@ -4778,6 +4795,7 @@ dependencies = [ "serde_json", "serial_test", "solana-accountsdb-plugin-manager", + "solana-bloom", "solana-client", "solana-entry", "solana-frozen-abi 1.9.5", @@ -4998,6 +5016,7 @@ dependencies = [ "serde_bytes", "serde_derive", "serial_test", + "solana-bloom", "solana-clap-utils", "solana-client", "solana-entry", @@ -5307,10 +5326,12 @@ name = "solana-perf" version = "1.9.5" dependencies = [ "bincode", + "bv", "caps", "curve25519-dalek 3.2.0", "dlopen", "dlopen_derive", + "fnv", "lazy_static", "libc", "log 0.4.14", @@ -5319,6 +5340,7 @@ dependencies = [ "rand 0.7.3", "rayon", "serde", + "solana-bloom", "solana-logger 1.9.5", "solana-metrics", "solana-rayon-threadlimit", @@ -5677,6 +5699,7 @@ dependencies = [ "serde", "serde_derive", "solana-address-lookup-table-program", + "solana-bloom", "solana-bucket-map", "solana-compute-budget-program", "solana-config-program", diff --git a/Cargo.toml b/Cargo.toml index cafa5685d3..f26ae0f1d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "banks-interface", "banks-server", "bucket_map", + "bloom", "clap-utils", "cli-config", "cli-output", diff --git a/bloom/Cargo.toml b/bloom/Cargo.toml new file mode 100644 index 0000000000..1a13395259 --- /dev/null +++ b/bloom/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "solana-bloom" +version = "1.9.5" +description = "Solana bloom filter" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-bloom" +edition = "2021" + +[dependencies] +bv = { version = "0.11.1", features = ["serde"] } +fnv = "1.0.7" +rand = "0.7.0" +serde = { version = "1.0.133", features = ["rc"] } +rayon = "1.5.1" +serde_derive = "1.0.103" +solana-frozen-abi = { path = "../frozen-abi", version = "=1.9.5" } +solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.9.5" } +solana-sdk = { path = "../sdk", version = "=1.9.5" } +log = "0.4.14" + +[lib] +crate-type = ["lib"] +name = "solana_bloom" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +rustc_version = "0.4" diff --git a/runtime/benches/bloom.rs b/bloom/benches/bloom.rs similarity index 98% rename from 
runtime/benches/bloom.rs
rename to bloom/benches/bloom.rs
index ca2ed9de77..925c18fea3 100644
--- a/runtime/benches/bloom.rs
+++ b/bloom/benches/bloom.rs
@@ -5,7 +5,7 @@ use {
     bv::BitVec,
     fnv::FnvHasher,
     rand::Rng,
-    solana_runtime::bloom::{AtomicBloom, Bloom, BloomHashIndex},
+    solana_bloom::bloom::{AtomicBloom, Bloom, BloomHashIndex},
     solana_sdk::{
         hash::{hash, Hash},
         signature::Signature,
diff --git a/bloom/build.rs b/bloom/build.rs
new file mode 120000
index 0000000000..ae66c237c5
--- /dev/null
+++ b/bloom/build.rs
@@ -0,0 +1 @@
+../frozen-abi/build.rs
\ No newline at end of file
diff --git a/runtime/src/bloom.rs b/bloom/src/bloom.rs
similarity index 97%
rename from runtime/src/bloom.rs
rename to bloom/src/bloom.rs
index c3bbf1f4b5..152c387e12 100644
--- a/runtime/src/bloom.rs
+++ b/bloom/src/bloom.rs
@@ -101,7 +101,7 @@ impl<T: BloomHashIndex> Bloom<T> {
         }
     }
     fn pos(&self, key: &T, k: u64) -> u64 {
-        key.hash_at_index(k) % self.bits.len()
+        key.hash_at_index(k).wrapping_rem(self.bits.len())
     }
     pub fn clear(&mut self) {
         self.bits = BitVec::new_fill(false, self.bits.len());
@@ -111,7 +111,7 @@ impl<T: BloomHashIndex> Bloom<T> {
         for k in &self.keys {
             let pos = self.pos(key, *k);
             if !self.bits.get(pos) {
-                self.num_bits_set += 1;
+                self.num_bits_set = self.num_bits_set.saturating_add(1);
                 self.bits.set(pos, true);
             }
         }
@@ -164,13 +164,13 @@ impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {
 }
 
 impl<T: BloomHashIndex> AtomicBloom<T> {
     fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) {
-        let pos = key.hash_at_index(hash_index) % self.num_bits;
+        let pos = key.hash_at_index(hash_index).wrapping_rem(self.num_bits);
         // Divide by 64 to figure out which of the
         // AtomicU64 bit chunks we need to modify.
-        let index = pos >> 6;
+        let index = pos.wrapping_shr(6);
         // (pos & 63) is equivalent to mod 64 so that we can find
         // the index of the bit within the AtomicU64 to modify.
-        let mask = 1u64 << (pos & 63);
+        let mask = 1u64.wrapping_shl(u32::try_from(pos & 63).unwrap());
         (index as usize, mask)
     }
diff --git a/bloom/src/lib.rs b/bloom/src/lib.rs
new file mode 100644
index 0000000000..9a78bdcd9e
--- /dev/null
+++ b/bloom/src/lib.rs
@@ -0,0 +1,5 @@
+#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
+pub mod bloom;
+
+#[macro_use]
+extern crate solana_frozen_abi_macro;
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 606becb059..26ee169518 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -35,6 +35,7 @@ retain_mut = "0.1.5"
 serde = "1.0.130"
 serde_derive = "1.0.103"
 solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.9.5" }
+solana-bloom = { path = "../bloom", version = "=1.9.5" }
 solana-client = { path = "../client", version = "=1.9.5" }
 solana-entry = { path = "../entry", version = "=1.9.5" }
 solana-gossip = { path = "../gossip", version = "=1.9.5" }
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 935d271e4e..3c59467d48 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -2,11 +2,10 @@
 //! to contruct a software pipeline. The stage uses all available CPU cores and
 //! can do its processing in parallel with signature verification on the GPU.
use { - crate::{packet_hasher::PacketHasher, qos_service::QosService}, + crate::qos_service::QosService, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, histogram::Histogram, itertools::Itertools, - lru::LruCache, retain_mut::RetainMut, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, @@ -55,7 +54,6 @@ use { env, mem::size_of, net::{SocketAddr, UdpSocket}, - ops::DerefMut, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, RwLock, @@ -82,8 +80,6 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000; const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; -const DEFAULT_LRU_SIZE: usize = 200_000; - const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; @@ -354,10 +350,6 @@ impl BankingStage { // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. - let duplicates = Arc::new(Mutex::new(( - LruCache::new(DEFAULT_LRU_SIZE), - PacketHasher::default(), - ))); let data_budget = Arc::new(DataBudget::default()); let qos_service = Arc::new(QosService::new(cost_model)); // Many banks that process transactions in parallel. @@ -382,7 +374,6 @@ impl BankingStage { let mut recv_start = Instant::now(); let transaction_status_sender = transaction_status_sender.clone(); let gossip_vote_sender = gossip_vote_sender.clone(); - let duplicates = duplicates.clone(); let data_budget = data_budget.clone(); let qos_service = qos_service.clone(); Builder::new() @@ -398,7 +389,6 @@ impl BankingStage { batch_limit, transaction_status_sender, gossip_vote_sender, - &duplicates, &data_budget, qos_service, ); @@ -752,7 +742,6 @@ impl BankingStage { batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, - duplicates: &Arc, PacketHasher)>>, data_budget: &DataBudget, qos_service: Arc, ) { @@ -808,7 +797,6 @@ impl BankingStage { &gossip_vote_sender, &mut buffered_packet_batches, &mut banking_stage_stats, - duplicates, &recorder, &qos_service, ) { @@ -1360,7 +1348,6 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, buffered_packet_batches: &mut UnprocessedPacketBatches, banking_stage_stats: &mut BankingStageStats, - duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, qos_service: &Arc, ) -> Result<(), RecvTimeoutError> { @@ -1398,7 +1385,6 @@ impl BankingStage { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - duplicates, banking_stage_stats, ); continue; @@ -1434,7 +1420,6 @@ impl BankingStage { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - duplicates, banking_stage_stats, ); @@ -1462,7 +1447,6 @@ impl BankingStage { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - duplicates, banking_stage_stats, ); } @@ -1519,40 +1503,13 @@ impl BankingStage { fn push_unprocessed( unprocessed_packet_batches: &mut UnprocessedPacketBatches, packet_batch: PacketBatch, - mut packet_indexes: Vec, + packet_indexes: Vec, dropped_packet_batches_count: &mut usize, dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, batch_limit: usize, - duplicates: &Arc, PacketHasher)>>, banking_stage_stats: &mut BankingStageStats, ) { - { - let original_packets_count = packet_indexes.len(); - let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check"); - let mut duplicates = 
duplicates.lock().unwrap(); - let (cache, hasher) = duplicates.deref_mut(); - packet_indexes.retain(|i| { - let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]); - match cache.get_mut(&packet_hash) { - Some(_hash) => false, - None => { - cache.put(packet_hash, ()); - true - } - } - }); - packet_duplicate_check_time.stop(); - banking_stage_stats - .packet_duplicate_check_elapsed - .fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed); - banking_stage_stats - .dropped_duplicated_packets_count - .fetch_add( - original_packets_count.saturating_sub(packet_indexes.len()), - Ordering::Relaxed, - ); - } if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packet_batches.len() >= batch_limit { *dropped_packet_batches_count += 1; @@ -3126,10 +3083,6 @@ mod tests { let new_packet_batch = PacketBatch::new(vec![Packet::default()]); let packet_indexes = vec![]; - let duplicates = Arc::new(Mutex::new(( - LruCache::new(DEFAULT_LRU_SIZE), - PacketHasher::default(), - ))); let mut dropped_packet_batches_count = 0; let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; @@ -3144,7 +3097,6 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - &duplicates, &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); @@ -3163,7 +3115,6 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - &duplicates, &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); @@ -3179,27 +3130,6 @@ mod tests { ) .unwrap()]); assert_eq!(unprocessed_packets.len(), batch_limit); - BankingStage::push_unprocessed( - &mut unprocessed_packets, - new_packet_batch.clone(), - packet_indexes.clone(), - &mut dropped_packet_batches_count, - &mut dropped_packets_count, - &mut newly_buffered_packets_count, - batch_limit, - &duplicates, - &mut banking_stage_stats, - ); - assert_eq!(unprocessed_packets.len(), 2); - assert_eq!( - unprocessed_packets[1].0.packets[0], - new_packet_batch.packets[0] - ); - assert_eq!(dropped_packet_batches_count, 1); - assert_eq!(dropped_packets_count, 2); - assert_eq!(newly_buffered_packets_count, 2); - - // Check duplicates are dropped (newly buffered shouldn't change) BankingStage::push_unprocessed( &mut unprocessed_packets, new_packet_batch.clone(), @@ -3207,8 +3137,7 @@ mod tests { &mut dropped_packet_batches_count, &mut dropped_packets_count, &mut newly_buffered_packets_count, - 3, - &duplicates, + batch_limit, &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index ae121f9a45..00db3d2b67 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -7,10 +7,13 @@ use { crate::sigverify, + core::time::Duration, crossbeam_channel::{SendError, Sender as CrossbeamSender}, itertools::Itertools, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, + solana_perf::sigverify::dedup_packets, solana_sdk::timing, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ @@ -50,10 +53,13 @@ struct SigVerifierStats { recv_batches_us_hist: histogram::Histogram, // time to call recv_batch verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch + dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch batches_hist: 
histogram::Histogram, // number of packet batches per verify call packets_hist: histogram::Histogram, // number of packets per verify call total_batches: usize, total_packets: usize, + total_dedup: usize, + total_excess_fail: usize, } impl SigVerifierStats { @@ -122,6 +128,26 @@ impl SigVerifierStats { self.discard_packets_pp_us_hist.mean().unwrap_or(0), i64 ), + ( + "dedup_packets_pp_us_90pct", + self.dedup_packets_pp_us_hist.percentile(90.0).unwrap_or(0), + i64 + ), + ( + "dedup_packets_pp_us_min", + self.dedup_packets_pp_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "dedup_packets_pp_us_max", + self.dedup_packets_pp_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "dedup_packets_pp_us_mean", + self.dedup_packets_pp_us_hist.mean().unwrap_or(0), + i64 + ), ( "batches_90pct", self.batches_hist.percentile(90.0).unwrap_or(0), @@ -140,6 +166,8 @@ impl SigVerifierStats { ("packets_mean", self.packets_hist.mean().unwrap_or(0), i64), ("total_batches", self.total_batches, i64), ("total_packets", self.total_packets, i64), + ("total_dedup", self.total_dedup, i64), + ("total_excess_fail", self.total_excess_fail, i64), ); } } @@ -187,6 +215,7 @@ impl SigVerifyStage { } fn verifier( + bloom: &AtomicBloom<&[u8]>, recvr: &PacketBatchReceiver, sendr: &CrossbeamSender>, verifier: &T, @@ -200,13 +229,22 @@ impl SigVerifyStage { timing::timestamp(), num_packets, ); + + let mut dedup_time = Measure::start("sigverify_dedup_time"); + let dedup_fail = dedup_packets(bloom, &mut batches) as usize; + dedup_time.stop(); + let valid_packets = num_packets.saturating_sub(dedup_fail); + let mut discard_time = Measure::start("sigverify_discard_time"); - if num_packets > MAX_SIGVERIFY_BATCH { - Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); - } + if valid_packets > MAX_SIGVERIFY_BATCH { + Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH) + }; + let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH); discard_time.stop(); + let mut verify_batch_time = Measure::start("sigverify_batch_time"); - sendr.send(verifier.verify_batches(batches))?; + let batches = verifier.verify_batches(batches); + sendr.send(batches)?; verify_batch_time.stop(); debug!( @@ -230,10 +268,16 @@ impl SigVerifyStage { .discard_packets_pp_us_hist .increment(discard_time.as_us() / (num_packets as u64)) .unwrap(); + stats + .dedup_packets_pp_us_hist + .increment(dedup_time.as_us() / (num_packets as u64)) + .unwrap(); stats.batches_hist.increment(batches_len as u64).unwrap(); stats.packets_hist.increment(num_packets as u64).unwrap(); stats.total_batches += batches_len; stats.total_packets += num_packets; + stats.total_dedup += dedup_fail; + stats.total_excess_fail += excess_fail; Ok(()) } @@ -246,29 +290,48 @@ impl SigVerifyStage { let verifier = verifier.clone(); let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); + const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000); + const MAX_BLOOM_ITEMS: usize = 1_000_000; + const MAX_BLOOM_FAIL: f64 = 0.0001; + const MAX_BLOOM_BITS: usize = 8 << 22; Builder::new() .name("solana-verifier".to_string()) - .spawn(move || loop { - if let Err(e) = - Self::verifier(&packet_receiver, &verified_sender, &verifier, &mut stats) - { - match e { - SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( - RecvTimeoutError::Disconnected, - )) => break, - SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( - RecvTimeoutError::Timeout, - )) => (), - SigVerifyServiceError::Send(_) => { - break; - } - _ => error!("{:?}", e), + .spawn(move 
|| { + let mut bloom = + Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); + let mut bloom_age = Instant::now(); + loop { + let now = Instant::now(); + if now.duration_since(bloom_age) > MAX_BLOOM_AGE { + bloom = + Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); + bloom_age = now; + } + if let Err(e) = Self::verifier( + &bloom, + &packet_receiver, + &verified_sender, + &verifier, + &mut stats, + ) { + match e { + SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( + RecvTimeoutError::Disconnected, + )) => break, + SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( + RecvTimeoutError::Timeout, + )) => (), + SigVerifyServiceError::Send(_) => { + break; + } + _ => error!("{:?}", e), + } + } + if last_print.elapsed().as_secs() > 2 { + stats.report(); + stats = SigVerifierStats::default(); + last_print = Instant::now(); } - } - if last_print.elapsed().as_secs() > 2 { - stats.report(); - stats = SigVerifierStats::default(); - last_print = Instant::now(); } }) .unwrap() diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index b1ec215a27..554545eb90 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -26,6 +26,7 @@ rayon = "1.5.1" serde = "1.0.130" serde_bytes = "0.11" serde_derive = "1.0.103" +solana-bloom = { path = "../bloom", version = "=1.9.5" } solana-clap-utils = { path = "../clap-utils", version = "=1.9.5" } solana-client = { path = "../client", version = "=1.9.5" } solana-entry = { path = "../entry", version = "=1.9.5" } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index d5d3acea11..adcc768a62 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -261,7 +261,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "4qB65g6HSnHFxkhZuvMEBCLHARBda1HBwJ8qeQ5RZ6Pk")] +#[frozen_abi(digest = "C1nR7B7CgMyUYo6h3z2KXcS38JSwF6y8jmZ6Y9Cz7XEd")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 5ce1559f17..4e0d8bcd61 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -25,7 +25,7 @@ use { lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool}, - solana_runtime::bloom::{AtomicBloom, Bloom}, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{ hash::{hash, Hash}, pubkey::Pubkey, diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 9f3ba781a8..98a618b178 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -26,7 +26,7 @@ use { itertools::Itertools, lru::LruCache, rand::{seq::SliceRandom, Rng}, - solana_runtime::bloom::{AtomicBloom, Bloom}, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}, solana_streamer::socket::SocketAddrSpace, std::{ diff --git a/perf/Cargo.toml b/perf/Cargo.toml index 4ddc9c788d..fc808c1b54 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -15,10 +15,13 @@ curve25519-dalek = { version = "3" } dlopen = "0.1.8" dlopen_derive = "0.1.4" lazy_static = "1.4.0" +bv = { version = "0.11.1", features = ["serde"] } +fnv = "1.0.7" log = "0.4.14" rand = "0.7.0" rayon = "1.5.1" serde = "1.0.130" +solana-bloom = { path = "../bloom", version = "=1.9.5" } solana-logger = { path = "../logger", version = 
"=1.9.5" } solana-metrics = { path = "../metrics", version = "=1.9.5" } solana-sdk = { path = "../sdk", version = "=1.9.5" } diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs new file mode 100644 index 0000000000..e8590a67ec --- /dev/null +++ b/perf/benches/dedup.rs @@ -0,0 +1,45 @@ +#![feature(test)] + +extern crate test; + +use { + solana_bloom::bloom::{AtomicBloom, Bloom}, + solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx}, + test::Bencher, +}; + +#[bench] +fn bench_dedup_same(bencher: &mut Bencher) { + let tx = test_tx(); + + // generate packet vector + let mut batches = to_packet_batches( + &std::iter::repeat(tx).take(64 * 1024).collect::>(), + 128, + ); + let packet_count = sigverify::count_packets_in_batches(&batches); + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); + + println!("packet_count {} {}", packet_count, batches.len()); + + // verify packets + bencher.iter(|| { + let _ans = sigverify::dedup_packets(&bloom, &mut batches); + }) +} + +#[bench] +fn bench_dedup_diff(bencher: &mut Bencher) { + // generate packet vector + let mut batches = + to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::>(), 128); + let packet_count = sigverify::count_packets_in_batches(&batches); + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); + + println!("packet_count {} {}", packet_count, batches.len()); + + // verify packets + bencher.iter(|| { + let _ans = sigverify::dedup_packets(&bloom, &mut batches); + }) +} diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 8396bb529b..e6977430d0 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -4,6 +4,7 @@ //! to the GPU. //! +use solana_bloom::bloom::AtomicBloom; #[cfg(test)] use solana_sdk::transaction::Transaction; use { @@ -23,6 +24,7 @@ use { short_vec::decode_shortu16_len, signature::Signature, }, + std::sync::atomic::{AtomicU64, Ordering}, std::{convert::TryFrom, mem::size_of}, }; @@ -418,6 +420,37 @@ pub fn generate_offsets( ) } +fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8]>) { + // If this packet was already marked as discard, drop it + if packet.meta.discard() { + return; + } + + if bloom.contains(&packet.data.as_slice()) { + packet.meta.set_discard(true); + count.fetch_add(1, Ordering::Relaxed); + return; + } + bloom.add(&packet.data.as_slice()); +} + +pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 { + use rayon::prelude::*; + let packet_count = count_packets_in_batches(batches); + // machine specific random offset to read the u64 from the packet signature + let count = AtomicU64::new(0); + PAR_THREAD_POOL.install(|| { + batches.into_par_iter().for_each(|batch| { + batch + .packets + .par_iter_mut() + .for_each(|p| dedup_packet(&count, p, bloom)) + }) + }); + inc_new_counter_debug!("dedup_packets_total", packet_count); + count.load(Ordering::Relaxed) +} + pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { use rayon::prelude::*; let packet_count = count_packets_in_batches(batches); @@ -597,11 +630,12 @@ mod tests { use { super::*, crate::{ - packet::{Packet, PacketBatch}, + packet::{to_packet_batches, Packet, PacketBatch}, sigverify::{self, PacketOffsets}, test_tx::{new_test_vote_tx, test_multisig_tx, test_tx}, }, bincode::{deserialize, serialize}, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{ instruction::CompiledInstruction, message::{Message, MessageHeader}, @@ -1261,4 +1295,31 @@ mod tests { 
current_offset = current_offset.saturating_add(size_of::()); }); } + + #[test] + fn test_dedup_same() { + let tx = test_tx(); + + // generate packet vector + let mut batches = + to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); + let packet_count = sigverify::count_packets_in_batches(&batches); + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); + let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; + // because dedup uses a threadpool, there maybe up to N threads of txs that go through + let n = get_thread_count(); + assert!(packet_count < discard + n * 2); + } + + #[test] + fn test_dedup_diff() { + // generate packet vector + let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); + + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); + let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; + // because dedup uses a threadpool, there maybe up to N threads of txs that go through + let n = get_thread_count(); + assert!(discard < n * 2); + } } diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 5d28e4ce88..b4da4f7346 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2335,9 +2335,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.130" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" +checksum = "97565067517b60e2d1ea8b268e59ce036de907ac523ad83a0475da04e818989a" dependencies = [ "serde_derive", ] @@ -2353,9 +2353,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.130" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" +checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.6", @@ -2560,6 +2560,23 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "solana-bloom" +version = "1.9.5" +dependencies = [ + "bv", + "fnv", + "log", + "rand 0.7.3", + "rayon", + "rustc_version 0.4.0", + "serde", + "serde_derive", + "solana-frozen-abi 1.9.5", + "solana-frozen-abi-macro 1.9.5", + "solana-sdk", +] + [[package]] name = "solana-bpf-loader-program" version = "1.9.5" @@ -3174,10 +3191,12 @@ name = "solana-perf" version = "1.9.5" dependencies = [ "bincode", + "bv", "caps", "curve25519-dalek 3.2.0", "dlopen", "dlopen_derive", + "fnv", "lazy_static", "libc", "log", @@ -3185,6 +3204,7 @@ dependencies = [ "rand 0.7.3", "rayon", "serde", + "solana-bloom", "solana-logger 1.9.5", "solana-metrics", "solana-rayon-threadlimit", @@ -3371,6 +3391,7 @@ dependencies = [ "serde", "serde_derive", "solana-address-lookup-table-program", + "solana-bloom", "solana-bucket-map", "solana-compute-budget-program", "solana-config-program", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 3f1b1f653d..d0c3633733 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -37,6 +37,7 @@ regex = "1.5.4" serde = { version = "1.0.130", features = ["rc"] } serde_derive = "1.0.103" solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.9.5" } +solana-bloom = { path = "../bloom", version = "=1.9.5" } solana-config-program = { path = "../programs/config", version = "=1.9.5" } solana-compute-budget-program = { path 
= "../programs/compute-budget", version = "=1.9.5" } solana-frozen-abi = { path = "../frozen-abi", version = "=1.9.5" } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index b59f47101b..19d5aab533 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -17,7 +17,6 @@ pub mod bank_forks; pub mod bank_utils; pub mod block_cost_limits; pub mod blockhash_queue; -pub mod bloom; pub mod bucket_map_holder; pub mod bucket_map_holder_stats; pub mod builtins;