ClusterInfo cleanup (#10504) (#10657)

automerge

Co-authored-by: sakridge <sakridge@gmail.com>
commit b6f484ddee (parent 3c39fee5a8)
Author:    mergify[bot]
Committer: GitHub
Date:      2020-06-17 15:28:41 +00:00

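The diff below is a mechanical receiver cleanup: ClusterInfo's gossip entry points stop taking an explicit obj: &Self / me: &Self argument and become ordinary methods with &self, self: &Arc<Self>, or self: Arc<Self> receivers. A minimal, self-contained sketch of that pattern, using hypothetical names (Node, handle_purge), not Solana's API:

use std::sync::Arc;

struct Node;

impl Node {
    // Before: an associated function that threads the instance through
    // explicitly; callers write Node::handle_purge_old(&node).
    fn handle_purge_old(obj: &Arc<Self>) {
        let _ = Arc::strong_count(obj);
    }

    // After: a method with an explicit Arc receiver; callers write
    // node.handle_purge(). `self: &Arc<Self>` is a valid receiver because
    // its deref chain (&Arc<Self> -> Arc<Self> -> Self) only passes
    // through standard pointer types.
    fn handle_purge(self: &Arc<Self>) {
        let _ = Arc::strong_count(self);
    }
}

fn main() {
    let node = Arc::new(Node);
    Node::handle_purge_old(&node); // old call style
    node.handle_purge(); // new call style
}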

@@ -1423,13 +1423,13 @@ impl ClusterInfo {
/// At random pick a node and try to get updated changes from them
fn run_gossip(
-obj: &Self,
+&self,
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
sender: &PacketSender,
generate_pull_requests: bool,
) -> Result<()> {
-let reqs = obj.generate_new_gossip_requests(&stakes, generate_pull_requests);
+let reqs = self.generate_new_gossip_requests(&stakes, generate_pull_requests);
if !reqs.is_empty() {
let packets = to_packets_with_destination(recycler.clone(), &reqs);
sender.send(packets)?;
@@ -1437,14 +1437,14 @@ impl ClusterInfo {
Ok(())
}
-fn handle_adopt_shred_version(obj: &Arc<Self>, adopt_shred_version: &mut bool) {
+fn handle_adopt_shred_version(self: &Arc<Self>, adopt_shred_version: &mut bool) {
// Adopt the entrypoint's `shred_version` if ours is unset
if *adopt_shred_version {
// If gossip was given an entrypoint, lookup its id
-let entrypoint_id = obj.entrypoint.read().unwrap().as_ref().map(|e| e.id);
+let entrypoint_id = self.entrypoint.read().unwrap().as_ref().map(|e| e.id);
if let Some(entrypoint_id) = entrypoint_id {
// If a pull from the entrypoint was successful, it should exist in the crds table
-let entrypoint = obj.lookup_contact_info(&entrypoint_id, |ci| ci.clone());
+let entrypoint = self.lookup_contact_info(&entrypoint_id, |ci| ci.clone());
if let Some(entrypoint) = entrypoint {
if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version");
@@ -1453,13 +1453,13 @@ impl ClusterInfo {
"Setting shred version to {:?} from entrypoint {:?}",
entrypoint.shred_version, entrypoint.id
);
-obj.my_contact_info.write().unwrap().shred_version =
+self.my_contact_info.write().unwrap().shred_version =
entrypoint.shred_version;
-obj.gossip
+self.gossip
.write()
.unwrap()
.set_shred_version(entrypoint.shred_version);
-obj.insert_self();
+self.insert_self();
*adopt_shred_version = false;
}
}
@@ -1468,7 +1468,7 @@ impl ClusterInfo {
}
fn handle_purge(
-obj: &Arc<Self>,
+self: &Arc<Self>,
bank_forks: &Option<Arc<RwLock<BankForks>>>,
stakes: &HashMap<Pubkey, u64>,
) {
@@ -1483,12 +1483,12 @@ impl ClusterInfo {
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
}
};
-let timeouts = obj.gossip.read().unwrap().make_timeouts(stakes, timeout);
-let num_purged = obj
-.time_gossip_write_lock("purge", &obj.stats.purge)
+let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout);
+let num_purged = self
+.time_gossip_write_lock("purge", &self.stats.purge)
.purge(timestamp(), &timeouts);
inc_new_counter_info!("cluster_info-purge-count", num_purged);
-let table_size = obj.gossip.read().unwrap().crds.table.len();
+let table_size = self.gossip.read().unwrap().crds.table.len();
datapoint_debug!(
"cluster_info-purge",
("table_size", table_size as i64, i64),
@@ -1498,7 +1498,7 @@ impl ClusterInfo {
/// randomly pick a node and ask them for updates asynchronously
pub fn gossip(
-obj: Arc<Self>,
+self: Arc<Self>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
sender: PacketSender,
exit: &Arc<AtomicBool>,
@@ -1509,18 +1509,18 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
-let mut adopt_shred_version = obj.my_shred_version() == 0;
+let mut adopt_shred_version = self.my_shred_version() == 0;
let recycler = PacketsRecycler::default();
-let message = CrdsData::Version(Version::new(obj.id()));
-obj.push_message(CrdsValue::new_signed(message, &obj.keypair));
+let message = CrdsData::Version(Version::new(self.id()));
+self.push_message(CrdsValue::new_signed(message, &self.keypair));
let mut generate_pull_requests = true;
loop {
let start = timestamp();
thread_mem_usage::datapoint("solana-gossip");
if start - last_contact_info_trace > 10000 {
// Log contact info every 10 seconds
-info!("\n{}", obj.contact_info_trace());
+info!("\n{}", self.contact_info_trace());
last_contact_info_trace = start;
}
@@ -1531,20 +1531,19 @@ impl ClusterInfo {
None => HashMap::new(),
};
-let _ =
-Self::run_gossip(&obj, &recycler, &stakes, &sender, generate_pull_requests);
+let _ = self.run_gossip(&recycler, &stakes, &sender, generate_pull_requests);
if exit.load(Ordering::Relaxed) {
return;
}
-Self::handle_purge(&obj, &bank_forks, &stakes);
+self.handle_purge(&bank_forks, &stakes);
-Self::handle_adopt_shred_version(&obj, &mut adopt_shred_version);
+self.handle_adopt_shred_version(&mut adopt_shred_version);
//TODO: possibly tune this parameter
-//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
+//we saw a deadlock passing an self.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
-obj.push_self(&stakes);
+self.push_self(&stakes);
last_push = timestamp();
}
let elapsed = timestamp() - start;
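Since gossip now takes self: Arc<Self>, the method can move its own handle into the thread it spawns, and call sites change from ClusterInfo::gossip(obj, ...) to obj.gossip(...). A runnable sketch of that receiver-into-thread pattern, with illustrative names rather than the real signature:

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, JoinHandle},
};

struct Node;

impl Node {
    // The Arc receiver is moved into the worker thread, which keeps the
    // instance alive for as long as the thread runs.
    fn gossip(self: Arc<Self>, exit: Arc<AtomicBool>) -> JoinHandle<()> {
        thread::Builder::new()
            .name("sketch-gossip".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    // one gossip round would run here, via `self`
                    thread::yield_now();
                }
            })
            .unwrap()
    }
}

fn main() {
    let node = Arc::new(Node);
    let exit = Arc::new(AtomicBool::new(false));
    let handle = node.clone().gossip(exit.clone());
    exit.store(true, Ordering::Relaxed);
    handle.join().unwrap();
}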
@@ -1560,17 +1559,21 @@ impl ClusterInfo {
#[allow(clippy::cognitive_complexity)]
fn handle_packets(
-me: &Self,
+&self,
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
packets: Packets,
response_sender: &PacketSender,
-epoch_ms: u64,
+epoch_time_ms: u64,
) {
// iter over the packets, collect pulls separately and process everything else
let allocated = thread_mem_usage::Allocatedp::default();
let mut gossip_pull_data: Vec<PullData> = vec![];
-let timeouts = me.gossip.read().unwrap().make_timeouts(&stakes, epoch_ms);
+let timeouts = self
+.gossip
+.read()
+.unwrap()
+.make_timeouts(&stakes, epoch_time_ms);
let mut pull_responses = HashMap::new();
packets.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
@@ -1586,12 +1589,12 @@ impl ClusterInfo {
1
);
} else if let Some(contact_info) = caller.contact_info() {
-if contact_info.id == me.id() {
+if contact_info.id == self.id() {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
} else if contact_info.shred_version == 0
-|| contact_info.shred_version == me.my_shred_version()
-|| me.my_shred_version() == 0
+|| contact_info.shred_version == self.my_shred_version()
+|| self.my_shred_version() == 0
{
gossip_pull_data.push(PullData {
from_addr,
@@ -1599,7 +1602,7 @@ impl ClusterInfo {
filter,
});
} else {
-me.stats.skip_pull_shred_version.add_relaxed(1);
+self.stats.skip_pull_shred_version.add_relaxed(1);
}
}
datapoint_debug!(
@@ -1638,7 +1641,7 @@ impl ClusterInfo {
}
ret
});
-let rsp = Self::handle_push_message(me, recycler, &from, data, stakes);
+let rsp = Self::handle_push_message(self, recycler, &from, data, stakes);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
@@ -1655,8 +1658,8 @@ impl ClusterInfo {
"cluster_info-prune_message-size",
data.prunes.len()
);
-match me
-.time_gossip_write_lock("process_prune", &me.stats.process_prune)
+match self
+.time_gossip_write_lock("process_prune", &self.stats.process_prune)
.process_prune_msg(
&from,
&data.destination,
@@ -1684,11 +1687,11 @@ impl ClusterInfo {
});
for (from, data) in pull_responses {
-Self::handle_pull_response(me, &from, data, &timeouts);
+self.handle_pull_response(&from, data, &timeouts);
}
// process the collected pulls together
-let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data, stakes);
+let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
@@ -1717,7 +1720,7 @@ impl ClusterInfo {
// Pull requests take an incoming bloom filter of contained entries from a node
// and tries to send back to them the values it detects are missing.
fn handle_pull_requests(
-me: &Self,
+&self,
recycler: &PacketsRecycler,
requests: Vec<PullData>,
stakes: &HashMap<Pubkey, u64>,
@@ -1726,19 +1729,22 @@ impl ClusterInfo {
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests");
-me.update_data_budget(stakes);
+self.update_data_budget(stakes);
for pull_data in requests {
caller_and_filters.push((pull_data.caller, pull_data.filter));
addrs.push(pull_data.from_addr);
}
let now = timestamp();
-let self_id = me.id();
+let self_id = self.id();
-let pull_responses = me
-.time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses)
+let pull_responses = self
+.time_gossip_read_lock(
+"generate_pull_responses",
+&self.stats.generate_pull_responses,
+)
.generate_pull_responses(&caller_and_filters);
-me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
+self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
.process_pull_requests(caller_and_filters, now);
// Filter bad to addresses
@@ -1804,7 +1810,7 @@ impl ClusterInfo {
let protocol = Protocol::PullResponse(self_id, vec![response]);
let new_packet = Packet::from_data(&from_addr, protocol);
{
-let mut w_outbound_budget = me.outbound_budget.write().unwrap();
+let mut w_outbound_budget = self.outbound_budget.write().unwrap();
if w_outbound_budget.bytes > new_packet.meta.size {
sent.insert(index);
w_outbound_budget.bytes -= new_packet.meta.size;
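The outbound_budget guard above is a plain byte budget behind an RwLock: a response is only queued while the budget still exceeds the packet size, which is then deducted. A toy standalone version of the same check (DataBudget, Node, and try_consume are made-up names):

use std::sync::RwLock;

struct DataBudget {
    bytes: usize,
}

struct Node {
    outbound_budget: RwLock<DataBudget>,
}

impl Node {
    // Mirrors the `w_outbound_budget.bytes > new_packet.meta.size` check:
    // consume budget only if the packet still fits.
    fn try_consume(&self, packet_size: usize) -> bool {
        let mut budget = self.outbound_budget.write().unwrap();
        if budget.bytes > packet_size {
            budget.bytes -= packet_size;
            true
        } else {
            false
        }
    }
}

fn main() {
    let node = Node {
        outbound_budget: RwLock::new(DataBudget { bytes: 1200 }),
    };
    assert!(node.try_consume(1000)); // 1200 > 1000, deduct to 200
    assert!(!node.try_consume(1000)); // 200 left, packet no longer fits
}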
@@ -1837,31 +1843,31 @@ impl ClusterInfo {
// Returns (failed, timeout, success)
fn handle_pull_response(
-me: &Self,
+&self,
from: &Pubkey,
mut crds_values: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
) -> (usize, usize, usize) {
let len = crds_values.len();
-trace!("PullResponse me: {} from: {} len={}", me.id, from, len);
+trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
-if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
+if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
-me.my_shred_version(),
+self.my_shred_version(),
);
}
let filtered_len = crds_values.len();
let mut pull_stats = ProcessPullStats::default();
-let (filtered_pulls, filtered_pulls_expired_timeout) = me
-.time_gossip_read_lock("filter_pull_resp", &me.stats.filter_pull_response)
+let (filtered_pulls, filtered_pulls_expired_timeout) = self
+.time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response)
.filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats);
if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() {
-me.time_gossip_write_lock("process_pull_resp", &me.stats.process_pull_response)
+self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response)
.process_pull_responses(
from,
filtered_pulls,
@@ -1871,23 +1877,23 @@ impl ClusterInfo {
);
}
-me.stats
+self.stats
.skip_pull_response_shred_version
.add_relaxed((len - filtered_len) as u64);
-me.stats.process_pull_response_count.add_relaxed(1);
-me.stats
+self.stats.process_pull_response_count.add_relaxed(1);
+self.stats
.process_pull_response_len
.add_relaxed(filtered_len as u64);
-me.stats
+self.stats
.process_pull_response_timeout
.add_relaxed(pull_stats.timeout_count as u64);
-me.stats
+self.stats
.process_pull_response_fail_insert
.add_relaxed(pull_stats.failed_insert as u64);
-me.stats
+self.stats
.process_pull_response_fail_timeout
.add_relaxed(pull_stats.failed_timeout as u64);
-me.stats
+self.stats
.process_pull_response_success
.add_relaxed(pull_stats.success as u64);
@@ -1915,46 +1921,46 @@ impl ClusterInfo {
}
fn handle_push_message(
-me: &Self,
+&self,
recycler: &PacketsRecycler,
from: &Pubkey,
mut crds_values: Vec<CrdsValue>,
stakes: &HashMap<Pubkey, u64>,
) -> Option<Packets> {
-let self_id = me.id();
-me.stats.push_message_count.add_relaxed(1);
+let self_id = self.id();
+self.stats.push_message_count.add_relaxed(1);
let len = crds_values.len();
-if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
+if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
-me.my_shred_version(),
+self.my_shred_version(),
);
}
let filtered_len = crds_values.len();
-me.stats
+self.stats
.push_message_value_count
.add_relaxed(filtered_len as u64);
-me.stats
+self.stats
.skip_push_message_shred_version
.add_relaxed((len - filtered_len) as u64);
-let updated: Vec<_> = me
-.time_gossip_write_lock("process_push", &me.stats.process_push_message)
+let updated: Vec<_> = self
+.time_gossip_write_lock("process_push", &self.stats.process_push_message)
.process_push_message(from, crds_values, timestamp());
let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
-let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = me
-.time_gossip_write_lock("prune_received_cache", &me.stats.prune_received_cache)
+let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = self
+.time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
.prune_received_cache(updated_labels, stakes);
let rsp: Vec<_> = prunes_map
.into_iter()
.filter_map(|(from, prune_set)| {
inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len());
-me.lookup_contact_info(&from, |ci| ci.clone())
+self.lookup_contact_info(&from, |ci| ci.clone())
.and_then(|ci| {
let mut prune_msg = PruneData {
pubkey: self_id,
@@ -1963,7 +1969,7 @@ impl ClusterInfo {
destination: from,
wallclock: timestamp(),
};
-prune_msg.sign(&me.keypair);
+prune_msg.sign(&self.keypair);
let rsp = Protocol::PruneMessage(self_id, prune_msg);
Some((ci.gossip, rsp))
})
@@ -1973,11 +1979,11 @@ impl ClusterInfo {
return None;
}
let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
-me.stats
+self.stats
.push_response_count
.add_relaxed(packets.packets.len() as u64);
if !packets.is_empty() {
-let pushes: Vec<_> = me.new_push_requests();
+let pushes: Vec<_> = self.new_push_requests();
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
pushes.into_iter().for_each(|(remote_gossip_addr, req)| {
if !remote_gossip_addr.ip().is_unspecified() && remote_gossip_addr.port() != 0 {
@@ -1993,9 +1999,48 @@ impl ClusterInfo {
}
}
+fn get_stakes_and_epoch_time(
+bank_forks: Option<&Arc<RwLock<BankForks>>>,
+) -> (HashMap<Pubkey, u64>, u64) {
+let epoch_time_ms;
+let stakes: HashMap<_, _> = match bank_forks {
+Some(ref bank_forks) => {
+let bank = bank_forks.read().unwrap().working_bank();
+let epoch = bank.epoch();
+let epoch_schedule = bank.epoch_schedule();
+epoch_time_ms = epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT;
+staking_utils::staked_nodes(&bank)
+}
+None => {
+inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
+epoch_time_ms = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
+HashMap::new()
+}
+};
+(stakes, epoch_time_ms)
+}
+fn process_packets(
+&self,
+requests: Vec<Packets>,
+thread_pool: &ThreadPool,
+recycler: &PacketsRecycler,
+response_sender: &PacketSender,
+stakes: HashMap<Pubkey, u64>,
+epoch_time_ms: u64,
+) {
+let sender = response_sender.clone();
+thread_pool.install(|| {
+requests.into_par_iter().for_each_with(sender, |s, reqs| {
+self.handle_packets(&recycler, &stakes, reqs, s, epoch_time_ms)
+});
+});
+}
/// Process messages from the network
fn run_listen(
-obj: &Self,
+&self,
recycler: &PacketsRecycler,
bank_forks: Option<&Arc<RwLock<BankForks>>>,
requests_receiver: &PacketReceiver,
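The new process_packets helper above fans the batched requests out over a rayon thread pool; for_each_with hands each parallel task its own clone of the response sender. A self-contained sketch of that dispatch pattern, assuming the rayon crate and toy request/response types:

use std::sync::mpsc::{channel, Sender};

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    let thread_pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let (response_sender, response_receiver) = channel::<String>();
    let requests: Vec<u64> = (0..8).collect();

    // for_each_with clones the sender for each parallel task, so every
    // worker owns its own mpsc Sender.
    thread_pool.install(|| {
        requests
            .into_par_iter()
            .for_each_with(response_sender, |s: &mut Sender<String>, req| {
                let _ = s.send(format!("handled request {}", req));
            });
    });

    // install() blocks until the parallel loop finishes; by then every
    // sender clone has been dropped, so the drain below terminates.
    for response in response_receiver.iter() {
        println!("{}", response);
    }
}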
@@ -2003,7 +2048,6 @@ impl ClusterInfo {
thread_pool: &ThreadPool,
last_print: &mut Instant,
) -> Result<()> {
-//TODO cache connections
let timeout = Duration::new(1, 0);
let mut requests = vec![requests_receiver.recv_timeout(timeout)?];
let mut num_requests = requests.last().unwrap().packets.len();
@@ -2014,35 +2058,26 @@ impl ClusterInfo {
num_requests += more_reqs.packets.len();
requests.push(more_reqs)
}
if num_requests >= MAX_GOSSIP_TRAFFIC {
warn!(
"Too much gossip traffic, ignoring some messages (requests={}, max requests={})",
num_requests, MAX_GOSSIP_TRAFFIC
);
}
-let epoch_ms;
-let stakes: HashMap<_, _> = match bank_forks {
-Some(ref bank_forks) => {
-let bank = bank_forks.read().unwrap().working_bank();
-let epoch = bank.epoch();
-let epoch_schedule = bank.epoch_schedule();
-epoch_ms = epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT;
-staking_utils::staked_nodes(&bank)
-}
-None => {
-inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
-epoch_ms = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
-HashMap::new()
-}
-};
-let sender = response_sender.clone();
-thread_pool.install(|| {
-requests.into_par_iter().for_each_with(sender, |s, reqs| {
-Self::handle_packets(obj, &recycler, &stakes, reqs, s, epoch_ms)
-});
-});
-Self::print_reset_stats(obj, last_print);
+let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
+self.process_packets(
+requests,
+thread_pool,
+recycler,
+response_sender,
+stakes,
+epoch_time_ms,
+);
+self.print_reset_stats(last_print);
Ok(())
}
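run_listen's receive loop (the unchanged context above) coalesces a batch: it blocks up to one second for the first Packets, then drains whatever else is already queued with try_recv. The same pattern in isolation, using plain Vec<u8> payloads instead of Packets:

use std::{
    sync::mpsc::{channel, Receiver, RecvTimeoutError},
    time::Duration,
};

// Block up to one second for the first item, then drain without blocking.
fn recv_batch(receiver: &Receiver<Vec<u8>>) -> Result<Vec<Vec<u8>>, RecvTimeoutError> {
    let timeout = Duration::new(1, 0);
    let mut batch = vec![receiver.recv_timeout(timeout)?];
    while let Ok(more) = receiver.try_recv() {
        batch.push(more);
    }
    Ok(batch)
}

fn main() {
    let (sender, receiver) = channel();
    sender.send(vec![1u8]).unwrap();
    sender.send(vec![2u8]).unwrap();
    let batch = recv_batch(&receiver).unwrap();
    assert_eq!(batch.len(), 2);
}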
@@ -2214,7 +2249,7 @@ impl ClusterInfo {
}
pub fn listen(
-me: Arc<Self>,
+self: Arc<Self>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
requests_receiver: PacketReceiver,
response_sender: PacketSender,
@@ -2232,8 +2267,7 @@ impl ClusterInfo {
.unwrap();
let mut last_print = Instant::now();
loop {
-let e = Self::run_listen(
-&me,
+let e = self.run_listen(
&recycler,
bank_forks.as_ref(),
&requests_receiver,
@@ -2245,10 +2279,10 @@ impl ClusterInfo {
return;
}
if e.is_err() {
-let r_gossip = me.gossip.read().unwrap();
+let r_gossip = self.gossip.read().unwrap();
debug!(
"{}: run_listen timeout, table size: {}",
-me.id(),
+self.id(),
r_gossip.crds.table.len()
);
}