automerge
This commit is contained in:
@ -58,8 +58,11 @@ impl BlockstreamService {
|
|||||||
let timeout = Duration::new(1, 0);
|
let timeout = Duration::new(1, 0);
|
||||||
let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?;
|
let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?;
|
||||||
|
|
||||||
let entries = blockstore.get_slot_entries(slot, 0, None).unwrap();
|
// Slot might not exist due to LedgerCleanupService, check first
|
||||||
let blockstore_meta = blockstore.meta(slot).unwrap().unwrap();
|
let blockstore_meta = blockstore.meta(slot).unwrap();
|
||||||
|
if let Some(blockstore_meta) = blockstore_meta {
|
||||||
|
// Return error to main loop. Thread won't exit, will just log the error
|
||||||
|
let entries = blockstore.get_slot_entries(slot, 0, None)?;
|
||||||
let _parent_slot = if slot == 0 {
|
let _parent_slot = if slot == 0 {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
@ -85,6 +88,7 @@ impl BlockstreamService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,9 +68,13 @@ impl LedgerCleanupService {
|
|||||||
let disk_utilization_pre = blockstore.storage_size();
|
let disk_utilization_pre = blockstore.storage_size();
|
||||||
|
|
||||||
let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
|
let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||||
|
|
||||||
|
// Notify blockstore of impending purge
|
||||||
if root > *next_purge_batch {
|
if root > *next_purge_batch {
|
||||||
//cleanup
|
//cleanup
|
||||||
blockstore.purge_slots(0, Some(root - max_ledger_slots));
|
let lowest_slot = root - max_ledger_slots;
|
||||||
|
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_slot;
|
||||||
|
blockstore.purge_slots(0, Some(lowest_slot));
|
||||||
*next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
|
*next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -527,6 +527,9 @@ impl ReplayStage {
|
|||||||
let tx_count = tx_count_after - tx_count_before;
|
let tx_count = tx_count_after - tx_count_before;
|
||||||
|
|
||||||
confirm_result.map_err(|err| {
|
confirm_result.map_err(|err| {
|
||||||
|
// LedgerCleanupService should not be cleaning up anything
|
||||||
|
// that comes after the root, so we should not see any
|
||||||
|
// errors related to the slot being purged
|
||||||
let slot = bank.slot();
|
let slot = bank.slot();
|
||||||
warn!("Fatal replay error in slot: {}, err: {:?}", slot, err);
|
warn!("Fatal replay error in slot: {}, err: {:?}", slot, err);
|
||||||
datapoint_error!(
|
datapoint_error!(
|
||||||
|
@ -383,7 +383,11 @@ impl JsonRpcRequestProcessor {
|
|||||||
let stakes = HashMap::new();
|
let stakes = HashMap::new();
|
||||||
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
|
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
|
||||||
|
|
||||||
Ok(self.blockstore.get_block_time(slot, slot_duration, stakes))
|
Ok(self
|
||||||
|
.blockstore
|
||||||
|
.get_block_time(slot, slot_duration, stakes)
|
||||||
|
.ok()
|
||||||
|
.unwrap_or(None))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,6 +90,7 @@ pub struct Blockstore {
|
|||||||
insert_shreds_lock: Arc<Mutex<()>>,
|
insert_shreds_lock: Arc<Mutex<()>>,
|
||||||
pub new_shreds_signals: Vec<SyncSender<bool>>,
|
pub new_shreds_signals: Vec<SyncSender<bool>>,
|
||||||
pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
|
pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
|
||||||
|
pub lowest_cleanup_slot: Arc<RwLock<u64>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct IndexMetaWorkingSetEntry {
|
pub struct IndexMetaWorkingSetEntry {
|
||||||
@ -207,7 +208,7 @@ impl Blockstore {
|
|||||||
|
|
||||||
measure.stop();
|
measure.stop();
|
||||||
info!("{:?} {}", blockstore_path, measure);
|
info!("{:?} {}", blockstore_path, measure);
|
||||||
Ok(Blockstore {
|
let blockstore = Blockstore {
|
||||||
db,
|
db,
|
||||||
meta_cf,
|
meta_cf,
|
||||||
dead_slots_cf,
|
dead_slots_cf,
|
||||||
@ -222,7 +223,9 @@ impl Blockstore {
|
|||||||
completed_slots_senders: vec![],
|
completed_slots_senders: vec![],
|
||||||
insert_shreds_lock: Arc::new(Mutex::new(())),
|
insert_shreds_lock: Arc::new(Mutex::new(())),
|
||||||
last_root,
|
last_root,
|
||||||
})
|
lowest_cleanup_slot: Arc::new(RwLock::new(0)),
|
||||||
|
};
|
||||||
|
Ok(blockstore)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn open_with_signal(
|
pub fn open_with_signal(
|
||||||
@ -1059,6 +1062,12 @@ impl Blockstore {
|
|||||||
to_index: u64,
|
to_index: u64,
|
||||||
buffer: &mut [u8],
|
buffer: &mut [u8],
|
||||||
) -> Result<(u64, usize)> {
|
) -> Result<(u64, usize)> {
|
||||||
|
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
||||||
|
// LedgerCleanupService
|
||||||
|
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
||||||
|
if *lowest_cleanup_slot > slot {
|
||||||
|
return Err(BlockstoreError::SlotCleanedUp);
|
||||||
|
}
|
||||||
let meta_cf = self.db.column::<cf::SlotMeta>();
|
let meta_cf = self.db.column::<cf::SlotMeta>();
|
||||||
let mut buffer_offset = 0;
|
let mut buffer_offset = 0;
|
||||||
let mut last_index = 0;
|
let mut last_index = 0;
|
||||||
@ -1288,14 +1297,26 @@ impl Blockstore {
|
|||||||
slot: Slot,
|
slot: Slot,
|
||||||
slot_duration: Duration,
|
slot_duration: Duration,
|
||||||
stakes: &HashMap<Pubkey, (u64, Account)>,
|
stakes: &HashMap<Pubkey, (u64, Account)>,
|
||||||
) -> Option<UnixTimestamp> {
|
) -> Result<Option<UnixTimestamp>> {
|
||||||
|
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
||||||
|
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
||||||
|
// LedgerCleanupService
|
||||||
|
if *lowest_cleanup_slot > slot {
|
||||||
|
return Err(BlockstoreError::SlotCleanedUp);
|
||||||
|
}
|
||||||
|
|
||||||
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
|
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
|
||||||
.get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE)
|
.get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
|
.flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration)
|
Ok(calculate_stake_weighted_timestamp(
|
||||||
|
unique_timestamps,
|
||||||
|
stakes,
|
||||||
|
slot,
|
||||||
|
slot_duration,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_timestamp_slots(
|
fn get_timestamp_slots(
|
||||||
@ -1346,6 +1367,12 @@ impl Blockstore {
|
|||||||
slot: Slot,
|
slot: Slot,
|
||||||
encoding: Option<RpcTransactionEncoding>,
|
encoding: Option<RpcTransactionEncoding>,
|
||||||
) -> Result<RpcConfirmedBlock> {
|
) -> Result<RpcConfirmedBlock> {
|
||||||
|
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
||||||
|
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
||||||
|
// LedgerCleanupService
|
||||||
|
if *lowest_cleanup_slot > slot {
|
||||||
|
return Err(BlockstoreError::SlotCleanedUp);
|
||||||
|
}
|
||||||
let encoding = encoding.unwrap_or(RpcTransactionEncoding::Json);
|
let encoding = encoding.unwrap_or(RpcTransactionEncoding::Json);
|
||||||
if self.is_root(slot) {
|
if self.is_root(slot) {
|
||||||
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
|
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
|
||||||
@ -1466,6 +1493,14 @@ impl Blockstore {
|
|||||||
if self.is_dead(slot) {
|
if self.is_dead(slot) {
|
||||||
return Err(BlockstoreError::DeadSlot);
|
return Err(BlockstoreError::DeadSlot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
||||||
|
// LedgerCleanupService
|
||||||
|
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
||||||
|
if *lowest_cleanup_slot > slot {
|
||||||
|
return Err(BlockstoreError::SlotCleanedUp);
|
||||||
|
}
|
||||||
|
|
||||||
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
|
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
|
||||||
let slot_meta = slot_meta_cf.get(slot)?;
|
let slot_meta = slot_meta_cf.get(slot)?;
|
||||||
if slot_meta.is_none() {
|
if slot_meta.is_none() {
|
||||||
@ -4886,10 +4921,11 @@ pub mod tests {
|
|||||||
})
|
})
|
||||||
.sum();
|
.sum();
|
||||||
expected_time /= total_stake;
|
expected_time /= total_stake;
|
||||||
assert_eq!(block_time_slot_3.unwrap() as u64, expected_time);
|
assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blockstore
|
blockstore
|
||||||
.get_block_time(8, slot_duration.clone(), &stakes)
|
.get_block_time(8, slot_duration.clone(), &stakes)
|
||||||
|
.unwrap()
|
||||||
.unwrap() as u64,
|
.unwrap() as u64,
|
||||||
expected_time + 2 // At 400ms block duration, 5 slots == 2sec
|
expected_time + 2 // At 400ms block duration, 5 slots == 2sec
|
||||||
);
|
);
|
||||||
|
@ -49,6 +49,7 @@ pub enum BlockstoreError {
|
|||||||
IO(#[from] std::io::Error),
|
IO(#[from] std::io::Error),
|
||||||
Serialize(#[from] Box<bincode::ErrorKind>),
|
Serialize(#[from] Box<bincode::ErrorKind>),
|
||||||
FsExtraError(#[from] fs_extra::error::Error),
|
FsExtraError(#[from] fs_extra::error::Error),
|
||||||
|
SlotCleanedUp,
|
||||||
}
|
}
|
||||||
pub(crate) type Result<T> = std::result::Result<T, BlockstoreError>;
|
pub(crate) type Result<T> = std::result::Result<T, BlockstoreError>;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user