Fix try_erasure() (#2185)

* Fix try_erasure bug

* Re-enable asserts in test_replicator_startup

* Add test for out of order process_blobs
Author: carllin
Date: 2018-12-17 15:34:19 -08:00
Committed by: GitHub
Parent: fc56e1e517
Commit: 9720ac0019
3 changed files with 59 additions and 13 deletions


@@ -276,7 +276,6 @@ pub fn process_blob(
     db_ledger: &Arc<RwLock<DbLedger>>,
     blob: &SharedBlob,
     max_ix: u64,
-    pix: u64,
     consume_queue: &mut Vec<Entry>,
     tick_height: &mut u64,
     done: &Arc<AtomicBool>,
@@ -287,7 +286,10 @@ pub fn process_blob(
     // TODO: Need to update slot in broadcast, otherwise this check will fail with
     // leader rotation enabled
     // Github issue: https://github.com/solana-labs/solana/issues/1899.
-    let slot = blob.read().unwrap().slot()?;
+    let (slot, pix) = {
+        let r_blob = blob.read().unwrap();
+        (r_blob.slot()?, r_blob.index()?)
+    };
     let leader = leader_scheduler.read().unwrap().get_leader_for_slot(slot);

     // TODO: Once the original leader signature is added to the blob, make sure that
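The new block above reads both fields under a single lock acquisition, which is why the caller no longer needs to thread `pix` through as a parameter. A minimal sketch of the same shape, using a hypothetical simplified Blob in place of the repository's locked blob type:

    use std::sync::RwLock;

    // Illustrative stand-in, not the repo's Blob: both fields live behind
    // one RwLock, so they are extracted under a single read guard.
    struct Blob {
        slot: u64,
        index: u64,
    }

    fn main() {
        let blob = RwLock::new(Blob { slot: 3, index: 7 });

        // One read guard, two fields; the guard drops at the end of the
        // block, before the lock is needed anywhere else.
        let (slot, pix) = {
            let r_blob = blob.read().unwrap();
            (r_blob.slot, r_blob.index)
        };

        assert_eq!((slot, pix), (3, 7));
        println!("slot={} pix={}", slot, pix);
    }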
@@ -323,7 +325,7 @@ pub fn process_blob(
     // If write_shared_blobs() of these recovered blobs fails, don't return
     // because consumed_entries might be nonempty from earlier, and tick height needs to
     // be updated. Hopefully we can recover these blobs next time successfully.
-    if let Err(e) = try_erasure(db_ledger, consume_queue) {
+    if let Err(e) = try_erasure(db_ledger, &mut consumed_entries) {
         trace!(
             "erasure::recover failed to write recovered coding blobs. Err: {:?}",
             e
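The one-line fix above appears to redirect recovered entries into the locally accumulated `consumed_entries` instead of the caller's `consume_queue`, so everything reaches the caller in one place and in order. A minimal sketch of why that matters, with hypothetical simplified types rather than the real `DbLedger`/`Entry` API:

    #[derive(Debug, Clone, PartialEq)]
    struct Entry(u64);

    // Stand-in for erasure recovery: appends whatever it recovers to `out`.
    fn try_erasure(out: &mut Vec<Entry>) -> Result<(), ()> {
        out.push(Entry(42));
        Ok(())
    }

    fn process_blob(consume_queue: &mut Vec<Entry>) {
        // Entries consumed from blobs so far accumulate here first.
        let mut consumed_entries = vec![Entry(1)];

        // Recovered entries join the local buffer...
        if let Err(e) = try_erasure(&mut consumed_entries) {
            eprintln!("recovery failed: {:?}", e);
        }

        // ...and the caller's queue is extended in a single place, in order.
        consume_queue.append(&mut consumed_entries);
    }

    fn main() {
        let mut queue = Vec::new();
        process_blob(&mut queue);
        assert_eq!(queue, vec![Entry(1), Entry(42)]);
    }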
@@ -767,4 +769,55 @@ mod test {
             &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
         );
     }
+
+    #[test]
+    fn test_process_blob() {
+        // Create the leader scheduler
+        let leader_keypair = Keypair::new();
+        let mut leader_scheduler = LeaderScheduler::from_bootstrap_leader(leader_keypair.pubkey());
+
+        // Create RocksDb ledger
+        let db_ledger_path = get_tmp_ledger_path("test_process_blob");
+        let db_ledger = Arc::new(RwLock::new(DbLedger::open(&db_ledger_path).unwrap()));
+
+        // Mock the tick height to look like the tick height right after a leader transition
+        leader_scheduler.last_seed_height = None;
+        leader_scheduler.use_only_bootstrap_leader = false;
+        let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
+        let num_entries = 10;
+        let original_entries = make_tiny_test_entries(num_entries);
+        let shared_blobs = original_entries.clone().to_blobs();
+
+        index_blobs(
+            shared_blobs
+                .iter()
+                .zip(vec![DEFAULT_SLOT_HEIGHT; num_entries].into_iter()),
+            &Keypair::new().pubkey(),
+            0,
+        );
+
+        let mut consume_queue = vec![];
+        let mut tick_height = 2;
+        let done = Arc::new(AtomicBool::new(false));
+
+        for blob in shared_blobs.iter().rev() {
+            process_blob(
+                &leader_scheduler,
+                &db_ledger,
+                blob,
+                0,
+                &mut consume_queue,
+                &mut tick_height,
+                &done,
+            )
+            .expect("Expect successful processing of blob");
+        }
+
+        assert_eq!(consume_queue, original_entries);
+
+        drop(db_ledger);
+        DB::destroy(&Options::default(), &db_ledger_path)
+            .expect("Expected successful database destruction");
+    }
 }
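Note that the test drives `process_blob` with `shared_blobs.iter().rev()`, the out-of-order case named in the commit message: every blob arrives before its predecessors, so nothing is consumable until index 0 finally lands. A generic, self-contained sketch of that windowing idea (not the repository's implementation):

    use std::collections::HashMap;

    // Out-of-order blobs are stashed by index; a contiguous run starting at
    // the next expected index is drained into the consume queue.
    fn drain_contiguous(
        window: &mut HashMap<u64, String>,
        next_ix: &mut u64,
        consume_queue: &mut Vec<String>,
    ) {
        // Keep consuming while the next expected index is present.
        while let Some(blob) = window.remove(&*next_ix) {
            consume_queue.push(blob);
            *next_ix += 1;
        }
    }

    fn main() {
        let mut window = HashMap::new();
        let mut next_ix = 0u64;
        let mut consume_queue = Vec::new();

        // Feed indices in reverse, as the test does with .rev().
        for ix in (0..3u64).rev() {
            window.insert(ix, format!("blob {}", ix));
            drain_contiguous(&mut window, &mut next_ix, &mut consume_queue);
        }

        // All three drain only once index 0 shows up, in index order.
        assert_eq!(consume_queue, vec!["blob 0", "blob 1", "blob 2"]);
    }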


@@ -79,7 +79,6 @@ fn recv_window(
     retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?;

-    let mut pixs = Vec::new();
     //send a contiguous set of blocks
     let mut consume_queue = Vec::new();
@@ -97,8 +96,6 @@ fn recv_window(
                 .to_owned(),
         );

-        pixs.push(pix);
-
         trace!("{} window pix: {} size: {}", id, pix, meta_size);

         let _ = process_blob(
@@ -106,7 +103,6 @@ fn recv_window(
             db_ledger,
             &b,
             max_ix,
-            pix,
             &mut consume_queue,
             tick_height,
             done,


@@ -136,7 +136,7 @@ fn test_replicator_startup() {
         use solana::rpc_request::{RpcClient, RpcRequest};
         let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc);
-        //let mut non_zero_pubkeys = false;
+        let mut non_zero_pubkeys = false;
         for _ in 0..30 {
             let params = json!([0]);
             let pubkeys = RpcRequest::GetStoragePubkeysForEntryHeight
@@ -144,15 +144,12 @@ fn test_replicator_startup() {
             .unwrap();
         info!("pubkeys: {:?}", pubkeys);
         if pubkeys.as_array().unwrap().len() != 0 {
-            //non_zero_pubkeys = true;
+            non_zero_pubkeys = true;
             break;
         }
         sleep(Duration::from_secs(1));
     }
-    // this seems to assert when erasure is on,
-    // entries may not be processed correctly
-    // TODO: turn it back on
-    //assert!(non_zero_pubkeys);
+    assert!(non_zero_pubkeys);
 }

 // Check that some ledger was downloaded
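With the commented-out lines restored, the test is back to a plain poll-then-assert loop. A generic, self-contained sketch of the pattern, with a stand-in closure in place of the real RPC request:

    use std::thread::sleep;
    use std::time::Duration;

    fn main() {
        // Stand-in for GetStoragePubkeysForEntryHeight: empty until the
        // third attempt.
        let query = |attempt: u32| -> Vec<u32> {
            if attempt >= 2 { vec![1] } else { vec![] }
        };

        // Retry a bounded number of times, record success in a flag, and
        // assert on the flag afterwards instead of leaving it commented out.
        let mut non_zero_pubkeys = false;
        for attempt in 0..30 {
            let pubkeys = query(attempt);
            if !pubkeys.is_empty() {
                non_zero_pubkeys = true;
                break;
            }
            sleep(Duration::from_millis(10));
        }
        assert!(non_zero_pubkeys);
    }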