Restart validator nodes from snapshots

This commit is contained in:
Sathish Ambley
2019-06-05 21:51:44 -07:00
committed by Michael Vines
parent dc5c6e7cf8
commit 7fd879b417
9 changed files with 193 additions and 174 deletions

View File

@ -33,7 +33,7 @@ use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fs::{create_dir_all, remove_dir_all};
use std::io::Cursor;
use std::io::{BufReader, Cursor, Error, ErrorKind, Read};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
@ -311,6 +311,44 @@ impl AccountsDB {
Self::new_with_file_size(paths, ACCOUNT_DATA_FILE_SIZE)
}
pub fn update_from_stream<R: Read>(
&self,
mut stream: &mut BufReader<R>,
) -> Result<(), std::io::Error> {
AppendVec::set_account_paths(&self.paths);
let _len: usize = deserialize_from(&mut stream)
.map_err(|_| AccountsDB::get_io_error("len deserialize error"))?;
let accounts_index: AccountsIndex<AccountInfo> = deserialize_from(&mut stream)
.map_err(|_| AccountsDB::get_io_error("accounts index deserialize error"))?;
let storage: AccountStorage = deserialize_from(&mut stream)
.map_err(|_| AccountsDB::get_io_error("storage deserialize error"))?;
let version: usize = deserialize_from(&mut stream)
.map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?;
let mut ids: Vec<usize> = storage
.0
.values()
.flat_map(HashMap::keys)
.cloned()
.collect();
ids.sort();
{
let mut index = self.accounts_index.write().unwrap();
index.account_maps.extend(accounts_index.account_maps);
let union = index.roots.union(&accounts_index.roots);
index.roots = union.cloned().collect();
index.last_root = accounts_index.last_root;
}
*self.storage.write().unwrap() = storage;
self.next_id
.store(ids[ids.len() - 1] + 1, Ordering::Relaxed);
self.write_version.store(version, Ordering::Relaxed);
self.generate_index();
Ok(())
}
fn new_storage_entry(&self, fork_id: Fork, path: &str) -> AccountStorageEntry {
AccountStorageEntry::new(
path,
@ -576,10 +614,17 @@ impl AccountsDB {
}
}
fn generate_index(&mut self) {
fn get_io_error(error: &str) -> Error {
warn!("AccountsDB error: {:?}", error);
Error::new(ErrorKind::Other, error)
}
fn generate_index(&self) {
let mut forks: Vec<Fork> = self.storage.read().unwrap().0.keys().cloned().collect();
forks.sort();
let mut accounts_index = self.accounts_index.write().unwrap();
accounts_index.roots.insert(0);
for fork_id in forks.iter() {
let mut accumulator: Vec<HashMap<Pubkey, (u64, AccountInfo)>> = self
.scan_account_storage(
@ -603,7 +648,6 @@ impl AccountsDB {
while let Some(maps) = accumulator.pop() {
AccountsDB::merge(&mut account_maps, &maps);
}
let mut accounts_index = self.accounts_index.write().unwrap();
for (pubkey, (_, account_info)) in account_maps.iter() {
accounts_index.add_index(*fork_id, pubkey, account_info.clone());
}
@ -618,90 +662,27 @@ impl Serialize for AccountsDB {
{
use serde::ser::Error;
let len = serialized_size(&self.accounts_index).unwrap()
+ serialized_size(&self.paths).unwrap()
+ serialized_size(&self.storage).unwrap()
+ std::mem::size_of::<u64>() as u64
+ serialized_size(&self.file_size).unwrap();
+ std::mem::size_of::<usize>() as u64;
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
serialize_into(&mut wr, &self.accounts_index).map_err(Error::custom)?;
serialize_into(&mut wr, &self.paths).map_err(Error::custom)?;
serialize_into(&mut wr, &self.storage).map_err(Error::custom)?;
serialize_into(
&mut wr,
&(self.write_version.load(Ordering::Relaxed) as u64),
)
.map_err(Error::custom)?;
serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}
struct AccountsDBVisitor;
impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor {
type Value = AccountsDB;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting AccountsDB")
}
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let accounts_index: RwLock<AccountsIndex<AccountInfo>> =
deserialize_from(&mut rd).map_err(Error::custom)?;
let paths: Vec<String> = deserialize_from(&mut rd).map_err(Error::custom)?;
let storage: RwLock<AccountStorage> = deserialize_from(&mut rd).map_err(Error::custom)?;
let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let mut ids: Vec<usize> = storage
.read()
.unwrap()
.0
.values()
.flat_map(HashMap::keys)
.cloned()
.collect();
ids.sort();
let mut accounts_db = AccountsDB {
accounts_index,
storage,
next_id: AtomicUsize::new(ids[ids.len() - 1] + 1),
write_version: AtomicUsize::new(write_version as usize),
paths,
file_size,
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap(),
};
accounts_db.generate_index();
Ok(accounts_db)
}
}
impl<'de> Deserialize<'de> for AccountsDB {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(AccountsDBVisitor)
}
}
#[cfg(test)]
mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use bincode::{deserialize_from, serialize_into, serialized_size};
use bincode::{serialize_into, serialized_size};
use maplit::hashmap;
use rand::{thread_rng, Rng};
use solana_sdk::account::Account;
@ -1008,6 +989,7 @@ mod tests {
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0[&0].len(), 1);
assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available);
assert_eq!(stores.0[&0][&0].count(), count);
stores.0[&0][&0].count() == count
}
@ -1257,8 +1239,9 @@ mod tests {
let mut writer = Cursor::new(&mut buf[..]);
serialize_into(&mut writer, &accounts).unwrap();
let mut reader = Cursor::new(&mut buf[..]);
let daccounts: AccountsDB = deserialize_from(&mut reader).unwrap();
let mut reader = BufReader::new(&buf[..]);
let daccounts = AccountsDB::new(&paths.paths);
assert!(daccounts.update_from_stream(&mut reader).is_ok());
check_accounts(&daccounts, &pubkeys, 0, 100, 2);
check_accounts(&daccounts, &pubkeys1, 1, 10, 1);
}