removes the nested for loop from retransmit-stage
The code can be simplified by just flattening the vector of packets.
This commit is contained in:
@ -330,10 +330,10 @@ fn retransmit(
|
|||||||
let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
|
let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
|
||||||
let mut timer_start = Measure::start("retransmit");
|
let mut timer_start = Measure::start("retransmit");
|
||||||
let mut total_packets = packets.packets.len();
|
let mut total_packets = packets.packets.len();
|
||||||
let mut packet_v = vec![packets];
|
let mut packets = vec![packets];
|
||||||
while let Ok(nq) = r_lock.try_recv() {
|
while let Ok(nq) = r_lock.try_recv() {
|
||||||
total_packets += nq.packets.len();
|
total_packets += nq.packets.len();
|
||||||
packet_v.push(nq);
|
packets.push(nq);
|
||||||
if total_packets >= MAX_PACKET_BATCH_SIZE {
|
if total_packets >= MAX_PACKET_BATCH_SIZE {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -382,95 +382,93 @@ fn retransmit(
|
|||||||
let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
|
let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
|
||||||
let mut packets_by_source: HashMap<String, usize> = HashMap::new();
|
let mut packets_by_source: HashMap<String, usize> = HashMap::new();
|
||||||
let mut max_slot = 0;
|
let mut max_slot = 0;
|
||||||
for packets in packet_v {
|
for packet in packets.iter().flat_map(|p| p.packets.iter()) {
|
||||||
for packet in packets.packets.iter() {
|
// skip discarded packets and repair packets
|
||||||
// skip discarded packets and repair packets
|
if packet.meta.discard {
|
||||||
if packet.meta.discard {
|
total_packets -= 1;
|
||||||
total_packets -= 1;
|
discard_total += 1;
|
||||||
discard_total += 1;
|
continue;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if packet.meta.repair {
|
|
||||||
total_packets -= 1;
|
|
||||||
repair_total += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let shred_slot = match check_if_already_received(packet, shreds_received) {
|
|
||||||
Some(slot) => slot,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
max_slot = max_slot.max(shred_slot);
|
|
||||||
|
|
||||||
if let Some(rpc_subscriptions) = rpc_subscriptions {
|
|
||||||
if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
|
|
||||||
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
|
|
||||||
slot: shred_slot,
|
|
||||||
timestamp: timestamp(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut compute_turbine_peers = Measure::start("turbine_start");
|
|
||||||
let stakes_and_index = get_retransmit_peers(
|
|
||||||
my_id,
|
|
||||||
shred_slot,
|
|
||||||
leader_schedule_cache,
|
|
||||||
r_bank.deref(),
|
|
||||||
r_epoch_stakes_cache.deref(),
|
|
||||||
);
|
|
||||||
let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
|
|
||||||
&my_id,
|
|
||||||
&r_epoch_stakes_cache.peers,
|
|
||||||
&stakes_and_index,
|
|
||||||
packet.meta.seed,
|
|
||||||
);
|
|
||||||
// If the node is on the critical path (i.e. the first node in each
|
|
||||||
// neighborhood), then we expect that the packet arrives at tvu
|
|
||||||
// socket as opposed to tvu-forwards. If this is not the case, then
|
|
||||||
// the turbine broadcast/retransmit tree mismatch across nodes.
|
|
||||||
if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) {
|
|
||||||
retransmit_tree_mismatch += 1;
|
|
||||||
}
|
|
||||||
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
|
|
||||||
// split off the indexes, we don't need the stakes anymore
|
|
||||||
let indexes: Vec<_> = shuffled_stakes_and_index
|
|
||||||
.into_iter()
|
|
||||||
.map(|(_, index)| index)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let (neighbors, children) =
|
|
||||||
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
|
|
||||||
let neighbors: Vec<_> = neighbors
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|index| {
|
|
||||||
let peer = &r_epoch_stakes_cache.peers[index];
|
|
||||||
if peer.id == my_id {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(peer)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
let children: Vec<_> = children
|
|
||||||
.into_iter()
|
|
||||||
.map(|index| &r_epoch_stakes_cache.peers[index])
|
|
||||||
.collect();
|
|
||||||
compute_turbine_peers.stop();
|
|
||||||
compute_turbine_peers_total += compute_turbine_peers.as_us();
|
|
||||||
|
|
||||||
*packets_by_slot.entry(packet.meta.slot).or_insert(0) += 1;
|
|
||||||
*packets_by_source
|
|
||||||
.entry(packet.meta.addr().to_string())
|
|
||||||
.or_insert(0) += 1;
|
|
||||||
|
|
||||||
let mut retransmit_time = Measure::start("retransmit_to");
|
|
||||||
if !packet.meta.forward {
|
|
||||||
ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
|
|
||||||
}
|
|
||||||
ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
|
|
||||||
retransmit_time.stop();
|
|
||||||
retransmit_total += retransmit_time.as_us();
|
|
||||||
}
|
}
|
||||||
|
if packet.meta.repair {
|
||||||
|
total_packets -= 1;
|
||||||
|
repair_total += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let shred_slot = match check_if_already_received(packet, shreds_received) {
|
||||||
|
Some(slot) => slot,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
max_slot = max_slot.max(shred_slot);
|
||||||
|
|
||||||
|
if let Some(rpc_subscriptions) = rpc_subscriptions {
|
||||||
|
if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
|
||||||
|
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
|
||||||
|
slot: shred_slot,
|
||||||
|
timestamp: timestamp(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut compute_turbine_peers = Measure::start("turbine_start");
|
||||||
|
let stakes_and_index = get_retransmit_peers(
|
||||||
|
my_id,
|
||||||
|
shred_slot,
|
||||||
|
leader_schedule_cache,
|
||||||
|
r_bank.deref(),
|
||||||
|
r_epoch_stakes_cache.deref(),
|
||||||
|
);
|
||||||
|
let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
|
||||||
|
&my_id,
|
||||||
|
&r_epoch_stakes_cache.peers,
|
||||||
|
&stakes_and_index,
|
||||||
|
packet.meta.seed,
|
||||||
|
);
|
||||||
|
// If the node is on the critical path (i.e. the first node in each
|
||||||
|
// neighborhood), then we expect that the packet arrives at tvu socket
|
||||||
|
// as opposed to tvu-forwards. If this is not the case, then the
|
||||||
|
// turbine broadcast/retransmit tree is mismatched across nodes.
|
||||||
|
if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) {
|
||||||
|
retransmit_tree_mismatch += 1;
|
||||||
|
}
|
||||||
|
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
|
||||||
|
// split off the indexes, we don't need the stakes anymore
|
||||||
|
let indexes: Vec<_> = shuffled_stakes_and_index
|
||||||
|
.into_iter()
|
||||||
|
.map(|(_, index)| index)
|
||||||
|
.collect();
|
||||||
|
debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id);
|
||||||
|
|
||||||
|
let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
|
||||||
|
let neighbors: Vec<_> = neighbors
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|index| {
|
||||||
|
let peer = &r_epoch_stakes_cache.peers[index];
|
||||||
|
if peer.id == my_id {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(peer)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let children: Vec<_> = children
|
||||||
|
.into_iter()
|
||||||
|
.map(|index| &r_epoch_stakes_cache.peers[index])
|
||||||
|
.collect();
|
||||||
|
compute_turbine_peers.stop();
|
||||||
|
compute_turbine_peers_total += compute_turbine_peers.as_us();
|
||||||
|
|
||||||
|
*packets_by_slot.entry(packet.meta.slot).or_default() += 1;
|
||||||
|
*packets_by_source
|
||||||
|
.entry(packet.meta.addr().to_string())
|
||||||
|
.or_default() += 1;
|
||||||
|
|
||||||
|
let mut retransmit_time = Measure::start("retransmit_to");
|
||||||
|
if !packet.meta.forward {
|
||||||
|
ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
|
||||||
|
}
|
||||||
|
ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
|
||||||
|
retransmit_time.stop();
|
||||||
|
retransmit_total += retransmit_time.as_us();
|
||||||
}
|
}
|
||||||
max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
|
max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
|
||||||
timer_start.stop();
|
timer_start.stop();
|
||||||
|
Reference in New Issue
Block a user