Rwlock storage opt (#9006)
* Remove unecessary account paths rwlock * Remove path rwlock in accounts_db and optimize storage critical section
This commit is contained in:
		@@ -83,11 +83,10 @@ impl Accounts {
 | 
			
		||||
    pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
 | 
			
		||||
        &self,
 | 
			
		||||
        stream: &mut BufReader<R>,
 | 
			
		||||
        local_paths: &[PathBuf],
 | 
			
		||||
        append_vecs_path: P,
 | 
			
		||||
    ) -> std::result::Result<(), IOError> {
 | 
			
		||||
        self.accounts_db
 | 
			
		||||
            .accounts_from_stream(stream, local_paths, append_vecs_path)
 | 
			
		||||
            .accounts_from_stream(stream, append_vecs_path)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Return true if the slice has any duplicate elements
 | 
			
		||||
@@ -1404,7 +1403,7 @@ mod tests {
 | 
			
		||||
        let (_accounts_dir, daccounts_paths) = get_temp_accounts_paths(2).unwrap();
 | 
			
		||||
        let daccounts = Accounts::new(daccounts_paths.clone());
 | 
			
		||||
        assert!(daccounts
 | 
			
		||||
            .accounts_from_stream(&mut reader, &daccounts_paths, copied_accounts.path())
 | 
			
		||||
            .accounts_from_stream(&mut reader, copied_accounts.path())
 | 
			
		||||
            .is_ok());
 | 
			
		||||
        check_accounts(&daccounts, &pubkeys, 100);
 | 
			
		||||
        assert_eq!(accounts.bank_hash_at(0), daccounts.bank_hash_at(0));
 | 
			
		||||
 
 | 
			
		||||
@@ -440,7 +440,7 @@ pub struct AccountsDB {
 | 
			
		||||
    write_version: AtomicUsize,
 | 
			
		||||
 | 
			
		||||
    /// Set of storage paths to pick from
 | 
			
		||||
    paths: RwLock<Vec<PathBuf>>,
 | 
			
		||||
    paths: Vec<PathBuf>,
 | 
			
		||||
 | 
			
		||||
    /// Directory of paths this accounts_db needs to hold/remove
 | 
			
		||||
    temp_paths: Option<Vec<TempDir>>,
 | 
			
		||||
@@ -469,7 +469,7 @@ impl Default for AccountsDB {
 | 
			
		||||
            storage: RwLock::new(AccountStorage(HashMap::new())),
 | 
			
		||||
            next_id: AtomicUsize::new(0),
 | 
			
		||||
            write_version: AtomicUsize::new(0),
 | 
			
		||||
            paths: RwLock::new(vec![]),
 | 
			
		||||
            paths: vec![],
 | 
			
		||||
            temp_paths: None,
 | 
			
		||||
            file_size: DEFAULT_FILE_SIZE,
 | 
			
		||||
            thread_pool: rayon::ThreadPoolBuilder::new()
 | 
			
		||||
@@ -486,7 +486,7 @@ impl AccountsDB {
 | 
			
		||||
    pub fn new(paths: Vec<PathBuf>) -> Self {
 | 
			
		||||
        let new = if !paths.is_empty() {
 | 
			
		||||
            Self {
 | 
			
		||||
                paths: RwLock::new(paths),
 | 
			
		||||
                paths,
 | 
			
		||||
                temp_paths: None,
 | 
			
		||||
                ..Self::default()
 | 
			
		||||
            }
 | 
			
		||||
@@ -495,14 +495,13 @@ impl AccountsDB {
 | 
			
		||||
            // for testing
 | 
			
		||||
            let (temp_dirs, paths) = get_temp_accounts_paths(DEFAULT_NUM_DIRS).unwrap();
 | 
			
		||||
            Self {
 | 
			
		||||
                paths: RwLock::new(paths),
 | 
			
		||||
                paths,
 | 
			
		||||
                temp_paths: Some(temp_dirs),
 | 
			
		||||
                ..Self::default()
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        {
 | 
			
		||||
            let paths = new.paths.read().unwrap();
 | 
			
		||||
            for path in paths.iter() {
 | 
			
		||||
            for path in new.paths.iter() {
 | 
			
		||||
                std::fs::create_dir_all(path).expect("Create directory failed.");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@@ -527,7 +526,6 @@ impl AccountsDB {
 | 
			
		||||
    pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
 | 
			
		||||
        &self,
 | 
			
		||||
        mut stream: &mut BufReader<R>,
 | 
			
		||||
        local_account_paths: &[PathBuf],
 | 
			
		||||
        append_vecs_path: P,
 | 
			
		||||
    ) -> Result<(), IOError> {
 | 
			
		||||
        let _len: usize =
 | 
			
		||||
@@ -542,8 +540,8 @@ impl AccountsDB {
 | 
			
		||||
            .map(|(slot_id, mut slot_storage)| {
 | 
			
		||||
                let mut new_slot_storage = HashMap::new();
 | 
			
		||||
                for (id, storage_entry) in slot_storage.drain() {
 | 
			
		||||
                    let path_index = thread_rng().gen_range(0, local_account_paths.len());
 | 
			
		||||
                    let local_dir = &local_account_paths[path_index];
 | 
			
		||||
                    let path_index = thread_rng().gen_range(0, self.paths.len());
 | 
			
		||||
                    let local_dir = &self.paths[path_index];
 | 
			
		||||
 | 
			
		||||
                    std::fs::create_dir_all(local_dir).expect("Create directory failed");
 | 
			
		||||
 | 
			
		||||
@@ -602,7 +600,6 @@ impl AccountsDB {
 | 
			
		||||
        self.bank_hashes.write().unwrap().insert(slot, bank_hash);
 | 
			
		||||
 | 
			
		||||
        // Process deserialized data, set necessary fields in self
 | 
			
		||||
        *self.paths.write().unwrap() = local_account_paths.to_vec();
 | 
			
		||||
        let max_id: usize = *storage
 | 
			
		||||
            .0
 | 
			
		||||
            .values()
 | 
			
		||||
@@ -938,22 +935,14 @@ impl AccountsDB {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn create_and_insert_store(&self, slot_id: Slot, size: u64) -> Arc<AccountStorageEntry> {
 | 
			
		||||
        let path_index = thread_rng().gen_range(0, self.paths.len());
 | 
			
		||||
        let store =
 | 
			
		||||
            Arc::new(self.new_storage_entry(slot_id, &Path::new(&self.paths[path_index]), size));
 | 
			
		||||
        let store_for_index = store.clone();
 | 
			
		||||
 | 
			
		||||
        let mut stores = self.storage.write().unwrap();
 | 
			
		||||
        let slot_storage = stores.0.entry(slot_id).or_insert_with(HashMap::new);
 | 
			
		||||
 | 
			
		||||
        self.create_store(slot_id, slot_storage, size)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn create_store(
 | 
			
		||||
        &self,
 | 
			
		||||
        slot_id: Slot,
 | 
			
		||||
        slot_storage: &mut SlotStores,
 | 
			
		||||
        size: u64,
 | 
			
		||||
    ) -> Arc<AccountStorageEntry> {
 | 
			
		||||
        let paths = self.paths.read().unwrap();
 | 
			
		||||
        let path_index = thread_rng().gen_range(0, paths.len());
 | 
			
		||||
        let store = Arc::new(self.new_storage_entry(slot_id, &Path::new(&paths[path_index]), size));
 | 
			
		||||
        slot_storage.insert(store.id, store.clone());
 | 
			
		||||
        slot_storage.insert(store.id, store_for_index);
 | 
			
		||||
        store
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -2332,12 +2321,11 @@ pub mod tests {
 | 
			
		||||
        let buf = writer.into_inner();
 | 
			
		||||
        let mut reader = BufReader::new(&buf[..]);
 | 
			
		||||
        let daccounts = AccountsDB::new(Vec::new());
 | 
			
		||||
        let local_paths = daccounts.paths.read().unwrap().clone();
 | 
			
		||||
        let copied_accounts = TempDir::new().unwrap();
 | 
			
		||||
        // Simulate obtaining a copy of the AppendVecs from a tarball
 | 
			
		||||
        copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
 | 
			
		||||
        daccounts
 | 
			
		||||
            .accounts_from_stream(&mut reader, &local_paths, copied_accounts.path())
 | 
			
		||||
            .accounts_from_stream(&mut reader, copied_accounts.path())
 | 
			
		||||
            .unwrap();
 | 
			
		||||
 | 
			
		||||
        print_count_and_status("daccounts", &daccounts);
 | 
			
		||||
 
 | 
			
		||||
@@ -101,13 +101,12 @@ impl BankRc {
 | 
			
		||||
    pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
 | 
			
		||||
        &self,
 | 
			
		||||
        mut stream: &mut BufReader<R>,
 | 
			
		||||
        local_paths: &[PathBuf],
 | 
			
		||||
        append_vecs_path: P,
 | 
			
		||||
    ) -> std::result::Result<(), IOError> {
 | 
			
		||||
        let _len: usize =
 | 
			
		||||
            deserialize_from(&mut stream).map_err(|e| BankRc::get_io_error(&e.to_string()))?;
 | 
			
		||||
        self.accounts
 | 
			
		||||
            .accounts_from_stream(stream, local_paths, append_vecs_path)?;
 | 
			
		||||
            .accounts_from_stream(stream, append_vecs_path)?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
@@ -4776,7 +4775,7 @@ mod tests {
 | 
			
		||||
        copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
 | 
			
		||||
        dbank
 | 
			
		||||
            .rc
 | 
			
		||||
            .accounts_from_stream(&mut reader, &dbank_paths, copied_accounts.path())
 | 
			
		||||
            .accounts_from_stream(&mut reader, copied_accounts.path())
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        assert_eq!(dbank.get_balance(&key1.pubkey()), 0);
 | 
			
		||||
        assert_eq!(dbank.get_balance(&key2.pubkey()), 10);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user