* Add separate vote processing tpu port * Add feature to send to tpu vote port * Add vote rejecting sigverify mode * use packet.meta.is_simple_vote_tx in place of deserialization * consolidate code that identifies vote tx atcommon path for cpu and gpu * new key for feature set * banking forward tpu vote * add tpu vote port to dockerfile and other review changes * Simplify thread id compare * fix a test; updated cluster_info ABI change Co-authored-by: Tao Zhu <tao@solana.com> Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
@ -166,6 +166,7 @@ fn main() {
|
|||||||
|
|
||||||
let (verified_sender, verified_receiver) = unbounded();
|
let (verified_sender, verified_receiver) = unbounded();
|
||||||
let (vote_sender, vote_receiver) = unbounded();
|
let (vote_sender, vote_receiver) = unbounded();
|
||||||
|
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let bank0 = Bank::new_for_benches(&genesis_config);
|
let bank0 = Bank::new_for_benches(&genesis_config);
|
||||||
let mut bank_forks = BankForks::new(bank0);
|
let mut bank_forks = BankForks::new(bank0);
|
||||||
@ -227,6 +228,7 @@ fn main() {
|
|||||||
&cluster_info,
|
&cluster_info,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
|
tpu_vote_receiver,
|
||||||
vote_receiver,
|
vote_receiver,
|
||||||
None,
|
None,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
@ -384,6 +386,7 @@ fn main() {
|
|||||||
);
|
);
|
||||||
|
|
||||||
drop(verified_sender);
|
drop(verified_sender);
|
||||||
|
drop(tpu_vote_sender);
|
||||||
drop(vote_sender);
|
drop(vote_sender);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
banking_stage.join().unwrap();
|
banking_stage.join().unwrap();
|
||||||
|
@ -165,6 +165,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
|
|||||||
genesis_config.ticks_per_slot = 10_000;
|
genesis_config.ticks_per_slot = 10_000;
|
||||||
|
|
||||||
let (verified_sender, verified_receiver) = unbounded();
|
let (verified_sender, verified_receiver) = unbounded();
|
||||||
|
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
|
||||||
let (vote_sender, vote_receiver) = unbounded();
|
let (vote_sender, vote_receiver) = unbounded();
|
||||||
let mut bank = Bank::new_for_benches(&genesis_config);
|
let mut bank = Bank::new_for_benches(&genesis_config);
|
||||||
// Allow arbitrary transaction processing time for the purposes of this bench
|
// Allow arbitrary transaction processing time for the purposes of this bench
|
||||||
@ -220,6 +221,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
|
|||||||
&cluster_info,
|
&cluster_info,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
|
tpu_vote_receiver,
|
||||||
vote_receiver,
|
vote_receiver,
|
||||||
None,
|
None,
|
||||||
s,
|
s,
|
||||||
@ -269,6 +271,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
|
|||||||
start += chunk_len;
|
start += chunk_len;
|
||||||
start %= verified.len();
|
start %= verified.len();
|
||||||
});
|
});
|
||||||
|
drop(tpu_vote_sender);
|
||||||
drop(vote_sender);
|
drop(vote_sender);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
poh_service.join().unwrap();
|
poh_service.join().unwrap();
|
||||||
|
@ -9,7 +9,7 @@ use itertools::Itertools;
|
|||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use retain_mut::RetainMut;
|
use retain_mut::RetainMut;
|
||||||
use solana_entry::entry::hash_transactions;
|
use solana_entry::entry::hash_transactions;
|
||||||
use solana_gossip::cluster_info::ClusterInfo;
|
use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo};
|
||||||
use solana_ledger::blockstore_processor::TransactionStatusSender;
|
use solana_ledger::blockstore_processor::TransactionStatusSender;
|
||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
|
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
|
||||||
@ -52,7 +52,7 @@ use std::{
|
|||||||
collections::{HashMap, VecDeque},
|
collections::{HashMap, VecDeque},
|
||||||
env,
|
env,
|
||||||
mem::size_of,
|
mem::size_of,
|
||||||
net::UdpSocket,
|
net::{SocketAddr, UdpSocket},
|
||||||
ops::DerefMut,
|
ops::DerefMut,
|
||||||
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
|
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||||
sync::{Arc, Mutex, RwLock},
|
sync::{Arc, Mutex, RwLock},
|
||||||
@ -80,6 +80,9 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
|
|||||||
|
|
||||||
const DEFAULT_LRU_SIZE: usize = 200_000;
|
const DEFAULT_LRU_SIZE: usize = 200_000;
|
||||||
|
|
||||||
|
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
|
||||||
|
const MIN_THREADS_BANKING: u32 = 1;
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct BankingStageStats {
|
pub struct BankingStageStats {
|
||||||
last_report: AtomicInterval,
|
last_report: AtomicInterval,
|
||||||
@ -267,6 +270,13 @@ pub enum BufferedPacketsDecision {
|
|||||||
Hold,
|
Hold,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum ForwardOption {
|
||||||
|
NotForward,
|
||||||
|
ForwardTpuVote,
|
||||||
|
ForwardTransaction,
|
||||||
|
}
|
||||||
|
|
||||||
impl BankingStage {
|
impl BankingStage {
|
||||||
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
|
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
@ -274,6 +284,7 @@ impl BankingStage {
|
|||||||
cluster_info: &Arc<ClusterInfo>,
|
cluster_info: &Arc<ClusterInfo>,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||||
|
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||||
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
gossip_vote_sender: ReplayVoteSender,
|
gossip_vote_sender: ReplayVoteSender,
|
||||||
@ -283,6 +294,7 @@ impl BankingStage {
|
|||||||
cluster_info,
|
cluster_info,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
|
tpu_verified_vote_receiver,
|
||||||
verified_vote_receiver,
|
verified_vote_receiver,
|
||||||
Self::num_threads(),
|
Self::num_threads(),
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
@ -296,6 +308,7 @@ impl BankingStage {
|
|||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||||
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||||
|
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||||
num_threads: u32,
|
num_threads: u32,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
gossip_vote_sender: ReplayVoteSender,
|
gossip_vote_sender: ReplayVoteSender,
|
||||||
@ -311,13 +324,20 @@ impl BankingStage {
|
|||||||
)));
|
)));
|
||||||
let data_budget = Arc::new(DataBudget::default());
|
let data_budget = Arc::new(DataBudget::default());
|
||||||
// Many banks that process transactions in parallel.
|
// Many banks that process transactions in parallel.
|
||||||
|
assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING);
|
||||||
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
|
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
|
||||||
.map(|i| {
|
.map(|i| {
|
||||||
let (verified_receiver, enable_forwarding) = if i < num_threads - 1 {
|
let (verified_receiver, forward_option) = match i {
|
||||||
(verified_receiver.clone(), true)
|
0 => {
|
||||||
} else {
|
// Disable forwarding of vote transactions
|
||||||
// Disable forwarding of vote transactions, as votes are gossiped
|
// from gossip. Note - votes can also arrive from tpu
|
||||||
(verified_vote_receiver.clone(), false)
|
(verified_vote_receiver.clone(), ForwardOption::NotForward)
|
||||||
|
}
|
||||||
|
1 => (
|
||||||
|
tpu_verified_vote_receiver.clone(),
|
||||||
|
ForwardOption::ForwardTpuVote,
|
||||||
|
),
|
||||||
|
_ => (verified_receiver.clone(), ForwardOption::ForwardTransaction),
|
||||||
};
|
};
|
||||||
|
|
||||||
let poh_recorder = poh_recorder.clone();
|
let poh_recorder = poh_recorder.clone();
|
||||||
@ -336,7 +356,7 @@ impl BankingStage {
|
|||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut recv_start,
|
&mut recv_start,
|
||||||
enable_forwarding,
|
forward_option,
|
||||||
i,
|
i,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
@ -586,7 +606,7 @@ impl BankingStage {
|
|||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
buffered_packets: &mut UnprocessedPackets,
|
buffered_packets: &mut UnprocessedPackets,
|
||||||
enable_forwarding: bool,
|
forward_option: &ForwardOption,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
gossip_vote_sender: &ReplayVoteSender,
|
gossip_vote_sender: &ReplayVoteSender,
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
@ -649,7 +669,7 @@ impl BankingStage {
|
|||||||
}
|
}
|
||||||
BufferedPacketsDecision::Forward => {
|
BufferedPacketsDecision::Forward => {
|
||||||
Self::handle_forwarding(
|
Self::handle_forwarding(
|
||||||
enable_forwarding,
|
forward_option,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
buffered_packets,
|
buffered_packets,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
@ -660,7 +680,7 @@ impl BankingStage {
|
|||||||
}
|
}
|
||||||
BufferedPacketsDecision::ForwardAndHold => {
|
BufferedPacketsDecision::ForwardAndHold => {
|
||||||
Self::handle_forwarding(
|
Self::handle_forwarding(
|
||||||
enable_forwarding,
|
forward_option,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
buffered_packets,
|
buffered_packets,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
@ -675,7 +695,7 @@ impl BankingStage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn handle_forwarding(
|
fn handle_forwarding(
|
||||||
enable_forwarding: bool,
|
forward_option: &ForwardOption,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
buffered_packets: &mut UnprocessedPackets,
|
buffered_packets: &mut UnprocessedPackets,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
@ -683,14 +703,19 @@ impl BankingStage {
|
|||||||
hold: bool,
|
hold: bool,
|
||||||
data_budget: &DataBudget,
|
data_budget: &DataBudget,
|
||||||
) {
|
) {
|
||||||
if !enable_forwarding {
|
let addr = match forward_option {
|
||||||
|
ForwardOption::NotForward => {
|
||||||
if !hold {
|
if !hold {
|
||||||
buffered_packets.clear();
|
buffered_packets.clear();
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
ForwardOption::ForwardTransaction => {
|
||||||
let addr = match next_leader_tpu_forwards(cluster_info, poh_recorder) {
|
next_leader_tpu_forwards(cluster_info, poh_recorder)
|
||||||
|
}
|
||||||
|
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
|
||||||
|
};
|
||||||
|
let addr = match addr {
|
||||||
Some(addr) => addr,
|
Some(addr) => addr,
|
||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
@ -711,7 +736,7 @@ impl BankingStage {
|
|||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
recv_start: &mut Instant,
|
recv_start: &mut Instant,
|
||||||
enable_forwarding: bool,
|
forward_option: ForwardOption,
|
||||||
id: u32,
|
id: u32,
|
||||||
batch_limit: usize,
|
batch_limit: usize,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
@ -734,7 +759,7 @@ impl BankingStage {
|
|||||||
poh_recorder,
|
poh_recorder,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
&mut buffered_packets,
|
&mut buffered_packets,
|
||||||
enable_forwarding,
|
&forward_option,
|
||||||
transaction_status_sender.clone(),
|
transaction_status_sender.clone(),
|
||||||
&gossip_vote_sender,
|
&gossip_vote_sender,
|
||||||
&banking_stage_stats,
|
&banking_stage_stats,
|
||||||
@ -788,13 +813,11 @@ impl BankingStage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_threads() -> u32 {
|
pub fn num_threads() -> u32 {
|
||||||
const MIN_THREADS_VOTES: u32 = 1;
|
|
||||||
const MIN_THREADS_BANKING: u32 = 1;
|
|
||||||
cmp::max(
|
cmp::max(
|
||||||
env::var("SOLANA_BANKING_THREADS")
|
env::var("SOLANA_BANKING_THREADS")
|
||||||
.map(|x| x.parse().unwrap_or(NUM_THREADS))
|
.map(|x| x.parse().unwrap_or(NUM_THREADS))
|
||||||
.unwrap_or(NUM_THREADS),
|
.unwrap_or(NUM_THREADS),
|
||||||
MIN_THREADS_VOTES + MIN_THREADS_BANKING,
|
NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1126,6 +1149,10 @@ impl BankingStage {
|
|||||||
.iter()
|
.iter()
|
||||||
.filter_map(|tx_index| {
|
.filter_map(|tx_index| {
|
||||||
let p = &msgs.packets[*tx_index];
|
let p = &msgs.packets[*tx_index];
|
||||||
|
if votes_only && !p.meta.is_simple_vote_tx {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
|
let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
|
||||||
let message_bytes = Self::packet_message(p)?;
|
let message_bytes = Self::packet_message(p)?;
|
||||||
let message_hash = Message::hash_raw_message(message_bytes);
|
let message_hash = Message::hash_raw_message(message_bytes);
|
||||||
@ -1133,9 +1160,6 @@ impl BankingStage {
|
|||||||
Err(TransactionError::UnsupportedVersion)
|
Err(TransactionError::UnsupportedVersion)
|
||||||
})
|
})
|
||||||
.ok()?;
|
.ok()?;
|
||||||
if votes_only && !solana_runtime::bank::is_simple_vote_transaction(&tx) {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
tx.verify_precompiles(feature_set).ok()?;
|
tx.verify_precompiles(feature_set).ok()?;
|
||||||
Some((tx, *tx_index))
|
Some((tx, *tx_index))
|
||||||
})
|
})
|
||||||
@ -1627,27 +1651,37 @@ pub(crate) fn next_leader_tpu(
|
|||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
poh_recorder: &Mutex<PohRecorder>,
|
poh_recorder: &Mutex<PohRecorder>,
|
||||||
) -> Option<std::net::SocketAddr> {
|
) -> Option<std::net::SocketAddr> {
|
||||||
if let Some(leader_pubkey) = poh_recorder
|
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu)
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
|
|
||||||
{
|
|
||||||
cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_leader_tpu_forwards(
|
fn next_leader_tpu_forwards(
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Mutex<PohRecorder>,
|
||||||
) -> Option<std::net::SocketAddr> {
|
) -> Option<std::net::SocketAddr> {
|
||||||
|
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn next_leader_tpu_vote(
|
||||||
|
cluster_info: &ClusterInfo,
|
||||||
|
poh_recorder: &Mutex<PohRecorder>,
|
||||||
|
) -> Option<std::net::SocketAddr> {
|
||||||
|
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn next_leader_x<F>(
|
||||||
|
cluster_info: &ClusterInfo,
|
||||||
|
poh_recorder: &Mutex<PohRecorder>,
|
||||||
|
port_selector: F,
|
||||||
|
) -> Option<std::net::SocketAddr>
|
||||||
|
where
|
||||||
|
F: FnOnce(&ContactInfo) -> SocketAddr,
|
||||||
|
{
|
||||||
if let Some(leader_pubkey) = poh_recorder
|
if let Some(leader_pubkey) = poh_recorder
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
|
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
|
||||||
{
|
{
|
||||||
cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
|
cluster_info.lookup_contact_info(&leader_pubkey, port_selector)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@ -1684,6 +1718,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
use solana_streamer::socket::SocketAddrSpace;
|
use solana_streamer::socket::SocketAddrSpace;
|
||||||
use solana_transaction_status::TransactionWithStatusMeta;
|
use solana_transaction_status::TransactionWithStatusMeta;
|
||||||
|
use solana_vote_program::vote_transaction;
|
||||||
use std::{
|
use std::{
|
||||||
convert::{TryFrom, TryInto},
|
convert::{TryFrom, TryInto},
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
@ -1708,8 +1743,8 @@ mod tests {
|
|||||||
let genesis_config = create_genesis_config(2).genesis_config;
|
let genesis_config = create_genesis_config(2).genesis_config;
|
||||||
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
||||||
let (verified_sender, verified_receiver) = unbounded();
|
let (verified_sender, verified_receiver) = unbounded();
|
||||||
let (vote_sender, vote_receiver) = unbounded();
|
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded();
|
||||||
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
|
||||||
let ledger_path = get_tmp_ledger_path!();
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
{
|
{
|
||||||
let blockstore = Arc::new(
|
let blockstore = Arc::new(
|
||||||
@ -1720,11 +1755,14 @@ mod tests {
|
|||||||
create_test_recorder(&bank, &blockstore, None);
|
create_test_recorder(&bank, &blockstore, None);
|
||||||
let cluster_info = new_test_cluster_info(Node::new_localhost().info);
|
let cluster_info = new_test_cluster_info(Node::new_localhost().info);
|
||||||
let cluster_info = Arc::new(cluster_info);
|
let cluster_info = Arc::new(cluster_info);
|
||||||
|
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
||||||
|
|
||||||
let banking_stage = BankingStage::new(
|
let banking_stage = BankingStage::new(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
vote_receiver,
|
tpu_vote_receiver,
|
||||||
|
gossip_verified_vote_receiver,
|
||||||
None,
|
None,
|
||||||
gossip_vote_sender,
|
gossip_vote_sender,
|
||||||
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
@ -1732,7 +1770,8 @@ mod tests {
|
|||||||
))))),
|
))))),
|
||||||
);
|
);
|
||||||
drop(verified_sender);
|
drop(verified_sender);
|
||||||
drop(vote_sender);
|
drop(gossip_verified_vote_sender);
|
||||||
|
drop(tpu_vote_sender);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
banking_stage.join().unwrap();
|
banking_stage.join().unwrap();
|
||||||
poh_service.join().unwrap();
|
poh_service.join().unwrap();
|
||||||
@ -1751,7 +1790,7 @@ mod tests {
|
|||||||
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
||||||
let start_hash = bank.last_blockhash();
|
let start_hash = bank.last_blockhash();
|
||||||
let (verified_sender, verified_receiver) = unbounded();
|
let (verified_sender, verified_receiver) = unbounded();
|
||||||
let (vote_sender, vote_receiver) = unbounded();
|
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
|
||||||
let ledger_path = get_tmp_ledger_path!();
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
{
|
{
|
||||||
let blockstore = Arc::new(
|
let blockstore = Arc::new(
|
||||||
@ -1766,13 +1805,15 @@ mod tests {
|
|||||||
create_test_recorder(&bank, &blockstore, Some(poh_config));
|
create_test_recorder(&bank, &blockstore, Some(poh_config));
|
||||||
let cluster_info = new_test_cluster_info(Node::new_localhost().info);
|
let cluster_info = new_test_cluster_info(Node::new_localhost().info);
|
||||||
let cluster_info = Arc::new(cluster_info);
|
let cluster_info = Arc::new(cluster_info);
|
||||||
|
let (verified_gossip_vote_sender, verified_gossip_vote_receiver) = unbounded();
|
||||||
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
||||||
|
|
||||||
let banking_stage = BankingStage::new(
|
let banking_stage = BankingStage::new(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
vote_receiver,
|
tpu_vote_receiver,
|
||||||
|
verified_gossip_vote_receiver,
|
||||||
None,
|
None,
|
||||||
gossip_vote_sender,
|
gossip_vote_sender,
|
||||||
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
@ -1781,7 +1822,8 @@ mod tests {
|
|||||||
);
|
);
|
||||||
trace!("sending bank");
|
trace!("sending bank");
|
||||||
drop(verified_sender);
|
drop(verified_sender);
|
||||||
drop(vote_sender);
|
drop(verified_gossip_vote_sender);
|
||||||
|
drop(tpu_vote_sender);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
poh_service.join().unwrap();
|
poh_service.join().unwrap();
|
||||||
drop(poh_recorder);
|
drop(poh_recorder);
|
||||||
@ -1821,7 +1863,8 @@ mod tests {
|
|||||||
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
||||||
let start_hash = bank.last_blockhash();
|
let start_hash = bank.last_blockhash();
|
||||||
let (verified_sender, verified_receiver) = unbounded();
|
let (verified_sender, verified_receiver) = unbounded();
|
||||||
let (vote_sender, vote_receiver) = unbounded();
|
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
|
||||||
|
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded();
|
||||||
let ledger_path = get_tmp_ledger_path!();
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
{
|
{
|
||||||
let blockstore = Arc::new(
|
let blockstore = Arc::new(
|
||||||
@ -1844,7 +1887,8 @@ mod tests {
|
|||||||
&cluster_info,
|
&cluster_info,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
vote_receiver,
|
tpu_vote_receiver,
|
||||||
|
gossip_verified_vote_receiver,
|
||||||
None,
|
None,
|
||||||
gossip_vote_sender,
|
gossip_vote_sender,
|
||||||
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
@ -1887,7 +1931,8 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
drop(verified_sender);
|
drop(verified_sender);
|
||||||
drop(vote_sender);
|
drop(tpu_vote_sender);
|
||||||
|
drop(gossip_verified_vote_sender);
|
||||||
// wait until banking_stage to finish up all packets
|
// wait until banking_stage to finish up all packets
|
||||||
banking_stage.join().unwrap();
|
banking_stage.join().unwrap();
|
||||||
|
|
||||||
@ -1968,6 +2013,7 @@ mod tests {
|
|||||||
verified_sender.send(packets).unwrap();
|
verified_sender.send(packets).unwrap();
|
||||||
|
|
||||||
let (vote_sender, vote_receiver) = unbounded();
|
let (vote_sender, vote_receiver) = unbounded();
|
||||||
|
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
|
||||||
let ledger_path = get_tmp_ledger_path!();
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
{
|
{
|
||||||
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
||||||
@ -1993,8 +2039,9 @@ mod tests {
|
|||||||
&cluster_info,
|
&cluster_info,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
|
tpu_vote_receiver,
|
||||||
vote_receiver,
|
vote_receiver,
|
||||||
2,
|
3,
|
||||||
None,
|
None,
|
||||||
gossip_vote_sender,
|
gossip_vote_sender,
|
||||||
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
@ -2012,6 +2059,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
drop(verified_sender);
|
drop(verified_sender);
|
||||||
drop(vote_sender);
|
drop(vote_sender);
|
||||||
|
drop(tpu_vote_sender);
|
||||||
|
|
||||||
// consume the entire entry_receiver, feed it into a new bank
|
// consume the entire entry_receiver, feed it into a new bank
|
||||||
// check that the balance is what we expect.
|
// check that the balance is what we expect.
|
||||||
@ -2968,7 +3016,7 @@ mod tests {
|
|||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let data_budget = DataBudget::default();
|
let data_budget = DataBudget::default();
|
||||||
BankingStage::handle_forwarding(
|
BankingStage::handle_forwarding(
|
||||||
true,
|
&ForwardOption::ForwardTransaction,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut unprocessed_packets,
|
&mut unprocessed_packets,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
@ -3097,59 +3145,166 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
fn make_test_packets(
|
||||||
|
transactions: Vec<Transaction>,
|
||||||
|
vote_indexes: Vec<usize>,
|
||||||
|
) -> (Packets, Vec<usize>) {
|
||||||
|
let capacity = transactions.len();
|
||||||
|
let mut packets = Packets::with_capacity(capacity);
|
||||||
|
let mut packet_indexes = Vec::with_capacity(capacity);
|
||||||
|
packets.packets.resize(capacity, Packet::default());
|
||||||
|
for (index, tx) in transactions.iter().enumerate() {
|
||||||
|
Packet::populate_packet(&mut packets.packets[index], None, tx).ok();
|
||||||
|
packet_indexes.push(index);
|
||||||
|
}
|
||||||
|
for index in vote_indexes.iter() {
|
||||||
|
packets.packets[*index].meta.is_simple_vote_tx = true;
|
||||||
|
}
|
||||||
|
(packets, packet_indexes)
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_transactions_from_packets() {
|
fn test_transactions_from_packets() {
|
||||||
use solana_sdk::feature_set::FeatureSet;
|
use solana_sdk::feature_set::FeatureSet;
|
||||||
use solana_vote_program::vote_state::Vote;
|
let keypair = Keypair::new();
|
||||||
solana_logger::setup();
|
let transfer_tx =
|
||||||
let mut vote_packet = Packet::default();
|
system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default());
|
||||||
let vote_instruction = solana_vote_program::vote_instruction::vote(
|
let vote_tx = vote_transaction::new_vote_transaction(
|
||||||
&Pubkey::new_unique(),
|
vec![42],
|
||||||
&Pubkey::new_unique(),
|
|
||||||
Vote::default(),
|
|
||||||
);
|
|
||||||
let vote_transaction =
|
|
||||||
Transaction::new_with_payer(&[vote_instruction], Some(&Pubkey::new_unique()));
|
|
||||||
Packet::populate_packet(&mut vote_packet, None, &vote_transaction).unwrap();
|
|
||||||
let mut non_vote = Packet::default();
|
|
||||||
let tx = system_transaction::transfer(
|
|
||||||
&Keypair::new(),
|
|
||||||
&Pubkey::new_unique(),
|
|
||||||
2,
|
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Hash::default(),
|
||||||
|
&keypair,
|
||||||
|
&keypair,
|
||||||
|
&keypair,
|
||||||
|
None,
|
||||||
);
|
);
|
||||||
Packet::populate_packet(&mut non_vote, None, &tx).unwrap();
|
|
||||||
let msgs = Packets::new(vec![non_vote, vote_packet]);
|
|
||||||
let packet_indexes = [0, 1];
|
|
||||||
let feature_set = Arc::new(FeatureSet::default());
|
|
||||||
let cost_model = Arc::new(RwLock::new(CostModel::default()));
|
|
||||||
let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model)));
|
|
||||||
let banking_stage_stats = BankingStageStats::default();
|
|
||||||
let (transactions, _transaction_to_packet_indexes, _retryable_packet_indexes) =
|
|
||||||
BankingStage::transactions_from_packets(
|
|
||||||
&msgs,
|
|
||||||
&packet_indexes,
|
|
||||||
&feature_set,
|
|
||||||
&cost_tracker,
|
|
||||||
&banking_stage_stats,
|
|
||||||
false,
|
|
||||||
true,
|
|
||||||
&mut CostTrackerStats::default(),
|
|
||||||
);
|
|
||||||
assert_eq!(transactions.len(), 1);
|
|
||||||
assert!(!transactions[0].signatures().is_empty());
|
|
||||||
|
|
||||||
let (transactions, _transaction_to_packet_indexes, _retryable_packet_indexes) =
|
// packets with no votes
|
||||||
|
{
|
||||||
|
let vote_indexes = vec![];
|
||||||
|
let (packets, packet_indexes) =
|
||||||
|
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
|
||||||
|
|
||||||
|
let mut votes_only = false;
|
||||||
|
let (txs, tx_packet_index, _retryable_packet_indexes) =
|
||||||
BankingStage::transactions_from_packets(
|
BankingStage::transactions_from_packets(
|
||||||
&msgs,
|
&packets,
|
||||||
&packet_indexes,
|
&packet_indexes,
|
||||||
&feature_set,
|
&Arc::new(FeatureSet::default()),
|
||||||
&cost_tracker,
|
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
&banking_stage_stats,
|
CostModel::default(),
|
||||||
false,
|
))))),
|
||||||
|
&BankingStageStats::default(),
|
||||||
false,
|
false,
|
||||||
|
votes_only,
|
||||||
&mut CostTrackerStats::default(),
|
&mut CostTrackerStats::default(),
|
||||||
);
|
);
|
||||||
assert_eq!(transactions.len(), 2);
|
assert_eq!(2, txs.len());
|
||||||
|
assert_eq!(vec![0, 1], tx_packet_index);
|
||||||
|
|
||||||
|
votes_only = true;
|
||||||
|
let (txs, tx_packet_index, _retryable_packet_indexes) =
|
||||||
|
BankingStage::transactions_from_packets(
|
||||||
|
&packets,
|
||||||
|
&packet_indexes,
|
||||||
|
&Arc::new(FeatureSet::default()),
|
||||||
|
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
|
CostModel::default(),
|
||||||
|
))))),
|
||||||
|
&BankingStageStats::default(),
|
||||||
|
false,
|
||||||
|
votes_only,
|
||||||
|
&mut CostTrackerStats::default(),
|
||||||
|
);
|
||||||
|
assert_eq!(0, txs.len());
|
||||||
|
assert_eq!(0, tx_packet_index.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
// packets with some votes
|
||||||
|
{
|
||||||
|
let vote_indexes = vec![0, 2];
|
||||||
|
let (packets, packet_indexes) = make_test_packets(
|
||||||
|
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
|
||||||
|
vote_indexes,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut votes_only = false;
|
||||||
|
let (txs, tx_packet_index, _retryable_packet_indexes) =
|
||||||
|
BankingStage::transactions_from_packets(
|
||||||
|
&packets,
|
||||||
|
&packet_indexes,
|
||||||
|
&Arc::new(FeatureSet::default()),
|
||||||
|
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
|
CostModel::default(),
|
||||||
|
))))),
|
||||||
|
&BankingStageStats::default(),
|
||||||
|
false,
|
||||||
|
votes_only,
|
||||||
|
&mut CostTrackerStats::default(),
|
||||||
|
);
|
||||||
|
assert_eq!(3, txs.len());
|
||||||
|
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
||||||
|
|
||||||
|
votes_only = true;
|
||||||
|
let (txs, tx_packet_index, _retryable_packet_indexes) =
|
||||||
|
BankingStage::transactions_from_packets(
|
||||||
|
&packets,
|
||||||
|
&packet_indexes,
|
||||||
|
&Arc::new(FeatureSet::default()),
|
||||||
|
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
|
CostModel::default(),
|
||||||
|
))))),
|
||||||
|
&BankingStageStats::default(),
|
||||||
|
false,
|
||||||
|
votes_only,
|
||||||
|
&mut CostTrackerStats::default(),
|
||||||
|
);
|
||||||
|
assert_eq!(2, txs.len());
|
||||||
|
assert_eq!(vec![0, 2], tx_packet_index);
|
||||||
|
}
|
||||||
|
|
||||||
|
// packets with all votes
|
||||||
|
{
|
||||||
|
let vote_indexes = vec![0, 1, 2];
|
||||||
|
let (packets, packet_indexes) = make_test_packets(
|
||||||
|
vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
|
||||||
|
vote_indexes,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut votes_only = false;
|
||||||
|
let (txs, tx_packet_index, _retryable_packet_indexes) =
|
||||||
|
BankingStage::transactions_from_packets(
|
||||||
|
&packets,
|
||||||
|
&packet_indexes,
|
||||||
|
&Arc::new(FeatureSet::default()),
|
||||||
|
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
|
CostModel::default(),
|
||||||
|
))))),
|
||||||
|
&BankingStageStats::default(),
|
||||||
|
false,
|
||||||
|
votes_only,
|
||||||
|
&mut CostTrackerStats::default(),
|
||||||
|
);
|
||||||
|
assert_eq!(3, txs.len());
|
||||||
|
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
||||||
|
|
||||||
|
votes_only = true;
|
||||||
|
let (txs, tx_packet_index, _retryable_packet_indexes) =
|
||||||
|
BankingStage::transactions_from_packets(
|
||||||
|
&packets,
|
||||||
|
&packet_indexes,
|
||||||
|
&Arc::new(FeatureSet::default()),
|
||||||
|
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
|
||||||
|
CostModel::default(),
|
||||||
|
))))),
|
||||||
|
&BankingStageStats::default(),
|
||||||
|
false,
|
||||||
|
votes_only,
|
||||||
|
&mut CostTrackerStats::default(),
|
||||||
|
);
|
||||||
|
assert_eq!(3, txs.len());
|
||||||
|
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -350,7 +350,10 @@ impl ClusterInfoVoteListener {
|
|||||||
labels: Vec<CrdsValueLabel>,
|
labels: Vec<CrdsValueLabel>,
|
||||||
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Slot, Packets)>) {
|
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Slot, Packets)>) {
|
||||||
let mut msgs = packet::to_packets_chunked(&votes, 1);
|
let mut msgs = packet::to_packets_chunked(&votes, 1);
|
||||||
sigverify::ed25519_verify_cpu(&mut msgs);
|
|
||||||
|
// Votes should already be filtered by this point.
|
||||||
|
let reject_non_vote = false;
|
||||||
|
sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote);
|
||||||
|
|
||||||
let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,)
|
let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,)
|
||||||
.filter_map(|(label, vote, packet)| {
|
.filter_map(|(label, vote, packet)| {
|
||||||
|
@ -23,38 +23,49 @@ impl FetchStage {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
sockets: Vec<UdpSocket>,
|
sockets: Vec<UdpSocket>,
|
||||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
tpu_forwards_sockets: Vec<UdpSocket>,
|
||||||
|
tpu_vote_sockets: Vec<UdpSocket>,
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
coalesce_ms: u64,
|
coalesce_ms: u64,
|
||||||
) -> (Self, PacketReceiver) {
|
) -> (Self, PacketReceiver, PacketReceiver) {
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
|
let (vote_sender, vote_receiver) = channel();
|
||||||
(
|
(
|
||||||
Self::new_with_sender(
|
Self::new_with_sender(
|
||||||
sockets,
|
sockets,
|
||||||
tpu_forwards_sockets,
|
tpu_forwards_sockets,
|
||||||
|
tpu_vote_sockets,
|
||||||
exit,
|
exit,
|
||||||
&sender,
|
&sender,
|
||||||
|
&vote_sender,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
coalesce_ms,
|
coalesce_ms,
|
||||||
),
|
),
|
||||||
receiver,
|
receiver,
|
||||||
|
vote_receiver,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_with_sender(
|
pub fn new_with_sender(
|
||||||
sockets: Vec<UdpSocket>,
|
sockets: Vec<UdpSocket>,
|
||||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
tpu_forwards_sockets: Vec<UdpSocket>,
|
||||||
|
tpu_vote_sockets: Vec<UdpSocket>,
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
sender: &PacketSender,
|
sender: &PacketSender,
|
||||||
|
vote_sender: &PacketSender,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
coalesce_ms: u64,
|
coalesce_ms: u64,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
|
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
|
||||||
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
|
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
|
||||||
|
let tpu_vote_sockets = tpu_vote_sockets.into_iter().map(Arc::new).collect();
|
||||||
Self::new_multi_socket(
|
Self::new_multi_socket(
|
||||||
tx_sockets,
|
tx_sockets,
|
||||||
tpu_forwards_sockets,
|
tpu_forwards_sockets,
|
||||||
|
tpu_vote_sockets,
|
||||||
exit,
|
exit,
|
||||||
sender,
|
sender,
|
||||||
|
vote_sender,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
coalesce_ms,
|
coalesce_ms,
|
||||||
)
|
)
|
||||||
@ -98,8 +109,10 @@ impl FetchStage {
|
|||||||
fn new_multi_socket(
|
fn new_multi_socket(
|
||||||
sockets: Vec<Arc<UdpSocket>>,
|
sockets: Vec<Arc<UdpSocket>>,
|
||||||
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
|
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
|
||||||
|
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
sender: &PacketSender,
|
sender: &PacketSender,
|
||||||
|
vote_sender: &PacketSender,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
coalesce_ms: u64,
|
coalesce_ms: u64,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
@ -130,6 +143,18 @@ impl FetchStage {
|
|||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let tpu_vote_threads = tpu_vote_sockets.into_iter().map(|socket| {
|
||||||
|
streamer::receiver(
|
||||||
|
socket,
|
||||||
|
exit,
|
||||||
|
vote_sender.clone(),
|
||||||
|
recycler.clone(),
|
||||||
|
"fetch_vote_stage",
|
||||||
|
coalesce_ms,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
let sender = sender.clone();
|
let sender = sender.clone();
|
||||||
let poh_recorder = poh_recorder.clone();
|
let poh_recorder = poh_recorder.clone();
|
||||||
|
|
||||||
@ -150,7 +175,10 @@ impl FetchStage {
|
|||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_forwards_threads).collect();
|
let mut thread_hdls: Vec<_> = tpu_threads
|
||||||
|
.chain(tpu_forwards_threads)
|
||||||
|
.chain(tpu_vote_threads)
|
||||||
|
.collect();
|
||||||
thread_hdls.push(fwd_thread_hdl);
|
thread_hdls.push(fwd_thread_hdl);
|
||||||
Self { thread_hdls }
|
Self { thread_hdls }
|
||||||
}
|
}
|
||||||
|
@ -5621,6 +5621,7 @@ pub mod tests {
|
|||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
&tower_storage,
|
&tower_storage,
|
||||||
vote_info,
|
vote_info,
|
||||||
|
false,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut cursor = Cursor::default();
|
let mut cursor = Cursor::default();
|
||||||
@ -5684,6 +5685,7 @@ pub mod tests {
|
|||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
&tower_storage,
|
&tower_storage,
|
||||||
vote_info,
|
vote_info,
|
||||||
|
false,
|
||||||
);
|
);
|
||||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||||
assert_eq!(votes.len(), 1);
|
assert_eq!(votes.len(), 1);
|
||||||
@ -5751,6 +5753,7 @@ pub mod tests {
|
|||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
&tower_storage,
|
&tower_storage,
|
||||||
vote_info,
|
vote_info,
|
||||||
|
false,
|
||||||
);
|
);
|
||||||
|
|
||||||
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
|
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
|
||||||
|
@ -855,7 +855,7 @@ mod tests {
|
|||||||
repair: socketaddr!("127.0.0.1:1237"),
|
repair: socketaddr!("127.0.0.1:1237"),
|
||||||
tpu: socketaddr!("127.0.0.1:1238"),
|
tpu: socketaddr!("127.0.0.1:1238"),
|
||||||
tpu_forwards: socketaddr!("127.0.0.1:1239"),
|
tpu_forwards: socketaddr!("127.0.0.1:1239"),
|
||||||
unused: socketaddr!("127.0.0.1:1240"),
|
tpu_vote: socketaddr!("127.0.0.1:1240"),
|
||||||
rpc: socketaddr!("127.0.0.1:1241"),
|
rpc: socketaddr!("127.0.0.1:1241"),
|
||||||
rpc_pubsub: socketaddr!("127.0.0.1:1242"),
|
rpc_pubsub: socketaddr!("127.0.0.1:1242"),
|
||||||
serve_repair: socketaddr!("127.0.0.1:1243"),
|
serve_repair: socketaddr!("127.0.0.1:1243"),
|
||||||
@ -942,7 +942,7 @@ mod tests {
|
|||||||
repair: socketaddr!([127, 0, 0, 1], 1237),
|
repair: socketaddr!([127, 0, 0, 1], 1237),
|
||||||
tpu: socketaddr!([127, 0, 0, 1], 1238),
|
tpu: socketaddr!([127, 0, 0, 1], 1238),
|
||||||
tpu_forwards: socketaddr!([127, 0, 0, 1], 1239),
|
tpu_forwards: socketaddr!([127, 0, 0, 1], 1239),
|
||||||
unused: socketaddr!([127, 0, 0, 1], 1240),
|
tpu_vote: socketaddr!([127, 0, 0, 1], 1240),
|
||||||
rpc: socketaddr!([127, 0, 0, 1], 1241),
|
rpc: socketaddr!([127, 0, 0, 1], 1241),
|
||||||
rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242),
|
rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242),
|
||||||
serve_repair: serve_repair_addr,
|
serve_repair: serve_repair_addr,
|
||||||
@ -972,7 +972,7 @@ mod tests {
|
|||||||
repair: socketaddr!([127, 0, 0, 1], 1237),
|
repair: socketaddr!([127, 0, 0, 1], 1237),
|
||||||
tpu: socketaddr!([127, 0, 0, 1], 1238),
|
tpu: socketaddr!([127, 0, 0, 1], 1238),
|
||||||
tpu_forwards: socketaddr!([127, 0, 0, 1], 1239),
|
tpu_forwards: socketaddr!([127, 0, 0, 1], 1239),
|
||||||
unused: socketaddr!([127, 0, 0, 1], 1240),
|
tpu_vote: socketaddr!([127, 0, 0, 1], 1240),
|
||||||
rpc: socketaddr!([127, 0, 0, 1], 1241),
|
rpc: socketaddr!([127, 0, 0, 1], 1241),
|
||||||
rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242),
|
rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242),
|
||||||
serve_repair: serve_repair_addr2,
|
serve_repair: serve_repair_addr2,
|
||||||
|
@ -17,6 +17,16 @@ pub use solana_perf::sigverify::{
|
|||||||
pub struct TransactionSigVerifier {
|
pub struct TransactionSigVerifier {
|
||||||
recycler: Recycler<TxOffset>,
|
recycler: Recycler<TxOffset>,
|
||||||
recycler_out: Recycler<PinnedVec<u8>>,
|
recycler_out: Recycler<PinnedVec<u8>>,
|
||||||
|
reject_non_vote: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TransactionSigVerifier {
|
||||||
|
pub fn new_reject_non_vote() -> Self {
|
||||||
|
TransactionSigVerifier {
|
||||||
|
reject_non_vote: true,
|
||||||
|
..TransactionSigVerifier::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for TransactionSigVerifier {
|
impl Default for TransactionSigVerifier {
|
||||||
@ -25,13 +35,19 @@ impl Default for TransactionSigVerifier {
|
|||||||
Self {
|
Self {
|
||||||
recycler: Recycler::warmed(50, 4096),
|
recycler: Recycler::warmed(50, 4096),
|
||||||
recycler_out: Recycler::warmed(50, 4096),
|
recycler_out: Recycler::warmed(50, 4096),
|
||||||
|
reject_non_vote: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SigVerifier for TransactionSigVerifier {
|
impl SigVerifier for TransactionSigVerifier {
|
||||||
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
|
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
|
||||||
sigverify::ed25519_verify(&mut batch, &self.recycler, &self.recycler_out);
|
sigverify::ed25519_verify(
|
||||||
|
&mut batch,
|
||||||
|
&self.recycler,
|
||||||
|
&self.recycler_out,
|
||||||
|
self.reject_non_vote,
|
||||||
|
);
|
||||||
batch
|
batch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,6 +41,7 @@ pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
|
|||||||
pub struct Tpu {
|
pub struct Tpu {
|
||||||
fetch_stage: FetchStage,
|
fetch_stage: FetchStage,
|
||||||
sigverify_stage: SigVerifyStage,
|
sigverify_stage: SigVerifyStage,
|
||||||
|
vote_sigverify_stage: SigVerifyStage,
|
||||||
banking_stage: BankingStage,
|
banking_stage: BankingStage,
|
||||||
cluster_info_vote_listener: ClusterInfoVoteListener,
|
cluster_info_vote_listener: ClusterInfoVoteListener,
|
||||||
broadcast_stage: BroadcastStage,
|
broadcast_stage: BroadcastStage,
|
||||||
@ -55,6 +56,7 @@ impl Tpu {
|
|||||||
retransmit_slots_receiver: RetransmitSlotsReceiver,
|
retransmit_slots_receiver: RetransmitSlotsReceiver,
|
||||||
transactions_sockets: Vec<UdpSocket>,
|
transactions_sockets: Vec<UdpSocket>,
|
||||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
tpu_forwards_sockets: Vec<UdpSocket>,
|
||||||
|
tpu_vote_sockets: Vec<UdpSocket>,
|
||||||
broadcast_sockets: Vec<UdpSocket>,
|
broadcast_sockets: Vec<UdpSocket>,
|
||||||
subscriptions: &Arc<RpcSubscriptions>,
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
@ -74,11 +76,14 @@ impl Tpu {
|
|||||||
cost_model: &Arc<RwLock<CostModel>>,
|
cost_model: &Arc<RwLock<CostModel>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (packet_sender, packet_receiver) = channel();
|
let (packet_sender, packet_receiver) = channel();
|
||||||
|
let (vote_packet_sender, vote_packet_receiver) = channel();
|
||||||
let fetch_stage = FetchStage::new_with_sender(
|
let fetch_stage = FetchStage::new_with_sender(
|
||||||
transactions_sockets,
|
transactions_sockets,
|
||||||
tpu_forwards_sockets,
|
tpu_forwards_sockets,
|
||||||
|
tpu_vote_sockets,
|
||||||
exit,
|
exit,
|
||||||
&packet_sender,
|
&packet_sender,
|
||||||
|
&vote_packet_sender,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
tpu_coalesce_ms,
|
tpu_coalesce_ms,
|
||||||
);
|
);
|
||||||
@ -89,11 +94,23 @@ impl Tpu {
|
|||||||
SigVerifyStage::new(packet_receiver, verified_sender, verifier)
|
SigVerifyStage::new(packet_receiver, verified_sender, verifier)
|
||||||
};
|
};
|
||||||
|
|
||||||
let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded();
|
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded();
|
||||||
|
|
||||||
|
let vote_sigverify_stage = {
|
||||||
|
let verifier = TransactionSigVerifier::new_reject_non_vote();
|
||||||
|
SigVerifyStage::new(
|
||||||
|
vote_packet_receiver,
|
||||||
|
verified_tpu_vote_packets_sender,
|
||||||
|
verifier,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) =
|
||||||
|
unbounded();
|
||||||
let cluster_info_vote_listener = ClusterInfoVoteListener::new(
|
let cluster_info_vote_listener = ClusterInfoVoteListener::new(
|
||||||
exit,
|
exit,
|
||||||
cluster_info.clone(),
|
cluster_info.clone(),
|
||||||
verified_vote_packets_sender,
|
verified_gossip_vote_packets_sender,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
vote_tracker,
|
vote_tracker,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
@ -111,7 +128,8 @@ impl Tpu {
|
|||||||
cluster_info,
|
cluster_info,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
verified_vote_packets_receiver,
|
verified_tpu_vote_packets_receiver,
|
||||||
|
verified_gossip_vote_packets_receiver,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
cost_tracker,
|
cost_tracker,
|
||||||
@ -131,6 +149,7 @@ impl Tpu {
|
|||||||
Self {
|
Self {
|
||||||
fetch_stage,
|
fetch_stage,
|
||||||
sigverify_stage,
|
sigverify_stage,
|
||||||
|
vote_sigverify_stage,
|
||||||
banking_stage,
|
banking_stage,
|
||||||
cluster_info_vote_listener,
|
cluster_info_vote_listener,
|
||||||
broadcast_stage,
|
broadcast_stage,
|
||||||
@ -141,6 +160,7 @@ impl Tpu {
|
|||||||
let results = vec![
|
let results = vec![
|
||||||
self.fetch_stage.join(),
|
self.fetch_stage.join(),
|
||||||
self.sigverify_stage.join(),
|
self.sigverify_stage.join(),
|
||||||
|
self.vote_sigverify_stage.join(),
|
||||||
self.cluster_info_vote_listener.join(),
|
self.cluster_info_vote_listener.join(),
|
||||||
self.banking_stage.join(),
|
self.banking_stage.join(),
|
||||||
];
|
];
|
||||||
|
@ -292,6 +292,7 @@ impl Tvu {
|
|||||||
cluster_info.clone(),
|
cluster_info.clone(),
|
||||||
poh_recorder.clone(),
|
poh_recorder.clone(),
|
||||||
tower_storage,
|
tower_storage,
|
||||||
|
bank_forks.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let (cost_update_sender, cost_update_receiver): (
|
let (cost_update_sender, cost_update_receiver): (
|
||||||
|
@ -837,6 +837,7 @@ impl Validator {
|
|||||||
retransmit_slots_receiver,
|
retransmit_slots_receiver,
|
||||||
node.sockets.tpu,
|
node.sockets.tpu,
|
||||||
node.sockets.tpu_forwards,
|
node.sockets.tpu_forwards,
|
||||||
|
node.sockets.tpu_vote,
|
||||||
node.sockets.broadcast,
|
node.sockets.broadcast,
|
||||||
&rpc_subscriptions,
|
&rpc_subscriptions,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
|
@ -2,9 +2,10 @@ use crate::tower_storage::{SavedTower, TowerStorage};
|
|||||||
use solana_gossip::cluster_info::ClusterInfo;
|
use solana_gossip::cluster_info::ClusterInfo;
|
||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_poh::poh_recorder::PohRecorder;
|
use solana_poh::poh_recorder::PohRecorder;
|
||||||
|
use solana_runtime::bank_forks::BankForks;
|
||||||
use solana_sdk::{clock::Slot, transaction::Transaction};
|
use solana_sdk::{clock::Slot, transaction::Transaction};
|
||||||
use std::{
|
use std::{
|
||||||
sync::{mpsc::Receiver, Arc, Mutex},
|
sync::{mpsc::Receiver, Arc, Mutex, RwLock},
|
||||||
thread::{self, Builder, JoinHandle},
|
thread::{self, Builder, JoinHandle},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -39,16 +40,20 @@ impl VotingService {
|
|||||||
cluster_info: Arc<ClusterInfo>,
|
cluster_info: Arc<ClusterInfo>,
|
||||||
poh_recorder: Arc<Mutex<PohRecorder>>,
|
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||||
tower_storage: Arc<dyn TowerStorage>,
|
tower_storage: Arc<dyn TowerStorage>,
|
||||||
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let thread_hdl = Builder::new()
|
let thread_hdl = Builder::new()
|
||||||
.name("sol-vote-service".to_string())
|
.name("sol-vote-service".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
for vote_op in vote_receiver.iter() {
|
for vote_op in vote_receiver.iter() {
|
||||||
|
let rooted_bank = bank_forks.read().unwrap().root_bank().clone();
|
||||||
|
let send_to_tpu_vote_port = rooted_bank.send_to_tpu_vote_port_enabled();
|
||||||
Self::handle_vote(
|
Self::handle_vote(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
tower_storage.as_ref(),
|
tower_storage.as_ref(),
|
||||||
vote_op,
|
vote_op,
|
||||||
|
send_to_tpu_vote_port,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -61,6 +66,7 @@ impl VotingService {
|
|||||||
poh_recorder: &Mutex<PohRecorder>,
|
poh_recorder: &Mutex<PohRecorder>,
|
||||||
tower_storage: &dyn TowerStorage,
|
tower_storage: &dyn TowerStorage,
|
||||||
vote_op: VoteOp,
|
vote_op: VoteOp,
|
||||||
|
send_to_tpu_vote_port: bool,
|
||||||
) {
|
) {
|
||||||
if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
|
if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
|
||||||
let mut measure = Measure::start("tower_save-ms");
|
let mut measure = Measure::start("tower_save-ms");
|
||||||
@ -72,10 +78,12 @@ impl VotingService {
|
|||||||
inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize);
|
inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = cluster_info.send_transaction(
|
let target_address = if send_to_tpu_vote_port {
|
||||||
vote_op.tx(),
|
crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder)
|
||||||
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
|
} else {
|
||||||
);
|
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder)
|
||||||
|
};
|
||||||
|
let _ = cluster_info.send_transaction(vote_op.tx(), target_address);
|
||||||
|
|
||||||
match vote_op {
|
match vote_op {
|
||||||
VoteOp::PushVote {
|
VoteOp::PushVote {
|
||||||
|
@ -254,7 +254,7 @@ pub fn make_accounts_hashes_message(
|
|||||||
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
|
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
|
||||||
|
|
||||||
// TODO These messages should go through the gpu pipeline for spam filtering
|
// TODO These messages should go through the gpu pipeline for spam filtering
|
||||||
#[frozen_abi(digest = "skH6cJ1MHfUyJKVaTfRRDV9y29HjuRpfReFpxXMWNky")]
|
#[frozen_abi(digest = "4VqzaZbxQkeTgo916HVoLtaWoM8bbGaQZy6Qgw7K9kLf")]
|
||||||
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
|
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
|
||||||
#[allow(clippy::large_enum_variant)]
|
#[allow(clippy::large_enum_variant)]
|
||||||
pub(crate) enum Protocol {
|
pub(crate) enum Protocol {
|
||||||
@ -765,7 +765,7 @@ impl ClusterInfo {
|
|||||||
};
|
};
|
||||||
let ip_addr = node.gossip.ip();
|
let ip_addr = node.gossip.ip();
|
||||||
Some(format!(
|
Some(format!(
|
||||||
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
|
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
|
||||||
if ContactInfo::is_valid_address(&node.gossip, &self.socket_addr_space) {
|
if ContactInfo::is_valid_address(&node.gossip, &self.socket_addr_space) {
|
||||||
ip_addr.to_string()
|
ip_addr.to_string()
|
||||||
} else {
|
} else {
|
||||||
@ -780,6 +780,7 @@ impl ClusterInfo {
|
|||||||
"-".to_string()
|
"-".to_string()
|
||||||
},
|
},
|
||||||
addr_to_string(&ip_addr, &node.gossip),
|
addr_to_string(&ip_addr, &node.gossip),
|
||||||
|
addr_to_string(&ip_addr, &node.tpu_vote),
|
||||||
addr_to_string(&ip_addr, &node.tpu),
|
addr_to_string(&ip_addr, &node.tpu),
|
||||||
addr_to_string(&ip_addr, &node.tpu_forwards),
|
addr_to_string(&ip_addr, &node.tpu_forwards),
|
||||||
addr_to_string(&ip_addr, &node.tvu),
|
addr_to_string(&ip_addr, &node.tvu),
|
||||||
@ -794,9 +795,9 @@ impl ClusterInfo {
|
|||||||
|
|
||||||
format!(
|
format!(
|
||||||
"IP Address |Age(ms)| Node identifier \
|
"IP Address |Age(ms)| Node identifier \
|
||||||
| Version |Gossip| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\
|
| Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\
|
||||||
------------------+-------+----------------------------------------------+---------+\
|
------------------+-------+----------------------------------------------+---------+\
|
||||||
------+------+------+------+------+------+------+--------\n\
|
------+------+-------+------+------+------+------+------+--------\n\
|
||||||
{}\
|
{}\
|
||||||
Nodes: {}{}{}",
|
Nodes: {}{}{}",
|
||||||
nodes.join(""),
|
nodes.join(""),
|
||||||
@ -2667,6 +2668,7 @@ pub struct Sockets {
|
|||||||
pub tvu_forwards: Vec<UdpSocket>,
|
pub tvu_forwards: Vec<UdpSocket>,
|
||||||
pub tpu: Vec<UdpSocket>,
|
pub tpu: Vec<UdpSocket>,
|
||||||
pub tpu_forwards: Vec<UdpSocket>,
|
pub tpu_forwards: Vec<UdpSocket>,
|
||||||
|
pub tpu_vote: Vec<UdpSocket>,
|
||||||
pub broadcast: Vec<UdpSocket>,
|
pub broadcast: Vec<UdpSocket>,
|
||||||
pub repair: UdpSocket,
|
pub repair: UdpSocket,
|
||||||
pub retransmit_sockets: Vec<UdpSocket>,
|
pub retransmit_sockets: Vec<UdpSocket>,
|
||||||
@ -2693,6 +2695,7 @@ impl Node {
|
|||||||
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
|
let tpu_vote = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let rpc_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap();
|
let rpc_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap();
|
||||||
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port);
|
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port);
|
||||||
@ -2703,7 +2706,6 @@ impl Node {
|
|||||||
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
|
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
|
||||||
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let unused = UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
||||||
let info = ContactInfo {
|
let info = ContactInfo {
|
||||||
id: *pubkey,
|
id: *pubkey,
|
||||||
gossip: gossip_addr,
|
gossip: gossip_addr,
|
||||||
@ -2712,7 +2714,7 @@ impl Node {
|
|||||||
repair: repair.local_addr().unwrap(),
|
repair: repair.local_addr().unwrap(),
|
||||||
tpu: tpu.local_addr().unwrap(),
|
tpu: tpu.local_addr().unwrap(),
|
||||||
tpu_forwards: tpu_forwards.local_addr().unwrap(),
|
tpu_forwards: tpu_forwards.local_addr().unwrap(),
|
||||||
unused: unused.local_addr().unwrap(),
|
tpu_vote: tpu_vote.local_addr().unwrap(),
|
||||||
rpc: rpc_addr,
|
rpc: rpc_addr,
|
||||||
rpc_pubsub: rpc_pubsub_addr,
|
rpc_pubsub: rpc_pubsub_addr,
|
||||||
serve_repair: serve_repair.local_addr().unwrap(),
|
serve_repair: serve_repair.local_addr().unwrap(),
|
||||||
@ -2728,6 +2730,7 @@ impl Node {
|
|||||||
tvu_forwards: vec![tvu_forwards],
|
tvu_forwards: vec![tvu_forwards],
|
||||||
tpu: vec![tpu],
|
tpu: vec![tpu],
|
||||||
tpu_forwards: vec![tpu_forwards],
|
tpu_forwards: vec![tpu_forwards],
|
||||||
|
tpu_vote: vec![tpu_vote],
|
||||||
broadcast,
|
broadcast,
|
||||||
repair,
|
repair,
|
||||||
retransmit_sockets: vec![retransmit_socket],
|
retransmit_sockets: vec![retransmit_socket],
|
||||||
@ -2768,6 +2771,7 @@ impl Node {
|
|||||||
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
|
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
|
||||||
let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range);
|
let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range);
|
||||||
let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range);
|
let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range);
|
||||||
|
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
|
||||||
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
|
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
|
||||||
let (repair_port, repair) = Self::bind(bind_ip_addr, port_range);
|
let (repair_port, repair) = Self::bind(bind_ip_addr, port_range);
|
||||||
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
|
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
|
||||||
@ -2784,7 +2788,7 @@ impl Node {
|
|||||||
repair: SocketAddr::new(gossip_addr.ip(), repair_port),
|
repair: SocketAddr::new(gossip_addr.ip(), repair_port),
|
||||||
tpu: SocketAddr::new(gossip_addr.ip(), tpu_port),
|
tpu: SocketAddr::new(gossip_addr.ip(), tpu_port),
|
||||||
tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
|
tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
|
||||||
unused: socketaddr_any!(),
|
tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port),
|
||||||
rpc: SocketAddr::new(gossip_addr.ip(), rpc_port),
|
rpc: SocketAddr::new(gossip_addr.ip(), rpc_port),
|
||||||
rpc_pubsub: SocketAddr::new(gossip_addr.ip(), rpc_pubsub_port),
|
rpc_pubsub: SocketAddr::new(gossip_addr.ip(), rpc_pubsub_port),
|
||||||
serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port),
|
serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port),
|
||||||
@ -2802,6 +2806,7 @@ impl Node {
|
|||||||
tvu_forwards: vec![tvu_forwards],
|
tvu_forwards: vec![tvu_forwards],
|
||||||
tpu: vec![tpu],
|
tpu: vec![tpu],
|
||||||
tpu_forwards: vec![tpu_forwards],
|
tpu_forwards: vec![tpu_forwards],
|
||||||
|
tpu_vote: vec![tpu_vote],
|
||||||
broadcast: vec![broadcast],
|
broadcast: vec![broadcast],
|
||||||
repair,
|
repair,
|
||||||
retransmit_sockets: vec![retransmit_socket],
|
retransmit_sockets: vec![retransmit_socket],
|
||||||
@ -2831,6 +2836,9 @@ impl Node {
|
|||||||
let (tpu_forwards_port, tpu_forwards_sockets) =
|
let (tpu_forwards_port, tpu_forwards_sockets) =
|
||||||
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
|
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
|
||||||
|
|
||||||
|
let (tpu_vote_port, tpu_vote_sockets) =
|
||||||
|
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
|
||||||
|
|
||||||
let (_, retransmit_sockets) =
|
let (_, retransmit_sockets) =
|
||||||
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind");
|
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind");
|
||||||
|
|
||||||
@ -2848,7 +2856,7 @@ impl Node {
|
|||||||
repair: SocketAddr::new(gossip_addr.ip(), repair_port),
|
repair: SocketAddr::new(gossip_addr.ip(), repair_port),
|
||||||
tpu: SocketAddr::new(gossip_addr.ip(), tpu_port),
|
tpu: SocketAddr::new(gossip_addr.ip(), tpu_port),
|
||||||
tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
|
tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
|
||||||
unused: socketaddr_any!(),
|
tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port),
|
||||||
rpc: socketaddr_any!(),
|
rpc: socketaddr_any!(),
|
||||||
rpc_pubsub: socketaddr_any!(),
|
rpc_pubsub: socketaddr_any!(),
|
||||||
serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port),
|
serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port),
|
||||||
@ -2865,6 +2873,7 @@ impl Node {
|
|||||||
tvu_forwards: tvu_forwards_sockets,
|
tvu_forwards: tvu_forwards_sockets,
|
||||||
tpu: tpu_sockets,
|
tpu: tpu_sockets,
|
||||||
tpu_forwards: tpu_forwards_sockets,
|
tpu_forwards: tpu_forwards_sockets,
|
||||||
|
tpu_vote: tpu_vote_sockets,
|
||||||
broadcast,
|
broadcast,
|
||||||
repair,
|
repair,
|
||||||
retransmit_sockets,
|
retransmit_sockets,
|
||||||
|
@ -28,7 +28,7 @@ pub struct ContactInfo {
|
|||||||
/// address to forward unprocessed transactions to
|
/// address to forward unprocessed transactions to
|
||||||
pub tpu_forwards: SocketAddr,
|
pub tpu_forwards: SocketAddr,
|
||||||
/// address to which to send bank state requests
|
/// address to which to send bank state requests
|
||||||
pub unused: SocketAddr,
|
pub tpu_vote: SocketAddr,
|
||||||
/// address to which to send JSON-RPC requests
|
/// address to which to send JSON-RPC requests
|
||||||
pub rpc: SocketAddr,
|
pub rpc: SocketAddr,
|
||||||
/// websocket for JSON-RPC push notifications
|
/// websocket for JSON-RPC push notifications
|
||||||
@ -76,7 +76,7 @@ impl Default for ContactInfo {
|
|||||||
repair: socketaddr_any!(),
|
repair: socketaddr_any!(),
|
||||||
tpu: socketaddr_any!(),
|
tpu: socketaddr_any!(),
|
||||||
tpu_forwards: socketaddr_any!(),
|
tpu_forwards: socketaddr_any!(),
|
||||||
unused: socketaddr_any!(),
|
tpu_vote: socketaddr_any!(),
|
||||||
rpc: socketaddr_any!(),
|
rpc: socketaddr_any!(),
|
||||||
rpc_pubsub: socketaddr_any!(),
|
rpc_pubsub: socketaddr_any!(),
|
||||||
serve_repair: socketaddr_any!(),
|
serve_repair: socketaddr_any!(),
|
||||||
@ -96,7 +96,7 @@ impl ContactInfo {
|
|||||||
repair: socketaddr!("127.0.0.1:1237"),
|
repair: socketaddr!("127.0.0.1:1237"),
|
||||||
tpu: socketaddr!("127.0.0.1:1238"),
|
tpu: socketaddr!("127.0.0.1:1238"),
|
||||||
tpu_forwards: socketaddr!("127.0.0.1:1239"),
|
tpu_forwards: socketaddr!("127.0.0.1:1239"),
|
||||||
unused: socketaddr!("127.0.0.1:1240"),
|
tpu_vote: socketaddr!("127.0.0.1:1240"),
|
||||||
rpc: socketaddr!("127.0.0.1:1241"),
|
rpc: socketaddr!("127.0.0.1:1241"),
|
||||||
rpc_pubsub: socketaddr!("127.0.0.1:1242"),
|
rpc_pubsub: socketaddr!("127.0.0.1:1242"),
|
||||||
serve_repair: socketaddr!("127.0.0.1:1243"),
|
serve_repair: socketaddr!("127.0.0.1:1243"),
|
||||||
@ -126,7 +126,7 @@ impl ContactInfo {
|
|||||||
repair: addr,
|
repair: addr,
|
||||||
tpu: addr,
|
tpu: addr,
|
||||||
tpu_forwards: addr,
|
tpu_forwards: addr,
|
||||||
unused: addr,
|
tpu_vote: addr,
|
||||||
rpc: addr,
|
rpc: addr,
|
||||||
rpc_pubsub: addr,
|
rpc_pubsub: addr,
|
||||||
serve_repair: addr,
|
serve_repair: addr,
|
||||||
@ -152,6 +152,7 @@ impl ContactInfo {
|
|||||||
let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT);
|
let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT);
|
||||||
let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
|
let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
|
||||||
let serve_repair = next_port(bind_addr, 6);
|
let serve_repair = next_port(bind_addr, 6);
|
||||||
|
let tpu_vote = next_port(bind_addr, 7);
|
||||||
Self {
|
Self {
|
||||||
id: *pubkey,
|
id: *pubkey,
|
||||||
gossip,
|
gossip,
|
||||||
@ -160,7 +161,7 @@ impl ContactInfo {
|
|||||||
repair,
|
repair,
|
||||||
tpu,
|
tpu,
|
||||||
tpu_forwards,
|
tpu_forwards,
|
||||||
unused: "0.0.0.0:0".parse().unwrap(),
|
tpu_vote,
|
||||||
rpc,
|
rpc,
|
||||||
rpc_pubsub,
|
rpc_pubsub,
|
||||||
serve_repair,
|
serve_repair,
|
||||||
@ -262,7 +263,7 @@ mod tests {
|
|||||||
assert!(ci.rpc.ip().is_unspecified());
|
assert!(ci.rpc.ip().is_unspecified());
|
||||||
assert!(ci.rpc_pubsub.ip().is_unspecified());
|
assert!(ci.rpc_pubsub.ip().is_unspecified());
|
||||||
assert!(ci.tpu.ip().is_unspecified());
|
assert!(ci.tpu.ip().is_unspecified());
|
||||||
assert!(ci.unused.ip().is_unspecified());
|
assert!(ci.tpu_vote.ip().is_unspecified());
|
||||||
assert!(ci.serve_repair.ip().is_unspecified());
|
assert!(ci.serve_repair.ip().is_unspecified());
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
@ -274,7 +275,7 @@ mod tests {
|
|||||||
assert!(ci.rpc.ip().is_multicast());
|
assert!(ci.rpc.ip().is_multicast());
|
||||||
assert!(ci.rpc_pubsub.ip().is_multicast());
|
assert!(ci.rpc_pubsub.ip().is_multicast());
|
||||||
assert!(ci.tpu.ip().is_multicast());
|
assert!(ci.tpu.ip().is_multicast());
|
||||||
assert!(ci.unused.ip().is_multicast());
|
assert!(ci.tpu_vote.ip().is_multicast());
|
||||||
assert!(ci.serve_repair.ip().is_multicast());
|
assert!(ci.serve_repair.ip().is_multicast());
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
@ -287,7 +288,7 @@ mod tests {
|
|||||||
assert!(ci.rpc.ip().is_unspecified());
|
assert!(ci.rpc.ip().is_unspecified());
|
||||||
assert!(ci.rpc_pubsub.ip().is_unspecified());
|
assert!(ci.rpc_pubsub.ip().is_unspecified());
|
||||||
assert!(ci.tpu.ip().is_unspecified());
|
assert!(ci.tpu.ip().is_unspecified());
|
||||||
assert!(ci.unused.ip().is_unspecified());
|
assert!(ci.tpu_vote.ip().is_unspecified());
|
||||||
assert!(ci.serve_repair.ip().is_unspecified());
|
assert!(ci.serve_repair.ip().is_unspecified());
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
@ -295,12 +296,12 @@ mod tests {
|
|||||||
let addr = socketaddr!("127.0.0.1:10");
|
let addr = socketaddr!("127.0.0.1:10");
|
||||||
let ci = ContactInfo::new_with_socketaddr(&addr);
|
let ci = ContactInfo::new_with_socketaddr(&addr);
|
||||||
assert_eq!(ci.tpu, addr);
|
assert_eq!(ci.tpu, addr);
|
||||||
|
assert_eq!(ci.tpu_vote.port(), 17);
|
||||||
assert_eq!(ci.gossip.port(), 11);
|
assert_eq!(ci.gossip.port(), 11);
|
||||||
assert_eq!(ci.tvu.port(), 12);
|
assert_eq!(ci.tvu.port(), 12);
|
||||||
assert_eq!(ci.tpu_forwards.port(), 13);
|
assert_eq!(ci.tpu_forwards.port(), 13);
|
||||||
assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT);
|
assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT);
|
||||||
assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
|
assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
|
||||||
assert!(ci.unused.ip().is_unspecified());
|
|
||||||
assert_eq!(ci.serve_repair.port(), 16);
|
assert_eq!(ci.serve_repair.port(), 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -327,6 +328,7 @@ mod tests {
|
|||||||
assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238"));
|
assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238"));
|
||||||
assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239"));
|
assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239"));
|
||||||
assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240"));
|
assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240"));
|
||||||
|
assert_eq!(d1.tpu_vote, socketaddr!("127.0.0.1:1241"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -19,7 +19,7 @@ fn bench_sigverify(bencher: &mut Bencher) {
|
|||||||
let recycler_out = Recycler::default();
|
let recycler_out = Recycler::default();
|
||||||
// verify packets
|
// verify packets
|
||||||
bencher.iter(|| {
|
bencher.iter(|| {
|
||||||
let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
|
let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,6 +34,6 @@ fn bench_get_offsets(bencher: &mut Bencher) {
|
|||||||
let recycler = Recycler::default();
|
let recycler = Recycler::default();
|
||||||
// verify packets
|
// verify packets
|
||||||
bencher.iter(|| {
|
bencher.iter(|| {
|
||||||
let _ans = sigverify::generate_offsets(&mut batches, &recycler);
|
let _ans = sigverify::generate_offsets(&mut batches, &recycler, false);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -102,8 +102,8 @@ pub fn init() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_packet(packet: &mut Packet) {
|
fn verify_packet(packet: &mut Packet, reject_non_vote: bool) {
|
||||||
let packet_offsets = get_packet_offsets(packet, 0);
|
let packet_offsets = get_packet_offsets(packet, 0, reject_non_vote);
|
||||||
let mut sig_start = packet_offsets.sig_start as usize;
|
let mut sig_start = packet_offsets.sig_start as usize;
|
||||||
let mut pubkey_start = packet_offsets.pubkey_start as usize;
|
let mut pubkey_start = packet_offsets.pubkey_start as usize;
|
||||||
let msg_start = packet_offsets.msg_start as usize;
|
let msg_start = packet_offsets.msg_start as usize;
|
||||||
@ -276,15 +276,20 @@ fn do_get_packet_offsets(
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_packet_offsets(packet: &mut Packet, current_offset: usize) -> PacketOffsets {
|
fn get_packet_offsets(
|
||||||
|
packet: &mut Packet,
|
||||||
|
current_offset: usize,
|
||||||
|
reject_non_vote: bool,
|
||||||
|
) -> PacketOffsets {
|
||||||
let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset);
|
let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset);
|
||||||
if let Ok(offsets) = unsanitized_packet_offsets {
|
if let Ok(offsets) = unsanitized_packet_offsets {
|
||||||
check_for_simple_vote_transaction(packet, &offsets, current_offset).ok();
|
check_for_simple_vote_transaction(packet, &offsets, current_offset).ok();
|
||||||
offsets
|
if !reject_non_vote || packet.meta.is_simple_vote_tx {
|
||||||
} else {
|
return offsets;
|
||||||
|
}
|
||||||
|
}
|
||||||
// force sigverify to fail by returning zeros
|
// force sigverify to fail by returning zeros
|
||||||
PacketOffsets::new(0, 0, 0, 0, 0)
|
PacketOffsets::new(0, 0, 0, 0, 0)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_for_simple_vote_transaction(
|
fn check_for_simple_vote_transaction(
|
||||||
@ -355,7 +360,11 @@ fn check_for_simple_vote_transaction(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler<TxOffset>) -> TxOffsets {
|
pub fn generate_offsets(
|
||||||
|
batches: &mut [Packets],
|
||||||
|
recycler: &Recycler<TxOffset>,
|
||||||
|
reject_non_vote: bool,
|
||||||
|
) -> TxOffsets {
|
||||||
debug!("allocating..");
|
debug!("allocating..");
|
||||||
let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets");
|
let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets");
|
||||||
signature_offsets.set_pinnable();
|
signature_offsets.set_pinnable();
|
||||||
@ -370,7 +379,7 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler<TxOffset>)
|
|||||||
batches.iter_mut().for_each(|p| {
|
batches.iter_mut().for_each(|p| {
|
||||||
let mut sig_lens = Vec::new();
|
let mut sig_lens = Vec::new();
|
||||||
p.packets.iter_mut().for_each(|packet| {
|
p.packets.iter_mut().for_each(|packet| {
|
||||||
let packet_offsets = get_packet_offsets(packet, current_offset);
|
let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote);
|
||||||
|
|
||||||
sig_lens.push(packet_offsets.sig_len);
|
sig_lens.push(packet_offsets.sig_len);
|
||||||
|
|
||||||
@ -404,14 +413,16 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler<TxOffset>)
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn ed25519_verify_cpu(batches: &mut [Packets]) {
|
pub fn ed25519_verify_cpu(batches: &mut [Packets], reject_non_vote: bool) {
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
let count = batch_size(batches);
|
let count = batch_size(batches);
|
||||||
debug!("CPU ECDSA for {}", batch_size(batches));
|
debug!("CPU ECDSA for {}", batch_size(batches));
|
||||||
PAR_THREAD_POOL.install(|| {
|
PAR_THREAD_POOL.install(|| {
|
||||||
batches
|
batches.into_par_iter().for_each(|p| {
|
||||||
.into_par_iter()
|
p.packets
|
||||||
.for_each(|p| p.packets.par_iter_mut().for_each(verify_packet))
|
.par_iter_mut()
|
||||||
|
.for_each(|p| verify_packet(p, reject_non_vote))
|
||||||
|
})
|
||||||
});
|
});
|
||||||
inc_new_counter_debug!("ed25519_verify_cpu", count);
|
inc_new_counter_debug!("ed25519_verify_cpu", count);
|
||||||
}
|
}
|
||||||
@ -491,10 +502,11 @@ pub fn ed25519_verify(
|
|||||||
batches: &mut [Packets],
|
batches: &mut [Packets],
|
||||||
recycler: &Recycler<TxOffset>,
|
recycler: &Recycler<TxOffset>,
|
||||||
recycler_out: &Recycler<PinnedVec<u8>>,
|
recycler_out: &Recycler<PinnedVec<u8>>,
|
||||||
|
reject_non_vote: bool,
|
||||||
) {
|
) {
|
||||||
let api = perf_libs::api();
|
let api = perf_libs::api();
|
||||||
if api.is_none() {
|
if api.is_none() {
|
||||||
return ed25519_verify_cpu(batches);
|
return ed25519_verify_cpu(batches, reject_non_vote);
|
||||||
}
|
}
|
||||||
let api = api.unwrap();
|
let api = api.unwrap();
|
||||||
|
|
||||||
@ -507,11 +519,11 @@ pub fn ed25519_verify(
|
|||||||
// may be busy doing other things while being a real validator
|
// may be busy doing other things while being a real validator
|
||||||
// TODO: dynamically adjust this crossover
|
// TODO: dynamically adjust this crossover
|
||||||
if count < 64 {
|
if count < 64 {
|
||||||
return ed25519_verify_cpu(batches);
|
return ed25519_verify_cpu(batches, reject_non_vote);
|
||||||
}
|
}
|
||||||
|
|
||||||
let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) =
|
let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) =
|
||||||
generate_offsets(batches, recycler);
|
generate_offsets(batches, recycler, reject_non_vote);
|
||||||
|
|
||||||
debug!("CUDA ECDSA for {}", batch_size(batches));
|
debug!("CUDA ECDSA for {}", batch_size(batches));
|
||||||
debug!("allocating out..");
|
debug!("allocating out..");
|
||||||
@ -626,7 +638,7 @@ mod tests {
|
|||||||
let message_data = tx.message_data();
|
let message_data = tx.message_data();
|
||||||
let mut packet = sigverify::make_packet_from_transaction(tx.clone());
|
let mut packet = sigverify::make_packet_from_transaction(tx.clone());
|
||||||
|
|
||||||
let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0);
|
let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0, false);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
memfind(&tx_bytes, tx.signatures[0].as_ref()),
|
memfind(&tx_bytes, tx.signatures[0].as_ref()),
|
||||||
@ -705,7 +717,7 @@ mod tests {
|
|||||||
let res = sigverify::do_get_packet_offsets(&packet, 0);
|
let res = sigverify::do_get_packet_offsets(&packet, 0);
|
||||||
assert_eq!(res, Err(PacketError::InvalidPubkeyLen));
|
assert_eq!(res, Err(PacketError::InvalidPubkeyLen));
|
||||||
|
|
||||||
verify_packet(&mut packet);
|
verify_packet(&mut packet, false);
|
||||||
assert!(packet.meta.discard);
|
assert!(packet.meta.discard);
|
||||||
|
|
||||||
packet.meta.discard = false;
|
packet.meta.discard = false;
|
||||||
@ -741,7 +753,7 @@ mod tests {
|
|||||||
let res = sigverify::do_get_packet_offsets(&packet, 0);
|
let res = sigverify::do_get_packet_offsets(&packet, 0);
|
||||||
assert_eq!(res, Err(PacketError::InvalidPubkeyLen));
|
assert_eq!(res, Err(PacketError::InvalidPubkeyLen));
|
||||||
|
|
||||||
verify_packet(&mut packet);
|
verify_packet(&mut packet, false);
|
||||||
assert!(packet.meta.discard);
|
assert!(packet.meta.discard);
|
||||||
|
|
||||||
packet.meta.discard = false;
|
packet.meta.discard = false;
|
||||||
@ -872,7 +884,8 @@ mod tests {
|
|||||||
// Just like get_packet_offsets, but not returning redundant information.
|
// Just like get_packet_offsets, but not returning redundant information.
|
||||||
fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets {
|
fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets {
|
||||||
let mut packet = sigverify::make_packet_from_transaction(tx);
|
let mut packet = sigverify::make_packet_from_transaction(tx);
|
||||||
let packet_offsets = sigverify::get_packet_offsets(&mut packet, current_offset as usize);
|
let packet_offsets =
|
||||||
|
sigverify::get_packet_offsets(&mut packet, current_offset as usize, false);
|
||||||
PacketOffsets::new(
|
PacketOffsets::new(
|
||||||
packet_offsets.sig_len,
|
packet_offsets.sig_len,
|
||||||
packet_offsets.sig_start - current_offset,
|
packet_offsets.sig_start - current_offset,
|
||||||
@ -953,7 +966,7 @@ mod tests {
|
|||||||
fn ed25519_verify(batches: &mut [Packets]) {
|
fn ed25519_verify(batches: &mut [Packets]) {
|
||||||
let recycler = Recycler::default();
|
let recycler = Recycler::default();
|
||||||
let recycler_out = Recycler::default();
|
let recycler_out = Recycler::default();
|
||||||
sigverify::ed25519_verify(batches, &recycler, &recycler_out);
|
sigverify::ed25519_verify(batches, &recycler, &recycler_out, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -1051,8 +1064,8 @@ mod tests {
|
|||||||
// verify from GPU verification pipeline (when GPU verification is enabled) are
|
// verify from GPU verification pipeline (when GPU verification is enabled) are
|
||||||
// equivalent to the CPU verification pipeline.
|
// equivalent to the CPU verification pipeline.
|
||||||
let mut batches_cpu = batches.clone();
|
let mut batches_cpu = batches.clone();
|
||||||
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
|
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false);
|
||||||
ed25519_verify_cpu(&mut batches_cpu);
|
ed25519_verify_cpu(&mut batches_cpu, false);
|
||||||
|
|
||||||
// check result
|
// check result
|
||||||
batches
|
batches
|
||||||
|
@ -5925,6 +5925,11 @@ impl Bank {
|
|||||||
.is_active(&feature_set::stakes_remove_delegation_if_inactive::id())
|
.is_active(&feature_set::stakes_remove_delegation_if_inactive::id())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_to_tpu_vote_port_enabled(&self) -> bool {
|
||||||
|
self.feature_set
|
||||||
|
.is_active(&feature_set::send_to_tpu_vote_port::id())
|
||||||
|
}
|
||||||
|
|
||||||
// Check if the wallclock time from bank creation to now has exceeded the allotted
|
// Check if the wallclock time from bank creation to now has exceeded the allotted
|
||||||
// time for transaction processing
|
// time for transaction processing
|
||||||
pub fn should_bank_still_be_processing_txs(
|
pub fn should_bank_still_be_processing_txs(
|
||||||
@ -6257,7 +6262,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool {
|
fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool {
|
||||||
if transaction.message().instructions().len() == 1 {
|
if transaction.message().instructions().len() == 1 {
|
||||||
let (program_pubkey, instruction) = transaction
|
let (program_pubkey, instruction) = transaction
|
||||||
.message()
|
.message()
|
||||||
|
@ -30,6 +30,8 @@ EXPOSE 8006/udp
|
|||||||
EXPOSE 8007/udp
|
EXPOSE 8007/udp
|
||||||
# broadcast
|
# broadcast
|
||||||
EXPOSE 8008/udp
|
EXPOSE 8008/udp
|
||||||
|
# tpu_vote
|
||||||
|
EXPOSE 8009/udp
|
||||||
|
|
||||||
RUN apt update && \
|
RUN apt update && \
|
||||||
apt-get install -y bzip2 libssl-dev && \
|
apt-get install -y bzip2 libssl-dev && \
|
||||||
|
@ -237,6 +237,10 @@ pub mod remove_native_loader {
|
|||||||
solana_sdk::declare_id!("HTTgmruMYRZEntyL3EdCDdnS6e4D5wRq1FA7kQsb66qq");
|
solana_sdk::declare_id!("HTTgmruMYRZEntyL3EdCDdnS6e4D5wRq1FA7kQsb66qq");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod send_to_tpu_vote_port {
|
||||||
|
solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo");
|
||||||
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
/// Map of feature identifiers to user-visible description
|
/// Map of feature identifiers to user-visible description
|
||||||
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
|
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
|
||||||
@ -291,6 +295,7 @@ lazy_static! {
|
|||||||
(prevent_calling_precompiles_as_programs::id(), "Prevent calling precompiles as programs"),
|
(prevent_calling_precompiles_as_programs::id(), "Prevent calling precompiles as programs"),
|
||||||
(optimize_epoch_boundary_updates::id(), "Optimize epoch boundary updates"),
|
(optimize_epoch_boundary_updates::id(), "Optimize epoch boundary updates"),
|
||||||
(remove_native_loader::id(), "Remove support for the native loader"),
|
(remove_native_loader::id(), "Remove support for the native loader"),
|
||||||
|
(send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"),
|
||||||
/*************** ADD NEW FEATURES HERE ***************/
|
/*************** ADD NEW FEATURES HERE ***************/
|
||||||
]
|
]
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -357,6 +357,9 @@ fn verify_reachable_ports(
|
|||||||
if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) {
|
if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) {
|
||||||
udp_sockets.extend(node.sockets.tpu_forwards.iter());
|
udp_sockets.extend(node.sockets.tpu_forwards.iter());
|
||||||
}
|
}
|
||||||
|
if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) {
|
||||||
|
udp_sockets.extend(node.sockets.tpu_vote.iter());
|
||||||
|
}
|
||||||
if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) {
|
if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) {
|
||||||
udp_sockets.extend(node.sockets.tvu.iter());
|
udp_sockets.extend(node.sockets.tvu.iter());
|
||||||
udp_sockets.extend(node.sockets.broadcast.iter());
|
udp_sockets.extend(node.sockets.broadcast.iter());
|
||||||
|
Reference in New Issue
Block a user