Fix storage program space issues and limit storage transaction data (#4677)
This commit is contained in:
@ -52,7 +52,7 @@ pub struct Replicator {
|
||||
ledger_path: String,
|
||||
keypair: Arc<Keypair>,
|
||||
storage_keypair: Arc<Keypair>,
|
||||
blockhash: String,
|
||||
blockhash: Hash,
|
||||
signature: ed25519_dalek::Signature,
|
||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||
ledger_data_file_encrypted: PathBuf,
|
||||
@ -204,7 +204,7 @@ impl Replicator {
|
||||
let client = crate::gossip_service::get_client(&nodes);
|
||||
|
||||
let (storage_blockhash, storage_slot) =
|
||||
match Self::poll_for_blockhash_and_slot(&cluster_info, &Hash::default().to_string()) {
|
||||
match Self::poll_for_blockhash_and_slot(&cluster_info, &Hash::default()) {
|
||||
Ok(blockhash_and_slot) => blockhash_and_slot,
|
||||
Err(e) => {
|
||||
//shutdown services before exiting
|
||||
@ -301,9 +301,9 @@ impl Replicator {
|
||||
pub fn run(&mut self) {
|
||||
info!("waiting for ledger download");
|
||||
self.thread_handles.pop().unwrap().join().unwrap();
|
||||
self.encrypt_ledger()
|
||||
.expect("ledger encrypt not successful");
|
||||
loop {
|
||||
self.encrypt_ledger()
|
||||
.expect("ledger encrypt not successful");
|
||||
self.create_sampling_offsets();
|
||||
if let Err(err) = self.sample_file_to_create_mining_hash() {
|
||||
warn!("Error sampling file, exiting: {:?}", err);
|
||||
@ -311,6 +311,7 @@ impl Replicator {
|
||||
}
|
||||
self.submit_mining_proof();
|
||||
|
||||
// Todo make this a lot more frequent by picking a "new" blockhash instead of picking a storage blockhash
|
||||
// prep the next proof
|
||||
let (storage_blockhash, _) =
|
||||
match Self::poll_for_blockhash_and_slot(&self.cluster_info, &self.blockhash) {
|
||||
@ -323,7 +324,6 @@ impl Replicator {
|
||||
break;
|
||||
}
|
||||
};
|
||||
self.signature = self.storage_keypair.sign(storage_blockhash.as_ref());
|
||||
self.blockhash = storage_blockhash;
|
||||
}
|
||||
}
|
||||
@ -473,6 +473,7 @@ impl Replicator {
|
||||
self.sha_state,
|
||||
get_segment_from_slot(self.slot),
|
||||
Signature::new(&self.signature.to_bytes()),
|
||||
self.blockhash,
|
||||
);
|
||||
let message = Message::new_with_payer(vec![instruction], Some(&self.keypair.pubkey()));
|
||||
let mut transaction = Transaction::new(
|
||||
@ -507,8 +508,8 @@ impl Replicator {
|
||||
/// Poll for a different blockhash and associated max_slot than `previous_blockhash`
|
||||
fn poll_for_blockhash_and_slot(
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
previous_blockhash: &str,
|
||||
) -> result::Result<(String, u64), Error> {
|
||||
previous_blockhash: &Hash,
|
||||
) -> result::Result<(Hash, u64), Error> {
|
||||
for _ in 0..10 {
|
||||
let rpc_client = {
|
||||
let cluster_info = cluster_info.read().unwrap();
|
||||
@ -517,13 +518,28 @@ impl Replicator {
|
||||
let node_index = thread_rng().gen_range(0, rpc_peers.len());
|
||||
RpcClient::new_socket(rpc_peers[node_index].rpc)
|
||||
};
|
||||
let storage_blockhash = rpc_client
|
||||
let response = rpc_client
|
||||
.retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0)
|
||||
.map_err(|err| {
|
||||
warn!("Error while making rpc request {:?}", err);
|
||||
Error::IO(io::Error::new(ErrorKind::Other, "rpc error"))
|
||||
})?
|
||||
.to_string();
|
||||
})?;
|
||||
let storage_blockhash =
|
||||
serde_json::from_value::<(String)>(response).map_err(|err| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("Couldn't parse response: {:?}", err),
|
||||
)
|
||||
})?;
|
||||
let storage_blockhash = storage_blockhash.parse().map_err(|err| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Blockhash parse failure: {:?} on {:?}",
|
||||
err, storage_blockhash
|
||||
),
|
||||
)
|
||||
})?;
|
||||
if storage_blockhash != *previous_blockhash {
|
||||
let storage_slot = rpc_client
|
||||
.retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0)
|
||||
|
@ -21,11 +21,10 @@ use solana_sdk::message::Message;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use solana_storage_api::storage_contract::{CheckedProof, Proof, ProofStatus, StorageContract};
|
||||
use solana_storage_api::storage_contract::{Proof, ProofStatus, StorageContract};
|
||||
use solana_storage_api::storage_instruction::proof_validation;
|
||||
use solana_storage_api::{get_segment_from_slot, storage_instruction};
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
@ -33,6 +32,7 @@ use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{self, sleep, Builder, JoinHandle};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{cmp, io};
|
||||
|
||||
// Block of hash answers to validate against
|
||||
// Vec of [ledger blocks] x [keys]
|
||||
@ -86,7 +86,7 @@ fn get_identity_index_from_signature(key: &Signature) -> usize {
|
||||
}
|
||||
|
||||
impl StorageState {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(hash: &Hash) -> Self {
|
||||
let storage_keys = vec![0u8; KEY_SIZE * NUM_IDENTITIES];
|
||||
let storage_results = vec![Hash::default(); NUM_IDENTITIES];
|
||||
let replicator_map = vec![];
|
||||
@ -96,7 +96,7 @@ impl StorageState {
|
||||
storage_results,
|
||||
replicator_map,
|
||||
slot: 0,
|
||||
storage_blockhash: Hash::default(),
|
||||
storage_blockhash: *hash,
|
||||
};
|
||||
|
||||
StorageState {
|
||||
@ -439,7 +439,7 @@ impl StorageStage {
|
||||
//convert slot to segment
|
||||
let segment = get_segment_from_slot(slot);
|
||||
if let Some(proofs) = proofs.get(&segment) {
|
||||
for (_, proof) in proofs.iter() {
|
||||
for proof in proofs.iter() {
|
||||
{
|
||||
debug!(
|
||||
"generating storage_keys from storage txs current_key_idx: {}",
|
||||
@ -543,6 +543,8 @@ impl StorageStage {
|
||||
// bundle up mining submissions from replicators
|
||||
// and submit them in a tx to the leader to get rewarded.
|
||||
let mut w_state = storage_state.write().unwrap();
|
||||
let mut max_proof_mask = 0;
|
||||
let proof_mask_limit = storage_instruction::proof_mask_limit();
|
||||
let instructions: Vec<_> = w_state
|
||||
.replicator_map
|
||||
.iter_mut()
|
||||
@ -552,32 +554,44 @@ impl StorageStage {
|
||||
.iter_mut()
|
||||
.filter_map(|(id, proofs)| {
|
||||
if !proofs.is_empty() {
|
||||
Some((
|
||||
*id,
|
||||
proofs
|
||||
.drain(..)
|
||||
.map(|proof| CheckedProof {
|
||||
proof,
|
||||
status: ProofStatus::Valid,
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
))
|
||||
if (proofs.len() as u64) >= proof_mask_limit {
|
||||
proofs.clear();
|
||||
None
|
||||
} else {
|
||||
max_proof_mask = cmp::max(max_proof_mask, proofs.len());
|
||||
Some((
|
||||
*id,
|
||||
proofs
|
||||
.drain(..)
|
||||
.map(|_| ProofStatus::Valid)
|
||||
.collect::<Vec<_>>(),
|
||||
))
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
.collect::<Vec<(_, _)>>();
|
||||
|
||||
if !checked_proofs.is_empty() {
|
||||
let ix = proof_validation(
|
||||
&storage_keypair.pubkey(),
|
||||
current_segment as u64,
|
||||
checked_proofs,
|
||||
);
|
||||
Some(ix)
|
||||
let max_accounts_per_ix =
|
||||
storage_instruction::validation_account_limit(max_proof_mask);
|
||||
let ixs = checked_proofs
|
||||
.chunks(max_accounts_per_ix as usize)
|
||||
.map(|checked_proofs| {
|
||||
proof_validation(
|
||||
&storage_keypair.pubkey(),
|
||||
current_segment as u64,
|
||||
checked_proofs.to_vec(),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
Some(ixs)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.flatten()
|
||||
.collect();
|
||||
let res: std::result::Result<_, _> = instructions
|
||||
.into_iter()
|
||||
@ -633,9 +647,9 @@ mod tests {
|
||||
let cluster_info = test_cluster_info(&keypair.pubkey());
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank], 0)));
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0)));
|
||||
let (_slot_sender, slot_receiver) = channel();
|
||||
let storage_state = StorageState::new();
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
let storage_stage = StorageStage::new(
|
||||
&storage_state,
|
||||
slot_receiver,
|
||||
@ -674,7 +688,7 @@ mod tests {
|
||||
|
||||
let cluster_info = test_cluster_info(&keypair.pubkey());
|
||||
let (bank_sender, bank_receiver) = channel();
|
||||
let storage_state = StorageState::new();
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
let storage_stage = StorageStage::new(
|
||||
&storage_state,
|
||||
bank_receiver,
|
||||
@ -763,7 +777,7 @@ mod tests {
|
||||
let cluster_info = test_cluster_info(&keypair.pubkey());
|
||||
|
||||
let (bank_sender, bank_receiver) = channel();
|
||||
let storage_state = StorageState::new();
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
let storage_stage = StorageStage::new(
|
||||
&storage_state,
|
||||
bank_receiver,
|
||||
@ -808,6 +822,7 @@ mod tests {
|
||||
Hash::default(),
|
||||
0,
|
||||
keypair.sign_message(b"test"),
|
||||
bank.last_blockhash(),
|
||||
);
|
||||
|
||||
let next_bank = Arc::new(Bank::new_from_parent(&bank, &keypair.pubkey(), 2));
|
||||
|
@ -153,7 +153,7 @@ impl Validator {
|
||||
keypair.clone(),
|
||||
)));
|
||||
|
||||
let storage_state = StorageState::new();
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
|
||||
let rpc_service = if node.info.rpc.port() == 0 {
|
||||
None
|
||||
|
Reference in New Issue
Block a user