separates out ClusterInfo::{gossip,listen} thread-pools (#12535)
https://github.com/solana-labs/solana/pull/12402 moved the gossip-work threads:
https://github.com/solana-labs/solana/blob/afd9bfc45/core/src/cluster_info.rs#L2330-L2334
into ClusterInfo::new as a new field on the ClusterInfo struct:
https://github.com/solana-labs/solana/blob/35208c5ee/core/src/cluster_info.rs#L249
so that the thread-pool could be shared between the listen and gossip threads:
https://github.com/solana-labs/solana/blob/afd9bfc45/core/src/gossip_service.rs#L54-L67

However, while testing https://github.com/solana-labs/solana/pull/12360 it turned out that this causes breakage:
https://buildkite.com/solana-labs/solana/builds/31646
https://buildkite.com/solana-labs/solana/builds/31651
https://buildkite.com/solana-labs/solana/builds/31655
whereas with separate thread-pools everything passes. It might be that one thread slows down the other by exhausting the shared thread-pool, whereas with separate thread-pools we get fair scheduling guarantees from the OS.

This commit reverts https://github.com/solana-labs/solana/pull/12402 and instead adds separate thread-pools for the listen and gossip threads:
https://github.com/solana-labs/solana/blob/afd9bfc45/core/src/gossip_service.rs#L54-L67
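As a rough illustration of the fix, here is a minimal, self-contained sketch (not the actual solana code: it uses num_cpus where the real code uses get_thread_count(), and the closure bodies are placeholders) of giving each service thread its own rayon pool so that one workload cannot exhaust the other's workers:

    // Hypothetical sketch: one dedicated rayon pool per service thread.
    use rayon::{ThreadPool, ThreadPoolBuilder};

    fn new_worker_pool(name: &'static str) -> ThreadPool {
        ThreadPoolBuilder::new()
            // Cap the pool size, mirroring get_thread_count().min(8) in the patch.
            .num_threads(num_cpus::get().min(8))
            .thread_name(move |i| format!("{}-{}", name, i))
            .build()
            .unwrap()
    }

    fn main() {
        // Two separate pools: the OS schedules their threads independently,
        // so heavy listen traffic can no longer starve gossip work (or vice versa).
        let gossip_pool = new_worker_pool("sol-gossip");
        let listen_pool = new_worker_pool("sol-listen");
        gossip_pool.install(|| { /* generate push/pull requests in parallel */ });
        listen_pool.install(|| { /* process inbound packets in parallel */ });
    }

With a single shared pool, a burst of work submitted by one thread occupies all workers and the other thread's tasks queue behind it; with two pools, each side keeps its own workers and the kernel arbitrates between them.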
core/src/cluster_info.rs

@@ -246,7 +246,6 @@ struct GossipStats {
 }
 
 pub struct ClusterInfo {
-    thread_pool: ThreadPool,
     /// The network
     pub gossip: RwLock<CrdsGossip>,
     /// set the keypair that will be used to sign crds values generated. It is unset only in tests.
@@ -404,11 +403,6 @@ impl ClusterInfo {
     pub fn new(contact_info: ContactInfo, keypair: Arc<Keypair>) -> Self {
         let id = contact_info.id;
         let me = Self {
-            thread_pool: ThreadPoolBuilder::new()
-                .num_threads(get_thread_count().min(8))
-                .thread_name(|i| format!("sol-gossip-work-{}", i))
-                .build()
-                .unwrap(),
             gossip: RwLock::new(CrdsGossip::default()),
             keypair,
             entrypoint: RwLock::new(None),
@@ -438,11 +432,6 @@ impl ClusterInfo {
         let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
         my_contact_info.id = *new_id;
         ClusterInfo {
-            thread_pool: ThreadPoolBuilder::new()
-                .num_threads(get_thread_count().min(2))
-                .thread_name(|i| format!("sol-gossip-work-{}", i))
-                .build()
-                .unwrap(),
             gossip: RwLock::new(gossip),
             keypair: self.keypair.clone(),
             entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
@@ -1395,6 +1384,7 @@ impl ClusterInfo {
 
     fn new_pull_requests(
         &self,
+        _thread_pool: &ThreadPool,
         gossip_validators: Option<&HashSet<Pubkey>>,
         stakes: &HashMap<Pubkey, u64>,
     ) -> Vec<(SocketAddr, Protocol)> {
@@ -1464,12 +1454,13 @@ impl ClusterInfo {
     // Generate new push and pull requests
     fn generate_new_gossip_requests(
         &self,
+        thread_pool: &ThreadPool,
         gossip_validators: Option<&HashSet<Pubkey>>,
         stakes: &HashMap<Pubkey, u64>,
         generate_pull_requests: bool,
     ) -> Vec<(SocketAddr, Protocol)> {
         let mut pulls: Vec<_> = if generate_pull_requests {
-            self.new_pull_requests(gossip_validators, stakes)
+            self.new_pull_requests(&thread_pool, gossip_validators, stakes)
         } else {
             vec![]
         };
@@ -1482,14 +1473,19 @@ impl ClusterInfo {
     /// At random pick a node and try to get updated changes from them
     fn run_gossip(
         &self,
+        thread_pool: &ThreadPool,
         gossip_validators: Option<&HashSet<Pubkey>>,
         recycler: &PacketsRecycler,
         stakes: &HashMap<Pubkey, u64>,
         sender: &PacketSender,
         generate_pull_requests: bool,
     ) -> Result<()> {
-        let reqs =
-            self.generate_new_gossip_requests(gossip_validators, &stakes, generate_pull_requests);
+        let reqs = self.generate_new_gossip_requests(
+            thread_pool,
+            gossip_validators,
+            &stakes,
+            generate_pull_requests,
+        );
         if !reqs.is_empty() {
             let packets = to_packets_with_destination(recycler.clone(), &reqs);
             sender.send(packets)?;
@@ -1562,6 +1558,11 @@ impl ClusterInfo {
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(std::cmp::min(get_thread_count(), 8))
+            .thread_name(|i| format!("ClusterInfo::gossip-{}", i))
+            .build()
+            .unwrap();
         Builder::new()
             .name("solana-gossip".to_string())
             .spawn(move || {
@@ -1590,6 +1591,7 @@ impl ClusterInfo {
                 };
 
                 let _ = self.run_gossip(
+                    &thread_pool,
                     gossip_validators.as_ref(),
                     &recycler,
                     &stakes,
@@ -2093,13 +2095,14 @@ impl ClusterInfo {
     fn process_packets(
         &self,
         requests: Vec<Packets>,
+        thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         response_sender: &PacketSender,
         stakes: HashMap<Pubkey, u64>,
         epoch_time_ms: u64,
     ) {
         let sender = response_sender.clone();
-        self.thread_pool.install(|| {
+        thread_pool.install(|| {
             requests.into_par_iter().for_each_with(sender, |s, reqs| {
                 self.handle_packets(&recycler, &stakes, reqs, s, epoch_time_ms)
             });
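The thread_pool.install(..) call above is what binds the rayon parallel iterator to a specific pool: inside install, into_par_iter() executes on the pool install was invoked on rather than on rayon's global pool. A tiny standalone demonstration of that semantics (toy data, not the actual Packets types):

    use rayon::prelude::*;
    use rayon::ThreadPoolBuilder;

    fn main() {
        let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
        let batches: Vec<Vec<u32>> = vec![vec![1, 2], vec![3, 4], vec![5]];
        // Work spawned by the closure runs only on `pool`'s two threads,
        // just as process_packets() now runs solely on the listen pool.
        let total: u32 = pool.install(|| {
            batches
                .into_par_iter()
                .map(|batch| batch.iter().sum::<u32>())
                .sum()
        });
        assert_eq!(total, 15);
    }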
@@ -2113,6 +2116,7 @@ impl ClusterInfo {
         bank_forks: Option<&Arc<RwLock<BankForks>>>,
         requests_receiver: &PacketReceiver,
         response_sender: &PacketSender,
+        thread_pool: &ThreadPool,
         last_print: &mut Instant,
     ) -> Result<()> {
         let timeout = Duration::new(1, 0);
@@ -2135,7 +2139,14 @@ impl ClusterInfo {
 
         let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
 
-        self.process_packets(requests, recycler, response_sender, stakes, epoch_time_ms);
+        self.process_packets(
+            requests,
+            thread_pool,
+            recycler,
+            response_sender,
+            stakes,
+            epoch_time_ms,
+        );
 
         self.print_reset_stats(last_print);
 
@@ -2344,6 +2355,11 @@ impl ClusterInfo {
         Builder::new()
             .name("solana-listen".to_string())
             .spawn(move || {
+                let thread_pool = ThreadPoolBuilder::new()
+                    .num_threads(std::cmp::min(get_thread_count(), 8))
+                    .thread_name(|i| format!("sol-gossip-work-{}", i))
+                    .build()
+                    .unwrap();
                 let mut last_print = Instant::now();
                 loop {
                     let e = self.run_listen(
@@ -2351,6 +2367,7 @@ impl ClusterInfo {
                         bank_forks.as_ref(),
                         &requests_receiver,
                         &response_sender,
+                        &thread_pool,
                         &mut last_print,
                     );
                     if exit.load(Ordering::Relaxed) {
@@ -2725,6 +2742,7 @@ mod tests {
 
     #[test]
     fn test_cluster_spy_gossip() {
+        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
         //check that gossip doesn't try to push to invalid addresses
         let node = Node::new_localhost();
         let (spy, _, _) = ClusterInfo::spy_node(&Pubkey::new_rand(), 0);
@@ -2735,7 +2753,8 @@ mod tests {
             .write()
             .unwrap()
             .refresh_push_active_set(&HashMap::new(), None);
-        let reqs = cluster_info.generate_new_gossip_requests(None, &HashMap::new(), true);
+        let reqs =
+            cluster_info.generate_new_gossip_requests(&thread_pool, None, &HashMap::new(), true);
         //assert none of the addrs are invalid.
         reqs.iter().all(|(addr, _)| {
             let res = ContactInfo::is_valid_address(addr);
@@ -3101,6 +3120,7 @@ mod tests {
 
     #[test]
     fn test_append_entrypoint_to_pulls() {
+        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
         let node_keypair = Arc::new(Keypair::new());
         let cluster_info = ClusterInfo::new(
             ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
@@ -3109,7 +3129,7 @@ mod tests {
         let entrypoint_pubkey = Pubkey::new_rand();
         let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
         cluster_info.set_entrypoint(entrypoint.clone());
-        let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
         assert_eq!(1, pulls.len() as u64);
         match pulls.get(0) {
             Some((addr, msg)) => {
@@ -3136,7 +3156,7 @@ mod tests {
             vec![entrypoint_crdsvalue],
             &timeouts,
         );
-        let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
         assert_eq!(1, pulls.len() as u64);
         assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint));
     }
@@ -3259,6 +3279,7 @@ mod tests {
 
     #[test]
     fn test_pull_from_entrypoint_if_not_present() {
+        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
         let node_keypair = Arc::new(Keypair::new());
         let cluster_info = ClusterInfo::new(
             ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
@@ -3279,7 +3300,7 @@ mod tests {
 
         // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
         // fresh timestamp). There should only be one pull request to `other_node`
-        let pulls = cluster_info.new_pull_requests(None, &stakes);
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert_eq!(1, pulls.len() as u64);
         assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
 
@@ -3292,14 +3313,14 @@ mod tests {
             .as_mut()
             .unwrap()
             .wallclock = 0;
-        let pulls = cluster_info.new_pull_requests(None, &stakes);
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert_eq!(2, pulls.len() as u64);
         assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
         assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
 
         // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
         // only be one pull request to `other_node`
-        let pulls = cluster_info.new_pull_requests(None, &stakes);
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert_eq!(1, pulls.len() as u64);
         assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
     }