Free up the term 'replicate' for exclusive use in replicator
Also, align Sockets field names with ContactInfo.
This commit is contained in:
		@@ -1014,8 +1014,8 @@ impl ClusterInfo {
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub struct Sockets {
 | 
			
		||||
    pub gossip: UdpSocket,
 | 
			
		||||
    pub replicate: Vec<UdpSocket>,
 | 
			
		||||
    pub transaction: Vec<UdpSocket>,
 | 
			
		||||
    pub tvu: Vec<UdpSocket>,
 | 
			
		||||
    pub tpu: Vec<UdpSocket>,
 | 
			
		||||
    pub broadcast: UdpSocket,
 | 
			
		||||
    pub repair: UdpSocket,
 | 
			
		||||
    pub retransmit: UdpSocket,
 | 
			
		||||
@@ -1033,9 +1033,9 @@ impl Node {
 | 
			
		||||
        Self::new_localhost_with_pubkey(pubkey)
 | 
			
		||||
    }
 | 
			
		||||
    pub fn new_localhost_with_pubkey(pubkey: Pubkey) -> Self {
 | 
			
		||||
        let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
 | 
			
		||||
        let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
 | 
			
		||||
        let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
 | 
			
		||||
        let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
 | 
			
		||||
        let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
 | 
			
		||||
        let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
 | 
			
		||||
        let rpc_port = find_available_port_in_range((1024, 65535)).unwrap();
 | 
			
		||||
        let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port);
 | 
			
		||||
@@ -1049,8 +1049,8 @@ impl Node {
 | 
			
		||||
        let info = NodeInfo::new(
 | 
			
		||||
            pubkey,
 | 
			
		||||
            gossip.local_addr().unwrap(),
 | 
			
		||||
            replicate.local_addr().unwrap(),
 | 
			
		||||
            transaction.local_addr().unwrap(),
 | 
			
		||||
            tvu.local_addr().unwrap(),
 | 
			
		||||
            tpu.local_addr().unwrap(),
 | 
			
		||||
            storage.local_addr().unwrap(),
 | 
			
		||||
            rpc_addr,
 | 
			
		||||
            rpc_pubsub_addr,
 | 
			
		||||
@@ -1060,8 +1060,8 @@ impl Node {
 | 
			
		||||
            info,
 | 
			
		||||
            sockets: Sockets {
 | 
			
		||||
                gossip,
 | 
			
		||||
                replicate: vec![replicate],
 | 
			
		||||
                transaction: vec![transaction],
 | 
			
		||||
                tvu: vec![tvu],
 | 
			
		||||
                tpu: vec![tpu],
 | 
			
		||||
                broadcast,
 | 
			
		||||
                repair,
 | 
			
		||||
                retransmit,
 | 
			
		||||
@@ -1082,10 +1082,10 @@ impl Node {
 | 
			
		||||
            bind()
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let (replicate_port, replicate_sockets) =
 | 
			
		||||
        let (tvu_port, tvu_sockets) =
 | 
			
		||||
            multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind");
 | 
			
		||||
 | 
			
		||||
        let (transaction_port, transaction_sockets) =
 | 
			
		||||
        let (tpu_port, tpu_sockets) =
 | 
			
		||||
            multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind");
 | 
			
		||||
 | 
			
		||||
        let (_, repair) = bind();
 | 
			
		||||
@@ -1096,8 +1096,8 @@ impl Node {
 | 
			
		||||
        let info = NodeInfo::new(
 | 
			
		||||
            pubkey,
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), gossip_port),
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), replicate_port),
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), transaction_port),
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), tvu_port),
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), tpu_port),
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), storage_port),
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), RPC_PORT),
 | 
			
		||||
            SocketAddr::new(gossip_addr.ip(), RPC_PORT + 1),
 | 
			
		||||
