Add separate push queue to reduce push lock contention (#12713)

This commit is contained in:
sakridge
2020-10-13 18:10:25 -07:00
committed by GitHub
parent b8f03c9b0f
commit 1f1eb9f26e
6 changed files with 68 additions and 18 deletions

View File

@@ -257,6 +257,7 @@ pub struct ClusterInfo {
id: Pubkey,
stats: GossipStats,
socket: UdpSocket,
local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
}
impl Default for ClusterInfo {
@@ -414,6 +415,7 @@ impl ClusterInfo {
id,
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
local_message_pending_push_queue: RwLock::new(vec![]),
};
{
let mut gossip = me.gossip.write().unwrap();
@@ -440,6 +442,12 @@ impl ClusterInfo {
id: *new_id,
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
local_message_pending_push_queue: RwLock::new(
self.local_message_pending_push_queue
.read()
.unwrap()
.clone(),
),
}
}
@@ -462,9 +470,14 @@ impl ClusterInfo {
self.my_contact_info.write().unwrap().wallclock = now;
let entry =
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
let mut w_gossip = self.gossip.write().unwrap();
w_gossip.refresh_push_active_set(stakes, gossip_validators);
w_gossip.process_push_message(&self.id(), vec![entry], now);
self.gossip
.write()
.unwrap()
.refresh_push_active_set(stakes, gossip_validators);
self.local_message_pending_push_queue
.write()
.unwrap()
.push((entry, now));
}
// TODO kill insert_info, only used by tests
@@ -633,10 +646,10 @@ impl ClusterInfo {
CrdsData::LowestSlot(0, LowestSlot::new(id, min, now)),
&self.keypair,
);
self.gossip
self.local_message_pending_push_queue
.write()
.unwrap()
.process_push_message(&self.id(), vec![entry], now);
.push((entry, now));
}
}
@@ -689,8 +702,10 @@ impl ClusterInfo {
let n = slots.fill(&update[num..], now);
if n > 0 {
let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair);
self.time_gossip_write_lock("epcoh_slots_push", &self.stats.epoch_slots_push)
.process_push_message(&self.id(), vec![entry], now);
self.local_message_pending_push_queue
.write()
.unwrap()
.push((entry, now));
}
num += n;
if num < update.len() {
@@ -718,9 +733,10 @@ impl ClusterInfo {
pub fn push_message(&self, message: CrdsValue) {
let now = message.wallclock();
let id = message.pubkey();
self.time_gossip_write_lock("process_push_message", &self.stats.push_message)
.process_push_message(&id, vec![message], now);
self.local_message_pending_push_queue
.write()
.unwrap()
.push((message, now));
}
pub fn push_accounts_hashes(&self, accounts_hashes: Vec<(Slot, Hash)>) {
@@ -761,8 +777,10 @@ impl ClusterInfo {
CrdsValue::compute_vote_index(tower_index, current_votes)
};
let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair);
self.time_gossip_write_lock("push_vote_process_push", &self.stats.vote_process_push)
.process_push_message(&self.id(), vec![entry], now);
self.local_message_pending_push_queue
.write()
.unwrap()
.push((entry, now));
}
pub fn send_vote(&self, vote: &Transaction) -> Result<()> {
@@ -1425,11 +1443,21 @@ impl ClusterInfo {
})
.collect()
}
fn drain_push_queue(&self) -> Vec<(CrdsValue, u64)> {
let mut push_queue = self.local_message_pending_push_queue.write().unwrap();
std::mem::take(&mut *push_queue)
}
#[cfg(test)]
pub fn flush_push_queue(&self) {
let pending_push_messages = self.drain_push_queue();
let mut gossip = self.gossip.write().unwrap();
gossip.process_push_messages(pending_push_messages);
}
fn new_push_requests(&self) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id();
let (_, push_messages) = self
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
.new_push_messages(timestamp());
.new_push_messages(self.drain_push_queue(), timestamp());
let messages: Vec<_> = push_messages
.into_iter()
.filter_map(|(peer, messages)| {
@@ -2909,7 +2937,7 @@ mod tests {
.gossip
.write()
.unwrap()
.new_push_messages(timestamp());
.new_push_messages(cluster_info.drain_push_queue(), timestamp());
// there should be some pushes ready
assert_eq!(push_messages.is_empty(), false);
push_messages
@@ -3092,6 +3120,7 @@ mod tests {
let tx = test_tx();
let index = 1;
cluster_info.push_vote(index, tx.clone());
cluster_info.flush_push_queue();
// -1 to make sure that the clock is strictly lower then when insert occurred
let (labels, votes, max_ts) = cluster_info.get_votes(now - 1);
@@ -3121,6 +3150,7 @@ mod tests {
assert!(slots.is_empty());
assert!(since.is_none());
cluster_info.push_epoch_slots(&[0]);
cluster_info.flush_push_queue();
let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX));
assert!(slots.is_empty());
@@ -3449,7 +3479,9 @@ mod tests {
range.push(last + rand::thread_rng().gen_range(1, 32));
}
cluster_info.push_epoch_slots(&range[..16000]);
cluster_info.flush_push_queue();
cluster_info.push_epoch_slots(&range[16000..]);
cluster_info.flush_push_queue();
let (slots, since) = cluster_info.get_epoch_slots_since(None);
let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect();
assert_eq!(slots, range);