Tpu vote 1.7 (#20187)

* Add separate vote processing tpu port

* Add feature to send to tpu vote port

* Add vote rejecting sigverify mode

* use packet.meta.is_simple_vote_tx in place of deserialization

* consolidate code that identifies vote tx atcommon path for cpu and gpu

* new key for feature set

* banking forward tpu vote

* add tpu vote port to dockerfile and other review changes

* Simplify thread id compare

* fix a test; updated cluster_info ABI change

Co-authored-by: Tao Zhu <tao@solana.com>
This commit is contained in:
sakridge
2021-09-29 09:12:58 -07:00
committed by GitHub
parent 47c1730808
commit 257ddbeee1
20 changed files with 415 additions and 144 deletions

View File

@@ -166,6 +166,7 @@ fn main() {
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new(&genesis_config);
let mut bank_forks = BankForks::new(bank0);
@@ -226,6 +227,7 @@ fn main() {
&cluster_info,
&poh_recorder,
verified_receiver,
tpu_vote_receiver,
vote_receiver,
None,
replay_vote_sender,
@@ -381,6 +383,7 @@ fn main() {
);
drop(verified_sender);
drop(tpu_vote_sender);
drop(vote_sender);
exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap();

View File

@@ -158,6 +158,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
genesis_config.ticks_per_slot = 10_000;
let (verified_sender, verified_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let mut bank = Bank::new(&genesis_config);
// Allow arbitrary transaction processing time for the purposes of this bench
@@ -213,6 +214,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
&cluster_info,
&poh_recorder,
verified_receiver,
tpu_vote_receiver,
vote_receiver,
None,
s,
@@ -259,6 +261,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
start += chunk_len;
start %= verified.len();
});
drop(tpu_vote_sender);
drop(vote_sender);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();

View File

@@ -6,7 +6,7 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
use retain_mut::RetainMut;
use solana_gossip::cluster_info::ClusterInfo;
use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo};
use solana_ledger::{blockstore_processor::TransactionStatusSender, entry::hash_transactions};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
@@ -49,7 +49,7 @@ use std::{
collections::{HashMap, VecDeque},
env,
mem::size_of,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex},
@@ -77,6 +77,9 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const DEFAULT_LRU_SIZE: usize = 200_000;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicInterval,
@@ -220,6 +223,13 @@ pub enum BufferedPacketsDecision {
Hold,
}
#[derive(Debug, Clone)]
pub enum ForwardOption {
NotForward,
ForwardTpuVote,
ForwardTransaction,
}
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::new_ret_no_self)]
@@ -227,6 +237,7 @@ impl BankingStage {
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
@@ -235,6 +246,7 @@ impl BankingStage {
cluster_info,
poh_recorder,
verified_receiver,
tpu_verified_vote_receiver,
verified_vote_receiver,
Self::num_threads(),
transaction_status_sender,
@@ -247,6 +259,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
@@ -262,13 +275,20 @@ impl BankingStage {
)));
let data_budget = Arc::new(DataBudget::default());
// Many banks that process transactions in parallel.
assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING);
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| {
let (verified_receiver, enable_forwarding) = if i < num_threads - 1 {
(verified_receiver.clone(), true)
} else {
// Disable forwarding of vote transactions, as votes are gossiped
(verified_vote_receiver.clone(), false)
let (verified_receiver, forward_option) = match i {
0 => {
// Disable forwarding of vote transactions
// from gossip. Note - votes can also arrive from tpu
(verified_vote_receiver.clone(), ForwardOption::NotForward)
}
1 => (
tpu_verified_vote_receiver.clone(),
ForwardOption::ForwardTpuVote,
),
_ => (verified_receiver.clone(), ForwardOption::ForwardTransaction),
};
let poh_recorder = poh_recorder.clone();
@@ -287,7 +307,7 @@ impl BankingStage {
&poh_recorder,
&cluster_info,
&mut recv_start,
enable_forwarding,
forward_option,
i,
batch_limit,
transaction_status_sender,
@@ -493,7 +513,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
buffered_packets: &mut UnprocessedPackets,
enable_forwarding: bool,
forward_option: &ForwardOption,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
@@ -543,7 +563,7 @@ impl BankingStage {
}
BufferedPacketsDecision::Forward => {
Self::handle_forwarding(
enable_forwarding,
forward_option,
cluster_info,
buffered_packets,
poh_recorder,
@@ -554,7 +574,7 @@ impl BankingStage {
}
BufferedPacketsDecision::ForwardAndHold => {
Self::handle_forwarding(
enable_forwarding,
forward_option,
cluster_info,
buffered_packets,
poh_recorder,
@@ -569,7 +589,7 @@ impl BankingStage {
}
fn handle_forwarding(
enable_forwarding: bool,
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
buffered_packets: &mut UnprocessedPackets,
poh_recorder: &Arc<Mutex<PohRecorder>>,
@@ -577,14 +597,19 @@ impl BankingStage {
hold: bool,
data_budget: &DataBudget,
) {
if !enable_forwarding {
if !hold {
buffered_packets.clear();
let addr = match forward_option {
ForwardOption::NotForward => {
if !hold {
buffered_packets.clear();
}
return;
}
return;
}
let addr = match next_leader_tpu_forwards(cluster_info, poh_recorder) {
ForwardOption::ForwardTransaction => {
next_leader_tpu_forwards(cluster_info, poh_recorder)
}
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
};
let addr = match addr {
Some(addr) => addr,
None => return,
};
@@ -606,7 +631,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
recv_start: &mut Instant,
enable_forwarding: bool,
forward_option: ForwardOption,
id: u32,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
@@ -626,7 +651,7 @@ impl BankingStage {
poh_recorder,
cluster_info,
&mut buffered_packets,
enable_forwarding,
&forward_option,
transaction_status_sender.clone(),
&gossip_vote_sender,
&banking_stage_stats,
@@ -676,13 +701,11 @@ impl BankingStage {
}
pub fn num_threads() -> u32 {
const MIN_THREADS_VOTES: u32 = 1;
const MIN_THREADS_BANKING: u32 = 1;
cmp::max(
env::var("SOLANA_BANKING_THREADS")
.map(|x| x.parse().unwrap_or(NUM_THREADS))
.unwrap_or(NUM_THREADS),
MIN_THREADS_VOTES + MIN_THREADS_BANKING,
NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING,
)
}
@@ -1010,12 +1033,11 @@ impl BankingStage {
.iter()
.filter_map(|tx_index| {
let p = &msgs.packets[*tx_index];
let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
if votes_only && !solana_runtime::bank::is_simple_vote_transaction(&tx) {
if votes_only && !p.meta.is_simple_vote_tx {
return None;
}
let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
tx.verify_precompiles(libsecp256k1_0_5_upgrade_enabled)
.ok()?;
let message_bytes = Self::packet_message(p)?;
@@ -1403,27 +1425,37 @@ pub(crate) fn next_leader_tpu(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
if let Some(leader_pubkey) = poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
{
cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu)
} else {
None
}
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu)
}
fn next_leader_tpu_forwards(
cluster_info: &ClusterInfo,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards)
}
pub(crate) fn next_leader_tpu_vote(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote)
}
fn next_leader_x<F>(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
port_selector: F,
) -> Option<std::net::SocketAddr>
where
F: FnOnce(&ContactInfo) -> SocketAddr,
{
if let Some(leader_pubkey) = poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
{
cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
cluster_info.lookup_contact_info(&leader_pubkey, port_selector)
} else {
None
}
@@ -1459,6 +1491,7 @@ mod tests {
};
use solana_streamer::socket::SocketAddrSpace;
use solana_transaction_status::TransactionWithStatusMeta;
use solana_vote_program::vote_transaction;
use std::{
net::SocketAddr,
path::Path,
@@ -1482,8 +1515,9 @@ mod tests {
let genesis_config = create_genesis_config(2).genesis_config;
let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (vote_forward_sender, _vote_forward_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
@@ -1498,12 +1532,14 @@ mod tests {
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
tpu_vote_receiver,
gossip_verified_vote_receiver,
None,
gossip_vote_sender,
vote_forward_sender,
);
drop(verified_sender);
drop(vote_sender);
drop(gossip_verified_vote_sender);
drop(tpu_vote_sender);
exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap();
poh_service.join().unwrap();
@@ -1522,7 +1558,7 @@ mod tests {
let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
@@ -1537,19 +1573,22 @@ mod tests {
create_test_recorder(&bank, &blockstore, Some(poh_config));
let cluster_info = new_test_cluster_info(Node::new_localhost().info);
let cluster_info = Arc::new(cluster_info);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (vote_forward_sender, _vote_forward_receiver) = unbounded();
let (verified_gossip_vote_sender, verified_gossip_vote_receiver) = unbounded();
let banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
tpu_vote_receiver,
verified_gossip_vote_receiver,
None,
gossip_vote_sender,
vote_forward_sender,
);
trace!("sending bank");
drop(verified_sender);
drop(vote_sender);
drop(verified_gossip_vote_sender);
drop(tpu_vote_sender);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);
@@ -1589,7 +1628,8 @@ mod tests {
let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
@@ -1612,7 +1652,8 @@ mod tests {
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
tpu_vote_receiver,
gossip_verified_vote_receiver,
None,
gossip_vote_sender,
);
@@ -1652,7 +1693,8 @@ mod tests {
.unwrap();
drop(verified_sender);
drop(vote_sender);
drop(tpu_vote_sender);
drop(gossip_verified_vote_sender);
// wait until banking_stage to finish up all packets
banking_stage.join().unwrap();
@@ -1733,6 +1775,7 @@ mod tests {
verified_sender.send(packets).unwrap();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
@@ -1758,8 +1801,9 @@ mod tests {
&cluster_info,
&poh_recorder,
verified_receiver,
tpu_vote_receiver,
vote_receiver,
2,
3,
None,
gossip_vote_sender,
);
@@ -1774,6 +1818,7 @@ mod tests {
};
drop(verified_sender);
drop(vote_sender);
drop(tpu_vote_sender);
// consume the entire entry_receiver, feed it into a new bank
// check that the balance is what we expect.
@@ -2741,7 +2786,7 @@ mod tests {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let data_budget = DataBudget::default();
BankingStage::handle_forwarding(
true,
&ForwardOption::ForwardTransaction,
&cluster_info,
&mut unprocessed_packets,
&poh_recorder,
@@ -2870,36 +2915,123 @@ mod tests {
);
}
#[cfg(test)]
fn make_test_packets(
transactions: Vec<Transaction>,
vote_indexes: Vec<usize>,
) -> (Packets, Vec<usize>) {
let capacity = transactions.len();
let mut packets = Packets::with_capacity(capacity);
let mut packet_indexes = Vec::with_capacity(capacity);
packets.packets.resize(capacity, Packet::default());
for (index, tx) in transactions.iter().enumerate() {
Packet::populate_packet(&mut packets.packets[index], None, tx).ok();
packet_indexes.push(index);
}
for index in vote_indexes.iter() {
packets.packets[*index].meta.is_simple_vote_tx = true;
}
(packets, packet_indexes)
}
#[test]
fn test_transactions_from_packets() {
use solana_vote_program::vote_state::Vote;
solana_logger::setup();
let mut vote_packet = Packet::default();
let vote_instruction = solana_vote_program::vote_instruction::vote(
&Pubkey::new_unique(),
&Pubkey::new_unique(),
Vote::default(),
);
let vote_transaction =
Transaction::new_with_payer(&[vote_instruction], Some(&Pubkey::new_unique()));
Packet::populate_packet(&mut vote_packet, None, &vote_transaction).unwrap();
let mut non_vote = Packet::default();
let tx = system_transaction::transfer(
&Keypair::new(),
&Pubkey::new_unique(),
2,
let keypair = Keypair::new();
let transfer_tx =
system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default());
let vote_tx = vote_transaction::new_vote_transaction(
vec![42],
Hash::default(),
Hash::default(),
&keypair,
&keypair,
&keypair,
None,
);
Packet::populate_packet(&mut non_vote, None, &tx).unwrap();
let msgs = Packets::new(vec![non_vote, vote_packet]);
let packet_indexes = [0, 1];
let (transactions, _transaction_to_packet_indexes) =
BankingStage::transactions_from_packets(&msgs, &packet_indexes, false, true);
assert_eq!(transactions.len(), 1);
assert!(!transactions[0].transaction().signatures.is_empty());
let (transactions, _transaction_to_packet_indexes) =
BankingStage::transactions_from_packets(&msgs, &packet_indexes, false, false);
assert_eq!(transactions.len(), 2);
// packets with no votes
{
let vote_indexes = vec![];
let (packets, packet_indexes) =
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
);
assert_eq!(2, txs.len());
assert_eq!(vec![0, 1], tx_packet_index);
votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
);
assert_eq!(0, txs.len());
assert_eq!(0, tx_packet_index.len());
}
// packets with some votes
{
let vote_indexes = vec![0, 2];
let (packets, packet_indexes) = make_test_packets(
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
vote_indexes,
);
let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);
votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
);
assert_eq!(2, txs.len());
assert_eq!(vec![0, 2], tx_packet_index);
}
// packets with all votes
{
let vote_indexes = vec![0, 1, 2];
let (packets, packet_indexes) = make_test_packets(
vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
vote_indexes,
);
let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);
votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);
}
}
}

View File

@@ -351,7 +351,10 @@ impl ClusterInfoVoteListener {
labels: Vec<CrdsValueLabel>,
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Slot, Packets)>) {
let mut msgs = packet::to_packets_chunked(&votes, 1);
sigverify::ed25519_verify_cpu(&mut msgs);
// Votes should already be filtered by this point.
let reject_non_vote = false;
sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote);
let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,)
.filter_map(|(label, vote, packet)| {

View File

@@ -23,38 +23,49 @@ impl FetchStage {
pub fn new(
sockets: Vec<UdpSocket>,
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
) -> (Self, PacketReceiver) {
) -> (Self, PacketReceiver, PacketReceiver) {
let (sender, receiver) = channel();
let (vote_sender, vote_receiver) = channel();
(
Self::new_with_sender(
sockets,
tpu_forwards_sockets,
tpu_vote_sockets,
exit,
&sender,
poh_recorder,
&vote_sender,
&poh_recorder,
coalesce_ms,
),
receiver,
vote_receiver,
)
}
pub fn new_with_sender(
sockets: Vec<UdpSocket>,
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
sender: &PacketSender,
vote_sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
let tpu_vote_sockets = tpu_vote_sockets.into_iter().map(Arc::new).collect();
Self::new_multi_socket(
tx_sockets,
tpu_forwards_sockets,
tpu_vote_sockets,
exit,
sender,
vote_sender,
poh_recorder,
coalesce_ms,
)
@@ -98,8 +109,10 @@ impl FetchStage {
fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
sender: &PacketSender,
vote_sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
) -> Self {
@@ -130,6 +143,18 @@ impl FetchStage {
)
});
let tpu_vote_threads = tpu_vote_sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
&exit,
vote_sender.clone(),
recycler.clone(),
"fetch_vote_stage",
coalesce_ms,
true,
)
});
let sender = sender.clone();
let poh_recorder = poh_recorder.clone();
@@ -150,7 +175,10 @@ impl FetchStage {
})
.unwrap();
let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_forwards_threads).collect();
let mut thread_hdls: Vec<_> = tpu_threads
.chain(tpu_forwards_threads)
.chain(tpu_vote_threads)
.collect();
thread_hdls.push(fwd_thread_hdl);
Self { thread_hdls }
}

View File

@@ -4713,7 +4713,12 @@ mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
vote_info,
false,
);
let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
@@ -4770,7 +4775,12 @@ mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
vote_info,
false,
);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
@@ -4832,7 +4842,12 @@ mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
vote_info,
false,
);
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
let (_, votes) = cluster_info.get_votes(&mut cursor);

View File

@@ -738,7 +738,7 @@ mod tests {
repair: socketaddr!("127.0.0.1:1237"),
tpu: socketaddr!("127.0.0.1:1238"),
tpu_forwards: socketaddr!("127.0.0.1:1239"),
unused: socketaddr!("127.0.0.1:1240"),
tpu_vote: socketaddr!("127.0.0.1:1240"),
rpc: socketaddr!("127.0.0.1:1241"),
rpc_pubsub: socketaddr!("127.0.0.1:1242"),
serve_repair: socketaddr!("127.0.0.1:1243"),
@@ -825,7 +825,7 @@ mod tests {
repair: socketaddr!([127, 0, 0, 1], 1237),
tpu: socketaddr!([127, 0, 0, 1], 1238),
tpu_forwards: socketaddr!([127, 0, 0, 1], 1239),
unused: socketaddr!([127, 0, 0, 1], 1240),
tpu_vote: socketaddr!([127, 0, 0, 1], 1240),
rpc: socketaddr!([127, 0, 0, 1], 1241),
rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242),
serve_repair: serve_repair_addr,
@@ -855,7 +855,7 @@ mod tests {
repair: socketaddr!([127, 0, 0, 1], 1237),
tpu: socketaddr!([127, 0, 0, 1], 1238),
tpu_forwards: socketaddr!([127, 0, 0, 1], 1239),
unused: socketaddr!([127, 0, 0, 1], 1240),
tpu_vote: socketaddr!([127, 0, 0, 1], 1240),
rpc: socketaddr!([127, 0, 0, 1], 1241),
rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242),
serve_repair: serve_repair_addr2,

View File

@@ -17,6 +17,16 @@ pub use solana_perf::sigverify::{
pub struct TransactionSigVerifier {
recycler: Recycler<TxOffset>,
recycler_out: Recycler<PinnedVec<u8>>,
reject_non_vote: bool,
}
impl TransactionSigVerifier {
pub fn new_reject_non_vote() -> Self {
TransactionSigVerifier {
reject_non_vote: true,
..TransactionSigVerifier::default()
}
}
}
impl Default for TransactionSigVerifier {
@@ -25,13 +35,19 @@ impl Default for TransactionSigVerifier {
Self {
recycler: Recycler::warmed(50, 4096),
recycler_out: Recycler::warmed(50, 4096),
reject_non_vote: false,
}
}
}
impl SigVerifier for TransactionSigVerifier {
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
sigverify::ed25519_verify(&mut batch, &self.recycler, &self.recycler_out);
sigverify::ed25519_verify(
&mut batch,
&self.recycler,
&self.recycler_out,
self.reject_non_vote,
);
batch
}
}

View File

@@ -39,6 +39,7 @@ pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
pub struct Tpu {
fetch_stage: FetchStage,
sigverify_stage: SigVerifyStage,
vote_sigverify_stage: SigVerifyStage,
banking_stage: BankingStage,
cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage,
@@ -53,6 +54,7 @@ impl Tpu {
retransmit_slots_receiver: RetransmitSlotsReceiver,
transactions_sockets: Vec<UdpSocket>,
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
broadcast_sockets: Vec<UdpSocket>,
subscriptions: &Arc<RpcSubscriptions>,
transaction_status_sender: Option<TransactionStatusSender>,
@@ -71,11 +73,14 @@ impl Tpu {
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
) -> Self {
let (packet_sender, packet_receiver) = channel();
let (vote_packet_sender, vote_packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
transactions_sockets,
tpu_forwards_sockets,
tpu_vote_sockets,
exit,
&packet_sender,
&vote_packet_sender,
poh_recorder,
tpu_coalesce_ms,
);
@@ -86,11 +91,23 @@ impl Tpu {
SigVerifyStage::new(packet_receiver, verified_sender, verifier)
};
let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded();
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded();
let vote_sigverify_stage = {
let verifier = TransactionSigVerifier::new_reject_non_vote();
SigVerifyStage::new(
vote_packet_receiver,
verified_tpu_vote_packets_sender,
verifier,
)
};
let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) =
unbounded();
let cluster_info_vote_listener = ClusterInfoVoteListener::new(
exit,
cluster_info.clone(),
verified_vote_packets_sender,
verified_gossip_vote_packets_sender,
poh_recorder,
vote_tracker,
bank_forks.clone(),
@@ -107,7 +124,8 @@ impl Tpu {
cluster_info,
poh_recorder,
verified_receiver,
verified_vote_packets_receiver,
verified_tpu_vote_packets_receiver,
verified_gossip_vote_packets_receiver,
transaction_status_sender,
replay_vote_sender,
);
@@ -126,6 +144,7 @@ impl Tpu {
Self {
fetch_stage,
sigverify_stage,
vote_sigverify_stage,
banking_stage,
cluster_info_vote_listener,
broadcast_stage,
@@ -136,6 +155,7 @@ impl Tpu {
let results = vec![
self.fetch_stage.join(),
self.sigverify_stage.join(),
self.vote_sigverify_stage.join(),
self.cluster_info_vote_listener.join(),
self.banking_stage.join(),
];

View File

@@ -276,8 +276,12 @@ impl Tvu {
};
let (voting_sender, voting_receiver) = channel();
let voting_service =
VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone());
let voting_service = VotingService::new(
voting_receiver,
cluster_info.clone(),
poh_recorder.clone(),
bank_forks.clone(),
);
let replay_stage = ReplayStage::new(
replay_stage_config,

View File

@@ -764,6 +764,7 @@ impl Validator {
retransmit_slots_receiver,
node.sockets.tpu,
node.sockets.tpu_forwards,
node.sockets.tpu_vote,
node.sockets.broadcast,
&rpc_subscriptions,
transaction_status_sender,

View File

@@ -1,8 +1,9 @@
use solana_gossip::cluster_info::ClusterInfo;
use solana_poh::poh_recorder::PohRecorder;
use solana_runtime::bank_forks::BankForks;
use solana_sdk::{clock::Slot, transaction::Transaction};
use std::{
sync::{mpsc::Receiver, Arc, Mutex},
sync::{mpsc::Receiver, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
};
@@ -38,12 +39,15 @@ impl VotingService {
vote_receiver: Receiver<VoteOp>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-vote-service".to_string())
.spawn(move || {
for vote_op in vote_receiver.iter() {
Self::handle_vote(&cluster_info, &poh_recorder, vote_op);
let rooted_bank = bank_forks.read().unwrap().root_bank().clone();
let send_to_tpu_vote_port = rooted_bank.send_to_tpu_vote_port_enabled();
Self::handle_vote(&cluster_info, &poh_recorder, vote_op, send_to_tpu_vote_port);
}
})
.unwrap();
@@ -54,11 +58,14 @@ impl VotingService {
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
vote_op: VoteOp,
send_to_tpu_vote_port: bool,
) {
let _ = cluster_info.send_vote(
vote_op.tx(),
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
);
let target_address = if send_to_tpu_vote_port {
crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder)
} else {
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder)
};
let _ = cluster_info.send_vote(vote_op.tx(), target_address);
match vote_op {
VoteOp::PushVote { tx, tower_slots } => {

View File

@@ -255,7 +255,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "GANv3KVkTYF84kmg1bAuWEZd9MaiYzPquuu13hup3379")]
#[frozen_abi(digest = "3qq56sFGXGbNqr7qKq8x47t144ugdfv5adCkVJUMnMf3")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
@@ -759,7 +759,7 @@ impl ClusterInfo {
};
let ip_addr = node.gossip.ip();
Some(format!(
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
if ContactInfo::is_valid_address(&node.gossip, &self.socket_addr_space) {
ip_addr.to_string()
} else {
@@ -774,6 +774,7 @@ impl ClusterInfo {
"-".to_string()
},
addr_to_string(&ip_addr, &node.gossip),
addr_to_string(&ip_addr, &node.tpu_vote),
addr_to_string(&ip_addr, &node.tpu),
addr_to_string(&ip_addr, &node.tpu_forwards),
addr_to_string(&ip_addr, &node.tvu),
@@ -788,9 +789,9 @@ impl ClusterInfo {
format!(
"IP Address |Age(ms)| Node identifier \
| Version |Gossip| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\
| Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\
------------------+-------+----------------------------------------------+---------+\
------+------+------+------+------+------+------+--------\n\
------+------+-------+------+------+------+------+------+--------\n\
{}\
Nodes: {}{}{}",
nodes.join(""),
@@ -2705,6 +2706,7 @@ pub struct Sockets {
pub tvu_forwards: Vec<UdpSocket>,
pub tpu: Vec<UdpSocket>,
pub tpu_forwards: Vec<UdpSocket>,
pub tpu_vote: Vec<UdpSocket>,
pub broadcast: Vec<UdpSocket>,
pub repair: UdpSocket,
pub retransmit_sockets: Vec<UdpSocket>,
@@ -2731,6 +2733,7 @@ impl Node {
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
let tpu_vote = UdpSocket::bind("127.0.0.1:0").unwrap();
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let rpc_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap();
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port);
@@ -2741,7 +2744,6 @@ impl Node {
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let unused = UdpSocket::bind("0.0.0.0:0").unwrap();
let info = ContactInfo {
id: *pubkey,
gossip: gossip_addr,
@@ -2750,7 +2752,7 @@ impl Node {
repair: repair.local_addr().unwrap(),
tpu: tpu.local_addr().unwrap(),
tpu_forwards: tpu_forwards.local_addr().unwrap(),
unused: unused.local_addr().unwrap(),
tpu_vote: tpu_vote.local_addr().unwrap(),
rpc: rpc_addr,
rpc_pubsub: rpc_pubsub_addr,
serve_repair: serve_repair.local_addr().unwrap(),
@@ -2766,6 +2768,7 @@ impl Node {
tvu_forwards: vec![tvu_forwards],
tpu: vec![tpu],
tpu_forwards: vec![tpu_forwards],
tpu_vote: vec![tpu_vote],
broadcast,
repair,
retransmit_sockets: vec![retransmit_socket],
@@ -2806,6 +2809,7 @@ impl Node {
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range);
let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range);
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (repair_port, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
@@ -2822,7 +2826,7 @@ impl Node {
repair: SocketAddr::new(gossip_addr.ip(), repair_port),
tpu: SocketAddr::new(gossip_addr.ip(), tpu_port),
tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
unused: socketaddr_any!(),
tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port),
rpc: SocketAddr::new(gossip_addr.ip(), rpc_port),
rpc_pubsub: SocketAddr::new(gossip_addr.ip(), rpc_pubsub_port),
serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port),
@@ -2840,6 +2844,7 @@ impl Node {
tvu_forwards: vec![tvu_forwards],
tpu: vec![tpu],
tpu_forwards: vec![tpu_forwards],
tpu_vote: vec![tpu_vote],
broadcast: vec![broadcast],
repair,
retransmit_sockets: vec![retransmit_socket],
@@ -2869,6 +2874,9 @@ impl Node {
let (tpu_forwards_port, tpu_forwards_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
let (tpu_vote_port, tpu_vote_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
let (_, retransmit_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind");
@@ -2886,7 +2894,7 @@ impl Node {
repair: SocketAddr::new(gossip_addr.ip(), repair_port),
tpu: SocketAddr::new(gossip_addr.ip(), tpu_port),
tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
unused: socketaddr_any!(),
tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port),
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port),
@@ -2903,6 +2911,7 @@ impl Node {
tvu_forwards: tvu_forwards_sockets,
tpu: tpu_sockets,
tpu_forwards: tpu_forwards_sockets,
tpu_vote: tpu_vote_sockets,
broadcast,
repair,
retransmit_sockets,

View File

@@ -28,7 +28,7 @@ pub struct ContactInfo {
/// address to forward unprocessed transactions to
pub tpu_forwards: SocketAddr,
/// address to which to send bank state requests
pub unused: SocketAddr,
pub tpu_vote: SocketAddr,
/// address to which to send JSON-RPC requests
pub rpc: SocketAddr,
/// websocket for JSON-RPC push notifications
@@ -76,7 +76,7 @@ impl Default for ContactInfo {
repair: socketaddr_any!(),
tpu: socketaddr_any!(),
tpu_forwards: socketaddr_any!(),
unused: socketaddr_any!(),
tpu_vote: socketaddr_any!(),
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
serve_repair: socketaddr_any!(),
@@ -96,7 +96,7 @@ impl ContactInfo {
repair: socketaddr!("127.0.0.1:1237"),
tpu: socketaddr!("127.0.0.1:1238"),
tpu_forwards: socketaddr!("127.0.0.1:1239"),
unused: socketaddr!("127.0.0.1:1240"),
tpu_vote: socketaddr!("127.0.0.1:1240"),
rpc: socketaddr!("127.0.0.1:1241"),
rpc_pubsub: socketaddr!("127.0.0.1:1242"),
serve_repair: socketaddr!("127.0.0.1:1243"),
@@ -126,7 +126,7 @@ impl ContactInfo {
repair: addr,
tpu: addr,
tpu_forwards: addr,
unused: addr,
tpu_vote: addr,
rpc: addr,
rpc_pubsub: addr,
serve_repair: addr,
@@ -152,6 +152,7 @@ impl ContactInfo {
let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT);
let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
let serve_repair = next_port(bind_addr, 6);
let tpu_vote = next_port(&bind_addr, 7);
Self {
id: *pubkey,
gossip,
@@ -160,7 +161,7 @@ impl ContactInfo {
repair,
tpu,
tpu_forwards,
unused: "0.0.0.0:0".parse().unwrap(),
tpu_vote,
rpc,
rpc_pubsub,
serve_repair,
@@ -262,7 +263,7 @@ mod tests {
assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
assert!(ci.unused.ip().is_unspecified());
assert!(ci.tpu_vote.ip().is_unspecified());
assert!(ci.serve_repair.ip().is_unspecified());
}
#[test]
@@ -274,7 +275,7 @@ mod tests {
assert!(ci.rpc.ip().is_multicast());
assert!(ci.rpc_pubsub.ip().is_multicast());
assert!(ci.tpu.ip().is_multicast());
assert!(ci.unused.ip().is_multicast());
assert!(ci.tpu_vote.ip().is_multicast());
assert!(ci.serve_repair.ip().is_multicast());
}
#[test]
@@ -287,7 +288,7 @@ mod tests {
assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
assert!(ci.unused.ip().is_unspecified());
assert!(ci.tpu_vote.ip().is_unspecified());
assert!(ci.serve_repair.ip().is_unspecified());
}
#[test]
@@ -295,12 +296,12 @@ mod tests {
let addr = socketaddr!("127.0.0.1:10");
let ci = ContactInfo::new_with_socketaddr(&addr);
assert_eq!(ci.tpu, addr);
assert_eq!(ci.tpu_vote.port(), 17);
assert_eq!(ci.gossip.port(), 11);
assert_eq!(ci.tvu.port(), 12);
assert_eq!(ci.tpu_forwards.port(), 13);
assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT);
assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
assert!(ci.unused.ip().is_unspecified());
assert_eq!(ci.serve_repair.port(), 16);
}
@@ -327,6 +328,7 @@ mod tests {
assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238"));
assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239"));
assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240"));
assert_eq!(d1.tpu_vote, socketaddr!("127.0.0.1:1241"));
}
#[test]

View File

@@ -19,7 +19,7 @@ fn bench_sigverify(bencher: &mut Bencher) {
let recycler_out = Recycler::default();
// verify packets
bencher.iter(|| {
let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false);
})
}
@@ -34,6 +34,6 @@ fn bench_get_offsets(bencher: &mut Bencher) {
let recycler = Recycler::default();
// verify packets
bencher.iter(|| {
let _ans = sigverify::generate_offsets(&mut batches, &recycler);
let _ans = sigverify::generate_offsets(&mut batches, &recycler, false);
})
}

View File

@@ -103,8 +103,8 @@ pub fn init() {
}
}
fn verify_packet(packet: &mut Packet) {
let packet_offsets = get_packet_offsets(packet, 0);
fn verify_packet(packet: &mut Packet, reject_non_vote: bool) {
let packet_offsets = get_packet_offsets(packet, 0, reject_non_vote);
let mut sig_start = packet_offsets.sig_start as usize;
let mut pubkey_start = packet_offsets.pubkey_start as usize;
let msg_start = packet_offsets.msg_start as usize;
@@ -249,15 +249,20 @@ fn do_get_packet_offsets(
))
}
fn get_packet_offsets(packet: &mut Packet, current_offset: usize) -> PacketOffsets {
fn get_packet_offsets(
packet: &mut Packet,
current_offset: usize,
reject_non_vote: bool,
) -> PacketOffsets {
let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset);
if let Ok(offsets) = unsanitized_packet_offsets {
check_for_simple_vote_transaction(packet, &offsets, current_offset).ok();
offsets
} else {
// force sigverify to fail by returning zeros
PacketOffsets::new(0, 0, 0, 0, 0)
if !reject_non_vote || packet.meta.is_simple_vote_tx {
return offsets;
}
}
// force sigverify to fail by returning zeros
PacketOffsets::new(0, 0, 0, 0, 0)
}
fn check_for_simple_vote_transaction(
@@ -328,7 +333,11 @@ fn check_for_simple_vote_transaction(
Ok(())
}
pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler<TxOffset>) -> TxOffsets {
pub fn generate_offsets(
batches: &mut [Packets],
recycler: &Recycler<TxOffset>,
reject_non_vote: bool,
) -> TxOffsets {
debug!("allocating..");
let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets");
signature_offsets.set_pinnable();
@@ -343,7 +352,7 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler<TxOffset>)
batches.iter_mut().for_each(|p| {
let mut sig_lens = Vec::new();
p.packets.iter_mut().for_each(|packet| {
let packet_offsets = get_packet_offsets(packet, current_offset);
let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote);
sig_lens.push(packet_offsets.sig_len);
@@ -377,14 +386,16 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler<TxOffset>)
)
}
pub fn ed25519_verify_cpu(batches: &mut [Packets]) {
pub fn ed25519_verify_cpu(batches: &mut [Packets], reject_non_vote: bool) {
use rayon::prelude::*;
let count = batch_size(batches);
debug!("CPU ECDSA for {}", batch_size(batches));
PAR_THREAD_POOL.install(|| {
batches
.into_par_iter()
.for_each(|p| p.packets.par_iter_mut().for_each(|p| verify_packet(p)))
batches.into_par_iter().for_each(|p| {
p.packets
.par_iter_mut()
.for_each(|p| verify_packet(p, reject_non_vote))
})
});
inc_new_counter_debug!("ed25519_verify_cpu", count);
}
@@ -464,10 +475,11 @@ pub fn ed25519_verify(
batches: &mut [Packets],
recycler: &Recycler<TxOffset>,
recycler_out: &Recycler<PinnedVec<u8>>,
reject_non_vote: bool,
) {
let api = perf_libs::api();
if api.is_none() {
return ed25519_verify_cpu(batches);
return ed25519_verify_cpu(batches, reject_non_vote);
}
let api = api.unwrap();
@@ -480,11 +492,11 @@ pub fn ed25519_verify(
// may be busy doing other things while being a real validator
// TODO: dynamically adjust this crossover
if count < 64 {
return ed25519_verify_cpu(batches);
return ed25519_verify_cpu(batches, reject_non_vote);
}
let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) =
generate_offsets(batches, recycler);
generate_offsets(batches, recycler, reject_non_vote);
debug!("CUDA ECDSA for {}", batch_size(batches));
debug!("allocating out..");
@@ -599,7 +611,7 @@ mod tests {
let message_data = tx.message_data();
let mut packet = sigverify::make_packet_from_transaction(tx.clone());
let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0);
let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0, false);
assert_eq!(
memfind(&tx_bytes, tx.signatures[0].as_ref()),
@@ -694,7 +706,7 @@ mod tests {
let res = sigverify::do_get_packet_offsets(&packet, 0);
assert_eq!(res, Err(PacketError::InvalidPubkeyLen));
verify_packet(&mut packet);
verify_packet(&mut packet, false);
assert!(packet.meta.discard);
packet.meta.discard = false;
@@ -730,7 +742,7 @@ mod tests {
let res = sigverify::do_get_packet_offsets(&packet, 0);
assert_eq!(res, Err(PacketError::InvalidPubkeyLen));
verify_packet(&mut packet);
verify_packet(&mut packet, false);
assert!(packet.meta.discard);
packet.meta.discard = false;
@@ -824,7 +836,8 @@ mod tests {
// Just like get_packet_offsets, but not returning redundant information.
fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets {
let mut packet = sigverify::make_packet_from_transaction(tx);
let packet_offsets = sigverify::get_packet_offsets(&mut packet, current_offset as usize);
let packet_offsets =
sigverify::get_packet_offsets(&mut packet, current_offset as usize, false);
PacketOffsets::new(
packet_offsets.sig_len,
packet_offsets.sig_start - current_offset,
@@ -905,7 +918,7 @@ mod tests {
fn ed25519_verify(batches: &mut [Packets]) {
let recycler = Recycler::default();
let recycler_out = Recycler::default();
sigverify::ed25519_verify(batches, &recycler, &recycler_out);
sigverify::ed25519_verify(batches, &recycler, &recycler_out, false);
}
#[test]
@@ -1003,8 +1016,8 @@ mod tests {
// verify from GPU verification pipeline (when GPU verification is enabled) are
// equivalent to the CPU verification pipeline.
let mut batches_cpu = batches.clone();
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
ed25519_verify_cpu(&mut batches_cpu);
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false);
ed25519_verify_cpu(&mut batches_cpu, false);
// check result
batches

View File

@@ -5135,6 +5135,11 @@ impl Bank {
.is_active(&feature_set::stakes_remove_delegation_if_inactive::id())
}
pub fn send_to_tpu_vote_port_enabled(&self) -> bool {
self.feature_set
.is_active(&feature_set::send_to_tpu_vote_port::id())
}
// Check if the wallclock time from bank creation to now has exceeded the allotted
// time for transaction processing
pub fn should_bank_still_be_processing_txs(
@@ -5449,7 +5454,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) {
}
}
pub fn is_simple_vote_transaction(transaction: &Transaction) -> bool {
fn is_simple_vote_transaction(transaction: &Transaction) -> bool {
if transaction.message.instructions.len() == 1 {
let instruction = &transaction.message.instructions[0];
let program_pubkey =

View File

@@ -30,6 +30,8 @@ EXPOSE 8006/udp
EXPOSE 8007/udp
# broadcast
EXPOSE 8008/udp
# tpu_vote
EXPOSE 8009/udp
RUN apt update && \
apt-get install -y bzip2 libssl-dev && \

View File

@@ -215,6 +215,10 @@ pub mod stakes_remove_delegation_if_inactive {
solana_sdk::declare_id!("HFpdDDNQjvcXnXKec697HDDsyk6tFoWS2o8fkxuhQZpL");
}
pub mod send_to_tpu_vote_port {
solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -269,6 +273,7 @@ lazy_static! {
(fix_write_privs::id(), "fix native invoke write privileges"),
(reduce_required_deploy_balance::id(), "reduce required payer balance for program deploys"),
(stakes_remove_delegation_if_inactive::id(), "remove delegations from stakes cache when inactive"),
(send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()

View File

@@ -681,6 +681,9 @@ fn verify_reachable_ports(
if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) {
udp_sockets.extend(node.sockets.tpu_forwards.iter());
}
if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) {
udp_sockets.extend(node.sockets.tpu_vote.iter());
}
if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) {
udp_sockets.extend(node.sockets.tvu.iter());
udp_sockets.extend(node.sockets.broadcast.iter());