Use struct to pass all Tpu sockets as one argument to Tpu::new() (#21965)
Tpu::new() now matches Tvu::new() in having struct to reduce argument list. Additionally, Rust supports partial moves, so there is no need to clone the Tvu sockets out of Node object.
This commit is contained in:
@ -40,6 +40,13 @@ use {
|
|||||||
|
|
||||||
pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
|
pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
|
||||||
|
|
||||||
|
pub struct TpuSockets {
|
||||||
|
pub transactions: Vec<UdpSocket>,
|
||||||
|
pub transaction_forwards: Vec<UdpSocket>,
|
||||||
|
pub vote: Vec<UdpSocket>,
|
||||||
|
pub broadcast: Vec<UdpSocket>,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Tpu {
|
pub struct Tpu {
|
||||||
fetch_stage: FetchStage,
|
fetch_stage: FetchStage,
|
||||||
sigverify_stage: SigVerifyStage,
|
sigverify_stage: SigVerifyStage,
|
||||||
@ -56,10 +63,7 @@ impl Tpu {
|
|||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
entry_receiver: Receiver<WorkingBankEntry>,
|
entry_receiver: Receiver<WorkingBankEntry>,
|
||||||
retransmit_slots_receiver: RetransmitSlotsReceiver,
|
retransmit_slots_receiver: RetransmitSlotsReceiver,
|
||||||
transactions_sockets: Vec<UdpSocket>,
|
sockets: TpuSockets,
|
||||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
|
||||||
tpu_vote_sockets: Vec<UdpSocket>,
|
|
||||||
broadcast_sockets: Vec<UdpSocket>,
|
|
||||||
subscriptions: &Arc<RpcSubscriptions>,
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
blockstore: &Arc<Blockstore>,
|
blockstore: &Arc<Blockstore>,
|
||||||
@ -77,6 +81,13 @@ impl Tpu {
|
|||||||
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
|
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
|
||||||
cost_model: &Arc<RwLock<CostModel>>,
|
cost_model: &Arc<RwLock<CostModel>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let TpuSockets {
|
||||||
|
transactions: transactions_sockets,
|
||||||
|
transaction_forwards: tpu_forwards_sockets,
|
||||||
|
vote: tpu_vote_sockets,
|
||||||
|
broadcast: broadcast_sockets,
|
||||||
|
} = sockets;
|
||||||
|
|
||||||
let (packet_sender, packet_receiver) = channel();
|
let (packet_sender, packet_receiver) = channel();
|
||||||
let (vote_packet_sender, vote_packet_receiver) = channel();
|
let (vote_packet_sender, vote_packet_receiver) = channel();
|
||||||
let fetch_stage = FetchStage::new_with_sender(
|
let fetch_stage = FetchStage::new_with_sender(
|
||||||
|
@ -78,7 +78,7 @@ pub struct Tvu {
|
|||||||
drop_bank_service: DropBankService,
|
drop_bank_service: DropBankService,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Sockets {
|
pub struct TvuSockets {
|
||||||
pub fetch: Vec<UdpSocket>,
|
pub fetch: Vec<UdpSocket>,
|
||||||
pub repair: UdpSocket,
|
pub repair: UdpSocket,
|
||||||
pub retransmit: Vec<UdpSocket>,
|
pub retransmit: Vec<UdpSocket>,
|
||||||
@ -116,7 +116,7 @@ impl Tvu {
|
|||||||
authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
|
authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
cluster_info: &Arc<ClusterInfo>,
|
cluster_info: &Arc<ClusterInfo>,
|
||||||
sockets: Sockets,
|
sockets: TvuSockets,
|
||||||
blockstore: Arc<Blockstore>,
|
blockstore: Arc<Blockstore>,
|
||||||
ledger_signal_receiver: Receiver<bool>,
|
ledger_signal_receiver: Receiver<bool>,
|
||||||
rpc_subscriptions: &Arc<RpcSubscriptions>,
|
rpc_subscriptions: &Arc<RpcSubscriptions>,
|
||||||
@ -146,7 +146,7 @@ impl Tvu {
|
|||||||
last_full_snapshot_slot: Option<Slot>,
|
last_full_snapshot_slot: Option<Slot>,
|
||||||
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let Sockets {
|
let TvuSockets {
|
||||||
repair: repair_socket,
|
repair: repair_socket,
|
||||||
fetch: fetch_sockets,
|
fetch: fetch_sockets,
|
||||||
retransmit: retransmit_sockets,
|
retransmit: retransmit_sockets,
|
||||||
@ -464,7 +464,7 @@ pub mod tests {
|
|||||||
&bank_forks,
|
&bank_forks,
|
||||||
&cref1,
|
&cref1,
|
||||||
{
|
{
|
||||||
Sockets {
|
TvuSockets {
|
||||||
repair: target1.sockets.repair,
|
repair: target1.sockets.repair,
|
||||||
retransmit: target1.sockets.retransmit_sockets,
|
retransmit: target1.sockets.retransmit_sockets,
|
||||||
fetch: target1.sockets.tvu,
|
fetch: target1.sockets.tvu,
|
||||||
|
@ -17,8 +17,8 @@ use {
|
|||||||
stats_reporter_service::StatsReporterService,
|
stats_reporter_service::StatsReporterService,
|
||||||
system_monitor_service::{verify_udp_stats_access, SystemMonitorService},
|
system_monitor_service::{verify_udp_stats_access, SystemMonitorService},
|
||||||
tower_storage::TowerStorage,
|
tower_storage::TowerStorage,
|
||||||
tpu::{Tpu, DEFAULT_TPU_COALESCE_MS},
|
tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE_MS},
|
||||||
tvu::{Sockets, Tvu, TvuConfig},
|
tvu::{Tvu, TvuConfig, TvuSockets},
|
||||||
},
|
},
|
||||||
crossbeam_channel::{bounded, unbounded},
|
crossbeam_channel::{bounded, unbounded},
|
||||||
rand::{thread_rng, Rng},
|
rand::{thread_rng, Rng},
|
||||||
@ -822,35 +822,12 @@ impl Validator {
|
|||||||
authorized_voter_keypairs,
|
authorized_voter_keypairs,
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
Sockets {
|
TvuSockets {
|
||||||
repair: node
|
repair: node.sockets.repair,
|
||||||
.sockets
|
retransmit: node.sockets.retransmit_sockets,
|
||||||
.repair
|
fetch: node.sockets.tvu,
|
||||||
.try_clone()
|
forwards: node.sockets.tvu_forwards,
|
||||||
.expect("Failed to clone repair socket"),
|
ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
|
||||||
retransmit: node
|
|
||||||
.sockets
|
|
||||||
.retransmit_sockets
|
|
||||||
.iter()
|
|
||||||
.map(|s| s.try_clone().expect("Failed to clone retransmit socket"))
|
|
||||||
.collect(),
|
|
||||||
fetch: node
|
|
||||||
.sockets
|
|
||||||
.tvu
|
|
||||||
.iter()
|
|
||||||
.map(|s| s.try_clone().expect("Failed to clone TVU Sockets"))
|
|
||||||
.collect(),
|
|
||||||
forwards: node
|
|
||||||
.sockets
|
|
||||||
.tvu_forwards
|
|
||||||
.iter()
|
|
||||||
.map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets"))
|
|
||||||
.collect(),
|
|
||||||
ancestor_hashes_requests: node
|
|
||||||
.sockets
|
|
||||||
.ancestor_hashes_requests
|
|
||||||
.try_clone()
|
|
||||||
.expect("Failed to clone ancestor_hashes_requests socket"),
|
|
||||||
},
|
},
|
||||||
blockstore.clone(),
|
blockstore.clone(),
|
||||||
ledger_signal_receiver,
|
ledger_signal_receiver,
|
||||||
@ -902,10 +879,12 @@ impl Validator {
|
|||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
entry_receiver,
|
entry_receiver,
|
||||||
retransmit_slots_receiver,
|
retransmit_slots_receiver,
|
||||||
node.sockets.tpu,
|
TpuSockets {
|
||||||
node.sockets.tpu_forwards,
|
transactions: node.sockets.tpu,
|
||||||
node.sockets.tpu_vote,
|
transaction_forwards: node.sockets.tpu_forwards,
|
||||||
node.sockets.broadcast,
|
vote: node.sockets.tpu_vote,
|
||||||
|
broadcast: node.sockets.broadcast,
|
||||||
|
},
|
||||||
&rpc_subscriptions,
|
&rpc_subscriptions,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
&blockstore,
|
&blockstore,
|
||||||
|
Reference in New Issue
Block a user