Add genesis blockhash to replicators and blob filter for window (#4275)

* Add genesis blockhash to replicators and blob filter for window

* Fixes to mining submission and ledger download

* Add todo over sleep

* Update log
Sagar Dhawan authored on 2019-05-13 21:19:51 -07:00, committed by GitHub
parent 3bd921264a, commit 88c2d0fad4
3 changed files with 86 additions and 77 deletions
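
Note: the heart of this change is that WindowService no longer hard-codes the leader-schedule check; callers inject a blob filter, and every blob must additionally carry the cluster's genesis blockhash. Below is a minimal, self-contained sketch of that retain step using toy stand-in types, not the real solana-core Blob/Hash definitions:

// Toy stand-ins for the real Blob/Hash types; illustration only.
#[derive(PartialEq)]
struct Hash([u8; 32]);

struct Blob {
    slot: u64,
    genesis_blockhash: Hash,
}

// Mirrors the new retain step in recv_window: keep a blob only if the
// caller-supplied predicate passes AND its genesis blockhash matches ours.
fn filter_blobs<F>(blobs: Vec<Blob>, genesis_blockhash: &Hash, blob_filter: F) -> Vec<Blob>
where
    F: Fn(&Blob) -> bool,
{
    blobs
        .into_iter()
        .filter(|blob| blob_filter(blob) && blob.genesis_blockhash == *genesis_blockhash)
        .collect()
}

fn main() {
    let genesis = Hash([1; 32]);
    let blobs = vec![
        Blob { slot: 0, genesis_blockhash: Hash([1; 32]) },
        Blob { slot: 1, genesis_blockhash: Hash([9; 32]) }, // wrong cluster: dropped
    ];
    // A replicator accepts everything (the diff passes |_, _, _| true);
    // a validator plugs in should_retransmit_and_persist instead.
    let kept = filter_blobs(blobs, &genesis, |_| true);
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].slot, 0);
}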

File: core/src/replicator.rs

@ -2,8 +2,9 @@ use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::Blocktree;
#[cfg(feature = "chacha")]
use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
use crate::cluster_info::{ClusterInfo, Node};
use crate::contact_info::ContactInfo;
use crate::fullnode::new_banks_from_blocktree;
use crate::gossip_service::GossipService;
use crate::packet::to_shared_blob;
use crate::repair_service::{RepairSlotRange, RepairStrategy};
@ -18,7 +19,7 @@ use rand::thread_rng;
use rand::Rng;
use solana_client::rpc_client::RpcClient;
use solana_client::rpc_request::RpcRequest;
use solana_client::thin_client::{create_client, ThinClient};
use solana_client::thin_client::ThinClient;
use solana_ed25519_dalek as ed25519_dalek;
use solana_sdk::client::{AsyncClient, SyncClient};
use solana_sdk::hash::{Hash, Hasher};
@ -64,7 +65,7 @@ pub struct Replicator {
keypair: Arc<Keypair>,
storage_keypair: Arc<Keypair>,
signature: ed25519_dalek::Signature,
cluster_entrypoint: ContactInfo,
client: ThinClient,
ledger_data_file_encrypted: PathBuf,
sampling_offsets: Vec<u64>,
hash: Hash,
@ -188,15 +189,15 @@ impl Replicator {
cluster_info.set_entrypoint(cluster_entrypoint.clone());
let cluster_info = Arc::new(RwLock::new(cluster_info));
// Create Blocktree, eventually will simply repurpose the input
// ledger path as the Blocktree path once we replace the ledger with
// Blocktree. Note for now, this ledger will not contain any of the existing entries
// Note for now, this ledger will not contain any of the existing entries
// in the ledger located at ledger_path, and will only append on newly received
// entries after being passed to window_service
let blocktree =
Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
let (bank_forks, bank_forks_info, blocktree, _, _, _) =
new_banks_from_blocktree(ledger_path, None);
let blocktree = Arc::new(blocktree);
let bank_info = &bank_forks_info[0];
let bank = bank_forks[bank_info.bank_slot].clone();
let genesis_blockhash = bank.last_blockhash();
let gossip_service = GossipService::new(
&cluster_info,
@ -231,7 +232,6 @@ impl Replicator {
let (retransmit_sender, retransmit_receiver) = channel();
let window_service = WindowService::new(
None, //TODO: need a way to validate blobs... https://github.com/solana-labs/solana/issues/3924
blocktree.clone(),
cluster_info.clone(),
blob_fetch_receiver,
@ -239,7 +239,8 @@ impl Replicator {
repair_socket,
&exit,
RepairStrategy::RepairRange(repair_slot_range),
&Hash::default(),
&genesis_blockhash,
|_, _, _| true,
);
Self::setup_mining_account(&client, &keypair, &storage_keypair)?;
@ -261,11 +262,8 @@ impl Replicator {
let t_replicate = {
let exit = exit.clone();
let blocktree = blocktree.clone();
spawn(move || loop {
Self::wait_for_ledger_download(slot, &blocktree, &exit, &node_info, &cluster_info);
if exit.load(Ordering::Relaxed) {
break;
}
spawn(move || {
Self::wait_for_ledger_download(slot, &blocktree, &exit, &node_info, &cluster_info)
})
};
//always push this last
@ -282,7 +280,7 @@ impl Replicator {
keypair,
storage_keypair,
signature,
cluster_entrypoint,
client,
ledger_data_file_encrypted: PathBuf::default(),
sampling_offsets: vec![],
hash: Hash::default(),
@ -305,6 +303,8 @@ impl Replicator {
break;
}
self.submit_mining_proof();
// TODO: Replicators should be submitting proofs as fast as possible
sleep(Duration::from_secs(2));
}
}
@ -315,16 +315,17 @@ impl Replicator {
node_info: &ContactInfo,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) {
info!("window created, waiting for ledger download");
let mut _received_so_far = 0;
info!(
"window created, waiting for ledger download starting at slot {:?}",
start_slot
);
let mut current_slot = start_slot;
'outer: loop {
while let Ok(meta) = blocktree.meta(current_slot) {
if let Some(meta) = meta {
if meta.is_connected {
if meta.is_full() {
current_slot += 1;
warn!("current slot: {}", current_slot);
info!("current slot: {}", current_slot);
if current_slot >= start_slot + SLOTS_PER_SEGMENT {
break 'outer;
}
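
Note: for reference, the download-wait logic above condensed into a self-contained sketch. Here meta() is a toy stand-in for blocktree slot-meta queries and the constant is an illustrative value, not the real SLOTS_PER_SEGMENT:

const SLOTS_PER_SEGMENT: u64 = 16; // illustrative value only

struct SlotMeta {
    is_full: bool,
}

// Toy stand-in for blocktree.meta(slot): pretend slots below 100 are full.
fn meta(slot: u64) -> Option<SlotMeta> {
    if slot < 100 {
        Some(SlotMeta { is_full: true })
    } else {
        None
    }
}

// Advance through slots as they fill, stopping once a whole segment has
// been downloaded.
fn wait_for_segment(start_slot: u64) -> u64 {
    let mut current_slot = start_slot;
    loop {
        match meta(current_slot) {
            Some(m) if m.is_full => {
                current_slot += 1;
                if current_slot >= start_slot + SLOTS_PER_SEGMENT {
                    break;
                }
            }
            _ => break, // the real loop would keep polling instead of giving up
        }
    }
    current_slot
}

fn main() {
    assert_eq!(wait_for_segment(0), SLOTS_PER_SEGMENT);
}
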
@ -444,20 +445,25 @@ impl Replicator {
}
fn submit_mining_proof(&self) {
let client = create_client(
self.cluster_entrypoint.client_facing_addr(),
FULLNODE_PORT_RANGE,
);
// No point if we've got no storage account...
assert!(
client
self.client
.poll_get_balance(&self.storage_keypair.pubkey())
.unwrap()
> 0
);
// ...or no lamports for fees
assert!(client.poll_get_balance(&self.keypair.pubkey()).unwrap() > 0);
assert!(
self.client
.poll_get_balance(&self.keypair.pubkey())
.unwrap()
> 0
);
let blockhash = self
.client
.get_recent_blockhash()
.expect("No recent blockhash");
let instruction = storage_instruction::mining_proof(
&self.storage_keypair.pubkey(),
self.hash,
@ -465,8 +471,12 @@ impl Replicator {
Signature::new(&self.signature.to_bytes()),
);
let message = Message::new_with_payer(vec![instruction], Some(&self.keypair.pubkey()));
let mut transaction = Transaction::new_unsigned(message);
client
let mut transaction = Transaction::new(
&[self.keypair.as_ref(), self.storage_keypair.as_ref()],
message,
blockhash,
);
self.client
.send_and_confirm_transaction(
&[&self.keypair, &self.storage_keypair],
&mut transaction,

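Note: before this change the proof transaction was created unsigned and a fresh client was built for every submission; now the cached ThinClient supplies a recent blockhash and the transaction is signed at construction. A toy sketch of that sign-at-construction pattern, using stand-in types rather than the solana-sdk ones (the real call shapes are the ones visible in the diff above):

// Stand-ins for Hash/Message/Keypair; illustration only.
struct Hash(u64);
struct Message(&'static str);
struct Keypair(&'static str);

struct Transaction {
    signers: Vec<&'static str>,
    message: Message,
    recent_blockhash: Hash,
}

impl Transaction {
    // Signing at construction binds the transaction to a recent blockhash,
    // so no separate sign step can be forgotten before sending.
    fn new(keypairs: &[&Keypair], message: Message, recent_blockhash: Hash) -> Self {
        Transaction {
            signers: keypairs.iter().map(|k| k.0).collect(),
            message,
            recent_blockhash,
        }
    }
}

fn main() {
    let keypair = Keypair("fee-payer");
    let storage_keypair = Keypair("storage-account");
    let blockhash = Hash(42); // analogous to client.get_recent_blockhash()
    let tx = Transaction::new(&[&keypair, &storage_keypair], Message("mining proof"), blockhash);
    assert_eq!(tx.signers, vec!["fee-payer", "storage-account"]);
}
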
File: core/src/retransmit_stage.rs

@ -9,7 +9,7 @@ use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
use crate::streamer::BlobReceiver;
use crate::window_service::WindowService;
use crate::window_service::{should_retransmit_and_persist, WindowService};
use solana_metrics::{datapoint, inc_new_counter_info};
use solana_runtime::epoch_schedule::EpochSchedule;
use solana_sdk::hash::Hash;
@ -135,8 +135,8 @@ impl RetransmitStage {
completed_slots_receiver,
epoch_schedule,
};
let leader_schedule_cache = leader_schedule_cache.clone();
let window_service = WindowService::new(
Some(leader_schedule_cache.clone()),
blocktree,
cluster_info.clone(),
fetch_stage_receiver,
@ -145,6 +145,9 @@ impl RetransmitStage {
exit,
repair_strategy,
genesis_blockhash,
move |id, blob, working_bank| {
should_retransmit_and_persist(blob, working_bank, &leader_schedule_cache, id)
},
);
let thread_hdls = vec![t_retransmit];

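Note: the stage satisfies the window service's new generic filter by cloning the Arc'd leader schedule cache and moving the clone into the closure it hands over. A minimal sketch of that capture pattern, again with stand-in types:

use std::sync::Arc;

struct LeaderScheduleCache {
    leader_slot: u64,
}

struct Blob {
    slot: u64,
}

// Mirrors WindowService::new's bound on the injected predicate.
fn spawn_window_service<F>(blob_filter: F) -> bool
where
    F: Fn(&Blob) -> bool + Send + Sync + 'static,
{
    // The real service moves the filter into its worker thread and applies
    // it to every received blob; the sketch calls it once.
    blob_filter(&Blob { slot: 3 })
}

fn main() {
    let leader_schedule_cache = Arc::new(LeaderScheduleCache { leader_slot: 3 });
    // Clone first so the original Arc remains usable after the move.
    let cache = leader_schedule_cache.clone();
    assert!(spawn_window_service(move |blob| blob.slot == cache.leader_slot));
    assert_eq!(leader_schedule_cache.leader_slot, 3); // still alive here
}
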
File: core/src/window_service.rs

@ -1,11 +1,9 @@
//! `window_service` handles the data plane incoming blobs, storing them in
//! blocktree and retransmitting where required
//!
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::leader_schedule_utils::slot_leader_at;
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::repair_service::{RepairService, RepairStrategy};
use crate::result::{Error, Result};
@ -77,19 +75,16 @@ fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()>
}
/// drop blobs that are from myself or not from the correct leader for the
/// blob's slot
fn should_retransmit_and_persist(
/// blob's slot
pub fn should_retransmit_and_persist(
blob: &Blob,
bank: Option<&Arc<Bank>>,
leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
bank: Option<Arc<Bank>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
my_id: &Pubkey,
) -> bool {
let slot_leader_id = match bank {
None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot(), None)),
Some(bank) => match leader_schedule_cache {
None => slot_leader_at(blob.slot(), &bank),
Some(cache) => cache.slot_leader_at(blob.slot(), Some(bank)),
},
None => leader_schedule_cache.slot_leader_at(blob.slot(), None),
Some(bank) => leader_schedule_cache.slot_leader_at(blob.slot(), Some(&bank)),
};
if blob.id() == *my_id {
@ -106,15 +101,17 @@ fn should_retransmit_and_persist(
}
}
fn recv_window(
bank_forks: Option<&Arc<RwLock<BankForks>>>,
leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
fn recv_window<F>(
blocktree: &Arc<Blocktree>,
my_id: &Pubkey,
r: &BlobReceiver,
retransmit: &BlobSender,
genesis_blockhash: &Hash,
) -> Result<()> {
blob_filter: F,
) -> Result<()>
where
F: Fn(&Blob) -> bool,
{
let timer = Duration::from_millis(200);
let mut blobs = r.recv_timeout(timer)?;
@ -125,14 +122,8 @@ fn recv_window(
inc_new_counter_info!("streamer-recv_window-recv", blobs.len(), 0, 1000);
blobs.retain(|blob| {
should_retransmit_and_persist(
&blob.read().unwrap(),
bank_forks
.map(|bank_forks| bank_forks.read().unwrap().working_bank())
.as_ref(),
leader_schedule_cache,
my_id,
) && blob.read().unwrap().genesis_blockhash() == *genesis_blockhash
blob_filter(&blob.read().unwrap())
&& blob.read().unwrap().genesis_blockhash() == *genesis_blockhash
});
retransmit_blobs(&blobs, retransmit, my_id)?;
@ -174,8 +165,7 @@ pub struct WindowService {
impl WindowService {
#[allow(clippy::too_many_arguments)]
pub fn new(
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
pub fn new<F>(
blocktree: Arc<Blocktree>,
cluster_info: Arc<RwLock<ClusterInfo>>,
r: BlobReceiver,
@ -184,7 +174,14 @@ impl WindowService {
exit: &Arc<AtomicBool>,
repair_strategy: RepairStrategy,
genesis_blockhash: &Hash,
) -> WindowService {
blob_filter: F,
) -> WindowService
where
F: 'static
+ Fn(&Pubkey, &Blob, Option<Arc<Bank>>) -> bool
+ std::marker::Send
+ std::marker::Sync,
{
let bank_forks = match repair_strategy {
RepairStrategy::RepairRange(_) => None,
@ -199,8 +196,9 @@ impl WindowService {
repair_strategy,
);
let exit = exit.clone();
let leader_schedule_cache = leader_schedule_cache.clone();
let hash = *genesis_blockhash;
let blob_filter = Arc::new(blob_filter);
let bank_forks = bank_forks.clone();
let t_window = Builder::new()
.name("solana-window".to_string())
.spawn(move || {
@ -212,15 +210,15 @@ impl WindowService {
break;
}
if let Err(e) = recv_window(
bank_forks.as_ref(),
leader_schedule_cache.as_ref(),
&blocktree,
&id,
&r,
&retransmit,
&hash,
) {
if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit, &hash, |blob| {
blob_filter(
&id,
blob,
bank_forks
.as_ref()
.map(|bank_forks| bank_forks.read().unwrap().working_bank()),
)
}) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -307,20 +305,20 @@ mod test {
// without a Bank and blobs not from me, blob continues
assert_eq!(
should_retransmit_and_persist(&blob, None, None, &me_id),
should_retransmit_and_persist(&blob, None, &cache, &me_id),
true
);
// with a Bank for slot 0, blob continues
assert_eq!(
should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
should_retransmit_and_persist(&blob, Some(bank.clone()), &cache, &me_id),
true
);
// set the blob to have come from the wrong leader
blob.set_id(&Pubkey::new_rand());
assert_eq!(
should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
should_retransmit_and_persist(&blob, Some(bank.clone()), &cache, &me_id),
false
);
@ -328,14 +326,14 @@ mod test {
// TODO: persist in blocktree that we didn't know who the leader was at the time?
blob.set_slot(MINIMUM_SLOT_LENGTH as u64 * 3);
assert_eq!(
should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
should_retransmit_and_persist(&blob, Some(bank), &cache, &me_id),
true
);
// if the blob came back from me, it doesn't continue, whether or not I have a bank
blob.set_id(&me_id);
assert_eq!(
should_retransmit_and_persist(&blob, None, None, &me_id),
should_retransmit_and_persist(&blob, None, &cache, &me_id),
false
);
}
@ -361,7 +359,6 @@ mod test {
let blocktree = Arc::new(blocktree);
let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let repair_strategy = RepairStrategy::RepairAll {
bank_forks: bank_forks.clone(),
@ -374,7 +371,6 @@ mod test {
.clone(),
};
let t_window = WindowService::new(
Some(leader_schedule_cache),
blocktree,
subs,
r_reader,
@ -383,6 +379,7 @@ mod test {
&exit,
repair_strategy,
&Hash::default(),
|_, _, _| true,
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -449,7 +446,6 @@ mod test {
let blocktree = Arc::new(blocktree);
let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
let repair_strategy = RepairStrategy::RepairAll {
@ -458,7 +454,6 @@ mod test {
epoch_schedule,
};
let t_window = WindowService::new(
Some(leader_schedule_cache),
blocktree,
subs.clone(),
r_reader,
@ -467,6 +462,7 @@ mod test {
&exit,
repair_strategy,
&Hash::default(),
|_, _, _| true,
);
let t_responder = {
let (s_responder, r_responder) = channel();
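
Note on the filter's bounds: `F: 'static + Fn(&Pubkey, &Blob, Option<Arc<Bank>>) -> bool + Send + Sync` is required because the closure is moved into the spawned window thread and wrapped in an Arc, so it must own its captures and be safe to share across threads. A self-contained illustration of why those bounds are needed (toy filter, not the real one):

use std::sync::Arc;
use std::thread::{Builder, JoinHandle};

// Mirrors the shape of WindowService::new: the filter moves into a thread.
fn spawn_with_filter<F>(blob_filter: F) -> JoinHandle<bool>
where
    F: 'static + Fn(u64) -> bool + Send + Sync,
{
    // Wrapping in Arc mirrors `let blob_filter = Arc::new(blob_filter);`
    // in the diff, letting the thread share the filter if needed.
    let blob_filter = Arc::new(blob_filter);
    Builder::new()
        .name("toy-window".to_string())
        .spawn(move || blob_filter(42))
        .unwrap()
}

fn main() {
    let threshold = 40; // captured by value via `move`, satisfying 'static
    let handle = spawn_with_filter(move |slot| slot > threshold);
    assert!(handle.join().unwrap());
}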