Revert "1. Persist to blockstore less frequently;"

This reverts commit 7aa1fb4e24.
This commit is contained in:
Carl Lin
2022-03-08 17:59:59 -05:00
committed by Michael Vines
parent 0a17edcc1f
commit c878c9e2cb
4 changed files with 176 additions and 232 deletions

View File

@ -9,16 +9,18 @@ use {
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_program_runtime::timings::ExecuteTimings, solana_program_runtime::timings::ExecuteTimings,
solana_runtime::{bank::Bank, cost_model::CostModel}, solana_runtime::{bank::Bank, cost_model::CostModel},
solana_sdk::timing::timestamp, solana_sdk::{pubkey::Pubkey, timing::timestamp},
std::{ std::{
sync::{Arc, RwLock}, collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration,
}, },
}; };
// Update blockstore persistence storage when accumulated cost_table updates count exceeds the threshold
const PERSIST_THRESHOLD: u64 = 1_000;
#[derive(Default)] #[derive(Default)]
pub struct CostUpdateServiceTiming { pub struct CostUpdateServiceTiming {
last_print: u64, last_print: u64,
@ -30,25 +32,20 @@ pub struct CostUpdateServiceTiming {
impl CostUpdateServiceTiming { impl CostUpdateServiceTiming {
fn update( fn update(
&mut self, &mut self,
update_cost_model_count: Option<u64>, update_cost_model_count: u64,
update_cost_model_elapsed: Option<u64>, update_cost_model_elapsed: u64,
persist_cost_table_elapsed: Option<u64>, persist_cost_table_elapsed: u64,
) { ) {
if let Some(update_cost_model_count) = update_cost_model_count { self.update_cost_model_count += update_cost_model_count;
self.update_cost_model_count += update_cost_model_count; self.update_cost_model_elapsed += update_cost_model_elapsed;
} self.persist_cost_table_elapsed += persist_cost_table_elapsed;
if let Some(update_cost_model_elapsed) = update_cost_model_elapsed {
self.update_cost_model_elapsed += update_cost_model_elapsed;
}
if let Some(persist_cost_table_elapsed) = persist_cost_table_elapsed {
self.persist_cost_table_elapsed += persist_cost_table_elapsed;
}
let now = timestamp(); let now = timestamp();
let elapsed_ms = now - self.last_print; let elapsed_ms = now - self.last_print;
if elapsed_ms > 1000 { if elapsed_ms > 1000 {
datapoint_info!( datapoint_info!(
"cost-update-service-stats", "cost-update-service-stats",
("total_elapsed_us", elapsed_ms * 1000, i64),
( (
"update_cost_model_count", "update_cost_model_count",
self.update_cost_model_count as i64, self.update_cost_model_count as i64,
@ -90,6 +87,7 @@ pub struct CostUpdateService {
impl CostUpdateService { impl CostUpdateService {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new( pub fn new(
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
cost_model: Arc<RwLock<CostModel>>, cost_model: Arc<RwLock<CostModel>>,
cost_update_receiver: CostUpdateReceiver, cost_update_receiver: CostUpdateReceiver,
@ -97,7 +95,7 @@ impl CostUpdateService {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-cost-update-service".to_string()) .name("solana-cost-update-service".to_string())
.spawn(move || { .spawn(move || {
Self::service_loop(blockstore, cost_model, cost_update_receiver); Self::service_loop(exit, blockstore, cost_model, cost_update_receiver);
}) })
.unwrap(); .unwrap();
@ -109,53 +107,58 @@ impl CostUpdateService {
} }
fn service_loop( fn service_loop(
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
cost_model: Arc<RwLock<CostModel>>, cost_model: Arc<RwLock<CostModel>>,
cost_update_receiver: CostUpdateReceiver, cost_update_receiver: CostUpdateReceiver,
) { ) {
let mut cost_update_service_timing = CostUpdateServiceTiming::default(); let mut cost_update_service_timing = CostUpdateServiceTiming::default();
let mut update_count = 0_u64; let mut update_count: u64;
let mut updated_program_costs = HashMap::<Pubkey, u64>::new();
let wait_timer = Duration::from_millis(100);
for cost_update in cost_update_receiver.iter() { loop {
match cost_update { if exit.load(Ordering::Relaxed) {
CostUpdate::FrozenBank { bank } => { break;
bank.read_cost_tracker().unwrap().report_stats(bank.slot()); }
}
CostUpdate::ExecuteTiming {
mut execute_timings,
} => {
let mut update_cost_model_time = Measure::start("update_cost_model_time");
update_count += Self::update_cost_model(&cost_model, &mut execute_timings);
update_cost_model_time.stop();
cost_update_service_timing.update(
Some(update_count),
Some(update_cost_model_time.as_us()),
None,
);
if update_count > PERSIST_THRESHOLD { update_count = 0_u64;
let mut persist_cost_table_time = Measure::start("persist_cost_table_time"); let mut update_cost_model_time = Measure::start("update_cost_model_time");
Self::persist_cost_table(&blockstore, &cost_model); for cost_update in cost_update_receiver.try_iter() {
update_count = 0_u64; match cost_update {
persist_cost_table_time.stop(); CostUpdate::FrozenBank { bank } => {
cost_update_service_timing.update( bank.read_cost_tracker().unwrap().report_stats(bank.slot());
None, }
None, CostUpdate::ExecuteTiming {
Some(persist_cost_table_time.as_us()), mut execute_timings,
); } => {
updated_program_costs =
Self::update_cost_model(&cost_model, &mut execute_timings);
update_count += 1;
} }
} }
} }
update_cost_model_time.stop();
let mut persist_cost_table_time = Measure::start("persist_cost_table_time");
Self::persist_cost_table(&blockstore, &updated_program_costs);
persist_cost_table_time.stop();
cost_update_service_timing.update(
update_count,
update_cost_model_time.as_us(),
persist_cost_table_time.as_us(),
);
thread::sleep(wait_timer);
} }
} }
// Normalize `program_timings` with current estimated cost, update instruction_cost table
// Returns number of updates applied
fn update_cost_model( fn update_cost_model(
cost_model: &RwLock<CostModel>, cost_model: &RwLock<CostModel>,
execute_timings: &mut ExecuteTimings, execute_timings: &mut ExecuteTimings,
) -> u64 { ) -> HashMap<Pubkey, u64> {
let mut update_count = 0_u64; let mut updated_program_costs = HashMap::<Pubkey, u64>::new();
for (program_id, program_timings) in &mut execute_timings.details.per_program_timings { for (program_id, program_timings) in &mut execute_timings.details.per_program_timings {
let current_estimated_program_cost = let current_estimated_program_cost =
cost_model.read().unwrap().find_instruction_cost(program_id); cost_model.read().unwrap().find_instruction_cost(program_id);
@ -166,42 +169,50 @@ impl CostUpdateService {
} }
let units = program_timings.accumulated_units / program_timings.count as u64; let units = program_timings.accumulated_units / program_timings.count as u64;
cost_model match cost_model
.write() .write()
.unwrap() .unwrap()
.upsert_instruction_cost(program_id, units); .upsert_instruction_cost(program_id, units)
update_count += 1; {
debug!( Ok(cost) => {
"After replayed into bank, updated cost for instruction {:?}, update_value {}, pre_aggregated_value {}", debug!(
program_id, units, current_estimated_program_cost "after replayed into bank, instruction {:?} has averaged cost {}",
); program_id, cost
);
updated_program_costs.insert(*program_id, cost);
}
Err(err) => {
debug!(
"after replayed into bank, instruction {:?} failed to update cost, err: {}",
program_id, err
);
}
}
} }
update_count updated_program_costs
} }
// 1. Remove obsolete program entries from persisted table to limit its size fn persist_cost_table(blockstore: &Blockstore, updated_program_costs: &HashMap<Pubkey, u64>) {
// 2. Update persisted program cost. This involves EMA cost calculation at if updated_program_costs.is_empty() {
// execute_cost_table.get_cost() return;
fn persist_cost_table(blockstore: &Blockstore, cost_model: &RwLock<CostModel>) { }
let db_records = blockstore.read_program_costs().expect("read programs"); let db_records = blockstore.read_program_costs().expect("read programs");
let cost_model = cost_model.read().unwrap();
let active_program_keys = cost_model.get_program_keys();
// delete records from blockstore if they are no longer in cost_table // delete records from blockstore if they are no longer in cost_table
db_records.iter().for_each(|(pubkey, _)| { db_records.iter().for_each(|(pubkey, _)| {
if !active_program_keys.contains(&pubkey) { if !updated_program_costs.contains_key(pubkey) {
blockstore blockstore
.delete_program_cost(pubkey) .delete_program_cost(pubkey)
.expect("delete old program"); .expect("delete old program");
} }
}); });
active_program_keys.iter().for_each(|program_id| { for (key, cost) in updated_program_costs.iter() {
let cost = cost_model.find_instruction_cost(program_id);
blockstore blockstore
.write_program_cost(program_id, &cost) .write_program_cost(key, cost)
.expect("persist program costs to blockstore"); .expect("persist program costs to blockstore");
}); }
} }
} }
@ -213,9 +224,9 @@ mod tests {
fn test_update_cost_model_with_empty_execute_timings() { fn test_update_cost_model_with_empty_execute_timings() {
let cost_model = Arc::new(RwLock::new(CostModel::default())); let cost_model = Arc::new(RwLock::new(CostModel::default()));
let mut empty_execute_timings = ExecuteTimings::default(); let mut empty_execute_timings = ExecuteTimings::default();
assert_eq!( assert!(
CostUpdateService::update_cost_model(&cost_model, &mut empty_execute_timings), CostUpdateService::update_cost_model(&cost_model, &mut empty_execute_timings)
0 .is_empty()
); );
} }
@ -245,15 +256,12 @@ mod tests {
total_errored_units, total_errored_units,
}, },
); );
let update_count = let updated_program_costs =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings); CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
assert_eq!(1, update_count); assert_eq!(1, updated_program_costs.len());
assert_eq!( assert_eq!(
expected_cost, Some(&expected_cost),
cost_model updated_program_costs.get(&program_key_1)
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
); );
} }
@ -262,8 +270,8 @@ mod tests {
let accumulated_us: u64 = 2000; let accumulated_us: u64 = 2000;
let accumulated_units: u64 = 200; let accumulated_units: u64 = 200;
let count: u32 = 10; let count: u32 = 10;
// to expect new cost = (mean + 2 * std) of [10, 20] // to expect new cost = (mean + 2 * std)
expected_cost = 13; expected_cost = 24;
execute_timings.details.per_program_timings.insert( execute_timings.details.per_program_timings.insert(
program_key_1, program_key_1,
@ -275,15 +283,12 @@ mod tests {
total_errored_units: 0, total_errored_units: 0,
}, },
); );
let update_count = let updated_program_costs =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings); CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
assert_eq!(1, update_count); assert_eq!(1, updated_program_costs.len());
assert_eq!( assert_eq!(
expected_cost, Some(&expected_cost),
cost_model updated_program_costs.get(&program_key_1)
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
); );
} }
} }
@ -309,9 +314,8 @@ mod tests {
); );
// If both the `errored_txs_compute_consumed` is empty and `count == 0`, then // If both the `errored_txs_compute_consumed` is empty and `count == 0`, then
// nothing should be inserted into the cost model // nothing should be inserted into the cost model
assert_eq!( assert!(
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings), CostUpdateService::update_cost_model(&cost_model, &mut execute_timings).is_empty()
0
); );
} }
@ -328,15 +332,12 @@ mod tests {
total_errored_units: 0, total_errored_units: 0,
}, },
); );
let update_count = let updated_program_costs =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings); CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
assert_eq!(1, update_count); assert_eq!(1, updated_program_costs.len());
assert_eq!( assert_eq!(
current_program_cost, Some(&current_program_cost),
cost_model updated_program_costs.get(&program_key_1)
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
); );
} }
@ -344,12 +345,6 @@ mod tests {
// greater than the current instruction cost for the program. Should update with the // greater than the current instruction cost for the program. Should update with the
// new erroring compute costs // new erroring compute costs
let cost_per_error = 1000; let cost_per_error = 1000;
// expected_cost = (mean + 2*std) of data points:
// [
// 100, // original program_cost
// 1000, // cost_per_error
// ]
let expected_cost = 289u64;
{ {
let errored_txs_compute_consumed = vec![cost_per_error; 3]; let errored_txs_compute_consumed = vec![cost_per_error; 3];
let total_errored_units = errored_txs_compute_consumed.iter().sum(); let total_errored_units = errored_txs_compute_consumed.iter().sum();
@ -363,23 +358,26 @@ mod tests {
total_errored_units, total_errored_units,
}, },
); );
let update_count = let updated_program_costs =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings); CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
assert_eq!(1, update_count); // expected_cost = (mean + 2*std) of data points:
// [
// 100, // original program_cost
// 1000, // cost_per_error
// ]
let expected_cost = 1342u64;
assert_eq!(1, updated_program_costs.len());
assert_eq!( assert_eq!(
expected_cost, Some(&expected_cost),
cost_model updated_program_costs.get(&program_key_1)
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
); );
} }
// Test updating cost model with only erroring compute costs where the error cost is // Test updating cost model with only erroring compute costs where the error cost is
// `smaller_cost_per_error`, less than the current instruction cost for the program. // `smaller_cost_per_error`, less than the current instruction cost for the program.
// The cost should not decrease for these new lesser errors // The cost should not decrease for these new lesser errors
let smaller_cost_per_error = expected_cost - 10; let smaller_cost_per_error = cost_per_error - 10;
{ {
let errored_txs_compute_consumed = vec![smaller_cost_per_error; 3]; let errored_txs_compute_consumed = vec![smaller_cost_per_error; 3];
let total_errored_units = errored_txs_compute_consumed.iter().sum(); let total_errored_units = errored_txs_compute_consumed.iter().sum();
@ -393,23 +391,20 @@ mod tests {
total_errored_units, total_errored_units,
}, },
); );
let update_count = let updated_program_costs =
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings); CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
// expected_cost = (mean + 2*std) of data points: // expected_cost = (mean + 2*std) of data points:
// [ // [
// 100, // original program cost, // 100, // original program cost,
// 1000, // cost_per_error from above test // 1000, // cost_per_error from above test
// 289, // the smaller_cost_per_error will be coalesced to prev cost // 1450, // the smaller_cost_per_error will be coalesced to prev cost
// ] // ]
let expected_cost = 293u64; let expected_cost = 1915u64;
assert_eq!(1, update_count); assert_eq!(1, updated_program_costs.len());
assert_eq!( assert_eq!(
expected_cost, Some(&expected_cost),
cost_model updated_program_costs.get(&program_key_1)
.read()
.unwrap()
.find_instruction_cost(&program_key_1)
); );
} }
} }

