Add ability to change the validator identity at runtime
This commit is contained in:
@@ -213,7 +213,7 @@ pub struct ClusterInfo {
|
||||
/// The network
|
||||
pub gossip: RwLock<CrdsGossip>,
|
||||
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
|
||||
pub keypair: Arc<Keypair>,
|
||||
keypair: RwLock<Arc<Keypair>>,
|
||||
/// Network entrypoints
|
||||
entrypoints: RwLock<Vec<ContactInfo>>,
|
||||
outbound_budget: DataBudget,
|
||||
@@ -224,7 +224,7 @@ pub struct ClusterInfo {
|
||||
local_message_pending_push_queue: Mutex<Vec<CrdsValue>>,
|
||||
contact_debug_interval: u64, // milliseconds, 0 = disabled
|
||||
contact_save_interval: u64, // milliseconds, 0 = disabled
|
||||
instance: NodeInstance,
|
||||
instance: RwLock<NodeInstance>,
|
||||
contact_info_path: PathBuf,
|
||||
}
|
||||
|
||||
@@ -469,7 +469,7 @@ impl ClusterInfo {
|
||||
let id = contact_info.id;
|
||||
let me = Self {
|
||||
gossip: RwLock::new(CrdsGossip::default()),
|
||||
keypair,
|
||||
keypair: RwLock::new(keypair),
|
||||
entrypoints: RwLock::new(vec![]),
|
||||
outbound_budget: DataBudget::default(),
|
||||
my_contact_info: RwLock::new(contact_info),
|
||||
@@ -481,7 +481,7 @@ impl ClusterInfo {
|
||||
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||
local_message_pending_push_queue: Mutex::default(),
|
||||
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
|
||||
instance: NodeInstance::new(&mut thread_rng(), id, timestamp()),
|
||||
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
|
||||
contact_info_path: PathBuf::default(),
|
||||
contact_save_interval: 0, // disabled
|
||||
};
|
||||
@@ -503,7 +503,7 @@ impl ClusterInfo {
|
||||
my_contact_info.id = *new_id;
|
||||
ClusterInfo {
|
||||
gossip: RwLock::new(gossip),
|
||||
keypair: self.keypair.clone(),
|
||||
keypair: RwLock::new(self.keypair.read().unwrap().clone()),
|
||||
entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()),
|
||||
outbound_budget: self.outbound_budget.clone_non_atomic(),
|
||||
my_contact_info: RwLock::new(my_contact_info),
|
||||
@@ -517,7 +517,7 @@ impl ClusterInfo {
|
||||
.clone(),
|
||||
),
|
||||
contact_debug_interval: self.contact_debug_interval,
|
||||
instance: NodeInstance::new(&mut thread_rng(), *new_id, timestamp()),
|
||||
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), *new_id, timestamp())),
|
||||
contact_info_path: PathBuf::default(),
|
||||
contact_save_interval: 0, // disabled
|
||||
}
|
||||
@@ -536,10 +536,10 @@ impl ClusterInfo {
|
||||
self.my_contact_info.write().unwrap().wallclock = now;
|
||||
let entries: Vec<_> = vec![
|
||||
CrdsData::ContactInfo(self.my_contact_info()),
|
||||
CrdsData::NodeInstance(self.instance.with_wallclock(now)),
|
||||
CrdsData::NodeInstance(self.instance.read().unwrap().with_wallclock(now)),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|v| CrdsValue::new_signed(v, &self.keypair))
|
||||
.map(|v| CrdsValue::new_signed(v, &self.keypair()))
|
||||
.collect();
|
||||
self.local_message_pending_push_queue
|
||||
.lock()
|
||||
@@ -553,7 +553,7 @@ impl ClusterInfo {
|
||||
|
||||
// TODO kill insert_info, only used by tests
|
||||
pub fn insert_info(&self, contact_info: ContactInfo) {
|
||||
let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair);
|
||||
let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair());
|
||||
let _ = self.gossip.write().unwrap().crds.insert(value, timestamp());
|
||||
}
|
||||
|
||||
@@ -680,6 +680,25 @@ impl ClusterInfo {
|
||||
self.my_contact_info.read().unwrap().id
|
||||
}
|
||||
|
||||
pub fn keypair(&self) -> RwLockReadGuard<Arc<Keypair>> {
|
||||
self.keypair.read().unwrap()
|
||||
}
|
||||
|
||||
pub fn set_keypair(&self, new_keypair: Arc<Keypair>) {
|
||||
let id = new_keypair.pubkey();
|
||||
|
||||
self.gossip.write().unwrap().set_self(&id);
|
||||
{
|
||||
let mut instance = self.instance.write().unwrap();
|
||||
*instance = instance.with_id(id);
|
||||
}
|
||||
*self.keypair.write().unwrap() = new_keypair;
|
||||
self.my_contact_info.write().unwrap().id = id;
|
||||
|
||||
self.insert_self();
|
||||
self.push_self(&HashMap::new(), None);
|
||||
}
|
||||
|
||||
pub fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
|
||||
where
|
||||
F: FnOnce(&ContactInfo) -> Y,
|
||||
@@ -887,7 +906,7 @@ impl ClusterInfo {
|
||||
if min > last {
|
||||
let entry = CrdsValue::new_signed(
|
||||
CrdsData::LowestSlot(0, LowestSlot::new(id, min, now)),
|
||||
&self.keypair,
|
||||
&self.keypair(),
|
||||
);
|
||||
self.local_message_pending_push_queue
|
||||
.lock()
|
||||
@@ -948,7 +967,7 @@ impl ClusterInfo {
|
||||
update = &update[n..];
|
||||
if n > 0 {
|
||||
let epoch_slots = CrdsData::EpochSlots(ix, slots);
|
||||
let entry = CrdsValue::new_signed(epoch_slots, &self.keypair);
|
||||
let entry = CrdsValue::new_signed(epoch_slots, &self.keypair());
|
||||
entries.push(entry);
|
||||
}
|
||||
epoch_slot_index += 1;
|
||||
@@ -991,7 +1010,7 @@ impl ClusterInfo {
|
||||
}
|
||||
|
||||
let message = CrdsData::AccountsHashes(SnapshotHash::new(self.id(), accounts_hashes));
|
||||
self.push_message(CrdsValue::new_signed(message, &self.keypair));
|
||||
self.push_message(CrdsValue::new_signed(message, &self.keypair()));
|
||||
}
|
||||
|
||||
pub fn push_snapshot_hashes(&self, snapshot_hashes: Vec<(Slot, Hash)>) {
|
||||
@@ -1004,7 +1023,7 @@ impl ClusterInfo {
|
||||
}
|
||||
|
||||
let message = CrdsData::SnapshotHashes(SnapshotHash::new(self.id(), snapshot_hashes));
|
||||
self.push_message(CrdsValue::new_signed(message, &self.keypair));
|
||||
self.push_message(CrdsValue::new_signed(message, &self.keypair()));
|
||||
}
|
||||
|
||||
fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) {
|
||||
@@ -1013,7 +1032,7 @@ impl ClusterInfo {
|
||||
let now = timestamp();
|
||||
let vote = Vote::new(self_pubkey, vote, now);
|
||||
let vote = CrdsData::Vote(vote_index, vote);
|
||||
let vote = CrdsValue::new_signed(vote, &self.keypair);
|
||||
let vote = CrdsValue::new_signed(vote, &self.keypair());
|
||||
self.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
@@ -1136,7 +1155,7 @@ impl ClusterInfo {
|
||||
other_payload: &[u8],
|
||||
) -> Result<(), GossipError> {
|
||||
self.gossip.write().unwrap().push_duplicate_shred(
|
||||
&self.keypair,
|
||||
&self.keypair(),
|
||||
shred,
|
||||
other_payload,
|
||||
None::<fn(Slot) -> Option<Pubkey>>, // Leader schedule
|
||||
@@ -1431,8 +1450,10 @@ impl ClusterInfo {
|
||||
}
|
||||
|
||||
fn insert_self(&self) {
|
||||
let value =
|
||||
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
|
||||
let value = CrdsValue::new_signed(
|
||||
CrdsData::ContactInfo(self.my_contact_info()),
|
||||
&self.keypair(),
|
||||
);
|
||||
let _ = self.gossip.write().unwrap().crds.insert(value, timestamp());
|
||||
}
|
||||
|
||||
@@ -1545,7 +1566,7 @@ impl ClusterInfo {
|
||||
let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
|
||||
match gossip.new_pull_request(
|
||||
thread_pool,
|
||||
self.keypair.deref(),
|
||||
self.keypair().deref(),
|
||||
now,
|
||||
gossip_validators,
|
||||
stakes,
|
||||
@@ -1568,7 +1589,7 @@ impl ClusterInfo {
|
||||
}
|
||||
}
|
||||
let self_info = CrdsData::ContactInfo(self.my_contact_info());
|
||||
let self_info = CrdsValue::new_signed(self_info, &self.keypair);
|
||||
let self_info = CrdsValue::new_signed(self_info, &self.keypair());
|
||||
let pulls = pulls
|
||||
.into_iter()
|
||||
.flat_map(|(peer, filters)| repeat(peer.gossip).zip(filters))
|
||||
@@ -1804,10 +1825,12 @@ impl ClusterInfo {
|
||||
let recycler = PacketsRecycler::default();
|
||||
let crds_data = vec![
|
||||
CrdsData::Version(Version::new(self.id())),
|
||||
CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
|
||||
CrdsData::NodeInstance(
|
||||
self.instance.read().unwrap().with_wallclock(timestamp()),
|
||||
),
|
||||
];
|
||||
for value in crds_data {
|
||||
let value = CrdsValue::new_signed(value, &self.keypair);
|
||||
let value = CrdsValue::new_signed(value, &self.keypair());
|
||||
self.push_message(value);
|
||||
}
|
||||
let mut generate_pull_requests = true;
|
||||
@@ -1997,7 +2020,7 @@ impl ClusterInfo {
|
||||
R: Rng + CryptoRng,
|
||||
{
|
||||
let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new();
|
||||
let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok();
|
||||
let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair()).ok();
|
||||
let mut ping_cache = self.ping_cache.lock().unwrap();
|
||||
let mut hard_check = move |node| {
|
||||
let (check, ping) = ping_cache.check(now, node, &mut pingf);
|
||||
@@ -2266,7 +2289,7 @@ impl ClusterInfo {
|
||||
let packets: Vec<_> = pings
|
||||
.into_iter()
|
||||
.filter_map(|(addr, ping)| {
|
||||
let pong = Pong::new(&ping, &self.keypair).ok()?;
|
||||
let pong = Pong::new(&ping, &self.keypair()).ok()?;
|
||||
let pong = Protocol::PongMessage(pong);
|
||||
match Packet::from_data(Some(&addr), pong) {
|
||||
Ok(packet) => Some(packet),
|
||||
@@ -2368,7 +2391,7 @@ impl ClusterInfo {
|
||||
destination: from,
|
||||
wallclock,
|
||||
};
|
||||
prune_data.sign(&self.keypair);
|
||||
prune_data.sign(&self.keypair());
|
||||
let prune_message = Protocol::PruneMessage(self_pubkey, prune_data);
|
||||
Some((peer.gossip, prune_message))
|
||||
})
|
||||
@@ -2469,8 +2492,9 @@ impl ClusterInfo {
|
||||
// this node with more recent timestamp.
|
||||
let check_duplicate_instance = |values: &[CrdsValue]| {
|
||||
if should_check_duplicate_instance {
|
||||
let instance = self.instance.read().unwrap();
|
||||
for value in values {
|
||||
if self.instance.check_duplicate(value) {
|
||||
if instance.check_duplicate(value) {
|
||||
return Err(GossipError::DuplicateNodeInstance);
|
||||
}
|
||||
}
|
||||
@@ -3567,7 +3591,7 @@ mod tests {
|
||||
.unwrap()
|
||||
.new_pull_request(
|
||||
&thread_pool,
|
||||
cluster_info.keypair.deref(),
|
||||
cluster_info.keypair().deref(),
|
||||
timestamp(),
|
||||
None,
|
||||
&HashMap::new(),
|
||||
|
@@ -395,24 +395,26 @@ pub struct NodeInstance {
|
||||
}
|
||||
|
||||
impl NodeInstance {
|
||||
pub fn new<R>(rng: &mut R, pubkey: Pubkey, now: u64) -> Self
|
||||
pub fn new<R>(rng: &mut R, from: Pubkey, now: u64) -> Self
|
||||
where
|
||||
R: Rng + CryptoRng,
|
||||
{
|
||||
Self {
|
||||
from: pubkey,
|
||||
from,
|
||||
wallclock: now,
|
||||
timestamp: now,
|
||||
token: rng.gen(),
|
||||
}
|
||||
}
|
||||
|
||||
// Clones the value with an updated id.
|
||||
pub(crate) fn with_id(&self, from: Pubkey) -> Self {
|
||||
Self { from, ..*self }
|
||||
}
|
||||
|
||||
// Clones the value with an updated wallclock.
|
||||
pub(crate) fn with_wallclock(&self, now: u64) -> Self {
|
||||
Self {
|
||||
wallclock: now,
|
||||
..*self
|
||||
}
|
||||
pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self {
|
||||
Self { wallclock, ..*self }
|
||||
}
|
||||
|
||||
// Returns true if the crds-value is a duplicate instance
|
||||
|
@@ -121,7 +121,7 @@ pub fn discover_cluster(
|
||||
}
|
||||
|
||||
pub fn discover(
|
||||
keypair: Option<Arc<Keypair>>,
|
||||
keypair: Option<Keypair>,
|
||||
entrypoint: Option<&SocketAddr>,
|
||||
num_nodes: Option<usize>, // num_nodes only counts validators, excludes spy nodes
|
||||
timeout: Duration,
|
||||
@@ -133,8 +133,10 @@ pub fn discover(
|
||||
Vec<ContactInfo>, // all gossip peers
|
||||
Vec<ContactInfo>, // tvu peers (validators)
|
||||
)> {
|
||||
let keypair = keypair.unwrap_or_else(|| Arc::new(Keypair::new()));
|
||||
|
||||
let keypair = {
|
||||
#[allow(clippy::redundant_closure)]
|
||||
keypair.unwrap_or_else(|| Keypair::new())
|
||||
};
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (gossip_service, ip_echo, spy_ref) = make_gossip_node(
|
||||
keypair,
|
||||
@@ -295,7 +297,7 @@ fn spy(
|
||||
/// Makes a spy or gossip node based on whether or not a gossip_addr was passed in
|
||||
/// Pass in a gossip addr to fully participate in gossip instead of relying on just pulls
|
||||
fn make_gossip_node(
|
||||
keypair: Arc<Keypair>,
|
||||
keypair: Keypair,
|
||||
entrypoint: Option<&SocketAddr>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
gossip_addr: Option<&SocketAddr>,
|
||||
@@ -307,7 +309,7 @@ fn make_gossip_node(
|
||||
} else {
|
||||
ClusterInfo::spy_node(keypair.pubkey(), shred_version)
|
||||
};
|
||||
let cluster_info = ClusterInfo::new(node, keypair);
|
||||
let cluster_info = ClusterInfo::new(node, Arc::new(keypair));
|
||||
if let Some(entrypoint) = entrypoint {
|
||||
cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint));
|
||||
}
|
||||
|
@@ -15,7 +15,6 @@ use {
|
||||
error,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
process::exit,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
},
|
||||
};
|
||||
@@ -225,7 +224,7 @@ fn process_spy(matches: &ArgMatches) -> std::io::Result<()> {
|
||||
.value_of("node_pubkey")
|
||||
.map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
|
||||
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
|
||||
let identity_keypair = keypair_of(matches, "identity").map(Arc::new);
|
||||
let identity_keypair = keypair_of(matches, "identity");
|
||||
|
||||
let entrypoint_addr = parse_entrypoint(matches);
|
||||
|
||||
|
Reference in New Issue
Block a user