Minor code restyling, no functional changes
This commit is contained in:
@ -58,7 +58,7 @@ pub struct ClusterConfig {
|
|||||||
pub fullnode_config: FullnodeConfig,
|
pub fullnode_config: FullnodeConfig,
|
||||||
/// Number of replicators in the cluster
|
/// Number of replicators in the cluster
|
||||||
/// Note- replicators will timeout if ticks_per_slot is much larger than the default 8
|
/// Note- replicators will timeout if ticks_per_slot is much larger than the default 8
|
||||||
pub num_replicators: u64,
|
pub num_replicators: usize,
|
||||||
/// Number of nodes that are unstaked and not voting (a.k.a listening)
|
/// Number of nodes that are unstaked and not voting (a.k.a listening)
|
||||||
pub num_listeners: u64,
|
pub num_listeners: u64,
|
||||||
/// The stakes of each node
|
/// The stakes of each node
|
||||||
|
@ -141,82 +141,85 @@ impl StorageStage {
|
|||||||
storage_rotate_count: u64,
|
storage_rotate_count: u64,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let storage_state_inner = storage_state.state.clone();
|
|
||||||
let exit0 = exit.clone();
|
|
||||||
let keypair0 = storage_keypair.clone();
|
|
||||||
|
|
||||||
let (instruction_sender, instruction_receiver) = channel();
|
let (instruction_sender, instruction_receiver) = channel();
|
||||||
|
|
||||||
let t_storage_mining_verifier = Builder::new()
|
let t_storage_mining_verifier = {
|
||||||
.name("solana-storage-mining-verify-stage".to_string())
|
let storage_state_inner = storage_state.state.clone();
|
||||||
.spawn(move || {
|
let exit = exit.clone();
|
||||||
let mut current_key = 0;
|
let storage_keypair = storage_keypair.clone();
|
||||||
let mut slot_count = 0;
|
Builder::new()
|
||||||
loop {
|
.name("solana-storage-mining-verify-stage".to_string())
|
||||||
if let Some(ref some_blocktree) = blocktree {
|
.spawn(move || {
|
||||||
if let Err(e) = Self::process_entries(
|
let mut current_key = 0;
|
||||||
&keypair0,
|
let mut slot_count = 0;
|
||||||
&storage_state_inner,
|
loop {
|
||||||
&slot_receiver,
|
if let Some(ref some_blocktree) = blocktree {
|
||||||
&some_blocktree,
|
if let Err(e) = Self::process_entries(
|
||||||
&mut slot_count,
|
&storage_keypair,
|
||||||
&mut current_key,
|
&storage_state_inner,
|
||||||
storage_rotate_count,
|
&slot_receiver,
|
||||||
&instruction_sender,
|
&some_blocktree,
|
||||||
) {
|
&mut slot_count,
|
||||||
match e {
|
&mut current_key,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
storage_rotate_count,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
&instruction_sender,
|
||||||
_ => info!("Error from process_entries: {:?}", e),
|
) {
|
||||||
|
match e {
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
|
_ => info!("Error from process_entries: {:?}", e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if exit.load(Ordering::Relaxed) {
|
||||||
if exit0.load(Ordering::Relaxed) {
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let cluster_info0 = cluster_info.clone();
|
|
||||||
let exit1 = exit.clone();
|
|
||||||
let keypair1 = keypair.clone();
|
|
||||||
let storage_keypair1 = storage_keypair.clone();
|
|
||||||
let bank_forks1 = bank_forks.clone();
|
|
||||||
let t_storage_create_accounts = Builder::new()
|
|
||||||
.name("solana-storage-create-accounts".to_string())
|
|
||||||
.spawn(move || {
|
|
||||||
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
||||||
loop {
|
|
||||||
match instruction_receiver.recv_timeout(Duration::from_secs(1)) {
|
|
||||||
Ok(instruction) => {
|
|
||||||
if Self::send_transaction(
|
|
||||||
&bank_forks1,
|
|
||||||
&cluster_info0,
|
|
||||||
instruction,
|
|
||||||
&keypair1,
|
|
||||||
&storage_keypair1,
|
|
||||||
Some(storage_keypair1.pubkey()),
|
|
||||||
&transactions_socket,
|
|
||||||
)
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
debug!("Failed to send storage transaction");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(e) => match e {
|
|
||||||
RecvTimeoutError::Disconnected => break,
|
|
||||||
RecvTimeoutError::Timeout => (),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
if exit1.load(Ordering::Relaxed) {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
sleep(Duration::from_millis(100));
|
})
|
||||||
}
|
.unwrap()
|
||||||
})
|
};
|
||||||
.unwrap();
|
|
||||||
|
let t_storage_create_accounts = {
|
||||||
|
let cluster_info = cluster_info.clone();
|
||||||
|
let exit = exit.clone();
|
||||||
|
let keypair = keypair.clone();
|
||||||
|
let storage_keypair = storage_keypair.clone();
|
||||||
|
let bank_forks = bank_forks.clone();
|
||||||
|
Builder::new()
|
||||||
|
.name("solana-storage-create-accounts".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
loop {
|
||||||
|
match instruction_receiver.recv_timeout(Duration::from_secs(1)) {
|
||||||
|
Ok(instruction) => {
|
||||||
|
Self::send_transaction(
|
||||||
|
&bank_forks,
|
||||||
|
&cluster_info,
|
||||||
|
instruction,
|
||||||
|
&keypair,
|
||||||
|
&storage_keypair,
|
||||||
|
&transactions_socket,
|
||||||
|
)
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
info!("failed to send storage transaction: {:?}", err)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
RecvTimeoutError::Disconnected => break,
|
||||||
|
RecvTimeoutError::Timeout => (),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sleep(Duration::from_millis(100));
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
StorageStage {
|
StorageStage {
|
||||||
t_storage_mining_verifier,
|
t_storage_mining_verifier,
|
||||||
@ -230,27 +233,27 @@ impl StorageStage {
|
|||||||
instruction: Instruction,
|
instruction: Instruction,
|
||||||
keypair: &Arc<Keypair>,
|
keypair: &Arc<Keypair>,
|
||||||
storage_keypair: &Arc<Keypair>,
|
storage_keypair: &Arc<Keypair>,
|
||||||
account_to_create: Option<Pubkey>,
|
|
||||||
transactions_socket: &UdpSocket,
|
transactions_socket: &UdpSocket,
|
||||||
) -> io::Result<()> {
|
) -> io::Result<()> {
|
||||||
let working_bank = bank_forks.read().unwrap().working_bank();
|
let working_bank = bank_forks.read().unwrap().working_bank();
|
||||||
let blockhash = working_bank.confirmed_last_blockhash();
|
let blockhash = working_bank.confirmed_last_blockhash();
|
||||||
let mut instructions = vec![];
|
let mut instructions = vec![];
|
||||||
let mut signing_keys = vec![];
|
let mut signing_keys = vec![];
|
||||||
if let Some(account) = account_to_create {
|
if working_bank
|
||||||
if working_bank.get_account(&account).is_none() {
|
.get_account(&storage_keypair.pubkey())
|
||||||
// TODO the account space needs to be well defined somewhere
|
.is_none()
|
||||||
let create_instruction = system_instruction::create_account(
|
{
|
||||||
&keypair.pubkey(),
|
// TODO the account space needs to be well defined somewhere
|
||||||
&storage_keypair.pubkey(),
|
let create_instruction = system_instruction::create_account(
|
||||||
1,
|
&keypair.pubkey(),
|
||||||
1024 * 4,
|
&storage_keypair.pubkey(),
|
||||||
&solana_storage_api::id(),
|
1000,
|
||||||
);
|
1024 * 4,
|
||||||
instructions.push(create_instruction);
|
&solana_storage_api::id(),
|
||||||
signing_keys.push(keypair.as_ref());
|
);
|
||||||
info!("storage account requested");
|
instructions.push(create_instruction);
|
||||||
}
|
signing_keys.push(keypair.as_ref());
|
||||||
|
info!("storage account requested");
|
||||||
}
|
}
|
||||||
instructions.push(instruction);
|
instructions.push(instruction);
|
||||||
signing_keys.push(storage_keypair.as_ref());
|
signing_keys.push(storage_keypair.as_ref());
|
||||||
@ -264,17 +267,21 @@ impl StorageStage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn process_entry_crossing(
|
fn process_entry_crossing(
|
||||||
|
storage_keypair: &Arc<Keypair>,
|
||||||
state: &Arc<RwLock<StorageStateInner>>,
|
state: &Arc<RwLock<StorageStateInner>>,
|
||||||
keypair: &Arc<Keypair>,
|
|
||||||
_blocktree: &Arc<Blocktree>,
|
_blocktree: &Arc<Blocktree>,
|
||||||
entry_id: Hash,
|
entry_id: Hash,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
instruction_sender: &InstructionSender,
|
instruction_sender: &InstructionSender,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut seed = [0u8; 32];
|
let mut seed = [0u8; 32];
|
||||||
let signature = keypair.sign(&entry_id.as_ref());
|
let signature = storage_keypair.sign(&entry_id.as_ref());
|
||||||
|
|
||||||
let ix = storage_instruction::advertise_recent_blockhash(&keypair.pubkey(), entry_id, slot);
|
let ix = storage_instruction::advertise_recent_blockhash(
|
||||||
|
&storage_keypair.pubkey(),
|
||||||
|
entry_id,
|
||||||
|
slot,
|
||||||
|
);
|
||||||
instruction_sender.send(ix)?;
|
instruction_sender.send(ix)?;
|
||||||
|
|
||||||
seed.copy_from_slice(&signature.to_bytes()[..32]);
|
seed.copy_from_slice(&signature.to_bytes()[..32]);
|
||||||
@ -383,7 +390,7 @@ impl StorageStage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn process_entries(
|
fn process_entries(
|
||||||
keypair: &Arc<Keypair>,
|
storage_keypair: &Arc<Keypair>,
|
||||||
storage_state: &Arc<RwLock<StorageStateInner>>,
|
storage_state: &Arc<RwLock<StorageStateInner>>,
|
||||||
slot_receiver: &Receiver<u64>,
|
slot_receiver: &Receiver<u64>,
|
||||||
blocktree: &Arc<Blocktree>,
|
blocktree: &Arc<Blocktree>,
|
||||||
@ -419,8 +426,8 @@ impl StorageStage {
|
|||||||
slot, entry.num_hashes
|
slot, entry.num_hashes
|
||||||
);
|
);
|
||||||
Self::process_entry_crossing(
|
Self::process_entry_crossing(
|
||||||
|
&storage_keypair,
|
||||||
&storage_state,
|
&storage_state,
|
||||||
&keypair,
|
|
||||||
&blocktree,
|
&blocktree,
|
||||||
entry.hash,
|
entry.hash,
|
||||||
slot,
|
slot,
|
||||||
|
@ -107,7 +107,7 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
|
|||||||
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
|
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
|
||||||
let config = ClusterConfig {
|
let config = ClusterConfig {
|
||||||
fullnode_config,
|
fullnode_config,
|
||||||
num_replicators: num_replicators as u64,
|
num_replicators,
|
||||||
node_stakes: vec![100; num_nodes],
|
node_stakes: vec![100; num_nodes],
|
||||||
cluster_lamports: 10_000,
|
cluster_lamports: 10_000,
|
||||||
..ClusterConfig::default()
|
..ClusterConfig::default()
|
||||||
|
@ -85,8 +85,11 @@ fn test_replay() {
|
|||||||
|
|
||||||
let (bank_forks, _bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) =
|
let (bank_forks, _bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) =
|
||||||
fullnode::new_banks_from_blocktree(&blocktree_path, None);
|
fullnode::new_banks_from_blocktree(&blocktree_path, None);
|
||||||
let bank = bank_forks.working_bank();
|
let working_bank = bank_forks.working_bank();
|
||||||
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), mint_balance);
|
assert_eq!(
|
||||||
|
working_bank.get_balance(&mint_keypair.pubkey()),
|
||||||
|
mint_balance
|
||||||
|
);
|
||||||
|
|
||||||
let leader_schedule_cache = Arc::new(leader_schedule_cache);
|
let leader_schedule_cache = Arc::new(leader_schedule_cache);
|
||||||
// start cluster_info1
|
// start cluster_info1
|
||||||
@ -100,7 +103,7 @@ fn test_replay() {
|
|||||||
let blocktree = Arc::new(blocktree);
|
let blocktree = Arc::new(blocktree);
|
||||||
{
|
{
|
||||||
let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
|
let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
|
||||||
create_test_recorder(&bank, &blocktree);
|
create_test_recorder(&working_bank, &blocktree);
|
||||||
let tvu = Tvu::new(
|
let tvu = Tvu::new(
|
||||||
&voting_keypair.pubkey(),
|
&voting_keypair.pubkey(),
|
||||||
Some(Arc::new(voting_keypair)),
|
Some(Arc::new(voting_keypair)),
|
||||||
|
Reference in New Issue
Block a user