Measure heap usage while processing the ledger

This commit is contained in:
Michael Vines
2020-01-01 12:19:20 -07:00
parent a0fb9de515
commit 4fe0b116ae
5 changed files with 31 additions and 6 deletions

View File

@@ -6,7 +6,6 @@ use crate::{
packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH},
poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
poh_service::PohService, poh_service::PohService,
result::{Error, Result},
}; };
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools; use itertools::Itertools;

View File

@@ -18,8 +18,7 @@ use solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
snapshot_package::SnapshotPackageSender, snapshot_package::SnapshotPackageSender,
}; };
use solana_measure::measure::Measure; use solana_measure::{measure::Measure, thread_mem_usage};
use solana_measure::thread_mem_usage;
use solana_metrics::inc_new_counter_info; use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::{ use solana_sdk::{

View File

@@ -3,7 +3,6 @@
use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH}; use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH};
use crate::recvmmsg::NUM_RCVMMSGS; use crate::recvmmsg::NUM_RCVMMSGS;
use crate::result::{Error, Result};
use solana_measure::thread_mem_usage; use solana_measure::thread_mem_usage;
use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket; use std::net::UdpSocket;

View File

@@ -11,6 +11,7 @@ use itertools::Itertools;
use log::*; use log::*;
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
use rayon::{prelude::*, ThreadPool}; use rayon::{prelude::*, ThreadPool};
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug}; use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug};
use solana_rayon_threadlimit::get_thread_count; use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::{ use solana_runtime::{
@@ -292,6 +293,9 @@ pub fn process_blocktree_from_root(
opts: &ProcessOptions, opts: &ProcessOptions,
) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError> { ) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError> {
info!("processing ledger from root slot {}...", bank.slot()); info!("processing ledger from root slot {}...", bank.slot());
let allocated = thread_mem_usage::Allocatedp::default();
let initial_allocation = allocated.get();
// Starting slot must be a root, and thus has no parents // Starting slot must be a root, and thus has no parents
assert!(bank.parent().is_none()); assert!(bank.parent().is_none());
let start_slot = bank.slot(); let start_slot = bank.slot();
@@ -344,8 +348,9 @@ pub fn process_blocktree_from_root(
}; };
info!( info!(
"ledger processed in {}ms. {} fork{} at {}", "ledger processed in {}ms. {} MB allocated. {} fork{} at {}",
duration_as_ms(&now.elapsed()), duration_as_ms(&now.elapsed()),
allocated.since(initial_allocation) / 1_000_000,
bank_forks_info.len(), bank_forks_info.len(),
if bank_forks_info.len() > 1 { "s" } else { "" }, if bank_forks_info.len() > 1 { "s" } else { "" },
bank_forks_info bank_forks_info
@@ -461,6 +466,9 @@ fn process_next_slots(
// Only process full slots in blocktree_processor, replay_stage // Only process full slots in blocktree_processor, replay_stage
// handles any partials // handles any partials
if next_meta.is_full() { if next_meta.is_full() {
let allocated = thread_mem_usage::Allocatedp::default();
let initial_allocation = allocated.get();
let next_bank = Arc::new(Bank::new_from_parent( let next_bank = Arc::new(Bank::new_from_parent(
&bank, &bank,
&leader_schedule_cache &leader_schedule_cache
@@ -468,7 +476,12 @@ fn process_next_slots(
.unwrap(), .unwrap(),
*next_slot, *next_slot,
)); ));
trace!("Add child bank {} of slot={}", next_slot, bank.slot()); trace!(
"New bank for slot {}, parent slot is {}. {} bytes allocated",
next_slot,
bank.slot(),
allocated.since(initial_allocation)
);
pending_slots.push((*next_slot, next_meta, next_bank, bank.last_blockhash())); pending_slots.push((*next_slot, next_meta, next_bank, bank.last_blockhash()));
} else { } else {
let bfi = BankForksInfo { let bfi = BankForksInfo {
@@ -519,6 +532,9 @@ fn process_pending_slots(
continue; continue;
} }
let allocated = thread_mem_usage::Allocatedp::default();
let initial_allocation = allocated.get();
// Fetch all entries for this slot // Fetch all entries for this slot
let entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| { let entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| {
warn!("Failed to load entries for slot {}: {:?}", slot, err); warn!("Failed to load entries for slot {}: {:?}", slot, err);
@@ -543,6 +559,12 @@ fn process_pending_slots(
fork_info.clear(); fork_info.clear();
} }
trace!(
"Bank for slot {} is complete. {} bytes allocated",
slot,
allocated.since(initial_allocation)
);
if slot >= dev_halt_at_slot { if slot >= dev_halt_at_slot {
let bfi = BankForksInfo { bank_slot: slot }; let bfi = BankForksInfo { bank_slot: slot };
fork_info.push((bank, bfi)); fork_info.push((bank, bfi));

View File

@@ -28,6 +28,7 @@ impl Allocatedp {
Self {} Self {}
} }
/// Return current thread heap usage
pub fn get(&self) -> u64 { pub fn get(&self) -> u64 {
#[cfg(unix)] #[cfg(unix)]
{ {
@@ -36,4 +37,9 @@ impl Allocatedp {
#[cfg(not(unix))] #[cfg(not(unix))]
0 0
} }
/// Return the difference in thread heap usage since a previous `get()`
pub fn since(&self, previous: u64) -> i64 {
self.get() as i64 - previous as i64
}
} }