Batch packet forwarding in banking stage

Carl
2019-03-09 02:47:41 -08:00
committed by Pankaj Garg
parent b60b8ec5ae
commit cd1a9faacd
4 changed files with 75 additions and 19 deletions

File 1 of 4

@@ -6,8 +6,8 @@ use crate::cluster_info::ClusterInfo;
 use crate::entry::Entry;
 use crate::leader_confirmation_service::LeaderConfirmationService;
 use crate::leader_schedule_utils;
-use crate::packet::Packets;
 use crate::packet::SharedPackets;
+use crate::packet::{Blob, Packets};
 use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
 use crate::poh_service::{PohService, PohServiceConfig};
 use crate::result::{Error, Result};
@@ -76,15 +76,29 @@ impl BankingStage {
     fn forward_unprocessed_packets(
         socket: &std::net::UdpSocket,
-        tpu: &std::net::SocketAddr,
+        forwarder: &std::net::SocketAddr,
         unprocessed_packets: &[(SharedPackets, usize)],
     ) -> std::io::Result<()> {
+        let mut blob = Blob::default();
         for (packets, start_index) in unprocessed_packets {
             let packets = packets.read().unwrap();
-            for packet in packets.packets.iter().skip(*start_index) {
-                socket.send_to(&packet.data[..packet.meta.size], tpu)?;
+            let mut current_index = *start_index;
+            while current_index < packets.packets.len() {
+                current_index += blob.store_packets(&packets.packets[current_index..]) as usize;
+                if current_index < packets.packets.len() {
+                    // Blob is full, send it
+                    socket.send_to(&blob.data[..blob.meta.size], forwarder)?;
+                    blob = Blob::default();
+                } else {
+                    break;
+                }
             }
         }
+        if blob.size() > 0 {
+            socket.send_to(&blob.data[..blob.meta.size], forwarder)?;
+        }
         Ok(())
     }
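
The hunk above replaces one UDP send per packet with one send per full Blob. A rough, self-contained sketch of that batch-and-flush pattern in isolation follows; Buf, CAP, and the u32 item type are illustrative stand-ins, not the crate's types:

    // Minimal sketch of the batch-and-flush loop used above,
    // with a stand-in buffer instead of Blob.
    const CAP: usize = 8;

    struct Buf {
        items: Vec<u32>,
    }

    impl Buf {
        fn new() -> Self {
            Buf { items: Vec::new() }
        }

        // Store as many items as fit and report how many were consumed,
        // mirroring the count-returning store_packets contract.
        fn store(&mut self, items: &[u32]) -> usize {
            let n = (CAP - self.items.len()).min(items.len());
            self.items.extend_from_slice(&items[..n]);
            n
        }
    }

    fn main() {
        let packets: Vec<u32> = (0..20).collect();
        let mut buf = Buf::new();
        let mut i = 0;
        let mut sends = 0;

        while i < packets.len() {
            i += buf.store(&packets[i..]);
            if i < packets.len() {
                // Buffer is full but input remains: "send" it and start fresh.
                sends += 1;
                buf = Buf::new();
            }
        }
        if !buf.items.is_empty() {
            sends += 1; // flush the final partial buffer
        }

        // 20 packets in buffers of 8 cost 3 sends instead of 20.
        assert_eq!(sends, 3);
    }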
@@ -110,8 +124,11 @@ impl BankingStage {
         if bank.is_none() {
             if let Some(leader) = leader.clone() {
                 if my_id == leader.id {
-                    let _ =
-                        Self::forward_unprocessed_packets(&socket, &leader.tpu, &buffered_packets);
+                    let _ = Self::forward_unprocessed_packets(
+                        &socket,
+                        &leader.forwarder,
+                        &buffered_packets,
+                    );
                     return true;
                 }
             }
@@ -119,8 +136,11 @@ impl BankingStage {
         // If there's a bank, and leader is available, forward the packets
         if bank.is_some() && leader.is_some() {
-            let _ =
-                Self::forward_unprocessed_packets(&socket, &leader.unwrap().tpu, &buffered_packets);
+            let _ = Self::forward_unprocessed_packets(
+                &socket,
+                &leader.unwrap().forwarder,
+                &buffered_packets,
+            );
             return true;
         }
@@ -179,7 +199,7 @@ impl BankingStage {
             if let Some(leader) = cluster_info.read().unwrap().leader_data() {
                 let _ = Self::forward_unprocessed_packets(
                     &socket,
-                    &leader.tpu,
+                    &leader.forwarder,
                     &unprocessed_packets,
                 );
             }

File 2 of 4

@@ -419,17 +419,25 @@ impl Blob {
         self.set_data_size(new_size as u64);
     }

-    pub fn store_packets(&mut self, packets: &[Packet]) -> Result<()> {
-        let mut cursor = Cursor::new(&mut self.data[..]);
-        cursor.set_position(BLOB_HEADER_SIZE as u64);
+    pub fn store_packets(&mut self, packets: &[Packet]) -> u64 {
+        let size = self.size();
+        let mut cursor = Cursor::new(&mut self.data_mut()[size..]);
+        let mut written = 0;
+        let mut last_index = 0;
         for packet in packets {
-            serialize_into(&mut cursor, &packet.meta)?;
-            cursor.write(&packet.data[..])?;
+            if serialize_into(&mut cursor, &packet.meta).is_err() {
+                break;
+            }
+            if cursor.write_all(&packet.data[..]).is_err() {
+                break;
+            }
+            written = cursor.position() as usize;
+            last_index += 1;
         }
-        let size = cursor.position();
-        self.set_size(size as usize);
-        Ok(())
+        self.set_size(size + written);
+        last_index
     }

     pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
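
The rewritten store_packets appends at the blob's current size and reports how many whole packets fit, instead of failing the entire batch on overflow; a partially written packet is dropped by never advancing the size past the last complete one. A self-contained sketch of that cursor-and-count pattern, assuming a stand-in MiniBlob whose 256-byte payload and 8-byte length prefix replace the real BLOB_SIZE and bincode-serialized Meta:

    use std::io::{Cursor, Write};

    // Stand-in for Blob: same append-at-size, count-returning contract
    // as store_packets above; sizes and layout are illustrative only.
    struct MiniBlob {
        data: [u8; 256],
        size: usize,
    }

    impl MiniBlob {
        fn store_packets(&mut self, packets: &[&[u8]]) -> u64 {
            let start = self.size;
            let mut cursor = Cursor::new(&mut self.data[start..]);
            let mut written = 0;
            let mut count = 0u64;
            for p in packets {
                // Stand-in "meta": an 8-byte length prefix.
                if cursor.write_all(&(p.len() as u64).to_le_bytes()).is_err() {
                    break; // out of room; a partial write is not counted
                }
                if cursor.write_all(p).is_err() {
                    break;
                }
                written = cursor.position() as usize;
                count += 1;
            }
            // Only fully stored packets advance the size, like set_size above.
            self.size = start + written;
            count
        }
    }

    fn main() {
        let mut blob = MiniBlob { data: [0; 256], size: 0 };
        let pkt = [7u8; 56]; // 8-byte prefix + 56-byte payload = 64 per record
        let packets = vec![&pkt[..]; 6];

        // 256 / 64 = 4 records fit; the remaining two are reported unstored.
        assert_eq!(blob.store_packets(&packets), 4);
        assert_eq!(blob.size, 256);

        // A full blob stores nothing more.
        assert_eq!(blob.store_packets(&packets), 0);
    }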
@@ -514,6 +522,8 @@ mod tests {
         to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS,
         PACKET_DATA_SIZE,
     };
+    use crate::packet::{BLOB_HEADER_SIZE, BLOB_SIZE};
+    use bincode::serialized_size;
     use solana_sdk::hash::Hash;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use solana_sdk::system_transaction::SystemTransaction;
@@ -629,4 +639,29 @@ mod tests {
         assert!(b.should_forward());
     }
+
+    #[test]
+    fn test_store_blobs_max() {
+        let meta = Meta::default();
+        let serialized_meta_size = serialized_size(&meta).unwrap() as usize;
+        let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
+        let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1;
+        let mut blob = Blob::default();
+        let packets: Vec<_> = (0..num_packets).map(|_| Packet::default()).collect();
+
+        // Everything except the last packet should have been written
+        assert_eq!(blob.store_packets(&packets[..]), (num_packets - 1) as u64);
+
+        blob = Blob::default();
+        // Store packets such that blob only has room for one more
+        assert_eq!(
+            blob.store_packets(&packets[..num_packets - 2]),
+            (num_packets - 2) as u64
+        );
+
+        // Fill the last packet in the blob
+        assert_eq!(blob.store_packets(&packets[..num_packets - 2]), 1);
+
+        // Blob is now full
+        assert_eq!(blob.store_packets(&packets), 0);
+    }
 }
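
The boundary cases this test walks (overflow by one, leave room for one, fill the last slot, store into a full blob) can be replayed against the MiniBlob stand-in from the earlier sketch, where capacity works out to 4 records at the illustrative sizes. Appended to that sketch's main:

    // Mirrors test_store_blobs_max using the MiniBlob sketch above.
    let pkt = [7u8; 56];
    let num_packets = 256 / 64 + 1; // capacity + 1, like the test's num_packets
    let packets = vec![&pkt[..]; num_packets];

    // Everything except the last packet fits.
    let mut blob = MiniBlob { data: [0; 256], size: 0 };
    assert_eq!(blob.store_packets(&packets), (num_packets - 1) as u64);

    // Leave room for exactly one more record...
    let mut blob = MiniBlob { data: [0; 256], size: 0 };
    assert_eq!(
        blob.store_packets(&packets[..num_packets - 2]),
        (num_packets - 2) as u64
    );
    // ...fill it...
    assert_eq!(blob.store_packets(&packets[..num_packets - 2]), 1);
    // ...and a full blob stores nothing.
    assert_eq!(blob.store_packets(&packets), 0);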

File 3 of 4

@@ -308,7 +308,7 @@ mod test {
             .collect();

         let mut blob = Blob::default();
-        blob.store_packets(&packets[..]).unwrap();
+        assert_eq!(blob.store_packets(&packets[..]), num_packets);
         let result = deserialize_packets_in_blob(
             &blob.data()[..blob.size()],
             serialized_packet_size,
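
As the call above suggests, deserialize_packets_in_blob is handed the stored bytes plus a fixed record size, since each packet serializes to the same serialized_packet_size. A sketch of such an inverse walk, using the stand-in length-prefixed layout from the MiniBlob example rather than the crate's bincode layout:

    // Sketch of the inverse of store_packets: step through a buffer in
    // fixed-size records and pull each payload back out.
    fn read_packets(buf: &[u8], record_size: usize) -> Vec<Vec<u8>> {
        let mut out = Vec::new();
        let mut offset = 0;
        while offset + record_size <= buf.len() {
            let len_bytes: [u8; 8] = buf[offset..offset + 8].try_into().unwrap();
            let len = u64::from_le_bytes(len_bytes) as usize;
            out.push(buf[offset + 8..offset + 8 + len].to_vec());
            offset += record_size;
        }
        out
    }

    fn main() {
        // Two 56-byte payloads stored as 64-byte records.
        let mut buf = vec![0u8; 128];
        for r in 0..2 {
            buf[r * 64..r * 64 + 8].copy_from_slice(&56u64.to_le_bytes());
        }
        let packets = read_packets(&buf, 64);
        assert_eq!(packets.len(), 2);
        assert!(packets.iter().all(|p| p.len() == 56));
    }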

File 4 of 4

@@ -26,6 +26,7 @@ pub struct Tpu {
 }

 impl Tpu {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         id: &Pubkey,
         cluster_info: &Arc<RwLock<ClusterInfo>>,