diff --git a/Cargo.lock b/Cargo.lock index db90e922e4..8020957936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -349,6 +349,11 @@ dependencies = [ "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "bytesize" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "bzip2" version = "0.3.3" @@ -1728,6 +1733,14 @@ dependencies = [ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "memchr" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "memchr" version = "2.2.1" @@ -1906,6 +1919,14 @@ name = "nodrop" version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "nom" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "nom" version = "4.2.3" @@ -3388,6 +3409,7 @@ dependencies = [ "solana-vote-signer 0.22.0", "symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)", + "systemstat 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4691,6 +4713,20 @@ dependencies = [ "walkdir 2.2.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "systemstat" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytesize 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "nom 3.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "take_mut" version = "0.2.2" @@ -5524,6 +5560,7 @@ dependencies = [ "checksum byte-unit 3.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6894a79550807490d9f19a138a6da0f8830e70c83e83402dd23f16fd6c479056" "checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5" "checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +"checksum bytesize 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "716960a18f978640f25101b5cbf1c6f6b0d3192fab36a2d98ca96f0ecbe41010" "checksum bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "42b7c3cbf0fa9c1b82308d57191728ca0256cb821220f4e2fd410a72ade26e3b" "checksum bzip2-sys 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "6584aa36f5ad4c9247f5323b0a42f37802b37a836f0ad87084d7a33961abe25f" "checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101" @@ -5677,6 +5714,7 @@ dependencies = [ "checksum mach_o_sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3e854583a83f20cf329bb9283366335387f7db59d640d1412167e05fedb98826" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" +"checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a" "checksum memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "88579771288728879b57485cc7d6b07d648c9f0141eb955f8ab7f9d45394468e" "checksum memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e2ffa2c986de11a9df78620c01eeaaf27d94d3ff02bf81bfcca953102dd0c6ff" "checksum memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" @@ -5697,6 +5735,7 @@ dependencies = [ "checksum nix 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce" "checksum nix 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "19a8300bf427d432716764070ff70d5b2b7801c958b9049686e6cbd8b06fad92" "checksum nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "2f9667ddcc6cc8a43afc9b7917599d7216aa09c463919ea32c59ed6cac8bc945" +"checksum nom 3.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05aec50c70fd288702bcd93284a8444607f3292dbdf2a30de5ea5dcdbe72287b" "checksum nom 4.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6" "checksum num-derive 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "eafd0b45c5537c3ba526f79d3e75120036502bebacbb3f3220914067ce39dbf2" "checksum num-derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0c8b15b261814f992e33760b1fca9fe8b693d8a65299f20c9901688636cfb746" @@ -5874,6 +5913,7 @@ dependencies = [ "checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f" "checksum sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0079fe39cec2c8215e21b0bc4ccec9031004c160b88358f531b601e96b77f0df" "checksum sysctl 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0501f0d0c2aa64b419abff97c209f4b82c4e67caa63e8dc5b222ecc1b574cb5c" +"checksum systemstat 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2078da8d09c6202bffd5e075946e65bfad5ce2cfa161edb15c5f014a8440adee" "checksum take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" "checksum tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)" = "b3196bfbffbba3e57481b6ea32249fbaf590396a52505a2615adbb79d9d826d3" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" diff --git a/core/Cargo.toml b/core/Cargo.toml index 0655cdc851..08df4b51c5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -82,6 +82,7 @@ matches = "0.1.6" reqwest = { version = "0.9.24", default-features = false, features = ["rustls-tls"] } serial_test = "0.3.2" serial_test_derive = "0.3.1" +systemstat = "0.1.5" [[bench]] name = "banking_stage" diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs new file mode 100644 index 0000000000..d093f12c6b --- /dev/null +++ b/core/tests/ledger_cleanup.rs @@ -0,0 +1,357 @@ +// Long-running ledger_cleanup tests + +#[cfg(test)] +mod tests { + use solana_core::ledger_cleanup_service::LedgerCleanupService; + use solana_ledger::blocktree::{make_many_slot_entries, Blocktree}; + use solana_ledger::get_tmp_ledger_path; + use solana_ledger::shred::Shred; + use std::collections::VecDeque; + use std::str::FromStr; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + use std::thread::{Builder, JoinHandle}; + use std::time::{Duration, Instant}; + use systemstat::{CPULoad, Platform, System}; + + const DEFAULT_BENCHMARK_SLOTS: u64 = 180; + const DEFAULT_BATCH_SIZE: u64 = 1; + const DEFAULT_MAX_LEDGER_SLOTS: u64 = 180; + const DEFAULT_ENTRIES_PER_SLOT: u64 = 50_000; + const DEFAULT_STOP_SIZE_BYTES: u64 = 0; + const DEFAULT_STOP_SIZE_ITERATIONS: u64 = 0; + + const ROCKSDB_FLUSH_GRACE_PERIOD_SECS: u64 = 20; + + #[derive(Debug)] + struct BenchmarkConfig { + pub benchmark_slots: u64, + pub batch_size: u64, + pub max_ledger_slots: u64, + pub entries_per_slot: u64, + pub stop_size_bytes: u64, + pub stop_size_iterations: u64, + pub pre_generate_data: bool, + pub cleanup_blocktree: bool, + pub emit_cpu_info: bool, + pub assert_compaction: bool, + } + + #[derive(Clone, Copy, Debug)] + struct CpuStatsInner { + pub cpu_user: f32, + pub cpu_system: f32, + pub cpu_idle: f32, + } + + impl From for CpuStatsInner { + fn from(cpu: CPULoad) -> Self { + Self { + cpu_user: cpu.user * 100.0, + cpu_system: cpu.system * 100.0, + cpu_idle: cpu.idle * 100.0, + } + } + } + + impl Default for CpuStatsInner { + fn default() -> Self { + Self { + cpu_user: 0.0, + cpu_system: 0.0, + cpu_idle: 0.0, + } + } + } + + struct CpuStats { + stats: RwLock, + sys: System, + } + + impl Default for CpuStats { + fn default() -> Self { + Self { + stats: RwLock::new(CpuStatsInner::default()), + sys: System::new(), + } + } + } + + impl CpuStats { + fn update(&self) { + match self.sys.cpu_load_aggregate() { + Ok(cpu) => { + std::thread::sleep(Duration::from_millis(400)); + let cpu_new = CpuStatsInner::from(cpu.done().unwrap()); + *self.stats.write().unwrap() = cpu_new; + } + _ => (), + } + } + + fn get_stats(&self) -> CpuStatsInner { + self.stats.read().unwrap().clone() + } + } + + struct CpuStatsUpdater { + cpu_stats: Arc, + t_cleanup: JoinHandle<()>, + } + + impl CpuStatsUpdater { + pub fn new(exit: &Arc) -> Self { + let exit = exit.clone(); + let cpu_stats = Arc::new(CpuStats::default()); + let cpu_stats_clone = cpu_stats.clone(); + + let t_cleanup = Builder::new() + .name("cpu_info".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + cpu_stats_clone.update(); + }) + .unwrap(); + + Self { + cpu_stats: cpu_stats.clone(), + t_cleanup, + } + } + + pub fn get_stats(&self) -> CpuStatsInner { + self.cpu_stats.get_stats() + } + + pub fn join(self) -> std::thread::Result<()> { + self.t_cleanup.join() + } + } + + fn read_env(key: &str, default: T) -> T + where + T: FromStr, + { + match std::env::var(key) { + Ok(val) => val.parse().unwrap_or(default), + Err(_e) => default, + } + } + + fn get_benchmark_config() -> BenchmarkConfig { + let benchmark_slots = read_env("BENCHMARK_SLOTS", DEFAULT_BENCHMARK_SLOTS); + let batch_size = read_env("BATCH_SIZE", DEFAULT_BATCH_SIZE); + let max_ledger_slots = read_env("MAX_LEDGER_SLOTS", DEFAULT_MAX_LEDGER_SLOTS); + let entries_per_slot = read_env("ENTRIES_PER_SLOT", DEFAULT_ENTRIES_PER_SLOT); + let stop_size_bytes = read_env("STOP_SIZE_BYTES", DEFAULT_STOP_SIZE_BYTES); + let stop_size_iterations = read_env("STOP_SIZE_ITERATIONS", DEFAULT_STOP_SIZE_ITERATIONS); + let pre_generate_data = read_env("PRE_GENERATE_DATA", false); + let cleanup_blocktree = read_env("CLEANUP_BLOCKTREE", true); + let emit_cpu_info = read_env("EMIT_CPU_INFO", true); + // set default to `true` once compaction is merged + let assert_compaction = read_env("ASSERT_COMPACTION", false); + + BenchmarkConfig { + benchmark_slots, + batch_size, + max_ledger_slots, + entries_per_slot, + stop_size_bytes, + stop_size_iterations, + pre_generate_data, + cleanup_blocktree, + emit_cpu_info, + assert_compaction, + } + } + + fn emit_header() { + println!("TIME_MS,DELTA_MS,START_SLOT,BATCH_SIZE,ENTRIES,MAX,SIZE,DELTA_SIZE,CPU_USER,CPU_SYSTEM,CPU_IDLE"); + } + + fn emit_stats( + time_initial: &Instant, + time_previous: &mut Instant, + storage_previous: &mut u64, + start_slot: u64, + batch_size: u64, + entries: u64, + max_slots: i64, + blocktree: &Blocktree, + cpu: &CpuStatsInner, + ) { + let time_now = Instant::now(); + let storage_now = blocktree.storage_size().unwrap_or(0); + let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle); + + println!( + "{},{},{},{},{},{},{},{},{},{},{}", + time_now.duration_since(*time_initial).as_millis(), + time_now.duration_since(*time_previous).as_millis(), + start_slot, + batch_size, + entries, + max_slots, + storage_now, + storage_now as i64 - *storage_previous as i64, + cpu_user, + cpu_system, + cpu_idle, + ); + + *time_previous = time_now; + *storage_previous = storage_now; + } + + #[test] + fn test_ledger_cleanup_compaction() { + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap()); + let config = get_benchmark_config(); + eprintln!("BENCHMARK CONFIG: {:?}", config); + eprintln!("LEDGER_PATH: {:?}", &blocktree_path); + + let benchmark_slots = config.benchmark_slots; + let batch_size = config.batch_size; + let max_ledger_slots = config.max_ledger_slots; + let entries_per_slot = config.entries_per_slot; + let stop_size_bytes = config.stop_size_bytes; + let stop_size_iterations = config.stop_size_iterations; + let pre_generate_data = config.pre_generate_data; + let batches = benchmark_slots / batch_size; + + let (sender, receiver) = channel(); + let exit = Arc::new(AtomicBool::new(false)); + let cleaner = + LedgerCleanupService::new(receiver, blocktree.clone(), max_ledger_slots, &exit); + + let exit_cpu = Arc::new(AtomicBool::new(false)); + let sys = CpuStatsUpdater::new(&exit_cpu); + + let mut generated_batches = VecDeque::>::new(); + + if pre_generate_data { + let t0 = Instant::now(); + eprintln!("PRE_GENERATE_DATA: (this may take a while)"); + for i in 0..batches { + let x = i * batch_size; + let (shreds, _) = make_many_slot_entries(x, batch_size, entries_per_slot); + generated_batches.push_back(shreds); + } + eprintln!("PRE_GENERATE_DATA: took {} ms", t0.elapsed().as_millis()); + }; + + let time_initial = Instant::now(); + let mut time_previous = time_initial; + let mut storage_previous = 0; + let mut stop_size_bytes_exceeded_iterations = 0; + + emit_header(); + emit_stats( + &time_initial, + &mut time_previous, + &mut storage_previous, + 0, + 0, + 0, + 0, + &blocktree, + &sys.get_stats(), + ); + + for i in 0..batches { + let x = i * batch_size; + + let shreds = if pre_generate_data { + generated_batches.pop_front().unwrap() + } else { + make_many_slot_entries(x, batch_size, entries_per_slot).0 + }; + + blocktree.insert_shreds(shreds, None, false).unwrap(); + sender.send(x).unwrap(); + + emit_stats( + &time_initial, + &mut time_previous, + &mut storage_previous, + x, + batch_size, + batch_size, + max_ledger_slots as i64, + &blocktree, + &sys.get_stats(), + ); + + if stop_size_bytes > 0 { + if storage_previous >= stop_size_bytes { + stop_size_bytes_exceeded_iterations += 1; + } else { + stop_size_bytes_exceeded_iterations = 0; + } + + if stop_size_bytes_exceeded_iterations > stop_size_iterations { + break; + } + } + } + + let u1 = storage_previous; + + // send final `ledger_cleanup` notification (since iterations above are zero-based) + sender.send(benchmark_slots).unwrap(); + + emit_stats( + &time_initial, + &mut time_previous, + &mut storage_previous, + benchmark_slots, + 0, + 0, + max_ledger_slots as i64, + &blocktree, + &sys.get_stats(), + ); + + std::thread::sleep(std::time::Duration::from_secs( + ROCKSDB_FLUSH_GRACE_PERIOD_SECS, + )); + + emit_stats( + &time_initial, + &mut time_previous, + &mut storage_previous, + benchmark_slots, + 0, + 0, + max_ledger_slots as i64, + &blocktree, + &sys.get_stats(), + ); + + let u2 = storage_previous; + + exit.store(true, Ordering::SeqCst); + cleaner.join().unwrap(); + + exit_cpu.store(true, Ordering::SeqCst); + sys.join().unwrap(); + + std::thread::sleep(std::time::Duration::from_secs( + ROCKSDB_FLUSH_GRACE_PERIOD_SECS, + )); + + if config.assert_compaction { + assert!(u2 < u1, "expected compaction! pre={},post={}", u1, u2); + } + + if config.cleanup_blocktree { + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + } +}