Extract tower storage details from Tower struct
This commit is contained in:
@@ -1,39 +1,40 @@
|
||||
use crate::{
|
||||
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
|
||||
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
|
||||
progress_map::{LockoutIntervals, ProgressMap},
|
||||
};
|
||||
use chrono::prelude::*;
|
||||
use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_runtime::{
|
||||
bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE,
|
||||
vote_account::ArcVoteAccount,
|
||||
};
|
||||
use solana_sdk::{
|
||||
clock::{Slot, UnixTimestamp},
|
||||
hash::Hash,
|
||||
instruction::Instruction,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signature, Signer},
|
||||
slot_history::{Check, SlotHistory},
|
||||
};
|
||||
use solana_vote_program::{
|
||||
vote_instruction,
|
||||
vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY},
|
||||
};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{HashMap, HashSet},
|
||||
fs::{self, File},
|
||||
io::BufReader,
|
||||
ops::{
|
||||
Bound::{Included, Unbounded},
|
||||
Deref,
|
||||
use {
|
||||
crate::{
|
||||
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
|
||||
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
|
||||
progress_map::{LockoutIntervals, ProgressMap},
|
||||
},
|
||||
path::{Path, PathBuf},
|
||||
chrono::prelude::*,
|
||||
solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db},
|
||||
solana_runtime::{
|
||||
bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE,
|
||||
vote_account::ArcVoteAccount,
|
||||
},
|
||||
solana_sdk::{
|
||||
clock::{Slot, UnixTimestamp},
|
||||
hash::Hash,
|
||||
instruction::Instruction,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signature, Signer},
|
||||
slot_history::{Check, SlotHistory},
|
||||
},
|
||||
solana_vote_program::{
|
||||
vote_instruction,
|
||||
vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY},
|
||||
},
|
||||
std::{
|
||||
cmp::Ordering,
|
||||
collections::{HashMap, HashSet},
|
||||
fs::{self, File},
|
||||
io::BufReader,
|
||||
ops::{
|
||||
Bound::{Included, Unbounded},
|
||||
Deref,
|
||||
},
|
||||
path::PathBuf,
|
||||
},
|
||||
thiserror::Error,
|
||||
};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, AbiExample)]
|
||||
pub enum SwitchForkDecision {
|
||||
@@ -119,12 +120,6 @@ pub struct Tower {
|
||||
last_vote_tx_blockhash: Hash,
|
||||
last_timestamp: BlockTimestamp,
|
||||
#[serde(skip)]
|
||||
pub(crate) ledger_path: PathBuf,
|
||||
#[serde(skip)]
|
||||
path: PathBuf,
|
||||
#[serde(skip)]
|
||||
tmp_path: PathBuf, // used before atomic fs::rename()
|
||||
#[serde(skip)]
|
||||
// Restored last voted slot which cannot be found in SlotHistory at replayed root
|
||||
// (This is a special field for slashing-free validator restart with edge cases).
|
||||
// This could be emptied after some time; but left intact indefinitely for easier
|
||||
@@ -146,9 +141,6 @@ impl Default for Tower {
|
||||
last_vote: Vote::default(),
|
||||
last_timestamp: BlockTimestamp::default(),
|
||||
last_vote_tx_blockhash: Hash::default(),
|
||||
ledger_path: PathBuf::default(),
|
||||
path: PathBuf::default(),
|
||||
tmp_path: PathBuf::default(),
|
||||
stray_restored_slot: Option::default(),
|
||||
last_switch_threshold_check: Option::default(),
|
||||
};
|
||||
@@ -164,25 +156,15 @@ impl Tower {
|
||||
vote_account_pubkey: &Pubkey,
|
||||
root: Slot,
|
||||
bank: &Bank,
|
||||
ledger_path: &Path,
|
||||
) -> Self {
|
||||
let mut tower = Tower {
|
||||
ledger_path: ledger_path.into(),
|
||||
node_pubkey: *node_pubkey,
|
||||
..Tower::default()
|
||||
};
|
||||
tower.set_identity(*node_pubkey);
|
||||
tower.initialize_lockouts_from_bank(vote_account_pubkey, root, bank);
|
||||
tower
|
||||
}
|
||||
|
||||
fn set_identity(&mut self, node_pubkey: Pubkey) {
|
||||
let path = Self::get_filename(&self.ledger_path, &node_pubkey);
|
||||
let tmp_path = Self::get_tmp_filename(&path);
|
||||
self.node_pubkey = node_pubkey;
|
||||
self.path = path;
|
||||
self.tmp_path = tmp_path;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn new_for_tests(threshold_depth: usize, threshold_size: f64) -> Self {
|
||||
Self {
|
||||
@@ -194,7 +176,6 @@ impl Tower {
|
||||
|
||||
pub fn new_from_bankforks(
|
||||
bank_forks: &BankForks,
|
||||
ledger_path: &Path,
|
||||
node_pubkey: &Pubkey,
|
||||
vote_account: &Pubkey,
|
||||
) -> Self {
|
||||
@@ -216,7 +197,7 @@ impl Tower {
|
||||
)
|
||||
.clone();
|
||||
|
||||
Self::new(node_pubkey, vote_account, root, &heaviest_bank, ledger_path)
|
||||
Self::new(node_pubkey, vote_account, root, &heaviest_bank)
|
||||
}
|
||||
|
||||
pub(crate) fn collect_vote_lockouts<F>(
|
||||
@@ -1195,71 +1176,15 @@ impl Tower {
|
||||
self.vote_state.root_slot = Some(root);
|
||||
}
|
||||
|
||||
pub fn get_filename(path: &Path, node_pubkey: &Pubkey) -> PathBuf {
|
||||
path.join(format!("tower-{}", node_pubkey))
|
||||
.with_extension("bin")
|
||||
}
|
||||
|
||||
/// Returns `path` with its extension replaced by `bin.new`.
fn get_tmp_filename(path: &Path) -> PathBuf {
    let mut tmp_file = path.to_path_buf();
    tmp_file.set_extension("bin.new");
    tmp_file
}
|
||||
|
||||
pub fn save(&self, node_keypair: &Keypair) -> Result<()> {
|
||||
let mut measure = Measure::start("tower_save-ms");
|
||||
|
||||
if self.node_pubkey != node_keypair.pubkey() {
|
||||
return Err(TowerError::WrongTower(format!(
|
||||
"node_pubkey is {:?} but found tower for {:?}",
|
||||
node_keypair.pubkey(),
|
||||
self.node_pubkey
|
||||
)));
|
||||
}
|
||||
|
||||
let filename = &self.path;
|
||||
let new_filename = &self.tmp_path;
|
||||
{
|
||||
// overwrite anything if exists
|
||||
let mut file = File::create(&new_filename)?;
|
||||
let saved_tower = SavedTower::new(self, node_keypair)?;
|
||||
bincode::serialize_into(&mut file, &saved_tower)?;
|
||||
// file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
|
||||
}
|
||||
trace!("persisted votes: {:?}", self.voted_slots());
|
||||
fs::rename(&new_filename, &filename)?;
|
||||
// self.path.parent().sync_all() hurts performance same as the above sync
|
||||
|
||||
measure.stop();
|
||||
inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize);
|
||||
|
||||
pub fn save(&self, tower_storage: &dyn TowerStorage, node_keypair: &Keypair) -> Result<()> {
|
||||
let saved_tower = SavedTower::new(self, node_keypair)?;
|
||||
tower_storage.store(&saved_tower)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn restore(ledger_path: &Path, node_pubkey: &Pubkey) -> Result<Self> {
|
||||
let filename = Self::get_filename(ledger_path, node_pubkey);
|
||||
|
||||
// Ensure to create parent dir here, because restore() precedes save() always
|
||||
fs::create_dir_all(&filename.parent().unwrap())?;
|
||||
|
||||
let file = File::open(&filename)?;
|
||||
let mut stream = BufReader::new(file);
|
||||
|
||||
let saved_tower: SavedTower = bincode::deserialize_from(&mut stream)?;
|
||||
if !saved_tower.verify(node_pubkey) {
|
||||
return Err(TowerError::InvalidSignature);
|
||||
}
|
||||
let mut tower = saved_tower.deserialize()?;
|
||||
tower.ledger_path = ledger_path.into();
|
||||
tower.path = filename;
|
||||
tower.tmp_path = Self::get_tmp_filename(&tower.path);
|
||||
|
||||
// check that the tower actually belongs to this node
|
||||
if &tower.node_pubkey != node_pubkey {
|
||||
return Err(TowerError::WrongTower(format!(
|
||||
"node_pubkey is {:?} but found tower for {:?}",
|
||||
node_pubkey, tower.node_pubkey
|
||||
)));
|
||||
}
|
||||
Ok(tower)
|
||||
/// Loads the saved tower for `node_pubkey` from `tower_storage` and converts
/// it back into a `Tower`.
///
/// # Errors
/// Propagates any error from `TowerStorage::load` or
/// `SavedTower::try_into_tower`.
pub fn restore(tower_storage: &dyn TowerStorage, node_pubkey: &Pubkey) -> Result<Self> {
    tower_storage
        .load(node_pubkey)
        .and_then(|stored| stored.try_into_tower(node_pubkey))
}
|
||||
}
|
||||
|
||||
@@ -1300,26 +1225,104 @@ impl TowerError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Persistence backend for a node's `SavedTower`.
///
/// Implementors must be `Sync + Send`.
pub trait TowerStorage: Sync + Send {
|
||||
/// Loads the saved tower associated with `node_pubkey`.
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower>;
|
||||
/// Persists `saved_tower`.
fn store(&self, saved_tower: &SavedTower) -> Result<()>;
|
||||
}
|
||||
|
||||
/// `TowerStorage` backend that keeps saved towers as files on disk.
#[derive(Debug, Default, Clone, PartialEq)]
|
||||
pub struct FileTowerStorage {
|
||||
/// Directory that the per-node tower files are joined onto.
pub tower_path: PathBuf,
|
||||
}
|
||||
|
||||
impl FileTowerStorage {
|
||||
pub fn new(tower_path: PathBuf) -> Self {
|
||||
Self { tower_path }
|
||||
}
|
||||
|
||||
pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf {
|
||||
self.tower_path
|
||||
.join(format!("tower-{}", node_pubkey))
|
||||
.with_extension("bin")
|
||||
}
|
||||
}
|
||||
|
||||
impl TowerStorage for FileTowerStorage {
|
||||
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower> {
|
||||
let filename = self.filename(node_pubkey);
|
||||
trace!("load {}", filename.display());
|
||||
|
||||
// Ensure to create parent dir here, because restore() precedes save() always
|
||||
fs::create_dir_all(&filename.parent().unwrap())?;
|
||||
|
||||
let file = File::open(&filename)?;
|
||||
let mut stream = BufReader::new(file);
|
||||
bincode::deserialize_from(&mut stream).map_err(|e| e.into())
|
||||
}
|
||||
|
||||
fn store(&self, saved_tower: &SavedTower) -> Result<()> {
|
||||
let filename = self.filename(&saved_tower.node_pubkey);
|
||||
trace!("store: {}", filename.display());
|
||||
let new_filename = filename.with_extension("bin.new");
|
||||
|
||||
{
|
||||
// overwrite anything if exists
|
||||
let mut file = File::create(&new_filename)?;
|
||||
bincode::serialize_into(&mut file, saved_tower)?;
|
||||
// file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
|
||||
}
|
||||
fs::rename(&new_filename, &filename)?;
|
||||
// self.path.parent().sync_all() hurts performance same as the above sync
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A bincode-serialized `Tower` together with a signature over those bytes.
#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")]
|
||||
#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
|
||||
pub struct SavedTower {
|
||||
// Signature over `data`, produced by the keypair passed to `SavedTower::new`.
signature: Signature,
|
||||
// The bincode-serialized tower.
data: Vec<u8>,
|
||||
// Excluded from serialization, so a freshly deserialized value carries the
// field's default here.
#[serde(skip)]
|
||||
node_pubkey: Pubkey,
|
||||
}
|
||||
|
||||
impl SavedTower {
|
||||
pub fn new<T: Signer>(tower: &Tower, keypair: &T) -> Result<Self> {
|
||||
let node_pubkey = keypair.pubkey();
|
||||
if tower.node_pubkey != node_pubkey {
|
||||
return Err(TowerError::WrongTower(format!(
|
||||
"node_pubkey is {:?} but found tower for {:?}",
|
||||
node_pubkey, tower.node_pubkey
|
||||
)));
|
||||
}
|
||||
|
||||
let data = bincode::serialize(tower)?;
|
||||
let signature = keypair.sign_message(&data);
|
||||
Ok(Self { signature, data })
|
||||
Ok(Self {
|
||||
signature,
|
||||
data,
|
||||
node_pubkey,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn verify(&self, pubkey: &Pubkey) -> bool {
|
||||
self.signature.verify(pubkey.as_ref(), &self.data)
|
||||
}
|
||||
pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result<Tower> {
|
||||
// This method assumes that `self` was just deserialized
|
||||
assert_eq!(self.node_pubkey, Pubkey::default());
|
||||
|
||||
pub fn deserialize(&self) -> Result<Tower> {
|
||||
bincode::deserialize(&self.data).map_err(|e| e.into())
|
||||
if !self.signature.verify(node_pubkey.as_ref(), &self.data) {
|
||||
return Err(TowerError::InvalidSignature);
|
||||
}
|
||||
bincode::deserialize(&self.data)
|
||||
.map_err(|e| e.into())
|
||||
.and_then(|tower: Tower| {
|
||||
if tower.node_pubkey != *node_pubkey {
|
||||
return Err(TowerError::WrongTower(format!(
|
||||
"node_pubkey is {:?} but found tower for {:?}",
|
||||
node_pubkey, tower.node_pubkey
|
||||
)));
|
||||
}
|
||||
Ok(tower)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2518,20 +2521,20 @@ pub mod test {
|
||||
F: Fn(&mut Tower, &Pubkey),
|
||||
G: Fn(&PathBuf),
|
||||
{
|
||||
let dir = TempDir::new().unwrap();
|
||||
let tower_path = TempDir::new().unwrap();
|
||||
let identity_keypair = Arc::new(Keypair::new());
|
||||
let node_pubkey = identity_keypair.pubkey();
|
||||
|
||||
// Use values that will not match the default derived from BankForks
|
||||
let mut tower = Tower::new_for_tests(10, 0.9);
|
||||
tower.ledger_path = dir.path().to_path_buf();
|
||||
tower.path = Tower::get_filename(&tower.ledger_path, &identity_keypair.pubkey());
|
||||
tower.tmp_path = Tower::get_tmp_filename(&tower.path);
|
||||
|
||||
modify_original(&mut tower, &identity_keypair.pubkey());
|
||||
let tower_storage = FileTowerStorage::new(tower_path.path().to_path_buf());
|
||||
|
||||
tower.save(&identity_keypair).unwrap();
|
||||
modify_serialized(&tower.path);
|
||||
let loaded = Tower::restore(dir.path(), &identity_keypair.pubkey());
|
||||
modify_original(&mut tower, &node_pubkey);
|
||||
|
||||
tower.save(&tower_storage, &identity_keypair).unwrap();
|
||||
modify_serialized(&tower_storage.filename(&node_pubkey));
|
||||
let loaded = Tower::restore(&tower_storage, &node_pubkey);
|
||||
|
||||
(tower, loaded)
|
||||
}
|
||||
@@ -2760,8 +2763,9 @@ pub mod test {
|
||||
fn test_load_tower_wrong_identity() {
|
||||
let identity_keypair = Arc::new(Keypair::new());
|
||||
let tower = Tower::default();
|
||||
let tower_storage = FileTowerStorage::default();
|
||||
assert_matches!(
|
||||
tower.save(&identity_keypair),
|
||||
tower.save(&tower_storage, &identity_keypair),
|
||||
Err(TowerError::WrongTower(_))
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user