Move last_ids to a simple Hash, unwrap from Arc<RwLock>>
This commit is contained in:
@@ -62,7 +62,7 @@ impl ReplayStage {
|
|||||||
voting_keypair: &Option<Arc<T>>,
|
voting_keypair: &Option<Arc<T>>,
|
||||||
ledger_entry_sender: &EntrySender,
|
ledger_entry_sender: &EntrySender,
|
||||||
current_blob_index: &mut u64,
|
current_blob_index: &mut u64,
|
||||||
last_entry_id: &Arc<RwLock<Hash>>,
|
last_entry_id: &mut Hash,
|
||||||
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
||||||
subscriptions: &Arc<RpcSubscriptions>,
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
@@ -79,7 +79,7 @@ impl ReplayStage {
|
|||||||
let mut num_entries_to_write = entries.len();
|
let mut num_entries_to_write = entries.len();
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
if !entries.as_slice().verify(&last_entry_id.read().unwrap()) {
|
if !entries.as_slice().verify(last_entry_id) {
|
||||||
inc_new_counter_info!("replicate_stage-verify-fail", entries.len());
|
inc_new_counter_info!("replicate_stage-verify-fail", entries.len());
|
||||||
return Err(Error::BlobError(BlobError::VerificationFailed));
|
return Err(Error::BlobError(BlobError::VerificationFailed));
|
||||||
}
|
}
|
||||||
@@ -155,7 +155,7 @@ impl ReplayStage {
|
|||||||
// If leader rotation happened, only write the entries up to leader rotation.
|
// If leader rotation happened, only write the entries up to leader rotation.
|
||||||
entries.truncate(num_entries_to_write);
|
entries.truncate(num_entries_to_write);
|
||||||
entries_with_meta.truncate(num_entries_to_write);
|
entries_with_meta.truncate(num_entries_to_write);
|
||||||
*last_entry_id.write().unwrap() = entries
|
*last_entry_id = entries
|
||||||
.last()
|
.last()
|
||||||
.expect("Entries cannot be empty at this point")
|
.expect("Entries cannot be empty at this point")
|
||||||
.id;
|
.id;
|
||||||
@@ -206,7 +206,7 @@ impl ReplayStage {
|
|||||||
let subscriptions_ = subscriptions.clone();
|
let subscriptions_ = subscriptions.clone();
|
||||||
|
|
||||||
// Gather up all the metadata about the current state of the ledger
|
// Gather up all the metadata about the current state of the ledger
|
||||||
let (mut bank, tick_height, last_entry_id, mut current_blob_index) = {
|
let (mut bank, tick_height, mut last_entry_id, mut current_blob_index) = {
|
||||||
let mut bank_forks = bank_forks.write().unwrap();
|
let mut bank_forks = bank_forks.write().unwrap();
|
||||||
bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
|
bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
|
||||||
let bank = bank_forks.working_bank();
|
let bank = bank_forks.working_bank();
|
||||||
@@ -219,8 +219,6 @@ impl ReplayStage {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let last_entry_id = Arc::new(RwLock::new(last_entry_id));
|
|
||||||
|
|
||||||
// Update Tpu and other fullnode components with the current bank
|
// Update Tpu and other fullnode components with the current bank
|
||||||
let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = {
|
let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = {
|
||||||
let leader_scheduler = leader_scheduler.read().unwrap();
|
let leader_scheduler = leader_scheduler.read().unwrap();
|
||||||
@@ -248,7 +246,7 @@ impl ReplayStage {
|
|||||||
to_leader_sender
|
to_leader_sender
|
||||||
.send(TvuRotationInfo {
|
.send(TvuRotationInfo {
|
||||||
bank: old_bank,
|
bank: old_bank,
|
||||||
last_entry_id: *last_entry_id.read().unwrap(),
|
last_entry_id,
|
||||||
slot,
|
slot,
|
||||||
leader_id,
|
leader_id,
|
||||||
})
|
})
|
||||||
@@ -341,7 +339,7 @@ impl ReplayStage {
|
|||||||
&voting_keypair,
|
&voting_keypair,
|
||||||
&ledger_entry_sender,
|
&ledger_entry_sender,
|
||||||
&mut current_blob_index,
|
&mut current_blob_index,
|
||||||
&last_entry_id,
|
&mut last_entry_id,
|
||||||
&leader_scheduler_,
|
&leader_scheduler_,
|
||||||
&subscriptions_,
|
&subscriptions_,
|
||||||
slot,
|
slot,
|
||||||
@@ -386,7 +384,7 @@ impl ReplayStage {
|
|||||||
let last_entry = blocktree
|
let last_entry = blocktree
|
||||||
.get_slot_entries(slot, meta.last_index, Some(1))
|
.get_slot_entries(slot, meta.last_index, Some(1))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
*(last_entry_id.write().unwrap()) = last_entry[0].id;
|
last_entry_id = last_entry[0].id;
|
||||||
}
|
}
|
||||||
|
|
||||||
let old_bank = bank.clone();
|
let old_bank = bank.clone();
|
||||||
@@ -419,7 +417,7 @@ impl ReplayStage {
|
|||||||
to_leader_sender
|
to_leader_sender
|
||||||
.send(TvuRotationInfo {
|
.send(TvuRotationInfo {
|
||||||
bank: old_bank,
|
bank: old_bank,
|
||||||
last_entry_id: *last_entry_id.read().unwrap(),
|
last_entry_id,
|
||||||
slot: next_slot,
|
slot: next_slot,
|
||||||
leader_id,
|
leader_id,
|
||||||
})
|
})
|
||||||
@@ -853,7 +851,7 @@ mod test {
|
|||||||
// Set up the cluster info
|
// Set up the cluster info
|
||||||
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
|
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
|
||||||
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
|
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
|
||||||
let last_entry_id = Hash::default();
|
let mut last_entry_id = Hash::default();
|
||||||
let mut current_blob_index = 0;
|
let mut current_blob_index = 0;
|
||||||
let mut last_id = Hash::default();
|
let mut last_id = Hash::default();
|
||||||
let mut entries = Vec::new();
|
let mut entries = Vec::new();
|
||||||
@@ -874,7 +872,7 @@ mod test {
|
|||||||
&voting_keypair,
|
&voting_keypair,
|
||||||
&ledger_entry_sender,
|
&ledger_entry_sender,
|
||||||
&mut current_blob_index,
|
&mut current_blob_index,
|
||||||
&Arc::new(RwLock::new(last_entry_id)),
|
&mut last_entry_id,
|
||||||
&leader_scheduler,
|
&leader_scheduler,
|
||||||
&Arc::new(RpcSubscriptions::default()),
|
&Arc::new(RpcSubscriptions::default()),
|
||||||
0,
|
0,
|
||||||
@@ -902,7 +900,7 @@ mod test {
|
|||||||
&voting_keypair,
|
&voting_keypair,
|
||||||
&ledger_entry_sender,
|
&ledger_entry_sender,
|
||||||
&mut current_blob_index,
|
&mut current_blob_index,
|
||||||
&Arc::new(RwLock::new(last_entry_id)),
|
&mut last_entry_id,
|
||||||
&leader_scheduler,
|
&leader_scheduler,
|
||||||
&Arc::new(RpcSubscriptions::default()),
|
&Arc::new(RpcSubscriptions::default()),
|
||||||
0,
|
0,
|
||||||
|
Reference in New Issue
Block a user