Add option for repairing only from trusted validators (#11752) (#11772)

Co-authored-by: Carl <carl@solana.com>
(cherry picked from commit c8d67aa8eb)

Co-authored-by: carllin <wumu727@gmail.com>
mergify[bot] authored 2020-08-21 09:01:39 +00:00, committed by GitHub
parent 7c956a87e5
commit 9ee69017dc
6 changed files with 178 additions and 24 deletions

core/src/repair_service.rs

@@ -20,7 +20,7 @@ use solana_measure::measure::Measure;
 use solana_runtime::bank::Bank;
 use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     iter::Iterator,
     net::SocketAddr,
     net::UdpSocket,
@@ -110,6 +110,7 @@ pub struct RepairInfo {
pub bank_forks: Arc<RwLock<BankForks>>,
pub epoch_schedule: EpochSchedule,
pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
pub repair_validators: Option<HashSet<Pubkey>>,
}
pub struct RepairSlotRange {
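
The repair_validators field added to RepairInfo above follows the same Option<HashSet<Pubkey>> convention as trusted_validators elsewhere in the codebase: None means no restriction, while Some(set) limits repair traffic to the listed identities. A minimal standalone sketch of that convention (the is_allowed helper is illustrative, not part of this patch):

    use std::collections::HashSet;

    // Illustrative helper, not from the patch: `None` = unrestricted,
    // `Some(set)` = only members of the set pass the check.
    fn is_allowed<T: std::hash::Hash + Eq>(allow_list: &Option<HashSet<T>>, key: &T) -> bool {
        match allow_list {
            None => true,                   // no allow-list configured: accept everyone
            Some(set) => set.contains(key), // otherwise require membership
        }
    }

    fn main() {
        let open: Option<HashSet<u32>> = None;
        let restricted: Option<HashSet<u32>> = Some(vec![1, 2].into_iter().collect());
        assert!(is_allowed(&open, &42));
        assert!(is_allowed(&restricted, &1));
        assert!(!is_allowed(&restricted, &42));
    }
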
@@ -236,6 +237,7 @@ impl RepairService {
blockstore,
&serve_repair,
&repair_info.duplicate_slots_reset_sender,
&repair_info.repair_validators,
);
Self::generate_and_send_duplicate_repairs(
&mut duplicate_slot_repair_statuses,
@@ -244,6 +246,7 @@ impl RepairService {
&serve_repair,
&mut repair_stats,
&repair_socket,
&repair_info.repair_validators,
);*/
repair_weight.get_best_weighted_repairs(
@@ -265,6 +268,7 @@ impl RepairService {
repair_request,
&mut cache,
&mut repair_stats,
&repair_info.repair_validators,
) {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
@@ -446,9 +450,16 @@ impl RepairService {
         serve_repair: &ServeRepair,
         repair_stats: &mut RepairStats,
         repair_socket: &UdpSocket,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) {
         duplicate_slot_repair_statuses.retain(|slot, status| {
-            Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair);
+            Self::update_duplicate_slot_repair_addr(
+                *slot,
+                status,
+                cluster_slots,
+                serve_repair,
+                repair_validators,
+            );
             if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr {
                 let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);
@@ -501,13 +512,17 @@ impl RepairService {
         status: &mut DuplicateSlotRepairStatus,
         cluster_slots: &ClusterSlots,
         serve_repair: &ServeRepair,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) {
         let now = timestamp();
         if status.repair_pubkey_and_addr.is_none()
             || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
         {
-            let repair_pubkey_and_addr =
-                serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots);
+            let repair_pubkey_and_addr = serve_repair.repair_request_duplicate_compute_best_peer(
+                slot,
+                cluster_slots,
+                repair_validators,
+            );
             status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
             status.start = timestamp();
         }
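
The hunk above also shows the pre-existing re-pick policy of update_duplicate_slot_repair_addr: a new repair peer is chosen when none is set yet, or when the current pick has been outstanding longer than MAX_DUPLICATE_WAIT_MS. A standalone sketch of that policy, with simplified types and an assumed 10-second constant (check MAX_DUPLICATE_WAIT_MS in the real code):

    use std::net::SocketAddr;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Assumed value for illustration only.
    const MAX_DUPLICATE_WAIT_MS: u64 = 10_000;

    fn timestamp() -> u64 {
        SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
    }

    // Simplified stand-in for DuplicateSlotRepairStatus.
    struct Status {
        start: u64,
        repair_addr: Option<SocketAddr>,
    }

    // Re-pick a peer if none is set or the current one has gone unanswered too
    // long; mirrors the `if` at the top of update_duplicate_slot_repair_addr.
    fn maybe_repick(status: &mut Status, pick_best_peer: impl Fn() -> Option<SocketAddr>) {
        let now = timestamp();
        if status.repair_addr.is_none() || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS {
            status.repair_addr = pick_best_peer();
            status.start = timestamp();
        }
    }

    fn main() {
        let mut status = Status { start: 0, repair_addr: None };
        maybe_repick(&mut status, || "127.0.0.1:8001".parse().ok());
        assert!(status.repair_addr.is_some());
    }
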
@@ -522,6 +537,7 @@ impl RepairService {
blockstore: &Blockstore,
serve_repair: &ServeRepair,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
repair_validators: &Option<HashSet<Pubkey>>,
) {
for slot in new_duplicate_slots {
warn!(
@@ -547,7 +563,7 @@
             // Mark this slot as special repair, try to download from single
             // validator to avoid corruption
             let repair_pubkey_and_addr = serve_repair
-                .repair_request_duplicate_compute_best_peer(*slot, cluster_slots)
+                .repair_request_duplicate_compute_best_peer(*slot, cluster_slots, repair_validators)
                 .ok();
             let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
                 start: timestamp(),
@@ -955,6 +971,7 @@ mod test {
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
);
assert!(duplicate_slot_repair_statuses
.get(&dead_slot)
@@ -978,6 +995,7 @@
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
);
assert_eq!(duplicate_slot_repair_statuses.len(), 1);
assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
@@ -994,6 +1012,7 @@
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
);
assert!(duplicate_slot_repair_statuses.is_empty());
}
@@ -1028,6 +1047,7 @@ mod test {
&mut duplicate_status,
&cluster_slots,
&serve_repair,
&None,
);
assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
@@ -1041,6 +1061,7 @@
&mut duplicate_status,
&cluster_slots,
&serve_repair,
&None,
);
assert!(duplicate_status.repair_pubkey_and_addr.is_some());
@@ -1054,6 +1075,7 @@
&mut duplicate_status,
&cluster_slots,
&serve_repair,
&None,
);
assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
}
@@ -1109,6 +1131,7 @@ mod test {
&blockstore,
&serve_repair,
&reset_sender,
&None,
);
// Blockstore should have been cleared

core/src/retransmit_stage.rs

@@ -28,6 +28,7 @@ use solana_sdk::timing::timestamp;
use solana_streamer::streamer::PacketReceiver;
use std::{
cmp,
collections::hash_set::HashSet,
collections::{BTreeMap, HashMap},
net::UdpSocket,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
@@ -417,6 +418,7 @@ impl RetransmitStage {
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@@ -441,6 +443,7 @@
bank_forks,
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators,
};
let leader_schedule_cache = leader_schedule_cache.clone();
let window_service = WindowService::new(

core/src/serve_repair.rs

@@ -21,7 +21,7 @@ use solana_sdk::{
 };
 use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     net::SocketAddr,
     sync::atomic::{AtomicBool, Ordering},
     sync::{Arc, RwLock},
@@ -382,12 +382,13 @@ impl ServeRepair {
         repair_request: RepairType,
         cache: &mut RepairCache,
         repair_stats: &mut RepairStats,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
         let slot = repair_request.slot();
         if cache.get(&slot).is_none() {
-            let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
+            let repair_peers = self.repair_peers(&repair_validators, slot);
             if repair_peers.is_empty() {
                 return Err(ClusterInfoError::NoPeers.into());
             }
@@ -411,8 +412,9 @@
         &self,
         slot: Slot,
         cluster_slots: &ClusterSlots,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) -> Result<(Pubkey, SocketAddr)> {
-        let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
+        let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot);
         if repair_peers.is_empty() {
             return Err(ClusterInfoError::NoPeers.into());
         }
@@ -448,6 +450,27 @@
}
}
fn repair_peers(
&self,
repair_validators: &Option<HashSet<Pubkey>>,
slot: Slot,
) -> Vec<ContactInfo> {
if let Some(repair_validators) = repair_validators {
repair_validators
.iter()
.filter_map(|key| {
if *key != self.my_info.id {
self.cluster_info.lookup_contact_info(key, |ci| ci.clone())
} else {
None
}
})
.collect()
} else {
self.cluster_info.repair_peers(slot)
}
}
fn run_window_request(
recycler: &PacketsRecycler,
from: &ContactInfo,
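
The new repair_peers helper above is the crux of the change: with an allow-list it resolves gossip contact info only for the listed identities (skipping our own id and any key gossip does not know), otherwise it falls back to the previous behavior of asking gossip for all repair peers. A minimal sketch of the same selection, with gossip modeled as a plain HashMap and the slot-availability check omitted (ClusterInfo is not reproduced here):

    use std::collections::{HashMap, HashSet};

    // Stand-ins for Pubkey / ContactInfo, not the real solana types.
    type NodeId = u64;
    #[derive(Clone, Debug)]
    struct Contact {
        id: NodeId,
    }

    // Mirrors ServeRepair::repair_peers: restricted mode filters the allow-list
    // against gossip and drops our own id; unrestricted mode returns every
    // known peer other than us.
    fn repair_peers(
        my_id: NodeId,
        gossip: &HashMap<NodeId, Contact>,
        repair_validators: &Option<HashSet<NodeId>>,
    ) -> Vec<Contact> {
        match repair_validators {
            Some(allowed) => allowed
                .iter()
                .filter(|key| **key != my_id)
                .filter_map(|key| gossip.get(key).cloned())
                .collect(),
            None => gossip.values().filter(|c| c.id != my_id).cloned().collect(),
        }
    }

    fn main() {
        let gossip: HashMap<NodeId, Contact> =
            vec![(1, Contact { id: 1 }), (2, Contact { id: 2 })].into_iter().collect();
        // Allow-list holding an unknown node (3) and ourselves (1): nothing survives.
        let allow: Option<HashSet<NodeId>> = Some(vec![1, 3].into_iter().collect());
        assert!(repair_peers(1, &gossip, &allow).is_empty());
        // No allow-list: everyone in gossip except ourselves.
        assert_eq!(repair_peers(1, &gossip, &None).len(), 1);
    }
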
@@ -733,6 +756,7 @@ mod tests {
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
&None,
);
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
@@ -759,6 +783,7 @@
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
&None,
)
.unwrap();
assert_eq!(nxt.serve_repair, serve_repair_addr);
@@ -791,6 +816,7 @@
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
&None,
)
.unwrap();
if rv.0 == serve_repair_addr {
@@ -937,4 +963,71 @@
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_repair_with_repair_validators() {
let cluster_slots = ClusterSlots::default();
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(me.clone()));
// Insert two peers on the network
let contact_info2 = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
let contact_info3 = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
cluster_info.insert_info(contact_info2.clone());
cluster_info.insert_info(contact_info3.clone());
let serve_repair = ServeRepair::new(cluster_info);
// If:
// 1) repair validator set doesn't exist in gossip
// 2) repair validator set only includes our own id
// then no repairs should be generated
for pubkey in &[Pubkey::new_rand(), me.id] {
let trusted_validators = Some(vec![*pubkey].into_iter().collect());
assert!(serve_repair.repair_peers(&trusted_validators, 1).is_empty());
assert!(serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
&trusted_validators,
)
.is_err());
}
// If trusted validator exists in gossip, should return repair successfully
let trusted_validators = Some(vec![contact_info2.id].into_iter().collect());
let repair_peers = serve_repair.repair_peers(&trusted_validators, 1);
assert_eq!(repair_peers.len(), 1);
assert_eq!(repair_peers[0].id, contact_info2.id);
assert!(serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
&trusted_validators,
)
.is_ok());
// Using no trusted validators should default to all
// validators available in gossip, excluding myself
let repair_peers: HashSet<Pubkey> = serve_repair
.repair_peers(&None, 1)
.into_iter()
.map(|c| c.id)
.collect();
assert_eq!(repair_peers.len(), 2);
assert!(repair_peers.contains(&contact_info2.id));
assert!(repair_peers.contains(&contact_info3.id));
assert!(serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
&None,
)
.is_ok());
}
}

core/src/tvu.rs

@@ -65,6 +65,7 @@ pub struct TvuConfig {
pub shred_version: u16,
pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
pub trusted_validators: Option<HashSet<Pubkey>>,
pub repair_validators: Option<HashSet<Pubkey>>,
pub accounts_hash_fault_injection_slots: u64,
}
@@ -149,6 +150,7 @@ impl Tvu {
cluster_slots.clone(),
duplicate_slots_reset_sender,
verified_vote_receiver,
tvu_config.repair_validators,
);
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();

core/src/validator.rs

@@ -78,6 +78,7 @@ pub struct ValidatorConfig {
pub wait_for_supermajority: Option<Slot>,
pub new_hard_forks: Option<Vec<Slot>>,
pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all
pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
pub frozen_accounts: Vec<Pubkey>,
@@ -106,6 +107,7 @@ impl Default for ValidatorConfig {
wait_for_supermajority: None,
new_hard_forks: None,
trusted_validators: None,
repair_validators: None,
halt_on_trusted_validators_accounts_hash_mismatch: false,
accounts_hash_fault_injection_slots: 0,
frozen_accounts: vec![],
@@ -457,6 +459,7 @@
.halt_on_trusted_validators_accounts_hash_mismatch,
shred_version: node.info.shred_version,
trusted_validators: config.trusted_validators.clone(),
repair_validators: config.repair_validators.clone(),
accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
},
);
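
For programs that embed the validator rather than driving it through the CLI, the new field can be set directly on ValidatorConfig. A sketch assuming the module layout of this era (solana_core::validator::ValidatorConfig), with randomly generated stand-in keys:

    use solana_core::validator::ValidatorConfig;
    use solana_sdk::pubkey::Pubkey;
    use std::collections::HashSet;

    fn main() {
        // Stand-in identities; a real deployment would list operator-chosen pubkeys.
        let whitelist: HashSet<Pubkey> =
            vec![Pubkey::new_rand(), Pubkey::new_rand()].into_iter().collect();

        let mut config = ValidatorConfig::default();
        // None (the default) repairs from all peers; Some(...) restricts repair
        // requests to the listed validators only.
        config.repair_validators = Some(whitelist);
    }
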

validator/src/main.rs

@@ -357,6 +357,29 @@ fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
}
}
fn validators_set(
identity_pubkey: &Pubkey,
matches: &ArgMatches<'_>,
matches_name: &str,
arg_name: &str,
) -> Option<HashSet<Pubkey>> {
if matches.is_present(matches_name) {
let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
.into_iter()
.collect();
if validators_set.contains(identity_pubkey) {
eprintln!(
"The validator's identity pubkey cannot be a {}: {}",
arg_name, identity_pubkey
);
exit(1);
}
Some(validators_set)
} else {
None
}
}
fn check_genesis_hash(
genesis_config: &GenesisConfig,
expected_genesis_hash: Option<Hash>,
@@ -806,6 +829,16 @@ pub fn main() {
.takes_value(false)
.help("Use the RPC service of trusted validators only")
)
.arg(
Arg::with_name("repair_validators")
.long("repair-validator")
.validator(is_pubkey)
.value_name("PUBKEY")
.multiple(true)
.takes_value(true)
.help("A list of validators to request repairs from. If specified, repair will not \
request from validators outside this set [default: request repairs from all validators]")
)
.arg(
Arg::with_name("no_rocksdb_compaction")
.long("no-rocksdb-compaction")
@@ -908,22 +941,18 @@
         });
     let no_untrusted_rpc = matches.is_present("no_untrusted_rpc");
-    let trusted_validators = if matches.is_present("trusted_validators") {
-        let trusted_validators: HashSet<_> =
-            values_t_or_exit!(matches, "trusted_validators", Pubkey)
-                .into_iter()
-                .collect();
-        if trusted_validators.contains(&identity_keypair.pubkey()) {
-            eprintln!(
-                "The validator's identity pubkey cannot be a --trusted-validator: {}",
-                identity_keypair.pubkey()
-            );
-            exit(1);
-        }
-        Some(trusted_validators)
-    } else {
-        None
-    };
+    let trusted_validators = validators_set(
+        &identity_keypair.pubkey(),
+        &matches,
+        "trusted_validators",
+        "--trusted-validator",
+    );
+    let repair_validators = validators_set(
+        &identity_keypair.pubkey(),
+        &matches,
+        "repair_validators",
+        "--repair-validator",
+    );
let mut validator_config = ValidatorConfig {
dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
@@ -957,6 +986,7 @@
voting_disabled: matches.is_present("no_voting"),
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
trusted_validators,
repair_validators,
frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(),
no_rocksdb_compaction,
wal_recovery_mode,