1. Persist to blockstore less frequently;

2. Reduce alpha for EMA to 1 percent to have roughly 200 data points for estimation
This commit is contained in:
Tao Zhu
2022-02-04 18:57:02 -06:00
committed by Tao Zhu
parent 6587dbfa47
commit 7aa1fb4e24
4 changed files with 232 additions and 176 deletions

View File

@ -9,18 +9,16 @@ use {
solana_measure::measure::Measure,
solana_program_runtime::timings::ExecuteTimings,
solana_runtime::{bank::Bank, cost_model::CostModel},
solana_sdk::{pubkey::Pubkey, timing::timestamp},
solana_sdk::timing::timestamp,
std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
// Update blockstore persistence storage when accumulated cost_table updates count exceeds the threshold
const PERSIST_THRESHOLD: u64 = 1_000;
#[derive(Default)]
pub struct CostUpdateServiceTiming {
last_print: u64,
@ -32,20 +30,25 @@ pub struct CostUpdateServiceTiming {
impl CostUpdateServiceTiming {
fn update(
&mut self,
update_cost_model_count: u64,
update_cost_model_elapsed: u64,
persist_cost_table_elapsed: u64,
update_cost_model_count: Option<u64>,
update_cost_model_elapsed: Option<u64>,
persist_cost_table_elapsed: Option<u64>,
) {
self.update_cost_model_count += update_cost_model_count;
self.update_cost_model_elapsed += update_cost_model_elapsed;
self.persist_cost_table_elapsed += persist_cost_table_elapsed;
if let Some(update_cost_model_count) = update_cost_model_count {
self.update_cost_model_count += update_cost_model_count;
}
if let Some(update_cost_model_elapsed) = update_cost_model_elapsed {
self.update_cost_model_elapsed += update_cost_model_elapsed;
}
if let Some(persist_cost_table_elapsed) = persist_cost_table_elapsed {
self.persist_cost_table_elapsed += persist_cost_table_elapsed;
}
let now = timestamp();
let elapsed_ms = now - self.last_print;
if elapsed_ms > 1000 {
datapoint_info!(
"cost-update-service-stats",
("total_elapsed_us", elapsed_ms * 1000, i64),
(
"update_cost_model_count",
self.update_cost_model_count as i64,
@ -87,7 +90,6 @@ pub struct CostUpdateService {
impl CostUpdateService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
cost_model: Arc<RwLock<CostModel>>,
cost_update_receiver: CostUpdateReceiver,
@ -95,7 +97,7 @@ impl CostUpdateService {
let thread_hdl = Builder::new()
.name("solana-cost-update-service".to_string())
.spawn(move || {
Self::service_loop(exit, blockstore, cost_model, cost_update_receiver);
Self::service_loop(blockstore, cost_model, cost_update_receiver);
})
.unwrap();
@ -107,58 +109,53 @@ impl CostUpdateService {
}
fn service_loop(
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
cost_model: Arc<RwLock<CostModel>>,
cost_update_receiver: CostUpdateReceiver,
) {
let mut cost_update_service_timing = CostUpdateServiceTiming::default();
let mut update_count: u64;
let mut updated_program_costs = HashMap::<Pubkey, u64>::new();
let wait_timer = Duration::from_millis(100);
let mut update_count = 0_u64;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
for cost_update in cost_update_receiver.iter() {
match cost_update {
CostUpdate::FrozenBank { bank } => {
bank.read_cost_tracker().unwrap().report_stats(bank.slot());
}
CostUpdate::ExecuteTiming {
mut execute_timings,
} => {
let mut update_cost_model_time = Measure::start("update_cost_model_time");
update_count += Self::update_cost_model(&cost_model, &mut execute_timings);
update_cost_model_time.stop();
cost_update_service_timing.update(
Some(update_count),
Some(update_cost_model_time.as_us()),
None,
);
update_count = 0_u64;
let mut update_cost_model_time = Measure::start("update_cost_model_time");
for cost_update in cost_update_receiver.try_iter() {
match cost_update {
CostUpdate::FrozenBank { bank } => {
bank.read_cost_tracker().unwrap().report_stats(bank.slot());
}
CostUpdate::ExecuteTiming {
mut execute_timings,
} => {
updated_program_costs =
Self::update_cost_model(&cost_model, &mut execute_timings);
update_count += 1;
if update_count > PERSIST_THRESHOLD {
let mut persist_cost_table_time = Measure::start("persist_cost_table_time");
Self::persist_cost_table(&blockstore, &cost_model);
update_count = 0_u64;
persist_cost_table_time.stop();
cost_update_service_timing.update(
None,
None,
Some(persist_cost_table_time.as_us()),
);
}
}
}
update_cost_model_time.stop();
let mut persist_cost_table_time = Measure::start("persist_cost_table_time");
Self::persist_cost_table(&blockstore, &updated_program_costs);
persist_cost_table_time.stop();
cost_update_service_timing.update(
update_count,
update_cost_model_time.as_us(),
persist_cost_table_time.as_us(),
);
thread::sleep(wait_timer);
}
}
// Normalize `program_timings` with current estimated cost, update instruction_cost table
// Returns number of updates applied
fn update_cost_model(
cost_model: &RwLock<CostModel>,
execute_timings: &mut ExecuteTimings,
) -> HashMap<Pubkey, u64> {
let mut updated_program_costs = HashMap::<Pubkey, u64>::new();
) -> u64 {
let mut update_count = 0_u64;
for (program_id, program_timings) in &mut execute_timings.details.per_program_timings {
let current_estimated_program_cost =
cost_model.read().unwrap().find_instruction_cost(program_id);
@ -169,50 +166,42 @@ impl CostUpdateService {
}
let units = program_timings.accumulated_units / program_timings.count as u64;
match cost_model
cost_model
.write()
.unwrap()
.upsert_instruction_cost(program_id, units)
{
Ok(cost) => {
debug!(
"after replayed into bank, instruction {:?} has averaged cost {}",
program_id, cost
);
updated_program_costs.insert(*program_id, cost);
}
Err(err) => {
debug!(
"after replayed into bank, instruction {:?} failed to update cost, err: {}",
program_id, err
);
}
}
.upsert_instruction_cost(program_id, units);
update_count += 1;
debug!(
"After replayed into bank, updated cost for instruction {:?}, update_value {}, pre_aggregated_value {}",
program_id, units, current_estimated_program_cost
);
}
updated_program_costs
update_count
}
fn persist_cost_table(blockstore: &Blockstore, updated_program_costs: &HashMap<Pubkey, u64>) {
if updated_program_costs.is_empty() {
return;
}
// 1. Remove obsolete program entries from persisted table to limit its size
// 2. Update persisted program cost. This involves EMA cost calculation at
// execute_cost_table.get_cost()
fn persist_cost_table(blockstore: &Blockstore, cost_model: &RwLock<CostModel>) {
let db_records = blockstore.read_program_costs().expect("read programs");
let cost_model = cost_model.read().unwrap();
let active_program_keys = cost_model.get_program_keys();
// delete records from blockstore if they are no longer in cost_table
db_records.iter().for_each(|(pubkey, _)| {
if !updated_program_costs.contains_key(pubkey) {
if !active_program_keys.contains(&pubkey) {
blockstore
.delete_program_cost(pubkey)
.expect("delete old program");
}
});
for (key, cost) in updated_program_costs.iter() {
active_program_keys.iter().for_each(|program_id| {
let cost = cost_model.find_instruction_cost(program_id);
blockstore
.write_program_cost(key, cost)
.write_program_cost(program_id, &cost)
.expect("persist program costs to blockstore");
}
});
}
}
@ -224,9 +213,9 @@ mod tests {
fn test_update_cost_model_with_empty_execute_timings() {
let cost_model = Arc::new(RwLock::new(CostModel::default()));
let mut empty_execute_timings = ExecuteTimings::default();
assert!(
CostUpdateService::update_cost_model(&cost_model, &mut empty_execute_timings)
.is_empty()
assert_eq!(
CostUpdateService::update_cost_model(&cost_model, &mut empty_execute_timings),
0
);
}
@ -256,12 +245,15 @@ mod tests {
total_errored_units,
},
);
let updated_program_costs =
let update_count =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
assert_eq!(1, updated_program_costs.len());
assert_eq!(1, update_count);
assert_eq!(
Some(&expected_cost),
updated_program_costs.get(&program_key_1)
expected_cost,
cost_model
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
);
}
@ -270,8 +262,8 @@ mod tests {
let accumulated_us: u64 = 2000;
let accumulated_units: u64 = 200;
let count: u32 = 10;
// to expect new cost = (mean + 2 * std)
expected_cost = 24;
// to expect new cost = (mean + 2 * std) of [10, 20]
expected_cost = 13;
execute_timings.details.per_program_timings.insert(
program_key_1,
@ -283,12 +275,15 @@ mod tests {
total_errored_units: 0,
},
);
let updated_program_costs =
let update_count =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
assert_eq!(1, updated_program_costs.len());
assert_eq!(1, update_count);
assert_eq!(
Some(&expected_cost),
updated_program_costs.get(&program_key_1)
expected_cost,
cost_model
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
);
}
}
@ -314,8 +309,9 @@ mod tests {
);
// If both the `errored_txs_compute_consumed` is empty and `count == 0`, then
// nothing should be inserted into the cost model
assert!(
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings).is_empty()
assert_eq!(
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings),
0
);
}
@ -332,12 +328,15 @@ mod tests {
total_errored_units: 0,
},
);
let updated_program_costs =
let update_count =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
assert_eq!(1, updated_program_costs.len());
assert_eq!(1, update_count);
assert_eq!(
Some(&current_program_cost),
updated_program_costs.get(&program_key_1)
current_program_cost,
cost_model
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
);
}
@ -345,6 +344,12 @@ mod tests {
// greater than the current instruction cost for the program. Should update with the
// new erroring compute costs
let cost_per_error = 1000;
// expected_cost = (mean + 2*std) of data points:
// [
// 100, // original program_cost
// 1000, // cost_per_error
// ]
let expected_cost = 289u64;
{
let errored_txs_compute_consumed = vec![cost_per_error; 3];
let total_errored_units = errored_txs_compute_consumed.iter().sum();
@ -358,26 +363,23 @@ mod tests {
total_errored_units,
},
);
let updated_program_costs =
let update_count =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
// expected_cost = (mean + 2*std) of data points:
// [
// 100, // original program_cost
// 1000, // cost_per_error
// ]
let expected_cost = 1342u64;
assert_eq!(1, updated_program_costs.len());
assert_eq!(1, update_count);
assert_eq!(
Some(&expected_cost),
updated_program_costs.get(&program_key_1)
expected_cost,
cost_model
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
);
}
// Test updating cost model with only erroring compute costs where the error cost is
// `smaller_cost_per_error`, less than the current instruction cost for the program.
// The cost should not decrease for these new lesser errors
let smaller_cost_per_error = cost_per_error - 10;
let smaller_cost_per_error = expected_cost - 10;
{
let errored_txs_compute_consumed = vec![smaller_cost_per_error; 3];
let total_errored_units = errored_txs_compute_consumed.iter().sum();
@ -391,20 +393,23 @@ mod tests {
total_errored_units,
},
);
let updated_program_costs =
let update_count =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
// expected_cost = (mean + 2*std) of data points:
// [
// 100, // original program cost,
// 1000, // cost_per_error from above test
// 1450, // the smaller_cost_per_error will be coalesced to prev cost
// 289, // the smaller_cost_per_error will be coalesced to prev cost
// ]
let expected_cost = 1915u64;
assert_eq!(1, updated_program_costs.len());
let expected_cost = 293u64;
assert_eq!(1, update_count);
assert_eq!(
Some(&expected_cost),
updated_program_costs.get(&program_key_1)
expected_cost,
cost_model
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
);
}
}

View File

@ -305,12 +305,8 @@ impl Tvu {
);
let (cost_update_sender, cost_update_receiver) = unbounded();
let cost_update_service = CostUpdateService::new(
exit.clone(),
blockstore.clone(),
cost_model.clone(),
cost_update_receiver,
);
let cost_update_service =
CostUpdateService::new(blockstore.clone(), cost_model.clone(), cost_update_receiver);
let (drop_bank_sender, drop_bank_receiver) = unbounded();