Avoid full-range compactions with periodic filtered b.g. ones (backport #16697) (#17956)

* Avoid full-range compactions with periodic filtered b.g. ones (#16697)

* Update rocksdb to v0.16.0

* Promote the infrequent and important log to info!

* Force background compaction by ttl without manual compaction

* Fix test

* Support no compaction mode in test_ledger_cleanup_compaction

* Fix comment

* Make compaction_interval customizable

* Avoid major compaction with periodic filtering...

* Address lazy_static, special CFs and range check

* Clean up a bit and add comment

* Add comment

* More comments...

* Config code cleanup

* Add comment

* Use .conflicts_with()

* Nullify unneeded delete_range ops for special CFs

* Some clean ups

* Clarify the locking intention

* Ensure special CFs' consistency with PurgeType::CompactionFilter

* Fix comment

* Fix bad copy paste

* Fix various types...

* Don't use tuples

* Add a unit test for compaction_filter

* Fix typo...

* Remove flag and just use new behavior always

* Fix wrong condition negation...

* Doc. about no set_last_purged_slot in purge_slots

* Write a test and fix off-by-one bug....

* Apply suggestions from code review

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* Follow up to github review suggestions

* Fix line-wrapping

* Fix conflict

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
(cherry picked from commit 1f97b2365f)

# Conflicts:
#	Cargo.lock
#	ledger/src/blockstore_db.rs

* Fix conflicts

Co-authored-by: Ryo Onodera <ryoqun@gmail.com>
This commit is contained in:
mergify[bot]
2021-06-15 08:49:13 +00:00
committed by GitHub
parent ff8f78199d
commit 24ee0b3934
9 changed files with 644 additions and 137 deletions

98
Cargo.lock generated
View File

@ -259,26 +259,21 @@ dependencies = [
[[package]] [[package]]
name = "bindgen" name = "bindgen"
version = "0.54.0" version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66c0bb6167449588ff70803f4127f0684f9063097eca5016f37eb52b92c2cf36" checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"cexpr", "cexpr",
"cfg-if 0.1.10",
"clang-sys", "clang-sys",
"clap",
"env_logger 0.7.1",
"lazy_static", "lazy_static",
"lazycell", "lazycell",
"log 0.4.11",
"peeking_take_while", "peeking_take_while",
"proc-macro2 1.0.24", "proc-macro2 1.0.24",
"quote 1.0.6", "quote 1.0.6",
"regex", "regex",
"rustc-hash", "rustc-hash",
"shlex", "shlex",
"which",
] ]
[[package]] [[package]]
@ -582,13 +577,13 @@ dependencies = [
[[package]] [[package]]
name = "clang-sys" name = "clang-sys"
version = "0.29.3" version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a" checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c"
dependencies = [ dependencies = [
"glob", "glob",
"libc", "libc",
"libloading 0.5.2", "libloading 0.7.0",
] ]
[[package]] [[package]]
@ -1182,19 +1177,6 @@ dependencies = [
"syn 1.0.60", "syn 1.0.60",
] ]
[[package]]
name = "env_logger"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [
"atty",
"humantime 1.3.0",
"log 0.4.11",
"regex",
"termcolor",
]
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.8.3" version = "0.8.3"
@ -1202,7 +1184,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f"
dependencies = [ dependencies = [
"atty", "atty",
"humantime 2.0.1", "humantime",
"log 0.4.11", "log 0.4.11",
"regex", "regex",
"termcolor", "termcolor",
@ -1762,15 +1744,6 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
[[package]] [[package]]
name = "humantime" name = "humantime"
version = "2.0.1" version = "2.0.1"
@ -2202,16 +2175,6 @@ version = "0.2.81"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb"
[[package]]
name = "libloading"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753"
dependencies = [
"cc",
"winapi 0.3.8",
]
[[package]] [[package]]
name = "libloading" name = "libloading"
version = "0.6.2" version = "0.6.2"
@ -2222,10 +2185,20 @@ dependencies = [
] ]
[[package]] [[package]]
name = "librocksdb-sys" name = "libloading"
version = "6.11.4" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb5b56f651c204634b936be2f92dbb42c36867e00ff7fe2405591f3b9fa66f09" checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a"
dependencies = [
"cfg-if 1.0.0",
"winapi 0.3.8",
]
[[package]]
name = "librocksdb-sys"
version = "6.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9"
dependencies = [ dependencies = [
"bindgen", "bindgen",
"cc", "cc",
@ -3099,12 +3072,6 @@ dependencies = [
"percent-encoding 2.1.0", "percent-encoding 2.1.0",
] ]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]] [[package]]
name = "quote" name = "quote"
version = "0.6.13" version = "0.6.13"
@ -3502,9 +3469,9 @@ dependencies = [
[[package]] [[package]]
name = "rocksdb" name = "rocksdb"
version = "0.15.0" version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23d83c02c429044d58474eaf5ae31e062d0de894e21125b47437ec0edc1397e6" checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3"
dependencies = [ dependencies = [
"libc", "libc",
"librocksdb-sys", "librocksdb-sys",
@ -4216,7 +4183,7 @@ dependencies = [
"criterion-stats", "criterion-stats",
"ctrlc", "ctrlc",
"dirs-next", "dirs-next",
"humantime 2.0.1", "humantime",
"indicatif", "indicatif",
"log 0.4.11", "log 0.4.11",
"num-traits", "num-traits",
@ -4270,7 +4237,7 @@ dependencies = [
"base64 0.13.0", "base64 0.13.0",
"chrono", "chrono",
"console", "console",
"humantime 2.0.1", "humantime",
"indicatif", "indicatif",
"serde", "serde",
"serde_derive", "serde_derive",
@ -4806,7 +4773,7 @@ dependencies = [
name = "solana-logger" name = "solana-logger"
version = "1.6.12" version = "1.6.12"
dependencies = [ dependencies = [
"env_logger 0.8.3", "env_logger",
"lazy_static", "lazy_static",
"log 0.4.11", "log 0.4.11",
] ]
@ -4817,7 +4784,7 @@ version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ea5932e186629f47859924b3773cfd8bcb4b8796898ac85c1fa0a6a2024e5c6" checksum = "7ea5932e186629f47859924b3773cfd8bcb4b8796898ac85c1fa0a6a2024e5c6"
dependencies = [ dependencies = [
"env_logger 0.8.3", "env_logger",
"lazy_static", "lazy_static",
"log 0.4.11", "log 0.4.11",
] ]
@ -4860,7 +4827,7 @@ dependencies = [
name = "solana-metrics" name = "solana-metrics"
version = "1.6.12" version = "1.6.12"
dependencies = [ dependencies = [
"env_logger 0.8.3", "env_logger",
"gethostname", "gethostname",
"lazy_static", "lazy_static",
"log 0.4.11", "log 0.4.11",
@ -5586,7 +5553,7 @@ name = "solana-watchtower"
version = "1.6.12" version = "1.6.12"
dependencies = [ dependencies = [
"clap", "clap",
"humantime 2.0.1", "humantime",
"log 0.4.11", "log 0.4.11",
"solana-clap-utils", "solana-clap-utils",
"solana-cli-config", "solana-cli-config",
@ -5833,7 +5800,7 @@ dependencies = [
"anyhow", "anyhow",
"fnv", "fnv",
"futures 0.3.8", "futures 0.3.8",
"humantime 2.0.1", "humantime",
"log 0.4.11", "log 0.4.11",
"pin-project 1.0.1", "pin-project 1.0.1",
"rand 0.7.3", "rand 0.7.3",
@ -6888,15 +6855,6 @@ dependencies = [
"tokio-tls", "tokio-tls",
] ]
[[package]]
name = "which"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.2.8" version = "0.2.8"

View File

@ -207,11 +207,25 @@ impl LedgerCleanupService {
); );
let mut purge_time = Measure::start("purge_slots"); let mut purge_time = Measure::start("purge_slots");
blockstore.purge_slots( blockstore.purge_slots(
purge_first_slot, purge_first_slot,
lowest_cleanup_slot, lowest_cleanup_slot,
PurgeType::PrimaryIndex, PurgeType::CompactionFilter,
); );
// Update only after purge operation.
// Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
// Compactions are async and run as a multi-threaded background job. However, this
// shouldn't cause consistency issues for iterators and getters because we have
// already expired all affected keys (older than or equal to lowest_cleanup_slot)
// by the above `purge_slots`. According to the general RocksDB design where SST
// files are immutable, even running iterators aren't affected; the database grabs
// a snapshot of the live set of sst files at iterator's creation.
// Also, we passed the PurgeType::CompactionFilter, meaning no delete_range for
// transaction_status and address_signatures CFs. These are fine because they
// don't require strong consistent view for their operation.
blockstore.set_max_expired_slot(lowest_cleanup_slot);
purge_time.stop(); purge_time.stop();
info!("{}", purge_time); info!("{}", purge_time);

View File

@ -1617,9 +1617,11 @@ mod tests {
} }
drop(blockstore); drop(blockstore);
// this purges and compacts all slots greater than or equal to 5
backup_and_clear_blockstore(&blockstore_path, 5, 2); backup_and_clear_blockstore(&blockstore_path, 5, 2);
let blockstore = Blockstore::open(&blockstore_path).unwrap(); let blockstore = Blockstore::open(&blockstore_path).unwrap();
// assert that slots less than 5 aren't affected
assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty()); assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
for i in 5..10 { for i in 5..10 {
assert!(blockstore assert!(blockstore

View File

@ -39,6 +39,8 @@ mod tests {
pub cleanup_blockstore: bool, pub cleanup_blockstore: bool,
pub emit_cpu_info: bool, pub emit_cpu_info: bool,
pub assert_compaction: bool, pub assert_compaction: bool,
pub compaction_interval: Option<u64>,
pub no_compaction: bool,
} }
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
@ -154,6 +156,11 @@ mod tests {
let emit_cpu_info = read_env("EMIT_CPU_INFO", true); let emit_cpu_info = read_env("EMIT_CPU_INFO", true);
// set default to `true` once compaction is merged // set default to `true` once compaction is merged
let assert_compaction = read_env("ASSERT_COMPACTION", false); let assert_compaction = read_env("ASSERT_COMPACTION", false);
let compaction_interval = match read_env("COMPACTION_INTERVAL", 0) {
maybe_zero if maybe_zero == 0 => None,
non_zero => Some(non_zero),
};
let no_compaction = read_env("NO_COMPACTION", false);
BenchmarkConfig { BenchmarkConfig {
benchmark_slots, benchmark_slots,
@ -166,6 +173,8 @@ mod tests {
cleanup_blockstore, cleanup_blockstore,
emit_cpu_info, emit_cpu_info,
assert_compaction, assert_compaction,
compaction_interval,
no_compaction,
} }
} }
@ -211,8 +220,13 @@ mod tests {
fn test_ledger_cleanup_compaction() { fn test_ledger_cleanup_compaction() {
solana_logger::setup(); solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
let config = get_benchmark_config(); let config = get_benchmark_config();
if config.no_compaction {
blockstore.set_no_compaction(true);
}
let blockstore = Arc::new(blockstore);
eprintln!("BENCHMARK CONFIG: {:?}", config); eprintln!("BENCHMARK CONFIG: {:?}", config);
eprintln!("LEDGER_PATH: {:?}", &blockstore_path); eprintln!("LEDGER_PATH: {:?}", &blockstore_path);
@ -223,6 +237,8 @@ mod tests {
let stop_size_bytes = config.stop_size_bytes; let stop_size_bytes = config.stop_size_bytes;
let stop_size_iterations = config.stop_size_iterations; let stop_size_iterations = config.stop_size_iterations;
let pre_generate_data = config.pre_generate_data; let pre_generate_data = config.pre_generate_data;
let compaction_interval = config.compaction_interval;
let batches = benchmark_slots / batch_size; let batches = benchmark_slots / batch_size;
let (sender, receiver) = channel(); let (sender, receiver) = channel();
@ -232,7 +248,7 @@ mod tests {
blockstore.clone(), blockstore.clone(),
max_ledger_shreds, max_ledger_shreds,
&exit, &exit,
None, compaction_interval,
None, None,
); );

View File

@ -59,7 +59,7 @@ trees = "0.2.1"
[dependencies.rocksdb] [dependencies.rocksdb]
# Avoid the vendored bzip2 within rocksdb-sys that can cause linker conflicts # Avoid the vendored bzip2 within rocksdb-sys that can cause linker conflicts
# when also using the bzip2 crate # when also using the bzip2 crate
version = "0.15.0" version = "0.16.0"
default-features = false default-features = false
features = ["lz4"] features = ["lz4"]

View File

@ -54,7 +54,7 @@ use std::{
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, SyncSender, TrySendError}, mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
Arc, Mutex, RwLock, Arc, Mutex, RwLock, RwLockWriteGuard,
}, },
}; };
use thiserror::Error; use thiserror::Error;
@ -92,6 +92,7 @@ type CompletedRanges = Vec<(u32, u32)>;
pub enum PurgeType { pub enum PurgeType {
Exact, Exact,
PrimaryIndex, PrimaryIndex,
CompactionFilter,
} }
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -144,7 +145,7 @@ pub struct Blockstore {
insert_shreds_lock: Arc<Mutex<()>>, insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>, pub new_shreds_signals: Vec<SyncSender<bool>>,
pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>, pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
pub lowest_cleanup_slot: Arc<RwLock<u64>>, pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
no_compaction: bool, no_compaction: bool,
} }
@ -1905,18 +1906,24 @@ impl Blockstore {
batch.put::<cf::TransactionStatusIndex>(0, &index0)?; batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None) Ok(None)
} else { } else {
let result = if index0.frozen && to_slot > index0.max_slot { let purge_target_primary_index = if index0.frozen && to_slot > index0.max_slot {
debug!("Pruning transaction index 0 at slot {}", index0.max_slot); info!(
"Pruning expired primary index 0 up to slot {} (max requested: {})",
index0.max_slot, to_slot
);
Some(0) Some(0)
} else if index1.frozen && to_slot > index1.max_slot { } else if index1.frozen && to_slot > index1.max_slot {
debug!("Pruning transaction index 1 at slot {}", index1.max_slot); info!(
"Pruning expired primary index 1 up to slot {} (max requested: {})",
index1.max_slot, to_slot
);
Some(1) Some(1)
} else { } else {
None None
}; };
if result.is_some() { if let Some(purge_target_primary_index) = purge_target_primary_index {
*w_active_transaction_status_index = if index0.frozen { 0 } else { 1 }; *w_active_transaction_status_index = purge_target_primary_index;
if index0.frozen { if index0.frozen {
index0.max_slot = 0 index0.max_slot = 0
}; };
@ -1929,16 +1936,17 @@ impl Blockstore {
batch.put::<cf::TransactionStatusIndex>(1, &index1)?; batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
} }
Ok(result) Ok(purge_target_primary_index)
} }
} }
fn get_primary_index( fn get_primary_index_to_write(
&self, &self,
slot: Slot, slot: Slot,
w_active_transaction_status_index: &mut u64, // take WriteGuard to require critical section semantics at call site
w_active_transaction_status_index: &RwLockWriteGuard<Slot>,
) -> Result<u64> { ) -> Result<u64> {
let i = *w_active_transaction_status_index; let i = **w_active_transaction_status_index;
let mut index_meta = self.transaction_status_index_cf.get(i)?.unwrap(); let mut index_meta = self.transaction_status_index_cf.get(i)?.unwrap();
if slot > index_meta.max_slot { if slot > index_meta.max_slot {
assert!(!index_meta.frozen); assert!(!index_meta.frozen);
@ -1977,9 +1985,10 @@ impl Blockstore {
let status = status.into(); let status = status.into();
// This write lock prevents interleaving issues with the transaction_status_index_cf by gating // This write lock prevents interleaving issues with the transaction_status_index_cf by gating
// writes to that column // writes to that column
let mut w_active_transaction_status_index = let w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap(); self.active_transaction_status_index.write().unwrap();
let primary_index = self.get_primary_index(slot, &mut w_active_transaction_status_index)?; let primary_index =
self.get_primary_index_to_write(slot, &w_active_transaction_status_index)?;
self.transaction_status_cf self.transaction_status_cf
.put_protobuf((primary_index, signature, slot), &status)?; .put_protobuf((primary_index, signature, slot), &status)?;
for address in writable_keys { for address in writable_keys {
@ -1997,6 +2006,21 @@ impl Blockstore {
Ok(()) Ok(())
} }
fn ensure_lowest_cleanup_slot(&self) -> (std::sync::RwLockReadGuard<Slot>, Slot) {
// Ensures consistent result by using lowest_cleanup_slot as the lower bound
// for reading columns that do not employ strong read consistency with slot-based
// delete_range
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
let lowest_available_slot = (*lowest_cleanup_slot)
.checked_add(1)
.expect("overflow from trusted value");
// Make caller hold this lock properly; otherwise LedgerCleanupService can purge/compact
// needed slots here at any given moment.
// Blockstore callers, like rpc, can process concurrent read queries
(lowest_cleanup_slot, lowest_available_slot)
}
// Returns a transaction status, as well as a loop counter for unit testing // Returns a transaction status, as well as a loop counter for unit testing
fn get_transaction_status_with_counter( fn get_transaction_status_with_counter(
&self, &self,
@ -2004,9 +2028,15 @@ impl Blockstore {
confirmed_unrooted_slots: &[Slot], confirmed_unrooted_slots: &[Slot],
) -> Result<(Option<(Slot, TransactionStatusMeta)>, u64)> { ) -> Result<(Option<(Slot, TransactionStatusMeta)>, u64)> {
let mut counter = 0; let mut counter = 0;
let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
for transaction_status_cf_primary_index in 0..=1 { for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.transaction_status_cf.iter(IteratorMode::From( let index_iterator = self.transaction_status_cf.iter(IteratorMode::From(
(transaction_status_cf_primary_index, signature, 0), (
transaction_status_cf_primary_index,
signature,
lowest_available_slot,
),
IteratorDirection::Forward, IteratorDirection::Forward,
))?; ))?;
for ((i, sig, slot), _data) in index_iterator { for ((i, sig, slot), _data) in index_iterator {
@ -2025,6 +2055,8 @@ impl Blockstore {
return Ok((status, counter)); return Ok((status, counter));
} }
} }
drop(lock);
Ok((None, counter)) Ok((None, counter))
} }
@ -2148,13 +2180,15 @@ impl Blockstore {
start_slot: Slot, start_slot: Slot,
end_slot: Slot, end_slot: Slot,
) -> Result<Vec<(Slot, Signature)>> { ) -> Result<Vec<(Slot, Signature)>> {
let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
let mut signatures: Vec<(Slot, Signature)> = vec![]; let mut signatures: Vec<(Slot, Signature)> = vec![];
for transaction_status_cf_primary_index in 0..=1 { for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.address_signatures_cf.iter(IteratorMode::From( let index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
( (
transaction_status_cf_primary_index, transaction_status_cf_primary_index,
pubkey, pubkey,
start_slot, start_slot.max(lowest_available_slot),
Signature::default(), Signature::default(),
), ),
IteratorDirection::Forward, IteratorDirection::Forward,
@ -2169,6 +2203,7 @@ impl Blockstore {
} }
} }
} }
drop(lock);
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1))); signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures) Ok(signatures)
} }
@ -2181,13 +2216,14 @@ impl Blockstore {
pubkey: Pubkey, pubkey: Pubkey,
slot: Slot, slot: Slot,
) -> Result<Vec<(Slot, Signature)>> { ) -> Result<Vec<(Slot, Signature)>> {
let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
let mut signatures: Vec<(Slot, Signature)> = vec![]; let mut signatures: Vec<(Slot, Signature)> = vec![];
for transaction_status_cf_primary_index in 0..=1 { for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.address_signatures_cf.iter(IteratorMode::From( let index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
( (
transaction_status_cf_primary_index, transaction_status_cf_primary_index,
pubkey, pubkey,
slot, slot.max(lowest_available_slot),
Signature::default(), Signature::default(),
), ),
IteratorDirection::Forward, IteratorDirection::Forward,
@ -2202,6 +2238,7 @@ impl Blockstore {
signatures.push((slot, signature)); signatures.push((slot, signature));
} }
} }
drop(lock);
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1))); signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures) Ok(signatures)
} }
@ -6617,6 +6654,176 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
fn do_test_lowest_cleanup_slot_and_special_cfs(
simulate_compaction: bool,
simulate_ledger_cleanup_service: bool,
) {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
// TransactionStatus column opens initialized with one entry at index 2
let transaction_status_cf = blockstore.db.column::<cf::TransactionStatus>();
let pre_balances_vec = vec![1, 2, 3];
let post_balances_vec = vec![3, 2, 1];
let status = TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Ok(()),
fee: 42u64,
pre_balances: pre_balances_vec,
post_balances: post_balances_vec,
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
}
.into();
let signature1 = Signature::new(&[2u8; 64]);
let signature2 = Signature::new(&[3u8; 64]);
// Insert rooted slots 0..=3 with no fork
let meta0 = SlotMeta::new(0, 0);
blockstore.meta_cf.put(0, &meta0).unwrap();
let meta1 = SlotMeta::new(1, 0);
blockstore.meta_cf.put(1, &meta1).unwrap();
let meta2 = SlotMeta::new(2, 1);
blockstore.meta_cf.put(2, &meta2).unwrap();
let meta3 = SlotMeta::new(3, 2);
blockstore.meta_cf.put(3, &meta3).unwrap();
blockstore.set_roots(&[0, 1, 2, 3]).unwrap();
let lowest_cleanup_slot = 1;
let lowest_available_slot = lowest_cleanup_slot + 1;
transaction_status_cf
.put_protobuf((0, signature1, lowest_cleanup_slot), &status)
.unwrap();
transaction_status_cf
.put_protobuf((0, signature2, lowest_available_slot), &status)
.unwrap();
let address0 = solana_sdk::pubkey::new_rand();
let address1 = solana_sdk::pubkey::new_rand();
blockstore
.write_transaction_status(
lowest_cleanup_slot,
signature1,
vec![&address0],
vec![],
TransactionStatusMeta::default(),
)
.unwrap();
blockstore
.write_transaction_status(
lowest_available_slot,
signature2,
vec![&address1],
vec![],
TransactionStatusMeta::default(),
)
.unwrap();
let check_for_missing = || {
(
blockstore
.get_transaction_status_with_counter(signature1, &[])
.unwrap()
.0
.is_none(),
blockstore
.find_address_signatures_for_slot(address0, lowest_cleanup_slot)
.unwrap()
.is_empty(),
blockstore
.find_address_signatures(address0, lowest_cleanup_slot, lowest_cleanup_slot)
.unwrap()
.is_empty(),
)
};
let assert_existing_always = || {
let are_existing_always = (
blockstore
.get_transaction_status_with_counter(signature2, &[])
.unwrap()
.0
.is_some(),
!blockstore
.find_address_signatures_for_slot(address1, lowest_available_slot)
.unwrap()
.is_empty(),
!blockstore
.find_address_signatures(
address1,
lowest_available_slot,
lowest_available_slot,
)
.unwrap()
.is_empty(),
);
assert_eq!(are_existing_always, (true, true, true));
};
let are_missing = check_for_missing();
// should never be missing before the conditional compaction & simulation...
assert_eq!(are_missing, (false, false, false));
assert_existing_always();
if simulate_compaction {
blockstore.set_max_expired_slot(lowest_cleanup_slot);
// force compaction filters to run across whole key range.
blockstore
.compact_storage(Slot::min_value(), Slot::max_value())
.unwrap();
}
if simulate_ledger_cleanup_service {
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
}
let are_missing = check_for_missing();
if simulate_compaction || simulate_ledger_cleanup_service {
// ... when either simulation (or both) is effective, we should observe to be missing
// consistently
assert_eq!(are_missing, (true, true, true));
} else {
// ... otherwise, we should observe to be existing...
assert_eq!(are_missing, (false, false, false));
}
assert_existing_always();
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_with_compact_with_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(true, true);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_with_compact_without_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(true, false);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_without_compact_with_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(false, true);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_without_compact_without_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(false, false);
}
#[test] #[test]
fn test_get_rooted_transaction() { fn test_get_rooted_transaction() {
let slot = 2; let slot = 2;

View File

@ -32,6 +32,19 @@ impl Blockstore {
} }
} }
/// Usually this is paired with .purge_slots() but we can't internally call this in
/// that function unconditionally. That's because set_max_expired_slot()
/// expects to purge older slots by the successive chronological order, while .purge_slots()
/// can also be used to purge *future* slots for --hard-fork thing, preserving older
/// slots. It'd be quite dangerous to purge older slots in that case.
/// So, current legal user of this function is LedgerCleanupService.
pub fn set_max_expired_slot(&self, to_slot: Slot) {
// convert here from inclusive purged range end to inclusive alive range start to align
// with Slot::default() for initial compaction filter behavior consistency
let to_slot = to_slot.checked_add(1).unwrap();
self.db.set_oldest_slot(to_slot);
}
pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) { pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots(from_slot, to_slot, PurgeType::Exact); self.purge_slots(from_slot, to_slot, PurgeType::Exact);
if let Err(e) = self.compact_storage(from_slot, to_slot) { if let Err(e) = self.compact_storage(from_slot, to_slot) {
@ -184,6 +197,13 @@ impl Blockstore {
to_slot, to_slot,
)?; )?;
} }
PurgeType::CompactionFilter => {
// No explicit action is required here because this purge type completely and
// indefinitely relies on the proper working of compaction filter for those
// special column families, never toggling the primary index from the current
// one. Overall, this enables well uniformly distributed writes, resulting
// in no spiky periodic huge delete_range for them.
}
} }
delete_range_timer.stop(); delete_range_timer.stop();
let mut write_timer = Measure::start("write_batch"); let mut write_timer = Measure::start("write_batch");
@ -197,6 +217,10 @@ impl Blockstore {
write_timer.stop(); write_timer.stop();
purge_stats.delete_range += delete_range_timer.as_us(); purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us(); purge_stats.write_batch += write_timer.as_us();
// only drop w_active_transaction_status_index after we do db.write(write_batch);
// otherwise, readers might be confused with inconsistent state between
// self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
drop(w_active_transaction_status_index);
Ok(columns_purged) Ok(columns_purged)
} }
@ -331,18 +355,26 @@ impl Blockstore {
w_active_transaction_status_index: &mut u64, w_active_transaction_status_index: &mut u64,
to_slot: Slot, to_slot: Slot,
) -> Result<()> { ) -> Result<()> {
if let Some(index) = self.toggle_transaction_status_index( if let Some(purged_index) = self.toggle_transaction_status_index(
write_batch, write_batch,
w_active_transaction_status_index, w_active_transaction_status_index,
to_slot, to_slot,
)? { )? {
*columns_purged &= self *columns_purged &= self
.db .db
.delete_range_cf::<cf::TransactionStatus>(write_batch, index, index + 1) .delete_range_cf::<cf::TransactionStatus>(
write_batch,
purged_index,
purged_index + 1,
)
.is_ok() .is_ok()
& self & self
.db .db
.delete_range_cf::<cf::AddressSignatures>(write_batch, index, index + 1) .delete_range_cf::<cf::AddressSignatures>(
write_batch,
purged_index,
purged_index + 1,
)
.is_ok(); .is_ok();
} }
Ok(()) Ok(())

View File

@ -5,9 +5,13 @@ use log::*;
use prost::Message; use prost::Message;
pub use rocksdb::Direction as IteratorDirection; pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{ use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode, self,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB, compaction_filter::CompactionFilter,
compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
ColumnFamily, ColumnFamilyDescriptor, CompactionDecision, DBIterator, DBRawIterator,
DBRecoveryMode, IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
}; };
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use solana_runtime::hardened_unpack::UnpackError; use solana_runtime::hardened_unpack::UnpackError;
@ -17,7 +21,17 @@ use solana_sdk::{
signature::Signature, signature::Signature,
}; };
use solana_storage_proto::convert::generated; use solana_storage_proto::convert::generated;
use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc}; use std::{
collections::HashMap,
ffi::{CStr, CString},
fs,
marker::PhantomData,
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use thiserror::Error; use thiserror::Error;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
@ -58,6 +72,9 @@ const PERF_SAMPLES_CF: &str = "perf_samples";
/// Column family for BlockHeight /// Column family for BlockHeight
const BLOCK_HEIGHT_CF: &str = "block_height"; const BLOCK_HEIGHT_CF: &str = "block_height";
// 1 day is chosen for the same reasoning of DEFAULT_COMPACTION_SLOT_INTERVAL
const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum BlockstoreError { pub enum BlockstoreError {
ShredForIndexExists, ShredForIndexExists,
@ -208,8 +225,30 @@ impl From<BlockstoreRecoveryMode> for DBRecoveryMode {
} }
} }
#[derive(Default, Clone, Debug)]
struct OldestSlot(Arc<AtomicU64>);
impl OldestSlot {
pub fn set(&self, oldest_slot: Slot) {
// this is independently used for compaction_filter without any data dependency.
// also, compaction_filters are created via its factories, creating short-lived copies of
// this atomic value for the single job of compaction. So, Relaxed store can be justified
// in total
self.0.store(oldest_slot, Ordering::Relaxed);
}
pub fn get(&self) -> Slot {
// copy from the AtomicU64 as a general precaution so that the oldest_slot can not mutate
// across single run of compaction for simpler reasoning although this isn't strict
// requirement at the moment
// also eventual propagation (very Relaxed) load is Ok, because compaction by nature doesn't
// require strictly synchronized semantics in this regard
self.0.load(Ordering::Relaxed)
}
}
#[derive(Debug)] #[derive(Debug)]
struct Rocks(rocksdb::DB, ActualAccessType); struct Rocks(rocksdb::DB, ActualAccessType, OldestSlot);
impl Rocks { impl Rocks {
fn open( fn open(
@ -234,39 +273,73 @@ impl Rocks {
db_options.set_wal_recovery_mode(recovery_mode.into()); db_options.set_wal_recovery_mode(recovery_mode.into());
} }
let oldest_slot = OldestSlot::default();
// Column family names // Column family names
let meta_cf_descriptor = let meta_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type)); SlotMeta::NAME,
let dead_slots_cf_descriptor = get_cf_options::<SlotMeta>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type)); );
let duplicate_slots_cf_descriptor = let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type)); DeadSlots::NAME,
let erasure_meta_cf_descriptor = get_cf_options::<DeadSlots>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type)); );
let orphans_cf_descriptor = let duplicate_slots_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type)); DuplicateSlots::NAME,
let root_cf_descriptor = get_cf_options::<DuplicateSlots>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type)); );
let index_cf_descriptor = let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type)); ErasureMeta::NAME,
let shred_data_cf_descriptor = get_cf_options::<ErasureMeta>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type)); );
let shred_code_cf_descriptor = let orphans_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type)); Orphans::NAME,
let transaction_status_cf_descriptor = get_cf_options::<Orphans>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type)); );
let address_signatures_cf_descriptor = let root_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type)); Root::NAME,
let transaction_status_index_cf_descriptor = get_cf_options::<Root>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type)); );
let rewards_cf_descriptor = let index_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type)); Index::NAME,
let blocktime_cf_descriptor = get_cf_options::<Index>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type)); );
let perf_samples_cf_descriptor = let shred_data_cf_descriptor = ColumnFamilyDescriptor::new(
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type)); ShredData::NAME,
let block_height_cf_descriptor = get_cf_options::<ShredData>(&access_type, &oldest_slot),
ColumnFamilyDescriptor::new(BlockHeight::NAME, get_cf_options(&access_type)); );
let shred_code_cf_descriptor = ColumnFamilyDescriptor::new(
ShredCode::NAME,
get_cf_options::<ShredCode>(&access_type, &oldest_slot),
);
let transaction_status_cf_descriptor = ColumnFamilyDescriptor::new(
TransactionStatus::NAME,
get_cf_options::<TransactionStatus>(&access_type, &oldest_slot),
);
let address_signatures_cf_descriptor = ColumnFamilyDescriptor::new(
AddressSignatures::NAME,
get_cf_options::<AddressSignatures>(&access_type, &oldest_slot),
);
let transaction_status_index_cf_descriptor = ColumnFamilyDescriptor::new(
TransactionStatusIndex::NAME,
get_cf_options::<TransactionStatusIndex>(&access_type, &oldest_slot),
);
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(
Rewards::NAME,
get_cf_options::<Rewards>(&access_type, &oldest_slot),
);
let blocktime_cf_descriptor = ColumnFamilyDescriptor::new(
Blocktime::NAME,
get_cf_options::<Blocktime>(&access_type, &oldest_slot),
);
let perf_samples_cf_descriptor = ColumnFamilyDescriptor::new(
PerfSamples::NAME,
get_cf_options::<PerfSamples>(&access_type, &oldest_slot),
);
let block_height_cf_descriptor = ColumnFamilyDescriptor::new(
BlockHeight::NAME,
get_cf_options::<BlockHeight>(&access_type, &oldest_slot),
);
// Don't forget to add to both run_purge_with_stats() and // Don't forget to add to both run_purge_with_stats() and
// compact_storage() in ledger/src/blockstore/blockstore_purge.rs!! // compact_storage() in ledger/src/blockstore/blockstore_purge.rs!!
@ -291,18 +364,18 @@ impl Rocks {
(PerfSamples::NAME, perf_samples_cf_descriptor), (PerfSamples::NAME, perf_samples_cf_descriptor),
(BlockHeight::NAME, block_height_cf_descriptor), (BlockHeight::NAME, block_height_cf_descriptor),
]; ];
let cf_names: Vec<_> = cfs.iter().map(|c| c.0).collect();
// Open the database // Open the database
let db = match access_type { let db = match access_type {
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks( AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?, DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
ActualAccessType::Primary, ActualAccessType::Primary,
oldest_slot,
), ),
AccessType::TryPrimaryThenSecondary => { AccessType::TryPrimaryThenSecondary => {
let names: Vec<_> = cfs.iter().map(|c| c.0).collect();
match DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1)) { match DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1)) {
Ok(db) => Rocks(db, ActualAccessType::Primary), Ok(db) => Rocks(db, ActualAccessType::Primary, oldest_slot),
Err(err) => { Err(err) => {
let secondary_path = path.join("solana-secondary"); let secondary_path = path.join("solana-secondary");
@ -314,13 +387,75 @@ impl Rocks {
db_options.set_max_open_files(-1); db_options.set_max_open_files(-1);
Rocks( Rocks(
DB::open_cf_as_secondary(&db_options, path, &secondary_path, names)?, DB::open_cf_as_secondary(
&db_options,
path,
&secondary_path,
cf_names.clone(),
)?,
ActualAccessType::Secondary, ActualAccessType::Secondary,
oldest_slot,
) )
} }
} }
} }
}; };
// this is only needed for LedgerCleanupService. so guard with PrimaryOnly (i.e. running solana-validator)
if matches!(access_type, AccessType::PrimaryOnly) {
for cf_name in cf_names {
// this special column family must be excluded from LedgerCleanupService's rocksdb
// compactions
if cf_name == TransactionStatusIndex::NAME {
continue;
}
// This is the crux of our write-stall-free storage cleaning strategy with consistent
// state view for higher-layers
//
// For the consistent view, we commit delete_range on pruned slot range by LedgerCleanupService.
// simple story here.
//
// For actual storage cleaning, we employ RocksDB compaction. But default RocksDB compaction
// settings don't work well for us. That's because we're using it rather like a really big
// (100 GBs) ring-buffer. RocksDB is basically assuming uniform data write over the key space for
// efficient compaction, which isn't true for our use as a ring buffer.
//
// So, we customize the compaction strategy with 2 combined tweaks:
// (1) compaction_filter and (2) shortening its periodic cycles.
//
// Via the compaction_filter, we finally reclaim previously delete_range()-ed storage occupied
// by pruned slots. When compaction_filter is set, each SST files are re-compacted periodically
// to hunt for keys newly expired by the compaction_filter re-evaluation. But RocksDb's default
// `periodic_compaction_seconds` is 30 days, which is too long for our case. So, we
// shorten it to a day (24 hours).
//
// As we write newer SST files over time at rather consistent rate of speed, this
// effectively makes each newly-created ssts be re-compacted for the filter at
// well-dispersed different timings.
// As a whole, we rewrite the whole dataset at every PERIODIC_COMPACTION_SECONDS,
// slowly over the duration of PERIODIC_COMPACTION_SECONDS. So, this results in
// amortization.
// So, there is a bit inefficiency here because we'll rewrite not-so-old SST files
// too. But longer period would introduce higher variance of ledger storage sizes over
// the long period. And it's much better than the daily IO spike caused by compact_range() by
// previous implementation.
//
// `ttl` and `compact_range`(`ManualCompaction`), doesn't work nicely. That's
// because its original intention is delete_range()s to reclaim disk space. So it tries to merge
// them with N+1 SST files all way down to the bottommost SSTs, often leading to vastly large amount
// (= all) of invalidated SST files, when combined with newer writes happening at the opposite
// edge of the key space. This causes a long and heavy disk IOs and possible write
// stall and ultimately, the deadly Replay/Banking stage stall at higher layers.
db.0.set_options_cf(
db.cf_handle(cf_name),
&[(
"periodic_compaction_seconds",
&format!("{}", PERIODIC_COMPACTION_SECONDS),
)],
)
.unwrap();
}
}
Ok(db) Ok(db)
} }
@ -417,9 +552,13 @@ pub trait Column {
fn key(index: Self::Index) -> Vec<u8>; fn key(index: Self::Index) -> Vec<u8>;
fn index(key: &[u8]) -> Self::Index; fn index(key: &[u8]) -> Self::Index;
fn primary_index(index: Self::Index) -> Slot; // this return Slot or some u64
fn primary_index(index: Self::Index) -> u64;
#[allow(clippy::wrong_self_convention)] #[allow(clippy::wrong_self_convention)]
fn as_index(slot: Slot) -> Self::Index; fn as_index(slot: Slot) -> Self::Index;
fn slot(index: Self::Index) -> Slot {
Self::primary_index(index)
}
} }
pub trait ColumnName { pub trait ColumnName {
@ -493,6 +632,10 @@ impl Column for columns::TransactionStatus {
index.0 index.0
} }
fn slot(index: Self::Index) -> Slot {
index.2
}
#[allow(clippy::wrong_self_convention)] #[allow(clippy::wrong_self_convention)]
fn as_index(index: u64) -> Self::Index { fn as_index(index: u64) -> Self::Index {
(index, Signature::default(), 0) (index, Signature::default(), 0)
@ -530,6 +673,10 @@ impl Column for columns::AddressSignatures {
index.0 index.0
} }
fn slot(index: Self::Index) -> Slot {
index.2
}
#[allow(clippy::wrong_self_convention)] #[allow(clippy::wrong_self_convention)]
fn as_index(index: u64) -> Self::Index { fn as_index(index: u64) -> Self::Index {
(index, Pubkey::default(), 0, Signature::default()) (index, Pubkey::default(), 0, Signature::default())
@ -557,6 +704,10 @@ impl Column for columns::TransactionStatusIndex {
index index
} }
fn slot(_index: Self::Index) -> Slot {
unimplemented!()
}
#[allow(clippy::wrong_self_convention)] #[allow(clippy::wrong_self_convention)]
fn as_index(slot: u64) -> u64 { fn as_index(slot: u64) -> u64 {
slot slot
@ -857,6 +1008,10 @@ impl Database {
pub fn is_primary_access(&self) -> bool { pub fn is_primary_access(&self) -> bool {
self.backend.is_primary_access() self.backend.is_primary_access()
} }
pub fn set_oldest_slot(&self, oldest_slot: Slot) {
self.backend.2.set(oldest_slot);
}
} }
impl<C> LedgerColumn<C> impl<C> LedgerColumn<C>
@ -1034,7 +1189,63 @@ impl<'a> WriteBatch<'a> {
} }
} }
fn get_cf_options(access_type: &AccessType) -> Options { struct PurgedSlotFilter<C: Column + ColumnName> {
oldest_slot: Slot,
name: CString,
_phantom: PhantomData<C>,
}
impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> CompactionDecision {
use rocksdb::CompactionDecision::*;
let slot_in_key = C::slot(C::index(key));
// Refer to a comment about periodic_compaction_seconds, especially regarding implicit
// periodic execution of compaction_filters
if slot_in_key >= self.oldest_slot {
Keep
} else {
Remove
}
}
fn name(&self) -> &CStr {
&self.name
}
}
struct PurgedSlotFilterFactory<C: Column + ColumnName> {
oldest_slot: OldestSlot,
name: CString,
_phantom: PhantomData<C>,
}
impl<C: Column + ColumnName> CompactionFilterFactory for PurgedSlotFilterFactory<C> {
type Filter = PurgedSlotFilter<C>;
fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
let copied_oldest_slot = self.oldest_slot.get();
PurgedSlotFilter::<C> {
oldest_slot: copied_oldest_slot,
name: CString::new(format!(
"purged_slot_filter({}, {:?})",
C::NAME,
copied_oldest_slot
))
.unwrap(),
_phantom: PhantomData::default(),
}
}
fn name(&self) -> &CStr {
&self.name
}
}
fn get_cf_options<C: 'static + Column + ColumnName>(
access_type: &AccessType,
oldest_slot: &OldestSlot,
) -> Options {
let mut options = Options::default(); let mut options = Options::default();
// 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM // 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
options.set_max_write_buffer_number(8); options.set_max_write_buffer_number(8);
@ -1048,6 +1259,19 @@ fn get_cf_options(access_type: &AccessType) -> Options {
options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32); options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
options.set_max_bytes_for_level_base(total_size_base); options.set_max_bytes_for_level_base(total_size_base);
options.set_target_file_size_base(file_size_base); options.set_target_file_size_base(file_size_base);
// TransactionStatusIndex must be excluded from LedgerCleanupService's rocksdb
// compactions....
if matches!(access_type, AccessType::PrimaryOnly)
&& C::NAME != columns::TransactionStatusIndex::NAME
{
options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
oldest_slot: oldest_slot.clone(),
name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
_phantom: PhantomData::default(),
});
}
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) { if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true); options.set_disable_auto_compactions(true);
} }
@ -1079,3 +1303,57 @@ fn get_db_options(access_type: &AccessType) -> Options {
options options
} }
#[cfg(test)]
pub mod tests {
use super::*;
use crate::blockstore_db::columns::ShredData;
#[test]
fn test_compaction_filter() {
// this doesn't implement Clone...
let dummy_compaction_filter_context = || CompactionFilterContext {
is_full_compaction: true,
is_manual_compaction: true,
};
let oldest_slot = OldestSlot::default();
let mut factory = PurgedSlotFilterFactory::<ShredData> {
oldest_slot: oldest_slot.clone(),
name: CString::new("test compaction filter").unwrap(),
_phantom: PhantomData::default(),
};
let mut compaction_filter = factory.create(dummy_compaction_filter_context());
let dummy_level = 0;
let key = ShredData::key(ShredData::as_index(0));
let dummy_value = vec![];
// we can't use assert_matches! because CompactionDecision doesn't implement Debug
assert!(matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Keep
));
// mutating oledst_slot doen't affect existing compaction filters...
oldest_slot.set(1);
assert!(matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Keep
));
// recreating compaction filter starts to expire the key
let mut compaction_filter = factory.create(dummy_compaction_filter_context());
assert!(matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Remove
));
// newer key shouldn't be removed
let key = ShredData::key(ShredData::as_index(1));
matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Keep
);
}
}

View File

@ -1485,7 +1485,7 @@ pub fn main() {
Arg::with_name("no_rocksdb_compaction") Arg::with_name("no_rocksdb_compaction")
.long("no-rocksdb-compaction") .long("no-rocksdb-compaction")
.takes_value(false) .takes_value(false)
.help("Disable manual compaction of the ledger database. May increase storage requirements.") .help("Disable manual compaction of the ledger database (this is ignored).")
) )
.arg( .arg(
Arg::with_name("rocksdb_compaction_interval") Arg::with_name("rocksdb_compaction_interval")
@ -2001,7 +2001,7 @@ pub fn main() {
let private_rpc = matches.is_present("private_rpc"); let private_rpc = matches.is_present("private_rpc");
let no_port_check = matches.is_present("no_port_check"); let no_port_check = matches.is_present("no_port_check");
let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction"); let no_rocksdb_compaction = true;
let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok(); let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok();
let rocksdb_max_compaction_jitter = let rocksdb_max_compaction_jitter =
value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok(); value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok();