Handle duplicate AppendVec IDs (#20096)
When reconstructing the AccountsDb, if the storages came from a full snapshot and an incremental snapshot generated on different nodes, the AppendVec IDs can overlap or contain duplicates, which causes the reconstruction to fail. This commit handles the issue by unconditionally remapping the AppendVec ID for every AppendVec. Fixes #17088
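To make the approach concrete before the diff, here is a minimal, self-contained sketch of the idea; the `remap_ids` helper and the `AppendVecId` alias are illustrative stand-ins, not code from this commit. The serialized IDs are never trusted: every deserialized storage draws a fresh ID from a shared counter, so IDs coming from two different snapshot archives can never clash.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the real storage ID type.
type AppendVecId = usize;

/// Ignore the serialized IDs entirely and hand every storage a fresh ID from
/// a shared counter, returning (old, new) pairs.
fn remap_ids(serialized_ids: &[AppendVecId]) -> Vec<(AppendVecId, AppendVecId)> {
    let next_id = AtomicUsize::new(0);
    serialized_ids
        .iter()
        .map(|&old_id| (old_id, next_id.fetch_add(1, Ordering::Relaxed)))
        .collect()
}

fn main() {
    // A full snapshot from node A and an incremental snapshot from node B
    // can both contain an AppendVec with ID 42.
    let ids_from_both_snapshots = [40, 41, 42, 42, 43];
    let remapped = remap_ids(&ids_from_both_snapshots);
    let unique: HashSet<AppendVecId> = remapped.iter().map(|&(_, new)| new).collect();
    assert_eq!(unique.len(), ids_from_both_snapshots.len()); // no duplicates remain
}
```

Because the remapping is unconditional, reconstruction no longer depends on IDs being unique across the full and incremental snapshot archives.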
@@ -8669,11 +8669,6 @@ pub mod tests {
             accounts.write_version.load(Ordering::Relaxed)
         );
 
-        assert_eq!(
-            daccounts.next_id.load(Ordering::Relaxed),
-            accounts.next_id.load(Ordering::Relaxed)
-        );
-
         // Get the hash for the latest slot, which should be the only hash in the
         // bank_hashes map on the deserialized AccountsDb
         assert_eq!(daccounts.bank_hashes.read().unwrap().len(), 2);
@@ -22,6 +22,7 @@ use {
     log::*,
     rayon::prelude::*,
     serde::{de::DeserializeOwned, Deserialize, Serialize},
+    solana_measure::measure::Measure,
     solana_program_runtime::InstructionProcessor,
     solana_sdk::{
         clock::{Epoch, Slot, UnixTimestamp},
@@ -39,7 +40,10 @@ use {
         io::{self, BufReader, BufWriter, Read, Write},
         path::{Path, PathBuf},
         result::Result,
-        sync::{atomic::Ordering, Arc, RwLock},
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc, RwLock,
+        },
     },
 };
 
@@ -372,17 +376,19 @@ fn reconstruct_single_storage<E>(
     slot: &Slot,
     append_vec_path: &Path,
     storage_entry: &E,
+    remapped_append_vec_id: Option<AppendVecId>,
     new_slot_storage: &mut HashMap<AppendVecId, Arc<AccountStorageEntry>>,
 ) -> Result<(), Error>
 where
     E: SerializableStorage,
 {
+    let append_vec_id = remapped_append_vec_id.unwrap_or_else(|| storage_entry.id());
     let (accounts, num_accounts) =
         AppendVec::new_from_file(append_vec_path, storage_entry.current_len())?;
     let u_storage_entry =
-        AccountStorageEntry::new_existing(*slot, storage_entry.id(), accounts, num_accounts);
+        AccountStorageEntry::new_existing(*slot, append_vec_id, accounts, num_accounts);
 
-    new_slot_storage.insert(storage_entry.id(), Arc::new(u_storage_entry));
+    new_slot_storage.insert(append_vec_id, Arc::new(u_storage_entry));
     Ok(())
 }
 
@@ -427,6 +433,9 @@ where
     }
 
     // Remap the deserialized AppendVec paths to point to correct local paths
+    let num_collisions = AtomicUsize::new(0);
+    let next_append_vec_id = AtomicUsize::new(0);
+    let mut measure_remap = Measure::start("remap");
     let mut storage = (0..snapshot_storages.len())
         .into_par_iter()
         .map(|i| {
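The `num_collisions` and `next_append_vec_id` counters introduced above are shared across the rayon worker threads spawned by `into_par_iter()` in the next hunk. A standalone sketch follows (assuming only the `rayon` crate, which this file already imports) of why `fetch_add` with `Ordering::Relaxed` is enough to hand out unique IDs concurrently: the atomicity of the read-modify-write, not the memory ordering, is what guarantees uniqueness.

```rust
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    // One shared counter, as in the patch.
    let next_append_vec_id = AtomicUsize::new(0);

    // fetch_add is an atomic read-modify-write, so every parallel task gets
    // a distinct value even under Ordering::Relaxed; relaxed ordering only
    // gives up cross-variable ordering guarantees, not uniqueness.
    let ids: Vec<usize> = (0..1_000)
        .into_par_iter()
        .map(|_| next_append_vec_id.fetch_add(1, Ordering::Relaxed))
        .collect();

    let mut deduped = ids.clone();
    deduped.sort_unstable();
    deduped.dedup();
    assert_eq!(deduped.len(), 1_000); // all IDs are unique
}
```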
@@ -442,51 +451,94 @@ where
                     )
                 })?;
 
+                // Remap the AppendVec ID to handle any duplicate IDs that may have previously existed
+                // due to full snapshots and incremental snapshots generated from different nodes
+                let (remapped_append_vec_id, remapped_append_vec_path) = loop {
+                    let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::Relaxed);
+                    let remapped_file_name = AppendVec::file_name(*slot, remapped_append_vec_id);
+                    let remapped_append_vec_path =
+                        append_vec_path.parent().unwrap().join(&remapped_file_name);
+
+                    // Break out of the loop in the following situations:
+                    // 1. The new ID is the same as the original ID. This means we do not need to
+                    //    rename the file, since the ID is the "correct" one already.
+                    // 2. There is not a file already at the new path. This means it is safe to
+                    //    rename the file to this new path.
+                    //    **DEVELOPER NOTE:** Keep this check last so that it can short-circuit if
+                    //    possible.
+                    if storage_entry.id() == remapped_append_vec_id
+                        || std::fs::metadata(&remapped_append_vec_path).is_err()
+                    {
+                        break (remapped_append_vec_id, remapped_append_vec_path);
+                    }
+
+                    // If we made it this far, a file exists at the new path. Record the collision
+                    // and try again.
+                    num_collisions.fetch_add(1, Ordering::Relaxed);
+                };
+                // Only rename the file if the new ID is actually different from the original.
+                if storage_entry.id() != remapped_append_vec_id {
+                    std::fs::rename(append_vec_path, &remapped_append_vec_path)?;
+                }
+
                 reconstruct_single_storage(
                     slot,
-                    append_vec_path,
+                    &remapped_append_vec_path,
                     storage_entry,
+                    Some(remapped_append_vec_id),
                     &mut new_slot_storage,
                 )?;
             }
             Ok((*slot, new_slot_storage))
         })
         .collect::<Result<HashMap<Slot, _>, Error>>()?;
+    measure_remap.stop();
 
     // discard any slots with no storage entries
     // this can happen if a non-root slot was serialized
     // but non-root stores should not be included in the snapshot
     storage.retain(|_slot, stores| !stores.is_empty());
+    assert!(
+        !storage.is_empty(),
+        "At least one storage entry must exist from deserializing stream"
+    );
+
+    let next_append_vec_id = next_append_vec_id.load(Ordering::Relaxed);
+    let max_append_vec_id = next_append_vec_id - 1;
+    assert!(
+        max_append_vec_id <= AppendVecId::MAX / 2,
+        "Storage id {} larger than allowed max",
+        max_append_vec_id
+    );
 
+    // Process deserialized data, set necessary fields in self
     accounts_db
         .bank_hashes
         .write()
         .unwrap()
         .insert(snapshot_slot, snapshot_bank_hash_info);
 
-    // Process deserialized data, set necessary fields in self
-    let max_id: usize = *storage
-        .values()
-        .flat_map(HashMap::keys)
-        .max()
-        .expect("At least one storage entry must exist from deserializing stream");
-
-    {
-        accounts_db.storage.0.extend(
-            storage.into_iter().map(|(slot, slot_storage_entry)| {
-                (slot, Arc::new(RwLock::new(slot_storage_entry)))
-            }),
-        );
-    }
-
-    if max_id > AppendVecId::MAX / 2 {
-        panic!("Storage id {} larger than allowed max", max_id);
-    }
-
-    accounts_db.next_id.store(max_id + 1, Ordering::Relaxed);
+    accounts_db.storage.0.extend(
+        storage
+            .into_iter()
+            .map(|(slot, slot_storage_entry)| (slot, Arc::new(RwLock::new(slot_storage_entry)))),
+    );
+    accounts_db
+        .next_id
+        .store(next_append_vec_id, Ordering::Relaxed);
     accounts_db
         .write_version
         .fetch_add(snapshot_version, Ordering::Relaxed);
     accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index);
 
+    datapoint_info!(
+        "reconstruct_accountsdb_from_fields()",
+        ("remap-time-us", measure_remap.as_us(), i64),
+        (
+            "remap-collisions",
+            num_collisions.load(Ordering::Relaxed),
+            i64
+        ),
+    );
+
     Ok(accounts_db)
 }
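To see the break conditions of the remap loop in isolation, a self-contained model follows; the `remap` function and the `HashSet` standing in for the `std::fs::metadata` existence check are hypothetical, not part of the patch.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Model of the remap loop: draw candidate IDs until the candidate either
/// matches the original ID (no rename needed) or points at an unoccupied
/// "path" (safe to rename). Returns the chosen ID and the collision count.
fn remap(original_id: usize, occupied: &HashSet<usize>, next_id: &AtomicUsize) -> (usize, usize) {
    let mut collisions = 0;
    loop {
        let candidate = next_id.fetch_add(1, Ordering::Relaxed);
        if candidate == original_id || !occupied.contains(&candidate) {
            return (candidate, collisions);
        }
        // A file already sits at the candidate path; record it and try again.
        collisions += 1;
    }
}

fn main() {
    let next_id = AtomicUsize::new(0);
    // Pretend files named for IDs 0 and 1 already exist on disk.
    let occupied: HashSet<usize> = [0, 1].into_iter().collect();
    let (id, collisions) = remap(5, &occupied, &next_id);
    assert_eq!((id, collisions), (2, 2)); // skipped 0 and 1, settled on 2
}
```

The real loop additionally renames the file only when the drawn ID differs from the original, avoiding a pointless rename, and reports the collision count via the `remap-collisions` datapoint.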