Move blocktree_processor to solana_ledger (#6460)
* Drop core::result dependency in bank_forks * Move blocktree_processor into solana_ledger
This commit is contained in:
@ -1,354 +0,0 @@
|
||||
//! The `bank_forks` module implments BankForks a DAG of checkpointed Banks
|
||||
|
||||
use crate::result::Result;
|
||||
use crate::snapshot_package::SnapshotPackageSender;
|
||||
use crate::snapshot_utils;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::inc_new_counter_info;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_runtime::status_cache::MAX_CACHE_ENTRIES;
|
||||
use solana_sdk::timing;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ops::Index;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct SnapshotConfig {
|
||||
// Generate a new snapshot every this many slots
|
||||
pub snapshot_interval_slots: usize,
|
||||
|
||||
// Where to store the latest packaged snapshot
|
||||
pub snapshot_package_output_path: PathBuf,
|
||||
|
||||
// Where to place the snapshots for recent slots
|
||||
pub snapshot_path: PathBuf,
|
||||
}
|
||||
|
||||
pub struct BankForks {
|
||||
pub banks: HashMap<u64, Arc<Bank>>,
|
||||
working_bank: Arc<Bank>,
|
||||
root: u64,
|
||||
pub snapshot_config: Option<SnapshotConfig>,
|
||||
last_snapshot_slot: u64,
|
||||
}
|
||||
|
||||
impl Index<u64> for BankForks {
|
||||
type Output = Arc<Bank>;
|
||||
fn index(&self, bank_slot: u64) -> &Arc<Bank> {
|
||||
&self.banks[&bank_slot]
|
||||
}
|
||||
}
|
||||
|
||||
impl BankForks {
|
||||
pub fn new(bank_slot: u64, bank: Bank) -> Self {
|
||||
let mut banks = HashMap::new();
|
||||
let working_bank = Arc::new(bank);
|
||||
banks.insert(bank_slot, working_bank.clone());
|
||||
Self {
|
||||
banks,
|
||||
working_bank,
|
||||
root: 0,
|
||||
snapshot_config: None,
|
||||
last_snapshot_slot: bank_slot,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a map of bank slot id to the set of ancestors for the bank slot.
|
||||
pub fn ancestors(&self) -> HashMap<u64, HashSet<u64>> {
|
||||
let mut ancestors = HashMap::new();
|
||||
let root = self.root;
|
||||
for bank in self.banks.values() {
|
||||
let mut set: HashSet<u64> = bank
|
||||
.ancestors
|
||||
.keys()
|
||||
.filter(|k| **k >= root)
|
||||
.cloned()
|
||||
.collect();
|
||||
set.remove(&bank.slot());
|
||||
ancestors.insert(bank.slot(), set);
|
||||
}
|
||||
ancestors
|
||||
}
|
||||
|
||||
/// Create a map of bank slot id to the set of all of its descendants
|
||||
#[allow(clippy::or_fun_call)]
|
||||
pub fn descendants(&self) -> HashMap<u64, HashSet<u64>> {
|
||||
let mut descendants = HashMap::new();
|
||||
for bank in self.banks.values() {
|
||||
let _ = descendants.entry(bank.slot()).or_insert(HashSet::new());
|
||||
let mut set: HashSet<u64> = bank.ancestors.keys().cloned().collect();
|
||||
set.remove(&bank.slot());
|
||||
for parent in set {
|
||||
descendants
|
||||
.entry(parent)
|
||||
.or_insert(HashSet::new())
|
||||
.insert(bank.slot());
|
||||
}
|
||||
}
|
||||
descendants
|
||||
}
|
||||
|
||||
pub fn frozen_banks(&self) -> HashMap<u64, Arc<Bank>> {
|
||||
self.banks
|
||||
.iter()
|
||||
.filter(|(_, b)| b.is_frozen())
|
||||
.map(|(k, b)| (*k, b.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn active_banks(&self) -> Vec<u64> {
|
||||
self.banks
|
||||
.iter()
|
||||
.filter(|(_, v)| !v.is_frozen())
|
||||
.map(|(k, _v)| *k)
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn get(&self, bank_slot: u64) -> Option<&Arc<Bank>> {
|
||||
self.banks.get(&bank_slot)
|
||||
}
|
||||
|
||||
pub fn new_from_banks(initial_forks: &[Arc<Bank>], rooted_path: Vec<u64>) -> Self {
|
||||
let mut banks = HashMap::new();
|
||||
let working_bank = initial_forks[0].clone();
|
||||
|
||||
// Iterate through the heads of all the different forks
|
||||
for bank in initial_forks {
|
||||
banks.insert(bank.slot(), bank.clone());
|
||||
let parents = bank.parents();
|
||||
for parent in parents {
|
||||
if banks.contains_key(&parent.slot()) {
|
||||
// All ancestors have already been inserted by another fork
|
||||
break;
|
||||
}
|
||||
banks.insert(parent.slot(), parent.clone());
|
||||
}
|
||||
}
|
||||
let root = *rooted_path.last().unwrap();
|
||||
Self {
|
||||
root,
|
||||
banks,
|
||||
working_bank,
|
||||
snapshot_config: None,
|
||||
last_snapshot_slot: root,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, bank: Bank) -> Arc<Bank> {
|
||||
let bank = Arc::new(bank);
|
||||
let prev = self.banks.insert(bank.slot(), bank.clone());
|
||||
assert!(prev.is_none());
|
||||
|
||||
self.working_bank = bank.clone();
|
||||
bank
|
||||
}
|
||||
|
||||
// TODO: really want to kill this...
|
||||
pub fn working_bank(&self) -> Arc<Bank> {
|
||||
self.working_bank.clone()
|
||||
}
|
||||
|
||||
pub fn set_root(&mut self, root: u64, snapshot_package_sender: &Option<SnapshotPackageSender>) {
|
||||
self.root = root;
|
||||
let set_root_start = Instant::now();
|
||||
let root_bank = self
|
||||
.banks
|
||||
.get(&root)
|
||||
.expect("root bank didn't exist in bank_forks");
|
||||
let root_tx_count = root_bank
|
||||
.parents()
|
||||
.last()
|
||||
.map(|bank| bank.transaction_count())
|
||||
.unwrap_or(0);
|
||||
|
||||
root_bank.squash();
|
||||
let new_tx_count = root_bank.transaction_count();
|
||||
|
||||
// Generate a snapshot if snapshots are configured and it's been an appropriate number
|
||||
// of banks since the last snapshot
|
||||
if self.snapshot_config.is_some() && snapshot_package_sender.is_some() {
|
||||
let config = self.snapshot_config.as_ref().unwrap();
|
||||
info!("setting snapshot root: {}", root);
|
||||
if root - self.last_snapshot_slot >= config.snapshot_interval_slots as u64 {
|
||||
let mut snapshot_time = Measure::start("total-snapshot-ms");
|
||||
let r = self.generate_snapshot(
|
||||
root,
|
||||
&root_bank.src.roots(),
|
||||
snapshot_package_sender.as_ref().unwrap(),
|
||||
snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path),
|
||||
);
|
||||
if r.is_err() {
|
||||
warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
|
||||
} else {
|
||||
self.last_snapshot_slot = root;
|
||||
}
|
||||
|
||||
// Cleanup outdated snapshots
|
||||
self.purge_old_snapshots();
|
||||
snapshot_time.stop();
|
||||
inc_new_counter_info!("total-snapshot-ms", snapshot_time.as_ms() as usize);
|
||||
}
|
||||
}
|
||||
|
||||
self.prune_non_root(root);
|
||||
|
||||
inc_new_counter_info!(
|
||||
"bank-forks_set_root_ms",
|
||||
timing::duration_as_ms(&set_root_start.elapsed()) as usize
|
||||
);
|
||||
inc_new_counter_info!(
|
||||
"bank-forks_set_root_tx_count",
|
||||
(new_tx_count - root_tx_count) as usize
|
||||
);
|
||||
}
|
||||
|
||||
pub fn root(&self) -> u64 {
|
||||
self.root
|
||||
}
|
||||
|
||||
pub fn purge_old_snapshots(&self) {
|
||||
// Remove outdated snapshots
|
||||
let config = self.snapshot_config.as_ref().unwrap();
|
||||
let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
|
||||
let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES);
|
||||
for slot_files in &slot_snapshot_paths[..num_to_remove] {
|
||||
let r = snapshot_utils::remove_snapshot(slot_files.slot, &config.snapshot_path);
|
||||
if r.is_err() {
|
||||
warn!("Couldn't remove snapshot at: {:?}", config.snapshot_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_snapshot<P: AsRef<Path>>(
|
||||
&self,
|
||||
root: u64,
|
||||
slots_to_snapshot: &[u64],
|
||||
snapshot_package_sender: &SnapshotPackageSender,
|
||||
tar_output_file: P,
|
||||
) -> Result<()> {
|
||||
let config = self.snapshot_config.as_ref().unwrap();
|
||||
|
||||
// Add a snapshot for the new root
|
||||
let bank = self
|
||||
.get(root)
|
||||
.cloned()
|
||||
.expect("root must exist in BankForks");
|
||||
|
||||
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
|
||||
snapshot_utils::add_snapshot(&config.snapshot_path, &bank)?;
|
||||
add_snapshot_time.stop();
|
||||
inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize);
|
||||
|
||||
// Package the relevant snapshots
|
||||
let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
|
||||
let latest_slot_snapshot_paths = slot_snapshot_paths
|
||||
.last()
|
||||
.expect("no snapshots found in config snapshot_path");
|
||||
// We only care about the last bank's snapshot.
|
||||
// We'll ask the bank for MAX_CACHE_ENTRIES (on the rooted path) worth of statuses
|
||||
let package = snapshot_utils::package_snapshot(
|
||||
&bank,
|
||||
latest_slot_snapshot_paths,
|
||||
tar_output_file,
|
||||
&config.snapshot_path,
|
||||
slots_to_snapshot,
|
||||
)?;
|
||||
|
||||
// Send the package to the packaging thread
|
||||
snapshot_package_sender.send(package)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prune_non_root(&mut self, root: u64) {
|
||||
let descendants = self.descendants();
|
||||
self.banks
|
||||
.retain(|slot, _| slot == &root || descendants[&root].contains(slot));
|
||||
}
|
||||
|
||||
pub fn set_snapshot_config(&mut self, snapshot_config: SnapshotConfig) {
|
||||
self.snapshot_config = Some(snapshot_config);
|
||||
}
|
||||
|
||||
pub fn snapshot_config(&self) -> &Option<SnapshotConfig> {
|
||||
&self.snapshot_config
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let mut bank_forks = BankForks::new(0, bank);
|
||||
let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
|
||||
child_bank.register_tick(&Hash::default());
|
||||
bank_forks.insert(child_bank);
|
||||
assert_eq!(bank_forks[1u64].tick_height(), 1);
|
||||
assert_eq!(bank_forks.working_bank().tick_height(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks_descendants() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let mut bank_forks = BankForks::new(0, bank);
|
||||
let bank0 = bank_forks[0].clone();
|
||||
let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
|
||||
bank_forks.insert(bank);
|
||||
let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);
|
||||
bank_forks.insert(bank);
|
||||
let descendants = bank_forks.descendants();
|
||||
let children: HashSet<u64> = [1u64, 2u64].to_vec().into_iter().collect();
|
||||
assert_eq!(children, *descendants.get(&0).unwrap());
|
||||
assert!(descendants[&1].is_empty());
|
||||
assert!(descendants[&2].is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks_ancestors() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let mut bank_forks = BankForks::new(0, bank);
|
||||
let bank0 = bank_forks[0].clone();
|
||||
let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
|
||||
bank_forks.insert(bank);
|
||||
let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);
|
||||
bank_forks.insert(bank);
|
||||
let ancestors = bank_forks.ancestors();
|
||||
assert!(ancestors[&0].is_empty());
|
||||
let parents: Vec<u64> = ancestors[&1].iter().cloned().collect();
|
||||
assert_eq!(parents, vec![0]);
|
||||
let parents: Vec<u64> = ancestors[&2].iter().cloned().collect();
|
||||
assert_eq!(parents, vec![0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks_frozen_banks() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let mut bank_forks = BankForks::new(0, bank);
|
||||
let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
|
||||
bank_forks.insert(child_bank);
|
||||
assert!(bank_forks.frozen_banks().get(&0).is_some());
|
||||
assert!(bank_forks.frozen_banks().get(&1).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks_active_banks() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let mut bank_forks = BankForks::new(0, bank);
|
||||
let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
|
||||
bank_forks.insert(child_bank);
|
||||
assert_eq!(bank_forks.active_banks(), vec![1]);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -12,7 +12,6 @@
|
||||
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
|
||||
//!
|
||||
//! Bank needs to provide an interface for us to query the stake weight
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::contact_info::ContactInfo;
|
||||
use crate::crds_gossip::CrdsGossip;
|
||||
use crate::crds_gossip_error::CrdsGossipError;
|
||||
@ -30,6 +29,7 @@ use itertools::Itertools;
|
||||
use rand::SeedableRng;
|
||||
use rand::{thread_rng, Rng};
|
||||
use rand_chacha::ChaChaRng;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_ledger::blocktree::Blocktree;
|
||||
use solana_ledger::staking_utils;
|
||||
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
|
||||
@ -1779,7 +1779,6 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::blocktree_processor::tests::fill_blocktree_slot_with_ticks;
|
||||
use crate::crds_value::CrdsValueLabel;
|
||||
use crate::repair_service::RepairType;
|
||||
use crate::result::Error;
|
||||
@ -1788,6 +1787,7 @@ mod tests {
|
||||
use solana_ledger::blocktree::get_tmp_ledger_path;
|
||||
use solana_ledger::blocktree::make_many_slot_entries;
|
||||
use solana_ledger::blocktree::Blocktree;
|
||||
use solana_ledger::blocktree_processor::fill_blocktree_slot_with_ticks;
|
||||
use solana_ledger::shred::{
|
||||
max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
|
||||
};
|
||||
|
@ -1,13 +1,13 @@
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::crds_value::EpochSlots;
|
||||
use crate::result::Result;
|
||||
use crate::rooted_slot_iterator::RootedSlotIterator;
|
||||
use crate::service::Service;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::SeedableRng;
|
||||
use rand_chacha::ChaChaRng;
|
||||
use solana_ledger::blocktree::Blocktree;
|
||||
use solana_ledger::rooted_slot_iterator::RootedSlotIterator;
|
||||
use solana_metrics::datapoint;
|
||||
use solana_sdk::{epoch_schedule::EpochSchedule, pubkey::Pubkey};
|
||||
use std::{
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::bank_forks::BankForks;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_metrics::datapoint_debug;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::account::Account;
|
||||
|
@ -1,12 +1,12 @@
|
||||
//! The `gossip_service` module implements the network control plane.
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::cluster_info::{ClusterInfo, VALIDATOR_PORT_RANGE};
|
||||
use crate::contact_info::ContactInfo;
|
||||
use crate::service::Service;
|
||||
use crate::streamer;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_client::thin_client::{create_client, ThinClient};
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_ledger::blocktree::Blocktree;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
|
@ -5,7 +5,6 @@
|
||||
//! command-line tools to spin up validators and a Rust library
|
||||
//!
|
||||
|
||||
pub mod bank_forks;
|
||||
pub mod banking_stage;
|
||||
pub mod broadcast_stage;
|
||||
pub mod chacha;
|
||||
@ -18,7 +17,6 @@ pub mod shred_fetch_stage;
|
||||
pub mod contact_info;
|
||||
pub mod blockstream;
|
||||
pub mod blockstream_service;
|
||||
pub mod blocktree_processor;
|
||||
pub mod cluster_info;
|
||||
pub mod cluster_info_repair_listener;
|
||||
pub mod consensus;
|
||||
@ -44,7 +42,6 @@ pub mod replay_stage;
|
||||
pub mod replicator;
|
||||
pub mod result;
|
||||
pub mod retransmit_stage;
|
||||
pub mod rooted_slot_iterator;
|
||||
pub mod rpc;
|
||||
pub mod rpc_pubsub;
|
||||
pub mod rpc_pubsub_service;
|
||||
@ -54,9 +51,7 @@ pub mod sendmmsg;
|
||||
pub mod service;
|
||||
pub mod sigverify;
|
||||
pub mod sigverify_stage;
|
||||
pub mod snapshot_package;
|
||||
pub mod snapshot_packager_service;
|
||||
pub mod snapshot_utils;
|
||||
pub mod storage_stage;
|
||||
pub mod streamer;
|
||||
pub mod test_tx;
|
||||
@ -70,8 +65,6 @@ pub mod window_service;
|
||||
#[macro_use]
|
||||
extern crate solana_budget_program;
|
||||
|
||||
extern crate solana_storage_program;
|
||||
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
extern crate hex_literal;
|
||||
@ -95,11 +88,3 @@ extern crate matches;
|
||||
|
||||
#[macro_use]
|
||||
extern crate solana_ledger;
|
||||
|
||||
extern crate bzip2;
|
||||
extern crate crossbeam_channel;
|
||||
extern crate dir_diff;
|
||||
extern crate fs_extra;
|
||||
extern crate symlink;
|
||||
extern crate tar;
|
||||
extern crate tempfile;
|
||||
|
@ -1,9 +1,10 @@
|
||||
//! The `repair_service` module implements the tools necessary to generate a thread which
|
||||
//! regularly finds missing blobs in the ledger and sends repair requests for those blobs
|
||||
use crate::{
|
||||
bank_forks::BankForks, cluster_info::ClusterInfo,
|
||||
cluster_info_repair_listener::ClusterInfoRepairListener, result::Result, service::Service,
|
||||
cluster_info::ClusterInfo, cluster_info_repair_listener::ClusterInfoRepairListener,
|
||||
result::Result, service::Service,
|
||||
};
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver, SlotMeta};
|
||||
use solana_sdk::{epoch_schedule::EpochSchedule, pubkey::Pubkey};
|
||||
use std::{
|
||||
|
@ -1,7 +1,5 @@
|
||||
//! The `replay_stage` replays transactions broadcast by the leader.
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::blocktree_processor;
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::confidence::{
|
||||
AggregateConfidenceService, ConfidenceAggregationData, ForkConfidenceCache,
|
||||
@ -12,10 +10,12 @@ use crate::poh_recorder::PohRecorder;
|
||||
use crate::result::{Error, Result};
|
||||
use crate::rpc_subscriptions::RpcSubscriptions;
|
||||
use crate::service::Service;
|
||||
use crate::snapshot_package::SnapshotPackageSender;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_ledger::blocktree::{Blocktree, BlocktreeError};
|
||||
use solana_ledger::blocktree_processor;
|
||||
use solana_ledger::entry::{Entry, EntrySlice};
|
||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||
use solana_ledger::snapshot_package::SnapshotPackageSender;
|
||||
use solana_metrics::{datapoint_warn, inc_new_counter_info};
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::hash::Hash;
|
||||
|
@ -3,12 +3,9 @@
|
||||
use crate::cluster_info;
|
||||
use crate::packet;
|
||||
use crate::poh_recorder;
|
||||
use crate::snapshot_utils;
|
||||
use bincode;
|
||||
use serde_json;
|
||||
use solana_ledger::blocktree;
|
||||
use solana_ledger::snapshot_utils;
|
||||
use solana_sdk::transaction;
|
||||
use std;
|
||||
use std::any::Any;
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -1,7 +1,6 @@
|
||||
//! The `retransmit_stage` retransmits blobs between validators
|
||||
|
||||
use crate::{
|
||||
bank_forks::BankForks,
|
||||
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
|
||||
repair_service::RepairStrategy,
|
||||
result::{Error, Result},
|
||||
@ -12,6 +11,7 @@ use crate::{
|
||||
use rand::SeedableRng;
|
||||
use rand_chacha::ChaChaRng;
|
||||
use solana_ledger::{
|
||||
bank_forks::BankForks,
|
||||
blocktree::{Blocktree, CompletedSlotsReceiver},
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
staking_utils,
|
||||
@ -262,11 +262,11 @@ impl Service for RetransmitStage {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::blocktree_processor::{process_blocktree, ProcessOptions};
|
||||
use crate::contact_info::ContactInfo;
|
||||
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
||||
use crate::packet::{Meta, Packet, Packets};
|
||||
use solana_ledger::blocktree::create_new_tmp_ledger;
|
||||
use solana_ledger::blocktree_processor::{process_blocktree, ProcessOptions};
|
||||
use solana_netutil::find_available_port_in_range;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
||||
|
@ -1,134 +0,0 @@
|
||||
use solana_ledger::blocktree::*;
|
||||
|
||||
pub struct RootedSlotIterator<'a> {
|
||||
next_slots: Vec<u64>,
|
||||
blocktree: &'a Blocktree,
|
||||
}
|
||||
|
||||
impl<'a> RootedSlotIterator<'a> {
|
||||
pub fn new(start_slot: u64, blocktree: &'a Blocktree) -> Result<Self> {
|
||||
if blocktree.is_root(start_slot) {
|
||||
Ok(Self {
|
||||
next_slots: vec![start_slot],
|
||||
blocktree,
|
||||
})
|
||||
} else {
|
||||
Err(BlocktreeError::SlotNotRooted)
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<'a> Iterator for RootedSlotIterator<'a> {
|
||||
type Item = (u64, SlotMeta);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// Clone b/c passing the closure to the map below requires exclusive access to
|
||||
// `self`, which is borrowed here if we don't clone.
|
||||
let rooted_slot = self
|
||||
.next_slots
|
||||
.iter()
|
||||
.find(|x| self.blocktree.is_root(**x))
|
||||
.cloned();
|
||||
|
||||
rooted_slot
|
||||
.map(|rooted_slot| {
|
||||
let slot_meta = self
|
||||
.blocktree
|
||||
.meta(rooted_slot)
|
||||
.expect("Database failure, couldnt fetch SlotMeta");
|
||||
|
||||
if slot_meta.is_none() {
|
||||
warn!("Rooted SlotMeta was deleted in between checking is_root and fetch");
|
||||
}
|
||||
|
||||
slot_meta.map(|slot_meta| {
|
||||
self.next_slots = slot_meta.next_slots.clone();
|
||||
(rooted_slot, slot_meta)
|
||||
})
|
||||
})
|
||||
.unwrap_or(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::blocktree_processor::tests::fill_blocktree_slot_with_ticks;
|
||||
use solana_sdk::hash::Hash;
|
||||
|
||||
#[test]
|
||||
fn test_rooted_slot_iterator() {
|
||||
let blocktree_path = get_tmp_ledger_path("test_rooted_slot_iterator");
|
||||
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||
blocktree.set_roots(&[0]).unwrap();
|
||||
let ticks_per_slot = 5;
|
||||
/*
|
||||
Build a blocktree in the ledger with the following fork structure:
|
||||
|
||||
slot 0
|
||||
|
|
||||
slot 1 <-- set_root(true)
|
||||
/ \
|
||||
slot 2 |
|
||||
/ |
|
||||
slot 3 |
|
||||
|
|
||||
slot 4
|
||||
|
||||
*/
|
||||
|
||||
// Fork 1, ending at slot 3
|
||||
let last_entry_hash = Hash::default();
|
||||
let fork_point = 1;
|
||||
let mut fork_hash = Hash::default();
|
||||
for slot in 0..=3 {
|
||||
let parent = {
|
||||
if slot == 0 {
|
||||
0
|
||||
} else {
|
||||
slot - 1
|
||||
}
|
||||
};
|
||||
let last_entry_hash = fill_blocktree_slot_with_ticks(
|
||||
&blocktree,
|
||||
ticks_per_slot,
|
||||
slot,
|
||||
parent,
|
||||
last_entry_hash,
|
||||
);
|
||||
|
||||
if slot == fork_point {
|
||||
fork_hash = last_entry_hash;
|
||||
}
|
||||
}
|
||||
|
||||
// Fork 2, ending at slot 4
|
||||
let _ =
|
||||
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 4, fork_point, fork_hash);
|
||||
|
||||
// Set a root
|
||||
blocktree.set_roots(&[1, 2, 3]).unwrap();
|
||||
|
||||
// Trying to get an iterator on a different fork will error
|
||||
assert!(RootedSlotIterator::new(4, &blocktree).is_err());
|
||||
|
||||
// Trying to get an iterator on any slot on the root fork should succeed
|
||||
let result: Vec<_> = RootedSlotIterator::new(3, &blocktree)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(slot, _)| slot)
|
||||
.collect();
|
||||
let expected = vec![3];
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let result: Vec<_> = RootedSlotIterator::new(0, &blocktree)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(slot, _)| slot)
|
||||
.collect();
|
||||
let expected = vec![0, 1, 2, 3];
|
||||
assert_eq!(result, expected);
|
||||
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
//! The `rpc` module implements the Solana RPC interface.
|
||||
|
||||
use crate::{
|
||||
bank_forks::BankForks,
|
||||
cluster_info::ClusterInfo,
|
||||
confidence::{BankConfidence, ForkConfidenceCache},
|
||||
contact_info::ContactInfo,
|
||||
@ -15,6 +14,7 @@ use jsonrpc_core::{Error, Metadata, Result};
|
||||
use jsonrpc_derive::rpc;
|
||||
use solana_client::rpc_request::{RpcEpochInfo, RpcVoteAccountInfo, RpcVoteAccountStatus};
|
||||
use solana_drone::drone::request_airdrop_transaction;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::{
|
||||
account::Account,
|
||||
|
@ -238,13 +238,13 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
||||
use jsonrpc_core::futures::sync::mpsc;
|
||||
use jsonrpc_core::Response;
|
||||
use jsonrpc_pubsub::{PubSubHandler, Session};
|
||||
use solana_budget_api;
|
||||
use solana_budget_api::budget_instruction;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
|
@ -1,14 +1,15 @@
|
||||
//! The `rpc_service` module implements the Solana JSON RPC service.
|
||||
|
||||
use crate::{
|
||||
bank_forks::BankForks, cluster_info::ClusterInfo, confidence::ForkConfidenceCache, rpc::*,
|
||||
service::Service, storage_stage::StorageState, validator::ValidatorExit,
|
||||
cluster_info::ClusterInfo, confidence::ForkConfidenceCache, rpc::*, service::Service,
|
||||
storage_stage::StorageState, validator::ValidatorExit,
|
||||
};
|
||||
use jsonrpc_core::MetaIoHandler;
|
||||
use jsonrpc_http_server::{
|
||||
hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
|
||||
RequestMiddlewareAction, ServerBuilder,
|
||||
};
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_sdk::hash::Hash;
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
|
@ -1,11 +1,11 @@
|
||||
//! The `pubsub` module implements a threaded subscription service on client RPC request
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use core::hash::Hash;
|
||||
use jsonrpc_core::futures::Future;
|
||||
use jsonrpc_pubsub::typed::Sink;
|
||||
use jsonrpc_pubsub::SubscriptionId;
|
||||
use serde::Serialize;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::account::Account;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
@ -1,36 +0,0 @@
|
||||
use solana_runtime::accounts_db::AccountStorageEntry;
|
||||
use solana_runtime::status_cache::SlotDelta;
|
||||
use solana_sdk::transaction::Result as TransactionResult;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
|
||||
pub type SnapshotPackageSender = Sender<SnapshotPackage>;
|
||||
pub type SnapshotPackageReceiver = Receiver<SnapshotPackage>;
|
||||
|
||||
pub struct SnapshotPackage {
|
||||
pub root: u64,
|
||||
pub slot_deltas: Vec<SlotDelta<TransactionResult<()>>>,
|
||||
pub snapshot_links: TempDir,
|
||||
pub storage_entries: Vec<Arc<AccountStorageEntry>>,
|
||||
pub tar_output_file: PathBuf,
|
||||
}
|
||||
|
||||
impl SnapshotPackage {
|
||||
pub fn new(
|
||||
root: u64,
|
||||
slot_deltas: Vec<SlotDelta<TransactionResult<()>>>,
|
||||
snapshot_links: TempDir,
|
||||
storage_entries: Vec<Arc<AccountStorageEntry>>,
|
||||
tar_output_file: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
root,
|
||||
slot_deltas,
|
||||
snapshot_links,
|
||||
storage_entries,
|
||||
tar_output_file,
|
||||
}
|
||||
}
|
||||
}
|
@ -1,8 +1,8 @@
|
||||
use crate::result::{Error, Result};
|
||||
use crate::service::Service;
|
||||
use crate::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver};
|
||||
use crate::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
|
||||
use bincode::serialize_into;
|
||||
use solana_ledger::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver};
|
||||
use solana_ledger::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::datapoint_info;
|
||||
use solana_runtime::status_cache::SlotDelta;
|
||||
@ -187,7 +187,7 @@ impl Service for SnapshotPackagerService {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::snapshot_utils;
|
||||
use solana_ledger::snapshot_utils;
|
||||
use solana_runtime::accounts_db::AccountStorageEntry;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Write;
|
||||
|
@ -1,326 +0,0 @@
|
||||
use crate::snapshot_package::SnapshotPackage;
|
||||
use bincode::{deserialize_from, serialize_into};
|
||||
use bzip2::bufread::BzDecoder;
|
||||
use fs_extra::dir::CopyOptions;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_runtime::status_cache::SlotDelta;
|
||||
use solana_sdk::transaction;
|
||||
use std::cmp::Ordering;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tar::Archive;
|
||||
|
||||
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
|
||||
pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
|
||||
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
|
||||
|
||||
#[derive(PartialEq, Ord, Eq, Debug)]
|
||||
pub struct SlotSnapshotPaths {
|
||||
pub slot: u64,
|
||||
pub snapshot_file_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SnapshotError {
|
||||
IO(std::io::Error),
|
||||
Serialize(std::boxed::Box<bincode::ErrorKind>),
|
||||
FsExtra(fs_extra::error::Error),
|
||||
}
|
||||
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
||||
|
||||
impl std::convert::From<std::io::Error> for SnapshotError {
|
||||
fn from(e: std::io::Error) -> SnapshotError {
|
||||
SnapshotError::IO(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<std::boxed::Box<bincode::ErrorKind>> for SnapshotError {
|
||||
fn from(e: std::boxed::Box<bincode::ErrorKind>) -> SnapshotError {
|
||||
SnapshotError::Serialize(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<fs_extra::error::Error> for SnapshotError {
|
||||
fn from(e: fs_extra::error::Error) -> SnapshotError {
|
||||
SnapshotError::FsExtra(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for SlotSnapshotPaths {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.slot.cmp(&other.slot))
|
||||
}
|
||||
}
|
||||
|
||||
impl SlotSnapshotPaths {
|
||||
fn copy_snapshot_directory<P: AsRef<Path>>(&self, snapshot_hardlink_dir: P) -> Result<()> {
|
||||
// Create a new directory in snapshot_hardlink_dir
|
||||
let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(self.slot.to_string());
|
||||
let _ = fs::remove_dir_all(&new_slot_hardlink_dir);
|
||||
fs::create_dir_all(&new_slot_hardlink_dir)?;
|
||||
|
||||
// Copy the snapshot
|
||||
fs::copy(
|
||||
&self.snapshot_file_path,
|
||||
&new_slot_hardlink_dir.join(self.slot.to_string()),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
bank: &Bank,
|
||||
snapshot_files: &SlotSnapshotPaths,
|
||||
snapshot_package_output_file: P,
|
||||
snapshot_path: Q,
|
||||
slots_to_snapshot: &[u64],
|
||||
) -> Result<SnapshotPackage> {
|
||||
// Hard link all the snapshots we need for this package
|
||||
let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?;
|
||||
|
||||
// Get a reference to all the relevant AccountStorageEntries
|
||||
let account_storage_entries: Vec<_> = bank
|
||||
.rc
|
||||
.get_storage_entries()
|
||||
.into_iter()
|
||||
.filter(|x| x.fork_id() <= bank.slot())
|
||||
.collect();
|
||||
|
||||
// Create a snapshot package
|
||||
info!(
|
||||
"Snapshot for bank: {} has {} account storage entries",
|
||||
bank.slot(),
|
||||
account_storage_entries.len()
|
||||
);
|
||||
|
||||
// Any errors from this point on will cause the above SnapshotPackage to drop, clearing
|
||||
// any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
|
||||
snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?;
|
||||
|
||||
let package = SnapshotPackage::new(
|
||||
bank.slot(),
|
||||
bank.src.slot_deltas(slots_to_snapshot),
|
||||
snapshot_hard_links_dir,
|
||||
account_storage_entries,
|
||||
snapshot_package_output_file.as_ref().to_path_buf(),
|
||||
);
|
||||
|
||||
Ok(package)
|
||||
}
|
||||
|
||||
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
|
||||
where
|
||||
P: std::fmt::Debug,
|
||||
{
|
||||
match fs::read_dir(&snapshot_path) {
|
||||
Ok(paths) => {
|
||||
let mut names = paths
|
||||
.filter_map(|entry| {
|
||||
entry.ok().and_then(|e| {
|
||||
e.path()
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().ok()))
|
||||
.unwrap_or(None)
|
||||
})
|
||||
})
|
||||
.map(|slot| {
|
||||
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
|
||||
SlotSnapshotPaths {
|
||||
slot,
|
||||
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<SlotSnapshotPaths>>();
|
||||
|
||||
names.sort();
|
||||
names
|
||||
}
|
||||
Err(err) => {
|
||||
info!(
|
||||
"Unable to read snapshot directory {:?}: {}",
|
||||
snapshot_path, err
|
||||
);
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()> {
|
||||
let slot = bank.slot();
|
||||
// snapshot_path/slot
|
||||
let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
|
||||
fs::create_dir_all(slot_snapshot_dir.clone())?;
|
||||
|
||||
// the snapshot is stored as snapshot_path/slot/slot
|
||||
let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
|
||||
info!(
|
||||
"creating snapshot {}, path: {:?}",
|
||||
bank.slot(),
|
||||
snapshot_file_path,
|
||||
);
|
||||
|
||||
let snapshot_file = File::create(&snapshot_file_path)?;
|
||||
// snapshot writer
|
||||
let mut snapshot_stream = BufWriter::new(snapshot_file);
|
||||
// Create the snapshot
|
||||
serialize_into(&mut snapshot_stream, &*bank)?;
|
||||
let mut bank_rc_serialize = Measure::start("bank_rc_serialize-ms");
|
||||
serialize_into(&mut snapshot_stream, &bank.rc)?;
|
||||
bank_rc_serialize.stop();
|
||||
inc_new_counter_info!("bank-rc-serialize-ms", bank_rc_serialize.as_ms() as usize);
|
||||
|
||||
info!(
|
||||
"successfully created snapshot {}, path: {:?}",
|
||||
bank.slot(),
|
||||
snapshot_file_path,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<()> {
|
||||
let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot);
|
||||
// Remove the snapshot directory for this slot
|
||||
fs::remove_dir_all(slot_snapshot_dir)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<u64> {
|
||||
let tempdir = tempfile::TempDir::new()?;
|
||||
untar_snapshot_in(&snapshot_tar, &tempdir)?;
|
||||
let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR);
|
||||
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
|
||||
let last_root_paths = snapshot_paths
|
||||
.last()
|
||||
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
|
||||
let file = File::open(&last_root_paths.snapshot_file_path)?;
|
||||
let mut stream = BufReader::new(file);
|
||||
let bank: Bank = deserialize_from(&mut stream)?;
|
||||
Ok(bank.slot())
|
||||
}
|
||||
|
||||
pub fn bank_from_archive<P: AsRef<Path>>(
|
||||
account_paths: String,
|
||||
snapshot_path: &PathBuf,
|
||||
snapshot_tar: P,
|
||||
) -> Result<Bank> {
|
||||
// Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
|
||||
let unpack_dir = tempfile::tempdir_in(snapshot_path)?;
|
||||
untar_snapshot_in(&snapshot_tar, &unpack_dir)?;
|
||||
|
||||
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
|
||||
let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
|
||||
let bank = rebuild_bank_from_snapshots(
|
||||
account_paths,
|
||||
&unpacked_snapshots_dir,
|
||||
unpacked_accounts_dir,
|
||||
)?;
|
||||
|
||||
if !bank.verify_hash_internal_state() {
|
||||
panic!("Snapshot bank failed to verify");
|
||||
}
|
||||
|
||||
// Move the unpacked snapshots into `snapshot_path`
|
||||
let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| {
|
||||
panic!(
|
||||
"Invalid snapshot path {:?}: {}",
|
||||
unpacked_snapshots_dir, err
|
||||
)
|
||||
});
|
||||
let paths: Vec<PathBuf> = dir_files
|
||||
.filter_map(|entry| entry.ok().map(|e| e.path()))
|
||||
.collect();
|
||||
let mut copy_options = CopyOptions::new();
|
||||
copy_options.overwrite = true;
|
||||
fs_extra::move_items(&paths, &snapshot_path, ©_options)?;
|
||||
|
||||
Ok(bank)
|
||||
}
|
||||
|
||||
pub fn get_snapshot_tar_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
|
||||
snapshot_output_dir.as_ref().join("snapshot.tar.bz2")
|
||||
}
|
||||
|
||||
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
snapshot_tar: P,
|
||||
unpack_dir: Q,
|
||||
) -> Result<()> {
|
||||
let tar_bz2 = File::open(snapshot_tar)?;
|
||||
let tar = BzDecoder::new(BufReader::new(tar_bz2));
|
||||
let mut archive = Archive::new(tar);
|
||||
archive.unpack(&unpack_dir)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rebuild_bank_from_snapshots<P>(
|
||||
local_account_paths: String,
|
||||
unpacked_snapshots_dir: &PathBuf,
|
||||
append_vecs_path: P,
|
||||
) -> Result<Bank>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
|
||||
if snapshot_paths.len() > 1 {
|
||||
return Err(get_io_error("invalid snapshot format"));
|
||||
}
|
||||
let root_paths = snapshot_paths
|
||||
.pop()
|
||||
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
|
||||
|
||||
// Rebuild the root bank
|
||||
info!("Loading from {:?}", &root_paths.snapshot_file_path);
|
||||
let file = File::open(&root_paths.snapshot_file_path)?;
|
||||
let mut stream = BufReader::new(file);
|
||||
let bank: Bank = deserialize_from(&mut stream)?;
|
||||
|
||||
// Rebuild accounts
|
||||
bank.rc
|
||||
.accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?;
|
||||
|
||||
// Rebuild status cache
|
||||
let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
|
||||
let status_cache = File::open(status_cache_path)?;
|
||||
let mut stream = BufReader::new(status_cache);
|
||||
let slot_deltas: Vec<SlotDelta<transaction::Result<()>>> =
|
||||
deserialize_from(&mut stream).unwrap_or_default();
|
||||
|
||||
bank.src.append(&slot_deltas);
|
||||
|
||||
Ok(bank)
|
||||
}
|
||||
|
||||
fn get_snapshot_file_name(slot: u64) -> String {
|
||||
slot.to_string()
|
||||
}
|
||||
|
||||
fn get_bank_snapshot_dir<P: AsRef<Path>>(path: P, slot: u64) -> PathBuf {
|
||||
path.as_ref().join(slot.to_string())
|
||||
}
|
||||
|
||||
fn get_io_error(error: &str) -> SnapshotError {
|
||||
warn!("Snapshot Error: {:?}", error);
|
||||
SnapshotError::IO(IOError::new(ErrorKind::Other, error))
|
||||
}
|
||||
|
||||
pub fn verify_snapshot_tar<P, Q, R>(snapshot_tar: P, snapshots_to_verify: Q, storages_to_verify: R)
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
R: AsRef<Path>,
|
||||
{
|
||||
let temp_dir = tempfile::TempDir::new().unwrap();
|
||||
let unpack_dir = temp_dir.path();
|
||||
untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap();
|
||||
|
||||
// Check snapshots are the same
|
||||
let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR);
|
||||
assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
|
||||
|
||||
// Check the account entries are the same
|
||||
let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR);
|
||||
assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap());
|
||||
}
|
@ -2,7 +2,6 @@
|
||||
// for storage mining. Replicators submit storage proofs, validator then bundles them
|
||||
// to submit its proof for mining to be rewarded.
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::contact_info::ContactInfo;
|
||||
@ -10,6 +9,7 @@ use crate::result::{Error, Result};
|
||||
use crate::service::Service;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use rand_chacha::ChaChaRng;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_ledger::blocktree::Blocktree;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_runtime::storage_utils::replicator_accounts;
|
||||
|
@ -12,7 +12,6 @@
|
||||
//! 4. StorageStage
|
||||
//! - Generating the keys used to encrypt the ledger and sample it for storage mining.
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::blockstream_service::BlockstreamService;
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::confidence::ForkConfidenceCache;
|
||||
@ -25,6 +24,7 @@ use crate::service::Service;
|
||||
use crate::shred_fetch_stage::ShredFetchStage;
|
||||
use crate::snapshot_packager_service::SnapshotPackagerService;
|
||||
use crate::storage_stage::{StorageStage, StorageState};
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver};
|
||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
@ -1,7 +1,5 @@
|
||||
//! The `validator` module hosts all the validator microservices.
|
||||
|
||||
use crate::bank_forks::{BankForks, SnapshotConfig};
|
||||
use crate::blocktree_processor::{self, BankForksInfo};
|
||||
use crate::broadcast_stage::BroadcastStageType;
|
||||
use crate::cluster_info::{ClusterInfo, Node};
|
||||
use crate::confidence::ForkConfidenceCache;
|
||||
@ -15,12 +13,14 @@ use crate::rpc_service::JsonRpcService;
|
||||
use crate::rpc_subscriptions::RpcSubscriptions;
|
||||
use crate::service::Service;
|
||||
use crate::sigverify;
|
||||
use crate::snapshot_utils;
|
||||
use crate::storage_stage::StorageState;
|
||||
use crate::tpu::Tpu;
|
||||
use crate::tvu::{Sockets, Tvu};
|
||||
use solana_ledger::bank_forks::{BankForks, SnapshotConfig};
|
||||
use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver};
|
||||
use solana_ledger::blocktree_processor::{self, BankForksInfo};
|
||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||
use solana_ledger::snapshot_utils;
|
||||
use solana_metrics::datapoint_info;
|
||||
use solana_sdk::clock::{Slot, DEFAULT_SLOTS_PER_TURN};
|
||||
use solana_sdk::genesis_block::GenesisBlock;
|
||||
|
Reference in New Issue
Block a user