@ -1,5 +1,4 @@
|
||||
use crate::blocktree::Blocktree;
|
||||
use solana_storage_api::SLOTS_PER_SEGMENT;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::io::{BufWriter, Write};
|
||||
@ -14,6 +13,7 @@ pub const CHACHA_KEY_SIZE: usize = 32;
|
||||
pub fn chacha_cbc_encrypt_ledger(
|
||||
blocktree: &Arc<Blocktree>,
|
||||
slice: u64,
|
||||
slots_per_segment: u64,
|
||||
out_path: &Path,
|
||||
ivec: &mut [u8; CHACHA_BLOCK_SIZE],
|
||||
) -> io::Result<usize> {
|
||||
@ -28,7 +28,7 @@ pub fn chacha_cbc_encrypt_ledger(
|
||||
let mut entry = slice;
|
||||
|
||||
loop {
|
||||
match blocktree.read_blobs_bytes(0, SLOTS_PER_SEGMENT - total_entries, &mut buffer, entry) {
|
||||
match blocktree.read_blobs_bytes(0, slots_per_segment - total_entries, &mut buffer, entry) {
|
||||
Ok((num_entries, entry_len)) => {
|
||||
debug!(
|
||||
"chacha: encrypting slice: {} num_entries: {} entry_len: {}",
|
||||
@ -113,10 +113,11 @@ mod tests {
|
||||
let ledger_dir = "chacha_test_encrypt_file";
|
||||
let ledger_path = get_tmp_ledger_path(ledger_dir);
|
||||
let ticks_per_slot = 16;
|
||||
let slots_per_segment = 32;
|
||||
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
||||
let out_path = Path::new("test_chacha_encrypt_file_output.txt.enc");
|
||||
|
||||
let entries = make_tiny_deterministic_test_entries(32);
|
||||
let entries = make_tiny_deterministic_test_entries(slots_per_segment);
|
||||
blocktree
|
||||
.write_entries(0, 0, 0, ticks_per_slot, &entries)
|
||||
.unwrap();
|
||||
@ -125,7 +126,8 @@ mod tests {
|
||||
"abcd1234abcd1234abcd1234abcd1234 abcd1234abcd1234abcd1234abcd1234
|
||||
abcd1234abcd1234abcd1234abcd1234 abcd1234abcd1234abcd1234abcd1234"
|
||||
);
|
||||
chacha_cbc_encrypt_ledger(&blocktree, 0, out_path, &mut key).unwrap();
|
||||
chacha_cbc_encrypt_ledger(&blocktree, 0, slots_per_segment as u64, out_path, &mut key)
|
||||
.unwrap();
|
||||
let mut out_file = File::open(out_path).unwrap();
|
||||
let mut buf = vec![];
|
||||
let size = out_file.read_to_end(&mut buf).unwrap();
|
||||
@ -133,7 +135,7 @@ mod tests {
|
||||
hasher.hash(&buf[..size]);
|
||||
|
||||
// golden needs to be updated if blob stuff changes....
|
||||
let golden: Hash = "53P7UXH7JstJa994fZsfuyr7nrRDrDDpvS3WSPvMUs45"
|
||||
let golden: Hash = "5FzYtpCqL7v6ZxZ1fW4wRkn8TK96NdiD8cLV59Rr7yav"
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
|
@ -7,7 +7,6 @@ use crate::sigverify::{
|
||||
chacha_cbc_encrypt_many_sample, chacha_end_sha_state, chacha_init_sha_state,
|
||||
};
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_storage_api::SLOTS_PER_SEGMENT;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
use std::sync::Arc;
|
||||
@ -19,6 +18,7 @@ use std::sync::Arc;
|
||||
pub fn chacha_cbc_encrypt_file_many_keys(
|
||||
blocktree: &Arc<Blocktree>,
|
||||
segment: u64,
|
||||
slots_per_segment: u64,
|
||||
ivecs: &mut [u8],
|
||||
samples: &[u64],
|
||||
) -> io::Result<Vec<Hash>> {
|
||||
@ -46,7 +46,7 @@ pub fn chacha_cbc_encrypt_file_many_keys(
|
||||
chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32);
|
||||
}
|
||||
loop {
|
||||
match blocktree.read_blobs_bytes(entry, SLOTS_PER_SEGMENT - total_entries, &mut buffer, 0) {
|
||||
match blocktree.read_blobs_bytes(entry, slots_per_segment - total_entries, &mut buffer, 0) {
|
||||
Ok((num_entries, entry_len)) => {
|
||||
debug!(
|
||||
"chacha_cuda: encrypting segment: {} num_entries: {} entry_len: {}",
|
||||
@ -76,9 +76,9 @@ pub fn chacha_cbc_encrypt_file_many_keys(
|
||||
entry += num_entries;
|
||||
debug!(
|
||||
"total entries: {} entry: {} segment: {} entries_per_segment: {}",
|
||||
total_entries, entry, segment, SLOTS_PER_SEGMENT
|
||||
total_entries, entry, segment, slots_per_segment
|
||||
);
|
||||
if (entry - segment) >= SLOTS_PER_SEGMENT {
|
||||
if (entry - segment) >= slots_per_segment {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -113,6 +113,7 @@ mod tests {
|
||||
use crate::entry::make_tiny_test_entries;
|
||||
use crate::replicator::sample_file;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::timing::DEFAULT_SLOTS_PER_SEGMENT;
|
||||
use std::fs::{remove_dir_all, remove_file};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
@ -121,7 +122,8 @@ mod tests {
|
||||
fn test_encrypt_file_many_keys_single() {
|
||||
solana_logger::setup();
|
||||
|
||||
let entries = make_tiny_test_entries(32);
|
||||
let slots_per_segment = 32;
|
||||
let entries = make_tiny_test_entries(slots_per_segment);
|
||||
let ledger_dir = "test_encrypt_file_many_keys_single";
|
||||
let ledger_path = get_tmp_ledger_path(ledger_dir);
|
||||
let ticks_per_slot = 16;
|
||||
@ -140,12 +142,25 @@ mod tests {
|
||||
);
|
||||
|
||||
let mut cpu_iv = ivecs.clone();
|
||||
chacha_cbc_encrypt_ledger(&blocktree, 0, out_path, &mut cpu_iv).unwrap();
|
||||
chacha_cbc_encrypt_ledger(
|
||||
&blocktree,
|
||||
0,
|
||||
slots_per_segment as u64,
|
||||
out_path,
|
||||
&mut cpu_iv,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let ref_hash = sample_file(&out_path, &samples).unwrap();
|
||||
|
||||
let hashes =
|
||||
chacha_cbc_encrypt_file_many_keys(&blocktree, 0, &mut ivecs, &samples).unwrap();
|
||||
let hashes = chacha_cbc_encrypt_file_many_keys(
|
||||
&blocktree,
|
||||
0,
|
||||
slots_per_segment as u64,
|
||||
&mut ivecs,
|
||||
&samples,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(hashes[0], ref_hash);
|
||||
|
||||
@ -178,7 +193,14 @@ mod tests {
|
||||
);
|
||||
ivec[0] = i;
|
||||
ivecs.extend(ivec.clone().iter());
|
||||
chacha_cbc_encrypt_ledger(&blocktree.clone(), 0, out_path, &mut ivec).unwrap();
|
||||
chacha_cbc_encrypt_ledger(
|
||||
&blocktree.clone(),
|
||||
0,
|
||||
DEFAULT_SLOTS_PER_SEGMENT,
|
||||
out_path,
|
||||
&mut ivec,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
ref_hashes.push(sample_file(&out_path, &samples).unwrap());
|
||||
info!(
|
||||
@ -189,8 +211,14 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
let hashes =
|
||||
chacha_cbc_encrypt_file_many_keys(&blocktree, 0, &mut ivecs, &samples).unwrap();
|
||||
let hashes = chacha_cbc_encrypt_file_many_keys(
|
||||
&blocktree,
|
||||
0,
|
||||
DEFAULT_SLOTS_PER_SEGMENT,
|
||||
&mut ivecs,
|
||||
&samples,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(hashes, ref_hashes);
|
||||
|
||||
@ -205,6 +233,13 @@ mod tests {
|
||||
let ledger_path = get_tmp_ledger_path(ledger_dir);
|
||||
let samples = [0];
|
||||
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
||||
assert!(chacha_cbc_encrypt_file_many_keys(&blocktree, 0, &mut keys, &samples,).is_err());
|
||||
assert!(chacha_cbc_encrypt_file_many_keys(
|
||||
&blocktree,
|
||||
0,
|
||||
DEFAULT_SLOTS_PER_SEGMENT,
|
||||
&mut keys,
|
||||
&samples,
|
||||
)
|
||||
.is_err());
|
||||
}
|
||||
}
|
||||
|
@ -28,11 +28,11 @@ use solana_sdk::client::{AsyncClient, SyncClient};
|
||||
use solana_sdk::hash::{Hash, Hasher};
|
||||
use solana_sdk::message::Message;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
|
||||
use solana_sdk::timing::timestamp;
|
||||
use solana_sdk::timing::{get_segment_from_slot, timestamp};
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use solana_sdk::transport::TransportError;
|
||||
use solana_storage_api::storage_contract::StorageContract;
|
||||
use solana_storage_api::{get_segment_from_slot, storage_instruction, SLOTS_PER_SEGMENT};
|
||||
use solana_storage_api::storage_instruction;
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufReader, ErrorKind, Read, Seek, SeekFrom};
|
||||
use std::mem::size_of;
|
||||
@ -59,6 +59,7 @@ pub struct Replicator {
|
||||
#[derive(Default)]
|
||||
struct ReplicatorMeta {
|
||||
slot: u64,
|
||||
slots_per_segment: u64,
|
||||
ledger_path: String,
|
||||
signature: Signature,
|
||||
ledger_data_file_encrypted: PathBuf,
|
||||
@ -103,15 +104,19 @@ pub(crate) fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<
|
||||
Ok(hasher.result())
|
||||
}
|
||||
|
||||
fn get_slot_from_blockhash(signature: &ed25519_dalek::Signature, storage_slot: u64) -> u64 {
|
||||
fn get_slot_from_blockhash(
|
||||
signature: &ed25519_dalek::Signature,
|
||||
storage_turn: u64,
|
||||
slots_per_segment: u64,
|
||||
) -> u64 {
|
||||
let signature_vec = signature.to_bytes();
|
||||
let mut segment_index = u64::from(signature_vec[0])
|
||||
| (u64::from(signature_vec[1]) << 8)
|
||||
| (u64::from(signature_vec[1]) << 16)
|
||||
| (u64::from(signature_vec[2]) << 24);
|
||||
let max_segment_index = get_segment_from_slot(storage_slot);
|
||||
let max_segment_index = get_segment_from_slot(storage_turn, slots_per_segment);
|
||||
segment_index %= max_segment_index as u64;
|
||||
segment_index * SLOTS_PER_SEGMENT
|
||||
segment_index * slots_per_segment
|
||||
}
|
||||
|
||||
fn create_request_processor(
|
||||
@ -333,17 +338,20 @@ impl Replicator {
|
||||
|
||||
// TODO make this a lot more frequent by picking a "new" blockhash instead of picking a storage blockhash
|
||||
// prep the next proof
|
||||
let (storage_blockhash, _) =
|
||||
match Self::poll_for_blockhash_and_slot(&cluster_info, &meta.blockhash) {
|
||||
Ok(blockhash_and_slot) => blockhash_and_slot,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Error couldn't get a newer blockhash than {:?}. {:?}",
|
||||
meta.blockhash, e
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
let (storage_blockhash, _) = match Self::poll_for_blockhash_and_slot(
|
||||
&cluster_info,
|
||||
meta.slots_per_segment,
|
||||
&meta.blockhash,
|
||||
) {
|
||||
Ok(blockhash_and_slot) => blockhash_and_slot,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Error couldn't get a newer blockhash than {:?}. {:?}",
|
||||
meta.blockhash, e
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
meta.blockhash = storage_blockhash;
|
||||
Self::redeem_rewards(&cluster_info, replicator_keypair, storage_keypair);
|
||||
}
|
||||
@ -393,26 +401,39 @@ impl Replicator {
|
||||
blob_fetch_receiver: BlobReceiver,
|
||||
slot_sender: Sender<u64>,
|
||||
) -> Result<(WindowService)> {
|
||||
let (storage_blockhash, storage_slot) =
|
||||
match Self::poll_for_blockhash_and_slot(&cluster_info, &Hash::default()) {
|
||||
Ok(blockhash_and_slot) => blockhash_and_slot,
|
||||
Err(e) => {
|
||||
//shutdown services before exiting
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let slots_per_segment = match Self::get_segment_config(&cluster_info) {
|
||||
Ok(slots_per_segment) => slots_per_segment,
|
||||
Err(e) => {
|
||||
error!("unable to get segment size configuration, exiting...");
|
||||
//shutdown services before exiting
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let (storage_blockhash, storage_slot) = match Self::poll_for_blockhash_and_slot(
|
||||
&cluster_info,
|
||||
slots_per_segment,
|
||||
&Hash::default(),
|
||||
) {
|
||||
Ok(blockhash_and_slot) => blockhash_and_slot,
|
||||
Err(e) => {
|
||||
error!("unable to get turn status, exiting...");
|
||||
//shutdown services before exiting
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let signature = storage_keypair.sign(storage_blockhash.as_ref());
|
||||
let slot = get_slot_from_blockhash(&signature, storage_slot);
|
||||
let slot = get_slot_from_blockhash(&signature, storage_slot, slots_per_segment);
|
||||
info!("replicating slot: {}", slot);
|
||||
slot_sender.send(slot)?;
|
||||
meta.slot = slot;
|
||||
meta.slots_per_segment = slots_per_segment;
|
||||
meta.signature = Signature::new(&signature.to_bytes());
|
||||
meta.blockhash = storage_blockhash;
|
||||
|
||||
let mut repair_slot_range = RepairSlotRange::default();
|
||||
repair_slot_range.end = slot + SLOTS_PER_SEGMENT;
|
||||
repair_slot_range.end = slot + slots_per_segment;
|
||||
repair_slot_range.start = slot;
|
||||
|
||||
let (retransmit_sender, _) = channel();
|
||||
@ -428,12 +449,20 @@ impl Replicator {
|
||||
|_, _, _| true,
|
||||
);
|
||||
info!("waiting for ledger download");
|
||||
Self::wait_for_segment_download(slot, &blocktree, &exit, &node_info, cluster_info);
|
||||
Self::wait_for_segment_download(
|
||||
slot,
|
||||
slots_per_segment,
|
||||
&blocktree,
|
||||
&exit,
|
||||
&node_info,
|
||||
cluster_info,
|
||||
);
|
||||
Ok(window_service)
|
||||
}
|
||||
|
||||
fn wait_for_segment_download(
|
||||
start_slot: u64,
|
||||
slots_per_segment: u64,
|
||||
blocktree: &Arc<Blocktree>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
node_info: &ContactInfo,
|
||||
@ -448,7 +477,7 @@ impl Replicator {
|
||||
while blocktree.is_full(current_slot) {
|
||||
current_slot += 1;
|
||||
info!("current slot: {}", current_slot);
|
||||
if current_slot >= start_slot + SLOTS_PER_SEGMENT {
|
||||
if current_slot >= start_slot + slots_per_segment {
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
@ -481,6 +510,7 @@ impl Replicator {
|
||||
let num_encrypted_bytes = chacha_cbc_encrypt_ledger(
|
||||
blocktree,
|
||||
meta.slot,
|
||||
meta.slots_per_segment,
|
||||
&meta.ledger_data_file_encrypted,
|
||||
&mut ivec,
|
||||
)?;
|
||||
@ -593,7 +623,7 @@ impl Replicator {
|
||||
let instruction = storage_instruction::mining_proof(
|
||||
&storage_keypair.pubkey(),
|
||||
meta.sha_state,
|
||||
get_segment_from_slot(meta.slot),
|
||||
get_segment_from_slot(meta.slot, meta.slots_per_segment),
|
||||
Signature::new(&meta.signature.as_ref()),
|
||||
meta.blockhash,
|
||||
);
|
||||
@ -625,9 +655,37 @@ impl Replicator {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_segment_config(cluster_info: &Arc<RwLock<ClusterInfo>>) -> result::Result<u64, Error> {
|
||||
let rpc_peers = {
|
||||
let cluster_info = cluster_info.read().unwrap();
|
||||
cluster_info.rpc_peers()
|
||||
};
|
||||
debug!("rpc peers: {:?}", rpc_peers);
|
||||
if !rpc_peers.is_empty() {
|
||||
let rpc_client = {
|
||||
let node_index = thread_rng().gen_range(0, rpc_peers.len());
|
||||
RpcClient::new_socket(rpc_peers[node_index].rpc)
|
||||
};
|
||||
Ok(rpc_client
|
||||
.retry_make_rpc_request(&RpcRequest::GetSlotsPerSegment, None, 0)
|
||||
.map_err(|err| {
|
||||
warn!("Error while making rpc request {:?}", err);
|
||||
Error::IO(io::Error::new(ErrorKind::Other, "rpc error"))
|
||||
})?
|
||||
.as_u64()
|
||||
.unwrap())
|
||||
} else {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"No RPC peers...".to_string(),
|
||||
))?
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll for a different blockhash and associated max_slot than `previous_blockhash`
|
||||
fn poll_for_blockhash_and_slot(
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
slots_per_segment: u64,
|
||||
previous_blockhash: &Hash,
|
||||
) -> result::Result<(Hash, u64), Error> {
|
||||
for _ in 0..10 {
|
||||
@ -673,7 +731,7 @@ impl Replicator {
|
||||
.as_u64()
|
||||
.unwrap();
|
||||
info!("storage slot: {}", storage_slot);
|
||||
if get_segment_from_slot(storage_slot) != 0 {
|
||||
if get_segment_from_slot(storage_slot, slots_per_segment) != 0 {
|
||||
return Ok((storage_blockhash, storage_slot));
|
||||
}
|
||||
}
|
||||
@ -696,6 +754,7 @@ impl Replicator {
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
replicator_info: &ContactInfo,
|
||||
blocktree: &Arc<Blocktree>,
|
||||
slots_per_segment: u64,
|
||||
) -> Result<(u64)> {
|
||||
// Create a client which downloads from the replicator and see that it
|
||||
// can respond with blobs.
|
||||
@ -714,7 +773,7 @@ impl Replicator {
|
||||
);
|
||||
let repair_slot_range = RepairSlotRange {
|
||||
start: start_slot,
|
||||
end: start_slot + SLOTS_PER_SEGMENT,
|
||||
end: start_slot + slots_per_segment,
|
||||
};
|
||||
// try for upto 180 seconds //TODO needs tuning if segments are huge
|
||||
for _ in 0..120 {
|
||||
@ -761,7 +820,7 @@ impl Replicator {
|
||||
window_service::process_blobs(&blobs, blocktree)?;
|
||||
}
|
||||
// check if all the slots in the segment are complete
|
||||
if Self::segment_complete(start_slot, blocktree) {
|
||||
if Self::segment_complete(start_slot, slots_per_segment, blocktree) {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_millis(500));
|
||||
@ -770,7 +829,7 @@ impl Replicator {
|
||||
t_receiver.join().unwrap();
|
||||
|
||||
// check if all the slots in the segment are complete
|
||||
if !Self::segment_complete(start_slot, blocktree) {
|
||||
if !Self::segment_complete(start_slot, slots_per_segment, blocktree) {
|
||||
Err(io::Error::new(
|
||||
ErrorKind::Other,
|
||||
"Unable to download the full segment",
|
||||
@ -779,8 +838,12 @@ impl Replicator {
|
||||
Ok(start_slot)
|
||||
}
|
||||
|
||||
fn segment_complete(start_slot: u64, blocktree: &Arc<Blocktree>) -> bool {
|
||||
for slot in start_slot..(start_slot + SLOTS_PER_SEGMENT) {
|
||||
fn segment_complete(
|
||||
start_slot: u64,
|
||||
slots_per_segment: u64,
|
||||
blocktree: &Arc<Blocktree>,
|
||||
) -> bool {
|
||||
for slot in start_slot..(start_slot + slots_per_segment) {
|
||||
if !blocktree.is_full(slot) {
|
||||
return false;
|
||||
}
|
||||
|
@ -143,6 +143,10 @@ impl JsonRpcRequestProcessor {
|
||||
Ok(self.storage_state.get_slot())
|
||||
}
|
||||
|
||||
fn get_slots_per_segment(&self) -> Result<u64> {
|
||||
Ok(self.bank().slots_per_segment())
|
||||
}
|
||||
|
||||
fn get_storage_pubkeys_for_slot(&self, slot: u64) -> Result<Vec<Pubkey>> {
|
||||
Ok(self
|
||||
.storage_state
|
||||
@ -265,6 +269,9 @@ pub trait RpcSol {
|
||||
#[rpc(meta, name = "getStorageSlot")]
|
||||
fn get_storage_slot(&self, _: Self::Metadata) -> Result<u64>;
|
||||
|
||||
#[rpc(meta, name = "getSlotsPerSegment")]
|
||||
fn get_slots_per_segment(&self, _: Self::Metadata) -> Result<u64>;
|
||||
|
||||
#[rpc(meta, name = "getStoragePubkeysForSlot")]
|
||||
fn get_storage_pubkeys_for_slot(&self, _: Self::Metadata, _: u64) -> Result<Vec<Pubkey>>;
|
||||
|
||||
@ -531,6 +538,13 @@ impl RpcSol for RpcSolImpl {
|
||||
meta.request_processor.read().unwrap().get_storage_slot()
|
||||
}
|
||||
|
||||
fn get_slots_per_segment(&self, meta: Self::Metadata) -> Result<u64> {
|
||||
meta.request_processor
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_slots_per_segment()
|
||||
}
|
||||
|
||||
fn get_storage_pubkeys_for_slot(&self, meta: Self::Metadata, slot: u64) -> Result<Vec<Pubkey>> {
|
||||
meta.request_processor
|
||||
.read()
|
||||
|
@ -20,10 +20,11 @@ use solana_sdk::instruction::Instruction;
|
||||
use solana_sdk::message::Message;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
|
||||
use solana_sdk::timing::get_segment_from_slot;
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use solana_storage_api::storage_contract::{Proof, ProofStatus, StorageContract};
|
||||
use solana_storage_api::storage_instruction;
|
||||
use solana_storage_api::storage_instruction::proof_validation;
|
||||
use solana_storage_api::{get_segment_from_slot, storage_instruction};
|
||||
use std::collections::HashMap;
|
||||
use std::mem::size_of;
|
||||
use std::net::UdpSocket;
|
||||
@ -47,6 +48,7 @@ pub struct StorageStateInner {
|
||||
replicator_map: ReplicatorMap,
|
||||
storage_blockhash: Hash,
|
||||
slot: u64,
|
||||
slots_per_segment: u64,
|
||||
}
|
||||
|
||||
// Used to track root slots in storage stage
|
||||
@ -86,7 +88,7 @@ fn get_identity_index_from_signature(key: &Signature) -> usize {
|
||||
}
|
||||
|
||||
impl StorageState {
|
||||
pub fn new(hash: &Hash) -> Self {
|
||||
pub fn new(hash: &Hash, slots_per_segment: u64) -> Self {
|
||||
let storage_keys = vec![0u8; KEY_SIZE * NUM_IDENTITIES];
|
||||
let storage_results = vec![Hash::default(); NUM_IDENTITIES];
|
||||
let replicator_map = vec![];
|
||||
@ -96,6 +98,7 @@ impl StorageState {
|
||||
storage_results,
|
||||
replicator_map,
|
||||
slot: 0,
|
||||
slots_per_segment,
|
||||
storage_blockhash: *hash,
|
||||
};
|
||||
|
||||
@ -129,7 +132,8 @@ impl StorageState {
|
||||
) -> Vec<Pubkey> {
|
||||
// TODO: keep track of age?
|
||||
const MAX_PUBKEYS_TO_RETURN: usize = 5;
|
||||
let index = get_segment_from_slot(slot) as usize;
|
||||
let index =
|
||||
get_segment_from_slot(slot, self.state.read().unwrap().slots_per_segment) as usize;
|
||||
let replicator_map = &self.state.read().unwrap().replicator_map;
|
||||
let working_bank = bank_forks.read().unwrap().working_bank();
|
||||
let accounts = replicator_accounts(&working_bank);
|
||||
@ -356,6 +360,7 @@ impl StorageStage {
|
||||
_blocktree: &Arc<Blocktree>,
|
||||
blockhash: Hash,
|
||||
slot: u64,
|
||||
slots_per_segment: u64,
|
||||
instruction_sender: &InstructionSender,
|
||||
) -> Result<()> {
|
||||
let mut seed = [0u8; 32];
|
||||
@ -364,7 +369,7 @@ impl StorageStage {
|
||||
let ix = storage_instruction::advertise_recent_blockhash(
|
||||
&storage_keypair.pubkey(),
|
||||
blockhash,
|
||||
slot,
|
||||
get_segment_from_slot(slot, slots_per_segment),
|
||||
);
|
||||
instruction_sender.send(ix)?;
|
||||
|
||||
@ -379,7 +384,7 @@ impl StorageStage {
|
||||
}
|
||||
|
||||
// Regenerate the answers
|
||||
let num_segments = get_segment_from_slot(slot) as usize;
|
||||
let num_segments = get_segment_from_slot(slot, slots_per_segment) as usize;
|
||||
if num_segments == 0 {
|
||||
info!("Ledger has 0 segments!");
|
||||
return Ok(());
|
||||
@ -412,6 +417,7 @@ impl StorageStage {
|
||||
match chacha_cbc_encrypt_file_many_keys(
|
||||
_blocktree,
|
||||
segment as u64,
|
||||
statew.slots_per_segment,
|
||||
&mut statew.storage_keys,
|
||||
&samples,
|
||||
) {
|
||||
@ -430,6 +436,7 @@ impl StorageStage {
|
||||
|
||||
fn collect_proofs(
|
||||
slot: u64,
|
||||
slots_per_segment: u64,
|
||||
account_id: Pubkey,
|
||||
account: Account,
|
||||
storage_state: &Arc<RwLock<StorageStateInner>>,
|
||||
@ -437,7 +444,7 @@ impl StorageStage {
|
||||
) {
|
||||
if let Ok(StorageContract::ReplicatorStorage { proofs, .. }) = account.state() {
|
||||
//convert slot to segment
|
||||
let segment = get_segment_from_slot(slot);
|
||||
let segment = get_segment_from_slot(slot, slots_per_segment);
|
||||
if let Some(proofs) = proofs.get(&segment) {
|
||||
for proof in proofs.iter() {
|
||||
{
|
||||
@ -454,10 +461,12 @@ impl StorageStage {
|
||||
}
|
||||
|
||||
let mut statew = storage_state.write().unwrap();
|
||||
if statew.replicator_map.len() < segment {
|
||||
statew.replicator_map.resize(segment, HashMap::new());
|
||||
if statew.replicator_map.len() < segment as usize {
|
||||
statew
|
||||
.replicator_map
|
||||
.resize(segment as usize, HashMap::new());
|
||||
}
|
||||
let proof_segment_index = proof.segment_index;
|
||||
let proof_segment_index = proof.segment_index as usize;
|
||||
if proof_segment_index < statew.replicator_map.len() {
|
||||
// TODO randomly select and verify the proof first
|
||||
// Copy the submitted proof
|
||||
@ -503,6 +512,7 @@ impl StorageStage {
|
||||
for (account_id, account) in replicator_accounts.into_iter() {
|
||||
Self::collect_proofs(
|
||||
bank.slot(),
|
||||
bank.slots_per_segment(),
|
||||
account_id,
|
||||
account,
|
||||
storage_state,
|
||||
@ -517,10 +527,11 @@ impl StorageStage {
|
||||
&blocktree,
|
||||
bank.last_blockhash(),
|
||||
bank.slot(),
|
||||
bank.slots_per_segment(),
|
||||
instruction_sender,
|
||||
);
|
||||
Self::submit_verifications(
|
||||
get_segment_from_slot(bank.slot()),
|
||||
get_segment_from_slot(bank.slot(), bank.slots_per_segment()),
|
||||
&storage_state,
|
||||
&storage_keypair,
|
||||
instruction_sender,
|
||||
@ -532,7 +543,7 @@ impl StorageStage {
|
||||
}
|
||||
|
||||
fn submit_verifications(
|
||||
current_segment: usize,
|
||||
current_segment: u64,
|
||||
storage_state: &Arc<RwLock<StorageStateInner>>,
|
||||
storage_keypair: &Arc<Keypair>,
|
||||
ix_sender: &Sender<Instruction>,
|
||||
@ -578,7 +589,7 @@ impl StorageStage {
|
||||
.map(|checked_proofs| {
|
||||
proof_validation(
|
||||
&storage_keypair.pubkey(),
|
||||
current_segment as u64,
|
||||
current_segment,
|
||||
checked_proofs.to_vec(),
|
||||
)
|
||||
})
|
||||
@ -626,7 +637,6 @@ mod tests {
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
|
||||
use solana_storage_api::SLOTS_PER_SEGMENT;
|
||||
use std::cmp::{max, min};
|
||||
use std::fs::remove_dir_all;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
@ -646,7 +656,7 @@ mod tests {
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0)));
|
||||
let (_slot_sender, slot_receiver) = channel();
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
let storage_state = StorageState::new(&bank.last_blockhash(), bank.slots_per_segment());
|
||||
let storage_stage = StorageStage::new(
|
||||
&storage_state,
|
||||
slot_receiver,
|
||||
@ -685,7 +695,7 @@ mod tests {
|
||||
|
||||
let cluster_info = test_cluster_info(&keypair.pubkey());
|
||||
let (bank_sender, bank_receiver) = channel();
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
let storage_state = StorageState::new(&bank.last_blockhash(), bank.slots_per_segment());
|
||||
let storage_stage = StorageStage::new(
|
||||
&storage_state,
|
||||
bank_receiver,
|
||||
@ -710,7 +720,7 @@ mod tests {
|
||||
assert_eq!(result, Hash::default());
|
||||
|
||||
let mut last_bank = bank;
|
||||
let rooted_banks = (slot..slot + SLOTS_PER_SEGMENT + 1)
|
||||
let rooted_banks = (slot..slot + last_bank.slots_per_segment() + 1)
|
||||
.map(|i| {
|
||||
let bank = Bank::new_from_parent(&last_bank, &keypair.pubkey(), i);
|
||||
blocktree_processor::process_entries(
|
||||
@ -774,7 +784,7 @@ mod tests {
|
||||
let cluster_info = test_cluster_info(&keypair.pubkey());
|
||||
|
||||
let (bank_sender, bank_receiver) = channel();
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
let storage_state = StorageState::new(&bank.last_blockhash(), bank.slots_per_segment());
|
||||
let storage_stage = StorageStage::new(
|
||||
&storage_state,
|
||||
bank_receiver,
|
||||
@ -827,7 +837,7 @@ mod tests {
|
||||
blocktree_processor::process_entries(
|
||||
&next_bank,
|
||||
&entry::create_ticks(
|
||||
DEFAULT_TICKS_PER_SLOT * SLOTS_PER_SEGMENT + 1,
|
||||
DEFAULT_TICKS_PER_SLOT * next_bank.slots_per_segment() + 1,
|
||||
bank.last_blockhash(),
|
||||
),
|
||||
)
|
||||
|
@ -23,8 +23,7 @@ use solana_sdk::genesis_block::GenesisBlock;
|
||||
use solana_sdk::poh_config::PohConfig;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::timing::timestamp;
|
||||
use solana_storage_api::SLOTS_PER_SEGMENT;
|
||||
use solana_sdk::timing::{timestamp, DEFAULT_SLOTS_PER_SEGMENT};
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::Receiver;
|
||||
@ -48,7 +47,7 @@ impl Default for ValidatorConfig {
|
||||
// TODO: remove this, temporary parameter to configure
|
||||
// storage amount differently for test configurations
|
||||
// so tests don't take forever to run.
|
||||
const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = SLOTS_PER_SEGMENT;
|
||||
const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = DEFAULT_SLOTS_PER_SEGMENT;
|
||||
Self {
|
||||
sigverify_disabled: false,
|
||||
voting_disabled: false,
|
||||
@ -158,7 +157,7 @@ impl Validator {
|
||||
keypair.clone(),
|
||||
)));
|
||||
|
||||
let storage_state = StorageState::new(&bank.last_blockhash());
|
||||
let storage_state = StorageState::new(&bank.last_blockhash(), bank.slots_per_segment());
|
||||
|
||||
let rpc_service = if node.info.rpc.port() == 0 {
|
||||
None
|
||||
|
@ -16,6 +16,7 @@ use solana::validator::ValidatorConfig;
|
||||
use solana_client::thin_client::create_client;
|
||||
use solana_sdk::genesis_block::create_genesis_block;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::timing::DEFAULT_SLOTS_PER_SEGMENT;
|
||||
use std::fs::remove_dir_all;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
@ -28,7 +29,7 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
|
||||
let mut validator_config = ValidatorConfig::default();
|
||||
validator_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
|
||||
let config = ClusterConfig {
|
||||
validator_configs: vec![ValidatorConfig::default(); num_nodes],
|
||||
validator_configs: vec![validator_config; num_nodes],
|
||||
num_replicators,
|
||||
node_stakes: vec![100; num_nodes],
|
||||
cluster_lamports: 10_000,
|
||||
@ -62,7 +63,13 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
|
||||
let path = get_tmp_ledger_path("test");
|
||||
let blocktree = Arc::new(Blocktree::open(&path).unwrap());
|
||||
assert_eq!(
|
||||
Replicator::download_from_replicator(&cluster_info, &replicator_info, &blocktree).unwrap(),
|
||||
Replicator::download_from_replicator(
|
||||
&cluster_info,
|
||||
&replicator_info,
|
||||
&blocktree,
|
||||
DEFAULT_SLOTS_PER_SEGMENT,
|
||||
)
|
||||
.unwrap(),
|
||||
0
|
||||
);
|
||||
}
|
||||
|
Reference in New Issue
Block a user