Parallelize entry processing in replay stage in validators (#2212)

* Parallelize entry processing in replay stage in validators

- single-threaded entry processing is not utilizing CPU cores to the fullest

* fix tests and address review comments
Authored by Pankaj Garg on 2018-12-18 16:06:05 -08:00, committed by GitHub
parent a65022aed7
commit 974249f2a5
3 changed files with 107 additions and 25 deletions
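As background for the diffs below (this sketch is not part of the commit): the idea motivating the change is to group entries whose transactions lock disjoint accounts and execute each group across cores, flushing the group whenever a tick or a lock conflict is reached. The following is a minimal, self-contained illustration of that batching pattern, assuming the rayon crate is available; Tx, conflicts_with, and execute are invented stand-ins, not the real Bank/Entry API.

// Sketch only: group non-conflicting "transactions" and execute each group in parallel.
use rayon::prelude::*;
use std::collections::HashSet;

#[derive(Clone)]
struct Tx {
    accounts: Vec<u64>, // account ids this transaction would lock
}

fn execute(tx: &Tx) {
    // placeholder for real transaction processing
    let _ = tx.accounts.len();
}

fn conflicts_with(locked: &HashSet<u64>, tx: &Tx) -> bool {
    tx.accounts.iter().any(|a| locked.contains(a))
}

fn process_batches(entries: &[Vec<Tx>]) {
    let mut locked: HashSet<u64> = HashSet::new();
    let mut group: Vec<Tx> = vec![];

    for entry in entries {
        if entry.iter().any(|tx| conflicts_with(&locked, tx)) {
            // flush the current group in parallel, then start a new one
            group.par_iter().for_each(execute);
            group.clear();
            locked.clear();
        }
        for tx in entry {
            locked.extend(tx.accounts.iter().copied());
            group.push(tx.clone());
        }
    }
    // flush whatever is left at the end of the slice
    group.par_iter().for_each(execute);
}

fn main() {
    let entries = vec![
        vec![Tx { accounts: vec![1, 2] }],
        vec![Tx { accounts: vec![3, 4] }], // disjoint accounts: joins the same group
        vec![Tx { accounts: vec![2, 5] }], // conflicts on account 2: flushes the group first
    ];
    process_batches(&entries);
}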


@@ -638,6 +638,10 @@ impl Bank {
                 // if its a tick, execute the group and register the tick
                 self.par_execute_entries(&mt_group)?;
                 self.register_tick(&entry.id);
+                self.leader_scheduler
+                    .write()
+                    .unwrap()
+                    .update_height(self.tick_height(), self);
                 mt_group = vec![];
                 continue;
             }
@@ -649,6 +653,7 @@ impl Bank {
                 self.par_execute_entries(&mt_group)?;
                 mt_group = vec![];
                 //reset the lock and push the entry
+                self.unlock_accounts(&entry.transactions, &locked);
                 let locked = self.lock_accounts(&entry.transactions);
                 mt_group.push((entry, locked));
             } else {
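For context on the added unlock_accounts call above (a sketch, not part of this commit): the conflicting entry has already attempted to take its account locks, and that attempt may have partially succeeded, so the intent appears to be releasing those locks before the entry is re-locked against a fresh group. A minimal illustration of that flush-and-retry pattern, using invented try_lock/unlock helpers rather than the real Bank API:

use std::collections::HashSet;

struct Locks {
    held: HashSet<u64>,
}

impl Locks {
    // Try to lock every account; on a conflict, report failure but return
    // the accounts that were newly taken so they can be released.
    fn try_lock(&mut self, accounts: &[u64]) -> Result<Vec<u64>, Vec<u64>> {
        let mut taken = vec![];
        let mut conflict = false;
        for &a in accounts {
            if self.held.insert(a) {
                taken.push(a);
            } else {
                conflict = true;
            }
        }
        if conflict {
            Err(taken)
        } else {
            Ok(taken)
        }
    }

    fn unlock(&mut self, accounts: &[u64]) {
        for a in accounts {
            self.held.remove(a);
        }
    }
}

fn main() {
    let mut locks = Locks { held: HashSet::new() };

    // First entry locks accounts 1 and 2 and joins the in-flight group.
    let first = locks.try_lock(&[1, 2]).unwrap();

    // Second entry conflicts on account 2; account 3 was still acquired.
    if let Err(partial) = locks.try_lock(&[2, 3]) {
        // Flush the in-flight group here, then release both the group's locks
        // and the partial locks from the failed attempt before retrying.
        locks.unlock(&first);
        locks.unlock(&partial);
        let retried = locks.try_lock(&[2, 3]).expect("relock succeeds after unlock");
        assert_eq!(retried, vec![2, 3]);
    }
}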
@@ -1584,6 +1589,58 @@ mod tests {
         assert_eq!(bank.last_id(), last_id);
     }
 
     #[test]
+    fn test_par_process_entries_2_txes_collision() {
+        let mint = Mint::new(1000);
+        let bank = Bank::new(&mint);
+        let keypair1 = Keypair::new();
+        let keypair2 = Keypair::new();
+        let keypair3 = Keypair::new();
+
+        println!("KP1 {:?}", keypair1.pubkey());
+        println!("KP2 {:?}", keypair2.pubkey());
+        println!("KP3 {:?}", keypair3.pubkey());
+        println!("Mint {:?}", mint.keypair().pubkey());
+
+        // fund: put 4 in each of 1 and 2
+        assert_matches!(
+            bank.transfer(4, &mint.keypair(), keypair1.pubkey(), bank.last_id()),
+            Ok(_)
+        );
+        assert_matches!(
+            bank.transfer(4, &mint.keypair(), keypair2.pubkey(), bank.last_id()),
+            Ok(_)
+        );
+
+        // construct an Entry whose 2nd transaction would cause a lock conflict with previous entry
+        let entry_1_to_mint = next_entry(
+            &bank.last_id(),
+            1,
+            vec![Transaction::system_new(
+                &keypair1,
+                mint.keypair().pubkey(),
+                1,
+                bank.last_id(),
+            )],
+        );
+
+        let entry_2_to_3_mint_to_1 = next_entry(
+            &entry_1_to_mint.id,
+            1,
+            vec![
+                Transaction::system_new(&keypair2, keypair3.pubkey(), 2, bank.last_id()), // should be fine
+                Transaction::system_new(&keypair1, mint.keypair().pubkey(), 2, bank.last_id()), // will collide
+            ],
+        );
+
+        assert_eq!(
+            bank.par_process_entries(&[entry_1_to_mint, entry_2_to_3_mint_to_1]),
+            Ok(())
+        );
+        assert_eq!(bank.get_balance(&keypair1.pubkey()), 1);
+        assert_eq!(bank.get_balance(&keypair2.pubkey()), 2);
+        assert_eq!(bank.get_balance(&keypair3.pubkey()), 2);
+    }
+
+    #[test]
     fn test_par_process_entries_2_entries_par() {
         let mint = Mint::new(1000);
         let bank = Bank::new(&mint);


@@ -977,7 +977,7 @@ mod tests {
         let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len;
 
         // Set the leader scheduler for the validator
-        let leader_rotation_interval = 10;
+        let leader_rotation_interval = 16;
         let num_bootstrap_slots = 2;
         let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;


@@ -101,36 +101,61 @@ impl ReplayStage {
             .get_current_leader()
             .expect("Scheduled leader should be calculated by this point");
         let my_id = keypair.pubkey();
+
+        // Next vote tick is ceiling of (current tick/ticks per block)
+        let mut num_ticks_to_next_vote = BLOCK_TICK_COUNT - (bank.tick_height() % BLOCK_TICK_COUNT);
+        let mut start_entry_index = 0;
         for (i, entry) in entries.iter().enumerate() {
-            res = bank.process_entry(&entry);
-            if res.is_err() {
-                // TODO: This will return early from the first entry that has an erroneous
-                // transaction, instead of processing the rest of the entries in the vector
-                // of received entries. This is in line with previous behavior when
-                // bank.process_entries() was used to process the entries, but doesn't solve the
-                // issue that the bank state was still changed, leading to inconsistencies with the
-                // leader as the leader currently should not be publishing erroneous transactions
-                break;
+            inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize);
+            if entry.is_tick() {
+                num_ticks_to_next_vote -= 1;
             }
-
-            if bank.tick_height() % BLOCK_TICK_COUNT == 0 {
-                if let Some(sender) = vote_blob_sender {
-                    send_validator_vote(bank, vote_account_keypair, &cluster_info, sender).unwrap();
-                }
-            }
-
-            let (scheduled_leader, _) = bank
-                .get_current_leader()
-                .expect("Scheduled leader should be calculated by this point");
-
-            // TODO: Remove this soon once we boot the leader from ClusterInfo
-            if scheduled_leader != current_leader {
-                cluster_info.write().unwrap().set_leader(scheduled_leader);
-            }
-
-            if my_id == scheduled_leader {
-                num_entries_to_write = i + 1;
-                break;
+            inc_new_counter_info!(
+                "replicate-stage_tick-to-vote",
+                num_ticks_to_next_vote as usize
+            );
+            // If it's the last entry in the vector, i will be vec len - 1.
+            // If we don't process the entry now, the for loop will exit and the entry
+            // will be dropped.
+            if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() {
+                res = bank.process_entries(&entries[start_entry_index..=i]);
+
+                if res.is_err() {
+                    // TODO: This will return early from the first entry that has an erroneous
+                    // transaction, instead of processing the rest of the entries in the vector
+                    // of received entries. This is in line with previous behavior when
+                    // bank.process_entries() was used to process the entries, but doesn't solve the
+                    // issue that the bank state was still changed, leading to inconsistencies with the
+                    // leader as the leader currently should not be publishing erroneous transactions
+                    inc_new_counter_info!(
+                        "replicate-stage_failed_process_entries",
+                        (i - start_entry_index)
+                    );
+                    break;
+                }
+
+                if 0 == num_ticks_to_next_vote {
+                    if let Some(sender) = vote_blob_sender {
+                        send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)
+                            .unwrap();
+                    }
+                }
+                let (scheduled_leader, _) = bank
+                    .get_current_leader()
+                    .expect("Scheduled leader should be calculated by this point");
+
+                // TODO: Remove this soon once we boot the leader from ClusterInfo
+                if scheduled_leader != current_leader {
+                    cluster_info.write().unwrap().set_leader(scheduled_leader);
+                }
+
+                if my_id == scheduled_leader {
+                    num_entries_to_write = i + 1;
+                    break;
+                }
+                start_entry_index = i + 1;
+                num_ticks_to_next_vote = BLOCK_TICK_COUNT;
             }
         }
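To make the new replay loop above easier to follow (a simplified sketch, not part of this commit): entries are accumulated until a vote-tick boundary or the end of the received slice, and the whole batch is handed to the bank in one call, which can then parallelize non-conflicting transactions internally. Entry, process_entries, and send_vote below are stand-ins for the real ReplayStage and Bank types.

// Sketch only: batch entries between vote-tick boundaries.
const BLOCK_TICK_COUNT: u64 = 4;

#[derive(Clone)]
struct Entry {
    is_tick: bool,
}

fn process_entries(batch: &[Entry]) -> Result<(), ()> {
    // placeholder: the real bank processes the whole batch at once
    println!("processing {} entries", batch.len());
    Ok(())
}

fn send_vote() {
    println!("vote");
}

fn replay(entries: &[Entry], tick_height: u64) -> Result<(), ()> {
    // Next vote tick is the ceiling of (current tick / ticks per block).
    let mut num_ticks_to_next_vote = BLOCK_TICK_COUNT - (tick_height % BLOCK_TICK_COUNT);
    let mut start = 0;

    for (i, entry) in entries.iter().enumerate() {
        if entry.is_tick {
            num_ticks_to_next_vote -= 1;
        }
        // Flush at a vote boundary or at the end of the slice so no entry is dropped.
        if num_ticks_to_next_vote == 0 || i + 1 == entries.len() {
            process_entries(&entries[start..=i])?;
            if num_ticks_to_next_vote == 0 {
                send_vote();
            }
            start = i + 1;
            num_ticks_to_next_vote = BLOCK_TICK_COUNT;
        }
    }
    Ok(())
}

fn main() {
    let entries: Vec<Entry> = (0..10).map(|n| Entry { is_tick: n % 2 == 0 }).collect();
    replay(&entries, 0).unwrap();
}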
@@ -314,7 +339,7 @@ mod test
         // Set up the LeaderScheduler so that this this node becomes the leader at
         // bootstrap_height = num_bootstrap_slots * leader_rotation_interval
-        let leader_rotation_interval = 10;
+        let leader_rotation_interval = 16;
         let num_bootstrap_slots = 2;
         let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
         let leader_scheduler_config = LeaderSchedulerConfig::new(