automerge
This commit is contained in:
@ -3,10 +3,13 @@
|
|||||||
extern crate test;
|
extern crate test;
|
||||||
|
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
|
use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
|
||||||
use solana_core::cluster_info::{ClusterInfo, Node};
|
use solana_core::cluster_info::{ClusterInfo, Node};
|
||||||
use solana_core::contact_info::ContactInfo;
|
use solana_core::contact_info::ContactInfo;
|
||||||
|
use solana_ledger::shred::Shred;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
|
use std::sync::RwLock;
|
||||||
use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant};
|
use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant};
|
||||||
use test::Bencher;
|
use test::Bencher;
|
||||||
|
|
||||||
@ -18,10 +21,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
|
|||||||
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
|
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
|
||||||
const SHRED_SIZE: usize = 1024;
|
|
||||||
const NUM_SHREDS: usize = 32;
|
const NUM_SHREDS: usize = 32;
|
||||||
let shreds = vec![vec![0; SHRED_SIZE]; NUM_SHREDS];
|
let shreds = vec![Shred::new_empty_data_shred(); NUM_SHREDS];
|
||||||
let seeds = vec![[0u8; 32]; NUM_SHREDS];
|
|
||||||
let mut stakes = HashMap::new();
|
let mut stakes = HashMap::new();
|
||||||
const NUM_PEERS: usize = 200;
|
const NUM_PEERS: usize = 200;
|
||||||
for _ in 0..NUM_PEERS {
|
for _ in 0..NUM_PEERS {
|
||||||
@ -31,15 +32,18 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
|
|||||||
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
|
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
|
||||||
}
|
}
|
||||||
let stakes = Arc::new(stakes);
|
let stakes = Arc::new(stakes);
|
||||||
|
let cluster_info = Arc::new(RwLock::new(cluster_info));
|
||||||
|
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone()));
|
||||||
|
let shreds = Arc::new(shreds);
|
||||||
bencher.iter(move || {
|
bencher.iter(move || {
|
||||||
let shreds = shreds.clone();
|
let shreds = shreds.clone();
|
||||||
cluster_info
|
broadcast_shreds(
|
||||||
.broadcast_shreds(
|
|
||||||
&socket,
|
&socket,
|
||||||
shreds,
|
&shreds,
|
||||||
&seeds,
|
&peers_and_stakes,
|
||||||
Some(stakes.clone()),
|
&peers,
|
||||||
&mut Instant::now(),
|
&mut Instant::now(),
|
||||||
|
&mut 0,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
});
|
});
|
||||||
|
@ -4,6 +4,9 @@ use self::{
|
|||||||
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
|
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
|
||||||
standard_broadcast_run::StandardBroadcastRun,
|
standard_broadcast_run::StandardBroadcastRun,
|
||||||
};
|
};
|
||||||
|
use crate::contact_info::ContactInfo;
|
||||||
|
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
||||||
|
use crate::weighted_shuffle::weighted_best;
|
||||||
use crate::{
|
use crate::{
|
||||||
cluster_info::{ClusterInfo, ClusterInfoError},
|
cluster_info::{ClusterInfo, ClusterInfoError},
|
||||||
poh_recorder::WorkingBankEntry,
|
poh_recorder::WorkingBankEntry,
|
||||||
@ -14,9 +17,13 @@ use crossbeam_channel::{
|
|||||||
Sender as CrossbeamSender,
|
Sender as CrossbeamSender,
|
||||||
};
|
};
|
||||||
use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils};
|
use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils};
|
||||||
|
use solana_measure::measure::Measure;
|
||||||
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
|
use solana_sdk::timing::duration_as_s;
|
||||||
|
use solana_sdk::timing::timestamp;
|
||||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||||
|
use solana_streamer::sendmmsg::send_mmsg;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
@ -328,6 +335,84 @@ impl BroadcastStage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_peer_stats(num_live_peers: i64, broadcast_len: i64, last_datapoint_submit: &mut Instant) {
|
||||||
|
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
|
||||||
|
datapoint_info!(
|
||||||
|
"cluster_info-num_nodes",
|
||||||
|
("live_count", num_live_peers, i64),
|
||||||
|
("broadcast_count", broadcast_len, i64)
|
||||||
|
);
|
||||||
|
*last_datapoint_submit = Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_broadcast_peers<S: std::hash::BuildHasher>(
|
||||||
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
|
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
|
||||||
|
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
|
||||||
|
use crate::cluster_info;
|
||||||
|
let mut peers = cluster_info.read().unwrap().tvu_peers();
|
||||||
|
let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes);
|
||||||
|
(peers, peers_and_stakes)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// broadcast messages from the leader to layer 1 nodes
|
||||||
|
/// # Remarks
|
||||||
|
pub fn broadcast_shreds(
|
||||||
|
s: &UdpSocket,
|
||||||
|
shreds: &Arc<Vec<Shred>>,
|
||||||
|
peers_and_stakes: &[(u64, usize)],
|
||||||
|
peers: &[ContactInfo],
|
||||||
|
last_datapoint_submit: &mut Instant,
|
||||||
|
send_mmsg_total: &mut u64,
|
||||||
|
) -> Result<()> {
|
||||||
|
let broadcast_len = peers_and_stakes.len();
|
||||||
|
if broadcast_len == 0 {
|
||||||
|
update_peer_stats(1, 1, last_datapoint_submit);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let packets: Vec<_> = shreds
|
||||||
|
.iter()
|
||||||
|
.map(|shred| {
|
||||||
|
let broadcast_index = weighted_best(&peers_and_stakes, shred.seed());
|
||||||
|
|
||||||
|
(&shred.payload, &peers[broadcast_index].tvu)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut sent = 0;
|
||||||
|
let mut send_mmsg_time = Measure::start("send_mmsg");
|
||||||
|
while sent < packets.len() {
|
||||||
|
match send_mmsg(s, &packets[sent..]) {
|
||||||
|
Ok(n) => sent += n,
|
||||||
|
Err(e) => {
|
||||||
|
return Err(Error::IO(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
send_mmsg_time.stop();
|
||||||
|
*send_mmsg_total += send_mmsg_time.as_us();
|
||||||
|
|
||||||
|
let num_live_peers = num_live_peers(&peers);
|
||||||
|
update_peer_stats(
|
||||||
|
num_live_peers,
|
||||||
|
broadcast_len as i64 + 1,
|
||||||
|
last_datapoint_submit,
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn num_live_peers(peers: &[ContactInfo]) -> i64 {
|
||||||
|
let mut num_live_peers = 1i64;
|
||||||
|
peers.iter().for_each(|p| {
|
||||||
|
// A peer is considered live if they generated their contact info recently
|
||||||
|
if timestamp() - p.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
|
||||||
|
num_live_peers += 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
num_live_peers
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test {
|
pub mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -78,16 +78,19 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
|||||||
sock: &UdpSocket,
|
sock: &UdpSocket,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (stakes, shreds) = receiver.lock().unwrap().recv()?;
|
let (stakes, shreds) = receiver.lock().unwrap().recv()?;
|
||||||
let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
|
|
||||||
// Broadcast data
|
// Broadcast data
|
||||||
let all_shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
|
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
|
||||||
cluster_info.read().unwrap().broadcast_shreds(
|
|
||||||
|
let mut send_mmsg_total = 0;
|
||||||
|
broadcast_shreds(
|
||||||
sock,
|
sock,
|
||||||
all_shred_bufs,
|
&shreds,
|
||||||
&all_seeds,
|
&peers_and_stakes,
|
||||||
stakes,
|
&peers,
|
||||||
&mut Instant::now(),
|
&mut Instant::now(),
|
||||||
|
&mut send_mmsg_total,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
fn record(
|
fn record(
|
||||||
|
@ -17,20 +17,17 @@ struct BroadcastStats {
|
|||||||
broadcast_elapsed: u64,
|
broadcast_elapsed: u64,
|
||||||
receive_elapsed: u64,
|
receive_elapsed: u64,
|
||||||
seed_elapsed: u64,
|
seed_elapsed: u64,
|
||||||
|
send_mmsg_elapsed: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BroadcastStats {
|
impl BroadcastStats {
|
||||||
fn reset(&mut self) {
|
fn reset(&mut self) {
|
||||||
self.insert_shreds_elapsed = 0;
|
*self = Self::default();
|
||||||
self.shredding_elapsed = 0;
|
|
||||||
self.broadcast_elapsed = 0;
|
|
||||||
self.receive_elapsed = 0;
|
|
||||||
self.seed_elapsed = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(super) struct StandardBroadcastRun {
|
pub struct StandardBroadcastRun {
|
||||||
stats: Arc<RwLock<BroadcastStats>>,
|
stats: Arc<RwLock<BroadcastStats>>,
|
||||||
unfinished_slot: Option<UnfinishedSlotInfo>,
|
unfinished_slot: Option<UnfinishedSlotInfo>,
|
||||||
current_slot_and_parent: Option<(u64, u64)>,
|
current_slot_and_parent: Option<(u64, u64)>,
|
||||||
@ -258,20 +255,22 @@ impl StandardBroadcastRun {
|
|||||||
shreds: Arc<Vec<Shred>>,
|
shreds: Arc<Vec<Shred>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let seed_start = Instant::now();
|
let seed_start = Instant::now();
|
||||||
let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
|
|
||||||
let seed_elapsed = seed_start.elapsed();
|
let seed_elapsed = seed_start.elapsed();
|
||||||
|
|
||||||
// Broadcast the shreds
|
// Broadcast the shreds
|
||||||
let broadcast_start = Instant::now();
|
let broadcast_start = Instant::now();
|
||||||
let shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
|
trace!("Broadcasting {:?} shreds", shreds.len());
|
||||||
trace!("Broadcasting {:?} shreds", shred_bufs.len());
|
|
||||||
|
|
||||||
cluster_info.read().unwrap().broadcast_shreds(
|
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
|
||||||
|
|
||||||
|
let mut send_mmsg_total = 0;
|
||||||
|
broadcast_shreds(
|
||||||
sock,
|
sock,
|
||||||
shred_bufs,
|
&shreds,
|
||||||
&seeds,
|
&peers_and_stakes,
|
||||||
stakes,
|
&peers,
|
||||||
&mut self.last_datapoint_submit,
|
&mut self.last_datapoint_submit,
|
||||||
|
&mut send_mmsg_total,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let broadcast_elapsed = broadcast_start.elapsed();
|
let broadcast_elapsed = broadcast_start.elapsed();
|
||||||
@ -279,6 +278,7 @@ impl StandardBroadcastRun {
|
|||||||
self.update_broadcast_stats(BroadcastStats {
|
self.update_broadcast_stats(BroadcastStats {
|
||||||
broadcast_elapsed: duration_as_us(&broadcast_elapsed),
|
broadcast_elapsed: duration_as_us(&broadcast_elapsed),
|
||||||
seed_elapsed: duration_as_us(&seed_elapsed),
|
seed_elapsed: duration_as_us(&seed_elapsed),
|
||||||
|
send_mmsg_elapsed: send_mmsg_total,
|
||||||
..BroadcastStats::default()
|
..BroadcastStats::default()
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -291,6 +291,7 @@ impl StandardBroadcastRun {
|
|||||||
wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed;
|
wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed;
|
||||||
wstats.broadcast_elapsed += stats.broadcast_elapsed;
|
wstats.broadcast_elapsed += stats.broadcast_elapsed;
|
||||||
wstats.seed_elapsed += stats.seed_elapsed;
|
wstats.seed_elapsed += stats.seed_elapsed;
|
||||||
|
wstats.send_mmsg_elapsed += stats.send_mmsg_elapsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn report_and_reset_stats(&mut self) {
|
fn report_and_reset_stats(&mut self) {
|
||||||
@ -303,6 +304,7 @@ impl StandardBroadcastRun {
|
|||||||
("insertion_time", stats.insert_shreds_elapsed as i64, i64),
|
("insertion_time", stats.insert_shreds_elapsed as i64, i64),
|
||||||
("broadcast_time", stats.broadcast_elapsed as i64, i64),
|
("broadcast_time", stats.broadcast_elapsed as i64, i64),
|
||||||
("receive_time", stats.receive_elapsed as i64, i64),
|
("receive_time", stats.receive_elapsed as i64, i64),
|
||||||
|
("send_mmsg", stats.send_mmsg_elapsed as i64, i64),
|
||||||
("seed", stats.seed_elapsed as i64, i64),
|
("seed", stats.seed_elapsed as i64, i64),
|
||||||
(
|
(
|
||||||
"num_shreds",
|
"num_shreds",
|
||||||
|
@ -22,7 +22,7 @@ use crate::{
|
|||||||
},
|
},
|
||||||
epoch_slots::EpochSlots,
|
epoch_slots::EpochSlots,
|
||||||
result::{Error, Result},
|
result::{Error, Result},
|
||||||
weighted_shuffle::{weighted_best, weighted_shuffle},
|
weighted_shuffle::weighted_shuffle,
|
||||||
};
|
};
|
||||||
use bincode::{serialize, serialized_size};
|
use bincode::{serialize, serialized_size};
|
||||||
use core::cmp;
|
use core::cmp;
|
||||||
@ -43,7 +43,6 @@ use solana_perf::packet::{
|
|||||||
};
|
};
|
||||||
use solana_rayon_threadlimit::get_thread_count;
|
use solana_rayon_threadlimit::get_thread_count;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::timing::duration_as_s;
|
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
|
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
@ -51,7 +50,7 @@ use solana_sdk::{
|
|||||||
timing::{duration_as_ms, timestamp},
|
timing::{duration_as_ms, timestamp},
|
||||||
transaction::Transaction,
|
transaction::Transaction,
|
||||||
};
|
};
|
||||||
use solana_streamer::sendmmsg::{multicast, send_mmsg};
|
use solana_streamer::sendmmsg::multicast;
|
||||||
use solana_streamer::streamer::{PacketReceiver, PacketSender};
|
use solana_streamer::streamer::{PacketReceiver, PacketSender};
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Cow,
|
borrow::Cow,
|
||||||
@ -934,77 +933,6 @@ impl ClusterInfo {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sorted_tvu_peers_and_stakes(
|
|
||||||
&self,
|
|
||||||
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
|
|
||||||
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
|
|
||||||
let mut peers = self.tvu_peers();
|
|
||||||
peers.dedup();
|
|
||||||
let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
|
|
||||||
(peers, stakes_and_index)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// broadcast messages from the leader to layer 1 nodes
|
|
||||||
/// # Remarks
|
|
||||||
pub fn broadcast_shreds(
|
|
||||||
&self,
|
|
||||||
s: &UdpSocket,
|
|
||||||
shreds: Vec<Vec<u8>>,
|
|
||||||
seeds: &[[u8; 32]],
|
|
||||||
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
|
|
||||||
last_datapoint_submit: &mut Instant,
|
|
||||||
) -> Result<()> {
|
|
||||||
let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes);
|
|
||||||
let broadcast_len = peers_and_stakes.len();
|
|
||||||
if broadcast_len == 0 {
|
|
||||||
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
|
|
||||||
datapoint_info!(
|
|
||||||
"cluster_info-num_nodes",
|
|
||||||
("live_count", 1, i64),
|
|
||||||
("broadcast_count", 1, i64)
|
|
||||||
);
|
|
||||||
*last_datapoint_submit = Instant::now();
|
|
||||||
}
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
let mut packets: Vec<_> = shreds
|
|
||||||
.into_iter()
|
|
||||||
.zip(seeds)
|
|
||||||
.map(|(shred, seed)| {
|
|
||||||
let broadcast_index = weighted_best(&peers_and_stakes, *seed);
|
|
||||||
|
|
||||||
(shred, &peers[broadcast_index].tvu)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut sent = 0;
|
|
||||||
while sent < packets.len() {
|
|
||||||
match send_mmsg(s, &mut packets[sent..]) {
|
|
||||||
Ok(n) => sent += n,
|
|
||||||
Err(e) => {
|
|
||||||
return Err(Error::IO(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut num_live_peers = 1i64;
|
|
||||||
peers.iter().for_each(|p| {
|
|
||||||
// A peer is considered live if they generated their contact info recently
|
|
||||||
if timestamp() - p.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
|
|
||||||
num_live_peers += 1;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
|
|
||||||
datapoint_info!(
|
|
||||||
"cluster_info-num_nodes",
|
|
||||||
("live_count", num_live_peers, i64),
|
|
||||||
("broadcast_count", broadcast_len + 1, i64)
|
|
||||||
);
|
|
||||||
*last_datapoint_submit = Instant::now();
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// retransmit messages to a list of nodes
|
/// retransmit messages to a list of nodes
|
||||||
/// # Remarks
|
/// # Remarks
|
||||||
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
||||||
@ -1942,6 +1870,14 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn stake_weight_peers<S: std::hash::BuildHasher>(
|
||||||
|
peers: &mut Vec<ContactInfo>,
|
||||||
|
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
|
||||||
|
) -> Vec<(u64, usize)> {
|
||||||
|
peers.dedup();
|
||||||
|
ClusterInfo::sorted_stakes_with_index(peers, stakes)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@ -2480,7 +2416,8 @@ mod tests {
|
|||||||
stakes.insert(id4, 10);
|
stakes.insert(id4, 10);
|
||||||
|
|
||||||
let stakes = Arc::new(stakes);
|
let stakes = Arc::new(stakes);
|
||||||
let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(stakes));
|
let mut peers = cluster_info.tvu_peers();
|
||||||
|
let peers_and_stakes = stake_weight_peers(&mut peers, Some(stakes));
|
||||||
assert_eq!(peers.len(), 2);
|
assert_eq!(peers.len(), 2);
|
||||||
assert_eq!(peers[0].id, id);
|
assert_eq!(peers[0].id, id);
|
||||||
assert_eq!(peers[1].id, id2);
|
assert_eq!(peers[1].id, id2);
|
||||||
|
@ -4,7 +4,7 @@ use std::io;
|
|||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
|
|
||||||
#[cfg(not(target_os = "linux"))]
|
#[cfg(not(target_os = "linux"))]
|
||||||
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
|
pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
|
||||||
let count = packets.len();
|
let count = packets.len();
|
||||||
for (p, a) in packets {
|
for (p, a) in packets {
|
||||||
sock.send_to(p, *a)?;
|
sock.send_to(p, *a)?;
|
||||||
@ -18,7 +18,7 @@ use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6};
|
|||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
fn mmsghdr_for_packet(
|
fn mmsghdr_for_packet(
|
||||||
packet: &mut [u8],
|
packet: &[u8],
|
||||||
dest: &SocketAddr,
|
dest: &SocketAddr,
|
||||||
index: usize,
|
index: usize,
|
||||||
addr_in_len: u32,
|
addr_in_len: u32,
|
||||||
@ -32,7 +32,7 @@ fn mmsghdr_for_packet(
|
|||||||
use std::mem;
|
use std::mem;
|
||||||
|
|
||||||
iovs.push(iovec {
|
iovs.push(iovec {
|
||||||
iov_base: packet.as_mut_ptr() as *mut c_void,
|
iov_base: packet.as_ptr() as *mut c_void,
|
||||||
iov_len: packet.len(),
|
iov_len: packet.len(),
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -57,7 +57,7 @@ fn mmsghdr_for_packet(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
|
pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
|
||||||
use libc::{sendmmsg, socklen_t};
|
use libc::{sendmmsg, socklen_t};
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::os::unix::io::AsRawFd;
|
use std::os::unix::io::AsRawFd;
|
||||||
@ -73,7 +73,7 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io
|
|||||||
let sock_fd = sock.as_raw_fd();
|
let sock_fd = sock.as_raw_fd();
|
||||||
|
|
||||||
let mut hdrs: Vec<mmsghdr> = packets
|
let mut hdrs: Vec<mmsghdr> = packets
|
||||||
.iter_mut()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, (packet, dest))| {
|
.map(|(i, (packet, dest))| {
|
||||||
mmsghdr_for_packet(
|
mmsghdr_for_packet(
|
||||||
@ -160,11 +160,10 @@ mod tests {
|
|||||||
let addr = reader.local_addr().unwrap();
|
let addr = reader.local_addr().unwrap();
|
||||||
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
|
||||||
let mut packets: Vec<_> = (0..32)
|
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
|
||||||
.map(|_| (vec![0u8; PACKET_DATA_SIZE], &addr))
|
let packet_refs: Vec<_> = packets.iter().map(|p| (p, &addr)).collect();
|
||||||
.collect();
|
|
||||||
|
|
||||||
let sent = send_mmsg(&sender, &mut packets).ok();
|
let sent = send_mmsg(&sender, &packet_refs).ok();
|
||||||
assert_eq!(sent, Some(32));
|
assert_eq!(sent, Some(32));
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); 32];
|
let mut packets = vec![Packet::default(); 32];
|
||||||
@ -182,17 +181,14 @@ mod tests {
|
|||||||
|
|
||||||
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
|
||||||
let mut packets: Vec<_> = (0..32)
|
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
|
||||||
.map(|i| {
|
let packet_refs: Vec<_> = packets
|
||||||
if i < 16 {
|
.iter()
|
||||||
(vec![0u8; PACKET_DATA_SIZE], &addr)
|
.enumerate()
|
||||||
} else {
|
.map(|(i, p)| if i < 16 { (p, &addr) } else { (p, &addr2) })
|
||||||
(vec![0u8; PACKET_DATA_SIZE], &addr2)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let sent = send_mmsg(&sender, &mut packets).ok();
|
let sent = send_mmsg(&sender, &packet_refs).ok();
|
||||||
assert_eq!(sent, Some(32));
|
assert_eq!(sent, Some(32));
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); 32];
|
let mut packets = vec![Packet::default(); 32];
|
||||||
|
Reference in New Issue
Block a user