Add protocol request for requesting the highest blob in a slot (#2759)
This commit is contained in:
@@ -157,6 +157,7 @@ enum Protocol {
|
||||
/// Window protocol messages
|
||||
/// TODO: move this message to a different module
|
||||
RequestWindowIndex(NodeInfo, u64, u64),
|
||||
RequestHighestWindowIndex(NodeInfo, u64, u64),
|
||||
}
|
||||
|
||||
impl ClusterInfo {
|
||||
@@ -712,10 +713,22 @@ impl ClusterInfo {
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn window_highest_index_request_bytes(
|
||||
&self,
|
||||
slot_height: u64,
|
||||
blob_index: u64,
|
||||
) -> Result<Vec<u8>> {
|
||||
let req =
|
||||
Protocol::RequestHighestWindowIndex(self.my_data().clone(), slot_height, blob_index);
|
||||
let out = serialize(&req)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn window_index_request(
|
||||
&self,
|
||||
slot_height: u64,
|
||||
blob_index: u64,
|
||||
get_highest: bool,
|
||||
) -> Result<(SocketAddr, Vec<u8>)> {
|
||||
// find a peer that appears to be accepting replication, as indicated
|
||||
// by a valid tvu port location
|
||||
@@ -725,7 +738,13 @@ impl ClusterInfo {
|
||||
}
|
||||
let n = thread_rng().gen::<usize>() % valid.len();
|
||||
let addr = valid[n].gossip; // send the request to the peer's gossip port
|
||||
let out = self.window_index_request_bytes(slot_height, blob_index)?;
|
||||
let out = {
|
||||
if get_highest {
|
||||
self.window_highest_index_request_bytes(slot_height, blob_index)?
|
||||
} else {
|
||||
self.window_index_request_bytes(slot_height, blob_index)?
|
||||
}
|
||||
};
|
||||
|
||||
submit(
|
||||
influxdb::Point::new("cluster-info")
|
||||
@@ -735,6 +754,7 @@ impl ClusterInfo {
|
||||
|
||||
Ok((addr, out))
|
||||
}
|
||||
|
||||
fn new_pull_requests(&mut self) -> Vec<(SocketAddr, Protocol)> {
|
||||
let now = timestamp();
|
||||
let pulls: Vec<_> = self.gossip.new_pull_request(now).ok().into_iter().collect();
|
||||
@@ -854,9 +874,6 @@ impl ClusterInfo {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
// TODO: To support repairing multiple slots, broadcast needs to reset
|
||||
// blob index for every slot, and window requests should be by slot + index.
|
||||
// Issue: https://github.com/solana-labs/solana/issues/2440
|
||||
fn run_window_request(
|
||||
from: &NodeInfo,
|
||||
from_addr: &SocketAddr,
|
||||
@@ -889,6 +906,32 @@ impl ClusterInfo {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn run_highest_window_request(
|
||||
from_addr: &SocketAddr,
|
||||
blocktree: Option<&Arc<Blocktree>>,
|
||||
slot_height: u64,
|
||||
highest_index: u64,
|
||||
) -> Vec<SharedBlob> {
|
||||
if let Some(blocktree) = blocktree {
|
||||
// Try to find the requested index in one of the slots
|
||||
let meta = blocktree.meta(slot_height);
|
||||
|
||||
if let Ok(Some(meta)) = meta {
|
||||
if meta.received > highest_index {
|
||||
// meta.received must be at least 1 by this point
|
||||
let blob = blocktree.get_data_blob(slot_height, meta.received - 1);
|
||||
|
||||
if let Ok(Some(mut blob)) = blob {
|
||||
blob.meta.set_addr(from_addr);
|
||||
return vec![Arc::new(RwLock::new(blob))];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
vec![]
|
||||
}
|
||||
|
||||
//TODO we should first coalesce all the requests
|
||||
fn handle_blob(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
@@ -1012,6 +1055,7 @@ impl ClusterInfo {
|
||||
slot_height: u64,
|
||||
blob_index: u64,
|
||||
from_addr: &SocketAddr,
|
||||
is_get_highest: bool,
|
||||
) -> Vec<SharedBlob> {
|
||||
let now = Instant::now();
|
||||
|
||||
@@ -1039,14 +1083,20 @@ impl ClusterInfo {
|
||||
slot_height,
|
||||
blob_index,
|
||||
);
|
||||
let res = Self::run_window_request(
|
||||
let res = {
|
||||
if is_get_highest {
|
||||
Self::run_highest_window_request(&from_addr, blocktree, slot_height, blob_index)
|
||||
} else {
|
||||
Self::run_window_request(
|
||||
&from,
|
||||
&from_addr,
|
||||
blocktree,
|
||||
&my_info,
|
||||
slot_height,
|
||||
blob_index,
|
||||
);
|
||||
)
|
||||
}
|
||||
};
|
||||
report_time_spent(
|
||||
"RequestWindowIndex",
|
||||
&now.elapsed(),
|
||||
@@ -1054,6 +1104,7 @@ impl ClusterInfo {
|
||||
);
|
||||
res
|
||||
}
|
||||
|
||||
fn handle_protocol(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
from_addr: &SocketAddr,
|
||||
@@ -1120,6 +1171,18 @@ impl ClusterInfo {
|
||||
slot_height,
|
||||
blob_index,
|
||||
from_addr,
|
||||
false,
|
||||
)
|
||||
}
|
||||
Protocol::RequestHighestWindowIndex(from, slot_height, highest_index) => {
|
||||
Self::handle_request_window_index(
|
||||
me,
|
||||
&from,
|
||||
blocktree,
|
||||
slot_height,
|
||||
highest_index,
|
||||
from_addr,
|
||||
true,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1368,7 +1431,7 @@ mod tests {
|
||||
fn window_index_request() {
|
||||
let me = NodeInfo::new_localhost(Keypair::new().pubkey(), timestamp());
|
||||
let mut cluster_info = ClusterInfo::new(me);
|
||||
let rv = cluster_info.window_index_request(0, 0);
|
||||
let rv = cluster_info.window_index_request(0, 0, false);
|
||||
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
|
||||
|
||||
let gossip_addr = socketaddr!([127, 0, 0, 1], 1234);
|
||||
@@ -1383,7 +1446,7 @@ mod tests {
|
||||
0,
|
||||
);
|
||||
cluster_info.insert_info(nxt.clone());
|
||||
let rv = cluster_info.window_index_request(0, 0).unwrap();
|
||||
let rv = cluster_info.window_index_request(0, 0, false).unwrap();
|
||||
assert_eq!(nxt.gossip, gossip_addr);
|
||||
assert_eq!(rv.0, nxt.gossip);
|
||||
|
||||
@@ -1403,7 +1466,7 @@ mod tests {
|
||||
let mut two = false;
|
||||
while !one || !two {
|
||||
//this randomly picks an option, so eventually it should pick both
|
||||
let rv = cluster_info.window_index_request(0, 0).unwrap();
|
||||
let rv = cluster_info.window_index_request(0, 0, false).unwrap();
|
||||
if rv.0 == gossip_addr {
|
||||
one = true;
|
||||
}
|
||||
@@ -1472,6 +1535,54 @@ mod tests {
|
||||
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
/// test run_window_requestwindow requests respond with the right blob, and do not overrun
|
||||
#[test]
|
||||
fn run_highest_window_request() {
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path("run_highest_window_request");
|
||||
{
|
||||
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
||||
let rv =
|
||||
ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 0, 0);
|
||||
assert!(rv.is_empty());
|
||||
|
||||
let data_size = 1;
|
||||
let max_index = 5;
|
||||
let blobs: Vec<_> = (0..max_index)
|
||||
.map(|i| {
|
||||
let mut blob = Blob::default();
|
||||
blob.set_size(data_size);
|
||||
blob.set_index(i);
|
||||
blob.set_slot(2);
|
||||
blob.meta.size = data_size + BLOB_HEADER_SIZE;
|
||||
blob
|
||||
})
|
||||
.collect();
|
||||
|
||||
blocktree
|
||||
.write_blobs(&blobs)
|
||||
.expect("Expect successful ledger write");
|
||||
|
||||
let rv =
|
||||
ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1);
|
||||
assert!(!rv.is_empty());
|
||||
let v = rv[0].clone();
|
||||
assert_eq!(v.read().unwrap().index(), max_index - 1);
|
||||
assert_eq!(v.read().unwrap().slot(), 2);
|
||||
assert_eq!(v.read().unwrap().meta.size, BLOB_HEADER_SIZE + data_size);
|
||||
|
||||
let rv = ClusterInfo::run_highest_window_request(
|
||||
&socketaddr_any!(),
|
||||
Some(&blocktree),
|
||||
2,
|
||||
max_index,
|
||||
);
|
||||
assert!(rv.is_empty());
|
||||
}
|
||||
|
||||
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_leader() {
|
||||
solana_logger::setup();
|
||||
|
@@ -59,7 +59,7 @@ impl RepairService {
|
||||
cluster_info
|
||||
.read()
|
||||
.unwrap()
|
||||
.window_index_request(slot_height, blob_index)
|
||||
.window_index_request(slot_height, blob_index, false)
|
||||
.map(|result| (result, slot_height, blob_index))
|
||||
.ok()
|
||||
})
|
||||
|
Reference in New Issue
Block a user