Factor repair from gossip (#8044)
This commit is contained in:
@@ -21,7 +21,6 @@ use crate::{
|
||||
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
|
||||
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote},
|
||||
packet::{Packet, PACKET_DATA_SIZE},
|
||||
repair_service::RepairType,
|
||||
result::{Error, Result},
|
||||
sendmmsg::{multicast, send_mmsg},
|
||||
weighted_shuffle::{weighted_best, weighted_shuffle},
|
||||
@@ -29,8 +28,7 @@ use crate::{
|
||||
use bincode::{serialize, serialized_size};
|
||||
use core::cmp;
|
||||
use itertools::Itertools;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore, staking_utils};
|
||||
use solana_ledger::{bank_forks::BankForks, staking_utils};
|
||||
use solana_measure::thread_mem_usage;
|
||||
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
|
||||
use solana_net_utils::{
|
||||
@@ -63,15 +61,12 @@ pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
|
||||
pub const DATA_PLANE_FANOUT: usize = 200;
|
||||
/// milliseconds we sleep for between gossip requests
|
||||
pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
|
||||
|
||||
/// the number of slots to respond with when responding to `Orphan` requests
|
||||
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
|
||||
/// The maximum size of a bloom filter
|
||||
pub const MAX_BLOOM_SIZE: usize = 1028;
|
||||
pub const MAX_BLOOM_SIZE: usize = 1018;
|
||||
/// The maximum size of a protocol payload
|
||||
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
|
||||
/// The largest protocol header size
|
||||
const MAX_PROTOCOL_HEADER_SIZE: u64 = 204;
|
||||
const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum ClusterInfoError {
|
||||
@@ -174,12 +169,6 @@ enum Protocol {
|
||||
PullResponse(Pubkey, Vec<CrdsValue>),
|
||||
PushMessage(Pubkey, Vec<CrdsValue>),
|
||||
PruneMessage(Pubkey, PruneData),
|
||||
|
||||
/// Window protocol messages
|
||||
/// TODO: move this message to a different module
|
||||
RequestWindowIndex(ContactInfo, u64, u64),
|
||||
RequestHighestWindowIndex(ContactInfo, u64, u64),
|
||||
RequestOrphan(ContactInfo, u64),
|
||||
}
|
||||
|
||||
impl ClusterInfo {
|
||||
@@ -525,7 +514,7 @@ impl ClusterInfo {
|
||||
}
|
||||
|
||||
/// all tvu peers with valid gossip addrs that likely have the slot being requested
|
||||
fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
|
||||
pub fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
|
||||
let me = self.my_data();
|
||||
ClusterInfo::tvu_peers(self)
|
||||
.into_iter()
|
||||
@@ -866,61 +855,6 @@ impl ClusterInfo {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
|
||||
let req = Protocol::RequestWindowIndex(self.my_data(), slot, shred_index);
|
||||
let out = serialize(&req)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
|
||||
let req = Protocol::RequestHighestWindowIndex(self.my_data(), slot, shred_index);
|
||||
let out = serialize(&req)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> {
|
||||
let req = Protocol::RequestOrphan(self.my_data(), slot);
|
||||
let out = serialize(&req)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
|
||||
// find a peer that appears to be accepting replication and has the desired slot, as indicated
|
||||
// by a valid tvu port location
|
||||
let valid: Vec<_> = self.repair_peers(repair_request.slot());
|
||||
if valid.is_empty() {
|
||||
return Err(ClusterInfoError::NoPeers.into());
|
||||
}
|
||||
let n = thread_rng().gen::<usize>() % valid.len();
|
||||
let addr = valid[n].gossip; // send the request to the peer's gossip port
|
||||
let out = self.map_repair_request(repair_request)?;
|
||||
|
||||
Ok((addr, out))
|
||||
}
|
||||
pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
|
||||
match repair_request {
|
||||
RepairType::Shred(slot, shred_index) => {
|
||||
datapoint_debug!(
|
||||
"cluster_info-repair",
|
||||
("repair-slot", *slot, i64),
|
||||
("repair-ix", *shred_index, i64)
|
||||
);
|
||||
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
|
||||
}
|
||||
RepairType::HighestShred(slot, shred_index) => {
|
||||
datapoint_debug!(
|
||||
"cluster_info-repair_highest",
|
||||
("repair-highest-slot", *slot, i64),
|
||||
("repair-highest-ix", *shred_index, i64)
|
||||
);
|
||||
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
|
||||
}
|
||||
RepairType::Orphan(slot) => {
|
||||
datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64));
|
||||
Ok(self.orphan_bytes(*slot)?)
|
||||
}
|
||||
}
|
||||
}
|
||||
// If the network entrypoint hasn't been discovered yet, add it to the crds table
|
||||
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) {
|
||||
let pull_from_entrypoint = if let Some(entrypoint) = &mut self.entrypoint {
|
||||
@@ -1173,117 +1107,9 @@ impl ClusterInfo {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn get_data_shred_as_packet(
|
||||
blockstore: &Arc<Blockstore>,
|
||||
slot: Slot,
|
||||
shred_index: u64,
|
||||
dest: &SocketAddr,
|
||||
) -> Result<Option<Packet>> {
|
||||
let data = blockstore.get_data_shred(slot, shred_index)?;
|
||||
Ok(data.map(|data| {
|
||||
let mut packet = Packet::default();
|
||||
packet.meta.size = data.len();
|
||||
packet.meta.set_addr(dest);
|
||||
packet.data.copy_from_slice(&data);
|
||||
packet
|
||||
}))
|
||||
}
|
||||
|
||||
fn run_window_request(
|
||||
recycler: &PacketsRecycler,
|
||||
from: &ContactInfo,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
me: &ContactInfo,
|
||||
slot: Slot,
|
||||
shred_index: u64,
|
||||
) -> Option<Packets> {
|
||||
if let Some(blockstore) = blockstore {
|
||||
// Try to find the requested index in one of the slots
|
||||
let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr);
|
||||
|
||||
if let Ok(Some(packet)) = packet {
|
||||
inc_new_counter_debug!("cluster_info-window-request-ledger", 1);
|
||||
return Some(Packets::new_with_recycler_data(
|
||||
recycler,
|
||||
"run_window_request",
|
||||
vec![packet],
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
inc_new_counter_debug!("cluster_info-window-request-fail", 1);
|
||||
trace!(
|
||||
"{}: failed RequestWindowIndex {} {} {}",
|
||||
me.id,
|
||||
from.id,
|
||||
slot,
|
||||
shred_index,
|
||||
);
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn run_highest_window_request(
|
||||
recycler: &PacketsRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
slot: Slot,
|
||||
highest_index: u64,
|
||||
) -> Option<Packets> {
|
||||
let blockstore = blockstore?;
|
||||
// Try to find the requested index in one of the slots
|
||||
let meta = blockstore.meta(slot).ok()??;
|
||||
if meta.received > highest_index {
|
||||
// meta.received must be at least 1 by this point
|
||||
let packet =
|
||||
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr)
|
||||
.ok()??;
|
||||
return Some(Packets::new_with_recycler_data(
|
||||
recycler,
|
||||
"run_highest_window_request",
|
||||
vec![packet],
|
||||
));
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn run_orphan(
|
||||
recycler: &PacketsRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
mut slot: Slot,
|
||||
max_responses: usize,
|
||||
) -> Option<Packets> {
|
||||
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
|
||||
if let Some(blockstore) = blockstore {
|
||||
// Try to find the next "n" parent slots of the input slot
|
||||
while let Ok(Some(meta)) = blockstore.meta(slot) {
|
||||
if meta.received == 0 {
|
||||
break;
|
||||
}
|
||||
let packet =
|
||||
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr);
|
||||
if let Ok(Some(packet)) = packet {
|
||||
res.packets.push(packet);
|
||||
}
|
||||
if meta.is_parent_set() && res.packets.len() <= max_responses {
|
||||
slot = meta.parent_slot;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if res.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(res)
|
||||
}
|
||||
|
||||
fn handle_packets(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
packets: Packets,
|
||||
response_sender: &PacketSender,
|
||||
@@ -1390,13 +1216,6 @@ impl ClusterInfo {
|
||||
("prune_message", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
let rsp =
|
||||
Self::handle_repair(me, recycler, &from_addr, blockstore, request);
|
||||
if let Some(rsp) = rsp {
|
||||
let _ignore_disconnect = response_sender.send(rsp);
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
// process the collected pulls together
|
||||
@@ -1524,104 +1343,10 @@ impl ClusterInfo {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_repair_sender(request: &Protocol) -> &ContactInfo {
|
||||
match request {
|
||||
Protocol::RequestWindowIndex(ref from, _, _) => from,
|
||||
Protocol::RequestHighestWindowIndex(ref from, _, _) => from,
|
||||
Protocol::RequestOrphan(ref from, _) => from,
|
||||
_ => panic!("Not a repair request"),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_repair(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
request: Protocol,
|
||||
) -> Option<Packets> {
|
||||
let now = Instant::now();
|
||||
|
||||
//TODO this doesn't depend on cluster_info module, could be moved
|
||||
//but we are using the listen thread to service these request
|
||||
//TODO verify from is signed
|
||||
|
||||
let self_id = me.read().unwrap().gossip.id;
|
||||
let from = Self::get_repair_sender(&request);
|
||||
if from.id == me.read().unwrap().gossip.id {
|
||||
warn!(
|
||||
"{}: Ignored received repair request from ME {}",
|
||||
self_id, from.id,
|
||||
);
|
||||
inc_new_counter_debug!("cluster_info-handle-repair--eq", 1);
|
||||
return None;
|
||||
}
|
||||
|
||||
me.write()
|
||||
.unwrap()
|
||||
.gossip
|
||||
.crds
|
||||
.update_record_timestamp(&from.id, timestamp());
|
||||
let my_info = me.read().unwrap().my_data();
|
||||
|
||||
let (res, label) = {
|
||||
match &request {
|
||||
Protocol::RequestWindowIndex(from, slot, shred_index) => {
|
||||
inc_new_counter_debug!("cluster_info-request-window-index", 1);
|
||||
(
|
||||
Self::run_window_request(
|
||||
recycler,
|
||||
from,
|
||||
&from_addr,
|
||||
blockstore,
|
||||
&my_info,
|
||||
*slot,
|
||||
*shred_index,
|
||||
),
|
||||
"RequestWindowIndex",
|
||||
)
|
||||
}
|
||||
|
||||
Protocol::RequestHighestWindowIndex(_, slot, highest_index) => {
|
||||
inc_new_counter_debug!("cluster_info-request-highest-window-index", 1);
|
||||
(
|
||||
Self::run_highest_window_request(
|
||||
recycler,
|
||||
&from_addr,
|
||||
blockstore,
|
||||
*slot,
|
||||
*highest_index,
|
||||
),
|
||||
"RequestHighestWindowIndex",
|
||||
)
|
||||
}
|
||||
Protocol::RequestOrphan(_, slot) => {
|
||||
inc_new_counter_debug!("cluster_info-request-orphan", 1);
|
||||
(
|
||||
Self::run_orphan(
|
||||
recycler,
|
||||
&from_addr,
|
||||
blockstore,
|
||||
*slot,
|
||||
MAX_ORPHAN_REPAIR_RESPONSES,
|
||||
),
|
||||
"RequestOrphan",
|
||||
)
|
||||
}
|
||||
_ => panic!("Not a repair request"),
|
||||
}
|
||||
};
|
||||
|
||||
trace!("{}: received repair request: {:?}", self_id, request);
|
||||
report_time_spent(label, &now.elapsed(), "");
|
||||
res
|
||||
}
|
||||
|
||||
/// Process messages from the network
|
||||
fn run_listen(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
bank_forks: Option<&Arc<RwLock<BankForks>>>,
|
||||
requests_receiver: &PacketReceiver,
|
||||
response_sender: &PacketSender,
|
||||
@@ -1636,12 +1361,11 @@ impl ClusterInfo {
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
Self::handle_packets(obj, &recycler, blockstore, &stakes, reqs, response_sender);
|
||||
Self::handle_packets(obj, &recycler, &stakes, reqs, response_sender);
|
||||
Ok(())
|
||||
}
|
||||
pub fn listen(
|
||||
me: Arc<RwLock<Self>>,
|
||||
blockstore: Option<Arc<Blockstore>>,
|
||||
bank_forks: Option<Arc<RwLock<BankForks>>>,
|
||||
requests_receiver: PacketReceiver,
|
||||
response_sender: PacketSender,
|
||||
@@ -1655,7 +1379,6 @@ impl ClusterInfo {
|
||||
let e = Self::run_listen(
|
||||
&me,
|
||||
&recycler,
|
||||
blockstore.as_ref(),
|
||||
bank_forks.as_ref(),
|
||||
&requests_receiver,
|
||||
&response_sender,
|
||||
@@ -1690,6 +1413,7 @@ impl ClusterInfo {
|
||||
dummy_addr,
|
||||
dummy_addr,
|
||||
dummy_addr,
|
||||
dummy_addr,
|
||||
timestamp(),
|
||||
)
|
||||
}
|
||||
@@ -1770,6 +1494,7 @@ pub struct Sockets {
|
||||
pub repair: UdpSocket,
|
||||
pub retransmit_sockets: Vec<UdpSocket>,
|
||||
pub storage: Option<UdpSocket>,
|
||||
pub serve_repair: UdpSocket,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -1790,9 +1515,10 @@ impl Node {
|
||||
let storage = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let empty = "0.0.0.0:0".parse().unwrap();
|
||||
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
|
||||
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
|
||||
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
|
||||
let info = ContactInfo::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
@@ -1804,6 +1530,7 @@ impl Node {
|
||||
storage.local_addr().unwrap(),
|
||||
empty,
|
||||
empty,
|
||||
serve_repair.local_addr().unwrap(),
|
||||
timestamp(),
|
||||
);
|
||||
|
||||
@@ -1818,6 +1545,7 @@ impl Node {
|
||||
broadcast,
|
||||
repair,
|
||||
retransmit_sockets: vec![retransmit],
|
||||
serve_repair,
|
||||
storage: Some(storage),
|
||||
ip_echo: None,
|
||||
},
|
||||
@@ -1840,6 +1568,7 @@ impl Node {
|
||||
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
|
||||
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let storage = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let info = ContactInfo::new(
|
||||
pubkey,
|
||||
gossip_addr,
|
||||
@@ -1851,6 +1580,7 @@ impl Node {
|
||||
storage.local_addr().unwrap(),
|
||||
rpc_addr,
|
||||
rpc_pubsub_addr,
|
||||
serve_repair.local_addr().unwrap(),
|
||||
timestamp(),
|
||||
);
|
||||
Node {
|
||||
@@ -1866,6 +1596,7 @@ impl Node {
|
||||
repair,
|
||||
retransmit_sockets: vec![retransmit_socket],
|
||||
storage: None,
|
||||
serve_repair,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -1908,6 +1639,8 @@ impl Node {
|
||||
multi_bind_in_range(port_range, 8).expect("retransmit multi_bind");
|
||||
|
||||
let (repair_port, repair) = Self::bind(port_range);
|
||||
let (serve_repair_port, serve_repair) = Self::bind(port_range);
|
||||
|
||||
let (_, broadcast) = multi_bind_in_range(port_range, 4).expect("broadcast multi_bind");
|
||||
|
||||
let info = ContactInfo::new(
|
||||
@@ -1918,6 +1651,7 @@ impl Node {
|
||||
SocketAddr::new(gossip_addr.ip(), repair_port),
|
||||
SocketAddr::new(gossip_addr.ip(), tpu_port),
|
||||
SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
|
||||
SocketAddr::new(gossip_addr.ip(), serve_repair_port),
|
||||
socketaddr_any!(),
|
||||
socketaddr_any!(),
|
||||
socketaddr_any!(),
|
||||
@@ -1937,6 +1671,7 @@ impl Node {
|
||||
repair,
|
||||
retransmit_sockets,
|
||||
storage: None,
|
||||
serve_repair,
|
||||
ip_echo: Some(ip_echo),
|
||||
},
|
||||
}
|
||||
@@ -1973,18 +1708,8 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::crds_value::CrdsValueLabel;
|
||||
use crate::repair_service::RepairType;
|
||||
use crate::result::Error;
|
||||
use rayon::prelude::*;
|
||||
use solana_ledger::blockstore::make_many_slot_entries;
|
||||
use solana_ledger::blockstore::Blockstore;
|
||||
use solana_ledger::blockstore_processor::fill_blockstore_slot_with_ticks;
|
||||
use solana_ledger::get_tmp_ledger_path;
|
||||
use solana_ledger::shred::{
|
||||
max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
|
||||
};
|
||||
use solana_perf::test_tx::test_tx;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use std::collections::HashSet;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
@@ -2055,242 +1780,6 @@ mod tests {
|
||||
let label = CrdsValueLabel::ContactInfo(d.id);
|
||||
assert!(cluster_info.gossip.crds.lookup(&label).is_none());
|
||||
}
|
||||
#[test]
|
||||
fn window_index_request() {
|
||||
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
|
||||
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me);
|
||||
let rv = cluster_info.repair_request(&RepairType::Shred(0, 0));
|
||||
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
|
||||
|
||||
let gossip_addr = socketaddr!([127, 0, 0, 1], 1234);
|
||||
let nxt = ContactInfo::new(
|
||||
&Pubkey::new_rand(),
|
||||
gossip_addr,
|
||||
socketaddr!([127, 0, 0, 1], 1235),
|
||||
socketaddr!([127, 0, 0, 1], 1236),
|
||||
socketaddr!([127, 0, 0, 1], 1237),
|
||||
socketaddr!([127, 0, 0, 1], 1238),
|
||||
socketaddr!([127, 0, 0, 1], 1239),
|
||||
socketaddr!([127, 0, 0, 1], 1240),
|
||||
socketaddr!([127, 0, 0, 1], 1241),
|
||||
socketaddr!([127, 0, 0, 1], 1242),
|
||||
0,
|
||||
);
|
||||
cluster_info.insert_info(nxt.clone());
|
||||
let rv = cluster_info
|
||||
.repair_request(&RepairType::Shred(0, 0))
|
||||
.unwrap();
|
||||
assert_eq!(nxt.gossip, gossip_addr);
|
||||
assert_eq!(rv.0, nxt.gossip);
|
||||
|
||||
let gossip_addr2 = socketaddr!([127, 0, 0, 2], 1234);
|
||||
let nxt = ContactInfo::new(
|
||||
&Pubkey::new_rand(),
|
||||
gossip_addr2,
|
||||
socketaddr!([127, 0, 0, 1], 1235),
|
||||
socketaddr!([127, 0, 0, 1], 1236),
|
||||
socketaddr!([127, 0, 0, 1], 1237),
|
||||
socketaddr!([127, 0, 0, 1], 1238),
|
||||
socketaddr!([127, 0, 0, 1], 1239),
|
||||
socketaddr!([127, 0, 0, 1], 1240),
|
||||
socketaddr!([127, 0, 0, 1], 1241),
|
||||
socketaddr!([127, 0, 0, 1], 1242),
|
||||
0,
|
||||
);
|
||||
cluster_info.insert_info(nxt);
|
||||
let mut one = false;
|
||||
let mut two = false;
|
||||
while !one || !two {
|
||||
//this randomly picks an option, so eventually it should pick both
|
||||
let rv = cluster_info
|
||||
.repair_request(&RepairType::Shred(0, 0))
|
||||
.unwrap();
|
||||
if rv.0 == gossip_addr {
|
||||
one = true;
|
||||
}
|
||||
if rv.0 == gossip_addr2 {
|
||||
two = true;
|
||||
}
|
||||
}
|
||||
assert!(one && two);
|
||||
}
|
||||
|
||||
/// test window requests respond with the right shred, and do not overrun
|
||||
#[test]
|
||||
fn run_window_request() {
|
||||
let recycler = PacketsRecycler::default();
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let me = ContactInfo::new(
|
||||
&Pubkey::new_rand(),
|
||||
socketaddr!("127.0.0.1:1234"),
|
||||
socketaddr!("127.0.0.1:1235"),
|
||||
socketaddr!("127.0.0.1:1236"),
|
||||
socketaddr!("127.0.0.1:1237"),
|
||||
socketaddr!("127.0.0.1:1238"),
|
||||
socketaddr!("127.0.0.1:1239"),
|
||||
socketaddr!("127.0.0.1:1240"),
|
||||
socketaddr!("127.0.0.1:1241"),
|
||||
socketaddr!("127.0.0.1:1242"),
|
||||
0,
|
||||
);
|
||||
let rv = ClusterInfo::run_window_request(
|
||||
&recycler,
|
||||
&me,
|
||||
&socketaddr_any!(),
|
||||
Some(&blockstore),
|
||||
&me,
|
||||
0,
|
||||
0,
|
||||
);
|
||||
assert!(rv.is_none());
|
||||
let mut common_header = ShredCommonHeader::default();
|
||||
common_header.slot = 2;
|
||||
common_header.index = 1;
|
||||
let mut data_header = DataShredHeader::default();
|
||||
data_header.parent_offset = 1;
|
||||
let shred_info = Shred::new_empty_from_header(
|
||||
common_header,
|
||||
data_header,
|
||||
CodingShredHeader::default(),
|
||||
);
|
||||
|
||||
blockstore
|
||||
.insert_shreds(vec![shred_info], None, false)
|
||||
.expect("Expect successful ledger write");
|
||||
|
||||
let rv = ClusterInfo::run_window_request(
|
||||
&recycler,
|
||||
&me,
|
||||
&socketaddr_any!(),
|
||||
Some(&blockstore),
|
||||
&me,
|
||||
2,
|
||||
1,
|
||||
);
|
||||
assert!(!rv.is_none());
|
||||
let rv: Vec<Shred> = rv
|
||||
.expect("packets")
|
||||
.packets
|
||||
.into_iter()
|
||||
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
|
||||
.collect();
|
||||
assert_eq!(rv[0].index(), 1);
|
||||
assert_eq!(rv[0].slot(), 2);
|
||||
}
|
||||
|
||||
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
/// test run_window_requestwindow requests respond with the right shred, and do not overrun
|
||||
#[test]
|
||||
fn run_highest_window_request() {
|
||||
let recycler = PacketsRecycler::default();
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let rv = ClusterInfo::run_highest_window_request(
|
||||
&recycler,
|
||||
&socketaddr_any!(),
|
||||
Some(&blockstore),
|
||||
0,
|
||||
0,
|
||||
);
|
||||
assert!(rv.is_none());
|
||||
|
||||
let _ = fill_blockstore_slot_with_ticks(
|
||||
&blockstore,
|
||||
max_ticks_per_n_shreds(1) + 1,
|
||||
2,
|
||||
1,
|
||||
Hash::default(),
|
||||
);
|
||||
|
||||
let rv = ClusterInfo::run_highest_window_request(
|
||||
&recycler,
|
||||
&socketaddr_any!(),
|
||||
Some(&blockstore),
|
||||
2,
|
||||
1,
|
||||
);
|
||||
let rv: Vec<Shred> = rv
|
||||
.expect("packets")
|
||||
.packets
|
||||
.into_iter()
|
||||
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
|
||||
.collect();
|
||||
assert!(!rv.is_empty());
|
||||
let index = blockstore.meta(2).unwrap().unwrap().received - 1;
|
||||
assert_eq!(rv[0].index(), index as u32);
|
||||
assert_eq!(rv[0].slot(), 2);
|
||||
|
||||
let rv = ClusterInfo::run_highest_window_request(
|
||||
&recycler,
|
||||
&socketaddr_any!(),
|
||||
Some(&blockstore),
|
||||
2,
|
||||
index + 1,
|
||||
);
|
||||
assert!(rv.is_none());
|
||||
}
|
||||
|
||||
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn run_orphan() {
|
||||
solana_logger::setup();
|
||||
let recycler = PacketsRecycler::default();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
let rv =
|
||||
ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0);
|
||||
assert!(rv.is_none());
|
||||
|
||||
// Create slots 1, 2, 3 with 5 shreds apiece
|
||||
let (shreds, _) = make_many_slot_entries(1, 3, 5);
|
||||
|
||||
blockstore
|
||||
.insert_shreds(shreds, None, false)
|
||||
.expect("Expect successful ledger write");
|
||||
|
||||
// We don't have slot 4, so we don't know how to service this requeset
|
||||
let rv =
|
||||
ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5);
|
||||
assert!(rv.is_none());
|
||||
|
||||
// For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
|
||||
// for this request
|
||||
let rv: Vec<_> =
|
||||
ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5)
|
||||
.expect("run_orphan packets")
|
||||
.packets
|
||||
.iter()
|
||||
.map(|b| b.clone())
|
||||
.collect();
|
||||
let expected: Vec<_> = (1..=3)
|
||||
.rev()
|
||||
.map(|slot| {
|
||||
let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
|
||||
ClusterInfo::get_data_shred_as_packet(
|
||||
&blockstore,
|
||||
slot,
|
||||
index,
|
||||
&socketaddr_any!(),
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(rv, expected)
|
||||
}
|
||||
|
||||
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
fn assert_in_range(x: u16, range: (u16, u16)) {
|
||||
assert!(x >= range.0);
|
||||
@@ -2671,13 +2160,16 @@ mod tests {
|
||||
}
|
||||
|
||||
fn test_split_messages(value: CrdsValue) {
|
||||
const NUM_VALUES: usize = 30;
|
||||
const NUM_VALUES: u64 = 30;
|
||||
let value_size = value.size();
|
||||
let expected_len = NUM_VALUES / (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize;
|
||||
let msgs = vec![value; NUM_VALUES];
|
||||
let num_values_per_payload = (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1);
|
||||
|
||||
// Expected len is the ceiling of the division
|
||||
let expected_len = (NUM_VALUES + num_values_per_payload - 1) / num_values_per_payload;
|
||||
let msgs = vec![value; NUM_VALUES as usize];
|
||||
|
||||
let split = ClusterInfo::split_gossip_messages(msgs);
|
||||
assert!(split.len() <= expected_len);
|
||||
assert!(split.len() as u64 <= expected_len);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -2850,25 +2342,6 @@ mod tests {
|
||||
- serialized_size(&PruneData::default()).unwrap(),
|
||||
);
|
||||
|
||||
// make sure repairs are always smaller than the gossip messages
|
||||
assert!(
|
||||
max_protocol_size
|
||||
> serialized_size(&Protocol::RequestWindowIndex(ContactInfo::default(), 0, 0))
|
||||
.unwrap()
|
||||
);
|
||||
assert!(
|
||||
max_protocol_size
|
||||
> serialized_size(&Protocol::RequestHighestWindowIndex(
|
||||
ContactInfo::default(),
|
||||
0,
|
||||
0
|
||||
))
|
||||
.unwrap()
|
||||
);
|
||||
assert!(
|
||||
max_protocol_size
|
||||
> serialized_size(&Protocol::RequestOrphan(ContactInfo::default(), 0)).unwrap()
|
||||
);
|
||||
// finally assert the header size estimation is correct
|
||||
assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size);
|
||||
}
|
||||
|
Reference in New Issue
Block a user