leader qos part 2: add stage to find sender stake, set to packet meta
This commit is contained in:
@ -1,5 +1,9 @@
|
|||||||
use {
|
use {
|
||||||
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
|
crate::{
|
||||||
|
broadcast_stage::BroadcastStage,
|
||||||
|
find_packet_sender_stake_stage::FindPacketSenderStakeStage,
|
||||||
|
retransmit_stage::RetransmitStage,
|
||||||
|
},
|
||||||
itertools::Itertools,
|
itertools::Itertools,
|
||||||
lru::LruCache,
|
lru::LruCache,
|
||||||
rand::{seq::SliceRandom, Rng, SeedableRng},
|
rand::{seq::SliceRandom, Rng, SeedableRng},
|
||||||
@ -28,7 +32,7 @@ use {
|
|||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
iter::repeat_with,
|
iter::repeat_with,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
net::SocketAddr,
|
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
@ -313,6 +317,19 @@ impl ClusterNodes<RetransmitStage> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ClusterNodes<FindPacketSenderStakeStage> {
|
||||||
|
pub(crate) fn get_ip_to_stakes(&self) -> HashMap<IpAddr, u64> {
|
||||||
|
self.compat_index
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(_, i)| {
|
||||||
|
let node = &self.nodes[*i];
|
||||||
|
let contact_info = node.contact_info()?;
|
||||||
|
Some((contact_info.tvu.ip(), node.stake))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn new_cluster_nodes<T: 'static>(
|
pub fn new_cluster_nodes<T: 'static>(
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
@ -488,9 +505,20 @@ pub fn make_test_cluster<R: Rng>(
|
|||||||
ClusterInfo,
|
ClusterInfo,
|
||||||
) {
|
) {
|
||||||
let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7));
|
let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7));
|
||||||
let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None))
|
let mut ip_addr_octet: usize = 0;
|
||||||
.take(num_nodes)
|
let mut nodes: Vec<_> = repeat_with(|| {
|
||||||
.collect();
|
let mut contact_info = ContactInfo::new_rand(rng, None);
|
||||||
|
contact_info.tvu.set_ip(IpAddr::V4(Ipv4Addr::new(
|
||||||
|
127,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
(ip_addr_octet % 256) as u8,
|
||||||
|
)));
|
||||||
|
ip_addr_octet += 1;
|
||||||
|
contact_info
|
||||||
|
})
|
||||||
|
.take(num_nodes)
|
||||||
|
.collect();
|
||||||
nodes.shuffle(rng);
|
nodes.shuffle(rng);
|
||||||
let this_node = nodes[0].clone();
|
let this_node = nodes[0].clone();
|
||||||
let mut stakes: HashMap<Pubkey, u64> = nodes
|
let mut stakes: HashMap<Pubkey, u64> = nodes
|
||||||
@ -685,4 +713,35 @@ mod tests {
|
|||||||
assert_eq!(*peer, peers[index]);
|
assert_eq!(*peer, peers[index]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_cluster_nodes_transaction_weight() {
|
||||||
|
solana_logger::setup();
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 14, None);
|
||||||
|
let cluster_nodes = new_cluster_nodes::<FindPacketSenderStakeStage>(&cluster_info, &stakes);
|
||||||
|
|
||||||
|
// All nodes with contact-info should be in the index.
|
||||||
|
assert_eq!(cluster_nodes.compat_index.len(), nodes.len());
|
||||||
|
// Staked nodes with no contact-info should be included.
|
||||||
|
assert!(cluster_nodes.nodes.len() > nodes.len());
|
||||||
|
|
||||||
|
let ip_to_stake = cluster_nodes.get_ip_to_stakes();
|
||||||
|
|
||||||
|
// Only staked nodes with contact_info should be in the ip_to_stake
|
||||||
|
let stacked_nodes_with_contact_info: HashMap<_, _> = stakes
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(pubkey, stake)| {
|
||||||
|
let node = nodes.iter().find(|node| node.id == *pubkey)?;
|
||||||
|
Some((node.tvu.ip(), stake))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
ip_to_stake.iter().for_each(|(ip, stake)| {
|
||||||
|
// ignoring the 0 staked, because non-stacked nodes are defaulted into 0 stake.
|
||||||
|
if *stake > 0 {
|
||||||
|
let expected_stake = stacked_nodes_with_contact_info.get(ip).unwrap();
|
||||||
|
assert_eq!(stake, *expected_stake);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
97
core/src/find_packet_sender_stake_stage.rs
Normal file
97
core/src/find_packet_sender_stake_stage.rs
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
use {
|
||||||
|
crate::cluster_nodes::ClusterNodesCache,
|
||||||
|
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
|
||||||
|
rayon::{prelude::*, ThreadPool},
|
||||||
|
solana_gossip::cluster_info::ClusterInfo,
|
||||||
|
solana_perf::packet::PacketBatch,
|
||||||
|
solana_rayon_threadlimit::get_thread_count,
|
||||||
|
solana_runtime::bank_forks::BankForks,
|
||||||
|
solana_streamer::streamer::{self, StreamerError},
|
||||||
|
std::{
|
||||||
|
cell::RefCell,
|
||||||
|
collections::HashMap,
|
||||||
|
net::IpAddr,
|
||||||
|
sync::{Arc, RwLock},
|
||||||
|
thread::{self, Builder, JoinHandle},
|
||||||
|
time::{Duration, Instant},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
|
||||||
|
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
|
||||||
|
const STAKES_REFRESH_PERIOD_IN_MS: u128 = 1000;
|
||||||
|
|
||||||
|
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(get_thread_count())
|
||||||
|
.thread_name(|ix| format!("transaction_sender_stake_stage_{}", ix))
|
||||||
|
.build()
|
||||||
|
.unwrap()));
|
||||||
|
|
||||||
|
pub struct FindPacketSenderStakeStage {
|
||||||
|
thread_hdl: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FindPacketSenderStakeStage {
|
||||||
|
pub fn new(
|
||||||
|
packet_receiver: Receiver<PacketBatch>,
|
||||||
|
sender: Sender<Vec<PacketBatch>>,
|
||||||
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
|
cluster_info: Arc<ClusterInfo>,
|
||||||
|
) -> Self {
|
||||||
|
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<FindPacketSenderStakeStage>::new(
|
||||||
|
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
|
||||||
|
CLUSTER_NODES_CACHE_TTL,
|
||||||
|
));
|
||||||
|
let thread_hdl = Builder::new()
|
||||||
|
.name("sol-tx-sender_stake".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
let mut last_stakes = Instant::now();
|
||||||
|
let mut ip_to_stake: HashMap<IpAddr, u64> = HashMap::new();
|
||||||
|
loop {
|
||||||
|
if last_stakes.elapsed().as_millis() > STAKES_REFRESH_PERIOD_IN_MS {
|
||||||
|
let (root_bank, working_bank) = {
|
||||||
|
let bank_forks = bank_forks.read().unwrap();
|
||||||
|
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||||
|
};
|
||||||
|
ip_to_stake = cluster_nodes_cache
|
||||||
|
.get(root_bank.slot(), &root_bank, &working_bank, &cluster_info)
|
||||||
|
.get_ip_to_stakes();
|
||||||
|
last_stakes = Instant::now();
|
||||||
|
}
|
||||||
|
match streamer::recv_packet_batches(&packet_receiver) {
|
||||||
|
Ok((mut batches, _num_packets, _recv_duration)) => {
|
||||||
|
Self::apply_sender_stakes(&mut batches, &ip_to_stake);
|
||||||
|
if let Err(e) = sender.send(batches) {
|
||||||
|
info!("Sender error: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
|
||||||
|
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
|
||||||
|
_ => error!("error: {:?}", e),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
Self { thread_hdl }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
|
||||||
|
PAR_THREAD_POOL.with(|thread_pool| {
|
||||||
|
thread_pool.borrow().install(|| {
|
||||||
|
batches
|
||||||
|
.into_par_iter()
|
||||||
|
.flat_map(|batch| batch.packets.par_iter_mut())
|
||||||
|
.for_each(|packet| {
|
||||||
|
packet.meta.sender_stake =
|
||||||
|
*ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
|
||||||
|
});
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
self.thread_hdl.join()
|
||||||
|
}
|
||||||
|
}
|
@ -24,6 +24,7 @@ pub mod cost_update_service;
|
|||||||
pub mod drop_bank_service;
|
pub mod drop_bank_service;
|
||||||
pub mod duplicate_repair_status;
|
pub mod duplicate_repair_status;
|
||||||
pub mod fetch_stage;
|
pub mod fetch_stage;
|
||||||
|
pub mod find_packet_sender_stake_stage;
|
||||||
pub mod fork_choice;
|
pub mod fork_choice;
|
||||||
pub mod gen_keys;
|
pub mod gen_keys;
|
||||||
pub mod heaviest_subtree_fork_choice;
|
pub mod heaviest_subtree_fork_choice;
|
||||||
@ -61,7 +62,6 @@ pub mod system_monitor_service;
|
|||||||
mod tower1_7_14;
|
mod tower1_7_14;
|
||||||
pub mod tower_storage;
|
pub mod tower_storage;
|
||||||
pub mod tpu;
|
pub mod tpu;
|
||||||
pub mod transaction_weighting_stage;
|
|
||||||
pub mod tree_diff;
|
pub mod tree_diff;
|
||||||
pub mod tvu;
|
pub mod tvu;
|
||||||
pub mod unfrozen_gossip_verified_vote_hashes;
|
pub mod unfrozen_gossip_verified_vote_hashes;
|
||||||
|
@ -445,7 +445,7 @@ mod tests {
|
|||||||
for _ in 0..batches.len() {
|
for _ in 0..batches.len() {
|
||||||
if let Some(batch) = batches.pop() {
|
if let Some(batch) = batches.pop() {
|
||||||
sent_len += batch.packets.len();
|
sent_len += batch.packets.len();
|
||||||
packet_s.send(batch).unwrap();
|
packet_s.send(vec![batch]).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut received = 0;
|
let mut received = 0;
|
||||||
|
@ -10,9 +10,9 @@ use {
|
|||||||
GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker,
|
GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker,
|
||||||
},
|
},
|
||||||
fetch_stage::FetchStage,
|
fetch_stage::FetchStage,
|
||||||
|
find_packet_sender_stake_stage::FindPacketSenderStakeStage,
|
||||||
sigverify::TransactionSigVerifier,
|
sigverify::TransactionSigVerifier,
|
||||||
sigverify_stage::SigVerifyStage,
|
sigverify_stage::SigVerifyStage,
|
||||||
transaction_weighting_stage::TransactionWeightStage,
|
|
||||||
},
|
},
|
||||||
crossbeam_channel::{unbounded, Receiver},
|
crossbeam_channel::{unbounded, Receiver},
|
||||||
solana_gossip::cluster_info::ClusterInfo,
|
solana_gossip::cluster_info::ClusterInfo,
|
||||||
@ -56,8 +56,8 @@ pub struct Tpu {
|
|||||||
cluster_info_vote_listener: ClusterInfoVoteListener,
|
cluster_info_vote_listener: ClusterInfoVoteListener,
|
||||||
broadcast_stage: BroadcastStage,
|
broadcast_stage: BroadcastStage,
|
||||||
tpu_quic_t: thread::JoinHandle<()>,
|
tpu_quic_t: thread::JoinHandle<()>,
|
||||||
transaction_weight_stage: TransactionWeightStage,
|
transaction_weight_stage: FindPacketSenderStakeStage,
|
||||||
vote_transaction_weight_stage: TransactionWeightStage,
|
vote_transaction_weight_stage: FindPacketSenderStakeStage,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Tpu {
|
impl Tpu {
|
||||||
@ -107,20 +107,20 @@ impl Tpu {
|
|||||||
tpu_coalesce_ms,
|
tpu_coalesce_ms,
|
||||||
);
|
);
|
||||||
|
|
||||||
let (weighted_sender, weighted_receiver) = unbounded();
|
let (weighted_packet_sender, weighted_packet_receiver) = unbounded();
|
||||||
|
|
||||||
let transaction_weight_stage = TransactionWeightStage::new(
|
let transaction_weight_stage = FindPacketSenderStakeStage::new(
|
||||||
packet_receiver,
|
packet_receiver,
|
||||||
weighted_sender,
|
weighted_packet_sender,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
cluster_info.clone(),
|
cluster_info.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let (vote_weighted_sender, vote_weighted_receiver) = unbounded();
|
let (vote_weighted_packet_sender, vote_weighted_packet_receiver) = unbounded();
|
||||||
|
|
||||||
let vote_transaction_weight_stage = TransactionWeightStage::new(
|
let vote_transaction_weight_stage = FindPacketSenderStakeStage::new(
|
||||||
vote_packet_receiver,
|
vote_packet_receiver,
|
||||||
vote_weighted_sender,
|
vote_weighted_packet_sender,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
cluster_info.clone(),
|
cluster_info.clone(),
|
||||||
);
|
);
|
||||||
@ -139,7 +139,7 @@ impl Tpu {
|
|||||||
|
|
||||||
let sigverify_stage = {
|
let sigverify_stage = {
|
||||||
let verifier = TransactionSigVerifier::default();
|
let verifier = TransactionSigVerifier::default();
|
||||||
SigVerifyStage::new(weighted_receiver, verified_sender, verifier)
|
SigVerifyStage::new(weighted_packet_receiver, verified_sender, verifier)
|
||||||
};
|
};
|
||||||
|
|
||||||
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded();
|
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded();
|
||||||
@ -147,7 +147,7 @@ impl Tpu {
|
|||||||
let vote_sigverify_stage = {
|
let vote_sigverify_stage = {
|
||||||
let verifier = TransactionSigVerifier::new_reject_non_vote();
|
let verifier = TransactionSigVerifier::new_reject_non_vote();
|
||||||
SigVerifyStage::new(
|
SigVerifyStage::new(
|
||||||
vote_weighted_receiver,
|
vote_weighted_packet_receiver,
|
||||||
verified_tpu_vote_packets_sender,
|
verified_tpu_vote_packets_sender,
|
||||||
verifier,
|
verifier,
|
||||||
)
|
)
|
||||||
|
@ -1,77 +0,0 @@
|
|||||||
use {
|
|
||||||
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
|
|
||||||
rayon::prelude::*,
|
|
||||||
solana_gossip::cluster_info::ClusterInfo,
|
|
||||||
solana_perf::packet::PacketBatch,
|
|
||||||
solana_runtime::bank_forks::BankForks,
|
|
||||||
solana_streamer::streamer::{self, StreamerError},
|
|
||||||
std::{
|
|
||||||
collections::HashMap,
|
|
||||||
net::IpAddr,
|
|
||||||
sync::{Arc, RwLock},
|
|
||||||
thread::{self, Builder, JoinHandle},
|
|
||||||
time::Instant,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct TransactionWeightStage {
|
|
||||||
thread_hdl: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TransactionWeightStage {
|
|
||||||
pub fn new(
|
|
||||||
packet_receiver: Receiver<PacketBatch>,
|
|
||||||
sender: Sender<Vec<PacketBatch>>,
|
|
||||||
bank_forks: Arc<RwLock<BankForks>>,
|
|
||||||
cluster_info: Arc<ClusterInfo>,
|
|
||||||
) -> Self {
|
|
||||||
let thread_hdl = Builder::new()
|
|
||||||
.name("sol-tx-weight".to_string())
|
|
||||||
.spawn(move || {
|
|
||||||
let mut last_stakes = Instant::now();
|
|
||||||
let mut ip_to_stake: HashMap<IpAddr, u64> = HashMap::new();
|
|
||||||
loop {
|
|
||||||
if last_stakes.elapsed().as_millis() > 1000 {
|
|
||||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
|
||||||
let staked_nodes = root_bank.staked_nodes();
|
|
||||||
ip_to_stake = cluster_info
|
|
||||||
.tvu_peers()
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|node| {
|
|
||||||
let stake = staked_nodes.get(&node.id)?;
|
|
||||||
Some((node.tvu.ip(), *stake))
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
last_stakes = Instant::now();
|
|
||||||
}
|
|
||||||
match streamer::recv_packet_batches(&packet_receiver) {
|
|
||||||
Ok((mut batches, _num_packets, _recv_duration)) => {
|
|
||||||
Self::apply_weights(&mut batches, &ip_to_stake);
|
|
||||||
if let Err(e) = sender.send(batches) {
|
|
||||||
info!("Sender error: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => match e {
|
|
||||||
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
|
|
||||||
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
|
|
||||||
_ => error!("error: {:?}", e),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
Self { thread_hdl }
|
|
||||||
}
|
|
||||||
|
|
||||||
fn apply_weights(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
|
|
||||||
batches.into_par_iter().for_each(|batch| {
|
|
||||||
batch.packets.par_iter_mut().for_each(|packet| {
|
|
||||||
packet.meta.weight = *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
|
||||||
self.thread_hdl.join()
|
|
||||||
}
|
|
||||||
}
|
|
@ -32,7 +32,7 @@ pub struct Meta {
|
|||||||
pub addr: IpAddr,
|
pub addr: IpAddr,
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
pub flags: PacketFlags,
|
pub flags: PacketFlags,
|
||||||
pub weight: u64,
|
pub sender_stake: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@ -146,7 +146,7 @@ impl Default for Meta {
|
|||||||
addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||||
port: 0,
|
port: 0,
|
||||||
flags: PacketFlags::empty(),
|
flags: PacketFlags::empty(),
|
||||||
weight: 0,
|
sender_stake: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user