Storage fixes

* replicators generate their sample values
* fixes to replicator block height logic
Stephen Akridge
2019-01-02 11:02:15 -08:00
committed by sakridge
parent c0c38463c7
commit 1fd7bd7ede
13 changed files with 302 additions and 135 deletions


@@ -9,15 +9,15 @@ use crate::leader_scheduler::LeaderScheduler;
use crate::result::Result;
use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler};
use crate::service::Service;
use crate::storage_stage::ENTRIES_PER_SEGMENT;
use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
use crate::streamer::BlobReceiver;
use crate::thin_client::retry_get_balance;
use crate::thin_client::{retry_get_balance, ThinClient};
use crate::window_service::window_service;
use rand::thread_rng;
use rand::Rng;
use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::storage_program::StorageTransaction;
use solana_sdk::transaction::Transaction;
use std::fs::File;
@@ -43,6 +43,7 @@ pub struct Replicator {
t_window: JoinHandle<()>,
pub retransmit_receiver: BlobReceiver,
exit: Arc<AtomicBool>,
entry_height: u64,
}
pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
@@ -80,6 +81,23 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
Ok(hasher.result())
}
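// The body of sample_file is elided by the hunk above. A minimal sketch of
// the idea, assuming one Hash-sized (32-byte) sample per offset (the hunk
// does not confirm the sample width) and with std's DefaultHasher standing
// in for solana_sdk::hash::Hasher. File, Path, and io are already imported
// at the top of this file; only the extra imports are shown:
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher as _;
use std::io::{Read, Seek, SeekFrom};
fn sample_file_sketch(in_path: &Path, sample_offsets: &[u64]) -> io::Result<u64> {
    const SAMPLE_SIZE: u64 = 32; // assumed sample width
    let mut file = File::open(in_path)?;
    let file_len = file.metadata()?.len();
    let mut hasher = DefaultHasher::new();
    let mut buf = [0u8; 32];
    for &offset in sample_offsets {
        // Offsets index whole samples, not bytes.
        let byte_offset = offset * SAMPLE_SIZE;
        if byte_offset + SAMPLE_SIZE > file_len {
            return Err(io::Error::new(io::ErrorKind::Other, "sample past end of file"));
        }
        file.seek(SeekFrom::Start(byte_offset))?;
        file.read_exact(&mut buf)?;
        hasher.write(&buf);
    }
    Ok(hasher.finish())
}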
fn get_entry_heights_from_last_id(
signature: &ring::signature::Signature,
storage_entry_height: u64,
) -> (u64, u64) {
let signature_vec = signature.as_ref();
let mut segment_index = u64::from(signature_vec[0])
| (u64::from(signature_vec[1]) << 8)
| (u64::from(signature_vec[2]) << 16)
| (u64::from(signature_vec[3]) << 24);
let max_segment_index = get_segment_from_entry(storage_entry_height);
segment_index %= max_segment_index as u64;
let entry_height = segment_index * ENTRIES_PER_SEGMENT;
let max_entry_height = entry_height + ENTRIES_PER_SEGMENT;
(entry_height, max_entry_height)
}
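// Worked example of the segment arithmetic above: the first four signature
// bytes are read as a little-endian u32, then reduced modulo the number of
// full segments. The byte values and segment size below are made up:
#[test]
fn segment_index_example() {
    let signature_vec: &[u8] = &[0x21, 0x43, 0x65, 0x87];
    let segment_index = u64::from(signature_vec[0])
        | (u64::from(signature_vec[1]) << 8)
        | (u64::from(signature_vec[2]) << 16)
        | (u64::from(signature_vec[3]) << 24);
    assert_eq!(segment_index, 0x8765_4321); // 2_271_560_481
    // With 100 full segments and 16 entries per segment (illustrative):
    let segment = segment_index % 100; // 81
    assert_eq!((segment * 16, segment * 16 + 16), (1296, 1312));
}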
impl Replicator {
#[allow(clippy::new_ret_no_self)]
pub fn new(
@@ -120,53 +138,18 @@ impl Replicator {
);
info!("polling for leader");
let leader;
loop {
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
leader = l.clone();
break;
}
sleep(Duration::from_millis(900));
info!("{}", cluster_info.read().unwrap().node_info_trace());
}
let leader = Self::poll_for_leader(&cluster_info)?;
info!("Got leader: {:?}", leader);
let mut storage_last_id;
let mut storage_entry_height;
loop {
let rpc_client = {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
info!("rpc peers: {:?}", rpc_peers);
let node_idx = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
};
storage_last_id = rpc_client
.make_rpc_request(2, RpcRequest::GetStorageMiningLastId, None)
.expect("rpc request")
.to_string();
storage_entry_height = rpc_client
.make_rpc_request(2, RpcRequest::GetStorageMiningEntryHeight, None)
.expect("rpc request")
.as_u64()
.unwrap();
if storage_entry_height != 0 {
break;
}
}
let (storage_last_id, storage_entry_height) =
Self::poll_for_last_id_and_entry_height(&cluster_info)?;
let signature = keypair.sign(storage_last_id.as_ref());
let signature = signature.as_ref();
let block_index = u64::from(signature[0])
| (u64::from(signature[1]) << 8)
| (u64::from(signature[1]) << 16)
| (u64::from(signature[2]) << 24);
let mut entry_height = block_index * ENTRIES_PER_SEGMENT;
entry_height %= storage_entry_height;
let max_entry_height = entry_height + ENTRIES_PER_SEGMENT;
let (entry_height, max_entry_height) =
get_entry_heights_from_last_id(&signature, storage_entry_height);
info!("replicating entry_height: {}", entry_height);
let repair_socket = Arc::new(node.sockets.repair);
let mut blob_sockets: Vec<Arc<UdpSocket>> =
@@ -208,7 +191,141 @@ impl Replicator {
let mut client = mk_client(&leader);
if retry_get_balance(&mut client, &keypair.pubkey(), None).is_none() {
Self::get_airdrop_tokens(&mut client, keypair, &leader_info);
}
info!("Done downloading ledger at {}", ledger_path.unwrap());
let ledger_path = Path::new(ledger_path.unwrap());
let ledger_data_file_encrypted = ledger_path.join("ledger.enc");
let mut sampling_offsets = Vec::new();
#[cfg(not(feature = "chacha"))]
sampling_offsets.push(0);
#[cfg(feature = "chacha")]
{
use crate::storage_stage::NUM_STORAGE_SAMPLES;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
let mut ivec = [0u8; 64];
ivec.copy_from_slice(signature.as_ref());
let num_encrypted_bytes = chacha_cbc_encrypt_ledger(
&db_ledger,
entry_height,
&ledger_data_file_encrypted,
&mut ivec,
)?;
let num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE;
let mut rng_seed = [0u8; 32];
rng_seed.copy_from_slice(&signature.as_ref()[0..32]);
let mut rng = ChaChaRng::from_seed(rng_seed);
for _ in 0..NUM_STORAGE_SAMPLES {
sampling_offsets.push(rng.gen_range(0, num_chacha_blocks) as u64);
}
}
info!("Done encrypting the ledger");
match sample_file(&ledger_data_file_encrypted, &sampling_offsets) {
Ok(hash) => {
let last_id = client.get_last_id();
info!("sampled hash: {}", hash);
let mut tx = Transaction::storage_new_mining_proof(
&keypair,
hash,
last_id,
entry_height,
Signature::new(signature.as_ref()),
);
client
.retry_transfer(&keypair, &mut tx, 10)
.expect("transfer didn't work!");
}
Err(e) => info!("Error occurred while sampling: {:?}", e),
}
Ok(Self {
gossip_service,
fetch_stage,
t_window,
retransmit_receiver,
exit,
entry_height,
})
}
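// The sampling-offset generation in new() above is what the commit message's
// first bullet refers to: the RNG is seeded from the replicator's own proof
// signature, so the offsets are unique per replicator yet reproducible by
// anyone holding the same signature. A standalone sketch using the same
// rand 0.6-era API as the diff (two-argument gen_range, ChaChaRng::from_seed);
// the sample count and fake signature are illustrative, and Rng is already
// imported at the top of this file:
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
fn offsets_from_signature(signature_bytes: &[u8], num_chacha_blocks: usize) -> Vec<u64> {
    const NUM_SAMPLES: usize = 4; // stand-in for NUM_STORAGE_SAMPLES
    let mut rng_seed = [0u8; 32];
    rng_seed.copy_from_slice(&signature_bytes[0..32]);
    let mut rng = ChaChaRng::from_seed(rng_seed);
    (0..NUM_SAMPLES)
        .map(|_| rng.gen_range(0, num_chacha_blocks) as u64)
        .collect()
}
#[test]
fn offsets_are_deterministic() {
    let fake_signature = [7u8; 64]; // stand-in for a real ed25519 signature
    // Same seed in, same offsets out.
    assert_eq!(
        offsets_from_signature(&fake_signature, 1000),
        offsets_from_signature(&fake_signature, 1000)
    );
}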
pub fn close(self) {
self.exit.store(true, Ordering::Relaxed);
self.join()
}
pub fn join(self) {
self.gossip_service.join().unwrap();
self.fetch_stage.join().unwrap();
self.t_window.join().unwrap();
// Drain the queue here to prevent self.retransmit_receiver from being dropped
// before the window_service thread is joined
let mut retransmit_queue_count = 0;
while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) {
retransmit_queue_count += 1;
}
debug!("retransmit channel count: {}", retransmit_queue_count);
}
pub fn entry_height(&self) -> u64 {
self.entry_height
}
fn poll_for_leader(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<NodeInfo> {
for _ in 0..30 {
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
return Ok(l.clone());
}
sleep(Duration::from_millis(900));
info!("{}", cluster_info.read().unwrap().node_info_trace());
}
Err(Error::new(ErrorKind::Other, "Couldn't find leader"))?
}
fn poll_for_last_id_and_entry_height(
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Result<(String, u64)> {
for _ in 0..10 {
let rpc_client = {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
debug!("rpc peers: {:?}", rpc_peers);
let node_idx = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
};
let storage_last_id = rpc_client
.make_rpc_request(2, RpcRequest::GetStorageMiningLastId, None)
.expect("rpc request")
.to_string();
let storage_entry_height = rpc_client
.make_rpc_request(2, RpcRequest::GetStorageMiningEntryHeight, None)
.expect("rpc request")
.as_u64()
.unwrap();
if get_segment_from_entry(storage_entry_height) != 0 {
return Ok((storage_last_id, storage_entry_height));
}
info!("max entry_height: {}", storage_entry_height);
sleep(Duration::from_secs(3));
}
Err(Error::new(
ErrorKind::Other,
"Couldn't get last_id or entry_height",
))?
}
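// The readiness check above is the "block height logic" fix from the commit
// message: a nonzero entry height is not enough, because no complete segment
// exists until ENTRIES_PER_SEGMENT entries have landed. A small example,
// assuming get_segment_from_entry is plain integer division (its definition
// is not shown in this diff):
#[test]
fn segment_readiness_example() {
    const ENTRIES_PER_SEGMENT: u64 = 16; // illustrative value
    let get_segment_from_entry = |entry_height: u64| entry_height / ENTRIES_PER_SEGMENT;
    // Height 15 is nonzero, so the old `storage_entry_height != 0` check
    // would have proceeded, but no full segment exists yet:
    assert_eq!(get_segment_from_entry(15), 0);
    // The new check waits for at least one complete segment:
    assert_ne!(get_segment_from_entry(16), 0);
}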
fn get_airdrop_tokens(client: &mut ThinClient, keypair: &Keypair, leader_info: &NodeInfo) {
if retry_get_balance(client, &keypair.pubkey(), None).is_none() {
let mut drone_addr = leader_info.tpu;
drone_addr.set_port(DRONE_PORT);
@@ -233,65 +350,6 @@ impl Replicator {
}
};
}
info!("Done downloading ledger at {}", ledger_path.unwrap());
let ledger_path = Path::new(ledger_path.unwrap());
let ledger_data_file_encrypted = ledger_path.join("ledger.enc");
#[cfg(feature = "chacha")]
{
let mut ivec = [0u8; CHACHA_BLOCK_SIZE];
ivec[0..4].copy_from_slice(&[2, 3, 4, 5]);
chacha_cbc_encrypt_ledger(
&db_ledger,
entry_height,
&ledger_data_file_encrypted,
&mut ivec,
)?;
}
info!("Done encrypting the ledger");
let sampling_offsets = [0, 1, 2, 3];
match sample_file(&ledger_data_file_encrypted, &sampling_offsets) {
Ok(hash) => {
let last_id = client.get_last_id();
info!("sampled hash: {}", hash);
let tx =
Transaction::storage_new_mining_proof(&keypair, hash, last_id, entry_height);
client.transfer_signed(&tx).expect("transfer didn't work!");
}
Err(e) => info!("Error occurred while sampling: {:?}", e),
}
Ok(Self {
gossip_service,
fetch_stage,
t_window,
retransmit_receiver,
exit,
})
}
pub fn close(self) {
self.exit.store(true, Ordering::Relaxed);
self.join()
}
pub fn join(self) {
self.gossip_service.join().unwrap();
self.fetch_stage.join().unwrap();
self.t_window.join().unwrap();
// Drain the queue here to prevent self.retransmit_receiver from being dropped
// before the window_service thread is joined
let mut retransmit_queue_count = 0;
while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) {
retransmit_queue_count += 1;
}
debug!("retransmit channel count: {}", retransmit_queue_count);
}
}