master: Add nonce to shreds repairs, add shred data size to header (#10109)

* Add nonce to shreds/repairs

* Add data shred size to header

Co-authored-by: Carl <carl@solana.com>
Author: carllin
Date: 2020-05-19 12:38:18 -07:00
Committed by: GitHub
Parent: 427c78d891
Commit: 97f2bcff69
15 changed files with 598 additions and 256 deletions
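In brief: repair requests now carry a u32 nonce, and repair responses echo it by appending it after the shred payload, so the requesting node can tell solicited repair traffic apart from stray or stale packets. Below is a minimal sketch of that framing only, using plain little-endian byte helpers in place of the bincode call the patch uses (bincode's default encoding of a u32 is 4 fixed little-endian bytes); the names and the packet budget are illustrative, and the real implementation is the new core/src/repair_response.rs further down.

use std::convert::TryInto;

// Frame a repair response as [shred bytes][4-byte nonce], refusing shreds that
// would not leave room for the nonce in a single packet.
fn frame_repair_response(shred: &[u8], nonce: u32, max_packet: usize) -> Option<Vec<u8>> {
    let framed_len = shred.len() + 4;
    if framed_len > max_packet {
        return None;
    }
    let mut buf = Vec::with_capacity(framed_len);
    buf.extend_from_slice(shred);
    buf.extend_from_slice(&nonce.to_le_bytes());
    Some(buf)
}

// Recover the nonce from the tail of a framed response.
fn read_trailing_nonce(buf: &[u8]) -> Option<u32> {
    if buf.len() < 4 {
        return None;
    }
    let tail: [u8; 4] = buf[buf.len() - 4..].try_into().ok()?;
    Some(u32::from_le_bytes(tail))
}

fn main() {
    let shred = vec![7u8; 8]; // stand-in for a serialized shred payload
    let framed = frame_repair_response(&shred, 42, 1232).unwrap(); // 1232 ~ PACKET_DATA_SIZE
    assert_eq!(read_trailing_nonce(&framed), Some(42));
}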


@ -464,7 +464,7 @@ pub mod test {
Vec<TransmitShreds>,
Vec<TransmitShreds>,
) {
let num_entries = max_ticks_per_n_shreds(num);
let num_entries = max_ticks_per_n_shreds(num, None);
let (data_shreds, _) = make_slot_entries(slot, 0, num_entries);
let keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0)


@ -428,7 +428,7 @@ mod test {
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(leader_info.info));
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut genesis_config = create_genesis_config(10_000).genesis_config;
genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1;
genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1;
let bank0 = Arc::new(Bank::new(&genesis_config));
(
blockstore,
@ -537,7 +537,11 @@ mod test {
// Interrupting the slot should cause the unfinished_slot and stats to reset
let num_shreds = 1;
assert!(num_shreds < num_shreds_per_slot);
let ticks1 = create_ticks(max_ticks_per_n_shreds(num_shreds), 0, genesis_config.hash());
let ticks1 = create_ticks(
max_ticks_per_n_shreds(num_shreds, None),
0,
genesis_config.hash(),
);
let receive_results = ReceiveResults {
entries: ticks1.clone(),
time_elapsed: Duration::new(2, 0),


@ -35,6 +35,7 @@ pub mod poh_recorder;
pub mod poh_service;
pub mod progress_map;
pub mod pubkey_references;
pub mod repair_response;
pub mod repair_service;
pub mod replay_stage;
mod result;

core/src/repair_response.rs (new file, 112 lines)

@ -0,0 +1,112 @@
use solana_ledger::{
blockstore::Blockstore,
shred::{Nonce, SIZE_OF_NONCE},
};
use solana_perf::packet::limited_deserialize;
use solana_sdk::{clock::Slot, packet::Packet};
use std::{io, net::SocketAddr};
pub fn repair_response_packet(
blockstore: &Blockstore,
slot: Slot,
shred_index: u64,
dest: &SocketAddr,
nonce: Nonce,
) -> Option<Packet> {
let shred = blockstore
.get_data_shred(slot, shred_index)
.expect("Blockstore could not get data shred");
shred
.map(|shred| repair_response_packet_from_shred(shred, dest, nonce))
.unwrap_or(None)
}
pub fn repair_response_packet_from_shred(
shred: Vec<u8>,
dest: &SocketAddr,
nonce: Nonce,
) -> Option<Packet> {
let mut packet = Packet::default();
packet.meta.size = shred.len() + SIZE_OF_NONCE;
if packet.meta.size > packet.data.len() {
return None;
}
packet.meta.set_addr(dest);
packet.data[..shred.len()].copy_from_slice(&shred);
let mut wr = io::Cursor::new(&mut packet.data[shred.len()..]);
bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
Some(packet)
}
pub fn nonce(buf: &[u8]) -> Option<Nonce> {
if buf.len() < SIZE_OF_NONCE {
None
} else {
limited_deserialize(&buf[buf.len() - SIZE_OF_NONCE..]).ok()
}
}
#[cfg(test)]
mod test {
use super::*;
use solana_ledger::{
shred::{Shred, Shredder},
sigverify_shreds::verify_shred_cpu,
};
use solana_sdk::signature::{Keypair, Signer};
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
};
fn run_test_sigverify_shred_cpu_repair(slot: Slot) {
solana_logger::setup();
let mut shred = Shred::new_from_data(
slot,
0xc0de,
0xdead,
Some(&[1, 2, 3, 4]),
true,
true,
0,
0,
0xc0de,
);
assert_eq!(shred.slot(), slot);
let keypair = Keypair::new();
Shredder::sign_shred(&keypair, &mut shred);
trace!("signature {}", shred.common_header.signature);
let nonce = 9;
let mut packet = repair_response_packet_from_shred(
shred.payload,
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
nonce,
)
.unwrap();
packet.meta.repair = true;
let leader_slots = [(slot, keypair.pubkey().to_bytes())]
.iter()
.cloned()
.collect();
let rv = verify_shred_cpu(&packet, &leader_slots);
assert_eq!(rv, Some(1));
let wrong_keypair = Keypair::new();
let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())]
.iter()
.cloned()
.collect();
let rv = verify_shred_cpu(&packet, &leader_slots);
assert_eq!(rv, Some(0));
let leader_slots = HashMap::new();
let rv = verify_shred_cpu(&packet, &leader_slots);
assert_eq!(rv, None);
}
#[test]
fn test_sigverify_shred_cpu_repair() {
run_test_sigverify_shred_cpu_repair(0xdead_c0de);
}
}
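
A hedged usage sketch of the new module (the variables here are hypothetical; it assumes a serialized shred payload and a destination SocketAddr are in scope):

// Round trip: frame a shred for a repair response, then recover the nonce
// from the packet tail on the requesting side. Only the first
// `packet.meta.size` bytes of the fixed-size buffer are meaningful.
let sent_nonce: Nonce = 7;
let packet = repair_response_packet_from_shred(shred_payload, &dest_addr, sent_nonce)
    .expect("shred plus nonce should fit in one packet");
assert_eq!(nonce(&packet.data[..packet.meta.size]), Some(sent_nonce));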


@ -5,12 +5,13 @@ use crate::{
cluster_slots::ClusterSlots,
consensus::VOTE_THRESHOLD_SIZE,
result::Result,
serve_repair::{RepairType, ServeRepair},
serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use solana_ledger::{
bank_forks::BankForks,
blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
shred::Nonce,
};
use solana_runtime::bank::Bank;
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
@ -104,7 +105,7 @@ impl RepairService {
&blockstore,
&exit,
&repair_socket,
&cluster_info,
cluster_info,
repair_info,
&cluster_slots,
)
@ -118,19 +119,19 @@ impl RepairService {
blockstore: &Blockstore,
exit: &AtomicBool,
repair_socket: &UdpSocket,
cluster_info: &Arc<ClusterInfo>,
cluster_info: Arc<ClusterInfo>,
repair_info: RepairInfo,
cluster_slots: &Arc<ClusterSlots>,
cluster_slots: &ClusterSlots,
) {
let serve_repair = ServeRepair::new(cluster_info.clone());
let id = cluster_info.id();
Self::initialize_lowest_slot(id, blockstore, cluster_info);
Self::initialize_lowest_slot(id, blockstore, &cluster_info);
let mut repair_stats = RepairStats::default();
let mut last_stats = Instant::now();
let mut duplicate_slot_repair_statuses = HashMap::new();
Self::initialize_epoch_slots(
blockstore,
cluster_info,
&cluster_info,
&repair_info.completed_slots_receiver,
);
loop {
@ -144,7 +145,7 @@ impl RepairService {
let lowest_slot = blockstore.lowest_slot();
Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
Self::update_completed_slots(&repair_info.completed_slots_receiver, &cluster_info);
cluster_slots.update(new_root, cluster_info, &repair_info.bank_forks);
cluster_slots.update(new_root, &cluster_info, &repair_info.bank_forks);
let new_duplicate_slots = Self::find_new_duplicate_slots(
&duplicate_slot_repair_statuses,
blockstore,
@ -178,27 +179,19 @@ impl RepairService {
if let Ok(repairs) = repairs {
let mut cache = HashMap::new();
let reqs: Vec<((SocketAddr, Vec<u8>), RepairType)> = repairs
.into_iter()
.filter_map(|repair_request| {
serve_repair
.repair_request(
&cluster_slots,
&repair_request,
&mut cache,
&mut repair_stats,
)
.map(|result| (result, repair_request))
.ok()
})
.collect();
for ((to, req), _) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
repairs.into_iter().for_each(|repair_request| {
if let Ok((to, req)) = serve_repair.repair_request(
&cluster_slots,
repair_request,
&mut cache,
&mut repair_stats,
) {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
});
}
if last_stats.elapsed().as_secs() > 1 {
@ -326,6 +319,7 @@ impl RepairService {
&repair_addr,
serve_repair,
repair_stats,
DEFAULT_NONCE,
) {
info!("repair req send_to({}) error {:?}", repair_addr, e);
}
@ -346,8 +340,9 @@ impl RepairService {
to: &SocketAddr,
serve_repair: &ServeRepair,
repair_stats: &mut RepairStats,
nonce: Nonce,
) -> Result<()> {
let req = serve_repair.map_repair_request(&repair_type, repair_stats)?;
let req = serve_repair.map_repair_request(&repair_type, repair_stats, nonce)?;
repair_socket.send_to(&req, to)?;
Ok(())
}
@ -740,7 +735,7 @@ mod test {
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (mut slot_shreds, _) in shreds.into_iter() {
@ -850,7 +845,7 @@ mod test {
);
// Insert some shreds to create a SlotMeta, should make repairs
let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
blockstore
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
@ -883,7 +878,7 @@ mod test {
};
// Insert some shreds to create a SlotMeta,
let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
blockstore
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)


@ -2,16 +2,17 @@ use crate::{
cluster_info::{ClusterInfo, ClusterInfoError},
cluster_slots::ClusterSlots,
contact_info::ContactInfo,
repair_response,
repair_service::RepairStats,
result::{Error, Result},
weighted_shuffle::weighted_best,
};
use bincode::serialize;
use solana_ledger::blockstore::Blockstore;
use solana_ledger::{blockstore::Blockstore, shred::Nonce};
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler};
use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler};
use solana_sdk::{
clock::Slot,
pubkey::Pubkey,
@ -30,6 +31,7 @@ use std::{
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
pub const DEFAULT_NONCE: u32 = 42;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairType {
@ -65,6 +67,9 @@ pub enum RepairProtocol {
WindowIndex(ContactInfo, u64, u64),
HighestWindowIndex(ContactInfo, u64, u64),
Orphan(ContactInfo, u64),
WindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
HighestWindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
OrphanWithNonce(ContactInfo, u64, Nonce),
}
#[derive(Clone)]
@ -107,6 +112,9 @@ impl ServeRepair {
RepairProtocol::WindowIndex(ref from, _, _) => from,
RepairProtocol::HighestWindowIndex(ref from, _, _) => from,
RepairProtocol::Orphan(ref from, _) => from,
RepairProtocol::WindowIndexWithNonce(ref from, _, _, _) => from,
RepairProtocol::HighestWindowIndexWithNonce(ref from, _, _, _) => from,
RepairProtocol::OrphanWithNonce(ref from, _, _) => from,
}
}
@ -130,7 +138,7 @@ impl ServeRepair {
let (res, label) = {
match &request {
RepairProtocol::WindowIndex(from, slot, shred_index) => {
RepairProtocol::WindowIndexWithNonce(_, slot, shred_index, nonce) => {
stats.window_index += 1;
(
Self::run_window_request(
@ -141,12 +149,12 @@ impl ServeRepair {
&me.read().unwrap().my_info,
*slot,
*shred_index,
*nonce,
),
"WindowIndex",
"WindowIndexWithNonce",
)
}
RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
RepairProtocol::HighestWindowIndexWithNonce(_, slot, highest_index, nonce) => {
stats.highest_window_index += 1;
(
Self::run_highest_window_request(
@ -155,11 +163,12 @@ impl ServeRepair {
blockstore,
*slot,
*highest_index,
*nonce,
),
"HighestWindowIndex",
"HighestWindowIndexWithNonce",
)
}
RepairProtocol::Orphan(_, slot) => {
RepairProtocol::OrphanWithNonce(_, slot, nonce) => {
stats.orphan += 1;
(
Self::run_orphan(
@ -168,10 +177,12 @@ impl ServeRepair {
blockstore,
*slot,
MAX_ORPHAN_REPAIR_RESPONSES,
*nonce,
),
"Orphan",
"OrphanWithNonce",
)
}
_ => (None, "Unsupported repair type"),
}
};
@ -331,20 +342,36 @@ impl ServeRepair {
});
}
fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index);
fn window_index_request_bytes(
&self,
slot: Slot,
shred_index: u64,
nonce: Nonce,
) -> Result<Vec<u8>> {
let req =
RepairProtocol::WindowIndexWithNonce(self.my_info.clone(), slot, shred_index, nonce);
let out = serialize(&req)?;
Ok(out)
}
fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index);
fn window_highest_index_request_bytes(
&self,
slot: Slot,
shred_index: u64,
nonce: Nonce,
) -> Result<Vec<u8>> {
let req = RepairProtocol::HighestWindowIndexWithNonce(
self.my_info.clone(),
slot,
shred_index,
nonce,
);
let out = serialize(&req)?;
Ok(out)
}
fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> {
let req = RepairProtocol::Orphan(self.my_info.clone(), slot);
fn orphan_bytes(&self, slot: Slot, nonce: Nonce) -> Result<Vec<u8>> {
let req = RepairProtocol::OrphanWithNonce(self.my_info.clone(), slot, nonce);
let out = serialize(&req)?;
Ok(out)
}
@ -352,24 +379,25 @@ impl ServeRepair {
pub fn repair_request(
&self,
cluster_slots: &ClusterSlots,
repair_request: &RepairType,
repair_request: RepairType,
cache: &mut RepairCache,
repair_stats: &mut RepairStats,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
if cache.get(&repair_request.slot()).is_none() {
let repair_peers: Vec<_> = self.cluster_info.repair_peers(repair_request.slot());
let slot = repair_request.slot();
if cache.get(&slot).is_none() {
let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
if repair_peers.is_empty() {
return Err(ClusterInfoError::NoPeers.into());
}
let weights = cluster_slots.compute_weights(repair_request.slot(), &repair_peers);
cache.insert(repair_request.slot(), (repair_peers, weights));
let weights = cluster_slots.compute_weights(slot, &repair_peers);
cache.insert(slot, (repair_peers, weights));
}
let (repair_peers, weights) = cache.get(&repair_request.slot()).unwrap();
let (repair_peers, weights) = cache.get(&slot).unwrap();
let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
let out = self.map_repair_request(repair_request, repair_stats)?;
let out = self.map_repair_request(&repair_request, repair_stats, DEFAULT_NONCE)?;
Ok((addr, out))
}
@ -391,19 +419,20 @@ impl ServeRepair {
&self,
repair_request: &RepairType,
repair_stats: &mut RepairStats,
nonce: Nonce,
) -> Result<Vec<u8>> {
match repair_request {
RepairType::Shred(slot, shred_index) => {
repair_stats.shred.update(*slot);
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
}
RepairType::HighestShred(slot, shred_index) => {
repair_stats.highest_shred.update(*slot);
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
}
RepairType::Orphan(slot) => {
repair_stats.orphan.update(*slot);
Ok(self.orphan_bytes(*slot)?)
Ok(self.orphan_bytes(*slot, nonce)?)
}
}
}
@ -416,12 +445,19 @@ impl ServeRepair {
me: &ContactInfo,
slot: Slot,
shred_index: u64,
nonce: Nonce,
) -> Option<Packets> {
if let Some(blockstore) = blockstore {
// Try to find the requested index in one of the slots
let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr);
let packet = repair_response::repair_response_packet(
blockstore,
slot,
shred_index,
from_addr,
nonce,
);
if let Ok(Some(packet)) = packet {
if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
recycler,
@ -449,15 +485,20 @@ impl ServeRepair {
blockstore: Option<&Arc<Blockstore>>,
slot: Slot,
highest_index: u64,
nonce: Nonce,
) -> Option<Packets> {
let blockstore = blockstore?;
// Try to find the requested index in one of the slots
let meta = blockstore.meta(slot).ok()??;
if meta.received > highest_index {
// meta.received must be at least 1 by this point
let packet =
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr)
.ok()??;
let packet = repair_response::repair_response_packet(
blockstore,
slot,
meta.received - 1,
from_addr,
nonce,
)?;
return Some(Packets::new_with_recycler_data(
recycler,
"run_highest_window_request",
@ -473,6 +514,7 @@ impl ServeRepair {
blockstore: Option<&Arc<Blockstore>>,
mut slot: Slot,
max_responses: usize,
nonce: Nonce,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
if let Some(blockstore) = blockstore {
@ -481,9 +523,14 @@ impl ServeRepair {
if meta.received == 0 {
break;
}
let packet =
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr);
if let Ok(Some(packet)) = packet {
let packet = repair_response::repair_response_packet(
blockstore,
slot,
meta.received - 1,
from_addr,
nonce,
);
if let Some(packet) = packet {
res.packets.push(packet);
}
if meta.is_parent_set() && res.packets.len() <= max_responses {
@ -498,28 +545,12 @@ impl ServeRepair {
}
Some(res)
}
fn get_data_shred_as_packet(
blockstore: &Arc<Blockstore>,
slot: Slot,
shred_index: u64,
dest: &SocketAddr,
) -> Result<Option<Packet>> {
let data = blockstore.get_data_shred(slot, shred_index)?;
Ok(data.map(|data| {
let mut packet = Packet::default();
packet.meta.size = data.len();
packet.meta.set_addr(dest);
packet.data.copy_from_slice(&data);
packet
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::result::Error;
use crate::{repair_response, result::Error};
use solana_ledger::get_tmp_ledger_path;
use solana_ledger::{
blockstore::make_many_slot_entries,
@ -530,9 +561,13 @@ mod tests {
};
use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp};
/// test run_window_request responds with the right shred, and do not overrun
#[test]
fn run_highest_window_request() {
fn test_run_highest_window_request() {
run_highest_window_request(5, 3, 9);
}
/// test run_window_request responds with the right shred, and do not overrun
fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
@ -544,41 +579,49 @@ mod tests {
Some(&blockstore),
0,
0,
nonce,
);
assert!(rv.is_none());
let _ = fill_blockstore_slot_with_ticks(
&blockstore,
max_ticks_per_n_shreds(1) + 1,
2,
1,
max_ticks_per_n_shreds(1, None) + 1,
slot,
slot - num_slots + 1,
Hash::default(),
);
let index = 1;
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
2,
1,
);
slot,
index,
nonce,
)
.expect("packets");
let rv: Vec<Shred> = rv
.expect("packets")
.packets
.into_iter()
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.filter_map(|b| {
assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
Shred::new_from_serialized_shred(b.data.to_vec()).ok()
})
.collect();
assert!(!rv.is_empty());
let index = blockstore.meta(2).unwrap().unwrap().received - 1;
let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
assert_eq!(rv[0].index(), index as u32);
assert_eq!(rv[0].slot(), 2);
assert_eq!(rv[0].slot(), slot);
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
2,
slot,
index + 1,
nonce,
);
assert!(rv.is_none());
}
@ -586,9 +629,13 @@ mod tests {
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
/// test window requests respond with the right shred, and do not overrun
#[test]
fn run_window_request() {
fn test_run_window_request() {
run_window_request(2, 9);
}
/// test window requests respond with the right shred, and do not overrun
fn run_window_request(slot: Slot, nonce: Nonce) {
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
@ -615,12 +662,13 @@ mod tests {
&socketaddr_any!(),
Some(&blockstore),
&me,
slot,
0,
0,
nonce,
);
assert!(rv.is_none());
let mut common_header = ShredCommonHeader::default();
common_header.slot = 2;
common_header.slot = slot;
common_header.index = 1;
let mut data_header = DataShredHeader::default();
data_header.parent_offset = 1;
@ -634,24 +682,28 @@ mod tests {
.insert_shreds(vec![shred_info], None, false)
.expect("Expect successful ledger write");
let index = 1;
let rv = ServeRepair::run_window_request(
&recycler,
&me,
&socketaddr_any!(),
Some(&blockstore),
&me,
2,
1,
);
assert!(!rv.is_none());
slot,
index,
nonce,
)
.expect("packets");
let rv: Vec<Shred> = rv
.expect("packets")
.packets
.into_iter()
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.filter_map(|b| {
assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
Shred::new_from_serialized_shred(b.data.to_vec()).ok()
})
.collect();
assert_eq!(rv[0].index(), 1);
assert_eq!(rv[0].slot(), 2);
assert_eq!(rv[0].slot(), slot);
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
@ -665,7 +717,7 @@ mod tests {
let serve_repair = ServeRepair::new(cluster_info.clone());
let rv = serve_repair.repair_request(
&cluster_slots,
&RepairType::Shred(0, 0),
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
);
@ -691,7 +743,7 @@ mod tests {
let rv = serve_repair
.repair_request(
&cluster_slots,
&RepairType::Shred(0, 0),
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
)
@ -723,7 +775,7 @@ mod tests {
let rv = serve_repair
.repair_request(
&cluster_slots,
&RepairType::Shred(0, 0),
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut RepairStats::default(),
)
@ -739,52 +791,75 @@ mod tests {
}
#[test]
fn run_orphan() {
fn test_run_orphan() {
run_orphan(2, 3, 9);
}
fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rv =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0);
let rv = ServeRepair::run_orphan(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
slot,
0,
nonce,
);
assert!(rv.is_none());
// Create slots 1, 2, 3 with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(1, 3, 5);
// Create slots [slot, slot + num_slots) with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
// We don't have slot 4, so we don't know how to service this request
let rv =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5);
// We don't have slot `slot + num_slots`, so we don't know how to service this request
let rv = ServeRepair::run_orphan(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
slot + num_slots,
5,
nonce,
);
assert!(rv.is_none());
// For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
// for this request
let rv: Vec<_> =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5)
.expect("run_orphan packets")
.packets
.iter()
.cloned()
.collect();
let expected: Vec<_> = (1..=3)
// For an orphan request for `slot + num_slots - 1`, we should return the highest shreds
// from slots in the range [slot, slot + num_slots - 1]
let rv: Vec<_> = ServeRepair::run_orphan(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
slot + num_slots - 1,
5,
nonce,
)
.expect("run_orphan packets")
.packets
.iter()
.cloned()
.collect();
// Verify responses
let expected: Vec<_> = (slot..slot + num_slots)
.rev()
.map(|slot| {
.filter_map(|slot| {
let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
ServeRepair::get_data_shred_as_packet(
repair_response::repair_response_packet(
&blockstore,
slot,
index,
&socketaddr_any!(),
nonce,
)
.unwrap()
.unwrap()
})
.collect();
assert_eq!(rv, expected)
assert_eq!(rv, expected);
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");


@ -4,8 +4,10 @@
use crate::{
cluster_info::ClusterInfo,
cluster_slots::ClusterSlots,
repair_response,
repair_service::{RepairInfo, RepairService},
result::{Error, Result},
serve_repair::DEFAULT_NONCE,
};
use crossbeam_channel::{
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
@ -13,24 +15,25 @@ use crossbeam_channel::{
use rayon::iter::IntoParallelRefMutIterator;
use rayon::iter::ParallelIterator;
use rayon::ThreadPool;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::blockstore::{
self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT,
use solana_ledger::{
bank_forks::BankForks,
blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
leader_schedule_cache::LeaderScheduleCache,
shred::{Nonce, Shred},
};
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::shred::Shred;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
use solana_perf::packet::Packets;
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
use solana_streamer::streamer::PacketSender;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
use std::{
net::{SocketAddr, UdpSocket},
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
if shred.is_data() {
@ -107,8 +110,15 @@ fn run_check_duplicate(
Ok(())
}
fn verify_repair(_shred: &Shred, repair_info: &Option<RepairMeta>) -> bool {
repair_info
.as_ref()
.map(|repair_info| repair_info.nonce == DEFAULT_NONCE)
.unwrap_or(true)
}
fn run_insert<F>(
shred_receiver: &CrossbeamReceiver<Vec<Shred>>,
shred_receiver: &CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
handle_duplicate: F,
@ -118,12 +128,16 @@ where
F: Fn(Shred) -> (),
{
let timer = Duration::from_millis(200);
let mut shreds = shred_receiver.recv_timeout(timer)?;
while let Ok(mut more_shreds) = shred_receiver.try_recv() {
shreds.append(&mut more_shreds)
let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() {
shreds.extend(more_shreds);
repair_infos.extend(more_repair_infos);
}
assert_eq!(shreds.len(), repair_infos.len());
let mut i = 0;
shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0);
blockstore.insert_shreds_handle_duplicate(
shreds,
Some(leader_schedule_cache),
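
The retain call above walks shreds and repair_infos in lockstep: Vec::retain visits elements in order, exactly once each, so the side-effecting counter keeps the two vectors aligned while dropping repaired shreds whose nonce check fails. A standalone sketch of the same idiom, with hypothetical names:

// Keep items[i] only if accept(items[i], &meta[i]); the tuple trick evaluates
// the predicate, bumps the index, then yields the predicate's result.
fn filter_paired<T, M>(items: &mut Vec<T>, meta: &[M], accept: impl Fn(&T, &M) -> bool) {
    assert_eq!(items.len(), meta.len());
    let mut i = 0;
    items.retain(|item| (accept(item, &meta[i]), i += 1).0);
}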
@ -136,7 +150,7 @@ where
fn recv_window<F>(
blockstore: &Arc<Blockstore>,
insert_shred_sender: &CrossbeamSender<Vec<Shred>>,
insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
retransmit: &PacketSender,
@ -160,7 +174,7 @@ where
inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
let last_root = blockstore.last_root();
let shreds: Vec<_> = thread_pool.install(|| {
let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packets
.par_iter_mut()
.flat_map(|packets| {
@ -169,34 +183,58 @@ where
.iter_mut()
.filter_map(|packet| {
if packet.meta.discard {
inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
inc_new_counter_debug!(
"streamer-recv_window-invalid_or_unnecessary_packet",
1
);
None
} else if let Ok(shred) =
Shred::new_from_serialized_shred(packet.data.to_vec())
{
if shred_filter(&shred, last_root) {
// Mark slot as dead if the current shred is on the boundary
// of max shreds per slot. However, let the current shred
// get retransmitted. It'll allow peer nodes to see this shred
// and trigger them to mark the slot as dead.
if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
let _ = blockstore.set_dead_slot(shred.slot());
} else {
// shred fetch stage should be sending packets
// with sufficiently large buffers. Needed to ensure
// call to `new_from_serialized_shred` is safe.
assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
let serialized_shred = packet.data.to_vec();
if let Ok(shred) = Shred::new_from_serialized_shred(serialized_shred) {
let repair_info = {
if packet.meta.repair {
if let Some(nonce) = repair_response::nonce(&packet.data) {
let repair_info = RepairMeta {
_from_addr: packet.meta.addr(),
nonce,
};
Some(repair_info)
} else {
// If can't parse the nonce, dump the packet
return None;
}
} else {
None
}
};
if shred_filter(&shred, last_root) {
// Mark slot as dead if the current shred is on the boundary
// of max shreds per slot. However, let the current shred
// get retransmitted. It'll allow peer nodes to see this shred
// and trigger them to mark the slot as dead.
if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
let _ = blockstore.set_dead_slot(shred.slot());
}
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
Some((shred, repair_info))
} else {
packet.meta.discard = true;
None
}
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
Some(shred)
} else {
packet.meta.discard = true;
None
}
} else {
packet.meta.discard = true;
None
}
})
.collect::<Vec<_>>()
})
.collect()
.unzip()
});
trace!("{:?} shreds from packets", shreds.len());
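
Combined with verify_repair earlier in this file, the change above amounts to a simple acceptance rule: turbine packets pass through as before, while packets flagged as repair must carry a parseable trailing nonce that matches what this node sent (currently always DEFAULT_NONCE). A compact restatement of that rule as a hypothetical helper, not part of the patch:

// Non-repair (turbine) traffic is always eligible for insertion; repair
// traffic must echo the nonce attached to the original request.
fn repair_packet_acceptable(is_repair: bool, trailing_nonce: Option<u32>, expected: u32) -> bool {
    if !is_repair {
        return true;
    }
    matches!(trailing_nonce, Some(n) if n == expected)
}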
@ -210,7 +248,7 @@ where
}
}
insert_shred_sender.send(shreds)?;
insert_shred_sender.send((shreds, repair_infos))?;
trace!(
"Elapsed processing time in recv_window(): {}",
@ -220,6 +258,11 @@ where
Ok(())
}
struct RepairMeta {
_from_addr: SocketAddr,
nonce: Nonce,
}
// Implement a destructor for the window_service thread to signal it exited
// even on panics
struct Finalizer {
@ -340,7 +383,7 @@ impl WindowService {
exit: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
insert_receiver: CrossbeamReceiver<Vec<Shred>>,
insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
duplicate_sender: CrossbeamSender<Shred>,
) -> JoinHandle<()> {
let exit = exit.clone();
@ -390,7 +433,7 @@ impl WindowService {
id: Pubkey,
exit: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
insert_sender: CrossbeamSender<Vec<Shred>>,
insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
shred_filter: F,
bank_forks: Option<Arc<RwLock<BankForks>>>,
@ -488,13 +531,12 @@ impl WindowService {
#[cfg(test)]
mod test {
use super::*;
use solana_ledger::shred::DataShredHeader;
use solana_ledger::{
blockstore::{make_many_slot_entries, Blockstore},
entry::{create_ticks, Entry},
genesis_utils::create_genesis_config_with_leader,
get_tmp_ledger_path,
shred::Shredder,
shred::{DataShredHeader, Shredder},
};
use solana_sdk::{
clock::Slot,