Do a recv on join to prevent channel destruction (#1320)
before window thread join
This commit is contained in:
@ -8,6 +8,7 @@ use std::sync::atomic::AtomicBool;
|
|||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
|
use std::time::Duration;
|
||||||
use store_ledger_stage::StoreLedgerStage;
|
use store_ledger_stage::StoreLedgerStage;
|
||||||
use streamer::BlobReceiver;
|
use streamer::BlobReceiver;
|
||||||
use window;
|
use window;
|
||||||
@ -86,6 +87,14 @@ impl Replicator {
|
|||||||
self.fetch_stage.join().unwrap();
|
self.fetch_stage.join().unwrap();
|
||||||
self.t_window.join().unwrap();
|
self.t_window.join().unwrap();
|
||||||
self.store_ledger_stage.join().unwrap();
|
self.store_ledger_stage.join().unwrap();
|
||||||
|
|
||||||
|
// Drain the queue here to prevent self.retransmit_receiver from being dropped
|
||||||
|
// before the window_service thread is joined
|
||||||
|
let mut retransmit_queue_count = 0;
|
||||||
|
while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) {
|
||||||
|
retransmit_queue_count += 1;
|
||||||
|
}
|
||||||
|
debug!("retransmit channel count: {}", retransmit_queue_count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,7 +114,6 @@ mod tests {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
|
||||||
fn test_replicator_startup() {
|
fn test_replicator_startup() {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
info!("starting replicator test");
|
info!("starting replicator test");
|
||||||
@ -154,7 +162,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let mut num_entries = 0;
|
let mut num_entries = 0;
|
||||||
for _ in 0..10 {
|
for _ in 0..60 {
|
||||||
match read_ledger(replicator_ledger_path, true) {
|
match read_ledger(replicator_ledger_path, true) {
|
||||||
Ok(entries) => {
|
Ok(entries) => {
|
||||||
for _ in entries {
|
for _ in entries {
|
||||||
@ -169,7 +177,7 @@ mod tests {
|
|||||||
info!("error reading ledger: {:?}", e);
|
info!("error reading ledger: {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sleep(Duration::new(1, 0));
|
sleep(Duration::from_millis(300));
|
||||||
let last_id = leader_client.get_last_id();
|
let last_id = leader_client.get_last_id();
|
||||||
leader_client
|
leader_client
|
||||||
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
|
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
|
||||||
|
Reference in New Issue
Block a user