Upgrade repair to be more intelligent and aggressive (bp #6789) (#6793)

automerge
This commit is contained in:
mergify[bot]
2019-11-07 21:36:53 -08:00
committed by Grimes
parent e599a90333
commit 80d780d666
12 changed files with 236 additions and 71 deletions

View File

@@ -20,10 +20,11 @@ use rocksdb::DBRawIterator;
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, datapoint_error};
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::clock::Slot;
use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SECOND};
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp;
use std::cell::RefCell;
use std::cmp;
use std::collections::HashMap;
@@ -41,6 +42,7 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
.unwrap()));
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub const MAX_TURBINE_PROPAGATION_DELAY_TICKS: u64 = 16;
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
@@ -833,6 +835,7 @@ impl Blocktree {
slot_meta,
index as u32,
new_consumed,
shred.reference_tick(),
);
data_index.set_present(index, true);
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
@@ -909,7 +912,7 @@ impl Blocktree {
},
|v| v,
);
let mut shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
let mut shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone(), 0)
.expect("Failed to create entry shredder");
let mut all_shreds = vec![];
let mut slot_entries = vec![];
@@ -932,8 +935,14 @@ impl Blocktree {
shredder.entries_to_shreds(&current_entries, true, start_index);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
.expect("Failed to create entry shredder");
shredder = Shredder::new(
current_slot,
parent_slot,
0.0,
keypair.clone(),
(ticks_per_slot - remaining_ticks_in_slot) as u8,
)
.expect("Failed to create entry shredder");
}
if entry.is_tick() {
@@ -970,7 +979,8 @@ impl Blocktree {
// for the slot with the specified slot
fn find_missing_indexes<C>(
db_iterator: &mut DBRawIterator,
slot: u64,
slot: Slot,
first_timestamp: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
@@ -983,6 +993,8 @@ impl Blocktree {
}
let mut missing_indexes = vec![];
let ticks_since_first_insert =
DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000;
// Seek to the first shred with index >= start_index
db_iterator.seek(&C::key((slot, start_index)));
@@ -1010,7 +1022,15 @@ impl Blocktree {
};
let upper_index = cmp::min(current_index, end_index);
// the tick that will be used to figure out the timeout for this hole
let reference_tick = u64::from(Shred::reference_tick_from_data(
&db_iterator.value().expect("couldn't read value"),
));
if ticks_since_first_insert < reference_tick + MAX_TURBINE_PROPAGATION_DELAY_TICKS {
// The higher index holes have not timed out yet
break 'outer;
}
for i in prev_index..upper_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
@@ -1035,7 +1055,8 @@ impl Blocktree {
pub fn find_missing_data_indexes(
&self,
slot: u64,
slot: Slot,
first_timestamp: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
@@ -1047,6 +1068,7 @@ impl Blocktree {
Self::find_missing_indexes::<cf::ShredData>(
&mut db_iterator,
slot,
first_timestamp,
start_index,
end_index,
max_missing,
@@ -1311,10 +1333,17 @@ fn update_slot_meta(
slot_meta: &mut SlotMeta,
index: u32,
new_consumed: u64,
reference_tick: u8,
) {
let maybe_first_insert = slot_meta.received == 0;
// Index is zero-indexed, while the "received" height starts from 1,
// so received = index + 1 for the same shred.
slot_meta.received = cmp::max((u64::from(index) + 1) as u64, slot_meta.received);
if maybe_first_insert && slot_meta.received > 0 {
// predict the timestamp of what would have been the first shred in this slot
let slot_time_elapsed = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND;
slot_meta.first_shred_timestamp = timestamp() - slot_time_elapsed;
}
slot_meta.consumed = new_consumed;
slot_meta.last_index = {
// If the last index in the slot hasn't been set before, then
@@ -1707,7 +1736,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re
let entries = create_ticks(ticks_per_slot, genesis_block.hash());
let last_hash = entries.last().unwrap().hash;
let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()))
let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()), 0)
.expect("Failed to create entry shredder");
let shreds = shredder.entries_to_shreds(&entries, true, 0).0;
assert!(shreds.last().unwrap().last_in_slot());
@@ -1792,7 +1821,7 @@ pub fn entries_to_test_shreds(
parent_slot: u64,
is_full_slot: bool,
) -> Vec<Shred> {
let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()))
let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()), 0)
.expect("Failed to create entry shredder");
shredder.entries_to_shreds(&entries, is_full_slot, 0).0
@@ -3163,27 +3192,27 @@ pub mod tests {
// range of [0, gap)
let expected: Vec<u64> = (1..gap).collect();
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
blocktree.find_missing_data_indexes(slot, 0, 0, gap, gap as usize),
expected
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
blocktree.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
blocktree.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize),
vec![gap - 2, gap - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, 1),
vec![gap - 2],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, 1),
blocktree.find_missing_data_indexes(slot, 0, 0, gap, 1),
vec![1],
);
@@ -3192,11 +3221,11 @@ pub mod tests {
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
@@ -3212,6 +3241,7 @@ pub mod tests {
assert_eq!(
blocktree.find_missing_data_indexes(
slot,
0,
j * gap,
i * gap,
((i - j) * gap) as usize
@@ -3225,6 +3255,34 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_find_missing_data_indexes_timeout() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Write entries
let gap: u64 = 10;
let shreds: Vec<_> = (0..64)
.map(|i| Shred::new_from_data(slot, (i * gap) as u32, 0, None, false, false, i as u8))
.collect();
blocktree.insert_shreds(shreds, None).unwrap();
let empty: Vec<u64> = vec![];
assert_eq!(
blocktree.find_missing_data_indexes(slot, timestamp(), 0, 50, 1),
empty
);
let expected: Vec<_> = (1..=9).collect();
assert_eq!(
blocktree.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9),
expected
);
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_find_missing_data_indexes_sanity() {
let slot = 0;
@@ -3234,10 +3292,10 @@ pub mod tests {
// Early exit conditions
let empty: Vec<u64> = vec![];
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 0, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 5, 5, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 4, 3, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 1, 2, 0), empty);
let entries = create_ticks(100, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
@@ -3261,7 +3319,7 @@ pub mod tests {
// [i, first_index - 1]
for start in 0..STARTS {
let result = blocktree.find_missing_data_indexes(
slot, start, // start
slot, 0, start, // start
END, //end
MAX, //max
);
@@ -3291,7 +3349,7 @@ pub mod tests {
for i in 0..num_shreds as u64 {
for j in 0..i {
assert_eq!(
blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
blocktree.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize),
empty
);
}
@@ -3816,6 +3874,7 @@ pub mod tests {
Some(&[1, 1, 1]),
true,
true,
0,
)];
// With the corruption, nothing should be returned, even though an

View File

@@ -12,11 +12,13 @@ pub struct SlotMeta {
// The total number of consecutive blobs starting from index 0
// we have received for this slot.
pub consumed: u64,
// The index *plus one* of the highest blob received for this slot. Useful
// for checking if the slot has received any blobs yet, and to calculate the
// The index *plus one* of the highest shred received for this slot. Useful
// for checking if the slot has received any shreds yet, and to calculate the
// range where there is one or more holes: `(consumed..received)`.
pub received: u64,
// The index of the blob that is flagged as the last blob for this slot.
// The timestamp of the first time a shred was added for this slot
pub first_shred_timestamp: u64,
// The index of the shred that is flagged as the last shred for this slot.
pub last_index: u64,
// The slot height of the block this one derives from.
pub parent_slot: u64,
@@ -31,7 +33,7 @@ pub struct SlotMeta {
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
/// Index recording presence/absence of blobs
/// Index recording presence/absence of shreds
pub struct Index {
pub slot: u64,
data: DataIndex,
@@ -40,14 +42,14 @@ pub struct Index {
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct DataIndex {
/// Map representing presence/absence of data blobs
/// Map representing presence/absence of data shreds
index: BTreeSet<u64>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
/// Erasure coding information
pub struct CodingIndex {
/// Map from set index, to hashmap from blob index to presence bool
/// Map from set index, to hashmap from shred index to presence bool
index: BTreeSet<u64>,
}
@@ -145,8 +147,8 @@ impl DataIndex {
impl SlotMeta {
pub fn is_full(&self) -> bool {
// last_index is std::u64::MAX when it has no information about how
// many blobs will fill this slot.
// Note: A full slot with zero blobs is not possible.
// many shreds will fill this slot.
// Note: A full slot with zero shreds is not possible.
if self.last_index == std::u64::MAX {
return false;
}
@@ -179,6 +181,7 @@ impl SlotMeta {
slot,
consumed: 0,
received: 0,
first_shred_timestamp: 0,
parent_slot,
next_slots: vec![],
is_connected: slot == 0,

View File

@@ -8,12 +8,15 @@ use rayon::ThreadPool;
use serde::{Deserialize, Serialize};
use solana_metrics::datapoint_debug;
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::hash::Hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use std::sync::Arc;
use std::time::Instant;
use solana_sdk::{
clock::Slot,
hash::Hash,
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, KeypairUtil, Signature},
};
use std::mem::size_of;
use std::{sync::Arc, time::Instant};
/// The following constants are computed by hand, and hardcoded.
/// `test_shred_constants` ensures that the values are correct.
@@ -41,8 +44,9 @@ pub const CODING_SHRED: u8 = 0b0101_1010;
pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;
pub const RECOMMENDED_FEC_RATE: f32 = 1.0;
const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
pub const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;
pub const SHRED_TICK_REFERENCE_MASK: u8 = 0b0011_1111;
const LAST_SHRED_IN_SLOT: u8 = 0b1000_0000;
pub const DATA_COMPLETE_SHRED: u8 = 0b0100_0000;
#[derive(Debug)]
pub enum ShredError {
@@ -131,6 +135,7 @@ impl Shred {
data: Option<&[u8]>,
is_last_data: bool,
is_last_in_slot: bool,
reference_tick: u8,
) -> Self {
let mut payload = vec![0; PACKET_DATA_SIZE];
let mut common_header = ShredCommonHeader::default();
@@ -139,6 +144,7 @@ impl Shred {
let mut data_header = DataShredHeader::default();
data_header.parent_offset = parent_offset;
data_header.flags = reference_tick.min(SHRED_TICK_REFERENCE_MASK);
if is_last_data {
data_header.flags |= DATA_COMPLETE_SHRED
@@ -324,6 +330,19 @@ impl Shred {
}
}
pub fn reference_tick(&self) -> u8 {
if self.is_data() {
self.data_header.flags & SHRED_TICK_REFERENCE_MASK
} else {
SHRED_TICK_REFERENCE_MASK
}
}
pub fn reference_tick_from_data(data: &[u8]) -> u8 {
let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::<u8>()];
flags & SHRED_TICK_REFERENCE_MASK
}
pub fn verify(&self, pubkey: &Pubkey) -> bool {
self.signature()
.verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..])
@@ -337,10 +356,17 @@ pub struct Shredder {
fec_rate: f32,
keypair: Arc<Keypair>,
pub signing_coding_time: u128,
reference_tick: u8,
}
impl Shredder {
pub fn new(slot: u64, parent_slot: u64, fec_rate: f32, keypair: Arc<Keypair>) -> Result<Self> {
pub fn new(
slot: Slot,
parent_slot: Slot,
fec_rate: f32,
keypair: Arc<Keypair>,
reference_tick: u8,
) -> Result<Self> {
if fec_rate > 1.0 || fec_rate < 0.0 {
Err(ShredError::InvalidFecRate(fec_rate))
} else if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) {
@@ -352,6 +378,7 @@ impl Shredder {
fec_rate,
keypair,
signing_coding_time: 0,
reference_tick,
})
}
}
@@ -395,6 +422,7 @@ impl Shredder {
Some(shred_data),
is_last_data,
is_last_in_slot,
self.reference_tick,
);
Shredder::sign_shred(&self.keypair, &mut shred);
@@ -797,7 +825,7 @@ pub mod tests {
// Test that parent cannot be > current slot
assert_matches!(
Shredder::new(slot, slot + 1, 1.00, keypair.clone()),
Shredder::new(slot, slot + 1, 1.00, keypair.clone(), 0),
Err(ShredError::SlotTooLow {
slot: _,
parent_slot: _,
@@ -805,7 +833,7 @@ pub mod tests {
);
// Test that slot - parent cannot be > u16 MAX
assert_matches!(
Shredder::new(slot, slot - 1 - 0xffff, 1.00, keypair.clone()),
Shredder::new(slot, slot - 1 - 0xffff, 1.00, keypair.clone(), 0),
Err(ShredError::SlotTooLow {
slot: _,
parent_slot: _,
@@ -814,7 +842,7 @@ pub mod tests {
let fec_rate = 0.25;
let parent_slot = slot - 5;
let shredder = Shredder::new(slot, parent_slot, fec_rate, keypair.clone())
let shredder = Shredder::new(slot, parent_slot, fec_rate, keypair.clone(), 0)
.expect("Failed in creating shredder");
let entries: Vec<_> = (0..5)
@@ -889,7 +917,7 @@ pub mod tests {
let slot = 1;
let parent_slot = 0;
let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone())
let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), 0)
.expect("Failed in creating shredder");
let entries: Vec<_> = (0..5)
@@ -909,6 +937,72 @@ pub mod tests {
assert_eq!(deserialized_shred, *data_shreds.last().unwrap());
}
#[test]
fn test_shred_reference_tick() {
let keypair = Arc::new(Keypair::new());
let slot = 1;
let parent_slot = 0;
let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), 5)
.expect("Failed in creating shredder");
let entries: Vec<_> = (0..5)
.map(|_| {
let keypair0 = Keypair::new();
let keypair1 = Keypair::new();
let tx0 =
system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
Entry::new(&Hash::default(), 1, vec![tx0])
})
.collect();
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
data_shreds.iter().for_each(|s| {
assert_eq!(s.reference_tick(), 5);
assert_eq!(Shred::reference_tick_from_data(&s.payload), 5);
});
let deserialized_shred =
Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap();
assert_eq!(deserialized_shred.reference_tick(), 5);
}
#[test]
fn test_shred_reference_tick_overflow() {
let keypair = Arc::new(Keypair::new());
let slot = 1;
let parent_slot = 0;
let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), u8::max_value())
.expect("Failed in creating shredder");
let entries: Vec<_> = (0..5)
.map(|_| {
let keypair0 = Keypair::new();
let keypair1 = Keypair::new();
let tx0 =
system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
Entry::new(&Hash::default(), 1, vec![tx0])
})
.collect();
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
data_shreds.iter().for_each(|s| {
assert_eq!(s.reference_tick(), SHRED_TICK_REFERENCE_MASK);
assert_eq!(
Shred::reference_tick_from_data(&s.payload),
SHRED_TICK_REFERENCE_MASK
);
});
let deserialized_shred =
Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap();
assert_eq!(
deserialized_shred.reference_tick(),
SHRED_TICK_REFERENCE_MASK
);
}
#[test]
fn test_data_and_code_shredder() {
let keypair = Arc::new(Keypair::new());
@@ -916,11 +1010,11 @@ pub mod tests {
let slot = 0x123456789abcdef0;
// Test that FEC rate cannot be > 1.0
assert_matches!(
Shredder::new(slot, slot - 5, 1.001, keypair.clone()),
Shredder::new(slot, slot - 5, 1.001, keypair.clone(), 0),
Err(ShredError::InvalidFecRate(_))
);
let shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, keypair.clone())
let shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, keypair.clone(), 0)
.expect("Failed in creating shredder");
// Create enough entries to make > 1 shred
@@ -962,7 +1056,7 @@ pub mod tests {
fn test_recovery_and_reassembly() {
let keypair = Arc::new(Keypair::new());
let slot = 0x123456789abcdef0;
let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone())
let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0)
.expect("Failed in creating shredder");
let keypair0 = Keypair::new();
@@ -1208,7 +1302,7 @@ pub mod tests {
fn test_multi_fec_block_coding() {
let keypair = Arc::new(Keypair::new());
let slot = 0x123456789abcdef0;
let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone())
let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0)
.expect("Failed in creating shredder");
let num_fec_sets = 100;