Add newly completed slots signal to Blocktree (#4225)
* Add channel to blocktree for communicating when slots are completed
* Refactor RepairService options into a RepairStrategy
This commit is contained in:
@@ -28,7 +28,7 @@ use std::cmp;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::rc::Rc;
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
pub use self::meta::*;
|
||||
@@ -63,6 +63,10 @@ db_imports! {rocks, Rocks, "rocksdb"}
|
||||
#[cfg(feature = "kvstore")]
|
||||
db_imports! {kvs, Kvs, "kvstore"}
|
||||
|
||||
/// Bound passed to `sync_channel` when `Blocktree::open_with_signal` creates the
/// completed-slots channel.
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
|
||||
|
||||
/// Receiving half of the completed-slots channel handed out by
/// `Blocktree::open_with_signal`; each message is a batch of slot numbers.
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum BlocktreeError {
|
||||
BlobForIndexExists,
|
||||
@@ -83,6 +87,7 @@ pub struct Blocktree {
|
||||
batch_processor: Arc<RwLock<BatchProcessor>>,
|
||||
session: Arc<erasure::Session>,
|
||||
pub new_blobs_signals: Vec<SyncSender<bool>>,
|
||||
pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
|
||||
}
|
||||
|
||||
// Column family for metadata about a leader slot
|
||||
@@ -141,15 +146,21 @@ impl Blocktree {
|
||||
session,
|
||||
new_blobs_signals: vec![],
|
||||
batch_processor,
|
||||
completed_slots_senders: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver<bool>)> {
|
||||
pub fn open_with_signal(
|
||||
ledger_path: &str,
|
||||
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
|
||||
let mut blocktree = Self::open(ledger_path)?;
|
||||
let (signal_sender, signal_receiver) = sync_channel(1);
|
||||
let (completed_slots_sender, completed_slots_receiver) =
|
||||
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
|
||||
blocktree.new_blobs_signals = vec![signal_sender];
|
||||
blocktree.completed_slots_senders = vec![completed_slots_sender];
|
||||
|
||||
Ok((blocktree, signal_receiver))
|
||||
Ok((blocktree, signal_receiver, completed_slots_receiver))
|
||||
}
|
||||
|
||||
pub fn destroy(ledger_path: &str) -> Result<()> {
|
||||
@@ -340,11 +351,17 @@ impl Blocktree {
|
||||
// Handle chaining for the working set
|
||||
handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?;
|
||||
let mut should_signal = false;
|
||||
let mut newly_completed_slots = vec![];
|
||||
|
||||
// Check if any metadata was changed, if so, insert the new version of the
|
||||
// metadata into the write batch
|
||||
for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() {
|
||||
let meta: &SlotMeta = &RefCell::borrow(&*meta);
|
||||
if !self.completed_slots_senders.is_empty()
|
||||
&& is_newly_completed_slot(meta, meta_backup)
|
||||
{
|
||||
newly_completed_slots.push(*slot);
|
||||
}
|
||||
// Check if the working copy of the metadata has changed
|
||||
if Some(meta) != meta_backup.as_ref() {
|
||||
should_signal = should_signal || slot_has_updates(meta, &meta_backup);
|
||||
@@ -356,13 +373,38 @@ impl Blocktree {
|
||||
write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
|
||||
}
|
||||
|
||||
batch_processor.write(write_batch)?;
|
||||
|
||||
if should_signal {
|
||||
for signal in self.new_blobs_signals.iter() {
|
||||
for signal in &self.new_blobs_signals {
|
||||
let _ = signal.try_send(true);
|
||||
}
|
||||
}
|
||||
|
||||
batch_processor.write(write_batch)?;
|
||||
if !self.completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() {
|
||||
let mut slots: Vec<_> = (0..self.completed_slots_senders.len() - 1)
|
||||
.map(|_| newly_completed_slots.clone())
|
||||
.collect();
|
||||
|
||||
slots.push(newly_completed_slots);
|
||||
|
||||
for (signal, slots) in self.completed_slots_senders.iter().zip(slots.into_iter()) {
|
||||
let res = signal.try_send(slots);
|
||||
if let Err(TrySendError::Full(_)) = res {
|
||||
solana_metrics::submit(
|
||||
solana_metrics::influxdb::Point::new("blocktree_error")
|
||||
.add_field(
|
||||
"error",
|
||||
solana_metrics::influxdb::Value::String(
|
||||
"Unable to send newly completed slot because channel is full"
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
.to_owned(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -880,7 +922,7 @@ fn insert_data_blob<'a>(
|
||||
slot_meta.received = cmp::max(blob_index + 1, slot_meta.received);
|
||||
slot_meta.consumed = new_consumed;
|
||||
slot_meta.last_index = {
|
||||
// If the last slot hasn't been set before, then
|
||||
// If the last index in the slot hasn't been set before, then
|
||||
// set it to this blob index
|
||||
if slot_meta.last_index == std::u64::MAX {
|
||||
if blob_to_insert.is_last_in_slot() {
|
||||
@@ -1123,9 +1165,8 @@ fn handle_chaining_for_slot(
|
||||
.expect("Slot must exist in the working_set hashmap");
|
||||
|
||||
{
|
||||
let is_orphaned = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
|
||||
|
||||
let mut meta_mut = meta.borrow_mut();
|
||||
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
|
||||
|
||||
// If:
|
||||
// 1) This is a new slot
|
||||
@@ -1137,27 +1178,32 @@ fn handle_chaining_for_slot(
|
||||
// Check if the slot represented by meta_mut is either a new slot or an orphan.
|
||||
// In both cases we need to run the chaining logic b/c the parent on the slot was
|
||||
// previously unknown.
|
||||
if meta_backup.is_none() || is_orphaned {
|
||||
if meta_backup.is_none() || was_orphan_slot {
|
||||
let prev_slot_meta =
|
||||
find_slot_meta_else_create(db, working_set, new_chained_slots, prev_slot)?;
|
||||
|
||||
// This is a newly inserted slot so run the chaining logic
|
||||
// This is a newly inserted slot/orphan so run the chaining logic to link it to a
|
||||
// newly discovered parent
|
||||
chain_new_slot_to_prev_slot(&mut prev_slot_meta.borrow_mut(), slot, &mut meta_mut);
|
||||
|
||||
// If the parent of `slot` is a newly inserted orphan, insert it into the orphans
|
||||
// column family
|
||||
if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
|
||||
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point this slot has received a parent, so no longer a orphan
|
||||
if is_orphaned {
|
||||
// At this point this slot has received a parent, so it's no longer an orphan
|
||||
if was_orphan_slot {
|
||||
write_batch.delete::<cf::Orphans>(slot)?;
|
||||
}
|
||||
}
|
||||
|
||||
// This is a newly inserted slot and slot.is_connected is true, so update all
|
||||
// child slots so that their `is_connected` = true
|
||||
// If this is a newly inserted slot, then we know the children of this slot were not previously
|
||||
// connected to the trunk of the ledger. Thus if slot.is_connected is now true, we need to
|
||||
// update all child slots with `is_connected` = true because these children are also now newly
|
||||
// connected to the trunk of the ledger
|
||||
let should_propagate_is_connected =
|
||||
is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup)
|
||||
&& RefCell::borrow(&*meta).is_connected;
|
||||
@@ -1238,7 +1284,6 @@ fn chain_new_slot_to_prev_slot(
|
||||
/// Reports whether `slot_meta` describes a slot that has just become full.
///
/// Returns `true` only when `slot_meta.is_full()` holds and at least one of the
/// parenthesised conditions below is met: there is no backup metadata, the backup
/// metadata satisfies `is_orphan`, or `consumed` differs between the working copy
/// and the backup.
// NOTE(review): the enclosing hunk header (`-1238,7 +1284,6`) records one line
// removed from this region, but this listing carries no +/- markers, so one of the
// clauses below may be stale — confirm against the repository before relying on it.
fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotMeta>) -> bool {
|
||||
slot_meta.is_full()
|
||||
&& (backup_slot_meta.is_none()
|
||||
|| is_orphan(&backup_slot_meta.as_ref().unwrap())
|
||||
|| slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
|
||||
}
|
||||
|
||||
@@ -2112,7 +2157,7 @@ pub mod tests {
|
||||
pub fn test_new_blobs_signal() {
|
||||
// Initialize ledger
|
||||
let ledger_path = get_tmp_ledger_path("test_new_blobs_signal");
|
||||
let (ledger, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
|
||||
let (ledger, recvr, _) = Blocktree::open_with_signal(&ledger_path).unwrap();
|
||||
let ledger = Arc::new(ledger);
|
||||
|
||||
let entries_per_slot = 10;
|
||||
@@ -2188,6 +2233,98 @@ pub mod tests {
|
||||
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
// Verifies that the completed-slots receiver stays silent while a slot is missing
// a blob and yields the slot number once the final missing blob is inserted.
#[test]
pub fn test_completed_blobs_signal() {
    // Initialize ledger
    let ledger_path = get_tmp_ledger_path("test_completed_blobs_signal");
    let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
    let ledger = Arc::new(ledger);

    let entries_per_slot = 10;

    // Create blobs for slot 0
    let (blobs, _) = make_slot_entries(0, 0, entries_per_slot);

    // Insert all but the first blob in the slot, should not be considered complete
    ledger
        .insert_data_blobs(&blobs[1..entries_per_slot as usize])
        .unwrap();
    assert!(recvr.try_recv().is_err());

    // Insert first blob, slot should now be considered complete
    ledger.insert_data_blobs(once(&blobs[0])).unwrap();
    assert_eq!(recvr.try_recv().unwrap(), vec![0]);

    // Release the database handle, then remove the temporary ledger so repeated
    // test runs do not accumulate ledger directories.
    drop(ledger);
    Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
||||
|
||||
// Verifies completed-slot notifications for slots whose parents have not been
// inserted yet: slot 10 (child of 5) is filled first, then slot 5 (child of 2),
// and each must be reported exactly when its last missing blob arrives.
#[test]
pub fn test_completed_blobs_signal_orphans() {
    // Initialize ledger
    let ledger_path = get_tmp_ledger_path("test_completed_blobs_signal_orphans");
    let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
    let ledger = Arc::new(ledger);

    let entries_per_slot = 10;
    let slots = vec![2, 5, 10];
    let all_blobs = make_chaining_slot_entries(&slots[..], entries_per_slot);

    // Get the blobs for slot 5 chaining to slot 2
    let (ref orphan_blobs, _) = all_blobs[1];

    // Get the blobs for slot 10, chaining to slot 5
    let (ref orphan_child, _) = all_blobs[2];

    // Insert all but the first blob in the slot, should not be considered complete
    ledger
        .insert_data_blobs(&orphan_child[1..entries_per_slot as usize])
        .unwrap();
    assert!(recvr.try_recv().is_err());

    // Insert first blob, slot should now be considered complete
    ledger.insert_data_blobs(once(&orphan_child[0])).unwrap();
    assert_eq!(recvr.try_recv().unwrap(), vec![slots[2]]);

    // Insert the blobs for the orphan_slot
    ledger
        .insert_data_blobs(&orphan_blobs[1..entries_per_slot as usize])
        .unwrap();
    assert!(recvr.try_recv().is_err());

    // Insert first blob, slot should now be considered complete
    ledger.insert_data_blobs(once(&orphan_blobs[0])).unwrap();
    assert_eq!(recvr.try_recv().unwrap(), vec![slots[1]]);

    // Release the database handle, then remove the temporary ledger so repeated
    // test runs do not accumulate ledger directories.
    drop(ledger);
    Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
||||
|
||||
// Verifies that when the blobs of several slots are inserted in a single shuffled
// batch, every slot completed by that batch is reported in one message.
#[test]
pub fn test_completed_blobs_signal_many() {
    // Initialize ledger
    let ledger_path = get_tmp_ledger_path("test_completed_blobs_signal_many");
    let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
    let ledger = Arc::new(ledger);

    let entries_per_slot = 10;
    let mut slots = vec![2, 5, 10];
    let all_blobs = make_chaining_slot_entries(&slots[..], entries_per_slot);
    let disconnected_slot = 4;

    let (ref blobs0, _) = all_blobs[0];
    let (ref blobs1, _) = all_blobs[1];
    let (ref blobs2, _) = all_blobs[2];
    let (ref blobs3, _) = make_slot_entries(disconnected_slot, 1, entries_per_slot);

    let mut all_blobs: Vec<_> = vec![blobs0, blobs1, blobs2, blobs3]
        .into_iter()
        .flatten()
        .collect();

    // Insertion order must not matter, so randomize it.
    all_blobs.shuffle(&mut thread_rng());
    ledger.insert_data_blobs(all_blobs).unwrap();

    // The order of slots inside the message is not asserted; compare sorted lists.
    let mut result = recvr.try_recv().unwrap();
    result.sort();
    slots.push(disconnected_slot);
    slots.sort();
    assert_eq!(result, slots);

    // Release the database handle, then remove the temporary ledger so repeated
    // test runs do not accumulate ledger directories.
    drop(ledger);
    Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
||||
|
||||
#[test]
|
||||
pub fn test_handle_chaining_basic() {
|
||||
let blocktree_path = get_tmp_ledger_path("test_handle_chaining_basic");
|
||||
@@ -3375,4 +3512,28 @@ pub mod tests {
|
||||
|
||||
(blobs, entries)
|
||||
}
|
||||
|
||||
// Create blobs for slots that have a parent-child relationship defined by the input `chain`
|
||||
pub fn make_chaining_slot_entries(
|
||||
chain: &[u64],
|
||||
entries_per_slot: u64,
|
||||
) -> Vec<(Vec<Blob>, Vec<Entry>)> {
|
||||
let mut slots_blobs_and_entries = vec![];
|
||||
for (i, slot) in chain.iter().enumerate() {
|
||||
let parent_slot = {
|
||||
if *slot == 0 {
|
||||
0
|
||||
} else if i == 0 {
|
||||
std::u64::MAX
|
||||
} else {
|
||||
chain[i - 1]
|
||||
}
|
||||
};
|
||||
|
||||
let result = make_slot_entries(*slot, parent_slot, entries_per_slot);
|
||||
slots_blobs_and_entries.push(result);
|
||||
}
|
||||
|
||||
slots_blobs_and_entries
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user