* Revert "fix tests after merge" This reverts commitba2d83f580
. (cherry picked from commit0a17edcc1f
) * Revert "1. Persist to blockstore less frequently;" This reverts commit7aa1fb4e24
. (cherry picked from commitc878c9e2cb
) # Conflicts: # core/src/cost_update_service.rs # core/src/tvu.rs # runtime/src/cost_model.rs * Revert "use EMA in place of Welford" This reverts commit6587dbfa47
. (cherry picked from commit9acbfa5eb1
) * Revert "- estimate a program cost as 2 standard deviation above mean" This reverts commita25ac1c988
. (cherry picked from commit5a0cd05866
) # Conflicts: # core/src/cost_update_service.rs # runtime/src/cost_model.rs * fix merge conflicts Co-authored-by: Carl Lin <carl@solana.com> Co-authored-by: Tao Zhu <tao@solana.com>
This commit is contained in:
@ -3102,10 +3102,6 @@ mod tests {
|
||||
..
|
||||
} = create_slow_genesis_config(lamports);
|
||||
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
||||
// set cost tracker limits to MAX so it will not filter out TXs
|
||||
bank.write_cost_tracker()
|
||||
.unwrap()
|
||||
.set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX);
|
||||
|
||||
// Transfer more than the balance of the mint keypair, should cause a
|
||||
// InstructionError::InsufficientFunds that is then committed. Needs to be
|
||||
@ -3162,10 +3158,6 @@ mod tests {
|
||||
..
|
||||
} = create_slow_genesis_config(10_000);
|
||||
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
|
||||
// set cost tracker limits to MAX so it will not filter out TXs
|
||||
bank.write_cost_tracker()
|
||||
.unwrap()
|
||||
.set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX);
|
||||
|
||||
// Make all repetitive transactions that conflict on the `mint_keypair`, so only 1 should be executed
|
||||
let mut transactions = vec![
|
||||
|
@ -10,14 +10,13 @@ use {
|
||||
solana_runtime::{bank::Bank, cost_model::CostModel},
|
||||
solana_sdk::timing::timestamp,
|
||||
std::{
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
sync::{mpsc::Receiver, Arc, RwLock},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
},
|
||||
};
|
||||
|
||||
// Update blockstore persistence storage when accumulated cost_table updates count exceeds the threshold
|
||||
const PERSIST_THRESHOLD: u64 = 1_000;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct CostUpdateServiceTiming {
|
||||
last_print: u64,
|
||||
@ -29,25 +28,20 @@ pub struct CostUpdateServiceTiming {
|
||||
impl CostUpdateServiceTiming {
|
||||
fn update(
|
||||
&mut self,
|
||||
update_cost_model_count: Option<u64>,
|
||||
update_cost_model_elapsed: Option<u64>,
|
||||
persist_cost_table_elapsed: Option<u64>,
|
||||
update_cost_model_count: u64,
|
||||
update_cost_model_elapsed: u64,
|
||||
persist_cost_table_elapsed: u64,
|
||||
) {
|
||||
if let Some(update_cost_model_count) = update_cost_model_count {
|
||||
self.update_cost_model_count += update_cost_model_count;
|
||||
}
|
||||
if let Some(update_cost_model_elapsed) = update_cost_model_elapsed {
|
||||
self.update_cost_model_elapsed += update_cost_model_elapsed;
|
||||
}
|
||||
if let Some(persist_cost_table_elapsed) = persist_cost_table_elapsed {
|
||||
self.persist_cost_table_elapsed += persist_cost_table_elapsed;
|
||||
}
|
||||
self.update_cost_model_count += update_cost_model_count;
|
||||
self.update_cost_model_elapsed += update_cost_model_elapsed;
|
||||
self.persist_cost_table_elapsed += persist_cost_table_elapsed;
|
||||
|
||||
let now = timestamp();
|
||||
let elapsed_ms = now - self.last_print;
|
||||
if elapsed_ms > 1000 {
|
||||
datapoint_info!(
|
||||
"cost-update-service-stats",
|
||||
("total_elapsed_us", elapsed_ms * 1000, i64),
|
||||
(
|
||||
"update_cost_model_count",
|
||||
self.update_cost_model_count as i64,
|
||||
@ -89,6 +83,7 @@ pub struct CostUpdateService {
|
||||
impl CostUpdateService {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(
|
||||
exit: Arc<AtomicBool>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
cost_update_receiver: CostUpdateReceiver,
|
||||
@ -96,7 +91,7 @@ impl CostUpdateService {
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-cost-update-service".to_string())
|
||||
.spawn(move || {
|
||||
Self::service_loop(blockstore, cost_model, cost_update_receiver);
|
||||
Self::service_loop(exit, blockstore, cost_model, cost_update_receiver);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@ -108,99 +103,118 @@ impl CostUpdateService {
|
||||
}
|
||||
|
||||
fn service_loop(
|
||||
exit: Arc<AtomicBool>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
cost_update_receiver: CostUpdateReceiver,
|
||||
) {
|
||||
let mut cost_update_service_timing = CostUpdateServiceTiming::default();
|
||||
let mut update_count = 0_u64;
|
||||
let mut dirty: bool;
|
||||
let mut update_count: u64;
|
||||
let wait_timer = Duration::from_millis(100);
|
||||
|
||||
for cost_update in cost_update_receiver.iter() {
|
||||
match cost_update {
|
||||
CostUpdate::FrozenBank { bank } => {
|
||||
bank.read_cost_tracker().unwrap().report_stats(bank.slot());
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
dirty = false;
|
||||
update_count = 0_u64;
|
||||
let mut update_cost_model_time = Measure::start("update_cost_model_time");
|
||||
for cost_update in cost_update_receiver.try_iter() {
|
||||
match cost_update {
|
||||
CostUpdate::FrozenBank { bank } => {
|
||||
bank.read_cost_tracker().unwrap().report_stats(bank.slot());
|
||||
}
|
||||
CostUpdate::ExecuteTiming {
|
||||
mut execute_timings,
|
||||
} => {
|
||||
dirty |= Self::update_cost_model(&cost_model, &mut execute_timings);
|
||||
update_count += 1;
|
||||
}
|
||||
}
|
||||
CostUpdate::ExecuteTiming {
|
||||
mut execute_timings,
|
||||
} => {
|
||||
let mut update_cost_model_time = Measure::start("update_cost_model_time");
|
||||
update_count += Self::update_cost_model(&cost_model, &mut execute_timings);
|
||||
update_cost_model_time.stop();
|
||||
cost_update_service_timing.update(
|
||||
Some(update_count),
|
||||
Some(update_cost_model_time.as_us()),
|
||||
None,
|
||||
);
|
||||
}
|
||||
update_cost_model_time.stop();
|
||||
|
||||
if update_count > PERSIST_THRESHOLD {
|
||||
let mut persist_cost_table_time = Measure::start("persist_cost_table_time");
|
||||
Self::persist_cost_table(&blockstore, &cost_model);
|
||||
update_count = 0_u64;
|
||||
persist_cost_table_time.stop();
|
||||
cost_update_service_timing.update(
|
||||
None,
|
||||
None,
|
||||
Some(persist_cost_table_time.as_us()),
|
||||
let mut persist_cost_table_time = Measure::start("persist_cost_table_time");
|
||||
if dirty {
|
||||
Self::persist_cost_table(&blockstore, &cost_model);
|
||||
}
|
||||
persist_cost_table_time.stop();
|
||||
|
||||
cost_update_service_timing.update(
|
||||
update_count,
|
||||
update_cost_model_time.as_us(),
|
||||
persist_cost_table_time.as_us(),
|
||||
);
|
||||
|
||||
thread::sleep(wait_timer);
|
||||
}
|
||||
}
|
||||
|
||||
fn update_cost_model(
|
||||
cost_model: &RwLock<CostModel>,
|
||||
execute_timings: &mut ExecuteTimings,
|
||||
) -> bool {
|
||||
let mut dirty = false;
|
||||
{
|
||||
for (program_id, program_timings) in &mut execute_timings.details.per_program_timings {
|
||||
let current_estimated_program_cost =
|
||||
cost_model.read().unwrap().find_instruction_cost(program_id);
|
||||
program_timings.coalesce_error_timings(current_estimated_program_cost);
|
||||
|
||||
if program_timings.count < 1 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let units = program_timings.accumulated_units / program_timings.count as u64;
|
||||
match cost_model
|
||||
.write()
|
||||
.unwrap()
|
||||
.upsert_instruction_cost(program_id, units)
|
||||
{
|
||||
Ok(c) => {
|
||||
debug!(
|
||||
"after replayed into bank, instruction {:?} has averaged cost {}",
|
||||
program_id, c
|
||||
);
|
||||
dirty = true;
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
"after replayed into bank, instruction {:?} failed to update cost, err: {}",
|
||||
program_id, err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!(
|
||||
"after replayed into bank, updated cost model instruction cost table, current values: {:?}",
|
||||
cost_model.read().unwrap().get_instruction_cost_table()
|
||||
);
|
||||
dirty
|
||||
}
|
||||
|
||||
// Normalize `program_timings` with current estimated cost, update instruction_cost table
|
||||
// Returns number of updates applied
|
||||
fn update_cost_model(
|
||||
cost_model: &RwLock<CostModel>,
|
||||
execute_timings: &mut ExecuteTimings,
|
||||
) -> u64 {
|
||||
let mut update_count = 0_u64;
|
||||
for (program_id, program_timings) in &mut execute_timings.details.per_program_timings {
|
||||
let current_estimated_program_cost =
|
||||
cost_model.read().unwrap().find_instruction_cost(program_id);
|
||||
program_timings.coalesce_error_timings(current_estimated_program_cost);
|
||||
|
||||
if program_timings.count < 1 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let units = program_timings.accumulated_units / program_timings.count as u64;
|
||||
cost_model
|
||||
.write()
|
||||
.unwrap()
|
||||
.upsert_instruction_cost(program_id, units);
|
||||
update_count += 1;
|
||||
debug!(
|
||||
"After replayed into bank, updated cost for instruction {:?}, update_value {}, pre_aggregated_value {}",
|
||||
program_id, units, current_estimated_program_cost
|
||||
);
|
||||
}
|
||||
update_count
|
||||
}
|
||||
|
||||
// 1. Remove obsolete program entries from persisted table to limit its size
|
||||
// 2. Update persisted program cost. This involves EMA cost calculation at
|
||||
// execute_cost_table.get_cost()
|
||||
fn persist_cost_table(blockstore: &Blockstore, cost_model: &RwLock<CostModel>) {
|
||||
let cost_model_read = cost_model.read().unwrap();
|
||||
let cost_table = cost_model_read.get_instruction_cost_table();
|
||||
let db_records = blockstore.read_program_costs().expect("read programs");
|
||||
let cost_model = cost_model.read().unwrap();
|
||||
let active_program_keys = cost_model.get_program_keys();
|
||||
|
||||
// delete records from blockstore if they are no longer in cost_table
|
||||
db_records.iter().for_each(|(pubkey, _)| {
|
||||
if !active_program_keys.contains(&pubkey) {
|
||||
if cost_table.get(pubkey).is_none() {
|
||||
blockstore
|
||||
.delete_program_cost(pubkey)
|
||||
.expect("delete old program");
|
||||
}
|
||||
});
|
||||
|
||||
active_program_keys.iter().for_each(|program_id| {
|
||||
let cost = cost_model.find_instruction_cost(program_id);
|
||||
for (key, cost) in cost_table.iter() {
|
||||
blockstore
|
||||
.write_program_cost(program_id, &cost)
|
||||
.write_program_cost(key, cost)
|
||||
.expect("persist program costs to blockstore");
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -212,9 +226,15 @@ mod tests {
|
||||
fn test_update_cost_model_with_empty_execute_timings() {
|
||||
let cost_model = Arc::new(RwLock::new(CostModel::default()));
|
||||
let mut empty_execute_timings = ExecuteTimings::default();
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut empty_execute_timings);
|
||||
|
||||
assert_eq!(
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut empty_execute_timings),
|
||||
0
|
||||
0,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
}
|
||||
|
||||
@ -232,7 +252,7 @@ mod tests {
|
||||
let accumulated_units: u64 = 100;
|
||||
let total_errored_units = 0;
|
||||
let count: u32 = 10;
|
||||
expected_cost = accumulated_units / count as u64; // = 10
|
||||
expected_cost = accumulated_units / count as u64;
|
||||
|
||||
execute_timings.details.per_program_timings.insert(
|
||||
program_key_1,
|
||||
@ -244,15 +264,22 @@ mod tests {
|
||||
total_errored_units,
|
||||
},
|
||||
);
|
||||
let update_count =
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
assert_eq!(1, update_count);
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
assert_eq!(
|
||||
expected_cost,
|
||||
1,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.find_instruction_cost(&program_key_1)
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(
|
||||
Some(&expected_cost),
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.get(&program_key_1)
|
||||
);
|
||||
}
|
||||
|
||||
@ -261,8 +288,8 @@ mod tests {
|
||||
let accumulated_us: u64 = 2000;
|
||||
let accumulated_units: u64 = 200;
|
||||
let count: u32 = 10;
|
||||
// to expect new cost = (mean + 2 * std) of [10, 20]
|
||||
expected_cost = 13;
|
||||
// to expect new cost is Average(new_value, existing_value)
|
||||
expected_cost = ((accumulated_units / count as u64) + expected_cost) / 2;
|
||||
|
||||
execute_timings.details.per_program_timings.insert(
|
||||
program_key_1,
|
||||
@ -274,15 +301,22 @@ mod tests {
|
||||
total_errored_units: 0,
|
||||
},
|
||||
);
|
||||
let update_count =
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
assert_eq!(1, update_count);
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
assert_eq!(
|
||||
expected_cost,
|
||||
1,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.find_instruction_cost(&program_key_1)
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(
|
||||
Some(&expected_cost),
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.get(&program_key_1)
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -306,49 +340,20 @@ mod tests {
|
||||
total_errored_units: 0,
|
||||
},
|
||||
);
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
// If both the `errored_txs_compute_consumed` is empty and `count == 0`, then
|
||||
// nothing should be inserted into the cost model
|
||||
assert_eq!(
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings),
|
||||
0
|
||||
);
|
||||
}
|
||||
|
||||
// set up current instruction cost to 100
|
||||
let current_program_cost = 100;
|
||||
{
|
||||
execute_timings.details.per_program_timings.insert(
|
||||
program_key_1,
|
||||
ProgramTiming {
|
||||
accumulated_us: 1000,
|
||||
accumulated_units: current_program_cost,
|
||||
count: 1,
|
||||
errored_txs_compute_consumed: vec![],
|
||||
total_errored_units: 0,
|
||||
},
|
||||
);
|
||||
let update_count =
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
assert_eq!(1, update_count);
|
||||
assert_eq!(
|
||||
current_program_cost,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.find_instruction_cost(&program_key_1)
|
||||
);
|
||||
assert!(cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.is_empty());
|
||||
}
|
||||
|
||||
// Test updating cost model with only erroring compute costs where the `cost_per_error` is
|
||||
// greater than the current instruction cost for the program. Should update with the
|
||||
// new erroring compute costs
|
||||
let cost_per_error = 1000;
|
||||
// expected_cost = (mean + 2*std) of data points:
|
||||
// [
|
||||
// 100, // original program_cost
|
||||
// 1000, // cost_per_error
|
||||
// ]
|
||||
let expected_cost = 289u64;
|
||||
{
|
||||
let errored_txs_compute_consumed = vec![cost_per_error; 3];
|
||||
let total_errored_units = errored_txs_compute_consumed.iter().sum();
|
||||
@ -362,23 +367,29 @@ mod tests {
|
||||
total_errored_units,
|
||||
},
|
||||
);
|
||||
let update_count =
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
|
||||
assert_eq!(1, update_count);
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
assert_eq!(
|
||||
expected_cost,
|
||||
1,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.find_instruction_cost(&program_key_1)
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(
|
||||
Some(&cost_per_error),
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.get(&program_key_1)
|
||||
);
|
||||
}
|
||||
|
||||
// Test updating cost model with only erroring compute costs where the error cost is
|
||||
// `smaller_cost_per_error`, less than the current instruction cost for the program.
|
||||
// The cost should not decrease for these new lesser errors
|
||||
let smaller_cost_per_error = expected_cost - 10;
|
||||
let smaller_cost_per_error = cost_per_error - 10;
|
||||
{
|
||||
let errored_txs_compute_consumed = vec![smaller_cost_per_error; 3];
|
||||
let total_errored_units = errored_txs_compute_consumed.iter().sum();
|
||||
@ -392,23 +403,22 @@ mod tests {
|
||||
total_errored_units,
|
||||
},
|
||||
);
|
||||
let update_count =
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
|
||||
// expected_cost = (mean = 2*std) of data points:
|
||||
// [
|
||||
// 100, // original program cost,
|
||||
// 1000, // cost_per_error from above test
|
||||
// 289, // the smaller_cost_per_error will be coalesced to prev cost
|
||||
// ]
|
||||
let expected_cost = 293u64;
|
||||
assert_eq!(1, update_count);
|
||||
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
|
||||
assert_eq!(
|
||||
expected_cost,
|
||||
1,
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.find_instruction_cost(&program_key_1)
|
||||
.get_instruction_cost_table()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(
|
||||
Some(&cost_per_error),
|
||||
cost_model
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_instruction_cost_table()
|
||||
.get(&program_key_1)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -311,8 +311,12 @@ impl Tvu {
|
||||
);
|
||||
|
||||
let (cost_update_sender, cost_update_receiver) = channel();
|
||||
let cost_update_service =
|
||||
CostUpdateService::new(blockstore.clone(), cost_model.clone(), cost_update_receiver);
|
||||
let cost_update_service = CostUpdateService::new(
|
||||
exit.clone(),
|
||||
blockstore.clone(),
|
||||
cost_model.clone(),
|
||||
cost_update_receiver,
|
||||
);
|
||||
|
||||
let (drop_bank_sender, drop_bank_receiver) = channel();
|
||||
|
||||
|
Reference in New Issue
Block a user