Persist cost table to blockstore (#18123)
* Add `ProgramCosts` Column Family to blockstore, implement LedgerColumn; add `delete_cf` to Rocks * Add ProgramCosts to compaction excluding list alone side with TransactionStatusIndex in one place: `excludes_from_compaction()` * Write cost table to blockstore after `replay_stage` replayed active banks; add stats to measure persist time * Deletes program from `ProgramCosts` in blockstore when they are removed from cost_table in memory * Only try to persist to blockstore when cost_table is changed. * Restore cost table during validator startup * Offload `cost_model` related operations from replay main thread to dedicated service thread, add channel to send execute_timings between these threads; * Move `cost_update_service` to its own module; replay_stage is now decoupled from cost_model.
This commit is contained in:
275
core/src/cost_update_service.rs
Normal file
275
core/src/cost_update_service.rs
Normal file
@@ -0,0 +1,275 @@
|
||||
//! this service receives instruction ExecuteTimings from replay_stage,
|
||||
//! update cost_model which is shared with banking_stage to optimize
|
||||
//! packing transactions into block; it also triggers persisting cost
|
||||
//! table to blockstore.
|
||||
|
||||
use crate::cost_model::CostModel;
|
||||
use solana_ledger::blockstore::Blockstore;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_runtime::bank::ExecuteTimings;
|
||||
use solana_sdk::timing::timestamp;
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::Receiver,
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct CostUpdateServiceTiming {
|
||||
last_print: u64,
|
||||
update_cost_model_count: u64,
|
||||
update_cost_model_elapsed: u64,
|
||||
persist_cost_table_elapsed: u64,
|
||||
}
|
||||
|
||||
impl CostUpdateServiceTiming {
|
||||
fn update(
|
||||
&mut self,
|
||||
update_cost_model_count: u64,
|
||||
update_cost_model_elapsed: u64,
|
||||
persist_cost_table_elapsed: u64,
|
||||
) {
|
||||
self.update_cost_model_count += update_cost_model_count;
|
||||
self.update_cost_model_elapsed += update_cost_model_elapsed;
|
||||
self.persist_cost_table_elapsed += persist_cost_table_elapsed;
|
||||
|
||||
let now = timestamp();
|
||||
let elapsed_ms = now - self.last_print;
|
||||
if elapsed_ms > 1000 {
|
||||
datapoint_info!(
|
||||
"replay-service-timing-stats",
|
||||
("total_elapsed_us", elapsed_ms * 1000, i64),
|
||||
(
|
||||
"update_cost_model_count",
|
||||
self.update_cost_model_count as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"update_cost_model_elapsed",
|
||||
self.update_cost_model_elapsed as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"persist_cost_table_elapsed",
|
||||
self.persist_cost_table_elapsed as i64,
|
||||
i64
|
||||
),
|
||||
);
|
||||
|
||||
*self = CostUpdateServiceTiming::default();
|
||||
self.last_print = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type CostUpdateReceiver = Receiver<ExecuteTimings>;
|
||||
|
||||
pub struct CostUpdateService {
|
||||
thread_hdl: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl CostUpdateService {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(
|
||||
exit: Arc<AtomicBool>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
cost_update_receiver: CostUpdateReceiver,
|
||||
) -> Self {
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-cost-update-service".to_string())
|
||||
.spawn(move || {
|
||||
Self::service_loop(exit, blockstore, cost_model, cost_update_receiver);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Self { thread_hdl }
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
self.thread_hdl.join()
|
||||
}
|
||||
|
||||
fn service_loop(
|
||||
exit: Arc<AtomicBool>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
cost_update_receiver: CostUpdateReceiver,
|
||||
) {
|
||||
let mut cost_update_service_timing = CostUpdateServiceTiming::default();
|
||||
let mut dirty = false;
|
||||
let wait_timer = Duration::from_millis(100);
|
||||
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut update_count = 0_u64;
|
||||
let mut update_cost_model_time = Measure::start("update_cost_model_time");
|
||||
for cost_update in cost_update_receiver.try_iter() {
|
||||
dirty |= Self::update_cost_model(&cost_model, &cost_update);
|
||||
update_count += 1;
|
||||
}
|
||||
update_cost_model_time.stop();
|
||||
|
||||
let mut persist_cost_table_time = Measure::start("persist_cost_table_time");
|
||||
if dirty {
|
||||
Self::persist_cost_table(&blockstore, &cost_model);
|
||||
}
|
||||
persist_cost_table_time.stop();
|
||||
|
||||
cost_update_service_timing.update(
|
||||
update_count,
|
||||
update_cost_model_time.as_us(),
|
||||
persist_cost_table_time.as_us(),
|
||||
);
|
||||
|
||||
thread::sleep(wait_timer);
|
||||
}
|
||||
}
|
||||
|
||||
fn update_cost_model(cost_model: &RwLock<CostModel>, execute_timings: &ExecuteTimings) -> bool {
|
||||
let mut dirty = false;
|
||||
let mut cost_model_mutable = cost_model.write().unwrap();
|
||||
for (program_id, stats) in &execute_timings.details.per_program_timings {
|
||||
let cost = stats.0 / stats.1 as u64;
|
||||
match cost_model_mutable.upsert_instruction_cost(program_id, &cost) {
|
||||
Ok(c) => {
|
||||
debug!(
|
||||
"after replayed into bank, instruction {:?} has averaged cost {}",
|
||||
program_id, c
|
||||
);
|
||||
dirty = true;
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
"after replayed into bank, instruction {:?} failed to update cost, err: {}",
|
||||
program_id, err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(cost_model_mutable);
|
||||
debug!(
|
||||
"after replayed into bank, updated cost model instruction cost table, current values: {:?}",
|
||||
cost_model.read().unwrap().get_instruction_cost_table()
|
||||
);
|
||||
dirty
|
||||
}
|
||||
|
||||
fn persist_cost_table(blockstore: &Blockstore, cost_model: &RwLock<CostModel>) {
|
||||
let cost_model_read = cost_model.read().unwrap();
|
||||
let cost_table = cost_model_read.get_instruction_cost_table();
|
||||
let db_records = blockstore.read_program_costs().expect("read programs");
|
||||
|
||||
// delete records from blockstore if they are no longer in cost_table
|
||||
db_records.iter().for_each(|(pubkey, _)| {
|
||||
if cost_table.get(pubkey).is_none() {
|
||||
blockstore
|
||||
.delete_program_cost(pubkey)
|
||||
.expect("delete old program");
|
||||
}
|
||||
});
|
||||
|
||||
for (key, cost) in cost_table.iter() {
|
||||
blockstore
|
||||
.write_program_cost(key, cost)
|
||||
.expect("persist program costs to blockstore");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
||||
#[test]
|
||||
fn test_update_cost_model_with_empty_execute_timings() {
|
||||
let cost_model = Arc::new(RwLock::new(CostModel::default()));
|
||||
let empty_execute_timings = ExecuteTimings::default();
|
||||
CostUpdateService::update_cost_model(&cost_model, &empty_execute_timings);
|
||||
|
||||
assert_eq!(
|
||||
0,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_update_cost_model_with_execute_timings() {
|
||||
let cost_model = Arc::new(RwLock::new(CostModel::default()));
|
||||
let mut execute_timings = ExecuteTimings::default();
|
||||
|
||||
let program_key_1 = Pubkey::new_unique();
|
||||
let mut expected_cost: u64;
|
||||
|
||||
// add new program
|
||||
{
|
||||
let accumulated_us: u64 = 1000;
|
||||
let count: u32 = 10;
|
||||
expected_cost = accumulated_us / count as u64;
|
||||
|
||||
execute_timings
|
||||
.details
|
||||
.per_program_timings
|
||||
.insert(program_key_1, (accumulated_us, count));
|
||||
CostUpdateService::update_cost_model(&cost_model, &execute_timings);
|
||||
assert_eq!(
|
||||
1,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(
|
||||
Some(&expected_cost),
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.get(&program_key_1)
|
||||
);
|
||||
}
|
||||
|
||||
// update program
|
||||
{
|
||||
let accumulated_us: u64 = 2000;
|
||||
let count: u32 = 10;
|
||||
// to expect new cost is Average(new_value, existing_value)
|
||||
expected_cost = ((accumulated_us / count as u64) + expected_cost) / 2;
|
||||
|
||||
execute_timings
|
||||
.details
|
||||
.per_program_timings
|
||||
.insert(program_key_1, (accumulated_us, count));
|
||||
CostUpdateService::update_cost_model(&cost_model, &execute_timings);
|
||||
assert_eq!(
|
||||
1,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(
|
||||
Some(&expected_cost),
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.get(&program_key_1)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user