Add limit and shrink policy for recycler (#15320)

This commit is contained in:
carllin
2021-02-24 00:15:58 -08:00
committed by GitHub
parent 2f46da346d
commit c2e8814dce
24 changed files with 515 additions and 127 deletions

1
Cargo.lock generated
View File

@ -4896,6 +4896,7 @@ dependencies = [
"serde", "serde",
"solana-budget-program", "solana-budget-program",
"solana-logger 1.6.0", "solana-logger 1.6.0",
"solana-measure",
"solana-metrics", "solana-metrics",
"solana-rayon-threadlimit", "solana-rayon-threadlimit",
"solana-sdk", "solana-sdk",

View File

@ -75,7 +75,7 @@ fn main() -> Result<()> {
let mut read_channels = Vec::new(); let mut read_channels = Vec::new();
let mut read_threads = Vec::new(); let mut read_threads = Vec::new();
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("bench-streamer-recycler-shrink-stats");
for _ in 0..num_sockets { for _ in 0..num_sockets {
let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap(); let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap();
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

View File

@ -1853,7 +1853,7 @@ impl ClusterInfo {
let mut last_contact_info_trace = timestamp(); let mut last_contact_info_trace = timestamp();
let mut last_contact_info_save = timestamp(); let mut last_contact_info_save = timestamp();
let mut entrypoints_processed = false; let mut entrypoints_processed = false;
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("gossip-recycler-shrink-stats");
let crds_data = vec![ let crds_data = vec![
CrdsData::Version(Version::new(self.id())), CrdsData::Version(Version::new(self.id())),
CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())), CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
@ -2104,7 +2104,7 @@ impl ClusterInfo {
.process_pull_requests(callers.cloned(), timestamp()); .process_pull_requests(callers.cloned(), timestamp());
let output_size_limit = let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let mut packets = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let check_pull_request = let check_pull_request =
@ -2389,8 +2389,7 @@ impl ClusterInfo {
if packets.is_empty() { if packets.is_empty() {
None None
} else { } else {
let packets = let packets = Packets::new_with_recycler_data(recycler, packets).unwrap();
Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets);
Some(packets) Some(packets)
} }
} }
@ -3019,7 +3018,8 @@ impl ClusterInfo {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone(); let exit = exit.clone();
let recycler = PacketsRecycler::default(); let recycler =
PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats");
Builder::new() Builder::new()
.name("solana-listen".to_string()) .name("solana-listen".to_string())
.spawn(move || { .spawn(move || {
@ -3464,7 +3464,7 @@ mod tests {
.iter() .iter()
.map(|ping| Pong::new(ping, &this_node).unwrap()) .map(|ping| Pong::new(ping, &this_node).unwrap())
.collect(); .collect();
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("");
let packets = cluster_info let packets = cluster_info
.handle_ping_messages( .handle_ping_messages(
remote_nodes remote_nodes

View File

@ -29,7 +29,14 @@ impl FetchStage {
) -> (Self, PacketReceiver) { ) -> (Self, PacketReceiver) {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
( (
Self::new_with_sender(sockets, tpu_forwards_sockets, exit, &sender, &poh_recorder), Self::new_with_sender(
sockets,
tpu_forwards_sockets,
exit,
&sender,
&poh_recorder,
None,
),
receiver, receiver,
) )
} }
@ -39,6 +46,7 @@ impl FetchStage {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
sender: &PacketSender, sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
allocated_packet_limit: Option<u32>,
) -> Self { ) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
@ -48,6 +56,7 @@ impl FetchStage {
exit, exit,
&sender, &sender,
&poh_recorder, &poh_recorder,
allocated_packet_limit,
) )
} }
@ -92,8 +101,10 @@ impl FetchStage {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
sender: &PacketSender, sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
limit: Option<u32>,
) -> Self { ) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(1000, 1024); let recycler: PacketsRecycler =
Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink");
let tpu_threads = sockets.into_iter().map(|socket| { let tpu_threads = sockets.into_iter().map(|socket| {
streamer::receiver( streamer::receiver(

View File

@ -47,7 +47,7 @@ impl GossipService {
gossip_socket.clone(), gossip_socket.clone(),
&exit, &exit,
request_sender, request_sender,
Recycler::default(), Recycler::new_without_limit("gossip-receiver-recycler-shrink-stats"),
"gossip_receiver", "gossip_receiver",
); );
let (response_sender, response_receiver) = channel(); let (response_sender, response_receiver) = channel();

View File

@ -279,7 +279,7 @@ impl ServeRepair {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone(); let exit = exit.clone();
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("serve-repair-recycler-shrink-stats");
Builder::new() Builder::new()
.name("solana-repair-listen".to_string()) .name("solana-repair-listen".to_string())
.spawn(move || { .spawn(move || {
@ -498,11 +498,7 @@ impl ServeRepair {
if let Some(packet) = packet { if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1); inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data( return Some(Packets::new_with_recycler_data(recycler, vec![packet])).unwrap();
recycler,
"run_window_request",
vec![packet],
));
} }
} }
@ -538,11 +534,7 @@ impl ServeRepair {
from_addr, from_addr,
nonce, nonce,
)?; )?;
return Some(Packets::new_with_recycler_data( return Packets::new_with_recycler_data(recycler, vec![packet]);
recycler,
"run_highest_window_request",
vec![packet],
));
} }
None None
} }
@ -555,7 +547,7 @@ impl ServeRepair {
max_responses: usize, max_responses: usize,
nonce: Nonce, nonce: Nonce,
) -> Option<Packets> { ) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); let mut res = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
if let Some(blockstore) = blockstore { if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot // Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) { while let Ok(Some(meta)) = blockstore.meta(slot) {
@ -609,7 +601,7 @@ mod tests {
/// test run_window_request responds with the right shred, and do not overrun /// test run_window_request responds with the right shred, and do not overrun
fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) { fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("");
solana_logger::setup(); solana_logger::setup();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
@ -677,7 +669,7 @@ mod tests {
/// test window requests respond with the right shred, and do not overrun /// test window requests respond with the right shred, and do not overrun
fn run_window_request(slot: Slot, nonce: Nonce) { fn run_window_request(slot: Slot, nonce: Nonce) {
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("");
solana_logger::setup(); solana_logger::setup();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
@ -845,7 +837,7 @@ mod tests {
fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) { fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
solana_logger::setup(); solana_logger::setup();
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("");
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
@ -916,7 +908,7 @@ mod tests {
#[test] #[test]
fn run_orphan_corrupted_shred_size() { fn run_orphan_corrupted_shred_size() {
solana_logger::setup(); solana_logger::setup();
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("");
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());

View File

@ -30,7 +30,7 @@ impl ServeRepairService {
serve_repair_socket.clone(), serve_repair_socket.clone(),
&exit, &exit,
request_sender, request_sender,
Recycler::default(), Recycler::new_without_limit("serve-repair-receiver-recycler-shrink-stats"),
"serve_repair_receiver", "serve_repair_receiver",
); );
let (response_sender, response_receiver) = channel(); let (response_sender, response_receiver) = channel();

View File

@ -167,8 +167,10 @@ impl ShredFetchStage {
sender: &PacketSender, sender: &PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
limit: Option<u32>,
) -> Self { ) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(100, 1024); let recycler: PacketsRecycler =
Recycler::warmed(100, 1024, limit, "shred_fetch_stage_recycler_shrink");
let (mut tvu_threads, tvu_filter) = Self::packet_modifier( let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
sockets, sockets,

View File

@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier {
fn default() -> Self { fn default() -> Self {
init(); init();
Self { Self {
recycler: Recycler::warmed(50, 4096), recycler: Recycler::warmed(50, 4096, None, ""),
recycler_out: Recycler::warmed(50, 4096), recycler_out: Recycler::warmed(50, 4096, None, ""),
} }
} }
} }

View File

@ -26,7 +26,10 @@ impl ShredSigVerifier {
Self { Self {
bank_forks, bank_forks,
leader_schedule_cache, leader_schedule_cache,
recycler_cache: RecyclerCache::warmed(), recycler_cache: RecyclerCache::warmed(
"shred-sig-verifier-offsets-recycler-shrink-stats",
"shred-sig-verifier-buffer-recycler-shrink-stats",
),
} }
} }
fn read_slots(batches: &[Packets]) -> HashSet<u64> { fn read_slots(batches: &[Packets]) -> HashSet<u64> {

View File

@ -67,6 +67,9 @@ impl Tpu {
&exit, &exit,
&packet_sender, &packet_sender,
&poh_recorder, &poh_recorder,
// At 1024 packets per `Packet`, each packet about MTU size ~1k, this is roughly
// 20GB
Some(20_000),
); );
let (verified_sender, verified_receiver) = unbounded(); let (verified_sender, verified_receiver) = unbounded();

View File

@ -145,6 +145,7 @@ impl Tvu {
&fetch_sender, &fetch_sender,
Some(bank_forks.clone()), Some(bank_forks.clone()),
&exit, &exit,
None,
); );
let (verified_sender, verified_receiver) = unbounded(); let (verified_sender, verified_receiver) = unbounded();

View File

@ -16,7 +16,7 @@ const NUM_PACKETS: usize = 256;
const NUM_BATCHES: usize = 1; const NUM_BATCHES: usize = 1;
#[bench] #[bench]
fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
let recycler_cache = RecyclerCache::default(); let recycler_cache = RecyclerCache::new("", "");
let mut packets = Packets::default(); let mut packets = Packets::default();
packets.packets.set_pinnable(); packets.packets.set_pinnable();

View File

@ -225,12 +225,21 @@ pub struct EntryVerificationState {
device_verification_data: DeviceVerificationData, device_verification_data: DeviceVerificationData,
} }
#[derive(Default, Clone)] #[derive(Clone)]
pub struct VerifyRecyclers { pub struct VerifyRecyclers {
hash_recycler: Recycler<PinnedVec<Hash>>, hash_recycler: Recycler<PinnedVec<Hash>>,
tick_count_recycler: Recycler<PinnedVec<u64>>, tick_count_recycler: Recycler<PinnedVec<u64>>,
} }
/// Default `VerifyRecyclers`: both recyclers are unlimited and report
/// shrink activity under their own metric names.
impl Default for VerifyRecyclers {
    fn default() -> Self {
        let tick_count_recycler = Recycler::new_without_limit("tick_count_recycler_shrink_stats");
        let hash_recycler = Recycler::new_without_limit("hash_recycler_shrink_stats");
        Self {
            hash_recycler,
            tick_count_recycler,
        }
    }
}
#[derive(PartialEq, Clone, Copy, Debug)] #[derive(PartialEq, Clone, Copy, Debug)]
pub enum EntryVerificationStatus { pub enum EntryVerificationStatus {
Failure, Failure,
@ -554,14 +563,12 @@ impl EntrySlice for [Entry] {
.take(self.len()) .take(self.len())
.collect(); .collect();
let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash"); let mut hashes_pinned = recyclers.hash_recycler.allocate().unwrap();
hashes_pinned.set_pinnable(); hashes_pinned.set_pinnable();
hashes_pinned.resize(hashes.len(), Hash::default()); hashes_pinned.resize(hashes.len(), Hash::default());
hashes_pinned.copy_from_slice(&hashes); hashes_pinned.copy_from_slice(&hashes);
let mut num_hashes_vec = recyclers let mut num_hashes_vec = recyclers.tick_count_recycler.allocate().unwrap();
.tick_count_recycler
.allocate("poh_verify_num_hashes");
num_hashes_vec.reserve_and_pin(cmp::max(1, self.len())); num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
for entry in self { for entry in self {
num_hashes_vec.push(entry.num_hashes.saturating_sub(1)); num_hashes_vec.push(entry.num_hashes.saturating_sub(1));

View File

@ -137,7 +137,7 @@ fn slot_key_data_for_gpu<
.push(*slot); .push(*slot);
} }
} }
let mut keyvec = recycler_cache.buffer().allocate("shred_gpu_pubkeys"); let mut keyvec = recycler_cache.buffer().allocate().unwrap();
keyvec.set_pinnable(); keyvec.set_pinnable();
let mut slot_to_key_ix = HashMap::new(); let mut slot_to_key_ix = HashMap::new();
@ -152,7 +152,7 @@ fn slot_key_data_for_gpu<
slot_to_key_ix.insert(s, i); slot_to_key_ix.insert(s, i);
} }
} }
let mut offsets = recycler_cache.offsets().allocate("shred_offsets"); let mut offsets = recycler_cache.offsets().allocate().unwrap();
offsets.set_pinnable(); offsets.set_pinnable();
slots.iter().for_each(|packet_slots| { slots.iter().for_each(|packet_slots| {
packet_slots.iter().for_each(|slot| { packet_slots.iter().for_each(|slot| {
@ -185,11 +185,11 @@ fn shred_gpu_offsets(
batches: &[Packets], batches: &[Packets],
recycler_cache: &RecyclerCache, recycler_cache: &RecyclerCache,
) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) { ) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures"); let mut signature_offsets = recycler_cache.offsets().allocate().unwrap();
signature_offsets.set_pinnable(); signature_offsets.set_pinnable();
let mut msg_start_offsets = recycler_cache.offsets().allocate("shred_msg_starts"); let mut msg_start_offsets = recycler_cache.offsets().allocate().unwrap();
msg_start_offsets.set_pinnable(); msg_start_offsets.set_pinnable();
let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes"); let mut msg_sizes = recycler_cache.offsets().allocate().unwrap();
msg_sizes.set_pinnable(); msg_sizes.set_pinnable();
let mut v_sig_lens = vec![]; let mut v_sig_lens = vec![];
for batch in batches.iter() { for batch in batches.iter() {
@ -242,7 +242,7 @@ pub fn verify_shreds_gpu(
trace!("pubkeys_len: {}", pubkeys_len); trace!("pubkeys_len: {}", pubkeys_len);
let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) = let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
shred_gpu_offsets(pubkeys_len, batches, recycler_cache); shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
let mut out = recycler_cache.buffer().allocate("out_buffer"); let mut out = recycler_cache.buffer().allocate().unwrap();
out.set_pinnable(); out.set_pinnable();
elems.push( elems.push(
perf_libs::Elems { perf_libs::Elems {
@ -332,7 +332,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
} }
pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> { pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> {
let mut vec = cache.buffer().allocate("pinned_keypair"); let mut vec = cache.buffer().allocate().unwrap();
let pubkey = keypair.pubkey().to_bytes(); let pubkey = keypair.pubkey().to_bytes();
let secret = keypair.secret().to_bytes(); let secret = keypair.secret().to_bytes();
let mut hasher = Sha512::default(); let mut hasher = Sha512::default();
@ -370,17 +370,17 @@ pub fn sign_shreds_gpu(
let mut num_packets = num_keypair_packets; let mut num_packets = num_keypair_packets;
//should be zero //should be zero
let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets"); let mut pubkey_offsets = recycler_cache.offsets().allocate().unwrap();
pubkey_offsets.resize(count, 0); pubkey_offsets.resize(count, 0);
let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets"); let mut secret_offsets = recycler_cache.offsets().allocate().unwrap();
secret_offsets.resize(count, pubkey_size as u32); secret_offsets.resize(count, pubkey_size as u32);
trace!("offset: {}", offset); trace!("offset: {}", offset);
let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
shred_gpu_offsets(offset, batches, recycler_cache); shred_gpu_offsets(offset, batches, recycler_cache);
let total_sigs = signature_offsets.len(); let total_sigs = signature_offsets.len();
let mut signatures_out = recycler_cache.buffer().allocate("ed25519 signatures"); let mut signatures_out = recycler_cache.buffer().allocate().unwrap();
signatures_out.set_pinnable(); signatures_out.set_pinnable();
signatures_out.resize(total_sigs * sig_size, 0); signatures_out.resize(total_sigs * sig_size, 0);
elems.push( elems.push(
@ -560,7 +560,7 @@ pub mod tests {
fn run_test_sigverify_shreds_gpu(slot: Slot) { fn run_test_sigverify_shreds_gpu(slot: Slot) {
solana_logger::setup(); solana_logger::setup();
let recycler_cache = RecyclerCache::default(); let recycler_cache = RecyclerCache::new("", "");
let mut batch = [Packets::default()]; let mut batch = [Packets::default()];
let mut shred = Shred::new_from_data( let mut shred = Shred::new_from_data(
@ -624,7 +624,7 @@ pub mod tests {
fn run_test_sigverify_shreds_sign_gpu(slot: Slot) { fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
solana_logger::setup(); solana_logger::setup();
let recycler_cache = RecyclerCache::default(); let recycler_cache = RecyclerCache::new("", "");
let mut packets = Packets::default(); let mut packets = Packets::default();
let num_packets = 32; let num_packets = 32;

View File

@ -21,6 +21,7 @@ solana-sdk = { path = "../sdk", version = "1.6.0" }
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.6.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.6.0" }
solana-budget-program = { path = "../programs/budget", version = "1.6.0" } solana-budget-program = { path = "../programs/budget", version = "1.6.0" }
solana-logger = { path = "../logger", version = "1.6.0" } solana-logger = { path = "../logger", version = "1.6.0" }
solana-measure = { path = "../measure", version = "1.6.0" }
solana-metrics = { path = "../metrics", version = "1.6.0" } solana-metrics = { path = "../metrics", version = "1.6.0" }
curve25519-dalek = { version = "2" } curve25519-dalek = { version = "2" }

22
perf/benches/recycler.rs Normal file
View File

@ -0,0 +1,22 @@
#![feature(test)]
extern crate test;
use solana_perf::{packet::PacketsRecycler, recycler::Recycler};
use test::Bencher;
/// Benchmarks a single allocate/recycle round trip on an unlimited
/// `PacketsRecycler`, after priming the pool with 1000 warm-up cycles so the
/// measured loop exercises reuse rather than fresh allocation.
#[bench]
fn bench_recycler(bencher: &mut Bencher) {
    solana_logger::setup();

    let recycler: PacketsRecycler = Recycler::new_without_limit("me");

    // Warm-up: cycle objects through the pool so it is populated.
    (0..1000).for_each(|_| {
        let item = recycler.allocate().expect("There is no limit");
        recycler.recycle_for_test(item);
    });

    bencher.iter(move || {
        let item = recycler.allocate().expect("There is no limit");
        recycler.recycle_for_test(item);
    });
}

View File

@ -15,8 +15,8 @@ fn bench_sigverify(bencher: &mut Bencher) {
// generate packet vector // generate packet vector
let batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::<Vec<_>>(), 128); let batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::<Vec<_>>(), 128);
let recycler = Recycler::default(); let recycler = Recycler::new_without_limit("");
let recycler_out = Recycler::default(); let recycler_out = Recycler::new_without_limit("");
// verify packets // verify packets
bencher.iter(|| { bencher.iter(|| {
let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@ -30,7 +30,7 @@ fn bench_get_offsets(bencher: &mut Bencher) {
// generate packet vector // generate packet vector
let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 1024); let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 1024);
let recycler = Recycler::default(); let recycler = Recycler::new_without_limit("");
// verify packets // verify packets
bencher.iter(|| { bencher.iter(|| {
let _ans = sigverify::generate_offsets(&batches, &recycler); let _ans = sigverify::generate_offsets(&batches, &recycler);

View File

@ -76,6 +76,9 @@ impl<T: Default + Clone + Sized> Reset for PinnedVec<T> {
fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) { fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) {
self.recycler = Some(recycler); self.recycler = Some(recycler);
} }
fn unset_recycler(&mut self) {
self.recycler = None;
}
} }
impl<T: Clone + Default + Sized> Default for PinnedVec<T> { impl<T: Clone + Default + Sized> Default for PinnedVec<T> {

View File

@ -29,19 +29,21 @@ impl Packets {
Packets { packets } Packets { packets }
} }
pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { pub fn new_with_recycler(recycler: PacketsRecycler, size: usize) -> Option<Self> {
let mut packets = recycler.allocate(name); let maybe_packets = recycler.allocate();
maybe_packets.map(|mut packets| {
packets.reserve_and_pin(size); packets.reserve_and_pin(size);
Packets { packets } Packets { packets }
})
} }
pub fn new_with_recycler_data( pub fn new_with_recycler_data(
recycler: &PacketsRecycler, recycler: &PacketsRecycler,
name: &'static str,
mut packets: Vec<Packet>, mut packets: Vec<Packet>,
) -> Self { ) -> Option<Self> {
let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name); Self::new_with_recycler(recycler.clone(), packets.len()).map(|mut vec| {
vec.packets.append(&mut packets); vec.packets.append(&mut packets);
vec vec
})
} }
pub fn set_addr(&mut self, addr: &SocketAddr) { pub fn set_addr(&mut self, addr: &SocketAddr) {
@ -77,11 +79,7 @@ pub fn to_packets_with_destination<T: Serialize>(
recycler: PacketsRecycler, recycler: PacketsRecycler,
dests_and_data: &[(SocketAddr, T)], dests_and_data: &[(SocketAddr, T)],
) -> Packets { ) -> Packets {
let mut out = Packets::new_with_recycler( let mut out = Packets::new_with_recycler(recycler, dests_and_data.len()).unwrap();
recycler,
dests_and_data.len(),
"to_packets_with_destination",
);
out.packets.resize(dests_and_data.len(), Packet::default()); out.packets.resize(dests_and_data.len(), Packet::default());
for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) {
if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 { if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 {
@ -139,9 +137,9 @@ mod tests {
#[test] #[test]
fn test_to_packets_pinning() { fn test_to_packets_pinning() {
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::new_without_limit("");
for i in 0..2 { for i in 0..2 {
let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1, "first one"); let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1);
} }
} }
} }

View File

@ -1,7 +1,23 @@
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use std::sync::atomic::AtomicBool; use solana_measure::measure::Measure;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::{
use std::sync::{Arc, Mutex, Weak}; sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex, Weak,
},
time::Instant,
};
pub const DEFAULT_MINIMUM_OBJECT_COUNT: u32 = 1000;
pub const DEFAULT_SHRINK_PCT: u32 = 80;
pub const DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT: u32 = 10;
pub const DEFAULT_CHECK_SHRINK_INTERVAL_MS: u32 = 10000;
enum AllocationDecision<T> {
Reuse(T),
Allocate(u32, usize),
AllocationLimitReached,
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct RecyclerStats { struct RecyclerStats {
@ -11,36 +27,219 @@ struct RecyclerStats {
max_gc: AtomicUsize, max_gc: AtomicUsize,
} }
#[derive(Clone, Default)] #[derive(Debug, Default)]
pub struct Recycler<T> { struct RecyclerShrinkStats {
resulting_size: u32,
target_size: u32,
ideal_num_to_remove: u32,
shrink_elapsed: u64,
drop_elapsed: u64,
}
impl RecyclerShrinkStats {
// Emits one shrink event as a metrics datapoint named `shrink_metric_name`.
// All counters are widened to i64 as required by `datapoint_info!`.
fn report(&self, shrink_metric_name: &'static str) {
datapoint_info!(
shrink_metric_name,
("target_size", self.target_size as i64, i64),
("resulting_size", self.resulting_size as i64, i64),
("ideal_num_to_remove", self.ideal_num_to_remove as i64, i64),
("recycler_shrink_elapsed", self.shrink_elapsed as i64, i64),
("drop_elapsed", self.drop_elapsed as i64, i64)
);
}
}
#[derive(Clone)]
pub struct Recycler<T: Reset> {
recycler: Arc<RecyclerX<T>>, recycler: Arc<RecyclerX<T>>,
shrink_metric_name: &'static str,
}
impl<T: Default + Reset> Recycler<T> {
pub fn new_without_limit(shrink_metric_name: &'static str) -> Self {
Self {
recycler: Arc::new(RecyclerX::default()),
shrink_metric_name,
}
}
pub fn new_with_limit(shrink_metric_name: &'static str, limit: u32) -> Self {
Self {
recycler: Arc::new(RecyclerX::new(Some(limit))),
shrink_metric_name,
}
}
} }
#[derive(Debug)] #[derive(Debug)]
pub struct RecyclerX<T> { pub struct ObjectPool<T: Reset> {
gc: Mutex<Vec<T>>, objects: Vec<T>,
shrink_pct: u32,
minimum_object_count: u32,
above_shrink_pct_count: u32,
max_above_shrink_pct_count: u32,
check_shrink_interval_ms: u32,
last_shrink_check_time: Instant,
pub total_allocated_count: u32,
limit: Option<u32>,
}
/// Default `ObjectPool`: empty, unlimited, with the shrink policy driven by
/// the `DEFAULT_*` constants and the shrink timer starting now.
impl<T: Default + Reset> Default for ObjectPool<T> {
    fn default() -> Self {
        Self {
            objects: Vec::new(),
            limit: None,
            total_allocated_count: 0,
            above_shrink_pct_count: 0,
            shrink_pct: DEFAULT_SHRINK_PCT,
            minimum_object_count: DEFAULT_MINIMUM_OBJECT_COUNT,
            max_above_shrink_pct_count: DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT,
            check_shrink_interval_ms: DEFAULT_CHECK_SHRINK_INTERVAL_MS,
            last_shrink_check_time: Instant::now(),
        }
    }
}
impl<T: Default + Reset> ObjectPool<T> {
// Creates an empty pool; `limit`, when `Some`, caps `total_allocated_count`
// (see `make_allocation_decision`). All other policy fields take defaults.
fn new(limit: Option<u32>) -> Self {
Self {
limit,
..Self::default()
}
}
// Number of idle objects currently sitting in the pool awaiting reuse.
fn len(&self) -> usize {
self.objects.len()
}
// Ceiling of `shrink_pct`% of `current_size` (the `+ 99` rounds up).
fn get_shrink_target(shrink_pct: u32, current_size: u32) -> u32 {
((shrink_pct * current_size) + 99) / 100
}
// Checks (at most once per `check_shrink_interval_ms`) whether the pool has
// held more than `shrink_pct`% of all allocated objects idle for
// `max_above_shrink_pct_count` consecutive checks; if so, detaches the excess
// objects and returns them with stats so the caller can drop them outside the
// pool lock. Returns `None` when no shrink happens this call.
fn shrink_if_necessary(
&mut self,
recycler_name: &'static str,
) -> Option<(RecyclerShrinkStats, Vec<T>)> {
// Invariant: the pool can never hold more idle objects than were ever
// allocated through it.
let is_consistent = self.total_allocated_count as usize >= self.len();
assert!(
is_consistent,
"Object pool inconsistent: {} {} {}",
self.total_allocated_count,
self.len(),
recycler_name
);
if self.last_shrink_check_time.elapsed().as_millis() > self.check_shrink_interval_ms as u128
{
self.last_shrink_check_time = Instant::now();
let shrink_threshold_count =
Self::get_shrink_target(self.shrink_pct, self.total_allocated_count);
// If more than the shrink threshold of all allocated objects are sitting doing nothing,
// increment the `above_shrink_pct_count`.
if self.len() > self.minimum_object_count as usize
&& self.len() > shrink_threshold_count as usize
{
self.above_shrink_pct_count += 1;
} else {
// Any check below threshold resets the streak.
self.above_shrink_pct_count = 0;
}
if self.above_shrink_pct_count as usize >= self.max_above_shrink_pct_count as usize {
let mut recycler_shrink_elapsed = Measure::start("recycler_shrink");
// Do the shrink
let target_size = std::cmp::max(self.minimum_object_count, shrink_threshold_count);
let ideal_num_to_remove = self.total_allocated_count - target_size;
let mut shrink_removed_objects = Vec::with_capacity(ideal_num_to_remove as usize);
for _ in 0..ideal_num_to_remove {
if let Some(mut expired_object) = self.objects.pop() {
// Detach from the recycler so the object is truly freed on drop
// instead of being returned to this pool.
expired_object.unset_recycler();
// Drop these outside of the lock because the Drop() implementation for
// certain objects like PinnedVec's can be expensive
shrink_removed_objects.push(expired_object);
// May not be able to shrink exactly `ideal_num_to_remove` objects since
// in the case of new allocations, `total_allocated_count` is incremented
// before the object is allocated (see `should_allocate_new` logic below).
// This race allows a difference of up to the number of threads allocating
// with this recycler.
self.total_allocated_count -= 1;
} else {
break;
}
}
recycler_shrink_elapsed.stop();
self.above_shrink_pct_count = 0;
Some((
RecyclerShrinkStats {
resulting_size: self.total_allocated_count,
target_size,
ideal_num_to_remove,
shrink_elapsed: recycler_shrink_elapsed.as_us(),
// Filled in later
drop_elapsed: 0,
},
shrink_removed_objects,
))
} else {
None
}
} else {
None
}
}
// Decides how to satisfy an allocation request: reuse an idle object if one
// exists; otherwise allocate a new one unless `limit` has been reached.
// On `Allocate`, returns the (already incremented) total count and current
// idle-pool length for stats/tracing.
fn make_allocation_decision(&mut self) -> AllocationDecision<T> {
if let Some(reused_object) = self.objects.pop() {
AllocationDecision::Reuse(reused_object)
} else if let Some(limit) = self.limit {
if self.total_allocated_count < limit {
self.total_allocated_count += 1;
AllocationDecision::Allocate(self.total_allocated_count, self.len())
} else {
AllocationDecision::AllocationLimitReached
}
} else {
// No limit configured: always allow a fresh allocation.
self.total_allocated_count += 1;
AllocationDecision::Allocate(self.total_allocated_count, self.len())
}
}
}
#[derive(Debug)]
pub struct RecyclerX<T: Reset> {
gc: Mutex<ObjectPool<T>>,
stats: RecyclerStats, stats: RecyclerStats,
id: usize, id: usize,
} }
impl<T: Default> Default for RecyclerX<T> { impl<T: Default + Reset> Default for RecyclerX<T> {
fn default() -> RecyclerX<T> { fn default() -> RecyclerX<T> {
let id = thread_rng().gen_range(0, 1000); let id = thread_rng().gen_range(0, 1000);
trace!("new recycler..{}", id); trace!("new recycler..{}", id);
RecyclerX { RecyclerX {
gc: Mutex::new(vec![]), gc: Mutex::new(ObjectPool::default()),
stats: RecyclerStats::default(), stats: RecyclerStats::default(),
id, id,
} }
} }
} }
impl<T: Default + Reset> RecyclerX<T> {
// Builds a recycler core whose object pool enforces `limit` (when `Some`)
// on the total number of live allocations; the stats and random id come
// from `Self::default()`.
fn new(limit: Option<u32>) -> Self {
RecyclerX {
gc: Mutex::new(ObjectPool::new(limit)),
..Self::default()
}
}
}
pub trait Reset { pub trait Reset {
fn reset(&mut self); fn reset(&mut self);
fn warm(&mut self, size_hint: usize); fn warm(&mut self, size_hint: usize);
fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
where where
Self: std::marker::Sized; Self: std::marker::Sized;
fn unset_recycler(&mut self)
where
Self: std::marker::Sized;
} }
lazy_static! { lazy_static! {
@ -56,12 +255,21 @@ fn warm_recyclers() -> bool {
} }
impl<T: Default + Reset + Sized> Recycler<T> { impl<T: Default + Reset + Sized> Recycler<T> {
pub fn warmed(num: usize, size_hint: usize) -> Self { pub fn warmed(
let new = Self::default(); num: u32,
size_hint: usize,
limit: Option<u32>,
shrink_metric_name: &'static str,
) -> Self {
assert!(num <= limit.unwrap_or(std::u32::MAX));
let new = Self {
recycler: Arc::new(RecyclerX::new(limit)),
shrink_metric_name,
};
if warm_recyclers() { if warm_recyclers() {
let warmed_items: Vec<_> = (0..num) let warmed_items: Vec<_> = (0..num)
.map(|_| { .map(|_| {
let mut item = new.allocate("warming"); let mut item = new.allocate().unwrap();
item.warm(size_hint); item.warm(size_hint);
item item
}) })
@ -73,33 +281,55 @@ impl<T: Default + Reset + Sized> Recycler<T> {
new new
} }
pub fn allocate(&self, name: &'static str) -> T { pub fn allocate(&self) -> Option<T> {
let new = self let (allocation_decision, shrink_output) = {
let mut object_pool = self
.recycler .recycler
.gc .gc
.lock() .lock()
.expect("recycler lock in pb fn allocate") .expect("recycler lock in pb fn allocate");
.pop();
if let Some(mut x) = new { let shrink_output = object_pool.shrink_if_necessary(self.shrink_metric_name);
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset(); // Grab the allocation decision and shrinking stats, do the expensive
return x; // allocations/deallocations outside of the lock.
(object_pool.make_allocation_decision(), shrink_output)
};
if let Some((mut shrink_stats, shrink_removed_objects)) = shrink_output {
let mut shrink_removed_object_elapsed = Measure::start("shrink_removed_object_elapsed");
drop(shrink_removed_objects);
shrink_removed_object_elapsed.stop();
shrink_stats.drop_elapsed = shrink_removed_object_elapsed.as_us();
shrink_stats.report(self.shrink_metric_name);
} }
let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed); match allocation_decision {
trace!( AllocationDecision::Reuse(mut reused_object) => {
"allocating new: total {} {:?} id: {} reuse: {} max_gc: {}", self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
total, reused_object.reset();
name, Some(reused_object)
self.recycler.id, }
self.recycler.stats.reuse.load(Ordering::Relaxed), AllocationDecision::Allocate(total_allocated_count, recycled_len) => {
self.recycler.stats.max_gc.load(Ordering::Relaxed),
);
let mut t = T::default(); let mut t = T::default();
t.set_recycler(Arc::downgrade(&self.recycler)); t.set_recycler(Arc::downgrade(&self.recycler));
t if total_allocated_count % 1000 == 0 {
datapoint_info!(
"recycler_total_allocated_count",
("name", self.shrink_metric_name, String),
("count", total_allocated_count as i64, i64),
("recycled_len", recycled_len as i64, i64),
)
}
Some(t)
}
AllocationDecision::AllocationLimitReached => None,
}
}
pub fn recycle_for_test(&self, x: T) {
self.recycler.recycle(x);
} }
} }
@ -107,7 +337,7 @@ impl<T: Default + Reset> RecyclerX<T> {
pub fn recycle(&self, x: T) { pub fn recycle(&self, x: T) {
let len = { let len = {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
gc.push(x); gc.objects.push(x);
gc.len() gc.len()
}; };
@ -137,6 +367,8 @@ impl<T: Default + Reset> RecyclerX<T> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::packet::PacketsRecycler;
use std::{thread::sleep, time::Duration};
impl Reset for u64 { impl Reset for u64 {
fn reset(&mut self) { fn reset(&mut self) {
@ -144,19 +376,115 @@ mod tests {
} }
fn warm(&mut self, _size_hint: usize) {} fn warm(&mut self, _size_hint: usize) {}
fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {} fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
fn unset_recycler(&mut self) {}
} }
#[test] #[test]
fn test_recycler() { fn test_recycler() {
let recycler = Recycler::default(); let recycler = Recycler::new_without_limit("");
let mut y: u64 = recycler.allocate("test_recycler1"); let mut y: u64 = recycler.allocate().unwrap();
assert_eq!(y, 0); assert_eq!(y, 0);
y = 20; y = 20;
let recycler2 = recycler.clone(); let recycler2 = recycler.clone();
recycler2.recycler.recycle(y); recycler2.recycler.recycle(y);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
let z = recycler.allocate("test_recycler2"); let z = recycler.allocate().unwrap();
assert_eq!(z, 10); assert_eq!(z, 10);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
} }
#[test]
fn test_recycler_limit() {
let limit = 10;
assert!(limit <= DEFAULT_MINIMUM_OBJECT_COUNT);
// Use PacketRecycler so that dropping the allocated object
// actually recycles
let recycler = PacketsRecycler::new_with_limit("", limit);
let mut allocated_items = vec![];
for i in 0..limit * 2 {
let x = recycler.allocate();
if i < limit {
allocated_items.push(x.unwrap());
} else {
assert!(x.is_none());
}
}
assert_eq!(
recycler.recycler.gc.lock().unwrap().total_allocated_count,
limit
);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0_usize);
drop(allocated_items);
assert_eq!(
recycler.recycler.gc.lock().unwrap().total_allocated_count,
limit
);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize);
}
#[test]
fn test_recycler_shrink() {
let limit = DEFAULT_MINIMUM_OBJECT_COUNT * 2;
let max_above_shrink_pct_count = 2;
let shrink_pct = 80;
let recycler = PacketsRecycler::new_with_limit("", limit);
{
let mut locked_recycler = recycler.recycler.gc.lock().unwrap();
// Make the shrink interval a long time so shrinking doesn't happen yet
locked_recycler.check_shrink_interval_ms = std::u32::MAX;
// Set the count to one so that we shrink on every other allocation later.
locked_recycler.max_above_shrink_pct_count = max_above_shrink_pct_count;
locked_recycler.shrink_pct = shrink_pct;
}
let mut allocated_items = vec![];
for _ in 0..limit {
allocated_items.push(recycler.allocate().unwrap());
}
assert_eq!(
recycler.recycler.gc.lock().unwrap().total_allocated_count,
limit
);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
drop(allocated_items);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize);
let shrink_interval = 10;
{
let mut locked_recycler = recycler.recycler.gc.lock().unwrap();
locked_recycler.check_shrink_interval_ms = shrink_interval;
}
let mut current_total_allocated_count =
recycler.recycler.gc.lock().unwrap().total_allocated_count;
// Shrink the recycler until it hits the minimum
let mut i = 0;
while current_total_allocated_count != DEFAULT_MINIMUM_OBJECT_COUNT {
sleep(Duration::from_millis(shrink_interval as u64 * 2));
recycler.allocate().unwrap();
let expected_above_shrink_pct_count = (i + 1) % max_above_shrink_pct_count;
assert_eq!(
recycler.recycler.gc.lock().unwrap().above_shrink_pct_count,
(i + 1) % max_above_shrink_pct_count
);
if expected_above_shrink_pct_count == 0 {
// Shrink happened, update the expected `current_total_allocated_count`;
current_total_allocated_count = std::cmp::max(
ObjectPool::<u64>::get_shrink_target(shrink_pct, current_total_allocated_count),
DEFAULT_MINIMUM_OBJECT_COUNT,
);
assert_eq!(
recycler.recycler.gc.lock().unwrap().total_allocated_count,
current_total_allocated_count
);
assert_eq!(
recycler.recycler.gc.lock().unwrap().len(),
current_total_allocated_count as usize
);
}
i += 1;
}
}
} }

View File

@ -2,17 +2,24 @@ use crate::cuda_runtime::PinnedVec;
use crate::recycler::Recycler; use crate::recycler::Recycler;
use crate::sigverify::TxOffset; use crate::sigverify::TxOffset;
#[derive(Default, Clone)] #[derive(Clone)]
pub struct RecyclerCache { pub struct RecyclerCache {
recycler_offsets: Recycler<TxOffset>, recycler_offsets: Recycler<TxOffset>,
recycler_buffer: Recycler<PinnedVec<u8>>, recycler_buffer: Recycler<PinnedVec<u8>>,
} }
impl RecyclerCache { impl RecyclerCache {
pub fn warmed() -> Self { pub fn new(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
Self { Self {
recycler_offsets: Recycler::warmed(50, 4096), recycler_offsets: Recycler::new_without_limit(offsets_shrink_name),
recycler_buffer: Recycler::warmed(50, 4096), recycler_buffer: Recycler::new_without_limit(buffer_shrink_name),
}
}
pub fn warmed(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
Self {
recycler_offsets: Recycler::warmed(50, 4096, None, offsets_shrink_name),
recycler_buffer: Recycler::warmed(50, 4096, None, buffer_shrink_name),
} }
} }
pub fn offsets(&self) -> &Recycler<TxOffset> { pub fn offsets(&self) -> &Recycler<TxOffset> {

View File

@ -194,13 +194,13 @@ fn get_packet_offsets(packet: &Packet, current_offset: u32) -> PacketOffsets {
pub fn generate_offsets(batches: &[Packets], recycler: &Recycler<TxOffset>) -> TxOffsets { pub fn generate_offsets(batches: &[Packets], recycler: &Recycler<TxOffset>) -> TxOffsets {
debug!("allocating.."); debug!("allocating..");
let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets"); let mut signature_offsets: PinnedVec<_> = recycler.allocate().unwrap();
signature_offsets.set_pinnable(); signature_offsets.set_pinnable();
let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets"); let mut pubkey_offsets: PinnedVec<_> = recycler.allocate().unwrap();
pubkey_offsets.set_pinnable(); pubkey_offsets.set_pinnable();
let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets"); let mut msg_start_offsets: PinnedVec<_> = recycler.allocate().unwrap();
msg_start_offsets.set_pinnable(); msg_start_offsets.set_pinnable();
let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets"); let mut msg_sizes: PinnedVec<_> = recycler.allocate().unwrap();
msg_sizes.set_pinnable(); msg_sizes.set_pinnable();
let mut current_packet = 0; let mut current_packet = 0;
let mut v_sig_lens = Vec::new(); let mut v_sig_lens = Vec::new();
@ -347,7 +347,7 @@ pub fn ed25519_verify(
debug!("CUDA ECDSA for {}", batch_size(batches)); debug!("CUDA ECDSA for {}", batch_size(batches));
debug!("allocating out.."); debug!("allocating out..");
let mut out = recycler_out.allocate("out_buffer"); let mut out = recycler_out.allocate().unwrap();
out.set_pinnable(); out.set_pinnable();
let mut elems = Vec::new(); let mut elems = Vec::new();
let mut rvs = Vec::new(); let mut rvs = Vec::new();
@ -678,8 +678,8 @@ mod tests {
let batches = generate_packet_vec(&packet, n, 2); let batches = generate_packet_vec(&packet, n, 2);
let recycler = Recycler::default(); let recycler = Recycler::new_without_limit("");
let recycler_out = Recycler::default(); let recycler_out = Recycler::new_without_limit("");
// verify packets // verify packets
let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@ -697,8 +697,8 @@ mod tests {
let batches = generate_packet_vec(&packet, 1, 1); let batches = generate_packet_vec(&packet, 1, 1);
let recycler = Recycler::default(); let recycler = Recycler::new_without_limit("");
let recycler_out = Recycler::default(); let recycler_out = Recycler::new_without_limit("");
// verify packets // verify packets
let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@ -735,8 +735,8 @@ mod tests {
batches[0].packets.push(packet); batches[0].packets.push(packet);
let recycler = Recycler::default(); let recycler = Recycler::new_without_limit("");
let recycler_out = Recycler::default(); let recycler_out = Recycler::new_without_limit("");
// verify packets // verify packets
let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@ -755,8 +755,8 @@ mod tests {
let tx = test_multisig_tx(); let tx = test_multisig_tx();
let packet = sigverify::make_packet_from_transaction(tx); let packet = sigverify::make_packet_from_transaction(tx);
let recycler = Recycler::default(); let recycler = Recycler::new_without_limit("");
let recycler_out = Recycler::default(); let recycler_out = Recycler::new_without_limit("");
for _ in 0..50 { for _ in 0..50 {
let n = thread_rng().gen_range(1, 30); let n = thread_rng().gen_range(1, 30);
let num_batches = thread_rng().gen_range(2, 30); let num_batches = thread_rng().gen_range(2, 30);

View File

@ -42,7 +42,10 @@ fn recv_loop(
let mut now = Instant::now(); let mut now = Instant::now();
let mut num_max_received = 0; // Number of times maximum packets were received let mut num_max_received = 0; // Number of times maximum packets were received
loop { loop {
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name); let (mut msgs, should_send) =
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH)
.map(|allocated| (allocated, true))
.unwrap_or((Packets::with_capacity(PACKETS_PER_BATCH), false));
loop { loop {
// Check for exit signal, even if socket is busy // Check for exit signal, even if socket is busy
// (for instance the leader transaction socket) // (for instance the leader transaction socket)
@ -55,7 +58,7 @@ fn recv_loop(
} }
recv_count += len; recv_count += len;
call_count += 1; call_count += 1;
if len > 0 { if len > 0 && should_send {
channel.send(msgs)?; channel.send(msgs)?;
} }
break; break;
@ -198,7 +201,13 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), &exit, s_reader, Recycler::default(), "test"); let t_receiver = receiver(
Arc::new(read),
&exit,
s_reader,
Recycler::new_without_limit(""),
"test",
);
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);