From 9720ac001911d5b4c95d8f3e840561de41f8a208 Mon Sep 17 00:00:00 2001
From: carllin
Date: Mon, 17 Dec 2018 15:34:19 -0800
Subject: [PATCH] Fix try_erasure() (#2185)

* Fix try_erasure bug

* Re-enable asserts in test_replicator_startup

* Add test for out of order process_blobs
---
 src/db_window.rs      | 59 ++++++++++++++++++++++++++++++++++++++++++++---
 src/window_service.rs |  4 ----
 tests/replicator.rs   |  9 +++------
 3 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/src/db_window.rs b/src/db_window.rs
index 50b1563089..87adca46ec 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -276,7 +276,6 @@ pub fn process_blob(
     db_ledger: &Arc<RwLock<DbLedger>>,
     blob: &SharedBlob,
     max_ix: u64,
-    pix: u64,
     consume_queue: &mut Vec<Entry>,
     tick_height: &mut u64,
     done: &Arc<AtomicBool>,
@@ -287,7 +286,10 @@ pub fn process_blob(
     // TODO: Need to update slot in broadcast, otherwise this check will fail with
     // leader rotation enabled
     // Github issue: https://github.com/solana-labs/solana/issues/1899.
-    let slot = blob.read().unwrap().slot()?;
+    let (slot, pix) = {
+        let r_blob = blob.read().unwrap();
+        (r_blob.slot()?, r_blob.index()?)
+    };
     let leader = leader_scheduler.read().unwrap().get_leader_for_slot(slot);
 
     // TODO: Once the original leader signature is added to the blob, make sure that
@@ -323,7 +325,7 @@ pub fn process_blob(
         // If write_shared_blobs() of these recovered blobs fails, don't return
         // because consumed_entries might be nonempty from earlier, and tick height needs to
         // be updated. Hopefully we can recover these blobs next time successfully.
-        if let Err(e) = try_erasure(db_ledger, consume_queue) {
+        if let Err(e) = try_erasure(db_ledger, &mut consumed_entries) {
             trace!(
                 "erasure::recover failed to write recovered coding blobs. Err: {:?}",
                 e
@@ -767,4 +769,55 @@ mod test {
             &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
         );
     }
+
+    #[test]
+    fn test_process_blob() {
+        // Create the leader scheduler
+        let leader_keypair = Keypair::new();
+        let mut leader_scheduler = LeaderScheduler::from_bootstrap_leader(leader_keypair.pubkey());
+
+        // Create RocksDb ledger
+        let db_ledger_path = get_tmp_ledger_path("test_process_blob");
+        let db_ledger = Arc::new(RwLock::new(DbLedger::open(&db_ledger_path).unwrap()));
+
+        // Mock the tick height to look like the tick height right after a leader transition
+        leader_scheduler.last_seed_height = None;
+        leader_scheduler.use_only_bootstrap_leader = false;
+
+        let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
+        let num_entries = 10;
+        let original_entries = make_tiny_test_entries(num_entries);
+        let shared_blobs = original_entries.clone().to_blobs();
+
+        index_blobs(
+            shared_blobs
+                .iter()
+                .zip(vec![DEFAULT_SLOT_HEIGHT; num_entries].into_iter()),
+            &Keypair::new().pubkey(),
+            0,
+        );
+
+        let mut consume_queue = vec![];
+        let mut tick_height = 2;
+        let done = Arc::new(AtomicBool::new(false));
+
+        for blob in shared_blobs.iter().rev() {
+            process_blob(
+                &leader_scheduler,
+                &db_ledger,
+                blob,
+                0,
+                &mut consume_queue,
+                &mut tick_height,
+                &done,
+            )
+            .expect("Expect successful processing of blob");
+        }
+
+        assert_eq!(consume_queue, original_entries);
+
+        drop(db_ledger);
+        DB::destroy(&Options::default(), &db_ledger_path)
+            .expect("Expected successful database destruction");
+    }
 }
diff --git a/src/window_service.rs b/src/window_service.rs
index 520948a16f..2bda1b59ac 100644
--- a/src/window_service.rs
+++ b/src/window_service.rs
@@ -79,7 +79,6 @@ fn recv_window(
 
     retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?;
 
-    let mut pixs = Vec::new();
     //send a contiguous set of blocks
     let mut consume_queue = Vec::new();
 
@@ -97,8 +96,6 @@ fn recv_window(
                 .to_owned(),
         );
 
-        pixs.push(pix);
-
         trace!("{} window pix: {} size: {}", id, pix, meta_size);
 
         let _ = process_blob(
@@ -106,7 +103,6 @@ fn recv_window(
             db_ledger,
            &b,
             max_ix,
-            pix,
             &mut consume_queue,
             tick_height,
             done,
diff --git a/tests/replicator.rs b/tests/replicator.rs
index bb9137dd2a..81b336f0f1 100644
--- a/tests/replicator.rs
+++ b/tests/replicator.rs
@@ -136,7 +136,7 @@ fn test_replicator_startup() {
         use solana::rpc_request::{RpcClient, RpcRequest};
 
         let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc);
-        //let mut non_zero_pubkeys = false;
+        let mut non_zero_pubkeys = false;
         for _ in 0..30 {
             let params = json!([0]);
             let pubkeys = RpcRequest::GetStoragePubkeysForEntryHeight
@@ -144,15 +144,12 @@ fn test_replicator_startup() {
                 .unwrap();
             info!("pubkeys: {:?}", pubkeys);
             if pubkeys.as_array().unwrap().len() != 0 {
-                //non_zero_pubkeys = true;
+                non_zero_pubkeys = true;
                 break;
             }
             sleep(Duration::from_secs(1));
         }
-        // this seems to assert when erasure is on,
-        // entries may not be processed correctly
-        // TODO: turn it back on
-        //assert!(non_zero_pubkeys);
+        assert!(non_zero_pubkeys);
     }
 
     // Check that some ledger was downloaded