Rename replicator to archiver (#6464)

* Rename replicator to archiver

* cargo fmt

* Fix grammar

Greg Fitzgerald
2019-10-21 11:29:37 -06:00
committed by GitHub
parent 6c79f56c2c
commit 9232057e95
61 changed files with 529 additions and 560 deletions

View File

@@ -51,18 +51,18 @@ use std::time::Duration;
static ENCRYPTED_FILENAME: &str = "ledger.enc";
#[derive(Serialize, Deserialize)]
-pub enum ReplicatorRequest {
+pub enum ArchiverRequest {
GetSlotHeight(SocketAddr),
}
-pub struct Replicator {
+pub struct Archiver {
thread_handles: Vec<JoinHandle<()>>,
exit: Arc<AtomicBool>,
}
-// Shared Replicator Meta struct used internally
+// Shared Archiver Meta struct used internally
#[derive(Default)]
-struct ReplicatorMeta {
+struct ArchiverMeta {
slot: u64,
slots_per_segment: u64,
ledger_path: PathBuf,
@@ -135,16 +135,10 @@ fn create_request_processor(
let (s_responder, r_responder) = channel();
let storage_socket = Arc::new(socket);
let recycler = Recycler::default();
-let t_receiver = receiver(
-storage_socket.clone(),
-exit,
-s_reader,
-recycler,
-"replicator",
-);
+let t_receiver = receiver(storage_socket.clone(), exit, s_reader, recycler, "archiver");
thread_handles.push(t_receiver);
-let t_responder = responder("replicator-responder", storage_socket.clone(), r_responder);
+let t_responder = responder("archiver-responder", storage_socket.clone(), r_responder);
thread_handles.push(t_responder);
let exit = exit.clone();
@@ -160,10 +154,10 @@ fn create_request_processor(
if let Ok(packets) = packets {
for packet in &packets.packets {
-let req: result::Result<ReplicatorRequest, Box<bincode::ErrorKind>> =
+let req: result::Result<ArchiverRequest, Box<bincode::ErrorKind>> =
deserialize(&packet.data[..packet.meta.size]);
match req {
-Ok(ReplicatorRequest::GetSlotHeight(from)) => {
+Ok(ArchiverRequest::GetSlotHeight(from)) => {
if let Ok(blob) = to_shared_blob(slot, from) {
let _ = s_responder.send(vec![blob]);
}
@@ -192,15 +186,15 @@ fn poll_for_slot(receiver: Receiver<u64>, exit: &Arc<AtomicBool>) -> u64 {
}
}
-impl Replicator {
-/// Returns a Result that contains a replicator on success
+impl Archiver {
+/// Returns a Result that contains an archiver on success
///
/// # Arguments
/// * `ledger_path` - path to where the ledger will be stored.
/// Causes panic if none
-/// * `node` - The replicator node
+/// * `node` - The archiver node
/// * `cluster_entrypoint` - ContactInfo representing an entry into the network
-/// * `keypair` - Keypair for this replicator
+/// * `keypair` - Keypair for this archiver
#[allow(clippy::new_ret_no_self)]
pub fn new(
ledger_path: &Path,
@@ -211,7 +205,7 @@ impl Replicator {
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
-info!("Replicator: id: {}", keypair.pubkey());
+info!("Archiver: id: {}", keypair.pubkey());
info!("Creating cluster info....");
let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());
cluster_info.set_entrypoint(cluster_entrypoint.clone());
@@ -235,7 +229,7 @@ impl Replicator {
info!("Connecting to the cluster via {:?}", cluster_entrypoint);
let (nodes, _) =
match crate::gossip_service::discover_cluster(&cluster_entrypoint.gossip, 1) {
-Ok(nodes_and_replicators) => nodes_and_replicators,
+Ok(nodes_and_archivers) => nodes_and_archivers,
Err(e) => {
//shutdown services before exiting
exit.store(true, Ordering::Relaxed);
@@ -273,15 +267,15 @@ impl Replicator {
let request_processor =
create_request_processor(node.sockets.storage.unwrap(), &exit, slot_receiver);
-let t_replicator = {
+let t_archiver = {
let exit = exit.clone();
let node_info = node.info.clone();
-let mut meta = ReplicatorMeta {
+let mut meta = ArchiverMeta {
ledger_path: ledger_path.to_path_buf(),
-..ReplicatorMeta::default()
+..ArchiverMeta::default()
};
spawn(move || {
-// setup replicator
+// setup archiver
let window_service = match Self::setup(
&mut meta,
cluster_info.clone(),
@@ -296,7 +290,7 @@ impl Replicator {
Ok(window_service) => window_service,
Err(e) => {
//shutdown services before exiting
-error!("setup failed {:?}; replicator thread exiting...", e);
+error!("setup failed {:?}; archiver thread exiting...", e);
exit.store(true, Ordering::Relaxed);
request_processor
.into_iter()
@@ -308,7 +302,7 @@ impl Replicator {
};
info!("setup complete");
-// run replicator
+// run archiver
Self::run(
&mut meta,
&blocktree,
@@ -328,16 +322,16 @@ impl Replicator {
};
Ok(Self {
-thread_handles: vec![t_replicator],
+thread_handles: vec![t_archiver],
exit,
})
}
fn run(
-meta: &mut ReplicatorMeta,
+meta: &mut ArchiverMeta,
blocktree: &Arc<Blocktree>,
cluster_info: Arc<RwLock<ClusterInfo>>,
-replicator_keypair: &Arc<Keypair>,
+archiver_keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>,
exit: &Arc<AtomicBool>,
) {
@@ -362,7 +356,7 @@ impl Replicator {
}
};
-Self::submit_mining_proof(meta, &cluster_info, replicator_keypair, storage_keypair);
+Self::submit_mining_proof(meta, &cluster_info, archiver_keypair, storage_keypair);
// TODO make this a lot more frequent by picking a "new" blockhash instead of picking a storage blockhash
// prep the next proof
@@ -382,34 +376,34 @@ impl Replicator {
}
};
meta.blockhash = storage_blockhash;
-Self::redeem_rewards(&cluster_info, replicator_keypair, storage_keypair);
+Self::redeem_rewards(&cluster_info, archiver_keypair, storage_keypair);
}
exit.store(true, Ordering::Relaxed);
}
fn redeem_rewards(
cluster_info: &Arc<RwLock<ClusterInfo>>,
-replicator_keypair: &Arc<Keypair>,
+archiver_keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>,
) {
let nodes = cluster_info.read().unwrap().tvu_peers();
let client = crate::gossip_service::get_client(&nodes);
if let Ok(Some(account)) = client.get_account(&storage_keypair.pubkey()) {
-if let Ok(StorageContract::ReplicatorStorage { validations, .. }) = account.state() {
+if let Ok(StorageContract::ArchiverStorage { validations, .. }) = account.state() {
if !validations.is_empty() {
let ix = storage_instruction::claim_reward(
-&replicator_keypair.pubkey(),
+&archiver_keypair.pubkey(),
&storage_keypair.pubkey(),
);
let message =
-Message::new_with_payer(vec![ix], Some(&replicator_keypair.pubkey()));
-if let Err(e) = client.send_message(&[&replicator_keypair], message) {
+Message::new_with_payer(vec![ix], Some(&archiver_keypair.pubkey()));
+if let Err(e) = client.send_message(&[&archiver_keypair], message) {
error!("unable to redeem reward, tx failed: {:?}", e);
} else {
info!(
"collected mining rewards: Account balance {:?}",
-client.get_balance(&replicator_keypair.pubkey())
+client.get_balance(&archiver_keypair.pubkey())
);
}
}
@@ -421,7 +415,7 @@ impl Replicator {
// Find a segment to replicate and download it.
fn setup(
-meta: &mut ReplicatorMeta,
+meta: &mut ArchiverMeta,
cluster_info: Arc<RwLock<ClusterInfo>>,
blocktree: &Arc<Blocktree>,
exit: &Arc<AtomicBool>,
@@ -520,7 +514,7 @@ impl Replicator {
info!("Done receiving entries from window_service");
-// Remove replicator from the data plane
+// Remove archiver from the data plane
let mut contact_info = node_info.clone();
contact_info.tvu = "0.0.0.0:0".parse().unwrap();
contact_info.wallclock = timestamp();
@@ -530,7 +524,7 @@ impl Replicator {
}
}
-fn encrypt_ledger(meta: &mut ReplicatorMeta, blocktree: &Arc<Blocktree>) -> Result<()> {
+fn encrypt_ledger(meta: &mut ArchiverMeta, blocktree: &Arc<Blocktree>) -> Result<()> {
meta.ledger_data_file_encrypted = meta.ledger_path.join(ENCRYPTED_FILENAME);
{
@@ -555,7 +549,7 @@ impl Replicator {
Ok(())
}
-fn create_sampling_offsets(meta: &mut ReplicatorMeta) {
+fn create_sampling_offsets(meta: &mut ArchiverMeta) {
meta.sampling_offsets.clear();
let mut rng_seed = [0u8; 32];
rng_seed.copy_from_slice(&meta.blockhash.as_ref());
@@ -580,7 +574,7 @@ impl Replicator {
keypair: &Keypair,
storage_keypair: &Keypair,
) -> Result<()> {
-// make sure replicator has some balance
+// make sure archiver has some balance
if client.poll_get_balance(&keypair.pubkey())? == 0 {
return Err(
io::Error::new(io::ErrorKind::Other, "keypair account has no balance").into(),
@@ -605,7 +599,7 @@ impl Replicator {
&keypair.pubkey(),
&storage_keypair.pubkey(),
1,
-StorageAccountType::Replicator,
+StorageAccountType::Archiver,
);
let tx = Transaction::new_signed_instructions(&[keypair], ix, blockhash);
let signature = client.async_send_transaction(tx)?;
@@ -623,9 +617,9 @@ impl Replicator {
}
fn submit_mining_proof(
-meta: &ReplicatorMeta,
+meta: &ArchiverMeta,
cluster_info: &Arc<RwLock<ClusterInfo>>,
-replicator_keypair: &Arc<Keypair>,
+archiver_keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>,
) {
// No point if we've got no storage account...
@@ -637,9 +631,9 @@ impl Replicator {
return;
}
// ...or no lamports for fees
-let balance = client.poll_get_balance(&replicator_keypair.pubkey());
+let balance = client.poll_get_balance(&archiver_keypair.pubkey());
if balance.is_err() || balance.unwrap() == 0 {
-error!("Unable to submit mining proof, insufficient Replicator Account balance");
+error!("Unable to submit mining proof, insufficient Archiver Account balance");
return;
}
@@ -657,15 +651,14 @@ impl Replicator {
Signature::new(&meta.signature.as_ref()),
meta.blockhash,
);
-let message =
-Message::new_with_payer(vec![instruction], Some(&replicator_keypair.pubkey()));
+let message = Message::new_with_payer(vec![instruction], Some(&archiver_keypair.pubkey()));
let mut transaction = Transaction::new(
-&[replicator_keypair.as_ref(), storage_keypair.as_ref()],
+&[archiver_keypair.as_ref(), storage_keypair.as_ref()],
message,
blockhash,
);
if let Err(err) = client.send_and_confirm_transaction(
-&[&replicator_keypair, &storage_keypair],
+&[&archiver_keypair, &storage_keypair],
&mut transaction,
10,
0,
@@ -787,21 +780,21 @@ impl Replicator {
}
}
-/// Ask a replicator to populate a given blocktree with its segment.
-/// Return the slot at the start of the replicator's segment
+/// Ask an archiver to populate a given blocktree with its segment.
+/// Return the slot at the start of the archiver's segment
///
/// It is recommended to use a temporary blocktree for this since the download will not verify
/// blobs received and might impact the chaining of blobs across slots
-pub fn download_from_replicator(
+pub fn download_from_archiver(
cluster_info: &Arc<RwLock<ClusterInfo>>,
-replicator_info: &ContactInfo,
+archiver_info: &ContactInfo,
blocktree: &Arc<Blocktree>,
slots_per_segment: u64,
) -> Result<(u64)> {
-// Create a client which downloads from the replicator and see that it
+// Create a client which downloads from the archiver and see that it
// can respond with blobs.
-let start_slot = Self::get_replicator_segment_slot(replicator_info.storage_addr);
-info!("Replicator download: start at {}", start_slot);
+let start_slot = Self::get_archiver_segment_slot(archiver_info.storage_addr);
+info!("Archiver download: start at {}", start_slot);
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
@@ -811,13 +804,13 @@ impl Replicator {
&exit,
s_reader.clone(),
Recycler::default(),
-"replicator_reeciver",
+"archiver_reeciver",
);
let id = cluster_info.read().unwrap().id();
info!(
"Sending repair requests from: {} to: {}",
cluster_info.read().unwrap().my_data().id,
-replicator_info.gossip
+archiver_info.gossip
);
let repair_slot_range = RepairSlotRange {
start: start_slot,
@@ -825,7 +818,7 @@ impl Replicator {
};
// try for up to 180 seconds //TODO needs tuning if segments are huge
for _ in 0..120 {
-// Strategy used by replicators
+// Strategy used by archivers
let repairs = RepairService::generate_repairs_in_range(
blocktree,
repair_service::MAX_REPAIR_LENGTH,
@@ -840,7 +833,7 @@ impl Replicator {
.read()
.unwrap()
.map_repair_request(&repair_request)
-.map(|result| ((replicator_info.gossip, result), repair_request))
+.map(|result| ((archiver_info.gossip, result), repair_request))
.ok()
})
.collect();
@@ -848,7 +841,7 @@ impl Replicator {
for ((to, req), repair_request) in reqs {
if let Ok(local_addr) = repair_socket.local_addr() {
datapoint_info!(
-"replicator_download",
+"archiver_download",
("repair_request", format!("{:?}", repair_request), String),
("to", to.to_string(), String),
("from", local_addr.to_string(), String),
@@ -856,7 +849,7 @@ impl Replicator {
);
}
repair_socket
-.send_to(&req, replicator_info.gossip)
+.send_to(&req, archiver_info.gossip)
.unwrap_or_else(|e| {
error!("{} repair req send_to({}) error {:?}", id, to, e);
0
@@ -906,13 +899,13 @@ impl Replicator {
true
}
-fn get_replicator_segment_slot(to: SocketAddr) -> u64 {
+fn get_archiver_segment_slot(to: SocketAddr) -> u64 {
let (_port, socket) = bind_in_range(VALIDATOR_PORT_RANGE).unwrap();
socket
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
-let req = ReplicatorRequest::GetSlotHeight(socket.local_addr().unwrap());
+let req = ArchiverRequest::GetSlotHeight(socket.local_addr().unwrap());
let serialized_req = bincode::serialize(&req).unwrap();
for _ in 0..10 {
socket.send_to(&serialized_req, to).unwrap();
@@ -922,7 +915,7 @@ impl Replicator {
}
sleep(Duration::from_millis(500));
}
-panic!("Couldn't get segment slot from replicator!");
+panic!("Couldn't get segment slot from archiver!");
}
}
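
The slot-height handshake that create_request_processor answers and get_archiver_segment_slot drives can be sketched in isolation. A minimal stand-alone sketch, assuming the serde and bincode crates; request_segment_slot is a hypothetical name, and assuming the reply deserializes directly to a u64 simplifies the blob framing the real handler does via to_shared_blob:

use serde::{Deserialize, Serialize};
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

#[derive(Serialize, Deserialize)]
pub enum ArchiverRequest {
    GetSlotHeight(SocketAddr),
}

// Ask an archiver's storage port for the slot at the start of its segment,
// retrying a few times the way get_archiver_segment_slot does above.
fn request_segment_slot(archiver_storage_addr: SocketAddr) -> std::io::Result<u64> {
    let socket = UdpSocket::bind("0.0.0.0:0")?;
    socket.set_read_timeout(Some(Duration::from_secs(5)))?;
    // Include our local address so the archiver knows where to reply.
    let req = ArchiverRequest::GetSlotHeight(socket.local_addr()?);
    let payload = bincode::serialize(&req).expect("serialize ArchiverRequest");
    let mut buf = [0u8; 1024];
    for _ in 0..10 {
        socket.send_to(&payload, archiver_storage_addr)?;
        if let Ok((n, _from)) = socket.recv_from(&mut buf) {
            // Assumed reply format: a bincode-encoded u64 start slot.
            if let Ok(slot) = bincode::deserialize::<u64>(&buf[..n]) {
                return Ok(slot);
            }
        }
    }
    Err(std::io::Error::new(
        std::io::ErrorKind::TimedOut,
        "no segment slot reply from archiver",
    ))
}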

View File

@@ -113,8 +113,8 @@ pub fn chacha_cbc_encrypt_file_many_keys(
#[cfg(test)]
mod tests {
use super::*;
+use crate::archiver::sample_file;
use crate::chacha::chacha_cbc_encrypt_ledger;
-use crate::replicator::sample_file;
use solana_ledger::blocktree::get_tmp_ledger_path;
use solana_ledger::entry::create_ticks;
use solana_sdk::clock::DEFAULT_SLOTS_PER_SEGMENT;

View File

@@ -242,7 +242,7 @@ impl ClusterInfo {
pub fn contact_info_trace(&self) -> String {
let now = timestamp();
let mut spy_nodes = 0;
-let mut replicators = 0;
+let mut archivers = 0;
let my_pubkey = self.my_data().id;
let nodes: Vec<_> = self
.all_peers()
@@ -250,8 +250,8 @@ impl ClusterInfo {
.map(|(node, last_updated)| {
if Self::is_spy_node(&node) {
spy_nodes += 1;
-} else if Self::is_replicator(&node) {
-replicators += 1;
+} else if Self::is_archiver(&node) {
+archivers += 1;
}
fn addr_to_string(addr: &SocketAddr) -> String {
if ContactInfo::is_valid_address(addr) {
@@ -281,9 +281,9 @@ impl ClusterInfo {
{}\
Nodes: {}{}{}",
nodes.join(""),
-nodes.len() - spy_nodes - replicators,
-if replicators > 0 {
-format!("\nReplicators: {}", replicators)
+nodes.len() - spy_nodes - archivers,
+if archivers > 0 {
+format!("\nArchivers: {}", archivers)
} else {
"".to_string()
},
@@ -426,7 +426,7 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
-.filter(|x| !ClusterInfo::is_replicator(x))
+.filter(|x| !ClusterInfo::is_archiver(x))
.filter(|x| x.id != me)
.cloned()
.collect()
@@ -478,7 +478,7 @@ impl ClusterInfo {
&& !ContactInfo::is_valid_address(&contact_info.storage_addr)
}
-pub fn is_replicator(contact_info: &ContactInfo) -> bool {
+pub fn is_archiver(contact_info: &ContactInfo) -> bool {
ContactInfo::is_valid_address(&contact_info.storage_addr)
&& !ContactInfo::is_valid_address(&contact_info.tpu)
}
@@ -1593,7 +1593,7 @@ impl Node {
let pubkey = Pubkey::new_rand();
Self::new_localhost_with_pubkey(&pubkey)
}
-pub fn new_localhost_replicator(pubkey: &Pubkey) -> Self {
+pub fn new_localhost_archiver(pubkey: &Pubkey) -> Self {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
@@ -1748,7 +1748,7 @@ impl Node {
},
}
}
-pub fn new_replicator_with_external_ip(
+pub fn new_archiver_with_external_ip(
pubkey: &Pubkey,
gossip_addr: &SocketAddr,
port_range: PortRange,
@@ -2130,9 +2130,9 @@ mod tests {
}
#[test]
-fn new_replicator_external_ip_test() {
+fn new_archiver_external_ip_test() {
let ip = Ipv4Addr::from(0);
-let node = Node::new_replicator_with_external_ip(
+let node = Node::new_archiver_with_external_ip(
&Pubkey::new_rand(),
&socketaddr!(ip, 0),
VALIDATOR_PORT_RANGE,
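
The is_archiver predicate above classifies a peer purely by which service addresses it advertises: a valid storage address and no TPU. A simplified, self-contained model of that partitioning, with a stand-in Peer type and boolean flags in place of the real ContactInfo and is_valid_address checks:

struct Peer {
    id: u64,
    tvu_valid: bool,
    tpu_valid: bool,
    storage_valid: bool,
}

// Mirrors ClusterInfo::is_archiver: a storage address is advertised, a TPU is not.
fn is_archiver(p: &Peer) -> bool {
    p.storage_valid && !p.tpu_valid
}

// Mirrors the tvu_peers filter in the diff above: peers with a valid TVU
// that are not archivers and are not ourselves.
fn tvu_peers(peers: &[Peer], me: u64) -> Vec<&Peer> {
    peers
        .iter()
        .filter(|p| p.tvu_valid)
        .filter(|p| !is_archiver(p))
        .filter(|p| p.id != me)
        .collect()
}

fn main() {
    let peers = vec![
        Peer { id: 1, tvu_valid: true, tpu_valid: true, storage_valid: false }, // validator
        Peer { id: 2, tvu_valid: true, tpu_valid: false, storage_valid: true }, // archiver
    ];
    assert_eq!(tvu_peers(&peers, 0).len(), 1); // the archiver is filtered out
}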

View File

@@ -53,7 +53,7 @@ impl GossipService {
}
}
-/// Discover Nodes and Replicators in a cluster
+/// Discover Nodes and Archivers in a cluster
pub fn discover_cluster(
entry_point: &SocketAddr,
num_nodes: usize,
@@ -76,7 +76,7 @@ pub fn discover(
info!("Gossip entry point: {:?}", entry_point);
info!("Spy node id: {:?}", id);
-let (met_criteria, secs, tvu_peers, replicators) = spy(
+let (met_criteria, secs, tvu_peers, archivers) = spy(
spy_ref.clone(),
num_nodes,
timeout,
@@ -93,7 +93,7 @@ pub fn discover(
secs,
spy_ref.read().unwrap().contact_info_trace()
);
-return Ok((tvu_peers, replicators));
+return Ok((tvu_peers, archivers));
}
if !tvu_peers.is_empty() {
@@ -101,7 +101,7 @@ pub fn discover(
"discover failed to match criteria by timeout...\n{}",
spy_ref.read().unwrap().contact_info_trace()
);
-return Ok((tvu_peers, replicators));
+return Ok((tvu_peers, archivers));
}
info!(
@@ -159,7 +159,7 @@ fn spy(
let now = Instant::now();
let mut met_criteria = false;
let mut tvu_peers: Vec<ContactInfo> = Vec::new();
-let mut replicators: Vec<ContactInfo> = Vec::new();
+let mut archivers: Vec<ContactInfo> = Vec::new();
let mut i = 0;
loop {
if let Some(secs) = timeout {
@@ -167,22 +167,22 @@ fn spy(
break;
}
}
-// collect tvu peers but filter out replicators since their tvu is transient and we do not want
+// collect tvu peers but filter out archivers since their tvu is transient and we do not want
// it to show up as a "node"
tvu_peers = spy_ref
.read()
.unwrap()
.tvu_peers()
.into_iter()
-.filter(|node| !ClusterInfo::is_replicator(&node))
+.filter(|node| !ClusterInfo::is_archiver(&node))
.collect::<Vec<_>>();
-replicators = spy_ref.read().unwrap().storage_peers();
+archivers = spy_ref.read().unwrap().storage_peers();
if let Some(num) = num_nodes {
-if tvu_peers.len() + replicators.len() >= num {
+if tvu_peers.len() + archivers.len() >= num {
if let Some(ipaddr) = find_node_by_ipaddr {
if tvu_peers
.iter()
-.chain(replicators.iter())
+.chain(archivers.iter())
.any(|x| x.gossip.ip() == ipaddr)
{
met_criteria = true;
.any(|x| x.gossip.ip() == ipaddr)
{
met_criteria = true;
@@ -192,7 +192,7 @@ fn spy(
if let Some(pubkey) = find_node_by_pubkey {
if tvu_peers
.iter()
-.chain(replicators.iter())
+.chain(archivers.iter())
.any(|x| x.id == pubkey)
{
met_criteria = true;
@@ -209,7 +209,7 @@ fn spy(
if let Some(pubkey) = find_node_by_pubkey {
if tvu_peers
.iter()
-.chain(replicators.iter())
+.chain(archivers.iter())
.any(|x| x.id == pubkey)
{
met_criteria = true;
@@ -219,7 +219,7 @@ fn spy(
if let Some(ipaddr) = find_node_by_ipaddr {
if tvu_peers
.iter()
-.chain(replicators.iter())
+.chain(archivers.iter())
.any(|x| x.gossip.ip() == ipaddr)
{
met_criteria = true;
@@ -238,12 +238,7 @@ fn spy(
));
i += 1;
}
-(
-met_criteria,
-now.elapsed().as_secs(),
-tvu_peers,
-replicators,
-)
+(met_criteria, now.elapsed().as_secs(), tvu_peers, archivers)
}
/// Makes a spy or gossip node based on whether or not a gossip_addr was passed in
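
The criteria checks in spy above count validators and archivers together toward num_nodes, and chain both lists when searching for a specific node. A condensed model of that logic, restricted to the pubkey case, with a stand-in Node type in place of ContactInfo:

struct Node {
    id: u64,
}

fn met_criteria(
    tvu_peers: &[Node],
    archivers: &[Node],
    num_nodes: Option<usize>,
    find_node_by_pubkey: Option<u64>,
) -> bool {
    match (num_nodes, find_node_by_pubkey) {
        // Both constraints given: the combined count must be reached AND the
        // target must appear in either list.
        (Some(num), Some(pk)) => {
            tvu_peers.len() + archivers.len() >= num
                && tvu_peers.iter().chain(archivers.iter()).any(|n| n.id == pk)
        }
        (Some(num), None) => tvu_peers.len() + archivers.len() >= num,
        (None, Some(pk)) => tvu_peers.iter().chain(archivers.iter()).any(|n| n.id == pk),
        (None, None) => false,
    }
}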

View File

@@ -15,6 +15,7 @@ pub mod recycler;
pub mod shred_fetch_stage;
#[macro_use]
pub mod contact_info;
+pub mod archiver;
pub mod blockstream;
pub mod blockstream_service;
pub mod cluster_info;
@@ -39,7 +40,6 @@ pub mod poh_service;
pub mod recvmmsg;
pub mod repair_service;
pub mod replay_stage;
-pub mod replicator;
pub mod result;
pub mod retransmit_stage;
pub mod rpc;

View File

@@ -129,7 +129,7 @@ impl RepairService {
let repairs = {
match repair_strategy {
RepairStrategy::RepairRange(ref repair_slot_range) => {
-// Strategy used by replicators
+// Strategy used by archivers
Self::generate_repairs_in_range(
blocktree,
MAX_REPAIR_LENGTH,
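
The "strategy used by archivers" named above restricts repair generation to the slots of one segment. A hypothetical sketch of such range-limited repair generation, with a plain map standing in for the blocktree queries the real generate_repairs_in_range performs:

use std::collections::HashMap;

// Walk the slots of the assigned range and emit (slot, missing-shred-index)
// repair requests, capped at a batch limit; slots outside the range are ignored.
fn generate_repairs_in_range(
    missing_by_slot: &HashMap<u64, Vec<u64>>, // stand-in for blocktree state
    max_repairs: usize,
    range: std::ops::RangeInclusive<u64>,
) -> Vec<(u64, u64)> {
    let mut repairs = Vec::new();
    for slot in range {
        if let Some(missing) = missing_by_slot.get(&slot) {
            for &index in missing {
                if repairs.len() >= max_repairs {
                    return repairs;
                }
                repairs.push((slot, index));
            }
        }
    }
    repairs
}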

View File

@@ -1,5 +1,5 @@
// A stage that handles generating the keys used to encrypt the ledger and sample it
-// for storage mining. Replicators submit storage proofs, validator then bundles them
+// for storage mining. Archivers submit storage proofs, validator then bundles them
// to submit its proof for mining to be rewarded.
use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
@@ -12,7 +12,7 @@ use rand_chacha::ChaChaRng;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::blocktree::Blocktree;
use solana_runtime::bank::Bank;
-use solana_runtime::storage_utils::replicator_accounts;
+use solana_runtime::storage_utils::archiver_accounts;
use solana_sdk::account::Account;
use solana_sdk::account_utils::State;
use solana_sdk::clock::get_segment_from_slot;
@@ -39,13 +39,13 @@ use std::{cmp, io};
// Vec of [ledger blocks] x [keys]
type StorageResults = Vec<Hash>;
type StorageKeys = Vec<u8>;
-type ReplicatorMap = Vec<HashMap<Pubkey, Vec<Proof>>>;
+type ArchiverMap = Vec<HashMap<Pubkey, Vec<Proof>>>;
#[derive(Default)]
pub struct StorageStateInner {
storage_results: StorageResults,
pub storage_keys: StorageKeys,
-replicator_map: ReplicatorMap,
+archiver_map: ArchiverMap,
storage_blockhash: Hash,
slot: u64,
slots_per_segment: u64,
@@ -92,12 +92,12 @@ impl StorageState {
pub fn new(hash: &Hash, slots_per_turn: u64, slots_per_segment: u64) -> Self {
let storage_keys = vec![0u8; KEY_SIZE * NUM_IDENTITIES];
let storage_results = vec![Hash::default(); NUM_IDENTITIES];
-let replicator_map = vec![];
+let archiver_map = vec![];
let state = StorageStateInner {
storage_keys,
storage_results,
-replicator_map,
+archiver_map,
slots_per_turn,
slot: 0,
slots_per_segment,
@@ -140,17 +140,16 @@ impl StorageState {
const MAX_PUBKEYS_TO_RETURN: usize = 5;
let index =
get_segment_from_slot(slot, self.state.read().unwrap().slots_per_segment) as usize;
-let replicator_map = &self.state.read().unwrap().replicator_map;
+let archiver_map = &self.state.read().unwrap().archiver_map;
let working_bank = bank_forks.read().unwrap().working_bank();
-let accounts = replicator_accounts(&working_bank);
-if index < replicator_map.len() {
+let accounts = archiver_accounts(&working_bank);
+if index < archiver_map.len() {
//perform an account owner lookup
-let mut slot_replicators = replicator_map[index]
+let mut slot_archivers = archiver_map[index]
.keys()
.filter_map(|account_id| {
accounts.get(account_id).and_then(|account| {
-if let Ok(StorageContract::ReplicatorStorage { owner, .. }) =
-account.state()
+if let Ok(StorageContract::ArchiverStorage { owner, .. }) = account.state()
{
Some(owner)
} else {
@@ -159,8 +158,8 @@ impl StorageState {
})
})
.collect::<Vec<_>>();
-slot_replicators.truncate(MAX_PUBKEYS_TO_RETURN);
-slot_replicators
+slot_archivers.truncate(MAX_PUBKEYS_TO_RETURN);
+slot_archivers
} else {
vec![]
}
@@ -448,7 +447,7 @@ impl StorageStage {
storage_state: &Arc<RwLock<StorageStateInner>>,
current_key_idx: &mut usize,
) {
-if let Ok(StorageContract::ReplicatorStorage { proofs, .. }) = account.state() {
+if let Ok(StorageContract::ArchiverStorage { proofs, .. }) = account.state() {
//convert slot to segment
let segment = get_segment_from_slot(slot, slots_per_segment);
if let Some(proofs) = proofs.get(&segment) {
@@ -467,16 +466,14 @@ impl StorageStage {
}
let mut statew = storage_state.write().unwrap();
-if statew.replicator_map.len() < segment as usize {
-statew
-.replicator_map
-.resize(segment as usize, HashMap::new());
+if statew.archiver_map.len() < segment as usize {
+statew.archiver_map.resize(segment as usize, HashMap::new());
}
let proof_segment_index = proof.segment_index as usize;
-if proof_segment_index < statew.replicator_map.len() {
+if proof_segment_index < statew.archiver_map.len() {
// TODO randomly select and verify the proof first
// Copy the submitted proof
-statew.replicator_map[proof_segment_index]
+statew.archiver_map[proof_segment_index]
.entry(account_id)
.or_default()
.push(proof.clone());
@@ -510,11 +507,11 @@ impl StorageStage {
storage_slots.slot_count += 1;
storage_slots.last_root = bank.slot();
if storage_slots.slot_count % slots_per_turn == 0 {
-// load all the replicator accounts in the bank. collect all their proofs at the current slot
-let replicator_accounts = replicator_accounts(bank.as_ref());
+// load all the archiver accounts in the bank. collect all their proofs at the current slot
+let archiver_accounts = archiver_accounts(bank.as_ref());
// find proofs, and use them to update
// the storage_keys with their signatures
-for (account_id, account) in replicator_accounts.into_iter() {
+for (account_id, account) in archiver_accounts.into_iter() {
Self::collect_proofs(
bank.slot(),
bank.slots_per_segment(),
@@ -553,13 +550,13 @@ impl StorageStage {
storage_keypair: &Arc<Keypair>,
ix_sender: &Sender<Instruction>,
) -> Result<()> {
-// bundle up mining submissions from replicators
+// bundle up mining submissions from archivers
// and submit them in a tx to the leader to get rewarded.
let mut w_state = storage_state.write().unwrap();
let mut max_proof_mask = 0;
let proof_mask_limit = storage_instruction::proof_mask_limit();
let instructions: Vec<_> = w_state
-.replicator_map
+.archiver_map
.iter_mut()
.enumerate()
.flat_map(|(_, proof_map)| {
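
The archiver_map this file threads through is a Vec indexed by segment, grown on demand, holding each archiver's submitted proofs per account. A minimal sketch of the bookkeeping done in collect_proofs above, with stand-in key and proof types replacing the real Pubkey and Proof:

use std::collections::HashMap;

type AccountId = String; // stand-in for Pubkey
type Proof = u64; // stand-in for the real Proof struct

#[derive(Default)]
struct State {
    archiver_map: Vec<HashMap<AccountId, Vec<Proof>>>,
}

fn record_proof(
    state: &mut State,
    segment: usize,
    proof_segment_index: usize,
    account_id: AccountId,
    proof: Proof,
) {
    // Grow the per-segment table as new segments are seen (mirrors the resize above).
    if state.archiver_map.len() < segment {
        state.archiver_map.resize(segment, HashMap::new());
    }
    // Copy the submitted proof under the archiver's account id.
    if proof_segment_index < state.archiver_map.len() {
        state.archiver_map[proof_segment_index]
            .entry(account_id)
            .or_default()
            .push(proof);
    }
}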

View File

@@ -118,7 +118,7 @@ where
);
if !packets.packets.is_empty() {
-// Ignore the send error, as the retransmit is optional (e.g. replicators don't retransmit)
+// Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit)
let _ = retransmit.send(packets);
}
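
The comment above leans on a standard channel idiom: if the receiving half was never wired up (archivers don't retransmit), send returns an error that is deliberately discarded. In miniature, with std's mpsc channel:

use std::sync::mpsc::channel;

fn main() {
    let (retransmit, receiver) = channel::<Vec<u8>>();
    drop(receiver); // an archiver never consumes the retransmit stream
    let _ = retransmit.send(vec![1, 2, 3]); // SendError intentionally ignored
}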