Adjust packet batching post-decoupling from blobs (#5783)
This commit is contained in:
@ -7,7 +7,7 @@ use crate::entry;
|
|||||||
use crate::entry::{hash_transactions, Entry};
|
use crate::entry::{hash_transactions, Entry};
|
||||||
use crate::leader_schedule_cache::LeaderScheduleCache;
|
use crate::leader_schedule_cache::LeaderScheduleCache;
|
||||||
use crate::packet;
|
use crate::packet;
|
||||||
use crate::packet::PACKETS_PER_BLOB;
|
use crate::packet::PACKETS_PER_BATCH;
|
||||||
use crate::packet::{Packet, Packets};
|
use crate::packet::{Packet, Packets};
|
||||||
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
|
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
|
||||||
use crate::poh_service::PohService;
|
use crate::poh_service::PohService;
|
||||||
@ -87,7 +87,7 @@ impl BankingStage {
|
|||||||
verified_vote_receiver: CrossbeamReceiver<VerifiedPackets>,
|
verified_vote_receiver: CrossbeamReceiver<VerifiedPackets>,
|
||||||
num_threads: u32,
|
num_threads: u32,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BLOB);
|
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
|
||||||
// Single thread to generate entries from many banks.
|
// Single thread to generate entries from many banks.
|
||||||
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
|
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
|
||||||
// Once an entry has been recorded, its blockhash is registered with the bank.
|
// Once an entry has been recorded, its blockhash is registered with the bank.
|
||||||
|
@ -32,7 +32,9 @@ pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be
|
|||||||
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2);
|
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2);
|
||||||
pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers
|
pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers
|
||||||
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
|
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
|
||||||
pub const PACKETS_PER_BLOB: usize = 8; // reasonable estimate for payment packets per blob based on ~200b transaction size
|
|
||||||
|
pub const PACKETS_PER_BATCH: usize = 256;
|
||||||
|
pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE);
|
||||||
|
|
||||||
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
|
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
@ -291,9 +293,9 @@ impl Packets {
|
|||||||
trace!("got {} packets", npkts);
|
trace!("got {} packets", npkts);
|
||||||
i += npkts;
|
i += npkts;
|
||||||
total_size += size;
|
total_size += size;
|
||||||
// Try to batch into blob-sized buffers
|
// Try to batch into big enough buffers
|
||||||
// will cause less re-shuffling later on.
|
// will cause less re-shuffling later on.
|
||||||
if start.elapsed().as_millis() > 1 || total_size >= (BLOB_DATA_SIZE - 512) {
|
if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
||||||
//!
|
//!
|
||||||
|
|
||||||
use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BLOB};
|
use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BATCH};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use solana_sdk::timing::duration_as_ms;
|
use solana_sdk::timing::duration_as_ms;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
@ -24,7 +24,7 @@ fn recv_loop(
|
|||||||
name: &'static str,
|
name: &'static str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, name);
|
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
|
||||||
loop {
|
loop {
|
||||||
// Check for exit signal, even if socket is busy
|
// Check for exit signal, even if socket is busy
|
||||||
// (for instance the leader trasaction socket)
|
// (for instance the leader trasaction socket)
|
||||||
|
Reference in New Issue
Block a user