Get rid of verified packets and use the Meta::discard flag (#6674)

* get rid of verified packets and use the disabled meta field everywhere
This commit is contained in:
anatoly yakovenko
2019-11-01 14:23:03 -07:00
committed by GitHub
parent 7b6e3a23be
commit 385b4ce959
10 changed files with 88 additions and 86 deletions

View File

@ -25,7 +25,6 @@ use solana_sdk::signature::Signature;
use solana_sdk::system_transaction; use solana_sdk::system_transaction;
use solana_sdk::timing::{duration_as_us, timestamp}; use solana_sdk::timing::{duration_as_us, timestamp};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::iter;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@ -142,13 +141,7 @@ fn main() {
assert!(r.is_ok(), "sanity parallel execution"); assert!(r.is_ok(), "sanity parallel execution");
} }
bank.clear_signatures(); bank.clear_signatures();
let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH) let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH);
.into_iter()
.map(|x| {
let len = x.packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blocktree = Arc::new( let blocktree = Arc::new(
@ -209,7 +202,7 @@ fn main() {
index, index,
); );
for xv in v { for xv in v {
sent += xv.0.packets.len(); sent += xv.packets.len();
} }
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();
} }
@ -288,13 +281,7 @@ fn main() {
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect(); let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
tx.signatures[0] = Signature::new(&sig[0..64]); tx.signatures[0] = Signature::new(&sig[0..64]);
} }
verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH) verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH);
.into_iter()
.map(|x| {
let len = x.packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
} }
start += chunk_len; start += chunk_len;

View File

@ -30,7 +30,6 @@ use solana_sdk::system_instruction;
use solana_sdk::system_transaction; use solana_sdk::system_transaction;
use solana_sdk::timing::{duration_as_us, timestamp}; use solana_sdk::timing::{duration_as_us, timestamp};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::iter;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -184,13 +183,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
assert!(r.is_ok(), "sanity parallel execution"); assert!(r.is_ok(), "sanity parallel execution");
} }
bank.clear_signatures(); bank.clear_signatures();
let verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH) let verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH);
.into_iter()
.map(|x| {
let len = x.packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blocktree = Arc::new( let blocktree = Arc::new(
@ -229,7 +222,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
v.len(), v.len(),
); );
for xv in v { for xv in v {
sent += xv.0.packets.len(); sent += xv.packets.len();
} }
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();
} }

View File

