Fast-track vote signature verification and processing (#3695)
This commit is contained in:
@ -1,10 +1,11 @@
|
|||||||
use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
|
use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
|
||||||
use crate::packet;
|
|
||||||
use crate::result::Result;
|
use crate::result::Result;
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::streamer::PacketSender;
|
use crate::sigverify_stage::VerifiedPackets;
|
||||||
|
use crate::{packet, sigverify};
|
||||||
use solana_metrics::counter::Counter;
|
use solana_metrics::counter::Counter;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::{self, sleep, Builder, JoinHandle};
|
use std::thread::{self, sleep, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -17,13 +18,14 @@ impl ClusterInfoVoteListener {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||||
sender: PacketSender,
|
sigverify_disabled: bool,
|
||||||
|
sender: Sender<VerifiedPackets>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let exit = exit.clone();
|
let exit = exit.clone();
|
||||||
let thread = Builder::new()
|
let thread = Builder::new()
|
||||||
.name("solana-cluster_info_vote_listener".to_string())
|
.name("solana-cluster_info_vote_listener".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let _ = Self::recv_loop(exit, &cluster_info, &sender);
|
let _ = Self::recv_loop(exit, &cluster_info, sigverify_disabled, &sender);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Self {
|
Self {
|
||||||
@ -33,7 +35,8 @@ impl ClusterInfoVoteListener {
|
|||||||
fn recv_loop(
|
fn recv_loop(
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
sender: &PacketSender,
|
sigverify_disabled: bool,
|
||||||
|
sender: &Sender<VerifiedPackets>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut last_ts = 0;
|
let mut last_ts = 0;
|
||||||
loop {
|
loop {
|
||||||
@ -44,9 +47,12 @@ impl ClusterInfoVoteListener {
|
|||||||
last_ts = new_ts;
|
last_ts = new_ts;
|
||||||
inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len());
|
inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len());
|
||||||
let msgs = packet::to_packets(&votes);
|
let msgs = packet::to_packets(&votes);
|
||||||
for m in msgs {
|
let r = if sigverify_disabled {
|
||||||
sender.send(m)?;
|
sigverify::ed25519_verify_disabled(&msgs)
|
||||||
}
|
} else {
|
||||||
|
sigverify::ed25519_verify_cpu(&msgs)
|
||||||
|
};
|
||||||
|
sender.send(msgs.into_iter().zip(r).collect())?;
|
||||||
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ use rand::{thread_rng, Rng};
|
|||||||
use solana_metrics::counter::Counter;
|
use solana_metrics::counter::Counter;
|
||||||
use solana_metrics::{influxdb, submit};
|
use solana_metrics::{influxdb, submit};
|
||||||
use solana_sdk::timing;
|
use solana_sdk::timing;
|
||||||
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread::{self, spawn, JoinHandle};
|
use std::thread::{self, spawn, JoinHandle};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
@ -30,12 +30,12 @@ impl SigVerifyStage {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
packet_receiver: Receiver<SharedPackets>,
|
packet_receiver: Receiver<SharedPackets>,
|
||||||
sigverify_disabled: bool,
|
sigverify_disabled: bool,
|
||||||
) -> (Self, Receiver<VerifiedPackets>) {
|
verified_sender: Sender<VerifiedPackets>,
|
||||||
|
) -> Self {
|
||||||
sigverify::init();
|
sigverify::init();
|
||||||
let (verified_sender, verified_receiver) = channel();
|
|
||||||
let thread_hdls =
|
let thread_hdls =
|
||||||
Self::verifier_services(packet_receiver, verified_sender, sigverify_disabled);
|
Self::verifier_services(packet_receiver, verified_sender, sigverify_disabled);
|
||||||
(Self { thread_hdls }, verified_receiver)
|
Self { thread_hdls }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_batch(batch: Vec<SharedPackets>, sigverify_disabled: bool) -> VerifiedPackets {
|
fn verify_batch(batch: Vec<SharedPackets>, sigverify_disabled: bool) -> VerifiedPackets {
|
||||||
|
@ -48,13 +48,19 @@ impl Tpu {
|
|||||||
transactions_sockets,
|
transactions_sockets,
|
||||||
tpu_via_blobs_sockets,
|
tpu_via_blobs_sockets,
|
||||||
&exit,
|
&exit,
|
||||||
&packet_sender.clone(),
|
&packet_sender,
|
||||||
);
|
);
|
||||||
let cluster_info_vote_listener =
|
let (verified_sender, verified_receiver) = channel();
|
||||||
ClusterInfoVoteListener::new(&exit, cluster_info.clone(), packet_sender);
|
|
||||||
|
|
||||||
let (sigverify_stage, verified_receiver) =
|
let sigverify_stage =
|
||||||
SigVerifyStage::new(packet_receiver, sigverify_disabled);
|
SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone());
|
||||||
|
|
||||||
|
let cluster_info_vote_listener = ClusterInfoVoteListener::new(
|
||||||
|
&exit,
|
||||||
|
cluster_info.clone(),
|
||||||
|
sigverify_disabled,
|
||||||
|
verified_sender,
|
||||||
|
);
|
||||||
|
|
||||||
let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver);
|
let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user