Delete expensive integration test in unit-test suite
This commit is contained in:
136
src/fullnode.rs
136
src/fullnode.rs
@ -521,18 +521,13 @@ impl Service for Fullnode {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::blob_fetch_stage::BlobFetchStage;
|
|
||||||
use crate::blocktree::{
|
use crate::blocktree::{
|
||||||
create_tmp_sample_ledger, tmp_copy_ledger, BlocktreeConfig, DEFAULT_SLOT_HEIGHT,
|
create_tmp_sample_ledger, tmp_copy_ledger, BlocktreeConfig, DEFAULT_SLOT_HEIGHT,
|
||||||
};
|
};
|
||||||
use crate::entry::make_consecutive_blobs;
|
use crate::entry::make_consecutive_blobs;
|
||||||
use crate::entry::EntrySlice;
|
|
||||||
use crate::gossip_service::{converge, make_listening_node};
|
|
||||||
use crate::leader_scheduler::make_active_set_entries;
|
use crate::leader_scheduler::make_active_set_entries;
|
||||||
use crate::streamer::responder;
|
use crate::streamer::responder;
|
||||||
use std::fs::remove_dir_all;
|
use std::fs::remove_dir_all;
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
use std::thread::sleep;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn validator_exit() {
|
fn validator_exit() {
|
||||||
@ -854,137 +849,6 @@ mod tests {
|
|||||||
let _ignored = remove_dir_all(&validator_ledger_path).unwrap();
|
let _ignored = remove_dir_all(&validator_ledger_path).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_tvu_behind() {
|
|
||||||
solana_logger::setup();
|
|
||||||
|
|
||||||
// Make leader node
|
|
||||||
let ticks_per_slot = 5;
|
|
||||||
let slots_per_epoch = 1;
|
|
||||||
let leader_keypair = Arc::new(Keypair::new());
|
|
||||||
let validator_keypair = Arc::new(Keypair::new());
|
|
||||||
|
|
||||||
info!("leader: {:?}", leader_keypair.pubkey());
|
|
||||||
info!("validator: {:?}", validator_keypair.pubkey());
|
|
||||||
|
|
||||||
let mut fullnode_config = FullnodeConfig::default();
|
|
||||||
fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new(
|
|
||||||
ticks_per_slot,
|
|
||||||
slots_per_epoch,
|
|
||||||
ticks_per_slot * slots_per_epoch,
|
|
||||||
);
|
|
||||||
let config = PohServiceConfig::Sleep(Duration::from_millis(200));
|
|
||||||
fullnode_config.tick_config = config;
|
|
||||||
|
|
||||||
let (leader_node, _, leader_ledger_path, _, _) = setup_leader_validator(
|
|
||||||
&leader_keypair,
|
|
||||||
&validator_keypair,
|
|
||||||
1,
|
|
||||||
0,
|
|
||||||
"test_tvu_behind",
|
|
||||||
&fullnode_config.ledger_config(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let leader_node_info = leader_node.info.clone();
|
|
||||||
|
|
||||||
info!("Start up a listener");
|
|
||||||
let blob_receiver_exit = Arc::new(AtomicBool::new(false));
|
|
||||||
let (_, _, mut listening_node, _) = make_listening_node(&leader_node.info);
|
|
||||||
let (blob_fetch_sender, blob_fetch_receiver) = channel();
|
|
||||||
let blob_fetch_stage = BlobFetchStage::new(
|
|
||||||
Arc::new(listening_node.sockets.tvu.pop().unwrap()),
|
|
||||||
&blob_fetch_sender,
|
|
||||||
blob_receiver_exit.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let voting_keypair = VotingKeypair::new_local(&leader_keypair);
|
|
||||||
info!("Start the bootstrap leader");
|
|
||||||
let leader = Fullnode::new(
|
|
||||||
leader_node,
|
|
||||||
&leader_keypair,
|
|
||||||
&leader_ledger_path,
|
|
||||||
voting_keypair,
|
|
||||||
Some(&leader_node_info),
|
|
||||||
&fullnode_config,
|
|
||||||
);
|
|
||||||
|
|
||||||
let (rotation_sender, rotation_receiver) = channel();
|
|
||||||
|
|
||||||
info!("Pause the Tvu");
|
|
||||||
let pause_tvu = leader.node_services.tvu.get_pause();
|
|
||||||
pause_tvu.store(true, Ordering::Relaxed);
|
|
||||||
|
|
||||||
// Wait for convergence
|
|
||||||
converge(&leader_node_info, 2);
|
|
||||||
|
|
||||||
// Wait for Tpu bank to progress while the Tvu bank is stuck
|
|
||||||
sleep(Duration::from_millis(1000));
|
|
||||||
|
|
||||||
info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
|
|
||||||
{
|
|
||||||
let w_last_ids = leader.bank.last_ids().write().unwrap();
|
|
||||||
assert!(w_last_ids.tick_height < ticks_per_slot - 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear the blobs we've received so far. After this rotation, we should
|
|
||||||
// no longer receive blobs from slot 0
|
|
||||||
while let Ok(_) = blob_fetch_receiver.try_recv() {}
|
|
||||||
|
|
||||||
let leader_exit = leader.run(Some(rotation_sender));
|
|
||||||
|
|
||||||
// Wait for Tpu bank to progress while the Tvu bank is stuck
|
|
||||||
sleep(Duration::from_millis(1000));
|
|
||||||
|
|
||||||
// Tvu bank lock is released here, so tvu should start making progress again and should signal a
|
|
||||||
// rotation. After rotation it will still be the slot leader as a new leader schedule has
|
|
||||||
// not been computed yet (still in epoch 0). In the next epoch (epoch 1), the node will
|
|
||||||
// transition to a validator.
|
|
||||||
info!("Unpause the Tvu");
|
|
||||||
pause_tvu.store(false, Ordering::Relaxed);
|
|
||||||
let expected_rotations = vec![(
|
|
||||||
FullnodeReturnType::LeaderToValidatorRotation,
|
|
||||||
ticks_per_slot,
|
|
||||||
)];
|
|
||||||
|
|
||||||
for expected_rotation in expected_rotations {
|
|
||||||
loop {
|
|
||||||
let transition = rotation_receiver.recv().unwrap();
|
|
||||||
info!("leader transition: {:?}", transition);
|
|
||||||
assert_eq!(expected_rotation, transition);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Shut down");
|
|
||||||
leader_exit();
|
|
||||||
|
|
||||||
// Make sure that after rotation we don't receive any blobs from slot 0 (make sure
|
|
||||||
// broadcast started again at the correct place)
|
|
||||||
while let Ok(new_blobs) = blob_fetch_receiver.try_recv() {
|
|
||||||
for blob in new_blobs {
|
|
||||||
assert_ne!(blob.read().unwrap().slot(), 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the ledger to make sure the PoH chains
|
|
||||||
{
|
|
||||||
let blocktree =
|
|
||||||
Blocktree::open_config(&leader_ledger_path, &fullnode_config.ledger_config())
|
|
||||||
.unwrap();
|
|
||||||
let entries: Vec<_> = (0..3)
|
|
||||||
.flat_map(|slot_height| blocktree.get_slot_entries(slot_height, 0, None).unwrap())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
assert!(entries[1..].verify(&entries[0].id))
|
|
||||||
}
|
|
||||||
|
|
||||||
blob_receiver_exit.store(true, Ordering::Relaxed);
|
|
||||||
blob_fetch_stage.join().unwrap();
|
|
||||||
|
|
||||||
Blocktree::destroy(&leader_ledger_path).expect("Expected successful database destruction");
|
|
||||||
let _ignored = remove_dir_all(&leader_ledger_path).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn setup_leader_validator(
|
fn setup_leader_validator(
|
||||||
leader_keypair: &Arc<Keypair>,
|
leader_keypair: &Arc<Keypair>,
|
||||||
validator_keypair: &Arc<Keypair>,
|
validator_keypair: &Arc<Keypair>,
|
||||||
|
@ -22,8 +22,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
|||||||
use std::sync::mpsc::RecvTimeoutError;
|
use std::sync::mpsc::RecvTimeoutError;
|
||||||
use std::sync::mpsc::{channel, Receiver};
|
use std::sync::mpsc::{channel, Receiver};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
#[cfg(test)]
|
|
||||||
use std::thread::sleep;
|
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
@ -51,8 +49,6 @@ impl Drop for Finalizer {
|
|||||||
pub struct ReplayStage {
|
pub struct ReplayStage {
|
||||||
t_replay: JoinHandle<()>,
|
t_replay: JoinHandle<()>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
#[cfg(test)]
|
|
||||||
pause: Arc<AtomicBool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReplayStage {
|
impl ReplayStage {
|
||||||
@ -184,12 +180,6 @@ impl ReplayStage {
|
|||||||
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
||||||
) -> (Self, EntryReceiver) {
|
) -> (Self, EntryReceiver) {
|
||||||
let (ledger_entry_sender, ledger_entry_receiver) = channel();
|
let (ledger_entry_sender, ledger_entry_receiver) = channel();
|
||||||
#[cfg(test)]
|
|
||||||
let (pause, pause_) = {
|
|
||||||
let pause = Arc::new(AtomicBool::new(false));
|
|
||||||
let pause_ = pause.clone();
|
|
||||||
(pause, pause_)
|
|
||||||
};
|
|
||||||
let exit_ = exit.clone();
|
let exit_ = exit.clone();
|
||||||
let leader_scheduler_ = leader_scheduler.clone();
|
let leader_scheduler_ = leader_scheduler.clone();
|
||||||
let to_leader_sender = to_leader_sender.clone();
|
let to_leader_sender = to_leader_sender.clone();
|
||||||
@ -219,10 +209,6 @@ impl ReplayStage {
|
|||||||
if exit_.load(Ordering::Relaxed) {
|
if exit_.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
#[cfg(test)]
|
|
||||||
while pause_.load(Ordering::Relaxed) {
|
|
||||||
sleep(Duration::from_millis(200));
|
|
||||||
}
|
|
||||||
let timer = Duration::from_millis(100);
|
let timer = Duration::from_millis(100);
|
||||||
let e = ledger_signal_receiver.recv_timeout(timer);
|
let e = ledger_signal_receiver.recv_timeout(timer);
|
||||||
match e {
|
match e {
|
||||||
@ -308,20 +294,7 @@ impl ReplayStage {
|
|||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
(
|
(Self { t_replay, exit }, ledger_entry_receiver)
|
||||||
Self {
|
|
||||||
t_replay,
|
|
||||||
exit,
|
|
||||||
#[cfg(test)]
|
|
||||||
pause,
|
|
||||||
},
|
|
||||||
ledger_entry_receiver,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn get_pause(&self) -> Arc<AtomicBool> {
|
|
||||||
self.pause.clone()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn close(self) -> thread::Result<()> {
|
pub fn close(self) -> thread::Result<()> {
|
||||||
|
@ -167,11 +167,6 @@ impl Tvu {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn get_pause(&self) -> Arc<AtomicBool> {
|
|
||||||
self.replay_stage.get_pause()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_state(&self) -> Hash {
|
pub fn get_state(&self) -> Hash {
|
||||||
*self.last_entry_id.read().unwrap()
|
*self.last_entry_id.read().unwrap()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user