Replace LeaderScheduler with LeaderScheduler1 (#2962)

* Migrate to LeaderScheduler1 (and added some missing methods)
* Delete LeaderScheduler
* Rename LeaderScheduler1 to LeaderScheduler
This commit is contained in:
Pankaj Garg
2019-02-26 21:16:18 -08:00
committed by Greg Fitzgerald
parent 9750488200
commit 789fff2ae2
15 changed files with 335 additions and 1617 deletions

View File

@@ -1,4 +1,5 @@
//! Set of functions for emulating windowing functions from a database ledger implementation
use crate::bank_forks::BankForks;
use crate::blocktree::*;
#[cfg(feature = "erasure")]
use crate::erasure;
@@ -16,18 +17,18 @@ pub const MAX_REPAIR_LENGTH: usize = 128;
pub fn retransmit_blobs(
dq: &[SharedBlob],
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
bank_forks: &Arc<RwLock<BankForks>>,
retransmit: &BlobSender,
id: &Pubkey,
) -> Result<()> {
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
let leader_scheduler = LeaderScheduler::default();
for b in dq {
let bank = bank_forks.read().unwrap().working_bank();
// Don't add blobs generated by this node to the retransmit queue
let slot = b.read().unwrap().slot();
if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) {
if leader_id != *id {
retransmit_queue.push(b.clone());
}
if leader_scheduler.slot_leader_at(slot, &bank) != *id {
retransmit_queue.push(b.clone());
}
}
@@ -50,7 +51,7 @@ pub fn retransmit_blobs(
/// Process a blob: Add blob to the ledger window.
pub fn process_blob(
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
bank_forks: &Arc<RwLock<BankForks>>,
blocktree: &Arc<Blocktree>,
blob: &SharedBlob,
) -> Result<()> {
@@ -61,14 +62,11 @@ pub fn process_blob(
let r_blob = blob.read().unwrap();
(r_blob.slot(), r_blob.index())
};
let leader = leader_scheduler.read().unwrap().get_leader_for_slot(slot);
let bank = bank_forks.read().unwrap().working_bank();
let _leader = LeaderScheduler::default().slot_leader_at(slot, &bank);
// TODO: Once the original leader signature is added to the blob, make sure that
// the blob was originally generated by the expected leader for this slot
if leader.is_none() {
warn!("No leader for slot {}, blob dropped", slot);
return Ok(()); // Occurs as a leader is rotating into a validator
}
// Insert the new blob into block tree
if is_coding {
@@ -126,7 +124,8 @@ mod test {
use crate::erasure::{NUM_CODING, NUM_DATA};
use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{receiver, responder, PacketReceiver};
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
@@ -194,54 +193,54 @@ mod test {
t_receiver.join().expect("join");
t_responder.join().expect("join");
}
/*
#[test]
pub fn test_send_to_retransmit_stage() {
let leader = Keypair::new().pubkey();
let nonleader = Keypair::new().pubkey();
let mut leader_scheduler = LeaderScheduler::default();
leader_scheduler.set_leader_schedule(vec![leader]);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let blob = SharedBlob::default();
#[test]
pub fn test_send_to_retransmit_stage() {
let leader = Keypair::new().pubkey();
let nonleader = Keypair::new().pubkey();
let mut leader_scheduler = LeaderScheduler::default();
leader_scheduler.set_leader_schedule(vec![leader]);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let blob = SharedBlob::default();
let (blob_sender, blob_receiver) = channel();
let (blob_sender, blob_receiver) = channel();
// Expect all blobs to be sent to retransmit_stage
blob.write().unwrap().forward(false);
retransmit_blobs(
&vec![blob.clone()],
&leader_scheduler,
&blob_sender,
&nonleader,
)
.expect("Expect successful retransmit");
let _ = blob_receiver
.try_recv()
.expect("Expect input blob to be retransmitted");
blob.write().unwrap().forward(true);
retransmit_blobs(
&vec![blob.clone()],
&leader_scheduler,
&blob_sender,
&nonleader,
)
.expect("Expect successful retransmit");
let output_blob = blob_receiver
.try_recv()
.expect("Expect input blob to be retransmitted");
// retransmit_blobs shouldn't be modifying the blob. That is retransmit stage's job now
assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
// Expect blob from leader while currently leader to not be retransmitted
// Even when forward is set
blob.write().unwrap().forward(true);
retransmit_blobs(&vec![blob], &leader_scheduler, &blob_sender, &leader)
// Expect all blobs to be sent to retransmit_stage
blob.write().unwrap().forward(false);
retransmit_blobs(
&vec![blob.clone()],
&leader_scheduler,
&blob_sender,
&nonleader,
)
.expect("Expect successful retransmit");
assert!(blob_receiver.try_recv().is_err());
}
let _ = blob_receiver
.try_recv()
.expect("Expect input blob to be retransmitted");
blob.write().unwrap().forward(true);
retransmit_blobs(
&vec![blob.clone()],
&leader_scheduler,
&blob_sender,
&nonleader,
)
.expect("Expect successful retransmit");
let output_blob = blob_receiver
.try_recv()
.expect("Expect input blob to be retransmitted");
// retransmit_blobs shouldn't be modifying the blob. That is retransmit stage's job now
assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
// Expect blob from leader while currently leader to not be retransmitted
// Even when forward is set
blob.write().unwrap().forward(true);
retransmit_blobs(&vec![blob], &leader_scheduler, &blob_sender, &leader)
.expect("Expect successful retransmit");
assert!(blob_receiver.try_recv().is_err());
}
*/
#[test]
pub fn test_find_missing_data_indexes_sanity() {
let slot = 0;
@@ -533,21 +532,24 @@ mod test {
#[test]
fn test_process_blob() {
let mut leader_scheduler = LeaderScheduler::default();
leader_scheduler.set_leader_schedule(vec![Keypair::new().pubkey()]);
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let (genesis_block, _) = GenesisBlock::new(100);
let bank0 = Bank::new(&genesis_block);
let bank_id = 0;
let bank_forks = BankForks::new(bank_id, bank0);
let num_entries = 10;
let original_entries = make_tiny_test_entries(num_entries);
let shared_blobs = original_entries.clone().to_shared_blobs();
index_blobs(&shared_blobs, &mut 0, 0);
let bank_forks = Arc::new(RwLock::new(bank_forks));
for blob in shared_blobs.iter().rev() {
process_blob(&leader_scheduler, &blocktree, blob)
process_blob(&bank_forks, &blocktree, blob)
.expect("Expect successful processing of blob");
}