@ -71,8 +71,8 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
loop { loop {
if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
while let Some(v) = verifieds.pop() { while let Some(v) = verifieds.pop() {
received += v.0.packets.len(); received += v.packets.len();
batches.push(v.0); batches.push(v);
} }
if received >= sent_len { if received >= sent_len {
break; break;

View File

@ -9,7 +9,6 @@ use crate::{
poh_service::PohService, poh_service::PohService,
result::{Error, Result}, result::{Error, Result},
service::Service, service::Service,
sigverify_stage::VerifiedPackets,
}; };
use bincode::deserialize; use bincode::deserialize;
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
@ -74,8 +73,8 @@ impl BankingStage {
pub fn new( pub fn new(
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<VerifiedPackets>, verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<VerifiedPackets>, verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
) -> Self { ) -> Self {
Self::new_num_threads( Self::new_num_threads(
cluster_info, cluster_info,
@ -89,8 +88,8 @@ impl BankingStage {
fn new_num_threads( fn new_num_threads(
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<VerifiedPackets>, verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<VerifiedPackets>, verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
num_threads: u32, num_threads: u32,
) -> Self { ) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
@ -345,7 +344,7 @@ impl BankingStage {
pub fn process_loop( pub fn process_loop(
my_pubkey: Pubkey, my_pubkey: Pubkey,
verified_receiver: &CrossbeamReceiver<VerifiedPackets>, verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
recv_start: &mut Instant, recv_start: &mut Instant,
@ -793,17 +792,25 @@ impl BankingStage {
filtered_unprocessed_packet_indexes filtered_unprocessed_packet_indexes
} }
fn generate_packet_indexes(vers: Vec<u8>) -> Vec<usize> { fn generate_packet_indexes(vers: &[Packet]) -> Vec<usize> {
vers.iter() vers.iter()
.enumerate() .enumerate()
.filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) .filter_map(
|(index, ver)| {
if !ver.meta.discard {
Some(index)
} else {
None
}
},
)
.collect() .collect()
} }
/// Process the incoming packets /// Process the incoming packets
pub fn process_packets( pub fn process_packets(
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<VerifiedPackets>, verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
recv_start: &mut Instant, recv_start: &mut Instant,
recv_timeout: Duration, recv_timeout: Duration,
@ -815,7 +822,7 @@ impl BankingStage {
recv_time.stop(); recv_time.stop();
let mms_len = mms.len(); let mms_len = mms.len();
let count: usize = mms.iter().map(|x| x.1.len()).sum(); let count: usize = mms.iter().map(|x| x.packets.len()).sum();
debug!( debug!(
"@{:?} process start stalled for: {:?}ms txs: {} id: {}", "@{:?} process start stalled for: {:?}ms txs: {} id: {}",
timestamp(), timestamp(),
@ -830,8 +837,8 @@ impl BankingStage {
let mut mms_iter = mms.into_iter(); let mut mms_iter = mms.into_iter();
let mut unprocessed_packets = vec![]; let mut unprocessed_packets = vec![];
let mut dropped_batches_count = 0; let mut dropped_batches_count = 0;
while let Some((msgs, vers)) = mms_iter.next() { while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(vers); let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let bank = poh.lock().unwrap().bank(); let bank = poh.lock().unwrap().bank();
if bank.is_none() { if bank.is_none() {
Self::push_unprocessed( Self::push_unprocessed(
@ -863,8 +870,8 @@ impl BankingStage {
let next_leader = poh.lock().unwrap().next_slot_leader(); let next_leader = poh.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
#[allow(clippy::while_let_on_iterator)] #[allow(clippy::while_let_on_iterator)]
while let Some((msgs, vers)) = mms_iter.next() { while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(vers); let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let unprocessed_indexes = Self::filter_unprocessed_packets( let unprocessed_indexes = Self::filter_unprocessed_packets(
&bank, &bank,
&msgs, &msgs,
@ -1062,6 +1069,16 @@ mod tests {
Blocktree::destroy(&ledger_path).unwrap(); Blocktree::destroy(&ledger_path).unwrap();
} }
/// Test helper: translate the legacy `(Packets, Vec<u8>)` verification pairs
/// into plain `Packets`, recording each per-packet verdict in
/// `Packet::meta.discard` (a `0` verdict means the packet is discarded).
pub fn convert_from_old_verified(with_vers: Vec<(Packets, Vec<u8>)>) -> Vec<Packets> {
    with_vers
        .into_iter()
        .map(|(mut batch, verdicts)| {
            for (packet, verdict) in batch.packets.iter_mut().zip(&verdicts) {
                packet.meta.discard = *verdict == 0;
            }
            batch
        })
        .collect()
}
#[test] #[test]
fn test_banking_stage_entries_only() { fn test_banking_stage_entries_only() {
solana_logger::setup(); solana_logger::setup();
@ -1122,7 +1139,7 @@ mod tests {
.into_iter() .into_iter()
.map(|packets| (packets, vec![0u8, 1u8, 1u8])) .map(|packets| (packets, vec![0u8, 1u8, 1u8]))
.collect(); .collect();
let packets = convert_from_old_verified(packets);
verified_sender // no_ver, anf, tx verified_sender // no_ver, anf, tx
.send(packets) .send(packets)
.unwrap(); .unwrap();
@ -1194,6 +1211,7 @@ mod tests {
.into_iter() .into_iter()
.map(|packets| (packets, vec![1u8])) .map(|packets| (packets, vec![1u8]))
.collect(); .collect();
let packets = convert_from_old_verified(packets);
verified_sender.send(packets).unwrap(); verified_sender.send(packets).unwrap();
// Process a second batch that uses the same from account, so conflicts with above TX // Process a second batch that uses the same from account, so conflicts with above TX
@ -1204,6 +1222,7 @@ mod tests {
.into_iter() .into_iter()
.map(|packets| (packets, vec![1u8])) .map(|packets| (packets, vec![1u8]))
.collect(); .collect();
let packets = convert_from_old_verified(packets);
verified_sender.send(packets).unwrap(); verified_sender.send(packets).unwrap();
let (vote_sender, vote_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded();

View File

@ -1,8 +1,8 @@
use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}; use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
use crate::packet::Packets;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::result::Result; use crate::result::Result;
use crate::service::Service; use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
use crate::{packet, sigverify}; use crate::{packet, sigverify};
use crossbeam_channel::Sender as CrossbeamSender; use crossbeam_channel::Sender as CrossbeamSender;
use solana_metrics::inc_new_counter_debug; use solana_metrics::inc_new_counter_debug;
@ -20,7 +20,7 @@ impl ClusterInfoVoteListener {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
sigverify_disabled: bool, sigverify_disabled: bool,
sender: CrossbeamSender<VerifiedPackets>, sender: CrossbeamSender<Vec<Packets>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Self { ) -> Self {
let exit = exit.clone(); let exit = exit.clone();
@ -45,7 +45,7 @@ impl ClusterInfoVoteListener {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
sigverify_disabled: bool, sigverify_disabled: bool,
sender: &CrossbeamSender<VerifiedPackets>, sender: &CrossbeamSender<Vec<Packets>>,
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
) -> Result<()> { ) -> Result<()> {
let mut last_ts = 0; let mut last_ts = 0;
@ -57,14 +57,15 @@ impl ClusterInfoVoteListener {
if poh_recorder.lock().unwrap().has_bank() { if poh_recorder.lock().unwrap().has_bank() {
last_ts = new_ts; last_ts = new_ts;
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
let msgs = packet::to_packets(&votes); let mut msgs = packet::to_packets(&votes);
if !msgs.is_empty() { if !msgs.is_empty() {
let r = if sigverify_disabled { let r = if sigverify_disabled {
sigverify::ed25519_verify_disabled(&msgs) sigverify::ed25519_verify_disabled(&msgs)
} else { } else {
sigverify::ed25519_verify_cpu(&msgs) sigverify::ed25519_verify_cpu(&msgs)
}; };
sender.send(msgs.into_iter().zip(r).collect())?; sigverify::mark_disabled(&mut msgs, &r);
sender.send(msgs)?;
} }
} }
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));

View File

@ -2,10 +2,10 @@
use crate::{ use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
packet::Packets,
repair_service::RepairStrategy, repair_service::RepairStrategy,
result::{Error, Result}, result::{Error, Result},
service::Service, service::Service,
sigverify_stage::VerifiedPackets,
streamer::PacketReceiver, streamer::PacketReceiver,
window_service::{should_retransmit_and_persist, WindowService}, window_service::{should_retransmit_and_persist, WindowService},
}; };
@ -208,7 +208,7 @@ impl RetransmitStage {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
retransmit_sockets: Arc<Vec<UdpSocket>>, retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
verified_receiver: CrossbeamReceiver<VerifiedPackets>, verified_receiver: CrossbeamReceiver<Vec<Packets>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver, completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule, epoch_schedule: EpochSchedule,

View File

@ -7,7 +7,7 @@
use crate::cuda_runtime::PinnedVec; use crate::cuda_runtime::PinnedVec;
use crate::packet::{Packet, Packets}; use crate::packet::{Packet, Packets};
use crate::recycler::Recycler; use crate::recycler::Recycler;
use crate::sigverify_stage::{SigVerifier, VerifiedPackets}; use crate::sigverify_stage::SigVerifier;
use bincode::serialized_size; use bincode::serialized_size;
use rayon::ThreadPool; use rayon::ThreadPool;
use solana_ledger::perf_libs; use solana_ledger::perf_libs;
@ -37,12 +37,22 @@ impl Default for TransactionSigVerifier {
} }
impl SigVerifier for TransactionSigVerifier { impl SigVerifier for TransactionSigVerifier {
fn verify_batch(&self, batch: Vec<Packets>) -> VerifiedPackets { fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
let r = ed25519_verify(&batch, &self.recycler, &self.recycler_out); let r = ed25519_verify(&batch, &self.recycler, &self.recycler_out);
batch.into_iter().zip(r).collect() mark_disabled(&mut batch, &r);
batch
} }
} }
/// Record sigverify results onto the packets themselves.
///
/// `r` holds one verdict vector per batch, one `u8` per packet (parallel to
/// `batches`); a verdict of `0` marks the corresponding packet's
/// `meta.discard` flag so downstream stages skip it.
///
/// Takes `&mut [Packets]` rather than `&mut Vec<Packets>` (clippy `ptr_arg`);
/// existing `&mut vec` call sites still compile via deref coercion.
pub fn mark_disabled(batches: &mut [Packets], r: &[Vec<u8>]) {
    batches.iter_mut().zip(r).for_each(|(b, v)| {
        b.packets
            .iter_mut()
            .zip(v)
            .for_each(|(p, f)| p.meta.discard = *f == 0)
    });
}
use solana_rayon_threadlimit::get_thread_count; use solana_rayon_threadlimit::get_thread_count;
use std::cell::RefCell; use std::cell::RefCell;

View File

@ -5,7 +5,6 @@ use crate::recycler::Recycler;
use crate::recycler::Reset; use crate::recycler::Reset;
use crate::sigverify::{self, TxOffset}; use crate::sigverify::{self, TxOffset};
use crate::sigverify_stage::SigVerifier; use crate::sigverify_stage::SigVerifier;
use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize; use bincode::deserialize;
use rayon::iter::IntoParallelIterator; use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator; use rayon::iter::ParallelIterator;
@ -68,7 +67,7 @@ impl ShredSigVerifier {
} }
impl SigVerifier for ShredSigVerifier { impl SigVerifier for ShredSigVerifier {
fn verify_batch(&self, batches: Vec<Packets>) -> VerifiedPackets { fn verify_batch(&self, mut batches: Vec<Packets>) -> Vec<Packets> {
let r_bank = self.bank_forks.read().unwrap().working_bank(); let r_bank = self.bank_forks.read().unwrap().working_bank();
let slots: HashSet<u64> = Self::read_slots(&batches); let slots: HashSet<u64> = Self::read_slots(&batches);
let mut leader_slots: HashMap<u64, Pubkey> = slots let mut leader_slots: HashMap<u64, Pubkey> = slots
@ -90,7 +89,8 @@ impl SigVerifier for ShredSigVerifier {
&self.recycler_pubkeys, &self.recycler_pubkeys,
&self.recycler_out, &self.recycler_out,
); );
batches.into_iter().zip(r).collect() sigverify::mark_disabled(&mut batches, &r);
batches
} }
} }
@ -543,6 +543,7 @@ pub mod tests {
batch[0].packets[1].meta.size = shred.payload.len(); batch[0].packets[1].meta.size = shred.payload.len();
let rv = verifier.verify_batch(batch); let rv = verifier.verify_batch(batch);
assert_eq!(rv[0].1, vec![1, 0]); assert_eq!(rv[0].packets[0].meta.discard, false);
assert_eq!(rv[0].packets[1].meta.discard, true);
} }
} }

View File

@ -22,23 +22,22 @@ use std::thread::{self, Builder, JoinHandle};
const RECV_BATCH_MAX_CPU: usize = 1_000; const RECV_BATCH_MAX_CPU: usize = 1_000;
const RECV_BATCH_MAX_GPU: usize = 5_000; const RECV_BATCH_MAX_GPU: usize = 5_000;
pub type VerifiedPackets = Vec<(Packets, Vec<u8>)>;
pub struct SigVerifyStage { pub struct SigVerifyStage {
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
pub trait SigVerifier { pub trait SigVerifier {
fn verify_batch(&self, batch: Vec<Packets>) -> VerifiedPackets; fn verify_batch(&self, batch: Vec<Packets>) -> Vec<Packets>;
} }
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct DisabledSigVerifier {} pub struct DisabledSigVerifier {}
impl SigVerifier for DisabledSigVerifier { impl SigVerifier for DisabledSigVerifier {
fn verify_batch(&self, batch: Vec<Packets>) -> VerifiedPackets { fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
let r = sigverify::ed25519_verify_disabled(&batch); let r = sigverify::ed25519_verify_disabled(&batch);
batch.into_iter().zip(r).collect() sigverify::mark_disabled(&mut batch, &r);
batch
} }
} }
@ -46,7 +45,7 @@ impl SigVerifyStage {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new<T: SigVerifier + 'static + Send + Clone>( pub fn new<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: Receiver<Packets>, packet_receiver: Receiver<Packets>,
verified_sender: CrossbeamSender<VerifiedPackets>, verified_sender: CrossbeamSender<Vec<Packets>>,
verifier: T, verifier: T,
) -> Self { ) -> Self {
let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, verifier); let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, verifier);
@ -55,7 +54,7 @@ impl SigVerifyStage {
fn verifier<T: SigVerifier>( fn verifier<T: SigVerifier>(
recvr: &Arc<Mutex<PacketReceiver>>, recvr: &Arc<Mutex<PacketReceiver>>,
sendr: &CrossbeamSender<VerifiedPackets>, sendr: &CrossbeamSender<Vec<Packets>>,
id: usize, id: usize,
verifier: &T, verifier: &T,
) -> Result<()> { ) -> Result<()> {
@ -115,7 +114,7 @@ impl SigVerifyStage {
fn verifier_service<T: SigVerifier + 'static + Send + Clone>( fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: Arc<Mutex<PacketReceiver>>, packet_receiver: Arc<Mutex<PacketReceiver>>,
verified_sender: CrossbeamSender<VerifiedPackets>, verified_sender: CrossbeamSender<Vec<Packets>>,
id: usize, id: usize,
verifier: &T, verifier: &T,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
@ -139,7 +138,7 @@ impl SigVerifyStage {
fn verifier_services<T: SigVerifier + 'static + Send + Clone>( fn verifier_services<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: PacketReceiver, packet_receiver: PacketReceiver,
verified_sender: CrossbeamSender<VerifiedPackets>, verified_sender: CrossbeamSender<Vec<Packets>>,
verifier: T, verifier: T,
) -> Vec<JoinHandle<()>> { ) -> Vec<JoinHandle<()>> {
let receiver = Arc::new(Mutex::new(packet_receiver)); let receiver = Arc::new(Mutex::new(packet_receiver));

View File

@ -2,10 +2,10 @@
//! blocktree and retransmitting where required //! blocktree and retransmitting where required
//! //!
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::packet::Packets;
use crate::repair_service::{RepairService, RepairStrategy}; use crate::repair_service::{RepairService, RepairStrategy};
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
use crate::streamer::PacketSender; use crate::streamer::PacketSender;
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use rayon::iter::IntoParallelRefMutIterator; use rayon::iter::IntoParallelRefMutIterator;
@ -67,7 +67,7 @@ pub fn should_retransmit_and_persist(
fn recv_window<F>( fn recv_window<F>(
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<VerifiedPackets>, verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
retransmit: &PacketSender, retransmit: &PacketSender,
shred_filter: F, shred_filter: F,
thread_pool: &ThreadPool, thread_pool: &ThreadPool,
@ -78,10 +78,10 @@ where
{ {
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
let mut packets = verified_receiver.recv_timeout(timer)?; let mut packets = verified_receiver.recv_timeout(timer)?;
let mut total_packets: usize = packets.iter().map(|(p, _)| p.packets.len()).sum(); let mut total_packets: usize = packets.iter().map(|p| p.packets.len()).sum();
while let Ok(mut more_packets) = verified_receiver.try_recv() { while let Ok(mut more_packets) = verified_receiver.try_recv() {
let count: usize = more_packets.iter().map(|(p, _)| p.packets.len()).sum(); let count: usize = more_packets.iter().map(|p| p.packets.len()).sum();
total_packets += count; total_packets += count;
packets.append(&mut more_packets) packets.append(&mut more_packets)
} }
@ -93,14 +93,12 @@ where
let shreds: Vec<_> = thread_pool.install(|| { let shreds: Vec<_> = thread_pool.install(|| {
packets packets
.par_iter_mut() .par_iter_mut()
.flat_map(|(packets, sigs)| { .flat_map(|packets| {
packets packets
.packets .packets
.iter_mut() .iter_mut()
.zip(sigs.iter()) .filter_map(|packet| {
.filter_map(|(packet, sigcheck)| { if packet.meta.discard {
if *sigcheck == 0 {
packet.meta.discard = true;
inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
None None
} else if let Ok(shred) = } else if let Ok(shred) =
@ -128,7 +126,7 @@ where
trace!("{} num total shreds received: {}", my_pubkey, total_packets); trace!("{} num total shreds received: {}", my_pubkey, total_packets);
for (packets, _) in packets.into_iter() { for packets in packets.into_iter() {
if !packets.packets.is_empty() { if !packets.packets.is_empty() {
// Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit)
let _ = retransmit.send(packets); let _ = retransmit.send(packets);
@ -174,7 +172,7 @@ impl WindowService {
pub fn new<F>( pub fn new<F>(
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
verified_receiver: CrossbeamReceiver<VerifiedPackets>, verified_receiver: CrossbeamReceiver<Vec<Packets>>,
retransmit: PacketSender, retransmit: PacketSender,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
@ -410,7 +408,7 @@ mod test {
} }
fn make_test_window( fn make_test_window(
verified_receiver: CrossbeamReceiver<VerifiedPackets>, verified_receiver: CrossbeamReceiver<Vec<Packets>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> WindowService { ) -> WindowService {
let blocktree_path = get_tmp_ledger_path!(); let blocktree_path = get_tmp_ledger_path!();
@ -453,24 +451,18 @@ mod test {
}) })
.collect(); .collect();
let mut packets = Packets::new(packets); let mut packets = Packets::new(packets);
let verified = vec![1; packets.packets.len()]; packet_sender.send(vec![packets.clone()]).unwrap();
packet_sender
.send(vec![(packets.clone(), verified)])
.unwrap();
sleep(Duration::from_millis(500)); sleep(Duration::from_millis(500));
// add some empty packets to the data set. These should fail to deserialize // add some empty packets to the data set. These should fail to deserialize
packets.packets.append(&mut vec![Packet::default(); 10]); packets.packets.append(&mut vec![Packet::default(); 10]);
packets.packets.shuffle(&mut thread_rng()); packets.packets.shuffle(&mut thread_rng());
let verified = vec![1; packets.packets.len()]; packet_sender.send(vec![packets.clone()]).unwrap();
packet_sender
.send(vec![(packets.clone(), verified)])
.unwrap();
sleep(Duration::from_millis(500)); sleep(Duration::from_millis(500));
// send 1 empty packet that cannot deserialize into a shred // send 1 empty packet that cannot deserialize into a shred
packet_sender packet_sender
.send(vec![(Packets::new(vec![Packet::default(); 1]), vec![1])]) .send(vec![Packets::new(vec![Packet::default(); 1])])
.unwrap(); .unwrap();
sleep(Duration::from_millis(500)); sleep(Duration::from_millis(500));