uses current timestamp when flushing local pending push queue (#16808)

local_message_pending_push_queue records a timestamp at the time each
value is created, and uses that timestamp when the pending values are flushed:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L321
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds_gossip.rs#L96-L102

which is then used as the insert_timestamp when inserting the values
into the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds_gossip_push.rs#L183

The flush may happen 100ms after the values are created (or even later
if there is lock contention). This causes non-monotone
insert_timestamps in the crds table (time goes backward across
insertions), hindering the use of insert_timestamps in other computations.
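
To make the failure mode concrete, below is a minimal, self-contained
sketch of the interleaving; the types are stand-ins (plain u64
millisecond timestamps, a sleep in place of the flush delay), not the
real crds machinery:

use std::thread::sleep;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Same shape as solana_sdk's timestamp(): wallclock in milliseconds.
fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

fn main() {
    // A value is created and queued; the old code captured the timestamp here.
    let queued_at = timestamp();

    // Before the queue is flushed, another value lands in the crds table
    // directly (e.g. received over gossip) with a fresh timestamp.
    sleep(Duration::from_millis(100));
    let direct_insert_timestamp = timestamp();

    // The flush then inserts the queued value with its creation timestamp,
    // which is now ~100ms older than the last insert: time goes backward.
    let flushed_insert_timestamp = queued_at;
    assert!(flushed_insert_timestamp < direct_insert_timestamp);
}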

For example, both ClusterInfo::get_votes and get_epoch_slots_since rely
on insert_timestamps increasing monotonically as values are inserted
into the table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
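
Both of these follow a cursor pattern: scan for entries with
insert_timestamp greater than a caller-supplied cursor, and return the
largest timestamp seen as the next cursor. The sketch below (Entry and
get_since are illustrative stand-ins, not the real types or methods)
shows why a backward-moving insert_timestamp silently drops values:

struct Entry {
    insert_timestamp: u64,
    data: String,
}

// Returns the entries newer than `since` plus an updated cursor. If a
// later insert carries an *older* insert_timestamp, it falls at or below
// the returned cursor and is skipped by the next call.
fn get_since(table: &[Entry], since: u64) -> (Vec<&str>, u64) {
    let mut cursor = since;
    let mut out = Vec::new();
    for entry in table.iter().filter(|e| e.insert_timestamp > since) {
        cursor = cursor.max(entry.insert_timestamp);
        out.push(entry.data.as_str());
    }
    (out, cursor)
}

fn main() {
    let table = vec![
        Entry { insert_timestamp: 10, data: "a".into() },
        Entry { insert_timestamp: 30, data: "b".into() },
    ];
    let (newer, cursor) = get_since(&table, 0);
    assert_eq!(newer, ["a", "b"]);
    assert_eq!(cursor, 30);
    // A flushed value stamped 20 (time going backward) would be below
    // cursor = 30 and missed by get_since(&table, cursor).
}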

This commit removes the timestamps from local_message_pending_push_queue
and instead uses the current timestamp when flushing the queue.
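
In outline, the flush path now looks like the sketch below.
ClusterInfoSketch and crds_table are illustrative stand-ins (the real
change is in the diff that follows), but the queue type and the
drain-then-stamp logic mirror it:

use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

type CrdsValue = String; // stand-in for the real crds_value::CrdsValue

fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

struct ClusterInfoSketch {
    // The queue now stores bare values, with no per-value timestamp.
    local_message_pending_push_queue: Mutex<Vec<CrdsValue>>,
    crds_table: Mutex<Vec<(CrdsValue, u64)>>, // (value, insert_timestamp)
}

impl ClusterInfoSketch {
    fn push_message(&self, message: CrdsValue) {
        self.local_message_pending_push_queue.lock().unwrap().push(message);
    }

    fn drain_push_queue(&self) -> Vec<CrdsValue> {
        let mut push_queue = self.local_message_pending_push_queue.lock().unwrap();
        std::mem::take(&mut *push_queue)
    }

    fn flush_push_queue(&self) {
        // One fresh timestamp, taken at flush time, stamps the whole batch.
        let now = timestamp();
        let drained = self.drain_push_queue();
        self.crds_table
            .lock()
            .unwrap()
            .extend(drained.into_iter().map(|v| (v, now)));
    }
}

fn main() {
    let node = ClusterInfoSketch {
        local_message_pending_push_queue: Mutex::default(),
        crds_table: Mutex::default(),
    };
    node.push_message("vote".to_string());
    node.flush_push_queue();
    assert_eq!(node.crds_table.lock().unwrap().len(), 1);
}

Since every queued value is stamped at flush time, insert_timestamps can
no longer lag behind values inserted between creation and flush.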
Author:       behzad nouri
Date:         2021-04-28 00:15:11 +00:00
Committed by: GitHub
Parent:       cac666d035
Commit:       b468ead1b1

2 changed files with 33 additions and 35 deletions

core/src/cluster_info.rs

@@ -71,7 +71,7 @@ use std::{
     ops::{Deref, DerefMut},
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
-    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
+    sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
     thread::{sleep, Builder, JoinHandle},
     time::{Duration, Instant},
 };
@@ -318,7 +318,7 @@ pub struct ClusterInfo {
     id: Pubkey,
     stats: GossipStats,
     socket: UdpSocket,
-    local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
+    local_message_pending_push_queue: Mutex<Vec<CrdsValue>>,
     contact_debug_interval: u64, // milliseconds, 0 = disabled
     contact_save_interval: u64,  // milliseconds, 0 = disabled
     instance: NodeInstance,
@@ -588,7 +588,7 @@ impl ClusterInfo {
             id,
             stats: GossipStats::default(),
             socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
-            local_message_pending_push_queue: RwLock::new(vec![]),
+            local_message_pending_push_queue: Mutex::default(),
             contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
             instance: NodeInstance::new(&mut thread_rng(), id, timestamp()),
             contact_info_path: PathBuf::default(),
@@ -620,9 +620,9 @@ impl ClusterInfo {
             id: *new_id,
             stats: GossipStats::default(),
             socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
-            local_message_pending_push_queue: RwLock::new(
+            local_message_pending_push_queue: Mutex::new(
                 self.local_message_pending_push_queue
-                    .read()
+                    .lock()
                     .unwrap()
                     .clone(),
             ),
@@ -651,13 +651,10 @@ impl ClusterInfo {
             .into_iter()
             .map(|v| CrdsValue::new_signed(v, &self.keypair))
             .collect();
-        {
-            let mut local_message_pending_push_queue =
-                self.local_message_pending_push_queue.write().unwrap();
-            for entry in entries {
-                local_message_pending_push_queue.push((entry, now));
-            }
-        }
+        self.local_message_pending_push_queue
+            .lock()
+            .unwrap()
+            .extend(entries);
         self.gossip
             .write()
             .unwrap()
@@ -1008,9 +1005,9 @@ impl ClusterInfo {
                 &self.keypair,
             );
             self.local_message_pending_push_queue
-                .write()
+                .lock()
                 .unwrap()
-                .push((entry, now));
+                .push(entry);
         }
     }
@@ -1064,9 +1061,9 @@ impl ClusterInfo {
             if n > 0 {
                 let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair);
                 self.local_message_pending_push_queue
-                    .write()
+                    .lock()
                     .unwrap()
-                    .push((entry, now));
+                    .push(entry);
             }
             num += n;
             if num < update.len() {
@@ -1092,12 +1089,11 @@ impl ClusterInfo {
         GossipWriteLock::new(self.gossip.write().unwrap(), label, counter)
     }

-    pub fn push_message(&self, message: CrdsValue) {
-        let now = message.wallclock();
+    pub(crate) fn push_message(&self, message: CrdsValue) {
         self.local_message_pending_push_queue
-            .write()
+            .lock()
             .unwrap()
-            .push((message, now));
+            .push(message);
     }

     pub fn push_accounts_hashes(&self, accounts_hashes: Vec<(Slot, Hash)>) {
@@ -1695,15 +1691,16 @@ impl ClusterInfo {
             })
             .collect()
     }
-    fn drain_push_queue(&self) -> Vec<(CrdsValue, u64)> {
-        let mut push_queue = self.local_message_pending_push_queue.write().unwrap();
+
+    fn drain_push_queue(&self) -> Vec<CrdsValue> {
+        let mut push_queue = self.local_message_pending_push_queue.lock().unwrap();
         std::mem::take(&mut *push_queue)
     }
     #[cfg(test)]
     pub fn flush_push_queue(&self) {
         let pending_push_messages = self.drain_push_queue();
         let mut gossip = self.gossip.write().unwrap();
-        gossip.process_push_messages(pending_push_messages);
+        gossip.process_push_messages(pending_push_messages, timestamp());
     }
     fn new_push_requests(
         &self,

core/src/crds_gossip.rs

@@ -62,15 +62,12 @@ impl CrdsGossip {
         values
             .into_iter()
             .filter_map(|val| {
-                let res = self
+                let old = self
                     .push
-                    .process_push_message(&mut self.crds, from, val, now);
-                if let Ok(Some(val)) = res {
-                    self.pull.record_old_hash(val.value_hash, now);
-                    Some(val)
-                } else {
-                    None
-                }
+                    .process_push_message(&mut self.crds, from, val, now)
+                    .ok()?;
+                self.pull.record_old_hash(old.as_ref()?.value_hash, now);
+                old
             })
             .collect()
     }
@@ -93,8 +90,12 @@ impl CrdsGossip {
         prune_map
     }

-    pub fn process_push_messages(&mut self, pending_push_messages: Vec<(CrdsValue, u64)>) {
-        for (push_message, timestamp) in pending_push_messages {
+    pub(crate) fn process_push_messages(
+        &mut self,
+        pending_push_messages: Vec<CrdsValue>,
+        timestamp: u64,
+    ) {
+        for push_message in pending_push_messages {
             let _ =
                 self.push
                     .process_push_message(&mut self.crds, &self.id, push_message, timestamp);
@@ -103,10 +104,10 @@ impl CrdsGossip {

     pub fn new_push_messages(
         &mut self,
-        pending_push_messages: Vec<(CrdsValue, u64)>,
+        pending_push_messages: Vec<CrdsValue>,
         now: u64,
     ) -> HashMap<Pubkey, Vec<CrdsValue>> {
-        self.process_push_messages(pending_push_messages);
+        self.process_push_messages(pending_push_messages, now);
         self.push.new_push_messages(&self.crds, now)
     }