v1.0: Add nonce to shreds repairs, add shred data size to header (#10110)

* Add nonce to shreds/repairs
* Add data shred size to header
* Align with future epoch

Co-authored-by: Carl <carl@solana.com>
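Note: at a glance, this change makes every repair response self-describing. For slots past the nonce activation point, the responder appends a u32 nonce after the shred payload, and the requester checks it before accepting the shred. A minimal standalone sketch of that framing, assuming little-endian encoding as bincode uses for a u32 (names here are illustrative, not the crate's API):

use std::convert::TryInto;

type Nonce = u32;
const SIZE_OF_NONCE: usize = 4;

// Responder side: shred bytes followed by the trailing nonce.
fn frame_response(shred: &[u8], nonce: Nonce) -> Vec<u8> {
    let mut buf = Vec::with_capacity(shred.len() + SIZE_OF_NONCE);
    buf.extend_from_slice(shred);
    buf.extend_from_slice(&nonce.to_le_bytes());
    buf
}

// Requester side: read the nonce back off the tail of the buffer.
fn read_nonce(buf: &[u8]) -> Option<Nonce> {
    let start = buf.len().checked_sub(SIZE_OF_NONCE)?;
    Some(Nonce::from_le_bytes(buf[start..].try_into().ok()?))
}

fn main() {
    let packet = frame_response(&[1, 2, 3, 4], 42);
    assert_eq!(read_nonce(&packet), Some(42));
}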
@@ -13,8 +13,7 @@ use solana_core::{
     contact_info::ContactInfo,
     gossip_service::GossipService,
     packet::{limited_deserialize, PACKET_DATA_SIZE},
-    repair_service,
-    repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy},
+    repair_service::{self, RepairService, RepairSlotRange, RepairStats, RepairStrategy},
     serve_repair::ServeRepair,
     shred_fetch_stage::ShredFetchStage,
     sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
@@ -846,7 +845,7 @@ impl Archiver {
             .into_iter()
             .filter_map(|repair_request| {
                 serve_repair
-                    .map_repair_request(&repair_request, &mut repair_stats)
+                    .map_repair_request(&repair_request, &mut repair_stats, Some(0))
                     .map(|result| ((archiver_info.gossip, result), repair_request))
                     .ok()
             })
@@ -5,6 +5,7 @@ extern crate test;
 use rand::{thread_rng, Rng};
 use solana_core::cluster_info::{ClusterInfo, Node};
 use solana_core::contact_info::ContactInfo;
+use solana_ledger::shred::{Shred, NONCE_SHRED_PAYLOAD_SIZE};
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use std::collections::HashMap;
@@ -20,9 +21,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
     let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
     let socket = UdpSocket::bind("0.0.0.0:0").unwrap();

-    const SHRED_SIZE: usize = 1024;
     const NUM_SHREDS: usize = 32;
-    let shreds = vec![vec![0; SHRED_SIZE]; NUM_SHREDS];
+    let shreds = vec![vec![0; NONCE_SHRED_PAYLOAD_SIZE]; NUM_SHREDS];
     let seeds = vec![[0u8; 32]; NUM_SHREDS];
     let mut stakes = HashMap::new();
     const NUM_PEERS: usize = 200;
@@ -5,11 +5,11 @@ extern crate test;
 use solana_ledger::entry::{create_ticks, Entry};
 use solana_ledger::shred::{
     max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder, RECOMMENDED_FEC_RATE,
-    SIZE_OF_DATA_SHRED_PAYLOAD,
+    SIZE_OF_NONCE_DATA_SHRED_PAYLOAD,
 };
 use solana_perf::test_tx;
 use solana_sdk::hash::Hash;
-use solana_sdk::signature::{Keypair, Signer};
+use solana_sdk::signature::Keypair;
 use std::sync::Arc;
 use test::Bencher;

@@ -29,10 +29,11 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec<Entry> {
 #[bench]
 fn bench_shredder_ticks(bencher: &mut Bencher) {
     let kp = Arc::new(Keypair::new());
-    let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
+    let shred_size = SIZE_OF_NONCE_DATA_SHRED_PAYLOAD;
     let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
     // ~1Mb
-    let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
+    let num_ticks =
+        max_ticks_per_n_shreds(1, Some(SIZE_OF_NONCE_DATA_SHRED_PAYLOAD)) * num_shreds as u64;
     let entries = create_ticks(num_ticks, 0, Hash::default());
     bencher.iter(|| {
         let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
@@ -43,10 +44,14 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
 #[bench]
 fn bench_shredder_large_entries(bencher: &mut Bencher) {
     let kp = Arc::new(Keypair::new());
-    let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
+    let shred_size = SIZE_OF_NONCE_DATA_SHRED_PAYLOAD;
     let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
     let txs_per_entry = 128;
-    let num_entries = max_entries_per_n_shred(&make_test_entry(txs_per_entry), num_shreds as u64);
+    let num_entries = max_entries_per_n_shred(
+        &make_test_entry(txs_per_entry),
+        num_shreds as u64,
+        Some(shred_size),
+    );
     let entries = make_large_unchained_entries(txs_per_entry, num_entries);
     // 1Mb
     bencher.iter(|| {
@@ -58,10 +63,10 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
 #[bench]
 fn bench_deshredder(bencher: &mut Bencher) {
     let kp = Arc::new(Keypair::new());
-    let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
+    let shred_size = SIZE_OF_NONCE_DATA_SHRED_PAYLOAD;
     // ~10Mb
     let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
-    let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
+    let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64;
     let entries = create_ticks(num_ticks, 0, Hash::default());
     let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0, 0).unwrap();
     let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
@@ -73,7 +78,7 @@ fn bench_deshredder(bencher: &mut Bencher) {

 #[bench]
 fn bench_deserialize_hdr(bencher: &mut Bencher) {
-    let data = vec![0; SIZE_OF_DATA_SHRED_PAYLOAD];
+    let data = vec![0; SIZE_OF_NONCE_DATA_SHRED_PAYLOAD];

     let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1);

@@ -390,7 +390,7 @@ mod test {
         )));
         let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut genesis_config = create_genesis_config(10_000).genesis_config;
-        genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1;
+        genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1;
         let bank0 = Arc::new(Bank::new(&genesis_config));
         (
             blockstore,
@@ -484,7 +484,11 @@ mod test {
         // Interrupting the slot should cause the unfinished_slot and stats to reset
         let num_shreds = 1;
         assert!(num_shreds < num_shreds_per_slot);
-        let ticks1 = create_ticks(max_ticks_per_n_shreds(num_shreds), 0, genesis_config.hash());
+        let ticks1 = create_ticks(
+            max_ticks_per_n_shreds(num_shreds, None),
+            0,
+            genesis_config.hash(),
+        );
         let receive_results = ReceiveResults {
             entries: ticks1.clone(),
             time_elapsed: Duration::new(2, 0),
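Note: `max_ticks_per_n_shreds` (and `max_entries_per_n_shred`) now take the data-payload size as an `Option<usize>`: callers pass the nonce-era payload size when they care about post-activation capacity, or `None` to fall back to the legacy size, since reserving room for the trailing nonce shrinks what fits in each shred. The benches above also rely on the usual ceiling-division idiom to size a ~1 MB workload; a small worked sketch (the payload constant is invented for illustration, not the crate's real value):

// Illustrative stand-in; not the crate's real payload constant.
const EXAMPLE_PAYLOAD_SIZE: usize = 1228;

// Same idiom as `((1000 * 1000) + (shred_size - 1)) / shred_size` above.
fn shreds_needed(total_bytes: usize, payload: usize) -> usize {
    (total_bytes + payload - 1) / payload
}

fn main() {
    assert_eq!(shreds_needed(1_000_000, EXAMPLE_PAYLOAD_SIZE), 815); // 814.3... rounds up
    assert_eq!(shreds_needed(1_228, EXAMPLE_PAYLOAD_SIZE), 1); // an exact fit does not round up
}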
@@ -34,6 +34,7 @@ pub mod packet;
 pub mod poh_recorder;
 pub mod poh_service;
 pub mod recvmmsg;
+pub mod repair_response;
 pub mod repair_service;
 pub mod replay_stage;
 mod result;
core/src/repair_response.rs (new file, 129 lines)
@@ -0,0 +1,129 @@
+use solana_ledger::{
+    blockstore::Blockstore,
+    shred::{Nonce, Shred, SIZE_OF_NONCE},
+};
+use solana_perf::packet::limited_deserialize;
+use solana_sdk::{clock::Slot, packet::Packet};
+use std::{io, net::SocketAddr};
+
+pub fn repair_response_packet(
+    blockstore: &Blockstore,
+    slot: Slot,
+    shred_index: u64,
+    dest: &SocketAddr,
+    nonce: Option<Nonce>,
+) -> Option<Packet> {
+    if Shred::is_nonce_unlocked(slot) && nonce.is_none()
+        || !Shred::is_nonce_unlocked(slot) && nonce.is_some()
+    {
+        return None;
+    }
+    let shred = blockstore
+        .get_data_shred(slot, shred_index)
+        .expect("Blockstore could not get data shred");
+    shred.map(|shred| repair_response_packet_from_shred(slot, shred, dest, nonce))
+}
+
+pub fn repair_response_packet_from_shred(
+    slot: Slot,
+    shred: Vec<u8>,
+    dest: &SocketAddr,
+    nonce: Option<Nonce>,
+) -> Packet {
+    let size_of_nonce = {
+        if Shred::is_nonce_unlocked(slot) {
+            assert!(nonce.is_some());
+            SIZE_OF_NONCE
+        } else {
+            assert!(nonce.is_none());
+            0
+        }
+    };
+    let mut packet = Packet::default();
+    packet.meta.size = shred.len() + size_of_nonce;
+    packet.meta.set_addr(dest);
+    packet.data[..shred.len()].copy_from_slice(&shred);
+    let mut wr = io::Cursor::new(&mut packet.data[shred.len()..]);
+    if let Some(nonce) = nonce {
+        bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
+    }
+    packet
+}
+
+pub fn nonce(buf: &[u8]) -> Option<Nonce> {
+    if buf.len() < SIZE_OF_NONCE {
+        None
+    } else {
+        limited_deserialize(&buf[buf.len() - SIZE_OF_NONCE..]).ok()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use solana_ledger::{
+        shred::{Shred, Shredder, UNLOCK_NONCE_SLOT},
+        sigverify_shreds::verify_shred_cpu,
+    };
+    use solana_sdk::signature::{Keypair, Signer};
+    use std::{
+        collections::HashMap,
+        net::{IpAddr, Ipv4Addr},
+    };
+
+    fn run_test_sigverify_shred_cpu_repair(slot: Slot) {
+        solana_logger::setup();
+        let mut shred = Shred::new_from_data(
+            slot,
+            0xc0de,
+            0xdead,
+            Some(&[1, 2, 3, 4]),
+            true,
+            true,
+            0,
+            0,
+            0xc0de,
+        );
+        assert_eq!(shred.slot(), slot);
+        let keypair = Keypair::new();
+        Shredder::sign_shred(&keypair, &mut shred);
+        trace!("signature {}", shred.common_header.signature);
+        let nonce = if Shred::is_nonce_unlocked(slot) {
+            Some(9)
+        } else {
+            None
+        };
+        let mut packet = repair_response_packet_from_shred(
+            slot,
+            shred.payload,
+            &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
+            nonce,
+        );
+        packet.meta.repair = true;
+
+        let leader_slots = [(slot, keypair.pubkey().to_bytes())]
+            .iter()
+            .cloned()
+            .collect();
+        let rv = verify_shred_cpu(&packet, &leader_slots);
+        assert_eq!(rv, Some(1));
+
+        let wrong_keypair = Keypair::new();
+        let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())]
+            .iter()
+            .cloned()
+            .collect();
+        let rv = verify_shred_cpu(&packet, &leader_slots);
+        assert_eq!(rv, Some(0));
+
+        let leader_slots = HashMap::new();
+        let rv = verify_shred_cpu(&packet, &leader_slots);
+        assert_eq!(rv, None);
+    }
+
+    #[test]
+    fn test_sigverify_shred_cpu_repair() {
+        run_test_sigverify_shred_cpu_repair(UNLOCK_NONCE_SLOT);
+        run_test_sigverify_shred_cpu_repair(UNLOCK_NONCE_SLOT + 1);
+    }
+}
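Note: one detail worth calling out in the new module is the guard at the top of `repair_response_packet`: it rejects any slot/nonce combination that disagrees with the activation schedule, so a post-activation response always carries a nonce and a pre-activation response never does. The `a && x.is_none() || !a && x.is_some()` condition is just the negation of a boolean equality. A sketch with `is_nonce_unlocked` stubbed out (the activation slot here is invented):

type Nonce = u32;
const UNLOCK_NONCE_SLOT: u64 = 1_000; // illustrative; the real value lives in solana_ledger

// Stand-in for Shred::is_nonce_unlocked. The tests in this diff pass `None`
// at UNLOCK_NONCE_SLOT itself, so "unlocked" appears to mean strictly after it.
fn is_nonce_unlocked(slot: u64) -> bool {
    slot > UNLOCK_NONCE_SLOT
}

fn nonce_matches_slot(slot: u64, nonce: Option<Nonce>) -> bool {
    nonce.is_some() == is_nonce_unlocked(slot)
}

fn main() {
    assert!(nonce_matches_slot(UNLOCK_NONCE_SLOT + 1, Some(9)));
    assert!(nonce_matches_slot(UNLOCK_NONCE_SLOT, None));
    assert!(!nonce_matches_slot(UNLOCK_NONCE_SLOT + 1, None)); // repair_response_packet returns None here
}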
@@ -168,22 +168,16 @@ impl RepairService {
             };

             if let Ok(repairs) = repairs {
-                let reqs: Vec<_> = repairs
-                    .into_iter()
-                    .filter_map(|repair_request| {
-                        serve_repair
-                            .repair_request(&repair_request, &mut repair_stats)
-                            .map(|result| (result, repair_request))
-                            .ok()
-                    })
-                    .collect();
-
-                for ((to, req), _) in reqs {
-                    repair_socket.send_to(&req, to).unwrap_or_else(|e| {
-                        info!("{} repair req send_to({}) error {:?}", id, to, e);
-                        0
-                    });
-                }
+                repairs.into_iter().for_each(|repair_request| {
+                    if let Ok((to, req)) =
+                        serve_repair.repair_request(&repair_request, &mut repair_stats)
+                    {
+                        repair_socket.send_to(&req, to).unwrap_or_else(|e| {
+                            info!("{} repair req send_to({}) error {:?}", id, to, e);
+                            0
+                        });
+                    }
+                });
             }
             if last_stats.elapsed().as_secs() > 1 {
                 let repair_total = repair_stats.shred.count
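Note: the rewrite above is behavior-preserving. Rather than serializing every repair request into an intermediate Vec and then looping to send, each request is now built and sent in a single pass, and requests that fail to build are skipped without the extra allocation. A toy model of the two shapes (the types and `build` function are invented stand-ins):

// Toy stand-in for serve_repair.repair_request: may fail per request.
fn build(req: u64) -> Result<(String, Vec<u8>), ()> {
    if req == 2 { Err(()) } else { Ok((format!("peer-{}", req), vec![req as u8])) }
}

fn main() {
    let repairs = vec![1u64, 2, 3];

    // Old shape: collect all (addr, bytes) pairs first, then send.
    let reqs: Vec<_> = repairs.iter().filter_map(|&r| build(r).ok()).collect();
    for (to, bytes) in &reqs {
        println!("send {:?} -> {}", bytes, to);
    }

    // New shape: build and send in one pass, no intermediate Vec.
    repairs.into_iter().for_each(|r| {
        if let Ok((to, bytes)) = build(r) {
            println!("send {:?} -> {}", bytes, to);
        }
    });
}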
@@ -607,7 +601,7 @@ mod test {
         let blockstore = Blockstore::open(&blockstore_path).unwrap();

         let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
-        let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
+        let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;

         let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
         for (mut slot_shreds, _) in shreds.into_iter() {
@@ -1738,6 +1738,7 @@ pub(crate) mod tests {
             ShredCommonHeader::default(),
             data_header,
             CodingShredHeader::default(),
+            PACKET_DATA_SIZE,
         );
         bincode::serialize_into(
             &mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..],
@@ -1,19 +1,21 @@
-use crate::packet::limited_deserialize;
 use crate::streamer::{PacketReceiver, PacketSender};
 use crate::{
     cluster_info::{ClusterInfo, ClusterInfoError},
     contact_info::ContactInfo,
-    packet::Packet,
+    repair_response,
     repair_service::RepairStats,
     result::{Error, Result},
 };
 use bincode::serialize;
 use rand::{thread_rng, Rng};
-use solana_ledger::blockstore::Blockstore;
+use solana_ledger::{
+    blockstore::Blockstore,
+    shred::{Nonce, Shred},
+};
 use solana_measure::measure::Measure;
 use solana_measure::thread_mem_usage;
 use solana_metrics::{datapoint_debug, inc_new_counter_debug};
-use solana_perf::packet::{Packets, PacketsRecycler};
+use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler};
 use solana_sdk::{
     clock::Slot,
     signature::{Keypair, Signer},
@@ -29,6 +31,7 @@ use std::{

 /// the number of slots to respond with when responding to `Orphan` requests
 pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
+pub const DEFAULT_NONCE: u32 = 42;

 #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
 pub enum RepairType {
@@ -63,6 +66,9 @@ enum RepairProtocol {
     WindowIndex(ContactInfo, u64, u64),
     HighestWindowIndex(ContactInfo, u64, u64),
     Orphan(ContactInfo, u64),
+    WindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
+    HighestWindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
+    OrphanWithNonce(ContactInfo, u64, Nonce),
 }

 #[derive(Clone)]
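Note: the three `*WithNonce` variants are appended after the existing ones instead of modifying them. With bincode's default encoding the variant index is part of the wire format, so appending leaves every pre-upgrade message decodable during a rolling upgrade; that is presumably why the legacy variants survive. A hedged sketch of the tag behavior (assumes the serde derive and bincode crates, as this file already does):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum Proto {
    Orphan(u64),               // existing variant keeps its tag
    OrphanWithNonce(u64, u32), // appended variant gets the next tag
}

fn main() {
    // A message serialized when only `Orphan` existed decodes identically
    // under the extended enum, because its variant tag is unchanged.
    let old_msg = bincode::serialize(&Proto::Orphan(7)).unwrap();
    assert_eq!(bincode::deserialize::<Proto>(&old_msg).unwrap(), Proto::Orphan(7));
}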
@@ -106,6 +112,9 @@ impl ServeRepair {
             RepairProtocol::WindowIndex(ref from, _, _) => from,
             RepairProtocol::HighestWindowIndex(ref from, _, _) => from,
             RepairProtocol::Orphan(ref from, _) => from,
+            RepairProtocol::WindowIndexWithNonce(ref from, _, _, _) => from,
+            RepairProtocol::HighestWindowIndexWithNonce(ref from, _, _, _) => from,
+            RepairProtocol::OrphanWithNonce(ref from, _, _) => from,
         }
     }

@@ -140,6 +149,7 @@ impl ServeRepair {
                         &me.read().unwrap().my_info,
                         *slot,
                         *shred_index,
+                        None,
                     ),
                     "WindowIndex",
                 )
@@ -154,6 +164,7 @@ impl ServeRepair {
                         blockstore,
                         *slot,
                         *highest_index,
+                        None,
                     ),
                     "HighestWindowIndex",
                 )
@@ -167,10 +178,55 @@ impl ServeRepair {
                         blockstore,
                         *slot,
                         MAX_ORPHAN_REPAIR_RESPONSES,
+                        None,
                     ),
                     "Orphan",
                 )
             }
+            RepairProtocol::WindowIndexWithNonce(_, slot, shred_index, nonce) => {
+                stats.window_index += 1;
+                (
+                    Self::run_window_request(
+                        recycler,
+                        from,
+                        &from_addr,
+                        blockstore,
+                        &me.read().unwrap().my_info,
+                        *slot,
+                        *shred_index,
+                        Some(*nonce),
+                    ),
+                    "WindowIndexWithNonce",
+                )
+            }
+            RepairProtocol::HighestWindowIndexWithNonce(_, slot, highest_index, nonce) => {
+                stats.highest_window_index += 1;
+                (
+                    Self::run_highest_window_request(
+                        recycler,
+                        &from_addr,
+                        blockstore,
+                        *slot,
+                        *highest_index,
+                        Some(*nonce),
+                    ),
+                    "HighestWindowIndexWithNonce",
+                )
+            }
+            RepairProtocol::OrphanWithNonce(_, slot, nonce) => {
+                stats.orphan += 1;
+                (
+                    Self::run_orphan(
+                        recycler,
+                        &from_addr,
+                        blockstore,
+                        *slot,
+                        MAX_ORPHAN_REPAIR_RESPONSES,
+                        Some(*nonce),
+                    ),
+                    "OrphanWithNonce",
+                )
+            }
         }
     };

@@ -321,20 +377,47 @@ impl ServeRepair {
         });
     }

-    fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
-        let req = RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index);
+    fn window_index_request_bytes(
+        &self,
+        slot: Slot,
+        shred_index: u64,
+        nonce: Option<Nonce>,
+    ) -> Result<Vec<u8>> {
+        let req = if let Some(nonce) = nonce {
+            RepairProtocol::WindowIndexWithNonce(self.my_info.clone(), slot, shred_index, nonce)
+        } else {
+            RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index)
+        };
         let out = serialize(&req)?;
         Ok(out)
     }

-    fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
-        let req = RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index);
+    fn window_highest_index_request_bytes(
+        &self,
+        slot: Slot,
+        shred_index: u64,
+        nonce: Option<Nonce>,
+    ) -> Result<Vec<u8>> {
+        let req = if let Some(nonce) = nonce {
+            RepairProtocol::HighestWindowIndexWithNonce(
+                self.my_info.clone(),
+                slot,
+                shred_index,
+                nonce,
+            )
+        } else {
+            RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index)
+        };
         let out = serialize(&req)?;
         Ok(out)
     }

-    fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> {
-        let req = RepairProtocol::Orphan(self.my_info.clone(), slot);
+    fn orphan_bytes(&self, slot: Slot, nonce: Option<Nonce>) -> Result<Vec<u8>> {
+        let req = if let Some(nonce) = nonce {
+            RepairProtocol::OrphanWithNonce(self.my_info.clone(), slot, nonce)
+        } else {
+            RepairProtocol::Orphan(self.my_info.clone(), slot)
+        };
         let out = serialize(&req)?;
         Ok(out)
     }
@@ -346,6 +429,7 @@ impl ServeRepair {
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
+        let slot = repair_request.slot();
         let valid: Vec<_> = self
             .cluster_info
             .read()
@@ -356,7 +440,12 @@ impl ServeRepair {
         }
         let n = thread_rng().gen::<usize>() % valid.len();
         let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
-        let out = self.map_repair_request(repair_request, repair_stats)?;
+        let nonce = if Shred::is_nonce_unlocked(slot) {
+            Some(DEFAULT_NONCE)
+        } else {
+            None
+        };
+        let out = self.map_repair_request(&repair_request, repair_stats, nonce)?;

         Ok((addr, out))
     }
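Note: `repair_request` now decides the wire format per request from the target slot. Past the activation slot it stamps the fixed `DEFAULT_NONCE` (42) into a `*WithNonce` message; otherwise it keeps the legacy variant. That per-slot switch is what lets the cluster change formats at a predetermined slot ("align with future epoch" in the commit message) without a flag day. A toy sketch of the selection (the activation slot is invented):

const DEFAULT_NONCE: u32 = 42; // matches the constant added in this diff
const UNLOCK_NONCE_SLOT: u64 = 1_000; // illustrative only

fn nonce_for_request(slot: u64) -> Option<u32> {
    // Mirrors the `Shred::is_nonce_unlocked(slot)` branch above.
    if slot > UNLOCK_NONCE_SLOT { Some(DEFAULT_NONCE) } else { None }
}

fn main() {
    assert_eq!(nonce_for_request(UNLOCK_NONCE_SLOT + 1), Some(42));
    assert_eq!(nonce_for_request(UNLOCK_NONCE_SLOT), None);
}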
@@ -365,19 +454,24 @@ impl ServeRepair {
         &self,
         repair_request: &RepairType,
         repair_stats: &mut RepairStats,
+        nonce: Option<Nonce>,
     ) -> Result<Vec<u8>> {
+        let slot = repair_request.slot();
+        if Shred::is_nonce_unlocked(slot) {
+            assert!(nonce.is_some());
+        }
         match repair_request {
             RepairType::Shred(slot, shred_index) => {
                 repair_stats.shred.update(*slot);
-                Ok(self.window_index_request_bytes(*slot, *shred_index)?)
+                Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::HighestShred(slot, shred_index) => {
                 repair_stats.highest_shred.update(*slot);
-                Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
+                Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::Orphan(slot) => {
                 repair_stats.orphan.update(*slot);
-                Ok(self.orphan_bytes(*slot)?)
+                Ok(self.orphan_bytes(*slot, nonce)?)
             }
         }
     }
@@ -390,12 +484,19 @@ impl ServeRepair {
         me: &ContactInfo,
         slot: Slot,
         shred_index: u64,
+        nonce: Option<Nonce>,
     ) -> Option<Packets> {
         if let Some(blockstore) = blockstore {
             // Try to find the requested index in one of the slots
-            let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr);
+            let packet = repair_response::repair_response_packet(
+                blockstore,
+                slot,
+                shred_index,
+                from_addr,
+                nonce,
+            );

-            if let Ok(Some(packet)) = packet {
+            if let Some(packet) = packet {
                 inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
                 return Some(Packets::new_with_recycler_data(
                     recycler,
@@ -423,15 +524,20 @@ impl ServeRepair {
         blockstore: Option<&Arc<Blockstore>>,
         slot: Slot,
         highest_index: u64,
+        nonce: Option<Nonce>,
     ) -> Option<Packets> {
         let blockstore = blockstore?;
         // Try to find the requested index in one of the slots
         let meta = blockstore.meta(slot).ok()??;
         if meta.received > highest_index {
             // meta.received must be at least 1 by this point
-            let packet =
-                Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr)
-                    .ok()??;
+            let packet = repair_response::repair_response_packet(
+                blockstore,
+                slot,
+                meta.received - 1,
+                from_addr,
+                nonce,
+            )?;
             return Some(Packets::new_with_recycler_data(
                 recycler,
                 "run_highest_window_request",
@@ -447,6 +553,7 @@ impl ServeRepair {
         blockstore: Option<&Arc<Blockstore>>,
         mut slot: Slot,
         max_responses: usize,
+        nonce: Option<Nonce>,
     ) -> Option<Packets> {
         let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
         if let Some(blockstore) = blockstore {
@@ -455,9 +562,19 @@ impl ServeRepair {
                 if meta.received == 0 {
                     break;
                 }
-                let packet =
-                    Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr);
-                if let Ok(Some(packet)) = packet {
+                let nonce = if Shred::is_nonce_unlocked(slot) {
+                    nonce
+                } else {
+                    None
+                };
+                let packet = repair_response::repair_response_packet(
+                    blockstore,
+                    slot,
+                    meta.received - 1,
+                    from_addr,
+                    nonce,
+                );
+                if let Some(packet) = packet {
                     res.packets.push(packet);
                 }
                 if meta.is_parent_set() && res.packets.len() <= max_responses {
@@ -472,41 +589,31 @@ impl ServeRepair {
         }
         Some(res)
     }
-
-    fn get_data_shred_as_packet(
-        blockstore: &Arc<Blockstore>,
-        slot: Slot,
-        shred_index: u64,
-        dest: &SocketAddr,
-    ) -> Result<Option<Packet>> {
-        let data = blockstore.get_data_shred(slot, shred_index)?;
-        Ok(data.map(|data| {
-            let mut packet = Packet::default();
-            packet.meta.size = data.len();
-            packet.meta.set_addr(dest);
-            packet.data.copy_from_slice(&data);
-            packet
-        }))
-    }
 }

 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::result::Error;
+    use crate::{repair_response, result::Error};
     use solana_ledger::get_tmp_ledger_path;
     use solana_ledger::{
         blockstore::make_many_slot_entries,
         blockstore_processor::fill_blockstore_slot_with_ticks,
         shred::{
             max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
+            NONCE_SHRED_PAYLOAD_SIZE, UNLOCK_NONCE_SLOT,
         },
     };
     use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp};

-    /// test run_window_request responds with the right shred, and do not overrun
     #[test]
-    fn run_highest_window_request() {
+    fn test_run_highest_window_request() {
+        run_highest_window_request(UNLOCK_NONCE_SLOT + 3, 3, Some(9));
+        run_highest_window_request(UNLOCK_NONCE_SLOT, 3, None);
+    }
+
+    /// test run_window_request responds with the right shred, and do not overrun
+    fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Option<Nonce>) {
         let recycler = PacketsRecycler::default();
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
@@ -518,41 +625,51 @@ mod tests {
             Some(&blockstore),
             0,
             0,
+            nonce,
         );
         assert!(rv.is_none());

         let _ = fill_blockstore_slot_with_ticks(
             &blockstore,
-            max_ticks_per_n_shreds(1) + 1,
-            2,
-            1,
+            max_ticks_per_n_shreds(1, None) + 1,
+            slot,
+            slot - num_slots + 1,
             Hash::default(),
         );

+        let index = 1;
         let rv = ServeRepair::run_highest_window_request(
             &recycler,
             &socketaddr_any!(),
             Some(&blockstore),
-            2,
-            1,
-        );
+            slot,
+            index,
+            nonce,
+        )
+        .expect("packets");

         let rv: Vec<Shred> = rv
-            .expect("packets")
             .packets
             .into_iter()
-            .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
+            .filter_map(|b| {
+                if nonce.is_some() {
+                    assert_eq!(repair_response::nonce(&b.data[..]), nonce);
+                }
+                Shred::new_from_serialized_shred(b.data.to_vec()).ok()
+            })
             .collect();
         assert!(!rv.is_empty());
-        let index = blockstore.meta(2).unwrap().unwrap().received - 1;
+        let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
         assert_eq!(rv[0].index(), index as u32);
-        assert_eq!(rv[0].slot(), 2);
+        assert_eq!(rv[0].slot(), slot);

         let rv = ServeRepair::run_highest_window_request(
             &recycler,
             &socketaddr_any!(),
             Some(&blockstore),
-            2,
+            slot,
             index + 1,
+            nonce,
         );
         assert!(rv.is_none());
     }
@@ -560,9 +677,14 @@ mod tests {
         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
     }

-    /// test window requests respond with the right shred, and do not overrun
     #[test]
-    fn run_window_request() {
+    fn test_run_window_request() {
+        run_window_request(UNLOCK_NONCE_SLOT + 1, Some(9));
+        run_window_request(UNLOCK_NONCE_SLOT - 3, None);
+    }
+
+    /// test window requests respond with the right shred, and do not overrun
+    fn run_window_request(slot: Slot, nonce: Option<Nonce>) {
         let recycler = PacketsRecycler::default();
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
@@ -589,12 +711,13 @@ mod tests {
             &socketaddr_any!(),
             Some(&blockstore),
             &me,
+            slot,
             0,
-            0,
+            nonce,
         );
         assert!(rv.is_none());
         let mut common_header = ShredCommonHeader::default();
-        common_header.slot = 2;
+        common_header.slot = slot;
         common_header.index = 1;
         let mut data_header = DataShredHeader::default();
         data_header.parent_offset = 1;
@@ -602,30 +725,37 @@ mod tests {
             common_header,
             data_header,
             CodingShredHeader::default(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );

         blockstore
             .insert_shreds(vec![shred_info], None, false)
             .expect("Expect successful ledger write");

+        let index = 1;
         let rv = ServeRepair::run_window_request(
             &recycler,
             &me,
             &socketaddr_any!(),
             Some(&blockstore),
             &me,
-            2,
-            1,
-        );
-        assert!(!rv.is_none());
+            slot,
+            index,
+            nonce,
+        )
+        .expect("packets");
         let rv: Vec<Shred> = rv
-            .expect("packets")
             .packets
             .into_iter()
-            .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
+            .filter_map(|b| {
+                if nonce.is_some() {
+                    assert_eq!(repair_response::nonce(&b.data[..]), nonce);
+                }
+                Shred::new_from_serialized_shred(b.data.to_vec()).ok()
+            })
             .collect();
         assert_eq!(rv[0].index(), 1);
-        assert_eq!(rv[0].slot(), 2);
+        assert_eq!(rv[0].slot(), slot);
     }

     Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
@@ -697,52 +827,85 @@ mod tests {
     }

     #[test]
-    fn run_orphan() {
+    fn test_run_orphan() {
+        run_orphan(UNLOCK_NONCE_SLOT + 1, 3, Some(9));
+        // Test where the response will be for some slots <= UNLOCK_NONCE_SLOT,
+        // and some of the response will be for some slots > UNLOCK_NONCE_SLOT.
+        // Should not panic.
+        run_orphan(UNLOCK_NONCE_SLOT, 3, None);
+        run_orphan(UNLOCK_NONCE_SLOT, 3, Some(9));
+    }
+
+    fn run_orphan(slot: Slot, num_slots: u64, nonce: Option<Nonce>) {
         solana_logger::setup();
         let recycler = PacketsRecycler::default();
         let ledger_path = get_tmp_ledger_path!();
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
-            let rv =
-                ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0);
+            let rv = ServeRepair::run_orphan(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                slot,
+                0,
+                nonce,
+            );
             assert!(rv.is_none());

-            // Create slots 1, 2, 3 with 5 shreds apiece
-            let (shreds, _) = make_many_slot_entries(1, 3, 5);
+            // Create slots [slot, slot + num_slots) with 5 shreds apiece
+            let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);

             blockstore
                 .insert_shreds(shreds, None, false)
                 .expect("Expect successful ledger write");

-            // We don't have slot 4, so we don't know how to service this requeset
-            let rv =
-                ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5);
+            // We don't have slot `slot + num_slots`, so we don't know how to service this request
+            let rv = ServeRepair::run_orphan(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                slot + num_slots,
+                5,
+                nonce,
+            );
             assert!(rv.is_none());

-            // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
-            // for this request
-            let rv: Vec<_> =
-                ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5)
-                    .expect("run_orphan packets")
-                    .packets
-                    .iter()
-                    .map(|b| b.clone())
-                    .collect();
-            let expected: Vec<_> = (1..=3)
+            // For a orphan request for `slot + num_slots - 1`, we should return the highest shreds
+            // from slots in the range [slot, slot + num_slots - 1]
+            let rv: Vec<_> = ServeRepair::run_orphan(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                slot + num_slots - 1,
+                5,
+                nonce,
+            )
+            .expect("run_orphan packets")
+            .packets
+            .iter()
+            .map(|b| b.clone())
+            .collect();
+
+            // Verify responses
+            let expected: Vec<_> = (slot..slot + num_slots)
                 .rev()
-                .map(|slot| {
+                .filter_map(|slot| {
+                    let nonce = if Shred::is_nonce_unlocked(slot) {
+                        nonce
+                    } else {
+                        None
+                    };
                     let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
-                    ServeRepair::get_data_shred_as_packet(
+                    repair_response::repair_response_packet(
                         &blockstore,
                         slot,
                         index,
                         &socketaddr_any!(),
+                        nonce,
                     )
-                    .unwrap()
-                    .unwrap()
                 })
                 .collect();
-            assert_eq!(rv, expected)
+            assert_eq!(rv, expected);
         }

         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
|
@ -1,31 +1,37 @@
|
|||||||
//! `window_service` handles the data plane incoming shreds, storing them in
|
//! `window_service` handles the data plane incoming shreds, storing them in
|
||||||
//! blockstore and retransmitting where required
|
//! blockstore and retransmitting where required
|
||||||
//!
|
use crate::{
|
||||||
use crate::cluster_info::ClusterInfo;
|
cluster_info::ClusterInfo,
|
||||||
use crate::packet::Packets;
|
packet::Packets,
|
||||||
use crate::repair_service::{RepairService, RepairStrategy};
|
repair_response,
|
||||||
use crate::result::{Error, Result};
|
repair_service::{RepairService, RepairStrategy},
|
||||||
use crate::streamer::PacketSender;
|
result::{Error, Result},
|
||||||
|
serve_repair::DEFAULT_NONCE,
|
||||||
|
streamer::PacketSender,
|
||||||
|
};
|
||||||
use crossbeam_channel::{
|
use crossbeam_channel::{
|
||||||
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
|
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
|
||||||
};
|
};
|
||||||
use rayon::iter::IntoParallelRefMutIterator;
|
use rayon::iter::IntoParallelRefMutIterator;
|
||||||
use rayon::iter::ParallelIterator;
|
use rayon::iter::ParallelIterator;
|
||||||
use rayon::ThreadPool;
|
use rayon::ThreadPool;
|
||||||
use solana_ledger::bank_forks::BankForks;
|
use solana_ledger::{
|
||||||
use solana_ledger::blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT};
|
bank_forks::BankForks,
|
||||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT},
|
||||||
use solana_ledger::shred::Shred;
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
|
shred::{Nonce, Shred},
|
||||||
|
};
|
||||||
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
|
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
|
||||||
use solana_rayon_threadlimit::get_thread_count;
|
use solana_rayon_threadlimit::get_thread_count;
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
|
||||||
use solana_sdk::timing::duration_as_ms;
|
use std::{
|
||||||
use std::net::UdpSocket;
|
net::{SocketAddr, UdpSocket},
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
sync::atomic::{AtomicBool, Ordering},
|
||||||
use std::sync::{Arc, RwLock};
|
sync::{Arc, RwLock},
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
thread::{self, Builder, JoinHandle},
|
||||||
use std::time::{Duration, Instant};
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
|
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
|
||||||
if shred.is_data() {
|
if shred.is_data() {
|
||||||
@@ -102,8 +108,15 @@ fn run_check_duplicate(
     Ok(())
 }

+fn verify_repair(_shred: &Shred, repair_info: &Option<RepairMeta>) -> bool {
+    repair_info
+        .as_ref()
+        .map(|repair_info| repair_info.nonce == DEFAULT_NONCE)
+        .unwrap_or(true)
+}
+
 fn run_insert<F>(
-    shred_receiver: &CrossbeamReceiver<Vec<Shred>>,
+    shred_receiver: &CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     blockstore: &Arc<Blockstore>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     handle_duplicate: F,
@@ -112,12 +125,16 @@ where
     F: Fn(Shred) -> (),
 {
     let timer = Duration::from_millis(200);
-    let mut shreds = shred_receiver.recv_timeout(timer)?;
-    while let Ok(mut more_shreds) = shred_receiver.try_recv() {
-        shreds.append(&mut more_shreds)
+    let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
+    while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() {
+        shreds.extend(more_shreds);
+        repair_infos.extend(more_repair_infos);
     }

+    assert_eq!(shreds.len(), repair_infos.len());
+    let mut i = 0;
+    shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0);
+
     let blockstore_insert_metrics = blockstore.insert_shreds_handle_duplicate(
         shreds,
         Some(leader_schedule_cache),
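Note: `run_insert` now receives a `RepairMeta` alongside every shred and drops repaired shreds whose nonce does not echo `DEFAULT_NONCE`. The `retain` call walks the parallel Vec with a manual counter, since `retain` gives no element index: `(expr, i += 1).0` evaluates the predicate, bumps the index, and yields the predicate. An end-to-end toy of the check itself (types simplified to strings for illustration):

const DEFAULT_NONCE: u32 = 42;

// Mirrors window_service::verify_repair: non-repair shreds (None) always pass,
// repaired shreds must carry the expected nonce.
fn verify_repair(repair_nonce: &Option<u32>) -> bool {
    repair_nonce.map(|nonce| nonce == DEFAULT_NONCE).unwrap_or(true)
}

fn main() {
    let mut shreds = vec!["turbine", "good repair", "stale repair"];
    let repair_infos = vec![None, Some(42), Some(7)];

    // Same retain-with-index idiom as the diff above.
    let mut i = 0;
    shreds.retain(|_shred| (verify_repair(&repair_infos[i]), i += 1).0);
    assert_eq!(shreds, vec!["turbine", "good repair"]);
}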
@@ -131,7 +148,7 @@ where

 fn recv_window<F>(
     blockstore: &Arc<Blockstore>,
-    insert_shred_sender: &CrossbeamSender<Vec<Shred>>,
+    insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     my_pubkey: &Pubkey,
     verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
     retransmit: &PacketSender,
@@ -155,7 +172,7 @@ where
     inc_new_counter_debug!("streamer-recv_window-recv", total_packets);

     let last_root = blockstore.last_root();
-    let shreds: Vec<_> = thread_pool.install(|| {
+    let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
         packets
             .par_iter_mut()
             .flat_map(|packets| {
@@ -164,34 +181,59 @@ where
                     .iter_mut()
                     .filter_map(|packet| {
                         if packet.meta.discard {
-                            inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
+                            inc_new_counter_debug!(
+                                "streamer-recv_window-invalid_or_unnecessary_packet",
+                                1
+                            );
                             None
-                        } else if let Ok(shred) =
-                            Shred::new_from_serialized_shred(packet.data.to_vec())
-                        {
-                            if shred_filter(&shred, last_root) {
-                                // Mark slot as dead if the current shred is on the boundary
-                                // of max shreds per slot. However, let the current shred
-                                // get retransmitted. It'll allow peer nodes to see this shred
-                                // and trigger them to mark the slot as dead.
-                                if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
-                                    let _ = blockstore.set_dead_slot(shred.slot());
+                        } else {
+                            // shred fetch stage should be sending packets
+                            // with sufficiently large buffers. Needed to ensure
+                            // call to `new_from_serialized_shred` is safe.
+                            assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
+                            let serialized_shred = packet.data.to_vec();
+                            if let Ok(shred) = Shred::new_from_serialized_shred(serialized_shred) {
+                                let repair_info = {
+                                    if packet.meta.repair && Shred::is_nonce_unlocked(shred.slot())
+                                    {
+                                        if let Some(nonce) = repair_response::nonce(&packet.data) {
+                                            let repair_info = RepairMeta {
+                                                _from_addr: packet.meta.addr(),
+                                                nonce,
+                                            };
+                                            Some(repair_info)
+                                        } else {
+                                            // If can't parse the nonce, dump the packet
+                                            return None;
+                                        }
+                                    } else {
+                                        None
+                                    }
+                                };
+                                if shred_filter(&shred, last_root) {
+                                    // Mark slot as dead if the current shred is on the boundary
+                                    // of max shreds per slot. However, let the current shred
+                                    // get retransmitted. It'll allow peer nodes to see this shred
+                                    // and trigger them to mark the slot as dead.
+                                    if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
+                                        let _ = blockstore.set_dead_slot(shred.slot());
+                                    }
+                                    packet.meta.slot = shred.slot();
+                                    packet.meta.seed = shred.seed();
+                                    Some((shred, repair_info))
+                                } else {
+                                    packet.meta.discard = true;
+                                    None
                                 }
-                                packet.meta.slot = shred.slot();
-                                packet.meta.seed = shred.seed();
-                                Some(shred)
                             } else {
                                 packet.meta.discard = true;
                                 None
                             }
-                        } else {
-                            packet.meta.discard = true;
-                            None
                         }
                     })
                     .collect::<Vec<_>>()
             })
-            .collect()
+            .unzip()
     });

     trace!("{:?} shreds from packets", shreds.len());
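Note: the `assert_eq!(packet.data.len(), PACKET_DATA_SIZE)` above also explains how `repair_response::nonce(&packet.data)` can read the nonce from the end of the full, fixed-size buffer: nonce-era data shreds appear to be sized (the new `NONCE_SHRED_PAYLOAD_SIZE`) so that payload plus trailing nonce fills the packet exactly. A sketch under that assumption (the constants are illustrative stand-ins, and the payload/packet relation is inferred, not quoted from the source):

use std::convert::TryInto;

const PACKET_DATA_SIZE: usize = 1232; // stand-in for the solana_sdk constant
const SIZE_OF_NONCE: usize = 4;
const NONCE_SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; // assumed relation

fn main() {
    // Responder: payload fills the packet up to the last four bytes; the nonce fills the rest.
    let shred = vec![0u8; NONCE_SHRED_PAYLOAD_SIZE];
    let mut packet = [0u8; PACKET_DATA_SIZE];
    packet[..shred.len()].copy_from_slice(&shred);
    packet[NONCE_SHRED_PAYLOAD_SIZE..].copy_from_slice(&9u32.to_le_bytes());

    // Receiver: reading the tail of the fixed-size buffer recovers the nonce.
    let tail: [u8; SIZE_OF_NONCE] = packet[PACKET_DATA_SIZE - SIZE_OF_NONCE..].try_into().unwrap();
    assert_eq!(u32::from_le_bytes(tail), 9);
}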
@@ -205,7 +247,7 @@ where
         }
     }

-    insert_shred_sender.send(shreds)?;
+    insert_shred_sender.send((shreds, repair_infos))?;

     trace!(
         "Elapsed processing time in recv_window(): {}",
@@ -215,6 +257,11 @@ where
     Ok(())
 }

+struct RepairMeta {
+    _from_addr: SocketAddr,
+    nonce: Nonce,
+}
+
 // Implement a destructor for the window_service thread to signal it exited
 // even on panics
 struct Finalizer {
@@ -336,7 +383,7 @@ impl WindowService {
         exit: &Arc<AtomicBool>,
         blockstore: &Arc<Blockstore>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        insert_receiver: CrossbeamReceiver<Vec<Shred>>,
+        insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         duplicate_sender: CrossbeamSender<Shred>,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
@@ -377,7 +424,7 @@ impl WindowService {
         id: Pubkey,
         exit: &Arc<AtomicBool>,
         blockstore: &Arc<Blockstore>,
-        insert_sender: CrossbeamSender<Vec<Shred>>,
+        insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         shred_filter: F,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
@@ -483,12 +530,11 @@ mod test {
         repair_service::RepairSlotRange,
     };
     use rand::thread_rng;
-    use solana_ledger::shred::DataShredHeader;
     use solana_ledger::{
         blockstore::{make_many_slot_entries, Blockstore},
         entry::{create_ticks, Entry},
         get_tmp_ledger_path,
-        shred::Shredder,
+        shred::{DataShredHeader, Shredder, NONCE_SHRED_PAYLOAD_SIZE},
     };
     use solana_sdk::{
         clock::Slot,
@@ -562,8 +608,12 @@ mod test {

         // If it's a coding shred, test that slot >= root
         let (common, coding) = Shredder::new_coding_shred_header(5, 5, 5, 6, 6, 0, 0);
-        let mut coding_shred =
-            Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
+        let mut coding_shred = Shred::new_empty_from_header(
+            common,
+            DataShredHeader::default(),
+            coding,
+            NONCE_SHRED_PAYLOAD_SIZE,
+        );
         Shredder::sign_shred(&leader_keypair, &mut coding_shred);
         assert_eq!(
             should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0, 0),
@ -1982,10 +1982,11 @@ impl Blockstore {
|
|||||||
let data_shreds = data_shreds?;
|
let data_shreds = data_shreds?;
|
||||||
assert!(data_shreds.last().unwrap().data_complete());
|
assert!(data_shreds.last().unwrap().data_complete());
|
||||||
|
|
||||||
let deshred_payload = Shredder::deshred(&data_shreds).map_err(|_| {
|
let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
|
||||||
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
|
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
|
||||||
"Could not reconstruct data block from constituent shreds".to_string(),
|
"Could not reconstruct data block from constituent shreds, error: {:?}",
|
||||||
)))
|
e
|
||||||
|
))))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
debug!("{:?} shreds in last FEC set", data_shreds.len(),);
|
debug!("{:?} shreds in last FEC set", data_shreds.len(),);
|
||||||
@ -2830,7 +2831,7 @@ pub mod tests {
         entry::{next_entry, next_entry_mut},
         genesis_utils::{create_genesis_config, GenesisConfigInfo},
         leader_schedule::{FixedSchedule, LeaderSchedule},
-        shred::{max_ticks_per_n_shreds, DataShredHeader},
+        shred::{max_ticks_per_n_shreds, DataShredHeader, NONCE_SHRED_PAYLOAD_SIZE},
     };
     use assert_matches::assert_matches;
     use bincode::serialize;
@ -2980,7 +2981,7 @@ pub mod tests {
     #[test]
     fn test_insert_get_bytes() {
         // Create enough entries to ensure there are at least two shreds created
-        let num_entries = max_ticks_per_n_shreds(1) + 1;
+        let num_entries = max_ticks_per_n_shreds(1, None) + 1;
         assert!(num_entries > 1);

         let (mut shreds, _) = make_slot_entries(0, 0, num_entries);
@ -3220,7 +3221,7 @@ pub mod tests {
     #[test]
     fn test_insert_data_shreds_basic() {
         // Create enough entries to ensure there are at least two shreds created
-        let num_entries = max_ticks_per_n_shreds(1) + 1;
+        let num_entries = max_ticks_per_n_shreds(1, None) + 1;
         assert!(num_entries > 1);

         let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
@ -3267,7 +3268,7 @@ pub mod tests {
     #[test]
     fn test_insert_data_shreds_reverse() {
         let num_shreds = 10;
-        let num_entries = max_ticks_per_n_shreds(num_shreds);
+        let num_entries = max_ticks_per_n_shreds(num_shreds, None);
         let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
         let num_shreds = shreds.len() as u64;

@ -3444,7 +3445,7 @@ pub mod tests {
     {
         let blockstore = Blockstore::open(&blockstore_path).unwrap();
         // Create enough entries to ensure there are at least two shreds created
-        let min_entries = max_ticks_per_n_shreds(1) + 1;
+        let min_entries = max_ticks_per_n_shreds(1, None) + 1;
         for i in 0..4 {
             let slot = i;
             let parent_slot = if i == 0 { 0 } else { i - 1 };
@ -3871,7 +3872,7 @@ pub mod tests {
         let blockstore = Blockstore::open(&blockstore_path).unwrap();
         let num_slots = 15;
         // Create enough entries to ensure there are at least two shreds created
-        let entries_per_slot = max_ticks_per_n_shreds(1) + 1;
+        let entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
         assert!(entries_per_slot > 1);

         let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
@ -4241,7 +4242,7 @@ pub mod tests {
         let gap: u64 = 10;
         assert!(gap > 3);
         // Create enough entries to ensure there are at least two shreds created
-        let num_entries = max_ticks_per_n_shreds(1) + 1;
+        let num_entries = max_ticks_per_n_shreds(1, None) + 1;
         let entries = create_ticks(num_entries, 0, Hash::default());
         let mut shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
         let num_shreds = shreds.len();
@ -4553,6 +4554,7 @@ pub mod tests {
             shred.clone(),
             DataShredHeader::default(),
             coding.clone(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );

         // Insert a good coding shred
@ -4585,6 +4587,7 @@ pub mod tests {
             shred.clone(),
             DataShredHeader::default(),
             coding.clone(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );
         let index = index_cf.get(shred.slot).unwrap().unwrap();
         assert!(Blockstore::should_insert_coding_shred(
@ -4600,6 +4603,7 @@ pub mod tests {
             shred.clone(),
             DataShredHeader::default(),
             coding.clone(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );
         let index = coding_shred.coding_header.position - 1;
         coding_shred.set_index(index as u32);
@ -4618,6 +4622,7 @@ pub mod tests {
             shred.clone(),
             DataShredHeader::default(),
             coding.clone(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );
         coding_shred.coding_header.num_coding_shreds = 0;
         let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
@ -4634,6 +4639,7 @@ pub mod tests {
             shred.clone(),
             DataShredHeader::default(),
             coding.clone(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );
         coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position;
         let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
@ -4651,6 +4657,7 @@ pub mod tests {
             shred.clone(),
             DataShredHeader::default(),
             coding.clone(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );
         coding_shred.common_header.fec_set_index = std::u32::MAX - 1;
         coding_shred.coding_header.num_coding_shreds = 3;
@ -4683,6 +4690,7 @@ pub mod tests {
             shred.clone(),
             DataShredHeader::default(),
             coding.clone(),
+            NONCE_SHRED_PAYLOAD_SIZE,
         );
         let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
         coding_shred.set_slot(*last_root.read().unwrap());
@ -9,7 +9,7 @@ use rayon::{
    slice::ParallelSlice,
    ThreadPool,
 };
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize, Serializer};
 use solana_metrics::datapoint_debug;
 use solana_perf::packet::Packet;
 use solana_rayon_threadlimit::get_thread_count;
@ -24,25 +24,33 @@ use std::mem::size_of;
 use std::{sync::Arc, time::Instant};
 use thiserror::Error;

+pub type Nonce = u32;
+
 /// The following constants are computed by hand, and hardcoded.
 /// `test_shred_constants` ensures that the values are correct.
 /// Constants are used over lazy_static for performance reasons.
 pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
 pub const SIZE_OF_DATA_SHRED_HEADER: usize = 3;
+pub const SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD: usize = 2;
 pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
 pub const SIZE_OF_SIGNATURE: usize = 64;
 pub const SIZE_OF_SHRED_TYPE: usize = 1;
 pub const SIZE_OF_SHRED_SLOT: usize = 8;
 pub const SIZE_OF_SHRED_INDEX: usize = 4;
+pub const SIZE_OF_NONCE: usize = 4;
 pub const SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize =
     SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
 pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
     - SIZE_OF_COMMON_SHRED_HEADER
     - SIZE_OF_DATA_SHRED_HEADER
     - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
+pub const SIZE_OF_NONCE_DATA_SHRED_PAYLOAD: usize =
+    SIZE_OF_DATA_SHRED_PAYLOAD - SIZE_OF_NONCE - SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD;

 pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
 pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
+pub const NONCE_SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;
+pub const UNLOCK_NONCE_SLOT: Slot = 13_115_515;

 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
     .num_threads(get_thread_count())
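As a sanity check on the size arithmetic these constants encode, here is a standalone sketch; the PACKET_DATA_SIZE value (1232 = 1280 - 40 - 8) is an assumption carried over from solana's packet module, which this diff does not restate:

// Standalone sketch of the constant arithmetic; PACKET_DATA_SIZE = 1232 is assumed.
const PACKET_DATA_SIZE: usize = 1232;
const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
const SIZE_OF_DATA_SHRED_HEADER: usize = 3;
const SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD: usize = 2;
const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
const SIZE_OF_NONCE: usize = 4;
const SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize =
    SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;

fn main() {
    // Legacy data payload: packet minus common header, data header, ignored tail.
    let legacy = PACKET_DATA_SIZE
        - SIZE_OF_COMMON_SHRED_HEADER
        - SIZE_OF_DATA_SHRED_HEADER
        - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
    // The nonce-era payload additionally gives up the 4-byte nonce and the
    // new 2-byte "size" header field.
    let nonce = legacy - SIZE_OF_NONCE - SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD;
    assert_eq!(legacy, 1057);
    assert_eq!(nonce, 1051);
    // The full nonce-era shred shrinks by exactly the trailing nonce.
    assert_eq!(PACKET_DATA_SIZE - SIZE_OF_NONCE, 1228);
}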
@ -107,6 +115,20 @@ pub struct ShredCommonHeader {
 pub struct DataShredHeader {
     pub parent_offset: u16,
     pub flags: u8,
+    #[serde(skip_deserializing)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "option_as_u16_serialize")]
+    pub size: Option<u16>,
+}
+
+#[allow(clippy::trivially_copy_pass_by_ref)]
+fn option_as_u16_serialize<S>(x: &Option<u16>, s: S) -> std::result::Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    assert!(x.is_some());
+    let num = x.unwrap();
+    s.serialize_u16(num)
 }

 /// The coding shred header has FEC information
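The serde attributes on `size` are what keep `DataShredHeader` wire-compatible: the field is omitted when `None`, written as a bare `u16` when `Some`, and skipped entirely on deserialization. A self-contained sketch of the resulting on-wire sizes, assuming bincode's default fixed-width integer encoding and `serde`/`bincode` as dependencies:

use serde::{Deserialize, Serialize, Serializer};

#[allow(clippy::trivially_copy_pass_by_ref)]
fn option_as_u16_serialize<S: Serializer>(x: &Option<u16>, s: S) -> Result<S::Ok, S::Error> {
    // Only reached when the field is Some (see skip_serializing_if below).
    s.serialize_u16(x.unwrap())
}

#[derive(Serialize, Deserialize, Default)]
struct DataShredHeader {
    parent_offset: u16, // 2 bytes on the wire
    flags: u8,          // 1 byte on the wire
    #[serde(skip_deserializing)]
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(serialize_with = "option_as_u16_serialize")]
    size: Option<u16>, // 0 bytes when None, 2 bytes (a bare u16) when Some
}

fn main() {
    let old = DataShredHeader::default();
    let new = DataShredHeader { size: Some(1000), ..DataShredHeader::default() };
    // Pre-nonce headers stay byte-identical; nonce-era headers grow by exactly 2.
    assert_eq!(bincode::serialized_size(&old).unwrap(), 3);
    assert_eq!(bincode::serialized_size(&new).unwrap(), 5);
}

This mirrors the `test_shred_constants` assertions added later in this diff.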
@ -168,7 +190,8 @@ impl Shred {
         version: u16,
         fec_set_index: u32,
     ) -> Self {
-        let mut payload = vec![0; PACKET_DATA_SIZE];
+        let payload_size = Self::get_expected_payload_size_from_slot(slot);
+        let mut payload = vec![0; payload_size];
         let common_header = ShredCommonHeader {
             slot,
             index,
@ -177,9 +200,20 @@ impl Shred {
             ..ShredCommonHeader::default()
         };

+        let size = if Self::is_nonce_unlocked(slot) {
+            Some(
+                (data.map(|d| d.len()).unwrap_or(0)
+                    + SIZE_OF_DATA_SHRED_HEADER
+                    + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD
+                    + SIZE_OF_COMMON_SHRED_HEADER) as u16,
+            )
+        } else {
+            None
+        };
         let mut data_header = DataShredHeader {
             parent_offset,
             flags: reference_tick.min(SHRED_TICK_REFERENCE_MASK),
+            size,
         };

         if is_last_data {
@ -198,9 +232,10 @@ impl Shred {
             &common_header,
         )
         .expect("Failed to write header into shred buffer");
+        let size_of_data_shred_header = Shredder::get_expected_data_header_size_from_slot(slot);
         Self::serialize_obj_into(
             &mut start,
-            SIZE_OF_DATA_SHRED_HEADER,
+            size_of_data_shred_header,
             &mut payload,
             &data_header,
         )
@ -218,11 +253,21 @@ impl Shred {
         }
     }

-    pub fn new_from_serialized_shred(payload: Vec<u8>) -> Result<Self> {
+    pub fn new_from_serialized_shred(mut payload: Vec<u8>) -> Result<Self> {
         let mut start = 0;
         let common_header: ShredCommonHeader =
             Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;
+
+        let slot = common_header.slot;
+        let expected_data_size = Self::get_expected_payload_size_from_slot(slot);
+        // Safe because any payload from the network must have passed through
+        // window service, which implies payload will be of size
+        // PACKET_DATA_SIZE, and `expected_data_size` <= PACKET_DATA_SIZE.
+        //
+        // On the other hand, if this function is called locally, the payload size should match
+        // the `expected_data_size`.
+        assert!(payload.len() >= expected_data_size);
+        payload.truncate(expected_data_size);
         let shred = if common_header.shred_type == ShredType(CODING_SHRED) {
             let coding_header: CodingShredHeader =
                 Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;
@ -233,11 +278,14 @@ impl Shred {
                 payload,
             }
         } else if common_header.shred_type == ShredType(DATA_SHRED) {
+            // This doesn't need to change since we skip deserialization of the
+            // "size" field in the header for now
+            let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER;
             let data_header: DataShredHeader =
-                Self::deserialize_obj(&mut start, SIZE_OF_DATA_SHRED_HEADER, &payload)?;
+                Self::deserialize_obj(&mut start, size_of_data_shred_header, &payload)?;
             if u64::from(data_header.parent_offset) > common_header.slot {
                 return Err(ShredError::InvalidParentOffset {
-                    slot: common_header.slot,
+                    slot,
                     parent_offset: data_header.parent_offset,
                 });
             }
@ -258,8 +306,10 @@ impl Shred {
         common_header: ShredCommonHeader,
         data_header: DataShredHeader,
         coding_header: CodingShredHeader,
+        payload_size: usize,
     ) -> Self {
-        let mut payload = vec![0; PACKET_DATA_SIZE];
+        assert!(payload_size == NONCE_SHRED_PAYLOAD_SIZE || payload_size == PACKET_DATA_SIZE);
+        let mut payload = vec![0; payload_size];
         let mut start = 0;
         Self::serialize_obj_into(
             &mut start,
@ -268,10 +318,15 @@ impl Shred {
             &common_header,
         )
         .expect("Failed to write header into shred buffer");
+        let expected_data_header_size = if payload_size == NONCE_SHRED_PAYLOAD_SIZE {
+            SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD
+        } else {
+            SIZE_OF_DATA_SHRED_HEADER
+        };
         if common_header.shred_type == ShredType(DATA_SHRED) {
             Self::serialize_obj_into(
                 &mut start,
-                SIZE_OF_DATA_SHRED_HEADER,
+                expected_data_header_size,
                 &mut payload,
                 &data_header,
             )
@ -293,11 +348,13 @@ impl Shred {
         }
     }

-    pub fn new_empty_data_shred() -> Self {
+    pub fn new_empty_data_shred(payload_size: usize) -> Self {
+        assert!(payload_size == NONCE_SHRED_PAYLOAD_SIZE || payload_size == PACKET_DATA_SIZE);
         Self::new_empty_from_header(
             ShredCommonHeader::default(),
             DataShredHeader::default(),
             CodingShredHeader::default(),
+            payload_size,
         )
     }

@ -403,6 +460,18 @@ impl Shred {
         self.signature()
             .verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..])
     }
+
+    pub fn is_nonce_unlocked(slot: Slot) -> bool {
+        slot > UNLOCK_NONCE_SLOT
+    }
+
+    fn get_expected_payload_size_from_slot(slot: Slot) -> usize {
+        if Self::is_nonce_unlocked(slot) {
+            NONCE_SHRED_PAYLOAD_SIZE
+        } else {
+            PACKET_DATA_SIZE
+        }
+    }
 }

 #[derive(Debug)]
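`is_nonce_unlocked` is the activation gate that lets both shred formats coexist: every size decision becomes a function of the shred's slot. A minimal sketch of the boundary behavior, reusing the constant values assumed earlier; note the strict `>`, which is why the converted tests in this diff exercise both `UNLOCK_NONCE_SLOT` and `UNLOCK_NONCE_SLOT + 1`:

// Sketch of the slot-gated cutover; constants as assumed above.
type Slot = u64;
const UNLOCK_NONCE_SLOT: Slot = 13_115_515;
const PACKET_DATA_SIZE: usize = 1232;
const NONCE_SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 4;

fn is_nonce_unlocked(slot: Slot) -> bool {
    slot > UNLOCK_NONCE_SLOT
}

fn expected_payload_size(slot: Slot) -> usize {
    if is_nonce_unlocked(slot) {
        NONCE_SHRED_PAYLOAD_SIZE
    } else {
        PACKET_DATA_SIZE
    }
}

fn main() {
    // The boundary slot itself still uses the legacy size; only strictly
    // later slots switch to the smaller, nonce-bearing payload.
    assert_eq!(expected_payload_size(UNLOCK_NONCE_SLOT), PACKET_DATA_SIZE);
    assert_eq!(expected_payload_size(UNLOCK_NONCE_SLOT + 1), NONCE_SHRED_PAYLOAD_SIZE);
}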
@ -467,7 +536,7 @@ impl Shredder {

         let now = Instant::now();

-        let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
+        let no_header_size = Self::get_expected_data_shred_payload_size_from_slot(self.slot);
         let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
         let last_shred_index = next_shred_index + num_shreds as u32 - 1;

@ -628,7 +697,8 @@ impl Shredder {
         let start_index = data_shred_batch[0].common_header.index;

         // All information after coding shred field in a data shred is encoded
-        let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
+        let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot);
+        let valid_data_len = expected_payload_size - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
         let data_ptrs: Vec<_> = data_shred_batch
             .iter()
             .map(|data| &data.payload[..valid_data_len])
@ -646,8 +716,12 @@ impl Shredder {
                 i,
                 version,
             );
-            let shred =
-                Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header);
+            let shred = Shred::new_empty_from_header(
+                header,
+                DataShredHeader::default(),
+                coding_header,
+                expected_payload_size,
+            );
             coding_shreds.push(shred.payload);
         });

@ -701,7 +775,10 @@ impl Shredder {
         expected_index: usize,
         index_found: usize,
         present: &mut [bool],
+        payload_size: usize,
     ) -> Vec<Vec<u8>> {
+        // Safe to assert because `new_from_serialized_shred` guarantees the size
+        assert!(payload_size == NONCE_SHRED_PAYLOAD_SIZE || payload_size == PACKET_DATA_SIZE);
         let end_index = index_found.saturating_sub(1);
         // The index of current shred must be within the range of shreds that are being
         // recovered
@ -715,9 +792,9 @@ impl Shredder {
             .map(|missing| {
                 present[missing.saturating_sub(first_index_in_fec_set)] = false;
                 if missing < first_index_in_fec_set + num_data {
-                    Shred::new_empty_data_shred().payload
+                    Shred::new_empty_data_shred(payload_size).payload
                 } else {
-                    vec![0; PACKET_DATA_SIZE]
+                    vec![0; payload_size]
                 }
             })
             .collect();
@ -732,6 +809,8 @@ impl Shredder {
         first_code_index: usize,
         slot: Slot,
     ) -> std::result::Result<Vec<Shred>, reed_solomon_erasure::Error> {
+        let expected_payload_size =
+            Self::verify_consistent_shred_payload_sizes(&"try_recovery()", &shreds)?;
         let mut recovered_data = vec![];
         let fec_set_size = num_data + num_coding;

@ -751,6 +830,7 @@ impl Shredder {
                     next_expected_index,
                     index,
                     &mut present,
+                    expected_payload_size,
                 );
                 blocks.push(shred.payload);
                 next_expected_index = index + 1;
@ -767,6 +847,7 @@ impl Shredder {
                 next_expected_index,
                 first_index + fec_set_size,
                 &mut present,
+                expected_payload_size,
             );

             shred_bufs.append(&mut pending_shreds);
@ -777,7 +858,7 @@ impl Shredder {

             let session = Session::new(num_data, num_coding)?;

-            let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
+            let valid_data_len = expected_payload_size - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
             let coding_block_offset = SIZE_OF_CODING_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER;
             let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs
                 .iter_mut()
@ -822,8 +903,11 @@ impl Shredder {
     /// Combines all shreds to recreate the original buffer
     pub fn deshred(shreds: &[Shred]) -> std::result::Result<Vec<u8>, reed_solomon_erasure::Error> {
         let num_data = shreds.len();
-        let data_shred_bufs = {
+        let expected_payload_size =
+            Self::verify_consistent_shred_payload_sizes(&"deshred()", shreds)?;
+        let (data_shred_bufs, slot) = {
             let first_index = shreds.first().unwrap().index() as usize;
+            let slot = shreds.first().unwrap().slot();
             let last_shred = shreds.last().unwrap();
             let last_index = if last_shred.data_complete() || last_shred.last_in_slot() {
                 last_shred.index() as usize
@ -835,10 +919,32 @@ impl Shredder {
                 return Err(reed_solomon_erasure::Error::TooFewDataShards);
             }

-            shreds.iter().map(|shred| &shred.payload).collect()
+            (shreds.iter().map(|shred| &shred.payload).collect(), slot)
         };

-        Ok(Self::reassemble_payload(num_data, data_shred_bufs))
+        let expected_data_header_size = Self::get_expected_data_header_size_from_slot(slot);
+        Ok(Self::reassemble_payload(
+            num_data,
+            data_shred_bufs,
+            expected_payload_size,
+            expected_data_header_size,
+        ))
+    }
+
+    pub fn get_expected_data_shred_payload_size_from_slot(slot: Slot) -> usize {
+        if Shred::is_nonce_unlocked(slot) {
+            SIZE_OF_NONCE_DATA_SHRED_PAYLOAD
+        } else {
+            SIZE_OF_DATA_SHRED_PAYLOAD
+        }
+    }
+
+    pub fn get_expected_data_header_size_from_slot(slot: Slot) -> usize {
+        if Shred::is_nonce_unlocked(slot) {
+            SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD
+        } else {
+            SIZE_OF_DATA_SHRED_HEADER
+        }
     }

     fn get_shred_index(
@ -854,26 +960,60 @@ impl Shredder {
         }
     }

-    fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec<u8>>) -> Vec<u8> {
-        let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
+    fn reassemble_payload(
+        num_data: usize,
+        data_shred_bufs: Vec<&Vec<u8>>,
+        expected_payload_size: usize,
+        expected_data_header_size: usize,
+    ) -> Vec<u8> {
+        let valid_data_len = expected_payload_size - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
         data_shred_bufs[..num_data]
             .iter()
             .flat_map(|data| {
-                let offset = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER;
+                let offset = SIZE_OF_COMMON_SHRED_HEADER + expected_data_header_size;
                 data[offset..valid_data_len].iter()
             })
             .cloned()
             .collect()
     }

+    fn verify_consistent_shred_payload_sizes(
+        caller: &str,
+        shreds: &[Shred],
+    ) -> std::result::Result<usize, reed_solomon_erasure::Error> {
+        if shreds.is_empty() {
+            return Err(reed_solomon_erasure::Error::TooFewShardsPresent);
+        }
+        let slot = shreds[0].slot();
+        let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot);
+        for shred in shreds {
+            if shred.payload.len() != expected_payload_size {
+                error!(
+                    "{} Shreds for slot: {} are inconsistent sizes. One shred: {} Another shred: {}",
+                    caller,
+                    slot,
+                    expected_payload_size,
+                    shred.payload.len()
+                );
+                return Err(reed_solomon_erasure::Error::IncorrectShardSize);
+            }
+        }
+
+        Ok(expected_payload_size)
+    }
 }

-pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 {
+pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
     let ticks = create_ticks(1, 0, Hash::default());
-    max_entries_per_n_shred(&ticks[0], num_shreds)
+    max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size)
 }

-pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 {
-    let shred_data_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
+pub fn max_entries_per_n_shred(
+    entry: &Entry,
+    num_shreds: u64,
+    shred_data_size: Option<usize>,
+) -> u64 {
+    let shred_data_size = shred_data_size.unwrap_or(SIZE_OF_NONCE_DATA_SHRED_PAYLOAD) as u64;
     let vec_size = bincode::serialized_size(&vec![entry]).unwrap();
     let entry_size = bincode::serialized_size(entry).unwrap();
     let count_size = vec_size - entry_size;
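The `count_size` subtraction isolates bincode's length prefix for the entry vector: serializing `vec![entry]` costs exactly one `u64` length header more than serializing the entry alone. A standalone sketch, with a `u32` standing in for a ledger `Entry` and bincode's default encoding assumed:

fn main() {
    let entry: u32 = 7; // stand-in for a ledger Entry
    let vec_size = bincode::serialized_size(&vec![entry]).unwrap();
    let entry_size = bincode::serialized_size(&entry).unwrap();
    // The difference is the Vec's 8-byte length prefix, paid once per batch.
    assert_eq!(vec_size - entry_size, 8);
}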
@ -891,7 +1031,8 @@ pub fn verify_test_data_shred(
     is_last_in_slot: bool,
     is_last_in_fec_set: bool,
 ) {
-    assert_eq!(shred.payload.len(), PACKET_DATA_SIZE);
+    let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot);
+    assert_eq!(shred.payload.len(), expected_payload_size);
     assert!(shred.is_data());
     assert_eq!(shred.index(), index);
     assert_eq!(shred.slot(), slot);
@ -932,6 +1073,14 @@ pub mod tests {
             SIZE_OF_DATA_SHRED_HEADER,
             serialized_size(&DataShredHeader::default()).unwrap() as usize
         );
+        let data_shred_header_with_size = DataShredHeader {
+            size: Some(1000),
+            ..DataShredHeader::default()
+        };
+        assert_eq!(
+            SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD,
+            serialized_size(&data_shred_header_with_size).unwrap() as usize
+        );
         assert_eq!(
             SIZE_OF_SIGNATURE,
             bincode::serialized_size(&Signature::default()).unwrap() as usize
@ -951,17 +1100,16 @@ pub mod tests {
     }

     fn verify_test_code_shred(shred: &Shred, index: u32, slot: Slot, pk: &Pubkey, verify: bool) {
-        assert_eq!(shred.payload.len(), PACKET_DATA_SIZE);
+        let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot);
+        assert_eq!(shred.payload.len(), expected_payload_size);
         assert!(!shred.is_data());
         assert_eq!(shred.index(), index);
         assert_eq!(shred.slot(), slot);
         assert_eq!(verify, shred.verify(pk));
     }

-    #[test]
-    fn test_data_shredder() {
+    fn run_test_data_shredder(slot: Slot) {
         let keypair = Arc::new(Keypair::new());
-        let slot = 0x123456789abcdef0;

         // Test that parent cannot be > current slot
         assert_matches!(
@ -996,7 +1144,7 @@ pub mod tests {
             .collect();

         let size = serialized_size(&entries).unwrap();
-        let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
+        let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot) as u64;
         let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
         let num_expected_coding_shreds =
             Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate);
@ -1051,6 +1199,12 @@ pub mod tests {
         assert_eq!(entries, deshred_entries);
     }

+    #[test]
+    fn test_data_shredder() {
+        run_test_data_shredder(UNLOCK_NONCE_SLOT);
+        run_test_data_shredder(UNLOCK_NONCE_SLOT + 1);
+    }
+
     #[test]
     fn test_deserialize_shred_payload() {
         let keypair = Arc::new(Keypair::new());
@ -1077,12 +1231,10 @@ pub mod tests {
         assert_eq!(deserialized_shred, *data_shreds.last().unwrap());
     }

-    #[test]
-    fn test_shred_reference_tick() {
+    fn run_test_shred_reference_tick(slot: Slot) {
         let keypair = Arc::new(Keypair::new());
-        let slot = 1;

-        let parent_slot = 0;
+        let parent_slot = slot - 1;
         let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), 5, 0)
             .expect("Failed in creating shredder");

@ -1107,6 +1259,12 @@ pub mod tests {
         assert_eq!(deserialized_shred.reference_tick(), 5);
     }

+    #[test]
+    fn test_shred_reference_tick() {
+        run_test_shred_reference_tick(UNLOCK_NONCE_SLOT);
+        run_test_shred_reference_tick(UNLOCK_NONCE_SLOT + 1);
+    }
+
     #[test]
     fn test_shred_reference_tick_overflow() {
         let keypair = Arc::new(Keypair::new());
@ -1143,22 +1301,21 @@ pub mod tests {
         );
     }

-    #[test]
-    fn test_data_and_code_shredder() {
+    fn run_test_data_and_code_shredder(slot: Slot) {
         let keypair = Arc::new(Keypair::new());
-
-        let slot = 0x123456789abcdef0;
         // Test that FEC rate cannot be > 1.0
         assert_matches!(
             Shredder::new(slot, slot - 5, 1.001, keypair.clone(), 0, 0),
             Err(ShredError::InvalidFecRate(_))
         );

-        let shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, keypair.clone(), 0, 0)
+        let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0)
             .expect("Failed in creating shredder");

         // Create enough entries to make > 1 shred
-        let num_entries = max_ticks_per_n_shreds(1) + 1;
+        let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot);
+        let num_entries = max_ticks_per_n_shreds(1, Some(no_header_size)) + 1;
         let entries: Vec<_> = (0..num_entries)
             .map(|_| {
                 let keypair0 = Keypair::new();
@ -1190,9 +1347,13 @@ pub mod tests {
     }

     #[test]
-    fn test_recovery_and_reassembly() {
+    fn test_data_and_code_shredder() {
+        run_test_data_and_code_shredder(UNLOCK_NONCE_SLOT);
+        run_test_data_and_code_shredder(UNLOCK_NONCE_SLOT + 1);
+    }
+
+    fn run_test_recovery_and_reassembly(slot: Slot) {
         let keypair = Arc::new(Keypair::new());
-        let slot = 0x123456789abcdef0;
         let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0)
             .expect("Failed in creating shredder");

@ -1202,7 +1363,9 @@ pub mod tests {
         let entry = Entry::new(&Hash::default(), 1, vec![tx0]);

         let num_data_shreds: usize = 5;
-        let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64);
+        let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot);
+        let num_entries =
+            max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size));
         let entries: Vec<_> = (0..num_entries)
             .map(|_| {
                 let keypair0 = Keypair::new();
@ -1441,6 +1604,12 @@ pub mod tests {
         );
     }

+    #[test]
+    fn test_recovery_and_reassembly() {
+        run_test_recovery_and_reassembly(UNLOCK_NONCE_SLOT);
+        run_test_recovery_and_reassembly(UNLOCK_NONCE_SLOT + 1);
+    }
+
     #[test]
     fn test_shred_version() {
         let keypair = Arc::new(Keypair::new());
@ -1,5 +1,5 @@
 #![allow(clippy::implicit_hasher)]
-use crate::shred::ShredType;
+use crate::shred::{Shred, ShredType, SIZE_OF_NONCE};
 use rayon::{
     iter::{
         IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
@ -16,9 +16,12 @@ use solana_perf::{
     sigverify::{self, batch_size, TxOffset},
 };
 use solana_rayon_threadlimit::get_thread_count;
-use solana_sdk::pubkey::Pubkey;
-use solana_sdk::signature::Signature;
-use solana_sdk::signature::{Keypair, Signer};
+use solana_sdk::{
+    clock::Slot,
+    pubkey::Pubkey,
+    signature::Signature,
+    signature::{Keypair, Signer},
+};
 use std::sync::Arc;
 use std::{collections::HashMap, mem::size_of};

@ -40,13 +43,12 @@ lazy_static! {
 /// ...
 /// }
 /// Signature is the first thing in the packet, and slot is the first thing in the signed message.
-fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>) -> Option<u8> {
+pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>) -> Option<u8> {
     let sig_start = 0;
     let sig_end = size_of::<Signature>();
     let slot_start = sig_end + size_of::<ShredType>();
     let slot_end = slot_start + size_of::<u64>();
     let msg_start = sig_end;
-    let msg_end = packet.meta.size;
     if packet.meta.discard {
         return Some(0);
     }
@ -55,6 +57,11 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>) -> Option<u8> {
         return Some(0);
     }
     let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?;
+    let msg_end = if packet.meta.repair && Shred::is_nonce_unlocked(slot) {
+        packet.meta.size.saturating_sub(SIZE_OF_NONCE)
+    } else {
+        packet.meta.size
+    };
     trace!("slot {}", slot);
     let pubkey = slot_leaders.get(&slot)?;
     if packet.meta.size < sig_end {
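The relocated `msg_end` computation is the verify-side half of the repair-nonce scheme: a repair response for a nonce-era slot is the shred bytes plus a trailing 4-byte nonce that the leader never signed, so signature verification must stop before it. A standalone sketch of that framing; the helper name and little-endian nonce encoding are illustrative assumptions, not APIs from this crate:

use std::convert::TryInto;

const SIZE_OF_NONCE: usize = 4;

// Split a repair response into the signed shred bytes and the trailing nonce.
fn split_repair_payload(packet_data: &[u8]) -> Option<(&[u8], u32)> {
    let shred_len = packet_data.len().checked_sub(SIZE_OF_NONCE)?;
    let (shred, nonce_bytes) = packet_data.split_at(shred_len);
    let nonce = u32::from_le_bytes(nonce_bytes.try_into().ok()?);
    Some((shred, nonce))
}

fn main() {
    let mut packet = vec![0u8; 16]; // pretend shred bytes
    packet.extend_from_slice(&42u32.to_le_bytes()); // trailing nonce
    let (shred, nonce) = split_repair_payload(&packet).unwrap();
    // Only `shred` would be fed to signature verification.
    assert_eq!(shred.len(), 16);
    assert_eq!(nonce, 42);
}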
@ -94,10 +101,10 @@ fn slot_key_data_for_gpu<
     batches: &[Packets],
     slot_keys: &HashMap<u64, T>,
     recycler_cache: &RecyclerCache,
-) -> (PinnedVec<u8>, TxOffset, usize) {
+) -> (PinnedVec<u8>, TxOffset, usize, Vec<Vec<Slot>>) {
     //TODO: mark Pubkey::default shreds as failed after the GPU returns
     assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default()));
-    let slots: Vec<Vec<u64>> = SIGVERIFY_THREAD_POOL.install(|| {
+    let slots: Vec<Vec<Slot>> = SIGVERIFY_THREAD_POOL.install(|| {
         batches
             .into_par_iter()
             .map(|p| {
@ -157,7 +164,7 @@ fn slot_key_data_for_gpu<
     trace!("keyvec.len: {}", keyvec.len());
     trace!("keyvec: {:?}", keyvec);
     trace!("offsets: {:?}", offsets);
-    (keyvec, offsets, num_in_packets)
+    (keyvec, offsets, num_in_packets, slots)
 }

 fn vec_size_in_packets(keyvec: &PinnedVec<u8>) -> usize {
@ -177,6 +184,7 @@ fn shred_gpu_offsets(
     mut pubkeys_end: usize,
     batches: &[Packets],
     recycler_cache: &RecyclerCache,
+    slots: Option<Vec<Vec<Slot>>>,
 ) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
     let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures");
     signature_offsets.set_pinnable();
@ -185,13 +193,30 @@ fn shred_gpu_offsets(
     let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes");
     msg_sizes.set_pinnable();
     let mut v_sig_lens = vec![];
-    for batch in batches {
+    let mut slots_iter;
+    let mut slots_iter_ref: &mut dyn Iterator<Item = Vec<Slot>> = &mut std::iter::repeat(vec![]);
+    if let Some(slots) = slots {
+        slots_iter = slots.into_iter();
+        slots_iter_ref = &mut slots_iter;
+    }
+    for (batch, slots) in batches.iter().zip(slots_iter_ref) {
         let mut sig_lens = Vec::new();
-        for packet in &batch.packets {
+        let mut inner_slot_iter;
+        let mut inner_slot_iter_ref: &mut dyn Iterator<Item = Slot> = &mut std::iter::repeat(0);
+        if !slots.is_empty() {
+            inner_slot_iter = slots.into_iter();
+            inner_slot_iter_ref = &mut inner_slot_iter;
+        };
+
+        for (packet, slot) in batch.packets.iter().zip(inner_slot_iter_ref) {
             let sig_start = pubkeys_end;
             let sig_end = sig_start + size_of::<Signature>();
             let msg_start = sig_end;
-            let msg_end = sig_start + packet.meta.size;
+            let msg_end = if packet.meta.repair && Shred::is_nonce_unlocked(slot) {
+                sig_start + packet.meta.size.saturating_sub(SIZE_OF_NONCE)
+            } else {
+                sig_start + packet.meta.size
+            };
             signature_offsets.push(sig_start as u32);
             msg_start_offsets.push(msg_start as u32);
             let msg_size = if msg_end < msg_start {
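The `slots_iter`/`slots_iter_ref` dance above is a borrow-checker-friendly way to select between a real iterator and an infinite default behind one `&mut dyn Iterator`, so the same loop body serves the verify path (slots known) and the sign path (no slots passed). A minimal standalone sketch of the same pattern:

fn main() {
    let maybe_slots: Option<Vec<u64>> = Some(vec![1, 2, 3]);

    // Keep owned iterators alive in outer bindings, then borrow one of them
    // as a trait object; this sidesteps "borrowed value does not live long
    // enough" errors without boxing.
    let mut real_iter;
    let mut default_iter = std::iter::repeat(0u64);
    let iter_ref: &mut dyn Iterator<Item = u64> = match maybe_slots {
        Some(slots) => {
            real_iter = slots.into_iter();
            &mut real_iter
        }
        None => &mut default_iter,
    };

    // Zipping against a finite collection keeps the infinite default bounded.
    for (item, slot) in ["a", "b", "c"].iter().zip(iter_ref) {
        println!("{}: slot {}", item, slot);
    }
}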
@ -222,7 +247,7 @@ pub fn verify_shreds_gpu(
     let mut elems = Vec::new();
     let mut rvs = Vec::new();
     let count = batch_size(batches);
-    let (pubkeys, pubkey_offsets, mut num_packets) =
+    let (pubkeys, pubkey_offsets, mut num_packets, slots) =
         slot_key_data_for_gpu(0, batches, slot_leaders, recycler_cache);
     //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU
     //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data
@ -230,7 +255,7 @@ pub fn verify_shreds_gpu(
     trace!("num_packets: {}", num_packets);
     trace!("pubkeys_len: {}", pubkeys_len);
     let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
-        shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
+        shred_gpu_offsets(pubkeys_len, batches, recycler_cache, Some(slots));
     let mut out = recycler_cache.buffer().allocate("out_buffer");
     out.set_pinnable();
     elems.push(
@ -367,7 +392,7 @@ pub fn sign_shreds_gpu(

     trace!("offset: {}", offset);
     let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
-        shred_gpu_offsets(offset, batches, recycler_cache);
+        shred_gpu_offsets(offset, batches, recycler_cache, None);
     let total_sigs = signature_offsets.len();
     let mut signatures_out = recycler_cache.buffer().allocate("ed25519 signatures");
     signatures_out.set_pinnable();
@ -445,14 +470,12 @@ pub fn sign_shreds_gpu(
 #[cfg(test)]
 pub mod tests {
     use super::*;
-    use crate::shred::SIZE_OF_DATA_SHRED_PAYLOAD;
-    use crate::shred::{Shred, Shredder};
+    use crate::shred::{Shred, Shredder, SIZE_OF_DATA_SHRED_PAYLOAD, UNLOCK_NONCE_SLOT};
     use solana_sdk::signature::{Keypair, Signer};
-    #[test]
-    fn test_sigverify_shred_cpu() {
+
+    fn run_test_sigverify_shred_cpu(slot: Slot) {
         solana_logger::setup();
         let mut packet = Packet::default();
-        let slot = 0xdeadc0de;
         let mut shred = Shred::new_from_data(
             slot,
             0xc0de,
@ -492,10 +515,14 @@ pub mod tests {
     }

     #[test]
-    fn test_sigverify_shreds_cpu() {
+    fn test_sigverify_shred_cpu() {
+        run_test_sigverify_shred_cpu(UNLOCK_NONCE_SLOT);
+        run_test_sigverify_shred_cpu(UNLOCK_NONCE_SLOT + 1);
+    }
+
+    fn run_test_sigverify_shreds_cpu(slot: Slot) {
         solana_logger::setup();
         let mut batch = [Packets::default()];
-        let slot = 0xdeadc0de;
         let mut shred = Shred::new_from_data(
             slot,
             0xc0de,
@ -542,12 +569,16 @@ pub mod tests {
     }

     #[test]
-    fn test_sigverify_shreds_gpu() {
+    fn test_sigverify_shreds_cpu() {
+        run_test_sigverify_shreds_cpu(UNLOCK_NONCE_SLOT);
+        run_test_sigverify_shreds_cpu(UNLOCK_NONCE_SLOT + 1);
+    }
+
+    fn run_test_sigverify_shreds_gpu(slot: Slot) {
         solana_logger::setup();
         let recycler_cache = RecyclerCache::default();

         let mut batch = [Packets::default()];
-        let slot = 0xdeadc0de;
         let mut shred = Shred::new_from_data(
             slot,
             0xc0de,
@ -603,14 +634,18 @@ pub mod tests {
     }

     #[test]
-    fn test_sigverify_shreds_sign_gpu() {
+    fn test_sigverify_shreds_gpu() {
+        run_test_sigverify_shreds_gpu(UNLOCK_NONCE_SLOT);
+        run_test_sigverify_shreds_gpu(UNLOCK_NONCE_SLOT + 1);
+    }
+
+    fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
         solana_logger::setup();
         let recycler_cache = RecyclerCache::default();

         let mut packets = Packets::default();
         let num_packets = 32;
         let num_batches = 100;
-        let slot = 0xdeadc0de;
         packets.packets.resize(num_packets, Packet::default());
         for (i, p) in packets.packets.iter_mut().enumerate() {
             let shred = Shred::new_from_data(
@ -650,11 +685,15 @@ pub mod tests {
     }

     #[test]
-    fn test_sigverify_shreds_sign_cpu() {
+    fn test_sigverify_shreds_sign_gpu() {
+        run_test_sigverify_shreds_sign_gpu(UNLOCK_NONCE_SLOT);
+        run_test_sigverify_shreds_sign_gpu(UNLOCK_NONCE_SLOT + 1);
+    }
+
+    fn run_test_sigverify_shreds_sign_cpu(slot: Slot) {
         solana_logger::setup();

         let mut batch = [Packets::default()];
-        let slot = 0xdeadc0de;
         let keypair = Keypair::new();
         let shred = Shred::new_from_data(
             slot,
@ -685,4 +724,10 @@ pub mod tests {
         let rv = verify_shreds_cpu(&batch, &pubkeys);
         assert_eq!(rv, vec![vec![1]]);
     }
+
+    #[test]
+    fn test_sigverify_shreds_sign_cpu() {
+        run_test_sigverify_shreds_sign_cpu(UNLOCK_NONCE_SLOT);
+        run_test_sigverify_shreds_sign_cpu(UNLOCK_NONCE_SLOT + 1);
+    }
 }
@ -1,16 +1,15 @@
 use solana_ledger::entry::Entry;
 use solana_ledger::shred::{
-    max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+    max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder,
+    MAX_DATA_SHREDS_PER_FEC_BLOCK, UNLOCK_NONCE_SLOT,
 };
 use solana_sdk::signature::{Keypair, Signer};
-use solana_sdk::{hash::Hash, system_transaction};
+use solana_sdk::{clock::Slot, hash::Hash, system_transaction};
 use std::convert::TryInto;
 use std::sync::Arc;

-#[test]
-fn test_multi_fec_block_coding() {
+fn run_test_multi_fec_block_coding(slot: Slot) {
     let keypair = Arc::new(Keypair::new());
-    let slot = 0x123456789abcdef0;
     let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0)
         .expect("Failed in creating shredder");

@ -20,7 +19,8 @@ fn test_multi_fec_block_coding() {
     let keypair1 = Keypair::new();
     let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
     let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
-    let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64);
+    let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot);
+    let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size));

     let entries: Vec<_> = (0..num_entries)
         .map(|_| {
@ -94,3 +94,9 @@ fn test_multi_fec_block_coding() {
     let result = Shredder::deshred(&all_shreds[..]).unwrap();
     assert_eq!(serialized_entries[..], result[..serialized_entries.len()]);
 }
+
+#[test]
+fn test_multi_fec_block_coding() {
+    run_test_multi_fec_block_coding(UNLOCK_NONCE_SLOT);
+    run_test_multi_fec_block_coding(UNLOCK_NONCE_SLOT + 1);
+}