@@ -1109,8 +1109,8 @@ impl Node {
 | 
			
		||||
            info,
 | 
			
		||||
            sockets: Sockets {
 | 
			
		||||
                gossip,
 | 
			
		||||
                replicate: replicate_sockets,
 | 
			
		||||
                transaction: transaction_sockets,
 | 
			
		||||
                tvu: tvu_sockets,
 | 
			
		||||
                tpu: tpu_sockets,
 | 
			
		||||
                broadcast,
 | 
			
		||||
                repair,
 | 
			
		||||
                retransmit,
 | 
			
		||||
@@ -1384,28 +1384,28 @@ mod tests {
 | 
			
		||||
        let ip = Ipv4Addr::from(0);
 | 
			
		||||
        let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(ip, 0));
 | 
			
		||||
        assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
 | 
			
		||||
        assert!(node.sockets.replicate.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.replicate.iter() {
 | 
			
		||||
        assert!(node.sockets.tvu.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.tvu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
 | 
			
		||||
        }
 | 
			
		||||
        assert!(node.sockets.transaction.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.transaction.iter() {
 | 
			
		||||
        assert!(node.sockets.tpu.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.tpu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
 | 
			
		||||
        }
 | 
			
		||||
        assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip);
 | 
			
		||||
 | 
			
		||||
        assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
 | 
			
		||||
        assert!(node.sockets.gossip.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
 | 
			
		||||
        let tx_port = node.sockets.replicate[0].local_addr().unwrap().port();
 | 
			
		||||
        let tx_port = node.sockets.tvu[0].local_addr().unwrap().port();
 | 
			
		||||
        assert!(tx_port >= FULLNODE_PORT_RANGE.0);
 | 
			
		||||
        assert!(tx_port < FULLNODE_PORT_RANGE.1);
 | 
			
		||||
        for tx_socket in node.sockets.replicate.iter() {
 | 
			
		||||
        for tx_socket in node.sockets.tvu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
 | 
			
		||||
        }
 | 
			
		||||
        let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();
 | 
			
		||||
        let tx_port = node.sockets.tpu[0].local_addr().unwrap().port();
 | 
			
		||||
        assert!(tx_port >= FULLNODE_PORT_RANGE.0);
 | 
			
		||||
        assert!(tx_port < FULLNODE_PORT_RANGE.1);
 | 
			
		||||
        for tx_socket in node.sockets.transaction.iter() {
 | 
			
		||||
        for tx_socket in node.sockets.tpu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
 | 
			
		||||
        }
 | 
			
		||||
        assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
 | 
			
		||||
@@ -1417,27 +1417,27 @@ mod tests {
 | 
			
		||||
        let ip = IpAddr::V4(Ipv4Addr::from(0));
 | 
			
		||||
        let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(0, 8050));
 | 
			
		||||
        assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
 | 
			
		||||
        assert!(node.sockets.replicate.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.replicate.iter() {
 | 
			
		||||
        assert!(node.sockets.tvu.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.tvu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
 | 
			
		||||
        }
 | 
			
		||||
        assert!(node.sockets.transaction.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.transaction.iter() {
 | 
			
		||||
        assert!(node.sockets.tpu.len() > 1);
 | 
			
		||||
        for tx_socket in node.sockets.tpu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
 | 
			
		||||
        }
 | 
			
		||||
        assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip);
 | 
			
		||||
 | 
			
		||||
        assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050);
 | 
			
		||||
        let tx_port = node.sockets.replicate[0].local_addr().unwrap().port();
 | 
			
		||||
        let tx_port = node.sockets.tvu[0].local_addr().unwrap().port();
 | 
			
		||||
        assert!(tx_port >= FULLNODE_PORT_RANGE.0);
 | 
			
		||||
        assert!(tx_port < FULLNODE_PORT_RANGE.1);
 | 
			
		||||
        for tx_socket in node.sockets.replicate.iter() {
 | 
			
		||||
        for tx_socket in node.sockets.tvu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
 | 
			
		||||
        }
 | 
			
		||||
        let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();
 | 
			
		||||
        let tx_port = node.sockets.tpu[0].local_addr().unwrap().port();
 | 
			
		||||
        assert!(tx_port >= FULLNODE_PORT_RANGE.0);
 | 
			
		||||
        assert!(tx_port < FULLNODE_PORT_RANGE.1);
 | 
			
		||||
        for tx_socket in node.sockets.transaction.iter() {
 | 
			
		||||
        for tx_socket in node.sockets.tpu.iter() {
 | 
			
		||||
            assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
 | 
			
		||||
        }
 | 
			
		||||
        assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
 | 
			
		||||
 
 | 
			
		||||
@@ -119,16 +119,16 @@ impl ContactInfo {
 | 
			
		||||
        nxt_addr
 | 
			
		||||
    }
 | 
			
		||||
    pub fn new_with_pubkey_socketaddr(pubkey: Pubkey, bind_addr: &SocketAddr) -> Self {
 | 
			
		||||
        let transactions_addr = *bind_addr;
 | 
			
		||||
        let tpu_addr = *bind_addr;
 | 
			
		||||
        let gossip_addr = Self::next_port(&bind_addr, 1);
 | 
			
		||||
        let replicate_addr = Self::next_port(&bind_addr, 2);
 | 
			
		||||
        let tvu_addr = Self::next_port(&bind_addr, 2);
 | 
			
		||||
        let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT);
 | 
			
		||||
        let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1);
 | 
			
		||||
        ContactInfo::new(
 | 
			
		||||
            pubkey,
 | 
			
		||||
            gossip_addr,
 | 
			
		||||
            replicate_addr,
 | 
			
		||||
            transactions_addr,
 | 
			
		||||
            tvu_addr,
 | 
			
		||||
            tpu_addr,
 | 
			
		||||
            "0.0.0.0:0".parse().unwrap(),
 | 
			
		||||
            rpc_addr,
 | 
			
		||||
            rpc_pubsub_addr,
 | 
			
		||||
@@ -267,7 +267,7 @@ mod tests {
 | 
			
		||||
        assert!(ci.storage_addr.ip().is_unspecified());
 | 
			
		||||
    }
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn replicated_data_new_with_socketaddr_with_pubkey() {
 | 
			
		||||
    fn replayed_data_new_with_socketaddr_with_pubkey() {
 | 
			
		||||
        let keypair = Keypair::new();
 | 
			
		||||
        let d1 = ContactInfo::new_with_pubkey_socketaddr(
 | 
			
		||||
            keypair.pubkey().clone(),
 | 
			
		||||
 
 | 
			
		||||
@@ -54,12 +54,12 @@ pub fn repair(
 | 
			
		||||
                    // In the case that we are not in the current scope of the leader schedule
 | 
			
		||||
                    // window then either:
 | 
			
		||||
                    //
 | 
			
		||||
                    // 1) The replicate stage hasn't caught up to the "consumed" entries we sent,
 | 
			
		||||
                    // 1) The replay stage hasn't caught up to the "consumed" entries we sent,
 | 
			
		||||
                    // in which case it will eventually catch up
 | 
			
		||||
                    //
 | 
			
		||||
                    // 2) We are on the border between seed_rotation_intervals, so the
 | 
			
		||||
                    // schedule won't be known until the entry on that cusp is received
 | 
			
		||||
                    // by the replicate stage (which comes after this stage). Hence, the next
 | 
			
		||||
                    // by the replay stage (which comes after this stage). Hence, the next
 | 
			
		||||
                    // leader at the beginning of that next epoch will not know they are the
 | 
			
		||||
                    // leader until they receive that last "cusp" entry. The leader also won't ask for repairs
 | 
			
		||||
                    // for that entry because "is_next_leader" won't be set here. In this case,
 | 
			
		||||
 
 | 
			
		||||
@@ -100,10 +100,10 @@ pub struct Fullnode {
 | 
			
		||||
    ledger_path: String,
 | 
			
		||||
    sigverify_disabled: bool,
 | 
			
		||||
    shared_window: SharedWindow,
 | 
			
		||||
    replicate_socket: Vec<UdpSocket>,
 | 
			
		||||
    tvu_sockets: Vec<UdpSocket>,
 | 
			
		||||
    repair_socket: UdpSocket,
 | 
			
		||||
    retransmit_socket: UdpSocket,
 | 
			
		||||
    transaction_sockets: Vec<UdpSocket>,
 | 
			
		||||
    tpu_sockets: Vec<UdpSocket>,
 | 
			
		||||
    broadcast_socket: UdpSocket,
 | 
			
		||||
    rpc_addr: SocketAddr,
 | 
			
		||||
    rpc_pubsub_addr: SocketAddr,
 | 
			
		||||
@@ -277,9 +277,9 @@ impl Fullnode {
 | 
			
		||||
                *last_entry_id,
 | 
			
		||||
                cluster_info.clone(),
 | 
			
		||||
                node.sockets
 | 
			
		||||
                    .replicate
 | 
			
		||||
                    .tvu
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone TVU sockets"))
 | 
			
		||||
                    .collect(),
 | 
			
		||||
                node.sockets
 | 
			
		||||
                    .repair
 | 
			
		||||
@@ -294,9 +294,9 @@ impl Fullnode {
 | 
			
		||||
            );
 | 
			
		||||
            let tpu_forwarder = TpuForwarder::new(
 | 
			
		||||
                node.sockets
 | 
			
		||||
                    .transaction
 | 
			
		||||
                    .tpu
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
 | 
			
		||||
                    .collect(),
 | 
			
		||||
                cluster_info.clone(),
 | 
			
		||||
            );
 | 
			
		||||
@@ -314,9 +314,9 @@ impl Fullnode {
 | 
			
		||||
                &bank,
 | 
			
		||||
                Default::default(),
 | 
			
		||||
                node.sockets
 | 
			
		||||
                    .transaction
 | 
			
		||||
                    .tpu
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
 | 
			
		||||
                    .collect(),
 | 
			
		||||
                ledger_path,
 | 
			
		||||
                sigverify_disabled,
 | 
			
		||||
@@ -356,10 +356,10 @@ impl Fullnode {
 | 
			
		||||
            node_role,
 | 
			
		||||
            ledger_path: ledger_path.to_owned(),
 | 
			
		||||
            exit,
 | 
			
		||||
            replicate_socket: node.sockets.replicate,
 | 
			
		||||
            tvu_sockets: node.sockets.tvu,
 | 
			
		||||
            repair_socket: node.sockets.repair,
 | 
			
		||||
            retransmit_socket: node.sockets.retransmit,
 | 
			
		||||
            transaction_sockets: node.sockets.transaction,
 | 
			
		||||
            tpu_sockets: node.sockets.tpu,
 | 
			
		||||
            broadcast_socket: node.sockets.broadcast,
 | 
			
		||||
            rpc_addr,
 | 
			
		||||
            rpc_pubsub_addr,
 | 
			
		||||
@@ -435,9 +435,9 @@ impl Fullnode {
 | 
			
		||||
                entry_height,
 | 
			
		||||
                last_entry_id,
 | 
			
		||||
                self.cluster_info.clone(),
 | 
			
		||||
                self.replicate_socket
 | 
			
		||||
                self.tvu_sockets
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone TVU sockets"))
 | 
			
		||||
                    .collect(),
 | 
			
		||||
                self.repair_socket
 | 
			
		||||
                    .try_clone()
 | 
			
		||||
@@ -449,9 +449,9 @@ impl Fullnode {
 | 
			
		||||
                self.db_ledger.clone(),
 | 
			
		||||
            );
 | 
			
		||||
            let tpu_forwarder = TpuForwarder::new(
 | 
			
		||||
                self.transaction_sockets
 | 
			
		||||
                self.tpu_sockets
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
 | 
			
		||||
                    .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
 | 
			
		||||
                    .collect(),
 | 
			
		||||
                self.cluster_info.clone(),
 | 
			
		||||
            );
 | 
			
		||||
@@ -476,15 +476,15 @@ impl Fullnode {
 | 
			
		||||
        let (tpu, blob_receiver, tpu_exit) = Tpu::new(
 | 
			
		||||
            &self.bank,
 | 
			
		||||
            Default::default(),
 | 
			
		||||
            self.transaction_sockets
 | 
			
		||||
            self.tpu_sockets
 | 
			
		||||
                .iter()
 | 
			
		||||
                .map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
 | 
			
		||||
                .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
 | 
			
		||||
                .collect(),
 | 
			
		||||
            &self.ledger_path,
 | 
			
		||||
            self.sigverify_disabled,
 | 
			
		||||
            max_tick_height,
 | 
			
		||||
            // We pass the last_entry_id from the replicate stage because we can't trust that
 | 
			
		||||
            // the window didn't overwrite the slot at for the last entry that the replicate stage
 | 
			
		||||
            // We pass the last_entry_id from the replay stage because we can't trust that
 | 
			
		||||
            // the window didn't overwrite the slot at for the last entry that the replay stage
 | 
			
		||||
            // processed. We also want to avoid reading processing the ledger for the last id.
 | 
			
		||||
            &last_id,
 | 
			
		||||
            self.keypair.pubkey(),
 | 
			
		||||
@@ -1017,12 +1017,8 @@ mod tests {
 | 
			
		||||
        // Send blobs to the validator from our mock leader
 | 
			
		||||
        let t_responder = {
 | 
			
		||||
            let (s_responder, r_responder) = channel();
 | 
			
		||||
            let blob_sockets: Vec<Arc<UdpSocket>> = leader_node
 | 
			
		||||
                .sockets
 | 
			
		||||
                .replicate
 | 
			
		||||
                .into_iter()
 | 
			
		||||
                .map(Arc::new)
 | 
			
		||||
                .collect();
 | 
			
		||||
            let blob_sockets: Vec<Arc<UdpSocket>> =
 | 
			
		||||
                leader_node.sockets.tvu.into_iter().map(Arc::new).collect();
 | 
			
		||||
 | 
			
		||||
            let t_responder = responder(
 | 
			
		||||
                "test_validator_to_leader_transition",
 | 
			
		||||
 
 | 
			
		||||
@@ -50,7 +50,7 @@ pub mod poh;
 | 
			
		||||
pub mod poh_recorder;
 | 
			
		||||
pub mod poh_service;
 | 
			
		||||
pub mod recvmmsg;
 | 
			
		||||
pub mod replicate_stage;
 | 
			
		||||
pub mod replay_stage;
 | 
			
		||||
pub mod replicator;
 | 
			
		||||
pub mod result;
 | 
			
		||||
pub mod retransmit_stage;
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,4 @@
 | 
			
		||||
//! The `replicate_stage` replicates transactions broadcast by the leader.
 | 
			
		||||
//! The `replay_stage` replays transactions broadcast by the leader.
 | 
			
		||||
 | 
			
		||||
use crate::bank::Bank;
 | 
			
		||||
use crate::cluster_info::ClusterInfo;
 | 
			
		||||
@@ -26,11 +26,11 @@ use std::time::Duration;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, PartialEq, Eq, Clone)]
 | 
			
		||||
pub enum ReplicateStageReturnType {
 | 
			
		||||
pub enum ReplayStageReturnType {
 | 
			
		||||
    LeaderRotation(u64, u64, Hash),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Implement a destructor for the ReplicateStage thread to signal it exited
 | 
			
		||||
// Implement a destructor for the ReplayStage thread to signal it exited
 | 
			
		||||
// even on panics
 | 
			
		||||
struct Finalizer {
 | 
			
		||||
    exit_sender: Arc<AtomicBool>,
 | 
			
		||||
@@ -48,14 +48,14 @@ impl Drop for Finalizer {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct ReplicateStage {
 | 
			
		||||
pub struct ReplayStage {
 | 
			
		||||
    t_responder: JoinHandle<()>,
 | 
			
		||||
    t_replicate: JoinHandle<Option<ReplicateStageReturnType>>,
 | 
			
		||||
    t_replay: JoinHandle<Option<ReplayStageReturnType>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ReplicateStage {
 | 
			
		||||
impl ReplayStage {
 | 
			
		||||
    /// Process entry blobs, already in order
 | 
			
		||||
    fn replicate_requests(
 | 
			
		||||
    fn process_entries(
 | 
			
		||||
        bank: &Arc<Bank>,
 | 
			
		||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
			
		||||
        window_receiver: &EntryReceiver,
 | 
			
		||||
@@ -74,7 +74,7 @@ impl ReplicateStage {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        submit(
 | 
			
		||||
            influxdb::Point::new("replicate-stage")
 | 
			
		||||
            influxdb::Point::new("replay-stage")
 | 
			
		||||
                .add_field("count", influxdb::Value::Integer(entries.len() as i64))
 | 
			
		||||
                .to_owned(),
 | 
			
		||||
        );
 | 
			
		||||
@@ -83,11 +83,11 @@ impl ReplicateStage {
 | 
			
		||||
        let mut num_entries_to_write = entries.len();
 | 
			
		||||
        let now = Instant::now();
 | 
			
		||||
        if !entries.as_slice().verify(last_entry_id) {
 | 
			
		||||
            inc_new_counter_info!("replicate_stage-verify-fail", entries.len());
 | 
			
		||||
            inc_new_counter_info!("replay_stage-verify-fail", entries.len());
 | 
			
		||||
            return Err(Error::BlobError(BlobError::VerificationFailed));
 | 
			
		||||
        }
 | 
			
		||||
        inc_new_counter_info!(
 | 
			
		||||
            "replicate_stage-verify-duration",
 | 
			
		||||
            "replay_stage-verify-duration",
 | 
			
		||||
            duration_as_ms(&now.elapsed()) as usize
 | 
			
		||||
        );
 | 
			
		||||
        let (current_leader, _) = bank
 | 
			
		||||
@@ -128,7 +128,7 @@ impl ReplicateStage {
 | 
			
		||||
            .id;
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!(
 | 
			
		||||
            "replicate-transactions",
 | 
			
		||||
            "replay-transactions",
 | 
			
		||||
            entries.iter().map(|x| x.transactions.len()).sum()
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
@@ -164,12 +164,12 @@ impl ReplicateStage {
 | 
			
		||||
        let (vote_blob_sender, vote_blob_receiver) = channel();
 | 
			
		||||
        let (ledger_entry_sender, ledger_entry_receiver) = channel();
 | 
			
		||||
        let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
 | 
			
		||||
        let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver);
 | 
			
		||||
        let t_responder = responder("replay_stage", Arc::new(send), vote_blob_receiver);
 | 
			
		||||
 | 
			
		||||
        let keypair = Arc::new(keypair);
 | 
			
		||||
 | 
			
		||||
        let t_replicate = Builder::new()
 | 
			
		||||
            .name("solana-replicate-stage".to_string())
 | 
			
		||||
        let t_replay = Builder::new()
 | 
			
		||||
            .name("solana-replay-stage".to_string())
 | 
			
		||||
            .spawn(move || {
 | 
			
		||||
                let _exit = Finalizer::new(exit);
 | 
			
		||||
                let now = Instant::now();
 | 
			
		||||
@@ -182,7 +182,7 @@ impl ReplicateStage {
 | 
			
		||||
                        .expect("Scheduled leader id should never be unknown at this point");
 | 
			
		||||
 | 
			
		||||
                    if leader_id == keypair.pubkey() {
 | 
			
		||||
                        return Some(ReplicateStageReturnType::LeaderRotation(
 | 
			
		||||
                        return Some(ReplayStageReturnType::LeaderRotation(
 | 
			
		||||
                            bank.tick_height(),
 | 
			
		||||
                            entry_height_,
 | 
			
		||||
                            // We should never start the TPU / this stage on an exact entry that causes leader
 | 
			
		||||
@@ -201,7 +201,7 @@ impl ReplicateStage {
 | 
			
		||||
                        None
 | 
			
		||||
                    };
 | 
			
		||||
 | 
			
		||||
                    match Self::replicate_requests(
 | 
			
		||||
                    match Self::process_entries(
 | 
			
		||||
                        &bank,
 | 
			
		||||
                        &cluster_info,
 | 
			
		||||
                        &window_receiver,
 | 
			
		||||
@@ -226,19 +226,19 @@ impl ReplicateStage {
 | 
			
		||||
        (
 | 
			
		||||
            Self {
 | 
			
		||||
                t_responder,
 | 
			
		||||
                t_replicate,
 | 
			
		||||
                t_replay,
 | 
			
		||||
            },
 | 
			
		||||
            ledger_entry_receiver,
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Service for ReplicateStage {
 | 
			
		||||
    type JoinReturnType = Option<ReplicateStageReturnType>;
 | 
			
		||||
impl Service for ReplayStage {
 | 
			
		||||
    type JoinReturnType = Option<ReplayStageReturnType>;
 | 
			
		||||
 | 
			
		||||
    fn join(self) -> thread::Result<Option<ReplicateStageReturnType>> {
 | 
			
		||||
    fn join(self) -> thread::Result<Option<ReplayStageReturnType>> {
 | 
			
		||||
        self.t_responder.join()?;
 | 
			
		||||
        self.t_replicate.join()
 | 
			
		||||
        self.t_replay.join()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -254,7 +254,7 @@ mod test {
 | 
			
		||||
    use crate::ledger::{create_ticks, create_tmp_sample_ledger, LedgerWriter};
 | 
			
		||||
    use crate::logger;
 | 
			
		||||
    use crate::packet::BlobError;
 | 
			
		||||
    use crate::replicate_stage::{ReplicateStage, ReplicateStageReturnType};
 | 
			
		||||
    use crate::replay_stage::{ReplayStage, ReplayStageReturnType};
 | 
			
		||||
    use crate::result::Error;
 | 
			
		||||
    use crate::service::Service;
 | 
			
		||||
    use crate::vote_stage::{send_validator_vote, VoteError};
 | 
			
		||||
@@ -266,10 +266,10 @@ mod test {
 | 
			
		||||
    use std::sync::{Arc, RwLock};
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    pub fn test_replicate_stage_leader_rotation_exit() {
 | 
			
		||||
    pub fn test_replay_stage_leader_rotation_exit() {
 | 
			
		||||
        logger::setup();
 | 
			
		||||
 | 
			
		||||
        // Set up dummy node to host a ReplicateStage
 | 
			
		||||
        // Set up dummy node to host a ReplayStage
 | 
			
		||||
        let my_keypair = Keypair::new();
 | 
			
		||||
        let my_id = my_keypair.pubkey();
 | 
			
		||||
        let my_node = Node::new_localhost_with_pubkey(my_id);
 | 
			
		||||
@@ -281,7 +281,7 @@ mod test {
 | 
			
		||||
        // Create a ledger
 | 
			
		||||
        let num_ending_ticks = 1;
 | 
			
		||||
        let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
 | 
			
		||||
            "test_replicate_stage_leader_rotation_exit",
 | 
			
		||||
            "test_replay_stage_leader_rotation_exit",
 | 
			
		||||
            10_000,
 | 
			
		||||
            num_ending_ticks,
 | 
			
		||||
            old_leader_id,
 | 
			
		||||
@@ -327,10 +327,10 @@ mod test {
 | 
			
		||||
        let (bank, _, last_entry_id) =
 | 
			
		||||
            Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
 | 
			
		||||
 | 
			
		||||
        // Set up the replicate stage
 | 
			
		||||
        // Set up the replay stage
 | 
			
		||||
        let (entry_sender, entry_receiver) = channel();
 | 
			
		||||
        let exit = Arc::new(AtomicBool::new(false));
 | 
			
		||||
        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
 | 
			
		||||
        let (replay_stage, ledger_writer_recv) = ReplayStage::new(
 | 
			
		||||
            Arc::new(my_keypair),
 | 
			
		||||
            Arc::new(vote_account_keypair),
 | 
			
		||||
            Arc::new(bank),
 | 
			
		||||
@@ -363,14 +363,14 @@ mod test {
 | 
			
		||||
        let expected_last_id = entries_to_send[leader_rotation_index].id;
 | 
			
		||||
        entry_sender.send(entries_to_send.clone()).unwrap();
 | 
			
		||||
 | 
			
		||||
        // Wait for replicate_stage to exit and check return value is correct
 | 
			
		||||
        // Wait for replay_stage to exit and check return value is correct
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            Some(ReplicateStageReturnType::LeaderRotation(
 | 
			
		||||
            Some(ReplayStageReturnType::LeaderRotation(
 | 
			
		||||
                bootstrap_height,
 | 
			
		||||
                expected_entry_height,
 | 
			
		||||
                expected_last_id,
 | 
			
		||||
            )),
 | 
			
		||||
            replicate_stage.join().expect("replicate stage join")
 | 
			
		||||
            replay_stage.join().expect("replay stage join")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        // Check that the entries on the ledger writer channel are correct
 | 
			
		||||
@@ -389,8 +389,8 @@ mod test {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_vote_error_replicate_stage_correctness() {
 | 
			
		||||
        // Set up dummy node to host a ReplicateStage
 | 
			
		||||
    fn test_vote_error_replay_stage_correctness() {
 | 
			
		||||
        // Set up dummy node to host a ReplayStage
 | 
			
		||||
        let my_keypair = Keypair::new();
 | 
			
		||||
        let my_id = my_keypair.pubkey();
 | 
			
		||||
        let my_node = Node::new_localhost_with_pubkey(my_id);
 | 
			
		||||
@@ -401,7 +401,7 @@ mod test {
 | 
			
		||||
 | 
			
		||||
        let num_ending_ticks = 0;
 | 
			
		||||
        let (_, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
 | 
			
		||||
            "test_vote_error_replicate_stage_correctness",
 | 
			
		||||
            "test_vote_error_replay_stage_correctness",
 | 
			
		||||
            10_000,
 | 
			
		||||
            num_ending_ticks,
 | 
			
		||||
            leader_id,
 | 
			
		||||
@@ -417,12 +417,12 @@ mod test {
 | 
			
		||||
        // Set up the cluster info
 | 
			
		||||
        let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
 | 
			
		||||
 | 
			
		||||
        // Set up the replicate stage
 | 
			
		||||
        // Set up the replay stage
 | 
			
		||||
        let vote_account_keypair = Arc::new(Keypair::new());
 | 
			
		||||
        let bank = Arc::new(bank);
 | 
			
		||||
        let (entry_sender, entry_receiver) = channel();
 | 
			
		||||
        let exit = Arc::new(AtomicBool::new(false));
 | 
			
		||||
        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
 | 
			
		||||
        let (replay_stage, ledger_writer_recv) = ReplayStage::new(
 | 
			
		||||
            Arc::new(my_keypair),
 | 
			
		||||
            vote_account_keypair.clone(),
 | 
			
		||||
            bank.clone(),
 | 
			
		||||
@@ -444,7 +444,7 @@ mod test {
 | 
			
		||||
            panic!("Expected validator vote to fail with LeaderInfoNotFound");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Send ReplicateStage an entry, should see it on the ledger writer receiver
 | 
			
		||||
        // Send ReplayStage an entry, should see it on the ledger writer receiver
 | 
			
		||||
        let next_tick = create_ticks(
 | 
			
		||||
            1,
 | 
			
		||||
            genesis_entries
 | 
			
		||||
@@ -454,22 +454,22 @@ mod test {
 | 
			
		||||
        );
 | 
			
		||||
        entry_sender
 | 
			
		||||
            .send(next_tick.clone())
 | 
			
		||||
            .expect("Error sending entry to ReplicateStage");
 | 
			
		||||
            .expect("Error sending entry to ReplayStage");
 | 
			
		||||
        let received_tick = ledger_writer_recv
 | 
			
		||||
            .recv()
 | 
			
		||||
            .expect("Expected to recieve an entry on the ledger writer receiver");
 | 
			
		||||
 | 
			
		||||
        assert_eq!(next_tick, received_tick);
 | 
			
		||||
        drop(entry_sender);
 | 
			
		||||
        replicate_stage
 | 
			
		||||
        replay_stage
 | 
			
		||||
            .join()
 | 
			
		||||
            .expect("Expect successful ReplicateStage exit");
 | 
			
		||||
            .expect("Expect successful ReplayStage exit");
 | 
			
		||||
        let _ignored = remove_dir_all(&my_ledger_path);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_vote_error_replicate_stage_leader_rotation() {
 | 
			
		||||
        // Set up dummy node to host a ReplicateStage
 | 
			
		||||
    fn test_vote_error_replay_stage_leader_rotation() {
 | 
			
		||||
        // Set up dummy node to host a ReplayStage
 | 
			
		||||
        let my_keypair = Keypair::new();
 | 
			
		||||
        let my_id = my_keypair.pubkey();
 | 
			
		||||
        let my_node = Node::new_localhost_with_pubkey(my_id);
 | 
			
		||||
@@ -479,7 +479,7 @@ mod test {
 | 
			
		||||
 | 
			
		||||
        // Create the ledger
 | 
			
		||||
        let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
 | 
			
		||||
            "test_vote_error_replicate_stage_leader_rotation",
 | 
			
		||||
            "test_vote_error_replay_stage_leader_rotation",
 | 
			
		||||
            10_000,
 | 
			
		||||
            0,
 | 
			
		||||
            leader_id,
 | 
			
		||||
@@ -529,12 +529,12 @@ mod test {
 | 
			
		||||
        // Set up the cluster info
 | 
			
		||||
        let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
 | 
			
		||||
 | 
			
		||||
        // Set up the replicate stage
 | 
			
		||||
        // Set up the replay stage
 | 
			
		||||
        let vote_account_keypair = Arc::new(vote_account_keypair);
 | 
			
		||||
        let bank = Arc::new(bank);
 | 
			
		||||
        let (entry_sender, entry_receiver) = channel();
 | 
			
		||||
        let exit = Arc::new(AtomicBool::new(false));
 | 
			
		||||
        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
 | 
			
		||||
        let (replay_stage, ledger_writer_recv) = ReplayStage::new(
 | 
			
		||||
            Arc::new(my_keypair),
 | 
			
		||||
            vote_account_keypair.clone(),
 | 
			
		||||
            bank.clone(),
 | 
			
		||||
@@ -571,7 +571,7 @@ mod test {
 | 
			
		||||
            last_id = entry.id;
 | 
			
		||||
            entry_sender
 | 
			
		||||
                .send(vec![entry.clone()])
 | 
			
		||||
                .expect("Expected to be able to send entry to ReplicateStage");
 | 
			
		||||
                .expect("Expected to be able to send entry to ReplayStage");
 | 
			
		||||
            // Check that the entries on the ledger writer channel are correct
 | 
			
		||||
            let received_entry = ledger_writer_recv
 | 
			
		||||
                .recv()
 | 
			
		||||
@@ -585,14 +585,14 @@ mod test {
 | 
			
		||||
 | 
			
		||||
        assert_ne!(expected_last_id, Hash::default());
 | 
			
		||||
 | 
			
		||||
        // Wait for replicate_stage to exit and check return value is correct
 | 
			
		||||
        // Wait for replay_stage to exit and check return value is correct
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            Some(ReplicateStageReturnType::LeaderRotation(
 | 
			
		||||
            Some(ReplayStageReturnType::LeaderRotation(
 | 
			
		||||
                bootstrap_height,
 | 
			
		||||
                expected_entry_height,
 | 
			
		||||
                expected_last_id,
 | 
			
		||||
            )),
 | 
			
		||||
            replicate_stage.join().expect("replicate stage join")
 | 
			
		||||
            replay_stage.join().expect("replay stage join")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        assert_eq!(exit.load(Ordering::Relaxed), true);
 | 
			
		||||
@@ -600,8 +600,8 @@ mod test {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_replicate_stage_poh_error_entry_receiver() {
 | 
			
		||||
        // Set up dummy node to host a ReplicateStage
 | 
			
		||||
    fn test_replay_stage_poh_error_entry_receiver() {
 | 
			
		||||
        // Set up dummy node to host a ReplayStage
 | 
			
		||||
        let my_keypair = Keypair::new();
 | 
			
		||||
        let my_id = my_keypair.pubkey();
 | 
			
		||||
        let vote_keypair = Keypair::new();
 | 
			
		||||
@@ -615,7 +615,7 @@ mod test {
 | 
			
		||||
        let old_leader_id = Keypair::new().pubkey();
 | 
			
		||||
 | 
			
		||||
        let (_, my_ledger_path, _) = create_tmp_sample_ledger(
 | 
			
		||||
            "test_replicate_stage_leader_rotation_exit",
 | 
			
		||||
            "test_replay_stage_leader_rotation_exit",
 | 
			
		||||
            10_000,
 | 
			
		||||
            0,
 | 
			
		||||
            old_leader_id,
 | 
			
		||||
@@ -634,7 +634,7 @@ mod test {
 | 
			
		||||
            .send(entries.clone())
 | 
			
		||||
            .expect("Expected to err out");
 | 
			
		||||
 | 
			
		||||
        let res = ReplicateStage::replicate_requests(
 | 
			
		||||
        let res = ReplayStage::process_entries(
 | 
			
		||||
            &Arc::new(Bank::default()),
 | 
			
		||||
            &cluster_info_me,
 | 
			
		||||
            &entry_receiver,
 | 
			
		||||
@@ -660,7 +660,7 @@ mod test {
 | 
			
		||||
            .send(entries.clone())
 | 
			
		||||
            .expect("Expected to err out");
 | 
			
		||||
 | 
			
		||||
        let res = ReplicateStage::replicate_requests(
 | 
			
		||||
        let res = ReplayStage::process_entries(
 | 
			
		||||
            &Arc::new(Bank::default()),
 | 
			
		||||
            &cluster_info_me,
 | 
			
		||||
            &entry_receiver,
 | 
			
		||||
@@ -164,7 +164,7 @@ impl Replicator {
 | 
			
		||||
 | 
			
		||||
        let repair_socket = Arc::new(node.sockets.repair);
 | 
			
		||||
        let mut blob_sockets: Vec<Arc<UdpSocket>> =
 | 
			
		||||
            node.sockets.replicate.into_iter().map(Arc::new).collect();
 | 
			
		||||
            node.sockets.tvu.into_iter().map(Arc::new).collect();
 | 
			
		||||
        blob_sockets.push(repair_socket.clone());
 | 
			
		||||
        let (fetch_stage, blob_fetch_receiver) =
 | 
			
		||||
            BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										54
									
								
								src/tvu.rs
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								src/tvu.rs
									
									
									
									
									
								
							@@ -1,21 +1,25 @@
 | 
			
		||||
//! The `tvu` module implements the Transaction Validation Unit, a
 | 
			
		||||
//! 3-stage transaction validation pipeline in software.
 | 
			
		||||
//! 5-stage transaction validation pipeline in software.
 | 
			
		||||
//!
 | 
			
		||||
//! 1. Fetch Stage
 | 
			
		||||
//! - Incoming blobs are picked up from the replicate socket and repair socket.
 | 
			
		||||
//! 2. SharedWindow Stage
 | 
			
		||||
//! 1. BlobFetchStage
 | 
			
		||||
//! - Incoming blobs are picked up from the TVU sockets and repair socket.
 | 
			
		||||
//! 2. RetransmitStage
 | 
			
		||||
//! - Blobs are windowed until a contiguous chunk is available.  This stage also repairs and
 | 
			
		||||
//! retransmits blobs that are in the queue.
 | 
			
		||||
//! 3. Replicate Stage
 | 
			
		||||
//! 3. ReplayStage
 | 
			
		||||
//! - Transactions in blobs are processed and applied to the bank.
 | 
			
		||||
//! - TODO We need to verify the signatures in the blobs.
 | 
			
		||||
//! 4. LedgerWriteStage
 | 
			
		||||
//! - Write the replayed ledger to disk.
 | 
			
		||||
//! 5. StorageStage
 | 
			
		||||
//! - Generating the keys used to encrypt the ledger and sample it for storage mining.
 | 
			
		||||
 | 
			
		||||
use crate::bank::Bank;
 | 
			
		||||
use crate::blob_fetch_stage::BlobFetchStage;
 | 
			
		||||
use crate::cluster_info::ClusterInfo;
 | 
			
		||||
use crate::db_ledger::DbLedger;
 | 
			
		||||
use crate::ledger_write_stage::LedgerWriteStage;
 | 
			
		||||
use crate::replicate_stage::{ReplicateStage, ReplicateStageReturnType};
 | 
			
		||||
use crate::replay_stage::{ReplayStage, ReplayStageReturnType};
 | 
			
		||||
use crate::retransmit_stage::RetransmitStage;
 | 
			
		||||
use crate::service::Service;
 | 
			
		||||
use crate::storage_stage::StorageStage;
 | 
			
		||||
@@ -32,9 +36,9 @@ pub enum TvuReturnType {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct Tvu {
 | 
			
		||||
    replicate_stage: ReplicateStage,
 | 
			
		||||
    fetch_stage: BlobFetchStage,
 | 
			
		||||
    retransmit_stage: RetransmitStage,
 | 
			
		||||
    replay_stage: ReplayStage,
 | 
			
		||||
    ledger_write_stage: LedgerWriteStage,
 | 
			
		||||
    storage_stage: StorageStage,
 | 
			
		||||
    exit: Arc<AtomicBool>,
 | 
			
		||||
@@ -50,7 +54,7 @@ impl Tvu {
 | 
			
		||||
    /// * `entry_height` - Initial ledger height
 | 
			
		||||
    /// * `cluster_info` - The cluster_info state.
 | 
			
		||||
    /// * `window` - The window state.
 | 
			
		||||
    /// * `replicate_socket` - my replicate socket
 | 
			
		||||
    /// * `fetch_sockets` - my fetch sockets
 | 
			
		||||
    /// * `repair_socket` - my repair socket
 | 
			
		||||
    /// * `retransmit_socket` - my retransmit socket
 | 
			
		||||
    /// * `ledger_path` - path to the ledger file
 | 
			
		||||
@@ -62,7 +66,7 @@ impl Tvu {
 | 
			
		||||
        entry_height: u64,
 | 
			
		||||
        last_entry_id: Hash,
 | 
			
		||||
        cluster_info: Arc<RwLock<ClusterInfo>>,
 | 
			
		||||
        replicate_sockets: Vec<UdpSocket>,
 | 
			
		||||
        fetch_sockets: Vec<UdpSocket>,
 | 
			
		||||
        repair_socket: UdpSocket,
 | 
			
		||||
        retransmit_socket: UdpSocket,
 | 
			
		||||
        ledger_path: Option<&str>,
 | 
			
		||||
@@ -72,7 +76,7 @@ impl Tvu {
 | 
			
		||||
 | 
			
		||||
        let repair_socket = Arc::new(repair_socket);
 | 
			
		||||
        let mut blob_sockets: Vec<Arc<UdpSocket>> =
 | 
			
		||||
            replicate_sockets.into_iter().map(Arc::new).collect();
 | 
			
		||||
            fetch_sockets.into_iter().map(Arc::new).collect();
 | 
			
		||||
        blob_sockets.push(repair_socket.clone());
 | 
			
		||||
        let (fetch_stage, blob_fetch_receiver) =
 | 
			
		||||
            BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
 | 
			
		||||
@@ -91,7 +95,7 @@ impl Tvu {
 | 
			
		||||
            bank.leader_scheduler.clone(),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new(
 | 
			
		||||
        let (replay_stage, ledger_entry_receiver) = ReplayStage::new(
 | 
			
		||||
            keypair.clone(),
 | 
			
		||||
            vote_account_keypair,
 | 
			
		||||
            bank.clone(),
 | 
			
		||||
@@ -115,9 +119,9 @@ impl Tvu {
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        Tvu {
 | 
			
		||||
            replicate_stage,
 | 
			
		||||
            fetch_stage,
 | 
			
		||||
            retransmit_stage,
 | 
			
		||||
            replay_stage,
 | 
			
		||||
            ledger_write_stage,
 | 
			
		||||
            storage_stage,
 | 
			
		||||
            exit,
 | 
			
		||||
@@ -146,8 +150,8 @@ impl Service for Tvu {
 | 
			
		||||
        self.fetch_stage.join()?;
 | 
			
		||||
        self.ledger_write_stage.join()?;
 | 
			
		||||
        self.storage_stage.join()?;
 | 
			
		||||
        match self.replicate_stage.join()? {
 | 
			
		||||
            Some(ReplicateStageReturnType::LeaderRotation(
 | 
			
		||||
        match self.replay_stage.join()? {
 | 
			
		||||
            Some(ReplayStageReturnType::LeaderRotation(
 | 
			
		||||
                tick_height,
 | 
			
		||||
                entry_height,
 | 
			
		||||
                last_entry_id,
 | 
			
		||||
@@ -200,10 +204,10 @@ pub mod tests {
 | 
			
		||||
        (gossip_service, window)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Test that message sent from leader to target1 and replicated to target2
 | 
			
		||||
    /// Test that message sent from leader to target1 and replayed to target2
 | 
			
		||||
    #[test]
 | 
			
		||||
    #[ignore]
 | 
			
		||||
    fn test_replicate() {
 | 
			
		||||
    fn test_replay() {
 | 
			
		||||
        logger::setup();
 | 
			
		||||
        let leader = Node::new_localhost();
 | 
			
		||||
        let target1_keypair = Keypair::new();
 | 
			
		||||
@@ -230,26 +234,22 @@ pub mod tests {
 | 
			
		||||
        // to simulate the source peer and get blobs out of the socket to
 | 
			
		||||
        // simulate target peer
 | 
			
		||||
        let (s_reader, r_reader) = channel();
 | 
			
		||||
        let blob_sockets: Vec<Arc<UdpSocket>> = target2
 | 
			
		||||
            .sockets
 | 
			
		||||
            .replicate
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .map(Arc::new)
 | 
			
		||||
            .collect();
 | 
			
		||||
        let blob_sockets: Vec<Arc<UdpSocket>> =
 | 
			
		||||
            target2.sockets.tvu.into_iter().map(Arc::new).collect();
 | 
			
		||||
 | 
			
		||||
        let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), exit.clone(), s_reader);
 | 
			
		||||
 | 
			
		||||
        // simulate leader sending messages
 | 
			
		||||
        let (s_responder, r_responder) = channel();
 | 
			
		||||
        let t_responder = streamer::responder(
 | 
			
		||||
            "test_replicate",
 | 
			
		||||
            "test_replay",
 | 
			
		||||
            Arc::new(leader.sockets.retransmit),
 | 
			
		||||
            r_responder,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let starting_balance = 10_000;
 | 
			
		||||
        let mint = Mint::new(starting_balance);
 | 
			
		||||
        let replicate_addr = target1.info.tvu;
 | 
			
		||||
        let tvu_addr = target1.info.tvu;
 | 
			
		||||
        let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
 | 
			
		||||
            leader_id,
 | 
			
		||||
        )));
 | 
			
		||||
@@ -266,7 +266,7 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
        let vote_account_keypair = Arc::new(Keypair::new());
 | 
			
		||||
        let mut cur_hash = Hash::default();
 | 
			
		||||
        let db_ledger_path = get_tmp_ledger_path("test_replicate");
 | 
			
		||||
        let db_ledger_path = get_tmp_ledger_path("test_replay");
 | 
			
		||||
        let db_ledger =
 | 
			
		||||
            DbLedger::open(&db_ledger_path).expect("Expected to successfully open ledger");
 | 
			
		||||
        let tvu = Tvu::new(
 | 
			
		||||
@@ -276,7 +276,7 @@ pub mod tests {
 | 
			
		||||
            0,
 | 
			
		||||
            cur_hash,
 | 
			
		||||
            cref1,
 | 
			
		||||
            target1.sockets.replicate,
 | 
			
		||||
            target1.sockets.tvu,
 | 
			
		||||
            target1.sockets.repair,
 | 
			
		||||
            target1.sockets.retransmit,
 | 
			
		||||
            None,
 | 
			
		||||
@@ -324,7 +324,7 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
                    w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
 | 
			
		||||
                    w.set_size(serialized_entry.len());
 | 
			
		||||
                    w.meta.set_addr(&replicate_addr);
 | 
			
		||||
                    w.meta.set_addr(&tvu_addr);
 | 
			
		||||
                }
 | 
			
		||||
                msgs.push(b);
 | 
			
		||||
            }
 | 
			
		||||
 
 | 
			
		||||
@@ -75,7 +75,7 @@ pub fn send_validator_vote(
 | 
			
		||||
    let last_id = bank.last_id();
 | 
			
		||||
 | 
			
		||||
    let shared_blob = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)?;
 | 
			
		||||
    inc_new_counter_info!("replicate-vote_sent", 1);
 | 
			
		||||
    inc_new_counter_info!("validator-vote_sent", 1);
 | 
			
		||||
    vote_blob_sender.send(vec![shared_blob])?;
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
 
 | 
			
		||||
@@ -156,12 +156,12 @@ impl WindowUtil for Window {
 | 
			
		||||
                        // In the case that we are not in the current scope of the leader schedule
 | 
			
		||||
                        // window then either:
 | 
			
		||||
                        //
 | 
			
		||||
                        // 1) The replicate stage hasn't caught up to the "consumed" entries we sent,
 | 
			
		||||
                        // 1) The replay stage hasn't caught up to the "consumed" entries we sent,
 | 
			
		||||
                        // in which case it will eventually catch up
 | 
			
		||||
                        //
 | 
			
		||||
                        // 2) We are on the border between seed_rotation_intervals, so the
 | 
			
		||||
                        // schedule won't be known until the entry on that cusp is received
 | 
			
		||||
                        // by the replicate stage (which comes after this stage). Hence, the next
 | 
			
		||||
                        // by the replay stage (which comes after this stage). Hence, the next
 | 
			
		||||
                        // leader at the beginning of that next epoch will not know they are the
 | 
			
		||||
                        // leader until they receive that last "cusp" entry. The leader also won't ask for repairs
 | 
			
		||||
                        // for that entry because "is_next_leader" won't be set here. In this case,
 | 
			
		||||
 
 | 
			
		||||
@@ -295,7 +295,7 @@ mod test {
 | 
			
		||||
        let t_responder = {
 | 
			
		||||
            let (s_responder, r_responder) = channel();
 | 
			
		||||
            let blob_sockets: Vec<Arc<UdpSocket>> =
 | 
			
		||||
                tn.sockets.replicate.into_iter().map(Arc::new).collect();
 | 
			
		||||
                tn.sockets.tvu.into_iter().map(Arc::new).collect();
 | 
			
		||||
 | 
			
		||||
            let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
 | 
			
		||||
            let num_blobs_to_make = 10;
 | 
			
		||||
@@ -365,7 +365,7 @@ mod test {
 | 
			
		||||
        let t_responder = {
 | 
			
		||||
            let (s_responder, r_responder) = channel();
 | 
			
		||||
            let blob_sockets: Vec<Arc<UdpSocket>> =
 | 
			
		||||
                tn.sockets.replicate.into_iter().map(Arc::new).collect();
 | 
			
		||||
                tn.sockets.tvu.into_iter().map(Arc::new).collect();
 | 
			
		||||
            let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
 | 
			
		||||
            let mut msgs = Vec::new();
 | 
			
		||||
            for v in 0..10 {
 | 
			
		||||
 
 | 
			
		||||
@@ -24,7 +24,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService,
 | 
			
		||||
    let w = Arc::new(RwLock::new(vec![]));
 | 
			
		||||
    let d = GossipService::new(&c.clone(), w, None, tn.sockets.gossip, exit);
 | 
			
		||||
    let _ = c.read().unwrap().my_data();
 | 
			
		||||
    (c, d, tn.sockets.replicate.pop().unwrap())
 | 
			
		||||
    (c, d, tn.sockets.tvu.pop().unwrap())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Test that the network converges.
 | 
			
		||||
 
 | 
			
		||||
@@ -1548,7 +1548,7 @@ fn test_broadcast_last_tick() {
 | 
			
		||||
        .iter_mut()
 | 
			
		||||
        .map(|(_, _, node, _)| {
 | 
			
		||||
            BlobFetchStage::new(
 | 
			
		||||
                Arc::new(node.sockets.replicate.pop().unwrap()),
 | 
			
		||||
                Arc::new(node.sockets.tvu.pop().unwrap()),
 | 
			
		||||
                blob_receiver_exit.clone(),
 | 
			
		||||
            )
 | 
			
		||||
        })
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user