From a65022aed7672640add7130d6ef896e04a698912 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Tue, 18 Dec 2018 15:18:57 -0800 Subject: [PATCH] DbLedger doesn't need to be mut, doesn't need an RwLock (#2215) * DbLedger doesn't need to be mut, doesn't need an RwLock * fix erasure cases --- src/broadcast_service.rs | 18 +++++------ src/cluster_info.rs | 37 ++++++++------------- src/db_ledger.rs | 16 +++++----- src/db_window.rs | 69 ++++++++++++++++------------------------ src/erasure.rs | 51 ++++++++++------------------- src/fullnode.rs | 6 ++-- src/gossip_service.rs | 2 +- src/replicator.rs | 4 +-- src/retransmit_stage.rs | 2 +- src/tvu.rs | 4 +-- src/window_service.rs | 25 ++++++--------- 11 files changed, 95 insertions(+), 139 deletions(-) diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index fb7d81a100..6b3020bab3 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -33,7 +33,7 @@ pub enum BroadcastServiceReturnType { #[allow(clippy::too_many_arguments)] fn broadcast( - db_ledger: &Arc>, + db_ledger: &Arc, max_tick_height: Option, leader_id: Pubkey, node_info: &NodeInfo, @@ -130,7 +130,7 @@ fn broadcast( assert!(win[pos].data.is_none()); win[pos].data = Some(b.clone()); } - db_ledger.write().unwrap().write_shared_blobs(vec![b])?; + db_ledger.write_shared_blobs(vec![b])?; } } @@ -236,7 +236,7 @@ pub struct BroadcastService { impl BroadcastService { fn run( - db_ledger: &Arc>, + db_ledger: &Arc, sock: &UdpSocket, cluster_info: &Arc>, window: &SharedWindow, @@ -304,7 +304,7 @@ impl BroadcastService { /// completing the cycle. #[allow(clippy::too_many_arguments, clippy::new_ret_no_self)] pub fn new( - db_ledger: Arc>, + db_ledger: Arc, sock: UdpSocket, cluster_info: Arc>, window: SharedWindow, @@ -365,7 +365,7 @@ mod test { use std::time::Duration; struct DummyBroadcastService { - db_ledger: Arc>, + db_ledger: Arc, broadcast_service: BroadcastService, entry_sender: Sender>, exit_signal: Arc, @@ -379,7 +379,7 @@ mod test { max_tick_height: u64, ) -> DummyBroadcastService { // Make the database ledger - let db_ledger = Arc::new(RwLock::new(DbLedger::open(ledger_path).unwrap())); + let db_ledger = Arc::new(DbLedger::open(ledger_path).unwrap()); // Make the leader node and scheduler let leader_info = Node::new_localhost_with_pubkey(leader_pubkey); @@ -459,16 +459,16 @@ mod test { } sleep(Duration::from_millis(2000)); - let r_db = broadcast_service.db_ledger.read().unwrap(); + let db_ledger = broadcast_service.db_ledger; for i in 0..max_tick_height - start_tick_height { let (_, slot) = leader_scheduler .read() .unwrap() .get_scheduled_leader(start_tick_height + i + 1) .expect("Leader should exist"); - let result = r_db + let result = db_ledger .data_cf - .get_by_slot_index(&r_db.db, slot, entry_height + i) + .get_by_slot_index(&db_ledger.db, slot, entry_height + i) .unwrap(); assert!(result.is_some()); diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 15ac356058..f5449b1504 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -673,26 +673,20 @@ impl ClusterInfo { fn run_window_request( from: &NodeInfo, from_addr: &SocketAddr, - db_ledger: Option<&Arc>>, + db_ledger: Option<&Arc>, me: &NodeInfo, ix: u64, ) -> Vec { if let Some(db_ledger) = db_ledger { - let meta = { - let r_db = db_ledger.read().unwrap(); - - r_db.meta_cf - .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) - }; + let meta = db_ledger + .meta_cf + .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)); if let Ok(Some(meta)) = meta { let max_slot = meta.received_slot; // Try to find the requested index in one of the slots for i in 0..=max_slot { - let get_result = { - let r_db = db_ledger.read().unwrap(); - r_db.data_cf.get_by_slot_index(&r_db.db, i, ix) - }; + let get_result = db_ledger.data_cf.get_by_slot_index(&db_ledger.db, i, ix); if let Ok(Some(blob_data)) = get_result { inc_new_counter_info!("cluster_info-window-request-ledger", 1); @@ -716,7 +710,7 @@ impl ClusterInfo { //TODO we should first coalesce all the requests fn handle_blob( obj: &Arc>, - db_ledger: Option<&Arc>>, + db_ledger: Option<&Arc>, blob: &Blob, ) -> Vec { deserialize(&blob.data[..blob.meta.size]) @@ -830,7 +824,7 @@ impl ClusterInfo { fn handle_request_window_index( me: &Arc>, from: &ContactInfo, - db_ledger: Option<&Arc>>, + db_ledger: Option<&Arc>, ix: u64, from_addr: &SocketAddr, ) -> Vec { @@ -870,7 +864,7 @@ impl ClusterInfo { fn handle_protocol( me: &Arc>, from_addr: &SocketAddr, - db_ledger: Option<&Arc>>, + db_ledger: Option<&Arc>, request: Protocol, ) -> Vec { match request { @@ -934,7 +928,7 @@ impl ClusterInfo { /// Process messages from the network fn run_listen( obj: &Arc>, - db_ledger: Option<&Arc>>, + db_ledger: Option<&Arc>, requests_receiver: &BlobReceiver, response_sender: &BlobSender, ) -> Result<()> { @@ -954,7 +948,7 @@ impl ClusterInfo { } pub fn listen( me: Arc>, - db_ledger: Option>>, + db_ledger: Option>, requests_receiver: BlobReceiver, response_sender: BlobSender, exit: Arc, @@ -1225,7 +1219,7 @@ mod tests { solana_logger::setup(); let ledger_path = get_tmp_ledger_path("run_window_request"); { - let db_ledger = Arc::new(RwLock::new(DbLedger::open(&ledger_path).unwrap())); + let db_ledger = Arc::new(DbLedger::open(&ledger_path).unwrap()); let me = NodeInfo::new( Keypair::new().pubkey(), socketaddr!("127.0.0.1:1234"), @@ -1249,12 +1243,9 @@ mod tests { w_blob.meta.size = data_size + BLOB_HEADER_SIZE; } - { - let mut w_ledger = db_ledger.write().unwrap(); - w_ledger - .write_shared_blobs(vec![&blob]) - .expect("Expect successful ledger write"); - } + db_ledger + .write_shared_blobs(vec![&blob]) + .expect("Expect successful ledger write"); let rv = ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 1); diff --git a/src/db_ledger.rs b/src/db_ledger.rs index ec36ce5f6f..bfd267e222 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -285,7 +285,7 @@ impl DbLedger { Ok(()) } - pub fn write_shared_blobs(&mut self, shared_blobs: I) -> Result> + pub fn write_shared_blobs(&self, shared_blobs: I) -> Result> where I: IntoIterator, I::Item: Borrow, @@ -302,7 +302,7 @@ impl DbLedger { Ok(entries) } - pub fn write_blobs<'a, I>(&mut self, blobs: I) -> Result> + pub fn write_blobs<'a, I>(&self, blobs: I) -> Result> where I: IntoIterator, { @@ -316,7 +316,7 @@ impl DbLedger { Ok(entries) } - pub fn write_entries(&mut self, slot: u64, entries: I) -> Result> + pub fn write_entries(&self, slot: u64, entries: I) -> Result> where I: IntoIterator, I::Item: Borrow, @@ -427,7 +427,7 @@ impl DbLedger { // // Return tuple of (number of blob read, total size of blobs read) pub fn get_blob_bytes( - &mut self, + &self, start_index: u64, num_blobs: u64, buf: &mut [u8], @@ -533,7 +533,7 @@ where { let mut entries = entries.into_iter(); for ledger_path in ledger_paths { - let mut db_ledger = + let db_ledger = DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); db_ledger .write_entries(slot_height, entries.by_ref()) @@ -545,7 +545,7 @@ pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Resul where I: IntoIterator, { - let mut db_ledger = DbLedger::open(ledger_path)?; + let db_ledger = DbLedger::open(ledger_path)?; // TODO sign these blobs with keypair let blobs = entries.into_iter().enumerate().map(|(idx, entry)| { @@ -631,7 +631,7 @@ mod tests { let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes"); - let mut ledger = DbLedger::open(&ledger_path).unwrap(); + let ledger = DbLedger::open(&ledger_path).unwrap(); ledger.write_blobs(&blobs).unwrap(); let mut buf = [0; 1024]; @@ -814,7 +814,7 @@ mod tests { // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_iteration_order"); { - let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); // Write entries let num_entries = 8; diff --git a/src/db_window.rs b/src/db_window.rs index 87adca46ec..0e4c9e733d 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -273,7 +273,7 @@ pub fn add_blob_to_retransmit_queue( /// range of blobs to a queue to be sent on to the next stage. pub fn process_blob( leader_scheduler: &Arc>, - db_ledger: &Arc>, + db_ledger: &Arc, blob: &SharedBlob, max_ix: u64, consume_queue: &mut Vec, @@ -303,21 +303,15 @@ pub fn process_blob( let erasure_key = ErasureCf::key(slot, pix); let rblob = &blob.read().unwrap(); let size = rblob.size()?; - { - let w_db = db_ledger.write().unwrap(); - w_db.erasure_cf.put( - &w_db.db, - &erasure_key, - &rblob.data[..BLOB_HEADER_SIZE + size], - )?; - } + db_ledger.erasure_cf.put( + &db_ledger.db, + &erasure_key, + &rblob.data[..BLOB_HEADER_SIZE + size], + )?; vec![] } else { let data_key = DataCf::key(slot, pix); - db_ledger - .write() - .unwrap() - .insert_data_blob(&data_key, &blob.read().unwrap())? + db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())? }; #[cfg(feature = "erasure")] @@ -341,12 +335,10 @@ pub fn process_blob( // we only want up to a certain index // then stop if max_ix != 0 && !consumed_entries.is_empty() { - let meta = { - let r_db = db_ledger.read().unwrap(); - r_db.meta_cf - .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))? - .expect("Expect metadata to exist if consumed entries is nonzero") - }; + let meta = db_ledger + .meta_cf + .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))? + .expect("Expect metadata to exist if consumed entries is nonzero"); let consumed = meta.consumed; @@ -385,12 +377,10 @@ pub fn calculate_max_repair_entry_height( } #[cfg(feature = "erasure")] -fn try_erasure(db_ledger: &Arc>, consume_queue: &mut Vec) -> Result<()> { - let meta = { - let r_db = db_ledger.read().unwrap(); - r_db.meta_cf - .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))? - }; +fn try_erasure(db_ledger: &Arc, consume_queue: &mut Vec) -> Result<()> { + let meta = db_ledger + .meta_cf + .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?; if let Some(meta) = meta { let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?; @@ -401,12 +391,14 @@ fn try_erasure(db_ledger: &Arc>, consume_queue: &mut Vec cl.index().expect("Recovered blob must set index"), ); let size = cl.size().expect("Recovered blob must set size"); - let r_db = db_ledger.read().unwrap(); - r_db.erasure_cf - .put(&r_db.db, &erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?; + db_ledger.erasure_cf.put( + &db_ledger.db, + &erasure_key, + &cl.data[..BLOB_HEADER_SIZE + size], + )?; } - let entries = db_ledger.write().unwrap().write_shared_blobs(data)?; + let entries = db_ledger.write_shared_blobs(data)?; consume_queue.extend(entries); } @@ -549,7 +541,7 @@ mod test { // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_sanity"); - let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); // Early exit conditions let empty: Vec = vec![]; @@ -597,7 +589,7 @@ mod test { let slot = DEFAULT_SLOT_HEIGHT; // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); - let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); // Write entries let gap = 10; @@ -687,7 +679,7 @@ mod test { let slot = DEFAULT_SLOT_HEIGHT; // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); - let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); // Write entries let num_entries = 10; @@ -741,11 +733,7 @@ mod test { // Generate the db_ledger from the window let ledger_path = get_tmp_ledger_path("test_try_erasure"); - let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( - &ledger_path, - &window, - false, - ))); + let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, false)); let mut consume_queue = vec![]; try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt"); @@ -759,11 +747,10 @@ mod test { assert_eq!(consume_queue, expected); let erased_coding_l = erased_coding.read().unwrap(); - let r_db = db_ledger.read().unwrap(); assert_eq!( - &r_db + &db_ledger .erasure_cf - .get_by_slot_index(&r_db.db, slot_height, erase_offset as u64) + .get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64) .unwrap() .unwrap()[BLOB_HEADER_SIZE..], &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize], @@ -778,7 +765,7 @@ mod test { // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_process_blob"); - let db_ledger = Arc::new(RwLock::new(DbLedger::open(&db_ledger_path).unwrap())); + let db_ledger = Arc::new(DbLedger::open(&db_ledger_path).unwrap()); // Mock the tick height to look like the tick height right after a leader transition leader_scheduler.last_seed_height = None; diff --git a/src/erasure.rs b/src/erasure.rs index 862389542a..18e7bf7d7c 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -351,7 +351,7 @@ pub fn generate_coding( // Recover the missing data and coding blobs from the input ledger. Returns a vector // of the recovered missing data blobs and a vector of the recovered coding blobs pub fn recover( - db_ledger: &Arc>, + db_ledger: &Arc, slot: u64, start_idx: u64, ) -> Result<(Vec, Vec)> { @@ -367,17 +367,11 @@ pub fn recover( block_end_idx ); - let data_missing = find_missing_data_indexes( - slot, - &db_ledger.read().unwrap(), - block_start_idx, - block_end_idx, - NUM_DATA, - ) - .len(); + let data_missing = + find_missing_data_indexes(slot, &db_ledger, block_start_idx, block_end_idx, NUM_DATA).len(); let coding_missing = find_missing_coding_indexes( slot, - &db_ledger.read().unwrap(), + &db_ledger, coding_start_idx, block_end_idx, NUM_CODING, @@ -416,10 +410,9 @@ pub fn recover( // Add the data blobs we have into the recovery vector, mark the missing ones for i in block_start_idx..block_end_idx { - let result = { - let r_db = db_ledger.read().unwrap(); - r_db.data_cf.get_by_slot_index(&r_db.db, slot, i)? - }; + let result = db_ledger + .data_cf + .get_by_slot_index(&db_ledger.db, slot, i)?; categorize_blob( &result, @@ -432,10 +425,9 @@ pub fn recover( // Add the coding blobs we have into the recovery vector, mark the missing ones for i in coding_start_idx..block_end_idx { - let result = { - let r_db = db_ledger.read().unwrap(); - r_db.erasure_cf.get_by_slot_index(&r_db.db, slot, i)? - }; + let result = db_ledger + .erasure_cf + .get_by_slot_index(&db_ledger.db, slot, i)?; categorize_blob( &result, @@ -528,10 +520,9 @@ pub fn recover( // Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct // the blobs again for i in coding_start_idx..block_end_idx { - { - let r_db = db_ledger.read().unwrap(); - r_db.erasure_cf.delete_by_slot_index(&r_db.db, slot, i)?; - } + db_ledger + .erasure_cf + .delete_by_slot_index(&db_ledger.db, slot, i)?; } return Ok((vec![], vec![])); } @@ -576,7 +567,7 @@ pub mod test { use rand::{thread_rng, Rng}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; #[test] pub fn test_coding() { @@ -636,7 +627,7 @@ pub mod test { window: &[WindowSlot], use_random: bool, ) -> DbLedger { - let mut db_ledger = + let db_ledger = DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); for slot in window { if let Some(ref data) = slot.data { @@ -842,11 +833,7 @@ pub mod test { // Generate the db_ledger from the window let ledger_path = get_tmp_ledger_path("test_window_recover_basic"); - let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( - &ledger_path, - &window, - true, - ))); + let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, true)); // Recover it from coding let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64) @@ -896,11 +883,7 @@ pub mod test { window[erase_offset].data = None; window[erase_offset].coding = None; let ledger_path = get_tmp_ledger_path("test_window_recover_basic2"); - let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( - &ledger_path, - &window, - true, - ))); + let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, true)); // Recover it from coding let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64) diff --git a/src/fullnode.rs b/src/fullnode.rs index 91d0e92eba..300d24e735 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -108,7 +108,7 @@ pub struct Fullnode { broadcast_socket: UdpSocket, rpc_addr: SocketAddr, rpc_pubsub_addr: SocketAddr, - db_ledger: Arc>, + db_ledger: Arc, } impl Fullnode { @@ -587,7 +587,7 @@ impl Fullnode { ) } - fn make_db_ledger(ledger_path: &str) -> Arc> { + fn make_db_ledger(ledger_path: &str) -> Arc { // Destroy any existing instances of the RocksDb ledger DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); let ledger_entries = read_ledger(ledger_path, true) @@ -597,7 +597,7 @@ impl Fullnode { write_entries_to_ledger(&[ledger_path], ledger_entries, DEFAULT_SLOT_HEIGHT); let db = DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"); - Arc::new(RwLock::new(db)) + Arc::new(db) } } diff --git a/src/gossip_service.rs b/src/gossip_service.rs index bde0cf04cf..6f56ebb2f4 100644 --- a/src/gossip_service.rs +++ b/src/gossip_service.rs @@ -18,7 +18,7 @@ pub struct GossipService { impl GossipService { pub fn new( cluster_info: &Arc>, - db_ledger: Option>>, + db_ledger: Option>, gossip_socket: UdpSocket, exit: Arc, ) -> Self { diff --git a/src/replicator.rs b/src/replicator.rs index 411380e30b..d864e92ac5 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -113,10 +113,10 @@ impl Replicator { // RocksDb. Note for now, this ledger will not contain any of the existing entries // in the ledger located at ledger_path, and will only append on newly received // entries after being passed to window_service - let db_ledger = Arc::new(RwLock::new( + let db_ledger = Arc::new( DbLedger::open(&ledger_path.unwrap()) .expect("Expected to be able to open database ledger"), - )); + ); let gossip_service = GossipService::new( &cluster_info, diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index e5a3a8474b..be58faf25f 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -83,7 +83,7 @@ pub struct RetransmitStage { impl RetransmitStage { #[allow(clippy::new_ret_no_self)] pub fn new( - db_ledger: Arc>, + db_ledger: Arc, cluster_info: &Arc>, tick_height: u64, entry_height: u64, diff --git a/src/tvu.rs b/src/tvu.rs index b55c16d585..c6696e6730 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -70,7 +70,7 @@ impl Tvu { cluster_info: &Arc>, sockets: Sockets, ledger_path: Option<&str>, - db_ledger: Arc>, + db_ledger: Arc, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); let keypair: Arc = cluster_info @@ -294,7 +294,7 @@ pub mod tests { } }, None, - Arc::new(RwLock::new(db_ledger)), + Arc::new(db_ledger), ); let mut alice_ref_balance = starting_balance; diff --git a/src/window_service.rs b/src/window_service.rs index 2bda1b59ac..19ce74ad35 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -14,7 +14,6 @@ use rand::{thread_rng, Rng}; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; -use std::borrow::Borrow; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::mpsc::RecvTimeoutError; @@ -52,7 +51,7 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { #[allow(clippy::too_many_arguments)] fn recv_window( - db_ledger: &Arc>, + db_ledger: &Arc, id: &Pubkey, leader_scheduler: &Arc>, tick_height: &mut u64, @@ -123,7 +122,7 @@ fn recv_window( #[allow(clippy::too_many_arguments)] pub fn window_service( - db_ledger: Arc>, + db_ledger: Arc, cluster_info: Arc>, tick_height: u64, entry_height: u64, @@ -165,13 +164,9 @@ pub fn window_service( } } - let meta = { - let rlock = db_ledger.read().unwrap(); - - rlock - .meta_cf - .get(&rlock.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) - }; + let meta = db_ledger + .meta_cf + .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)); if let Ok(Some(meta)) = meta { let received = meta.received; @@ -203,7 +198,7 @@ pub fn window_service( trace!("{} let's repair! times = {}", id, times); let reqs = repair( - db_ledger.read().unwrap().borrow(), + &db_ledger, &cluster_info, &id, times, @@ -277,9 +272,9 @@ mod test { let (s_retransmit, r_retransmit) = channel(); let done = Arc::new(AtomicBool::new(false)); let db_ledger_path = get_tmp_ledger_path("window_send_test"); - let db_ledger = Arc::new(RwLock::new( + let db_ledger = Arc::new( DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), - )); + ); let t_window = window_service( db_ledger, subs, @@ -347,9 +342,9 @@ mod test { let (s_retransmit, r_retransmit) = channel(); let done = Arc::new(AtomicBool::new(false)); let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test"); - let db_ledger = Arc::new(RwLock::new( + let db_ledger = Arc::new( DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), - )); + ); let t_window = window_service( db_ledger, subs.clone(),