SigVerify stage for shreds. (#6563)
This commit is contained in:
committed by
GitHub
parent
9ee65009cd
commit
34a9619806
@ -5,8 +5,11 @@ use crate::cluster_info::ClusterInfo;
|
||||
use crate::repair_service::{RepairService, RepairStrategy};
|
||||
use crate::result::{Error, Result};
|
||||
use crate::service::Service;
|
||||
use crate::streamer::{PacketReceiver, PacketSender};
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
|
||||
use crate::sigverify_stage::VerifiedPackets;
|
||||
use crate::streamer::PacketSender;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
|
||||
use rayon::iter::IntoParallelRefMutIterator;
|
||||
use rayon::iter::ParallelIterator;
|
||||
use rayon::ThreadPool;
|
||||
use solana_ledger::blocktree::{self, Blocktree};
|
||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||
@ -18,7 +21,6 @@ use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::timing::duration_as_ms;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::RecvTimeoutError;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{self, Builder, JoinHandle};
|
||||
use std::time::{Duration, Instant};
|
||||
@ -53,9 +55,6 @@ pub fn should_retransmit_and_persist(
|
||||
} else if !verify_shred_slot(shred, root) {
|
||||
inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
|
||||
false
|
||||
} else if !shred.verify(&leader_id) {
|
||||
inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
@ -68,7 +67,7 @@ pub fn should_retransmit_and_persist(
|
||||
fn recv_window<F>(
|
||||
blocktree: &Arc<Blocktree>,
|
||||
my_pubkey: &Pubkey,
|
||||
r: &PacketReceiver,
|
||||
verified_receiver: &CrossbeamReceiver<VerifiedPackets>,
|
||||
retransmit: &PacketSender,
|
||||
shred_filter: F,
|
||||
thread_pool: &ThreadPool,
|
||||
@ -78,12 +77,13 @@ where
|
||||
F: Fn(&Shred, u64) -> bool + Sync,
|
||||
{
|
||||
let timer = Duration::from_millis(200);
|
||||
let mut packets = vec![r.recv_timeout(timer)?];
|
||||
let mut total_packets = packets[0].packets.len();
|
||||
let mut packets = verified_receiver.recv_timeout(timer)?;
|
||||
let mut total_packets: usize = packets.iter().map(|(p, _)| p.packets.len()).sum();
|
||||
|
||||
while let Ok(more_packets) = r.try_recv() {
|
||||
total_packets += more_packets.packets.len();
|
||||
packets.push(more_packets)
|
||||
while let Ok(mut more_packets) = verified_receiver.try_recv() {
|
||||
let count: usize = more_packets.iter().map(|(p, _)| p.packets.len()).sum();
|
||||
total_packets += count;
|
||||
packets.append(&mut more_packets)
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
@ -93,12 +93,19 @@ where
|
||||
let shreds: Vec<_> = thread_pool.install(|| {
|
||||
packets
|
||||
.par_iter_mut()
|
||||
.flat_map(|packets| {
|
||||
.flat_map(|(packets, sigs)| {
|
||||
packets
|
||||
.packets
|
||||
.iter_mut()
|
||||
.filter_map(|packet| {
|
||||
if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) {
|
||||
.zip(sigs.iter())
|
||||
.filter_map(|(packet, sigcheck)| {
|
||||
if *sigcheck == 0 {
|
||||
packet.meta.discard = true;
|
||||
inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
|
||||
None
|
||||
} else if let Ok(shred) =
|
||||
Shred::new_from_serialized_shred(packet.data.to_vec())
|
||||
{
|
||||
if shred_filter(&shred, last_root) {
|
||||
packet.meta.slot = shred.slot();
|
||||
packet.meta.seed = shred.seed();
|
||||
@ -121,7 +128,7 @@ where
|
||||
|
||||
trace!("{} num total shreds received: {}", my_pubkey, total_packets);
|
||||
|
||||
for packets in packets.into_iter() {
|
||||
for (packets, _) in packets.into_iter() {
|
||||
if !packets.packets.is_empty() {
|
||||
// Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit)
|
||||
let _ = retransmit.send(packets);
|
||||
@ -167,7 +174,7 @@ impl WindowService {
|
||||
pub fn new<F>(
|
||||
blocktree: Arc<Blocktree>,
|
||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||
r: PacketReceiver,
|
||||
verified_receiver: CrossbeamReceiver<VerifiedPackets>,
|
||||
retransmit: PacketSender,
|
||||
repair_socket: Arc<UdpSocket>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
@ -219,7 +226,7 @@ impl WindowService {
|
||||
if let Err(e) = recv_window(
|
||||
&blocktree,
|
||||
&id,
|
||||
&r,
|
||||
&verified_receiver,
|
||||
&retransmit,
|
||||
|shred, last_root| {
|
||||
shred_filter(
|
||||
@ -235,8 +242,8 @@ impl WindowService {
|
||||
&leader_schedule_cache,
|
||||
) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => {
|
||||
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => {
|
||||
if now.elapsed() > Duration::from_secs(30) {
|
||||
warn!("Window does not seem to be receiving data. Ensure port configuration is correct...");
|
||||
now = Instant::now();
|
||||
@ -281,6 +288,7 @@ mod test {
|
||||
repair_service::RepairSlotRange,
|
||||
service::Service,
|
||||
};
|
||||
use crossbeam_channel::unbounded;
|
||||
use rand::{seq::SliceRandom, thread_rng};
|
||||
use solana_ledger::shred::DataShredHeader;
|
||||
use solana_ledger::{
|
||||
@ -296,7 +304,7 @@ mod test {
|
||||
use std::{
|
||||
net::UdpSocket,
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
sync::mpsc::{channel, Receiver},
|
||||
sync::mpsc::channel,
|
||||
sync::{Arc, RwLock},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
@ -370,24 +378,6 @@ mod test {
|
||||
false
|
||||
);
|
||||
|
||||
// set the blob to have come from the wrong leader
|
||||
let wrong_leader_keypair = Arc::new(Keypair::new());
|
||||
let leader_pubkey = wrong_leader_keypair.pubkey();
|
||||
let wrong_bank = Arc::new(Bank::new(
|
||||
&create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block,
|
||||
));
|
||||
let wrong_cache = Arc::new(LeaderScheduleCache::new_from_bank(&wrong_bank));
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(
|
||||
&shreds[0],
|
||||
Some(wrong_bank.clone()),
|
||||
&wrong_cache,
|
||||
&me_id,
|
||||
0
|
||||
),
|
||||
false
|
||||
);
|
||||
|
||||
// with a Bank and no idea who leader is, blob gets thrown out
|
||||
shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
|
||||
assert_eq!(
|
||||
@ -420,7 +410,7 @@ mod test {
|
||||
}
|
||||
|
||||
fn make_test_window(
|
||||
packet_receiver: Receiver<Packets>,
|
||||
verified_receiver: CrossbeamReceiver<VerifiedPackets>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> WindowService {
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
@ -436,7 +426,7 @@ mod test {
|
||||
let window = WindowService::new(
|
||||
blocktree,
|
||||
cluster_info,
|
||||
packet_receiver,
|
||||
verified_receiver,
|
||||
retransmit_sender,
|
||||
repair_sock,
|
||||
&exit,
|
||||
@ -449,7 +439,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_recv_window() {
|
||||
let (packet_sender, packet_receiver) = channel();
|
||||
let (packet_sender, packet_receiver) = unbounded();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let window = make_test_window(packet_receiver, exit.clone());
|
||||
// send 5 slots worth of data to the window
|
||||
@ -463,18 +453,24 @@ mod test {
|
||||
})
|
||||
.collect();
|
||||
let mut packets = Packets::new(packets);
|
||||
packet_sender.send(packets.clone()).unwrap();
|
||||
let verified = vec![1; packets.packets.len()];
|
||||
packet_sender
|
||||
.send(vec![(packets.clone(), verified)])
|
||||
.unwrap();
|
||||
sleep(Duration::from_millis(500));
|
||||
|
||||
// add some empty packets to the data set. These should fail to deserialize
|
||||
packets.packets.append(&mut vec![Packet::default(); 10]);
|
||||
packets.packets.shuffle(&mut thread_rng());
|
||||
packet_sender.send(packets.clone()).unwrap();
|
||||
let verified = vec![1; packets.packets.len()];
|
||||
packet_sender
|
||||
.send(vec![(packets.clone(), verified)])
|
||||
.unwrap();
|
||||
sleep(Duration::from_millis(500));
|
||||
|
||||
// send 1 empty packet that cannot deserialize into a shred
|
||||
packet_sender
|
||||
.send(Packets::new(vec![Packet::default(); 1]))
|
||||
.send(vec![(Packets::new(vec![Packet::default(); 1]), vec![1])])
|
||||
.unwrap();
|
||||
sleep(Duration::from_millis(500));
|
||||
|
||||
|
Reference in New Issue
Block a user