_id => _pubkey variable renaming (#4419)
* wallet: rename *_account_id to *_account_pubkey
* s/from_id/from_pubkey/g
* s/node_id/node_pubkey/g
* s/stake_id/stake_pubkey/g
* s/voter_id/voter_pubkey/g
* s/vote_id/vote_pubkey/g
* s/delegate_id/delegate_pubkey/g
* s/account_id/account_pubkey/g
* s/to_id/to_pubkey/g
* s/my_id/my_pubkey/g
* cargo fmt
* s/staker_id/staker_pubkey/g
* s/mining_pool_id/mining_pool_pubkey/g
* s/leader_id/leader_pubkey/g
* cargo fmt
* s/funding_id/funding_pubkey/g
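The renames above are recorded as sed-style substitution patterns. A minimal sketch of how such a repo-wide rename might be applied, assuming a GNU sed/xargs toolchain (the commit message lists only the patterns and the cargo fmt passes, not the exact commands, so the helper below is hypothetical):

# Hypothetical helper: apply one whole-word rename across all Rust sources.
subst() {
    git grep -l "$1" -- '*.rs' | xargs --no-run-if-empty sed -i "s/\b$1\b/$2/g"
}

subst my_id my_pubkey
subst leader_id leader_pubkey
subst node_id node_pubkey
# ...repeat for the remaining *_id => *_pubkey pairs listed above...

cargo fmt   # reformat after each batch of renames, as noted in the commit message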
@@ -144,7 +144,7 @@ impl BankingStage {
}

pub fn consume_buffered_packets(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &[PacketsAndOffsets],
) -> Result<UnprocessedPackets> {
@@ -194,7 +194,7 @@ impl BankingStage {
&bank,
&msgs,
&unprocessed_indexes,
-my_id,
+my_pubkey,
next_leader,
);
Self::push_unprocessed(
@@ -226,12 +226,12 @@ impl BankingStage {
}

fn consume_or_forward_packets(
-leader_id: Option<Pubkey>,
+leader_pubkey: Option<Pubkey>,
bank_is_available: bool,
would_be_leader: bool,
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
) -> BufferedPacketsDecision {
-leader_id.map_or(
+leader_pubkey.map_or(
// If leader is not known, return the buffered packets as is
BufferedPacketsDecision::Hold,
// else process the packets
@@ -242,7 +242,7 @@ impl BankingStage {
} else if would_be_leader {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
-} else if x != *my_id {
+} else if x != *my_pubkey {
// If the current node is not the leader, forward the buffered packets
BufferedPacketsDecision::Forward
} else {
@@ -282,8 +282,8 @@ impl BankingStage {
}
BufferedPacketsDecision::Forward => {
if enable_forwarding {
-next_leader.map_or(Ok(buffered_packets.to_vec()), |leader_id| {
-rcluster_info.lookup(&leader_id).map_or(
+next_leader.map_or(Ok(buffered_packets.to_vec()), |leader_pubkey| {
+rcluster_info.lookup(&leader_pubkey).map_or(
Ok(buffered_packets.to_vec()),
|leader| {
let _ = Self::forward_buffered_packets(
@@ -665,14 +665,14 @@ impl BankingStage {
bank: &Arc<Bank>,
msgs: &Packets,
transaction_indexes: &[usize],
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets.
// Filtering helps if we were going to forward the packets to some other node
if let Some(leader) = next_leader {
-if leader == *my_id {
+if leader == *my_pubkey {
return transaction_indexes.to_vec();
}
}
@@ -753,7 +753,7 @@ impl BankingStage {

if processed < verified_txs_len {
let next_leader = poh.lock().unwrap().next_slot_leader();
-let my_id = cluster_info.read().unwrap().id();
+let my_pubkey = cluster_info.read().unwrap().id();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
while let Some((msgs, vers)) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(vers);
@@ -761,7 +761,7 @@ impl BankingStage {
&bank,
&msgs,
&packet_indexes,
-&my_id,
+&my_pubkey,
next_leader,
);
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
@@ -1391,40 +1391,65 @@ mod tests {

#[test]
fn test_should_process_or_forward_packets() {
-let my_id = Pubkey::new_rand();
-let my_id1 = Pubkey::new_rand();
+let my_pubkey = Pubkey::new_rand();
+let my_pubkey1 = Pubkey::new_rand();

assert_eq!(
-BankingStage::consume_or_forward_packets(None, true, false, &my_id),
+BankingStage::consume_or_forward_packets(None, true, false, &my_pubkey),
BufferedPacketsDecision::Hold
);
assert_eq!(
-BankingStage::consume_or_forward_packets(None, false, false, &my_id),
+BankingStage::consume_or_forward_packets(None, false, false, &my_pubkey),
BufferedPacketsDecision::Hold
);
assert_eq!(
-BankingStage::consume_or_forward_packets(None, false, false, &my_id1),
+BankingStage::consume_or_forward_packets(None, false, false, &my_pubkey1),
BufferedPacketsDecision::Hold
);

assert_eq!(
-BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, false, &my_id),
+BankingStage::consume_or_forward_packets(
+Some(my_pubkey1.clone()),
+false,
+false,
+&my_pubkey
+),
BufferedPacketsDecision::Forward
);
assert_eq!(
-BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, true, &my_id),
+BankingStage::consume_or_forward_packets(
+Some(my_pubkey1.clone()),
+false,
+true,
+&my_pubkey
+),
BufferedPacketsDecision::Hold
);
assert_eq!(
-BankingStage::consume_or_forward_packets(Some(my_id1.clone()), true, false, &my_id),
+BankingStage::consume_or_forward_packets(
+Some(my_pubkey1.clone()),
+true,
+false,
+&my_pubkey
+),
BufferedPacketsDecision::Consume
);
assert_eq!(
-BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, false, &my_id1),
+BankingStage::consume_or_forward_packets(
+Some(my_pubkey1.clone()),
+false,
+false,
+&my_pubkey1
+),
BufferedPacketsDecision::Hold
);
assert_eq!(
-BankingStage::consume_or_forward_packets(Some(my_id1.clone()), true, false, &my_id1),
+BankingStage::consume_or_forward_packets(
+Some(my_pubkey1.clone()),
+true,
+false,
+&my_pubkey1
+),
BufferedPacketsDecision::Consume
);
}

@@ -65,14 +65,14 @@ pub trait BlockstreamEvents {
&self,
slot: u64,
tick_height: u64,
-leader_id: &Pubkey,
+leader_pubkey: &Pubkey,
entries: &Entry,
) -> Result<()>;
fn emit_block_event(
&self,
slot: u64,
tick_height: u64,
-leader_id: &Pubkey,
+leader_pubkey: &Pubkey,
blockhash: Hash,
) -> Result<()>;
}
@@ -90,7 +90,7 @@ where
&self,
slot: u64,
tick_height: u64,
-leader_id: &Pubkey,
+leader_pubkey: &Pubkey,
entry: &Entry,
) -> Result<()> {
let transactions: Vec<Vec<u8>> = serialize_transactions(entry);
@@ -105,7 +105,7 @@ where
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
slot,
tick_height,
-leader_id,
+leader_pubkey,
json_entry,
);
self.output.write(payload)?;
@@ -116,7 +116,7 @@ where
&self,
slot: u64,
tick_height: u64,
-leader_id: &Pubkey,
+leader_pubkey: &Pubkey,
blockhash: Hash,
) -> Result<()> {
let payload = format!(
@@ -124,7 +124,7 @@ where
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
slot,
tick_height,
-leader_id,
+leader_pubkey,
blockhash,
);
self.output.write(payload)?;
@@ -206,19 +206,19 @@ mod test {
let tick_height_initial = 0;
let tick_height_final = tick_height_initial + ticks_per_slot + 2;
let mut curr_slot = 0;
-let leader_id = Pubkey::new_rand();
+let leader_pubkey = Pubkey::new_rand();

for tick_height in tick_height_initial..=tick_height_final {
if tick_height == 5 {
blockstream
-.emit_block_event(curr_slot, tick_height - 1, &leader_id, blockhash)
+.emit_block_event(curr_slot, tick_height - 1, &leader_pubkey, blockhash)
.unwrap();
curr_slot += 1;
}
let entry = Entry::new(&mut blockhash, 1, vec![]); // just ticks
blockhash = entry.hash;
blockstream
-.emit_entry_event(curr_slot, tick_height, &leader_id, &entry)
+.emit_entry_event(curr_slot, tick_height, &leader_pubkey, &entry)
.unwrap();
expected_entries.push(entry.clone());
entries.push(entry);

@@ -121,7 +121,7 @@ mod test {
#[test]
fn test_blockstream_service_process_entries() {
let ticks_per_slot = 5;
-let leader_id = Pubkey::new_rand();
+let leader_pubkey = Pubkey::new_rand();

// Set up genesis block and blocktree
let GenesisBlockInfo {
@@ -162,7 +162,7 @@ mod test {
.write_entries(1, 0, 0, ticks_per_slot, &entries)
.unwrap();

-slot_full_sender.send((1, leader_id)).unwrap();
+slot_full_sender.send((1, leader_pubkey)).unwrap();
BlockstreamService::process_entries(
&slot_full_receiver,
&Arc::new(blocktree),

@@ -1,6 +1,6 @@
use solana_sdk::pubkey::Pubkey;

pub trait Cluster {
-fn get_node_ids(&self) -> Vec<Pubkey>;
+fn get_node_pubkeys(&self) -> Vec<Pubkey>;
fn restart_node(&mut self, pubkey: Pubkey);
}

@@ -72,9 +72,9 @@ pub struct ClusterInfo {
pub gossip: CrdsGossip,
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
pub(crate) keypair: Arc<Keypair>,
-// TODO: remove gossip_leader_id once all usage of `set_leader()` and `leader_data()` is
+// TODO: remove gossip_leader_pubkey once all usage of `set_leader()` and `leader_data()` is
// purged
-gossip_leader_id: Pubkey,
+gossip_leader_pubkey: Pubkey,
/// The network entrypoint
entrypoint: Option<ContactInfo>,
}
@@ -175,7 +175,7 @@ impl ClusterInfo {
let mut me = Self {
gossip: CrdsGossip::default(),
keypair,
-gossip_leader_id: Pubkey::default(),
+gossip_leader_pubkey: Pubkey::default(),
entrypoint: None,
};
let id = contact_info.id;
@@ -232,18 +232,18 @@ impl ClusterInfo {

// Deprecated: don't use leader_data().
pub fn leader_data(&self) -> Option<&ContactInfo> {
-let leader_id = self.gossip_leader_id;
-if leader_id == Pubkey::default() {
+let leader_pubkey = self.gossip_leader_pubkey;
+if leader_pubkey == Pubkey::default() {
return None;
}
-self.lookup(&leader_id)
+self.lookup(&leader_pubkey)
}

pub fn contact_info_trace(&self) -> String {
let now = timestamp();
let mut spy_nodes = 0;
let mut replicators = 0;
-let my_id = self.my_data().id;
+let my_pubkey = self.my_data().id;
let nodes: Vec<_> = self
.all_peers()
.into_iter()
@@ -268,7 +268,7 @@ impl ClusterInfo {
addr_to_string(&node.gossip),
now.saturating_sub(last_updated),
node.id,
-if node.id == my_id { "(me)" } else { "" }.to_string(),
+if node.id == my_pubkey { "(me)" } else { "" }.to_string(),
addr_to_string(&node.tpu),
addr_to_string(&node.rpc),
)
@@ -296,13 +296,13 @@ impl ClusterInfo {
}

/// Record the id of the current leader for use by `leader_tpu_via_blobs()`
-pub fn set_leader(&mut self, leader_id: &Pubkey) {
-if *leader_id != self.gossip_leader_id {
+pub fn set_leader(&mut self, leader_pubkey: &Pubkey) {
+if *leader_pubkey != self.gossip_leader_pubkey {
warn!(
"{}: LEADER_UPDATE TO {} from {}",
-self.gossip.id, leader_id, self.gossip_leader_id,
+self.gossip.id, leader_pubkey, self.gossip_leader_pubkey,
);
-self.gossip_leader_id = *leader_id;
+self.gossip_leader_pubkey = *leader_pubkey;
}
}

@@ -730,7 +730,7 @@ impl ClusterInfo {
obj: &Arc<RwLock<Self>>,
peers: &[ContactInfo],
blob: &SharedBlob,
-slot_leader_id: Option<Pubkey>,
+slot_leader_pubkey: Option<Pubkey>,
s: &UdpSocket,
forwarded: bool,
) -> Result<()> {
@@ -746,7 +746,7 @@ impl ClusterInfo {
trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders
.par_iter()
-.filter(|v| v.id != slot_leader_id.unwrap_or_default())
+.filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
.map(|v| {
debug!(
"{}: retransmit blob {} to {} {}",
@@ -2283,8 +2283,8 @@ fn test_add_entrypoint() {
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
);
-let entrypoint_id = Pubkey::new_rand();
-let entrypoint = ContactInfo::new_localhost(&entrypoint_id, timestamp());
+let entrypoint_pubkey = Pubkey::new_rand();
+let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone());
let pulls = cluster_info.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len());
@@ -2305,7 +2305,11 @@ fn test_add_entrypoint() {
// now add this message back to the table and make sure after the next pull, the entrypoint is unset
let entrypoint_crdsvalue = CrdsValue::ContactInfo(entrypoint.clone());
let cluster_info = Arc::new(RwLock::new(cluster_info));
-ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_id, vec![entrypoint_crdsvalue]);
+ClusterInfo::handle_pull_response(
+&cluster_info,
+&entrypoint_pubkey,
+vec![entrypoint_crdsvalue],
+);
let pulls = cluster_info
.write()
.unwrap()

@@ -111,7 +111,7 @@ impl ClusterInfoRepairListener {
epoch_schedule: &EpochSchedule,
) -> Result<()> {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
-let my_id = cluster_info.read().unwrap().id();
+let my_pubkey = cluster_info.read().unwrap().id();
let mut my_gossiped_root = 0;

loop {
@@ -127,7 +127,7 @@ impl ClusterInfoRepairListener {
for peer in peers {
let last_update_ts = Self::get_last_ts(peer.id, peer_roots);
let my_root =
-Self::read_my_gossiped_root(&my_id, cluster_info, &mut my_gossiped_root);
+Self::read_my_gossiped_root(&my_pubkey, cluster_info, &mut my_gossiped_root);
{
let r_cluster_info = cluster_info.read().unwrap();

@@ -155,7 +155,7 @@ impl ClusterInfoRepairListener {

// After updating all the peers, send out repairs to those that need it
let _ = Self::serve_repairs(
-&my_id,
+&my_pubkey,
blocktree,
peer_roots,
&peers_needing_repairs,
@@ -170,7 +170,7 @@ impl ClusterInfoRepairListener {
}

fn serve_repairs(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
blocktree: &Blocktree,
peer_roots: &HashMap<Pubkey, (u64, u64)>,
repairees: &HashMap<Pubkey, EpochSlots>,
@@ -179,19 +179,19 @@ impl ClusterInfoRepairListener {
my_gossiped_root: &mut u64,
epoch_schedule: &EpochSchedule,
) -> Result<()> {
-for (repairee_id, repairee_epoch_slots) in repairees {
+for (repairee_pubkey, repairee_epoch_slots) in repairees {
let repairee_root = repairee_epoch_slots.root;

let repairee_tvu = {
let r_cluster_info = cluster_info.read().unwrap();
-let contact_info = r_cluster_info.get_contact_info_for_node(repairee_id);
+let contact_info = r_cluster_info.get_contact_info_for_node(repairee_pubkey);
contact_info.map(|c| c.tvu)
};

if let Some(repairee_tvu) = repairee_tvu {
// For every repairee, get the set of repairmen who are responsible for
let mut eligible_repairmen = Self::find_eligible_repairmen(
-my_id,
+my_pubkey,
repairee_root,
peer_roots,
NUM_BUFFER_SLOTS,
@@ -199,14 +199,15 @@ impl ClusterInfoRepairListener {

Self::shuffle_repairmen(
&mut eligible_repairmen,
-repairee_id,
+repairee_pubkey,
repairee_epoch_slots.root,
);

-let my_root = Self::read_my_gossiped_root(my_id, cluster_info, my_gossiped_root);
+let my_root =
+Self::read_my_gossiped_root(my_pubkey, cluster_info, my_gossiped_root);

let _ = Self::serve_repairs_to_repairee(
-my_id,
+my_pubkey,
my_root,
blocktree,
&repairee_epoch_slots,
@@ -223,7 +224,7 @@ impl ClusterInfoRepairListener {
}

fn serve_repairs_to_repairee(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
my_root: u64,
blocktree: &Blocktree,
repairee_epoch_slots: &EpochSlots,
@@ -266,7 +267,7 @@ impl ClusterInfoRepairListener {
// the cluster
let num_blobs_in_slot = slot_meta.received as usize;
if let Some(my_repair_indexes) = Self::calculate_my_repairman_index_for_slot(
-my_id,
+my_pubkey,
&eligible_repairmen,
num_blobs_in_slot,
REPAIR_REDUNDANCY,
@@ -316,13 +317,13 @@ impl ClusterInfoRepairListener {

fn shuffle_repairmen(
eligible_repairmen: &mut Vec<&Pubkey>,
-repairee_id: &Pubkey,
+repairee_pubkey: &Pubkey,
repairee_root: u64,
) {
// Make a seed from pubkey + repairee root
let mut seed = [0u8; mem::size_of::<Pubkey>()];
-let repairee_id_bytes = repairee_id.as_ref();
-seed[..repairee_id_bytes.len()].copy_from_slice(repairee_id_bytes);
+let repairee_pubkey_bytes = repairee_pubkey.as_ref();
+seed[..repairee_pubkey_bytes.len()].copy_from_slice(repairee_pubkey_bytes);
LittleEndian::write_u64(&mut seed[0..], repairee_root);

// Deterministically shuffle the eligible repairmen based on the seed
@@ -334,7 +335,7 @@ impl ClusterInfoRepairListener {
// such that each blob in the slot is the responsibility of `repair_redundancy` or
// `repair_redundancy + 1` number of repairmen in the cluster.
fn calculate_my_repairman_index_for_slot(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
eligible_repairmen: &[&Pubkey],
num_blobs_in_slot: usize,
repair_redundancy: usize,
@@ -350,7 +351,7 @@ impl ClusterInfoRepairListener {
// Calculate the indexes this node is responsible for
if let Some(my_position) = eligible_repairmen[..total_repairmen_for_slot]
.iter()
-.position(|id| *id == my_id)
+.position(|id| *id == my_pubkey)
{
let start_index = my_position % num_blobs_in_slot;
Some(BlobIndexesToRepairIterator::new(
@@ -367,7 +368,7 @@ impl ClusterInfoRepairListener {
}

fn find_eligible_repairmen<'a>(
-my_id: &'a Pubkey,
+my_pubkey: &'a Pubkey,
repairee_root: u64,
repairman_roots: &'a HashMap<Pubkey, (u64, u64)>,
num_buffer_slots: usize,
@@ -387,19 +388,19 @@ impl ClusterInfoRepairListener {
})
.collect();

-repairmen.push(my_id);
+repairmen.push(my_pubkey);
repairmen
}

fn read_my_gossiped_root(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
old_root: &mut u64,
) -> u64 {
let new_root = cluster_info
.read()
.unwrap()
-.get_gossiped_root_for_node(&my_id, None);
+.get_gossiped_root_for_node(&my_pubkey, None);

if let Some(new_root) = new_root {
*old_root = new_root;
@@ -519,7 +520,7 @@ mod tests {
blocktree.set_root(num_slots - 1, 0).unwrap();

// Set up my information
-let my_id = Pubkey::new_rand();
+let my_pubkey = Pubkey::new_rand();
let my_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

// Set up a mock repairee with a socket listening for incoming repairs
@@ -536,7 +537,7 @@ mod tests {
let num_repairmen = blobs_per_slot - 1;
let mut eligible_repairmen: Vec<_> =
(0..num_repairmen).map(|_| Pubkey::new_rand()).collect();
-eligible_repairmen.push(my_id);
+eligible_repairmen.push(my_pubkey);
let eligible_repairmen_refs: Vec<_> = eligible_repairmen.iter().collect();

// Have all the repairman send the repairs
@@ -595,7 +596,7 @@ mod tests {
blocktree.set_root(slots_per_epoch * 2 - 1, 0).unwrap();

// Set up my information
-let my_id = Pubkey::new_rand();
+let my_pubkey = Pubkey::new_rand();
let my_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

// Set up a mock repairee with a socket listening for incoming repairs
@@ -613,11 +614,11 @@ mod tests {
EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots.clone(), 1);

ClusterInfoRepairListener::serve_repairs_to_repairee(
-&my_id,
+&my_pubkey,
total_slots - 1,
&blocktree,
&repairee_epoch_slots,
-&vec![&my_id],
+&vec![&my_pubkey],
&my_socket,
&mock_repairee.tvu_address,
1 as usize,
@@ -634,11 +635,11 @@ mod tests {
let repairee_epoch_slots =
EpochSlots::new(mock_repairee.id, stakers_slot_offset, repairee_slots, 1);
ClusterInfoRepairListener::serve_repairs_to_repairee(
-&my_id,
+&my_pubkey,
total_slots - 1,
&blocktree,
&repairee_epoch_slots,
-&vec![&my_id],
+&vec![&my_pubkey],
&my_socket,
&mock_repairee.tvu_address,
1 as usize,

@@ -263,7 +263,7 @@ mod test {
fn test_new_mark_creation_time() {
let mut crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
-let node_id = entry.label().pubkey();
+let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
crds.insert(entry.clone(), 0).unwrap();
let old = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
@@ -276,7 +276,7 @@ mod test {

// odds of getting the other request should be 1 in u64::max_value()
for _ in 0..10 {
-let req = node.new_pull_request(&crds, &node_id, u64::max_value(), &HashMap::new());
+let req = node.new_pull_request(&crds, &node_pubkey, u64::max_value(), &HashMap::new());
let (to, _, self_info) = req.unwrap();
assert_eq!(to, old.label().pubkey());
assert_eq!(self_info, entry);
@@ -287,12 +287,12 @@ mod test {
fn test_process_pull_request() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
-let node_id = entry.label().pubkey();
+let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
node_crds.insert(new.clone(), 0).unwrap();
-let req = node.new_pull_request(&node_crds, &node_id, 0, &HashMap::new());
+let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new());

let mut dest_crds = Crds::default();
let mut dest = CrdsGossipPull::default();
@@ -319,7 +319,7 @@ mod test {
fn test_process_pull_request_response() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
-let node_id = entry.label().pubkey();
+let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();

@@ -347,7 +347,7 @@ mod test {
let mut done = false;
for _ in 0..30 {
// there is a chance of a false positive with bloom filters
-let req = node.new_pull_request(&node_crds, &node_id, 0, &HashMap::new());
+let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new());
let (_, filter, caller) = req.unwrap();
let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0);
// if there is a false positive this is empty
@@ -357,7 +357,7 @@ mod test {
}

assert_eq!(rsp.len(), 1);
-let failed = node.process_pull_response(&mut node_crds, &node_id, rsp, 1);
+let failed = node.process_pull_response(&mut node_crds, &node_pubkey, rsp, 1);
assert_eq!(failed, 0);
assert_eq!(
node_crds
@@ -384,7 +384,7 @@ mod test {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let node_label = entry.label();
-let node_id = node_label.pubkey();
+let node_pubkey = node_label.pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let old = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
@@ -395,7 +395,7 @@ mod test {
assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

// purge
-node.purge_active(&mut node_crds, &node_id, 1);
+node.purge_active(&mut node_crds, &node_pubkey, 1);

//verify self is still valid after purge
assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

@@ -385,13 +385,13 @@ mod tests {
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));

// Create new vote account
-let node_id = Pubkey::new_rand();
-let vote_id = Pubkey::new_rand();
+let node_pubkey = Pubkey::new_rand();
+let vote_pubkey = Pubkey::new_rand();
setup_vote_and_stake_accounts(
&bank,
&mint_keypair,
-&vote_id,
-&node_id,
+&vote_pubkey,
+&node_pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
);

@@ -412,14 +412,14 @@ mod tests {

let schedule = cache.compute_epoch_schedule(epoch, &bank).unwrap();
let mut index = 0;
-while schedule[index] != node_id {
+while schedule[index] != node_pubkey {
index += 1;
assert_ne!(index, genesis_block.slots_per_epoch);
}
expected_slot += index;

assert_eq!(
-cache.next_leader_slot(&node_id, 0, &bank, None),
+cache.next_leader_slot(&node_pubkey, 0, &bank, None),
Some(expected_slot),
);
}

@@ -41,9 +41,9 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a
// deterministic result.
// Note: Use unstable sort, because we dedup right after to remove the equal elements.
-stakes.sort_unstable_by(|(l_id, l_stake), (r_id, r_stake)| {
+stakes.sort_unstable_by(|(l_pubkey, l_stake), (r_pubkey, r_stake)| {
if r_stake == l_stake {
-r_id.cmp(&l_id)
+r_pubkey.cmp(&l_pubkey)
} else {
r_stake.cmp(&l_stake)
}
@@ -68,10 +68,10 @@ mod tests {
create_genesis_block_with_leader(0, &pubkey, BOOTSTRAP_LEADER_LAMPORTS).genesis_block;
let bank = Bank::new(&genesis_block);

-let ids_and_stakes: Vec<_> = staking_utils::staked_nodes(&bank).into_iter().collect();
+let pubkeys_and_stakes: Vec<_> = staking_utils::staked_nodes(&bank).into_iter().collect();
let seed = [0u8; 32];
let leader_schedule = LeaderSchedule::new(
-&ids_and_stakes,
+&pubkeys_and_stakes,
seed,
genesis_block.slots_per_epoch,
NUM_CONSECUTIVE_LEADER_SLOTS,

@@ -36,14 +36,14 @@ pub struct ValidatorInfo {
}

pub struct ReplicatorInfo {
-pub replicator_storage_id: Pubkey,
+pub replicator_storage_pubkey: Pubkey,
pub ledger_path: String,
}

impl ReplicatorInfo {
-fn new(storage_id: Pubkey, ledger_path: String) -> Self {
+fn new(storage_pubkey: Pubkey, ledger_path: String) -> Self {
Self {
-replicator_storage_id: storage_id,
+replicator_storage_pubkey: storage_pubkey,
ledger_path,
}
}
@@ -396,7 +396,7 @@ impl LocalCluster {
amount: u64,
) -> Result<()> {
let vote_account_pubkey = vote_account.pubkey();
-let node_id = from_account.pubkey();
+let node_pubkey = from_account.pubkey();

// Create the vote account if necessary
if client.poll_get_balance(&vote_account_pubkey).unwrap_or(0) == 0 {
@@ -407,7 +407,7 @@ impl LocalCluster {
vote_instruction::create_account(
&from_account.pubkey(),
&vote_account_pubkey,
-&node_id,
+&node_pubkey,
0,
amount,
),
@@ -461,7 +461,7 @@ impl LocalCluster {
let vote_account_user_data = client.get_account_data(&vote_account_pubkey);
if let Ok(Some(vote_account_user_data)) = vote_account_user_data {
if let Ok(vote_state) = VoteState::deserialize(&vote_account_user_data) {
-if vote_state.node_id == node_id {
+if vote_state.node_pubkey == node_pubkey {
info!("vote account registered");
return Ok(());
}
@@ -506,7 +506,7 @@ impl LocalCluster {
}

impl Cluster for LocalCluster {
-fn get_node_ids(&self) -> Vec<Pubkey> {
+fn get_node_pubkeys(&self) -> Vec<Pubkey> {
self.fullnodes.keys().cloned().collect()
}

@@ -20,7 +20,7 @@ pub struct EpochStakes {
stakes: HashMap<Pubkey, u64>,
self_staked: u64,
total_staked: u64,
-delegate_id: Pubkey,
+delegate_pubkey: Pubkey,
}

#[derive(Default, Debug)]
@@ -39,15 +39,15 @@ pub struct Locktower {
}

impl EpochStakes {
-pub fn new(epoch: u64, stakes: HashMap<Pubkey, u64>, delegate_id: &Pubkey) -> Self {
+pub fn new(epoch: u64, stakes: HashMap<Pubkey, u64>, delegate_pubkey: &Pubkey) -> Self {
let total_staked = stakes.values().sum();
-let self_staked = *stakes.get(&delegate_id).unwrap_or(&0);
+let self_staked = *stakes.get(&delegate_pubkey).unwrap_or(&0);
Self {
epoch,
stakes,
total_staked,
self_staked,
-delegate_id: *delegate_id,
+delegate_pubkey: *delegate_pubkey,
}
}
pub fn new_for_tests(lamports: u64) -> Self {
@@ -61,21 +61,21 @@ impl EpochStakes {
let stakes = accounts.iter().map(|(k, (v, _))| (*k, *v)).collect();
Self::new(epoch, stakes, &accounts[0].0)
}
-pub fn new_from_bank(bank: &Bank, my_id: &Pubkey) -> Self {
+pub fn new_from_bank(bank: &Bank, my_pubkey: &Pubkey) -> Self {
let bank_epoch = bank.get_epoch_and_slot_index(bank.slot()).0;
let stakes = staking_utils::vote_account_stakes_at_epoch(bank, bank_epoch)
.expect("voting require a bank with stakes");
-Self::new(bank_epoch, stakes, my_id)
+Self::new(bank_epoch, stakes, my_pubkey)
}
}

impl Locktower {
-pub fn new_from_forks(bank_forks: &BankForks, my_id: &Pubkey) -> Self {
+pub fn new_from_forks(bank_forks: &BankForks, my_pubkey: &Pubkey) -> Self {
let mut frozen_banks: Vec<_> = bank_forks.frozen_banks().values().cloned().collect();
frozen_banks.sort_by_key(|b| (b.parents().len(), b.slot()));
let epoch_stakes = {
if let Some(bank) = frozen_banks.last() {
-EpochStakes::new_from_bank(bank, my_id)
+EpochStakes::new_from_bank(bank, my_pubkey)
} else {
return Self::default();
}
@@ -124,8 +124,8 @@ impl Locktower {
}
let mut vote_state = vote_state.unwrap();

-if key == self.epoch_stakes.delegate_id
-|| vote_state.node_id == self.epoch_stakes.delegate_id
+if key == self.epoch_stakes.delegate_pubkey
+|| vote_state.node_pubkey == self.epoch_stakes.delegate_pubkey
{
debug!("vote state {:?}", vote_state);
debug!(
@@ -220,7 +220,8 @@ impl Locktower {
bank.slot(),
self.epoch_stakes.epoch
);
-self.epoch_stakes = EpochStakes::new_from_bank(bank, &self.epoch_stakes.delegate_id);
+self.epoch_stakes =
+EpochStakes::new_from_bank(bank, &self.epoch_stakes.delegate_pubkey);
datapoint_info!(
"locktower-epoch",
("epoch", self.epoch_stakes.epoch, i64),
@@ -382,8 +383,8 @@ impl Locktower {
fn initialize_lockouts_from_bank(bank: &Bank, current_epoch: u64) -> VoteState {
let mut lockouts = VoteState::default();
if let Some(iter) = bank.epoch_vote_accounts(current_epoch) {
-for (delegate_id, (_, account)) in iter {
-if *delegate_id == bank.collector_id() {
+for (delegate_pubkey, (_, account)) in iter {
+if *delegate_pubkey == bank.collector_id() {
let state = VoteState::deserialize(&account.data).expect("votes");
if lockouts.votes.len() < state.votes.len() {
lockouts = state;

@@ -88,11 +88,12 @@ impl PohRecorder {

pub fn would_be_leader(&self, within_next_n_ticks: u64) -> bool {
let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| {
-let leader_ideal_start_tick =
+let leader_pubkeyeal_start_tick =
leader_tick.saturating_sub(self.max_last_leader_grace_ticks);

self.tick_height() <= self.last_leader_tick.unwrap_or(0)
-&& self.tick_height() >= leader_ideal_start_tick.saturating_sub(within_next_n_ticks)
+&& self.tick_height()
+>= leader_pubkeyeal_start_tick.saturating_sub(within_next_n_ticks)
});

self.working_bank.is_some() || close_to_leader_tick
@@ -128,7 +129,7 @@ impl PohRecorder {
self.max_last_leader_grace_ticks
);

-let leader_ideal_start_tick =
+let leader_pubkeyeal_start_tick =
target_tick.saturating_sub(self.max_last_leader_grace_ticks);
// Is the current tick in the same slot as the target tick?
// Check if either grace period has expired,
@@ -140,7 +141,8 @@ impl PohRecorder {
{
return (
true,
-self.tick_height().saturating_sub(leader_ideal_start_tick),
+self.tick_height()
+.saturating_sub(leader_pubkeyeal_start_tick),
);
}

@@ -72,7 +72,7 @@ impl ForkProgress {
impl ReplayStage {
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new<T>(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
vote_account: &Pubkey,
voting_keypair: Option<&Arc<T>>,
blocktree: Arc<Blocktree>,
@@ -94,9 +94,9 @@ impl ReplayStage {
let subscriptions = subscriptions.clone();
let bank_forks = bank_forks.clone();
let poh_recorder = poh_recorder.clone();
-let my_id = *my_id;
+let my_pubkey = *my_pubkey;
let mut ticks_per_slot = 0;
-let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_id);
+let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_pubkey);
// Start the replay stage loop
let leader_schedule_cache = leader_schedule_cache.clone();
let vote_account = *vote_account;
@@ -124,7 +124,7 @@ impl ReplayStage {
Self::replay_active_banks(
&blocktree,
&bank_forks,
-&my_id,
+&my_pubkey,
&mut ticks_per_slot,
&mut progress,
&slot_full_sender,
@@ -156,7 +156,7 @@ impl ReplayStage {
)?;

Self::reset_poh_recorder(
-&my_id,
+&my_pubkey,
&blocktree,
&bank,
&poh_recorder,
@@ -182,7 +182,7 @@ impl ReplayStage {
poh_tick_height + 1,
);
Self::start_leader(
-&my_id,
+&my_pubkey,
&bank_forks,
&poh_recorder,
&cluster_info,
@@ -211,7 +211,7 @@ impl ReplayStage {
(Self { t_replay }, slot_full_receiver, root_slot_receiver)
}
pub fn start_leader(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -220,7 +220,7 @@ impl ReplayStage {
grace_ticks: u64,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
-trace!("{} checking poh slot {}", my_id, poh_slot);
+trace!("{} checking poh slot {}", my_pubkey, poh_slot);
if bank_forks.read().unwrap().get(poh_slot).is_none() {
let parent_slot = poh_recorder.lock().unwrap().start_slot();
let parent = {
@@ -235,16 +235,16 @@ impl ReplayStage {
.map(|next_leader| {
debug!(
"me: {} leader {} at poh slot {}",
-my_id, next_leader, poh_slot
+my_pubkey, next_leader, poh_slot
);
cluster_info.write().unwrap().set_leader(&next_leader);
-if next_leader == *my_id && reached_leader_tick {
-debug!("{} starting tpu for slot {}", my_id, poh_slot);
+if next_leader == *my_pubkey && reached_leader_tick {
+debug!("{} starting tpu for slot {}", my_pubkey, poh_slot);
datapoint_warn!(
"replay_stage-new_leader",
("count", poh_slot, i64),
("grace", grace_ticks, i64));
-let tpu_bank = Bank::new_from_parent(&parent, my_id, poh_slot);
+let tpu_bank = Bank::new_from_parent(&parent, my_pubkey, poh_slot);
bank_forks.write().unwrap().insert(tpu_bank);
if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
assert_eq!(
@@ -253,7 +253,7 @@ impl ReplayStage {
);
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
-my_id,
+my_pubkey,
tpu_bank.slot(),
next_leader
);
@@ -262,7 +262,7 @@ impl ReplayStage {
}
})
.or_else(|| {
-warn!("{} No next leader found", my_id);
+warn!("{} No next leader found", my_pubkey);
None
});
}
@@ -345,7 +345,7 @@ impl ReplayStage {
}

fn reset_poh_recorder(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
blocktree: &Blocktree,
bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
@@ -353,7 +353,7 @@ impl ReplayStage {
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
let next_leader_slot =
-leader_schedule_cache.next_leader_slot(&my_id, bank.slot(), &bank, Some(blocktree));
+leader_schedule_cache.next_leader_slot(&my_pubkey, bank.slot(), &bank, Some(blocktree));
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
@@ -363,7 +363,7 @@ impl ReplayStage {
);
debug!(
"{:?} voted and reset poh at {}. next leader slot {:?}",
-my_id,
+my_pubkey,
bank.tick_height(),
next_leader_slot
);
@@ -372,7 +372,7 @@ impl ReplayStage {
fn replay_active_banks(
blocktree: &Arc<Blocktree>,
bank_forks: &Arc<RwLock<BankForks>>,
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
ticks_per_slot: &mut u64,
progress: &mut HashMap<u64, ForkProgress>,
slot_full_sender: &Sender<(u64, Pubkey)>,
@@ -383,12 +383,12 @@ impl ReplayStage {
for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
*ticks_per_slot = bank.ticks_per_slot();
-if bank.collector_id() != *my_id {
+if bank.collector_id() != *my_pubkey {
Self::replay_blocktree_into_bank(&bank, &blocktree, progress)?;
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
-Self::process_completed_bank(my_id, bank, slot_full_sender);
+Self::process_completed_bank(my_pubkey, bank, slot_full_sender);
}
}
Ok(())
@@ -556,14 +556,14 @@ impl ReplayStage {
}

fn process_completed_bank(
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
bank: Arc<Bank>,
slot_full_sender: &Sender<(u64, Pubkey)>,
) {
bank.freeze();
info!("bank frozen {}", bank.slot());
if let Err(e) = slot_full_sender.send((bank.slot(), bank.collector_id())) {
-trace!("{} slot_full alert failed: {:?}", my_id, e);
+trace!("{} slot_full alert failed: {:?}", my_pubkey, e);
}
}

@@ -496,8 +496,8 @@ impl Replicator {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
debug!("rpc peers: {:?}", rpc_peers);
-let node_idx = thread_rng().gen_range(0, rpc_peers.len());
-RpcClient::new_socket(rpc_peers[node_idx].rpc)
+let node_index = thread_rng().gen_range(0, rpc_peers.len());
+RpcClient::new_socket(rpc_peers[node_index].rpc)
};
let storage_blockhash = rpc_client
.retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0)

@@ -548,7 +548,7 @@ mod tests {
#[test]
fn test_rpc_get_balance() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, _blockhash, _alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, _blockhash, _alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);

let req = format!(
r#"{{"jsonrpc":"2.0","id":1,"method":"getBalance","params":["{}"]}}"#,
@@ -566,7 +566,7 @@ mod tests {
#[test]
fn test_rpc_get_cluster_nodes() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, _blockhash, _alice, leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, _blockhash, _alice, leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);

let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getClusterNodes"}}"#);
let res = io.handle_request_sync(&req, meta);
@@ -575,7 +575,7 @@ mod tests {

let expected = format!(
r#"{{"jsonrpc":"2.0","result":[{{"id": "{}", "gossip": "127.0.0.1:1235", "tpu": "127.0.0.1:1234", "rpc": "127.0.0.1:8899"}}],"id":1}}"#,
-leader_id,
+leader_pubkey,
);

let expected: Response =
@@ -586,7 +586,7 @@ mod tests {
#[test]
fn test_rpc_get_slot_leader() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, _blockhash, _alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, _blockhash, _alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);

let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeader"}}"#);
let res = io.handle_request_sync(&req, meta);
@@ -602,7 +602,7 @@ mod tests {
#[test]
fn test_rpc_get_tx_count() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, _blockhash, _alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, _blockhash, _alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);

let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getTransactionCount"}}"#);
let res = io.handle_request_sync(&req, meta);
@@ -617,7 +617,7 @@ mod tests {
#[test]
fn test_rpc_get_account_info() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, _blockhash, _alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, _blockhash, _alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);

let req = format!(
r#"{{"jsonrpc":"2.0","id":1,"method":"getAccountInfo","params":["{}"]}}"#,
@@ -644,7 +644,7 @@ mod tests {
#[test]
fn test_rpc_confirm_tx() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, blockhash, alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, blockhash, alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);
let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash);

let req = format!(
@@ -663,7 +663,7 @@ mod tests {
#[test]
fn test_rpc_get_signature_status() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, blockhash, alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, blockhash, alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);
let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash);

let req = format!(
@@ -727,7 +727,7 @@ mod tests {
#[test]
fn test_rpc_get_recent_blockhash() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, blockhash, _alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, blockhash, _alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);

let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getRecentBlockhash"}}"#);
let res = io.handle_request_sync(&req, meta);
@@ -745,7 +745,7 @@ mod tests {
#[test]
fn test_rpc_fail_request_airdrop() {
let bob_pubkey = Pubkey::new_rand();
-let (io, meta, _blockhash, _alice, _leader_id) = start_rpc_handler_with_tx(&bob_pubkey);
+let (io, meta, _blockhash, _alice, _leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey);

// Expect internal error because no drone is available
let req = format!(

@@ -50,7 +50,7 @@ pub fn staked_nodes_at_epoch(bank: &Bank, epoch_height: u64) -> Option<HashMap<P
.map(|vote_accounts| to_staked_nodes(to_vote_states(vote_accounts.into_iter())))
}

-// input (vote_id, (stake, vote_account)) => (stake, vote_state)
+// input (vote_pubkey, (stake, vote_account)) => (stake, vote_state)
fn to_vote_states(
node_staked_accounts: impl Iterator<Item = (impl Borrow<Pubkey>, impl Borrow<(u64, Account)>)>,
) -> impl Iterator<Item = (u64, VoteState)> {
@@ -67,7 +67,7 @@ fn to_staked_nodes(
) -> HashMap<Pubkey, u64> {
let mut map: HashMap<Pubkey, u64> = HashMap::new();
node_staked_accounts.for_each(|(stake, state)| {
-map.entry(state.node_id)
+map.entry(state.node_pubkey)
.and_modify(|s| *s += stake)
.or_insert(stake);
});
@@ -158,8 +158,8 @@ pub(crate) mod tests {
pub(crate) fn setup_vote_and_stake_accounts(
bank: &Bank,
from_account: &Keypair,
-vote_id: &Pubkey,
-node_id: &Pubkey,
+vote_pubkey: &Pubkey,
+node_pubkey: &Pubkey,
amount: u64,
) {
fn process_instructions<T: KeypairUtil>(
@@ -178,7 +178,13 @@ pub(crate) mod tests {
process_instructions(
bank,
&[from_account],
-vote_instruction::create_account(&from_account.pubkey(), vote_id, node_id, 0, amount),
+vote_instruction::create_account(
+&from_account.pubkey(),
+vote_pubkey,
+node_pubkey,
+0,
+amount,
+),
);

let stake_account_keypair = Keypair::new();
@@ -200,7 +206,7 @@ pub(crate) mod tests {
vec![stake_instruction::delegate_stake(
&from_account.pubkey(),
&stake_account_pubkey,
-vote_id,
+vote_pubkey,
)],
);
}
@@ -217,7 +223,7 @@ pub(crate) mod tests {
} = create_genesis_block(10_000);

let bank = Bank::new(&genesis_block);
-let vote_id = Pubkey::new_rand();
+let vote_pubkey = Pubkey::new_rand();

// Give the validator some stake but don't setup a staking account
// Validator has no lamports staked, so they get filtered out. Only the bootstrap leader
@@ -230,7 +236,7 @@ pub(crate) mod tests {
setup_vote_and_stake_accounts(
&bank,
&mint_keypair,
-&vote_id,
+&vote_pubkey,
&mint_keypair.pubkey(),
stake,
);

@@ -80,20 +80,20 @@ pub fn should_retransmit_and_persist(
blob: &Blob,
bank: Option<Arc<Bank>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
) -> bool {
-let slot_leader_id = match bank {
+let slot_leader_pubkey = match bank {
None => leader_schedule_cache.slot_leader_at(blob.slot(), None),
Some(bank) => leader_schedule_cache.slot_leader_at(blob.slot(), Some(&bank)),
};

-if blob.id() == *my_id {
+if blob.id() == *my_pubkey {
inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
false
-} else if slot_leader_id == None {
+} else if slot_leader_pubkey == None {
inc_new_counter_debug!("streamer-recv_window-unknown_leader", 1);
true
-} else if slot_leader_id != Some(blob.id()) {
+} else if slot_leader_pubkey != Some(blob.id()) {
inc_new_counter_debug!("streamer-recv_window-wrong_leader", 1);
false
} else {
@@ -103,7 +103,7 @@ pub fn should_retransmit_and_persist(

fn recv_window<F>(
blocktree: &Arc<Blocktree>,
-my_id: &Pubkey,
+my_pubkey: &Pubkey,
r: &BlobReceiver,
retransmit: &BlobSender,
genesis_blockhash: &Hash,
@@ -126,9 +126,9 @@ where
&& blob.read().unwrap().genesis_blockhash() == *genesis_blockhash
});

-retransmit_blobs(&blobs, retransmit, my_id)?;
+retransmit_blobs(&blobs, retransmit, my_pubkey)?;

-trace!("{} num blobs received: {}", my_id, blobs.len());
+trace!("{} num blobs received: {}", my_pubkey, blobs.len());

process_blobs(&blobs, blocktree)?;

@@ -294,14 +294,14 @@ mod test {
#[test]
fn test_should_retransmit_and_persist() {
let me_id = Pubkey::new_rand();
-let leader_id = Pubkey::new_rand();
+let leader_pubkey = Pubkey::new_rand();
let bank = Arc::new(Bank::new(
-&create_genesis_block_with_leader(100, &leader_id, 10).genesis_block,
+&create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block,
));
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));

let mut blob = Blob::default();
-blob.set_id(&leader_id);
+blob.set_id(&leader_pubkey);

// without a Bank and blobs not from me, blob continues
assert_eq!(