The number of shreds that result from a given number of entries is variable and in our test case, somewhat unintuitive to think about when trying to determine how much data we're pushing into the blockstore. So, this change converts the unit of test parameters from entries to shreds. This change also cleans up some variable naming for clarity and prints.
584 lines
19 KiB
Rust
584 lines
19 KiB
Rust
#![allow(clippy::integer_arithmetic)]
|
|
// Long-running ledger_cleanup tests
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use {
|
|
log::*,
|
|
solana_core::ledger_cleanup_service::LedgerCleanupService,
|
|
solana_ledger::{
|
|
blockstore::{make_many_slot_shreds, Blockstore},
|
|
get_tmp_ledger_path,
|
|
},
|
|
solana_measure::measure::Measure,
|
|
std::{
|
|
collections::VecDeque,
|
|
str::FromStr,
|
|
sync::{
|
|
atomic::{AtomicBool, AtomicU64, Ordering},
|
|
mpsc::channel,
|
|
Arc, Mutex, RwLock,
|
|
},
|
|
thread::{self, Builder, JoinHandle},
|
|
time::{Duration, Instant},
|
|
},
|
|
systemstat::{CPULoad, Platform, System},
|
|
};
|
|
|
|
const DEFAULT_BENCHMARK_SLOTS: u64 = 50;
|
|
const DEFAULT_BATCH_SIZE_SLOTS: u64 = 1;
|
|
const DEFAULT_MAX_LEDGER_SHREDS: u64 = 50;
|
|
const DEFAULT_SHREDS_PER_SLOT: u64 = 25;
|
|
const DEFAULT_STOP_SIZE_BYTES: u64 = 0;
|
|
const DEFAULT_STOP_SIZE_ITERATIONS: u64 = 0;
|
|
|
|
const ROCKSDB_FLUSH_GRACE_PERIOD_SECS: u64 = 20;
|
|
|
|
#[derive(Debug)]
|
|
struct BenchmarkConfig {
|
|
benchmark_slots: u64,
|
|
batch_size_slots: u64,
|
|
max_ledger_shreds: u64,
|
|
shreds_per_slot: u64,
|
|
stop_size_bytes: u64,
|
|
stop_size_iterations: u64,
|
|
pre_generate_data: bool,
|
|
cleanup_blockstore: bool,
|
|
assert_compaction: bool,
|
|
compaction_interval: Option<u64>,
|
|
no_compaction: bool,
|
|
num_writers: u64,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
struct CpuStatsInner {
|
|
cpu_user: f32,
|
|
cpu_system: f32,
|
|
cpu_idle: f32,
|
|
}
|
|
|
|
impl From<CPULoad> for CpuStatsInner {
|
|
fn from(cpu: CPULoad) -> Self {
|
|
Self {
|
|
cpu_user: cpu.user * 100.0,
|
|
cpu_system: cpu.system * 100.0,
|
|
cpu_idle: cpu.idle * 100.0,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Default for CpuStatsInner {
|
|
fn default() -> Self {
|
|
Self {
|
|
cpu_user: 0.0,
|
|
cpu_system: 0.0,
|
|
cpu_idle: 0.0,
|
|
}
|
|
}
|
|
}
|
|
|
|
struct CpuStats {
|
|
stats: RwLock<CpuStatsInner>,
|
|
sys: System,
|
|
}
|
|
|
|
impl Default for CpuStats {
|
|
fn default() -> Self {
|
|
Self {
|
|
stats: RwLock::new(CpuStatsInner::default()),
|
|
sys: System::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl CpuStats {
|
|
fn update(&self) {
|
|
if let Ok(cpu) = self.sys.cpu_load_aggregate() {
|
|
std::thread::sleep(Duration::from_millis(400));
|
|
let cpu_new = CpuStatsInner::from(cpu.done().unwrap());
|
|
*self.stats.write().unwrap() = cpu_new;
|
|
}
|
|
}
|
|
|
|
fn get_stats(&self) -> CpuStatsInner {
|
|
*self.stats.read().unwrap()
|
|
}
|
|
}
|
|
|
|
struct CpuStatsUpdater {
|
|
cpu_stats: Arc<CpuStats>,
|
|
t_cleanup: JoinHandle<()>,
|
|
}
|
|
|
|
impl CpuStatsUpdater {
|
|
pub fn new(exit: &Arc<AtomicBool>) -> Self {
|
|
let exit = exit.clone();
|
|
let cpu_stats = Arc::new(CpuStats::default());
|
|
let cpu_stats_clone = cpu_stats.clone();
|
|
|
|
let t_cleanup = Builder::new()
|
|
.name("cpu_info".to_string())
|
|
.spawn(move || loop {
|
|
if exit.load(Ordering::Relaxed) {
|
|
break;
|
|
}
|
|
cpu_stats_clone.update();
|
|
})
|
|
.unwrap();
|
|
|
|
Self {
|
|
cpu_stats,
|
|
t_cleanup,
|
|
}
|
|
}
|
|
|
|
pub fn get_stats(&self) -> CpuStatsInner {
|
|
self.cpu_stats.get_stats()
|
|
}
|
|
|
|
pub fn join(self) -> std::thread::Result<()> {
|
|
self.t_cleanup.join()
|
|
}
|
|
}
|
|
|
|
fn read_env<T>(key: &str, default: T) -> T
|
|
where
|
|
T: FromStr,
|
|
{
|
|
match std::env::var(key) {
|
|
Ok(val) => val.parse().unwrap_or(default),
|
|
Err(_e) => default,
|
|
}
|
|
}
|
|
|
|
fn get_benchmark_config() -> BenchmarkConfig {
|
|
let benchmark_slots = read_env("BENCHMARK_SLOTS", DEFAULT_BENCHMARK_SLOTS);
|
|
let batch_size_slots = read_env("BATCH_SIZE", DEFAULT_BATCH_SIZE_SLOTS);
|
|
let max_ledger_shreds = read_env("MAX_LEDGER_SHREDS", DEFAULT_MAX_LEDGER_SHREDS);
|
|
let shreds_per_slot = read_env("SHREDS_PER_SLOT", DEFAULT_SHREDS_PER_SLOT);
|
|
let stop_size_bytes = read_env("STOP_SIZE_BYTES", DEFAULT_STOP_SIZE_BYTES);
|
|
let stop_size_iterations = read_env("STOP_SIZE_ITERATIONS", DEFAULT_STOP_SIZE_ITERATIONS);
|
|
let pre_generate_data = read_env("PRE_GENERATE_DATA", false);
|
|
let cleanup_blockstore = read_env("CLEANUP_BLOCKSTORE", true);
|
|
// set default to `true` once compaction is merged
|
|
let assert_compaction = read_env("ASSERT_COMPACTION", false);
|
|
let compaction_interval = match read_env("COMPACTION_INTERVAL", 0) {
|
|
maybe_zero if maybe_zero == 0 => None,
|
|
non_zero => Some(non_zero),
|
|
};
|
|
let no_compaction = read_env("NO_COMPACTION", false);
|
|
let num_writers = read_env("NUM_WRITERS", 1);
|
|
|
|
BenchmarkConfig {
|
|
benchmark_slots,
|
|
batch_size_slots,
|
|
max_ledger_shreds,
|
|
shreds_per_slot,
|
|
stop_size_bytes,
|
|
stop_size_iterations,
|
|
pre_generate_data,
|
|
cleanup_blockstore,
|
|
assert_compaction,
|
|
compaction_interval,
|
|
no_compaction,
|
|
num_writers,
|
|
}
|
|
}
|
|
|
|
fn emit_header() {
|
|
println!("TIME_MS,DELTA_MS,START_SLOT,BATCH_SIZE,SHREDS,MAX,SIZE,DELTA_SIZE,CPU_USER,CPU_SYSTEM,CPU_IDLE");
|
|
}
|
|
|
|
fn emit_stats(
|
|
time_initial: Instant,
|
|
time_previous: &mut Instant,
|
|
storage_previous: &mut u64,
|
|
start_slot: u64,
|
|
batch_size: u64,
|
|
num_shreds: u64,
|
|
max_shreds: i64,
|
|
blockstore: &Blockstore,
|
|
cpu: &CpuStatsInner,
|
|
) {
|
|
let time_now = Instant::now();
|
|
let storage_now = blockstore.storage_size().unwrap_or(0);
|
|
let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle);
|
|
|
|
println!(
|
|
"{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}",
|
|
time_now.duration_since(time_initial).as_millis(),
|
|
time_now.duration_since(*time_previous).as_millis(),
|
|
start_slot,
|
|
batch_size,
|
|
num_shreds,
|
|
max_shreds,
|
|
storage_now,
|
|
storage_now as i64 - *storage_previous as i64,
|
|
cpu_user,
|
|
cpu_system,
|
|
cpu_idle,
|
|
);
|
|
|
|
*time_previous = time_now;
|
|
*storage_previous = storage_now;
|
|
}
|
|
|
|
/**
|
|
* Example run command:
|
|
* BENCHMARK_SLOTS=10000 BATCH_SIZE=10 SHREDS_PER_SLOT=1000 NUM_WRITERS=1 \
|
|
* PRE_GENERATE_DATA=true cargo test --release tests::test_ledger_cleanup_compaction \
|
|
* -- --exact --nocapture
|
|
*/
|
|
#[test]
|
|
fn test_ledger_cleanup_compaction() {
|
|
solana_logger::setup();
|
|
let blockstore_path = get_tmp_ledger_path!();
|
|
let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
|
|
let config = get_benchmark_config();
|
|
if config.no_compaction {
|
|
blockstore.set_no_compaction(true);
|
|
}
|
|
let blockstore = Arc::new(blockstore);
|
|
|
|
eprintln!("BENCHMARK CONFIG: {:?}", config);
|
|
eprintln!("LEDGER_PATH: {:?}", &blockstore_path);
|
|
|
|
let benchmark_slots = config.benchmark_slots;
|
|
let batch_size_slots = config.batch_size_slots;
|
|
let max_ledger_shreds = config.max_ledger_shreds;
|
|
let shreds_per_slot = config.shreds_per_slot;
|
|
let stop_size_bytes = config.stop_size_bytes;
|
|
let stop_size_iterations = config.stop_size_iterations;
|
|
let pre_generate_data = config.pre_generate_data;
|
|
let compaction_interval = config.compaction_interval;
|
|
let num_writers = config.num_writers;
|
|
|
|
let num_batches = benchmark_slots / batch_size_slots;
|
|
|
|
let (sender, receiver) = channel();
|
|
let exit = Arc::new(AtomicBool::new(false));
|
|
let cleaner = LedgerCleanupService::new(
|
|
receiver,
|
|
blockstore.clone(),
|
|
max_ledger_shreds,
|
|
&exit,
|
|
compaction_interval,
|
|
None,
|
|
);
|
|
|
|
let exit_cpu = Arc::new(AtomicBool::new(false));
|
|
let sys = CpuStatsUpdater::new(&exit_cpu);
|
|
|
|
let mut shreds = VecDeque::new();
|
|
|
|
if pre_generate_data {
|
|
let t0 = Instant::now();
|
|
eprintln!("PRE_GENERATE_DATA: (this may take a while)");
|
|
for i in 0..num_batches {
|
|
let start_slot = i * batch_size_slots;
|
|
let (new_shreds, _) =
|
|
make_many_slot_shreds(start_slot, batch_size_slots, shreds_per_slot);
|
|
shreds.push_back(new_shreds);
|
|
}
|
|
eprintln!("PRE_GENERATE_DATA: took {} ms", t0.elapsed().as_millis());
|
|
}
|
|
let shreds = Arc::new(Mutex::new(shreds));
|
|
|
|
let (mut shreds_batch, entries) =
|
|
make_many_slot_shreds(0, batch_size_slots, shreds_per_slot);
|
|
info!(
|
|
"batch size (slots): {}, entries_per_slot: {}, shreds_per_slot: {}",
|
|
batch_size_slots,
|
|
entries.len() as u64 / batch_size_slots,
|
|
shreds_per_slot
|
|
);
|
|
|
|
let time_initial = Instant::now();
|
|
let mut time_previous = time_initial;
|
|
let mut storage_previous = 0;
|
|
let mut stop_size_bytes_exceeded_iterations = 0;
|
|
|
|
emit_header();
|
|
emit_stats(
|
|
time_initial,
|
|
&mut time_previous,
|
|
&mut storage_previous,
|
|
0,
|
|
0,
|
|
0,
|
|
0,
|
|
&blockstore,
|
|
&sys.get_stats(),
|
|
);
|
|
|
|
let mut total_make = 0;
|
|
let mut num_slots = 0;
|
|
let mut total_slots = 0;
|
|
let mut time = Instant::now();
|
|
let mut start = Measure::start("start");
|
|
let insert_exit = Arc::new(AtomicBool::new(false));
|
|
let mut insert_threads = vec![];
|
|
|
|
for i in 0..num_writers {
|
|
let cloned_insert_exit = insert_exit.clone();
|
|
let cloned_blockstore = blockstore.clone();
|
|
let cloned_shreds = shreds.clone();
|
|
let insert_thread = Builder::new()
|
|
.name(format!("insert_shreds-{}", i))
|
|
.spawn(move || {
|
|
let start = Instant::now();
|
|
let mut now = Instant::now();
|
|
let mut total = 0;
|
|
let mut total_batches = 0;
|
|
let mut total_inserted_shreds = 0;
|
|
let mut num_shreds = 0;
|
|
let mut max_speed = 0f32;
|
|
let mut min_speed = f32::MAX;
|
|
loop {
|
|
let (new_shreds, len) = {
|
|
let mut sl = cloned_shreds.lock().unwrap();
|
|
(sl.pop_front(), sl.len())
|
|
};
|
|
if now.elapsed().as_secs() > 0 {
|
|
let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
|
|
warn!(
|
|
"T{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
|
|
i, total, total_inserted_shreds, total_batches, len, shreds_per_second,
|
|
);
|
|
let average_speed =
|
|
total_inserted_shreds as f32 / start.elapsed().as_secs() as f32;
|
|
max_speed = max_speed.max(shreds_per_second);
|
|
min_speed = min_speed.min(shreds_per_second);
|
|
warn!(
|
|
"highest: {} lowest: {} avg: {}",
|
|
max_speed, min_speed, average_speed
|
|
);
|
|
now = Instant::now();
|
|
num_shreds = 0;
|
|
}
|
|
if let Some(new_shreds) = new_shreds {
|
|
total += new_shreds.len();
|
|
total_batches += 1;
|
|
let br = cloned_blockstore.insert_shreds(
|
|
new_shreds, None, false).unwrap();
|
|
total_inserted_shreds += br.1.len();
|
|
num_shreds += br.1.len();
|
|
} else {
|
|
warn!("Thread {} sleeps for 200 millis", i);
|
|
thread::sleep(Duration::from_millis(200));
|
|
}
|
|
if cloned_insert_exit.load(Ordering::Relaxed) {
|
|
info!(
|
|
"insert exiting... highest shreds/s: {} lowest shreds/s: {}",
|
|
max_speed, min_speed
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
})
|
|
.unwrap();
|
|
insert_threads.push(insert_thread);
|
|
}
|
|
|
|
for i in 0..num_batches {
|
|
let start_slot = i * batch_size_slots;
|
|
|
|
if time.elapsed().as_secs() > 0 {
|
|
warn!(
|
|
"total slots: {} slots: {} make: {}ms {:.2}",
|
|
total_slots,
|
|
num_slots,
|
|
total_make / (1000),
|
|
num_slots as f32 / time.elapsed().as_secs() as f32,
|
|
);
|
|
num_slots = 0;
|
|
total_make = 0;
|
|
time = Instant::now();
|
|
}
|
|
|
|
if !pre_generate_data && shreds.lock().unwrap().len() < 50 {
|
|
let mut make_time = Measure::start("make_entries");
|
|
num_slots += batch_size_slots;
|
|
total_slots += batch_size_slots;
|
|
shreds_batch
|
|
.iter_mut()
|
|
.for_each(|shred| shred.set_slot(shred.slot() + batch_size_slots));
|
|
let new_shreds = shreds_batch.clone();
|
|
shreds.lock().unwrap().push_back(new_shreds);
|
|
make_time.stop();
|
|
total_make += make_time.as_us();
|
|
}
|
|
|
|
sender.send(start_slot).unwrap();
|
|
|
|
emit_stats(
|
|
time_initial,
|
|
&mut time_previous,
|
|
&mut storage_previous,
|
|
start_slot,
|
|
batch_size_slots,
|
|
shreds_per_slot,
|
|
max_ledger_shreds as i64,
|
|
&blockstore,
|
|
&sys.get_stats(),
|
|
);
|
|
|
|
if stop_size_bytes > 0 {
|
|
if storage_previous >= stop_size_bytes {
|
|
stop_size_bytes_exceeded_iterations += 1;
|
|
} else {
|
|
stop_size_bytes_exceeded_iterations = 0;
|
|
}
|
|
|
|
if stop_size_bytes_exceeded_iterations > stop_size_iterations {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
let mut now = Instant::now();
|
|
loop {
|
|
if now.elapsed().as_secs() > 1 {
|
|
warn!(
|
|
"waiting for insert queue to clear.. {}",
|
|
shreds.lock().unwrap().len()
|
|
);
|
|
now = Instant::now();
|
|
}
|
|
if shreds.lock().unwrap().is_empty() {
|
|
break;
|
|
} else {
|
|
thread::sleep(Duration::from_millis(200));
|
|
}
|
|
}
|
|
insert_exit.store(true, Ordering::Relaxed);
|
|
|
|
while let Some(thread) = insert_threads.pop() {
|
|
thread.join().unwrap();
|
|
}
|
|
start.stop();
|
|
|
|
eprintln!(
|
|
"done {} {} slots/s",
|
|
start,
|
|
benchmark_slots as f32 / start.as_s()
|
|
);
|
|
let u1 = storage_previous;
|
|
|
|
// send final `ledger_cleanup` notification (since iterations above are zero-based)
|
|
sender.send(benchmark_slots).unwrap();
|
|
|
|
emit_stats(
|
|
time_initial,
|
|
&mut time_previous,
|
|
&mut storage_previous,
|
|
benchmark_slots,
|
|
0,
|
|
0,
|
|
max_ledger_shreds as i64,
|
|
&blockstore,
|
|
&sys.get_stats(),
|
|
);
|
|
|
|
// Poll on some compaction happening
|
|
let start_poll = Instant::now();
|
|
while blockstore.storage_size().unwrap_or(0) >= u1 {
|
|
if start_poll.elapsed().as_secs() > ROCKSDB_FLUSH_GRACE_PERIOD_SECS {
|
|
break;
|
|
}
|
|
std::thread::sleep(Duration::from_millis(200));
|
|
}
|
|
|
|
info!("done polling");
|
|
emit_stats(
|
|
time_initial,
|
|
&mut time_previous,
|
|
&mut storage_previous,
|
|
benchmark_slots,
|
|
0,
|
|
0,
|
|
max_ledger_shreds as i64,
|
|
&blockstore,
|
|
&sys.get_stats(),
|
|
);
|
|
|
|
let u2 = storage_previous;
|
|
|
|
exit.store(true, Ordering::SeqCst);
|
|
cleaner.join().unwrap();
|
|
|
|
exit_cpu.store(true, Ordering::SeqCst);
|
|
sys.join().unwrap();
|
|
|
|
if config.assert_compaction {
|
|
assert!(u2 < u1, "expected compaction! pre={},post={}", u1, u2);
|
|
}
|
|
|
|
if config.cleanup_blockstore {
|
|
drop(blockstore);
|
|
Blockstore::destroy(&blockstore_path)
|
|
.expect("Expected successful database destruction");
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_compaction() {
|
|
let blockstore_path = get_tmp_ledger_path!();
|
|
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
|
|
|
|
let n = 10_000;
|
|
let batch_size_slots = 100;
|
|
let num_batches = n / batch_size_slots;
|
|
let max_ledger_shreds = 100;
|
|
|
|
for i in 0..num_batches {
|
|
let start_slot = i * batch_size_slots;
|
|
let (shreds, _) = make_many_slot_shreds(start_slot, batch_size_slots, 1);
|
|
blockstore.insert_shreds(shreds, None, false).unwrap();
|
|
}
|
|
|
|
let u1 = blockstore.storage_size().unwrap() as f64;
|
|
|
|
// send signal to cleanup slots
|
|
let (sender, receiver) = channel();
|
|
sender.send(n).unwrap();
|
|
let mut last_purge_slot = 0;
|
|
let highest_compact_slot = Arc::new(AtomicU64::new(0));
|
|
LedgerCleanupService::cleanup_ledger(
|
|
&receiver,
|
|
&blockstore,
|
|
max_ledger_shreds,
|
|
&mut last_purge_slot,
|
|
10,
|
|
&highest_compact_slot,
|
|
)
|
|
.unwrap();
|
|
|
|
let mut compaction_jitter = 0;
|
|
let mut last_compaction_slot = 0;
|
|
LedgerCleanupService::compact_ledger(
|
|
&blockstore,
|
|
&mut last_compaction_slot,
|
|
10,
|
|
&highest_compact_slot,
|
|
&mut compaction_jitter,
|
|
None,
|
|
);
|
|
|
|
thread::sleep(Duration::from_secs(2));
|
|
|
|
let u2 = blockstore.storage_size().unwrap() as f64;
|
|
|
|
assert!(u2 < u1, "insufficient compaction! pre={},post={}", u1, u2,);
|
|
|
|
// check that early slots don't exist
|
|
let max_slot = n - max_ledger_shreds - 1;
|
|
blockstore
|
|
.slot_meta_iterator(0)
|
|
.unwrap()
|
|
.for_each(|(slot, _)| assert!(slot > max_slot));
|
|
|
|
drop(blockstore);
|
|
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
|
|
}
|
|
}
|