RetransmitStage now gets a BankForks
This commit is contained in:
@ -268,7 +268,7 @@ impl Fullnode {
|
|||||||
|
|
||||||
let tvu = Tvu::new(
|
let tvu = Tvu::new(
|
||||||
voting_keypair_option,
|
voting_keypair_option,
|
||||||
&bank,
|
&bank_forks,
|
||||||
blob_index,
|
blob_index,
|
||||||
entry_height,
|
entry_height,
|
||||||
last_entry_id,
|
last_entry_id,
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
//! The `retransmit_stage` retransmits blobs between validators
|
//! The `retransmit_stage` retransmits blobs between validators
|
||||||
|
|
||||||
|
use crate::bank_forks::BankForks;
|
||||||
use crate::blocktree::Blocktree;
|
use crate::blocktree::Blocktree;
|
||||||
use crate::cluster_info::{
|
use crate::cluster_info::{
|
||||||
compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY,
|
compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY,
|
||||||
@ -14,7 +15,6 @@ use crate::window_service::WindowService;
|
|||||||
use log::Level;
|
use log::Level;
|
||||||
use solana_metrics::counter::Counter;
|
use solana_metrics::counter::Counter;
|
||||||
use solana_metrics::{influxdb, submit};
|
use solana_metrics::{influxdb, submit};
|
||||||
use solana_runtime::bank::Bank;
|
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
@ -24,7 +24,7 @@ use std::thread::{self, Builder, JoinHandle};
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
fn retransmit(
|
fn retransmit(
|
||||||
bank: &Arc<Bank>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
r: &BlobReceiver,
|
r: &BlobReceiver,
|
||||||
sock: &UdpSocket,
|
sock: &UdpSocket,
|
||||||
@ -41,7 +41,7 @@ fn retransmit(
|
|||||||
.to_owned(),
|
.to_owned(),
|
||||||
);
|
);
|
||||||
let (neighbors, children) = compute_retransmit_peers(
|
let (neighbors, children) = compute_retransmit_peers(
|
||||||
&bank.staked_nodes(),
|
&bank_forks.read().unwrap().working_bank().staked_nodes(),
|
||||||
cluster_info,
|
cluster_info,
|
||||||
DATA_PLANE_FANOUT,
|
DATA_PLANE_FANOUT,
|
||||||
NEIGHBORHOOD_SIZE,
|
NEIGHBORHOOD_SIZE,
|
||||||
@ -76,7 +76,7 @@ fn copy_for_neighbors(b: &SharedBlob) -> SharedBlob {
|
|||||||
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
||||||
fn retransmitter(
|
fn retransmitter(
|
||||||
sock: Arc<UdpSocket>,
|
sock: Arc<UdpSocket>,
|
||||||
bank: Arc<Bank>,
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||||
r: BlobReceiver,
|
r: BlobReceiver,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
@ -85,7 +85,7 @@ fn retransmitter(
|
|||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
trace!("retransmitter started");
|
trace!("retransmitter started");
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = retransmit(&bank, &cluster_info, &r, &sock) {
|
if let Err(e) = retransmit(&bank_forks, &cluster_info, &r, &sock) {
|
||||||
match e {
|
match e {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
@ -108,7 +108,7 @@ pub struct RetransmitStage {
|
|||||||
impl RetransmitStage {
|
impl RetransmitStage {
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
bank: &Arc<Bank>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
blocktree: Arc<Blocktree>,
|
blocktree: Arc<Blocktree>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
retransmit_socket: Arc<UdpSocket>,
|
retransmit_socket: Arc<UdpSocket>,
|
||||||
@ -121,7 +121,7 @@ impl RetransmitStage {
|
|||||||
|
|
||||||
let t_retransmit = retransmitter(
|
let t_retransmit = retransmitter(
|
||||||
retransmit_socket,
|
retransmit_socket,
|
||||||
bank.clone(),
|
bank_forks.clone(),
|
||||||
cluster_info.clone(),
|
cluster_info.clone(),
|
||||||
retransmit_receiver,
|
retransmit_receiver,
|
||||||
);
|
);
|
||||||
|
16
src/tvu.rs
16
src/tvu.rs
@ -12,6 +12,7 @@
|
|||||||
//! 4. StorageStage
|
//! 4. StorageStage
|
||||||
//! - Generating the keys used to encrypt the ledger and sample it for storage mining.
|
//! - Generating the keys used to encrypt the ledger and sample it for storage mining.
|
||||||
|
|
||||||
|
use crate::bank_forks::BankForks;
|
||||||
use crate::blob_fetch_stage::BlobFetchStage;
|
use crate::blob_fetch_stage::BlobFetchStage;
|
||||||
use crate::blocktree::Blocktree;
|
use crate::blocktree::Blocktree;
|
||||||
use crate::cluster_info::ClusterInfo;
|
use crate::cluster_info::ClusterInfo;
|
||||||
@ -23,7 +24,6 @@ use crate::rpc_subscriptions::RpcSubscriptions;
|
|||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::storage_stage::{StorageStage, StorageState};
|
use crate::storage_stage::{StorageStage, StorageState};
|
||||||
use crate::voting_keypair::VotingKeypair;
|
use crate::voting_keypair::VotingKeypair;
|
||||||
use solana_runtime::bank::Bank;
|
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
@ -70,7 +70,7 @@ impl Tvu {
|
|||||||
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
|
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
voting_keypair: Option<Arc<VotingKeypair>>,
|
voting_keypair: Option<Arc<VotingKeypair>>,
|
||||||
bank: &Arc<Bank>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
blob_index: u64,
|
blob_index: u64,
|
||||||
entry_height: u64,
|
entry_height: u64,
|
||||||
last_entry_id: Hash,
|
last_entry_id: Hash,
|
||||||
@ -111,7 +111,7 @@ impl Tvu {
|
|||||||
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
||||||
//then sent to the window, which does the erasure coding reconstruction
|
//then sent to the window, which does the erasure coding reconstruction
|
||||||
let retransmit_stage = RetransmitStage::new(
|
let retransmit_stage = RetransmitStage::new(
|
||||||
bank,
|
&bank_forks,
|
||||||
blocktree.clone(),
|
blocktree.clone(),
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
Arc::new(retransmit_socket),
|
Arc::new(retransmit_socket),
|
||||||
@ -121,6 +121,7 @@ impl Tvu {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let bank = bank_forks.read().unwrap().working_bank();
|
||||||
let (replay_stage, mut previous_receiver) = ReplayStage::new(
|
let (replay_stage, mut previous_receiver) = ReplayStage::new(
|
||||||
keypair.pubkey(),
|
keypair.pubkey(),
|
||||||
voting_keypair,
|
voting_keypair,
|
||||||
@ -210,6 +211,7 @@ pub mod tests {
|
|||||||
use crate::cluster_info::{ClusterInfo, Node};
|
use crate::cluster_info::{ClusterInfo, Node};
|
||||||
use crate::leader_scheduler::LeaderSchedulerConfig;
|
use crate::leader_scheduler::LeaderSchedulerConfig;
|
||||||
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
|
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::genesis_block::GenesisBlock;
|
use solana_sdk::genesis_block::GenesisBlock;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -221,9 +223,11 @@ pub mod tests {
|
|||||||
|
|
||||||
let starting_balance = 10_000;
|
let starting_balance = 10_000;
|
||||||
let (genesis_block, _mint_keypair) = GenesisBlock::new(starting_balance);
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(starting_balance);
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
|
||||||
|
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
||||||
let leader_scheduler_config = LeaderSchedulerConfig::default();
|
let leader_scheduler_config = LeaderSchedulerConfig::default();
|
||||||
let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank);
|
let leader_scheduler =
|
||||||
|
LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank_forks.working_bank());
|
||||||
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
|
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
|
||||||
|
|
||||||
//start cluster_info1
|
//start cluster_info1
|
||||||
@ -241,7 +245,7 @@ pub mod tests {
|
|||||||
let (sender, _receiver) = channel();
|
let (sender, _receiver) = channel();
|
||||||
let tvu = Tvu::new(
|
let tvu = Tvu::new(
|
||||||
Some(Arc::new(voting_keypair)),
|
Some(Arc::new(voting_keypair)),
|
||||||
&bank,
|
&Arc::new(RwLock::new(bank_forks)),
|
||||||
0,
|
0,
|
||||||
0,
|
0,
|
||||||
cur_hash,
|
cur_hash,
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use log::trace;
|
use log::trace;
|
||||||
|
use solana::bank_forks::BankForks;
|
||||||
use solana::blocktree::Blocktree;
|
use solana::blocktree::Blocktree;
|
||||||
use solana::blocktree::{get_tmp_ledger_path, BlocktreeConfig};
|
use solana::blocktree::{get_tmp_ledger_path, BlocktreeConfig};
|
||||||
use solana::cluster_info::{ClusterInfo, Node};
|
use solana::cluster_info::{ClusterInfo, Node};
|
||||||
@ -84,7 +85,9 @@ fn test_replay() {
|
|||||||
let starting_balance = 10_000;
|
let starting_balance = 10_000;
|
||||||
let (genesis_block, mint_keypair) = GenesisBlock::new(starting_balance);
|
let (genesis_block, mint_keypair) = GenesisBlock::new(starting_balance);
|
||||||
let tvu_addr = target1.info.tvu;
|
let tvu_addr = target1.info.tvu;
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
|
||||||
|
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
||||||
|
let bank = bank_forks.working_bank();
|
||||||
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new_with_bank(
|
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new_with_bank(
|
||||||
&leader_scheduler_config,
|
&leader_scheduler_config,
|
||||||
&bank,
|
&bank,
|
||||||
@ -109,7 +112,7 @@ fn test_replay() {
|
|||||||
let (sender, _) = channel();
|
let (sender, _) = channel();
|
||||||
let tvu = Tvu::new(
|
let tvu = Tvu::new(
|
||||||
Some(Arc::new(voting_keypair)),
|
Some(Arc::new(voting_keypair)),
|
||||||
&bank,
|
&Arc::new(RwLock::new(bank_forks)),
|
||||||
0,
|
0,
|
||||||
0,
|
0,
|
||||||
cur_hash,
|
cur_hash,
|
||||||
|
Reference in New Issue
Block a user