View File

@ -309,8 +309,12 @@ impl Tvu {
); );
let (cost_update_sender, cost_update_receiver) = unbounded(); let (cost_update_sender, cost_update_receiver) = unbounded();
let cost_update_service = let cost_update_service = CostUpdateService::new(
CostUpdateService::new(blockstore.clone(), cost_model.clone(), cost_update_receiver); exit.clone(),
blockstore.clone(),
cost_model.clone(),
cost_update_receiver,
);
let (drop_bank_sender, drop_bank_receiver) = unbounded(); let (drop_bank_sender, drop_bank_receiver) = unbounded();

View File

@ -96,11 +96,17 @@ impl CostModel {
tx_cost tx_cost
} }
// update-or-insert op is always successful. However the result of upsert, eg the aggregated pub fn upsert_instruction_cost(
// value, requires additional calculation, which should only be envoked when needed. &mut self,
pub fn upsert_instruction_cost(&mut self, program_key: &Pubkey, cost: u64) { program_key: &Pubkey,
cost: u64,
) -> Result<u64, &'static str> {
self.instruction_execution_cost_table self.instruction_execution_cost_table
.upsert(program_key, cost); .upsert(program_key, cost);
match self.instruction_execution_cost_table.get_cost(program_key) {
Some(cost) => Ok(cost),
None => Err("failed to upsert to ExecuteCostTable"),
}
} }
pub fn find_instruction_cost(&self, program_key: &Pubkey) -> u64 { pub fn find_instruction_cost(&self, program_key: &Pubkey) -> u64 {
@ -109,7 +115,7 @@ impl CostModel {
None => { None => {
let default_value = self.instruction_execution_cost_table.get_default(); let default_value = self.instruction_execution_cost_table.get_default();
debug!( debug!(
"instruction {:?} does not have aggregated cost, using default {}", "Program key {:?} does not have assigned cost, using default value {}",
program_key, default_value program_key, default_value
); );
default_value default_value
@ -117,10 +123,6 @@ impl CostModel {
} }
} }
pub fn get_program_keys(&self) -> Vec<&Pubkey> {
self.instruction_execution_cost_table.get_program_keys()
}
fn get_signature_cost(&self, transaction: &SanitizedTransaction) -> u64 { fn get_signature_cost(&self, transaction: &SanitizedTransaction) -> u64 {
transaction.signatures().len() as u64 * SIGNATURE_COST transaction.signatures().len() as u64 * SIGNATURE_COST
} }
@ -244,7 +246,6 @@ mod tests {
transaction::Transaction, transaction::Transaction,
}, },
std::{ std::{
collections::HashMap,
str::FromStr, str::FromStr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
thread::{self, JoinHandle}, thread::{self, JoinHandle},
@ -268,11 +269,13 @@ mod tests {
let mut testee = CostModel::default(); let mut testee = CostModel::default();
let known_key = Pubkey::from_str("known11111111111111111111111111111111111111").unwrap(); let known_key = Pubkey::from_str("known11111111111111111111111111111111111111").unwrap();
testee.upsert_instruction_cost(&known_key, 100); testee.upsert_instruction_cost(&known_key, 100).unwrap();
// find cost for known programs // find cost for known programs
assert_eq!(100, testee.find_instruction_cost(&known_key)); assert_eq!(100, testee.find_instruction_cost(&known_key));
testee.upsert_instruction_cost(&bpf_loader::id(), 1999); testee
.upsert_instruction_cost(&bpf_loader::id(), 1999)
.unwrap();
assert_eq!(1999, testee.find_instruction_cost(&bpf_loader::id())); assert_eq!(1999, testee.find_instruction_cost(&bpf_loader::id()));
// unknown program is assigned with default cost // unknown program is assigned with default cost
@ -284,35 +287,6 @@ mod tests {
); );
} }
#[test]
fn test_iterating_instruction_cost_by_program_keys() {
solana_logger::setup();
let mut testee = CostModel::default();
let mut test_key_and_cost = HashMap::<Pubkey, u64>::new();
(0u64..10u64).for_each(|n| {
test_key_and_cost.insert(Pubkey::new_unique(), n);
});
test_key_and_cost.iter().for_each(|(key, cost)| {
testee.upsert_instruction_cost(key, *cost);
info!("key {:?} cost {}", key, cost);
});
let keys = testee.get_program_keys();
// verify each key has pre-set value
keys.iter().for_each(|key| {
let expected_cost = test_key_and_cost.get(key).unwrap();
info!(
"check key {:?} expect {} find {}",
key,
expected_cost,
testee.find_instruction_cost(key)
);
assert_eq!(*expected_cost, testee.find_instruction_cost(key));
});
}
#[test] #[test]
fn test_cost_model_data_len_cost() { fn test_cost_model_data_len_cost() {
let lamports = 0; let lamports = 0;
@ -377,7 +351,9 @@ mod tests {
let expected_cost = 8; let expected_cost = 8;
let mut testee = CostModel::default(); let mut testee = CostModel::default();
testee.upsert_instruction_cost(&system_program::id(), expected_cost); testee
.upsert_instruction_cost(&system_program::id(), expected_cost)
.unwrap();
assert_eq!( assert_eq!(
expected_cost, expected_cost,
testee.get_transaction_cost(&simple_transaction) testee.get_transaction_cost(&simple_transaction)
@ -405,7 +381,9 @@ mod tests {
let expected_cost = program_cost * 2; let expected_cost = program_cost * 2;
let mut testee = CostModel::default(); let mut testee = CostModel::default();
testee.upsert_instruction_cost(&system_program::id(), program_cost); testee
.upsert_instruction_cost(&system_program::id(), program_cost)
.unwrap();
assert_eq!(expected_cost, testee.get_transaction_cost(&tx)); assert_eq!(expected_cost, testee.get_transaction_cost(&tx));
} }
@ -486,7 +464,7 @@ mod tests {
); );
// insert instruction cost to table // insert instruction cost to table
cost_model.upsert_instruction_cost(&key1, cost1); assert!(cost_model.upsert_instruction_cost(&key1, cost1).is_ok());
// now it is known instruction with known cost // now it is known instruction with known cost
assert_eq!(cost1, cost_model.find_instruction_cost(&key1)); assert_eq!(cost1, cost_model.find_instruction_cost(&key1));
@ -506,7 +484,9 @@ mod tests {
let expected_execution_cost = 8; let expected_execution_cost = 8;
let mut cost_model = CostModel::default(); let mut cost_model = CostModel::default();
cost_model.upsert_instruction_cost(&system_program::id(), expected_execution_cost); cost_model
.upsert_instruction_cost(&system_program::id(), expected_execution_cost)
.unwrap();
let tx_cost = cost_model.calculate_cost(&tx); let tx_cost = cost_model.calculate_cost(&tx);
assert_eq!(expected_account_cost, tx_cost.write_lock_cost); assert_eq!(expected_account_cost, tx_cost.write_lock_cost);
assert_eq!(expected_execution_cost, tx_cost.execution_cost); assert_eq!(expected_execution_cost, tx_cost.execution_cost);
@ -518,17 +498,17 @@ mod tests {
let key1 = Pubkey::new_unique(); let key1 = Pubkey::new_unique();
let cost1 = 100; let cost1 = 100;
let cost2 = 200; let cost2 = 200;
// updated_cost = (mean + 2*std) of [100, 200] => 120.899 // updated_cost = (mean + 2*std)
let updated_cost = 121; let updated_cost = 238;
let mut cost_model = CostModel::default(); let mut cost_model = CostModel::default();
// insert instruction cost to table // insert instruction cost to table
cost_model.upsert_instruction_cost(&key1, cost1); assert!(cost_model.upsert_instruction_cost(&key1, cost1).is_ok());
assert_eq!(cost1, cost_model.find_instruction_cost(&key1)); assert_eq!(cost1, cost_model.find_instruction_cost(&key1));
// update instruction cost // update instruction cost
cost_model.upsert_instruction_cost(&key1, cost2); assert!(cost_model.upsert_instruction_cost(&key1, cost2).is_ok());
assert_eq!(updated_cost, cost_model.find_instruction_cost(&key1)); assert_eq!(updated_cost, cost_model.find_instruction_cost(&key1));
} }
@ -570,8 +550,8 @@ mod tests {
if i == 5 { if i == 5 {
thread::spawn(move || { thread::spawn(move || {
let mut cost_model = cost_model.write().unwrap(); let mut cost_model = cost_model.write().unwrap();
cost_model.upsert_instruction_cost(&prog1, cost1); assert!(cost_model.upsert_instruction_cost(&prog1, cost1).is_ok());
cost_model.upsert_instruction_cost(&prog2, cost2); assert!(cost_model.upsert_instruction_cost(&prog2, cost2).is_ok());
}) })
} else { } else {
thread::spawn(move || { thread::spawn(move || {

View File

@ -4,10 +4,7 @@
/// When its capacity limit is reached, it prunes old and less-used programs /// When its capacity limit is reached, it prunes old and less-used programs
/// to make room for new ones. /// to make room for new ones.
use log::*; use log::*;
use { use {solana_sdk::pubkey::Pubkey, std::collections::HashMap};
solana_sdk::pubkey::Pubkey,
std::collections::{hash_map::Entry, HashMap},
};
// prune is rather expensive op, free up bulk space in each operation // prune is rather expensive op, free up bulk space in each operation
// would be more efficient. PRUNE_RATIO defines the after prune table // would be more efficient. PRUNE_RATIO defines the after prune table
@ -21,8 +18,7 @@ const DEFAULT_CAPACITY: usize = 1024;
// The coefficient represents the degree of weighting decrease in EMA, // The coefficient represents the degree of weighting decrease in EMA,
// a constant smoothing factor between 0 and 1. A higher alpha // a constant smoothing factor between 0 and 1. A higher alpha
// discounts older observations faster. // discounts older observations faster.
// Setting it using `2/(N+1)` where N is 200 samples const COEFFICIENT: f64 = 0.4;
const COEFFICIENT: f64 = 0.01;
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct AggregatedVarianceStats { struct AggregatedVarianceStats {
@ -57,27 +53,19 @@ impl ExecuteCostTable {
self.table.len() self.table.len()
} }
// default program cost to max // default prorgam cost to max
pub fn get_default(&self) -> u64 { pub fn get_default(&self) -> u64 {
// default max compute units per program // default max comoute units per program
200_000u64 200_000u64
} }
// returns None if program doesn't exist in table. In this case, // returns None if program doesn't exist in table. In this case,
// it is advised to call `get_default()` for default program cost. // it is advised to call `get_default()` for default program costdefault/
// Program cost is estimated as 2 standard deviations above mean, eg // Program cost is estimated as 2 standard deviations above mean, eg
// cost = (mean + 2 * std) // cost = (mean + 2 * std)
pub fn get_cost(&self, key: &Pubkey) -> Option<u64> { pub fn get_cost(&self, key: &Pubkey) -> Option<u64> {
let aggregated = self.table.get(key)?; let aggregated = self.table.get(key)?;
let cost_f64 = (aggregated.ema + 2.0 * aggregated.ema_var.sqrt()).ceil(); Some((aggregated.ema + 2.0 * aggregated.ema_var.sqrt()).ceil() as u64)
// check if cost:f64 can be losslessly convert to u64, otherwise return None
let cost_u64 = cost_f64 as u64;
if cost_f64 == cost_u64 as f64 {
Some(cost_u64)
} else {
None
}
} }
pub fn upsert(&mut self, key: &Pubkey, value: u64) { pub fn upsert(&mut self, key: &Pubkey, value: u64) {
@ -89,21 +77,21 @@ impl ExecuteCostTable {
// exponential moving average algorithm // exponential moving average algorithm
// https://en.wikipedia.org/wiki/Moving_average#Exponentially_weighted_moving_variance_and_standard_deviation // https://en.wikipedia.org/wiki/Moving_average#Exponentially_weighted_moving_variance_and_standard_deviation
match self.table.entry(*key) { if self.table.contains_key(key) {
Entry::Occupied(mut entry) => { let aggregated = self.table.get_mut(key).unwrap();
let aggregated = entry.get_mut(); let theta = value as f64 - aggregated.ema;
let theta = value as f64 - aggregated.ema; aggregated.ema += theta * COEFFICIENT;
aggregated.ema += theta * COEFFICIENT; aggregated.ema_var =
aggregated.ema_var = (1.0 - COEFFICIENT) * (aggregated.ema_var + COEFFICIENT * theta * theta)
(1.0 - COEFFICIENT) * (aggregated.ema_var + COEFFICIENT * theta * theta); } else {
} // the starting values
Entry::Vacant(entry) => { self.table.insert(
// the starting values *key,
entry.insert(AggregatedVarianceStats { AggregatedVarianceStats {
ema: value as f64, ema: value as f64,
ema_var: 0.0, ema_var: 0.0,
}); },
} );
} }
let (count, timestamp) = self let (count, timestamp) = self
@ -114,10 +102,6 @@ impl ExecuteCostTable {
*timestamp = Self::micros_since_epoch(); *timestamp = Self::micros_since_epoch();
} }
pub fn get_program_keys(&self) -> Vec<&Pubkey> {
self.table.keys().collect()
}
// prune the old programs so the table contains `new_size` of records, // prune the old programs so the table contains `new_size` of records,
// where `old` is defined as weighted age, which is negatively correlated // where `old` is defined as weighted age, which is negatively correlated
// with program's age and // with program's age and
@ -205,9 +189,9 @@ mod tests {
let key2 = Pubkey::new_unique(); let key2 = Pubkey::new_unique();
let key3 = Pubkey::new_unique(); let key3 = Pubkey::new_unique();
// simulate a lot of occurrences to key1, so even there're longer than // simulate a lot of occurences to key1, so even there're longer than
// usual delay between upsert(key1..) and upsert(key2, ..), test // usual delay between upsert(key1..) and upsert(key2, ..), test
// would still satisfy as key1 has enough occurrences to compensate // would still satisfy as key1 has enough occurences to compensate
// its age. // its age.
for i in 0..1000 { for i in 0..1000 {
testee.upsert(&key1, i); testee.upsert(&key1, i);
@ -251,8 +235,8 @@ mod tests {
// update 1st record // update 1st record
testee.upsert(&key1, cost2); testee.upsert(&key1, cost2);
assert_eq!(2, testee.get_count()); assert_eq!(2, testee.get_count());
// expected key1 cost is EMA of [100, 110] with alpha=0.01 => 103 // expected key1 cost = (mean + 2*std) = (105 + 2*5) = 115
let expected_cost = 103; let expected_cost = 114;
assert_eq!(expected_cost, testee.get_cost(&key1).unwrap()); assert_eq!(expected_cost, testee.get_cost(&key1).unwrap());
assert_eq!(cost2, testee.get_cost(&key2).unwrap()); assert_eq!(cost2, testee.get_cost(&key2).unwrap());
} }
@ -296,29 +280,10 @@ mod tests {
testee.upsert(&key4, cost4); testee.upsert(&key4, cost4);
assert_eq!(2, testee.get_count()); assert_eq!(2, testee.get_count());
assert!(testee.get_cost(&key1).is_none()); assert!(testee.get_cost(&key1).is_none());
// expected key2 cost = (mean + 2*std) of [110, 100] => 112 // expected key2 cost = (mean + 2*std) = (105 + 2*5) = 115
let expected_cost_2 = 112; let expected_cost_2 = 116;
assert_eq!(expected_cost_2, testee.get_cost(&key2).unwrap()); assert_eq!(expected_cost_2, testee.get_cost(&key2).unwrap());
assert!(testee.get_cost(&key3).is_none()); assert!(testee.get_cost(&key3).is_none());
assert_eq!(cost4, testee.get_cost(&key4).unwrap()); assert_eq!(cost4, testee.get_cost(&key4).unwrap());
} }
#[test]
fn test_get_cost_overflow_u64() {
solana_logger::setup();
let mut testee = ExecuteCostTable::default();
let key1 = Pubkey::new_unique();
let cost1: u64 = f64::MAX as u64;
let cost2: u64 = u64::MAX / 2; // create large variance so the final result will overflow
// insert one record
testee.upsert(&key1, cost1);
assert_eq!(1, testee.get_count());
assert_eq!(cost1, testee.get_cost(&key1).unwrap());
// update cost
testee.upsert(&key1, cost2);
assert!(testee.get_cost(&key1).is_none());
}
} }